Skip to content

Commit

Permalink
Closing Signals Manager first implementation (#1550)
Browse files Browse the repository at this point in the history
* Closing Signals Manager first implementation

* Closing Signals Manager first implementation

* Closing Signals Manager first implementation

* refactor
  • Loading branch information
ToniRamirezM authored Jan 17, 2023
1 parent 174557b commit a99fb62
Show file tree
Hide file tree
Showing 8 changed files with 200 additions and 85 deletions.
2 changes: 1 addition & 1 deletion db/migrations/state/0003.sql
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ ALTER TABLE state.batch
DROP COLUMN IF EXISTS forced_batch_num;

ALTER TABLE state.forced_batch
ADD COLUMN batch_num BIGINT;
ADD COLUMN batch_num BIGINT;
85 changes: 44 additions & 41 deletions sequencer/closingsignalsmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,61 +5,64 @@ import (
"time"

"github.com/0xPolygonHermez/zkevm-node/log"
"github.com/ethereum/go-ethereum/common"
)

// TBD. Considerations:
// - Should wait for a block to be finalized: https://www.alchemy.com/overviews/ethereum-commitment-levels https://ethereum.github.io/beacon-APIs/#/Beacon/getStateFinalityCheckpoints

type closingSignalsManager struct {
finalizer *finalizer
timestamp time.Time
ctx context.Context
dbManager dbManagerInterface
closingSignalCh ClosingSignalCh
cfg FinalizerCfg
}

func newClosingSignalsManager(finalizer *finalizer) *closingSignalsManager {
return &closingSignalsManager{finalizer: finalizer}
func newClosingSignalsManager(ctx context.Context, dbManager dbManagerInterface, closingSignalCh ClosingSignalCh, cfg FinalizerCfg) *closingSignalsManager {
return &closingSignalsManager{ctx: ctx, dbManager: dbManager, closingSignalCh: closingSignalCh, cfg: cfg}
}

func (c *closingSignalsManager) Start() {

for {

// Check L2 Reorg
// ==============
// Whats the condition to detect a L2 Reorg?

// Check GER Update
// Get latest GER from stateDB
// If latest GER != previousGER -> send new Ger using channel

// Check Forced Batches

// Read new forces batches from stateDB
// Send them using channel
// Mark them as sended

// Check Sending to L1 Timeout
// How do we know when we have sent to L1 to reset the counter and don't do a timeout?

}
go c.checkForcedBatches()
go c.checkGERUpdate()
}

func (c *closingSignalsManager) checkGERUpdate() {
}
var lastGERSent common.Hash

func (c *closingSignalsManager) checkForcedBatches(ctx context.Context) {
backupTimestamp := time.Now()
forcedBatches, err := c.finalizer.dbManager.GetForcedBatchesSince(ctx, c.timestamp, nil)
if err != nil {
log.Errorf("error checking forced batches: %v", err)
return
for {
time.Sleep(c.cfg.ClosingSignalsManagerWaitForL1OperationsInSec.Duration * time.Second)

lastL2BlockHeader, err := c.dbManager.GetLastL2BlockHeader(c.ctx, nil)
if err != nil {
log.Errorf("error getting last L2 block: %v", err)
continue
}

ger, _, err := c.dbManager.GetLatestGer(c.ctx, lastL2BlockHeader.Number.Uint64(), c.cfg.GERFinalityNumberOfBlocks)
if err != nil {
log.Errorf("error checking GER update: %v", err)
continue
}

if ger.GlobalExitRoot != lastGERSent {
c.closingSignalCh.GERCh <- ger.GlobalExitRoot
lastGERSent = ger.GlobalExitRoot
}
}
}

for _, forcedBatch := range forcedBatches {
c.finalizer.closingSignalCh.ForcedBatchCh <- *forcedBatch
}
func (c *closingSignalsManager) checkForcedBatches() {
for {
time.Sleep(c.cfg.ClosingSignalsManagerWaitForL1OperationsInSec.Duration * time.Second)

c.timestamp = backupTimestamp
}
latestSentForcedBatchNumber, err := c.dbManager.GetLastTrustedForcedBatchNumber(c.ctx, nil)

func (c *closingSignalsManager) checkSendToL1Timeout() {
forcedBatches, err := c.dbManager.GetForcedBatchesSince(c.ctx, latestSentForcedBatchNumber, nil)
if err != nil {
log.Errorf("error checking forced batches: %v", err)
continue
}

for _, forcedBatch := range forcedBatches {
c.closingSignalCh.ForcedBatchCh <- *forcedBatch
}
}
}
21 changes: 18 additions & 3 deletions sequencer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,26 @@ type Config struct {

// FinalizerCfg contains the finalizer's configuration properties
type FinalizerCfg struct {
GERDeadlineTimeoutInSec types.Duration `mapstructure:"GERDeadlineTimeoutInSec"`
// GERDeadlineTimeoutInSec is the time the finalizer waits after receiving closing signal to update Global Exit Root
GERDeadlineTimeoutInSec types.Duration `mapstructure:"GERDeadlineTimeoutInSec"`

// SendingToL1DeadlineTimeoutInSec is the time the finalizer waits after receiving closing signal to process Forced Batches
ForcedBatchDeadlineTimeoutInSec types.Duration `mapstructure:"ForcedBatchDeadlineTimeoutInSec"`

// SendingToL1DeadlineTimeoutInSec is the time the finalizer waits after receiving closing signal to sends a batch to L1
SendingToL1DeadlineTimeoutInSec types.Duration `mapstructure:"SendingToL1DeadlineTimeoutInSec"`
SleepDurationInMs types.Duration `mapstructure:"SleepDurationInMs"`
ResourcePercentageToCloseBatch uint32 `mapstructure:"ResourcePercentageToCloseBatch"`

// SleepDurationInMs is the time the finalizer sleeps between each iteration, if there are no transactions to be processed
SleepDurationInMs types.Duration `mapstructure:"SleepDurationInMs"`

// ResourcePercentageToCloseBatch is the percentage window of the resource left out for the batch to be closed
ResourcePercentageToCloseBatch uint32 `mapstructure:"ResourcePercentageToCloseBatch"`

// GERFinalityNumberOfBlocks is number of blocks to consider GER final
GERFinalityNumberOfBlocks uint64 `mapstructure:"GERFinalityNumberOfBlocks"`

// ClosingSignalsManagerWaitForL1OperationsInSec is used by the closing signals manager to wait for its operation
ClosingSignalsManagerWaitForL1OperationsInSec types.Duration `mapstructure:"ClosingSignalsManagerWaitForL1OperationsInSec"`
}

// MaxSequenceSize is a wrapper type that parses token amount to big int
Expand Down
26 changes: 20 additions & 6 deletions sequencer/dbmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/0xPolygonHermez/zkevm-node/pool/pgpoolstorage"
"github.com/0xPolygonHermez/zkevm-node/state"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/jackc/pgx/v4"
)

Expand Down Expand Up @@ -200,7 +201,7 @@ func (d *dbManager) storeProcessedTxAndDeleteFromPool() {
}

// Check if the Tx is still valid in the state to detect reorgs
latestL2Block, err := d.state.GetLastL2Block(d.ctx, dbTx)
latestL2BlockHeader, err := d.state.GetLastL2BlockHeader(d.ctx, dbTx)
if err != nil {
err = dbTx.Rollback(d.ctx)
if err != nil {
Expand All @@ -209,8 +210,8 @@ func (d *dbManager) storeProcessedTxAndDeleteFromPool() {
d.txsStore.Wg.Done()
continue
}
if latestL2Block.Root() != txToStore.previousL2BlockStateRoot {
log.Info("L2 reorg detected. Old state root: %v New state root: %v", latestL2Block.Root(), txToStore.previousL2BlockStateRoot)
if latestL2BlockHeader.Root != txToStore.previousL2BlockStateRoot {
log.Info("L2 reorg detected. Old state root: %v New state root: %v", latestL2BlockHeader.Root, txToStore.previousL2BlockStateRoot)
d.l2ReorgCh <- L2ReorgEvent{}
d.txsStore.Wg.Done()
continue
Expand Down Expand Up @@ -379,7 +380,11 @@ func (d *dbManager) GetLastNBatches(ctx context.Context, numBatches uint) ([]*st
}

// GetLatestGer gets the latest global exit root
func (d *dbManager) GetLatestGer(ctx context.Context, maxBlockNumber uint64) (state.GlobalExitRoot, time.Time, error) {
func (d *dbManager) GetLatestGer(ctx context.Context, blockNumber uint64, gerFinalityNumberOfBlocks uint64) (state.GlobalExitRoot, time.Time, error) {
maxBlockNumber := uint64(0)
if gerFinalityNumberOfBlocks <= blockNumber {
maxBlockNumber = blockNumber - gerFinalityNumberOfBlocks
}
ger, receivedAt, err := d.state.GetLatestGlobalExitRoot(ctx, maxBlockNumber, nil)
if err != nil && errors.Is(err, state.ErrNotFound) {
return state.GlobalExitRoot{}, time.Time{}, nil
Expand Down Expand Up @@ -524,6 +529,15 @@ func (d *dbManager) ProcessForcedBatch(forcedBatchNum uint64, request state.Proc
}

// GetForcedBatchesSince gets L1 forced batches since timestamp
func (d *dbManager) GetForcedBatchesSince(ctx context.Context, since time.Time, dbTx pgx.Tx) ([]*state.ForcedBatch, error) {
return d.state.GetForcedBatchesSince(ctx, since, dbTx)
func (d *dbManager) GetForcedBatchesSince(ctx context.Context, forcedBatchNumber uint64, dbTx pgx.Tx) ([]*state.ForcedBatch, error) {
return d.state.GetForcedBatchesSince(ctx, forcedBatchNumber, dbTx)
}

// GetLastL2BlockHeader gets the last l2 block number
func (d *dbManager) GetLastL2BlockHeader(ctx context.Context, dbTx pgx.Tx) (*types.Header, error) {
return d.state.GetLastL2BlockHeader(ctx, dbTx)
}

func (d *dbManager) GetLastTrustedForcedBatchNumber(ctx context.Context, dbTx pgx.Tx) (uint64, error) {
return d.state.GetLastTrustedForcedBatchNumber(ctx, dbTx)
}
13 changes: 8 additions & 5 deletions sequencer/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ type stateInterface interface {
GetLastL2BlockHeader(ctx context.Context, dbTx pgx.Tx) (*types.Header, error)
UpdateBatchL2Data(ctx context.Context, batchNumber uint64, batchL2Data []byte, dbTx pgx.Tx) error
ProcessSequencerBatch(ctx context.Context, batchNumber uint64, batchL2Data []byte, caller state.CallerLabel, dbTx pgx.Tx) (*state.ProcessBatchResponse, error)
GetForcedBatchesSince(ctx context.Context, since time.Time, dbTx pgx.Tx) ([]*state.ForcedBatch, error)
GetForcedBatchesSince(ctx context.Context, forcedBatchNumber uint64, dbTx pgx.Tx) ([]*state.ForcedBatch, error)
GetLastTrustedForcedBatchNumber(ctx context.Context, dbTx pgx.Tx) (uint64, error)
}

type workerInterface interface {
Expand Down Expand Up @@ -98,9 +99,11 @@ type dbManagerInterface interface {
GetLastClosedBatch(ctx context.Context) (*state.Batch, error)
IsBatchClosed(ctx context.Context, batchNum uint64) (bool, error)
MarkReorgedTxsAsPending(ctx context.Context)
GetLatestGer(ctx context.Context, waitBlocksToConsiderGERFinal uint64) (state.GlobalExitRoot, time.Time, error)
GetLatestGer(ctx context.Context, blockNumber uint64, gerFinalityNumberOfBlocks uint64) (state.GlobalExitRoot, time.Time, error)
ProcessForcedBatch(forcedBatchNum uint64, request state.ProcessRequest) (*state.ProcessBatchResponse, error)
GetForcedBatchesSince(ctx context.Context, since time.Time, dbTx pgx.Tx) ([]*state.ForcedBatch, error)
GetForcedBatchesSince(ctx context.Context, forcedBatchNumber uint64, dbTx pgx.Tx) ([]*state.ForcedBatch, error)
GetLastL2BlockHeader(ctx context.Context, dbTx pgx.Tx) (*types.Header, error)
GetLastTrustedForcedBatchNumber(ctx context.Context, dbTx pgx.Tx) (uint64, error)
}

type dbManagerStateInterface interface {
Expand All @@ -114,13 +117,13 @@ type dbManagerStateInterface interface {
GetLastClosedBatch(ctx context.Context, dbTx pgx.Tx) (*state.Batch, error)
GetLastBatchNumber(ctx context.Context, dbTx pgx.Tx) (uint64, error)
GetLastBatch(ctx context.Context, dbTx pgx.Tx) (*state.Batch, error)
GetLastL2Block(ctx context.Context, dbTx pgx.Tx) (*types.Block, error)
GetLatestGlobalExitRoot(ctx context.Context, maxBlockNumber uint64, dbTx pgx.Tx) (state.GlobalExitRoot, time.Time, error)
GetLastL2BlockHeader(ctx context.Context, dbTx pgx.Tx) (*types.Header, error)
ExecuteBatch(ctx context.Context, batchNumber uint64, batchL2Data []byte, dbTx pgx.Tx) (*pb.ProcessBatchResponse, error)
GetBatchByNumber(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) (*state.Batch, error)
UpdateBatchL2Data(ctx context.Context, batchNumber uint64, batchL2Data []byte, dbTx pgx.Tx) error
GetForcedBatch(ctx context.Context, forcedBatchNumber uint64, dbTx pgx.Tx) (*state.ForcedBatch, error)
ProcessSequencerBatch(ctx context.Context, batchNumber uint64, batchL2Data []byte, caller state.CallerLabel, dbTx pgx.Tx) (*state.ProcessBatchResponse, error)
GetForcedBatchesSince(ctx context.Context, since time.Time, dbTx pgx.Tx) ([]*state.ForcedBatch, error)
GetForcedBatchesSince(ctx context.Context, forcedBatchNumber uint64, dbTx pgx.Tx) ([]*state.ForcedBatch, error)
GetLastTrustedForcedBatchNumber(ctx context.Context, dbTx pgx.Tx) (uint64, error)
}
78 changes: 62 additions & 16 deletions sequencer/mock_db_manager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit a99fb62

Please sign in to comment.