diff --git a/internal/blocksync/pool.go b/internal/blocksync/pool.go
index 2a315eadd..3f9a970a3 100644
--- a/internal/blocksync/pool.go
+++ b/internal/blocksync/pool.go
@@ -4,7 +4,11 @@ import (
 	"context"
 	"errors"
 	"fmt"
 	"math"
+	"math/rand"
+	"sort"
 	"sync"
 	"sync/atomic"
 	"time"
+
+	"github.com/tendermint/tendermint/internal/p2p"
@@ -80,6 +84,7 @@ type BlockPool struct {
 	height int64 // the lowest key in requesters.
 	// peers
 	peers         map[types.NodeID]*bpPeer
+	peerManager   *p2p.PeerManager
 	maxPeerHeight int64 // the biggest reported height

 	// atomic
@@ -101,8 +106,8 @@ func NewBlockPool(
 	start int64,
 	requestsCh chan<- BlockRequest,
 	errorsCh chan<- peerError,
+	peerManager *p2p.PeerManager,
 ) *BlockPool {
-
 	bp := &BlockPool{
 		logger: logger,
 		peers:  make(map[types.NodeID]*bpPeer),
@@ -113,6 +118,7 @@ func NewBlockPool(
 		requestsCh:   requestsCh,
 		errorsCh:     errorsCh,
 		lastSyncRate: 0,
+		peerManager:  peerManager,
 	}
 	bp.BaseService = *service.NewBaseService(logger, "BlockPool", bp)
 	return bp
@@ -408,13 +414,45 @@ func (pool *BlockPool) updateMaxPeerHeight() {
 	pool.maxPeerHeight = max
 }

+// getSortedPeers returns the IDs of the given peers, sorted by
+// peer-manager score from highest to lowest.
+func (pool *BlockPool) getSortedPeers(peers map[types.NodeID]*bpPeer) []types.NodeID {
+	sortedPeers := make([]types.NodeID, 0, len(peers))
+	for peer := range peers {
+		sortedPeers = append(sortedPeers, peer)
+	}
+	sort.Slice(sortedPeers, func(i, j int) bool {
+		return pool.peerManager.Score(sortedPeers[i]) > pool.peerManager.Score(sortedPeers[j])
+	})
+	return sortedPeers
+}
+
 // Pick an available peer with the given height available.
 // If no peers are available, returns nil.
 func (pool *BlockPool) pickIncrAvailablePeer(height int64) *bpPeer {
 	pool.mtx.Lock()
 	defer pool.mtx.Unlock()

-	for _, peer := range pool.peers {
+	sortedPeers := pool.getSortedPeers(pool.peers)
+	var goodPeers []types.NodeID
+	for _, peer := range sortedPeers {
+		// The list is sorted from high to low, so after the first zero
+		// score every remaining peer has a zero score as well.
+		if pool.peerManager.Score(peer) == 0 {
+			break
+		}
+		// We only want to work with peers that are ready & connected (not dialing).
+		if pool.peerManager.State(peer) == "ready,connected" {
+			goodPeers = append(goodPeers, peer)
+		}
+	}
+	// Shuffle the usable peers so load is spread across them rather than
+	// always hitting the highest-scored peer first.
+	rng := rand.New(rand.NewSource(time.Now().UnixNano()))
+	rng.Shuffle(len(goodPeers), func(i, j int) { goodPeers[i], goodPeers[j] = goodPeers[j], goodPeers[i] })
+
+	for _, nodeID := range goodPeers {
+		peer := pool.peers[nodeID]
 		if peer.didTimeout {
 			pool.removePeer(peer.id)
 			continue
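The heart of the change is the selection strategy above: rank peers by the peer manager's score, cut the list at the first zero score, then shuffle the survivors so requests spread across equally usable peers. A self-contained sketch of that strategy follows; the peer names and scores here are invented for illustration, and the real code additionally filters on connection state via `p2p.PeerManager`:

```go
package main

import (
	"fmt"
	"math/rand"
	"sort"
)

// score stands in for p2p.PeerManager.Score; the values are made up.
var score = map[string]int{"peerA": 13, "peerB": 11, "peerC": 10, "peerD": 0}

// candidates sorts peers by score (highest first), drops everything from
// the first zero-score peer onward, and shuffles the survivors so load is
// spread instead of always hitting the top-scored peer.
func candidates(peers []string, rng *rand.Rand) []string {
	sort.Slice(peers, func(i, j int) bool { return score[peers[i]] > score[peers[j]] })
	good := make([]string, 0, len(peers))
	for _, p := range peers {
		if score[p] == 0 {
			break // list is sorted, so the rest are zero too
		}
		good = append(good, p)
	}
	rng.Shuffle(len(good), func(i, j int) { good[i], good[j] = good[j], good[i] })
	return good
}

func main() {
	rng := rand.New(rand.NewSource(1))
	// Prints some permutation of [peerA peerB peerC]; peerD is always dropped.
	fmt.Println(candidates([]string{"peerD", "peerB", "peerA", "peerC"}, rng))
}
```

Like the pool code above, the sketch shuffles with a locally seeded `rand.Rand`: reseeding the shared global source on every call is wasteful, and `rand.Seed` is deprecated as of Go 1.20.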
diff --git a/internal/blocksync/pool_test.go b/internal/blocksync/pool_test.go
index 3c47b4a64..27caad060 100644
--- a/internal/blocksync/pool_test.go
+++ b/internal/blocksync/pool_test.go
@@ -2,16 +2,21 @@ package blocksync

 import (
 	"context"
+	"crypto/rand"
+	"encoding/hex"
 	"fmt"
 	mrand "math/rand"
+	"strings"
 	"testing"
 	"time"

 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+	dbm "github.com/tendermint/tm-db"

+	"github.com/tendermint/tendermint/crypto/ed25519"
+	"github.com/tendermint/tendermint/internal/p2p"
 	"github.com/tendermint/tendermint/libs/log"
-	tmrand "github.com/tendermint/tendermint/libs/rand"
 	"github.com/tendermint/tendermint/types"
 )
@@ -24,6 +29,7 @@ type testPeer struct {
 	base      int64
 	height    int64
 	inputChan chan inputData // make sure each peer's data is sequential
+	score     p2p.PeerScore
 }

 type inputData struct {
@@ -70,17 +76,46 @@ func (ps testPeers) stop() {
 func makePeers(numPeers int, minHeight, maxHeight int64) testPeers {
 	peers := make(testPeers, numPeers)
 	for i := 0; i < numPeers; i++ {
-		peerID := types.NodeID(tmrand.Str(12))
+		// NodeIDs are 40-character hex strings, so derive them from
+		// 20 random bytes.
+		bytes := make([]byte, 20)
+		if _, err := rand.Read(bytes); err != nil {
+			panic(err)
+		}
+		peerID := types.NodeID(hex.EncodeToString(bytes))
 		height := minHeight + mrand.Int63n(maxHeight-minHeight)
 		base := minHeight + int64(i)
 		if base > height {
 			base = height
 		}
-		peers[peerID] = testPeer{peerID, base, height, make(chan inputData, 10)}
+		peers[peerID] = testPeer{peerID, base, height, make(chan inputData, 10), 1}
 	}
 	return peers
 }

+func makePeerManager(peers map[types.NodeID]testPeer) *p2p.PeerManager {
+	selfKey := ed25519.GenPrivKeyFromSecret([]byte{0xf9, 0x1b, 0x08, 0xaa, 0x38, 0xee, 0x34, 0xdd})
+	selfID := types.NodeIDFromPubKey(selfKey.PubKey())
+	peerScores := make(map[types.NodeID]p2p.PeerScore)
+	for nodeID, peer := range peers {
+		peerScores[nodeID] = peer.score
+	}
+	peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{
+		PeerScores:          peerScores,
+		MaxConnected:        1,
+		MaxConnectedUpgrade: 2,
+	}, p2p.NopMetrics())
+	if err != nil {
+		panic(err)
+	}
+	for nodeID := range peers {
+		if _, err := peerManager.Add(p2p.NodeAddress{Protocol: "memory", NodeID: nodeID}); err != nil {
+			panic(err)
+		}
+	}
+	return peerManager
+}
+
 func TestBlockPoolBasic(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
@@ -89,7 +124,7 @@ func TestBlockPoolBasic(t *testing.T) {
 	peers := makePeers(10, start+1, 1000)
 	errorsCh := make(chan peerError, 1000)
 	requestsCh := make(chan BlockRequest, 1000)
-	pool := NewBlockPool(log.NewNopLogger(), start, requestsCh, errorsCh)
+	pool := NewBlockPool(log.NewNopLogger(), start, requestsCh, errorsCh, makePeerManager(peers))

 	if err := pool.Start(ctx); err != nil {
 		t.Error(err)
@@ -147,7 +182,7 @@ func TestBlockPoolTimeout(t *testing.T) {
 	peers := makePeers(10, start+1, 1000)
 	errorsCh := make(chan peerError, 1000)
 	requestsCh := make(chan BlockRequest, 1000)
-	pool := NewBlockPool(logger, start, requestsCh, errorsCh)
+	pool := NewBlockPool(logger, start, requestsCh, errorsCh, makePeerManager(peers))
 	err := pool.Start(ctx)
 	if err != nil {
 		t.Error(err)
@@ -203,14 +238,21 @@ func TestBlockPoolRemovePeer(t *testing.T) {

 	peers := make(testPeers, 10)
 	for i := 0; i < 10; i++ {
-		peerID := types.NodeID(fmt.Sprintf("%d", i+1))
+		// Repeat each digit out to the 40 characters a NodeID requires
+		// ("10" is two characters, so 20 repeats).
+		var peerID types.NodeID
+		if i+1 == 10 {
+			peerID = types.NodeID(strings.Repeat(fmt.Sprintf("%d", i+1), 20))
+		} else {
+			peerID = types.NodeID(strings.Repeat(fmt.Sprintf("%d", i+1), 40))
+		}
 		height := int64(i + 1)
-		peers[peerID] = testPeer{peerID, 0, height, make(chan inputData)}
+		peers[peerID] = testPeer{peerID, 0, height, make(chan inputData), 1}
 	}
 	requestsCh := make(chan BlockRequest)
 	errorsCh := make(chan peerError)
-	pool := NewBlockPool(log.NewNopLogger(), 1, requestsCh, errorsCh)
+	pool := NewBlockPool(log.NewNopLogger(), 1, requestsCh, errorsCh, makePeerManager(peers))
 	err := pool.Start(ctx)
 	require.NoError(t, err)
 	t.Cleanup(func() { cancel(); pool.Wait() })
@@ -225,7 +267,7 @@
 	assert.NotPanics(t, func() { pool.RemovePeer(types.NodeID("Superman")) })

 	// remove peer with biggest height
-	pool.RemovePeer(types.NodeID("10"))
+	pool.RemovePeer(types.NodeID(strings.Repeat("10", 20)))
 	assert.EqualValues(t, 9, pool.MaxPeerHeight())

 	// remove all peers
@@ -235,3 +277,24 @@

 	assert.EqualValues(t, 0, pool.MaxPeerHeight())
 }
+
+func TestSortedPeers(t *testing.T) {
+	peers := make(testPeers, 3)
+	peerIDA := types.NodeID(strings.Repeat("a", 40))
+	peerIDB := types.NodeID(strings.Repeat("b", 40))
+	peerIDC := types.NodeID(strings.Repeat("c", 40))
+
+	peers[peerIDA] = testPeer{peerIDA, 0, 1, make(chan inputData), 11}
+	peers[peerIDB] = testPeer{peerIDB, 0, 1, make(chan inputData), 10}
+	peers[peerIDC] = testPeer{peerIDC, 0, 1, make(chan inputData), 13}
+
+	requestsCh := make(chan BlockRequest)
+	errorsCh := make(chan peerError)
+	pool := NewBlockPool(log.NewNopLogger(), 1, requestsCh, errorsCh, makePeerManager(peers))
+	// Register the peers with the pool.
+	for peerID, peer := range peers {
+		pool.SetPeerRange(peerID, peer.base, peer.height)
+	}
+	// Peers come back sorted by peer-manager score, highest first.
+	assert.Equal(t, []types.NodeID{peerIDC, peerIDA, peerIDB}, pool.getSortedPeers(pool.peers))
+}
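The pool takes the manager's word for peer scores, so it is worth pinning down that scores fed in through `PeerManagerOptions.PeerScores` come back out of `Score` unchanged; `TestSortedPeers` above leans on the same behavior. A minimal test-style sketch using the helpers from this diff (the test name and score value are arbitrary):

```go
func TestPeerScoresRoundTrip(t *testing.T) {
	peers := makePeers(3, 1, 100)
	pm := makePeerManager(peers)
	// makePeers assigns every peer a score of 1, so the manager should
	// report exactly that score for each registered peer.
	for id := range peers {
		require.EqualValues(t, 1, pm.Score(id))
	}
}
```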
diff --git a/internal/blocksync/reactor.go b/internal/blocksync/reactor.go
index 9935f7f33..caa3d213b 100644
--- a/internal/blocksync/reactor.go
+++ b/internal/blocksync/reactor.go
@@ -81,8 +81,9 @@ type Reactor struct {
 	consReactor consensusReactor
 	blockSync   *atomicBool

-	peerEvents p2p.PeerEventSubscriber
-	channel    *p2p.Channel
+	peerEvents  p2p.PeerEventSubscriber
+	peerManager *p2p.PeerManager
+	channel     *p2p.Channel

 	requestsCh <-chan BlockRequest
 	errorsCh   <-chan peerError
@@ -105,6 +106,7 @@ func NewReactor(
 	store *store.BlockStore,
 	consReactor consensusReactor,
 	peerEvents p2p.PeerEventSubscriber,
+	peerManager *p2p.PeerManager,
 	blockSync bool,
 	metrics *consensus.Metrics,
 	eventBus *eventbus.EventBus,
@@ -119,6 +121,7 @@ func NewReactor(
 		consReactor: consReactor,
 		blockSync:   newAtomicBool(blockSync),
 		peerEvents:  peerEvents,
+		peerManager: peerManager,
 		metrics:     metrics,
 		eventBus:    eventBus,
 		restartCh:   restartCh,
@@ -159,7 +162,7 @@ func (r *Reactor) OnStart(ctx context.Context) error {
 	requestsCh := make(chan BlockRequest, maxTotalRequesters)
 	errorsCh := make(chan peerError, maxPeerErrBuffer) // NOTE: The capacity should be larger than the peer count.
-	r.pool = NewBlockPool(r.logger, startHeight, requestsCh, errorsCh)
+	r.pool = NewBlockPool(r.logger, startHeight, requestsCh, errorsCh, r.peerManager)
 	r.requestsCh = requestsCh
 	r.errorsCh = errorsCh
diff --git a/internal/blocksync/reactor_test.go b/internal/blocksync/reactor_test.go
index 09c6fedbe..0dfd94b80 100644
--- a/internal/blocksync/reactor_test.go
+++ b/internal/blocksync/reactor_test.go
@@ -106,6 +106,7 @@ func makeReactor(
 	privVal types.PrivValidator,
 	channelCreator p2p.ChannelCreator,
 	peerEvents p2p.PeerEventSubscriber,
+	peerManager *p2p.PeerManager,
 	restartChan chan struct{},
 	selfRemediationConfig *config.SelfRemediationConfig,
 ) *Reactor {
@@ -158,6 +159,7 @@ func makeReactor(
 		blockStore,
 		nil,
 		peerEvents,
+		peerManager,
 		true,
 		consensus.NopMetrics(),
 		nil, // eventbus, can be nil
@@ -203,6 +205,7 @@ func (rts *reactorTestSuite) addNode(
 		privVal,
 		chCreator,
 		peerEvents,
+		rts.network.Nodes[nodeID].PeerManager,
 		restartChan,
 		config.DefaultSelfRemediationConfig(),
 	)
@@ -354,49 +357,49 @@ func (m *MockBlockStore) Height() int64 {

 func TestAutoRestartIfBehind(t *testing.T) {
 	t.Parallel()
 	tests := []struct {
-		name                  string
-		blocksBehindThreshold uint64
+		name                      string
+		blocksBehindThreshold     uint64
 		blocksBehindCheckInterval time.Duration
-		selfHeight            int64
-		maxPeerHeight         int64
-		isBlockSync           bool
-		restartExpected       bool
+		selfHeight                int64
+		maxPeerHeight             int64
+		isBlockSync               bool
+		restartExpected           bool
 	}{
 		{
-			name:                  "Should not restart if blocksBehindThreshold is 0",
-			blocksBehindThreshold: 0,
+			name:                      "Should not restart if blocksBehindThreshold is 0",
+			blocksBehindThreshold:     0,
 			blocksBehindCheckInterval: 10 * time.Millisecond,
-			selfHeight:            100,
-			maxPeerHeight:         200,
-			isBlockSync:           false,
-			restartExpected:       false,
+			selfHeight:                100,
+			maxPeerHeight:             200,
+			isBlockSync:               false,
+			restartExpected:           false,
 		},
 		{
-			name:                  "Should not restart if behindHeight is less than threshold",
-			blocksBehindThreshold: 50,
-			selfHeight:            100,
+			name:                      "Should not restart if behindHeight is less than threshold",
+			blocksBehindThreshold:     50,
+			selfHeight:                100,
 			blocksBehindCheckInterval: 10 * time.Millisecond,
-			maxPeerHeight:         140,
-			isBlockSync:           false,
-			restartExpected:       false,
+			maxPeerHeight:             140,
+			isBlockSync:               false,
+			restartExpected:           false,
 		},
 		{
-			name:                  "Should restart if behindHeight is greater than or equal to threshold",
-			blocksBehindThreshold: 50,
-			selfHeight:            100,
+			name:                      "Should restart if behindHeight is greater than or equal to threshold",
+			blocksBehindThreshold:     50,
+			selfHeight:                100,
 			blocksBehindCheckInterval: 10 * time.Millisecond,
-			maxPeerHeight:         160,
-			isBlockSync:           false,
-			restartExpected:       true,
+			maxPeerHeight:             160,
+			isBlockSync:               false,
+			restartExpected:           true,
 		},
 		{
-			name:                  "Should not restart if blocksync",
-			blocksBehindThreshold: 50,
-			selfHeight:            100,
+			name:                      "Should not restart if blocksync",
+			blocksBehindThreshold:     50,
+			selfHeight:                100,
 			blocksBehindCheckInterval: 10 * time.Millisecond,
-			maxPeerHeight:         160,
-			isBlockSync:           true,
-			restartExpected:       false,
+			maxPeerHeight:             160,
+			isBlockSync:               true,
+			restartExpected:           false,
 		},
 	}
@@ -407,21 +410,20 @@ func TestAutoRestartIfBehind(t *testing.T) {
 			mockBlockStore.On("Height").Return(tt.selfHeight)

 			blockPool := &BlockPool{
-				logger:                log.TestingLogger(),
-				height:                tt.selfHeight,
+				logger:        log.TestingLogger(),
+				height:        tt.selfHeight,
 				maxPeerHeight: tt.maxPeerHeight,
-
 			}

 			restartChan := make(chan struct{}, 1)

 			r := &Reactor{
-				logger:                log.TestingLogger(),
-				store:                 mockBlockStore,
-				pool:                  blockPool,
-				blocksBehindThreshold: tt.blocksBehindThreshold,
+				logger:                    log.TestingLogger(),
+				store:                     mockBlockStore,
+				pool:                      blockPool,
+				blocksBehindThreshold:     tt.blocksBehindThreshold,
 				blocksBehindCheckInterval: tt.blocksBehindCheckInterval,
-				restartCh:             restartChan,
-				blockSync:             newAtomicBool(tt.isBlockSync),
+				restartCh:                 restartChan,
+				blockSync:                 newAtomicBool(tt.isBlockSync),
 			}
 			ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
diff --git a/node/node.go b/node/node.go
index 2b2a0c9e8..0cd8a62bf 100644
--- a/node/node.go
+++ b/node/node.go
@@ -367,6 +367,7 @@ func makeNode(
 		blockStore,
 		csReactor,
 		peerManager.Subscribe,
+		peerManager,
 		blockSync && !stateSync && !shoulddbsync,
 		nodeMetrics.consensus,
 		eventBus,
diff --git a/test/e2e/node/main.go b/test/e2e/node/main.go
index b1adcd109..85587dbf8 100644
--- a/test/e2e/node/main.go
+++ b/test/e2e/node/main.go
@@ -203,6 +203,7 @@ func startLightNode(ctx context.Context, logger log.Logger, cfg *Config) error {
 		providers[0],
 		providers[1:],
 		dbs.New(lightDB),
+		5*time.Minute,
 		light.Logger(nodeLogger),
 	)
 	if err != nil {
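Putting the pieces together, the scored pool can be exercised end to end using only helpers defined in this diff; a test-style sketch (peer counts, heights, and channel capacities are arbitrary):

```go
func TestScoredPoolSketch(t *testing.T) {
	peers := makePeers(3, 2, 100)
	requestsCh := make(chan BlockRequest, 8)
	errorsCh := make(chan peerError, 8)
	pool := NewBlockPool(log.NewNopLogger(), 1, requestsCh, errorsCh, makePeerManager(peers))

	for id, p := range peers {
		pool.SetPeerRange(id, p.base, p.height)
	}
	// getSortedPeers consults the peer manager for every ID it was
	// handed, so the result covers exactly the registered peers.
	require.Len(t, pool.getSortedPeers(pool.peers), 3)
}
```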