Cygwin: FIFO: use a cygthread instead of a homemade thread

This will simplify future work.

Rename the thread from "listen_client_thread" to "fifo_reader_thread"
because it will be used for more than just listening.

Remove the fixup_before stuff, which won't be needed after future
changes to fixup_after_fork and fixup_after_exec.
This commit is contained in:
Ken Brown
2020-03-26 14:29:50 -04:00
parent 9ee8fdf2b3
commit 71726ba70b
2 changed files with 65 additions and 125 deletions

View File

@@ -1307,9 +1307,9 @@ class fhandler_fifo: public fhandler_base
HANDLE write_ready; /* A writer is open; OK for a reader to open. */ HANDLE write_ready; /* A writer is open; OK for a reader to open. */
HANDLE writer_opening; /* A writer is opening; no EOF. */ HANDLE writer_opening; /* A writer is opening; no EOF. */
/* Non-shared handles needed for the listen_client_thread. */ /* Handles to non-shared events needed for fifo_reader_threads. */
HANDLE listen_client_thr; HANDLE cancel_evt; /* Signal thread to terminate. */
HANDLE lct_termination_evt; HANDLE thr_sync_evt; /* The thread has terminated. */
UNICODE_STRING pipe_name; UNICODE_STRING pipe_name;
WCHAR pipe_name_buf[CYGWIN_FIFO_PIPE_NAME_LEN + 1]; WCHAR pipe_name_buf[CYGWIN_FIFO_PIPE_NAME_LEN + 1];
@@ -1326,11 +1326,10 @@ class fhandler_fifo: public fhandler_base
NTSTATUS wait_open_pipe (HANDLE&); NTSTATUS wait_open_pipe (HANDLE&);
int add_client_handler (); int add_client_handler ();
void delete_client_handler (int); void delete_client_handler (int);
bool listen_client (); void cancel_reader_thread ();
void stop_listen_client ();
int check_listen_client_thread ();
void record_connection (fifo_client_handler&, void record_connection (fifo_client_handler&,
fifo_client_connect_state = fc_connected); fifo_client_connect_state = fc_connected);
public: public:
fhandler_fifo (); fhandler_fifo ();
bool hit_eof (); bool hit_eof ();
@@ -1339,7 +1338,7 @@ public:
int get_nhandlers () const { return nhandlers; } int get_nhandlers () const { return nhandlers; }
fifo_client_handler get_fc_handler (int i) const { return fc_handler[i]; } fifo_client_handler get_fc_handler (int i) const { return fc_handler[i]; }
PUNICODE_STRING get_pipe_name (); PUNICODE_STRING get_pipe_name ();
DWORD listen_client_thread (); DWORD fifo_reader_thread_func ();
void fifo_client_lock () { _fifo_client_lock.lock (); } void fifo_client_lock () { _fifo_client_lock.lock (); }
void fifo_client_unlock () { _fifo_client_lock.unlock (); } void fifo_client_unlock () { _fifo_client_lock.unlock (); }
int open (int, mode_t); int open (int, mode_t);
@@ -1351,9 +1350,6 @@ public:
void set_close_on_exec (bool val); void set_close_on_exec (bool val);
void __reg3 raw_read (void *ptr, size_t& ulen); void __reg3 raw_read (void *ptr, size_t& ulen);
ssize_t __reg3 raw_write (const void *ptr, size_t ulen); ssize_t __reg3 raw_write (const void *ptr, size_t ulen);
bool need_fixup_before () const { return reader; }
int fixup_before_fork_exec (DWORD) { stop_listen_client (); return 0; }
void init_fixup_before ();
void fixup_after_fork (HANDLE); void fixup_after_fork (HANDLE);
void fixup_after_exec (); void fixup_after_exec ();
int __reg2 fstatvfs (struct statvfs *buf); int __reg2 fstatvfs (struct statvfs *buf);
@@ -1375,7 +1371,6 @@ public:
void *ptr = (void *) ccalloc (malloc_type, 1, sizeof (fhandler_fifo)); void *ptr = (void *) ccalloc (malloc_type, 1, sizeof (fhandler_fifo));
fhandler_fifo *fhf = new (ptr) fhandler_fifo (ptr); fhandler_fifo *fhf = new (ptr) fhandler_fifo (ptr);
/* We don't want our client list to change any more. */ /* We don't want our client list to change any more. */
stop_listen_client ();
copyto (fhf); copyto (fhf);
/* fhf->pipe_name_buf is a *copy* of this->pipe_name_buf, but /* fhf->pipe_name_buf is a *copy* of this->pipe_name_buf, but
fhf->pipe_name.Buffer == this->pipe_name_buf. */ fhf->pipe_name.Buffer == this->pipe_name_buf. */

View File

@@ -32,11 +32,11 @@
When a FIFO is opened for reading, When a FIFO is opened for reading,
fhandler_fifo::create_pipe_instance is called to create the first fhandler_fifo::create_pipe_instance is called to create the first
instance of a Windows named pipe server (Windows terminology). A instance of a Windows named pipe server (Windows terminology). A
"listen_client" thread is also started; it waits for pipe clients "fifo_reader" thread is also started; it waits for pipe clients
(Windows terminology again) to connect. This happens every time (Windows terminology again) to connect. This happens every time
a process opens the FIFO for writing. a process opens the FIFO for writing.
The listen_client thread creates new instances of the pipe server The fifo_reader thread creates new instances of the pipe server
as needed, so that there is always an instance available for a as needed, so that there is always an instance available for a
writer to connect to. writer to connect to.
@@ -68,7 +68,7 @@ STATUS_PIPE_EMPTY simply means there's no data to be read. */
fhandler_fifo::fhandler_fifo (): fhandler_fifo::fhandler_fifo ():
fhandler_base (), fhandler_base (),
read_ready (NULL), write_ready (NULL), writer_opening (NULL), read_ready (NULL), write_ready (NULL), writer_opening (NULL),
listen_client_thr (NULL), lct_termination_evt (NULL), _maybe_eof (false), nhandlers (0), cancel_evt (NULL), thr_sync_evt (NULL), _maybe_eof (false), nhandlers (0),
reader (false), writer (false), duplexer (false), reader (false), writer (false), duplexer (false),
max_atomic_write (DEFAULT_PIPEBUFSIZE) max_atomic_write (DEFAULT_PIPEBUFSIZE)
{ {
@@ -319,34 +319,6 @@ fhandler_fifo::delete_client_handler (int i)
(nhandlers - i) * sizeof (fc_handler[i])); (nhandlers - i) * sizeof (fc_handler[i]));
} }
/* Just hop to the listen_client_thread method. */
DWORD WINAPI
listen_client_func (LPVOID param)
{
fhandler_fifo *fh = (fhandler_fifo *) param;
return fh->listen_client_thread ();
}
/* Start a thread that listens for client connections. */
bool
fhandler_fifo::listen_client ()
{
if (!(lct_termination_evt = create_event ()))
return false;
listen_client_thr = CreateThread (NULL, PREFERRED_IO_BLKSIZE,
listen_client_func, (PVOID) this, 0, NULL);
if (!listen_client_thr)
{
__seterrno ();
HANDLE evt = InterlockedExchangePointer (&lct_termination_evt, NULL);
if (evt)
NtClose (evt);
return false;
}
return true;
}
void void
fhandler_fifo::record_connection (fifo_client_handler& fc, fhandler_fifo::record_connection (fifo_client_handler& fc,
fifo_client_connect_state s) fifo_client_connect_state s)
@@ -357,8 +329,15 @@ fhandler_fifo::record_connection (fifo_client_handler& fc,
set_pipe_non_blocking (fc.h, true); set_pipe_non_blocking (fc.h, true);
} }
static DWORD WINAPI
fifo_reader_thread (LPVOID param)
{
fhandler_fifo *fh = (fhandler_fifo *) param;
return fh->fifo_reader_thread_func ();
}
DWORD DWORD
fhandler_fifo::listen_client_thread () fhandler_fifo::fifo_reader_thread_func ()
{ {
HANDLE conn_evt; HANDLE conn_evt;
@@ -377,7 +356,6 @@ fhandler_fifo::listen_client_thread ()
else else
i++; i++;
} }
fifo_client_unlock ();
/* Create a new client handler. */ /* Create a new client handler. */
if (add_client_handler () < 0) if (add_client_handler () < 0)
@@ -385,6 +363,7 @@ fhandler_fifo::listen_client_thread ()
/* Listen for a writer to connect to the new client handler. */ /* Listen for a writer to connect to the new client handler. */
fifo_client_handler& fc = fc_handler[nhandlers - 1]; fifo_client_handler& fc = fc_handler[nhandlers - 1];
fifo_client_unlock ();
NTSTATUS status; NTSTATUS status;
IO_STATUS_BLOCK io; IO_STATUS_BLOCK io;
bool cancel = false; bool cancel = false;
@@ -393,9 +372,8 @@ fhandler_fifo::listen_client_thread ()
FSCTL_PIPE_LISTEN, NULL, 0, NULL, 0); FSCTL_PIPE_LISTEN, NULL, 0, NULL, 0);
if (status == STATUS_PENDING) if (status == STATUS_PENDING)
{ {
HANDLE w[2] = { conn_evt, lct_termination_evt }; HANDLE w[2] = { conn_evt, cancel_evt };
DWORD waitret = WaitForMultipleObjects (2, w, false, INFINITE); switch (WaitForMultipleObjects (2, w, false, INFINITE))
switch (waitret)
{ {
case WAIT_OBJECT_0: case WAIT_OBJECT_0:
status = io.Status; status = io.Status;
@@ -453,11 +431,13 @@ fhandler_fifo::listen_client_thread ()
if (ph) if (ph)
NtClose (ph); NtClose (ph);
if (cancel) if (cancel)
goto out; goto canceled;
} }
out: canceled:
if (conn_evt) if (conn_evt)
NtClose (conn_evt); NtClose (conn_evt);
/* automatically return the cygthread to the cygthread pool */
_my_tls._ctinfo->auto_release ();
return 0; return 0;
} }
@@ -521,16 +501,15 @@ fhandler_fifo::open (int flags, mode_t)
goto err_close_write_ready; goto err_close_write_ready;
} }
/* If we're reading, signal read_ready and start the listen_client /* If we're reading, signal read_ready and start the fifo_reader_thread. */
thread. */
if (reader) if (reader)
{ {
if (!listen_client ())
{
debug_printf ("create of listen_client thread failed");
goto err_close_writer_opening;
}
SetEvent (read_ready); SetEvent (read_ready);
if (!(cancel_evt = create_event ()))
goto err_close_writer_opening;
if (!(thr_sync_evt = create_event ()))
goto err_close_cancel_evt;
new cygthread (fifo_reader_thread, this, "fifo_reader", thr_sync_evt);
/* If we're a duplexer, we need a handle for writing. */ /* If we're a duplexer, we need a handle for writing. */
if (duplexer) if (duplexer)
@@ -563,7 +542,6 @@ fhandler_fifo::open (int flags, mode_t)
/* Not a duplexer; wait for a writer to connect. */ /* Not a duplexer; wait for a writer to connect. */
else if (!wait (write_ready)) else if (!wait (write_ready))
goto err_close_reader; goto err_close_reader;
init_fixup_before ();
goto success; goto success;
} }
@@ -635,6 +613,8 @@ err_close_reader:
close (); close ();
set_errno (saved_errno); set_errno (saved_errno);
return 0; return 0;
err_close_cancel_evt:
NtClose (cancel_evt);
err_close_writer_opening: err_close_writer_opening:
NtClose (writer_opening); NtClose (writer_opening);
err_close_write_ready: err_close_write_ready:
@@ -815,43 +795,9 @@ fhandler_fifo::hit_eof ()
return ret; return ret;
} }
/* Is the lct running? */
int
fhandler_fifo::check_listen_client_thread ()
{
int ret = 0;
if (listen_client_thr)
{
DWORD waitret = WaitForSingleObject (listen_client_thr, 0);
switch (waitret)
{
case WAIT_OBJECT_0:
NtClose (listen_client_thr);
break;
case WAIT_TIMEOUT:
ret = 1;
break;
default:
debug_printf ("WaitForSingleObject failed, %E");
ret = -1;
__seterrno ();
NtClose (listen_client_thr);
break;
}
}
return ret;
}
void __reg3 void __reg3
fhandler_fifo::raw_read (void *in_ptr, size_t& len) fhandler_fifo::raw_read (void *in_ptr, size_t& len)
{ {
/* Make sure the lct is running. */
int res = check_listen_client_thread ();
debug_printf ("lct status %d", res);
if (res < 0 || (res == 0 && !listen_client ()))
goto errout;
if (!len) if (!len)
return; return;
@@ -976,35 +922,29 @@ fifo_client_handler::pipe_state ()
} }
void void
fhandler_fifo::stop_listen_client () fhandler_fifo::cancel_reader_thread ()
{ {
HANDLE thr, evt; if (cancel_evt)
SetEvent (cancel_evt);
thr = InterlockedExchangePointer (&listen_client_thr, NULL); if (thr_sync_evt)
if (thr) WaitForSingleObject (thr_sync_evt, INFINITE);
{
if (lct_termination_evt)
SetEvent (lct_termination_evt);
WaitForSingleObject (thr, INFINITE);
NtClose (thr);
}
evt = InterlockedExchangePointer (&lct_termination_evt, NULL);
if (evt)
NtClose (evt);
} }
int int
fhandler_fifo::close () fhandler_fifo::close ()
{ {
/* Avoid deadlock with lct in case this is called from a signal
handler or another thread. */
fifo_client_unlock ();
stop_listen_client ();
if (reader) if (reader)
{
cancel_reader_thread ();
if (cancel_evt)
NtClose (cancel_evt);
if (thr_sync_evt)
NtClose (thr_sync_evt);
/* FIXME: There could be several readers open because of /* FIXME: There could be several readers open because of
dup/fork/exec; we should only reset read_ready when the last dup/fork/exec; we should only reset read_ready when the last
one closes. */ one closes. */
ResetEvent (read_ready); ResetEvent (read_ready);
}
if (read_ready) if (read_ready)
NtClose (read_ready); NtClose (read_ready);
if (write_ready) if (write_ready)
@@ -1091,11 +1031,16 @@ fhandler_fifo::dup (fhandler_base *child, int flags)
goto err_close_handlers; goto err_close_handlers;
} }
fifo_client_unlock (); fifo_client_unlock ();
if (!fhf->listen_client ()) if (!(fhf->cancel_evt = create_event ()))
goto err_close_handlers; goto err_close_handlers;
fhf->init_fixup_before (); if (!(fhf->thr_sync_evt = create_event ()))
goto err_close_cancel_evt;
new cygthread (fifo_reader_thread, fhf, "fifo_reader",
fhf->thr_sync_evt);
} }
return 0; return 0;
err_close_cancel_evt:
NtClose (fhf->cancel_evt);
err_close_handlers: err_close_handlers:
for (int j = 0; j < i; j++) for (int j = 0; j < i; j++)
fhf->fc_handler[j].close (); fhf->fc_handler[j].close ();
@@ -1109,12 +1054,6 @@ err:
return -1; return -1;
} }
void
fhandler_fifo::init_fixup_before ()
{
cygheap->fdtab.inc_need_fixup_before ();
}
void void
fhandler_fifo::fixup_after_fork (HANDLE parent) fhandler_fifo::fixup_after_fork (HANDLE parent)
{ {
@@ -1131,8 +1070,11 @@ fhandler_fifo::fixup_after_fork (HANDLE parent)
for (int i = 0; i < nhandlers; i++) for (int i = 0; i < nhandlers; i++)
fork_fixup (parent, fc_handler[i].h, "fc_handler[].h"); fork_fixup (parent, fc_handler[i].h, "fc_handler[].h");
fifo_client_unlock (); fifo_client_unlock ();
if (!listen_client ()) if (!(cancel_evt = create_event ()))
debug_printf ("failed to start lct, %E"); api_fatal ("Can't create reader thread cancel event during fork, %E");
if (!(thr_sync_evt = create_event ()))
api_fatal ("Can't create reader thread sync event during fork, %E");
new cygthread (fifo_reader_thread, this, "fifo_reader", thr_sync_evt);
} }
} }
@@ -1145,8 +1087,11 @@ fhandler_fifo::fixup_after_exec ()
/* Make sure the child starts unlocked. */ /* Make sure the child starts unlocked. */
fifo_client_unlock (); fifo_client_unlock ();
if (!listen_client ()) if (!(cancel_evt = create_event ()))
debug_printf ("failed to start lct, %E"); api_fatal ("Can't create reader thread cancel event during exec, %E");
if (!(thr_sync_evt = create_event ()))
api_fatal ("Can't create reader thread sync event during exec, %E");
new cygthread (fifo_reader_thread, this, "fifo_reader", thr_sync_evt);
} }
} }