Skip to content

Commit

Permalink
Move WakeIOThread() functionality to WatchableEventManager (#8456)
Browse files Browse the repository at this point in the history
#### Problem

System::Layer contains `WakeIOThread()` and `mWakeEvent` which are
really implementation details of `WatchableEventManager`.

#### Change overview

- Replace `System::Layer::WakeIOThread()` with
  `WatchableEventManager::Signal()`, because not every implementation
  will necessarily implement the nudge by waking a thread.

- Split `SystemSockets.h` into `WatchableEventManager.h`,
  `WatchableSocket.h`, and `WakeEvent.h`; likewise split the
  various implementation files.

- Make the `System::WakeEvent` public API independent of the
  file-descriptor implementation.

For the present, `mHandleSelectThread` remains in `System::Layer`
until timers are integrated and the transitional `HandleTimeout()`
is removed.

#### Testing

Sanity check with chip-tool. CI should verify that functionality does
not regress.
  • Loading branch information
kpschoedel authored Jul 19, 2021
1 parent 37df562 commit fa6033f
Show file tree
Hide file tree
Showing 27 changed files with 845 additions and 566 deletions.
6 changes: 3 additions & 3 deletions src/controller/CHIPDeviceController.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -549,11 +549,11 @@ CHIP_ERROR DeviceController::ServiceEventSignal()
{
VerifyOrReturnError(mState == State::Initialized, CHIP_ERROR_INCORRECT_STATE);

#if CONFIG_DEVICE_LAYER && CHIP_SYSTEM_CONFIG_USE_IO_THREAD
DeviceLayer::SystemLayer.WakeIOThread();
#if CONFIG_DEVICE_LAYER && (CHIP_SYSTEM_CONFIG_USE_SOCKETS || CHIP_SYSTEM_CONFIG_USE_NETWORK_FRAMEWORK)
DeviceLayer::SystemLayer.WatchableEvents().Signal();
#else
ReturnErrorOnFailure(CHIP_ERROR_UNSUPPORTED_CHIP_FEATURE);
#endif // CONFIG_DEVICE_LAYER && CHIP_SYSTEM_CONFIG_USE_IO_THREAD
#endif // CONFIG_DEVICE_LAYER && (CHIP_SYSTEM_CONFIG_USE_SOCKETS || CHIP_SYSTEM_CONFIG_USE_NETWORK_FRAMEWORK)

return CHIP_NO_ERROR;
}
Expand Down
2 changes: 1 addition & 1 deletion src/controller/java/CHIPDeviceController-JNI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ void JNI_OnUnload(JavaVM * jvm, void * reserved)
if (sIOThread != PTHREAD_NULL)
{
sShutdown = true;
sSystemLayer.WakeIOThread();
sSystemLayer.WatchableEvents().Signal();

StackUnlockGuard unlockGuard(JniReferences::GetInstance().GetStackLock());
pthread_join(sIOThread, NULL);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,7 @@ void GenericPlatformManagerImpl_POSIX<ImplClass>::_PostEvent(const ChipDeviceEve
{
mChipEventQueue.Push(*event);

#if CHIP_SYSTEM_CONFIG_USE_IO_THREAD
SystemLayer.WakeIOThread(); // Trigger wake select on CHIP thread
#endif // CHIP_SYSTEM_CONFIG_USE_IO_THREAD
SystemLayer.WatchableEvents().Signal(); // Trigger wake select on CHIP thread
}

template <class ImplClass>
Expand Down Expand Up @@ -259,7 +257,7 @@ CHIP_ERROR GenericPlatformManagerImpl_POSIX<ImplClass>::_StopEventLoopTask()
// SystemLayer.
//
Impl()->LockChipStack();
SystemLayer.WakeIOThread();
SystemLayer.WatchableEvents().Signal();
Impl()->UnlockChipStack();

pthread_mutex_lock(&mStateLock);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ void GenericPlatformManagerImpl_Zephyr<ImplClass>::_PostEvent(const ChipDeviceEv
// k_msgq_put takes `void*` instead of `const void*`. Nonetheless, it should be safe to
// const_cast here and there are components in Zephyr itself which do the same.
if (k_msgq_put(&mChipEventQueue, const_cast<ChipDeviceEvent *>(event), K_NO_WAIT) == 0)
SystemLayer.WakeIOThread(); // Trigger wake on CHIP thread
SystemLayer.WatchableEvents().Signal(); // Trigger wake on CHIP thread
else
ChipLogError(DeviceLayer, "Failed to post event to CHIP Platform event queue");
}
Expand Down
2 changes: 1 addition & 1 deletion src/inet/EndPointBasis.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
#include <support/DLLUtil.h>

#if CHIP_SYSTEM_CONFIG_USE_SOCKETS
#include <system/SystemSockets.h>
#include <system/WatchableSocket.h>
#endif // CHIP_SYSTEM_CONFIG_USE_SOCKETS

#if CHIP_SYSTEM_CONFIG_USE_NETWORK_FRAMEWORK
Expand Down
2 changes: 1 addition & 1 deletion src/inet/RawEndPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
#endif // CHIP_SYSTEM_CONFIG_USE_LWIP

#if CHIP_SYSTEM_CONFIG_USE_SOCKETS
#include <system/SystemSockets.h>
#include <system/WatchableSocket.h>
#if HAVE_SYS_SOCKET_H
#include <sys/socket.h>
#endif // HAVE_SYS_SOCKET_H
Expand Down
6 changes: 3 additions & 3 deletions src/inet/TCPEndPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -833,10 +833,10 @@ void TCPEndPoint::EnableReceive()

DriveReceiving();

#if CHIP_SYSTEM_CONFIG_USE_IO_THREAD
#if CHIP_SYSTEM_CONFIG_USE_SOCKETS || CHIP_SYSTEM_CONFIG_USE_NETWORK_FRAMEWORK
// Wake the thread waiting for I/O so that it can include the socket.
SystemLayer().WakeIOThread();
#endif // CHIP_SYSTEM_CONFIG_USE_IO_THREAD
SystemLayer().WatchableEvents().Signal();
#endif // CHIP_SYSTEM_CONFIG_USE_SOCKETS || CHIP_SYSTEM_CONFIG_USE_NETWORK_FRAMEWORK
}

/**
Expand Down
2 changes: 1 addition & 1 deletion src/platform/Linux/MdnsImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
#include <avahi-common/watch.h>

#include "lib/mdns/platform/Mdns.h"
#include "system/SystemSockets.h"
#include "system/WatchableSocket.h"

struct AvahiWatch
{
Expand Down
7 changes: 4 additions & 3 deletions src/platform/mbed/PlatformManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ CHIP_ERROR PlatformManagerImpl::_InitChipStack(void)
mQueue.~EventQueue();
new (&mQueue) events::EventQueue(event_size * CHIP_DEVICE_CONFIG_MAX_EVENT_QUEUE_SIZE);

mQueue.background(
[&](int t) { MbedEventTimeout::AttachTimeout([&] { SystemLayer.WakeIOThread(); }, std::chrono::milliseconds{ t }); });
mQueue.background([&](int t) {
MbedEventTimeout::AttachTimeout([&] { SystemLayer.WatchableEvents().Signal(); }, std::chrono::milliseconds{ t });
});

// Reinitialize the Mutexes
mThisStateMutex.~Mutex();
Expand Down Expand Up @@ -211,7 +212,7 @@ CHIP_ERROR PlatformManagerImpl::_StopEventLoopTask()

// Wake from select so it unblocks processing
LockChipStack();
SystemLayer.WakeIOThread();
SystemLayer.WatchableEvents().Signal();
UnlockChipStack();

osStatus err = osOK;
Expand Down
9 changes: 7 additions & 2 deletions src/system/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ buildconfig_header("system_buildconfig") {
}

if (chip_system_config_use_sockets) {
defines += [ "CHIP_SYSTEM_WATCHABLE_EVENT_MANAGER_CONFIG_FILE=<system/WatchableEventManager${chip_system_config_sockets_event_loop}.h>" ]
defines += [ "CHIP_SYSTEM_WATCHABLE_SOCKET_CONFIG_FILE=<system/WatchableSocket${chip_system_config_sockets_event_loop}.h>" ]
}
}
Expand Down Expand Up @@ -142,15 +143,17 @@ static_library("system") {
"SystemObject.h",
"SystemPacketBuffer.cpp",
"SystemPacketBuffer.h",
"SystemSockets.cpp",
"SystemSockets.h",
"SystemStats.cpp",
"SystemStats.h",
"SystemTimer.cpp",
"SystemTimer.h",
"TLVPacketBufferBackingStore.cpp",
"TLVPacketBufferBackingStore.h",
"TimeSource.h",
"WakeEvent.cpp",
"WakeEvent.h",
"WatchableEventManager.h",
"WatchableSocket.h",
]

cflags = [ "-Wconversion" ]
Expand All @@ -165,6 +168,8 @@ static_library("system") {

if (chip_system_config_use_sockets) {
sources += [
"WatchableEventManager${chip_system_config_sockets_event_loop}.cpp",
"WatchableEventManager${chip_system_config_sockets_event_loop}.h",
"WatchableSocket${chip_system_config_sockets_event_loop}.cpp",
"WatchableSocket${chip_system_config_sockets_event_loop}.h",
]
Expand Down
8 changes: 0 additions & 8 deletions src/system/SystemConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,6 @@

// clang-format off

#ifndef CHIP_SYSTEM_CONFIG_USE_IO_THREAD
#if CHIP_SYSTEM_CONFIG_USE_SOCKETS || CHIP_SYSTEM_CONFIG_USE_NETWORK_FRAMEWORK
#define CHIP_SYSTEM_CONFIG_USE_IO_THREAD 1
#else
#define CHIP_SYSTEM_CONFIG_USE_IO_THREAD 0
#endif // CHIP_SYSTEM_CONFIG_USE_SOCKETS || CHIP_SYSTEM_CONFIG_USE_NETWORK_FRAMEWORK
#endif // CHIP_SYSTEM_CONFIG_USE_IO_THREAD

/**
* @def CHIP_SYSTEM_CONFIG_TRANSFER_INETLAYER_PROJECT_CONFIGURATION
*
Expand Down
45 changes: 0 additions & 45 deletions src/system/SystemLayer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,6 @@ CHIP_ERROR Layer::Init()
this->AddEventHandlerDelegate(sSystemEventHandlerDelegate);
#endif // CHIP_SYSTEM_CONFIG_USE_LWIP

#if CHIP_SYSTEM_CONFIG_USE_SOCKETS || CHIP_SYSTEM_CONFIG_USE_NETWORK_FRAMEWORK
// Create an event to allow an arbitrary thread to wake the thread in the select loop.
ReturnErrorOnFailure(this->mWakeEvent.Open(mWatchableEvents));
#endif // CHIP_SYSTEM_CONFIG_USE_SOCKETS || CHIP_SYSTEM_CONFIG_USE_NETWORK_FRAMEWORK

this->mLayerState = kLayerState_Initialized;

return CHIP_NO_ERROR;
Expand All @@ -148,10 +143,6 @@ CHIP_ERROR Layer::Shutdown()
if (this->mLayerState == kLayerState_NotInitialized)
return CHIP_ERROR_INCORRECT_STATE;

#if CHIP_SYSTEM_CONFIG_USE_SOCKETS || CHIP_SYSTEM_CONFIG_USE_NETWORK_FRAMEWORK
ReturnErrorOnFailure(mWakeEvent.Close());
#endif // CHIP_SYSTEM_CONFIG_USE_SOCKETS || CHIP_SYSTEM_CONFIG_USE_NETWORK_FRAMEWORK

for (size_t i = 0; i < Timer::sPool.Size(); ++i)
{
Timer * lTimer = Timer::sPool.Get(*this, i);
Expand Down Expand Up @@ -455,42 +446,6 @@ void Layer::HandleTimeout()

#endif // CHIP_SYSTEM_CONFIG_USE_SOCKETS || CHIP_SYSTEM_CONFIG_USE_NETWORK_FRAMEWORK

#if CHIP_SYSTEM_CONFIG_USE_IO_THREAD

/**
* Wake up the I/O thread by writing a single byte to the wake pipe.
*
* @note
* If @p WakeIOThread() is being called from within an I/O event callback, then writing to the wake pipe can be skipped,
* since the I/O thread is already awake.
*
* Furthermore, we don't care if this write fails as the only reasonably likely failure is that the pipe is full, in which
* case the select calling thread is going to wake up anyway.
*/
void Layer::WakeIOThread()
{
CHIP_ERROR lReturn;

if (this->State() != kLayerState_Initialized)
return;

#if CHIP_SYSTEM_CONFIG_POSIX_LOCKING
if (pthread_equal(this->mHandleSelectThread, pthread_self()))
{
return;
}
#endif // CHIP_SYSTEM_CONFIG_POSIX_LOCKING

// Send notification to wake up the select call.
lReturn = this->mWakeEvent.Notify();
if (lReturn != CHIP_NO_ERROR)
{
ChipLogError(chipSystemLayer, "System wake event notify failed: %s", ErrorStr(lReturn));
}
}

#endif // CHIP_SYSTEM_CONFIG_USE_IO_THREAD

#if CHIP_SYSTEM_CONFIG_USE_LWIP
LwIPEventHandlerDelegate Layer::sSystemEventHandlerDelegate;

Expand Down
12 changes: 5 additions & 7 deletions src/system/SystemLayer.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@

// Include dependent headers
#if CHIP_SYSTEM_CONFIG_USE_SOCKETS
#include <system/SystemSockets.h>
#include <system/WakeEvent.h>
#include <system/WatchableEventManager.h>
#endif // CHIP_SYSTEM_CONFIG_USE_SOCKETS

#if CHIP_SYSTEM_CONFIG_POSIX_LOCKING
Expand Down Expand Up @@ -136,14 +137,11 @@ class DLL_EXPORT Layer

Clock & GetClock() { return mClock; }

#if CHIP_SYSTEM_CONFIG_USE_SOCKETS
#if CHIP_SYSTEM_CONFIG_USE_SOCKETS || CHIP_SYSTEM_CONFIG_USE_NETWORK_FRAMEWORK
WatchableEventManager & WatchableEvents() { return mWatchableEvents; }
bool GetTimeout(struct timeval & aSleepTime); // TODO(#5556): Integrate timer platform details with WatchableEventManager.
void HandleTimeout(); // TODO(#5556): Integrate timer platform details with WatchableEventManager.
#endif // CHIP_SYSTEM_CONFIG_USE_SOCKETS
#if CHIP_SYSTEM_CONFIG_USE_IO_THREAD
void WakeIOThread();
#endif // CHIP_SYSTEM_CONFIG_USE_IO_THREAD
#endif // CHIP_SYSTEM_CONFIG_USE_SOCKETS || CHIP_SYSTEM_CONFIG_USE_NETWORK_FRAMEWORK

#if CHIP_SYSTEM_CONFIG_USE_LWIP
typedef CHIP_ERROR (*EventHandler)(Object & aTarget, EventType aEventType, uintptr_t aArgument);
Expand Down Expand Up @@ -172,8 +170,8 @@ class DLL_EXPORT Layer

#if CHIP_SYSTEM_CONFIG_USE_SOCKETS || CHIP_SYSTEM_CONFIG_USE_NETWORK_FRAMEWORK
WatchableEventManager mWatchableEvents;
WakeEvent mWakeEvent;
#if CHIP_SYSTEM_CONFIG_POSIX_LOCKING
friend class WatchableEventManager;
std::atomic<pthread_t> mHandleSelectThread;
#endif // CHIP_SYSTEM_CONFIG_POSIX_LOCKING
#endif // CHIP_SYSTEM_CONFIG_USE_SOCKETS || CHIP_SYSTEM_CONFIG_USE_NETWORK_FRAMEWORK
Expand Down
8 changes: 4 additions & 4 deletions src/system/SystemTimer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,9 @@ CHIP_ERROR Timer::Start(uint32_t aDelayMilliseconds, OnCompleteFunct aOnComplete
}
#endif // CHIP_SYSTEM_CONFIG_USE_DISPATCH

#if CHIP_SYSTEM_CONFIG_USE_IO_THREAD
lLayer.WakeIOThread();
#endif // CHIP_SYSTEM_CONFIG_USE_IO_THREAD
#if CHIP_SYSTEM_CONFIG_USE_SOCKETS || CHIP_SYSTEM_CONFIG_USE_NETWORK_FRAMEWORK
lLayer.WatchableEvents().Signal();
#endif // CHIP_SYSTEM_CONFIG_USE_SOCKETS || CHIP_SYSTEM_CONFIG_USE_NETWORK_FRAMEWORK
#endif // CHIP_SYSTEM_CONFIG_USE_SOCKETS || CHIP_SYSTEM_CONFIG_USE_NETWORK_FRAMEWORK

return CHIP_NO_ERROR;
Expand Down Expand Up @@ -220,7 +220,7 @@ CHIP_ERROR Timer::ScheduleWork(OnCompleteFunct aOnComplete, void * aAppState)
else
{
#endif // CHIP_SYSTEM_CONFIG_USE_DISPATCH
lLayer.WakeIOThread();
lLayer.WatchableEvents().Signal();
#if CHIP_SYSTEM_CONFIG_USE_DISPATCH
}
#endif // CHIP_SYSTEM_CONFIG_USE_DISPATCH
Expand Down
5 changes: 4 additions & 1 deletion src/system/SystemSockets.cpp → src/system/WakeEvent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@
* data stream built on top of two file descriptors.
*/

#include <system/SystemSockets.h>
#include <system/SystemConfig.h>

#if CHIP_SYSTEM_CONFIG_USE_SOCKETS

#include <system/WakeEvent.h>

// Include additional CHIP headers
#include <support/CodeUtils.h>
#include <system/SystemError.h>

// Include system and language headers
#include <errno.h>
Expand Down
69 changes: 69 additions & 0 deletions src/system/WakeEvent.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
*
* Copyright (c) 2020-2021 Project CHIP Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* @file
* This file declares the abstraction of socket (file descriptor) events.
*/

#pragma once

// Include configuration headers
#include <system/SystemConfig.h>

#if CHIP_SYSTEM_CONFIG_USE_SOCKETS

#include <core/CHIPError.h>
#include <system/WatchableSocket.h>

namespace chip {
namespace System {

class WakeEventTest;
class WatchableEventManager;

/**
* @class WakeEvent
*
* An instance of this type can be used by a WatchableEventManager to allow other threads
* to wake its event loop thread via WatchableEventManager::Signal().
*/
class WakeEvent
{
public:
CHIP_ERROR Open(WatchableEventManager & watchState); /**< Initialize the pipeline */
CHIP_ERROR Close(); /**< Close both ends of the pipeline. */

CHIP_ERROR Notify(); /**< Set the event. */
void Confirm(); /**< Clear the event. */

private:
friend class WakeEventTest;

int GetReadFD() const { return mFD.GetFD(); }
static void Confirm(WatchableSocket & socket) { reinterpret_cast<WakeEvent *>(socket.GetCallbackData())->Confirm(); }

#if CHIP_SYSTEM_CONFIG_USE_POSIX_PIPE
int mWriteFD;
#endif
WatchableSocket mFD;
};

} // namespace System
} // namespace chip

#endif // CHIP_SYSTEM_CONFIG_USE_SOCKETS
Loading

0 comments on commit fa6033f

Please sign in to comment.