Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Explore: remove message persistence from network layer #1593

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 4 additions & 8 deletions hydra-node/src/Hydra/Network/Reliability.hs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ import Data.Vector (
zipWith,
(!?),
)
import Data.Vector qualified as Vector
import Hydra.Logging (traceWith)
import Hydra.Network (Network (..), NetworkComponent)
import Hydra.Network.Authenticate (Authenticated (..))
Expand Down Expand Up @@ -216,18 +217,16 @@ withReliability ::
(MonadThrow (STM m), MonadThrow m, MonadAsync m) =>
-- | Tracer for logging messages.
Tracer m ReliabilityLog ->
-- | Our persistence handle
MessagePersistence m outbound ->
-- | Our own party identifier.
Party ->
-- | Other parties' identifiers.
[Party] ->
-- | Underlying network component providing consuming and sending channels.
NetworkComponent m (Authenticated (ReliableMsg (Heartbeat inbound))) (ReliableMsg (Heartbeat outbound)) a ->
NetworkComponent m (Authenticated (Heartbeat inbound)) (Heartbeat outbound) a
withReliability tracer MessagePersistence{saveAcks, loadAcks, appendMessage, loadMessages} me otherParties withRawNetwork callback action = do
acksCache <- loadAcks >>= newTVarIO
sentMessages <- loadMessages >>= newTVarIO . Seq.fromList
withReliability tracer me otherParties withRawNetwork callback action = do
acksCache <- newTVarIO Vector.empty
sentMessages <- newTVarIO Seq.empty
resendQ <- newTQueueIO
let ourIndex = fromMaybe (error "This cannot happen because we constructed the list with our party inside.") (findPartyIndex me)
let resend = writeTQueue resendQ
Expand All @@ -243,13 +242,10 @@ withReliability tracer MessagePersistence{saveAcks, loadAcks, appendMessage, loa
case msg of
Data{} -> do
localCounter <- atomically $ cacheMessage msg >> incrementAckCounter
saveAcks localCounter
appendMessage msg
traceWith tracer BroadcastCounter{ourIndex, localCounter}
broadcast $ ReliableMsg localCounter msg
Ping{} -> do
localCounter <- readTVarIO acksCache
saveAcks localCounter
traceWith tracer BroadcastPing{ourIndex, localCounter}
broadcast $ ReliableMsg localCounter msg
}
Expand Down
7 changes: 2 additions & 5 deletions hydra-node/src/Hydra/Node/Network.hs
Original file line number Diff line number Diff line change
Expand Up @@ -132,12 +132,9 @@ withNetwork ::
withNetwork tracer configuration callback action = do
let localHost = Host{hostname = show host, port}
me = deriveParty signingKey
numberOfParties = length $ me : otherParties
messagePersistence <- configureMessagePersistence (contramap Node tracer) persistenceDir numberOfParties

let reliability =
withFlipHeartbeats $
withReliability (contramap Reliability tracer) messagePersistence me otherParties $
withReliability (contramap Reliability tracer) me otherParties $
withAuthentication (contramap Authentication tracer) signingKey otherParties $
withOuroborosNetwork
(contramap Network tracer)
Expand All @@ -153,7 +150,7 @@ withNetwork tracer configuration callback action = do
withHeartbeat nodeId reliability (callback . mapHeartbeat) $ \network ->
action network
where
NetworkConfiguration{persistenceDir, signingKey, otherParties, host, port, peers, nodeId} = configuration
NetworkConfiguration{signingKey, otherParties, host, port, peers, nodeId} = configuration

mapHeartbeat :: Either Connectivity (Authenticated (Message tx)) -> NetworkEvent (Message tx)
mapHeartbeat = \case
Expand Down
75 changes: 5 additions & 70 deletions hydra-node/test/Hydra/Network/ReliabilitySpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,6 @@ import Hydra.Network.Heartbeat (Heartbeat (..), withHeartbeat)
import Hydra.Network.Message (Connectivity)
import Hydra.Network.Reliability (MessagePersistence (..), ReliabilityLog (..), ReliableMsg (..), withReliability)
import Hydra.Node.Network (withFlipHeartbeats)
import Hydra.Persistence (
Persistence (..),
PersistenceIncremental (..),
createPersistence,
createPersistenceIncremental,
)
import System.Directory (doesFileExist)
import System.FilePath ((</>))
import System.Random (mkStdGen, uniformR)
import Test.Hydra.Fixture (alice, bob, carol)
import Test.QuickCheck (
Expand Down Expand Up @@ -99,9 +91,8 @@ spec = parallel $ do
prop "broadcast messages to the network assigning a sequential id" $ \(messages :: [String]) ->
let sentMsgs = runSimOrThrow $ do
sentMessages <- newTVarIO empty
persistence <- mockMessagePersistence 1

withReliability nullTracer persistence alice [] (captureOutgoing sentMessages) noop $ \Network{broadcast} -> do
withReliability nullTracer alice [] (captureOutgoing sentMessages) noop $ \Network{broadcast} -> do
mapM_ (broadcast . Data "node-1") messages

fromList . Vector.toList <$> readTVarIO sentMessages
Expand All @@ -119,16 +110,14 @@ spec = parallel $ do
randomSeed <- newTVarIO $ mkStdGen seed
aliceToBob <- newTQueueIO
bobToAlice <- newTQueueIO
alicePersistence <- mockMessagePersistence 2
bobPersistence <- mockMessagePersistence 2
let
-- this is a NetworkComponent that broadcasts authenticated messages
-- mediated through a read and a write TQueue but drops 0.2 % of them
aliceFailingNetwork = failingNetwork randomSeed alice (bobToAlice, aliceToBob)
bobFailingNetwork = failingNetwork randomSeed bob (aliceToBob, bobToAlice)

bobReliabilityStack = reliabilityStack bobPersistence bobFailingNetwork (captureTraces emittedTraces) "bob" bob [alice]
aliceReliabilityStack = reliabilityStack alicePersistence aliceFailingNetwork (captureTraces emittedTraces) "alice" alice [bob]
bobReliabilityStack = reliabilityStack bobFailingNetwork (captureTraces emittedTraces) "bob" bob [alice]
aliceReliabilityStack = reliabilityStack aliceFailingNetwork (captureTraces emittedTraces) "alice" alice [bob]

runAlice = runPeer aliceReliabilityStack "alice" messagesReceivedByAlice messagesReceivedByBob aliceToBobMessages bobToAliceMessages
runBob = runPeer bobReliabilityStack "bob" messagesReceivedByBob messagesReceivedByAlice bobToAliceMessages aliceToBobMessages
Expand All @@ -150,10 +139,8 @@ spec = parallel $ do
it "broadcast updates counter from peers" $ do
let receivedMsgs = runSimOrThrow $ do
sentMessages <- newTVarIO empty
alicePersistence <- mockMessagePersistence 2
withReliability
nullTracer
alicePersistence
alice
[bob]
( \incoming action -> do
Expand All @@ -168,51 +155,6 @@ spec = parallel $ do
Vector.toList <$> readTVarIO sentMessages

receivedMsgs `shouldBe` [ReliableMsg (fromList [1, 1]) (Data "node-1" msg)]

it "appends messages to disk and can load them back" $ do
withTempDir "network-messages-persistence" $ \tmpDir -> do
let networkMessagesFile = tmpDir <> "/network-messages"

Persistence{load, save} <- createPersistence $ tmpDir <> "/acks"
PersistenceIncremental{loadAll, append} <- createPersistenceIncremental networkMessagesFile

let messagePersistence =
MessagePersistence
{ loadAcks = do
mloaded <- load
case mloaded of
Nothing -> pure $ replicate (length [alice, bob]) 0
Just acks -> pure acks
, saveAcks = save
, loadMessages = loadAll
, appendMessage = append
}

receivedMsgs <- do
sentMessages <- newTVarIO empty
withReliability
nullTracer
messagePersistence
alice
[bob]
( \incoming action -> do
concurrently_
(action $ Network{broadcast = \m -> atomically $ modifyTVar' sentMessages (`snoc` m)})
(incoming (Authenticated (ReliableMsg (fromList [0, 1]) (Data "node-2" msg)) bob))
)
noop
$ \Network{broadcast} -> do
threadDelay 1
broadcast (Data "node-1" msg)
Vector.toList <$> readTVarIO sentMessages

receivedMsgs `shouldBe` [ReliableMsg (fromList [1, 1]) (Data "node-1" msg)]

doesFileExist networkMessagesFile `shouldReturn` True
reloadAll networkMessagesFile `shouldReturn` [Data "node-1" msg]

doesFileExist (tmpDir </> "acks") `shouldReturn` True
load `shouldReturn` Just (fromList [1, 1])
where
runPeer reliability partyName receivedMessageContainer sentMessageContainer messagesToSend expectedMessages =
reliability (capturePayload receivedMessageContainer) $ \Network{broadcast} -> do
Expand All @@ -224,10 +166,10 @@ spec = parallel $ do
(waitForAllMessages expectedMessages receivedMessageContainer)
(waitForAllMessages messagesToSend sentMessageContainer)

reliabilityStack persistence underlyingNetwork tracer nodeId party peers =
reliabilityStack underlyingNetwork tracer nodeId party peers =
withHeartbeat nodeId $
withFlipHeartbeats $
withReliability tracer persistence party peers underlyingNetwork
withReliability tracer party peers underlyingNetwork

failingNetwork seed peer (readQueue, writeQueue) callback action =
withAsync
Expand All @@ -249,25 +191,18 @@ spec = parallel $ do
writeTVar seed' newGenSeed
pure res

reloadAll :: FilePath -> IO [Heartbeat (Heartbeat String)]
reloadAll fileName =
createPersistenceIncremental fileName
>>= \PersistenceIncremental{loadAll} -> loadAll

noop :: Monad m => b -> m ()
noop = const $ pure ()

aliceReceivesMessages :: [Authenticated (ReliableMsg (Heartbeat msg))] -> [Authenticated (Heartbeat msg)]
aliceReceivesMessages messages = runSimOrThrow $ do
receivedMessages <- newTVarIO empty
alicePersistence <- mockMessagePersistence 3

let baseNetwork incoming _ = mapM incoming messages

aliceReliabilityStack =
withReliability
nullTracer
alicePersistence
alice
[bob, carol]
baseNetwork
Expand Down
Loading