diff --git a/hydra-node/src/Hydra/Network/Reliability.hs b/hydra-node/src/Hydra/Network/Reliability.hs index 1930eca8723..949e2547a50 100644 --- a/hydra-node/src/Hydra/Network/Reliability.hs +++ b/hydra-node/src/Hydra/Network/Reliability.hs @@ -139,12 +139,12 @@ withReliability :: Tracer m ReliabilityLog -> -- | Our own party identifier. Party -> - -- | Other parties' identifiers. - [Party] -> + -- | All parties' identifiers already sorted. + Vector Party -> -- | Underlying network component providing consuming and sending channels. NetworkComponent m (Authenticated (ReliableMsg (Heartbeat msg))) (ReliableMsg (Heartbeat msg)) a -> NetworkComponent m (Authenticated (Heartbeat msg)) (Heartbeat msg) a -withReliability tracer me otherParties withRawNetwork callback action = do +withReliability tracer me allParties withRawNetwork callback action = do ackCounter <- newTVarIO $ replicate (length allParties) 0 sentMessages <- newTVarIO empty resendQ <- newTQueueIO @@ -153,8 +153,6 @@ withReliability tracer me otherParties withRawNetwork callback action = do withAsync (forever $ atomically (readTQueue resendQ) >>= broadcast) $ \_ -> reliableBroadcast ackCounter sentMessages network where - allParties = fromList $ sort $ me : otherParties - reliableBroadcast ackCounter sentMessages Network{broadcast} = action $ Network @@ -205,11 +203,11 @@ withReliability tracer me otherParties withRawNetwork callback action = do -- Pings are observed only for the information it provides about the -- peer's view of our index if n == count + 1 - then do - let newAcks = constructAcks existingAcks partyIndex - writeTVar ackCounter newAcks - return (True, n, count, newAcks) - else return (isPing msg, n, count, existingAcks) + then do + let newAcks = constructAcks existingAcks partyIndex + writeTVar ackCounter newAcks + return (True, n, count, newAcks) + else return (isPing msg, n, count, existingAcks) when shouldCallback $ do traceWith tracer (Receiving acks existingAcks partyIndex) diff --git a/hydra-node/src/Hydra/Node/Network.hs b/hydra-node/src/Hydra/Node/Network.hs index bcbaf0f74cf..a640b8594cf 100644 --- a/hydra-node/src/Hydra/Node/Network.hs +++ b/hydra-node/src/Hydra/Node/Network.hs @@ -74,6 +74,7 @@ import Hydra.Network.Heartbeat (ConnectionMessages, Heartbeat (..), withHeartbea import Hydra.Network.Ouroboros (TraceOuroborosNetwork, WithHost, withOuroborosNetwork) import Hydra.Network.Reliability (ReliableMsg, withReliability) import Hydra.Party (Party, deriveParty) +import Data.Vector (fromList) -- | An alias for logging messages output by network component. -- The type is made complicated because the various subsystems use part of the tracer only. @@ -102,9 +103,10 @@ withNetwork :: withNetwork tracer connectionMessages signingKey otherParties host port peers nodeId = let localhost = Host{hostname = show host, port} me = deriveParty signingKey + allParties = fromList $ sort $ me : otherParties in withHeartbeat nodeId connectionMessages $ withFlipHeartbeats $ - withReliability (contramap Reliability tracer) me otherParties $ + withReliability (contramap Reliability tracer) me allParties $ withAuthentication (contramap Authentication tracer) signingKey otherParties $ withOuroborosNetwork (contramap Network tracer) localhost peers diff --git a/hydra-node/test/Hydra/Network/ReliabilitySpec.hs b/hydra-node/test/Hydra/Network/ReliabilitySpec.hs index fc5f8150282..0267ab4588e 100644 --- a/hydra-node/test/Hydra/Network/ReliabilitySpec.hs +++ b/hydra-node/test/Hydra/Network/ReliabilitySpec.hs @@ -25,6 +25,7 @@ spec = parallel $ do captureIncoming receivedMessages msg = atomically $ modifyTVar' receivedMessages (`snoc` msg) + let msg' = 42 :: Int msg <- Data "node-1" <$> runIO (generate @String arbitrary) it "forward received messages" $ do @@ -34,7 +35,7 @@ spec = parallel $ do withReliability nullTracer alice - [bob] + (fromList [alice, bob]) ( \incoming _ -> do incoming (Authenticated (ReliableMsg (fromList [1, 1]) (Data "node-2" msg)) bob) ) @@ -50,7 +51,7 @@ spec = parallel $ do let sentMsgs = runSimOrThrow $ do sentMessages <- newTVarIO empty - withReliability nullTracer alice [] (captureOutgoing sentMessages) noop $ \Network{broadcast} -> do + withReliability nullTracer alice (fromList [alice]) (captureOutgoing sentMessages) noop $ \Network{broadcast} -> do mapM_ (broadcast . Data "node-1") messages fromList . toList <$> readTVarIO sentMessages @@ -73,14 +74,14 @@ spec = parallel $ do $ \_ -> action (Network{broadcast = const $ pure ()}) - withReliability nullTracer alice [bob] aliceNetwork (const $ pure ()) $ \Network{broadcast} -> - withReliability nullTracer bob [bob] bobNetwork (captureIncoming receivedMessages) $ \_ -> do - broadcast (Data "node-1" msg) + withReliability nullTracer alice (fromList [alice, bob]) aliceNetwork (const $ pure ()) $ \Network{broadcast} -> + withReliability nullTracer bob (fromList [alice, bob]) bobNetwork (captureIncoming receivedMessages) $ \_ -> do + broadcast (Data "node-1" msg') threadDelay 1 toList <$> readTVarIO receivedMessages - receivedMsgs `shouldBe` [Authenticated (Data "node-1" msg) alice] + receivedMsgs `shouldBe` [Authenticated (Data "node-1" msg') alice] prop "drops already received messages" $ \(messages :: [Positive Int]) -> let receivedMsgs = runSimOrThrow $ do @@ -89,7 +90,7 @@ spec = parallel $ do withReliability nullTracer alice - [bob] + (fromList [alice, bob]) ( \incoming _ -> do forM_ messages $ \(Positive m) -> incoming (Authenticated (ReliableMsg (fromList [0, m]) (Data "node-2" m)) bob) @@ -112,10 +113,10 @@ spec = parallel $ do withReliability nullTracer alice - [bob, carol] + (fromList [alice, bob, carol]) ( \incoming _ -> do - incoming (Authenticated (ReliableMsg (fromList [0, 1, 0]) (Data "node-2" msg)) bob) - incoming (Authenticated (ReliableMsg (fromList [0, 0, 1]) (Data "node-3" msg)) carol) + incoming (Authenticated (ReliableMsg (fromList [0, 1, 0]) (Data "node-2" msg')) bob) + incoming (Authenticated (ReliableMsg (fromList [0, 0, 1]) (Data "node-3" msg')) carol) ) (captureIncoming receivedMessages) $ \_ -> @@ -123,19 +124,18 @@ spec = parallel $ do toList <$> readTVarIO receivedMessages - receivedMsgs `shouldBe` [Authenticated (Data "node-2" msg) bob, Authenticated (Data "node-3" msg) carol] + receivedMsgs `shouldBe` [Authenticated (Data "node-2" msg') bob, Authenticated (Data "node-3" msg') carol] prop "retransmits unacknowledged messages given peer index does not change" $ \(Positive lastMessageKnownToBob) -> forAll (arbitrary `suchThat` (> lastMessageKnownToBob)) $ \totalNumberOfMessages -> let messagesList = Data "node-1" <$> [1 .. totalNumberOfMessages] - msg' = 42 sentMsgs = runSimOrThrow $ do sentMessages <- newTVarIO empty withReliability nullTracer alice - [bob] + (fromList [alice, bob]) ( \incoming action -> do concurrently_ (action $ Network{broadcast = \m -> atomically $ modifyTVar' sentMessages (`snoc` message m)}) @@ -166,7 +166,7 @@ spec = parallel $ do withReliability nullTracer alice - [bob] + (fromList [alice, bob]) ( \incoming action -> do concurrently_ (action $ Network{broadcast = \m -> atomically $ modifyTVar' sentMessages (`snoc` m)})