From 48554b490a3fe57c6ae3f221f968f0835fb758ad Mon Sep 17 00:00:00 2001
From: jocalvert
Date: Thu, 29 Mar 2018 21:03:30 +0000
Subject: [PATCH] mvcc: Clone the key index for compaction and lock on each item

For compaction, clone the original Btree for traversal purposes, so as
to not hold the lock for the duration of compaction. This preserves
read/write throughput when the index tree is large (> 1M entries),
since readers and writers no longer block behind the full traversal.
---
 mvcc/index.go            | 42 +++++++++++++++++-----------------------
 mvcc/index_bench_test.go | 28 +++++++++++++++++++++++++++
 mvcc/kvstore.go          |  4 ++--
 3 files changed, 48 insertions(+), 26 deletions(-)
 create mode 100644 mvcc/index_bench_test.go

diff --git a/mvcc/index.go b/mvcc/index.go
index 626de3825fcc..22dec99a11a7 100644
--- a/mvcc/index.go
+++ b/mvcc/index.go
@@ -185,27 +185,32 @@ func (ti *treeIndex) RangeSince(key, end []byte, rev int64) []revision {
 }
 
 func (ti *treeIndex) Compact(rev int64) map[revision]struct{} {
 	available := make(map[revision]struct{})
-	var emptyki []*keyIndex
 	if ti.lg != nil {
 		ti.lg.Info("compact tree index", zap.Int64("revision", rev))
 	} else {
 		plog.Printf("store.index: compact %d", rev)
 	}
-	// TODO: do not hold the lock for long time?
-	// This is probably OK. Compacting 10M keys takes O(10ms).
 	ti.Lock()
-	defer ti.Unlock()
-	ti.tree.Ascend(compactIndex(rev, available, &emptyki))
-	for _, ki := range emptyki {
-		item := ti.tree.Delete(ki)
-		if item == nil {
-			if ti.lg != nil {
-				ti.lg.Panic("failed to delete during compaction")
-			} else {
-				plog.Panic("store.index: unexpected delete failure during compaction")
+	clone := ti.tree.Clone()
+	ti.Unlock()
+
+	clone.Ascend(func(item btree.Item) bool {
+		keyi := item.(*keyIndex)
+		ti.Lock()
+		keyi.compact(rev, available)
+		if keyi.isEmpty() {
+			item := ti.tree.Delete(keyi)
+			if item == nil {
+				if ti.lg != nil {
+					ti.lg.Panic("failed to delete during compaction")
+				} else {
+					plog.Panic("store.index: unexpected delete failure during compaction")
+				}
 			}
 		}
-	}
+		ti.Unlock()
+		return true
+	})
 	return available
 }
@@ -222,17 +227,6 @@ func (ti *treeIndex) Keep(rev int64) map[revision]struct{} {
 	return available
 }
 
-func compactIndex(rev int64, available map[revision]struct{}, emptyki *[]*keyIndex) func(i btree.Item) bool {
-	return func(i btree.Item) bool {
-		keyi := i.(*keyIndex)
-		keyi.compact(rev, available)
-		if keyi.isEmpty() {
-			*emptyki = append(*emptyki, keyi)
-		}
-		return true
-	}
-}
-
 func (ti *treeIndex) Equal(bi index) bool {
 	b := bi.(*treeIndex)
 
diff --git a/mvcc/index_bench_test.go b/mvcc/index_bench_test.go
new file mode 100644
index 000000000000..e7120be57c30
--- /dev/null
+++ b/mvcc/index_bench_test.go
@@ -0,0 +1,28 @@
+package mvcc
+
+import (
+	"testing"
+
+	"go.uber.org/zap"
+)
+
+func BenchmarkIndexCompact1(b *testing.B)       { benchmarkIndexCompact(b, 1) }
+func BenchmarkIndexCompact100(b *testing.B)     { benchmarkIndexCompact(b, 100) }
+func BenchmarkIndexCompact10000(b *testing.B)   { benchmarkIndexCompact(b, 10000) }
+func BenchmarkIndexCompact100000(b *testing.B)  { benchmarkIndexCompact(b, 100000) }
+func BenchmarkIndexCompact1000000(b *testing.B) { benchmarkIndexCompact(b, 1000000) }
+
+func benchmarkIndexCompact(b *testing.B, size int) {
+	log := zap.NewNop()
+	kvindex := newTreeIndex(log)
+
+	bytesN := 64
+	keys := createBytesSlice(bytesN, size)
+	for i := 1; i < size; i++ {
+		kvindex.Put(keys[i], revision{main: int64(i), sub: int64(i)})
+	}
+	b.ResetTimer()
+	for i := 1; i < b.N; i++ {
+		kvindex.Compact(int64(i))
+	}
+}
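Note on the treeIndex.Compact rewrite above: it relies on the copy-on-write
Clone() in github.com/google/btree, so taking the snapshot is O(1) and later
writes to either tree copy only the nodes they touch. Iterating the clone is
therefore stable even while writers mutate the live tree between the short
per-item lock acquisitions; keys created after the clone are simply skipped
by this pass, which is harmless because their revisions postdate the
compaction revision. A minimal, self-contained sketch of the copy-on-write
behavior (illustrative only, not part of the patch):

package main

import (
	"fmt"

	"github.com/google/btree"
)

func main() {
	orig := btree.New(32)
	for i := 0; i < 5; i++ {
		orig.ReplaceOrInsert(btree.Int(i))
	}

	// Clone is O(1): both trees share nodes until one of them is written.
	snapshot := orig.Clone()

	// Mutations to the original lazily copy only the touched nodes;
	// the snapshot is unaffected.
	orig.Delete(btree.Int(3))
	orig.ReplaceOrInsert(btree.Int(99))

	snapshot.Ascend(func(it btree.Item) bool {
		fmt.Println(int(it.(btree.Int))) // prints 0 1 2 3 4: the snapshot
		return true
	})
}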
diff --git a/mvcc/kvstore.go b/mvcc/kvstore.go
index f03b6311e4f4..3d941335ea7a 100644
--- a/mvcc/kvstore.go
+++ b/mvcc/kvstore.go
@@ -245,9 +245,10 @@ func (s *store) Compact(rev int64) (<-chan struct{}, error) {
 	// ensure that desired compaction is persisted
 	s.b.ForceCommit()
 
-	keep := s.kvindex.Compact(rev)
 	ch := make(chan struct{})
 	var j = func(ctx context.Context) {
+		keep := s.kvindex.Compact(rev)
+		indexCompactionPauseDurations.Observe(float64(time.Since(start) / time.Millisecond))
 		if ctx.Err() != nil {
 			s.compactBarrier(ctx, ch)
 			return
@@ -261,7 +262,6 @@ func (s *store) Compact(rev int64) (<-chan struct{}, error) {
 
 	s.fifoSched.Schedule(j)
 
-	indexCompactionPauseDurations.Observe(float64(time.Since(start) / time.Millisecond))
 	return ch, nil
 }
 
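Note on the kvstore.go hunks above: the index compaction now runs inside the
FIFO-scheduled job rather than on the caller's goroutine, so store.Compact
returns its done channel immediately and indexCompactionPauseDurations is
observed next to the work it times. A simplified sketch of that control flow
(hypothetical stand-ins: compactIndex and schedule approximate
s.kvindex.Compact and s.fifoSched.Schedule; the real job also handles
compactBarrier and backend compaction):

package main

import (
	"context"
	"fmt"
	"time"
)

// compact mirrors the reworked store.Compact: the expensive index
// compaction happens inside the scheduled job, and the pause metric is
// observed there, next to the work it measures.
func compact(rev int64,
	compactIndex func(int64) map[int64]struct{}, // stands in for s.kvindex.Compact
	schedule func(func(context.Context)), // stands in for s.fifoSched.Schedule
) <-chan struct{} {
	start := time.Now()
	ch := make(chan struct{})
	schedule(func(ctx context.Context) {
		keep := compactIndex(rev) // moved into the job by this patch
		fmt.Printf("index compaction took %v, kept %d revisions\n",
			time.Since(start), len(keep))
		if ctx.Err() != nil {
			return // the real code signals retry via s.compactBarrier(ctx, ch)
		}
		// ... backend compaction using `keep` would follow here ...
		close(ch)
	})
	return ch // returned immediately; callers no longer wait on the index scan
}

func main() {
	schedule := func(job func(context.Context)) { go job(context.Background()) }
	fakeIndex := func(rev int64) map[int64]struct{} { return map[int64]struct{}{rev: {}} }
	<-compact(42, fakeIndex, schedule)
}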