Use peermanager scores for blocksync peers and don't error out on block mismatch (#162)

* Use peermanager scores for blocksync peers

* Add debug

* Randomize

* debug

* use state to filter

* debug

* debug

* debug

* debug

* add comments

* don't err

* revert timeout

* Add missing param

* Remove flaky test

* fix nil

* debug

* debug

* debug

* debug

---------

Co-authored-by: Yiming Zang <[email protected]>
philipsu522 and yzang2019 authored Oct 31, 2023
1 parent 265f357 commit dd79101
Showing 6 changed files with 154 additions and 55 deletions.
40 changes: 38 additions & 2 deletions internal/blocksync/pool.go
@@ -4,7 +4,10 @@ import (
"context"
"errors"
"fmt"
"github.com/tendermint/tendermint/internal/p2p"
"math"
"math/rand"
"sort"
"sync"
"sync/atomic"
"time"
@@ -80,6 +83,7 @@ type BlockPool struct {
height int64 // the lowest key in requesters.
// peers
peers map[types.NodeID]*bpPeer
peerManager *p2p.PeerManager
maxPeerHeight int64 // the biggest reported height

// atomic
@@ -101,8 +105,8 @@ func NewBlockPool(
start int64,
requestsCh chan<- BlockRequest,
errorsCh chan<- peerError,
peerManager *p2p.PeerManager,
) *BlockPool {

bp := &BlockPool{
logger: logger,
peers: make(map[types.NodeID]*bpPeer),
@@ -113,6 +117,7 @@
requestsCh: requestsCh,
errorsCh: errorsCh,
lastSyncRate: 0,
peerManager: peerManager,
}
bp.BaseService = *service.NewBaseService(logger, "BlockPool", bp)
return bp
@@ -408,13 +413,44 @@ func (pool *BlockPool) updateMaxPeerHeight() {
pool.maxPeerHeight = max
}

func (pool *BlockPool) getSortedPeers(peers map[types.NodeID]*bpPeer) []types.NodeID {
// Generate a sorted list
sortedPeers := make([]types.NodeID, 0, len(peers))

for peer := range peers {
sortedPeers = append(sortedPeers, peer)
}
// Sort from high to low score
sort.Slice(sortedPeers, func(i, j int) bool {
return pool.peerManager.Score(sortedPeers[i]) > pool.peerManager.Score(sortedPeers[j])
})
return sortedPeers
}

// Pick an available peer with the given height available.
// If no peers are available, returns nil.
func (pool *BlockPool) pickIncrAvailablePeer(height int64) *bpPeer {
pool.mtx.Lock()
defer pool.mtx.Unlock()

for _, peer := range pool.peers {
// Generate a sorted list
sortedPeers := pool.getSortedPeers(pool.peers)
var goodPeers []types.NodeID
// Remove peers with 0 score and shuffle list
for _, peer := range sortedPeers {
// We only want to work with peers that are ready & connected (not dialing)
if pool.peerManager.State(peer) == "ready,connected" {
goodPeers = append(goodPeers, peer)
}
if pool.peerManager.Score(peer) == 0 {
break
}
}
rand.Seed(time.Now().UnixNano())
rand.Shuffle(len(goodPeers), func(i, j int) { goodPeers[i], goodPeers[j] = goodPeers[j], goodPeers[i] })

for _, nodeId := range goodPeers {
peer := pool.peers[nodeId]
if peer.didTimeout {
pool.removePeer(peer.id)
continue
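
For readers skimming the diff, here is a minimal, self-contained sketch of the selection policy above, with plain maps standing in for the PeerManager's Score and State lookups (the map-based lookups and the pickCandidates name are illustrative stand-ins, not the real API):

package main

import (
	"fmt"
	"math/rand"
	"sort"
)

// pickCandidates is an illustrative stand-in for pickIncrAvailablePeer's
// filtering: order peers by score (descending), keep only those that are
// ready and connected, stop at the first zero-score peer (scores are sorted,
// so everything after it is zero too), then shuffle the survivors so load
// spreads across equally-scored peers.
func pickCandidates(scores map[string]int, states map[string]string) []string {
	sorted := make([]string, 0, len(scores))
	for id := range scores {
		sorted = append(sorted, id)
	}
	sort.Slice(sorted, func(i, j int) bool { return scores[sorted[i]] > scores[sorted[j]] })

	var good []string
	for _, id := range sorted {
		if states[id] == "ready,connected" {
			good = append(good, id)
		}
		if scores[id] == 0 {
			break
		}
	}
	rand.Shuffle(len(good), func(i, j int) { good[i], good[j] = good[j], good[i] })
	return good
}

func main() {
	scores := map[string]int{"a": 11, "b": 10, "c": 13, "d": 0}
	states := map[string]string{
		"a": "ready,connected",
		"b": "dialing",
		"c": "ready,connected",
		"d": "disconnected",
	}
	fmt.Println(pickCandidates(scores, states)) // some ordering of [c a]
}
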
78 changes: 67 additions & 11 deletions internal/blocksync/pool_test.go
@@ -2,16 +2,20 @@ package blocksync

import (
"context"
"crypto/rand"
"encoding/hex"
"fmt"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/crypto/ed25519"
"github.com/tendermint/tendermint/internal/p2p"
dbm "github.com/tendermint/tm-db"
mrand "math/rand"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/tendermint/tendermint/libs/log"
tmrand "github.com/tendermint/tendermint/libs/rand"
"github.com/tendermint/tendermint/types"
)

@@ -24,6 +28,7 @@ type testPeer struct {
base int64
height int64
inputChan chan inputData // make sure each peer's data is sequential
score p2p.PeerScore
}

type inputData struct {
@@ -70,17 +75,42 @@ func (ps testPeers) stop() {
func makePeers(numPeers int, minHeight, maxHeight int64) testPeers {
peers := make(testPeers, numPeers)
for i := 0; i < numPeers; i++ {
peerID := types.NodeID(tmrand.Str(12))
bytes := make([]byte, 20)
if _, err := rand.Read(bytes); err != nil {
panic(err)
}
peerID := types.NodeID(hex.EncodeToString(bytes))
height := minHeight + mrand.Int63n(maxHeight-minHeight)
base := minHeight + int64(i)
if base > height {
base = height
}
peers[peerID] = testPeer{peerID, base, height, make(chan inputData, 10)}
peers[peerID] = testPeer{peerID, base, height, make(chan inputData, 10), 1}
}
return peers
}
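
The peer IDs here switch from short random strings to 40-character hex strings, presumably because these peers are now registered with a real p2p.PeerManager, whose Add call validates the node address; a types.NodeID is the hex encoding of a 20-byte address (an assumption consistent with the ed25519-derived IDs used below). A standalone sketch of generating a valid ID, mirroring the makePeers change:

package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

func main() {
	// Assumption: a valid NodeID is 40 hex characters, i.e. the hex
	// encoding of 20 random bytes.
	b := make([]byte, 20)
	if _, err := rand.Read(b); err != nil {
		panic(err)
	}
	id := hex.EncodeToString(b)
	fmt.Println(id, len(id)) // e.g. 1a2b…, 40
}
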

func makePeerManager(peers map[types.NodeID]testPeer) *p2p.PeerManager {
selfKey := ed25519.GenPrivKeyFromSecret([]byte{0xf9, 0x1b, 0x08, 0xaa, 0x38, 0xee, 0x34, 0xdd})
selfID := types.NodeIDFromPubKey(selfKey.PubKey())
peerScores := make(map[types.NodeID]p2p.PeerScore)
for nodeId, peer := range peers {
peerScores[nodeId] = peer.score
}
peerManager, _ := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{
PeerScores: peerScores,
MaxConnected: 1,
MaxConnectedUpgrade: 2,
}, p2p.NopMetrics())
for nodeId := range peers {
_, err := peerManager.Add(p2p.NodeAddress{Protocol: "memory", NodeID: nodeId})
if err != nil {
panic(err)
}
}
return peerManager
}

func TestBlockPoolBasic(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -89,7 +119,7 @@ func TestBlockPoolBasic(t *testing.T) {
peers := makePeers(10, start+1, 1000)
errorsCh := make(chan peerError, 1000)
requestsCh := make(chan BlockRequest, 1000)
pool := NewBlockPool(log.NewNopLogger(), start, requestsCh, errorsCh)
pool := NewBlockPool(log.NewNopLogger(), start, requestsCh, errorsCh, makePeerManager(peers))

if err := pool.Start(ctx); err != nil {
t.Error(err)
@@ -147,7 +177,7 @@ func TestBlockPoolTimeout(t *testing.T) {
peers := makePeers(10, start+1, 1000)
errorsCh := make(chan peerError, 1000)
requestsCh := make(chan BlockRequest, 1000)
pool := NewBlockPool(logger, start, requestsCh, errorsCh)
pool := NewBlockPool(logger, start, requestsCh, errorsCh, makePeerManager(peers))
err := pool.Start(ctx)
if err != nil {
t.Error(err)
@@ -203,14 +233,19 @@ func TestBlockPoolRemovePeer(t *testing.T) {

peers := make(testPeers, 10)
for i := 0; i < 10; i++ {
peerID := types.NodeID(fmt.Sprintf("%d", i+1))
var peerID types.NodeID
if i+1 == 10 {
peerID = types.NodeID(strings.Repeat(fmt.Sprintf("%d", i+1), 20))
} else {
peerID = types.NodeID(strings.Repeat(fmt.Sprintf("%d", i+1), 40))
}
height := int64(i + 1)
peers[peerID] = testPeer{peerID, 0, height, make(chan inputData)}
peers[peerID] = testPeer{peerID, 0, height, make(chan inputData), 1}
}
requestsCh := make(chan BlockRequest)
errorsCh := make(chan peerError)

pool := NewBlockPool(log.NewNopLogger(), 1, requestsCh, errorsCh)
pool := NewBlockPool(log.NewNopLogger(), 1, requestsCh, errorsCh, makePeerManager(peers))
err := pool.Start(ctx)
require.NoError(t, err)
t.Cleanup(func() { cancel(); pool.Wait() })
@@ -225,7 +260,7 @@ func TestBlockPoolRemovePeer(t *testing.T) {
assert.NotPanics(t, func() { pool.RemovePeer(types.NodeID("Superman")) })

// remove peer with biggest height
pool.RemovePeer(types.NodeID("10"))
pool.RemovePeer(types.NodeID(strings.Repeat("10", 20)))
assert.EqualValues(t, 9, pool.MaxPeerHeight())

// remove all peers
@@ -235,3 +270,24 @@ func TestBlockPoolRemovePeer(t *testing.T) {

assert.EqualValues(t, 0, pool.MaxPeerHeight())
}

func TestSortedPeers(t *testing.T) {
peers := make(testPeers, 10)
peerIdA := types.NodeID(strings.Repeat("a", 40))
peerIdB := types.NodeID(strings.Repeat("b", 40))
peerIdC := types.NodeID(strings.Repeat("c", 40))

peers[peerIdA] = testPeer{peerIdA, 0, 1, make(chan inputData), 11}
peers[peerIdB] = testPeer{peerIdB, 0, 1, make(chan inputData), 10}
peers[peerIdC] = testPeer{peerIdC, 0, 1, make(chan inputData), 13}

requestsCh := make(chan BlockRequest)
errorsCh := make(chan peerError)
pool := NewBlockPool(log.NewNopLogger(), 1, requestsCh, errorsCh, makePeerManager(peers))
// add peers
for peerID, peer := range peers {
pool.SetPeerRange(peerID, peer.base, peer.height)
}
// Peers should be sorted by score via peerManager
assert.Equal(t, []types.NodeID{peerIdC, peerIdA, peerIdB}, pool.getSortedPeers(pool.peers))
}
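
As a quick sanity check on the expected ordering: sorting the three scores used above (a=11, b=10, c=13) in descending order yields c, a, b, which is exactly the slice the assertion expects. A tiny standalone sketch:

package main

import (
	"fmt"
	"sort"
)

func main() {
	// Scores from TestSortedPeers: a=11, b=10, c=13.
	scores := map[string]int{"a": 11, "b": 10, "c": 13}
	ids := []string{"a", "b", "c"}
	sort.Slice(ids, func(i, j int) bool { return scores[ids[i]] > scores[ids[j]] })
	fmt.Println(ids) // [c a b]
}
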
9 changes: 6 additions & 3 deletions internal/blocksync/reactor.go
@@ -81,8 +81,9 @@ type Reactor struct {
consReactor consensusReactor
blockSync *atomicBool

peerEvents p2p.PeerEventSubscriber
channel *p2p.Channel
peerEvents p2p.PeerEventSubscriber
peerManager *p2p.PeerManager
channel *p2p.Channel

requestsCh <-chan BlockRequest
errorsCh <-chan peerError
@@ -105,6 +106,7 @@ func NewReactor(
store *store.BlockStore,
consReactor consensusReactor,
peerEvents p2p.PeerEventSubscriber,
peerManager *p2p.PeerManager,
blockSync bool,
metrics *consensus.Metrics,
eventBus *eventbus.EventBus,
@@ -119,6 +121,7 @@
consReactor: consReactor,
blockSync: newAtomicBool(blockSync),
peerEvents: peerEvents,
peerManager: peerManager,
metrics: metrics,
eventBus: eventBus,
restartCh: restartCh,
@@ -159,7 +162,7 @@ func (r *Reactor) OnStart(ctx context.Context) error {

requestsCh := make(chan BlockRequest, maxTotalRequesters)
errorsCh := make(chan peerError, maxPeerErrBuffer) // NOTE: The capacity should be larger than the peer count.
r.pool = NewBlockPool(r.logger, startHeight, requestsCh, errorsCh)
r.pool = NewBlockPool(r.logger, startHeight, requestsCh, errorsCh, r.peerManager)
r.requestsCh = requestsCh
r.errorsCh = errorsCh

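
The reactor change is pure plumbing: the node's single PeerManager is now threaded through NewReactor into the BlockPool created in OnStart, so the pool and the p2p layer share one source of peer scores. A toy model of that threading (all types below are stand-ins, not the real ones):

package main

import "fmt"

// Stand-in types: the real ones are p2p.PeerManager, blocksync.Reactor,
// and blocksync.BlockPool.
type PeerManager struct{ scores map[string]int }

type BlockPool struct{ pm *PeerManager }

func NewBlockPool(pm *PeerManager) *BlockPool { return &BlockPool{pm: pm} }

type Reactor struct{ pm *PeerManager }

func NewReactor(pm *PeerManager) *Reactor { return &Reactor{pm: pm} }

// OnStart mirrors the diff: the pool is built with the reactor's peer manager.
func (r *Reactor) OnStart() *BlockPool { return NewBlockPool(r.pm) }

func main() {
	pm := &PeerManager{scores: map[string]int{"peer-a": 1}}
	pool := NewReactor(pm).OnStart()
	fmt.Println(pool.pm == pm) // true: one shared score source
}
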
(Diffs for the remaining three changed files are not shown here.)
