From ae84f1a741eb384c47c86129144c71c4f44c835e Mon Sep 17 00:00:00 2001 From: Nicolas Frisby Date: Sun, 13 Oct 2019 16:13:34 -0700 Subject: [PATCH] test-consensus: add infra-slot delays --- nix/.stack.nix/ouroboros-consensus.nix | 1 + ouroboros-consensus/ouroboros-consensus.cabal | 1 + .../test-consensus/Test/Dynamic/General.hs | 24 +++- .../Test/Dynamic/LeaderSchedule.hs | 13 +- .../test-consensus/Test/Dynamic/Network.hs | 113 +++++++++++------- .../test-consensus/Test/Dynamic/Praos.hs | 5 + .../test-consensus/Test/Dynamic/RealPBFT.hs | 1 + 7 files changed, 111 insertions(+), 47 deletions(-) diff --git a/nix/.stack.nix/ouroboros-consensus.nix b/nix/.stack.nix/ouroboros-consensus.nix index 0d33eb89a8a..81ffe8246c2 100644 --- a/nix/.stack.nix/ouroboros-consensus.nix +++ b/nix/.stack.nix/ouroboros-consensus.nix @@ -115,6 +115,7 @@ (hsPkgs.random) (hsPkgs.reflection) (hsPkgs.serialise) + (hsPkgs.splitmix) (hsPkgs.tasty) (hsPkgs.tasty-hunit) (hsPkgs.tasty-quickcheck) diff --git a/ouroboros-consensus/ouroboros-consensus.cabal b/ouroboros-consensus/ouroboros-consensus.cabal index 903332a14d0..3cdb4e752f6 100644 --- a/ouroboros-consensus/ouroboros-consensus.cabal +++ b/ouroboros-consensus/ouroboros-consensus.cabal @@ -317,6 +317,7 @@ test-suite test-consensus random, reflection, serialise, + splitmix, tasty, tasty-hunit, tasty-quickcheck, diff --git a/ouroboros-consensus/test-consensus/Test/Dynamic/General.hs b/ouroboros-consensus/test-consensus/Test/Dynamic/General.hs index d8727599199..9c436d17396 100644 --- a/ouroboros-consensus/test-consensus/Test/Dynamic/General.hs +++ b/ouroboros-consensus/test-consensus/Test/Dynamic/General.hs @@ -21,6 +21,7 @@ import Data.Set (Set) import qualified Data.Set as Set import Data.Word (Word64) import GHC.Stack (HasCallStack) +import qualified System.Random.SplitMix as SM import Test.QuickCheck import Control.Monad.IOSim (runSimOrThrow) @@ -63,6 +64,7 @@ data TestConfig = TestConfig , numSlots :: !NumSlots , nodeJoinPlan :: !NodeJoinPlan , nodeTopology :: !NodeTopology + , latencySeed :: !(Maybe SM.SMGen) } deriving (Show) @@ -70,27 +72,32 @@ genTestConfig :: NumCoreNodes -> NumSlots -> Gen TestConfig genTestConfig numCoreNodes numSlots = do nodeJoinPlan <- genNodeJoinPlan numCoreNodes numSlots nodeTopology <- genNodeTopology numCoreNodes - pure TestConfig{numCoreNodes, numSlots, nodeJoinPlan, nodeTopology} + latencySeed <- (Just . 
SM.mkSMGen) <$> arbitrary + pure TestConfig + { numCoreNodes, numSlots, nodeJoinPlan, nodeTopology, latencySeed } -- | Shrink without changing the number of nodes or slots shrinkTestConfig :: TestConfig -> [TestConfig] -shrinkTestConfig testConfig@TestConfig{nodeJoinPlan, nodeTopology} = +shrinkTestConfig + testConfig@TestConfig{nodeJoinPlan, nodeTopology, latencySeed} = tail $ -- drop the identity output - [ testConfig{nodeJoinPlan = p', nodeTopology = top'} + [ testConfig{nodeJoinPlan = p', nodeTopology = top', latencySeed = seed'} | p' <- nodeJoinPlan : shrinkNodeJoinPlan nodeJoinPlan , top' <- nodeTopology : shrinkNodeTopology nodeTopology + , seed' <- latencySeed : Nothing : [] ] -- | Shrink, including the number of nodes and slots shrinkTestConfigFreely :: TestConfig -> [TestConfig] shrinkTestConfigFreely - TestConfig{numCoreNodes, numSlots, nodeJoinPlan, nodeTopology} = + TestConfig{numCoreNodes, numSlots, nodeJoinPlan, nodeTopology, latencySeed} = tail $ -- drop the identity result [ TestConfig { numCoreNodes = n' , numSlots = t' , nodeJoinPlan = p' , nodeTopology = top' + , latencySeed = seed' } | n' <- idAnd shrink numCoreNodes , t' <- idAnd shrink numSlots @@ -98,6 +105,7 @@ shrinkTestConfigFreely , let adjustedTop = adjustedNodeTopology n' , p' <- idAnd shrinkNodeJoinPlan adjustedP , top' <- idAnd shrinkNodeTopology adjustedTop + , seed' <- latencySeed : Nothing : [] ] where idAnd :: forall a. (a -> [a]) -> a -> [a] @@ -143,7 +151,7 @@ runTestNetwork :: -> Seed -> TestOutput blk runTestNetwork pInfo - TestConfig{numCoreNodes, numSlots, nodeJoinPlan, nodeTopology} + TestConfig{numCoreNodes, numSlots, nodeJoinPlan, nodeTopology, latencySeed} seed = runSimOrThrow $ do registry <- unsafeNewRegistry @@ -162,11 +170,15 @@ runTestNetwork pInfo nodeJoinPlan nodeTopology pInfo + latencySeed (seedToChaCha seed) slotLen where slotLen = 100000 :: DiffTime - quiescenceThreshold = 50000 :: DiffTime + + -- a slot cannot advance until all of the ChainSync and BlockFetch pipes + -- have been empty for at least this duration + quiescenceThreshold = 5000 :: DiffTime {------------------------------------------------------------------------------- Test properties diff --git a/ouroboros-consensus/test-consensus/Test/Dynamic/LeaderSchedule.hs b/ouroboros-consensus/test-consensus/Test/Dynamic/LeaderSchedule.hs index a762396d12e..1fd8dfdaa88 100644 --- a/ouroboros-consensus/test-consensus/Test/Dynamic/LeaderSchedule.hs +++ b/ouroboros-consensus/test-consensus/Test/Dynamic/LeaderSchedule.hs @@ -7,6 +7,7 @@ module Test.Dynamic.LeaderSchedule ( import Control.Monad (replicateM) import Data.Map.Strict (Map) import qualified Data.Map.Strict as Map +import qualified System.Random.SplitMix as SM import Test.QuickCheck import Test.Tasty @@ -51,13 +52,23 @@ tests = testGroup "Dynamic chain generation" (genNodeTopology numCoreNodes) shrinkNodeTopology $ \nodeTopology -> + forAllShrink + (fmap SM.mkSMGen <$> arbitrary) + (\_ -> [Nothing]) $ + \latencySeed -> forAllShrink (genLeaderSchedule k numSlots numCoreNodes nodeJoinPlan) (shrinkLeaderSchedule numSlots) $ \schedule -> prop_simple_leader_schedule_convergence params - TestConfig{numCoreNodes, numSlots, nodeJoinPlan, nodeTopology} + TestConfig + { numCoreNodes + , numSlots + , nodeJoinPlan + , nodeTopology + , latencySeed + } schedule seed prop_simple_leader_schedule_convergence :: PraosParams diff --git a/ouroboros-consensus/test-consensus/Test/Dynamic/Network.hs b/ouroboros-consensus/test-consensus/Test/Dynamic/Network.hs index f4defd5d12f..88e6895a021 100644 
--- a/ouroboros-consensus/test-consensus/Test/Dynamic/Network.hs +++ b/ouroboros-consensus/test-consensus/Test/Dynamic/Network.hs @@ -1,4 +1,3 @@ -{-# LANGUAGE BangPatterns #-} {-# LANGUAGE ConstraintKinds #-} {-# LANGUAGE DataKinds #-} {-# LANGUAGE FlexibleContexts #-} @@ -40,12 +39,15 @@ import qualified Data.Map.Strict as Map import Data.Proxy (Proxy (..)) import Data.Set (Set) import qualified Data.Set as Set +import Data.Time.Clock (picosecondsToDiffTime) import qualified Data.Typeable as Typeable import Data.Word (Word64) import GHC.Stack +import System.Random.SplitMix (SMGen) +import qualified System.Random.SplitMix as SM -import Control.Monad.Class.MonadThrow import Control.Monad.Class.MonadSTM.Strict +import Control.Monad.Class.MonadThrow import Network.TypedProtocol.Channel import Network.TypedProtocol.Codec (AnyMessage (..)) @@ -121,13 +123,14 @@ runNodeNetwork :: forall m blk. -> NodeJoinPlan -> NodeTopology -> (CoreNodeId -> ProtocolInfo blk) + -> Maybe SMGen -> ChaChaDRG -> DiffTime -> m (TestOutput blk) runNodeNetwork registry quiescenceThreshold latestDoneSlot testBtime numCoreNodes nodeJoinPlan nodeTopology - pInfo initRNG slotLen = do + pInfo mbInitSMG initRNG slotLen = do -- This function is organized around the notion of a network of nodes as a -- simple graph with no loops. The graph topology is determined by -- @nodeTopology@. @@ -159,10 +162,11 @@ runNodeNetwork registry uncheckedNewEmptyMVar (error "no App available yet") -- spawn threads for each undirected edge + mbSMG <- mapM uncheckedNewTVarM mbInitSMG let edges = edgesNodeTopology nodeTopology forM_ edges $ \edge -> do void $ forkLinkedThread registry $ do - undirectedEdge nullTracer livePipesVar nodeVars edge + undirectedEdge nullTracer mbSMG livePipesVar nodeVars edge -- create nodes let nodesByJoinSlot = @@ -209,11 +213,12 @@ runNodeNetwork registry undirectedEdge :: HasCallStack => Tracer m (SlotNo, MiniProtocolState, MiniProtocolExpectedException blk) + -> Maybe (StrictTVar m SMGen) -> LivePipesVar m -> Map CoreNodeId (StrictMVar m (LimitedApp m NodeId blk)) -> (CoreNodeId, CoreNodeId) -> m () - undirectedEdge tr livePipesVar nodeVars (node1, node2) = do + undirectedEdge tr mbSMG livePipesVar nodeVars (node1, node2) = do -- block until both endpoints have joined the network (endpoint1, endpoint2) <- do let lu node = case Map.lookup node nodeVars of @@ -222,11 +227,9 @@ runNodeNetwork registry (,) <$> lu node1 <*> lu node2 -- spawn threads for both directed edges + let de = directedEdge slotLen tr btime mbSMG livePipesVar void $ withAsyncsWaitAny $ - directedEdge tr btime livePipesVar endpoint1 endpoint2 - NE.:| - [ directedEdge tr btime livePipesVar endpoint2 endpoint1 - ] + de endpoint1 endpoint2 NE.:| [de endpoint2 endpoint1] -- | Produce transactions every time the slot changes and submit them to -- the mempool. @@ -414,17 +417,20 @@ runNodeNetwork registry -- 'MiniProtocolExpectedException'. directedEdge :: forall m blk. 
(IOLike m, SupportedBlock blk) - => Tracer m (SlotNo, MiniProtocolState, MiniProtocolExpectedException blk) + => DiffTime + -> Tracer m (SlotNo, MiniProtocolState, MiniProtocolExpectedException blk) -> BlockchainTime m + -> Maybe (StrictTVar m SMGen) -> LivePipesVar m -> (CoreNodeId, LimitedApp m NodeId blk) -> (CoreNodeId, LimitedApp m NodeId blk) -> m () -directedEdge tr btime livePipesVar nodeapp1 nodeapp2 = +directedEdge slotLen tr btime mbSMG livePipesVar nodeapp1 nodeapp2 = loopOnMPEE where loopOnMPEE = do - (pids, edge) <- directedEdgeInner livePipesVar nodeapp1 nodeapp2 + (pids, edge) <- + directedEdgeInner slotLen mbSMG livePipesVar nodeapp1 nodeapp2 edge `catch` hExpected pids `catch` hUnexpected @@ -455,22 +461,24 @@ directedEdge tr btime livePipesVar nodeapp1 nodeapp2 = -- See 'directedEdge'. directedEdgeInner :: forall m blk. (IOLike m, SupportedBlock blk) - => LivePipesVar m + => DiffTime + -> Maybe (StrictTVar m SMGen) + -> LivePipesVar m -> (CoreNodeId, LimitedApp m NodeId blk) -- ^ client threads on this node -> (CoreNodeId, LimitedApp m NodeId blk) -- ^ server threads on this node -> m (Set PipeId, m ()) -directedEdgeInner livePipesVar (node1, LimitedApp app1) (node2, LimitedApp app2) = do +directedEdgeInner slotLen mbSMG livePipesVar (node1, LimitedApp app1) (node2, LimitedApp app2) = do mps <- sequence $ - ( miniProtocol + ( miniProtocol True (wrapMPEE MPEEChainSyncClient naChainSyncClient) naChainSyncServer ) NE.:| - [ miniProtocol + [ miniProtocol True (wrapMPEE MPEEBlockFetchClient naBlockFetchClient) (wrapMPEE MPEEBlockFetchServer naBlockFetchServer) - , miniProtocol + , miniProtocol False -- this mini protocol never stops churning (wrapMPEE MPEETxSubmissionClient naTxSubmissionClient) (wrapMPEE MPEETxSubmissionServer naTxSubmissionServer) ] @@ -482,7 +490,8 @@ directedEdgeInner livePipesVar (node1, LimitedApp app1) (node2, LimitedApp app2) flattenPairs = uncurry (<>) . NE.unzip miniProtocol :: - (forall unused1 unused2. + Bool + -> (forall unused1 unused2. 
LimitedApp' m NodeId blk unused1 unused2 -> NodeId -> Channel m msg @@ -495,9 +504,9 @@ directedEdgeInner livePipesVar (node1, LimitedApp app1) (node2, LimitedApp app2) -> m ()) -- ^ server action to run on node2 -> m ((PipeId, PipeId), (m (), m ())) - miniProtocol client server = do - (pid1, Pipe pipe1) <- newLivePipe livePipesVar - (pid2, Pipe pipe2) <- newLivePipe livePipesVar + miniProtocol live client server = do + (pid1, Pipe pipe1) <- newLivePipe slotLen mbSMG livePipesVar live + (pid2, Pipe pipe2) <- newLivePipe slotLen mbSMG livePipesVar live let chan1 = Channel{send = send pipe1, recv = recv pipe2} chan2 = Channel{send = send pipe2, recv = recv pipe1} pure $ (,) @@ -774,10 +783,15 @@ data LivePipes a = LivePipes , livePipes :: !(Map PipeId a) } +data LivePipe m = LivePipe + { lpCount :: !(StrictTVar m EmptiedCount) + , lpEmpty :: !(StrictTVar m Bool) + } + -- | A variable mapping each live pipe to its own variable, which contains -- @Just ec@ when the pipe is empty and @Nothing@ otherwise -- -type LivePipesVar m = StrictTVar m (LivePipes (StrictTVar m (Maybe EmptiedCount))) +type LivePipesVar m = StrictTVar m (LivePipes (LivePipe m)) -- | Block until all live pipes have been empty for at least the given duration -- @@ -786,44 +800,63 @@ blockUntilQuiescent livePipesVar dur = get >>= go where get = atomically $ do LivePipes{livePipes} <- readTVar livePipesVar - forM livePipes $ \ecVar -> readTVar ecVar >>= \case - Nothing -> retry - Just ec -> pure ec + forM livePipes $ \LivePipe{lpCount, lpEmpty} -> do + readTVar lpEmpty >>= check + readTVar lpCount go sig1 = do - threadDelay dur - sig2 <- get - if sig1 == sig2 then pure () else go sig2 + threadDelay dur + sig2 <- get + if sig1 == sig2 then pure () else go sig2 -- | Create a pipe backed by a 'LazyTMVar', add it to the live pipes, and -- return the channel for writing to it and reading from it. 
-- -newLivePipe :: IOLike m => LivePipesVar m -> m (PipeId, Pipe m a) -newLivePipe tvar = do +newLivePipe :: + IOLike m + => DiffTime + -> Maybe (StrictTVar m SMGen) + -> LivePipesVar m + -> Bool + -> m (PipeId, Pipe m a) +newLivePipe slotLen mbSMG tvar live = do let ec0 = EmptiedCount 0 - ecount' <- uncheckedNewTVarM (Just ec0) + lpCount <- uncheckedNewTVarM ec0 + lpEmpty <- uncheckedNewTVarM True pid <- atomically $ do LivePipes{nextPipeId, livePipes} <- readTVar tvar writeTVar tvar LivePipes { nextPipeId = succ nextPipeId - , livePipes = Map.insert nextPipeId ecount' livePipes + , livePipes = + if not live then livePipes else + Map.insert nextPipeId LivePipe{lpCount, lpEmpty} livePipes } pure nextPipeId - ecount <- uncheckedNewTVarM ec0 + mbSMG' <- forM mbSMG $ \smgVar -> atomically $ do + (smg1, smg') <- SM.splitSMGen <$> readTVar smgVar + writeTVar smgVar smg' + newTVar smg1 + buffer <- atomically $ newEmptyTMVar let pipe = Pipe Channel { send = \x -> atomically $ do - writeTVar ecount' Nothing + writeTVar lpEmpty False putTMVar buffer x - , recv = atomically $ do - !ec <- succ <$> readTVar ecount - writeTVar ecount ec - writeTVar ecount' (Just ec) - - Just <$> takeTMVar buffer + , recv = do + mbFrac <- forM mbSMG' $ \smgVar -> atomically $ do + (i, smg') <- SM.nextInt <$> readTVar smgVar + writeTVar smgVar smg' + pure $ + toEnum (abs i `mod` 1000) / picosecondsToDiffTime 1000 + let mbD = (\f -> (slotLen / 10) * f) <$> mbFrac + mapM_ threadDelay mbD + atomically $ do + modifyTVar lpCount succ + writeTVar lpEmpty True + Just <$> takeTMVar buffer } pure (pid, pipe) diff --git a/ouroboros-consensus/test-consensus/Test/Dynamic/Praos.hs b/ouroboros-consensus/test-consensus/Test/Dynamic/Praos.hs index 324314c7950..0d36b6203e1 100644 --- a/ouroboros-consensus/test-consensus/Test/Dynamic/Praos.hs +++ b/ouroboros-consensus/test-consensus/Test/Dynamic/Praos.hs @@ -5,6 +5,8 @@ module Test.Dynamic.Praos ( tests ) where +import qualified System.Random.SplitMix as SM + import Test.QuickCheck import Test.Tasty @@ -49,11 +51,13 @@ tests = testGroup "Dynamic chain generation" (genNodeTopology numCoreNodes) shrinkNodeTopology $ \nodeTopology -> + \latencySeed -> testPraos' TestConfig { numCoreNodes , numSlots , nodeJoinPlan , nodeTopology + , latencySeed = SM.mkSMGen <$> latencySeed } seed ] @@ -64,6 +68,7 @@ tests = testGroup "Dynamic chain generation" , numSlots , nodeJoinPlan = trivialNodeJoinPlan numCoreNodes , nodeTopology = meshNodeTopology numCoreNodes + , latencySeed = Nothing } testPraos' :: TestConfig -> Seed -> Property diff --git a/ouroboros-consensus/test-consensus/Test/Dynamic/RealPBFT.hs b/ouroboros-consensus/test-consensus/Test/Dynamic/RealPBFT.hs index 4ff1b6136d3..d14ca3742d7 100644 --- a/ouroboros-consensus/test-consensus/Test/Dynamic/RealPBFT.hs +++ b/ouroboros-consensus/test-consensus/Test/Dynamic/RealPBFT.hs @@ -71,6 +71,7 @@ tests = testGroup "Dynamic chain generation" , (CoreNodeId 2,SlotNo 22) ] , nodeTopology = meshNodeTopology ncn + , latencySeed = Nothing } , testProperty "simple Real PBFT convergence" $ prop_simple_real_pbft_convergence
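
For reference, the infra-slot delay that `recv` applies in `newLivePipe` boils down to the following `DiffTime` arithmetic; this is a standalone sketch, and `recvDelay`, the seed, and the 0.1 s slot length are illustrative stand-ins rather than part of the patch:

-- Each pipe draws from its own generator (obtained via 'SM.splitSMGen' in the
-- patch); every receive is delayed by a pseudo-random fraction (k/1000 for
-- k in [0, 999]) of a tenth of the slot length.
import Data.Time.Clock (DiffTime, picosecondsToDiffTime)
import qualified System.Random.SplitMix as SM

-- | Draw one delay and return the updated generator.
recvDelay :: DiffTime -> SM.SMGen -> (DiffTime, SM.SMGen)
recvDelay slotLen smg = (d, smg')
  where
    (i, smg') = SM.nextInt smg
    -- a fraction in [0, 1), encoded as a DiffTime, exactly as in the patch
    frac = toEnum (abs i `mod` 1000) / picosecondsToDiffTime 1000
    d    = (slotLen / 10) * frac

main :: IO ()
main = do
  let smg0       = SM.mkSMGen 42
      (d1, smg1) = recvDelay 0.1 smg0  -- hypothetical 0.1 s slot length
      (d2, _)    = recvDelay 0.1 smg1
  print (d1, d2)  -- two delays, each strictly less than 0.01 s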