Cygwin: FIFO: make opening a writer more robust

- Make read_ready a manual-reset event.

- Signal read_ready in open instead of in the listen_client_thread.

- Don't reset read_ready when the listen_client thread terminates;
  instead do it in close().

- Rearrange open and change its error handling.

- Add a wait_open_pipe method that waits for a pipe instance to be
  available and then calls open_pipe.  Use it when opening a writer if
  we can't connect immediately.  This can happen if the system is
  heavily loaded and/or if many writers are trying to open
  simultaneously.
This commit is contained in:
Ken Brown 2020-03-17 14:14:47 -04:00
parent 301454f132
commit 9ee8fdf2b3
2 changed files with 170 additions and 102 deletions

View File

@ -1323,6 +1323,7 @@ class fhandler_fifo: public fhandler_base
static NTSTATUS npfs_handle (HANDLE &);
HANDLE create_pipe_instance (bool);
NTSTATUS open_pipe (HANDLE&);
NTSTATUS wait_open_pipe (HANDLE&);
int add_client_handler ();
void delete_client_handler (int);
bool listen_client ();

View File

@ -222,7 +222,64 @@ fhandler_fifo::open_pipe (HANDLE& ph)
openflags & O_CLOEXEC ? 0 : OBJ_INHERIT,
npfsh, NULL);
sharing = FILE_SHARE_READ | FILE_SHARE_WRITE;
status = NtOpenFile (&ph, access, &attr, &io, sharing, 0);
return NtOpenFile (&ph, access, &attr, &io, sharing, 0);
}
/* Wait up to 100ms for a pipe instance to be available, then connect. */
NTSTATUS
fhandler_fifo::wait_open_pipe (HANDLE& ph)
{
HANDLE npfsh;
HANDLE evt;
NTSTATUS status;
IO_STATUS_BLOCK io;
ULONG pwbuf_size;
PFILE_PIPE_WAIT_FOR_BUFFER pwbuf;
LONGLONG stamp;
LONGLONG orig_timeout = -100 * NS100PERSEC / MSPERSEC; /* 100ms */
status = npfs_handle (npfsh);
if (!NT_SUCCESS (status))
return status;
if (!(evt = create_event ()))
api_fatal ("Can't create event, %E");
pwbuf_size
= offsetof (FILE_PIPE_WAIT_FOR_BUFFER, Name) + get_pipe_name ()->Length;
pwbuf = (PFILE_PIPE_WAIT_FOR_BUFFER) alloca (pwbuf_size);
pwbuf->Timeout.QuadPart = orig_timeout;
pwbuf->NameLength = get_pipe_name ()->Length;
pwbuf->TimeoutSpecified = TRUE;
memcpy (pwbuf->Name, get_pipe_name ()->Buffer, get_pipe_name ()->Length);
stamp = get_clock (CLOCK_MONOTONIC)->n100secs ();
bool retry;
do
{
retry = false;
status = NtFsControlFile (npfsh, evt, NULL, NULL, &io, FSCTL_PIPE_WAIT,
pwbuf, pwbuf_size, NULL, 0);
if (status == STATUS_PENDING)
{
if (WaitForSingleObject (evt, INFINITE) == WAIT_OBJECT_0)
status = io.Status;
else
api_fatal ("WFSO failed, %E");
}
if (NT_SUCCESS (status))
status = open_pipe (ph);
if (STATUS_PIPE_NO_INSTANCE_AVAILABLE (status))
{
/* Another writer has grabbed the pipe instance. Adjust
the timeout and keep waiting if there's time left. */
pwbuf->Timeout.QuadPart = orig_timeout
+ get_clock (CLOCK_MONOTONIC)->n100secs () - stamp;
if (pwbuf->Timeout.QuadPart < 0)
retry = true;
else
status = STATUS_IO_TIMEOUT;
}
}
while (retry);
NtClose (evt);
return status;
}
@ -294,7 +351,6 @@ void
fhandler_fifo::record_connection (fifo_client_handler& fc,
fifo_client_connect_state s)
{
SetEvent (write_ready);
fc.state = s;
maybe_eof (false);
ResetEvent (writer_opening);
@ -327,9 +383,6 @@ fhandler_fifo::listen_client_thread ()
if (add_client_handler () < 0)
api_fatal ("Can't add a client handler, %E");
/* Allow a writer to open. */
SetEvent (read_ready);
/* Listen for a writer to connect to the new client handler. */
fifo_client_handler& fc = fc_handler[nhandlers - 1];
NTSTATUS status;
@ -405,19 +458,13 @@ fhandler_fifo::listen_client_thread ()
out:
if (conn_evt)
NtClose (conn_evt);
ResetEvent (read_ready);
return 0;
}
int
fhandler_fifo::open (int flags, mode_t)
{
enum
{
success,
error_errno_set,
error_set_errno
} res;
int saved_errno = 0;
if (flags & O_PATH)
return open_fs (flags);
@ -437,8 +484,7 @@ fhandler_fifo::open (int flags, mode_t)
break;
default:
set_errno (EINVAL);
res = error_errno_set;
goto out;
goto err;
}
debug_only_printf ("reader %d, writer %d, duplexer %d", reader, writer, duplexer);
@ -454,135 +500,151 @@ fhandler_fifo::open (int flags, mode_t)
char npbuf[MAX_PATH];
__small_sprintf (npbuf, "r-event.%08x.%016X", get_dev (), get_ino ());
if (!(read_ready = CreateEvent (sa_buf, false, false, npbuf)))
if (!(read_ready = CreateEvent (sa_buf, true, false, npbuf)))
{
debug_printf ("CreateEvent for %s failed, %E", npbuf);
res = error_set_errno;
goto out;
__seterrno ();
goto err;
}
npbuf[0] = 'w';
if (!(write_ready = CreateEvent (sa_buf, true, false, npbuf)))
{
debug_printf ("CreateEvent for %s failed, %E", npbuf);
res = error_set_errno;
goto out;
__seterrno ();
goto err_close_read_ready;
}
npbuf[0] = 'o';
if (!(writer_opening = CreateEvent (sa_buf, true, false, npbuf)))
{
debug_printf ("CreateEvent for %s failed, %E", npbuf);
res = error_set_errno;
goto out;
__seterrno ();
goto err_close_write_ready;
}
/* If we're a duplexer, create the pipe and the first client handler. */
if (duplexer)
{
HANDLE ph = NULL;
if (add_client_handler () < 0)
{
res = error_errno_set;
goto out;
}
NTSTATUS status = open_pipe (ph);
if (NT_SUCCESS (status))
{
record_connection (fc_handler[0]);
set_handle (ph);
set_pipe_non_blocking (ph, flags & O_NONBLOCK);
}
else
{
__seterrno_from_nt_status (status);
res = error_errno_set;
goto out;
}
}
/* If we're reading, start the listen_client thread (which should
signal read_ready), and wait for a writer. */
/* If we're reading, signal read_ready and start the listen_client
thread. */
if (reader)
{
if (!listen_client ())
{
debug_printf ("create of listen_client thread failed");
res = error_errno_set;
goto out;
goto err_close_writer_opening;
}
else if (!duplexer && !wait (write_ready))
SetEvent (read_ready);
/* If we're a duplexer, we need a handle for writing. */
if (duplexer)
{
res = error_errno_set;
goto out;
}
else
{
init_fixup_before ();
res = success;
HANDLE ph = NULL;
NTSTATUS status;
while (1)
{
status = open_pipe (ph);
if (NT_SUCCESS (status))
{
set_handle (ph);
set_pipe_non_blocking (ph, flags & O_NONBLOCK);
break;
}
else if (status == STATUS_OBJECT_NAME_NOT_FOUND)
{
/* The pipe hasn't been created yet. */
yield ();
continue;
}
else
{
__seterrno_from_nt_status (status);
goto err_close_reader;
}
}
}
/* Not a duplexer; wait for a writer to connect. */
else if (!wait (write_ready))
goto err_close_reader;
init_fixup_before ();
goto success;
}
/* If we're writing, wait for read_ready and then connect to the
pipe. This should always succeed quickly if the reader's
listen_client thread is running. Then signal write_ready. */
/* If we're writing, wait for read_ready, connect to the pipe, and
signal write_ready. */
if (writer)
{
NTSTATUS status;
SetEvent (writer_opening);
if (!wait (read_ready))
{
ResetEvent (writer_opening);
goto err_close_writer_opening;
}
while (1)
{
if (!wait (read_ready))
{
ResetEvent (writer_opening);
res = error_errno_set;
goto out;
}
NTSTATUS status = open_pipe (get_handle ());
status = open_pipe (get_handle ());
if (NT_SUCCESS (status))
goto writer_success;
else if (status == STATUS_OBJECT_NAME_NOT_FOUND)
{
set_pipe_non_blocking (get_handle (), flags & O_NONBLOCK);
SetEvent (write_ready);
res = success;
goto out;
/* The pipe hasn't been created yet. */
yield ();
continue;
}
else if (STATUS_PIPE_NO_INSTANCE_AVAILABLE (status))
Sleep (1);
break;
else
{
debug_printf ("create of writer failed");
__seterrno_from_nt_status (status);
res = error_errno_set;
ResetEvent (writer_opening);
goto out;
goto err_close_writer_opening;
}
}
/* We should get here only if the system is heavily loaded
and/or many writers are trying to connect simultaneously */
while (1)
{
SetEvent (writer_opening);
if (!wait (read_ready))
{
ResetEvent (writer_opening);
goto err_close_writer_opening;
}
status = wait_open_pipe (get_handle ());
if (NT_SUCCESS (status))
goto writer_success;
else if (status == STATUS_IO_TIMEOUT)
continue;
else
{
debug_printf ("create of writer failed");
__seterrno_from_nt_status (status);
ResetEvent (writer_opening);
goto err_close_writer_opening;
}
}
}
out:
if (res == error_set_errno)
__seterrno ();
if (res != success)
{
if (read_ready)
{
NtClose (read_ready);
read_ready = NULL;
}
if (write_ready)
{
NtClose (write_ready);
write_ready = NULL;
}
if (writer_opening)
{
NtClose (writer_opening);
writer_opening = NULL;
}
if (get_handle ())
NtClose (get_handle ());
if (listen_client_thr)
stop_listen_client ();
}
debug_printf ("res %d", res);
return res == success;
writer_success:
set_pipe_non_blocking (get_handle (), flags & O_NONBLOCK);
SetEvent (write_ready);
success:
return 1;
err_close_reader:
saved_errno = get_errno ();
close ();
set_errno (saved_errno);
return 0;
err_close_writer_opening:
NtClose (writer_opening);
err_close_write_ready:
NtClose (write_ready);
err_close_read_ready:
NtClose (read_ready);
err:
if (get_handle ())
NtClose (get_handle ());
return 0;
}
off_t
@ -938,6 +1000,11 @@ fhandler_fifo::close ()
handler or another thread. */
fifo_client_unlock ();
stop_listen_client ();
if (reader)
/* FIXME: There could be several readers open because of
dup/fork/exec; we should only reset read_ready when the last
one closes. */
ResetEvent (read_ready);
if (read_ready)
NtClose (read_ready);
if (write_ready)