Skip to content

Commit

Permalink
Add tracing for events related to RecentTxIds
Browse files Browse the repository at this point in the history
  • Loading branch information
intricate committed Feb 24, 2020
1 parent 8e9f161 commit 12d0bdf
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 20 deletions.
5 changes: 5 additions & 0 deletions ouroboros-consensus/src/Ouroboros/Consensus/Node/Tracers.hs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import Control.Tracer (Tracer, nullTracer, showTracing)
import Ouroboros.Network.Block (BlockNo, Point, SlotNo)
import Ouroboros.Network.BlockFetch (FetchDecision,
TraceFetchClientState, TraceLabelPeer)
import Ouroboros.Network.RecentTxIds (TraceRecentTxIdsEvent)
import Ouroboros.Network.TxSubmission.Inbound
(TraceTxSubmissionInbound)
import Ouroboros.Network.TxSubmission.Outbound
Expand Down Expand Up @@ -46,6 +47,7 @@ data Tracers' peer blk f = Tracers
, blockFetchDecisionTracer :: f [TraceLabelPeer peer (FetchDecision [Point (Header blk)])]
, blockFetchClientTracer :: f (TraceLabelPeer peer (TraceFetchClientState (Header blk)))
, blockFetchServerTracer :: f (TraceBlockFetchServerEvent blk)
, recentTxIdsTracer :: f (TraceRecentTxIdsEvent (GenTxId blk))
, txInboundTracer :: f (TraceTxSubmissionInbound (GenTxId blk) (GenTx blk))
, txOutboundTracer :: f (TraceTxSubmissionOutbound (GenTxId blk) (GenTx blk))
, localTxSubmissionServerTracer :: f (TraceLocalTxSubmissionServerEvent blk)
Expand All @@ -62,6 +64,7 @@ instance (forall a. Semigroup (f a)) => Semigroup (Tracers' peer blk f) where
, blockFetchDecisionTracer = f blockFetchDecisionTracer
, blockFetchClientTracer = f blockFetchClientTracer
, blockFetchServerTracer = f blockFetchServerTracer
, recentTxIdsTracer = f recentTxIdsTracer
, txInboundTracer = f txInboundTracer
, txOutboundTracer = f txOutboundTracer
, localTxSubmissionServerTracer = f localTxSubmissionServerTracer
Expand All @@ -85,6 +88,7 @@ nullTracers = Tracers
, blockFetchDecisionTracer = nullTracer
, blockFetchClientTracer = nullTracer
, blockFetchServerTracer = nullTracer
, recentTxIdsTracer = nullTracer
, txInboundTracer = nullTracer
, txOutboundTracer = nullTracer
, localTxSubmissionServerTracer = nullTracer
Expand All @@ -109,6 +113,7 @@ showTracers tr = Tracers
, blockFetchDecisionTracer = showTracing tr
, blockFetchClientTracer = showTracing tr
, blockFetchServerTracer = showTracing tr
, recentTxIdsTracer = showTracing tr
, txInboundTracer = showTracing tr
, txOutboundTracer = showTracing tr
, localTxSubmissionServerTracer = showTracing tr
Expand Down
36 changes: 23 additions & 13 deletions ouroboros-consensus/src/Ouroboros/Consensus/NodeKernel.hs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ import Ouroboros.Network.BlockFetch.State (FetchMode (..))
import Ouroboros.Network.Point (WithOrigin (..))
import Ouroboros.Network.Protocol.ChainSync.PipelineDecision
(MkPipelineDecision)
import Ouroboros.Network.RecentTxIds (RecentTxIds)
import Ouroboros.Network.RecentTxIds
(RecentTxIds, TraceRecentTxIdsEvent (..))
import qualified Ouroboros.Network.RecentTxIds as RecentTxIds
import Ouroboros.Network.TxSubmission.Inbound
(TxSubmissionMempoolWriter)
Expand Down Expand Up @@ -258,7 +259,9 @@ initInternalState NodeArgs { tracers, chainDB, registry, cfg,
initState, mempoolCap, recentTxIdsExpiryThresh } = do
varCandidates <- newTVarM mempty
varState <- newTVarM initState
recentTxIds <- openRecentTxIds registry recentTxIdsExpiryThresh
recentTxIds <- openRecentTxIds registry
recentTxIdsExpiryThresh
(recentTxIdsTracer tracers)
mpCap <- atomically $ do
-- If no override is provided, calculate the default mempool capacity as
-- 2x the current ledger's maximum block size.
Expand Down Expand Up @@ -358,10 +361,11 @@ openRecentTxIds
:: (Ord txid, IOLike m)
=> ResourceRegistry m
-> RecentTxIds.ExpiryThreshold
-> Tracer m (TraceRecentTxIdsEvent txid)
-> m (StrictTVar m (RecentTxIds txid))
openRecentTxIds registry recentTxIdsExpiryThresh = do
openRecentTxIds registry recentTxIdsExpiryThresh tracer = do
t <- newTVarM RecentTxIds.empty
forkExpireRecentTxIds registry recentTxIdsExpiryThresh t
forkExpireRecentTxIds registry recentTxIdsExpiryThresh t tracer
pure t

-- | Spawn a thread that periodically attempts to remove expired elements from
Expand All @@ -371,24 +375,30 @@ forkExpireRecentTxIds
=> ResourceRegistry m
-> RecentTxIds.ExpiryThreshold
-> StrictTVar m (RecentTxIds txid)
-> Tracer m (TraceRecentTxIdsEvent txid)
-> m ()
forkExpireRecentTxIds registry recentTxIdsExpiryThresh t =
forkExpireRecentTxIds registry recentTxIdsExpiryThresh varRecentTxIds tracer =
void $ forkLinkedThread registry $ forever $ do
timeScheduledForExpiry <- atomically $ do
txids <- readTVar t
case RecentTxIds.earliestInsertionTime txids of
recentTxIds <- readTVar varRecentTxIds
case RecentTxIds.earliestInsertionTime recentTxIds of
Nothing -> retry
Just insertionTime -> pure (threshold `addTime` insertionTime)

waitUntil timeScheduledForExpiry

currentTime <- getMonotonicTime
-- let (expired, recentTxIds') = RecentTxIds.expireTxIds
-- recentTxIdsExpiryThresh
-- currentTime
-- recentTxIds
atomically $ modifyTVar t $
snd . RecentTxIds.expireTxIds recentTxIdsExpiryThresh currentTime
expired <- atomically $ do
recentTxIds <- readTVar varRecentTxIds
let (expired, recentTxIds') = RecentTxIds.expireTxIds
recentTxIdsExpiryThresh
currentTime
recentTxIds
writeTVar varRecentTxIds recentTxIds'
pure expired

unless (null expired) $
traceWith tracer (TraceRecentTxIdsExpired expired)
where
RecentTxIds.ExpiryThreshold threshold = recentTxIdsExpiryThresh

Expand Down
1 change: 1 addition & 0 deletions ouroboros-consensus/src/Ouroboros/Consensus/NodeNetwork.hs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ protocolHandlers NodeArgs {btime, maxClockSkew, tracers, maxUnackTxs, chainSyncP
, phTxSubmissionServer =
txSubmissionInbound
(txInboundTracer tracers)
(recentTxIdsTracer tracers)
maxUnackTxs
getRecentTxIds
(getMempoolWriter getMempool)
Expand Down
19 changes: 19 additions & 0 deletions ouroboros-network/src/Ouroboros/Network/RecentTxIds.hs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ module Ouroboros.Network.RecentTxIds

-- * Conversion
, toList

-- * Tracing
, TraceRecentTxIdsEvent (..)
) where

import Cardano.Prelude (NoUnexpectedThunks, OnlyCheckIsWHNF (..))
Expand Down Expand Up @@ -144,6 +147,22 @@ toList (RecentTxIds psq) = OrdPSQ.fold'
[]
psq

{-----------------------------------------------------------------------------
Tracing
-----------------------------------------------------------------------------}

data TraceRecentTxIdsEvent txid
= TraceRecentTxIdsInserted
![txid]
-- ^ Transaction IDs that have been inserted into the 'RecentTxIds'.
!Time
-- ^ The time at which the transactions were added to the 'RecentTxIds'.
| TraceRecentTxIdsExpired
![(txid, Time)]
-- ^ Transaction IDs, along with their expiration times, that have been
-- expired from the 'RecentTxIds'.
deriving Show

{-----------------------------------------------------------------------------
Orphan instances
-----------------------------------------------------------------------------}
Expand Down
21 changes: 14 additions & 7 deletions ouroboros-network/src/Ouroboros/Network/TxSubmission/Inbound.hs
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,13 @@ import Control.Monad.Class.MonadSTM.Strict (StrictTVar, modifyTVar,
readTVar)
import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTime (MonadTime (..))
import Control.Tracer (Tracer)
import Control.Tracer (Tracer, traceWith)

import Network.TypedProtocol.Pipelined (N, Nat (..))

import Ouroboros.Network.Protocol.TxSubmission.Server
import Ouroboros.Network.RecentTxIds (RecentTxIds)
import Ouroboros.Network.RecentTxIds (RecentTxIds,
TraceRecentTxIdsEvent (..))
import qualified Ouroboros.Network.RecentTxIds as RecentTxIds


Expand Down Expand Up @@ -146,14 +147,16 @@ txSubmissionInbound
:: forall txid tx idx m.
(Ord txid, Ord idx, MonadSTM m, MonadThrow m, MonadTime m)
=> Tracer m (TraceTxSubmissionInbound txid tx)
-> Tracer m (TraceRecentTxIdsEvent txid)
-- ^ Tracer for events related to the 'RecentTxIds' data structure.
-> Word16
-- ^ Maximum number of unacknowledged txids allowed
-> StrictTVar m (RecentTxIds txid)
-- ^ A collection of transaction IDs that we've most recently added to the
-- mempool from instances of the transaction submission server.
-> TxSubmissionMempoolWriter txid tx idx m
-> TxSubmissionServerPipelined txid tx m ()
txSubmissionInbound _tracer maxUnacked recentTxIdsVar mpWriter =
txSubmissionInbound _tracer recentTxIdsTr maxUnacked recentTxIdsVar mpWriter =
TxSubmissionServerPipelined (serverIdle Zero initialServerState)
where
-- TODO #1656: replace these fixed limits by policies based on
Expand Down Expand Up @@ -301,10 +304,14 @@ txSubmissionInbound _tracer maxUnacked recentTxIdsVar mpWriter =

-- Insert the transactions that were added to the mempool into the
-- 'RecentTxIds'.
currTime <- getMonotonicTime
atomically $ modifyTVar
recentTxIdsVar
(RecentTxIds.insertTxIds addedTxIds currTime)
unless (null addedTxIds) $ do
currTime <- getMonotonicTime
atomically $ modifyTVar
recentTxIdsVar
(RecentTxIds.insertTxIds addedTxIds currTime)
traceWith
recentTxIdsTr
(TraceRecentTxIdsInserted addedTxIds currTime)

serverIdle n st {
bufferedTxs = bufferedTxs'',
Expand Down

0 comments on commit 12d0bdf

Please sign in to comment.