From 55afb28177df08714416023300d7824822a52d22 Mon Sep 17 00:00:00 2001 From: Joshua Goldstein <92491720+joshua-goldstein@users.noreply.github.com> Date: Thu, 3 Nov 2022 17:56:08 -0500 Subject: [PATCH] fix(manifest): fix manifest corruption due to race condition in concurrent compactions (#1756) (#1820) This PR is a cherry pick of #1756. Addresses [this issue](https://discuss.dgraph.io/t/badgerdb-manifest-corruption-issue-solution/15915) on Discuss and[ this issue](https://github.com/dgraph-io/badger/issues/1819) on Badger. Co-authored-by: Kenan Kessler --- manifest.go | 10 +++++----- manifest_test.go | 40 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 45 insertions(+), 5 deletions(-) diff --git a/manifest.go b/manifest.go index bb27fb247..7face8b47 100644 --- a/manifest.go +++ b/manifest.go @@ -208,15 +208,14 @@ func (mf *manifestFile) addChanges(changesParam []*pb.ManifestChange) error { // Maybe we could use O_APPEND instead (on certain file systems) mf.appendLock.Lock() + defer mf.appendLock.Unlock() if err := applyChangeSet(&mf.manifest, &changes); err != nil { - mf.appendLock.Unlock() return err } // Rewrite manifest if it'd shrink by 1/10 and it's big enough to care if mf.manifest.Deletions > mf.deletionsRewriteThreshold && mf.manifest.Deletions > manifestDeletionsRatio*(mf.manifest.Creations-mf.manifest.Deletions) { if err := mf.rewrite(); err != nil { - mf.appendLock.Unlock() return err } } else { @@ -225,15 +224,16 @@ func (mf *manifestFile) addChanges(changesParam []*pb.ManifestChange) error { binary.BigEndian.PutUint32(lenCrcBuf[4:8], crc32.Checksum(buf, y.CastagnoliCrcTable)) buf = append(lenCrcBuf[:], buf...) if _, err := mf.fp.Write(buf); err != nil { - mf.appendLock.Unlock() return err } } - mf.appendLock.Unlock() - return mf.fp.Sync() + return syncFunc(mf.fp) } +// this function is saved here to allow injection of fake filesystem latency at test time. +var syncFunc = func(f *os.File) error { return f.Sync() } + // Has to be 4 bytes. The value can never change, ever, anyway. var magicText = [4]byte{'B', 'd', 'g', 'r'} diff --git a/manifest_test.go b/manifest_test.go index 8a3293e7a..d57e79d94 100644 --- a/manifest_test.go +++ b/manifest_test.go @@ -24,7 +24,9 @@ import ( "os" "path/filepath" "sort" + "sync" "testing" + "time" otrace "go.opencensus.io/trace" @@ -245,3 +247,41 @@ func TestManifestRewrite(t *testing.T) { uint64(deletionsThreshold * 3): {Level: 0}, }, m.Tables) } + +func TestConcurrentManifestCompaction(t *testing.T) { + dir, err := ioutil.TempDir("", "badger-test") + require.NoError(t, err) + defer removeDir(dir) + + // overwrite the sync function to make this race condition easily reproducible + syncFunc = func(f *os.File) error { + // effectively making the Sync() take around 1s makes this reproduce every time + time.Sleep(1 * time.Second) + return f.Sync() + } + + mf, _, err := helpOpenOrCreateManifestFile(dir, false, 0) + require.NoError(t, err) + + cs := &pb.ManifestChangeSet{} + for i := uint64(0); i < 1000; i++ { + cs.Changes = append(cs.Changes, + newCreateChange(i, 0, 0, 0), + newDeleteChange(i), + ) + } + + // simulate 2 concurrent compaction threads + n := 2 + wg := sync.WaitGroup{} + wg.Add(n) + for i := 0; i < n; i++ { + go func() { + defer wg.Done() + require.NoError(t, mf.addChanges(cs.Changes)) + }() + } + wg.Wait() + + require.NoError(t, mf.close()) +}