diff --git a/io-sim-classes/src/Control/Monad/Class/MonadSTM.hs b/io-sim-classes/src/Control/Monad/Class/MonadSTM.hs index 6bd900bf8d6..e7b6a42c5f3 100644 --- a/io-sim-classes/src/Control/Monad/Class/MonadSTM.hs +++ b/io-sim-classes/src/Control/Monad/Class/MonadSTM.hs @@ -44,6 +44,7 @@ module Control.Monad.Class.MonadSTM , writeTBQueueDefault , isEmptyTBQueueDefault , isFullTBQueueDefault + , lengthTBQueueDefault ) where import Prelude hiding (read) @@ -423,3 +424,19 @@ isFullTBQueueDefault (TBQueue rsize _read wsize _write _size) = do if (r > 0) then return False else return True + +-- 'lengthTBQueue' was added in stm-2.5.0.0, but since we support older +-- versions of stm, we don't include it as a method in the type class. If we +-- were to conditionally (@MIN_VERSION_stm(2,5,0)@) include the method in the +-- type class, the IO simulator would have to conditionally include the +-- method, requiring a dependency on the @stm@ package, which would be +-- strange. +-- +-- Nevertheless, we already provide a default implementation. Downstream +-- packages that don't mind having a >= 2.5 constraint on stm can use this to +-- implement 'lengthTBQueue' for the IO simulator. +lengthTBQueueDefault :: MonadSTM m => TBQueueDefault m a -> STM m Natural +lengthTBQueueDefault (TBQueue rsize _read wsize _write size) = do + r <- readTVar rsize + w <- readTVar wsize + return $! size - r - w diff --git a/io-sim/src/Control/Monad/IOSim.hs b/io-sim/src/Control/Monad/IOSim.hs index c0362a21f0e..e082c14e581 100644 --- a/io-sim/src/Control/Monad/IOSim.hs +++ b/io-sim/src/Control/Monad/IOSim.hs @@ -13,6 +13,7 @@ module Control.Monad.IOSim ( -- * Simulation monad SimM, + SimSTM, -- ** Run simulation runSim, runSimOrThrow, @@ -140,6 +141,8 @@ data StmA s a where Retry :: StmA s b OrElse :: StmA s a -> StmA s a -> (a -> StmA s b) -> StmA s b +-- Exported type +type SimSTM = STM data MaskingState = Unmasked | MaskedInterruptible | MaskedUninterruptible deriving (Eq, Ord, Show) diff --git a/ouroboros-consensus-byron/ouroboros-consensus-byrondual/src/Ouroboros/Consensus/ByronDual/Node.hs b/ouroboros-consensus-byron/ouroboros-consensus-byrondual/src/Ouroboros/Consensus/ByronDual/Node.hs index a86b4dc5aa9..eacedd3d515 100644 --- a/ouroboros-consensus-byron/ouroboros-consensus-byrondual/src/Ouroboros/Consensus/ByronDual/Node.hs +++ b/ouroboros-consensus-byron/ouroboros-consensus-byrondual/src/Ouroboros/Consensus/ByronDual/Node.hs @@ -199,7 +199,7 @@ instance RunNode DualByronBlock where tip <- atomically $ ChainDB.getTipPoint chainDB case tip of BlockPoint {} -> return () -- Chain is not empty - GenesisPoint -> ChainDB.addBlock chainDB genesisEBB + GenesisPoint -> ChainDB.addBlock_ chainDB genesisEBB where genesisEBB :: DualByronBlock genesisEBB = DualBlock { diff --git a/ouroboros-consensus-byron/src/Ouroboros/Consensus/Byron/Node.hs b/ouroboros-consensus-byron/src/Ouroboros/Consensus/Byron/Node.hs index 9a364515e67..583c44e6cd2 100644 --- a/ouroboros-consensus-byron/src/Ouroboros/Consensus/Byron/Node.hs +++ b/ouroboros-consensus-byron/src/Ouroboros/Consensus/Byron/Node.hs @@ -209,7 +209,7 @@ instance RunNode ByronBlock where case tip of -- Chain is not empty BlockPoint {} -> return () - GenesisPoint -> ChainDB.addBlock chainDB genesisEBB + GenesisPoint -> ChainDB.addBlock_ chainDB genesisEBB where genesisEBB = forgeEBB cfg (SlotNo 0) (BlockNo 0) GenesisHash diff --git a/ouroboros-consensus/ouroboros-consensus-test-infra/src/Test/ThreadNet/Network.hs b/ouroboros-consensus/ouroboros-consensus-test-infra/src/Test/ThreadNet/Network.hs index 3bed9f871a7..9de50c9163e 100644 --- a/ouroboros-consensus/ouroboros-consensus-test-infra/src/Test/ThreadNet/Network.hs +++ b/ouroboros-consensus/ouroboros-consensus-test-infra/src/Test/ThreadNet/Network.hs @@ -492,7 +492,7 @@ runThreadNetwork ThreadNetworkArgs pure (mSlot, fromWithOrigin (firstBlockNo (Proxy @blk)) bno, pointHash p) when (prevSlot < At ebbSlotNo) $ do let ebb = forgeEBB cfg ebbSlotNo ebbBlockNo prevHash - ChainDB.addBlock chainDB ebb + ChainDB.addBlock_ chainDB ebb go (succ epoch) @@ -551,6 +551,7 @@ runThreadNetwork ThreadNetworkArgs , cdbTraceLedger = nullDebugTracer , cdbRegistry = registry , cdbGcDelay = 0 + , cdbBlocksToAddSize = 2 } where -- prop_general relies on this tracer diff --git a/ouroboros-consensus/ouroboros-consensus-test-infra/src/Test/Util/Orphans/IOLike.hs b/ouroboros-consensus/ouroboros-consensus-test-infra/src/Test/Util/Orphans/IOLike.hs index 560a5d3fbe0..a5f8de495cf 100644 --- a/ouroboros-consensus/ouroboros-consensus-test-infra/src/Test/Util/Orphans/IOLike.hs +++ b/ouroboros-consensus/ouroboros-consensus-test-infra/src/Test/Util/Orphans/IOLike.hs @@ -1,8 +1,13 @@ +{-# LANGUAGE FlexibleInstances #-} {-# OPTIONS_GHC -fno-warn-orphans #-} module Test.Util.Orphans.IOLike () where +import Control.Monad.Class.MonadSTM (lengthTBQueueDefault) import Control.Monad.IOSim import Ouroboros.Consensus.Util.IOLike import Test.Util.Orphans.NoUnexpectedThunks () +instance MonadSTMTxExtended (SimSTM s) where + lengthTBQueue = lengthTBQueueDefault + instance IOLike (SimM s) diff --git a/ouroboros-consensus/ouroboros-consensus.cabal b/ouroboros-consensus/ouroboros-consensus.cabal index 32879963f58..a52bdf653f8 100644 --- a/ouroboros-consensus/ouroboros-consensus.cabal +++ b/ouroboros-consensus/ouroboros-consensus.cabal @@ -216,7 +216,7 @@ library , network >=3.1 && <3.2 , psqueues >=0.2.3 && <0.3 , serialise >=0.2 && <0.3 - , stm + , stm >=2.5 && <2.6 , streaming , text >=1.2 && <1.3 , time diff --git a/ouroboros-consensus/src/Ouroboros/Consensus/NodeKernel.hs b/ouroboros-consensus/src/Ouroboros/Consensus/NodeKernel.hs index ac7b8551d6b..ac85e0cad27 100644 --- a/ouroboros-consensus/src/Ouroboros/Consensus/NodeKernel.hs +++ b/ouroboros-consensus/src/Ouroboros/Consensus/NodeKernel.hs @@ -331,8 +331,10 @@ initBlockFetchConsensusInterface cfg chainDB getCandidates blockFetchSize readFetchedBlocks :: STM m (Point blk -> Bool) readFetchedBlocks = ChainDB.getIsFetched chainDB + -- Asynchronous: doesn't wait until the block has been written to disk or + -- processed. addFetchedBlock :: Point blk -> blk -> m () - addFetchedBlock _pt = ChainDB.addBlock chainDB + addFetchedBlock _pt = void . ChainDB.addBlockAsync chainDB readFetchedMaxSlotNo :: STM m MaxSlotNo readFetchedMaxSlotNo = ChainDB.getMaxSlotNo chainDB @@ -455,17 +457,11 @@ forkBlockProduction maxBlockSizeOverride IS{..} BlockProduction{..} = (snapshotMempoolSize mempoolSnapshot) -- Add the block to the chain DB - lift $ ChainDB.addBlock chainDB newBlock + result <- lift $ ChainDB.addBlockAsync chainDB newBlock + -- Block until we have performed chain selection for the block + curTip <- lift $ atomically $ ChainDB.chainSelectionPerformed result -- Check whether we adopted our block - -- - -- addBlock is synchronous, so when it returns the block we produced - -- will have been considered and possibly (probably) adopted - -- - -- TODO: This is wrong. Additional blocks may have been added to the - -- chain since. - -- - curTip <- lift $ atomically $ ChainDB.getTipPoint chainDB when (curTip /= blockPoint newBlock) $ do isInvalid <- lift $ atomically $ ($ blockHash newBlock) . forgetFingerprint <$> diff --git a/ouroboros-consensus/src/Ouroboros/Consensus/Storage/ChainDB/API.hs b/ouroboros-consensus/src/Ouroboros/Consensus/Storage/ChainDB/API.hs index ebeeb8d6d2a..c68f9ea8f2f 100644 --- a/ouroboros-consensus/src/Ouroboros/Consensus/Storage/ChainDB/API.hs +++ b/ouroboros-consensus/src/Ouroboros/Consensus/Storage/ChainDB/API.hs @@ -19,6 +19,10 @@ module Ouroboros.Consensus.Storage.ChainDB.API ( ChainDB(..) , getCurrentTip , getTipBlockNo + -- * Adding a block + , AddBlockPromise(..) + , addBlock + , addBlock_ -- * Useful utilities , getBlock , streamBlocks @@ -61,6 +65,7 @@ module Ouroboros.Consensus.Storage.ChainDB.API ( import qualified Codec.CBOR.Read as CBOR import Control.Exception (Exception (..)) +import Control.Monad (void) import qualified Data.ByteString.Lazy as Lazy import Data.Typeable import GHC.Generics (Generic) @@ -80,6 +85,7 @@ import Ouroboros.Network.Point (WithOrigin) import Ouroboros.Consensus.Block (GetHeader (..), IsEBB (..)) import Ouroboros.Consensus.Ledger.Extended import Ouroboros.Consensus.Ledger.SupportsProtocol +import Ouroboros.Consensus.Util ((.:)) import Ouroboros.Consensus.Util.IOLike import Ouroboros.Consensus.Util.ResourceRegistry import Ouroboros.Consensus.Util.STM (WithFingerprint) @@ -125,9 +131,12 @@ data ChainDB m blk = ChainDB { -- part of the chain if there are other chains available that are -- preferred by the consensus algorithm (typically, longer chains). -- - -- This function triggers chain selection (if necessary) and terminates - -- after chain selection is done. - addBlock :: blk -> m () + -- This function typically returns immediately, yielding a + -- 'AddBlockPromise' which can be used to wait for the result. You can + -- use 'addBlock' to add the block synchronously. + -- + -- NOTE: back pressure can be applied when overloaded. + addBlockAsync :: blk -> m (AddBlockPromise m blk) -- | Get the current chain fragment -- @@ -318,6 +327,49 @@ instance DB (ChainDB m blk) where type DBHeader (ChainDB m blk) = m (Header blk) type DBHeaderHash (ChainDB m blk) = HeaderHash blk +{------------------------------------------------------------------------------- + Adding a block +-------------------------------------------------------------------------------} + +data AddBlockPromise m blk = AddBlockPromise + { blockProcessed :: STM m (Point blk) + -- ^ Use this 'STM' transaction to wait until the block has been + -- processed: the block has been written to disk and chain selection has + -- been performed for the block, /unless/ the block's slot is in the + -- future. + -- + -- The ChainDB's tip after chain selection is returned. When this tip + -- doesn't match the added block, it doesn't necessarily mean the block + -- wasn't adopted. We might have adopted a longer chain of which the + -- added block is a part, but not the tip. + -- + -- NOTE: When the block's slot is in the future, chain selection for the + -- block won't be performed until the block's slot becomes the current + -- slot, which might take some time. For that reason, this transaction + -- will not wait for chain selection of a block from a future slot. It + -- will return the current tip of the ChainDB after writing the block to + -- disk. See 'chainSelectionPerformed' in case you /do/ want to wait. + , chainSelectionPerformed :: STM m (Point blk) + -- ^ Variant of 'blockProcessed' that waits until chain selection has + -- been performed for the block, even when the block's slot is in the + -- future. This can block for a long time. + -- + -- In case the block's slot was not in the future, this is equivalent to + -- 'blockProcessed'. + } + +-- | Add a block synchronously: wait until the block has been processed (see +-- 'blockProcessed'). The new tip of the ChainDB is returned. +addBlock :: IOLike m => ChainDB m blk -> blk -> m (Point blk) +addBlock chainDB blk = do + promise <- addBlockAsync chainDB blk + atomically $ blockProcessed promise + +-- | Add a block synchronously. Variant of 'addBlock' that doesn't return the +-- new tip of the ChainDB. +addBlock_ :: IOLike m => ChainDB m blk -> blk -> m () +addBlock_ = void .: addBlock + {------------------------------------------------------------------------------- Useful utilities -------------------------------------------------------------------------------} @@ -396,13 +448,13 @@ toChain chainDB = withRegistry $ \registry -> IteratorBlockGCed _ -> error "block on the current chain was garbage-collected" -fromChain :: forall m blk. Monad m +fromChain :: forall m blk. IOLike m => m (ChainDB m blk) -> Chain blk -> m (ChainDB m blk) fromChain openDB chain = do chainDB <- openDB - mapM_ (addBlock chainDB) $ Chain.toOldestFirst chain + mapM_ (addBlock_ chainDB) $ Chain.toOldestFirst chain return chainDB {------------------------------------------------------------------------------- diff --git a/ouroboros-consensus/src/Ouroboros/Consensus/Storage/ChainDB/Impl.hs b/ouroboros-consensus/src/Ouroboros/Consensus/Storage/ChainDB/Impl.hs index 71ca3f4fd3c..ea701ee3cb6 100644 --- a/ouroboros-consensus/src/Ouroboros/Consensus/Storage/ChainDB/Impl.hs +++ b/ouroboros-consensus/src/Ouroboros/Consensus/Storage/ChainDB/Impl.hs @@ -126,6 +126,7 @@ openDBInternal args launchBgTasks = do varCopyLock <- newMVar () varKillBgThreads <- newTVarM $ return () varFutureBlocks <- newTVarM Map.empty + blocksToAdd <- newBlocksToAdd (Args.cdbBlocksToAddSize args) let env = CDB { cdbImmDB = immDB , cdbVolDB = volDB @@ -147,11 +148,12 @@ openDBInternal args launchBgTasks = do , cdbIsEBB = toIsEBB . isJust . Args.cdbIsEBB args , cdbCheckIntegrity = Args.cdbCheckIntegrity args , cdbBlockchainTime = Args.cdbBlockchainTime args + , cdbBlocksToAdd = blocksToAdd , cdbFutureBlocks = varFutureBlocks } h <- fmap CDBHandle $ newTVarM $ ChainDbOpen env let chainDB = ChainDB - { addBlock = getEnv1 h ChainSel.addBlock + { addBlockAsync = getEnv1 h ChainSel.addBlockAsync , getCurrentChain = getEnvSTM h Query.getCurrentChain , getCurrentLedger = getEnvSTM h Query.getCurrentLedger , getTipBlock = getEnv h Query.getTipBlock @@ -176,6 +178,7 @@ openDBInternal args launchBgTasks = do , intGarbageCollect = getEnv1 h Background.garbageCollect , intUpdateLedgerSnapshots = getEnv h Background.updateLedgerSnapshots , intScheduledChainSelection = getEnv1 h Background.scheduledChainSelection + , intAddBlockRunner = getEnv h Background.addBlockRunner , intKillBgThreads = varKillBgThreads } diff --git a/ouroboros-consensus/src/Ouroboros/Consensus/Storage/ChainDB/Impl/Args.hs b/ouroboros-consensus/src/Ouroboros/Consensus/Storage/ChainDB/Impl/Args.hs index 8575d6c1b45..ff8f76f8f48 100644 --- a/ouroboros-consensus/src/Ouroboros/Consensus/Storage/ChainDB/Impl/Args.hs +++ b/ouroboros-consensus/src/Ouroboros/Consensus/Storage/ChainDB/Impl/Args.hs @@ -104,18 +104,24 @@ data ChainDbArgs m blk = forall h1 h2 h3. ChainDbArgs { , cdbTraceLedger :: Tracer m (LgrDB.LedgerDB blk) , cdbRegistry :: ResourceRegistry m , cdbGcDelay :: DiffTime + , cdbBlocksToAddSize :: Word + -- ^ Size of the queue used to store asynchronously added blocks. This + -- is the maximum number of blocks that could be kept in memory at the + -- same time when the background thread processing the blocks can't keep + -- up. } -- | Arguments specific to the ChainDB, not to the ImmutableDB, VolatileDB, or -- LedgerDB. data ChainDbSpecificArgs m blk = ChainDbSpecificArgs { - cdbsTracer :: Tracer m (TraceEvent blk) - , cdbsRegistry :: ResourceRegistry m + cdbsTracer :: Tracer m (TraceEvent blk) + , cdbsRegistry :: ResourceRegistry m -- ^ TODO: the ImmutableDB takes a 'ResourceRegistry' too, but we're -- using it for ChainDB-specific things. Revisit these arguments. - , cdbsGcDelay :: DiffTime - , cdbsBlockchainTime :: BlockchainTime m - , cdbsEncodeHeader :: Header blk -> Encoding + , cdbsGcDelay :: DiffTime + , cdbsBlockchainTime :: BlockchainTime m + , cdbsEncodeHeader :: Header blk -> Encoding + , cdbsBlocksToAddSize :: Word } -- | Default arguments @@ -128,12 +134,13 @@ data ChainDbSpecificArgs m blk = ChainDbSpecificArgs { -- * 'cdbsEncodeHeader' defaultSpecificArgs :: ChainDbSpecificArgs m blk defaultSpecificArgs = ChainDbSpecificArgs{ - cdbsGcDelay = oneHour + cdbsGcDelay = oneHour + , cdbsBlocksToAddSize = 10 -- Fields without a default - , cdbsTracer = error "no default for cdbsTracer" - , cdbsRegistry = error "no default for cdbsRegistry" - , cdbsBlockchainTime = error "no default for cdbsBlockchainTime" - , cdbsEncodeHeader = error "no default for cdbsEncodeHeader" + , cdbsTracer = error "no default for cdbsTracer" + , cdbsRegistry = error "no default for cdbsRegistry" + , cdbsBlockchainTime = error "no default for cdbsBlockchainTime" + , cdbsEncodeHeader = error "no default for cdbsEncodeHeader" } where oneHour = secondsToDiffTime 60 * 60 @@ -215,6 +222,7 @@ fromChainDbArgs ChainDbArgs{..} = ( , cdbsGcDelay = cdbGcDelay , cdbsBlockchainTime = cdbBlockchainTime , cdbsEncodeHeader = cdbEncodeHeader + , cdbsBlocksToAddSize = cdbBlocksToAddSize } ) @@ -270,4 +278,5 @@ toChainDbArgs ImmDB.ImmDbArgs{..} , cdbTraceLedger = lgrTraceLedger , cdbRegistry = immRegistry , cdbGcDelay = cdbsGcDelay + , cdbBlocksToAddSize = cdbsBlocksToAddSize } diff --git a/ouroboros-consensus/src/Ouroboros/Consensus/Storage/ChainDB/Impl/Background.hs b/ouroboros-consensus/src/Ouroboros/Consensus/Storage/ChainDB/Impl/Background.hs index 81dc693d33f..f2238179ef1 100644 --- a/ouroboros-consensus/src/Ouroboros/Consensus/Storage/ChainDB/Impl/Background.hs +++ b/ouroboros-consensus/src/Ouroboros/Consensus/Storage/ChainDB/Impl/Background.hs @@ -25,7 +25,8 @@ module Ouroboros.Consensus.Storage.ChainDB.Impl.Background , newGcSchedule , scheduleGC , gcScheduleRunner - -- * Taking and trimming ledger snapshots + -- * Adding blocks to the ChainDB + , addBlockRunner -- * Executing scheduled chain selections , scheduledChainSelection , scheduledChainSelectionRunner @@ -60,7 +61,7 @@ import Ouroboros.Consensus.Storage.ChainDB.API (BlockRef (..), ChainDbFailure (..)) import qualified Ouroboros.Consensus.Storage.ChainDB.Impl.BlockCache as BlockCache import Ouroboros.Consensus.Storage.ChainDB.Impl.ChainSel - (chainSelectionForBlock) + (addBlockSync, chainSelectionForBlock) import qualified Ouroboros.Consensus.Storage.ChainDB.Impl.ImmDB as ImmDB import qualified Ouroboros.Consensus.Storage.ChainDB.Impl.LgrDB as LgrDB import Ouroboros.Consensus.Storage.ChainDB.Impl.Types @@ -76,6 +77,8 @@ launchBgTasks -> Word64 -- ^ Number of immutable blocks replayed on ledger DB startup -> m () launchBgTasks cdb@CDB{..} replayed = do + !addBlockThread <- launch $ + addBlockRunner cdb gcSchedule <- newGcSchedule !gcThread <- launch $ gcScheduleRunner gcSchedule $ garbageCollect cdb @@ -83,7 +86,7 @@ launchBgTasks cdb@CDB{..} replayed = do copyAndSnapshotRunner cdb gcSchedule replayed !chainSyncThread <- scheduledChainSelectionRunner cdb atomically $ writeTVar cdbKillBgThreads $ - sequence_ [gcThread, copyAndSnapshotThread, chainSyncThread] + sequence_ [addBlockThread, gcThread, copyAndSnapshotThread, chainSyncThread] where launch :: m Void -> m (m ()) launch = fmap cancelThread . forkLinkedThread cdbRegistry @@ -373,46 +376,66 @@ gcScheduleRunner (GcSchedule queue) runGc = forever $ do -- Garbage collection is called synchronously runGc slotNo +{------------------------------------------------------------------------------- + Adding blocks to the ChainDB +-------------------------------------------------------------------------------} + +-- | Read blocks from 'cdbBlocksToAdd' and add them synchronously to the +-- ChainDB. +addBlockRunner + :: (IOLike m, LedgerSupportsProtocol blk, HasCallStack) + => ChainDbEnv m blk + -> m Void +addBlockRunner cdb@CDB{..} = forever $ do + blockToAdd <- getBlockToAdd cdbBlocksToAdd + addBlockSync cdb blockToAdd + {------------------------------------------------------------------------------- Executing scheduled chain selections -------------------------------------------------------------------------------} --- | Retrieve the blocks from 'cdbFutureBlocks' for which chain selection was --- scheduled at the current slot. Run chain selection for each of them. +-- | Retrieve the 'FutureBlockToAdd's from 'cdbFutureBlocks' for which chain +-- selection was scheduled at the current slot. Run chain selection for each +-- of them. scheduledChainSelection :: (IOLike m, LedgerSupportsProtocol blk, HasCallStack) => ChainDbEnv m blk -> SlotNo -- ^ The current slot -> m () scheduledChainSelection cdb@CDB{..} curSlot = do - (mbHdrs, remaining) + (mbFutureBlocks, remaining) <- atomically $ updateTVar cdbFutureBlocks $ \futureBlocks -> -- Extract and delete the value stored at @curSlot@ - let (mbHdrs, remaining) = Map.updateLookupWithKey + let (mbFutureBlocks, remaining) = Map.updateLookupWithKey (\_ _ -> Nothing) curSlot futureBlocks - in (remaining, (mbHdrs, remaining)) - -- The list of headers is stored in reverse order so we can easily - -- prepend. We reverse them here now so we add them in chronological - -- order, even though this should not matter, as they are all blocks for - -- the same slot: at most one block per slot can be adopted. + in (remaining, (mbFutureBlocks, remaining)) + -- The list is stored in reverse order so we can easily prepend. We + -- reverse them here now so we add them in chronological order, even + -- though this should not matter, as they are all blocks for the same + -- slot: at most one block per slot can be adopted. -- -- The only exception is an EBB, which shares the slot with a regular -- block. Either order of adding them would result in the same chain, but -- adding the EBB before the regular block is cheaper, as we can simply -- extend the current chain instead of adding a disconnected block first -- and then switching to a very short fork. - whenJust (NE.reverse <$> mbHdrs) $ \hdrs -> do + whenJust (NE.reverse <$> mbFutureBlocks) $ \futureBlocks -> do let nbScheduled = fromIntegral $ sum $ length <$> Map.elems remaining - traceWith cdbTracer $ - TraceAddBlockEvent $ - RunningScheduledChainSelection (fmap headerPoint hdrs) curSlot nbScheduled + traceWith cdbTracer $ TraceAddBlockEvent $ + RunningScheduledChainSelection + (fmap (headerPoint . futureBlockHdr) futureBlocks) + curSlot + nbScheduled -- If an exception occurs during a call to 'chainSelectionForBlock', -- then no chain selection will be performed for the blocks after it. -- Only real errors that would shut down the ChainDB could be thrown. In -- which case, the ChainDB has to be (re)started, triggering a full -- chain selection, which would include these blocks. So there is no -- risk of "forgetting" to add a block. - mapM_ (chainSelectionForBlock cdb BlockCache.empty) hdrs + forM_ futureBlocks $ \(FutureBlockToAdd hdr varChainSelectionPerformed) -> do + newTip <- chainSelectionForBlock cdb BlockCache.empty hdr + -- Important: notify that chain selection has been performed for the block + atomically $ putTMVar varChainSelectionPerformed newTip -- | Whenever the current slot changes, call 'scheduledChainSelection' for the -- (new) current slot. diff --git a/ouroboros-consensus/src/Ouroboros/Consensus/Storage/ChainDB/Impl/ChainSel.hs b/ouroboros-consensus/src/Ouroboros/Consensus/Storage/ChainDB/Impl/ChainSel.hs index d0f028412c9..5d4b720b609 100644 --- a/ouroboros-consensus/src/Ouroboros/Consensus/Storage/ChainDB/Impl/ChainSel.hs +++ b/ouroboros-consensus/src/Ouroboros/Consensus/Storage/ChainDB/Impl/ChainSel.hs @@ -2,6 +2,7 @@ {-# LANGUAGE GADTs #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE MultiWayIf #-} +{-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE PatternSynonyms #-} {-# LANGUAGE RecordWildCards #-} {-# LANGUAGE ScopedTypeVariables #-} @@ -13,7 +14,8 @@ -- adding a block. module Ouroboros.Consensus.Storage.ChainDB.Impl.ChainSel ( initialChainSelection - , addBlock + , addBlockAsync + , addBlockSync , chainSelectionForBlock -- * Type for in-sync chain and ledger , ChainAndLedger -- Opaque @@ -62,8 +64,8 @@ import Ouroboros.Consensus.Util.AnchoredFragment import Ouroboros.Consensus.Util.IOLike import Ouroboros.Consensus.Util.STM (WithFingerprint (..)) -import Ouroboros.Consensus.Storage.ChainDB.API - (InvalidBlockReason (..)) +import Ouroboros.Consensus.Storage.ChainDB.API (AddBlockPromise (..), + InvalidBlockReason (..)) import Ouroboros.Consensus.Storage.ChainDB.Impl.BlockCache (BlockCache) import qualified Ouroboros.Consensus.Storage.ChainDB.Impl.BlockCache as BlockCache @@ -161,7 +163,37 @@ initialChainSelection immDB volDB lgrDB tracer cfg varInvalid curSlot = do curChainAndLedger (fmap (mkCandidateSuffix 0) candidates) --- | Add a block to the ChainDB. +-- | Add a block to the ChainDB, /asynchronously/. +-- +-- This adds a 'BlockToAdd' corresponding to the given block to the +-- 'cdbBlocksToAdd' queue. The entries in that queue are processed using +-- 'addBlockSync', see that function for more information. +-- +-- When the queue is full, this function will still block. +-- +-- An important advantage of this asynchronous approach over a synchronous +-- approach is that it doesn't have the following disadvantage: when a thread +-- adding a block to the ChainDB is killed, which can happen when +-- disconnecting from the corresponding node, we might have written the block +-- to disk, but not updated the corresponding in-memory state (e.g., that of +-- the VolatileDB), leaving both out of sync. +-- +-- With this asynchronous approach, threads adding blocks asynchronously can +-- be killed without worries, the background thread processing the blocks +-- synchronously won't be killed. Only when the whole ChainDB shuts down will +-- that background thread get killed. But since there will be no more +-- in-memory state, it can't get out of sync with the file system state. On +-- the next startup, a correct in-memory state will be reconstructed from the +-- file system state. +addBlockAsync + :: forall m blk. (IOLike m, HasHeader blk) + => ChainDbEnv m blk + -> blk + -> m (AddBlockPromise m blk) +addBlockAsync CDB { cdbTracer, cdbBlocksToAdd } = + addBlockToAdd (contramap TraceAddBlockEvent cdbTracer) cdbBlocksToAdd + +-- | Add a block to the ChainDB, /synchronously/. -- -- This is the only operation that actually changes the ChainDB. It will store -- the block on disk and trigger chain selection, possibly switching to a @@ -169,7 +201,7 @@ initialChainSelection immDB volDB lgrDB tracer cfg varInvalid curSlot = do -- -- When the slot of the block is > the current slot, a chain selection will be -- scheduled in the slot of the block. -addBlock +addBlockSync :: forall m blk. ( IOLike m , HasHeader blk @@ -177,41 +209,52 @@ addBlock , HasCallStack ) => ChainDbEnv m blk - -> blk + -> BlockToAdd m blk -> m () -addBlock cdb@CDB{..} b = do +addBlockSync cdb@CDB {..} BlockToAdd { blockToAdd = b, .. } = do -- No need to be in the same STM transaction curSlot <- atomically $ getCurrentSlot cdbBlockchainTime - (isMember, invalid, immBlockNo) <- atomically $ (,,) + (isMember, invalid, curChain) <- atomically $ (,,) <$> VolDB.getIsMember cdbVolDB <*> (forgetFingerprint <$> readTVar cdbInvalid) - <*> (AF.anchorBlockNo <$> Query.getCurrentChain cdb) + <*> Query.getCurrentChain cdb + + let immBlockNo = AF.anchorBlockNo curChain + curTip = castPoint (AF.headPoint curChain) -- We follow the steps from section "## Adding a block" in ChainDB.md -- ### Ignore if - | olderThanK hdr (cdbIsEBB hdr) immBlockNo -> + | olderThanK hdr (cdbIsEBB hdr) immBlockNo -> do trace $ IgnoreBlockOlderThanK (blockPoint b) - | isMember (blockHash b) -> + deliverPromises curTip + + | isMember (blockHash b) -> do trace $ IgnoreBlockAlreadyInVolDB (blockPoint b) - | Just (InvalidBlockInfo reason _) <- Map.lookup (blockHash b) invalid -> + deliverPromises curTip + + | Just (InvalidBlockInfo reason _) <- Map.lookup (blockHash b) invalid -> do trace $ IgnoreInvalidBlock (blockPoint b) reason + deliverPromises curTip - --- ### Store but schedule chain selection + -- ### Store but schedule chain selection | blockSlot b > curSlot -> do - VolDB.putBlock cdbVolDB b - trace $ BlockInTheFuture (blockPoint b) curSlot trace $ AddedBlockToVolDB (blockPoint b) (blockNo b) (cdbIsEBB hdr) + atomically $ putTMVar varBlockProcessed curTip + -- We'll fill in 'varChainSelectionPerformed' when the scheduled chain + -- selection is performed. + trace $ BlockInTheFuture (blockPoint b) curSlot scheduleChainSelection curSlot (blockSlot b) -- The remaining cases | otherwise -> do VolDB.putBlock cdbVolDB b trace $ AddedBlockToVolDB (blockPoint b) (blockNo b) (cdbIsEBB hdr) - chainSelectionForBlock cdb (BlockCache.singleton b) hdr + newTip <- chainSelectionForBlock cdb (BlockCache.singleton b) hdr + deliverPromises newTip where trace :: TraceAddBlockEvent blk -> m () trace = traceWith (contramap TraceAddBlockEvent cdbTracer) @@ -219,14 +262,22 @@ addBlock cdb@CDB{..} b = do hdr :: Header blk hdr = getHeader b + -- | Use the given 'Point' to fill in the 'TMVar's corresponding to the + -- block's 'AddBlockPromise'. + deliverPromises :: Point blk -> m () + deliverPromises tip = atomically $ do + putTMVar varBlockProcessed tip + putTMVar varChainSelectionPerformed tip + scheduleChainSelection :: SlotNo -- ^ Current slot number -> SlotNo -- ^ Slot number of the block -> m () scheduleChainSelection curSlot slot = do nbScheduled <- atomically $ updateTVar cdbFutureBlocks $ \futureBlocks -> - let futureBlocks' = Map.insertWith strictAppend slot - (forceElemsToWHNF (hdr NE.:| [])) futureBlocks + let futureBlockToAdd = FutureBlockToAdd hdr varChainSelectionPerformed + futureBlocks' = Map.insertWith strictAppend slot + (forceElemsToWHNF (futureBlockToAdd NE.:| [])) futureBlocks nbScheduled = fromIntegral $ sum $ length <$> Map.elems futureBlocks in (futureBlocks', nbScheduled) trace $ ScheduledChainSelection (headerPoint hdr) curSlot nbScheduled @@ -273,6 +324,8 @@ olderThanK hdr isEBB immBlockNo -- -- PRECONDITION: the slot of the block <= the current (wall) slot -- +-- The new tip of the current chain is returned. +-- -- = Constructing candidate fragments -- -- The VolatileDB keeps a \"successors\" map in memory, telling us the hashes @@ -307,7 +360,7 @@ chainSelectionForBlock => ChainDbEnv m blk -> BlockCache blk -> Header blk - -> m () + -> m (Point blk) chainSelectionForBlock cdb@CDB{..} blockCache hdr = do curSlot <- atomically $ getCurrentSlot cdbBlockchainTime @@ -344,12 +397,14 @@ chainSelectionForBlock cdb@CDB{..} blockCache hdr = do if -- The chain might have grown since we added the block such that the -- block is older than @k@. - | olderThanK hdr (cdbIsEBB hdr) immBlockNo -> + | olderThanK hdr (cdbIsEBB hdr) immBlockNo -> do trace $ IgnoreBlockOlderThanK p + return tipPoint -- We might have validated the block in the meantime - | Just (InvalidBlockInfo reason _) <- Map.lookup (headerHash hdr) invalid -> + | Just (InvalidBlockInfo reason _) <- Map.lookup (headerHash hdr) invalid -> do trace $ IgnoreInvalidBlock p reason + return tipPoint -- The block @b@ fits onto the end of our current chain | pointHash tipPoint == castHash (blockPrevHash hdr) -> do @@ -362,9 +417,10 @@ chainSelectionForBlock cdb@CDB{..} blockCache hdr = do trace (TrySwitchToAFork p hashes) switchToAFork succsOf' curChainAndLedger hashes curSlot - | otherwise -> + | otherwise -> do -- ### Store but don't change the current chain trace (StoreButDontChange p) + return tipPoint -- Note that we may have extended the chain, but have not trimmed it to -- @k@ blocks/headers. That is the job of the background thread, which @@ -387,7 +443,7 @@ chainSelectionForBlock cdb@CDB{..} blockCache hdr = do -- ^ The current chain and ledger -> SlotNo -- ^ The current slot - -> m () + -> m (Point blk) addToCurrentChain succsOf curChainAndLedger@(ChainAndLedger curChain _) curSlot = assert (AF.validExtension curChain hdr) $ do let suffixesAfterB = VolDB.candidates succsOf p @@ -429,14 +485,15 @@ chainSelectionForBlock cdb@CDB{..} blockCache hdr = do -- candidate will be a two-block (the EBB and the new block) -- extension of the current chain. case candidateSuffixes of - Nothing -> return () + Nothing -> return curTip Just candidateSuffixes' -> chainSelection' curChainAndLedger candidateSuffixes' >>= \case - Nothing -> return () + Nothing -> return curTip Just newChainAndLedger -> trySwitchTo newChainAndLedger (AddedToCurrentChain p) where curHead = AF.headAnchor curChain + curTip = castPoint $ AF.headPoint curChain -- | We have found a path of hashes to the new block through the -- VolatileDB. We try to extend this path by looking for forks that start @@ -450,7 +507,7 @@ chainSelectionForBlock cdb@CDB{..} blockCache hdr = do -- ^ An uninterrupted path of hashes @(i,b]@. -> SlotNo -- ^ The current slot - -> m () + -> m (Point blk) switchToAFork succsOf curChainAndLedger@(ChainAndLedger curChain _) hashes curSlot = do let suffixesAfterB = VolDB.candidates succsOf p @@ -473,15 +530,15 @@ chainSelectionForBlock cdb@CDB{..} blockCache hdr = do case NE.nonEmpty candidateSuffixes of -- No candidates preferred over the current chain - Nothing -> return () + Nothing -> return curTip Just candidateSuffixes' -> chainSelection' curChainAndLedger candidateSuffixes' >>= \case - Nothing -> return () + Nothing -> return curTip Just newChainAndLedger -> trySwitchTo newChainAndLedger (SwitchedToAFork p) where i = AF.castAnchor $ anchor curChain - + curTip = castPoint $ AF.headPoint curChain -- | 'chainSelection' partially applied to the parameters from the -- 'ChainDbEnv'. @@ -550,7 +607,7 @@ chainSelectionForBlock cdb@CDB{..} blockCache hdr = do ) -- ^ Given the previous chain and the new chain, return the event -- to trace when we switched to the new chain. - -> m () + -> m (Point blk) trySwitchTo (ChainAndLedger newChain newLedger) mkTraceEvent = do (curChain, switched) <- atomically $ do curChain <- readTVar cdbChain @@ -584,8 +641,10 @@ chainSelectionForBlock cdb@CDB{..} blockCache hdr = do if switched then do trace $ mkTraceEvent curChain newChain traceWith cdbTraceLedger newLedger + return $ castPoint $ AF.headPoint newChain else do trace $ ChainChangedInBg curChain newChain + return $ castPoint $ AF.headPoint curChain -- | Build a cache from the headers in the fragment. cacheHeaders :: AnchoredFragment (Header blk) diff --git a/ouroboros-consensus/src/Ouroboros/Consensus/Storage/ChainDB/Impl/Types.hs b/ouroboros-consensus/src/Ouroboros/Consensus/Storage/ChainDB/Impl/Types.hs index 4d8c5ecef51..1fbfc719c48 100644 --- a/ouroboros-consensus/src/Ouroboros/Consensus/Storage/ChainDB/Impl/Types.hs +++ b/ouroboros-consensus/src/Ouroboros/Consensus/Storage/ChainDB/Impl/Types.hs @@ -1,3 +1,4 @@ +{-# LANGUAGE BangPatterns #-} {-# LANGUAGE DataKinds #-} {-# LANGUAGE DeriveAnyClass #-} {-# LANGUAGE DeriveGeneric #-} @@ -5,6 +6,7 @@ {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE GeneralizedNewtypeDeriving #-} {-# LANGUAGE LambdaCase #-} +{-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE RankNTypes #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE StandaloneDeriving #-} @@ -36,6 +38,13 @@ module Ouroboros.Consensus.Storage.ChainDB.Impl.Types ( -- * Invalid blocks , InvalidBlocks , InvalidBlockInfo (..) + -- * Blocks to add + , BlocksToAdd + , BlockToAdd (..) + , newBlocksToAdd + , addBlockToAdd + , getBlockToAdd + , FutureBlockToAdd (..) -- * Trace types , TraceEvent (..) , TraceAddBlockEvent (..) @@ -53,15 +62,18 @@ import Data.List.NonEmpty (NonEmpty) import Data.Map.Strict (Map) import Data.Time.Clock (DiffTime) import Data.Typeable +import Data.Void (Void) import Data.Word import GHC.Generics (Generic) import GHC.Stack (HasCallStack, callStack) +import Control.Monad.Class.MonadSTM.Strict (newEmptyTMVarM) + import Cardano.Prelude (NoUnexpectedThunks (..), OnlyCheckIsWHNF (..)) import Ouroboros.Network.AnchoredFragment (AnchoredFragment) import Ouroboros.Network.Block (BlockNo, HasHeader, HeaderHash, Point, - SlotNo) + SlotNo, blockPoint) import Ouroboros.Network.Point (WithOrigin) import Ouroboros.Consensus.Block (Header, IsEBB (..)) @@ -76,8 +88,9 @@ import Ouroboros.Consensus.Util.STM (WithFingerprint) import Ouroboros.Consensus.Storage.Common (EpochNo) import Ouroboros.Consensus.Storage.EpochInfo (EpochInfo) -import Ouroboros.Consensus.Storage.ChainDB.API (ChainDbError (..), - InvalidBlockReason, StreamFrom, StreamTo, UnknownRange) +import Ouroboros.Consensus.Storage.ChainDB.API (AddBlockPromise (..), + ChainDbError (..), InvalidBlockReason, StreamFrom, + StreamTo, UnknownRange) import Ouroboros.Consensus.Storage.ChainDB.Impl.ImmDB (ImmDB) import qualified Ouroboros.Consensus.Storage.ChainDB.Impl.ImmDB as ImmDB @@ -219,12 +232,14 @@ data ChainDbEnv m blk = CDB , cdbIsEBB :: !(Header blk -> IsEBB) , cdbCheckIntegrity :: !(blk -> Bool) , cdbBlockchainTime :: !(BlockchainTime m) - , cdbFutureBlocks :: !(StrictTVar m (Map SlotNo (NonEmpty (Header blk)))) + , cdbBlocksToAdd :: !(BlocksToAdd m blk) + -- ^ Queue of blocks that still have to be added. + , cdbFutureBlocks :: !(StrictTVar m (Map SlotNo (NonEmpty (FutureBlockToAdd m blk)))) -- ^ Scheduled chain selections for blocks with a slot in the future. -- -- When a block with slot @s@, which is > the current slot is added, we - -- add its header to the front of the list of headers stored under its - -- slot number. We prepend to the list, so the headers are in reverse + -- add a corresponding 'FutureBlockToAdd' to the front of the list stored + -- under its slot number. We prepend to the list, so they are in reverse -- order w.r.t. the order in which they were added. -- -- INVARIANT: all slots in the map are > the current slot. @@ -264,8 +279,12 @@ data Internal m blk = Internal -- ^ Write a new LedgerDB snapshot to disk and remove the oldest one(s). , intScheduledChainSelection :: SlotNo -> m () -- ^ Run the scheduled chain selections for the given 'SlotNo'. + , intAddBlockRunner :: m Void + -- ^ Start the loop that adds blocks to the ChainDB retrieved from the + -- queue populated by 'ChainDB.addBlock'. Execute this loop in a separate + -- thread. , intKillBgThreads :: StrictTVar m (m ()) - -- ^ A handle to kill the background threads. + -- ^ A handle to kill the background threads. } -- | Wrapper around 'intReopen_' to guarantee HasCallStack @@ -380,6 +399,73 @@ data InvalidBlockInfo blk = InvalidBlockInfo , invalidBlockSlotNo :: !SlotNo } deriving (Eq, Show, Generic, NoUnexpectedThunks) +{------------------------------------------------------------------------------- + Blocks to add +-------------------------------------------------------------------------------} + +-- | FIFO queue used to add blocks asynchronously to the ChainDB. Blocks are +-- read from this queue by a background thread, which processes the blocks +-- synchronously. +newtype BlocksToAdd m blk = BlocksToAdd (TBQueue m (BlockToAdd m blk)) + deriving NoUnexpectedThunks via OnlyCheckIsWHNF "BlocksToAdd" (BlocksToAdd m blk) + +-- | Entry in the 'BlocksToAdd' queue: a block together with the 'TMVar's used +-- to implement 'AddBlockPromise'. +data BlockToAdd m blk = BlockToAdd + { blockToAdd :: !blk + , varBlockProcessed :: !(StrictTMVar m (Point blk)) + -- ^ Used for the 'blockProcessed' field of 'AddBlockPromise'. + , varChainSelectionPerformed :: !(StrictTMVar m (Point blk)) + -- ^ Used for the 'chainSelectionPerformed' field of 'AddBlockPromise'. + } + +-- | Create a new 'BlocksToAdd' with the given size. +newBlocksToAdd :: IOLike m => Word -> m (BlocksToAdd m blk) +newBlocksToAdd queueSize = BlocksToAdd <$> + atomically (newTBQueue (fromIntegral queueSize)) + +-- | Add a block to the 'BlocksToAdd' queue. Can block when the queue is full. +addBlockToAdd + :: (IOLike m, HasHeader blk) + => Tracer m (TraceAddBlockEvent blk) + -> BlocksToAdd m blk + -> blk + -> m (AddBlockPromise m blk) +addBlockToAdd tracer (BlocksToAdd queue) blk = do + varBlockProcessed <- newEmptyTMVarM + varChainSelectionPerformed <- newEmptyTMVarM + let !toAdd = BlockToAdd + { blockToAdd = blk + , varBlockProcessed + , varChainSelectionPerformed + } + queueSize <- atomically $ do + writeTBQueue queue toAdd + lengthTBQueue queue + traceWith tracer $ + AddedBlockToQueue (blockPoint blk) (fromIntegral queueSize) + return AddBlockPromise + { blockProcessed = readTMVar varBlockProcessed + , chainSelectionPerformed = readTMVar varChainSelectionPerformed + } + +-- | Get the oldest block from the 'BlocksToAdd' queue. Can block when the +-- queue is empty. +getBlockToAdd :: IOLike m => BlocksToAdd m blk -> m (BlockToAdd m blk) +getBlockToAdd (BlocksToAdd queue) = atomically $ readTBQueue queue + +data FutureBlockToAdd m blk = FutureBlockToAdd + { futureBlockHdr :: !(Header blk) + , varFutureChainSelectionPerformed :: !(StrictTMVar m (Point blk)) + } + +-- No instance for 'StrictTMVar'; we can't use generics +instance NoUnexpectedThunks (Header blk) + => NoUnexpectedThunks (FutureBlockToAdd m blk) where + showTypeOf _ = "FutureBlockToAdd" + whnfNoUnexpectedThunks ctxt (FutureBlockToAdd hdr _tmvar) = + noUnexpectedThunks ctxt hdr + {------------------------------------------------------------------------------- Trace types -------------------------------------------------------------------------------} @@ -449,6 +535,10 @@ data TraceAddBlockEvent blk | IgnoreInvalidBlock (Point blk) (InvalidBlockReason blk) -- ^ A block that is know to be invalid was ignored. + | AddedBlockToQueue (Point blk) Word + -- ^ The block was added to the queue and will be added to the ChainDB by + -- the background thread. The size of the queue is included. + | BlockInTheFuture (Point blk) SlotNo -- ^ The block is from the future, i.e., its slot number is greater than -- the current slot (the second argument). diff --git a/ouroboros-consensus/src/Ouroboros/Consensus/Util/EarlyExit.hs b/ouroboros-consensus/src/Ouroboros/Consensus/Util/EarlyExit.hs index bf774ab1e76..d68445f4699 100644 --- a/ouroboros-consensus/src/Ouroboros/Consensus/Util/EarlyExit.hs +++ b/ouroboros-consensus/src/Ouroboros/Consensus/Util/EarlyExit.hs @@ -26,7 +26,7 @@ import Control.Monad.Trans.Maybe import Data.Function (on) import Data.Proxy -import Cardano.Prelude (NoUnexpectedThunks(..)) +import Cardano.Prelude (NoUnexpectedThunks (..)) import Control.Monad.Class.MonadAsync import Control.Monad.Class.MonadFork @@ -37,7 +37,8 @@ import Control.Monad.Class.MonadTime import Control.Monad.Class.MonadTimer import Ouroboros.Consensus.Util ((.:)) -import Ouroboros.Consensus.Util.IOLike (IOLike, StrictTVar, StrictMVar) +import Ouroboros.Consensus.Util.IOLike (IOLike, + MonadSTMTxExtended (..), StrictMVar, StrictTVar) {------------------------------------------------------------------------------- Basic definitions @@ -113,6 +114,9 @@ instance MonadSTMTx stm => MonadSTMTx (WithEarlyExit stm) where isEmptyTBQueue = lift . isEmptyTBQueue isFullTBQueue = lift . isFullTBQueue +instance MonadSTMTxExtended stm => MonadSTMTxExtended (WithEarlyExit stm) where + lengthTBQueue = lift . lengthTBQueue + instance MonadSTM m => MonadSTM (WithEarlyExit m) where type STM (WithEarlyExit m) = WithEarlyExit (STM m) diff --git a/ouroboros-consensus/src/Ouroboros/Consensus/Util/IOLike.hs b/ouroboros-consensus/src/Ouroboros/Consensus/Util/IOLike.hs index 0dadf3c4efa..0c216237f44 100644 --- a/ouroboros-consensus/src/Ouroboros/Consensus/Util/IOLike.hs +++ b/ouroboros-consensus/src/Ouroboros/Consensus/Util/IOLike.hs @@ -13,6 +13,7 @@ module Ouroboros.Consensus.Util.IOLike ( , ExitCase(..) -- *** MonadSTM , module Ouroboros.Consensus.Util.MonadSTM.NormalForm + , MonadSTMTxExtended(..) -- *** MonadFork , MonadFork(..) -- TODO: Should we hide this in favour of MonadAsync? , MonadThread(..) @@ -20,6 +21,7 @@ module Ouroboros.Consensus.Util.IOLike ( , MonadAsyncSTM(..) , MonadAsync(..) , ExceptionInLinkedThread(..) + , link , linkTo -- *** MonadST , MonadST(..) @@ -36,7 +38,9 @@ module Ouroboros.Consensus.Util.IOLike ( , NoUnexpectedThunks(..) ) where -import Cardano.Prelude (NoUnexpectedThunks (..)) +import qualified Control.Concurrent.STM as IO + +import Cardano.Prelude (Natural, NoUnexpectedThunks (..)) import Control.Monad.Class.MonadAsync import Control.Monad.Class.MonadFork @@ -48,6 +52,20 @@ import Control.Monad.Class.MonadTimer import Ouroboros.Consensus.Util.MonadSTM.NormalForm import Ouroboros.Consensus.Util.Orphans () +{------------------------------------------------------------------------------- + MonadSTMTxExtended +-------------------------------------------------------------------------------} + +-- | Additional STM functionality +class MonadSTMTx stm => MonadSTMTxExtended stm where + -- 'lengthTBQueue' has only been added in stm-2.5.0.0, while io-sim-classes + -- supports older versions too. In consensus, we require stm >= 2.5, giving + -- us the real stm implementation. + lengthTBQueue :: TBQueue_ stm a -> stm Natural + +instance MonadSTMTxExtended IO.STM where + lengthTBQueue = IO.lengthTBQueue + {------------------------------------------------------------------------------- IOLike -------------------------------------------------------------------------------} @@ -62,6 +80,7 @@ class ( MonadAsync m , MonadCatch m , MonadMask m , MonadThrow (STM m) + , MonadSTMTxExtended (STM m) , forall a. NoUnexpectedThunks (m a) , forall a. NoUnexpectedThunks a => NoUnexpectedThunks (StrictTVar m a) , forall a. NoUnexpectedThunks a => NoUnexpectedThunks (StrictMVar m a) diff --git a/ouroboros-consensus/test-storage/Test/Ouroboros/Storage/ChainDB/AddBlock.hs b/ouroboros-consensus/test-storage/Test/Ouroboros/Storage/ChainDB/AddBlock.hs index ba89789abe6..13c332c46cb 100644 --- a/ouroboros-consensus/test-storage/Test/Ouroboros/Storage/ChainDB/AddBlock.hs +++ b/ouroboros-consensus/test-storage/Test/Ouroboros/Storage/ChainDB/AddBlock.hs @@ -287,6 +287,7 @@ mkArgs cfg initLedger tracer registry hashInfo , cdbTraceLedger = nullTracer , cdbRegistry = registry , cdbGcDelay = 0 + , cdbBlocksToAddSize = 2 } where addDummyBinaryInfo :: CBOR.Encoding -> BinaryInfo CBOR.Encoding diff --git a/ouroboros-consensus/test-storage/Test/Ouroboros/Storage/ChainDB/Mock.hs b/ouroboros-consensus/test-storage/Test/Ouroboros/Storage/ChainDB/Mock.hs index 6a218a686b0..2432e479ec3 100644 --- a/ouroboros-consensus/test-storage/Test/Ouroboros/Storage/ChainDB/Mock.hs +++ b/ouroboros-consensus/test-storage/Test/Ouroboros/Storage/ChainDB/Mock.hs @@ -119,7 +119,7 @@ openDB cfg initLedger btime = do void $ onSlotChange btime $ update_ . Model.advanceCurSlot cfg return ChainDB { - addBlock = update_ . Model.addBlock cfg + addBlockAsync = update . Model.addBlockPromise cfg , getCurrentChain = querySTM $ Model.lastK k getHeader , getCurrentLedger = querySTM $ Model.currentLedger , getBlockComponent = queryE .: Model.getBlockComponentByPoint diff --git a/ouroboros-consensus/test-storage/Test/Ouroboros/Storage/ChainDB/Model.hs b/ouroboros-consensus/test-storage/Test/Ouroboros/Storage/ChainDB/Model.hs index 8f74f34f7c2..ccf66c27410 100644 --- a/ouroboros-consensus/test-storage/Test/Ouroboros/Storage/ChainDB/Model.hs +++ b/ouroboros-consensus/test-storage/Test/Ouroboros/Storage/ChainDB/Model.hs @@ -23,6 +23,7 @@ module Test.Ouroboros.Storage.ChainDB.Model ( , empty , addBlock , addBlocks + , addBlockPromise -- * Queries , currentChain , currentLedger @@ -103,14 +104,15 @@ import Ouroboros.Consensus.Protocol.Abstract import Ouroboros.Consensus.Protocol.MockChainSel import Ouroboros.Consensus.Util (repeatedly) import qualified Ouroboros.Consensus.Util.AnchoredFragment as Fragment +import Ouroboros.Consensus.Util.IOLike (MonadSTM) import Ouroboros.Consensus.Util.STM (Fingerprint (..), WithFingerprint (..)) -import Ouroboros.Consensus.Storage.ChainDB.API (BlockComponent (..), - ChainDB, ChainDbError (..), InvalidBlockReason (..), - IteratorResult (..), LedgerCursorFailure (..), - StreamFrom (..), StreamTo (..), UnknownRange (..), - validBounds) +import Ouroboros.Consensus.Storage.ChainDB.API (AddBlockPromise (..), + BlockComponent (..), ChainDB, ChainDbError (..), + InvalidBlockReason (..), IteratorResult (..), + LedgerCursorFailure (..), StreamFrom (..), StreamTo (..), + UnknownRange (..), validBounds) import Ouroboros.Consensus.Storage.ChainDB.Impl.ChainSel (olderThanK) type IteratorId = Int @@ -406,6 +408,22 @@ addBlocks :: (LedgerSupportsProtocol blk, ModelSupportsBlock blk) -> Model blk -> Model blk addBlocks cfg = repeatedly (addBlock cfg) +-- | Wrapper around 'addBlock' that returns an 'AddBlockPromise'. +addBlockPromise + :: forall m blk. (LedgerSupportsProtocol blk, ModelSupportsBlock blk, MonadSTM m) + => TopLevelConfig blk + -> blk + -> Model blk + -> (AddBlockPromise m blk, Model blk) +addBlockPromise cfg blk m = (result, m') + where + m' = addBlock cfg blk m + result = AddBlockPromise + { blockProcessed = return $ tipPoint m' + -- We currently cannot wait for future blocks + , chainSelectionPerformed = error "chainSelectionPerformed not supported" + } + {------------------------------------------------------------------------------- Iterators -------------------------------------------------------------------------------} diff --git a/ouroboros-consensus/test-storage/Test/Ouroboros/Storage/ChainDB/StateMachine.hs b/ouroboros-consensus/test-storage/Test/Ouroboros/Storage/ChainDB/StateMachine.hs index 4d7f24ac6ba..c7f18992977 100644 --- a/ouroboros-consensus/test-storage/Test/Ouroboros/Storage/ChainDB/StateMachine.hs +++ b/ouroboros-consensus/test-storage/Test/Ouroboros/Storage/ChainDB/StateMachine.hs @@ -304,8 +304,8 @@ run :: forall m blk. (IOLike m, HasHeader blk) -> Cmd blk (TestIterator m blk) (TestReader m blk) -> m (Success blk (TestIterator m blk) (TestReader m blk)) run chainDB@ChainDB{..} internal registry varCurSlot varNextId = \case - AddBlock blk -> Unit <$> (advanceAndAdd (blockSlot blk) blk) - AddFutureBlock blk s -> Unit <$> (advanceAndAdd s blk) + AddBlock blk -> Point <$> (advanceAndAdd (blockSlot blk) blk) + AddFutureBlock blk s -> Point <$> (advanceAndAdd s blk) GetCurrentChain -> Chain <$> atomically getCurrentChain GetCurrentLedger -> Ledger <$> atomically getCurrentLedger GetPastLedger pt -> MbLedger <$> getPastLedger chainDB pt @@ -343,7 +343,7 @@ run chainDB@ChainDB{..} internal registry varCurSlot varNextId = \case forM_ [prevCurSlot + 1..newCurSlot] $ \slot -> do atomically $ writeTVar varCurSlot slot intScheduledChainSelection internal slot - addBlock blk + addBlock chainDB blk giveWithEq :: a -> m (WithEq a) giveWithEq a = @@ -460,8 +460,8 @@ runPure :: forall blk. -> DBModel blk -> (Resp blk IteratorId ReaderId, DBModel blk) runPure cfg = \case - AddBlock blk -> ok Unit $ update_ (advanceAndAdd (blockSlot blk) blk) - AddFutureBlock blk s -> ok Unit $ update_ (advanceAndAdd s blk) + AddBlock blk -> ok Point $ update (advanceAndAdd (blockSlot blk) blk) + AddFutureBlock blk s -> ok Point $ update (advanceAndAdd s blk) GetCurrentChain -> ok Chain $ query (Model.lastK k getHeader) GetCurrentLedger -> ok Ledger $ query Model.currentLedger GetPastLedger pt -> ok MbLedger $ query (Model.getPastLedger cfg pt) @@ -486,8 +486,9 @@ runPure cfg = \case where k = configSecurityParam cfg - advanceAndAdd slot blk = - Model.addBlock cfg blk . Model.advanceCurSlot cfg slot + advanceAndAdd slot blk m = (Model.tipPoint m', m') + where + m' = Model.addBlock cfg blk $ Model.advanceCurSlot cfg slot m iter = either UnknownRange Iter mbGCedAllComponents = MbGCedAllComponents . MaybeGCedBlock False @@ -1304,9 +1305,13 @@ prop_sequential = forAllCommands smUnused Nothing $ \cmds -> QC.monadicIO $ do <*> uncheckedNewTVarM Mock.empty let args = mkArgs testCfg testInitExtLedger tracer threadRegistry varCurSlot fsVars (db, internal) <- QC.run $ openDBInternal args False + -- TODO use withAsync or registry + addBlockAsync <- QC.run $ async $ intAddBlockRunner internal + QC.run $ link addBlockAsync let sm' = sm db internal iteratorRegistry varCurSlot varNextId genBlk testCfg testInitExtLedger (hist, model, res) <- runCommands sm' cmds (realChain, realChain', trace, fses, remainingCleanups) <- QC.run $ do + cancel addBlockAsync trace <- getTrace open <- atomically $ isOpen db unless open $ intReopen internal False @@ -1489,6 +1494,7 @@ mkArgs cfg initLedger tracer registry varCurSlot , cdbTraceLedger = nullTracer , cdbRegistry = registry , cdbGcDelay = 0 + , cdbBlocksToAddSize = 2 } tests :: TestTree