diff --git a/app/monitoringapi.go b/app/monitoringapi.go index 700b9f39a..c99acf8da 100644 --- a/app/monitoringapi.go +++ b/app/monitoringapi.go @@ -207,6 +207,8 @@ func beaconNodeSyncing(ctx context.Context, eth2Cl eth2client.NodeSyncingProvide // beaconNodeVersionMetric sets the beacon node version gauge. func beaconNodeVersionMetric(ctx context.Context, eth2Cl eth2wrap.Client, clock clockwork.Clock) { nodeVersionTicker := clock.NewTicker(10 * time.Minute) + + // TODO(corver): Refactor to use ResetGauge. var prevNodeVersion string setNodeVersion := func() { version, err := eth2Cl.NodeVersion(ctx) diff --git a/app/peerinfo/peerinfo.go b/app/peerinfo/peerinfo.go index b8f8bfa2c..6e2a30e60 100644 --- a/app/peerinfo/peerinfo.go +++ b/app/peerinfo/peerinfo.go @@ -195,7 +195,8 @@ func (p *PeerInfo) sendOnce(ctx context.Context, now time.Time) { // newMetricsSubmitter returns a prometheus metric submitter. func newMetricsSubmitter() metricSubmitter { var ( - mu sync.Mutex + mu sync.Mutex + // TODO(corver): Refactor to use ResetGauge. prevVersions = make(map[string]string) prevGitHashes = make(map[string]string) ) diff --git a/app/promauto/resetgauge.go b/app/promauto/resetgauge.go new file mode 100644 index 000000000..e9010b66d --- /dev/null +++ b/app/promauto/resetgauge.go @@ -0,0 +1,56 @@ +// Copyright © 2022-2023 Obol Labs Inc. Licensed under the terms of a Business Source License 1.1 + +package promauto + +import ( + "strings" + "sync" + + "github.com/prometheus/client_golang/prometheus" +) + +const separator = "|" + +// NewResetGaugeVec creates a new ResetGaugeVec. +func NewResetGaugeVec(opts prometheus.GaugeOpts, labelNames []string) *ResetGaugeVec { + return &ResetGaugeVec{ + inner: NewGaugeVec(opts, labelNames), + labels: make(map[string]bool), + } +} + +// ResetGaugeVec is a GaugeVec that can be reset which deletes all previously set labels. +// This is useful to clear out labels that are no longer present. +type ResetGaugeVec struct { + inner *prometheus.GaugeVec + + mu sync.Mutex + labels map[string]bool +} + +func (g *ResetGaugeVec) WithLabelValues(lvs ...string) prometheus.Gauge { + for _, lv := range lvs { + if strings.Contains(lv, separator) { + panic("label value cannot contain separator") + } + } + + g.mu.Lock() + defer g.mu.Unlock() + + g.labels[strings.Join(lvs, separator)] = true + + return g.inner.WithLabelValues(lvs...) +} + +// Reset deletes all previously set labels. +func (g *ResetGaugeVec) Reset() { + g.mu.Lock() + defer g.mu.Unlock() + + for lv := range g.labels { + g.inner.DeleteLabelValues(strings.Split(lv, separator)...) + } + + g.labels = make(map[string]bool) +} diff --git a/app/promauto/resetgauge_test.go b/app/promauto/resetgauge_test.go new file mode 100644 index 000000000..217c0021b --- /dev/null +++ b/app/promauto/resetgauge_test.go @@ -0,0 +1,59 @@ +// Copyright © 2022-2023 Obol Labs Inc. Licensed under the terms of a Business Source License 1.1 + +package promauto_test + +import ( + "testing" + + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" + + "github.com/obolnetwork/charon/app/promauto" +) + +const resetTest = "reset_test" + +var testResetGauge = promauto.NewResetGaugeVec(prometheus.GaugeOpts{ + Name: resetTest, + Help: "", +}, []string{"label"}) + +func TestResetGaugeVec(t *testing.T) { + registry, err := promauto.NewRegistry(nil) + require.NoError(t, err) + + testResetGauge.WithLabelValues("1").Set(1) + assertVecLen(t, registry, resetTest, 1) + + testResetGauge.WithLabelValues("2").Set(2) + assertVecLen(t, registry, resetTest, 2) + + testResetGauge.Reset() + assertVecLen(t, registry, resetTest, 0) + + testResetGauge.WithLabelValues("3").Set(3) + assertVecLen(t, registry, resetTest, 1) +} + +func assertVecLen(t *testing.T, registry *prometheus.Registry, name string, l int) { //nolint:unparam // abstracting name is fine even though it is always currently constant + t.Helper() + + metrics, err := registry.Gather() + require.NoError(t, err) + + for _, metricFam := range metrics { + if *metricFam.Name != name { + continue + } + + require.Len(t, metricFam.Metric, l) + + return + } + + if l == 0 { + return + } + + require.Fail(t, "metric not found") +} diff --git a/core/scheduler/metrics.go b/core/scheduler/metrics.go index 341de8859..12e36c825 100644 --- a/core/scheduler/metrics.go +++ b/core/scheduler/metrics.go @@ -77,6 +77,7 @@ func instrumentDuty(duty core.Duty, defSet core.DutyDefinitionSet) { // newMetricSubmitter returns a function that sets validator balance and status metric. func newMetricSubmitter() func(pubkey core.PubKey, totalBal eth2p0.Gwei, status string) { + // TODO(corver): Refactor to use ResetGauge. prevStatus := make(map[core.PubKey]string) return func(pubkey core.PubKey, totalBal eth2p0.Gwei, status string) { diff --git a/docs/metrics.md b/docs/metrics.md index 57fb1357c..6ebfeccac 100644 --- a/docs/metrics.md +++ b/docs/metrics.md @@ -67,7 +67,7 @@ when storing metrics from multiple nodes or clusters in one Prometheus instance. | `p2p_peer_connection_types` | Gauge | Current number of libp2p connections by peer and type (`direct` or `relay`). Note that peers may have multiple connections. | `peer, type` | | `p2p_peer_network_receive_bytes_total` | Counter | Total number of network bytes received from the peer by protocol. | `peer, protocol` | | `p2p_peer_network_sent_bytes_total` | Counter | Total number of network bytes sent to the peer by protocol. | `peer, protocol` | -| `p2p_peer_streams` | Gauge | Current number of libp2p streams by peer and direction (`inbound` or `outbound` or `unknown`). | `peer, direction, protocol` | +| `p2p_peer_streams` | Gauge | Current number of libp2p streams by peer, direction (`inbound` or `outbound` or `unknown`) and protocol. | `peer, direction, protocol` | | `p2p_ping_error_total` | Counter | Total number of ping errors per peer | `peer` | | `p2p_ping_latency_secs` | Histogram | Ping latencies in seconds per peer | `peer` | | `p2p_ping_success` | Gauge | Whether the last ping was successful (1) or not (0). Can be used as proxy for connected peers | `peer` | diff --git a/p2p/metrics.go b/p2p/metrics.go index a8d0074bd..bcfba8e1a 100644 --- a/p2p/metrics.go +++ b/p2p/metrics.go @@ -56,11 +56,10 @@ var ( Help: "Current number of libp2p connections by peer and type ('direct' or 'relay'). Note that peers may have multiple connections.", }, []string{"peer", "type"}) - // TODO(gsora): remove this once we fix the stream leak issue. - peerStreamGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{ + peerStreamGauge = promauto.NewResetGaugeVec(prometheus.GaugeOpts{ Namespace: "p2p", Name: "peer_streams", - Help: "Current number of libp2p streams by peer and direction ('inbound' or 'outbound' or 'unknown').", + Help: "Current number of libp2p streams by peer, direction ('inbound' or 'outbound' or 'unknown') and protocol.", }, []string{"peer", "direction", "protocol"}) peerConnCounter = promauto.NewCounterVec(prometheus.CounterOpts{ diff --git a/p2p/p2p.go b/p2p/p2p.go index 23fab1800..478acf454 100644 --- a/p2p/p2p.go +++ b/p2p/p2p.go @@ -277,8 +277,9 @@ func RegisterConnectionLogger(ctx context.Context, tcpNode host.Host, peerIDs [] } type streamKey struct { - connKey - Protocol string + PeerName string + Direction string + Protocol string } var ( @@ -305,35 +306,36 @@ func RegisterConnectionLogger(ctx context.Context, tcpNode host.Host, peerIDs [] case <-ctx.Done(): return case <-ticker.C: - // Instrument connection counts. + // Instrument connection and stream counts. + peerStreamGauge.Reset() // Reset stream gauge to clear previously set protocols. counts := make(map[connKey]int) streams := make(map[streamKey]int) for _, conn := range tcpNode.Network().Conns() { p := PeerName(conn.RemotePeer()) - key := connKey{PeerName: p, Type: addrType(conn.RemoteMultiaddr())} - counts[key]++ + cKey := connKey{ + PeerName: p, + Type: addrType(conn.RemoteMultiaddr()), + } + counts[cKey]++ for _, stream := range conn.GetStreams() { - dir := stream.Stat().Direction.String() - protocol := stream.Protocol() sKey := streamKey{ - connKey: connKey{PeerName: p, Type: dir}, - Protocol: string(protocol), + PeerName: p, + Direction: stream.Stat().Direction.String(), + Protocol: string(stream.Protocol()), } - streams[sKey]++ } } for _, pID := range peerIDs { for _, typ := range []string{addrTypeRelay, addrTypeDirect} { - key := connKey{PeerName: PeerName(pID), Type: typ} - peerConnGauge.WithLabelValues(key.PeerName, key.Type).Set(float64(counts[key])) + cKey := connKey{PeerName: PeerName(pID), Type: typ} + peerConnGauge.WithLabelValues(cKey.PeerName, cKey.Type).Set(float64(counts[cKey])) } } - // TODO(gsora): remove this once we fix the stream leak issue - for key, amount := range streams { - peerStreamGauge.WithLabelValues(key.PeerName, key.Type, key.Protocol).Set(float64(amount)) + for sKey, amount := range streams { + peerStreamGauge.WithLabelValues(sKey.PeerName, sKey.Direction, sKey.Protocol).Set(float64(amount)) } case e := <-events: // Log and instrument events.