From ff34c7929a173881bd77d8e671b26857adfbba65 Mon Sep 17 00:00:00 2001 From: Sasha Bogicevic Date: Fri, 15 Sep 2023 17:35:57 +0200 Subject: [PATCH] Re-read acks upon sending After possible callback local acks might change so we need to re-read them from a TVar. Add a party in the BroadcastCounter to improve logging. --- hydra-node/src/Hydra/Network/Reliability.hs | 45 +++++++++++---------- 1 file changed, 23 insertions(+), 22 deletions(-) diff --git a/hydra-node/src/Hydra/Network/Reliability.hs b/hydra-node/src/Hydra/Network/Reliability.hs index 51a06c0d2b3..f9d0ba5e574 100644 --- a/hydra-node/src/Hydra/Network/Reliability.hs +++ b/hydra-node/src/Hydra/Network/Reliability.hs @@ -122,7 +122,7 @@ withReliability tracer us allParties withRawNetwork callback action = do modifyTVar' sentMessages (`snoc` msg) readTVar ackCounter - traceWith tracer (BroadcastCounter ackCounter') + traceWith tracer (BroadcastCounter us ackCounter') broadcast $ Authenticated (Msg ackCounter' msg) us } @@ -142,31 +142,32 @@ withReliability tracer us allParties withRawNetwork callback action = do atomically $ writeTVar ackCounter newAcks callback (Authenticated msg party) + -- read our local acks again since they might be updated above + existingAcks' <- readTVarIO ackCounter -- resend messages if party did not acknowledge our latest idx let myIndex = fromJust $ elemIndex us allParties let acked = acks ! myIndex - let latestMsgAck = (! myIndex) existingAcks + let latestMsgAck = (! myIndex) existingAcks' when (acked < latestMsgAck) $ do let missing = fromList [acked + 1 .. latestMsgAck] - traceWith tracer (Resending missing acks existingAcks party) - atomically $ do - messages <- readTVar sentMessages - forM_ missing $ \idx -> do - case messages !? (idx - 1) of - Nothing -> - throwIO $ - ReliabilityFailedToFindMsg $ - "FIXME: this should never happen, there's no sent message at index " - <> show idx - <> ", messages length = " - <> show (length messages) - <> ", latest message ack: " - <> show latestMsgAck - <> ", acked: " - <> show acked - Just missingMsg -> do - let newAcks = zipWith (\ack i -> if i == myIndex then idx else ack) existingAcks partyIndexes - resend $ Authenticated (Msg newAcks missingMsg) us + messages <- readTVarIO sentMessages + forM_ missing $ \idx -> do + case messages !? (idx - 1) of + Nothing -> + throwIO $ + ReliabilityFailedToFindMsg $ + "FIXME: this should never happen, there's no sent message at index " + <> show idx + <> ", messages length = " + <> show (length messages) + <> ", latest message ack: " + <> show latestMsgAck + <> ", acked: " + <> show acked + Just missingMsg -> do + let newAcks' = zipWith (\ack i -> if i == myIndex then idx else ack) existingAcks' partyIndexes + traceWith tracer (Resending missing acks newAcks' party) + atomically $ resend $ Authenticated (Msg newAcks' missingMsg) us partyIndexes = generate (length allParties) id @@ -175,7 +176,7 @@ withReliability tracer us allParties withRawNetwork callback action = do data ReliabilityLog = Resending {missing :: Vector Int, acknowledged :: Vector Int, localCounter :: Vector Int, party :: Party} - | BroadcastCounter {localCounter :: Vector Int} + | BroadcastCounter {party :: Party, localCounter :: Vector Int} deriving stock (Show, Eq, Generic) deriving anyclass (ToJSON, FromJSON)