Skip to content

Commit

Permalink
Use Map for messages
Browse files Browse the repository at this point in the history
We want to be able to delete old messages but keep the indices growing.
Is there a better data structure for this?
  • Loading branch information
v0d1ch committed Sep 14, 2023
1 parent a3562c5 commit 9b2b760
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 20 deletions.
19 changes: 12 additions & 7 deletions hydra-node/src/Hydra/Network/Reliability.hs
Original file line number Diff line number Diff line change
Expand Up @@ -46,19 +46,17 @@ import Control.Concurrent.Class.MonadSTM (
writeTVar,
)
import Control.Tracer (Tracer)
import qualified Data.Map.Strict as Map
import Data.Maybe (fromJust)
import Data.Vector (
Vector,
elemIndex,
empty,
fromList,
generate,
length,
replicate,
snoc,
zipWith,
(!),
(!?),
)
import Hydra.Logging (traceWith)
import Hydra.Network (Network (..), NetworkComponent)
Expand Down Expand Up @@ -103,7 +101,7 @@ withReliability ::
NetworkComponent m (Authenticated msg) (Authenticated msg) a
withReliability tracer us allParties withRawNetwork callback action = do
ackCounter <- newTVarIO $ replicate (length allParties) 0
sentMessages <- newTVarIO empty
sentMessages <- newTVarIO Map.empty
resendQ <- newTQueueIO
let resend = writeTQueue resendQ
withRawNetwork (reliableCallback ackCounter sentMessages resend) $ \network@Network{broadcast} -> do
Expand All @@ -119,7 +117,7 @@ withReliability tracer us allParties withRawNetwork callback action = do
let ourIndex = fromJust $ elemIndex us allParties
let newAcks = constructAcks acks ourIndex
writeTVar ackCounter newAcks
modifyTVar' sentMessages (`snoc` msg)
modifyTVar' sentMessages (mapInsert msg)
readTVar ackCounter

traceWith tracer (BroadcastCounter ackCounter')
Expand Down Expand Up @@ -152,14 +150,14 @@ withReliability tracer us allParties withRawNetwork callback action = do
atomically $ do
messages <- readTVar sentMessages
forM_ missing $ \idx -> do
case messages !? (idx - 1) of
case messages Map.!? (idx - 1) of
Nothing ->
throwIO $
ReliabilityFailedToFindMsg $
"FIXME: this should never happen, there's no sent message at index "
<> show idx
<> ", messages length = "
<> show (length messages)
<> show (Map.size messages)
<> ", latest message ack: "
<> show latestMsgAck
<> ", acked: "
Expand All @@ -173,6 +171,13 @@ withReliability tracer us allParties withRawNetwork callback action = do
constructAcks acks wantedIndex =
zipWith (\ack i -> if i == wantedIndex then ack + 1 else ack) acks partyIndexes

mapInsert :: msg -> Map Int msg -> Map Int msg
mapInsert msg m =
case Map.lookupMax m of
Nothing -> Map.insert 1 msg m
Just (lastIndex, _) ->
Map.insert (lastIndex + 1) msg m

data ReliabilityLog
= Resending {missing :: Vector Int, acknowledged :: Vector Int, localCounter :: Vector Int, party :: Party}
| BroadcastCounter {localCounter :: Vector Int}
Expand Down
27 changes: 14 additions & 13 deletions hydra-node/test/Hydra/Network/ReliabilitySpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,28 @@ import Test.Hydra.Prelude
import Control.Concurrent.Class.MonadSTM (MonadSTM (readTQueue, readTVarIO, writeTQueue), modifyTVar', newTQueueIO, newTVarIO)
import Control.Monad.IOSim (runSimOrThrow)
import Control.Tracer (nullTracer)
import qualified Data.Map.Strict as Map
import qualified Data.Set as Set
import Data.Vector (empty, fromList, head, snoc)
import Data.Vector (fromList, head)
import Hydra.Network (Network (..))
import Hydra.Network.Authenticate (Authenticated (..))
import Hydra.Network.Reliability (Msg (..), withReliability)
import Hydra.Network.Reliability (Msg (..), mapInsert, withReliability)
import Test.Hydra.Fixture (alice, bob, carol)
import Test.QuickCheck (Positive (Positive), collect, counterexample, forAll, generate, suchThat, tabulate)

spec :: Spec
spec = parallel $ do
let captureOutgoing msgqueue _cb action =
action $ Network{broadcast = \msg -> atomically $ modifyTVar' msgqueue (`snoc` msg)}
action $ Network{broadcast = atomically . modifyTVar' msgqueue . mapInsert}

captureIncoming receivedMessages msg =
atomically $ modifyTVar' receivedMessages (`snoc` msg)
atomically $ modifyTVar' receivedMessages (mapInsert msg)

msg <- runIO $ generate @String arbitrary

it "forward received messages" $ do
let receivedMsgs = runSimOrThrow $ do
receivedMessages <- newTVarIO empty
receivedMessages <- newTVarIO Map.empty

withReliability
nullTracer
Expand All @@ -47,7 +48,7 @@ spec = parallel $ do

prop "broadcast messages to the network assigning a sequential id" $ \(messages :: [String]) ->
let sentMsgs = runSimOrThrow $ do
sentMessages <- newTVarIO empty
sentMessages <- newTVarIO Map.empty

withReliability nullTracer alice (fromList [alice]) (captureOutgoing sentMessages) noop $ \Network{broadcast} -> do
mapM_ (\m -> broadcast (Authenticated m alice)) messages
Expand All @@ -57,7 +58,7 @@ spec = parallel $ do

it "broadcasts messages to single connected peer" $ do
let receivedMsgs = runSimOrThrow $ do
receivedMessages <- newTVarIO empty
receivedMessages <- newTVarIO Map.empty
queue <- newTQueueIO

let aliceNetwork _ action = do
Expand All @@ -83,7 +84,7 @@ spec = parallel $ do

prop "drops already received messages" $ \(messages :: [Positive Int]) ->
let receivedMsgs = runSimOrThrow $ do
receivedMessages <- newTVarIO empty
receivedMessages <- newTVarIO Map.empty

withReliability
nullTracer
Expand All @@ -106,7 +107,7 @@ spec = parallel $ do

it "do not drop messages with same ids from different peers" $ do
let receivedMsgs = runSimOrThrow $ do
receivedMessages <- newTVarIO empty
receivedMessages <- newTVarIO Map.empty

withReliability
nullTracer
Expand All @@ -128,15 +129,15 @@ spec = parallel $ do
forAll (arbitrary `suchThat` (> lastMessageKnownToBob)) $ \totalNumberOfMessages ->
let messagesList = show <$> [1 .. totalNumberOfMessages]
sentMsgs = runSimOrThrow $ do
sentMessages <- newTVarIO empty
sentMessages <- newTVarIO Map.empty

withReliability
nullTracer
alice
(fromList [alice, bob])
( \incoming action -> do
concurrently_
(action $ Network{broadcast = \m -> atomically $ modifyTVar' sentMessages (`snoc` message (payload m))})
(action $ Network{broadcast = atomically . modifyTVar' sentMessages . mapInsert . message . payload})
(threadDelay 2 >> incoming (Authenticated (Msg (fromList [lastMessageKnownToBob, 1]) msg) bob))
)
noop
Expand All @@ -156,14 +157,14 @@ spec = parallel $ do

it "broadcast updates counter from peers" $ do
let receivedMsgs = runSimOrThrow $ do
sentMessages <- newTVarIO empty
sentMessages <- newTVarIO Map.empty
withReliability
nullTracer
alice
(fromList [alice, bob])
( \incoming action -> do
concurrently_
(action $ Network{broadcast = \m -> atomically $ modifyTVar' sentMessages (`snoc` payload m)})
(action $ Network{broadcast = atomically . modifyTVar' sentMessages . mapInsert . payload})
(incoming (Authenticated (Msg (fromList [0, 1]) msg) bob))
)
noop
Expand Down

0 comments on commit 9b2b760

Please sign in to comment.