From 6e409ac8dcb221d4670372a8b57969d6ed7e56b9 Mon Sep 17 00:00:00 2001 From: Edmund Noble Date: Wed, 27 Mar 2024 18:24:08 -0400 Subject: [PATCH] feature: miner continues blocks periodically Makes the mining loop use ContinueBlock to periodically "refresh" blocks, ensuring that they use all of the transactions in the mempool if possible. Test plan will be to put this on a testnet mining node. Change-Id: I25e1ab240e4522d7e56812ff170d0aa849125abe --- src/Chainweb/Chainweb/MinerResources.hs | 62 ++++++++++++++++++++----- src/Chainweb/Miner/Config.hs | 15 +++++- src/Chainweb/Miner/Coordinator.hs | 49 ++++++++++--------- src/Chainweb/WebPactExecutionService.hs | 6 +-- test/Chainweb/Test/Orphans/Internal.hs | 7 ++- test/Chainweb/Test/Pact/Utils.hs | 2 +- 6 files changed, 98 insertions(+), 43 deletions(-) diff --git a/src/Chainweb/Chainweb/MinerResources.hs b/src/Chainweb/Chainweb/MinerResources.hs index 9c8533bddb..bdd237cb8f 100644 --- a/src/Chainweb/Chainweb/MinerResources.hs +++ b/src/Chainweb/Chainweb/MinerResources.hs @@ -5,6 +5,7 @@ {-# LANGUAGE RankNTypes #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE TupleSections #-} +{-# LANGUAGE TypeApplications #-} {-# LANGUAGE TypeFamilies #-} -- | @@ -55,7 +56,7 @@ import Chainweb.ChainId import Chainweb.Chainweb.ChainResources import Chainweb.Cut (_cutMap) import Chainweb.CutDB (CutDb, awaitNewBlock, cutDbPactService, _cut) -import Chainweb.Logger (Logger, logFunction) +import Chainweb.Logger import Chainweb.Miner.Config import Chainweb.Miner.Coordinator import Chainweb.Miner.Miners @@ -65,7 +66,7 @@ import Chainweb.Pact.Utils import Chainweb.Payload import Chainweb.Payload.PayloadStore import Chainweb.Sync.WebBlockHeaderStore -import Chainweb.Time (Micros, Time, minute, getCurrentTimeIntegral, scaleTimeSpan) +import Chainweb.Time import Chainweb.Utils import Chainweb.Version import Chainweb.WebPactExecutionService @@ -97,7 +98,7 @@ withMiningCoordination logger conf cdb inner in fmap ((mid,) . HM.fromList) $ forM cids $ \cid -> do let bh = fromMaybe (genesisBlockHeader v cid) (HM.lookup cid (_cutMap cut)) - newBlock <- getPayload (ParentHeader bh) cid miner + newBlock <- getPayload cid miner (ParentHeader bh) return (cid, Just newBlock) m <- newTVarIO initialPw @@ -145,19 +146,55 @@ withMiningCoordination logger conf cdb inner -- we assume that this path always exists in PrimedWork and never delete it. ourMiner :: Traversal' PrimedWork (Maybe NewBlock) ourMiner = _Wrapped' . ix (view minerId miner) . ix cid - let !nb = pw ^?! ourMiner . _Just - let ph = newBlockParentHeader nb - -- wait for a block different from what we've got primed work for - new <- awaitNewBlock cdb cid (_parentHeader ph) + let !outdatedPayload = pw ^?! ourMiner . _Just + let ParentHeader outdatedParent = newBlockParentHeader outdatedPayload + let + periodicallyRefreshPayload = do + let delay = + timeSpanToMicros $ _coordinationPayloadRefreshDelay coordConf + threadDelay (fromIntegral @Micros @Int delay) + when (not $ v ^. versionCheats . disablePact) $ do + continuableBlockInProgress <- atomically $ do + primed <- readTVar tpw <&> (^?! (ourMiner . _Just)) + case primed of + NewBlockInProgress bip -> return bip + NewBlockPayload {} -> + error "periodicallyRefreshPayload: encountered NewBlockPayload in PrimedWork, which cannot be refreshed" + maybeNewBlock <- _pactContinueBlock pact cid continuableBlockInProgress + -- if continuing returns Nothing then the parent header + -- isn't available in the checkpointer right now. + -- in that case we just mark the payload as not stale + let newBlock = case maybeNewBlock of + NoHistory -> continuableBlockInProgress + Historical b -> b + + logFunctionText logger Info + $ "refreshed block on chain " <> sshow cid + <> ", old and new tx count " + <> sshow (V.length $ _transactionPairs $ _blockInProgressTransactions continuableBlockInProgress, V.length $ _transactionPairs $ _blockInProgressTransactions newBlock) + + atomically $ modifyTVar' tpw $ + ourMiner .~ Just (NewBlockInProgress newBlock) + periodicallyRefreshPayload + + newParent <- either ParentHeader id <$> race + -- wait for a block different from what we've got primed work for + (awaitNewBlock cdb cid outdatedParent) + -- in the meantime, periodically refresh the payload to make sure + -- it has all of the transactions it can have + periodicallyRefreshPayload + -- Temporarily block this chain from being considered for queries atomically $ modifyTVar' tpw (ourMiner .~ Nothing) - -- Generate new payload for this miner - newBlock <- getPayload (ParentHeader new) cid miner + + -- Get a payload for the new block + newBlock <- getPayload cid miner newParent atomically $ modifyTVar' tpw (ourMiner .~ Just newBlock) - getPayload :: ParentHeader -> ChainId -> Miner -> IO NewBlock - getPayload new cid m = + + getPayload :: ChainId -> Miner -> ParentHeader -> IO NewBlock + getPayload cid m ph = if v ^. versionCheats . disablePact -- if pact is disabled, we must keep track of the latest header -- ourselves. otherwise we use the header we get from newBlock as the @@ -165,7 +202,8 @@ withMiningCoordination logger conf cdb inner -- with rocksdb though that shouldn't cause a problem, just wasted work, -- see docs for -- Chainweb.Pact.PactService.Checkpointer.findLatestValidBlockHeader' - then return $ NewBlockPayload new emptyPayload + then return $ + NewBlockPayload ph emptyPayload else trace (logFunction logger) "Chainweb.Chainweb.MinerResources.withMiningCoordination.newBlock" () 1 (_pactNewBlock pact cid m NewBlockFill) diff --git a/src/Chainweb/Miner/Config.hs b/src/Chainweb/Miner/Config.hs index eb5ce112bb..e5fd8908dc 100644 --- a/src/Chainweb/Miner/Config.hs +++ b/src/Chainweb/Miner/Config.hs @@ -3,6 +3,7 @@ {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE GeneralizedNewtypeDeriving #-} +{-# LANGUAGE NumericUnderscores #-} {-# LANGUAGE OverloadedStrings #-} -- | @@ -55,7 +56,7 @@ import Pact.Types.Term (mkKeySet, PublicKeyText(..)) -- internal modules import Chainweb.Miner.Pact (Miner(..), MinerKeys(..), MinerId(..), minerId) -import Chainweb.Time (Seconds) +import Chainweb.Time --- @@ -138,6 +139,8 @@ data CoordinationConfig = CoordinationConfig -- ^ the maximum number of concurrent update streams that is supported , _coordinationUpdateStreamTimeout :: !Seconds -- ^ the duration that an update stream is kept open in seconds + , _coordinationPayloadRefreshDelay :: !(TimeSpan Micros) + -- ^ the duration between payload refreshes in microseconds } deriving stock (Eq, Show, Generic) coordinationEnabled :: Lens' CoordinationConfig Bool @@ -157,6 +160,10 @@ coordinationUpdateStreamTimeout :: Lens' CoordinationConfig Seconds coordinationUpdateStreamTimeout = lens _coordinationUpdateStreamTimeout (\m c -> m { _coordinationUpdateStreamTimeout = c }) +coordinationPayloadRefreshDelay :: Lens' CoordinationConfig (TimeSpan Micros) +coordinationPayloadRefreshDelay = + lens _coordinationPayloadRefreshDelay (\m c -> m { _coordinationPayloadRefreshDelay = c }) + instance ToJSON CoordinationConfig where toJSON o = object [ "enabled" .= _coordinationEnabled o @@ -164,6 +171,7 @@ instance ToJSON CoordinationConfig where , "miners" .= (J.toJsonViaEncode <$> S.toList (_coordinationMiners o)) , "updateStreamLimit" .= _coordinationUpdateStreamLimit o , "updateStreamTimeout" .= _coordinationUpdateStreamTimeout o + , "payloadRefreshDelay" .= _coordinationPayloadRefreshDelay o ] instance FromJSON (CoordinationConfig -> CoordinationConfig) where @@ -173,6 +181,7 @@ instance FromJSON (CoordinationConfig -> CoordinationConfig) where <*< coordinationMiners .fromLeftMonoidalUpdate %.: "miners" % o <*< coordinationUpdateStreamLimit ..: "updateStreamLimit" % o <*< coordinationUpdateStreamTimeout ..: "updateStreamTimeout" % o + <*< coordinationPayloadRefreshDelay ..: "payloadRefreshDelay" % o defaultCoordination :: CoordinationConfig defaultCoordination = CoordinationConfig @@ -181,6 +190,7 @@ defaultCoordination = CoordinationConfig , _coordinationReqLimit = 1200 , _coordinationUpdateStreamLimit = 2000 , _coordinationUpdateStreamTimeout = 240 + , _coordinationPayloadRefreshDelay = TimeSpan (Micros 15_000_000) } pCoordinationConfig :: MParser CoordinationConfig @@ -198,6 +208,9 @@ pCoordinationConfig = id <*< coordinationUpdateStreamTimeout .:: jsonOption % long "mining-update-stream-timeout" <> help "duration that an update stream is kept open in seconds" + <*< coordinationPayloadRefreshDelay .:: jsonOption + % long "mining-payload-refresh-delay" + <> help "frequency that the mining payload is refreshed" pMiner :: String -> Parser Miner pMiner prefix = pkToMiner <$> pPk diff --git a/src/Chainweb/Miner/Coordinator.hs b/src/Chainweb/Miner/Coordinator.hs index b4d743c4f9..d2a82a0dbf 100644 --- a/src/Chainweb/Miner/Coordinator.hs +++ b/src/Chainweb/Miner/Coordinator.hs @@ -215,31 +215,30 @@ newWork logFun choice eminer@(Miner mid _) hdb pact tpw c = do Nothing -> do logFun @T.Text Debug $ "newWork: chain " <> sshow cid <> " not mineable" newWork logFun Anything eminer hdb pact tpw c - Just (T2 (Just newBlock) extension) - | ParentHeader primedParent <- newBlockParentHeader newBlock -> - if _blockHash primedParent == - _blockHash (_parentHeader (_cutExtensionParent extension)) - then do - let payload = newBlockToPayloadWithOutputs newBlock - let !phash = _payloadWithOutputsPayloadHash payload - !wh <- newWorkHeader hdb extension phash - pure $ Just $ T2 wh payload - else do - -- The cut is too old or the primed work is outdated. Probably - -- the former because it the mining coordination background job - -- is updating the primed work cache regularly. We could try - -- another chain, but it's safer to just return 'Nothing' here - -- and retry with an updated cut. - -- - let !extensionParent = _parentHeader (_cutExtensionParent extension) - logFun @T.Text Info - $ "newWork: chain " <> sshow cid <> " not mineable because of parent header mismatch" - <> ". Primed parent hash: " <> toText (_blockHash primedParent) - <> ". Primed parent height: " <> sshow (_blockHeight primedParent) - <> ". Extension parent: " <> toText (_blockHash extensionParent) - <> ". Extension height: " <> sshow (_blockHeight extensionParent) - - return Nothing + Just (T2 (Just newBlock) extension) -> do + let ParentHeader primedParent = newBlockParentHeader newBlock + if _blockHash primedParent == _blockHash (_parentHeader (_cutExtensionParent extension)) + then do + let payload = newBlockToPayloadWithOutputs newBlock + let !phash = _payloadWithOutputsPayloadHash payload + !wh <- newWorkHeader hdb extension phash + pure $ Just $ T2 wh payload + else do + -- The cut is too old or the primed work is outdated. Probably + -- the former because it the mining coordination background job + -- is updating the primed work cache regularly. We could try + -- another chain, but it's safer to just return 'Nothing' here + -- and retry with an updated cut. + -- + let !extensionParent = _parentHeader (_cutExtensionParent extension) + logFun @T.Text Info + $ "newWork: chain " <> sshow cid <> " not mineable because of parent header mismatch" + <> ". Primed parent hash: " <> toText (_blockHash primedParent) + <> ". Primed parent height: " <> sshow (_blockHeight primedParent) + <> ". Extension parent: " <> toText (_blockHash extensionParent) + <> ". Extension height: " <> sshow (_blockHeight extensionParent) + + return Nothing -- | Accepts a "solved" `BlockHeader` from some external source (e.g. a remote -- mining client), attempts to reassociate it with the current best `Cut`, and diff --git a/src/Chainweb/WebPactExecutionService.hs b/src/Chainweb/WebPactExecutionService.hs index ad5e2ff338..82bfc440ff 100644 --- a/src/Chainweb/WebPactExecutionService.hs +++ b/src/Chainweb/WebPactExecutionService.hs @@ -81,7 +81,7 @@ data PactExecutionService = PactExecutionService , _pactContinueBlock :: !( ChainId -> BlockInProgress -> - IO (Historical NewBlock) + IO (Historical BlockInProgress) ) -- ^ Request a new block to be formed using mempool , _pactLocal :: !( @@ -151,7 +151,7 @@ _webPactContinueBlock :: WebPactExecutionService -> ChainId -> BlockInProgress - -> IO (Historical NewBlock) + -> IO (Historical BlockInProgress) _webPactContinueBlock = _pactContinueBlock . _webPactExecutionService {-# INLINE _webPactContinueBlock #-} @@ -201,7 +201,7 @@ mkPactExecutionService q = PactExecutionService , _pactNewBlock = \_ m fill -> do NewBlockInProgress <$> newBlock m fill q , _pactContinueBlock = \_ bip -> do - fmap NewBlockInProgress <$> continueBlock bip q + continueBlock bip q , _pactLocal = \pf sv rd ct -> local pf sv rd ct q , _pactLookup = \_ cd txs -> diff --git a/test/Chainweb/Test/Orphans/Internal.hs b/test/Chainweb/Test/Orphans/Internal.hs index d81683723e..65d312ae6d 100644 --- a/test/Chainweb/Test/Orphans/Internal.hs +++ b/test/Chainweb/Test/Orphans/Internal.hs @@ -824,7 +824,12 @@ deriving newtype instance Arbitrary MinerCount instance Arbitrary CoordinationConfig where arbitrary = CoordinationConfig - <$> arbitrary <*> arbitrary <*> arbitrary <*> arbitrary <*> arbitrary + <$> arbitrary + <*> arbitrary + <*> arbitrary + <*> arbitrary + <*> arbitrary + <*> arbitrary instance Arbitrary NodeMiningConfig where arbitrary = NodeMiningConfig diff --git a/test/Chainweb/Test/Pact/Utils.hs b/test/Chainweb/Test/Pact/Utils.hs index 4628641cbc..6e948948e9 100644 --- a/test/Chainweb/Test/Pact/Utils.hs +++ b/test/Chainweb/Test/Pact/Utils.hs @@ -743,7 +743,7 @@ withWebPactExecutionService logger v pactConfig bdb mempoolAccess gasmodel act = { _pactNewBlock = \_ m fill -> evalPactServiceM_ ctx $ NewBlockInProgress <$> execNewBlock mempoolAccess m fill , _pactContinueBlock = \_ bip -> - evalPactServiceM_ ctx $ fmap NewBlockInProgress <$> execContinueBlock mempoolAccess bip + evalPactServiceM_ ctx $ execContinueBlock mempoolAccess bip , _pactValidateBlock = \h d -> evalPactServiceM_ ctx $ fst <$> execValidateBlock mempoolAccess h d , _pactLocal = \pf sv rd cmd ->