From 101c0c8a60d54de7fc1e197361e8112716c3c463 Mon Sep 17 00:00:00 2001 From: Charitha Bandi <45089429+charithabandi@users.noreply.github.com> Date: Wed, 27 Sep 2023 17:56:12 -0500 Subject: [PATCH] Validator schema upgrade (#318) Validator schema upgrade update tests Decouple validatorDBHash from sql notions fix docs --- internal/app/kwild/server/build.go | 6 +- pkg/validators/mgr.go | 98 +++++++++++- pkg/validators/sql.go | 70 ++++++++- pkg/validators/store.go | 4 +- pkg/validators/test_data/version0.sqlite | Bin 0 -> 16384 bytes pkg/validators/test_data/version1.sqlite | Bin 0 -> 20480 bytes pkg/validators/test_data/version2.sqlite | Bin 0 -> 20480 bytes pkg/validators/upgrade.go | 113 ++++++++++++++ pkg/validators/upgrade_test.go | 185 +++++++++++++++++++++++ 9 files changed, 463 insertions(+), 13 deletions(-) create mode 100644 pkg/validators/test_data/version0.sqlite create mode 100644 pkg/validators/test_data/version1.sqlite create mode 100644 pkg/validators/test_data/version2.sqlite create mode 100644 pkg/validators/upgrade.go create mode 100644 pkg/validators/upgrade_test.go diff --git a/internal/app/kwild/server/build.go b/internal/app/kwild/server/build.go index 67bbf4f55..bd880685a 100644 --- a/internal/app/kwild/server/build.go +++ b/internal/app/kwild/server/build.go @@ -251,11 +251,6 @@ func buildValidatorManager(d *coreDependencies, closer *closeFuncs, ac *sessions } closer.addCloser(db.Close) - err = registerSQL(d.ctx, ac, db, "validator_db", d.log) - if err != nil { - failBuild(err, "failed to register validator db") - } - joinExpiry := d.cfg.GenesisConfig.ConsensusParams.Validator.JoinExpiry v, err := vmgr.NewValidatorMgr(d.ctx, db, vmgr.WithLogger(*d.log.Named("validatorStore")), @@ -265,6 +260,7 @@ func buildValidatorManager(d *coreDependencies, closer *closeFuncs, ac *sessions failBuild(err, "failed to build validator store") } + ac.Register(d.ctx, "validator_db", v.Committable()) return v } diff --git a/pkg/validators/mgr.go b/pkg/validators/mgr.go index 3f242dbdc..67c78b654 100644 --- a/pkg/validators/mgr.go +++ b/pkg/validators/mgr.go @@ -2,10 +2,16 @@ package validators import ( "context" + "crypto/sha256" "errors" "fmt" + "io" + "sort" "github.com/kwilteam/kwil-db/pkg/log" + "github.com/kwilteam/kwil-db/pkg/sessions" + sqlSessions "github.com/kwilteam/kwil-db/pkg/sessions/sql-session" + "github.com/kwilteam/kwil-db/pkg/sql" ) type joinReq struct { @@ -62,6 +68,9 @@ type ValidatorMgr struct { // opts joinExpiry int64 log log.Logger + + // session params + cm *validatorDbCommittable } // NOTE: The SQLite validator/approval store is local and transparent to the @@ -81,6 +90,79 @@ type ValidatorStore interface { AddValidator(ctx context.Context, joiner []byte, power int64) error } +type ValidatorDbSession interface { + Datastore + + ApplyChangeset(reader io.Reader) error + CreateSession() (sql.Session, error) + Savepoint() (sql.Savepoint, error) + CheckpointWal() error + EnableForeignKey() error + DisableForeignKey() error +} + +type validatorDbCommittable struct { + sessions.Committable + + validatorDbHash func() []byte +} + +// ID overrides the base Committable. We do this so that we can have the +// persistence of the state be part of the 2pc process, but have the ID reflect +// the actual state free from SQL specifics. +func (c *validatorDbCommittable) ID(ctx context.Context) ([]byte, error) { + return c.validatorDbHash(), nil +} + +func (vm *ValidatorMgr) Committable() sessions.Committable { + return vm.cm +} + +// ValidatorDB state includes: +// - current validators +// - active join requests +// - approvers for each join request +func (vm *ValidatorMgr) validatorDbHash() []byte { + hasher := sha256.New() + + // current validators val1:val2:... + var currentValidators []string + for val := range vm.current { + currentValidators = append(currentValidators, val) + } + sort.Strings(currentValidators) + for _, val := range currentValidators { + hasher.Write([]byte(val + ":")) + } + + // active join requests & approvals + // joinerPubkey:power:expiresAt:approver1:approver2:... + var joiners []string + for val := range vm.candidates { + joiners = append(joiners, val) + } + sort.Strings(joiners) + + for _, joiner := range joiners { + jr := vm.candidates[joiner] + jrStr := fmt.Sprintf("%s:%d:%d", joiner, jr.power, jr.expiresAt) + hasher.Write([]byte(jrStr)) + + var approvers []string + for val, approved := range jr.validators { + if approved { + approvers = append(approvers, val) + } + } + sort.Strings(approvers) + for _, approver := range approvers { + hasher.Write([]byte(":" + approver)) + } + hasher.Write([]byte(";")) + } + return hasher.Sum(nil) +} + func (vm *ValidatorMgr) isCurrent(val []byte) bool { _, have := vm.current[string(val)] return have @@ -90,7 +172,7 @@ func (vm *ValidatorMgr) candidate(val []byte) *joinReq { return vm.candidates[string(val)] } -func NewValidatorMgr(ctx context.Context, datastore Datastore, opts ...ValidatorMgrOpt) (*ValidatorMgr, error) { +func NewValidatorMgr(ctx context.Context, datastore ValidatorDbSession, opts ...ValidatorMgrOpt) (*ValidatorMgr, error) { vm := &ValidatorMgr{ current: make(map[string]struct{}), candidates: make(map[string]*joinReq), @@ -101,6 +183,13 @@ func NewValidatorMgr(ctx context.Context, datastore Datastore, opts ...Validator opt(vm) } + vm.cm = &validatorDbCommittable{ + Committable: sqlSessions.NewSqlCommitable(datastore, + sqlSessions.WithLogger(*vm.log.Named("validator-committable")), + ), + validatorDbHash: vm.validatorDbHash, + } + var err error vm.db, err = newValidatorStore(ctx, datastore, vm.log) if err != nil { @@ -282,8 +371,7 @@ func (vm *ValidatorMgr) Finalize(ctx context.Context) []*Validator { // Updates for approved (joining) validators. for candidate, join := range vm.candidates { if join.votes() < join.requiredVotes() { - - if vm.lastBlockHeight >= join.expiresAt { + if isJoinExpired(join, vm.lastBlockHeight) { // Join request expired delete(vm.candidates, candidate) if err := vm.db.DeleteJoinRequest(ctx, join.pubkey); err != nil { @@ -331,3 +419,7 @@ func (vm *ValidatorMgr) Finalize(ctx context.Context) []*Validator { func (mgr *ValidatorMgr) UpdateBlockHeight(height int64) { mgr.lastBlockHeight = height } + +func isJoinExpired(join *joinReq, blockHeight int64) bool { + return join.expiresAt != -1 && blockHeight >= join.expiresAt +} diff --git a/pkg/validators/sql.go b/pkg/validators/sql.go index 8ee7cbabc..3fb170c17 100644 --- a/pkg/validators/sql.go +++ b/pkg/validators/sql.go @@ -24,7 +24,7 @@ const ( CREATE TABLE IF NOT EXISTS validators ( pubkey BLOB PRIMARY KEY, power INTEGER - ) WITHOUT ROWID, STRICT; + ) WITHOUT ROWID, STRICT; CREATE TABLE IF NOT EXISTS join_reqs ( candidate BLOB PRIMARY KEY, @@ -79,8 +79,69 @@ const ( sqlAddToJoinBoard = `INSERT INTO joins_board (candidate, validator, approval) VALUES ($candidate, $validator, $approval)` + + sqlInitVersionTable = `CREATE TABLE IF NOT EXISTS schema_version ( + version INT NOT NULL + );` // Do we still need WITHOUT ROWID and STRICT? It's just a single row table + + sqlSetVersion = "INSERT INTO schema_version (version) VALUES ($version);" + + //sqlUpdateVersion = "UPDATE schema_version SET version = $version;" + + sqlGetVersion = "SELECT version FROM schema_version;" + + valStoreVersion = 1 +) + +// Migration actions. +const ( + sqlAddJoinExpiry string = `ALTER TABLE join_reqs ADD COLUMN expiresAt INTEGER DEFAULT -1;` ) +// Schema version table. +func (vs *validatorStore) initSchemaVersion(ctx context.Context) error { + if err := vs.db.Execute(ctx, sqlInitVersionTable, nil); err != nil { + return fmt.Errorf("failed to initialize schema version table: %w", err) + } + + err := vs.db.Execute(ctx, sqlSetVersion, map[string]any{ + "$version": valStoreVersion, + }) + if err != nil { + return fmt.Errorf("failed to set schema version: %w", err) + } + return nil +} + +// func (vs *validatorStore) updateCurrentVersion(ctx context.Context, version int) error { +// err := vs.db.Execute(ctx, sqlUpdateVersion, map[string]any{ +// "$version": version, +// }) +// if err != nil { +// return fmt.Errorf("failed to update schema version: %w", err) +// } +// return nil +// } + +func (vs *validatorStore) currentVersion(ctx context.Context) (int, error) { + results, err := vs.db.Query(ctx, sqlGetVersion, nil) + if err != nil { + return 0, err + } + if len(results) == 0 { + return 0, nil + } + vi, ok := results[0]["version"] + if !ok { + return 0, errors.New("no version in schema_version record") + } + version, ok := vi.(int64) + if !ok { + return 0, fmt.Errorf("invalid version value (%T)", vi) + } + return int(version), nil +} + // -- CREATE TABLE IF NOT EXISTS validator_approvals ( // -- validator_id INTEGER REFERENCES validators (id) ON DELETE CASCADE, // -- approval_id INTEGER REFERENCES approvals (id) ON DELETE CASCADE, @@ -101,12 +162,15 @@ func (vs *validatorStore) initTables(ctx context.Context) error { inits := getTableInits() for _, init := range inits { - err := vs.db.Execute(ctx, init, nil) - if err != nil { + if err := vs.db.Execute(ctx, init, nil); err != nil { return fmt.Errorf("failed to initialize tables: %w", err) } } + if err := vs.initSchemaVersion(ctx); err != nil { + return err + } + return nil } diff --git a/pkg/validators/store.go b/pkg/validators/store.go index 1c0486ea3..c429c9501 100644 --- a/pkg/validators/store.go +++ b/pkg/validators/store.go @@ -34,9 +34,9 @@ func newValidatorStore(ctx context.Context, datastore Datastore, log log.Logger) log: log, } - err := ar.initTables(ctx) + err := ar.initOrUpgradeDatabase(ctx) if err != nil { - return nil, fmt.Errorf("failed to initialize tables: %w", err) + return nil, fmt.Errorf("failed to initialize database at version %d due to error: %w", valStoreVersion, err) } // err = ar.prepareStatements() diff --git a/pkg/validators/test_data/version0.sqlite b/pkg/validators/test_data/version0.sqlite new file mode 100644 index 0000000000000000000000000000000000000000..e8dcec182a1b53381fccae7e21d91c3b8e73d74c GIT binary patch literal 16384 zcmeI%Pe>F|90%~3T|?K<2|?E``4fZOSPk)%X1VT8H8r_4J7tEnjqXUBy0fj5+d^e# z-HL+j&?)H<1kov^@*oi%)k7dbLUigB1<_vwdo$+lhT0Z%S@=G7n78l!_`TnJb}m0Q zbV!bxI`tZf0nMaFZUg6VaIHif$2r(lz^>)tWRvXg{PM;h?hdZLhc96P&XOmbFkJeT z-7p{k0SG_<0uX=z1Rwwb2teRp1THl@3*9v}{N)x?>+aW28PV8axZBVYk*sNfUlDz( zNUE<*7Ab3kYKvX29xWD$Ml{y`waLLYQpDY&BDVX*kR2IL=%)v%HXY=lU^@jwS!DVA zzL4J+5WPfobrds9ipI!1snd|wZzVJmgJf4lDcxiTrI%M6l-^+{#m{v2>8aI@VJWnD zJYleClG;^qkEnRr+CfD+;8Qwjzu1{or#DlDr`YA84oTe`Jgkxu?2rOp3aN_ZS4Y-4 z3f)yz{77e7&wgHu$euV?$o=wI>6TT7RaM+b>WOetihw2vHr{Uo$uW} zXXlbu%!dls%n`11ZnSQ6eRIj%O~&FO=^39=X@J@q`s~Y<$P;R1&G_aDQUzyz5=Lr}E?GrT)Kx6DEZQVJJ_*P&Nc0009U<00Izz00bZa0SG|g ee+vW(WVt9;cb@BCcRNY0_WiGtoi>vqMJ?ykG*)Q3_> zgw|70PX!Y6QV(VH81~R>4`zQzDCo~i7?l-?gcaRw?jL6BrH94$a4$PM=eP6wozGCPWs8LBZrmums~@)n96 zuXs~_P3xdR00Izz00bZa0SG_<0uU%VfuP=CGMnj1ohtSXO3}c8G$i_Ck`fKc;rw?c zHV!B9}tPI_&;tBrWLcFw~&ZJd`RBK?YVB1-BqL1yA|5Dj;9_{X1|$^ZiC#qhn6=(lG;vA8HuVQ@`$APN5rry1#_#FMk66bidxn5;HJfm z(*{#@H9hG_8fJ;IJtnWZ?0E9ArW`k~Et`Kcoign^N3$;|?Lvb91Rwwb2tWV=5P$## zAOHafK%fW(YIMADzIAzZrQ-0ZM{DB~^%J~$#AVREdNy-=cggDg(yEi&gGz*6-LCsAq0`e0#l?cE?la|0arUV(+rCA_7CH z5P$##AOHafKmY;|fB*y_0D*rgVAu0}X`zF9p-zEB;ZU()RxYLbJcmD9oP>lh)Tn{jmw@}x5P$##AOHafKmY;|fI!g+1Pw;B#X?W9n$$leM+1ZMu+$fm)o4fw=f5kl zdwHA46VcWt@FahZ)Rmb`>z^V{m&kYUUgC0##ML7RWhP7f0BbbY)X=97Ces{KLg8p% zzapu@oUS1+LC!>S^?(!(hJuoQ(rXjkZN$sB^IqO%=Y1p**{8}UqNFYpWFc-Baqt4K z7i_or>^28)C8VK&gcXg1!bBUCNlY3_EU2halB|l1%UWU^>ER8DGBbLn$VmTjdHi>2 z^d+Q7L{;?YG=)_k>-IXkY~Ehd$@k{eY0Xw)(Vd9jDIRk7h{WsmI~`Wy6TMEmIJKQI znyaelc68)OK>pNK_S3MrE~cR0?ZyFt=KHED}=Xs7*@`Zd%*~ zZ8TR`(^Jl*VU{S{WAduYjwc^$%5n4BviUdDDbv4mH20FyFEj{100Izz00bZa0SG_< z0uX=z1d33gh80Znt;?$`6^BnfUYnS#pA@tsZX^5p`Rwi8C94lgt9DX}^FPZ?Q`|JS z$jueuCQ&j3AOHafKmY;|fB*y_009UFLVGxAULI?Fizq%(EDRheet3?VO@e3`I_-{VVtx@`g f1_1~_00Izz00bZa0SG_<0uX?}|0lrG49)xkm`0Pd literal 0 HcmV?d00001 diff --git a/pkg/validators/upgrade.go b/pkg/validators/upgrade.go new file mode 100644 index 000000000..a21a683c8 --- /dev/null +++ b/pkg/validators/upgrade.go @@ -0,0 +1,113 @@ +package validators + +import ( + "context" + "fmt" + + "go.uber.org/zap" +) + +type upgradeAction int + +const ( + upgradeActionNone upgradeAction = iota + upgradeActionRunMigrations +) + +func upgradeActionString(action upgradeAction) string { + switch action { + case upgradeActionNone: + return "none" + case upgradeActionRunMigrations: + return "run migrations" + default: + return "unknown" + } +} + +// checkVersion checks the current version of the validator store and decides +// whether to run any db migrations. +func (vs *validatorStore) checkVersion(ctx context.Context) (int, upgradeAction, error) { + // Check if schema version exists + version, versionErr := vs.currentVersion(ctx) + // On error, infer that schema_version table doesn't exist (just assuming this + // since we'd need to query sqlite_master table to be certain that was the error) + if versionErr != nil { + // Check if validators db exists (again only infers and this isn't robust because it could fail for other reasons) + _, valErr := vs.currentValidators(ctx) + if valErr != nil { + // Fresh db, do regular initialization at valStoreVersion + return valStoreVersion, upgradeActionNone, nil + } + if valErr == nil { + // Legacy db without version tracking - version 0 + return 0, upgradeActionRunMigrations, nil + } + } + + if version == valStoreVersion { + // DB on the latest version + return version, upgradeActionNone, nil + } + if version < valStoreVersion { + // DB on previous version, Run DB migrations + return version, upgradeActionRunMigrations, nil + } + + // Invalid DB version, return error + return version, upgradeActionNone, fmt.Errorf("validator store version %d is higher than the supported version %d", version, valStoreVersion) +} + +// databaseUpgrade runs the database upgrade based on the current version. +func (vs *validatorStore) initOrUpgradeDatabase(ctx context.Context) error { + version, action, err := vs.checkVersion(ctx) + if err != nil { + return err + } + + vs.log.Info("databaseUpgrade", zap.Int("version", version), zap.String("action", upgradeActionString(action))) + + switch action { + case upgradeActionNone: + return vs.initTables(ctx) + case upgradeActionRunMigrations: + return vs.runMigrations(ctx, version) + default: + vs.log.Error("unknown upgrade action", zap.Int("action", int(action))) + return fmt.Errorf("unknown upgrade action: %d", action) + } +} + +// runMigrations runs incremental db upgrades from current version to the latest version. +func (vs *validatorStore) runMigrations(ctx context.Context, version int) error { + switch version { + case 0: + if err := vs.upgradeValidatorsDB_0_1(ctx); err != nil { + return err + } + fallthrough + case valStoreVersion: + vs.log.Info("databaseUpgrade: completed successfully") + return nil + default: + vs.log.Error("unknown version", zap.Int("version", version)) + return fmt.Errorf("unknown version: %d", version) + } +} + +// upgradeValidatorsDB_0_1 upgrades the validators db from version 0 to 1. +// Version 0: join_reqs table: [candidate, power] +// Version 1: join_reqs table: [candidate, power, expiryAt] +// "ALTER TABLE join_reqs ADD COLUMN expiresAt INTEGER;" +func (vs *validatorStore) upgradeValidatorsDB_0_1(ctx context.Context) error { + vs.log.Info("Upgrading validators db from version 0 to 1") + // Add schema version table + if err := vs.initSchemaVersion(ctx); err != nil { + return err + } + + if err := vs.db.Execute(ctx, sqlAddJoinExpiry, nil); err != nil { + return fmt.Errorf("failed to add expiresAt column to join_reqs table: %w", err) + } + return nil +} diff --git a/pkg/validators/upgrade_test.go b/pkg/validators/upgrade_test.go new file mode 100644 index 000000000..bc84b904d --- /dev/null +++ b/pkg/validators/upgrade_test.go @@ -0,0 +1,185 @@ +package validators + +import ( + "context" + "os" + "testing" + + "github.com/kwilteam/kwil-db/pkg/log" + sqlTesting "github.com/kwilteam/kwil-db/pkg/sql/testing" +) + +func setup(srcFile string) { + // Copies the db file to tmp + os.MkdirAll("tmp", os.ModePerm) + bts, err := os.ReadFile(srcFile) + if err != nil { + panic(err) + } + + err = os.WriteFile("./tmp/validator_db.sqlite", bts, os.ModePerm) + if err != nil { + panic(err) + } +} + +func TestValidatorStoreUpgradeLegacyToV1(t *testing.T) { + setup("./test_data/version0.sqlite") + + // Open Version 0 DB. It contains: 1 validator and 3 join requests + ds, td, err := sqlTesting.OpenTestDB("validator_db") + if err != nil { + t.Fatal(err) + } + defer td() + ctx := context.Background() + logger := log.NewStdOut(log.DebugLevel) + + // validator store + vs := &validatorStore{ + db: ds, + log: logger, + } + + // Verify validator count is 1 + results, err := vs.db.Query(ctx, "SELECT COUNT(*) FROM validators", nil) + if err != nil { + t.Fatal(err) + } + if len(results) != 1 { + t.Fatalf("Expected 1 result, got %d", len(results)) + } + + // Expected values: version 0, action upgradeActionLegacy + version, action, err := vs.checkVersion(ctx) + if err != nil { + t.Fatal(err) + } + if version != 0 { + t.Fatalf("Expected version 0, got %d", version) + } + if action != upgradeActionRunMigrations { + t.Fatalf("Expected action %s, got %s", + upgradeActionString(upgradeActionRunMigrations), + upgradeActionString(action)) + } + + // Expect failure as expiresAt column doesn't exist in legacy code + _, err = vs.ActiveVotes(ctx) + if err == nil { + t.Fatal(err) + } + + // Upgrade DB to version 1 + err = vs.initOrUpgradeDatabase(ctx) + if err != nil { + t.Fatal(err) + } + + // Check Version Table to ensure version is 1 + version, err = vs.currentVersion(ctx) + if err != nil { + t.Fatal(err) + } + if version != valStoreVersion { + t.Fatalf("Expected version %d, got %d", valStoreVersion, version) + } +} + +func TestValidatorStoreUpgradeV1(t *testing.T) { + setup("./test_data/version1.sqlite") + + // Open Version 0 DB. It contains 1 validator and 3 join requests + ds, td, err := sqlTesting.OpenTestDB("validator_db") + if err != nil { + t.Fatal(err) + } + defer td() + ctx := context.Background() + logger := log.NewStdOut(log.DebugLevel) + + // validator store + vs := &validatorStore{ + db: ds, + log: logger, + } + + // Verify validator count is 1 + results, err := vs.db.Query(ctx, "SELECT COUNT(*) FROM validators", nil) + if err != nil { + t.Fatal(err) + } + if len(results) != 1 { + t.Fatalf("Expected 1 result, got %d", len(results)) + } + + // Expected values: version 1, action upgradeActionNone + versionPre, action, err := vs.checkVersion(ctx) + if err != nil { + t.Fatal(err) + } + if versionPre != 1 { + t.Fatalf("Expected version 0, got %d", versionPre) + } + if action != upgradeActionNone { + t.Fatalf("Expected action %s, got %s", + upgradeActionString(upgradeActionNone), + upgradeActionString(action)) + } + + // Three entries in join_reqs table with expiresAt column + votes, err := vs.ActiveVotes(ctx) + if err != nil { + t.Fatal(err) + } + if len(votes) != 3 { + t.Fatalf("Starting votes not empty (%d)", len(votes)) + } + + // Upgrade + err = vs.initOrUpgradeDatabase(ctx) + if err != nil { + t.Fatal(err) + } + + // Check Version Table + versionPost, err := vs.currentVersion(ctx) + if err != nil { + t.Fatal(err) + } + // Version should be 1, no upgrade + if versionPost != versionPre { + t.Fatalf("Expected version %d, got %d", versionPre, versionPost) + } +} + +func TestValidatorStoreUpgradeV2(t *testing.T) { + setup("./test_data/version2.sqlite") + + // Open Version 2 DB. It contains 1 validator and 3 join requests + ds, td, err := sqlTesting.OpenTestDB("validator_db") + if err != nil { + t.Fatal(err) + } + defer td() + ctx := context.Background() + logger := log.NewStdOut(log.DebugLevel) + + // validator store + vs := &validatorStore{ + db: ds, + log: logger, + } + + // invalid version + _, _, err = vs.checkVersion(ctx) + if err == nil { + t.Fatal(err) + } + + // Upgrade should fail as version is invalid + err = vs.initOrUpgradeDatabase(ctx) + if err == nil { + t.Fatal(err) + } +}