Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Validators DB Upgrades #318

Merged
merged 1 commit into from
Sep 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions internal/app/kwild/server/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,11 +251,6 @@ func buildValidatorManager(d *coreDependencies, closer *closeFuncs, ac *sessions
}
closer.addCloser(db.Close)

err = registerSQL(d.ctx, ac, db, "validator_db", d.log)
if err != nil {
failBuild(err, "failed to register validator db")
}

joinExpiry := d.cfg.GenesisConfig.ConsensusParams.Validator.JoinExpiry
v, err := vmgr.NewValidatorMgr(d.ctx, db,
vmgr.WithLogger(*d.log.Named("validatorStore")),
Expand All @@ -265,6 +260,7 @@ func buildValidatorManager(d *coreDependencies, closer *closeFuncs, ac *sessions
failBuild(err, "failed to build validator store")
}

ac.Register(d.ctx, "validator_db", v.Committable())
return v
}

Expand Down
98 changes: 95 additions & 3 deletions pkg/validators/mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,16 @@ package validators

import (
"context"
"crypto/sha256"
"errors"
"fmt"
"io"
"sort"

"github.com/kwilteam/kwil-db/pkg/log"
"github.com/kwilteam/kwil-db/pkg/sessions"
sqlSessions "github.com/kwilteam/kwil-db/pkg/sessions/sql-session"
"github.com/kwilteam/kwil-db/pkg/sql"
)

type joinReq struct {
Expand Down Expand Up @@ -62,6 +68,9 @@ type ValidatorMgr struct {
// opts
joinExpiry int64
log log.Logger

// session params
cm *validatorDbCommittable
}

// NOTE: The SQLite validator/approval store is local and transparent to the
Expand All @@ -81,6 +90,79 @@ type ValidatorStore interface {
AddValidator(ctx context.Context, joiner []byte, power int64) error
}

type ValidatorDbSession interface {
Datastore

ApplyChangeset(reader io.Reader) error
CreateSession() (sql.Session, error)
Savepoint() (sql.Savepoint, error)
CheckpointWal() error
EnableForeignKey() error
DisableForeignKey() error
}

type validatorDbCommittable struct {
sessions.Committable

validatorDbHash func() []byte
}

// ID overrides the base Committable. We do this so that we can have the
// persistence of the state be part of the 2pc process, but have the ID reflect
// the actual state free from SQL specifics.
func (c *validatorDbCommittable) ID(ctx context.Context) ([]byte, error) {
return c.validatorDbHash(), nil
}

func (vm *ValidatorMgr) Committable() sessions.Committable {
return vm.cm
}

// ValidatorDB state includes:
// - current validators
// - active join requests
// - approvers for each join request
func (vm *ValidatorMgr) validatorDbHash() []byte {
hasher := sha256.New()

// current validators val1:val2:...
var currentValidators []string
for val := range vm.current {
currentValidators = append(currentValidators, val)
}
sort.Strings(currentValidators)
for _, val := range currentValidators {
hasher.Write([]byte(val + ":"))
}

// active join requests & approvals
// joinerPubkey:power:expiresAt:approver1:approver2:...
var joiners []string
for val := range vm.candidates {
joiners = append(joiners, val)
}
sort.Strings(joiners)

for _, joiner := range joiners {
jr := vm.candidates[joiner]
jrStr := fmt.Sprintf("%s:%d:%d", joiner, jr.power, jr.expiresAt)
hasher.Write([]byte(jrStr))

var approvers []string
for val, approved := range jr.validators {
if approved {
approvers = append(approvers, val)
}
}
sort.Strings(approvers)
for _, approver := range approvers {
hasher.Write([]byte(":" + approver))
Comment on lines +148 to +159
Copy link
Member

@jchappelow jchappelow Sep 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No major issue with the string based approach for digesting all the data, but I'll look at this tonight (can be after this merges) and I feel like we might be safer with an all binary approach like with the genesisConfig appHash generation. The fmt package tends to be oriented toward display and often has subtle changes. encoding/binary tends to be more well defined and stable.

}
hasher.Write([]byte(";"))
}
return hasher.Sum(nil)
}

func (vm *ValidatorMgr) isCurrent(val []byte) bool {
_, have := vm.current[string(val)]
return have
Expand All @@ -90,7 +172,7 @@ func (vm *ValidatorMgr) candidate(val []byte) *joinReq {
return vm.candidates[string(val)]
}

func NewValidatorMgr(ctx context.Context, datastore Datastore, opts ...ValidatorMgrOpt) (*ValidatorMgr, error) {
func NewValidatorMgr(ctx context.Context, datastore ValidatorDbSession, opts ...ValidatorMgrOpt) (*ValidatorMgr, error) {
vm := &ValidatorMgr{
current: make(map[string]struct{}),
candidates: make(map[string]*joinReq),
Expand All @@ -101,6 +183,13 @@ func NewValidatorMgr(ctx context.Context, datastore Datastore, opts ...Validator
opt(vm)
}

vm.cm = &validatorDbCommittable{
Committable: sqlSessions.NewSqlCommitable(datastore,
sqlSessions.WithLogger(*vm.log.Named("validator-committable")),
),
validatorDbHash: vm.validatorDbHash,
}

var err error
vm.db, err = newValidatorStore(ctx, datastore, vm.log)
if err != nil {
Expand Down Expand Up @@ -282,8 +371,7 @@ func (vm *ValidatorMgr) Finalize(ctx context.Context) []*Validator {
// Updates for approved (joining) validators.
for candidate, join := range vm.candidates {
if join.votes() < join.requiredVotes() {

if vm.lastBlockHeight >= join.expiresAt {
if isJoinExpired(join, vm.lastBlockHeight) {
// Join request expired
delete(vm.candidates, candidate)
if err := vm.db.DeleteJoinRequest(ctx, join.pubkey); err != nil {
Expand Down Expand Up @@ -331,3 +419,7 @@ func (vm *ValidatorMgr) Finalize(ctx context.Context) []*Validator {
func (mgr *ValidatorMgr) UpdateBlockHeight(height int64) {
mgr.lastBlockHeight = height
}

func isJoinExpired(join *joinReq, blockHeight int64) bool {
return join.expiresAt != -1 && blockHeight >= join.expiresAt
}
70 changes: 67 additions & 3 deletions pkg/validators/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ const (
CREATE TABLE IF NOT EXISTS validators (
pubkey BLOB PRIMARY KEY,
power INTEGER
) WITHOUT ROWID, STRICT;
) WITHOUT ROWID, STRICT;

CREATE TABLE IF NOT EXISTS join_reqs (
candidate BLOB PRIMARY KEY,
Expand Down Expand Up @@ -79,8 +79,69 @@ const (

sqlAddToJoinBoard = `INSERT INTO joins_board (candidate, validator, approval)
VALUES ($candidate, $validator, $approval)`

sqlInitVersionTable = `CREATE TABLE IF NOT EXISTS schema_version (
version INT NOT NULL
);` // Do we still need WITHOUT ROWID and STRICT? It's just a single row table
jchappelow marked this conversation as resolved.
Show resolved Hide resolved

sqlSetVersion = "INSERT INTO schema_version (version) VALUES ($version);"

//sqlUpdateVersion = "UPDATE schema_version SET version = $version;"

sqlGetVersion = "SELECT version FROM schema_version;"

valStoreVersion = 1
)

// Migration actions.
const (
sqlAddJoinExpiry string = `ALTER TABLE join_reqs ADD COLUMN expiresAt INTEGER DEFAULT -1;`
)

// Schema version table.
func (vs *validatorStore) initSchemaVersion(ctx context.Context) error {
if err := vs.db.Execute(ctx, sqlInitVersionTable, nil); err != nil {
return fmt.Errorf("failed to initialize schema version table: %w", err)
}

err := vs.db.Execute(ctx, sqlSetVersion, map[string]any{
"$version": valStoreVersion,
})
if err != nil {
return fmt.Errorf("failed to set schema version: %w", err)
}
return nil
}

// func (vs *validatorStore) updateCurrentVersion(ctx context.Context, version int) error {
// err := vs.db.Execute(ctx, sqlUpdateVersion, map[string]any{
// "$version": version,
// })
// if err != nil {
// return fmt.Errorf("failed to update schema version: %w", err)
// }
// return nil
// }

func (vs *validatorStore) currentVersion(ctx context.Context) (int, error) {
results, err := vs.db.Query(ctx, sqlGetVersion, nil)
if err != nil {
return 0, err
}
if len(results) == 0 {
return 0, nil
}
vi, ok := results[0]["version"]
if !ok {
return 0, errors.New("no version in schema_version record")
}
version, ok := vi.(int64)
if !ok {
return 0, fmt.Errorf("invalid version value (%T)", vi)
}
return int(version), nil
}

// -- CREATE TABLE IF NOT EXISTS validator_approvals (
// -- validator_id INTEGER REFERENCES validators (id) ON DELETE CASCADE,
// -- approval_id INTEGER REFERENCES approvals (id) ON DELETE CASCADE,
Expand All @@ -101,12 +162,15 @@ func (vs *validatorStore) initTables(ctx context.Context) error {
inits := getTableInits()

for _, init := range inits {
err := vs.db.Execute(ctx, init, nil)
if err != nil {
if err := vs.db.Execute(ctx, init, nil); err != nil {
return fmt.Errorf("failed to initialize tables: %w", err)
}
}

if err := vs.initSchemaVersion(ctx); err != nil {
return err
}

return nil
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/validators/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ func newValidatorStore(ctx context.Context, datastore Datastore, log log.Logger)
log: log,
}

err := ar.initTables(ctx)
err := ar.initOrUpgradeDatabase(ctx)
if err != nil {
return nil, fmt.Errorf("failed to initialize tables: %w", err)
return nil, fmt.Errorf("failed to initialize database at version %d due to error: %w", valStoreVersion, err)
}

// err = ar.prepareStatements()
Expand Down
Binary file added pkg/validators/test_data/version0.sqlite
Binary file not shown.
Binary file added pkg/validators/test_data/version1.sqlite
Binary file not shown.
Binary file added pkg/validators/test_data/version2.sqlite
Binary file not shown.
113 changes: 113 additions & 0 deletions pkg/validators/upgrade.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package validators

import (
"context"
"fmt"

"go.uber.org/zap"
)

type upgradeAction int

const (
upgradeActionNone upgradeAction = iota
upgradeActionRunMigrations
)

func upgradeActionString(action upgradeAction) string {
switch action {
case upgradeActionNone:
return "none"
case upgradeActionRunMigrations:
return "run migrations"
default:
return "unknown"
}
}

// checkVersion checks the current version of the validator store and decides
// whether to run any db migrations.
func (vs *validatorStore) checkVersion(ctx context.Context) (int, upgradeAction, error) {
// Check if schema version exists
version, versionErr := vs.currentVersion(ctx)
// On error, infer that schema_version table doesn't exist (just assuming this
// since we'd need to query sqlite_master table to be certain that was the error)
if versionErr != nil {
// Check if validators db exists (again only infers and this isn't robust because it could fail for other reasons)
_, valErr := vs.currentValidators(ctx)
if valErr != nil {
// Fresh db, do regular initialization at valStoreVersion
return valStoreVersion, upgradeActionNone, nil
}
if valErr == nil {
// Legacy db without version tracking - version 0
return 0, upgradeActionRunMigrations, nil
}
}

if version == valStoreVersion {
// DB on the latest version
return version, upgradeActionNone, nil
}
if version < valStoreVersion {
// DB on previous version, Run DB migrations
return version, upgradeActionRunMigrations, nil
}

// Invalid DB version, return error
return version, upgradeActionNone, fmt.Errorf("validator store version %d is higher than the supported version %d", version, valStoreVersion)
}

// databaseUpgrade runs the database upgrade based on the current version.
func (vs *validatorStore) initOrUpgradeDatabase(ctx context.Context) error {
version, action, err := vs.checkVersion(ctx)
if err != nil {
return err
}

vs.log.Info("databaseUpgrade", zap.Int("version", version), zap.String("action", upgradeActionString(action)))

switch action {
case upgradeActionNone:
return vs.initTables(ctx)
case upgradeActionRunMigrations:
return vs.runMigrations(ctx, version)
default:
vs.log.Error("unknown upgrade action", zap.Int("action", int(action)))
return fmt.Errorf("unknown upgrade action: %d", action)
}
}

// runMigrations runs incremental db upgrades from current version to the latest version.
func (vs *validatorStore) runMigrations(ctx context.Context, version int) error {
switch version {
case 0:
if err := vs.upgradeValidatorsDB_0_1(ctx); err != nil {
return err
}
fallthrough
case valStoreVersion:
vs.log.Info("databaseUpgrade: completed successfully")
return nil
default:
vs.log.Error("unknown version", zap.Int("version", version))
return fmt.Errorf("unknown version: %d", version)
}
}

// upgradeValidatorsDB_0_1 upgrades the validators db from version 0 to 1.
// Version 0: join_reqs table: [candidate, power]
// Version 1: join_reqs table: [candidate, power, expiryAt]
// "ALTER TABLE join_reqs ADD COLUMN expiresAt INTEGER;"
func (vs *validatorStore) upgradeValidatorsDB_0_1(ctx context.Context) error {
vs.log.Info("Upgrading validators db from version 0 to 1")
// Add schema version table
if err := vs.initSchemaVersion(ctx); err != nil {
return err
}

if err := vs.db.Execute(ctx, sqlAddJoinExpiry, nil); err != nil {
return fmt.Errorf("failed to add expiresAt column to join_reqs table: %w", err)
}
return nil
}
Loading