From 53930a681d430c1c20795c8c287482c729c01056 Mon Sep 17 00:00:00 2001 From: Noon van der Silk Date: Thu, 21 Nov 2024 15:25:50 +0000 Subject: [PATCH 1/2] Hotfixes from PR 1572 --- hydra-node/src/Hydra/API/Server.hs | 25 ++++++------ hydra-node/src/Hydra/API/WSServer.hs | 43 +++++++++++---------- hydra-node/src/Hydra/Network/Reliability.hs | 18 ++++----- 3 files changed, 44 insertions(+), 42 deletions(-) diff --git a/hydra-node/src/Hydra/API/Server.hs b/hydra-node/src/Hydra/API/Server.hs index 5c265beaccd..20825872897 100644 --- a/hydra-node/src/Hydra/API/Server.hs +++ b/hydra-node/src/Hydra/API/Server.hs @@ -7,7 +7,7 @@ import Hydra.Prelude hiding (TVar, readTVar, seq) import Cardano.Ledger.Core (PParams) import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar) import Control.Concurrent.STM.TChan (newBroadcastTChanIO, writeTChan) -import Control.Concurrent.STM.TVar (modifyTVar', newTVarIO) +import Control.Concurrent.STM.TVar (modifyTVar', newTVarIO, readTVar) import Control.Exception (IOException) import Hydra.API.APIServerLog (APIServerLog (..)) import Hydra.API.ClientInput (ClientInput) @@ -27,7 +27,7 @@ import Hydra.API.ServerOutput ( import Hydra.API.ServerOutputFilter ( ServerOutputFilter, ) -import Hydra.API.WSServer (nextSequenceNumber, wsApp) +import Hydra.API.WSServer (wsApp) import Hydra.Cardano.Api (LedgerEra) import Hydra.Chain (Chain (..)) import Hydra.Chain.ChainState (IsChainState) @@ -98,9 +98,12 @@ withAPIServer config env party persistence tracer chain pparams serverOutputFilt headIdP <- mkProjection Nothing (output <$> timedOutputEvents) projectInitializingHeadId pendingDepositsP <- mkProjection [] (output <$> timedOutputEvents) projectPendingDeposits - -- NOTE: we need to reverse the list because we store history in a reversed - -- list in memory but in order on disk - history <- newTVarIO (reverse timedOutputEvents) + nextSeqVar <- newTVarIO 0 + let nextSeq = do + seq <- readTVar nextSeqVar + modifyTVar' nextSeqVar (+ 1) + pure seq + (notifyServerRunning, waitForServerRunning) <- setupServerNotification let serverSettings = @@ -117,7 +120,7 @@ withAPIServer config env party persistence tracer chain pparams serverOutputFilt . simpleCors $ websocketsOr defaultConnectionOptions - (wsApp party tracer history callback headStatusP headIdP snapshotUtxoP responseChannel serverOutputFilter) + (wsApp party tracer nextSeq callback headStatusP headIdP snapshotUtxoP responseChannel serverOutputFilter) (httpApp tracer chain env pparams (atomically $ getLatest commitInfoP) (atomically $ getLatest snapshotUtxoP) (atomically $ getLatest pendingDepositsP) callback) ) ( do @@ -125,7 +128,7 @@ withAPIServer config env party persistence tracer chain pparams serverOutputFilt action $ Server { sendOutput = \output -> do - timedOutput <- appendToHistory history output + timedOutput <- persistOutput nextSeq output atomically $ do update headStatusP output update commitInfoP output @@ -152,13 +155,11 @@ withAPIServer config env party persistence tracer chain pparams serverOutputFilt _ -> runSettings settings app - appendToHistory history output = do + persistOutput nextSeq output = do time <- getCurrentTime timedOutput <- atomically $ do - seq <- nextSequenceNumber history - let timedOutput = TimedServerOutput{output, time, seq} - modifyTVar' history (timedOutput :) - pure timedOutput + seq <- nextSeq + pure TimedServerOutput{output, time, seq} append timedOutput pure timedOutput diff --git a/hydra-node/src/Hydra/API/WSServer.hs b/hydra-node/src/Hydra/API/WSServer.hs index 3e01d49c2c4..4bf57751d1a 100644 --- a/hydra-node/src/Hydra/API/WSServer.hs +++ b/hydra-node/src/Hydra/API/WSServer.hs @@ -3,11 +3,11 @@ module Hydra.API.WSServer where -import Hydra.Prelude hiding (TVar, readTVar, seq) +import Hydra.Prelude hiding (TVar, seq) import Control.Concurrent.STM (TChan, dupTChan, readTChan) import Control.Concurrent.STM qualified as STM -import Control.Concurrent.STM.TVar (TVar, readTVar) +import Control.Concurrent.STM.TVar (TVar) import Data.Aeson qualified as Aeson import Data.Version (showVersion) import Hydra.API.APIServerLog (APIServerLog (..)) @@ -42,7 +42,6 @@ import Network.WebSockets ( acceptRequest, receiveData, sendTextData, - sendTextDatas, withPingThread, ) import Text.URI hiding (ParseException) @@ -53,7 +52,7 @@ wsApp :: IsChainState tx => Party -> Tracer IO APIServerLog -> - TVar [TimedServerOutput tx] -> + STM IO Natural -> (ClientInput tx -> IO ()) -> -- | Read model to enhance 'Greetings' messages with 'HeadStatus'. Projection STM.STM (ServerOutput tx) HeadStatus -> @@ -65,7 +64,7 @@ wsApp :: ServerOutputFilter tx -> PendingConnection -> IO () -wsApp party tracer history callback headStatusP headIdP snapshotUtxoP responseChannel ServerOutputFilter{txContainsAddr} pending = do +wsApp party tracer nextSeq callback headStatusP headIdP snapshotUtxoP responseChannel ServerOutputFilter{txContainsAddr} pending = do traceWith tracer NewAPIConnection let path = requestPath $ pendingRequest pending queryParams <- uriQuery <$> mkURIBs path @@ -74,11 +73,13 @@ wsApp party tracer history callback headStatusP headIdP snapshotUtxoP responseCh let outConfig = mkServerOutputConfig queryParams + -- FIXME: No support of history forwarding anymore (disabled because of memory growing too much) -- api client can decide if they want to see the past history of server outputs - unless (shouldNotServeHistory queryParams) $ - forwardHistory con outConfig + -- unless (shouldNotServeHistory queryParams) $ + -- forwardHistory con outConfig - forwardGreetingOnly con + -- forwardGreetingOnly con + sendGreetings con withPingThread con 30 (pure ()) $ race_ (receiveInputs con) (sendOutputs chan con outConfig) @@ -86,8 +87,8 @@ wsApp party tracer history callback headStatusP headIdP snapshotUtxoP responseCh -- NOTE: We will add a 'Greetings' message on each API server start. This is -- important to make sure the latest configured 'party' is reaching the -- client. - forwardGreetingOnly con = do - seq <- atomically $ nextSequenceNumber history + sendGreetings con = do + seq <- atomically nextSeq headStatus <- atomically getLatestHeadStatus hydraHeadId <- atomically getLatestHeadId snapshotUtxo <- atomically getLatestSnapshotUtxo @@ -134,11 +135,11 @@ wsApp party tracer history callback headStatusP headIdP snapshotUtxoP responseCh (QueryParam key _) | key == [queryKey|address|] -> True _other -> False - shouldNotServeHistory qp = - flip any qp $ \case - (QueryParam key val) - | key == [queryKey|history|] -> val == [queryValue|no|] - _other -> False + -- shouldNotServeHistory qp = + -- flip any qp $ \case + -- (QueryParam key val) + -- | key == [queryKey|history|] -> val == [queryValue|no|] + -- _other -> False sendOutputs chan con outConfig@ServerOutputConfig{addressInTx} = forever $ do response <- STM.atomically $ readTChan chan @@ -161,16 +162,16 @@ wsApp party tracer history callback headStatusP headIdP snapshotUtxoP responseCh -- message to memory let clientInput = decodeUtf8With lenientDecode $ toStrict msg time <- getCurrentTime - seq <- atomically $ nextSequenceNumber history + seq <- atomically nextSeq let timedOutput = TimedServerOutput{output = InvalidInput @tx e clientInput, time, seq} sendTextData con $ Aeson.encode timedOutput traceWith tracer (APIInvalidInput e clientInput) - forwardHistory con ServerOutputConfig{addressInTx} = do - rawHist <- STM.atomically (readTVar history) - let hist = filter (isAddressInTx addressInTx) rawHist - let encodeAndReverse xs serverOutput = Aeson.encode serverOutput : xs - sendTextDatas con $ foldl' encodeAndReverse [] hist + -- forwardHistory con ServerOutputConfig{addressInTx} = do + -- rawHist <- STM.atomically (readTVar history) + -- let hist = filter (isAddressInTx addressInTx) rawHist + -- let encodeAndReverse xs serverOutput = Aeson.encode serverOutput : xs + -- sendTextDatas con $ foldl' encodeAndReverse [] hist isAddressInTx addressInTx tx = case addressInTx of diff --git a/hydra-node/src/Hydra/Network/Reliability.hs b/hydra-node/src/Hydra/Network/Reliability.hs index 36beac9dd4b..96a8d69a350 100644 --- a/hydra-node/src/Hydra/Network/Reliability.hs +++ b/hydra-node/src/Hydra/Network/Reliability.hs @@ -86,7 +86,6 @@ import Cardano.Binary (serialize') import Cardano.Crypto.Util (SignableRepresentation (getSignableRepresentation)) import Control.Concurrent.Class.MonadSTM ( MonadSTM (readTQueue, writeTQueue), - modifyTVar', newTQueueIO, newTVarIO, readTVarIO, @@ -94,7 +93,6 @@ import Control.Concurrent.Class.MonadSTM ( ) import Control.Tracer (Tracer) import Data.IntMap qualified as IMap -import Data.Sequence.Strict ((|>)) import Data.Sequence.Strict qualified as Seq import Data.Vector ( Vector, @@ -221,7 +219,7 @@ withReliability :: -- | Underlying network component providing consuming and sending channels. NetworkComponent m (Authenticated (ReliableMsg (Heartbeat inbound))) (ReliableMsg (Heartbeat outbound)) a -> NetworkComponent m (Authenticated (Heartbeat inbound)) (Heartbeat outbound) a -withReliability tracer MessagePersistence{saveAcks, loadAcks, appendMessage, loadMessages} me otherParties withRawNetwork callback action = do +withReliability tracer MessagePersistence{saveAcks, loadAcks, loadMessages} me otherParties withRawNetwork callback action = do acksCache <- loadAcks >>= newTVarIO sentMessages <- loadMessages >>= newTVarIO . Seq.fromList resendQ <- newTQueueIO @@ -235,15 +233,17 @@ withReliability tracer MessagePersistence{saveAcks, loadAcks, appendMessage, loa allParties = fromList $ sort $ me : otherParties - reliableBroadcast sentMessages ourIndex acksCache Network{broadcast} = + reliableBroadcast _sentMessages ourIndex acksCache Network{broadcast} = action $ Network { broadcast = \msg -> case msg of Data{} -> do - localCounter <- atomically $ cacheMessage msg >> incrementAckCounter - saveAcks localCounter - appendMessage msg + -- localCounter <- atomically $ cacheMessage msg >> incrementAckCounter + -- saveAcks localCounter + -- appendMessage msg + localCounter <- atomically $ do + incrementAckCounter traceWith tracer BroadcastCounter{ourIndex, localCounter} broadcast $ ReliableMsg localCounter msg Ping{} -> do @@ -259,8 +259,8 @@ withReliability tracer MessagePersistence{saveAcks, loadAcks, appendMessage, loa writeTVar acksCache newAcks pure newAcks - cacheMessage msg = - modifyTVar' sentMessages (|> msg) + -- cacheMessage msg = + -- modifyTVar' sentMessages (|> msg) reliableCallback acksCache sentMessages resend ourIndex = NetworkCallback $ \(Authenticated (ReliableMsg acknowledged payload) party) -> do From 85a0c1331d9dca29a6d4390eefd7e137eaf36401 Mon Sep 17 00:00:00 2001 From: Noon van der Silk Date: Thu, 21 Nov 2024 15:44:55 +0000 Subject: [PATCH 2/2] Unsafe persistence as well --- hydra-node/src/Hydra/Persistence.hs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/hydra-node/src/Hydra/Persistence.hs b/hydra-node/src/Hydra/Persistence.hs index ad89ce7e4d9..6fac32b1ef2 100644 --- a/hydra-node/src/Hydra/Persistence.hs +++ b/hydra-node/src/Hydra/Persistence.hs @@ -12,7 +12,7 @@ import Data.ByteString qualified as BS import Data.ByteString.Char8 qualified as C8 import System.Directory (createDirectoryIfMissing, doesFileExist) import System.FilePath (takeDirectory) -import UnliftIO.IO.File (withBinaryFile, writeBinaryFileDurableAtomic) +import UnliftIO.IO.File (withBinaryFile, writeBinaryFile) data PersistenceException = PersistenceException String @@ -37,7 +37,8 @@ createPersistence fp = do pure $ Persistence { save = \a -> do - writeBinaryFileDurableAtomic fp . toStrict $ Aeson.encode a + -- FIXME: this is not atomic / durable + writeBinaryFile fp . toStrict $ Aeson.encode a , load = liftIO (doesFileExist fp) >>= \case False -> pure Nothing