diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go index 37a3e6deae5f..e0790ebc2bd9 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go @@ -1849,6 +1849,7 @@ func TestTxnCoordSenderRetries(t *testing.T) { retryable: func(ctx context.Context, txn *kv.Txn) error { return txn.Put(ctx, "a", "put") // put to advance txn ts }, + // No retry, preemptive refresh before commit. }, { name: "forwarded timestamp with get and put after timestamp leaked", @@ -1916,8 +1917,7 @@ func TestTxnCoordSenderRetries(t *testing.T) { retryable: func(ctx context.Context, txn *kv.Txn) error { return txn.DelRange(ctx, "a", "b") }, - // Expect a transaction coord retry, which should succeed. - txnCoordRetry: true, + // No retry, preemptive refresh before commit. }, { name: "forwarded timestamp with put in batch commit", @@ -1930,7 +1930,7 @@ func TestTxnCoordSenderRetries(t *testing.T) { b.Put("a", "put") return txn.CommitInBatch(ctx, b) }, - // No retries, 1pc commit. + // No retries, server-side refresh, 1pc commit. }, { name: "forwarded timestamp with cput in batch commit", @@ -1946,7 +1946,39 @@ func TestTxnCoordSenderRetries(t *testing.T) { b.CPut("a", "cput", kvclientutils.StrToCPutExistingValue("orig")) return txn.CommitInBatch(ctx, b) }, - // No retries, 1pc commit. + // No retries, server-side refresh, 1pc commit. + }, + { + name: "forwarded timestamp with get before commit", + afterTxnStart: func(ctx context.Context, db *kv.DB) error { + _, err := db.Get(ctx, "a") // set ts cache + return err + }, + retryable: func(ctx context.Context, txn *kv.Txn) error { + // Advance timestamp. + if err := txn.Put(ctx, "a", "put"); err != nil { + return err + } + _, err := txn.Get(ctx, "a2") + return err + }, + // No retry, preemptive refresh before get. + }, + { + name: "forwarded timestamp with scan before commit", + afterTxnStart: func(ctx context.Context, db *kv.DB) error { + _, err := db.Get(ctx, "a") // set ts cache + return err + }, + retryable: func(ctx context.Context, txn *kv.Txn) error { + // Advance timestamp. + if err := txn.Put(ctx, "a", "put"); err != nil { + return err + } + _, err := txn.Scan(ctx, "a2", "a3", 0) + return err + }, + // No retry, preemptive refresh before scan. }, { name: "forwarded timestamp with get in batch commit", @@ -1963,8 +1995,7 @@ func TestTxnCoordSenderRetries(t *testing.T) { b.Get("a2") return txn.CommitInBatch(ctx, b) }, - // Read-only request (Get) prevents server-side refresh. - txnCoordRetry: true, + // No retry, preemptive refresh before commit. }, { name: "forwarded timestamp with scan in batch commit", @@ -1981,6 +2012,35 @@ func TestTxnCoordSenderRetries(t *testing.T) { b.Scan("a2", "a3") return txn.CommitInBatch(ctx, b) }, + // No retry, preemptive refresh before commit. + }, + { + name: "forwarded timestamp with put and get in batch commit", + afterTxnStart: func(ctx context.Context, db *kv.DB) error { + _, err := db.Get(ctx, "a") // set ts cache + return err + }, + retryable: func(ctx context.Context, txn *kv.Txn) error { + b := txn.NewBatch() + b.Get("a2") + b.Put("a", "put") // advance timestamp + return txn.CommitInBatch(ctx, b) + }, + // Read-only request (Get) prevents server-side refresh. 
+ txnCoordRetry: true, + }, + { + name: "forwarded timestamp with put and scan in batch commit", + afterTxnStart: func(ctx context.Context, db *kv.DB) error { + _, err := db.Get(ctx, "a") // set ts cache + return err + }, + retryable: func(ctx context.Context, txn *kv.Txn) error { + b := txn.NewBatch() + b.Scan("a2", "a3") + b.Put("a", "put") // advance timestamp + return txn.CommitInBatch(ctx, b) + }, // Read-only request (Scan) prevents server-side refresh. txnCoordRetry: true, }, @@ -2046,7 +2106,7 @@ func TestTxnCoordSenderRetries(t *testing.T) { } return txn.CommitInBatch(ctx, b) }, - txnCoordRetry: true, + // No retry, preemptive refresh before commit. }, { // Even if accounting for the refresh spans would have exhausted the @@ -2082,7 +2142,7 @@ func TestTxnCoordSenderRetries(t *testing.T) { } return txn.CommitInBatch(ctx, b) }, - txnCoordRetry: true, + // No retry, preemptive refresh before commit. }, { name: "write too old with put", @@ -2440,7 +2500,7 @@ func TestTxnCoordSenderRetries(t *testing.T) { b.Put("a", "new-put") return txn.CommitInBatch(ctx, b) // will be a 1PC, won't get auto retry }, - // No retries, 1pc commit. + // No retries, server-side refresh, 1pc commit. }, { // This test is like the previous one in that the commit batch succeeds at @@ -2865,7 +2925,7 @@ func TestTxnCoordSenderRetries(t *testing.T) { } var metrics kvcoord.TxnMetrics - var lastRefreshes int64 + var lastAutoRetries int64 var hadClientRetry bool epoch := 0 if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { @@ -2897,7 +2957,7 @@ func TestTxnCoordSenderRetries(t *testing.T) { } metrics = txn.Sender().(*kvcoord.TxnCoordSender).TxnCoordSenderFactory.Metrics() - lastRefreshes = metrics.RefreshSuccess.Count() + lastAutoRetries = metrics.RefreshAutoRetries.Count() return tc.retryable(ctx, txn) }); err != nil { @@ -2913,11 +2973,11 @@ func TestTxnCoordSenderRetries(t *testing.T) { // from the cluster setup are still ongoing and can experience // their own retries, this might increase by more than one, so we // can only check here that it's >= 1. 
- refreshes := metrics.RefreshSuccess.Count() - lastRefreshes - if tc.txnCoordRetry && refreshes == 0 { - t.Errorf("expected [at least] one txn coord sender auto retry; got %d", refreshes) - } else if !tc.txnCoordRetry && refreshes != 0 { - t.Errorf("expected no txn coord sender auto retries; got %d", refreshes) + autoRetries := metrics.RefreshAutoRetries.Count() - lastAutoRetries + if tc.txnCoordRetry && autoRetries == 0 { + t.Errorf("expected [at least] one txn coord sender auto retry; got %d", autoRetries) + } else if !tc.txnCoordRetry && autoRetries != 0 { + t.Errorf("expected no txn coord sender auto retries; got %d", autoRetries) } if tc.clientRetry && !hadClientRetry { t.Errorf("expected but did not experience client retry") diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go index 4c5b38fe9ba4..10ac9a14748e 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go @@ -299,6 +299,7 @@ func (tc *TxnCoordSender) initCommonInterceptors( refreshFail: tc.metrics.RefreshFail, refreshFailWithCondensedSpans: tc.metrics.RefreshFailWithCondensedSpans, refreshMemoryLimitExceeded: tc.metrics.RefreshMemoryLimitExceeded, + refreshAutoRetries: tc.metrics.RefreshAutoRetries, } tc.interceptorAlloc.txnLockGatekeeper = txnLockGatekeeper{ wrapped: tc.wrapped, diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go index d47a24a35c0e..375c2ecb813d 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go @@ -132,6 +132,7 @@ type txnSpanRefresher struct { refreshFail *metric.Counter refreshFailWithCondensedSpans *metric.Counter refreshMemoryLimitExceeded *metric.Counter + refreshAutoRetries *metric.Counter } // SendLocked implements the lockedSender interface. @@ -162,20 +163,61 @@ func (sr *txnSpanRefresher) SendLocked( } // Set the batch's CanForwardReadTimestamp flag. - canFwdRTS := sr.canForwardReadTimestampWithoutRefresh(ba.Txn) - ba.CanForwardReadTimestamp = canFwdRTS - - maxAttempts := maxTxnRefreshAttempts - if knob := sr.knobs.MaxTxnRefreshAttempts; knob != 0 { - if knob == -1 { - maxAttempts = 0 - } else { - maxAttempts = knob + ba.CanForwardReadTimestamp = sr.canForwardReadTimestampWithoutRefresh(ba.Txn) + + // If we know that the transaction will need a refresh at some point because + // its write timestamp has diverged from its read timestamp, consider doing + // so preemptively. We perform a preemptive refresh if either a) doing so + // would be free because we have not yet accumulated any refresh spans, or + // b) the batch contains a committing EndTxn request that we know will be + // rejected if issued. + // + // The first case is straightforward. If the transaction has yet to perform + // any reads but has had its write timestamp bumped, refreshing is a trivial + // no-op. In this case, refreshing eagerly prevents the transaction from + // performing any future reads at its current read timestamp. Not doing so + // preemptively guarantees that we will need to perform a real refresh in + // the future if the transaction ever performs a read. At best, this would + // be wasted work. At worst, this could result in the future refresh + // failing. So we might as well refresh preemptively while doing so is free. + // + // Note that this first case here does NOT obviate the need for server-side + // refreshes. 
Notably, a transaction's write timestamp might be bumped in + // the same batch in which it performs its first read. In such cases, a + // preemptive refresh would not be needed but a reactive refresh would not + // be a trivial no-op. These situations are common for one-phase commit + // transactions. + // + // The second case is more complex. If the batch contains a committing + // EndTxn request that we know will need a refresh, we don't want to bother + // issuing it just for it to be rejected. Instead, preemptively refresh + // before issuing the EndTxn batch. If we view reads as acquiring a form of + // optimistic read locks under an optimistic concurrency control scheme (as + // is discussed in the comment on txnSpanRefresher) then this preemptive + // refresh immediately before the EndTxn is synonymous with the "validation" + // phase of a standard OCC transaction model. However, as an optimization + // compared to standard OCC, the validation phase is only performed when + // necessary in CockroachDB (i.e. if the transaction's writes have been + // pushed to higher timestamps). + if ba.Txn.ReadTimestamp != ba.Txn.WriteTimestamp { + // If true, tryUpdatingTxnSpans will trivially succeed. + refreshFree := ba.CanForwardReadTimestamp + + // If true, this batch is guaranteed to fail without a refresh. + args, hasET := ba.GetArg(roachpb.EndTxn) + refreshInevitable := hasET && args.(*roachpb.EndTxnRequest).Commit + + if refreshFree || refreshInevitable { + refreshedBa, pErr := sr.maybeRefreshPreemptively(ctx, ba) + if pErr != nil { + return nil, pErr + } + ba = refreshedBa } } // Send through wrapped lockedSender. Unlocks while sending then re-locks. - br, pErr := sr.sendLockedWithRefreshAttempts(ctx, ba, maxAttempts) + br, pErr := sr.sendLockedWithRefreshAttempts(ctx, ba, sr.maxRefreshAttempts()) if pErr != nil { return nil, pErr } @@ -283,7 +325,7 @@ func (sr *txnSpanRefresher) sendLockedWithRefreshAttempts( } if pErr != nil { if maxRefreshAttempts > 0 { - br, pErr = sr.maybeRetrySend(ctx, ba, pErr, maxRefreshAttempts) + br, pErr = sr.maybeRefreshAndRetrySend(ctx, ba, pErr, maxRefreshAttempts) } else { log.VEventf(ctx, 2, "not checking error for refresh; refresh attempts exhausted") } @@ -292,44 +334,34 @@ return br, pErr } -// maybeRetrySend attempts to catch serializable errors and avoid them by -// refreshing the txn at a larger timestamp. If it succeeds at refreshing the +// maybeRefreshAndRetrySend attempts to catch serializable errors and avoid them +// by refreshing the txn at a larger timestamp. If it succeeds at refreshing the // txn timestamp, it recurses into sendLockedWithRefreshAttempts and retries the // batch. If the refresh fails, the input pErr is returned. -func (sr *txnSpanRefresher) maybeRetrySend( +func (sr *txnSpanRefresher) maybeRefreshAndRetrySend( ctx context.Context, ba roachpb.BatchRequest, pErr *roachpb.Error, maxRefreshAttempts int, ) (*roachpb.BatchResponse, *roachpb.Error) { // Check for an error which can be retried after updating spans. 
- canRetryTxn, retryTxn := roachpb.CanTransactionRetryAtRefreshedTimestamp(ctx, pErr) - if !canRetryTxn || !sr.canAutoRetry { + canRefreshTxn, refreshTxn := roachpb.CanTransactionRetryAtRefreshedTimestamp(ctx, pErr) + if !canRefreshTxn || !sr.canAutoRetry { return nil, pErr } - - // If a prefix of the batch was executed, collect refresh spans for - // that executed portion, and retry the remainder. The canonical - // case is a batch split between everything up to but not including - // the EndTxn. Requests up to the EndTxn succeed, but the EndTxn - // fails with a retryable error. We want to retry only the EndTxn. - ba.UpdateTxn(retryTxn) log.VEventf(ctx, 2, "retrying %s at refreshed timestamp %s because of %s", - ba, retryTxn.ReadTimestamp, pErr) + ba, refreshTxn.ReadTimestamp, pErr) // Try updating the txn spans so we can retry. - if ok := sr.tryUpdatingTxnSpans(ctx, retryTxn); !ok { - sr.refreshFail.Inc(1) - if sr.refreshFootprint.condensed { - sr.refreshFailWithCondensedSpans.Inc(1) - } + if ok := sr.tryUpdatingTxnSpans(ctx, refreshTxn); !ok { + log.Eventf(ctx, "refresh failed; propagating original retry error") return nil, pErr } - sr.refreshSuccess.Inc(1) + log.Eventf(ctx, "refresh succeeded; retrying original request") + ba.UpdateTxn(refreshTxn) + sr.refreshAutoRetries.Inc(1) // We've refreshed all of the read spans successfully and bumped // ba.Txn's timestamps. Attempt the request again. - retryBr, retryErr := sr.sendLockedWithRefreshAttempts( - ctx, ba, maxRefreshAttempts-1, - ) + retryBr, retryErr := sr.sendLockedWithRefreshAttempts(ctx, ba, maxRefreshAttempts-1) if retryErr != nil { log.VEventf(ctx, 2, "retry failed with %s", retryErr) return nil, retryErr @@ -339,6 +371,44 @@ func (sr *txnSpanRefresher) maybeRetrySend( return retryBr, nil } +// Avoids allocations in maybeRefreshPreemptively. +var preemptiveSerializableErr = roachpb.NewTransactionRetryError(roachpb.RETRY_SERIALIZABLE, "preemptive refresh") +var preemptiveWriteTooOldErr = roachpb.NewTransactionRetryError(roachpb.RETRY_WRITE_TOO_OLD, "preemptive refresh") + +// maybeRefreshPreemptively attempts to refresh a transaction's read timestamp +// eagerly. Doing so can take advantage of opportunities where the refresh is +// free or can avoid wasting work issuing a batch containing an EndTxn that will +// necessarily throw a serializable error. The method returns a batch with an +// updated transaction if the refresh is successful, or a retry error if not. +func (sr *txnSpanRefresher) maybeRefreshPreemptively( + ctx context.Context, ba roachpb.BatchRequest, +) (roachpb.BatchRequest, *roachpb.Error) { + // Preemptively create the transaction retry error we know the server will + // return if we issue this batch. + retryErr := preemptiveSerializableErr + if ba.Txn.WriteTooOld { + retryErr = preemptiveWriteTooOldErr + } + pErr := roachpb.NewErrorWithTxn(retryErr, ba.Txn) + + canRefreshTxn, refreshTxn := roachpb.CanTransactionRetryAtRefreshedTimestamp(ctx, pErr) + if !canRefreshTxn || !sr.canAutoRetry { + return roachpb.BatchRequest{}, pErr + } + log.VEventf(ctx, 2, "preemptively refreshing to timestamp %s before issuing %s", + refreshTxn.ReadTimestamp, ba) + + // Try updating the txn spans at a timestamp that will allow us to commit. 
+ if ok := sr.tryUpdatingTxnSpans(ctx, refreshTxn); !ok { + log.Eventf(ctx, "preemptive refresh failed; propagating retry error") + return roachpb.BatchRequest{}, pErr + } + + log.Eventf(ctx, "preemptive refresh succeeded") + ba.UpdateTxn(refreshTxn) + return ba, nil +} + // tryUpdatingTxnSpans sends Refresh and RefreshRange commands to all spans read // during the transaction to ensure that no writes were written more recently // than sr.refreshedTimestamp. All implicated timestamp caches are updated with @@ -346,7 +416,18 @@ func (sr *txnSpanRefresher) maybeRetrySend( // or not. func (sr *txnSpanRefresher) tryUpdatingTxnSpans( ctx context.Context, refreshTxn *roachpb.Transaction, -) bool { +) (ok bool) { + // Track the result of the refresh in metrics. + defer func() { + if ok { + sr.refreshSuccess.Inc(1) + } else { + sr.refreshFail.Inc(1) + if sr.refreshFootprint.condensed { + sr.refreshFailWithCondensedSpans.Inc(1) + } + } + }() if sr.refreshInvalid { log.VEvent(ctx, 2, "can't refresh txn spans; not valid") @@ -395,7 +476,7 @@ func (sr *txnSpanRefresher) tryUpdatingTxnSpans( // Send through wrapped lockedSender. Unlocks while sending then re-locks. if _, batchErr := sr.wrapped.SendLocked(ctx, refreshSpanBa); batchErr != nil { - log.VEventf(ctx, 2, "failed to refresh txn spans (%s); propagating original retry error", batchErr) + log.VEventf(ctx, 2, "failed to refresh txn spans (%s)", batchErr) return false } @@ -457,6 +538,18 @@ } } +// maxRefreshAttempts returns the configured number of times that a transaction +// should attempt to refresh its spans for a single batch. +func (sr *txnSpanRefresher) maxRefreshAttempts() int { + if knob := sr.knobs.MaxTxnRefreshAttempts; knob != 0 { + if knob == -1 { + return 0 + } + return knob + } + return maxTxnRefreshAttempts +} + // setWrapped implements the txnInterceptor interface. 
func (sr *txnSpanRefresher) setWrapped(wrapped lockedSender) { sr.wrapped = wrapped } diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go index 6eb2a15c1082..8f255abd7573 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go @@ -34,6 +34,7 @@ func makeMockTxnSpanRefresher() (txnSpanRefresher, *mockLockedSender) { refreshFail: metric.NewCounter(metaRefreshFail), refreshFailWithCondensedSpans: metric.NewCounter(metaRefreshFailWithCondensedSpans), refreshMemoryLimitExceeded: metric.NewCounter(metaRefreshMemoryLimitExceeded), + refreshAutoRetries: metric.NewCounter(metaRefreshAutoRetries), }, mockSender } @@ -250,14 +251,17 @@ func TestTxnSpanRefresherRefreshesTransactions(t *testing.T) { require.True(t, tc.expRefresh) require.Len(t, ba.Requests, 2) + require.Equal(t, tc.expRefreshTS, ba.Txn.ReadTimestamp) require.IsType(t, &roachpb.RefreshRequest{}, ba.Requests[0].GetInner()) require.IsType(t, &roachpb.RefreshRangeRequest{}, ba.Requests[1].GetInner()) refReq := ba.Requests[0].GetRefresh() require.Equal(t, getArgs.Span(), refReq.Span()) + require.Equal(t, txn.ReadTimestamp, refReq.RefreshFrom) refRngReq := ba.Requests[1].GetRefreshRange() require.Equal(t, delRangeArgs.Span(), refRngReq.Span()) + require.Equal(t, txn.ReadTimestamp, refRngReq.RefreshFrom) br = ba.CreateReply() br.Txn = ba.Txn @@ -280,11 +284,13 @@ func TestTxnSpanRefresherRefreshesTransactions(t *testing.T) { require.Equal(t, tc.expRefreshTS, tsr.refreshedTimestamp) require.Equal(t, int64(1), tsr.refreshSuccess.Count()) require.Equal(t, int64(0), tsr.refreshFail.Count()) + require.Equal(t, int64(1), tsr.refreshAutoRetries.Count()) } else { require.Nil(t, br) require.NotNil(t, pErr) require.Equal(t, ba.Txn.ReadTimestamp, tsr.refreshedTimestamp) require.Equal(t, int64(0), tsr.refreshSuccess.Count()) + require.Equal(t, int64(0), tsr.refreshAutoRetries.Count()) // Note that we don't check the tsr.refreshFail metric here as tests // here expect the refresh to not be attempted, not to fail. } @@ -334,10 +340,12 @@ func TestTxnSpanRefresherMaxRefreshAttempts(t *testing.T) { onRefresh := func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { refreshes++ require.Len(t, ba.Requests, 1) + require.Equal(t, txn.WriteTimestamp, ba.Txn.ReadTimestamp) require.IsType(t, &roachpb.RefreshRangeRequest{}, ba.Requests[0].GetInner()) refReq := ba.Requests[0].GetRefreshRange() require.Equal(t, scanArgs.Span(), refReq.Span()) + require.Equal(t, txn.ReadTimestamp, refReq.RefreshFrom) br = ba.CreateReply() br.Txn = ba.Txn @@ -364,6 +372,169 @@ require.Equal(t, tsr.knobs.MaxTxnRefreshAttempts, refreshes) } +// TestTxnSpanRefresherPreemptiveRefresh tests that the txnSpanRefresher +// performs a preemptive client-side refresh when doing so would be free or when +// it observes a batch containing an EndTxn request that will necessarily throw +// a serializable error. +func TestTxnSpanRefresherPreemptiveRefresh(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + tsr, mockSender := makeMockTxnSpanRefresher() + + txn := makeTxnProto() + keyA, keyB := roachpb.Key("a"), roachpb.Key("b") + + // Push the txn so that it needs a refresh. 
+ txn.WriteTimestamp = txn.WriteTimestamp.Add(1, 0) + origReadTs := txn.ReadTimestamp + pushedWriteTs := txn.WriteTimestamp + + // Send an EndTxn request that will need a refresh to succeed. Because + // no refresh spans have been recorded, the preemptive refresh should be + // free, so the txnSpanRefresher should perform one. + var ba roachpb.BatchRequest + ba.Header = roachpb.Header{Txn: &txn} + etArgs := roachpb.EndTxnRequest{Commit: true} + ba.Add(&etArgs) + + mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { + require.Len(t, ba.Requests, 1) + require.True(t, ba.CanForwardReadTimestamp) + require.IsType(t, &roachpb.EndTxnRequest{}, ba.Requests[0].GetInner()) + + // The transaction should be refreshed. + require.NotEqual(t, origReadTs, ba.Txn.ReadTimestamp) + require.Equal(t, pushedWriteTs, ba.Txn.ReadTimestamp) + require.Equal(t, pushedWriteTs, ba.Txn.WriteTimestamp) + + br := ba.CreateReply() + br.Txn = ba.Txn + return br, nil + }) + + br, pErr := tsr.SendLocked(ctx, ba) + require.Nil(t, pErr) + require.NotNil(t, br) + require.Equal(t, int64(1), tsr.refreshSuccess.Count()) + require.Equal(t, int64(0), tsr.refreshFail.Count()) + require.Equal(t, int64(0), tsr.refreshAutoRetries.Count()) + require.True(t, tsr.refreshFootprint.empty()) + require.False(t, tsr.refreshInvalid) + + // Reset refreshedTimestamp to avoid confusing ourselves. + tsr.refreshedTimestamp = origReadTs + + // Send a Scan request. Again, because a preemptive refresh would be free, + // the txnSpanRefresher should perform one. NOTE: This inhibits server-side + // refreshes when we issue EndTxn requests through the rest of this test. + ba.Requests = nil + scanArgs := roachpb.ScanRequest{RequestHeader: roachpb.RequestHeader{Key: keyA, EndKey: keyB}} + ba.Add(&scanArgs) + + mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { + require.Len(t, ba.Requests, 1) + require.True(t, ba.CanForwardReadTimestamp) + require.IsType(t, &roachpb.ScanRequest{}, ba.Requests[0].GetInner()) + + // The transaction should be refreshed. + require.NotEqual(t, origReadTs, ba.Txn.ReadTimestamp) + require.Equal(t, pushedWriteTs, ba.Txn.ReadTimestamp) + require.Equal(t, pushedWriteTs, ba.Txn.WriteTimestamp) + + br = ba.CreateReply() + br.Txn = ba.Txn + return br, nil + }) + + br, pErr = tsr.SendLocked(ctx, ba) + require.Nil(t, pErr) + require.NotNil(t, br) + require.Equal(t, int64(2), tsr.refreshSuccess.Count()) + require.Equal(t, int64(0), tsr.refreshFail.Count()) + require.Equal(t, int64(0), tsr.refreshAutoRetries.Count()) + require.Equal(t, []roachpb.Span{scanArgs.Span()}, tsr.refreshFootprint.asSlice()) + require.False(t, tsr.refreshInvalid) + + // Reset refreshedTimestamp to avoid confusing ourselves. + tsr.refreshedTimestamp = origReadTs + + // Now that we have accumulated refresh spans and refreshing is no longer a + // no-op, send an EndTxn request that will need a refresh to succeed. This + // should trigger a preemptive refresh. Try this twice. First, have the + // refresh fail, which should prevent the rest of the request from being + // issued. Second, have the refresh succeed, which should result in the + // batch being issued with the refreshed transaction. 
+ ba.Requests = nil + ba.Add(&etArgs) + + onRefresh := func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { + require.Len(t, ba.Requests, 1) + require.Equal(t, pushedWriteTs, ba.Txn.ReadTimestamp) + require.IsType(t, &roachpb.RefreshRangeRequest{}, ba.Requests[0].GetInner()) + + refReq := ba.Requests[0].GetRefreshRange() + require.Equal(t, scanArgs.Span(), refReq.Span()) + require.Equal(t, origReadTs, refReq.RefreshFrom) + + return nil, roachpb.NewErrorf("encountered recently written key") + } + unexpected := func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { + require.Fail(t, "unexpected") + return nil, nil + } + mockSender.ChainMockSend(onRefresh, unexpected) + + br, pErr = tsr.SendLocked(ctx, ba) + require.Nil(t, br) + require.NotNil(t, pErr) + require.Regexp(t, `TransactionRetryError: retry txn \(RETRY_SERIALIZABLE - preemptive refresh\)`, pErr) + require.Equal(t, int64(2), tsr.refreshSuccess.Count()) + require.Equal(t, int64(1), tsr.refreshFail.Count()) + require.Equal(t, int64(0), tsr.refreshAutoRetries.Count()) + require.Equal(t, []roachpb.Span{scanArgs.Span()}, tsr.refreshFootprint.asSlice()) + require.False(t, tsr.refreshInvalid) + + // Try again, but this time let the refresh succeed. + onRefresh = func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { + require.Len(t, ba.Requests, 1) + require.Equal(t, pushedWriteTs, ba.Txn.ReadTimestamp) + require.IsType(t, &roachpb.RefreshRangeRequest{}, ba.Requests[0].GetInner()) + + refReq := ba.Requests[0].GetRefreshRange() + require.Equal(t, scanArgs.Span(), refReq.Span()) + require.Equal(t, origReadTs, refReq.RefreshFrom) + + br = ba.CreateReply() + br.Txn = ba.Txn + return br, nil + } + onEndTxn := func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { + require.Len(t, ba.Requests, 1) + require.False(t, ba.CanForwardReadTimestamp) + require.IsType(t, &roachpb.EndTxnRequest{}, ba.Requests[0].GetInner()) + + // The transaction should be refreshed. 
+ require.NotEqual(t, origReadTs, ba.Txn.ReadTimestamp) + require.Equal(t, pushedWriteTs, ba.Txn.ReadTimestamp) + require.Equal(t, pushedWriteTs, ba.Txn.WriteTimestamp) + + br = ba.CreateReply() + br.Txn = ba.Txn + return br, nil + } + mockSender.ChainMockSend(onRefresh, onEndTxn, unexpected) + + br, pErr = tsr.SendLocked(ctx, ba) + require.Nil(t, pErr) + require.NotNil(t, br) + require.Equal(t, int64(3), tsr.refreshSuccess.Count()) + require.Equal(t, int64(1), tsr.refreshFail.Count()) + require.Equal(t, int64(0), tsr.refreshAutoRetries.Count()) + require.Equal(t, []roachpb.Span{scanArgs.Span()}, tsr.refreshFootprint.asSlice()) + require.False(t, tsr.refreshInvalid) +} + type singleRangeIterator struct{} func (s singleRangeIterator) Valid() bool { diff --git a/pkg/kv/kvclient/kvcoord/txn_metrics.go b/pkg/kv/kvclient/kvcoord/txn_metrics.go index e99703344d19..9f2a29b1423e 100644 --- a/pkg/kv/kvclient/kvcoord/txn_metrics.go +++ b/pkg/kv/kvclient/kvcoord/txn_metrics.go @@ -28,6 +28,7 @@ type TxnMetrics struct { RefreshFail *metric.Counter RefreshFailWithCondensedSpans *metric.Counter RefreshMemoryLimitExceeded *metric.Counter + RefreshAutoRetries *metric.Counter Durations *metric.Histogram @@ -99,6 +100,12 @@ var ( Measurement: "Transactions", Unit: metric.Unit_COUNT, } + metaRefreshAutoRetries = metric.Metadata{ + Name: "txn.refresh.auto_retries", + Help: "Number of batch retries after successful refreshes", + Measurement: "Retries", + Unit: metric.Unit_COUNT, + } metaDurationsHistograms = metric.Metadata{ Name: "txn.durations", Help: "KV transaction durations", @@ -196,10 +203,11 @@ func MakeTxnMetrics(histogramWindow time.Duration) TxnMetrics { Commits: metric.NewCounter(metaCommitsRates), Commits1PC: metric.NewCounter(metaCommits1PCRates), ParallelCommits: metric.NewCounter(metaParallelCommitsRates), + RefreshSuccess: metric.NewCounter(metaRefreshSuccess), RefreshFail: metric.NewCounter(metaRefreshFail), RefreshFailWithCondensedSpans: metric.NewCounter(metaRefreshFailWithCondensedSpans), - RefreshSuccess: metric.NewCounter(metaRefreshSuccess), RefreshMemoryLimitExceeded: metric.NewCounter(metaRefreshMemoryLimitExceeded), + RefreshAutoRetries: metric.NewCounter(metaRefreshAutoRetries), Durations: metric.NewLatency(metaDurationsHistograms, histogramWindow), Restarts: metric.NewHistogram(metaRestartsHistogram, histogramWindow, 100, 3), RestartsWriteTooOld: telemetry.NewCounterWithMetric(metaRestartsWriteTooOld), diff --git a/pkg/kv/kvserver/client_replica_test.go b/pkg/kv/kvserver/client_replica_test.go index efb8adbf2c88..7372a6fde304 100644 --- a/pkg/kv/kvserver/client_replica_test.go +++ b/pkg/kv/kvserver/client_replica_test.go @@ -2034,7 +2034,7 @@ func TestLeaseTransferInSnapshotUpdatesTimestampCache(t *testing.T) { if _, err := txnOld.Inc(ctx, keyA, 4); err != nil { t.Fatal(err) } - const exp = `TransactionRetryError: retry txn \(RETRY_SERIALIZABLE\)` + const exp = `TransactionRetryError: retry txn \(RETRY_SERIALIZABLE - preemptive refresh\)` if err := txnOld.Commit(ctx); !testutils.IsError(err, exp) { t.Fatalf("expected retry error, got: %v; did we write under a read?", err) } diff --git a/pkg/sql/catalog/lease/lease_test.go b/pkg/sql/catalog/lease/lease_test.go index 1cde6f3f5e73..694a3306fb18 100644 --- a/pkg/sql/catalog/lease/lease_test.go +++ b/pkg/sql/catalog/lease/lease_test.go @@ -1022,7 +1022,7 @@ INSERT INTO t.kv VALUES ('a', 'b'); // The transaction read at one timestamp and wrote at another so it // has to be restarted because the spans read were modified by the backfill. 
if err := txReadWrite.Commit(); !testutils.IsError(err, - "TransactionRetryError: retry txn \\(RETRY_SERIALIZABLE\\)") { + "TransactionRetryError: retry txn \\(RETRY_SERIALIZABLE - preemptive refresh\\)") { t.Fatalf("err = %v", err) } diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go index 9bd6fcdec2ce..a677d0e41213 100644 --- a/pkg/ts/catalog/chart_catalog.go +++ b/pkg/ts/catalog/chart_catalog.go @@ -873,6 +873,10 @@ var charts = []sectionDescription{ Title: "Transactions exceeding refresh spans memory limit", Metrics: []string{"txn.refresh.memory_limit_exceeded"}, }, + { + Title: "Auto-Retries", + Metrics: []string{"txn.refresh.auto_retries"}, + }, { Title: "Commits", Metrics: []string{
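Reviewer note: the gate added to SendLocked above reduces to a small decision rule. The sketch below is a paraphrase for review purposes only, not code from this patch; the function name and boolean parameters are invented stand-ins for values the real code reads off the BatchRequest (ba.Txn's timestamps, ba.CanForwardReadTimestamp, and the presence of a committing EndTxn).

// shouldRefreshPreemptively restates the new condition in SendLocked:
//   tsDiverged:     ba.Txn.ReadTimestamp != ba.Txn.WriteTimestamp
//   noRefreshSpans: ba.CanForwardReadTimestamp (no refresh spans recorded yet)
//   committing:     the batch contains an EndTxn request with Commit == true
func shouldRefreshPreemptively(tsDiverged, noRefreshSpans, committing bool) bool {
	if !tsDiverged {
		// Read and write timestamps agree; a refresh would change nothing.
		return false
	}
	// Free: with no recorded refresh spans, tryUpdatingTxnSpans trivially succeeds.
	refreshFree := noRefreshSpans
	// Inevitable: the server is guaranteed to reject a committing EndTxn whose
	// transaction still needs a refresh, so refreshing first avoids a wasted trip.
	refreshInevitable := committing
	return refreshFree || refreshInevitable
}

When neither condition holds, the refresher still relies on the reactive path (maybeRefreshAndRetrySend) after the server returns a retryable error; the preemptive check only moves the refresh earlier when doing so is free or unavoidable.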