From 3ca630c7014246d78227406ef786d310de449472 Mon Sep 17 00:00:00 2001 From: Lukas Zeller Date: Mon, 25 Jul 2022 20:38:42 +0200 Subject: [PATCH] SystemLayerImplSelect: fix Request/ClearCallbackOnPendingXXX() for dispatch, clean transport (#21135) For CHIP_SYSTEM_CONFIG_USE_DISPATCH (Darwin) case, Request/ClearCallbackOnPendingXXX() were not working (no loop calling select to evaluate watches), but were called in TCP and UDP endpoint implementations, despite dispatch-specific extra code for the same purpose in the endpoints. This changeset now moves all CHIP_SYSTEM_CONFIG_USE_DISPATCH specific socket watching code into SystemLayerImplSelect, by making Request/ClearCallbackOnPendingXXX() actually working with dispatch. The issue surfaced in my (a bit exotic) use case when I needed to get callbacks for events on a plain socket not wrapped as TCPEndPoint. Doing this, I realized Request/ClearCallbackOnPendingXXX() were non-functional due to missing select main loop. Fixing this revealed the similar dispatch code in the endpoints that was needed to get those events handled despite Request/ClearCallbackOnPendingXXX() not working. That extra endpoint code now became redundant (delivering events twice), so had to be removed. I am aware that this might be a too deep intervention for coming from a CHIP newbie like me (not newbie in code deep diving, though), but I think it cleans up an inconsistency in the IP endpoints. --- src/inet/TCPEndPointImplSockets.cpp | 37 ------------- src/inet/TCPEndPointImplSockets.h | 9 --- src/inet/UDPEndPointImplSockets.cpp | 24 -------- src/inet/UDPEndPointImplSockets.h | 8 --- src/system/SystemLayerImplSelect.cpp | 83 ++++++++++++++++++++++++++++ src/system/SystemLayerImplSelect.h | 4 ++ 6 files changed, 87 insertions(+), 78 deletions(-) diff --git a/src/inet/TCPEndPointImplSockets.cpp b/src/inet/TCPEndPointImplSockets.cpp index c34603e2099a46..2efdaba6c3222d 100644 --- a/src/inet/TCPEndPointImplSockets.cpp +++ b/src/inet/TCPEndPointImplSockets.cpp @@ -132,30 +132,6 @@ CHIP_ERROR TCPEndPointImplSockets::BindImpl(IPAddressType addrType, const IPAddr } } -#if CHIP_SYSTEM_CONFIG_USE_DISPATCH - dispatch_queue_t dispatchQueue = static_cast(GetSystemLayer()).GetDispatchQueue(); - if (dispatchQueue != nullptr) - { - unsigned long fd = static_cast(mSocket); - - mReadableSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, fd, 0, dispatchQueue); - ReturnErrorCodeIf(mReadableSource == nullptr, CHIP_ERROR_NO_MEMORY); - - mWriteableSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE, fd, 0, dispatchQueue); - ReturnErrorCodeIf(mWriteableSource == nullptr, CHIP_ERROR_NO_MEMORY); - - dispatch_source_set_event_handler(mReadableSource, ^{ - this->HandlePendingIO(System::SocketEventFlags::kRead); - }); - - dispatch_source_set_event_handler(mWriteableSource, ^{ - this->HandlePendingIO(System::SocketEventFlags::kWrite); - }); - - dispatch_resume(mReadableSource); - dispatch_resume(mWriteableSource); - } -#endif // CHIP_SYSTEM_CONFIG_USE_DISPATCH return res; } @@ -648,19 +624,6 @@ void TCPEndPointImplSockets::DoCloseImpl(CHIP_ERROR err, State oldState) mSocket = kInvalidSocketFd; } } - -#if CHIP_SYSTEM_CONFIG_USE_DISPATCH - if (mReadableSource) - { - dispatch_source_cancel(mReadableSource); - dispatch_release(mReadableSource); - } - if (mWriteableSource) - { - dispatch_source_cancel(mWriteableSource); - dispatch_release(mWriteableSource); - } -#endif // CHIP_SYSTEM_CONFIG_USE_DISPATCH } #if INET_CONFIG_OVERRIDE_SYSTEM_TCP_USER_TIMEOUT diff --git a/src/inet/TCPEndPointImplSockets.h b/src/inet/TCPEndPointImplSockets.h index 036939e826323a..40e81eeb6a0d06 100644 --- a/src/inet/TCPEndPointImplSockets.h +++ b/src/inet/TCPEndPointImplSockets.h @@ -25,10 +25,6 @@ #include #include -#if CHIP_SYSTEM_CONFIG_USE_DISPATCH -#include -#endif - namespace chip { namespace Inet { @@ -74,11 +70,6 @@ class TCPEndPointImplSockets : public TCPEndPoint, public EndPointStateSockets CHIP_ERROR BindSrcAddrFromIntf(IPAddressType addrType, InterfaceId intfId); static void HandlePendingIO(System::SocketEvents events, intptr_t data); -#if CHIP_SYSTEM_CONFIG_USE_DISPATCH - dispatch_source_t mReadableSource = nullptr; - dispatch_source_t mWriteableSource = nullptr; -#endif // CHIP_SYSTEM_CONFIG_USE_DISPATCH - #if INET_CONFIG_OVERRIDE_SYSTEM_TCP_USER_TIMEOUT /// This counts the number of bytes written on the TCP socket since thelast probe into the TCP outqueue was made. uint32_t mBytesWrittenSinceLastProbe; diff --git a/src/inet/UDPEndPointImplSockets.cpp b/src/inet/UDPEndPointImplSockets.cpp index 93b0828f5a1e21..a9e078cc5f48fd 100644 --- a/src/inet/UDPEndPointImplSockets.cpp +++ b/src/inet/UDPEndPointImplSockets.cpp @@ -205,22 +205,6 @@ CHIP_ERROR UDPEndPointImplSockets::BindImpl(IPAddressType addressType, const IPA } } -#if CHIP_SYSTEM_CONFIG_USE_DISPATCH - dispatch_queue_t dispatchQueue = static_cast(&GetSystemLayer())->GetDispatchQueue(); - if (dispatchQueue != nullptr) - { - unsigned long fd = static_cast(mSocket); - - mReadableSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, fd, 0, dispatchQueue); - ReturnErrorCodeIf(mReadableSource == nullptr, CHIP_ERROR_NO_MEMORY); - - dispatch_source_set_event_handler(mReadableSource, ^{ - this->HandlePendingIO(System::SocketEventFlags::kRead); - }); - dispatch_resume(mReadableSource); - } -#endif // CHIP_SYSTEM_CONFIG_USE_DISPATCH - return CHIP_NO_ERROR; } @@ -431,14 +415,6 @@ void UDPEndPointImplSockets::CloseImpl() close(mSocket); mSocket = kInvalidSocketFd; } - -#if CHIP_SYSTEM_CONFIG_USE_DISPATCH - if (mReadableSource) - { - dispatch_source_cancel(mReadableSource); - dispatch_release(mReadableSource); - } -#endif // CHIP_SYSTEM_CONFIG_USE_DISPATCH } void UDPEndPointImplSockets::Free() diff --git a/src/inet/UDPEndPointImplSockets.h b/src/inet/UDPEndPointImplSockets.h index 490360da5e9ec1..c078722fbb4bdd 100644 --- a/src/inet/UDPEndPointImplSockets.h +++ b/src/inet/UDPEndPointImplSockets.h @@ -26,10 +26,6 @@ #include #include -#if CHIP_SYSTEM_CONFIG_USE_DISPATCH -#include -#endif - namespace chip { namespace Inet { @@ -65,10 +61,6 @@ class UDPEndPointImplSockets : public UDPEndPoint, public EndPointStateSockets InterfaceId mBoundIntfId; uint16_t mBoundPort; -#if CHIP_SYSTEM_CONFIG_USE_DISPATCH - dispatch_source_t mReadableSource = nullptr; -#endif // CHIP_SYSTEM_CONFIG_USE_DISPATCH - #if CHIP_SYSTEM_CONFIG_USE_PLATFORM_MULTICAST_API public: using MulticastGroupHandler = CHIP_ERROR (*)(InterfaceId, const IPAddress &); diff --git a/src/system/SystemLayerImplSelect.cpp b/src/system/SystemLayerImplSelect.cpp index a7fc85748daf82..5d5e07e890e0f4 100644 --- a/src/system/SystemLayerImplSelect.cpp +++ b/src/system/SystemLayerImplSelect.cpp @@ -246,6 +246,30 @@ CHIP_ERROR LayerImplSelect::RequestCallbackOnPendingRead(SocketWatchToken token) VerifyOrReturnError(watch != nullptr, CHIP_ERROR_INVALID_ARGUMENT); watch->mPendingIO.Set(SocketEventFlags::kRead); + +#if CHIP_SYSTEM_CONFIG_USE_DISPATCH + dispatch_queue_t dispatchQueue = GetDispatchQueue(); + if (watch->mWrSource) + { + dispatch_resume(watch->mRdSource); + } + else + { + if (dispatchQueue) + { + watch->mRdSource = + dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, static_cast(watch->mFD), 0, dispatchQueue); + ReturnErrorCodeIf(watch->mRdSource == nullptr, CHIP_ERROR_NO_MEMORY); + dispatch_source_set_event_handler(watch->mRdSource, ^{ + SocketEvents events; + events.Set(SocketEventFlags::kRead); + watch->mCallback(events, watch->mCallbackData); + }); + dispatch_activate(watch->mRdSource); + } + } +#endif // CHIP_SYSTEM_CONFIG_USE_DISPATCH + return CHIP_NO_ERROR; } @@ -255,6 +279,30 @@ CHIP_ERROR LayerImplSelect::RequestCallbackOnPendingWrite(SocketWatchToken token VerifyOrReturnError(watch != nullptr, CHIP_ERROR_INVALID_ARGUMENT); watch->mPendingIO.Set(SocketEventFlags::kWrite); + +#if CHIP_SYSTEM_CONFIG_USE_DISPATCH + if (watch->mWrSource) + { + dispatch_resume(watch->mWrSource); + } + else + { + dispatch_queue_t dispatchQueue = GetDispatchQueue(); + if (dispatchQueue) + { + watch->mWrSource = + dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE, static_cast(watch->mFD), 0, dispatchQueue); + ReturnErrorCodeIf(watch->mWrSource == nullptr, CHIP_ERROR_NO_MEMORY); + dispatch_source_set_event_handler(watch->mWrSource, ^{ + SocketEvents events; + events.Set(SocketEventFlags::kWrite); + watch->mCallback(events, watch->mCallbackData); + }); + dispatch_activate(watch->mWrSource); + } + } +#endif // CHIP_SYSTEM_CONFIG_USE_DISPATCH + return CHIP_NO_ERROR; } @@ -263,6 +311,14 @@ CHIP_ERROR LayerImplSelect::ClearCallbackOnPendingRead(SocketWatchToken token) SocketWatch * watch = reinterpret_cast(token); VerifyOrReturnError(watch != nullptr, CHIP_ERROR_INVALID_ARGUMENT); watch->mPendingIO.Clear(SocketEventFlags::kRead); + +#if CHIP_SYSTEM_CONFIG_USE_DISPATCH + if (watch->mRdSource) + { + dispatch_suspend(watch->mRdSource); + } +#endif // CHIP_SYSTEM_CONFIG_USE_DISPATCH + return CHIP_NO_ERROR; } @@ -271,6 +327,14 @@ CHIP_ERROR LayerImplSelect::ClearCallbackOnPendingWrite(SocketWatchToken token) SocketWatch * watch = reinterpret_cast(token); VerifyOrReturnError(watch != nullptr, CHIP_ERROR_INVALID_ARGUMENT); watch->mPendingIO.Clear(SocketEventFlags::kWrite); + +#if CHIP_SYSTEM_CONFIG_USE_DISPATCH + if (watch->mWrSource) + { + dispatch_suspend(watch->mWrSource); + } +#endif // CHIP_SYSTEM_CONFIG_USE_DISPATCH + return CHIP_NO_ERROR; } @@ -282,10 +346,25 @@ CHIP_ERROR LayerImplSelect::StopWatchingSocket(SocketWatchToken * tokenInOut) VerifyOrReturnError(watch != nullptr, CHIP_ERROR_INVALID_ARGUMENT); VerifyOrReturnError(watch->mFD >= 0, CHIP_ERROR_INCORRECT_STATE); +#if CHIP_SYSTEM_CONFIG_USE_DISPATCH + if (watch->mRdSource) + { + dispatch_cancel(watch->mRdSource); + dispatch_release(watch->mRdSource); + } + if (watch->mWrSource) + { + dispatch_cancel(watch->mWrSource); + dispatch_release(watch->mWrSource); + } +#endif + watch->Clear(); +#if !CHIP_SYSTEM_CONFIG_USE_DISPATCH // Wake the thread calling select so that it stops selecting on the socket. Signal(); +#endif return CHIP_NO_ERROR; } @@ -426,6 +505,10 @@ void LayerImplSelect::SocketWatch::Clear() mPendingIO.ClearAll(); mCallback = nullptr; mCallbackData = 0; +#if CHIP_SYSTEM_CONFIG_USE_DISPATCH + mRdSource = nullptr; + mWrSource = nullptr; +#endif } } // namespace System diff --git a/src/system/SystemLayerImplSelect.h b/src/system/SystemLayerImplSelect.h index 338214c3b8720a..7f2724ed732d11 100644 --- a/src/system/SystemLayerImplSelect.h +++ b/src/system/SystemLayerImplSelect.h @@ -90,6 +90,10 @@ class LayerImplSelect : public LayerSocketsLoop int mFD; SocketEvents mPendingIO; SocketWatchCallback mCallback; +#if CHIP_SYSTEM_CONFIG_USE_DISPATCH + dispatch_source_t mRdSource; + dispatch_source_t mWrSource; +#endif intptr_t mCallbackData; }; SocketWatch mSocketWatchPool[kSocketWatchMax];