From 1955ad2ae1b17590facc35228719fcc2e8b432a4 Mon Sep 17 00:00:00 2001 From: Julien Portalier Date: Thu, 3 Oct 2024 19:45:32 +0200 Subject: [PATCH] Enqueue all fibers from evloop at once Fixes a bunch of issues caused by evloop#run enqueueing fibers while we're running the evloop, which led to many enqueues for ST that could be replaced with a single bulk enqueue (simpler, faster) as well as properly handling trying to wake up a thread for MT (and not try to interrupt the currently running evloop, or no wakeup because a spinning thread is running the evloop, ...) --- src/core_ext/event_loop.cr | 10 ++++++ src/isolated.cr | 23 ++++++++++--- src/multi_threaded.cr | 34 ++++++++++--------- src/multi_threaded/scheduler.cr | 59 +++++++++++++++++++++++---------- src/single_threaded.cr | 30 ++++++++++++----- 5 files changed, 110 insertions(+), 46 deletions(-) diff --git a/src/core_ext/event_loop.cr b/src/core_ext/event_loop.cr index 13624b9..adab47d 100644 --- a/src/core_ext/event_loop.cr +++ b/src/core_ext/event_loop.cr @@ -5,6 +5,16 @@ abstract class Crystal::EventLoop end end +{% if Crystal.has_constant?(:Evented) %} + abstract class Crystal::Evented::EventLoop + @[AlwaysInline] + def run(runnables : ExecutionContext::Queue*, blocking : Bool) : Bool + system_run(blocking) { |fiber| runnables.value.push(fiber) } + true + end + end +{% end %} + {% if flag?(:unix) && flag?(:evloop_libevent) %} class Crystal::LibEvent::EventLoop # Create a new resume event for a fiber. diff --git a/src/isolated.cr b/src/isolated.cr index 2fa510e..42674ce 100644 --- a/src/isolated.cr +++ b/src/isolated.cr @@ -103,21 +103,36 @@ module ExecutionContext # wait on the evloop @waiting.set(true, :release) + queue = Queue.new + loop do - # check parallel enqueue first (on second iteration checks if evloop - # enqueued) + # check for parallel enqueue if @enqueued.get(:acquire) @waiting.set(false, :release) - @enqueued.set(false, :relaxed) + @enqueued.set(false, :release) Crystal.trace :sched, "resume" return end - unless @event_loop.run(blocking: true) + if @event_loop.run(pointerof(queue), blocking: true) + if fiber = queue.pop? + unless fiber == @main_fiber && queue.empty? + raise RuntimeError.new("ERROR: concurrency is disabled in isolated contexts") + end + @waiting.set(false, :release) + @enqueued.set(false, :release) + Crystal.trace :sched, "resume" + return + else + # the evloop got interrupted: restart + next + end + else # evloop doesn't wait when empty (e.g. libevent) break end end + @waiting.set(false, :relaxed) # empty evloop: park the thread @mutex.synchronize do diff --git a/src/multi_threaded.cr b/src/multi_threaded.cr index b132fc6..cb31122 100644 --- a/src/multi_threaded.cr +++ b/src/multi_threaded.cr @@ -168,7 +168,7 @@ module ExecutionContext @condition.wait(@mutex) - @parked.sub(1, :acquire_release) + # we don't decrement @parked because #wake_scheduler did Crystal.trace :sched, "wakeup" end @@ -183,15 +183,14 @@ module ExecutionContext # parking when there is a parallel cross context enqueue! protected def wake_scheduler : Nil # another thread is spinning: nothing to do (it shall notice the enqueue) - return if @spinning.get(:acquire) > 0 + if @spinning.get(:relaxed) > 0 + return + end # try to interrupt a thread waiting on the event loop # - # FIXME: don't interrupt ourselves (i.e. current thread is running the - # eventloop...) - # # OPTIMIZE: with one eventloop per execution context, we might prefer to - # wakeup a parked thread *before* interrupting the event loop. + # wakeup a parked thread *before* interrupting the event loop? if @event_loop_lock.get(:relaxed) @event_loop.interrupt return @@ -206,14 +205,20 @@ module ExecutionContext # the current thread if @parked.get(:acquire) > 0 @mutex.synchronize do - # OPTIMIZE: relaxed atomics should be enough (we're inside a mutex - # that shall already deal with memory order + # OPTIMIZE: relaxed atomic should be enough return if @parked.get(:acquire) == 0 - return if @spinning.get(:acquire) > 0 + + # OPTIMIZE: don't unpark if there are enough spinning threads + return if @spinning.get(:relaxed) > 0 # increase the number of spinning threads _now_ to avoid multiple # threads from trying to wakeup multiple threads at the same time - @spinning.add(1, :acquire_release) + # + # we must also decrement the number of parked threads because another + # thread could lock the mutex and increment @spinning again before the + # signaled thread is resumed (oops) + spinning = @spinning.add(1, :acquire_release) + parked = @parked.sub(1, :acquire_release) # wakeup a thread @condition.signal @@ -222,8 +227,8 @@ module ExecutionContext end # shall we start another thread? - # no need for atomics, the values shall be rather stable ovber time and - # we check them again inside the mutex. + # no need for atomics, the values shall be rather stable over time and we + # check them again inside the mutex. return if @threads.size == @size.end @mutex.synchronize do @@ -234,11 +239,10 @@ module ExecutionContext end end - protected def lock_evloop? : Bool + protected def lock_evloop?(& : -> Bool) : Bool if @event_loop_lock.swap(true, :acquire) == false begin - yield @event_loop - true + yield ensure @event_loop_lock.set(false, :release) end diff --git a/src/multi_threaded/scheduler.cr b/src/multi_threaded/scheduler.cr index 4d9f7cf..172d24e 100644 --- a/src/multi_threaded/scheduler.cr +++ b/src/multi_threaded/scheduler.cr @@ -46,6 +46,21 @@ module ExecutionContext @execution_context.wake_scheduler unless @execution_context.capacity == 1 end + # Enqueue a list of fibers in a single operation and returns a fiber to + # resume immediately. + # + # This is called after running the event loop for example. + private def enqueue(queue : Queue*) : Fiber? + if fiber = queue.value.pop? + Crystal.trace :sched, "enqueue", size: queue.value.size, fiber: fiber + unless queue.value.empty? + @runnables.bulk_push(queue) + @execution_context.wake_scheduler + end + fiber + end + end + protected def reschedule : Nil Crystal.trace :sched, "reschedule" if fiber = quick_dequeue? @@ -100,6 +115,7 @@ module ExecutionContext private def try_steal? : Fiber? @execution_context.steal do |other| if other == self + # no need to steal from ourselves next end @@ -142,19 +158,22 @@ module ExecutionContext end private def find_next_runnable(&) : Nil + queue = Queue.new + # nothing to do: start spinning spinning do - if @execution_context.lock_evloop? { @event_loop.run(blocking: false) } - yield @runnables.get? - end - yield @global_queue.grab?(@runnables, divisor: @execution_context.size) + if @execution_context.lock_evloop? { @event_loop.run(pointerof(queue), blocking: false) } + spin_stop + yield enqueue(pointerof(queue)) + end + yield try_steal? end # wait on the event loop for events and timers to activate - @execution_context.lock_evloop? do + result = @execution_context.lock_evloop? do @waiting = true # there is a time window between stop spinning and start waiting @@ -163,22 +182,29 @@ module ExecutionContext # which may block for a long time: yield @global_queue.grab?(@runnables, divisor: @execution_context.size) - if @event_loop.run(blocking: true) - # the event loop enqueud a fiber or was interrupted: restart - return - end + # block on the event loop until an event is ready or the loop is + # interrupted + @event_loop.run(pointerof(queue), blocking: true) ensure @waiting = false end + if result + yield enqueue(pointerof(queue)) + + # the event loop was interrupted: restart + return + else + # evloop doesn't wait when empty (e.g. libevent) + end + # no runnable fiber, no event in the event loop or another thread is - # already running the event loop: go into deep sleep until another - # scheduler or another context enqueues a fiber + # already running it: go into deep sleep until another scheduler or + # another context enqueues a fiber @execution_context.park_thread do - # by the time we acquired the lock, another thread may have - # enqueued fiber(s) and already tried to wakeup a thread (race). - # we don't check the scheduler's local queue nor its event loop - # (both are empty) + # by the time we acquired the lock, another thread may have enqueued + # fiber(s) and already tried to wakeup a thread (race). we don't check + # the scheduler's local queue nor its event loop (both are empty) yield @global_queue.unsafe_grab?(@runnables, divisor: @execution_context.size) # OPTIMIZE: may hold the lock for a while (increasing with threads) @@ -197,9 +223,6 @@ module ExecutionContext # OPTIMIZE: skip spinning if spinning >= running/2 private def spinning(&) - # we could avoid spinning with MT:1 but another context could try to - # enqueue... maybe keep a counter of execution contexts? - # return if @execution_context.size == 1 spin_start 4.times do |iter| diff --git a/src/single_threaded.cr b/src/single_threaded.cr index a2906f1..94a4362 100644 --- a/src/single_threaded.cr +++ b/src/single_threaded.cr @@ -97,6 +97,13 @@ module ExecutionContext end end + private def enqueue(queue : Queue*) : Fiber? + if fiber = queue.value.pop? + @runnables.bulk_push(queue) unless queue.value.empty? + fiber + end + end + protected def reschedule : Nil Crystal.trace :sched, "reschedule" if fiber = quick_dequeue? @@ -151,10 +158,10 @@ module ExecutionContext end # run the event loop to see if any event is activable - if @event_loop.run(blocking: false) - if fiber = @runnables.get? - return fiber - end + queue = Queue.new + + if @event_loop.run(pointerof(queue), blocking: false) + return enqueue(pointerof(queue)) end end @@ -190,12 +197,14 @@ module ExecutionContext end private def find_next_runnable(&) : Nil + queue = Queue.new + # nothing to do: start spinning spinning do yield @global_queue.grab?(@runnables, divisor: 1) - if @event_loop.run(blocking: false) - yield @runnables.get? + if @event_loop.run(pointerof(queue), blocking: false) + yield enqueue(pointerof(queue)) end end @@ -207,9 +216,13 @@ module ExecutionContext # block for a long time): yield @global_queue.grab?(@runnables, divisor: 1) - if @event_loop.run(blocking: true) - # the event loop enqueued a fiber or was interrupted: restart + if @event_loop.run(pointerof(queue), blocking: true) + yield enqueue(pointerof(queue)) + + # the event loop was interrupted: restart return + else + # the event loop doesn't wait when empty: go to park thread end end @@ -288,7 +301,6 @@ module ExecutionContext # queue to try and wakeup the ST thread running in parallel that may be # running, spinning, waiting or parked. private def wake_scheduler : Nil - # return unless @idle return if @spinning.get(:acquire) if @waiting.get(:acquire)