Skip to content

Commit

Permalink
Proposal for issue project-chip#5556 rework system layer event loop
Browse files Browse the repository at this point in the history
☛   This PR is not intended to be merged (or even build in CI since
I've left out test changes in the interest of brevity).  The intent
is to get feedback on the general approach, to avoid wasting time on
the remaining work if there are major objections and/or better ideas.

The end goal of issue project-chip#5556 is to allow CHIP as a library to be
used more easily by programs or systems that already have their
own I/O event loop.

This defines a SystemSocketWatcher, primarly used by the Inet
layer, as an alternative to the current `select()`-based loop for
socket-based platforms.  The interface is intended to make minimal
assumptions about the underlying implementation. The proof-of-concept
implementation uses the libevent library and works for the completed
subset — enough to run `chip-tool basic read vendor-name 0`. The
current `select()`- based implementation has been _partly_ converted
to use the same interface.
  • Loading branch information
kpschoedel committed May 6, 2021
1 parent e468f42 commit 1fc4aea
Show file tree
Hide file tree
Showing 30 changed files with 597 additions and 256 deletions.
6 changes: 3 additions & 3 deletions src/controller/CHIPDeviceController.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -395,11 +395,11 @@ CHIP_ERROR DeviceController::ServiceEventSignal()
{
VerifyOrReturnError(mState == State::Initialized, CHIP_ERROR_INCORRECT_STATE);

#if CONFIG_DEVICE_LAYER && (CHIP_SYSTEM_CONFIG_USE_SOCKETS || CHIP_SYSTEM_CONFIG_USE_NETWORK_FRAMEWORK)
DeviceLayer::SystemLayer.WakeSelect();
#if CONFIG_DEVICE_LAYER && CHIP_SYSTEM_CONFIG_USE_IO_THREAD
DeviceLayer::SystemLayer.WakeIOThread();
#else
ReturnErrorOnFailure(CHIP_ERROR_UNSUPPORTED_CHIP_FEATURE);
#endif // CONFIG_DEVICE_LAYER && (CHIP_SYSTEM_CONFIG_USE_SOCKETS || CHIP_SYSTEM_CONFIG_USE_NETWORK_FRAMEWORK)
#endif // CONFIG_DEVICE_LAYER && CHIP_SYSTEM_CONFIG_USE_IO_THREAD

return CHIP_NO_ERROR;
}
Expand Down
2 changes: 1 addition & 1 deletion src/controller/java/CHIPDeviceController-JNI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ void JNI_OnUnload(JavaVM * jvm, void * reserved)
if (sIOThread != PTHREAD_NULL)
{
sShutdown = true;
sSystemLayer.WakeSelect();
sSystemLayer.WakeIOThread();
pthread_join(sIOThread, NULL);
}

Expand Down
28 changes: 20 additions & 8 deletions src/include/platform/internal/GenericPlatformManagerImpl_POSIX.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,12 @@
#include <fcntl.h>
#include <poll.h>
#include <sched.h>
#include <sys/select.h>
#include <unistd.h>

#if CHIP_SYSTEM_CONFIG_SOCKETS_USE_SELECT
#include <sys/select.h>
#endif // CHIP_SYSTEM_CONFIG_SOCKETS_USE_SELECT

#define DEFAULT_MIN_SLEEP_PERIOD (60 * 60 * 24 * 30) // Month [sec]

#if CHIP_DEVICE_CONFIG_ENABLE_MDNS
Expand Down Expand Up @@ -108,7 +111,10 @@ template <class ImplClass>
void GenericPlatformManagerImpl_POSIX<ImplClass>::_PostEvent(const ChipDeviceEvent * event)
{
mChipEventQueue.push(*event); // Thread safe due to ChipStackLock taken by App thread
SysOnEventSignal(this); // Trigger wake select on CHIP thread

#if CHIP_SYSTEM_CONFIG_USE_IO_THREAD
SystemLayer.WakeIOThread(); // Trigger wake select on CHIP thread
#endif // CHIP_SYSTEM_CONFIG_USE_IO_THREAD
}

template <class ImplClass>
Expand All @@ -121,11 +127,7 @@ void GenericPlatformManagerImpl_POSIX<ImplClass>::ProcessDeviceEvents()
}
}

template <class ImplClass>
void GenericPlatformManagerImpl_POSIX<ImplClass>::SysOnEventSignal(void * arg)
{
SystemLayer.WakeSelect();
}
#if CHIP_SYSTEM_CONFIG_SOCKETS_USE_SELECT

template <class ImplClass>
void GenericPlatformManagerImpl_POSIX<ImplClass>::SysUpdate()
Expand Down Expand Up @@ -192,9 +194,12 @@ void GenericPlatformManagerImpl_POSIX<ImplClass>::SysProcess()
#endif
}

#endif // CHIP_SYSTEM_CONFIG_SOCKETS_USE_SELECT

template <class ImplClass>
void GenericPlatformManagerImpl_POSIX<ImplClass>::_RunEventLoop()
{
#if CHIP_SYSTEM_CONFIG_SOCKETS_USE_SELECT
Impl()->LockChipStack();

do
Expand All @@ -204,6 +209,9 @@ void GenericPlatformManagerImpl_POSIX<ImplClass>::_RunEventLoop()
} while (mShouldRunEventLoop.load(std::memory_order_relaxed));

Impl()->UnlockChipStack();
#else
SystemLayer.SocketWatchState().HandleEvents();
#endif // CHIP_SYSTEM_CONFIG_SOCKETS_USE_SELECT
}

template <class ImplClass>
Expand Down Expand Up @@ -238,7 +246,11 @@ CHIP_ERROR GenericPlatformManagerImpl_POSIX<ImplClass>::_Shutdown()
mShouldRunEventLoop.store(false, std::memory_order_relaxed);
if (mChipTask)
{
SystemLayer.WakeSelect();
#if CHIP_SYSTEM_CONFIG_USE_IO_THREAD
SystemLayer.WakeIOThread();
#endif // CHIP_SYSTEM_CONFIG_USE_IO_THREAD
SystemLayer.SocketWatchState().Shutdown();

SuccessOrExit(err = pthread_join(mChipTask, nullptr));
}
// Call up to the base class _Shutdown() to perform the bulk of the shutdown.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,17 @@

#include <fcntl.h>
#include <sched.h>
#include <sys/select.h>
#include <sys/time.h>
#include <unistd.h>

#include <atomic>
#include <pthread.h>
#include <queue>

#if CHIP_SYSTEM_CONFIG_SOCKETS_USE_SELECT
#include <sys/select.h>
#endif // CHIP_SYSTEM_CONFIG_SOCKETS_USE_SELECT

namespace chip {
namespace DeviceLayer {
namespace Internal {
Expand Down Expand Up @@ -87,9 +90,10 @@ class GenericPlatformManagerImpl_POSIX : public GenericPlatformManagerImpl<ImplC

inline ImplClass * Impl() { return static_cast<ImplClass *>(this); }

#if CHIP_SYSTEM_CONFIG_SOCKETS_USE_SELECT
void SysUpdate();
void SysProcess();
static void SysOnEventSignal(void * arg);
#endif // CHIP_SYSTEM_CONFIG_SOCKETS_USE_SELECT

void ProcessDeviceEvents();

Expand Down
1 change: 0 additions & 1 deletion src/inet/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ static_library("inet") {
"InetInterface.h",
"InetLayer.cpp",
"InetLayer.h",
"InetLayerBasis.cpp",
"InetLayerBasis.h",
"InetLayerEvents.h",
"InetUtils.cpp",
Expand Down
3 changes: 1 addition & 2 deletions src/inet/EndPointBasis.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ void EndPointBasis::InitEndPointBasis(InetLayer & aInetLayer, void * aAppState)

#if CHIP_SYSTEM_CONFIG_USE_SOCKETS
mSocket = INET_INVALID_SOCKET_FD;
mPendingIO.Clear();
mRequestIO.Clear();
mPendingIO.ClearAll();
#endif // CHIP_SYSTEM_CONFIG_USE_SOCKETS
}

Expand Down
15 changes: 10 additions & 5 deletions src/inet/EndPointBasis.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@

#include <support/DLLUtil.h>

#if CHIP_SYSTEM_CONFIG_USE_SOCKETS
#include <system/SystemSockets.h>
#endif // CHIP_SYSTEM_CONFIG_USE_SOCKETS

#if CHIP_SYSTEM_CONFIG_USE_NETWORK_FRAMEWORK
#include <Network/Network.h>
#endif // CHIP_SYSTEM_CONFIG_USE_NETWORK_FRAMEWORK
Expand Down Expand Up @@ -93,11 +97,12 @@ class DLL_EXPORT EndPointBasis : public InetLayerBasis
#endif

#if CHIP_SYSTEM_CONFIG_USE_SOCKETS
int mSocket; /**< Encapsulated socket descriptor. */
IPAddressType mAddrType; /**< Protocol family, i.e. IPv4 or IPv6. */
SocketEvents mPendingIO; /**< Socket event masks (read/write/error) currently available */
SocketEvents mRequestIO; /**< Socket event masks (read/write) to wait for */
#endif // CHIP_SYSTEM_CONFIG_USE_SOCKETS
int mSocket; /**< Encapsulated socket descriptor. */
IPAddressType mAddrType; /**< Protocol family, i.e. IPv4 or IPv6. */
System::SocketEvents mPendingIO; /**< Socket event masks (read/write/error) currently available */

System::SystemSocketWatcher mWatcher;
#endif // CHIP_SYSTEM_CONFIG_USE_SOCKETS

#if CHIP_SYSTEM_CONFIG_USE_LWIP
/** Encapsulated LwIP protocol control block */
Expand Down
28 changes: 19 additions & 9 deletions src/inet/InetLayer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@
* * UDP network transport
* * Raw network transport
*
* For BSD/POSIX Sockets, event readiness notification is handled
* via file descriptors and a traditional poll / select
* implementation on the platform adaptation.
* For BSD/POSIX Sockets (CHIP_SYSTEM_CONFIG_USE_LWIP), event readiness
* notification is handled via file descriptors.
* - CHIP_SYSTEM_CONFIG_SOCKETS_USE_SELECT==1 uses a traditional poll / select
* implementation on the platform adaptation.
* - CHIP_SYSTEM_CONFIG_SOCKETS_USE_SELECT==0 uses an event registration interface.
*
* For LwIP, event readiness notification is handle via events /
* messages and platform- and system-specific hooks for the event
Expand Down Expand Up @@ -281,6 +283,7 @@ INET_ERROR InetLayer::Init(chip::System::Layer & aSystemLayer, void * aContext)
State = kState_Initialized;

#if CHIP_SYSTEM_CONFIG_USE_SOCKETS

#if INET_CONFIG_ENABLE_DNS_RESOLVER && INET_CONFIG_ENABLE_ASYNC_DNS_SOCKETS

err = mAsyncDNSResolver.Init(this);
Expand Down Expand Up @@ -1083,6 +1086,8 @@ chip::System::Error InetLayer::HandleInetLayerEvent(chip::System::Object & aTarg
#endif // CHIP_SYSTEM_CONFIG_USE_LWIP

#if CHIP_SYSTEM_CONFIG_USE_SOCKETS

#if CHIP_SYSTEM_CONFIG_SOCKETS_USE_SELECT
/**
* Prepare the sets of file descriptors for @p select() to work with.
*
Expand All @@ -1108,7 +1113,7 @@ void InetLayer::PrepareSelect(int & nfds, fd_set * readfds, fd_set * writefds, f
{
RawEndPoint * lEndPoint = RawEndPoint::sPool.Get(*mSystemLayer, i);
if ((lEndPoint != nullptr) && lEndPoint->IsCreatedByInetLayer(*this))
lEndPoint->mRequestIO.SetFDs(lEndPoint->mSocket, nfds, readfds, writefds, exceptfds);
lEndPoint->mWatcher.SetFDs(lEndPoint->mSocket, nfds, readfds, writefds, exceptfds);
}
#endif // INET_CONFIG_ENABLE_RAW_ENDPOINT

Expand All @@ -1117,7 +1122,7 @@ void InetLayer::PrepareSelect(int & nfds, fd_set * readfds, fd_set * writefds, f
{
TCPEndPoint * lEndPoint = TCPEndPoint::sPool.Get(*mSystemLayer, i);
if ((lEndPoint != nullptr) && lEndPoint->IsCreatedByInetLayer(*this))
lEndPoint->mRequestIO.SetFDs(lEndPoint->mSocket, nfds, readfds, writefds, exceptfds);
lEndPoint->mWatcher.SetFDs(lEndPoint->mSocket, nfds, readfds, writefds, exceptfds);
}
#endif // INET_CONFIG_ENABLE_TCP_ENDPOINT

Expand All @@ -1126,7 +1131,7 @@ void InetLayer::PrepareSelect(int & nfds, fd_set * readfds, fd_set * writefds, f
{
UDPEndPoint * lEndPoint = UDPEndPoint::sPool.Get(*mSystemLayer, i);
if ((lEndPoint != nullptr) && lEndPoint->IsCreatedByInetLayer(*this))
lEndPoint->mRequestIO.SetFDs(lEndPoint->mSocket, nfds, readfds, writefds, exceptfds);
lEndPoint->mWatcher.SetFDs(lEndPoint->mSocket, nfds, readfds, writefds, exceptfds);
}
#endif // INET_CONFIG_ENABLE_UDP_ENDPOINT
}
Expand Down Expand Up @@ -1175,7 +1180,8 @@ void InetLayer::HandleSelectResult(int selectRes, fd_set * readfds, fd_set * wri
RawEndPoint * lEndPoint = RawEndPoint::sPool.Get(*mSystemLayer, i);
if ((lEndPoint != nullptr) && lEndPoint->IsCreatedByInetLayer(*this))
{
lEndPoint->mPendingIO = SocketEvents::FromFDs(lEndPoint->mSocket, readfds, writefds, exceptfds);
lEndPoint->mPendingIO =
System::SystemSocketWatcher::SocketEventsFromFDs(lEndPoint->mSocket, readfds, writefds, exceptfds);
}
}
#endif // INET_CONFIG_ENABLE_RAW_ENDPOINT
Expand All @@ -1186,7 +1192,8 @@ void InetLayer::HandleSelectResult(int selectRes, fd_set * readfds, fd_set * wri
TCPEndPoint * lEndPoint = TCPEndPoint::sPool.Get(*mSystemLayer, i);
if ((lEndPoint != nullptr) && lEndPoint->IsCreatedByInetLayer(*this))
{
lEndPoint->mPendingIO = SocketEvents::FromFDs(lEndPoint->mSocket, readfds, writefds, exceptfds);
lEndPoint->mPendingIO =
System::SystemSocketWatcher::SocketEventsFromFDs(lEndPoint->mSocket, readfds, writefds, exceptfds);
}
}
#endif // INET_CONFIG_ENABLE_TCP_ENDPOINT
Expand All @@ -1197,7 +1204,8 @@ void InetLayer::HandleSelectResult(int selectRes, fd_set * readfds, fd_set * wri
UDPEndPoint * lEndPoint = UDPEndPoint::sPool.Get(*mSystemLayer, i);
if ((lEndPoint != nullptr) && lEndPoint->IsCreatedByInetLayer(*this))
{
lEndPoint->mPendingIO = SocketEvents::FromFDs(lEndPoint->mSocket, readfds, writefds, exceptfds);
lEndPoint->mPendingIO =
System::SystemSocketWatcher::SocketEventsFromFDs(lEndPoint->mSocket, readfds, writefds, exceptfds);
}
}
#endif // INET_CONFIG_ENABLE_UDP_ENDPOINT
Expand Down Expand Up @@ -1238,6 +1246,8 @@ void InetLayer::HandleSelectResult(int selectRes, fd_set * readfds, fd_set * wri
}
}

#endif // CHIP_SYSTEM_CONFIG_SOCKETS_USE_SELECT

#endif // CHIP_SYSTEM_CONFIG_USE_SOCKETS

/**
Expand Down
6 changes: 4 additions & 2 deletions src/inet/InetLayer.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,11 @@
#endif // INET_CONFIG_ENABLE_UDP_ENDPOINT

#if CHIP_SYSTEM_CONFIG_USE_SOCKETS

#if INET_CONFIG_ENABLE_DNS_RESOLVER && INET_CONFIG_ENABLE_ASYNC_DNS_SOCKETS
#include <inet/AsyncDNSResolverSockets.h>
#endif // INET_CONFIG_ENABLE_DNS_RESOLVER && INET_CONFIG_ENABLE_ASYNC_DNS_SOCKETS

#endif // CHIP_SYSTEM_CONFIG_USE_SOCKETS

#include <system/SystemLayer.h>
Expand Down Expand Up @@ -214,10 +216,10 @@ class DLL_EXPORT InetLayer
INET_ERROR GetLinkLocalAddr(InterfaceId link, IPAddress * llAddr);
bool MatchLocalIPv6Subnet(const IPAddress & addr);

#if CHIP_SYSTEM_CONFIG_USE_SOCKETS
#if CHIP_SYSTEM_CONFIG_USE_SOCKETS && CHIP_SYSTEM_CONFIG_SOCKETS_USE_SELECT
void PrepareSelect(int & nfds, fd_set * readfds, fd_set * writefds, fd_set * exceptfds, struct timeval & sleepTime);
void HandleSelectResult(int selectRes, fd_set * readfds, fd_set * writefds, fd_set * exceptfds);
#endif // CHIP_SYSTEM_CONFIG_USE_SOCKETS
#endif // CHIP_SYSTEM_CONFIG_USE_SOCKETS && CHIP_SYSTEM_CONFIG_SOCKETS_USE_SELECT

static void UpdateSnapshot(chip::System::Stats::Snapshot & aSnapshot);

Expand Down
Loading

0 comments on commit 1fc4aea

Please sign in to comment.