Skip to content

Commit

Permalink
feat(metrics): calculate the real pending tx (#983) (#992)
Browse files Browse the repository at this point in the history
  • Loading branch information
0xmountaintop authored Aug 21, 2024
1 parent 68713c4 commit 443353d
Showing 1 changed file with 43 additions and 11 deletions.
54 changes: 43 additions & 11 deletions core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,17 @@ var (
// that this number is pretty low, since txpool reorgs happen very frequently.
dropBetweenReorgHistogram = metrics.NewRegisteredHistogram("txpool/dropbetweenreorg", nil, metrics.NewExpDecaySample(1028, 0.015))

pendingGauge = metrics.NewRegisteredGauge("txpool/pending", nil)
queuedGauge = metrics.NewRegisteredGauge("txpool/queued", nil)
localGauge = metrics.NewRegisteredGauge("txpool/local", nil)
slotsGauge = metrics.NewRegisteredGauge("txpool/slots", nil)
pendingGauge = metrics.NewRegisteredGauge("txpool/pending", nil)
realPendingGauge = metrics.NewRegisteredGauge("txpool/real_pending", nil)
queuedGauge = metrics.NewRegisteredGauge("txpool/queued", nil)
realQueuedGauge = metrics.NewRegisteredGauge("txpool/real_queued", nil)
localGauge = metrics.NewRegisteredGauge("txpool/local", nil)
slotsGauge = metrics.NewRegisteredGauge("txpool/slots", nil)

reheapTimer = metrics.NewRegisteredTimer("txpool/reheap", nil)

txLifecycleTimer = metrics.NewRegisteredTimer("txpool/txfifecycle", nil)
txLifecycleTimer = metrics.NewRegisteredTimer("txpool/txfifecycle", nil)
statsWithMinBaseFeeTimer = metrics.NewRegisteredTimer("txpool/stats_min_base_fee", nil)
)

// BlockChain defines the minimal set of methods needed to back a tx pool with
Expand Down Expand Up @@ -229,11 +232,13 @@ type LegacyPool struct {
queueTxEventCh chan *types.Transaction
reorgDoneCh chan chan struct{}
reorgShutdownCh chan struct{} // requests shutdown of scheduleReorgLoop
reorgPauseCh chan bool // requests to pause scheduleReorgLoop
wg sync.WaitGroup // tracks loop, scheduleReorgLoop
initDoneCh chan struct{} // is closed once the pool is initialized (for tests)

changesSinceReorg int // A counter for how many drops we've performed in-between reorg.

reorgPauseCh chan bool // requests to pause scheduleReorgLoop
realTxActivityShutdownCh chan struct{}
}

type txpoolResetRequest struct {
Expand Down Expand Up @@ -261,8 +266,10 @@ func New(config Config, chain BlockChain) *LegacyPool {
queueTxEventCh: make(chan *types.Transaction),
reorgDoneCh: make(chan chan struct{}),
reorgShutdownCh: make(chan struct{}),
reorgPauseCh: make(chan bool),
initDoneCh: make(chan struct{}),

reorgPauseCh: make(chan bool),
realTxActivityShutdownCh: make(chan struct{}),
}
pool.locals = newAccountSet(pool.signer)
for _, addr := range config.Locals {
Expand Down Expand Up @@ -329,9 +336,28 @@ func (pool *LegacyPool) Init(gasTip *big.Int, head *types.Header, reserve txpool
}
pool.wg.Add(1)
go pool.loop()

pool.wg.Add(1)
go pool.periodicallyCalculateRealTxActivity()

return nil
}

func (pool *LegacyPool) periodicallyCalculateRealTxActivity() {
defer pool.wg.Done()
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
pool.StatsWithMinBaseFee(pool.chain.CurrentBlock().BaseFee)
case <-pool.realTxActivityShutdownCh:
log.Info("Real tx activity calculation stopped")
return
}
}
}

// loop is the transaction pool's main event loop, waiting for and reacting to
// outside blockchain events as well as for various reporting and transaction
// eviction events.
Expand Down Expand Up @@ -406,6 +432,7 @@ func (pool *LegacyPool) loop() {
func (pool *LegacyPool) Close() error {
// Terminate the pool reorger and return
close(pool.reorgShutdownCh)
close(pool.realTxActivityShutdownCh)
pool.wg.Wait()

if pool.journal != nil {
Expand Down Expand Up @@ -488,10 +515,12 @@ func (pool *LegacyPool) stats() (int, int) {
// StatsWithMinBaseFee retrieves the current pool stats, namely the number of pending and the
// number of queued (non-executable) transactions greater equal minBaseFee.
func (pool *LegacyPool) StatsWithMinBaseFee(minBaseFee *big.Int) (int, int) {
pool.mu.RLock()
defer pool.mu.RUnlock()

return pool.statsWithMinBaseFee(minBaseFee)
statsStart := time.Now()
pool.mu.Lock()
pendingTxs, queuedTxs := pool.statsWithMinBaseFee(minBaseFee)
pool.mu.Unlock()
statsWithMinBaseFeeTimer.UpdateSince(statsStart)
return pendingTxs, queuedTxs
}

// statsWithMinBaseFee retrieves the current pool stats, namely the number of pending and the
Expand All @@ -506,6 +535,7 @@ func (pool *LegacyPool) statsWithMinBaseFee(minBaseFee *big.Int) (int, int) {
pending++
}
}
realPendingGauge.Update(int64(pending))

queued := 0
for _, list := range pool.queue {
Expand All @@ -516,6 +546,8 @@ func (pool *LegacyPool) statsWithMinBaseFee(minBaseFee *big.Int) (int, int) {
queued++
}
}
realQueuedGauge.Update(int64(queued))

return pending, queued
}

Expand Down

0 comments on commit 443353d

Please sign in to comment.