Skip to content

Commit

Permalink
Merge #46803
Browse files Browse the repository at this point in the history
46803: release-20.1: kvcoord: condense read spans when they exceed the memory limit r=andreimatei a=andreimatei

Backport 7/7 commits from #46275.

/cc @cockroachdb/release

---

Before this patch, once a transaction exceeds the
kv.transaction.max_refresh_spans_bytes limit, it stopped tracking reads
and it didn't attempt to refresh any more when pushed.
This patch make the span refresher condense the spans when it runs out
of memory instead. So we'll get bigger spans and potentially false
conflicts, but at least we have a chance at refreshing. In particular,
it'll succeed if there's no writes anywhere.

The condensing is performed using the condensableSpanSet, like we do in
the pipeliner interceptor for the tracking of write intents. Internally,
that guy condenses spans in ranges with lots of reads.

We've seen people run into kv.transaction.max_refresh_spans_bytes in the
past, so this should help many uses cases. But in particular I've
written this patch because, without it, I'm scared about the effects of
20.1's reduction in the closed timestamp target duration to 3s from a
previous 30s. Every transaction writing something after having run for
longer than that will get pushed, so being able to refresh is getting
more important.

Fixes #46095

Release note (general change): Transactions reading a lot of data behave
better when exceeding the memory limit set by
kv.transaction.max_refresh_spans_bytes. Such transactions now attempt to
resolve the conflicts they run into instead of being forced to always
retry. Increasing kv.transaction.max_refresh_spans_bytes should no
longer be necessary for most workloads.

Release justification: fix for new "functionality" - the reduction in
the closed timestamp target duration.


Co-authored-by: Andrei Matei <[email protected]>
  • Loading branch information
craig[bot] and andreimatei committed Mar 31, 2020
2 parents 3b6c338 + f0a16a1 commit e17e3ad
Show file tree
Hide file tree
Showing 23 changed files with 565 additions and 385 deletions.
182 changes: 182 additions & 0 deletions pkg/kv/kvclient/kvcoord/condensable_span_set.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvcoord

import (
"context"
"sort"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/log"
)

// condensableSpanSet is a set of key spans that is condensable in order to
// stay below some maximum byte limit. Condensing of the set happens in two
// ways. Initially, overlapping spans are merged together to deduplicate
// redundant keys. If that alone isn't sufficient to stay below the byte limit,
// spans within the same Range will be merged together. This can cause the
// "footprint" of the set to grow, so the set should be thought of as on
// overestimate.
type condensableSpanSet struct {
s []roachpb.Span
bytes int64

// condensed is set if we ever condensed the spans. Meaning, if the set of
// spans currently tracked has lost fidelity compared to the spans inserted.
// Note that we might have otherwise mucked with the inserted spans to save
// memory without losing fidelity, in which case this flag would not be set
// (e.g. merging overlapping or adjacent spans).
condensed bool
}

// insert adds new spans to the condensable span set. No attempt to condense the
// set or deduplicate the new span with existing spans is made.
func (s *condensableSpanSet) insert(spans ...roachpb.Span) {
s.s = append(s.s, spans...)
for _, sp := range spans {
s.bytes += spanSize(sp)
}
}

// mergeAndSort merges all overlapping spans. Calling this method will not
// increase the overall bounds of the span set, but will eliminate duplicated
// spans and combine overlapping spans.
//
// The method has the side effect of sorting the stable write set.
func (s *condensableSpanSet) mergeAndSort() {
oldLen := len(s.s)
s.s, _ = roachpb.MergeSpans(s.s)
// Recompute the size if anything has changed.
if oldLen != len(s.s) {
s.bytes = 0
for _, sp := range s.s {
s.bytes += spanSize(sp)
}
}
}

// maybeCondense is similar in spirit to mergeAndSort, but it only adjusts the
// span set when the maximum byte limit is exceeded. However, when this limit is
// exceeded, the method is more aggressive in its attempt to reduce the memory
// footprint of the span set. Not only will it merge overlapping spans, but
// spans within the same range boundaries are also condensed.
//
// Returns true if condensing was done. Note that, even if condensing was
// performed, this doesn't guarantee that the size was reduced below the byte
// limit. Condensing is only performed at the level of individual ranges, not
// across ranges, so it's possible to not be able to condense as much as
// desired.
func (s *condensableSpanSet) maybeCondense(
ctx context.Context, riGen rangeIteratorFactory, maxBytes int64,
) bool {
if s.bytes < maxBytes {
return false
}

// Start by attempting to simply merge the spans within the set. This alone
// may bring us under the byte limit. Even if it doesn't, this step has the
// nice property that it sorts the spans by start key, which we rely on
// lower in this method.
s.mergeAndSort()
if s.bytes < maxBytes {
return false
}

ri := riGen.newRangeIterator()

// Divide spans by range boundaries and condense. Iterate over spans
// using a range iterator and add each to a bucket keyed by range
// ID. Local keys are kept in a new slice and not added to buckets.
type spanBucket struct {
rangeID roachpb.RangeID
bytes int64
spans []roachpb.Span
}
var buckets []spanBucket
var localSpans []roachpb.Span
for _, sp := range s.s {
if keys.IsLocal(sp.Key) {
localSpans = append(localSpans, sp)
continue
}
ri.Seek(ctx, roachpb.RKey(sp.Key), Ascending)
if !ri.Valid() {
// We haven't modified s.s yet, so it is safe to return.
log.Warningf(ctx, "failed to condense lock spans: %v", ri.Error())
return false
}
rangeID := ri.Desc().RangeID
if l := len(buckets); l > 0 && buckets[l-1].rangeID == rangeID {
buckets[l-1].spans = append(buckets[l-1].spans, sp)
} else {
buckets = append(buckets, spanBucket{
rangeID: rangeID, spans: []roachpb.Span{sp},
})
}
buckets[len(buckets)-1].bytes += spanSize(sp)
}

// Sort the buckets by size and collapse from largest to smallest
// until total size of uncondensed spans no longer exceeds threshold.
sort.Slice(buckets, func(i, j int) bool { return buckets[i].bytes > buckets[j].bytes })
s.s = localSpans // reset to hold just the local spans; will add newly condensed and remainder
for _, bucket := range buckets {
// Condense until we get to half the threshold.
if s.bytes <= maxBytes/2 {
// Collect remaining spans from each bucket into uncondensed slice.
s.s = append(s.s, bucket.spans...)
continue
}
s.bytes -= bucket.bytes
// TODO(spencer): consider further optimizations here to create
// more than one span out of a bucket to avoid overly broad span
// combinations.
cs := bucket.spans[0]
for _, s := range bucket.spans[1:] {
cs = cs.Combine(s)
if !cs.Valid() {
// If we didn't fatal here then we would need to ensure that the
// spans were restored or a transaction could lose part of its
// lock footprint.
log.Fatalf(ctx, "failed to condense lock spans: "+
"combining span %s yielded invalid result", s)
}
}
s.bytes += spanSize(cs)
s.s = append(s.s, cs)
}
s.condensed = true
return true
}

// asSlice returns the set as a slice of spans.
func (s *condensableSpanSet) asSlice() []roachpb.Span {
l := len(s.s)
return s.s[:l:l] // immutable on append
}

// empty returns whether the set is empty or whether it contains spans.
func (s *condensableSpanSet) empty() bool {
return len(s.s) == 0
}

func (s *condensableSpanSet) clear() {
*s = condensableSpanSet{}
}

func spanSize(sp roachpb.Span) int64 {
return int64(len(sp.Key) + len(sp.EndKey))
}

func keySize(k roachpb.Key) int64 {
return int64(len(k))
}
31 changes: 31 additions & 0 deletions pkg/kv/kvclient/kvcoord/condensable_span_set_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvcoord

import (
"testing"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/stretchr/testify/require"
)

// Test that the size of the condensableSpanSet is properly maintained when
// contiguous spans are merged.
func TestCondensableSpanSetMergeContiguousSpans(t *testing.T) {
defer leaktest.AfterTest(t)()
s := condensableSpanSet{}
s.insert(roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b")})
s.insert(roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")})
require.Equal(t, int64(4), s.bytes)
s.mergeAndSort()
require.Equal(t, int64(2), s.bytes)
}
20 changes: 9 additions & 11 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,9 +210,6 @@ type DistSender struct {
// It is copied out of the rpcContext at construction time and used in
// testing.
clusterID *base.ClusterIDContainer
// rangeIteratorGen returns a range iterator bound to the DistSender.
// Used to avoid allocations.
rangeIteratorGen RangeIteratorGen

// disableFirstRangeUpdates disables updates of the first range via
// gossip. Used by tests which want finer control of the contents of the
Expand Down Expand Up @@ -303,7 +300,6 @@ func NewDistSender(cfg DistSenderConfig, g *gossip.Gossip) *DistSender {
ds.asyncSenderSem.UpdateCapacity(uint64(senderConcurrencyLimit.Get(&cfg.Settings.SV)))
})
ds.rpcContext.Stopper.AddCloser(ds.asyncSenderSem.Closer("stopper"))
ds.rangeIteratorGen = func() *RangeIterator { return NewRangeIterator(ds) }

if g != nil {
ctx := ds.AnnotateCtx(context.Background())
Expand Down Expand Up @@ -482,7 +478,7 @@ func (ds *DistSender) CountRanges(ctx context.Context, rs roachpb.RSpan) (int64,
break
}
}
return count, ri.Error().GoError()
return count, ri.Error()
}

// getDescriptor looks up the range descriptor to use for a query of
Expand Down Expand Up @@ -1079,7 +1075,7 @@ func (ds *DistSender) divideAndSendBatchToRanges(
ri := NewRangeIterator(ds)
ri.Seek(ctx, seekKey, scanDir)
if !ri.Valid() {
return nil, ri.Error()
return nil, roachpb.NewError(ri.Error())
}
// Take the fast path if this batch fits within a single range.
if !ri.NeedAnother(rs) {
Expand Down Expand Up @@ -1292,7 +1288,7 @@ func (ds *DistSender) divideAndSendBatchToRanges(

// We've exited early. Return the range iterator error.
responseCh := make(chan response, 1)
responseCh <- response{pErr: ri.Error()}
responseCh <- response{pErr: roachpb.NewError(ri.Error())}
responseChs = append(responseChs, responseCh)
return
}
Expand Down Expand Up @@ -1473,22 +1469,24 @@ func (ds *DistSender) sendPartialBatch(
// Propagate error if either the retry closer or context done
// channels were closed.
if pErr == nil {
if pErr = ds.deduceRetryEarlyExitError(ctx); pErr == nil {
if err := ds.deduceRetryEarlyExitError(ctx); err == nil {
log.Fatal(ctx, "exited retry loop without an error")
} else {
pErr = roachpb.NewError(err)
}
}

return response{pErr: pErr}
}

func (ds *DistSender) deduceRetryEarlyExitError(ctx context.Context) *roachpb.Error {
func (ds *DistSender) deduceRetryEarlyExitError(ctx context.Context) error {
select {
case <-ds.rpcRetryOptions.Closer:
// Typically happens during shutdown.
return roachpb.NewError(&roachpb.NodeUnavailableError{})
return &roachpb.NodeUnavailableError{}
case <-ctx.Done():
// Happens when the client request is canceled.
return roachpb.NewError(errors.Wrap(ctx.Err(), "aborted in distSender"))
return errors.Wrap(ctx.Err(), "aborted in distSender")
default:
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (ds *DistSender) divideAndSendRangeFeedToRanges(
break
}
}
return ri.Error().GoError()
return ri.Error()
}

// partialRangeFeed establishes a RangeFeed to the range specified by desc. It
Expand Down
Loading

0 comments on commit e17e3ad

Please sign in to comment.