Skip to content

Commit

Permalink
Flush state db (#1831)
Browse files Browse the repository at this point in the history
* flush state db

* flush state db

* update prover image

* improve log message
  • Loading branch information
ToniRamirezM authored Mar 16, 2023
1 parent 61f4a23 commit 470216d
Show file tree
Hide file tree
Showing 12 changed files with 839 additions and 245 deletions.
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ services:
zkevm-prover:
container_name: zkevm-prover
restart: unless-stopped
image: hermeznetwork/zkevm-prover:45df44e
image: hermeznetwork/zkevm-prover:40338c3
depends_on:
zkevm-state-db:
condition: service_healthy
Expand Down
866 changes: 641 additions & 225 deletions merkletree/pb/statedb.pb.go

Large diffs are not rendered by default.

80 changes: 76 additions & 4 deletions merkletree/pb/statedb_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions merkletree/tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/0xPolygonHermez/zkevm-node/hex"
"github.com/0xPolygonHermez/zkevm-node/merkletree/pb"
"github.com/ethereum/go-ethereum/common"
"google.golang.org/protobuf/types/known/emptypb"
)

// StateTree provides methods to access and modify state in merkletree
Expand Down Expand Up @@ -312,3 +313,9 @@ func (tree *StateTree) setProgram(ctx context.Context, key []uint64, data []byte
})
return err
}

// Flush flushes all changes to the persistent storage.
func (tree *StateTree) Flush(ctx context.Context) error {
_, err := tree.grpcClient.Flush(ctx, &emptypb.Empty{})
return err
}
75 changes: 63 additions & 12 deletions proto/src/proto/statedb/v1/statedb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,18 @@ message Version {
* Define all methods implementes by the gRPC
* Get: get the value for a specific key
* Set: set the value for a specific key
* SetProgram: set the byte data for a specific hash
* GetProgram: get the byte data for a specific hash
* SetProgram: set the byte data for a specific key
* GetProgram: get the byte data for a specific key
* Flush: wait for all the pendings writes to the DB are done
*/
service StateDBService {
rpc Set(SetRequest) returns (SetResponse) {}
rpc Get(GetRequest) returns (GetResponse) {}
rpc SetProgram(SetProgramRequest) returns (SetProgramResponse) {}
rpc GetProgram(GetProgramRequest) returns (GetProgramResponse) {}
rpc Flush (google.protobuf.Empty) returns (google.protobuf.Empty) {}
rpc LoadDB(LoadDBRequest) returns (google.protobuf.Empty) {}
rpc LoadProgramDB(LoadProgramDBRequest) returns (google.protobuf.Empty) {}
rpc Flush (google.protobuf.Empty) returns (FlushResponse) {}
}

///////////////////
Expand All @@ -37,34 +39,38 @@ service StateDBService {
* @param {old_root} - merkle-tree root
* @param {key} - key to set
* @param {value} - scalar value to set (HEX string format)
* @param {persistent} - indicates if it should be stored in the database (true) or only in the memory cache (false)
* @param {persistent} - indicates if it should be stored in the SQL database (true) or only in the memory cache (false)
* @param {details} - indicates if it should return all response parameters (true) or just the new root (false)
* @param {get_db_read_log} - indicates if it should return the DB reads generated during the execution of the request
*/
message SetRequest {
Fea old_root = 1;
Fea key = 2;
string value = 3;
bool persistent = 4;
bool details = 5;
bool get_db_read_log = 6;
}

/**
* @dev GetRequest
* @param {root} - merkle-tree root
* @param {key} - key to look for
* @param {details} - indicates if it should return all response parameters (true) or just the new root (false)
* @param {get_db_read_log} - indicates if it should return the DB reads generated during the execution of the request
*/
message GetRequest {
Fea root = 1;
Fea key = 2;
bool details = 3;
bool get_db_read_log = 4;
}

/**
* @dev SetProgramRequest
* @param {key} - hash to set
* @param {key} - key to set
* @param {data} - Program data to store
* @param {persistent} - indicates if it should be stored in the database (true) or only in the memory cache (false)
* @param {persistent} - indicates if it should be stored in the SQL database (true) or only in the memory cache (false)
*/
message SetProgramRequest {
Fea key = 1;
Expand All @@ -74,12 +80,32 @@ message SetProgramRequest {

/**
* @dev GetProgramRequest
* @param {key} - hash to get program data
* @param {key} - key to get program data
*/
message GetProgramRequest {
Fea key = 1;
}

/**
* @dev LoadDBRequest
* @param {input_db} - list of db records (MT) to load in the database
* @param {persistent} - indicates if it should be stored in the SQL database (true) or only in the memory cache (false)
*/
message LoadDBRequest {
map<string, FeList> input_db = 1;
bool persistent = 2;
}

/**
* @dev LoadProgramDBRequest
* @param {input_program_db} - list of db records (program) to load in the database
* @param {persistent} - indicates if it should be stored in the SQL database (true) or only in the memory cache (false)
*/
message LoadProgramDBRequest {
map<string, bytes> input_program_db = 1;
bool persistent = 2;
}

/////////////////////
// Responses messages
/////////////////////
Expand All @@ -96,6 +122,8 @@ message GetProgramRequest {
* @param {old_value} - old value (HEX string format)
* @param {new_value} - new value (HEX string format)
* @param {mode}
* @param {proof_hash_counter}
* @param {db_read_log} - list of db records read during the execution of the request
* @param {result} - result code
*/
message SetResponse {
Expand All @@ -109,7 +137,9 @@ message SetResponse {
string old_value = 8;
string new_value = 9;
string mode = 10;
ResultCode result = 11;
uint64 proof_hash_counter = 11;
map<string, FeList> db_read_log = 12;
ResultCode result = 13;
}

/**
Expand All @@ -121,6 +151,8 @@ message SetResponse {
* @param {ins_value} - value found (HEX string format)
* @param {is_old0} - is new insert or delete
* @param {value} - value retrieved (HEX string format)
* @param {proof_hash_counter}
* @param {db_read_log} - list of db records read during the execution of the request
* @param {result} - result code
*/
message GetResponse {
Expand All @@ -131,7 +163,9 @@ message GetResponse {
string ins_value = 5;
bool is_old0 = 6;
string value = 7;
ResultCode result = 8;
uint64 proof_hash_counter = 8;
map<string, FeList> db_read_log = 9;
ResultCode result = 10;
}

/**
Expand All @@ -152,6 +186,14 @@ message GetProgramResponse {
ResultCode result = 2;
}

/**
* @dev FlushResponse
* @param {result} - result code
*/
message FlushResponse {
ResultCode result = 1;
}

/**
* @dev Array of 4 FE
* @param {fe0} - Field Element value for pos 0
Expand All @@ -166,9 +208,17 @@ message Fea {
uint64 fe3 = 4;
}

/**
* @dev FE (Field Element) List
* @param {fe} - list of Fe
*/
message FeList {
repeated uint64 fe = 1;
}

/**
* @dev Siblings List
* @param {sibling} - sibling
* @param {sibling} - list of siblings
*/
message SiblingList {
repeated uint64 sibling = 1;
Expand All @@ -182,9 +232,10 @@ message ResultCode {
enum Code {
CODE_UNSPECIFIED = 0;
CODE_SUCCESS = 1;
CODE_KEY_NOT_FOUND = 2;
CODE_DB_ERROR = 3;
CODE_DB_KEY_NOT_FOUND = 2; // Requested key was not found in database
CODE_DB_ERROR = 3; // Error connecting to database, or processing request
CODE_INTERNAL_ERROR = 4;
CODE_SMT_INVALID_DATA_SIZE = 14; // Invalid size for the data of MT node
}
Code code = 1;
}
12 changes: 12 additions & 0 deletions sequencer/dbmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,13 @@ func (d *dbManager) storeProcessedTxAndDeleteFromPool() {
for {
txToStore := <-d.txsStore.Ch
d.checkIfReorg()

// Flush the state db
err := d.state.FlushMerkleTree(d.ctx)
if err != nil {
log.Fatalf("StoreProcessedTxAndDeleteFromPool. Error flushing state db: %v", err)
}

log.Debugf("Storing tx %v", txToStore.txResponse.TxHash)
dbTx, err := d.BeginStateTransaction(d.ctx)
if err != nil {
Expand Down Expand Up @@ -539,3 +546,8 @@ func (d *dbManager) GetLatestVirtualBatchTimestamp(ctx context.Context, dbTx pgx
func (d *dbManager) CountReorgs(ctx context.Context, dbTx pgx.Tx) (uint64, error) {
return d.state.CountReorgs(ctx, dbTx)
}

// FlushMerkleTree persists updates in the Merkle tree
func (d *dbManager) FlushMerkleTree(ctx context.Context) error {
return d.state.FlushMerkleTree(ctx)
}
4 changes: 2 additions & 2 deletions sequencer/finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ func (f *finalizer) processTransaction(ctx context.Context, tx *TxTracker) error
hash = tx.HashStr
}
log.Infof("processTransaction: single tx. Batch.BatchNumber: %d, BatchNumber: %d, OldStateRoot: %s, txHash: %s, GER: %s", f.batch.batchNumber, f.processRequest.BatchNumber, f.processRequest.OldStateRoot, hash, f.processRequest.GlobalExitRoot.String())
result, err := f.executor.ProcessBatch(ctx, f.processRequest, false)
result, err := f.executor.ProcessBatch(ctx, f.processRequest, true)
if err != nil {
log.Errorf("failed to process transaction, isClaim: %v, err: %s", tx.IsClaim, err)
return err
Expand Down Expand Up @@ -703,7 +703,7 @@ func (f *finalizer) reprocessFullBatch(ctx context.Context, batchNum uint64, exp
log.Infof("reprocessFullBatch: Tx position %d. TxHash: %s", i, tx.Hash())
}

result, err := f.executor.ProcessBatch(ctx, processRequest, true)
result, err := f.executor.ProcessBatch(ctx, processRequest, false)
if err != nil {
log.Errorf("failed to process batch, err: %s", err)
return nil, err
Expand Down
3 changes: 3 additions & 0 deletions sequencer/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ type stateInterface interface {
GetLatestVirtualBatchTimestamp(ctx context.Context, dbTx pgx.Tx) (time.Time, error)
CountReorgs(ctx context.Context, dbTx pgx.Tx) (uint64, error)
GetLatestGer(ctx context.Context, gerFinalityNumberOfBlocks uint64) (state.GlobalExitRoot, time.Time, error)
FlushMerkleTree(ctx context.Context) error
}

type workerInterface interface {
Expand Down Expand Up @@ -113,6 +114,7 @@ type dbManagerInterface interface {
UpdateTxStatus(ctx context.Context, hash common.Hash, newStatus pool.TxStatus, isWIP bool) error
GetLatestVirtualBatchTimestamp(ctx context.Context, dbTx pgx.Tx) (time.Time, error)
CountReorgs(ctx context.Context, dbTx pgx.Tx) (uint64, error)
FlushMerkleTree(ctx context.Context) error
}

type dbManagerStateInterface interface {
Expand Down Expand Up @@ -142,6 +144,7 @@ type dbManagerStateInterface interface {
GetLatestVirtualBatchTimestamp(ctx context.Context, dbTx pgx.Tx) (time.Time, error)
CountReorgs(ctx context.Context, dbTx pgx.Tx) (uint64, error)
GetLatestGer(ctx context.Context, gerFinalityNumberOfBlocks uint64) (state.GlobalExitRoot, time.Time, error)
FlushMerkleTree(ctx context.Context) error
}

type ethTxManager interface {
Expand Down
Loading

0 comments on commit 470216d

Please sign in to comment.