Skip to content

Commit

Permalink
Implement queryTxMonitoringLocal in cardano-api. This exposes functio…
Browse files Browse the repository at this point in the history
…nality

to utilize the local tx monitoring protocol which allows users to:
  Check if a transaction exists in the mempool
  Request the next tx in the mempool
  Request the current size and capacity of the mempool

Note: This is not exposed via cardano-cli as yet
  • Loading branch information
Jimbo4350 committed Mar 14, 2022
1 parent 9f72faf commit 3e0859d
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 17 deletions.
7 changes: 7 additions & 0 deletions cardano-api/src/Cardano/Api.hs
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,13 @@ module Cardano.Api (
UTxO(..),
queryNodeLocalState,

-- *** Local tx monitoring
LocalTxMonitorClient(..),
LocalTxMonitoringQuery(..),
LocalTxMonitoringResult(..),
MempoolSizeAndCapacity,
queryTxMonitoringLocal,

EraHistory(..),
getProgress,

Expand Down
100 changes: 100 additions & 0 deletions cardano-api/src/Cardano/Api/IPC.hs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,13 @@ module Cardano.Api.IPC (
QueryInShelleyBasedEra(..),
queryNodeLocalState,

-- *** Local tx monitoring
LocalTxMonitorClient(..),
LocalTxMonitoringQuery(..),
LocalTxMonitoringResult(..),
Consensus.MempoolSizeAndCapacity,
queryTxMonitoringLocal,

EraHistory(..),
getProgress,

Expand Down Expand Up @@ -99,6 +106,7 @@ import qualified Ouroboros.Network.Protocol.LocalStateQuery.Type as Net.Query
import Ouroboros.Network.Protocol.LocalTxMonitor.Client (LocalTxMonitorClient (..),
localTxMonitorClientPeer)
import qualified Ouroboros.Network.Protocol.LocalTxMonitor.Client as CTxMon
import qualified Ouroboros.Network.Protocol.LocalTxMonitor.Type as Consensus
import Ouroboros.Network.Protocol.LocalTxSubmission.Client (LocalTxSubmissionClient (..),
SubmitResult (..))
import qualified Ouroboros.Network.Protocol.LocalTxSubmission.Client as Net.Tx
Expand All @@ -119,6 +127,7 @@ import Cardano.Api.Modes
import Cardano.Api.NetworkId
import Cardano.Api.Protocol.Types
import Cardano.Api.Query
import Cardano.Api.TxBody

-- ----------------------------------------------------------------------------
-- The types for the client side of the node-to-client IPC protocols
Expand Down Expand Up @@ -615,6 +624,97 @@ submitTxToNodeLocal connctInfo tx = do
atomically $ putTMVar resultVar result
pure (Net.Tx.SendMsgDone ())


data LocalTxMonitoringResult mode
= LocalTxMonitoringTxExists
TxId
-- ^ Slot number at which the mempool snapshot was taken
SlotNo
| LocalTxMonitoringTxDoesNotExist
TxId
-- ^ Slot number at which the mempool snapshot was taken
SlotNo
| LocalTxMonitoringNextTx
(Maybe (TxInMode mode))
-- ^ Slot number at which the mempool snapshot was taken
SlotNo
| LocalTxMonitoringMempoolSizeAndCapacity
Consensus.MempoolSizeAndCapacity
-- ^ Slot number at which the mempool snapshot was taken
SlotNo

data LocalTxMonitoringQuery mode
= LocalTxMonitoringQueryTx (TxIdInMode mode)
-- ^ Query if a particular tx exists in the mempool. Note that, the absence
-- of a transaction does not imply anything about how the transaction was
-- processed: it may have been dropped, or inserted in a block.
| LocalTxMonitoringSendNextTx
-- ^ The mempool is modeled as an ordered list of transactions and thus, can
-- be traversed linearly. 'LocalTxMonitoringSendNextTx' requests the next transaction from the
-- current list. This must be a transaction that was not previously sent to
-- the client for this particular snapshot.
| LocalTxMonitoringMempoolInformation
-- ^ Ask the server about the current mempool's capacity and sizes. This is
-- fixed in a given snapshot.

queryTxMonitoringLocal
:: forall mode. LocalNodeConnectInfo mode
-> LocalTxMonitoringQuery mode
-> IO (LocalTxMonitoringResult mode)
queryTxMonitoringLocal connectInfo localTxMonitoringQuery = do
resultVar <- newEmptyTMVarIO

let client = case localTxMonitoringQuery of
LocalTxMonitoringQueryTx txidInMode ->
localTxMonitorClientTxExists txidInMode resultVar
LocalTxMonitoringSendNextTx ->
localTxMonitorNextTx resultVar
LocalTxMonitoringMempoolInformation ->
localTxMonitorMempoolInfo resultVar

connectToLocalNode
connectInfo
LocalNodeClientProtocols {
localChainSyncClient = NoLocalChainSyncClient,
localTxSubmissionClient = Nothing,
localStateQueryClient = Nothing,
localTxMonitoringClient = Just client
}
atomically (takeTMVar resultVar)
where
localTxMonitorClientTxExists
:: TxIdInMode mode
-> TMVar (LocalTxMonitoringResult mode)
-> LocalTxMonitorClient (TxIdInMode mode) (TxInMode mode) SlotNo IO ()
localTxMonitorClientTxExists tIdInMode@(TxIdInMode txid _) resultVar = do
LocalTxMonitorClient $ return $
CTxMon.SendMsgAcquire $ \slt -> do
return $ CTxMon.SendMsgHasTx tIdInMode $ \txPresentBool -> do
if txPresentBool
then atomically . putTMVar resultVar $ LocalTxMonitoringTxExists txid slt
else atomically . putTMVar resultVar $ LocalTxMonitoringTxDoesNotExist txid slt
return $ CTxMon.SendMsgRelease $ return $ CTxMon.SendMsgDone ()

localTxMonitorNextTx
:: TMVar (LocalTxMonitoringResult mode)
-> LocalTxMonitorClient (TxIdInMode mode) (TxInMode mode) SlotNo IO ()
localTxMonitorNextTx resultVar =
LocalTxMonitorClient $ return $ do
CTxMon.SendMsgAcquire $ \slt -> do
return $ CTxMon.SendMsgNextTx $ \mTx -> do
atomically $ putTMVar resultVar $ LocalTxMonitoringNextTx mTx slt
return $ CTxMon.SendMsgRelease $ return $ CTxMon.SendMsgDone ()

localTxMonitorMempoolInfo
:: TMVar (LocalTxMonitoringResult mode)
-> LocalTxMonitorClient (TxIdInMode mode) (TxInMode mode) SlotNo IO ()
localTxMonitorMempoolInfo resultVar =
LocalTxMonitorClient $ return $ do
CTxMon.SendMsgAcquire $ \slt -> do
return$ CTxMon.SendMsgGetSizes $ \mempoolCapacity -> do
atomically $ putTMVar resultVar $ LocalTxMonitoringMempoolSizeAndCapacity mempoolCapacity slt
return $ CTxMon.SendMsgRelease $ return $ CTxMon.SendMsgDone ()

-- ----------------------------------------------------------------------------
-- Get tip as 'ChainPoint'
--
Expand Down
29 changes: 15 additions & 14 deletions cardano-api/src/Cardano/Api/IPC/Monad.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,20 @@ module Cardano.Api.IPC.Monad
, determineEraExpr
) where

import Cardano.Api.Block
import Cardano.Api.Eras
import Cardano.Api.IPC
import Cardano.Api.Modes
import Control.Applicative
import Control.Concurrent.STM
import Control.Monad
import Control.Monad.IO.Class
import Control.Monad.Trans.Cont
import Data.Either
import Data.Function
import Data.Maybe
import Cardano.Ledger.Shelley.Scripts ()
import System.IO
import Cardano.Api.Block
import Cardano.Api.Eras
import Cardano.Api.IPC
import Cardano.Api.Modes
import Cardano.Ledger.Shelley.Scripts ()
import Control.Applicative
import Control.Concurrent.STM
import Control.Monad
import Control.Monad.IO.Class
import Control.Monad.Trans.Cont
import Data.Either
import Data.Function
import Data.Maybe
import System.IO

import qualified Ouroboros.Network.Protocol.LocalStateQuery.Client as Net.Query
import qualified Ouroboros.Network.Protocol.LocalStateQuery.Type as Net.Query
Expand Down Expand Up @@ -62,6 +62,7 @@ executeLocalStateQueryExpr connectInfo mpoint f = do
{ localChainSyncClient = NoLocalChainSyncClient
, localStateQueryClient = Just $ setupLocalStateQueryExpr waitResult mpoint tmvResultLocalState (f ntcVersion)
, localTxSubmissionClient = Nothing
, localTxMonitoringClient = Nothing
}
)

Expand Down
7 changes: 4 additions & 3 deletions cardano-api/src/Cardano/Api/LedgerState.hs
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,10 @@ import qualified Cardano.Ledger.Crypto as Crypto
import qualified Cardano.Ledger.Era as Ledger
import qualified Cardano.Ledger.Keys as Shelley.Spec
import qualified Cardano.Ledger.Shelley.API as ShelleyAPI
import qualified Cardano.Protocol.TPraos.API as TPraos
import qualified Cardano.Ledger.Shelley.Genesis as Shelley.Spec
import qualified Cardano.Protocol.TPraos.API as TPraos
import qualified Cardano.Protocol.TPraos.BHeader as TPraos
import qualified Cardano.Protocol.TPraos.Rules.Prtcl as TPraos
import qualified Cardano.Protocol.TPraos.Rules.Tickn as Tick
import Cardano.Slotting.EpochInfo (EpochInfo)
import qualified Cardano.Slotting.EpochInfo.API as Slot
Expand Down Expand Up @@ -151,7 +152,6 @@ import qualified Ouroboros.Network.Block
import qualified Ouroboros.Network.Protocol.ChainSync.Client as CS
import qualified Ouroboros.Network.Protocol.ChainSync.ClientPipelined as CSP
import Ouroboros.Network.Protocol.ChainSync.PipelineDecision
import qualified Cardano.Protocol.TPraos.Rules.Prtcl as TPraos

data InitialLedgerStateError
= ILSEConfigFile Text
Expand Down Expand Up @@ -360,7 +360,8 @@ foldBlocks nodeConfigFilePath socketPath validationMode state0 accumulate = do
LocalNodeClientProtocols {
localChainSyncClient = LocalChainSyncClientPipelined (chainSyncClient 50 stateIORef errorIORef env ledgerState),
localTxSubmissionClient = Nothing,
localStateQueryClient = Nothing
localStateQueryClient = Nothing,
localTxMonitoringClient = Nothing
}

-- | Defines the client side of the chain sync protocol.
Expand Down

0 comments on commit 3e0859d

Please sign in to comment.