Skip to content
This repository has been archived by the owner on Dec 2, 2024. It is now read-only.

Commit

Permalink
SCP-3502: Add AppendBlocks effect (#319)
Browse files Browse the repository at this point in the history
* Add AppendBlocks effect

Remove AppendBlock effect

Process blocks in batches

Reorganize events processing

Add appendPeriod and appendBatchSize config parameters

Add cli overrides

Use the same chan for logging and appending blocks.

* Use TBQueue instead of TChan.

Flush all events from TBQueue

* Use Combined Beam effect to execute queries in one transaction

* Get reporting of processing time back

* Remove append period param. Change sql batchsize to 200.

* Add syncPeriodSyncLog comment

* Use error instead of putStrLn

* Refactor processEventsQueue
  • Loading branch information
Evgenii Akentev authored Mar 22, 2022
1 parent 9c03bba commit ffa0cd4
Show file tree
Hide file tree
Showing 18 changed files with 268 additions and 192 deletions.
7 changes: 7 additions & 0 deletions freer-extras/src/Control/Monad/Freer/Extras/Beam.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE LambdaCase #-}
Expand Down Expand Up @@ -116,6 +117,12 @@ data BeamEffect r where
:: [BeamEffect ()]
-> BeamEffect ()

instance Monoid (BeamEffect ()) where
mempty = Combined []

instance Semigroup (BeamEffect ()) where
a <> b = Combined [a, b]

handleBeam ::
forall effs.
( LastMember IO effs
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 0 additions & 5 deletions plutus-chain-index-core/src/Plutus/ChainIndex/DbSchema.hs
Original file line number Diff line number Diff line change
Expand Up @@ -279,11 +279,6 @@ instance HasDbType (RedeemerHash, Redeemer) where
toDbValue (hash, redeemer) = RedeemerRow (toDbValue hash) (toDbValue redeemer)
fromDbValue (RedeemerRow hash redeemer) = (fromDbValue hash, fromDbValue redeemer)

-- instance HasDbType (TxOutRef, TxOut) where
-- type DbType (TxOutRef, TxOut) = UtxoRow
-- toDbValue (outRef, txOut) = UtxoRow (toDbValue outRef) (toDbValue txOut)
-- fromDbValue (UtxoRow outRef txOut) = (fromDbValue outRef, fromDbValue txOut)

instance HasDbType (Credential, TxOutRef) where
type DbType (Credential, TxOutRef) = AddressRow
toDbValue (cred, outRef) = AddressRow (toDbValue cred) (toDbValue outRef)
Expand Down
6 changes: 3 additions & 3 deletions plutus-chain-index-core/src/Plutus/ChainIndex/Effects.hs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ module Plutus.ChainIndex.Effects(
, getTip
-- * Control effect
, ChainIndexControlEffect(..)
, appendBlock
, appendBlocks
, rollback
, resumeSync
, collectGarbage
Expand Down Expand Up @@ -77,8 +77,8 @@ makeEffect ''ChainIndexQueryEffect

data ChainIndexControlEffect r where

-- | Add a new block to the chain index by giving a new tip and list of tx.
AppendBlock :: ChainSyncBlock -> ChainIndexControlEffect ()
-- | Add new blocks to the chain index.
AppendBlocks :: [ChainSyncBlock] -> ChainIndexControlEffect ()

-- | Roll back to a previous state (previous tip)
Rollback :: Point -> ChainIndexControlEffect ()
Expand Down
40 changes: 27 additions & 13 deletions plutus-chain-index-core/src/Plutus/ChainIndex/Emulator/Handlers.hs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ module Plutus.ChainIndex.Emulator.Handlers(
) where

import Control.Lens (at, ix, makeLenses, over, preview, set, to, view, (&))
import Control.Monad (foldM)
import Control.Monad.Freer (Eff, Member, type (~>))
import Control.Monad.Freer.Error (Error, throwError)
import Control.Monad.Freer.Extras.Log (LogMsg, logDebug, logError, logWarn)
Expand Down Expand Up @@ -163,6 +164,31 @@ handleQuery = \case
GetTip ->
gets (tip . utxoState . view utxoIndex)

appendBlocks ::
forall effs.
( Member (State ChainIndexEmulatorState) effs
, Member (LogMsg ChainIndexLog) effs
)
=> [ChainSyncBlock] -> Eff effs ()
appendBlocks [] = pure ()
appendBlocks blocks = do
let
processBlock (utxoIndexState, txs) (Block tip_ transactions) = do
case UtxoState.insert (TxUtxoBalance.fromBlock tip_ (map fst transactions)) utxoIndexState of
Left err -> do
let reason = InsertionFailed err
logError $ Err reason
return (utxoIndexState, txs)
Right InsertUtxoSuccess{newIndex, insertPosition} -> do
logDebug $ InsertionSuccess tip_ insertPosition
return (newIndex, transactions ++ txs)
oldState <- get @ChainIndexEmulatorState
(newIndex, transactions) <- foldM processBlock (view utxoIndex oldState, []) blocks
put $ oldState
& set utxoIndex newIndex
& over diskState
(mappend $ foldMap (\(tx, opt) -> if tpoStoreTx opt then DiskState.fromTx tx else mempty) transactions)

handleControl ::
forall effs.
( Member (State ChainIndexEmulatorState) effs
Expand All @@ -172,19 +198,7 @@ handleControl ::
=> ChainIndexControlEffect
~> Eff effs
handleControl = \case
AppendBlock (Block tip_ transactions) -> do
oldState <- get @ChainIndexEmulatorState
case UtxoState.insert (TxUtxoBalance.fromBlock tip_ (map fst transactions)) (view utxoIndex oldState) of
Left err -> do
let reason = InsertionFailed err
logError $ Err reason
throwError reason
Right InsertUtxoSuccess{newIndex, insertPosition} -> do
put $ oldState
& set utxoIndex newIndex
& over diskState
(mappend $ foldMap (\(tx, opt) -> if tpoStoreTx opt then DiskState.fromTx tx else mempty) transactions)
logDebug $ InsertionSuccess tip_ insertPosition
AppendBlocks blocks -> appendBlocks blocks
Rollback tip_ -> do
oldState <- get @ChainIndexEmulatorState
case TxUtxoBalance.rollback tip_ (view utxoIndex oldState) of
Expand Down
146 changes: 86 additions & 60 deletions plutus-chain-index-core/src/Plutus/ChainIndex/Handlers.hs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ module Plutus.ChainIndex.Handlers
import Cardano.Api qualified as C
import Control.Applicative (Const (..))
import Control.Lens (Lens', view)
import Control.Monad (foldM)
import Control.Monad.Freer (Eff, Member, type (~>))
import Control.Monad.Freer.Error (Error, throwError)
import Control.Monad.Freer.Extras.Beam (BeamEffect (..), BeamableSqlite, addRowsInBatches, combined, deleteRows,
selectList, selectOne, selectPage, updateRows)
import Control.Monad.Freer.Extras.Beam (BeamEffect (..), BeamableSqlite, combined, selectList, selectOne, selectPage)
import Control.Monad.Freer.Extras.Log (LogMsg, logDebug, logError, logWarn)
import Control.Monad.Freer.Extras.Pagination (Page (Page), PageQuery (..))
import Control.Monad.Freer.Reader (Reader, ask)
Expand All @@ -33,7 +33,6 @@ import Data.ByteString (ByteString)
import Data.FingerTree qualified as FT
import Data.Map qualified as Map
import Data.Maybe (catMaybes, fromMaybe, mapMaybe)
import Data.Monoid (Ap (..))
import Data.Proxy (Proxy (..))
import Data.Set qualified as Set
import Data.Word (Word64)
Expand Down Expand Up @@ -234,6 +233,42 @@ getTxoSetAtAddress pageQuery (toDbValue -> cred) = do
let page = fmap fromDbValue txOutRefs'
pure $ TxosResponse page

appendBlocks ::
forall effs.
( Member (State ChainIndexState) effs
, Member (Reader Depth) effs
, Member BeamEffect effs
, Member (LogMsg ChainIndexLog) effs
)
=> [ChainSyncBlock] -> Eff effs ()
appendBlocks [] = pure ()
appendBlocks blocks = do
let
processBlock (utxoIndexState, txs, utxoStates) (Block tip_ transactions) = do
let newUtxoState = TxUtxoBalance.fromBlock tip_ (map fst transactions)
case UtxoState.insert newUtxoState utxoIndexState of
Left err -> do
logError $ Err $ InsertionFailed err
return (utxoIndexState, txs, utxoStates)
Right InsertUtxoSuccess{newIndex, insertPosition} -> do
logDebug $ InsertionSuccess tip_ insertPosition
return (newIndex, transactions ++ txs, newUtxoState : utxoStates)
oldIndex <- get @ChainIndexState
(newIndex, transactions, utxoStates) <- foldM processBlock (oldIndex, [], []) blocks
depth <- ask @Depth
reduceOldUtxoDbEffect <- case UtxoState.reduceBlockCount depth newIndex of
UtxoState.BlockCountNotReduced -> do
put newIndex
pure $ Combined []
lbcResult -> do
put $ UtxoState.reducedIndex lbcResult
pure $ reduceOldUtxoDb $ UtxoState._usTip $ UtxoState.combinedState lbcResult
combined
[ reduceOldUtxoDbEffect
, insertRows $ foldMap (\(tx, opt) -> if tpoStoreTx opt then fromTx tx else mempty) transactions
, insertUtxoDb (map fst transactions) utxoStates
]

handleControl ::
forall effs.
( Member (State ChainIndexState) effs
Expand All @@ -245,25 +280,7 @@ handleControl ::
=> ChainIndexControlEffect
~> Eff effs
handleControl = \case
AppendBlock (Block tip_ transactions) -> do
oldIndex <- get @ChainIndexState
let txs = map fst transactions
let newUtxoState = TxUtxoBalance.fromBlock tip_ txs
case UtxoState.insert newUtxoState oldIndex of
Left err -> do
let reason = InsertionFailed err
logError $ Err reason
throwError reason
Right InsertUtxoSuccess{newIndex, insertPosition} -> do
depth <- ask @Depth
case UtxoState.reduceBlockCount depth newIndex of
UtxoState.BlockCountNotReduced -> put newIndex
lbcResult -> do
put $ UtxoState.reducedIndex lbcResult
reduceOldUtxoDb $ UtxoState._usTip $ UtxoState.combinedState lbcResult
insert $ foldMap (\(tx, opt) -> if tpoStoreTx opt then fromTx tx else mempty) transactions
insertUtxoDb txs newUtxoState
logDebug $ InsertionSuccess tip_ insertPosition
AppendBlocks blocks -> appendBlocks blocks
Rollback tip_ -> do
oldIndex <- get @ChainIndexState
case TxUtxoBalance.rollback tip_ oldIndex of
Expand All @@ -273,10 +290,10 @@ handleControl = \case
throwError reason
Right RollbackResult{newTip, rolledBackIndex} -> do
put rolledBackIndex
rollbackUtxoDb $ tipAsPoint newTip
combined [rollbackUtxoDb $ tipAsPoint newTip]
logDebug $ RollbackSuccess newTip
ResumeSync tip_ -> do
rollbackUtxoDb tip_
combined [rollbackUtxoDb tip_]
newState <- restoreStateFromDb
put newState
CollectGarbage -> do
Expand All @@ -293,76 +310,85 @@ handleControl = \case
GetDiagnostics -> diagnostics


-- Use a batch size of 400 so that we don't hit the sql too-many-variables
-- Use a batch size of 200 so that we don't hit the sql too-many-variables
-- limit.
batchSize :: Int
batchSize = 400

insertUtxoDb ::
( Member BeamEffect effs
, Member (Error ChainIndexError) effs
)
=> [ChainIndexTx]
-> UtxoState.UtxoState TxUtxoBalance
-> Eff effs ()
insertUtxoDb _ (UtxoState.UtxoState _ TipAtGenesis) = throwError $ InsertionFailed UtxoState.InsertUtxoNoTip
insertUtxoDb txs (UtxoState.UtxoState (TxUtxoBalance outputs inputs) tip)
= insert $ mempty
{ tipRows = InsertRows $ catMaybes [toDbValue tip]
, unspentOutputRows = InsertRows $ UnspentOutputRow tipRowId . toDbValue <$> Set.toList outputs
, unmatchedInputRows = InsertRows $ UnmatchedInputRow tipRowId . toDbValue <$> Set.toList inputs
batchSize = 200

insertUtxoDb
:: [ChainIndexTx]
-> [UtxoState.UtxoState TxUtxoBalance]
-> BeamEffect ()
insertUtxoDb txs utxoStates =
let
go acc (UtxoState.UtxoState _ TipAtGenesis) = acc
go (tipRows, unspentRows, unmatchedRows) (UtxoState.UtxoState (TxUtxoBalance outputs inputs) tip) =
let
tipRowId = TipRowId (toDbValue (tipSlot tip))
newTips = catMaybes [toDbValue tip]
newUnspent = UnspentOutputRow tipRowId . toDbValue <$> Set.toList outputs
newUnmatched = UnmatchedInputRow tipRowId . toDbValue <$> Set.toList inputs
in
( newTips ++ tipRows
, newUnspent ++ unspentRows
, newUnmatched ++ unmatchedRows)
(tr, ur, umr) = foldl go ([] :: [TipRow], [] :: [UnspentOutputRow], [] :: [UnmatchedInputRow]) utxoStates
txOuts = concatMap txOutsWithRef txs
in insertRows $ mempty
{ tipRows = InsertRows tr
, unspentOutputRows = InsertRows ur
, unmatchedInputRows = InsertRows umr
, utxoOutRefRows = InsertRows $ (\(txOut, txOutRef) -> UtxoRow (toDbValue txOutRef) (toDbValue txOut)) <$> txOuts
}
where
txOuts = concatMap txOutsWithRef txs
tipRowId = TipRowId (toDbValue (tipSlot tip))

reduceOldUtxoDb :: Member BeamEffect effs => Tip -> Eff effs ()
reduceOldUtxoDb TipAtGenesis = pure ()
reduceOldUtxoDb (Tip (toDbValue -> slot) _ _) = do
reduceOldUtxoDb :: Tip -> BeamEffect ()
reduceOldUtxoDb TipAtGenesis = Combined []
reduceOldUtxoDb (Tip (toDbValue -> slot) _ _) = Combined
-- Delete all the tips before 'slot'
deleteRows $ delete (tipRows db) (\row -> _tipRowSlot row <. val_ slot)
[ DeleteRows $ delete (tipRows db) (\row -> _tipRowSlot row <. val_ slot)
-- Assign all the older utxo changes to 'slot'
updateRows $ update
, UpdateRows $ update
(unspentOutputRows db)
(\row -> _unspentOutputRowTip row <-. TipRowId (val_ slot))
(\row -> unTipRowId (_unspentOutputRowTip row) <. val_ slot)
updateRows $ update
, UpdateRows $ update
(unmatchedInputRows db)
(\row -> _unmatchedInputRowTip row <-. TipRowId (val_ slot))
(\row -> unTipRowId (_unmatchedInputRowTip row) <. val_ slot)
-- Among these older changes, delete the matching input/output pairs
-- We're deleting only the outputs here, the matching input is deleted by a trigger (See Main.hs)
deleteRows $ delete
, DeleteRows $ delete
(utxoOutRefRows db)
(\utxoRow ->
exists_ (filter_
(\input ->
(unTipRowId (_unmatchedInputRowTip input) ==. val_ slot) &&.
(_utxoRowOutRef utxoRow ==. _unmatchedInputRowOutRef input))
(all_ (unmatchedInputRows db))))
deleteRows $ delete
, DeleteRows $ delete
(unspentOutputRows db)
(\output -> unTipRowId (_unspentOutputRowTip output) ==. val_ slot &&.
exists_ (filter_
(\input ->
(unTipRowId (_unmatchedInputRowTip input) ==. val_ slot) &&.
(_unspentOutputRowOutRef output ==. _unmatchedInputRowOutRef input))
(all_ (unmatchedInputRows db))))
]

rollbackUtxoDb :: Member BeamEffect effs => Point -> Eff effs ()
rollbackUtxoDb PointAtGenesis = deleteRows $ delete (tipRows db) (const (val_ True))
rollbackUtxoDb (Point (toDbValue -> slot) _) = do
deleteRows $ delete (tipRows db) (\row -> _tipRowSlot row >. val_ slot)
deleteRows $ delete (utxoOutRefRows db)
rollbackUtxoDb :: Point -> BeamEffect ()
rollbackUtxoDb PointAtGenesis = DeleteRows $ delete (tipRows db) (const (val_ True))
rollbackUtxoDb (Point (toDbValue -> slot) _) = Combined
[ DeleteRows $ delete (tipRows db) (\row -> _tipRowSlot row >. val_ slot)
, DeleteRows $ delete (utxoOutRefRows db)
(\utxoRow ->
exists_ (filter_
(\output ->
(unTipRowId (_unspentOutputRowTip output) >. val_ slot) &&.
(_utxoRowOutRef utxoRow ==. _unspentOutputRowOutRef output))
(all_ (unspentOutputRows db))))
deleteRows $ delete (unspentOutputRows db) (\row -> unTipRowId (_unspentOutputRowTip row) >. val_ slot)
deleteRows $ delete (unmatchedInputRows db) (\row -> unTipRowId (_unmatchedInputRowTip row) >. val_ slot)
, DeleteRows $ delete (unspentOutputRows db) (\row -> unTipRowId (_unspentOutputRowTip row) >. val_ slot)
, DeleteRows $ delete (unmatchedInputRows db) (\row -> unTipRowId (_unmatchedInputRowTip row) >. val_ slot)
]

restoreStateFromDb :: Member BeamEffect effs => Eff effs ChainIndexState
restoreStateFromDb = do
Expand Down Expand Up @@ -392,8 +418,8 @@ instance Semigroup (InsertRows te) where
instance BeamableSqlite t => Monoid (InsertRows (TableEntity t)) where
mempty = InsertRows []

insert :: Member BeamEffect effs => Db InsertRows -> Eff effs ()
insert = getAp . getConst . zipTables Proxy (\tbl (InsertRows rows) -> Const $ Ap $ addRowsInBatches batchSize tbl rows) db
insertRows :: Db InsertRows -> BeamEffect ()
insertRows = getConst . zipTables Proxy (\tbl (InsertRows rows) -> Const $ AddRowsInBatches batchSize tbl rows) db

fromTx :: ChainIndexTx -> Db InsertRows
fromTx tx = mempty
Expand Down
Loading

0 comments on commit ffa0cd4

Please sign in to comment.