Skip to content

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
goran-ethernal committed Oct 30, 2023
1 parent 96a5d55 commit e0df051
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 6 deletions.
6 changes: 6 additions & 0 deletions consensus/polybft/mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,12 @@ func (tp *syncerMock) Sync(func(*types.FullBlock) bool) error {
return args.Error(0)
}

func (tp *syncerMock) IsSyncingWithPeer() bool {
args := tp.Called()

return args.Bool(0)
}

func init() {
// setup custom hash header func
setupHeaderHashFunc()
Expand Down
14 changes: 13 additions & 1 deletion consensus/polybft/polybft.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,12 +599,24 @@ func (p *Polybft) startConsensusProtocol() {
if ev.Source == "syncer" && ev.NewChain[0].Number >= p.blockchain.CurrentHeader().Number {
p.logger.Info("sync block notification received", "block height", ev.NewChain[0].Number,
"current height", p.blockchain.CurrentHeader().Number)
syncerBlockCh <- struct{}{}

select {
case syncerBlockCh <- struct{}{}:
default:
}
}
}
}
}()

// wait until he stops syncing
p.logger.Info("waiting to stop syncing so that we can try to join consensus if node is a validator")

for p.syncer.IsSyncingWithPeer() {
}

p.logger.Info("node synced up on start. Trying to join consensus if validator")

var (
sequenceCh <-chan struct{}
stopSequence func()
Expand Down
34 changes: 29 additions & 5 deletions e2e-polybft/e2e/consensus_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package e2e

import (
"bytes"
"fmt"
"math/big"
"path"
Expand Down Expand Up @@ -53,22 +54,40 @@ func TestE2E_Consensus_Basic_WithNonValidators(t *testing.T) {
})

t.Run("sync protocol, drop single validator node", func(t *testing.T) {
validatorSrv := cluster.Servers[0]
validatorAcc, err := sidechain.GetAccountFromDir(validatorSrv.DataDir())
require.NoError(t, err)

// query the current block number, as it is a starting point for the test
currentBlockNum, err := cluster.Servers[0].JSONRPC().Eth().BlockNumber()
currentBlockNum, err := validatorSrv.JSONRPC().Eth().BlockNumber()
require.NoError(t, err)

// wait for 2 epochs to elapse, before we stop the node
require.NoError(t, cluster.WaitForBlock(currentBlockNum+2*epochSize, 2*time.Minute))

// stop one node
node := cluster.Servers[0]
node.Stop()
validatorSrv.Stop()

// check what is the current block on the running nodes
currentBlockNum, err = cluster.Servers[1].JSONRPC().Eth().BlockNumber()
require.NoError(t, err)

// wait for 2 epochs to elapse, so that rest of the network progresses
require.NoError(t, cluster.WaitForBlock(currentBlockNum+2*epochSize, 2*time.Minute))

// start the node again
node.Start()
validatorSrv.Start()

// wait 2 more epochs to elapse and make sure that stopped node managed to catch up
require.NoError(t, cluster.WaitForBlock(currentBlockNum+4*epochSize, 2*time.Minute))

// wait until the validator mines one block to check if he is back in consensus
require.NoError(t, cluster.WaitUntil(3*time.Minute, 2*time.Second, func() bool {
latestBlock, err := cluster.Servers[0].JSONRPC().Eth().GetBlockByNumber(ethgo.Latest, false)
require.NoError(t, err)

return bytes.Equal(validatorAcc.Address().Bytes(), latestBlock.Miner.Bytes())
}))
})

t.Run("sync protocol, drop single non-validator node", func(t *testing.T) {
Expand All @@ -87,7 +106,12 @@ func TestE2E_Consensus_Basic_WithNonValidators(t *testing.T) {
node.Start()

// wait 2 more epochs to elapse and make sure that stopped node managed to catch up
require.NoError(t, cluster.WaitForBlock(currentBlockNum+4*epochSize, 2*time.Minute))
blockToWait := currentBlockNum + 4*epochSize
require.NoError(t, cluster.WaitForBlock(blockToWait, 2*time.Minute))

latestBlockOnDroppedNode, err := node.JSONRPC().Eth().GetBlockByNumber(ethgo.Latest, false)
require.NoError(t, err)
require.GreaterOrEqual(t, latestBlockOnDroppedNode.Number, blockToWait)
})
}

Expand Down
16 changes: 16 additions & 0 deletions syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package syncer
import (
"errors"
"fmt"
"sync/atomic"
"time"

"github.com/0xPolygon/polygon-edge/helper/progress"
Expand Down Expand Up @@ -38,6 +39,8 @@ type syncer struct {

// Channel to notify Sync that a new status arrived
newStatusCh chan struct{}

isSyncing atomic.Bool
}

func NewSyncer(
Expand Down Expand Up @@ -219,6 +222,9 @@ func (s *syncer) bulkSyncWithPeer(peerID peer.ID, peerLatestBlock uint64,
return 0, false, err
}

s.setIsSyncing(true)
defer s.setIsSyncing(false)

// Create a blockchain subscription for the sync progression and start tracking
subscription := s.blockchain.SubscribeEvents()
s.syncProgression.StartProgression(localLatest+1, subscription)
Expand Down Expand Up @@ -272,6 +278,16 @@ func (s *syncer) bulkSyncWithPeer(peerID peer.ID, peerLatestBlock uint64,
}
}

// setIsSyncing updates the isSyncing field
func (s *syncer) setIsSyncing(isSyncing bool) {
s.isSyncing.Store(isSyncing)
}

// IsSyncingWithPeer indicates if node is syncing with peer
func (s *syncer) IsSyncingWithPeer() bool {
return s.isSyncing.Load()
}

func updateMetrics(fullBlock *types.FullBlock) {
metrics.SetGauge([]string{syncerMetrics, "tx_num"}, float32(len(fullBlock.Block.Transactions)))
metrics.SetGauge([]string{syncerMetrics, "receipts_num"}, float32(len(fullBlock.Receipts)))
Expand Down
2 changes: 2 additions & 0 deletions syncer/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ type Syncer interface {
HasSyncPeer() bool
// Sync starts routine to sync blocks
Sync(func(*types.FullBlock) bool) error

IsSyncingWithPeer() bool
}

type Progression interface {
Expand Down

0 comments on commit e0df051

Please sign in to comment.