Skip to content

Commit

Permalink
fix various issues with migrations
Browse files Browse the repository at this point in the history
- Migration proposals are removed after a migration is approved
- Once migrations are approved, no more migration proposals or
  approvals are accepted
- Migration cant start during an ongoing migration
- delete pending resolutions at the start of the migration process
- sanity checks on the migration config
- make genesis file generation at the migration start height async
  to circumvent cometbft state access issues during replay.
- kwil-admin node status now show migration status of the node
- no more consensus failures upon the end of migration, instead just
  halt the network by stopping mining anymore transactions. This keeps
  the node's rpc services in a useful state even after restarts
- can resubmit the expired proposals
- genesis config is based on the old networks config with minimal
  changes such as chainID, appHash and genesisValidators and migrations
- migrations defaults are changed to 0 from -1
  • Loading branch information
charithabandi authored Sep 9, 2024
1 parent 2aee07c commit ba6e3ea
Show file tree
Hide file tree
Showing 36 changed files with 673 additions and 195 deletions.
33 changes: 20 additions & 13 deletions cmd/kwil-admin/cmds/migration/genesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/kwilteam/kwil-db/cmd/common/display"
"github.com/kwilteam/kwil-db/cmd/kwil-admin/cmds/common"
"github.com/kwilteam/kwil-db/common/chain"
"github.com/kwilteam/kwil-db/core/types"
"github.com/kwilteam/kwil-db/internal/statesync"
)

Expand Down Expand Up @@ -48,8 +49,14 @@ func genesisStateCmd() *cobra.Command {
return display.PrintErr(cmd, err)
}

if !metadata.InMigration || metadata.GenesisConfig == nil || metadata.SnapshotMetadata == nil {
return display.PrintCmd(cmd, &MigrationState{InMigration: false, StartHeight: metadata.StartHeight, EndHeight: metadata.EndHeight})
// If there is no active migration or if the migration has not started yet, return the migration state
// indicating that there is no genesis state to download.
if metadata.MigrationState.Status == types.NoActiveMigration ||
metadata.MigrationState.Status == types.MigrationNotStarted ||
metadata.GenesisConfig == nil || metadata.SnapshotMetadata == nil {
return display.PrintCmd(cmd, &MigrationState{
Info: metadata.MigrationState,
})
}

// ensure the root directory exists
Expand Down Expand Up @@ -106,9 +113,7 @@ func genesisStateCmd() *cobra.Command {

// Print the migration state
return display.PrintCmd(cmd, &MigrationState{
InMigration: metadata.InMigration,
StartHeight: metadata.StartHeight,
EndHeight: metadata.EndHeight,
Info: metadata.MigrationState,
GenesisFile: genesisFile,
Snapshot: snapshotFile,
})
Expand All @@ -122,16 +127,18 @@ func genesisStateCmd() *cobra.Command {
}

type MigrationState struct {
InMigration bool `json:"in_migration"`
StartHeight int64 `json:"start_height"`
EndHeight int64 `json:"end_height"`
GenesisFile string `json:"genesis_file"`
Snapshot string `json:"snapshot"`
Info types.MigrationState `json:"state"`
GenesisFile string `json:"genesis_file"`
Snapshot string `json:"snapshot"`
}

func (m *MigrationState) MarshalText() ([]byte, error) {
if !m.InMigration {
return []byte(fmt.Sprintf("No genesis state to download yet. Migration is set to start at block height: %d", m.StartHeight)), nil
if m.Info.Status == types.NoActiveMigration {
return []byte("No active migration found."), nil
}

if m.Info.Status == types.MigrationNotStarted {
return []byte(fmt.Sprintf("No genesis state to download yet. Migration is set to start at block height: %d", m.Info.StartHeight)), nil
}

if m.GenesisFile == "" {
Expand All @@ -147,7 +154,7 @@ func (m *MigrationState) MarshalText() ([]byte, error) {
"\tEnd Height: %d\n"+
"\tGenesis File: %s\n"+
"\tSnapshot File: %s\n",
m.StartHeight, m.EndHeight, m.GenesisFile, m.Snapshot)), nil
m.Info.StartHeight, m.Info.EndHeight, m.GenesisFile, m.Snapshot)), nil
}

func (m *MigrationState) MarshalJSON() ([]byte, error) {
Expand Down
1 change: 1 addition & 0 deletions cmd/kwil-admin/cmds/migration/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func (m *MigrationsList) MarshalText() ([]byte, error) {
msg.WriteString(fmt.Sprintf("\tactivationPeriod: %d\n", migration.ActivationPeriod))
msg.WriteString(fmt.Sprintf("\tmigrationDuration: %d\n", migration.Duration))
msg.WriteString(fmt.Sprintf("\tchainID: %s\n", migration.ChainID))
msg.WriteString(fmt.Sprintf("\ttimestamp: %s\n", migration.Timestamp))
}
return msg.Bytes(), nil
}
5 changes: 4 additions & 1 deletion cmd/kwil-admin/cmds/migration/propose.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package migration

import (
"errors"
"time"

"github.com/spf13/cobra"

"github.com/kwilteam/kwil-db/cmd/common/display"
"github.com/kwilteam/kwil-db/cmd/kwil-admin/cmds/common"
"github.com/kwilteam/kwil-db/internal/migrations"
"github.com/kwilteam/kwil-db/internal/voting"
)

var (
Expand Down Expand Up @@ -48,14 +50,15 @@ func proposeCmd() *cobra.Command {
ActivationPeriod: activationPeriod,
Duration: migrationDuration,
ChainID: chainID,
Timestamp: time.Now().String(),
}
proposalBts, err := proposal.MarshalBinary()
if err != nil {
return display.PrintErr(cmd, err)
}

// Submit a migration proposal
txHash, err := clt.CreateResolution(ctx, proposalBts, migrations.StartMigrationEventType)
txHash, err := clt.CreateResolution(ctx, proposalBts, voting.StartMigrationEventType)
if err != nil {
return display.PrintErr(cmd, err)
}
Expand Down
78 changes: 60 additions & 18 deletions cmd/kwil-admin/cmds/snapshot/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"github.com/kwilteam/kwil-db/cmd/common/display"
"github.com/kwilteam/kwil-db/cmd/kwil-admin/cmds/common"
"github.com/kwilteam/kwil-db/common/chain"
"github.com/kwilteam/kwil-db/internal/abci/meta"
"github.com/kwilteam/kwil-db/internal/sql/pg"

"github.com/spf13/cobra"
)
Expand Down Expand Up @@ -65,12 +67,12 @@ func createCmd() *cobra.Command {
return display.PrintErr(cmd, fmt.Errorf("failed to expand snapshot directory path: %v", err))
}

logs, err := pgDump(cmd.Context(), dbName, dbUser, dbPass, dbHost, dbPort, maxRowSize, snapshotDir)
height, logs, err := pgDump(cmd.Context(), dbName, dbUser, dbPass, dbHost, dbPort, maxRowSize, snapshotDir)
if err != nil {
return display.PrintErr(cmd, fmt.Errorf("failed to create database snapshot: %v", err))
}

r := &createSnapshotRes{Logs: logs}
r := &createSnapshotRes{Logs: logs, Height: height}
return display.PrintCmd(cmd, r)
},
}
Expand All @@ -87,30 +89,36 @@ func createCmd() *cobra.Command {
}

type createSnapshotRes struct {
Logs []string `json:"logs"`
Logs []string `json:"logs"`
Height int64 `json:"height"`
}

func (c *createSnapshotRes) MarshalJSON() ([]byte, error) {
return json.Marshal(c)
}

func (c *createSnapshotRes) MarshalText() (text []byte, err error) {
return []byte(fmt.Sprintf("Snapshot created successfully\n%s", strings.Join(c.Logs, "\n"))), nil
return []byte(fmt.Sprintf("Snapshot created successfully at height: %d \n%s", c.Height, strings.Join(c.Logs, "\n"))), nil
}

// PGDump uses pg_dump to create a snapshot of the database.
// It returns messages to log and an error if any.
func pgDump(ctx context.Context, dbName, dbUser, dbPass, dbHost, dbPort string, maxRowSize int, snapshotDir string) (logs []string, err error) {
func pgDump(ctx context.Context, dbName, dbUser, dbPass, dbHost, dbPort string, maxRowSize int, snapshotDir string) (height int64, logs []string, err error) {
// Get the chain height
height, err = chainHeight(ctx, dbName, dbUser, dbPass, dbHost, dbPort)
if err != nil {
return -1, nil, fmt.Errorf("failed to get chain height: %w", err)
}
// Check if the snapshot directory exists, if not create it
err = os.MkdirAll(snapshotDir, 0755)
if err != nil {
return nil, fmt.Errorf("failed to create snapshot directory: %w", err)
return -1, nil, fmt.Errorf("failed to create snapshot directory: %w", err)
}

dumpFile := filepath.Join(snapshotDir, "kwildb-snapshot.sql.gz")
outputFile, err := os.Create(dumpFile)
if err != nil {
return nil, fmt.Errorf("failed to create dump file: %w", err)
return -1, nil, fmt.Errorf("failed to create dump file: %w", err)
}
// delete the dump file if an error occurs anywhere during the snapshot process
defer func() {
Expand Down Expand Up @@ -166,12 +174,12 @@ func pgDump(ctx context.Context, dbName, dbUser, dbPass, dbHost, dbPort string,
var stderr bytes.Buffer
pgDumpOutput, err := pgDumpCmd.StdoutPipe()
if err != nil {
return nil, fmt.Errorf("failed to get stdout pipe: %w", err)
return -1, nil, fmt.Errorf("failed to get stdout pipe: %w", err)
}
pgDumpCmd.Stderr = &stderr

if err := pgDumpCmd.Start(); err != nil {
return nil, fmt.Errorf("failed to start pg_dump command: %w", err)
return -1, nil, fmt.Errorf("failed to start pg_dump command: %w", err)
}
defer pgDumpOutput.Close()

Expand Down Expand Up @@ -199,24 +207,24 @@ func pgDump(ctx context.Context, dbName, dbUser, dbPass, dbHost, dbPort string,
inVotersBlock = false
n, err := multiWriter.Write([]byte(line + "\n"))
if err != nil {
return nil, fmt.Errorf("failed to write to gzip writer: %w", err)
return -1, nil, fmt.Errorf("failed to write to gzip writer: %w", err)
}
totalBytes += int64(n)
continue
}

strs := strings.Split(line, "\t")
if len(strs) != 3 {
return nil, fmt.Errorf("invalid voter line: %s", line)
return -1, nil, fmt.Errorf("invalid voter line: %s", line)
}
voterID, err := hex.DecodeString(strs[1][3:]) // Remove the leading \\x
if err != nil {
return nil, fmt.Errorf("failed to decode voter ID: %w", err)
return -1, nil, fmt.Errorf("failed to decode voter ID: %w", err)
}

power, err := strconv.ParseInt(strs[2], 10, 64)
if err != nil {
return nil, fmt.Errorf("failed to parse power: %w", err)
return -1, nil, fmt.Errorf("failed to parse power: %w", err)
}

genCfg.Validators = append(genCfg.Validators, &chain.GenesisValidator{
Expand All @@ -243,32 +251,66 @@ func pgDump(ctx context.Context, dbName, dbUser, dbPass, dbHost, dbPort string,
// Write the sanitized line to the gzip writer
n, err := multiWriter.Write([]byte(line + "\n"))
if err != nil {
return nil, fmt.Errorf("failed to write to gzip writer: %w", err)
return -1, nil, fmt.Errorf("failed to write to gzip writer: %w", err)
}
totalBytes += int64(n)
}
}
}

if err := scanner.Err(); err != nil {
return nil, fmt.Errorf("failed to scan pg_dump output: %w", err)
return -1, nil, fmt.Errorf("failed to scan pg_dump output: %w", err)
}

// Close the writer when pg_dump completes to signal EOF to sed
if err := pgDumpCmd.Wait(); err != nil {
return nil, errors.New(stderr.String())
return -1, nil, errors.New(stderr.String())
}

// Append the below sql statement to the dump file to adjust the expiration times of the resolutions
// This is to ensure that the resolutions are correctly expired on the new network
n, err := multiWriter.Write([]byte("UPDATE kwild_voting.resolutions SET expiration = expiration-" + strconv.FormatInt(height, 10) + ";\n"))
if err != nil {
return -1, nil, fmt.Errorf("failed to write resolution updates to gzip writer: %w", err)
}

totalBytes += int64(n)

gzipWriter.Flush()
hash := hasher.Sum(nil)
genCfg.DataAppHash = hash

// Write the genesis config to a file
genesisFile := filepath.Join(snapshotDir, "genesis.json")
if err := genCfg.SaveAs(genesisFile); err != nil {
return nil, fmt.Errorf("failed to save genesis config: %w", err)
return -1, nil, fmt.Errorf("failed to save genesis config: %w", err)
}

return []string{fmt.Sprintf("Snapshot created at: %s, Total bytes written: %d", dumpFile, totalBytes),
return height, []string{fmt.Sprintf("Snapshot created at: %s, Total bytes written: %d", dumpFile, totalBytes),
fmt.Sprintf("Genesis config created at: %s, Genesis hash: %s", genesisFile, fmt.Sprintf("%x", hash))}, nil
}

func chainHeight(ctx context.Context, dbName, dbUser, dbPass, dbHost, dbPort string) (int64, error) {
cfg := &pg.PoolConfig{
ConnConfig: pg.ConnConfig{
Host: dbHost,
Port: dbPort,
User: dbUser,
Pass: dbPass,
DBName: dbName,
},
MaxConns: 2,
}
pool, err := pg.NewPool(ctx, cfg)
if err != nil {
return 0, fmt.Errorf("failed to create pool: %w", err)
}
defer pool.Close()

height, _, err := meta.GetChainState(ctx, pool)
if err != nil {
return 0, fmt.Errorf("failed to get chain state: %w", err)
}

return height, nil
}
11 changes: 9 additions & 2 deletions cmd/kwil-admin/nodecfg/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,16 @@ func (genCfg *NodeGenerateConfig) ApplyGenesisParams(genesisCfg *chain.GenesisCo
if genCfg.ChainID != "" {
genesisCfg.ChainID = genCfg.ChainID
}
genesisCfg.ConsensusParams.Validator.JoinExpiry = genCfg.JoinExpiry

if genCfg.JoinExpiry > 0 {
genesisCfg.ConsensusParams.Validator.JoinExpiry = genCfg.JoinExpiry
}

genesisCfg.ConsensusParams.WithoutGasCosts = genCfg.WithoutGasCosts
genesisCfg.ConsensusParams.Votes.VoteExpiry = genCfg.VoteExpiry

if genCfg.VoteExpiry > 0 {
genesisCfg.ConsensusParams.Votes.VoteExpiry = genCfg.VoteExpiry
}

numAllocs := len(genCfg.Allocs)
if !genCfg.WithoutGasCosts { // when gas is enabled, give genesis validators some funds
Expand Down
4 changes: 4 additions & 0 deletions cmd/kwild/root/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ func RootCmd() *cobra.Command {
return fmt.Errorf("failed to initialize private key and genesis: %w", err)
}

if err := genesisConfig.SanityChecks(); err != nil {
return fmt.Errorf("genesis configuration failed sanity checks: %w", err)
}

if err := kwildCfg.ConfigureExtensions(genesisConfig); err != nil {
return err
}
Expand Down
24 changes: 21 additions & 3 deletions cmd/kwild/server/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,9 @@ func buildServer(d *coreDependencies, closers *closeFuncs) *Server {
// Give abci p2p module access to removing peers
p2p.SetRemovePeerFn(cometBftNode.RemovePeer)

// Give migrator access to the consensus params getter
migrator.SetConsensusParamsGetter(cometBftNode.ConsensusParams)

cometBftClient := buildCometBftClient(cometBftNode)
wrappedCmtClient := &wrappedCometBFTClient{
cl: cometBftClient,
Expand Down Expand Up @@ -578,14 +581,28 @@ func buildSigner(d *coreDependencies) *auth.Ed25519Signer {
func buildDB(d *coreDependencies, closer *closeFuncs) *pg.DB {
// Check if the database is supposed to be restored from the snapshot
// If yes, restore the database from the snapshot
restoreDB(d)
fromSnapshot := restoreDB(d)

db, err := d.dbOpener(d.ctx, d.cfg.AppConfig.DBName, 24)
if err != nil {
failBuild(err, "kwild database open failed")
}
closer.addCloser(db.Close, "closing main DB")

if fromSnapshot {
// readjust the expiry heights of all the pending resolutions after snapshot restore for Zero-downtime migrations
// snapshot tool handles the migration expiry height readjustment for offline migrations
adjustExpiration := false
startHeight := d.genesisCfg.ConsensusParams.Migration.StartHeight
if d.cfg.AppConfig.MigrateFrom != "" && startHeight != 0 {
adjustExpiration = true
}

err = migrations.CleanupResolutionsAfterMigration(d.ctx, db, adjustExpiration, startHeight)
if err != nil {
failBuild(err, "failed to cleanup resolutions after snapshot restore")
}
}
return db
}

Expand All @@ -598,9 +615,9 @@ func buildDB(d *coreDependencies, closer *closeFuncs) *pg.DB {
// - If the genesis apphash is not specified
// - If statesync is enabled. Statesync will take care of syncing the database
// to the network state using statesync snapshots.
func restoreDB(d *coreDependencies) {
func restoreDB(d *coreDependencies) bool {
if d.cfg.ChainConfig.StateSync.Enable || len(d.genesisCfg.DataAppHash) == 0 || isDbInitialized(d) {
return
return false
}

genCfg := d.genesisCfg
Expand Down Expand Up @@ -640,6 +657,7 @@ func restoreDB(d *coreDependencies) {
if err != nil {
failBuild(err, "failed to restore DB from snapshot")
}
return true
}

// isDbInitialized checks if the database is already initialized.
Expand Down
Loading

0 comments on commit ba6e3ea

Please sign in to comment.