diff --git a/pkg/cli/debug.go b/pkg/cli/debug.go index 077e2375a685..41e7ed694b7b 100644 --- a/pkg/cli/debug.go +++ b/pkg/cli/debug.go @@ -532,7 +532,7 @@ func runDebugGCCmd(cmd *cobra.Command, args []string) error { for _, desc := range descs { snap := db.NewSnapshot() defer snap.Close() - info, err := gc.RunGC( + info, err := gc.Run( context.Background(), &desc, snap, diff --git a/pkg/storage/gc/data_distribution_test.go b/pkg/storage/gc/data_distribution_test.go new file mode 100644 index 000000000000..7d9e133865e7 --- /dev/null +++ b/pkg/storage/gc/data_distribution_test.go @@ -0,0 +1,254 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package gc + +import ( + "context" + "fmt" + "math/rand" + "sort" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage/engine" + "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" + "github.com/cockroachdb/cockroach/pkg/storage/rditer" + "github.com/cockroachdb/cockroach/pkg/util/encoding" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/stretchr/testify/require" +) + +// dataDistribution is an abstraction for testing that represents a stream of +// MVCCKeyValues. The stream may indicate that a value is an intent by returning +// a non-nil transaction. If an intent is returned it must have a higher +// timestamp than any other version written for the key. +type dataDistribution func() (engine.MVCCKeyValue, *roachpb.Transaction, bool) + +// setupTest writes the data from this distribution into eng. All data should +// be a part of the range represented by desc. +func (ds dataDistribution) setupTest( + t testing.TB, eng engine.Engine, desc roachpb.RangeDescriptor, +) enginepb.MVCCStats { + ctx := context.TODO() + var maxTs hlc.Timestamp + var ms enginepb.MVCCStats + for { + kv, txn, ok := ds() + if !ok { + break + } + if txn == nil { + require.NoError(t, eng.Put(kv.Key, kv.Value)) + } else { + // TODO(ajwerner): Decide if using MVCCPut is worth it. + ts := kv.Key.Timestamp + if txn.RefreshedTimestamp == (hlc.Timestamp{}) { + txn.RefreshedTimestamp = ts + } + if txn.Timestamp == (hlc.Timestamp{}) { + txn.Timestamp = ts + } + err := engine.MVCCPut(ctx, eng, &ms, kv.Key.Key, ts, + roachpb.Value{RawBytes: kv.Value}, txn) + require.NoError(t, err) + } + if !kv.Key.Timestamp.Less(maxTs) { + maxTs = kv.Key.Timestamp + } + } + require.NoError(t, eng.Flush()) + snap := eng.NewSnapshot() + defer snap.Close() + ms, err := rditer.ComputeStatsForRange(&desc, snap, maxTs.WallTime) + require.NoError(t, err) + return ms +} + +// newDataDistribution constructs a dataDistribution from various underlying +// distributions. +func newDataDistribution( + tsDist func() hlc.Timestamp, + keyDist func() roachpb.Key, + valueDist func() []byte, + versionsPerKey func() int, + intentFrac float64, + totalKeys int, + rng *rand.Rand, +) dataDistribution { + // TODO(ajwerner): provide a mechanism to control the rate of expired intents + // or the intent age. Such a knob would likely require decoupling intents from + // other keys. + var ( + remaining = totalKeys + key roachpb.Key + seen = map[string]struct{}{} + timestamps []hlc.Timestamp + haveIntent bool + ) + return func() (engine.MVCCKeyValue, *roachpb.Transaction, bool) { + if remaining == 0 { + return engine.MVCCKeyValue{}, nil, false + } + defer func() { remaining-- }() + for len(timestamps) == 0 { + versions := versionsPerKey() + if versions == 0 { + continue + } + if versions > remaining { + versions = remaining + } + timestamps = make([]hlc.Timestamp, 0, versions) + for i := 0; i < versions; i++ { + timestamps = append(timestamps, tsDist()) + } + sort.Slice(timestamps, func(i, j int) bool { + return timestamps[i].Less(timestamps[j]) + }) + for { + key = keyDist() + sk := string(key) + if _, ok := seen[sk]; ok { + continue + } + seen[sk] = struct{}{} + break + } + haveIntent = rng.Float64() < intentFrac + } + ts := timestamps[0] + timestamps = timestamps[1:] + var txn *roachpb.Transaction + if len(timestamps) == 0 && haveIntent { + txn = &roachpb.Transaction{ + Status: roachpb.PENDING, + RefreshedTimestamp: ts, + MaxTimestamp: ts.Next().Next(), + } + txn.ID = uuid.MakeV4() + txn.Timestamp = ts + txn.Key = keyDist() + } + return engine.MVCCKeyValue{ + Key: engine.MVCCKey{Key: key, Timestamp: ts}, + Value: valueDist(), + }, txn, true + } +} + +// distSpec abstractly represents a distribution. +type distSpec interface { + dist(maxRows int, rng *rand.Rand) dataDistribution + desc() *roachpb.RangeDescriptor + String() string +} + +// uniformDistSpec is a distSpec which represents uniform distributions over its +// various dimensions. +type uniformDistSpec struct { + tsFrom, tsTo int64 // seconds + keySuffixMin, keySuffixMax int + valueLenMin, valueLenMax int + deleteFrac float64 + keysPerValueMin, keysPerValueMax int + intentFrac float64 +} + +var _ distSpec = uniformDistSpec{} + +func (ds uniformDistSpec) dist(maxRows int, rng *rand.Rand) dataDistribution { + return newDataDistribution( + uniformTimestampDistribution(ds.tsFrom*time.Second.Nanoseconds(), ds.tsTo*time.Second.Nanoseconds(), rng), + uniformTableKeyDistribution(ds.desc().StartKey.AsRawKey(), ds.keySuffixMin, ds.keySuffixMax, rng), + uniformValueDistribution(ds.valueLenMin, ds.valueLenMax, ds.deleteFrac, rng), + uniformValuesPerKey(ds.keysPerValueMin, ds.keysPerValueMax, rng), + ds.intentFrac, + maxRows, + rng, + ) +} + +func (ds uniformDistSpec) desc() *roachpb.RangeDescriptor { + tablePrefix := keys.MakeTablePrefix(42) + return &roachpb.RangeDescriptor{ + StartKey: tablePrefix, + EndKey: roachpb.RKey(roachpb.Key(tablePrefix).PrefixEnd()), + } +} + +func (ds uniformDistSpec) String() string { + return fmt.Sprintf( + "ts=[%d,%d],"+ + "keySuffix=[%d,%d],"+ + "valueLen=[%d,%d],"+ + "keysPerValue=[%d,%d],"+ + "deleteFrac=%f,intentFrac=%f", + ds.tsFrom, ds.tsTo, + ds.keySuffixMin, ds.keySuffixMax, + ds.valueLenMin, ds.valueLenMax, + ds.keysPerValueMin, ds.keysPerValueMax, + ds.deleteFrac, ds.intentFrac) +} + +// uniformTimestamp returns an hlc timestamp distribution with a wall time +// uniform over [from, to] and a zero logical timestamp. +func uniformTimestampDistribution(from, to int64, rng *rand.Rand) func() hlc.Timestamp { + if from >= to { + panic(fmt.Errorf("from (%d) >= to (%d)", from, to)) + } + n := int(to-from) + 1 + return func() hlc.Timestamp { + return hlc.Timestamp{WallTime: from + int64(rng.Intn(n))} + } +} + +// returns a uniform length random value distribution. +func uniformValueDistribution(min, max int, deleteFrac float64, rng *rand.Rand) func() []byte { + if min > max { + panic(fmt.Errorf("min (%d) > max (%d)", min, max)) + } + n := (max - min) + 1 + return func() []byte { + if rng.Float64() < deleteFrac { + return nil + } + value := make([]byte, min+rng.Intn(n)) + if _, err := rng.Read(value); err != nil { + panic(err) + } + return value + } +} + +func uniformValuesPerKey(valuesPerKeyMin, valuesPerKeyMax int, rng *rand.Rand) func() int { + if valuesPerKeyMin > valuesPerKeyMax { + panic(fmt.Errorf("min (%d) > max (%d)", valuesPerKeyMin, valuesPerKeyMax)) + } + n := (valuesPerKeyMax - valuesPerKeyMin) + 1 + return func() int { return valuesPerKeyMin + rng.Intn(n) } +} + +func uniformTableKeyDistribution( + prefix roachpb.Key, suffixMin, suffixMax int, rng *rand.Rand, +) func() roachpb.Key { + if suffixMin > suffixMax { + panic(fmt.Errorf("suffixMin (%d) > suffixMax (%d)", suffixMin, suffixMax)) + } + n := (suffixMax - suffixMin) + 1 + return func() roachpb.Key { + randData := make([]byte, suffixMin+rng.Intn(n)) + _, _ = rng.Read(randData) + return encoding.EncodeBytesAscending(prefix[0:len(prefix):len(prefix)], randData) + } +} diff --git a/pkg/storage/gc/gc.go b/pkg/storage/gc/gc.go index a397b0d89fa9..68205a0788bf 100644 --- a/pkg/storage/gc/gc.go +++ b/pkg/storage/gc/gc.go @@ -1,4 +1,4 @@ -// Copyright 2014 The Cockroach Authors. +// Copyright 2020 The Cockroach Authors. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt. @@ -8,79 +8,491 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. +// Package gc contains the logic to run scan a range for garbage and issue +// GC requests to remove that garbage. +// +// The Run function is the primary entry point and is called underneath the +// gcQueue in the storage package. It can also be run for debugging. package gc import ( - "sort" + "context" + "fmt" + "time" + "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/config" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage/abortspan" "github.com/cockroachdb/cockroach/pkg/storage/engine" + "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" + "github.com/cockroachdb/cockroach/pkg/storage/storagebase" + "github.com/cockroachdb/cockroach/pkg/util/bufalloc" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/cockroachdb/errors" +) + +const ( + // IntentAgeThreshold is the threshold after which an extant intent + // will be resolved. + IntentAgeThreshold = 2 * time.Hour // 2 hour + + // KeyVersionChunkBytes is the threshold size for splitting + // GCRequests into multiple batches. + KeyVersionChunkBytes = base.ChunkRaftCommandThresholdBytes ) -// GarbageCollector GCs MVCC key/values using a zone-specific GC -// policy allows either the union or intersection of maximum # of -// versions and maximum age. -type GarbageCollector struct { +// CalculateThreshold calculates the GC threshold given the policy and the +// current view of time. +func CalculateThreshold(now hlc.Timestamp, policy config.GCPolicy) (threshold hlc.Timestamp) { + ttlNanos := int64(policy.TTLSeconds) * time.Second.Nanoseconds() + return hlc.Timestamp{WallTime: now.WallTime - ttlNanos} +} + +// A GCer is an abstraction used by the GC queue to carry out chunked deletions. +type GCer interface { + SetGCThreshold(context.Context, Threshold) error + GC(context.Context, []roachpb.GCRequest_GCKey) error +} + +// NoopGCer implements GCer by doing nothing. +type NoopGCer struct{} + +var _ GCer = NoopGCer{} + +// SetGCThreshold implements storage.GCer. +func (NoopGCer) SetGCThreshold(context.Context, Threshold) error { return nil } + +// GC implements storage.GCer. +func (NoopGCer) GC(context.Context, []roachpb.GCRequest_GCKey) error { return nil } + +// Threshold holds the key and txn span GC thresholds, respectively. +type Threshold struct { + Key hlc.Timestamp + Txn hlc.Timestamp +} + +// Info contains statistics and insights from a GC run. +type Info struct { + // Now is the timestamp used for age computations. + Now hlc.Timestamp + // Policy is the policy used for this garbage collection cycle. + Policy config.GCPolicy + // Stats about the userspace key-values considered, namely the number of + // keys with GC'able data, the number of "old" intents and the number of + // associated distinct transactions. + NumKeysAffected, IntentsConsidered, IntentTxns int + // TransactionSpanTotal is the total number of entries in the transaction span. + TransactionSpanTotal int + // Summary of transactions which were found GCable (assuming that + // potentially necessary intent resolutions did not fail). + TransactionSpanGCAborted, TransactionSpanGCCommitted int + TransactionSpanGCStaging, TransactionSpanGCPending int + // AbortSpanTotal is the total number of transactions present in the AbortSpan. + AbortSpanTotal int + // AbortSpanConsidered is the number of AbortSpan entries old enough to be + // considered for removal. An "entry" corresponds to one transaction; + // more than one key-value pair may be associated with it. + AbortSpanConsidered int + // AbortSpanGCNum is the number of AbortSpan entries fit for removal (due + // to their transactions having terminated). + AbortSpanGCNum int + // PushTxn is the total number of pushes attempted in this cycle. + PushTxn int + // ResolveTotal is the total number of attempted intent resolutions in + // this cycle. + ResolveTotal int + // Threshold is the computed expiration timestamp. Equal to `Now - Policy`. Threshold hlc.Timestamp - policy config.GCPolicy + + // AffectedVersionsKeyBytes is the number of (fully encoded) bytes deleted from keys in the storage engine. + // Note that this does not account for compression that the storage engine uses to store data on disk. Real + // space savings tends to be smaller due to this compression, and space may be released only at a later point + // in time. + AffectedVersionsKeyBytes int64 + // AffectedVersionsValBytes is the number of (fully encoded) bytes deleted from values in the storage engine. + // See AffectedVersionsKeyBytes for caveats. + AffectedVersionsValBytes int64 +} + +// CleanupIntentsFunc synchronously resolves the supplied intents +// (which may be PENDING, in which case they are first pushed) while +// taking care of proper batching. +type CleanupIntentsFunc func(context.Context, []roachpb.Intent) error + +// CleanupTxnIntentsAsyncFunc asynchronously cleans up intents from a +// transaction record, pushing the transaction first if it is +// PENDING. Once all intents are resolved successfully, removes the +// transaction record. +type CleanupTxnIntentsAsyncFunc func(context.Context, *roachpb.Transaction, []roachpb.Intent) error + +// Run runs garbage collection for the specified descriptor on the +// provided Engine (which is not mutated). It uses the provided gcFn +// to run garbage collection once on all implicated spans, +// cleanupIntentsFn to resolve intents synchronously, and +// cleanupTxnIntentsAsyncFn to asynchronously cleanup intents and +// associated transaction record on success. +func Run( + ctx context.Context, + desc *roachpb.RangeDescriptor, + snap engine.Reader, + now hlc.Timestamp, + policy config.GCPolicy, + gcer GCer, + cleanupIntentsFn CleanupIntentsFunc, + cleanupTxnIntentsAsyncFn CleanupTxnIntentsAsyncFunc, +) (Info, error) { + + txnExp := now.Add(-storagebase.TxnCleanupThreshold.Nanoseconds(), 0) + threshold := CalculateThreshold(now, policy) + + if err := gcer.SetGCThreshold(ctx, Threshold{ + Key: threshold, + Txn: txnExp, + }); err != nil { + return Info{}, errors.Wrap(err, "failed to set GC thresholds") + } + + info := Info{ + Policy: policy, + Now: now, + Threshold: threshold, + } + + // Maps from txn ID to txn and intent key slice. + txnMap := map[uuid.UUID]*roachpb.Transaction{} + intentSpanMap := map[uuid.UUID][]roachpb.Span{} + err := processReplicatedKeyRange(ctx, desc, snap, now, threshold, gcer, txnMap, intentSpanMap, &info) + if err != nil { + return Info{}, err + } + + // From now on, all newly added keys are range-local. + + // Process local range key entries (txn records, queue last processed times). + localRangeKeys, err := processLocalKeyRange(ctx, snap, desc, txnExp, &info, cleanupTxnIntentsAsyncFn) + if err != nil { + return Info{}, err + } + + if err := gcer.GC(ctx, localRangeKeys); err != nil { + return Info{}, err + } + + // Clean up the AbortSpan. + log.Event(ctx, "processing AbortSpan") + abortSpanKeys := processAbortSpan(ctx, snap, desc.RangeID, txnExp, &info) + if err := gcer.GC(ctx, abortSpanKeys); err != nil { + return Info{}, err + } + + log.Eventf(ctx, "GC'ed keys; stats %+v", info) + + // Push transactions (if pending) and resolve intents. + var intents []roachpb.Intent + for txnID, txn := range txnMap { + intents = append(intents, roachpb.AsIntents(intentSpanMap[txnID], txn)...) + } + info.ResolveTotal += len(intents) + log.Eventf(ctx, "cleanup of %d intents", len(intents)) + if err := cleanupIntentsFn(ctx, intents); err != nil { + return Info{}, err + } + + return info, nil } -// MakeGarbageCollector allocates and returns a new GC, with expiration -// computed based on current time and policy.TTLSeconds. -func MakeGarbageCollector(now hlc.Timestamp, policy config.GCPolicy) GarbageCollector { - ttlNanos := int64(policy.TTLSeconds) * 1E9 - return GarbageCollector{ - Threshold: hlc.Timestamp{WallTime: now.WallTime - ttlNanos}, - policy: policy, +// processReplicatedKeyRange identifies garbage and sends GC requests to +// remove it. +// +// The logic iterates all versions of all keys in the range from oldest to +// newest. Expired intents are written into the txnMap and intentSpanMap. +func processReplicatedKeyRange( + ctx context.Context, + desc *roachpb.RangeDescriptor, + snap engine.Reader, + now hlc.Timestamp, + threshold hlc.Timestamp, + gcer GCer, + txnMap map[uuid.UUID]*roachpb.Transaction, + intentSpanMap map[uuid.UUID][]roachpb.Span, + info *Info, +) error { + var alloc bufalloc.ByteAllocator + // Compute intent expiration (intent age at which we attempt to resolve). + intentExp := now.Add(-IntentAgeThreshold.Nanoseconds(), 0) + handleIntent := func(md *engine.MVCCKeyValue) { + meta := &enginepb.MVCCMetadata{} + if err := protoutil.Unmarshal(md.Value, meta); err != nil { + log.Errorf(ctx, "unable to unmarshal MVCC metadata for key %q: %+v", md.Key, err) + return + } + if meta.Txn != nil { + // Keep track of intent to resolve if older than the intent + // expiration threshold. + if hlc.Timestamp(meta.Timestamp).Less(intentExp) { + txnID := meta.Txn.ID + if _, ok := txnMap[txnID]; !ok { + txnMap[txnID] = &roachpb.Transaction{ + TxnMeta: *meta.Txn, + } + // IntentTxns and PushTxn will be equal here, since + // pushes to transactions whose record lies in this + // range (but which are not associated to a remaining + // intent on it) happen asynchronously and are accounted + // for separately. Thus higher up in the stack, we + // expect PushTxn > IntentTxns. + info.IntentTxns++ + // All transactions in txnMap may be PENDING and + // cleanupIntentsFn will push them to finalize them. + info.PushTxn++ + } + info.IntentsConsidered++ + alloc, md.Key.Key = alloc.Copy(md.Key.Key, 0) + intentSpanMap[txnID] = append(intentSpanMap[txnID], roachpb.Span{ + Key: md.Key.Key, + }) + } + } + } + + // Iterate all versions of all keys from oldest to newest. If a version is an + // intent it will have the highest timestamp of any versions and will be + // followed by a metadata entry. The loop will determine whether a given key + // has garbage and, if so, will determine the timestamp of the latest version + // which is garbage to be added to the current batch. If the current version + // pushes the size of keys to be removed above the limit, the current key will + // be added with that version and the batch will be sent. When the newest + // version for a key has been reached, if haveGarbageForThisKey, we'll add the + // current key to the batch with the gcTimestampForThisKey. + var ( + batchGCKeys []roachpb.GCRequest_GCKey + batchGCKeysBytes int64 + haveGarbageForThisKey bool + gcTimestampForThisKey hlc.Timestamp + sentBatchForThisKey bool + ) + it := makeGCIterator(desc, snap) + defer it.close() + for ; ; it.step() { + s, ok := it.state() + if !ok { + if it.err != nil { + return it.err + } + break + } + if s.curIsNotValue() { // Step over metadata or other system keys + continue + } + if s.curIsIntent() { + handleIntent(s.next) + continue + } + isNewest := s.curIsNewest() + if isGarbage(threshold, s.cur, s.next, isNewest) { + keyBytes := int64(s.cur.Key.EncodedSize()) + batchGCKeysBytes += keyBytes + haveGarbageForThisKey = true + gcTimestampForThisKey = s.cur.Key.Timestamp + info.AffectedVersionsKeyBytes += keyBytes + info.AffectedVersionsValBytes += int64(len(s.cur.Value)) + } + if affected := isNewest && (sentBatchForThisKey || haveGarbageForThisKey); affected { + info.NumKeysAffected++ + } + shouldSendBatch := batchGCKeysBytes >= KeyVersionChunkBytes + if shouldSendBatch || isNewest && haveGarbageForThisKey { + alloc, s.cur.Key.Key = alloc.Copy(s.cur.Key.Key, 0) + batchGCKeys = append(batchGCKeys, roachpb.GCRequest_GCKey{ + Key: s.cur.Key.Key, + Timestamp: gcTimestampForThisKey, + }) + haveGarbageForThisKey = false + gcTimestampForThisKey = hlc.Timestamp{} + + // Mark that we sent a batch for this key so we know that we had garbage + // even if it turns out that there's no more garbage for this key. + // We want to count a key as affected once even if we paginate the + // deletion of its versions. + sentBatchForThisKey = shouldSendBatch && !isNewest + } + if shouldSendBatch { + if err := gcer.GC(ctx, batchGCKeys); err != nil { + // Even though we are batching the GC process, it's + // safe to continue because we bumped the GC + // thresholds. We may leave some inconsistent history + // behind, but nobody can read it. + log.Warningf(ctx, "failed to GC a batch of keys: %v", err) + } + batchGCKeys = nil + batchGCKeysBytes = 0 + alloc = nil + } } + if len(batchGCKeys) > 0 { + if err := gcer.GC(ctx, batchGCKeys); err != nil { + return err + } + } + return nil } -// Filter makes decisions about garbage collection based on the -// garbage collection policy for batches of values for the same -// key. Returns the index of the first key to be GC'd and the -// timestamp including, and after which, all values should be garbage -// collected. If no values should be GC'd, returns -1 for the index -// and the zero timestamp. Keys must be in descending time -// order. Values deleted at or before the returned timestamp can be -// deleted without invalidating any reads in the time interval -// (gc.expiration, \infinity). +// isGarbage makes a determination whether a key ('cur') is garbage. If 'next' +// is non-nil, it should be the chronologically newer version of the same key +// (or the metadata KV if cur is an intent). If isNewest is false, next must be +// non-nil. isNewest implies that this is the highest timestamp committed +// version for this key. If isNewest is true and next is non-nil, it is an +// intent. Conservatively we have to assume that the intent will get aborted, +// so we will be able to GC just the values that we could remove if there +// weren't an intent. Hence this definition of isNewest. // -// The GC keeps all values (including deletes) above the expiration time, plus +// We keep all values (including deletes) above the expiration time, plus // the first value before or at the expiration time. This allows reads to be // guaranteed as described above. However if this were the only rule, then if // the most recent write was a delete, it would never be removed. Thus, when a -// deleted value is the most recent before expiration, it can be deleted. This -// would still allow for the tombstone bugs in #6227, so in the future we will -// add checks that disallow writes before the last GC expiration time. -func (gc GarbageCollector) Filter(keys []engine.MVCCKey, values [][]byte) (int, hlc.Timestamp) { - if gc.policy.TTLSeconds <= 0 { - return -1, hlc.Timestamp{} +// deleted value is the most recent before expiration, it can be deleted. +func isGarbage(threshold hlc.Timestamp, cur, next *engine.MVCCKeyValue, isNewest bool) bool { + // If the value is not at or below the threshold then it's not garbage. + if belowThreshold := !threshold.Less(cur.Key.Timestamp); !belowThreshold { + return false + } + isDelete := len(cur.Value) == 0 + if isNewest && !isDelete { + return false } - if len(keys) == 0 { - return -1, hlc.Timestamp{} + // If this value is not a delete, then we need to make sure that the next + // value is also at or below the threshold. + // NB: This doesn't need to check whether next is nil because we know + // isNewest is false when evaluating rhs of the or below. + if !isDelete && next == nil { + panic("huh") + } + return isDelete || !threshold.Less(next.Key.Timestamp) +} + +// processLocalKeyRange scans the local range key entries, consisting of +// transaction records, queue last processed timestamps, and range descriptors. +// +// - Transaction entries: +// - For expired transactions , schedule the intents for +// asynchronous resolution. The actual transaction spans are not +// returned for GC in this pass, but are separately GC'ed after +// successful resolution of all intents. The exception is if there +// are no intents on the txn record, in which case it's returned for +// immediate GC. +// +// - Queue last processed times: cleanup any entries which don't match +// this range's start key. This can happen on range merges. +func processLocalKeyRange( + ctx context.Context, + snap engine.Reader, + desc *roachpb.RangeDescriptor, + cutoff hlc.Timestamp, + info *Info, + cleanupTxnIntentsAsyncFn CleanupTxnIntentsAsyncFunc, +) ([]roachpb.GCRequest_GCKey, error) { + var gcKeys []roachpb.GCRequest_GCKey + + handleTxnIntents := func(key roachpb.Key, txn *roachpb.Transaction) error { + // If the transaction needs to be pushed or there are intents to + // resolve, invoke the cleanup function. + if !txn.Status.IsFinalized() || len(txn.IntentSpans) > 0 { + return cleanupTxnIntentsAsyncFn(ctx, txn, roachpb.AsIntents(txn.IntentSpans, txn)) + } + gcKeys = append(gcKeys, roachpb.GCRequest_GCKey{Key: key}) // zero timestamp + return nil } - // find the first expired key index using binary search - i := sort.Search(len(keys), func(i int) bool { return !gc.Threshold.Less(keys[i].Timestamp) }) + handleOneTransaction := func(kv roachpb.KeyValue) error { + var txn roachpb.Transaction + if err := kv.Value.GetProto(&txn); err != nil { + return err + } + info.TransactionSpanTotal++ + if !txn.LastActive().Less(cutoff) { + return nil + } + + // The transaction record should be considered for removal. + switch txn.Status { + case roachpb.PENDING: + info.TransactionSpanGCPending++ + case roachpb.STAGING: + info.TransactionSpanGCStaging++ + case roachpb.ABORTED: + info.TransactionSpanGCAborted++ + case roachpb.COMMITTED: + info.TransactionSpanGCCommitted++ + default: + panic(fmt.Sprintf("invalid transaction state: %s", txn)) + } + return handleTxnIntents(kv.Key, &txn) + } - if i == len(keys) { - return -1, hlc.Timestamp{} + handleOneQueueLastProcessed := func(kv roachpb.KeyValue, rangeKey roachpb.RKey) error { + if !rangeKey.Equal(desc.StartKey) { + // Garbage collect the last processed timestamp if it doesn't match start key. + gcKeys = append(gcKeys, roachpb.GCRequest_GCKey{Key: kv.Key}) // zero timestamp + } + return nil } - // Now keys[i].Timestamp is <= gc.expiration, but the key-value pair is still - // "visible" at timestamp gc.expiration (and up to the next version). - if deleted := len(values[i]) == 0; deleted { - // We don't have to keep a delete visible (since GCing it does not change - // the outcome of the read). Note however that we can't touch deletes at - // higher timestamps immediately preceding this one, since they're above - // gc.expiration and are needed for correctness; see #6227. - return i, keys[i].Timestamp - } else if i+1 < len(keys) { - // Otherwise mark the previous timestamp for deletion (since it won't ever - // be returned for reads at gc.expiration and up). - return i + 1, keys[i+1].Timestamp + handleOne := func(kv roachpb.KeyValue) error { + rangeKey, suffix, _, err := keys.DecodeRangeKey(kv.Key) + if err != nil { + return err + } + if suffix.Equal(keys.LocalTransactionSuffix.AsRawKey()) { + if err := handleOneTransaction(kv); err != nil { + return err + } + } else if suffix.Equal(keys.LocalQueueLastProcessedSuffix.AsRawKey()) { + if err := handleOneQueueLastProcessed(kv, roachpb.RKey(rangeKey)); err != nil { + return err + } + } + return nil } - return -1, hlc.Timestamp{} + startKey := keys.MakeRangeKeyPrefix(desc.StartKey) + endKey := keys.MakeRangeKeyPrefix(desc.EndKey) + + _, err := engine.MVCCIterate(ctx, snap, startKey, endKey, hlc.Timestamp{}, engine.MVCCScanOptions{}, + func(kv roachpb.KeyValue) (bool, error) { + return false, handleOne(kv) + }) + return gcKeys, err +} + +// processAbortSpan iterates through the local AbortSpan entries +// and collects entries which indicate that a client which was running +// this transaction must have realized that it has been aborted (due to +// heartbeating having failed). The parameter minAge is typically a +// multiple of the heartbeat timeout used by the coordinator. +func processAbortSpan( + ctx context.Context, + snap engine.Reader, + rangeID roachpb.RangeID, + threshold hlc.Timestamp, + info *Info, +) []roachpb.GCRequest_GCKey { + var gcKeys []roachpb.GCRequest_GCKey + abortSpan := abortspan.New(rangeID) + if err := abortSpan.Iterate(ctx, snap, func(key roachpb.Key, v roachpb.AbortSpanEntry) error { + info.AbortSpanTotal++ + if v.Timestamp.Less(threshold) { + info.AbortSpanGCNum++ + gcKeys = append(gcKeys, roachpb.GCRequest_GCKey{Key: key}) + } + return nil + }); err != nil { + // Still return whatever we managed to collect. + log.Warning(ctx, err) + } + return gcKeys } diff --git a/pkg/storage/gc/gc_iterator.go b/pkg/storage/gc/gc_iterator.go new file mode 100644 index 000000000000..a86876210b07 --- /dev/null +++ b/pkg/storage/gc/gc_iterator.go @@ -0,0 +1,174 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package gc + +import ( + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage/engine" + "github.com/cockroachdb/cockroach/pkg/storage/rditer" + "github.com/cockroachdb/cockroach/pkg/util/bufalloc" +) + +// gcIterator wraps an rditer.ReplicaDataIterator which it reverse iterates for +// the purpose of discovering gc-able replicated data. +type gcIterator struct { + it *rditer.ReplicaDataIterator + done bool + err error + buf gcIteratorRingBuf +} + +func makeGCIterator(desc *roachpb.RangeDescriptor, snap engine.Reader) gcIterator { + return gcIterator{ + it: rditer.NewReplicaDataIterator(desc, snap, + true /* replicatedOnly */, true /* seekEnd */), + } +} + +type gcIteratorState struct { + cur, next, afterNext *engine.MVCCKeyValue +} + +// curIsNewest returns true if the current MVCCKeyValue in the gcIteratorState +// is the newest committed version of the key. +// +// It returns true if next is nil or if next is an intent. +func (s *gcIteratorState) curIsNewest() bool { + return s.cur.Key.IsValue() && + (s.next == nil || (s.afterNext != nil && !s.afterNext.Key.IsValue())) +} + +// curIsNotValue returns true if the current MVCCKeyValue in the gcIteratorState +// is not a value, i.e. does not have a timestamp. +func (s *gcIteratorState) curIsNotValue() bool { + return !s.cur.Key.IsValue() +} + +// curIsIntent returns true if the current MVCCKeyValue in the gcIteratorState +// is an intent. +func (s *gcIteratorState) curIsIntent() bool { + return s.next != nil && !s.next.Key.IsValue() +} + +// state returns the current state of the iterator. The state contains the +// current and the two following versions of the current key if they exist. +// +// If ok is false, further iteration is unsafe; either the end of iteration has +// been reached or an error has occurred. Callers should check it.err to +// determine whether an error has occurred in cases where ok is false. +// +// It is not safe to use values in the state after subsequent calls to +// it.step(). +func (it *gcIterator) state() (s gcIteratorState, ok bool) { + // The current key is the newest if the key which comes next is different or + // the key which comes after the current key is an intent or this is the first + // key in the range. + s.cur, ok = it.peekAt(0) + if !ok { + return gcIteratorState{}, false + } + next, ok := it.peekAt(1) + if !ok && it.err != nil { // cur is the first key in the range + return gcIteratorState{}, false + } + if !ok || !next.Key.Key.Equal(s.cur.Key.Key) { + return s, true + } + s.next = next + afterNext, ok := it.peekAt(2) + if !ok && it.err != nil { // cur is the first key in the range + return gcIteratorState{}, false + } + if !ok || !afterNext.Key.Key.Equal(s.cur.Key.Key) { + return s, true + } + s.afterNext = afterNext + return s, true +} + +func (it *gcIterator) step() { + it.buf.removeFront() +} + +func (it *gcIterator) peekAt(i int) (*engine.MVCCKeyValue, bool) { + if it.buf.len <= i { + if !it.fillTo(i + 1) { + return nil, false + } + } + return it.buf.at(i), true +} + +func (it *gcIterator) fillTo(targetLen int) (ok bool) { + for it.buf.len < targetLen { + if ok, err := it.it.Valid(); !ok { + it.err, it.done = err, err == nil + return false + } + it.buf.pushBack(it.it) + it.it.Prev() + } + return true +} + +func (it *gcIterator) close() { + it.it.Close() + it.it = nil +} + +// gcIteratorRingBufSize is 3 because the gcIterator.state method at most needs +// to look forward two keys ahead of the current key. +const gcIteratorRingBufSize = 3 + +type gcIteratorRingBuf struct { + allocs [gcIteratorRingBufSize]bufalloc.ByteAllocator + buf [gcIteratorRingBufSize]engine.MVCCKeyValue + len int + head int +} + +func (b *gcIteratorRingBuf) at(i int) *engine.MVCCKeyValue { + if i >= b.len { + panic("index out of range") + } + return &b.buf[(b.head+i)%gcIteratorRingBufSize] +} + +func (b *gcIteratorRingBuf) removeFront() { + if b.len == 0 { + panic("cannot remove from empty gcIteratorRingBuf") + } + b.buf[b.head] = engine.MVCCKeyValue{} + b.head = (b.head + 1) % gcIteratorRingBufSize + b.len-- +} + +type iterator interface { + UnsafeKey() engine.MVCCKey + UnsafeValue() []byte +} + +func (b *gcIteratorRingBuf) pushBack(it iterator) { + if b.len == gcIteratorRingBufSize { + panic("cannot add to full gcIteratorRingBuf") + } + i := (b.head + b.len) % gcIteratorRingBufSize + b.allocs[i] = b.allocs[i][:0] + k := it.UnsafeKey() + v := it.UnsafeValue() + b.allocs[i], k.Key = b.allocs[i].Copy(k.Key, len(v)) + b.allocs[i], v = b.allocs[i].Copy(v, 0) + b.buf[i] = engine.MVCCKeyValue{ + Key: k, + Value: v, + } + b.len++ +} diff --git a/pkg/storage/gc/gc_iterator_test.go b/pkg/storage/gc/gc_iterator_test.go new file mode 100644 index 000000000000..746bc458ac61 --- /dev/null +++ b/pkg/storage/gc/gc_iterator_test.go @@ -0,0 +1,165 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package gc + +import ( + "fmt" + "math/rand" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage/engine" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/stretchr/testify/require" +) + +// TestGCIterator exercises the GC iterator by writing data to the underlying +// engine and then validating the state of the iterator as it iterates that +// data. +func TestGCIterator(t *testing.T) { + // dataItem represents a version in the storage engine and optionally a + // corresponding transaction which will make the MVCCKeyValue an intent. + type dataItem struct { + engine.MVCCKeyValue + txn *roachpb.Transaction + } + // makeDataItem is a shorthand to construct dataItems. + makeDataItem := func(k roachpb.Key, val []byte, ts int64, txn *roachpb.Transaction) dataItem { + return dataItem{ + MVCCKeyValue: engine.MVCCKeyValue{ + Key: engine.MVCCKey{ + Key: k, + Timestamp: hlc.Timestamp{WallTime: ts * time.Nanosecond.Nanoseconds()}, + }, + Value: val, + }, + txn: txn, + } + } + // makeLiteralDistribution adapts dataItems for use with the data distribution + // infrastructure. + makeLiteralDataDistribution := func(items ...dataItem) dataDistribution { + return func() (engine.MVCCKeyValue, *roachpb.Transaction, bool) { + if len(items) == 0 { + return engine.MVCCKeyValue{}, nil, false + } + item := items[0] + defer func() { items = items[1:] }() + return item.MVCCKeyValue, item.txn, true + } + } + // stateExpectations are expectations about the state of the iterator. + type stateExpectations struct { + cur, next, afterNext int + isNewest bool + isIntent bool + isNotValue bool + } + // notation to mark that an iterator state element as either nil or metadata. + const ( + isNil = -1 + isMD = -2 + ) + // exp is a shorthand to construct state expectations. + exp := func(cur, next, afterNext int, isNewest, isIntent, isNotValue bool) stateExpectations { + return stateExpectations{ + cur: cur, next: next, afterNext: afterNext, + isNewest: isNewest, + isIntent: isIntent, + isNotValue: isNotValue, + } + } + vals := uniformValueDistribution(3, 5, 0, rand.New(rand.NewSource(1))) + tablePrefix := keys.MakeTablePrefix(42) + desc := roachpb.RangeDescriptor{StartKey: tablePrefix, EndKey: roachpb.RKey(roachpb.Key(tablePrefix).PrefixEnd())} + keyA := append(tablePrefix[0:len(tablePrefix):len(tablePrefix)], 'a') + keyB := append(tablePrefix[0:len(tablePrefix):len(tablePrefix)], 'b') + keyC := append(tablePrefix[0:len(tablePrefix):len(tablePrefix)], 'c') + makeTxn := func() *roachpb.Transaction { + txn := roachpb.Transaction{} + txn.Key = keyA + txn.ID = uuid.MakeV4() + txn.Status = roachpb.PENDING + return &txn + } + + type testCase struct { + name string + data []dataItem + expectations []stateExpectations + } + // checkExpectations tests whether the state of the iterator matches the + // expectation. + checkExpectations := func( + t *testing.T, data []dataItem, ex stateExpectations, s gcIteratorState, + ) { + check := func(ex int, kv *engine.MVCCKeyValue) { + switch { + case ex >= 0: + require.EqualValues(t, &data[ex].MVCCKeyValue, kv) + case ex == isNil: + require.Nil(t, kv) + case ex == isMD: + require.False(t, kv.Key.IsValue()) + } + } + check(ex.cur, s.cur) + check(ex.next, s.next) + check(ex.afterNext, s.afterNext) + require.Equal(t, ex.isNewest, s.curIsNewest()) + require.Equal(t, ex.isIntent, s.curIsIntent()) + } + makeTest := func(tc testCase) func(t *testing.T) { + return func(t *testing.T) { + eng := engine.NewInMem(roachpb.Attributes{}, 1<<25) + ds := makeLiteralDataDistribution(tc.data...) + ds.setupTest(t, eng, desc) + snap := eng.NewSnapshot() + defer snap.Close() + it := makeGCIterator(&desc, snap) + expectations := tc.expectations + for i, ex := range expectations { + t.Run(fmt.Sprint(i), func(t *testing.T) { + s, ok := it.state() + require.True(t, ok) + checkExpectations(t, tc.data, ex, s) + }) + it.step() + } + } + } + di := makeDataItem // shorthand for convenient notation + for _, tc := range []testCase{ + { + name: "basic", + data: []dataItem{ + di(keyA, vals(), 2, nil), + di(keyA, vals(), 11, nil), + di(keyA, vals(), 14, nil), + di(keyB, vals(), 3, nil), + di(keyC, vals(), 7, makeTxn()), + }, + expectations: []stateExpectations{ + exp(4, isMD, isNil, false, true, false), + exp(isMD, isNil, isNil, false, false, true), + exp(3, isNil, isNil, true, false, false), + exp(0, 1, 2, false, false, false), + exp(1, 2, isNil, false, false, false), + exp(2, isNil, isNil, true, false, false), + }, + }, + } { + t.Run(tc.name, makeTest(tc)) + } +} diff --git a/pkg/storage/gc/gc_old_test.go b/pkg/storage/gc/gc_old_test.go new file mode 100644 index 000000000000..b9e525202092 --- /dev/null +++ b/pkg/storage/gc/gc_old_test.go @@ -0,0 +1,380 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package gc + +import ( + "context" + "sort" + "testing" + + "github.com/cockroachdb/cockroach/pkg/config" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage/engine" + "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" + "github.com/cockroachdb/cockroach/pkg/storage/rditer" + "github.com/cockroachdb/cockroach/pkg/storage/storagebase" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/cockroachdb/errors" +) + +// runGCOld is an older implementation of Run. It is used for benchmarking and +// testing. +// +// runGCOld runs garbage collection for the specified descriptor on the +// provided Engine (which is not mutated). It uses the provided gcFn +// to run garbage collection once on all implicated spans, +// cleanupIntentsFn to resolve intents synchronously, and +// cleanupTxnIntentsAsyncFn to asynchronously cleanup intents and +// associated transaction record on success. +func runGCOld( + ctx context.Context, + desc *roachpb.RangeDescriptor, + snap engine.Reader, + now hlc.Timestamp, + policy config.GCPolicy, + gcer GCer, + cleanupIntentsFn CleanupIntentsFunc, + cleanupTxnIntentsAsyncFn CleanupTxnIntentsAsyncFunc, +) (Info, error) { + + iter := rditer.NewReplicaDataIterator(desc, snap, + true /* replicatedOnly */, false /* seekEnd */) + defer iter.Close() + + // Compute intent expiration (intent age at which we attempt to resolve). + intentExp := now.Add(-IntentAgeThreshold.Nanoseconds(), 0) + txnExp := now.Add(-storagebase.TxnCleanupThreshold.Nanoseconds(), 0) + + gc := MakeGarbageCollector(now, policy) + + if err := gcer.SetGCThreshold(ctx, Threshold{ + Key: gc.Threshold, + Txn: txnExp, + }); err != nil { + return Info{}, errors.Wrap(err, "failed to set GC thresholds") + } + + var batchGCKeys []roachpb.GCRequest_GCKey + var batchGCKeysBytes int64 + var expBaseKey roachpb.Key + var keys []engine.MVCCKey + var vals [][]byte + var keyBytes int64 + var valBytes int64 + info := Info{ + Policy: policy, + Now: now, + Threshold: gc.Threshold, + } + + // Maps from txn ID to txn and intent key slice. + txnMap := map[uuid.UUID]*roachpb.Transaction{} + intentSpanMap := map[uuid.UUID][]roachpb.Span{} + + // processKeysAndValues is invoked with each key and its set of + // values. Intents older than the intent age threshold are sent for + // resolution and values after the MVCC metadata, and possible + // intent, are sent for garbage collection. + processKeysAndValues := func() { + // If there's more than a single value for the key, possibly send for GC. + if len(keys) > 1 { + meta := &enginepb.MVCCMetadata{} + if err := protoutil.Unmarshal(vals[0], meta); err != nil { + log.Errorf(ctx, "unable to unmarshal MVCC metadata for key %q: %+v", keys[0], err) + } else { + // In the event that there's an active intent, send for + // intent resolution if older than the threshold. + startIdx := 1 + if meta.Txn != nil { + // Keep track of intent to resolve if older than the intent + // expiration threshold. + if hlc.Timestamp(meta.Timestamp).Less(intentExp) { + txnID := meta.Txn.ID + if _, ok := txnMap[txnID]; !ok { + txnMap[txnID] = &roachpb.Transaction{ + TxnMeta: *meta.Txn, + } + // IntentTxns and PushTxn will be equal here, since + // pushes to transactions whose record lies in this + // range (but which are not associated to a remaining + // intent on it) happen asynchronously and are accounted + // for separately. Thus higher up in the stack, we + // expect PushTxn > IntentTxns. + info.IntentTxns++ + // All transactions in txnMap may be PENDING and + // cleanupIntentsFn will push them to finalize them. + info.PushTxn++ + } + info.IntentsConsidered++ + intentSpanMap[txnID] = append(intentSpanMap[txnID], roachpb.Span{Key: expBaseKey}) + } + // With an active intent, GC ignores MVCC metadata & intent value. + startIdx = 2 + } + // See if any values may be GC'd. + if idx, gcTS := gc.Filter(keys[startIdx:], vals[startIdx:]); gcTS != (hlc.Timestamp{}) { + // Batch keys after the total size of version keys exceeds + // the threshold limit. This avoids sending potentially large + // GC requests through Raft. Iterate through the keys in reverse + // order so that GC requests can be made multiple times even on + // a single key, with successively newer timestamps to prevent + // any single request from exploding during GC evaluation. + for i := len(keys) - 1; i >= startIdx+idx; i-- { + keyBytes = int64(keys[i].EncodedSize()) + valBytes = int64(len(vals[i])) + + // Add the total size of the GC'able versions of the keys and values to Info. + info.AffectedVersionsKeyBytes += keyBytes + info.AffectedVersionsValBytes += valBytes + + batchGCKeysBytes += keyBytes + // If the current key brings the batch over the target + // size, add the current timestamp to finish the current + // chunk and start a new one. + if batchGCKeysBytes >= KeyVersionChunkBytes { + batchGCKeys = append(batchGCKeys, roachpb.GCRequest_GCKey{Key: expBaseKey, Timestamp: keys[i].Timestamp}) + + err := gcer.GC(ctx, batchGCKeys) + + // Succeed or fail, allow releasing the memory backing batchGCKeys. + iter.ResetAllocator() + batchGCKeys = nil + batchGCKeysBytes = 0 + + if err != nil { + // Even though we are batching the GC process, it's + // safe to continue because we bumped the GC + // thresholds. We may leave some inconsistent history + // behind, but nobody can read it. + log.Warning(ctx, err) + return + } + } + } + // Add the key to the batch at the GC timestamp, unless it was already added. + if batchGCKeysBytes != 0 { + batchGCKeys = append(batchGCKeys, roachpb.GCRequest_GCKey{Key: expBaseKey, Timestamp: gcTS}) + } + info.NumKeysAffected++ + } + } + } + } + + // Iterate through the keys and values of this replica's range. + log.Event(ctx, "iterating through range") + for ; ; iter.Next() { + if ok, err := iter.Valid(); err != nil { + return Info{}, err + } else if !ok { + break + } else if ctx.Err() != nil { + // Stop iterating if our context has expired. + return Info{}, err + } + iterKey := iter.Key() + if !iterKey.IsValue() || !iterKey.Key.Equal(expBaseKey) { + // Moving to the next key (& values). + processKeysAndValues() + expBaseKey = iterKey.Key + if !iterKey.IsValue() { + keys = []engine.MVCCKey{iter.Key()} + vals = [][]byte{iter.Value()} + continue + } + // An implicit metadata. + keys = []engine.MVCCKey{engine.MakeMVCCMetadataKey(iterKey.Key)} + // A nil value for the encoded MVCCMetadata. This will unmarshal to an + // empty MVCCMetadata which is sufficient for processKeysAndValues to + // determine that there is no intent. + vals = [][]byte{nil} + } + keys = append(keys, iter.Key()) + vals = append(vals, iter.Value()) + } + // Handle last collected set of keys/vals. + processKeysAndValues() + if len(batchGCKeys) > 0 { + if err := gcer.GC(ctx, batchGCKeys); err != nil { + return Info{}, err + } + } + + // From now on, all newly added keys are range-local. + + // Process local range key entries (txn records, queue last processed times). + localRangeKeys, err := processLocalKeyRange(ctx, snap, desc, txnExp, &info, cleanupTxnIntentsAsyncFn) + if err != nil { + return Info{}, err + } + + if err := gcer.GC(ctx, localRangeKeys); err != nil { + return Info{}, err + } + + // Clean up the AbortSpan. + log.Event(ctx, "processing AbortSpan") + abortSpanKeys := processAbortSpan(ctx, snap, desc.RangeID, txnExp, &info) + if err := gcer.GC(ctx, abortSpanKeys); err != nil { + return Info{}, err + } + + log.Eventf(ctx, "GC'ed keys; stats %+v", info) + + // Push transactions (if pending) and resolve intents. + var intents []roachpb.Intent + for txnID, txn := range txnMap { + intents = append(intents, roachpb.AsIntents(intentSpanMap[txnID], txn)...) + } + info.ResolveTotal += len(intents) + log.Eventf(ctx, "cleanup of %d intents", len(intents)) + if err := cleanupIntentsFn(ctx, intents); err != nil { + return Info{}, err + } + + return info, nil +} + +// GarbageCollector GCs MVCC key/values using a zone-specific GC +// policy allows either the union or intersection of maximum # of +// versions and maximum age. +type GarbageCollector struct { + Threshold hlc.Timestamp + policy config.GCPolicy +} + +// MakeGarbageCollector allocates and returns a new GC, with expiration +// computed based on current time and policy.TTLSeconds. +func MakeGarbageCollector(now hlc.Timestamp, policy config.GCPolicy) GarbageCollector { + return GarbageCollector{ + Threshold: CalculateThreshold(now, policy), + policy: policy, + } +} + +// Filter makes decisions about garbage collection based on the +// garbage collection policy for batches of values for the same +// key. Returns the index of the first key to be GC'd and the +// timestamp including, and after which, all values should be garbage +// collected. If no values should be GC'd, returns -1 for the index +// and the zero timestamp. Keys must be in descending time +// order. Values deleted at or before the returned timestamp can be +// deleted without invalidating any reads in the time interval +// (gc.expiration, \infinity). +// +// The GC keeps all values (including deletes) above the expiration time, plus +// the first value before or at the expiration time. This allows reads to be +// guaranteed as described above. However if this were the only rule, then if +// the most recent write was a delete, it would never be removed. Thus, when a +// deleted value is the most recent before expiration, it can be deleted. This +// would still allow for the tombstone bugs in #6227, so in the future we will +// add checks that disallow writes before the last GC expiration time. +func (gc GarbageCollector) Filter(keys []engine.MVCCKey, values [][]byte) (int, hlc.Timestamp) { + if gc.policy.TTLSeconds <= 0 { + return -1, hlc.Timestamp{} + } + if len(keys) == 0 { + return -1, hlc.Timestamp{} + } + + // find the first expired key index using binary search + i := sort.Search(len(keys), func(i int) bool { return !gc.Threshold.Less(keys[i].Timestamp) }) + + if i == len(keys) { + return -1, hlc.Timestamp{} + } + + // Now keys[i].Timestamp is <= gc.expiration, but the key-value pair is still + // "visible" at timestamp gc.expiration (and up to the next version). + if deleted := len(values[i]) == 0; deleted { + // We don't have to keep a delete visible (since GCing it does not change + // the outcome of the read). Note however that we can't touch deletes at + // higher timestamps immediately preceding this one, since they're above + // gc.expiration and are needed for correctness; see #6227. + return i, keys[i].Timestamp + } else if i+1 < len(keys) { + // Otherwise mark the previous timestamp for deletion (since it won't ever + // be returned for reads at gc.expiration and up). + return i + 1, keys[i+1].Timestamp + } + + return -1, hlc.Timestamp{} +} + +func mvccVersionKey(key roachpb.Key, ts hlc.Timestamp) engine.MVCCKey { + return engine.MVCCKey{Key: key, Timestamp: ts} +} + +var ( + aKey = roachpb.Key("a") + bKey = roachpb.Key("b") + aKeys = []engine.MVCCKey{ + mvccVersionKey(aKey, hlc.Timestamp{WallTime: 2e9, Logical: 0}), + mvccVersionKey(aKey, hlc.Timestamp{WallTime: 1e9, Logical: 1}), + mvccVersionKey(aKey, hlc.Timestamp{WallTime: 1e9, Logical: 0}), + } + bKeys = []engine.MVCCKey{ + mvccVersionKey(bKey, hlc.Timestamp{WallTime: 2e9, Logical: 0}), + mvccVersionKey(bKey, hlc.Timestamp{WallTime: 1e9, Logical: 0}), + } +) + +// TestGarbageCollectorFilter verifies the filter policies for +// different sorts of MVCC keys. +func TestGarbageCollectorFilter(t *testing.T) { + defer leaktest.AfterTest(t)() + gcA := MakeGarbageCollector(hlc.Timestamp{WallTime: 0, Logical: 0}, config.GCPolicy{TTLSeconds: 1}) + gcB := MakeGarbageCollector(hlc.Timestamp{WallTime: 0, Logical: 0}, config.GCPolicy{TTLSeconds: 2}) + n := []byte("data") + d := []byte(nil) + testData := []struct { + gc GarbageCollector + time hlc.Timestamp + keys []engine.MVCCKey + values [][]byte + expIdx int + expDelTS hlc.Timestamp + }{ + {gcA, hlc.Timestamp{WallTime: 0, Logical: 0}, aKeys, [][]byte{n, n, n}, -1, hlc.Timestamp{}}, + {gcA, hlc.Timestamp{WallTime: 0, Logical: 0}, aKeys, [][]byte{d, d, d}, -1, hlc.Timestamp{}}, + {gcB, hlc.Timestamp{WallTime: 0, Logical: 0}, bKeys, [][]byte{n, n}, -1, hlc.Timestamp{}}, + {gcB, hlc.Timestamp{WallTime: 0, Logical: 0}, bKeys, [][]byte{d, d}, -1, hlc.Timestamp{}}, + {gcA, hlc.Timestamp{WallTime: 1e9, Logical: 0}, aKeys, [][]byte{n, n, n}, -1, hlc.Timestamp{}}, + {gcB, hlc.Timestamp{WallTime: 1e9, Logical: 0}, bKeys, [][]byte{n, n}, -1, hlc.Timestamp{}}, + {gcA, hlc.Timestamp{WallTime: 2e9, Logical: 0}, aKeys, [][]byte{n, n, n}, -1, hlc.Timestamp{}}, + {gcA, hlc.Timestamp{WallTime: 2e9, Logical: 0}, aKeys, [][]byte{d, d, d}, 2, hlc.Timestamp{WallTime: 1e9, Logical: 0}}, + {gcB, hlc.Timestamp{WallTime: 2e9, Logical: 0}, bKeys, [][]byte{n, n}, -1, hlc.Timestamp{}}, + {gcA, hlc.Timestamp{WallTime: 3e9, Logical: 0}, aKeys, [][]byte{n, n, n}, 1, hlc.Timestamp{WallTime: 1e9, Logical: 1}}, + {gcA, hlc.Timestamp{WallTime: 3e9, Logical: 0}, aKeys, [][]byte{d, n, n}, 0, hlc.Timestamp{WallTime: 2e9, Logical: 0}}, + {gcB, hlc.Timestamp{WallTime: 3e9, Logical: 0}, bKeys, [][]byte{n, n}, -1, hlc.Timestamp{}}, + {gcA, hlc.Timestamp{WallTime: 4e9, Logical: 0}, aKeys, [][]byte{n, n, n}, 1, hlc.Timestamp{WallTime: 1e9, Logical: 1}}, + {gcB, hlc.Timestamp{WallTime: 4e9, Logical: 0}, bKeys, [][]byte{n, n}, 1, hlc.Timestamp{WallTime: 1e9, Logical: 0}}, + {gcB, hlc.Timestamp{WallTime: 4e9, Logical: 0}, bKeys, [][]byte{d, n}, 0, hlc.Timestamp{WallTime: 2e9, Logical: 0}}, + {gcA, hlc.Timestamp{WallTime: 5e9, Logical: 0}, aKeys, [][]byte{n, n, n}, 1, hlc.Timestamp{WallTime: 1e9, Logical: 1}}, + {gcB, hlc.Timestamp{WallTime: 5e9, Logical: 0}, bKeys, [][]byte{n, n}, 1, hlc.Timestamp{WallTime: 1e9, Logical: 0}}, + {gcB, hlc.Timestamp{WallTime: 5e9, Logical: 0}, bKeys, [][]byte{d, n}, 0, hlc.Timestamp{WallTime: 2e9, Logical: 0}}, + } + for i, test := range testData { + test.gc.Threshold = test.time + test.gc.Threshold.WallTime -= int64(test.gc.policy.TTLSeconds) * 1e9 + idx, delTS := test.gc.Filter(test.keys, test.values) + if idx != test.expIdx { + t.Errorf("%d: expected index %d; got %d", i, test.expIdx, idx) + } + if delTS != test.expDelTS { + t.Errorf("%d: expected deletion timestamp %s; got %s", i, test.expDelTS, delTS) + } + } +} diff --git a/pkg/storage/gc/gc_random_test.go b/pkg/storage/gc/gc_random_test.go new file mode 100644 index 000000000000..0bb6874069e7 --- /dev/null +++ b/pkg/storage/gc/gc_random_test.go @@ -0,0 +1,244 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package gc + +import ( + "context" + "fmt" + "math/rand" + "sort" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/config" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage/engine" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/stretchr/testify/require" +) + +// randomRunGCTestSpec specifies a distribution for to create random data for +// testing Run +type randomRunGCTestSpec struct { + ds distSpec + now hlc.Timestamp + ttl int32 // seconds +} + +var ( + fewVersionsTinyRows = uniformDistSpec{ + tsFrom: 0, tsTo: 100, + keySuffixMin: 2, keySuffixMax: 3, + valueLenMin: 1, valueLenMax: 1, + deleteFrac: 0, + keysPerValueMin: 1, keysPerValueMax: 2, + intentFrac: .1, + } + someVersionsMidSizeRows = uniformDistSpec{ + tsFrom: 0, tsTo: 100, + keySuffixMin: 8, keySuffixMax: 8, + valueLenMin: 8, valueLenMax: 16, + deleteFrac: .1, + keysPerValueMin: 1, keysPerValueMax: 100, + intentFrac: .1, + } + lotsOfVersionsMidSizeRows = uniformDistSpec{ + tsFrom: 0, tsTo: 100, + keySuffixMin: 8, keySuffixMax: 8, + valueLenMin: 8, valueLenMax: 16, + deleteFrac: .1, + keysPerValueMin: 1000, keysPerValueMax: 1000000, + intentFrac: .1, + } +) + +// TestRunNewVsOld exercises the behavior of Run relative to the old +// implementation. It runs both the new and old implementation and ensures +// that they produce exactly the same results on the same set of keys. +func TestRunNewVsOld(t *testing.T) { + rng := rand.New(rand.NewSource(1)) + ctx := context.Background() + const N = 100000 + + someVersionsMidSizeRowsLotsOfIntents := someVersionsMidSizeRows + someVersionsMidSizeRowsLotsOfIntents.intentFrac = 1 + for _, tc := range []randomRunGCTestSpec{ + { + ds: someVersionsMidSizeRowsLotsOfIntents, + now: hlc.Timestamp{ + WallTime: (IntentAgeThreshold + 100*time.Second).Nanoseconds(), + }, + ttl: int32(IntentAgeThreshold.Seconds()), + }, + { + ds: someVersionsMidSizeRows, + now: hlc.Timestamp{ + WallTime: 100 * time.Second.Nanoseconds(), + }, + ttl: 1, + }, + } { + t.Run(fmt.Sprintf("%v@%v,ttl=%v", tc.ds, tc.now, tc.ttl), func(t *testing.T) { + eng := engine.NewInMem(roachpb.Attributes{}, 1<<25) + tc.ds.dist(N, rng).setupTest(t, eng, *tc.ds.desc()) + snap := eng.NewSnapshot() + + oldGCer := makeFakeGCer() + gcInfoOld, err := runGCOld(ctx, tc.ds.desc(), snap, tc.now, + config.GCPolicy{TTLSeconds: tc.ttl}, + &oldGCer, + oldGCer.resolveIntents, + oldGCer.resolveIntentsAsync) + require.NoError(t, err) + + newGCer := makeFakeGCer() + gcInfoNew, err := Run(ctx, tc.ds.desc(), snap, tc.now, + config.GCPolicy{TTLSeconds: tc.ttl}, + &newGCer, + newGCer.resolveIntents, + newGCer.resolveIntentsAsync) + require.NoError(t, err) + + oldGCer.normalize() + newGCer.normalize() + require.EqualValues(t, gcInfoOld, gcInfoNew) + require.EqualValues(t, oldGCer, newGCer) + }) + } +} + +// BenchmarkRun benchmarks the old and implementations of Run with different +// data distributions. +func BenchmarkRun(b *testing.B) { + rng := rand.New(rand.NewSource(1)) + ctx := context.Background() + runGC := func(eng engine.Engine, old bool, spec randomRunGCTestSpec) (Info, error) { + runGCFunc := Run + if old { + runGCFunc = runGCOld + } + snap := eng.NewSnapshot() + return runGCFunc(ctx, spec.ds.desc(), snap, spec.now, + config.GCPolicy{TTLSeconds: spec.ttl}, + NoopGCer{}, + func(ctx context.Context, intents []roachpb.Intent) error { + return nil + }, + func(ctx context.Context, txn *roachpb.Transaction, intents []roachpb.Intent) error { + return nil + }) + } + makeTest := func(old bool, spec randomRunGCTestSpec) func(b *testing.B) { + return func(b *testing.B) { + eng := engine.NewInMem(roachpb.Attributes{}, 1<<25) + defer eng.Close() + ms := spec.ds.dist(b.N, rng).setupTest(b, eng, *spec.ds.desc()) + b.SetBytes(int64(float64(ms.Total()) / float64(b.N))) + b.ResetTimer() + _, err := runGC(eng, old, spec) + b.StopTimer() + require.NoError(b, err) + } + } + specsWithTTLs := func( + ds distSpec, now hlc.Timestamp, ttls []int32, + ) (specs []randomRunGCTestSpec) { + for _, ttl := range ttls { + specs = append(specs, randomRunGCTestSpec{ + ds: ds, + now: now, + ttl: ttl, + }) + } + return specs + } + ts100 := hlc.Timestamp{WallTime: (100 * time.Second).Nanoseconds()} + ttls := []int32{0, 25, 50, 75, 100} + specs := specsWithTTLs(fewVersionsTinyRows, ts100, ttls) + specs = append(specs, specsWithTTLs(someVersionsMidSizeRows, ts100, ttls)...) + specs = append(specs, specsWithTTLs(lotsOfVersionsMidSizeRows, ts100, ttls)...) + for _, old := range []bool{true, false} { + b.Run(fmt.Sprintf("old=%v", old), func(b *testing.B) { + for _, spec := range specs { + b.Run(fmt.Sprint(spec.ds), makeTest(old, spec)) + } + }) + } +} + +type fakeGCer struct { + gcKeys map[string]roachpb.GCRequest_GCKey + threshold Threshold + intents []roachpb.Intent + txnIntents []txnIntents +} + +func makeFakeGCer() fakeGCer { + return fakeGCer{ + gcKeys: make(map[string]roachpb.GCRequest_GCKey), + } +} + +var _ GCer = (*fakeGCer)(nil) + +func (f *fakeGCer) SetGCThreshold(ctx context.Context, t Threshold) error { + f.threshold = t + return nil +} + +func (f *fakeGCer) GC(ctx context.Context, keys []roachpb.GCRequest_GCKey) error { + for _, k := range keys { + f.gcKeys[k.Key.String()] = k + } + return nil +} + +func (f *fakeGCer) resolveIntentsAsync( + _ context.Context, txn *roachpb.Transaction, intents []roachpb.Intent, +) error { + f.txnIntents = append(f.txnIntents, txnIntents{txn: txn, intents: intents}) + return nil +} + +func (f *fakeGCer) resolveIntents(_ context.Context, intents []roachpb.Intent) error { + f.intents = append(f.intents, intents...) + return nil +} + +func (f *fakeGCer) normalize() { + sortIntents := func(i, j int) bool { + return intentLess(&f.intents[i], &f.intents[j]) + } + sort.Slice(f.intents, sortIntents) + for i := range f.txnIntents { + sort.Slice(f.txnIntents[i].intents, sortIntents) + } + sort.Slice(f.txnIntents, func(i, j int) bool { + return f.txnIntents[i].txn.ID.String() < f.txnIntents[j].txn.ID.String() + }) +} + +func intentLess(a, b *roachpb.Intent) bool { + cmp := a.Key.Compare(b.Key) + switch { + case cmp < 0: + return true + case cmp > 0: + return false + default: + return a.Txn.ID.String() < b.Txn.ID.String() + } +} + +type txnIntents struct { + txn *roachpb.Transaction + intents []roachpb.Intent +} diff --git a/pkg/storage/gc/gc_test.go b/pkg/storage/gc/gc_test.go deleted file mode 100644 index 6d562a4c4244..000000000000 --- a/pkg/storage/gc/gc_test.go +++ /dev/null @@ -1,87 +0,0 @@ -// Copyright 2014 The Cockroach Authors. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -package gc - -import ( - "testing" - - "github.com/cockroachdb/cockroach/pkg/config" - "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/storage/engine" - "github.com/cockroachdb/cockroach/pkg/util/hlc" - "github.com/cockroachdb/cockroach/pkg/util/leaktest" -) - -func mvccVersionKey(key roachpb.Key, ts hlc.Timestamp) engine.MVCCKey { - return engine.MVCCKey{Key: key, Timestamp: ts} -} - -var ( - aKey = roachpb.Key("a") - bKey = roachpb.Key("b") - aKeys = []MVCCKey{ - mvccVersionKey(aKey, hlc.Timestamp{WallTime: 2E9, Logical: 0}), - mvccVersionKey(aKey, hlc.Timestamp{WallTime: 1E9, Logical: 1}), - mvccVersionKey(aKey, hlc.Timestamp{WallTime: 1E9, Logical: 0}), - } - bKeys = []MVCCKey{ - mvccVersionKey(bKey, hlc.Timestamp{WallTime: 2E9, Logical: 0}), - mvccVersionKey(bKey, hlc.Timestamp{WallTime: 1E9, Logical: 0}), - } -) - -// TestGarbageCollectorFilter verifies the filter policies for -// different sorts of MVCC keys. -func TestGarbageCollectorFilter(t *testing.T) { - defer leaktest.AfterTest(t)() - gcA := MakeGarbageCollector(hlc.Timestamp{WallTime: 0, Logical: 0}, config.GCPolicy{TTLSeconds: 1}) - gcB := MakeGarbageCollector(hlc.Timestamp{WallTime: 0, Logical: 0}, config.GCPolicy{TTLSeconds: 2}) - n := []byte("data") - d := []byte(nil) - testData := []struct { - gc GarbageCollector - time hlc.Timestamp - keys []engine.MVCCKey - values [][]byte - expIdx int - expDelTS hlc.Timestamp - }{ - {gcA, hlc.Timestamp{WallTime: 0, Logical: 0}, aKeys, [][]byte{n, n, n}, -1, hlc.Timestamp{}}, - {gcA, hlc.Timestamp{WallTime: 0, Logical: 0}, aKeys, [][]byte{d, d, d}, -1, hlc.Timestamp{}}, - {gcB, hlc.Timestamp{WallTime: 0, Logical: 0}, bKeys, [][]byte{n, n}, -1, hlc.Timestamp{}}, - {gcB, hlc.Timestamp{WallTime: 0, Logical: 0}, bKeys, [][]byte{d, d}, -1, hlc.Timestamp{}}, - {gcA, hlc.Timestamp{WallTime: 1E9, Logical: 0}, aKeys, [][]byte{n, n, n}, -1, hlc.Timestamp{}}, - {gcB, hlc.Timestamp{WallTime: 1E9, Logical: 0}, bKeys, [][]byte{n, n}, -1, hlc.Timestamp{}}, - {gcA, hlc.Timestamp{WallTime: 2E9, Logical: 0}, aKeys, [][]byte{n, n, n}, -1, hlc.Timestamp{}}, - {gcA, hlc.Timestamp{WallTime: 2E9, Logical: 0}, aKeys, [][]byte{d, d, d}, 2, hlc.Timestamp{WallTime: 1E9, Logical: 0}}, - {gcB, hlc.Timestamp{WallTime: 2E9, Logical: 0}, bKeys, [][]byte{n, n}, -1, hlc.Timestamp{}}, - {gcA, hlc.Timestamp{WallTime: 3E9, Logical: 0}, aKeys, [][]byte{n, n, n}, 1, hlc.Timestamp{WallTime: 1E9, Logical: 1}}, - {gcA, hlc.Timestamp{WallTime: 3E9, Logical: 0}, aKeys, [][]byte{d, n, n}, 0, hlc.Timestamp{WallTime: 2E9, Logical: 0}}, - {gcB, hlc.Timestamp{WallTime: 3E9, Logical: 0}, bKeys, [][]byte{n, n}, -1, hlc.Timestamp{}}, - {gcA, hlc.Timestamp{WallTime: 4E9, Logical: 0}, aKeys, [][]byte{n, n, n}, 1, hlc.Timestamp{WallTime: 1E9, Logical: 1}}, - {gcB, hlc.Timestamp{WallTime: 4E9, Logical: 0}, bKeys, [][]byte{n, n}, 1, hlc.Timestamp{WallTime: 1E9, Logical: 0}}, - {gcB, hlc.Timestamp{WallTime: 4E9, Logical: 0}, bKeys, [][]byte{d, n}, 0, hlc.Timestamp{WallTime: 2E9, Logical: 0}}, - {gcA, hlc.Timestamp{WallTime: 5E9, Logical: 0}, aKeys, [][]byte{n, n, n}, 1, hlc.Timestamp{WallTime: 1E9, Logical: 1}}, - {gcB, hlc.Timestamp{WallTime: 5E9, Logical: 0}, bKeys, [][]byte{n, n}, 1, hlc.Timestamp{WallTime: 1E9, Logical: 0}}, - {gcB, hlc.Timestamp{WallTime: 5E9, Logical: 0}, bKeys, [][]byte{d, n}, 0, hlc.Timestamp{WallTime: 2E9, Logical: 0}}, - } - for i, test := range testData { - test.gc.Threshold = test.time - test.gc.Threshold.WallTime -= int64(test.gc.policy.TTLSeconds) * 1E9 - idx, delTS := test.gc.Filter(test.keys, test.values) - if idx != test.expIdx { - t.Errorf("%d: expected index %d; got %d", i, test.expIdx, idx) - } - if delTS != test.expDelTS { - t.Errorf("%d: expected deletion timestamp %s; got %s", i, test.expDelTS, delTS) - } - } -} diff --git a/pkg/storage/gc/run_gc.go b/pkg/storage/gc/run_gc.go deleted file mode 100644 index 0a7b8d1ca511..000000000000 --- a/pkg/storage/gc/run_gc.go +++ /dev/null @@ -1,464 +0,0 @@ -// Copyright 2020 The Cockroach Authors. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -// Package gc contains the logic to run scan a range for garbage and issue -// GC requests to remove that garbage. -// -// The Run function is the primary entrypoint and is called underneath the -// gcQueue in the storage package. It can also be run for debugging. -package gc - -import ( - "context" - "fmt" - "time" - - "github.com/cockroachdb/cockroach/pkg/base" - "github.com/cockroachdb/cockroach/pkg/config" - "github.com/cockroachdb/cockroach/pkg/keys" - "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/storage/abortspan" - "github.com/cockroachdb/cockroach/pkg/storage/engine" - "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" - "github.com/cockroachdb/cockroach/pkg/storage/rditer" - "github.com/cockroachdb/cockroach/pkg/storage/storagebase" - "github.com/cockroachdb/cockroach/pkg/util/hlc" - "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/protoutil" - "github.com/cockroachdb/cockroach/pkg/util/uuid" - "github.com/cockroachdb/errors" -) - -const ( - // IntentAgeThreshold is the threshold after which an extant intent - // will be resolved. - IntentAgeThreshold = 2 * time.Hour // 2 hour - - // KeyVersionChunkBytes is the threshold size for splitting - // GCRequests into multiple batches. - KeyVersionChunkBytes = base.ChunkRaftCommandThresholdBytes -) - -// A GCer is an abstraction used by the GC queue to carry out chunked deletions. -type GCer interface { - SetGCThreshold(context.Context, GCThreshold) error - GC(context.Context, []roachpb.GCRequest_GCKey) error -} - -// NoopGCer implements GCer by doing nothing. -type NoopGCer struct{} - -var _ GCer = NoopGCer{} - -// SetGCThreshold implements storage.GCer. -func (NoopGCer) SetGCThreshold(context.Context, GCThreshold) error { return nil } - -// GC implements storage.GCer. -func (NoopGCer) GC(context.Context, []roachpb.GCRequest_GCKey) error { return nil } - -// GCThreshold holds the key and txn span GC thresholds, respectively. -type GCThreshold struct { - Key hlc.Timestamp - Txn hlc.Timestamp -} - -// GCInfo contains statistics and insights from a GC run. -type GCInfo struct { - // Now is the timestamp used for age computations. - Now hlc.Timestamp - // Policy is the policy used for this garbage collection cycle. - Policy config.GCPolicy - // Stats about the userspace key-values considered, namely the number of - // keys with GC'able data, the number of "old" intents and the number of - // associated distinct transactions. - NumKeysAffected, IntentsConsidered, IntentTxns int - // TransactionSpanTotal is the total number of entries in the transaction span. - TransactionSpanTotal int - // Summary of transactions which were found GCable (assuming that - // potentially necessary intent resolutions did not fail). - TransactionSpanGCAborted, TransactionSpanGCCommitted int - TransactionSpanGCStaging, TransactionSpanGCPending int - // AbortSpanTotal is the total number of transactions present in the AbortSpan. - AbortSpanTotal int - // AbortSpanConsidered is the number of AbortSpan entries old enough to be - // considered for removal. An "entry" corresponds to one transaction; - // more than one key-value pair may be associated with it. - AbortSpanConsidered int - // AbortSpanGCNum is the number of AbortSpan entries fit for removal (due - // to their transactions having terminated). - AbortSpanGCNum int - // PushTxn is the total number of pushes attempted in this cycle. - PushTxn int - // ResolveTotal is the total number of attempted intent resolutions in - // this cycle. - ResolveTotal int - // Threshold is the computed expiration timestamp. Equal to `Now - Policy`. - Threshold hlc.Timestamp - // AffectedVersionsKeyBytes is the number of (fully encoded) bytes deleted from keys in the storage engine. - // Note that this does not account for compression that the storage engine uses to store data on disk. Real - // space savings tends to be smaller due to this compression, and space may be released only at a later point - // in time. - AffectedVersionsKeyBytes int64 - // AffectedVersionsValBytes is the number of (fully encoded) bytes deleted from values in the storage engine. - // See AffectedVersionsKeyBytes for caveats. - AffectedVersionsValBytes int64 -} - -// A CleanupIntentsFunc synchronously resolves the supplied intents -// (which may be PENDING, in which case they are first pushed) while -// taking care of proper batching. -type CleanupIntentsFunc func(context.Context, []roachpb.Intent) error - -// A cleanupTxnIntentsFunc asynchronously cleans up intents from a -// transaction record, pushing the transaction first if it is -// PENDING. Once all intents are resolved successfully, removes the -// transaction record. -type CleanupTxnIntentsAsyncFunc func(context.Context, *roachpb.Transaction, []roachpb.Intent) error - -// RunGC runs garbage collection for the specified descriptor on the -// provided Engine (which is not mutated). It uses the provided gcFn -// to run garbage collection once on all implicated spans, -// cleanupIntentsFn to resolve intents synchronously, and -// cleanupTxnIntentsAsyncFn to asynchronously cleanup intents and -// associated transaction record on success. -func RunGC( - ctx context.Context, - desc *roachpb.RangeDescriptor, - snap engine.Reader, - now hlc.Timestamp, - policy config.GCPolicy, - gcer GCer, - cleanupIntentsFn CleanupIntentsFunc, - cleanupTxnIntentsAsyncFn CleanupTxnIntentsAsyncFunc, -) (GCInfo, error) { - - iter := rditer.NewReplicaDataIterator(desc, snap, - true /* replicatedOnly */, false /* seekEnd */) - defer iter.Close() - - // Compute intent expiration (intent age at which we attempt to resolve). - intentExp := now.Add(-IntentAgeThreshold.Nanoseconds(), 0) - txnExp := now.Add(-storagebase.TxnCleanupThreshold.Nanoseconds(), 0) - - gc := MakeGarbageCollector(now, policy) - - if err := gcer.SetGCThreshold(ctx, GCThreshold{ - Key: gc.Threshold, - Txn: txnExp, - }); err != nil { - return GCInfo{}, errors.Wrap(err, "failed to set GC thresholds") - } - - var batchGCKeys []roachpb.GCRequest_GCKey - var batchGCKeysBytes int64 - var expBaseKey roachpb.Key - var keys []engine.MVCCKey - var vals [][]byte - var keyBytes int64 - var valBytes int64 - info := GCInfo{ - Policy: policy, - Now: now, - Threshold: gc.Threshold, - } - - // Maps from txn ID to txn and intent key slice. - txnMap := map[uuid.UUID]*roachpb.Transaction{} - intentSpanMap := map[uuid.UUID][]roachpb.Span{} - - // processKeysAndValues is invoked with each key and its set of - // values. Intents older than the intent age threshold are sent for - // resolution and values after the MVCC metadata, and possible - // intent, are sent for garbage collection. - processKeysAndValues := func() { - // If there's more than a single value for the key, possibly send for GC. - if len(keys) > 1 { - meta := &enginepb.MVCCMetadata{} - if err := protoutil.Unmarshal(vals[0], meta); err != nil { - log.Errorf(ctx, "unable to unmarshal MVCC metadata for key %q: %+v", keys[0], err) - } else { - // In the event that there's an active intent, send for - // intent resolution if older than the threshold. - startIdx := 1 - if meta.Txn != nil { - // Keep track of intent to resolve if older than the intent - // expiration threshold. - if hlc.Timestamp(meta.Timestamp).Less(intentExp) { - txnID := meta.Txn.ID - if _, ok := txnMap[txnID]; !ok { - txnMap[txnID] = &roachpb.Transaction{ - TxnMeta: *meta.Txn, - } - // IntentTxns and PushTxn will be equal here, since - // pushes to transactions whose record lies in this - // range (but which are not associated to a remaining - // intent on it) happen asynchronously and are accounted - // for separately. Thus higher up in the stack, we - // expect PushTxn > IntentTxns. - info.IntentTxns++ - // All transactions in txnMap may be PENDING and - // cleanupIntentsFn will push them to finalize them. - info.PushTxn++ - } - info.IntentsConsidered++ - intentSpanMap[txnID] = append(intentSpanMap[txnID], roachpb.Span{Key: expBaseKey}) - } - // With an active intent, GC ignores MVCC metadata & intent value. - startIdx = 2 - } - // See if any values may be GC'd. - if idx, gcTS := gc.Filter(keys[startIdx:], vals[startIdx:]); gcTS != (hlc.Timestamp{}) { - // Batch keys after the total size of version keys exceeds - // the threshold limit. This avoids sending potentially large - // GC requests through Raft. Iterate through the keys in reverse - // order so that GC requests can be made multiple times even on - // a single key, with successively newer timestamps to prevent - // any single request from exploding during GC evaluation. - for i := len(keys) - 1; i >= startIdx+idx; i-- { - keyBytes = int64(keys[i].EncodedSize()) - valBytes = int64(len(vals[i])) - - // Add the total size of the GC'able versions of the keys and values to GCInfo. - info.AffectedVersionsKeyBytes += keyBytes - info.AffectedVersionsValBytes += valBytes - - batchGCKeysBytes += keyBytes - // If the current key brings the batch over the target - // size, add the current timestamp to finish the current - // chunk and start a new one. - if batchGCKeysBytes >= KeyVersionChunkBytes { - batchGCKeys = append(batchGCKeys, roachpb.GCRequest_GCKey{Key: expBaseKey, Timestamp: keys[i].Timestamp}) - - err := gcer.GC(ctx, batchGCKeys) - - // Succeed or fail, allow releasing the memory backing batchGCKeys. - iter.ResetAllocator() - batchGCKeys = nil - batchGCKeysBytes = 0 - - if err != nil { - // Even though we are batching the GC process, it's - // safe to continue because we bumped the GC - // thresholds. We may leave some inconsistent history - // behind, but nobody can read it. - log.Warning(ctx, err) - return - } - } - } - // Add the key to the batch at the GC timestamp, unless it was already added. - if batchGCKeysBytes != 0 { - batchGCKeys = append(batchGCKeys, roachpb.GCRequest_GCKey{Key: expBaseKey, Timestamp: gcTS}) - } - info.NumKeysAffected++ - } - } - } - } - - // Iterate through the keys and values of this replica's range. - log.Event(ctx, "iterating through range") - for ; ; iter.Next() { - if ok, err := iter.Valid(); err != nil { - return GCInfo{}, err - } else if !ok { - break - } else if ctx.Err() != nil { - // Stop iterating if our context has expired. - return GCInfo{}, err - } - iterKey := iter.Key() - if !iterKey.IsValue() || !iterKey.Key.Equal(expBaseKey) { - // Moving to the next key (& values). - processKeysAndValues() - expBaseKey = iterKey.Key - if !iterKey.IsValue() { - keys = []engine.MVCCKey{iter.Key()} - vals = [][]byte{iter.Value()} - continue - } - // An implicit metadata. - keys = []engine.MVCCKey{engine.MakeMVCCMetadataKey(iterKey.Key)} - // A nil value for the encoded MVCCMetadata. This will unmarshal to an - // empty MVCCMetadata which is sufficient for processKeysAndValues to - // determine that there is no intent. - vals = [][]byte{nil} - } - keys = append(keys, iter.Key()) - vals = append(vals, iter.Value()) - } - // Handle last collected set of keys/vals. - processKeysAndValues() - if len(batchGCKeys) > 0 { - if err := gcer.GC(ctx, batchGCKeys); err != nil { - return GCInfo{}, err - } - } - - // From now on, all newly added keys are range-local. - - // Process local range key entries (txn records, queue last processed times). - localRangeKeys, err := processLocalKeyRange(ctx, snap, desc, txnExp, &info, cleanupTxnIntentsAsyncFn) - if err != nil { - return GCInfo{}, err - } - - if err := gcer.GC(ctx, localRangeKeys); err != nil { - return GCInfo{}, err - } - - // Clean up the AbortSpan. - log.Event(ctx, "processing AbortSpan") - abortSpanKeys := processAbortSpan(ctx, snap, desc.RangeID, txnExp, &info) - if err := gcer.GC(ctx, abortSpanKeys); err != nil { - return GCInfo{}, err - } - - log.Eventf(ctx, "GC'ed keys; stats %+v", info) - - // Push transactions (if pending) and resolve intents. - var intents []roachpb.Intent - for txnID, txn := range txnMap { - intents = append(intents, roachpb.AsIntents(intentSpanMap[txnID], txn)...) - } - info.ResolveTotal += len(intents) - log.Eventf(ctx, "cleanup of %d intents", len(intents)) - if err := cleanupIntentsFn(ctx, intents); err != nil { - return GCInfo{}, err - } - - return info, nil -} - -// processLocalKeyRange scans the local range key entries, consisting of -// transaction records, queue last processed timestamps, and range descriptors. -// -// - Transaction entries: -// - For expired transactions , schedule the intents for -// asynchronous resolution. The actual transaction spans are not -// returned for GC in this pass, but are separately GC'ed after -// successful resolution of all intents. The exception is if there -// are no intents on the txn record, in which case it's returned for -// immediate GC. -// -// - Queue last processed times: cleanup any entries which don't match -// this range's start key. This can happen on range merges. -func processLocalKeyRange( - ctx context.Context, - snap engine.Reader, - desc *roachpb.RangeDescriptor, - cutoff hlc.Timestamp, - info *GCInfo, - cleanupTxnIntentsAsyncFn CleanupTxnIntentsAsyncFunc, -) ([]roachpb.GCRequest_GCKey, error) { - var gcKeys []roachpb.GCRequest_GCKey - - handleTxnIntents := func(key roachpb.Key, txn *roachpb.Transaction) error { - // If the transaction needs to be pushed or there are intents to - // resolve, invoke the cleanup function. - if !txn.Status.IsFinalized() || len(txn.IntentSpans) > 0 { - return cleanupTxnIntentsAsyncFn(ctx, txn, roachpb.AsIntents(txn.IntentSpans, txn)) - } - gcKeys = append(gcKeys, roachpb.GCRequest_GCKey{Key: key}) // zero timestamp - return nil - } - - handleOneTransaction := func(kv roachpb.KeyValue) error { - var txn roachpb.Transaction - if err := kv.Value.GetProto(&txn); err != nil { - return err - } - info.TransactionSpanTotal++ - if !txn.LastActive().Less(cutoff) { - return nil - } - - // The transaction record should be considered for removal. - switch txn.Status { - case roachpb.PENDING: - info.TransactionSpanGCPending++ - case roachpb.STAGING: - info.TransactionSpanGCStaging++ - case roachpb.ABORTED: - info.TransactionSpanGCAborted++ - case roachpb.COMMITTED: - info.TransactionSpanGCCommitted++ - default: - panic(fmt.Sprintf("invalid transaction state: %s", txn)) - } - return handleTxnIntents(kv.Key, &txn) - } - - handleOneQueueLastProcessed := func(kv roachpb.KeyValue, rangeKey roachpb.RKey) error { - if !rangeKey.Equal(desc.StartKey) { - // Garbage collect the last processed timestamp if it doesn't match start key. - gcKeys = append(gcKeys, roachpb.GCRequest_GCKey{Key: kv.Key}) // zero timestamp - } - return nil - } - - handleOne := func(kv roachpb.KeyValue) error { - rangeKey, suffix, _, err := keys.DecodeRangeKey(kv.Key) - if err != nil { - return err - } - if suffix.Equal(keys.LocalTransactionSuffix.AsRawKey()) { - if err := handleOneTransaction(kv); err != nil { - return err - } - } else if suffix.Equal(keys.LocalQueueLastProcessedSuffix.AsRawKey()) { - if err := handleOneQueueLastProcessed(kv, roachpb.RKey(rangeKey)); err != nil { - return err - } - } - return nil - } - - startKey := keys.MakeRangeKeyPrefix(desc.StartKey) - endKey := keys.MakeRangeKeyPrefix(desc.EndKey) - - _, err := engine.MVCCIterate(ctx, snap, startKey, endKey, hlc.Timestamp{}, engine.MVCCScanOptions{}, - func(kv roachpb.KeyValue) (bool, error) { - return false, handleOne(kv) - }) - return gcKeys, err -} - -// processAbortSpan iterates through the local AbortSpan entries -// and collects entries which indicate that a client which was running -// this transaction must have realized that it has been aborted (due to -// heartbeating having failed). The parameter minAge is typically a -// multiple of the heartbeat timeout used by the coordinator. -// -// TODO(tschottdorf): this could be done in Replica.GC itself, but it's -// handy to have it here for stats (though less performant due to sending -// all of the keys over the wire). -func processAbortSpan( - ctx context.Context, - snap engine.Reader, - rangeID roachpb.RangeID, - threshold hlc.Timestamp, - info *GCInfo, -) []roachpb.GCRequest_GCKey { - var gcKeys []roachpb.GCRequest_GCKey - abortSpan := abortspan.New(rangeID) - if err := abortSpan.Iterate(ctx, snap, func(key roachpb.Key, v roachpb.AbortSpanEntry) error { - info.AbortSpanTotal++ - if v.Timestamp.Less(threshold) { - info.AbortSpanGCNum++ - gcKeys = append(gcKeys, roachpb.GCRequest_GCKey{Key: key}) - } - return nil - }); err != nil { - // Still return whatever we managed to collect. - log.Warning(ctx, err) - } - return gcKeys -} diff --git a/pkg/storage/gc_queue.go b/pkg/storage/gc_queue.go index 1704a857aca7..7fc378b5d7b0 100644 --- a/pkg/storage/gc_queue.go +++ b/pkg/storage/gc_queue.go @@ -340,7 +340,7 @@ func (r *replicaGCer) send(ctx context.Context, req roachpb.GCRequest) error { return nil } -func (r *replicaGCer) SetGCThreshold(ctx context.Context, thresh gc.GCThreshold) error { +func (r *replicaGCer) SetGCThreshold(ctx context.Context, thresh gc.Threshold) error { req := r.template() req.Threshold = thresh.Key return r.send(ctx, req) @@ -424,7 +424,7 @@ func (gcq *gcQueue) process(ctx context.Context, repl *Replica, sysCfg *config.S return nil } -func updateStoreMetricsWithGCInfo(metrics *StoreMetrics, info gc.GCInfo) { +func updateStoreMetricsWithGCInfo(metrics *StoreMetrics, info gc.Info) { metrics.GCNumKeysAffected.Inc(int64(info.NumKeysAffected)) metrics.GCIntentsConsidered.Inc(int64(info.IntentsConsidered)) metrics.GCIntentTxns.Inc(int64(info.IntentTxns)) diff --git a/pkg/storage/gc_queue_test.go b/pkg/storage/gc_queue_test.go index eef0fb13f8e7..caf9c51d1411 100644 --- a/pkg/storage/gc_queue_test.go +++ b/pkg/storage/gc_queue_test.go @@ -69,7 +69,7 @@ func TestGCQueueScoreString(t *testing.T) { }, `queue=true with 4.31/fuzz(1.25)=3.45=valScaleScore(4.00)*deadFrac(0.25)+intentScore(0.45) likely last GC: 5s ago, 3.0 KiB non-live, curr. age 512 KiB*s, min exp. reduction: 256 KiB*s`}, - // Check case of empty GCThreshold. + // Check case of empty Threshold. {gcQueueScore{ShouldQueue: true}, `queue=true with 0.00/fuzz(0.00)=NaN=valScaleScore(0.00)*deadFrac(0.00)+intentScore(0.00) likely last GC: never, 0 B non-live, curr. age 0 B*s, min exp. reduction: 0 B*s`}, } { @@ -478,7 +478,7 @@ func TestGCQueueProcess(t *testing.T) { t.Fatal("config not set") } - // The total size of the GC'able versions of the keys and values in GCInfo. + // The total size of the GC'able versions of the keys and values in Info. // Key size: len("a") + MVCCVersionTimestampSize (13 bytes) = 14 bytes. // Value size: len("value") + headerSize (5 bytes) = 10 bytes. // key1 at ts1 (14 bytes) => "value" (10 bytes) @@ -491,8 +491,8 @@ func TestGCQueueProcess(t *testing.T) { var expectedVersionsKeyBytes int64 = 7 * 14 var expectedVersionsValBytes int64 = 5 * 10 - // Call RunGC with dummy functions to get current GCInfo. - gcInfo, err := func() (gc.GCInfo, error) { + // Call Run with dummy functions to get current Info. + gcInfo, err := func() (gc.Info, error) { snap := tc.repl.store.Engine().NewSnapshot() desc := tc.repl.Desc() defer snap.Close() @@ -503,7 +503,7 @@ func TestGCQueueProcess(t *testing.T) { } now := tc.Clock().Now() - return gc.RunGC(ctx, desc, snap, now, *zone.GC, + return gc.Run(ctx, desc, snap, now, *zone.GC, gc.NoopGCer{}, func(ctx context.Context, intents []roachpb.Intent) error { return nil diff --git a/pkg/storage/rditer/replica_data_iter.go b/pkg/storage/rditer/replica_data_iter.go index 47027c1b5a19..cf786895ef45 100644 --- a/pkg/storage/rditer/replica_data_iter.go +++ b/pkg/storage/rditer/replica_data_iter.go @@ -217,6 +217,18 @@ func (ri *ReplicaDataIterator) Value() []byte { return value } +// UnsafeKey returns the same value as Key, but the memory is invalidated on +// the next call to {Next,Prev,Close}. +func (ri *ReplicaDataIterator) UnsafeKey() engine.MVCCKey { + return ri.it.UnsafeKey() +} + +// UnsafeValue returns the same value as Value, but the memory is invalidated on +// the next call to {Next,Prev,Close}. +func (ri *ReplicaDataIterator) UnsafeValue() []byte { + return ri.it.UnsafeValue() +} + // ResetAllocator resets the ReplicaDataIterator's internal byte allocator. func (ri *ReplicaDataIterator) ResetAllocator() { ri.a = nil