251 lines
6.1 KiB
C++
251 lines
6.1 KiB
C++
/* threaded_queue.cc
|
|
|
|
Copyright 2001, 2002 Red Hat Inc.
|
|
|
|
Written by Robert Collins <rbtcollins@hotmail.com>
|
|
|
|
This file is part of Cygwin.
|
|
|
|
This software is a copyrighted work licensed under the terms of the
|
|
Cygwin license. Please consult the file "CYGWIN_LICENSE" for
|
|
details. */
|
|
|
|
#include <errno.h>
|
|
#include <stdio.h>
|
|
#include <unistd.h>
|
|
#include <windows.h>
|
|
#include <sys/types.h>
|
|
#include <stdlib.h>
|
|
#include "wincap.h"
|
|
#include "threaded_queue.h"
|
|
#define DEBUG 1
|
|
#define debug_printf if (DEBUG) printf
|
|
|
|
/* threaded_queue */
|
|
|
|
DWORD WINAPI
|
|
worker_function (LPVOID LpParam)
|
|
{
|
|
class threaded_queue *queue = (class threaded_queue *) LpParam;
|
|
class queue_request *request;
|
|
/* FIXME use a threadsafe pop instead for speed? */
|
|
while (queue->active)
|
|
{
|
|
EnterCriticalSection (&queue->queuelock);
|
|
while (!queue->request && queue->active)
|
|
{
|
|
LeaveCriticalSection (&queue->queuelock);
|
|
DWORD rc = WaitForSingleObject (queue->event, INFINITE);
|
|
if (rc == WAIT_FAILED)
|
|
{
|
|
printf ("Wait for event failed\n");
|
|
queue->running--;
|
|
ExitThread (0);
|
|
}
|
|
EnterCriticalSection (&queue->queuelock);
|
|
}
|
|
if (!queue->active)
|
|
{
|
|
queue->running--;
|
|
LeaveCriticalSection (&queue->queuelock);
|
|
ExitThread (0);
|
|
}
|
|
/* not needed, but it is efficient */
|
|
request =
|
|
(class queue_request *) InterlockedExchangePointer (&queue->request,
|
|
queue->request->
|
|
next);
|
|
LeaveCriticalSection (&queue->queuelock);
|
|
request->process ();
|
|
delete request;
|
|
}
|
|
queue->running--;
|
|
ExitThread (0);
|
|
}
|
|
|
|
void
|
|
threaded_queue::create_workers ()
|
|
{
|
|
InitializeCriticalSection (&queuelock);
|
|
if ((event = CreateEvent (NULL, FALSE, FALSE, NULL)) == NULL)
|
|
{
|
|
printf ("Failed to create event queue (%lu), terminating\n",
|
|
GetLastError ());
|
|
exit (1);
|
|
}
|
|
active = true;
|
|
|
|
/* FIXME: Use a stack pair and create threads on the fly whenever
|
|
* we have to to service a request.
|
|
*/
|
|
for (unsigned int i = 0; i < initial_workers; i++)
|
|
{
|
|
HANDLE hThread;
|
|
DWORD tid;
|
|
hThread = CreateThread (NULL, 0, worker_function, this, 0, &tid);
|
|
if (hThread == NULL)
|
|
{
|
|
printf ("Failed to create thread (%lu), terminating\n",
|
|
GetLastError ());
|
|
exit (1);
|
|
}
|
|
CloseHandle (hThread);
|
|
running++;
|
|
}
|
|
}
|
|
|
|
void
|
|
threaded_queue::cleanup ()
|
|
{
|
|
/* harvest the threads */
|
|
active = false;
|
|
/* kill the request processing loops */
|
|
queue_process_param *reqloop;
|
|
/* make sure we don't race with a incoming request creation */
|
|
EnterCriticalSection (&queuelock);
|
|
reqloop =
|
|
(queue_process_param *) InterlockedExchangePointer (&process_head, NULL);
|
|
while (reqloop)
|
|
{
|
|
queue_process_param *t = reqloop;
|
|
reqloop = reqloop->next;
|
|
delete t;
|
|
}
|
|
LeaveCriticalSection (&queuelock);
|
|
if (!running)
|
|
return;
|
|
printf ("Waiting for current queue threads to terminate\n");
|
|
for (int n = running; n; n--)
|
|
PulseEvent (event);
|
|
while (running)
|
|
sleep (1);
|
|
DeleteCriticalSection (&queuelock);
|
|
CloseHandle (event);
|
|
}
|
|
|
|
/* FIXME: return success or failure */
|
|
void
|
|
threaded_queue::add (queue_request * therequest)
|
|
{
|
|
/* safe to not "Try" because workers don't hog this, they wait on the event
|
|
*/
|
|
EnterCriticalSection (&queuelock);
|
|
if (!running)
|
|
{
|
|
printf ("No worker threads to handle request!\n");
|
|
}
|
|
if (!request)
|
|
request = therequest;
|
|
else
|
|
{
|
|
/* add to the queue end. */
|
|
queue_request *listrequest = request;
|
|
while (listrequest->next)
|
|
listrequest = listrequest->next;
|
|
listrequest->next = therequest;
|
|
}
|
|
PulseEvent (event);
|
|
LeaveCriticalSection (&queuelock);
|
|
}
|
|
|
|
/* FIXME: return success or failure rather than quitting */
|
|
void
|
|
threaded_queue::process_requests (queue_process_param * params,
|
|
threaded_queue_thread_function *
|
|
request_loop)
|
|
{
|
|
if (params->start (request_loop, this) == false)
|
|
exit (1);
|
|
params->next =
|
|
(queue_process_param *) InterlockedExchangePointer (&process_head,
|
|
params);
|
|
}
|
|
|
|
/* queue_process_param */
|
|
/* How does a constructor return an error? */
|
|
queue_process_param::queue_process_param (bool ninterruptible):running (false), shutdown (false),
|
|
interruptible
|
|
(ninterruptible)
|
|
{
|
|
if (!interruptible)
|
|
return;
|
|
debug_printf ("creating an interruptible processing thread\n");
|
|
if ((interrupt = CreateEvent (NULL, FALSE, FALSE, NULL)) == NULL)
|
|
{
|
|
printf ("Failed to create interrupt event (%lu), terminating\n",
|
|
GetLastError ());
|
|
exit (1);
|
|
}
|
|
}
|
|
|
|
queue_process_param::~queue_process_param ()
|
|
{
|
|
if (running)
|
|
stop ();
|
|
if (!interruptible)
|
|
return;
|
|
CloseHandle (interrupt);
|
|
}
|
|
|
|
bool
|
|
queue_process_param::start (threaded_queue_thread_function * request_loop,
|
|
threaded_queue * thequeue)
|
|
{
|
|
queue = thequeue;
|
|
hThread = CreateThread (NULL, 0, request_loop, this, 0, &tid);
|
|
if (hThread)
|
|
{
|
|
running = true;
|
|
return true;
|
|
}
|
|
printf ("Failed to create thread (%lu), terminating\n", GetLastError ());
|
|
return false;
|
|
}
|
|
|
|
void
|
|
queue_process_param::stop ()
|
|
{
|
|
if (interruptible)
|
|
{
|
|
InterlockedExchange (&shutdown, true);
|
|
PulseEvent (interrupt);
|
|
/* Wait up to 50 ms for the thread to exit. If it doesn't _and_ we get
|
|
* scheduled again, we print an error and exit. We _should_ loop or
|
|
* try resignalling. We don't want to hand here though...
|
|
*/
|
|
int n = 5;
|
|
while (n-- && WaitForSingleObject (hThread, 1000) == WAIT_TIMEOUT);
|
|
if (!n)
|
|
{
|
|
printf ("Process thread didn't shutdown cleanly after 200ms!\n");
|
|
exit (1);
|
|
}
|
|
else
|
|
running = false;
|
|
}
|
|
else
|
|
{
|
|
printf ("killing request loop thread %ld\n", tid);
|
|
int rc;
|
|
if (!(rc = TerminateThread (hThread, 0)))
|
|
{
|
|
printf ("error shutting down request loop worker thread\n");
|
|
}
|
|
running = false;
|
|
}
|
|
CloseHandle (hThread);
|
|
}
|
|
|
|
/* queue_request */
|
|
queue_request::queue_request ():next (NULL)
|
|
{
|
|
}
|
|
|
|
void
|
|
queue_request::process (void)
|
|
{
|
|
printf ("\n**********************************************\n"
|
|
"Oh no! we've hit the base queue_request process() function, and this indicates a coding\n"
|
|
"fault !!!\n" "***********************************************\n");
|
|
}
|