diff --git a/winsup/cygwin/fhandler.h b/winsup/cygwin/fhandler.h index 57e97c277..e7c4af6a1 100644 --- a/winsup/cygwin/fhandler.h +++ b/winsup/cygwin/fhandler.h @@ -1235,20 +1235,49 @@ public: }; #define CYGWIN_FIFO_PIPE_NAME_LEN 47 +#define MAX_CLIENTS 64 + +enum fifo_client_connect_state + { + fc_unknown, + fc_connecting, + fc_connected, + fc_invalid + }; + +struct fifo_client_handler +{ + fhandler_base *fh; + fifo_client_connect_state state; + HANDLE connect_evt; + HANDLE dummy_evt; /* Never signaled. */ + fifo_client_handler () : fh (NULL), state (fc_unknown), connect_evt (NULL), + dummy_evt (NULL) {} + int connect (); + int close (); +}; class fhandler_fifo: public fhandler_base { HANDLE read_ready; HANDLE write_ready; + HANDLE listen_client_thr; + HANDLE lct_termination_evt; UNICODE_STRING pipe_name; WCHAR pipe_name_buf[CYGWIN_FIFO_PIPE_NAME_LEN + 1]; + fifo_client_handler client[MAX_CLIENTS]; + int nclients, nconnected; bool __reg2 wait (HANDLE); NTSTATUS npfs_handle (HANDLE &); - HANDLE create_pipe (); + HANDLE create_pipe_instance (bool); NTSTATUS open_pipe (); + int disconnect_and_reconnect (int); + int add_client (); + bool listen_client (); public: fhandler_fifo (); PUNICODE_STRING get_pipe_name (); + DWORD listen_client_thread (); int open (int, mode_t); off_t lseek (off_t offset, int whence); int close (); diff --git a/winsup/cygwin/fhandler_fifo.cc b/winsup/cygwin/fhandler_fifo.cc index cb269e344..e91e88050 100644 --- a/winsup/cygwin/fhandler_fifo.cc +++ b/winsup/cygwin/fhandler_fifo.cc @@ -31,8 +31,9 @@ STATUS_PIPE_EMPTY simply means there's no data to be read. */ || _s == STATUS_PIPE_EMPTY; }) fhandler_fifo::fhandler_fifo (): - fhandler_base (), - read_ready (NULL), write_ready (NULL) + fhandler_base (), read_ready (NULL), write_ready (NULL), + listen_client_thr (NULL), lct_termination_evt (NULL), nclients (0), + nconnected (0) { pipe_name_buf[0] = L'\0'; need_fork_fixup (true); @@ -78,6 +79,94 @@ fhandler_fifo::arm (HANDLE h) return res; } +static HANDLE +create_event () +{ + NTSTATUS status; + OBJECT_ATTRIBUTES attr; + HANDLE evt = NULL; + + InitializeObjectAttributes (&attr, NULL, 0, NULL, NULL); + status = NtCreateEvent (&evt, EVENT_ALL_ACCESS, &attr, + NotificationEvent, FALSE); + if (!NT_SUCCESS (status)) + __seterrno_from_nt_status (status); + return evt; +} + + +static void +set_pipe_non_blocking (HANDLE ph, bool nonblocking) +{ + NTSTATUS status; + IO_STATUS_BLOCK io; + FILE_PIPE_INFORMATION fpi; + + fpi.ReadMode = FILE_PIPE_MESSAGE_MODE; + fpi.CompletionMode = nonblocking ? FILE_PIPE_COMPLETE_OPERATION + : FILE_PIPE_QUEUE_OPERATION; + status = NtSetInformationFile (ph, &io, &fpi, sizeof fpi, + FilePipeInformation); + if (!NT_SUCCESS (status)) + debug_printf ("NtSetInformationFile(FilePipeInformation): %y", status); +} + +/* The pipe instance is always in blocking mode when this is called. */ +int +fifo_client_handler::connect () +{ + NTSTATUS status; + IO_STATUS_BLOCK io; + + if (connect_evt) + ResetEvent (connect_evt); + else if (!(connect_evt = create_event ())) + return -1; + status = NtFsControlFile (fh->get_handle (), connect_evt, NULL, NULL, &io, + FSCTL_PIPE_LISTEN, NULL, 0, NULL, 0); + switch (status) + { + case STATUS_PENDING: + case STATUS_PIPE_LISTENING: + state = fc_connecting; + break; + case STATUS_PIPE_CONNECTED: + state = fc_connected; + set_pipe_non_blocking (fh->get_handle (), true); + break; + default: + __seterrno_from_nt_status (status); + return -1; + } + return 0; +} + +int +fhandler_fifo::disconnect_and_reconnect (int i) +{ + NTSTATUS status; + IO_STATUS_BLOCK io; + HANDLE ph = client[i].fh->get_handle (); + + status = NtFsControlFile (ph, NULL, NULL, NULL, &io, FSCTL_PIPE_DISCONNECT, + NULL, 0, NULL, 0); + /* Short-lived. Don't use cygwait. We don't want to be interrupted. */ + if (status == STATUS_PENDING + && NtWaitForSingleObject (ph, FALSE, NULL) == WAIT_OBJECT_0) + status = io.Status; + if (!NT_SUCCESS (status)) + { + __seterrno_from_nt_status (status); + return -1; + } + set_pipe_non_blocking (client[i].fh->get_handle (), false); + if (client[i].connect () < 0) + return -1; + if (client[i].state == fc_connected) + nconnected++; + return 0; +} + NTSTATUS fhandler_fifo::npfs_handle (HANDLE &nph) { @@ -108,9 +197,12 @@ fhandler_fifo::npfs_handle (HANDLE &nph) return status; } -/* Called when pipe is opened for reading. */ +/* Called when a FIFO is first opened for reading and again each time + a new client is needed. Each pipe instance is created in blocking + mode so that we can easily wait for a connection. After it is + connected, it is put in nonblocking mode. */ HANDLE -fhandler_fifo::create_pipe () +fhandler_fifo::create_pipe_instance (bool first) { NTSTATUS status; HANDLE npfsh; @@ -121,7 +213,7 @@ fhandler_fifo::create_pipe () ULONG hattr; ULONG sharing; ULONG nonblocking = FILE_PIPE_QUEUE_OPERATION; - ULONG max_instances = 1; + ULONG max_instances = -1; LARGE_INTEGER timeout; status = npfs_handle (npfsh); @@ -133,12 +225,14 @@ fhandler_fifo::create_pipe () access = GENERIC_READ | FILE_READ_ATTRIBUTES | FILE_WRITE_ATTRIBUTES | SYNCHRONIZE; sharing = FILE_SHARE_READ | FILE_SHARE_WRITE; - hattr = OBJ_INHERIT | OBJ_CASE_INSENSITIVE; + hattr = OBJ_INHERIT; + if (first) + hattr |= OBJ_CASE_INSENSITIVE; InitializeObjectAttributes (&attr, get_pipe_name (), hattr, npfsh, NULL); timeout.QuadPart = -500000; status = NtCreateNamedPipeFile (&ph, access, &attr, &io, sharing, - FILE_CREATE, 0, + first ? FILE_CREATE : FILE_OPEN, 0, FILE_PIPE_MESSAGE_TYPE, FILE_PIPE_MESSAGE_MODE, nonblocking, max_instances, @@ -149,7 +243,7 @@ fhandler_fifo::create_pipe () return ph; } -/* Called when file is opened for writing. */ +/* Called when a FIFO is opened for writing. */ NTSTATUS fhandler_fifo::open_pipe () { @@ -174,6 +268,140 @@ fhandler_fifo::open_pipe () return status; } +int +fhandler_fifo::add_client () +{ + fifo_client_handler fc; + fhandler_base *fh; + bool first = (nclients == 0); + + if (nclients == MAX_CLIENTS) + { + set_errno (EMFILE); + return -1; + } + if (!(fc.dummy_evt = create_event ())) + return -1; + if (!(fh = build_fh_dev (dev ()))) + { + set_errno (EMFILE); + return -1; + } + fc.fh = fh; + HANDLE ph = create_pipe_instance (first); + if (!ph) + goto errout; + fh->set_io_handle (ph); + fh->set_flags (get_flags ()); + if (fc.connect () < 0) + { + fc.close (); + goto errout; + } + if (fc.state == fc_connected) + nconnected++; + client[nclients++] = fc; + return 0; +errout: + delete fh; + return -1; + +} + +/* Just hop to the listen_client_thread method. */ +DWORD WINAPI +listen_client_func (LPVOID param) +{ + fhandler_fifo *fh = (fhandler_fifo *) param; + return fh->listen_client_thread (); +} + +/* Start a thread that listens for client connections. Whenever a new + client connects, it creates a new pipe_instance if necessary. + (There may already be an available instance if a client has + disconnected.) */ +bool +fhandler_fifo::listen_client () +{ + if (!(lct_termination_evt = create_event ())) + return false; + + listen_client_thr = CreateThread (NULL, PREFERRED_IO_BLKSIZE, + listen_client_func, (PVOID) this, 0, NULL); + if (!listen_client_thr) + { + __seterrno (); + HANDLE evt = InterlockedExchangePointer (&lct_termination_evt, NULL); + if (evt) + CloseHandle (evt); + return false; + } + return true; +} + +DWORD +fhandler_fifo::listen_client_thread () +{ + while (1) + { + bool found; + HANDLE w[MAX_CLIENTS + 1]; + int i; + DWORD wait_ret; + + found = false; + for (i = 0; i < nclients; i++) + switch (client[i].state) + { + case fc_invalid: + if (disconnect_and_reconnect (i) < 0) + goto errout; + /* Fall through. */ + case fc_connected: + w[i] = client[i].dummy_evt; + break; + case fc_connecting: + found = true; + w[i] = client[i].connect_evt; + break; + case fc_unknown: /* Shouldn't happen. */ + default: + break; + } + w[nclients] = lct_termination_evt; + if (!found) + { + if (add_client () < 0) + goto errout; + else + continue; + } + if (!arm (read_ready)) + { + __seterrno (); + goto errout; + } + + /* Wait for a client to connect. */ + wait_ret = WaitForMultipleObjects (nclients + 1, w, false, INFINITE); + i = wait_ret - WAIT_OBJECT_0; + if (i < 0 || i > nclients) + goto errout; + else if (i == nclients) /* Reader is closing. */ + return 0; + else + { + client[i].state = fc_connected; + nconnected++; + set_pipe_non_blocking (client[i].fh->get_handle (), true); + yield (); + } + } +errout: + ResetEvent (read_ready); + return -1; +} + int fhandler_fifo::open (int flags, mode_t) { @@ -184,7 +412,6 @@ fhandler_fifo::open (int flags, mode_t) error_set_errno } res; bool reader, writer, duplexer; - HANDLE ph = NULL; /* Determine what we're doing with this fhandler: reading, writing, both */ switch (flags & O_ACCMODE) @@ -212,6 +439,9 @@ fhandler_fifo::open (int flags, mode_t) debug_only_printf ("reader %d, writer %d, duplexer %d", reader, writer, duplexer); set_flags (flags); + if (reader) + nohandle (true); + /* Create control events for this named pipe */ char char_sa_buf[1024]; LPSECURITY_ATTRIBUTES sa_buf; @@ -234,24 +464,42 @@ fhandler_fifo::open (int flags, mode_t) goto out; } - /* If we're reading, create the pipe, signal that we're ready and wait for - a writer. - FIXME: Probably need to special case O_RDWR case. */ + /* If we're reading, start the listen_client thread (which should + signal read_ready), and wait for a writer. */ if (reader) { - ph = create_pipe (); - if (!ph) + if (!listen_client ()) { - debug_printf ("create of reader failed"); + debug_printf ("create of listen_client thread failed"); res = error_errno_set; goto out; } - else if (!arm (read_ready)) + /* Wait for the listen_client thread to create the pipe and + signal read_ready. This should be quick. */ + HANDLE w[2] = { listen_client_thr, read_ready }; + switch (WaitForMultipleObjects (2, w, FALSE, INFINITE)) { + case WAIT_OBJECT_0: + debug_printf ("listen_client_thread exited unexpectedly"); + DWORD err; + GetExitCodeThread (listen_client_thr, &err); + __seterrno_from_win_error (err); + res = error_errno_set; + goto out; + break; + case WAIT_OBJECT_0 + 1: + if (!arm (read_ready)) + { + res = error_set_errno; + goto out; + } + break; + default: res = error_set_errno; goto out; + break; } - else if (!duplexer && !wait (write_ready)) + if (!duplexer && !wait (write_ready)) { res = error_errno_set; goto out; @@ -261,7 +509,8 @@ fhandler_fifo::open (int flags, mode_t) } /* If we're writing, wait for read_ready and then connect to the - pipe. Then signal write_ready. */ + pipe. This should always succeed quickly if the reader's + listen_client thread is running. Then signal write_ready. */ if (writer) { if (!wait (read_ready)) @@ -283,7 +532,10 @@ fhandler_fifo::open (int flags, mode_t) goto out; } else - res = success; + { + set_pipe_non_blocking (get_handle (), true); + res = success; + } } out: if (res == error_set_errno) @@ -302,6 +554,8 @@ out: } if (get_io_handle ()) CloseHandle (get_io_handle ()); + if (listen_client_thr) + CloseHandle (listen_client_thr); } debug_printf ("res %d", res); return res == success; @@ -396,19 +650,36 @@ void __reg3 fhandler_fifo::raw_read (void *in_ptr, size_t& len) { size_t orig_len = len; + + /* Start the listen_client thread if necessary (e.g., after dup or fork). */ + if (!listen_client_thr && !listen_client ()) + goto errout; + while (1) { - len = orig_len; - fhandler_base::raw_read (in_ptr, len); - ssize_t nread = (ssize_t) len; - if (nread > 0) - return; - else if (nread < 0 && GetLastError () != ERROR_NO_DATA) - goto errout; - else if (nread == 0) /* Writer has disconnected. */ + if (nconnected == 0) /* EOF */ { - /* Not implemented yet. */ + len = 0; + return; } + + /* Poll the connected clients for input. */ + for (int i = 0; i < nclients; i++) + if (client[i].state == fc_connected) + { + len = orig_len; + client[i].fh->fhandler_base::raw_read (in_ptr, len); + ssize_t nread = (ssize_t) len; + if (nread > 0) + return; + else if (nread < 0 && GetLastError () != ERROR_NO_DATA) + goto errout; + else if (nread == 0) /* Client has disconnected. */ + { + client[i].state = fc_invalid; + nconnected--; + } + } if (is_nonblocking ()) { set_errno (EAGAIN); @@ -441,12 +712,47 @@ fhandler_fifo::fstatvfs (struct statvfs *sfs) return fh.fstatvfs (sfs); } +int +fifo_client_handler::close () +{ + int res = 0; + + if (fh) + res = fh->close (); + if (connect_evt) + CloseHandle (connect_evt); + if (dummy_evt) + CloseHandle (dummy_evt); + return res; +} + int fhandler_fifo::close () { - CloseHandle (read_ready); - CloseHandle (write_ready); - return fhandler_base::close (); + int res = 0; + HANDLE evt = InterlockedExchangePointer (&lct_termination_evt, NULL); + HANDLE thr = InterlockedExchangePointer (&listen_client_thr, NULL); + if (thr) + { + if (evt) + SetEvent (evt); + WaitForSingleObject (thr, INFINITE); + DWORD err; + GetExitCodeThread (thr, &err); + if (err) + debug_printf ("listen_client_thread exited with code %d", err); + CloseHandle (thr); + } + if (evt) + CloseHandle (evt); + if (read_ready) + CloseHandle (read_ready); + if (write_ready) + CloseHandle (write_ready); + for (int i = 0; i < nclients; i++) + if (client[i].close () < 0) + res = -1; + return fhandler_base::close () || res; } int