Skip to content

Commit

Permalink
Added support for Snapshotstore and Bootstrapper (#201)
Browse files Browse the repository at this point in the history
  • Loading branch information
charithabandi authored and brennanjl committed Feb 26, 2024
1 parent 019f7cf commit 8d6dbf9
Show file tree
Hide file tree
Showing 20 changed files with 1,588 additions and 13 deletions.
32 changes: 32 additions & 0 deletions internal/app/kwild/config/variables.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,20 @@ type KwildConfig struct {
BCPrivateKey cmtCrypto.PrivKey
WithoutGasCosts bool
WithoutNonces bool
SnapshotConfig SnapshotConfig
}

type ArweaveConfig struct {
BundlrURL string
}

type SnapshotConfig struct {
Enabled bool
RecurringHeight uint64
MaxSnapshots uint64
SnapshotDir string
}

var (
RegisteredVariables = []config.CfgVar{
PrivateKey,
Expand Down Expand Up @@ -169,4 +177,28 @@ var (
Field: "WithoutNonces",
Default: false,
}

SnapshotEnabled = config.CfgVar{
EnvName: "SNAPSHOT_ENABLED",
Field: "SnapshotConfig.Enabled",
Default: false,
}

SnapshotRecurringHeight = config.CfgVar{
EnvName: "SNAPSHOT_RECURRING_HEIGHT",
Field: "SnapshotConfig.RecurringHeight",
Default: uint64(10000), // 12-14 hrs at 1 block per 5 seconds speed
}

MaxSnapshots = config.CfgVar{
EnvName: "MAX_SNAPSHOTS",
Field: "SnapshotConfig.MaxSnapshots",
Default: 2,
}

SnapshotDir = config.CfgVar{
EnvName: "SNAPSHOT_DIR",
Field: "SnapshotConfig.SnapshotDir",
Default: "/tmp/kwil/snapshots",
}
)
33 changes: 31 additions & 2 deletions internal/app/kwild/server/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ import (
grpc "github.com/kwilteam/kwil-db/pkg/grpc/server"
"github.com/kwilteam/kwil-db/pkg/log"
"github.com/kwilteam/kwil-db/pkg/modules/datasets"
"github.com/kwilteam/kwil-db/pkg/modules/snapshots"
"github.com/kwilteam/kwil-db/pkg/modules/validators"
snapshotPkg "github.com/kwilteam/kwil-db/pkg/snapshots"
"github.com/kwilteam/kwil-db/pkg/sql"
vmgr "github.com/kwilteam/kwil-db/pkg/validators"

Expand Down Expand Up @@ -100,7 +102,11 @@ func buildServer(d *coreDependencies) *Server {
// validator module
validatorModule := buildValidatorModule(d, accs, vstore)

abciApp := buildAbci(d, datasetsModule, validatorModule, nil)
snapshotModule := buildSnapshotModule(d)

bootstrapperModule := buildBootstrapModule(d)

abciApp := buildAbci(d, datasetsModule, validatorModule, nil, snapshotModule, bootstrapperModule)

cometBftNode, err := newCometNode(abciApp, d.cfg)
if err != nil {
Expand Down Expand Up @@ -131,11 +137,13 @@ type coreDependencies struct {
}

func buildAbci(d *coreDependencies, datasetsModule abci.DatasetsModule, validatorModule abci.ValidatorModule,
atomicCommitter abci.AtomicCommitter) *abci.AbciApp {
atomicCommitter abci.AtomicCommitter, snapshotter abci.SnapshotModule, bootstrapper abci.DBBootstrapModule) *abci.AbciApp {
return abci.NewAbciApp(
datasetsModule,
validatorModule,
atomicCommitter,
snapshotter,
bootstrapper,
abci.WithLogger(*d.log.Named("abci")),
)
}
Expand Down Expand Up @@ -216,6 +224,27 @@ func buildValidatorModule(d *coreDependencies, accs datasets.AccountStore,
validators.WithLogger(*d.log.Named("validator-module")))
}

func buildSnapshotModule(d *coreDependencies) *snapshots.SnapshotStore {
if !d.cfg.SnapshotConfig.Enabled {
return nil
}

return snapshots.NewSnapshotStore(d.cfg.SqliteFilePath,
d.cfg.SnapshotConfig.SnapshotDir,
d.cfg.SnapshotConfig.RecurringHeight,
d.cfg.SnapshotConfig.MaxSnapshots,
snapshots.WithLogger(*d.log.Named("snapshotStore")),
)
}

func buildBootstrapModule(d *coreDependencies) *snapshotPkg.Bootstrapper {
bootstrapper, err := snapshotPkg.NewBootstrapper(d.cfg.SqliteFilePath)
if err != nil {
failBuild(err, "Bootstrap module initialization failure")
}
return bootstrapper
}

func buildGrpcServer(d *coreDependencies, txsvc txpb.TxServiceServer) *grpc.Server {
lis, err := net.Listen("tcp", d.cfg.GrpcListenAddress)
if err != nil {
Expand Down
80 changes: 72 additions & 8 deletions pkg/abci/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,16 @@ type appState struct { // TODO
prevAppHash []byte
}

func NewAbciApp(database DatasetsModule, validators ValidatorModule, committer AtomicCommitter, opts ...AbciOpt) *AbciApp {
func NewAbciApp(database DatasetsModule, validators ValidatorModule, committer AtomicCommitter,
snapshotter SnapshotModule,
bootstrapper DBBootstrapModule,
opts ...AbciOpt) *AbciApp {
app := &AbciApp{
database: database,
validators: validators,
committer: committer,
database: database,
validators: validators,
committer: committer,
snapshotter: snapshotter,
bootstrapper: bootstrapper,

log: log.NewNoOp(),

Expand Down Expand Up @@ -93,6 +98,12 @@ type AbciApp struct {
// committer is the atomic committer that handles atomic commits across multiple stores
committer AtomicCommitter

// snapshotter is the snapshotter module that handles snapshotting
snapshotter SnapshotModule

// bootstrapper is the bootstrapper module that handles bootstrapping the database
bootstrapper DBBootstrapModule

log log.Logger

// commitWaiter is a waitgroup that waits for the commit to finish
Expand All @@ -102,10 +113,24 @@ type AbciApp struct {
commitWaiter sync.WaitGroup

state appState

// Expected AppState after bootstrapping the node with a given snapshot,
// state gets updated with the bootupState after bootstrapping
bootupState appState
}

func (a *AbciApp) ApplySnapshotChunk(p0 abciTypes.RequestApplySnapshotChunk) abciTypes.ResponseApplySnapshotChunk {
panic("TODO")
refetchChunks, status, err := a.bootstrapper.ApplySnapshotChunk(p0.Chunk, p0.Index)
if err != nil {
return abciTypes.ResponseApplySnapshotChunk{Result: abciStatus(status), RefetchChunks: refetchChunks}
}

if a.bootstrapper.IsDBRestored() {
a.state.prevAppHash = a.bootupState.prevAppHash
a.state.prevBlockHeight = a.bootupState.prevBlockHeight
a.log.Info("Bootstrapped database successfully")
}
return abciTypes.ResponseApplySnapshotChunk{Result: abciTypes.ResponseApplySnapshotChunk_ACCEPT, RefetchChunks: nil}
}

func (a *AbciApp) Info(p0 abciTypes.RequestInfo) abciTypes.ResponseInfo {
Expand Down Expand Up @@ -395,22 +420,61 @@ func (a *AbciApp) Commit() abciTypes.ResponseCommit {
a.state.prevBlockHeight++
a.state.prevAppHash = appHash

height := uint64(a.state.prevBlockHeight)
if a.snapshotter != nil && a.snapshotter.IsSnapshotDue(height) {
// TODO: Lock all DBs
err = a.snapshotter.CreateSnapshot(height)
if err != nil {
a.log.Error("snapshot creation failed", zap.Error(err))
}
// Unlock all the DBs
}

return abciTypes.ResponseCommit{
Data: appHash, // will be in ResponseFinalizeBlock in v0.38
}
}

func (a *AbciApp) ListSnapshots(p0 abciTypes.RequestListSnapshots) abciTypes.ResponseListSnapshots {
panic("TODO")
if a.snapshotter == nil {
return abciTypes.ResponseListSnapshots{Snapshots: nil}
}

snapshots, err := a.snapshotter.ListSnapshots()
if err != nil {
return abciTypes.ResponseListSnapshots{Snapshots: nil}
}

var res []*abciTypes.Snapshot
for _, snapshot := range snapshots {
abcisnapshot, err := convertToABCISnapshot(&snapshot)
if err != nil {
return abciTypes.ResponseListSnapshots{Snapshots: nil}
}
res = append(res, abcisnapshot)
}
return abciTypes.ResponseListSnapshots{Snapshots: res}
}

func (a *AbciApp) LoadSnapshotChunk(p0 abciTypes.RequestLoadSnapshotChunk) abciTypes.ResponseLoadSnapshotChunk {
panic("TODO")
if a.snapshotter == nil {
return abciTypes.ResponseLoadSnapshotChunk{Chunk: nil}
}

chunk := a.snapshotter.LoadSnapshotChunk(p0.Height, p0.Format, p0.Chunk)
return abciTypes.ResponseLoadSnapshotChunk{Chunk: chunk}
}

func (a *AbciApp) OfferSnapshot(p0 abciTypes.RequestOfferSnapshot) abciTypes.ResponseOfferSnapshot {
panic("TODO")
snapshot := convertABCISnapshots(p0.Snapshot)
if (a.bootstrapper.OfferSnapshot(snapshot)) != nil {
return abciTypes.ResponseOfferSnapshot{Result: abciTypes.ResponseOfferSnapshot_REJECT}
}
a.bootupState.prevAppHash = p0.Snapshot.Hash
a.bootupState.prevBlockHeight = int64(snapshot.Height)
return abciTypes.ResponseOfferSnapshot{Result: abciTypes.ResponseOfferSnapshot_ACCEPT}
}

func (a *AbciApp) PrepareProposal(p0 abciTypes.RequestPrepareProposal) abciTypes.ResponsePrepareProposal {
panic("TODO")
}
Expand Down
28 changes: 28 additions & 0 deletions pkg/abci/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

"github.com/kwilteam/kwil-db/pkg/engine/types"
"github.com/kwilteam/kwil-db/pkg/snapshots"
"github.com/kwilteam/kwil-db/pkg/transactions"
"github.com/kwilteam/kwil-db/pkg/validators"
)
Expand Down Expand Up @@ -56,3 +57,30 @@ type AtomicCommitter interface {
Begin(ctx context.Context) error
Commit(ctx context.Context, applyCallback func(error)) (commitID []byte, err error)
}

// SnapshotModule is an interface for a struct that implements snapshotting
type SnapshotModule interface {
// Checks if databases are to be snapshotted at a particular height
IsSnapshotDue(height uint64) bool

// Starts the snapshotting process, Locking databases need to be handled outside this fn
CreateSnapshot(height uint64) error

// Lists all the available snapshots in the snapshotstore and returns the snapshot metadata
ListSnapshots() ([]snapshots.Snapshot, error)

// Returns the snapshot chunk of index chunkId at a given height
LoadSnapshotChunk(height uint64, format uint32, chunkID uint32) []byte
}

// DBBootstrapModule is an interface for a struct that implements bootstrapping
type DBBootstrapModule interface {
// Offers a snapshot (metadata) to the bootstrapper and decides whether to accept the snapshot or not
OfferSnapshot(snapshot *snapshots.Snapshot) error

// Offers a snapshot Chunk to the bootstrapper, once all the chunks corresponding to the snapshot are received, the databases are restored from the chunks
ApplySnapshotChunk(chunk []byte, index uint32) ([]uint32, snapshots.Status, error)

// Signifies the end of the db restoration
IsDBRestored() bool
}
48 changes: 48 additions & 0 deletions pkg/abci/utils.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package abci

import (
"encoding/json"
"fmt"
"os"
"path/filepath"

abciTypes "github.com/cometbft/cometbft/abci/types"
"github.com/cometbft/cometbft/crypto/ed25519"
cmtos "github.com/cometbft/cometbft/libs/os"
"github.com/cometbft/cometbft/privval"
"github.com/kwilteam/kwil-db/pkg/snapshots"
)

// resetAll removes address book files plus all data, and resets the privValdiator data.
Expand Down Expand Up @@ -119,3 +122,48 @@ type cometAddresser struct{}
func (ca cometAddresser) Address(pubkey []byte) string {
return cometAddrFromPubKey(pubkey)
}

func convertABCISnapshots(req *abciTypes.Snapshot) *snapshots.Snapshot {
var metadata snapshots.SnapshotMetadata
err := json.Unmarshal(req.Metadata, &metadata)
if err != nil {
return nil
}

snapshot := &snapshots.Snapshot{
Height: req.Height,
Format: req.Format,
ChunkCount: req.Chunks,
Hash: req.Hash,
Metadata: metadata,
}
return snapshot
}

func convertToABCISnapshot(snapshot *snapshots.Snapshot) (*abciTypes.Snapshot, error) {
metadata, err := json.Marshal(snapshot.Metadata)
if err != nil {
return nil, err
}

return &abciTypes.Snapshot{
Height: snapshot.Height,
Format: snapshot.Format,
Chunks: snapshot.ChunkCount,
Hash: snapshot.Hash,
Metadata: metadata,
}, nil
}

func abciStatus(status snapshots.Status) abciTypes.ResponseApplySnapshotChunk_Result {
switch status {
case snapshots.ACCEPT:
return abciTypes.ResponseApplySnapshotChunk_ACCEPT
case snapshots.REJECT:
return abciTypes.ResponseApplySnapshotChunk_REJECT_SNAPSHOT
case snapshots.RETRY:
return abciTypes.ResponseApplySnapshotChunk_RETRY
default:
return abciTypes.ResponseApplySnapshotChunk_UNKNOWN
}
}
12 changes: 12 additions & 0 deletions pkg/modules/snapshots/interfaces.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package snapshots

import "github.com/kwilteam/kwil-db/pkg/snapshots"

type Snapshotter interface {
StartSnapshotSession(height uint64) error
EndSnapshotSession() error
CreateSnapshot() error
LoadSnapshotChunk(height uint64, format uint32, chunkID uint32) ([]byte, error)
DeleteOldestSnapshot() error
ListSnapshots() ([]snapshots.Snapshot, error)
}
Loading

0 comments on commit 8d6dbf9

Please sign in to comment.