cygserver: raise number of worker threads on demand
The number of threads in the worker pool is fixed so far. This is a problem in XSI IPC scenarions with an unknown number of consumers. It doesn't make sense to make the pool very big for a start, but when the need arises, we need to make sure we can serve the request even if all other worker threads are in a wait state. This patch changes threaded_queue to just add another worker thread if all current workers are busy. Signed-off-by: Corinna Vinschen <corinna@vinschen.de>
This commit is contained in:
parent
838eaf6674
commit
0b73dba4de
@ -32,6 +32,7 @@ queue_request::~queue_request ()
|
|||||||
|
|
||||||
threaded_queue::threaded_queue (const size_t initial_workers)
|
threaded_queue::threaded_queue (const size_t initial_workers)
|
||||||
: _workers_count (0),
|
: _workers_count (0),
|
||||||
|
_workers_busy (0),
|
||||||
_running (false),
|
_running (false),
|
||||||
_submitters_head (NULL),
|
_submitters_head (NULL),
|
||||||
_requests_count (0),
|
_requests_count (0),
|
||||||
@ -161,12 +162,6 @@ threaded_queue::add (queue_request *const therequest)
|
|||||||
assert (therequest);
|
assert (therequest);
|
||||||
assert (!therequest->_next);
|
assert (!therequest->_next);
|
||||||
|
|
||||||
if (!_workers_count)
|
|
||||||
{
|
|
||||||
system_printf ("warning: no worker threads to handle request!");
|
|
||||||
// FIXME: And then what?
|
|
||||||
}
|
|
||||||
|
|
||||||
EnterCriticalSection (&_queue_lock);
|
EnterCriticalSection (&_queue_lock);
|
||||||
if (!_requests_head)
|
if (!_requests_head)
|
||||||
_requests_head = therequest;
|
_requests_head = therequest;
|
||||||
@ -186,6 +181,12 @@ threaded_queue::add (queue_request *const therequest)
|
|||||||
LeaveCriticalSection (&_queue_lock);
|
LeaveCriticalSection (&_queue_lock);
|
||||||
|
|
||||||
(void) ReleaseSemaphore (_requests_sem, 1, NULL);
|
(void) ReleaseSemaphore (_requests_sem, 1, NULL);
|
||||||
|
|
||||||
|
if (_workers_busy >= _workers_count)
|
||||||
|
{
|
||||||
|
create_workers (1);
|
||||||
|
system_printf ("All threads busy, added one (now %u)", _workers_count);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/*static*/ DWORD WINAPI
|
/*static*/ DWORD WINAPI
|
||||||
@ -205,17 +206,12 @@ threaded_queue::start_routine (const LPVOID lpParam)
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Called from the constructor: so no need to be thread-safe until the
|
|
||||||
* worker threads start to be created; thus the interlocked increment
|
|
||||||
* of the `_workers_count' field.
|
|
||||||
*/
|
|
||||||
|
|
||||||
void
|
void
|
||||||
threaded_queue::create_workers (const size_t initial_workers)
|
threaded_queue::create_workers (const size_t initial_workers)
|
||||||
{
|
{
|
||||||
assert (initial_workers > 0);
|
assert (initial_workers > 0);
|
||||||
|
|
||||||
for (unsigned int i = 0; i != initial_workers; i++)
|
for (unsigned int i = 0; i < initial_workers; i++)
|
||||||
{
|
{
|
||||||
const long count = InterlockedIncrement (&_workers_count);
|
const long count = InterlockedIncrement (&_workers_count);
|
||||||
assert (count > 0);
|
assert (count > 0);
|
||||||
@ -265,7 +261,9 @@ threaded_queue::worker_loop ()
|
|||||||
LeaveCriticalSection (&_queue_lock);
|
LeaveCriticalSection (&_queue_lock);
|
||||||
|
|
||||||
assert (reqptr);
|
assert (reqptr);
|
||||||
|
InterlockedIncrement (&_workers_busy);
|
||||||
reqptr->process ();
|
reqptr->process ();
|
||||||
|
InterlockedDecrement (&_workers_busy);
|
||||||
delete reqptr;
|
delete reqptr;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -49,6 +49,7 @@ public:
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
LONG _workers_count;
|
LONG _workers_count;
|
||||||
|
LONG _workers_busy;
|
||||||
bool _running;
|
bool _running;
|
||||||
|
|
||||||
queue_submission_loop *_submitters_head;
|
queue_submission_loop *_submitters_head;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user