Cygwin: FIFO: designate one reader as owner
Among all the open readers of a FIFO, one is declared to be the owner. This is the only reader that listens for client connections, and it is the only one that has an accurate fc_handler list. Add shared data and methods for getting and setting the owner, as well as a lock to prevent more than one reader from accessing these data simultaneously. Modify the fifo_reader_thread so that it checks the owner at the beginning of its loop. If there is no owner, it takes ownership. If there is an owner but it is a different reader, the thread just waits to be canceled. Otherwise, it listens for client connections as before. Remove the 'first' argument from create_pipe_instance. It is not needed, and it may be confusing in the future since only the owner knows whether a pipe instance is the first. When opening a reader, don't return until the fifo_reader_thread has time to set an owner. If the owner closes, indicate that there is no longer an owner. Clear the child's fc_handler list in dup, and don't bother duplicating the handles. The child never starts out as owner, so it can't use those handles. Do the same thing in fixup_after_fork in the close-on-exec case. In the non-close-on-exec case, the child inherits an fc_handler list that it can't use, but we can just leave it alone; the handles will be closed when the child is closed.
This commit is contained in:
parent
16e7c10578
commit
606baf5566
@ -1324,10 +1324,17 @@ struct fifo_reader_id_t
|
|||||||
class fifo_shmem_t
|
class fifo_shmem_t
|
||||||
{
|
{
|
||||||
LONG _nreaders;
|
LONG _nreaders;
|
||||||
|
fifo_reader_id_t _owner;
|
||||||
|
af_unix_spinlock_t _owner_lock;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
int inc_nreaders () { return (int) InterlockedIncrement (&_nreaders); }
|
int inc_nreaders () { return (int) InterlockedIncrement (&_nreaders); }
|
||||||
int dec_nreaders () { return (int) InterlockedDecrement (&_nreaders); }
|
int dec_nreaders () { return (int) InterlockedDecrement (&_nreaders); }
|
||||||
|
|
||||||
|
fifo_reader_id_t get_owner () const { return _owner; }
|
||||||
|
void set_owner (fifo_reader_id_t fr_id) { _owner = fr_id; }
|
||||||
|
void owner_lock () { _owner_lock.lock (); }
|
||||||
|
void owner_unlock () { _owner_lock.unlock (); }
|
||||||
};
|
};
|
||||||
|
|
||||||
class fhandler_fifo: public fhandler_base
|
class fhandler_fifo: public fhandler_base
|
||||||
@ -1356,7 +1363,7 @@ class fhandler_fifo: public fhandler_base
|
|||||||
|
|
||||||
bool __reg2 wait (HANDLE);
|
bool __reg2 wait (HANDLE);
|
||||||
static NTSTATUS npfs_handle (HANDLE &);
|
static NTSTATUS npfs_handle (HANDLE &);
|
||||||
HANDLE create_pipe_instance (bool);
|
HANDLE create_pipe_instance ();
|
||||||
NTSTATUS open_pipe (HANDLE&);
|
NTSTATUS open_pipe (HANDLE&);
|
||||||
NTSTATUS wait_open_pipe (HANDLE&);
|
NTSTATUS wait_open_pipe (HANDLE&);
|
||||||
int add_client_handler ();
|
int add_client_handler ();
|
||||||
@ -1384,6 +1391,10 @@ public:
|
|||||||
void fifo_client_unlock () { _fifo_client_lock.unlock (); }
|
void fifo_client_unlock () { _fifo_client_lock.unlock (); }
|
||||||
|
|
||||||
fifo_reader_id_t get_me () const { return me; }
|
fifo_reader_id_t get_me () const { return me; }
|
||||||
|
fifo_reader_id_t get_owner () const { return shmem->get_owner (); }
|
||||||
|
void set_owner (fifo_reader_id_t fr_id) { shmem->set_owner (fr_id); }
|
||||||
|
void owner_lock () { shmem->owner_lock (); }
|
||||||
|
void owner_unlock () { shmem->owner_unlock (); }
|
||||||
|
|
||||||
int open (int, mode_t);
|
int open (int, mode_t);
|
||||||
off_t lseek (off_t offset, int whence);
|
off_t lseek (off_t offset, int whence);
|
||||||
|
@ -164,7 +164,7 @@ fhandler_fifo::npfs_handle (HANDLE &nph)
|
|||||||
blocking mode so that we can easily wait for a connection. After
|
blocking mode so that we can easily wait for a connection. After
|
||||||
it is connected, it is put in nonblocking mode. */
|
it is connected, it is put in nonblocking mode. */
|
||||||
HANDLE
|
HANDLE
|
||||||
fhandler_fifo::create_pipe_instance (bool first)
|
fhandler_fifo::create_pipe_instance ()
|
||||||
{
|
{
|
||||||
NTSTATUS status;
|
NTSTATUS status;
|
||||||
HANDLE npfsh;
|
HANDLE npfsh;
|
||||||
@ -187,14 +187,12 @@ fhandler_fifo::create_pipe_instance (bool first)
|
|||||||
access = GENERIC_READ | FILE_READ_ATTRIBUTES | FILE_WRITE_ATTRIBUTES
|
access = GENERIC_READ | FILE_READ_ATTRIBUTES | FILE_WRITE_ATTRIBUTES
|
||||||
| SYNCHRONIZE;
|
| SYNCHRONIZE;
|
||||||
sharing = FILE_SHARE_READ | FILE_SHARE_WRITE;
|
sharing = FILE_SHARE_READ | FILE_SHARE_WRITE;
|
||||||
hattr = openflags & O_CLOEXEC ? 0 : OBJ_INHERIT;
|
hattr = (openflags & O_CLOEXEC ? 0 : OBJ_INHERIT) | OBJ_CASE_INSENSITIVE;
|
||||||
if (first)
|
|
||||||
hattr |= OBJ_CASE_INSENSITIVE;
|
|
||||||
InitializeObjectAttributes (&attr, get_pipe_name (),
|
InitializeObjectAttributes (&attr, get_pipe_name (),
|
||||||
hattr, npfsh, NULL);
|
hattr, npfsh, NULL);
|
||||||
timeout.QuadPart = -500000;
|
timeout.QuadPart = -500000;
|
||||||
status = NtCreateNamedPipeFile (&ph, access, &attr, &io, sharing,
|
status = NtCreateNamedPipeFile (&ph, access, &attr, &io, sharing,
|
||||||
first ? FILE_CREATE : FILE_OPEN, 0,
|
FILE_OPEN_IF, 0,
|
||||||
FILE_PIPE_MESSAGE_TYPE
|
FILE_PIPE_MESSAGE_TYPE
|
||||||
| FILE_PIPE_REJECT_REMOTE_CLIENTS,
|
| FILE_PIPE_REJECT_REMOTE_CLIENTS,
|
||||||
FILE_PIPE_MESSAGE_MODE,
|
FILE_PIPE_MESSAGE_MODE,
|
||||||
@ -292,14 +290,13 @@ fhandler_fifo::add_client_handler ()
|
|||||||
int ret = -1;
|
int ret = -1;
|
||||||
fifo_client_handler fc;
|
fifo_client_handler fc;
|
||||||
HANDLE ph = NULL;
|
HANDLE ph = NULL;
|
||||||
bool first = (nhandlers == 0);
|
|
||||||
|
|
||||||
if (nhandlers == MAX_CLIENTS)
|
if (nhandlers == MAX_CLIENTS)
|
||||||
{
|
{
|
||||||
set_errno (EMFILE);
|
set_errno (EMFILE);
|
||||||
goto out;
|
goto out;
|
||||||
}
|
}
|
||||||
ph = create_pipe_instance (first);
|
ph = create_pipe_instance ();
|
||||||
if (!ph)
|
if (!ph)
|
||||||
goto out;
|
goto out;
|
||||||
else
|
else
|
||||||
@ -349,92 +346,120 @@ fhandler_fifo::fifo_reader_thread_func ()
|
|||||||
|
|
||||||
while (1)
|
while (1)
|
||||||
{
|
{
|
||||||
/* Cleanup the fc_handler list. */
|
fifo_reader_id_t cur_owner;
|
||||||
fifo_client_lock ();
|
|
||||||
int i = 0;
|
owner_lock ();
|
||||||
while (i < nhandlers)
|
cur_owner = get_owner ();
|
||||||
|
if (!cur_owner)
|
||||||
{
|
{
|
||||||
if (fc_handler[i].state < fc_connected)
|
set_owner (me);
|
||||||
delete_client_handler (i);
|
owner_unlock ();
|
||||||
else
|
continue;
|
||||||
i++;
|
|
||||||
}
|
}
|
||||||
|
else if (cur_owner != me)
|
||||||
/* Create a new client handler. */
|
|
||||||
if (add_client_handler () < 0)
|
|
||||||
api_fatal ("Can't add a client handler, %E");
|
|
||||||
|
|
||||||
/* Listen for a writer to connect to the new client handler. */
|
|
||||||
fifo_client_handler& fc = fc_handler[nhandlers - 1];
|
|
||||||
fifo_client_unlock ();
|
|
||||||
NTSTATUS status;
|
|
||||||
IO_STATUS_BLOCK io;
|
|
||||||
bool cancel = false;
|
|
||||||
|
|
||||||
status = NtFsControlFile (fc.h, conn_evt, NULL, NULL, &io,
|
|
||||||
FSCTL_PIPE_LISTEN, NULL, 0, NULL, 0);
|
|
||||||
if (status == STATUS_PENDING)
|
|
||||||
{
|
{
|
||||||
HANDLE w[2] = { conn_evt, cancel_evt };
|
owner_unlock ();
|
||||||
switch (WaitForMultipleObjects (2, w, false, INFINITE))
|
WaitForSingleObject (cancel_evt, INFINITE);
|
||||||
|
goto canceled;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
/* I'm the owner */
|
||||||
|
fifo_client_lock ();
|
||||||
|
|
||||||
|
/* Cleanup the fc_handler list. */
|
||||||
|
fifo_client_lock ();
|
||||||
|
int i = 0;
|
||||||
|
while (i < nhandlers)
|
||||||
{
|
{
|
||||||
case WAIT_OBJECT_0:
|
if (fc_handler[i].state < fc_connected)
|
||||||
status = io.Status;
|
delete_client_handler (i);
|
||||||
|
else
|
||||||
|
i++;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Create a new client handler. */
|
||||||
|
if (add_client_handler () < 0)
|
||||||
|
api_fatal ("Can't add a client handler, %E");
|
||||||
|
|
||||||
|
/* Listen for a writer to connect to the new client handler. */
|
||||||
|
fifo_client_handler& fc = fc_handler[nhandlers - 1];
|
||||||
|
fifo_client_unlock ();
|
||||||
|
owner_unlock ();
|
||||||
|
NTSTATUS status;
|
||||||
|
IO_STATUS_BLOCK io;
|
||||||
|
bool cancel = false;
|
||||||
|
|
||||||
|
status = NtFsControlFile (fc.h, conn_evt, NULL, NULL, &io,
|
||||||
|
FSCTL_PIPE_LISTEN, NULL, 0, NULL, 0);
|
||||||
|
if (status == STATUS_PENDING)
|
||||||
|
{
|
||||||
|
HANDLE w[2] = { conn_evt, cancel_evt };
|
||||||
|
switch (WaitForMultipleObjects (2, w, false, INFINITE))
|
||||||
|
{
|
||||||
|
case WAIT_OBJECT_0:
|
||||||
|
status = io.Status;
|
||||||
|
debug_printf ("NtFsControlFile STATUS_PENDING, then %y",
|
||||||
|
status);
|
||||||
|
break;
|
||||||
|
case WAIT_OBJECT_0 + 1:
|
||||||
|
status = STATUS_THREAD_IS_TERMINATING;
|
||||||
|
cancel = true;
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
api_fatal ("WFMO failed, %E");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
debug_printf ("NtFsControlFile status %y, no STATUS_PENDING",
|
||||||
|
status);
|
||||||
|
HANDLE ph = NULL;
|
||||||
|
NTSTATUS status1;
|
||||||
|
|
||||||
|
fifo_client_lock ();
|
||||||
|
switch (status)
|
||||||
|
{
|
||||||
|
case STATUS_SUCCESS:
|
||||||
|
case STATUS_PIPE_CONNECTED:
|
||||||
|
record_connection (fc);
|
||||||
break;
|
break;
|
||||||
case WAIT_OBJECT_0 + 1:
|
case STATUS_PIPE_CLOSING:
|
||||||
status = STATUS_THREAD_IS_TERMINATING;
|
record_connection (fc, fc_closing);
|
||||||
cancel = true;
|
break;
|
||||||
|
case STATUS_THREAD_IS_TERMINATING:
|
||||||
|
/* Try to connect a bogus client. Otherwise fc is still
|
||||||
|
listening, and the next connection might not get recorded. */
|
||||||
|
status1 = open_pipe (ph);
|
||||||
|
WaitForSingleObject (conn_evt, INFINITE);
|
||||||
|
if (NT_SUCCESS (status1))
|
||||||
|
/* Bogus cilent connected. */
|
||||||
|
delete_client_handler (nhandlers - 1);
|
||||||
|
else
|
||||||
|
/* Did a real client connect? */
|
||||||
|
switch (io.Status)
|
||||||
|
{
|
||||||
|
case STATUS_SUCCESS:
|
||||||
|
case STATUS_PIPE_CONNECTED:
|
||||||
|
record_connection (fc);
|
||||||
|
break;
|
||||||
|
case STATUS_PIPE_CLOSING:
|
||||||
|
record_connection (fc, fc_closing);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
debug_printf ("NtFsControlFile status %y after failing to connect bogus client or real client", io.Status);
|
||||||
|
fc.state = fc_unknown;
|
||||||
|
break;
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
api_fatal ("WFMO failed, %E");
|
break;
|
||||||
}
|
}
|
||||||
|
fifo_client_unlock ();
|
||||||
|
if (ph)
|
||||||
|
NtClose (ph);
|
||||||
|
if (cancel)
|
||||||
|
goto canceled;
|
||||||
}
|
}
|
||||||
HANDLE ph = NULL;
|
|
||||||
NTSTATUS status1;
|
|
||||||
|
|
||||||
fifo_client_lock ();
|
|
||||||
switch (status)
|
|
||||||
{
|
|
||||||
case STATUS_SUCCESS:
|
|
||||||
case STATUS_PIPE_CONNECTED:
|
|
||||||
record_connection (fc);
|
|
||||||
break;
|
|
||||||
case STATUS_PIPE_CLOSING:
|
|
||||||
record_connection (fc, fc_closing);
|
|
||||||
break;
|
|
||||||
case STATUS_THREAD_IS_TERMINATING:
|
|
||||||
/* Try to connect a bogus client. Otherwise fc is still
|
|
||||||
listening, and the next connection might not get recorded. */
|
|
||||||
status1 = open_pipe (ph);
|
|
||||||
WaitForSingleObject (conn_evt, INFINITE);
|
|
||||||
if (NT_SUCCESS (status1))
|
|
||||||
/* Bogus cilent connected. */
|
|
||||||
delete_client_handler (nhandlers - 1);
|
|
||||||
else
|
|
||||||
/* Did a real client connect? */
|
|
||||||
switch (io.Status)
|
|
||||||
{
|
|
||||||
case STATUS_SUCCESS:
|
|
||||||
case STATUS_PIPE_CONNECTED:
|
|
||||||
record_connection (fc);
|
|
||||||
break;
|
|
||||||
case STATUS_PIPE_CLOSING:
|
|
||||||
record_connection (fc, fc_closing);
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
debug_printf ("NtFsControlFile status %y after failing to connect bogus client or real client", io.Status);
|
|
||||||
fc.state = fc_unknown;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
fifo_client_unlock ();
|
|
||||||
if (ph)
|
|
||||||
NtClose (ph);
|
|
||||||
if (cancel)
|
|
||||||
goto canceled;
|
|
||||||
}
|
}
|
||||||
canceled:
|
canceled:
|
||||||
if (conn_evt)
|
if (conn_evt)
|
||||||
@ -580,6 +605,15 @@ fhandler_fifo::open (int flags, mode_t)
|
|||||||
me.winpid = GetCurrentProcessId ();
|
me.winpid = GetCurrentProcessId ();
|
||||||
me.fh = this;
|
me.fh = this;
|
||||||
new cygthread (fifo_reader_thread, this, "fifo_reader", thr_sync_evt);
|
new cygthread (fifo_reader_thread, this, "fifo_reader", thr_sync_evt);
|
||||||
|
/* Wait until there's an owner. */
|
||||||
|
owner_lock ();
|
||||||
|
while (!get_owner ())
|
||||||
|
{
|
||||||
|
owner_unlock ();
|
||||||
|
yield ();
|
||||||
|
owner_lock ();
|
||||||
|
}
|
||||||
|
owner_unlock ();
|
||||||
|
|
||||||
/* If we're a duplexer, we need a handle for writing. */
|
/* If we're a duplexer, we need a handle for writing. */
|
||||||
if (duplexer)
|
if (duplexer)
|
||||||
@ -1014,6 +1048,10 @@ fhandler_fifo::close ()
|
|||||||
if (dec_nreaders () == 0)
|
if (dec_nreaders () == 0)
|
||||||
ResetEvent (read_ready);
|
ResetEvent (read_ready);
|
||||||
cancel_reader_thread ();
|
cancel_reader_thread ();
|
||||||
|
owner_lock ();
|
||||||
|
if (get_owner () == me)
|
||||||
|
set_owner (null_fr_id);
|
||||||
|
owner_unlock ();
|
||||||
if (cancel_evt)
|
if (cancel_evt)
|
||||||
NtClose (cancel_evt);
|
NtClose (cancel_evt);
|
||||||
if (thr_sync_evt)
|
if (thr_sync_evt)
|
||||||
@ -1056,7 +1094,6 @@ fhandler_fifo::fcntl (int cmd, intptr_t arg)
|
|||||||
int
|
int
|
||||||
fhandler_fifo::dup (fhandler_base *child, int flags)
|
fhandler_fifo::dup (fhandler_base *child, int flags)
|
||||||
{
|
{
|
||||||
int i = 0;
|
|
||||||
fhandler_fifo *fhf = NULL;
|
fhandler_fifo *fhf = NULL;
|
||||||
|
|
||||||
if (get_flags () & O_PATH)
|
if (get_flags () & O_PATH)
|
||||||
@ -1092,6 +1129,9 @@ fhandler_fifo::dup (fhandler_base *child, int flags)
|
|||||||
/* Make sure the child starts unlocked. */
|
/* Make sure the child starts unlocked. */
|
||||||
fhf->fifo_client_unlock ();
|
fhf->fifo_client_unlock ();
|
||||||
|
|
||||||
|
/* Clear fc_handler list; the child never starts as owner. */
|
||||||
|
fhf->nhandlers = 0;
|
||||||
|
|
||||||
if (!DuplicateHandle (GetCurrentProcess (), shmem_handle,
|
if (!DuplicateHandle (GetCurrentProcess (), shmem_handle,
|
||||||
GetCurrentProcess (), &fhf->shmem_handle,
|
GetCurrentProcess (), &fhf->shmem_handle,
|
||||||
0, !(flags & O_CLOEXEC), DUPLICATE_SAME_ACCESS))
|
0, !(flags & O_CLOEXEC), DUPLICATE_SAME_ACCESS))
|
||||||
@ -1101,25 +1141,8 @@ fhandler_fifo::dup (fhandler_base *child, int flags)
|
|||||||
}
|
}
|
||||||
if (fhf->reopen_shmem () < 0)
|
if (fhf->reopen_shmem () < 0)
|
||||||
goto err_close_shmem_handle;
|
goto err_close_shmem_handle;
|
||||||
fifo_client_lock ();
|
|
||||||
for (i = 0; i < nhandlers; i++)
|
|
||||||
{
|
|
||||||
if (!DuplicateHandle (GetCurrentProcess (), fc_handler[i].h,
|
|
||||||
GetCurrentProcess (), &fhf->fc_handler[i].h,
|
|
||||||
0, !(flags & O_CLOEXEC), DUPLICATE_SAME_ACCESS))
|
|
||||||
{
|
|
||||||
__seterrno ();
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (i < nhandlers)
|
|
||||||
{
|
|
||||||
fifo_client_unlock ();
|
|
||||||
goto err_close_handlers;
|
|
||||||
}
|
|
||||||
fifo_client_unlock ();
|
|
||||||
if (!(fhf->cancel_evt = create_event ()))
|
if (!(fhf->cancel_evt = create_event ()))
|
||||||
goto err_close_handlers;
|
goto err_close_shmem;
|
||||||
if (!(fhf->thr_sync_evt = create_event ()))
|
if (!(fhf->thr_sync_evt = create_event ()))
|
||||||
goto err_close_cancel_evt;
|
goto err_close_cancel_evt;
|
||||||
inc_nreaders ();
|
inc_nreaders ();
|
||||||
@ -1129,9 +1152,7 @@ fhandler_fifo::dup (fhandler_base *child, int flags)
|
|||||||
return 0;
|
return 0;
|
||||||
err_close_cancel_evt:
|
err_close_cancel_evt:
|
||||||
NtClose (fhf->cancel_evt);
|
NtClose (fhf->cancel_evt);
|
||||||
err_close_handlers:
|
err_close_shmem:
|
||||||
for (int j = 0; j < i; j++)
|
|
||||||
fhf->fc_handler[j].close ();
|
|
||||||
NtUnmapViewOfSection (GetCurrentProcess (), fhf->shmem);
|
NtUnmapViewOfSection (GetCurrentProcess (), fhf->shmem);
|
||||||
err_close_shmem_handle:
|
err_close_shmem_handle:
|
||||||
NtClose (fhf->shmem_handle);
|
NtClose (fhf->shmem_handle);
|
||||||
@ -1160,10 +1181,10 @@ fhandler_fifo::fixup_after_fork (HANDLE parent)
|
|||||||
fork_fixup (parent, shmem_handle, "shmem_handle");
|
fork_fixup (parent, shmem_handle, "shmem_handle");
|
||||||
if (reopen_shmem () < 0)
|
if (reopen_shmem () < 0)
|
||||||
api_fatal ("Can't reopen shared memory during fork, %E");
|
api_fatal ("Can't reopen shared memory during fork, %E");
|
||||||
fifo_client_lock ();
|
if (close_on_exec ())
|
||||||
for (int i = 0; i < nhandlers; i++)
|
/* Prevent a later attempt to close the non-inherited
|
||||||
fork_fixup (parent, fc_handler[i].h, "fc_handler[].h");
|
pipe-instance handles copied from the parent. */
|
||||||
fifo_client_unlock ();
|
nhandlers = 0;
|
||||||
if (!(cancel_evt = create_event ()))
|
if (!(cancel_evt = create_event ()))
|
||||||
api_fatal ("Can't create reader thread cancel event during fork, %E");
|
api_fatal ("Can't create reader thread cancel event during fork, %E");
|
||||||
if (!(thr_sync_evt = create_event ()))
|
if (!(thr_sync_evt = create_event ()))
|
||||||
|
Loading…
x
Reference in New Issue
Block a user