Skip to content

Commit

Permalink
Add the ability for EventLoopHandlers to participate in the event loop
Browse files Browse the repository at this point in the history
  • Loading branch information
ksperling-apple committed Jul 23, 2024
1 parent f7a9ce4 commit a26133c
Show file tree
Hide file tree
Showing 6 changed files with 267 additions and 9 deletions.
10 changes: 10 additions & 0 deletions src/lib/support/IntrusiveList.h
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,11 @@ class IntrusiveList : public IntrusiveListBase
ConstIterator(IntrusiveListBase::ConstIteratorBase && base) : IntrusiveListBase::ConstIteratorBase(std::move(base)) {}
const T * operator->() { return Hook::ToObject(mCurrent); }
const T & operator*() { return *Hook::ToObject(mCurrent); }

ConstIterator & operator++() { return static_cast<ConstIterator &>(IntrusiveListBase::ConstIteratorBase::operator++()); }
ConstIterator operator++(int) { return IntrusiveListBase::ConstIteratorBase::operator++(1); }
ConstIterator & operator--() { return static_cast<ConstIterator &>(IntrusiveListBase::ConstIteratorBase::operator--()); }
ConstIterator operator--(int) { return IntrusiveListBase::ConstIteratorBase::operator--(1); }
};

class Iterator : public IntrusiveListBase::IteratorBase
Expand All @@ -426,6 +431,11 @@ class IntrusiveList : public IntrusiveListBase
Iterator(IntrusiveListBase::IteratorBase && base) : IntrusiveListBase::IteratorBase(std::move(base)) {}
T * operator->() { return Hook::ToObject(mCurrent); }
T & operator*() { return *Hook::ToObject(mCurrent); }

Iterator & operator++() { return static_cast<Iterator &>(IntrusiveListBase::IteratorBase::operator++()); }
Iterator operator++(int) { return IntrusiveListBase::IteratorBase::operator++(1); }
Iterator & operator--() { return static_cast<Iterator &>(IntrusiveListBase::IteratorBase::operator--()); }
Iterator operator--(int) { return IntrusiveListBase::IteratorBase::operator--(1); }
};

ConstIterator begin() const { return IntrusiveListBase::begin(); }
Expand Down
37 changes: 37 additions & 0 deletions src/system/SystemLayer.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include <system/SystemEvent.h>

#if CHIP_SYSTEM_CONFIG_USE_SOCKETS
#include <lib/support/IntrusiveList.h>
#include <system/SocketEvents.h>
#endif // CHIP_SYSTEM_CONFIG_USE_SOCKETS

Expand Down Expand Up @@ -243,6 +244,7 @@ class LayerSockets : public Layer
* Initialize watching for events on a file descriptor.
*
* Returns an opaque token through @a tokenOut that must be passed to subsequent operations for this file descriptor.
* Multiple calls to start watching the same file descriptor will return the same token.
* StopWatchingSocket() must be called before closing the file descriptor.
*/
virtual CHIP_ERROR StartWatchingSocket(int fd, SocketWatchToken * tokenOut) = 0;
Expand Down Expand Up @@ -288,6 +290,32 @@ class LayerSockets : public Layer
virtual SocketWatchToken InvalidSocketWatchToken() = 0;
};

class LayerSocketsLoop;

/**
* Enables the participation of subordinate event loops in the SystemLayer event loop.
*/
class EventLoopHandler : public chip::IntrusiveListNodeBase<>
{
public:
virtual ~EventLoopHandler() {}

/**
* Prepares events and returns the next requested wake time.
*/
virtual Clock::Timestamp PrepareEvents(Clock::Timestamp now) { return Clock::Timestamp::max(); }

/**
* Handles / dispatches pending events.
* Every call to this method will have been preceded by a call to `PrepareEvents`.
*/
virtual void HandleEvents() = 0;

private:
friend class LayerSocketsLoop;
intptr_t mState = 0; // For use by the event loop implementation
};

class LayerSocketsLoop : public LayerSockets
{
public:
Expand All @@ -298,13 +326,22 @@ class LayerSocketsLoop : public LayerSockets
virtual void HandleEvents() = 0;
virtual void EventLoopEnds() = 0;

#if !CHIP_SYSTEM_CONFIG_USE_DISPATCH
virtual void AddLoopHandler(EventLoopHandler & handler) = 0;
virtual void RemoveLoopHandler(EventLoopHandler & handler) = 0;
#endif // !CHIP_SYSTEM_CONFIG_USE_DISPATCH

#if CHIP_SYSTEM_CONFIG_USE_DISPATCH
virtual void SetDispatchQueue(dispatch_queue_t dispatchQueue) = 0;
virtual dispatch_queue_t GetDispatchQueue() = 0;
#elif CHIP_SYSTEM_CONFIG_USE_LIBEV
virtual void SetLibEvLoop(struct ev_loop * aLibEvLoopP) = 0;
virtual struct ev_loop * GetLibEvLoop() = 0;
#endif // CHIP_SYSTEM_CONFIG_USE_DISPATCH/LIBEV

protected:
// Expose EventLoopHandler.mState to sub-classes
decltype(EventLoopHandler::mState) & LoopHandlerState(EventLoopHandler & handler) { return handler.mState; }
};

#endif // CHIP_SYSTEM_CONFIG_USE_SOCKETS
Expand Down
81 changes: 72 additions & 9 deletions src/system/SystemLayerImplSelect.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <system/SystemLayer.h>
#include <system/SystemLayerImplSelect.h>

#include <algorithm>
#include <errno.h>

// Choose an approximation of PTHREAD_NULL if pthread.h doesn't define one.
Expand Down Expand Up @@ -370,8 +371,9 @@ CHIP_ERROR LayerImplSelect::StartWatchingSocket(int fd, SocketWatchToken * token
{
if (w.mFD == fd)
{
// Duplicate registration is an error.
return CHIP_ERROR_INVALID_ARGUMENT;
// Already registered, return the existing token
*tokenOut = reinterpret_cast<SocketWatchToken>(&w);
return CHIP_NO_ERROR;
}
if ((w.mFD == kInvalidFd) && (watch == nullptr))
{
Expand Down Expand Up @@ -608,6 +610,32 @@ SocketEvents LayerImplSelect::SocketEventsFromFDs(int socket, const fd_set & rea
return res;
}

#if !CHIP_SYSTEM_CONFIG_USE_DISPATCH
enum : intptr_t
{
kLoopHandlerInactive = 0, // default value for EventLoopHandler::mState
kLoopHandlerPending,
kLoopHandlerActive,
};

void LayerImplSelect::AddLoopHandler(EventLoopHandler & handler)
{
// Add the handler as pending because this method can be called at any point
// in a PrepareEvents() / WaitForEvents() / HandleEvents() sequence.
// It will be marked active when we call PrepareEvents() on it for the first time.
auto & state = LoopHandlerState(handler);
VerifyOrDie(state == kLoopHandlerInactive);
state = kLoopHandlerPending;
mLoopHandlers.PushBack(&handler);
}

void LayerImplSelect::RemoveLoopHandler(EventLoopHandler & handler)
{
mLoopHandlers.Remove(&handler);
LoopHandlerState(handler) = kLoopHandlerInactive;
}
#endif // !CHIP_SYSTEM_CONFIG_USE_DISPATCH

void LayerImplSelect::PrepareEvents()
{
assertChipStackLockedByCurrentThread();
Expand All @@ -616,10 +644,28 @@ void LayerImplSelect::PrepareEvents()
Clock::Timestamp awakenTime = currentTime + kDefaultMinSleepPeriod;

TimerList::Node * timer = mTimerList.Earliest();
if (timer && timer->AwakenTime() < awakenTime)
if (timer)
{
awakenTime = std::min(awakenTime, timer->AwakenTime());
}

#if !CHIP_SYSTEM_CONFIG_USE_DISPATCH
// Activate added EventLoopHandlers and call PrepareEvents on active handlers.
auto loopIter = mLoopHandlers.begin();
while (loopIter != mLoopHandlers.end())
{
awakenTime = timer->AwakenTime();
auto & loop = *loopIter++; // advance before calling out, in case a list modification clobbers the `next` pointer
switch (auto & state = LoopHandlerState(loop))
{
case kLoopHandlerPending:
state = kLoopHandlerActive;
[[fallthrough]];
case kLoopHandlerActive:
awakenTime = std::min(awakenTime, loop.PrepareEvents(currentTime));
break;
}
}
#endif // !CHIP_SYSTEM_CONFIG_USE_DISPATCH

const Clock::Timestamp sleepTime = (awakenTime > currentTime) ? (awakenTime - currentTime) : Clock::kZero;
Clock::ToTimeval(sleepTime, mNextTimeout);
Expand Down Expand Up @@ -683,18 +729,35 @@ void LayerImplSelect::HandleEvents()
mTimerPool.Invoke(timer);
}

for (auto & w : mSocketWatchPool)
// Process socket events, if any
if (mSelectResult > 0)
{
if (w.mFD != kInvalidFd)
for (auto & w : mSocketWatchPool)
{
SocketEvents events = SocketEventsFromFDs(w.mFD, mSelected.mReadSet, mSelected.mWriteSet, mSelected.mErrorSet);
if (events.HasAny() && w.mCallback != nullptr)
if (w.mFD != kInvalidFd && w.mCallback != nullptr)
{
w.mCallback(events, w.mCallbackData);
SocketEvents events = SocketEventsFromFDs(w.mFD, mSelected.mReadSet, mSelected.mWriteSet, mSelected.mErrorSet);
if (events.HasAny())
{
w.mCallback(events, w.mCallbackData);
}
}
}
}

#if !CHIP_SYSTEM_CONFIG_USE_DISPATCH
// Call HandleEvents for active loop handlers
auto loopIter = mLoopHandlers.begin();
while (loopIter != mLoopHandlers.end())
{
auto & loop = *loopIter++; // advance before calling out, in case a list modification clobbers the `next` pointer
if (LoopHandlerState(loop) == kLoopHandlerActive)
{
loop.HandleEvents();
}
}
#endif // !CHIP_SYSTEM_CONFIG_USE_DISPATCH

#if CHIP_SYSTEM_CONFIG_POSIX_LOCKING
mHandleSelectThread = PTHREAD_NULL;
#endif // CHIP_SYSTEM_CONFIG_POSIX_LOCKING
Expand Down
9 changes: 9 additions & 0 deletions src/system/SystemLayerImplSelect.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ class LayerImplSelect : public LayerSocketsLoop
void HandleEvents() override;
void EventLoopEnds() override {}

#if !CHIP_SYSTEM_CONFIG_USE_DISPATCH
void AddLoopHandler(EventLoopHandler & handler) override;
void RemoveLoopHandler(EventLoopHandler & handler) override;
#endif // !CHIP_SYSTEM_CONFIG_USE_DISPATCH

#if CHIP_SYSTEM_CONFIG_USE_DISPATCH
void SetDispatchQueue(dispatch_queue_t dispatchQueue) override { mDispatchQueue = dispatchQueue; };
dispatch_queue_t GetDispatchQueue() override { return mDispatchQueue; };
Expand Down Expand Up @@ -135,6 +140,10 @@ class LayerImplSelect : public LayerSocketsLoop
TimerList mExpiredTimers;
timeval mNextTimeout;

#if !CHIP_SYSTEM_CONFIG_USE_DISPATCH
IntrusiveList<EventLoopHandler> mLoopHandlers;
#endif

// Members for select loop
struct SelectSets
{
Expand Down
1 change: 1 addition & 0 deletions src/system/tests/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ chip_test_suite("tests") {
output_name = "libSystemLayerTests"

test_sources = [
"TestEventLoopHandler.cpp",
"TestSystemClock.cpp",
"TestSystemErrorStr.cpp",
"TestSystemPacketBuffer.cpp",
Expand Down
138 changes: 138 additions & 0 deletions src/system/tests/TestEventLoopHandler.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Copyright (c) 2024 Project CHIP Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <pw_unit_test/framework.h>
#include <system/SystemConfig.h>

#if CHIP_SYSTEM_CONFIG_USE_SOCKETS && !CHIP_SYSTEM_CONFIG_USE_DISPATCH

#include <lib/support/CodeUtils.h>
#include <platform/CHIPDeviceLayer.h>

#include <functional>
#include <string>

using namespace chip;
using namespace chip::System::Clock;
using namespace chip::System::Clock::Literals;

class TestEventLoopHandler : public ::testing::Test
{
public:
static void SetUpTestSuite()
{
ASSERT_EQ(Platform::MemoryInit(), CHIP_NO_ERROR);
ASSERT_EQ(DeviceLayer::PlatformMgr().InitChipStack(), CHIP_NO_ERROR);
}

static void TearDownTestSuite()
{
DeviceLayer::PlatformMgr().Shutdown();
Platform::MemoryShutdown();
}

static System::LayerSocketsLoop & SystemLayer() { return static_cast<System::LayerSocketsLoop &>(DeviceLayer::SystemLayer()); }

template <class Lambda>
static void Schedule(Timeout delay, Lambda lambda)
{
SystemLayer().StartTimer(
delay,
[](System::Layer * layer, void * ctx) {
auto * function = static_cast<std::function<void()> *>(ctx);
(*function)();
delete function;
},
new std::function<void()>(lambda));
}

template <class Lambda>
static void ScheduleNextTick(Lambda lambda)
{
// ScheduleLambda is based on device events, which are greedily processed until the
// queue is empty, so we can't use it to wait for the next event loop tick. Just use
// a timer with a very short delay.
Schedule(1_ms, lambda);
}
};

TEST_F(TestEventLoopHandler, EventLoopHandlerSequence)
{
struct : public System::EventLoopHandler
{
std::string trace;
Timestamp PrepareEvents(Timestamp now) override
{
trace.append("P");
return Timestamp::max();
}
void HandleEvents() override { trace.append("H"); }
} loopHandler;

ScheduleNextTick([&] {
loopHandler.trace.append("1");
SystemLayer().AddLoopHandler(loopHandler);
loopHandler.trace.append("A");
ScheduleNextTick([&] { // "P"
loopHandler.trace.append("2");
ScheduleNextTick([&] { // "H", "P"
loopHandler.trace.append("3");
SystemLayer().RemoveLoopHandler(loopHandler);
loopHandler.trace.append("R");
ScheduleNextTick([&] {
loopHandler.trace.append("4");
DeviceLayer::PlatformMgr().StopEventLoopTask();
});
});
});
});

chip::DeviceLayer::PlatformMgr().RunEventLoop();
EXPECT_EQ(loopHandler.trace, std::string("1AP2HP3R4"));
}

TEST_F(TestEventLoopHandler, EventLoopHandlerWake)
{
struct : public System::EventLoopHandler
{
Timestamp startTimestamp = System::SystemClock().GetMonotonicTimestamp();
Timestamp wakeTimestamp = Timestamp::max();

Timestamp PrepareEvents(Timestamp now) override { return now + 400_ms; }
void HandleEvents() override
{
// StartTimer() (called by Schedule()) is liable to causes an immediate
// wakeup via Signal(), so ignore this call if it's only been a few ms.
auto now = System::SystemClock().GetMonotonicTimestamp();
if (now - startTimestamp >= 100_ms)
{
wakeTimestamp = now;
DeviceLayer::PlatformMgr().StopEventLoopTask();
}
}
} loopHandler;

SystemLayer().AddLoopHandler(loopHandler);
Schedule(1000_ms, [] { DeviceLayer::PlatformMgr().StopEventLoopTask(); });
chip::DeviceLayer::PlatformMgr().RunEventLoop();
SystemLayer().RemoveLoopHandler(loopHandler);

Timestamp sleepDuration = loopHandler.wakeTimestamp - loopHandler.startTimestamp;
EXPECT_GE(sleepDuration.count(), 400u); // loopHandler requested wake-up after 400ms
EXPECT_LE(sleepDuration.count(), 500u); // allow some slack for test machine load
}

#endif // CHIP_SYSTEM_CONFIG_USE_SOCKETS && !CHIP_SYSTEM_CONFIG_USE_DISPATCH

0 comments on commit a26133c

Please sign in to comment.