*** ../ice/src/IceUtil/Cond.cpp Fri Jun 1 13:51:49 2007 --- src/IceUtil/Cond.cpp Fri Jun 1 13:55:44 2007 *************** *** 60,65 **** --- 60,96 ---- } } + // + // The _queue semaphore is used to wait for the condition variable to + // be signaled. When signal is called any further thread signaling or + // threads waiting to wait (in preWait) are blocked from proceeding + // (using an additional _gate semaphore) until the correct number of + // threads waiting on the _queue semaphore drain through postWait. + // + // As each thread drains through postWait if there are further threads + // to unblock (_toUnblock > 0) the _queue is posted again to wake a + // further waiting thread, otherwise the _gate is posted which permits + // any signaling or preWait threads to continue. Therefore, the _gate + // semaphore is used to protect further entry into signal or wait until + // all signaled threads have woken. + // + // _blocked is the number of waiting threads. _unblocked is the + // number of threads which have unblocked. We use two variables + // because _blocked is protected by the _gate, whereas _unblocked is + // protected by the _internal mutex. There is an assumption here about + // memory visibility since postWait does not itself acquire the _gate + // semaphore (note that the _gate must be held if _toUnblock != 0). + // + // _toUnblock is a tri-state variable. 0 is no signal/broadcast + // pending. -1 is signal pending. 1 is a broadcast pending. + // + // Threads timing out present a particular issue because they may have + // woken without a corresponding notification and it's easy to leave + // the _queue in a state where a spurious wakeup will occur -- + // consider a notify and a timed wake occurring at the same time. In + // this case, if we are not careful the _queue will have been posted, + // but the waking thread may not consume the semaphore. 
+ // IceUtil::Cond::Cond() : _gate(1), _blocked(0), *************** *** 88,160 **** IceUtil::Cond::wake(bool broadcast) { // ! // Lock gate & mutex. // _gate.wait(); - _internal.lock(); if(_unblocked != 0) { _blocked -= _unblocked; _unblocked = 0; } if(_blocked > 0) { // ! // Unblock some number of waiters. // ! _toUnblock = (broadcast) ? _blocked : 1; ! _internal.unlock(); _queue.post(); } else { // ! // Otherwise no blocked waiters, release gate & mutex. // _gate.post(); - _internal.unlock(); } } void IceUtil::Cond::preWait() const { _gate.wait(); _blocked++; _gate.post(); } void ! IceUtil::Cond::postWait(bool timedOut) const { ! _internal.lock(); _unblocked++; ! if(_toUnblock != 0) { ! bool last = --_toUnblock == 0; ! _internal.unlock(); ! ! if(timedOut) { _queue.wait(); } ! ! if(last) { _gate.post(); } ! else { _queue.post(); } } - else - { - _internal.unlock(); - } } void --- 119,262 ---- IceUtil::Cond::wake(bool broadcast) { // ! // Lock gate. The gate will be locked if there are threads waiting ! // to drain from postWait. // _gate.wait(); + // + // Lock the internal mutex. + // + IceUtil::Mutex::Lock sync(_internal); + + // + // Adjust the count of the number of waiting/blocked threads. + // if(_unblocked != 0) { _blocked -= _unblocked; _unblocked = 0; } + // + // If there are waiting threads then we enter a signal or + // broadcast state. + // if(_blocked > 0) { // ! // Unblock some number of waiters. We use -1 for the signal ! // case. ! // ! assert(_toUnblock == 0); ! _toUnblock = (broadcast) ? 1 : -1; ! // ! // Posting the queue wakes a single waiting thread. After this ! // occurs the waiting thread will wake and then either post on ! // the _queue to wake the next waiting thread, or post on the ! // gate to permit more signaling to proceed. // ! // Release before posting to avoid potential immediate ! // context switch due to the mutex being locked. ! // ! sync.release(); _queue.post(); } else { // ! // Otherwise no blocked waiters, release the gate. 
! // ! // Release before posting to avoid potential immediate ! // context switch due to the mutex being locked. // + sync.release(); _gate.post(); } } void IceUtil::Cond::preWait() const { + // + // _gate is used to protect _blocked. Furthermore, this prevents + // further threads from entering the wait state while a + // signal/broadcast is being processed. + // _gate.wait(); _blocked++; _gate.post(); } void ! IceUtil::Cond::postWait(bool timedOutOrFailed) const { ! IceUtil::Mutex::Lock sync(_internal); ! ! // ! // One more thread has unblocked. ! // _unblocked++; ! // ! // If _toUnblock is 0 then this must be a timeout, otherwise it's a ! // spurious wakeup which is incorrect. ! // ! if(_toUnblock == 0) ! { ! assert(timedOutOrFailed); ! return; ! } ! ! if(timedOutOrFailed) { ! // ! // If the thread was the last blocked thread and there's a ! // pending signal/broadcast, reset the signal/broadcast to ! // prevent spurious wakeup. ! // ! if(_blocked == _unblocked) { + _toUnblock = 0; + // + // Consume the queue post to prevent spurious wakeup. Note + // that although the internal mutex could be released + // prior to this wait() call, doing so gains nothing since + // this wait() MUST return immediately (if it does not + // there is a major bug and the entire application will + // deadlock). + // _queue.wait(); + // + // Release before posting to avoid potential immediate + // context switch due to the mutex being locked. + // + sync.release(); + _gate.post(); } ! } ! else ! { ! // ! // At this point, the thread must have been woken up because ! // of a signal/broadcast. ! // ! if(_toUnblock == -1 || _blocked == _unblocked) // Signal or no more blocked threads { + _toUnblock = 0; + // Release before posting to avoid potential immediate + // context switch due to the mutex being locked. + sync.release(); _gate.post(); } ! else // Broadcast and more blocked threads to wake up. 
{ + // Release before posting to avoid potential immediate + // context switch due to the mutex being locked. + sync.release(); _queue.post(); } } } void *************** *** 167,173 **** } catch(...) { ! postWait(false); throw; } } --- 269,275 ---- } catch(...) { ! postWait(true); throw; } } *************** *** 183,189 **** } catch(...) { ! postWait(false); throw; } } --- 285,291 ---- } catch(...) { ! postWait(true); throw; } }