Skip to content

Commit

Permalink
Feature/#1515 migration (#1531)
Browse files Browse the repository at this point in the history
* rpc and broadcast without migrations

* mock

* linter

* fix unit test

* migration flag for synchronizer and checkMigrations

* run/stop aggregator command
  • Loading branch information
ARR552 authored Jan 16, 2023
1 parent aea6dfd commit a8f6318
Show file tree
Hide file tree
Showing 10 changed files with 102 additions and 17 deletions.
8 changes: 7 additions & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@ var (
Required: false,
Value: cli.NewStringSlice(jsonrpc.APIEth, jsonrpc.APINet, jsonrpc.APIZKEVM, jsonrpc.APITxPool, jsonrpc.APIWeb3),
}
migrationsFlag = cli.BoolFlag{
Name: config.FlagMigrations,
Aliases: []string{"mig"},
Usage: "Blocks the migrations in stateDB to not run them",
Required: false,
}
)

func main() {
Expand All @@ -83,7 +89,7 @@ func main() {
Aliases: []string{},
Usage: "Run the zkevm-node",
Action: start,
Flags: append(flags, &genesisFlag),
Flags: append(flags, &genesisFlag, &migrationsFlag),
},
{
Name: "approve",
Expand Down
24 changes: 21 additions & 3 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,18 @@ func start(cliCtx *cli.Context) error {
if c.Metrics.Enabled {
metrics.Init()
}
runStateMigrations(c.StateDB)
components := cliCtx.StringSlice(config.FlagComponents)

// Only runs migration if the component is the synchronizer and if the flag is deactivated
if !cliCtx.Bool(config.FlagMigrations) {
for _, comp := range components {
if comp == SYNCHRONIZER {
runStateMigrations(c.StateDB)
}
}
}
checkStateMigrations(c.StateDB)

stateSqlDB, err := db.NewSQLDB(c.StateDB)
if err != nil {
log.Fatal(err)
Expand Down Expand Up @@ -81,8 +92,8 @@ func start(cliCtx *cli.Context) error {

ethTxManager := ethtxmanager.New(c.EthTxManager, etherman, st)

for _, item := range cliCtx.StringSlice(config.FlagComponents) {
switch item {
for _, component := range components {
switch component {
case AGGREGATOR:
log.Info("Running aggregator")
go runAggregator(ctx, c.Aggregator, etherman, ethTxManager, st)
Expand Down Expand Up @@ -127,6 +138,13 @@ func runStateMigrations(c db.Config) {
runMigrations(c, db.StateMigrationName)
}

func checkStateMigrations(c db.Config) {
err := db.CheckMigrations(c, db.StateMigrationName)
if err != nil {
log.Fatal(err)
}
}

func runPoolMigrations(c db.Config) {
runMigrations(c, db.PoolMigrationName)
}
Expand Down
2 changes: 2 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ const (
FlagComponents = "components"
// FlagHTTPAPI is the flag for http.api.
FlagHTTPAPI = "http.api"
// FlagMigrations is the flag for migrations.
FlagMigrations = "migrations"
)

// Config represents the configuration of the entire Hermez Node
Expand Down
37 changes: 37 additions & 0 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const (
StateMigrationName = "zkevm-state-db"
// PoolMigrationName is the name of the migration used by packr to pack the migration file
PoolMigrationName = "zkevm-pool-db"
maxPlanMigration = 1000
)

var packrMigrations = map[string]*packr.Box{
Expand Down Expand Up @@ -47,6 +48,11 @@ func RunMigrationsUp(cfg Config, name string) error {
return runMigrations(cfg, name, migrate.Up)
}

// CheckMigrations runs migrate-up for the given config.
func CheckMigrations(cfg Config, name string) error {
return checkMigrations(cfg, name, migrate.Up)
}

// RunMigrationsDown runs migrate-down for the given config.
func RunMigrationsDown(cfg Config, name string) error {
return runMigrations(cfg, name, migrate.Down)
Expand Down Expand Up @@ -76,3 +82,34 @@ func runMigrations(cfg Config, packrName string, direction migrate.MigrationDire
log.Info("successfully ran ", nMigrations, " migrations")
return nil
}

func checkMigrations(cfg Config, packrName string, direction migrate.MigrationDirection) error {
c, err := pgx.ParseConfig(fmt.Sprintf("postgres://%s:%s@%s:%s/%s", cfg.User, cfg.Password, cfg.Host, cfg.Port, cfg.Name))
if err != nil {
return err
}
db := stdlib.OpenDB(*c)

box, ok := packrMigrations[packrName]
if !ok {
return fmt.Errorf("packr box not found with name: %v", packrName)
}

var migrations = &migrate.PackrMigrationSource{Box: box}
planMigrations, _, err := migrate.PlanMigration(db, "postgres", migrations, direction, maxPlanMigration)
if err != nil {
log.Error("error planning migrations. Error: ", err)
return err
}
nmigrations := len(planMigrations)
if nmigrations != 0 {
log.Errorf("error the component needs to run %d migrations before starting", nmigrations)
records, err := migrate.GetMigrationRecords(db, "postgres")
if err != nil {
log.Error("error getting migration records. Error: ", err)
return err
}
return fmt.Errorf("error the component needs to run %d migrations before starting. DB only contains %d migrations", nmigrations, len(records))
}
return nil
}
1 change: 1 addition & 0 deletions jsonrpc/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type gasPriceEstimator interface {

// stateInterface gathers the methods required to interact with the state.
type stateInterface interface {
PrepareWebSocket()
BeginStateTransaction(ctx context.Context) (pgx.Tx, error)
DebugTransaction(ctx context.Context, transactionHash common.Hash, tracer string, dbTx pgx.Tx) (*runtime.ExecutionResult, error)
EstimateGas(transaction *types.Transaction, senderAddress common.Address, l2BlockNumber *uint64, dbTx pgx.Tx) (uint64, error)
Expand Down
7 changes: 6 additions & 1 deletion jsonrpc/mock_state_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions jsonrpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func NewServer(
storage storageInterface,
apis map[string]bool,
) *Server {
s.PrepareWebSocket()
handler := newJSONRpcHandler()

if _, ok := apis[APIEth]; ok {
Expand Down
2 changes: 2 additions & 0 deletions jsonrpc/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ func newMockedServer(t *testing.T, cfg Config) (*mockedServer, *mocks, *ethclien
var newL2BlockEventHandler state.NewL2BlockEventHandler = func(e state.NewL2BlockEvent) {}
st.On("RegisterNewL2BlockEventHandler", mock.IsType(newL2BlockEventHandler)).Once()

st.On("PrepareWebSocket").Once()

server := NewServer(cfg, pool, st, gasPriceEstimator, storage, apis)

go func() {
Expand Down
22 changes: 12 additions & 10 deletions state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,27 +80,29 @@ func NewState(cfg Config, storage *PostgresStorage, executorClient pb.ExecutorSe
metrics.Register()
})

lastL2Block, err := storage.GetLastL2Block(context.Background(), nil)
if errors.Is(err, ErrStateNotSynchronized) {
lastL2Block = types.NewBlockWithHeader(&types.Header{Number: big.NewInt(0)})
} else if err != nil {
log.Fatalf("failed to load the last l2 block: %v", err)
}

s := &State{
cfg: cfg,
PostgresStorage: storage,
executorClient: executorClient,
tree: stateTree,
lastL2BlockSeen: *lastL2Block,
newL2BlockEvents: make(chan NewL2BlockEvent),
newL2BlockEventHandlers: []NewL2BlockEventHandler{},
}

return s
}

// PrepareWebSocket allows the RPC to prepare ws
func (s *State) PrepareWebSocket() {
lastL2Block, err := s.PostgresStorage.GetLastL2Block(context.Background(), nil)
if errors.Is(err, ErrStateNotSynchronized) {
lastL2Block = types.NewBlockWithHeader(&types.Header{Number: big.NewInt(0)})
} else if err != nil {
log.Fatalf("failed to load the last l2 block: %v", err)
}
s.lastL2BlockSeen = *lastL2Block
go s.monitorNewL2Blocks()
go s.handleEvents()

return s
}

// BeginStateTransaction starts a state transaction
Expand Down
15 changes: 13 additions & 2 deletions test/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,9 @@ benchmark-sequencer: stop
$(RUNPOOLDB)
sleep 5
$(RUNZKPROVER)
$(RUNJSONRPC)
$(RUNSYNC)
sleep 2
$(RUNJSONRPC)
docker ps -a
docker logs $(DOCKERCOMPOSEZKPROVER)
@ cd benchmarks/sequencer ; \
Expand All @@ -156,6 +157,7 @@ stop-db: ## Stops the node database
.PHONY: run-node
run-node: ## Runs the node
$(RUNSYNC)
sleep 2
$(RUNSEQUENCER)
$(RUNAGGREGATOR)
$(RUNJSONRPC)
Expand Down Expand Up @@ -249,6 +251,14 @@ run-broadcast: ## Runs the broadcast service
stop-broadcast: ## Stops the broadcast service
$(STOPBROADCAST)

.PHONY: run-aggregator
run-aggregator: ## Runs the aggregator service
$(RUNAGGREGATOR)

.PHONY: stop-aggregator
stop-aggregator: ## Stops the aggregator service
$(STOPAGGREGATOR)

.PHONY: run-grafana
run-grafana: ## Runs the grafana service
$(RUNGRAFANA)
Expand Down Expand Up @@ -283,10 +293,11 @@ run: ## Runs a full node
sleep 1
$(RUNZKPROVER)
sleep 3
$(RUNSYNC)
sleep 2
$(RUNSEQUENCER)
$(RUNAGGREGATOR)
$(RUNJSONRPC)
$(RUNSYNC)

.PHONY: stop
stop: ## Stops all services
Expand Down

0 comments on commit a8f6318

Please sign in to comment.