* thread.h (pthread_cond::ExitingWait): Remove.

(pthread_cond::mutex): Ditto.
(pthread_cond::cond_access): Ditto.
(pthread_cond::win32_obj_id): Ditto.
(pthread_cond::TimedWait): Ditto.
(pthread_cond::BroadCast): Ditto.
(pthread_cond::Signal): Ditto.
(pthread_cond::waiting): Change type to unsigned long.
(pthread_cond::pending): New member.
(pthread_cond::semWait): Ditto.
(pthread_cond::mtxIn): Ditto.
(pthread_cond::mtxOut): Ditto.
(pthread_cond::mtxCond): Ditto.
(pthread_cond::UnBlock): New method.
(pthread_cond::Wait): Ditto.
* thread.cc: Update list of cancellation points.
(pthread_cond::pthread_cond): Rewrite.
(pthread_cond::~pthread_cond): Ditto.
(pthread_cond::TimedWait): Remove.
(pthread_cond::BroadCast): Ditto.
(pthread_cond::Signal): Ditto.
(pthread_cond::UnBlock): Implement.
(pthread_cond::Wait): Ditto.
(pthread_cond::fixup_after_fork): Rewrite.
(pthread_mutex::fixup_after_fork): Remove DETECT_BAD_APP
conditional.
(__pthread_cond_broadcast): Just return 0 if the condition is
not initialized. Call pthread_cond::UnBlock to release blocked
threads.
(__pthread_cond_signal): Ditto.
(__pthread_cond__dowait): Rewrite.
(pthread_cond_timedwait): Add pthread_testcancel call. Fix
waitlength calculation.
(pthread_cond_wait): Add pthread_testcancel call.
This commit is contained in:
Thomas Pfaff 2003-03-18 19:49:38 +00:00
parent 0bad7c2e26
commit f592b05df1
3 changed files with 215 additions and 198 deletions

View File

@ -1,3 +1,40 @@
2003-03-18 Thomas Pfaff <tpfaff@gmx.net>
* thread.h (pthread_cond::ExitingWait): Remove.
(pthread_cond::mutex): Ditto.
(pthread_cond::cond_access): Ditto.
(pthread_cond::win32_obj_id): Ditto.
(pthread_cond::TimedWait): Ditto.
(pthread_cond::BroadCast): Ditto.
(pthread_cond::Signal): Ditto.
(pthread_cond::waiting): Change type to unsigned long.
(pthread_cond::pending): New member.
(pthread_cond::semWait): Ditto.
(pthread_cond::mtxIn): Ditto.
(pthread_cond::mtxOut): Ditto.
(pthread_cond::mtxCond): Ditto.
(pthread_cond::UnBlock): New method.
(pthread_cond::Wait): Ditto.
* thread.cc: Update list of cancellation points.
(pthread_cond::pthread_cond): Rewrite.
(pthread_cond::~pthread_cond): Ditto.
(pthread_cond::TimedWait): Remove.
(pthread_cond::BroadCast): Ditto.
(pthread_cond::Signal): Ditto.
(pthread_cond::UnBlock): Implement.
(pthread_cond::Wait): Ditto.
(pthread_cond::fixup_after_fork): Rewrite.
(pthread_mutex::fixup_after_fork): Remove DETECT_BAD_APP
conditional.
(__pthread_cond_broadcast): Just return 0 if the condition is
not initialized. Call pthread_cond::UnBlock to release blocked
threads.
(__pthread_cond_signal): Ditto.
(__pthread_cond__dowait): Rewrite.
(pthread_cond_timedwait): Add pthread_testcancel call. Fix
waitlength calculation.
(pthread_cond_wait): Add pthread_testcancel call.
2003-03-18 Thomas Pfaff <tpfaff@gmx.net> 2003-03-18 Thomas Pfaff <tpfaff@gmx.net>
* include/pthread.h (PTHREAD_MUTEX_NORMAL): New define. * include/pthread.h (PTHREAD_MUTEX_NORMAL): New define.

View File

@ -462,8 +462,8 @@ open ()
*pause () *pause ()
poll () poll ()
pread () pread ()
pthread_cond_timedwait () *pthread_cond_timedwait ()
pthread_cond_wait () *pthread_cond_wait ()
*pthread_join () *pthread_join ()
*pthread_testcancel () *pthread_testcancel ()
putmsg () putmsg ()
@ -812,36 +812,57 @@ pthread_cond::initMutex ()
api_fatal ("Could not create win32 Mutex for pthread cond static initializer support."); api_fatal ("Could not create win32 Mutex for pthread cond static initializer support.");
} }
pthread_cond::pthread_cond (pthread_condattr *attr):verifyable_object (PTHREAD_COND_MAGIC) pthread_cond::pthread_cond (pthread_condattr *attr) :
verifyable_object (PTHREAD_COND_MAGIC),
shared (0), waiting (0), pending (0), semWait (NULL),
mtxCond(NULL), next (NULL)
{ {
int temperr; pthread_mutex *verifyable_mutex_obj;
this->shared = attr ? attr->shared : PTHREAD_PROCESS_PRIVATE;
this->mutex = NULL;
this->waiting = 0;
this->win32_obj_id = ::CreateEvent (&sec_none_nih, false, /* auto signal reset - which I think is pthreads like ? */ if (attr)
false, /* start non signaled */ if (attr->shared != PTHREAD_PROCESS_PRIVATE)
NULL /* no name */); {
/* TODO: make a shared mem mutex if out attributes request shared mem cond */ magic = 0;
cond_access = NULL; return;
if ((temperr = pthread_mutex_init (&this->cond_access, NULL))) }
verifyable_mutex_obj = &mtxIn;
if (!pthread_mutex::isGoodObject (&verifyable_mutex_obj))
{ {
system_printf ("couldn't init mutex, this %p errno %d", this, temperr); thread_printf ("Internal cond mutex is not valid. this %p", this);
/* we need the mutex for correct behaviour */
magic = 0; magic = 0;
return;
}
/* Change the mutex type to NORMAL to speed up mutex operations */
mtxIn.type = PTHREAD_MUTEX_NORMAL;
verifyable_mutex_obj = &mtxOut;
if (!pthread_mutex::isGoodObject (&verifyable_mutex_obj))
{
thread_printf ("Internal cond mutex is not valid. this %p", this);
magic = 0;
return;
}
/* Change the mutex type to NORMAL to speed up mutex operations */
mtxOut.type = PTHREAD_MUTEX_NORMAL;
semWait = ::CreateSemaphore (&sec_none_nih, 0, LONG_MAX, NULL);
if (!semWait)
{
debug_printf ("CreateSemaphore failed. %E");
magic = 0;
return;
} }
if (!this->win32_obj_id)
magic = 0;
/* threadsafe addition is easy */ /* threadsafe addition is easy */
next = (pthread_cond *) InterlockedExchangePointer (&MT_INTERFACE->conds, this); next = (pthread_cond *) InterlockedExchangePointer (&MT_INTERFACE->conds, this);
} }
pthread_cond::~pthread_cond () pthread_cond::~pthread_cond ()
{ {
if (win32_obj_id) if (semWait)
CloseHandle (win32_obj_id); CloseHandle (semWait);
pthread_mutex_destroy (&cond_access);
/* I'm not 100% sure the next bit is threadsafe. I think it is... */ /* I'm not 100% sure the next bit is threadsafe. I think it is... */
if (MT_INTERFACE->conds == this) if (MT_INTERFACE->conds == this)
InterlockedExchangePointer (&MT_INTERFACE->conds, this->next); InterlockedExchangePointer (&MT_INTERFACE->conds, this->next);
@ -856,132 +877,125 @@ pthread_cond::~pthread_cond ()
} }
void void
pthread_cond::BroadCast () pthread_cond::UnBlock (const bool all)
{ {
/* TODO: implement the same race fix as Signal has */ unsigned long releaseable;
if (pthread_mutex_lock (&cond_access))
system_printf ("Failed to lock condition variable access mutex, this %p", this);
int count = waiting;
if (!pthread_mutex::isGoodObject (&mutex))
{
if (pthread_mutex_unlock (&cond_access))
system_printf ("Failed to unlock condition variable access mutex, this %p", this);
/* This isn't and API error - users are allowed to call this when no threads
are waiting
system_printf ("Broadcast called with invalid mutex");
*/
return;
}
while (count--)
PulseEvent (win32_obj_id);
if (pthread_mutex_unlock (&cond_access))
system_printf ("Failed to unlock condition variable access mutex, this %p", this);
}
void /*
pthread_cond::Signal () * Block outgoing threads (and avoid simultanous unblocks)
{
if (pthread_mutex_lock (&cond_access))
system_printf ("Failed to lock condition variable access mutex, this %p", this);
if (!pthread_mutex::isGoodObject (&mutex))
{
if (pthread_mutex_unlock (&cond_access))
system_printf ("Failed to unlock condition variable access mutex, this %p",
this);
return;
}
int temp = waiting;
if (!temp)
/* nothing to signal */
{
if (pthread_mutex_unlock (&cond_access))
system_printf ("Failed to unlock condition variable access mutex, this %p", this);
return;
}
/* Prime the detection flag */
ExitingWait = 1;
/* Signal any waiting thread */
PulseEvent (win32_obj_id);
/* No one can start waiting until we release the condition access mutex */
/* The released thread will decrement waiting when it gets a time slice...
without waiting for the access mutex
* InterLockedIncrement on 98 +, NT4 + returns the incremented value.
* On 95, nt 3.51 < it returns a sign correct number - 0=0, + for greater than 0, -
* for less than 0.
* Because of this we cannot spin on the waiting count, but rather we need a
* dedicated flag for a thread exiting the Wait function.
* Also not that Interlocked* sync CPU caches with memory.
*/ */
int spins = 10; mtxOut.Lock ();
/* When ExitingWait is nonzero after a decrement, the leaving thread has
* done it's thing releaseable = waiting - pending;
*/ if (releaseable)
while (InterlockedDecrement (&ExitingWait) == 0 && spins)
{ {
InterlockedIncrement (&ExitingWait); unsigned long released;
/* give up the cpu to force a context switch. */
low_priority_sleep (0); if (!pending)
if (spins == 5) {
/* we've had 5 timeslices, and the woken thread still hasn't done it's /*
* thing - maybe we raced it with the event? */ * Block incoming threads until all waiting threads are released.
PulseEvent (win32_obj_id); */
spins--; mtxIn.Lock ();
/*
* Calculate releaseable again because threads can enter until
* the semaphore has been taken, but they can not leave, therefore pending
* is unchanged and releaseable can only get higher
*/
releaseable = waiting - pending;
}
released = all ? releaseable : 1;
pending += released;
/*
* Signal threads
*/
::ReleaseSemaphore (semWait, released, NULL);
} }
if (waiting + 1 != temp)
system_printf ("Released too many threads - %d now %d originally", waiting, temp); /*
if (pthread_mutex_unlock (&cond_access)) * And let the threads release.
system_printf ("Failed to unlock condition variable access mutex, this %p", this); */
mtxOut.UnLock ();
} }
int int
pthread_cond::TimedWait (DWORD dwMilliseconds) pthread_cond::Wait (pthread_mutex_t mutex, DWORD dwMilliseconds)
{ {
DWORD rv; DWORD rv;
// FIXME: race condition (potentially drop events mtxIn.Lock ();
// Possible solution (single process only) - place this in a critical section. if (1 == InterlockedIncrement ((long *)&waiting))
mutex->UnLock (); mtxCond = mutex;
rv = WaitForSingleObject (win32_obj_id, dwMilliseconds); else if (mtxCond != mutex)
#if 0
/* we need to use native win32 mutex's here, because the cygwin ones now use
* critical sections, which are faster, but introduce a race _here_. Until then
* The NT variant of the code is redundant.
*/
rv = SignalObjectAndWait (mutex->win32_obj_id, win32_obj_id, dwMilliseconds,
false);
#endif
switch (rv)
{ {
case WAIT_FAILED: InterlockedDecrement ((long *)&waiting);
return 0; /* POSIX doesn't allow errors after we modify the mutex state */ mtxIn.UnLock ();
case WAIT_ABANDONED: return EINVAL;
case WAIT_TIMEOUT:
return ETIMEDOUT;
case WAIT_OBJECT_0:
return 0; /* we have been signaled */
default:
return 0;
} }
mtxIn.UnLock ();
/*
* Release the mutex and wait on semaphore
*/
++mutex->condwaits;
mutex->UnLock ();
rv = pthread::cancelable_wait (semWait, dwMilliseconds, false);
mtxOut.Lock ();
if (rv != WAIT_OBJECT_0)
{
/*
* It might happen that a signal is sent while the thread got canceled
* or timed out. Try to take one.
* If the thread gets one than a signal|broadcast is in progress.
*/
if (WAIT_OBJECT_0 == WaitForSingleObject (semWait, 0))
/*
* thread got cancelled ot timed out while a signalling is in progress.
* Set wait result back to signaled
*/
rv = WAIT_OBJECT_0;
}
InterlockedDecrement ((long *)&waiting);
if (rv == WAIT_OBJECT_0 && 0 == --pending)
/*
* All signaled threads are released,
* new threads can enter Wait
*/
mtxIn.UnLock ();
mtxOut.UnLock ();
mutex->Lock ();
--mutex->condwaits;
if (rv == WAIT_CANCELED)
pthread::static_cancel_self ();
else if (rv == WAIT_TIMEOUT)
return ETIMEDOUT;
return 0;
} }
void void
pthread_cond::fixup_after_fork () pthread_cond::fixup_after_fork ()
{ {
debug_printf ("cond %x in fixup_after_fork", this); waiting = pending = 0;
if (shared != PTHREAD_PROCESS_PRIVATE) mtxCond = NULL;
api_fatal ("doesn't understand PROCESS_SHARED condition variables");
/* FIXME: duplicate code here and in the constructor. */ /* Unlock eventually locked mutexes */
this->win32_obj_id = ::CreateEvent (&sec_none_nih, false, false, NULL); mtxIn.UnLock ();
if (!win32_obj_id) mtxOut.UnLock ();
api_fatal ("failed to create new win32 mutex");
#if DETECT_BAD_APPS semWait = ::CreateSemaphore (&sec_none_nih, 0, LONG_MAX, NULL);
if (waiting) if (!semWait)
api_fatal ("Forked () while a condition variable has waiting threads.\nReport to cygwin@cygwin.com"); api_fatal ("pthread_cond::fixup_after_fork () failed to recreate win32 semaphore");
#else
waiting = 0;
mutex = NULL;
#endif
} }
/* pthread_key */ /* pthread_key */
@ -1325,12 +1339,7 @@ pthread_mutex::fixup_after_fork ()
if (!win32_obj_id) if (!win32_obj_id)
api_fatal ("pthread_mutex::fixup_after_fork () failed to recreate win32 semaphore for mutex"); api_fatal ("pthread_mutex::fixup_after_fork () failed to recreate win32 semaphore for mutex");
#if DETECT_BAD_APPS
if (condwaits)
api_fatal ("Forked () while a mutex has condition variables waiting on it.\nReport to cygwin@cygwin.com");
#else
condwaits = 0; condwaits = 0;
#endif
} }
bool bool
@ -2168,11 +2177,11 @@ int
__pthread_cond_broadcast (pthread_cond_t *cond) __pthread_cond_broadcast (pthread_cond_t *cond)
{ {
if (pthread_cond::isGoodInitializer (cond)) if (pthread_cond::isGoodInitializer (cond))
pthread_cond::init (cond, NULL); return 0;
if (!pthread_cond::isGoodObject (cond)) if (!pthread_cond::isGoodObject (cond))
return EINVAL; return EINVAL;
(*cond)->BroadCast (); (*cond)->UnBlock (true);
return 0; return 0;
} }
@ -2181,82 +2190,47 @@ int
__pthread_cond_signal (pthread_cond_t *cond) __pthread_cond_signal (pthread_cond_t *cond)
{ {
if (pthread_cond::isGoodInitializer (cond)) if (pthread_cond::isGoodInitializer (cond))
pthread_cond::init (cond, NULL); return 0;
if (!pthread_cond::isGoodObject (cond)) if (!pthread_cond::isGoodObject (cond))
return EINVAL; return EINVAL;
(*cond)->Signal (); (*cond)->UnBlock (false);
return 0; return 0;
} }
int static int
__pthread_cond_dowait (pthread_cond_t *cond, pthread_mutex_t *mutex, __pthread_cond_dowait (pthread_cond_t *cond, pthread_mutex_t *mutex,
long waitlength) DWORD waitlength)
{ {
// and yes cond_access here is still open to a race. (we increment, context swap, if (!pthread_mutex::isGoodObject (mutex))
// broadcast occurs - we miss the broadcast. the functions aren't split properly. return EINVAL;
int rv; if (!pthread_mutex::canBeUnlocked (mutex))
pthread_mutex **themutex = NULL; return EPERM;
if (pthread_mutex::isGoodInitializer (mutex))
pthread_mutex::init (mutex, NULL);
themutex = mutex;
if (pthread_cond::isGoodInitializer (cond)) if (pthread_cond::isGoodInitializer (cond))
pthread_cond::init (cond, NULL); pthread_cond::init (cond, NULL);
if (!pthread_mutex::isGoodObject (themutex))
return EINVAL;
if (!pthread_cond::isGoodObject (cond)) if (!pthread_cond::isGoodObject (cond))
return EINVAL; return EINVAL;
/* if the cond variable is blocked, then the above timer test maybe wrong. *shrug**/ return (*cond)->Wait (*mutex, waitlength);
if (pthread_mutex_lock (&(*cond)->cond_access))
system_printf ("Failed to lock condition variable access mutex, this %p", *cond);
if ((*cond)->waiting)
if ((*cond)->mutex && ((*cond)->mutex != (*themutex)))
{
if (pthread_mutex_unlock (&(*cond)->cond_access))
system_printf ("Failed to unlock condition variable access mutex, this %p", *cond);
return EINVAL;
}
InterlockedIncrement (&((*cond)->waiting));
(*cond)->mutex = (*themutex);
InterlockedIncrement (&((*themutex)->condwaits));
if (pthread_mutex_unlock (&(*cond)->cond_access))
system_printf ("Failed to unlock condition variable access mutex, this %p", *cond);
/* At this point calls to Signal will progress evebn if we aren' yet waiting
However, the loop there should allow us to get scheduled and call wait,
and have them call PulseEvent again if we dont' respond. */
rv = (*cond)->TimedWait (waitlength);
/* this may allow a race on the mutex acquisition and waits.
But doing this within the cond access mutex creates a different race */
InterlockedDecrement (&((*cond)->waiting));
/* Tell Signal that we have been released */
InterlockedDecrement (&((*cond)->ExitingWait));
(*themutex)->Lock ();
if (pthread_mutex_lock (&(*cond)->cond_access))
system_printf ("Failed to lock condition variable access mutex, this %p", *cond);
if ((*cond)->waiting == 0)
(*cond)->mutex = NULL;
InterlockedDecrement (&((*themutex)->condwaits));
if (pthread_mutex_unlock (&(*cond)->cond_access))
system_printf ("Failed to unlock condition variable access mutex, this %p", *cond);
return rv;
} }
extern "C" int extern "C" int
pthread_cond_timedwait (pthread_cond_t *cond, pthread_mutex_t *mutex, pthread_cond_timedwait (pthread_cond_t *cond, pthread_mutex_t *mutex,
const struct timespec *abstime) const struct timespec *abstime)
{ {
struct timeval tv;
long waitlength;
pthread_testcancel ();
if (check_valid_pointer (abstime)) if (check_valid_pointer (abstime))
return EINVAL; return EINVAL;
struct timeb currSysTime;
long waitlength; gettimeofday (&tv, NULL);
ftime (&currSysTime); waitlength = abstime->tv_sec * 1000 + abstime->tv_nsec / (1000 * 1000);
waitlength = (abstime->tv_sec - currSysTime.time) * 1000; waitlength -= tv.tv_sec * 1000 + tv.tv_usec / 1000;
if (waitlength < 0) if (waitlength < 0)
return ETIMEDOUT; return ETIMEDOUT;
return __pthread_cond_dowait (cond, mutex, waitlength); return __pthread_cond_dowait (cond, mutex, waitlength);
@ -2265,6 +2239,8 @@ pthread_cond_timedwait (pthread_cond_t *cond, pthread_mutex_t *mutex,
extern "C" int extern "C" int
pthread_cond_wait (pthread_cond_t *cond, pthread_mutex_t *mutex) pthread_cond_wait (pthread_cond_t *cond, pthread_mutex_t *mutex)
{ {
pthread_testcancel ();
return __pthread_cond_dowait (cond, mutex, INFINITE); return __pthread_cond_dowait (cond, mutex, INFINITE);
} }

View File

@ -494,16 +494,20 @@ public:
static int init (pthread_cond_t *, const pthread_condattr_t *); static int init (pthread_cond_t *, const pthread_condattr_t *);
int shared; int shared;
LONG waiting;
LONG ExitingWait; unsigned long waiting;
pthread_mutex *mutex; unsigned long pending;
/* to allow atomic behaviour for cond_broadcast */ HANDLE semWait;
pthread_mutex_t cond_access;
HANDLE win32_obj_id; pthread_mutex mtxIn;
pthread_mutex mtxOut;
pthread_mutex_t mtxCond;
class pthread_cond * next; class pthread_cond * next;
int TimedWait (DWORD dwMilliseconds);
void BroadCast (); void UnBlock (const bool all);
void Signal (); int Wait (pthread_mutex_t mutex, DWORD dwMilliseconds = INFINITE);
void fixup_after_fork (); void fixup_after_fork ();
pthread_cond (pthread_condattr *); pthread_cond (pthread_condattr *);