diff --git a/src/system/SystemLayerImplSelect.cpp b/src/system/SystemLayerImplSelect.cpp index 5d5e07e890e0f4..2118d9ef92d363 100644 --- a/src/system/SystemLayerImplSelect.cpp +++ b/src/system/SystemLayerImplSelect.cpp @@ -77,6 +77,12 @@ void LayerImplSelect::Shutdown() } } mTimerPool.ReleaseAll(); + + for (auto & w : mSocketWatchPool) + { + w.DisableAndClear(); + } + #else // CHIP_SYSTEM_CONFIG_USE_DISPATCH mTimerList.Clear(); mTimerPool.ReleaseAll(); @@ -245,29 +251,44 @@ CHIP_ERROR LayerImplSelect::RequestCallbackOnPendingRead(SocketWatchToken token) SocketWatch * watch = reinterpret_cast(token); VerifyOrReturnError(watch != nullptr, CHIP_ERROR_INVALID_ARGUMENT); - watch->mPendingIO.Set(SocketEventFlags::kRead); - #if CHIP_SYSTEM_CONFIG_USE_DISPATCH - dispatch_queue_t dispatchQueue = GetDispatchQueue(); - if (watch->mWrSource) - { - dispatch_resume(watch->mRdSource); - } - else + if (!watch->mPendingIO.Has(SocketEventFlags::kRead)) { - if (dispatchQueue) + // Not yet watching + if (watch->mRdSource) { - watch->mRdSource = - dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, static_cast(watch->mFD), 0, dispatchQueue); - ReturnErrorCodeIf(watch->mRdSource == nullptr, CHIP_ERROR_NO_MEMORY); - dispatch_source_set_event_handler(watch->mRdSource, ^{ - SocketEvents events; - events.Set(SocketEventFlags::kRead); - watch->mCallback(events, watch->mCallbackData); - }); - dispatch_activate(watch->mRdSource); + // already used once before, but callback disabled later: just set flag again + watch->mPendingIO.Set(SocketEventFlags::kRead); + } + else + { + // first time watching for read events: install a dispatch source + // Note: if no dispatch queue is available, this is a complete NOP (does not even set the flag) + dispatch_queue_t dispatchQueue = GetDispatchQueue(); + if (dispatchQueue == nullptr) + { + ChipLogError(DeviceLayer, "RequestCallbackOnPendingRead with no dispatch queue installed: callback will not work"); + } + else { + watch->mRdSource = + dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, static_cast(watch->mFD), 0, dispatchQueue); + ReturnErrorCodeIf(watch->mRdSource == nullptr, CHIP_ERROR_NO_MEMORY); + dispatch_source_set_event_handler(watch->mRdSource, ^{ + if (watch->mPendingIO.Has(SocketEventFlags::kRead) && watch->mCallback != nullptr) + { + SocketEvents events; + events.Set(SocketEventFlags::kRead); + watch->mCallback(events, watch->mCallbackData); + } + }); + // only now we are sure the source exists and can become active + watch->mPendingIO.Set(SocketEventFlags::kRead); + dispatch_activate(watch->mRdSource); + } } } +#else + watch->mPendingIO.Set(SocketEventFlags::kRead); #endif // CHIP_SYSTEM_CONFIG_USE_DISPATCH return CHIP_NO_ERROR; @@ -278,29 +299,44 @@ CHIP_ERROR LayerImplSelect::RequestCallbackOnPendingWrite(SocketWatchToken token SocketWatch * watch = reinterpret_cast(token); VerifyOrReturnError(watch != nullptr, CHIP_ERROR_INVALID_ARGUMENT); - watch->mPendingIO.Set(SocketEventFlags::kWrite); - #if CHIP_SYSTEM_CONFIG_USE_DISPATCH - if (watch->mWrSource) + if (!watch->mPendingIO.Has(SocketEventFlags::kWrite)) { - dispatch_resume(watch->mWrSource); - } - else - { - dispatch_queue_t dispatchQueue = GetDispatchQueue(); - if (dispatchQueue) + // Not yet watching + if (watch->mWrSource) { - watch->mWrSource = - dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE, static_cast(watch->mFD), 0, dispatchQueue); - ReturnErrorCodeIf(watch->mWrSource == nullptr, CHIP_ERROR_NO_MEMORY); - dispatch_source_set_event_handler(watch->mWrSource, ^{ - SocketEvents events; - events.Set(SocketEventFlags::kWrite); - watch->mCallback(events, watch->mCallbackData); - }); - dispatch_activate(watch->mWrSource); + // already used once before, but callback disabled later: just set flag again + watch->mPendingIO.Set(SocketEventFlags::kWrite); + } + else + { + // first time watching for write events: install a dispatch source + // Note: if no dispatch queue is available, this is a complete NOP (does not even set the flag) + dispatch_queue_t dispatchQueue = GetDispatchQueue(); + if (dispatchQueue == nullptr) + { + ChipLogError(DeviceLayer, "RequestCallbackOnPendingWrite with no dispatch queue installed: callback will not work"); + } + else { + watch->mWrSource = + dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE, static_cast(watch->mFD), 0, dispatchQueue); + ReturnErrorCodeIf(watch->mWrSource == nullptr, CHIP_ERROR_NO_MEMORY); + dispatch_source_set_event_handler(watch->mWrSource, ^{ + if (watch->mPendingIO.Has(SocketEventFlags::kWrite) && watch->mCallback != nullptr) + { + SocketEvents events; + events.Set(SocketEventFlags::kWrite); + watch->mCallback(events, watch->mCallbackData); + } + }); + // only now we are sure the source exists and can become active + watch->mPendingIO.Set(SocketEventFlags::kWrite); + dispatch_activate(watch->mWrSource); + } } } +#else + watch->mPendingIO.Set(SocketEventFlags::kWrite); #endif // CHIP_SYSTEM_CONFIG_USE_DISPATCH return CHIP_NO_ERROR; @@ -310,14 +346,8 @@ CHIP_ERROR LayerImplSelect::ClearCallbackOnPendingRead(SocketWatchToken token) { SocketWatch * watch = reinterpret_cast(token); VerifyOrReturnError(watch != nullptr, CHIP_ERROR_INVALID_ARGUMENT); - watch->mPendingIO.Clear(SocketEventFlags::kRead); -#if CHIP_SYSTEM_CONFIG_USE_DISPATCH - if (watch->mRdSource) - { - dispatch_suspend(watch->mRdSource); - } -#endif // CHIP_SYSTEM_CONFIG_USE_DISPATCH + watch->mPendingIO.Clear(SocketEventFlags::kRead); return CHIP_NO_ERROR; } @@ -326,14 +356,8 @@ CHIP_ERROR LayerImplSelect::ClearCallbackOnPendingWrite(SocketWatchToken token) { SocketWatch * watch = reinterpret_cast(token); VerifyOrReturnError(watch != nullptr, CHIP_ERROR_INVALID_ARGUMENT); - watch->mPendingIO.Clear(SocketEventFlags::kWrite); -#if CHIP_SYSTEM_CONFIG_USE_DISPATCH - if (watch->mWrSource) - { - dispatch_suspend(watch->mWrSource); - } -#endif // CHIP_SYSTEM_CONFIG_USE_DISPATCH + watch->mPendingIO.Clear(SocketEventFlags::kWrite); return CHIP_NO_ERROR; } @@ -347,21 +371,10 @@ CHIP_ERROR LayerImplSelect::StopWatchingSocket(SocketWatchToken * tokenInOut) VerifyOrReturnError(watch->mFD >= 0, CHIP_ERROR_INCORRECT_STATE); #if CHIP_SYSTEM_CONFIG_USE_DISPATCH - if (watch->mRdSource) - { - dispatch_cancel(watch->mRdSource); - dispatch_release(watch->mRdSource); - } - if (watch->mWrSource) - { - dispatch_cancel(watch->mWrSource); - dispatch_release(watch->mWrSource); - } -#endif - + watch->DisableAndClear(); +#else watch->Clear(); -#if !CHIP_SYSTEM_CONFIG_USE_DISPATCH // Wake the thread calling select so that it stops selecting on the socket. Signal(); #endif @@ -511,5 +524,22 @@ void LayerImplSelect::SocketWatch::Clear() #endif } +#if CHIP_SYSTEM_CONFIG_USE_DISPATCH +void LayerImplSelect::SocketWatch::DisableAndClear() +{ + if (mRdSource) + { + dispatch_source_cancel(mRdSource); + dispatch_release(mRdSource); + } + if (mWrSource) + { + dispatch_source_cancel(mWrSource); + dispatch_release(mWrSource); + } + Clear(); +} +#endif // CHIP_SYSTEM_CONFIG_USE_DISPATCH + } // namespace System } // namespace chip diff --git a/src/system/SystemLayerImplSelect.h b/src/system/SystemLayerImplSelect.h index 7f2724ed732d11..f193a8860d961c 100644 --- a/src/system/SystemLayerImplSelect.h +++ b/src/system/SystemLayerImplSelect.h @@ -93,6 +93,7 @@ class LayerImplSelect : public LayerSocketsLoop #if CHIP_SYSTEM_CONFIG_USE_DISPATCH dispatch_source_t mRdSource; dispatch_source_t mWrSource; + void DisableAndClear(); #endif intptr_t mCallbackData; };