Skip to content

Commit

Permalink
Persist and load peers from separate database (#1935)
Browse files Browse the repository at this point in the history
  • Loading branch information
weiihann authored Jul 16, 2024
1 parent 5698ff5 commit 50f6c92
Show file tree
Hide file tree
Showing 5 changed files with 183 additions and 6 deletions.
1 change: 1 addition & 0 deletions db/buckets.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const (
BlockCommitments
Temporary // used temporarily for migrations
SchemaIntermediateState
Peer // maps peer ID to peer multiaddresses
)

// Key flattens a prefix and series of byte arrays into a single []byte.
Expand Down
4 changes: 3 additions & 1 deletion node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ func New(cfg *Config, version string) (*Node, error) { //nolint:gocyclo,funlen
} else {
database, err = pebble.New(cfg.DatabasePath, cfg.DBCacheSize, cfg.DBMaxHandles, dbLog)
}

if err != nil {
return nil, fmt.Errorf("open DB: %w", err)
}
Expand Down Expand Up @@ -164,7 +165,8 @@ func New(cfg *Config, version string) (*Node, error) { //nolint:gocyclo,funlen
// Do not start the feeder synchronisation
synchronizer = nil
}
p2pService, err = p2p.New(cfg.P2PAddr, "juno", cfg.P2PPeers, cfg.P2PPrivateKey, cfg.P2PFeederNode, chain, &cfg.Network, log)
p2pService, err = p2p.New(cfg.P2PAddr, "juno", cfg.P2PPeers, cfg.P2PPrivateKey, cfg.P2PFeederNode,
chain, &cfg.Network, log, database)
if err != nil {
return nil, fmt.Errorf("set up p2p service: %w", err)
}
Expand Down
98 changes: 93 additions & 5 deletions p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/NethermindEth/juno/blockchain"
"github.com/NethermindEth/juno/db"
"github.com/NethermindEth/juno/p2p/starknet"
junoSync "github.com/NethermindEth/juno/sync"
"github.com/NethermindEth/juno/utils"
Expand Down Expand Up @@ -48,10 +49,11 @@ type Service struct {
synchroniser *syncService

feederNode bool
database db.DB
}

func New(addr, userAgent, peers, privKeyStr string, feederNode bool, bc *blockchain.Blockchain, snNetwork *utils.Network,
log utils.SimpleLogger,
log utils.SimpleLogger, database db.DB,
) (*Service, error) {
if addr == "" {
// 0.0.0.0/tcp/0 will listen on any interface device and assing a free port.
Expand All @@ -74,17 +76,27 @@ func New(addr, userAgent, peers, privKeyStr string, feederNode bool, bc *blockch
// Todo: try to understand what will happen if user passes a multiaddr with p2p public and a private key which doesn't match.
// For example, a user passes the following multiaddr: --p2p-addr=/ip4/0.0.0.0/tcp/7778/p2p/(SomePublicKey) and also passes a
// --p2p-private-key="SomePrivateKey". However, the private public key pair don't match, in this case what will happen?
return NewWithHost(p2pHost, peers, feederNode, bc, snNetwork, log)
return NewWithHost(p2pHost, peers, feederNode, bc, snNetwork, log, database)
}

func NewWithHost(p2phost host.Host, peers string, feederNode bool, bc *blockchain.Blockchain, snNetwork *utils.Network,
log utils.SimpleLogger,
log utils.SimpleLogger, database db.DB,
) (*Service, error) {
peersAddrInfoS := []peer.AddrInfo{}
var (
peersAddrInfoS []peer.AddrInfo
err error
)

peersAddrInfoS, err = loadPeers(database)
if err != nil {
log.Warnw("Failed to load peers", "err", err)
}

if peers != "" {
splitted := strings.Split(peers, ",")
for _, peerStr := range splitted {
peerAddr, err := peer.AddrInfoFromString(peerStr)
var peerAddr *peer.AddrInfo
peerAddr, err = peer.AddrInfoFromString(peerStr)
if err != nil {
return nil, fmt.Errorf("addr info from %q: %w", peerStr, err)
}
Expand All @@ -110,6 +122,7 @@ func NewWithHost(p2phost host.Host, peers string, feederNode bool, bc *blockchai
feederNode: feederNode,
topics: make(map[string]*pubsub.Topic),
handler: starknet.NewHandler(bc, log),
database: database,
}
return s, nil
}
Expand Down Expand Up @@ -213,6 +226,9 @@ func (s *Service) Run(ctx context.Context) error {
}

<-ctx.Done()
if err := s.persistPeers(); err != nil {
s.log.Warnw("Failed to persist peers", "err", err)
}
if err := s.dht.Close(); err != nil {
s.log.Warnw("Failed stopping DHT", "err", err.Error())
}
Expand Down Expand Up @@ -346,3 +362,75 @@ func (s *Service) WithListener(l junoSync.EventListener) {
runMetrics(s.host.Peerstore())
s.synchroniser.WithListener(l)
}

// persistPeers stores the given peers in the peers database
func (s *Service) persistPeers() error {
txn, err := s.database.NewTransaction(true)
if err != nil {
return fmt.Errorf("create transaction: %w", err)
}

store := s.host.Peerstore()
peers := store.Peers()
for _, peerID := range peers {
peerInfo := store.PeerInfo(peerID)

encodedAddrs, err := EncodeAddrs(peerInfo.Addrs)
if err != nil {
return fmt.Errorf("encode addresses for peer %s: %w", peerID, err)
}

if err := txn.Set(db.Peer.Key([]byte(peerID)), encodedAddrs); err != nil {
return fmt.Errorf("set data for peer %s: %w", peerID, err)
}
}

if err := txn.Commit(); err != nil {
return fmt.Errorf("commit transaction: %w", err)
}

s.log.Infow("Stored peers", "num", len(peers))

return nil
}

// loadPeers loads the previously stored peers from the database
func loadPeers(database db.DB) ([]peer.AddrInfo, error) {
var peers []peer.AddrInfo

err := database.View(func(txn db.Transaction) error {
it, err := txn.NewIterator()
if err != nil {
return fmt.Errorf("create iterator: %w", err)
}
defer it.Close()

prefix := db.Peer.Key()
for it.Seek(prefix); it.Valid(); it.Next() {
peerIDBytes := it.Key()[len(prefix):]
peerID, err := peer.IDFromBytes(peerIDBytes)
if err != nil {
return fmt.Errorf("decode peer ID: %w", err)
}

val, err := it.Value()
if err != nil {
return fmt.Errorf("get value: %w", err)
}

addrs, err := decodeAddrs(val)
if err != nil {
return fmt.Errorf("decode addresses for peer %s: %w", peerID, err)
}

peers = append(peers, peer.AddrInfo{ID: peerID, Addrs: addrs})
}

return nil
})
if err != nil {
return nil, fmt.Errorf("load peers: %w", err)
}

return peers, nil
}
43 changes: 43 additions & 0 deletions p2p/p2p_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,15 @@ import (
"testing"
"time"

"github.com/NethermindEth/juno/db"
"github.com/NethermindEth/juno/db/pebble"
"github.com/NethermindEth/juno/p2p"
"github.com/NethermindEth/juno/utils"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
"github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/require"
)

Expand All @@ -33,6 +37,7 @@ func TestService(t *testing.T) {
nil,
&utils.Integration,
utils.NewNopZapLogger(),
nil,
)
require.NoError(t, err)

Expand All @@ -54,6 +59,7 @@ func TestService(t *testing.T) {
nil,
&utils.Integration,
utils.NewNopZapLogger(),
nil,
)
require.NoError(t, err)

Expand Down Expand Up @@ -140,6 +146,7 @@ func TestInvalidKey(t *testing.T) {
nil,
&utils.Integration,
utils.NewNopZapLogger(),
nil,
)

require.Error(t, err)
Expand All @@ -156,7 +163,43 @@ func TestValidKey(t *testing.T) {
nil,
&utils.Integration,
utils.NewNopZapLogger(),
nil,
)

require.NoError(t, err)
}

func TestLoadAndPersistPeers(t *testing.T) {
testDB := pebble.NewMemTest(t)

txn, err := testDB.NewTransaction(true)
require.NoError(t, err)

decodedID, err := peer.Decode("12D3KooWLdURCjbp1D7hkXWk6ZVfcMDPtsNnPHuxoTcWXFtvrxGG")
require.NoError(t, err)

addrs := []multiaddr.Multiaddr{
multiaddr.StringCast("/ip4/127.0.0.1/tcp/7777"),
}
encAddrs, err := p2p.EncodeAddrs(addrs)
require.NoError(t, err)

err = txn.Set(db.Peer.Key([]byte(decodedID)), encAddrs)
require.NoError(t, err)

err = txn.Commit()
require.NoError(t, err)

_, err = p2p.New(
"/ip4/127.0.0.1/tcp/30301",
"peerA",
"",
"5f6cdc3aebcc74af494df054876100368ef6126e3a33fa65b90c765b381ffc37a0a63bbeeefab0740f24a6a38dabb513b9233254ad0020c721c23e69bc820089",
false,
nil,
&utils.Integration,
utils.NewNopZapLogger(),
testDB,
)
require.NoError(t, err)
}
43 changes: 43 additions & 0 deletions p2p/peers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package p2p

import (
"bytes"
"fmt"

"github.com/fxamacker/cbor/v2"
"github.com/multiformats/go-multiaddr"
)

// EncodeAddrs encodes a slice of multiaddrs into a byte slice
func EncodeAddrs(addrs []multiaddr.Multiaddr) ([]byte, error) {
multiAddrBytes := make([][]byte, len(addrs))
for i, addr := range addrs {
multiAddrBytes[i] = addr.Bytes()
}

var buf bytes.Buffer
if err := cbor.NewEncoder(&buf).Encode(multiAddrBytes); err != nil {
return nil, fmt.Errorf("encode addresses: %w", err)
}

return buf.Bytes(), nil
}

// decodeAddrs decodes a byte slice into a slice of multiaddrs
func decodeAddrs(b []byte) ([]multiaddr.Multiaddr, error) {
var multiAddrBytes [][]byte
if err := cbor.NewDecoder(bytes.NewReader(b)).Decode(&multiAddrBytes); err != nil {
return nil, fmt.Errorf("decode addresses: %w", err)
}

addrs := make([]multiaddr.Multiaddr, 0, len(multiAddrBytes))
for _, addrBytes := range multiAddrBytes {
addr, err := multiaddr.NewMultiaddrBytes(addrBytes)
if err != nil {
return nil, fmt.Errorf("parse multiaddr: %w", err)
}
addrs = append(addrs, addr)
}

return addrs, nil
}

0 comments on commit 50f6c92

Please sign in to comment.