Skip to content

Commit

Permalink
Merge #45444
Browse files Browse the repository at this point in the history
45444:  gc: paginate GC of local key ranges  r=ajwerner a=tbg

Touches #40815.

I'm not calling it fixed yet because there are two remaining issues:

1. GC will not trigger automatically based on a large abort span. This
   means that manual intervention is needed in such a case.
2. The local key ranges are processed after the user key ranges.
   Even if no user keys are GC'able, all of it has to be scanned
   regardless (to arrive at that conclusion). With a large enough
   range size, this scan could theoretically consume the entirety
   of the time budget.

However, in practice we expect operators to be able to "fix" any
range via a manual GC once this PR is merged. Follow-up work needs
to be done to address 1) and 2) above.

Release note (bug fix): Improved the ability of garbage collection
process to make process through ranges exhibiting abnormally large
numbers of transaction records and/or abort span entries.

Co-authored-by: Tobias Schottdorf <[email protected]>
  • Loading branch information
craig[bot] and tbg committed Feb 28, 2020
2 parents dcf8c44 + 4c86748 commit 22544bf
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 39 deletions.
99 changes: 72 additions & 27 deletions pkg/storage/gc/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ const (
IntentAgeThreshold = 2 * time.Hour // 2 hour

// KeyVersionChunkBytes is the threshold size for splitting
// GCRequests into multiple batches.
// GCRequests into multiple batches. The goal is that the evaluated
// Raft command for each GCRequest does not significantly exceed
// this threshold.
KeyVersionChunkBytes = base.ChunkRaftCommandThresholdBytes
)

Expand All @@ -61,12 +63,22 @@ func TimestampForThreshold(threshold hlc.Timestamp, policy zonepb.GCPolicy) (ts
return threshold.Add(ttlNanos, 0)
}

// A GCer is an abstraction used by the GC queue to carry out chunked deletions.
type GCer interface {
// Thresholder is part of the GCer interface.
type Thresholder interface {
SetGCThreshold(context.Context, Threshold) error
}

// PureGCer is part of the GCer interface.
type PureGCer interface {
GC(context.Context, []roachpb.GCRequest_GCKey) error
}

// A GCer is an abstraction used by the GC queue to carry out chunked deletions.
type GCer interface {
Thresholder
PureGCer
}

// NoopGCer implements GCer by doing nothing.
type NoopGCer struct{}

Expand Down Expand Up @@ -176,24 +188,16 @@ func Run(
return Info{}, err
}

// From now on, all newly added keys are range-local.
// From now on, all keys processed are range-local and inline (zero timestamp).

// Process local range key entries (txn records, queue last processed times).
localRangeKeys, err := processLocalKeyRange(ctx, snap, desc, txnExp, &info, cleanupTxnIntentsAsyncFn)
if err != nil {
return Info{}, err
}

if err := gcer.GC(ctx, localRangeKeys); err != nil {
return Info{}, err
if err := processLocalKeyRange(ctx, snap, desc, txnExp, &info, cleanupTxnIntentsAsyncFn, gcer); err != nil {
log.Warningf(ctx, "while gc'ing local key range: %s", err)
}

// Clean up the AbortSpan.
log.Event(ctx, "processing AbortSpan")
abortSpanKeys := processAbortSpan(ctx, snap, desc.RangeID, txnExp, &info)
if err := gcer.GC(ctx, abortSpanKeys); err != nil {
return Info{}, err
}
processAbortSpan(ctx, snap, desc.RangeID, txnExp, &info, gcer)

log.Eventf(ctx, "GC'ed keys; stats %+v", info)

Expand Down Expand Up @@ -398,16 +402,20 @@ func processLocalKeyRange(
cutoff hlc.Timestamp,
info *Info,
cleanupTxnIntentsAsyncFn CleanupTxnIntentsAsyncFunc,
) ([]roachpb.GCRequest_GCKey, error) {
var gcKeys []roachpb.GCRequest_GCKey
gcer PureGCer,
) error {
b := makeBatchingInlineGCer(gcer, func(err error) {
log.Warningf(ctx, "failed to GC from local key range: %s", err)
})
defer b.Flush(ctx)

handleTxnIntents := func(key roachpb.Key, txn *roachpb.Transaction) error {
// If the transaction needs to be pushed or there are intents to
// resolve, invoke the cleanup function.
if !txn.Status.IsFinalized() || len(txn.IntentSpans) > 0 {
return cleanupTxnIntentsAsyncFn(ctx, txn, roachpb.AsLockUpdates(txn, txn.IntentSpans, lock.Replicated))
}
gcKeys = append(gcKeys, roachpb.GCRequest_GCKey{Key: key}) // zero timestamp
b.FlushingAdd(ctx, key)
return nil
}

Expand Down Expand Up @@ -440,7 +448,7 @@ func processLocalKeyRange(
handleOneQueueLastProcessed := func(kv roachpb.KeyValue, rangeKey roachpb.RKey) error {
if !rangeKey.Equal(desc.StartKey) {
// Garbage collect the last processed timestamp if it doesn't match start key.
gcKeys = append(gcKeys, roachpb.GCRequest_GCKey{Key: kv.Key}) // zero timestamp
b.FlushingAdd(ctx, kv.Key)
}
return nil
}
Expand Down Expand Up @@ -469,7 +477,7 @@ func processLocalKeyRange(
func(kv roachpb.KeyValue) (bool, error) {
return false, handleOne(kv)
})
return gcKeys, err
return err
}

// processAbortSpan iterates through the local AbortSpan entries
Expand All @@ -483,19 +491,56 @@ func processAbortSpan(
rangeID roachpb.RangeID,
threshold hlc.Timestamp,
info *Info,
) []roachpb.GCRequest_GCKey {
var gcKeys []roachpb.GCRequest_GCKey
gcer PureGCer,
) {
b := makeBatchingInlineGCer(gcer, func(err error) {
log.Warningf(ctx, "unable to GC from abort span: %s", err)
})
defer b.Flush(ctx)
abortSpan := abortspan.New(rangeID)
if err := abortSpan.Iterate(ctx, snap, func(key roachpb.Key, v roachpb.AbortSpanEntry) error {
err := abortSpan.Iterate(ctx, snap, func(key roachpb.Key, v roachpb.AbortSpanEntry) error {
info.AbortSpanTotal++
if v.Timestamp.Less(threshold) {
info.AbortSpanGCNum++
gcKeys = append(gcKeys, roachpb.GCRequest_GCKey{Key: key})
b.FlushingAdd(ctx, key)
}
return nil
}); err != nil {
// Still return whatever we managed to collect.
})
if err != nil {
log.Warning(ctx, err)
}
return gcKeys
}

// batchingInlineGCer is a helper to paginate the GC of inline (i.e. zero
// timestamp keys). After creation, keys are added via FlushingAdd(). A
// final call to Flush() empties out the buffer when all keys were added.
type batchingInlineGCer struct {
gcer PureGCer
onErr func(error)

size int
max int
gcKeys []roachpb.GCRequest_GCKey
}

func makeBatchingInlineGCer(gcer PureGCer, onErr func(error)) batchingInlineGCer {
return batchingInlineGCer{gcer: gcer, onErr: onErr, max: KeyVersionChunkBytes}
}

func (b *batchingInlineGCer) FlushingAdd(ctx context.Context, key roachpb.Key) {
b.gcKeys = append(b.gcKeys, roachpb.GCRequest_GCKey{Key: key})
b.size += len(key)
if b.size < b.max {
return
}
b.Flush(ctx)
}

func (b *batchingInlineGCer) Flush(ctx context.Context) {
err := b.gcer.GC(ctx, b.gcKeys)
b.gcKeys = nil
b.size = 0
if err != nil {
b.onErr(err)
}
}
16 changes: 4 additions & 12 deletions pkg/storage/gc/gc_old_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,24 +213,16 @@ func runGCOld(
}
}

// From now on, all newly added keys are range-local.
// From now on, all keys processed are range-local.

// Process local range key entries (txn records, queue last processed times).
localRangeKeys, err := processLocalKeyRange(ctx, snap, desc, txnExp, &info, cleanupTxnIntentsAsyncFn)
if err != nil {
return Info{}, err
}

if err := gcer.GC(ctx, localRangeKeys); err != nil {
return Info{}, err
if err := processLocalKeyRange(ctx, snap, desc, txnExp, &info, cleanupTxnIntentsAsyncFn, gcer); err != nil {
log.Warningf(ctx, "while gc'ing local key range: %s", err)
}

// Clean up the AbortSpan.
log.Event(ctx, "processing AbortSpan")
abortSpanKeys := processAbortSpan(ctx, snap, desc.RangeID, txnExp, &info)
if err := gcer.GC(ctx, abortSpanKeys); err != nil {
return Info{}, err
}
processAbortSpan(ctx, snap, desc.RangeID, txnExp, &info, gcer)

log.Eventf(ctx, "GC'ed keys; stats %+v", info)

Expand Down
56 changes: 56 additions & 0 deletions pkg/storage/gc/gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,15 @@
package gc

import (
"bytes"
"context"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/config/zonepb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/stretchr/testify/require"
)

Expand All @@ -33,3 +37,55 @@ func TestCalculateThreshold(t *testing.T) {
require.Equal(t, c.ts, TimestampForThreshold(CalculateThreshold(c.ts, policy), policy))
}
}

type collectingGCer struct {
keys [][]roachpb.GCRequest_GCKey
}

func (c *collectingGCer) GC(_ context.Context, keys []roachpb.GCRequest_GCKey) error {
c.keys = append(c.keys, keys)
return nil
}

func TestBatchingInlineGCer(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
c := &collectingGCer{}
m := makeBatchingInlineGCer(c, func(err error) { t.Error(err) })
if m.max == 0 {
t.Fatal("did not init max")
}
m.max = 10 // something reasonable for this unit test

long := roachpb.GCRequest_GCKey{
Key: bytes.Repeat([]byte("x"), m.max-1),
}
short := roachpb.GCRequest_GCKey{
Key: roachpb.Key("q"),
}

m.FlushingAdd(ctx, long.Key)
require.Nil(t, c.keys) // no flush

m.FlushingAdd(ctx, short.Key)
// Flushed long and short.
require.Len(t, c.keys, 1)
require.Len(t, c.keys[0], 2)
require.Equal(t, long, c.keys[0][0])
require.Equal(t, short, c.keys[0][1])
// Reset itself properly.
require.Nil(t, m.gcKeys)
require.Zero(t, m.size)

m.FlushingAdd(ctx, short.Key)
require.Len(t, c.keys, 1) // no flush

m.Flush(ctx)
require.Len(t, c.keys, 2) // flushed
require.Len(t, c.keys[1], 1)
require.Equal(t, short, c.keys[1][0])
// Reset itself properly.
require.Nil(t, m.gcKeys)
require.Zero(t, m.size)
}

0 comments on commit 22544bf

Please sign in to comment.