diff --git a/ouroboros-consensus/src/Ouroboros/Consensus/BlockchainTime.hs b/ouroboros-consensus/src/Ouroboros/Consensus/BlockchainTime.hs index 385abee641f..34452e9440c 100644 --- a/ouroboros-consensus/src/Ouroboros/Consensus/BlockchainTime.hs +++ b/ouroboros-consensus/src/Ouroboros/Consensus/BlockchainTime.hs @@ -17,7 +17,6 @@ module Ouroboros.Consensus.BlockchainTime ( , NumSlots(..) , TestBlockchainTime(..) , newTestBlockchainTime - , newTestGuardedBlockchainTime -- * Real blockchain time , realBlockchainTime -- * Time to slots and back again @@ -160,7 +159,7 @@ data TestBlockchainTime m = TestBlockchainTime -- ^ Blocks until the end of the final requested slot. } --- | Construct new blockchain time that ticks at the specified slot duration +-- | Each slot advances once the given callback returns -- -- NOTE: This is just one way to construct time. We can of course also connect -- this to the real time (if we are in IO), or indeed to a manual tick @@ -173,25 +172,14 @@ data TestBlockchainTime m = TestBlockchainTime -- first slot @SlotNo 0@, i.e. during 'Initializing'. This is likely only -- appropriate for initialization code etc. In contrast, the argument to -- 'onSlotChange' is blocked at least until @SlotNo 0@ begins. +-- newTestBlockchainTime :: forall m. (IOLike m, HasCallStack) => ResourceRegistry m -> NumSlots -- ^ Number of slots - -> DiffTime -- ^ Slot duration - -> m (TestBlockchainTime m) -newTestBlockchainTime registry numSlots slotLen = - newTestGuardedBlockchainTime registry numSlots slotLen (\_ -> pure ()) - --- | Like 'newTestBlockchainTime', but also allows other computations to delay --- the end of each slot -newTestGuardedBlockchainTime - :: forall m. (IOLike m, HasCallStack) - => ResourceRegistry m - -> NumSlots -- ^ Number of slots - -> DiffTime -- ^ Slot duration -> (SlotNo -> m ()) -- ^ Blocks until slot is finished -> m (TestBlockchainTime m) -newTestGuardedBlockchainTime registry (NumSlots numSlots) slotLen waitOn = do +newTestBlockchainTime registry (NumSlots numSlots) waitOn = do slotVar <- newTVarM initVal doneVar <- newEmptyMVar () @@ -227,7 +215,6 @@ newTestGuardedBlockchainTime registry (NumSlots numSlots) slotLen waitOn = do Running s -> succ s writeTVar slotVar (Running s') pure s' - threadDelay slotLen waitOn s' -- signal the end of the final slot putMVar doneVar () diff --git a/ouroboros-consensus/test-consensus/Test/Consensus/ChainSyncClient.hs b/ouroboros-consensus/test-consensus/Test/Consensus/ChainSyncClient.hs index 0582fe23fdb..2799f6c6631 100644 --- a/ouroboros-consensus/test-consensus/Test/Consensus/ChainSyncClient.hs +++ b/ouroboros-consensus/test-consensus/Test/Consensus/ChainSyncClient.hs @@ -254,7 +254,9 @@ runChainSync runChainSync securityParam maxClockSkew (ClientUpdates clientUpdates) (ServerUpdates serverUpdates) startSyncingAt = withRegistry $ \registry -> do - testBtime <- newTestBlockchainTime registry numSlots slotDuration + testBtime <- do + let waitOn _s = threadDelay 100000 -- io-sim "seconds" + newTestBlockchainTime registry numSlots waitOn let btime = testBlockchainTime testBtime -- Set up the client @@ -388,9 +390,6 @@ runChainSync securityParam maxClockSkew (ClientUpdates clientUpdates) where k = maxRollbacks securityParam - slotDuration :: DiffTime - slotDuration = 100000 - nodeCfg :: CoreNodeId -> NodeConfig (Bft BftMockCrypto) nodeCfg coreNodeId = BftNodeConfig { bftParams = BftParams diff --git a/ouroboros-consensus/test-consensus/Test/Dynamic/General.hs b/ouroboros-consensus/test-consensus/Test/Dynamic/General.hs index bbec0e356c2..05d57aa3404 100644 --- a/ouroboros-consensus/test-consensus/Test/Dynamic/General.hs +++ b/ouroboros-consensus/test-consensus/Test/Dynamic/General.hs @@ -173,25 +173,26 @@ runTestNetwork pInfo seed = runSimOrThrow $ do registry <- unsafeNewRegistry - let slotLen = 100000 :: DiffTime -- io-sim "seconds" - -- the latest slot that is ready to start latestReadySlot <- uncheckedNewTVarM (SlotNo 0) -- a slot cannot end before a later slot is ready to start let waitOn s = atomically $ do x <- readTVar latestReadySlot check (s < x) - testBtime <- newTestGuardedBlockchainTime registry numSlots slotLen waitOn + testBtime <- newTestBlockchainTime registry numSlots waitOn runNodeNetwork NodeNetworkArgs - { nnaLatencySeed = latencySeed + { nnaMaxLatencies = MaxLatencies + { maxSendLatency = 10 -- io-sim "seconds" + , maxVar1Latency = 1000 -- io-sim "seconds" + } + , nnaLatencySeed = latencySeed , nnaLatestReadySlot = latestReadySlot , nnaNodeJoinPlan = nodeJoinPlan , nnaNodeTopology = nodeTopology , nnaNumCoreNodes = numCoreNodes , nnaProtocol = pInfo - , nnaQuiesence = 5000 -- io-sim "seconds" - , nnaSlotLen = slotLen + , nnaQuiesence = 50000 -- io-sim "seconds" , nnaTestBtime = testBtime , nnaTxSeed = seedToChaCha seed , nnaRegistry = registry diff --git a/ouroboros-consensus/test-consensus/Test/Dynamic/Network.hs b/ouroboros-consensus/test-consensus/Test/Dynamic/Network.hs index 13aadf025db..0567f4c3725 100644 --- a/ouroboros-consensus/test-consensus/Test/Dynamic/Network.hs +++ b/ouroboros-consensus/test-consensus/Test/Dynamic/Network.hs @@ -20,6 +20,7 @@ -- | Setup network module Test.Dynamic.Network ( runNodeNetwork + , MaxLatencies (..) , NodeNetworkArgs (..) , TracingConstraints , LatencyInjection (..) @@ -110,6 +111,18 @@ import qualified Test.Util.FS.Sim.MockFS as Mock import Test.Util.FS.Sim.STM (simHasFS) import Test.Util.Tracer +-- | The maximum possible latencies +-- +data MaxLatencies = MaxLatencies + { maxSendLatency :: !DiffTime + -- ^ Similar to the Serialisation latency of GSV, though see the NOTE on + -- 'newLivePipe' regarding back pressure + -- + , maxVar1Latency :: !DiffTime + -- ^ Similar to the Variable latency of GSV, though the NOTE on + -- 'newLivePipe' regarding pipelining + } + data NodeNetworkArgs m blk = NodeNetworkArgs { nnaLatencySeed :: !(LatencyInjection SMGen) , nnaLatestReadySlot :: !(StrictTVar m SlotNo) @@ -120,11 +133,9 @@ data NodeNetworkArgs m blk = NodeNetworkArgs , nnaNumCoreNodes :: !NumCoreNodes , nnaProtocol :: !(CoreNodeId -> ProtocolInfo blk) , nnaQuiesence :: !DiffTime - -- ^ A slot cannot end until there have been no sent-but-not-unreceived - -- mini protocol messages for this long - , nnaSlotLen :: !DiffTime - -- ^ A slot cannot end until at least this duration has passed since it - -- started + -- ^ A slot ends when there have been no sent-but-not-yet-received mini + -- protocol messages for at least this long + , nnaMaxLatencies :: !MaxLatencies , nnaTestBtime :: !(TestBlockchainTime m) , nnaTxSeed :: !ChaChaDRG , nnaRegistry :: !(ResourceRegistry m) @@ -152,7 +163,7 @@ runNodeNetwork NodeNetworkArgs , nnaNumCoreNodes = numCoreNodes , nnaProtocol = pInfo , nnaQuiesence = quiescenceThreshold - , nnaSlotLen = slotLen + , nnaMaxLatencies , nnaTestBtime = testBtime , nnaTxSeed = initRNG , nnaRegistry = registry @@ -253,7 +264,7 @@ runNodeNetwork NodeNetworkArgs (,) <$> lu node1 <*> lu node2 -- spawn threads for both directed edges - let de = directedEdge slotLen tr btime mbSMG livePipesVar + let de = directedEdge nnaMaxLatencies tr btime mbSMG livePipesVar void $ withAsyncsWaitAny $ de endpoint1 endpoint2 NE.:| [de endpoint2 endpoint1] @@ -310,7 +321,7 @@ runNodeNetwork NodeNetworkArgs , cdbValidation = ImmDB.ValidateAllEpochs , cdbBlocksPerFile = 4 , cdbParamsLgrDB = LgrDB.ledgerDbDefaultParams (protocolSecurityParam cfg) - , cdbDiskPolicy = LgrDB.defaultDiskPolicy (protocolSecurityParam cfg) slotLen + , cdbDiskPolicy = LgrDB.defaultDiskPolicy (protocolSecurityParam cfg) generousApproxSlotLen -- Integration , cdbNodeConfig = cfg , cdbEpochInfo = epochInfo @@ -331,6 +342,12 @@ runNodeNetwork NodeNetworkArgs , cdbRegistry = registry , cdbGcDelay = 0 } + where + -- a rough estimate of the average slot length, more likely an + -- overestimate than an underestimate + generousApproxSlotLen = 100 * (sendL + recvL) + where + MaxLatencies sendL recvL = nnaMaxLatencies createNode :: HasCallStack @@ -443,7 +460,7 @@ runNodeNetwork NodeNetworkArgs -- 'MiniProtocolExpectedException'. directedEdge :: forall m blk. (IOLike m, SupportedBlock blk) - => DiffTime + => MaxLatencies -> Tracer m (SlotNo, MiniProtocolState, MiniProtocolExpectedException blk) -> BlockchainTime m -> LatencyInjection (StrictTVar m SMGen) @@ -451,12 +468,12 @@ directedEdge :: -> (CoreNodeId, LimitedApp m NodeId blk) -> (CoreNodeId, LimitedApp m NodeId blk) -> m () -directedEdge slotLen tr btime mbSMG livePipesVar nodeapp1 nodeapp2 = +directedEdge maxLatencies tr btime mbSMG livePipesVar nodeapp1 nodeapp2 = loopOnMPEE where loopOnMPEE = do (pids, edge) <- - directedEdgeInner slotLen mbSMG livePipesVar nodeapp1 nodeapp2 + directedEdgeInner maxLatencies mbSMG livePipesVar nodeapp1 nodeapp2 edge `catch` hExpected pids `catch` hUnexpected @@ -488,7 +505,7 @@ directedEdge slotLen tr btime mbSMG livePipesVar nodeapp1 nodeapp2 = -- See 'directedEdge'. directedEdgeInner :: forall m blk. (IOLike m, SupportedBlock blk) - => DiffTime + => MaxLatencies -> LatencyInjection (StrictTVar m SMGen) -> LivePipesVar m -> (CoreNodeId, LimitedApp m NodeId blk) @@ -496,7 +513,7 @@ directedEdgeInner :: -> (CoreNodeId, LimitedApp m NodeId blk) -- ^ server threads on this node -> m (Set PipeId, m ()) -directedEdgeInner slotLen mbSMG livePipesVar (node1, LimitedApp app1) (node2, LimitedApp app2) = do +directedEdgeInner maxLatencies mbSMG livePipesVar (node1, LimitedApp app1) (node2, LimitedApp app2) = do mps <- sequence $ ( miniProtocol (wrapMPEE MPEEChainSyncClient naChainSyncClient) @@ -531,8 +548,8 @@ directedEdgeInner slotLen mbSMG livePipesVar (node1, LimitedApp app1) (node2, Li -- ^ server action to run on node2 -> m ((PipeId, PipeId), (m (), m ())) miniProtocol client server = do - (pid1, Pipe pipe1) <- newLivePipe slotLen mbSMG livePipesVar - (pid2, Pipe pipe2) <- newLivePipe slotLen mbSMG livePipesVar + (pid1, Pipe pipe1) <- newLivePipe maxLatencies mbSMG livePipesVar + (pid2, Pipe pipe2) <- newLivePipe maxLatencies mbSMG livePipesVar let chan1 = Channel{send = send pipe1, recv = recv pipe2} chan2 = Channel{send = send pipe2, recv = recv pipe1} pure $ (,) @@ -849,15 +866,31 @@ blockUntilQuiescent livePipesVar dur = get >>= go -- | Create a pipe backed by a 'TQueue', add it to the live pipes, and -- return the channel for writing to it and reading from it. -- +-- NOTE This mock \"physical layer\" carries one message at a time (hence the +-- name 'maxVar1Latency'). Though it uses queues so that a (pipelined) sender +-- is never blocked, there will be no _latency hiding_. In other words, the +-- stream of Variable latency samples have _already_ taken into account any +-- possible latency hiding. +-- +-- NOTE Similarly, it is assumed that the Serialisation latency sufficiently +-- subsumes _back pressure_, since otherwise sending never blocks the sender. +-- +-- The motivation for including latencies in this mock network is to explore +-- different permutations/ways to interleave concurrent events. While realistic +-- latencies would exhibit (conditional) (temporal) correlations due to +-- pipelining, back pressure, and so on, we assume that samplers crafted to +-- accurately synthesize those correlations would explore fewer scheduling +-- permutations. +-- newLivePipe :: ( IOLike m , HasCallStack ) - => DiffTime + => MaxLatencies -> LatencyInjection (StrictTVar m SMGen) -> LivePipesVar m -> m (PipeId, Pipe m a) -newLivePipe slotLen mbSMG tvar = do +newLivePipe MaxLatencies{maxVar1Latency, maxSendLatency} mbSMG tvar = do lpCount <- uncheckedNewTVarM (EmptiedCount 0) lpInFlight <- uncheckedNewTVarM 0 pid <- atomically $ do @@ -870,17 +903,13 @@ newLivePipe slotLen mbSMG tvar = do -- create a thread that reliably and order-preservingly transports messages -- down the pipe - inBuf <- atomically Q.newTQueue - outBuf <- atomically Q.newTQueue - mbGeoD <- mapM genDelay mbSMG - mbTransportSMG <- mapM split mbSMG + inBuf <- atomically Q.newTQueue + outBuf <- atomically Q.newTQueue + mbVar1SMG <- mapM split mbSMG void $ fork $ forever $ do x <- atomically $ Q.readTQueue inBuf - -- Geographical latency - threadDelayLI mbGeoD - -- Variable latency - mapM genDelay mbTransportSMG >>= threadDelayLI + mapM (genDelay maxVar1Latency) mbVar1SMG >>= threadDelayLI atomically $ Q.writeTQueue outBuf x @@ -889,8 +918,7 @@ newLivePipe slotLen mbSMG tvar = do { send = \ !x -> do atomically $ modifyTVar lpInFlight succ - -- Serialisation latency and back pressure - mapM genDelay mbSendSMG >>= threadDelayLI + mapM (genDelay maxSendLatency) mbSendSMG >>= threadDelayLI atomically $ Q.writeTQueue inBuf x , recv = atomically $ do @@ -916,12 +944,12 @@ newLivePipe slotLen mbSMG tvar = do InjectTrivialLatencies -> threadDelay 0 InjectLatencies d -> threadDelay d - genDelay smgVar = atomically $ do + genDelay mx smgVar = atomically $ do (i, smg') <- SM.nextInt <$> readTVar smgVar writeTVar smgVar smg' let denom = picosecondsToDiffTime 1000 let frac = toEnum (abs i `mod` 1000) / denom - pure $ (slotLen / 10) * frac + pure $ mx * frac forgetPipeSTM :: IOLike m => LivePipesVar m -> PipeId -> STM m () forgetPipeSTM tvar pid = modifyTVar tvar $ \lp@LivePipes{livePipes} ->