diff --git a/sequencer/interfaces.go b/sequencer/interfaces.go index 0810f59cac..087fbc7464 100644 --- a/sequencer/interfaces.go +++ b/sequencer/interfaces.go @@ -83,7 +83,7 @@ type stateInterface interface { GetStoredFlushID(ctx context.Context) (uint64, string, error) GetForkIDByBatchNumber(batchNumber uint64) uint64 GetDSGenesisBlock(ctx context.Context, dbTx pgx.Tx) (*state.DSL2Block, error) - GetDSBatches(ctx context.Context, firstBatchNumber, lastBatchNumber uint64, dbTx pgx.Tx) ([]*state.DSBatch, error) + GetDSBatches(ctx context.Context, firstBatchNumber, lastBatchNumber uint64, readWIPBatch bool, dbTx pgx.Tx) ([]*state.DSBatch, error) GetDSL2Blocks(ctx context.Context, firstBatchNumber, lastBatchNumber uint64, dbTx pgx.Tx) ([]*state.DSL2Block, error) GetDSL2Transactions(ctx context.Context, firstL2Block, lastL2Block uint64, dbTx pgx.Tx) ([]*state.DSL2Transaction, error) } diff --git a/sequencer/mock_state.go b/sequencer/mock_state.go index e8db2db471..7d8717866e 100644 --- a/sequencer/mock_state.go +++ b/sequencer/mock_state.go @@ -210,25 +210,25 @@ func (_m *StateMock) GetBatchByNumber(ctx context.Context, batchNumber uint64, d return r0, r1 } -// GetDSBatches provides a mock function with given fields: ctx, firstBatchNumber, lastBatchNumber, dbTx -func (_m *StateMock) GetDSBatches(ctx context.Context, firstBatchNumber uint64, lastBatchNumber uint64, dbTx pgx.Tx) ([]*state.DSBatch, error) { - ret := _m.Called(ctx, firstBatchNumber, lastBatchNumber, dbTx) +// GetDSBatches provides a mock function with given fields: ctx, firstBatchNumber, lastBatchNumber, readWIPBatch, dbTx +func (_m *StateMock) GetDSBatches(ctx context.Context, firstBatchNumber uint64, lastBatchNumber uint64, readWIPBatch bool, dbTx pgx.Tx) ([]*state.DSBatch, error) { + ret := _m.Called(ctx, firstBatchNumber, lastBatchNumber, readWIPBatch, dbTx) var r0 []*state.DSBatch var r1 error - if rf, ok := ret.Get(0).(func(context.Context, uint64, uint64, pgx.Tx) ([]*state.DSBatch, error)); ok { - return rf(ctx, firstBatchNumber, lastBatchNumber, dbTx) + if rf, ok := ret.Get(0).(func(context.Context, uint64, uint64, bool, pgx.Tx) ([]*state.DSBatch, error)); ok { + return rf(ctx, firstBatchNumber, lastBatchNumber, readWIPBatch, dbTx) } - if rf, ok := ret.Get(0).(func(context.Context, uint64, uint64, pgx.Tx) []*state.DSBatch); ok { - r0 = rf(ctx, firstBatchNumber, lastBatchNumber, dbTx) + if rf, ok := ret.Get(0).(func(context.Context, uint64, uint64, bool, pgx.Tx) []*state.DSBatch); ok { + r0 = rf(ctx, firstBatchNumber, lastBatchNumber, readWIPBatch, dbTx) } else { if ret.Get(0) != nil { r0 = ret.Get(0).([]*state.DSBatch) } } - if rf, ok := ret.Get(1).(func(context.Context, uint64, uint64, pgx.Tx) error); ok { - r1 = rf(ctx, firstBatchNumber, lastBatchNumber, dbTx) + if rf, ok := ret.Get(1).(func(context.Context, uint64, uint64, bool, pgx.Tx) error); ok { + r1 = rf(ctx, firstBatchNumber, lastBatchNumber, readWIPBatch, dbTx) } else { r1 = ret.Error(1) } diff --git a/sequencer/sequencer.go b/sequencer/sequencer.go index a60a7bf8b4..f572d9818b 100644 --- a/sequencer/sequencer.go +++ b/sequencer/sequencer.go @@ -140,7 +140,7 @@ func (s *Sequencer) Start(ctx context.Context) { } func (s *Sequencer) updateDataStreamerFile(ctx context.Context, streamServer *datastreamer.StreamServer) { - err := state.GenerateDataStreamerFile(ctx, streamServer, s.state) + err := state.GenerateDataStreamerFile(ctx, streamServer, s.state, true) if err != nil { log.Fatalf("failed to generate data streamer file, err: %v", err) } diff --git a/state/datastream.go b/state/datastream.go index cf0f2d84db..e71251d2ee 100644 --- a/state/datastream.go +++ b/state/datastream.go @@ -199,13 +199,13 @@ func (g DSUpdateGER) Decode(data []byte) DSUpdateGER { // DSState gathers the methods required to interact with the data stream state. type DSState interface { GetDSGenesisBlock(ctx context.Context, dbTx pgx.Tx) (*DSL2Block, error) - GetDSBatches(ctx context.Context, firstBatchNumber, lastBatchNumber uint64, dbTx pgx.Tx) ([]*DSBatch, error) + GetDSBatches(ctx context.Context, firstBatchNumber, lastBatchNumber uint64, readWIPBatch bool, dbTx pgx.Tx) ([]*DSBatch, error) GetDSL2Blocks(ctx context.Context, firstBatchNumber, lastBatchNumber uint64, dbTx pgx.Tx) ([]*DSL2Block, error) GetDSL2Transactions(ctx context.Context, firstL2Block, lastL2Block uint64, dbTx pgx.Tx) ([]*DSL2Transaction, error) } // GenerateDataStreamerFile generates or resumes a data stream file -func GenerateDataStreamerFile(ctx context.Context, streamServer *datastreamer.StreamServer, stateDB DSState) error { +func GenerateDataStreamerFile(ctx context.Context, streamServer *datastreamer.StreamServer, stateDB DSState, readWIPBatch bool) error { header := streamServer.GetHeader() var currentBatchNumber uint64 = 0 @@ -314,7 +314,7 @@ func GenerateDataStreamerFile(ctx context.Context, streamServer *datastreamer.St log.Debugf("Current entry number: %d", entry) log.Debugf("Current batch number: %d", currentBatchNumber) // Get Next Batch - batches, err := stateDB.GetDSBatches(ctx, currentBatchNumber, currentBatchNumber+limit, nil) + batches, err := stateDB.GetDSBatches(ctx, currentBatchNumber, currentBatchNumber+limit, readWIPBatch, nil) if err != nil { if err == ErrStateNotSynchronized { break @@ -352,6 +352,10 @@ func GenerateDataStreamerFile(ctx context.Context, streamServer *datastreamer.St for _, batch := range fullBatches { if len(batch.L2Blocks) == 0 { // Empty batch + // Is WIP Batch? + if batch.StateRoot == (common.Hash{}) { + continue + } // Check if there is a GER update if batch.GlobalExitRoot != currentGER && batch.GlobalExitRoot != (common.Hash{}) { updateGer := DSUpdateGER{ diff --git a/state/pgstatestorage.go b/state/pgstatestorage.go index 5547bccf3d..177d00ecdb 100644 --- a/state/pgstatestorage.go +++ b/state/pgstatestorage.go @@ -2842,11 +2842,15 @@ func scanDSL2Transaction(row pgx.Row) (*DSL2Transaction, error) { } // GetDSBatches returns the DS batches -func (p *PostgresStorage) GetDSBatches(ctx context.Context, firstBatchNumber, lastBatchNumber uint64, dbTx pgx.Tx) ([]*DSBatch, error) { - const getBatchByNumberSQL = ` +func (p *PostgresStorage) GetDSBatches(ctx context.Context, firstBatchNumber, lastBatchNumber uint64, readWIPBatch bool, dbTx pgx.Tx) ([]*DSBatch, error) { + var getBatchByNumberSQL = ` SELECT b.batch_num, b.global_exit_root, b.local_exit_root, b.acc_input_hash, b.state_root, b.timestamp, b.coinbase, b.raw_txs_data, b.forced_batch_num, f.fork_id FROM state.batch b, state.fork_id f - WHERE b.state_root is not null AND b.batch_num >= $1 AND b.batch_num <= $2 AND batch_num between f.from_batch_num AND f.to_batch_num` + WHERE b.batch_num >= $1 AND b.batch_num <= $2 AND batch_num between f.from_batch_num AND f.to_batch_num` + + if !readWIPBatch { + getBatchByNumberSQL += " AND b.state_root is not null" + } e := p.getExecQuerier(dbTx) rows, err := e.Query(ctx, getBatchByNumberSQL, firstBatchNumber, lastBatchNumber) diff --git a/tools/datastreamer/main.go b/tools/datastreamer/main.go index c0ba0e1923..80e692c9d2 100644 --- a/tools/datastreamer/main.go +++ b/tools/datastreamer/main.go @@ -193,7 +193,7 @@ func generate(cliCtx *cli.Context) error { stateDB := state.NewPostgresStorage(state.Config{}, stateSqlDB) log.Debug("Connected to the database") - err = state.GenerateDataStreamerFile(cliCtx.Context, streamServer, stateDB) + err = state.GenerateDataStreamerFile(cliCtx.Context, streamServer, stateDB, false) if err != nil { fmt.Printf("Error: %v\n", err) os.Exit(1)