Skip to content

Commit

Permalink
test-consensus: remove slotLen parameter
Browse files Browse the repository at this point in the history
  • Loading branch information
nfrisby committed Oct 17, 2019
1 parent 7bb3851 commit 302ff08
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 55 deletions.
19 changes: 3 additions & 16 deletions ouroboros-consensus/src/Ouroboros/Consensus/BlockchainTime.hs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ module Ouroboros.Consensus.BlockchainTime (
, NumSlots(..)
, TestBlockchainTime(..)
, newTestBlockchainTime
, newTestGuardedBlockchainTime
-- * Real blockchain time
, realBlockchainTime
-- * Time to slots and back again
Expand Down Expand Up @@ -160,7 +159,7 @@ data TestBlockchainTime m = TestBlockchainTime
-- ^ Blocks until the end of the final requested slot.
}

-- | Construct new blockchain time that ticks at the specified slot duration
-- | Each slot advances once the given callback returns
--
-- NOTE: This is just one way to construct time. We can of course also connect
-- this to the real time (if we are in IO), or indeed to a manual tick
Expand All @@ -173,25 +172,14 @@ data TestBlockchainTime m = TestBlockchainTime
-- first slot @SlotNo 0@, i.e. during 'Initializing'. This is likely only
-- appropriate for initialization code etc. In contrast, the argument to
-- 'onSlotChange' is blocked at least until @SlotNo 0@ begins.
--
newTestBlockchainTime
:: forall m. (IOLike m, HasCallStack)
=> ResourceRegistry m
-> NumSlots -- ^ Number of slots
-> DiffTime -- ^ Slot duration
-> m (TestBlockchainTime m)
newTestBlockchainTime registry numSlots slotLen =
newTestGuardedBlockchainTime registry numSlots slotLen (\_ -> pure ())

-- | Like 'newTestBlockchainTime', but also allows other computations to delay
-- the end of each slot
newTestGuardedBlockchainTime
:: forall m. (IOLike m, HasCallStack)
=> ResourceRegistry m
-> NumSlots -- ^ Number of slots
-> DiffTime -- ^ Slot duration
-> (SlotNo -> m ()) -- ^ Blocks until slot is finished
-> m (TestBlockchainTime m)
newTestGuardedBlockchainTime registry (NumSlots numSlots) slotLen waitOn = do
newTestBlockchainTime registry (NumSlots numSlots) waitOn = do
slotVar <- newTVarM initVal
doneVar <- newEmptyMVar ()

Expand Down Expand Up @@ -227,7 +215,6 @@ newTestGuardedBlockchainTime registry (NumSlots numSlots) slotLen waitOn = do
Running s -> succ s
writeTVar slotVar (Running s')
pure s'
threadDelay slotLen
waitOn s'
-- signal the end of the final slot
putMVar doneVar ()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,9 @@ runChainSync
runChainSync securityParam maxClockSkew (ClientUpdates clientUpdates)
(ServerUpdates serverUpdates) startSyncingAt = withRegistry $ \registry -> do

testBtime <- newTestBlockchainTime registry numSlots slotDuration
testBtime <- do
let waitOn _s = threadDelay 100000 -- io-sim "seconds"
newTestBlockchainTime registry numSlots waitOn
let btime = testBlockchainTime testBtime

-- Set up the client
Expand Down Expand Up @@ -388,9 +390,6 @@ runChainSync securityParam maxClockSkew (ClientUpdates clientUpdates)
where
k = maxRollbacks securityParam

slotDuration :: DiffTime
slotDuration = 100000

nodeCfg :: CoreNodeId -> NodeConfig (Bft BftMockCrypto)
nodeCfg coreNodeId = BftNodeConfig
{ bftParams = BftParams
Expand Down
13 changes: 7 additions & 6 deletions ouroboros-consensus/test-consensus/Test/Dynamic/General.hs
Original file line number Diff line number Diff line change
Expand Up @@ -173,25 +173,26 @@ runTestNetwork pInfo
seed = runSimOrThrow $ do
registry <- unsafeNewRegistry

let slotLen = 100000 :: DiffTime -- io-sim "seconds"

-- the latest slot that is ready to start
latestReadySlot <- uncheckedNewTVarM (SlotNo 0)
-- a slot cannot end before a later slot is ready to start
let waitOn s = atomically $ do
x <- readTVar latestReadySlot
check (s < x)
testBtime <- newTestGuardedBlockchainTime registry numSlots slotLen waitOn
testBtime <- newTestBlockchainTime registry numSlots waitOn

runNodeNetwork NodeNetworkArgs
{ nnaLatencySeed = latencySeed
{ nnaMaxLatencies = MaxLatencies
{ maxSendLatency = 10 -- io-sim "seconds"
, maxVar1Latency = 1000 -- io-sim "seconds"
}
, nnaLatencySeed = latencySeed
, nnaLatestReadySlot = latestReadySlot
, nnaNodeJoinPlan = nodeJoinPlan
, nnaNodeTopology = nodeTopology
, nnaNumCoreNodes = numCoreNodes
, nnaProtocol = pInfo
, nnaQuiesence = 5000 -- io-sim "seconds"
, nnaSlotLen = slotLen
, nnaQuiesence = 50000 -- io-sim "seconds"
, nnaTestBtime = testBtime
, nnaTxSeed = seedToChaCha seed
, nnaRegistry = registry
Expand Down
86 changes: 57 additions & 29 deletions ouroboros-consensus/test-consensus/Test/Dynamic/Network.hs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
-- | Setup network
module Test.Dynamic.Network (
runNodeNetwork
, MaxLatencies (..)
, NodeNetworkArgs (..)
, TracingConstraints
, LatencyInjection (..)
Expand Down Expand Up @@ -110,6 +111,18 @@ import qualified Test.Util.FS.Sim.MockFS as Mock
import Test.Util.FS.Sim.STM (simHasFS)
import Test.Util.Tracer

-- | The maximum possible latencies
--
data MaxLatencies = MaxLatencies
{ maxSendLatency :: !DiffTime
-- ^ Similar to the Serialisation latency of GSV, though see the NOTE on
-- 'newLivePipe' regarding back pressure
--
, maxVar1Latency :: !DiffTime
-- ^ Similar to the Variable latency of GSV, though the NOTE on
-- 'newLivePipe' regarding pipelining
}

data NodeNetworkArgs m blk = NodeNetworkArgs
{ nnaLatencySeed :: !(LatencyInjection SMGen)
, nnaLatestReadySlot :: !(StrictTVar m SlotNo)
Expand All @@ -120,11 +133,9 @@ data NodeNetworkArgs m blk = NodeNetworkArgs
, nnaNumCoreNodes :: !NumCoreNodes
, nnaProtocol :: !(CoreNodeId -> ProtocolInfo blk)
, nnaQuiesence :: !DiffTime
-- ^ A slot cannot end until there have been no sent-but-not-unreceived
-- mini protocol messages for this long
, nnaSlotLen :: !DiffTime
-- ^ A slot cannot end until at least this duration has passed since it
-- started
-- ^ A slot ends when there have been no sent-but-not-yet-received mini
-- protocol messages for at least this long
, nnaMaxLatencies :: !MaxLatencies
, nnaTestBtime :: !(TestBlockchainTime m)
, nnaTxSeed :: !ChaChaDRG
, nnaRegistry :: !(ResourceRegistry m)
Expand Down Expand Up @@ -152,7 +163,7 @@ runNodeNetwork NodeNetworkArgs
, nnaNumCoreNodes = numCoreNodes
, nnaProtocol = pInfo
, nnaQuiesence = quiescenceThreshold
, nnaSlotLen = slotLen
, nnaMaxLatencies
, nnaTestBtime = testBtime
, nnaTxSeed = initRNG
, nnaRegistry = registry
Expand Down Expand Up @@ -253,7 +264,7 @@ runNodeNetwork NodeNetworkArgs
(,) <$> lu node1 <*> lu node2

-- spawn threads for both directed edges
let de = directedEdge slotLen tr btime mbSMG livePipesVar
let de = directedEdge nnaMaxLatencies tr btime mbSMG livePipesVar
void $ withAsyncsWaitAny $
de endpoint1 endpoint2 NE.:| [de endpoint2 endpoint1]

Expand Down Expand Up @@ -310,7 +321,7 @@ runNodeNetwork NodeNetworkArgs
, cdbValidation = ImmDB.ValidateAllEpochs
, cdbBlocksPerFile = 4
, cdbParamsLgrDB = LgrDB.ledgerDbDefaultParams (protocolSecurityParam cfg)
, cdbDiskPolicy = LgrDB.defaultDiskPolicy (protocolSecurityParam cfg) slotLen
, cdbDiskPolicy = LgrDB.defaultDiskPolicy (protocolSecurityParam cfg) generousApproxSlotLen
-- Integration
, cdbNodeConfig = cfg
, cdbEpochInfo = epochInfo
Expand All @@ -331,6 +342,12 @@ runNodeNetwork NodeNetworkArgs
, cdbRegistry = registry
, cdbGcDelay = 0
}
where
-- a rough estimate of the average slot length, more likely an
-- overestimate than an underestimate
generousApproxSlotLen = 100 * (sendL + recvL)
where
MaxLatencies sendL recvL = nnaMaxLatencies

createNode
:: HasCallStack
Expand Down Expand Up @@ -443,20 +460,20 @@ runNodeNetwork NodeNetworkArgs
-- 'MiniProtocolExpectedException'.
directedEdge ::
forall m blk. (IOLike m, SupportedBlock blk)
=> DiffTime
=> MaxLatencies
-> Tracer m (SlotNo, MiniProtocolState, MiniProtocolExpectedException blk)
-> BlockchainTime m
-> LatencyInjection (StrictTVar m SMGen)
-> LivePipesVar m
-> (CoreNodeId, LimitedApp m NodeId blk)
-> (CoreNodeId, LimitedApp m NodeId blk)
-> m ()
directedEdge slotLen tr btime mbSMG livePipesVar nodeapp1 nodeapp2 =
directedEdge maxLatencies tr btime mbSMG livePipesVar nodeapp1 nodeapp2 =
loopOnMPEE
where
loopOnMPEE = do
(pids, edge) <-
directedEdgeInner slotLen mbSMG livePipesVar nodeapp1 nodeapp2
directedEdgeInner maxLatencies mbSMG livePipesVar nodeapp1 nodeapp2
edge
`catch` hExpected pids
`catch` hUnexpected
Expand Down Expand Up @@ -488,15 +505,15 @@ directedEdge slotLen tr btime mbSMG livePipesVar nodeapp1 nodeapp2 =
-- See 'directedEdge'.
directedEdgeInner ::
forall m blk. (IOLike m, SupportedBlock blk)
=> DiffTime
=> MaxLatencies
-> LatencyInjection (StrictTVar m SMGen)
-> LivePipesVar m
-> (CoreNodeId, LimitedApp m NodeId blk)
-- ^ client threads on this node
-> (CoreNodeId, LimitedApp m NodeId blk)
-- ^ server threads on this node
-> m (Set PipeId, m ())
directedEdgeInner slotLen mbSMG livePipesVar (node1, LimitedApp app1) (node2, LimitedApp app2) = do
directedEdgeInner maxLatencies mbSMG livePipesVar (node1, LimitedApp app1) (node2, LimitedApp app2) = do
mps <- sequence $
( miniProtocol
(wrapMPEE MPEEChainSyncClient naChainSyncClient)
Expand Down Expand Up @@ -531,8 +548,8 @@ directedEdgeInner slotLen mbSMG livePipesVar (node1, LimitedApp app1) (node2, Li
-- ^ server action to run on node2
-> m ((PipeId, PipeId), (m (), m ()))
miniProtocol client server = do
(pid1, Pipe pipe1) <- newLivePipe slotLen mbSMG livePipesVar
(pid2, Pipe pipe2) <- newLivePipe slotLen mbSMG livePipesVar
(pid1, Pipe pipe1) <- newLivePipe maxLatencies mbSMG livePipesVar
(pid2, Pipe pipe2) <- newLivePipe maxLatencies mbSMG livePipesVar
let chan1 = Channel{send = send pipe1, recv = recv pipe2}
chan2 = Channel{send = send pipe2, recv = recv pipe1}
pure $ (,)
Expand Down Expand Up @@ -849,15 +866,31 @@ blockUntilQuiescent livePipesVar dur = get >>= go
-- | Create a pipe backed by a 'TQueue', add it to the live pipes, and
-- return the channel for writing to it and reading from it.
--
-- NOTE This mock \"physical layer\" carries one message at a time (hence the
-- name 'maxVar1Latency'). Though it uses queues so that a (pipelined) sender
-- is never blocked, there will be no _latency hiding_. In other words, the
-- stream of Variable latency samples have _already_ taken into account any
-- possible latency hiding.
--
-- NOTE Similarly, it is assumed that the Serialisation latency sufficiently
-- subsumes _back pressure_, since otherwise sending never blocks the sender.
--
-- The motivation for including latencies in this mock network is to explore
-- different permutations/ways to interleave concurrent events. While realistic
-- latencies would exhibit (conditional) (temporal) correlations due to
-- pipelining, back pressure, and so on, we assume that samplers crafted to
-- accurately synthesize those correlations would explore fewer scheduling
-- permutations.
--
newLivePipe ::
( IOLike m
, HasCallStack
)
=> DiffTime
=> MaxLatencies
-> LatencyInjection (StrictTVar m SMGen)
-> LivePipesVar m
-> m (PipeId, Pipe m a)
newLivePipe slotLen mbSMG tvar = do
newLivePipe MaxLatencies{maxVar1Latency, maxSendLatency} mbSMG tvar = do
lpCount <- uncheckedNewTVarM (EmptiedCount 0)
lpInFlight <- uncheckedNewTVarM 0
pid <- atomically $ do
Expand All @@ -870,17 +903,13 @@ newLivePipe slotLen mbSMG tvar = do

-- create a thread that reliably and order-preservingly transports messages
-- down the pipe
inBuf <- atomically Q.newTQueue
outBuf <- atomically Q.newTQueue
mbGeoD <- mapM genDelay mbSMG
mbTransportSMG <- mapM split mbSMG
inBuf <- atomically Q.newTQueue
outBuf <- atomically Q.newTQueue
mbVar1SMG <- mapM split mbSMG
void $ fork $ forever $ do
x <- atomically $ Q.readTQueue inBuf

-- Geographical latency
threadDelayLI mbGeoD
-- Variable latency
mapM genDelay mbTransportSMG >>= threadDelayLI
mapM (genDelay maxVar1Latency) mbVar1SMG >>= threadDelayLI

atomically $ Q.writeTQueue outBuf x

Expand All @@ -889,8 +918,7 @@ newLivePipe slotLen mbSMG tvar = do
{ send = \ !x -> do
atomically $ modifyTVar lpInFlight succ

-- Serialisation latency and back pressure
mapM genDelay mbSendSMG >>= threadDelayLI
mapM (genDelay maxSendLatency) mbSendSMG >>= threadDelayLI

atomically $ Q.writeTQueue inBuf x
, recv = atomically $ do
Expand All @@ -916,12 +944,12 @@ newLivePipe slotLen mbSMG tvar = do
InjectTrivialLatencies -> threadDelay 0
InjectLatencies d -> threadDelay d

genDelay smgVar = atomically $ do
genDelay mx smgVar = atomically $ do
(i, smg') <- SM.nextInt <$> readTVar smgVar
writeTVar smgVar smg'
let denom = picosecondsToDiffTime 1000
let frac = toEnum (abs i `mod` 1000) / denom
pure $ (slotLen / 10) * frac
pure $ mx * frac

forgetPipeSTM :: IOLike m => LivePipesVar m -> PipeId -> STM m ()
forgetPipeSTM tvar pid = modifyTVar tvar $ \lp@LivePipes{livePipes} ->
Expand Down

0 comments on commit 302ff08

Please sign in to comment.