From 4c86748782c646301d21840d38dfc020d4d96278 Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Wed, 26 Feb 2020 16:36:43 +0100 Subject: [PATCH] gc: paginate GC of local key ranges Touches #40815. I'm not calling it fixed yet because there are two remaining issues: 1. GC will not trigger automatically based on a large abort span. This means that manual intervention is needed in such a case. 2. The local key ranges are processed after the user key ranges. Even if no user keys are GC'able, all of it has to be scanned regardless (to arrive at that conclusion). With a large enough range size, this scan could theoretically consume the entirety of the time budget. However, in practice we expect operators to be able to "fix" any range via a manual GC once this PR is merged. Follow-up work needs to be done to address 1) and 2) above. Release note (bug fix): Improved the ability of garbage collection process to make process through ranges exhibiting abnormally large numbers of transaction records and/or abort span entries. --- pkg/storage/gc/gc.go | 85 ++++++++++++++++++++++++----------- pkg/storage/gc/gc_old_test.go | 16 ++----- pkg/storage/gc/gc_test.go | 56 +++++++++++++++++++++++ 3 files changed, 120 insertions(+), 37 deletions(-) diff --git a/pkg/storage/gc/gc.go b/pkg/storage/gc/gc.go index 8e2e69f95fc6..53bf8679d4da 100644 --- a/pkg/storage/gc/gc.go +++ b/pkg/storage/gc/gc.go @@ -43,7 +43,9 @@ const ( IntentAgeThreshold = 2 * time.Hour // 2 hour // KeyVersionChunkBytes is the threshold size for splitting - // GCRequests into multiple batches. + // GCRequests into multiple batches. The goal is that the evaluated + // Raft command for each GCRequest does not significantly exceed + // this threshold. KeyVersionChunkBytes = base.ChunkRaftCommandThresholdBytes ) @@ -186,24 +188,16 @@ func Run( return Info{}, err } - // From now on, all newly added keys are range-local. + // From now on, all keys processed are range-local and inline (zero timestamp). // Process local range key entries (txn records, queue last processed times). - localRangeKeys, err := processLocalKeyRange(ctx, snap, desc, txnExp, &info, cleanupTxnIntentsAsyncFn) - if err != nil { - return Info{}, err - } - - if err := gcer.GC(ctx, localRangeKeys); err != nil { - return Info{}, err + if err := processLocalKeyRange(ctx, snap, desc, txnExp, &info, cleanupTxnIntentsAsyncFn, gcer); err != nil { + log.Warningf(ctx, "while gc'ing local key range: %s", err) } // Clean up the AbortSpan. log.Event(ctx, "processing AbortSpan") - abortSpanKeys := processAbortSpan(ctx, snap, desc.RangeID, txnExp, &info) - if err := gcer.GC(ctx, abortSpanKeys); err != nil { - return Info{}, err - } + processAbortSpan(ctx, snap, desc.RangeID, txnExp, &info, gcer) log.Eventf(ctx, "GC'ed keys; stats %+v", info) @@ -408,8 +402,12 @@ func processLocalKeyRange( cutoff hlc.Timestamp, info *Info, cleanupTxnIntentsAsyncFn CleanupTxnIntentsAsyncFunc, -) ([]roachpb.GCRequest_GCKey, error) { - var gcKeys []roachpb.GCRequest_GCKey + gcer PureGCer, +) error { + b := makeBatchingInlineGCer(gcer, func(err error) { + log.Warningf(ctx, "failed to GC from local key range: %s", err) + }) + defer b.Flush(ctx) handleTxnIntents := func(key roachpb.Key, txn *roachpb.Transaction) error { // If the transaction needs to be pushed or there are intents to @@ -417,7 +415,7 @@ func processLocalKeyRange( if !txn.Status.IsFinalized() || len(txn.IntentSpans) > 0 { return cleanupTxnIntentsAsyncFn(ctx, txn, roachpb.AsLockUpdates(txn, txn.IntentSpans, lock.Replicated)) } - gcKeys = append(gcKeys, roachpb.GCRequest_GCKey{Key: key}) // zero timestamp + b.FlushingAdd(ctx, key) return nil } @@ -450,7 +448,7 @@ func processLocalKeyRange( handleOneQueueLastProcessed := func(kv roachpb.KeyValue, rangeKey roachpb.RKey) error { if !rangeKey.Equal(desc.StartKey) { // Garbage collect the last processed timestamp if it doesn't match start key. - gcKeys = append(gcKeys, roachpb.GCRequest_GCKey{Key: kv.Key}) // zero timestamp + b.FlushingAdd(ctx, kv.Key) } return nil } @@ -479,7 +477,7 @@ func processLocalKeyRange( func(kv roachpb.KeyValue) (bool, error) { return false, handleOne(kv) }) - return gcKeys, err + return err } // processAbortSpan iterates through the local AbortSpan entries @@ -493,19 +491,56 @@ func processAbortSpan( rangeID roachpb.RangeID, threshold hlc.Timestamp, info *Info, -) []roachpb.GCRequest_GCKey { - var gcKeys []roachpb.GCRequest_GCKey + gcer PureGCer, +) { + b := makeBatchingInlineGCer(gcer, func(err error) { + log.Warningf(ctx, "unable to GC from abort span: %s", err) + }) + defer b.Flush(ctx) abortSpan := abortspan.New(rangeID) - if err := abortSpan.Iterate(ctx, snap, func(key roachpb.Key, v roachpb.AbortSpanEntry) error { + err := abortSpan.Iterate(ctx, snap, func(key roachpb.Key, v roachpb.AbortSpanEntry) error { info.AbortSpanTotal++ if v.Timestamp.Less(threshold) { info.AbortSpanGCNum++ - gcKeys = append(gcKeys, roachpb.GCRequest_GCKey{Key: key}) + b.FlushingAdd(ctx, key) } return nil - }); err != nil { - // Still return whatever we managed to collect. + }) + if err != nil { log.Warning(ctx, err) } - return gcKeys +} + +// batchingInlineGCer is a helper to paginate the GC of inline (i.e. zero +// timestamp keys). After creation, keys are added via FlushingAdd(). A +// final call to Flush() empties out the buffer when all keys were added. +type batchingInlineGCer struct { + gcer PureGCer + onErr func(error) + + size int + max int + gcKeys []roachpb.GCRequest_GCKey +} + +func makeBatchingInlineGCer(gcer PureGCer, onErr func(error)) batchingInlineGCer { + return batchingInlineGCer{gcer: gcer, onErr: onErr, max: KeyVersionChunkBytes} +} + +func (b *batchingInlineGCer) FlushingAdd(ctx context.Context, key roachpb.Key) { + b.gcKeys = append(b.gcKeys, roachpb.GCRequest_GCKey{Key: key}) + b.size += len(key) + if b.size < b.max { + return + } + b.Flush(ctx) +} + +func (b *batchingInlineGCer) Flush(ctx context.Context) { + err := b.gcer.GC(ctx, b.gcKeys) + b.gcKeys = nil + b.size = 0 + if err != nil { + b.onErr(err) + } } diff --git a/pkg/storage/gc/gc_old_test.go b/pkg/storage/gc/gc_old_test.go index a586f220eacb..4f34317b6c65 100644 --- a/pkg/storage/gc/gc_old_test.go +++ b/pkg/storage/gc/gc_old_test.go @@ -213,24 +213,16 @@ func runGCOld( } } - // From now on, all newly added keys are range-local. + // From now on, all keys processed are range-local. // Process local range key entries (txn records, queue last processed times). - localRangeKeys, err := processLocalKeyRange(ctx, snap, desc, txnExp, &info, cleanupTxnIntentsAsyncFn) - if err != nil { - return Info{}, err - } - - if err := gcer.GC(ctx, localRangeKeys); err != nil { - return Info{}, err + if err := processLocalKeyRange(ctx, snap, desc, txnExp, &info, cleanupTxnIntentsAsyncFn, gcer); err != nil { + log.Warningf(ctx, "while gc'ing local key range: %s", err) } // Clean up the AbortSpan. log.Event(ctx, "processing AbortSpan") - abortSpanKeys := processAbortSpan(ctx, snap, desc.RangeID, txnExp, &info) - if err := gcer.GC(ctx, abortSpanKeys); err != nil { - return Info{}, err - } + processAbortSpan(ctx, snap, desc.RangeID, txnExp, &info, gcer) log.Eventf(ctx, "GC'ed keys; stats %+v", info) diff --git a/pkg/storage/gc/gc_test.go b/pkg/storage/gc/gc_test.go index 8e00e0b5fba7..efc67fff2f77 100644 --- a/pkg/storage/gc/gc_test.go +++ b/pkg/storage/gc/gc_test.go @@ -11,11 +11,15 @@ package gc import ( + "bytes" + "context" "testing" "time" "github.com/cockroachdb/cockroach/pkg/config/zonepb" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/stretchr/testify/require" ) @@ -33,3 +37,55 @@ func TestCalculateThreshold(t *testing.T) { require.Equal(t, c.ts, TimestampForThreshold(CalculateThreshold(c.ts, policy), policy)) } } + +type collectingGCer struct { + keys [][]roachpb.GCRequest_GCKey +} + +func (c *collectingGCer) GC(_ context.Context, keys []roachpb.GCRequest_GCKey) error { + c.keys = append(c.keys, keys) + return nil +} + +func TestBatchingInlineGCer(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + c := &collectingGCer{} + m := makeBatchingInlineGCer(c, func(err error) { t.Error(err) }) + if m.max == 0 { + t.Fatal("did not init max") + } + m.max = 10 // something reasonable for this unit test + + long := roachpb.GCRequest_GCKey{ + Key: bytes.Repeat([]byte("x"), m.max-1), + } + short := roachpb.GCRequest_GCKey{ + Key: roachpb.Key("q"), + } + + m.FlushingAdd(ctx, long.Key) + require.Nil(t, c.keys) // no flush + + m.FlushingAdd(ctx, short.Key) + // Flushed long and short. + require.Len(t, c.keys, 1) + require.Len(t, c.keys[0], 2) + require.Equal(t, long, c.keys[0][0]) + require.Equal(t, short, c.keys[0][1]) + // Reset itself properly. + require.Nil(t, m.gcKeys) + require.Zero(t, m.size) + + m.FlushingAdd(ctx, short.Key) + require.Len(t, c.keys, 1) // no flush + + m.Flush(ctx) + require.Len(t, c.keys, 2) // flushed + require.Len(t, c.keys[1], 1) + require.Equal(t, short, c.keys[1][0]) + // Reset itself properly. + require.Nil(t, m.gcKeys) + require.Zero(t, m.size) +}