/* fhandler_fifo.cc - See fhandler.h for a description of the fhandler classes.

   This file is part of Cygwin.

   This software is a copyrighted work licensed under the terms of the
   Cygwin license.  Please consult the file "CYGWIN_LICENSE" for
   details. */

#include "winsup.h"
/* NOTE(review): the operand of this include was missing; restored as
   the w32api winioctl header, which presumably supplies the
   FSCTL_PIPE_* control codes used below -- confirm. */
#include <w32api/winioctl.h>
#include "miscfuncs.h"

#include "cygerrno.h"
#include "security.h"
#include "path.h"
#include "fhandler.h"
#include "dtable.h"
#include "cygheap.h"
#include "sigproc.h"
#include "cygtls.h"
#include "shared_info.h"
#include "ntdll.h"
#include "cygwait.h"

/* This is only to be used for writers.  When reading,
   STATUS_PIPE_EMPTY simply means there's no data to be read. */
#define STATUS_PIPE_IS_CLOSED(status) \
                ({ NTSTATUS _s = (status); \
                   _s == STATUS_PIPE_CLOSING \
                   || _s == STATUS_PIPE_BROKEN \
                   || _s == STATUS_PIPE_EMPTY; })

/* Construct a FIFO handler with no events, no listener thread and no
   clients.  The pipe name is left empty so that get_pipe_name ()
   builds it on first use, and fork fixup is requested. */
fhandler_fifo::fhandler_fifo ():
  fhandler_base (), read_ready (NULL), write_ready (NULL),
  listen_client_thr (NULL), lct_termination_evt (NULL), nclients (0),
  nconnected (0)
{
  pipe_name_buf[0] = L'\0';
  need_fork_fixup (true);
}

/* Return the name of the pipe backing this FIFO.  The name is built
   once, from the installation key, the device number and the inode
   number, and then cached in pipe_name_buf / pipe_name. */
PUNICODE_STRING
fhandler_fifo::get_pipe_name ()
{
  if (!pipe_name_buf[0])
    {
      __small_swprintf (pipe_name_buf, L"%S-fifo.%08x.%016X",
                        &cygheap->installation_key, get_dev (), get_ino ());
      RtlInitUnicodeString (&pipe_name, pipe_name_buf);
    }
  return &pipe_name;
}

/* Pick the security attributes helper according to CLOEXEC:
   sec_user_nih when set, sec_user otherwise. */
inline PSECURITY_ATTRIBUTES
sec_user_cloexec (bool cloexec, PSECURITY_ATTRIBUTES sa, PSID sid)
{
  return cloexec ? sec_user_nih (sa, sid) : sec_user (sa, sid);
}

/* Signal event H.  Returns the result of SetEvent; a failure is only
   logged here, the caller decides how to report it.  In DEBUGGING
   builds the log names the event as "reader" (H is read_ready) or
   "writer" (anything else). */
bool inline
fhandler_fifo::arm (HANDLE h)
{
#ifdef DEBUGGING
  const char *what;
  if (h == read_ready)
    what = "reader";
  else
    what = "writer";
  debug_only_printf ("arming %s", what);
#endif

  bool res = SetEvent (h);
  if (!res)
#ifdef DEBUGGING
    debug_printf ("SetEvent for %s failed, %E", what);
#else
    debug_printf ("SetEvent failed, %E");
#endif
  return res;
}

/* Create an unnamed notification event whose initial state is FALSE.
   On failure errno is set from the NT status; the handle returned is
   the local EVT, which starts out as NULL. */
static HANDLE
create_event ()
{
  NTSTATUS status;
  OBJECT_ATTRIBUTES attr;
  HANDLE evt = NULL;

  InitializeObjectAttributes (&attr, NULL, 0, NULL, NULL);
  status = NtCreateEvent (&evt, EVENT_ALL_ACCESS, &attr,
                          NotificationEvent, FALSE);
  if (!NT_SUCCESS (status))
    __seterrno_from_nt_status (status);
  return evt;
}

/* Set the completion mode of pipe handle PH: complete-operation when
   NONBLOCKING is true, queue-operation otherwise.  The read mode is
   always set to message mode.  A failure is logged and otherwise
   ignored. */
static void
set_pipe_non_blocking (HANDLE ph, bool nonblocking)
{
  NTSTATUS status;
  IO_STATUS_BLOCK io;
  FILE_PIPE_INFORMATION fpi;

  fpi.ReadMode = FILE_PIPE_MESSAGE_MODE;
  fpi.CompletionMode = nonblocking ? FILE_PIPE_COMPLETE_OPERATION
    : FILE_PIPE_QUEUE_OPERATION;
  status = NtSetInformationFile (ph, &io, &fpi, sizeof fpi,
                                 FilePipeInformation);
  if (!NT_SUCCESS (status))
    debug_printf ("NtSetInformationFile(FilePipeInformation): %y", status);
}

/* The pipe instance is always in blocking mode when this is called.
*/ int fifo_client_handler::connect () { NTSTATUS status; IO_STATUS_BLOCK io; if (connect_evt) ResetEvent (connect_evt); else if (!(connect_evt = create_event ())) return -1; status = NtFsControlFile (fh->get_handle (), connect_evt, NULL, NULL, &io, FSCTL_PIPE_LISTEN, NULL, 0, NULL, 0); switch (status) { case STATUS_PENDING: case STATUS_PIPE_LISTENING: state = fc_connecting; break; case STATUS_PIPE_CONNECTED: state = fc_connected; set_pipe_non_blocking (fh->get_handle (), true); break; default: __seterrno_from_nt_status (status); return -1; } return 0; } int fhandler_fifo::disconnect_and_reconnect (int i) { NTSTATUS status; IO_STATUS_BLOCK io; HANDLE ph = client[i].fh->get_handle (); status = NtFsControlFile (ph, NULL, NULL, NULL, &io, FSCTL_PIPE_DISCONNECT, NULL, 0, NULL, 0); /* Short-lived. Don't use cygwait. We don't want to be interrupted. */ if (status == STATUS_PENDING && NtWaitForSingleObject (ph, FALSE, NULL) == WAIT_OBJECT_0) status = io.Status; if (!NT_SUCCESS (status)) { __seterrno_from_nt_status (status); return -1; } set_pipe_non_blocking (client[i].fh->get_handle (), false); if (client[i].connect () < 0) return -1; if (client[i].state == fc_connected) nconnected++; return 0; } NTSTATUS fhandler_fifo::npfs_handle (HANDLE &nph) { static NO_COPY SRWLOCK npfs_lock; static NO_COPY HANDLE npfs_dirh; NTSTATUS status = STATUS_SUCCESS; OBJECT_ATTRIBUTES attr; IO_STATUS_BLOCK io; /* Lockless after first call. */ if (npfs_dirh) { nph = npfs_dirh; return STATUS_SUCCESS; } AcquireSRWLockExclusive (&npfs_lock); if (!npfs_dirh) { InitializeObjectAttributes (&attr, &ro_u_npfs, 0, NULL, NULL); status = NtOpenFile (&npfs_dirh, FILE_READ_ATTRIBUTES | SYNCHRONIZE, &attr, &io, FILE_SHARE_READ | FILE_SHARE_WRITE, 0); } ReleaseSRWLockExclusive (&npfs_lock); if (NT_SUCCESS (status)) nph = npfs_dirh; return status; } /* Called when a FIFO is first opened for reading and again each time a new client is needed. 
Each pipe instance is created in blocking mode so that we can easily wait for a connection. After it is connected, it is put in nonblocking mode. */ HANDLE fhandler_fifo::create_pipe_instance (bool first) { NTSTATUS status; HANDLE npfsh; HANDLE ph = NULL; ACCESS_MASK access; OBJECT_ATTRIBUTES attr; IO_STATUS_BLOCK io; ULONG hattr; ULONG sharing; ULONG nonblocking = FILE_PIPE_QUEUE_OPERATION; ULONG max_instances = -1; LARGE_INTEGER timeout; status = npfs_handle (npfsh); if (!NT_SUCCESS (status)) { __seterrno_from_nt_status (status); return NULL; } access = GENERIC_READ | FILE_READ_ATTRIBUTES | FILE_WRITE_ATTRIBUTES | SYNCHRONIZE; sharing = FILE_SHARE_READ | FILE_SHARE_WRITE; hattr = OBJ_INHERIT; if (first) hattr |= OBJ_CASE_INSENSITIVE; InitializeObjectAttributes (&attr, get_pipe_name (), hattr, npfsh, NULL); timeout.QuadPart = -500000; status = NtCreateNamedPipeFile (&ph, access, &attr, &io, sharing, first ? FILE_CREATE : FILE_OPEN, 0, FILE_PIPE_MESSAGE_TYPE, FILE_PIPE_MESSAGE_MODE, nonblocking, max_instances, DEFAULT_PIPEBUFSIZE, DEFAULT_PIPEBUFSIZE, &timeout); if (!NT_SUCCESS (status)) __seterrno_from_nt_status (status); return ph; } /* Called when a FIFO is opened for writing. 
*/ NTSTATUS fhandler_fifo::open_pipe () { NTSTATUS status; HANDLE npfsh; ACCESS_MASK access; OBJECT_ATTRIBUTES attr; IO_STATUS_BLOCK io; ULONG sharing; HANDLE ph = NULL; status = npfs_handle (npfsh); if (!NT_SUCCESS (status)) return status; access = GENERIC_WRITE | SYNCHRONIZE; InitializeObjectAttributes (&attr, get_pipe_name (), OBJ_INHERIT, npfsh, NULL); sharing = FILE_SHARE_READ | FILE_SHARE_WRITE; status = NtOpenFile (&ph, access, &attr, &io, sharing, 0); if (NT_SUCCESS (status)) set_io_handle (ph); return status; } int fhandler_fifo::add_client () { fifo_client_handler fc; fhandler_base *fh; bool first = (nclients == 0); if (nclients == MAX_CLIENTS) { set_errno (EMFILE); return -1; } if (!(fc.dummy_evt = create_event ())) return -1; if (!(fh = build_fh_dev (dev ()))) { set_errno (EMFILE); return -1; } fc.fh = fh; HANDLE ph = create_pipe_instance (first); if (!ph) goto errout; fh->set_io_handle (ph); fh->set_flags (get_flags ()); if (fc.connect () < 0) { fc.close (); goto errout; } if (fc.state == fc_connected) nconnected++; client[nclients++] = fc; return 0; errout: delete fh; return -1; } /* Just hop to the listen_client_thread method. */ DWORD WINAPI listen_client_func (LPVOID param) { fhandler_fifo *fh = (fhandler_fifo *) param; return fh->listen_client_thread (); } /* Start a thread that listens for client connections. Whenever a new client connects, it creates a new pipe_instance if necessary. (There may already be an available instance if a client has disconnected.) 
*/ bool fhandler_fifo::listen_client () { if (!(lct_termination_evt = create_event ())) return false; listen_client_thr = CreateThread (NULL, PREFERRED_IO_BLKSIZE, listen_client_func, (PVOID) this, 0, NULL); if (!listen_client_thr) { __seterrno (); HANDLE evt = InterlockedExchangePointer (&lct_termination_evt, NULL); if (evt) CloseHandle (evt); return false; } return true; } DWORD fhandler_fifo::listen_client_thread () { while (1) { bool found; HANDLE w[MAX_CLIENTS + 1]; int i; DWORD wait_ret; fifo_client_lock (); found = false; for (i = 0; i < nclients; i++) switch (client[i].state) { case fc_invalid: if (disconnect_and_reconnect (i) < 0) { fifo_client_unlock (); goto errout; } /* Fall through. */ case fc_connected: w[i] = client[i].dummy_evt; break; case fc_connecting: found = true; w[i] = client[i].connect_evt; break; case fc_unknown: /* Shouldn't happen. */ default: break; } w[nclients] = lct_termination_evt; int res = 0; if (!found) res = add_client (); fifo_client_unlock (); if (res < 0) goto errout; else if (!found) continue; if (!arm (read_ready)) { __seterrno (); goto errout; } /* Wait for a client to connect. */ wait_ret = WaitForMultipleObjects (nclients + 1, w, false, INFINITE); i = wait_ret - WAIT_OBJECT_0; if (i < 0 || i > nclients) goto errout; else if (i == nclients) /* Reader is closing. 
*/ return 0; else { fifo_client_lock (); client[i].state = fc_connected; nconnected++; set_pipe_non_blocking (client[i].fh->get_handle (), true); fifo_client_unlock (); yield (); } } errout: ResetEvent (read_ready); return -1; } int fhandler_fifo::open (int flags, mode_t) { enum { success, error_errno_set, error_set_errno } res; bool reader, writer, duplexer; /* Determine what we're doing with this fhandler: reading, writing, both */ switch (flags & O_ACCMODE) { case O_RDONLY: reader = true; writer = false; duplexer = false; break; case O_WRONLY: writer = true; reader = false; duplexer = false; break; case O_RDWR: reader = true; writer = false; duplexer = true; break; default: set_errno (EINVAL); res = error_errno_set; goto out; } debug_only_printf ("reader %d, writer %d, duplexer %d", reader, writer, duplexer); set_flags (flags); if (reader) nohandle (true); /* Create control events for this named pipe */ char char_sa_buf[1024]; LPSECURITY_ATTRIBUTES sa_buf; sa_buf = sec_user_cloexec (flags & O_CLOEXEC, (PSECURITY_ATTRIBUTES) char_sa_buf, cygheap->user.sid()); char npbuf[MAX_PATH]; __small_sprintf (npbuf, "r-event.%08x.%016X", get_dev (), get_ino ()); if (!(read_ready = CreateEvent (sa_buf, duplexer, false, npbuf))) { debug_printf ("CreateEvent for %s failed, %E", npbuf); res = error_set_errno; goto out; } npbuf[0] = 'w'; if (!(write_ready = CreateEvent (sa_buf, false, false, npbuf))) { debug_printf ("CreateEvent for %s failed, %E", npbuf); res = error_set_errno; goto out; } /* If we're reading, start the listen_client thread (which should signal read_ready), and wait for a writer. */ if (reader) { if (!listen_client ()) { debug_printf ("create of listen_client thread failed"); res = error_errno_set; goto out; } /* Wait for the listen_client thread to create the pipe and signal read_ready. This should be quick. 
*/ HANDLE w[2] = { listen_client_thr, read_ready }; switch (WaitForMultipleObjects (2, w, FALSE, INFINITE)) { case WAIT_OBJECT_0: debug_printf ("listen_client_thread exited unexpectedly"); DWORD err; GetExitCodeThread (listen_client_thr, &err); __seterrno_from_win_error (err); res = error_errno_set; goto out; break; case WAIT_OBJECT_0 + 1: if (!arm (read_ready)) { res = error_set_errno; goto out; } break; default: res = error_set_errno; goto out; break; } if (!duplexer && !wait (write_ready)) { res = error_errno_set; goto out; } else res = success; } /* If we're writing, wait for read_ready and then connect to the pipe. This should always succeed quickly if the reader's listen_client thread is running. Then signal write_ready. */ if (writer) { if (!wait (read_ready)) { res = error_errno_set; goto out; } NTSTATUS status = open_pipe (); if (!NT_SUCCESS (status)) { debug_printf ("create of writer failed"); __seterrno_from_nt_status (status); res = error_errno_set; goto out; } else if (!arm (write_ready)) { res = error_set_errno; goto out; } else { set_pipe_non_blocking (get_handle (), true); res = success; } } out: if (res == error_set_errno) __seterrno (); if (res != success) { if (read_ready) { CloseHandle (read_ready); read_ready = NULL; } if (write_ready) { CloseHandle (write_ready); write_ready = NULL; } if (get_io_handle ()) CloseHandle (get_io_handle ()); if (listen_client_thr) CloseHandle (listen_client_thr); } debug_printf ("res %d", res); return res == success; } off_t fhandler_fifo::lseek (off_t offset, int whence) { debug_printf ("(%D, %d)", offset, whence); set_errno (ESPIPE); return -1; } bool fhandler_fifo::wait (HANDLE h) { #ifdef DEBUGGING const char *what; if (h == read_ready) what = "reader"; else what = "writer"; #endif /* Set the wait to zero for non-blocking I/O-related events. */ DWORD wait = ((h == read_ready || h == write_ready) && get_flags () & O_NONBLOCK) ? 0 : INFINITE; debug_only_printf ("waiting for %s", what); /* Wait for the event. 
Set errno, as appropriate if something goes wrong. */ switch (cygwait (h, wait)) { case WAIT_OBJECT_0: debug_only_printf ("successfully waited for %s", what); return true; case WAIT_SIGNALED: debug_only_printf ("interrupted by signal while waiting for %s", what); set_errno (EINTR); return false; case WAIT_CANCELED: debug_only_printf ("cancellable interruption while waiting for %s", what); pthread::static_cancel_self (); /* never returns */ break; case WAIT_TIMEOUT: if (h == write_ready) { debug_only_printf ("wait timed out waiting for write but will still open reader since non-blocking mode"); return true; } else { set_errno (ENXIO); return false; } break; default: debug_only_printf ("unknown error while waiting for %s", what); __seterrno (); return false; } } ssize_t __reg3 fhandler_fifo::raw_write (const void *ptr, size_t len) { ssize_t ret = -1; NTSTATUS status; IO_STATUS_BLOCK io; status = NtWriteFile (get_handle (), NULL, NULL, NULL, &io, (PVOID) ptr, len, NULL, NULL); if (NT_SUCCESS (status)) { /* NtWriteFile returns success with # of bytes written == 0 in case writing on a non-blocking pipe fails if the pipe buffer is full. */ if (io.Information == 0) set_errno (EAGAIN); else ret = io.Information; } else if (STATUS_PIPE_IS_CLOSED (status)) { set_errno (EPIPE); raise (SIGPIPE); } else __seterrno_from_nt_status (status); return ret; } void __reg3 fhandler_fifo::raw_read (void *in_ptr, size_t& len) { size_t orig_len = len; /* Start the listen_client thread if necessary (e.g., after dup or fork). */ if (!listen_client_thr && !listen_client ()) goto errout; while (1) { if (nconnected == 0) /* EOF */ { len = 0; return; } /* Poll the connected clients for input. 
*/ fifo_client_lock (); for (int i = 0; i < nclients; i++) if (client[i].state == fc_connected) { len = orig_len; client[i].fh->fhandler_base::raw_read (in_ptr, len); ssize_t nread = (ssize_t) len; if (nread > 0) { fifo_client_unlock (); return; } else if (nread < 0 && GetLastError () != ERROR_NO_DATA) { fifo_client_unlock (); goto errout; } else if (nread == 0) /* Client has disconnected. */ { client[i].state = fc_invalid; nconnected--; } } fifo_client_unlock (); if (is_nonblocking ()) { set_errno (EAGAIN); goto errout; } else { /* Allow interruption. Copied from fhandler_socket_unix::open_reparse_point. */ pthread_testcancel (); if (cygwait (NULL, cw_nowait, cw_sig_eintr) == WAIT_SIGNALED && !_my_tls.call_signal_handler ()) { set_errno (EINTR); goto errout; } /* Don't hog the CPU. */ Sleep (1); } } errout: len = -1; } int __reg2 fhandler_fifo::fstatvfs (struct statvfs *sfs) { fhandler_disk_file fh (pc); fh.get_device () = FH_FS; return fh.fstatvfs (sfs); } int fifo_client_handler::close () { int res = 0; if (fh) res = fh->close (); if (connect_evt) CloseHandle (connect_evt); if (dummy_evt) CloseHandle (dummy_evt); return res; } int fhandler_fifo::close () { int res = 0; HANDLE evt = InterlockedExchangePointer (&lct_termination_evt, NULL); HANDLE thr = InterlockedExchangePointer (&listen_client_thr, NULL); if (thr) { if (evt) SetEvent (evt); WaitForSingleObject (thr, INFINITE); DWORD err; GetExitCodeThread (thr, &err); if (err) debug_printf ("listen_client_thread exited with code %d", err); CloseHandle (thr); } if (evt) CloseHandle (evt); if (read_ready) CloseHandle (read_ready); if (write_ready) CloseHandle (write_ready); for (int i = 0; i < nclients; i++) if (client[i].close () < 0) res = -1; return fhandler_base::close () || res; } int fhandler_fifo::dup (fhandler_base *child, int flags) { if (fhandler_base::dup (child, flags)) { __seterrno (); return -1; } fhandler_fifo *fhf = (fhandler_fifo *) child; if (!DuplicateHandle (GetCurrentProcess (), read_ready, 
GetCurrentProcess (), &fhf->read_ready, 0, true, DUPLICATE_SAME_ACCESS)) { fhf->close (); __seterrno (); return -1; } if (!DuplicateHandle (GetCurrentProcess (), write_ready, GetCurrentProcess (), &fhf->write_ready, 0, true, DUPLICATE_SAME_ACCESS)) { CloseHandle (fhf->read_ready); fhf->close (); __seterrno (); return -1; } return 0; } void fhandler_fifo::fixup_after_fork (HANDLE parent) { fhandler_base::fixup_after_fork (parent); fork_fixup (parent, read_ready, "read_ready"); fork_fixup (parent, write_ready, "write_ready"); } void fhandler_fifo::set_close_on_exec (bool val) { fhandler_base::set_close_on_exec (val); set_no_inheritance (read_ready, val); set_no_inheritance (write_ready, val); }