Skip to content
This repository has been archived by the owner on Dec 4, 2024. It is now read-only.

Wait to join consensus until synced with best peer #2020

Open
wants to merge 3 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions consensus/polybft/mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,12 @@ func (tp *syncerMock) Sync(func(*types.FullBlock) bool) error {
return args.Error(0)
}

// IsSyncing is the mock counterpart of the syncer's IsSyncing method.
// It registers the invocation through Called and hands back the first
// recorded result interpreted as a bool.
func (tp *syncerMock) IsSyncing() bool {
	return tp.Called().Bool(0)
}

func init() {
// setup custom hash header func
setupHeaderHashFunc()
Expand Down
14 changes: 13 additions & 1 deletion consensus/polybft/polybft.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,12 +599,24 @@ func (p *Polybft) startConsensusProtocol() {
if ev.Source == "syncer" && ev.NewChain[0].Number >= p.blockchain.CurrentHeader().Number {
p.logger.Info("sync block notification received", "block height", ev.NewChain[0].Number,
"current height", p.blockchain.CurrentHeader().Number)
syncerBlockCh <- struct{}{}

select {
case syncerBlockCh <- struct{}{}:
default:
goran-ethernal marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
}
}()

// wait until the node stops syncing
p.logger.Info("waiting to stop syncing so that we can try to join consensus if node is a validator")

for p.syncer.IsSyncing() {
}

p.logger.Info("node synced up on start. Trying to join consensus if validator")

var (
sequenceCh <-chan struct{}
stopSequence func()
Expand Down
34 changes: 29 additions & 5 deletions e2e-polybft/e2e/consensus_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package e2e

import (
"bytes"
"fmt"
"math/big"
"path"
Expand Down Expand Up @@ -59,22 +60,40 @@ func TestE2E_Consensus_Basic_WithNonValidators(t *testing.T) {
})

t.Run("sync protocol, drop single validator node", func(t *testing.T) {
validatorSrv := cluster.Servers[0]
validatorAcc, err := sidechain.GetAccountFromDir(validatorSrv.DataDir())
require.NoError(t, err)

// query the current block number, as it is a starting point for the test
currentBlockNum, err := cluster.Servers[0].JSONRPC().Eth().BlockNumber()
currentBlockNum, err := validatorSrv.JSONRPC().Eth().BlockNumber()
require.NoError(t, err)

// wait for 2 epochs to elapse, before we stop the node
require.NoError(t, cluster.WaitForBlock(currentBlockNum+2*epochSize, 2*time.Minute))

// stop one node
node := cluster.Servers[0]
node.Stop()
validatorSrv.Stop()

// check what is the current block on the running nodes
currentBlockNum, err = cluster.Servers[1].JSONRPC().Eth().BlockNumber()
require.NoError(t, err)

// wait for 2 epochs to elapse, so that rest of the network progresses
require.NoError(t, cluster.WaitForBlock(currentBlockNum+2*epochSize, 2*time.Minute))

// start the node again
node.Start()
validatorSrv.Start()

// wait 2 more epochs to elapse and make sure that stopped node managed to catch up
require.NoError(t, cluster.WaitForBlock(currentBlockNum+4*epochSize, 2*time.Minute))

// wait until the validator mines a block, to confirm that it is back in consensus
goran-ethernal marked this conversation as resolved.
Show resolved Hide resolved
require.NoError(t, cluster.WaitUntil(3*time.Minute, 2*time.Second, func() bool {
latestBlock, err := cluster.Servers[0].JSONRPC().Eth().GetBlockByNumber(ethgo.Latest, false)
require.NoError(t, err)

return bytes.Equal(validatorAcc.Address().Bytes(), latestBlock.Miner.Bytes())
}))
})

t.Run("sync protocol, drop single non-validator node", func(t *testing.T) {
Expand All @@ -93,7 +112,12 @@ func TestE2E_Consensus_Basic_WithNonValidators(t *testing.T) {
node.Start()

// wait 2 more epochs to elapse and make sure that stopped node managed to catch up
require.NoError(t, cluster.WaitForBlock(currentBlockNum+4*epochSize, 2*time.Minute))
blockToWait := currentBlockNum + 4*epochSize
require.NoError(t, cluster.WaitForBlock(blockToWait, 2*time.Minute))

latestBlockOnDroppedNode, err := node.JSONRPC().Eth().GetBlockByNumber(ethgo.Latest, false)
require.NoError(t, err)
require.GreaterOrEqual(t, latestBlockOnDroppedNode.Number, blockToWait)
})
}

Expand Down
5 changes: 5 additions & 0 deletions syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,11 @@ func (s *syncer) bulkSyncWithPeer(peerID peer.ID, peerLatestBlock uint64,
}
}

// IsSyncing reports whether GetSyncProgression currently yields a non-nil
// progression, which this syncer treats as the sign that it is syncing
// with a peer.
func (s *syncer) IsSyncing() bool {
	if progression := s.GetSyncProgression(); progression != nil {
		return true
	}

	return false
}

func updateMetrics(fullBlock *types.FullBlock) {
metrics.SetGauge([]string{syncerMetrics, "tx_num"}, float32(len(fullBlock.Block.Transactions)))
metrics.SetGauge([]string{syncerMetrics, "receipts_num"}, float32(len(fullBlock.Receipts)))
Expand Down
2 changes: 2 additions & 0 deletions syncer/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ type Syncer interface {
HasSyncPeer() bool
// Sync starts routine to sync blocks
Sync(func(*types.FullBlock) bool) error
// IsSyncing indicates if syncer is syncing with the best peer
IsSyncing() bool
}

type Progression interface {
Expand Down
Loading