From 182867364261fcaab0f53cbc58b32d006270d642 Mon Sep 17 00:00:00 2001 From: Kirill Date: Mon, 5 Feb 2024 13:09:38 +0400 Subject: [PATCH] Add metrics for p2p sync --- Makefile | 1 + node/node.go | 9 ++++++--- p2p/p2p.go | 5 +++++ p2p/sync.go | 16 +++++++++++++--- 4 files changed, 25 insertions(+), 6 deletions(-) diff --git a/Makefile b/Makefile index 7e5d6b9e9c..fdba425553 100644 --- a/Makefile +++ b/Makefile @@ -101,6 +101,7 @@ node1: ./build/juno \ --network=sepolia \ --log-level=debug \ + --metrics \ --db-path=./p2p-dbs/node1 \ --p2p \ --p2p-peers=/ip4/127.0.0.1/tcp/7777/p2p/12D3KooWLdURCjbp1D7hkXWk6ZVfcMDPtsNnPHuxoTcWXFtvrxGG \ diff --git a/node/node.go b/node/node.go index 61fa4d48c9..2152f5f561 100644 --- a/node/node.go +++ b/node/node.go @@ -143,12 +143,12 @@ func New(cfg *Config, version string) (*Node, error) { //nolint:gocyclo,funlen synchronizer := sync.New(chain, adaptfeeder.New(client), log, cfg.PendingPollInterval, dbIsRemote) gatewayClient := gateway.NewClient(cfg.Network.GatewayURL(), log).WithUserAgent(ua).WithAPIKey(cfg.GatewayAPIKey) + var p2pService *p2p.Service if cfg.P2P { if !cfg.P2PFeederNode { // Do not start the feeder synchronisation synchronizer = nil } - var p2pService *p2p.Service p2pService, err = p2p.New(cfg.P2PAddr, "juno", cfg.P2PPeers, cfg.P2PPrivateKey, cfg.P2PFeederNode, chain, cfg.Network, log) if err != nil { return nil, fmt.Errorf("set up p2p service: %w", err) @@ -162,8 +162,7 @@ func New(cfg *Config, version string) (*Node, error) { //nolint:gocyclo,funlen throttledVM := NewThrottledVM(vm.New(log), cfg.MaxVMs, int32(cfg.MaxVMQueue)) - var syncReader sync.Reader - syncReader = &sync.NoopSynchronizer{} + var syncReader sync.Reader = &sync.NoopSynchronizer{} if synchronizer != nil { syncReader = synchronizer } @@ -206,6 +205,10 @@ func New(cfg *Config, version string) (*Node, error) { //nolint:gocyclo,funlen jsonrpcServerLegacy.WithListener(legacyRPCMetrics) client.WithListener(makeFeederMetrics()) gatewayClient.WithListener(makeGatewayMetrics()) + + if p2pService != nil { + p2pService.WithListener(makeSyncMetrics(&sync.NoopSynchronizer{}, chain)) + } if synchronizer != nil { synchronizer.WithListener(makeSyncMetrics(synchronizer, chain)) } diff --git a/p2p/p2p.go b/p2p/p2p.go index 91a3dee46a..4391081c7d 100644 --- a/p2p/p2p.go +++ b/p2p/p2p.go @@ -12,6 +12,7 @@ import ( "github.com/NethermindEth/juno/blockchain" "github.com/NethermindEth/juno/p2p/starknet" + junoSync "github.com/NethermindEth/juno/sync" "github.com/NethermindEth/juno/utils" "github.com/libp2p/go-libp2p" dht "github.com/libp2p/go-libp2p-kad-dht" @@ -348,3 +349,7 @@ func (s *Service) PublishOnTopic(topic string, data []byte) error { func (s *Service) SetProtocolHandler(pid protocol.ID, handler func(network.Stream)) { s.host.SetStreamHandler(pid, handler) } + +func (s *Service) WithListener(l junoSync.EventListener) { + s.synchroniser.WithListener(l) +} diff --git a/p2p/sync.go b/p2p/sync.go index 164e6e6bc7..fa2df3991c 100644 --- a/p2p/sync.go +++ b/p2p/sync.go @@ -7,6 +7,8 @@ import ( "math/rand" "time" + junoSync "github.com/NethermindEth/juno/sync" + "github.com/NethermindEth/juno/adapters/p2p2core" "github.com/NethermindEth/juno/blockchain" "github.com/NethermindEth/juno/core" @@ -31,6 +33,7 @@ type syncService struct { client *starknet.Client // todo: merge all the functionality of Client with p2p SyncService blockchain *blockchain.Blockchain + listener junoSync.EventListener log utils.SimpleLogger } @@ -40,6 +43,7 @@ func newSyncService(bc *blockchain.Blockchain, h host.Host, network utils.Networ network: network, blockchain: bc, log: log, + listener: &junoSync.SelectiveListener{}, } } @@ -167,15 +171,17 @@ func (s *syncService) start(ctx context.Context) { break } + storeTimer := time.Now() err = s.blockchain.Store(b.block, b.commitments, b.stateUpdate, b.newClasses) if err != nil { s.log.Errorw("Failed to Store Block", "number", b.block.Number, "err", err) cancelIteration() break - } else { - s.log.Infow("Stored Block", "number", b.block.Number, "hash", b.block.Hash.ShortString(), "root", - b.block.GlobalStateRoot.ShortString()) } + + s.log.Infow("Stored Block", "number", b.block.Number, "hash", b.block.Hash.ShortString(), "root", + b.block.GlobalStateRoot.ShortString()) + s.listener.OnSyncStepDone(junoSync.OpStore, b.block.Number, time.Since(storeTimer)) } cancelIteration() } @@ -657,3 +663,7 @@ func (s *syncService) createIterator(start, limit uint64) *spec.Iteration { Step: 1, } } + +func (s *syncService) WithListener(l junoSync.EventListener) { + s.listener = l +}