Skip to content

Commit

Permalink
Add metrics for p2p sync
Browse files Browse the repository at this point in the history
  • Loading branch information
kirugan committed Feb 5, 2024
1 parent 88058f7 commit 1828673
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 6 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ node1:
./build/juno \
--network=sepolia \
--log-level=debug \
--metrics \
--db-path=./p2p-dbs/node1 \
--p2p \
--p2p-peers=/ip4/127.0.0.1/tcp/7777/p2p/12D3KooWLdURCjbp1D7hkXWk6ZVfcMDPtsNnPHuxoTcWXFtvrxGG \
Expand Down
9 changes: 6 additions & 3 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,12 +143,12 @@ func New(cfg *Config, version string) (*Node, error) { //nolint:gocyclo,funlen
synchronizer := sync.New(chain, adaptfeeder.New(client), log, cfg.PendingPollInterval, dbIsRemote)
gatewayClient := gateway.NewClient(cfg.Network.GatewayURL(), log).WithUserAgent(ua).WithAPIKey(cfg.GatewayAPIKey)

var p2pService *p2p.Service
if cfg.P2P {
if !cfg.P2PFeederNode {
// Do not start the feeder synchronisation
synchronizer = nil
}
var p2pService *p2p.Service
p2pService, err = p2p.New(cfg.P2PAddr, "juno", cfg.P2PPeers, cfg.P2PPrivateKey, cfg.P2PFeederNode, chain, cfg.Network, log)
if err != nil {
return nil, fmt.Errorf("set up p2p service: %w", err)
Expand All @@ -162,8 +162,7 @@ func New(cfg *Config, version string) (*Node, error) { //nolint:gocyclo,funlen

throttledVM := NewThrottledVM(vm.New(log), cfg.MaxVMs, int32(cfg.MaxVMQueue))

var syncReader sync.Reader
syncReader = &sync.NoopSynchronizer{}
var syncReader sync.Reader = &sync.NoopSynchronizer{}
if synchronizer != nil {
syncReader = synchronizer
}
Expand Down Expand Up @@ -206,6 +205,10 @@ func New(cfg *Config, version string) (*Node, error) { //nolint:gocyclo,funlen
jsonrpcServerLegacy.WithListener(legacyRPCMetrics)
client.WithListener(makeFeederMetrics())
gatewayClient.WithListener(makeGatewayMetrics())

if p2pService != nil {
p2pService.WithListener(makeSyncMetrics(&sync.NoopSynchronizer{}, chain))
}
if synchronizer != nil {
synchronizer.WithListener(makeSyncMetrics(synchronizer, chain))
}
Expand Down
5 changes: 5 additions & 0 deletions p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/NethermindEth/juno/blockchain"
"github.com/NethermindEth/juno/p2p/starknet"
junoSync "github.com/NethermindEth/juno/sync"
"github.com/NethermindEth/juno/utils"
"github.com/libp2p/go-libp2p"
dht "github.com/libp2p/go-libp2p-kad-dht"
Expand Down Expand Up @@ -348,3 +349,7 @@ func (s *Service) PublishOnTopic(topic string, data []byte) error {
func (s *Service) SetProtocolHandler(pid protocol.ID, handler func(network.Stream)) {
s.host.SetStreamHandler(pid, handler)
}

func (s *Service) WithListener(l junoSync.EventListener) {
s.synchroniser.WithListener(l)
}
16 changes: 13 additions & 3 deletions p2p/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"math/rand"
"time"

junoSync "github.com/NethermindEth/juno/sync"

"github.com/NethermindEth/juno/adapters/p2p2core"
"github.com/NethermindEth/juno/blockchain"
"github.com/NethermindEth/juno/core"
Expand All @@ -31,6 +33,7 @@ type syncService struct {
client *starknet.Client // todo: merge all the functionality of Client with p2p SyncService

blockchain *blockchain.Blockchain
listener junoSync.EventListener
log utils.SimpleLogger
}

Expand All @@ -40,6 +43,7 @@ func newSyncService(bc *blockchain.Blockchain, h host.Host, network utils.Networ
network: network,
blockchain: bc,
log: log,
listener: &junoSync.SelectiveListener{},
}
}

Expand Down Expand Up @@ -167,15 +171,17 @@ func (s *syncService) start(ctx context.Context) {
break
}

storeTimer := time.Now()
err = s.blockchain.Store(b.block, b.commitments, b.stateUpdate, b.newClasses)
if err != nil {
s.log.Errorw("Failed to Store Block", "number", b.block.Number, "err", err)
cancelIteration()
break
} else {
s.log.Infow("Stored Block", "number", b.block.Number, "hash", b.block.Hash.ShortString(), "root",
b.block.GlobalStateRoot.ShortString())
}

s.log.Infow("Stored Block", "number", b.block.Number, "hash", b.block.Hash.ShortString(), "root",
b.block.GlobalStateRoot.ShortString())
s.listener.OnSyncStepDone(junoSync.OpStore, b.block.Number, time.Since(storeTimer))
}
cancelIteration()
}
Expand Down Expand Up @@ -657,3 +663,7 @@ func (s *syncService) createIterator(start, limit uint64) *spec.Iteration {
Step: 1,
}
}

func (s *syncService) WithListener(l junoSync.EventListener) {
s.listener = l
}

0 comments on commit 1828673

Please sign in to comment.