diff --git a/cmd/main.go b/cmd/main.go index 1926b918ff..f1203fe705 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -59,6 +59,12 @@ var ( Required: false, Value: cli.NewStringSlice(jsonrpc.APIEth, jsonrpc.APINet, jsonrpc.APIZKEVM, jsonrpc.APITxPool, jsonrpc.APIWeb3), } + migrationsFlag = cli.BoolFlag{ + Name: config.FlagMigrations, + Aliases: []string{"mig"}, + Usage: "Blocks the migrations in stateDB to not run them", + Required: false, + } ) func main() { @@ -83,7 +89,7 @@ func main() { Aliases: []string{}, Usage: "Run the zkevm-node", Action: start, - Flags: append(flags, &genesisFlag), + Flags: append(flags, &genesisFlag, &migrationsFlag), }, { Name: "approve", diff --git a/cmd/run.go b/cmd/run.go index f1add83737..9850e8124d 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -51,7 +51,18 @@ func start(cliCtx *cli.Context) error { if c.Metrics.Enabled { metrics.Init() } - runStateMigrations(c.StateDB) + components := cliCtx.StringSlice(config.FlagComponents) + + // Only runs migration if the component is the synchronizer and if the flag is deactivated + if !cliCtx.Bool(config.FlagMigrations) { + for _, comp := range components { + if comp == SYNCHRONIZER { + runStateMigrations(c.StateDB) + } + } + } + checkStateMigrations(c.StateDB) + stateSqlDB, err := db.NewSQLDB(c.StateDB) if err != nil { log.Fatal(err) @@ -81,8 +92,8 @@ func start(cliCtx *cli.Context) error { ethTxManager := ethtxmanager.New(c.EthTxManager, etherman, st) - for _, item := range cliCtx.StringSlice(config.FlagComponents) { - switch item { + for _, component := range components { + switch component { case AGGREGATOR: log.Info("Running aggregator") go runAggregator(ctx, c.Aggregator, etherman, ethTxManager, st) @@ -127,6 +138,13 @@ func runStateMigrations(c db.Config) { runMigrations(c, db.StateMigrationName) } +func checkStateMigrations(c db.Config) { + err := db.CheckMigrations(c, db.StateMigrationName) + if err != nil { + log.Fatal(err) + } +} + func runPoolMigrations(c db.Config) { runMigrations(c, db.PoolMigrationName) } diff --git a/config/config.go b/config/config.go index ad256c1d6c..1e042aac1b 100644 --- a/config/config.go +++ b/config/config.go @@ -39,6 +39,8 @@ const ( FlagComponents = "components" // FlagHTTPAPI is the flag for http.api. FlagHTTPAPI = "http.api" + // FlagMigrations is the flag for migrations. + FlagMigrations = "migrations" ) // Config represents the configuration of the entire Hermez Node diff --git a/db/db.go b/db/db.go index 32dc7917b2..011da89fda 100644 --- a/db/db.go +++ b/db/db.go @@ -17,6 +17,7 @@ const ( StateMigrationName = "zkevm-state-db" // PoolMigrationName is the name of the migration used by packr to pack the migration file PoolMigrationName = "zkevm-pool-db" + maxPlanMigration = 1000 ) var packrMigrations = map[string]*packr.Box{ @@ -47,6 +48,11 @@ func RunMigrationsUp(cfg Config, name string) error { return runMigrations(cfg, name, migrate.Up) } +// CheckMigrations runs migrate-up for the given config. +func CheckMigrations(cfg Config, name string) error { + return checkMigrations(cfg, name, migrate.Up) +} + // RunMigrationsDown runs migrate-down for the given config. func RunMigrationsDown(cfg Config, name string) error { return runMigrations(cfg, name, migrate.Down) @@ -76,3 +82,34 @@ func runMigrations(cfg Config, packrName string, direction migrate.MigrationDire log.Info("successfully ran ", nMigrations, " migrations") return nil } + +func checkMigrations(cfg Config, packrName string, direction migrate.MigrationDirection) error { + c, err := pgx.ParseConfig(fmt.Sprintf("postgres://%s:%s@%s:%s/%s", cfg.User, cfg.Password, cfg.Host, cfg.Port, cfg.Name)) + if err != nil { + return err + } + db := stdlib.OpenDB(*c) + + box, ok := packrMigrations[packrName] + if !ok { + return fmt.Errorf("packr box not found with name: %v", packrName) + } + + var migrations = &migrate.PackrMigrationSource{Box: box} + planMigrations, _, err := migrate.PlanMigration(db, "postgres", migrations, direction, maxPlanMigration) + if err != nil { + log.Error("error planning migrations. Error: ", err) + return err + } + nmigrations := len(planMigrations) + if nmigrations != 0 { + log.Errorf("error the component needs to run %d migrations before starting", nmigrations) + records, err := migrate.GetMigrationRecords(db, "postgres") + if err != nil { + log.Error("error getting migration records. Error: ", err) + return err + } + return fmt.Errorf("error the component needs to run %d migrations before starting. DB only contains %d migrations", nmigrations, len(records)) + } + return nil +} diff --git a/jsonrpc/interfaces.go b/jsonrpc/interfaces.go index 7dc4e3d64c..80c10934e6 100644 --- a/jsonrpc/interfaces.go +++ b/jsonrpc/interfaces.go @@ -32,6 +32,7 @@ type gasPriceEstimator interface { // stateInterface gathers the methods required to interact with the state. type stateInterface interface { + PrepareWebSocket() BeginStateTransaction(ctx context.Context) (pgx.Tx, error) DebugTransaction(ctx context.Context, transactionHash common.Hash, tracer string, dbTx pgx.Tx) (*runtime.ExecutionResult, error) EstimateGas(transaction *types.Transaction, senderAddress common.Address, l2BlockNumber *uint64, dbTx pgx.Tx) (uint64, error) diff --git a/jsonrpc/mock_state_test.go b/jsonrpc/mock_state_test.go index 94a9a00bbf..d9263bdd42 100644 --- a/jsonrpc/mock_state_test.go +++ b/jsonrpc/mock_state_test.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.15.0. DO NOT EDIT. +// Code generated by mockery v2.14.0. DO NOT EDIT. package jsonrpc @@ -604,6 +604,11 @@ func (_m *stateMock) IsL2BlockVirtualized(ctx context.Context, blockNumber int, return r0, r1 } +// PrepareWebSocket provides a mock function with given fields: +func (_m *stateMock) PrepareWebSocket() { + _m.Called() +} + // ProcessUnsignedTransaction provides a mock function with given fields: ctx, tx, senderAddress, l2BlockNumber, noZKEVMCounters, dbTx func (_m *stateMock) ProcessUnsignedTransaction(ctx context.Context, tx *types.Transaction, senderAddress common.Address, l2BlockNumber *uint64, noZKEVMCounters bool, dbTx pgx.Tx) *runtime.ExecutionResult { ret := _m.Called(ctx, tx, senderAddress, l2BlockNumber, noZKEVMCounters, dbTx) diff --git a/jsonrpc/server.go b/jsonrpc/server.go index e599e1574d..126946b34d 100644 --- a/jsonrpc/server.go +++ b/jsonrpc/server.go @@ -52,6 +52,7 @@ func NewServer( storage storageInterface, apis map[string]bool, ) *Server { + s.PrepareWebSocket() handler := newJSONRpcHandler() if _, ok := apis[APIEth]; ok { diff --git a/jsonrpc/server_test.go b/jsonrpc/server_test.go index 0517d99901..ee07664d58 100644 --- a/jsonrpc/server_test.go +++ b/jsonrpc/server_test.go @@ -49,6 +49,8 @@ func newMockedServer(t *testing.T, cfg Config) (*mockedServer, *mocks, *ethclien var newL2BlockEventHandler state.NewL2BlockEventHandler = func(e state.NewL2BlockEvent) {} st.On("RegisterNewL2BlockEventHandler", mock.IsType(newL2BlockEventHandler)).Once() + st.On("PrepareWebSocket").Once() + server := NewServer(cfg, pool, st, gasPriceEstimator, storage, apis) go func() { diff --git a/state/state.go b/state/state.go index b52f861894..0edf56d8d0 100644 --- a/state/state.go +++ b/state/state.go @@ -80,27 +80,29 @@ func NewState(cfg Config, storage *PostgresStorage, executorClient pb.ExecutorSe metrics.Register() }) - lastL2Block, err := storage.GetLastL2Block(context.Background(), nil) - if errors.Is(err, ErrStateNotSynchronized) { - lastL2Block = types.NewBlockWithHeader(&types.Header{Number: big.NewInt(0)}) - } else if err != nil { - log.Fatalf("failed to load the last l2 block: %v", err) - } - s := &State{ cfg: cfg, PostgresStorage: storage, executorClient: executorClient, tree: stateTree, - lastL2BlockSeen: *lastL2Block, newL2BlockEvents: make(chan NewL2BlockEvent), newL2BlockEventHandlers: []NewL2BlockEventHandler{}, } + return s +} + +// PrepareWebSocket allows the RPC to prepare ws +func (s *State) PrepareWebSocket() { + lastL2Block, err := s.PostgresStorage.GetLastL2Block(context.Background(), nil) + if errors.Is(err, ErrStateNotSynchronized) { + lastL2Block = types.NewBlockWithHeader(&types.Header{Number: big.NewInt(0)}) + } else if err != nil { + log.Fatalf("failed to load the last l2 block: %v", err) + } + s.lastL2BlockSeen = *lastL2Block go s.monitorNewL2Blocks() go s.handleEvents() - - return s } // BeginStateTransaction starts a state transaction diff --git a/test/Makefile b/test/Makefile index a39f51350c..2c4977b491 100644 --- a/test/Makefile +++ b/test/Makefile @@ -134,8 +134,9 @@ benchmark-sequencer: stop $(RUNPOOLDB) sleep 5 $(RUNZKPROVER) - $(RUNJSONRPC) $(RUNSYNC) + sleep 2 + $(RUNJSONRPC) docker ps -a docker logs $(DOCKERCOMPOSEZKPROVER) @ cd benchmarks/sequencer ; \ @@ -156,6 +157,7 @@ stop-db: ## Stops the node database .PHONY: run-node run-node: ## Runs the node $(RUNSYNC) + sleep 2 $(RUNSEQUENCER) $(RUNAGGREGATOR) $(RUNJSONRPC) @@ -249,6 +251,14 @@ run-broadcast: ## Runs the broadcast service stop-broadcast: ## Stops the broadcast service $(STOPBROADCAST) +.PHONY: run-aggregator +run-aggregator: ## Runs the aggregator service + $(RUNAGGREGATOR) + +.PHONY: stop-aggregator +stop-aggregator: ## Stops the aggregator service + $(STOPAGGREGATOR) + .PHONY: run-grafana run-grafana: ## Runs the grafana service $(RUNGRAFANA) @@ -283,10 +293,11 @@ run: ## Runs a full node sleep 1 $(RUNZKPROVER) sleep 3 + $(RUNSYNC) + sleep 2 $(RUNSEQUENCER) $(RUNAGGREGATOR) $(RUNJSONRPC) - $(RUNSYNC) .PHONY: stop stop: ## Stops all services