Skip to content

Commit

Permalink
merged with snapshots, made final changes
Browse files Browse the repository at this point in the history
  • Loading branch information
brennanjl committed Aug 21, 2023
1 parent 36890c7 commit b082a11
Show file tree
Hide file tree
Showing 12 changed files with 141 additions and 86 deletions.
Empty file added cmd/kwild/__debug_bin3948100767
Empty file.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/kwilteam/kwil-db

go 1.20
go 1.21

require (
github.com/alexliesenfeld/health v0.6.0
Expand Down Expand Up @@ -235,7 +235,7 @@ require (
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
golang.org/x/crypto v0.12.0 // indirect
golang.org/x/exp v0.0.0-20230724220655-d98519c11495
golang.org/x/exp v0.0.0-20230724220655-d98519c11495 // indirect
golang.org/x/net v0.14.0 // indirect
golang.org/x/sys v0.11.0 // indirect
golang.org/x/text v0.12.0 // indirect
Expand Down
76 changes: 66 additions & 10 deletions go.sum

Large diffs are not rendered by default.

31 changes: 17 additions & 14 deletions internal/app/kwild/server/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,10 @@ import (
"github.com/kwilteam/kwil-db/pkg/kv/badger"
"github.com/kwilteam/kwil-db/pkg/log"
"github.com/kwilteam/kwil-db/pkg/modules/datasets"
"github.com/kwilteam/kwil-db/pkg/modules/snapshots"
"github.com/kwilteam/kwil-db/pkg/modules/validators"
"github.com/kwilteam/kwil-db/pkg/sessions"
"github.com/kwilteam/kwil-db/pkg/sessions/wal"
snapshotPkg "github.com/kwilteam/kwil-db/pkg/snapshots"
"github.com/kwilteam/kwil-db/pkg/snapshots"
"github.com/kwilteam/kwil-db/pkg/sql"
vmgr "github.com/kwilteam/kwil-db/pkg/validators"

Expand Down Expand Up @@ -72,7 +71,7 @@ func BuildKwildServer(ctx context.Context) (svr *Server, err error) {

func buildServer(d *coreDependencies, closers *closeFuncs) *Server {
// atomic committer
ac := buildAtomicCommitter(d)
ac := buildAtomicCommitter(d, closers)
closers.addCloser(ac.Close)

// engine
Expand All @@ -91,11 +90,11 @@ func buildServer(d *coreDependencies, closers *closeFuncs) *Server {
// validator module
validatorModule := buildValidatorModule(d, accs, vstore)

snapshotModule := buildSnapshotModule(d)
snapshotModule := buildSnapshotter(d)

bootstrapperModule := buildBootstrapModule(d)
bootstrapperModule := buildBootstrapper(d)

abciApp := buildAbci(d, closers, datasetsModule, validatorModule, ac)
abciApp := buildAbci(d, closers, datasetsModule, validatorModule, ac, snapshotModule, bootstrapperModule)

cometBftNode := buildCometNode(d, closers, abciApp)

Expand Down Expand Up @@ -149,7 +148,7 @@ func (c *closeFuncs) closeAll() error {
}

func buildAbci(d *coreDependencies, closer *closeFuncs, datasetsModule abci.DatasetsModule, validatorModule abci.ValidatorModule,
atomicCommitter *sessions.AtomicCommitter) *abci.AbciApp {
atomicCommitter *sessions.AtomicCommitter, snapshotter *snapshots.SnapshotStore, bootstrapper *snapshots.Bootstrapper) *abci.AbciApp {
badgerKv, err := badger.NewBadgerDB(d.ctx, filepath.Join(d.cfg.RootDir, "abci/info"), &badger.Options{
GuaranteeFSync: true,
Logger: *d.log.Named("abci-kv-store"),
Expand Down Expand Up @@ -206,8 +205,8 @@ func buildEngine(d *coreDependencies, a *sessions.AtomicCommitter) *engine.Engin
}

sqlCommitRegister := &sqlCommittableRegister{
commiter: a,
log: *d.log.Named("sqlite-committable"),
committer: a,
log: *d.log.Named("sqlite-committable"),
}

e, err := engine.Open(d.ctx, d.opener,
Expand Down Expand Up @@ -274,7 +273,7 @@ func buildValidatorModule(d *coreDependencies, accs datasets.AccountStore,
validators.WithLogger(*d.log.Named("validator-module")))
}

func buildSnapshotModule(d *coreDependencies) *snapshots.SnapshotStore {
func buildSnapshotter(d *coreDependencies) *snapshots.SnapshotStore {
if !d.cfg.SnapshotConfig.Enabled {
return nil
}
Expand All @@ -287,8 +286,8 @@ func buildSnapshotModule(d *coreDependencies) *snapshots.SnapshotStore {
)
}

func buildBootstrapModule(d *coreDependencies) *snapshotPkg.Bootstrapper {
bootstrapper, err := snapshotPkg.NewBootstrapper(d.cfg.SqliteFilePath)
func buildBootstrapper(d *coreDependencies) *snapshots.Bootstrapper {
bootstrapper, err := snapshots.NewBootstrapper(d.cfg.SqliteFilePath)
if err != nil {
failBuild(err, "Bootstrap module initialization failure")
}
Expand Down Expand Up @@ -366,14 +365,18 @@ func buildCometNode(d *coreDependencies, closer *closeFuncs, abciApp abciTypes.A
return node
}

func buildAtomicCommitter(d *coreDependencies) *sessions.AtomicCommitter {
func buildAtomicCommitter(d *coreDependencies, closers *closeFuncs) *sessions.AtomicCommitter {
twoPCWal, err := wal.OpenWal(filepath.Join(d.cfg.RootDir, "application/wal"))
if err != nil {
failBuild(err, "failed to open 2pc wal")
}

// we are actually registering all committables ad-hoc, so we can pass nil here
return sessions.NewAtomicCommitter(d.ctx, nil, twoPCWal, sessions.WithLogger(*d.log.Named("atomic-committer")))
s := sessions.NewAtomicCommitter(d.ctx, twoPCWal, sessions.WithLogger(*d.log.Named("atomic-committer")))
// we need atomic committer to close before 2pc wal
closers.addCloser(s.Close)
closers.addCloser(twoPCWal.Close)
return s
}

func failBuild(err error, msg string) {
Expand Down
9 changes: 5 additions & 4 deletions internal/app/kwild/server/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,19 +138,20 @@ func (a *atomicReadWriter) Write(val []byte) error {
}

// sqlCommittableRegister allows dynamic registration of SQL committables
// it implements engine.CommitRegister
type sqlCommittableRegister struct {
commiter *sessions.AtomicCommitter
log log.Logger
committer *sessions.AtomicCommitter
log log.Logger
}

var _ engine.CommitRegister = (*sqlCommittableRegister)(nil)

func (s *sqlCommittableRegister) Register(ctx context.Context, name string, db sql.Database) error {
return registerSQL(ctx, s.commiter, db, name, s.log)
return registerSQL(ctx, s.committer, db, name, s.log)
}

func (s *sqlCommittableRegister) Unregister(ctx context.Context, name string) error {
return s.commiter.Unregister(ctx, name)
return s.committer.Unregister(ctx, name)
}

// registerSQL is a helper function to register a SQL committable to the atomic committer.
Expand Down
4 changes: 2 additions & 2 deletions pkg/abci/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ func NewAbciApp(database DatasetsModule, vldtrs ValidatorModule, kv KVStore, com
metadataStore: &metadataStore{
kv: kv,
},
bootstrapper: bootstrapper,
snapshotter: snapshotter,
bootstrapper: nil,
snapshotter: nil,

valAddrToKey: make(map[string][]byte),
valUpdates: make([]*validators.Validator, 0),
Expand Down
1 change: 0 additions & 1 deletion pkg/abci/cometbft/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ type CometBftNode struct {
}

// NewCometBftNode creates a new CometBFT node.
// I don't love this constructor function signature; I can definitely make it better
func NewCometBftNode(app abciTypes.Application, privateKey []byte, atomicStore privval.AtomicReadWriter, directory string, logLevel string) (*CometBftNode, error) {
conf := cometConfig.DefaultConfig().SetRoot(directory)
logger := cometLog.NewTMLogger(cometLog.NewSyncWriter(os.Stdout))
Expand Down
12 changes: 0 additions & 12 deletions pkg/modules/snapshots/interfaces.go

This file was deleted.

9 changes: 2 additions & 7 deletions pkg/sessions/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,9 @@ func (id CommittableId) Bytes() []byte {
}

// NewAtomicCommitter creates a new atomic session.
func NewAtomicCommitter(ctx context.Context, committables map[string]Committable, wal Wal, opts ...CommiterOpt) *AtomicCommitter {
committablesMap := make(map[CommittableId]Committable)
for id, committable := range committables {
committablesMap[CommittableId(id)] = committable
}

func NewAtomicCommitter(ctx context.Context, wal Wal, opts ...CommiterOpt) *AtomicCommitter {
a := &AtomicCommitter{
committables: committablesMap,
committables: make(map[CommittableId]Committable),
wal: &sessionWal{wal},
log: log.NewNoOp(),
timeout: 5 * time.Second,
Expand Down
70 changes: 42 additions & 28 deletions pkg/sessions/sessions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,15 @@ func Test_Session(t *testing.T) {
var id []byte
outErr := func() error {
ctx := context.Background()
committer := sessions.NewAtomicCommitter(ctx, tt.fields.committables, tt.fields.wal)
committer := sessions.NewAtomicCommitter(ctx, tt.fields.wal)

for id, committable := range tt.fields.committables {
err := committer.Register(ctx, id, committable)
if err != nil {
return err
}
}

defer committer.Close()
err := committer.ClearWal(ctx)
if err != nil {
Expand Down Expand Up @@ -248,16 +256,16 @@ func Test_ExistingWal(t *testing.T) {
wal sessions.Wal
}

type commitableData struct {
id string
commitable sessions.Committable
resultData map[string]any
type committableData struct {
id string
committable sessions.Committable
resultData map[string]any
}

type testCase struct {
name string
fields fields
commitableData []commitableData
commitableData []committableData
err error
}

Expand All @@ -271,16 +279,16 @@ func Test_ExistingWal(t *testing.T) {
walRecordCs2,
}...),
},
commitableData: []commitableData{
commitableData: []committableData{
{
id: "c1",
commitable: mockCommittable1(),
resultData: map[string]any{},
id: "c1",
committable: mockCommittable1(),
resultData: map[string]any{},
},
{
id: "c2",
commitable: mockCommittable2(),
resultData: map[string]any{},
id: "c2",
committable: mockCommittable2(),
resultData: map[string]any{},
},
},
},
Expand All @@ -294,17 +302,17 @@ func Test_ExistingWal(t *testing.T) {
walRecordCommit,
}...),
},
commitableData: []commitableData{
commitableData: []committableData{
{
id: "c1",
commitable: mockCommittable1(),
id: "c1",
committable: mockCommittable1(),
resultData: map[string]any{
"key1": "c1_changeset_1",
},
},
{
id: "c2",
commitable: mockCommittable2(),
id: "c2",
committable: mockCommittable2(),
resultData: map[string]any{
"key1": "c2_changeset_1",
},
Expand All @@ -320,16 +328,16 @@ func Test_ExistingWal(t *testing.T) {
walRecordCommit,
}...),
},
commitableData: []commitableData{
commitableData: []committableData{
{
id: "c1",
commitable: mockCommittable1(),
resultData: map[string]any{},
id: "c1",
committable: mockCommittable1(),
resultData: map[string]any{},
},
{
id: "c2",
commitable: mockCommittable2(),
resultData: map[string]any{},
id: "c2",
committable: mockCommittable2(),
resultData: map[string]any{},
},
},
},
Expand All @@ -339,12 +347,18 @@ func Test_ExistingWal(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
commitableMap := map[string]sessions.Committable{}
for _, data := range tt.commitableData {
commitableMap[data.id] = data.commitable
commitableMap[data.id] = data.committable
}

ctx := context.Background()

committer := sessions.NewAtomicCommitter(ctx, commitableMap, tt.fields.wal)
committer := sessions.NewAtomicCommitter(ctx, tt.fields.wal)
for id, committable := range commitableMap {
err := committer.Register(ctx, id, committable)
if err != nil {
t.Fatal(err)
}
}
err := committer.ClearWal(ctx)
assertError(t, err, tt.err)
if tt.err != nil {
Expand All @@ -359,7 +373,7 @@ func Test_ExistingWal(t *testing.T) {
}

for _, data := range tt.commitableData {
for key, value := range data.commitable.(*mockCommittable).appliedData {
for key, value := range data.committable.(*mockCommittable).appliedData {
if value != data.resultData[key] {
t.Fatalf("expected value %v, got %v", data.resultData[key], value)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package snapshots

import (
"github.com/kwilteam/kwil-db/pkg/log"
"github.com/kwilteam/kwil-db/pkg/snapshots"
)

type SnapshotStoreOpts func(*SnapshotStore)
Expand All @@ -18,11 +17,11 @@ type SnapshotStore struct {

numSnapshots uint64 // current number of snapshots
log log.Logger
snapshotter Snapshotter // Snapshotter instance to create a snapshot, instantiated at the beginning of snapshot process
snapshotter *Snapshotter // Snapshotter instance to create a snapshot, instantiated at the beginning of snapshot process
}

func NewSnapshotStore(databaseDir string, snapshotDir string, height uint64, maxSnapshots uint64, opts ...SnapshotStoreOpts) *SnapshotStore {
snapshotter := snapshots.NewSnapshotter(snapshotDir, databaseDir, 16*1024*1024)
snapshotter := NewSnapshotter(snapshotDir, databaseDir, 16*1024*1024)
ss := &SnapshotStore{
enabled: true,
numSnapshots: 0,
Expand Down Expand Up @@ -64,7 +63,7 @@ func (s *SnapshotStore) CreateSnapshot(height uint64) error {
}

if s.snapshotter == nil {
s.snapshotter = snapshots.NewSnapshotter(s.snapshotDir, s.databaseDir, s.chunkSize)
s.snapshotter = NewSnapshotter(s.snapshotDir, s.databaseDir, s.chunkSize)
}

// Initialize snapshot session
Expand Down Expand Up @@ -94,7 +93,7 @@ func (s *SnapshotStore) CreateSnapshot(height uint64) error {
}

// Lists the snapshot metadata of all the existing snapshots
func (s *SnapshotStore) ListSnapshots() ([]snapshots.Snapshot, error) {
func (s *SnapshotStore) ListSnapshots() ([]Snapshot, error) {
return s.snapshotter.ListSnapshots()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"path/filepath"
"testing"

"github.com/kwilteam/kwil-db/pkg/modules/snapshots"
"github.com/kwilteam/kwil-db/pkg/snapshots"
"github.com/stretchr/testify/assert"
)

Expand Down

0 comments on commit b082a11

Please sign in to comment.