Skip to content

Commit

Permalink
metrics, cmd/XDC: change init-process of metrics (ethereum#30814)
Browse files Browse the repository at this point in the history
This PR modifies how the metrics library handles `Enabled`: previously,
the package `init` decided whether to serve real metrics or just
dummy-types.

This has several drawbacks:
- During pkg init, we need to determine whether metrics are enabled or
not. So we first hacked in a check if certain geth-specific
commandline-flags were enabled. Then we added a similar check for
geth-env-vars. Then we almost added a very elaborate check for
toml-config-file, plus toml parsing.

- Using "real" types and dummy types interchangeably means that
everything is hidden behind interfaces. This has a performance penalty,
and also it just adds a lot of code.

This PR removes the interface stuff, uses concrete types, and allows for
the setting of Enabled to happen later. It is still assumed that
`metrics.Enable()` is invoked early on.

The somewhat 'heavy' operations, such as ticking meters and exp-decay,
now checks the enable-flag to prevent resource leak.

The change may be large, but it's mostly pretty trivial, and from the
last time I gutted the metrics, I ensured that we have fairly good test
coverage.

---------

Co-authored-by: Felix Lange <[email protected]>
  • Loading branch information
2 people authored and gzliudan committed Dec 11, 2024
1 parent 0867ad8 commit ae413d9
Show file tree
Hide file tree
Showing 47 changed files with 674 additions and 1,309 deletions.
11 changes: 4 additions & 7 deletions cmd/XDC/chaincmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"github.com/XinFinOrg/XDPoSChain/eth/downloader"
"github.com/XinFinOrg/XDPoSChain/event"
"github.com/XinFinOrg/XDPoSChain/log"
"github.com/XinFinOrg/XDPoSChain/metrics"
"github.com/urfave/cli/v2"
)

Expand Down Expand Up @@ -239,14 +238,12 @@ func importChain(ctx *cli.Context) error {
if ctx.Args().Len() < 1 {
utils.Fatalf("This command requires an argument.")
}
// Start metrics export if enabled
utils.SetupMetrics(ctx)
// Start system runtime metrics collection
go metrics.CollectProcessMetrics(3 * time.Second)

stack, _ := makeFullNode(ctx)
stack, cfg := makeFullNode(ctx)
defer stack.Close()

// Start metrics export if enabled
utils.SetupMetrics(&cfg.Metrics)

chain, chainDb := utils.MakeChain(ctx, stack)
defer chainDb.Close()

Expand Down
24 changes: 24 additions & 0 deletions cmd/XDC/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,9 @@ func applyValues(values []string, params *[]string) {
func makeFullNode(ctx *cli.Context) (*node.Node, XDCConfig) {
stack, cfg := makeConfigNode(ctx)

// Start metrics export if enabled
utils.SetupMetrics(&cfg.Metrics)

// Register XDCX's OrderBook service if requested.
// enable in default
utils.RegisterXDCXService(stack, &cfg.XDCX)
Expand Down Expand Up @@ -310,4 +313,25 @@ func applyMetricConfig(ctx *cli.Context, cfg *XDCConfig) {
if ctx.IsSet(utils.MetricsInfluxDBOrganizationFlag.Name) {
cfg.Metrics.InfluxDBOrganization = ctx.String(utils.MetricsInfluxDBOrganizationFlag.Name)
}
// Sanity-check the commandline flags. It is fine if some unused fields is part
// of the toml-config, but we expect the commandline to only contain relevant
// arguments, otherwise it indicates an error.
var (
enableExport = ctx.Bool(utils.MetricsEnableInfluxDBFlag.Name)
enableExportV2 = ctx.Bool(utils.MetricsEnableInfluxDBV2Flag.Name)
)
if enableExport || enableExportV2 {
v1FlagIsSet := ctx.IsSet(utils.MetricsInfluxDBUsernameFlag.Name) ||
ctx.IsSet(utils.MetricsInfluxDBPasswordFlag.Name)

v2FlagIsSet := ctx.IsSet(utils.MetricsInfluxDBTokenFlag.Name) ||
ctx.IsSet(utils.MetricsInfluxDBOrganizationFlag.Name) ||
ctx.IsSet(utils.MetricsInfluxDBBucketFlag.Name)

if enableExport && v2FlagIsSet {
utils.Fatalf("Flags --influxdb-metrics.organization, --influxdb-metrics.token, --influxdb-metrics.bucket are only available for influxdb-v2")
} else if enableExportV2 && v1FlagIsSet {
utils.Fatalf("Flags --influxdb-metrics.username, --influxdb-metrics.password are only available for influxdb-v1")
}
}
}
4 changes: 0 additions & 4 deletions cmd/XDC/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,10 +321,6 @@ func startNode(ctx *cli.Context, stack *node.Node, cfg XDCConfig) {
if ctx.Bool(utils.LightModeFlag.Name) || ctx.String(utils.SyncModeFlag.Name) == "light" {
utils.Fatalf("Light clients do not support staking")
}
// Start metrics export if enabled
utils.SetupMetrics(ctx)
// Start system runtime metrics collection
go metrics.CollectProcessMetrics(3 * time.Second)

var ethereum *eth.Ethereum
if err := stack.Service(&ethereum); err != nil {
Expand Down
103 changes: 47 additions & 56 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"math"
"math/big"
"net"
"os"
"path/filepath"
"runtime"
Expand Down Expand Up @@ -1532,66 +1533,56 @@ func SetupNetwork(ctx *cli.Context) {
params.TargetGasLimit = ctx.Uint64(MinerGasLimitFlag.Name)
}

func SetupMetrics(ctx *cli.Context) {
if metrics.Enabled {
log.Info("Enabling metrics collection")
var (
enableExport = ctx.Bool(MetricsEnableInfluxDBFlag.Name)
enableExportV2 = ctx.Bool(MetricsEnableInfluxDBV2Flag.Name)
)

if enableExport || enableExportV2 {
checkExclusive(ctx, MetricsEnableInfluxDBFlag, MetricsEnableInfluxDBV2Flag)

v1FlagIsSet := ctx.IsSet(MetricsInfluxDBUsernameFlag.Name) ||
ctx.IsSet(MetricsInfluxDBPasswordFlag.Name)

v2FlagIsSet := ctx.IsSet(MetricsInfluxDBTokenFlag.Name) ||
ctx.IsSet(MetricsInfluxDBOrganizationFlag.Name) ||
ctx.IsSet(MetricsInfluxDBBucketFlag.Name)

if enableExport && v2FlagIsSet {
Fatalf("Flags --influxdb.metrics.organization, --influxdb.metrics.token, --influxdb.metrics.bucket are only available for influxdb-v2")
} else if enableExportV2 && v1FlagIsSet {
Fatalf("Flags --influxdb.metrics.username, --influxdb.metrics.password are only available for influxdb-v1")
}
}

var (
endpoint = ctx.String(MetricsInfluxDBEndpointFlag.Name)
database = ctx.String(MetricsInfluxDBDatabaseFlag.Name)
username = ctx.String(MetricsInfluxDBUsernameFlag.Name)
password = ctx.String(MetricsInfluxDBPasswordFlag.Name)

token = ctx.String(MetricsInfluxDBTokenFlag.Name)
bucket = ctx.String(MetricsInfluxDBBucketFlag.Name)
organization = ctx.String(MetricsInfluxDBOrganizationFlag.Name)
)

if enableExport {
log.Info("Enabling metrics export to InfluxDB")

tagsMap := SplitTagsFlag(ctx.String(MetricsInfluxDBTagsFlag.Name))

go influxdb.InfluxDBWithTags(metrics.DefaultRegistry, 10*time.Second, endpoint, database, username, password, "xdc.", tagsMap)
} else if enableExportV2 {
log.Info("Enabling metrics export to InfluxDB (v2)")

tagsMap := SplitTagsFlag(ctx.String(MetricsInfluxDBTagsFlag.Name))

go influxdb.InfluxDBV2WithTags(metrics.DefaultRegistry, 10*time.Second, endpoint, token, bucket, organization, "xdc.", tagsMap)
}
// SetupMetrics configures the metrics system.
func SetupMetrics(cfg *metrics.Config) {
if !cfg.Enabled {
return
}
log.Info("Enabling metrics collection")
metrics.Enable()

if ctx.IsSet(MetricsHTTPFlag.Name) {
address := fmt.Sprintf("%s:%d", ctx.String(MetricsHTTPFlag.Name), ctx.Int(MetricsPortFlag.Name))
log.Info("Enabling stand-alone metrics HTTP endpoint", "address", address)
exp.Setup(address)
} else if ctx.IsSet(MetricsPortFlag.Name) {
log.Warn(fmt.Sprintf("--%s specified without --%s, metrics server will not start.", MetricsPortFlag.Name, MetricsHTTPFlag.Name))
}
// InfluxDB exporter.
var (
enableExport = cfg.EnableInfluxDB
enableExportV2 = cfg.EnableInfluxDBV2
)
if cfg.EnableInfluxDB && cfg.EnableInfluxDBV2 {
Fatalf("Flags %v can't be used at the same time", strings.Join([]string{MetricsEnableInfluxDBFlag.Name, MetricsEnableInfluxDBV2Flag.Name}, ", "))
}
var (
endpoint = cfg.InfluxDBEndpoint
database = cfg.InfluxDBDatabase
username = cfg.InfluxDBUsername
password = cfg.InfluxDBPassword

token = cfg.InfluxDBToken
bucket = cfg.InfluxDBBucket
organization = cfg.InfluxDBOrganization
tagsMap = SplitTagsFlag(cfg.InfluxDBTags)
)
if enableExport {
log.Info("Enabling metrics export to InfluxDB")
go influxdb.InfluxDBWithTags(metrics.DefaultRegistry, 10*time.Second, endpoint, database, username, password, "geth.", tagsMap)
} else if enableExportV2 {
tagsMap := SplitTagsFlag(cfg.InfluxDBTags)
log.Info("Enabling metrics export to InfluxDB (v2)")
go influxdb.InfluxDBV2WithTags(metrics.DefaultRegistry, 10*time.Second, endpoint, token, bucket, organization, "geth.", tagsMap)
}

// Expvar exporter.
if cfg.HTTP != "" {
address := net.JoinHostPort(cfg.HTTP, fmt.Sprintf("%d", cfg.Port))
log.Info("Enabling stand-alone metrics HTTP endpoint", "address", address)
exp.Setup(address)
} else if cfg.HTTP == "" && cfg.Port != 0 {
log.Warn(fmt.Sprintf("--%s specified without --%s, metrics server will not start.", MetricsPortFlag.Name, MetricsHTTPFlag.Name))
}

// Enable system metrics collection.
go metrics.CollectProcessMetrics(3 * time.Second)
}

// SplitTagsFlag parses a comma-separated list of k=v metrics tags.
func SplitTagsFlag(tagsFlag string) map[string]string {
tags := strings.Split(tagsFlag, ",")
tagsMap := map[string]string{}
Expand Down
2 changes: 1 addition & 1 deletion consensus/ethash/ethash.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ type Ethash struct {
rand *rand.Rand // Properly seeded random source for nonces
threads int // Number of threads to mine on if mining
update chan struct{} // Notification channel to update mining parameters
hashrate metrics.Meter // Meter tracking the average hashrate
hashrate *metrics.Meter // Meter tracking the average hashrate

// The fields below are hooks for testing
shared *Ethash // Shared PoW verifier to avoid cache regeneration
Expand Down
2 changes: 1 addition & 1 deletion eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -1586,7 +1586,7 @@ func (d *Downloader) DeliverNodeData(id string, data [][]byte) (err error) {
}

// deliver injects a new batch of data received from a remote node.
func (d *Downloader) deliver(id string, destCh chan dataPack, packet dataPack, inMeter, dropMeter metrics.Meter) (err error) {
func (d *Downloader) deliver(id string, destCh chan dataPack, packet dataPack, inMeter, dropMeter *metrics.Meter) (err error) {
// Update the delivery metrics for both good and failed deliveries
inMeter.Mark(int64(packet.Items()))
defer func() {
Expand Down
4 changes: 2 additions & 2 deletions eth/downloader/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,7 @@ func (q *queue) ExpireReceipts(timeout time.Duration) map[string]int {
// Note, this method expects the queue lock to be already held. The
// reason the lock is not obtained in here is because the parameters already need
// to access the queue, so they already need a lock anyway.
func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue *prque.Prque, timeoutMeter metrics.Meter) map[string]int {
func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue *prque.Prque, timeoutMeter *metrics.Meter) map[string]int {
// Iterate over the expired requests and return each to the queue
expiries := make(map[string]int)
for id, request := range pendPool {
Expand Down Expand Up @@ -805,7 +805,7 @@ func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) (int,
// reason the lock is not obtained in here is because the parameters already need
// to access the queue, so they already need a lock anyway.
func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque,
pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}, reqTimer metrics.Timer,
pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}, reqTimer *metrics.Timer,
results int, reconstruct func(header *types.Header, index int, result *fetchResult) error) (int, error) {

// Short circuit if the data was never requested
Expand Down
2 changes: 1 addition & 1 deletion eth/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ type meteredMsgReadWriter struct {
// newMeteredMsgWriter wraps a p2p MsgReadWriter with metering support. If the
// metrics system is disabled, this function returns the original object.
func newMeteredMsgWriter(rw p2p.MsgReadWriter) p2p.MsgReadWriter {
if !metrics.Enabled {
if !metrics.Enabled() {
return rw
}
return &meteredMsgReadWriter{MsgReadWriter: rw}
Expand Down
24 changes: 12 additions & 12 deletions ethdb/leveldb/leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,18 +62,18 @@ type Database struct {
fn string // filename for reporting
db *leveldb.DB // LevelDB instance

compTimeMeter metrics.Meter // Meter for measuring the total time spent in database compaction
compReadMeter metrics.Meter // Meter for measuring the data read during compaction
compWriteMeter metrics.Meter // Meter for measuring the data written during compaction
writeDelayNMeter metrics.Meter // Meter for measuring the write delay number due to database compaction
writeDelayMeter metrics.Meter // Meter for measuring the write delay duration due to database compaction
diskSizeGauge metrics.Gauge // Gauge for tracking the size of all the levels in the database
diskReadMeter metrics.Meter // Meter for measuring the effective amount of data read
diskWriteMeter metrics.Meter // Meter for measuring the effective amount of data written
memCompGauge metrics.Gauge // Gauge for tracking the number of memory compaction
level0CompGauge metrics.Gauge // Gauge for tracking the number of table compaction in level0
nonlevel0CompGauge metrics.Gauge // Gauge for tracking the number of table compaction in non0 level
seekCompGauge metrics.Gauge // Gauge for tracking the number of table compaction caused by read opt
compTimeMeter *metrics.Meter // Meter for measuring the total time spent in database compaction
compReadMeter *metrics.Meter // Meter for measuring the data read during compaction
compWriteMeter *metrics.Meter // Meter for measuring the data written during compaction
writeDelayNMeter *metrics.Meter // Meter for measuring the write delay number due to database compaction
writeDelayMeter *metrics.Meter // Meter for measuring the write delay duration due to database compaction
diskSizeGauge *metrics.Gauge // Gauge for tracking the size of all the levels in the database
diskReadMeter *metrics.Meter // Meter for measuring the effective amount of data read
diskWriteMeter *metrics.Meter // Meter for measuring the effective amount of data written
memCompGauge *metrics.Gauge // Gauge for tracking the number of memory compaction
level0CompGauge *metrics.Gauge // Gauge for tracking the number of table compaction in level0
nonlevel0CompGauge *metrics.Gauge // Gauge for tracking the number of table compaction in non0 level
seekCompGauge *metrics.Gauge // Gauge for tracking the number of table compaction caused by read opt

quitLock sync.Mutex // Mutex protecting the quit channel access
quitChan chan chan error // Quit channel to stop the metrics collection before closing the database
Expand Down
2 changes: 1 addition & 1 deletion les/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ type meteredMsgReadWriter struct {
// newMeteredMsgWriter wraps a p2p MsgReadWriter with metering support. If the
// metrics system is disabled, this function returns the original object.
func newMeteredMsgWriter(rw p2p.MsgReadWriter) p2p.MsgReadWriter {
if !metrics.Enabled {
if !metrics.Enabled() {
return rw
}
return &meteredMsgReadWriter{MsgReadWriter: rw}
Expand Down
Loading

0 comments on commit ae413d9

Please sign in to comment.