Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: optimize MVCCGarbageCollect for large numbers of versions #51184

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 0 additions & 65 deletions pkg/storage/bench_pebble_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,37 +91,6 @@ func BenchmarkMVCCScan_Pebble(b *testing.B) {
}
}

func BenchmarkExportToSst(b *testing.B) {
numKeys := []int{64, 512, 1024, 8192, 65536}
numRevisions := []int{1, 10, 100}
exportAllRevisions := []bool{false, true}
engineMakers := []struct {
name string
create engineMaker
}{
{"rocksdb", setupMVCCRocksDB},
{"pebble", setupMVCCPebble},
}

for _, engineImpl := range engineMakers {
b.Run(engineImpl.name, func(b *testing.B) {
for _, numKey := range numKeys {
b.Run(fmt.Sprintf("numKeys=%d", numKey), func(b *testing.B) {
for _, numRevision := range numRevisions {
b.Run(fmt.Sprintf("numRevisions=%d", numRevision), func(b *testing.B) {
for _, exportAllRevisionsVal := range exportAllRevisions {
b.Run(fmt.Sprintf("exportAllRevisions=%t", exportAllRevisionsVal), func(b *testing.B) {
runExportToSst(b, engineImpl.create, numKey, numRevision, exportAllRevisionsVal)
})
}
})
}
})
}
})
}
}

func BenchmarkMVCCReverseScan_Pebble(b *testing.B) {
if testing.Short() {
b.Skip("TODO: fix benchmark")
Expand Down Expand Up @@ -363,40 +332,6 @@ func BenchmarkClearIterRange_Pebble(b *testing.B) {
})
}

func BenchmarkMVCCGarbageCollect_Pebble(b *testing.B) {
if testing.Short() {
b.Skip("short flag")
}

// NB: To debug #16068, test only 128-128-15000-6.
ctx := context.Background()
for _, keySize := range []int{128} {
b.Run(fmt.Sprintf("keySize=%d", keySize), func(b *testing.B) {
for _, valSize := range []int{128} {
b.Run(fmt.Sprintf("valSize=%d", valSize), func(b *testing.B) {
for _, numKeys := range []int{1, 1024} {
b.Run(fmt.Sprintf("numKeys=%d", numKeys), func(b *testing.B) {
for _, numVersions := range []int{2, 1024} {
b.Run(fmt.Sprintf("numVersions=%d", numVersions), func(b *testing.B) {
runMVCCGarbageCollect(ctx, b, setupMVCCInMemPebble, benchGarbageCollectOptions{
benchDataOptions: benchDataOptions{
numKeys: numKeys,
numVersions: numVersions,
valueBytes: valSize,
},
keyBytes: keySize,
deleteVersions: numVersions - 1,
})
})
}
})
}
})
}
})
}
}

func BenchmarkBatchApplyBatchRepr_Pebble(b *testing.B) {
if testing.Short() {
b.Skip("short flag")
Expand Down
34 changes: 0 additions & 34 deletions pkg/storage/bench_rocksdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,40 +366,6 @@ func BenchmarkClearIterRange_RocksDB(b *testing.B) {
})
}

func BenchmarkMVCCGarbageCollect_RocksDB(b *testing.B) {
if testing.Short() {
b.Skip("short flag")
}

// NB: To debug #16068, test only 128-128-15000-6.
ctx := context.Background()
for _, keySize := range []int{128} {
b.Run(fmt.Sprintf("keySize=%d", keySize), func(b *testing.B) {
for _, valSize := range []int{128} {
b.Run(fmt.Sprintf("valSize=%d", valSize), func(b *testing.B) {
for _, numKeys := range []int{1, 1024} {
b.Run(fmt.Sprintf("numKeys=%d", numKeys), func(b *testing.B) {
for _, numVersions := range []int{2, 1024} {
b.Run(fmt.Sprintf("numVersions=%d", numVersions), func(b *testing.B) {
runMVCCGarbageCollect(ctx, b, setupMVCCInMemRocksDB, benchGarbageCollectOptions{
benchDataOptions: benchDataOptions{
numKeys: numKeys,
numVersions: numVersions,
valueBytes: valSize,
},
keyBytes: keySize,
deleteVersions: numVersions - 1,
})
})
}
})
}
})
}
})
}
}

func BenchmarkBatchApplyBatchRepr_RocksDB(b *testing.B) {
if testing.Short() {
b.Skip("short flag")
Expand Down
97 changes: 97 additions & 0 deletions pkg/storage/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,103 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

// Note: most benchmarks in this package have an engine-specific Benchmark
// function (see bench_rocksdb_test.go and bench_pebble_test.go). The newer
// Benchmarks with a unified implementation are here at the top of this file
// with the business logic for the implementation of the other tests following.

func BenchmarkMVCCGarbageCollect(b *testing.B) {
if testing.Short() {
b.Skip("short flag")
}

// NB: To debug #16068, test only 128-128-15000-6.
keySizes := []int{128}
valSizes := []int{128}
numKeys := []int{1, 1024}
versionConfigs := []struct {
total int
toDelete []int
}{
{2, []int{1}},
{1024, []int{1, 16, 32, 512, 1015, 1023}},
}
engineMakers := []struct {
name string
create engineMaker
}{
{"rocksdb", setupMVCCInMemRocksDB},
{"pebble", setupMVCCInMemPebble},
}

ctx := context.Background()
for _, engineImpl := range engineMakers {
b.Run(engineImpl.name, func(b *testing.B) {
for _, keySize := range keySizes {
b.Run(fmt.Sprintf("keySize=%d", keySize), func(b *testing.B) {
for _, valSize := range valSizes {
b.Run(fmt.Sprintf("valSize=%d", valSize), func(b *testing.B) {
for _, numKeys := range numKeys {
b.Run(fmt.Sprintf("numKeys=%d", numKeys), func(b *testing.B) {
for _, versions := range versionConfigs {
b.Run(fmt.Sprintf("numVersions=%d", versions.total), func(b *testing.B) {
for _, toDelete := range versions.toDelete {
b.Run(fmt.Sprintf("deleteVersions=%d", toDelete), func(b *testing.B) {
runMVCCGarbageCollect(ctx, b, engineImpl.create,
benchGarbageCollectOptions{
benchDataOptions: benchDataOptions{
numKeys: numKeys,
numVersions: versions.total,
valueBytes: valSize,
},
keyBytes: keySize,
deleteVersions: toDelete,
})
})
}
})
}
})
}
})
}
})
}
})
}
}

func BenchmarkExportToSst(b *testing.B) {
numKeys := []int{64, 512, 1024, 8192, 65536}
numRevisions := []int{1, 10, 100}
exportAllRevisions := []bool{false, true}
engineMakers := []struct {
name string
create engineMaker
}{
{"rocksdb", setupMVCCRocksDB},
{"pebble", setupMVCCPebble},
}

for _, engineImpl := range engineMakers {
b.Run(engineImpl.name, func(b *testing.B) {
for _, numKey := range numKeys {
b.Run(fmt.Sprintf("numKeys=%d", numKey), func(b *testing.B) {
for _, numRevision := range numRevisions {
b.Run(fmt.Sprintf("numRevisions=%d", numRevision), func(b *testing.B) {
for _, exportAllRevisionsVal := range exportAllRevisions {
b.Run(fmt.Sprintf("exportAllRevisions=%t", exportAllRevisionsVal), func(b *testing.B) {
runExportToSst(b, engineImpl.create, numKey, numRevision, exportAllRevisionsVal)
})
}
})
}
})
}
})
}
}

const overhead = 48 // Per key/value overhead (empirically determined)

type engineMaker func(testing.TB, string) Engine
Expand Down
126 changes: 94 additions & 32 deletions pkg/storage/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"math"
"os"
"path/filepath"
"sort"
"sync"
"time"

Expand Down Expand Up @@ -3151,24 +3152,43 @@ func MVCCResolveWriteIntentRangeUsingIter(
// keys slice. The iterator is seeked in turn to each listed
// key, clearing all values with timestamps <= to expiration. The
// timestamp parameter is used to compute the intent age on GC.
//
// Note that this method will be sorting the keys.
func MVCCGarbageCollect(
ctx context.Context,
rw ReadWriter,
ms *enginepb.MVCCStats,
keys []roachpb.GCRequest_GCKey,
timestamp hlc.Timestamp,
) error {
// We're allowed to use a prefix iterator because we always Seek() the
// iterator when handling a new user key.
iter := rw.NewIterator(IterOptions{Prefix: true})
defer iter.Close()

var count int64
defer func(begin time.Time) {
log.Eventf(ctx, "done with GC evaluation for %d keys at %.2f keys/sec. Deleted %d entries",
len(keys), float64(len(keys))*1e9/float64(timeutil.Since(begin)), count)
}(timeutil.Now())

// If there are no keys then there is no work.
if len(keys) == 0 {
return nil
}

// Sort the slice to both determine the bounds and ensure that we're seeking
// in increasing order.
sort.Slice(keys, func(i, j int) bool {
iKey := MVCCKey{Key: keys[i].Key, Timestamp: keys[i].Timestamp}
jKey := MVCCKey{Key: keys[j].Key, Timestamp: keys[j].Timestamp}
return iKey.Less(jKey)
})

// Bound the iterator appropriately for the set of keys we'll be garbage
// collecting.
iter := rw.NewIterator(IterOptions{
LowerBound: keys[0].Key,
UpperBound: keys[len(keys)-1].Key.Next(),
})
defer iter.Close()

// Iterate through specified GC keys.
meta := &enginepb.MVCCMetadata{}
for _, gcKey := range keys {
Expand Down Expand Up @@ -3222,19 +3242,63 @@ func MVCCGarbageCollect(
iter.Next()
}

// TODO(tschottdorf): Can't we just Seek() to a key with timestamp
// gcKey.Timestamp to avoid potentially cycling through a large prefix
// of versions we can't GC? The batching mechanism in the GC queue sends
// requests susceptible to that happening when there are lots of versions.
// A minor complication there will be that we need to know the first non-
// deletable value's timestamp (for prevNanos).

// Now, iterate through all values, GC'ing ones which have expired.
// For GCBytesAge, this requires keeping track of the previous key's
// timestamp (prevNanos). See ComputeStatsGo for a more easily digested
// and better commented version of this logic.

prevNanos := timestamp.WallTime
{
// If there are a large number of versions which are not garbage,
// iterating through all of them is very inefficient. However, if there
// are few, SeekLT is inefficient. Try to step the iterator a few times
// to find the predecessor of gcKey before resorting to seeking.
//
// In a synthetic benchmark where there is one version of garbage and one
// not, this optimization showed a 50% improvement. More importantly,
// this optimization mitigated the overhead of the Seek approach when
// almost all of the versions are garbage.
var foundPrevNanos bool
{
const nextsBeforeSeek = 4
for i := 0; i < nextsBeforeSeek; i++ {
if ok, err := iter.Valid(); err != nil {
return err
} else if !ok {
break
}
unsafeIterKey := iter.UnsafeKey()
if !unsafeIterKey.Key.Equal(encKey.Key) {
break
}
if unsafeIterKey.Timestamp.LessEq(gcKey.Timestamp) {
foundPrevNanos = true
break
}
prevNanos = unsafeIterKey.Timestamp.WallTime
iter.Next()
}
}

// Stepping with the iterator did not get us to our target garbage key or
// its predecessor. Seek to the predecessor to find the right value for
// prevNanos and position the iterator on the gcKey.
if !foundPrevNanos {
gcKeyMVCC := MVCCKey{Key: gcKey.Key, Timestamp: gcKey.Timestamp}
iter.SeekLT(gcKeyMVCC)
if ok, err := iter.Valid(); err != nil {
return err
} else if ok {
// Use the previous version's timestamp if it's for this key.
if iter.UnsafeKey().Key.Equal(gcKey.Key) {
prevNanos = iter.UnsafeKey().Timestamp.WallTime
}
// Seek to the first version for deletion.
iter.Next()
}
}
}

// Iterate through the garbage versions, accumulating their stats and
// issuing clear operations.
for ; ; iter.Next() {
if ok, err := iter.Valid(); err != nil {
return err
Expand All @@ -3248,26 +3312,24 @@ func MVCCGarbageCollect(
if !unsafeIterKey.IsValue() {
break
}
if unsafeIterKey.Timestamp.LessEq(gcKey.Timestamp) {
if ms != nil {
// FIXME: use prevNanos instead of unsafeIterKey.Timestamp, except
// when it's a deletion.
valSize := int64(len(iter.UnsafeValue()))

// A non-deletion becomes non-live when its newer neighbor shows up.
// A deletion tombstone becomes non-live right when it is created.
fromNS := prevNanos
if valSize == 0 {
fromNS = unsafeIterKey.Timestamp.WallTime
}

ms.Add(updateStatsOnGC(gcKey.Key, MVCCVersionTimestampSize,
valSize, nil, fromNS))
}
count++
if err := rw.Clear(unsafeIterKey); err != nil {
return err
if ms != nil {
// FIXME: use prevNanos instead of unsafeIterKey.Timestamp, except
// when it's a deletion.
valSize := int64(len(iter.UnsafeValue()))

// A non-deletion becomes non-live when its newer neighbor shows up.
// A deletion tombstone becomes non-live right when it is created.
fromNS := prevNanos
if valSize == 0 {
fromNS = unsafeIterKey.Timestamp.WallTime
}

ms.Add(updateStatsOnGC(gcKey.Key, MVCCVersionTimestampSize,
valSize, nil, fromNS))
}
count++
if err := rw.Clear(unsafeIterKey); err != nil {
return err
}
prevNanos = unsafeIterKey.Timestamp.WallTime
}
Expand Down