Skip to content

Commit

Permalink
storage: paginage GC of abort span
Browse files Browse the repository at this point in the history
Partial (manual) backport of cockroachdb#45444.

Release note: None
  • Loading branch information
tbg committed Feb 27, 2020
1 parent cc0a2ca commit 9b37bbf
Showing 1 changed file with 45 additions and 7 deletions.
52 changes: 45 additions & 7 deletions pkg/storage/gc_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,23 +445,26 @@ func processAbortSpan(
rangeID roachpb.RangeID,
threshold hlc.Timestamp,
infoMu *lockableGCInfo,
gcer PureGCer,
) []roachpb.GCRequest_GCKey {
var gcKeys []roachpb.GCRequest_GCKey
b := makeBatchingInlineGCer(gcer, func(err error) {
log.Warningf(ctx, "unable to GC from abort span: %s", err)
})
defer b.Flush(ctx)
abortSpan := abortspan.New(rangeID)
infoMu.Lock()
defer infoMu.Unlock()
if err := abortSpan.Iterate(ctx, snap, func(key roachpb.Key, v roachpb.AbortSpanEntry) error {
infoMu.AbortSpanTotal++
if v.Timestamp.Less(threshold) {
infoMu.AbortSpanGCNum++
gcKeys = append(gcKeys, roachpb.GCRequest_GCKey{Key: key})
b.FlushingAdd(ctx, key)
}
return nil
}); err != nil {
// Still return whatever we managed to collect.
log.Warning(ctx, err)
}
return gcKeys
}

// NoopGCer implements GCer by doing nothing.
Expand Down Expand Up @@ -862,10 +865,7 @@ func RunGC(

// Clean up the AbortSpan.
log.Event(ctx, "processing AbortSpan")
abortSpanKeys := processAbortSpan(ctx, snap, desc.RangeID, txnExp, &infoMu)
if err := gcer.GC(ctx, abortSpanKeys); err != nil {
return GCInfo{}, err
}
processAbortSpan(ctx, snap, desc.RangeID, txnExp, &infoMu, gcer)

infoMu.Lock()
log.Eventf(ctx, "GC'ed keys; stats %+v", infoMu.GCInfo)
Expand Down Expand Up @@ -897,3 +897,41 @@ func (*gcQueue) timer(_ time.Duration) time.Duration {
func (*gcQueue) purgatoryChan() <-chan time.Time {
return nil
}

type PureGCer interface {
GC(context.Context, []roachpb.GCRequest_GCKey) error
}

// batchingInlineGCer is a helper to paginate the GC of inline (i.e. zero
// timestamp keys). After creation, keys are added via FlushingAdd(). A
// final call to Flush() empties out the buffer when all keys were added.
type batchingInlineGCer struct {
gcer PureGCer
onErr func(error)

size int
max int
gcKeys []roachpb.GCRequest_GCKey
}

func makeBatchingInlineGCer(gcer PureGCer, onErr func(error)) batchingInlineGCer {
return batchingInlineGCer{gcer: gcer, onErr: onErr, max: base.ChunkRaftCommandThresholdBytes}
}

func (b *batchingInlineGCer) FlushingAdd(ctx context.Context, key roachpb.Key) {
b.gcKeys = append(b.gcKeys, roachpb.GCRequest_GCKey{Key: key})
b.size += len(key)
if b.size < b.max {
return
}
b.Flush(ctx)
}

func (b *batchingInlineGCer) Flush(ctx context.Context) {
err := b.gcer.GC(ctx, b.gcKeys)
b.gcKeys = nil
b.size = 0
if err != nil {
b.onErr(err)
}
}

0 comments on commit 9b37bbf

Please sign in to comment.