Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Compute wip batch during stream delta calculation #2861

Merged
merged 2 commits into from
Nov 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sequencer/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ type stateInterface interface {
GetStoredFlushID(ctx context.Context) (uint64, string, error)
GetForkIDByBatchNumber(batchNumber uint64) uint64
GetDSGenesisBlock(ctx context.Context, dbTx pgx.Tx) (*state.DSL2Block, error)
GetDSBatches(ctx context.Context, firstBatchNumber, lastBatchNumber uint64, dbTx pgx.Tx) ([]*state.DSBatch, error)
GetDSBatches(ctx context.Context, firstBatchNumber, lastBatchNumber uint64, readWIPBatch bool, dbTx pgx.Tx) ([]*state.DSBatch, error)
GetDSL2Blocks(ctx context.Context, firstBatchNumber, lastBatchNumber uint64, dbTx pgx.Tx) ([]*state.DSL2Block, error)
GetDSL2Transactions(ctx context.Context, firstL2Block, lastL2Block uint64, dbTx pgx.Tx) ([]*state.DSL2Transaction, error)
}
Expand Down
18 changes: 9 additions & 9 deletions sequencer/mock_state.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion sequencer/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (s *Sequencer) Start(ctx context.Context) {
}

func (s *Sequencer) updateDataStreamerFile(ctx context.Context, streamServer *datastreamer.StreamServer) {
err := state.GenerateDataStreamerFile(ctx, streamServer, s.state)
err := state.GenerateDataStreamerFile(ctx, streamServer, s.state, true)
if err != nil {
log.Fatalf("failed to generate data streamer file, err: %v", err)
}
Expand Down
10 changes: 7 additions & 3 deletions state/datastream.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,13 +199,13 @@ func (g DSUpdateGER) Decode(data []byte) DSUpdateGER {
// DSState gathers the methods required to interact with the data stream state.
type DSState interface {
GetDSGenesisBlock(ctx context.Context, dbTx pgx.Tx) (*DSL2Block, error)
GetDSBatches(ctx context.Context, firstBatchNumber, lastBatchNumber uint64, dbTx pgx.Tx) ([]*DSBatch, error)
GetDSBatches(ctx context.Context, firstBatchNumber, lastBatchNumber uint64, readWIPBatch bool, dbTx pgx.Tx) ([]*DSBatch, error)
GetDSL2Blocks(ctx context.Context, firstBatchNumber, lastBatchNumber uint64, dbTx pgx.Tx) ([]*DSL2Block, error)
GetDSL2Transactions(ctx context.Context, firstL2Block, lastL2Block uint64, dbTx pgx.Tx) ([]*DSL2Transaction, error)
}

// GenerateDataStreamerFile generates or resumes a data stream file
func GenerateDataStreamerFile(ctx context.Context, streamServer *datastreamer.StreamServer, stateDB DSState) error {
func GenerateDataStreamerFile(ctx context.Context, streamServer *datastreamer.StreamServer, stateDB DSState, readWIPBatch bool) error {
header := streamServer.GetHeader()

var currentBatchNumber uint64 = 0
Expand Down Expand Up @@ -314,7 +314,7 @@ func GenerateDataStreamerFile(ctx context.Context, streamServer *datastreamer.St
log.Debugf("Current entry number: %d", entry)
log.Debugf("Current batch number: %d", currentBatchNumber)
// Get Next Batch
batches, err := stateDB.GetDSBatches(ctx, currentBatchNumber, currentBatchNumber+limit, nil)
batches, err := stateDB.GetDSBatches(ctx, currentBatchNumber, currentBatchNumber+limit, readWIPBatch, nil)
if err != nil {
if err == ErrStateNotSynchronized {
break
Expand Down Expand Up @@ -352,6 +352,10 @@ func GenerateDataStreamerFile(ctx context.Context, streamServer *datastreamer.St
for _, batch := range fullBatches {
if len(batch.L2Blocks) == 0 {
// Empty batch
// Is WIP Batch?
if batch.StateRoot == (common.Hash{}) {
continue
}
// Check if there is a GER update
if batch.GlobalExitRoot != currentGER && batch.GlobalExitRoot != (common.Hash{}) {
updateGer := DSUpdateGER{
Expand Down
10 changes: 7 additions & 3 deletions state/pgstatestorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -2842,11 +2842,15 @@ func scanDSL2Transaction(row pgx.Row) (*DSL2Transaction, error) {
}

// GetDSBatches returns the DS batches
func (p *PostgresStorage) GetDSBatches(ctx context.Context, firstBatchNumber, lastBatchNumber uint64, dbTx pgx.Tx) ([]*DSBatch, error) {
const getBatchByNumberSQL = `
func (p *PostgresStorage) GetDSBatches(ctx context.Context, firstBatchNumber, lastBatchNumber uint64, readWIPBatch bool, dbTx pgx.Tx) ([]*DSBatch, error) {
var getBatchByNumberSQL = `
SELECT b.batch_num, b.global_exit_root, b.local_exit_root, b.acc_input_hash, b.state_root, b.timestamp, b.coinbase, b.raw_txs_data, b.forced_batch_num, f.fork_id
FROM state.batch b, state.fork_id f
WHERE b.state_root is not null AND b.batch_num >= $1 AND b.batch_num <= $2 AND batch_num between f.from_batch_num AND f.to_batch_num`
WHERE b.batch_num >= $1 AND b.batch_num <= $2 AND batch_num between f.from_batch_num AND f.to_batch_num`

if !readWIPBatch {
getBatchByNumberSQL += " AND b.state_root is not null"
}

e := p.getExecQuerier(dbTx)
rows, err := e.Query(ctx, getBatchByNumberSQL, firstBatchNumber, lastBatchNumber)
Expand Down
2 changes: 1 addition & 1 deletion tools/datastreamer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func generate(cliCtx *cli.Context) error {
stateDB := state.NewPostgresStorage(state.Config{}, stateSqlDB)
log.Debug("Connected to the database")

err = state.GenerateDataStreamerFile(cliCtx.Context, streamServer, stateDB)
err = state.GenerateDataStreamerFile(cliCtx.Context, streamServer, stateDB, false)
if err != nil {
fmt.Printf("Error: %v\n", err)
os.Exit(1)
Expand Down