Cygwin: FIFO: allow any reader to take ownership

Add a take_ownership method, used by raw_read and select.cc:peek_fifo.
It wakes up all fifo_reader_threads and allows the caller to become
owner.  The work is done by the fifo_reader_threads.

For synchronization we introduce several new fhandler_fifo data
members and methods:

- update_needed_evt signals the current owner to stop listening for
  writer connections and update its fc_handler list.

- shared_fc_handler() gets and sets the status of the fc_handler
  update process.

- get_pending_owner() and set_pending_owner() get and set the reader
  that is requesting ownership.

Finally, a new 'reading_lock' prevents two readers from trying to take
ownership simultaneously.
This commit is contained in:
Ken Brown 2020-04-25 09:54:18 -04:00
parent f35dfff3de
commit bf66a56cca
3 changed files with 122 additions and 16 deletions

View File

@ -1323,12 +1323,13 @@ struct fifo_reader_id_t
class fifo_shmem_t class fifo_shmem_t
{ {
LONG _nreaders; LONG _nreaders;
fifo_reader_id_t _owner, _prev_owner; fifo_reader_id_t _owner, _prev_owner, _pending_owner;
af_unix_spinlock_t _owner_lock; af_unix_spinlock_t _owner_lock, _reading_lock;
/* Info about shared memory block used for temporary storage of the /* Info about shared memory block used for temporary storage of the
owner's fc_handler list. */ owner's fc_handler list. */
LONG _sh_nhandlers, _sh_shandlers, _sh_fc_handler_committed; LONG _sh_nhandlers, _sh_shandlers, _sh_fc_handler_committed,
_sh_fc_handler_updated;
public: public:
int inc_nreaders () { return (int) InterlockedIncrement (&_nreaders); } int inc_nreaders () { return (int) InterlockedIncrement (&_nreaders); }
@ -1338,9 +1339,13 @@ public:
void set_owner (fifo_reader_id_t fr_id) { _owner = fr_id; } void set_owner (fifo_reader_id_t fr_id) { _owner = fr_id; }
fifo_reader_id_t get_prev_owner () const { return _prev_owner; } fifo_reader_id_t get_prev_owner () const { return _prev_owner; }
void set_prev_owner (fifo_reader_id_t fr_id) { _prev_owner = fr_id; } void set_prev_owner (fifo_reader_id_t fr_id) { _prev_owner = fr_id; }
fifo_reader_id_t get_pending_owner () const { return _pending_owner; }
void set_pending_owner (fifo_reader_id_t fr_id) { _pending_owner = fr_id; }
void owner_lock () { _owner_lock.lock (); } void owner_lock () { _owner_lock.lock (); }
void owner_unlock () { _owner_lock.unlock (); } void owner_unlock () { _owner_lock.unlock (); }
void reading_lock () { _reading_lock.lock (); }
void reading_unlock () { _reading_lock.unlock (); }
int get_shared_nhandlers () const { return (int) _sh_nhandlers; } int get_shared_nhandlers () const { return (int) _sh_nhandlers; }
void set_shared_nhandlers (int n) { InterlockedExchange (&_sh_nhandlers, n); } void set_shared_nhandlers (int n) { InterlockedExchange (&_sh_nhandlers, n); }
@ -1350,6 +1355,9 @@ public:
{ return (size_t) _sh_fc_handler_committed; } { return (size_t) _sh_fc_handler_committed; }
void set_shared_fc_handler_committed (size_t n) void set_shared_fc_handler_committed (size_t n)
{ InterlockedExchange (&_sh_fc_handler_committed, (LONG) n); } { InterlockedExchange (&_sh_fc_handler_committed, (LONG) n); }
bool shared_fc_handler_updated () const { return _sh_fc_handler_updated; }
void shared_fc_handler_updated (bool val)
{ InterlockedExchange (&_sh_fc_handler_updated, val); }
}; };
class fhandler_fifo: public fhandler_base class fhandler_fifo: public fhandler_base
@ -1362,6 +1370,7 @@ class fhandler_fifo: public fhandler_base
/* Handles to named events needed by all readers of a given FIFO. */ /* Handles to named events needed by all readers of a given FIFO. */
HANDLE owner_needed_evt; /* The owner is closing. */ HANDLE owner_needed_evt; /* The owner is closing. */
HANDLE owner_found_evt; /* A new owner has taken over. */ HANDLE owner_found_evt; /* A new owner has taken over. */
HANDLE update_needed_evt; /* shared_fc_handler needs updating. */
/* Handles to non-shared events needed for fifo_reader_threads. */ /* Handles to non-shared events needed for fifo_reader_threads. */
HANDLE cancel_evt; /* Signal thread to terminate. */ HANDLE cancel_evt; /* Signal thread to terminate. */
@ -1409,6 +1418,11 @@ class fhandler_fifo: public fhandler_base
fifo_reader_id_t get_prev_owner () const { return shmem->get_prev_owner (); } fifo_reader_id_t get_prev_owner () const { return shmem->get_prev_owner (); }
void set_prev_owner (fifo_reader_id_t fr_id) void set_prev_owner (fifo_reader_id_t fr_id)
{ shmem->set_prev_owner (fr_id); } { shmem->set_prev_owner (fr_id); }
fifo_reader_id_t get_pending_owner () const
{ return shmem->get_pending_owner (); }
void set_pending_owner (fifo_reader_id_t fr_id)
{ shmem->set_pending_owner (fr_id); }
void owner_needed () void owner_needed ()
{ {
ResetEvent (owner_found_evt); ResetEvent (owner_found_evt);
@ -1430,6 +1444,10 @@ class fhandler_fifo: public fhandler_base
{ shmem->set_shared_fc_handler_committed (n); } { shmem->set_shared_fc_handler_committed (n); }
int update_my_handlers (bool from_exec = false); int update_my_handlers (bool from_exec = false);
int update_shared_handlers (); int update_shared_handlers ();
bool shared_fc_handler_updated () const
{ return shmem->shared_fc_handler_updated (); }
void shared_fc_handler_updated (bool val)
{ shmem->shared_fc_handler_updated (val); }
public: public:
fhandler_fifo (); fhandler_fifo ();
@ -1449,6 +1467,10 @@ public:
void owner_lock () { shmem->owner_lock (); } void owner_lock () { shmem->owner_lock (); }
void owner_unlock () { shmem->owner_unlock (); } void owner_unlock () { shmem->owner_unlock (); }
void take_ownership ();
void reading_lock () { shmem->reading_lock (); }
void reading_unlock () { shmem->reading_unlock (); }
int open (int, mode_t); int open (int, mode_t);
off_t lseek (off_t offset, int whence); off_t lseek (off_t offset, int whence);
int close (); int close ();

View File

@ -74,7 +74,7 @@ static NO_COPY fifo_reader_id_t null_fr_id = { .winpid = 0, .fh = NULL };
fhandler_fifo::fhandler_fifo (): fhandler_fifo::fhandler_fifo ():
fhandler_base (), fhandler_base (),
read_ready (NULL), write_ready (NULL), writer_opening (NULL), read_ready (NULL), write_ready (NULL), writer_opening (NULL),
owner_needed_evt (NULL), owner_found_evt (NULL), owner_needed_evt (NULL), owner_found_evt (NULL), update_needed_evt (NULL),
cancel_evt (NULL), thr_sync_evt (NULL), _maybe_eof (false), cancel_evt (NULL), thr_sync_evt (NULL), _maybe_eof (false),
fc_handler (NULL), shandlers (0), nhandlers (0), fc_handler (NULL), shandlers (0), nhandlers (0),
reader (false), writer (false), duplexer (false), reader (false), writer (false), duplexer (false),
@ -436,6 +436,8 @@ fhandler_fifo::update_shared_handlers ()
} }
set_shared_nhandlers (nhandlers); set_shared_nhandlers (nhandlers);
memcpy (shared_fc_handler, fc_handler, nhandlers * sizeof (fc_handler[0])); memcpy (shared_fc_handler, fc_handler, nhandlers * sizeof (fc_handler[0]));
shared_fc_handler_updated (true);
set_prev_owner (me);
return 0; return 0;
} }
@ -456,20 +458,44 @@ fhandler_fifo::fifo_reader_thread_func ()
while (1) while (1)
{ {
fifo_reader_id_t cur_owner; fifo_reader_id_t cur_owner, pending_owner;
bool idle = false, take_ownership = false;
owner_lock (); owner_lock ();
cur_owner = get_owner (); cur_owner = get_owner ();
if (!cur_owner) pending_owner = get_pending_owner ();
if (pending_owner)
{ {
set_owner (me); if (pending_owner != me)
if (update_my_handlers () < 0) idle = true;
api_fatal ("Can't update my handlers, %E"); else
owner_found (); take_ownership = true;
owner_unlock ();
continue;
} }
else if (!cur_owner)
take_ownership = true;
else if (cur_owner != me) else if (cur_owner != me)
idle = true;
if (take_ownership)
{
if (!shared_fc_handler_updated ())
{
owner_unlock ();
yield ();
continue;
}
else
{
set_owner (me);
set_pending_owner (null_fr_id);
if (update_my_handlers () < 0)
api_fatal ("Can't update my handlers, %E");
owner_found ();
owner_unlock ();
continue;
}
}
else if (idle)
{ {
owner_unlock (); owner_unlock ();
HANDLE w[2] = { owner_needed_evt, cancel_evt }; HANDLE w[2] = { owner_needed_evt, cancel_evt };
@ -494,6 +520,7 @@ fhandler_fifo::fifo_reader_thread_func ()
/* Listen for a writer to connect to the new client handler. */ /* Listen for a writer to connect to the new client handler. */
fifo_client_handler& fc = fc_handler[nhandlers - 1]; fifo_client_handler& fc = fc_handler[nhandlers - 1];
fifo_client_unlock (); fifo_client_unlock ();
shared_fc_handler_updated (false);
owner_unlock (); owner_unlock ();
NTSTATUS status; NTSTATUS status;
IO_STATUS_BLOCK io; IO_STATUS_BLOCK io;
@ -504,8 +531,8 @@ fhandler_fifo::fifo_reader_thread_func ()
FSCTL_PIPE_LISTEN, NULL, 0, NULL, 0); FSCTL_PIPE_LISTEN, NULL, 0, NULL, 0);
if (status == STATUS_PENDING) if (status == STATUS_PENDING)
{ {
HANDLE w[2] = { conn_evt, cancel_evt }; HANDLE w[3] = { conn_evt, update_needed_evt, cancel_evt };
switch (WaitForMultipleObjects (2, w, false, INFINITE)) switch (WaitForMultipleObjects (3, w, false, INFINITE))
{ {
case WAIT_OBJECT_0: case WAIT_OBJECT_0:
status = io.Status; status = io.Status;
@ -513,6 +540,10 @@ fhandler_fifo::fifo_reader_thread_func ()
status); status);
break; break;
case WAIT_OBJECT_0 + 1: case WAIT_OBJECT_0 + 1:
status = STATUS_WAIT_1;
update = true;
break;
case WAIT_OBJECT_0 + 2:
status = STATUS_THREAD_IS_TERMINATING; status = STATUS_THREAD_IS_TERMINATING;
cancel = true; cancel = true;
update = true; update = true;
@ -538,6 +569,7 @@ fhandler_fifo::fifo_reader_thread_func ()
record_connection (fc, fc_closing); record_connection (fc, fc_closing);
break; break;
case STATUS_THREAD_IS_TERMINATING: case STATUS_THREAD_IS_TERMINATING:
case STATUS_WAIT_1:
/* Try to connect a bogus client. Otherwise fc is still /* Try to connect a bogus client. Otherwise fc is still
listening, and the next connection might not get recorded. */ listening, and the next connection might not get recorded. */
status1 = open_pipe (ph); status1 = open_pipe (ph);
@ -807,6 +839,8 @@ fhandler_fifo::open (int flags, mode_t)
if (create_shared_fc_handler () < 0) if (create_shared_fc_handler () < 0)
goto err_close_shmem; goto err_close_shmem;
inc_nreaders (); inc_nreaders ();
/* Reinitialize _sh_fc_handler_updated, which starts as 0. */
shared_fc_handler_updated (true);
npbuf[0] = 'n'; npbuf[0] = 'n';
if (!(owner_needed_evt = CreateEvent (sa_buf, true, false, npbuf))) if (!(owner_needed_evt = CreateEvent (sa_buf, true, false, npbuf)))
{ {
@ -821,9 +855,16 @@ fhandler_fifo::open (int flags, mode_t)
__seterrno (); __seterrno ();
goto err_close_owner_needed_evt; goto err_close_owner_needed_evt;
} }
npbuf[0] = 'u';
if (!(update_needed_evt = CreateEvent (sa_buf, false, false, npbuf)))
{
debug_printf ("CreateEvent for %s failed, %E", npbuf);
__seterrno ();
goto err_close_owner_found_evt;
}
/* Make cancel and sync inheritable for exec. */ /* Make cancel and sync inheritable for exec. */
if (!(cancel_evt = create_event (true))) if (!(cancel_evt = create_event (true)))
goto err_close_owner_found_evt; goto err_close_update_needed_evt;
if (!(thr_sync_evt = create_event (true))) if (!(thr_sync_evt = create_event (true)))
goto err_close_cancel_evt; goto err_close_cancel_evt;
me.winpid = GetCurrentProcessId (); me.winpid = GetCurrentProcessId ();
@ -943,6 +984,8 @@ err_close_reader:
return 0; return 0;
err_close_cancel_evt: err_close_cancel_evt:
NtClose (cancel_evt); NtClose (cancel_evt);
err_close_update_needed_evt:
NtClose (update_needed_evt);
err_close_owner_found_evt: err_close_owner_found_evt:
NtClose (owner_found_evt); NtClose (owner_found_evt);
err_close_owner_needed_evt: err_close_owner_needed_evt:
@ -1136,6 +1179,24 @@ fhandler_fifo::hit_eof ()
return ret; return ret;
} }
/* Called from raw_read and select.cc:peek_fifo. */
void
fhandler_fifo::take_ownership ()
{
owner_lock ();
if (get_owner () == me)
{
owner_unlock ();
return;
}
set_pending_owner (me);
owner_needed ();
SetEvent (update_needed_evt);
owner_unlock ();
/* The reader threads should now do the transfer. */
WaitForSingleObject (owner_found_evt, INFINITE);
}
void __reg3 void __reg3
fhandler_fifo::raw_read (void *in_ptr, size_t& len) fhandler_fifo::raw_read (void *in_ptr, size_t& len)
{ {
@ -1144,6 +1205,9 @@ fhandler_fifo::raw_read (void *in_ptr, size_t& len)
while (1) while (1)
{ {
/* No one else can take ownership while we hold the reading_lock. */
reading_lock ();
take_ownership ();
/* Poll the connected clients for input. */ /* Poll the connected clients for input. */
int nconnected = 0; int nconnected = 0;
fifo_client_lock (); fifo_client_lock ();
@ -1167,6 +1231,7 @@ fhandler_fifo::raw_read (void *in_ptr, size_t& len)
{ {
len = nbytes; len = nbytes;
fifo_client_unlock (); fifo_client_unlock ();
reading_unlock ();
return; return;
} }
break; break;
@ -1187,9 +1252,11 @@ fhandler_fifo::raw_read (void *in_ptr, size_t& len)
fifo_client_unlock (); fifo_client_unlock ();
if (maybe_eof () && hit_eof ()) if (maybe_eof () && hit_eof ())
{ {
reading_unlock ();
len = 0; len = 0;
return; return;
} }
reading_unlock ();
if (is_nonblocking ()) if (is_nonblocking ())
{ {
set_errno (EAGAIN); set_errno (EAGAIN);
@ -1327,6 +1394,8 @@ fhandler_fifo::close ()
NtClose (owner_needed_evt); NtClose (owner_needed_evt);
if (owner_found_evt) if (owner_found_evt)
NtClose (owner_found_evt); NtClose (owner_found_evt);
if (update_needed_evt)
NtClose (update_needed_evt);
if (cancel_evt) if (cancel_evt)
NtClose (cancel_evt); NtClose (cancel_evt);
if (thr_sync_evt) if (thr_sync_evt)
@ -1443,8 +1512,15 @@ fhandler_fifo::dup (fhandler_base *child, int flags)
__seterrno (); __seterrno ();
goto err_close_owner_needed_evt; goto err_close_owner_needed_evt;
} }
if (!DuplicateHandle (GetCurrentProcess (), update_needed_evt,
GetCurrentProcess (), &fhf->update_needed_evt,
0, !(flags & O_CLOEXEC), DUPLICATE_SAME_ACCESS))
{
__seterrno ();
goto err_close_owner_found_evt;
}
if (!(fhf->cancel_evt = create_event (true))) if (!(fhf->cancel_evt = create_event (true)))
goto err_close_owner_found_evt; goto err_close_update_needed_evt;
if (!(fhf->thr_sync_evt = create_event (true))) if (!(fhf->thr_sync_evt = create_event (true)))
goto err_close_cancel_evt; goto err_close_cancel_evt;
inc_nreaders (); inc_nreaders ();
@ -1454,6 +1530,8 @@ fhandler_fifo::dup (fhandler_base *child, int flags)
return 0; return 0;
err_close_cancel_evt: err_close_cancel_evt:
NtClose (fhf->cancel_evt); NtClose (fhf->cancel_evt);
err_close_update_needed_evt:
NtClose (fhf->update_needed_evt);
err_close_owner_found_evt: err_close_owner_found_evt:
NtClose (fhf->owner_found_evt); NtClose (fhf->owner_found_evt);
err_close_owner_needed_evt: err_close_owner_needed_evt:
@ -1496,6 +1574,7 @@ fhandler_fifo::fixup_after_fork (HANDLE parent)
api_fatal ("Can't reopen shared fc_handler memory during fork, %E"); api_fatal ("Can't reopen shared fc_handler memory during fork, %E");
fork_fixup (parent, owner_needed_evt, "owner_needed_evt"); fork_fixup (parent, owner_needed_evt, "owner_needed_evt");
fork_fixup (parent, owner_found_evt, "owner_found_evt"); fork_fixup (parent, owner_found_evt, "owner_found_evt");
fork_fixup (parent, update_needed_evt, "update_needed_evt");
if (close_on_exec ()) if (close_on_exec ())
/* Prevent a later attempt to close the non-inherited /* Prevent a later attempt to close the non-inherited
pipe-instance handles copied from the parent. */ pipe-instance handles copied from the parent. */
@ -1578,6 +1657,7 @@ fhandler_fifo::set_close_on_exec (bool val)
{ {
set_no_inheritance (owner_needed_evt, val); set_no_inheritance (owner_needed_evt, val);
set_no_inheritance (owner_found_evt, val); set_no_inheritance (owner_found_evt, val);
set_no_inheritance (update_needed_evt, val);
set_no_inheritance (cancel_evt, val); set_no_inheritance (cancel_evt, val);
set_no_inheritance (thr_sync_evt, val); set_no_inheritance (thr_sync_evt, val);
fifo_client_lock (); fifo_client_lock ();

View File

@ -866,6 +866,8 @@ peek_fifo (select_record *s, bool from_select)
goto out; goto out;
} }
fh->reading_lock ();
fh->take_ownership ();
fh->fifo_client_lock (); fh->fifo_client_lock ();
int nconnected = 0; int nconnected = 0;
for (int i = 0; i < fh->get_nhandlers (); i++) for (int i = 0; i < fh->get_nhandlers (); i++)
@ -888,6 +890,7 @@ peek_fifo (select_record *s, bool from_select)
fh->get_fc_handler (i).get_state () = fc_input_avail; fh->get_fc_handler (i).get_state () = fc_input_avail;
select_printf ("read: %s, ready for read", fh->get_name ()); select_printf ("read: %s, ready for read", fh->get_name ());
fh->fifo_client_unlock (); fh->fifo_client_unlock ();
fh->reading_unlock ();
gotone += s->read_ready = true; gotone += s->read_ready = true;
goto out; goto out;
default: default:
@ -905,6 +908,7 @@ peek_fifo (select_record *s, bool from_select)
if (s->except_selected) if (s->except_selected)
gotone += s->except_ready = true; gotone += s->except_ready = true;
} }
fh->reading_unlock ();
} }
out: out:
if (s->write_selected) if (s->write_selected)