diff --git a/pkg/cli/debug.go b/pkg/cli/debug.go index 6a28df1df2a0..0fe1b8c9ecb8 100644 --- a/pkg/cli/debug.go +++ b/pkg/cli/debug.go @@ -40,6 +40,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/engine" "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" + "github.com/cockroachdb/cockroach/pkg/storage/gc" "github.com/cockroachdb/cockroach/pkg/storage/rditer" "github.com/cockroachdb/cockroach/pkg/storage/stateloader" "github.com/cockroachdb/cockroach/pkg/storage/storagepb" @@ -286,7 +287,7 @@ func runDebugRangeData(cmd *cobra.Command, args []string) error { return err } - iter := rditer.NewReplicaDataIterator(&desc, db, debugCtx.replicated) + iter := rditer.NewReplicaDataIterator(&desc, db, debugCtx.replicated, false /* seekEnd */) defer iter.Close() for ; ; iter.Next() { if ok, err := iter.Valid(); err != nil { @@ -555,13 +556,13 @@ func runDebugGCCmd(cmd *cobra.Command, args []string) error { for _, desc := range descs { snap := db.NewSnapshot() defer snap.Close() - info, err := storage.RunGC( + info, err := gc.Run( context.Background(), &desc, snap, hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}, zonepb.GCPolicy{TTLSeconds: int32(gcTTLInSeconds)}, - storage.NoopGCer{}, + gc.NoopGCer{}, func(_ context.Context, _ []roachpb.Intent) error { return nil }, func(_ context.Context, _ *roachpb.Transaction, _ []roachpb.Intent) error { return nil }, ) diff --git a/pkg/storage/batch_spanset_test.go b/pkg/storage/batch_spanset_test.go index be1f83b378fe..4897bb4382ab 100644 --- a/pkg/storage/batch_spanset_test.go +++ b/pkg/storage/batch_spanset_test.go @@ -184,11 +184,19 @@ func TestSpanSetBatchBoundaries(t *testing.T) { } else if err != nil { t.Errorf("unexpected error on iterator: %+v", err) } + // Seeking back in bounds restores validity. - iter.SeekLT(insideKey) + iter.SeekLT(insideKey2) if ok, err := iter.Valid(); !ok { t.Fatalf("expected valid iterator, err=%v", err) } + // SeekLT to the lower bound is invalid. + iter.SeekLT(insideKey) + if ok, err := iter.Valid(); ok { + t.Fatalf("expected invalid iterator; found valid at key %s", iter.Key()) + } else if !isReadSpanErr(err) { + t.Fatalf("SeekLT: unexpected error %v", err) + } } func TestSpanSetBatchTimestamps(t *testing.T) { diff --git a/pkg/storage/engine/gc.go b/pkg/storage/engine/gc.go deleted file mode 100644 index b412f9d3efa0..000000000000 --- a/pkg/storage/engine/gc.go +++ /dev/null @@ -1,85 +0,0 @@ -// Copyright 2014 The Cockroach Authors. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -package engine - -import ( - "sort" - - "github.com/cockroachdb/cockroach/pkg/config/zonepb" - "github.com/cockroachdb/cockroach/pkg/util/hlc" -) - -// GarbageCollector GCs MVCC key/values using a zone-specific GC -// policy allows either the union or intersection of maximum # of -// versions and maximum age. -type GarbageCollector struct { - Threshold hlc.Timestamp - policy zonepb.GCPolicy -} - -// MakeGarbageCollector allocates and returns a new GC, with expiration -// computed based on current time and policy.TTLSeconds. -func MakeGarbageCollector(now hlc.Timestamp, policy zonepb.GCPolicy) GarbageCollector { - ttlNanos := int64(policy.TTLSeconds) * 1e9 - return GarbageCollector{ - Threshold: hlc.Timestamp{WallTime: now.WallTime - ttlNanos}, - policy: policy, - } -} - -// Filter makes decisions about garbage collection based on the -// garbage collection policy for batches of values for the same -// key. Returns the index of the first key to be GC'd and the -// timestamp including, and after which, all values should be garbage -// collected. If no values should be GC'd, returns -1 for the index -// and the zero timestamp. Keys must be in descending time -// order. Values deleted at or before the returned timestamp can be -// deleted without invalidating any reads in the time interval -// (gc.expiration, \infinity). -// -// The GC keeps all values (including deletes) above the expiration time, plus -// the first value before or at the expiration time. This allows reads to be -// guaranteed as described above. However if this were the only rule, then if -// the most recent write was a delete, it would never be removed. Thus, when a -// deleted value is the most recent before expiration, it can be deleted. This -// would still allow for the tombstone bugs in #6227, so in the future we will -// add checks that disallow writes before the last GC expiration time. -func (gc GarbageCollector) Filter(keys []MVCCKey, values [][]byte) (int, hlc.Timestamp) { - if gc.policy.TTLSeconds <= 0 { - return -1, hlc.Timestamp{} - } - if len(keys) == 0 { - return -1, hlc.Timestamp{} - } - - // find the first expired key index using binary search - i := sort.Search(len(keys), func(i int) bool { return keys[i].Timestamp.LessEq(gc.Threshold) }) - - if i == len(keys) { - return -1, hlc.Timestamp{} - } - - // Now keys[i].Timestamp is <= gc.expiration, but the key-value pair is still - // "visible" at timestamp gc.expiration (and up to the next version). - if deleted := len(values[i]) == 0; deleted { - // We don't have to keep a delete visible (since GCing it does not change - // the outcome of the read). Note however that we can't touch deletes at - // higher timestamps immediately preceding this one, since they're above - // gc.expiration and are needed for correctness; see #6227. - return i, keys[i].Timestamp - } else if i+1 < len(keys) { - // Otherwise mark the previous timestamp for deletion (since it won't ever - // be returned for reads at gc.expiration and up). - return i + 1, keys[i+1].Timestamp - } - - return -1, hlc.Timestamp{} -} diff --git a/pkg/storage/engine/gc_test.go b/pkg/storage/engine/gc_test.go deleted file mode 100644 index 0fcc7572fad4..000000000000 --- a/pkg/storage/engine/gc_test.go +++ /dev/null @@ -1,86 +0,0 @@ -// Copyright 2014 The Cockroach Authors. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -package engine - -import ( - "testing" - - "github.com/cockroachdb/cockroach/pkg/config/zonepb" - "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/util/hlc" - "github.com/cockroachdb/cockroach/pkg/util/leaktest" -) - -func mvccVersionKey(key roachpb.Key, ts hlc.Timestamp) MVCCKey { - return MVCCKey{Key: key, Timestamp: ts} -} - -var ( - aKey = roachpb.Key("a") - bKey = roachpb.Key("b") - aKeys = []MVCCKey{ - mvccVersionKey(aKey, hlc.Timestamp{WallTime: 2e9, Logical: 0}), - mvccVersionKey(aKey, hlc.Timestamp{WallTime: 1e9, Logical: 1}), - mvccVersionKey(aKey, hlc.Timestamp{WallTime: 1e9, Logical: 0}), - } - bKeys = []MVCCKey{ - mvccVersionKey(bKey, hlc.Timestamp{WallTime: 2e9, Logical: 0}), - mvccVersionKey(bKey, hlc.Timestamp{WallTime: 1e9, Logical: 0}), - } -) - -// TestGarbageCollectorFilter verifies the filter policies for -// different sorts of MVCC keys. -func TestGarbageCollectorFilter(t *testing.T) { - defer leaktest.AfterTest(t)() - gcA := MakeGarbageCollector(hlc.Timestamp{WallTime: 0, Logical: 0}, zonepb.GCPolicy{TTLSeconds: 1}) - gcB := MakeGarbageCollector(hlc.Timestamp{WallTime: 0, Logical: 0}, zonepb.GCPolicy{TTLSeconds: 2}) - n := []byte("data") - d := []byte(nil) - testData := []struct { - gc GarbageCollector - time hlc.Timestamp - keys []MVCCKey - values [][]byte - expIdx int - expDelTS hlc.Timestamp - }{ - {gcA, hlc.Timestamp{WallTime: 0, Logical: 0}, aKeys, [][]byte{n, n, n}, -1, hlc.Timestamp{}}, - {gcA, hlc.Timestamp{WallTime: 0, Logical: 0}, aKeys, [][]byte{d, d, d}, -1, hlc.Timestamp{}}, - {gcB, hlc.Timestamp{WallTime: 0, Logical: 0}, bKeys, [][]byte{n, n}, -1, hlc.Timestamp{}}, - {gcB, hlc.Timestamp{WallTime: 0, Logical: 0}, bKeys, [][]byte{d, d}, -1, hlc.Timestamp{}}, - {gcA, hlc.Timestamp{WallTime: 1e9, Logical: 0}, aKeys, [][]byte{n, n, n}, -1, hlc.Timestamp{}}, - {gcB, hlc.Timestamp{WallTime: 1e9, Logical: 0}, bKeys, [][]byte{n, n}, -1, hlc.Timestamp{}}, - {gcA, hlc.Timestamp{WallTime: 2e9, Logical: 0}, aKeys, [][]byte{n, n, n}, -1, hlc.Timestamp{}}, - {gcA, hlc.Timestamp{WallTime: 2e9, Logical: 0}, aKeys, [][]byte{d, d, d}, 2, hlc.Timestamp{WallTime: 1e9, Logical: 0}}, - {gcB, hlc.Timestamp{WallTime: 2e9, Logical: 0}, bKeys, [][]byte{n, n}, -1, hlc.Timestamp{}}, - {gcA, hlc.Timestamp{WallTime: 3e9, Logical: 0}, aKeys, [][]byte{n, n, n}, 1, hlc.Timestamp{WallTime: 1e9, Logical: 1}}, - {gcA, hlc.Timestamp{WallTime: 3e9, Logical: 0}, aKeys, [][]byte{d, n, n}, 0, hlc.Timestamp{WallTime: 2e9, Logical: 0}}, - {gcB, hlc.Timestamp{WallTime: 3e9, Logical: 0}, bKeys, [][]byte{n, n}, -1, hlc.Timestamp{}}, - {gcA, hlc.Timestamp{WallTime: 4e9, Logical: 0}, aKeys, [][]byte{n, n, n}, 1, hlc.Timestamp{WallTime: 1e9, Logical: 1}}, - {gcB, hlc.Timestamp{WallTime: 4e9, Logical: 0}, bKeys, [][]byte{n, n}, 1, hlc.Timestamp{WallTime: 1e9, Logical: 0}}, - {gcB, hlc.Timestamp{WallTime: 4e9, Logical: 0}, bKeys, [][]byte{d, n}, 0, hlc.Timestamp{WallTime: 2e9, Logical: 0}}, - {gcA, hlc.Timestamp{WallTime: 5e9, Logical: 0}, aKeys, [][]byte{n, n, n}, 1, hlc.Timestamp{WallTime: 1e9, Logical: 1}}, - {gcB, hlc.Timestamp{WallTime: 5e9, Logical: 0}, bKeys, [][]byte{n, n}, 1, hlc.Timestamp{WallTime: 1e9, Logical: 0}}, - {gcB, hlc.Timestamp{WallTime: 5e9, Logical: 0}, bKeys, [][]byte{d, n}, 0, hlc.Timestamp{WallTime: 2e9, Logical: 0}}, - } - for i, test := range testData { - test.gc.Threshold = test.time - test.gc.Threshold.WallTime -= int64(test.gc.policy.TTLSeconds) * 1e9 - idx, delTS := test.gc.Filter(test.keys, test.values) - if idx != test.expIdx { - t.Errorf("%d: expected index %d; got %d", i, test.expIdx, idx) - } - if delTS != test.expDelTS { - t.Errorf("%d: expected deletion timestamp %s; got %s", i, test.expDelTS, delTS) - } - } -} diff --git a/pkg/storage/engine/mvcc_test.go b/pkg/storage/engine/mvcc_test.go index fb66fb6d3be8..c6dbd9f4bf2f 100644 --- a/pkg/storage/engine/mvcc_test.go +++ b/pkg/storage/engine/mvcc_test.go @@ -106,6 +106,10 @@ func makeTxn(baseTxn roachpb.Transaction, ts hlc.Timestamp) *roachpb.Transaction return txn } +func mvccVersionKey(key roachpb.Key, ts hlc.Timestamp) MVCCKey { + return MVCCKey{Key: key, Timestamp: ts} +} + type mvccKeys []MVCCKey func (n mvccKeys) Len() int { return len(n) } diff --git a/pkg/storage/gc/data_distribution_test.go b/pkg/storage/gc/data_distribution_test.go new file mode 100644 index 000000000000..0daf9b62261e --- /dev/null +++ b/pkg/storage/gc/data_distribution_test.go @@ -0,0 +1,254 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package gc + +import ( + "context" + "fmt" + "math/rand" + "sort" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage/engine" + "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" + "github.com/cockroachdb/cockroach/pkg/storage/rditer" + "github.com/cockroachdb/cockroach/pkg/util/encoding" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/stretchr/testify/require" +) + +// dataDistribution is an abstraction for testing that represents a stream of +// MVCCKeyValues. The stream may indicate that a value is an intent by returning +// a non-nil transaction. If an intent is returned it must have a higher +// timestamp than any other version written for the key. +type dataDistribution func() (engine.MVCCKeyValue, *roachpb.Transaction, bool) + +// setupTest writes the data from this distribution into eng. All data should +// be a part of the range represented by desc. +func (ds dataDistribution) setupTest( + t testing.TB, eng engine.Engine, desc roachpb.RangeDescriptor, +) enginepb.MVCCStats { + ctx := context.TODO() + var maxTs hlc.Timestamp + var ms enginepb.MVCCStats + for { + kv, txn, ok := ds() + if !ok { + break + } + if txn == nil { + require.NoError(t, eng.Put(kv.Key, kv.Value)) + } else { + // TODO(ajwerner): Decide if using MVCCPut is worth it. + ts := kv.Key.Timestamp + if txn.ReadTimestamp == (hlc.Timestamp{}) { + txn.ReadTimestamp = ts + } + if txn.WriteTimestamp == (hlc.Timestamp{}) { + txn.WriteTimestamp = ts + } + err := engine.MVCCPut(ctx, eng, &ms, kv.Key.Key, ts, + roachpb.Value{RawBytes: kv.Value}, txn) + require.NoError(t, err) + } + if !kv.Key.Timestamp.Less(maxTs) { + maxTs = kv.Key.Timestamp + } + } + require.NoError(t, eng.Flush()) + snap := eng.NewSnapshot() + defer snap.Close() + ms, err := rditer.ComputeStatsForRange(&desc, snap, maxTs.WallTime) + require.NoError(t, err) + return ms +} + +// newDataDistribution constructs a dataDistribution from various underlying +// distributions. +func newDataDistribution( + tsDist func() hlc.Timestamp, + keyDist func() roachpb.Key, + valueDist func() []byte, + versionsPerKey func() int, + intentFrac float64, + totalKeys int, + rng *rand.Rand, +) dataDistribution { + // TODO(ajwerner): provide a mechanism to control the rate of expired intents + // or the intent age. Such a knob would likely require decoupling intents from + // other keys. + var ( + remaining = totalKeys + key roachpb.Key + seen = map[string]struct{}{} + timestamps []hlc.Timestamp + haveIntent bool + ) + return func() (engine.MVCCKeyValue, *roachpb.Transaction, bool) { + if remaining == 0 { + return engine.MVCCKeyValue{}, nil, false + } + defer func() { remaining-- }() + for len(timestamps) == 0 { + versions := versionsPerKey() + if versions == 0 { + continue + } + if versions > remaining { + versions = remaining + } + timestamps = make([]hlc.Timestamp, 0, versions) + for i := 0; i < versions; i++ { + timestamps = append(timestamps, tsDist()) + } + sort.Slice(timestamps, func(i, j int) bool { + return timestamps[i].Less(timestamps[j]) + }) + for { + key = keyDist() + sk := string(key) + if _, ok := seen[sk]; ok { + continue + } + seen[sk] = struct{}{} + break + } + haveIntent = rng.Float64() < intentFrac + } + ts := timestamps[0] + timestamps = timestamps[1:] + var txn *roachpb.Transaction + if len(timestamps) == 0 && haveIntent { + txn = &roachpb.Transaction{ + Status: roachpb.PENDING, + ReadTimestamp: ts, + MaxTimestamp: ts.Next().Next(), + } + txn.ID = uuid.MakeV4() + txn.WriteTimestamp = ts + txn.Key = keyDist() + } + return engine.MVCCKeyValue{ + Key: engine.MVCCKey{Key: key, Timestamp: ts}, + Value: valueDist(), + }, txn, true + } +} + +// distSpec abstractly represents a distribution. +type distSpec interface { + dist(maxRows int, rng *rand.Rand) dataDistribution + desc() *roachpb.RangeDescriptor + String() string +} + +// uniformDistSpec is a distSpec which represents uniform distributions over its +// various dimensions. +type uniformDistSpec struct { + tsFrom, tsTo int64 // seconds + keySuffixMin, keySuffixMax int + valueLenMin, valueLenMax int + deleteFrac float64 + keysPerValueMin, keysPerValueMax int + intentFrac float64 +} + +var _ distSpec = uniformDistSpec{} + +func (ds uniformDistSpec) dist(maxRows int, rng *rand.Rand) dataDistribution { + return newDataDistribution( + uniformTimestampDistribution(ds.tsFrom*time.Second.Nanoseconds(), ds.tsTo*time.Second.Nanoseconds(), rng), + uniformTableKeyDistribution(ds.desc().StartKey.AsRawKey(), ds.keySuffixMin, ds.keySuffixMax, rng), + uniformValueDistribution(ds.valueLenMin, ds.valueLenMax, ds.deleteFrac, rng), + uniformValuesPerKey(ds.keysPerValueMin, ds.keysPerValueMax, rng), + ds.intentFrac, + maxRows, + rng, + ) +} + +func (ds uniformDistSpec) desc() *roachpb.RangeDescriptor { + tablePrefix := keys.MakeTablePrefix(42) + return &roachpb.RangeDescriptor{ + StartKey: tablePrefix, + EndKey: roachpb.RKey(roachpb.Key(tablePrefix).PrefixEnd()), + } +} + +func (ds uniformDistSpec) String() string { + return fmt.Sprintf( + "ts=[%d,%d],"+ + "keySuffix=[%d,%d],"+ + "valueLen=[%d,%d],"+ + "keysPerValue=[%d,%d],"+ + "deleteFrac=%f,intentFrac=%f", + ds.tsFrom, ds.tsTo, + ds.keySuffixMin, ds.keySuffixMax, + ds.valueLenMin, ds.valueLenMax, + ds.keysPerValueMin, ds.keysPerValueMax, + ds.deleteFrac, ds.intentFrac) +} + +// uniformTimestamp returns an hlc timestamp distribution with a wall time +// uniform over [from, to] and a zero logical timestamp. +func uniformTimestampDistribution(from, to int64, rng *rand.Rand) func() hlc.Timestamp { + if from >= to { + panic(fmt.Errorf("from (%d) >= to (%d)", from, to)) + } + n := int(to-from) + 1 + return func() hlc.Timestamp { + return hlc.Timestamp{WallTime: from + int64(rng.Intn(n))} + } +} + +// returns a uniform length random value distribution. +func uniformValueDistribution(min, max int, deleteFrac float64, rng *rand.Rand) func() []byte { + if min > max { + panic(fmt.Errorf("min (%d) > max (%d)", min, max)) + } + n := (max - min) + 1 + return func() []byte { + if rng.Float64() < deleteFrac { + return nil + } + value := make([]byte, min+rng.Intn(n)) + if _, err := rng.Read(value); err != nil { + panic(err) + } + return value + } +} + +func uniformValuesPerKey(valuesPerKeyMin, valuesPerKeyMax int, rng *rand.Rand) func() int { + if valuesPerKeyMin > valuesPerKeyMax { + panic(fmt.Errorf("min (%d) > max (%d)", valuesPerKeyMin, valuesPerKeyMax)) + } + n := (valuesPerKeyMax - valuesPerKeyMin) + 1 + return func() int { return valuesPerKeyMin + rng.Intn(n) } +} + +func uniformTableKeyDistribution( + prefix roachpb.Key, suffixMin, suffixMax int, rng *rand.Rand, +) func() roachpb.Key { + if suffixMin > suffixMax { + panic(fmt.Errorf("suffixMin (%d) > suffixMax (%d)", suffixMin, suffixMax)) + } + n := (suffixMax - suffixMin) + 1 + return func() roachpb.Key { + randData := make([]byte, suffixMin+rng.Intn(n)) + _, _ = rng.Read(randData) + return encoding.EncodeBytesAscending(prefix[0:len(prefix):len(prefix)], randData) + } +} diff --git a/pkg/storage/gc/gc.go b/pkg/storage/gc/gc.go new file mode 100644 index 000000000000..60947fc81718 --- /dev/null +++ b/pkg/storage/gc/gc.go @@ -0,0 +1,497 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +// Package gc contains the logic to run scan a range for garbage and issue +// GC requests to remove that garbage. +// +// The Run function is the primary entry point and is called underneath the +// gcQueue in the storage package. It can also be run for debugging. +package gc + +import ( + "context" + "fmt" + "time" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/config/zonepb" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage/abortspan" + "github.com/cockroachdb/cockroach/pkg/storage/engine" + "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" + "github.com/cockroachdb/cockroach/pkg/storage/storagebase" + "github.com/cockroachdb/cockroach/pkg/util/bufalloc" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/cockroachdb/errors" +) + +const ( + // IntentAgeThreshold is the threshold after which an extant intent + // will be resolved. + IntentAgeThreshold = 2 * time.Hour // 2 hour + + // KeyVersionChunkBytes is the threshold size for splitting + // GCRequests into multiple batches. + KeyVersionChunkBytes = base.ChunkRaftCommandThresholdBytes +) + +// CalculateThreshold calculates the GC threshold given the policy and the +// current view of time. +func CalculateThreshold(now hlc.Timestamp, policy zonepb.GCPolicy) (threshold hlc.Timestamp) { + ttlNanos := int64(policy.TTLSeconds) * time.Second.Nanoseconds() + return hlc.Timestamp{WallTime: now.WallTime - ttlNanos} +} + +// A GCer is an abstraction used by the GC queue to carry out chunked deletions. +type GCer interface { + SetGCThreshold(context.Context, Threshold) error + GC(context.Context, []roachpb.GCRequest_GCKey) error +} + +// NoopGCer implements GCer by doing nothing. +type NoopGCer struct{} + +var _ GCer = NoopGCer{} + +// SetGCThreshold implements storage.GCer. +func (NoopGCer) SetGCThreshold(context.Context, Threshold) error { return nil } + +// GC implements storage.GCer. +func (NoopGCer) GC(context.Context, []roachpb.GCRequest_GCKey) error { return nil } + +// Threshold holds the key and txn span GC thresholds, respectively. +type Threshold struct { + Key hlc.Timestamp + Txn hlc.Timestamp +} + +// Info contains statistics and insights from a GC run. +type Info struct { + // Now is the timestamp used for age computations. + Now hlc.Timestamp + // Policy is the policy used for this garbage collection cycle. + Policy zonepb.GCPolicy + // Stats about the userspace key-values considered, namely the number of + // keys with GC'able data, the number of "old" intents and the number of + // associated distinct transactions. + NumKeysAffected, IntentsConsidered, IntentTxns int + // TransactionSpanTotal is the total number of entries in the transaction span. + TransactionSpanTotal int + // Summary of transactions which were found GCable (assuming that + // potentially necessary intent resolutions did not fail). + TransactionSpanGCAborted, TransactionSpanGCCommitted int + TransactionSpanGCStaging, TransactionSpanGCPending int + // AbortSpanTotal is the total number of transactions present in the AbortSpan. + AbortSpanTotal int + // AbortSpanConsidered is the number of AbortSpan entries old enough to be + // considered for removal. An "entry" corresponds to one transaction; + // more than one key-value pair may be associated with it. + AbortSpanConsidered int + // AbortSpanGCNum is the number of AbortSpan entries fit for removal (due + // to their transactions having terminated). + AbortSpanGCNum int + // PushTxn is the total number of pushes attempted in this cycle. + PushTxn int + // ResolveTotal is the total number of attempted intent resolutions in + // this cycle. + ResolveTotal int + // Threshold is the computed expiration timestamp. Equal to `Now - Policy`. + Threshold hlc.Timestamp + // AffectedVersionsKeyBytes is the number of (fully encoded) bytes deleted from keys in the storage engine. + // Note that this does not account for compression that the storage engine uses to store data on disk. Real + // space savings tends to be smaller due to this compression, and space may be released only at a later point + // in time. + AffectedVersionsKeyBytes int64 + // AffectedVersionsValBytes is the number of (fully encoded) bytes deleted from values in the storage engine. + // See AffectedVersionsKeyBytes for caveats. + AffectedVersionsValBytes int64 +} + +// CleanupIntentsFunc synchronously resolves the supplied intents +// (which may be PENDING, in which case they are first pushed) while +// taking care of proper batching. +type CleanupIntentsFunc func(context.Context, []roachpb.Intent) error + +// CleanupTxnIntentsAsyncFunc asynchronously cleans up intents from a +// transaction record, pushing the transaction first if it is +// PENDING. Once all intents are resolved successfully, removes the +// transaction record. +type CleanupTxnIntentsAsyncFunc func(context.Context, *roachpb.Transaction, []roachpb.Intent) error + +// Run runs garbage collection for the specified descriptor on the +// provided Engine (which is not mutated). It uses the provided gcFn +// to run garbage collection once on all implicated spans, +// cleanupIntentsFn to resolve intents synchronously, and +// cleanupTxnIntentsAsyncFn to asynchronously cleanup intents and +// associated transaction record on success. +func Run( + ctx context.Context, + desc *roachpb.RangeDescriptor, + snap engine.Reader, + now hlc.Timestamp, + policy zonepb.GCPolicy, + gcer GCer, + cleanupIntentsFn CleanupIntentsFunc, + cleanupTxnIntentsAsyncFn CleanupTxnIntentsAsyncFunc, +) (Info, error) { + + txnExp := now.Add(-storagebase.TxnCleanupThreshold.Nanoseconds(), 0) + threshold := CalculateThreshold(now, policy) + + if err := gcer.SetGCThreshold(ctx, Threshold{ + Key: threshold, + Txn: txnExp, + }); err != nil { + return Info{}, errors.Wrap(err, "failed to set GC thresholds") + } + + info := Info{ + Policy: policy, + Now: now, + Threshold: threshold, + } + + // Maps from txn ID to txn and intent key slice. + txnMap := map[uuid.UUID]*roachpb.Transaction{} + intentSpanMap := map[uuid.UUID][]roachpb.Span{} + err := processReplicatedKeyRange(ctx, desc, snap, now, threshold, gcer, txnMap, intentSpanMap, &info) + if err != nil { + return Info{}, err + } + + // From now on, all newly added keys are range-local. + + // Process local range key entries (txn records, queue last processed times). + localRangeKeys, err := processLocalKeyRange(ctx, snap, desc, txnExp, &info, cleanupTxnIntentsAsyncFn) + if err != nil { + return Info{}, err + } + + if err := gcer.GC(ctx, localRangeKeys); err != nil { + return Info{}, err + } + + // Clean up the AbortSpan. + log.Event(ctx, "processing AbortSpan") + abortSpanKeys := processAbortSpan(ctx, snap, desc.RangeID, txnExp, &info) + if err := gcer.GC(ctx, abortSpanKeys); err != nil { + return Info{}, err + } + + log.Eventf(ctx, "GC'ed keys; stats %+v", info) + + // Push transactions (if pending) and resolve intents. + var intents []roachpb.Intent + for txnID, txn := range txnMap { + intents = append(intents, roachpb.AsIntents(intentSpanMap[txnID], txn)...) + } + info.ResolveTotal += len(intents) + log.Eventf(ctx, "cleanup of %d intents", len(intents)) + if err := cleanupIntentsFn(ctx, intents); err != nil { + return Info{}, err + } + + return info, nil +} + +// processReplicatedKeyRange identifies garbage and sends GC requests to +// remove it. +// +// The logic iterates all versions of all keys in the range from oldest to +// newest. Expired intents are written into the txnMap and intentSpanMap. +func processReplicatedKeyRange( + ctx context.Context, + desc *roachpb.RangeDescriptor, + snap engine.Reader, + now hlc.Timestamp, + threshold hlc.Timestamp, + gcer GCer, + txnMap map[uuid.UUID]*roachpb.Transaction, + intentSpanMap map[uuid.UUID][]roachpb.Span, + info *Info, +) error { + var alloc bufalloc.ByteAllocator + // Compute intent expiration (intent age at which we attempt to resolve). + intentExp := now.Add(-IntentAgeThreshold.Nanoseconds(), 0) + handleIntent := func(md *engine.MVCCKeyValue) { + meta := &enginepb.MVCCMetadata{} + if err := protoutil.Unmarshal(md.Value, meta); err != nil { + log.Errorf(ctx, "unable to unmarshal MVCC metadata for key %q: %+v", md.Key, err) + return + } + if meta.Txn != nil { + // Keep track of intent to resolve if older than the intent + // expiration threshold. + if hlc.Timestamp(meta.Timestamp).Less(intentExp) { + txnID := meta.Txn.ID + if _, ok := txnMap[txnID]; !ok { + txnMap[txnID] = &roachpb.Transaction{ + TxnMeta: *meta.Txn, + } + // IntentTxns and PushTxn will be equal here, since + // pushes to transactions whose record lies in this + // range (but which are not associated to a remaining + // intent on it) happen asynchronously and are accounted + // for separately. Thus higher up in the stack, we + // expect PushTxn > IntentTxns. + info.IntentTxns++ + // All transactions in txnMap may be PENDING and + // cleanupIntentsFn will push them to finalize them. + info.PushTxn++ + } + info.IntentsConsidered++ + alloc, md.Key.Key = alloc.Copy(md.Key.Key, 0) + intentSpanMap[txnID] = append(intentSpanMap[txnID], roachpb.Span{ + Key: md.Key.Key, + }) + } + } + } + + // Iterate all versions of all keys from oldest to newest. If a version is an + // intent it will have the highest timestamp of any versions and will be + // followed by a metadata entry. The loop will determine whether a given key + // has garbage and, if so, will determine the timestamp of the latest version + // which is garbage to be added to the current batch. If the current version + // pushes the size of keys to be removed above the limit, the current key will + // be added with that version and the batch will be sent. When the newest + // version for a key has been reached, if haveGarbageForThisKey, we'll add the + // current key to the batch with the gcTimestampForThisKey. + var ( + batchGCKeys []roachpb.GCRequest_GCKey + batchGCKeysBytes int64 + haveGarbageForThisKey bool + gcTimestampForThisKey hlc.Timestamp + sentBatchForThisKey bool + ) + it := makeGCIterator(desc, snap) + defer it.close() + for ; ; it.step() { + s, ok := it.state() + if !ok { + if it.err != nil { + return it.err + } + break + } + if s.curIsNotValue() { // Step over metadata or other system keys + continue + } + if s.curIsIntent() { + handleIntent(s.next) + continue + } + isNewest := s.curIsNewest() + if isGarbage(threshold, s.cur, s.next, isNewest) { + keyBytes := int64(s.cur.Key.EncodedSize()) + batchGCKeysBytes += keyBytes + haveGarbageForThisKey = true + gcTimestampForThisKey = s.cur.Key.Timestamp + info.AffectedVersionsKeyBytes += keyBytes + info.AffectedVersionsValBytes += int64(len(s.cur.Value)) + } + if affected := isNewest && (sentBatchForThisKey || haveGarbageForThisKey); affected { + info.NumKeysAffected++ + } + shouldSendBatch := batchGCKeysBytes >= KeyVersionChunkBytes + if shouldSendBatch || isNewest && haveGarbageForThisKey { + alloc, s.cur.Key.Key = alloc.Copy(s.cur.Key.Key, 0) + batchGCKeys = append(batchGCKeys, roachpb.GCRequest_GCKey{ + Key: s.cur.Key.Key, + Timestamp: gcTimestampForThisKey, + }) + haveGarbageForThisKey = false + gcTimestampForThisKey = hlc.Timestamp{} + + // Mark that we sent a batch for this key so we know that we had garbage + // even if it turns out that there's no more garbage for this key. + // We want to count a key as affected once even if we paginate the + // deletion of its versions. + sentBatchForThisKey = shouldSendBatch && !isNewest + } + if shouldSendBatch { + if err := gcer.GC(ctx, batchGCKeys); err != nil { + // Even though we are batching the GC process, it's + // safe to continue because we bumped the GC + // thresholds. We may leave some inconsistent history + // behind, but nobody can read it. + log.Warningf(ctx, "failed to GC a batch of keys: %v", err) + } + batchGCKeys = nil + batchGCKeysBytes = 0 + alloc = nil + } + } + if len(batchGCKeys) > 0 { + if err := gcer.GC(ctx, batchGCKeys); err != nil { + return err + } + } + return nil +} + +// isGarbage makes a determination whether a key ('cur') is garbage. If 'next' +// is non-nil, it should be the chronologically newer version of the same key +// (or the metadata KV if cur is an intent). If isNewest is false, next must be +// non-nil. isNewest implies that this is the highest timestamp committed +// version for this key. If isNewest is true and next is non-nil, it is an +// intent. Conservatively we have to assume that the intent will get aborted, +// so we will be able to GC just the values that we could remove if there +// weren't an intent. Hence this definition of isNewest. +// +// We keep all values (including deletes) above the expiration time, plus +// the first value before or at the expiration time. This allows reads to be +// guaranteed as described above. However if this were the only rule, then if +// the most recent write was a delete, it would never be removed. Thus, when a +// deleted value is the most recent before expiration, it can be deleted. +func isGarbage(threshold hlc.Timestamp, cur, next *engine.MVCCKeyValue, isNewest bool) bool { + // If the value is not at or below the threshold then it's not garbage. + if belowThreshold := cur.Key.Timestamp.LessEq(threshold); !belowThreshold { + return false + } + isDelete := len(cur.Value) == 0 + if isNewest && !isDelete { + return false + } + // If this value is not a delete, then we need to make sure that the next + // value is also at or below the threshold. + // NB: This doesn't need to check whether next is nil because we know + // isNewest is false when evaluating rhs of the or below. + if !isDelete && next == nil { + panic("huh") + } + return isDelete || next.Key.Timestamp.LessEq(threshold) +} + +// processLocalKeyRange scans the local range key entries, consisting of +// transaction records, queue last processed timestamps, and range descriptors. +// +// - Transaction entries: +// - For expired transactions , schedule the intents for +// asynchronous resolution. The actual transaction spans are not +// returned for GC in this pass, but are separately GC'ed after +// successful resolution of all intents. The exception is if there +// are no intents on the txn record, in which case it's returned for +// immediate GC. +// +// - Queue last processed times: cleanup any entries which don't match +// this range's start key. This can happen on range merges. +func processLocalKeyRange( + ctx context.Context, + snap engine.Reader, + desc *roachpb.RangeDescriptor, + cutoff hlc.Timestamp, + info *Info, + cleanupTxnIntentsAsyncFn CleanupTxnIntentsAsyncFunc, +) ([]roachpb.GCRequest_GCKey, error) { + var gcKeys []roachpb.GCRequest_GCKey + + handleTxnIntents := func(key roachpb.Key, txn *roachpb.Transaction) error { + // If the transaction needs to be pushed or there are intents to + // resolve, invoke the cleanup function. + if !txn.Status.IsFinalized() || len(txn.IntentSpans) > 0 { + return cleanupTxnIntentsAsyncFn(ctx, txn, roachpb.AsIntents(txn.IntentSpans, txn)) + } + gcKeys = append(gcKeys, roachpb.GCRequest_GCKey{Key: key}) // zero timestamp + return nil + } + + handleOneTransaction := func(kv roachpb.KeyValue) error { + var txn roachpb.Transaction + if err := kv.Value.GetProto(&txn); err != nil { + return err + } + info.TransactionSpanTotal++ + if cutoff.LessEq(txn.LastActive()) { + return nil + } + + // The transaction record should be considered for removal. + switch txn.Status { + case roachpb.PENDING: + info.TransactionSpanGCPending++ + case roachpb.STAGING: + info.TransactionSpanGCStaging++ + case roachpb.ABORTED: + info.TransactionSpanGCAborted++ + case roachpb.COMMITTED: + info.TransactionSpanGCCommitted++ + default: + panic(fmt.Sprintf("invalid transaction state: %s", txn)) + } + return handleTxnIntents(kv.Key, &txn) + } + + handleOneQueueLastProcessed := func(kv roachpb.KeyValue, rangeKey roachpb.RKey) error { + if !rangeKey.Equal(desc.StartKey) { + // Garbage collect the last processed timestamp if it doesn't match start key. + gcKeys = append(gcKeys, roachpb.GCRequest_GCKey{Key: kv.Key}) // zero timestamp + } + return nil + } + + handleOne := func(kv roachpb.KeyValue) error { + rangeKey, suffix, _, err := keys.DecodeRangeKey(kv.Key) + if err != nil { + return err + } + if suffix.Equal(keys.LocalTransactionSuffix.AsRawKey()) { + if err := handleOneTransaction(kv); err != nil { + return err + } + } else if suffix.Equal(keys.LocalQueueLastProcessedSuffix.AsRawKey()) { + if err := handleOneQueueLastProcessed(kv, roachpb.RKey(rangeKey)); err != nil { + return err + } + } + return nil + } + + startKey := keys.MakeRangeKeyPrefix(desc.StartKey) + endKey := keys.MakeRangeKeyPrefix(desc.EndKey) + + _, err := engine.MVCCIterate(ctx, snap, startKey, endKey, hlc.Timestamp{}, engine.MVCCScanOptions{}, + func(kv roachpb.KeyValue) (bool, error) { + return false, handleOne(kv) + }) + return gcKeys, err +} + +// processAbortSpan iterates through the local AbortSpan entries +// and collects entries which indicate that a client which was running +// this transaction must have realized that it has been aborted (due to +// heartbeating having failed). The parameter minAge is typically a +// multiple of the heartbeat timeout used by the coordinator. +func processAbortSpan( + ctx context.Context, + snap engine.Reader, + rangeID roachpb.RangeID, + threshold hlc.Timestamp, + info *Info, +) []roachpb.GCRequest_GCKey { + var gcKeys []roachpb.GCRequest_GCKey + abortSpan := abortspan.New(rangeID) + if err := abortSpan.Iterate(ctx, snap, func(key roachpb.Key, v roachpb.AbortSpanEntry) error { + info.AbortSpanTotal++ + if v.Timestamp.Less(threshold) { + info.AbortSpanGCNum++ + gcKeys = append(gcKeys, roachpb.GCRequest_GCKey{Key: key}) + } + return nil + }); err != nil { + // Still return whatever we managed to collect. + log.Warning(ctx, err) + } + return gcKeys +} diff --git a/pkg/storage/gc/gc_iterator.go b/pkg/storage/gc/gc_iterator.go new file mode 100644 index 000000000000..a86876210b07 --- /dev/null +++ b/pkg/storage/gc/gc_iterator.go @@ -0,0 +1,174 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package gc + +import ( + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage/engine" + "github.com/cockroachdb/cockroach/pkg/storage/rditer" + "github.com/cockroachdb/cockroach/pkg/util/bufalloc" +) + +// gcIterator wraps an rditer.ReplicaDataIterator which it reverse iterates for +// the purpose of discovering gc-able replicated data. +type gcIterator struct { + it *rditer.ReplicaDataIterator + done bool + err error + buf gcIteratorRingBuf +} + +func makeGCIterator(desc *roachpb.RangeDescriptor, snap engine.Reader) gcIterator { + return gcIterator{ + it: rditer.NewReplicaDataIterator(desc, snap, + true /* replicatedOnly */, true /* seekEnd */), + } +} + +type gcIteratorState struct { + cur, next, afterNext *engine.MVCCKeyValue +} + +// curIsNewest returns true if the current MVCCKeyValue in the gcIteratorState +// is the newest committed version of the key. +// +// It returns true if next is nil or if next is an intent. +func (s *gcIteratorState) curIsNewest() bool { + return s.cur.Key.IsValue() && + (s.next == nil || (s.afterNext != nil && !s.afterNext.Key.IsValue())) +} + +// curIsNotValue returns true if the current MVCCKeyValue in the gcIteratorState +// is not a value, i.e. does not have a timestamp. +func (s *gcIteratorState) curIsNotValue() bool { + return !s.cur.Key.IsValue() +} + +// curIsIntent returns true if the current MVCCKeyValue in the gcIteratorState +// is an intent. +func (s *gcIteratorState) curIsIntent() bool { + return s.next != nil && !s.next.Key.IsValue() +} + +// state returns the current state of the iterator. The state contains the +// current and the two following versions of the current key if they exist. +// +// If ok is false, further iteration is unsafe; either the end of iteration has +// been reached or an error has occurred. Callers should check it.err to +// determine whether an error has occurred in cases where ok is false. +// +// It is not safe to use values in the state after subsequent calls to +// it.step(). +func (it *gcIterator) state() (s gcIteratorState, ok bool) { + // The current key is the newest if the key which comes next is different or + // the key which comes after the current key is an intent or this is the first + // key in the range. + s.cur, ok = it.peekAt(0) + if !ok { + return gcIteratorState{}, false + } + next, ok := it.peekAt(1) + if !ok && it.err != nil { // cur is the first key in the range + return gcIteratorState{}, false + } + if !ok || !next.Key.Key.Equal(s.cur.Key.Key) { + return s, true + } + s.next = next + afterNext, ok := it.peekAt(2) + if !ok && it.err != nil { // cur is the first key in the range + return gcIteratorState{}, false + } + if !ok || !afterNext.Key.Key.Equal(s.cur.Key.Key) { + return s, true + } + s.afterNext = afterNext + return s, true +} + +func (it *gcIterator) step() { + it.buf.removeFront() +} + +func (it *gcIterator) peekAt(i int) (*engine.MVCCKeyValue, bool) { + if it.buf.len <= i { + if !it.fillTo(i + 1) { + return nil, false + } + } + return it.buf.at(i), true +} + +func (it *gcIterator) fillTo(targetLen int) (ok bool) { + for it.buf.len < targetLen { + if ok, err := it.it.Valid(); !ok { + it.err, it.done = err, err == nil + return false + } + it.buf.pushBack(it.it) + it.it.Prev() + } + return true +} + +func (it *gcIterator) close() { + it.it.Close() + it.it = nil +} + +// gcIteratorRingBufSize is 3 because the gcIterator.state method at most needs +// to look forward two keys ahead of the current key. +const gcIteratorRingBufSize = 3 + +type gcIteratorRingBuf struct { + allocs [gcIteratorRingBufSize]bufalloc.ByteAllocator + buf [gcIteratorRingBufSize]engine.MVCCKeyValue + len int + head int +} + +func (b *gcIteratorRingBuf) at(i int) *engine.MVCCKeyValue { + if i >= b.len { + panic("index out of range") + } + return &b.buf[(b.head+i)%gcIteratorRingBufSize] +} + +func (b *gcIteratorRingBuf) removeFront() { + if b.len == 0 { + panic("cannot remove from empty gcIteratorRingBuf") + } + b.buf[b.head] = engine.MVCCKeyValue{} + b.head = (b.head + 1) % gcIteratorRingBufSize + b.len-- +} + +type iterator interface { + UnsafeKey() engine.MVCCKey + UnsafeValue() []byte +} + +func (b *gcIteratorRingBuf) pushBack(it iterator) { + if b.len == gcIteratorRingBufSize { + panic("cannot add to full gcIteratorRingBuf") + } + i := (b.head + b.len) % gcIteratorRingBufSize + b.allocs[i] = b.allocs[i][:0] + k := it.UnsafeKey() + v := it.UnsafeValue() + b.allocs[i], k.Key = b.allocs[i].Copy(k.Key, len(v)) + b.allocs[i], v = b.allocs[i].Copy(v, 0) + b.buf[i] = engine.MVCCKeyValue{ + Key: k, + Value: v, + } + b.len++ +} diff --git a/pkg/storage/gc/gc_iterator_test.go b/pkg/storage/gc/gc_iterator_test.go new file mode 100644 index 000000000000..a5a7d58b8d18 --- /dev/null +++ b/pkg/storage/gc/gc_iterator_test.go @@ -0,0 +1,165 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package gc + +import ( + "fmt" + "math/rand" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage/engine" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/stretchr/testify/require" +) + +// TestGCIterator exercises the GC iterator by writing data to the underlying +// engine and then validating the state of the iterator as it iterates that +// data. +func TestGCIterator(t *testing.T) { + // dataItem represents a version in the storage engine and optionally a + // corresponding transaction which will make the MVCCKeyValue an intent. + type dataItem struct { + engine.MVCCKeyValue + txn *roachpb.Transaction + } + // makeDataItem is a shorthand to construct dataItems. + makeDataItem := func(k roachpb.Key, val []byte, ts int64, txn *roachpb.Transaction) dataItem { + return dataItem{ + MVCCKeyValue: engine.MVCCKeyValue{ + Key: engine.MVCCKey{ + Key: k, + Timestamp: hlc.Timestamp{WallTime: ts * time.Nanosecond.Nanoseconds()}, + }, + Value: val, + }, + txn: txn, + } + } + // makeLiteralDistribution adapts dataItems for use with the data distribution + // infrastructure. + makeLiteralDataDistribution := func(items ...dataItem) dataDistribution { + return func() (engine.MVCCKeyValue, *roachpb.Transaction, bool) { + if len(items) == 0 { + return engine.MVCCKeyValue{}, nil, false + } + item := items[0] + defer func() { items = items[1:] }() + return item.MVCCKeyValue, item.txn, true + } + } + // stateExpectations are expectations about the state of the iterator. + type stateExpectations struct { + cur, next, afterNext int + isNewest bool + isIntent bool + isNotValue bool + } + // notation to mark that an iterator state element as either nil or metadata. + const ( + isNil = -1 + isMD = -2 + ) + // exp is a shorthand to construct state expectations. + exp := func(cur, next, afterNext int, isNewest, isIntent, isNotValue bool) stateExpectations { + return stateExpectations{ + cur: cur, next: next, afterNext: afterNext, + isNewest: isNewest, + isIntent: isIntent, + isNotValue: isNotValue, + } + } + vals := uniformValueDistribution(3, 5, 0, rand.New(rand.NewSource(1))) + tablePrefix := keys.MakeTablePrefix(42) + desc := roachpb.RangeDescriptor{StartKey: tablePrefix, EndKey: roachpb.RKey(roachpb.Key(tablePrefix).PrefixEnd())} + keyA := append(tablePrefix[0:len(tablePrefix):len(tablePrefix)], 'a') + keyB := append(tablePrefix[0:len(tablePrefix):len(tablePrefix)], 'b') + keyC := append(tablePrefix[0:len(tablePrefix):len(tablePrefix)], 'c') + makeTxn := func() *roachpb.Transaction { + txn := roachpb.Transaction{} + txn.Key = keyA + txn.ID = uuid.MakeV4() + txn.Status = roachpb.PENDING + return &txn + } + + type testCase struct { + name string + data []dataItem + expectations []stateExpectations + } + // checkExpectations tests whether the state of the iterator matches the + // expectation. + checkExpectations := func( + t *testing.T, data []dataItem, ex stateExpectations, s gcIteratorState, + ) { + check := func(ex int, kv *engine.MVCCKeyValue) { + switch { + case ex >= 0: + require.EqualValues(t, &data[ex].MVCCKeyValue, kv) + case ex == isNil: + require.Nil(t, kv) + case ex == isMD: + require.False(t, kv.Key.IsValue()) + } + } + check(ex.cur, s.cur) + check(ex.next, s.next) + check(ex.afterNext, s.afterNext) + require.Equal(t, ex.isNewest, s.curIsNewest()) + require.Equal(t, ex.isIntent, s.curIsIntent()) + } + makeTest := func(tc testCase) func(t *testing.T) { + return func(t *testing.T) { + eng := engine.NewDefaultInMem() + ds := makeLiteralDataDistribution(tc.data...) + ds.setupTest(t, eng, desc) + snap := eng.NewSnapshot() + defer snap.Close() + it := makeGCIterator(&desc, snap) + expectations := tc.expectations + for i, ex := range expectations { + t.Run(fmt.Sprint(i), func(t *testing.T) { + s, ok := it.state() + require.True(t, ok) + checkExpectations(t, tc.data, ex, s) + }) + it.step() + } + } + } + di := makeDataItem // shorthand for convenient notation + for _, tc := range []testCase{ + { + name: "basic", + data: []dataItem{ + di(keyA, vals(), 2, nil), + di(keyA, vals(), 11, nil), + di(keyA, vals(), 14, nil), + di(keyB, vals(), 3, nil), + di(keyC, vals(), 7, makeTxn()), + }, + expectations: []stateExpectations{ + exp(4, isMD, isNil, false, true, false), + exp(isMD, isNil, isNil, false, false, true), + exp(3, isNil, isNil, true, false, false), + exp(0, 1, 2, false, false, false), + exp(1, 2, isNil, false, false, false), + exp(2, isNil, isNil, true, false, false), + }, + }, + } { + t.Run(tc.name, makeTest(tc)) + } +} diff --git a/pkg/storage/gc/gc_old_test.go b/pkg/storage/gc/gc_old_test.go new file mode 100644 index 000000000000..4f22542e47b9 --- /dev/null +++ b/pkg/storage/gc/gc_old_test.go @@ -0,0 +1,380 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package gc + +import ( + "context" + "sort" + "testing" + + "github.com/cockroachdb/cockroach/pkg/config/zonepb" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage/engine" + "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" + "github.com/cockroachdb/cockroach/pkg/storage/rditer" + "github.com/cockroachdb/cockroach/pkg/storage/storagebase" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/cockroachdb/errors" +) + +// runGCOld is an older implementation of Run. It is used for benchmarking and +// testing. +// +// runGCOld runs garbage collection for the specified descriptor on the +// provided Engine (which is not mutated). It uses the provided gcFn +// to run garbage collection once on all implicated spans, +// cleanupIntentsFn to resolve intents synchronously, and +// cleanupTxnIntentsAsyncFn to asynchronously cleanup intents and +// associated transaction record on success. +func runGCOld( + ctx context.Context, + desc *roachpb.RangeDescriptor, + snap engine.Reader, + now hlc.Timestamp, + policy zonepb.GCPolicy, + gcer GCer, + cleanupIntentsFn CleanupIntentsFunc, + cleanupTxnIntentsAsyncFn CleanupTxnIntentsAsyncFunc, +) (Info, error) { + + iter := rditer.NewReplicaDataIterator(desc, snap, + true /* replicatedOnly */, false /* seekEnd */) + defer iter.Close() + + // Compute intent expiration (intent age at which we attempt to resolve). + intentExp := now.Add(-IntentAgeThreshold.Nanoseconds(), 0) + txnExp := now.Add(-storagebase.TxnCleanupThreshold.Nanoseconds(), 0) + + gc := MakeGarbageCollector(now, policy) + + if err := gcer.SetGCThreshold(ctx, Threshold{ + Key: gc.Threshold, + Txn: txnExp, + }); err != nil { + return Info{}, errors.Wrap(err, "failed to set GC thresholds") + } + + var batchGCKeys []roachpb.GCRequest_GCKey + var batchGCKeysBytes int64 + var expBaseKey roachpb.Key + var keys []engine.MVCCKey + var vals [][]byte + var keyBytes int64 + var valBytes int64 + info := Info{ + Policy: policy, + Now: now, + Threshold: gc.Threshold, + } + + // Maps from txn ID to txn and intent key slice. + txnMap := map[uuid.UUID]*roachpb.Transaction{} + intentSpanMap := map[uuid.UUID][]roachpb.Span{} + + // processKeysAndValues is invoked with each key and its set of + // values. Intents older than the intent age threshold are sent for + // resolution and values after the MVCC metadata, and possible + // intent, are sent for garbage collection. + processKeysAndValues := func() { + // If there's more than a single value for the key, possibly send for GC. + if len(keys) > 1 { + meta := &enginepb.MVCCMetadata{} + if err := protoutil.Unmarshal(vals[0], meta); err != nil { + log.Errorf(ctx, "unable to unmarshal MVCC metadata for key %q: %+v", keys[0], err) + } else { + // In the event that there's an active intent, send for + // intent resolution if older than the threshold. + startIdx := 1 + if meta.Txn != nil { + // Keep track of intent to resolve if older than the intent + // expiration threshold. + if hlc.Timestamp(meta.Timestamp).Less(intentExp) { + txnID := meta.Txn.ID + if _, ok := txnMap[txnID]; !ok { + txnMap[txnID] = &roachpb.Transaction{ + TxnMeta: *meta.Txn, + } + // IntentTxns and PushTxn will be equal here, since + // pushes to transactions whose record lies in this + // range (but which are not associated to a remaining + // intent on it) happen asynchronously and are accounted + // for separately. Thus higher up in the stack, we + // expect PushTxn > IntentTxns. + info.IntentTxns++ + // All transactions in txnMap may be PENDING and + // cleanupIntentsFn will push them to finalize them. + info.PushTxn++ + } + info.IntentsConsidered++ + intentSpanMap[txnID] = append(intentSpanMap[txnID], roachpb.Span{Key: expBaseKey}) + } + // With an active intent, GC ignores MVCC metadata & intent value. + startIdx = 2 + } + // See if any values may be GC'd. + if idx, gcTS := gc.Filter(keys[startIdx:], vals[startIdx:]); gcTS != (hlc.Timestamp{}) { + // Batch keys after the total size of version keys exceeds + // the threshold limit. This avoids sending potentially large + // GC requests through Raft. Iterate through the keys in reverse + // order so that GC requests can be made multiple times even on + // a single key, with successively newer timestamps to prevent + // any single request from exploding during GC evaluation. + for i := len(keys) - 1; i >= startIdx+idx; i-- { + keyBytes = int64(keys[i].EncodedSize()) + valBytes = int64(len(vals[i])) + + // Add the total size of the GC'able versions of the keys and values to Info. + info.AffectedVersionsKeyBytes += keyBytes + info.AffectedVersionsValBytes += valBytes + + batchGCKeysBytes += keyBytes + // If the current key brings the batch over the target + // size, add the current timestamp to finish the current + // chunk and start a new one. + if batchGCKeysBytes >= KeyVersionChunkBytes { + batchGCKeys = append(batchGCKeys, roachpb.GCRequest_GCKey{Key: expBaseKey, Timestamp: keys[i].Timestamp}) + + err := gcer.GC(ctx, batchGCKeys) + + // Succeed or fail, allow releasing the memory backing batchGCKeys. + iter.ResetAllocator() + batchGCKeys = nil + batchGCKeysBytes = 0 + + if err != nil { + // Even though we are batching the GC process, it's + // safe to continue because we bumped the GC + // thresholds. We may leave some inconsistent history + // behind, but nobody can read it. + log.Warning(ctx, err) + return + } + } + } + // Add the key to the batch at the GC timestamp, unless it was already added. + if batchGCKeysBytes != 0 { + batchGCKeys = append(batchGCKeys, roachpb.GCRequest_GCKey{Key: expBaseKey, Timestamp: gcTS}) + } + info.NumKeysAffected++ + } + } + } + } + + // Iterate through the keys and values of this replica's range. + log.Event(ctx, "iterating through range") + for ; ; iter.Next() { + if ok, err := iter.Valid(); err != nil { + return Info{}, err + } else if !ok { + break + } else if ctx.Err() != nil { + // Stop iterating if our context has expired. + return Info{}, err + } + iterKey := iter.Key() + if !iterKey.IsValue() || !iterKey.Key.Equal(expBaseKey) { + // Moving to the next key (& values). + processKeysAndValues() + expBaseKey = iterKey.Key + if !iterKey.IsValue() { + keys = []engine.MVCCKey{iter.Key()} + vals = [][]byte{iter.Value()} + continue + } + // An implicit metadata. + keys = []engine.MVCCKey{engine.MakeMVCCMetadataKey(iterKey.Key)} + // A nil value for the encoded MVCCMetadata. This will unmarshal to an + // empty MVCCMetadata which is sufficient for processKeysAndValues to + // determine that there is no intent. + vals = [][]byte{nil} + } + keys = append(keys, iter.Key()) + vals = append(vals, iter.Value()) + } + // Handle last collected set of keys/vals. + processKeysAndValues() + if len(batchGCKeys) > 0 { + if err := gcer.GC(ctx, batchGCKeys); err != nil { + return Info{}, err + } + } + + // From now on, all newly added keys are range-local. + + // Process local range key entries (txn records, queue last processed times). + localRangeKeys, err := processLocalKeyRange(ctx, snap, desc, txnExp, &info, cleanupTxnIntentsAsyncFn) + if err != nil { + return Info{}, err + } + + if err := gcer.GC(ctx, localRangeKeys); err != nil { + return Info{}, err + } + + // Clean up the AbortSpan. + log.Event(ctx, "processing AbortSpan") + abortSpanKeys := processAbortSpan(ctx, snap, desc.RangeID, txnExp, &info) + if err := gcer.GC(ctx, abortSpanKeys); err != nil { + return Info{}, err + } + + log.Eventf(ctx, "GC'ed keys; stats %+v", info) + + // Push transactions (if pending) and resolve intents. + var intents []roachpb.Intent + for txnID, txn := range txnMap { + intents = append(intents, roachpb.AsIntents(intentSpanMap[txnID], txn)...) + } + info.ResolveTotal += len(intents) + log.Eventf(ctx, "cleanup of %d intents", len(intents)) + if err := cleanupIntentsFn(ctx, intents); err != nil { + return Info{}, err + } + + return info, nil +} + +// GarbageCollector GCs MVCC key/values using a zone-specific GC +// policy allows either the union or intersection of maximum # of +// versions and maximum age. +type GarbageCollector struct { + Threshold hlc.Timestamp + policy zonepb.GCPolicy +} + +// MakeGarbageCollector allocates and returns a new GC, with expiration +// computed based on current time and policy.TTLSeconds. +func MakeGarbageCollector(now hlc.Timestamp, policy zonepb.GCPolicy) GarbageCollector { + return GarbageCollector{ + Threshold: CalculateThreshold(now, policy), + policy: policy, + } +} + +// Filter makes decisions about garbage collection based on the +// garbage collection policy for batches of values for the same +// key. Returns the index of the first key to be GC'd and the +// timestamp including, and after which, all values should be garbage +// collected. If no values should be GC'd, returns -1 for the index +// and the zero timestamp. Keys must be in descending time +// order. Values deleted at or before the returned timestamp can be +// deleted without invalidating any reads in the time interval +// (gc.expiration, \infinity). +// +// The GC keeps all values (including deletes) above the expiration time, plus +// the first value before or at the expiration time. This allows reads to be +// guaranteed as described above. However if this were the only rule, then if +// the most recent write was a delete, it would never be removed. Thus, when a +// deleted value is the most recent before expiration, it can be deleted. This +// would still allow for the tombstone bugs in #6227, so in the future we will +// add checks that disallow writes before the last GC expiration time. +func (gc GarbageCollector) Filter(keys []engine.MVCCKey, values [][]byte) (int, hlc.Timestamp) { + if gc.policy.TTLSeconds <= 0 { + return -1, hlc.Timestamp{} + } + if len(keys) == 0 { + return -1, hlc.Timestamp{} + } + + // find the first expired key index using binary search + i := sort.Search(len(keys), func(i int) bool { return keys[i].Timestamp.LessEq(gc.Threshold) }) + + if i == len(keys) { + return -1, hlc.Timestamp{} + } + + // Now keys[i].Timestamp is <= gc.expiration, but the key-value pair is still + // "visible" at timestamp gc.expiration (and up to the next version). + if deleted := len(values[i]) == 0; deleted { + // We don't have to keep a delete visible (since GCing it does not change + // the outcome of the read). Note however that we can't touch deletes at + // higher timestamps immediately preceding this one, since they're above + // gc.expiration and are needed for correctness; see #6227. + return i, keys[i].Timestamp + } else if i+1 < len(keys) { + // Otherwise mark the previous timestamp for deletion (since it won't ever + // be returned for reads at gc.expiration and up). + return i + 1, keys[i+1].Timestamp + } + + return -1, hlc.Timestamp{} +} + +func mvccVersionKey(key roachpb.Key, ts hlc.Timestamp) engine.MVCCKey { + return engine.MVCCKey{Key: key, Timestamp: ts} +} + +var ( + aKey = roachpb.Key("a") + bKey = roachpb.Key("b") + aKeys = []engine.MVCCKey{ + mvccVersionKey(aKey, hlc.Timestamp{WallTime: 2e9, Logical: 0}), + mvccVersionKey(aKey, hlc.Timestamp{WallTime: 1e9, Logical: 1}), + mvccVersionKey(aKey, hlc.Timestamp{WallTime: 1e9, Logical: 0}), + } + bKeys = []engine.MVCCKey{ + mvccVersionKey(bKey, hlc.Timestamp{WallTime: 2e9, Logical: 0}), + mvccVersionKey(bKey, hlc.Timestamp{WallTime: 1e9, Logical: 0}), + } +) + +// TestGarbageCollectorFilter verifies the filter policies for +// different sorts of MVCC keys. +func TestGarbageCollectorFilter(t *testing.T) { + defer leaktest.AfterTest(t)() + gcA := MakeGarbageCollector(hlc.Timestamp{WallTime: 0, Logical: 0}, zonepb.GCPolicy{TTLSeconds: 1}) + gcB := MakeGarbageCollector(hlc.Timestamp{WallTime: 0, Logical: 0}, zonepb.GCPolicy{TTLSeconds: 2}) + n := []byte("data") + d := []byte(nil) + testData := []struct { + gc GarbageCollector + time hlc.Timestamp + keys []engine.MVCCKey + values [][]byte + expIdx int + expDelTS hlc.Timestamp + }{ + {gcA, hlc.Timestamp{WallTime: 0, Logical: 0}, aKeys, [][]byte{n, n, n}, -1, hlc.Timestamp{}}, + {gcA, hlc.Timestamp{WallTime: 0, Logical: 0}, aKeys, [][]byte{d, d, d}, -1, hlc.Timestamp{}}, + {gcB, hlc.Timestamp{WallTime: 0, Logical: 0}, bKeys, [][]byte{n, n}, -1, hlc.Timestamp{}}, + {gcB, hlc.Timestamp{WallTime: 0, Logical: 0}, bKeys, [][]byte{d, d}, -1, hlc.Timestamp{}}, + {gcA, hlc.Timestamp{WallTime: 1e9, Logical: 0}, aKeys, [][]byte{n, n, n}, -1, hlc.Timestamp{}}, + {gcB, hlc.Timestamp{WallTime: 1e9, Logical: 0}, bKeys, [][]byte{n, n}, -1, hlc.Timestamp{}}, + {gcA, hlc.Timestamp{WallTime: 2e9, Logical: 0}, aKeys, [][]byte{n, n, n}, -1, hlc.Timestamp{}}, + {gcA, hlc.Timestamp{WallTime: 2e9, Logical: 0}, aKeys, [][]byte{d, d, d}, 2, hlc.Timestamp{WallTime: 1e9, Logical: 0}}, + {gcB, hlc.Timestamp{WallTime: 2e9, Logical: 0}, bKeys, [][]byte{n, n}, -1, hlc.Timestamp{}}, + {gcA, hlc.Timestamp{WallTime: 3e9, Logical: 0}, aKeys, [][]byte{n, n, n}, 1, hlc.Timestamp{WallTime: 1e9, Logical: 1}}, + {gcA, hlc.Timestamp{WallTime: 3e9, Logical: 0}, aKeys, [][]byte{d, n, n}, 0, hlc.Timestamp{WallTime: 2e9, Logical: 0}}, + {gcB, hlc.Timestamp{WallTime: 3e9, Logical: 0}, bKeys, [][]byte{n, n}, -1, hlc.Timestamp{}}, + {gcA, hlc.Timestamp{WallTime: 4e9, Logical: 0}, aKeys, [][]byte{n, n, n}, 1, hlc.Timestamp{WallTime: 1e9, Logical: 1}}, + {gcB, hlc.Timestamp{WallTime: 4e9, Logical: 0}, bKeys, [][]byte{n, n}, 1, hlc.Timestamp{WallTime: 1e9, Logical: 0}}, + {gcB, hlc.Timestamp{WallTime: 4e9, Logical: 0}, bKeys, [][]byte{d, n}, 0, hlc.Timestamp{WallTime: 2e9, Logical: 0}}, + {gcA, hlc.Timestamp{WallTime: 5e9, Logical: 0}, aKeys, [][]byte{n, n, n}, 1, hlc.Timestamp{WallTime: 1e9, Logical: 1}}, + {gcB, hlc.Timestamp{WallTime: 5e9, Logical: 0}, bKeys, [][]byte{n, n}, 1, hlc.Timestamp{WallTime: 1e9, Logical: 0}}, + {gcB, hlc.Timestamp{WallTime: 5e9, Logical: 0}, bKeys, [][]byte{d, n}, 0, hlc.Timestamp{WallTime: 2e9, Logical: 0}}, + } + for i, test := range testData { + test.gc.Threshold = test.time + test.gc.Threshold.WallTime -= int64(test.gc.policy.TTLSeconds) * 1e9 + idx, delTS := test.gc.Filter(test.keys, test.values) + if idx != test.expIdx { + t.Errorf("%d: expected index %d; got %d", i, test.expIdx, idx) + } + if delTS != test.expDelTS { + t.Errorf("%d: expected deletion timestamp %s; got %s", i, test.expDelTS, delTS) + } + } +} diff --git a/pkg/storage/gc/gc_random_test.go b/pkg/storage/gc/gc_random_test.go new file mode 100644 index 000000000000..0f04da4e9fac --- /dev/null +++ b/pkg/storage/gc/gc_random_test.go @@ -0,0 +1,244 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package gc + +import ( + "context" + "fmt" + "math/rand" + "sort" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/config/zonepb" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage/engine" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/stretchr/testify/require" +) + +// randomRunGCTestSpec specifies a distribution for to create random data for +// testing Run +type randomRunGCTestSpec struct { + ds distSpec + now hlc.Timestamp + ttl int32 // seconds +} + +var ( + fewVersionsTinyRows = uniformDistSpec{ + tsFrom: 0, tsTo: 100, + keySuffixMin: 2, keySuffixMax: 3, + valueLenMin: 1, valueLenMax: 1, + deleteFrac: 0, + keysPerValueMin: 1, keysPerValueMax: 2, + intentFrac: .1, + } + someVersionsMidSizeRows = uniformDistSpec{ + tsFrom: 0, tsTo: 100, + keySuffixMin: 8, keySuffixMax: 8, + valueLenMin: 8, valueLenMax: 16, + deleteFrac: .1, + keysPerValueMin: 1, keysPerValueMax: 100, + intentFrac: .1, + } + lotsOfVersionsMidSizeRows = uniformDistSpec{ + tsFrom: 0, tsTo: 100, + keySuffixMin: 8, keySuffixMax: 8, + valueLenMin: 8, valueLenMax: 16, + deleteFrac: .1, + keysPerValueMin: 1000, keysPerValueMax: 1000000, + intentFrac: .1, + } +) + +// TestRunNewVsOld exercises the behavior of Run relative to the old +// implementation. It runs both the new and old implementation and ensures +// that they produce exactly the same results on the same set of keys. +func TestRunNewVsOld(t *testing.T) { + rng := rand.New(rand.NewSource(1)) + ctx := context.Background() + const N = 100000 + + someVersionsMidSizeRowsLotsOfIntents := someVersionsMidSizeRows + someVersionsMidSizeRowsLotsOfIntents.intentFrac = 1 + for _, tc := range []randomRunGCTestSpec{ + { + ds: someVersionsMidSizeRowsLotsOfIntents, + now: hlc.Timestamp{ + WallTime: (IntentAgeThreshold + 100*time.Second).Nanoseconds(), + }, + ttl: int32(IntentAgeThreshold.Seconds()), + }, + { + ds: someVersionsMidSizeRows, + now: hlc.Timestamp{ + WallTime: 100 * time.Second.Nanoseconds(), + }, + ttl: 1, + }, + } { + t.Run(fmt.Sprintf("%v@%v,ttl=%v", tc.ds, tc.now, tc.ttl), func(t *testing.T) { + eng := engine.NewDefaultInMem() + tc.ds.dist(N, rng).setupTest(t, eng, *tc.ds.desc()) + snap := eng.NewSnapshot() + + oldGCer := makeFakeGCer() + gcInfoOld, err := runGCOld(ctx, tc.ds.desc(), snap, tc.now, + zonepb.GCPolicy{TTLSeconds: tc.ttl}, + &oldGCer, + oldGCer.resolveIntents, + oldGCer.resolveIntentsAsync) + require.NoError(t, err) + + newGCer := makeFakeGCer() + gcInfoNew, err := Run(ctx, tc.ds.desc(), snap, tc.now, + zonepb.GCPolicy{TTLSeconds: tc.ttl}, + &newGCer, + newGCer.resolveIntents, + newGCer.resolveIntentsAsync) + require.NoError(t, err) + + oldGCer.normalize() + newGCer.normalize() + require.EqualValues(t, gcInfoOld, gcInfoNew) + require.EqualValues(t, oldGCer, newGCer) + }) + } +} + +// BenchmarkRun benchmarks the old and implementations of Run with different +// data distributions. +func BenchmarkRun(b *testing.B) { + rng := rand.New(rand.NewSource(1)) + ctx := context.Background() + runGC := func(eng engine.Engine, old bool, spec randomRunGCTestSpec) (Info, error) { + runGCFunc := Run + if old { + runGCFunc = runGCOld + } + snap := eng.NewSnapshot() + return runGCFunc(ctx, spec.ds.desc(), snap, spec.now, + zonepb.GCPolicy{TTLSeconds: spec.ttl}, + NoopGCer{}, + func(ctx context.Context, intents []roachpb.Intent) error { + return nil + }, + func(ctx context.Context, txn *roachpb.Transaction, intents []roachpb.Intent) error { + return nil + }) + } + makeTest := func(old bool, spec randomRunGCTestSpec) func(b *testing.B) { + return func(b *testing.B) { + eng := engine.NewDefaultInMem() + defer eng.Close() + ms := spec.ds.dist(b.N, rng).setupTest(b, eng, *spec.ds.desc()) + b.SetBytes(int64(float64(ms.Total()) / float64(b.N))) + b.ResetTimer() + _, err := runGC(eng, old, spec) + b.StopTimer() + require.NoError(b, err) + } + } + specsWithTTLs := func( + ds distSpec, now hlc.Timestamp, ttls []int32, + ) (specs []randomRunGCTestSpec) { + for _, ttl := range ttls { + specs = append(specs, randomRunGCTestSpec{ + ds: ds, + now: now, + ttl: ttl, + }) + } + return specs + } + ts100 := hlc.Timestamp{WallTime: (100 * time.Second).Nanoseconds()} + ttls := []int32{0, 25, 50, 75, 100} + specs := specsWithTTLs(fewVersionsTinyRows, ts100, ttls) + specs = append(specs, specsWithTTLs(someVersionsMidSizeRows, ts100, ttls)...) + specs = append(specs, specsWithTTLs(lotsOfVersionsMidSizeRows, ts100, ttls)...) + for _, old := range []bool{true, false} { + b.Run(fmt.Sprintf("old=%v", old), func(b *testing.B) { + for _, spec := range specs { + b.Run(fmt.Sprint(spec.ds), makeTest(old, spec)) + } + }) + } +} + +type fakeGCer struct { + gcKeys map[string]roachpb.GCRequest_GCKey + threshold Threshold + intents []roachpb.Intent + txnIntents []txnIntents +} + +func makeFakeGCer() fakeGCer { + return fakeGCer{ + gcKeys: make(map[string]roachpb.GCRequest_GCKey), + } +} + +var _ GCer = (*fakeGCer)(nil) + +func (f *fakeGCer) SetGCThreshold(ctx context.Context, t Threshold) error { + f.threshold = t + return nil +} + +func (f *fakeGCer) GC(ctx context.Context, keys []roachpb.GCRequest_GCKey) error { + for _, k := range keys { + f.gcKeys[k.Key.String()] = k + } + return nil +} + +func (f *fakeGCer) resolveIntentsAsync( + _ context.Context, txn *roachpb.Transaction, intents []roachpb.Intent, +) error { + f.txnIntents = append(f.txnIntents, txnIntents{txn: txn, intents: intents}) + return nil +} + +func (f *fakeGCer) resolveIntents(_ context.Context, intents []roachpb.Intent) error { + f.intents = append(f.intents, intents...) + return nil +} + +func (f *fakeGCer) normalize() { + sortIntents := func(i, j int) bool { + return intentLess(&f.intents[i], &f.intents[j]) + } + sort.Slice(f.intents, sortIntents) + for i := range f.txnIntents { + sort.Slice(f.txnIntents[i].intents, sortIntents) + } + sort.Slice(f.txnIntents, func(i, j int) bool { + return f.txnIntents[i].txn.ID.String() < f.txnIntents[j].txn.ID.String() + }) +} + +func intentLess(a, b *roachpb.Intent) bool { + cmp := a.Key.Compare(b.Key) + switch { + case cmp < 0: + return true + case cmp > 0: + return false + default: + return a.Txn.ID.String() < b.Txn.ID.String() + } +} + +type txnIntents struct { + txn *roachpb.Transaction + intents []roachpb.Intent +} diff --git a/pkg/storage/gc_queue.go b/pkg/storage/gc_queue.go index e2374a653c58..3c02a63510b4 100644 --- a/pkg/storage/gc_queue.go +++ b/pkg/storage/gc_queue.go @@ -18,24 +18,15 @@ import ( "sync/atomic" "time" - "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/config" - "github.com/cockroachdb/cockroach/pkg/config/zonepb" "github.com/cockroachdb/cockroach/pkg/gossip" - "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/storage/abortspan" - "github.com/cockroachdb/cockroach/pkg/storage/engine" "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" - "github.com/cockroachdb/cockroach/pkg/storage/rditer" - "github.com/cockroachdb/cockroach/pkg/storage/storagebase" + "github.com/cockroachdb/cockroach/pkg/storage/gc" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/stop" - "github.com/cockroachdb/cockroach/pkg/util/syncutil" - "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/pkg/errors" ) @@ -45,18 +36,11 @@ const ( // intentAgeNormalization is the average age of outstanding intents // which amount to a score of "1" added to total replica priority. intentAgeNormalization = 24 * time.Hour // 1 day - // intentAgeThreshold is the threshold after which an extant intent - // will be resolved. - intentAgeThreshold = 2 * time.Hour // 2 hour // Thresholds used to decide whether to queue for GC based // on keys and intents. gcKeyScoreThreshold = 2 gcIntentScoreThreshold = 10 - - // gcKeyVersionChunkBytes is the threshold size for splitting - // GCRequests into multiple batches. - gcKeyVersionChunkBytes = base.ChunkRaftCommandThresholdBytes ) // gcQueue manages a queue of replicas slated to be scanned in their @@ -95,17 +79,6 @@ func newGCQueue(store *Store, gossip *gossip.Gossip) *gcQueue { return gcq } -// A cleanupIntentsFunc synchronously resolves the supplied intents -// (which may be PENDING, in which case they are first pushed) while -// taking care of proper batching. -type cleanupIntentsFunc func(context.Context, []roachpb.Intent) error - -// A cleanupTxnIntentsFunc asynchronously cleans up intents from a -// transaction record, pushing the transaction first if it is -// PENDING. Once all intents are resolved successfully, removes the -// transaction record. -type cleanupTxnIntentsAsyncFunc func(context.Context, *roachpb.Transaction, []roachpb.Intent) error - // gcQueueScore holds details about the score returned by makeGCQueueScoreImpl for // testing and logging. The fields in this struct are documented in // makeGCQueueScoreImpl. @@ -340,154 +313,12 @@ func makeGCQueueScoreImpl( return r } -// processLocalKeyRange scans the local range key entries, consisting of -// transaction records, queue last processed timestamps, and range descriptors. -// -// - Transaction entries: -// - For expired transactions , schedule the intents for -// asynchronous resolution. The actual transaction spans are not -// returned for GC in this pass, but are separately GC'ed after -// successful resolution of all intents. The exception is if there -// are no intents on the txn record, in which case it's returned for -// immediate GC. -// -// - Queue last processed times: cleanup any entries which don't match -// this range's start key. This can happen on range merges. -func processLocalKeyRange( - ctx context.Context, - snap engine.Reader, - desc *roachpb.RangeDescriptor, - cutoff hlc.Timestamp, - infoMu *lockableGCInfo, - cleanupTxnIntentsAsyncFn cleanupTxnIntentsAsyncFunc, -) ([]roachpb.GCRequest_GCKey, error) { - infoMu.Lock() - defer infoMu.Unlock() - - var gcKeys []roachpb.GCRequest_GCKey - - handleTxnIntents := func(key roachpb.Key, txn *roachpb.Transaction) error { - // If the transaction needs to be pushed or there are intents to - // resolve, invoke the cleanup function. - if !txn.Status.IsFinalized() || len(txn.IntentSpans) > 0 { - return cleanupTxnIntentsAsyncFn(ctx, txn, roachpb.AsIntents(txn.IntentSpans, txn)) - } - gcKeys = append(gcKeys, roachpb.GCRequest_GCKey{Key: key}) // zero timestamp - return nil - } - - handleOneTransaction := func(kv roachpb.KeyValue) error { - var txn roachpb.Transaction - if err := kv.Value.GetProto(&txn); err != nil { - return err - } - infoMu.TransactionSpanTotal++ - if cutoff.LessEq(txn.LastActive()) { - return nil - } - - // The transaction record should be considered for removal. - switch txn.Status { - case roachpb.PENDING: - infoMu.TransactionSpanGCPending++ - case roachpb.STAGING: - infoMu.TransactionSpanGCStaging++ - case roachpb.ABORTED: - infoMu.TransactionSpanGCAborted++ - case roachpb.COMMITTED: - infoMu.TransactionSpanGCCommitted++ - default: - panic(fmt.Sprintf("invalid transaction state: %s", txn)) - } - return handleTxnIntents(kv.Key, &txn) - } - - handleOneQueueLastProcessed := func(kv roachpb.KeyValue, rangeKey roachpb.RKey) error { - if !rangeKey.Equal(desc.StartKey) { - // Garbage collect the last processed timestamp if it doesn't match start key. - gcKeys = append(gcKeys, roachpb.GCRequest_GCKey{Key: kv.Key}) // zero timestamp - } - return nil - } - - handleOne := func(kv roachpb.KeyValue) error { - rangeKey, suffix, _, err := keys.DecodeRangeKey(kv.Key) - if err != nil { - return err - } - if suffix.Equal(keys.LocalTransactionSuffix.AsRawKey()) { - if err := handleOneTransaction(kv); err != nil { - return err - } - } else if suffix.Equal(keys.LocalQueueLastProcessedSuffix.AsRawKey()) { - if err := handleOneQueueLastProcessed(kv, roachpb.RKey(rangeKey)); err != nil { - return err - } - } - return nil - } - - startKey := keys.MakeRangeKeyPrefix(desc.StartKey) - endKey := keys.MakeRangeKeyPrefix(desc.EndKey) - - _, err := engine.MVCCIterate(ctx, snap, startKey, endKey, hlc.Timestamp{}, engine.MVCCScanOptions{}, - func(kv roachpb.KeyValue) (bool, error) { - return false, handleOne(kv) - }) - return gcKeys, err -} - -// processAbortSpan iterates through the local AbortSpan entries -// and collects entries which indicate that a client which was running -// this transaction must have realized that it has been aborted (due to -// heartbeating having failed). The parameter minAge is typically a -// multiple of the heartbeat timeout used by the coordinator. -// -// TODO(tschottdorf): this could be done in Replica.GC itself, but it's -// handy to have it here for stats (though less performant due to sending -// all of the keys over the wire). -func processAbortSpan( - ctx context.Context, - snap engine.Reader, - rangeID roachpb.RangeID, - threshold hlc.Timestamp, - infoMu *lockableGCInfo, -) []roachpb.GCRequest_GCKey { - var gcKeys []roachpb.GCRequest_GCKey - abortSpan := abortspan.New(rangeID) - infoMu.Lock() - defer infoMu.Unlock() - if err := abortSpan.Iterate(ctx, snap, func(key roachpb.Key, v roachpb.AbortSpanEntry) error { - infoMu.AbortSpanTotal++ - if v.Timestamp.Less(threshold) { - infoMu.AbortSpanGCNum++ - gcKeys = append(gcKeys, roachpb.GCRequest_GCKey{Key: key}) - } - return nil - }); err != nil { - // Still return whatever we managed to collect. - log.Warning(ctx, err) - } - return gcKeys -} - -// NoopGCer implements GCer by doing nothing. -type NoopGCer struct{} - -var _ GCer = NoopGCer{} - -// SetGCThreshold implements storage.GCer. -func (NoopGCer) SetGCThreshold(context.Context, GCThreshold) error { return nil } - -// GC implements storage.GCer. -func (NoopGCer) GC(context.Context, []roachpb.GCRequest_GCKey) error { return nil } - type replicaGCer struct { repl *Replica count int32 // update atomically } -var _ GCer = &replicaGCer{} +var _ gc.GCer = &replicaGCer{} func (r *replicaGCer) template() roachpb.GCRequest { desc := r.repl.Desc() @@ -516,7 +347,7 @@ func (r *replicaGCer) send(ctx context.Context, req roachpb.GCRequest) error { return nil } -func (r *replicaGCer) SetGCThreshold(ctx context.Context, thresh GCThreshold) error { +func (r *replicaGCer) SetGCThreshold(ctx context.Context, thresh gc.Threshold) error { req := r.template() req.Threshold = thresh.Key return r.send(ctx, req) @@ -575,14 +406,14 @@ func (gcq *gcQueue) process(ctx context.Context, repl *Replica, sysCfg *config.S // Synchronize the new GC threshold decision with concurrent // AdminVerifyProtectedTimestamp requests. if err := repl.markPendingGC(gcTimestamp, - engine.MakeGarbageCollector(gcTimestamp, *zone.GC).Threshold); err != nil { + gc.CalculateThreshold(gcTimestamp, *zone.GC)); err != nil { log.VEventf(ctx, 1, "not gc'ing replica %v due to pending protection: %v", repl, err) return nil } snap := repl.store.Engine().NewSnapshot() defer snap.Close() - info, err := RunGC(ctx, desc, snap, gcTimestamp, *zone.GC, &replicaGCer{repl: repl}, + info, err := gc.Run(ctx, desc, snap, gcTimestamp, *zone.GC, &replicaGCer{repl: repl}, func(ctx context.Context, intents []roachpb.Intent) error { intentCount, err := repl.store.intentResolver. CleanupIntents(ctx, intents, gcTimestamp, roachpb.PUSH_ABORT) @@ -614,54 +445,11 @@ func (gcq *gcQueue) process(ctx context.Context, repl *Replica, sysCfg *config.S log.Eventf(ctx, "MVCC stats after GC: %+v", repl.GetMVCCStats()) log.Eventf(ctx, "GC score after GC: %s", makeGCQueueScore(ctx, repl, repl.store.Clock().Now(), sysCfg)) - info.updateMetrics(gcq.store.metrics) - + updateStoreMetricsWithGCInfo(gcq.store.metrics, info) return nil } -// GCInfo contains statistics and insights from a GC run. -type GCInfo struct { - // Now is the timestamp used for age computations. - Now hlc.Timestamp - // Policy is the policy used for this garbage collection cycle. - Policy zonepb.GCPolicy - // Stats about the userspace key-values considered, namely the number of - // keys with GC'able data, the number of "old" intents and the number of - // associated distinct transactions. - NumKeysAffected, IntentsConsidered, IntentTxns int - // TransactionSpanTotal is the total number of entries in the transaction span. - TransactionSpanTotal int - // Summary of transactions which were found GCable (assuming that - // potentially necessary intent resolutions did not fail). - TransactionSpanGCAborted, TransactionSpanGCCommitted int - TransactionSpanGCStaging, TransactionSpanGCPending int - // AbortSpanTotal is the total number of transactions present in the AbortSpan. - AbortSpanTotal int - // AbortSpanConsidered is the number of AbortSpan entries old enough to be - // considered for removal. An "entry" corresponds to one transaction; - // more than one key-value pair may be associated with it. - AbortSpanConsidered int - // AbortSpanGCNum is the number of AbortSpan entries fit for removal (due - // to their transactions having terminated). - AbortSpanGCNum int - // PushTxn is the total number of pushes attempted in this cycle. - PushTxn int - // ResolveTotal is the total number of attempted intent resolutions in - // this cycle. - ResolveTotal int - // Threshold is the computed expiration timestamp. Equal to `Now - Policy`. - Threshold hlc.Timestamp - // AffectedVersionsKeyBytes is the number of (fully encoded) bytes deleted from keys in the storage engine. - // Note that this does not account for compression that the storage engine uses to store data on disk. Real - // space savings tends to be smaller due to this compression, and space may be released only at a later point - // in time. - AffectedVersionsKeyBytes int64 - // AffectedVersionsValBytes is the number of (fully encoded) bytes deleted from values in the storage engine. - // See AffectedVersionsKeyBytes for caveats. - AffectedVersionsValBytes int64 -} - -func (info *GCInfo) updateMetrics(metrics *StoreMetrics) { +func updateStoreMetricsWithGCInfo(metrics *StoreMetrics, info gc.Info) { metrics.GCNumKeysAffected.Inc(int64(info.NumKeysAffected)) metrics.GCIntentsConsidered.Inc(int64(info.IntentsConsidered)) metrics.GCIntentTxns.Inc(int64(info.IntentTxns)) @@ -677,241 +465,6 @@ func (info *GCInfo) updateMetrics(metrics *StoreMetrics) { metrics.GCResolveTotal.Inc(int64(info.ResolveTotal)) } -type lockableGCInfo struct { - syncutil.Mutex - GCInfo -} - -// GCThreshold holds the key and txn span GC thresholds, respectively. -type GCThreshold struct { - Key hlc.Timestamp - Txn hlc.Timestamp -} - -// A GCer is an abstraction used by the GC queue to carry out chunked deletions. -type GCer interface { - SetGCThreshold(context.Context, GCThreshold) error - GC(context.Context, []roachpb.GCRequest_GCKey) error -} - -// RunGC runs garbage collection for the specified descriptor on the -// provided Engine (which is not mutated). It uses the provided gcFn -// to run garbage collection once on all implicated spans, -// cleanupIntentsFn to resolve intents synchronously, and -// cleanupTxnIntentsAsyncFn to asynchronously cleanup intents and -// associated transaction record on success. -func RunGC( - ctx context.Context, - desc *roachpb.RangeDescriptor, - snap engine.Reader, - now hlc.Timestamp, - policy zonepb.GCPolicy, - gcer GCer, - cleanupIntentsFn cleanupIntentsFunc, - cleanupTxnIntentsAsyncFn cleanupTxnIntentsAsyncFunc, -) (GCInfo, error) { - - iter := rditer.NewReplicaDataIterator(desc, snap, true /* replicatedOnly */) - defer iter.Close() - - var infoMu = lockableGCInfo{} - infoMu.Policy = policy - infoMu.Now = now - - // Compute intent expiration (intent age at which we attempt to resolve). - intentExp := now.Add(-intentAgeThreshold.Nanoseconds(), 0) - txnExp := now.Add(-storagebase.TxnCleanupThreshold.Nanoseconds(), 0) - - gc := engine.MakeGarbageCollector(now, policy) - infoMu.Threshold = gc.Threshold - - if err := gcer.SetGCThreshold(ctx, GCThreshold{ - Key: gc.Threshold, - Txn: txnExp, - }); err != nil { - return GCInfo{}, errors.Wrap(err, "failed to set GC thresholds") - } - - var batchGCKeys []roachpb.GCRequest_GCKey - var batchGCKeysBytes int64 - var expBaseKey roachpb.Key - var keys []engine.MVCCKey - var vals [][]byte - var keyBytes int64 - var valBytes int64 - - // Maps from txn ID to txn and intent key slice. - txnMap := map[uuid.UUID]*roachpb.Transaction{} - intentSpanMap := map[uuid.UUID][]roachpb.Span{} - - // processKeysAndValues is invoked with each key and its set of - // values. Intents older than the intent age threshold are sent for - // resolution and values after the MVCC metadata, and possible - // intent, are sent for garbage collection. - processKeysAndValues := func() { - // If there's more than a single value for the key, possibly send for GC. - if len(keys) > 1 { - meta := &enginepb.MVCCMetadata{} - if err := protoutil.Unmarshal(vals[0], meta); err != nil { - log.Errorf(ctx, "unable to unmarshal MVCC metadata for key %q: %+v", keys[0], err) - } else { - // In the event that there's an active intent, send for - // intent resolution if older than the threshold. - startIdx := 1 - if meta.Txn != nil { - // Keep track of intent to resolve if older than the intent - // expiration threshold. - if hlc.Timestamp(meta.Timestamp).Less(intentExp) { - txnID := meta.Txn.ID - if _, ok := txnMap[txnID]; !ok { - txnMap[txnID] = &roachpb.Transaction{ - TxnMeta: *meta.Txn, - } - // IntentTxns and PushTxn will be equal here, since - // pushes to transactions whose record lies in this - // range (but which are not associated to a remaining - // intent on it) happen asynchronously and are accounted - // for separately. Thus higher up in the stack, we - // expect PushTxn > IntentTxns. - infoMu.IntentTxns++ - // All transactions in txnMap may be PENDING and - // cleanupIntentsFn will push them to finalize them. - infoMu.PushTxn++ - } - infoMu.IntentsConsidered++ - intentSpanMap[txnID] = append(intentSpanMap[txnID], roachpb.Span{Key: expBaseKey}) - } - // With an active intent, GC ignores MVCC metadata & intent value. - startIdx = 2 - } - // See if any values may be GC'd. - if idx, gcTS := gc.Filter(keys[startIdx:], vals[startIdx:]); gcTS != (hlc.Timestamp{}) { - // Batch keys after the total size of version keys exceeds - // the threshold limit. This avoids sending potentially large - // GC requests through Raft. Iterate through the keys in reverse - // order so that GC requests can be made multiple times even on - // a single key, with successively newer timestamps to prevent - // any single request from exploding during GC evaluation. - for i := len(keys) - 1; i >= startIdx+idx; i-- { - keyBytes = int64(keys[i].EncodedSize()) - valBytes = int64(len(vals[i])) - - // Add the total size of the GC'able versions of the keys and values to GCInfo. - infoMu.GCInfo.AffectedVersionsKeyBytes += keyBytes - infoMu.GCInfo.AffectedVersionsValBytes += valBytes - - batchGCKeysBytes += keyBytes - // If the current key brings the batch over the target - // size, add the current timestamp to finish the current - // chunk and start a new one. - if batchGCKeysBytes >= gcKeyVersionChunkBytes { - batchGCKeys = append(batchGCKeys, roachpb.GCRequest_GCKey{Key: expBaseKey, Timestamp: keys[i].Timestamp}) - - err := gcer.GC(ctx, batchGCKeys) - - // Succeed or fail, allow releasing the memory backing batchGCKeys. - iter.ResetAllocator() - batchGCKeys = nil - batchGCKeysBytes = 0 - - if err != nil { - // Even though we are batching the GC process, it's - // safe to continue because we bumped the GC - // thresholds. We may leave some inconsistent history - // behind, but nobody can read it. - log.Warning(ctx, err) - return - } - } - } - // Add the key to the batch at the GC timestamp, unless it was already added. - if batchGCKeysBytes != 0 { - batchGCKeys = append(batchGCKeys, roachpb.GCRequest_GCKey{Key: expBaseKey, Timestamp: gcTS}) - } - infoMu.NumKeysAffected++ - } - } - } - } - - // Iterate through the keys and values of this replica's range. - log.Event(ctx, "iterating through range") - for ; ; iter.Next() { - if ok, err := iter.Valid(); err != nil { - return GCInfo{}, err - } else if !ok { - break - } else if ctx.Err() != nil { - // Stop iterating if our context has expired. - return GCInfo{}, err - } - iterKey := iter.Key() - if !iterKey.IsValue() || !iterKey.Key.Equal(expBaseKey) { - // Moving to the next key (& values). - processKeysAndValues() - expBaseKey = iterKey.Key - if !iterKey.IsValue() { - keys = []engine.MVCCKey{iter.Key()} - vals = [][]byte{iter.Value()} - continue - } - // An implicit metadata. - keys = []engine.MVCCKey{engine.MakeMVCCMetadataKey(iterKey.Key)} - // A nil value for the encoded MVCCMetadata. This will unmarshal to an - // empty MVCCMetadata which is sufficient for processKeysAndValues to - // determine that there is no intent. - vals = [][]byte{nil} - } - keys = append(keys, iter.Key()) - vals = append(vals, iter.Value()) - } - // Handle last collected set of keys/vals. - processKeysAndValues() - if len(batchGCKeys) > 0 { - if err := gcer.GC(ctx, batchGCKeys); err != nil { - return GCInfo{}, err - } - } - - // From now on, all newly added keys are range-local. - - // Process local range key entries (txn records, queue last processed times). - localRangeKeys, err := processLocalKeyRange(ctx, snap, desc, txnExp, &infoMu, cleanupTxnIntentsAsyncFn) - if err != nil { - return GCInfo{}, err - } - - if err := gcer.GC(ctx, localRangeKeys); err != nil { - return GCInfo{}, err - } - - // Clean up the AbortSpan. - log.Event(ctx, "processing AbortSpan") - abortSpanKeys := processAbortSpan(ctx, snap, desc.RangeID, txnExp, &infoMu) - if err := gcer.GC(ctx, abortSpanKeys); err != nil { - return GCInfo{}, err - } - - infoMu.Lock() - log.Eventf(ctx, "GC'ed keys; stats %+v", infoMu.GCInfo) - infoMu.Unlock() - - // Push transactions (if pending) and resolve intents. - var intents []roachpb.Intent - for txnID, txn := range txnMap { - intents = append(intents, roachpb.AsIntents(intentSpanMap[txnID], txn)...) - } - infoMu.Lock() - infoMu.ResolveTotal += len(intents) - infoMu.Unlock() - log.Eventf(ctx, "cleanup of %d intents", len(intents)) - if err := cleanupIntentsFn(ctx, intents); err != nil { - return GCInfo{}, err - } - - return infoMu.GCInfo, nil -} - // timer returns a constant duration to space out GC processing // for successive queued replicas. func (*gcQueue) timer(_ time.Duration) time.Duration { diff --git a/pkg/storage/gc_queue_test.go b/pkg/storage/gc_queue_test.go index b05156adf112..68f01d9f0fbf 100644 --- a/pkg/storage/gc_queue_test.go +++ b/pkg/storage/gc_queue_test.go @@ -24,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/engine" "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" + "github.com/cockroachdb/cockroach/pkg/storage/gc" "github.com/cockroachdb/cockroach/pkg/storage/storagebase" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -68,7 +69,7 @@ func TestGCQueueScoreString(t *testing.T) { }, `queue=true with 4.31/fuzz(1.25)=3.45=valScaleScore(4.00)*deadFrac(0.25)+intentScore(0.45) likely last GC: 5s ago, 3.0 KiB non-live, curr. age 512 KiB*s, min exp. reduction: 256 KiB*s`}, - // Check case of empty GCThreshold. + // Check case of empty Threshold. {gcQueueScore{ShouldQueue: true}, `queue=true with 0.00/fuzz(0.00)=NaN=valScaleScore(0.00)*deadFrac(0.00)+intentScore(0.00) likely last GC: never, 0 B non-live, curr. age 0 B*s, min exp. reduction: 0 B*s`}, } { @@ -373,12 +374,12 @@ func TestGCQueueProcess(t *testing.T) { tc.manualClock.Increment(48 * 60 * 60 * 1e9) // 2d past the epoch now := tc.Clock().Now().WallTime - ts1 := makeTS(now-2*24*60*60*1e9+1, 0) // 2d old (add one nanosecond so we're not using zero timestamp) - ts2 := makeTS(now-25*60*60*1e9, 0) // GC will occur at time=25 hours - ts2m1 := ts2.Prev() // ts2 - 1 so we have something not right at the GC time - ts3 := makeTS(now-intentAgeThreshold.Nanoseconds(), 0) // 2h old - ts4 := makeTS(now-(intentAgeThreshold.Nanoseconds()-1), 0) // 2h-1ns old - ts5 := makeTS(now-1e9, 0) // 1s old + ts1 := makeTS(now-2*24*60*60*1e9+1, 0) // 2d old (add one nanosecond so we're not using zero timestamp) + ts2 := makeTS(now-25*60*60*1e9, 0) // GC will occur at time=25 hours + ts2m1 := ts2.Prev() // ts2 - 1 so we have something not right at the GC time + ts3 := makeTS(now-gc.IntentAgeThreshold.Nanoseconds(), 0) // 2h old + ts4 := makeTS(now-(gc.IntentAgeThreshold.Nanoseconds()-1), 0) // 2h-1ns old + ts5 := makeTS(now-1e9, 0) // 1s old key1 := roachpb.Key("a") key2 := roachpb.Key("b") key3 := roachpb.Key("c") @@ -481,7 +482,7 @@ func TestGCQueueProcess(t *testing.T) { t.Fatal("config not set") } - // The total size of the GC'able versions of the keys and values in GCInfo. + // The total size of the GC'able versions of the keys and values in Info. // Key size: len("a") + MVCCVersionTimestampSize (13 bytes) = 14 bytes. // Value size: len("value") + headerSize (5 bytes) = 10 bytes. // key1 at ts1 (14 bytes) => "value" (10 bytes) @@ -494,8 +495,8 @@ func TestGCQueueProcess(t *testing.T) { var expectedVersionsKeyBytes int64 = 7 * 14 var expectedVersionsValBytes int64 = 5 * 10 - // Call RunGC with dummy functions to get current GCInfo. - gcInfo, err := func() (GCInfo, error) { + // Call Run with dummy functions to get current Info. + gcInfo, err := func() (gc.Info, error) { snap := tc.repl.store.Engine().NewSnapshot() desc := tc.repl.Desc() defer snap.Close() @@ -506,8 +507,8 @@ func TestGCQueueProcess(t *testing.T) { } now := tc.Clock().Now() - return RunGC(ctx, desc, snap, now, *zone.GC, - NoopGCer{}, + return gc.Run(ctx, desc, snap, now, *zone.GC, + gc.NoopGCer{}, func(ctx context.Context, intents []roachpb.Intent) error { return nil }, @@ -862,7 +863,7 @@ func TestGCQueueIntentResolution(t *testing.T) { newTransaction("txn1", roachpb.Key("0-0"), 1, tc.Clock()), newTransaction("txn2", roachpb.Key("1-0"), 1, tc.Clock()), } - intentResolveTS := makeTS(now-intentAgeThreshold.Nanoseconds(), 0) + intentResolveTS := makeTS(now-gc.IntentAgeThreshold.Nanoseconds(), 0) txns[0].ReadTimestamp = intentResolveTS txns[0].WriteTimestamp = intentResolveTS txns[1].ReadTimestamp = intentResolveTS @@ -991,12 +992,12 @@ func TestGCQueueChunkRequests(t *testing.T) { tc.StartWithStoreConfig(t, stopper, tsc) const keyCount = 100 - if gcKeyVersionChunkBytes%keyCount != 0 { + if gc.KeyVersionChunkBytes%keyCount != 0 { t.Fatalf("expected gcKeyVersionChunkBytes to be a multiple of %d", keyCount) } // Reduce the key size by MVCCVersionTimestampSize (13 bytes) to prevent batch overflow. // This is due to MVCCKey.EncodedSize(), which returns the full size of the encoded key. - const keySize = (gcKeyVersionChunkBytes / keyCount) - 13 + const keySize = (gc.KeyVersionChunkBytes / keyCount) - 13 // Create a format string for creating version keys of exactly // length keySize. fmtStr := fmt.Sprintf("%%0%dd", keySize) diff --git a/pkg/storage/rditer/replica_data_iter.go b/pkg/storage/rditer/replica_data_iter.go index f002f1482eed..3cab0bfa9a19 100644 --- a/pkg/storage/rditer/replica_data_iter.go +++ b/pkg/storage/rditer/replica_data_iter.go @@ -37,7 +37,7 @@ type KeyRange struct { type ReplicaDataIterator struct { curIndex int ranges []KeyRange - it engine.SimpleIterator + it engine.Iterator a bufalloc.ByteAllocator } @@ -112,7 +112,7 @@ func MakeUserKeyRange(d *roachpb.RangeDescriptor) KeyRange { // NewReplicaDataIterator creates a ReplicaDataIterator for the given replica. func NewReplicaDataIterator( - d *roachpb.RangeDescriptor, reader engine.Reader, replicatedOnly bool, + d *roachpb.RangeDescriptor, reader engine.Reader, replicatedOnly bool, seekEnd bool, ) *ReplicaDataIterator { it := reader.NewIterator(engine.IterOptions{UpperBound: d.EndKey.AsRawKey()}) @@ -124,9 +124,26 @@ func NewReplicaDataIterator( ranges: rangeFunc(d), it: it, } + if seekEnd { + ri.seekEnd() + } else { + ri.seekStart() + } + return ri +} + +// seekStart seeks the iterator to the start of its data range. +func (ri *ReplicaDataIterator) seekStart() { + ri.curIndex = 0 ri.it.SeekGE(ri.ranges[ri.curIndex].Start) ri.advance() - return ri +} + +// seekEnd seeks the iterator to the end of its data range. +func (ri *ReplicaDataIterator) seekEnd() { + ri.curIndex = len(ri.ranges) - 1 + ri.it.SeekLT(ri.ranges[ri.curIndex].End) + ri.retreat() } // Close the underlying iterator. @@ -158,10 +175,31 @@ func (ri *ReplicaDataIterator) advance() { } } +// Prev advances the iterator one key backwards. +func (ri *ReplicaDataIterator) Prev() { + ri.it.Prev() + ri.retreat() +} + +// retreat is the opposite of advance. +func (ri *ReplicaDataIterator) retreat() { + for { + if ok, _ := ri.Valid(); ok && ri.ranges[ri.curIndex].Start.Less(ri.it.UnsafeKey()) { + return + } + ri.curIndex-- + if ri.curIndex >= 0 { + ri.it.SeekLT(ri.ranges[ri.curIndex].End) + } else { + return + } + } +} + // Valid returns true if the iterator currently points to a valid value. func (ri *ReplicaDataIterator) Valid() (bool, error) { ok, err := ri.it.Valid() - ok = ok && ri.curIndex < len(ri.ranges) + ok = ok && ri.curIndex >= 0 && ri.curIndex < len(ri.ranges) return ok, err } @@ -179,6 +217,18 @@ func (ri *ReplicaDataIterator) Value() []byte { return value } +// UnsafeKey returns the same value as Key, but the memory is invalidated on +// the next call to {Next,Prev,Close}. +func (ri *ReplicaDataIterator) UnsafeKey() engine.MVCCKey { + return ri.it.UnsafeKey() +} + +// UnsafeValue returns the same value as Value, but the memory is invalidated on +// the next call to {Next,Prev,Close}. +func (ri *ReplicaDataIterator) UnsafeValue() []byte { + return ri.it.UnsafeValue() +} + // ResetAllocator resets the ReplicaDataIterator's internal byte allocator. func (ri *ReplicaDataIterator) ResetAllocator() { ri.a = nil diff --git a/pkg/storage/rditer/replica_data_iter_test.go b/pkg/storage/rditer/replica_data_iter_test.go index cfb32f0bf4a4..02a541263685 100644 --- a/pkg/storage/rditer/replica_data_iter_test.go +++ b/pkg/storage/rditer/replica_data_iter_test.go @@ -125,7 +125,7 @@ func verifyRDIter( expectedKeys []engine.MVCCKey, ) { t.Helper() - testutils.RunTrueAndFalse(t, "spanset", func(t *testing.T, useSpanSet bool) { + verify := func(t *testing.T, useSpanSet, reverse bool) { if useSpanSet { var spans spanset.SpanSet spans.AddNonMVCC(spanset.SpanReadOnly, roachpb.Span{ @@ -142,16 +142,22 @@ func verifyRDIter( }, hlc.Timestamp{WallTime: 42}) readWriter = spanset.NewReadWriterAt(readWriter, &spans, hlc.Timestamp{WallTime: 42}) } - iter := NewReplicaDataIterator(desc, readWriter, replicatedOnly) + iter := NewReplicaDataIterator(desc, readWriter, replicatedOnly, reverse /* seekEnd */) defer iter.Close() i := 0 - for ; ; iter.Next() { + if reverse { + i = len(expectedKeys) - 1 + } + for { if ok, err := iter.Valid(); err != nil { t.Fatal(err) } else if !ok { break } - if i >= len(expectedKeys) { + if !reverse && i >= len(expectedKeys) { + t.Fatal("there are more keys in the iteration than expected") + } + if reverse && i < 0 { t.Fatal("there are more keys in the iteration than expected") } if key := iter.Key(); !key.Equal(expectedKeys[i]) { @@ -159,11 +165,22 @@ func verifyRDIter( k2, ts2 := expectedKeys[i].Key, expectedKeys[i].Timestamp t.Errorf("%d: expected %q(%d); got %q(%d)", i, k2, ts2, k1, ts1) } - i++ + if reverse { + i-- + iter.Prev() + } else { + i++ + iter.Next() + } } - if i != len(expectedKeys) { + if (reverse && i >= 0) || (!reverse && i != len(expectedKeys)) { t.Fatal("there are fewer keys in the iteration than expected") } + } + testutils.RunTrueAndFalse(t, "reverse", func(t *testing.T, reverse bool) { + testutils.RunTrueAndFalse(t, "spanset", func(t *testing.T, useSpanSet bool) { + verify(t, useSpanSet, reverse) + }) }) } @@ -224,7 +241,8 @@ func TestReplicaDataIterator(t *testing.T) { // Verify that the replicated-only iterator ignores unreplicated keys. unreplicatedPrefix := keys.MakeRangeIDUnreplicatedPrefix(desc.RangeID) - iter := NewReplicaDataIterator(&desc, eng, true /* replicatedOnly */) + iter := NewReplicaDataIterator(&desc, eng, + true /* replicatedOnly */, false /* seekEnd */) defer iter.Close() for ; ; iter.Next() { if ok, err := iter.Valid(); err != nil { diff --git a/pkg/storage/replica_raftstorage.go b/pkg/storage/replica_raftstorage.go index 34ded0258c9d..62d4512d4329 100644 --- a/pkg/storage/replica_raftstorage.go +++ b/pkg/storage/replica_raftstorage.go @@ -544,7 +544,8 @@ func snapshot( // Intentionally let this iterator and the snapshot escape so that the // streamer can send chunks from it bit by bit. - iter := rditer.NewReplicaDataIterator(&desc, snap, true /* replicatedOnly */) + iter := rditer.NewReplicaDataIterator(&desc, snap, + true /* replicatedOnly */, false /* seekEnd */) return OutgoingSnapshot{ RaftEntryCache: eCache, diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go index 94bbd8010baf..1469430c32cb 100644 --- a/pkg/storage/replica_test.go +++ b/pkg/storage/replica_test.go @@ -6813,7 +6813,8 @@ func TestReplicaDestroy(t *testing.T) { } }() - iter := rditer.NewReplicaDataIterator(tc.repl.Desc(), tc.repl.store.Engine(), false /* replicatedOnly */) + iter := rditer.NewReplicaDataIterator(tc.repl.Desc(), tc.repl.store.Engine(), + false /* replicatedOnly */, false /* seekEnd */) defer iter.Close() if ok, err := iter.Valid(); err != nil { t.Fatal(err) diff --git a/pkg/storage/spanset/batch.go b/pkg/storage/spanset/batch.go index d6bb8510ca1b..8560cffc0eaa 100644 --- a/pkg/storage/spanset/batch.go +++ b/pkg/storage/spanset/batch.go @@ -81,13 +81,11 @@ func (i *Iterator) SeekGE(key engine.MVCCKey) { // SeekLT is part of the engine.Iterator interface. func (i *Iterator) SeekLT(key engine.MVCCKey) { - // NB: this isn't exactly right because the key provided to SeekLT is - // exclusive so requesting the first key in an allowed span should not - // be permitted, but it's close enough. + const spanKeyExclusive = true if i.spansOnly { - i.err = i.spans.CheckAllowed(SpanReadOnly, roachpb.Span{Key: key.Key}) + i.err = i.spans.checkAllowed(SpanReadOnly, roachpb.Span{Key: key.Key}, spanKeyExclusive) } else { - i.err = i.spans.CheckAllowedAt(SpanReadOnly, roachpb.Span{Key: key.Key}, i.ts) + i.err = i.spans.checkAllowedAt(SpanReadOnly, roachpb.Span{Key: key.Key}, i.ts, spanKeyExclusive) } if i.err == nil { i.invalid = false diff --git a/pkg/storage/spanset/spanset.go b/pkg/storage/spanset/spanset.go index edc223d35fe3..2b4988c1dbc8 100644 --- a/pkg/storage/spanset/spanset.go +++ b/pkg/storage/spanset/spanset.go @@ -179,6 +179,12 @@ func (s *SpanSet) AssertAllowed(access SpanAccess, span roachpb.Span) { // is also a problem if the added spans were read only and the spanset wasn't // already SortAndDedup-ed. func (s *SpanSet) CheckAllowed(access SpanAccess, span roachpb.Span) error { + return s.checkAllowed(access, span, false /* spanKeyExclusive */) +} + +// See CheckAllowed(). The reversed arguments makes the lower bound exclusive +// and the upper bound inclusive, i.e. [a,b) will be considered (a,b]. +func (s *SpanSet) checkAllowed(access SpanAccess, span roachpb.Span, reversed bool) error { scope := SpanGlobal if keys.IsLocal(span.Key) { scope = SpanLocal @@ -186,7 +192,9 @@ func (s *SpanSet) CheckAllowed(access SpanAccess, span roachpb.Span) error { for ac := access; ac < NumSpanAccess; ac++ { for _, cur := range s.spans[ac][scope] { - if cur.Contains(span) { + if cur.Contains(span) && + (!reversed || cur.EndKey != nil && !cur.Key.Equal(span.Key)) || + reversed && cur.EndKey.Equal(span.Key) { return nil } } @@ -199,6 +207,14 @@ func (s *SpanSet) CheckAllowed(access SpanAccess, span roachpb.Span) error { // at the given timestamp. func (s *SpanSet) CheckAllowedAt( access SpanAccess, span roachpb.Span, timestamp hlc.Timestamp, +) error { + return s.checkAllowedAt(access, span, timestamp, false /* inclusiveEnd */) +} + +// See CheckAllowedAt. The reversed arguments makes the lower bound exclusive +// and the upper bound inclusive, i.e. [a,b) will be considered (a,b]. +func (s *SpanSet) checkAllowedAt( + access SpanAccess, span roachpb.Span, timestamp hlc.Timestamp, reversed bool, ) error { scope := SpanGlobal if keys.IsLocal(span.Key) { @@ -207,7 +223,9 @@ func (s *SpanSet) CheckAllowedAt( for ac := access; ac < NumSpanAccess; ac++ { for _, cur := range s.spans[ac][scope] { - if cur.Contains(span) { + if (cur.Contains(span) && + (!reversed || (cur.EndKey != nil && !cur.Key.Equal(span.Key)))) || + (reversed && cur.EndKey.Equal(span.Key)) { if cur.Timestamp.IsEmpty() { // When the span is acquired as non-MVCC (i.e. with an empty // timestamp), it's equivalent to a read/write mutex where we don't diff --git a/pkg/storage/spanset/spanset_test.go b/pkg/storage/spanset/spanset_test.go index a986fd596ee5..45912d5f4cac 100644 --- a/pkg/storage/spanset/spanset_test.go +++ b/pkg/storage/spanset/spanset_test.go @@ -52,10 +52,10 @@ func TestSpanSetGetSpansScope(t *testing.T) { func TestSpanSetCheckAllowedBoundaries(t *testing.T) { defer leaktest.AfterTest(t)() - var ss SpanSet - ss.AddNonMVCC(SpanReadOnly, roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("d")}) - ss.AddNonMVCC(SpanReadOnly, roachpb.Span{Key: roachpb.Key("g")}) - ss.AddNonMVCC(SpanReadOnly, roachpb.Span{Key: roachpb.Key("k"), EndKey: roachpb.Key("q")}) + var bdGkq SpanSet + bdGkq.AddNonMVCC(SpanReadOnly, roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("d")}) + bdGkq.AddNonMVCC(SpanReadOnly, roachpb.Span{Key: roachpb.Key("g")}) + bdGkq.AddNonMVCC(SpanReadOnly, roachpb.Span{Key: roachpb.Key("k"), EndKey: roachpb.Key("q")}) allowed := []roachpb.Span{ // Exactly as declared. @@ -73,7 +73,7 @@ func TestSpanSetCheckAllowedBoundaries(t *testing.T) { {Key: roachpb.Key("l"), EndKey: roachpb.Key("m")}, } for _, span := range allowed { - if err := ss.CheckAllowed(SpanReadOnly, span); err != nil { + if err := bdGkq.CheckAllowed(SpanReadOnly, span); err != nil { t.Errorf("expected %s to be allowed, but got error: %+v", span, err) } } @@ -102,7 +102,7 @@ func TestSpanSetCheckAllowedBoundaries(t *testing.T) { {Key: roachpb.Key("k"), EndKey: roachpb.Key("q").Next()}, } for _, span := range disallowed { - if err := ss.CheckAllowed(SpanReadOnly, span); err == nil { + if err := bdGkq.CheckAllowed(SpanReadOnly, span); err == nil { t.Errorf("expected %s to be disallowed", span) } } @@ -218,6 +218,71 @@ func TestSpanSetCheckAllowedAtTimestamps(t *testing.T) { } } +func TestSpanSetCheckAllowedReversed(t *testing.T) { + defer leaktest.AfterTest(t)() + + var bdGkq SpanSet + bdGkq.AddNonMVCC(SpanReadOnly, roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("d")}) + bdGkq.AddNonMVCC(SpanReadOnly, roachpb.Span{Key: roachpb.Key("g")}) + bdGkq.AddNonMVCC(SpanReadOnly, roachpb.Span{Key: roachpb.Key("k"), EndKey: roachpb.Key("q")}) + + allowed := []roachpb.Span{ + // Exactly as declared. + {Key: roachpb.Key("d")}, + {Key: roachpb.Key("q")}, + } + for _, span := range allowed { + if err := bdGkq.checkAllowed(SpanReadOnly, span, true /* spanKeyExclusive */); err != nil { + t.Errorf("expected %s to be allowed, but got error: %+v", span, err) + } + } + + disallowed := []roachpb.Span{ + // Points outside the declared spans, and on the endpoints. + {Key: roachpb.Key("b")}, + {Key: roachpb.Key("g")}, + {Key: roachpb.Key("k")}, + } + for _, span := range disallowed { + if err := bdGkq.checkAllowed(SpanReadOnly, span, true /* spanKeyExclusive */); err == nil { + t.Errorf("expected %s to be disallowed", span) + } + } +} + +func TestSpanSetCheckAllowedAtReversed(t *testing.T) { + defer leaktest.AfterTest(t)() + + ts := hlc.Timestamp{WallTime: 42} + var bdGkq SpanSet + bdGkq.AddMVCC(SpanReadOnly, roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("d")}, ts) + bdGkq.AddMVCC(SpanReadOnly, roachpb.Span{Key: roachpb.Key("g")}, ts) + bdGkq.AddMVCC(SpanReadOnly, roachpb.Span{Key: roachpb.Key("k"), EndKey: roachpb.Key("q")}, ts) + + allowed := []roachpb.Span{ + // Exactly as declared. + {Key: roachpb.Key("d")}, + {Key: roachpb.Key("q")}, + } + for _, span := range allowed { + if err := bdGkq.checkAllowedAt(SpanReadOnly, span, ts, true /* spanKeyExclusive */); err != nil { + t.Errorf("expected %s to be allowed, but got error: %+v", span, err) + } + } + + disallowed := []roachpb.Span{ + // Points outside the declared spans, and on the endpoints. + {Key: roachpb.Key("b")}, + {Key: roachpb.Key("g")}, + {Key: roachpb.Key("k")}, + } + for _, span := range disallowed { + if err := bdGkq.checkAllowedAt(SpanReadOnly, span, ts, true /* spanKeyExclusive */); err == nil { + t.Errorf("expected %s to be disallowed", span) + } + } +} + // Test that a span declared for write access also implies read // access, but not vice-versa. func TestSpanSetWriteImpliesRead(t *testing.T) { diff --git a/pkg/storage/store_snapshot_test.go b/pkg/storage/store_snapshot_test.go index adec003cf74f..6c866db8ecc7 100644 --- a/pkg/storage/store_snapshot_test.go +++ b/pkg/storage/store_snapshot_test.go @@ -70,7 +70,8 @@ func TestSnapshotRaftLogLimit(t *testing.T) { limiter: rate.NewLimiter(1<<10, 1), newBatch: eng.NewBatch, } - iter := rditer.NewReplicaDataIterator(repl.Desc(), snap, true /* replicatedOnly */) + iter := rditer.NewReplicaDataIterator(repl.Desc(), snap, + true /* replicatedOnly */, false /* seekEnd */) defer iter.Close() outSnap := &OutgoingSnapshot{ Iter: iter,