Skip to content

Commit

Permalink
fix(manifest): fix manifest corruption due to race condition in concu…
Browse files Browse the repository at this point in the history
…rrent compactions (#1756) (#1820)

This PR is a cherry pick of #1756. Addresses [this
issue](https://discuss.dgraph.io/t/badgerdb-manifest-corruption-issue-solution/15915)
on Discuss and[ this
issue](#1819) on Badger.

Co-authored-by: Kenan Kessler <[email protected]>
  • Loading branch information
2 people authored and mangalaman93 committed Feb 14, 2023
1 parent 4b93b45 commit 55afb28
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 5 deletions.
10 changes: 5 additions & 5 deletions manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,15 +208,14 @@ func (mf *manifestFile) addChanges(changesParam []*pb.ManifestChange) error {

// Maybe we could use O_APPEND instead (on certain file systems)
mf.appendLock.Lock()
defer mf.appendLock.Unlock()
if err := applyChangeSet(&mf.manifest, &changes); err != nil {
mf.appendLock.Unlock()
return err
}
// Rewrite manifest if it'd shrink by 1/10 and it's big enough to care
if mf.manifest.Deletions > mf.deletionsRewriteThreshold &&
mf.manifest.Deletions > manifestDeletionsRatio*(mf.manifest.Creations-mf.manifest.Deletions) {
if err := mf.rewrite(); err != nil {
mf.appendLock.Unlock()
return err
}
} else {
Expand All @@ -225,15 +224,16 @@ func (mf *manifestFile) addChanges(changesParam []*pb.ManifestChange) error {
binary.BigEndian.PutUint32(lenCrcBuf[4:8], crc32.Checksum(buf, y.CastagnoliCrcTable))
buf = append(lenCrcBuf[:], buf...)
if _, err := mf.fp.Write(buf); err != nil {
mf.appendLock.Unlock()
return err
}
}

mf.appendLock.Unlock()
return mf.fp.Sync()
return syncFunc(mf.fp)
}

// this function is saved here to allow injection of fake filesystem latency at test time.
var syncFunc = func(f *os.File) error { return f.Sync() }

// Has to be 4 bytes. The value can never change, ever, anyway.
var magicText = [4]byte{'B', 'd', 'g', 'r'}

Expand Down
40 changes: 40 additions & 0 deletions manifest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ import (
"os"
"path/filepath"
"sort"
"sync"
"testing"
"time"

otrace "go.opencensus.io/trace"

Expand Down Expand Up @@ -245,3 +247,41 @@ func TestManifestRewrite(t *testing.T) {
uint64(deletionsThreshold * 3): {Level: 0},
}, m.Tables)
}

func TestConcurrentManifestCompaction(t *testing.T) {
dir, err := ioutil.TempDir("", "badger-test")
require.NoError(t, err)
defer removeDir(dir)

// overwrite the sync function to make this race condition easily reproducible
syncFunc = func(f *os.File) error {
// effectively making the Sync() take around 1s makes this reproduce every time
time.Sleep(1 * time.Second)
return f.Sync()
}

mf, _, err := helpOpenOrCreateManifestFile(dir, false, 0)
require.NoError(t, err)

cs := &pb.ManifestChangeSet{}
for i := uint64(0); i < 1000; i++ {
cs.Changes = append(cs.Changes,
newCreateChange(i, 0, 0, 0),
newDeleteChange(i),
)
}

// simulate 2 concurrent compaction threads
n := 2
wg := sync.WaitGroup{}
wg.Add(n)
for i := 0; i < n; i++ {
go func() {
defer wg.Done()
require.NoError(t, mf.addChanges(cs.Changes))
}()
}
wg.Wait()

require.NoError(t, mf.close())
}

0 comments on commit 55afb28

Please sign in to comment.