Skip to content

Commit

Permalink
This commit refactors the database engine to include a number of new …
Browse files Browse the repository at this point in the history
…features and improvements.

Among these are:
- Proper handling of transactionality for database deployments and drops
- Simpler generation of app hashes
- Simpler code for determining action logic, making future improvements easier
- Replacing the 2pc protocol with a much simpler system based on idempotency
- Better concurrency support for databases
- Better execution lifetime handling for long running queries (to prevent writer starvation)
- An overall simplification of the entire Kwil system, reducing on net ~5000 lines of code.
- And other improvements
  • Loading branch information
brennanjl committed Nov 16, 2023
1 parent 8f925eb commit 37808e3
Show file tree
Hide file tree
Showing 184 changed files with 8,641 additions and 13,564 deletions.
127 changes: 75 additions & 52 deletions cmd/kwild/server/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ import (
"github.com/kwilteam/kwil-db/internal/abci/cometbft"
"github.com/kwilteam/kwil-db/internal/abci/snapshots"
"github.com/kwilteam/kwil-db/internal/accounts"
"github.com/kwilteam/kwil-db/internal/ident"
"github.com/kwilteam/kwil-db/internal/kv/atomic"
"github.com/kwilteam/kwil-db/internal/engine/execution"
"github.com/kwilteam/kwil-db/internal/kv/badger"
"github.com/kwilteam/kwil-db/internal/modules/datasets"
"github.com/kwilteam/kwil-db/internal/modules/validators"
Expand All @@ -30,16 +29,16 @@ import (
healthcheck "github.com/kwilteam/kwil-db/internal/services/health"
simple_checker "github.com/kwilteam/kwil-db/internal/services/health/simple-checker"
"github.com/kwilteam/kwil-db/internal/sessions"
sqlSessions "github.com/kwilteam/kwil-db/internal/sessions/sql-session"
"github.com/kwilteam/kwil-db/internal/sessions/wal"
"github.com/kwilteam/kwil-db/internal/sessions/committable"
"github.com/kwilteam/kwil-db/internal/sql/adapter"
"github.com/kwilteam/kwil-db/internal/sql/registry"
"github.com/kwilteam/kwil-db/internal/sql/sqlite"
vmgr "github.com/kwilteam/kwil-db/internal/validators"

"github.com/kwilteam/kwil-db/core/log"
admpb "github.com/kwilteam/kwil-db/core/rpc/protobuf/admin/v0"
txpb "github.com/kwilteam/kwil-db/core/rpc/protobuf/tx/v1"
"github.com/kwilteam/kwil-db/core/rpc/transport"
"github.com/kwilteam/kwil-db/internal/engine"
"github.com/kwilteam/kwil-db/internal/sql"

abciTypes "github.com/cometbft/cometbft/abci/types"
cmtEd "github.com/cometbft/cometbft/crypto/ed25519"
Expand All @@ -52,12 +51,10 @@ import (

func buildServer(d *coreDependencies, closers *closeFuncs) *Server {
// atomic committer
ac := buildAtomicCommitter(d, closers)
closers.addCloser(ac.Close)
ac := buildCommitter(d, closers)

// engine
e := buildEngine(d, ac)
closers.addCloser(e.Close)
e := buildEngine(d, closers, ac)

// account store
accs := buildAccountRepository(d, closers, ac)
Expand Down Expand Up @@ -102,14 +99,21 @@ func buildServer(d *coreDependencies, closers *closeFuncs) *Server {
}
}

// db / committable names, to prevent breaking changes
const (
accountsDBName = "accounts"
validatorDBName = "validators"
engineName = "engine"
)

// coreDependencies holds dependencies that are widely used
type coreDependencies struct {
ctx context.Context
cfg *config.KwildConfig
genesisCfg *config.GenesisConfig
privKey cmtEd.PrivKey
log log.Logger
opener sql.Opener
opener func(ctx context.Context, dbName string, persistentReaders, maximumReaders int, create bool) (*sqlite.Pool, error)
keypair *tls.Certificate
}

Expand All @@ -134,7 +138,7 @@ func (c *closeFuncs) closeAll() error {
}

func buildAbci(d *coreDependencies, closer *closeFuncs, accountsModule abci.AccountsModule, datasetsModule abci.DatasetsModule, validatorModule abci.ValidatorModule,
atomicCommitter *sessions.AtomicCommitter, snapshotter *snapshots.SnapshotStore, bootstrapper *snapshots.Bootstrapper) *abci.AbciApp {
committer *sessions.MultiCommitter, snapshotter *snapshots.SnapshotStore, bootstrapper *snapshots.Bootstrapper) *abci.AbciApp {
badgerPath := filepath.Join(d.cfg.RootDir, abciDirName, config.ABCIInfoSubDirName)
badgerKv, err := badger.NewBadgerDB(d.ctx, badgerPath, &badger.Options{
GuaranteeFSync: true,
Expand All @@ -146,11 +150,6 @@ func buildAbci(d *coreDependencies, closer *closeFuncs, accountsModule abci.Acco
}
closer.addCloser(badgerKv.Close)

atomicKv, err := atomic.NewAtomicKV(badgerKv)
if err != nil {
failBuild(err, "failed to open atomic kv")
}

var sh abci.SnapshotModule
if snapshotter != nil {
sh = snapshotter
Expand All @@ -165,8 +164,8 @@ func buildAbci(d *coreDependencies, closer *closeFuncs, accountsModule abci.Acco
accountsModule,
datasetsModule,
validatorModule,
atomicKv,
atomicCommitter,
badgerKv,
committer,
sh,
bootstrapper,
abci.WithLogger(*d.log.Named("abci")),
Expand Down Expand Up @@ -198,7 +197,7 @@ func buildDatasetsModule(d *coreDependencies, eng datasets.Engine, accs datasets
)
}

func buildEngine(d *coreDependencies, a *sessions.AtomicCommitter) *engine.Engine {
func buildEngine(d *coreDependencies, closer *closeFuncs, a *sessions.MultiCommitter) *execution.GlobalContext {
extensions, err := getExtensions(d.ctx, d.cfg.AppCfg.ExtensionEndpoints)
if err != nil {
failBuild(err, "failed to get extensions")
Expand All @@ -208,35 +207,45 @@ func buildEngine(d *coreDependencies, a *sessions.AtomicCommitter) *engine.Engin
d.log.Debug("registered extension", zap.String("name", ext.Name()))
}

sqlCommitRegister := &sqlCommittableRegister{
committer: a,
log: *d.log.Named("sqlite-committable"),
reg, err := registry.NewRegistry(d.ctx, func(ctx context.Context, dbid string, create bool) (registry.Pool, error) {
return sqlite.NewPool(ctx, dbid, 1, 2, true)
}, d.cfg.AppCfg.SqliteFilePath, registry.WithReaderWaitTimeout(time.Millisecond*100), registry.WithLogger(
*d.log.Named("registry"),
))
if err != nil {
failBuild(err, "failed to build registry")
}

e, err := engine.Open(d.ctx, d.opener,
sqlCommitRegister,
ident.Address,
engine.WithLogger(*d.log.Named("engine")),
engine.WithExtensions(adaptExtensions(extensions)),
)
eng, err := execution.NewGlobalContext(d.ctx, reg, adaptExtensions(extensions))
if err != nil {
failBuild(err, "failed to build engine")
}
err = a.Register(engineName, reg)
if err != nil {
failBuild(err, "failed to open engine")
failBuild(err, "failed to register engine")
}

// Register masterDB committable
a.Register(d.ctx, "master_db", e.Committable())
return e
closer.addCloser(reg.Close)

return eng
}

func buildAccountRepository(d *coreDependencies, closer *closeFuncs, ac *sessions.AtomicCommitter) *accounts.AccountStore {
db, err := d.opener.Open("accounts_db", *d.log.Named("account-store"))
func buildAccountRepository(d *coreDependencies, closer *closeFuncs, ac *sessions.MultiCommitter) *accounts.AccountStore {

db, err := d.opener(d.ctx, filepath.Join(d.cfg.RootDir, applicationDirName, accountsDBName), 1, 2, true)
if err != nil {
failBuild(err, "failed to open accounts db")
}
closer.addCloser(db.Close)

adapted := adapter.PoolAdapater{Pool: db}

com := committable.New(adapted)

genCfg := d.genesisCfg
b, err := accounts.NewAccountStore(d.ctx, db,
b, err := accounts.NewAccountStore(d.ctx,
&adapted,
com,
accounts.WithLogger(*d.log.Named("accountStore")),
accounts.WithNonces(!genCfg.ConsensusParams.WithoutNonces),
accounts.WithGasCosts(!genCfg.ConsensusParams.WithoutGasCosts),
Expand All @@ -246,14 +255,16 @@ func buildAccountRepository(d *coreDependencies, closer *closeFuncs, ac *session
}
closer.addCloser(b.Close)

committable := sqlSessions.NewSqlCommittable(db, sqlSessions.WithLogger(*d.log.Named("accounts-committable")))
ac.Register(d.ctx, "accounts_db", b.WrapCommittable(committable))
err = ac.Register(accountsDBName, com)
if err != nil {
failBuild(err, "failed to register account store")
}

return b
}

func buildValidatorManager(d *coreDependencies, closer *closeFuncs, ac *sessions.AtomicCommitter) *vmgr.ValidatorMgr {
db, err := d.opener.Open("validator_db", *d.log.Named("validator-store"))
func buildValidatorManager(d *coreDependencies, closer *closeFuncs, ac *sessions.MultiCommitter) *vmgr.ValidatorMgr {
db, err := d.opener(d.ctx, filepath.Join(d.cfg.RootDir, applicationDirName, validatorDBName), 1, 2, true)
if err != nil {
failBuild(err, "failed to open validator db")
}
Expand All @@ -265,7 +276,13 @@ func buildValidatorManager(d *coreDependencies, closer *closeFuncs, ac *sessions
feeMultiplier = 0
}

v, err := vmgr.NewValidatorMgr(d.ctx, db,
adapted := adapter.PoolAdapater{Pool: db}

com := committable.New(adapted)

v, err := vmgr.NewValidatorMgr(d.ctx,
&adapted,
com,
vmgr.WithLogger(*d.log.Named("validatorStore")),
vmgr.WithJoinExpiry(joinExpiry),
vmgr.WithFeeMultiplier(int64(feeMultiplier)),
Expand All @@ -274,8 +291,11 @@ func buildValidatorManager(d *coreDependencies, closer *closeFuncs, ac *sessions
failBuild(err, "failed to build validator store")
}

committable := sqlSessions.NewSqlCommittable(db, sqlSessions.WithLogger(*d.log.Named("validator-committable")))
ac.Register(d.ctx, "validator_db", v.WrapCommittable(committable))
err = ac.Register(validatorDBName, com)
if err != nil {
failBuild(err, "failed to register validator store")
}

return v
}

Expand Down Expand Up @@ -478,19 +498,22 @@ func buildCometNode(d *coreDependencies, closer *closeFuncs, abciApp abciTypes.A
return node
}

func buildAtomicCommitter(d *coreDependencies, closers *closeFuncs) *sessions.AtomicCommitter {
twoPCWal, err := wal.OpenWal(filepath.Join(d.cfg.RootDir, applicationDirName, "wal"))
func buildCommitter(d *coreDependencies, closers *closeFuncs) *sessions.MultiCommitter {
kv, err := badger.NewBadgerDB(d.ctx, filepath.Join(d.cfg.RootDir, applicationDirName, "committer"),
&badger.Options{
GuaranteeFSync: true,
GarbageCollectionInterval: 5 * time.Minute,
Logger: *d.log.Named("atomic-committer-kv-store"),
GarbageCollectionDiscardRatio: 0.5,
},
)
if err != nil {
failBuild(err, "failed to open 2pc wal")
failBuild(err, "failed to open atomic committer kv")
}
closers.addCloser(twoPCWal.Close)

// we are actually registering all committables ad-hoc, so we can pass nil here
s := sessions.NewAtomicCommitter(d.ctx, twoPCWal, sessions.WithLogger(*d.log.Named("atomic-committer")))
// we need atomic committer to close before 2pc wal
closers.addCloser(s.Close)
closers.addCloser(kv.Close)

return s
return sessions.NewCommitter(kv, make(map[string]sessions.Committable), sessions.WithLogger(*d.log.Named("atomic-committer")))
}

func failBuild(err error, msg string) {
Expand Down
3 changes: 2 additions & 1 deletion cmd/kwild/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/kwilteam/kwil-db/internal/abci/cometbft" // internalize
gateway "github.com/kwilteam/kwil-db/internal/services/grpc_gateway"
grpc "github.com/kwilteam/kwil-db/internal/services/grpc_server"
"github.com/kwilteam/kwil-db/internal/sql/sqlite"

// internalize
"go.uber.org/zap"
Expand Down Expand Up @@ -101,7 +102,7 @@ func New(ctx context.Context, cfg *config.KwildConfig, genesisCfg *config.Genesi
genesisCfg: genesisCfg,
privKey: ed25519.PrivKey(nodeKey.Bytes()),
log: logger,
opener: newSqliteOpener(dbDir),
opener: sqlite.NewPool,
keypair: keyPair,
}

Expand Down
78 changes: 25 additions & 53 deletions cmd/kwild/server/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,13 @@ import (
"math"
"strings"

"github.com/kwilteam/kwil-db/core/log"
types "github.com/kwilteam/kwil-db/core/types/admin"
extActions "github.com/kwilteam/kwil-db/extensions/actions"
"github.com/kwilteam/kwil-db/internal/abci"
"github.com/kwilteam/kwil-db/internal/abci/cometbft/privval"
"github.com/kwilteam/kwil-db/internal/engine"
"github.com/kwilteam/kwil-db/internal/engine/execution"
"github.com/kwilteam/kwil-db/internal/extensions"
"github.com/kwilteam/kwil-db/internal/kv"
"github.com/kwilteam/kwil-db/internal/sessions"
sqlSessions "github.com/kwilteam/kwil-db/internal/sessions/sql-session"
"github.com/kwilteam/kwil-db/internal/sql"
"github.com/kwilteam/kwil-db/internal/sql/client"

abciTypes "github.com/cometbft/cometbft/abci/types"
"github.com/cometbft/cometbft/p2p"
Expand Down Expand Up @@ -58,40 +53,43 @@ func getExtensions(ctx context.Context, urls []string) (map[string]extActions.Ex
return exts, nil
}

func adaptExtensions(exts map[string]extActions.Extension) map[string]engine.ExtensionInitializer {
adapted := make(map[string]engine.ExtensionInitializer, len(exts))
func adaptExtensions(exts map[string]extActions.Extension) map[string]execution.NamespaceInitializer {
adapted := make(map[string]execution.NamespaceInitializer, len(exts))

for name, ext := range exts {
initializer := &extensions.ExtensionInitializer{
Extension: ext,
}
adapted[name] = extensionInitializeFunc(initializer.CreateInstance)
}
adapted[name] = func(ctx context.Context, metadata map[string]string) (execution.Namespace, error) {
// external extensions expect string as "string", however the engine now passes literals as "'string'"
trimmedMap := make(map[string]string, len(metadata))
for k, v := range metadata {
trimmedMap[k] = strings.Trim(v, "'")
}

return adapted
}
ext, err := initializer.CreateInstance(ctx, trimmedMap)
if err != nil {
return nil, err
}

type extensionInitializeFunc func(ctx context.Context, metadata map[string]string) (*extensions.Instance, error)
return &extensionAdapter{
ext: ext,
}, nil
}
}

func (e extensionInitializeFunc) CreateInstance(ctx context.Context, metadata map[string]string) (engine.ExtensionInstance, error) {
return e(ctx, metadata)
return adapted
}

type sqliteOpener struct {
sqliteFilePath string
// extensionAdapater allows an extension to be used as an engine namespace.
type extensionAdapter struct {
ext *extensions.Instance
}

func newSqliteOpener(sqliteFilePath string) *sqliteOpener {
return &sqliteOpener{
sqliteFilePath: sqliteFilePath,
}
}
func (e *extensionAdapter) Call(scoper execution.Scoper, method string, inputs []any) ([]any, error) {
ctx := scoper.NewScope().Ctx()

func (s *sqliteOpener) Open(fileName string, logger log.Logger) (sql.Database, error) {
return client.NewSqliteStore(fileName,
client.WithLogger(logger),
client.WithPath(s.sqliteFilePath),
)
return e.ext.Execute(ctx, method, inputs...)
}

// wrappedCometBFTClient satisfies the generic txsvc.BlockchainBroadcaster and
Expand Down Expand Up @@ -251,29 +249,3 @@ func (a *atomicReadWriter) Read() ([]byte, error) {
func (a *atomicReadWriter) Write(val []byte) error {
return a.kv.Set(a.key, val)
}

// sqlCommittableRegister allows dynamic registration of SQL committables
// it implements engine.CommitRegister
type sqlCommittableRegister struct {
committer *sessions.AtomicCommitter
log log.Logger
}

var _ engine.CommitRegister = (*sqlCommittableRegister)(nil)

func (s *sqlCommittableRegister) Register(ctx context.Context, name string, db sql.Database) error {
return registerSQL(ctx, s.committer, db, name, s.log)
}

func (s *sqlCommittableRegister) Unregister(ctx context.Context, name string) error {
return s.committer.Unregister(ctx, name)
}

// registerSQL is a helper function to register a SQL committable to the atomic committer.
func registerSQL(ctx context.Context, ac *sessions.AtomicCommitter, db sql.Database, name string, logger log.Logger) error {
return ac.Register(ctx, name,
sqlSessions.NewSqlCommittable(db,
sqlSessions.WithLogger(*logger.Named(name + "-committable")),
),
)
}
Loading

0 comments on commit 37808e3

Please sign in to comment.