Skip to content

Commit

Permalink
PLT-9364 Exhaust queue before closing (#292)
Browse files Browse the repository at this point in the history
  • Loading branch information
berewt authored Jan 14, 2024
1 parent 66be793 commit cc322d6
Showing 1 changed file with 7 additions and 3 deletions.
10 changes: 7 additions & 3 deletions marconi-cardano-core/src/Marconi/Cardano/Core/Runner.hs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ import Control.Concurrent qualified as Concurrent
import Control.Concurrent.Async (race_)
import Control.Concurrent.STM (atomically, check, isEmptyTBQueue)
import Control.Concurrent.STM qualified as STM
import Control.Exception (catch, finally)
import Control.Exception (catch)
import Control.Lens ((^.))
import Control.Lens qualified as Lens
import Control.Monad (void)
Expand Down Expand Up @@ -201,10 +201,14 @@ runEmitterAndConsumer
closeSwitch =
do
EventEmitter{queue, indexerMVar, emitEvents} <- eventEmitter
(emitEvents `finally` atomically (check =<< isEmptyTBQueue queue))
`race_` consumer queue indexerMVar
emitter emitEvents queue `race_` consumer queue indexerMVar
pure indexerMVar
where
emitter emitEvents queue = case closeSwitch of
Core.CloseOn -> void emitEvents
Core.CloseOff -> do
void emitEvents
atomically $ check =<< isEmptyTBQueue queue
consumer queue indexerMVar = do
Core.processQueue
( stablePointComputation
Expand Down

0 comments on commit cc322d6

Please sign in to comment.