Cygwin: FIFO: support opening multiple readers
Although we can have multiple readers open because of dup/fork/exec, the current code does not support multiple readers opening a FIFO by explicitly calling 'open'. The main complication in supporting this is that when a blocking reader tries to open and there's already one open, it has to check whether there any writers open. It can't rely on the write_ready event, whose state hasn't changed since the first writer opened. To fix this, add two new named events, check_write_ready_evt and write_ready_ok_evt, and a new method, check_write_ready(). The first event signals the owner's reader thread to call check_write_ready(), which polls the fc_handler list to check for connected writers. If it finds none, it checks to see if there's a writer in the process and then sets/resets write_ready appropriately. When check_write_ready() finishes it sets write_ready_ok_evt to signal the reader that write_ready has been updated. The polling is done via fifo_client_handler::pipe_state(). As long as it's calling that function anyway, check_write_ready() updates the state of each handler. Also add a new lock to prevent a race if two readers are trying to open simultaneously.
This commit is contained in:
parent
bf66a56cca
commit
4811889e0c
|
@ -1324,7 +1324,7 @@ class fifo_shmem_t
|
||||||
{
|
{
|
||||||
LONG _nreaders;
|
LONG _nreaders;
|
||||||
fifo_reader_id_t _owner, _prev_owner, _pending_owner;
|
fifo_reader_id_t _owner, _prev_owner, _pending_owner;
|
||||||
af_unix_spinlock_t _owner_lock, _reading_lock;
|
af_unix_spinlock_t _owner_lock, _reading_lock, _reader_opening_lock;
|
||||||
|
|
||||||
/* Info about shared memory block used for temporary storage of the
|
/* Info about shared memory block used for temporary storage of the
|
||||||
owner's fc_handler list. */
|
owner's fc_handler list. */
|
||||||
|
@ -1346,6 +1346,8 @@ public:
|
||||||
void owner_unlock () { _owner_lock.unlock (); }
|
void owner_unlock () { _owner_lock.unlock (); }
|
||||||
void reading_lock () { _reading_lock.lock (); }
|
void reading_lock () { _reading_lock.lock (); }
|
||||||
void reading_unlock () { _reading_lock.unlock (); }
|
void reading_unlock () { _reading_lock.unlock (); }
|
||||||
|
void reader_opening_lock () { _reader_opening_lock.lock (); }
|
||||||
|
void reader_opening_unlock () { _reader_opening_lock.unlock (); }
|
||||||
|
|
||||||
int get_shared_nhandlers () const { return (int) _sh_nhandlers; }
|
int get_shared_nhandlers () const { return (int) _sh_nhandlers; }
|
||||||
void set_shared_nhandlers (int n) { InterlockedExchange (&_sh_nhandlers, n); }
|
void set_shared_nhandlers (int n) { InterlockedExchange (&_sh_nhandlers, n); }
|
||||||
|
@ -1371,6 +1373,8 @@ class fhandler_fifo: public fhandler_base
|
||||||
HANDLE owner_needed_evt; /* The owner is closing. */
|
HANDLE owner_needed_evt; /* The owner is closing. */
|
||||||
HANDLE owner_found_evt; /* A new owner has taken over. */
|
HANDLE owner_found_evt; /* A new owner has taken over. */
|
||||||
HANDLE update_needed_evt; /* shared_fc_handler needs updating. */
|
HANDLE update_needed_evt; /* shared_fc_handler needs updating. */
|
||||||
|
HANDLE check_write_ready_evt; /* write_ready needs to be checked. */
|
||||||
|
HANDLE write_ready_ok_evt; /* check_write_ready is done. */
|
||||||
|
|
||||||
/* Handles to non-shared events needed for fifo_reader_threads. */
|
/* Handles to non-shared events needed for fifo_reader_threads. */
|
||||||
HANDLE cancel_evt; /* Signal thread to terminate. */
|
HANDLE cancel_evt; /* Signal thread to terminate. */
|
||||||
|
@ -1448,6 +1452,9 @@ class fhandler_fifo: public fhandler_base
|
||||||
{ return shmem->shared_fc_handler_updated (); }
|
{ return shmem->shared_fc_handler_updated (); }
|
||||||
void shared_fc_handler_updated (bool val)
|
void shared_fc_handler_updated (bool val)
|
||||||
{ shmem->shared_fc_handler_updated (val); }
|
{ shmem->shared_fc_handler_updated (val); }
|
||||||
|
void check_write_ready ();
|
||||||
|
void reader_opening_lock () { shmem->reader_opening_lock (); }
|
||||||
|
void reader_opening_unlock () { shmem->reader_opening_unlock (); }
|
||||||
|
|
||||||
public:
|
public:
|
||||||
fhandler_fifo ();
|
fhandler_fifo ();
|
||||||
|
|
|
@ -75,6 +75,7 @@ fhandler_fifo::fhandler_fifo ():
|
||||||
fhandler_base (),
|
fhandler_base (),
|
||||||
read_ready (NULL), write_ready (NULL), writer_opening (NULL),
|
read_ready (NULL), write_ready (NULL), writer_opening (NULL),
|
||||||
owner_needed_evt (NULL), owner_found_evt (NULL), update_needed_evt (NULL),
|
owner_needed_evt (NULL), owner_found_evt (NULL), update_needed_evt (NULL),
|
||||||
|
check_write_ready_evt (NULL), write_ready_ok_evt (NULL),
|
||||||
cancel_evt (NULL), thr_sync_evt (NULL), _maybe_eof (false),
|
cancel_evt (NULL), thr_sync_evt (NULL), _maybe_eof (false),
|
||||||
fc_handler (NULL), shandlers (0), nhandlers (0),
|
fc_handler (NULL), shandlers (0), nhandlers (0),
|
||||||
reader (false), writer (false), duplexer (false),
|
reader (false), writer (false), duplexer (false),
|
||||||
|
@ -441,6 +442,45 @@ fhandler_fifo::update_shared_handlers ()
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* The write_ready event gets set when a writer opens, to indicate
|
||||||
|
that a blocking reader can open. If a second reader wants to open,
|
||||||
|
we need to see if there are still any writers open. */
|
||||||
|
void
|
||||||
|
fhandler_fifo::check_write_ready ()
|
||||||
|
{
|
||||||
|
bool set = false;
|
||||||
|
|
||||||
|
fifo_client_lock ();
|
||||||
|
for (int i = 0; i < nhandlers && !set; i++)
|
||||||
|
switch (fc_handler[i].pipe_state ())
|
||||||
|
{
|
||||||
|
case FILE_PIPE_CONNECTED_STATE:
|
||||||
|
fc_handler[i].state = fc_connected;
|
||||||
|
set = true;
|
||||||
|
break;
|
||||||
|
case FILE_PIPE_INPUT_AVAILABLE_STATE:
|
||||||
|
fc_handler[i].state = fc_input_avail;
|
||||||
|
set = true;
|
||||||
|
break;
|
||||||
|
case FILE_PIPE_DISCONNECTED_STATE:
|
||||||
|
fc_handler[i].state = fc_disconnected;
|
||||||
|
break;
|
||||||
|
case FILE_PIPE_LISTENING_STATE:
|
||||||
|
fc_handler[i].state = fc_listening;
|
||||||
|
case FILE_PIPE_CLOSING_STATE:
|
||||||
|
fc_handler[i].state = fc_closing;
|
||||||
|
default:
|
||||||
|
fc_handler[i].state = fc_error;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
fifo_client_unlock ();
|
||||||
|
if (set || IsEventSignalled (writer_opening))
|
||||||
|
SetEvent (write_ready);
|
||||||
|
else
|
||||||
|
ResetEvent (write_ready);
|
||||||
|
SetEvent (write_ready_ok_evt);
|
||||||
|
}
|
||||||
|
|
||||||
static DWORD WINAPI
|
static DWORD WINAPI
|
||||||
fifo_reader_thread (LPVOID param)
|
fifo_reader_thread (LPVOID param)
|
||||||
{
|
{
|
||||||
|
@ -526,13 +566,15 @@ fhandler_fifo::fifo_reader_thread_func ()
|
||||||
IO_STATUS_BLOCK io;
|
IO_STATUS_BLOCK io;
|
||||||
bool cancel = false;
|
bool cancel = false;
|
||||||
bool update = false;
|
bool update = false;
|
||||||
|
bool check = false;
|
||||||
|
|
||||||
status = NtFsControlFile (fc.h, conn_evt, NULL, NULL, &io,
|
status = NtFsControlFile (fc.h, conn_evt, NULL, NULL, &io,
|
||||||
FSCTL_PIPE_LISTEN, NULL, 0, NULL, 0);
|
FSCTL_PIPE_LISTEN, NULL, 0, NULL, 0);
|
||||||
if (status == STATUS_PENDING)
|
if (status == STATUS_PENDING)
|
||||||
{
|
{
|
||||||
HANDLE w[3] = { conn_evt, update_needed_evt, cancel_evt };
|
HANDLE w[4] = { conn_evt, update_needed_evt,
|
||||||
switch (WaitForMultipleObjects (3, w, false, INFINITE))
|
check_write_ready_evt, cancel_evt };
|
||||||
|
switch (WaitForMultipleObjects (4, w, false, INFINITE))
|
||||||
{
|
{
|
||||||
case WAIT_OBJECT_0:
|
case WAIT_OBJECT_0:
|
||||||
status = io.Status;
|
status = io.Status;
|
||||||
|
@ -544,6 +586,10 @@ fhandler_fifo::fifo_reader_thread_func ()
|
||||||
update = true;
|
update = true;
|
||||||
break;
|
break;
|
||||||
case WAIT_OBJECT_0 + 2:
|
case WAIT_OBJECT_0 + 2:
|
||||||
|
status = STATUS_WAIT_2;
|
||||||
|
check = true;
|
||||||
|
break;
|
||||||
|
case WAIT_OBJECT_0 + 3:
|
||||||
status = STATUS_THREAD_IS_TERMINATING;
|
status = STATUS_THREAD_IS_TERMINATING;
|
||||||
cancel = true;
|
cancel = true;
|
||||||
update = true;
|
update = true;
|
||||||
|
@ -570,6 +616,7 @@ fhandler_fifo::fifo_reader_thread_func ()
|
||||||
break;
|
break;
|
||||||
case STATUS_THREAD_IS_TERMINATING:
|
case STATUS_THREAD_IS_TERMINATING:
|
||||||
case STATUS_WAIT_1:
|
case STATUS_WAIT_1:
|
||||||
|
case STATUS_WAIT_2:
|
||||||
/* Try to connect a bogus client. Otherwise fc is still
|
/* Try to connect a bogus client. Otherwise fc is still
|
||||||
listening, and the next connection might not get recorded. */
|
listening, and the next connection might not get recorded. */
|
||||||
status1 = open_pipe (ph);
|
status1 = open_pipe (ph);
|
||||||
|
@ -602,6 +649,8 @@ fhandler_fifo::fifo_reader_thread_func ()
|
||||||
NtClose (ph);
|
NtClose (ph);
|
||||||
if (update && update_shared_handlers () < 0)
|
if (update && update_shared_handlers () < 0)
|
||||||
api_fatal ("Can't update shared handlers, %E");
|
api_fatal ("Can't update shared handlers, %E");
|
||||||
|
if (check)
|
||||||
|
check_write_ready ();
|
||||||
if (cancel)
|
if (cancel)
|
||||||
goto canceled;
|
goto canceled;
|
||||||
}
|
}
|
||||||
|
@ -833,14 +882,19 @@ fhandler_fifo::open (int flags, mode_t)
|
||||||
and start the fifo_reader_thread. */
|
and start the fifo_reader_thread. */
|
||||||
if (reader)
|
if (reader)
|
||||||
{
|
{
|
||||||
|
bool first = true;
|
||||||
|
|
||||||
SetEvent (read_ready);
|
SetEvent (read_ready);
|
||||||
if (create_shmem () < 0)
|
if (create_shmem () < 0)
|
||||||
goto err_close_writer_opening;
|
goto err_close_writer_opening;
|
||||||
if (create_shared_fc_handler () < 0)
|
if (create_shared_fc_handler () < 0)
|
||||||
goto err_close_shmem;
|
goto err_close_shmem;
|
||||||
inc_nreaders ();
|
reader_opening_lock ();
|
||||||
/* Reinitialize _sh_fc_handler_updated, which starts as 0. */
|
if (inc_nreaders () == 1)
|
||||||
shared_fc_handler_updated (true);
|
/* Reinitialize _sh_fc_handler_updated, which starts as 0. */
|
||||||
|
shared_fc_handler_updated (true);
|
||||||
|
else
|
||||||
|
first = false;
|
||||||
npbuf[0] = 'n';
|
npbuf[0] = 'n';
|
||||||
if (!(owner_needed_evt = CreateEvent (sa_buf, true, false, npbuf)))
|
if (!(owner_needed_evt = CreateEvent (sa_buf, true, false, npbuf)))
|
||||||
{
|
{
|
||||||
|
@ -862,9 +916,23 @@ fhandler_fifo::open (int flags, mode_t)
|
||||||
__seterrno ();
|
__seterrno ();
|
||||||
goto err_close_owner_found_evt;
|
goto err_close_owner_found_evt;
|
||||||
}
|
}
|
||||||
|
npbuf[0] = 'c';
|
||||||
|
if (!(check_write_ready_evt = CreateEvent (sa_buf, false, false, npbuf)))
|
||||||
|
{
|
||||||
|
debug_printf ("CreateEvent for %s failed, %E", npbuf);
|
||||||
|
__seterrno ();
|
||||||
|
goto err_close_update_needed_evt;
|
||||||
|
}
|
||||||
|
npbuf[0] = 'k';
|
||||||
|
if (!(write_ready_ok_evt = CreateEvent (sa_buf, false, false, npbuf)))
|
||||||
|
{
|
||||||
|
debug_printf ("CreateEvent for %s failed, %E", npbuf);
|
||||||
|
__seterrno ();
|
||||||
|
goto err_close_check_write_ready_evt;
|
||||||
|
}
|
||||||
/* Make cancel and sync inheritable for exec. */
|
/* Make cancel and sync inheritable for exec. */
|
||||||
if (!(cancel_evt = create_event (true)))
|
if (!(cancel_evt = create_event (true)))
|
||||||
goto err_close_update_needed_evt;
|
goto err_close_write_ready_ok_evt;
|
||||||
if (!(thr_sync_evt = create_event (true)))
|
if (!(thr_sync_evt = create_event (true)))
|
||||||
goto err_close_cancel_evt;
|
goto err_close_cancel_evt;
|
||||||
me.winpid = GetCurrentProcessId ();
|
me.winpid = GetCurrentProcessId ();
|
||||||
|
@ -908,9 +976,19 @@ fhandler_fifo::open (int flags, mode_t)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
/* Not a duplexer; wait for a writer to connect. */
|
/* Not a duplexer; wait for a writer to connect if we're blocking. */
|
||||||
else if (!wait (write_ready))
|
else if (!(flags & O_NONBLOCK))
|
||||||
goto err_close_reader;
|
{
|
||||||
|
if (!first)
|
||||||
|
{
|
||||||
|
/* Ask the owner to update write_ready. */
|
||||||
|
SetEvent (check_write_ready_evt);
|
||||||
|
WaitForSingleObject (write_ready_ok_evt, INFINITE);
|
||||||
|
}
|
||||||
|
if (!wait (write_ready))
|
||||||
|
goto err_close_reader;
|
||||||
|
}
|
||||||
|
reader_opening_unlock ();
|
||||||
goto success;
|
goto success;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -984,6 +1062,10 @@ err_close_reader:
|
||||||
return 0;
|
return 0;
|
||||||
err_close_cancel_evt:
|
err_close_cancel_evt:
|
||||||
NtClose (cancel_evt);
|
NtClose (cancel_evt);
|
||||||
|
err_close_write_ready_ok_evt:
|
||||||
|
NtClose (write_ready_ok_evt);
|
||||||
|
err_close_check_write_ready_evt:
|
||||||
|
NtClose (check_write_ready_evt);
|
||||||
err_close_update_needed_evt:
|
err_close_update_needed_evt:
|
||||||
NtClose (update_needed_evt);
|
NtClose (update_needed_evt);
|
||||||
err_close_owner_found_evt:
|
err_close_owner_found_evt:
|
||||||
|
@ -993,6 +1075,7 @@ err_close_owner_needed_evt:
|
||||||
err_dec_nreaders:
|
err_dec_nreaders:
|
||||||
if (dec_nreaders () == 0)
|
if (dec_nreaders () == 0)
|
||||||
ResetEvent (read_ready);
|
ResetEvent (read_ready);
|
||||||
|
reader_opening_unlock ();
|
||||||
/* err_close_shared_fc_handler: */
|
/* err_close_shared_fc_handler: */
|
||||||
NtUnmapViewOfSection (NtCurrentProcess (), shared_fc_handler);
|
NtUnmapViewOfSection (NtCurrentProcess (), shared_fc_handler);
|
||||||
NtClose (shared_fc_hdl);
|
NtClose (shared_fc_hdl);
|
||||||
|
@ -1396,6 +1479,10 @@ fhandler_fifo::close ()
|
||||||
NtClose (owner_found_evt);
|
NtClose (owner_found_evt);
|
||||||
if (update_needed_evt)
|
if (update_needed_evt)
|
||||||
NtClose (update_needed_evt);
|
NtClose (update_needed_evt);
|
||||||
|
if (check_write_ready_evt)
|
||||||
|
NtClose (check_write_ready_evt);
|
||||||
|
if (write_ready_ok_evt)
|
||||||
|
NtClose (write_ready_ok_evt);
|
||||||
if (cancel_evt)
|
if (cancel_evt)
|
||||||
NtClose (cancel_evt);
|
NtClose (cancel_evt);
|
||||||
if (thr_sync_evt)
|
if (thr_sync_evt)
|
||||||
|
@ -1519,8 +1606,22 @@ fhandler_fifo::dup (fhandler_base *child, int flags)
|
||||||
__seterrno ();
|
__seterrno ();
|
||||||
goto err_close_owner_found_evt;
|
goto err_close_owner_found_evt;
|
||||||
}
|
}
|
||||||
|
if (!DuplicateHandle (GetCurrentProcess (), check_write_ready_evt,
|
||||||
|
GetCurrentProcess (), &fhf->check_write_ready_evt,
|
||||||
|
0, !(flags & O_CLOEXEC), DUPLICATE_SAME_ACCESS))
|
||||||
|
{
|
||||||
|
__seterrno ();
|
||||||
|
goto err_close_update_needed_evt;
|
||||||
|
}
|
||||||
|
if (!DuplicateHandle (GetCurrentProcess (), write_ready_ok_evt,
|
||||||
|
GetCurrentProcess (), &fhf->write_ready_ok_evt,
|
||||||
|
0, !(flags & O_CLOEXEC), DUPLICATE_SAME_ACCESS))
|
||||||
|
{
|
||||||
|
__seterrno ();
|
||||||
|
goto err_close_check_write_ready_evt;
|
||||||
|
}
|
||||||
if (!(fhf->cancel_evt = create_event (true)))
|
if (!(fhf->cancel_evt = create_event (true)))
|
||||||
goto err_close_update_needed_evt;
|
goto err_close_write_ready_ok_evt;
|
||||||
if (!(fhf->thr_sync_evt = create_event (true)))
|
if (!(fhf->thr_sync_evt = create_event (true)))
|
||||||
goto err_close_cancel_evt;
|
goto err_close_cancel_evt;
|
||||||
inc_nreaders ();
|
inc_nreaders ();
|
||||||
|
@ -1530,6 +1631,10 @@ fhandler_fifo::dup (fhandler_base *child, int flags)
|
||||||
return 0;
|
return 0;
|
||||||
err_close_cancel_evt:
|
err_close_cancel_evt:
|
||||||
NtClose (fhf->cancel_evt);
|
NtClose (fhf->cancel_evt);
|
||||||
|
err_close_write_ready_ok_evt:
|
||||||
|
NtClose (fhf->write_ready_ok_evt);
|
||||||
|
err_close_check_write_ready_evt:
|
||||||
|
NtClose (fhf->check_write_ready_evt);
|
||||||
err_close_update_needed_evt:
|
err_close_update_needed_evt:
|
||||||
NtClose (fhf->update_needed_evt);
|
NtClose (fhf->update_needed_evt);
|
||||||
err_close_owner_found_evt:
|
err_close_owner_found_evt:
|
||||||
|
@ -1575,6 +1680,8 @@ fhandler_fifo::fixup_after_fork (HANDLE parent)
|
||||||
fork_fixup (parent, owner_needed_evt, "owner_needed_evt");
|
fork_fixup (parent, owner_needed_evt, "owner_needed_evt");
|
||||||
fork_fixup (parent, owner_found_evt, "owner_found_evt");
|
fork_fixup (parent, owner_found_evt, "owner_found_evt");
|
||||||
fork_fixup (parent, update_needed_evt, "update_needed_evt");
|
fork_fixup (parent, update_needed_evt, "update_needed_evt");
|
||||||
|
fork_fixup (parent, check_write_ready_evt, "check_write_ready_evt");
|
||||||
|
fork_fixup (parent, write_ready_ok_evt, "write_ready_ok_evt");
|
||||||
if (close_on_exec ())
|
if (close_on_exec ())
|
||||||
/* Prevent a later attempt to close the non-inherited
|
/* Prevent a later attempt to close the non-inherited
|
||||||
pipe-instance handles copied from the parent. */
|
pipe-instance handles copied from the parent. */
|
||||||
|
@ -1658,6 +1765,8 @@ fhandler_fifo::set_close_on_exec (bool val)
|
||||||
set_no_inheritance (owner_needed_evt, val);
|
set_no_inheritance (owner_needed_evt, val);
|
||||||
set_no_inheritance (owner_found_evt, val);
|
set_no_inheritance (owner_found_evt, val);
|
||||||
set_no_inheritance (update_needed_evt, val);
|
set_no_inheritance (update_needed_evt, val);
|
||||||
|
set_no_inheritance (check_write_ready_evt, val);
|
||||||
|
set_no_inheritance (write_ready_ok_evt, val);
|
||||||
set_no_inheritance (cancel_evt, val);
|
set_no_inheritance (cancel_evt, val);
|
||||||
set_no_inheritance (thr_sync_evt, val);
|
set_no_inheritance (thr_sync_evt, val);
|
||||||
fifo_client_lock ();
|
fifo_client_lock ();
|
||||||
|
|
Loading…
Reference in New Issue