Skip to content

Commit

Permalink
fixed bug where databases were not stopping properly
Browse files Browse the repository at this point in the history
  • Loading branch information
brennanjl committed Aug 21, 2023
1 parent 7f1d986 commit 1e8fc03
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 29 deletions.
16 changes: 16 additions & 0 deletions internal/app/kwild/cmd/server/root.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
package server

import (
"context"
"os"
"os/signal"
"syscall"

"github.com/kwilteam/kwil-db/internal/app/kwild/config"

// shorthand for chain client service
Expand All @@ -20,12 +25,23 @@ var startCmd = &cobra.Command{
Long: "Starts node with Kwild and CometBFT services",
RunE: func(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()

signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
ctx, cancel := context.WithCancel(ctx)

go func() {
<-signalChan
cancel()
}()

svr, err := server.BuildKwildServer(ctx)
if err != nil {
return err
}

return svr.Start(ctx)

},
}

Expand Down
47 changes: 27 additions & 20 deletions internal/app/kwild/server/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"
"net"
"path/filepath"
"sync"
"time"

// kwil-db
Expand Down Expand Up @@ -132,29 +131,37 @@ func (c *closeFuncs) addCloser(f func() error) {

// closeAll concurrently closes all closers
func (c *closeFuncs) closeAll() error {
errs := make([]error, 0)
errCh := make(chan error, len(c.closers))
wg := sync.WaitGroup{}

var errs []error
for _, f := range c.closers {
wg.Add(1)
go func(f func() error) {
err := f()
if err != nil {
errCh <- err
}
wg.Done()
}(f)
}

wg.Wait()
close(errCh)

for err := range errCh {
errs = append(errs, err)
if err := f(); err != nil {
errs = append(errs, err)
}
}

return errors.Join(errs...)
// errs := make([]error, 0)
// errCh := make(chan error, len(c.closers))
// wg := sync.WaitGroup{}

// for _, f := range c.closers {
// wg.Add(1)
// go func(f func() error) {
// err := f()
// if err != nil {
// errCh <- err
// }
// wg.Done()
// }(f)
// }

// wg.Wait()
// close(errCh)

// for err := range errCh {
// errs = append(errs, err)
// }

// return errors.Join(errs...)
}

func buildAbci(d *coreDependencies, closer *closeFuncs, datasetsModule abci.DatasetsModule, validatorModule abci.ValidatorModule,
Expand Down
13 changes: 6 additions & 7 deletions internal/app/kwild/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ func (s *Server) Start(ctx context.Context) error {
}
}
}()
defer func() {
err := s.closers.closeAll()
if err != nil {
s.log.Error("failed to close resource:", zap.Error(err))
}
}()

s.log.Info("starting server...")

Expand Down Expand Up @@ -91,13 +97,6 @@ func (s *Server) Start(ctx context.Context) error {

err := group.Wait()

defer func() {
err := s.closers.closeAll()
if err != nil {
s.log.Error("failed to close resource:", zap.Error(err))
}
}()

if err != nil {
if errors.Is(err, context.Canceled) {
s.log.Info("server context is canceled")
Expand Down
20 changes: 18 additions & 2 deletions pkg/kv/badger/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ import (
// It takes a path, like path/to/db, where the database will be stored.
func NewBadgerDB(ctx context.Context, path string, options *Options) (*BadgerDB, error) {
b := &BadgerDB{
logger: log.NewNoOp(),
gcInterval: 5 * time.Minute,
logger: log.NewNoOp(),
gcInterval: 5 * time.Minute,
gcDiscardRatio: 0.5,
}

badgerOpts := badger.DefaultOptions(path)
Expand Down Expand Up @@ -51,6 +52,9 @@ type BadgerDB struct {
// gcInterval is the interval at which the database is garbage collected.
gcInterval time.Duration

// gcDiscardRatio is the ratio of discarded keys to total keys.
gcDiscardRatio float64

// logger is the logger for the database.
logger log.Logger
}
Expand Down Expand Up @@ -182,11 +186,18 @@ type Options struct {

// Logger is the logger to use for the database.
Logger log.Logger

// GarbageCollectionDiscardRatio is the ratio at which the garbage collector discards
// old versions of a value. It must be between 0 and 1.
// Lower values will have a higher performance hit, but will use less disk space.
GarbageCollectionDiscardRatio float64
}

// applyToBadgerOpts applies the options to the badger options.
func (o *Options) applyToBadgerOpts(opts *badger.Options) {
opts.SyncWrites = o.GuaranteeFSync
opts.CompactL0OnClose = true
opts.NumCompactors = 2
}

func (o *Options) applyToDB(db *BadgerDB) {
Expand All @@ -197,6 +208,11 @@ func (o *Options) applyToDB(db *BadgerDB) {
if o.GarbageCollectionInterval != 0 {
db.gcInterval = o.GarbageCollectionInterval
}

if o.GarbageCollectionDiscardRatio != 0 {
db.gcDiscardRatio = o.GarbageCollectionDiscardRatio
}

}

// badgerLogger implements the badger.Logger interface.
Expand Down

0 comments on commit 1e8fc03

Please sign in to comment.