From 019d15736f8d9d7bef0f70bfb31550cc6807c83d Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Fri, 12 May 2023 18:18:53 -0400 Subject: [PATCH] kv: decouple parallel commit auto-retries from refresh auto-retries Informs #100131. This commit decouples the concepts of a parallel commit auto-retry from a refresh auto-retry. This is important for weak isolation transactions which can tolerate write skew, where a failed parallel commit does not necessarily imply that the transaction needs to refresh its read spans in order to commit. Parallel commit auto-retries are performed when a parallel committing batch is successful in its execution but fails to qualify for the implicit commit condition. In such cases, the EndTxn request (and just the EndTxn request) is re-issued to move the transaction directly to the COMMITTED state. This only implies the need for a refresh for serializable transactions. For these transactions, a preemptive refresh will be performed by the txnSpanRefresher before the EndTxn is re-issued. Conversely, refresh auto-retries are performed when a batch fails to execute fully and returns an error. In such cases, the txnSpanRefresher will intercept the error, attempt to refresh, and then re-issue the entire original batch. Because we no longer make an attempt to determine which parts of a previously issued batch were successful (this was subtle and error-prone when it existed), we instead re-issue the entire batch and rely on idempotency. To make this change, this commit moves the txnCommitter interceptor above the txnSpanRefresher interceptor in the TxnCoordSender interceptor stack. This means that the txnCommitter no longer needs to guard against re-issued batches, because all retries are handled below it. Instead, the txnSpanRefresher now needs to learn about the STAGING transaction status. The txnSpanRefresher was already taught about the STAGING status on successful batches in ef773227, and is taught about the STAGING status on errors here. While the primary motivation of this change is to permit weak isolation transactions to perform parallel commit auto-retries without needing to refresh, it also has another benefit. Failed parallel commits no longer re-issue their entire batch on serialization failures. Instead, they just re-issue the EndTxn. As a result, this replaces and closes #93099. It also motivates release note below. Release note (performance improvement): Some large, long-running INSERT statements now perform less work during their commit phase and can run faster. --- .../kvcoord/dist_sender_server_test.go | 41 +++-- pkg/kv/kvclient/kvcoord/txn_coord_sender.go | 21 +-- .../kvcoord/txn_interceptor_committer.go | 153 +++++++++++------- .../kvcoord/txn_interceptor_committer_test.go | 114 +++++++------ .../kvcoord/txn_interceptor_pipeliner.go | 5 +- .../kvcoord/txn_interceptor_span_refresher.go | 12 ++ .../txn_interceptor_span_refresher_test.go | 96 +++++++++++ pkg/kv/kvclient/kvcoord/txn_metrics.go | 18 ++- 8 files changed, 328 insertions(+), 132 deletions(-) diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go index dd71af584971..6f27c607d0ac 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go @@ -2060,6 +2060,7 @@ func TestTxnCoordSenderRetries(t *testing.T) { expClientRestart bool // client-side txn restart expServerRefresh bool // server-side refresh expOnePhaseCommit bool // 1PC commits + expParallelCommitAutoRetry bool // parallel commit auto-retries expFailure string // regexp pattern to match on error, if not empty } type testCase struct { @@ -3045,6 +3046,7 @@ func TestTxnCoordSenderRetries(t *testing.T) { expServerRefresh: true, expClientRefreshSuccess: false, expClientAutoRetryAfterRefresh: false, + expParallelCommitAutoRetry: false, }, }, { @@ -3062,10 +3064,12 @@ func TestTxnCoordSenderRetries(t *testing.T) { // The Put to "c" will succeed with a forwarded timestamp. However, the // txn has already staged on the other range at an earlier timestamp. As a // result, it does not qualify for the implicit commit condition and - // requires a client-side refresh. + // requires a parallel commit auto-retry and preemptive client-side + // refresh. allIsoLevels: &expect{ expClientRefreshSuccess: true, - expClientAutoRetryAfterRefresh: true, + expClientAutoRetryAfterRefresh: false, + expParallelCommitAutoRetry: true, }, }, { @@ -3149,6 +3153,7 @@ func TestTxnCoordSenderRetries(t *testing.T) { expServerRefresh: true, expClientRefreshSuccess: false, expClientAutoRetryAfterRefresh: false, + expParallelCommitAutoRetry: false, }, }, { @@ -3165,11 +3170,12 @@ func TestTxnCoordSenderRetries(t *testing.T) { // The Put to "c" will succeed after a server-side refresh. However, the // txn has already staged on the other range at the pre-refresh timestamp. // As a result, it does not qualify for the implicit commit condition and - // requires a client-side refresh. + // requires a parallel commit auto-retry. allIsoLevels: &expect{ expServerRefresh: true, - expClientRefreshSuccess: true, - expClientAutoRetryAfterRefresh: true, + expClientRefreshSuccess: false, + expClientAutoRetryAfterRefresh: false, + expParallelCommitAutoRetry: true, }, }, { @@ -3184,12 +3190,13 @@ func TestTxnCoordSenderRetries(t *testing.T) { return txn.CommitInBatch(ctx, b) }, priorReads: true, - // The Put to "a" will fail, failing the parallel commit and forcing a - // client-side refresh. + // The Put to "a" will fail, failing the parallel commit with an error and + // forcing a client-side refresh and auto-retry of the full batch. allIsoLevels: &expect{ expServerRefresh: false, expClientRefreshSuccess: true, expClientAutoRetryAfterRefresh: true, + expParallelCommitAutoRetry: false, }, }, { @@ -3205,11 +3212,12 @@ func TestTxnCoordSenderRetries(t *testing.T) { }, priorReads: true, // The Put to "c" will fail, failing the parallel commit and forcing a - // client-side refresh. + // parallel commit auto-retry and preemptive client-side refresh. allIsoLevels: &expect{ expServerRefresh: false, expClientRefreshSuccess: true, - expClientAutoRetryAfterRefresh: true, + expClientAutoRetryAfterRefresh: false, + expParallelCommitAutoRetry: true, }, }, { @@ -3387,11 +3395,12 @@ func TestTxnCoordSenderRetries(t *testing.T) { // The cput to "c" will succeed after a server-side refresh. However, the // txn has already staged on the other range at the pre-refresh timestamp. // As a result, it does not qualify for the implicit commit condition and - // requires a client-side refresh. + // requires a parallel commit auto-retry. allIsoLevels: &expect{ expServerRefresh: true, - expClientRefreshSuccess: true, - expClientAutoRetryAfterRefresh: true, + expClientRefreshSuccess: false, + expClientAutoRetryAfterRefresh: false, + expParallelCommitAutoRetry: true, }, }, { @@ -3435,11 +3444,12 @@ func TestTxnCoordSenderRetries(t *testing.T) { // The cput to "c" will succeed after a server-side refresh. However, the // txn has already staged on the other range at the pre-refresh timestamp. // As a result, it does not qualify for the implicit commit condition and - // requires a client-side refresh. + // requires a parallel commit auto-retry. allIsoLevels: &expect{ expServerRefresh: true, - expClientRefreshSuccess: true, - expClientAutoRetryAfterRefresh: true, + expClientRefreshSuccess: false, + expClientAutoRetryAfterRefresh: false, + expParallelCommitAutoRetry: true, }, }, { @@ -3615,6 +3625,7 @@ func TestTxnCoordSenderRetries(t *testing.T) { require.Equal(t, exp.expServerRefresh, metrics.ServerRefreshSuccess.Count() != 0, "TxnMetrics.ServerRefreshSuccess") require.Equal(t, exp.expClientRestart, metrics.Restarts.TotalSum() != 0, "TxnMetrics.Restarts") require.Equal(t, exp.expOnePhaseCommit, metrics.Commits1PC.Count() != 0, "TxnMetrics.Commits1PC") + require.Equal(t, exp.expParallelCommitAutoRetry, metrics.ParallelCommitAutoRetries.Count() != 0, "TxnMetrics.ParallelCommitAutoRetries") } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go index 7e78838a5de6..2367dec634b3 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go @@ -163,8 +163,8 @@ type TxnCoordSender struct { txnHeartbeater txnSeqNumAllocator txnPipeliner - txnSpanRefresher txnCommitter + txnSpanRefresher txnMetricRecorder txnLockGatekeeper // not in interceptorStack array. } @@ -254,6 +254,7 @@ func newRootTxnCoordSender( tcs.interceptorAlloc.txnCommitter = txnCommitter{ st: tcf.st, stopper: tcs.stopper, + metrics: &tcs.metrics, mu: &tcs.mu.Mutex, } tcs.interceptorAlloc.txnMetricRecorder = txnMetricRecorder{ @@ -274,18 +275,20 @@ func newRootTxnCoordSender( // never generate transaction retry errors that could be avoided // with a refresh. &tcs.interceptorAlloc.txnPipeliner, + // The committer sits above the span refresher because it will + // never generate transaction retry errors that could be avoided + // with a refresh. However, it may re-issue parts of "successful" + // batches that failed to qualify for the parallel commit condition. + // These re-issued batches benefit from pre-emptive refreshes in the + // span refresher. + &tcs.interceptorAlloc.txnCommitter, // The span refresher may resend entire batches to avoid transaction // retries. Because of that, we need to be careful which interceptors // sit below it in the stack. + // The span refresher sits below the committer, so it must be prepared + // to see STAGING transaction protos in responses and errors. It should + // defer to the committer for how to handle those. &tcs.interceptorAlloc.txnSpanRefresher, - // The committer sits beneath the span refresher so that any - // retryable errors that it generates have a chance of being - // "refreshed away" without the need for a txn restart. Because the - // span refresher can re-issue batches, it needs to be careful about - // what parts of the batch it mutates. Any mutation needs to be - // idempotent and should avoid writing to memory when not changing - // it to avoid looking like a data race. - &tcs.interceptorAlloc.txnCommitter, // The metrics recorder sits at the bottom of the stack so that it // can observe all transformations performed by other interceptors. &tcs.interceptorAlloc.txnMetricRecorder, diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_committer.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_committer.go index 21dfa1198c46..a408f2438a4c 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_committer.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_committer.go @@ -89,12 +89,25 @@ var parallelCommitsEnabled = settings.RegisterBoolSetting( // satisfied and the transaction is still in-progress (and could still be // committed or aborted at a later time). There are a number of reasons why // some of the requests in the final batch may have failed: -// - intent writes: these requests may fail to write an intent due to a logical -// error like a ConditionFailedError. They also could have succeeded at writing -// an intent but failed to write it at the desired timestamp because they ran -// into the timestamp cache or another committed value. In the first case, the -// txnCommitter will receive an error. In the second, it will generate one in -// needTxnRetryAfterStaging. +// +// - intent writes (error): these requests may fail to write an intent due to +// a logical error like a ConditionFailedError during evaluation. In these +// cases, the txnCommitter will receive an error and can conclude that +// intent was never written and so the implicit commit condition is not +// satisfied. The error is returned to the client. +// +// - intent writes (successful but pushed): these requests may also succeed at +// writing an intent but fail to write it at the desired (staging) timestamp +// because they run into the timestamp cache or another committed value +// (forms of contention). In these cases, the txnCommitter will receive a +// successful response, but can determine (isTxnCommitImplicit) that the +// transaction does not satisfy the implicit commit condition because one or +// more of its intents are written with a timestamp higher than the staging +// transaction record's. It will retry the transaction commit by re-issuing +// the EndTxn request (retryTxnCommitAfterFailedParallelCommit) to attempt +// to move the transaction directly to the explicitly committed state. This +// retry is called a "parallel commit auto-retry". +// // - query intents: these requests may fail because they discover that one of the // previously issued writes has failed; either because it never left an intent // or because it left one at too high of a timestamp. In this case, the request @@ -102,6 +115,7 @@ var parallelCommitsEnabled = settings.RegisterBoolSetting( // set. It will also prevent the write from ever succeeding in the future, which // ensures that the transaction will never suddenly become implicitly committed // at a later point due to the write eventually succeeding (e.g. after a replay). +// // - end txn: this request may fail with a TransactionRetryError for any number of // reasons, such as if the transaction's provisional commit timestamp has been // pushed past its read timestamp. In all of these cases, an error will be @@ -117,6 +131,7 @@ type txnCommitter struct { st *cluster.Settings stopper *stop.Stopper wrapped lockedSender + metrics *TxnMetrics mu sync.Locker } @@ -142,17 +157,11 @@ func (tc *txnCommitter) SendLocked( return tc.sendLockedWithElidedEndTxn(ctx, ba, et) } - // Assign the transaction's key to the Request's header if it isn't already - // set. This is the only place where EndTxnRequest.Key is assigned, but we - // could be dealing with a re-issued batch after a refresh. Remember, the - // committer is below the span refresh on the interceptor stack. - if et.Key == nil { - et.Key = ba.Txn.Key - } else if len(et.InFlightWrites) > 0 { - // Parallel commits should be disabled on retries by splitEndTxnAndRetrySend. - return nil, kvpb.NewError(errors.AssertionFailedf( - "re-issued EndTxn request with InFlightWrites: %v", et)) + // Assign the transaction's key to the Request's header. + if et.Key != nil { + return nil, kvpb.NewError(errors.AssertionFailedf("client must not assign Key to EndTxn")) } + et.Key = ba.Txn.Key // Determine whether the commit request can be run in parallel with the rest // of the requests in the batch. If not, move the in-flight writes currently @@ -212,17 +221,24 @@ func (tc *txnCommitter) SendLocked( return nil, kvpb.NewErrorf("unexpected response status without error: %v", br.Txn) } - // Determine whether the transaction needs to either retry or refresh. When - // the EndTxn request evaluated while STAGING the transaction record, it - // performed this check. However, the transaction proto may have changed due - // to writes evaluated concurrently with the EndTxn even if none of those - // writes returned an error. Remember that the transaction proto we see here - // could be a combination of protos from responses, all merged by - // DistSender. - if pErr := needTxnRetryAfterStaging(br); pErr != nil { - log.VEventf(ctx, 2, "parallel commit failed since some writes were pushed. "+ - "Synthesized err: %s", pErr) - return nil, pErr + // Determine whether the transaction satisfies the implicit commit condition. + // If not, it needs to retry the EndTxn request, and possibly also refresh if + // it is serializable. + implicitCommit, err := isTxnCommitImplicit(br) + if err != nil { + return nil, kvpb.NewError(err) + } + + // Retry the EndTxn request (and nothing else) if the transaction does not + // satisfy the implicit commit condition. This EndTxn request will not be + // in-flight concurrently with any other writes (they all succeeded), so it + // will move the transaction record directly to the COMMITTED state. + // + // Note that we leave the transaction record that we wrote in the STAGING + // state, which is not ideal. But as long as we continue heartbeating the + // txn record, it being PENDING or STAGING does not make a difference. + if !implicitCommit { + return tc.retryTxnCommitAfterFailedParallelCommit(ctx, ba, br) } // If the transaction doesn't need to retry then it is implicitly committed! @@ -384,42 +400,71 @@ func mergeIntoSpans(s []roachpb.Span, ws []roachpb.SequencedWrite) ([]roachpb.Sp return roachpb.MergeSpans(&m) } -// needTxnRetryAfterStaging determines whether the transaction needs to refresh -// (see txnSpanRefresher) or retry based on the batch response of a parallel -// commit attempt. -func needTxnRetryAfterStaging(br *kvpb.BatchResponse) *kvpb.Error { +// isTxnCommitImplicit determines whether the transaction has satisfied the +// implicit commit requirements. It is used to determine whether the transaction +// needs to retry its EndTxn based on the response to a parallel commit attempt. +func isTxnCommitImplicit(br *kvpb.BatchResponse) (bool, error) { if len(br.Responses) == 0 { - return kvpb.NewErrorf("no responses in BatchResponse: %v", br) + return false, errors.AssertionFailedf("no responses in BatchResponse: %v", br) } lastResp := br.Responses[len(br.Responses)-1].GetInner() etResp, ok := lastResp.(*kvpb.EndTxnResponse) if !ok { - return kvpb.NewErrorf("unexpected response in BatchResponse: %v", lastResp) + return false, errors.AssertionFailedf("unexpected response in BatchResponse: %v", lastResp) } if etResp.StagingTimestamp.IsEmpty() { - return kvpb.NewErrorf("empty StagingTimestamp in EndTxnResponse: %v", etResp) + return false, errors.AssertionFailedf("empty StagingTimestamp in EndTxnResponse: %v", etResp) } - if etResp.StagingTimestamp.Less(br.Txn.WriteTimestamp) { - // If the timestamp that the transaction record was staged at - // is less than the timestamp of the transaction in the batch - // response then one of the concurrent writes was pushed to - // a higher timestamp. This violates the "implicit commit" - // condition and neither the transaction coordinator nor any - // other concurrent actor will consider this transaction to - // be committed as is. - // Note that we leave the transaction record that we wrote in the STAGING - // state, which is not ideal. But as long as we continue heartbeating the - // txn record, it being PENDING or STAGING does not make a difference. - reason := kvpb.RETRY_SERIALIZABLE - if br.Txn.WriteTooOld { - reason = kvpb.RETRY_WRITE_TOO_OLD - } - err := kvpb.NewTransactionRetryError( - reason, "serializability failure concurrent with STAGING") - txn := cloneWithStatus(br.Txn, roachpb.PENDING) - return kvpb.NewErrorWithTxn(err, txn) + // If the timestamp that the transaction record was staged at is less than + // the timestamp of the transaction in the batch response then one of the + // concurrent writes was pushed to a higher timestamp. This violates the + // "implicit commit" condition and neither the transaction coordinator nor + // any other concurrent actor will consider this transaction to be committed + // as is. + failed := etResp.StagingTimestamp.Less(br.Txn.WriteTimestamp) + return !failed, nil +} + +// retryTxnCommitAfterFailedParallelCommit retries the batch's EndTxn request +// after the batch has previously succeeded (with the response br), but failed +// to qualify for the implicit commit condition. This EndTxn request will not be +// in-flight concurrently with any other writes (they all succeeded), so it will +// move the transaction record directly to the COMMITTED state. +// +// If successful, the response for the re-issued EndTxn request is stitched back +// together with the rest of the BatchResponse and returned. +func (tc *txnCommitter) retryTxnCommitAfterFailedParallelCommit( + ctx context.Context, ba *kvpb.BatchRequest, br *kvpb.BatchResponse, +) (*kvpb.BatchResponse, *kvpb.Error) { + log.Eventf(ctx, "parallel commit failed; retrying EndTxn request") + tc.metrics.ParallelCommitAutoRetries.Inc(1) + + // Issue a batch containing only the EndTxn request. + etIdx := len(ba.Requests) - 1 + baSuffix := ba.ShallowCopy() + baSuffix.Requests = ba.Requests[etIdx:] + baSuffix.Txn = cloneWithStatus(br.Txn, roachpb.PENDING) + // Update the EndTxn request to move the in-flight writes currently attached + // to the EndTxn request to the LockSpans and clear the in-flight write set; + // the writes already succeeded and will not be in-flight concurrently with + // the EndTxn request. + { + et := baSuffix.Requests[0].GetEndTxn().ShallowCopy().(*kvpb.EndTxnRequest) + et.LockSpans, _ = mergeIntoSpans(et.LockSpans, et.InFlightWrites) + et.InFlightWrites = nil + baSuffix.Requests[0].Value.(*kvpb.RequestUnion_EndTxn).EndTxn = et } - return nil + brSuffix, pErr := tc.wrapped.SendLocked(ctx, baSuffix) + if pErr != nil { + return nil, pErr + } + + // Combine the responses. + br.Responses[etIdx] = kvpb.ResponseUnion{} + if err := br.Combine(ctx, brSuffix, []int{etIdx}, ba); err != nil { + return nil, kvpb.NewError(err) + } + return br, nil } // makeTxnCommitExplicitAsync launches an async task that attempts to move the diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_committer_test.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_committer_test.go index a8c8585f9e23..eb58244b724b 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_committer_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_committer_test.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/stretchr/testify/require" @@ -27,10 +28,12 @@ import ( func makeMockTxnCommitter() (txnCommitter, *mockLockedSender) { mockSender := &mockLockedSender{} + metrics := MakeTxnMetrics(metric.TestSampleInterval) return txnCommitter{ st: cluster.MakeTestingClusterSettings(), stopper: stop.NewStopper(), wrapped: mockSender, + metrics: &metrics, mu: new(syncutil.Mutex), }, mockSender } @@ -402,61 +405,78 @@ func TestTxnCommitterAsyncExplicitCommitTask(t *testing.T) { <-explicitCommitCh } -// TestTxnCommitterRetryAfterStaging verifies that txnCommitter returns a retry -// error when a write performed in parallel with staging a transaction is pushed -// to a timestamp above the staging timestamp. +// TestTxnCommitterRetryAfterStaging verifies that txnCommitter performs a +// parallel commit auto-retry when a write performed in parallel with staging a +// transaction is pushed to a timestamp above the staging timestamp. func TestTxnCommitterRetryAfterStaging(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) ctx := context.Background() - testutils.RunTrueAndFalse(t, "WriteTooOld", func(t *testing.T, writeTooOld bool) { - tc, mockSender := makeMockTxnCommitter() - defer tc.stopper.Stop(ctx) + tc, mockSender := makeMockTxnCommitter() + defer tc.stopper.Stop(ctx) - txn := makeTxnProto() - keyA := roachpb.Key("a") + txn := makeTxnProto() + keyA := roachpb.Key("a") - ba := &kvpb.BatchRequest{} - ba.Header = kvpb.Header{Txn: &txn} - putArgs := kvpb.PutRequest{RequestHeader: kvpb.RequestHeader{Key: keyA}} - etArgs := kvpb.EndTxnRequest{Commit: true} - putArgs.Sequence = 1 - etArgs.Sequence = 2 - etArgs.InFlightWrites = []roachpb.SequencedWrite{{Key: keyA, Sequence: 1}} - ba.Add(&putArgs, &etArgs) + ba := &kvpb.BatchRequest{} + ba.Header = kvpb.Header{Txn: &txn} + putArgs := kvpb.PutRequest{RequestHeader: kvpb.RequestHeader{Key: keyA}} + etArgs := kvpb.EndTxnRequest{Commit: true} + putArgs.Sequence = 1 + etArgs.Sequence = 2 + etArgs.InFlightWrites = []roachpb.SequencedWrite{{Key: keyA, Sequence: 1}} + ba.Add(&putArgs, &etArgs) - mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) { - require.Len(t, ba.Requests, 2) - require.IsType(t, &kvpb.PutRequest{}, ba.Requests[0].GetInner()) - require.IsType(t, &kvpb.EndTxnRequest{}, ba.Requests[1].GetInner()) + onFirstSend := func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) { + require.Len(t, ba.Requests, 2) + require.Equal(t, roachpb.PENDING, ba.Txn.Status) + require.IsType(t, &kvpb.PutRequest{}, ba.Requests[0].GetInner()) + require.IsType(t, &kvpb.EndTxnRequest{}, ba.Requests[1].GetInner()) - et := ba.Requests[1].GetInner().(*kvpb.EndTxnRequest) - require.True(t, et.Commit) - require.Len(t, et.InFlightWrites, 1) - require.Equal(t, roachpb.SequencedWrite{Key: keyA, Sequence: 1}, et.InFlightWrites[0]) + et := ba.Requests[1].GetEndTxn() + require.True(t, et.Commit) + require.Nil(t, et.LockSpans) + require.Len(t, et.InFlightWrites, 1) + require.Equal(t, roachpb.SequencedWrite{Key: keyA, Sequence: 1}, et.InFlightWrites[0]) - br := ba.CreateReply() - br.Txn = ba.Txn - br.Txn.Status = roachpb.STAGING - br.Responses[1].GetInner().(*kvpb.EndTxnResponse).StagingTimestamp = br.Txn.WriteTimestamp - - // Pretend the PutRequest was split and sent to a different Range. It - // could hit the timestamp cache, or a WriteTooOld error (which sets the - // WriteTooOld flag). The intent will be written but the response - // transaction's timestamp will be larger than the staging timestamp. - br.Txn.WriteTooOld = writeTooOld - br.Txn.WriteTimestamp = br.Txn.WriteTimestamp.Add(1, 0) - return br, nil - }) + br := ba.CreateReply() + br.Txn = ba.Txn.Clone() + br.Txn.Status = roachpb.STAGING + br.Responses[1].GetEndTxn().StagingTimestamp = br.Txn.WriteTimestamp + + // Pretend the PutRequest was split and sent to a different Range. It + // could hit the timestamp cache or a WriteTooOld error (which sets the + // WriteTooOld flag which is stripped by the txnSpanRefresher). The intent + // will be written but the response transaction's timestamp will be larger + // than the staging timestamp. + br.Txn.WriteTimestamp = br.Txn.WriteTimestamp.Add(1, 0) + return br, nil + } + sawRetry := false + onRetry := func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) { + sawRetry = true + require.Len(t, ba.Requests, 1) + require.Equal(t, roachpb.PENDING, ba.Txn.Status) + require.IsType(t, &kvpb.EndTxnRequest{}, ba.Requests[0].GetInner()) - br, pErr := tc.SendLocked(ctx, ba) - require.Nil(t, br) - require.NotNil(t, pErr) - require.IsType(t, &kvpb.TransactionRetryError{}, pErr.GetDetail()) - expReason := kvpb.RETRY_SERIALIZABLE - if writeTooOld { - expReason = kvpb.RETRY_WRITE_TOO_OLD - } - require.Equal(t, expReason, pErr.GetDetail().(*kvpb.TransactionRetryError).Reason) - }) + et := ba.Requests[0].GetEndTxn() + require.True(t, et.Commit) + require.Nil(t, et.InFlightWrites) + require.Len(t, et.LockSpans, 1) + require.Equal(t, roachpb.Span{Key: keyA}, et.LockSpans[0]) + + br := ba.CreateReply() + br.Txn = ba.Txn.Clone() + br.Txn.Status = roachpb.COMMITTED + return br, nil + } + mockSender.ChainMockSend(onFirstSend, onRetry) + + br, pErr := tc.SendLocked(ctx, ba) + require.Nil(t, pErr) + require.NotNil(t, br) + require.NotNil(t, br.Txn) + require.Equal(t, roachpb.COMMITTED, br.Txn.Status) + require.True(t, sawRetry) + require.Equal(t, int64(1), tc.metrics.ParallelCommitAutoRetries.Count()) } diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go index c4e5bdc94b29..0b01acf3e683 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go @@ -23,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/errors" "github.com/cockroachdb/redact" "github.com/google/btree" ) @@ -363,10 +364,10 @@ func (tp *txnPipeliner) attachLocksToEndTxn( } et := args.(*kvpb.EndTxnRequest) if len(et.LockSpans) > 0 { - return ba, kvpb.NewErrorf("client must not pass intents to EndTxn") + return ba, kvpb.NewError(errors.AssertionFailedf("client must not pass intents to EndTxn")) } if len(et.InFlightWrites) > 0 { - return ba, kvpb.NewErrorf("client must not pass in-flight writes to EndTxn") + return ba, kvpb.NewError(errors.AssertionFailedf("client must not pass in-flight writes to EndTxn")) } // Populate et.LockSpans and et.InFlightWrites. diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go index c7622c968f58..54248d6ddcf2 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go @@ -300,6 +300,18 @@ func (sr *txnSpanRefresher) maybeRefreshAndRetrySend( refreshFrom := txn.ReadTimestamp refreshToTxn := txn.Clone() refreshToTxn.Refresh(refreshTS) + switch refreshToTxn.Status { + case roachpb.PENDING: + case roachpb.STAGING: + // If the batch resulted in an error but the EndTxn request succeeded, + // staging the transaction record in the process, downgrade the status + // back to PENDING. Even though the transaction record may have a status + // of STAGING, we know that the transaction failed to implicitly commit. + refreshToTxn.Status = roachpb.PENDING + default: + return nil, kvpb.NewError(errors.AssertionFailedf( + "unexpected txn status during refresh: %v", refreshToTxn)) + } log.VEventf(ctx, 2, "trying to refresh to %s because of %s", refreshToTxn.ReadTimestamp, pErr) diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go index cd570842d7e6..08e5b231d68e 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go @@ -350,6 +350,102 @@ func TestTxnSpanRefresherRefreshesTransactions(t *testing.T) { } } +// TestTxnSpanRefresherDowngradesStagingTxnStatus tests that the txnSpanRefresher +// tolerates retry errors with a STAGING transaction status. In such cases, it +// will downgrade the status to PENDING before refreshing and retrying, because +// the presence of an error proves that the transaction failed to implicitly +// commit. +func TestTxnSpanRefresherDowngradesStagingTxnStatus(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + tsr, mockSender := makeMockTxnSpanRefresher() + + txn := makeTxnProto() + keyA, keyB := roachpb.Key("a"), roachpb.Key("b") + conflictTs := txn.WriteTimestamp.Add(15, 0) + + // Collect some refresh spans. + ba := &kvpb.BatchRequest{} + ba.Header = kvpb.Header{Txn: &txn} + scanArgs := kvpb.ScanRequest{RequestHeader: kvpb.RequestHeader{Key: keyA, EndKey: keyB}} + ba.Add(&scanArgs) + + br, pErr := tsr.SendLocked(ctx, ba) + require.Nil(t, pErr) + require.NotNil(t, br) + require.Equal(t, []roachpb.Span{scanArgs.Span()}, tsr.refreshFootprint.asSlice()) + require.False(t, tsr.refreshInvalid) + require.Zero(t, tsr.refreshedTimestamp) + + // Hook up a chain of mocking functions. + onPutAndEndTxn := func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) { + require.Len(t, ba.Requests, 2) + require.IsType(t, &kvpb.PutRequest{}, ba.Requests[0].GetInner()) + require.IsType(t, &kvpb.EndTxnRequest{}, ba.Requests[1].GetInner()) + require.True(t, ba.Requests[1].GetEndTxn().IsParallelCommit()) + + // Return a write-too-old error with a STAGING status, emulating a + // successful EndTxn request and a failed Put request. This mixed success + // state is possible if the requests were split across ranges. + pErrTxn := ba.Txn.Clone() + pErrTxn.Status = roachpb.STAGING + pErr := &kvpb.WriteTooOldError{ActualTimestamp: conflictTs} + return nil, kvpb.NewErrorWithTxn(pErr, pErrTxn) + } + onRefresh := func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) { + require.Equal(t, roachpb.PENDING, ba.Txn.Status) // downgraded + require.Len(t, ba.Requests, 1) + require.Equal(t, conflictTs, ba.Txn.ReadTimestamp) + require.IsType(t, &kvpb.RefreshRangeRequest{}, ba.Requests[0].GetInner()) + + refReq := ba.Requests[0].GetRefreshRange() + require.Equal(t, scanArgs.Span(), refReq.Span()) + require.Equal(t, txn.ReadTimestamp, refReq.RefreshFrom) + + br = ba.CreateReply() + br.Txn = ba.Txn.Clone() + return br, nil + } + onPut := func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) { + require.Equal(t, roachpb.PENDING, ba.Txn.Status) // downgraded + require.Len(t, ba.Requests, 1) + require.IsType(t, &kvpb.PutRequest{}, ba.Requests[0].GetInner()) + + br = ba.CreateReply() + br.Txn = ba.Txn.Clone() + return br, nil + } + onEndTxn := func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) { + require.Equal(t, roachpb.PENDING, ba.Txn.Status) // downgraded + require.Len(t, ba.Requests, 1) + require.IsType(t, &kvpb.EndTxnRequest{}, ba.Requests[0].GetInner()) + require.False(t, ba.Requests[0].GetEndTxn().IsParallelCommit()) + + br = ba.CreateReply() + br.Txn = ba.Txn.Clone() + br.Txn.Status = roachpb.COMMITTED + return br, nil + } + mockSender.ChainMockSend(onPutAndEndTxn, onRefresh, onPut, onEndTxn) + + // Send a request that will hit a write-too-old error while also returning a + // STAGING transaction status. + ba.Requests = nil + putArgs := kvpb.PutRequest{RequestHeader: kvpb.RequestHeader{Key: keyB}} + etArgs := kvpb.EndTxnRequest{Commit: true} + putArgs.Sequence = 1 + etArgs.Sequence = 2 + etArgs.InFlightWrites = []roachpb.SequencedWrite{{Key: keyB, Sequence: 1}} + ba.Add(&putArgs, &etArgs) + + br, pErr = tsr.SendLocked(ctx, ba) + require.Nil(t, pErr) + require.NotNil(t, br) + require.NotNil(t, br.Txn) + require.Equal(t, roachpb.COMMITTED, br.Txn.Status) +} + // TestTxnSpanRefresherMaxRefreshAttempts tests that the txnSpanRefresher // attempts some number of retries before giving up and passing retryable // errors back up the stack. diff --git a/pkg/kv/kvclient/kvcoord/txn_metrics.go b/pkg/kv/kvclient/kvcoord/txn_metrics.go index ac3c8f2fcd15..d5f2ff42e434 100644 --- a/pkg/kv/kvclient/kvcoord/txn_metrics.go +++ b/pkg/kv/kvclient/kvcoord/txn_metrics.go @@ -19,11 +19,12 @@ import ( // TxnMetrics holds all metrics relating to KV transactions. type TxnMetrics struct { - Aborts *metric.Counter - Commits *metric.Counter - Commits1PC *metric.Counter // Commits which finished in a single phase - ParallelCommits *metric.Counter // Commits which entered the STAGING state - CommitWaits *metric.Counter // Commits that waited for linearizability + Aborts *metric.Counter + Commits *metric.Counter + Commits1PC *metric.Counter // Commits which finished in a single phase + ParallelCommits *metric.Counter // Commits which entered the STAGING state + ParallelCommitAutoRetries *metric.Counter // Commits which were retried after entering the STAGING state + CommitWaits *metric.Counter // Commits that waited for linearizability ClientRefreshSuccess *metric.Counter ClientRefreshFail *metric.Counter @@ -82,6 +83,12 @@ var ( Measurement: "KV Transactions", Unit: metric.Unit_COUNT, } + metaParallelCommitAutoRetries = metric.Metadata{ + Name: "txn.parallelcommits.auto_retries", + Help: "Number of commit tries after successful failed parallel commit attempts", + Measurement: "Retries", + Unit: metric.Unit_COUNT, + } metaCommitWaitCount = metric.Metadata{ Name: "txn.commit_waits", Help: "Number of KV transactions that had to commit-wait on commit " + @@ -275,6 +282,7 @@ func MakeTxnMetrics(histogramWindow time.Duration) TxnMetrics { Commits: metric.NewCounter(metaCommitsRates), Commits1PC: metric.NewCounter(metaCommits1PCRates), ParallelCommits: metric.NewCounter(metaParallelCommitsRates), + ParallelCommitAutoRetries: metric.NewCounter(metaParallelCommitAutoRetries), CommitWaits: metric.NewCounter(metaCommitWaitCount), ClientRefreshSuccess: metric.NewCounter(metaClientRefreshSuccess), ClientRefreshFail: metric.NewCounter(metaClientRefreshFail),