Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use peermanager scores for blocksync peers and don't error out on block mismatch #162

Merged
merged 21 commits into from
Oct 31, 2023
Merged
46 changes: 42 additions & 4 deletions internal/blocksync/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ import (
"context"
"errors"
"fmt"
"github.com/tendermint/tendermint/internal/p2p"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: fix lint issue

"math"
"math/rand"
"sort"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -47,7 +50,7 @@ const (
maxDiffBetweenCurrentAndReceivedBlockHeight = 100
)

var peerTimeout = 15 * time.Second // not const so we can override with tests
var peerTimeout = 3 * time.Second // not const so we can override with tests
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

waiting 15s for timeout is too long given we expect blocks much more frequently


/*
Peers self report their heights when we join the block pool.
Expand Down Expand Up @@ -80,6 +83,7 @@ type BlockPool struct {
height int64 // the lowest key in requesters.
// peers
peers map[types.NodeID]*bpPeer
peerManager *p2p.PeerManager
maxPeerHeight int64 // the biggest reported height

// atomic
Expand All @@ -101,8 +105,8 @@ func NewBlockPool(
start int64,
requestsCh chan<- BlockRequest,
errorsCh chan<- peerError,
peerManager *p2p.PeerManager,
) *BlockPool {

bp := &BlockPool{
logger: logger,
peers: make(map[types.NodeID]*bpPeer),
Expand All @@ -113,6 +117,7 @@ func NewBlockPool(
requestsCh: requestsCh,
errorsCh: errorsCh,
lastSyncRate: 0,
peerManager: peerManager,
}
bp.BaseService = *service.NewBaseService(logger, "BlockPool", bp)
return bp
Expand Down Expand Up @@ -315,7 +320,9 @@ func (pool *BlockPool) AddBlock(peerID types.NodeID, block *types.Block, extComm
}
} else {
err := errors.New("requester is different or block already exists")
pool.sendError(err, peerID)
// Original behavior is to error out when there is a mismatch, which shuts down the entire reactor.
Copy link
Contributor

@yzang2019 yzang2019 Oct 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall LGTM, one question: do we understand why the original behavior errors out and shuts down the entire reactor? Will there be any side effects if we change this behavior?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's revert this for now. When the standalone RPC node falls behind again (it is currently healthy and was upgraded to the new version), we can re-apply this patch and see if it helps.

// Instead, make the reactor more robust and just log error
//pool.sendError(err, peerID)
return fmt.Errorf("%w (peer: %s, requester: %s, block height: %d)", err, peerID, requester.getPeerID(), block.Height)
}

Expand Down Expand Up @@ -408,13 +415,44 @@ func (pool *BlockPool) updateMaxPeerHeight() {
pool.maxPeerHeight = max
}

// getSortedPeers returns the IDs of the given peers ordered from the highest
// to the lowest score reported by pool.peerManager.
//
// Each peer's score is read exactly once, before sorting, so the comparator
// sees a single fixed value per peer and the peer manager is queried
// len(peers) times rather than twice per comparison. The relative order of
// peers with equal scores is unspecified.
func (pool *BlockPool) getSortedPeers(peers map[types.NodeID]*bpPeer) []types.NodeID {
	sortedPeers := make([]types.NodeID, 0, len(peers))
	// NOTE(review): assumes Score returns an integer type, so the int64
	// conversion is lossless for the snapshot — confirm against p2p.PeerManager.
	scores := make(map[types.NodeID]int64, len(peers))
	for id := range peers {
		sortedPeers = append(sortedPeers, id)
		scores[id] = int64(pool.peerManager.Score(id))
	}

	// Sort from high to low score.
	sort.Slice(sortedPeers, func(i, j int) bool {
		return scores[sortedPeers[i]] > scores[sortedPeers[j]]
	})
	return sortedPeers
}

// Pick an available peer with the given height available.
// If no peers are available, returns nil.
func (pool *BlockPool) pickIncrAvailablePeer(height int64) *bpPeer {
pool.mtx.Lock()
defer pool.mtx.Unlock()

for _, peer := range pool.peers {
// Generate a sorted list
sortedPeers := pool.getSortedPeers(pool.peers)
var goodPeers []types.NodeID
// Remove peers with 0 score and shuffle list
for _, peer := range sortedPeers {
// We only want to work with peers that are ready & connected (not dialing)
if pool.peerManager.State(peer) == "ready,connected" {
goodPeers = append(goodPeers, peer)
}
if pool.peerManager.Score(peer) == 0 {
break
}
}
rand.Seed(time.Now().UnixNano())
rand.Shuffle(len(goodPeers), func(i, j int) { goodPeers[i], goodPeers[j] = goodPeers[j], goodPeers[i] })

for _, nodeId := range sortedPeers {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few further optimizations we can do:

  1. We should probably avoid using low-score peers, for example the ones with a score of 0
  2. We should do some random shuffling so that we aren't always targeting the same few top peers

peer := pool.peers[nodeId]
if peer.didTimeout {
pool.removePeer(peer.id)
continue
Expand Down
69 changes: 60 additions & 9 deletions internal/blocksync/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,20 @@ package blocksync

import (
"context"
"crypto/rand"
"encoding/hex"
"fmt"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/crypto/ed25519"
"github.com/tendermint/tendermint/internal/p2p"
dbm "github.com/tendermint/tm-db"
mrand "math/rand"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/tendermint/tendermint/libs/log"
tmrand "github.com/tendermint/tendermint/libs/rand"
"github.com/tendermint/tendermint/types"
)

Expand All @@ -24,6 +28,7 @@ type testPeer struct {
base int64
height int64
inputChan chan inputData // make sure each peer's data is sequential
score p2p.PeerScore
}

type inputData struct {
Expand Down Expand Up @@ -70,17 +75,42 @@ func (ps testPeers) stop() {
func makePeers(numPeers int, minHeight, maxHeight int64) testPeers {
peers := make(testPeers, numPeers)
for i := 0; i < numPeers; i++ {
peerID := types.NodeID(tmrand.Str(12))
bytes := make([]byte, 20)
if _, err := rand.Read(bytes); err != nil {
panic(err)
}
peerID := types.NodeID(hex.EncodeToString(bytes))
height := minHeight + mrand.Int63n(maxHeight-minHeight)
base := minHeight + int64(i)
if base > height {
base = height
}
peers[peerID] = testPeer{peerID, base, height, make(chan inputData, 10)}
peers[peerID] = testPeer{peerID, base, height, make(chan inputData, 10), 1}
}
return peers
}

// makePeerManager builds a p2p.PeerManager for tests. The manager is backed by
// an in-memory database, receives each test peer's score through
// PeerManagerOptions.PeerScores, and has every peer added under a "memory"
// protocol address.
//
// It panics if the peer manager cannot be constructed or if a peer cannot be
// added, so a broken fixture fails loudly instead of yielding a nil manager.
func makePeerManager(peers map[types.NodeID]testPeer) *p2p.PeerManager {
	selfKey := ed25519.GenPrivKeyFromSecret([]byte{0xf9, 0x1b, 0x08, 0xaa, 0x38, 0xee, 0x34, 0xdd})
	selfID := types.NodeIDFromPubKey(selfKey.PubKey())

	peerScores := make(map[types.NodeID]p2p.PeerScore, len(peers))
	for nodeID, peer := range peers {
		peerScores[nodeID] = peer.score
	}

	peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{
		PeerScores:          peerScores,
		MaxConnected:        1,
		MaxConnectedUpgrade: 2,
	}, p2p.NopMetrics())
	if err != nil {
		panic(err)
	}

	for nodeID := range peers {
		if _, err := peerManager.Add(p2p.NodeAddress{Protocol: "memory", NodeID: nodeID}); err != nil {
			panic(err)
		}
	}
	return peerManager
}
func TestBlockPoolBasic(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand All @@ -89,7 +119,7 @@ func TestBlockPoolBasic(t *testing.T) {
peers := makePeers(10, start+1, 1000)
errorsCh := make(chan peerError, 1000)
requestsCh := make(chan BlockRequest, 1000)
pool := NewBlockPool(log.NewNopLogger(), start, requestsCh, errorsCh)
pool := NewBlockPool(log.NewNopLogger(), start, requestsCh, errorsCh, makePeerManager(peers))

if err := pool.Start(ctx); err != nil {
t.Error(err)
Expand Down Expand Up @@ -147,7 +177,7 @@ func TestBlockPoolTimeout(t *testing.T) {
peers := makePeers(10, start+1, 1000)
errorsCh := make(chan peerError, 1000)
requestsCh := make(chan BlockRequest, 1000)
pool := NewBlockPool(logger, start, requestsCh, errorsCh)
pool := NewBlockPool(logger, start, requestsCh, errorsCh, makePeerManager(peers))
err := pool.Start(ctx)
if err != nil {
t.Error(err)
Expand Down Expand Up @@ -205,12 +235,12 @@ func TestBlockPoolRemovePeer(t *testing.T) {
for i := 0; i < 10; i++ {
peerID := types.NodeID(fmt.Sprintf("%d", i+1))
height := int64(i + 1)
peers[peerID] = testPeer{peerID, 0, height, make(chan inputData)}
peers[peerID] = testPeer{peerID, 0, height, make(chan inputData), 1}
}
requestsCh := make(chan BlockRequest)
errorsCh := make(chan peerError)

pool := NewBlockPool(log.NewNopLogger(), 1, requestsCh, errorsCh)
pool := NewBlockPool(log.NewNopLogger(), 1, requestsCh, errorsCh, nil)
err := pool.Start(ctx)
require.NoError(t, err)
t.Cleanup(func() { cancel(); pool.Wait() })
Expand All @@ -235,3 +265,24 @@ func TestBlockPoolRemovePeer(t *testing.T) {

assert.EqualValues(t, 0, pool.MaxPeerHeight())
}

func TestSortedPeers(t *testing.T) {
peers := make(testPeers, 10)
peerIdA := types.NodeID(strings.Repeat("a", 40))
peerIdB := types.NodeID(strings.Repeat("b", 40))
peerIdC := types.NodeID(strings.Repeat("c", 40))

peers[peerIdA] = testPeer{peerIdA, 0, 1, make(chan inputData), 11}
peers[peerIdB] = testPeer{peerIdA, 0, 1, make(chan inputData), 10}
peers[peerIdC] = testPeer{peerIdA, 0, 1, make(chan inputData), 13}

requestsCh := make(chan BlockRequest)
errorsCh := make(chan peerError)
pool := NewBlockPool(log.NewNopLogger(), 1, requestsCh, errorsCh, makePeerManager(peers))
// add peers
for peerID, peer := range peers {
pool.SetPeerRange(peerID, peer.base, peer.height)
}
// Peers should be sorted by score via peerManager
assert.Equal(t, []types.NodeID{peerIdC, peerIdA, peerIdB}, pool.getSortedPeers(pool.peers))
}
9 changes: 6 additions & 3 deletions internal/blocksync/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,9 @@ type Reactor struct {
consReactor consensusReactor
blockSync *atomicBool

peerEvents p2p.PeerEventSubscriber
channel *p2p.Channel
peerEvents p2p.PeerEventSubscriber
peerManager *p2p.PeerManager
channel *p2p.Channel

requestsCh <-chan BlockRequest
errorsCh <-chan peerError
Expand All @@ -105,6 +106,7 @@ func NewReactor(
store *store.BlockStore,
consReactor consensusReactor,
peerEvents p2p.PeerEventSubscriber,
peerManager *p2p.PeerManager,
blockSync bool,
metrics *consensus.Metrics,
eventBus *eventbus.EventBus,
Expand All @@ -119,6 +121,7 @@ func NewReactor(
consReactor: consReactor,
blockSync: newAtomicBool(blockSync),
peerEvents: peerEvents,
peerManager: peerManager,
metrics: metrics,
eventBus: eventBus,
restartCh: restartCh,
Expand Down Expand Up @@ -159,7 +162,7 @@ func (r *Reactor) OnStart(ctx context.Context) error {

requestsCh := make(chan BlockRequest, maxTotalRequesters)
errorsCh := make(chan peerError, maxPeerErrBuffer) // NOTE: The capacity should be larger than the peer count.
r.pool = NewBlockPool(r.logger, startHeight, requestsCh, errorsCh)
r.pool = NewBlockPool(r.logger, startHeight, requestsCh, errorsCh, r.peerManager)
r.requestsCh = requestsCh
r.errorsCh = errorsCh

Expand Down
Loading
Loading