Skip to content

Commit

Permalink
Add kj::EventLoopLocal<T>.
Browse files Browse the repository at this point in the history
It's like thread-local variables, except they are local to the current EventLoop instead.
  • Loading branch information
kentonv committed Oct 27, 2024
1 parent acdca9b commit f6fb3b1
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 0 deletions.
43 changes: 43 additions & 0 deletions c++/src/kj/async-test.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1847,5 +1847,48 @@ KJ_TEST("constPromise") {
KJ_EXPECT(i == 123);
}

KJ_TEST("EventLoopLocal") {
static const EventLoopLocal<int> evLocalInt;
static const EventLoopLocal<Own<Refcounted>> evLocalOwn;

auto rc1 = kj::refcounted<Refcounted>();
auto rc2 = kj::refcounted<Refcounted>();;

{
EventLoop loop1, loop2;

{
WaitScope waitScope(loop1);
*evLocalInt = 123;
*evLocalOwn = kj::addRef(*rc1);
}

{
WaitScope waitScope(loop2);
*evLocalInt = 456;
*evLocalOwn = kj::addRef(*rc2);
}

{
WaitScope waitScope(loop1);
KJ_EXPECT(*evLocalInt == 123);
KJ_EXPECT(evLocalOwn->get() == rc1.get());
}

{
WaitScope waitScope(loop2);
KJ_EXPECT(*evLocalInt == 456);
KJ_EXPECT(evLocalOwn->get() == rc2.get());
}

KJ_EXPECT(rc1->isShared());
KJ_EXPECT(rc2->isShared());
}

// Destroying the event loop destoys all locals, so these are no longer shared.
KJ_EXPECT(!rc1->isShared());
KJ_EXPECT(!rc2->isShared());
}

} // namespace
} // namespace kj
24 changes: 24 additions & 0 deletions c++/src/kj/async.c++
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
#include "one-of.h"
#include "function.h"
#include "list.h"
#include "map.h"
#include <deque>
#include <atomic>

Expand Down Expand Up @@ -1961,6 +1962,29 @@ void WaitScope::cancelAllDetached() {
loop.cancelAllDetached();
}

struct EventLoop::LocalMap {
kj::HashMap<const void*, kj::Own<void>> map;
};

void* EventLoop::getLocal(const void* key, kj::Own<void>(*allocate)()) {
EventLoop* loop = threadLocalEventLoop;
KJ_REQUIRE(loop != nullptr, "there is no current EventLoop in this thread");

LocalMap* localMap;
KJ_IF_SOME(m, loop->localMap) {
localMap = m;
} else {
localMap = loop->localMap.emplace(kj::heap<LocalMap>());
}

return localMap->map.findOrCreate(key, [&]() -> decltype(localMap->map)::Entry {
return {
.key = key,
.value = allocate()
};
}).get();
}

namespace _ { // private

static kj::CanceledException fiberCanceledException() {
Expand Down
28 changes: 28 additions & 0 deletions c++/src/kj/async.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ struct PromiseFulfillerPair;
template <typename Func>
class FunctionParam;

template <typename T>
class EventLoopLocal;

template <typename Func, typename T>
using PromiseForResult = _::ReducePromises<_::ReturnType<Func, T>>;
// Evaluates to the type of Promise for the result of calling functor type Func with parameter type
Expand Down Expand Up @@ -1246,6 +1249,10 @@ class EventLoop {
kj::Maybe<Own<Executor>> executor;
// Allocated the first time getExecutor() is requested, making cross-thread request possible.

struct LocalMap;
kj::Maybe<Own<LocalMap>> localMap;
// For EventLoopLocal. Allocated separately to avoid including HashMap here.

Own<TaskSet> daemons;

_::Event* currentlyFiring = nullptr;
Expand All @@ -1258,6 +1265,8 @@ class EventLoop {
void wait();
void poll();

static void* getLocal(const void* key, kj::Own<void>(*allocate)());

friend void _::detach(kj::Promise<void>&& promise);
friend void _::waitImpl(_::OwnPromiseNode&& node, _::ExceptionOrValue& result,
WaitScope& waitScope, SourceLocation location);
Expand All @@ -1270,6 +1279,8 @@ class EventLoop {
friend class _::FiberBase;
friend class _::FiberStack;
friend ArrayPtr<void* const> getAsyncTrace(ArrayPtr<void*> space);
template <typename T>
friend class EventLoopLocal;
};

class WaitScope {
Expand Down Expand Up @@ -1354,6 +1365,23 @@ class WaitScope {
friend bool _::pollImpl(_::PromiseNode& node, WaitScope& waitScope, SourceLocation location);
};

template <typename T>
class EventLoopLocal {
// Like thread-local storage, but attached to the current EventLoop instead. Value is
// default-initialized on first access and then destroyed when the EventLoop is destroyed.
//
// EventLoopLocal MUST be declared as a global or static variable. It cannot be allocated
// dynamically at runtime.
public:
T* get() const {
return static_cast<T*>(EventLoop::getLocal(this,
[]() -> kj::Own<void> { return kj::heap<T>(); }));
}

T& operator*() const { return *get(); }
T* operator->() const { return get(); }
};

} // namespace kj

#define KJ_ASYNC_H_INCLUDED
Expand Down

0 comments on commit f6fb3b1

Please sign in to comment.