Skip to content

Commit

Permalink
SystemLayerImplSelect: fix Request/ClearCallbackOnPendingXXX() for di…
Browse files Browse the repository at this point in the history
…spatch, clean transport

For CHIP_SYSTEM_CONFIG_USE_DISPATCH (Darwin) case, Request/ClearCallbackOnPendingXXX() were not working (no loop calling select to evaluate watches), but were called in TCP and UDP endpoint implementations, despite dispatch-specific extra code for the same purpose in the endpoints.

This changeset now moves all CHIP_SYSTEM_CONFIG_USE_DISPATCH specific socket watching code into SystemLayerImplSelect, by making Request/ClearCallbackOnPendingXXX() actually working with dispatch.

The issue surfaced in my (a bit exotic) use case when I needed to get callbacks for events on a plain socket not wrapped as TCPEndPoint.
Doing this, I realized Request/ClearCallbackOnPendingXXX() were non-functional due to missing select main loop.
Fixing this revealed the similar dispatch code in the endpoints that was needed to get those events handled despite Request/ClearCallbackOnPendingXXX() not working. That extra endpoint code now became redundant (delivering events twice), so had to be removed.

I am aware that this might be a too deep intervention for coming from a CHIP newbie like me (not newbie in code deep diving, though), but I think it cleans up an inconsistency in the IP endpoints.
  • Loading branch information
plan44 committed Jul 23, 2022
1 parent c104a72 commit fd85659
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 78 deletions.
37 changes: 0 additions & 37 deletions src/inet/TCPEndPointImplSockets.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,30 +132,6 @@ CHIP_ERROR TCPEndPointImplSockets::BindImpl(IPAddressType addrType, const IPAddr
}
}

#if CHIP_SYSTEM_CONFIG_USE_DISPATCH
dispatch_queue_t dispatchQueue = static_cast<System::LayerSocketsLoop &>(GetSystemLayer()).GetDispatchQueue();
if (dispatchQueue != nullptr)
{
unsigned long fd = static_cast<unsigned long>(mSocket);

mReadableSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, fd, 0, dispatchQueue);
ReturnErrorCodeIf(mReadableSource == nullptr, CHIP_ERROR_NO_MEMORY);

mWriteableSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE, fd, 0, dispatchQueue);
ReturnErrorCodeIf(mWriteableSource == nullptr, CHIP_ERROR_NO_MEMORY);

dispatch_source_set_event_handler(mReadableSource, ^{
this->HandlePendingIO(System::SocketEventFlags::kRead);
});

dispatch_source_set_event_handler(mWriteableSource, ^{
this->HandlePendingIO(System::SocketEventFlags::kWrite);
});

dispatch_resume(mReadableSource);
dispatch_resume(mWriteableSource);
}
#endif // CHIP_SYSTEM_CONFIG_USE_DISPATCH
return res;
}

Expand Down Expand Up @@ -648,19 +624,6 @@ void TCPEndPointImplSockets::DoCloseImpl(CHIP_ERROR err, State oldState)
mSocket = kInvalidSocketFd;
}
}

#if CHIP_SYSTEM_CONFIG_USE_DISPATCH
if (mReadableSource)
{
dispatch_source_cancel(mReadableSource);
dispatch_release(mReadableSource);
}
if (mWriteableSource)
{
dispatch_source_cancel(mWriteableSource);
dispatch_release(mWriteableSource);
}
#endif // CHIP_SYSTEM_CONFIG_USE_DISPATCH
}

#if INET_CONFIG_OVERRIDE_SYSTEM_TCP_USER_TIMEOUT
Expand Down
9 changes: 0 additions & 9 deletions src/inet/TCPEndPointImplSockets.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,6 @@
#include <inet/EndPointStateSockets.h>
#include <inet/TCPEndPoint.h>

#if CHIP_SYSTEM_CONFIG_USE_DISPATCH
#include <dispatch/dispatch.h>
#endif

namespace chip {
namespace Inet {

Expand Down Expand Up @@ -74,11 +70,6 @@ class TCPEndPointImplSockets : public TCPEndPoint, public EndPointStateSockets
CHIP_ERROR BindSrcAddrFromIntf(IPAddressType addrType, InterfaceId intfId);
static void HandlePendingIO(System::SocketEvents events, intptr_t data);

#if CHIP_SYSTEM_CONFIG_USE_DISPATCH
dispatch_source_t mReadableSource = nullptr;
dispatch_source_t mWriteableSource = nullptr;
#endif // CHIP_SYSTEM_CONFIG_USE_DISPATCH

#if INET_CONFIG_OVERRIDE_SYSTEM_TCP_USER_TIMEOUT
/// This counts the number of bytes written on the TCP socket since thelast probe into the TCP outqueue was made.
uint32_t mBytesWrittenSinceLastProbe;
Expand Down
24 changes: 0 additions & 24 deletions src/inet/UDPEndPointImplSockets.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,22 +205,6 @@ CHIP_ERROR UDPEndPointImplSockets::BindImpl(IPAddressType addressType, const IPA
}
}

#if CHIP_SYSTEM_CONFIG_USE_DISPATCH
dispatch_queue_t dispatchQueue = static_cast<System::LayerSocketsLoop *>(&GetSystemLayer())->GetDispatchQueue();
if (dispatchQueue != nullptr)
{
unsigned long fd = static_cast<unsigned long>(mSocket);

mReadableSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, fd, 0, dispatchQueue);
ReturnErrorCodeIf(mReadableSource == nullptr, CHIP_ERROR_NO_MEMORY);

dispatch_source_set_event_handler(mReadableSource, ^{
this->HandlePendingIO(System::SocketEventFlags::kRead);
});
dispatch_resume(mReadableSource);
}
#endif // CHIP_SYSTEM_CONFIG_USE_DISPATCH

return CHIP_NO_ERROR;
}

Expand Down Expand Up @@ -431,14 +415,6 @@ void UDPEndPointImplSockets::CloseImpl()
close(mSocket);
mSocket = kInvalidSocketFd;
}

#if CHIP_SYSTEM_CONFIG_USE_DISPATCH
if (mReadableSource)
{
dispatch_source_cancel(mReadableSource);
dispatch_release(mReadableSource);
}
#endif // CHIP_SYSTEM_CONFIG_USE_DISPATCH
}

void UDPEndPointImplSockets::Free()
Expand Down
8 changes: 0 additions & 8 deletions src/inet/UDPEndPointImplSockets.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,6 @@
#include <inet/EndPointStateSockets.h>
#include <inet/UDPEndPoint.h>

#if CHIP_SYSTEM_CONFIG_USE_DISPATCH
#include <dispatch/dispatch.h>
#endif

namespace chip {
namespace Inet {

Expand Down Expand Up @@ -65,10 +61,6 @@ class UDPEndPointImplSockets : public UDPEndPoint, public EndPointStateSockets
InterfaceId mBoundIntfId;
uint16_t mBoundPort;

#if CHIP_SYSTEM_CONFIG_USE_DISPATCH
dispatch_source_t mReadableSource = nullptr;
#endif // CHIP_SYSTEM_CONFIG_USE_DISPATCH

#if CHIP_SYSTEM_CONFIG_USE_PLATFORM_MULTICAST_API
public:
using MulticastGroupHandler = CHIP_ERROR (*)(InterfaceId, const IPAddress &);
Expand Down
83 changes: 83 additions & 0 deletions src/system/SystemLayerImplSelect.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,30 @@ CHIP_ERROR LayerImplSelect::RequestCallbackOnPendingRead(SocketWatchToken token)
VerifyOrReturnError(watch != nullptr, CHIP_ERROR_INVALID_ARGUMENT);

watch->mPendingIO.Set(SocketEventFlags::kRead);

#if CHIP_SYSTEM_CONFIG_USE_DISPATCH
dispatch_queue_t dispatchQueue = GetDispatchQueue();
if (watch->mWrSource)
{
dispatch_resume(watch->mRdSource);
}
else
{
if (dispatchQueue)
{
watch->mRdSource =
dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, static_cast<uintptr_t>(watch->mFD), 0, dispatchQueue);
ReturnErrorCodeIf(watch->mRdSource == nullptr, CHIP_ERROR_NO_MEMORY);
dispatch_source_set_event_handler(watch->mRdSource, ^{
SocketEvents events;
events.Set(SocketEventFlags::kRead);
watch->mCallback(events, watch->mCallbackData);
});
dispatch_activate(watch->mRdSource);
}
}
#endif // CHIP_SYSTEM_CONFIG_USE_DISPATCH

return CHIP_NO_ERROR;
}

Expand All @@ -255,6 +279,30 @@ CHIP_ERROR LayerImplSelect::RequestCallbackOnPendingWrite(SocketWatchToken token
VerifyOrReturnError(watch != nullptr, CHIP_ERROR_INVALID_ARGUMENT);

watch->mPendingIO.Set(SocketEventFlags::kWrite);

#if CHIP_SYSTEM_CONFIG_USE_DISPATCH
if (watch->mWrSource)
{
dispatch_resume(watch->mWrSource);
}
else
{
dispatch_queue_t dispatchQueue = GetDispatchQueue();
if (dispatchQueue)
{
watch->mWrSource =
dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE, static_cast<uintptr_t>(watch->mFD), 0, dispatchQueue);
ReturnErrorCodeIf(watch->mWrSource == nullptr, CHIP_ERROR_NO_MEMORY);
dispatch_source_set_event_handler(watch->mWrSource, ^{
SocketEvents events;
events.Set(SocketEventFlags::kWrite);
watch->mCallback(events, watch->mCallbackData);
});
dispatch_activate(watch->mWrSource);
}
}
#endif // CHIP_SYSTEM_CONFIG_USE_DISPATCH

return CHIP_NO_ERROR;
}

Expand All @@ -263,6 +311,14 @@ CHIP_ERROR LayerImplSelect::ClearCallbackOnPendingRead(SocketWatchToken token)
SocketWatch * watch = reinterpret_cast<SocketWatch *>(token);
VerifyOrReturnError(watch != nullptr, CHIP_ERROR_INVALID_ARGUMENT);
watch->mPendingIO.Clear(SocketEventFlags::kRead);

#if CHIP_SYSTEM_CONFIG_USE_DISPATCH
if (watch->mRdSource)
{
dispatch_suspend(watch->mRdSource);
}
#endif // CHIP_SYSTEM_CONFIG_USE_DISPATCH

return CHIP_NO_ERROR;
}

Expand All @@ -271,6 +327,14 @@ CHIP_ERROR LayerImplSelect::ClearCallbackOnPendingWrite(SocketWatchToken token)
SocketWatch * watch = reinterpret_cast<SocketWatch *>(token);
VerifyOrReturnError(watch != nullptr, CHIP_ERROR_INVALID_ARGUMENT);
watch->mPendingIO.Clear(SocketEventFlags::kWrite);

#if CHIP_SYSTEM_CONFIG_USE_DISPATCH
if (watch->mWrSource)
{
dispatch_suspend(watch->mWrSource);
}
#endif // CHIP_SYSTEM_CONFIG_USE_DISPATCH

return CHIP_NO_ERROR;
}

Expand All @@ -282,10 +346,25 @@ CHIP_ERROR LayerImplSelect::StopWatchingSocket(SocketWatchToken * tokenInOut)
VerifyOrReturnError(watch != nullptr, CHIP_ERROR_INVALID_ARGUMENT);
VerifyOrReturnError(watch->mFD >= 0, CHIP_ERROR_INCORRECT_STATE);

#if CHIP_SYSTEM_CONFIG_USE_DISPATCH
if (watch->mRdSource)
{
dispatch_cancel(watch->mRdSource);
dispatch_release(watch->mRdSource);
}
if (watch->mWrSource)
{
dispatch_cancel(watch->mWrSource);
dispatch_release(watch->mWrSource);
}
#endif

watch->Clear();

#if !CHIP_SYSTEM_CONFIG_USE_DISPATCH
// Wake the thread calling select so that it stops selecting on the socket.
Signal();
#endif

return CHIP_NO_ERROR;
}
Expand Down Expand Up @@ -426,6 +505,10 @@ void LayerImplSelect::SocketWatch::Clear()
mPendingIO.ClearAll();
mCallback = nullptr;
mCallbackData = 0;
#if CHIP_SYSTEM_CONFIG_USE_DISPATCH
mRdSource = nullptr;
mWrSource = nullptr;
#endif
}

} // namespace System
Expand Down
4 changes: 4 additions & 0 deletions src/system/SystemLayerImplSelect.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ class LayerImplSelect : public LayerSocketsLoop
int mFD;
SocketEvents mPendingIO;
SocketWatchCallback mCallback;
#if CHIP_SYSTEM_CONFIG_USE_DISPATCH
dispatch_source_t mRdSource;
dispatch_source_t mWrSource;
#endif
intptr_t mCallbackData;
};
SocketWatch mSocketWatchPool[kSocketWatchMax];
Expand Down

0 comments on commit fd85659

Please sign in to comment.