Skip to content

Commit

Permalink
PLT-9070 - Marconi components; further generalisation of index runnin…
Browse files Browse the repository at this point in the history
…g machinery (#285)

* PLT-9070 - Allow monadic preprocessing

* PLT-9070 - Loosen runEmitterAndConsumer arg reqs

* PLT-9070 - Generalise and expose stream emitter

* PLT-9070 - Loosen streamEmitter a bit more

* PLT-9070 - More experimental changes

* PLT-9070 - More experimentation with stateful streaming

* PLT-9070 - Removed the transformer wackiness

* PLT-9070 - Remove specialised config dependency from streamEmitter

* PLT-9070 - More messy experimenting

* PLT-9070 - Use optparse-applicative-fork

* PLT-9070 - Try to dedupe the streaming changes

* PLT-9070 - Added accidentally removed comment back in

* PLT-9070 - Tiny comments

* PLT-9070 - optparse-applicative-fork in marconi-chain-index-legacy

* PLT-9070 - Updated golden files

* PLT-9070 - Missed some golden files

* PLT-9070 - Dummy file for missing required folder

* PLT-9070 - Added golden files back in

* PLT-9070 - Moved the golden files into the right folder

* Revert "PLT-9070 - Moved the golden files into the right folder"

This reverts commit c3460fe.

* Revert "PLT-9070 - Added golden files back in"

This reverts commit 644a50b.

* Revert "PLT-9070 - Dummy file for missing required folder"

This reverts commit 97d9c73.

* Revert "PLT-9070 - Missed some golden files"

This reverts commit adc4c04.

* Revert "PLT-9070 - Updated golden files"

This reverts commit 4cea62d.

* Revert "PLT-9070 - optparse-applicative-fork in marconi-chain-index-legacy"

This reverts commit 3aa082b.

* PLT-9070 - Remove dependency on optparse-applicative-fork

* PLT-9070 - Removed unneeded language ext
  • Loading branch information
willjgould authored Jan 11, 2024
1 parent 3944363 commit 96c5bef
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 25 deletions.
66 changes: 42 additions & 24 deletions marconi-cardano-core/src/Marconi/Cardano/Core/Runner.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ module Marconi.Cardano.Core.Runner (
-- * Runner
runIndexerOnChainSync,
runIndexerOnSnapshot,
runEmitterAndConsumer,

-- * Emitters
streamEmitter,

-- ** Runner Config
RunIndexerConfig (RunIndexerConfig),
Expand Down Expand Up @@ -133,8 +137,9 @@ runIndexerOnChainSync
runIndexerOnChainSync config indexer = do
void $
runEmitterAndConsumer
(eventPreprocessing ^. runIndexerExtractTipDistance)
(eventPreprocessing ^. runIndexerExtractBlockNo)
securityParam
eventPreprocessing
(chainSyncEventEmitter config indexer)
Core.CloseOn
where
Expand All @@ -155,9 +160,10 @@ runIndexerOnSnapshot
-> IO (Concurrent.MVar (indexer event))
runIndexerOnSnapshot config indexer stream =
runEmitterAndConsumer
(eventPreprocessing ^. runIndexerExtractTipDistance)
(eventPreprocessing ^. runIndexerExtractBlockNo)
securityParam
eventPreprocessing
(streamBlockEventEmitter config indexer stream)
(streamEmitter (eventPreprocessing ^. runIndexerPreprocessEvent) securityParam indexer stream)
Core.CloseOff
where
securityParam = Lens.view runIndexerOnSnapshotConfigSecurityParam config
Expand All @@ -179,14 +185,18 @@ runEmitterAndConsumer
, Core.IsIndex (ExceptT Core.IndexerError IO) event indexer
, Core.Closeable IO indexer
)
=> SecurityParam
-> RunIndexerEventPreprocessing rawEvent event
=> (event -> Maybe Word)
-- ^ A tip extraction function
-> (event -> Maybe C.BlockNo)
-- ^ A block extraction function
-> SecurityParam
-> IO (EventEmitter indexer event a)
-> Core.CloseSwitch
-> IO (Concurrent.MVar (indexer event))
runEmitterAndConsumer
tipExtractor
blockExtractor
securityParam
eventPreprocessing
eventEmitter
closeSwitch =
do
Expand All @@ -197,7 +207,11 @@ runEmitterAndConsumer
where
consumer queue indexerMVar = do
Core.processQueue
(stablePointComputation securityParam eventPreprocessing)
( stablePointComputation
tipExtractor
blockExtractor
securityParam
)
Map.empty
queue
indexerMVar
Expand Down Expand Up @@ -245,30 +259,31 @@ chainSyncEventEmitter
instead of consuming the stream a caller will consume the created queue.
The reason behind this is to provide the same interface as 'chainSyncEventEmitter'.
-}
streamBlockEventEmitter
streamEmitter
:: (Core.Point event ~ C.ChainPoint)
=> RunIndexerOnSnapshotConfig BlockEvent event
=> (pre -> [Core.ProcessedInput C.ChainPoint event])
-- ^ A preprocessing function
-> SecurityParam
-> indexer event
-> S.Stream (S.Of BlockEvent) IO ()
-> S.Stream (S.Of pre) IO ()
-> IO (EventEmitter indexer event ())
streamBlockEventEmitter config indexer stream = do
streamEmitter processEvent securityParam indexer stream = do
queue <- STM.newTBQueueIO $ fromIntegral securityParam
indexerMVar <- Concurrent.newMVar indexer
let processEvent = eventProcessing ^. runIndexerPreprocessEvent
emitEvents = mkEventStream processEvent queue stream
let emitEvents = mkEventStream processEvent queue stream
pure EventEmitter{queue, indexerMVar, emitEvents}
where
securityParam = Lens.view runIndexerOnSnapshotConfigSecurityParam config
eventProcessing = Lens.view runIndexerOnSnapshotConfigEventProcessing config

stablePointComputation
:: SecurityParam
-> RunIndexerEventPreprocessing rawEvent event
:: (event -> Maybe Word)
-- ^ A tip extraction function
-> (event -> Maybe C.BlockNo)
-- ^ A block extraction function
-> SecurityParam
-> Core.Timed C.ChainPoint (Maybe event)
-> State (Map C.BlockNo C.ChainPoint) (Maybe C.ChainPoint)
stablePointComputation securityParam preprocessing (Core.Timed point event) = do
let distanceM = preprocessing ^. runIndexerExtractTipDistance =<< event
blockNoM = preprocessing ^. runIndexerExtractBlockNo =<< event
stablePointComputation tipExtractor blockExtractor securityParam (Core.Timed point event) = do
let distanceM = tipExtractor =<< event
blockNoM = blockExtractor =<< event
case (distanceM, blockNoM) of
(Just distance, Just blockNo) ->
if distance > fromIntegral securityParam
Expand All @@ -290,8 +305,8 @@ getBlockNo (C.BlockInMode block _eraInMode) =

-- | Event preprocessing, to ease the coordinator work
mkEventStream
:: (inputEvent -> [Core.ProcessedInput (Core.Point inputEvent) outputEvent])
-> STM.TBQueue (Core.ProcessedInput (Core.Point inputEvent) outputEvent)
:: (inputEvent -> [Core.ProcessedInput (Core.Point outputEvent) outputEvent])
-> STM.TBQueue (Core.ProcessedInput (Core.Point outputEvent) outputEvent)
-> S.Stream (S.Of inputEvent) IO r
-> IO r
mkEventStream processEvent q =
Expand Down Expand Up @@ -321,7 +336,10 @@ withDistanceAndTipPreprocessor =
getDistance _ = Nothing
blockNoFromBlockEvent (TipAndBlock _ (Just event)) = Just . getBlockNo . blockInMode $ getEvent event
blockNoFromBlockEvent _ = Nothing
in RunIndexerEventPreprocessing extractChainTipAndAddDistance blockNoFromBlockEvent getDistance
in RunIndexerEventPreprocessing
extractChainTipAndAddDistance
blockNoFromBlockEvent
getDistance

withNoPreprocessor :: RunIndexerEventPreprocessing (ChainSyncEvent BlockEvent) BlockEvent
withNoPreprocessor =
Expand Down
1 change: 0 additions & 1 deletion marconi-cardano-indexers/test-lib/Test/Integration.hs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ unboundedValidityRange :: (C.TxValidityLowerBound C.BabbageEra, C.TxValidityUppe
unboundedValidityRange = (C.TxValidityNoLowerBound, C.TxValidityNoUpperBound C.ValidityNoUpperBoundInBabbageEra)

{- Transaction operations -}

validateAndSubmitTx
:: (MonadIO m)
=> C.LocalNodeConnectInfo C.CardanoMode
Expand Down

0 comments on commit 96c5bef

Please sign in to comment.