* fhandler.h (class fhandler_socket): Add has_been_closed member.

* fhandler_socket.cc (fhandler_socket::fhandler_socket): Initialize
	has_been_closed to 0.
	(fhandler_socket::recvfrom): Use new asynchronous I/O driven
	wsock_event methods.
	(fhandler_socket::recvmsg): Ditto.
	(fhandler_socket::sendto): Ditto.
	(fhandler_socket::sendmsg): Ditto.
	* net.cc (wsock_event::prepare): Reimplement using asynchronous I/O.
	(wsock_event::wait): Ditto.
	(wsock_event::release): New method.
	* wsock_event.h (class wsock_event): Remove ovr member.  Accomodate
	new implementation of prepare and wait methods.  Add release method.
This commit is contained in:
Corinna Vinschen 2004-03-29 19:41:17 +00:00
parent c7f060d0da
commit 81f5200ba3
5 changed files with 142 additions and 66 deletions

View File

@ -1,3 +1,19 @@
2004-03-29 Corinna Vinschen <corinna@vinschen.de>
* fhandler.h (class fhandler_socket): Add has_been_closed member.
* fhandler_socket.cc (fhandler_socket::fhandler_socket): Initialize
has_been_closed to 0.
(fhandler_socket::recvfrom): Use new asynchronous I/O driven
wsock_event methods.
(fhandler_socket::recvmsg): Ditto.
(fhandler_socket::sendto): Ditto.
(fhandler_socket::sendmsg): Ditto.
* net.cc (wsock_event::prepare): Reimplement using asynchronous I/O.
(wsock_event::wait): Ditto.
(wsock_event::release): New method.
* wsock_event.h (class wsock_event): Remove ovr member. Accomodate
new implementation of prepare and wait methods. Add release method.
2004-03-29 Thomas Pfaff <tpfaff@gmx.net> 2004-03-29 Thomas Pfaff <tpfaff@gmx.net>
* thread.cc (pthread::atforkprepare): Call * thread.cc (pthread::atforkprepare): Call

View File

@ -377,6 +377,7 @@ class fhandler_socket: public fhandler_base
struct _WSAPROTOCOL_INFOA *prot_info_ptr; struct _WSAPROTOCOL_INFOA *prot_info_ptr;
char *sun_path; char *sun_path;
int had_connect_or_listen; int had_connect_or_listen;
int has_been_closed;
public: public:
fhandler_socket (); fhandler_socket ();

View File

@ -122,7 +122,7 @@ get_inet_addr (const struct sockaddr *in, int inlen,
/* fhandler_socket */ /* fhandler_socket */
fhandler_socket::fhandler_socket () fhandler_socket::fhandler_socket ()
: fhandler_base (), sun_path (NULL) : fhandler_base (), sun_path (NULL), has_been_closed (0)
{ {
set_need_fork_fixup (); set_need_fork_fixup ();
prot_info_ptr = (LPWSAPROTOCOL_INFOA) cmalloc (HEAP_BUF, prot_info_ptr = (LPWSAPROTOCOL_INFOA) cmalloc (HEAP_BUF,
@ -726,19 +726,28 @@ fhandler_socket::recvfrom (void *ptr, size_t len, int flags,
{ {
WSABUF wsabuf = { len, (char *) ptr }; WSABUF wsabuf = { len, (char *) ptr };
if (is_nonblocking ()) if (is_nonblocking () || has_been_closed)
res = WSARecvFrom (get_socket (), &wsabuf, 1, &ret, (DWORD *) &flags, res = WSARecvFrom (get_socket (), &wsabuf, 1, &ret, (DWORD *) &flags,
from, fromlen, from, fromlen,
NULL, NULL); NULL, NULL);
else else
{ {
wsock_event wsock_evt; wsock_event wsock_evt;
res = WSARecvFrom (get_socket (), &wsabuf, 1, &ret, (DWORD *) &flags, long evt = (FD_CLOSE | ((flags & MSG_OOB) ? FD_OOB : FD_READ));
from, fromlen, if (wsock_evt.prepare (get_socket (), evt))
wsock_evt.prepare (), NULL); {
do
if (res == SOCKET_ERROR && WSAGetLastError () == WSA_IO_PENDING) {
ret = res = wsock_evt.wait (get_socket (), (DWORD *) &flags); if (!(res = wsock_evt.wait (get_socket (), has_been_closed)))
res = WSARecvFrom (get_socket (), &wsabuf, 1, &ret,
(DWORD *) &flags, from, fromlen,
NULL, NULL);
}
while (res == SOCKET_ERROR
&& WSAGetLastError () == WSAEWOULDBLOCK
&& !has_been_closed);
wsock_evt.release (get_socket ());
}
} }
} }
@ -844,7 +853,7 @@ fhandler_socket::recvmsg (struct msghdr *msg, int flags, ssize_t tot)
DWORD ret; DWORD ret;
if (is_nonblocking ()) if (is_nonblocking () || has_been_closed)
res = WSARecvFrom (get_socket (), res = WSARecvFrom (get_socket (),
wsabuf, iovcnt, &ret, (DWORD *) &flags, wsabuf, iovcnt, &ret, (DWORD *) &flags,
from, fromlen, from, fromlen,
@ -852,13 +861,21 @@ fhandler_socket::recvmsg (struct msghdr *msg, int flags, ssize_t tot)
else else
{ {
wsock_event wsock_evt; wsock_event wsock_evt;
res = WSARecvFrom (get_socket (), long evt = (FD_CLOSE | ((flags & MSG_OOB) ? FD_OOB : FD_READ));
wsabuf, iovcnt, &ret, (DWORD *) &flags, if (wsock_evt.prepare (get_socket (), evt))
from, fromlen, {
wsock_evt.prepare (), NULL); do
{
if (res == SOCKET_ERROR && WSAGetLastError () == WSA_IO_PENDING) if (!(res = wsock_evt.wait (get_socket (), has_been_closed)))
ret = res = wsock_evt.wait (get_socket (), (DWORD *) &flags); res = WSARecvFrom (get_socket (), wsabuf, iovcnt, &ret,
(DWORD *) &flags, from, fromlen,
NULL, NULL);
}
while (res == SOCKET_ERROR
&& WSAGetLastError () == WSAEWOULDBLOCK
&& !has_been_closed);
wsock_evt.release (get_socket ());
}
} }
if (res == SOCKET_ERROR) if (res == SOCKET_ERROR)
@ -915,7 +932,7 @@ fhandler_socket::sendto (const void *ptr, size_t len, int flags,
{ {
WSABUF wsabuf = { len, (char *) ptr }; WSABUF wsabuf = { len, (char *) ptr };
if (is_nonblocking ()) if (is_nonblocking () || has_been_closed)
res = WSASendTo (get_socket (), &wsabuf, 1, &ret, res = WSASendTo (get_socket (), &wsabuf, 1, &ret,
flags & MSG_WINMASK, flags & MSG_WINMASK,
(to ? (const struct sockaddr *) &sin : NULL), tolen, (to ? (const struct sockaddr *) &sin : NULL), tolen,
@ -923,13 +940,26 @@ fhandler_socket::sendto (const void *ptr, size_t len, int flags,
else else
{ {
wsock_event wsock_evt; wsock_event wsock_evt;
if (wsock_evt.prepare (get_socket (), FD_CLOSE | FD_WRITE))
{
do
{
res = WSASendTo (get_socket (), &wsabuf, 1, &ret, res = WSASendTo (get_socket (), &wsabuf, 1, &ret,
flags & MSG_WINMASK, flags & MSG_WINMASK,
(to ? (const struct sockaddr *) &sin : NULL), tolen, (to ? (const struct sockaddr *) &sin : NULL),
wsock_evt.prepare (), NULL); tolen, NULL, NULL);
if (res != SOCKET_ERROR
if (res == SOCKET_ERROR && WSAGetLastError () == WSA_IO_PENDING) || WSAGetLastError () != WSAEWOULDBLOCK)
ret = res = wsock_evt.wait (get_socket (), (DWORD *) &flags); break;
if (ret > 0)
{
res = 0;
break;
}
}
while (!(res = wsock_evt.wait (get_socket (), has_been_closed)));
wsock_evt.release (get_socket ());
}
} }
} }
@ -1041,7 +1071,7 @@ fhandler_socket::sendmsg (const struct msghdr *msg, int flags, ssize_t tot)
DWORD ret; DWORD ret;
if (is_nonblocking ()) if (is_nonblocking () || has_been_closed)
res = WSASendTo (get_socket (), wsabuf, iovcnt, &ret, flags, res = WSASendTo (get_socket (), wsabuf, iovcnt, &ret, flags,
(struct sockaddr *) msg->msg_name, (struct sockaddr *) msg->msg_name,
msg->msg_namelen, msg->msg_namelen,
@ -1049,13 +1079,25 @@ fhandler_socket::sendmsg (const struct msghdr *msg, int flags, ssize_t tot)
else else
{ {
wsock_event wsock_evt; wsock_event wsock_evt;
res = WSASendTo (get_socket (), wsabuf, iovcnt, &ret, flags, if (wsock_evt.prepare (get_socket (), FD_CLOSE | FD_WRITE))
(struct sockaddr *) msg->msg_name, {
msg->msg_namelen, do
wsock_evt.prepare (), NULL); {
res = WSASendTo (get_socket (), wsabuf, iovcnt, &ret,
if (res == SOCKET_ERROR && WSAGetLastError () == WSA_IO_PENDING) flags, (struct sockaddr *) msg->msg_name,
ret = res = wsock_evt.wait (get_socket (), (DWORD *) &flags); msg->msg_namelen, NULL, NULL);
if (res != SOCKET_ERROR
|| WSAGetLastError () != WSAEWOULDBLOCK)
break;
if (ret > 0)
{
res = 0;
break;
}
}
while (!(res = wsock_evt.wait (get_socket (), has_been_closed)));
wsock_evt.release (get_socket ());
}
} }
if (res == SOCKET_ERROR) if (res == SOCKET_ERROR)

View File

@ -50,58 +50,75 @@ extern "C"
int sscanf (const char *, const char *, ...); int sscanf (const char *, const char *, ...);
} /* End of "C" section */ } /* End of "C" section */
LPWSAOVERLAPPED bool
wsock_event::prepare () wsock_event::prepare (int sock, long event_mask)
{ {
LPWSAOVERLAPPED ret = NULL;
SetLastError (0); SetLastError (0);
if ((event = WSACreateEvent ()) != WSA_INVALID_EVENT) if ((event = WSACreateEvent ()) != WSA_INVALID_EVENT
&& WSAEventSelect (sock, event, event_mask) == SOCKET_ERROR)
{ {
memset (&ovr, 0, sizeof ovr); debug_printf ("WSAEventSelect: %E");
ovr.hEvent = event; WSACloseEvent (event);
ret = &ovr; event = WSA_INVALID_EVENT;
} }
else if (GetLastError () == ERROR_PROC_NOT_FOUND) /* winsock2 not available */ return event != WSA_INVALID_EVENT;
WSASetLastError (0);
debug_printf ("%d = wsock_event::prepare ()", ret);
return ret;
} }
int int
wsock_event::wait (int socket, LPDWORD flags) wsock_event::wait (int sock, int &closed)
{ {
int ret = -1; int ret = -1;
DWORD wsa_err = 0;
WSAEVENT ev[2] = { event, signal_arrived }; WSAEVENT ev[2] = { event, signal_arrived };
DWORD len;
switch (WSAWaitForMultipleEvents (2, ev, FALSE, WSA_INFINITE, FALSE)) switch (WSAWaitForMultipleEvents (2, ev, FALSE, WSA_INFINITE, FALSE))
{ {
case WSA_WAIT_EVENT_0: case WSA_WAIT_EVENT_0:
if (WSAGetOverlappedResult (socket, &ovr, &len, FALSE, flags)) WSANETWORKEVENTS evts;
ret = (int) len; memset (&evts, 0, sizeof evts);
WSAEnumNetworkEvents (sock, event, &evts);
if (evts.lNetworkEvents & FD_READ)
{
if (evts.iErrorCode[FD_READ_BIT])
wsa_err = evts.iErrorCode[FD_READ_BIT];
else
ret = 0;
}
else if (evts.lNetworkEvents & FD_WRITE)
{
if (evts.iErrorCode[FD_WRITE_BIT])
wsa_err = evts.iErrorCode[FD_WRITE_BIT];
else
ret = 0;
}
if (evts.lNetworkEvents & FD_CLOSE)
{
closed = 1;
if (!wsa_err && evts.iErrorCode[FD_CLOSE_BIT])
wsa_err = evts.iErrorCode[FD_CLOSE_BIT];
else
ret = 0;
}
if (wsa_err)
WSASetLastError (wsa_err);
break; break;
case WSA_WAIT_EVENT_0 + 1: case WSA_WAIT_EVENT_0 + 1:
if (!CancelIo ((HANDLE) socket))
{
debug_printf ("CancelIo() %E, fallback to blocking io");
WSAGetOverlappedResult (socket, &ovr, &len, TRUE, flags);
}
else
WSASetLastError (WSAEINTR); WSASetLastError (WSAEINTR);
break; break;
case WSA_WAIT_FAILED: default:
break;
default: /* Should be impossible. *LOL* */
WSASetLastError (WSAEFAULT); WSASetLastError (WSAEFAULT);
break;
} }
WSACloseEvent (event);
event = NULL;
return ret; return ret;
} }
void
wsock_event::release (int sock)
{
WSAEventSelect (sock, event, 0);
WSACloseEvent (event);
unsigned long non_block = 0;
ioctlsocket (sock, FIONBIO, &non_block);
}
WSADATA wsadata; WSADATA wsadata;
static fhandler_socket * static fhandler_socket *

View File

@ -14,7 +14,6 @@ details. */
class wsock_event class wsock_event
{ {
WSAEVENT event; WSAEVENT event;
WSAOVERLAPPED ovr;
public: public:
wsock_event () : event (NULL) {}; wsock_event () : event (NULL) {};
~wsock_event () ~wsock_event ()
@ -25,8 +24,9 @@ public:
}; };
/* The methods are implemented in net.cc */ /* The methods are implemented in net.cc */
LPWSAOVERLAPPED prepare (); bool prepare (int sock, long event_mask);
int wait (int socket, LPDWORD flags); int wait (int sock, int &closed);
void release (int sock);
}; };
#endif /* __WSOCK_EVENT_H__ */ #endif /* __WSOCK_EVENT_H__ */