diff --git a/consensus/polybft/mocks_test.go b/consensus/polybft/mocks_test.go index c2e8c6424b..3ca22f86ea 100644 --- a/consensus/polybft/mocks_test.go +++ b/consensus/polybft/mocks_test.go @@ -395,6 +395,12 @@ func (tp *syncerMock) Sync(func(*types.FullBlock) bool) error { return args.Error(0) } +func (tp *syncerMock) IsSyncingWithPeer() bool { + args := tp.Called() + + return args.Bool(0) +} + func init() { // setup custom hash header func setupHeaderHashFunc() diff --git a/consensus/polybft/polybft.go b/consensus/polybft/polybft.go index 9ebf2fae75..633a0ab0dc 100644 --- a/consensus/polybft/polybft.go +++ b/consensus/polybft/polybft.go @@ -599,12 +599,22 @@ func (p *Polybft) startConsensusProtocol() { if ev.Source == "syncer" && ev.NewChain[0].Number >= p.blockchain.CurrentHeader().Number { p.logger.Info("sync block notification received", "block height", ev.NewChain[0].Number, "current height", p.blockchain.CurrentHeader().Number) - syncerBlockCh <- struct{}{} + + select { + case syncerBlockCh <- struct{}{}: + default: + } } } } }() + // wait until he stops syncing + p.logger.Info("waiting to stop syncing so that we can try to join consensus if node is a validator") + for p.syncer.IsSyncingWithPeer() { + } + p.logger.Info("node synced up on start. Trying to join consensus if validator") + var ( sequenceCh <-chan struct{} stopSequence func() diff --git a/e2e-polybft/e2e/consensus_test.go b/e2e-polybft/e2e/consensus_test.go index b65d5d0928..a924a0236e 100644 --- a/e2e-polybft/e2e/consensus_test.go +++ b/e2e-polybft/e2e/consensus_test.go @@ -1,6 +1,7 @@ package e2e import ( + "bytes" "fmt" "math/big" "path" @@ -53,22 +54,40 @@ func TestE2E_Consensus_Basic_WithNonValidators(t *testing.T) { }) t.Run("sync protocol, drop single validator node", func(t *testing.T) { + validatorSrv := cluster.Servers[0] + validatorAcc, err := sidechain.GetAccountFromDir(validatorSrv.DataDir()) + require.NoError(t, err) + // query the current block number, as it is a starting point for the test - currentBlockNum, err := cluster.Servers[0].JSONRPC().Eth().BlockNumber() + currentBlockNum, err := validatorSrv.JSONRPC().Eth().BlockNumber() require.NoError(t, err) + // wait for 2 epochs to elapse, before we stop the node + require.NoError(t, cluster.WaitForBlock(currentBlockNum+2*epochSize, 2*time.Minute)) + // stop one node - node := cluster.Servers[0] - node.Stop() + validatorSrv.Stop() + + // check what is the current block on the running nodes + currentBlockNum, err = cluster.Servers[1].JSONRPC().Eth().BlockNumber() + require.NoError(t, err) // wait for 2 epochs to elapse, so that rest of the network progresses require.NoError(t, cluster.WaitForBlock(currentBlockNum+2*epochSize, 2*time.Minute)) // start the node again - node.Start() + validatorSrv.Start() // wait 2 more epochs to elapse and make sure that stopped node managed to catch up require.NoError(t, cluster.WaitForBlock(currentBlockNum+4*epochSize, 2*time.Minute)) + + // wait until the validator mines one block to check if he is back in consensus + cluster.WaitUntil(3*time.Minute, 2*time.Second, func() bool { + latestBlock, err := cluster.Servers[0].JSONRPC().Eth().GetBlockByNumber(ethgo.Latest, false) + require.NoError(t, err) + + return bytes.Equal(validatorAcc.Address().Bytes(), latestBlock.Miner.Bytes()) + }) }) t.Run("sync protocol, drop single non-validator node", func(t *testing.T) { @@ -87,7 +106,12 @@ func TestE2E_Consensus_Basic_WithNonValidators(t *testing.T) { node.Start() // wait 2 more epochs to elapse and make sure that stopped node managed to catch up - require.NoError(t, cluster.WaitForBlock(currentBlockNum+4*epochSize, 2*time.Minute)) + blockToWait := currentBlockNum + 4*epochSize + require.NoError(t, cluster.WaitForBlock(blockToWait, 2*time.Minute)) + + latestBlockOnDroppedNode, err := node.JSONRPC().Eth().GetBlockByNumber(ethgo.Latest, false) + require.NoError(t, err) + require.GreaterOrEqual(t, latestBlockOnDroppedNode.Number, blockToWait) }) } diff --git a/syncer/syncer.go b/syncer/syncer.go index 5c20b52624..c00773471f 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -3,6 +3,7 @@ package syncer import ( "errors" "fmt" + "sync/atomic" "time" "github.com/0xPolygon/polygon-edge/helper/progress" @@ -38,6 +39,8 @@ type syncer struct { // Channel to notify Sync that a new status arrived newStatusCh chan struct{} + + isSyncing atomic.Bool } func NewSyncer( @@ -219,6 +222,9 @@ func (s *syncer) bulkSyncWithPeer(peerID peer.ID, peerLatestBlock uint64, return 0, false, err } + s.setIsSyncing(true) + defer s.setIsSyncing(false) + // Create a blockchain subscription for the sync progression and start tracking subscription := s.blockchain.SubscribeEvents() s.syncProgression.StartProgression(localLatest+1, subscription) @@ -272,6 +278,16 @@ func (s *syncer) bulkSyncWithPeer(peerID peer.ID, peerLatestBlock uint64, } } +// setIsSyncing updates the isSyncing field +func (s *syncer) setIsSyncing(isSyncing bool) { + s.isSyncing.Store(isSyncing) +} + +// IsSyncingWithPeer indicates if node is syncing with peer +func (s *syncer) IsSyncingWithPeer() bool { + return s.isSyncing.Load() +} + func updateMetrics(fullBlock *types.FullBlock) { metrics.SetGauge([]string{syncerMetrics, "tx_num"}, float32(len(fullBlock.Block.Transactions))) metrics.SetGauge([]string{syncerMetrics, "receipts_num"}, float32(len(fullBlock.Receipts))) diff --git a/syncer/types.go b/syncer/types.go index 9e3f81f36e..cba42c4b94 100644 --- a/syncer/types.go +++ b/syncer/types.go @@ -70,6 +70,8 @@ type Syncer interface { HasSyncPeer() bool // Sync starts routine to sync blocks Sync(func(*types.FullBlock) bool) error + + IsSyncingWithPeer() bool } type Progression interface {