diff --git a/freer-extras/src/Control/Monad/Freer/Extras/Beam.hs b/freer-extras/src/Control/Monad/Freer/Extras/Beam.hs index cb046f6bee..0c6109981d 100644 --- a/freer-extras/src/Control/Monad/Freer/Extras/Beam.hs +++ b/freer-extras/src/Control/Monad/Freer/Extras/Beam.hs @@ -3,6 +3,7 @@ {-# LANGUAGE DeriveAnyClass #-} {-# LANGUAGE DerivingStrategies #-} {-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE GADTs #-} {-# LANGUAGE KindSignatures #-} {-# LANGUAGE LambdaCase #-} @@ -116,6 +117,12 @@ data BeamEffect r where :: [BeamEffect ()] -> BeamEffect () +instance Monoid (BeamEffect ()) where + mempty = Combined [] + +instance Semigroup (BeamEffect ()) where + a <> b = Combined [a, b] + handleBeam :: forall effs. ( LastMember IO effs diff --git a/nix/pkgs/haskell/materialized-darwin/.plan.nix/plutus-chain-index.nix b/nix/pkgs/haskell/materialized-darwin/.plan.nix/plutus-chain-index.nix index 53a1d52448..d111dee5b9 100644 --- a/nix/pkgs/haskell/materialized-darwin/.plan.nix/plutus-chain-index.nix +++ b/nix/pkgs/haskell/materialized-darwin/.plan.nix/plutus-chain-index.nix @@ -44,6 +44,7 @@ (hsPkgs."beam-migrate" or (errorHandler.buildDepError "beam-migrate")) (hsPkgs."cardano-api" or (errorHandler.buildDepError "cardano-api")) (hsPkgs."contra-tracer" or (errorHandler.buildDepError "contra-tracer")) + (hsPkgs."clock" or (errorHandler.buildDepError "clock")) (hsPkgs."data-default" or (errorHandler.buildDepError "data-default")) (hsPkgs."freer-simple" or (errorHandler.buildDepError "freer-simple")) (hsPkgs."iohk-monitoring" or (errorHandler.buildDepError "iohk-monitoring")) @@ -59,6 +60,7 @@ buildable = true; modules = [ "Plutus/ChainIndex/App" + "Plutus/ChainIndex/Events" "Plutus/ChainIndex/CommandLine" "Plutus/ChainIndex/Config" "Plutus/ChainIndex/Lib" diff --git a/nix/pkgs/haskell/materialized-linux/.plan.nix/plutus-chain-index.nix b/nix/pkgs/haskell/materialized-linux/.plan.nix/plutus-chain-index.nix index 53a1d52448..d111dee5b9 100644 --- a/nix/pkgs/haskell/materialized-linux/.plan.nix/plutus-chain-index.nix +++ b/nix/pkgs/haskell/materialized-linux/.plan.nix/plutus-chain-index.nix @@ -44,6 +44,7 @@ (hsPkgs."beam-migrate" or (errorHandler.buildDepError "beam-migrate")) (hsPkgs."cardano-api" or (errorHandler.buildDepError "cardano-api")) (hsPkgs."contra-tracer" or (errorHandler.buildDepError "contra-tracer")) + (hsPkgs."clock" or (errorHandler.buildDepError "clock")) (hsPkgs."data-default" or (errorHandler.buildDepError "data-default")) (hsPkgs."freer-simple" or (errorHandler.buildDepError "freer-simple")) (hsPkgs."iohk-monitoring" or (errorHandler.buildDepError "iohk-monitoring")) @@ -59,6 +60,7 @@ buildable = true; modules = [ "Plutus/ChainIndex/App" + "Plutus/ChainIndex/Events" "Plutus/ChainIndex/CommandLine" "Plutus/ChainIndex/Config" "Plutus/ChainIndex/Lib" diff --git a/nix/pkgs/haskell/materialized-windows/.plan.nix/plutus-chain-index.nix b/nix/pkgs/haskell/materialized-windows/.plan.nix/plutus-chain-index.nix index 53a1d52448..d111dee5b9 100644 --- a/nix/pkgs/haskell/materialized-windows/.plan.nix/plutus-chain-index.nix +++ b/nix/pkgs/haskell/materialized-windows/.plan.nix/plutus-chain-index.nix @@ -44,6 +44,7 @@ (hsPkgs."beam-migrate" or (errorHandler.buildDepError "beam-migrate")) (hsPkgs."cardano-api" or (errorHandler.buildDepError "cardano-api")) (hsPkgs."contra-tracer" or (errorHandler.buildDepError "contra-tracer")) + (hsPkgs."clock" or (errorHandler.buildDepError "clock")) (hsPkgs."data-default" or (errorHandler.buildDepError "data-default")) (hsPkgs."freer-simple" or (errorHandler.buildDepError "freer-simple")) (hsPkgs."iohk-monitoring" or (errorHandler.buildDepError "iohk-monitoring")) @@ -59,6 +60,7 @@ buildable = true; modules = [ "Plutus/ChainIndex/App" + "Plutus/ChainIndex/Events" "Plutus/ChainIndex/CommandLine" "Plutus/ChainIndex/Config" "Plutus/ChainIndex/Lib" diff --git a/plutus-chain-index-core/src/Plutus/ChainIndex/DbSchema.hs b/plutus-chain-index-core/src/Plutus/ChainIndex/DbSchema.hs index bf4de73cf4..b675ac3e86 100644 --- a/plutus-chain-index-core/src/Plutus/ChainIndex/DbSchema.hs +++ b/plutus-chain-index-core/src/Plutus/ChainIndex/DbSchema.hs @@ -279,11 +279,6 @@ instance HasDbType (RedeemerHash, Redeemer) where toDbValue (hash, redeemer) = RedeemerRow (toDbValue hash) (toDbValue redeemer) fromDbValue (RedeemerRow hash redeemer) = (fromDbValue hash, fromDbValue redeemer) --- instance HasDbType (TxOutRef, TxOut) where --- type DbType (TxOutRef, TxOut) = UtxoRow --- toDbValue (outRef, txOut) = UtxoRow (toDbValue outRef) (toDbValue txOut) --- fromDbValue (UtxoRow outRef txOut) = (fromDbValue outRef, fromDbValue txOut) - instance HasDbType (Credential, TxOutRef) where type DbType (Credential, TxOutRef) = AddressRow toDbValue (cred, outRef) = AddressRow (toDbValue cred) (toDbValue outRef) diff --git a/plutus-chain-index-core/src/Plutus/ChainIndex/Effects.hs b/plutus-chain-index-core/src/Plutus/ChainIndex/Effects.hs index 3da77bb0ee..e6b8c210e2 100644 --- a/plutus-chain-index-core/src/Plutus/ChainIndex/Effects.hs +++ b/plutus-chain-index-core/src/Plutus/ChainIndex/Effects.hs @@ -19,7 +19,7 @@ module Plutus.ChainIndex.Effects( , getTip -- * Control effect , ChainIndexControlEffect(..) - , appendBlock + , appendBlocks , rollback , resumeSync , collectGarbage @@ -77,8 +77,8 @@ makeEffect ''ChainIndexQueryEffect data ChainIndexControlEffect r where - -- | Add a new block to the chain index by giving a new tip and list of tx. - AppendBlock :: ChainSyncBlock -> ChainIndexControlEffect () + -- | Add new blocks to the chain index. + AppendBlocks :: [ChainSyncBlock] -> ChainIndexControlEffect () -- | Roll back to a previous state (previous tip) Rollback :: Point -> ChainIndexControlEffect () diff --git a/plutus-chain-index-core/src/Plutus/ChainIndex/Emulator/Handlers.hs b/plutus-chain-index-core/src/Plutus/ChainIndex/Emulator/Handlers.hs index 0255873d34..22b994c3b3 100644 --- a/plutus-chain-index-core/src/Plutus/ChainIndex/Emulator/Handlers.hs +++ b/plutus-chain-index-core/src/Plutus/ChainIndex/Emulator/Handlers.hs @@ -21,6 +21,7 @@ module Plutus.ChainIndex.Emulator.Handlers( ) where import Control.Lens (at, ix, makeLenses, over, preview, set, to, view, (&)) +import Control.Monad (foldM) import Control.Monad.Freer (Eff, Member, type (~>)) import Control.Monad.Freer.Error (Error, throwError) import Control.Monad.Freer.Extras.Log (LogMsg, logDebug, logError, logWarn) @@ -163,6 +164,31 @@ handleQuery = \case GetTip -> gets (tip . utxoState . view utxoIndex) +appendBlocks :: + forall effs. + ( Member (State ChainIndexEmulatorState) effs + , Member (LogMsg ChainIndexLog) effs + ) + => [ChainSyncBlock] -> Eff effs () +appendBlocks [] = pure () +appendBlocks blocks = do + let + processBlock (utxoIndexState, txs) (Block tip_ transactions) = do + case UtxoState.insert (TxUtxoBalance.fromBlock tip_ (map fst transactions)) utxoIndexState of + Left err -> do + let reason = InsertionFailed err + logError $ Err reason + return (utxoIndexState, txs) + Right InsertUtxoSuccess{newIndex, insertPosition} -> do + logDebug $ InsertionSuccess tip_ insertPosition + return (newIndex, transactions ++ txs) + oldState <- get @ChainIndexEmulatorState + (newIndex, transactions) <- foldM processBlock (view utxoIndex oldState, []) blocks + put $ oldState + & set utxoIndex newIndex + & over diskState + (mappend $ foldMap (\(tx, opt) -> if tpoStoreTx opt then DiskState.fromTx tx else mempty) transactions) + handleControl :: forall effs. ( Member (State ChainIndexEmulatorState) effs @@ -172,19 +198,7 @@ handleControl :: => ChainIndexControlEffect ~> Eff effs handleControl = \case - AppendBlock (Block tip_ transactions) -> do - oldState <- get @ChainIndexEmulatorState - case UtxoState.insert (TxUtxoBalance.fromBlock tip_ (map fst transactions)) (view utxoIndex oldState) of - Left err -> do - let reason = InsertionFailed err - logError $ Err reason - throwError reason - Right InsertUtxoSuccess{newIndex, insertPosition} -> do - put $ oldState - & set utxoIndex newIndex - & over diskState - (mappend $ foldMap (\(tx, opt) -> if tpoStoreTx opt then DiskState.fromTx tx else mempty) transactions) - logDebug $ InsertionSuccess tip_ insertPosition + AppendBlocks blocks -> appendBlocks blocks Rollback tip_ -> do oldState <- get @ChainIndexEmulatorState case TxUtxoBalance.rollback tip_ (view utxoIndex oldState) of diff --git a/plutus-chain-index-core/src/Plutus/ChainIndex/Handlers.hs b/plutus-chain-index-core/src/Plutus/ChainIndex/Handlers.hs index 99cb1f50fb..bae3150b40 100644 --- a/plutus-chain-index-core/src/Plutus/ChainIndex/Handlers.hs +++ b/plutus-chain-index-core/src/Plutus/ChainIndex/Handlers.hs @@ -21,10 +21,10 @@ module Plutus.ChainIndex.Handlers import Cardano.Api qualified as C import Control.Applicative (Const (..)) import Control.Lens (Lens', view) +import Control.Monad (foldM) import Control.Monad.Freer (Eff, Member, type (~>)) import Control.Monad.Freer.Error (Error, throwError) -import Control.Monad.Freer.Extras.Beam (BeamEffect (..), BeamableSqlite, addRowsInBatches, combined, deleteRows, - selectList, selectOne, selectPage, updateRows) +import Control.Monad.Freer.Extras.Beam (BeamEffect (..), BeamableSqlite, combined, selectList, selectOne, selectPage) import Control.Monad.Freer.Extras.Log (LogMsg, logDebug, logError, logWarn) import Control.Monad.Freer.Extras.Pagination (Page (Page), PageQuery (..)) import Control.Monad.Freer.Reader (Reader, ask) @@ -33,7 +33,6 @@ import Data.ByteString (ByteString) import Data.FingerTree qualified as FT import Data.Map qualified as Map import Data.Maybe (catMaybes, fromMaybe, mapMaybe) -import Data.Monoid (Ap (..)) import Data.Proxy (Proxy (..)) import Data.Set qualified as Set import Data.Word (Word64) @@ -234,6 +233,42 @@ getTxoSetAtAddress pageQuery (toDbValue -> cred) = do let page = fmap fromDbValue txOutRefs' pure $ TxosResponse page +appendBlocks :: + forall effs. + ( Member (State ChainIndexState) effs + , Member (Reader Depth) effs + , Member BeamEffect effs + , Member (LogMsg ChainIndexLog) effs + ) + => [ChainSyncBlock] -> Eff effs () +appendBlocks [] = pure () +appendBlocks blocks = do + let + processBlock (utxoIndexState, txs, utxoStates) (Block tip_ transactions) = do + let newUtxoState = TxUtxoBalance.fromBlock tip_ (map fst transactions) + case UtxoState.insert newUtxoState utxoIndexState of + Left err -> do + logError $ Err $ InsertionFailed err + return (utxoIndexState, txs, utxoStates) + Right InsertUtxoSuccess{newIndex, insertPosition} -> do + logDebug $ InsertionSuccess tip_ insertPosition + return (newIndex, transactions ++ txs, newUtxoState : utxoStates) + oldIndex <- get @ChainIndexState + (newIndex, transactions, utxoStates) <- foldM processBlock (oldIndex, [], []) blocks + depth <- ask @Depth + reduceOldUtxoDbEffect <- case UtxoState.reduceBlockCount depth newIndex of + UtxoState.BlockCountNotReduced -> do + put newIndex + pure $ Combined [] + lbcResult -> do + put $ UtxoState.reducedIndex lbcResult + pure $ reduceOldUtxoDb $ UtxoState._usTip $ UtxoState.combinedState lbcResult + combined + [ reduceOldUtxoDbEffect + , insertRows $ foldMap (\(tx, opt) -> if tpoStoreTx opt then fromTx tx else mempty) transactions + , insertUtxoDb (map fst transactions) utxoStates + ] + handleControl :: forall effs. ( Member (State ChainIndexState) effs @@ -245,25 +280,7 @@ handleControl :: => ChainIndexControlEffect ~> Eff effs handleControl = \case - AppendBlock (Block tip_ transactions) -> do - oldIndex <- get @ChainIndexState - let txs = map fst transactions - let newUtxoState = TxUtxoBalance.fromBlock tip_ txs - case UtxoState.insert newUtxoState oldIndex of - Left err -> do - let reason = InsertionFailed err - logError $ Err reason - throwError reason - Right InsertUtxoSuccess{newIndex, insertPosition} -> do - depth <- ask @Depth - case UtxoState.reduceBlockCount depth newIndex of - UtxoState.BlockCountNotReduced -> put newIndex - lbcResult -> do - put $ UtxoState.reducedIndex lbcResult - reduceOldUtxoDb $ UtxoState._usTip $ UtxoState.combinedState lbcResult - insert $ foldMap (\(tx, opt) -> if tpoStoreTx opt then fromTx tx else mempty) transactions - insertUtxoDb txs newUtxoState - logDebug $ InsertionSuccess tip_ insertPosition + AppendBlocks blocks -> appendBlocks blocks Rollback tip_ -> do oldIndex <- get @ChainIndexState case TxUtxoBalance.rollback tip_ oldIndex of @@ -273,10 +290,10 @@ handleControl = \case throwError reason Right RollbackResult{newTip, rolledBackIndex} -> do put rolledBackIndex - rollbackUtxoDb $ tipAsPoint newTip + combined [rollbackUtxoDb $ tipAsPoint newTip] logDebug $ RollbackSuccess newTip ResumeSync tip_ -> do - rollbackUtxoDb tip_ + combined [rollbackUtxoDb tip_] newState <- restoreStateFromDb put newState CollectGarbage -> do @@ -293,47 +310,54 @@ handleControl = \case GetDiagnostics -> diagnostics --- Use a batch size of 400 so that we don't hit the sql too-many-variables +-- Use a batch size of 200 so that we don't hit the sql too-many-variables -- limit. batchSize :: Int -batchSize = 400 - -insertUtxoDb :: - ( Member BeamEffect effs - , Member (Error ChainIndexError) effs - ) - => [ChainIndexTx] - -> UtxoState.UtxoState TxUtxoBalance - -> Eff effs () -insertUtxoDb _ (UtxoState.UtxoState _ TipAtGenesis) = throwError $ InsertionFailed UtxoState.InsertUtxoNoTip -insertUtxoDb txs (UtxoState.UtxoState (TxUtxoBalance outputs inputs) tip) - = insert $ mempty - { tipRows = InsertRows $ catMaybes [toDbValue tip] - , unspentOutputRows = InsertRows $ UnspentOutputRow tipRowId . toDbValue <$> Set.toList outputs - , unmatchedInputRows = InsertRows $ UnmatchedInputRow tipRowId . toDbValue <$> Set.toList inputs +batchSize = 200 + +insertUtxoDb + :: [ChainIndexTx] + -> [UtxoState.UtxoState TxUtxoBalance] + -> BeamEffect () +insertUtxoDb txs utxoStates = + let + go acc (UtxoState.UtxoState _ TipAtGenesis) = acc + go (tipRows, unspentRows, unmatchedRows) (UtxoState.UtxoState (TxUtxoBalance outputs inputs) tip) = + let + tipRowId = TipRowId (toDbValue (tipSlot tip)) + newTips = catMaybes [toDbValue tip] + newUnspent = UnspentOutputRow tipRowId . toDbValue <$> Set.toList outputs + newUnmatched = UnmatchedInputRow tipRowId . toDbValue <$> Set.toList inputs + in + ( newTips ++ tipRows + , newUnspent ++ unspentRows + , newUnmatched ++ unmatchedRows) + (tr, ur, umr) = foldl go ([] :: [TipRow], [] :: [UnspentOutputRow], [] :: [UnmatchedInputRow]) utxoStates + txOuts = concatMap txOutsWithRef txs + in insertRows $ mempty + { tipRows = InsertRows tr + , unspentOutputRows = InsertRows ur + , unmatchedInputRows = InsertRows umr , utxoOutRefRows = InsertRows $ (\(txOut, txOutRef) -> UtxoRow (toDbValue txOutRef) (toDbValue txOut)) <$> txOuts } - where - txOuts = concatMap txOutsWithRef txs - tipRowId = TipRowId (toDbValue (tipSlot tip)) -reduceOldUtxoDb :: Member BeamEffect effs => Tip -> Eff effs () -reduceOldUtxoDb TipAtGenesis = pure () -reduceOldUtxoDb (Tip (toDbValue -> slot) _ _) = do +reduceOldUtxoDb :: Tip -> BeamEffect () +reduceOldUtxoDb TipAtGenesis = Combined [] +reduceOldUtxoDb (Tip (toDbValue -> slot) _ _) = Combined -- Delete all the tips before 'slot' - deleteRows $ delete (tipRows db) (\row -> _tipRowSlot row <. val_ slot) + [ DeleteRows $ delete (tipRows db) (\row -> _tipRowSlot row <. val_ slot) -- Assign all the older utxo changes to 'slot' - updateRows $ update + , UpdateRows $ update (unspentOutputRows db) (\row -> _unspentOutputRowTip row <-. TipRowId (val_ slot)) (\row -> unTipRowId (_unspentOutputRowTip row) <. val_ slot) - updateRows $ update + , UpdateRows $ update (unmatchedInputRows db) (\row -> _unmatchedInputRowTip row <-. TipRowId (val_ slot)) (\row -> unTipRowId (_unmatchedInputRowTip row) <. val_ slot) -- Among these older changes, delete the matching input/output pairs -- We're deleting only the outputs here, the matching input is deleted by a trigger (See Main.hs) - deleteRows $ delete + , DeleteRows $ delete (utxoOutRefRows db) (\utxoRow -> exists_ (filter_ @@ -341,7 +365,7 @@ reduceOldUtxoDb (Tip (toDbValue -> slot) _ _) = do (unTipRowId (_unmatchedInputRowTip input) ==. val_ slot) &&. (_utxoRowOutRef utxoRow ==. _unmatchedInputRowOutRef input)) (all_ (unmatchedInputRows db)))) - deleteRows $ delete + , DeleteRows $ delete (unspentOutputRows db) (\output -> unTipRowId (_unspentOutputRowTip output) ==. val_ slot &&. exists_ (filter_ @@ -349,20 +373,22 @@ reduceOldUtxoDb (Tip (toDbValue -> slot) _ _) = do (unTipRowId (_unmatchedInputRowTip input) ==. val_ slot) &&. (_unspentOutputRowOutRef output ==. _unmatchedInputRowOutRef input)) (all_ (unmatchedInputRows db)))) + ] -rollbackUtxoDb :: Member BeamEffect effs => Point -> Eff effs () -rollbackUtxoDb PointAtGenesis = deleteRows $ delete (tipRows db) (const (val_ True)) -rollbackUtxoDb (Point (toDbValue -> slot) _) = do - deleteRows $ delete (tipRows db) (\row -> _tipRowSlot row >. val_ slot) - deleteRows $ delete (utxoOutRefRows db) +rollbackUtxoDb :: Point -> BeamEffect () +rollbackUtxoDb PointAtGenesis = DeleteRows $ delete (tipRows db) (const (val_ True)) +rollbackUtxoDb (Point (toDbValue -> slot) _) = Combined + [ DeleteRows $ delete (tipRows db) (\row -> _tipRowSlot row >. val_ slot) + , DeleteRows $ delete (utxoOutRefRows db) (\utxoRow -> exists_ (filter_ (\output -> (unTipRowId (_unspentOutputRowTip output) >. val_ slot) &&. (_utxoRowOutRef utxoRow ==. _unspentOutputRowOutRef output)) (all_ (unspentOutputRows db)))) - deleteRows $ delete (unspentOutputRows db) (\row -> unTipRowId (_unspentOutputRowTip row) >. val_ slot) - deleteRows $ delete (unmatchedInputRows db) (\row -> unTipRowId (_unmatchedInputRowTip row) >. val_ slot) + , DeleteRows $ delete (unspentOutputRows db) (\row -> unTipRowId (_unspentOutputRowTip row) >. val_ slot) + , DeleteRows $ delete (unmatchedInputRows db) (\row -> unTipRowId (_unmatchedInputRowTip row) >. val_ slot) + ] restoreStateFromDb :: Member BeamEffect effs => Eff effs ChainIndexState restoreStateFromDb = do @@ -392,8 +418,8 @@ instance Semigroup (InsertRows te) where instance BeamableSqlite t => Monoid (InsertRows (TableEntity t)) where mempty = InsertRows [] -insert :: Member BeamEffect effs => Db InsertRows -> Eff effs () -insert = getAp . getConst . zipTables Proxy (\tbl (InsertRows rows) -> Const $ Ap $ addRowsInBatches batchSize tbl rows) db +insertRows :: Db InsertRows -> BeamEffect () +insertRows = getConst . zipTables Proxy (\tbl (InsertRows rows) -> Const $ AddRowsInBatches batchSize tbl rows) db fromTx :: ChainIndexTx -> Db InsertRows fromTx tx = mempty diff --git a/plutus-chain-index-core/test/Plutus/ChainIndex/Emulator/HandlersSpec.hs b/plutus-chain-index-core/test/Plutus/ChainIndex/Emulator/HandlersSpec.hs index cf101e0fe4..c8afa523f2 100644 --- a/plutus-chain-index-core/test/Plutus/ChainIndex/Emulator/HandlersSpec.hs +++ b/plutus-chain-index-core/test/Plutus/ChainIndex/Emulator/HandlersSpec.hs @@ -22,7 +22,7 @@ import Data.Sequence (Seq) import Data.Set qualified as S import Generators qualified as Gen import Ledger (outValue) -import Plutus.ChainIndex (ChainIndexLog, Page (pageItems), PageQuery (PageQuery), appendBlock, unspentTxOutFromRef, +import Plutus.ChainIndex (ChainIndexLog, Page (pageItems), PageQuery (PageQuery), appendBlocks, unspentTxOutFromRef, utxoSetWithCurrency) import Plutus.ChainIndex.Api (UtxosResponse (UtxosResponse)) import Plutus.ChainIndex.ChainIndexError (ChainIndexError) @@ -61,7 +61,7 @@ eachTxOutRefAtAddressShouldBeUnspentSpec = property $ do result <- liftIO $ runEmulatedChainIndex mempty $ do -- Append the generated block in the chain index - appendBlock (Block tip (map (, def) block)) + appendBlocks [(Block tip (map (, def) block))] utxoSetFromBlockAddrs block case result of @@ -77,7 +77,7 @@ eachTxOutRefAtAddressShouldHaveTxOutSpec = property $ do result <- liftIO $ runEmulatedChainIndex mempty $ do -- Append the generated block in the chain index - appendBlock (Block tip (map (, def) block)) + appendBlocks [(Block tip (map (, def) block))] utxos <- utxoSetFromBlockAddrs block traverse unspentTxOutFromRef (concat utxos) @@ -99,7 +99,7 @@ eachTxOutRefWithCurrencyShouldBeUnspentSpec = property $ do result <- liftIO $ runEmulatedChainIndex mempty $ do -- Append the generated block in the chain index - appendBlock (Block tip (map (, def) block)) + appendBlocks [(Block tip (map (, def) block))] forM assetClasses $ \ac -> do let pq = PageQuery 200 Nothing @@ -119,7 +119,7 @@ cantRequestForTxOutRefsWithAdaSpec = property $ do result <- liftIO $ runEmulatedChainIndex mempty $ do -- Append the generated block in the chain index - appendBlock (Block tip (map (, def) block)) + appendBlocks [(Block tip (map (, def) block))] let pq = PageQuery 200 Nothing UtxosResponse _ utxoRefs <- utxoSetWithCurrency pq (AssetClass ("", "")) diff --git a/plutus-chain-index-core/test/Plutus/ChainIndex/HandlersSpec.hs b/plutus-chain-index-core/test/Plutus/ChainIndex/HandlersSpec.hs index 94bebbaafd..22ee8bee06 100644 --- a/plutus-chain-index-core/test/Plutus/ChainIndex/HandlersSpec.hs +++ b/plutus-chain-index-core/test/Plutus/ChainIndex/HandlersSpec.hs @@ -25,7 +25,7 @@ import Database.SQLite.Simple qualified as Sqlite import Generators qualified as Gen import Hedgehog (MonadTest, Property, assert, failure, forAll, property, (===)) import Ledger (outValue) -import Plutus.ChainIndex (Page (pageItems), PageQuery (PageQuery), RunRequirements (..), appendBlock, citxOutputs, +import Plutus.ChainIndex (Page (pageItems), PageQuery (PageQuery), RunRequirements (..), appendBlocks, citxOutputs, runChainIndexEffects, unspentTxOutFromRef, utxoSetWithCurrency) import Plutus.ChainIndex.Api (UtxosResponse (UtxosResponse)) import Plutus.ChainIndex.DbSchema (checkedSqliteDb) @@ -62,7 +62,7 @@ eachTxOutRefAtAddressShouldBeUnspentSpec = property $ do utxoGroups <- runChainIndexTest $ do -- Append the generated block in the chain index - appendBlock (Block tip (map (, def) block)) + appendBlocks [(Block tip (map (, def) block))] utxoSetFromBlockAddrs block S.fromList (concat utxoGroups) === view Gen.txgsUtxoSet state @@ -76,7 +76,7 @@ eachTxOutRefAtAddressShouldHaveTxOutSpec = property $ do utxouts <- runChainIndexTest $ do -- Append the generated block in the chain index - appendBlock (Block tip (map (, def) block)) + appendBlocks [(Block tip (map (, def) block))] utxos <- utxoSetFromBlockAddrs block traverse unspentTxOutFromRef (concat utxos) @@ -97,7 +97,7 @@ eachTxOutRefWithCurrencyShouldBeUnspentSpec = property $ do utxoGroups <- runChainIndexTest $ do -- Append the generated block in the chain index - appendBlock (Block tip (map (, def) block)) + appendBlocks [(Block tip (map (, def) block))] forM assetClasses $ \ac -> do let pq = PageQuery 200 Nothing @@ -115,7 +115,7 @@ cantRequestForTxOutRefsWithAdaSpec = property $ do utxoRefs <- runChainIndexTest $ do -- Append the generated block in the chain index - appendBlock (Block tip (map (, def) block)) + appendBlocks [(Block tip (map (, def) block))] let pq = PageQuery 200 Nothing UtxosResponse _ utxoRefs <- utxoSetWithCurrency pq (AssetClass (Ada.adaSymbol, Ada.adaToken)) diff --git a/plutus-chain-index/plutus-chain-index.cabal b/plutus-chain-index/plutus-chain-index.cabal index 19892edb1f..8f54a814a6 100644 --- a/plutus-chain-index/plutus-chain-index.cabal +++ b/plutus-chain-index/plutus-chain-index.cabal @@ -31,6 +31,7 @@ library import: lang exposed-modules: Plutus.ChainIndex.App + Plutus.ChainIndex.Events Plutus.ChainIndex.CommandLine Plutus.ChainIndex.Config Plutus.ChainIndex.Lib @@ -50,6 +51,7 @@ library beam-migrate -any, cardano-api -any, contra-tracer -any, + clock -any, data-default -any, freer-simple -any, iohk-monitoring -any, diff --git a/plutus-chain-index/src/Plutus/ChainIndex/App.hs b/plutus-chain-index/src/Plutus/ChainIndex/App.hs index 870d8bfd10..f49d96647b 100644 --- a/plutus-chain-index/src/Plutus/ChainIndex/App.hs +++ b/plutus-chain-index/src/Plutus/ChainIndex/App.hs @@ -22,18 +22,18 @@ import Cardano.BM.Configuration.Model qualified as CM import Cardano.BM.Setup (setupTrace_) import Cardano.BM.Trace (Trace) import Control.Concurrent.Async (wait, withAsync) -import Control.Concurrent.STM.TChan (newBroadcastTChanIO) +import Control.Concurrent.STM.TBQueue (newTBQueueIO) import Plutus.ChainIndex.CommandLine (AppConfig (AppConfig, acCLIConfigOverrides, acCommand, acConfigPath, acLogConfigPath, acMinLogLevel), Command (DumpDefaultConfig, DumpDefaultLoggingConfig, StartChainIndex), applyOverrides, cmdWithHelpParser) import Plutus.ChainIndex.Compatibility (fromCardanoBlockNo) import Plutus.ChainIndex.Config qualified as Config -import Plutus.ChainIndex.Lib (defaultChainSyncHandler, getTipSlot, storeFromBlockNo, syncChainIndex, - withRunRequirements, writeChainSyncEventToChan) +import Plutus.ChainIndex.Events (processEventsQueue) +import Plutus.ChainIndex.Lib (getTipSlot, storeChainSyncHandler, storeFromBlockNo, syncChainIndex, withRunRequirements) import Plutus.ChainIndex.Logging qualified as Logging import Plutus.ChainIndex.Server qualified as Server -import Plutus.ChainIndex.SyncStats (SyncLog, convertEventToSyncStats, logProgress) -import Plutus.Monitoring.Util (PrettyObject (PrettyObject), convertLog, runLogEffects) +import Plutus.ChainIndex.SyncStats (SyncLog) +import Plutus.Monitoring.Util (PrettyObject) main :: IO () main = do @@ -77,22 +77,23 @@ runMain logConfig config = do slotNo <- getTipSlot config print slotNo - -- Channel for broadcasting 'ChainSyncEvent's - chan <- newBroadcastTChanIO + -- Queue for processing events + eventsQueue <- newTBQueueIO $ fromIntegral (Config.cicAppendQueueSize config) syncHandler - <- defaultChainSyncHandler runReq + <- storeChainSyncHandler eventsQueue & storeFromBlockNo (fromCardanoBlockNo $ Config.cicStoreFrom config) - & writeChainSyncEventToChan convertEventToSyncStats chan + & pure putStrLn $ "Connecting to the node using socket: " <> Config.cicSocketPath config syncChainIndex config runReq syncHandler (trace :: Trace IO (PrettyObject SyncLog), _) <- setupTrace_ logConfig "chain-index" - withAsync (runLogEffects (convertLog PrettyObject trace) $ logProgress chan) $ \logAsync -> do + withAsync (processEventsQueue trace runReq eventsQueue) $ \processAsync -> do + let port = show (Config.cicPort config) putStrLn $ "Starting webserver on port " <> port putStrLn $ "A Swagger UI for the endpoints are available at " <> "http://localhost:" <> port <> "/swagger/swagger-ui" Server.serveChainIndexQueryServer (Config.cicPort config) runReq - wait logAsync + wait processAsync diff --git a/plutus-chain-index/src/Plutus/ChainIndex/CommandLine.hs b/plutus-chain-index/src/Plutus/ChainIndex/CommandLine.hs index d02c334630..5ef9dc1d51 100644 --- a/plutus-chain-index/src/Plutus/ChainIndex/CommandLine.hs +++ b/plutus-chain-index/src/Plutus/ChainIndex/CommandLine.hs @@ -20,20 +20,22 @@ import Plutus.ChainIndex.Config qualified as Config data CLIConfigOverrides = CLIConfigOverrides - { ccSocketPath :: Maybe String - , ccDbPath :: Maybe String - , ccPort :: Maybe Int - , ccNetworkId :: Maybe Word32 + { ccSocketPath :: Maybe String + , ccDbPath :: Maybe String + , ccPort :: Maybe Int + , ccNetworkId :: Maybe Word32 + , ccAppendQueueSize :: Maybe Int } deriving (Eq, Ord, Show) -- | Apply the CLI soverrides to the 'ChainIndexConfig' applyOverrides :: CLIConfigOverrides -> ChainIndexConfig -> ChainIndexConfig -applyOverrides CLIConfigOverrides{ccSocketPath, ccDbPath, ccPort, ccNetworkId} = +applyOverrides CLIConfigOverrides{ccSocketPath, ccDbPath, ccPort, ccNetworkId, ccAppendQueueSize} = over Config.socketPath (maybe id const ccSocketPath) . over Config.dbPath (maybe id const ccDbPath) . over Config.port (maybe id const ccPort) . over Config.networkId (maybe id (const . Testnet . NetworkMagic) ccNetworkId) + . over Config.appendQueueSize (maybe id const ccAppendQueueSize) -- | Configuration data Command = @@ -65,7 +67,7 @@ optParser = cliConfigOverridesParser :: Parser CLIConfigOverrides cliConfigOverridesParser = - CLIConfigOverrides <$> socketPathParser <*> dbPathParser <*> portParser <*> networkIDParser where + CLIConfigOverrides <$> socketPathParser <*> dbPathParser <*> portParser <*> networkIDParser <*> appendQueueSizeParser where socketPathParser = option (Just <$> str) (long "socket-path" <> value Nothing <> help "Node socket path") dbPathParser = @@ -74,6 +76,8 @@ cliConfigOverridesParser = option (Just <$> auto) (long "port" <> value Nothing <> help "Port") networkIDParser = option (Just <$> auto) (long "network-id" <> value Nothing <> help "Network ID") + appendQueueSizeParser = + option (Just <$> auto) (long "append-queue-size" <> value Nothing <> help "Append queue size") loggingConfigParser :: Parser (Maybe FilePath) loggingConfigParser = diff --git a/plutus-chain-index/src/Plutus/ChainIndex/Config.hs b/plutus-chain-index/src/Plutus/ChainIndex/Config.hs index 95df1ffa4b..31f8e73d1f 100644 --- a/plutus-chain-index/src/Plutus/ChainIndex/Config.hs +++ b/plutus-chain-index/src/Plutus/ChainIndex/Config.hs @@ -2,6 +2,7 @@ {-# LANGUAGE DeriveGeneric #-} {-# LANGUAGE DerivingStrategies #-} {-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE NumericUnderscores #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE StandaloneDeriving #-} {-# LANGUAGE TemplateHaskell #-} @@ -18,7 +19,8 @@ module Plutus.ChainIndex.Config( networkId, securityParam, slotConfig, - storeFrom + storeFrom, + appendQueueSize ) where import Cardano.Api (BlockNo (BlockNo), NetworkId (Mainnet, Testnet)) @@ -31,13 +33,14 @@ import Ouroboros.Network.Magic (NetworkMagic (NetworkMagic)) import Prettyprinter (Pretty (pretty), viaShow, vsep, (<+>)) data ChainIndexConfig = ChainIndexConfig - { cicSocketPath :: String - , cicDbPath :: String - , cicPort :: Int - , cicNetworkId :: NetworkId - , cicSecurityParam :: Int -- ^ The number of blocks after which a transaction cannot be rolled back anymore - , cicSlotConfig :: SlotConfig - , cicStoreFrom :: BlockNo -- ^ Only store transactions from this block number onward + { cicSocketPath :: String + , cicDbPath :: String + , cicPort :: Int + , cicNetworkId :: NetworkId + , cicSecurityParam :: Int -- ^ The number of blocks after which a transaction cannot be rolled back anymore + , cicSlotConfig :: SlotConfig + , cicStoreFrom :: BlockNo -- ^ Only store transactions from this block number onward + , cicAppendQueueSize :: Int -- ^ The size of the queue and a number of blocks to collect before writing to the database } deriving stock (Show, Eq, Generic) deriving anyclass (FromJSON, ToJSON) @@ -66,16 +69,18 @@ defaultConfig = ChainIndexConfig , scSlotLength = 1000 } , cicStoreFrom = BlockNo 0 + , cicAppendQueueSize = 15000 } instance Pretty ChainIndexConfig where - pretty ChainIndexConfig{cicSocketPath, cicDbPath, cicPort, cicNetworkId, cicSecurityParam, cicStoreFrom} = + pretty ChainIndexConfig{cicSocketPath, cicDbPath, cicPort, cicNetworkId, cicSecurityParam, cicStoreFrom, cicAppendQueueSize} = vsep [ "Socket:" <+> pretty cicSocketPath , "Db:" <+> pretty cicDbPath , "Port:" <+> pretty cicPort , "Network Id:" <+> viaShow cicNetworkId , "Security Param:" <+> pretty cicSecurityParam , "Store from:" <+> viaShow cicStoreFrom + , "Append queue size:" <+> viaShow cicAppendQueueSize ] makeLensesFor [ @@ -85,7 +90,8 @@ makeLensesFor [ ("cicNetworkId", "networkId"), ("cicSecurityParam", "securityParam"), ("cicSlotConfig", "slotConfig"), - ("cicStoreFrom", "storeFrom") + ("cicStoreFrom", "storeFrom"), + ("cicAppendQueueSize", "appendQueueSize") ] 'ChainIndexConfig newtype DecodeConfigException = DecodeConfigException String diff --git a/plutus-chain-index/src/Plutus/ChainIndex/Events.hs b/plutus-chain-index/src/Plutus/ChainIndex/Events.hs new file mode 100644 index 0000000000..f316a9d72c --- /dev/null +++ b/plutus-chain-index/src/Plutus/ChainIndex/Events.hs @@ -0,0 +1,58 @@ +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE DeriveAnyClass #-} +{-# LANGUAGE DerivingStrategies #-} +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE NumericUnderscores #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE ScopedTypeVariables #-} + +module Plutus.ChainIndex.Events where + +import Cardano.BM.Trace (Trace) +import Control.Concurrent (threadDelay) +import Control.Concurrent.STM (atomically, flushTBQueue, isFullTBQueue) +import Control.Monad (forever, void) +import Data.Functor ((<&>)) +import Data.Maybe (catMaybes) +import Plutus.ChainIndex qualified as CI +import Plutus.ChainIndex.Lib (ChainSyncEvent (Resume, RollBackward, RollForward), EventsQueue, RunRequirements, + runChainIndexDuringSync) +import Plutus.ChainIndex.SyncStats (SyncLog, logProgress) +import Plutus.Monitoring.Util (PrettyObject (PrettyObject), convertLog, runLogEffects) +import System.Clock (Clock (Monotonic), TimeSpec, diffTimeSpec, getTime) + +-- | How often do we check the queue +period :: Int +period = 5_000_000 -- 5s + +-- | 'processEventsQueue' reads events from 'TBQueue', collects enough 'RollForward's to +-- append blocks at once. +processEventsQueue :: Trace IO (PrettyObject SyncLog) -> RunRequirements -> EventsQueue -> IO () +processEventsQueue trace runReq eventsQueue = forever $ do + start <- getTime Monotonic + eventsToProcess <- do + let + waitUntilEvents = do + isFull <- atomically $ isFullTBQueue eventsQueue + if isFull then atomically $ flushTBQueue eventsQueue + else threadDelay period >> waitUntilEvents + waitUntilEvents + processEvents start eventsToProcess + where + processEvents :: TimeSpec -> [ChainSyncEvent] -> IO () + processEvents start events = case events of + (Resume resumePoint) : (RollBackward backwardPoint _) : restEvents -> do + void $ runChainIndexDuringSync runReq $ do + CI.rollback backwardPoint + CI.resumeSync resumePoint + processEvents start restEvents + rollForwardEvents -> do + let + blocks = catMaybes $ rollForwardEvents <&> \case + (RollForward block _) -> Just block + _ -> Nothing + void $ runChainIndexDuringSync runReq $ CI.appendBlocks blocks + end <- getTime Monotonic + void $ runLogEffects (convertLog PrettyObject trace) $ logProgress events (diffTimeSpec end start) diff --git a/plutus-chain-index/src/Plutus/ChainIndex/Lib.hs b/plutus-chain-index/src/Plutus/ChainIndex/Lib.hs index 554e5a06cc..1f5f09fe51 100644 --- a/plutus-chain-index/src/Plutus/ChainIndex/Lib.hs +++ b/plutus-chain-index/src/Plutus/ChainIndex/Lib.hs @@ -3,6 +3,7 @@ {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE GADTs #-} {-# LANGUAGE LambdaCase #-} +{-# LANGUAGE NumericUnderscores #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE TupleSections #-} {-| Using the chain index as a library. @@ -34,10 +35,11 @@ module Plutus.ChainIndex.Lib ( -- ** Synchronisation handlers , ChainSyncHandler , ChainSyncEvent(..) - , defaultChainSyncHandler + , EventsQueue + , storeChainSyncHandler , storeFromBlockNo , filterTxs - , writeChainSyncEventToChan + , runChainIndexDuringSync -- * Utils , getTipSlot ) where @@ -60,14 +62,14 @@ import Cardano.BM.Trace (Trace, logDebug, logError, nullTracer) import Cardano.Protocol.Socket.Client qualified as C import Cardano.Protocol.Socket.Type (epochSlots) -import Control.Concurrent.STM (TChan, atomically, writeTChan) +import Control.Concurrent.STM (TBQueue, atomically, writeTBQueue) import Plutus.ChainIndex (ChainIndexLog (BeamLogItem), RunRequirements (RunRequirements), getResumePoints, runChainIndexEffects, tipBlockNo) import Plutus.ChainIndex qualified as CI import Plutus.ChainIndex.Compatibility (fromCardanoBlock, fromCardanoPoint, fromCardanoTip, tipFromCardanoBlock) import Plutus.ChainIndex.Config qualified as Config import Plutus.ChainIndex.DbSchema (checkedSqliteDb) -import Plutus.ChainIndex.Effects (ChainIndexControlEffect, ChainIndexQueryEffect, appendBlock, resumeSync, rollback) +import Plutus.ChainIndex.Effects (ChainIndexControlEffect, ChainIndexQueryEffect) import Plutus.ChainIndex.Logging qualified as Logging import Plutus.Monitoring.Util (PrettyObject (PrettyObject), convertLog, runLogEffects) @@ -131,13 +133,10 @@ toCardanoChainSyncHandler runReq handler = \case -- | A handler for chain synchronisation events. type ChainSyncHandler = ChainSyncEvent -> IO () +type EventsQueue = TBQueue ChainSyncEvent --- | The default chain synchronisation event handler. Updates the in-memory state and the database. -defaultChainSyncHandler :: RunRequirements -> ChainSyncHandler -defaultChainSyncHandler runReq evt = void $ runChainIndexDuringSync runReq $ case evt of - (RollForward block _) -> appendBlock block - (RollBackward point _) -> rollback point - (Resume point) -> resumeSync point +storeChainSyncHandler :: EventsQueue -> ChainSyncHandler +storeChainSyncHandler eventsQueue = atomically . writeTBQueue eventsQueue -- | Changes the given @ChainSyncHandler@ to only store transactions with a block number no smaller than the given one. storeFromBlockNo :: CI.BlockNumber -> ChainSyncHandler -> ChainSyncHandler @@ -171,18 +170,6 @@ getTipSlot config = do } pure slotNo --- | Write 'ChainSyncEvent's from the 'ChainSyncHandler' to the 'TChan'. -writeChainSyncEventToChan - :: (ChainSyncEvent -> a) - -> TChan a - -> ChainSyncHandler - -> IO ChainSyncHandler -writeChainSyncEventToChan convert tchan handler = do - pure $ \case - evt -> do - atomically $ writeTChan tchan (convert evt) - handler evt - -- | Synchronise the chain index with the node using the given handler. syncChainIndex :: Config.ChainIndexConfig -> RunRequirements -> ChainSyncHandler -> IO () syncChainIndex config runReq syncHandler = do diff --git a/plutus-chain-index/src/Plutus/ChainIndex/SyncStats.hs b/plutus-chain-index/src/Plutus/ChainIndex/SyncStats.hs index 18dd1bddba..1485ca2604 100644 --- a/plutus-chain-index/src/Plutus/ChainIndex/SyncStats.hs +++ b/plutus-chain-index/src/Plutus/ChainIndex/SyncStats.hs @@ -11,19 +11,18 @@ module Plutus.ChainIndex.SyncStats where import Cardano.BM.Tracing (ToObject) -import Control.Concurrent (threadDelay) -import Control.Concurrent.STM (TChan, atomically, dupTChan, tryReadTChan) -import Control.Monad.Freer (Eff, LastMember, Member) -import Control.Monad.Freer.Extras (LogMsg, logInfo, logWarn) -import Control.Monad.IO.Class (liftIO) +import Control.Monad.Freer (Eff, Member) +import Control.Monad.Freer.Extras (LogMsg, logInfo) import Data.Aeson (FromJSON, ToJSON) -import Data.Time.Units (Second, TimeUnit (fromMicroseconds)) +import Data.Time.Units (Second, fromMicroseconds) +import Data.Time.Units.Extra () import GHC.Generics (Generic) import Ledger (Slot (Slot)) import Plutus.ChainIndex (Point (PointAtGenesis), tipAsPoint) import Plutus.ChainIndex qualified as CI import Plutus.ChainIndex.Lib (ChainSyncEvent (Resume, RollBackward, RollForward)) import Prettyprinter (Pretty (pretty), comma, viaShow, (<+>)) +import System.Clock (TimeSpec, toNanoSecs) import Text.Printf (printf) data SyncStats = SyncStats @@ -43,29 +42,27 @@ instance Monoid SyncStats where mempty = SyncStats 0 0 mempty mempty data SyncLog = SyncLog - { syncStateSyncLog :: SyncState -- ^ State of the syncing - , syncStatsSyncLog :: SyncStats -- ^ Stats of the syncing - , delaySyncLog :: Second -- ^ Delay in seconds used to accumulate log events + { syncStateSyncLog :: SyncState -- ^ State of the syncing + , syncStatsSyncLog :: SyncStats -- ^ Stats of the syncing + , syncPeriodSyncLog :: Second -- ^ Period in seconds used to accumulate log events } deriving stock (Eq, Show, Generic) deriving anyclass (FromJSON, ToJSON, ToObject) instance Pretty SyncLog where pretty = \case - SyncLog syncState - (SyncStats numRollForward numRollBackwards chainSyncPoint _) - seconds -> + SyncLog syncState (SyncStats numRollForward numRollBackwards chainSyncPoint _) period -> let currentTipMsg NotSyncing = "" currentTipMsg _ = "Current tip is" <+> pretty chainSyncPoint in pretty syncState - <+> "Applied" + <+> "Processed" <+> pretty numRollForward <+> "blocks" <> comma <+> pretty numRollBackwards <+> "rollbacks in the last" - <+> viaShow seconds + <+> viaShow period <> "." <+> currentTipMsg syncState @@ -79,31 +76,15 @@ instance Pretty SyncState where Syncing pct -> "Syncing (" <> pretty (printf "%.2f" pct :: String) <> "%)." NotSyncing -> "Not syncing." --- | Read 'ChainSyncEvent's for the 'TChan' every 30 seconds and log syncing summary. -logProgress :: forall effs. - ( Member (LogMsg SyncLog) effs - , LastMember IO effs - ) - => TChan SyncStats - -> Eff effs () -logProgress broadcastChan = do - chan <- liftIO $ atomically $ dupTChan broadcastChan - go chan 30_000_000 -- 30s - where - go chan delay = do - liftIO $ threadDelay (fromIntegral delay) - syncStats <- liftIO $ foldTChanUntilEmpty chan - let syncState = getSyncStateFromStats syncStats - case syncState of - NotSyncing -> do - logWarn $ SyncLog syncState syncStats (fromMicroseconds delay) - go chan 300_000_000 -- 300s - Synced -> do - logInfo $ SyncLog syncState syncStats (fromMicroseconds delay) - go chan 300_000_000 -- 300s - Syncing _ -> do - logInfo $ SyncLog syncState syncStats (fromMicroseconds delay) - go chan 30_000_000 -- 30s +-- | Log syncing summary. +logProgress :: forall effs. (Member (LogMsg SyncLog) effs) => [ChainSyncEvent] -> TimeSpec -> Eff effs () +logProgress events period = do + let syncStats = foldl (<>) mempty $ map convertEventToSyncStats events + let syncState = getSyncStateFromStats syncStats + let syncLog = SyncLog syncState syncStats (fromMicroseconds $ toNanoSecs period `div` 1000) + case syncState of + NotSyncing -> pure () -- logWarn syncLog + _ -> logInfo syncLog -- | Get the 'SyncState' for a 'SyncState'. -- @@ -125,17 +106,6 @@ getSyncStateFromStats (SyncStats _ _ chainSyncPoint nodePoint) = let pct = ((100 :: Double) * fromIntegral chainSyncSlot) / fromIntegral nodeSlot in Syncing pct --- | Read all elements from the 'TChan' until it is empty and combine them with --- it's 'Monoid' instance. -foldTChanUntilEmpty :: (Monoid a) => TChan a -> IO a -foldTChanUntilEmpty chan = - let go combined = do - elementM <- atomically $ tryReadTChan chan - case elementM of - Nothing -> pure combined - Just element -> go (combined <> element) - in go mempty - convertEventToSyncStats :: ChainSyncEvent -> SyncStats convertEventToSyncStats (RollForward (CI.Block chainSyncTip _) nodeTip) = SyncStats 1 0 (tipAsPoint chainSyncTip) (tipAsPoint nodeTip) diff --git a/plutus-contract/src/Plutus/Trace/Emulator/System.hs b/plutus-contract/src/Plutus/Trace/Emulator/System.hs index cdac18db86..f0607fd33e 100644 --- a/plutus-contract/src/Plutus/Trace/Emulator/System.hs +++ b/plutus-contract/src/Plutus/Trace/Emulator/System.hs @@ -28,7 +28,7 @@ import Wallet.Emulator.MultiAgent (MultiAgentControlEffect, MultiAgentEffect, wa import Data.String (IsString (..)) import Ledger (Block, Slot) -import Plutus.ChainIndex (ChainIndexControlEffect, ChainSyncBlock (Block), Tip (Tip, TipAtGenesis), appendBlock, +import Plutus.ChainIndex (ChainIndexControlEffect, ChainSyncBlock (Block), Tip (Tip, TipAtGenesis), appendBlocks, blockId, fromOnChainTx, getTip) import Plutus.Trace.Emulator.Types (EmulatorMessage (..)) import Plutus.Trace.Scheduler (EmSystemCall, MessageCall (..), Priority (..), Tag, fork, mkSysCall, sleep) @@ -140,4 +140,4 @@ appendNewTipBlock lastTip block newSlot = do let nextBlockNo = case lastTip of TipAtGenesis -> 0 Tip _ _ n -> n + 1 newTip = Tip newSlot (blockId block) nextBlockNo - appendBlock (Block newTip (fmap (\tx -> (fromOnChainTx tx, def)) block)) + appendBlocks [(Block newTip (fmap (\tx -> (fromOnChainTx tx, def)) block))]