Skip to content

Commit

Permalink
miner,api: Add env var cfg, thresholds to timing logs
Browse files Browse the repository at this point in the history
  • Loading branch information
sebastianst committed Nov 25, 2023
1 parent 405acbb commit 75f241a
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 57 deletions.
21 changes: 20 additions & 1 deletion eth/catalyst/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"math/big"
"os"
"sync"
"time"

Expand Down Expand Up @@ -227,6 +228,21 @@ func checkAttribute(active func(*big.Int, uint64) bool, exists bool, block *big.
return nil
}

var timingThresholdForkchoiceUpdated = func() time.Duration {
const defaultThr = 10 * time.Millisecond
const envStr = "GETH_TIMING_THR_FORKCHOICEUPDATED"
thrEnv, ok := os.LookupEnv(envStr)
if !ok {
return defaultThr
}
thr, err := time.ParseDuration(thrEnv)
if err != nil {
return defaultThr
}
log.Info("ForkchoiceUpdated timing threshold read from env.", "threshold", thr, "env", envStr)
return thr
}()

func (api *ConsensusAPI) forkchoiceUpdated(update engine.ForkchoiceStateV1, payloadAttributes *engine.PayloadAttributes) (engine.ForkChoiceResponse, error) {
start := time.Now()
timings := []interface{}{"method", "ForkchoiceUpdated"}
Expand All @@ -235,8 +251,11 @@ func (api *ConsensusAPI) forkchoiceUpdated(update engine.ForkchoiceStateV1, payl
}
defer func() {
recordElapsed("complete")
log.Trace("Engine API request complete", timings...)
if time.Since(start) > timingThresholdForkchoiceUpdated {
log.Debug("Engine API request complete", timings...)
}
}()

api.forkchoiceLock.Lock()
defer api.forkchoiceLock.Unlock()
recordElapsed("lockAcquired")
Expand Down
134 changes: 78 additions & 56 deletions miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"math/big"
"os"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -536,6 +537,26 @@ func (w *worker) newWorkLoop(recommit time.Duration) {
}
}

var (
timingThresholdNewWork = readThrEnv("NEWWORK", 250*time.Millisecond)
timingThresholdGetWork = readThrEnv("GETWORK", 10*time.Millisecond)
timingThresholdTxs = readThrEnv("TXS", 10*time.Millisecond)
)

func readThrEnv(envSuffix string, defaultThr time.Duration) time.Duration {
envStr := "GETH_TIMING_THR_" + envSuffix
thrEnv, ok := os.LookupEnv(envStr)
if !ok {
return defaultThr
}
thr, err := time.ParseDuration(thrEnv)
if err != nil {
return defaultThr
}
log.Info("Miner loop timing threshold read from env.", "threshold", thr, "env", envStr)
return thr
}

// mainLoop is responsible for generating and submitting sealing work based on
// the received event. It can support two modes: automatically generate task and
// submit it or return task according to given parameters for various proposes.
Expand All @@ -549,75 +570,76 @@ func (w *worker) mainLoop() {
}
}()

logTime := func(workType string, action func()) {
logTime := func(workType string, threshold time.Duration) func() {
start := time.Now()
defer func() {
return func() {
elapsed := time.Since(start)
log.Trace("Completed work", "type", workType, "elapsed", common.PrettyDuration(elapsed))
}()
action()
if elapsed > threshold {
log.Debug("Completed work", "type", workType, "elapsed", common.PrettyDuration(elapsed))
}
}
}

for {
select {
case req := <-w.newWorkCh:
logTime("newWork", func() {
w.commitWork(req.interrupt, req.timestamp)
})
done := logTime("newWork", timingThresholdNewWork)
w.commitWork(req.interrupt, req.timestamp)
done()

case req := <-w.getWorkCh:
logTime("getWork", func() {
req.result <- w.generateWork(req.params)
})
done := logTime("getWork", timingThresholdGetWork)
req.result <- w.generateWork(req.params)
done()

case ev := <-w.txsCh:
logTime("txsCh", func() {
if w.chainConfig.Optimism != nil && !w.config.RollupComputePendingBlock {
return // don't update the pending-block snapshot if we are not computing the pending block
done := logTime("txsCh", timingThresholdTxs)
if w.chainConfig.Optimism != nil && !w.config.RollupComputePendingBlock {
continue // don't update the pending-block snapshot if we are not computing the pending block
}
// Apply transactions to the pending state if we're not sealing
//
// Note all transactions received may not be continuous with transactions
// already included in the current sealing block. These transactions will
// be automatically eliminated.
if !w.isRunning() && w.current != nil {
// If block is already full, abort
if gp := w.current.gasPool; gp != nil && gp.Gas() < params.TxGas {
continue
}
// Apply transactions to the pending state if we're not sealing
//
// Note all transactions received may not be continuous with transactions
// already included in the current sealing block. These transactions will
// be automatically eliminated.
if !w.isRunning() && w.current != nil {
// If block is already full, abort
if gp := w.current.gasPool; gp != nil && gp.Gas() < params.TxGas {
return
}
txs := make(map[common.Address][]*txpool.LazyTransaction, len(ev.Txs))
for _, tx := range ev.Txs {
acc, _ := types.Sender(w.current.signer, tx)
txs[acc] = append(txs[acc], &txpool.LazyTransaction{
Pool: w.eth.TxPool(), // We don't know where this came from, yolo resolve from everywhere
Hash: tx.Hash(),
Tx: nil, // Do *not* set this! We need to resolve it later to pull blobs in
Time: tx.Time(),
GasFeeCap: tx.GasFeeCap(),
GasTipCap: tx.GasTipCap(),
Gas: tx.Gas(),
BlobGas: tx.BlobGas(),
})
}
txset := newTransactionsByPriceAndNonce(w.current.signer, txs, w.current.header.BaseFee)
tcount := w.current.tcount
w.commitTransactions(w.current, txset, nil)

// Only update the snapshot if any new transactions were added
// to the pending block
if tcount != w.current.tcount {
w.updateSnapshot(w.current)
}
} else {
// Special case, if the consensus engine is 0 period clique(dev mode),
// submit sealing work here since all empty submission will be rejected
// by clique. Of course the advance sealing(empty submission) is disabled.
if w.chainConfig.Clique != nil && w.chainConfig.Clique.Period == 0 {
w.commitWork(nil, time.Now().Unix())
}
txs := make(map[common.Address][]*txpool.LazyTransaction, len(ev.Txs))
for _, tx := range ev.Txs {
acc, _ := types.Sender(w.current.signer, tx)
txs[acc] = append(txs[acc], &txpool.LazyTransaction{
Pool: w.eth.TxPool(), // We don't know where this came from, yolo resolve from everywhere
Hash: tx.Hash(),
Tx: nil, // Do *not* set this! We need to resolve it later to pull blobs in
Time: tx.Time(),
GasFeeCap: tx.GasFeeCap(),
GasTipCap: tx.GasTipCap(),
Gas: tx.Gas(),
BlobGas: tx.BlobGas(),
})
}
w.newTxs.Add(int32(len(ev.Txs)))
})
txset := newTransactionsByPriceAndNonce(w.current.signer, txs, w.current.header.BaseFee)
tcount := w.current.tcount
w.commitTransactions(w.current, txset, nil)

// Only update the snapshot if any new transactions were added
// to the pending block
if tcount != w.current.tcount {
w.updateSnapshot(w.current)
}
} else {
// Special case, if the consensus engine is 0 period clique(dev mode),
// submit sealing work here since all empty submission will be rejected
// by clique. Of course the advance sealing(empty submission) is disabled.
if w.chainConfig.Clique != nil && w.chainConfig.Clique.Period == 0 {
w.commitWork(nil, time.Now().Unix())
}
}
w.newTxs.Add(int32(len(ev.Txs)))
done()

// System stopped
case <-w.exitCh:
Expand Down

0 comments on commit 75f241a

Please sign in to comment.