Skip to content

Commit

Permalink
Merge #1709
Browse files Browse the repository at this point in the history
1709: ChainDB: add blocks asynchronously r=edsko a=mrBliss

Fixes #1463.

Instead of adding blocks synchronously, they are now put into a queue, after
which `addBlockAsync` returns an `AddBlockResult`, which can be used to wait
until the block has been processed.

A background thread will read the blocks from the queue and add them
synchronously to the ChainDB. The queue is limited in size; when it is full,
callers of `addBlockAsync` might still have to wait.

With this asynchronous approach, threads adding blocks asynchronously can be
killed without worries, the background thread processing the blocks
synchronously won't be killed. Only when the whole ChainDB shuts down will
that background thread get killed. But since there will be no more in-memory
state, it can't get out of sync with the file system state. On the next
startup, a correct in-memory state will be reconstructed from the file system
state.

By letting the BlockFetchClient add blocks asynchronously, we also get a
20-40% bulk chain sync speed-up in some microbenchmarks.

Co-authored-by: Thomas Winant <[email protected]>
  • Loading branch information
iohk-bors[bot] and mrBliss authored Feb 27, 2020
2 parents c6df49d + 6996372 commit 387ab3c
Show file tree
Hide file tree
Showing 20 changed files with 406 additions and 100 deletions.
17 changes: 17 additions & 0 deletions io-sim-classes/src/Control/Monad/Class/MonadSTM.hs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ module Control.Monad.Class.MonadSTM
, writeTBQueueDefault
, isEmptyTBQueueDefault
, isFullTBQueueDefault
, lengthTBQueueDefault
) where

import Prelude hiding (read)
Expand Down Expand Up @@ -423,3 +424,19 @@ isFullTBQueueDefault (TBQueue rsize _read wsize _write _size) = do
if (r > 0)
then return False
else return True

-- 'lengthTBQueue' was added in stm-2.5.0.0, but since we support older
-- versions of stm, we don't include it as a method in the type class. If we
-- were to conditionally (@MIN_VERSION_stm(2,5,0)@) include the method in the
-- type class, the IO simulator would have to conditionally include the
-- method, requiring a dependency on the @stm@ package, which would be
-- strange.
--
-- Nevertheless, we already provide a default implementation. Downstream
-- packages that don't mind having a >= 2.5 constraint on stm can use this to
-- implement 'lengthTBQueue' for the IO simulator.
lengthTBQueueDefault :: MonadSTM m => TBQueueDefault m a -> STM m Natural
lengthTBQueueDefault (TBQueue rsize _read wsize _write size) = do
r <- readTVar rsize
w <- readTVar wsize
return $! size - r - w
3 changes: 3 additions & 0 deletions io-sim/src/Control/Monad/IOSim.hs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
module Control.Monad.IOSim (
-- * Simulation monad
SimM,
SimSTM,
-- ** Run simulation
runSim,
runSimOrThrow,
Expand Down Expand Up @@ -140,6 +141,8 @@ data StmA s a where
Retry :: StmA s b
OrElse :: StmA s a -> StmA s a -> (a -> StmA s b) -> StmA s b

-- Exported type
type SimSTM = STM

data MaskingState = Unmasked | MaskedInterruptible | MaskedUninterruptible
deriving (Eq, Ord, Show)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ instance RunNode DualByronBlock where
tip <- atomically $ ChainDB.getTipPoint chainDB
case tip of
BlockPoint {} -> return () -- Chain is not empty
GenesisPoint -> ChainDB.addBlock chainDB genesisEBB
GenesisPoint -> ChainDB.addBlock_ chainDB genesisEBB
where
genesisEBB :: DualByronBlock
genesisEBB = DualBlock {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ instance RunNode ByronBlock where
case tip of
-- Chain is not empty
BlockPoint {} -> return ()
GenesisPoint -> ChainDB.addBlock chainDB genesisEBB
GenesisPoint -> ChainDB.addBlock_ chainDB genesisEBB
where
genesisEBB = forgeEBB cfg (SlotNo 0) (BlockNo 0) GenesisHash

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ runThreadNetwork ThreadNetworkArgs
pure (mSlot, fromWithOrigin (firstBlockNo (Proxy @blk)) bno, pointHash p)
when (prevSlot < At ebbSlotNo) $ do
let ebb = forgeEBB cfg ebbSlotNo ebbBlockNo prevHash
ChainDB.addBlock chainDB ebb
ChainDB.addBlock_ chainDB ebb

go (succ epoch)

Expand Down Expand Up @@ -551,6 +551,7 @@ runThreadNetwork ThreadNetworkArgs
, cdbTraceLedger = nullDebugTracer
, cdbRegistry = registry
, cdbGcDelay = 0
, cdbBlocksToAddSize = 2
}
where
-- prop_general relies on this tracer
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
{-# LANGUAGE FlexibleInstances #-}
{-# OPTIONS_GHC -fno-warn-orphans #-}
module Test.Util.Orphans.IOLike () where

import Control.Monad.Class.MonadSTM (lengthTBQueueDefault)
import Control.Monad.IOSim
import Ouroboros.Consensus.Util.IOLike
import Test.Util.Orphans.NoUnexpectedThunks ()

instance MonadSTMTxExtended (SimSTM s) where
lengthTBQueue = lengthTBQueueDefault

instance IOLike (SimM s)
2 changes: 1 addition & 1 deletion ouroboros-consensus/ouroboros-consensus.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ library
, network >=3.1 && <3.2
, psqueues >=0.2.3 && <0.3
, serialise >=0.2 && <0.3
, stm
, stm >=2.5 && <2.6
, streaming
, text >=1.2 && <1.3
, time
Expand Down
16 changes: 6 additions & 10 deletions ouroboros-consensus/src/Ouroboros/Consensus/NodeKernel.hs
Original file line number Diff line number Diff line change
Expand Up @@ -331,8 +331,10 @@ initBlockFetchConsensusInterface cfg chainDB getCandidates blockFetchSize
readFetchedBlocks :: STM m (Point blk -> Bool)
readFetchedBlocks = ChainDB.getIsFetched chainDB

-- Asynchronous: doesn't wait until the block has been written to disk or
-- processed.
addFetchedBlock :: Point blk -> blk -> m ()
addFetchedBlock _pt = ChainDB.addBlock chainDB
addFetchedBlock _pt = void . ChainDB.addBlockAsync chainDB

readFetchedMaxSlotNo :: STM m MaxSlotNo
readFetchedMaxSlotNo = ChainDB.getMaxSlotNo chainDB
Expand Down Expand Up @@ -455,17 +457,11 @@ forkBlockProduction maxBlockSizeOverride IS{..} BlockProduction{..} =
(snapshotMempoolSize mempoolSnapshot)

-- Add the block to the chain DB
lift $ ChainDB.addBlock chainDB newBlock
result <- lift $ ChainDB.addBlockAsync chainDB newBlock
-- Block until we have performed chain selection for the block
curTip <- lift $ atomically $ ChainDB.chainSelectionPerformed result

-- Check whether we adopted our block
--
-- addBlock is synchronous, so when it returns the block we produced
-- will have been considered and possibly (probably) adopted
--
-- TODO: This is wrong. Additional blocks may have been added to the
-- chain since.
-- <https://github.com/input-output-hk/ouroboros-network/issues/1463>
curTip <- lift $ atomically $ ChainDB.getTipPoint chainDB
when (curTip /= blockPoint newBlock) $ do
isInvalid <- lift $ atomically $
($ blockHash newBlock) . forgetFingerprint <$>
Expand Down
62 changes: 57 additions & 5 deletions ouroboros-consensus/src/Ouroboros/Consensus/Storage/ChainDB/API.hs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ module Ouroboros.Consensus.Storage.ChainDB.API (
ChainDB(..)
, getCurrentTip
, getTipBlockNo
-- * Adding a block
, AddBlockPromise(..)
, addBlock
, addBlock_
-- * Useful utilities
, getBlock
, streamBlocks
Expand Down Expand Up @@ -61,6 +65,7 @@ module Ouroboros.Consensus.Storage.ChainDB.API (

import qualified Codec.CBOR.Read as CBOR
import Control.Exception (Exception (..))
import Control.Monad (void)
import qualified Data.ByteString.Lazy as Lazy
import Data.Typeable
import GHC.Generics (Generic)
Expand All @@ -80,6 +85,7 @@ import Ouroboros.Network.Point (WithOrigin)
import Ouroboros.Consensus.Block (GetHeader (..), IsEBB (..))
import Ouroboros.Consensus.Ledger.Extended
import Ouroboros.Consensus.Ledger.SupportsProtocol
import Ouroboros.Consensus.Util ((.:))
import Ouroboros.Consensus.Util.IOLike
import Ouroboros.Consensus.Util.ResourceRegistry
import Ouroboros.Consensus.Util.STM (WithFingerprint)
Expand Down Expand Up @@ -125,9 +131,12 @@ data ChainDB m blk = ChainDB {
-- part of the chain if there are other chains available that are
-- preferred by the consensus algorithm (typically, longer chains).
--
-- This function triggers chain selection (if necessary) and terminates
-- after chain selection is done.
addBlock :: blk -> m ()
-- This function typically returns immediately, yielding a
-- 'AddBlockPromise' which can be used to wait for the result. You can
-- use 'addBlock' to add the block synchronously.
--
-- NOTE: back pressure can be applied when overloaded.
addBlockAsync :: blk -> m (AddBlockPromise m blk)

-- | Get the current chain fragment
--
Expand Down Expand Up @@ -318,6 +327,49 @@ instance DB (ChainDB m blk) where
type DBHeader (ChainDB m blk) = m (Header blk)
type DBHeaderHash (ChainDB m blk) = HeaderHash blk

{-------------------------------------------------------------------------------
Adding a block
-------------------------------------------------------------------------------}

data AddBlockPromise m blk = AddBlockPromise
{ blockProcessed :: STM m (Point blk)
-- ^ Use this 'STM' transaction to wait until the block has been
-- processed: the block has been written to disk and chain selection has
-- been performed for the block, /unless/ the block's slot is in the
-- future.
--
-- The ChainDB's tip after chain selection is returned. When this tip
-- doesn't match the added block, it doesn't necessarily mean the block
-- wasn't adopted. We might have adopted a longer chain of which the
-- added block is a part, but not the tip.
--
-- NOTE: When the block's slot is in the future, chain selection for the
-- block won't be performed until the block's slot becomes the current
-- slot, which might take some time. For that reason, this transaction
-- will not wait for chain selection of a block from a future slot. It
-- will return the current tip of the ChainDB after writing the block to
-- disk. See 'chainSelectionPerformed' in case you /do/ want to wait.
, chainSelectionPerformed :: STM m (Point blk)
-- ^ Variant of 'blockProcessed' that waits until chain selection has
-- been performed for the block, even when the block's slot is in the
-- future. This can block for a long time.
--
-- In case the block's slot was not in the future, this is equivalent to
-- 'blockProcessed'.
}

-- | Add a block synchronously: wait until the block has been processed (see
-- 'blockProcessed'). The new tip of the ChainDB is returned.
addBlock :: IOLike m => ChainDB m blk -> blk -> m (Point blk)
addBlock chainDB blk = do
promise <- addBlockAsync chainDB blk
atomically $ blockProcessed promise

-- | Add a block synchronously. Variant of 'addBlock' that doesn't return the
-- new tip of the ChainDB.
addBlock_ :: IOLike m => ChainDB m blk -> blk -> m ()
addBlock_ = void .: addBlock

{-------------------------------------------------------------------------------
Useful utilities
-------------------------------------------------------------------------------}
Expand Down Expand Up @@ -396,13 +448,13 @@ toChain chainDB = withRegistry $ \registry ->
IteratorBlockGCed _ ->
error "block on the current chain was garbage-collected"

fromChain :: forall m blk. Monad m
fromChain :: forall m blk. IOLike m
=> m (ChainDB m blk)
-> Chain blk
-> m (ChainDB m blk)
fromChain openDB chain = do
chainDB <- openDB
mapM_ (addBlock chainDB) $ Chain.toOldestFirst chain
mapM_ (addBlock_ chainDB) $ Chain.toOldestFirst chain
return chainDB

{-------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ openDBInternal args launchBgTasks = do
varCopyLock <- newMVar ()
varKillBgThreads <- newTVarM $ return ()
varFutureBlocks <- newTVarM Map.empty
blocksToAdd <- newBlocksToAdd (Args.cdbBlocksToAddSize args)

let env = CDB { cdbImmDB = immDB
, cdbVolDB = volDB
Expand All @@ -147,11 +148,12 @@ openDBInternal args launchBgTasks = do
, cdbIsEBB = toIsEBB . isJust . Args.cdbIsEBB args
, cdbCheckIntegrity = Args.cdbCheckIntegrity args
, cdbBlockchainTime = Args.cdbBlockchainTime args
, cdbBlocksToAdd = blocksToAdd
, cdbFutureBlocks = varFutureBlocks
}
h <- fmap CDBHandle $ newTVarM $ ChainDbOpen env
let chainDB = ChainDB
{ addBlock = getEnv1 h ChainSel.addBlock
{ addBlockAsync = getEnv1 h ChainSel.addBlockAsync
, getCurrentChain = getEnvSTM h Query.getCurrentChain
, getCurrentLedger = getEnvSTM h Query.getCurrentLedger
, getTipBlock = getEnv h Query.getTipBlock
Expand All @@ -176,6 +178,7 @@ openDBInternal args launchBgTasks = do
, intGarbageCollect = getEnv1 h Background.garbageCollect
, intUpdateLedgerSnapshots = getEnv h Background.updateLedgerSnapshots
, intScheduledChainSelection = getEnv1 h Background.scheduledChainSelection
, intAddBlockRunner = getEnv h Background.addBlockRunner
, intKillBgThreads = varKillBgThreads
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,18 +104,24 @@ data ChainDbArgs m blk = forall h1 h2 h3. ChainDbArgs {
, cdbTraceLedger :: Tracer m (LgrDB.LedgerDB blk)
, cdbRegistry :: ResourceRegistry m
, cdbGcDelay :: DiffTime
, cdbBlocksToAddSize :: Word
-- ^ Size of the queue used to store asynchronously added blocks. This
-- is the maximum number of blocks that could be kept in memory at the
-- same time when the background thread processing the blocks can't keep
-- up.
}

-- | Arguments specific to the ChainDB, not to the ImmutableDB, VolatileDB, or
-- LedgerDB.
data ChainDbSpecificArgs m blk = ChainDbSpecificArgs {
cdbsTracer :: Tracer m (TraceEvent blk)
, cdbsRegistry :: ResourceRegistry m
cdbsTracer :: Tracer m (TraceEvent blk)
, cdbsRegistry :: ResourceRegistry m
-- ^ TODO: the ImmutableDB takes a 'ResourceRegistry' too, but we're
-- using it for ChainDB-specific things. Revisit these arguments.
, cdbsGcDelay :: DiffTime
, cdbsBlockchainTime :: BlockchainTime m
, cdbsEncodeHeader :: Header blk -> Encoding
, cdbsGcDelay :: DiffTime
, cdbsBlockchainTime :: BlockchainTime m
, cdbsEncodeHeader :: Header blk -> Encoding
, cdbsBlocksToAddSize :: Word
}

-- | Default arguments
Expand All @@ -128,12 +134,13 @@ data ChainDbSpecificArgs m blk = ChainDbSpecificArgs {
-- * 'cdbsEncodeHeader'
defaultSpecificArgs :: ChainDbSpecificArgs m blk
defaultSpecificArgs = ChainDbSpecificArgs{
cdbsGcDelay = oneHour
cdbsGcDelay = oneHour
, cdbsBlocksToAddSize = 10
-- Fields without a default
, cdbsTracer = error "no default for cdbsTracer"
, cdbsRegistry = error "no default for cdbsRegistry"
, cdbsBlockchainTime = error "no default for cdbsBlockchainTime"
, cdbsEncodeHeader = error "no default for cdbsEncodeHeader"
, cdbsTracer = error "no default for cdbsTracer"
, cdbsRegistry = error "no default for cdbsRegistry"
, cdbsBlockchainTime = error "no default for cdbsBlockchainTime"
, cdbsEncodeHeader = error "no default for cdbsEncodeHeader"
}
where
oneHour = secondsToDiffTime 60 * 60
Expand Down Expand Up @@ -215,6 +222,7 @@ fromChainDbArgs ChainDbArgs{..} = (
, cdbsGcDelay = cdbGcDelay
, cdbsBlockchainTime = cdbBlockchainTime
, cdbsEncodeHeader = cdbEncodeHeader
, cdbsBlocksToAddSize = cdbBlocksToAddSize
}
)

Expand Down Expand Up @@ -270,4 +278,5 @@ toChainDbArgs ImmDB.ImmDbArgs{..}
, cdbTraceLedger = lgrTraceLedger
, cdbRegistry = immRegistry
, cdbGcDelay = cdbsGcDelay
, cdbBlocksToAddSize = cdbsBlocksToAddSize
}
Loading

0 comments on commit 387ab3c

Please sign in to comment.