diff --git a/bbolt/bbolt.go b/bbolt/bbolt.go index a4e3137..1a83217 100644 --- a/bbolt/bbolt.go +++ b/bbolt/bbolt.go @@ -21,13 +21,15 @@ package bbolt import ( "bytes" "context" - "os" - "path" - + "fmt" "github.com/nuts-foundation/go-stoabs" "github.com/nuts-foundation/go-stoabs/util" "github.com/sirupsen/logrus" "go.etcd.io/bbolt" + "os" + "path" + "sync/atomic" + "time" ) var _ stoabs.ReadTx = (*bboltTx)(nil) @@ -43,10 +45,8 @@ func CreateBBoltStore(filePath string, opts ...stoabs.Option) (stoabs.KVStore, e } bboltOpts := *bbolt.DefaultOptions - if cfg.NoSync { + if cfg.SyncInterval != 0 { bboltOpts.NoSync = true - bboltOpts.NoFreelistSync = true - bboltOpts.NoGrowSync = true } return createBBoltStore(filePath, &bboltOpts, cfg) } @@ -66,20 +66,44 @@ func createBBoltStore(filePath string, options *bbolt.Options, cfg stoabs.Config } else { log = logrus.StandardLogger() } - return &bboltStore{ - db: db, - log: log, - }, nil + result := &bboltStore{ + db: db, + log: log, + mustSync: &atomic.Value{}, + } + result.mustSync.Store(false) + result.ctx, result.cancelCtx = context.WithCancel(context.Background()) + if cfg.SyncInterval > 0 { + result.startSync(cfg.SyncInterval) + } + return result, nil } type bboltStore struct { - db *bbolt.DB - log *logrus.Logger + db *bbolt.DB + log *logrus.Logger + mustSync *atomic.Value + ctx context.Context + cancelCtx context.CancelFunc } func (b bboltStore) Close(ctx context.Context) error { + // Close BBolt and wait for it to finish closeError := make(chan error) go func() { + // Explicitly flush to assert all changes have been + if b.mustSync.Load().(bool) { + err := b.db.Sync() + if err != nil { + closeError <- fmt.Errorf("could not flush to disk: %w", err) + return + } + } + + // Signal internal processes to end + b.cancelCtx() + + // Then close db closeError <- b.db.Close() }() select { @@ -148,6 +172,8 @@ func (b bboltStore) doTX(fn func(tx *bbolt.Tx) error, writable bool, optsSlice [ } return appError } + // Signal the write to be flushed to disk + b.mustSync.Store(true) // Observe result, commit/rollback if appError == nil { b.log.Trace("Committing BBolt transaction") @@ -170,6 +196,26 @@ func (b bboltStore) doTX(fn func(tx *bbolt.Tx) error, writable bool, optsSlice [ return nil } +func (b bboltStore) startSync(interval time.Duration) { + ticker := time.NewTicker(interval) + stop := b.ctx.Done() + go func(mustSync *atomic.Value, db *bbolt.DB) { + for { + select { + case _ = <-ticker.C: + if b.mustSync.Swap(false).(bool) { + err := b.db.Sync() + if err != nil { + b.log.WithError(err).Error("Failed to sync to disk") + } + } + case _ = <-stop: + return + } + } + }(b.mustSync, b.db) +} + type bboltTx struct { tx *bbolt.Tx } diff --git a/bbolt/bbolt_test.go b/bbolt/bbolt_test.go index e818cf4..77c6027 100644 --- a/bbolt/bbolt_test.go +++ b/bbolt/bbolt_test.go @@ -21,8 +21,10 @@ package bbolt import ( "context" "errors" + "fmt" "path" "testing" + "time" "github.com/nuts-foundation/go-stoabs" "github.com/nuts-foundation/go-stoabs/util" @@ -319,6 +321,71 @@ func TestBboltShelf_Range(t *testing.T) { }) } +func TestBboltShelf_FlushInterval(t *testing.T) { + t.Run("flush on close when dirty", func(t *testing.T) { + dbPath := path.Join(util.TestDirectory(t), "bbolt.db") + store, _ := CreateBBoltStore(dbPath, stoabs.WithSyncInterval(time.Second)) + + // Write 100 keys + const expectedNumEntries = 100 + data := make([]byte, 1024) + for i := 0; i < expectedNumEntries; i++ { + err := store.WriteShelf(shelf, func(writer stoabs.Writer) error { + return writer.Put(stoabs.BytesKey(fmt.Sprintf("%d", i)), data) + }) + if !assert.NoError(t, err) { + return + } + } + + // Close should flush + _ = store.Close(context.Background()) + + // Reopen, check data was flushed + store, _ = CreateBBoltStore(dbPath, stoabs.WithSyncInterval(time.Second)) + err := store.ReadShelf(shelf, func(reader stoabs.Reader) error { + assert.Equal(t, uint(expectedNumEntries), reader.Stats().NumEntries) + return nil + }) + assert.NoError(t, err) + + _ = store.Close(context.Background()) + + }) +} + +func BenchmarkBBoltShelf_Flush(b *testing.B) { + const expectedNumEntries = 10 + b.Run("auto flush", func(b *testing.B) { + dbPath := path.Join(util.TestDirectory(b), "bbolt.db") + store, _ := CreateBBoltStore(dbPath) + + b.ResetTimer() + for x := 0; x < b.N; x++ { + for i := 0; i < expectedNumEntries; i++ { + _ = store.WriteShelf(shelf, func(writer stoabs.Writer) error { + return writer.Put(stoabs.BytesKey(fmt.Sprintf("%d", i)), value) + }) + } + } + _ = store.Close(context.Background()) + }) + b.Run("flush at interval", func(b *testing.B) { + dbPath := path.Join(util.TestDirectory(b), "bbolt.db") + store, _ := CreateBBoltStore(dbPath, stoabs.WithSyncInterval(time.Second)) + + b.ResetTimer() + for x := 0; x < b.N; x++ { + for i := 0; i < expectedNumEntries; i++ { + _ = store.WriteShelf(shelf, func(writer stoabs.Writer) error { + return writer.Put(stoabs.BytesKey(fmt.Sprintf("%d", i)), value) + }) + } + } + _ = store.Close(context.Background()) + }) +} + func TestBBolt_Close(t *testing.T) { t.Run("close closed store", func(t *testing.T) { store, _ := createStore(t) diff --git a/store.go b/store.go index cbccfd4..b6d0712 100644 --- a/store.go +++ b/store.go @@ -21,6 +21,8 @@ package stoabs import ( "context" "errors" + "math" + "time" "github.com/sirupsen/logrus" ) @@ -44,13 +46,22 @@ type KVStore interface { type Option func(config *Config) type Config struct { - Log *logrus.Logger - NoSync bool + Log *logrus.Logger + // SyncInterval specifies whether changes to the database should be flushed to disk. If not set, + // changes will be flushed to disk immediately. If set, the store will attempt to flush changes to disk at a + // maximum of the specified interval. + SyncInterval time.Duration } func WithNoSync() Option { return func(config *Config) { - config.NoSync = true + config.SyncInterval = math.MaxInt64 + } +} + +func WithSyncInterval(interval time.Duration) Option { + return func(config *Config) { + config.SyncInterval = interval } } diff --git a/util/test.go b/util/test.go index 0814826..fc2734b 100644 --- a/util/test.go +++ b/util/test.go @@ -30,7 +30,7 @@ var invalidPathCharRegex = regexp.MustCompile("([^a-zA-Z0-9])") // TestDirectory returns a temporary directory for this test only. Calling TestDirectory multiple times for the same // instance of t returns a new directory every time. -func TestDirectory(t *testing.T) string { +func TestDirectory(t testing.TB) string { if dir, err := ioutil.TempDir("", normalizeTestName(t)); err != nil { t.Fatal(err) return "" @@ -44,6 +44,6 @@ func TestDirectory(t *testing.T) string { } } -func normalizeTestName(t *testing.T) string { +func normalizeTestName(t testing.TB) string { return invalidPathCharRegex.ReplaceAllString(t.Name(), "_") }