Skip to content

Commit

Permalink
Cherry-pick 0xPolygonHermez#3659,0xPolygonHermez#3662: Remove sync wi…
Browse files Browse the repository at this point in the history
…th virtual state (synchronizer). Add L1 block confirmations (0xPolygonHermez#3666)

* Remove sync with virtual state (synchronizer). Add L1 block confirmat… (0xPolygonHermez#3659)

* Remove sync with virtual state (synchronizer). Add L1 block confirmations to consider sequence final

* fix get monitored tx receipt

* update doc

* Skip wait L1 block confirmations after restart (0xPolygonHermez#3662)

* skip wait L1 block confirmations after restart

* skip checking last batch sequenced in SC after restart

* set default value of SequenceL1BlockConfirmations to 32

* set default value of SequenceL1BlockConfirmations to 2 for debug/test

* fix config_test

* fix doc
  • Loading branch information
agnusmor authored and Stefan-Ethernal committed Jun 26, 2024
1 parent 6ca582d commit 948ea01
Show file tree
Hide file tree
Showing 8 changed files with 96 additions and 31 deletions.
4 changes: 4 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,10 @@ func Test_Defaults(t *testing.T) {
path: "SequenceSender.MaxBatchesForL1",
expectedValue: uint64(300),
},
{
path: "SequenceSender.SequenceL1BlockConfirmations",
expectedValue: uint64(32),
},
{
path: "Etherman.URL",
expectedValue: "http://localhost:8545",
Expand Down
1 change: 1 addition & 0 deletions config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ WaitPeriodSendSequence = "5s"
LastBatchVirtualizationTimeMaxWaitPeriod = "5s"
L1BlockTimestampMargin = "30s"
MaxTxSizeForL1 = 131072
SequenceL1BlockConfirmations = 32
L2Coinbase = "0xf39fd6e51aad88f6f4ce6ab8827279cfffb92266"
PrivateKey = {Path = "/pk/sequencer.keystore", Password = "testonly"}
GasOffset = 80000
Expand Down
1 change: 1 addition & 0 deletions config/environments/local/local.node.config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ WaitPeriodSendSequence = "5s"
LastBatchVirtualizationTimeMaxWaitPeriod = "5s"
L1BlockTimestampMargin = "30s"
MaxTxSizeForL1 = 131072
SequenceL1BlockConfirmations = 32
L2Coinbase = "0xf39fd6e51aad88f6f4ce6ab8827279cfffb92266"
PrivateKey = {Path = "/pk/sequencer.keystore", Password = "testonly"}

Expand Down
2 changes: 2 additions & 0 deletions sequencesender/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,6 @@ type Config struct {
GasOffset uint64 `mapstructure:"GasOffset"`
// MaxBatchesForL1 is the maximum amount of batches to be sequenced in a single L1 tx
MaxBatchesForL1 uint64 `mapstructure:"MaxBatchesForL1"`
// SequenceL1BlockConfirmations is number of blocks to consider a sequence sent to L1 as final
SequenceL1BlockConfirmations uint64 `mapstructure:"SequenceL1BlockConfirmations"`
}
105 changes: 80 additions & 25 deletions sequencesender/sequencesender.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/0xPolygonHermez/zkevm-node/event"
"github.com/0xPolygonHermez/zkevm-node/log"
"github.com/0xPolygonHermez/zkevm-node/state"
"github.com/ethereum/go-ethereum/common"
"github.com/jackc/pgx/v4"
)

Expand All @@ -19,6 +20,7 @@ const (
monitoredIDFormat = "sequence-from-%v-to-%v"
retriesSanityCheck = 8
waitRetrySanityCheck = 15 * time.Second
waitRetryGetL1Block = 2 * time.Second
)

var (
Expand All @@ -34,12 +36,14 @@ var (

// SequenceSender represents a sequence sender
type SequenceSender struct {
cfg Config
state stateInterface
ethTxManager ethTxManager
etherman etherman
eventLog *event.EventLog
da dataAbilitier
cfg Config
da dataAbilitier
state stateInterface
ethTxManager ethTxManager
etherman etherman
eventLog *event.EventLog
lastSequenceInitialBatch uint64
lastSequenceEndBatch uint64
}

// New inits sequence sender
Expand All @@ -63,7 +67,7 @@ func (s *SequenceSender) Start(ctx context.Context) {

// marginTimeElapsed checks if the time between currentTime and l2BlockTimestamp is greater than timeMargin.
// If it's greater returns true, otherwise it returns false and the waitTime needed to achieve this timeMargin
func (s *SequenceSender) marginTimeElapsed(ctx context.Context, l2BlockTimestamp uint64, currentTime uint64, timeMargin int64) (bool, int64) {
func (s *SequenceSender) marginTimeElapsed(l2BlockTimestamp uint64, currentTime uint64, timeMargin int64) (bool, int64) {
// Check the time difference between L2 block and currentTime
var timeDiff int64
if l2BlockTimestamp >= currentTime {
Expand Down Expand Up @@ -91,7 +95,58 @@ func (s *SequenceSender) tryToSendSequence(ctx context.Context) {
retry := false
// process monitored sequences before starting a next cycle
s.ethTxManager.ProcessPendingMonitoredTxs(ctx, ethTxManagerOwner, func(result ethtxmanager.MonitoredTxResult, dbTx pgx.Tx) {
if result.Status == ethtxmanager.MonitoredTxStatusFailed {
if result.Status == ethtxmanager.MonitoredTxStatusConfirmed {
if len(result.Txs) > 0 {
var txL1BlockNumber uint64
var txHash common.Hash
receiptFound := false
for _, tx := range result.Txs {
if tx.Receipt != nil {
txL1BlockNumber = tx.Receipt.BlockNumber.Uint64()
txHash = tx.Tx.Hash()
receiptFound = true
break
}
}

if !receiptFound {
s.halt(ctx, fmt.Errorf("monitored tx %s for sequence [%d-%d] is confirmed but doesn't have a receipt", result.ID, s.lastSequenceInitialBatch, s.lastSequenceEndBatch))
}

// wait L1 confirmation blocks
log.Infof("waiting %d L1 block confirmations for sequence [%d-%d], L1 block: %d, tx: %s",
s.cfg.SequenceL1BlockConfirmations, s.lastSequenceInitialBatch, s.lastSequenceEndBatch, txL1BlockNumber, txHash)
for {
lastL1BlockHeader, err := s.etherman.GetLatestBlockHeader(ctx)
if err != nil {
log.Errorf("failed to get last L1 block number, err: %v", err)
} else {
lastL1BlockNumber := lastL1BlockHeader.Number.Uint64()

if lastL1BlockNumber >= txL1BlockNumber+s.cfg.SequenceL1BlockConfirmations {
log.Infof("continuing, last L1 block: %d", lastL1BlockNumber)
break
}
}
time.Sleep(waitRetryGetL1Block)
}

lastSCBatchNum, err := s.etherman.GetLatestBatchNumber()
if err != nil {
log.Warnf("failed to get from the SC last sequenced batch number, err: %v", err)
return
}

// If it's the first time we call that function after the restart of the sequence-sender (lastSequenceBatch is 0) and we are having the
// confirmation of a pending L1 tx sent before the sequence-sender was restarted, we don't know which batch was the last sequenced.
// Therefore we cannot compare the last sequenced batch in the SC with the last sequenced from sequence-sender. We skip this check
if s.lastSequenceEndBatch != 0 && (lastSCBatchNum != s.lastSequenceEndBatch) {
s.halt(ctx, fmt.Errorf("last sequenced batch from SC %d doesn't match last sequenced batch sent %d", lastSCBatchNum, s.lastSequenceEndBatch))
}
} else {
s.halt(ctx, fmt.Errorf("monitored tx %s for sequence [%d-%d] doesn't have transactions to be checked", result.ID, s.lastSequenceInitialBatch, s.lastSequenceEndBatch))
}
} else { // Monitored tx is failed
retry = true
mTxResultLogger := ethtxmanager.CreateMonitoredTxResultLogger(ethTxManagerOwner, result)
mTxResultLogger.Error("failed to send sequence, TODO: review this fatal and define what to do in this case")
Expand All @@ -102,13 +157,12 @@ func (s *SequenceSender) tryToSendSequence(ctx context.Context) {
return
}

// Check if synchronizer is up to date
synced, err := s.isSynced(ctx, retriesSanityCheck, waitRetrySanityCheck)
sanityCheckOk, err := s.sanityCheck(ctx, retriesSanityCheck, waitRetrySanityCheck)
if err != nil {
s.halt(ctx, err)
}
if !synced {
log.Info("wait virtual state to be synced...")
if !sanityCheckOk {
log.Info("sanity check failed, retrying...")
time.Sleep(5 * time.Second) // nolint:gomnd
return
}
Expand All @@ -126,7 +180,7 @@ func (s *SequenceSender) tryToSendSequence(ctx context.Context) {
return
}

lastVirtualBatchNum, err := s.state.GetLastVirtualBatchNum(ctx, nil)
lastVirtualBatchNum, err := s.etherman.GetLatestBatchNumber()
if err != nil {
log.Errorf("failed to get last virtual batch num, err: %v", err)
return
Expand All @@ -153,7 +207,7 @@ func (s *SequenceSender) tryToSendSequence(ctx context.Context) {
return
}

elapsed, waitTime := s.marginTimeElapsed(ctx, lastL2BlockTimestamp, lastL1BlockHeader.Time, timeMargin)
elapsed, waitTime := s.marginTimeElapsed(lastL2BlockTimestamp, lastL1BlockHeader.Time, timeMargin)

if !elapsed {
log.Infof("waiting at least %d seconds to send sequences, time difference between last L1 block %d (ts: %d) and last L2 block %d (ts: %d) in the sequence is lower than %d seconds",
Expand All @@ -170,7 +224,7 @@ func (s *SequenceSender) tryToSendSequence(ctx context.Context) {
for {
currentTime := uint64(time.Now().Unix())

elapsed, waitTime := s.marginTimeElapsed(ctx, lastL2BlockTimestamp, currentTime, timeMargin)
elapsed, waitTime := s.marginTimeElapsed(lastL2BlockTimestamp, currentTime, timeMargin)

// Wait if the time difference is less than timeMargin (L1BlockTimestampMargin)
if !elapsed {
Expand Down Expand Up @@ -205,13 +259,16 @@ func (s *SequenceSender) tryToSendSequence(ctx context.Context) {
mTxLogger.Errorf("error to add sequences tx to eth tx manager: ", err)
return
}

s.lastSequenceInitialBatch = sequences[0].BatchNumber
s.lastSequenceEndBatch = lastSequence.BatchNumber
}

// getSequencesToSend generates an array of sequences to be send to L1.
// If the array is empty, it doesn't necessarily mean that there are no sequences to be sent,
// it could be that it's not worth it to do so yet.
func (s *SequenceSender) getSequencesToSend(ctx context.Context) ([]types.Sequence, error) {
lastVirtualBatchNum, err := s.state.GetLastVirtualBatchNum(ctx, nil)
lastVirtualBatchNum, err := s.etherman.GetLatestBatchNumber()
if err != nil {
return nil, fmt.Errorf("failed to get last virtual batch num, err: %v", err)
}
Expand Down Expand Up @@ -326,7 +383,7 @@ func (s *SequenceSender) getSequencesToSend(ctx context.Context) ([]types.Sequen
return nil, nil
}

func (s *SequenceSender) isSynced(ctx context.Context, retries int, waitRetry time.Duration) (bool, error) {
func (s *SequenceSender) sanityCheck(ctx context.Context, retries int, waitRetry time.Duration) (bool, error) {
lastVirtualBatchNum, err := s.state.GetLastVirtualBatchNum(ctx, nil)
if err != nil && err != state.ErrNotFound {
log.Warnf("failed to get last virtual batch number, err: %v", err)
Expand All @@ -345,10 +402,8 @@ func (s *SequenceSender) isSynced(ctx context.Context, retries int, waitRetry ti
return false, nil
}

if lastVirtualBatchNum < lastSCBatchNum {
log.Infof("waiting for the state to be synced, last virtual batch: %d, last SC sequenced batch: %d", lastVirtualBatchNum, lastSCBatchNum)
return false, nil
} else if lastVirtualBatchNum > lastSCBatchNum { // Sanity check: virtual batch number cannot be greater than last batch sequenced in the SC
// Sanity check: virtual batch number cannot be greater than last batch sequenced in the SC
if lastVirtualBatchNum > lastSCBatchNum {
// we will retry some times to check that really the last sequenced batch in the SC is lower that the las virtual batch
log.Warnf("last virtual batch %d is greater than last SC sequenced batch %d, retrying...", lastVirtualBatchNum, lastSCBatchNum)
for i := 0; i < retries; i++ {
Expand All @@ -368,13 +423,13 @@ func (s *SequenceSender) isSynced(ctx context.Context, retries int, waitRetry ti
log.Infof("last virtual batch %d is equal to last SC sequenced batch %d, continuing...", lastVirtualBatchNum, lastSCBatchNum)
}

// At this point lastVirtualBatchNum = lastEthBatchNum. Check trusted batches
if lastTrustedBatchClosed.BatchNumber >= lastVirtualBatchNum {
return true, nil
} else { // Sanity check: virtual batch number cannot be greater than last trusted batch closed
// Sanity check: virtual batch number cannot be greater than last trusted batch closed
if lastTrustedBatchClosed.BatchNumber < lastVirtualBatchNum {
log.Errorf("last virtual batch %d is greater than last trusted batch closed %d", lastVirtualBatchNum, lastTrustedBatchClosed.BatchNumber)
return false, ErrSyncVirtualGreaterTrusted
}

return true, nil
}

// halt halts the SequenceSender
Expand Down
12 changes: 6 additions & 6 deletions sequencesender/sequencesender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,19 @@ func TestIsSynced(t *testing.T) {

testCases := []IsSyncedTestCase{
{
name: "is synced",
name: "sanity check ok",
lastVirtualBatchNum: 10,
lastTrustedBatchClosed: 12,
lastSCBatchNum: []uint64{10},
expectedResult: true,
err: nil,
},
{
name: "not synced",
name: "sanity check ok",
lastVirtualBatchNum: 9,
lastTrustedBatchClosed: 12,
lastSCBatchNum: []uint64{10},
expectedResult: false,
expectedResult: true,
err: nil,
},
{
Expand All @@ -67,15 +67,15 @@ func TestIsSynced(t *testing.T) {
err: ErrSyncVirtualGreaterSequenced,
},
{
name: "is synced, sc sequenced retries",
name: "sanity check ok: sc sequenced retries",
lastVirtualBatchNum: 11,
lastTrustedBatchClosed: 12,
lastSCBatchNum: []uint64{10, 10, 11},
expectedResult: true,
err: nil,
},
{
name: "is synced, sc sequenced retries (last)",
name: "sanity check ok: sc sequenced retries (last)",
lastVirtualBatchNum: 11,
lastTrustedBatchClosed: 12,
lastSCBatchNum: []uint64{10, 10, 10, 11},
Expand Down Expand Up @@ -134,7 +134,7 @@ func TestIsSynced(t *testing.T) {
}
}

synced, err := ssender.isSynced(context.Background(), retries, waitRetry)
synced, err := ssender.sanityCheck(context.Background(), retries, waitRetry)

assert.EqualValues(t, tc.expectedResult, synced)
assert.EqualValues(t, tc.err, err)
Expand Down
1 change: 1 addition & 0 deletions test/config/debug.node.config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ WaitPeriodSendSequence = "15s"
LastBatchVirtualizationTimeMaxWaitPeriod = "10s"
L1BlockTimestampMargin = "5s"
MaxTxSizeForL1 = 131072
SequenceL1BlockConfirmations = 2
L2Coinbase = "0xf39fd6e51aad88f6f4ce6ab8827279cfffb92266"
PrivateKey = {Path = "./test/sequencer.keystore", Password = "testonly"}

Expand Down
1 change: 1 addition & 0 deletions test/config/test.node.config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ WaitPeriodSendSequence = "15s"
LastBatchVirtualizationTimeMaxWaitPeriod = "10s"
L1BlockTimestampMargin = "5s"
MaxTxSizeForL1 = 131072
SequenceL1BlockConfirmations = 2
L2Coinbase = "0xf39fd6e51aad88f6f4ce6ab8827279cfffb92266"
PrivateKey = {Path = "/pk/sequencer.keystore", Password = "testonly"}
[SequenceSender.StreamClient]
Expand Down

0 comments on commit 948ea01

Please sign in to comment.