gc: paginate GC of local key ranges
Touches #40815.

I'm not calling it fixed yet because there are two remaining issues:

1. GC will not trigger automatically based on a large abort span. This
   means that manual intervention is needed in such a case.
2. The local key ranges are processed after the user key ranges.
   Even if no user keys are GC'able, all of the user key data still
   has to be scanned to arrive at that conclusion. With a large
   enough range, this scan could theoretically consume the entire
   time budget.

However, in practice we expect operators to be able to "fix" any
range via a manual GC once this PR is merged. Follow-up work needs
to be done to address 1) and 2) above.

Release note (bug fix): Improved the ability of the garbage collection
process to make progress through ranges exhibiting abnormally large
numbers of transaction records and/or abort span entries.
tbg committed Feb 28, 2020
1 parent da84c81 commit 4c86748
Showing 3 changed files with 120 additions and 37 deletions.
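
Before the file-by-file diff, here is a minimal, self-contained sketch of the pattern this commit introduces: instead of accumulating every GC'able local key into one slice and issuing a single, potentially huge GCRequest, keys are streamed through a batcher that flushes whenever the buffered key bytes reach a size threshold. The names below (gcSink, sizeBatcher, printSink) are illustrative stand-ins rather than the real CockroachDB types; the actual implementation is the batchingInlineGCer added to pkg/storage/gc/gc.go below.

package main

import (
	"context"
	"fmt"
)

// gcSink stands in for the GCer that turns a batch of keys into one GC request.
type gcSink interface {
	GC(ctx context.Context, keys [][]byte) error
}

type printSink struct{}

func (printSink) GC(_ context.Context, keys [][]byte) error {
	fmt.Printf("GC batch of %d keys\n", len(keys))
	return nil
}

// sizeBatcher buffers keys and flushes whenever the buffered bytes reach max,
// so no single GC request grows without bound.
type sizeBatcher struct {
	sink gcSink
	max  int
	size int
	keys [][]byte
}

func (b *sizeBatcher) add(ctx context.Context, key []byte) {
	b.keys = append(b.keys, key)
	b.size += len(key)
	if b.size >= b.max {
		b.flush(ctx)
	}
}

func (b *sizeBatcher) flush(ctx context.Context) {
	if len(b.keys) == 0 {
		return
	}
	_ = b.sink.GC(ctx, b.keys) // a real caller would surface or log the error
	b.keys, b.size = nil, 0
}

func main() {
	ctx := context.Background()
	b := &sizeBatcher{sink: printSink{}, max: 16}
	for i := 0; i < 10; i++ {
		b.add(ctx, []byte(fmt.Sprintf("key-%02d", i))) // 6-byte keys
	}
	b.flush(ctx) // final flush for the partially filled last batch
}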
85 changes: 60 additions & 25 deletions pkg/storage/gc/gc.go
@@ -43,7 +43,9 @@ const (
IntentAgeThreshold = 2 * time.Hour // 2 hour

// KeyVersionChunkBytes is the threshold size for splitting
// GCRequests into multiple batches.
// GCRequests into multiple batches. The goal is that the evaluated
// Raft command for each GCRequest does not significantly exceed
// this threshold.
KeyVersionChunkBytes = base.ChunkRaftCommandThresholdBytes
)

@@ -186,24 +188,16 @@ func Run(
return Info{}, err
}

// From now on, all newly added keys are range-local.
// From now on, all keys processed are range-local and inline (zero timestamp).

// Process local range key entries (txn records, queue last processed times).
localRangeKeys, err := processLocalKeyRange(ctx, snap, desc, txnExp, &info, cleanupTxnIntentsAsyncFn)
if err != nil {
return Info{}, err
}

if err := gcer.GC(ctx, localRangeKeys); err != nil {
return Info{}, err
if err := processLocalKeyRange(ctx, snap, desc, txnExp, &info, cleanupTxnIntentsAsyncFn, gcer); err != nil {
log.Warningf(ctx, "while gc'ing local key range: %s", err)
}

// Clean up the AbortSpan.
log.Event(ctx, "processing AbortSpan")
abortSpanKeys := processAbortSpan(ctx, snap, desc.RangeID, txnExp, &info)
if err := gcer.GC(ctx, abortSpanKeys); err != nil {
return Info{}, err
}
processAbortSpan(ctx, snap, desc.RangeID, txnExp, &info, gcer)

log.Eventf(ctx, "GC'ed keys; stats %+v", info)

@@ -408,16 +402,20 @@ func processLocalKeyRange(
cutoff hlc.Timestamp,
info *Info,
cleanupTxnIntentsAsyncFn CleanupTxnIntentsAsyncFunc,
) ([]roachpb.GCRequest_GCKey, error) {
var gcKeys []roachpb.GCRequest_GCKey
gcer PureGCer,
) error {
b := makeBatchingInlineGCer(gcer, func(err error) {
log.Warningf(ctx, "failed to GC from local key range: %s", err)
})
defer b.Flush(ctx)

handleTxnIntents := func(key roachpb.Key, txn *roachpb.Transaction) error {
// If the transaction needs to be pushed or there are intents to
// resolve, invoke the cleanup function.
if !txn.Status.IsFinalized() || len(txn.IntentSpans) > 0 {
return cleanupTxnIntentsAsyncFn(ctx, txn, roachpb.AsLockUpdates(txn, txn.IntentSpans, lock.Replicated))
}
gcKeys = append(gcKeys, roachpb.GCRequest_GCKey{Key: key}) // zero timestamp
b.FlushingAdd(ctx, key)
return nil
}

@@ -450,7 +448,7 @@ func processLocalKeyRange(
handleOneQueueLastProcessed := func(kv roachpb.KeyValue, rangeKey roachpb.RKey) error {
if !rangeKey.Equal(desc.StartKey) {
// Garbage collect the last processed timestamp if it doesn't match start key.
gcKeys = append(gcKeys, roachpb.GCRequest_GCKey{Key: kv.Key}) // zero timestamp
b.FlushingAdd(ctx, kv.Key)
}
return nil
}
@@ -479,7 +477,7 @@ func processLocalKeyRange(
func(kv roachpb.KeyValue) (bool, error) {
return false, handleOne(kv)
})
return gcKeys, err
return err
}

// processAbortSpan iterates through the local AbortSpan entries
@@ -493,19 +491,56 @@ func processAbortSpan(
rangeID roachpb.RangeID,
threshold hlc.Timestamp,
info *Info,
) []roachpb.GCRequest_GCKey {
var gcKeys []roachpb.GCRequest_GCKey
gcer PureGCer,
) {
b := makeBatchingInlineGCer(gcer, func(err error) {
log.Warningf(ctx, "unable to GC from abort span: %s", err)
})
defer b.Flush(ctx)
abortSpan := abortspan.New(rangeID)
if err := abortSpan.Iterate(ctx, snap, func(key roachpb.Key, v roachpb.AbortSpanEntry) error {
err := abortSpan.Iterate(ctx, snap, func(key roachpb.Key, v roachpb.AbortSpanEntry) error {
info.AbortSpanTotal++
if v.Timestamp.Less(threshold) {
info.AbortSpanGCNum++
gcKeys = append(gcKeys, roachpb.GCRequest_GCKey{Key: key})
b.FlushingAdd(ctx, key)
}
return nil
}); err != nil {
// Still return whatever we managed to collect.
})
if err != nil {
log.Warning(ctx, err)
}
return gcKeys
}

// batchingInlineGCer is a helper to paginate the GC of inline (i.e. zero
// timestamp) keys. After creation, keys are added via FlushingAdd(). A
// final call to Flush() empties out the buffer once all keys have been added.
type batchingInlineGCer struct {
gcer PureGCer
onErr func(error)

size int
max int
gcKeys []roachpb.GCRequest_GCKey
}

func makeBatchingInlineGCer(gcer PureGCer, onErr func(error)) batchingInlineGCer {
return batchingInlineGCer{gcer: gcer, onErr: onErr, max: KeyVersionChunkBytes}
}

func (b *batchingInlineGCer) FlushingAdd(ctx context.Context, key roachpb.Key) {
b.gcKeys = append(b.gcKeys, roachpb.GCRequest_GCKey{Key: key})
b.size += len(key)
if b.size < b.max {
return
}
b.Flush(ctx)
}

func (b *batchingInlineGCer) Flush(ctx context.Context) {
err := b.gcer.GC(ctx, b.gcKeys)
b.gcKeys = nil
b.size = 0
if err != nil {
b.onErr(err)
}
}
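
For orientation, a hypothetical caller sketch (not part of this commit) showing how a producer of inline keys would use the new helper, mirroring the processAbortSpan change above: create the batcher with an error callback, defer the final Flush, and stream keys through FlushingAdd. It assumes it lives in package gc next to batchingInlineGCer; gcInlineKeys is an invented name.

// gcInlineKeys streams a set of inline keys to the GCer in size-bounded batches.
func gcInlineKeys(ctx context.Context, gcer PureGCer, keys []roachpb.Key) {
	b := makeBatchingInlineGCer(gcer, func(err error) {
		log.Warningf(ctx, "unable to GC inline keys: %s", err)
	})
	// Send whatever remains below the chunk threshold when we are done.
	defer b.Flush(ctx)

	for _, key := range keys {
		// Each call may trigger an intermediate Flush once the buffered
		// key bytes reach KeyVersionChunkBytes.
		b.FlushingAdd(ctx, key)
	}
}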
16 changes: 4 additions & 12 deletions pkg/storage/gc/gc_old_test.go
@@ -213,24 +213,16 @@ func runGCOld(
}
}

// From now on, all newly added keys are range-local.
// From now on, all keys processed are range-local.

// Process local range key entries (txn records, queue last processed times).
localRangeKeys, err := processLocalKeyRange(ctx, snap, desc, txnExp, &info, cleanupTxnIntentsAsyncFn)
if err != nil {
return Info{}, err
}

if err := gcer.GC(ctx, localRangeKeys); err != nil {
return Info{}, err
if err := processLocalKeyRange(ctx, snap, desc, txnExp, &info, cleanupTxnIntentsAsyncFn, gcer); err != nil {
log.Warningf(ctx, "while gc'ing local key range: %s", err)
}

// Clean up the AbortSpan.
log.Event(ctx, "processing AbortSpan")
abortSpanKeys := processAbortSpan(ctx, snap, desc.RangeID, txnExp, &info)
if err := gcer.GC(ctx, abortSpanKeys); err != nil {
return Info{}, err
}
processAbortSpan(ctx, snap, desc.RangeID, txnExp, &info, gcer)

log.Eventf(ctx, "GC'ed keys; stats %+v", info)

56 changes: 56 additions & 0 deletions pkg/storage/gc/gc_test.go
@@ -11,11 +11,15 @@
package gc

import (
"bytes"
"context"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/config/zonepb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/stretchr/testify/require"
)

@@ -33,3 +37,55 @@ func TestCalculateThreshold(t *testing.T) {
require.Equal(t, c.ts, TimestampForThreshold(CalculateThreshold(c.ts, policy), policy))
}
}

type collectingGCer struct {
keys [][]roachpb.GCRequest_GCKey
}

func (c *collectingGCer) GC(_ context.Context, keys []roachpb.GCRequest_GCKey) error {
c.keys = append(c.keys, keys)
return nil
}

func TestBatchingInlineGCer(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
c := &collectingGCer{}
m := makeBatchingInlineGCer(c, func(err error) { t.Error(err) })
if m.max == 0 {
t.Fatal("did not init max")
}
m.max = 10 // something reasonable for this unit test

long := roachpb.GCRequest_GCKey{
Key: bytes.Repeat([]byte("x"), m.max-1),
}
short := roachpb.GCRequest_GCKey{
Key: roachpb.Key("q"),
}

m.FlushingAdd(ctx, long.Key)
require.Nil(t, c.keys) // no flush

m.FlushingAdd(ctx, short.Key)
// Flushed long and short.
require.Len(t, c.keys, 1)
require.Len(t, c.keys[0], 2)
require.Equal(t, long, c.keys[0][0])
require.Equal(t, short, c.keys[0][1])
// Reset itself properly.
require.Nil(t, m.gcKeys)
require.Zero(t, m.size)

m.FlushingAdd(ctx, short.Key)
require.Len(t, c.keys, 1) // no flush

m.Flush(ctx)
require.Len(t, c.keys, 2) // flushed
require.Len(t, c.keys[1], 1)
require.Equal(t, short, c.keys[1][0])
// Reset itself properly.
require.Nil(t, m.gcKeys)
require.Zero(t, m.size)
}
