From 55120303267a1193931ce1a58154f0f82a2d061d Mon Sep 17 00:00:00 2001
From: Andrew Werner
Date: Fri, 10 Jan 2020 11:33:51 -0500
Subject: [PATCH] storage: rework RunGC so it no longer buffers keys and values
 in memory
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

This commit reworks the processing of replicated state underneath the
gcQueue for the purpose of determining and sending GC requests. The
primary intention of this commit is to remove the need to buffer all of
the versions of a key in memory. As we learned in #42531, this buffering
can be extremely unfortunate when using sequence data types which are
written to frequently.

Prior to this commit, the code forward-iterates through the range's data
and eagerly reads all versions of a key into memory. It then binary
searches those versions to find the latest timestamp at which the key can
be GC'd. It then reverse-iterates through those versions to determine the
latest version of the key which would put the current batch over its
limit. This last step paginates the process of actually deleting the data
for the many versions of the same key, presumably to ensure that the
write batches produced by GC requests don't get too large. Unfortunately,
this logic could not paginate the loading of versions from the storage
engine.

With this commit, the entire process of computing data to GC uses reverse
iteration: for each key we examine versions from oldest to newest. The
commit adds a `gcIterator` which wraps this reverse iteration with some
useful lookahead. During this GC process, at most two additional versions
need to be examined to determine whether a given version is garbage.
While this approach relies on reverse iteration, which is known to be
less efficient than forward iteration, it offers the opportunity to avoid
allocating memory for versions of a key which are not going to end up as
part of a GC request. This reduction in memory usage shows up in the
benchmarks (see below). The change retains the old implementation as a
testing strategy and as a basis for the benchmarks.
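As an aside for reviewers (not part of the patch): the core of the new
approach is that, when the versions of a key are visited oldest to newest,
deciding whether a version is garbage needs only the GC threshold, the
immediately newer version, and whether the current version is the newest
committed one. Below is a minimal, self-contained Go sketch of that rule
using simplified stand-in types (`version` and this `isGarbage` are
illustrative; the patch's `isGarbage` implements the same decision against
`engine.MVCCKeyValue` and `hlc.Timestamp`):

```go
package main

import "fmt"

// version is a simplified stand-in for one MVCC version of a key.
type version struct {
	wallTime int64 // commit timestamp of this version
	isDelete bool  // true if this version is a tombstone
}

// isGarbage mirrors the rule used by the oldest-to-newest scan: a version at
// or below the threshold is garbage if it is shadowed by a newer version that
// is also at or below the threshold, or if it is a tombstone that nothing
// newer needs. No other versions of the key have to be buffered.
func isGarbage(threshold int64, cur version, next *version, isNewest bool) bool {
	if cur.wallTime > threshold {
		return false // still visible to reads at or above the threshold
	}
	if isNewest && !cur.isDelete {
		return false // the newest committed version is always kept
	}
	// Callers pass the next newer version whenever cur is not the newest.
	return cur.isDelete || (next != nil && next.wallTime <= threshold)
}

func main() {
	// Three versions of one key with a GC threshold of 10: v@3 is shadowed
	// by v@7 (both at or below the threshold) and is garbage; v@7 is
	// shadowed only by v@15 and must be kept to serve reads at time 10.
	v3, v7, v15 := version{wallTime: 3}, version{wallTime: 7}, version{wallTime: 15}
	fmt.Println(isGarbage(10, v3, &v7, false))  // true
	fmt.Println(isGarbage(10, v7, &v15, false)) // false
	fmt.Println(isGarbage(10, v15, nil, true))  // false
}
```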
```
name  old time/op  new time/op  delta
Run/ts=[0,100],keySuffix=[2,3],valueLen=[1,1],keysPerValue=[1,2],deleteFrac=0.000000,intentFrac=0.100000-8  924ns ± 8%  590ns ± 1%  -36.13%  (p=0.008 n=5+5)
Run/ts=[0,100],keySuffix=[2,3],valueLen=[1,1],keysPerValue=[1,2],deleteFrac=0.000000,intentFrac=0.100000#01-8  976ns ± 2%  578ns ± 1%  -40.75%  (p=0.008 n=5+5)
Run/ts=[0,100],keySuffix=[2,3],valueLen=[1,1],keysPerValue=[1,2],deleteFrac=0.000000,intentFrac=0.100000#02-8  944ns ± 0%  570ns ± 9%  -39.63%  (p=0.008 n=5+5)
Run/ts=[0,100],keySuffix=[2,3],valueLen=[1,1],keysPerValue=[1,2],deleteFrac=0.000000,intentFrac=0.100000#03-8  903ns ± 0%  612ns ± 3%  -32.29%  (p=0.016 n=4+5)
Run/ts=[0,100],keySuffix=[2,3],valueLen=[1,1],keysPerValue=[1,2],deleteFrac=0.000000,intentFrac=0.100000#04-8  994ns ± 9%  592ns ± 9%  -40.47%  (p=0.008 n=5+5)
Run/ts=[0,100],keySuffix=[8,8],valueLen=[8,16],keysPerValue=[1,100],deleteFrac=0.100000,intentFrac=0.100000-8  669ns ± 4%  526ns ± 1%  -21.34%  (p=0.008 n=5+5)
Run/ts=[0,100],keySuffix=[8,8],valueLen=[8,16],keysPerValue=[1,100],deleteFrac=0.100000,intentFrac=0.100000#01-8  624ns ± 0%  529ns ± 2%  -15.16%  (p=0.008 n=5+5)
Run/ts=[0,100],keySuffix=[8,8],valueLen=[8,16],keysPerValue=[1,100],deleteFrac=0.100000,intentFrac=0.100000#02-8  636ns ± 4%  534ns ± 2%  -16.04%  (p=0.008 n=5+5)
Run/ts=[0,100],keySuffix=[8,8],valueLen=[8,16],keysPerValue=[1,100],deleteFrac=0.100000,intentFrac=0.100000#03-8  612ns ± 1%  532ns ± 3%  -13.08%  (p=0.008 n=5+5)
Run/ts=[0,100],keySuffix=[8,8],valueLen=[8,16],keysPerValue=[1,100],deleteFrac=0.100000,intentFrac=0.100000#04-8  638ns ± 2%  534ns ±10%  -16.35%  (p=0.008 n=5+5)
Run/ts=[0,100],keySuffix=[8,8],valueLen=[8,16],keysPerValue=[1000,1000000],deleteFrac=0.100000,intentFrac=0.100000-8  603ns ± 6%  527ns ± 8%  -12.51%  (p=0.008 n=5+5)
Run/ts=[0,100],keySuffix=[8,8],valueLen=[8,16],keysPerValue=[1000,1000000],deleteFrac=0.100000,intentFrac=0.100000#01-8  613ns ± 5%  517ns ± 6%  -15.78%  (p=0.008 n=5+5)
Run/ts=[0,100],keySuffix=[8,8],valueLen=[8,16],keysPerValue=[1000,1000000],deleteFrac=0.100000,intentFrac=0.100000#02-8  619ns ± 6%  534ns ± 4%  -13.61%  (p=0.008 n=5+5)
Run/ts=[0,100],keySuffix=[8,8],valueLen=[8,16],keysPerValue=[1000,1000000],deleteFrac=0.100000,intentFrac=0.100000#03-8  607ns ± 7%  520ns ± 2%  -14.39%  (p=0.008 n=5+5)
Run/ts=[0,100],keySuffix=[8,8],valueLen=[8,16],keysPerValue=[1000,1000000],deleteFrac=0.100000,intentFrac=0.100000#04-8  599ns ± 4%  501ns ± 7%  -16.36%  (p=0.008 n=5+5)

name  old speed  new speed  delta
Run/ts=[0,100],keySuffix=[2,3],valueLen=[1,1],keysPerValue=[1,2],deleteFrac=0.000000,intentFrac=0.100000-8  23.9MB/s ± 8%  37.3MB/s ± 1%  +56.23%  (p=0.008 n=5+5)
Run/ts=[0,100],keySuffix=[2,3],valueLen=[1,1],keysPerValue=[1,2],deleteFrac=0.000000,intentFrac=0.100000#01-8  22.6MB/s ± 2%  38.1MB/s ± 1%  +68.81%  (p=0.008 n=5+5)
Run/ts=[0,100],keySuffix=[2,3],valueLen=[1,1],keysPerValue=[1,2],deleteFrac=0.000000,intentFrac=0.100000#02-8  23.3MB/s ± 0%  38.7MB/s ± 9%  +66.06%  (p=0.008 n=5+5)
Run/ts=[0,100],keySuffix=[2,3],valueLen=[1,1],keysPerValue=[1,2],deleteFrac=0.000000,intentFrac=0.100000#03-8  24.4MB/s ± 0%  36.0MB/s ± 3%  +47.73%  (p=0.016 n=4+5)
Run/ts=[0,100],keySuffix=[2,3],valueLen=[1,1],keysPerValue=[1,2],deleteFrac=0.000000,intentFrac=0.100000#04-8  22.2MB/s ± 8%  37.3MB/s ± 9%  +68.09%  (p=0.008 n=5+5)
Run/ts=[0,100],keySuffix=[8,8],valueLen=[8,16],keysPerValue=[1,100],deleteFrac=0.100000,intentFrac=0.100000-8  34.4MB/s ± 4%  43.7MB/s ± 1%  +27.08%  (p=0.008 n=5+5)
Run/ts=[0,100],keySuffix=[8,8],valueLen=[8,16],keysPerValue=[1,100],deleteFrac=0.100000,intentFrac=0.100000#01-8  36.9MB/s ± 0%  43.4MB/s ± 2%  +17.84%  (p=0.008 n=5+5)
Run/ts=[0,100],keySuffix=[8,8],valueLen=[8,16],keysPerValue=[1,100],deleteFrac=0.100000,intentFrac=0.100000#02-8  36.2MB/s ± 4%  43.1MB/s ± 2%  +19.02%  (p=0.008 n=5+5)
Run/ts=[0,100],keySuffix=[8,8],valueLen=[8,16],keysPerValue=[1,100],deleteFrac=0.100000,intentFrac=0.100000#03-8  37.6MB/s ± 1%  43.3MB/s ± 3%  +15.02%  (p=0.008 n=5+5)
Run/ts=[0,100],keySuffix=[8,8],valueLen=[8,16],keysPerValue=[1,100],deleteFrac=0.100000,intentFrac=0.100000#04-8  36.0MB/s ± 2%  43.2MB/s ±10%  +19.87%  (p=0.008 n=5+5)
Run/ts=[0,100],keySuffix=[8,8],valueLen=[8,16],keysPerValue=[1000,1000000],deleteFrac=0.100000,intentFrac=0.100000-8  36.5MB/s ± 5%  41.8MB/s ± 9%  +14.39%  (p=0.008 n=5+5)
Run/ts=[0,100],keySuffix=[8,8],valueLen=[8,16],keysPerValue=[1000,1000000],deleteFrac=0.100000,intentFrac=0.100000#01-8  35.9MB/s ± 5%  42.7MB/s ± 6%  +18.83%  (p=0.008 n=5+5)
Run/ts=[0,100],keySuffix=[8,8],valueLen=[8,16],keysPerValue=[1000,1000000],deleteFrac=0.100000,intentFrac=0.100000#02-8  35.6MB/s ± 6%  41.2MB/s ± 4%  +15.66%  (p=0.008 n=5+5)
Run/ts=[0,100],keySuffix=[8,8],valueLen=[8,16],keysPerValue=[1000,1000000],deleteFrac=0.100000,intentFrac=0.100000#03-8  36.3MB/s ± 6%  42.3MB/s ± 2%  +16.69%  (p=0.008 n=5+5)
Run/ts=[0,100],keySuffix=[8,8],valueLen=[8,16],keysPerValue=[1000,1000000],deleteFrac=0.100000,intentFrac=0.100000#04-8  36.7MB/s ± 4%  44.0MB/s ± 7%  +19.69%  (p=0.008 n=5+5)

name  old alloc/op  new alloc/op  delta
Run/ts=[0,100],keySuffix=[2,3],valueLen=[1,1],keysPerValue=[1,2],deleteFrac=0.000000,intentFrac=0.100000-8  325B ± 0%  76B ± 0%  -76.62%  (p=0.008 n=5+5)
Run/ts=[0,100],keySuffix=[2,3],valueLen=[1,1],keysPerValue=[1,2],deleteFrac=0.000000,intentFrac=0.100000#01-8  358B ± 0%  49B ± 0%  ~  (p=0.079 n=4+5)
Run/ts=[0,100],keySuffix=[2,3],valueLen=[1,1],keysPerValue=[1,2],deleteFrac=0.000000,intentFrac=0.100000#02-8  340B ± 0%  29B ± 0%  -91.47%  (p=0.008 n=5+5)
Run/ts=[0,100],keySuffix=[2,3],valueLen=[1,1],keysPerValue=[1,2],deleteFrac=0.000000,intentFrac=0.100000#03-8  328B ± 0%  18B ± 0%  -94.51%  (p=0.008 n=5+5)
Run/ts=[0,100],keySuffix=[2,3],valueLen=[1,1],keysPerValue=[1,2],deleteFrac=0.000000,intentFrac=0.100000#04-8  325B ± 0%  14B ± 0%  -95.69%  (p=0.008 n=5+5)
Run/ts=[0,100],keySuffix=[8,8],valueLen=[8,16],keysPerValue=[1,100],deleteFrac=0.100000,intentFrac=0.100000-8  226B ± 0%  2B ± 0%  ~  (p=0.079 n=4+5)
Run/ts=[0,100],keySuffix=[8,8],valueLen=[8,16],keysPerValue=[1,100],deleteFrac=0.100000,intentFrac=0.100000#01-8  228B ± 0%  3B ± 0%  -98.69%  (p=0.000 n=5+4)
Run/ts=[0,100],keySuffix=[8,8],valueLen=[8,16],keysPerValue=[1,100],deleteFrac=0.100000,intentFrac=0.100000#02-8  228B ± 0%  2B ± 0%  -99.12%  (p=0.008 n=5+5)
Run/ts=[0,100],keySuffix=[8,8],valueLen=[8,16],keysPerValue=[1,100],deleteFrac=0.100000,intentFrac=0.100000#03-8  228B ± 0%  2B ± 0%  -99.12%  (p=0.008 n=5+5)
Run/ts=[0,100],keySuffix=[8,8],valueLen=[8,16],keysPerValue=[1,100],deleteFrac=0.100000,intentFrac=0.100000#04-8  226B ± 0%  0B  -100.00%  (p=0.008 n=5+5)
Run/ts=[0,100],keySuffix=[8,8],valueLen=[8,16],keysPerValue=[1000,1000000],deleteFrac=0.100000,intentFrac=0.100000-8  388B ± 2%  0B  -100.00%  (p=0.008 n=5+5)
Run/ts=[0,100],keySuffix=[8,8],valueLen=[8,16],keysPerValue=[1000,1000000],deleteFrac=0.100000,intentFrac=0.100000#01-8  391B ± 2%  0B  -100.00%  (p=0.008 n=5+5)
Run/ts=[0,100],keySuffix=[8,8],valueLen=[8,16],keysPerValue=[1000,1000000],deleteFrac=0.100000,intentFrac=0.100000#02-8  389B ± 1%  0B  -100.00%  (p=0.008 n=5+5)
Run/ts=[0,100],keySuffix=[8,8],valueLen=[8,16],keysPerValue=[1000,1000000],deleteFrac=0.100000,intentFrac=0.100000#03-8  391B ± 2%  0B  -100.00%  (p=0.008 n=5+5)
Run/ts=[0,100],keySuffix=[8,8],valueLen=[8,16],keysPerValue=[1000,1000000],deleteFrac=0.100000,intentFrac=0.100000#04-8  390B ± 1%  0B  -100.00%  (p=0.008 n=5+5)

name  old allocs/op  new allocs/op  delta
Run/ts=[0,100],keySuffix=[2,3],valueLen=[1,1],keysPerValue=[1,2],deleteFrac=0.000000,intentFrac=0.100000-8  4.00 ± 0%  0.00  -100.00%  (p=0.008 n=5+5)
Run/ts=[0,100],keySuffix=[2,3],valueLen=[1,1],keysPerValue=[1,2],deleteFrac=0.000000,intentFrac=0.100000#01-8  4.00 ± 0%  0.00  -100.00%  (p=0.008 n=5+5)
Run/ts=[0,100],keySuffix=[2,3],valueLen=[1,1],keysPerValue=[1,2],deleteFrac=0.000000,intentFrac=0.100000#02-8  4.00 ± 0%  0.00  -100.00%  (p=0.008 n=5+5)
Run/ts=[0,100],keySuffix=[2,3],valueLen=[1,1],keysPerValue=[1,2],deleteFrac=0.000000,intentFrac=0.100000#03-8  4.00 ± 0%  0.00  -100.00%  (p=0.008 n=5+5)
Run/ts=[0,100],keySuffix=[2,3],valueLen=[1,1],keysPerValue=[1,2],deleteFrac=0.000000,intentFrac=0.100000#04-8  4.00 ± 0%  0.00  -100.00%  (p=0.008 n=5+5)
Run/ts=[0,100],keySuffix=[8,8],valueLen=[8,16],keysPerValue=[1,100],deleteFrac=0.100000,intentFrac=0.100000-8  0.00  0.00  ~  (all equal)
Run/ts=[0,100],keySuffix=[8,8],valueLen=[8,16],keysPerValue=[1,100],deleteFrac=0.100000,intentFrac=0.100000#01-8  0.00  0.00  ~  (all equal)
Run/ts=[0,100],keySuffix=[8,8],valueLen=[8,16],keysPerValue=[1,100],deleteFrac=0.100000,intentFrac=0.100000#02-8  0.00  0.00  ~  (all equal)
Run/ts=[0,100],keySuffix=[8,8],valueLen=[8,16],keysPerValue=[1,100],deleteFrac=0.100000,intentFrac=0.100000#03-8  0.00  0.00  ~  (all equal)
Run/ts=[0,100],keySuffix=[8,8],valueLen=[8,16],keysPerValue=[1,100],deleteFrac=0.100000,intentFrac=0.100000#04-8  0.00  0.00  ~  (all equal)
Run/ts=[0,100],keySuffix=[8,8],valueLen=[8,16],keysPerValue=[1000,1000000],deleteFrac=0.100000,intentFrac=0.100000-8  0.00  0.00  ~  (all equal)
Run/ts=[0,100],keySuffix=[8,8],valueLen=[8,16],keysPerValue=[1000,1000000],deleteFrac=0.100000,intentFrac=0.100000#01-8  0.00  0.00  ~  (all equal)
Run/ts=[0,100],keySuffix=[8,8],valueLen=[8,16],keysPerValue=[1000,1000000],deleteFrac=0.100000,intentFrac=0.100000#02-8  0.00  0.00  ~  (all equal)
Run/ts=[0,100],keySuffix=[8,8],valueLen=[8,16],keysPerValue=[1000,1000000],deleteFrac=0.100000,intentFrac=0.100000#03-8  0.00  0.00  ~  (all equal)
Run/ts=[0,100],keySuffix=[8,8],valueLen=[8,16],keysPerValue=[1000,1000000],deleteFrac=0.100000,intentFrac=0.100000#04-8  0.00  0.00  ~  (all equal)
```

Release note (bug fix): The GC process now paginates the versions of a
key, fixing OOM crashes which could occur when there were extremely large
numbers of versions for a given key.
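To make the pagination the release note refers to concrete, here is a rough
sketch of the batching pattern (not the patch's code; `gcKey`, `chunkBytes`,
and `flushWhenFull` are illustrative names). In the patch the threshold is
`KeyVersionChunkBytes`, i.e. `base.ChunkRaftCommandThresholdBytes`:

```go
package main

import "fmt"

// gcKey is a simplified stand-in for roachpb.GCRequest_GCKey.
type gcKey struct {
	key []byte
	ts  int64
}

// chunkBytes plays the role of KeyVersionChunkBytes in the patch.
const chunkBytes = 256 << 10

// flushWhenFull appends a key to the current batch and flushes the batch via
// send once the accumulated encoded key size crosses the chunk threshold,
// mirroring how GC requests are paginated so that no single request (and no
// single Raft command) grows without bound.
func flushWhenFull(batch []gcKey, batchSize *int64, k gcKey, send func([]gcKey)) []gcKey {
	batch = append(batch, k)
	*batchSize += int64(len(k.key))
	if *batchSize >= chunkBytes {
		send(batch)
		batch, *batchSize = nil, 0
	}
	return batch
}

func main() {
	var size int64
	var batch []gcKey
	send := func(b []gcKey) { fmt.Printf("sent batch of %d keys\n", len(b)) }
	for i := 0; i < 10; i++ {
		k := gcKey{key: make([]byte, 64<<10), ts: int64(i)} // 64KiB keys force periodic flushes
		batch = flushWhenFull(batch, &size, k, send)
	}
	if len(batch) > 0 {
		send(batch) // final partial batch
	}
}
```

The new implementation applies this flushing while it discovers garbage, so
only the keys of the current batch are ever held in memory.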
--- pkg/cli/debug.go | 2 +- pkg/storage/gc/data_distribution_test.go | 254 +++++++++++ pkg/storage/gc/gc.go | 509 ++++++++++++++++++++--- pkg/storage/gc/gc_iterator.go | 174 ++++++++ pkg/storage/gc/gc_iterator_test.go | 165 ++++++++ pkg/storage/gc/gc_old_test.go | 380 +++++++++++++++++ pkg/storage/gc/gc_random_test.go | 244 +++++++++++ pkg/storage/gc/gc_test.go | 87 ---- pkg/storage/gc/run_gc.go | 464 --------------------- pkg/storage/gc_queue.go | 8 +- pkg/storage/gc_queue_test.go | 10 +- pkg/storage/rditer/replica_data_iter.go | 12 + 12 files changed, 1699 insertions(+), 610 deletions(-) create mode 100644 pkg/storage/gc/data_distribution_test.go create mode 100644 pkg/storage/gc/gc_iterator.go create mode 100644 pkg/storage/gc/gc_iterator_test.go create mode 100644 pkg/storage/gc/gc_old_test.go create mode 100644 pkg/storage/gc/gc_random_test.go delete mode 100644 pkg/storage/gc/gc_test.go delete mode 100644 pkg/storage/gc/run_gc.go diff --git a/pkg/cli/debug.go b/pkg/cli/debug.go index 126b4461b11b..0fe1b8c9ecb8 100644 --- a/pkg/cli/debug.go +++ b/pkg/cli/debug.go @@ -556,7 +556,7 @@ func runDebugGCCmd(cmd *cobra.Command, args []string) error { for _, desc := range descs { snap := db.NewSnapshot() defer snap.Close() - info, err := gc.RunGC( + info, err := gc.Run( context.Background(), &desc, snap, diff --git a/pkg/storage/gc/data_distribution_test.go b/pkg/storage/gc/data_distribution_test.go new file mode 100644 index 000000000000..0daf9b62261e --- /dev/null +++ b/pkg/storage/gc/data_distribution_test.go @@ -0,0 +1,254 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package gc + +import ( + "context" + "fmt" + "math/rand" + "sort" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage/engine" + "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" + "github.com/cockroachdb/cockroach/pkg/storage/rditer" + "github.com/cockroachdb/cockroach/pkg/util/encoding" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/stretchr/testify/require" +) + +// dataDistribution is an abstraction for testing that represents a stream of +// MVCCKeyValues. The stream may indicate that a value is an intent by returning +// a non-nil transaction. If an intent is returned it must have a higher +// timestamp than any other version written for the key. +type dataDistribution func() (engine.MVCCKeyValue, *roachpb.Transaction, bool) + +// setupTest writes the data from this distribution into eng. All data should +// be a part of the range represented by desc. +func (ds dataDistribution) setupTest( + t testing.TB, eng engine.Engine, desc roachpb.RangeDescriptor, +) enginepb.MVCCStats { + ctx := context.TODO() + var maxTs hlc.Timestamp + var ms enginepb.MVCCStats + for { + kv, txn, ok := ds() + if !ok { + break + } + if txn == nil { + require.NoError(t, eng.Put(kv.Key, kv.Value)) + } else { + // TODO(ajwerner): Decide if using MVCCPut is worth it. 
+ ts := kv.Key.Timestamp + if txn.ReadTimestamp == (hlc.Timestamp{}) { + txn.ReadTimestamp = ts + } + if txn.WriteTimestamp == (hlc.Timestamp{}) { + txn.WriteTimestamp = ts + } + err := engine.MVCCPut(ctx, eng, &ms, kv.Key.Key, ts, + roachpb.Value{RawBytes: kv.Value}, txn) + require.NoError(t, err) + } + if !kv.Key.Timestamp.Less(maxTs) { + maxTs = kv.Key.Timestamp + } + } + require.NoError(t, eng.Flush()) + snap := eng.NewSnapshot() + defer snap.Close() + ms, err := rditer.ComputeStatsForRange(&desc, snap, maxTs.WallTime) + require.NoError(t, err) + return ms +} + +// newDataDistribution constructs a dataDistribution from various underlying +// distributions. +func newDataDistribution( + tsDist func() hlc.Timestamp, + keyDist func() roachpb.Key, + valueDist func() []byte, + versionsPerKey func() int, + intentFrac float64, + totalKeys int, + rng *rand.Rand, +) dataDistribution { + // TODO(ajwerner): provide a mechanism to control the rate of expired intents + // or the intent age. Such a knob would likely require decoupling intents from + // other keys. + var ( + remaining = totalKeys + key roachpb.Key + seen = map[string]struct{}{} + timestamps []hlc.Timestamp + haveIntent bool + ) + return func() (engine.MVCCKeyValue, *roachpb.Transaction, bool) { + if remaining == 0 { + return engine.MVCCKeyValue{}, nil, false + } + defer func() { remaining-- }() + for len(timestamps) == 0 { + versions := versionsPerKey() + if versions == 0 { + continue + } + if versions > remaining { + versions = remaining + } + timestamps = make([]hlc.Timestamp, 0, versions) + for i := 0; i < versions; i++ { + timestamps = append(timestamps, tsDist()) + } + sort.Slice(timestamps, func(i, j int) bool { + return timestamps[i].Less(timestamps[j]) + }) + for { + key = keyDist() + sk := string(key) + if _, ok := seen[sk]; ok { + continue + } + seen[sk] = struct{}{} + break + } + haveIntent = rng.Float64() < intentFrac + } + ts := timestamps[0] + timestamps = timestamps[1:] + var txn *roachpb.Transaction + if len(timestamps) == 0 && haveIntent { + txn = &roachpb.Transaction{ + Status: roachpb.PENDING, + ReadTimestamp: ts, + MaxTimestamp: ts.Next().Next(), + } + txn.ID = uuid.MakeV4() + txn.WriteTimestamp = ts + txn.Key = keyDist() + } + return engine.MVCCKeyValue{ + Key: engine.MVCCKey{Key: key, Timestamp: ts}, + Value: valueDist(), + }, txn, true + } +} + +// distSpec abstractly represents a distribution. +type distSpec interface { + dist(maxRows int, rng *rand.Rand) dataDistribution + desc() *roachpb.RangeDescriptor + String() string +} + +// uniformDistSpec is a distSpec which represents uniform distributions over its +// various dimensions. 
+type uniformDistSpec struct { + tsFrom, tsTo int64 // seconds + keySuffixMin, keySuffixMax int + valueLenMin, valueLenMax int + deleteFrac float64 + keysPerValueMin, keysPerValueMax int + intentFrac float64 +} + +var _ distSpec = uniformDistSpec{} + +func (ds uniformDistSpec) dist(maxRows int, rng *rand.Rand) dataDistribution { + return newDataDistribution( + uniformTimestampDistribution(ds.tsFrom*time.Second.Nanoseconds(), ds.tsTo*time.Second.Nanoseconds(), rng), + uniformTableKeyDistribution(ds.desc().StartKey.AsRawKey(), ds.keySuffixMin, ds.keySuffixMax, rng), + uniformValueDistribution(ds.valueLenMin, ds.valueLenMax, ds.deleteFrac, rng), + uniformValuesPerKey(ds.keysPerValueMin, ds.keysPerValueMax, rng), + ds.intentFrac, + maxRows, + rng, + ) +} + +func (ds uniformDistSpec) desc() *roachpb.RangeDescriptor { + tablePrefix := keys.MakeTablePrefix(42) + return &roachpb.RangeDescriptor{ + StartKey: tablePrefix, + EndKey: roachpb.RKey(roachpb.Key(tablePrefix).PrefixEnd()), + } +} + +func (ds uniformDistSpec) String() string { + return fmt.Sprintf( + "ts=[%d,%d],"+ + "keySuffix=[%d,%d],"+ + "valueLen=[%d,%d],"+ + "keysPerValue=[%d,%d],"+ + "deleteFrac=%f,intentFrac=%f", + ds.tsFrom, ds.tsTo, + ds.keySuffixMin, ds.keySuffixMax, + ds.valueLenMin, ds.valueLenMax, + ds.keysPerValueMin, ds.keysPerValueMax, + ds.deleteFrac, ds.intentFrac) +} + +// uniformTimestamp returns an hlc timestamp distribution with a wall time +// uniform over [from, to] and a zero logical timestamp. +func uniformTimestampDistribution(from, to int64, rng *rand.Rand) func() hlc.Timestamp { + if from >= to { + panic(fmt.Errorf("from (%d) >= to (%d)", from, to)) + } + n := int(to-from) + 1 + return func() hlc.Timestamp { + return hlc.Timestamp{WallTime: from + int64(rng.Intn(n))} + } +} + +// returns a uniform length random value distribution. +func uniformValueDistribution(min, max int, deleteFrac float64, rng *rand.Rand) func() []byte { + if min > max { + panic(fmt.Errorf("min (%d) > max (%d)", min, max)) + } + n := (max - min) + 1 + return func() []byte { + if rng.Float64() < deleteFrac { + return nil + } + value := make([]byte, min+rng.Intn(n)) + if _, err := rng.Read(value); err != nil { + panic(err) + } + return value + } +} + +func uniformValuesPerKey(valuesPerKeyMin, valuesPerKeyMax int, rng *rand.Rand) func() int { + if valuesPerKeyMin > valuesPerKeyMax { + panic(fmt.Errorf("min (%d) > max (%d)", valuesPerKeyMin, valuesPerKeyMax)) + } + n := (valuesPerKeyMax - valuesPerKeyMin) + 1 + return func() int { return valuesPerKeyMin + rng.Intn(n) } +} + +func uniformTableKeyDistribution( + prefix roachpb.Key, suffixMin, suffixMax int, rng *rand.Rand, +) func() roachpb.Key { + if suffixMin > suffixMax { + panic(fmt.Errorf("suffixMin (%d) > suffixMax (%d)", suffixMin, suffixMax)) + } + n := (suffixMax - suffixMin) + 1 + return func() roachpb.Key { + randData := make([]byte, suffixMin+rng.Intn(n)) + _, _ = rng.Read(randData) + return encoding.EncodeBytesAscending(prefix[0:len(prefix):len(prefix)], randData) + } +} diff --git a/pkg/storage/gc/gc.go b/pkg/storage/gc/gc.go index 906db1a3ba30..60947fc81718 100644 --- a/pkg/storage/gc/gc.go +++ b/pkg/storage/gc/gc.go @@ -1,4 +1,4 @@ -// Copyright 2014 The Cockroach Authors. +// Copyright 2020 The Cockroach Authors. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt. @@ -8,79 +8,490 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. 
+// Package gc contains the logic to scan a range for garbage and issue
+// GC requests to remove that garbage.
+//
+// The Run function is the primary entry point and is called underneath the
+// gcQueue in the storage package. It can also be run for debugging.
 package gc
 
 import (
-	"sort"
+	"context"
+	"fmt"
+	"time"
 
+	"github.com/cockroachdb/cockroach/pkg/base"
 	"github.com/cockroachdb/cockroach/pkg/config/zonepb"
+	"github.com/cockroachdb/cockroach/pkg/keys"
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/storage/abortspan"
 	"github.com/cockroachdb/cockroach/pkg/storage/engine"
+	"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
+	"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
+	"github.com/cockroachdb/cockroach/pkg/util/bufalloc"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
+	"github.com/cockroachdb/cockroach/pkg/util/uuid"
+	"github.com/cockroachdb/errors"
+)
+
+const (
+	// IntentAgeThreshold is the threshold after which an extant intent
+	// will be resolved.
+	IntentAgeThreshold = 2 * time.Hour
+
+	// KeyVersionChunkBytes is the threshold size for splitting
+	// GCRequests into multiple batches.
+	KeyVersionChunkBytes = base.ChunkRaftCommandThresholdBytes
 )
 
-// GarbageCollector GCs MVCC key/values using a zone-specific GC
-// policy allows either the union or intersection of maximum # of
-// versions and maximum age.
-type GarbageCollector struct {
+// CalculateThreshold calculates the GC threshold given the policy and the
+// current view of time.
+func CalculateThreshold(now hlc.Timestamp, policy zonepb.GCPolicy) (threshold hlc.Timestamp) {
+	ttlNanos := int64(policy.TTLSeconds) * time.Second.Nanoseconds()
+	return hlc.Timestamp{WallTime: now.WallTime - ttlNanos}
+}
+
+// A GCer is an abstraction used by the GC queue to carry out chunked deletions.
+type GCer interface {
+	SetGCThreshold(context.Context, Threshold) error
+	GC(context.Context, []roachpb.GCRequest_GCKey) error
+}
+
+// NoopGCer implements GCer by doing nothing.
+type NoopGCer struct{}
+
+var _ GCer = NoopGCer{}
+
+// SetGCThreshold implements storage.GCer.
+func (NoopGCer) SetGCThreshold(context.Context, Threshold) error { return nil }
+
+// GC implements storage.GCer.
+func (NoopGCer) GC(context.Context, []roachpb.GCRequest_GCKey) error { return nil }
+
+// Threshold holds the key and txn span GC thresholds, respectively.
+type Threshold struct {
+	Key hlc.Timestamp
+	Txn hlc.Timestamp
+}
+
+// Info contains statistics and insights from a GC run.
+type Info struct {
+	// Now is the timestamp used for age computations.
+	Now hlc.Timestamp
+	// Policy is the policy used for this garbage collection cycle.
+	Policy zonepb.GCPolicy
+	// Stats about the userspace key-values considered, namely the number of
+	// keys with GC'able data, the number of "old" intents and the number of
+	// associated distinct transactions.
+	NumKeysAffected, IntentsConsidered, IntentTxns int
+	// TransactionSpanTotal is the total number of entries in the transaction span.
+	TransactionSpanTotal int
+	// Summary of transactions which were found GCable (assuming that
+	// potentially necessary intent resolutions did not fail).
+	TransactionSpanGCAborted, TransactionSpanGCCommitted int
+	TransactionSpanGCStaging, TransactionSpanGCPending   int
+	// AbortSpanTotal is the total number of transactions present in the AbortSpan.
+ AbortSpanTotal int + // AbortSpanConsidered is the number of AbortSpan entries old enough to be + // considered for removal. An "entry" corresponds to one transaction; + // more than one key-value pair may be associated with it. + AbortSpanConsidered int + // AbortSpanGCNum is the number of AbortSpan entries fit for removal (due + // to their transactions having terminated). + AbortSpanGCNum int + // PushTxn is the total number of pushes attempted in this cycle. + PushTxn int + // ResolveTotal is the total number of attempted intent resolutions in + // this cycle. + ResolveTotal int + // Threshold is the computed expiration timestamp. Equal to `Now - Policy`. Threshold hlc.Timestamp - policy zonepb.GCPolicy + // AffectedVersionsKeyBytes is the number of (fully encoded) bytes deleted from keys in the storage engine. + // Note that this does not account for compression that the storage engine uses to store data on disk. Real + // space savings tends to be smaller due to this compression, and space may be released only at a later point + // in time. + AffectedVersionsKeyBytes int64 + // AffectedVersionsValBytes is the number of (fully encoded) bytes deleted from values in the storage engine. + // See AffectedVersionsKeyBytes for caveats. + AffectedVersionsValBytes int64 } -// MakeGarbageCollector allocates and returns a new GC, with expiration -// computed based on current time and policy.TTLSeconds. -func MakeGarbageCollector(now hlc.Timestamp, policy zonepb.GCPolicy) GarbageCollector { - ttlNanos := int64(policy.TTLSeconds) * 1e9 - return GarbageCollector{ - Threshold: hlc.Timestamp{WallTime: now.WallTime - ttlNanos}, - policy: policy, +// CleanupIntentsFunc synchronously resolves the supplied intents +// (which may be PENDING, in which case they are first pushed) while +// taking care of proper batching. +type CleanupIntentsFunc func(context.Context, []roachpb.Intent) error + +// CleanupTxnIntentsAsyncFunc asynchronously cleans up intents from a +// transaction record, pushing the transaction first if it is +// PENDING. Once all intents are resolved successfully, removes the +// transaction record. +type CleanupTxnIntentsAsyncFunc func(context.Context, *roachpb.Transaction, []roachpb.Intent) error + +// Run runs garbage collection for the specified descriptor on the +// provided Engine (which is not mutated). It uses the provided gcFn +// to run garbage collection once on all implicated spans, +// cleanupIntentsFn to resolve intents synchronously, and +// cleanupTxnIntentsAsyncFn to asynchronously cleanup intents and +// associated transaction record on success. +func Run( + ctx context.Context, + desc *roachpb.RangeDescriptor, + snap engine.Reader, + now hlc.Timestamp, + policy zonepb.GCPolicy, + gcer GCer, + cleanupIntentsFn CleanupIntentsFunc, + cleanupTxnIntentsAsyncFn CleanupTxnIntentsAsyncFunc, +) (Info, error) { + + txnExp := now.Add(-storagebase.TxnCleanupThreshold.Nanoseconds(), 0) + threshold := CalculateThreshold(now, policy) + + if err := gcer.SetGCThreshold(ctx, Threshold{ + Key: threshold, + Txn: txnExp, + }); err != nil { + return Info{}, errors.Wrap(err, "failed to set GC thresholds") + } + + info := Info{ + Policy: policy, + Now: now, + Threshold: threshold, + } + + // Maps from txn ID to txn and intent key slice. 
+ txnMap := map[uuid.UUID]*roachpb.Transaction{} + intentSpanMap := map[uuid.UUID][]roachpb.Span{} + err := processReplicatedKeyRange(ctx, desc, snap, now, threshold, gcer, txnMap, intentSpanMap, &info) + if err != nil { + return Info{}, err + } + + // From now on, all newly added keys are range-local. + + // Process local range key entries (txn records, queue last processed times). + localRangeKeys, err := processLocalKeyRange(ctx, snap, desc, txnExp, &info, cleanupTxnIntentsAsyncFn) + if err != nil { + return Info{}, err } + + if err := gcer.GC(ctx, localRangeKeys); err != nil { + return Info{}, err + } + + // Clean up the AbortSpan. + log.Event(ctx, "processing AbortSpan") + abortSpanKeys := processAbortSpan(ctx, snap, desc.RangeID, txnExp, &info) + if err := gcer.GC(ctx, abortSpanKeys); err != nil { + return Info{}, err + } + + log.Eventf(ctx, "GC'ed keys; stats %+v", info) + + // Push transactions (if pending) and resolve intents. + var intents []roachpb.Intent + for txnID, txn := range txnMap { + intents = append(intents, roachpb.AsIntents(intentSpanMap[txnID], txn)...) + } + info.ResolveTotal += len(intents) + log.Eventf(ctx, "cleanup of %d intents", len(intents)) + if err := cleanupIntentsFn(ctx, intents); err != nil { + return Info{}, err + } + + return info, nil +} + +// processReplicatedKeyRange identifies garbage and sends GC requests to +// remove it. +// +// The logic iterates all versions of all keys in the range from oldest to +// newest. Expired intents are written into the txnMap and intentSpanMap. +func processReplicatedKeyRange( + ctx context.Context, + desc *roachpb.RangeDescriptor, + snap engine.Reader, + now hlc.Timestamp, + threshold hlc.Timestamp, + gcer GCer, + txnMap map[uuid.UUID]*roachpb.Transaction, + intentSpanMap map[uuid.UUID][]roachpb.Span, + info *Info, +) error { + var alloc bufalloc.ByteAllocator + // Compute intent expiration (intent age at which we attempt to resolve). + intentExp := now.Add(-IntentAgeThreshold.Nanoseconds(), 0) + handleIntent := func(md *engine.MVCCKeyValue) { + meta := &enginepb.MVCCMetadata{} + if err := protoutil.Unmarshal(md.Value, meta); err != nil { + log.Errorf(ctx, "unable to unmarshal MVCC metadata for key %q: %+v", md.Key, err) + return + } + if meta.Txn != nil { + // Keep track of intent to resolve if older than the intent + // expiration threshold. + if hlc.Timestamp(meta.Timestamp).Less(intentExp) { + txnID := meta.Txn.ID + if _, ok := txnMap[txnID]; !ok { + txnMap[txnID] = &roachpb.Transaction{ + TxnMeta: *meta.Txn, + } + // IntentTxns and PushTxn will be equal here, since + // pushes to transactions whose record lies in this + // range (but which are not associated to a remaining + // intent on it) happen asynchronously and are accounted + // for separately. Thus higher up in the stack, we + // expect PushTxn > IntentTxns. + info.IntentTxns++ + // All transactions in txnMap may be PENDING and + // cleanupIntentsFn will push them to finalize them. + info.PushTxn++ + } + info.IntentsConsidered++ + alloc, md.Key.Key = alloc.Copy(md.Key.Key, 0) + intentSpanMap[txnID] = append(intentSpanMap[txnID], roachpb.Span{ + Key: md.Key.Key, + }) + } + } + } + + // Iterate all versions of all keys from oldest to newest. If a version is an + // intent it will have the highest timestamp of any versions and will be + // followed by a metadata entry. The loop will determine whether a given key + // has garbage and, if so, will determine the timestamp of the latest version + // which is garbage to be added to the current batch. 
If the current version + // pushes the size of keys to be removed above the limit, the current key will + // be added with that version and the batch will be sent. When the newest + // version for a key has been reached, if haveGarbageForThisKey, we'll add the + // current key to the batch with the gcTimestampForThisKey. + var ( + batchGCKeys []roachpb.GCRequest_GCKey + batchGCKeysBytes int64 + haveGarbageForThisKey bool + gcTimestampForThisKey hlc.Timestamp + sentBatchForThisKey bool + ) + it := makeGCIterator(desc, snap) + defer it.close() + for ; ; it.step() { + s, ok := it.state() + if !ok { + if it.err != nil { + return it.err + } + break + } + if s.curIsNotValue() { // Step over metadata or other system keys + continue + } + if s.curIsIntent() { + handleIntent(s.next) + continue + } + isNewest := s.curIsNewest() + if isGarbage(threshold, s.cur, s.next, isNewest) { + keyBytes := int64(s.cur.Key.EncodedSize()) + batchGCKeysBytes += keyBytes + haveGarbageForThisKey = true + gcTimestampForThisKey = s.cur.Key.Timestamp + info.AffectedVersionsKeyBytes += keyBytes + info.AffectedVersionsValBytes += int64(len(s.cur.Value)) + } + if affected := isNewest && (sentBatchForThisKey || haveGarbageForThisKey); affected { + info.NumKeysAffected++ + } + shouldSendBatch := batchGCKeysBytes >= KeyVersionChunkBytes + if shouldSendBatch || isNewest && haveGarbageForThisKey { + alloc, s.cur.Key.Key = alloc.Copy(s.cur.Key.Key, 0) + batchGCKeys = append(batchGCKeys, roachpb.GCRequest_GCKey{ + Key: s.cur.Key.Key, + Timestamp: gcTimestampForThisKey, + }) + haveGarbageForThisKey = false + gcTimestampForThisKey = hlc.Timestamp{} + + // Mark that we sent a batch for this key so we know that we had garbage + // even if it turns out that there's no more garbage for this key. + // We want to count a key as affected once even if we paginate the + // deletion of its versions. + sentBatchForThisKey = shouldSendBatch && !isNewest + } + if shouldSendBatch { + if err := gcer.GC(ctx, batchGCKeys); err != nil { + // Even though we are batching the GC process, it's + // safe to continue because we bumped the GC + // thresholds. We may leave some inconsistent history + // behind, but nobody can read it. + log.Warningf(ctx, "failed to GC a batch of keys: %v", err) + } + batchGCKeys = nil + batchGCKeysBytes = 0 + alloc = nil + } + } + if len(batchGCKeys) > 0 { + if err := gcer.GC(ctx, batchGCKeys); err != nil { + return err + } + } + return nil } -// Filter makes decisions about garbage collection based on the -// garbage collection policy for batches of values for the same -// key. Returns the index of the first key to be GC'd and the -// timestamp including, and after which, all values should be garbage -// collected. If no values should be GC'd, returns -1 for the index -// and the zero timestamp. Keys must be in descending time -// order. Values deleted at or before the returned timestamp can be -// deleted without invalidating any reads in the time interval -// (gc.expiration, \infinity). +// isGarbage makes a determination whether a key ('cur') is garbage. If 'next' +// is non-nil, it should be the chronologically newer version of the same key +// (or the metadata KV if cur is an intent). If isNewest is false, next must be +// non-nil. isNewest implies that this is the highest timestamp committed +// version for this key. If isNewest is true and next is non-nil, it is an +// intent. 
Conservatively we have to assume that the intent will get aborted,
+// so we will be able to GC just the values that we could remove if there
+// weren't an intent. Hence this definition of isNewest.
 //
-// The GC keeps all values (including deletes) above the expiration time, plus
+// We keep all values (including deletes) above the expiration time, plus
 // the first value before or at the expiration time. This allows reads to be
 // guaranteed as described above. However if this were the only rule, then if
 // the most recent write was a delete, it would never be removed. Thus, when a
-// deleted value is the most recent before expiration, it can be deleted. This
-// would still allow for the tombstone bugs in #6227, so in the future we will
-// add checks that disallow writes before the last GC expiration time.
-func (gc GarbageCollector) Filter(keys []engine.MVCCKey, values [][]byte) (int, hlc.Timestamp) {
-	if gc.policy.TTLSeconds <= 0 {
-		return -1, hlc.Timestamp{}
+// deleted value is the most recent before expiration, it can be deleted.
+func isGarbage(threshold hlc.Timestamp, cur, next *engine.MVCCKeyValue, isNewest bool) bool {
+	// If the value is not at or below the threshold then it's not garbage.
+	if belowThreshold := cur.Key.Timestamp.LessEq(threshold); !belowThreshold {
+		return false
 	}
-	if len(keys) == 0 {
-		return -1, hlc.Timestamp{}
+	isDelete := len(cur.Value) == 0
+	if isNewest && !isDelete {
+		return false
+	}
+	// If this value is not a delete, then we need to make sure that the next
+	// value is also at or below the threshold.
+	// NB: This doesn't need to check whether next is nil because we know
+	// isNewest is false when evaluating the RHS of the || below.
+	if !isDelete && next == nil {
+		panic("unreachable: isNewest is false, so next must be non-nil")
+	}
+	return isDelete || next.Key.Timestamp.LessEq(threshold)
+}
+
+// processLocalKeyRange scans the local range key entries, consisting of
+// transaction records, queue last processed timestamps, and range descriptors.
+//
+// - Transaction entries:
+//   - For expired transactions, schedule the intents for
+//     asynchronous resolution. The actual transaction spans are not
+//     returned for GC in this pass, but are separately GC'ed after
+//     successful resolution of all intents. The exception is if there
+//     are no intents on the txn record, in which case it's returned for
+//     immediate GC.
+//
+// - Queue last processed times: clean up any entries which don't match
+//   this range's start key. This can happen on range merges.
+func processLocalKeyRange(
+	ctx context.Context,
+	snap engine.Reader,
+	desc *roachpb.RangeDescriptor,
+	cutoff hlc.Timestamp,
+	info *Info,
+	cleanupTxnIntentsAsyncFn CleanupTxnIntentsAsyncFunc,
+) ([]roachpb.GCRequest_GCKey, error) {
+	var gcKeys []roachpb.GCRequest_GCKey
+
+	handleTxnIntents := func(key roachpb.Key, txn *roachpb.Transaction) error {
+		// If the transaction needs to be pushed or there are intents to
+		// resolve, invoke the cleanup function.
+ if !txn.Status.IsFinalized() || len(txn.IntentSpans) > 0 { + return cleanupTxnIntentsAsyncFn(ctx, txn, roachpb.AsIntents(txn.IntentSpans, txn)) + } + gcKeys = append(gcKeys, roachpb.GCRequest_GCKey{Key: key}) // zero timestamp + return nil } - // find the first expired key index using binary search - i := sort.Search(len(keys), func(i int) bool { return keys[i].Timestamp.LessEq(gc.Threshold) }) + handleOneTransaction := func(kv roachpb.KeyValue) error { + var txn roachpb.Transaction + if err := kv.Value.GetProto(&txn); err != nil { + return err + } + info.TransactionSpanTotal++ + if cutoff.LessEq(txn.LastActive()) { + return nil + } - if i == len(keys) { - return -1, hlc.Timestamp{} + // The transaction record should be considered for removal. + switch txn.Status { + case roachpb.PENDING: + info.TransactionSpanGCPending++ + case roachpb.STAGING: + info.TransactionSpanGCStaging++ + case roachpb.ABORTED: + info.TransactionSpanGCAborted++ + case roachpb.COMMITTED: + info.TransactionSpanGCCommitted++ + default: + panic(fmt.Sprintf("invalid transaction state: %s", txn)) + } + return handleTxnIntents(kv.Key, &txn) } - // Now keys[i].Timestamp is <= gc.expiration, but the key-value pair is still - // "visible" at timestamp gc.expiration (and up to the next version). - if deleted := len(values[i]) == 0; deleted { - // We don't have to keep a delete visible (since GCing it does not change - // the outcome of the read). Note however that we can't touch deletes at - // higher timestamps immediately preceding this one, since they're above - // gc.expiration and are needed for correctness; see #6227. - return i, keys[i].Timestamp - } else if i+1 < len(keys) { - // Otherwise mark the previous timestamp for deletion (since it won't ever - // be returned for reads at gc.expiration and up). - return i + 1, keys[i+1].Timestamp + handleOneQueueLastProcessed := func(kv roachpb.KeyValue, rangeKey roachpb.RKey) error { + if !rangeKey.Equal(desc.StartKey) { + // Garbage collect the last processed timestamp if it doesn't match start key. + gcKeys = append(gcKeys, roachpb.GCRequest_GCKey{Key: kv.Key}) // zero timestamp + } + return nil } - return -1, hlc.Timestamp{} + handleOne := func(kv roachpb.KeyValue) error { + rangeKey, suffix, _, err := keys.DecodeRangeKey(kv.Key) + if err != nil { + return err + } + if suffix.Equal(keys.LocalTransactionSuffix.AsRawKey()) { + if err := handleOneTransaction(kv); err != nil { + return err + } + } else if suffix.Equal(keys.LocalQueueLastProcessedSuffix.AsRawKey()) { + if err := handleOneQueueLastProcessed(kv, roachpb.RKey(rangeKey)); err != nil { + return err + } + } + return nil + } + + startKey := keys.MakeRangeKeyPrefix(desc.StartKey) + endKey := keys.MakeRangeKeyPrefix(desc.EndKey) + + _, err := engine.MVCCIterate(ctx, snap, startKey, endKey, hlc.Timestamp{}, engine.MVCCScanOptions{}, + func(kv roachpb.KeyValue) (bool, error) { + return false, handleOne(kv) + }) + return gcKeys, err +} + +// processAbortSpan iterates through the local AbortSpan entries +// and collects entries which indicate that a client which was running +// this transaction must have realized that it has been aborted (due to +// heartbeating having failed). The parameter minAge is typically a +// multiple of the heartbeat timeout used by the coordinator. 
+func processAbortSpan( + ctx context.Context, + snap engine.Reader, + rangeID roachpb.RangeID, + threshold hlc.Timestamp, + info *Info, +) []roachpb.GCRequest_GCKey { + var gcKeys []roachpb.GCRequest_GCKey + abortSpan := abortspan.New(rangeID) + if err := abortSpan.Iterate(ctx, snap, func(key roachpb.Key, v roachpb.AbortSpanEntry) error { + info.AbortSpanTotal++ + if v.Timestamp.Less(threshold) { + info.AbortSpanGCNum++ + gcKeys = append(gcKeys, roachpb.GCRequest_GCKey{Key: key}) + } + return nil + }); err != nil { + // Still return whatever we managed to collect. + log.Warning(ctx, err) + } + return gcKeys } diff --git a/pkg/storage/gc/gc_iterator.go b/pkg/storage/gc/gc_iterator.go new file mode 100644 index 000000000000..a86876210b07 --- /dev/null +++ b/pkg/storage/gc/gc_iterator.go @@ -0,0 +1,174 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package gc + +import ( + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage/engine" + "github.com/cockroachdb/cockroach/pkg/storage/rditer" + "github.com/cockroachdb/cockroach/pkg/util/bufalloc" +) + +// gcIterator wraps an rditer.ReplicaDataIterator which it reverse iterates for +// the purpose of discovering gc-able replicated data. +type gcIterator struct { + it *rditer.ReplicaDataIterator + done bool + err error + buf gcIteratorRingBuf +} + +func makeGCIterator(desc *roachpb.RangeDescriptor, snap engine.Reader) gcIterator { + return gcIterator{ + it: rditer.NewReplicaDataIterator(desc, snap, + true /* replicatedOnly */, true /* seekEnd */), + } +} + +type gcIteratorState struct { + cur, next, afterNext *engine.MVCCKeyValue +} + +// curIsNewest returns true if the current MVCCKeyValue in the gcIteratorState +// is the newest committed version of the key. +// +// It returns true if next is nil or if next is an intent. +func (s *gcIteratorState) curIsNewest() bool { + return s.cur.Key.IsValue() && + (s.next == nil || (s.afterNext != nil && !s.afterNext.Key.IsValue())) +} + +// curIsNotValue returns true if the current MVCCKeyValue in the gcIteratorState +// is not a value, i.e. does not have a timestamp. +func (s *gcIteratorState) curIsNotValue() bool { + return !s.cur.Key.IsValue() +} + +// curIsIntent returns true if the current MVCCKeyValue in the gcIteratorState +// is an intent. +func (s *gcIteratorState) curIsIntent() bool { + return s.next != nil && !s.next.Key.IsValue() +} + +// state returns the current state of the iterator. The state contains the +// current and the two following versions of the current key if they exist. +// +// If ok is false, further iteration is unsafe; either the end of iteration has +// been reached or an error has occurred. Callers should check it.err to +// determine whether an error has occurred in cases where ok is false. +// +// It is not safe to use values in the state after subsequent calls to +// it.step(). +func (it *gcIterator) state() (s gcIteratorState, ok bool) { + // The current key is the newest if the key which comes next is different or + // the key which comes after the current key is an intent or this is the first + // key in the range. 
+ s.cur, ok = it.peekAt(0) + if !ok { + return gcIteratorState{}, false + } + next, ok := it.peekAt(1) + if !ok && it.err != nil { // cur is the first key in the range + return gcIteratorState{}, false + } + if !ok || !next.Key.Key.Equal(s.cur.Key.Key) { + return s, true + } + s.next = next + afterNext, ok := it.peekAt(2) + if !ok && it.err != nil { // cur is the first key in the range + return gcIteratorState{}, false + } + if !ok || !afterNext.Key.Key.Equal(s.cur.Key.Key) { + return s, true + } + s.afterNext = afterNext + return s, true +} + +func (it *gcIterator) step() { + it.buf.removeFront() +} + +func (it *gcIterator) peekAt(i int) (*engine.MVCCKeyValue, bool) { + if it.buf.len <= i { + if !it.fillTo(i + 1) { + return nil, false + } + } + return it.buf.at(i), true +} + +func (it *gcIterator) fillTo(targetLen int) (ok bool) { + for it.buf.len < targetLen { + if ok, err := it.it.Valid(); !ok { + it.err, it.done = err, err == nil + return false + } + it.buf.pushBack(it.it) + it.it.Prev() + } + return true +} + +func (it *gcIterator) close() { + it.it.Close() + it.it = nil +} + +// gcIteratorRingBufSize is 3 because the gcIterator.state method at most needs +// to look forward two keys ahead of the current key. +const gcIteratorRingBufSize = 3 + +type gcIteratorRingBuf struct { + allocs [gcIteratorRingBufSize]bufalloc.ByteAllocator + buf [gcIteratorRingBufSize]engine.MVCCKeyValue + len int + head int +} + +func (b *gcIteratorRingBuf) at(i int) *engine.MVCCKeyValue { + if i >= b.len { + panic("index out of range") + } + return &b.buf[(b.head+i)%gcIteratorRingBufSize] +} + +func (b *gcIteratorRingBuf) removeFront() { + if b.len == 0 { + panic("cannot remove from empty gcIteratorRingBuf") + } + b.buf[b.head] = engine.MVCCKeyValue{} + b.head = (b.head + 1) % gcIteratorRingBufSize + b.len-- +} + +type iterator interface { + UnsafeKey() engine.MVCCKey + UnsafeValue() []byte +} + +func (b *gcIteratorRingBuf) pushBack(it iterator) { + if b.len == gcIteratorRingBufSize { + panic("cannot add to full gcIteratorRingBuf") + } + i := (b.head + b.len) % gcIteratorRingBufSize + b.allocs[i] = b.allocs[i][:0] + k := it.UnsafeKey() + v := it.UnsafeValue() + b.allocs[i], k.Key = b.allocs[i].Copy(k.Key, len(v)) + b.allocs[i], v = b.allocs[i].Copy(v, 0) + b.buf[i] = engine.MVCCKeyValue{ + Key: k, + Value: v, + } + b.len++ +} diff --git a/pkg/storage/gc/gc_iterator_test.go b/pkg/storage/gc/gc_iterator_test.go new file mode 100644 index 000000000000..a5a7d58b8d18 --- /dev/null +++ b/pkg/storage/gc/gc_iterator_test.go @@ -0,0 +1,165 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package gc + +import ( + "fmt" + "math/rand" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage/engine" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/stretchr/testify/require" +) + +// TestGCIterator exercises the GC iterator by writing data to the underlying +// engine and then validating the state of the iterator as it iterates that +// data. 
+func TestGCIterator(t *testing.T) { + // dataItem represents a version in the storage engine and optionally a + // corresponding transaction which will make the MVCCKeyValue an intent. + type dataItem struct { + engine.MVCCKeyValue + txn *roachpb.Transaction + } + // makeDataItem is a shorthand to construct dataItems. + makeDataItem := func(k roachpb.Key, val []byte, ts int64, txn *roachpb.Transaction) dataItem { + return dataItem{ + MVCCKeyValue: engine.MVCCKeyValue{ + Key: engine.MVCCKey{ + Key: k, + Timestamp: hlc.Timestamp{WallTime: ts * time.Nanosecond.Nanoseconds()}, + }, + Value: val, + }, + txn: txn, + } + } + // makeLiteralDistribution adapts dataItems for use with the data distribution + // infrastructure. + makeLiteralDataDistribution := func(items ...dataItem) dataDistribution { + return func() (engine.MVCCKeyValue, *roachpb.Transaction, bool) { + if len(items) == 0 { + return engine.MVCCKeyValue{}, nil, false + } + item := items[0] + defer func() { items = items[1:] }() + return item.MVCCKeyValue, item.txn, true + } + } + // stateExpectations are expectations about the state of the iterator. + type stateExpectations struct { + cur, next, afterNext int + isNewest bool + isIntent bool + isNotValue bool + } + // notation to mark that an iterator state element as either nil or metadata. + const ( + isNil = -1 + isMD = -2 + ) + // exp is a shorthand to construct state expectations. + exp := func(cur, next, afterNext int, isNewest, isIntent, isNotValue bool) stateExpectations { + return stateExpectations{ + cur: cur, next: next, afterNext: afterNext, + isNewest: isNewest, + isIntent: isIntent, + isNotValue: isNotValue, + } + } + vals := uniformValueDistribution(3, 5, 0, rand.New(rand.NewSource(1))) + tablePrefix := keys.MakeTablePrefix(42) + desc := roachpb.RangeDescriptor{StartKey: tablePrefix, EndKey: roachpb.RKey(roachpb.Key(tablePrefix).PrefixEnd())} + keyA := append(tablePrefix[0:len(tablePrefix):len(tablePrefix)], 'a') + keyB := append(tablePrefix[0:len(tablePrefix):len(tablePrefix)], 'b') + keyC := append(tablePrefix[0:len(tablePrefix):len(tablePrefix)], 'c') + makeTxn := func() *roachpb.Transaction { + txn := roachpb.Transaction{} + txn.Key = keyA + txn.ID = uuid.MakeV4() + txn.Status = roachpb.PENDING + return &txn + } + + type testCase struct { + name string + data []dataItem + expectations []stateExpectations + } + // checkExpectations tests whether the state of the iterator matches the + // expectation. + checkExpectations := func( + t *testing.T, data []dataItem, ex stateExpectations, s gcIteratorState, + ) { + check := func(ex int, kv *engine.MVCCKeyValue) { + switch { + case ex >= 0: + require.EqualValues(t, &data[ex].MVCCKeyValue, kv) + case ex == isNil: + require.Nil(t, kv) + case ex == isMD: + require.False(t, kv.Key.IsValue()) + } + } + check(ex.cur, s.cur) + check(ex.next, s.next) + check(ex.afterNext, s.afterNext) + require.Equal(t, ex.isNewest, s.curIsNewest()) + require.Equal(t, ex.isIntent, s.curIsIntent()) + } + makeTest := func(tc testCase) func(t *testing.T) { + return func(t *testing.T) { + eng := engine.NewDefaultInMem() + ds := makeLiteralDataDistribution(tc.data...) 
+ ds.setupTest(t, eng, desc) + snap := eng.NewSnapshot() + defer snap.Close() + it := makeGCIterator(&desc, snap) + expectations := tc.expectations + for i, ex := range expectations { + t.Run(fmt.Sprint(i), func(t *testing.T) { + s, ok := it.state() + require.True(t, ok) + checkExpectations(t, tc.data, ex, s) + }) + it.step() + } + } + } + di := makeDataItem // shorthand for convenient notation + for _, tc := range []testCase{ + { + name: "basic", + data: []dataItem{ + di(keyA, vals(), 2, nil), + di(keyA, vals(), 11, nil), + di(keyA, vals(), 14, nil), + di(keyB, vals(), 3, nil), + di(keyC, vals(), 7, makeTxn()), + }, + expectations: []stateExpectations{ + exp(4, isMD, isNil, false, true, false), + exp(isMD, isNil, isNil, false, false, true), + exp(3, isNil, isNil, true, false, false), + exp(0, 1, 2, false, false, false), + exp(1, 2, isNil, false, false, false), + exp(2, isNil, isNil, true, false, false), + }, + }, + } { + t.Run(tc.name, makeTest(tc)) + } +} diff --git a/pkg/storage/gc/gc_old_test.go b/pkg/storage/gc/gc_old_test.go new file mode 100644 index 000000000000..4f22542e47b9 --- /dev/null +++ b/pkg/storage/gc/gc_old_test.go @@ -0,0 +1,380 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package gc + +import ( + "context" + "sort" + "testing" + + "github.com/cockroachdb/cockroach/pkg/config/zonepb" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage/engine" + "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" + "github.com/cockroachdb/cockroach/pkg/storage/rditer" + "github.com/cockroachdb/cockroach/pkg/storage/storagebase" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/cockroachdb/errors" +) + +// runGCOld is an older implementation of Run. It is used for benchmarking and +// testing. +// +// runGCOld runs garbage collection for the specified descriptor on the +// provided Engine (which is not mutated). It uses the provided gcFn +// to run garbage collection once on all implicated spans, +// cleanupIntentsFn to resolve intents synchronously, and +// cleanupTxnIntentsAsyncFn to asynchronously cleanup intents and +// associated transaction record on success. +func runGCOld( + ctx context.Context, + desc *roachpb.RangeDescriptor, + snap engine.Reader, + now hlc.Timestamp, + policy zonepb.GCPolicy, + gcer GCer, + cleanupIntentsFn CleanupIntentsFunc, + cleanupTxnIntentsAsyncFn CleanupTxnIntentsAsyncFunc, +) (Info, error) { + + iter := rditer.NewReplicaDataIterator(desc, snap, + true /* replicatedOnly */, false /* seekEnd */) + defer iter.Close() + + // Compute intent expiration (intent age at which we attempt to resolve). 
+ intentExp := now.Add(-IntentAgeThreshold.Nanoseconds(), 0) + txnExp := now.Add(-storagebase.TxnCleanupThreshold.Nanoseconds(), 0) + + gc := MakeGarbageCollector(now, policy) + + if err := gcer.SetGCThreshold(ctx, Threshold{ + Key: gc.Threshold, + Txn: txnExp, + }); err != nil { + return Info{}, errors.Wrap(err, "failed to set GC thresholds") + } + + var batchGCKeys []roachpb.GCRequest_GCKey + var batchGCKeysBytes int64 + var expBaseKey roachpb.Key + var keys []engine.MVCCKey + var vals [][]byte + var keyBytes int64 + var valBytes int64 + info := Info{ + Policy: policy, + Now: now, + Threshold: gc.Threshold, + } + + // Maps from txn ID to txn and intent key slice. + txnMap := map[uuid.UUID]*roachpb.Transaction{} + intentSpanMap := map[uuid.UUID][]roachpb.Span{} + + // processKeysAndValues is invoked with each key and its set of + // values. Intents older than the intent age threshold are sent for + // resolution and values after the MVCC metadata, and possible + // intent, are sent for garbage collection. + processKeysAndValues := func() { + // If there's more than a single value for the key, possibly send for GC. + if len(keys) > 1 { + meta := &enginepb.MVCCMetadata{} + if err := protoutil.Unmarshal(vals[0], meta); err != nil { + log.Errorf(ctx, "unable to unmarshal MVCC metadata for key %q: %+v", keys[0], err) + } else { + // In the event that there's an active intent, send for + // intent resolution if older than the threshold. + startIdx := 1 + if meta.Txn != nil { + // Keep track of intent to resolve if older than the intent + // expiration threshold. + if hlc.Timestamp(meta.Timestamp).Less(intentExp) { + txnID := meta.Txn.ID + if _, ok := txnMap[txnID]; !ok { + txnMap[txnID] = &roachpb.Transaction{ + TxnMeta: *meta.Txn, + } + // IntentTxns and PushTxn will be equal here, since + // pushes to transactions whose record lies in this + // range (but which are not associated to a remaining + // intent on it) happen asynchronously and are accounted + // for separately. Thus higher up in the stack, we + // expect PushTxn > IntentTxns. + info.IntentTxns++ + // All transactions in txnMap may be PENDING and + // cleanupIntentsFn will push them to finalize them. + info.PushTxn++ + } + info.IntentsConsidered++ + intentSpanMap[txnID] = append(intentSpanMap[txnID], roachpb.Span{Key: expBaseKey}) + } + // With an active intent, GC ignores MVCC metadata & intent value. + startIdx = 2 + } + // See if any values may be GC'd. + if idx, gcTS := gc.Filter(keys[startIdx:], vals[startIdx:]); gcTS != (hlc.Timestamp{}) { + // Batch keys after the total size of version keys exceeds + // the threshold limit. This avoids sending potentially large + // GC requests through Raft. Iterate through the keys in reverse + // order so that GC requests can be made multiple times even on + // a single key, with successively newer timestamps to prevent + // any single request from exploding during GC evaluation. + for i := len(keys) - 1; i >= startIdx+idx; i-- { + keyBytes = int64(keys[i].EncodedSize()) + valBytes = int64(len(vals[i])) + + // Add the total size of the GC'able versions of the keys and values to Info. + info.AffectedVersionsKeyBytes += keyBytes + info.AffectedVersionsValBytes += valBytes + + batchGCKeysBytes += keyBytes + // If the current key brings the batch over the target + // size, add the current timestamp to finish the current + // chunk and start a new one. 
+					if batchGCKeysBytes >= KeyVersionChunkBytes {
+						batchGCKeys = append(batchGCKeys, roachpb.GCRequest_GCKey{Key: expBaseKey, Timestamp: keys[i].Timestamp})
+
+						err := gcer.GC(ctx, batchGCKeys)
+
+						// Succeed or fail, allow releasing the memory backing batchGCKeys.
+						iter.ResetAllocator()
+						batchGCKeys = nil
+						batchGCKeysBytes = 0
+
+						if err != nil {
+							// Even though we are batching the GC process, it's
+							// safe to continue because we bumped the GC
+							// thresholds. We may leave some inconsistent history
+							// behind, but nobody can read it.
+							log.Warning(ctx, err)
+							return
+						}
+					}
+				}
+				// Add the key to the batch at the GC timestamp, unless it was already added.
+				if batchGCKeysBytes != 0 {
+					batchGCKeys = append(batchGCKeys, roachpb.GCRequest_GCKey{Key: expBaseKey, Timestamp: gcTS})
+				}
+				info.NumKeysAffected++
+			}
+		}
+	}
+	}
+
+	// Iterate through the keys and values of this replica's range.
+	log.Event(ctx, "iterating through range")
+	for ; ; iter.Next() {
+		if ok, err := iter.Valid(); err != nil {
+			return Info{}, err
+		} else if !ok {
+			break
+		} else if ctx.Err() != nil {
+			// Stop iterating if our context has expired.
+			return Info{}, ctx.Err()
+		}
+		iterKey := iter.Key()
+		if !iterKey.IsValue() || !iterKey.Key.Equal(expBaseKey) {
+			// Moving to the next key (& values).
+			processKeysAndValues()
+			expBaseKey = iterKey.Key
+			if !iterKey.IsValue() {
+				keys = []engine.MVCCKey{iter.Key()}
+				vals = [][]byte{iter.Value()}
+				continue
+			}
+			// An implicit metadata.
+			keys = []engine.MVCCKey{engine.MakeMVCCMetadataKey(iterKey.Key)}
+			// A nil value for the encoded MVCCMetadata. This will unmarshal to an
+			// empty MVCCMetadata which is sufficient for processKeysAndValues to
+			// determine that there is no intent.
+			vals = [][]byte{nil}
+		}
+		keys = append(keys, iter.Key())
+		vals = append(vals, iter.Value())
+	}
+	// Handle last collected set of keys/vals.
+	processKeysAndValues()
+	if len(batchGCKeys) > 0 {
+		if err := gcer.GC(ctx, batchGCKeys); err != nil {
+			return Info{}, err
+		}
+	}
+
+	// From now on, all newly added keys are range-local.
+
+	// Process local range key entries (txn records, queue last processed times).
+	localRangeKeys, err := processLocalKeyRange(ctx, snap, desc, txnExp, &info, cleanupTxnIntentsAsyncFn)
+	if err != nil {
+		return Info{}, err
+	}
+
+	if err := gcer.GC(ctx, localRangeKeys); err != nil {
+		return Info{}, err
+	}
+
+	// Clean up the AbortSpan.
+	log.Event(ctx, "processing AbortSpan")
+	abortSpanKeys := processAbortSpan(ctx, snap, desc.RangeID, txnExp, &info)
+	if err := gcer.GC(ctx, abortSpanKeys); err != nil {
+		return Info{}, err
+	}
+
+	log.Eventf(ctx, "GC'ed keys; stats %+v", info)
+
+	// Push transactions (if pending) and resolve intents.
+	var intents []roachpb.Intent
+	for txnID, txn := range txnMap {
+		intents = append(intents, roachpb.AsIntents(intentSpanMap[txnID], txn)...)
+	}
+	info.ResolveTotal += len(intents)
+	log.Eventf(ctx, "cleanup of %d intents", len(intents))
+	if err := cleanupIntentsFn(ctx, intents); err != nil {
+		return Info{}, err
+	}
+
+	return info, nil
+}
+
+// GarbageCollector GCs MVCC key/values using a zone-specific GC policy
+// that allows either the union or intersection of maximum # of versions
+// and maximum age.
+type GarbageCollector struct {
+	Threshold hlc.Timestamp
+	policy    zonepb.GCPolicy
+}
+
+// MakeGarbageCollector allocates and returns a new GC, with expiration
+// computed based on current time and policy.TTLSeconds.
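+//
+// For example, with now = 100s and policy.TTLSeconds = 25, the resulting
+// Threshold is 75s: versions still needed to serve reads at or above 75s
+// are kept, while older versions become candidates for GC.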
+func MakeGarbageCollector(now hlc.Timestamp, policy zonepb.GCPolicy) GarbageCollector { + return GarbageCollector{ + Threshold: CalculateThreshold(now, policy), + policy: policy, + } +} + +// Filter makes decisions about garbage collection based on the +// garbage collection policy for batches of values for the same +// key. Returns the index of the first key to be GC'd and the +// timestamp including, and after which, all values should be garbage +// collected. If no values should be GC'd, returns -1 for the index +// and the zero timestamp. Keys must be in descending time +// order. Values deleted at or before the returned timestamp can be +// deleted without invalidating any reads in the time interval +// (gc.expiration, \infinity). +// +// The GC keeps all values (including deletes) above the expiration time, plus +// the first value before or at the expiration time. This allows reads to be +// guaranteed as described above. However if this were the only rule, then if +// the most recent write was a delete, it would never be removed. Thus, when a +// deleted value is the most recent before expiration, it can be deleted. This +// would still allow for the tombstone bugs in #6227, so in the future we will +// add checks that disallow writes before the last GC expiration time. +func (gc GarbageCollector) Filter(keys []engine.MVCCKey, values [][]byte) (int, hlc.Timestamp) { + if gc.policy.TTLSeconds <= 0 { + return -1, hlc.Timestamp{} + } + if len(keys) == 0 { + return -1, hlc.Timestamp{} + } + + // find the first expired key index using binary search + i := sort.Search(len(keys), func(i int) bool { return keys[i].Timestamp.LessEq(gc.Threshold) }) + + if i == len(keys) { + return -1, hlc.Timestamp{} + } + + // Now keys[i].Timestamp is <= gc.expiration, but the key-value pair is still + // "visible" at timestamp gc.expiration (and up to the next version). + if deleted := len(values[i]) == 0; deleted { + // We don't have to keep a delete visible (since GCing it does not change + // the outcome of the read). Note however that we can't touch deletes at + // higher timestamps immediately preceding this one, since they're above + // gc.expiration and are needed for correctness; see #6227. + return i, keys[i].Timestamp + } else if i+1 < len(keys) { + // Otherwise mark the previous timestamp for deletion (since it won't ever + // be returned for reads at gc.expiration and up). + return i + 1, keys[i+1].Timestamp + } + + return -1, hlc.Timestamp{} +} + +func mvccVersionKey(key roachpb.Key, ts hlc.Timestamp) engine.MVCCKey { + return engine.MVCCKey{Key: key, Timestamp: ts} +} + +var ( + aKey = roachpb.Key("a") + bKey = roachpb.Key("b") + aKeys = []engine.MVCCKey{ + mvccVersionKey(aKey, hlc.Timestamp{WallTime: 2e9, Logical: 0}), + mvccVersionKey(aKey, hlc.Timestamp{WallTime: 1e9, Logical: 1}), + mvccVersionKey(aKey, hlc.Timestamp{WallTime: 1e9, Logical: 0}), + } + bKeys = []engine.MVCCKey{ + mvccVersionKey(bKey, hlc.Timestamp{WallTime: 2e9, Logical: 0}), + mvccVersionKey(bKey, hlc.Timestamp{WallTime: 1e9, Logical: 0}), + } +) + +// TestGarbageCollectorFilter verifies the filter policies for +// different sorts of MVCC keys. 
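+// In the test table, n denotes a live value and d a deletion tombstone; each
+// key slice is ordered from newest to oldest, as Filter requires.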
+func TestGarbageCollectorFilter(t *testing.T) { + defer leaktest.AfterTest(t)() + gcA := MakeGarbageCollector(hlc.Timestamp{WallTime: 0, Logical: 0}, zonepb.GCPolicy{TTLSeconds: 1}) + gcB := MakeGarbageCollector(hlc.Timestamp{WallTime: 0, Logical: 0}, zonepb.GCPolicy{TTLSeconds: 2}) + n := []byte("data") + d := []byte(nil) + testData := []struct { + gc GarbageCollector + time hlc.Timestamp + keys []engine.MVCCKey + values [][]byte + expIdx int + expDelTS hlc.Timestamp + }{ + {gcA, hlc.Timestamp{WallTime: 0, Logical: 0}, aKeys, [][]byte{n, n, n}, -1, hlc.Timestamp{}}, + {gcA, hlc.Timestamp{WallTime: 0, Logical: 0}, aKeys, [][]byte{d, d, d}, -1, hlc.Timestamp{}}, + {gcB, hlc.Timestamp{WallTime: 0, Logical: 0}, bKeys, [][]byte{n, n}, -1, hlc.Timestamp{}}, + {gcB, hlc.Timestamp{WallTime: 0, Logical: 0}, bKeys, [][]byte{d, d}, -1, hlc.Timestamp{}}, + {gcA, hlc.Timestamp{WallTime: 1e9, Logical: 0}, aKeys, [][]byte{n, n, n}, -1, hlc.Timestamp{}}, + {gcB, hlc.Timestamp{WallTime: 1e9, Logical: 0}, bKeys, [][]byte{n, n}, -1, hlc.Timestamp{}}, + {gcA, hlc.Timestamp{WallTime: 2e9, Logical: 0}, aKeys, [][]byte{n, n, n}, -1, hlc.Timestamp{}}, + {gcA, hlc.Timestamp{WallTime: 2e9, Logical: 0}, aKeys, [][]byte{d, d, d}, 2, hlc.Timestamp{WallTime: 1e9, Logical: 0}}, + {gcB, hlc.Timestamp{WallTime: 2e9, Logical: 0}, bKeys, [][]byte{n, n}, -1, hlc.Timestamp{}}, + {gcA, hlc.Timestamp{WallTime: 3e9, Logical: 0}, aKeys, [][]byte{n, n, n}, 1, hlc.Timestamp{WallTime: 1e9, Logical: 1}}, + {gcA, hlc.Timestamp{WallTime: 3e9, Logical: 0}, aKeys, [][]byte{d, n, n}, 0, hlc.Timestamp{WallTime: 2e9, Logical: 0}}, + {gcB, hlc.Timestamp{WallTime: 3e9, Logical: 0}, bKeys, [][]byte{n, n}, -1, hlc.Timestamp{}}, + {gcA, hlc.Timestamp{WallTime: 4e9, Logical: 0}, aKeys, [][]byte{n, n, n}, 1, hlc.Timestamp{WallTime: 1e9, Logical: 1}}, + {gcB, hlc.Timestamp{WallTime: 4e9, Logical: 0}, bKeys, [][]byte{n, n}, 1, hlc.Timestamp{WallTime: 1e9, Logical: 0}}, + {gcB, hlc.Timestamp{WallTime: 4e9, Logical: 0}, bKeys, [][]byte{d, n}, 0, hlc.Timestamp{WallTime: 2e9, Logical: 0}}, + {gcA, hlc.Timestamp{WallTime: 5e9, Logical: 0}, aKeys, [][]byte{n, n, n}, 1, hlc.Timestamp{WallTime: 1e9, Logical: 1}}, + {gcB, hlc.Timestamp{WallTime: 5e9, Logical: 0}, bKeys, [][]byte{n, n}, 1, hlc.Timestamp{WallTime: 1e9, Logical: 0}}, + {gcB, hlc.Timestamp{WallTime: 5e9, Logical: 0}, bKeys, [][]byte{d, n}, 0, hlc.Timestamp{WallTime: 2e9, Logical: 0}}, + } + for i, test := range testData { + test.gc.Threshold = test.time + test.gc.Threshold.WallTime -= int64(test.gc.policy.TTLSeconds) * 1e9 + idx, delTS := test.gc.Filter(test.keys, test.values) + if idx != test.expIdx { + t.Errorf("%d: expected index %d; got %d", i, test.expIdx, idx) + } + if delTS != test.expDelTS { + t.Errorf("%d: expected deletion timestamp %s; got %s", i, test.expDelTS, delTS) + } + } +} diff --git a/pkg/storage/gc/gc_random_test.go b/pkg/storage/gc/gc_random_test.go new file mode 100644 index 000000000000..0f04da4e9fac --- /dev/null +++ b/pkg/storage/gc/gc_random_test.go @@ -0,0 +1,244 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+
+package gc
+
+import (
+	"context"
+	"fmt"
+	"math/rand"
+	"sort"
+	"testing"
+	"time"
+
+	"github.com/cockroachdb/cockroach/pkg/config/zonepb"
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/storage/engine"
+	"github.com/cockroachdb/cockroach/pkg/util/hlc"
+	"github.com/stretchr/testify/require"
+)
+
+// randomRunGCTestSpec specifies a distribution used to create random data for
+// testing Run.
+type randomRunGCTestSpec struct {
+	ds  distSpec
+	now hlc.Timestamp
+	ttl int32 // seconds
+}
+
+var (
+	fewVersionsTinyRows = uniformDistSpec{
+		tsFrom: 0, tsTo: 100,
+		keySuffixMin: 2, keySuffixMax: 3,
+		valueLenMin: 1, valueLenMax: 1,
+		deleteFrac:      0,
+		keysPerValueMin: 1, keysPerValueMax: 2,
+		intentFrac: .1,
+	}
+	someVersionsMidSizeRows = uniformDistSpec{
+		tsFrom: 0, tsTo: 100,
+		keySuffixMin: 8, keySuffixMax: 8,
+		valueLenMin: 8, valueLenMax: 16,
+		deleteFrac:      .1,
+		keysPerValueMin: 1, keysPerValueMax: 100,
+		intentFrac: .1,
+	}
+	lotsOfVersionsMidSizeRows = uniformDistSpec{
+		tsFrom: 0, tsTo: 100,
+		keySuffixMin: 8, keySuffixMax: 8,
+		valueLenMin: 8, valueLenMax: 16,
+		deleteFrac:      .1,
+		keysPerValueMin: 1000, keysPerValueMax: 1000000,
+		intentFrac: .1,
+	}
+)
+
+// TestRunNewVsOld exercises the behavior of Run relative to the old
+// implementation. It runs both the new and old implementations and ensures
+// that they produce exactly the same results on the same set of keys.
+func TestRunNewVsOld(t *testing.T) {
+	rng := rand.New(rand.NewSource(1))
+	ctx := context.Background()
+	const N = 100000
+
+	someVersionsMidSizeRowsLotsOfIntents := someVersionsMidSizeRows
+	someVersionsMidSizeRowsLotsOfIntents.intentFrac = 1
+	for _, tc := range []randomRunGCTestSpec{
+		{
+			ds: someVersionsMidSizeRowsLotsOfIntents,
+			now: hlc.Timestamp{
+				WallTime: (IntentAgeThreshold + 100*time.Second).Nanoseconds(),
+			},
+			ttl: int32(IntentAgeThreshold.Seconds()),
+		},
+		{
+			ds: someVersionsMidSizeRows,
+			now: hlc.Timestamp{
+				WallTime: 100 * time.Second.Nanoseconds(),
+			},
+			ttl: 1,
+		},
+	} {
+		t.Run(fmt.Sprintf("%v@%v,ttl=%v", tc.ds, tc.now, tc.ttl), func(t *testing.T) {
+			eng := engine.NewDefaultInMem()
+			defer eng.Close()
+			tc.ds.dist(N, rng).setupTest(t, eng, *tc.ds.desc())
+			snap := eng.NewSnapshot()
+			defer snap.Close()
+
+			oldGCer := makeFakeGCer()
+			gcInfoOld, err := runGCOld(ctx, tc.ds.desc(), snap, tc.now,
+				zonepb.GCPolicy{TTLSeconds: tc.ttl},
+				&oldGCer,
+				oldGCer.resolveIntents,
+				oldGCer.resolveIntentsAsync)
+			require.NoError(t, err)
+
+			newGCer := makeFakeGCer()
+			gcInfoNew, err := Run(ctx, tc.ds.desc(), snap, tc.now,
+				zonepb.GCPolicy{TTLSeconds: tc.ttl},
+				&newGCer,
+				newGCer.resolveIntents,
+				newGCer.resolveIntentsAsync)
+			require.NoError(t, err)
+
+			oldGCer.normalize()
+			newGCer.normalize()
+			require.EqualValues(t, gcInfoOld, gcInfoNew)
+			require.EqualValues(t, oldGCer, newGCer)
+		})
+	}
+}
+
+// BenchmarkRun benchmarks the old and new implementations of Run with
+// different data distributions.
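+//
+// It can be invoked with, for example:
+//
+//	go test ./pkg/storage/gc -run '^$' -bench BenchmarkRun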
+func BenchmarkRun(b *testing.B) { + rng := rand.New(rand.NewSource(1)) + ctx := context.Background() + runGC := func(eng engine.Engine, old bool, spec randomRunGCTestSpec) (Info, error) { + runGCFunc := Run + if old { + runGCFunc = runGCOld + } + snap := eng.NewSnapshot() + return runGCFunc(ctx, spec.ds.desc(), snap, spec.now, + zonepb.GCPolicy{TTLSeconds: spec.ttl}, + NoopGCer{}, + func(ctx context.Context, intents []roachpb.Intent) error { + return nil + }, + func(ctx context.Context, txn *roachpb.Transaction, intents []roachpb.Intent) error { + return nil + }) + } + makeTest := func(old bool, spec randomRunGCTestSpec) func(b *testing.B) { + return func(b *testing.B) { + eng := engine.NewDefaultInMem() + defer eng.Close() + ms := spec.ds.dist(b.N, rng).setupTest(b, eng, *spec.ds.desc()) + b.SetBytes(int64(float64(ms.Total()) / float64(b.N))) + b.ResetTimer() + _, err := runGC(eng, old, spec) + b.StopTimer() + require.NoError(b, err) + } + } + specsWithTTLs := func( + ds distSpec, now hlc.Timestamp, ttls []int32, + ) (specs []randomRunGCTestSpec) { + for _, ttl := range ttls { + specs = append(specs, randomRunGCTestSpec{ + ds: ds, + now: now, + ttl: ttl, + }) + } + return specs + } + ts100 := hlc.Timestamp{WallTime: (100 * time.Second).Nanoseconds()} + ttls := []int32{0, 25, 50, 75, 100} + specs := specsWithTTLs(fewVersionsTinyRows, ts100, ttls) + specs = append(specs, specsWithTTLs(someVersionsMidSizeRows, ts100, ttls)...) + specs = append(specs, specsWithTTLs(lotsOfVersionsMidSizeRows, ts100, ttls)...) + for _, old := range []bool{true, false} { + b.Run(fmt.Sprintf("old=%v", old), func(b *testing.B) { + for _, spec := range specs { + b.Run(fmt.Sprint(spec.ds), makeTest(old, spec)) + } + }) + } +} + +type fakeGCer struct { + gcKeys map[string]roachpb.GCRequest_GCKey + threshold Threshold + intents []roachpb.Intent + txnIntents []txnIntents +} + +func makeFakeGCer() fakeGCer { + return fakeGCer{ + gcKeys: make(map[string]roachpb.GCRequest_GCKey), + } +} + +var _ GCer = (*fakeGCer)(nil) + +func (f *fakeGCer) SetGCThreshold(ctx context.Context, t Threshold) error { + f.threshold = t + return nil +} + +func (f *fakeGCer) GC(ctx context.Context, keys []roachpb.GCRequest_GCKey) error { + for _, k := range keys { + f.gcKeys[k.Key.String()] = k + } + return nil +} + +func (f *fakeGCer) resolveIntentsAsync( + _ context.Context, txn *roachpb.Transaction, intents []roachpb.Intent, +) error { + f.txnIntents = append(f.txnIntents, txnIntents{txn: txn, intents: intents}) + return nil +} + +func (f *fakeGCer) resolveIntents(_ context.Context, intents []roachpb.Intent) error { + f.intents = append(f.intents, intents...) 
+ return nil +} + +func (f *fakeGCer) normalize() { + sortIntents := func(i, j int) bool { + return intentLess(&f.intents[i], &f.intents[j]) + } + sort.Slice(f.intents, sortIntents) + for i := range f.txnIntents { + sort.Slice(f.txnIntents[i].intents, sortIntents) + } + sort.Slice(f.txnIntents, func(i, j int) bool { + return f.txnIntents[i].txn.ID.String() < f.txnIntents[j].txn.ID.String() + }) +} + +func intentLess(a, b *roachpb.Intent) bool { + cmp := a.Key.Compare(b.Key) + switch { + case cmp < 0: + return true + case cmp > 0: + return false + default: + return a.Txn.ID.String() < b.Txn.ID.String() + } +} + +type txnIntents struct { + txn *roachpb.Transaction + intents []roachpb.Intent +} diff --git a/pkg/storage/gc/gc_test.go b/pkg/storage/gc/gc_test.go deleted file mode 100644 index b2c3268061e1..000000000000 --- a/pkg/storage/gc/gc_test.go +++ /dev/null @@ -1,87 +0,0 @@ -// Copyright 2014 The Cockroach Authors. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -package gc - -import ( - "testing" - - "github.com/cockroachdb/cockroach/pkg/config/zonepb" - "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/storage/engine" - "github.com/cockroachdb/cockroach/pkg/util/hlc" - "github.com/cockroachdb/cockroach/pkg/util/leaktest" -) - -func mvccVersionKey(key roachpb.Key, ts hlc.Timestamp) engine.MVCCKey { - return engine.MVCCKey{Key: key, Timestamp: ts} -} - -var ( - aKey = roachpb.Key("a") - bKey = roachpb.Key("b") - aKeys = []engine.MVCCKey{ - mvccVersionKey(aKey, hlc.Timestamp{WallTime: 2e9, Logical: 0}), - mvccVersionKey(aKey, hlc.Timestamp{WallTime: 1e9, Logical: 1}), - mvccVersionKey(aKey, hlc.Timestamp{WallTime: 1e9, Logical: 0}), - } - bKeys = []engine.MVCCKey{ - mvccVersionKey(bKey, hlc.Timestamp{WallTime: 2e9, Logical: 0}), - mvccVersionKey(bKey, hlc.Timestamp{WallTime: 1e9, Logical: 0}), - } -) - -// TestGarbageCollectorFilter verifies the filter policies for -// different sorts of MVCC keys. 
-func TestGarbageCollectorFilter(t *testing.T) { - defer leaktest.AfterTest(t)() - gcA := MakeGarbageCollector(hlc.Timestamp{WallTime: 0, Logical: 0}, zonepb.GCPolicy{TTLSeconds: 1}) - gcB := MakeGarbageCollector(hlc.Timestamp{WallTime: 0, Logical: 0}, zonepb.GCPolicy{TTLSeconds: 2}) - n := []byte("data") - d := []byte(nil) - testData := []struct { - gc GarbageCollector - time hlc.Timestamp - keys []engine.MVCCKey - values [][]byte - expIdx int - expDelTS hlc.Timestamp - }{ - {gcA, hlc.Timestamp{WallTime: 0, Logical: 0}, aKeys, [][]byte{n, n, n}, -1, hlc.Timestamp{}}, - {gcA, hlc.Timestamp{WallTime: 0, Logical: 0}, aKeys, [][]byte{d, d, d}, -1, hlc.Timestamp{}}, - {gcB, hlc.Timestamp{WallTime: 0, Logical: 0}, bKeys, [][]byte{n, n}, -1, hlc.Timestamp{}}, - {gcB, hlc.Timestamp{WallTime: 0, Logical: 0}, bKeys, [][]byte{d, d}, -1, hlc.Timestamp{}}, - {gcA, hlc.Timestamp{WallTime: 1e9, Logical: 0}, aKeys, [][]byte{n, n, n}, -1, hlc.Timestamp{}}, - {gcB, hlc.Timestamp{WallTime: 1e9, Logical: 0}, bKeys, [][]byte{n, n}, -1, hlc.Timestamp{}}, - {gcA, hlc.Timestamp{WallTime: 2e9, Logical: 0}, aKeys, [][]byte{n, n, n}, -1, hlc.Timestamp{}}, - {gcA, hlc.Timestamp{WallTime: 2e9, Logical: 0}, aKeys, [][]byte{d, d, d}, 2, hlc.Timestamp{WallTime: 1e9, Logical: 0}}, - {gcB, hlc.Timestamp{WallTime: 2e9, Logical: 0}, bKeys, [][]byte{n, n}, -1, hlc.Timestamp{}}, - {gcA, hlc.Timestamp{WallTime: 3e9, Logical: 0}, aKeys, [][]byte{n, n, n}, 1, hlc.Timestamp{WallTime: 1e9, Logical: 1}}, - {gcA, hlc.Timestamp{WallTime: 3e9, Logical: 0}, aKeys, [][]byte{d, n, n}, 0, hlc.Timestamp{WallTime: 2e9, Logical: 0}}, - {gcB, hlc.Timestamp{WallTime: 3e9, Logical: 0}, bKeys, [][]byte{n, n}, -1, hlc.Timestamp{}}, - {gcA, hlc.Timestamp{WallTime: 4e9, Logical: 0}, aKeys, [][]byte{n, n, n}, 1, hlc.Timestamp{WallTime: 1e9, Logical: 1}}, - {gcB, hlc.Timestamp{WallTime: 4e9, Logical: 0}, bKeys, [][]byte{n, n}, 1, hlc.Timestamp{WallTime: 1e9, Logical: 0}}, - {gcB, hlc.Timestamp{WallTime: 4e9, Logical: 0}, bKeys, [][]byte{d, n}, 0, hlc.Timestamp{WallTime: 2e9, Logical: 0}}, - {gcA, hlc.Timestamp{WallTime: 5e9, Logical: 0}, aKeys, [][]byte{n, n, n}, 1, hlc.Timestamp{WallTime: 1e9, Logical: 1}}, - {gcB, hlc.Timestamp{WallTime: 5e9, Logical: 0}, bKeys, [][]byte{n, n}, 1, hlc.Timestamp{WallTime: 1e9, Logical: 0}}, - {gcB, hlc.Timestamp{WallTime: 5e9, Logical: 0}, bKeys, [][]byte{d, n}, 0, hlc.Timestamp{WallTime: 2e9, Logical: 0}}, - } - for i, test := range testData { - test.gc.Threshold = test.time - test.gc.Threshold.WallTime -= int64(test.gc.policy.TTLSeconds) * 1e9 - idx, delTS := test.gc.Filter(test.keys, test.values) - if idx != test.expIdx { - t.Errorf("%d: expected index %d; got %d", i, test.expIdx, idx) - } - if delTS != test.expDelTS { - t.Errorf("%d: expected deletion timestamp %s; got %s", i, test.expDelTS, delTS) - } - } -} diff --git a/pkg/storage/gc/run_gc.go b/pkg/storage/gc/run_gc.go deleted file mode 100644 index 8cda6ee1b0e1..000000000000 --- a/pkg/storage/gc/run_gc.go +++ /dev/null @@ -1,464 +0,0 @@ -// Copyright 2020 The Cockroach Authors. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -// Package gc contains the logic to run scan a range for garbage and issue -// GC requests to remove that garbage. 
-// -// The Run function is the primary entrypoint and is called underneath the -// gcQueue in the storage package. It can also be run for debugging. -package gc - -import ( - "context" - "fmt" - "time" - - "github.com/cockroachdb/cockroach/pkg/base" - "github.com/cockroachdb/cockroach/pkg/config/zonepb" - "github.com/cockroachdb/cockroach/pkg/keys" - "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/storage/abortspan" - "github.com/cockroachdb/cockroach/pkg/storage/engine" - "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" - "github.com/cockroachdb/cockroach/pkg/storage/rditer" - "github.com/cockroachdb/cockroach/pkg/storage/storagebase" - "github.com/cockroachdb/cockroach/pkg/util/hlc" - "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/protoutil" - "github.com/cockroachdb/cockroach/pkg/util/uuid" - "github.com/cockroachdb/errors" -) - -const ( - // IntentAgeThreshold is the threshold after which an extant intent - // will be resolved. - IntentAgeThreshold = 2 * time.Hour // 2 hour - - // KeyVersionChunkBytes is the threshold size for splitting - // GCRequests into multiple batches. - KeyVersionChunkBytes = base.ChunkRaftCommandThresholdBytes -) - -// A GCer is an abstraction used by the GC queue to carry out chunked deletions. -type GCer interface { - SetGCThreshold(context.Context, GCThreshold) error - GC(context.Context, []roachpb.GCRequest_GCKey) error -} - -// NoopGCer implements GCer by doing nothing. -type NoopGCer struct{} - -var _ GCer = NoopGCer{} - -// SetGCThreshold implements storage.GCer. -func (NoopGCer) SetGCThreshold(context.Context, GCThreshold) error { return nil } - -// GC implements storage.GCer. -func (NoopGCer) GC(context.Context, []roachpb.GCRequest_GCKey) error { return nil } - -// GCThreshold holds the key and txn span GC thresholds, respectively. -type GCThreshold struct { - Key hlc.Timestamp - Txn hlc.Timestamp -} - -// GCInfo contains statistics and insights from a GC run. -type GCInfo struct { - // Now is the timestamp used for age computations. - Now hlc.Timestamp - // Policy is the policy used for this garbage collection cycle. - Policy zonepb.GCPolicy - // Stats about the userspace key-values considered, namely the number of - // keys with GC'able data, the number of "old" intents and the number of - // associated distinct transactions. - NumKeysAffected, IntentsConsidered, IntentTxns int - // TransactionSpanTotal is the total number of entries in the transaction span. - TransactionSpanTotal int - // Summary of transactions which were found GCable (assuming that - // potentially necessary intent resolutions did not fail). - TransactionSpanGCAborted, TransactionSpanGCCommitted int - TransactionSpanGCStaging, TransactionSpanGCPending int - // AbortSpanTotal is the total number of transactions present in the AbortSpan. - AbortSpanTotal int - // AbortSpanConsidered is the number of AbortSpan entries old enough to be - // considered for removal. An "entry" corresponds to one transaction; - // more than one key-value pair may be associated with it. - AbortSpanConsidered int - // AbortSpanGCNum is the number of AbortSpan entries fit for removal (due - // to their transactions having terminated). - AbortSpanGCNum int - // PushTxn is the total number of pushes attempted in this cycle. - PushTxn int - // ResolveTotal is the total number of attempted intent resolutions in - // this cycle. - ResolveTotal int - // Threshold is the computed expiration timestamp. 
Equal to `Now - Policy`. - Threshold hlc.Timestamp - // AffectedVersionsKeyBytes is the number of (fully encoded) bytes deleted from keys in the storage engine. - // Note that this does not account for compression that the storage engine uses to store data on disk. Real - // space savings tends to be smaller due to this compression, and space may be released only at a later point - // in time. - AffectedVersionsKeyBytes int64 - // AffectedVersionsValBytes is the number of (fully encoded) bytes deleted from values in the storage engine. - // See AffectedVersionsKeyBytes for caveats. - AffectedVersionsValBytes int64 -} - -// A CleanupIntentsFunc synchronously resolves the supplied intents -// (which may be PENDING, in which case they are first pushed) while -// taking care of proper batching. -type CleanupIntentsFunc func(context.Context, []roachpb.Intent) error - -// A cleanupTxnIntentsFunc asynchronously cleans up intents from a -// transaction record, pushing the transaction first if it is -// PENDING. Once all intents are resolved successfully, removes the -// transaction record. -type CleanupTxnIntentsAsyncFunc func(context.Context, *roachpb.Transaction, []roachpb.Intent) error - -// RunGC runs garbage collection for the specified descriptor on the -// provided Engine (which is not mutated). It uses the provided gcFn -// to run garbage collection once on all implicated spans, -// cleanupIntentsFn to resolve intents synchronously, and -// cleanupTxnIntentsAsyncFn to asynchronously cleanup intents and -// associated transaction record on success. -func RunGC( - ctx context.Context, - desc *roachpb.RangeDescriptor, - snap engine.Reader, - now hlc.Timestamp, - policy zonepb.GCPolicy, - gcer GCer, - cleanupIntentsFn CleanupIntentsFunc, - cleanupTxnIntentsAsyncFn CleanupTxnIntentsAsyncFunc, -) (GCInfo, error) { - - iter := rditer.NewReplicaDataIterator(desc, snap, - true /* replicatedOnly */, false /* seekEnd */) - defer iter.Close() - - // Compute intent expiration (intent age at which we attempt to resolve). - intentExp := now.Add(-IntentAgeThreshold.Nanoseconds(), 0) - txnExp := now.Add(-storagebase.TxnCleanupThreshold.Nanoseconds(), 0) - - gc := MakeGarbageCollector(now, policy) - - if err := gcer.SetGCThreshold(ctx, GCThreshold{ - Key: gc.Threshold, - Txn: txnExp, - }); err != nil { - return GCInfo{}, errors.Wrap(err, "failed to set GC thresholds") - } - - var batchGCKeys []roachpb.GCRequest_GCKey - var batchGCKeysBytes int64 - var expBaseKey roachpb.Key - var keys []engine.MVCCKey - var vals [][]byte - var keyBytes int64 - var valBytes int64 - info := GCInfo{ - Policy: policy, - Now: now, - Threshold: gc.Threshold, - } - - // Maps from txn ID to txn and intent key slice. - txnMap := map[uuid.UUID]*roachpb.Transaction{} - intentSpanMap := map[uuid.UUID][]roachpb.Span{} - - // processKeysAndValues is invoked with each key and its set of - // values. Intents older than the intent age threshold are sent for - // resolution and values after the MVCC metadata, and possible - // intent, are sent for garbage collection. - processKeysAndValues := func() { - // If there's more than a single value for the key, possibly send for GC. - if len(keys) > 1 { - meta := &enginepb.MVCCMetadata{} - if err := protoutil.Unmarshal(vals[0], meta); err != nil { - log.Errorf(ctx, "unable to unmarshal MVCC metadata for key %q: %+v", keys[0], err) - } else { - // In the event that there's an active intent, send for - // intent resolution if older than the threshold. 
- startIdx := 1 - if meta.Txn != nil { - // Keep track of intent to resolve if older than the intent - // expiration threshold. - if hlc.Timestamp(meta.Timestamp).Less(intentExp) { - txnID := meta.Txn.ID - if _, ok := txnMap[txnID]; !ok { - txnMap[txnID] = &roachpb.Transaction{ - TxnMeta: *meta.Txn, - } - // IntentTxns and PushTxn will be equal here, since - // pushes to transactions whose record lies in this - // range (but which are not associated to a remaining - // intent on it) happen asynchronously and are accounted - // for separately. Thus higher up in the stack, we - // expect PushTxn > IntentTxns. - info.IntentTxns++ - // All transactions in txnMap may be PENDING and - // cleanupIntentsFn will push them to finalize them. - info.PushTxn++ - } - info.IntentsConsidered++ - intentSpanMap[txnID] = append(intentSpanMap[txnID], roachpb.Span{Key: expBaseKey}) - } - // With an active intent, GC ignores MVCC metadata & intent value. - startIdx = 2 - } - // See if any values may be GC'd. - if idx, gcTS := gc.Filter(keys[startIdx:], vals[startIdx:]); gcTS != (hlc.Timestamp{}) { - // Batch keys after the total size of version keys exceeds - // the threshold limit. This avoids sending potentially large - // GC requests through Raft. Iterate through the keys in reverse - // order so that GC requests can be made multiple times even on - // a single key, with successively newer timestamps to prevent - // any single request from exploding during GC evaluation. - for i := len(keys) - 1; i >= startIdx+idx; i-- { - keyBytes = int64(keys[i].EncodedSize()) - valBytes = int64(len(vals[i])) - - // Add the total size of the GC'able versions of the keys and values to GCInfo. - info.AffectedVersionsKeyBytes += keyBytes - info.AffectedVersionsValBytes += valBytes - - batchGCKeysBytes += keyBytes - // If the current key brings the batch over the target - // size, add the current timestamp to finish the current - // chunk and start a new one. - if batchGCKeysBytes >= KeyVersionChunkBytes { - batchGCKeys = append(batchGCKeys, roachpb.GCRequest_GCKey{Key: expBaseKey, Timestamp: keys[i].Timestamp}) - - err := gcer.GC(ctx, batchGCKeys) - - // Succeed or fail, allow releasing the memory backing batchGCKeys. - iter.ResetAllocator() - batchGCKeys = nil - batchGCKeysBytes = 0 - - if err != nil { - // Even though we are batching the GC process, it's - // safe to continue because we bumped the GC - // thresholds. We may leave some inconsistent history - // behind, but nobody can read it. - log.Warning(ctx, err) - return - } - } - } - // Add the key to the batch at the GC timestamp, unless it was already added. - if batchGCKeysBytes != 0 { - batchGCKeys = append(batchGCKeys, roachpb.GCRequest_GCKey{Key: expBaseKey, Timestamp: gcTS}) - } - info.NumKeysAffected++ - } - } - } - } - - // Iterate through the keys and values of this replica's range. - log.Event(ctx, "iterating through range") - for ; ; iter.Next() { - if ok, err := iter.Valid(); err != nil { - return GCInfo{}, err - } else if !ok { - break - } else if ctx.Err() != nil { - // Stop iterating if our context has expired. - return GCInfo{}, err - } - iterKey := iter.Key() - if !iterKey.IsValue() || !iterKey.Key.Equal(expBaseKey) { - // Moving to the next key (& values). - processKeysAndValues() - expBaseKey = iterKey.Key - if !iterKey.IsValue() { - keys = []engine.MVCCKey{iter.Key()} - vals = [][]byte{iter.Value()} - continue - } - // An implicit metadata. 
- keys = []engine.MVCCKey{engine.MakeMVCCMetadataKey(iterKey.Key)} - // A nil value for the encoded MVCCMetadata. This will unmarshal to an - // empty MVCCMetadata which is sufficient for processKeysAndValues to - // determine that there is no intent. - vals = [][]byte{nil} - } - keys = append(keys, iter.Key()) - vals = append(vals, iter.Value()) - } - // Handle last collected set of keys/vals. - processKeysAndValues() - if len(batchGCKeys) > 0 { - if err := gcer.GC(ctx, batchGCKeys); err != nil { - return GCInfo{}, err - } - } - - // From now on, all newly added keys are range-local. - - // Process local range key entries (txn records, queue last processed times). - localRangeKeys, err := processLocalKeyRange(ctx, snap, desc, txnExp, &info, cleanupTxnIntentsAsyncFn) - if err != nil { - return GCInfo{}, err - } - - if err := gcer.GC(ctx, localRangeKeys); err != nil { - return GCInfo{}, err - } - - // Clean up the AbortSpan. - log.Event(ctx, "processing AbortSpan") - abortSpanKeys := processAbortSpan(ctx, snap, desc.RangeID, txnExp, &info) - if err := gcer.GC(ctx, abortSpanKeys); err != nil { - return GCInfo{}, err - } - - log.Eventf(ctx, "GC'ed keys; stats %+v", info) - - // Push transactions (if pending) and resolve intents. - var intents []roachpb.Intent - for txnID, txn := range txnMap { - intents = append(intents, roachpb.AsIntents(intentSpanMap[txnID], txn)...) - } - info.ResolveTotal += len(intents) - log.Eventf(ctx, "cleanup of %d intents", len(intents)) - if err := cleanupIntentsFn(ctx, intents); err != nil { - return GCInfo{}, err - } - - return info, nil -} - -// processLocalKeyRange scans the local range key entries, consisting of -// transaction records, queue last processed timestamps, and range descriptors. -// -// - Transaction entries: -// - For expired transactions , schedule the intents for -// asynchronous resolution. The actual transaction spans are not -// returned for GC in this pass, but are separately GC'ed after -// successful resolution of all intents. The exception is if there -// are no intents on the txn record, in which case it's returned for -// immediate GC. -// -// - Queue last processed times: cleanup any entries which don't match -// this range's start key. This can happen on range merges. -func processLocalKeyRange( - ctx context.Context, - snap engine.Reader, - desc *roachpb.RangeDescriptor, - cutoff hlc.Timestamp, - info *GCInfo, - cleanupTxnIntentsAsyncFn CleanupTxnIntentsAsyncFunc, -) ([]roachpb.GCRequest_GCKey, error) { - var gcKeys []roachpb.GCRequest_GCKey - - handleTxnIntents := func(key roachpb.Key, txn *roachpb.Transaction) error { - // If the transaction needs to be pushed or there are intents to - // resolve, invoke the cleanup function. - if !txn.Status.IsFinalized() || len(txn.IntentSpans) > 0 { - return cleanupTxnIntentsAsyncFn(ctx, txn, roachpb.AsIntents(txn.IntentSpans, txn)) - } - gcKeys = append(gcKeys, roachpb.GCRequest_GCKey{Key: key}) // zero timestamp - return nil - } - - handleOneTransaction := func(kv roachpb.KeyValue) error { - var txn roachpb.Transaction - if err := kv.Value.GetProto(&txn); err != nil { - return err - } - info.TransactionSpanTotal++ - if cutoff.LessEq(txn.LastActive()) { - return nil - } - - // The transaction record should be considered for removal. 
- switch txn.Status { - case roachpb.PENDING: - info.TransactionSpanGCPending++ - case roachpb.STAGING: - info.TransactionSpanGCStaging++ - case roachpb.ABORTED: - info.TransactionSpanGCAborted++ - case roachpb.COMMITTED: - info.TransactionSpanGCCommitted++ - default: - panic(fmt.Sprintf("invalid transaction state: %s", txn)) - } - return handleTxnIntents(kv.Key, &txn) - } - - handleOneQueueLastProcessed := func(kv roachpb.KeyValue, rangeKey roachpb.RKey) error { - if !rangeKey.Equal(desc.StartKey) { - // Garbage collect the last processed timestamp if it doesn't match start key. - gcKeys = append(gcKeys, roachpb.GCRequest_GCKey{Key: kv.Key}) // zero timestamp - } - return nil - } - - handleOne := func(kv roachpb.KeyValue) error { - rangeKey, suffix, _, err := keys.DecodeRangeKey(kv.Key) - if err != nil { - return err - } - if suffix.Equal(keys.LocalTransactionSuffix.AsRawKey()) { - if err := handleOneTransaction(kv); err != nil { - return err - } - } else if suffix.Equal(keys.LocalQueueLastProcessedSuffix.AsRawKey()) { - if err := handleOneQueueLastProcessed(kv, roachpb.RKey(rangeKey)); err != nil { - return err - } - } - return nil - } - - startKey := keys.MakeRangeKeyPrefix(desc.StartKey) - endKey := keys.MakeRangeKeyPrefix(desc.EndKey) - - _, err := engine.MVCCIterate(ctx, snap, startKey, endKey, hlc.Timestamp{}, engine.MVCCScanOptions{}, - func(kv roachpb.KeyValue) (bool, error) { - return false, handleOne(kv) - }) - return gcKeys, err -} - -// processAbortSpan iterates through the local AbortSpan entries -// and collects entries which indicate that a client which was running -// this transaction must have realized that it has been aborted (due to -// heartbeating having failed). The parameter minAge is typically a -// multiple of the heartbeat timeout used by the coordinator. -// -// TODO(tschottdorf): this could be done in Replica.GC itself, but it's -// handy to have it here for stats (though less performant due to sending -// all of the keys over the wire). -func processAbortSpan( - ctx context.Context, - snap engine.Reader, - rangeID roachpb.RangeID, - threshold hlc.Timestamp, - info *GCInfo, -) []roachpb.GCRequest_GCKey { - var gcKeys []roachpb.GCRequest_GCKey - abortSpan := abortspan.New(rangeID) - if err := abortSpan.Iterate(ctx, snap, func(key roachpb.Key, v roachpb.AbortSpanEntry) error { - info.AbortSpanTotal++ - if v.Timestamp.Less(threshold) { - info.AbortSpanGCNum++ - gcKeys = append(gcKeys, roachpb.GCRequest_GCKey{Key: key}) - } - return nil - }); err != nil { - // Still return whatever we managed to collect. - log.Warning(ctx, err) - } - return gcKeys -} diff --git a/pkg/storage/gc_queue.go b/pkg/storage/gc_queue.go index 915acb458b25..3c02a63510b4 100644 --- a/pkg/storage/gc_queue.go +++ b/pkg/storage/gc_queue.go @@ -347,7 +347,7 @@ func (r *replicaGCer) send(ctx context.Context, req roachpb.GCRequest) error { return nil } -func (r *replicaGCer) SetGCThreshold(ctx context.Context, thresh gc.GCThreshold) error { +func (r *replicaGCer) SetGCThreshold(ctx context.Context, thresh gc.Threshold) error { req := r.template() req.Threshold = thresh.Key return r.send(ctx, req) @@ -406,14 +406,14 @@ func (gcq *gcQueue) process(ctx context.Context, repl *Replica, sysCfg *config.S // Synchronize the new GC threshold decision with concurrent // AdminVerifyProtectedTimestamp requests. 
if err := repl.markPendingGC(gcTimestamp, - gc.MakeGarbageCollector(gcTimestamp, *zone.GC).Threshold); err != nil { + gc.CalculateThreshold(gcTimestamp, *zone.GC)); err != nil { log.VEventf(ctx, 1, "not gc'ing replica %v due to pending protection: %v", repl, err) return nil } snap := repl.store.Engine().NewSnapshot() defer snap.Close() - info, err := gc.RunGC(ctx, desc, snap, gcTimestamp, *zone.GC, &replicaGCer{repl: repl}, + info, err := gc.Run(ctx, desc, snap, gcTimestamp, *zone.GC, &replicaGCer{repl: repl}, func(ctx context.Context, intents []roachpb.Intent) error { intentCount, err := repl.store.intentResolver. CleanupIntents(ctx, intents, gcTimestamp, roachpb.PUSH_ABORT) @@ -449,7 +449,7 @@ func (gcq *gcQueue) process(ctx context.Context, repl *Replica, sysCfg *config.S return nil } -func updateStoreMetricsWithGCInfo(metrics *StoreMetrics, info gc.GCInfo) { +func updateStoreMetricsWithGCInfo(metrics *StoreMetrics, info gc.Info) { metrics.GCNumKeysAffected.Inc(int64(info.NumKeysAffected)) metrics.GCIntentsConsidered.Inc(int64(info.IntentsConsidered)) metrics.GCIntentTxns.Inc(int64(info.IntentTxns)) diff --git a/pkg/storage/gc_queue_test.go b/pkg/storage/gc_queue_test.go index 74ef487e4ee3..68f01d9f0fbf 100644 --- a/pkg/storage/gc_queue_test.go +++ b/pkg/storage/gc_queue_test.go @@ -69,7 +69,7 @@ func TestGCQueueScoreString(t *testing.T) { }, `queue=true with 4.31/fuzz(1.25)=3.45=valScaleScore(4.00)*deadFrac(0.25)+intentScore(0.45) likely last GC: 5s ago, 3.0 KiB non-live, curr. age 512 KiB*s, min exp. reduction: 256 KiB*s`}, - // Check case of empty GCThreshold. + // Check case of empty Threshold. {gcQueueScore{ShouldQueue: true}, `queue=true with 0.00/fuzz(0.00)=NaN=valScaleScore(0.00)*deadFrac(0.00)+intentScore(0.00) likely last GC: never, 0 B non-live, curr. age 0 B*s, min exp. reduction: 0 B*s`}, } { @@ -482,7 +482,7 @@ func TestGCQueueProcess(t *testing.T) { t.Fatal("config not set") } - // The total size of the GC'able versions of the keys and values in GCInfo. + // The total size of the GC'able versions of the keys and values in Info. // Key size: len("a") + MVCCVersionTimestampSize (13 bytes) = 14 bytes. // Value size: len("value") + headerSize (5 bytes) = 10 bytes. // key1 at ts1 (14 bytes) => "value" (10 bytes) @@ -495,8 +495,8 @@ func TestGCQueueProcess(t *testing.T) { var expectedVersionsKeyBytes int64 = 7 * 14 var expectedVersionsValBytes int64 = 5 * 10 - // Call RunGC with dummy functions to get current GCInfo. - gcInfo, err := func() (gc.GCInfo, error) { + // Call Run with dummy functions to get current Info. + gcInfo, err := func() (gc.Info, error) { snap := tc.repl.store.Engine().NewSnapshot() desc := tc.repl.Desc() defer snap.Close() @@ -507,7 +507,7 @@ func TestGCQueueProcess(t *testing.T) { } now := tc.Clock().Now() - return gc.RunGC(ctx, desc, snap, now, *zone.GC, + return gc.Run(ctx, desc, snap, now, *zone.GC, gc.NoopGCer{}, func(ctx context.Context, intents []roachpb.Intent) error { return nil diff --git a/pkg/storage/rditer/replica_data_iter.go b/pkg/storage/rditer/replica_data_iter.go index 329dcf494146..3cab0bfa9a19 100644 --- a/pkg/storage/rditer/replica_data_iter.go +++ b/pkg/storage/rditer/replica_data_iter.go @@ -217,6 +217,18 @@ func (ri *ReplicaDataIterator) Value() []byte { return value } +// UnsafeKey returns the same value as Key, but the memory is invalidated on +// the next call to {Next,Prev,Close}. 
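+// Callers that need the key to remain valid across iterator movement must
+// copy it first (or use Key instead).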
+func (ri *ReplicaDataIterator) UnsafeKey() engine.MVCCKey { + return ri.it.UnsafeKey() +} + +// UnsafeValue returns the same value as Value, but the memory is invalidated on +// the next call to {Next,Prev,Close}. +func (ri *ReplicaDataIterator) UnsafeValue() []byte { + return ri.it.UnsafeValue() +} + // ResetAllocator resets the ReplicaDataIterator's internal byte allocator. func (ri *ReplicaDataIterator) ResetAllocator() { ri.a = nil