From 7320018c7504caee4ff8f69d75715ed6820402ac Mon Sep 17 00:00:00 2001 From: Andrei Matei Date: Wed, 18 Mar 2020 19:29:40 -0400 Subject: [PATCH] kvcoord: condense read spans when they exceed the memory limit Before this patch, once a transaction exceeds the kv.transaction.max_refresh_spans_bytes limit, it stopped tracking reads and it didn't attempt to refresh any more when pushed. This patch make the span refresher condense the spans when it runs out of memory instead. So we'll get bigger spans and potentially false conflicts, but at least we have a chance at refreshing. In particular, it'll succeed if there's no writes anywhere. The condensing is performed using the condensableSpanSet, like we do in the pipeliner interceptor for the tracking of write intents. Internally, that guy condenses spans in ranges with lots of reads. We've seen people run into kv.transaction.max_refresh_spans_bytes in the past, so this should help many uses cases. But in particular I've written this patch because, without it, I'm scared about the effects of 20.1's reduction in the closed timestamp target duration to 3s from a previous 30s. Every transaction writing something after having run for longer than that will get pushed, so being able to refresh is getting more important. Fixes #46095 Release note (general change): Transactions reading a lot of data behave better when exceeding the memory limit set by kv.transaction.max_refresh_spans_bytes. Such transactions now attempt to resolve the conflicts they run into instead of being forced to always retry. Increasing kv.transaction.max_refresh_spans_bytes should no longer be necessary for most workloads. Release justification: fix for new "functionality" - the reduction in the closed timestamp target duration. --- .../kvclient/kvcoord/condensable_span_set.go | 47 +++-- pkg/kv/kvclient/kvcoord/dist_sender.go | 4 - .../kvcoord/dist_sender_server_test.go | 60 ++++-- pkg/kv/kvclient/kvcoord/range_iter.go | 3 - pkg/kv/kvclient/kvcoord/testing_knobs.go | 6 + pkg/kv/kvclient/kvcoord/txn_coord_sender.go | 27 ++- .../kvcoord/txn_coord_sender_savepoints.go | 5 +- .../kvclient/kvcoord/txn_coord_sender_test.go | 2 +- .../kvcoord/txn_interceptor_pipeliner.go | 37 +++- .../kvcoord/txn_interceptor_pipeliner_test.go | 4 +- .../kvcoord/txn_interceptor_span_refresher.go | 125 +++++++------ .../txn_interceptor_span_refresher_test.go | 172 +++++++++--------- pkg/kv/kvclient/kvcoord/txn_metrics.go | 57 ++++-- pkg/ts/catalog/chart_catalog.go | 16 +- 14 files changed, 344 insertions(+), 221 deletions(-) diff --git a/pkg/kv/kvclient/kvcoord/condensable_span_set.go b/pkg/kv/kvclient/kvcoord/condensable_span_set.go index 8eee6ba97fe4..05cd98808e62 100644 --- a/pkg/kv/kvclient/kvcoord/condensable_span_set.go +++ b/pkg/kv/kvclient/kvcoord/condensable_span_set.go @@ -29,13 +29,22 @@ import ( type condensableSpanSet struct { s []roachpb.Span bytes int64 + + // condensed is set if we ever condensed the spans. Meaning, if the set of + // spans currently tracked has lost fidelity compared to the spans inserted. + // Note that we might have otherwise mucked with the inserted spans to save + // memory without losing fidelity, in which case this flag would not be set + // (e.g. merging overlapping or adjacent spans). + condensed bool } -// insert adds a new span to the condensable span set. No attempt to condense -// the set or deduplicate the new span with existing spans is made. -func (s *condensableSpanSet) insert(sp roachpb.Span) { - s.s = append(s.s, sp) - s.bytes += spanSize(sp) +// insert adds new spans to the condensable span set. No attempt to condense the +// set or deduplicate the new span with existing spans is made. +func (s *condensableSpanSet) insert(spans ...roachpb.Span) { + s.s = append(s.s, spans...) + for _, sp := range spans { + s.bytes += spanSize(sp) + } } // mergeAndSort merges all overlapping spans. Calling this method will not @@ -60,11 +69,17 @@ func (s *condensableSpanSet) mergeAndSort() { // exceeded, the method is more aggressive in its attempt to reduce the memory // footprint of the span set. Not only will it merge overlapping spans, but // spans within the same range boundaries are also condensed. +// +// Returns true if condensing was done. Note that, even if condensing was +// performed, this doesn't guarantee that the size was reduced below the byte +// limit. Condensing is only performed at the level of individual ranges, not +// across ranges, so it's possible to not be able to condense as much as +// desired. func (s *condensableSpanSet) maybeCondense( - ctx context.Context, riGen RangeIteratorGen, maxBytes int64, -) { + ctx context.Context, riGen rangeIteratorFactory, maxBytes int64, +) bool { if s.bytes < maxBytes { - return + return false } // Start by attempting to simply merge the spans within the set. This alone @@ -73,14 +88,10 @@ func (s *condensableSpanSet) maybeCondense( // lower in this method. s.mergeAndSort() if s.bytes < maxBytes { - return + return false } - if riGen == nil { - // If we were not given a RangeIteratorGen, we cannot condense the spans. - return - } - ri := riGen() + ri := riGen.newRangeIterator() // Divide spans by range boundaries and condense. Iterate over spans // using a range iterator and add each to a bucket keyed by range @@ -101,7 +112,7 @@ func (s *condensableSpanSet) maybeCondense( if !ri.Valid() { // We haven't modified s.s yet, so it is safe to return. log.Warningf(ctx, "failed to condense lock spans: %v", ri.Error()) - return + return false } rangeID := ri.Desc().RangeID if l := len(buckets); l > 0 && buckets[l-1].rangeID == rangeID { @@ -143,6 +154,8 @@ func (s *condensableSpanSet) maybeCondense( s.bytes += spanSize(cs) s.s = append(s.s, cs) } + s.condensed = true + return true } // asSlice returns the set as a slice of spans. @@ -156,6 +169,10 @@ func (s *condensableSpanSet) empty() bool { return len(s.s) == 0 } +func (s *condensableSpanSet) clear() { + *s = condensableSpanSet{} +} + func spanSize(sp roachpb.Span) int64 { return int64(len(sp.Key) + len(sp.EndKey)) } diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index 2b6a7ffa1e41..bf04fe7e00d2 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -210,9 +210,6 @@ type DistSender struct { // It is copied out of the rpcContext at construction time and used in // testing. clusterID *base.ClusterIDContainer - // rangeIteratorGen returns a range iterator bound to the DistSender. - // Used to avoid allocations. - rangeIteratorGen RangeIteratorGen // disableFirstRangeUpdates disables updates of the first range via // gossip. Used by tests which want finer control of the contents of the @@ -303,7 +300,6 @@ func NewDistSender(cfg DistSenderConfig, g *gossip.Gossip) *DistSender { ds.asyncSenderSem.UpdateCapacity(uint64(senderConcurrencyLimit.Get(&cfg.Settings.SV))) }) ds.rpcContext.Stopper.AddCloser(ds.asyncSenderSem.Closer("stopper")) - ds.rangeIteratorGen = func() *RangeIterator { return NewRangeIterator(ds) } if g != nil { ctx := ds.AnnotateCtx(context.Background()) diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go index 1be557a00293..1a338f912179 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go @@ -1522,8 +1522,22 @@ func TestTxnCoordSenderRetries(t *testing.T) { return nil } + var refreshSpansCondenseFilter atomic.Value s, _, _ := serverutils.StartServer(t, - base.TestServerArgs{Knobs: base.TestingKnobs{Store: &storeKnobs}}) + base.TestServerArgs{Knobs: base.TestingKnobs{ + Store: &storeKnobs, + KVClient: &kvcoord.ClientTestingKnobs{ + CondenseRefreshSpansFilter: func() bool { + fnVal := refreshSpansCondenseFilter.Load() + if fn, ok := fnVal.(func() bool); ok { + return fn() + } + return true + }, + }}}) + + disableCondensingRefreshSpans := func() bool { return false } + ctx := context.Background() defer s.Stopper().Stop(ctx) @@ -1550,13 +1564,14 @@ func TestTxnCoordSenderRetries(t *testing.T) { } testCases := []struct { - name string - beforeTxnStart func(context.Context, *kv.DB) error // called before the txn starts - afterTxnStart func(context.Context, *kv.DB) error // called after the txn chooses a timestamp - retryable func(context.Context, *kv.Txn) error // called during the txn; may be retried - filter func(storagebase.FilterArgs) *roachpb.Error - priorReads bool - tsLeaked bool + name string + beforeTxnStart func(context.Context, *kv.DB) error // called before the txn starts + afterTxnStart func(context.Context, *kv.DB) error // called after the txn chooses a timestamp + retryable func(context.Context, *kv.Txn) error // called during the txn; may be retried + filter func(storagebase.FilterArgs) *roachpb.Error + refreshSpansCondenseFilter func() bool + priorReads bool + tsLeaked bool // If both of these are false, no retries. txnCoordRetry bool clientRetry bool @@ -1709,7 +1724,8 @@ func TestTxnCoordSenderRetries(t *testing.T) { { // If we've exhausted the limit for tracking refresh spans but we // already refreshed, keep running the txn. - name: "forwarded timestamp with too many refreshes, read only", + name: "forwarded timestamp with too many refreshes, read only", + refreshSpansCondenseFilter: disableCondensingRefreshSpans, afterTxnStart: func(ctx context.Context, db *kv.DB) error { return db.Put(ctx, "a", "value") }, @@ -1740,7 +1756,9 @@ func TestTxnCoordSenderRetries(t *testing.T) { // has been pushed, if we successfully commit then we won't hit an // error. This is the case even if the final batch itself causes a // no-op refresh because the txn has no refresh spans. - name: "forwarded timestamp with too many refreshes in batch commit with no-op refresh", + name: "forwarded timestamp with too many refreshes in batch commit " + + "with no-op refresh", + refreshSpansCondenseFilter: disableCondensingRefreshSpans, afterTxnStart: func(ctx context.Context, db *kv.DB) error { _, err := db.Get(ctx, "a") // set ts cache return err @@ -1773,7 +1791,9 @@ func TestTxnCoordSenderRetries(t *testing.T) { // has been pushed, if we successfully commit then we won't hit an // error. This is the case even if the final batch itself causes a // real refresh. - name: "forwarded timestamp with too many refreshes in batch commit with refresh", + name: "forwarded timestamp with too many refreshes in batch commit " + + "with refresh", + refreshSpansCondenseFilter: disableCondensingRefreshSpans, afterTxnStart: func(ctx context.Context, db *kv.DB) error { _, err := db.Get(ctx, "a") // set ts cache return err @@ -2541,9 +2561,13 @@ func TestTxnCoordSenderRetries(t *testing.T) { filterFn.Store(tc.filter) defer filterFn.Store((func(storagebase.FilterArgs) *roachpb.Error)(nil)) } + if tc.refreshSpansCondenseFilter != nil { + refreshSpansCondenseFilter.Store(tc.refreshSpansCondenseFilter) + defer refreshSpansCondenseFilter.Store((func() bool)(nil)) + } var metrics kvcoord.TxnMetrics - var lastAutoRetries int64 + var lastRefreshes int64 var hadClientRetry bool epoch := 0 if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { @@ -2575,7 +2599,7 @@ func TestTxnCoordSenderRetries(t *testing.T) { } metrics = txn.Sender().(*kvcoord.TxnCoordSender).TxnCoordSenderFactory.Metrics() - lastAutoRetries = metrics.AutoRetries.Count() + lastRefreshes = metrics.RefreshSuccess.Count() return tc.retryable(ctx, txn) }); err != nil { @@ -2591,11 +2615,11 @@ func TestTxnCoordSenderRetries(t *testing.T) { // from the cluster setup are still ongoing and can experience // their own retries, this might increase by more than one, so we // can only check here that it's >= 1. - autoRetries := metrics.AutoRetries.Count() - lastAutoRetries - if tc.txnCoordRetry && autoRetries == 0 { - t.Errorf("expected [at least] one txn coord sender auto retry; got %d", autoRetries) - } else if !tc.txnCoordRetry && autoRetries != 0 { - t.Errorf("expected no txn coord sender auto retries; got %d", autoRetries) + refreshes := metrics.RefreshSuccess.Count() - lastRefreshes + if tc.txnCoordRetry && refreshes == 0 { + t.Errorf("expected [at least] one txn coord sender auto retry; got %d", refreshes) + } else if !tc.txnCoordRetry && refreshes != 0 { + t.Errorf("expected no txn coord sender auto retries; got %d", refreshes) } if tc.clientRetry && !hadClientRetry { t.Errorf("expected but did not experience client retry") diff --git a/pkg/kv/kvclient/kvcoord/range_iter.go b/pkg/kv/kvclient/kvcoord/range_iter.go index 990cf2d1a1c0..ca8aaaa2ebc9 100644 --- a/pkg/kv/kvclient/kvcoord/range_iter.go +++ b/pkg/kv/kvclient/kvcoord/range_iter.go @@ -34,9 +34,6 @@ type RangeIterator struct { err error } -// RangeIteratorGen is a generator of RangeIterators. -type RangeIteratorGen func() *RangeIterator - // NewRangeIterator creates a new RangeIterator. func NewRangeIterator(ds *DistSender) *RangeIterator { return &RangeIterator{ diff --git a/pkg/kv/kvclient/kvcoord/testing_knobs.go b/pkg/kv/kvclient/kvcoord/testing_knobs.go index 1f6ae51177e9..762e6cb9fb08 100644 --- a/pkg/kv/kvclient/kvcoord/testing_knobs.go +++ b/pkg/kv/kvclient/kvcoord/testing_knobs.go @@ -23,6 +23,12 @@ type ClientTestingKnobs struct { // spans for a single transactional batch. // 0 means use a default. -1 means disable refresh. MaxTxnRefreshAttempts int + + // CondenseRefreshSpansFilter, if set, is called when the span refresher is + // considering condensing the refresh spans. If it returns false, condensing + // will not be attempted and the span refresher will behave as if condensing + // failed to save enough memory. + CondenseRefreshSpansFilter func() bool } var _ base.ModuleTestingKnobs = &ClientTestingKnobs{} diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go index f46fc5de5f4a..3c36c77ad27a 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go @@ -217,10 +217,6 @@ func newRootTxnCoordSender( // txnLockGatekeeper at the bottom of the stack to connect it with the // TxnCoordSender's wrapped sender. First, each of the interceptor objects // is initialized. - var riGen RangeIteratorGen - if ds, ok := tcf.wrapped.(*DistSender); ok { - riGen = ds.rangeIteratorGen - } tcs.interceptorAlloc.txnHeartbeater.init( tcf.AmbientContext, tcs.stopper, @@ -241,7 +237,7 @@ func newRootTxnCoordSender( clock: tcs.clock, txn: &tcs.mu.txn, } - tcs.initCommonInterceptors(tcf, txn, kv.RootTxn, riGen) + tcs.initCommonInterceptors(tcf, txn, kv.RootTxn) // Once the interceptors are initialized, piece them all together in the // correct order. @@ -279,8 +275,12 @@ func newRootTxnCoordSender( } func (tc *TxnCoordSender) initCommonInterceptors( - tcf *TxnCoordSenderFactory, txn *roachpb.Transaction, typ kv.TxnType, riGen RangeIteratorGen, + tcf *TxnCoordSenderFactory, txn *roachpb.Transaction, typ kv.TxnType, ) { + var riGen rangeIteratorFactory + if ds, ok := tcf.wrapped.(*DistSender); ok { + riGen.ds = ds + } tc.interceptorAlloc.txnPipeliner = txnPipeliner{ st: tcf.st, riGen: riGen, @@ -288,13 +288,16 @@ func (tc *TxnCoordSender) initCommonInterceptors( tc.interceptorAlloc.txnSpanRefresher = txnSpanRefresher{ st: tcf.st, knobs: &tcf.testingKnobs, + riGen: riGen, // We can only allow refresh span retries on root transactions // because those are the only places where we have all of the // refresh spans. If this is a leaf, as in a distributed sql flow, // we need to propagate the error to the root for an epoch restart. - canAutoRetry: typ == kv.RootTxn, - autoRetryCounter: tc.metrics.AutoRetries, - refreshSpanBytesExceededCounter: tc.metrics.RefreshSpanBytesExceeded, + canAutoRetry: typ == kv.RootTxn, + refreshSuccess: tc.metrics.RefreshSuccess, + refreshFail: tc.metrics.RefreshFail, + refreshFailWithCondensedSpans: tc.metrics.RefreshFailWithCondensedSpans, + refreshMemoryLimitExceeded: tc.metrics.RefreshMemoryLimitExceeded, } tc.interceptorAlloc.txnLockGatekeeper = txnLockGatekeeper{ wrapped: tc.wrapped, @@ -348,11 +351,7 @@ func newLeafTxnCoordSender( // txnLockGatekeeper at the bottom of the stack to connect it with the // TxnCoordSender's wrapped sender. First, each of the interceptor objects // is initialized. - var riGen RangeIteratorGen - if ds, ok := tcf.wrapped.(*DistSender); ok { - riGen = ds.rangeIteratorGen - } - tcs.initCommonInterceptors(tcf, txn, kv.LeafTxn, riGen) + tcs.initCommonInterceptors(tcf, txn, kv.LeafTxn) // Per-interceptor leaf initialization. If/when more interceptors // need leaf initialization, this should be turned into an interface diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go index ef5d0a313bc5..2d4e944ca538 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go @@ -41,9 +41,8 @@ type savepoint struct { seqNum enginepb.TxnSeq // txnSpanRefresher fields. - refreshSpans []roachpb.Span - refreshInvalid bool - refreshSpanBytes int64 + refreshSpans []roachpb.Span + refreshInvalid bool } var _ kv.SavepointToken = (*savepoint)(nil) diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go index 7ee7f251bb76..671b728faefb 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go @@ -907,7 +907,7 @@ func TestTxnMultipleCoord(t *testing.T) { // Verify presence of both locks. tcs := txn.Sender().(*TxnCoordSender) - refreshSpans := tcs.interceptorAlloc.txnSpanRefresher.refreshSpans + refreshSpans := tcs.interceptorAlloc.txnSpanRefresher.refreshFootprint.asSlice() require.Equal(t, []roachpb.Span{{Key: key}, {Key: key2}}, refreshSpans) ba := txn.NewBatch() diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go index c7748734e55e..0d2ad741cbd4 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go @@ -176,10 +176,8 @@ var trackedWritesMaxSize = settings.RegisterPublicIntSetting( // attached to any end transaction request that is passed through the pipeliner // to ensure that they the locks within them are released. type txnPipeliner struct { - st *cluster.Settings - // Optional; used to condense lock spans, if provided. If not provided, a - // transaction's lock footprint may grow without bound. - riGen RangeIteratorGen + st *cluster.Settings + riGen rangeIteratorFactory // used to condense lock spans, if provided wrapped lockedSender disabled bool @@ -205,6 +203,37 @@ type txnPipeliner struct { lockFootprint condensableSpanSet } +// condensableSpanSetRangeIterator describes the interface of RangeIterator +// needed by the condensableSpanSetRangeIterator. Useful for mocking an +// iterator in tests. +type condensableSpanSetRangeIterator interface { + Valid() bool + Seek(ctx context.Context, key roachpb.RKey, scanDir ScanDirection) + Error() error + Desc() *roachpb.RangeDescriptor +} + +// rangeIteratorFactory is used to create a condensableSpanSetRangeIterator +// lazily. It's used to avoid allocating an iterator when it's not needed. The +// factory can be configured either with a callback, used for mocking in tests, +// or with a DistSender. Can also be left empty for unittests that don't push +// memory limits in their span sets (and thus don't need collapsing). +type rangeIteratorFactory struct { + factory func() condensableSpanSetRangeIterator + ds *DistSender +} + +// newRangeIterator creates a range iterator. If no factory was configured, it panics. +func (f rangeIteratorFactory) newRangeIterator() condensableSpanSetRangeIterator { + if f.factory != nil { + return f.factory() + } + if f.ds != nil { + return NewRangeIterator(f.ds) + } + panic("no iterator factory configured") +} + // SendLocked implements the lockedSender interface. func (tp *txnPipeliner) SendLocked( ctx context.Context, ba roachpb.BatchRequest, diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go index 7b72db70fb4a..5fbcca805519 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go @@ -560,9 +560,11 @@ func TestTxnPipelinerManyWrites(t *testing.T) { ctx := context.Background() tp, mockSender := makeMockTxnPipeliner() - // Disable maxInFlightSize and maxBatchSize limits/ + // Disable write_pipelining_max_outstanding_size, + // write_pipelining_max_batch_size, and max_intents_bytes limits. pipelinedWritesMaxInFlightSize.Override(&tp.st.SV, math.MaxInt64) pipelinedWritesMaxBatchSize.Override(&tp.st.SV, 0) + trackedWritesMaxSize.Override(&tp.st.SV, math.MaxInt64) const writes = 2048 keyBuf := roachpb.Key(strings.Repeat("a", writes+1)) diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go index 48e9f0103253..5eac2c10c992 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go @@ -104,21 +104,21 @@ var MaxTxnRefreshSpansBytes = settings.RegisterPublicIntSetting( type txnSpanRefresher struct { st *cluster.Settings knobs *ClientTestingKnobs + riGen rangeIteratorFactory wrapped lockedSender - // refreshSpans contains key spans which were read during the transaction. In - // case the transaction's timestamp needs to be pushed, we can avoid a - // retriable error by "refreshing" these spans: verifying that there have been - // no changes to their data in between the timestamp at which they were read - // and the higher timestamp we want to move to. - refreshSpans []roachpb.Span + // refreshFootprint contains key spans which were read during the + // transaction. In case the transaction's timestamp needs to be pushed, we can + // avoid a retriable error by "refreshing" these spans: verifying that there + // have been no changes to their data in between the timestamp at which they + // were read and the higher timestamp we want to move to. + refreshFootprint condensableSpanSet // refreshInvalid is set if refresh spans have not been collected (because the - // memory budget was exceeded). When set, refreshSpans is empty. + // memory budget was exceeded). When set, refreshFootprint is empty. This is + // set when we've failed to condense the refresh spans below the target memory + // limit. refreshInvalid bool - // refreshSpansBytes is the total size in bytes of the spans - // encountered during this transaction that need to be refreshed - // to avoid serializable restart. - refreshSpansBytes int64 + // refreshedTimestamp keeps track of the largest timestamp that refreshed // don't fail on (i.e. if we'll refresh, we'll refreshFrom timestamp onwards). // After every epoch bump, it is initialized to the timestamp of the first @@ -127,12 +127,11 @@ type txnSpanRefresher struct { // canAutoRetry is set if the txnSpanRefresher is allowed to auto-retry. canAutoRetry bool - // autoRetryCounter counts the number of auto retries which avoid - // client-side restarts. - autoRetryCounter *metric.Counter - // refreshSpanBytesExceeded counter counts the number of transactions which - // do not refresh because they exceed the MaxRefreshSpanBytes. - refreshSpanBytesExceededCounter *metric.Counter + + refreshSuccess *metric.Counter + refreshFail *metric.Counter + refreshFailWithCondensedSpans *metric.Counter + refreshMemoryLimitExceeded *metric.Counter } // SendLocked implements the lockedSender interface. @@ -191,25 +190,48 @@ func (sr *txnSpanRefresher) SendLocked( return br, nil } - // Iterate over and aggregate refresh spans in the requests, - // qualified by possible resume spans in the responses, if we - // haven't yet exceeded the max read key bytes. + // Iterate over and aggregate refresh spans in the requests, qualified by + // possible resume spans in the responses. if !sr.refreshInvalid { if err := sr.appendRefreshSpans(ctx, ba, br); err != nil { return nil, roachpb.NewError(err) } - } - // Verify and enforce the size in bytes of all read-only spans - // doesn't exceed the max threshold. - if sr.refreshSpansBytes > MaxTxnRefreshSpansBytes.Get(&sr.st.SV) { - log.VEventf(ctx, 2, "refresh spans max size exceeded; clearing") - sr.refreshSpans = nil - sr.refreshInvalid = true - sr.refreshSpansBytes = 0 + // Check whether we should condense the refresh spans. + maxBytes := MaxTxnRefreshSpansBytes.Get(&sr.st.SV) + if sr.refreshFootprint.bytes >= maxBytes { + condensedBefore := sr.refreshFootprint.condensed + condensedSufficient := sr.tryCondenseRefreshSpans(ctx, maxBytes) + if condensedSufficient { + log.VEventf(ctx, 2, "condensed refresh spans for txn %s to %d bytes", + br.Txn, sr.refreshFootprint.bytes) + } else { + // Condensing was not enough. Giving up on tracking reads. Refreshed + // will not be possible. + log.VEventf(ctx, 2, "condensed refresh spans didn't save enough memory. txn %s. "+ + "refresh spans after condense: %d bytes", + br.Txn, sr.refreshFootprint.bytes) + sr.refreshInvalid = true + sr.refreshFootprint.clear() + } + + if sr.refreshFootprint.condensed && !condensedBefore { + sr.refreshMemoryLimitExceeded.Inc(1) + } + } } return br, nil } +// tryCondenseRefreshSpans attempts to condense the refresh spans in order to +// save memory. Returns true if we managed to condense them below maxBytes. +func (sr *txnSpanRefresher) tryCondenseRefreshSpans(ctx context.Context, maxBytes int64) bool { + if sr.knobs.CondenseRefreshSpansFilter != nil && !sr.knobs.CondenseRefreshSpansFilter() { + return false + } + sr.refreshFootprint.maybeCondense(ctx, sr.riGen, maxBytes) + return sr.refreshFootprint.bytes < maxBytes +} + // sendLockedWithRefreshAttempts sends the batch through the wrapped sender. It // catches serializable errors and attempts to avoid them by refreshing the txn // at a larger timestamp. @@ -294,8 +316,13 @@ func (sr *txnSpanRefresher) maybeRetrySend( // Try updating the txn spans so we can retry. if ok := sr.tryUpdatingTxnSpans(ctx, retryTxn); !ok { + sr.refreshFail.Inc(1) + if sr.refreshFootprint.condensed { + sr.refreshFailWithCondensedSpans.Inc(1) + } return nil, pErr } + sr.refreshSuccess.Inc(1) // We've refreshed all of the read spans successfully and bumped // ba.Txn's timestamps. Attempt the request again. @@ -308,7 +335,6 @@ func (sr *txnSpanRefresher) maybeRetrySend( } log.VEventf(ctx, 2, "retry successful @%s", retryBr.Txn.ReadTimestamp) - sr.autoRetryCounter.Inc(1) return retryBr, nil } @@ -323,9 +349,8 @@ func (sr *txnSpanRefresher) tryUpdatingTxnSpans( if sr.refreshInvalid { log.VEvent(ctx, 2, "can't refresh txn spans; not valid") - sr.refreshSpanBytesExceededCounter.Inc(1) return false - } else if len(sr.refreshSpans) == 0 { + } else if sr.refreshFootprint.empty() { log.VEvent(ctx, 2, "there are no txn spans to refresh") sr.refreshedTimestamp.Forward(refreshTxn.ReadTimestamp) return true @@ -335,7 +360,7 @@ func (sr *txnSpanRefresher) tryUpdatingTxnSpans( // TODO(nvanbenschoten): actually merge spans. refreshSpanBa := roachpb.BatchRequest{} refreshSpanBa.Txn = refreshTxn - addRefreshes := func(refreshes []roachpb.Span) { + addRefreshes := func(refreshes *condensableSpanSet) { // We're going to check writes between the previous refreshed timestamp, if // any, and the timestamp we want to bump the transaction to. Note that if // we've already refreshed the transaction before, we don't need to check @@ -347,7 +372,7 @@ func (sr *txnSpanRefresher) tryUpdatingTxnSpans( // refreshed). Checking below that timestamp (like we would, for example, if // we simply used txn.OrigTimestamp here), could cause false-positives that // would fail the refresh. - for _, u := range refreshes { + for _, u := range refreshes.asSlice() { var req roachpb.Request if len(u.EndKey) == 0 { req = &roachpb.RefreshRequest{ @@ -365,7 +390,7 @@ func (sr *txnSpanRefresher) tryUpdatingTxnSpans( req.Header().Span(), sr.refreshedTimestamp, refreshTxn.WriteTimestamp) } } - addRefreshes(sr.refreshSpans) + addRefreshes(&sr.refreshFootprint) // Send through wrapped lockedSender. Unlocks while sending then re-locks. if _, batchErr := sr.wrapped.SendLocked(ctx, refreshSpanBa); batchErr != nil { @@ -393,8 +418,7 @@ func (sr *txnSpanRefresher) appendRefreshSpans( ba.RefreshSpanIterate(br, func(span roachpb.Span) { log.VEventf(ctx, 3, "recording span to refresh: %s", span) - sr.refreshSpans = append(sr.refreshSpans, span) - sr.refreshSpansBytes += int64(len(span.Key) + len(span.EndKey)) + sr.refreshFootprint.insert(span) }) return nil } @@ -404,7 +428,7 @@ func (sr *txnSpanRefresher) appendRefreshSpans( // for the "server-side refresh" optimization, where batches are re-evaluated // at a higher read-timestamp without returning to transaction coordinator. func (sr *txnSpanRefresher) canForwardReadTimestampWithoutRefresh(txn *roachpb.Transaction) bool { - return sr.canAutoRetry && !sr.refreshInvalid && len(sr.refreshSpans) == 0 && !txn.CommitTimestampFixed + return sr.canAutoRetry && !sr.refreshInvalid && sr.refreshFootprint.empty() && !txn.CommitTimestampFixed } // forwardRefreshTimestampOnResponse updates the refresher's tracked @@ -437,7 +461,7 @@ func (sr *txnSpanRefresher) populateLeafFinalState(tfs *roachpb.LeafTxnFinalStat tfs.RefreshInvalid = sr.refreshInvalid if !sr.refreshInvalid { // Copy mutable state so access is safe for the caller. - tfs.RefreshSpans = append([]roachpb.Span(nil), sr.refreshSpans...) + tfs.RefreshSpans = append([]roachpb.Span(nil), sr.refreshFootprint.asSlice()...) } } @@ -447,41 +471,32 @@ func (sr *txnSpanRefresher) importLeafFinalState( ) { if tfs.RefreshInvalid { sr.refreshInvalid = true - sr.refreshSpans = nil + sr.refreshFootprint.clear() } else if !sr.refreshInvalid { - if tfs.RefreshSpans != nil { - sr.refreshSpans, _ = roachpb.MergeSpans(append(sr.refreshSpans, tfs.RefreshSpans...)) - } - } - // Recompute the size of the refreshes. - sr.refreshSpansBytes = 0 - for _, u := range sr.refreshSpans { - sr.refreshSpansBytes += int64(len(u.Key) + len(u.EndKey)) + sr.refreshFootprint.insert(tfs.RefreshSpans...) + sr.refreshFootprint.maybeCondense(ctx, sr.riGen, MaxTxnRefreshSpansBytes.Get(&sr.st.SV)) } } // epochBumpedLocked implements the txnInterceptor interface. func (sr *txnSpanRefresher) epochBumpedLocked() { - sr.refreshSpans = nil + sr.refreshFootprint.clear() sr.refreshInvalid = false - sr.refreshSpansBytes = 0 sr.refreshedTimestamp.Reset() } // createSavepointLocked is part of the txnReqInterceptor interface. func (sr *txnSpanRefresher) createSavepointLocked(ctx context.Context, s *savepoint) { - s.refreshSpans = make([]roachpb.Span, len(sr.refreshSpans)) - copy(s.refreshSpans, sr.refreshSpans) + s.refreshSpans = make([]roachpb.Span, len(sr.refreshFootprint.asSlice())) + copy(s.refreshSpans, sr.refreshFootprint.asSlice()) s.refreshInvalid = sr.refreshInvalid - s.refreshSpanBytes = sr.refreshSpansBytes } // rollbackToSavepointLocked is part of the txnReqInterceptor interface. func (sr *txnSpanRefresher) rollbackToSavepointLocked(ctx context.Context, s savepoint) { - sr.refreshSpans = make([]roachpb.Span, len(s.refreshSpans)) - copy(sr.refreshSpans, s.refreshSpans) + sr.refreshFootprint.clear() + sr.refreshFootprint.insert(s.refreshSpans...) sr.refreshInvalid = s.refreshInvalid - sr.refreshSpansBytes = s.refreshSpanBytes } // closeLocked implements the txnInterceptor interface. diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go index a199c8d9c620..4295d4d50542 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go @@ -25,12 +25,14 @@ import ( func makeMockTxnSpanRefresher() (txnSpanRefresher, *mockLockedSender) { mockSender := &mockLockedSender{} return txnSpanRefresher{ - st: cluster.MakeTestingClusterSettings(), - knobs: new(ClientTestingKnobs), - wrapped: mockSender, - canAutoRetry: true, - autoRetryCounter: metric.NewCounter(metaAutoRetriesRates), - refreshSpanBytesExceededCounter: metric.NewCounter(metaRefreshSpanBytesExceeded), + st: cluster.MakeTestingClusterSettings(), + knobs: new(ClientTestingKnobs), + wrapped: mockSender, + canAutoRetry: true, + refreshSuccess: metric.NewCounter(metaRefreshSuccess), + refreshFail: metric.NewCounter(metaRefreshFail), + refreshFailWithCondensedSpans: metric.NewCounter(metaRefreshFailWithCondensedSpans), + refreshMemoryLimitExceeded: metric.NewCounter(metaRefreshMemoryLimitExceeded), }, mockSender } @@ -70,9 +72,10 @@ func TestTxnSpanRefresherCollectsSpans(t *testing.T) { require.Nil(t, pErr) require.NotNil(t, br) - require.Equal(t, []roachpb.Span{getArgs.Span(), delRangeArgs.Span()}, tsr.refreshSpans) + require.Equal(t, []roachpb.Span{getArgs.Span(), delRangeArgs.Span()}, + tsr.refreshFootprint.asSlice()) require.False(t, tsr.refreshInvalid) - require.Equal(t, int64(3), tsr.refreshSpansBytes) + require.Equal(t, int64(3), tsr.refreshFootprint.bytes) require.Equal(t, txn.ReadTimestamp, tsr.refreshedTimestamp) // Scan with limit. Only the scanned keys are added to the refresh spans. @@ -97,9 +100,9 @@ func TestTxnSpanRefresherCollectsSpans(t *testing.T) { require.Equal(t, []roachpb.Span{getArgs.Span(), delRangeArgs.Span(), {Key: scanArgs.Key, EndKey: keyC}}, - tsr.refreshSpans) + tsr.refreshFootprint.asSlice()) require.False(t, tsr.refreshInvalid) - require.Equal(t, int64(5), tsr.refreshSpansBytes) + require.Equal(t, int64(5), tsr.refreshFootprint.bytes) require.Equal(t, txn.ReadTimestamp, tsr.refreshedTimestamp) } @@ -210,9 +213,8 @@ func TestTxnSpanRefresherRefreshesTransactions(t *testing.T) { require.Nil(t, pErr) require.NotNil(t, br) - require.Equal(t, []roachpb.Span{getArgs.Span(), delRangeArgs.Span()}, tsr.refreshSpans) + require.Equal(t, []roachpb.Span{getArgs.Span(), delRangeArgs.Span()}, tsr.refreshFootprint.asSlice()) require.False(t, tsr.refreshInvalid) - require.Equal(t, int64(3), tsr.refreshSpansBytes) require.Equal(t, br.Txn.ReadTimestamp, tsr.refreshedTimestamp) // Hook up a chain of mocking functions. @@ -273,12 +275,15 @@ func TestTxnSpanRefresherRefreshesTransactions(t *testing.T) { require.Equal(t, tc.expRefreshTS, br.Txn.WriteTimestamp) require.Equal(t, tc.expRefreshTS, br.Txn.ReadTimestamp) require.Equal(t, tc.expRefreshTS, tsr.refreshedTimestamp) - require.Equal(t, int64(1), tsr.autoRetryCounter.Count()) + require.Equal(t, int64(1), tsr.refreshSuccess.Count()) + require.Equal(t, int64(0), tsr.refreshFail.Count()) } else { require.Nil(t, br) require.NotNil(t, pErr) require.Equal(t, ba.Txn.ReadTimestamp, tsr.refreshedTimestamp) - require.Equal(t, int64(0), tsr.autoRetryCounter.Count()) + require.Equal(t, int64(0), tsr.refreshSuccess.Count()) + // Note that we don't check the tsr.refreshFail metric here as tests + // here expect the refresh to not be attempted, not to fail. } }) } @@ -308,9 +313,8 @@ func TestTxnSpanRefresherMaxRefreshAttempts(t *testing.T) { require.Nil(t, pErr) require.NotNil(t, br) - require.Equal(t, []roachpb.Span{scanArgs.Span()}, tsr.refreshSpans) + require.Equal(t, []roachpb.Span{scanArgs.Span()}, tsr.refreshFootprint.asSlice()) require.False(t, tsr.refreshInvalid) - require.Equal(t, int64(2), tsr.refreshSpansBytes) require.Equal(t, br.Txn.ReadTimestamp, tsr.refreshedTimestamp) // Hook up a chain of mocking functions. @@ -356,18 +360,41 @@ func TestTxnSpanRefresherMaxRefreshAttempts(t *testing.T) { require.Equal(t, tsr.knobs.MaxTxnRefreshAttempts, refreshes) } +type singleRangeIterator struct{} + +func (s singleRangeIterator) Valid() bool { + return true +} + +func (s singleRangeIterator) Seek(context.Context, roachpb.RKey, ScanDirection) {} + +func (s singleRangeIterator) Error() error { + return nil +} + +func (s singleRangeIterator) Desc() *roachpb.RangeDescriptor { + return &roachpb.RangeDescriptor{ + RangeID: 1, + StartKey: roachpb.RKeyMin, + EndKey: roachpb.RKeyMax, + } +} + // TestTxnSpanRefresherMaxTxnRefreshSpansBytes tests that the txnSpanRefresher -// only collects up to kv.transaction.max_refresh_spans_bytes refresh bytes -// before throwing away refresh spans and refusing to attempt to refresh -// transactions. +// collapses spans after they exceed kv.transaction.max_refresh_spans_bytes +// refresh bytes. func TestTxnSpanRefresherMaxTxnRefreshSpansBytes(t *testing.T) { defer leaktest.AfterTest(t)() ctx := context.Background() tsr, mockSender := makeMockTxnSpanRefresher() + tsr.riGen = rangeIteratorFactory{factory: func() condensableSpanSetRangeIterator { + return singleRangeIterator{} + }} txn := makeTxnProto() keyA, keyB := roachpb.Key("a"), roachpb.Key("b") - keyC, keyD := roachpb.Key("c"), roachpb.Key("d") + keyC := roachpb.Key("c") + keyD, keyE := roachpb.Key("d"), roachpb.Key("e") // Set MaxTxnRefreshSpansBytes limit to 3 bytes. MaxTxnRefreshSpansBytes.Override(&tsr.st.SV, 3) @@ -382,13 +409,13 @@ func TestTxnSpanRefresherMaxTxnRefreshSpansBytes(t *testing.T) { require.Nil(t, pErr) require.NotNil(t, br) - require.Equal(t, []roachpb.Span{scanArgs.Span()}, tsr.refreshSpans) + require.Equal(t, []roachpb.Span{scanArgs.Span()}, tsr.refreshFootprint.asSlice()) require.False(t, tsr.refreshInvalid) - require.Equal(t, int64(2), tsr.refreshSpansBytes) require.Equal(t, txn.ReadTimestamp, tsr.refreshedTimestamp) + require.Equal(t, int64(2), tsr.refreshFootprint.bytes) - // Send another batch that pushes us above the limit. The refresh spans - // should become invalid. + // Send another batch that pushes us above the limit. The tracked spans are + // adjacent so the spans will be merged, but not condensed. ba.Requests = nil scanArgs2 := roachpb.ScanRequest{RequestHeader: roachpb.RequestHeader{Key: keyB, EndKey: keyC}} ba.Add(&scanArgs2) @@ -397,28 +424,29 @@ func TestTxnSpanRefresherMaxTxnRefreshSpansBytes(t *testing.T) { require.Nil(t, pErr) require.NotNil(t, br) - require.Equal(t, []roachpb.Span(nil), tsr.refreshSpans) - require.True(t, tsr.refreshInvalid) - require.Equal(t, int64(0), tsr.refreshSpansBytes) + require.Equal(t, []roachpb.Span{{Key: keyA, EndKey: keyC}}, tsr.refreshFootprint.asSlice()) + require.False(t, tsr.refreshInvalid) + require.Equal(t, int64(2), tsr.refreshFootprint.bytes) + require.False(t, tsr.refreshFootprint.condensed) + require.Equal(t, int64(0), tsr.refreshMemoryLimitExceeded.Count()) require.Equal(t, txn.ReadTimestamp, tsr.refreshedTimestamp) - // Once invalid, the refresh spans should stay invalid. + // Exceed the limit again, this time with a non-adjacent span such that + // condensing needs to occur. ba.Requests = nil - scanArgs3 := roachpb.ScanRequest{RequestHeader: roachpb.RequestHeader{Key: keyC, EndKey: keyD}} + scanArgs3 := roachpb.ScanRequest{RequestHeader: roachpb.RequestHeader{Key: keyD, EndKey: keyE}} ba.Add(&scanArgs3) br, pErr = tsr.SendLocked(ctx, ba) require.Nil(t, pErr) require.NotNil(t, br) - require.Equal(t, []roachpb.Span(nil), tsr.refreshSpans) - require.True(t, tsr.refreshInvalid) - require.Equal(t, int64(0), tsr.refreshSpansBytes) + require.Equal(t, []roachpb.Span{{Key: keyA, EndKey: keyE}}, tsr.refreshFootprint.asSlice()) + require.True(t, tsr.refreshFootprint.condensed) + require.False(t, tsr.refreshInvalid) require.Equal(t, txn.ReadTimestamp, tsr.refreshedTimestamp) - - // Make sure that the metric due to the refresh span bytes being exceeded - // has not been incremented. - require.Equal(t, int64(0), tsr.refreshSpanBytesExceededCounter.Count()) + require.Equal(t, int64(1), tsr.refreshMemoryLimitExceeded.Count()) + require.Equal(t, int64(0), tsr.refreshFailWithCondensedSpans.Count()) // Return a transaction retry error and make sure the metric indicating that // we did not retry due to the refresh span bytes in incremented. @@ -431,7 +459,7 @@ func TestTxnSpanRefresherMaxTxnRefreshSpansBytes(t *testing.T) { exp := roachpb.NewTransactionRetryError(roachpb.RETRY_SERIALIZABLE, "") require.Equal(t, exp, pErr.GetDetail()) require.Nil(t, br) - require.Equal(t, int64(1), tsr.refreshSpanBytesExceededCounter.Count()) + require.Equal(t, int64(1), tsr.refreshFailWithCondensedSpans.Count()) } // TestTxnSpanRefresherAssignsCanForwardReadTimestamp tests that the @@ -446,9 +474,6 @@ func TestTxnSpanRefresherAssignsCanForwardReadTimestamp(t *testing.T) { keyA, keyB := roachpb.Key("a"), roachpb.Key("b") keyC, keyD := roachpb.Key("c"), roachpb.Key("d") - // Set MaxTxnRefreshSpansBytes limit to 3 bytes. - MaxTxnRefreshSpansBytes.Override(&tsr.st.SV, 3) - // Send a Put request. Should set CanForwardReadTimestamp flag. Should not // collect refresh spans. var ba roachpb.BatchRequest @@ -468,7 +493,7 @@ func TestTxnSpanRefresherAssignsCanForwardReadTimestamp(t *testing.T) { br, pErr := tsr.SendLocked(ctx, ba) require.Nil(t, pErr) require.NotNil(t, br) - require.Nil(t, tsr.refreshSpans) + require.Nil(t, tsr.refreshFootprint.asSlice()) require.False(t, tsr.refreshInvalid) // Send a Put request for a transaction with a fixed commit timestamp. @@ -492,7 +517,7 @@ func TestTxnSpanRefresherAssignsCanForwardReadTimestamp(t *testing.T) { br, pErr = tsr.SendLocked(ctx, baFixed) require.Nil(t, pErr) require.NotNil(t, br) - require.Nil(t, tsr.refreshSpans) + require.Nil(t, tsr.refreshFootprint.asSlice()) require.False(t, tsr.refreshInvalid) // Send a Scan request. Should set CanForwardReadTimestamp flag. Should @@ -514,11 +539,10 @@ func TestTxnSpanRefresherAssignsCanForwardReadTimestamp(t *testing.T) { br, pErr = tsr.SendLocked(ctx, ba) require.Nil(t, pErr) require.NotNil(t, br) - require.Equal(t, []roachpb.Span{scanArgs.Span()}, tsr.refreshSpans) + require.Equal(t, []roachpb.Span{scanArgs.Span()}, tsr.refreshFootprint.asSlice()) require.False(t, tsr.refreshInvalid) // Send another Scan request. Should NOT set CanForwardReadTimestamp flag. - // Should push the spans above the limit. ba.Requests = nil scanArgs2 := roachpb.ScanRequest{RequestHeader: roachpb.RequestHeader{Key: keyC, EndKey: keyD}} ba.Add(&scanArgs2) @@ -536,8 +560,8 @@ func TestTxnSpanRefresherAssignsCanForwardReadTimestamp(t *testing.T) { br, pErr = tsr.SendLocked(ctx, ba) require.Nil(t, pErr) require.NotNil(t, br) - require.Equal(t, []roachpb.Span(nil), tsr.refreshSpans) - require.True(t, tsr.refreshInvalid) + require.Equal(t, []roachpb.Span{{Key: keyA, EndKey: keyB}, {Key: keyC, EndKey: keyD}}, tsr.refreshFootprint.asSlice()) + require.False(t, tsr.refreshInvalid) // Send another Put request. Still should NOT set CanForwardReadTimestamp flag. ba.Requests = nil @@ -556,8 +580,8 @@ func TestTxnSpanRefresherAssignsCanForwardReadTimestamp(t *testing.T) { br, pErr = tsr.SendLocked(ctx, ba) require.Nil(t, pErr) require.NotNil(t, br) - require.Equal(t, []roachpb.Span(nil), tsr.refreshSpans) - require.True(t, tsr.refreshInvalid) + require.Equal(t, []roachpb.Span{{Key: keyA, EndKey: keyB}, {Key: keyC, EndKey: keyD}}, tsr.refreshFootprint.asSlice()) + require.False(t, tsr.refreshInvalid) // Increment the transaction's epoch and send another Put request. Should // set CanForwardReadTimestamp flag. @@ -578,7 +602,7 @@ func TestTxnSpanRefresherAssignsCanForwardReadTimestamp(t *testing.T) { br, pErr = tsr.SendLocked(ctx, ba) require.Nil(t, pErr) require.NotNil(t, br) - require.Equal(t, []roachpb.Span(nil), tsr.refreshSpans) + require.Equal(t, []roachpb.Span(nil), tsr.refreshFootprint.asSlice()) require.False(t, tsr.refreshInvalid) } @@ -594,9 +618,6 @@ func TestTxnSpanRefresherAssignsCanCommitAtHigherTimestamp(t *testing.T) { keyA, keyB := roachpb.Key("a"), roachpb.Key("b") keyC, keyD := roachpb.Key("c"), roachpb.Key("d") - // Set MaxTxnRefreshSpansBytes limit to 3 bytes. - MaxTxnRefreshSpansBytes.Override(&tsr.st.SV, 3) - // Send an EndTxn request. Should set CanCommitAtHigherTimestamp and // CanForwardReadTimestamp flags. var ba roachpb.BatchRequest @@ -651,7 +672,7 @@ func TestTxnSpanRefresherAssignsCanCommitAtHigherTimestamp(t *testing.T) { br, pErr = tsr.SendLocked(ctx, ba) require.Nil(t, pErr) require.NotNil(t, br) - require.Equal(t, []roachpb.Span{scanArgs.Span()}, tsr.refreshSpans) + require.Equal(t, []roachpb.Span{scanArgs.Span()}, tsr.refreshFootprint.asSlice()) require.False(t, tsr.refreshInvalid) // Send another EndTxn request. Should NOT set CanCommitAtHigherTimestamp @@ -674,7 +695,7 @@ func TestTxnSpanRefresherAssignsCanCommitAtHigherTimestamp(t *testing.T) { require.Nil(t, pErr) require.NotNil(t, br) - // Send another batch to push the spans above the limit. + // Send another batch. ba.Requests = nil scanArgs2 := roachpb.ScanRequest{RequestHeader: roachpb.RequestHeader{Key: keyC, EndKey: keyD}} ba.Add(&scanArgs2) @@ -683,8 +704,6 @@ func TestTxnSpanRefresherAssignsCanCommitAtHigherTimestamp(t *testing.T) { br, pErr = tsr.SendLocked(ctx, ba) require.Nil(t, pErr) require.NotNil(t, br) - require.Equal(t, []roachpb.Span(nil), tsr.refreshSpans) - require.True(t, tsr.refreshInvalid) // Send another EndTxn request. Still should NOT set // CanCommitAtHigherTimestamp and CanForwardReadTimestamp flags. @@ -705,8 +724,6 @@ func TestTxnSpanRefresherAssignsCanCommitAtHigherTimestamp(t *testing.T) { br, pErr = tsr.SendLocked(ctx, ba) require.Nil(t, pErr) require.NotNil(t, br) - require.Equal(t, []roachpb.Span(nil), tsr.refreshSpans) - require.True(t, tsr.refreshInvalid) // Increment the transaction's epoch and send another EndTxn request. Should // set CanCommitAtHigherTimestamp and CanForwardReadTimestamp flags. @@ -728,7 +745,7 @@ func TestTxnSpanRefresherAssignsCanCommitAtHigherTimestamp(t *testing.T) { br, pErr = tsr.SendLocked(ctx, ba) require.Nil(t, pErr) require.NotNil(t, br) - require.Equal(t, []roachpb.Span(nil), tsr.refreshSpans) + require.Equal(t, []roachpb.Span(nil), tsr.refreshFootprint.asSlice()) require.False(t, tsr.refreshInvalid) } @@ -738,6 +755,8 @@ func TestTxnSpanRefresherEpochIncrement(t *testing.T) { defer leaktest.AfterTest(t)() ctx := context.Background() tsr, _ := makeMockTxnSpanRefresher() + // Disable span condensing. + tsr.knobs.CondenseRefreshSpansFilter = func() bool { return false } txn := makeTxnProto() keyA, keyB := roachpb.Key("a"), roachpb.Key("b") @@ -756,17 +775,16 @@ func TestTxnSpanRefresherEpochIncrement(t *testing.T) { require.Nil(t, pErr) require.NotNil(t, br) - require.Equal(t, []roachpb.Span{scanArgs.Span()}, tsr.refreshSpans) + require.Equal(t, []roachpb.Span{scanArgs.Span()}, tsr.refreshFootprint.asSlice()) require.False(t, tsr.refreshInvalid) - require.Equal(t, int64(2), tsr.refreshSpansBytes) + require.Equal(t, int64(2), tsr.refreshFootprint.bytes) require.Equal(t, txn.ReadTimestamp, tsr.refreshedTimestamp) // Incrementing the transaction epoch clears the spans. tsr.epochBumpedLocked() - require.Equal(t, []roachpb.Span(nil), tsr.refreshSpans) + require.Equal(t, []roachpb.Span(nil), tsr.refreshFootprint.asSlice()) require.False(t, tsr.refreshInvalid) - require.Equal(t, int64(0), tsr.refreshSpansBytes) require.Equal(t, hlc.Timestamp{}, tsr.refreshedTimestamp) // Send a batch above the limit. @@ -778,17 +796,16 @@ func TestTxnSpanRefresherEpochIncrement(t *testing.T) { require.Nil(t, pErr) require.NotNil(t, br) - require.Equal(t, []roachpb.Span(nil), tsr.refreshSpans) + require.Equal(t, []roachpb.Span(nil), tsr.refreshFootprint.asSlice()) require.True(t, tsr.refreshInvalid) - require.Equal(t, int64(0), tsr.refreshSpansBytes) + require.Equal(t, int64(0), tsr.refreshFootprint.bytes) require.Equal(t, txn.ReadTimestamp, tsr.refreshedTimestamp) // Incrementing the transaction epoch clears the invalid status. tsr.epochBumpedLocked() - require.Equal(t, []roachpb.Span(nil), tsr.refreshSpans) + require.Equal(t, []roachpb.Span(nil), tsr.refreshFootprint.asSlice()) require.False(t, tsr.refreshInvalid) - require.Equal(t, int64(0), tsr.refreshSpansBytes) require.Equal(t, hlc.Timestamp{}, tsr.refreshedTimestamp) } @@ -820,42 +837,33 @@ func TestTxnSpanRefresherSavepoint(t *testing.T) { require.NotNil(t, br) } read(keyA) - require.Equal(t, []roachpb.Span{{Key: keyA}}, tsr.refreshSpans) + require.Equal(t, []roachpb.Span{{Key: keyA}}, tsr.refreshFootprint.asSlice()) s := savepoint{} tsr.createSavepointLocked(ctx, &s) // Another read after the savepoint was created. read(keyB) - require.Equal(t, []roachpb.Span{{Key: keyA}, {Key: keyB}}, tsr.refreshSpans) + require.Equal(t, []roachpb.Span{{Key: keyA}, {Key: keyB}}, tsr.refreshFootprint.asSlice()) require.Equal(t, []roachpb.Span{{Key: keyA}}, s.refreshSpans) - require.Less(t, s.refreshSpanBytes, tsr.refreshSpansBytes) require.False(t, s.refreshInvalid) // Rollback the savepoint and check that refresh spans were overwritten. tsr.rollbackToSavepointLocked(ctx, s) - require.Equal(t, []roachpb.Span{{Key: keyA}}, tsr.refreshSpans) - - // Set MaxTxnRefreshSpansBytes limit low and then exceed it. - MaxTxnRefreshSpansBytes.Override(&tsr.st.SV, 1) - read(keyB) - require.True(t, tsr.refreshInvalid) + require.Equal(t, []roachpb.Span{{Key: keyA}}, tsr.refreshFootprint.asSlice()) // Check that rolling back to the savepoint resets refreshInvalid. + tsr.refreshInvalid = true tsr.rollbackToSavepointLocked(ctx, s) - require.Equal(t, tsr.refreshSpansBytes, s.refreshSpanBytes) require.False(t, tsr.refreshInvalid) - // Exceed the limit again and then create a savepoint. - read(keyB) - require.True(t, tsr.refreshInvalid) + // Set refreshInvalid and then create a savepoint. + tsr.refreshInvalid = true s = savepoint{} tsr.createSavepointLocked(ctx, &s) require.True(t, s.refreshInvalid) - require.Empty(t, s.refreshSpans) // Rollback to the savepoint check that refreshes are still invalid. tsr.rollbackToSavepointLocked(ctx, s) - require.Empty(t, tsr.refreshSpans) require.True(t, tsr.refreshInvalid) } diff --git a/pkg/kv/kvclient/kvcoord/txn_metrics.go b/pkg/kv/kvclient/kvcoord/txn_metrics.go index 12943abf774f..d2317568353c 100644 --- a/pkg/kv/kvclient/kvcoord/txn_metrics.go +++ b/pkg/kv/kvclient/kvcoord/txn_metrics.go @@ -19,13 +19,17 @@ import ( // TxnMetrics holds all metrics relating to KV transactions. type TxnMetrics struct { - Aborts *metric.Counter - Commits *metric.Counter - Commits1PC *metric.Counter // Commits which finished in a single phase - ParallelCommits *metric.Counter // Commits which entered the STAGING state - AutoRetries *metric.Counter // Auto retries which avoid client-side restarts - RefreshSpanBytesExceeded *metric.Counter // Transactions which don't refresh due to span bytes - Durations *metric.Histogram + Aborts *metric.Counter + Commits *metric.Counter + Commits1PC *metric.Counter // Commits which finished in a single phase + ParallelCommits *metric.Counter // Commits which entered the STAGING state + + RefreshSuccess *metric.Counter + RefreshFail *metric.Counter + RefreshFailWithCondensedSpans *metric.Counter + RefreshMemoryLimitExceeded *metric.Counter + + Durations *metric.Histogram // Restarts is the number of times we had to restart the transaction. Restarts *metric.Histogram @@ -66,16 +70,33 @@ var ( Measurement: "KV Transactions", Unit: metric.Unit_COUNT, } - metaAutoRetriesRates = metric.Metadata{ - Name: "txn.autoretries", - Help: "Number of automatic retries to avoid serializable restarts", - Measurement: "Retries", + metaRefreshSuccess = metric.Metadata{ + Name: "txn.refresh.success", + Help: "Number of successful refreshes", + Measurement: "Refreshes", + Unit: metric.Unit_COUNT, + } + metaRefreshFail = metric.Metadata{ + Name: "txn.refresh.fail", + Help: "Number of failed refreshes.", + Measurement: "Refreshes", + Unit: metric.Unit_COUNT, + } + metaRefreshFailWithCondensedSpans = metric.Metadata{ + Name: "txn.refresh.fail_with_condensed_spans", + Help: "Number of failed refreshes for transactions whose read " + + "tracking lost fidelity because of condensing. Such a failure " + + "could be a false conflict. Failures counted here are also counted " + + "in txn.refresh.fail, and the respective transactions are also counted in " + + "txn.refresh.memory_limit_exceeded.", + Measurement: "Refreshes", Unit: metric.Unit_COUNT, } - metaRefreshSpanBytesExceeded = metric.Metadata{ - Name: "txn.refreshspanbytesexceeded", - Help: "Number of transaction retries which fail to refresh due to the refresh span bytes", - Measurement: "Retries", + metaRefreshMemoryLimitExceeded = metric.Metadata{ + Name: "txn.refresh.memory_limit_exceeded", + Help: "Number of transaction which exceed the refresh span bytes limit, causing " + + "their read spans to be condensed.", + Measurement: "Transactions", Unit: metric.Unit_COUNT, } metaDurationsHistograms = metric.Metadata{ @@ -175,8 +196,10 @@ func MakeTxnMetrics(histogramWindow time.Duration) TxnMetrics { Commits: metric.NewCounter(metaCommitsRates), Commits1PC: metric.NewCounter(metaCommits1PCRates), ParallelCommits: metric.NewCounter(metaParallelCommitsRates), - AutoRetries: metric.NewCounter(metaAutoRetriesRates), - RefreshSpanBytesExceeded: metric.NewCounter(metaRefreshSpanBytesExceeded), + RefreshFail: metric.NewCounter(metaRefreshFail), + RefreshFailWithCondensedSpans: metric.NewCounter(metaRefreshFailWithCondensedSpans), + RefreshSuccess: metric.NewCounter(metaRefreshSuccess), + RefreshMemoryLimitExceeded: metric.NewCounter(metaRefreshMemoryLimitExceeded), Durations: metric.NewLatency(metaDurationsHistograms, histogramWindow), Restarts: metric.NewHistogram(metaRestartsHistogram, histogramWindow, 100, 3), RestartsWriteTooOld: telemetry.NewCounterWithMetric(metaRestartsWriteTooOld), diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go index 5d568de24a13..25114ee0885e 100644 --- a/pkg/ts/catalog/chart_catalog.go +++ b/pkg/ts/catalog/chart_catalog.go @@ -765,12 +765,20 @@ var charts = []sectionDescription{ Metrics: []string{"txn.aborts"}, }, { - Title: "Auto Retries", - Metrics: []string{"txn.autoretries"}, + Title: "Successful refreshes", + Metrics: []string{"txn.refresh.success"}, }, { - Title: "Refresh Span Bytes Exceeded", - Metrics: []string{"txn.refreshspanbytesexceeded"}, + Title: "Failed refreshes", + Metrics: []string{"txn.refresh.fail"}, + }, + { + Title: "Failed refreshes with condensed spans", + Metrics: []string{"txn.refresh.fail_with_condensed_spans"}, + }, + { + Title: "Transactions exceeding refresh spans memory limit", + Metrics: []string{"txn.refresh.memory_limit_exceeded"}, }, { Title: "Commits",