Skip to content

Commit

Permalink
bug fixes for atomic committer, added garbage collection for KV store…
Browse files Browse the repository at this point in the history
…, workable dependency building
  • Loading branch information
brennanjl committed Aug 21, 2023
1 parent 74c5b7f commit 2121cf7
Show file tree
Hide file tree
Showing 19 changed files with 326 additions and 118 deletions.
2 changes: 1 addition & 1 deletion internal/app/kwild/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
)

func Test_Config(t *testing.T) {
os.Setenv("KWILD_PRIVATE_KEY", "f1aa5a7966c3863ccde3047f6a1e266cdc0c76b399e256b8fede92b1c69e4f4e")
os.Setenv("KWILD_PRIVATE_KEY", "f2d82d73ba03a7e843443f2b3179a01398144baa4a23d40d1e8a3a8e4fb217d0484d59f4de46b2174ebce66ac3afa7989b444244323c19a74b683f54cf33227c")
os.Setenv("KWILD_PORT", "8081")
os.Setenv("KWILD_DEPOSITS_POOL_ADDRESS", "0xabc")
os.Setenv("KWILD_EXTENSION_ENDPOINTS", "localhost:8080,localhost:8081, localhost:8082")
Expand Down
32 changes: 20 additions & 12 deletions internal/app/kwild/config/variables.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package config
import (
"fmt"
"os"
"path/filepath"
"strings"

"github.com/kwilteam/kwil-db/pkg/crypto"
Expand All @@ -24,14 +25,10 @@ type KwildConfig struct {
SqliteFilePath string
Log log.Config
ExtensionEndpoints []string
ArweaveConfig ArweaveConfig
BcRpcUrl string
WithoutGasCosts bool
WithoutNonces bool
}

type ArweaveConfig struct {
BundlrURL string
RootDir string
}

var (
Expand All @@ -43,10 +40,10 @@ var (
LogOutputPaths,
HttpListenAddress,
ExtensionEndpoints,
ArweaveBundlrURL,
CometBftRPCUrl,
WithoutGasCosts,
WithoutNonces,
RootDir,
}
)

Expand Down Expand Up @@ -149,12 +146,6 @@ var (
},
}

ArweaveBundlrURL = config.CfgVar{
EnvName: "ARWEAVE_BUNDLR_URL",
Field: "ArweaveConfig.BundlrURL",
Default: "",
}

WithoutGasCosts = config.CfgVar{
EnvName: "WITHOUT_GAS_COSTS",
Field: "WithoutGasCosts",
Expand All @@ -166,4 +157,21 @@ var (
Field: "WithoutNonces",
Default: false,
}

RootDir = config.CfgVar{
EnvName: "ROOT_DIR",
Field: "RootDir",
Setter: func(val any) (any, error) {
if val == nil {
return filepath.Clean("~/.kwil"), nil
}

str, err := conv.String(val)
if err != nil {
return nil, err
}

return filepath.Clean(str), nil
},
}
)
88 changes: 68 additions & 20 deletions internal/app/kwild/server/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package server

import (
"context"
"errors"
"fmt"
"net"
"path/filepath"
"sync"
"time"

// kwil-db
Expand Down Expand Up @@ -59,31 +62,37 @@ func BuildKwildServer(ctx context.Context) (svr *Server, err error) {
opener: newSqliteOpener(cfg.SqliteFilePath),
}

return buildServer(deps), nil
closers := &closeFuncs{
closers: make([]func() error, 0),
}

return buildServer(deps, closers), nil
}

func buildServer(d *coreDependencies) *Server {
func buildServer(d *coreDependencies, closers *closeFuncs) *Server {
// atomic committer
ac := buildAtomicCommitter(d)
closers.addCloser(ac.Close)

// engine
e := buildEngine(d, ac)
closers.addCloser(e.Close)

// account store
accs := buildAccountRepository(d, ac)
accs := buildAccountRepository(d, closers, ac)

// datasets module
datasetsModule := buildDatasetsModule(d, e, accs)

// validator updater and store
vstore := buildValidatorManager(d, ac)
vstore := buildValidatorManager(d, closers, ac)

// validator module
validatorModule := buildValidatorModule(d, accs, vstore)

abciApp := buildAbci(d, datasetsModule, validatorModule, ac)
abciApp := buildAbci(d, closers, datasetsModule, validatorModule, ac)

cometBftNode := buildCometNode(d, abciApp)
cometBftNode := buildCometNode(d, closers, abciApp)

cometBftClient := buildCometBftClient(cometBftNode)

Expand All @@ -98,6 +107,7 @@ func buildServer(d *coreDependencies) *Server {
gateway: buildGatewayServer(d),
cometBftNode: cometBftNode,
log: *d.log.Named("kwild-server"),
closers: closers,
cfg: d.cfg,
}
}
Expand All @@ -110,14 +120,53 @@ type coreDependencies struct {
opener sql.Opener
}

func buildAbci(d *coreDependencies, datasetsModule abci.DatasetsModule, validatorModule abci.ValidatorModule,
// closeFuncs holds a list of closers
// it is used to close all resources on shutdown
type closeFuncs struct {
closers []func() error
}

func (c *closeFuncs) addCloser(f func() error) {
c.closers = append(c.closers, f)
}

// closeAll concurrently closes all closers
func (c *closeFuncs) closeAll() error {
errs := make([]error, 0)
errCh := make(chan error, len(c.closers))
wg := sync.WaitGroup{}

for _, f := range c.closers {
wg.Add(1)
go func(f func() error) {
err := f()
if err != nil {
errCh <- err
}
wg.Done()
}(f)
}

wg.Wait()
close(errCh)

for err := range errCh {
errs = append(errs, err)
}

return errors.Join(errs...)
}

func buildAbci(d *coreDependencies, closer *closeFuncs, datasetsModule abci.DatasetsModule, validatorModule abci.ValidatorModule,
atomicCommitter *sessions.AtomicCommitter) *abci.AbciApp {
badgerKv, err := badger.NewBadgerDB("tmp/kwil/kv", &badger.Options{
badgerKv, err := badger.NewBadgerDB(d.ctx, filepath.Join(d.cfg.RootDir, "abci/info"), &badger.Options{
GuaranteeFSync: true,
Logger: *d.log.Named("abci-kv-store"),
})
if err != nil {
failBuild(err, "failed to open badger")
}
closer.addCloser(badgerKv.Close)

atomicKv, err := atomic.NewAtomicKV(badgerKv)
if err != nil {
Expand Down Expand Up @@ -180,11 +229,12 @@ func buildEngine(d *coreDependencies, a *sessions.AtomicCommitter) *engine.Engin
return e
}

func buildAccountRepository(d *coreDependencies, ac *sessions.AtomicCommitter) *balances.AccountStore {
func buildAccountRepository(d *coreDependencies, closer *closeFuncs, ac *sessions.AtomicCommitter) *balances.AccountStore {
db, err := d.opener.Open("accounts_db", *d.log.Named("account-store"))
if err != nil {
failBuild(err, "failed to open accounts db")
}
closer.addCloser(db.Close)

err = registerSQL(d.ctx, ac, db, "accounts_db", d.log)
if err != nil {
Expand All @@ -203,11 +253,12 @@ func buildAccountRepository(d *coreDependencies, ac *sessions.AtomicCommitter) *
return b
}

func buildValidatorManager(d *coreDependencies, ac *sessions.AtomicCommitter) *vmgr.ValidatorMgr {
func buildValidatorManager(d *coreDependencies, closer *closeFuncs, ac *sessions.AtomicCommitter) *vmgr.ValidatorMgr {
db, err := d.opener.Open("validator_db", *d.log.Named("validator-store"))
if err != nil {
failBuild(err, "failed to open validator db")
}
closer.addCloser(db.Close)

err = registerSQL(d.ctx, ac, db, "validator_db", d.log)
if err != nil {
Expand Down Expand Up @@ -274,24 +325,26 @@ func buildCometBftClient(cometBftNode *cometbft.CometBftNode) *cmtlocal.Local {
return cmtlocal.New(cometBftNode.Node)
}

func buildCometNode(d *coreDependencies, abciApp abciTypes.Application) *cometbft.CometBftNode {
func buildCometNode(d *coreDependencies, closer *closeFuncs, abciApp abciTypes.Application) *cometbft.CometBftNode {
// TODO: a lot of the filepaths, as well as cometbft logging level, are hardcoded. This should be cleaned up with a config

// for now, I'm just using a KV store for my atomic commit. This probably is not ideal; a file may be better
// I'm simply using this because we know it fsyncs the data to disk
db, err := badger.NewBadgerDB("tmp/abci/signing", &badger.Options{
db, err := badger.NewBadgerDB(d.ctx, filepath.Join(d.cfg.RootDir, "signing"), &badger.Options{
GuaranteeFSync: true,
Logger: *d.log.Named("private-validator-signature-store"),
})
if err != nil {
failBuild(err, "failed to build comet node")
}
closer.addCloser(db.Close)

readWriter := &atomicReadWriter{
kv: db,
key: []byte("az"), // any key here will work
}

node, err := cometbft.NewCometBftNode(abciApp, d.cfg.PrivateKey.Bytes(), readWriter, "tmp/abci", "debug")
node, err := cometbft.NewCometBftNode(abciApp, d.cfg.PrivateKey.Bytes(), readWriter, filepath.Join(d.cfg.RootDir, "abci"), "debug")
if err != nil {
failBuild(err, "failed to build comet node")
}
Expand All @@ -300,18 +353,13 @@ func buildCometNode(d *coreDependencies, abciApp abciTypes.Application) *cometbf
}

func buildAtomicCommitter(d *coreDependencies) *sessions.AtomicCommitter {
twoPCWal, err := wal.OpenWal("tmp/kwil/wal")
twoPCWal, err := wal.OpenWal(filepath.Join(d.cfg.RootDir, "application/wal"))
if err != nil {
failBuild(err, "failed to open 2pc wal")
}

// we are actually registering all committables ad-hoc, so we can pass nil here
c, err := sessions.NewAtomicCommitter(d.ctx, nil, twoPCWal, sessions.WithLogger(*d.log.Named("atomic-committer")))
if err != nil {
failBuild(err, "failed to build atomic committer")
}

return c
return sessions.NewAtomicCommitter(d.ctx, nil, twoPCWal, sessions.WithLogger(*d.log.Named("atomic-committer")))
}

func failBuild(err error, msg string) {
Expand Down
18 changes: 11 additions & 7 deletions internal/app/kwild/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,20 @@ import (

"github.com/kwilteam/kwil-db/internal/app/kwild/config"
"github.com/kwilteam/kwil-db/pkg/abci"
"github.com/kwilteam/kwil-db/pkg/abci/cometbft"
"github.com/kwilteam/kwil-db/pkg/grpc/gateway"
grpc "github.com/kwilteam/kwil-db/pkg/grpc/server"
"github.com/kwilteam/kwil-db/pkg/log"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

// comet bft node has a gross and confusing interface, so we use this to make it more clear
type startStopper interface {
Start() error
Stop() error
}

// Server controls the gRPC server and http gateway.
type Server struct {
grpcServer *grpc.Server
gateway *gateway.GatewayServer
cometBftNode startStopper
cometBftNode *cometbft.CometBftNode
closers *closeFuncs
log log.Logger

cfg *config.KwildConfig
Expand Down Expand Up @@ -94,6 +90,14 @@ func (s *Server) Start(ctx context.Context) error {
s.log.Info("comet node started")

err := group.Wait()

defer func() {
err := s.closers.closeAll()
if err != nil {
s.log.Error("failed to close resource:", zap.Error(err))
}
}()

if err != nil {
if errors.Is(err, context.Canceled) {
s.log.Info("server context is canceled")
Expand Down
5 changes: 5 additions & 0 deletions pkg/abci/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,11 @@ func (a *AbciApp) Commit() abciTypes.ResponseCommit {
func (a *AbciApp) Info(p0 abciTypes.RequestInfo) abciTypes.ResponseInfo {
ctx := context.Background()

err := a.committer.ClearWal(ctx)
if err != nil {
panic(newFatalError("Info", &p0, fmt.Sprintf("failed to clear WAL: %v", err)))
}

// Load the current validator set from our store.
vals, err := a.validators.CurrentSet(ctx)
if err != nil { // TODO error return
Expand Down
3 changes: 0 additions & 3 deletions pkg/abci/cometbft/privval/key.go

This file was deleted.

4 changes: 2 additions & 2 deletions pkg/abci/cometbft/privval/privvalidator.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func NewValidatorSigner(ed25519Key []byte, storer AtomicReadWriter) (*ValidatorS
}

return &ValidatorSigner{
privateKey: cometEd25519.PrivKey(KEY),
privateKey: cometEd25519.PrivKey(ed25519Key),
lastSignedState: lss,
}, nil
}
Expand Down Expand Up @@ -171,7 +171,7 @@ func (v *ValidatorSigner) signAndPersist(height int64, round int32, step int8, s
return signature, nil
}

// last signState tracks the most recent signature
// LastSignState tracks the most recent signature
// made by this validator. It is atomically committed to disk
// before it is used for anything else, and can be reloaded in case
// of a crash
Expand Down
19 changes: 0 additions & 19 deletions pkg/abci/cometbft/privval/privvalidator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,11 @@ package privval_test

import (
"crypto/sha256"
"encoding/base64"
"encoding/hex"
"encoding/json"
"fmt"
"testing"
"time"

cometEd25519 "github.com/cometbft/cometbft/crypto/ed25519"
cmtproto "github.com/cometbft/cometbft/proto/tendermint/types"
"github.com/cometbft/cometbft/types"
"github.com/kwilteam/kwil-db/pkg/abci/cometbft/privval"
Expand All @@ -19,22 +16,6 @@ import (
const defaultChainID = "test-chain"
const defaultPrivateKey = "7c67e60fce0c403ff40193a3128e5f3d8c2139aed36d76d7b5f1e70ec19c43f00aa611bf555596912bc6f9a9f169f8785918e7bab9924001895798ff13f05842"

func Test_C(t *testing.T) {
pk := cometEd25519.PrivKey(defaultPrivateKey)
fmt.Println(base64.StdEncoding.EncodeToString(pk.Bytes()))
fmt.Println(base64.StdEncoding.EncodeToString(pk.PubKey().Bytes()))
panic("")
}

func Test_D(t *testing.T) {
pk := cometEd25519.GenPrivKey()
fmt.Println(hex.DecodeString("7c67e60fce0c403ff40193a3128e5f3d8c2139aed36d76d7b5f1e70ec19c43f00aa611bf555596912bc6f9a9f169f8785918e7bab9924001895798ff13f05842"))
fmt.Println(hex.EncodeToString(pk.Bytes()))
fmt.Println(base64.StdEncoding.EncodeToString(pk.PubKey().Bytes()))
fmt.Println(base64.StdEncoding.EncodeToString(pk.PubKey().Address()))
panic("")
}

func Test_PrivValidatorVote(t *testing.T) {
type testCase struct {
// name is the name of the test case.
Expand Down
Loading

0 comments on commit 2121cf7

Please sign in to comment.