Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow BBolt to flush to disk at set interval instead of on every TX commit #4

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 58 additions & 12 deletions bbolt/bbolt.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@ package bbolt
import (
"bytes"
"context"
"os"
"path"

"fmt"
"github.com/nuts-foundation/go-stoabs"
"github.com/nuts-foundation/go-stoabs/util"
"github.com/sirupsen/logrus"
"go.etcd.io/bbolt"
"os"
"path"
"sync/atomic"
"time"
)

var _ stoabs.ReadTx = (*bboltTx)(nil)
Expand All @@ -43,10 +45,8 @@ func CreateBBoltStore(filePath string, opts ...stoabs.Option) (stoabs.KVStore, e
}

bboltOpts := *bbolt.DefaultOptions
if cfg.NoSync {
if cfg.SyncInterval != 0 {
bboltOpts.NoSync = true
bboltOpts.NoFreelistSync = true
bboltOpts.NoGrowSync = true
}
return createBBoltStore(filePath, &bboltOpts, cfg)
}
Expand All @@ -66,20 +66,44 @@ func createBBoltStore(filePath string, options *bbolt.Options, cfg stoabs.Config
} else {
log = logrus.StandardLogger()
}
return &bboltStore{
db: db,
log: log,
}, nil
result := &bboltStore{
db: db,
log: log,
mustSync: &atomic.Value{},
}
result.mustSync.Store(false)
result.ctx, result.cancelCtx = context.WithCancel(context.Background())
if cfg.SyncInterval > 0 {
result.startSync(cfg.SyncInterval)
}
return result, nil
}

type bboltStore struct {
db *bbolt.DB
log *logrus.Logger
db *bbolt.DB
log *logrus.Logger
mustSync *atomic.Value
ctx context.Context
cancelCtx context.CancelFunc
}

func (b bboltStore) Close(ctx context.Context) error {
// Close BBolt and wait for it to finish
closeError := make(chan error)
go func() {
// Explicitly flush to assert all changes have been
if b.mustSync.Load().(bool) {
err := b.db.Sync()
if err != nil {
closeError <- fmt.Errorf("could not flush to disk: %w", err)
return
}
}

// Signal internal processes to end
b.cancelCtx()

// Then close db
closeError <- b.db.Close()
}()
select {
Expand Down Expand Up @@ -148,6 +172,8 @@ func (b bboltStore) doTX(fn func(tx *bbolt.Tx) error, writable bool, optsSlice [
}
return appError
}
// Signal the write to be flushed to disk
b.mustSync.Store(true)
// Observe result, commit/rollback
if appError == nil {
b.log.Trace("Committing BBolt transaction")
Expand All @@ -170,6 +196,26 @@ func (b bboltStore) doTX(fn func(tx *bbolt.Tx) error, writable bool, optsSlice [
return nil
}

func (b bboltStore) startSync(interval time.Duration) {
ticker := time.NewTicker(interval)
stop := b.ctx.Done()
go func(mustSync *atomic.Value, db *bbolt.DB) {
for {
select {
case _ = <-ticker.C:
if b.mustSync.Swap(false).(bool) {
err := b.db.Sync()
if err != nil {
b.log.WithError(err).Error("Failed to sync to disk")
}
}
case _ = <-stop:
return
}
}
}(b.mustSync, b.db)
}

type bboltTx struct {
tx *bbolt.Tx
}
Expand Down
67 changes: 67 additions & 0 deletions bbolt/bbolt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ package bbolt
import (
"context"
"errors"
"fmt"
"path"
"testing"
"time"

"github.com/nuts-foundation/go-stoabs"
"github.com/nuts-foundation/go-stoabs/util"
Expand Down Expand Up @@ -319,6 +321,71 @@ func TestBboltShelf_Range(t *testing.T) {
})
}

func TestBboltShelf_FlushInterval(t *testing.T) {
t.Run("flush on close when dirty", func(t *testing.T) {
dbPath := path.Join(util.TestDirectory(t), "bbolt.db")
store, _ := CreateBBoltStore(dbPath, stoabs.WithSyncInterval(time.Second))

// Write 100 keys
const expectedNumEntries = 100
data := make([]byte, 1024)
for i := 0; i < expectedNumEntries; i++ {
err := store.WriteShelf(shelf, func(writer stoabs.Writer) error {
return writer.Put(stoabs.BytesKey(fmt.Sprintf("%d", i)), data)
})
if !assert.NoError(t, err) {
return
}
}

// Close should flush
_ = store.Close(context.Background())

// Reopen, check data was flushed
store, _ = CreateBBoltStore(dbPath, stoabs.WithSyncInterval(time.Second))
err := store.ReadShelf(shelf, func(reader stoabs.Reader) error {
assert.Equal(t, uint(expectedNumEntries), reader.Stats().NumEntries)
return nil
})
assert.NoError(t, err)

_ = store.Close(context.Background())

})
}

func BenchmarkBBoltShelf_Flush(b *testing.B) {
const expectedNumEntries = 10
b.Run("auto flush", func(b *testing.B) {
dbPath := path.Join(util.TestDirectory(b), "bbolt.db")
store, _ := CreateBBoltStore(dbPath)

b.ResetTimer()
for x := 0; x < b.N; x++ {
for i := 0; i < expectedNumEntries; i++ {
_ = store.WriteShelf(shelf, func(writer stoabs.Writer) error {
return writer.Put(stoabs.BytesKey(fmt.Sprintf("%d", i)), value)
})
}
}
_ = store.Close(context.Background())
})
b.Run("flush at interval", func(b *testing.B) {
dbPath := path.Join(util.TestDirectory(b), "bbolt.db")
store, _ := CreateBBoltStore(dbPath, stoabs.WithSyncInterval(time.Second))

b.ResetTimer()
for x := 0; x < b.N; x++ {
for i := 0; i < expectedNumEntries; i++ {
_ = store.WriteShelf(shelf, func(writer stoabs.Writer) error {
return writer.Put(stoabs.BytesKey(fmt.Sprintf("%d", i)), value)
})
}
}
_ = store.Close(context.Background())
})
}

func TestBBolt_Close(t *testing.T) {
t.Run("close closed store", func(t *testing.T) {
store, _ := createStore(t)
Expand Down
17 changes: 14 additions & 3 deletions store.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ package stoabs
import (
"context"
"errors"
"math"
"time"

"github.com/sirupsen/logrus"
)
Expand All @@ -44,13 +46,22 @@ type KVStore interface {
type Option func(config *Config)

type Config struct {
Log *logrus.Logger
NoSync bool
Log *logrus.Logger
// SyncInterval specifies whether changes to the database should be flushed to disk. If not set,
// changes will be flushed to disk immediately. If set, the store will attempt to flush changes to disk at a
// maximum of the specified interval.
SyncInterval time.Duration
}

func WithNoSync() Option {
return func(config *Config) {
config.NoSync = true
config.SyncInterval = math.MaxInt64
}
}

func WithSyncInterval(interval time.Duration) Option {
return func(config *Config) {
config.SyncInterval = interval
}
}

Expand Down
4 changes: 2 additions & 2 deletions util/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ var invalidPathCharRegex = regexp.MustCompile("([^a-zA-Z0-9])")

// TestDirectory returns a temporary directory for this test only. Calling TestDirectory multiple times for the same
// instance of t returns a new directory every time.
func TestDirectory(t *testing.T) string {
func TestDirectory(t testing.TB) string {
if dir, err := ioutil.TempDir("", normalizeTestName(t)); err != nil {
t.Fatal(err)
return ""
Expand All @@ -44,6 +44,6 @@ func TestDirectory(t *testing.T) string {
}
}

func normalizeTestName(t *testing.T) string {
func normalizeTestName(t testing.TB) string {
return invalidPathCharRegex.ReplaceAllString(t.Name(), "_")
}