Skip to content

Commit

Permalink
Decouple validatorDBHash from sql notions
Browse files Browse the repository at this point in the history
  • Loading branch information
charithabandi committed Sep 27, 2023
1 parent 5c5c786 commit c16edb4
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 141 deletions.
6 changes: 1 addition & 5 deletions internal/app/kwild/server/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,11 +251,6 @@ func buildValidatorManager(d *coreDependencies, closer *closeFuncs, ac *sessions
}
closer.addCloser(db.Close)

err = registerSQL(d.ctx, ac, db, "validator_db", d.log)
if err != nil {
failBuild(err, "failed to register validator db")
}

joinExpiry := d.cfg.GenesisConfig.ConsensusParams.Validator.JoinExpiry
v, err := vmgr.NewValidatorMgr(d.ctx, db,
vmgr.WithLogger(*d.log.Named("validatorStore")),
Expand All @@ -265,6 +260,7 @@ func buildValidatorManager(d *coreDependencies, closer *closeFuncs, ac *sessions
failBuild(err, "failed to build validator store")
}

ac.Register(d.ctx, "validator_db", v.Committable())
return v
}

Expand Down
93 changes: 92 additions & 1 deletion pkg/validators/mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,16 @@ package validators

import (
"context"
"crypto/sha256"
"errors"
"fmt"
"io"
"sort"

"github.com/kwilteam/kwil-db/pkg/log"
"github.com/kwilteam/kwil-db/pkg/sessions"
sqlSessions "github.com/kwilteam/kwil-db/pkg/sessions/sql-session"
"github.com/kwilteam/kwil-db/pkg/sql"
)

type joinReq struct {
Expand Down Expand Up @@ -62,6 +68,9 @@ type ValidatorMgr struct {
// opts
joinExpiry int64
log log.Logger

// session params
cm *validatorDbCommittable
}

// NOTE: The SQLite validator/approval store is local and transparent to the
Expand All @@ -81,6 +90,81 @@ type ValidatorStore interface {
AddValidator(ctx context.Context, joiner []byte, power int64) error
}

type ValidatorDbSession interface {
Datastore

ApplyChangeset(reader io.Reader) error
CreateSession() (sql.Session, error)
Savepoint() (sql.Savepoint, error)
CheckpointWal() error
EnableForeignKey() error
DisableForeignKey() error
}

type validatorDbCommittable struct {
sessions.Committable

validatorDbHash func() []byte
}

// ID overrides the base Committable. We do this so that we can have the
// persistence of the state be part of the 2pc process, but have the ID reflect
// the actual state free from SQL specifics.
func (c *validatorDbCommittable) ID(ctx context.Context) ([]byte, error) {
return c.validatorDbHash(), nil
}

func (vm *ValidatorMgr) Committable() sessions.Committable {
return vm.cm
}

/*
ValidatorDB state includes:
- current validators
- active join requests
- approvals
*/
func (vm *ValidatorMgr) validatorDbHash() []byte {
hasher := sha256.New()

// current validators val1:val2:...
var currentValidators []string
for val := range vm.current {
currentValidators = append(currentValidators, val)
}
sort.Strings(currentValidators)
for _, val := range currentValidators {
hasher.Write([]byte(val + ":"))
}

// active join requests & approvals
// joinerPubkey:power:expiresAt:approver1:approver2:...
var joiners []string
for val := range vm.candidates {
joiners = append(joiners, val)
}
sort.Strings(joiners)

for _, joiner := range joiners {
jr := vm.candidates[joiner]
jrStr := fmt.Sprintf("%s:%d:%d", joiner, jr.power, jr.expiresAt)
hasher.Write([]byte(jrStr))

var approvers []string
for val, approved := range jr.validators {
if approved {
approvers = append(approvers, val)
}
}
sort.Strings(approvers)
for _, approver := range approvers {
hasher.Write([]byte(":" + approver))
}
hasher.Write([]byte(";"))
}
return hasher.Sum(nil)
}

func (vm *ValidatorMgr) isCurrent(val []byte) bool {
_, have := vm.current[string(val)]
return have
Expand All @@ -90,7 +174,7 @@ func (vm *ValidatorMgr) candidate(val []byte) *joinReq {
return vm.candidates[string(val)]
}

func NewValidatorMgr(ctx context.Context, datastore Datastore, opts ...ValidatorMgrOpt) (*ValidatorMgr, error) {
func NewValidatorMgr(ctx context.Context, datastore ValidatorDbSession, opts ...ValidatorMgrOpt) (*ValidatorMgr, error) {
vm := &ValidatorMgr{
current: make(map[string]struct{}),
candidates: make(map[string]*joinReq),
Expand All @@ -101,6 +185,13 @@ func NewValidatorMgr(ctx context.Context, datastore Datastore, opts ...Validator
opt(vm)
}

vm.cm = &validatorDbCommittable{
Committable: sqlSessions.NewSqlCommitable(datastore,
sqlSessions.WithLogger(*vm.log.Named("validator-committable")),
),
validatorDbHash: vm.validatorDbHash,
}

var err error
vm.db, err = newValidatorStore(ctx, datastore, vm.log)
if err != nil {
Expand Down
50 changes: 33 additions & 17 deletions pkg/validators/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ const (
CREATE TABLE IF NOT EXISTS validators (
pubkey BLOB PRIMARY KEY,
power INTEGER
) WITHOUT ROWID, STRICT;
) WITHOUT ROWID, STRICT;
CREATE TABLE IF NOT EXISTS join_reqs (
candidate BLOB PRIMARY KEY,
Expand All @@ -37,11 +37,7 @@ const (
validator BLOB REFERENCES validators (pubkey) ON DELETE CASCADE,
approval INTEGER,
PRIMARY KEY (candidate, validator)
) WITHOUT ROWID, STRICT;
CREATE TABLE IF NOT EXISTS schema_version (
version INTEGER NOT NULL
);`
) WITHOUT ROWID, STRICT;`

// joins_board give us the board of validators (approvers) for a given join
// request which is needed to resume vote handling. The validators for a
Expand Down Expand Up @@ -84,23 +80,34 @@ const (
sqlAddToJoinBoard = `INSERT INTO joins_board (candidate, validator, approval)
VALUES ($candidate, $validator, $approval)`

sqlInitMetaTable = `
CREATE TABLE IF NOT EXISTS schema_version (
sqlInitVersionTable = `CREATE TABLE IF NOT EXISTS schema_version (
version INT NOT NULL
);`
);` // Do we still need WITHOUT ROWID and STRICT? It's just a single row table

sqlSetVersion = "INSERT INTO schema_version (version) VALUES ($version);"

sqlSetVersion = "INSERT INTO schema_version (version) VALUES ($version) ON CONFLICT DO UPDATE SET version = ($version);"
//sqlUpdateVersion = "UPDATE schema_version SET version = $version;"

sqlGetVersion = "SELECT version FROM schema_version;"

valStoreVersion = 1
)

// Migration actions.
const (
sqlUpgradeV0to1 string = `ALTER TABLE join_reqs ADD COLUMN expiresAt INTEGER DEFAULT -1;`
)

// Schema version table.
func (vs *validatorStore) initSchemaVersion(ctx context.Context) error {
if err := vs.db.Execute(ctx, sqlInitMetaTable, nil); err != nil {
func (vs *validatorStore) initSchemaVersion(ctx context.Context, version int) error {
if err := vs.db.Execute(ctx, sqlInitVersionTable, nil); err != nil {
return fmt.Errorf("failed to initialize schema version table: %w", err)
}

err := vs.setCurrentVersion(ctx, version)
if err != nil {
return err
}
return nil
}

Expand All @@ -114,6 +121,16 @@ func (vs *validatorStore) setCurrentVersion(ctx context.Context, version int) er
return nil
}

// func (vs *validatorStore) updateCurrentVersion(ctx context.Context, version int) error {
// err := vs.db.Execute(ctx, sqlUpdateVersion, map[string]any{
// "$version": version,
// })
// if err != nil {
// return fmt.Errorf("failed to update schema version: %w", err)
// }
// return nil
// }

func (vs *validatorStore) currentVersion(ctx context.Context) (int, error) {
results, err := vs.db.Query(ctx, sqlGetVersion, nil)
if err != nil {
Expand Down Expand Up @@ -149,20 +166,19 @@ func getTableInits() []string {
return inits[:len(inits)-1]
}

func (vs *validatorStore) initTables(ctx context.Context) error {
func (vs *validatorStore) initTables(ctx context.Context, version int) error {
inits := getTableInits()

for _, init := range inits {
err := vs.db.Execute(ctx, init, nil)
if err != nil {
if err := vs.db.Execute(ctx, init, nil); err != nil {
return fmt.Errorf("failed to initialize tables: %w", err)
}
}

err := vs.setCurrentVersion(ctx, valStoreVersion)
if err != nil {
if err := vs.initSchemaVersion(ctx, version); err != nil {
return err
}

return nil
}

Expand Down
17 changes: 4 additions & 13 deletions pkg/validators/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,11 @@ func (vs *validatorStore) initOrUpgradeDatabase(ctx context.Context) error {
return err
}

vs.log.Info("databaseUpgrade", zap.String("version", fmt.Sprintf("%d", version)), zap.String("action", upgradeActionString(action)), zap.Error(err))
vs.log.Info("databaseUpgrade", zap.Int("version", version), zap.String("action", upgradeActionString(action)), zap.Error(err))

switch action {
case upgradeActionNone:
return vs.initTables(ctx)
return vs.initTables(ctx, valStoreVersion)
case upgradeActionLegacy:
fallthrough
case upgradeActionRunMigrations:
Expand Down Expand Up @@ -118,21 +118,12 @@ Version 1: join_reqs table: [candidate, power, expiryAt]
func (vs *validatorStore) upgradeValidatorsDB_0_1(ctx context.Context) error {
vs.log.Info("Upgrading validators db from version 0 to 1")
// Add schema version table
if err := vs.initSchemaVersion(ctx); err != nil {
if err := vs.initSchemaVersion(ctx, 1); err != nil {
return err
}

// Set schema version to 1
if err := vs.setCurrentVersion(ctx, 1); err != nil {
return err
}

if err := vs.db.Execute(ctx, "ALTER TABLE join_reqs ADD COLUMN expiresAt INTEGER;", nil); err != nil {
if err := vs.db.Execute(ctx, sqlUpgradeV0to1, nil); err != nil {
return fmt.Errorf("failed to add expiresAt column to join_reqs table: %w", err)
}

if err := vs.db.Execute(ctx, "UPDATE join_reqs SET expiresAt = -1;", nil); err != nil {
return fmt.Errorf("failed to set indefinite join expiry for existing join requests: %w", err)
}
return nil
}
Loading

0 comments on commit c16edb4

Please sign in to comment.