From 3e0859df9675c97e62f06c78f03e4e72a95e8c06 Mon Sep 17 00:00:00 2001 From: Jordan Millar Date: Mon, 14 Mar 2022 15:27:53 -0400 Subject: [PATCH] Implement queryTxMonitoringLocal in cardano-api. This exposes functionality to utilize the local tx monitoring protocol which allows users to: Check if a transaction exists in the mempool Request the next tx in the mempool Request the current size and capacity of the mempool Note: This is not exposed via cardano-cli as yet --- cardano-api/src/Cardano/Api.hs | 7 ++ cardano-api/src/Cardano/Api/IPC.hs | 100 +++++++++++++++++++++ cardano-api/src/Cardano/Api/IPC/Monad.hs | 29 +++--- cardano-api/src/Cardano/Api/LedgerState.hs | 7 +- 4 files changed, 126 insertions(+), 17 deletions(-) diff --git a/cardano-api/src/Cardano/Api.hs b/cardano-api/src/Cardano/Api.hs index 1a3483ae3c7..8994e0801a7 100644 --- a/cardano-api/src/Cardano/Api.hs +++ b/cardano-api/src/Cardano/Api.hs @@ -581,6 +581,13 @@ module Cardano.Api ( UTxO(..), queryNodeLocalState, + -- *** Local tx monitoring + LocalTxMonitorClient(..), + LocalTxMonitoringQuery(..), + LocalTxMonitoringResult(..), + MempoolSizeAndCapacity, + queryTxMonitoringLocal, + EraHistory(..), getProgress, diff --git a/cardano-api/src/Cardano/Api/IPC.hs b/cardano-api/src/Cardano/Api/IPC.hs index 90cc95e73f6..86f54718f64 100644 --- a/cardano-api/src/Cardano/Api/IPC.hs +++ b/cardano-api/src/Cardano/Api/IPC.hs @@ -58,6 +58,13 @@ module Cardano.Api.IPC ( QueryInShelleyBasedEra(..), queryNodeLocalState, + -- *** Local tx monitoring + LocalTxMonitorClient(..), + LocalTxMonitoringQuery(..), + LocalTxMonitoringResult(..), + Consensus.MempoolSizeAndCapacity, + queryTxMonitoringLocal, + EraHistory(..), getProgress, @@ -99,6 +106,7 @@ import qualified Ouroboros.Network.Protocol.LocalStateQuery.Type as Net.Query import Ouroboros.Network.Protocol.LocalTxMonitor.Client (LocalTxMonitorClient (..), localTxMonitorClientPeer) import qualified Ouroboros.Network.Protocol.LocalTxMonitor.Client as CTxMon +import qualified Ouroboros.Network.Protocol.LocalTxMonitor.Type as Consensus import Ouroboros.Network.Protocol.LocalTxSubmission.Client (LocalTxSubmissionClient (..), SubmitResult (..)) import qualified Ouroboros.Network.Protocol.LocalTxSubmission.Client as Net.Tx @@ -119,6 +127,7 @@ import Cardano.Api.Modes import Cardano.Api.NetworkId import Cardano.Api.Protocol.Types import Cardano.Api.Query +import Cardano.Api.TxBody -- ---------------------------------------------------------------------------- -- The types for the client side of the node-to-client IPC protocols @@ -615,6 +624,97 @@ submitTxToNodeLocal connctInfo tx = do atomically $ putTMVar resultVar result pure (Net.Tx.SendMsgDone ()) + +data LocalTxMonitoringResult mode + = LocalTxMonitoringTxExists + TxId + -- ^ Slot number at which the mempool snapshot was taken + SlotNo + | LocalTxMonitoringTxDoesNotExist + TxId + -- ^ Slot number at which the mempool snapshot was taken + SlotNo + | LocalTxMonitoringNextTx + (Maybe (TxInMode mode)) + -- ^ Slot number at which the mempool snapshot was taken + SlotNo + | LocalTxMonitoringMempoolSizeAndCapacity + Consensus.MempoolSizeAndCapacity + -- ^ Slot number at which the mempool snapshot was taken + SlotNo + +data LocalTxMonitoringQuery mode + = LocalTxMonitoringQueryTx (TxIdInMode mode) + -- ^ Query if a particular tx exists in the mempool. Note that, the absence + -- of a transaction does not imply anything about how the transaction was + -- processed: it may have been dropped, or inserted in a block. + | LocalTxMonitoringSendNextTx + -- ^ The mempool is modeled as an ordered list of transactions and thus, can + -- be traversed linearly. 'LocalTxMonitoringSendNextTx' requests the next transaction from the + -- current list. This must be a transaction that was not previously sent to + -- the client for this particular snapshot. + | LocalTxMonitoringMempoolInformation + -- ^ Ask the server about the current mempool's capacity and sizes. This is + -- fixed in a given snapshot. + +queryTxMonitoringLocal + :: forall mode. LocalNodeConnectInfo mode + -> LocalTxMonitoringQuery mode + -> IO (LocalTxMonitoringResult mode) +queryTxMonitoringLocal connectInfo localTxMonitoringQuery = do + resultVar <- newEmptyTMVarIO + + let client = case localTxMonitoringQuery of + LocalTxMonitoringQueryTx txidInMode -> + localTxMonitorClientTxExists txidInMode resultVar + LocalTxMonitoringSendNextTx -> + localTxMonitorNextTx resultVar + LocalTxMonitoringMempoolInformation -> + localTxMonitorMempoolInfo resultVar + + connectToLocalNode + connectInfo + LocalNodeClientProtocols { + localChainSyncClient = NoLocalChainSyncClient, + localTxSubmissionClient = Nothing, + localStateQueryClient = Nothing, + localTxMonitoringClient = Just client + } + atomically (takeTMVar resultVar) + where + localTxMonitorClientTxExists + :: TxIdInMode mode + -> TMVar (LocalTxMonitoringResult mode) + -> LocalTxMonitorClient (TxIdInMode mode) (TxInMode mode) SlotNo IO () + localTxMonitorClientTxExists tIdInMode@(TxIdInMode txid _) resultVar = do + LocalTxMonitorClient $ return $ + CTxMon.SendMsgAcquire $ \slt -> do + return $ CTxMon.SendMsgHasTx tIdInMode $ \txPresentBool -> do + if txPresentBool + then atomically . putTMVar resultVar $ LocalTxMonitoringTxExists txid slt + else atomically . putTMVar resultVar $ LocalTxMonitoringTxDoesNotExist txid slt + return $ CTxMon.SendMsgRelease $ return $ CTxMon.SendMsgDone () + + localTxMonitorNextTx + :: TMVar (LocalTxMonitoringResult mode) + -> LocalTxMonitorClient (TxIdInMode mode) (TxInMode mode) SlotNo IO () + localTxMonitorNextTx resultVar = + LocalTxMonitorClient $ return $ do + CTxMon.SendMsgAcquire $ \slt -> do + return $ CTxMon.SendMsgNextTx $ \mTx -> do + atomically $ putTMVar resultVar $ LocalTxMonitoringNextTx mTx slt + return $ CTxMon.SendMsgRelease $ return $ CTxMon.SendMsgDone () + + localTxMonitorMempoolInfo + :: TMVar (LocalTxMonitoringResult mode) + -> LocalTxMonitorClient (TxIdInMode mode) (TxInMode mode) SlotNo IO () + localTxMonitorMempoolInfo resultVar = + LocalTxMonitorClient $ return $ do + CTxMon.SendMsgAcquire $ \slt -> do + return$ CTxMon.SendMsgGetSizes $ \mempoolCapacity -> do + atomically $ putTMVar resultVar $ LocalTxMonitoringMempoolSizeAndCapacity mempoolCapacity slt + return $ CTxMon.SendMsgRelease $ return $ CTxMon.SendMsgDone () + -- ---------------------------------------------------------------------------- -- Get tip as 'ChainPoint' -- diff --git a/cardano-api/src/Cardano/Api/IPC/Monad.hs b/cardano-api/src/Cardano/Api/IPC/Monad.hs index eba3017ba5c..fe77e76710c 100644 --- a/cardano-api/src/Cardano/Api/IPC/Monad.hs +++ b/cardano-api/src/Cardano/Api/IPC/Monad.hs @@ -9,20 +9,20 @@ module Cardano.Api.IPC.Monad , determineEraExpr ) where -import Cardano.Api.Block -import Cardano.Api.Eras -import Cardano.Api.IPC -import Cardano.Api.Modes -import Control.Applicative -import Control.Concurrent.STM -import Control.Monad -import Control.Monad.IO.Class -import Control.Monad.Trans.Cont -import Data.Either -import Data.Function -import Data.Maybe -import Cardano.Ledger.Shelley.Scripts () -import System.IO +import Cardano.Api.Block +import Cardano.Api.Eras +import Cardano.Api.IPC +import Cardano.Api.Modes +import Cardano.Ledger.Shelley.Scripts () +import Control.Applicative +import Control.Concurrent.STM +import Control.Monad +import Control.Monad.IO.Class +import Control.Monad.Trans.Cont +import Data.Either +import Data.Function +import Data.Maybe +import System.IO import qualified Ouroboros.Network.Protocol.LocalStateQuery.Client as Net.Query import qualified Ouroboros.Network.Protocol.LocalStateQuery.Type as Net.Query @@ -62,6 +62,7 @@ executeLocalStateQueryExpr connectInfo mpoint f = do { localChainSyncClient = NoLocalChainSyncClient , localStateQueryClient = Just $ setupLocalStateQueryExpr waitResult mpoint tmvResultLocalState (f ntcVersion) , localTxSubmissionClient = Nothing + , localTxMonitoringClient = Nothing } ) diff --git a/cardano-api/src/Cardano/Api/LedgerState.hs b/cardano-api/src/Cardano/Api/LedgerState.hs index e8a74b34919..9801d7fcbd8 100644 --- a/cardano-api/src/Cardano/Api/LedgerState.hs +++ b/cardano-api/src/Cardano/Api/LedgerState.hs @@ -116,9 +116,10 @@ import qualified Cardano.Ledger.Crypto as Crypto import qualified Cardano.Ledger.Era as Ledger import qualified Cardano.Ledger.Keys as Shelley.Spec import qualified Cardano.Ledger.Shelley.API as ShelleyAPI -import qualified Cardano.Protocol.TPraos.API as TPraos import qualified Cardano.Ledger.Shelley.Genesis as Shelley.Spec +import qualified Cardano.Protocol.TPraos.API as TPraos import qualified Cardano.Protocol.TPraos.BHeader as TPraos +import qualified Cardano.Protocol.TPraos.Rules.Prtcl as TPraos import qualified Cardano.Protocol.TPraos.Rules.Tickn as Tick import Cardano.Slotting.EpochInfo (EpochInfo) import qualified Cardano.Slotting.EpochInfo.API as Slot @@ -151,7 +152,6 @@ import qualified Ouroboros.Network.Block import qualified Ouroboros.Network.Protocol.ChainSync.Client as CS import qualified Ouroboros.Network.Protocol.ChainSync.ClientPipelined as CSP import Ouroboros.Network.Protocol.ChainSync.PipelineDecision -import qualified Cardano.Protocol.TPraos.Rules.Prtcl as TPraos data InitialLedgerStateError = ILSEConfigFile Text @@ -360,7 +360,8 @@ foldBlocks nodeConfigFilePath socketPath validationMode state0 accumulate = do LocalNodeClientProtocols { localChainSyncClient = LocalChainSyncClientPipelined (chainSyncClient 50 stateIORef errorIORef env ledgerState), localTxSubmissionClient = Nothing, - localStateQueryClient = Nothing + localStateQueryClient = Nothing, + localTxMonitoringClient = Nothing } -- | Defines the client side of the chain sync protocol.