diff --git a/app/app.go b/app/app.go index 47c8f59c3..0b2eac669 100644 --- a/app/app.go +++ b/app/app.go @@ -21,6 +21,7 @@ import ( k1 "github.com/decred/dcrd/dcrec/secp256k1/v4" "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" "go.uber.org/automaxprocs/maxprocs" @@ -286,7 +287,10 @@ func wireP2P(ctx context.Context, life *lifecycle.Manager, conf Config, } // Start libp2p TCP node. - opts := []libp2p.Option{p2p.WithBandwidthReporter(peerIDs)} + opts := []libp2p.Option{ + p2p.WithBandwidthReporter(peerIDs), + libp2p.ResourceManager(new(network.NullResourceManager)), + } opts = append(opts, conf.TestConfig.LibP2POpts...) tcpNode, err := p2p.NewTCPNode(ctx, conf.P2P, p2pKey, connGater, diff --git a/app/monitoringapi.go b/app/monitoringapi.go index 700b9f39a..c99acf8da 100644 --- a/app/monitoringapi.go +++ b/app/monitoringapi.go @@ -207,6 +207,8 @@ func beaconNodeSyncing(ctx context.Context, eth2Cl eth2client.NodeSyncingProvide // beaconNodeVersionMetric sets the beacon node version gauge. func beaconNodeVersionMetric(ctx context.Context, eth2Cl eth2wrap.Client, clock clockwork.Clock) { nodeVersionTicker := clock.NewTicker(10 * time.Minute) + + // TODO(corver): Refactor to use ResetGauge. var prevNodeVersion string setNodeVersion := func() { version, err := eth2Cl.NodeVersion(ctx) diff --git a/app/peerinfo/peerinfo.go b/app/peerinfo/peerinfo.go index b8f8bfa2c..6e2a30e60 100644 --- a/app/peerinfo/peerinfo.go +++ b/app/peerinfo/peerinfo.go @@ -195,7 +195,8 @@ func (p *PeerInfo) sendOnce(ctx context.Context, now time.Time) { // newMetricsSubmitter returns a prometheus metric submitter. func newMetricsSubmitter() metricSubmitter { var ( - mu sync.Mutex + mu sync.Mutex + // TODO(corver): Refactor to use ResetGauge. 
prevVersions  = make(map[string]string)
 		prevGitHashes = make(map[string]string)
 	)
diff --git a/app/promauto/resetgauge.go b/app/promauto/resetgauge.go
new file mode 100644
index 000000000..e9010b66d
--- /dev/null
+++ b/app/promauto/resetgauge.go
@@ -0,0 +1,60 @@
+// Copyright © 2022-2023 Obol Labs Inc. Licensed under the terms of a Business Source License 1.1
+
+package promauto
+
+import (
+	"strings"
+	"sync"
+
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+// separator joins label values into a single map key, label values may therefore not contain it.
+const separator = "|"
+
+// NewResetGaugeVec creates a new ResetGaugeVec.
+func NewResetGaugeVec(opts prometheus.GaugeOpts, labelNames []string) *ResetGaugeVec {
+	return &ResetGaugeVec{
+		inner:  NewGaugeVec(opts, labelNames),
+		labels: make(map[string][]string),
+	}
+}
+
+// ResetGaugeVec is a GaugeVec that can be reset which deletes all previously set labels.
+// This is useful to clear out labels that are no longer present.
+type ResetGaugeVec struct {
+	inner *prometheus.GaugeVec
+
+	mu     sync.Mutex
+	labels map[string][]string // Label values handed out since the last Reset, keyed by their joined form.
+}
+
+// WithLabelValues returns the gauge for the provided label values and records them so that
+// a subsequent Reset deletes it. It panics if any label value contains the separator.
+func (g *ResetGaugeVec) WithLabelValues(lvs ...string) prometheus.Gauge {
+	for _, lv := range lvs {
+		if strings.Contains(lv, separator) {
+			panic("label value cannot contain separator")
+		}
+	}
+
+	g.mu.Lock()
+	defer g.mu.Unlock()
+
+	// Store a copy of the values: splitting the joined key again cannot recover an empty label set.
+	g.labels[strings.Join(lvs, separator)] = append([]string(nil), lvs...)
+
+	return g.inner.WithLabelValues(lvs...)
+}
+
+// Reset deletes all previously set labels.
+func (g *ResetGaugeVec) Reset() {
+	g.mu.Lock()
+	defer g.mu.Unlock()
+
+	for _, lvs := range g.labels {
+		g.inner.DeleteLabelValues(lvs...)
+	}
+
+	g.labels = make(map[string][]string)
+}
diff --git a/app/promauto/resetgauge_test.go b/app/promauto/resetgauge_test.go
new file mode 100644
index 000000000..217c0021b
--- /dev/null
+++ b/app/promauto/resetgauge_test.go
@@ -0,0 +1,59 @@
+// Copyright © 2022-2023 Obol Labs Inc.
Licensed under the terms of a Business Source License 1.1
+
+package promauto_test
+
+import (
+	"testing"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/stretchr/testify/require"
+
+	"github.com/obolnetwork/charon/app/promauto"
+)
+
+const resetTest = "reset_test"
+
+var testResetGauge = promauto.NewResetGaugeVec(prometheus.GaugeOpts{
+	Name: resetTest,
+	Help: "",
+}, []string{"label"})
+
+func TestResetGaugeVec(t *testing.T) {
+	registry, err := promauto.NewRegistry(nil)
+	require.NoError(t, err)
+
+	testResetGauge.WithLabelValues("1").Set(1)
+	assertVecLen(t, registry, resetTest, 1)
+
+	testResetGauge.WithLabelValues("2").Set(2)
+	assertVecLen(t, registry, resetTest, 2)
+
+	testResetGauge.Reset()
+	assertVecLen(t, registry, resetTest, 0)
+
+	testResetGauge.WithLabelValues("3").Set(3)
+	assertVecLen(t, registry, resetTest, 1)
+}
+
+// assertVecLen asserts that the named metric family gathered from the registry contains l metrics.
+// A family that is not gathered at all is only accepted when l is zero.
+func assertVecLen(t *testing.T, registry *prometheus.Registry, name string, l int) { //nolint:unparam // abstracting name is fine even though it is always currently constant
+	t.Helper()
+
+	families, err := registry.Gather()
+	require.NoError(t, err)
+
+	for _, family := range families {
+		if family.GetName() == name { // Generated getters are nil-safe, unlike dereferencing the Name pointer.
+			require.Len(t, family.GetMetric(), l)
+
+			return
+		}
+	}
+
+	if l == 0 {
+		return
+	}
+
+	require.Fail(t, "metric not found")
+}
diff --git a/cmd/relay/p2p.go b/cmd/relay/p2p.go
index 004d9759d..f75d53eea 100644
--- a/cmd/relay/p2p.go
+++ b/cmd/relay/p2p.go
@@ -40,7 +40,7 @@ func startP2P(ctx context.Context, config Config, key *k1.PrivateKey, reporter m
 	}
 
 	tcpNode, err := p2p.NewTCPNode(ctx, config.P2PConfig, key, p2p.NewOpenGater(), config.FilterPrivAddrs,
-		libp2p.ResourceManager(&network.NullResourceManager{}), libp2p.BandwidthReporter(reporter))
+		libp2p.ResourceManager(new(network.NullResourceManager)), libp2p.BandwidthReporter(reporter))
 	if err != nil {
 		return nil, errors.Wrap(err, "new tcp node")
 	}
diff --git a/core/scheduler/metrics.go b/core/scheduler/metrics.go
index
341de8859..12e36c825 100644 --- a/core/scheduler/metrics.go +++ b/core/scheduler/metrics.go @@ -77,6 +77,7 @@ func instrumentDuty(duty core.Duty, defSet core.DutyDefinitionSet) { // newMetricSubmitter returns a function that sets validator balance and status metric. func newMetricSubmitter() func(pubkey core.PubKey, totalBal eth2p0.Gwei, status string) { + // TODO(corver): Refactor to use ResetGauge. prevStatus := make(map[core.PubKey]string) return func(pubkey core.PubKey, totalBal eth2p0.Gwei, status string) { diff --git a/docs/metrics.md b/docs/metrics.md index 90a0f0efa..6ebfeccac 100644 --- a/docs/metrics.md +++ b/docs/metrics.md @@ -67,6 +67,7 @@ when storing metrics from multiple nodes or clusters in one Prometheus instance. | `p2p_peer_connection_types` | Gauge | Current number of libp2p connections by peer and type (`direct` or `relay`). Note that peers may have multiple connections. | `peer, type` | | `p2p_peer_network_receive_bytes_total` | Counter | Total number of network bytes received from the peer by protocol. | `peer, protocol` | | `p2p_peer_network_sent_bytes_total` | Counter | Total number of network bytes sent to the peer by protocol. | `peer, protocol` | +| `p2p_peer_streams` | Gauge | Current number of libp2p streams by peer, direction (`Inbound`, `Outbound` or `Unknown`) and protocol. | `peer, direction, protocol` | | `p2p_ping_error_total` | Counter | Total number of ping errors per peer | `peer` | | `p2p_ping_latency_secs` | Histogram | Ping latencies in seconds per peer | `peer` | | `p2p_ping_success` | Gauge | Whether the last ping was successful (1) or not (0). Can be used as proxy for connected peers | `peer` | diff --git a/p2p/metrics.go b/p2p/metrics.go index bee04c33e..bcfba8e1a 100644 --- a/p2p/metrics.go +++ b/p2p/metrics.go @@ -56,6 +56,12 @@ var ( Help: "Current number of libp2p connections by peer and type ('direct' or 'relay'). 
Note that peers may have multiple connections.",
 	}, []string{"peer", "type"})
 
+	peerStreamGauge = promauto.NewResetGaugeVec(prometheus.GaugeOpts{
+		Namespace: "p2p",
+		Name:      "peer_streams",
+		Help:      "Current number of libp2p streams by peer, direction ('Inbound', 'Outbound' or 'Unknown') and protocol.",
+	}, []string{"peer", "direction", "protocol"})
+
 	peerConnCounter = promauto.NewCounterVec(prometheus.CounterOpts{
 		Namespace: "p2p",
 		Name:      "peer_connection_total",
diff --git a/p2p/p2p.go b/p2p/p2p.go
index 1b2e8ee2e..478acf454 100644
--- a/p2p/p2p.go
+++ b/p2p/p2p.go
@@ -276,6 +276,12 @@ func RegisterConnectionLogger(ctx context.Context, tcpNode host.Host, peerIDs []
 		Type     string
 	}
 
+	type streamKey struct {
+		PeerName  string
+		Direction string
+		Protocol  string
+	}
+
 	var (
 		quit   = make(chan struct{})
 		peers  = make(map[peer.ID]bool)
@@ -300,18 +306,37 @@ func RegisterConnectionLogger(ctx context.Context, tcpNode host.Host, peerIDs []
 			case <-ctx.Done():
 				return
 			case <-ticker.C:
-				// Instrument connection counts.
+				// Instrument connection and stream counts.
+				peerStreamGauge.Reset() // Reset stream gauge to clear previously set protocols.
counts := make(map[connKey]int) + streams := make(map[streamKey]int) + for _, conn := range tcpNode.Network().Conns() { - key := connKey{PeerName: PeerName(conn.RemotePeer()), Type: addrType(conn.RemoteMultiaddr())} - counts[key]++ + p := PeerName(conn.RemotePeer()) + cKey := connKey{ + PeerName: p, + Type: addrType(conn.RemoteMultiaddr()), + } + counts[cKey]++ + + for _, stream := range conn.GetStreams() { + sKey := streamKey{ + PeerName: p, + Direction: stream.Stat().Direction.String(), + Protocol: string(stream.Protocol()), + } + streams[sKey]++ + } } for _, pID := range peerIDs { for _, typ := range []string{addrTypeRelay, addrTypeDirect} { - key := connKey{PeerName: PeerName(pID), Type: typ} - peerConnGauge.WithLabelValues(key.PeerName, key.Type).Set(float64(counts[key])) + cKey := connKey{PeerName: PeerName(pID), Type: typ} + peerConnGauge.WithLabelValues(cKey.PeerName, cKey.Type).Set(float64(counts[cKey])) } } + for sKey, amount := range streams { + peerStreamGauge.WithLabelValues(sKey.PeerName, sKey.Direction, sKey.Protocol).Set(float64(amount)) + } case e := <-events: // Log and instrument events. addr := NamedAddr(e.Addr)