diff --git a/src/pevents.cpp b/src/pevents.cpp index 857a626..70c1457 100644 --- a/src/pevents.cpp +++ b/src/pevents.cpp @@ -14,6 +14,7 @@ #include #include #include +#include #ifdef WFMO #include #include @@ -30,7 +31,7 @@ namespace neosmart { int RefCount; union { int FiredEvent; // WFSO - int EventsLeft; // WFMO + std::atomic EventsLeft; // WFMO } Status; bool WaitAll; bool StillWaiting; @@ -47,6 +48,7 @@ namespace neosmart { struct neosmart_wfmo_info_t_ { neosmart_wfmo_t Waiter; int WaitIndex; + bool Signalled; }; typedef neosmart_wfmo_info_t_ *neosmart_wfmo_info_t; #endif // WFMO @@ -103,14 +105,9 @@ namespace neosmart { assert(result == 0); // memory_order_relaxed: Newly created event is guaranteed to not have any waiters - event->State.store(false, std::memory_order_relaxed); + event->State.store(initialState, std::memory_order_relaxed); event->AutoReset = !manualReset; - if (initialState) { - result = SetEvent(event); - assert(result == 0); - } - return event; } @@ -157,9 +154,26 @@ namespace neosmart { result = 0; // memory_order_release: Prevent overlapping/interleaved Set/Reset contexts event->State.store(false, std::memory_order_release); + } else { + // We're trying to obtain a manual reset event with a signaled state; + // don't do anything. } - // Else we're trying to obtain a manual reset event with a signaled state; - // don't do anything + +#ifdef WFMO + // Un-signal any registered waiters in case of an auto-reset event + if (!event->State.load(std::memory_order_relaxed)) { + for (auto &wfmo : event->RegisteredWaits) { + if (wfmo.Signalled) { + // We don't need to lock the WFMO mutex because the event mutex is required to + // change the Signalled flag, and the EventsLeft atomic is only changed as a + // direct result of that. + + wfmo.Signalled = false; + wfmo.Waiter->Status.EventsLeft++; + } + } + } +#endif return result; } @@ -221,7 +235,7 @@ namespace neosmart { bool done = false; waitIndex = -1; - for (int i = 0; i < count; ++i) { + for (int i = 0; !done && i < count; ++i) { waitInfo.WaitIndex = i; // Must not release lock until RegisteredWait is potentially added @@ -235,63 +249,102 @@ namespace neosmart { RemoveExpiredWaitHelper), events[i]->RegisteredWaits.end()); - if (UnlockedWaitForEvent(events[i], 0) == 0) { - tempResult = pthread_mutex_unlock(&events[i]->Mutex); - assert(tempResult == 0); - + // Set the signalled flag without modifying (i.e. consuming) the event + waitInfo.Signalled = events[i]->State; + if (waitInfo.Signalled) { if (waitAll) { --wfmo->Status.EventsLeft; assert(wfmo->Status.EventsLeft >= 0); } else { + // Consume the event because we don't need to atomically wait for all + tempResult = UnlockedWaitForEvent(events[i], 0); + assert(tempResult == 0); + wfmo->Status.FiredEvent = i; waitIndex = i; done = true; - break; } - } else { + } + + if (!done) { + // Register this wait with the event in question events[i]->RegisteredWaits.push_back(waitInfo); ++wfmo->RefCount; - - tempResult = pthread_mutex_unlock(&events[i]->Mutex); - assert(tempResult == 0); } - } - // We set the `done` flag above in case of WaitAny and at least one event was set. - // But we need to check again here if we were doing a WaitAll or else we'll incorrectly - // return WAIT_TIMEOUT. - if (waitAll && wfmo->Status.EventsLeft == 0) { - done = true; + tempResult = pthread_mutex_unlock(&events[i]->Mutex); + assert(tempResult == 0); } timespec ts; - if (!done) { - if (milliseconds == 0) { - result = WAIT_TIMEOUT; - done = true; - } else if (milliseconds != WAIT_INFINITE) { - timeval tv; - gettimeofday(&tv, NULL); + if (!done && milliseconds != WAIT_INFINITE && milliseconds != 0) { + timeval tv; + gettimeofday(&tv, NULL); - uint64_t nanoseconds = ((uint64_t)tv.tv_sec) * 1000 * 1000 * 1000 + - milliseconds * 1000 * 1000 + ((uint64_t)tv.tv_usec) * 1000; + uint64_t nanoseconds = ((uint64_t)tv.tv_sec) * 1000 * 1000 * 1000 + + milliseconds * 1000 * 1000 + ((uint64_t)tv.tv_usec) * 1000; - ts.tv_sec = (time_t) (nanoseconds / 1000 / 1000 / 1000); - ts.tv_nsec = (long) (nanoseconds - ((uint64_t)ts.tv_sec) * 1000 * 1000 * 1000); - } + ts.tv_sec = (time_t) (nanoseconds / 1000 / 1000 / 1000); + ts.tv_nsec = (long) (nanoseconds - ((uint64_t)ts.tv_sec) * 1000 * 1000 * 1000); } while (!done) { // One (or more) of the events we're monitoring has been triggered? - // If we're waiting for all events, assume we're done and check if there's an event that - // hasn't fired But if we're waiting for just one event, assume we're not done until we - // find a fired event - done = (waitAll && wfmo->Status.EventsLeft == 0) || - (!waitAll && wfmo->Status.FiredEvent != -1); + if (!waitAll) { + done = wfmo->Status.FiredEvent != -1; + } else if (wfmo->Status.EventsLeft == 0) { + // All events are currently signalled, but we must atomically obtain them before + // returning. + +retry: + bool lockedAtomically = true; + for (int i = 0; i < count; ++i) { + tempResult = pthread_mutex_trylock(&events[i]->Mutex); + if (tempResult == EBUSY) { + // The event state is locked; we can't continue without knowing for sure if + // all the events can be atomically claimed because we risk missing a wake + // otherwise. To avoid a deadlock here, we return all the locks and try + // again. + for (int j = i - 1; j >= 0; --j) { + tempResult = pthread_mutex_unlock(&events[j]->Mutex); + assert(tempResult == 0); + } + goto retry; + } + + assert(tempResult == 0); + if (!events[i]->State) { + // The event has been stolen from under us; since we hold the WFMO lock, it + // should be safe to sleep until a relevant SetEvent() call is made. But + // first, release all the locks we've accumulated. + for (int j = i; j >= 0; --j) { + tempResult = pthread_mutex_unlock(&events[j]->Mutex); + assert(tempResult == 0); + } + lockedAtomically = false; + break; + } + } + + if (lockedAtomically) { + // We have all the locks, so we can atomically consume all the events + for (int i = 0; i < count; ++i) { + tempResult = UnlockedWaitForEvent(events[i], 0); + assert(tempResult == 0); + + tempResult = pthread_mutex_unlock(&events[i]->Mutex); + assert(tempResult == 0); + } + done = true; + } + } if (!done) { - if (milliseconds != WAIT_INFINITE) { + if (milliseconds == 0) { + result = WAIT_TIMEOUT; + done = true; + } else if (milliseconds != WAIT_INFINITE) { result = pthread_cond_timedwait(&wfmo->CVariable, &wfmo->Mutex, &ts); } else { result = pthread_cond_wait(&wfmo->CVariable, &wfmo->Mutex); @@ -349,117 +402,79 @@ namespace neosmart { int result = pthread_mutex_lock(&event->Mutex); assert(result == 0); - // memory_order_release: Unblock threads waiting for the event - event->State.store(true, std::memory_order_release); - - // Depending on the event type, we either trigger everyone or only one - if (event->AutoReset) { #ifdef WFMO - while (!event->RegisteredWaits.empty()) { - neosmart_wfmo_info_t i = &event->RegisteredWaits.front(); - - result = pthread_mutex_lock(&i->Waiter->Mutex); - assert(result == 0); + bool consumed = false; + for (std::deque::iterator i = event->RegisteredWaits.begin(); + !consumed && i != event->RegisteredWaits.end();) { + result = pthread_mutex_lock(&i->Waiter->Mutex); + assert(result == 0); + // Remove expired waits + if (!i->Waiter->StillWaiting) { --i->Waiter->RefCount; assert(i->Waiter->RefCount >= 0); - if (!i->Waiter->StillWaiting) { - bool destroy = i->Waiter->RefCount == 0; - result = pthread_mutex_unlock(&i->Waiter->Mutex); - assert(result == 0); - if (destroy) { - i->Waiter->Destroy(); - delete i->Waiter; - } - event->RegisteredWaits.pop_front(); - continue; + + bool destroy = i->Waiter->RefCount == 0; + result = pthread_mutex_unlock(&i->Waiter->Mutex); + assert(result == 0); + if (destroy) { + i->Waiter->Destroy(); + delete i->Waiter; } + i = event->RegisteredWaits.erase(i); + continue; + } + if (!i->Signalled) { + i->Signalled = true; if (i->Waiter->WaitAll) { --i->Waiter->Status.EventsLeft; assert(i->Waiter->Status.EventsLeft >= 0); - // We technically should do i->Waiter->StillWaiting = Waiter->Status.EventsLeft - // != 0 but the only time it'll be equal to zero is if we're the last event, so - // no one else will be checking the StillWaiting flag. We're good to go without - // it. + + if (i->Waiter->Status.EventsLeft == 0) { + // Wake the waiter but don't consume our event + result = pthread_cond_signal(&i->Waiter->CVariable); + assert(result == 0); + } } else { i->Waiter->Status.FiredEvent = i->WaitIndex; - i->Waiter->StillWaiting = false; - } - - result = pthread_mutex_unlock(&i->Waiter->Mutex); - assert(result == 0); - result = pthread_cond_signal(&i->Waiter->CVariable); - assert(result == 0); - - event->RegisteredWaits.pop_front(); - - // memory_order_release: Prevent overlapping of sequential Set/Reset states. - event->State.store(false, std::memory_order_release); - - result = pthread_mutex_unlock(&event->Mutex); - assert(result == 0); - - return 0; - } -#endif // WFMO - // event->State can be false if compiled with WFMO support - // memory_order_relaxed: ordering is ensured by the mutex - if (event->State.load(std::memory_order_relaxed)) { - result = pthread_mutex_unlock(&event->Mutex); - assert(result == 0); - - result = pthread_cond_signal(&event->CVariable); - assert(result == 0); - - return 0; - } - } else { // this is a manual reset event -#ifdef WFMO - for (size_t i = 0; i < event->RegisteredWaits.size(); ++i) { - neosmart_wfmo_info_t info = &event->RegisteredWaits[i]; - - result = pthread_mutex_lock(&info->Waiter->Mutex); - assert(result == 0); - - --info->Waiter->RefCount; - assert(info->Waiter->RefCount >= 0); + // If the waiter is waiting on any single event, just consume the call to + // SetEvent() that brought us here (in case of an auto-reset event) and + // stop. + if (event->AutoReset) { + consumed = true; + } - if (!info->Waiter->StillWaiting) { - bool destroy = info->Waiter->RefCount == 0; - result = pthread_mutex_unlock(&info->Waiter->Mutex); + result = pthread_cond_signal(&i->Waiter->CVariable); assert(result == 0); - if (destroy) { - info->Waiter->Destroy(); - delete info->Waiter; - } - continue; } + } - if (info->Waiter->WaitAll) { - --info->Waiter->Status.EventsLeft; - assert(info->Waiter->Status.EventsLeft >= 0); - // We technically should do i->Waiter->StillWaiting = Waiter->Status.EventsLeft - // != 0 but the only time it'll be equal to zero is if we're the last event, so - // no one else will be checking the StillWaiting flag. We're good to go without - // it. - } else { - info->Waiter->Status.FiredEvent = info->WaitIndex; - info->Waiter->StillWaiting = false; - } + result = pthread_mutex_unlock(&i->Waiter->Mutex); + assert(result == 0); - result = pthread_mutex_unlock(&info->Waiter->Mutex); - assert(result == 0); + i = ++i; + } - result = pthread_cond_signal(&info->Waiter->CVariable); - assert(result == 0); - } - event->RegisteredWaits.clear(); -#endif // WFMO + if (consumed) { result = pthread_mutex_unlock(&event->Mutex); assert(result == 0); + return 0; + } +#endif // WFMO + + // memory_order_release: Unblock threads waiting for the event + event->State.store(true, std::memory_order_release); + result = pthread_mutex_unlock(&event->Mutex); + assert(result == 0); + + // Depending on the event type, we either trigger everyone or only one + if (event->AutoReset) { + result = pthread_cond_signal(&event->CVariable); + assert(result == 0); + } else { // this is a manual reset event result = pthread_cond_broadcast(&event->CVariable); assert(result == 0); } @@ -475,6 +490,37 @@ namespace neosmart { // be required per https://old.reddit.com/r/cpp/comments/g84bzv/c/fpua2yq/ event->State.store(false, std::memory_order_release); +#ifdef WFMO + for (std::deque::iterator i = event->RegisteredWaits.begin(); + i != event->RegisteredWaits.end();) { + result = pthread_mutex_lock(&i->Waiter->Mutex); + assert(result == 0); + + if (!i->Waiter->StillWaiting) { + --i->Waiter->RefCount; + assert(i->Waiter->RefCount >= 0); + + bool destroy = i->Waiter->RefCount == 0; + result = pthread_mutex_unlock(&i->Waiter->Mutex); + assert(result == 0); + if (destroy) { + i->Waiter->Destroy(); + delete i->Waiter; + } + i = event->RegisteredWaits.erase(i); + continue; + } + + if (i->Signalled) { + if (i->Waiter->WaitAll) { + i->Signalled = false; + ++i->Waiter->Status.EventsLeft; + i->Waiter->StillWaiting = true; + } + } + } +#endif // WFMO + result = pthread_mutex_unlock(&event->Mutex); assert(result == 0);