[EVM] Fix evm pending nonce #179

Merged: 13 commits, Jan 4, 2024
5 changes: 3 additions & 2 deletions .github/workflows/tests.yml
@@ -38,7 +38,8 @@ jobs:
- name: Run Go Tests
run: |
NUM_SPLIT=20
- make test-group-${{matrix.part}} NUM_SPLIT=20
+ make split-test-packages
+ make test-group-${{matrix.part}}

- name: Upload coverage artifact
uses: actions/upload-artifact@v3
@@ -99,7 +100,7 @@
with:
max_attempts: 2
retry_on: error
- timeout_seconds: 30
+ timeout_seconds: 60
command: |
jobs=$(curl https://api.github.com/repos/${{ github.repository }}/actions/runs/${{ github.run_id }}/jobs)
job_statuses=$(echo "$jobs" | jq -r '.jobs[] | .conclusion')
4 changes: 2 additions & 2 deletions Makefile
@@ -345,6 +345,6 @@ $(BUILDDIR)/packages.txt:$(GO_TEST_FILES) $(BUILDDIR)
go list -f "{{ if (or .TestGoFiles .XTestGoFiles) }}{{ .ImportPath }}{{ end }}" ./... | sort > $@

split-test-packages:$(BUILDDIR)/packages.txt
- split -d -n l/$(NUM_SPLIT) $< $<.
+ split -d -l $(NUM_SPLIT) $< $<.
test-group-%:split-test-packages
- cat $(BUILDDIR)/packages.txt.$* | xargs go test -mod=readonly -timeout=10m -race -covermode=atomic -coverprofile=$*.profile.out
+ cat $(BUILDDIR)/packages.txt.$* | xargs go test -mod=readonly -timeout=15m -race -covermode=atomic -coverprofile=$*.profile.out
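A note on the `split` flags: `split -d -n l/$(NUM_SPLIT)` divides packages.txt into exactly NUM_SPLIT numbered files without breaking lines, while the new `split -d -l $(NUM_SPLIT)` starts a new file every NUM_SPLIT lines, so the number of groups now grows with the package count. A rough Go equivalent of the new grouping behavior (an illustrative sketch, not code from this PR):

```go
package main

import (
	"bufio"
	"fmt"
	"os"
)

// groupByLines mirrors `split -d -l n`: every n lines start a new group.
func groupByLines(path string, n int) ([][]string, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	var groups [][]string
	var cur []string
	sc := bufio.NewScanner(f)
	for sc.Scan() {
		cur = append(cur, sc.Text())
		if len(cur) == n {
			groups = append(groups, cur)
			cur = nil
		}
	}
	if len(cur) > 0 {
		groups = append(groups, cur) // trailing partial group
	}
	return groups, sc.Err()
}

func main() {
	groups, err := groupByLines("build/packages.txt", 20)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	fmt.Printf("%d test groups\n", len(groups))
}
```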
4 changes: 3 additions & 1 deletion abci/types/types.go
@@ -247,11 +247,13 @@ const (
)

type PendingTxChecker func() PendingTxCheckerResponse
+ type ExpireTxHandler func()

- // V2 response type contains non-protobuf fields, so non-local ABCI clients will not be able
+ // ResponseCheckTxV2 response type contains non-protobuf fields, so non-local ABCI clients will not be able
// to utilize the new fields in V2 type (but still be backwards-compatible)
type ResponseCheckTxV2 struct {
*ResponseCheckTx
IsPendingTransaction bool
Checker PendingTxChecker // must not be nil if IsPendingTransaction is true
+ ExpireTxHandler ExpireTxHandler
}
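A sketch of how a local (in-process) client might consume the new fields; `routeCheckTx` and the `expired` flag are illustrative assumptions, not APIs from this PR:

```go
package mempool

import (
	abci "github.com/tendermint/tendermint/abci/types"
)

// routeCheckTx sketches the contract stated above: when IsPendingTransaction
// is set, Checker must be non-nil and can be re-invoked later to learn the
// tx's eventual status; ExpireTxHandler, when non-nil, runs if the mempool
// drops the tx, letting the application release per-tx state.
func routeCheckTx(res *abci.ResponseCheckTxV2, expired bool) {
	if !res.IsPendingTransaction {
		return // plain accept/reject via the embedded ResponseCheckTx
	}
	status := res.Checker() // safe per the contract above
	_ = status              // act on the PendingTxCheckerResponse (variants not shown in this diff)
	if expired && res.ExpireTxHandler != nil {
		res.ExpireTxHandler()
	}
}
```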
4 changes: 2 additions & 2 deletions config/config.go
@@ -1383,9 +1383,9 @@ func DefaultSelfRemediationConfig() *SelfRemediationConfig {
P2pNoPeersRestarWindowSeconds: 0,
StatesyncNoPeersRestartWindowSeconds: 0,
BlocksBehindThreshold: 0,
- BlocksBehindCheckIntervalSeconds: 30,
+ BlocksBehindCheckIntervalSeconds: 60,
// 30 minutes
- RestartCooldownSeconds: 1800,
+ RestartCooldownSeconds: 600,
}
}

2 changes: 1 addition & 1 deletion go.mod
@@ -1,6 +1,6 @@
module github.com/tendermint/tendermint

- go 1.17
+ go 1.19

require (
github.com/BurntSushi/toml v1.1.0
225 changes: 0 additions & 225 deletions go.sum

Large diffs are not rendered by default.

148 changes: 95 additions & 53 deletions internal/blocksync/pool.go
@@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
- "github.com/tendermint/tendermint/internal/p2p"
"math"
"math/rand"
"sort"
@@ -13,6 +12,7 @@ import (
"time"

"github.com/tendermint/tendermint/internal/libs/flowrate"
+ "github.com/tendermint/tendermint/internal/p2p"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/service"
"github.com/tendermint/tendermint/types"
@@ -31,7 +31,7 @@ eg, L = latency = 0.1s
*/

const (
- requestInterval = 2 * time.Millisecond
+ requestInterval = 10 * time.Millisecond
inactiveSleepInterval = 1 * time.Second
maxTotalRequesters = 600
maxPeerErrBuffer = 1000
@@ -48,9 +48,13 @@ const (

// Maximum difference between current and new block's height.
maxDiffBetweenCurrentAndReceivedBlockHeight = 100

+ // Used to indicate the reason of the redo
+ PeerRemoved RetryReason = "PeerRemoved"
+ BadBlock RetryReason = "BadBlock"
)

- var peerTimeout = 15 * time.Second // not const so we can override with tests
+ var peerTimeout = 10 * time.Second // not const so we can override with tests
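For scale, assuming the pool's request loop sleeps for requestInterval between iterations (as the pool code suggests), this change lowers its ceiling from 1/0.002 s = 500 wake-ups per second to 1/0.010 s = 100, while the per-peer timeout tightens from 15 s to 10 s.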

/*
Peers self report their heights when we join the block pool.
@@ -187,7 +191,7 @@ func (pool *BlockPool) removeTimedoutPeers() {
}

if peer.didTimeout {
- pool.removePeer(peer.id)
+ pool.removePeer(peer.id, true)
}
}
}
@@ -206,8 +210,8 @@ func (pool *BlockPool) IsCaughtUp() bool {
pool.mtx.RLock()
defer pool.mtx.RUnlock()

- // Need at least 1 peer to be considered caught up.
- if len(pool.peers) == 0 {
+ // Need at least 2 peers to be considered caught up.
+ if len(pool.peers) <= 1 {
return false
}

@@ -277,8 +281,13 @@ func (pool *BlockPool) RedoRequest(height int64) types.NodeID {
request := pool.requesters[height]
peerID := request.getPeerID()
if peerID != types.NodeID("") {
- // RemovePeer will redo all requesters associated with this peer.
- pool.removePeer(peerID)
+ pool.removePeer(peerID, false)
}
+ // Redo all requesters associated with this peer.
+ for _, requester := range pool.requesters {
+ if requester.getPeerID() == peerID {
+ requester.redo(peerID, BadBlock)
+ }
+ }
return peerID
}
@@ -376,13 +385,15 @@ func (pool *BlockPool) RemovePeer(peerID types.NodeID) {
pool.mtx.Lock()
defer pool.mtx.Unlock()

- pool.removePeer(peerID)
+ pool.removePeer(peerID, true)
}

- func (pool *BlockPool) removePeer(peerID types.NodeID) {
- for _, requester := range pool.requesters {
- if requester.getPeerID() == peerID {
- requester.redo(peerID)
+ func (pool *BlockPool) removePeer(peerID types.NodeID, redo bool) {
+ if redo {
+ for _, requester := range pool.requesters {
+ if requester.getPeerID() == peerID {
+ requester.redo(peerID, PeerRemoved)
+ }
}
}

@@ -437,22 +448,10 @@ func (pool *BlockPool) pickIncrAvailablePeer(height int64) *bpPeer {
sortedPeers := pool.getSortedPeers(pool.peers)
var goodPeers []types.NodeID
// Remove peers with 0 score and shuffle list
- for _, peer := range sortedPeers {
- // We only want to work with peers that are ready & connected (not dialing)
- if pool.peerManager.State(peer) == "ready,connected" {
- goodPeers = append(goodPeers, peer)
- }
- if pool.peerManager.Score(peer) == 0 {
- break
- }
- }
- rand.Seed(time.Now().UnixNano())
- rand.Shuffle(len(goodPeers), func(i, j int) { goodPeers[i], goodPeers[j] = goodPeers[j], goodPeers[i] })

for _, nodeId := range sortedPeers {
peer := pool.peers[nodeId]
if peer.didTimeout {
- pool.removePeer(peer.id)
+ pool.removePeer(peer.id, true)
continue
}
if peer.numPending >= maxPendingRequestsPerPeer {
@@ -461,6 +460,25 @@ func (pool *BlockPool) pickIncrAvailablePeer(height int64) *bpPeer {
if height < peer.base || height > peer.height {
continue
}
+ // We only want to work with peers that are ready & connected (not dialing)
+ if pool.peerManager.State(nodeId) == "ready,connected" {
+ goodPeers = append(goodPeers, nodeId)
+ }
+
+ // Skip the ones with zero score to avoid connecting to bad peers
+ if pool.peerManager.Score(nodeId) <= 0 {
+ break
+ }
}

+ // randomly pick one
+ if len(goodPeers) > 0 {
+ rand.Seed(time.Now().UnixNano())
+ index := rand.Intn(len(goodPeers))
+ if index >= len(goodPeers) {
+ index = len(goodPeers) - 1
+ }
+ peer := pool.peers[goodPeers[index]]
+ peer.incrPending()
+ return peer
}
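One detail of the new selection code: `rand.Intn(n)` is documented to return a value in [0, n) and to panic when n <= 0, so the `index >= len(goodPeers)` clamp can never fire; the `len(goodPeers) > 0` guard is what actually protects the call. A self-contained illustration of that stdlib guarantee:

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {
	good := []string{"peerA", "peerB", "peerC"}
	rand.Seed(time.Now().UnixNano()) // global seed, as the pool code does; fine on the go 1.19 toolchain this PR targets
	if len(good) > 0 {               // required: rand.Intn panics on n <= 0
		fmt.Println(good[rand.Intn(len(good))]) // always a valid index
	}
}
```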
@@ -606,28 +624,35 @@ func (peer *bpPeer) onTimeout() {

type bpRequester struct {
service.BaseService
- logger log.Logger
- pool *BlockPool
- height int64
- gotBlockCh chan struct{}
- redoCh chan types.NodeID // redo may send multitime, add peerId to identify repeat
-
- mtx sync.Mutex
- peerID types.NodeID
- block *types.Block
- extCommit *types.ExtendedCommit
+ logger log.Logger
+ pool *BlockPool
+ height int64
+ gotBlockCh chan struct{}
+ redoCh chan RedoOp // redo may send multitime, add peerId to identify repeat
+ timeoutTicker *time.Ticker
+ mtx sync.Mutex
+ peerID types.NodeID
+ block *types.Block
+ extCommit *types.ExtendedCommit
}

+ type RetryReason string
+
+ type RedoOp struct {
+ PeerId types.NodeID
+ Reason RetryReason
+ }
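Since redoCh now carries a RedoOp instead of a bare peer ID, every retry records why it was requested. The channel keeps capacity 1 (see newBPRequester below), so the non-blocking send in redo() coalesces duplicate retries. A self-contained sketch of that pattern (type names copied from the diff, stand-ins for the rest):

```go
package main

type NodeID string // stand-in for types.NodeID

type RetryReason string

type RedoOp struct {
	PeerId NodeID
	Reason RetryReason
}

// enqueueRedo mirrors redo() below: the send is non-blocking, and because
// redoCh has capacity 1, duplicate redos coalesce; dropping one is safe
// since the requester resets wholesale rather than per-event.
func enqueueRedo(redoCh chan RedoOp, peer NodeID, reason RetryReason) {
	select {
	case redoCh <- RedoOp{PeerId: peer, Reason: reason}:
	default: // a redo is already pending; drop this one
	}
}

func main() {
	ch := make(chan RedoOp, 1)
	enqueueRedo(ch, "peerA", "BadBlock")
	enqueueRedo(ch, "peerA", "PeerRemoved") // coalesced away
	op := <-ch
	println(string(op.PeerId), string(op.Reason)) // peerA BadBlock
}
```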

func newBPRequester(logger log.Logger, pool *BlockPool, height int64) *bpRequester {
bpr := &bpRequester{
- logger: pool.logger,
- pool: pool,
- height: height,
- gotBlockCh: make(chan struct{}, 1),
- redoCh: make(chan types.NodeID, 1),
-
- peerID: "",
- block: nil,
+ logger: pool.logger,
+ pool: pool,
+ height: height,
+ gotBlockCh: make(chan struct{}, 1),
+ redoCh: make(chan RedoOp, 1),
+ timeoutTicker: time.NewTicker(peerTimeout),
+ peerID: "",
+ block: nil,
}
bpr.BaseService = *service.NewBaseService(logger, "bpRequester", bpr)
return bpr
@@ -679,25 +704,34 @@ func (bpr *bpRequester) getPeerID() types.NodeID {
}

// This is called from the requestRoutine, upon redo().
- func (bpr *bpRequester) reset() {
+ func (bpr *bpRequester) reset(force bool) bool {
bpr.mtx.Lock()
defer bpr.mtx.Unlock()

+ if bpr.block != nil && !force {
+ // Do not reset if we already have a block
+ return false
+ }

if bpr.block != nil {
atomic.AddInt32(&bpr.pool.numPending, 1)
}

bpr.peerID = ""
bpr.block = nil
bpr.extCommit = nil
+ return true
}
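The new boolean return and force flag give reset a small contract: clear state and return true when no block has been received yet, or when the caller insists (the bad-block path); otherwise keep the block and return false, so a timeout can never discard a block that already arrived. The decision reduces to a one-line predicate (a sketch, not code from the PR):

```go
// shouldReset captures the decision reset(force) implements above: reset
// unless a block is already held and the caller did not force the reset.
// Callers in this PR: redo() passes force = (Reason == BadBlock); the
// timeout path passes force = false.
func shouldReset(haveBlock, force bool) bool {
	return !haveBlock || force
}
```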

// Tells bpRequester to pick another peer and try again.
// NOTE: Nonblocking, and does nothing if another redo
// was already requested.
- func (bpr *bpRequester) redo(peerID types.NodeID) {
+ func (bpr *bpRequester) redo(peerID types.NodeID, retryReason RetryReason) {
select {
- case bpr.redoCh <- peerID:
+ case bpr.redoCh <- RedoOp{
+ PeerId: peerID,
+ Reason: retryReason,
+ }:
default:
}
}
@@ -711,7 +745,8 @@ OUTER_LOOP:
var peer *bpPeer
PICK_PEER_LOOP:
for {
- if !bpr.IsRunning() || !bpr.pool.IsRunning() {
+ if !bpr.IsRunning() || !bpr.pool.IsRunning() || ctx.Err() != nil {
+ bpr.timeoutTicker.Stop()
return
}
if ctx.Err() != nil {
@@ -734,21 +769,28 @@

// Send request and wait.
bpr.pool.sendRequest(bpr.height, peer.id)
+ bpr.timeoutTicker.Reset(peerTimeout)
WAIT_LOOP:
for {
select {
case <-ctx.Done():
+ bpr.timeoutTicker.Stop()
return
- case peerID := <-bpr.redoCh:
- if peerID == bpr.peerID {
- bpr.reset()
+ case redoOp := <-bpr.redoCh:
+ // if we don't have an existing block or this is a bad block
+ // we should reset the previous block
+ if bpr.reset(redoOp.Reason == BadBlock) {
continue OUTER_LOOP
}
+ continue WAIT_LOOP
+ case <-bpr.timeoutTicker.C:
+ if bpr.reset(false) {
+ continue OUTER_LOOP
+ } else {
+ continue WAIT_LOOP
+ }
case <-bpr.gotBlockCh:
// We got a block!
- // Continue the for-loop and wait til Quit.
+ // Continue the for-loop and wait til Quit
+ // in case we need to reset the block
continue WAIT_LOOP
}
}
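The reworked WAIT_LOOP now has three wake-ups: a redo (reset only when the block is bad or absent), a per-request timeout (reset only when no block arrived), and block arrival (keep waiting, since a later redo may still reset it). It leans on time.Ticker's Reset and Stop, available since Go 1.15, which the go.mod bump above comfortably covers. A minimal, runnable illustration of that ticker pattern:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	peerTimeout := 50 * time.Millisecond
	t := time.NewTicker(peerTimeout)
	defer t.Stop() // as requestRoutine does on shutdown / ctx.Done()

	t.Reset(peerTimeout) // re-armed after every sendRequest in the new code
	select {
	case <-t.C:
		fmt.Println("peer timed out; reset(false) retries only if no block is held")
	case <-time.After(10 * time.Millisecond): // stand-in for gotBlockCh
		fmt.Println("block arrived first; keep waiting without resetting")
	}
}
```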