Skip to content

Commit

Permalink
ChainDB: new approach to future blocks
Browse files Browse the repository at this point in the history
Previously, we knew the current slot and were able to tell that a block was
from the future by comparing the block's slot against the current slot. For
such blocks we would schedule a chain selection at the block's slot, which
would be performed by a background thread.

Now, we no longer know the current slot. Instead, we validate candidate chains
and use the resulting ledgers to call `CheckInFuture`, which returns the
headers in the candidate fragment that are from the future. We truncate these
headers from the fragment, record that they're from the future
(`cdbFutureBlocks`), and repeat chain selection without them. Headers that are
too far from the future, i.e., exceeding the max clock skew, are recorded as
invalid blocks (with `InFutureExceedsClockSkew` as the `InvalidBlockReason`).

For each new block we receive, we perform chain selection for all future
blocks before performing chain selection for the new block.

* Split off `CandidateSuffix` into a separate module and use it throughout
  chain selection instead of only partially. A `CandidateSuffix` is the
  number of headers to roll back the current chain + a fragment containing the
  new headers to add, like a diff w.r.t. the current chain. Previously, we
  converted such a `CandidateSuffix` to a `ChainAndLedger`, i.e., a fragment
  starting from the immutable tip (typically containing >= k headers) + a
  ledger matching the tip. Now, we stick to the `CandidateSuffix` until the
  end, when we actually install the candidate as the new chain by applying the
  diff. Also introduce `CandidateSuffixAndLedger` and use that instead of
  `ChainAndLedger` for the validated candidate. We still use `ChainAndLedger`
  for the current chain.

* Simplify `trySwitchTo` because there is no concurrency thanks to the queue
  introduced in #1709. Remove the obsolete trace message `ChainChangedInBg`.

* New trace messages:
  - `ChainSelectionForFutureBlock`
  - `CandidateContainsFutureBlocks`
  - `CandidateContainsFutureBlocksExceedingClockSkew`

* Remove `chainSelectionPerformed` from `AddBlockPromise` as it was not really
  used and complicated our new handling of blocks from the future.
  • Loading branch information
mrBliss committed Apr 29, 2020
1 parent 5ac49fa commit b234439
Show file tree
Hide file tree
Showing 12 changed files with 799 additions and 544 deletions.
1 change: 1 addition & 0 deletions ouroboros-consensus/ouroboros-consensus.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ library
Ouroboros.Consensus.Storage.ChainDB.Impl.Background
Ouroboros.Consensus.Storage.ChainDB.Impl.BlockCache
Ouroboros.Consensus.Storage.ChainDB.Impl.BlockComponent
Ouroboros.Consensus.Storage.ChainDB.Impl.CandidateSuffix
Ouroboros.Consensus.Storage.ChainDB.Impl.ChainSel
Ouroboros.Consensus.Storage.ChainDB.Impl.ImmDB
Ouroboros.Consensus.Storage.ChainDB.Impl.Iterator
Expand Down
4 changes: 2 additions & 2 deletions ouroboros-consensus/src/Ouroboros/Consensus/NodeKernel.hs
Original file line number Diff line number Diff line change
Expand Up @@ -459,8 +459,8 @@ forkBlockProduction maxBlockSizeOverride IS{..} BlockProduction{..} =

-- Add the block to the chain DB
result <- lift $ ChainDB.addBlockAsync chainDB newBlock
-- Block until we have performed chain selection for the block
curTip <- lift $ atomically $ ChainDB.chainSelectionPerformed result
-- Block until we have processed the block
curTip <- lift $ atomically $ ChainDB.blockProcessed result

-- Check whether we adopted our block
when (curTip /= blockPoint newBlock) $ do
Expand Down
34 changes: 17 additions & 17 deletions ouroboros-consensus/src/Ouroboros/Consensus/Storage/ChainDB/API.hs
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ instance DB (ChainDB m blk) where
-------------------------------------------------------------------------------}

data AddBlockPromise m blk = AddBlockPromise
{ blockWrittenToDisk :: STM m Bool
{ blockWrittenToDisk :: STM m Bool
-- ^ Use this 'STM' transaction to wait until the block has been written
-- to disk.
--
Expand All @@ -355,30 +355,22 @@ data AddBlockPromise m blk = AddBlockPromise
-- NOTE: Even when the result is 'False', 'getIsFetched' might still
-- return 'True', e.g., the block was older than @k@, but it has been
-- downloaded and stored on disk before.
, blockProcessed :: STM m (Point blk)
, blockProcessed :: STM m (Point blk)
-- ^ Use this 'STM' transaction to wait until the block has been
-- processed: the block has been written to disk and chain selection has
-- been performed for the block, /unless/ the block's slot is in the
-- future.
-- been performed for the block, /unless/ the block is from the future.
--
-- The ChainDB's tip after chain selection is returned. When this tip
-- doesn't match the added block, it doesn't necessarily mean the block
-- wasn't adopted. We might have adopted a longer chain of which the
-- added block is a part, but not the tip.
--
-- NOTE: When the block's slot is in the future, chain selection for the
-- block won't be performed until the block's slot becomes the current
-- slot, which might take some time. For that reason, this transaction
-- will not wait for chain selection of a block from a future slot. It
-- will return the current tip of the ChainDB after writing the block to
-- disk. See 'chainSelectionPerformed' in case you /do/ want to wait.
, chainSelectionPerformed :: STM m (Point blk)
-- ^ Variant of 'blockProcessed' that waits until chain selection has
-- been performed for the block, even when the block's slot is in the
-- future. This can block for a long time.
--
-- In case the block's slot was not in the future, this is equivalent to
-- 'blockProcessed'.
-- NOTE: When the block is from the future, chain selection for the
-- block won't be performed until the block is no longer in the future,
-- which might take some time. For that reason, this transaction will
-- not wait for chain selection of a block from the future. It will
-- return the current tip of the ChainDB after writing the block to
-- disk.
}

-- | Add a block synchronously: wait until the block has been written to disk
Expand Down Expand Up @@ -661,6 +653,14 @@ data InvalidBlockReason blk
-- ^ The block occurs in a chain after block that was found to be invalid
-- by the ledger. The point and reason corresponding to the original
-- invalid block are stored.
| InFutureExceedsClockSkew !(RealPoint blk)
-- ^ The block's slot is in the future, exceeding the allowed clock skew.
--
-- Possible causes, order by decreasing likelihood:
--
-- 1. Our clock is behind (significantly more likely than the others)
-- 2. Their clock is ahead
-- 3. It's intentional, i.e., an attack
deriving (Eq, Show, Generic)

instance LedgerSupportsProtocol blk
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,9 @@ openDBInternal args launchBgTasks = do
(Query.getAnyKnownBlock immDB volDB)
traceWith tracer $ TraceOpenEvent OpenedLgrDB

varInvalid <- newTVarM (WithFingerprint Map.empty (Fingerprint 0))
varInvalid <- newTVarM (WithFingerprint Map.empty (Fingerprint 0))
varFutureBlocks <- newTVarM Map.empty


chainAndLedger <- ChainSel.initialChainSelection
immDB
Expand All @@ -112,6 +114,8 @@ openDBInternal args launchBgTasks = do
tracer
(Args.cdbTopLevelConfig args)
varInvalid
varFutureBlocks
(Args.cdbCheckInFuture args)

let chain = VF.validatedFragment chainAndLedger
ledger = VF.validatedLedger chainAndLedger
Expand All @@ -125,7 +129,6 @@ openDBInternal args launchBgTasks = do
varNextReaderKey <- newTVarM (ReaderKey 0)
varCopyLock <- newMVar ()
varKillBgThreads <- newTVarM $ return ()
varFutureBlocks <- newTVarM Map.empty
blocksToAdd <- newBlocksToAdd (Args.cdbBlocksToAddSize args)

let env = CDB { cdbImmDB = immDB
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE PatternSynonyms #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE UndecidableInstances #-}
-- | Intended for qualified import
--
-- > import Ouroboros.Consensus.Storage.ChainDB.Impl.CandidateSuffix
-- (CandidateSuffix (..), CandidateSuffixAndLedger)
-- > import qualified Ouroboros.Consensus.Storage.ChainDB.Impl.CandidateSuffix as CS
module Ouroboros.Consensus.Storage.ChainDB.Impl.CandidateSuffix
( CandidateSuffix(CandidateSuffix)
, getRollback
, getSuffix
, getTip
, getAnchorPoint
, mkExtension
, mkRollback
, fitOn
, rollback
, intersect
, truncate
-- * With Ledger
, CandidateSuffixAndLedger
, getCandidateSuffix
, getLedger
, mkCandidateSuffixAndLedger
, toValidatedFragment
) where

import Prelude hiding (truncate)

import Control.Monad.Except (throwError)
import Data.Word (Word64)
import GHC.Stack (HasCallStack)

import Ouroboros.Network.AnchoredFragment (AnchoredFragment (..))
import qualified Ouroboros.Network.AnchoredFragment as AF
import Ouroboros.Network.Block (HasHeader, HeaderHash, Point,
blockHash, castPoint)

import Ouroboros.Consensus.Block (Header)
import Ouroboros.Consensus.Fragment.Validated (ValidatedFragment)
import qualified Ouroboros.Consensus.Fragment.Validated as VF
import Ouroboros.Consensus.Ledger.Abstract (ledgerTipPoint)
import Ouroboros.Consensus.Ledger.SupportsProtocol
(LedgerSupportsProtocol)
import Ouroboros.Consensus.Util.Assert

import qualified Ouroboros.Consensus.Storage.ChainDB.Impl.LgrDB as LgrDB


-- | Auxiliary data type for 'cdbAddBlock' for a candidate suffix.
--
-- INVARIANT: the length of the suffix must always be >= the rollback
--
-- Note: we allow the suffix to be empty, even though it is rather pointless.
-- Allowing empty ones makes working with them easier: fewer cases to deal
-- with. Such candidates will be filtered out at the end using
-- 'preferAnchoredCandidate' anyway.
data CandidateSuffix blk = UnsafeCandidateSuffix
{ getRollback :: !Word64
-- ^ The number of headers to roll back the current chain
, getSuffix :: !(AnchoredFragment (Header blk))
-- ^ The new headers to add after rolling back the current chain.
}

pattern CandidateSuffix
:: Word64 -> AnchoredFragment (Header blk) -> CandidateSuffix blk
pattern CandidateSuffix r s <- UnsafeCandidateSuffix r s
{-# COMPLETE CandidateSuffix #-}

deriving instance (HasHeader blk, Eq (Header blk))
=> Eq (CandidateSuffix blk)
deriving instance (HasHeader blk, Show (Header blk))
=> Show (CandidateSuffix blk)

-- | Return the tip of the suffix
getTip :: HasHeader (Header blk) => CandidateSuffix blk -> Point blk
getTip = castPoint . AF.headPoint . getSuffix

-- | Return the anchor point of the suffix
getAnchorPoint :: CandidateSuffix blk -> Point blk
getAnchorPoint = castPoint . AF.anchorPoint . getSuffix

-- | Make an extension-only (no rollback) 'CandidateSuffix'.
mkExtension :: AnchoredFragment (Header blk) -> CandidateSuffix blk
mkExtension = UnsafeCandidateSuffix 0

-- | Return 'Nothing' if the length of the suffix < the rollback
mkRollback
:: HasHeader (Header blk)
=> Word64
-> AnchoredFragment (Header blk)
-> Maybe (CandidateSuffix blk)
mkRollback nbRollback suffix
| fromIntegral (AF.length suffix) >= nbRollback
= Just $ UnsafeCandidateSuffix nbRollback suffix
| otherwise
= Nothing

-- | Fit the candidate suffix on a chain after first rolling back the given
-- chain.
--
-- If the given chain is the current chain on which the candidate is based, a
-- 'Just' will be returned. The returned candidate fragment will have the same
-- anchor point as the given chain.
fitOn
:: HasHeader (Header blk)
=> AnchoredFragment (Header blk)
-> CandidateSuffix blk
-> Maybe (AnchoredFragment (Header blk))
fitOn curChain (CandidateSuffix nbRollback suffix) =
AF.join (AF.dropNewest (fromIntegral nbRollback) curChain) suffix

-- | Roll back the candidate suffix to the given point.
--
-- PRECONDITION: the given point must correspond to one of the new headers of
-- the candidate suffix ('csSuffix') or the anchor of 'csRollback' (i.e,
-- @'AF.withinFragmentBounds' pt csSuffix@).
--
-- If the length of the suffix rolled back to the given point is shorter than
-- the rollback ('csRollback'), 'Nothing' is returned.
rollback
:: (HasHeader (Header blk), HasCallStack, HasHeader blk)
=> Point blk
-> CandidateSuffix blk
-> Maybe (CandidateSuffix blk)
rollback pt (CandidateSuffix nbRollback suffix)
| Just suffix' <- AF.rollback (castPoint pt) suffix
= mkRollback nbRollback suffix'
| otherwise
= error $ "rollback point not on the candidate suffix: " <> show pt

-- | Calculate the candidate suffix of a fork of the current chain.
--
-- If the candidate fragment is shorter than the current chain, 'Nothing' is
-- returned (this would violate the invariant of 'CandidateSuffix').
--
-- PRECONDITION: the candidate fragment must intersect with the current chain
-- fragment.
intersect
:: (HasHeader (Header blk), HasCallStack)
=> AnchoredFragment (Header blk) -- ^ Current chain
-> AnchoredFragment (Header blk) -- ^ Candidate chain
-> Maybe (CandidateSuffix blk) -- ^ Candidate suffix
intersect curChain candChain =
case AF.intersect curChain candChain of
Just (_curChainPrefix, _candPrefix, curChainSuffix, candSuffix)
-> mkRollback
(fromIntegral (AF.length curChainSuffix))
candSuffix
-- Precondition violated.
_ -> error "candidate fragment doesn't intersect with current chain"

-- | Truncate any blocks matching the given predicate in the candidate suffix.
--
-- If the suffix becomes too short, return 'Nothing'.
truncate
:: HasHeader (Header blk)
=> (HeaderHash blk -> Bool) -- ^ Truncate when 'True'
-> CandidateSuffix blk
-> Maybe (CandidateSuffix blk)
truncate reject (CandidateSuffix nbRollback suffix) =
mkRollback nbRollback (AF.takeWhileOldest (not . reject . blockHash) suffix)

{-------------------------------------------------------------------------------
With Ledger
-------------------------------------------------------------------------------}

-- | A 'CandidateSuffix' along with the ledger state after validation.
--
-- INVARIANT:
--
-- > getTip candidateSuffix == ledgerTipPoint (LgrDB.ledgerDbCurrent ledger)
data CandidateSuffixAndLedger blk = UnsafeCandidateSuffixAndLedger
{ getCandidateSuffix :: CandidateSuffix blk
, getLedger :: LgrDB.LedgerDB blk
}

-- | Create a 'CandidateSuffixAndLedger'.
--
-- PRECONDITION:
--
-- > getTip candidateSuffix == ledgerTipPoint (LgrDB.ledgerDbCurrent ledger)
mkCandidateSuffixAndLedger
:: (LedgerSupportsProtocol blk, HasCallStack)
=> CandidateSuffix blk
-> LgrDB.LedgerDB blk
-> CandidateSuffixAndLedger blk
mkCandidateSuffixAndLedger candidateSuffix ledger =
assertWithMsg precondition $
UnsafeCandidateSuffixAndLedger candidateSuffix ledger
where
suffixTip = getTip candidateSuffix
ledgerTip = ledgerTipPoint (LgrDB.ledgerDbCurrent ledger)
precondition
| suffixTip == ledgerTip
= return ()
| otherwise
= throwError $
"tip of candidateSuffix doesn't match ledger: " <>
show suffixTip <> " /= " <> show ledgerTip

toValidatedFragment
:: (LedgerSupportsProtocol blk, HasCallStack)
=> CandidateSuffixAndLedger blk
-> ValidatedFragment blk (LgrDB.LedgerDB blk)
toValidatedFragment (UnsafeCandidateSuffixAndLedger cs l) =
VF.new (getSuffix cs) l
Loading

0 comments on commit b234439

Please sign in to comment.