Skip to content

Commit

Permalink
Validator schema upgrade (#318)
Browse files Browse the repository at this point in the history
Validator schema upgrade

update tests

Decouple validatorDBHash from sql notions

fix docs
  • Loading branch information
charithabandi authored Sep 27, 2023
1 parent a73a851 commit 101c0c8
Show file tree
Hide file tree
Showing 9 changed files with 463 additions and 13 deletions.
6 changes: 1 addition & 5 deletions internal/app/kwild/server/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,11 +251,6 @@ func buildValidatorManager(d *coreDependencies, closer *closeFuncs, ac *sessions
}
closer.addCloser(db.Close)

err = registerSQL(d.ctx, ac, db, "validator_db", d.log)
if err != nil {
failBuild(err, "failed to register validator db")
}

joinExpiry := d.cfg.GenesisConfig.ConsensusParams.Validator.JoinExpiry
v, err := vmgr.NewValidatorMgr(d.ctx, db,
vmgr.WithLogger(*d.log.Named("validatorStore")),
Expand All @@ -265,6 +260,7 @@ func buildValidatorManager(d *coreDependencies, closer *closeFuncs, ac *sessions
failBuild(err, "failed to build validator store")
}

ac.Register(d.ctx, "validator_db", v.Committable())
return v
}

Expand Down
98 changes: 95 additions & 3 deletions pkg/validators/mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,16 @@ package validators

import (
"context"
"crypto/sha256"
"errors"
"fmt"
"io"
"sort"

"github.com/kwilteam/kwil-db/pkg/log"
"github.com/kwilteam/kwil-db/pkg/sessions"
sqlSessions "github.com/kwilteam/kwil-db/pkg/sessions/sql-session"
"github.com/kwilteam/kwil-db/pkg/sql"
)

type joinReq struct {
Expand Down Expand Up @@ -62,6 +68,9 @@ type ValidatorMgr struct {
// opts
joinExpiry int64
log log.Logger

// session params
cm *validatorDbCommittable
}

// NOTE: The SQLite validator/approval store is local and transparent to the
Expand All @@ -81,6 +90,79 @@ type ValidatorStore interface {
AddValidator(ctx context.Context, joiner []byte, power int64) error
}

type ValidatorDbSession interface {
Datastore

ApplyChangeset(reader io.Reader) error
CreateSession() (sql.Session, error)
Savepoint() (sql.Savepoint, error)
CheckpointWal() error
EnableForeignKey() error
DisableForeignKey() error
}

type validatorDbCommittable struct {
sessions.Committable

validatorDbHash func() []byte
}

// ID overrides the base Committable. We do this so that we can have the
// persistence of the state be part of the 2pc process, but have the ID reflect
// the actual state free from SQL specifics.
func (c *validatorDbCommittable) ID(ctx context.Context) ([]byte, error) {
return c.validatorDbHash(), nil
}

func (vm *ValidatorMgr) Committable() sessions.Committable {
return vm.cm
}

// ValidatorDB state includes:
// - current validators
// - active join requests
// - approvers for each join request
func (vm *ValidatorMgr) validatorDbHash() []byte {
hasher := sha256.New()

// current validators val1:val2:...
var currentValidators []string
for val := range vm.current {
currentValidators = append(currentValidators, val)
}
sort.Strings(currentValidators)
for _, val := range currentValidators {
hasher.Write([]byte(val + ":"))
}

// active join requests & approvals
// joinerPubkey:power:expiresAt:approver1:approver2:...
var joiners []string
for val := range vm.candidates {
joiners = append(joiners, val)
}
sort.Strings(joiners)

for _, joiner := range joiners {
jr := vm.candidates[joiner]
jrStr := fmt.Sprintf("%s:%d:%d", joiner, jr.power, jr.expiresAt)
hasher.Write([]byte(jrStr))

var approvers []string
for val, approved := range jr.validators {
if approved {
approvers = append(approvers, val)
}
}
sort.Strings(approvers)
for _, approver := range approvers {
hasher.Write([]byte(":" + approver))
}
hasher.Write([]byte(";"))
}
return hasher.Sum(nil)
}

func (vm *ValidatorMgr) isCurrent(val []byte) bool {
_, have := vm.current[string(val)]
return have
Expand All @@ -90,7 +172,7 @@ func (vm *ValidatorMgr) candidate(val []byte) *joinReq {
return vm.candidates[string(val)]
}

func NewValidatorMgr(ctx context.Context, datastore Datastore, opts ...ValidatorMgrOpt) (*ValidatorMgr, error) {
func NewValidatorMgr(ctx context.Context, datastore ValidatorDbSession, opts ...ValidatorMgrOpt) (*ValidatorMgr, error) {
vm := &ValidatorMgr{
current: make(map[string]struct{}),
candidates: make(map[string]*joinReq),
Expand All @@ -101,6 +183,13 @@ func NewValidatorMgr(ctx context.Context, datastore Datastore, opts ...Validator
opt(vm)
}

vm.cm = &validatorDbCommittable{
Committable: sqlSessions.NewSqlCommitable(datastore,
sqlSessions.WithLogger(*vm.log.Named("validator-committable")),
),
validatorDbHash: vm.validatorDbHash,
}

var err error
vm.db, err = newValidatorStore(ctx, datastore, vm.log)
if err != nil {
Expand Down Expand Up @@ -282,8 +371,7 @@ func (vm *ValidatorMgr) Finalize(ctx context.Context) []*Validator {
// Updates for approved (joining) validators.
for candidate, join := range vm.candidates {
if join.votes() < join.requiredVotes() {

if vm.lastBlockHeight >= join.expiresAt {
if isJoinExpired(join, vm.lastBlockHeight) {
// Join request expired
delete(vm.candidates, candidate)
if err := vm.db.DeleteJoinRequest(ctx, join.pubkey); err != nil {
Expand Down Expand Up @@ -331,3 +419,7 @@ func (vm *ValidatorMgr) Finalize(ctx context.Context) []*Validator {
func (mgr *ValidatorMgr) UpdateBlockHeight(height int64) {
mgr.lastBlockHeight = height
}

func isJoinExpired(join *joinReq, blockHeight int64) bool {
return join.expiresAt != -1 && blockHeight >= join.expiresAt
}
70 changes: 67 additions & 3 deletions pkg/validators/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ const (
CREATE TABLE IF NOT EXISTS validators (
pubkey BLOB PRIMARY KEY,
power INTEGER
) WITHOUT ROWID, STRICT;
) WITHOUT ROWID, STRICT;
CREATE TABLE IF NOT EXISTS join_reqs (
candidate BLOB PRIMARY KEY,
Expand Down Expand Up @@ -79,8 +79,69 @@ const (

sqlAddToJoinBoard = `INSERT INTO joins_board (candidate, validator, approval)
VALUES ($candidate, $validator, $approval)`

sqlInitVersionTable = `CREATE TABLE IF NOT EXISTS schema_version (
version INT NOT NULL
);` // Do we still need WITHOUT ROWID and STRICT? It's just a single row table

sqlSetVersion = "INSERT INTO schema_version (version) VALUES ($version);"

//sqlUpdateVersion = "UPDATE schema_version SET version = $version;"

sqlGetVersion = "SELECT version FROM schema_version;"

valStoreVersion = 1
)

// Migration actions.
const (
sqlAddJoinExpiry string = `ALTER TABLE join_reqs ADD COLUMN expiresAt INTEGER DEFAULT -1;`
)

// Schema version table.
func (vs *validatorStore) initSchemaVersion(ctx context.Context) error {
if err := vs.db.Execute(ctx, sqlInitVersionTable, nil); err != nil {
return fmt.Errorf("failed to initialize schema version table: %w", err)
}

err := vs.db.Execute(ctx, sqlSetVersion, map[string]any{
"$version": valStoreVersion,
})
if err != nil {
return fmt.Errorf("failed to set schema version: %w", err)
}
return nil
}

// func (vs *validatorStore) updateCurrentVersion(ctx context.Context, version int) error {
// err := vs.db.Execute(ctx, sqlUpdateVersion, map[string]any{
// "$version": version,
// })
// if err != nil {
// return fmt.Errorf("failed to update schema version: %w", err)
// }
// return nil
// }

func (vs *validatorStore) currentVersion(ctx context.Context) (int, error) {
results, err := vs.db.Query(ctx, sqlGetVersion, nil)
if err != nil {
return 0, err
}
if len(results) == 0 {
return 0, nil
}
vi, ok := results[0]["version"]
if !ok {
return 0, errors.New("no version in schema_version record")
}
version, ok := vi.(int64)
if !ok {
return 0, fmt.Errorf("invalid version value (%T)", vi)
}
return int(version), nil
}

// -- CREATE TABLE IF NOT EXISTS validator_approvals (
// -- validator_id INTEGER REFERENCES validators (id) ON DELETE CASCADE,
// -- approval_id INTEGER REFERENCES approvals (id) ON DELETE CASCADE,
Expand All @@ -101,12 +162,15 @@ func (vs *validatorStore) initTables(ctx context.Context) error {
inits := getTableInits()

for _, init := range inits {
err := vs.db.Execute(ctx, init, nil)
if err != nil {
if err := vs.db.Execute(ctx, init, nil); err != nil {
return fmt.Errorf("failed to initialize tables: %w", err)
}
}

if err := vs.initSchemaVersion(ctx); err != nil {
return err
}

return nil
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/validators/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ func newValidatorStore(ctx context.Context, datastore Datastore, log log.Logger)
log: log,
}

err := ar.initTables(ctx)
err := ar.initOrUpgradeDatabase(ctx)
if err != nil {
return nil, fmt.Errorf("failed to initialize tables: %w", err)
return nil, fmt.Errorf("failed to initialize database at version %d due to error: %w", valStoreVersion, err)
}

// err = ar.prepareStatements()
Expand Down
Binary file added pkg/validators/test_data/version0.sqlite
Binary file not shown.
Binary file added pkg/validators/test_data/version1.sqlite
Binary file not shown.
Binary file added pkg/validators/test_data/version2.sqlite
Binary file not shown.
113 changes: 113 additions & 0 deletions pkg/validators/upgrade.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package validators

import (
"context"
"fmt"

"go.uber.org/zap"
)

type upgradeAction int

const (
upgradeActionNone upgradeAction = iota
upgradeActionRunMigrations
)

func upgradeActionString(action upgradeAction) string {
switch action {
case upgradeActionNone:
return "none"
case upgradeActionRunMigrations:
return "run migrations"
default:
return "unknown"
}
}

// checkVersion checks the current version of the validator store and decides
// whether to run any db migrations.
func (vs *validatorStore) checkVersion(ctx context.Context) (int, upgradeAction, error) {
// Check if schema version exists
version, versionErr := vs.currentVersion(ctx)
// On error, infer that schema_version table doesn't exist (just assuming this
// since we'd need to query sqlite_master table to be certain that was the error)
if versionErr != nil {
// Check if validators db exists (again only infers and this isn't robust because it could fail for other reasons)
_, valErr := vs.currentValidators(ctx)
if valErr != nil {
// Fresh db, do regular initialization at valStoreVersion
return valStoreVersion, upgradeActionNone, nil
}
if valErr == nil {
// Legacy db without version tracking - version 0
return 0, upgradeActionRunMigrations, nil
}
}

if version == valStoreVersion {
// DB on the latest version
return version, upgradeActionNone, nil
}
if version < valStoreVersion {
// DB on previous version, Run DB migrations
return version, upgradeActionRunMigrations, nil
}

// Invalid DB version, return error
return version, upgradeActionNone, fmt.Errorf("validator store version %d is higher than the supported version %d", version, valStoreVersion)
}

// databaseUpgrade runs the database upgrade based on the current version.
func (vs *validatorStore) initOrUpgradeDatabase(ctx context.Context) error {
version, action, err := vs.checkVersion(ctx)
if err != nil {
return err
}

vs.log.Info("databaseUpgrade", zap.Int("version", version), zap.String("action", upgradeActionString(action)))

switch action {
case upgradeActionNone:
return vs.initTables(ctx)
case upgradeActionRunMigrations:
return vs.runMigrations(ctx, version)
default:
vs.log.Error("unknown upgrade action", zap.Int("action", int(action)))
return fmt.Errorf("unknown upgrade action: %d", action)
}
}

// runMigrations runs incremental db upgrades from current version to the latest version.
func (vs *validatorStore) runMigrations(ctx context.Context, version int) error {
switch version {
case 0:
if err := vs.upgradeValidatorsDB_0_1(ctx); err != nil {
return err
}
fallthrough
case valStoreVersion:
vs.log.Info("databaseUpgrade: completed successfully")
return nil
default:
vs.log.Error("unknown version", zap.Int("version", version))
return fmt.Errorf("unknown version: %d", version)
}
}

// upgradeValidatorsDB_0_1 upgrades the validators db from version 0 to 1.
// Version 0: join_reqs table: [candidate, power]
// Version 1: join_reqs table: [candidate, power, expiryAt]
// "ALTER TABLE join_reqs ADD COLUMN expiresAt INTEGER;"
func (vs *validatorStore) upgradeValidatorsDB_0_1(ctx context.Context) error {
vs.log.Info("Upgrading validators db from version 0 to 1")
// Add schema version table
if err := vs.initSchemaVersion(ctx); err != nil {
return err
}

if err := vs.db.Execute(ctx, sqlAddJoinExpiry, nil); err != nil {
return fmt.Errorf("failed to add expiresAt column to join_reqs table: %w", err)
}
return nil
}
Loading

0 comments on commit 101c0c8

Please sign in to comment.