diff --git a/internal/p2p/metrics.gen.go b/internal/p2p/metrics.gen.go index cbfba29d9..101e652f7 100644 --- a/internal/p2p/metrics.gen.go +++ b/internal/p2p/metrics.gen.go @@ -20,6 +20,12 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { Name: "peers", Help: "Number of peers.", }, labels).With(labelsAndValues...), + PeerScore: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "peer_score", + Help: "Score for each peer", + }, append(labels, "peer_id")).With(labelsAndValues...), PeerReceiveBytesTotal: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ Namespace: namespace, Subsystem: MetricsSubsystem, @@ -74,6 +80,7 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { func NopMetrics() *Metrics { return &Metrics{ Peers: discard.NewGauge(), + PeerScore: discard.NewGauge(), PeerReceiveBytesTotal: discard.NewCounter(), PeerSendBytesTotal: discard.NewCounter(), PeerPendingSendBytes: discard.NewGauge(), diff --git a/internal/p2p/metrics.go b/internal/p2p/metrics.go index b45f128e5..bc9678414 100644 --- a/internal/p2p/metrics.go +++ b/internal/p2p/metrics.go @@ -28,6 +28,8 @@ var ( type Metrics struct { // Number of peers. Peers metrics.Gauge + // Score for each peer + PeerScore metrics.Gauge `metrics_labels:"peer_id"` // Number of bytes per channel received from a given peer. PeerReceiveBytesTotal metrics.Counter `metrics_labels:"peer_id, chID, message_type"` // Number of bytes per channel sent to a given peer. diff --git a/internal/p2p/p2ptest/network.go b/internal/p2p/p2ptest/network.go index 9e61b1929..7069ac850 100644 --- a/internal/p2p/p2ptest/network.go +++ b/internal/p2p/p2ptest/network.go @@ -263,7 +263,7 @@ func (n *Network) MakeNode(ctx context.Context, t *testing.T, opts NodeOptions) RetryTimeJitter: time.Millisecond, MaxPeers: opts.MaxPeers, MaxConnected: opts.MaxConnected, - }) + }, p2p.NopMetrics()) require.NoError(t, err) router, err := p2p.NewRouter( diff --git a/internal/p2p/peermanager.go b/internal/p2p/peermanager.go index 80bd4df95..634586e64 100644 --- a/internal/p2p/peermanager.go +++ b/internal/p2p/peermanager.go @@ -300,6 +300,7 @@ type PeerManager struct { ready map[types.NodeID]bool // ready peers (Ready → Disconnected) evict map[types.NodeID]bool // peers scheduled for eviction (Connected → EvictNext) evicting map[types.NodeID]bool // peers being evicted (EvictNext → Disconnected) + metrics *Metrics } // NewPeerManager creates a new peer manager. @@ -308,6 +309,7 @@ func NewPeerManager( selfID types.NodeID, peerDB dbm.DB, options PeerManagerOptions, + metrics *Metrics, ) (*PeerManager, error) { if selfID == "" { return nil, errors.New("self ID not given") @@ -339,6 +341,7 @@ func NewPeerManager( evict: map[types.NodeID]bool{}, evicting: map[types.NodeID]bool{}, subscriptions: map[*PeerUpdates]*PeerUpdates{}, + metrics: metrics, } if err = peerManager.configurePeers(); err != nil { return nil, err @@ -873,6 +876,7 @@ func (m *PeerManager) Advertise(peerID types.NodeID, limit uint16) []NodeAddress } for _, peer := range m.store.Ranked() { + m.metrics.PeerScore.With("peer_id", string(peerID)).Set(float64(int(peer.Score()))) if peer.ID == peerID { continue } diff --git a/internal/p2p/peermanager_scoring_test.go b/internal/p2p/peermanager_scoring_test.go index 60caf4dce..ecceb6288 100644 --- a/internal/p2p/peermanager_scoring_test.go +++ b/internal/p2p/peermanager_scoring_test.go @@ -21,7 +21,7 @@ func TestPeerScoring(t *testing.T) { // create a mock peer manager db := dbm.NewMemDB() - peerManager, err := NewPeerManager(log.NewNopLogger(), selfID, db, PeerManagerOptions{}) + peerManager, err := NewPeerManager(log.NewNopLogger(), selfID, db, PeerManagerOptions{}, NopMetrics()) require.NoError(t, err) // create a fake node diff --git a/internal/p2p/peermanager_test.go b/internal/p2p/peermanager_test.go index 61d19f1b0..ad258dfc4 100644 --- a/internal/p2p/peermanager_test.go +++ b/internal/p2p/peermanager_test.go @@ -114,21 +114,21 @@ func TestPeerManagerOptions_Validate(t *testing.T) { func TestNewPeerManager(t *testing.T) { // Zero options should be valid. - _, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}) + _, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}, p2p.NopMetrics()) require.NoError(t, err) // Invalid options should error. _, err = p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{ PersistentPeers: []types.NodeID{"foo"}, - }) + }, p2p.NopMetrics()) require.Error(t, err) // Invalid database should error. - _, err = p2p.NewPeerManager(log.NewNopLogger(), selfID, nil, p2p.PeerManagerOptions{}) + _, err = p2p.NewPeerManager(log.NewNopLogger(), selfID, nil, p2p.PeerManagerOptions{}, p2p.NopMetrics()) require.Error(t, err) // Empty self ID should error. - _, err = p2p.NewPeerManager(log.NewNopLogger(), "", nil, p2p.PeerManagerOptions{}) + _, err = p2p.NewPeerManager(log.NewNopLogger(), "", nil, p2p.PeerManagerOptions{}, p2p.NopMetrics()) require.Error(t, err) } @@ -156,7 +156,7 @@ func TestNewPeerManager_Persistence(t *testing.T) { peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, db, p2p.PeerManagerOptions{ PersistentPeers: []types.NodeID{aID}, PeerScores: map[types.NodeID]p2p.PeerScore{bID: 1}, - }) + }, p2p.NopMetrics()) require.NoError(t, err) for _, addr := range append(append(aAddresses, bAddresses...), cAddresses...) { @@ -181,7 +181,7 @@ func TestNewPeerManager_Persistence(t *testing.T) { peerManager, err = p2p.NewPeerManager(log.NewNopLogger(), selfID, db, p2p.PeerManagerOptions{ PersistentPeers: []types.NodeID{bID}, PeerScores: map[types.NodeID]p2p.PeerScore{cID: 1}, - }) + }, p2p.NopMetrics()) require.NoError(t, err) require.ElementsMatch(t, aAddresses, peerManager.Addresses(aID)) @@ -227,7 +227,7 @@ func TestNewPeerManager_Unconditional(t *testing.T) { peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, db, p2p.PeerManagerOptions{ UnconditionalPeers: []types.NodeID{aID}, PeerScores: map[types.NodeID]p2p.PeerScore{bID: 1}, - }) + }, p2p.NopMetrics()) require.NoError(t, err) for _, addr := range append(append(aAddresses, bAddresses...), cAddresses...) { @@ -251,7 +251,7 @@ func TestNewPeerManager_Unconditional(t *testing.T) { peerManager, err = p2p.NewPeerManager(log.NewNopLogger(), selfID, db, p2p.PeerManagerOptions{ UnconditionalPeers: []types.NodeID{bID}, PeerScores: map[types.NodeID]p2p.PeerScore{cID: 1}, - }) + }, p2p.NopMetrics()) require.NoError(t, err) require.ElementsMatch(t, aAddresses, peerManager.Addresses(aID)) @@ -269,7 +269,7 @@ func TestNewPeerManager_SelfIDChange(t *testing.T) { b := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("b", 40))} db := dbm.NewMemDB() - peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, db, p2p.PeerManagerOptions{}) + peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, db, p2p.PeerManagerOptions{}, p2p.NopMetrics()) require.NoError(t, err) added, err := peerManager.Add(a) @@ -282,7 +282,7 @@ func TestNewPeerManager_SelfIDChange(t *testing.T) { // If we change our selfID to one of the peers in the peer store, it // should be removed from the store. - peerManager, err = p2p.NewPeerManager(log.NewNopLogger(), a.NodeID, db, p2p.PeerManagerOptions{}) + peerManager, err = p2p.NewPeerManager(log.NewNopLogger(), a.NodeID, db, p2p.PeerManagerOptions{}, p2p.NopMetrics()) require.NoError(t, err) require.Equal(t, []types.NodeID{b.NodeID}, peerManager.Peers()) } @@ -296,7 +296,7 @@ func TestPeerManager_Add(t *testing.T) { PersistentPeers: []types.NodeID{aID, cID}, MaxPeers: 2, MaxConnected: 2, - }) + }, p2p.NopMetrics()) require.NoError(t, err) // Adding a couple of addresses should work. @@ -349,7 +349,7 @@ func TestPeerManager_DialNext(t *testing.T) { a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))} - peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}) + peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}, p2p.NopMetrics()) require.NoError(t, err) // Add an address. DialNext should return it. @@ -379,7 +379,7 @@ func TestPeerManager_DialNext_Retry(t *testing.T) { MinRetryTime: 100 * time.Millisecond, MaxRetryTime: 1000 * time.Millisecond, } - peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), options) + peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), options, p2p.NopMetrics()) require.NoError(t, err) added, err := peerManager.Add(a) @@ -414,7 +414,7 @@ func TestPeerManagerDeleteOnMaxRetries(t *testing.T) { MinRetryTime: 100 * time.Millisecond, MaxRetryTime: 1000 * time.Millisecond, } - peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), options) + peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), options, p2p.NopMetrics()) require.NoError(t, err) added, err := peerManager.Add(a) @@ -449,7 +449,7 @@ func TestPeerManager_DialNext_WakeOnDialFailed(t *testing.T) { peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{ MaxConnected: 1, - }) + }, p2p.NopMetrics()) require.NoError(t, err) a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))} @@ -493,7 +493,7 @@ func TestPeerManager_DialNext_WakeOnDialFailedRetry(t *testing.T) { defer cancel() options := p2p.PeerManagerOptions{MinRetryTime: 200 * time.Millisecond} - peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), options) + peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), options, p2p.NopMetrics()) require.NoError(t, err) a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))} @@ -524,7 +524,7 @@ func TestPeerManager_DialNext_WakeOnDisconnected(t *testing.T) { a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))} - peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}) + peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}, p2p.NopMetrics()) require.NoError(t, err) added, err := peerManager.Add(a) @@ -558,7 +558,7 @@ func TestPeerManager_TryDialNext_MaxConnected(t *testing.T) { peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{ MaxConnected: 2, - }) + }, p2p.NopMetrics()) require.NoError(t, err) // Add a and connect to it. @@ -608,7 +608,7 @@ func TestPeerManager_TryDialNext_MaxConnectedUpgrade(t *testing.T) { PersistentPeers: []types.NodeID{c.NodeID, d.NodeID}, MaxConnected: 2, MaxConnectedUpgrade: 1, - }) + }, p2p.NopMetrics()) require.NoError(t, err) // Add a and connect to it. @@ -683,7 +683,7 @@ func TestPeerManager_TryDialNext_UpgradeReservesPeer(t *testing.T) { PeerScores: map[types.NodeID]p2p.PeerScore{b.NodeID: 11, c.NodeID: 11}, MaxConnected: 1, MaxConnectedUpgrade: 2, - }) + }, p2p.NopMetrics()) require.NoError(t, err) // Add a and connect to it. @@ -723,7 +723,7 @@ func TestPeerManager_TryDialNext_DialingConnected(t *testing.T) { peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{ MaxConnected: 2, - }) + }, p2p.NopMetrics()) require.NoError(t, err) // Add a and dial it. @@ -771,7 +771,7 @@ func TestPeerManager_TryDialNext_Multiple(t *testing.T) { {Protocol: "tcp", NodeID: bID, Hostname: "::1"}, } - peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}) + peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}, p2p.NopMetrics()) require.NoError(t, err) for _, address := range addresses { @@ -804,7 +804,7 @@ func TestPeerManager_DialFailed(t *testing.T) { bID := types.NodeID(strings.Repeat("b", 40)) b := p2p.NodeAddress{Protocol: "memory", NodeID: bID} - peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}) + peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}, p2p.NopMetrics()) require.NoError(t, err) added, err := peerManager.Add(a) @@ -849,7 +849,7 @@ func TestPeerManager_DialFailed_UnreservePeer(t *testing.T) { PeerScores: map[types.NodeID]p2p.PeerScore{b.NodeID: 11, c.NodeID: 11}, MaxConnected: 1, MaxConnectedUpgrade: 2, - }) + }, p2p.NopMetrics()) require.NoError(t, err) // Add a and connect to it. @@ -890,7 +890,7 @@ func TestPeerManager_Dialed_Connected(t *testing.T) { a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))} b := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("b", 40))} - peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}) + peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}, p2p.NopMetrics()) require.NoError(t, err) // Marking a as dialed twice should error. @@ -917,7 +917,7 @@ func TestPeerManager_Dialed_Connected(t *testing.T) { } func TestPeerManager_Dialed_Self(t *testing.T) { - peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}) + peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}, p2p.NopMetrics()) require.NoError(t, err) // Dialing self should error. @@ -932,7 +932,7 @@ func TestPeerManager_Dialed_MaxConnected(t *testing.T) { peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{ MaxConnected: 1, - }) + }, p2p.NopMetrics()) require.NoError(t, err) // Start to dial a. @@ -964,7 +964,7 @@ func TestPeerManager_Dialed_MaxConnectedUpgrade(t *testing.T) { MaxConnected: 2, MaxConnectedUpgrade: 1, PeerScores: map[types.NodeID]p2p.PeerScore{c.NodeID: 11, d.NodeID: 11}, - }) + }, p2p.NopMetrics()) require.NoError(t, err) // Dialing a and b is fine. @@ -998,7 +998,7 @@ func TestPeerManager_Dialed_MaxConnectedUpgrade(t *testing.T) { func TestPeerManager_Dialed_Unknown(t *testing.T) { a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))} - peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}) + peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}, p2p.NopMetrics()) require.NoError(t, err) // Marking an unknown node as dialed should error. @@ -1014,7 +1014,7 @@ func TestPeerManager_Dialed_Upgrade(t *testing.T) { MaxConnected: 1, MaxConnectedUpgrade: 2, PeerScores: map[types.NodeID]p2p.PeerScore{b.NodeID: 11, c.NodeID: 11}, - }) + }, p2p.NopMetrics()) require.NoError(t, err) // Dialing a is fine. @@ -1065,7 +1065,7 @@ func TestPeerManager_Dialed_UpgradeEvenLower(t *testing.T) { c.NodeID: 10, d.NodeID: 1, }, - }) + }, p2p.NopMetrics()) require.NoError(t, err) // Connect to a and b. @@ -1120,7 +1120,7 @@ func TestPeerManager_Dialed_UpgradeNoEvict(t *testing.T) { b.NodeID: 2, c.NodeID: 3, }, - }) + }, p2p.NopMetrics()) require.NoError(t, err) // Connect to a and b. @@ -1160,7 +1160,7 @@ func TestPeerManager_Accepted(t *testing.T) { c := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("c", 40))} d := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("d", 40))} - peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}) + peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}, p2p.NopMetrics()) require.NoError(t, err) // Accepting a connection from self should error. @@ -1208,7 +1208,7 @@ func TestPeerManager_Accepted_MaxConnected(t *testing.T) { peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{ MaxConnected: 2, - }) + }, p2p.NopMetrics()) require.NoError(t, err) // Connect to a and b. @@ -1242,7 +1242,7 @@ func TestPeerManager_Accepted_MaxConnectedUpgrade(t *testing.T) { }, MaxConnected: 1, MaxConnectedUpgrade: 1, - }) + }, p2p.NopMetrics()) require.NoError(t, err) // Dial a. @@ -1290,7 +1290,7 @@ func TestPeerManager_Accepted_Upgrade(t *testing.T) { }, MaxConnected: 1, MaxConnectedUpgrade: 2, - }) + }, p2p.NopMetrics()) require.NoError(t, err) // Accept a. @@ -1333,7 +1333,7 @@ func TestPeerManager_Accepted_UpgradeDialing(t *testing.T) { }, MaxConnected: 1, MaxConnectedUpgrade: 2, - }) + }, p2p.NopMetrics()) require.NoError(t, err) // Accept a. @@ -1375,7 +1375,7 @@ func TestPeerManager_Ready(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}) + peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}, p2p.NopMetrics()) require.NoError(t, err) sub := peerManager.Subscribe(ctx) @@ -1409,7 +1409,7 @@ func TestPeerManager_Ready_Channels(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - pm, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}) + pm, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}, p2p.NopMetrics()) require.NoError(t, err) sub := pm.Subscribe(ctx) @@ -1435,7 +1435,7 @@ func TestPeerManager_EvictNext(t *testing.T) { a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))} - peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}) + peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}, p2p.NopMetrics()) require.NoError(t, err) added, err := peerManager.Add(a) @@ -1471,7 +1471,7 @@ func TestPeerManager_EvictNext_WakeOnError(t *testing.T) { a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))} - peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}) + peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}, p2p.NopMetrics()) require.NoError(t, err) added, err := peerManager.Add(a) @@ -1505,7 +1505,7 @@ func TestPeerManager_EvictNext_WakeOnUpgradeDialed(t *testing.T) { MaxConnected: 1, MaxConnectedUpgrade: 1, PeerScores: map[types.NodeID]p2p.PeerScore{b.NodeID: 11}, - }) + }, p2p.NopMetrics()) require.NoError(t, err) // Connect a. @@ -1546,7 +1546,7 @@ func TestPeerManager_EvictNext_WakeOnUpgradeAccepted(t *testing.T) { MaxConnected: 1, MaxConnectedUpgrade: 1, PeerScores: map[types.NodeID]p2p.PeerScore{b.NodeID: 11}, - }) + }, p2p.NopMetrics()) require.NoError(t, err) // Connect a. @@ -1575,7 +1575,7 @@ func TestPeerManager_TryEvictNext(t *testing.T) { a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))} - peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}) + peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}, p2p.NopMetrics()) require.NoError(t, err) added, err := peerManager.Add(a) @@ -1611,7 +1611,7 @@ func TestPeerManager_TryEvictNext(t *testing.T) { func TestPeerManager_Disconnected(t *testing.T) { a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))} - peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}) + peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}, p2p.NopMetrics()) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) @@ -1670,7 +1670,7 @@ func TestPeerManager_Errored(t *testing.T) { a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))} - peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}) + peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}, p2p.NopMetrics()) require.NoError(t, err) // Erroring an unknown peer does nothing. @@ -1709,7 +1709,7 @@ func TestPeerManager_Subscribe(t *testing.T) { a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))} - peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}) + peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}, p2p.NopMetrics()) require.NoError(t, err) // This tests all subscription events for full peer lifecycles. @@ -1772,7 +1772,7 @@ func TestPeerManager_Subscribe_Close(t *testing.T) { a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))} - peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}) + peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}, p2p.NopMetrics()) require.NoError(t, err) sub := peerManager.Subscribe(ctx) @@ -1801,7 +1801,7 @@ func TestPeerManager_Subscribe_Broadcast(t *testing.T) { a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))} - peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}) + peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}, p2p.NopMetrics()) require.NoError(t, err) s2ctx, s2cancel := context.WithCancel(ctx) @@ -1851,7 +1851,7 @@ func TestPeerManager_Close(t *testing.T) { peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{ MinRetryTime: 10 * time.Second, - }) + }, p2p.NopMetrics()) require.NoError(t, err) // This subscription isn't closed, but PeerManager.Close() @@ -1887,7 +1887,7 @@ func TestPeerManager_Advertise(t *testing.T) { // Create an initial peer manager and add the peers. peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{ PeerScores: map[types.NodeID]p2p.PeerScore{aID: 3, bID: 2, cID: 1}, - }) + }, p2p.NopMetrics()) require.NoError(t, err) added, err := peerManager.Add(aTCP) @@ -1936,7 +1936,7 @@ func TestPeerManager_Advertise_Self(t *testing.T) { // Create a peer manager with SelfAddress defined. peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{ SelfAddress: self, - }) + }, p2p.NopMetrics()) require.NoError(t, err) // peer manager should always advertise its SelfAddress. diff --git a/internal/p2p/pex/reactor_test.go b/internal/p2p/pex/reactor_test.go index eb8a58a9b..33778b19d 100644 --- a/internal/p2p/pex/reactor_test.go +++ b/internal/p2p/pex/reactor_test.go @@ -299,7 +299,7 @@ func setupSingle(ctx context.Context, t *testing.T) *singleTestReactor { peerCh := make(chan p2p.PeerUpdate, chBuf) peerUpdates := p2p.NewPeerUpdates(peerCh, chBuf) - peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), nodeID, dbm.NewMemDB(), p2p.PeerManagerOptions{}) + peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), nodeID, dbm.NewMemDB(), p2p.PeerManagerOptions{}, p2p.NopMetrics()) require.NoError(t, err) reactor := pex.NewReactor( diff --git a/internal/p2p/router_test.go b/internal/p2p/router_test.go index 51529d5f0..c1674b8a0 100644 --- a/internal/p2p/router_test.go +++ b/internal/p2p/router_test.go @@ -102,7 +102,7 @@ func TestRouter_Channel_Basic(t *testing.T) { defer cancel() // Set up a router with no transports (so no peers). - peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}) + peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}, p2p.NopMetrics()) require.NoError(t, err) testnet := p2ptest.MakeNetwork(ctx, t, p2ptest.NetworkOptions{NumNodes: 1}) @@ -402,7 +402,7 @@ func TestRouter_AcceptPeers(t *testing.T) { mockTransport.On("Listen", mock.Anything).Return(nil) // Set up and start the router. - peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}) + peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}, p2p.NopMetrics()) require.NoError(t, err) sub := peerManager.Subscribe(ctx) @@ -462,7 +462,7 @@ func TestRouter_AcceptPeers_Errors(t *testing.T) { mockTransport.On("Listen", mock.Anything).Return(nil) // Set up and start the router. - peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}) + peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}, p2p.NopMetrics()) require.NoError(t, err) router, err := p2p.NewRouter( @@ -518,7 +518,7 @@ func TestRouter_AcceptPeers_HeadOfLineBlocking(t *testing.T) { mockTransport.On("Listen", mock.Anything).Return(nil) // Set up and start the router. - peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}) + peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}, p2p.NopMetrics()) require.NoError(t, err) router, err := p2p.NewRouter( @@ -617,7 +617,7 @@ func TestRouter_DialPeers(t *testing.T) { } // Set up and start the router. - peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}) + peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}, p2p.NopMetrics()) require.NoError(t, err) added, err := peerManager.Add(address) @@ -696,7 +696,7 @@ func TestRouter_DialPeers_Parallel(t *testing.T) { } // Set up and start the router. - peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}) + peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}, p2p.NopMetrics()) require.NoError(t, err) added, err := peerManager.Add(a) @@ -781,7 +781,7 @@ func TestRouter_EvictPeers(t *testing.T) { mockTransport.On("Listen", mock.Anything).Return(nil) // Set up and start the router. - peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}) + peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}, p2p.NopMetrics()) require.NoError(t, err) sub := peerManager.Subscribe(ctx) @@ -846,7 +846,7 @@ func TestRouter_ChannelCompatability(t *testing.T) { mockTransport.On("Listen", mock.Anything).Return(nil) // Set up and start the router. - peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}) + peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}, p2p.NopMetrics()) require.NoError(t, err) router, err := p2p.NewRouter( @@ -900,7 +900,7 @@ func TestRouter_DontSendOnInvalidChannel(t *testing.T) { mockTransport.On("Listen", mock.Anything).Return(nil) // Set up and start the router. - peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}) + peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}, p2p.NopMetrics()) require.NoError(t, err) sub := peerManager.Subscribe(ctx) @@ -966,7 +966,7 @@ func TestRouter_Channel_FilterByID(t *testing.T) { mockTransport.On("Accept", mock.Anything).Maybe().Return(nil, io.EOF) mockTransport.On("Listen", mock.Anything).Return(nil) - peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}) + peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}, p2p.NopMetrics()) require.NoError(t, err) // no filter @@ -990,7 +990,7 @@ func TestRouter_Channel_FilterByID(t *testing.T) { require.Equal(t, 1, len(peerManager.Peers())) - peerManager, err = p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}) + peerManager, err = p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}, p2p.NopMetrics()) require.NoError(t, err) // with filter diff --git a/node/node.go b/node/node.go index 06bf82ca7..2b2a0c9e8 100644 --- a/node/node.go +++ b/node/node.go @@ -219,7 +219,7 @@ func makeNode( } } - peerManager, peerCloser, err := createPeerManager(logger, cfg, dbProvider, nodeKey.ID) + peerManager, peerCloser, err := createPeerManager(logger, cfg, dbProvider, nodeKey.ID, nodeMetrics.p2p) closers = append(closers, peerCloser) if err != nil { return nil, combineCloseError( diff --git a/node/seed.go b/node/seed.go index 4221abf2b..08e5fae0f 100644 --- a/node/seed.go +++ b/node/seed.go @@ -80,7 +80,7 @@ func makeSeedNode( } // Setup Transport and Switch. - peerManager, peerCloser, err := createPeerManager(logger, cfg, dbProvider, nodeKey.ID) + peerManager, peerCloser, err := createPeerManager(logger, cfg, dbProvider, nodeKey.ID, nodeMetrics.p2p) if err != nil { return nil, combineCloseError( fmt.Errorf("failed to create peer manager: %w", err), diff --git a/node/setup.go b/node/setup.go index c30a7818d..921f37f21 100644 --- a/node/setup.go +++ b/node/setup.go @@ -201,6 +201,7 @@ func createPeerManager( cfg *config.Config, dbProvider config.DBProvider, nodeID types.NodeID, + metrics *p2p.Metrics, ) (*p2p.PeerManager, closer, error) { selfAddr, err := p2p.ParseNodeAddress(nodeID.AddressString(cfg.P2P.ExternalAddress)) if err != nil { @@ -263,7 +264,7 @@ func createPeerManager( return nil, func() error { return nil }, fmt.Errorf("unable to initialize peer store: %w", err) } p2pLogger := logger.With("module", "p2p") - peerManager, err := p2p.NewPeerManager(p2pLogger, nodeID, peerDB, options) + peerManager, err := p2p.NewPeerManager(p2pLogger, nodeID, peerDB, options, metrics) if err != nil { return nil, peerDB.Close, fmt.Errorf("failed to create peer manager: %w", err) }