Skip to content

Commit

Permalink
Generalize socket-based event loop
Browse files Browse the repository at this point in the history
#### Problem

Socket event management is currently tied closely to select(2),
and does not integrate well with other event loops.

See issue project-chip#5556 _rework system layer event loop_

#### Change overview

The primary purpose of this change is to provide a clear interface for
monitoring socket events, that can be used both within the CHIP SDK
and by an application using the SDK, and can be implemented either
inside or outside the SDK. Functionality does not change.

* Defines an interface, in `system/SystemSockets.h`.
* Converts most existing uses of socket event monitoring (except mDNS,
  to be done as a followup).
* Converts the existing select(2)-based event management into an
  implementation of the new interface.
* Adds a second implementation of the interface, using libevent,
  to verify that the interface is general enough.

#### Testing

* Converted the applicable unit tests from select(2) to this interface,
  and ran them with both the select and libevent implementations.
  (CI remains select-only for now.)
* Integration tests involving socket-based platforms exercise this code
  and confirm no change to functionality.
* Manual verification of controller/device communication
  • Loading branch information
kpschoedel committed Jun 14, 2021
1 parent 446e7ad commit 1b4fa34
Show file tree
Hide file tree
Showing 36 changed files with 1,333 additions and 1,035 deletions.
8 changes: 4 additions & 4 deletions src/controller/CHIPDeviceController.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
*
* Copyright (c) 2020 Project CHIP Authors
* Copyright (c) 2020-2021 Project CHIP Authors
* Copyright (c) 2013-2017 Nest Labs, Inc.
* All rights reserved.
*
Expand Down Expand Up @@ -501,11 +501,11 @@ CHIP_ERROR DeviceController::ServiceEventSignal()
{
VerifyOrReturnError(mState == State::Initialized, CHIP_ERROR_INCORRECT_STATE);

#if CONFIG_DEVICE_LAYER && (CHIP_SYSTEM_CONFIG_USE_SOCKETS || CHIP_SYSTEM_CONFIG_USE_NETWORK_FRAMEWORK)
DeviceLayer::SystemLayer.WakeSelect();
#if CONFIG_DEVICE_LAYER && CHIP_SYSTEM_CONFIG_USE_IO_THREAD
DeviceLayer::SystemLayer.WakeIOThread();
#else
ReturnErrorOnFailure(CHIP_ERROR_UNSUPPORTED_CHIP_FEATURE);
#endif // CONFIG_DEVICE_LAYER && (CHIP_SYSTEM_CONFIG_USE_SOCKETS || CHIP_SYSTEM_CONFIG_USE_NETWORK_FRAMEWORK)
#endif // CONFIG_DEVICE_LAYER && CHIP_SYSTEM_CONFIG_USE_IO_THREAD

return CHIP_NO_ERROR;
}
Expand Down
29 changes: 9 additions & 20 deletions src/controller/java/CHIPDeviceController-JNI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ void JNI_OnUnload(JavaVM * jvm, void * reserved)
if (sIOThread != PTHREAD_NULL)
{
sShutdown = true;
sSystemLayer.WakeSelect();
sSystemLayer.WakeIOThread();

AndroidDeviceControllerWrapper::StackUnlockGuard unlockGuard(&sStackLock);
pthread_join(sIOThread, NULL);
Expand Down Expand Up @@ -1013,9 +1013,6 @@ void * IOThreadMain(void * arg)
{
JNIEnv * env;
JavaVMAttachArgs attachArgs;
struct timeval sleepTime;
fd_set readFDs, writeFDs, exceptFDs;
int numFDs = 0;

// Attach the IO thread to the JVM as a daemon thread.
// This allows the JVM to shutdown without waiting for this thread to exit.
Expand All @@ -1036,26 +1033,19 @@ void * IOThreadMain(void * arg)
// Lock the stack to prevent collisions with Java threads.
pthread_mutex_lock(&sStackLock);

System::WatchableEventManager & watchState = sSystemLayer.WatchableEvents();
watchState.EventLoopBegins();

// Loop until we are told to exit.
while (!quit.load(std::memory_order_relaxed))
{
numFDs = 0;
FD_ZERO(&readFDs);
FD_ZERO(&writeFDs);
FD_ZERO(&exceptFDs);

sleepTime.tv_sec = 10;
sleepTime.tv_usec = 0;

// Collect the currently active file descriptors.
sSystemLayer.PrepareSelect(numFDs, &readFDs, &writeFDs, &exceptFDs, sleepTime);
sInetLayer.PrepareSelect(numFDs, &readFDs, &writeFDs, &exceptFDs, sleepTime);
// TODO(#5556): add a timer for `sleepTime.tv_sec = 10; sleepTime.tv_usec = 0;`
watchState.PrepareEvents();

// Unlock the stack so that Java threads can make API calls.
pthread_mutex_unlock(&sStackLock);

// Wait for for I/O or for the next timer to expire.
int selectRes = select(numFDs, &readFDs, &writeFDs, &exceptFDs, &sleepTime);
watchState.WaitForEvents();

// Break the loop if requested to shutdown.
// if (sShutdown)
Expand All @@ -1064,10 +1054,9 @@ void * IOThreadMain(void * arg)
// Re-lock the stack.
pthread_mutex_lock(&sStackLock);

// Perform I/O and/or dispatch timers.
sSystemLayer.HandleSelectResult(selectRes, &readFDs, &writeFDs, &exceptFDs);
sInetLayer.HandleSelectResult(selectRes, &readFDs, &writeFDs, &exceptFDs);
watchState.HandleEvents();
}
watchState.EventLoopEnds();

// Detach the thread from the JVM.
sJVM->DetachCurrentThread();
Expand Down
109 changes: 21 additions & 88 deletions src/include/platform/internal/GenericPlatformManagerImpl_POSIX.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
*
* Copyright (c) 2020 Project CHIP Authors
* Copyright (c) 2020-2021 Project CHIP Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,7 +24,6 @@
#ifndef GENERIC_PLATFORM_MANAGER_IMPL_POSIX_CPP
#define GENERIC_PLATFORM_MANAGER_IMPL_POSIX_CPP

#include "system/SystemError.h"
#include <platform/PlatformManager.h>
#include <platform/internal/CHIPDeviceLayerInternal.h>
#include <platform/internal/GenericPlatformManagerImpl_POSIX.h>
Expand All @@ -36,28 +35,18 @@
#endif
#include <platform/internal/GenericPlatformManagerImpl.cpp>

#include <system/SystemError.h>
#include <system/SystemLayer.h>

#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <poll.h>
#include <sched.h>
#include <sys/select.h>
#include <unistd.h>

#define DEFAULT_MIN_SLEEP_PERIOD (60 * 60 * 24 * 30) // Month [sec]

#if CHIP_DEVICE_CONFIG_ENABLE_MDNS
namespace chip {
namespace Mdns {
void UpdateMdnsDataset(fd_set & readFdSet, fd_set & writeFdSet, fd_set & errorFdSet, int & maxFd, timeval & timeout);
void ProcessMdns(fd_set & readFdSet, fd_set & writeFdSet, fd_set & errorFdSet);
} // namespace Mdns
} // namespace chip
#endif

namespace chip {
namespace DeviceLayer {
namespace Internal {

Expand Down Expand Up @@ -140,6 +129,8 @@ bool GenericPlatformManagerImpl_POSIX<ImplClass>::_IsChipStackLockedByCurrentThr
template <class ImplClass>
CHIP_ERROR GenericPlatformManagerImpl_POSIX<ImplClass>::_StartChipTimer(int64_t aMilliseconds)
{
// TODO(#5556): Integrate timer platform details with WatchableEventManager.

// Let SystemLayer.PrepareSelect() handle timers.
return CHIP_NO_ERROR;
}
Expand All @@ -148,7 +139,10 @@ template <class ImplClass>
void GenericPlatformManagerImpl_POSIX<ImplClass>::_PostEvent(const ChipDeviceEvent * event)
{
mChipEventQueue.push(*event); // Thread safe due to ChipStackLock taken by App thread
SysOnEventSignal(this); // Trigger wake select on CHIP thread

#if CHIP_SYSTEM_CONFIG_USE_IO_THREAD
SystemLayer.WakeIOThread(); // Trigger wake select on CHIP thread
#endif // CHIP_SYSTEM_CONFIG_USE_IO_THREAD
}

template <class ImplClass>
Expand All @@ -161,77 +155,6 @@ void GenericPlatformManagerImpl_POSIX<ImplClass>::ProcessDeviceEvents()
}
}

template <class ImplClass>
void GenericPlatformManagerImpl_POSIX<ImplClass>::SysOnEventSignal(void * arg)
{
SystemLayer.WakeSelect();
}

template <class ImplClass>
void GenericPlatformManagerImpl_POSIX<ImplClass>::SysUpdate()
{
FD_ZERO(&mReadSet);
FD_ZERO(&mWriteSet);
FD_ZERO(&mErrorSet);
mMaxFd = 0;

// Max out this duration and let CHIP set it appropriately.
mNextTimeout.tv_sec = DEFAULT_MIN_SLEEP_PERIOD;
mNextTimeout.tv_usec = 0;

if (SystemLayer.State() == System::kLayerState_Initialized)
{
SystemLayer.PrepareSelect(mMaxFd, &mReadSet, &mWriteSet, &mErrorSet, mNextTimeout);
}

#if !(CHIP_SYSTEM_CONFIG_USE_NETWORK_FRAMEWORK)
if (InetLayer.State == InetLayer::kState_Initialized)
{
InetLayer.PrepareSelect(mMaxFd, &mReadSet, &mWriteSet, &mErrorSet, mNextTimeout);
}
#endif // !(CHIP_SYSTEM_CONFIG_USE_NETWORK_FRAMEWORK)
#if CHIP_DEVICE_CONFIG_ENABLE_MDNS
chip::Mdns::UpdateMdnsDataset(mReadSet, mWriteSet, mErrorSet, mMaxFd, mNextTimeout);
#endif
}

template <class ImplClass>
void GenericPlatformManagerImpl_POSIX<ImplClass>::SysProcess()
{
int selectRes;
int64_t nextTimeoutMs;

nextTimeoutMs = mNextTimeout.tv_sec * 1000 + mNextTimeout.tv_usec / 1000;
_StartChipTimer(nextTimeoutMs);

Impl()->UnlockChipStack();
selectRes = select(mMaxFd + 1, &mReadSet, &mWriteSet, &mErrorSet, &mNextTimeout);
Impl()->LockChipStack();

if (selectRes < 0)
{
ChipLogError(DeviceLayer, "select failed: %s\n", ErrorStr(System::MapErrorPOSIX(errno)));
return;
}

if (SystemLayer.State() == System::kLayerState_Initialized)
{
SystemLayer.HandleSelectResult(mMaxFd, &mReadSet, &mWriteSet, &mErrorSet);
}

#if !(CHIP_SYSTEM_CONFIG_USE_NETWORK_FRAMEWORK)
if (InetLayer.State == InetLayer::kState_Initialized)
{
InetLayer.HandleSelectResult(mMaxFd, &mReadSet, &mWriteSet, &mErrorSet);
}
#endif // !(CHIP_SYSTEM_CONFIG_USE_NETWORK_FRAMEWORK)

ProcessDeviceEvents();
#if CHIP_DEVICE_CONFIG_ENABLE_MDNS
chip::Mdns::ProcessMdns(mReadSet, mWriteSet, mErrorSet);
#endif
}

template <class ImplClass>
void GenericPlatformManagerImpl_POSIX<ImplClass>::_RunEventLoop()
{
Expand All @@ -254,11 +177,21 @@ void GenericPlatformManagerImpl_POSIX<ImplClass>::_RunEventLoop()

Impl()->LockChipStack();

System::WatchableEventManager & watchState = SystemLayer.WatchableEvents();
watchState.EventLoopBegins();
do
{
SysUpdate();
SysProcess();
watchState.PrepareEvents();

Impl()->UnlockChipStack();
watchState.WaitForEvents();
Impl()->LockChipStack();

watchState.HandleEvents();

ProcessDeviceEvents();
} while (mShouldRunEventLoop.load(std::memory_order_relaxed));
watchState.EventLoopEnds();

Impl()->UnlockChipStack();

Expand Down Expand Up @@ -340,7 +273,7 @@ CHIP_ERROR GenericPlatformManagerImpl_POSIX<ImplClass>::_StopEventLoopTask()
// SystemLayer.
//
Impl()->LockChipStack();
SystemLayer.WakeSelect();
SystemLayer.WakeIOThread();
Impl()->UnlockChipStack();

pthread_mutex_lock(&mStateLock);
Expand Down
14 changes: 1 addition & 13 deletions src/include/platform/internal/GenericPlatformManagerImpl_POSIX.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
*
* Copyright (c) 2020 Project CHIP Authors
* Copyright (c) 2020-2021 Project CHIP Authors
* Copyright (c) 2018 Nest Labs, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -28,7 +28,6 @@

#include <fcntl.h>
#include <sched.h>
#include <sys/select.h>
#include <sys/time.h>
#include <unistd.h>

Expand All @@ -52,13 +51,6 @@ template <class ImplClass>
class GenericPlatformManagerImpl_POSIX : public GenericPlatformManagerImpl<ImplClass>
{
protected:
// Members for select loop
int mMaxFd;
fd_set mReadSet;
fd_set mWriteSet;
fd_set mErrorSet;
struct timeval mNextTimeout;

// OS-specific members (pthread)
pthread_mutex_t mChipStackLock;
std::queue<ChipDeviceEvent> mChipEventQueue;
Expand Down Expand Up @@ -115,10 +107,6 @@ class GenericPlatformManagerImpl_POSIX : public GenericPlatformManagerImpl<ImplC

inline ImplClass * Impl() { return static_cast<ImplClass *>(this); }

void SysUpdate();
void SysProcess();
static void SysOnEventSignal(void * arg);

void ProcessDeviceEvents();

std::atomic<bool> mShouldRunEventLoop;
Expand Down
Loading

0 comments on commit 1b4fa34

Please sign in to comment.