Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update DS Format #3608

Merged
merged 36 commits into from
Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
f16430c
protobuf datastream
ToniRamirezM Apr 24, 2024
30a6ccb
merge base branch
ToniRamirezM Apr 24, 2024
9192dcf
wip
ToniRamirezM Apr 24, 2024
433c025
wip
ToniRamirezM Apr 24, 2024
d6863a8
wip
ToniRamirezM Apr 24, 2024
2b2b696
wip
ToniRamirezM Apr 24, 2024
4c71182
wip
ToniRamirezM Apr 25, 2024
c31adb2
wip
ToniRamirezM Apr 25, 2024
41d0981
wip
ToniRamirezM Apr 25, 2024
62d33d5
wip
ToniRamirezM Apr 25, 2024
9a2a6da
wip
ToniRamirezM Apr 25, 2024
1dc9e7e
min_timestamp
ToniRamirezM Apr 25, 2024
f6ed696
fix sql
ToniRamirezM Apr 25, 2024
55b9201
fix sql
ToniRamirezM Apr 25, 2024
b634090
fix sql
ToniRamirezM Apr 25, 2024
8f97d3a
update sequencer
ToniRamirezM Apr 25, 2024
7b623ba
merge base branch
ToniRamirezM Apr 25, 2024
da1bfe9
dump batch tool
ToniRamirezM Apr 26, 2024
f1757ef
dump batch tool
ToniRamirezM Apr 26, 2024
0295096
refactor
ToniRamirezM Apr 26, 2024
120f723
dump batch tool
ToniRamirezM Apr 26, 2024
2636052
dump batch tool
ToniRamirezM Apr 26, 2024
cd30a59
dump batch tool
ToniRamirezM Apr 26, 2024
bcfdc5b
dump batch tool
ToniRamirezM Apr 26, 2024
df55b71
fix query
ToniRamirezM Apr 26, 2024
1e588aa
fixes
ToniRamirezM Apr 26, 2024
56dd0a7
Merge branch 'release/v0.6.7' into feature/protobufDataStream
ToniRamirezM Apr 26, 2024
b1cbf1d
fix test
ToniRamirezM Apr 26, 2024
003aa8f
fix test
ToniRamirezM Apr 26, 2024
b98f009
change fatal to error
ToniRamirezM Apr 26, 2024
d02eb86
handle error
ToniRamirezM Apr 26, 2024
74d87a0
fix ds starting check
ToniRamirezM Apr 28, 2024
06ed4d5
restore tool config
ToniRamirezM Apr 28, 2024
0d61a96
fix ds type
ToniRamirezM Apr 29, 2024
272fe7f
merge base release
ToniRamirezM Apr 30, 2024
db02498
fix send ds batch
ToniRamirezM Apr 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ generate-code-from-proto: ## Generates code from proto files
cd proto/src/proto/hashdb/v1 && protoc --proto_path=. --proto_path=../../../../include --go_out=../../../../../merkletree/hashdb --go-grpc_out=../../../../../merkletree/hashdb --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative hashdb.proto
cd proto/src/proto/executor/v1 && protoc --proto_path=. --go_out=../../../../../state/runtime/executor --go-grpc_out=../../../../../state/runtime/executor --go-grpc_opt=paths=source_relative --go_opt=paths=source_relative executor.proto
cd proto/src/proto/aggregator/v1 && protoc --proto_path=. --proto_path=../../../../include --go_out=../../../../../aggregator/prover --go-grpc_out=../../../../../aggregator/prover --go-grpc_opt=paths=source_relative --go_opt=paths=source_relative aggregator.proto
cd proto/src/proto/datastream/v1 && protoc --proto_path=. --proto_path=../../../../include --go_out=../../../../../state/datastream --go-grpc_out=../../../../../state/datastream --go-grpc_opt=paths=source_relative --go_opt=paths=source_relative datastream.proto

## Help display.
## Pulls comments from beside commands and prints a nicely formatted
Expand Down
64 changes: 64 additions & 0 deletions proto/src/proto/datastream/v1/datastream.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
syntax = "proto3";

package datastream.v1;

option go_package = "github.com/0xPolygonHermez/zkevm-node/state/datastream";

message Batch {
uint64 number = 1;
bytes local_exit_root = 2;
bytes state_root = 3;
uint64 fork_id = 4;
uint64 chain_id = 5;
}

message L2Block {
uint64 number = 1;
uint64 batch_number = 2;
uint64 timestamp = 3;
uint32 delta_timestamp = 4;
uint64 min_timestamp = 5;
bytes l1_blockhash = 6;
uint32 l1_infotree_index = 7;
bytes hash = 8;
bytes state_root = 9;
bytes global_exit_root = 10;
bytes coinbase = 11;
}

message Transaction {
uint64 l2block_number = 1;
bool is_valid = 2;
bytes encoded = 3;
uint32 effective_gas_price_percentage = 4;
bytes im_state_root = 5;
}

message UpdateGER {
uint64 batch_number = 1;
uint64 timestamp = 2;
bytes global_exit_root = 3;
bytes coinbase = 4;
uint64 fork_id = 5;
uint64 chain_id = 6;
bytes state_root = 7;
}

message BookMark {
BookmarkType type = 1;
uint64 value = 2;
}

enum BookmarkType {
BOOKMARK_TYPE_UNSPECIFIED = 0;
BOOKMARK_TYPE_BATCH = 1;
BOOKMARK_TYPE_L2_BLOCK = 2;
}

enum EntryType {
ENTRY_TYPE_UNSPECIFIED = 0;
ENTRY_TYPE_BATCH = 1;
ENTRY_TYPE_L2_BLOCK = 2;
ENTRY_TYPE_TRANSACTION = 3;
ENTRY_TYPE_UPDATE_GER = 4;
}
6 changes: 6 additions & 0 deletions sequencer/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type Batch struct {
finalRemainingResources state.BatchResources // remaining batch resources when a L2 block is processed
finalHighReservedZKCounters state.ZKCounters
closingReason state.ClosingReason
finalLocalExitRoot common.Hash
}

func (b *Batch) isEmpty() bool {
Expand Down Expand Up @@ -99,6 +100,7 @@ func (f *finalizer) setWIPBatch(ctx context.Context, wipStateBatch *state.Batch)
finalRemainingResources: remainingResources,
imHighReservedZKCounters: wipStateBatch.HighReservedZKCounters,
finalHighReservedZKCounters: wipStateBatch.HighReservedZKCounters,
finalLocalExitRoot: wipStateBatch.LocalExitRoot,
}

return wipBatch, nil
Expand Down Expand Up @@ -312,6 +314,7 @@ func (f *finalizer) openNewWIPBatch(batchNumber uint64, stateRoot common.Hash) *
imRemainingResources: maxRemainingResources,
finalRemainingResources: maxRemainingResources,
closingReason: state.EmptyClosingReason,
finalLocalExitRoot: state.ZeroHash,
}
}

Expand Down Expand Up @@ -400,6 +403,9 @@ func (f *finalizer) closeSIPBatch(ctx context.Context, dbTx pgx.Tx) error {
}()
}

// Sent batch to DS
f.DSSendBatch(f.wipBatch.batchNumber, f.wipBatch.finalStateRoot, f.wipBatch.finalLocalExitRoot)

log.Infof("sip batch %d closed in statedb, closing reason: %s", f.sipBatch.batchNumber, f.sipBatch.closingReason)

f.sipBatch = nil
Expand Down
27 changes: 22 additions & 5 deletions sequencer/datastreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,25 @@ package sequencer
import (
"github.com/0xPolygonHermez/zkevm-node/log"
"github.com/0xPolygonHermez/zkevm-node/state"
"github.com/0xPolygonHermez/zkevm-node/state/datastream"
"github.com/ethereum/go-ethereum/common"
)

func (f *finalizer) DSSendL2Block(batchNumber uint64, blockResponse *state.ProcessBlockResponse, l1InfoTreeIndex uint32) error {
func (f *finalizer) DSSendL2Block(batchNumber uint64, blockResponse *state.ProcessBlockResponse, l1InfoTreeIndex uint32, minTimestamp uint64) error {
forkID := f.stateIntf.GetForkIDByBatchNumber(batchNumber)

// Send data to streamer
if f.streamServer != nil {
l2Block := state.DSL2Block{
BatchNumber: batchNumber,
L2BlockNumber: blockResponse.BlockNumber,
Timestamp: int64(blockResponse.Timestamp),
Timestamp: blockResponse.Timestamp,
Min_timestamp: minTimestamp,
L1InfoTreeIndex: l1InfoTreeIndex,
L1BlockHash: blockResponse.BlockHashL1,
GlobalExitRoot: blockResponse.GlobalExitRoot,
Coinbase: f.sequencerAddress,
ForkID: uint16(forkID),
ForkID: forkID,
BlockHash: blockResponse.BlockHash,
StateRoot: blockResponse.BlockHash, //From etrog, the blockhash is the block root
}
Expand Down Expand Up @@ -57,9 +60,23 @@ func (f *finalizer) DSSendBatchBookmark(batchNumber uint64) {
// Check if stream server enabled
if f.streamServer != nil {
// Send batch bookmark to the streamer
f.dataToStream <- state.DSBookMark{
Type: state.BookMarkTypeBatch,
f.dataToStream <- datastream.BookMark{
Type: datastream.BookmarkType_BOOKMARK_TYPE_BATCH,
Value: batchNumber,
}
}
}

func (f *finalizer) DSSendBatch(batchNumber uint64, stateRoot common.Hash, localExitRoot common.Hash) {
forkID := f.stateIntf.GetForkIDByBatchNumber(batchNumber)

if f.streamServer != nil {
// Send batch to the streamer
f.dataToStream <- datastream.Batch{
Number: batchNumber,
ForkId: forkID,
StateRoot: stateRoot.Bytes(),
LocalExitRoot: localExitRoot.Bytes(),
}
}
}
1 change: 1 addition & 0 deletions sequencer/finalizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -980,6 +980,7 @@ func TestFinalizer_finalizeSIPBatch(t *testing.T) {

// arrange
stateMock.On("BeginStateTransaction", ctx).Return(dbTxMock, nilErr).Once()
stateMock.On("GetForkIDByBatchNumber", mock.Anything).Return(uint64(state.FORKID_BLUEBERRY))
stateMock.On("CloseWIPBatch", ctx, receipt, mock.Anything).Return(tc.managerErr).Once()

if tc.managerErr == nil {
Expand Down
2 changes: 1 addition & 1 deletion sequencer/forcedbatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func (f *finalizer) handleProcessForcedBatchResponse(ctx context.Context, newBat
}

// Send L2 block to data streamer
err = f.DSSendL2Block(newBatchNumber, forcedL2BlockResponse, 0)
err = f.DSSendL2Block(newBatchNumber, forcedL2BlockResponse, 0, forcedL2BlockResponse.Timestamp)
if err != nil {
//TODO: we need to halt/rollback the L2 block if we had an error sending to the data streamer?
log.Errorf("error sending L2 block %d to data streamer, error: %v", forcedL2BlockResponse.BlockNumber, err)
Expand Down
5 changes: 3 additions & 2 deletions sequencer/l2block.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,9 @@ func (f *finalizer) processL2Block(ctx context.Context, l2Block *L2Block) error
return fmt.Errorf(overflowLog)
}

// Update finalStateRoot of the batch to the newStateRoot for the L2 block
// Update finalStateRoot/finalLocalExitRoot of the batch to the newStateRoot/newLocalExitRoot for the L2 block
l2Block.batch.finalStateRoot = l2Block.batchResponse.NewStateRoot
l2Block.batch.finalLocalExitRoot = l2Block.batchResponse.NewLocalExitRoot

f.updateFlushIDs(batchResponse.FlushID, batchResponse.StoredFlushID)

Expand Down Expand Up @@ -491,7 +492,7 @@ func (f *finalizer) storeL2Block(ctx context.Context, l2Block *L2Block) error {
log.Infof("[ds-debug] l2 block %d [%d] transactions updated as selected in the pooldb", blockResponse.BlockNumber, l2Block.trackingNum)

// Send L2 block to data streamer
err = f.DSSendL2Block(f.wipBatch.batchNumber, blockResponse, l2Block.getL1InfoTreeIndex())
err = f.DSSendL2Block(f.wipBatch.batchNumber, blockResponse, l2Block.getL1InfoTreeIndex(), l2Block.timestamp)
if err != nil {
//TODO: we need to halt/rollback the L2 block if we had an error sending to the data streamer?
log.Errorf("error sending L2 block %d [%d] to data streamer, error: %v", blockResponse.BlockNumber, l2Block.trackingNum, err)
Expand Down
Loading
Loading