From f5b63211d7f3e2f5f8b698893313b2a54e4df7de Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Tue, 26 Nov 2019 01:34:23 +0530 Subject: [PATCH] Introduce in-memory mode in badger (#1113) This PR introduces in-memory mode in badger. The in-memory mode can be enabled by setting options.InMemory=true. When badger is running in in-memory mode no files are created and everything is stored in memory. On DB close, all stored data is lost. NOTE - An existing DB cannot be opened in in-memory mode. Fixes - https://github.com/dgraph-io/badger/issues/1001 --- backup_test.go | 80 +++++++++-------- batch_test.go | 15 +++- contrib/cover.sh | 1 + db.go | 161 +++++++++++++++++++++------------ db2_test.go | 16 +++- db_test.go | 205 ++++++++++++++++++++++++++++++------------ errors.go | 2 + key_registry.go | 26 ++++-- key_registry_test.go | 22 +++++ levels.go | 19 +++- managed_db_test.go | 58 +++++++----- manifest.go | 20 ++++- options.go | 10 +++ stream_writer.go | 44 +++++---- stream_writer_test.go | 60 +++++++++---- test.sh | 2 + txn.go | 2 +- txn_test.go | 174 +++++++++++++++++++++-------------- value.go | 28 ++++-- 19 files changed, 647 insertions(+), 298 deletions(-) diff --git a/backup_test.go b/backup_test.go index b9b79006a..16ef8ed5c 100644 --- a/backup_test.go +++ b/backup_test.go @@ -276,48 +276,56 @@ func populateEntries(db *DB, entries []*pb.KV) error { } func TestBackup(t *testing.T) { - var bb bytes.Buffer - - tmpdir, err := ioutil.TempDir("", "badger-test") - if err != nil { - t.Fatal(err) - } - defer removeDir(tmpdir) - - db1, err := Open(DefaultOptions(filepath.Join(tmpdir, "backup0"))) - if err != nil { - t.Fatal(err) - } + test := func(t *testing.T, db *DB) { + var bb bytes.Buffer + N := 1000 + entries := createEntries(N) + require.NoError(t, populateEntries(db, entries)) - N := 1000 - entries := createEntries(N) - require.NoError(t, populateEntries(db1, entries)) - - _, err = db1.Backup(&bb, 0) - require.NoError(t, err) + _, err := db.Backup(&bb, 0) + require.NoError(t, err) - err = db1.View(func(txn *Txn) error { - opts := DefaultIteratorOptions - it := txn.NewIterator(opts) - defer it.Close() - var count int - for it.Rewind(); it.Valid(); it.Next() { - item := it.Item() - idx, err := strconv.Atoi(string(item.Key())[3:]) - if err != nil { - return err + err = db.View(func(txn *Txn) error { + opts := DefaultIteratorOptions + it := txn.NewIterator(opts) + defer it.Close() + var count int + for it.Rewind(); it.Valid(); it.Next() { + item := it.Item() + idx, err := strconv.Atoi(string(item.Key())[3:]) + if err != nil { + return err + } + if idx > N || !bytes.Equal(entries[idx].Key, item.Key()) { + return fmt.Errorf("%s: %s", string(item.Key()), ErrKeyNotFound) + } + count++ } - if idx > N || !bytes.Equal(entries[idx].Key, item.Key()) { - return fmt.Errorf("%s: %s", string(item.Key()), ErrKeyNotFound) + if N != count { + return fmt.Errorf("wrong number of items: %d expected, %d actual", N, count) } - count++ - } - if N != count { - return fmt.Errorf("wrong number of items: %d expected, %d actual", N, count) + return nil + }) + require.NoError(t, err) + } + t.Run("disk mode", func(t *testing.T) { + tmpdir, err := ioutil.TempDir("", "badger-test") + if err != nil { + t.Fatal(err) } - return nil + defer removeDir(tmpdir) + opt := DefaultOptions(filepath.Join(tmpdir, "backup0")) + runBadgerTest(t, &opt, func(t *testing.T, db *DB) { + test(t, db) + }) + }) + t.Run("InMemory mode", func(t *testing.T) { + opt := DefaultOptions("") + opt.InMemory = true + runBadgerTest(t, &opt, func(t *testing.T, db *DB) { + test(t, db) + }) }) - require.NoError(t, err) } func TestBackupRestore3(t *testing.T) { diff --git a/batch_test.go b/batch_test.go index 6b062eb43..484fd1471 100644 --- a/batch_test.go +++ b/batch_test.go @@ -32,7 +32,7 @@ func TestWriteBatch(t *testing.T) { return []byte(fmt.Sprintf("%128d", i)) } - runBadgerTest(t, nil, func(t *testing.T, db *DB) { + test := func(t *testing.T, db *DB) { wb := db.NewWriteBatch() defer wb.Cancel() @@ -65,5 +65,18 @@ func TestWriteBatch(t *testing.T) { return nil }) require.NoError(t, err) + } + t.Run("disk mode", func(t *testing.T) { + runBadgerTest(t, nil, func(t *testing.T, db *DB) { + test(t, db) + }) + }) + t.Run("InMemory mode", func(t *testing.T) { + opt := getTestOptions("") + opt.InMemory = true + db, err := Open(opt) + require.NoError(t, err) + test(t, db) + require.NoError(t, db.Close()) }) } diff --git a/contrib/cover.sh b/contrib/cover.sh index d94b162b6..684bf98dd 100755 --- a/contrib/cover.sh +++ b/contrib/cover.sh @@ -17,6 +17,7 @@ for PKG in $(go list ./...|grep -v -E 'vendor'); do tail -n +2 $TMP >> $OUT done +echo "Running test with vlog_mmap false" # Another round of tests after turning off mmap go test -v -vlog_mmap=false github.com/dgraph-io/badger diff --git a/db.go b/db.go index b911c70ab..dd2f0fb54 100644 --- a/db.go +++ b/db.go @@ -188,6 +188,9 @@ func (db *DB) replayFunction() func(Entry, valuePointer) error { // Open returns a new DB object. func Open(opt Options) (db *DB, err error) { + if opt.InMemory && (opt.Dir != "" || opt.ValueDir != "") { + return nil, errors.New("Cannot use badger in Disk-less mode with Dir or ValueDir set") + } opt.maxBatchSize = (15 * opt.MaxTableSize) / 100 opt.maxBatchCount = opt.maxBatchSize / int64(skl.MaxNodeSize) @@ -203,7 +206,16 @@ func Open(opt Options) (db *DB, err error) { return nil, errors.Errorf("Valuethreshold greater than max batch size of %d. Either "+ "reduce opt.ValueThreshold or increase opt.MaxTableSize.", opt.maxBatchSize) } - // Compact L0 on close if either it is set or if KeepL0InMemory is set. + if !(opt.ValueLogFileSize <= 2<<30 && opt.ValueLogFileSize >= 1<<20) { + return nil, ErrValueLogSize + } + if !(opt.ValueLogLoadingMode == options.FileIO || + opt.ValueLogLoadingMode == options.MemoryMap) { + return nil, ErrInvalidLoadingMode + } + + // Compact L0 on close if either it is set or if KeepL0InMemory is set. When + // keepL0InMemory is set we need to compact L0 on close otherwise we might lose data. opt.CompactL0OnClose = opt.CompactL0OnClose || opt.KeepL0InMemory if opt.ReadOnly { @@ -212,60 +224,46 @@ func Open(opt Options) (db *DB, err error) { // Do not perform compaction in read only mode. opt.CompactL0OnClose = false } + var dirLockGuard, valueDirLockGuard *directoryLockGuard - for _, path := range []string{opt.Dir, opt.ValueDir} { - dirExists, err := exists(path) + // Create directories and acquire lock on it only if badger is not running in InMemory mode. + // We don't have any directories/files in InMemory mode so we don't need to acquire + // any locks on them. + if !opt.InMemory { + if err := createDirs(opt); err != nil { + return nil, err + } + dirLockGuard, err = acquireDirectoryLock(opt.Dir, lockFile, opt.ReadOnly) if err != nil { - return nil, y.Wrapf(err, "Invalid Dir: %q", path) + return nil, err } - if !dirExists { - if opt.ReadOnly { - return nil, errors.Errorf("Cannot find directory %q for read-only open", path) - } - // Try to create the directory - err = os.Mkdir(path, 0700) - if err != nil { - return nil, y.Wrapf(err, "Error Creating Dir: %q", path) + defer func() { + if dirLockGuard != nil { + _ = dirLockGuard.release() } + }() + absDir, err := filepath.Abs(opt.Dir) + if err != nil { + return nil, err } - } - absDir, err := filepath.Abs(opt.Dir) - if err != nil { - return nil, err - } - absValueDir, err := filepath.Abs(opt.ValueDir) - if err != nil { - return nil, err - } - var dirLockGuard, valueDirLockGuard *directoryLockGuard - dirLockGuard, err = acquireDirectoryLock(opt.Dir, lockFile, opt.ReadOnly) - if err != nil { - return nil, err - } - defer func() { - if dirLockGuard != nil { - _ = dirLockGuard.release() - } - }() - if absValueDir != absDir { - valueDirLockGuard, err = acquireDirectoryLock(opt.ValueDir, lockFile, opt.ReadOnly) + absValueDir, err := filepath.Abs(opt.ValueDir) if err != nil { return nil, err } - defer func() { - if valueDirLockGuard != nil { - _ = valueDirLockGuard.release() + if absValueDir != absDir { + valueDirLockGuard, err = acquireDirectoryLock(opt.ValueDir, lockFile, opt.ReadOnly) + if err != nil { + return nil, err } - }() - } - if !(opt.ValueLogFileSize <= 2<<30 && opt.ValueLogFileSize >= 1<<20) { - return nil, ErrValueLogSize - } - if !(opt.ValueLogLoadingMode == options.FileIO || - opt.ValueLogLoadingMode == options.MemoryMap) { - return nil, ErrInvalidLoadingMode + defer func() { + if valueDirLockGuard != nil { + _ = valueDirLockGuard.release() + } + }() + } } - manifestFile, manifest, err := openOrCreateManifestFile(opt.Dir, opt.ReadOnly) + + manifestFile, manifest, err := openOrCreateManifestFile(opt) if err != nil { return nil, err } @@ -305,19 +303,21 @@ func Open(opt Options) (db *DB, err error) { blockCache: cache, } + if db.opt.InMemory { + db.opt.SyncWrites = false + db.opt.ValueThreshold = maxValueThreshold + } krOpt := KeyRegistryOptions{ ReadOnly: opt.ReadOnly, Dir: opt.Dir, EncryptionKey: opt.EncryptionKey, EncryptionKeyRotationDuration: opt.EncryptionKeyRotationDuration, + InMemory: opt.InMemory, } - kr, err := OpenKeyRegistry(krOpt) - if err != nil { + if db.registry, err = OpenKeyRegistry(krOpt); err != nil { return nil, err } - db.registry = kr - // Calculate initial size. db.calculateSize() db.closers.updateSize = y.NewCloser(1) go db.updateSize(db.closers.updateSize) @@ -370,8 +370,10 @@ func Open(opt Options) (db *DB, err error) { db.closers.writes = y.NewCloser(1) go db.doWrites(db.closers.writes) - db.closers.valueGC = y.NewCloser(1) - go db.vlog.waitOnGC(db.closers.valueGC) + if !db.opt.InMemory { + db.closers.valueGC = y.NewCloser(1) + go db.vlog.waitOnGC(db.closers.valueGC) + } db.closers.pub = y.NewCloser(1) go db.pub.listenForUpdates(db.closers.pub) @@ -402,8 +404,10 @@ func (db *DB) close() (err error) { atomic.StoreInt32(&db.blockWrites, 1) - // Stop value GC first. - db.closers.valueGC.SignalAndWait() + if !db.opt.InMemory { + // Stop value GC first. + db.closers.valueGC.SignalAndWait() + } // Stop writes next. db.closers.writes.SignalAndWait() @@ -476,6 +480,9 @@ func (db *DB) close() (err error) { db.blockCache.Close() db.elog.Finish() + if db.opt.InMemory { + return + } if db.dirLockGuard != nil { if guardErr := db.dirLockGuard.release(); err == nil { @@ -497,10 +504,10 @@ func (db *DB) close() (err error) { // Fsync directories to ensure that lock file, and any other removed files whose directory // we haven't specifically fsynced, are guaranteed to have their directory entry removal // persisted to disk. - if syncErr := syncDir(db.opt.Dir); err == nil { + if syncErr := db.syncDir(db.opt.Dir); err == nil { err = errors.Wrap(syncErr, "DB.Close") } - if syncErr := syncDir(db.opt.ValueDir); err == nil { + if syncErr := db.syncDir(db.opt.ValueDir); err == nil { err = errors.Wrap(syncErr, "DB.Close") } @@ -626,7 +633,10 @@ func (db *DB) shouldWriteValueToLSM(e Entry) bool { } func (db *DB) writeToLSM(b *request) error { - if len(b.Ptrs) != len(b.Entries) { + // We should check the length of b.Prts and b.Entries only when badger is not + // running in InMemory mode. In InMemory mode, we don't write anything to the + // value log and that's why the length of b.Ptrs will always be zero. + if !db.opt.InMemory && len(b.Ptrs) != len(b.Entries) { return errors.Errorf("Ptrs and Entries don't match: %+v", b) } @@ -668,7 +678,6 @@ func (db *DB) writeRequests(reqs []*request) error { } } db.elog.Printf("writeRequests called. Writing to value log") - err := db.vlog.write(reqs) if err != nil { done(err) @@ -949,7 +958,7 @@ func (db *DB) handleFlushTask(ft flushTask) error { // Don't block just to sync the directory entry. dirSyncCh := make(chan error, 1) - go func() { dirSyncCh <- syncDir(db.opt.Dir) }() + go func() { dirSyncCh <- db.syncDir(db.opt.Dir) }() if _, err = fd.Write(tableData); err != nil { db.elog.Errorf("ERROR while writing to level 0: %v", err) @@ -1020,6 +1029,9 @@ func exists(path string) (bool, error) { // This function does a filewalk, calculates the size of vlog and sst files and stores it in // y.LSMSize and y.VlogSize. func (db *DB) calculateSize() { + if db.opt.InMemory { + return + } newInt := func(val int64) *expvar.Int { v := new(expvar.Int) v.Add(val) @@ -1057,6 +1069,9 @@ func (db *DB) calculateSize() { func (db *DB) updateSize(lc *y.Closer) { defer lc.Done() + if db.opt.InMemory { + return + } metricsTicker := time.NewTicker(time.Minute) defer metricsTicker.Stop() @@ -1099,6 +1114,9 @@ func (db *DB) updateSize(lc *y.Closer) { // Note: Every time GC is run, it would produce a spike of activity on the LSM // tree. func (db *DB) RunValueLogGC(discardRatio float64) error { + if db.opt.InMemory { + return ErrGCInMemoryMode + } if discardRatio >= 1.0 || discardRatio <= 0.0 { return ErrInvalidRequest } @@ -1588,3 +1606,30 @@ func (db *DB) Subscribe(ctx context.Context, cb func(kv *KVList), prefixes ...[] func (db *DB) shouldEncrypt() bool { return len(db.opt.EncryptionKey) > 0 } + +func (db *DB) syncDir(dir string) error { + if db.opt.InMemory { + return nil + } + return syncDir(dir) +} + +func createDirs(opt Options) error { + for _, path := range []string{opt.Dir, opt.ValueDir} { + dirExists, err := exists(path) + if err != nil { + return y.Wrapf(err, "Invalid Dir: %q", path) + } + if !dirExists { + if opt.ReadOnly { + return errors.Errorf("Cannot find directory %q for read-only open", path) + } + // Try to create the directory + err = os.Mkdir(path, 0700) + if err != nil { + return y.Wrapf(err, "Error Creating Dir: %q", path) + } + } + } + return nil +} diff --git a/db2_test.go b/db2_test.go index 96280f445..4d7ee69a9 100644 --- a/db2_test.go +++ b/db2_test.go @@ -395,7 +395,7 @@ func TestBigValues(t *testing.T) { opts := DefaultOptions(""). WithValueThreshold(1 << 20). WithValueLogMaxEntries(100) - runBadgerTest(t, &opts, func(t *testing.T, db *DB) { + test := func(t *testing.T, db *DB) { keyCount := 1000 data := bytes.Repeat([]byte("a"), (1 << 20)) // Valuesize 1 MB. @@ -431,6 +431,20 @@ func TestBigValues(t *testing.T) { for i := 0; i < keyCount; i++ { require.NoError(t, getByKey(key(i))) } + } + t.Run("disk mode", func(t *testing.T) { + runBadgerTest(t, &opts, func(t *testing.T, db *DB) { + test(t, db) + }) + }) + t.Run("InMemory mode", func(t *testing.T) { + opts.InMemory = true + opts.Dir = "" + opts.ValueDir = "" + db, err := Open(opts) + require.NoError(t, err) + test(t, db) + require.NoError(t, db.Close()) }) } diff --git a/db_test.go b/db_test.go index dcd5f9747..f285aa097 100644 --- a/db_test.go +++ b/db_test.go @@ -125,6 +125,11 @@ func runBadgerTest(t *testing.T, opts *Options, test func(t *testing.T, db *DB)) opts.Dir = dir opts.ValueDir = dir } + + if opts.InMemory { + opts.Dir = "" + opts.ValueDir = "" + } db, err := Open(*opts) require.NoError(t, err) defer func() { @@ -229,7 +234,7 @@ func TestConcurrentWrite(t *testing.T) { } func TestGet(t *testing.T) { - runBadgerTest(t, nil, func(t *testing.T, db *DB) { + test := func(t *testing.T, db *DB) { txnSet(t, db, []byte("key1"), []byte("val1"), 0x08) txn := db.NewTransaction(false) @@ -271,6 +276,18 @@ func TestGet(t *testing.T) { require.NoError(t, err) require.EqualValues(t, longVal, getItemValue(t, item)) txn.Discard() + } + t.Run("disk mode", func(t *testing.T) { + runBadgerTest(t, nil, func(t *testing.T, db *DB) { + test(t, db) + }) + }) + t.Run("InMemory mode", func(t *testing.T) { + opts := DefaultOptions("").WithInmemory(true) + db, err := Open(opts) + require.NoError(t, err) + test(t, db) + require.NoError(t, db.Close()) }) } @@ -459,7 +476,7 @@ func TestGetMore(t *testing.T) { // Put a lot of data to move some data to disk. // WARNING: This test might take a while but it should pass! func TestExistsMore(t *testing.T) { - runBadgerTest(t, nil, func(t *testing.T, db *DB) { + test := func(t *testing.T, db *DB) { // n := 500000 n := 10000 m := 45 @@ -519,12 +536,23 @@ func TestExistsMore(t *testing.T) { })) } fmt.Println("Done and closing") + } + t.Run("disk mode", func(t *testing.T) { + runBadgerTest(t, nil, func(t *testing.T, db *DB) { + test(t, db) + }) + }) + t.Run("InMemory mode", func(t *testing.T) { + opt := DefaultOptions("").WithInmemory(true) + db, err := Open(opt) + require.NoError(t, err) + test(t, db) + require.NoError(t, db.Close()) }) } func TestIterate2Basic(t *testing.T) { - runBadgerTest(t, nil, func(t *testing.T, db *DB) { - + test := func(t *testing.T, db *DB) { bkey := func(i int) []byte { return []byte(fmt.Sprintf("%09d", i)) } @@ -582,7 +610,20 @@ func TestIterate2Basic(t *testing.T) { } } it.Close() + } + t.Run("disk mode", func(t *testing.T) { + runBadgerTest(t, nil, func(t *testing.T, db *DB) { + test(t, db) + }) + }) + t.Run("InMemory mode", func(t *testing.T) { + opt := DefaultOptions("").WithInmemory(true) + db, err := Open(opt) + require.NoError(t, err) + test(t, db) + require.NoError(t, db.Close()) }) + } func TestLoad(t *testing.T) { @@ -1178,34 +1219,44 @@ var benchmarkData = []struct { } func TestLargeKeys(t *testing.T) { - dir, err := ioutil.TempDir("", "badger-test") - require.NoError(t, err) - defer removeDir(dir) - - db, err := Open(DefaultOptions(dir).WithValueLogFileSize(1024 * 1024 * 1024)) - require.NoError(t, err) - for i := 0; i < 1000; i++ { - tx := db.NewTransaction(true) - for _, kv := range benchmarkData { - k := make([]byte, len(kv.key)) - copy(k, kv.key) - - v := make([]byte, len(kv.value)) - copy(v, kv.value) - if err := tx.SetEntry(NewEntry(k, v)); err != nil { - // check is success should be true - if kv.success { - t.Fatalf("failed with: %s", err) + test := func(t *testing.T, opt Options) { + db, err := Open(opt) + require.NoError(t, err) + for i := 0; i < 1000; i++ { + tx := db.NewTransaction(true) + for _, kv := range benchmarkData { + k := make([]byte, len(kv.key)) + copy(k, kv.key) + + v := make([]byte, len(kv.value)) + copy(v, kv.value) + if err := tx.SetEntry(NewEntry(k, v)); err != nil { + // check is success should be true + if kv.success { + t.Fatalf("failed with: %s", err) + } + } else if !kv.success { + t.Fatal("insertion should fail") } - } else if !kv.success { - t.Fatal("insertion should fail") + } + if err := tx.Commit(); err != nil { + t.Fatalf("#%d: batchSet err: %v", i, err) } } - if err := tx.Commit(); err != nil { - t.Fatalf("#%d: batchSet err: %v", i, err) - } + require.NoError(t, db.Close()) } - require.NoError(t, db.Close()) + t.Run("disk mode", func(t *testing.T) { + dir, err := ioutil.TempDir("", "badger-test") + require.NoError(t, err) + defer removeDir(dir) + opt := DefaultOptions(dir).WithValueLogFileSize(1024 * 1024 * 1024) + test(t, opt) + }) + t.Run("InMemory mode", func(t *testing.T) { + opt := DefaultOptions("").WithValueLogFileSize(1024 * 1024 * 1024) + opt.InMemory = true + test(t, opt) + }) } func TestCreateDirs(t *testing.T) { @@ -1215,7 +1266,7 @@ func TestCreateDirs(t *testing.T) { db, err := Open(DefaultOptions(filepath.Join(dir, "badger"))) require.NoError(t, err) - db.Close() + require.NoError(t, db.Close()) _, err = os.Stat(dir) require.NoError(t, err) } @@ -1572,38 +1623,47 @@ func TestMinReadTs(t *testing.T) { } func TestGoroutineLeak(t *testing.T) { - time.Sleep(1 * time.Second) - before := runtime.NumGoroutine() - t.Logf("Num go: %d", before) - for i := 0; i < 12; i++ { - runBadgerTest(t, nil, func(t *testing.T, db *DB) { - updated := false - ctx, cancel := context.WithCancel(context.Background()) - var wg sync.WaitGroup - wg.Add(1) - go func() { - err := db.Subscribe(ctx, func(kvs *pb.KVList) { - require.Equal(t, []byte("value"), kvs.Kv[0].GetValue()) - updated = true - wg.Done() - }, []byte("key")) - if err != nil { - require.Equal(t, err.Error(), context.Canceled.Error()) - } - }() - // Wait for the go routine to be scheduled. - time.Sleep(time.Second) - err := db.Update(func(txn *Txn) error { - return txn.SetEntry(NewEntry([]byte("key"), []byte("value"))) + test := func(t *testing.T, opt *Options) { + time.Sleep(1 * time.Second) + before := runtime.NumGoroutine() + t.Logf("Num go: %d", before) + for i := 0; i < 12; i++ { + runBadgerTest(t, nil, func(t *testing.T, db *DB) { + updated := false + ctx, cancel := context.WithCancel(context.Background()) + var wg sync.WaitGroup + wg.Add(1) + go func() { + err := db.Subscribe(ctx, func(kvs *pb.KVList) { + require.Equal(t, []byte("value"), kvs.Kv[0].GetValue()) + updated = true + wg.Done() + }, []byte("key")) + if err != nil { + require.Equal(t, err.Error(), context.Canceled.Error()) + } + }() + // Wait for the go routine to be scheduled. + time.Sleep(time.Second) + err := db.Update(func(txn *Txn) error { + return txn.SetEntry(NewEntry([]byte("key"), []byte("value"))) + }) + require.NoError(t, err) + wg.Wait() + cancel() + require.Equal(t, true, updated) }) - require.NoError(t, err) - wg.Wait() - cancel() - require.Equal(t, true, updated) - }) + } + time.Sleep(2 * time.Second) + require.Equal(t, before, runtime.NumGoroutine()) } - time.Sleep(2 * time.Second) - require.Equal(t, before, runtime.NumGoroutine()) + t.Run("disk mode", func(t *testing.T) { + test(t, nil) + }) + t.Run("InMemory mode", func(t *testing.T) { + opt := DefaultOptions("").WithInmemory(true) + test(t, &opt) + }) } func ExampleOpen() { @@ -1972,3 +2032,30 @@ func removeDir(dir string) func() { } } } + +func TestWriteInemory(t *testing.T) { + opt := DefaultOptions("").WithInmemory(true) + db, err := Open(opt) + require.NoError(t, err) + defer func() { + require.NoError(t, db.Close()) + }() + for i := 0; i < 100; i++ { + txnSet(t, db, []byte(fmt.Sprintf("key%d", i)), []byte(fmt.Sprintf("val%d", i)), 0x00) + } + err = db.View(func(txn *Txn) error { + for j := 0; j < 100; j++ { + item, err := txn.Get([]byte(fmt.Sprintf("key%d", j))) + require.NoError(t, err) + expected := []byte(fmt.Sprintf("val%d", j)) + item.Value(func(val []byte) error { + require.Equal(t, expected, val, + "Invalid value for key %q. expected: %q, actual: %q", + item.Key(), expected, val) + return nil + }) + } + return nil + }) + require.NoError(t, err) +} diff --git a/errors.go b/errors.go index 5d1905b70..bb9891e78 100644 --- a/errors.go +++ b/errors.go @@ -122,4 +122,6 @@ var ( ErrInvalidEncryptionKey = errors.New("Encryption key's length should be" + "either 16, 24, or 32 bytes") + + ErrGCInMemoryMode = errors.New("Cannot run value log GC when DB is opened in InMemory mode") ) diff --git a/key_registry.go b/key_registry.go index d1ecbae45..db32acd1a 100644 --- a/key_registry.go +++ b/key_registry.go @@ -57,6 +57,7 @@ type KeyRegistryOptions struct { ReadOnly bool EncryptionKey []byte EncryptionKeyRotationDuration time.Duration + InMemory bool } // newKeyRegistry returns KeyRegistry. @@ -80,6 +81,10 @@ func OpenKeyRegistry(opt KeyRegistryOptions) (*KeyRegistry, error) { break } } + // If db is opened in InMemory mode, we don't need to write key registry to the disk. + if opt.InMemory { + return newKeyRegistry(opt), nil + } path := filepath.Join(opt.Dir, KeyRegistryFileName) var flags uint32 if opt.ReadOnly { @@ -354,14 +359,17 @@ func (kr *KeyRegistry) latestDataKey() (*pb.DataKey, error) { CreatedAt: time.Now().Unix(), Iv: iv, } - // Store the datekey. - buf := &bytes.Buffer{} - if err = storeDataKey(buf, kr.opt.EncryptionKey, dk); err != nil { - return nil, err - } - // Persist the datakey to the disk - if _, err = kr.fp.Write(buf.Bytes()); err != nil { - return nil, err + // Don't store the datakey on file if badger is running in InMemory mode. + if !kr.opt.InMemory { + // Store the datekey. + buf := &bytes.Buffer{} + if err = storeDataKey(buf, kr.opt.EncryptionKey, dk); err != nil { + return nil, err + } + // Persist the datakey to the disk + if _, err = kr.fp.Write(buf.Bytes()); err != nil { + return nil, err + } } // storeDatakey encrypts the datakey So, placing un-encrypted key in the memory. dk.Data = k @@ -372,7 +380,7 @@ func (kr *KeyRegistry) latestDataKey() (*pb.DataKey, error) { // Close closes the key registry. func (kr *KeyRegistry) Close() error { - if !kr.opt.ReadOnly { + if !(kr.opt.ReadOnly || kr.opt.InMemory) { return kr.fp.Close() } return nil diff --git a/key_registry_test.go b/key_registry_test.go index 06c41bf28..656f26ab3 100644 --- a/key_registry_test.go +++ b/key_registry_test.go @@ -132,3 +132,25 @@ func TestEncryptionAndDecryption(t *testing.T) { require.Equal(t, dk.Data, dk1.Data) require.NoError(t, kr.Close()) } + +func TestKeyRegistryInMemory(t *testing.T) { + encryptionKey := make([]byte, 32) + _, err := rand.Read(encryptionKey) + require.NoError(t, err) + + opt := getRegistryTestOptions("", encryptionKey) + opt.InMemory = true + + kr, err := OpenKeyRegistry(opt) + require.NoError(t, err) + _, err = kr.latestDataKey() + require.NoError(t, err) + // We're resetting the last created timestamp. So, it creates + // new datakey. + kr.lastCreated = 0 + _, err = kr.latestDataKey() + // We generated two key. So, checking the length. + require.Equal(t, 2, len(kr.dataKeys)) + require.NoError(t, err) + require.NoError(t, kr.Close()) +} diff --git a/levels.go b/levels.go index 3fb35a2e8..30b753276 100644 --- a/levels.go +++ b/levels.go @@ -98,6 +98,9 @@ func newLevelsController(db *DB, mf *Manifest) (*levelsController, error) { s.cstatus.levels[i] = new(levelCompactStatus) } + if db.opt.InMemory { + return s, nil + } // Compare manifest against directory, check for existent/non-existent files, and remove. if err := revertToManifest(db, mf, getIDMap(db.opt.Dir)); err != nil { return nil, err @@ -454,6 +457,10 @@ func (s *levelsController) compactBuildTables( // value log file should be GCed. discardStats := make(map[uint32]int64) updateStats := func(vs y.ValueStruct) { + // We don't need to store/update discard stats when badger is running in Disk-less mode. + if s.kv.opt.InMemory { + return + } if vs.Meta&bitValuePointer > 0 { var vp valuePointer vp.Decode(vs.Value) @@ -603,7 +610,15 @@ func (s *levelsController) compactBuildTables( fileID := s.reserveFileID() go func(builder *table.Builder) { defer builder.Close() - tbl, err := build(fileID) + var ( + tbl *table.Table + err error + ) + if s.kv.opt.InMemory { + tbl, err = table.OpenInMemoryTable(builder.Finish(), fileID, &bopts) + } else { + tbl, err = build(fileID) + } resultCh <- newTableResult{tbl, err} }(builder) } @@ -623,7 +638,7 @@ func (s *levelsController) compactBuildTables( // Ensure created files' directory entries are visible. We don't mind the extra latency // from not doing this ASAP after all file creation has finished because this is a // background operation. - firstErr = syncDir(s.kv.opt.Dir) + firstErr = s.kv.syncDir(s.kv.opt.Dir) } if firstErr != nil { diff --git a/managed_db_test.go b/managed_db_test.go index f0c47b39c..5a2517462 100644 --- a/managed_db_test.go +++ b/managed_db_test.go @@ -137,34 +137,46 @@ func TestDropAll(t *testing.T) { } func TestDropAllTwice(t *testing.T) { - dir, err := ioutil.TempDir("", "badger-test") - require.NoError(t, err) - defer removeDir(dir) - opts := getTestOptions(dir) - opts.ValueLogFileSize = 5 << 20 - db, err := Open(opts) - require.NoError(t, err) - defer func() { - require.NoError(t, db.Close()) - }() + test := func(t *testing.T, opts Options) { + db, err := Open(opts) - N := uint64(10000) - populate := func(db *DB) { - writer := db.NewWriteBatch() - for i := uint64(0); i < N; i++ { - require.NoError(t, writer.Set([]byte(key("key", int(i))), val(true))) + require.NoError(t, err) + defer func() { + require.NoError(t, db.Close()) + }() + + N := uint64(10000) + populate := func(db *DB) { + writer := db.NewWriteBatch() + for i := uint64(0); i < N; i++ { + require.NoError(t, writer.Set([]byte(key("key", int(i))), val(false))) + } + require.NoError(t, writer.Flush()) } - require.NoError(t, writer.Flush()) - } - populate(db) - require.Equal(t, int(N), numKeys(db)) + populate(db) + require.Equal(t, int(N), numKeys(db)) - require.NoError(t, db.DropAll()) - require.Equal(t, 0, numKeys(db)) + require.NoError(t, db.DropAll()) + require.Equal(t, 0, numKeys(db)) - // Call DropAll again. - require.NoError(t, db.DropAll()) + // Call DropAll again. + require.NoError(t, db.DropAll()) + } + t.Run("disk mode", func(t *testing.T) { + dir, err := ioutil.TempDir("", "badger-test") + require.NoError(t, err) + defer removeDir(dir) + opts := getTestOptions(dir) + opts.ValueLogFileSize = 5 << 20 + test(t, opts) + }) + t.Run("InMemory mode", func(t *testing.T) { + opts := getTestOptions("") + opts.InMemory = true + test(t, opts) + + }) } func TestDropAllWithPendingTxn(t *testing.T) { diff --git a/manifest.go b/manifest.go index 3d7faa5e9..117976ecb 100644 --- a/manifest.go +++ b/manifest.go @@ -87,6 +87,9 @@ type manifestFile struct { // Used to track the current state of the manifest, used when rewriting. manifest Manifest + + // Used to indicate if badger was opened in InMemory mode. + inMemory bool } const ( @@ -114,11 +117,14 @@ func (m *Manifest) clone() Manifest { return ret } -// openOrCreateManifestFile opens a Badger manifest file if it exists, or creates on if -// one doesn’t. -func openOrCreateManifestFile(dir string, readOnly bool) ( +// openOrCreateManifestFile opens a Badger manifest file if it exists, or creates one if +// doesn’t exists. +func openOrCreateManifestFile(opt Options) ( ret *manifestFile, result Manifest, err error) { - return helpOpenOrCreateManifestFile(dir, readOnly, manifestDeletionsRewriteThreshold) + if opt.InMemory { + return &manifestFile{inMemory: true}, Manifest{}, nil + } + return helpOpenOrCreateManifestFile(opt.Dir, opt.ReadOnly, manifestDeletionsRewriteThreshold) } func helpOpenOrCreateManifestFile(dir string, readOnly bool, deletionsThreshold int) ( @@ -180,6 +186,9 @@ func helpOpenOrCreateManifestFile(dir string, readOnly bool, deletionsThreshold } func (mf *manifestFile) close() error { + if mf.inMemory { + return nil + } return mf.fp.Close() } @@ -188,6 +197,9 @@ func (mf *manifestFile) close() error { // this depends on the filesystem -- some might append garbage data if a system crash happens at // the wrong time.) func (mf *manifestFile) addChanges(changesParam []*pb.ManifestChange) error { + if mf.inMemory { + return nil + } changes := pb.ManifestChangeSet{Changes: changesParam} buf, err := proto.Marshal(&changes) if err != nil { diff --git a/options.go b/options.go index 0dbd00d07..5de0d4fd0 100644 --- a/options.go +++ b/options.go @@ -50,6 +50,7 @@ type Options struct { Logger Logger Compression options.CompressionType EventLogging bool + InMemory bool // Fine tuning options. @@ -544,6 +545,15 @@ func (opt Options) WithMaxCacheSize(size int64) Options { return opt } +// WithInMemory returns a new Options value with Inmemory mode set to the given value. +// +// When badger is running in InMemory mode, everything is stored in memory. No value/sst files are +// created. In case of a crash all data will be lost. +func (opt Options) WithInmemory(b bool) Options { + opt.InMemory = b + return opt +} + // WithZSTDCompressionLevel returns a new Options value with ZSTDCompressionLevel set // to the given value. // diff --git a/stream_writer.go b/stream_writer.go index 08e178346..15affcfba 100644 --- a/stream_writer.go +++ b/stream_writer.go @@ -245,11 +245,11 @@ func (sw *StreamWriter) Flush() error { // Now sync the directories, so all the files are registered. if sw.db.opt.ValueDir != sw.db.opt.Dir { - if err := syncDir(sw.db.opt.ValueDir); err != nil { + if err := sw.db.syncDir(sw.db.opt.ValueDir); err != nil { return err } } - if err := syncDir(sw.db.opt.Dir); err != nil { + if err := sw.db.syncDir(sw.db.opt.Dir); err != nil { return err } return sw.db.lc.validate() @@ -297,12 +297,14 @@ func (w *sortedWriter) handleRequests() { process := func(req *request) { for i, e := range req.Entries { - vptr := req.Ptrs[i] - if !vptr.IsZero() { - y.AssertTrue(w.head.Less(vptr)) - w.head = vptr + // If badger is running in InMemory mode, len(req.Ptrs) == 0. + if i < len(req.Ptrs) { + vptr := req.Ptrs[i] + if !vptr.IsZero() { + y.AssertTrue(w.head.Less(vptr)) + w.head = vptr + } } - var vs y.ValueStruct if e.skipVlog { vs = y.ValueStruct{ @@ -312,6 +314,7 @@ func (w *sortedWriter) handleRequests() { ExpiresAt: e.ExpiresAt, } } else { + vptr := req.Ptrs[i] vs = y.ValueStruct{ Value: vptr.Encode(), Meta: e.meta | bitValuePointer, @@ -401,19 +404,26 @@ func (w *sortedWriter) createTable(builder *table.Builder) error { return nil } fileID := w.db.lc.reserveFileID() - fd, err := y.CreateSyncedFile(table.NewFilename(fileID, w.db.opt.Dir), true) - if err != nil { - return err - } - if _, err := fd.Write(data); err != nil { - return err - } opts := buildTableOptions(w.db.opt) opts.DataKey = builder.DataKey() opts.Cache = w.db.blockCache - tbl, err := table.OpenTable(fd, opts) - if err != nil { - return err + var tbl *table.Table + if w.db.opt.InMemory { + var err error + if tbl, err = table.OpenInMemoryTable(data, fileID, &opts); err != nil { + return err + } + } else { + fd, err := y.CreateSyncedFile(table.NewFilename(fileID, w.db.opt.Dir), true) + if err != nil { + return err + } + if _, err := fd.Write(data); err != nil { + return err + } + if tbl, err = table.OpenTable(fd, opts); err != nil { + return err + } } lc := w.db.lc diff --git a/stream_writer_test.go b/stream_writer_test.go index e10acc74d..27d5330e1 100644 --- a/stream_writer_test.go +++ b/stream_writer_test.go @@ -51,11 +51,7 @@ func getSortedKVList(valueSize, listSize int) *pb.KVList { // check if we can read values after writing using stream writer func TestStreamWriter1(t *testing.T) { - normalModeOpts := getTestOptions("") - managedModeOpts := getTestOptions("") - managedModeOpts.managedTxns = true - - for _, opts := range []*Options{&normalModeOpts, &managedModeOpts} { + test := func(t *testing.T, opts *Options) { runBadgerTest(t, opts, func(t *testing.T, db *DB) { // write entries using stream writer noOfKeys := 1000 @@ -88,15 +84,25 @@ func TestStreamWriter1(t *testing.T) { require.NoError(t, err, "error while retrieving key") }) } + t.Run("Normal mode", func(t *testing.T) { + normalModeOpts := getTestOptions("") + test(t, &normalModeOpts) + }) + t.Run("Managed mode", func(t *testing.T) { + managedModeOpts := getTestOptions("") + managedModeOpts.managedTxns = true + test(t, &managedModeOpts) + }) + t.Run("InMemory mode", func(t *testing.T) { + diskLessModeOpts := getTestOptions("") + diskLessModeOpts.InMemory = true + test(t, &diskLessModeOpts) + }) } // write more keys to db after writing keys using stream writer func TestStreamWriter2(t *testing.T) { - normalModeOpts := getTestOptions("") - managedModeOpts := getTestOptions("") - managedModeOpts.managedTxns = true - - for _, opts := range []*Options{&normalModeOpts, &managedModeOpts} { + test := func(t *testing.T, opts *Options) { runBadgerTest(t, opts, func(t *testing.T, db *DB) { // write entries using stream writer noOfKeys := 1000 @@ -141,14 +147,24 @@ func TestStreamWriter2(t *testing.T) { require.Nil(t, err, "error should be nil while iterating") }) } + t.Run("Normal mode", func(t *testing.T) { + normalModeOpts := getTestOptions("") + test(t, &normalModeOpts) + }) + t.Run("Managed mode", func(t *testing.T) { + managedModeOpts := getTestOptions("") + managedModeOpts.managedTxns = true + test(t, &managedModeOpts) + }) + t.Run("InMemory mode", func(t *testing.T) { + diskLessModeOpts := getTestOptions("") + diskLessModeOpts.InMemory = true + test(t, &diskLessModeOpts) + }) } func TestStreamWriter3(t *testing.T) { - normalModeOpts := getTestOptions("") - managedModeOpts := getTestOptions("") - managedModeOpts.managedTxns = true - - for _, opts := range []*Options{&normalModeOpts, &managedModeOpts} { + test := func(t *testing.T, opts *Options) { runBadgerTest(t, opts, func(t *testing.T, db *DB) { // write entries using stream writer noOfKeys := 1000 @@ -220,6 +236,20 @@ func TestStreamWriter3(t *testing.T) { require.Nil(t, err, "error should be nil while iterating") }) } + t.Run("Normal mode", func(t *testing.T) { + normalModeOpts := getTestOptions("") + test(t, &normalModeOpts) + }) + t.Run("Managed mode", func(t *testing.T) { + managedModeOpts := getTestOptions("") + managedModeOpts.managedTxns = true + test(t, &managedModeOpts) + }) + t.Run("InMemory mode", func(t *testing.T) { + diskLessModeOpts := getTestOptions("") + diskLessModeOpts.InMemory = true + test(t, &diskLessModeOpts) + }) } // After inserting all data from streams, StreamWriter reinitializes Oracle and updates its nextTs diff --git a/test.sh b/test.sh index 5b14bfd8f..90d21889c 100755 --- a/test.sh +++ b/test.sh @@ -2,6 +2,8 @@ set -e +go version + # Ensure that we can compile the binary. pushd badger go build -v . diff --git a/txn.go b/txn.go index 7af80daef..4e9df8fd7 100644 --- a/txn.go +++ b/txn.go @@ -294,7 +294,7 @@ func (txn *Txn) newPendingWritesIterator(reversed bool) *pendingWritesIterator { func (txn *Txn) checkSize(e *Entry) error { count := txn.count + 1 - // Extra bytes for version in key. + // Extra bytes for the version in key. size := txn.size + int64(e.estimateSize(txn.db.opt.ValueThreshold)) + 10 if count >= txn.db.opt.maxBatchCount || size >= txn.db.opt.maxBatchSize { return ErrTxnTooBig diff --git a/txn_test.go b/txn_test.go index c0b6d7841..8450da910 100644 --- a/txn_test.go +++ b/txn_test.go @@ -33,7 +33,6 @@ import ( func TestTxnSimple(t *testing.T) { runBadgerTest(t, nil, func(t *testing.T, db *DB) { - txn := db.NewTransaction(true) for i := 0; i < 10; i++ { @@ -56,7 +55,7 @@ func TestTxnSimple(t *testing.T) { } func TestTxnReadAfterWrite(t *testing.T) { - runBadgerTest(t, nil, func(t *testing.T, db *DB) { + test := func(t *testing.T, db *DB) { var wg sync.WaitGroup N := 100 wg.Add(N) @@ -80,6 +79,19 @@ func TestTxnReadAfterWrite(t *testing.T) { }(i) } wg.Wait() + } + t.Run("disk mode", func(t *testing.T) { + runBadgerTest(t, nil, func(t *testing.T, db *DB) { + test(t, db) + }) + }) + t.Run("InMemory mode", func(t *testing.T) { + opt := getTestOptions("") + opt.InMemory = true + db, err := Open(opt) + require.NoError(t, err) + test(t, db) + require.NoError(t, db.Close()) }) } @@ -615,7 +627,7 @@ func TestTxnIterationEdgeCase3(t *testing.T) { } func TestIteratorAllVersionsWithDeleted(t *testing.T) { - runBadgerTest(t, nil, func(t *testing.T, db *DB) { + test := func(t *testing.T, db *DB) { // Write two keys err := db.Update(func(txn *Txn) error { require.NoError(t, txn.SetEntry(NewEntry([]byte("answer1"), []byte("42")))) @@ -661,6 +673,19 @@ func TestIteratorAllVersionsWithDeleted(t *testing.T) { return nil }) require.NoError(t, err) + } + t.Run("disk mode", func(t *testing.T) { + runBadgerTest(t, nil, func(t *testing.T, db *DB) { + test(t, db) + }) + }) + t.Run("InMemory mode", func(t *testing.T) { + opt := getTestOptions("") + opt.InMemory = true + db, err := Open(opt) + require.NoError(t, err) + test(t, db) + require.NoError(t, db.Close()) }) } @@ -713,86 +738,101 @@ func TestManagedDB(t *testing.T) { opt := getTestOptions(dir) opt.managedTxns = true - db, err := Open(opt) - require.NoError(t, err) - defer db.Close() - - key := func(i int) []byte { - return []byte(fmt.Sprintf("key-%02d", i)) - } - - val := func(i int) []byte { - return []byte(fmt.Sprintf("val-%d", i)) - } - require.Panics(t, func() { - db.Update(func(tx *Txn) error { return nil }) - }) + test := func(t *testing.T, db *DB) { + key := func(i int) []byte { + return []byte(fmt.Sprintf("key-%02d", i)) + } - err = db.View(func(tx *Txn) error { return nil }) - require.NoError(t, err) + val := func(i int) []byte { + return []byte(fmt.Sprintf("val-%d", i)) + } - // Write data at t=3. - txn := db.NewTransactionAt(3, true) - for i := 0; i <= 3; i++ { - require.NoError(t, txn.SetEntry(NewEntry(key(i), val(i)))) - } - require.Panics(t, func() { txn.Commit() }) - require.NoError(t, txn.CommitAt(3, nil)) - - // Read data at t=2. - txn = db.NewTransactionAt(2, false) - for i := 0; i <= 3; i++ { - _, err := txn.Get(key(i)) - require.Equal(t, ErrKeyNotFound, err) - } - txn.Discard() + require.Panics(t, func() { + db.Update(func(tx *Txn) error { return nil }) + }) - // Read data at t=3. - txn = db.NewTransactionAt(3, false) - for i := 0; i <= 3; i++ { - item, err := txn.Get(key(i)) - require.NoError(t, err) - require.Equal(t, uint64(3), item.Version()) - v, err := item.ValueCopy(nil) + err = db.View(func(tx *Txn) error { return nil }) require.NoError(t, err) - require.Equal(t, val(i), v) - } - txn.Discard() - - // Write data at t=7. - txn = db.NewTransactionAt(6, true) - for i := 0; i <= 7; i++ { - _, err := txn.Get(key(i)) - if err == nil { - continue // Don't overwrite existing keys. + + // Write data at t=3. + txn := db.NewTransactionAt(3, true) + for i := 0; i <= 3; i++ { + require.NoError(t, txn.SetEntry(NewEntry(key(i), val(i)))) } - require.NoError(t, txn.SetEntry(NewEntry(key(i), val(i)))) - } - require.NoError(t, txn.CommitAt(7, nil)) + require.Panics(t, func() { txn.Commit() }) + require.NoError(t, txn.CommitAt(3, nil)) - // Read data at t=9. - txn = db.NewTransactionAt(9, false) - for i := 0; i <= 9; i++ { - item, err := txn.Get(key(i)) - if i <= 7 { - require.NoError(t, err) - } else { + // Read data at t=2. + txn = db.NewTransactionAt(2, false) + for i := 0; i <= 3; i++ { + _, err := txn.Get(key(i)) require.Equal(t, ErrKeyNotFound, err) } + txn.Discard() - if i <= 3 { + // Read data at t=3. + txn = db.NewTransactionAt(3, false) + for i := 0; i <= 3; i++ { + item, err := txn.Get(key(i)) + require.NoError(t, err) require.Equal(t, uint64(3), item.Version()) - } else if i <= 7 { - require.Equal(t, uint64(7), item.Version()) - } - if i <= 7 { v, err := item.ValueCopy(nil) require.NoError(t, err) require.Equal(t, val(i), v) } + txn.Discard() + + // Write data at t=7. + txn = db.NewTransactionAt(6, true) + for i := 0; i <= 7; i++ { + _, err := txn.Get(key(i)) + if err == nil { + continue // Don't overwrite existing keys. + } + require.NoError(t, txn.SetEntry(NewEntry(key(i), val(i)))) + } + require.NoError(t, txn.CommitAt(7, nil)) + + // Read data at t=9. + txn = db.NewTransactionAt(9, false) + for i := 0; i <= 9; i++ { + item, err := txn.Get(key(i)) + if i <= 7 { + require.NoError(t, err) + } else { + require.Equal(t, ErrKeyNotFound, err) + } + + if i <= 3 { + require.Equal(t, uint64(3), item.Version()) + } else if i <= 7 { + require.Equal(t, uint64(7), item.Version()) + } + if i <= 7 { + v, err := item.ValueCopy(nil) + require.NoError(t, err) + require.Equal(t, val(i), v) + } + } + txn.Discard() } - txn.Discard() + t.Run("disk mode", func(t *testing.T) { + db, err := Open(opt) + require.NoError(t, err) + test(t, db) + require.NoError(t, db.Close()) + }) + t.Run("InMemory mode", func(t *testing.T) { + opt.InMemory = true + opt.Dir = "" + opt.ValueDir = "" + db, err := Open(opt) + require.NoError(t, err) + test(t, db) + require.NoError(t, db.Close()) + }) + } func TestArmV7Issue311Fix(t *testing.T) { diff --git a/value.go b/value.go index c0e28758e..f71970e1c 100644 --- a/value.go +++ b/value.go @@ -732,6 +732,10 @@ func (vlog *valueLog) deleteLogFile(lf *logFile) error { } func (vlog *valueLog) dropAll() (int, error) { + // If db is opened in InMemory mode, we don't need to do anything since there are no vlog files. + if vlog.db.opt.InMemory { + return 0, nil + } // We don't want to block dropAll on any pending transactions. So, don't worry about iterator // count. var count int @@ -993,12 +997,16 @@ func (vlog *valueLog) replayLog(lf *logFile, offset uint32, replayFn logEntry) e } func (vlog *valueLog) open(db *DB, ptr valuePointer, replayFn logEntry) error { - opt := db.opt - vlog.opt = opt - vlog.dirPath = opt.ValueDir + vlog.opt = db.opt vlog.db = db + // We don't need to open any vlog files or collect stats for GC if DB is opened + // in InMemory mode. InMemory mode doesn't create any files/directories on disk. + if vlog.opt.InMemory { + return nil + } + vlog.dirPath = vlog.opt.ValueDir vlog.elog = y.NoEventLog - if opt.EventLogging { + if vlog.opt.EventLogging { vlog.elog = trace.NewEventLog("Badger", "Valuelog") } vlog.garbageCh = make(chan struct{}, 1) // Only allow one GC at a time. @@ -1108,7 +1116,7 @@ func (vlog *valueLog) open(db *DB, ptr valuePointer, replayFn logEntry) error { vlog.db.vhead = valuePointer{Fid: vlog.maxFid, Offset: uint32(lastOffset)} // Map the file if needed. When we create a file, it is automatically mapped. - if err = last.mmap(2 * opt.ValueLogFileSize); err != nil { + if err = last.mmap(2 * vlog.opt.ValueLogFileSize); err != nil { return errFile(err, last.path, "Map log file") } if err := vlog.populateDiscardStats(); err != nil { @@ -1139,6 +1147,9 @@ func (lf *logFile) init() error { } func (vlog *valueLog) Close() error { + if vlog.db.opt.InMemory { + return nil + } // close flushDiscardStats. vlog.lfDiscardStats.closer.SignalAndWait() @@ -1279,6 +1290,9 @@ func (vlog *valueLog) woffset() uint32 { // write is thread-unsafe by design and should not be called concurrently. func (vlog *valueLog) write(reqs []*request) error { + if vlog.db.opt.InMemory { + return nil + } vlog.filesLock.RLock() maxFid := atomic.LoadUint32(&vlog.maxFid) curlf := vlog.filesMap[maxFid] @@ -1708,6 +1722,10 @@ func (vlog *valueLog) runGC(discardRatio float64, head valuePointer) error { } func (vlog *valueLog) updateDiscardStats(stats map[uint32]int64) { + if vlog.opt.InMemory { + return + } + select { case vlog.lfDiscardStats.flushChan <- stats: default: