diff --git a/pkg/kv/kvclient/kvcoord/condensable_span_set.go b/pkg/kv/kvclient/kvcoord/condensable_span_set.go new file mode 100644 index 000000000000..05cd98808e62 --- /dev/null +++ b/pkg/kv/kvclient/kvcoord/condensable_span_set.go @@ -0,0 +1,182 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvcoord + +import ( + "context" + "sort" + + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/log" +) + +// condensableSpanSet is a set of key spans that is condensable in order to +// stay below some maximum byte limit. Condensing of the set happens in two +// ways. Initially, overlapping spans are merged together to deduplicate +// redundant keys. If that alone isn't sufficient to stay below the byte limit, +// spans within the same Range will be merged together. This can cause the +// "footprint" of the set to grow, so the set should be thought of as on +// overestimate. +type condensableSpanSet struct { + s []roachpb.Span + bytes int64 + + // condensed is set if we ever condensed the spans. Meaning, if the set of + // spans currently tracked has lost fidelity compared to the spans inserted. + // Note that we might have otherwise mucked with the inserted spans to save + // memory without losing fidelity, in which case this flag would not be set + // (e.g. merging overlapping or adjacent spans). + condensed bool +} + +// insert adds new spans to the condensable span set. No attempt to condense the +// set or deduplicate the new span with existing spans is made. +func (s *condensableSpanSet) insert(spans ...roachpb.Span) { + s.s = append(s.s, spans...) + for _, sp := range spans { + s.bytes += spanSize(sp) + } +} + +// mergeAndSort merges all overlapping spans. Calling this method will not +// increase the overall bounds of the span set, but will eliminate duplicated +// spans and combine overlapping spans. +// +// The method has the side effect of sorting the stable write set. +func (s *condensableSpanSet) mergeAndSort() { + oldLen := len(s.s) + s.s, _ = roachpb.MergeSpans(s.s) + // Recompute the size if anything has changed. + if oldLen != len(s.s) { + s.bytes = 0 + for _, sp := range s.s { + s.bytes += spanSize(sp) + } + } +} + +// maybeCondense is similar in spirit to mergeAndSort, but it only adjusts the +// span set when the maximum byte limit is exceeded. However, when this limit is +// exceeded, the method is more aggressive in its attempt to reduce the memory +// footprint of the span set. Not only will it merge overlapping spans, but +// spans within the same range boundaries are also condensed. +// +// Returns true if condensing was done. Note that, even if condensing was +// performed, this doesn't guarantee that the size was reduced below the byte +// limit. Condensing is only performed at the level of individual ranges, not +// across ranges, so it's possible to not be able to condense as much as +// desired. +func (s *condensableSpanSet) maybeCondense( + ctx context.Context, riGen rangeIteratorFactory, maxBytes int64, +) bool { + if s.bytes < maxBytes { + return false + } + + // Start by attempting to simply merge the spans within the set. This alone + // may bring us under the byte limit. Even if it doesn't, this step has the + // nice property that it sorts the spans by start key, which we rely on + // lower in this method. + s.mergeAndSort() + if s.bytes < maxBytes { + return false + } + + ri := riGen.newRangeIterator() + + // Divide spans by range boundaries and condense. Iterate over spans + // using a range iterator and add each to a bucket keyed by range + // ID. Local keys are kept in a new slice and not added to buckets. + type spanBucket struct { + rangeID roachpb.RangeID + bytes int64 + spans []roachpb.Span + } + var buckets []spanBucket + var localSpans []roachpb.Span + for _, sp := range s.s { + if keys.IsLocal(sp.Key) { + localSpans = append(localSpans, sp) + continue + } + ri.Seek(ctx, roachpb.RKey(sp.Key), Ascending) + if !ri.Valid() { + // We haven't modified s.s yet, so it is safe to return. + log.Warningf(ctx, "failed to condense lock spans: %v", ri.Error()) + return false + } + rangeID := ri.Desc().RangeID + if l := len(buckets); l > 0 && buckets[l-1].rangeID == rangeID { + buckets[l-1].spans = append(buckets[l-1].spans, sp) + } else { + buckets = append(buckets, spanBucket{ + rangeID: rangeID, spans: []roachpb.Span{sp}, + }) + } + buckets[len(buckets)-1].bytes += spanSize(sp) + } + + // Sort the buckets by size and collapse from largest to smallest + // until total size of uncondensed spans no longer exceeds threshold. + sort.Slice(buckets, func(i, j int) bool { return buckets[i].bytes > buckets[j].bytes }) + s.s = localSpans // reset to hold just the local spans; will add newly condensed and remainder + for _, bucket := range buckets { + // Condense until we get to half the threshold. + if s.bytes <= maxBytes/2 { + // Collect remaining spans from each bucket into uncondensed slice. + s.s = append(s.s, bucket.spans...) + continue + } + s.bytes -= bucket.bytes + // TODO(spencer): consider further optimizations here to create + // more than one span out of a bucket to avoid overly broad span + // combinations. + cs := bucket.spans[0] + for _, s := range bucket.spans[1:] { + cs = cs.Combine(s) + if !cs.Valid() { + // If we didn't fatal here then we would need to ensure that the + // spans were restored or a transaction could lose part of its + // lock footprint. + log.Fatalf(ctx, "failed to condense lock spans: "+ + "combining span %s yielded invalid result", s) + } + } + s.bytes += spanSize(cs) + s.s = append(s.s, cs) + } + s.condensed = true + return true +} + +// asSlice returns the set as a slice of spans. +func (s *condensableSpanSet) asSlice() []roachpb.Span { + l := len(s.s) + return s.s[:l:l] // immutable on append +} + +// empty returns whether the set is empty or whether it contains spans. +func (s *condensableSpanSet) empty() bool { + return len(s.s) == 0 +} + +func (s *condensableSpanSet) clear() { + *s = condensableSpanSet{} +} + +func spanSize(sp roachpb.Span) int64 { + return int64(len(sp.Key) + len(sp.EndKey)) +} + +func keySize(k roachpb.Key) int64 { + return int64(len(k)) +} diff --git a/pkg/kv/kvclient/kvcoord/condensable_span_set_test.go b/pkg/kv/kvclient/kvcoord/condensable_span_set_test.go new file mode 100644 index 000000000000..9921ef9281c1 --- /dev/null +++ b/pkg/kv/kvclient/kvcoord/condensable_span_set_test.go @@ -0,0 +1,31 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvcoord + +import ( + "testing" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/stretchr/testify/require" +) + +// Test that the size of the condensableSpanSet is properly maintained when +// contiguous spans are merged. +func TestCondensableSpanSetMergeContiguousSpans(t *testing.T) { + defer leaktest.AfterTest(t)() + s := condensableSpanSet{} + s.insert(roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b")}) + s.insert(roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")}) + require.Equal(t, int64(4), s.bytes) + s.mergeAndSort() + require.Equal(t, int64(2), s.bytes) +} diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index 4436090fc045..bf04fe7e00d2 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -210,9 +210,6 @@ type DistSender struct { // It is copied out of the rpcContext at construction time and used in // testing. clusterID *base.ClusterIDContainer - // rangeIteratorGen returns a range iterator bound to the DistSender. - // Used to avoid allocations. - rangeIteratorGen RangeIteratorGen // disableFirstRangeUpdates disables updates of the first range via // gossip. Used by tests which want finer control of the contents of the @@ -303,7 +300,6 @@ func NewDistSender(cfg DistSenderConfig, g *gossip.Gossip) *DistSender { ds.asyncSenderSem.UpdateCapacity(uint64(senderConcurrencyLimit.Get(&cfg.Settings.SV))) }) ds.rpcContext.Stopper.AddCloser(ds.asyncSenderSem.Closer("stopper")) - ds.rangeIteratorGen = func() *RangeIterator { return NewRangeIterator(ds) } if g != nil { ctx := ds.AnnotateCtx(context.Background()) @@ -482,7 +478,7 @@ func (ds *DistSender) CountRanges(ctx context.Context, rs roachpb.RSpan) (int64, break } } - return count, ri.Error().GoError() + return count, ri.Error() } // getDescriptor looks up the range descriptor to use for a query of @@ -1079,7 +1075,7 @@ func (ds *DistSender) divideAndSendBatchToRanges( ri := NewRangeIterator(ds) ri.Seek(ctx, seekKey, scanDir) if !ri.Valid() { - return nil, ri.Error() + return nil, roachpb.NewError(ri.Error()) } // Take the fast path if this batch fits within a single range. if !ri.NeedAnother(rs) { @@ -1292,7 +1288,7 @@ func (ds *DistSender) divideAndSendBatchToRanges( // We've exited early. Return the range iterator error. responseCh := make(chan response, 1) - responseCh <- response{pErr: ri.Error()} + responseCh <- response{pErr: roachpb.NewError(ri.Error())} responseChs = append(responseChs, responseCh) return } @@ -1473,22 +1469,24 @@ func (ds *DistSender) sendPartialBatch( // Propagate error if either the retry closer or context done // channels were closed. if pErr == nil { - if pErr = ds.deduceRetryEarlyExitError(ctx); pErr == nil { + if err := ds.deduceRetryEarlyExitError(ctx); err == nil { log.Fatal(ctx, "exited retry loop without an error") + } else { + pErr = roachpb.NewError(err) } } return response{pErr: pErr} } -func (ds *DistSender) deduceRetryEarlyExitError(ctx context.Context) *roachpb.Error { +func (ds *DistSender) deduceRetryEarlyExitError(ctx context.Context) error { select { case <-ds.rpcRetryOptions.Closer: // Typically happens during shutdown. - return roachpb.NewError(&roachpb.NodeUnavailableError{}) + return &roachpb.NodeUnavailableError{} case <-ctx.Done(): // Happens when the client request is canceled. - return roachpb.NewError(errors.Wrap(ctx.Err(), "aborted in distSender")) + return errors.Wrap(ctx.Err(), "aborted in distSender") default: } return nil diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go index 385d7793294b..d511c7c78023 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go @@ -117,7 +117,7 @@ func (ds *DistSender) divideAndSendRangeFeedToRanges( break } } - return ri.Error().GoError() + return ri.Error() } // partialRangeFeed establishes a RangeFeed to the range specified by desc. It diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go index 1be557a00293..1a338f912179 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go @@ -1522,8 +1522,22 @@ func TestTxnCoordSenderRetries(t *testing.T) { return nil } + var refreshSpansCondenseFilter atomic.Value s, _, _ := serverutils.StartServer(t, - base.TestServerArgs{Knobs: base.TestingKnobs{Store: &storeKnobs}}) + base.TestServerArgs{Knobs: base.TestingKnobs{ + Store: &storeKnobs, + KVClient: &kvcoord.ClientTestingKnobs{ + CondenseRefreshSpansFilter: func() bool { + fnVal := refreshSpansCondenseFilter.Load() + if fn, ok := fnVal.(func() bool); ok { + return fn() + } + return true + }, + }}}) + + disableCondensingRefreshSpans := func() bool { return false } + ctx := context.Background() defer s.Stopper().Stop(ctx) @@ -1550,13 +1564,14 @@ func TestTxnCoordSenderRetries(t *testing.T) { } testCases := []struct { - name string - beforeTxnStart func(context.Context, *kv.DB) error // called before the txn starts - afterTxnStart func(context.Context, *kv.DB) error // called after the txn chooses a timestamp - retryable func(context.Context, *kv.Txn) error // called during the txn; may be retried - filter func(storagebase.FilterArgs) *roachpb.Error - priorReads bool - tsLeaked bool + name string + beforeTxnStart func(context.Context, *kv.DB) error // called before the txn starts + afterTxnStart func(context.Context, *kv.DB) error // called after the txn chooses a timestamp + retryable func(context.Context, *kv.Txn) error // called during the txn; may be retried + filter func(storagebase.FilterArgs) *roachpb.Error + refreshSpansCondenseFilter func() bool + priorReads bool + tsLeaked bool // If both of these are false, no retries. txnCoordRetry bool clientRetry bool @@ -1709,7 +1724,8 @@ func TestTxnCoordSenderRetries(t *testing.T) { { // If we've exhausted the limit for tracking refresh spans but we // already refreshed, keep running the txn. - name: "forwarded timestamp with too many refreshes, read only", + name: "forwarded timestamp with too many refreshes, read only", + refreshSpansCondenseFilter: disableCondensingRefreshSpans, afterTxnStart: func(ctx context.Context, db *kv.DB) error { return db.Put(ctx, "a", "value") }, @@ -1740,7 +1756,9 @@ func TestTxnCoordSenderRetries(t *testing.T) { // has been pushed, if we successfully commit then we won't hit an // error. This is the case even if the final batch itself causes a // no-op refresh because the txn has no refresh spans. - name: "forwarded timestamp with too many refreshes in batch commit with no-op refresh", + name: "forwarded timestamp with too many refreshes in batch commit " + + "with no-op refresh", + refreshSpansCondenseFilter: disableCondensingRefreshSpans, afterTxnStart: func(ctx context.Context, db *kv.DB) error { _, err := db.Get(ctx, "a") // set ts cache return err @@ -1773,7 +1791,9 @@ func TestTxnCoordSenderRetries(t *testing.T) { // has been pushed, if we successfully commit then we won't hit an // error. This is the case even if the final batch itself causes a // real refresh. - name: "forwarded timestamp with too many refreshes in batch commit with refresh", + name: "forwarded timestamp with too many refreshes in batch commit " + + "with refresh", + refreshSpansCondenseFilter: disableCondensingRefreshSpans, afterTxnStart: func(ctx context.Context, db *kv.DB) error { _, err := db.Get(ctx, "a") // set ts cache return err @@ -2541,9 +2561,13 @@ func TestTxnCoordSenderRetries(t *testing.T) { filterFn.Store(tc.filter) defer filterFn.Store((func(storagebase.FilterArgs) *roachpb.Error)(nil)) } + if tc.refreshSpansCondenseFilter != nil { + refreshSpansCondenseFilter.Store(tc.refreshSpansCondenseFilter) + defer refreshSpansCondenseFilter.Store((func() bool)(nil)) + } var metrics kvcoord.TxnMetrics - var lastAutoRetries int64 + var lastRefreshes int64 var hadClientRetry bool epoch := 0 if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { @@ -2575,7 +2599,7 @@ func TestTxnCoordSenderRetries(t *testing.T) { } metrics = txn.Sender().(*kvcoord.TxnCoordSender).TxnCoordSenderFactory.Metrics() - lastAutoRetries = metrics.AutoRetries.Count() + lastRefreshes = metrics.RefreshSuccess.Count() return tc.retryable(ctx, txn) }); err != nil { @@ -2591,11 +2615,11 @@ func TestTxnCoordSenderRetries(t *testing.T) { // from the cluster setup are still ongoing and can experience // their own retries, this might increase by more than one, so we // can only check here that it's >= 1. - autoRetries := metrics.AutoRetries.Count() - lastAutoRetries - if tc.txnCoordRetry && autoRetries == 0 { - t.Errorf("expected [at least] one txn coord sender auto retry; got %d", autoRetries) - } else if !tc.txnCoordRetry && autoRetries != 0 { - t.Errorf("expected no txn coord sender auto retries; got %d", autoRetries) + refreshes := metrics.RefreshSuccess.Count() - lastRefreshes + if tc.txnCoordRetry && refreshes == 0 { + t.Errorf("expected [at least] one txn coord sender auto retry; got %d", refreshes) + } else if !tc.txnCoordRetry && refreshes != 0 { + t.Errorf("expected no txn coord sender auto retries; got %d", refreshes) } if tc.clientRetry && !hadClientRetry { t.Errorf("expected but did not experience client retry") diff --git a/pkg/kv/kvclient/kvcoord/range_iter.go b/pkg/kv/kvclient/kvcoord/range_iter.go index 8fc8bc446797..ca8aaaa2ebc9 100644 --- a/pkg/kv/kvclient/kvcoord/range_iter.go +++ b/pkg/kv/kvclient/kvcoord/range_iter.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/retry" + "github.com/cockroachdb/errors" ) // A RangeIterator provides a mechanism for iterating over all ranges @@ -30,12 +31,9 @@ type RangeIterator struct { desc *roachpb.RangeDescriptor token *EvictionToken init bool - pErr *roachpb.Error + err error } -// RangeIteratorGen is a generator of RangeIterators. -type RangeIteratorGen func() *RangeIterator - // NewRangeIterator creates a new RangeIterator. func NewRangeIterator(ds *DistSender) *RangeIterator { return &RangeIterator{ @@ -105,11 +103,11 @@ func (ri *RangeIterator) Valid() bool { // Error returns the error the iterator encountered, if any. If // the iterator has not been initialized, returns iterator error. -func (ri *RangeIterator) Error() *roachpb.Error { +func (ri *RangeIterator) Error() error { if !ri.init { - return roachpb.NewErrorf("range iterator not intialized with Seek()") + return errors.New("range iterator not intialized with Seek()") } - return ri.pErr + return ri.err } // Reset resets the RangeIterator to its initial state. @@ -146,12 +144,12 @@ func (ri *RangeIterator) Seek(ctx context.Context, key roachpb.RKey, scanDir Sca } ri.scanDir = scanDir ri.init = true // the iterator is now initialized - ri.pErr = nil // clear any prior error + ri.err = nil // clear any prior error ri.key = key // set the key if (scanDir == Ascending && key.Equal(roachpb.RKeyMax)) || (scanDir == Descending && key.Equal(roachpb.RKeyMin)) { - ri.pErr = roachpb.NewErrorf("RangeIterator seek to invalid key %s", key) + ri.err = errors.Errorf("RangeIterator seek to invalid key %s", key) return } @@ -188,7 +186,7 @@ func (ri *RangeIterator) Seek(ctx context.Context, key roachpb.RKey, scanDir Sca (!reverse && !ri.desc.ContainsKey(ri.key)) { log.Eventf(ctx, "addressing error: %s does not include key %s", ri.desc, ri.key) if err := ri.token.Evict(ctx); err != nil { - ri.pErr = roachpb.NewError(err) + ri.err = err return } // On addressing errors, don't backoff; retry immediately. @@ -202,9 +200,9 @@ func (ri *RangeIterator) Seek(ctx context.Context, key roachpb.RKey, scanDir Sca } // Check for an early exit from the retry loop. - if pErr := ri.ds.deduceRetryEarlyExitError(ctx); pErr != nil { - ri.pErr = pErr + if err := ri.ds.deduceRetryEarlyExitError(ctx); err != nil { + ri.err = err } else { - ri.pErr = roachpb.NewErrorf("RangeIterator failed to seek to %s", key) + ri.err = errors.Errorf("RangeIterator failed to seek to %s", key) } } diff --git a/pkg/kv/kvclient/kvcoord/testing_knobs.go b/pkg/kv/kvclient/kvcoord/testing_knobs.go index 1f6ae51177e9..762e6cb9fb08 100644 --- a/pkg/kv/kvclient/kvcoord/testing_knobs.go +++ b/pkg/kv/kvclient/kvcoord/testing_knobs.go @@ -23,6 +23,12 @@ type ClientTestingKnobs struct { // spans for a single transactional batch. // 0 means use a default. -1 means disable refresh. MaxTxnRefreshAttempts int + + // CondenseRefreshSpansFilter, if set, is called when the span refresher is + // considering condensing the refresh spans. If it returns false, condensing + // will not be attempted and the span refresher will behave as if condensing + // failed to save enough memory. + CondenseRefreshSpansFilter func() bool } var _ base.ModuleTestingKnobs = &ClientTestingKnobs{} diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go index 582852daf410..3c36c77ad27a 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go @@ -172,7 +172,7 @@ type txnInterceptor interface { // importLeafFinalState updates any internal state held inside the // interceptor from the given LeafTxn final state. - importLeafFinalState(*roachpb.LeafTxnFinalState) + importLeafFinalState(context.Context, *roachpb.LeafTxnFinalState) // epochBumpedLocked resets the interceptor in the case of a txn epoch // increment. @@ -217,10 +217,6 @@ func newRootTxnCoordSender( // txnLockGatekeeper at the bottom of the stack to connect it with the // TxnCoordSender's wrapped sender. First, each of the interceptor objects // is initialized. - var riGen RangeIteratorGen - if ds, ok := tcf.wrapped.(*DistSender); ok { - riGen = ds.rangeIteratorGen - } tcs.interceptorAlloc.txnHeartbeater.init( tcf.AmbientContext, tcs.stopper, @@ -241,7 +237,7 @@ func newRootTxnCoordSender( clock: tcs.clock, txn: &tcs.mu.txn, } - tcs.initCommonInterceptors(tcf, txn, kv.RootTxn, riGen) + tcs.initCommonInterceptors(tcf, txn, kv.RootTxn) // Once the interceptors are initialized, piece them all together in the // correct order. @@ -279,8 +275,12 @@ func newRootTxnCoordSender( } func (tc *TxnCoordSender) initCommonInterceptors( - tcf *TxnCoordSenderFactory, txn *roachpb.Transaction, typ kv.TxnType, riGen RangeIteratorGen, + tcf *TxnCoordSenderFactory, txn *roachpb.Transaction, typ kv.TxnType, ) { + var riGen rangeIteratorFactory + if ds, ok := tcf.wrapped.(*DistSender); ok { + riGen.ds = ds + } tc.interceptorAlloc.txnPipeliner = txnPipeliner{ st: tcf.st, riGen: riGen, @@ -288,13 +288,16 @@ func (tc *TxnCoordSender) initCommonInterceptors( tc.interceptorAlloc.txnSpanRefresher = txnSpanRefresher{ st: tcf.st, knobs: &tcf.testingKnobs, + riGen: riGen, // We can only allow refresh span retries on root transactions // because those are the only places where we have all of the // refresh spans. If this is a leaf, as in a distributed sql flow, // we need to propagate the error to the root for an epoch restart. - canAutoRetry: typ == kv.RootTxn, - autoRetryCounter: tc.metrics.AutoRetries, - refreshSpanBytesExceededCounter: tc.metrics.RefreshSpanBytesExceeded, + canAutoRetry: typ == kv.RootTxn, + refreshSuccess: tc.metrics.RefreshSuccess, + refreshFail: tc.metrics.RefreshFail, + refreshFailWithCondensedSpans: tc.metrics.RefreshFailWithCondensedSpans, + refreshMemoryLimitExceeded: tc.metrics.RefreshMemoryLimitExceeded, } tc.interceptorAlloc.txnLockGatekeeper = txnLockGatekeeper{ wrapped: tc.wrapped, @@ -348,11 +351,7 @@ func newLeafTxnCoordSender( // txnLockGatekeeper at the bottom of the stack to connect it with the // TxnCoordSender's wrapped sender. First, each of the interceptor objects // is initialized. - var riGen RangeIteratorGen - if ds, ok := tcf.wrapped.(*DistSender); ok { - riGen = ds.rangeIteratorGen - } - tcs.initCommonInterceptors(tcf, txn, kv.LeafTxn, riGen) + tcs.initCommonInterceptors(tcf, txn, kv.LeafTxn) // Per-interceptor leaf initialization. If/when more interceptors // need leaf initialization, this should be turned into an interface @@ -1078,7 +1077,7 @@ func (tc *TxnCoordSender) UpdateRootWithLeafFinalState( tc.mu.txn.Update(&tfs.Txn) for _, reqInt := range tc.interceptorStack { - reqInt.importLeafFinalState(tfs) + reqInt.importLeafFinalState(ctx, tfs) } } diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go index ef5d0a313bc5..2d4e944ca538 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go @@ -41,9 +41,8 @@ type savepoint struct { seqNum enginepb.TxnSeq // txnSpanRefresher fields. - refreshSpans []roachpb.Span - refreshInvalid bool - refreshSpanBytes int64 + refreshSpans []roachpb.Span + refreshInvalid bool } var _ kv.SavepointToken = (*savepoint)(nil) diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go index 7ee7f251bb76..671b728faefb 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go @@ -907,7 +907,7 @@ func TestTxnMultipleCoord(t *testing.T) { // Verify presence of both locks. tcs := txn.Sender().(*TxnCoordSender) - refreshSpans := tcs.interceptorAlloc.txnSpanRefresher.refreshSpans + refreshSpans := tcs.interceptorAlloc.txnSpanRefresher.refreshFootprint.asSlice() require.Equal(t, []roachpb.Span{{Key: key}, {Key: key2}}, refreshSpans) ba := txn.NewBatch() diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_committer.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_committer.go index 523c53d20a8b..0c00d7438eef 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_committer.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_committer.go @@ -460,7 +460,7 @@ func (*txnCommitter) populateLeafInputState(*roachpb.LeafTxnInputState) {} func (*txnCommitter) populateLeafFinalState(*roachpb.LeafTxnFinalState) {} // importLeafFinalState is part of the txnInterceptor interface. -func (*txnCommitter) importLeafFinalState(*roachpb.LeafTxnFinalState) {} +func (*txnCommitter) importLeafFinalState(context.Context, *roachpb.LeafTxnFinalState) {} // epochBumpedLocked implements the txnReqInterceptor interface. func (tc *txnCommitter) epochBumpedLocked() {} diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_heartbeater.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_heartbeater.go index 2a60c2f366e3..0c2f3c1d3dbf 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_heartbeater.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_heartbeater.go @@ -184,7 +184,7 @@ func (*txnHeartbeater) populateLeafInputState(*roachpb.LeafTxnInputState) {} func (*txnHeartbeater) populateLeafFinalState(*roachpb.LeafTxnFinalState) {} // importLeafFinalState is part of the txnInterceptor interface. -func (*txnHeartbeater) importLeafFinalState(*roachpb.LeafTxnFinalState) {} +func (*txnHeartbeater) importLeafFinalState(context.Context, *roachpb.LeafTxnFinalState) {} // epochBumpedLocked is part of the txnInterceptor interface. func (h *txnHeartbeater) epochBumpedLocked() {} diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_metric_recorder.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_metric_recorder.go index 924bd47197ab..1b692617ce17 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_metric_recorder.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_metric_recorder.go @@ -70,7 +70,7 @@ func (*txnMetricRecorder) populateLeafInputState(*roachpb.LeafTxnInputState) {} func (*txnMetricRecorder) populateLeafFinalState(*roachpb.LeafTxnFinalState) {} // importLeafFinalState is part of the txnInterceptor interface. -func (*txnMetricRecorder) importLeafFinalState(*roachpb.LeafTxnFinalState) {} +func (*txnMetricRecorder) importLeafFinalState(context.Context, *roachpb.LeafTxnFinalState) {} // epochBumpedLocked is part of the txnInterceptor interface. func (*txnMetricRecorder) epochBumpedLocked() {} diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go index aefa3c41e45b..0d2ad741cbd4 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go @@ -15,7 +15,6 @@ import ( "fmt" "sort" - "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" @@ -177,10 +176,8 @@ var trackedWritesMaxSize = settings.RegisterPublicIntSetting( // attached to any end transaction request that is passed through the pipeliner // to ensure that they the locks within them are released. type txnPipeliner struct { - st *cluster.Settings - // Optional; used to condense lock spans, if provided. If not provided, a - // transaction's lock footprint may grow without bound. - riGen RangeIteratorGen + st *cluster.Settings + riGen rangeIteratorFactory // used to condense lock spans, if provided wrapped lockedSender disabled bool @@ -206,6 +203,37 @@ type txnPipeliner struct { lockFootprint condensableSpanSet } +// condensableSpanSetRangeIterator describes the interface of RangeIterator +// needed by the condensableSpanSetRangeIterator. Useful for mocking an +// iterator in tests. +type condensableSpanSetRangeIterator interface { + Valid() bool + Seek(ctx context.Context, key roachpb.RKey, scanDir ScanDirection) + Error() error + Desc() *roachpb.RangeDescriptor +} + +// rangeIteratorFactory is used to create a condensableSpanSetRangeIterator +// lazily. It's used to avoid allocating an iterator when it's not needed. The +// factory can be configured either with a callback, used for mocking in tests, +// or with a DistSender. Can also be left empty for unittests that don't push +// memory limits in their span sets (and thus don't need collapsing). +type rangeIteratorFactory struct { + factory func() condensableSpanSetRangeIterator + ds *DistSender +} + +// newRangeIterator creates a range iterator. If no factory was configured, it panics. +func (f rangeIteratorFactory) newRangeIterator() condensableSpanSetRangeIterator { + if f.factory != nil { + return f.factory() + } + if f.ds != nil { + return NewRangeIterator(f.ds) + } + panic("no iterator factory configured") +} + // SendLocked implements the lockedSender interface. func (tp *txnPipeliner) SendLocked( ctx context.Context, ba roachpb.BatchRequest, @@ -604,7 +632,7 @@ func (tp *txnPipeliner) initializeLeaf(tis *roachpb.LeafTxnInputState) { func (tp *txnPipeliner) populateLeafFinalState(*roachpb.LeafTxnFinalState) {} // importLeafFinalState is part of the txnInterceptor interface. -func (tp *txnPipeliner) importLeafFinalState(*roachpb.LeafTxnFinalState) {} +func (tp *txnPipeliner) importLeafFinalState(context.Context, *roachpb.LeafTxnFinalState) {} // epochBumpedLocked implements the txnReqInterceptor interface. func (tp *txnPipeliner) epochBumpedLocked() { @@ -861,148 +889,3 @@ func (a *inFlightWriteAlloc) clear() { } *a = (*a)[:0] } - -// condensableSpanSet is a set of key spans that is condensable in order to -// stay below some maximum byte limit. Condensing of the set happens in two -// ways. Initially, overlapping spans are merged together to deduplicate -// redundant keys. If that alone isn't sufficient to stay below the byte limit, -// spans within the same Range will be merged together. This can cause the -// "footprint" of the set to grow, so the set should be thought of as on -// overestimate. -type condensableSpanSet struct { - s []roachpb.Span - bytes int64 -} - -// insert adds a new span to the condensable span set. No attempt to condense -// the set or deduplicate the new span with existing spans is made. -func (s *condensableSpanSet) insert(sp roachpb.Span) { - s.s = append(s.s, sp) - s.bytes += spanSize(sp) -} - -// mergeAndSort merges all overlapping spans. Calling this method will not -// increase the overall bounds of the span set, but will eliminate duplicated -// spans and combine overlapping spans. -// -// The method has the side effect of sorting the stable write set. -func (s *condensableSpanSet) mergeAndSort() (distinct bool) { - s.s, distinct = roachpb.MergeSpans(s.s) - if !distinct { - // Recompute the size. - s.bytes = 0 - for _, sp := range s.s { - s.bytes += spanSize(sp) - } - } - return distinct -} - -// maybeCondense is similar in spirit to mergeAndSort, but it only adjusts the -// span set when the maximum byte limit is exceeded. However, when this limit is -// exceeded, the method is more aggressive in its attempt to reduce the memory -// footprint of the span set. Not only will it merge overlapping spans, but -// spans within the same range boundaries are also condensed. -func (s *condensableSpanSet) maybeCondense( - ctx context.Context, riGen RangeIteratorGen, maxBytes int64, -) { - if s.bytes < maxBytes { - return - } - - // Start by attempting to simply merge the spans within the set. This alone - // may bring us under the byte limit. Even if it doesn't, this step has the - // nice property that it sorts the spans by start key, which we rely on - // lower in this method. - s.mergeAndSort() - if s.bytes < maxBytes { - return - } - - if riGen == nil { - // If we were not given a RangeIteratorGen, we cannot condense the spans. - return - } - ri := riGen() - - // Divide spans by range boundaries and condense. Iterate over spans - // using a range iterator and add each to a bucket keyed by range - // ID. Local keys are kept in a new slice and not added to buckets. - type spanBucket struct { - rangeID roachpb.RangeID - bytes int64 - spans []roachpb.Span - } - var buckets []spanBucket - var localSpans []roachpb.Span - for _, sp := range s.s { - if keys.IsLocal(sp.Key) { - localSpans = append(localSpans, sp) - continue - } - ri.Seek(ctx, roachpb.RKey(sp.Key), Ascending) - if !ri.Valid() { - // We haven't modified s.s yet, so it is safe to return. - log.VEventf(ctx, 2, "failed to condense lock spans: %v", ri.Error()) - return - } - rangeID := ri.Desc().RangeID - if l := len(buckets); l > 0 && buckets[l-1].rangeID == rangeID { - buckets[l-1].spans = append(buckets[l-1].spans, sp) - } else { - buckets = append(buckets, spanBucket{ - rangeID: rangeID, spans: []roachpb.Span{sp}, - }) - } - buckets[len(buckets)-1].bytes += spanSize(sp) - } - - // Sort the buckets by size and collapse from largest to smallest - // until total size of uncondensed spans no longer exceeds threshold. - sort.Slice(buckets, func(i, j int) bool { return buckets[i].bytes > buckets[j].bytes }) - s.s = localSpans // reset to hold just the local spans; will add newly condensed and remainder - for _, bucket := range buckets { - // Condense until we get to half the threshold. - if s.bytes <= maxBytes/2 { - // Collect remaining spans from each bucket into uncondensed slice. - s.s = append(s.s, bucket.spans...) - continue - } - s.bytes -= bucket.bytes - // TODO(spencer): consider further optimizations here to create - // more than one span out of a bucket to avoid overly broad span - // combinations. - cs := bucket.spans[0] - for _, s := range bucket.spans[1:] { - cs = cs.Combine(s) - if !cs.Valid() { - // If we didn't fatal here then we would need to ensure that the - // spans were restored or a transaction could lose part of its - // lock footprint. - log.Fatalf(ctx, "failed to condense lock spans: "+ - "combining span %s yielded invalid result", s) - } - } - s.bytes += spanSize(cs) - s.s = append(s.s, cs) - } -} - -// asSlice returns the set as a slice of spans. -func (s *condensableSpanSet) asSlice() []roachpb.Span { - l := len(s.s) - return s.s[:l:l] // immutable on append -} - -// empty returns whether the set is empty or whether it contains spans. -func (s *condensableSpanSet) empty() bool { - return len(s.s) == 0 -} - -func spanSize(sp roachpb.Span) int64 { - return int64(len(sp.Key) + len(sp.EndKey)) -} - -func keySize(k roachpb.Key) int64 { - return int64(len(k)) -} diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go index 7b72db70fb4a..5fbcca805519 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go @@ -560,9 +560,11 @@ func TestTxnPipelinerManyWrites(t *testing.T) { ctx := context.Background() tp, mockSender := makeMockTxnPipeliner() - // Disable maxInFlightSize and maxBatchSize limits/ + // Disable write_pipelining_max_outstanding_size, + // write_pipelining_max_batch_size, and max_intents_bytes limits. pipelinedWritesMaxInFlightSize.Override(&tp.st.SV, math.MaxInt64) pipelinedWritesMaxBatchSize.Override(&tp.st.SV, 0) + trackedWritesMaxSize.Override(&tp.st.SV, math.MaxInt64) const writes = 2048 keyBuf := roachpb.Key(strings.Repeat("a", writes+1)) diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_seq_num_allocator.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_seq_num_allocator.go index 19801c3046af..4e35ac934d90 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_seq_num_allocator.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_seq_num_allocator.go @@ -125,7 +125,10 @@ func (s *txnSeqNumAllocator) initializeLeaf(tis *roachpb.LeafTxnInputState) { func (s *txnSeqNumAllocator) populateLeafFinalState(tfs *roachpb.LeafTxnFinalState) {} // importLeafFinalState is part of the txnInterceptor interface. -func (s *txnSeqNumAllocator) importLeafFinalState(tfs *roachpb.LeafTxnFinalState) {} +func (s *txnSeqNumAllocator) importLeafFinalState( + ctx context.Context, tfs *roachpb.LeafTxnFinalState, +) { +} // stepLocked bumps the read seqnum to the current write seqnum. // Used by the TxnCoordSender's Step() method. diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go index 8cd4320c6fa7..5eac2c10c992 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go @@ -104,21 +104,21 @@ var MaxTxnRefreshSpansBytes = settings.RegisterPublicIntSetting( type txnSpanRefresher struct { st *cluster.Settings knobs *ClientTestingKnobs + riGen rangeIteratorFactory wrapped lockedSender - // refreshSpans contains key spans which were read during the transaction. In - // case the transaction's timestamp needs to be pushed, we can avoid a - // retriable error by "refreshing" these spans: verifying that there have been - // no changes to their data in between the timestamp at which they were read - // and the higher timestamp we want to move to. - refreshSpans []roachpb.Span + // refreshFootprint contains key spans which were read during the + // transaction. In case the transaction's timestamp needs to be pushed, we can + // avoid a retriable error by "refreshing" these spans: verifying that there + // have been no changes to their data in between the timestamp at which they + // were read and the higher timestamp we want to move to. + refreshFootprint condensableSpanSet // refreshInvalid is set if refresh spans have not been collected (because the - // memory budget was exceeded). When set, refreshSpans is empty. + // memory budget was exceeded). When set, refreshFootprint is empty. This is + // set when we've failed to condense the refresh spans below the target memory + // limit. refreshInvalid bool - // refreshSpansBytes is the total size in bytes of the spans - // encountered during this transaction that need to be refreshed - // to avoid serializable restart. - refreshSpansBytes int64 + // refreshedTimestamp keeps track of the largest timestamp that refreshed // don't fail on (i.e. if we'll refresh, we'll refreshFrom timestamp onwards). // After every epoch bump, it is initialized to the timestamp of the first @@ -127,12 +127,11 @@ type txnSpanRefresher struct { // canAutoRetry is set if the txnSpanRefresher is allowed to auto-retry. canAutoRetry bool - // autoRetryCounter counts the number of auto retries which avoid - // client-side restarts. - autoRetryCounter *metric.Counter - // refreshSpanBytesExceeded counter counts the number of transactions which - // do not refresh because they exceed the MaxRefreshSpanBytes. - refreshSpanBytesExceededCounter *metric.Counter + + refreshSuccess *metric.Counter + refreshFail *metric.Counter + refreshFailWithCondensedSpans *metric.Counter + refreshMemoryLimitExceeded *metric.Counter } // SendLocked implements the lockedSender interface. @@ -191,25 +190,48 @@ func (sr *txnSpanRefresher) SendLocked( return br, nil } - // Iterate over and aggregate refresh spans in the requests, - // qualified by possible resume spans in the responses, if we - // haven't yet exceeded the max read key bytes. + // Iterate over and aggregate refresh spans in the requests, qualified by + // possible resume spans in the responses. if !sr.refreshInvalid { if err := sr.appendRefreshSpans(ctx, ba, br); err != nil { return nil, roachpb.NewError(err) } - } - // Verify and enforce the size in bytes of all read-only spans - // doesn't exceed the max threshold. - if sr.refreshSpansBytes > MaxTxnRefreshSpansBytes.Get(&sr.st.SV) { - log.VEventf(ctx, 2, "refresh spans max size exceeded; clearing") - sr.refreshSpans = nil - sr.refreshInvalid = true - sr.refreshSpansBytes = 0 + // Check whether we should condense the refresh spans. + maxBytes := MaxTxnRefreshSpansBytes.Get(&sr.st.SV) + if sr.refreshFootprint.bytes >= maxBytes { + condensedBefore := sr.refreshFootprint.condensed + condensedSufficient := sr.tryCondenseRefreshSpans(ctx, maxBytes) + if condensedSufficient { + log.VEventf(ctx, 2, "condensed refresh spans for txn %s to %d bytes", + br.Txn, sr.refreshFootprint.bytes) + } else { + // Condensing was not enough. Giving up on tracking reads. Refreshed + // will not be possible. + log.VEventf(ctx, 2, "condensed refresh spans didn't save enough memory. txn %s. "+ + "refresh spans after condense: %d bytes", + br.Txn, sr.refreshFootprint.bytes) + sr.refreshInvalid = true + sr.refreshFootprint.clear() + } + + if sr.refreshFootprint.condensed && !condensedBefore { + sr.refreshMemoryLimitExceeded.Inc(1) + } + } } return br, nil } +// tryCondenseRefreshSpans attempts to condense the refresh spans in order to +// save memory. Returns true if we managed to condense them below maxBytes. +func (sr *txnSpanRefresher) tryCondenseRefreshSpans(ctx context.Context, maxBytes int64) bool { + if sr.knobs.CondenseRefreshSpansFilter != nil && !sr.knobs.CondenseRefreshSpansFilter() { + return false + } + sr.refreshFootprint.maybeCondense(ctx, sr.riGen, maxBytes) + return sr.refreshFootprint.bytes < maxBytes +} + // sendLockedWithRefreshAttempts sends the batch through the wrapped sender. It // catches serializable errors and attempts to avoid them by refreshing the txn // at a larger timestamp. @@ -294,8 +316,13 @@ func (sr *txnSpanRefresher) maybeRetrySend( // Try updating the txn spans so we can retry. if ok := sr.tryUpdatingTxnSpans(ctx, retryTxn); !ok { + sr.refreshFail.Inc(1) + if sr.refreshFootprint.condensed { + sr.refreshFailWithCondensedSpans.Inc(1) + } return nil, pErr } + sr.refreshSuccess.Inc(1) // We've refreshed all of the read spans successfully and bumped // ba.Txn's timestamps. Attempt the request again. @@ -308,7 +335,6 @@ func (sr *txnSpanRefresher) maybeRetrySend( } log.VEventf(ctx, 2, "retry successful @%s", retryBr.Txn.ReadTimestamp) - sr.autoRetryCounter.Inc(1) return retryBr, nil } @@ -323,9 +349,8 @@ func (sr *txnSpanRefresher) tryUpdatingTxnSpans( if sr.refreshInvalid { log.VEvent(ctx, 2, "can't refresh txn spans; not valid") - sr.refreshSpanBytesExceededCounter.Inc(1) return false - } else if len(sr.refreshSpans) == 0 { + } else if sr.refreshFootprint.empty() { log.VEvent(ctx, 2, "there are no txn spans to refresh") sr.refreshedTimestamp.Forward(refreshTxn.ReadTimestamp) return true @@ -335,7 +360,7 @@ func (sr *txnSpanRefresher) tryUpdatingTxnSpans( // TODO(nvanbenschoten): actually merge spans. refreshSpanBa := roachpb.BatchRequest{} refreshSpanBa.Txn = refreshTxn - addRefreshes := func(refreshes []roachpb.Span) { + addRefreshes := func(refreshes *condensableSpanSet) { // We're going to check writes between the previous refreshed timestamp, if // any, and the timestamp we want to bump the transaction to. Note that if // we've already refreshed the transaction before, we don't need to check @@ -347,7 +372,7 @@ func (sr *txnSpanRefresher) tryUpdatingTxnSpans( // refreshed). Checking below that timestamp (like we would, for example, if // we simply used txn.OrigTimestamp here), could cause false-positives that // would fail the refresh. - for _, u := range refreshes { + for _, u := range refreshes.asSlice() { var req roachpb.Request if len(u.EndKey) == 0 { req = &roachpb.RefreshRequest{ @@ -365,7 +390,7 @@ func (sr *txnSpanRefresher) tryUpdatingTxnSpans( req.Header().Span(), sr.refreshedTimestamp, refreshTxn.WriteTimestamp) } } - addRefreshes(sr.refreshSpans) + addRefreshes(&sr.refreshFootprint) // Send through wrapped lockedSender. Unlocks while sending then re-locks. if _, batchErr := sr.wrapped.SendLocked(ctx, refreshSpanBa); batchErr != nil { @@ -393,8 +418,7 @@ func (sr *txnSpanRefresher) appendRefreshSpans( ba.RefreshSpanIterate(br, func(span roachpb.Span) { log.VEventf(ctx, 3, "recording span to refresh: %s", span) - sr.refreshSpans = append(sr.refreshSpans, span) - sr.refreshSpansBytes += int64(len(span.Key) + len(span.EndKey)) + sr.refreshFootprint.insert(span) }) return nil } @@ -404,7 +428,7 @@ func (sr *txnSpanRefresher) appendRefreshSpans( // for the "server-side refresh" optimization, where batches are re-evaluated // at a higher read-timestamp without returning to transaction coordinator. func (sr *txnSpanRefresher) canForwardReadTimestampWithoutRefresh(txn *roachpb.Transaction) bool { - return sr.canAutoRetry && !sr.refreshInvalid && len(sr.refreshSpans) == 0 && !txn.CommitTimestampFixed + return sr.canAutoRetry && !sr.refreshInvalid && sr.refreshFootprint.empty() && !txn.CommitTimestampFixed } // forwardRefreshTimestampOnResponse updates the refresher's tracked @@ -437,52 +461,42 @@ func (sr *txnSpanRefresher) populateLeafFinalState(tfs *roachpb.LeafTxnFinalStat tfs.RefreshInvalid = sr.refreshInvalid if !sr.refreshInvalid { // Copy mutable state so access is safe for the caller. - tfs.RefreshSpans = append([]roachpb.Span(nil), sr.refreshSpans...) + tfs.RefreshSpans = append([]roachpb.Span(nil), sr.refreshFootprint.asSlice()...) } } // importLeafFinalState is part of the txnInterceptor interface. -func (sr *txnSpanRefresher) importLeafFinalState(tfs *roachpb.LeafTxnFinalState) { - // Do not modify existing span slices when copying. +func (sr *txnSpanRefresher) importLeafFinalState( + ctx context.Context, tfs *roachpb.LeafTxnFinalState, +) { if tfs.RefreshInvalid { sr.refreshInvalid = true - sr.refreshSpans = nil + sr.refreshFootprint.clear() } else if !sr.refreshInvalid { - if tfs.RefreshSpans != nil { - sr.refreshSpans, _ = roachpb.MergeSpans( - append(append([]roachpb.Span(nil), sr.refreshSpans...), tfs.RefreshSpans...), - ) - } - } - // Recompute the size of the refreshes. - sr.refreshSpansBytes = 0 - for _, u := range sr.refreshSpans { - sr.refreshSpansBytes += int64(len(u.Key) + len(u.EndKey)) + sr.refreshFootprint.insert(tfs.RefreshSpans...) + sr.refreshFootprint.maybeCondense(ctx, sr.riGen, MaxTxnRefreshSpansBytes.Get(&sr.st.SV)) } } // epochBumpedLocked implements the txnInterceptor interface. func (sr *txnSpanRefresher) epochBumpedLocked() { - sr.refreshSpans = nil + sr.refreshFootprint.clear() sr.refreshInvalid = false - sr.refreshSpansBytes = 0 sr.refreshedTimestamp.Reset() } // createSavepointLocked is part of the txnReqInterceptor interface. func (sr *txnSpanRefresher) createSavepointLocked(ctx context.Context, s *savepoint) { - s.refreshSpans = make([]roachpb.Span, len(sr.refreshSpans)) - copy(s.refreshSpans, sr.refreshSpans) + s.refreshSpans = make([]roachpb.Span, len(sr.refreshFootprint.asSlice())) + copy(s.refreshSpans, sr.refreshFootprint.asSlice()) s.refreshInvalid = sr.refreshInvalid - s.refreshSpanBytes = sr.refreshSpansBytes } // rollbackToSavepointLocked is part of the txnReqInterceptor interface. func (sr *txnSpanRefresher) rollbackToSavepointLocked(ctx context.Context, s savepoint) { - sr.refreshSpans = make([]roachpb.Span, len(s.refreshSpans)) - copy(sr.refreshSpans, s.refreshSpans) + sr.refreshFootprint.clear() + sr.refreshFootprint.insert(s.refreshSpans...) sr.refreshInvalid = s.refreshInvalid - sr.refreshSpansBytes = s.refreshSpanBytes } // closeLocked implements the txnInterceptor interface. diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go index a199c8d9c620..4295d4d50542 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go @@ -25,12 +25,14 @@ import ( func makeMockTxnSpanRefresher() (txnSpanRefresher, *mockLockedSender) { mockSender := &mockLockedSender{} return txnSpanRefresher{ - st: cluster.MakeTestingClusterSettings(), - knobs: new(ClientTestingKnobs), - wrapped: mockSender, - canAutoRetry: true, - autoRetryCounter: metric.NewCounter(metaAutoRetriesRates), - refreshSpanBytesExceededCounter: metric.NewCounter(metaRefreshSpanBytesExceeded), + st: cluster.MakeTestingClusterSettings(), + knobs: new(ClientTestingKnobs), + wrapped: mockSender, + canAutoRetry: true, + refreshSuccess: metric.NewCounter(metaRefreshSuccess), + refreshFail: metric.NewCounter(metaRefreshFail), + refreshFailWithCondensedSpans: metric.NewCounter(metaRefreshFailWithCondensedSpans), + refreshMemoryLimitExceeded: metric.NewCounter(metaRefreshMemoryLimitExceeded), }, mockSender } @@ -70,9 +72,10 @@ func TestTxnSpanRefresherCollectsSpans(t *testing.T) { require.Nil(t, pErr) require.NotNil(t, br) - require.Equal(t, []roachpb.Span{getArgs.Span(), delRangeArgs.Span()}, tsr.refreshSpans) + require.Equal(t, []roachpb.Span{getArgs.Span(), delRangeArgs.Span()}, + tsr.refreshFootprint.asSlice()) require.False(t, tsr.refreshInvalid) - require.Equal(t, int64(3), tsr.refreshSpansBytes) + require.Equal(t, int64(3), tsr.refreshFootprint.bytes) require.Equal(t, txn.ReadTimestamp, tsr.refreshedTimestamp) // Scan with limit. Only the scanned keys are added to the refresh spans. @@ -97,9 +100,9 @@ func TestTxnSpanRefresherCollectsSpans(t *testing.T) { require.Equal(t, []roachpb.Span{getArgs.Span(), delRangeArgs.Span(), {Key: scanArgs.Key, EndKey: keyC}}, - tsr.refreshSpans) + tsr.refreshFootprint.asSlice()) require.False(t, tsr.refreshInvalid) - require.Equal(t, int64(5), tsr.refreshSpansBytes) + require.Equal(t, int64(5), tsr.refreshFootprint.bytes) require.Equal(t, txn.ReadTimestamp, tsr.refreshedTimestamp) } @@ -210,9 +213,8 @@ func TestTxnSpanRefresherRefreshesTransactions(t *testing.T) { require.Nil(t, pErr) require.NotNil(t, br) - require.Equal(t, []roachpb.Span{getArgs.Span(), delRangeArgs.Span()}, tsr.refreshSpans) + require.Equal(t, []roachpb.Span{getArgs.Span(), delRangeArgs.Span()}, tsr.refreshFootprint.asSlice()) require.False(t, tsr.refreshInvalid) - require.Equal(t, int64(3), tsr.refreshSpansBytes) require.Equal(t, br.Txn.ReadTimestamp, tsr.refreshedTimestamp) // Hook up a chain of mocking functions. @@ -273,12 +275,15 @@ func TestTxnSpanRefresherRefreshesTransactions(t *testing.T) { require.Equal(t, tc.expRefreshTS, br.Txn.WriteTimestamp) require.Equal(t, tc.expRefreshTS, br.Txn.ReadTimestamp) require.Equal(t, tc.expRefreshTS, tsr.refreshedTimestamp) - require.Equal(t, int64(1), tsr.autoRetryCounter.Count()) + require.Equal(t, int64(1), tsr.refreshSuccess.Count()) + require.Equal(t, int64(0), tsr.refreshFail.Count()) } else { require.Nil(t, br) require.NotNil(t, pErr) require.Equal(t, ba.Txn.ReadTimestamp, tsr.refreshedTimestamp) - require.Equal(t, int64(0), tsr.autoRetryCounter.Count()) + require.Equal(t, int64(0), tsr.refreshSuccess.Count()) + // Note that we don't check the tsr.refreshFail metric here as tests + // here expect the refresh to not be attempted, not to fail. } }) } @@ -308,9 +313,8 @@ func TestTxnSpanRefresherMaxRefreshAttempts(t *testing.T) { require.Nil(t, pErr) require.NotNil(t, br) - require.Equal(t, []roachpb.Span{scanArgs.Span()}, tsr.refreshSpans) + require.Equal(t, []roachpb.Span{scanArgs.Span()}, tsr.refreshFootprint.asSlice()) require.False(t, tsr.refreshInvalid) - require.Equal(t, int64(2), tsr.refreshSpansBytes) require.Equal(t, br.Txn.ReadTimestamp, tsr.refreshedTimestamp) // Hook up a chain of mocking functions. @@ -356,18 +360,41 @@ func TestTxnSpanRefresherMaxRefreshAttempts(t *testing.T) { require.Equal(t, tsr.knobs.MaxTxnRefreshAttempts, refreshes) } +type singleRangeIterator struct{} + +func (s singleRangeIterator) Valid() bool { + return true +} + +func (s singleRangeIterator) Seek(context.Context, roachpb.RKey, ScanDirection) {} + +func (s singleRangeIterator) Error() error { + return nil +} + +func (s singleRangeIterator) Desc() *roachpb.RangeDescriptor { + return &roachpb.RangeDescriptor{ + RangeID: 1, + StartKey: roachpb.RKeyMin, + EndKey: roachpb.RKeyMax, + } +} + // TestTxnSpanRefresherMaxTxnRefreshSpansBytes tests that the txnSpanRefresher -// only collects up to kv.transaction.max_refresh_spans_bytes refresh bytes -// before throwing away refresh spans and refusing to attempt to refresh -// transactions. +// collapses spans after they exceed kv.transaction.max_refresh_spans_bytes +// refresh bytes. func TestTxnSpanRefresherMaxTxnRefreshSpansBytes(t *testing.T) { defer leaktest.AfterTest(t)() ctx := context.Background() tsr, mockSender := makeMockTxnSpanRefresher() + tsr.riGen = rangeIteratorFactory{factory: func() condensableSpanSetRangeIterator { + return singleRangeIterator{} + }} txn := makeTxnProto() keyA, keyB := roachpb.Key("a"), roachpb.Key("b") - keyC, keyD := roachpb.Key("c"), roachpb.Key("d") + keyC := roachpb.Key("c") + keyD, keyE := roachpb.Key("d"), roachpb.Key("e") // Set MaxTxnRefreshSpansBytes limit to 3 bytes. MaxTxnRefreshSpansBytes.Override(&tsr.st.SV, 3) @@ -382,13 +409,13 @@ func TestTxnSpanRefresherMaxTxnRefreshSpansBytes(t *testing.T) { require.Nil(t, pErr) require.NotNil(t, br) - require.Equal(t, []roachpb.Span{scanArgs.Span()}, tsr.refreshSpans) + require.Equal(t, []roachpb.Span{scanArgs.Span()}, tsr.refreshFootprint.asSlice()) require.False(t, tsr.refreshInvalid) - require.Equal(t, int64(2), tsr.refreshSpansBytes) require.Equal(t, txn.ReadTimestamp, tsr.refreshedTimestamp) + require.Equal(t, int64(2), tsr.refreshFootprint.bytes) - // Send another batch that pushes us above the limit. The refresh spans - // should become invalid. + // Send another batch that pushes us above the limit. The tracked spans are + // adjacent so the spans will be merged, but not condensed. ba.Requests = nil scanArgs2 := roachpb.ScanRequest{RequestHeader: roachpb.RequestHeader{Key: keyB, EndKey: keyC}} ba.Add(&scanArgs2) @@ -397,28 +424,29 @@ func TestTxnSpanRefresherMaxTxnRefreshSpansBytes(t *testing.T) { require.Nil(t, pErr) require.NotNil(t, br) - require.Equal(t, []roachpb.Span(nil), tsr.refreshSpans) - require.True(t, tsr.refreshInvalid) - require.Equal(t, int64(0), tsr.refreshSpansBytes) + require.Equal(t, []roachpb.Span{{Key: keyA, EndKey: keyC}}, tsr.refreshFootprint.asSlice()) + require.False(t, tsr.refreshInvalid) + require.Equal(t, int64(2), tsr.refreshFootprint.bytes) + require.False(t, tsr.refreshFootprint.condensed) + require.Equal(t, int64(0), tsr.refreshMemoryLimitExceeded.Count()) require.Equal(t, txn.ReadTimestamp, tsr.refreshedTimestamp) - // Once invalid, the refresh spans should stay invalid. + // Exceed the limit again, this time with a non-adjacent span such that + // condensing needs to occur. ba.Requests = nil - scanArgs3 := roachpb.ScanRequest{RequestHeader: roachpb.RequestHeader{Key: keyC, EndKey: keyD}} + scanArgs3 := roachpb.ScanRequest{RequestHeader: roachpb.RequestHeader{Key: keyD, EndKey: keyE}} ba.Add(&scanArgs3) br, pErr = tsr.SendLocked(ctx, ba) require.Nil(t, pErr) require.NotNil(t, br) - require.Equal(t, []roachpb.Span(nil), tsr.refreshSpans) - require.True(t, tsr.refreshInvalid) - require.Equal(t, int64(0), tsr.refreshSpansBytes) + require.Equal(t, []roachpb.Span{{Key: keyA, EndKey: keyE}}, tsr.refreshFootprint.asSlice()) + require.True(t, tsr.refreshFootprint.condensed) + require.False(t, tsr.refreshInvalid) require.Equal(t, txn.ReadTimestamp, tsr.refreshedTimestamp) - - // Make sure that the metric due to the refresh span bytes being exceeded - // has not been incremented. - require.Equal(t, int64(0), tsr.refreshSpanBytesExceededCounter.Count()) + require.Equal(t, int64(1), tsr.refreshMemoryLimitExceeded.Count()) + require.Equal(t, int64(0), tsr.refreshFailWithCondensedSpans.Count()) // Return a transaction retry error and make sure the metric indicating that // we did not retry due to the refresh span bytes in incremented. @@ -431,7 +459,7 @@ func TestTxnSpanRefresherMaxTxnRefreshSpansBytes(t *testing.T) { exp := roachpb.NewTransactionRetryError(roachpb.RETRY_SERIALIZABLE, "") require.Equal(t, exp, pErr.GetDetail()) require.Nil(t, br) - require.Equal(t, int64(1), tsr.refreshSpanBytesExceededCounter.Count()) + require.Equal(t, int64(1), tsr.refreshFailWithCondensedSpans.Count()) } // TestTxnSpanRefresherAssignsCanForwardReadTimestamp tests that the @@ -446,9 +474,6 @@ func TestTxnSpanRefresherAssignsCanForwardReadTimestamp(t *testing.T) { keyA, keyB := roachpb.Key("a"), roachpb.Key("b") keyC, keyD := roachpb.Key("c"), roachpb.Key("d") - // Set MaxTxnRefreshSpansBytes limit to 3 bytes. - MaxTxnRefreshSpansBytes.Override(&tsr.st.SV, 3) - // Send a Put request. Should set CanForwardReadTimestamp flag. Should not // collect refresh spans. var ba roachpb.BatchRequest @@ -468,7 +493,7 @@ func TestTxnSpanRefresherAssignsCanForwardReadTimestamp(t *testing.T) { br, pErr := tsr.SendLocked(ctx, ba) require.Nil(t, pErr) require.NotNil(t, br) - require.Nil(t, tsr.refreshSpans) + require.Nil(t, tsr.refreshFootprint.asSlice()) require.False(t, tsr.refreshInvalid) // Send a Put request for a transaction with a fixed commit timestamp. @@ -492,7 +517,7 @@ func TestTxnSpanRefresherAssignsCanForwardReadTimestamp(t *testing.T) { br, pErr = tsr.SendLocked(ctx, baFixed) require.Nil(t, pErr) require.NotNil(t, br) - require.Nil(t, tsr.refreshSpans) + require.Nil(t, tsr.refreshFootprint.asSlice()) require.False(t, tsr.refreshInvalid) // Send a Scan request. Should set CanForwardReadTimestamp flag. Should @@ -514,11 +539,10 @@ func TestTxnSpanRefresherAssignsCanForwardReadTimestamp(t *testing.T) { br, pErr = tsr.SendLocked(ctx, ba) require.Nil(t, pErr) require.NotNil(t, br) - require.Equal(t, []roachpb.Span{scanArgs.Span()}, tsr.refreshSpans) + require.Equal(t, []roachpb.Span{scanArgs.Span()}, tsr.refreshFootprint.asSlice()) require.False(t, tsr.refreshInvalid) // Send another Scan request. Should NOT set CanForwardReadTimestamp flag. - // Should push the spans above the limit. ba.Requests = nil scanArgs2 := roachpb.ScanRequest{RequestHeader: roachpb.RequestHeader{Key: keyC, EndKey: keyD}} ba.Add(&scanArgs2) @@ -536,8 +560,8 @@ func TestTxnSpanRefresherAssignsCanForwardReadTimestamp(t *testing.T) { br, pErr = tsr.SendLocked(ctx, ba) require.Nil(t, pErr) require.NotNil(t, br) - require.Equal(t, []roachpb.Span(nil), tsr.refreshSpans) - require.True(t, tsr.refreshInvalid) + require.Equal(t, []roachpb.Span{{Key: keyA, EndKey: keyB}, {Key: keyC, EndKey: keyD}}, tsr.refreshFootprint.asSlice()) + require.False(t, tsr.refreshInvalid) // Send another Put request. Still should NOT set CanForwardReadTimestamp flag. ba.Requests = nil @@ -556,8 +580,8 @@ func TestTxnSpanRefresherAssignsCanForwardReadTimestamp(t *testing.T) { br, pErr = tsr.SendLocked(ctx, ba) require.Nil(t, pErr) require.NotNil(t, br) - require.Equal(t, []roachpb.Span(nil), tsr.refreshSpans) - require.True(t, tsr.refreshInvalid) + require.Equal(t, []roachpb.Span{{Key: keyA, EndKey: keyB}, {Key: keyC, EndKey: keyD}}, tsr.refreshFootprint.asSlice()) + require.False(t, tsr.refreshInvalid) // Increment the transaction's epoch and send another Put request. Should // set CanForwardReadTimestamp flag. @@ -578,7 +602,7 @@ func TestTxnSpanRefresherAssignsCanForwardReadTimestamp(t *testing.T) { br, pErr = tsr.SendLocked(ctx, ba) require.Nil(t, pErr) require.NotNil(t, br) - require.Equal(t, []roachpb.Span(nil), tsr.refreshSpans) + require.Equal(t, []roachpb.Span(nil), tsr.refreshFootprint.asSlice()) require.False(t, tsr.refreshInvalid) } @@ -594,9 +618,6 @@ func TestTxnSpanRefresherAssignsCanCommitAtHigherTimestamp(t *testing.T) { keyA, keyB := roachpb.Key("a"), roachpb.Key("b") keyC, keyD := roachpb.Key("c"), roachpb.Key("d") - // Set MaxTxnRefreshSpansBytes limit to 3 bytes. - MaxTxnRefreshSpansBytes.Override(&tsr.st.SV, 3) - // Send an EndTxn request. Should set CanCommitAtHigherTimestamp and // CanForwardReadTimestamp flags. var ba roachpb.BatchRequest @@ -651,7 +672,7 @@ func TestTxnSpanRefresherAssignsCanCommitAtHigherTimestamp(t *testing.T) { br, pErr = tsr.SendLocked(ctx, ba) require.Nil(t, pErr) require.NotNil(t, br) - require.Equal(t, []roachpb.Span{scanArgs.Span()}, tsr.refreshSpans) + require.Equal(t, []roachpb.Span{scanArgs.Span()}, tsr.refreshFootprint.asSlice()) require.False(t, tsr.refreshInvalid) // Send another EndTxn request. Should NOT set CanCommitAtHigherTimestamp @@ -674,7 +695,7 @@ func TestTxnSpanRefresherAssignsCanCommitAtHigherTimestamp(t *testing.T) { require.Nil(t, pErr) require.NotNil(t, br) - // Send another batch to push the spans above the limit. + // Send another batch. ba.Requests = nil scanArgs2 := roachpb.ScanRequest{RequestHeader: roachpb.RequestHeader{Key: keyC, EndKey: keyD}} ba.Add(&scanArgs2) @@ -683,8 +704,6 @@ func TestTxnSpanRefresherAssignsCanCommitAtHigherTimestamp(t *testing.T) { br, pErr = tsr.SendLocked(ctx, ba) require.Nil(t, pErr) require.NotNil(t, br) - require.Equal(t, []roachpb.Span(nil), tsr.refreshSpans) - require.True(t, tsr.refreshInvalid) // Send another EndTxn request. Still should NOT set // CanCommitAtHigherTimestamp and CanForwardReadTimestamp flags. @@ -705,8 +724,6 @@ func TestTxnSpanRefresherAssignsCanCommitAtHigherTimestamp(t *testing.T) { br, pErr = tsr.SendLocked(ctx, ba) require.Nil(t, pErr) require.NotNil(t, br) - require.Equal(t, []roachpb.Span(nil), tsr.refreshSpans) - require.True(t, tsr.refreshInvalid) // Increment the transaction's epoch and send another EndTxn request. Should // set CanCommitAtHigherTimestamp and CanForwardReadTimestamp flags. @@ -728,7 +745,7 @@ func TestTxnSpanRefresherAssignsCanCommitAtHigherTimestamp(t *testing.T) { br, pErr = tsr.SendLocked(ctx, ba) require.Nil(t, pErr) require.NotNil(t, br) - require.Equal(t, []roachpb.Span(nil), tsr.refreshSpans) + require.Equal(t, []roachpb.Span(nil), tsr.refreshFootprint.asSlice()) require.False(t, tsr.refreshInvalid) } @@ -738,6 +755,8 @@ func TestTxnSpanRefresherEpochIncrement(t *testing.T) { defer leaktest.AfterTest(t)() ctx := context.Background() tsr, _ := makeMockTxnSpanRefresher() + // Disable span condensing. + tsr.knobs.CondenseRefreshSpansFilter = func() bool { return false } txn := makeTxnProto() keyA, keyB := roachpb.Key("a"), roachpb.Key("b") @@ -756,17 +775,16 @@ func TestTxnSpanRefresherEpochIncrement(t *testing.T) { require.Nil(t, pErr) require.NotNil(t, br) - require.Equal(t, []roachpb.Span{scanArgs.Span()}, tsr.refreshSpans) + require.Equal(t, []roachpb.Span{scanArgs.Span()}, tsr.refreshFootprint.asSlice()) require.False(t, tsr.refreshInvalid) - require.Equal(t, int64(2), tsr.refreshSpansBytes) + require.Equal(t, int64(2), tsr.refreshFootprint.bytes) require.Equal(t, txn.ReadTimestamp, tsr.refreshedTimestamp) // Incrementing the transaction epoch clears the spans. tsr.epochBumpedLocked() - require.Equal(t, []roachpb.Span(nil), tsr.refreshSpans) + require.Equal(t, []roachpb.Span(nil), tsr.refreshFootprint.asSlice()) require.False(t, tsr.refreshInvalid) - require.Equal(t, int64(0), tsr.refreshSpansBytes) require.Equal(t, hlc.Timestamp{}, tsr.refreshedTimestamp) // Send a batch above the limit. @@ -778,17 +796,16 @@ func TestTxnSpanRefresherEpochIncrement(t *testing.T) { require.Nil(t, pErr) require.NotNil(t, br) - require.Equal(t, []roachpb.Span(nil), tsr.refreshSpans) + require.Equal(t, []roachpb.Span(nil), tsr.refreshFootprint.asSlice()) require.True(t, tsr.refreshInvalid) - require.Equal(t, int64(0), tsr.refreshSpansBytes) + require.Equal(t, int64(0), tsr.refreshFootprint.bytes) require.Equal(t, txn.ReadTimestamp, tsr.refreshedTimestamp) // Incrementing the transaction epoch clears the invalid status. tsr.epochBumpedLocked() - require.Equal(t, []roachpb.Span(nil), tsr.refreshSpans) + require.Equal(t, []roachpb.Span(nil), tsr.refreshFootprint.asSlice()) require.False(t, tsr.refreshInvalid) - require.Equal(t, int64(0), tsr.refreshSpansBytes) require.Equal(t, hlc.Timestamp{}, tsr.refreshedTimestamp) } @@ -820,42 +837,33 @@ func TestTxnSpanRefresherSavepoint(t *testing.T) { require.NotNil(t, br) } read(keyA) - require.Equal(t, []roachpb.Span{{Key: keyA}}, tsr.refreshSpans) + require.Equal(t, []roachpb.Span{{Key: keyA}}, tsr.refreshFootprint.asSlice()) s := savepoint{} tsr.createSavepointLocked(ctx, &s) // Another read after the savepoint was created. read(keyB) - require.Equal(t, []roachpb.Span{{Key: keyA}, {Key: keyB}}, tsr.refreshSpans) + require.Equal(t, []roachpb.Span{{Key: keyA}, {Key: keyB}}, tsr.refreshFootprint.asSlice()) require.Equal(t, []roachpb.Span{{Key: keyA}}, s.refreshSpans) - require.Less(t, s.refreshSpanBytes, tsr.refreshSpansBytes) require.False(t, s.refreshInvalid) // Rollback the savepoint and check that refresh spans were overwritten. tsr.rollbackToSavepointLocked(ctx, s) - require.Equal(t, []roachpb.Span{{Key: keyA}}, tsr.refreshSpans) - - // Set MaxTxnRefreshSpansBytes limit low and then exceed it. - MaxTxnRefreshSpansBytes.Override(&tsr.st.SV, 1) - read(keyB) - require.True(t, tsr.refreshInvalid) + require.Equal(t, []roachpb.Span{{Key: keyA}}, tsr.refreshFootprint.asSlice()) // Check that rolling back to the savepoint resets refreshInvalid. + tsr.refreshInvalid = true tsr.rollbackToSavepointLocked(ctx, s) - require.Equal(t, tsr.refreshSpansBytes, s.refreshSpanBytes) require.False(t, tsr.refreshInvalid) - // Exceed the limit again and then create a savepoint. - read(keyB) - require.True(t, tsr.refreshInvalid) + // Set refreshInvalid and then create a savepoint. + tsr.refreshInvalid = true s = savepoint{} tsr.createSavepointLocked(ctx, &s) require.True(t, s.refreshInvalid) - require.Empty(t, s.refreshSpans) // Rollback to the savepoint check that refreshes are still invalid. tsr.rollbackToSavepointLocked(ctx, s) - require.Empty(t, tsr.refreshSpans) require.True(t, tsr.refreshInvalid) } diff --git a/pkg/kv/kvclient/kvcoord/txn_metrics.go b/pkg/kv/kvclient/kvcoord/txn_metrics.go index 12943abf774f..979bc6b905bb 100644 --- a/pkg/kv/kvclient/kvcoord/txn_metrics.go +++ b/pkg/kv/kvclient/kvcoord/txn_metrics.go @@ -19,13 +19,17 @@ import ( // TxnMetrics holds all metrics relating to KV transactions. type TxnMetrics struct { - Aborts *metric.Counter - Commits *metric.Counter - Commits1PC *metric.Counter // Commits which finished in a single phase - ParallelCommits *metric.Counter // Commits which entered the STAGING state - AutoRetries *metric.Counter // Auto retries which avoid client-side restarts - RefreshSpanBytesExceeded *metric.Counter // Transactions which don't refresh due to span bytes - Durations *metric.Histogram + Aborts *metric.Counter + Commits *metric.Counter + Commits1PC *metric.Counter // Commits which finished in a single phase + ParallelCommits *metric.Counter // Commits which entered the STAGING state + + RefreshSuccess *metric.Counter + RefreshFail *metric.Counter + RefreshFailWithCondensedSpans *metric.Counter + RefreshMemoryLimitExceeded *metric.Counter + + Durations *metric.Histogram // Restarts is the number of times we had to restart the transaction. Restarts *metric.Histogram @@ -66,16 +70,33 @@ var ( Measurement: "KV Transactions", Unit: metric.Unit_COUNT, } - metaAutoRetriesRates = metric.Metadata{ - Name: "txn.autoretries", - Help: "Number of automatic retries to avoid serializable restarts", - Measurement: "Retries", + metaRefreshSuccess = metric.Metadata{ + Name: "txn.refresh.success", + Help: "Number of successful refreshes", + Measurement: "Refreshes", + Unit: metric.Unit_COUNT, + } + metaRefreshFail = metric.Metadata{ + Name: "txn.refresh.fail", + Help: "Number of failed refreshes", + Measurement: "Refreshes", + Unit: metric.Unit_COUNT, + } + metaRefreshFailWithCondensedSpans = metric.Metadata{ + Name: "txn.refresh.fail_with_condensed_spans", + Help: "Number of failed refreshes for transactions whose read " + + "tracking lost fidelity because of condensing. Such a failure " + + "could be a false conflict. Failures counted here are also counted " + + "in txn.refresh.fail, and the respective transactions are also counted in " + + "txn.refresh.memory_limit_exceeded.", + Measurement: "Refreshes", Unit: metric.Unit_COUNT, } - metaRefreshSpanBytesExceeded = metric.Metadata{ - Name: "txn.refreshspanbytesexceeded", - Help: "Number of transaction retries which fail to refresh due to the refresh span bytes", - Measurement: "Retries", + metaRefreshMemoryLimitExceeded = metric.Metadata{ + Name: "txn.refresh.memory_limit_exceeded", + Help: "Number of transaction which exceed the refresh span bytes limit, causing " + + "their read spans to be condensed", + Measurement: "Transactions", Unit: metric.Unit_COUNT, } metaDurationsHistograms = metric.Metadata{ @@ -175,8 +196,10 @@ func MakeTxnMetrics(histogramWindow time.Duration) TxnMetrics { Commits: metric.NewCounter(metaCommitsRates), Commits1PC: metric.NewCounter(metaCommits1PCRates), ParallelCommits: metric.NewCounter(metaParallelCommitsRates), - AutoRetries: metric.NewCounter(metaAutoRetriesRates), - RefreshSpanBytesExceeded: metric.NewCounter(metaRefreshSpanBytesExceeded), + RefreshFail: metric.NewCounter(metaRefreshFail), + RefreshFailWithCondensedSpans: metric.NewCounter(metaRefreshFailWithCondensedSpans), + RefreshSuccess: metric.NewCounter(metaRefreshSuccess), + RefreshMemoryLimitExceeded: metric.NewCounter(metaRefreshMemoryLimitExceeded), Durations: metric.NewLatency(metaDurationsHistograms, histogramWindow), Restarts: metric.NewHistogram(metaRestartsHistogram, histogramWindow, 100, 3), RestartsWriteTooOld: telemetry.NewCounterWithMetric(metaRestartsWriteTooOld), diff --git a/pkg/roachpb/merge_spans.go b/pkg/roachpb/merge_spans.go index d53f165150e7..3d1c56b811a5 100644 --- a/pkg/roachpb/merge_spans.go +++ b/pkg/roachpb/merge_spans.go @@ -34,7 +34,10 @@ func (s sortedSpans) Len() int { } // MergeSpans sorts the incoming spans and merges overlapping spans. Returns -// true iff all of the spans are distinct. +// true iff all of the spans are distinct. Note that even if it returns true, +// adjacent spans might have been merged (i.e. [a, b) is distinct from [b,c), +// but the two are still merged. +// // The input spans are not safe for re-use. func MergeSpans(spans []Span) ([]Span, bool) { if len(spans) == 0 { diff --git a/pkg/sql/gcjob/table_garbage_collection.go b/pkg/sql/gcjob/table_garbage_collection.go index ae3f6654ce08..c25f015407af 100644 --- a/pkg/sql/gcjob/table_garbage_collection.go +++ b/pkg/sql/gcjob/table_garbage_collection.go @@ -122,7 +122,7 @@ func clearTableData( for ri.Seek(ctx, tableSpan.Key, kvcoord.Ascending); ; ri.Next(ctx) { if !ri.Valid() { - return ri.Error().GoError() + return ri.Error() } if n++; n >= batchSize || !ri.NeedAnother(tableSpan) { diff --git a/pkg/sql/physicalplan/span_resolver.go b/pkg/sql/physicalplan/span_resolver.go index 4b2a9516ae24..a8bf01749939 100644 --- a/pkg/sql/physicalplan/span_resolver.go +++ b/pkg/sql/physicalplan/span_resolver.go @@ -191,8 +191,7 @@ func (it *spanResolverIterator) Error() error { if it.err != nil { return it.err } - // TODO(andrei): make the DistSender iterator return error, not pErr - return it.it.Error().GoError() + return it.it.Error() } // Seek is part of the SpanResolverIterator interface. diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go index 5d568de24a13..25114ee0885e 100644 --- a/pkg/ts/catalog/chart_catalog.go +++ b/pkg/ts/catalog/chart_catalog.go @@ -765,12 +765,20 @@ var charts = []sectionDescription{ Metrics: []string{"txn.aborts"}, }, { - Title: "Auto Retries", - Metrics: []string{"txn.autoretries"}, + Title: "Successful refreshes", + Metrics: []string{"txn.refresh.success"}, }, { - Title: "Refresh Span Bytes Exceeded", - Metrics: []string{"txn.refreshspanbytesexceeded"}, + Title: "Failed refreshes", + Metrics: []string{"txn.refresh.fail"}, + }, + { + Title: "Failed refreshes with condensed spans", + Metrics: []string{"txn.refresh.fail_with_condensed_spans"}, + }, + { + Title: "Transactions exceeding refresh spans memory limit", + Metrics: []string{"txn.refresh.memory_limit_exceeded"}, }, { Title: "Commits",