Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*DO NOT MERGE* Doom hotfixes #1745

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 13 additions & 12 deletions hydra-node/src/Hydra/API/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import Hydra.Prelude hiding (TVar, readTVar, seq)
import Cardano.Ledger.Core (PParams)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.STM.TChan (newBroadcastTChanIO, writeTChan)
import Control.Concurrent.STM.TVar (modifyTVar', newTVarIO)
import Control.Concurrent.STM.TVar (modifyTVar', newTVarIO, readTVar)
import Control.Exception (IOException)
import Hydra.API.APIServerLog (APIServerLog (..))
import Hydra.API.ClientInput (ClientInput)
Expand All @@ -27,7 +27,7 @@ import Hydra.API.ServerOutput (
import Hydra.API.ServerOutputFilter (
ServerOutputFilter,
)
import Hydra.API.WSServer (nextSequenceNumber, wsApp)
import Hydra.API.WSServer (wsApp)
import Hydra.Cardano.Api (LedgerEra)
import Hydra.Chain (Chain (..))
import Hydra.Chain.ChainState (IsChainState)
Expand Down Expand Up @@ -98,9 +98,12 @@ withAPIServer config env party persistence tracer chain pparams serverOutputFilt
headIdP <- mkProjection Nothing (output <$> timedOutputEvents) projectInitializingHeadId
pendingDepositsP <- mkProjection [] (output <$> timedOutputEvents) projectPendingDeposits

-- NOTE: we need to reverse the list because we store history in a reversed
-- list in memory but in order on disk
history <- newTVarIO (reverse timedOutputEvents)
nextSeqVar <- newTVarIO 0
let nextSeq = do
seq <- readTVar nextSeqVar
modifyTVar' nextSeqVar (+ 1)
pure seq

(notifyServerRunning, waitForServerRunning) <- setupServerNotification

let serverSettings =
Expand All @@ -117,15 +120,15 @@ withAPIServer config env party persistence tracer chain pparams serverOutputFilt
. simpleCors
$ websocketsOr
defaultConnectionOptions
(wsApp party tracer history callback headStatusP headIdP snapshotUtxoP responseChannel serverOutputFilter)
(wsApp party tracer nextSeq callback headStatusP headIdP snapshotUtxoP responseChannel serverOutputFilter)
(httpApp tracer chain env pparams (atomically $ getLatest commitInfoP) (atomically $ getLatest snapshotUtxoP) (atomically $ getLatest pendingDepositsP) callback)
)
( do
waitForServerRunning
action $
Server
{ sendOutput = \output -> do
timedOutput <- appendToHistory history output
timedOutput <- persistOutput nextSeq output
atomically $ do
update headStatusP output
update commitInfoP output
Expand All @@ -152,13 +155,11 @@ withAPIServer config env party persistence tracer chain pparams serverOutputFilt
_ ->
runSettings settings app

appendToHistory history output = do
persistOutput nextSeq output = do
time <- getCurrentTime
timedOutput <- atomically $ do
seq <- nextSequenceNumber history
let timedOutput = TimedServerOutput{output, time, seq}
modifyTVar' history (timedOutput :)
pure timedOutput
seq <- nextSeq
pure TimedServerOutput{output, time, seq}
append timedOutput
pure timedOutput

Expand Down
43 changes: 22 additions & 21 deletions hydra-node/src/Hydra/API/WSServer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@

module Hydra.API.WSServer where

import Hydra.Prelude hiding (TVar, readTVar, seq)
import Hydra.Prelude hiding (TVar, seq)

import Control.Concurrent.STM (TChan, dupTChan, readTChan)
import Control.Concurrent.STM qualified as STM
import Control.Concurrent.STM.TVar (TVar, readTVar)
import Control.Concurrent.STM.TVar (TVar)
import Data.Aeson qualified as Aeson
import Data.Version (showVersion)
import Hydra.API.APIServerLog (APIServerLog (..))
Expand Down Expand Up @@ -42,7 +42,6 @@ import Network.WebSockets (
acceptRequest,
receiveData,
sendTextData,
sendTextDatas,
withPingThread,
)
import Text.URI hiding (ParseException)
Expand All @@ -53,7 +52,7 @@ wsApp ::
IsChainState tx =>
Party ->
Tracer IO APIServerLog ->
TVar [TimedServerOutput tx] ->
STM IO Natural ->
(ClientInput tx -> IO ()) ->
-- | Read model to enhance 'Greetings' messages with 'HeadStatus'.
Projection STM.STM (ServerOutput tx) HeadStatus ->
Expand All @@ -65,7 +64,7 @@ wsApp ::
ServerOutputFilter tx ->
PendingConnection ->
IO ()
wsApp party tracer history callback headStatusP headIdP snapshotUtxoP responseChannel ServerOutputFilter{txContainsAddr} pending = do
wsApp party tracer nextSeq callback headStatusP headIdP snapshotUtxoP responseChannel ServerOutputFilter{txContainsAddr} pending = do
traceWith tracer NewAPIConnection
let path = requestPath $ pendingRequest pending
queryParams <- uriQuery <$> mkURIBs path
Expand All @@ -74,20 +73,22 @@ wsApp party tracer history callback headStatusP headIdP snapshotUtxoP responseCh

let outConfig = mkServerOutputConfig queryParams

-- FIXME: No support of history forwarding anymore (disabled because of memory growing too much)
-- api client can decide if they want to see the past history of server outputs
unless (shouldNotServeHistory queryParams) $
forwardHistory con outConfig
-- unless (shouldNotServeHistory queryParams) $
-- forwardHistory con outConfig

forwardGreetingOnly con
-- forwardGreetingOnly con
sendGreetings con

withPingThread con 30 (pure ()) $
race_ (receiveInputs con) (sendOutputs chan con outConfig)
where
-- NOTE: We will add a 'Greetings' message on each API server start. This is
-- important to make sure the latest configured 'party' is reaching the
-- client.
forwardGreetingOnly con = do
seq <- atomically $ nextSequenceNumber history
sendGreetings con = do
seq <- atomically nextSeq
headStatus <- atomically getLatestHeadStatus
hydraHeadId <- atomically getLatestHeadId
snapshotUtxo <- atomically getLatestSnapshotUtxo
Expand Down Expand Up @@ -134,11 +135,11 @@ wsApp party tracer history callback headStatusP headIdP snapshotUtxoP responseCh
(QueryParam key _) | key == [queryKey|address|] -> True
_other -> False

shouldNotServeHistory qp =
flip any qp $ \case
(QueryParam key val)
| key == [queryKey|history|] -> val == [queryValue|no|]
_other -> False
-- shouldNotServeHistory qp =
-- flip any qp $ \case
-- (QueryParam key val)
-- | key == [queryKey|history|] -> val == [queryValue|no|]
-- _other -> False

sendOutputs chan con outConfig@ServerOutputConfig{addressInTx} = forever $ do
response <- STM.atomically $ readTChan chan
Expand All @@ -161,16 +162,16 @@ wsApp party tracer history callback headStatusP headIdP snapshotUtxoP responseCh
-- message to memory
let clientInput = decodeUtf8With lenientDecode $ toStrict msg
time <- getCurrentTime
seq <- atomically $ nextSequenceNumber history
seq <- atomically nextSeq
let timedOutput = TimedServerOutput{output = InvalidInput @tx e clientInput, time, seq}
sendTextData con $ Aeson.encode timedOutput
traceWith tracer (APIInvalidInput e clientInput)

forwardHistory con ServerOutputConfig{addressInTx} = do
rawHist <- STM.atomically (readTVar history)
let hist = filter (isAddressInTx addressInTx) rawHist
let encodeAndReverse xs serverOutput = Aeson.encode serverOutput : xs
sendTextDatas con $ foldl' encodeAndReverse [] hist
-- forwardHistory con ServerOutputConfig{addressInTx} = do
-- rawHist <- STM.atomically (readTVar history)
-- let hist = filter (isAddressInTx addressInTx) rawHist
-- let encodeAndReverse xs serverOutput = Aeson.encode serverOutput : xs
-- sendTextDatas con $ foldl' encodeAndReverse [] hist

isAddressInTx addressInTx tx =
case addressInTx of
Expand Down
18 changes: 9 additions & 9 deletions hydra-node/src/Hydra/Network/Reliability.hs
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,13 @@ import Cardano.Binary (serialize')
import Cardano.Crypto.Util (SignableRepresentation (getSignableRepresentation))
import Control.Concurrent.Class.MonadSTM (
MonadSTM (readTQueue, writeTQueue),
modifyTVar',
newTQueueIO,
newTVarIO,
readTVarIO,
writeTVar,
)
import Control.Tracer (Tracer)
import Data.IntMap qualified as IMap
import Data.Sequence.Strict ((|>))
import Data.Sequence.Strict qualified as Seq
import Data.Vector (
Vector,
Expand Down Expand Up @@ -221,7 +219,7 @@ withReliability ::
-- | Underlying network component providing consuming and sending channels.
NetworkComponent m (Authenticated (ReliableMsg (Heartbeat inbound))) (ReliableMsg (Heartbeat outbound)) a ->
NetworkComponent m (Authenticated (Heartbeat inbound)) (Heartbeat outbound) a
withReliability tracer MessagePersistence{saveAcks, loadAcks, appendMessage, loadMessages} me otherParties withRawNetwork callback action = do
withReliability tracer MessagePersistence{saveAcks, loadAcks, loadMessages} me otherParties withRawNetwork callback action = do
acksCache <- loadAcks >>= newTVarIO
sentMessages <- loadMessages >>= newTVarIO . Seq.fromList
resendQ <- newTQueueIO
Expand All @@ -235,15 +233,17 @@ withReliability tracer MessagePersistence{saveAcks, loadAcks, appendMessage, loa

allParties = fromList $ sort $ me : otherParties

reliableBroadcast sentMessages ourIndex acksCache Network{broadcast} =
reliableBroadcast _sentMessages ourIndex acksCache Network{broadcast} =
action $
Network
{ broadcast = \msg ->
case msg of
Data{} -> do
localCounter <- atomically $ cacheMessage msg >> incrementAckCounter
saveAcks localCounter
appendMessage msg
-- localCounter <- atomically $ cacheMessage msg >> incrementAckCounter
-- saveAcks localCounter
-- appendMessage msg
localCounter <- atomically $ do
incrementAckCounter
traceWith tracer BroadcastCounter{ourIndex, localCounter}
broadcast $ ReliableMsg localCounter msg
Ping{} -> do
Expand All @@ -259,8 +259,8 @@ withReliability tracer MessagePersistence{saveAcks, loadAcks, appendMessage, loa
writeTVar acksCache newAcks
pure newAcks

cacheMessage msg =
modifyTVar' sentMessages (|> msg)
-- cacheMessage msg =
-- modifyTVar' sentMessages (|> msg)

reliableCallback acksCache sentMessages resend ourIndex =
NetworkCallback $ \(Authenticated (ReliableMsg acknowledged payload) party) -> do
Expand Down
5 changes: 3 additions & 2 deletions hydra-node/src/Hydra/Persistence.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import Data.ByteString qualified as BS
import Data.ByteString.Char8 qualified as C8
import System.Directory (createDirectoryIfMissing, doesFileExist)
import System.FilePath (takeDirectory)
import UnliftIO.IO.File (withBinaryFile, writeBinaryFileDurableAtomic)
import UnliftIO.IO.File (withBinaryFile, writeBinaryFile)

data PersistenceException
= PersistenceException String
Expand All @@ -37,7 +37,8 @@ createPersistence fp = do
pure $
Persistence
{ save = \a -> do
writeBinaryFileDurableAtomic fp . toStrict $ Aeson.encode a
-- FIXME: this is not atomic / durable
writeBinaryFile fp . toStrict $ Aeson.encode a
, load =
liftIO (doesFileExist fp) >>= \case
False -> pure Nothing
Expand Down
Loading