Skip to content

Commit

Permalink
Merge #1518
Browse files Browse the repository at this point in the history
1518: Better ledger DB policy r=mrBliss a=edsko

Closes #1456
Closes #1264

Co-authored-by: Edsko de Vries <[email protected]>
  • Loading branch information
iohk-bors[bot] and edsko authored Jan 29, 2020
2 parents 65085e3 + 67ec998 commit 86941cb
Show file tree
Hide file tree
Showing 9 changed files with 191 additions and 126 deletions.
4 changes: 2 additions & 2 deletions ouroboros-consensus/src/Ouroboros/Storage/ChainDB/Impl.hs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ openDBInternal args launchBgTasks = do
(Args.cdbEpochInfo args)
immDbTipPoint
(contramap TraceLedgerReplayEvent tracer)
lgrDB <- LgrDB.openDB argsLgrDb
(lgrDB, replayed) <- LgrDB.openDB argsLgrDb
lgrReplayTracer
immDB
(Query.getAnyKnownBlock immDB volDB)
Expand Down Expand Up @@ -198,7 +198,7 @@ openDBInternal args launchBgTasks = do
, _chainTip = castPoint $ AF.headPoint chain
}

when launchBgTasks $ Background.launchBgTasks env
when launchBgTasks $ Background.launchBgTasks env replayed

return (chainDB, testing)
where
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ data ChainDbArgs m blk = forall h1 h2 h3. ChainDbArgs {
, cdbValidation :: ImmDB.ValidationPolicy
, cdbBlocksPerFile :: Int
, cdbParamsLgrDB :: LgrDB.LedgerDbParams
, cdbDiskPolicy :: LgrDB.DiskPolicy m
, cdbDiskPolicy :: LgrDB.DiskPolicy

-- Integration
, cdbNodeConfig :: NodeConfig (BlockProtocol blk)
Expand Down
129 changes: 79 additions & 50 deletions ouroboros-consensus/src/Ouroboros/Storage/ChainDB/Impl/Background.hs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ module Ouroboros.Storage.ChainDB.Impl.Background
launchBgTasks
-- * Copying blocks from the VolatileDB to the ImmutableDB
, copyToImmDB
, copyToImmDBRunner
, copyAndSnapshotRunner
, updateLedgerSnapshots
-- * Executing garbage collection
, garbageCollect
-- * Scheduling garbage collections
Expand All @@ -25,8 +26,6 @@ module Ouroboros.Storage.ChainDB.Impl.Background
, scheduleGC
, gcScheduleRunner
-- * Taking and trimming ledger snapshots
, updateLedgerSnapshots
, updateLedgerSnapshotsRunner
-- * Executing scheduled chain selections
, scheduledChainSelection
, scheduledChainSelectionRunner
Expand All @@ -38,9 +37,11 @@ import qualified Data.List.NonEmpty as NE
import qualified Data.Map.Strict as Map
import Data.Maybe (fromMaybe)
import Data.Void (Void)
import Data.Word
import GHC.Stack (HasCallStack)

import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTime
import Control.Tracer

import Ouroboros.Network.AnchoredFragment (AnchoredFragment (..))
Expand Down Expand Up @@ -71,18 +72,17 @@ import qualified Ouroboros.Storage.ChainDB.Impl.VolDB as VolDB
launchBgTasks
:: forall m blk. (IOLike m, ProtocolLedgerView blk)
=> ChainDbEnv m blk
-> Word64 -- ^ Number of immutable blocks replayed on ledger DB startup
-> m ()
launchBgTasks cdb@CDB{..} = do
launchBgTasks cdb@CDB{..} replayed = do
gcSchedule <- newGcSchedule
!gcThread <- launch $
!gcThread <- launch $
gcScheduleRunner gcSchedule $ garbageCollect cdb
!copyThread <- launch $
copyToImmDBRunner cdb gcSchedule
!lgrSnapshotThread <- launch $
updateLedgerSnapshotsRunner cdb
!copyAndSnapshotThread <- launch $
copyAndSnapshotRunner cdb gcSchedule replayed
!chainSyncThread <- scheduledChainSelectionRunner cdb
atomically $ writeTVar cdbKillBgThreads $
sequence_ [gcThread, copyThread, lgrSnapshotThread, chainSyncThread]
sequence_ [gcThread, copyAndSnapshotThread, chainSyncThread]
where
launch :: m Void -> m (m ())
launch = fmap cancelThread . forkLinkedThread cdbRegistry
Expand Down Expand Up @@ -186,12 +186,29 @@ copyToImmDB CDB{..} = withCopyLock $ do
mustBeUnlocked = fromMaybe
$ error "copyToImmDB running concurrently with itself"

-- | Watches the current chain for changes. Whenever the chain is longer than
-- @k@, then the headers older than @k@ are copied from the VolatileDB to the
-- ImmutableDB (with 'copyToImmDB'). Afterwards, a garbage collection of the
-- VolatileDB is scheduled using ('scheduleGC') for the 'SlotNo' of the most
-- recent block that was copied.
copyToImmDBRunner
-- | Copy blocks from the VolDB to ImmDB and take snapshots of the LgrDB
--
-- We watch the chain for changes. Whenever the chain is longer than @k@, then
-- the headers older than @k@ are copied from the VolDB to the ImmDB (using
-- 'copyToImmDB'). Once that is complete,
--
-- * We periodically take a snapshot of the LgrDB (depending on its config).
-- NOTE: This implies we do not take a snapshot of the LgrDB if the chain
-- hasn't changed, irrespective of the LgrDB policy.
-- * Schedule GC of the VolDB ('scheduleGC') for the 'SlotNo' of the most
-- recent block that was copied.
--
-- It is important that we only take LgrDB snapshots when we are /sure/ the
-- corresponding blocks have been copied to the ImmDB, since the LgrDB assumes
-- that all snapshots correspond to immutable blocks. (Of course, data
-- corruption can occur and we can handle it by reverting to an older LgrDB
-- snapshot, but we should need this only in exceptional circumstances.)
--
-- We do not store any state of the VolDB GC. If the node shuts down before GC
-- can happen, when we restart the node and schedule the /next/ GC, it will
-- /imply/ any previously scheduled GC, since GC is driven by slot number
-- ("garbage collect anything older than @x@").
copyAndSnapshotRunner
:: forall m blk.
( IOLike m
, OuroborosTag (BlockProtocol blk)
Expand All @@ -201,18 +218,54 @@ copyToImmDBRunner
)
=> ChainDbEnv m blk
-> GcSchedule m
-> Word64 -- ^ Number of immutable blocks replayed on ledger DB startup
-> m Void
copyToImmDBRunner cdb@CDB{..} gcSchedule = forever $ do
atomically $ do
curChain <- readTVar cdbChain
check $ fromIntegral (AF.length curChain) > k

mSlotNo <- copyToImmDB cdb
case mSlotNo of
Origin -> pure ()
At slotNo -> scheduleGC (contramap TraceGCEvent cdbTracer) slotNo cdbGcDelay gcSchedule
copyAndSnapshotRunner cdb@CDB{..} gcSchedule =
loop Nothing
where
SecurityParam k = protocolSecurityParam cdbNodeConfig
SecurityParam k = protocolSecurityParam cdbNodeConfig
LgrDB.DiskPolicy{..} = LgrDB.getDiskPolicy cdbLgrDB

loop :: Maybe Time -> Word64 -> m Void
loop mPrevSnapshot distance = do
-- Wait for the chain to grow larger than @k@
numToWrite <- atomically $ do
curChain <- readTVar cdbChain
check $ fromIntegral (AF.length curChain) > k
return $ fromIntegral (AF.length curChain) - k

-- Copy blocks to imm DB
--
-- This is a synchronous operation: when it returns, the blocks have been
-- copied to disk (though not flushed, necessarily).
copyToImmDB cdb >>= scheduleGC'

now <- getMonotonicTime
let distance' = distance + numToWrite
elapsed = (\prev -> now `diffTime` prev) <$> mPrevSnapshot

if onDiskShouldTakeSnapshot elapsed distance' then do
updateLedgerSnapshots cdb
loop (Just now) 0
else
loop mPrevSnapshot distance'

scheduleGC' :: WithOrigin SlotNo -> m ()
scheduleGC' Origin = return ()
scheduleGC' (At slotNo) =
scheduleGC
(contramap TraceGCEvent cdbTracer)
slotNo
cdbGcDelay
gcSchedule

-- | Take a new on-disk snapshot of the LedgerDB and afterwards trim the old
-- snapshots (typically one) so that only 'onDiskNumSnapshots' snapshots
-- remain on disk.
--
-- The results of both LgrDB operations are discarded.
updateLedgerSnapshots :: IOLike m => ChainDbEnv m blk -> m ()
updateLedgerSnapshots CDB{..} =
    -- TODO avoid taking multiple snapshots corresponding to the same tip.
    LgrDB.takeSnapshot cdbLgrDB >> void (LgrDB.trimSnapshots cdbLgrDB)

{-------------------------------------------------------------------------------
Executing garbage collection
Expand Down Expand Up @@ -313,30 +366,6 @@ gcScheduleRunner (GcSchedule queue) runGc = forever $ do
-- Garbage collection is called synchronously
runGc slotNo

{-------------------------------------------------------------------------------
Taking and trimming ledger snapshots
-------------------------------------------------------------------------------}

-- | Take a new on-disk snapshot of the LedgerDB and afterwards trim the old
-- snapshots (typically one) so that only 'onDiskNumSnapshots' snapshots
-- remain on disk.
--
-- The results of both LgrDB operations are discarded.
updateLedgerSnapshots :: IOLike m => ChainDbEnv m blk -> m ()
updateLedgerSnapshots CDB{..} =
    -- TODO avoid taking multiple snapshots corresponding to the same tip.
    LgrDB.takeSnapshot cdbLgrDB >> void (LgrDB.trimSnapshots cdbLgrDB)

-- | Background loop: run 'updateLedgerSnapshots', sleep for the interval
-- currently given by the disk policy's 'onDiskWriteInterval', and start over.
--
-- The interval is re-read (in STM) on every iteration, after the snapshot
-- update has completed. This action never returns.
updateLedgerSnapshotsRunner :: IOLike m => ChainDbEnv m blk -> m Void
updateLedgerSnapshotsRunner cdb@CDB{..} = forever $ do
    updateLedgerSnapshots cdb
    atomically onDiskWriteInterval >>= threadDelay
  where
    LgrDB.DiskPolicy{..} = LgrDB.getDiskPolicy cdbLgrDB

{-------------------------------------------------------------------------------
Executing scheduled chain selections
-------------------------------------------------------------------------------}
Expand Down
40 changes: 25 additions & 15 deletions ouroboros-consensus/src/Ouroboros/Storage/ChainDB/Impl/LgrDB.hs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ data LgrDbArgs m blk = forall h. LgrDbArgs {
, lgrEncodeChainState :: ChainState (BlockProtocol blk) -> Encoding
, lgrEncodeHash :: HeaderHash blk -> Encoding
, lgrParams :: LedgerDbParams
, lgrDiskPolicy :: DiskPolicy m
, lgrDiskPolicy :: DiskPolicy
, lgrGenesis :: m (ExtLedgerState blk)
, lgrTracer :: Tracer m (TraceEvent (Point blk))
, lgrTraceLedger :: Tracer m (LedgerDB blk)
Expand Down Expand Up @@ -190,6 +190,9 @@ defaultArgs fp = LgrDbArgs {
}

-- | Open the ledger DB
--
-- In addition to the ledger DB also returns the number of immutable blocks
-- that were replayed.
openDB :: forall m blk. (IOLike m, ProtocolLedgerView blk)
=> LgrDbArgs m blk
-- ^ Stateless initialization arguments
Expand All @@ -208,18 +211,21 @@ openDB :: forall m blk. (IOLike m, ProtocolLedgerView blk)
--
-- The block may be in the immutable DB or in the volatile DB; the ledger
-- DB does not know where the boundary is at any given point.
-> m (LgrDB m blk)
-> m (LgrDB m blk, Word64)
openDB args@LgrDbArgs{..} replayTracer immDB getBlock = do
createDirectoryIfMissing lgrHasFS True (mkFsPath [])
db <- initFromDisk args replayTracer lgrDbConf immDB
(db, replayed) <- initFromDisk args replayTracer lgrDbConf immDB
(varDB, varPrevApplied) <-
(,) <$> newTVarM db <*> newTVarM Set.empty
return LgrDB {
conf = lgrDbConf
, varDB = varDB
, varPrevApplied = varPrevApplied
, args = args
}
return (
LgrDB {
conf = lgrDbConf
, varDB = varDB
, varPrevApplied = varPrevApplied
, args = args
}
, replayed
)
where
apply :: blk
-> ExtLedgerState blk
Expand All @@ -240,23 +246,27 @@ openDB args@LgrDbArgs{..} replayTracer immDB getBlock = do
, ldbConfResolve = getBlock
}

-- | Reopen the ledger DB
--
-- Returns the number of immutable blocks replayed.
reopen :: (IOLike m, ProtocolLedgerView blk, HasCallStack)
=> LgrDB m blk
-> ImmDB m blk
-> Tracer m (TraceReplayEvent (Point blk) () (Point blk))
-> m ()
-> m Word64
reopen LgrDB{..} immDB replayTracer = do
db <- initFromDisk args replayTracer conf immDB
(db, replayed) <- initFromDisk args replayTracer conf immDB
atomically $ writeTVar varDB db
return replayed

initFromDisk :: (IOLike m, HasHeader blk, HasCallStack)
=> LgrDbArgs m blk
-> Tracer m (TraceReplayEvent (Point blk) () (Point blk))
-> Conf m blk
-> ImmDB m blk
-> m (LedgerDB blk)
-> m (LedgerDB blk, Word64)
initFromDisk args@LgrDbArgs{..} replayTracer lgrDbConf immDB = wrapFailure args $ do
(_initLog, db) <-
(_initLog, db, replayed) <-
LedgerDB.initLedgerDB
replayTracer
lgrTracer
Expand All @@ -266,7 +276,7 @@ initFromDisk args@LgrDbArgs{..} replayTracer lgrDbConf immDB = wrapFailure args
lgrParams
lgrDbConf
(streamAPI immDB)
return db
return (db, replayed)

{-------------------------------------------------------------------------------
TraceReplayEvent decorator
Expand Down Expand Up @@ -347,7 +357,7 @@ trimSnapshots :: MonadThrow m => LgrDB m blk -> m [DiskSnapshot]
trimSnapshots LgrDB{ args = args@LgrDbArgs{..} } = wrapFailure args $
LedgerDB.trimSnapshots lgrTracer lgrHasFS lgrDiskPolicy

getDiskPolicy :: LgrDB m blk -> DiskPolicy m
getDiskPolicy :: LgrDB m blk -> DiskPolicy
getDiskPolicy LgrDB{ args = LgrDbArgs{..} } = lgrDiskPolicy

{-------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,6 @@ closeDB (CDBHandle varState) = do
killBgThreads <- atomically $ readTVar cdbKillBgThreads
killBgThreads

-- Write a 'LedgerDB' snapshot so that we don't have to replay too many
-- blocks when restarting
Background.updateLedgerSnapshots cdb
ImmDB.closeDB cdbImmDB
VolDB.closeDB cdbVolDB

Expand Down Expand Up @@ -144,7 +141,7 @@ reopen (CDBHandle varState) launchBgTasks = do
cdbEpochInfo
immDbTipPoint
(contramap TraceLedgerReplayEvent cdbTracer)
LgrDB.reopen cdbLgrDB cdbImmDB lgrReplayTracer
replayed <- LgrDB.reopen cdbLgrDB cdbImmDB lgrReplayTracer
traceWith cdbTracer $ TraceOpenEvent OpenedLgrDB

curSlot <- atomically $ getCurrentSlot cdbBlockchainTime
Expand Down Expand Up @@ -173,4 +170,4 @@ reopen (CDBHandle varState) launchBgTasks = do
, _chainTip = castPoint $ AF.headPoint chain
}

when launchBgTasks $ Background.launchBgTasks env
when launchBgTasks $ Background.launchBgTasks env replayed
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ data ChainDbEnv m blk = CDB
-- ^ Contains the current chain fragment.
--
-- INVARIANT: the anchor point of this fragment is the tip of the
-- ImmutableDB.
-- ImmutableDB. This implies that this fragment never contains any blocks
-- that are stored in the immutable DB.
--
-- Note that this fragment might be shorter than @k@ headers when the
-- whole chain is shorter than @k@ or in case of corruption of the
Expand Down
Loading

0 comments on commit 86941cb

Please sign in to comment.