Skip to content


test-consensus: add infra-slot delays
Browse files Browse the repository at this point in the history
  • Loading branch information
nfrisby committed Oct 14, 2019
1 parent c36035f commit ae84f1a
Show file tree
Hide file tree
Showing 7 changed files with 111 additions and 47 deletions.
1 change: 1 addition & 0 deletions nix/.stack.nix/ouroboros-consensus.nix

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions ouroboros-consensus/ouroboros-consensus.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,7 @@ test-suite test-consensus
Expand Down
24 changes: 18 additions & 6 deletions ouroboros-consensus/test-consensus/Test/Dynamic/General.hs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import Data.Set (Set)
import qualified Data.Set as Set
import Data.Word (Word64)
import GHC.Stack (HasCallStack)
import qualified System.Random.SplitMix as SM
import Test.QuickCheck

import Control.Monad.IOSim (runSimOrThrow)
Expand Down Expand Up @@ -63,41 +64,48 @@ data TestConfig = TestConfig
, numSlots :: !NumSlots
, nodeJoinPlan :: !NodeJoinPlan
, nodeTopology :: !NodeTopology
, latencySeed :: !(Maybe SM.SMGen)
deriving (Show)

genTestConfig :: NumCoreNodes -> NumSlots -> Gen TestConfig
genTestConfig numCoreNodes numSlots = do
nodeJoinPlan <- genNodeJoinPlan numCoreNodes numSlots
nodeTopology <- genNodeTopology numCoreNodes
pure TestConfig{numCoreNodes, numSlots, nodeJoinPlan, nodeTopology}
latencySeed <- (Just . SM.mkSMGen) <$> arbitrary
pure TestConfig
{ numCoreNodes, numSlots, nodeJoinPlan, nodeTopology, latencySeed }

-- | Shrink without changing the number of nodes or slots
shrinkTestConfig :: TestConfig -> [TestConfig]
shrinkTestConfig testConfig@TestConfig{nodeJoinPlan, nodeTopology} =
testConfig@TestConfig{nodeJoinPlan, nodeTopology, latencySeed} =
tail $ -- drop the identity output
[ testConfig{nodeJoinPlan = p', nodeTopology = top'}
[ testConfig{nodeJoinPlan = p', nodeTopology = top', latencySeed = seed'}
| p' <- nodeJoinPlan : shrinkNodeJoinPlan nodeJoinPlan
, top' <- nodeTopology : shrinkNodeTopology nodeTopology
, seed' <- latencySeed : Nothing : []

-- | Shrink, including the number of nodes and slots
shrinkTestConfigFreely :: TestConfig -> [TestConfig]
TestConfig{numCoreNodes, numSlots, nodeJoinPlan, nodeTopology} =
TestConfig{numCoreNodes, numSlots, nodeJoinPlan, nodeTopology, latencySeed} =
tail $ -- drop the identity result
[ TestConfig
{ numCoreNodes = n'
, numSlots = t'
, nodeJoinPlan = p'
, nodeTopology = top'
, latencySeed = seed'
| n' <- idAnd shrink numCoreNodes
, t' <- idAnd shrink numSlots
, let adjustedP = adjustedNodeJoinPlan n' t'
, let adjustedTop = adjustedNodeTopology n'
, p' <- idAnd shrinkNodeJoinPlan adjustedP
, top' <- idAnd shrinkNodeTopology adjustedTop
, seed' <- latencySeed : Nothing : []
idAnd :: forall a. (a -> [a]) -> a -> [a]
Expand Down Expand Up @@ -143,7 +151,7 @@ runTestNetwork ::
-> Seed
-> TestOutput blk
runTestNetwork pInfo
TestConfig{numCoreNodes, numSlots, nodeJoinPlan, nodeTopology}
TestConfig{numCoreNodes, numSlots, nodeJoinPlan, nodeTopology, latencySeed}
seed = runSimOrThrow $ do
registry <- unsafeNewRegistry

Expand All @@ -162,11 +170,15 @@ runTestNetwork pInfo
(seedToChaCha seed)
slotLen = 100000 :: DiffTime
quiescenceThreshold = 50000 :: DiffTime

-- a slot cannot advance until all of the ChainSync and BlockFetch pipes
-- have been empty for at least this duration
quiescenceThreshold = 5000 :: DiffTime

Test properties
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ module Test.Dynamic.LeaderSchedule (
import Control.Monad (replicateM)
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import qualified System.Random.SplitMix as SM
import Test.QuickCheck

import Test.Tasty
Expand Down Expand Up @@ -51,13 +52,23 @@ tests = testGroup "Dynamic chain generation"
(genNodeTopology numCoreNodes)
shrinkNodeTopology $
\nodeTopology ->
(fmap SM.mkSMGen <$> arbitrary)
(\_ -> [Nothing]) $
\latencySeed ->
(genLeaderSchedule k numSlots numCoreNodes nodeJoinPlan)
(shrinkLeaderSchedule numSlots) $
\schedule ->
TestConfig{numCoreNodes, numSlots, nodeJoinPlan, nodeTopology}
{ numCoreNodes
, numSlots
, nodeJoinPlan
, nodeTopology
, latencySeed
schedule seed

prop_simple_leader_schedule_convergence :: PraosParams
Expand Down
113 changes: 73 additions & 40 deletions ouroboros-consensus/test-consensus/Test/Dynamic/Network.hs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE FlexibleContexts #-}
Expand Down Expand Up @@ -40,12 +39,15 @@ import qualified Data.Map.Strict as Map
import Data.Proxy (Proxy (..))
import Data.Set (Set)
import qualified Data.Set as Set
import Data.Time.Clock (picosecondsToDiffTime)
import qualified Data.Typeable as Typeable
import Data.Word (Word64)
import GHC.Stack
import System.Random.SplitMix (SMGen)
import qualified System.Random.SplitMix as SM

import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadSTM.Strict
import Control.Monad.Class.MonadThrow

import Network.TypedProtocol.Channel
import Network.TypedProtocol.Codec (AnyMessage (..))
Expand Down Expand Up @@ -121,13 +123,14 @@ runNodeNetwork :: forall m blk.
-> NodeJoinPlan
-> NodeTopology
-> (CoreNodeId -> ProtocolInfo blk)
-> Maybe SMGen
-> ChaChaDRG
-> DiffTime
-> m (TestOutput blk)
runNodeNetwork registry
quiescenceThreshold latestDoneSlot testBtime
numCoreNodes nodeJoinPlan nodeTopology
pInfo initRNG slotLen = do
pInfo mbInitSMG initRNG slotLen = do
-- This function is organized around the notion of a network of nodes as a
-- simple graph with no loops. The graph topology is determined by
-- @nodeTopology@.
Expand Down Expand Up @@ -159,10 +162,11 @@ runNodeNetwork registry
uncheckedNewEmptyMVar (error "no App available yet")

-- spawn threads for each undirected edge
mbSMG <- mapM uncheckedNewTVarM mbInitSMG
let edges = edgesNodeTopology nodeTopology
forM_ edges $ \edge -> do
void $ forkLinkedThread registry $ do
undirectedEdge nullTracer livePipesVar nodeVars edge
undirectedEdge nullTracer mbSMG livePipesVar nodeVars edge

-- create nodes
let nodesByJoinSlot =
Expand Down Expand Up @@ -209,11 +213,12 @@ runNodeNetwork registry
undirectedEdge ::
=> Tracer m (SlotNo, MiniProtocolState, MiniProtocolExpectedException blk)
-> Maybe (StrictTVar m SMGen)
-> LivePipesVar m
-> Map CoreNodeId (StrictMVar m (LimitedApp m NodeId blk))
-> (CoreNodeId, CoreNodeId)
-> m ()
undirectedEdge tr livePipesVar nodeVars (node1, node2) = do
undirectedEdge tr mbSMG livePipesVar nodeVars (node1, node2) = do
-- block until both endpoints have joined the network
(endpoint1, endpoint2) <- do
let lu node = case Map.lookup node nodeVars of
Expand All @@ -222,11 +227,9 @@ runNodeNetwork registry
(,) <$> lu node1 <*> lu node2

-- spawn threads for both directed edges
let de = directedEdge slotLen tr btime mbSMG livePipesVar
void $ withAsyncsWaitAny $
directedEdge tr btime livePipesVar endpoint1 endpoint2
[ directedEdge tr btime livePipesVar endpoint2 endpoint1
de endpoint1 endpoint2 NE.:| [de endpoint2 endpoint1]

-- | Produce transactions every time the slot changes and submit them to
-- the mempool.
Expand Down Expand Up @@ -414,17 +417,20 @@ runNodeNetwork registry
-- 'MiniProtocolExpectedException'.
directedEdge ::
forall m blk. (IOLike m, SupportedBlock blk)
=> Tracer m (SlotNo, MiniProtocolState, MiniProtocolExpectedException blk)
=> DiffTime
-> Tracer m (SlotNo, MiniProtocolState, MiniProtocolExpectedException blk)
-> BlockchainTime m
-> Maybe (StrictTVar m SMGen)
-> LivePipesVar m
-> (CoreNodeId, LimitedApp m NodeId blk)
-> (CoreNodeId, LimitedApp m NodeId blk)
-> m ()
directedEdge tr btime livePipesVar nodeapp1 nodeapp2 =
directedEdge slotLen tr btime mbSMG livePipesVar nodeapp1 nodeapp2 =
loopOnMPEE = do
(pids, edge) <- directedEdgeInner livePipesVar nodeapp1 nodeapp2
(pids, edge) <-
directedEdgeInner slotLen mbSMG livePipesVar nodeapp1 nodeapp2
`catch` hExpected pids
`catch` hUnexpected
Expand Down Expand Up @@ -455,22 +461,24 @@ directedEdge tr btime livePipesVar nodeapp1 nodeapp2 =
-- See 'directedEdge'.
directedEdgeInner ::
forall m blk. (IOLike m, SupportedBlock blk)
=> LivePipesVar m
=> DiffTime
-> Maybe (StrictTVar m SMGen)
-> LivePipesVar m
-> (CoreNodeId, LimitedApp m NodeId blk)
-- ^ client threads on this node
-> (CoreNodeId, LimitedApp m NodeId blk)
-- ^ server threads on this node
-> m (Set PipeId, m ())
directedEdgeInner livePipesVar (node1, LimitedApp app1) (node2, LimitedApp app2) = do
directedEdgeInner slotLen mbSMG livePipesVar (node1, LimitedApp app1) (node2, LimitedApp app2) = do
mps <- sequence $
( miniProtocol
( miniProtocol True
(wrapMPEE MPEEChainSyncClient naChainSyncClient)
) NE.:|
[ miniProtocol
[ miniProtocol True
(wrapMPEE MPEEBlockFetchClient naBlockFetchClient)
(wrapMPEE MPEEBlockFetchServer naBlockFetchServer)
, miniProtocol
, miniProtocol False -- this mini protocol never stops churning
(wrapMPEE MPEETxSubmissionClient naTxSubmissionClient)
(wrapMPEE MPEETxSubmissionServer naTxSubmissionServer)
Expand All @@ -482,7 +490,8 @@ directedEdgeInner livePipesVar (node1, LimitedApp app1) (node2, LimitedApp app2)
flattenPairs = uncurry (<>) . NE.unzip

miniProtocol ::
(forall unused1 unused2.
-> (forall unused1 unused2.
LimitedApp' m NodeId blk unused1 unused2
-> NodeId
-> Channel m msg
Expand All @@ -495,9 +504,9 @@ directedEdgeInner livePipesVar (node1, LimitedApp app1) (node2, LimitedApp app2)
-> m ())
-- ^ server action to run on node2
-> m ((PipeId, PipeId), (m (), m ()))
miniProtocol client server = do
(pid1, Pipe pipe1) <- newLivePipe livePipesVar
(pid2, Pipe pipe2) <- newLivePipe livePipesVar
miniProtocol live client server = do
(pid1, Pipe pipe1) <- newLivePipe slotLen mbSMG livePipesVar live
(pid2, Pipe pipe2) <- newLivePipe slotLen mbSMG livePipesVar live
let chan1 = Channel{send = send pipe1, recv = recv pipe2}
chan2 = Channel{send = send pipe2, recv = recv pipe1}
pure $ (,)
Expand Down Expand Up @@ -774,10 +783,15 @@ data LivePipes a = LivePipes
, livePipes :: !(Map PipeId a)

data LivePipe m = LivePipe
{ lpCount :: !(StrictTVar m EmptiedCount)
, lpEmpty :: !(StrictTVar m Bool)

-- | A variable mapping each live pipe to its own variable, which contains
-- @Just ec@ when the pipe is empty and @Nothing@ otherwise
type LivePipesVar m = StrictTVar m (LivePipes (StrictTVar m (Maybe EmptiedCount)))
type LivePipesVar m = StrictTVar m (LivePipes (LivePipe m))

-- | Block until all live pipes have been empty for at least the given duration
Expand All @@ -786,44 +800,63 @@ blockUntilQuiescent livePipesVar dur = get >>= go
get = atomically $ do
LivePipes{livePipes} <- readTVar livePipesVar
forM livePipes $ \ecVar -> readTVar ecVar >>= \case
Nothing -> retry
Just ec -> pure ec
forM livePipes $ \LivePipe{lpCount, lpEmpty} -> do
readTVar lpEmpty >>= check
readTVar lpCount

go sig1 = do
threadDelay dur
sig2 <- get
if sig1 == sig2 then pure () else go sig2
threadDelay dur
sig2 <- get
if sig1 == sig2 then pure () else go sig2

-- | Create a pipe backed by a 'LazyTMVar', add it to the live pipes, and
-- return the channel for writing to it and reading from it.
newLivePipe :: IOLike m => LivePipesVar m -> m (PipeId, Pipe m a)
newLivePipe tvar = do
newLivePipe ::
IOLike m
=> DiffTime
-> Maybe (StrictTVar m SMGen)
-> LivePipesVar m
-> Bool
-> m (PipeId, Pipe m a)
newLivePipe slotLen mbSMG tvar live = do
let ec0 = EmptiedCount 0

ecount' <- uncheckedNewTVarM (Just ec0)
lpCount <- uncheckedNewTVarM ec0
lpEmpty <- uncheckedNewTVarM True
pid <- atomically $ do
LivePipes{nextPipeId, livePipes} <- readTVar tvar
writeTVar tvar LivePipes
{ nextPipeId = succ nextPipeId
, livePipes = Map.insert nextPipeId ecount' livePipes
, livePipes =
if not live then livePipes else
Map.insert nextPipeId LivePipe{lpCount, lpEmpty} livePipes
pure nextPipeId

ecount <- uncheckedNewTVarM ec0
mbSMG' <- forM mbSMG $ \smgVar -> atomically $ do
(smg1, smg') <- SM.splitSMGen <$> readTVar smgVar
writeTVar smgVar smg'
newTVar smg1

buffer <- atomically $ newEmptyTMVar
let pipe = Pipe Channel
{ send = \x -> atomically $ do
writeTVar ecount' Nothing
writeTVar lpEmpty False

putTMVar buffer x
, recv = atomically $ do
!ec <- succ <$> readTVar ecount
writeTVar ecount ec
writeTVar ecount' (Just ec)

Just <$> takeTMVar buffer
, recv = do
mbFrac <- forM mbSMG' $ \smgVar -> atomically $ do
(i, smg') <- SM.nextInt <$> readTVar smgVar
writeTVar smgVar smg'
pure $
toEnum (abs i `mod` 1000) / picosecondsToDiffTime 1000
let mbD = (\f -> (slotLen / 10) * f) <$> mbFrac
mapM_ threadDelay mbD
atomically $ do
modifyTVar lpCount succ
writeTVar lpEmpty True
Just <$> takeTMVar buffer

pure (pid, pipe)
Expand Down

0 comments on commit ae84f1a

Please sign in to comment.