Skip to content

Commit

Permalink
Use Map to store sent messages
Browse files Browse the repository at this point in the history
  • Loading branch information
v0d1ch committed Sep 21, 2023
1 parent 2365cdf commit 5139a31
Showing 1 changed file with 9 additions and 7 deletions.
16 changes: 9 additions & 7 deletions hydra-node/src/Hydra/Network/Reliability.hs
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,12 @@ import qualified Data.Map.Strict as Map
import Data.Vector (
Vector,
elemIndex,
empty,
fromList,
generate,
length,
replicate,
snoc,
zipWith,
(!),
(!?),
)
import Hydra.Logging (traceWith)
import Hydra.Network (Network (..), NetworkComponent)
Expand Down Expand Up @@ -152,7 +149,7 @@ withReliability ::
NetworkComponent m (Authenticated (Heartbeat msg)) (Heartbeat msg) a
withReliability tracer me otherParties withRawNetwork callback action = do
ackCounter <- newTVarIO $ replicate (length allParties) 0
sentMessages <- newTVarIO empty
sentMessages <- newTVarIO Map.empty
seenMessages <- newTVarIO $ Map.fromList $ (,0) <$> toList allParties
resendQ <- newTQueueIO
ourIndex <- findPartyIndex me
Expand All @@ -174,7 +171,7 @@ withReliability tracer me otherParties withRawNetwork callback action = do
acks <- readTVar ackCounter
let newAcks = constructAcks acks ourIndex
writeTVar ackCounter newAcks
modifyTVar' sentMessages (`snoc` msg)
modifyTVar' sentMessages (insertNewMsg msg)
readTVar ackCounter

traceWith tracer (BroadcastCounter ourIndex ackCounter')
Expand Down Expand Up @@ -241,14 +238,14 @@ withReliability tracer me otherParties withRawNetwork callback action = do
let missing = fromList [messageAckForUs + 1 .. knownAckForUs]
messages <- readTVarIO sentMessages
forM_ missing $ \idx -> do
case messages !? (idx - 1) of
case messages Map.!? (idx - 1) of
Nothing ->
throwIO $
ReliabilityFailedToFindMsg $
"FIXME: this should never happen, there's no sent message at index "
<> show idx
<> ", messages length = "
<> show (length messages)
<> show (Map.size messages)
<> ", latest message ack: "
<> show knownAckForUs
<> ", acked: "
Expand All @@ -270,6 +267,11 @@ withReliability tracer me otherParties withRawNetwork callback action = do
-- modifyTVar' sentMessages (Map.delete (const $ Just messageAckForUs) party)
traceWith tracer (ClearedMessageQueue queueLength deleted)

insertNewMsg msg m =
case Map.lookupMax m of
Nothing -> Map.insert 1 msg m
Just (k, _) -> Map.insert (k + 1) msg m

-- find the index of a party in the list of parties or fail with 'ReliabilityMissingPartyIndex'
findPartyIndex party =
maybe (throwIO $ ReliabilityMissingPartyIndex party) pure $ elemIndex party allParties

0 comments on commit 5139a31

Please sign in to comment.