Skip to content

Commit

Permalink
Re-read acks upon sending
Browse files Browse the repository at this point in the history
After possible callback local acks might change so we need to re-read
them from a TVar.

Add a party in the BroadcastCounter to improve logging.
  • Loading branch information
v0d1ch committed Sep 15, 2023
1 parent a3562c5 commit ff34c79
Showing 1 changed file with 23 additions and 22 deletions.
45 changes: 23 additions & 22 deletions hydra-node/src/Hydra/Network/Reliability.hs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ withReliability tracer us allParties withRawNetwork callback action = do
modifyTVar' sentMessages (`snoc` msg)
readTVar ackCounter

traceWith tracer (BroadcastCounter ackCounter')
traceWith tracer (BroadcastCounter us ackCounter')

broadcast $ Authenticated (Msg ackCounter' msg) us
}
Expand All @@ -142,31 +142,32 @@ withReliability tracer us allParties withRawNetwork callback action = do
atomically $ writeTVar ackCounter newAcks
callback (Authenticated msg party)

-- read our local acks again since they might be updated above
existingAcks' <- readTVarIO ackCounter
-- resend messages if party did not acknowledge our latest idx
let myIndex = fromJust $ elemIndex us allParties
let acked = acks ! myIndex
let latestMsgAck = (! myIndex) existingAcks
let latestMsgAck = (! myIndex) existingAcks'
when (acked < latestMsgAck) $ do
let missing = fromList [acked + 1 .. latestMsgAck]
traceWith tracer (Resending missing acks existingAcks party)
atomically $ do
messages <- readTVar sentMessages
forM_ missing $ \idx -> do
case messages !? (idx - 1) of
Nothing ->
throwIO $
ReliabilityFailedToFindMsg $
"FIXME: this should never happen, there's no sent message at index "
<> show idx
<> ", messages length = "
<> show (length messages)
<> ", latest message ack: "
<> show latestMsgAck
<> ", acked: "
<> show acked
Just missingMsg -> do
let newAcks = zipWith (\ack i -> if i == myIndex then idx else ack) existingAcks partyIndexes
resend $ Authenticated (Msg newAcks missingMsg) us
messages <- readTVarIO sentMessages
forM_ missing $ \idx -> do
case messages !? (idx - 1) of
Nothing ->
throwIO $
ReliabilityFailedToFindMsg $
"FIXME: this should never happen, there's no sent message at index "
<> show idx
<> ", messages length = "
<> show (length messages)
<> ", latest message ack: "
<> show latestMsgAck
<> ", acked: "
<> show acked
Just missingMsg -> do
let newAcks' = zipWith (\ack i -> if i == myIndex then idx else ack) existingAcks' partyIndexes
traceWith tracer (Resending missing acks newAcks' party)
atomically $ resend $ Authenticated (Msg newAcks' missingMsg) us

partyIndexes = generate (length allParties) id

Expand All @@ -175,7 +176,7 @@ withReliability tracer us allParties withRawNetwork callback action = do

data ReliabilityLog
= Resending {missing :: Vector Int, acknowledged :: Vector Int, localCounter :: Vector Int, party :: Party}
| BroadcastCounter {localCounter :: Vector Int}
| BroadcastCounter {party :: Party, localCounter :: Vector Int}
deriving stock (Show, Eq, Generic)
deriving anyclass (ToJSON, FromJSON)

Expand Down

0 comments on commit ff34c79

Please sign in to comment.