Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose the tx mempool monitoring mini protocol in cardano-api #3706

Merged
merged 2 commits into from
Apr 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cardano-api/cardano-api.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ library
Cardano.Api.StakePoolMetadata
Cardano.Api.Tx
Cardano.Api.TxBody
Cardano.Api.TxInMode
Cardano.Api.InMode
Cardano.Api.TxMetadata
Cardano.Api.TxSubmit.ErrorRender
Cardano.Api.TxSubmit.Types
Expand Down
7 changes: 7 additions & 0 deletions cardano-api/src/Cardano/Api.hs
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,13 @@ module Cardano.Api (
UTxO(..),
queryNodeLocalState,

-- *** Local tx monitoring
LocalTxMonitorClient(..),
LocalTxMonitoringQuery(..),
LocalTxMonitoringResult(..),
MempoolSizeAndCapacity(..),
queryTxMonitoringLocal,

EraHistory(..),
getProgress,

Expand Down
182 changes: 172 additions & 10 deletions cardano-api/src/Cardano/Api/IPC.hs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,13 @@ module Cardano.Api.IPC (
QueryInShelleyBasedEra(..),
queryNodeLocalState,

-- *** Local tx monitoring
LocalTxMonitorClient(..),
LocalTxMonitoringQuery(..),
LocalTxMonitoringResult(..),
Consensus.MempoolSizeAndCapacity(..),
queryTxMonitoringLocal,

EraHistory(..),
getProgress,

Expand All @@ -79,7 +86,8 @@ import Data.Void (Void)
import qualified Data.ByteString.Lazy as LBS
import qualified Data.Map.Strict as Map

import Control.Concurrent.STM
import Control.Concurrent.STM (TMVar, atomically, newEmptyTMVarIO, putTMVar, takeTMVar,
tryPutTMVar)
import Control.Monad (void)
import Control.Tracer (nullTracer)

Expand All @@ -95,6 +103,10 @@ import Ouroboros.Network.Protocol.LocalStateQuery.Client (LocalStateQu
import qualified Ouroboros.Network.Protocol.LocalStateQuery.Client as Net.Query
import Ouroboros.Network.Protocol.LocalStateQuery.Type (AcquireFailure (..))
import qualified Ouroboros.Network.Protocol.LocalStateQuery.Type as Net.Query
import Ouroboros.Network.Protocol.LocalTxMonitor.Client (LocalTxMonitorClient (..),
localTxMonitorClientPeer)
import qualified Ouroboros.Network.Protocol.LocalTxMonitor.Client as CTxMon
import qualified Ouroboros.Network.Protocol.LocalTxMonitor.Type as Consensus
import Ouroboros.Network.Protocol.LocalTxSubmission.Client (LocalTxSubmissionClient (..),
SubmitResult (..))
import qualified Ouroboros.Network.Protocol.LocalTxSubmission.Client as Net.Tx
Expand All @@ -110,11 +122,12 @@ import qualified Ouroboros.Consensus.Node.Run as Consensus

import Cardano.Api.Block
import Cardano.Api.HasTypeProxy
import Cardano.Api.InMode
import Cardano.Api.Modes
import Cardano.Api.NetworkId
import Cardano.Api.Protocol.Types
import Cardano.Api.Query
import Cardano.Api.TxInMode
import Cardano.Api.TxBody

-- ----------------------------------------------------------------------------
-- The types for the client side of the node-to-client IPC protocols
Expand All @@ -127,7 +140,7 @@ import Cardano.Api.TxInMode
-- to\/from the types used by the underlying wire formats is handled by
-- 'connectToLocalNode'.
--
data LocalNodeClientProtocols block point tip tx txerr query m =
data LocalNodeClientProtocols block point tip slot tx txid txerr query m =
LocalNodeClientProtocols {
localChainSyncClient
:: LocalChainSyncClient block point tip m
Expand All @@ -137,6 +150,9 @@ data LocalNodeClientProtocols block point tip tx txerr query m =

, localStateQueryClient
:: Maybe (LocalStateQueryClient block point query m ())

, localTxMonitoringClient
:: Maybe (LocalTxMonitorClient txid tx slot m ())
}

data LocalChainSyncClient block point tip m
Expand All @@ -150,7 +166,9 @@ type LocalNodeClientProtocolsInMode mode =
(BlockInMode mode)
ChainPoint
ChainTip
SlotNo
(TxInMode mode)
(TxIdInMode mode)
(TxValidationErrorInMode mode)
(QueryInMode mode)
IO
Expand Down Expand Up @@ -256,7 +274,8 @@ mkVersionedProtocols networkid ptcl unversionedClients =
LocalNodeClientProtocolsForBlock {
localChainSyncClientForBlock,
localTxSubmissionClientForBlock,
localStateQueryClientForBlock
localStateQueryClientForBlock,
localTxMonitoringClientForBlock
}
ptclBlockVersion
ptclVersion =
Expand Down Expand Up @@ -298,7 +317,9 @@ mkVersionedProtocols networkid ptcl unversionedClients =
Net.MuxPeer
nullTracer
cTxMonitorCodec
Net.localTxMonitorPeerNull
(maybe Net.localTxMonitorPeerNull
localTxMonitorClientPeer
localTxMonitoringClientForBlock)
}
where
Consensus.Codecs {
Expand Down Expand Up @@ -355,6 +376,10 @@ data LocalNodeClientProtocolsForBlock block =
:: Maybe (LocalTxSubmissionClient (Consensus.GenTx block)
(Consensus.ApplyTxErr block)
IO ())
, localTxMonitoringClientForBlock
:: Maybe (LocalTxMonitorClient (Consensus.TxId (Consensus.GenTx block))
(Consensus.GenTx block)
SlotNo IO ())
}


Expand Down Expand Up @@ -404,7 +429,8 @@ convLocalNodeClientProtocols
LocalNodeClientProtocols {
localChainSyncClient,
localTxSubmissionClient,
localStateQueryClient
localStateQueryClient,
localTxMonitoringClient
} =
LocalNodeClientProtocolsForBlock {
localChainSyncClientForBlock = case localChainSyncClient of
Expand All @@ -416,9 +442,23 @@ convLocalNodeClientProtocols
localTxSubmissionClient,

localStateQueryClientForBlock = convLocalStateQueryClient mode <$>
localStateQueryClient
localStateQueryClient,

localTxMonitoringClientForBlock = convLocalTxMonitoringClient mode <$>
localTxMonitoringClient

}

convLocalTxMonitoringClient
:: forall mode block m a. ConsensusBlockForMode mode ~ block
=> Functor m
=> ConsensusMode mode
-> LocalTxMonitorClient (TxIdInMode mode) (TxInMode mode) SlotNo m a
-> LocalTxMonitorClient (Consensus.TxId (Consensus.GenTx block)) (Consensus.GenTx block) SlotNo m a
convLocalTxMonitoringClient mode =
mapLocalTxMonitoringClient
toConsensusTxId
(fromConsensusGenTx mode)

convLocalChainSyncClient
:: forall mode block m a.
Expand Down Expand Up @@ -473,6 +513,38 @@ convLocalStateQueryClient mode =
fromConsensusQueryResult


--TODO: Move to consensus
mapLocalTxMonitoringClient
:: forall txid txid' tx tx' m a. Functor m
=> (txid -> txid')
-> (tx'-> tx)
-> LocalTxMonitorClient txid tx SlotNo m a
-> LocalTxMonitorClient txid' tx' SlotNo m a
mapLocalTxMonitoringClient convTxid convTx ltxmc =
let LocalTxMonitorClient idleEff = ltxmc
in LocalTxMonitorClient (fmap convClientStateIdle idleEff)
where
convClientStateIdle
:: CTxMon.ClientStIdle txid tx SlotNo m a
-> CTxMon.ClientStIdle txid' tx' SlotNo m a
convClientStateIdle (CTxMon.SendMsgAcquire f) =
CTxMon.SendMsgAcquire $ (fmap . fmap) convClientStateAcquired f
convClientStateIdle (CTxMon.SendMsgDone a) = CTxMon.SendMsgDone a

convClientStateAcquired
:: CTxMon.ClientStAcquired txid tx SlotNo m a
-> CTxMon.ClientStAcquired txid' tx' SlotNo m a
convClientStateAcquired (CTxMon.SendMsgNextTx f) =
CTxMon.SendMsgNextTx (\mTx -> convClientStateAcquired <$> f (convTx <$> mTx))
convClientStateAcquired (CTxMon.SendMsgHasTx txid f)=
CTxMon.SendMsgHasTx (convTxid txid) ((fmap . fmap) convClientStateAcquired f)
convClientStateAcquired (CTxMon.SendMsgGetSizes f) =
CTxMon.SendMsgGetSizes $ (fmap . fmap) convClientStateAcquired f
convClientStateAcquired (CTxMon.SendMsgAwaitAcquire f) =
CTxMon.SendMsgAwaitAcquire $ (fmap . fmap ) convClientStateAcquired f
convClientStateAcquired (CTxMon.SendMsgRelease eff) =
CTxMon.SendMsgRelease (convClientStateIdle <$> eff)

-- ----------------------------------------------------------------------------
-- Wrappers for specific protocol use-cases
--
Expand All @@ -496,7 +568,8 @@ queryNodeLocalState connctInfo mpoint query = do
LocalNodeClientProtocols {
localChainSyncClient = NoLocalChainSyncClient,
localStateQueryClient = Just (singleQuery mpoint resultVar),
localTxSubmissionClient = Nothing
localTxSubmissionClient = Nothing,
localTxMonitoringClient = Nothing
}
atomically (takeTMVar resultVar)
where
Expand Down Expand Up @@ -535,7 +608,8 @@ submitTxToNodeLocal connctInfo tx = do
LocalNodeClientProtocols {
localChainSyncClient = NoLocalChainSyncClient,
localTxSubmissionClient = Just (localTxSubmissionClientSingle resultVar),
localStateQueryClient = Nothing
localStateQueryClient = Nothing,
localTxMonitoringClient = Nothing
}
atomically (takeTMVar resultVar)
where
Expand All @@ -550,11 +624,98 @@ submitTxToNodeLocal connctInfo tx = do
atomically $ putTMVar resultVar result
pure (Net.Tx.SendMsgDone ())


data LocalTxMonitoringResult mode
= LocalTxMonitoringTxExists
TxId
SlotNo -- ^ Slot number at which the mempool snapshot was taken
| LocalTxMonitoringTxDoesNotExist
TxId
SlotNo -- ^ Slot number at which the mempool snapshot was taken
| LocalTxMonitoringNextTx
(Maybe (TxInMode mode))
SlotNo -- ^ Slot number at which the mempool snapshot was taken
| LocalTxMonitoringMempoolSizeAndCapacity
Consensus.MempoolSizeAndCapacity
SlotNo -- ^ Slot number at which the mempool snapshot was taken

data LocalTxMonitoringQuery mode
-- | Query if a particular tx exists in the mempool. Note that, the absence
-- of a transaction does not imply anything about how the transaction was
-- processed: it may have been dropped, or inserted in a block.
= LocalTxMonitoringQueryTx (TxIdInMode mode)
-- | The mempool is modeled as an ordered list of transactions and thus, can
-- be traversed linearly. 'LocalTxMonitoringSendNextTx' requests the next transaction from the
-- current list. This must be a transaction that was not previously sent to
-- the client for this particular snapshot.
Copy link
Contributor

@newhoggy newhoggy Mar 31, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there state associated with the connection that tracks where the current transaction in the list is?

Copy link
Contributor Author

@Jimbo4350 Jimbo4350 Mar 31, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are only 3 things you can get:

data ServerStBusy (kind :: StBusyKind) txid tx slot m a where
  SendMsgReplyNextTx
    :: Maybe tx
    -> ServerStAcquired txid tx slot m a
    -> ServerStBusy NextTx txid tx slot m a

  SendMsgReplyHasTx
    :: Bool
    -> ServerStAcquired txid tx slot m a
    -> ServerStBusy HasTx txid tx slot m a

  SendMsgReplyGetSizes
    :: MempoolSizeAndCapacity
    -> ServerStAcquired txid tx slot m a
    -> ServerStBusy GetSizes txid tx slot 

The next tx, if a tx exists or the mempool size.

| LocalTxMonitoringSendNextTx
-- | Ask the server about the current mempool's capacity and sizes. This is
-- fixed in a given snapshot.
| LocalTxMonitoringMempoolInformation


queryTxMonitoringLocal
:: forall mode. LocalNodeConnectInfo mode
-> LocalTxMonitoringQuery mode
-> IO (LocalTxMonitoringResult mode)
queryTxMonitoringLocal connectInfo localTxMonitoringQuery = do
resultVar <- newEmptyTMVarIO

let client = case localTxMonitoringQuery of
LocalTxMonitoringQueryTx txidInMode ->
localTxMonitorClientTxExists txidInMode resultVar
LocalTxMonitoringSendNextTx ->
localTxMonitorNextTx resultVar
LocalTxMonitoringMempoolInformation ->
localTxMonitorMempoolInfo resultVar

connectToLocalNode
connectInfo
LocalNodeClientProtocols {
localChainSyncClient = NoLocalChainSyncClient,
localTxSubmissionClient = Nothing,
localStateQueryClient = Nothing,
localTxMonitoringClient = Just client
}
atomically (takeTMVar resultVar)
where
localTxMonitorClientTxExists
:: TxIdInMode mode
-> TMVar (LocalTxMonitoringResult mode)
-> LocalTxMonitorClient (TxIdInMode mode) (TxInMode mode) SlotNo IO ()
localTxMonitorClientTxExists tIdInMode@(TxIdInMode txid _) resultVar = do
LocalTxMonitorClient $ return $
CTxMon.SendMsgAcquire $ \slt -> do
return $ CTxMon.SendMsgHasTx tIdInMode $ \txPresentBool -> do
if txPresentBool
then atomically . putTMVar resultVar $ LocalTxMonitoringTxExists txid slt
else atomically . putTMVar resultVar $ LocalTxMonitoringTxDoesNotExist txid slt
return $ CTxMon.SendMsgRelease $ return $ CTxMon.SendMsgDone ()

localTxMonitorNextTx
:: TMVar (LocalTxMonitoringResult mode)
-> LocalTxMonitorClient (TxIdInMode mode) (TxInMode mode) SlotNo IO ()
localTxMonitorNextTx resultVar =
LocalTxMonitorClient $ return $ do
CTxMon.SendMsgAcquire $ \slt -> do
return $ CTxMon.SendMsgNextTx $ \mTx -> do
atomically $ putTMVar resultVar $ LocalTxMonitoringNextTx mTx slt
return $ CTxMon.SendMsgRelease $ return $ CTxMon.SendMsgDone ()

localTxMonitorMempoolInfo
:: TMVar (LocalTxMonitoringResult mode)
-> LocalTxMonitorClient (TxIdInMode mode) (TxInMode mode) SlotNo IO ()
localTxMonitorMempoolInfo resultVar =
LocalTxMonitorClient $ return $ do
CTxMon.SendMsgAcquire $ \slt -> do
return$ CTxMon.SendMsgGetSizes $ \mempoolCapacity -> do
atomically $ putTMVar resultVar $ LocalTxMonitoringMempoolSizeAndCapacity mempoolCapacity slt
return $ CTxMon.SendMsgRelease $ return $ CTxMon.SendMsgDone ()

-- ----------------------------------------------------------------------------
-- Get tip as 'ChainPoint'
--


getLocalChainTip :: LocalNodeConnectInfo mode -> IO ChainTip
getLocalChainTip localNodeConInfo = do
resultVar <- newEmptyTMVarIO
Expand All @@ -564,6 +725,7 @@ getLocalChainTip localNodeConInfo = do
{ localChainSyncClient = LocalChainSyncClient $ chainSyncGetCurrentTip resultVar
, localTxSubmissionClient = Nothing
, localStateQueryClient = Nothing
, localTxMonitoringClient = Nothing
}
atomically $ takeTMVar resultVar

Expand Down
29 changes: 15 additions & 14 deletions cardano-api/src/Cardano/Api/IPC/Monad.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,20 @@ module Cardano.Api.IPC.Monad
, determineEraExpr
) where

import Cardano.Api.Block
import Cardano.Api.Eras
import Cardano.Api.IPC
import Cardano.Api.Modes
import Control.Applicative
import Control.Concurrent.STM
import Control.Monad
import Control.Monad.IO.Class
import Control.Monad.Trans.Cont
import Data.Either
import Data.Function
import Data.Maybe
import Cardano.Ledger.Shelley.Scripts ()
import System.IO
import Cardano.Api.Block
import Cardano.Api.Eras
import Cardano.Api.IPC
import Cardano.Api.Modes
import Cardano.Ledger.Shelley.Scripts ()
import Control.Applicative
import Control.Concurrent.STM
import Control.Monad
import Control.Monad.IO.Class
import Control.Monad.Trans.Cont
import Data.Either
import Data.Function
import Data.Maybe
import System.IO

import qualified Ouroboros.Network.Protocol.LocalStateQuery.Client as Net.Query
import qualified Ouroboros.Network.Protocol.LocalStateQuery.Type as Net.Query
Expand Down Expand Up @@ -62,6 +62,7 @@ executeLocalStateQueryExpr connectInfo mpoint f = do
{ localChainSyncClient = NoLocalChainSyncClient
, localStateQueryClient = Just $ setupLocalStateQueryExpr waitResult mpoint tmvResultLocalState (f ntcVersion)
, localTxSubmissionClient = Nothing
, localTxMonitoringClient = Nothing
}
)

Expand Down
Loading