diff --git a/pkg/kv/kvclient/kvcoord/condensable_span_set.go b/pkg/kv/kvclient/kvcoord/condensable_span_set.go
index 8eee6ba97fe4..05cd98808e62 100644
--- a/pkg/kv/kvclient/kvcoord/condensable_span_set.go
+++ b/pkg/kv/kvclient/kvcoord/condensable_span_set.go
@@ -29,13 +29,22 @@ import (
 type condensableSpanSet struct {
 	s     []roachpb.Span
 	bytes int64
+
+	// condensed is set if we ever condensed the spans. That is, if the set of
+	// spans currently tracked has lost fidelity compared to the spans inserted.
+	// Note that we might have otherwise mucked with the inserted spans to save
+	// memory without losing fidelity, in which case this flag would not be set
+	// (e.g. merging overlapping or adjacent spans).
+	condensed bool
 }
 
-// insert adds a new span to the condensable span set. No attempt to condense
-// the set or deduplicate the new span with existing spans is made.
-func (s *condensableSpanSet) insert(sp roachpb.Span) {
-	s.s = append(s.s, sp)
-	s.bytes += spanSize(sp)
+// insert adds new spans to the condensable span set. No attempt to condense the
+// set or deduplicate the new spans with existing spans is made.
+func (s *condensableSpanSet) insert(spans ...roachpb.Span) {
+	s.s = append(s.s, spans...)
+	for _, sp := range spans {
+		s.bytes += spanSize(sp)
+	}
 }
 
 // mergeAndSort merges all overlapping spans. Calling this method will not
@@ -60,11 +69,17 @@ func (s *condensableSpanSet) mergeAndSort() {
 // exceeded, the method is more aggressive in its attempt to reduce the memory
 // footprint of the span set. Not only will it merge overlapping spans, but
 // spans within the same range boundaries are also condensed.
+//
+// Returns true if condensing was done. Note that, even if condensing was
+// performed, this doesn't guarantee that the size was reduced below the byte
+// limit. Condensing is only performed at the level of individual ranges, not
+// across ranges, so it's possible to not be able to condense as much as
+// desired.
 func (s *condensableSpanSet) maybeCondense(
-	ctx context.Context, riGen RangeIteratorGen, maxBytes int64,
-) {
+	ctx context.Context, riGen rangeIteratorFactory, maxBytes int64,
+) bool {
 	if s.bytes < maxBytes {
-		return
+		return false
 	}
 
 	// Start by attempting to simply merge the spans within the set. This alone
@@ -73,14 +88,10 @@ func (s *condensableSpanSet) maybeCondense(
 	// lower in this method.
 	s.mergeAndSort()
 	if s.bytes < maxBytes {
-		return
+		return false
 	}
 
-	if riGen == nil {
-		// If we were not given a RangeIteratorGen, we cannot condense the spans.
-		return
-	}
-	ri := riGen()
+	ri := riGen.newRangeIterator()
 
 	// Divide spans by range boundaries and condense. Iterate over spans
 	// using a range iterator and add each to a bucket keyed by range
@@ -101,7 +112,7 @@ func (s *condensableSpanSet) maybeCondense(
 		if !ri.Valid() {
 			// We haven't modified s.s yet, so it is safe to return.
 			log.Warningf(ctx, "failed to condense lock spans: %v", ri.Error())
-			return
+			return false
 		}
 		rangeID := ri.Desc().RangeID
 		if l := len(buckets); l > 0 && buckets[l-1].rangeID == rangeID {
@@ -143,6 +154,8 @@ func (s *condensableSpanSet) maybeCondense(
 		s.bytes += spanSize(cs)
 		s.s = append(s.s, cs)
 	}
+	s.condensed = true
+	return true
 }
 
 // asSlice returns the set as a slice of spans.
@@ -156,6 +169,10 @@ func (s *condensableSpanSet) empty() bool { return len(s.s) == 0 } +func (s *condensableSpanSet) clear() { + *s = condensableSpanSet{} +} + func spanSize(sp roachpb.Span) int64 { return int64(len(sp.Key) + len(sp.EndKey)) } diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index 2b6a7ffa1e41..bf04fe7e00d2 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -210,9 +210,6 @@ type DistSender struct { // It is copied out of the rpcContext at construction time and used in // testing. clusterID *base.ClusterIDContainer - // rangeIteratorGen returns a range iterator bound to the DistSender. - // Used to avoid allocations. - rangeIteratorGen RangeIteratorGen // disableFirstRangeUpdates disables updates of the first range via // gossip. Used by tests which want finer control of the contents of the @@ -303,7 +300,6 @@ func NewDistSender(cfg DistSenderConfig, g *gossip.Gossip) *DistSender { ds.asyncSenderSem.UpdateCapacity(uint64(senderConcurrencyLimit.Get(&cfg.Settings.SV))) }) ds.rpcContext.Stopper.AddCloser(ds.asyncSenderSem.Closer("stopper")) - ds.rangeIteratorGen = func() *RangeIterator { return NewRangeIterator(ds) } if g != nil { ctx := ds.AnnotateCtx(context.Background()) diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go index bfe9350a0778..68215143e2cb 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go @@ -1526,8 +1526,22 @@ func TestTxnCoordSenderRetries(t *testing.T) { // Don't clobber the test's splits. storeKnobs.DisableMergeQueue = true + var refreshSpansCondenseFilter atomic.Value s, _, _ := serverutils.StartServer(t, - base.TestServerArgs{Knobs: base.TestingKnobs{Store: &storeKnobs}}) + base.TestServerArgs{Knobs: base.TestingKnobs{ + Store: &storeKnobs, + KVClient: &kvcoord.ClientTestingKnobs{ + CondenseRefreshSpansFilter: func() bool { + fnVal := refreshSpansCondenseFilter.Load() + if fn, ok := fnVal.(func() bool); ok { + return fn() + } + return true + }, + }}}) + + disableCondensingRefreshSpans := func() bool { return false } + ctx := context.Background() defer s.Stopper().Stop(ctx) @@ -1554,13 +1568,14 @@ func TestTxnCoordSenderRetries(t *testing.T) { } testCases := []struct { - name string - beforeTxnStart func(context.Context, *kv.DB) error // called before the txn starts - afterTxnStart func(context.Context, *kv.DB) error // called after the txn chooses a timestamp - retryable func(context.Context, *kv.Txn) error // called during the txn; may be retried - filter func(storagebase.FilterArgs) *roachpb.Error - priorReads bool - tsLeaked bool + name string + beforeTxnStart func(context.Context, *kv.DB) error // called before the txn starts + afterTxnStart func(context.Context, *kv.DB) error // called after the txn chooses a timestamp + retryable func(context.Context, *kv.Txn) error // called during the txn; may be retried + filter func(storagebase.FilterArgs) *roachpb.Error + refreshSpansCondenseFilter func() bool + priorReads bool + tsLeaked bool // If both of these are false, no retries. txnCoordRetry bool clientRetry bool @@ -1677,7 +1692,8 @@ func TestTxnCoordSenderRetries(t *testing.T) { { // If we've exhausted the limit for tracking refresh spans but we // already refreshed, keep running the txn. 
- name: "forwarded timestamp with too many refreshes, read only", + name: "forwarded timestamp with too many refreshes, read only", + refreshSpansCondenseFilter: disableCondensingRefreshSpans, afterTxnStart: func(ctx context.Context, db *kv.DB) error { return db.Put(ctx, "a", "value") }, @@ -1707,7 +1723,8 @@ func TestTxnCoordSenderRetries(t *testing.T) { // limit for tracking refresh spans and our transaction's timestamp // has been pushed, if we successfully commit then we won't hit an // error. - name: "forwarded timestamp with too many refreshes in batch commit", + name: "forwarded timestamp with too many refreshes in batch commit", + refreshSpansCondenseFilter: disableCondensingRefreshSpans, afterTxnStart: func(ctx context.Context, db *kv.DB) error { _, err := db.Get(ctx, "a") // set ts cache return err @@ -1740,7 +1757,8 @@ func TestTxnCoordSenderRetries(t *testing.T) { // has been pushed, if we successfully commit then we won't hit an // error. This is the case even if the final batch itself causes a // refresh. - name: "forwarded timestamp with too many refreshes in batch commit triggering refresh", + name: "forwarded timestamp with too many refreshes in batch commit triggering refresh", + refreshSpansCondenseFilter: disableCondensingRefreshSpans, afterTxnStart: func(ctx context.Context, db *kv.DB) error { _, err := db.Get(ctx, "a") // set ts cache return err @@ -2508,9 +2526,13 @@ func TestTxnCoordSenderRetries(t *testing.T) { filterFn.Store(tc.filter) defer filterFn.Store((func(storagebase.FilterArgs) *roachpb.Error)(nil)) } + if tc.refreshSpansCondenseFilter != nil { + refreshSpansCondenseFilter.Store(tc.refreshSpansCondenseFilter) + defer refreshSpansCondenseFilter.Store((func() bool)(nil)) + } var metrics kvcoord.TxnMetrics - var lastAutoRetries int64 + var lastRefreshes int64 var hadClientRetry bool epoch := 0 if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { @@ -2542,7 +2564,7 @@ func TestTxnCoordSenderRetries(t *testing.T) { } metrics = txn.Sender().(*kvcoord.TxnCoordSender).TxnCoordSenderFactory.Metrics() - lastAutoRetries = metrics.AutoRetries.Count() + lastRefreshes = metrics.RefreshSuccess.Count() return tc.retryable(ctx, txn) }); err != nil { @@ -2558,11 +2580,11 @@ func TestTxnCoordSenderRetries(t *testing.T) { // from the cluster setup are still ongoing and can experience // their own retries, this might increase by more than one, so we // can only check here that it's >= 1. - autoRetries := metrics.AutoRetries.Count() - lastAutoRetries - if tc.txnCoordRetry && autoRetries == 0 { - t.Errorf("expected [at least] one txn coord sender auto retry; got %d", autoRetries) - } else if !tc.txnCoordRetry && autoRetries != 0 { - t.Errorf("expected no txn coord sender auto retries; got %d", autoRetries) + refreshes := metrics.RefreshSuccess.Count() - lastRefreshes + if tc.txnCoordRetry && refreshes == 0 { + t.Errorf("expected [at least] one txn coord sender auto retry; got %d", refreshes) + } else if !tc.txnCoordRetry && refreshes != 0 { + t.Errorf("expected no txn coord sender auto retries; got %d", refreshes) } if tc.clientRetry && !hadClientRetry { t.Errorf("expected but did not experience client retry") diff --git a/pkg/kv/kvclient/kvcoord/range_iter.go b/pkg/kv/kvclient/kvcoord/range_iter.go index 990cf2d1a1c0..ca8aaaa2ebc9 100644 --- a/pkg/kv/kvclient/kvcoord/range_iter.go +++ b/pkg/kv/kvclient/kvcoord/range_iter.go @@ -34,9 +34,6 @@ type RangeIterator struct { err error } -// RangeIteratorGen is a generator of RangeIterators. 
-type RangeIteratorGen func() *RangeIterator - // NewRangeIterator creates a new RangeIterator. func NewRangeIterator(ds *DistSender) *RangeIterator { return &RangeIterator{ diff --git a/pkg/kv/kvclient/kvcoord/testing_knobs.go b/pkg/kv/kvclient/kvcoord/testing_knobs.go index 1f6ae51177e9..762e6cb9fb08 100644 --- a/pkg/kv/kvclient/kvcoord/testing_knobs.go +++ b/pkg/kv/kvclient/kvcoord/testing_knobs.go @@ -23,6 +23,12 @@ type ClientTestingKnobs struct { // spans for a single transactional batch. // 0 means use a default. -1 means disable refresh. MaxTxnRefreshAttempts int + + // CondenseRefreshSpansFilter, if set, is called when the span refresher is + // considering condensing the refresh spans. If it returns false, condensing + // will not be attempted and the span refresher will behave as if condensing + // failed to save enough memory. + CondenseRefreshSpansFilter func() bool } var _ base.ModuleTestingKnobs = &ClientTestingKnobs{} diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go index f46fc5de5f4a..f0b9d9c14b8b 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go @@ -217,9 +217,9 @@ func newRootTxnCoordSender( // txnLockGatekeeper at the bottom of the stack to connect it with the // TxnCoordSender's wrapped sender. First, each of the interceptor objects // is initialized. - var riGen RangeIteratorGen + var riGen rangeIteratorFactory if ds, ok := tcf.wrapped.(*DistSender); ok { - riGen = ds.rangeIteratorGen + riGen.ds = ds } tcs.interceptorAlloc.txnHeartbeater.init( tcf.AmbientContext, @@ -279,7 +279,7 @@ func newRootTxnCoordSender( } func (tc *TxnCoordSender) initCommonInterceptors( - tcf *TxnCoordSenderFactory, txn *roachpb.Transaction, typ kv.TxnType, riGen RangeIteratorGen, + tcf *TxnCoordSenderFactory, txn *roachpb.Transaction, typ kv.TxnType, riGen rangeIteratorFactory, ) { tc.interceptorAlloc.txnPipeliner = txnPipeliner{ st: tcf.st, @@ -288,13 +288,16 @@ func (tc *TxnCoordSender) initCommonInterceptors( tc.interceptorAlloc.txnSpanRefresher = txnSpanRefresher{ st: tcf.st, knobs: &tcf.testingKnobs, + riGen: riGen, // We can only allow refresh span retries on root transactions // because those are the only places where we have all of the // refresh spans. If this is a leaf, as in a distributed sql flow, // we need to propagate the error to the root for an epoch restart. - canAutoRetry: typ == kv.RootTxn, - autoRetryCounter: tc.metrics.AutoRetries, - refreshSpanBytesExceededCounter: tc.metrics.RefreshSpanBytesExceeded, + canAutoRetry: typ == kv.RootTxn, + refreshSuccess: tc.metrics.RefreshSuccess, + refreshFail: tc.metrics.RefreshFail, + refreshFailWithCondensedSpans: tc.metrics.RefreshFailWithCondensedSpans, + refreshMemoryLimitExceeded: tc.metrics.RefreshMemoryLimitExceeded, } tc.interceptorAlloc.txnLockGatekeeper = txnLockGatekeeper{ wrapped: tc.wrapped, @@ -348,9 +351,9 @@ func newLeafTxnCoordSender( // txnLockGatekeeper at the bottom of the stack to connect it with the // TxnCoordSender's wrapped sender. First, each of the interceptor objects // is initialized. 
-	var riGen RangeIteratorGen
+	var riGen rangeIteratorFactory
 	if ds, ok := tcf.wrapped.(*DistSender); ok {
-		riGen = ds.rangeIteratorGen
+		riGen.ds = ds
 	}
 	tcs.initCommonInterceptors(tcf, txn, kv.LeafTxn, riGen)
 
diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go
index ef5d0a313bc5..2d4e944ca538 100644
--- a/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go
+++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go
@@ -41,9 +41,8 @@ type savepoint struct {
 	seqNum enginepb.TxnSeq
 
 	// txnSpanRefresher fields.
-	refreshSpans     []roachpb.Span
-	refreshInvalid   bool
-	refreshSpanBytes int64
+	refreshSpans   []roachpb.Span
+	refreshInvalid bool
 }
 
 var _ kv.SavepointToken = (*savepoint)(nil)
diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go
index 7ee7f251bb76..671b728faefb 100644
--- a/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go
+++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go
@@ -907,7 +907,7 @@ func TestTxnMultipleCoord(t *testing.T) {
 
 	// Verify presence of both locks.
 	tcs := txn.Sender().(*TxnCoordSender)
-	refreshSpans := tcs.interceptorAlloc.txnSpanRefresher.refreshSpans
+	refreshSpans := tcs.interceptorAlloc.txnSpanRefresher.refreshFootprint.asSlice()
 	require.Equal(t, []roachpb.Span{{Key: key}, {Key: key2}}, refreshSpans)
 
 	ba := txn.NewBatch()
diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go
index c7748734e55e..9f75fec92f5e 100644
--- a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go
+++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go
@@ -176,10 +176,8 @@ var trackedWritesMaxSize = settings.RegisterPublicIntSetting(
 // attached to any end transaction request that is passed through the pipeliner
 // to ensure that they the locks within them are released.
 type txnPipeliner struct {
-	st *cluster.Settings
-	// Optional; used to condense lock spans, if provided. If not provided, a
-	// transaction's lock footprint may grow without bound.
-	riGen RangeIteratorGen
+	st    *cluster.Settings
+	riGen rangeIteratorFactory // used to condense lock spans, if provided
 	wrapped  lockedSender
 	disabled bool
 
@@ -205,6 +203,36 @@ type txnPipeliner struct {
 	lockFootprint condensableSpanSet
 }
 
+// condensableSpanSetRangeIterator describes the interface of RangeIterator
+// needed by the condensableSpanSet. Useful for mocking an
+// iterator in tests.
+type condensableSpanSetRangeIterator interface {
+	Valid() bool
+	Seek(ctx context.Context, key roachpb.RKey, scanDir ScanDirection)
+	Error() error
+	Desc() *roachpb.RangeDescriptor
+}
+
+// rangeIteratorFactory is used to create a condensableSpanSetRangeIterator
+// lazily. It's used to avoid allocating an iterator when it's not needed. The
+// factory can be configured either with a callback, used for mocking in tests,
+// or with a DistSender. If neither is set, newRangeIterator panics.
+type rangeIteratorFactory struct {
+	factory func() condensableSpanSetRangeIterator
+	ds      *DistSender
+}
+
+// newRangeIterator creates a range iterator. If no factory was configured, it panics.
+func (f rangeIteratorFactory) newRangeIterator() condensableSpanSetRangeIterator {
+	if f.factory != nil {
+		return f.factory()
+	}
+	if f.ds != nil {
+		return NewRangeIterator(f.ds)
+	}
+	panic("no iterator factory configured")
+}
+
 // SendLocked implements the lockedSender interface.
func (tp *txnPipeliner) SendLocked( ctx context.Context, ba roachpb.BatchRequest, diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go index 7b72db70fb4a..5fbcca805519 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go @@ -560,9 +560,11 @@ func TestTxnPipelinerManyWrites(t *testing.T) { ctx := context.Background() tp, mockSender := makeMockTxnPipeliner() - // Disable maxInFlightSize and maxBatchSize limits/ + // Disable write_pipelining_max_outstanding_size, + // write_pipelining_max_batch_size, and max_intents_bytes limits. pipelinedWritesMaxInFlightSize.Override(&tp.st.SV, math.MaxInt64) pipelinedWritesMaxBatchSize.Override(&tp.st.SV, 0) + trackedWritesMaxSize.Override(&tp.st.SV, math.MaxInt64) const writes = 2048 keyBuf := roachpb.Key(strings.Repeat("a", writes+1)) diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go index 48e9f0103253..26c46053e03d 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go @@ -104,21 +104,16 @@ var MaxTxnRefreshSpansBytes = settings.RegisterPublicIntSetting( type txnSpanRefresher struct { st *cluster.Settings knobs *ClientTestingKnobs + riGen rangeIteratorFactory wrapped lockedSender - // refreshSpans contains key spans which were read during the transaction. In - // case the transaction's timestamp needs to be pushed, we can avoid a - // retriable error by "refreshing" these spans: verifying that there have been - // no changes to their data in between the timestamp at which they were read - // and the higher timestamp we want to move to. - refreshSpans []roachpb.Span + refreshFootprint condensableSpanSet // refreshInvalid is set if refresh spans have not been collected (because the - // memory budget was exceeded). When set, refreshSpans is empty. + // memory budget was exceeded). When set, refreshFootprint is empty. This is + // set when we've failed to condense the refresh spans below the target memory + // limit. refreshInvalid bool - // refreshSpansBytes is the total size in bytes of the spans - // encountered during this transaction that need to be refreshed - // to avoid serializable restart. - refreshSpansBytes int64 + // refreshedTimestamp keeps track of the largest timestamp that refreshed // don't fail on (i.e. if we'll refresh, we'll refreshFrom timestamp onwards). // After every epoch bump, it is initialized to the timestamp of the first @@ -127,12 +122,11 @@ type txnSpanRefresher struct { // canAutoRetry is set if the txnSpanRefresher is allowed to auto-retry. canAutoRetry bool - // autoRetryCounter counts the number of auto retries which avoid - // client-side restarts. - autoRetryCounter *metric.Counter - // refreshSpanBytesExceeded counter counts the number of transactions which - // do not refresh because they exceed the MaxRefreshSpanBytes. - refreshSpanBytesExceededCounter *metric.Counter + + refreshSuccess *metric.Counter + refreshFail *metric.Counter + refreshFailWithCondensedSpans *metric.Counter + refreshMemoryLimitExceeded *metric.Counter } // SendLocked implements the lockedSender interface. 
@@ -191,25 +185,48 @@ func (sr *txnSpanRefresher) SendLocked(
 		return br, nil
 	}
 
-	// Iterate over and aggregate refresh spans in the requests,
-	// qualified by possible resume spans in the responses, if we
-	// haven't yet exceeded the max read key bytes.
+	// Iterate over and aggregate refresh spans in the requests, qualified by
+	// possible resume spans in the responses.
 	if !sr.refreshInvalid {
 		if err := sr.appendRefreshSpans(ctx, ba, br); err != nil {
 			return nil, roachpb.NewError(err)
 		}
-	}
-	// Verify and enforce the size in bytes of all read-only spans
-	// doesn't exceed the max threshold.
-	if sr.refreshSpansBytes > MaxTxnRefreshSpansBytes.Get(&sr.st.SV) {
-		log.VEventf(ctx, 2, "refresh spans max size exceeded; clearing")
-		sr.refreshSpans = nil
-		sr.refreshInvalid = true
-		sr.refreshSpansBytes = 0
+		// Check whether we should condense the refresh spans.
+		maxBytes := MaxTxnRefreshSpansBytes.Get(&sr.st.SV)
+		if sr.refreshFootprint.bytes >= maxBytes {
+			condensedBefore := sr.refreshFootprint.condensed
+			condensedSufficient := sr.tryCondenseRefreshSpans(ctx, maxBytes)
+			if condensedSufficient {
+				log.VEventf(ctx, 2, "condensed refresh spans for txn %s to %d bytes",
+					br.Txn, sr.refreshFootprint.bytes)
+			} else {
+				// Condensing was not enough. Giving up on tracking reads. Refreshes
+				// will not be possible.
+				log.VEventf(ctx, 2, "condensed refresh spans didn't save enough memory. txn %s. "+
+					"refresh spans after condense: %d bytes",
+					br.Txn, sr.refreshFootprint.bytes)
+				sr.refreshInvalid = true
+				sr.refreshFootprint.clear()
+			}
+
+			if sr.refreshFootprint.condensed && !condensedBefore {
+				sr.refreshMemoryLimitExceeded.Inc(1)
+			}
+		}
 	}
 	return br, nil
 }
 
+// tryCondenseRefreshSpans attempts to condense the refresh spans in order to
+// save memory. Returns true if we managed to condense them below maxBytes.
+func (sr *txnSpanRefresher) tryCondenseRefreshSpans(ctx context.Context, maxBytes int64) bool {
+	if sr.knobs.CondenseRefreshSpansFilter != nil && !sr.knobs.CondenseRefreshSpansFilter() {
+		return false
+	}
+	sr.refreshFootprint.maybeCondense(ctx, sr.riGen, maxBytes)
+	return sr.refreshFootprint.bytes < maxBytes
+}
+
 // sendLockedWithRefreshAttempts sends the batch through the wrapped sender. It
 // catches serializable errors and attempts to avoid them by refreshing the txn
 // at a larger timestamp.
@@ -294,8 +311,13 @@ func (sr *txnSpanRefresher) maybeRetrySend(
 
 	// Try updating the txn spans so we can retry.
 	if ok := sr.tryUpdatingTxnSpans(ctx, retryTxn); !ok {
+		sr.refreshFail.Inc(1)
+		if sr.refreshFootprint.condensed {
+			sr.refreshFailWithCondensedSpans.Inc(1)
+		}
 		return nil, pErr
 	}
+	sr.refreshSuccess.Inc(1)
 
 	// We've refreshed all of the read spans successfully and bumped
 	// ba.Txn's timestamps. Attempt the request again.
@@ -308,7 +330,6 @@ func (sr *txnSpanRefresher) maybeRetrySend(
 	}
 
 	log.VEventf(ctx, 2, "retry successful @%s", retryBr.Txn.ReadTimestamp)
-	sr.autoRetryCounter.Inc(1)
 	return retryBr, nil
 }
 
@@ -323,9 +344,8 @@ func (sr *txnSpanRefresher) tryUpdatingTxnSpans(
 
 	if sr.refreshInvalid {
 		log.VEvent(ctx, 2, "can't refresh txn spans; not valid")
-		sr.refreshSpanBytesExceededCounter.Inc(1)
 		return false
-	} else if len(sr.refreshSpans) == 0 {
+	} else if sr.refreshFootprint.empty() {
 		log.VEvent(ctx, 2, "there are no txn spans to refresh")
 		sr.refreshedTimestamp.Forward(refreshTxn.ReadTimestamp)
 		return true
@@ -335,7 +355,7 @@ func (sr *txnSpanRefresher) tryUpdatingTxnSpans(
 	// TODO(nvanbenschoten): actually merge spans.
refreshSpanBa := roachpb.BatchRequest{} refreshSpanBa.Txn = refreshTxn - addRefreshes := func(refreshes []roachpb.Span) { + addRefreshes := func(refreshes condensableSpanSet) { // We're going to check writes between the previous refreshed timestamp, if // any, and the timestamp we want to bump the transaction to. Note that if // we've already refreshed the transaction before, we don't need to check @@ -347,7 +367,7 @@ func (sr *txnSpanRefresher) tryUpdatingTxnSpans( // refreshed). Checking below that timestamp (like we would, for example, if // we simply used txn.OrigTimestamp here), could cause false-positives that // would fail the refresh. - for _, u := range refreshes { + for _, u := range refreshes.asSlice() { var req roachpb.Request if len(u.EndKey) == 0 { req = &roachpb.RefreshRequest{ @@ -365,7 +385,7 @@ func (sr *txnSpanRefresher) tryUpdatingTxnSpans( req.Header().Span(), sr.refreshedTimestamp, refreshTxn.WriteTimestamp) } } - addRefreshes(sr.refreshSpans) + addRefreshes(sr.refreshFootprint) // Send through wrapped lockedSender. Unlocks while sending then re-locks. if _, batchErr := sr.wrapped.SendLocked(ctx, refreshSpanBa); batchErr != nil { @@ -393,8 +413,7 @@ func (sr *txnSpanRefresher) appendRefreshSpans( ba.RefreshSpanIterate(br, func(span roachpb.Span) { log.VEventf(ctx, 3, "recording span to refresh: %s", span) - sr.refreshSpans = append(sr.refreshSpans, span) - sr.refreshSpansBytes += int64(len(span.Key) + len(span.EndKey)) + sr.refreshFootprint.insert(span) }) return nil } @@ -404,7 +423,7 @@ func (sr *txnSpanRefresher) appendRefreshSpans( // for the "server-side refresh" optimization, where batches are re-evaluated // at a higher read-timestamp without returning to transaction coordinator. func (sr *txnSpanRefresher) canForwardReadTimestampWithoutRefresh(txn *roachpb.Transaction) bool { - return sr.canAutoRetry && !sr.refreshInvalid && len(sr.refreshSpans) == 0 && !txn.CommitTimestampFixed + return sr.canAutoRetry && !sr.refreshInvalid && sr.refreshFootprint.empty() && !txn.CommitTimestampFixed } // forwardRefreshTimestampOnResponse updates the refresher's tracked @@ -437,7 +456,7 @@ func (sr *txnSpanRefresher) populateLeafFinalState(tfs *roachpb.LeafTxnFinalStat tfs.RefreshInvalid = sr.refreshInvalid if !sr.refreshInvalid { // Copy mutable state so access is safe for the caller. - tfs.RefreshSpans = append([]roachpb.Span(nil), sr.refreshSpans...) + tfs.RefreshSpans = append([]roachpb.Span(nil), sr.refreshFootprint.asSlice()...) } } @@ -447,41 +466,32 @@ func (sr *txnSpanRefresher) importLeafFinalState( ) { if tfs.RefreshInvalid { sr.refreshInvalid = true - sr.refreshSpans = nil + sr.refreshFootprint.clear() } else if !sr.refreshInvalid { - if tfs.RefreshSpans != nil { - sr.refreshSpans, _ = roachpb.MergeSpans(append(sr.refreshSpans, tfs.RefreshSpans...)) - } - } - // Recompute the size of the refreshes. - sr.refreshSpansBytes = 0 - for _, u := range sr.refreshSpans { - sr.refreshSpansBytes += int64(len(u.Key) + len(u.EndKey)) + sr.refreshFootprint.insert(tfs.RefreshSpans...) + sr.refreshFootprint.maybeCondense(ctx, sr.riGen, MaxTxnRefreshSpansBytes.Get(&sr.st.SV)) } } // epochBumpedLocked implements the txnInterceptor interface. func (sr *txnSpanRefresher) epochBumpedLocked() { - sr.refreshSpans = nil + sr.refreshFootprint.clear() sr.refreshInvalid = false - sr.refreshSpansBytes = 0 sr.refreshedTimestamp.Reset() } // createSavepointLocked is part of the txnReqInterceptor interface. 
func (sr *txnSpanRefresher) createSavepointLocked(ctx context.Context, s *savepoint) { - s.refreshSpans = make([]roachpb.Span, len(sr.refreshSpans)) - copy(s.refreshSpans, sr.refreshSpans) + s.refreshSpans = make([]roachpb.Span, len(sr.refreshFootprint.asSlice())) + copy(s.refreshSpans, sr.refreshFootprint.asSlice()) s.refreshInvalid = sr.refreshInvalid - s.refreshSpanBytes = sr.refreshSpansBytes } // rollbackToSavepointLocked is part of the txnReqInterceptor interface. func (sr *txnSpanRefresher) rollbackToSavepointLocked(ctx context.Context, s savepoint) { - sr.refreshSpans = make([]roachpb.Span, len(s.refreshSpans)) - copy(sr.refreshSpans, s.refreshSpans) + sr.refreshFootprint.clear() + sr.refreshFootprint.insert(s.refreshSpans...) sr.refreshInvalid = s.refreshInvalid - sr.refreshSpansBytes = s.refreshSpanBytes } // closeLocked implements the txnInterceptor interface. diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go index a199c8d9c620..4295d4d50542 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go @@ -25,12 +25,14 @@ import ( func makeMockTxnSpanRefresher() (txnSpanRefresher, *mockLockedSender) { mockSender := &mockLockedSender{} return txnSpanRefresher{ - st: cluster.MakeTestingClusterSettings(), - knobs: new(ClientTestingKnobs), - wrapped: mockSender, - canAutoRetry: true, - autoRetryCounter: metric.NewCounter(metaAutoRetriesRates), - refreshSpanBytesExceededCounter: metric.NewCounter(metaRefreshSpanBytesExceeded), + st: cluster.MakeTestingClusterSettings(), + knobs: new(ClientTestingKnobs), + wrapped: mockSender, + canAutoRetry: true, + refreshSuccess: metric.NewCounter(metaRefreshSuccess), + refreshFail: metric.NewCounter(metaRefreshFail), + refreshFailWithCondensedSpans: metric.NewCounter(metaRefreshFailWithCondensedSpans), + refreshMemoryLimitExceeded: metric.NewCounter(metaRefreshMemoryLimitExceeded), }, mockSender } @@ -70,9 +72,10 @@ func TestTxnSpanRefresherCollectsSpans(t *testing.T) { require.Nil(t, pErr) require.NotNil(t, br) - require.Equal(t, []roachpb.Span{getArgs.Span(), delRangeArgs.Span()}, tsr.refreshSpans) + require.Equal(t, []roachpb.Span{getArgs.Span(), delRangeArgs.Span()}, + tsr.refreshFootprint.asSlice()) require.False(t, tsr.refreshInvalid) - require.Equal(t, int64(3), tsr.refreshSpansBytes) + require.Equal(t, int64(3), tsr.refreshFootprint.bytes) require.Equal(t, txn.ReadTimestamp, tsr.refreshedTimestamp) // Scan with limit. Only the scanned keys are added to the refresh spans. 
@@ -97,9 +100,9 @@ func TestTxnSpanRefresherCollectsSpans(t *testing.T) { require.Equal(t, []roachpb.Span{getArgs.Span(), delRangeArgs.Span(), {Key: scanArgs.Key, EndKey: keyC}}, - tsr.refreshSpans) + tsr.refreshFootprint.asSlice()) require.False(t, tsr.refreshInvalid) - require.Equal(t, int64(5), tsr.refreshSpansBytes) + require.Equal(t, int64(5), tsr.refreshFootprint.bytes) require.Equal(t, txn.ReadTimestamp, tsr.refreshedTimestamp) } @@ -210,9 +213,8 @@ func TestTxnSpanRefresherRefreshesTransactions(t *testing.T) { require.Nil(t, pErr) require.NotNil(t, br) - require.Equal(t, []roachpb.Span{getArgs.Span(), delRangeArgs.Span()}, tsr.refreshSpans) + require.Equal(t, []roachpb.Span{getArgs.Span(), delRangeArgs.Span()}, tsr.refreshFootprint.asSlice()) require.False(t, tsr.refreshInvalid) - require.Equal(t, int64(3), tsr.refreshSpansBytes) require.Equal(t, br.Txn.ReadTimestamp, tsr.refreshedTimestamp) // Hook up a chain of mocking functions. @@ -273,12 +275,15 @@ func TestTxnSpanRefresherRefreshesTransactions(t *testing.T) { require.Equal(t, tc.expRefreshTS, br.Txn.WriteTimestamp) require.Equal(t, tc.expRefreshTS, br.Txn.ReadTimestamp) require.Equal(t, tc.expRefreshTS, tsr.refreshedTimestamp) - require.Equal(t, int64(1), tsr.autoRetryCounter.Count()) + require.Equal(t, int64(1), tsr.refreshSuccess.Count()) + require.Equal(t, int64(0), tsr.refreshFail.Count()) } else { require.Nil(t, br) require.NotNil(t, pErr) require.Equal(t, ba.Txn.ReadTimestamp, tsr.refreshedTimestamp) - require.Equal(t, int64(0), tsr.autoRetryCounter.Count()) + require.Equal(t, int64(0), tsr.refreshSuccess.Count()) + // Note that we don't check the tsr.refreshFail metric here as tests + // here expect the refresh to not be attempted, not to fail. } }) } @@ -308,9 +313,8 @@ func TestTxnSpanRefresherMaxRefreshAttempts(t *testing.T) { require.Nil(t, pErr) require.NotNil(t, br) - require.Equal(t, []roachpb.Span{scanArgs.Span()}, tsr.refreshSpans) + require.Equal(t, []roachpb.Span{scanArgs.Span()}, tsr.refreshFootprint.asSlice()) require.False(t, tsr.refreshInvalid) - require.Equal(t, int64(2), tsr.refreshSpansBytes) require.Equal(t, br.Txn.ReadTimestamp, tsr.refreshedTimestamp) // Hook up a chain of mocking functions. @@ -356,18 +360,41 @@ func TestTxnSpanRefresherMaxRefreshAttempts(t *testing.T) { require.Equal(t, tsr.knobs.MaxTxnRefreshAttempts, refreshes) } +type singleRangeIterator struct{} + +func (s singleRangeIterator) Valid() bool { + return true +} + +func (s singleRangeIterator) Seek(context.Context, roachpb.RKey, ScanDirection) {} + +func (s singleRangeIterator) Error() error { + return nil +} + +func (s singleRangeIterator) Desc() *roachpb.RangeDescriptor { + return &roachpb.RangeDescriptor{ + RangeID: 1, + StartKey: roachpb.RKeyMin, + EndKey: roachpb.RKeyMax, + } +} + // TestTxnSpanRefresherMaxTxnRefreshSpansBytes tests that the txnSpanRefresher -// only collects up to kv.transaction.max_refresh_spans_bytes refresh bytes -// before throwing away refresh spans and refusing to attempt to refresh -// transactions. +// collapses spans after they exceed kv.transaction.max_refresh_spans_bytes +// refresh bytes. 
func TestTxnSpanRefresherMaxTxnRefreshSpansBytes(t *testing.T) { defer leaktest.AfterTest(t)() ctx := context.Background() tsr, mockSender := makeMockTxnSpanRefresher() + tsr.riGen = rangeIteratorFactory{factory: func() condensableSpanSetRangeIterator { + return singleRangeIterator{} + }} txn := makeTxnProto() keyA, keyB := roachpb.Key("a"), roachpb.Key("b") - keyC, keyD := roachpb.Key("c"), roachpb.Key("d") + keyC := roachpb.Key("c") + keyD, keyE := roachpb.Key("d"), roachpb.Key("e") // Set MaxTxnRefreshSpansBytes limit to 3 bytes. MaxTxnRefreshSpansBytes.Override(&tsr.st.SV, 3) @@ -382,13 +409,13 @@ func TestTxnSpanRefresherMaxTxnRefreshSpansBytes(t *testing.T) { require.Nil(t, pErr) require.NotNil(t, br) - require.Equal(t, []roachpb.Span{scanArgs.Span()}, tsr.refreshSpans) + require.Equal(t, []roachpb.Span{scanArgs.Span()}, tsr.refreshFootprint.asSlice()) require.False(t, tsr.refreshInvalid) - require.Equal(t, int64(2), tsr.refreshSpansBytes) require.Equal(t, txn.ReadTimestamp, tsr.refreshedTimestamp) + require.Equal(t, int64(2), tsr.refreshFootprint.bytes) - // Send another batch that pushes us above the limit. The refresh spans - // should become invalid. + // Send another batch that pushes us above the limit. The tracked spans are + // adjacent so the spans will be merged, but not condensed. ba.Requests = nil scanArgs2 := roachpb.ScanRequest{RequestHeader: roachpb.RequestHeader{Key: keyB, EndKey: keyC}} ba.Add(&scanArgs2) @@ -397,28 +424,29 @@ func TestTxnSpanRefresherMaxTxnRefreshSpansBytes(t *testing.T) { require.Nil(t, pErr) require.NotNil(t, br) - require.Equal(t, []roachpb.Span(nil), tsr.refreshSpans) - require.True(t, tsr.refreshInvalid) - require.Equal(t, int64(0), tsr.refreshSpansBytes) + require.Equal(t, []roachpb.Span{{Key: keyA, EndKey: keyC}}, tsr.refreshFootprint.asSlice()) + require.False(t, tsr.refreshInvalid) + require.Equal(t, int64(2), tsr.refreshFootprint.bytes) + require.False(t, tsr.refreshFootprint.condensed) + require.Equal(t, int64(0), tsr.refreshMemoryLimitExceeded.Count()) require.Equal(t, txn.ReadTimestamp, tsr.refreshedTimestamp) - // Once invalid, the refresh spans should stay invalid. + // Exceed the limit again, this time with a non-adjacent span such that + // condensing needs to occur. ba.Requests = nil - scanArgs3 := roachpb.ScanRequest{RequestHeader: roachpb.RequestHeader{Key: keyC, EndKey: keyD}} + scanArgs3 := roachpb.ScanRequest{RequestHeader: roachpb.RequestHeader{Key: keyD, EndKey: keyE}} ba.Add(&scanArgs3) br, pErr = tsr.SendLocked(ctx, ba) require.Nil(t, pErr) require.NotNil(t, br) - require.Equal(t, []roachpb.Span(nil), tsr.refreshSpans) - require.True(t, tsr.refreshInvalid) - require.Equal(t, int64(0), tsr.refreshSpansBytes) + require.Equal(t, []roachpb.Span{{Key: keyA, EndKey: keyE}}, tsr.refreshFootprint.asSlice()) + require.True(t, tsr.refreshFootprint.condensed) + require.False(t, tsr.refreshInvalid) require.Equal(t, txn.ReadTimestamp, tsr.refreshedTimestamp) - - // Make sure that the metric due to the refresh span bytes being exceeded - // has not been incremented. - require.Equal(t, int64(0), tsr.refreshSpanBytesExceededCounter.Count()) + require.Equal(t, int64(1), tsr.refreshMemoryLimitExceeded.Count()) + require.Equal(t, int64(0), tsr.refreshFailWithCondensedSpans.Count()) // Return a transaction retry error and make sure the metric indicating that // we did not retry due to the refresh span bytes in incremented. 
@@ -431,7 +459,7 @@ func TestTxnSpanRefresherMaxTxnRefreshSpansBytes(t *testing.T) { exp := roachpb.NewTransactionRetryError(roachpb.RETRY_SERIALIZABLE, "") require.Equal(t, exp, pErr.GetDetail()) require.Nil(t, br) - require.Equal(t, int64(1), tsr.refreshSpanBytesExceededCounter.Count()) + require.Equal(t, int64(1), tsr.refreshFailWithCondensedSpans.Count()) } // TestTxnSpanRefresherAssignsCanForwardReadTimestamp tests that the @@ -446,9 +474,6 @@ func TestTxnSpanRefresherAssignsCanForwardReadTimestamp(t *testing.T) { keyA, keyB := roachpb.Key("a"), roachpb.Key("b") keyC, keyD := roachpb.Key("c"), roachpb.Key("d") - // Set MaxTxnRefreshSpansBytes limit to 3 bytes. - MaxTxnRefreshSpansBytes.Override(&tsr.st.SV, 3) - // Send a Put request. Should set CanForwardReadTimestamp flag. Should not // collect refresh spans. var ba roachpb.BatchRequest @@ -468,7 +493,7 @@ func TestTxnSpanRefresherAssignsCanForwardReadTimestamp(t *testing.T) { br, pErr := tsr.SendLocked(ctx, ba) require.Nil(t, pErr) require.NotNil(t, br) - require.Nil(t, tsr.refreshSpans) + require.Nil(t, tsr.refreshFootprint.asSlice()) require.False(t, tsr.refreshInvalid) // Send a Put request for a transaction with a fixed commit timestamp. @@ -492,7 +517,7 @@ func TestTxnSpanRefresherAssignsCanForwardReadTimestamp(t *testing.T) { br, pErr = tsr.SendLocked(ctx, baFixed) require.Nil(t, pErr) require.NotNil(t, br) - require.Nil(t, tsr.refreshSpans) + require.Nil(t, tsr.refreshFootprint.asSlice()) require.False(t, tsr.refreshInvalid) // Send a Scan request. Should set CanForwardReadTimestamp flag. Should @@ -514,11 +539,10 @@ func TestTxnSpanRefresherAssignsCanForwardReadTimestamp(t *testing.T) { br, pErr = tsr.SendLocked(ctx, ba) require.Nil(t, pErr) require.NotNil(t, br) - require.Equal(t, []roachpb.Span{scanArgs.Span()}, tsr.refreshSpans) + require.Equal(t, []roachpb.Span{scanArgs.Span()}, tsr.refreshFootprint.asSlice()) require.False(t, tsr.refreshInvalid) // Send another Scan request. Should NOT set CanForwardReadTimestamp flag. - // Should push the spans above the limit. ba.Requests = nil scanArgs2 := roachpb.ScanRequest{RequestHeader: roachpb.RequestHeader{Key: keyC, EndKey: keyD}} ba.Add(&scanArgs2) @@ -536,8 +560,8 @@ func TestTxnSpanRefresherAssignsCanForwardReadTimestamp(t *testing.T) { br, pErr = tsr.SendLocked(ctx, ba) require.Nil(t, pErr) require.NotNil(t, br) - require.Equal(t, []roachpb.Span(nil), tsr.refreshSpans) - require.True(t, tsr.refreshInvalid) + require.Equal(t, []roachpb.Span{{Key: keyA, EndKey: keyB}, {Key: keyC, EndKey: keyD}}, tsr.refreshFootprint.asSlice()) + require.False(t, tsr.refreshInvalid) // Send another Put request. Still should NOT set CanForwardReadTimestamp flag. ba.Requests = nil @@ -556,8 +580,8 @@ func TestTxnSpanRefresherAssignsCanForwardReadTimestamp(t *testing.T) { br, pErr = tsr.SendLocked(ctx, ba) require.Nil(t, pErr) require.NotNil(t, br) - require.Equal(t, []roachpb.Span(nil), tsr.refreshSpans) - require.True(t, tsr.refreshInvalid) + require.Equal(t, []roachpb.Span{{Key: keyA, EndKey: keyB}, {Key: keyC, EndKey: keyD}}, tsr.refreshFootprint.asSlice()) + require.False(t, tsr.refreshInvalid) // Increment the transaction's epoch and send another Put request. Should // set CanForwardReadTimestamp flag. 
@@ -578,7 +602,7 @@ func TestTxnSpanRefresherAssignsCanForwardReadTimestamp(t *testing.T) { br, pErr = tsr.SendLocked(ctx, ba) require.Nil(t, pErr) require.NotNil(t, br) - require.Equal(t, []roachpb.Span(nil), tsr.refreshSpans) + require.Equal(t, []roachpb.Span(nil), tsr.refreshFootprint.asSlice()) require.False(t, tsr.refreshInvalid) } @@ -594,9 +618,6 @@ func TestTxnSpanRefresherAssignsCanCommitAtHigherTimestamp(t *testing.T) { keyA, keyB := roachpb.Key("a"), roachpb.Key("b") keyC, keyD := roachpb.Key("c"), roachpb.Key("d") - // Set MaxTxnRefreshSpansBytes limit to 3 bytes. - MaxTxnRefreshSpansBytes.Override(&tsr.st.SV, 3) - // Send an EndTxn request. Should set CanCommitAtHigherTimestamp and // CanForwardReadTimestamp flags. var ba roachpb.BatchRequest @@ -651,7 +672,7 @@ func TestTxnSpanRefresherAssignsCanCommitAtHigherTimestamp(t *testing.T) { br, pErr = tsr.SendLocked(ctx, ba) require.Nil(t, pErr) require.NotNil(t, br) - require.Equal(t, []roachpb.Span{scanArgs.Span()}, tsr.refreshSpans) + require.Equal(t, []roachpb.Span{scanArgs.Span()}, tsr.refreshFootprint.asSlice()) require.False(t, tsr.refreshInvalid) // Send another EndTxn request. Should NOT set CanCommitAtHigherTimestamp @@ -674,7 +695,7 @@ func TestTxnSpanRefresherAssignsCanCommitAtHigherTimestamp(t *testing.T) { require.Nil(t, pErr) require.NotNil(t, br) - // Send another batch to push the spans above the limit. + // Send another batch. ba.Requests = nil scanArgs2 := roachpb.ScanRequest{RequestHeader: roachpb.RequestHeader{Key: keyC, EndKey: keyD}} ba.Add(&scanArgs2) @@ -683,8 +704,6 @@ func TestTxnSpanRefresherAssignsCanCommitAtHigherTimestamp(t *testing.T) { br, pErr = tsr.SendLocked(ctx, ba) require.Nil(t, pErr) require.NotNil(t, br) - require.Equal(t, []roachpb.Span(nil), tsr.refreshSpans) - require.True(t, tsr.refreshInvalid) // Send another EndTxn request. Still should NOT set // CanCommitAtHigherTimestamp and CanForwardReadTimestamp flags. @@ -705,8 +724,6 @@ func TestTxnSpanRefresherAssignsCanCommitAtHigherTimestamp(t *testing.T) { br, pErr = tsr.SendLocked(ctx, ba) require.Nil(t, pErr) require.NotNil(t, br) - require.Equal(t, []roachpb.Span(nil), tsr.refreshSpans) - require.True(t, tsr.refreshInvalid) // Increment the transaction's epoch and send another EndTxn request. Should // set CanCommitAtHigherTimestamp and CanForwardReadTimestamp flags. @@ -728,7 +745,7 @@ func TestTxnSpanRefresherAssignsCanCommitAtHigherTimestamp(t *testing.T) { br, pErr = tsr.SendLocked(ctx, ba) require.Nil(t, pErr) require.NotNil(t, br) - require.Equal(t, []roachpb.Span(nil), tsr.refreshSpans) + require.Equal(t, []roachpb.Span(nil), tsr.refreshFootprint.asSlice()) require.False(t, tsr.refreshInvalid) } @@ -738,6 +755,8 @@ func TestTxnSpanRefresherEpochIncrement(t *testing.T) { defer leaktest.AfterTest(t)() ctx := context.Background() tsr, _ := makeMockTxnSpanRefresher() + // Disable span condensing. 
+ tsr.knobs.CondenseRefreshSpansFilter = func() bool { return false } txn := makeTxnProto() keyA, keyB := roachpb.Key("a"), roachpb.Key("b") @@ -756,17 +775,16 @@ func TestTxnSpanRefresherEpochIncrement(t *testing.T) { require.Nil(t, pErr) require.NotNil(t, br) - require.Equal(t, []roachpb.Span{scanArgs.Span()}, tsr.refreshSpans) + require.Equal(t, []roachpb.Span{scanArgs.Span()}, tsr.refreshFootprint.asSlice()) require.False(t, tsr.refreshInvalid) - require.Equal(t, int64(2), tsr.refreshSpansBytes) + require.Equal(t, int64(2), tsr.refreshFootprint.bytes) require.Equal(t, txn.ReadTimestamp, tsr.refreshedTimestamp) // Incrementing the transaction epoch clears the spans. tsr.epochBumpedLocked() - require.Equal(t, []roachpb.Span(nil), tsr.refreshSpans) + require.Equal(t, []roachpb.Span(nil), tsr.refreshFootprint.asSlice()) require.False(t, tsr.refreshInvalid) - require.Equal(t, int64(0), tsr.refreshSpansBytes) require.Equal(t, hlc.Timestamp{}, tsr.refreshedTimestamp) // Send a batch above the limit. @@ -778,17 +796,16 @@ func TestTxnSpanRefresherEpochIncrement(t *testing.T) { require.Nil(t, pErr) require.NotNil(t, br) - require.Equal(t, []roachpb.Span(nil), tsr.refreshSpans) + require.Equal(t, []roachpb.Span(nil), tsr.refreshFootprint.asSlice()) require.True(t, tsr.refreshInvalid) - require.Equal(t, int64(0), tsr.refreshSpansBytes) + require.Equal(t, int64(0), tsr.refreshFootprint.bytes) require.Equal(t, txn.ReadTimestamp, tsr.refreshedTimestamp) // Incrementing the transaction epoch clears the invalid status. tsr.epochBumpedLocked() - require.Equal(t, []roachpb.Span(nil), tsr.refreshSpans) + require.Equal(t, []roachpb.Span(nil), tsr.refreshFootprint.asSlice()) require.False(t, tsr.refreshInvalid) - require.Equal(t, int64(0), tsr.refreshSpansBytes) require.Equal(t, hlc.Timestamp{}, tsr.refreshedTimestamp) } @@ -820,42 +837,33 @@ func TestTxnSpanRefresherSavepoint(t *testing.T) { require.NotNil(t, br) } read(keyA) - require.Equal(t, []roachpb.Span{{Key: keyA}}, tsr.refreshSpans) + require.Equal(t, []roachpb.Span{{Key: keyA}}, tsr.refreshFootprint.asSlice()) s := savepoint{} tsr.createSavepointLocked(ctx, &s) // Another read after the savepoint was created. read(keyB) - require.Equal(t, []roachpb.Span{{Key: keyA}, {Key: keyB}}, tsr.refreshSpans) + require.Equal(t, []roachpb.Span{{Key: keyA}, {Key: keyB}}, tsr.refreshFootprint.asSlice()) require.Equal(t, []roachpb.Span{{Key: keyA}}, s.refreshSpans) - require.Less(t, s.refreshSpanBytes, tsr.refreshSpansBytes) require.False(t, s.refreshInvalid) // Rollback the savepoint and check that refresh spans were overwritten. tsr.rollbackToSavepointLocked(ctx, s) - require.Equal(t, []roachpb.Span{{Key: keyA}}, tsr.refreshSpans) - - // Set MaxTxnRefreshSpansBytes limit low and then exceed it. - MaxTxnRefreshSpansBytes.Override(&tsr.st.SV, 1) - read(keyB) - require.True(t, tsr.refreshInvalid) + require.Equal(t, []roachpb.Span{{Key: keyA}}, tsr.refreshFootprint.asSlice()) // Check that rolling back to the savepoint resets refreshInvalid. + tsr.refreshInvalid = true tsr.rollbackToSavepointLocked(ctx, s) - require.Equal(t, tsr.refreshSpansBytes, s.refreshSpanBytes) require.False(t, tsr.refreshInvalid) - // Exceed the limit again and then create a savepoint. - read(keyB) - require.True(t, tsr.refreshInvalid) + // Set refreshInvalid and then create a savepoint. 
+ tsr.refreshInvalid = true s = savepoint{} tsr.createSavepointLocked(ctx, &s) require.True(t, s.refreshInvalid) - require.Empty(t, s.refreshSpans) // Rollback to the savepoint check that refreshes are still invalid. tsr.rollbackToSavepointLocked(ctx, s) - require.Empty(t, tsr.refreshSpans) require.True(t, tsr.refreshInvalid) } diff --git a/pkg/kv/kvclient/kvcoord/txn_metrics.go b/pkg/kv/kvclient/kvcoord/txn_metrics.go index 12943abf774f..8a8b78960e66 100644 --- a/pkg/kv/kvclient/kvcoord/txn_metrics.go +++ b/pkg/kv/kvclient/kvcoord/txn_metrics.go @@ -19,13 +19,17 @@ import ( // TxnMetrics holds all metrics relating to KV transactions. type TxnMetrics struct { - Aborts *metric.Counter - Commits *metric.Counter - Commits1PC *metric.Counter // Commits which finished in a single phase - ParallelCommits *metric.Counter // Commits which entered the STAGING state - AutoRetries *metric.Counter // Auto retries which avoid client-side restarts - RefreshSpanBytesExceeded *metric.Counter // Transactions which don't refresh due to span bytes - Durations *metric.Histogram + Aborts *metric.Counter + Commits *metric.Counter + Commits1PC *metric.Counter // Commits which finished in a single phase + ParallelCommits *metric.Counter // Commits which entered the STAGING state + + RefreshSuccess *metric.Counter + RefreshFail *metric.Counter + RefreshFailWithCondensedSpans *metric.Counter + RefreshMemoryLimitExceeded *metric.Counter + + Durations *metric.Histogram // Restarts is the number of times we had to restart the transaction. Restarts *metric.Histogram @@ -66,16 +70,33 @@ var ( Measurement: "KV Transactions", Unit: metric.Unit_COUNT, } - metaAutoRetriesRates = metric.Metadata{ - Name: "txn.autoretries", - Help: "Number of automatic retries to avoid serializable restarts", - Measurement: "Retries", + metaRefreshSuccess = metric.Metadata{ + Name: "txn.refresh.success", + Help: "Number of successful refreshes.", + Measurement: "Refreshes", + Unit: metric.Unit_COUNT, + } + metaRefreshFail = metric.Metadata{ + Name: "txn.refresh.fail", + Help: "Number of failed refreshes.", + Measurement: "Refreshes", + Unit: metric.Unit_COUNT, + } + metaRefreshFailWithCondensedSpans = metric.Metadata{ + Name: "txn.refresh.fail_with_condensed_spans", + Help: "Number of failed refreshes for transactions whose read " + + "tracking lost fidelity because of condensing. Such a failure " + + "could be a false conflict. 
Failures counted here are also counted " +
+			"in txn.refresh.fail, and the respective transactions are also counted in " +
+			"txn.refresh.memory_limit_exceeded.",
+		Measurement: "Refreshes",
 		Unit:        metric.Unit_COUNT,
 	}
-	metaRefreshSpanBytesExceeded = metric.Metadata{
-		Name:        "txn.refreshspanbytesexceeded",
-		Help:        "Number of transaction retries which fail to refresh due to the refresh span bytes",
-		Measurement: "Retries",
+	metaRefreshMemoryLimitExceeded = metric.Metadata{
+		Name: "txn.refresh.memory_limit_exceeded",
+		Help: "Number of transactions which exceed the refresh span bytes limit, causing " +
+			"their read spans to be condensed.",
+		Measurement: "Transactions",
 		Unit:        metric.Unit_COUNT,
 	}
 	metaDurationsHistograms = metric.Metadata{
@@ -175,8 +196,10 @@ func MakeTxnMetrics(histogramWindow time.Duration) TxnMetrics {
 		Commits:         metric.NewCounter(metaCommitsRates),
 		Commits1PC:      metric.NewCounter(metaCommits1PCRates),
 		ParallelCommits: metric.NewCounter(metaParallelCommitsRates),
-		AutoRetries:              metric.NewCounter(metaAutoRetriesRates),
-		RefreshSpanBytesExceeded: metric.NewCounter(metaRefreshSpanBytesExceeded),
+		RefreshFail:                   metric.NewCounter(metaRefreshFail),
+		RefreshFailWithCondensedSpans: metric.NewCounter(metaRefreshFailWithCondensedSpans),
+		RefreshSuccess:                metric.NewCounter(metaRefreshSuccess),
+		RefreshMemoryLimitExceeded:    metric.NewCounter(metaRefreshMemoryLimitExceeded),
 		Durations: metric.NewLatency(metaDurationsHistograms, histogramWindow),
 		Restarts:  metric.NewHistogram(metaRestartsHistogram, histogramWindow, 100, 3),
 		RestartsWriteTooOld: telemetry.NewCounterWithMetric(metaRestartsWriteTooOld),
diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go
index 0321053b072b..a4f49b89ac62 100644
--- a/pkg/ts/catalog/chart_catalog.go
+++ b/pkg/ts/catalog/chart_catalog.go
@@ -765,12 +765,20 @@ var charts = []sectionDescription{
 				Metrics: []string{"txn.aborts"},
 			},
 			{
-				Title:   "Auto Retries",
-				Metrics: []string{"txn.autoretries"},
+				Title:   "Successful refreshes",
+				Metrics: []string{"txn.refresh.success"},
 			},
 			{
-				Title:   "Refresh Span Bytes Exceeded",
-				Metrics: []string{"txn.refreshspanbytesexceeded"},
+				Title:   "Failed refreshes",
+				Metrics: []string{"txn.refresh.fail"},
+			},
+			{
+				Title:   "Failed refreshes with condensed spans",
+				Metrics: []string{"txn.refresh.fail_with_condensed_spans"},
+			},
+			{
+				Title:   "Transactions exceeding refresh spans memory limit",
+				Metrics: []string{"txn.refresh.memory_limit_exceeded"},
 			},
 			{
 				Title: "Commits",