Skip to content

Commit

Permalink
Validator schema upgrade
Browse files Browse the repository at this point in the history
Validator schema upgrade

update tests

Decouple validatorDBHash from sql notions

fix docs
  • Loading branch information
charithabandi committed Sep 27, 2023
1 parent a73a851 commit 67039da
Show file tree
Hide file tree
Showing 9 changed files with 467 additions and 14 deletions.
6 changes: 1 addition & 5 deletions internal/app/kwild/server/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,11 +251,6 @@ func buildValidatorManager(d *coreDependencies, closer *closeFuncs, ac *sessions
}
closer.addCloser(db.Close)

err = registerSQL(d.ctx, ac, db, "validator_db", d.log)
if err != nil {
failBuild(err, "failed to register validator db")
}

joinExpiry := d.cfg.GenesisConfig.ConsensusParams.Validator.JoinExpiry
v, err := vmgr.NewValidatorMgr(d.ctx, db,
vmgr.WithLogger(*d.log.Named("validatorStore")),
Expand All @@ -265,6 +260,7 @@ func buildValidatorManager(d *coreDependencies, closer *closeFuncs, ac *sessions
failBuild(err, "failed to build validator store")
}

ac.Register(d.ctx, "validator_db", v.Committable())
return v
}

Expand Down
98 changes: 95 additions & 3 deletions pkg/validators/mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,16 @@ package validators

import (
"context"
"crypto/sha256"
"errors"
"fmt"
"io"
"sort"

"github.com/kwilteam/kwil-db/pkg/log"
"github.com/kwilteam/kwil-db/pkg/sessions"
sqlSessions "github.com/kwilteam/kwil-db/pkg/sessions/sql-session"
"github.com/kwilteam/kwil-db/pkg/sql"
)

type joinReq struct {
Expand Down Expand Up @@ -62,6 +68,9 @@ type ValidatorMgr struct {
// opts
joinExpiry int64
log log.Logger

// session params
cm *validatorDbCommittable
}

// NOTE: The SQLite validator/approval store is local and transparent to the
Expand All @@ -81,6 +90,79 @@ type ValidatorStore interface {
AddValidator(ctx context.Context, joiner []byte, power int64) error
}

type ValidatorDbSession interface {
Datastore

ApplyChangeset(reader io.Reader) error
CreateSession() (sql.Session, error)
Savepoint() (sql.Savepoint, error)
CheckpointWal() error
EnableForeignKey() error
DisableForeignKey() error
}

type validatorDbCommittable struct {
sessions.Committable

validatorDbHash func() []byte
}

// ID overrides the base Committable. We do this so that we can have the
// persistence of the state be part of the 2pc process, but have the ID reflect
// the actual state free from SQL specifics.
func (c *validatorDbCommittable) ID(ctx context.Context) ([]byte, error) {
return c.validatorDbHash(), nil
}

func (vm *ValidatorMgr) Committable() sessions.Committable {
return vm.cm
}

// ValidatorDB state includes:
// - current validators
// - active join requests
// - approvers for each join request
func (vm *ValidatorMgr) validatorDbHash() []byte {
hasher := sha256.New()

// current validators val1:val2:...
var currentValidators []string
for val := range vm.current {
currentValidators = append(currentValidators, val)
}
sort.Strings(currentValidators)
for _, val := range currentValidators {
hasher.Write([]byte(val + ":"))
}

// active join requests & approvals
// joinerPubkey:power:expiresAt:approver1:approver2:...
var joiners []string
for val := range vm.candidates {
joiners = append(joiners, val)
}
sort.Strings(joiners)

for _, joiner := range joiners {
jr := vm.candidates[joiner]
jrStr := fmt.Sprintf("%s:%d:%d", joiner, jr.power, jr.expiresAt)
hasher.Write([]byte(jrStr))

var approvers []string
for val, approved := range jr.validators {
if approved {
approvers = append(approvers, val)
}
}
sort.Strings(approvers)
for _, approver := range approvers {
hasher.Write([]byte(":" + approver))
}
hasher.Write([]byte(";"))
}
return hasher.Sum(nil)
}

func (vm *ValidatorMgr) isCurrent(val []byte) bool {
_, have := vm.current[string(val)]
return have
Expand All @@ -90,7 +172,7 @@ func (vm *ValidatorMgr) candidate(val []byte) *joinReq {
return vm.candidates[string(val)]
}

func NewValidatorMgr(ctx context.Context, datastore Datastore, opts ...ValidatorMgrOpt) (*ValidatorMgr, error) {
func NewValidatorMgr(ctx context.Context, datastore ValidatorDbSession, opts ...ValidatorMgrOpt) (*ValidatorMgr, error) {
vm := &ValidatorMgr{
current: make(map[string]struct{}),
candidates: make(map[string]*joinReq),
Expand All @@ -101,6 +183,13 @@ func NewValidatorMgr(ctx context.Context, datastore Datastore, opts ...Validator
opt(vm)
}

vm.cm = &validatorDbCommittable{
Committable: sqlSessions.NewSqlCommitable(datastore,
sqlSessions.WithLogger(*vm.log.Named("validator-committable")),
),
validatorDbHash: vm.validatorDbHash,
}

var err error
vm.db, err = newValidatorStore(ctx, datastore, vm.log)
if err != nil {
Expand Down Expand Up @@ -282,8 +371,7 @@ func (vm *ValidatorMgr) Finalize(ctx context.Context) []*Validator {
// Updates for approved (joining) validators.
for candidate, join := range vm.candidates {
if join.votes() < join.requiredVotes() {

if vm.lastBlockHeight >= join.expiresAt {
if isJoinExpired(join, vm.lastBlockHeight) {
// Join request expired
delete(vm.candidates, candidate)
if err := vm.db.DeleteJoinRequest(ctx, join.pubkey); err != nil {
Expand Down Expand Up @@ -331,3 +419,7 @@ func (vm *ValidatorMgr) Finalize(ctx context.Context) []*Validator {
func (mgr *ValidatorMgr) UpdateBlockHeight(height int64) {
mgr.lastBlockHeight = height
}

func isJoinExpired(join *joinReq, blockHeight int64) bool {
return join.expiresAt != -1 && blockHeight >= join.expiresAt
}
72 changes: 68 additions & 4 deletions pkg/validators/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ const (
CREATE TABLE IF NOT EXISTS validators (
pubkey BLOB PRIMARY KEY,
power INTEGER
) WITHOUT ROWID, STRICT;
) WITHOUT ROWID, STRICT;
CREATE TABLE IF NOT EXISTS join_reqs (
candidate BLOB PRIMARY KEY,
Expand Down Expand Up @@ -79,8 +79,69 @@ const (

sqlAddToJoinBoard = `INSERT INTO joins_board (candidate, validator, approval)
VALUES ($candidate, $validator, $approval)`

sqlInitVersionTable = `CREATE TABLE IF NOT EXISTS schema_version (
version INT NOT NULL
);` // Do we still need WITHOUT ROWID and STRICT? It's just a single row table

sqlSetVersion = "INSERT INTO schema_version (version) VALUES ($version);"

//sqlUpdateVersion = "UPDATE schema_version SET version = $version;"

sqlGetVersion = "SELECT version FROM schema_version;"

valStoreVersion = 1
)

// Migration actions.
const (
sqlUpgradeV0to1 string = `ALTER TABLE join_reqs ADD COLUMN expiresAt INTEGER DEFAULT -1;`
)

// Schema version table.
func (vs *validatorStore) initSchemaVersion(ctx context.Context, version int) error {
if err := vs.db.Execute(ctx, sqlInitVersionTable, nil); err != nil {
return fmt.Errorf("failed to initialize schema version table: %w", err)
}

err := vs.db.Execute(ctx, sqlSetVersion, map[string]any{
"$version": version,
})
if err != nil {
return fmt.Errorf("failed to set schema version: %w", err)
}
return nil
}

// func (vs *validatorStore) updateCurrentVersion(ctx context.Context, version int) error {
// err := vs.db.Execute(ctx, sqlUpdateVersion, map[string]any{
// "$version": version,
// })
// if err != nil {
// return fmt.Errorf("failed to update schema version: %w", err)
// }
// return nil
// }

func (vs *validatorStore) currentVersion(ctx context.Context) (int, error) {
results, err := vs.db.Query(ctx, sqlGetVersion, nil)
if err != nil {
return 0, err
}
if len(results) == 0 {
return 0, nil
}
vi, ok := results[0]["version"]
if !ok {
return 0, errors.New("no version in schema_version record")
}
version, ok := vi.(int64)
if !ok {
return 0, fmt.Errorf("invalid version value (%T)", vi)
}
return int(version), nil
}

// -- CREATE TABLE IF NOT EXISTS validator_approvals (
// -- validator_id INTEGER REFERENCES validators (id) ON DELETE CASCADE,
// -- approval_id INTEGER REFERENCES approvals (id) ON DELETE CASCADE,
Expand All @@ -97,16 +158,19 @@ func getTableInits() []string {
return inits[:len(inits)-1]
}

func (vs *validatorStore) initTables(ctx context.Context) error {
func (vs *validatorStore) initTables(ctx context.Context, version int) error {
inits := getTableInits()

for _, init := range inits {
err := vs.db.Execute(ctx, init, nil)
if err != nil {
if err := vs.db.Execute(ctx, init, nil); err != nil {
return fmt.Errorf("failed to initialize tables: %w", err)
}
}

if err := vs.initSchemaVersion(ctx, version); err != nil {
return err
}

return nil
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/validators/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ func newValidatorStore(ctx context.Context, datastore Datastore, log log.Logger)
log: log,
}

err := ar.initTables(ctx)
err := ar.initOrUpgradeDatabase(ctx)
if err != nil {
return nil, fmt.Errorf("failed to initialize tables: %w", err)
return nil, fmt.Errorf("failed to initialize database at version %d due to error: %w", valStoreVersion, err)
}

// err = ar.prepareStatements()
Expand Down
Binary file added pkg/validators/test_data/version0.sqlite
Binary file not shown.
Binary file added pkg/validators/test_data/version1.sqlite
Binary file not shown.
Binary file added pkg/validators/test_data/version2.sqlite
Binary file not shown.
Loading

0 comments on commit 67039da

Please sign in to comment.