Skip to content

Commit

Permalink
Sort all parties and pass them to withReliability
Browse files Browse the repository at this point in the history
- We used to sort inside of `withReliability` layer which
makes the testing a bit hard. It made sense to sort beforehand
since all we did with the `otherParties` was to add our party
and sort.

- Also fix all ReliabilitySpec tests.
  • Loading branch information
v0d1ch committed Sep 20, 2023
1 parent d18d539 commit 3b1f5c9
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 25 deletions.
18 changes: 8 additions & 10 deletions hydra-node/src/Hydra/Network/Reliability.hs
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,12 @@ withReliability ::
Tracer m ReliabilityLog ->
-- | Our own party identifier.
Party ->
-- | Other parties' identifiers.
[Party] ->
-- | All parties' identifiers already sorted.
Vector Party ->
-- | Underlying network component providing consuming and sending channels.
NetworkComponent m (Authenticated (ReliableMsg (Heartbeat msg))) (ReliableMsg (Heartbeat msg)) a ->
NetworkComponent m (Authenticated (Heartbeat msg)) (Heartbeat msg) a
withReliability tracer me otherParties withRawNetwork callback action = do
withReliability tracer me allParties withRawNetwork callback action = do
ackCounter <- newTVarIO $ replicate (length allParties) 0
sentMessages <- newTVarIO empty
resendQ <- newTQueueIO
Expand All @@ -153,8 +153,6 @@ withReliability tracer me otherParties withRawNetwork callback action = do
withAsync (forever $ atomically (readTQueue resendQ) >>= broadcast) $ \_ ->
reliableBroadcast ackCounter sentMessages network
where
allParties = fromList $ sort $ me : otherParties

reliableBroadcast ackCounter sentMessages Network{broadcast} =
action $
Network
Expand Down Expand Up @@ -205,11 +203,11 @@ withReliability tracer me otherParties withRawNetwork callback action = do
-- Pings are observed only for the information it provides about the
-- peer's view of our index
if n == count + 1
then do
let newAcks = constructAcks existingAcks partyIndex
writeTVar ackCounter newAcks
return (True, n, count, newAcks)
else return (isPing msg, n, count, existingAcks)
then do
let newAcks = constructAcks existingAcks partyIndex
writeTVar ackCounter newAcks
return (True, n, count, newAcks)
else return (isPing msg, n, count, existingAcks)

when shouldCallback $ do
traceWith tracer (Receiving acks existingAcks partyIndex)
Expand Down
4 changes: 3 additions & 1 deletion hydra-node/src/Hydra/Node/Network.hs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ import Hydra.Network.Heartbeat (ConnectionMessages, Heartbeat (..), withHeartbea
import Hydra.Network.Ouroboros (TraceOuroborosNetwork, WithHost, withOuroborosNetwork)
import Hydra.Network.Reliability (ReliableMsg, withReliability)
import Hydra.Party (Party, deriveParty)
import Data.Vector (fromList)

-- | An alias for logging messages output by network component.
-- The type is made complicated because the various subsystems use part of the tracer only.
Expand Down Expand Up @@ -102,9 +103,10 @@ withNetwork ::
withNetwork tracer connectionMessages signingKey otherParties host port peers nodeId =
let localhost = Host{hostname = show host, port}
me = deriveParty signingKey
allParties = fromList $ sort $ me : otherParties
in withHeartbeat nodeId connectionMessages $
withFlipHeartbeats $
withReliability (contramap Reliability tracer) me otherParties $
withReliability (contramap Reliability tracer) me allParties $
withAuthentication (contramap Authentication tracer) signingKey otherParties $
withOuroborosNetwork (contramap Network tracer) localhost peers

Expand Down
28 changes: 14 additions & 14 deletions hydra-node/test/Hydra/Network/ReliabilitySpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ spec = parallel $ do
captureIncoming receivedMessages msg =
atomically $ modifyTVar' receivedMessages (`snoc` msg)

let msg' = 42 :: Int
msg <- Data "node-1" <$> runIO (generate @String arbitrary)

it "forward received messages" $ do
Expand All @@ -34,7 +35,7 @@ spec = parallel $ do
withReliability
nullTracer
alice
[bob]
(fromList [alice, bob])
( \incoming _ -> do
incoming (Authenticated (ReliableMsg (fromList [1, 1]) (Data "node-2" msg)) bob)
)
Expand All @@ -50,7 +51,7 @@ spec = parallel $ do
let sentMsgs = runSimOrThrow $ do
sentMessages <- newTVarIO empty

withReliability nullTracer alice [] (captureOutgoing sentMessages) noop $ \Network{broadcast} -> do
withReliability nullTracer alice (fromList [alice]) (captureOutgoing sentMessages) noop $ \Network{broadcast} -> do
mapM_ (broadcast . Data "node-1") messages

fromList . toList <$> readTVarIO sentMessages
Expand All @@ -73,14 +74,14 @@ spec = parallel $ do
$ \_ ->
action (Network{broadcast = const $ pure ()})

withReliability nullTracer alice [bob] aliceNetwork (const $ pure ()) $ \Network{broadcast} ->
withReliability nullTracer bob [bob] bobNetwork (captureIncoming receivedMessages) $ \_ -> do
broadcast (Data "node-1" msg)
withReliability nullTracer alice (fromList [alice, bob]) aliceNetwork (const $ pure ()) $ \Network{broadcast} ->
withReliability nullTracer bob (fromList [alice, bob]) bobNetwork (captureIncoming receivedMessages) $ \_ -> do
broadcast (Data "node-1" msg')
threadDelay 1

toList <$> readTVarIO receivedMessages

receivedMsgs `shouldBe` [Authenticated (Data "node-1" msg) alice]
receivedMsgs `shouldBe` [Authenticated (Data "node-1" msg') alice]

prop "drops already received messages" $ \(messages :: [Positive Int]) ->
let receivedMsgs = runSimOrThrow $ do
Expand All @@ -89,7 +90,7 @@ spec = parallel $ do
withReliability
nullTracer
alice
[bob]
(fromList [alice, bob])
( \incoming _ -> do
forM_ messages $ \(Positive m) ->
incoming (Authenticated (ReliableMsg (fromList [0, m]) (Data "node-2" m)) bob)
Expand All @@ -112,30 +113,29 @@ spec = parallel $ do
withReliability
nullTracer
alice
[bob, carol]
(fromList [alice, bob, carol])
( \incoming _ -> do
incoming (Authenticated (ReliableMsg (fromList [0, 1, 0]) (Data "node-2" msg)) bob)
incoming (Authenticated (ReliableMsg (fromList [0, 0, 1]) (Data "node-3" msg)) carol)
incoming (Authenticated (ReliableMsg (fromList [0, 1, 0]) (Data "node-2" msg')) bob)
incoming (Authenticated (ReliableMsg (fromList [0, 0, 1]) (Data "node-3" msg')) carol)
)
(captureIncoming receivedMessages)
$ \_ ->
pure ()

toList <$> readTVarIO receivedMessages

receivedMsgs `shouldBe` [Authenticated (Data "node-2" msg) bob, Authenticated (Data "node-3" msg) carol]
receivedMsgs `shouldBe` [Authenticated (Data "node-2" msg') bob, Authenticated (Data "node-3" msg') carol]

prop "retransmits unacknowledged messages given peer index does not change" $ \(Positive lastMessageKnownToBob) ->
forAll (arbitrary `suchThat` (> lastMessageKnownToBob)) $ \totalNumberOfMessages ->
let messagesList = Data "node-1" <$> [1 .. totalNumberOfMessages]
msg' = 42
sentMsgs = runSimOrThrow $ do
sentMessages <- newTVarIO empty

withReliability
nullTracer
alice
[bob]
(fromList [alice, bob])
( \incoming action -> do
concurrently_
(action $ Network{broadcast = \m -> atomically $ modifyTVar' sentMessages (`snoc` message m)})
Expand Down Expand Up @@ -166,7 +166,7 @@ spec = parallel $ do
withReliability
nullTracer
alice
[bob]
(fromList [alice, bob])
( \incoming action -> do
concurrently_
(action $ Network{broadcast = \m -> atomically $ modifyTVar' sentMessages (`snoc` m)})
Expand Down

0 comments on commit 3b1f5c9

Please sign in to comment.