kvcoord: condense read spans when they exceed the memory limit
Before this patch, once a transaction exceeded the
kv.transaction.max_refresh_spans_bytes limit, it stopped tracking reads
and no longer attempted to refresh when pushed.
This patch makes the span refresher condense the spans when it runs out
of memory instead. We get bigger spans and potentially false conflicts,
but at least we have a chance at refreshing. In particular, a refresh
will succeed if there are no writes anywhere.

The condensing is performed using the condensableSpanSet, just as the
pipeliner interceptor does for tracking write intents. Internally, that
set condenses spans range by range, which helps most for ranges with
lots of reads.
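
To make the idea concrete, here is a minimal sketch of the condensing
step (condenseByRange and rangeIDForKey are hypothetical names, not code
from this commit): spans are bucketed by the range containing their
start key, and each bucket is collapsed into one span covering the
bucket's extent. It assumes every span has a non-nil EndKey.

package kvcoord

import (
	"bytes"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// condenseByRange sketches what condensing does: group spans by the
// range that contains their start key, then replace each group with a
// single span stretching from the group's smallest start key to its
// largest end key. The real condensableSpanSet.maybeCondense walks the
// ranges with a range iterator and only condenses once the byte budget
// is exceeded.
func condenseByRange(
	spans []roachpb.Span, rangeIDForKey func(roachpb.Key) roachpb.RangeID,
) []roachpb.Span {
	buckets := make(map[roachpb.RangeID]roachpb.Span)
	for _, sp := range spans {
		id := rangeIDForKey(sp.Key)
		cur, ok := buckets[id]
		if !ok {
			buckets[id] = sp
			continue
		}
		if bytes.Compare(sp.Key, cur.Key) < 0 {
			cur.Key = sp.Key
		}
		if bytes.Compare(sp.EndKey, cur.EndKey) > 0 {
			cur.EndKey = sp.EndKey
		}
		buckets[id] = cur
	}
	condensed := make([]roachpb.Span, 0, len(buckets))
	for _, sp := range buckets {
		condensed = append(condensed, sp)
	}
	return condensed
}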

We've seen people run into kv.transaction.max_refresh_spans_bytes in the
past, so this should help many use cases. But in particular I've written
this patch because, without it, I'm scared about the effects of 20.1's
reduction of the closed timestamp target duration from the previous 30s
to 3s. Every transaction that writes after having run for longer than
that will get pushed, so being able to refresh is becoming more
important.

Fixes #46095

Release note (general change): Transactions reading a lot of data behave
better when exceeding the memory limit set by
kv.transaction.max_refresh_spans_bytes. Such transactions now attempt to
resolve the conflicts they run into instead of being forced to always
retry. Increasing kv.transaction.max_refresh_spans_bytes should no
longer be necessary for most workloads.

Release justification: fix for new "functionality" - the reduction in
the closed timestamp target duration.
andreimatei committed Mar 24, 2020
1 parent 06ac93a commit 34dd44e
Showing 14 changed files with 334 additions and 215 deletions.
47 changes: 32 additions & 15 deletions pkg/kv/kvclient/kvcoord/condensable_span_set.go
@@ -29,13 +29,22 @@ import (
type condensableSpanSet struct {
s []roachpb.Span
bytes int64

// condensed is set if we ever condensed the spans. Meaning, if the set of
// spans currently tracked has lost fidelity compared to the spans inserted.
// Note that we might have otherwise mucked with the inserted spans to save
// memory without losing fidelity, in which case this flag would not be set
// (e.g. merging overlapping or adjacent spans).
condensed bool
}

// insert adds a new span to the condensable span set. No attempt to condense
// the set or deduplicate the new span with existing spans is made.
func (s *condensableSpanSet) insert(sp roachpb.Span) {
s.s = append(s.s, sp)
s.bytes += spanSize(sp)
// insert adds new spans to the condensable span set. No attempt to condense the
// set or deduplicate the new spans with existing spans is made.
func (s *condensableSpanSet) insert(spans ...roachpb.Span) {
s.s = append(s.s, spans...)
for _, sp := range spans {
s.bytes += spanSize(sp)
}
}

// mergeAndSort merges all overlapping spans. Calling this method will not
@@ -60,11 +69,17 @@ func (s *condensableSpanSet) mergeAndSort() {
// exceeded, the method is more aggressive in its attempt to reduce the memory
// footprint of the span set. Not only will it merge overlapping spans, but
// spans within the same range boundaries are also condensed.
//
// Returns true if condensing was done. Note that, even if condensing was
// performed, this doesn't guarantee that the size was reduced below the byte
// limit. Condensing is only performed at the level of individual ranges, not
// across ranges, so it's possible to not be able to condense as much as
// desired.
func (s *condensableSpanSet) maybeCondense(
ctx context.Context, riGen RangeIteratorGen, maxBytes int64,
) {
ctx context.Context, riGen rangeIteratorFactory, maxBytes int64,
) bool {
if s.bytes < maxBytes {
return
return false
}

// Start by attempting to simply merge the spans within the set. This alone
@@ -73,14 +88,10 @@ func (s *condensableSpanSet) maybeCondense(
// lower in this method.
s.mergeAndSort()
if s.bytes < maxBytes {
return
return false
}

if riGen == nil {
// If we were not given a RangeIteratorGen, we cannot condense the spans.
return
}
ri := riGen()
ri := riGen.newRangeIterator()

// Divide spans by range boundaries and condense. Iterate over spans
// using a range iterator and add each to a bucket keyed by range
@@ -101,7 +112,7 @@
if !ri.Valid() {
// We haven't modified s.s yet, so it is safe to return.
log.Warningf(ctx, "failed to condense lock spans: %v", ri.Error())
return
return false
}
rangeID := ri.Desc().RangeID
if l := len(buckets); l > 0 && buckets[l-1].rangeID == rangeID {
@@ -143,6 +154,8 @@
s.bytes += spanSize(cs)
s.s = append(s.s, cs)
}
s.condensed = true
return true
}

// asSlice returns the set as a slice of spans.
@@ -156,6 +169,10 @@ func (s *condensableSpanSet) empty() bool {
return len(s.s) == 0
}

func (s *condensableSpanSet) clear() {
*s = condensableSpanSet{}
}

func spanSize(sp roachpb.Span) int64 {
return int64(len(sp.Key) + len(sp.EndKey))
}
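
To show how the new bool return value, the condensed flag, and clear()
are meant to work together, here is a simplified sketch of the caller
side in the span refresher. The helper name maybeCondenseRefreshFootprint
is hypothetical; the fields it touches (refreshFootprint, refreshInvalid,
riGen) mirror names appearing elsewhere in this commit, but this is not
the literal interceptor code.

package kvcoord

import "context"

// maybeCondenseRefreshFootprint tries to shrink the tracked read spans
// and only gives up on refreshing if even the condensed footprint still
// exceeds the budget.
func (sr *txnSpanRefresher) maybeCondenseRefreshFootprint(
	ctx context.Context, maxBytes int64,
) {
	if sr.refreshFootprint.bytes < maxBytes {
		return
	}
	sr.refreshFootprint.maybeCondense(ctx, sr.riGen, maxBytes)
	if sr.refreshFootprint.bytes >= maxBytes {
		// Condensing didn't save enough memory; fall back to the old
		// behavior of dropping the spans and forgoing refreshes.
		sr.refreshInvalid = true
		sr.refreshFootprint.clear()
	}
}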
4 changes: 0 additions & 4 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -210,9 +210,6 @@ type DistSender struct {
// It is copied out of the rpcContext at construction time and used in
// testing.
clusterID *base.ClusterIDContainer
// rangeIteratorGen returns a range iterator bound to the DistSender.
// Used to avoid allocations.
rangeIteratorGen RangeIteratorGen

// disableFirstRangeUpdates disables updates of the first range via
// gossip. Used by tests which want finer control of the contents of the
@@ -303,7 +300,6 @@ func NewDistSender(cfg DistSenderConfig, g *gossip.Gossip) *DistSender {
ds.asyncSenderSem.UpdateCapacity(uint64(senderConcurrencyLimit.Get(&cfg.Settings.SV)))
})
ds.rpcContext.Stopper.AddCloser(ds.asyncSenderSem.Closer("stopper"))
ds.rangeIteratorGen = func() *RangeIterator { return NewRangeIterator(ds) }

if g != nil {
ctx := ds.AnnotateCtx(context.Background())
58 changes: 40 additions & 18 deletions pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
@@ -1526,8 +1526,22 @@ func TestTxnCoordSenderRetries(t *testing.T) {
// Don't clobber the test's splits.
storeKnobs.DisableMergeQueue = true

var refreshSpansCondenseFilter atomic.Value
s, _, _ := serverutils.StartServer(t,
base.TestServerArgs{Knobs: base.TestingKnobs{Store: &storeKnobs}})
base.TestServerArgs{Knobs: base.TestingKnobs{
Store: &storeKnobs,
KVClient: &kvcoord.ClientTestingKnobs{
CondenseRefreshSpansFilter: func() bool {
fnVal := refreshSpansCondenseFilter.Load()
if fn, ok := fnVal.(func() bool); ok {
return fn()
}
return true
},
}}})

disableCondensingRefreshSpans := func() bool { return false }

ctx := context.Background()
defer s.Stopper().Stop(ctx)

@@ -1554,13 +1568,14 @@ func TestTxnCoordSenderRetries(t *testing.T) {
}

testCases := []struct {
name string
beforeTxnStart func(context.Context, *kv.DB) error // called before the txn starts
afterTxnStart func(context.Context, *kv.DB) error // called after the txn chooses a timestamp
retryable func(context.Context, *kv.Txn) error // called during the txn; may be retried
filter func(storagebase.FilterArgs) *roachpb.Error
priorReads bool
tsLeaked bool
name string
beforeTxnStart func(context.Context, *kv.DB) error // called before the txn starts
afterTxnStart func(context.Context, *kv.DB) error // called after the txn chooses a timestamp
retryable func(context.Context, *kv.Txn) error // called during the txn; may be retried
filter func(storagebase.FilterArgs) *roachpb.Error
refreshSpansCondenseFilter func() bool
priorReads bool
tsLeaked bool
// If both of these are false, no retries.
txnCoordRetry bool
clientRetry bool
@@ -1677,7 +1692,8 @@ func TestTxnCoordSenderRetries(t *testing.T) {
{
// If we've exhausted the limit for tracking refresh spans but we
// already refreshed, keep running the txn.
name: "forwarded timestamp with too many refreshes, read only",
name: "forwarded timestamp with too many refreshes, read only",
refreshSpansCondenseFilter: disableCondensingRefreshSpans,
afterTxnStart: func(ctx context.Context, db *kv.DB) error {
return db.Put(ctx, "a", "value")
},
@@ -1707,7 +1723,8 @@ func TestTxnCoordSenderRetries(t *testing.T) {
// limit for tracking refresh spans and our transaction's timestamp
// has been pushed, if we successfully commit then we won't hit an
// error.
name: "forwarded timestamp with too many refreshes in batch commit",
name: "forwarded timestamp with too many refreshes in batch commit",
refreshSpansCondenseFilter: disableCondensingRefreshSpans,
afterTxnStart: func(ctx context.Context, db *kv.DB) error {
_, err := db.Get(ctx, "a") // set ts cache
return err
@@ -1740,7 +1757,8 @@ func TestTxnCoordSenderRetries(t *testing.T) {
// has been pushed, if we successfully commit then we won't hit an
// error. This is the case even if the final batch itself causes a
// refresh.
name: "forwarded timestamp with too many refreshes in batch commit triggering refresh",
name: "forwarded timestamp with too many refreshes in batch commit triggering refresh",
refreshSpansCondenseFilter: disableCondensingRefreshSpans,
afterTxnStart: func(ctx context.Context, db *kv.DB) error {
_, err := db.Get(ctx, "a") // set ts cache
return err
@@ -2508,9 +2526,13 @@ func TestTxnCoordSenderRetries(t *testing.T) {
filterFn.Store(tc.filter)
defer filterFn.Store((func(storagebase.FilterArgs) *roachpb.Error)(nil))
}
if tc.refreshSpansCondenseFilter != nil {
refreshSpansCondenseFilter.Store(tc.refreshSpansCondenseFilter)
defer refreshSpansCondenseFilter.Store((func() bool)(nil))
}

var metrics kvcoord.TxnMetrics
var lastAutoRetries int64
var lastRefreshes int64
var hadClientRetry bool
epoch := 0
if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
@@ -2542,7 +2564,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
}

metrics = txn.Sender().(*kvcoord.TxnCoordSender).TxnCoordSenderFactory.Metrics()
lastAutoRetries = metrics.AutoRetries.Count()
lastRefreshes = metrics.RefreshSuccess.Count()

return tc.retryable(ctx, txn)
}); err != nil {
@@ -2558,11 +2580,11 @@
// from the cluster setup are still ongoing and can experience
// their own retries, this might increase by more than one, so we
// can only check here that it's >= 1.
autoRetries := metrics.AutoRetries.Count() - lastAutoRetries
if tc.txnCoordRetry && autoRetries == 0 {
t.Errorf("expected [at least] one txn coord sender auto retry; got %d", autoRetries)
} else if !tc.txnCoordRetry && autoRetries != 0 {
t.Errorf("expected no txn coord sender auto retries; got %d", autoRetries)
refreshes := metrics.RefreshSuccess.Count() - lastRefreshes
if tc.txnCoordRetry && refreshes == 0 {
t.Errorf("expected [at least] one txn coord sender auto retry; got %d", refreshes)
} else if !tc.txnCoordRetry && refreshes != 0 {
t.Errorf("expected no txn coord sender auto retries; got %d", refreshes)
}
if tc.clientRetry && !hadClientRetry {
t.Errorf("expected but did not experience client retry")
3 changes: 0 additions & 3 deletions pkg/kv/kvclient/kvcoord/range_iter.go
@@ -34,9 +34,6 @@ type RangeIterator struct {
err error
}

// RangeIteratorGen is a generator of RangeIterators.
type RangeIteratorGen func() *RangeIterator

// NewRangeIterator creates a new RangeIterator.
func NewRangeIterator(ds *DistSender) *RangeIterator {
return &RangeIterator{
6 changes: 6 additions & 0 deletions pkg/kv/kvclient/kvcoord/testing_knobs.go
@@ -23,6 +23,12 @@ type ClientTestingKnobs struct {
// spans for a single transactional batch.
// 0 means use a default. -1 means disable refresh.
MaxTxnRefreshAttempts int

// CondenseRefreshSpansFilter, if set, is called when the span refresher is
// considering condensing the refresh spans. If it returns false, condensing
// will not be attempted and the span refresher will behave as if condensing
// failed to save enough memory.
CondenseRefreshSpansFilter func() bool
}

var _ base.ModuleTestingKnobs = &ClientTestingKnobs{}
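
For illustration, a test that wants the old give-up behavior can install
the new knob roughly as the updated TestTxnCoordSenderRetries above does.
The test name below is hypothetical and the body is trimmed down to the
knob wiring.

package kvcoord_test

import (
	"context"
	"testing"

	"github.com/cockroachdb/cockroach/pkg/base"
	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
)

func TestRefreshSpanCondensingDisabled(t *testing.T) {
	knobs := &kvcoord.ClientTestingKnobs{
		// Report that condensing should not be attempted, so the span
		// refresher acts as if condensing failed to save enough memory.
		CondenseRefreshSpansFilter: func() bool { return false },
	}
	s, _, _ := serverutils.StartServer(t, base.TestServerArgs{
		Knobs: base.TestingKnobs{KVClient: knobs},
	})
	defer s.Stopper().Stop(context.Background())
	// ... drive a transaction that reads more data than
	// kv.transaction.max_refresh_spans_bytes allows and assert on the
	// resulting retry behavior ...
}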
19 changes: 11 additions & 8 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender.go
@@ -217,9 +217,9 @@ func newRootTxnCoordSender(
// txnLockGatekeeper at the bottom of the stack to connect it with the
// TxnCoordSender's wrapped sender. First, each of the interceptor objects
// is initialized.
var riGen RangeIteratorGen
var riGen rangeIteratorFactory
if ds, ok := tcf.wrapped.(*DistSender); ok {
riGen = ds.rangeIteratorGen
riGen.ds = ds
}
tcs.interceptorAlloc.txnHeartbeater.init(
tcf.AmbientContext,
@@ -279,7 +279,7 @@ }
}

func (tc *TxnCoordSender) initCommonInterceptors(
tcf *TxnCoordSenderFactory, txn *roachpb.Transaction, typ kv.TxnType, riGen RangeIteratorGen,
tcf *TxnCoordSenderFactory, txn *roachpb.Transaction, typ kv.TxnType, riGen rangeIteratorFactory,
) {
tc.interceptorAlloc.txnPipeliner = txnPipeliner{
st: tcf.st,
@@ -288,13 +288,16 @@ func (tc *TxnCoordSender) initCommonInterceptors(
tc.interceptorAlloc.txnSpanRefresher = txnSpanRefresher{
st: tcf.st,
knobs: &tcf.testingKnobs,
riGen: riGen,
// We can only allow refresh span retries on root transactions
// because those are the only places where we have all of the
// refresh spans. If this is a leaf, as in a distributed sql flow,
// we need to propagate the error to the root for an epoch restart.
canAutoRetry: typ == kv.RootTxn,
autoRetryCounter: tc.metrics.AutoRetries,
refreshSpanBytesExceededCounter: tc.metrics.RefreshSpanBytesExceeded,
canAutoRetry: typ == kv.RootTxn,
refreshSuccess: tc.metrics.RefreshSuccess,
refreshFail: tc.metrics.RefreshFail,
refreshFailWithCondensedSpans: tc.metrics.RefreshFailWithCondensedSpans,
refreshMemoryLimitExceeded: tc.metrics.RefreshMemoryLimitExceeded,
}
tc.interceptorAlloc.txnLockGatekeeper = txnLockGatekeeper{
wrapped: tc.wrapped,
@@ -348,9 +351,9 @@ func newLeafTxnCoordSender(
// txnLockGatekeeper at the bottom of the stack to connect it with the
// TxnCoordSender's wrapped sender. First, each of the interceptor objects
// is initialized.
var riGen RangeIteratorGen
var riGen rangeIteratorFactory
if ds, ok := tcf.wrapped.(*DistSender); ok {
riGen = ds.rangeIteratorGen
riGen.ds = ds
}
tcs.initCommonInterceptors(tcf, txn, kv.LeafTxn, riGen)

5 changes: 2 additions & 3 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go
@@ -41,9 +41,8 @@ type savepoint struct {
seqNum enginepb.TxnSeq

// txnSpanRefresher fields.
refreshSpans []roachpb.Span
refreshInvalid bool
refreshSpanBytes int64
refreshSpans []roachpb.Span
refreshInvalid bool
}

var _ kv.SavepointToken = (*savepoint)(nil)
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go
@@ -907,7 +907,7 @@ func TestTxnMultipleCoord(t *testing.T) {

// Verify presence of both locks.
tcs := txn.Sender().(*TxnCoordSender)
refreshSpans := tcs.interceptorAlloc.txnSpanRefresher.refreshSpans
refreshSpans := tcs.interceptorAlloc.txnSpanRefresher.refreshFootprint.asSlice()
require.Equal(t, []roachpb.Span{{Key: key}, {Key: key2}}, refreshSpans)

ba := txn.NewBatch()
36 changes: 32 additions & 4 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go
@@ -176,10 +176,8 @@ var trackedWritesMaxSize = settings.RegisterPublicIntSetting(
// attached to any end transaction request that is passed through the pipeliner
// to ensure that the locks within them are released.
type txnPipeliner struct {
st *cluster.Settings
// Optional; used to condense lock spans, if provided. If not provided, a
// transaction's lock footprint may grow without bound.
riGen RangeIteratorGen
st *cluster.Settings
riGen rangeIteratorFactory // used to condense lock spans, if provided
wrapped lockedSender
disabled bool

Expand All @@ -205,6 +203,36 @@ type txnPipeliner struct {
lockFootprint condensableSpanSet
}

// condensableSpanSetRangeIterator describes the interface of RangeIterator
// needed by the condensableSpanSet. It is useful for mocking an iterator in
// tests.
type condensableSpanSetRangeIterator interface {
Valid() bool
Seek(ctx context.Context, key roachpb.RKey, scanDir ScanDirection)
Error() error
Desc() *roachpb.RangeDescriptor
}

// rangeIteratorFactory is used to create a condensableSpanSetRangeIterator
// lazily. It's used to avoid allocating an iterator when it's not needed. The
// factory can be configured either with a callback, used for mocking in tests,
// or with a DistSender. If neither is configured, newRangeIterator panics.
type rangeIteratorFactory struct {
factory func() condensableSpanSetRangeIterator
ds *DistSender
}

// newRangeIterator creates a range iterator. If no factory was configured, it panics.
func (f rangeIteratorFactory) newRangeIterator() condensableSpanSetRangeIterator {
if f.factory != nil {
return f.factory()
}
if f.ds != nil {
return NewRangeIterator(f.ds)
}
panic("no iterator factory configured")
}

// SendLocked implements the lockedSender interface.
func (tp *txnPipeliner) SendLocked(
ctx context.Context, ba roachpb.BatchRequest,
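
Since the factory's callback exists for mocking, a test double might look
roughly like this. The mockRangeIterator type is hypothetical and not part
of this commit; it only needs to satisfy condensableSpanSetRangeIterator.

package kvcoord

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// mockRangeIterator pretends that all keys live in a single range, so
// condensing collapses everything into one bucket.
type mockRangeIterator struct {
	desc roachpb.RangeDescriptor
}

var _ condensableSpanSetRangeIterator = &mockRangeIterator{}

func (m *mockRangeIterator) Valid() bool { return true }

func (m *mockRangeIterator) Seek(context.Context, roachpb.RKey, ScanDirection) {}

func (m *mockRangeIterator) Error() error { return nil }

func (m *mockRangeIterator) Desc() *roachpb.RangeDescriptor { return &m.desc }

// A test can then configure the factory with the callback:
//
//	riGen := rangeIteratorFactory{
//		factory: func() condensableSpanSetRangeIterator {
//			return &mockRangeIterator{}
//		},
//	}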
4 changes: 3 additions & 1 deletion pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go
@@ -560,9 +560,11 @@ func TestTxnPipelinerManyWrites(t *testing.T) {
ctx := context.Background()
tp, mockSender := makeMockTxnPipeliner()

// Disable maxInFlightSize and maxBatchSize limits/
// Disable write_pipelining_max_outstanding_size,
// write_pipelining_max_batch_size, and max_intents_bytes limits.
pipelinedWritesMaxInFlightSize.Override(&tp.st.SV, math.MaxInt64)
pipelinedWritesMaxBatchSize.Override(&tp.st.SV, 0)
trackedWritesMaxSize.Override(&tp.st.SV, math.MaxInt64)

const writes = 2048
keyBuf := roachpb.Key(strings.Repeat("a", writes+1))