Skip to content

Commit

Permalink
Update DS Format (#3608)
Browse files Browse the repository at this point in the history
* protobuf datastream
  • Loading branch information
ToniRamirezM authored Apr 30, 2024
1 parent 8d5cf96 commit 3ce6ff8
Show file tree
Hide file tree
Showing 16 changed files with 1,541 additions and 820 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ generate-code-from-proto: ## Generates code from proto files
cd proto/src/proto/hashdb/v1 && protoc --proto_path=. --proto_path=../../../../include --go_out=../../../../../merkletree/hashdb --go-grpc_out=../../../../../merkletree/hashdb --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative hashdb.proto
cd proto/src/proto/executor/v1 && protoc --proto_path=. --go_out=../../../../../state/runtime/executor --go-grpc_out=../../../../../state/runtime/executor --go-grpc_opt=paths=source_relative --go_opt=paths=source_relative executor.proto
cd proto/src/proto/aggregator/v1 && protoc --proto_path=. --proto_path=../../../../include --go_out=../../../../../aggregator/prover --go-grpc_out=../../../../../aggregator/prover --go-grpc_opt=paths=source_relative --go_opt=paths=source_relative aggregator.proto
cd proto/src/proto/datastream/v1 && protoc --proto_path=. --proto_path=../../../../include --go_out=../../../../../state/datastream --go-grpc_out=../../../../../state/datastream --go-grpc_opt=paths=source_relative --go_opt=paths=source_relative datastream.proto

## Help display.
## Pulls comments from beside commands and prints a nicely formatted
Expand Down
64 changes: 64 additions & 0 deletions proto/src/proto/datastream/v1/datastream.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
syntax = "proto3";

package datastream.v1;

option go_package = "github.com/0xPolygonHermez/zkevm-node/state/datastream";

message Batch {
uint64 number = 1;
bytes local_exit_root = 2;
bytes state_root = 3;
uint64 fork_id = 4;
uint64 chain_id = 5;
}

message L2Block {
uint64 number = 1;
uint64 batch_number = 2;
uint64 timestamp = 3;
uint32 delta_timestamp = 4;
uint64 min_timestamp = 5;
bytes l1_blockhash = 6;
uint32 l1_infotree_index = 7;
bytes hash = 8;
bytes state_root = 9;
bytes global_exit_root = 10;
bytes coinbase = 11;
}

message Transaction {
uint64 l2block_number = 1;
bool is_valid = 2;
bytes encoded = 3;
uint32 effective_gas_price_percentage = 4;
bytes im_state_root = 5;
}

message UpdateGER {
uint64 batch_number = 1;
uint64 timestamp = 2;
bytes global_exit_root = 3;
bytes coinbase = 4;
uint64 fork_id = 5;
uint64 chain_id = 6;
bytes state_root = 7;
}

message BookMark {
BookmarkType type = 1;
uint64 value = 2;
}

enum BookmarkType {
BOOKMARK_TYPE_UNSPECIFIED = 0;
BOOKMARK_TYPE_BATCH = 1;
BOOKMARK_TYPE_L2_BLOCK = 2;
}

enum EntryType {
ENTRY_TYPE_UNSPECIFIED = 0;
ENTRY_TYPE_BATCH = 1;
ENTRY_TYPE_L2_BLOCK = 2;
ENTRY_TYPE_TRANSACTION = 3;
ENTRY_TYPE_UPDATE_GER = 4;
}
6 changes: 6 additions & 0 deletions sequencer/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type Batch struct {
finalRemainingResources state.BatchResources // remaining batch resources when a L2 block is processed
finalHighReservedZKCounters state.ZKCounters
closingReason state.ClosingReason
finalLocalExitRoot common.Hash
}

func (b *Batch) isEmpty() bool {
Expand Down Expand Up @@ -99,6 +100,7 @@ func (f *finalizer) setWIPBatch(ctx context.Context, wipStateBatch *state.Batch)
finalRemainingResources: remainingResources,
imHighReservedZKCounters: wipStateBatch.HighReservedZKCounters,
finalHighReservedZKCounters: wipStateBatch.HighReservedZKCounters,
finalLocalExitRoot: wipStateBatch.LocalExitRoot,
}

return wipBatch, nil
Expand Down Expand Up @@ -312,6 +314,7 @@ func (f *finalizer) openNewWIPBatch(batchNumber uint64, stateRoot common.Hash) *
imRemainingResources: maxRemainingResources,
finalRemainingResources: maxRemainingResources,
closingReason: state.EmptyClosingReason,
finalLocalExitRoot: state.ZeroHash,
}
}

Expand Down Expand Up @@ -400,6 +403,9 @@ func (f *finalizer) closeSIPBatch(ctx context.Context, dbTx pgx.Tx) error {
}()
}

// Sent batch to DS
f.DSSendBatch(f.wipBatch.batchNumber, f.wipBatch.finalStateRoot, f.wipBatch.finalLocalExitRoot)

log.Infof("sip batch %d closed in statedb, closing reason: %s", f.sipBatch.batchNumber, f.sipBatch.closingReason)

f.sipBatch = nil
Expand Down
27 changes: 22 additions & 5 deletions sequencer/datastreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,25 @@ package sequencer
import (
"github.com/0xPolygonHermez/zkevm-node/log"
"github.com/0xPolygonHermez/zkevm-node/state"
"github.com/0xPolygonHermez/zkevm-node/state/datastream"
"github.com/ethereum/go-ethereum/common"
)

func (f *finalizer) DSSendL2Block(batchNumber uint64, blockResponse *state.ProcessBlockResponse, l1InfoTreeIndex uint32) error {
func (f *finalizer) DSSendL2Block(batchNumber uint64, blockResponse *state.ProcessBlockResponse, l1InfoTreeIndex uint32, minTimestamp uint64) error {
forkID := f.stateIntf.GetForkIDByBatchNumber(batchNumber)

// Send data to streamer
if f.streamServer != nil {
l2Block := state.DSL2Block{
BatchNumber: batchNumber,
L2BlockNumber: blockResponse.BlockNumber,
Timestamp: int64(blockResponse.Timestamp),
Timestamp: blockResponse.Timestamp,
Min_timestamp: minTimestamp,
L1InfoTreeIndex: l1InfoTreeIndex,
L1BlockHash: blockResponse.BlockHashL1,
GlobalExitRoot: blockResponse.GlobalExitRoot,
Coinbase: f.sequencerAddress,
ForkID: uint16(forkID),
ForkID: forkID,
BlockHash: blockResponse.BlockHash,
StateRoot: blockResponse.BlockHash, //From etrog, the blockhash is the block root
}
Expand Down Expand Up @@ -57,9 +60,23 @@ func (f *finalizer) DSSendBatchBookmark(batchNumber uint64) {
// Check if stream server enabled
if f.streamServer != nil {
// Send batch bookmark to the streamer
f.dataToStream <- state.DSBookMark{
Type: state.BookMarkTypeBatch,
f.dataToStream <- datastream.BookMark{
Type: datastream.BookmarkType_BOOKMARK_TYPE_BATCH,
Value: batchNumber,
}
}
}

func (f *finalizer) DSSendBatch(batchNumber uint64, stateRoot common.Hash, localExitRoot common.Hash) {
forkID := f.stateIntf.GetForkIDByBatchNumber(batchNumber)

if f.streamServer != nil {
// Send batch to the streamer
f.dataToStream <- datastream.Batch{
Number: batchNumber,
ForkId: forkID,
StateRoot: stateRoot.Bytes(),
LocalExitRoot: localExitRoot.Bytes(),
}
}
}
1 change: 1 addition & 0 deletions sequencer/finalizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -980,6 +980,7 @@ func TestFinalizer_finalizeSIPBatch(t *testing.T) {

// arrange
stateMock.On("BeginStateTransaction", ctx).Return(dbTxMock, nilErr).Once()
stateMock.On("GetForkIDByBatchNumber", mock.Anything).Return(uint64(state.FORKID_BLUEBERRY))
stateMock.On("CloseWIPBatch", ctx, receipt, mock.Anything).Return(tc.managerErr).Once()

if tc.managerErr == nil {
Expand Down
2 changes: 1 addition & 1 deletion sequencer/forcedbatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func (f *finalizer) handleProcessForcedBatchResponse(ctx context.Context, newBat
}

// Send L2 block to data streamer
err = f.DSSendL2Block(newBatchNumber, forcedL2BlockResponse, 0)
err = f.DSSendL2Block(newBatchNumber, forcedL2BlockResponse, 0, forcedL2BlockResponse.Timestamp)
if err != nil {
//TODO: we need to halt/rollback the L2 block if we had an error sending to the data streamer?
log.Errorf("error sending L2 block %d to data streamer, error: %v", forcedL2BlockResponse.BlockNumber, err)
Expand Down
5 changes: 3 additions & 2 deletions sequencer/l2block.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,9 @@ func (f *finalizer) processL2Block(ctx context.Context, l2Block *L2Block) error
return fmt.Errorf(overflowLog)
}

// Update finalStateRoot of the batch to the newStateRoot for the L2 block
// Update finalStateRoot/finalLocalExitRoot of the batch to the newStateRoot/newLocalExitRoot for the L2 block
l2Block.batch.finalStateRoot = l2Block.batchResponse.NewStateRoot
l2Block.batch.finalLocalExitRoot = l2Block.batchResponse.NewLocalExitRoot

f.updateFlushIDs(batchResponse.FlushID, batchResponse.StoredFlushID)

Expand Down Expand Up @@ -491,7 +492,7 @@ func (f *finalizer) storeL2Block(ctx context.Context, l2Block *L2Block) error {
log.Infof("[ds-debug] l2 block %d [%d] transactions updated as selected in the pooldb", blockResponse.BlockNumber, l2Block.trackingNum)

// Send L2 block to data streamer
err = f.DSSendL2Block(f.wipBatch.batchNumber, blockResponse, l2Block.getL1InfoTreeIndex())
err = f.DSSendL2Block(f.wipBatch.batchNumber, blockResponse, l2Block.getL1InfoTreeIndex(), l2Block.timestamp)
if err != nil {
//TODO: we need to halt/rollback the L2 block if we had an error sending to the data streamer?
log.Errorf("error sending L2 block %d [%d] to data streamer, error: %v", blockResponse.BlockNumber, l2Block.trackingNum, err)
Expand Down
Loading

0 comments on commit 3ce6ff8

Please sign in to comment.