diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
index dd71af584971..aa4fb4377977 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
@@ -2060,6 +2060,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 		expClientRestart bool // client-side txn restart
 		expServerRefresh bool // server-side refresh
 		expOnePhaseCommit bool // 1PC commits
+		expParallelCommitAutoRetry bool // parallel commit auto-retries
 		expFailure string // regexp pattern to match on error, if not empty
 	}
 	type testCase struct {
@@ -3045,6 +3046,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 				expServerRefresh: true,
 				expClientRefreshSuccess: false,
 				expClientAutoRetryAfterRefresh: false,
+				expParallelCommitAutoRetry: false,
 			},
 		},
 		{
@@ -3062,10 +3064,11 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 			// The Put to "c" will succeed with a forwarded timestamp. However, the
 			// txn has already staged on the other range at an earlier timestamp. As a
 			// result, it does not qualify for the implicit commit condition and
-			// requires a client-side refresh.
+			// requires a client-side refresh and parallel commit auto-retry.
 			allIsoLevels: &expect{
 				expClientRefreshSuccess: true,
-				expClientAutoRetryAfterRefresh: true,
+				expClientAutoRetryAfterRefresh: false,
+				expParallelCommitAutoRetry: true,
 			},
 		},
 		{
@@ -3149,6 +3152,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 				expServerRefresh: true,
 				expClientRefreshSuccess: false,
 				expClientAutoRetryAfterRefresh: false,
+				expParallelCommitAutoRetry: false,
 			},
 		},
 		{
@@ -3165,11 +3169,12 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 			// The Put to "c" will succeed after a server-side refresh. However, the
 			// txn has already staged on the other range at the pre-refresh timestamp.
 			// As a result, it does not qualify for the implicit commit condition and
-			// requires a client-side refresh.
+			// requires a parallel commit auto-retry.
 			allIsoLevels: &expect{
 				expServerRefresh: true,
-				expClientRefreshSuccess: true,
-				expClientAutoRetryAfterRefresh: true,
+				expClientRefreshSuccess: false,
+				expClientAutoRetryAfterRefresh: false,
+				expParallelCommitAutoRetry: true,
 			},
 		},
 		{
@@ -3184,12 +3189,13 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 				return txn.CommitInBatch(ctx, b)
 			},
 			priorReads: true,
-			// The Put to "a" will fail, failing the parallel commit and forcing a
-			// client-side refresh.
+			// The Put to "a" will fail, failing the parallel commit with an error and
+			// forcing a client-side refresh and auto-retry of the full batch.
 			allIsoLevels: &expect{
 				expServerRefresh: false,
 				expClientRefreshSuccess: true,
 				expClientAutoRetryAfterRefresh: true,
+				expParallelCommitAutoRetry: false,
 			},
 		},
 		{
@@ -3205,11 +3211,12 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 			},
 			priorReads: true,
 			// The Put to "c" will fail, failing the parallel commit and forcing a
-			// client-side refresh.
+			// client-side refresh and parallel commit auto-retry.
 			allIsoLevels: &expect{
 				expServerRefresh: false,
 				expClientRefreshSuccess: true,
-				expClientAutoRetryAfterRefresh: true,
+				expClientAutoRetryAfterRefresh: false,
+				expParallelCommitAutoRetry: true,
 			},
 		},
 		{
@@ -3387,11 +3394,12 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 			// The cput to "c" will succeed after a server-side refresh. However, the
 			// txn has already staged on the other range at the pre-refresh timestamp.
 			// As a result, it does not qualify for the implicit commit condition and
-			// requires a client-side refresh.
+			// requires a parallel commit auto-retry.
 			allIsoLevels: &expect{
 				expServerRefresh: true,
-				expClientRefreshSuccess: true,
-				expClientAutoRetryAfterRefresh: true,
+				expClientRefreshSuccess: false,
+				expClientAutoRetryAfterRefresh: false,
+				expParallelCommitAutoRetry: true,
 			},
 		},
 		{
@@ -3435,11 +3443,12 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 			// The cput to "c" will succeed after a server-side refresh. However, the
 			// txn has already staged on the other range at the pre-refresh timestamp.
 			// As a result, it does not qualify for the implicit commit condition and
-			// requires a client-side refresh.
+			// requires a parallel commit auto-retry.
 			allIsoLevels: &expect{
 				expServerRefresh: true,
-				expClientRefreshSuccess: true,
-				expClientAutoRetryAfterRefresh: true,
+				expClientRefreshSuccess: false,
+				expClientAutoRetryAfterRefresh: false,
+				expParallelCommitAutoRetry: true,
 			},
 		},
 		{
@@ -3615,6 +3624,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 		require.Equal(t, exp.expServerRefresh, metrics.ServerRefreshSuccess.Count() != 0, "TxnMetrics.ServerRefreshSuccess")
 		require.Equal(t, exp.expClientRestart, metrics.Restarts.TotalSum() != 0, "TxnMetrics.Restarts")
 		require.Equal(t, exp.expOnePhaseCommit, metrics.Commits1PC.Count() != 0, "TxnMetrics.Commits1PC")
+		require.Equal(t, exp.expParallelCommitAutoRetry, metrics.ParallelCommitAutoRetries.Count() != 0, "TxnMetrics.ParallelCommitAutoRetries")
 	}
 	for _, tc := range testCases {
 		t.Run(tc.name, func(t *testing.T) {
diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go
index 7e78838a5de6..2367dec634b3 100644
--- a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go
+++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go
@@ -163,8 +163,8 @@ type TxnCoordSender struct {
 		txnHeartbeater
 		txnSeqNumAllocator
 		txnPipeliner
-		txnSpanRefresher
 		txnCommitter
+		txnSpanRefresher
 		txnMetricRecorder
 		txnLockGatekeeper // not in interceptorStack array.
 	}
@@ -254,6 +254,7 @@ func newRootTxnCoordSender(
 	tcs.interceptorAlloc.txnCommitter = txnCommitter{
 		st: tcf.st,
 		stopper: tcs.stopper,
+		metrics: &tcs.metrics,
 		mu: &tcs.mu.Mutex,
 	}
 	tcs.interceptorAlloc.txnMetricRecorder = txnMetricRecorder{
@@ -274,18 +275,20 @@ func newRootTxnCoordSender(
 		// never generate transaction retry errors that could be avoided
 		// with a refresh.
 		&tcs.interceptorAlloc.txnPipeliner,
+		// The committer sits above the span refresher because it will
+		// never generate transaction retry errors that could be avoided
+		// with a refresh. However, it may re-issue parts of "successful"
+		// batches that failed to qualify for the parallel commit condition.
+		// These re-issued batches benefit from pre-emptive refreshes in the
+		// span refresher.
+		&tcs.interceptorAlloc.txnCommitter,
 		// The span refresher may resend entire batches to avoid transaction
 		// retries. Because of that, we need to be careful which interceptors
 		// sit below it in the stack.
+		// The span refresher sits below the committer, so it must be prepared
+		// to see STAGING transaction protos in responses and errors. It should
+		// defer to the committer for how to handle those.
 		&tcs.interceptorAlloc.txnSpanRefresher,
-		// The committer sits beneath the span refresher so that any
-		// retryable errors that it generates have a chance of being
-		// "refreshed away" without the need for a txn restart. Because the
-		// span refresher can re-issue batches, it needs to be careful about
-		// what parts of the batch it mutates. Any mutation needs to be
-		// idempotent and should avoid writing to memory when not changing
-		// it to avoid looking like a data race.
-		&tcs.interceptorAlloc.txnCommitter,
 		// The metrics recorder sits at the bottom of the stack so that it
 		// can observe all transformations performed by other interceptors.
 		&tcs.interceptorAlloc.txnMetricRecorder,
diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_committer.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_committer.go
index 21dfa1198c46..a408f2438a4c 100644
--- a/pkg/kv/kvclient/kvcoord/txn_interceptor_committer.go
+++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_committer.go
@@ -89,12 +89,25 @@ var parallelCommitsEnabled = settings.RegisterBoolSetting(
 // satisfied and the transaction is still in-progress (and could still be
 // committed or aborted at a later time). There are a number of reasons why
 // some of the requests in the final batch may have failed:
-// - intent writes: these requests may fail to write an intent due to a logical
-// error like a ConditionFailedError. They also could have succeeded at writing
-// an intent but failed to write it at the desired timestamp because they ran
-// into the timestamp cache or another committed value. In the first case, the
-// txnCommitter will receive an error. In the second, it will generate one in
-// needTxnRetryAfterStaging.
+//
+// - intent writes (error): these requests may fail to write an intent due to
+// a logical error like a ConditionFailedError during evaluation. In these
+// cases, the txnCommitter will receive an error and can conclude that the
+// intent was never written and so the implicit commit condition is not
+// satisfied. The error is returned to the client.
+//
+// - intent writes (successful but pushed): these requests may also succeed at
+// writing an intent but fail to write it at the desired (staging) timestamp
+// because they run into the timestamp cache or another committed value
+// (forms of contention). In these cases, the txnCommitter will receive a
+// successful response, but can determine (isTxnCommitImplicit) that the
+// transaction does not satisfy the implicit commit condition because one or
+// more of its intents are written with a timestamp higher than the staging
+// transaction record's. It will retry the transaction commit by re-issuing
+// the EndTxn request (retryTxnCommitAfterFailedParallelCommit) to attempt
+// to move the transaction directly to the explicitly committed state. This
+// retry is called a "parallel commit auto-retry".
+//
 // - query intents: these requests may fail because they discover that one of the
 // previously issued writes has failed; either because it never left an intent
 // or because it left one at too high of a timestamp. In this case, the request
@@ -102,6 +115,7 @@ var parallelCommitsEnabled = settings.RegisterBoolSetting(
 // set. It will also prevent the write from ever succeeding in the future, which
 // ensures that the transaction will never suddenly become implicitly committed
 // at a later point due to the write eventually succeeding (e.g. after a replay).
+//
 // - end txn: this request may fail with a TransactionRetryError for any number of
 // reasons, such as if the transaction's provisional commit timestamp has been
 // pushed past its read timestamp. In all of these cases, an error will be
@@ -117,6 +131,7 @@ type txnCommitter struct {
 	st *cluster.Settings
 	stopper *stop.Stopper
 	wrapped lockedSender
+	metrics *TxnMetrics
 	mu sync.Locker
 }
 
@@ -142,17 +157,11 @@ func (tc *txnCommitter) SendLocked(
 		return tc.sendLockedWithElidedEndTxn(ctx, ba, et)
 	}
 
-	// Assign the transaction's key to the Request's header if it isn't already
-	// set. This is the only place where EndTxnRequest.Key is assigned, but we
-	// could be dealing with a re-issued batch after a refresh. Remember, the
-	// committer is below the span refresh on the interceptor stack.
-	if et.Key == nil {
-		et.Key = ba.Txn.Key
-	} else if len(et.InFlightWrites) > 0 {
-		// Parallel commits should be disabled on retries by splitEndTxnAndRetrySend.
-		return nil, kvpb.NewError(errors.AssertionFailedf(
-			"re-issued EndTxn request with InFlightWrites: %v", et))
+	// Assign the transaction's key to the Request's header.
+	if et.Key != nil {
+		return nil, kvpb.NewError(errors.AssertionFailedf("client must not assign Key to EndTxn"))
 	}
+	et.Key = ba.Txn.Key
 
 	// Determine whether the commit request can be run in parallel with the rest
 	// of the requests in the batch. If not, move the in-flight writes currently
@@ -212,17 +221,24 @@ func (tc *txnCommitter) SendLocked(
 		return nil, kvpb.NewErrorf("unexpected response status without error: %v", br.Txn)
 	}
 
-	// Determine whether the transaction needs to either retry or refresh. When
-	// the EndTxn request evaluated while STAGING the transaction record, it
-	// performed this check. However, the transaction proto may have changed due
-	// to writes evaluated concurrently with the EndTxn even if none of those
-	// writes returned an error. Remember that the transaction proto we see here
-	// could be a combination of protos from responses, all merged by
-	// DistSender.
-	if pErr := needTxnRetryAfterStaging(br); pErr != nil {
-		log.VEventf(ctx, 2, "parallel commit failed since some writes were pushed. "+
-			"Synthesized err: %s", pErr)
-		return nil, pErr
+	// Determine whether the transaction satisfies the implicit commit condition.
+	// If not, it needs to retry the EndTxn request, and possibly also refresh if
+	// it is serializable.
+	implicitCommit, err := isTxnCommitImplicit(br)
+	if err != nil {
+		return nil, kvpb.NewError(err)
+	}
+
+	// Retry the EndTxn request (and nothing else) if the transaction does not
+	// satisfy the implicit commit condition. This EndTxn request will not be
+	// in-flight concurrently with any other writes (they all succeeded), so it
+	// will move the transaction record directly to the COMMITTED state.
+	//
+	// Note that we leave the transaction record that we wrote in the STAGING
+	// state, which is not ideal. But as long as we continue heartbeating the
+	// txn record, it being PENDING or STAGING does not make a difference.
+	if !implicitCommit {
+		return tc.retryTxnCommitAfterFailedParallelCommit(ctx, ba, br)
 	}
 
 	// If the transaction doesn't need to retry then it is implicitly committed!
@@ -384,42 +400,71 @@ func mergeIntoSpans(s []roachpb.Span, ws []roachpb.SequencedWrite) ([]roachpb.Sp
 	return roachpb.MergeSpans(&m)
 }
 
-// needTxnRetryAfterStaging determines whether the transaction needs to refresh
-// (see txnSpanRefresher) or retry based on the batch response of a parallel
-// commit attempt.
-func needTxnRetryAfterStaging(br *kvpb.BatchResponse) *kvpb.Error {
+// isTxnCommitImplicit determines whether the transaction has satisfied the
+// implicit commit requirements. It is used to determine whether the transaction
+// needs to retry its EndTxn based on the response to a parallel commit attempt.
+func isTxnCommitImplicit(br *kvpb.BatchResponse) (bool, error) {
 	if len(br.Responses) == 0 {
-		return kvpb.NewErrorf("no responses in BatchResponse: %v", br)
+		return false, errors.AssertionFailedf("no responses in BatchResponse: %v", br)
 	}
 	lastResp := br.Responses[len(br.Responses)-1].GetInner()
 	etResp, ok := lastResp.(*kvpb.EndTxnResponse)
 	if !ok {
-		return kvpb.NewErrorf("unexpected response in BatchResponse: %v", lastResp)
+		return false, errors.AssertionFailedf("unexpected response in BatchResponse: %v", lastResp)
 	}
 	if etResp.StagingTimestamp.IsEmpty() {
-		return kvpb.NewErrorf("empty StagingTimestamp in EndTxnResponse: %v", etResp)
+		return false, errors.AssertionFailedf("empty StagingTimestamp in EndTxnResponse: %v", etResp)
 	}
-	if etResp.StagingTimestamp.Less(br.Txn.WriteTimestamp) {
-		// If the timestamp that the transaction record was staged at
-		// is less than the timestamp of the transaction in the batch
-		// response then one of the concurrent writes was pushed to
-		// a higher timestamp. This violates the "implicit commit"
-		// condition and neither the transaction coordinator nor any
-		// other concurrent actor will consider this transaction to
-		// be committed as is.
-		// Note that we leave the transaction record that we wrote in the STAGING
-		// state, which is not ideal. But as long as we continue heartbeating the
-		// txn record, it being PENDING or STAGING does not make a difference.
-		reason := kvpb.RETRY_SERIALIZABLE
-		if br.Txn.WriteTooOld {
-			reason = kvpb.RETRY_WRITE_TOO_OLD
-		}
-		err := kvpb.NewTransactionRetryError(
-			reason, "serializability failure concurrent with STAGING")
-		txn := cloneWithStatus(br.Txn, roachpb.PENDING)
-		return kvpb.NewErrorWithTxn(err, txn)
+	// If the timestamp that the transaction record was staged at is less than
+	// the timestamp of the transaction in the batch response then one of the
+	// concurrent writes was pushed to a higher timestamp. This violates the
+	// "implicit commit" condition and neither the transaction coordinator nor
+	// any other concurrent actor will consider this transaction to be committed
+	// as is.
+	failed := etResp.StagingTimestamp.Less(br.Txn.WriteTimestamp)
+	return !failed, nil
+}
+
+// retryTxnCommitAfterFailedParallelCommit retries the batch's EndTxn request
+// after the batch has previously succeeded (with the response br), but failed
+// to qualify for the implicit commit condition. This EndTxn request will not be
+// in-flight concurrently with any other writes (they all succeeded), so it will
+// move the transaction record directly to the COMMITTED state.
+//
+// If successful, the response for the re-issued EndTxn request is stitched back
+// together with the rest of the BatchResponse and returned.
+func (tc *txnCommitter) retryTxnCommitAfterFailedParallelCommit(
+	ctx context.Context, ba *kvpb.BatchRequest, br *kvpb.BatchResponse,
+) (*kvpb.BatchResponse, *kvpb.Error) {
+	log.Eventf(ctx, "parallel commit failed; retrying EndTxn request")
+	tc.metrics.ParallelCommitAutoRetries.Inc(1)
+
+	// Issue a batch containing only the EndTxn request.
+	etIdx := len(ba.Requests) - 1
+	baSuffix := ba.ShallowCopy()
+	baSuffix.Requests = ba.Requests[etIdx:]
+	baSuffix.Txn = cloneWithStatus(br.Txn, roachpb.PENDING)
+	// Update the EndTxn request to move the in-flight writes currently attached
+	// to the EndTxn request to the LockSpans and clear the in-flight write set;
+	// the writes already succeeded and will not be in-flight concurrently with
+	// the EndTxn request.
+	{
+		et := baSuffix.Requests[0].GetEndTxn().ShallowCopy().(*kvpb.EndTxnRequest)
+		et.LockSpans, _ = mergeIntoSpans(et.LockSpans, et.InFlightWrites)
+		et.InFlightWrites = nil
+		baSuffix.Requests[0].Value.(*kvpb.RequestUnion_EndTxn).EndTxn = et
 	}
-	return nil
+	brSuffix, pErr := tc.wrapped.SendLocked(ctx, baSuffix)
+	if pErr != nil {
+		return nil, pErr
+	}
+
+	// Combine the responses.
+	br.Responses[etIdx] = kvpb.ResponseUnion{}
+	if err := br.Combine(ctx, brSuffix, []int{etIdx}, ba); err != nil {
+		return nil, kvpb.NewError(err)
+	}
+	return br, nil
 }
 
 // makeTxnCommitExplicitAsync launches an async task that attempts to move the
diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_committer_test.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_committer_test.go
index a8c8585f9e23..eb58244b724b 100644
--- a/pkg/kv/kvclient/kvcoord/txn_interceptor_committer_test.go
+++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_committer_test.go
@@ -20,6 +20,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/testutils"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/cockroach/pkg/util/metric"
 	"github.com/cockroachdb/cockroach/pkg/util/stop"
 	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
 	"github.com/stretchr/testify/require"
@@ -27,10 +28,12 @@ import (
 
 func makeMockTxnCommitter() (txnCommitter, *mockLockedSender) {
 	mockSender := &mockLockedSender{}
+	metrics := MakeTxnMetrics(metric.TestSampleInterval)
 	return txnCommitter{
 		st: cluster.MakeTestingClusterSettings(),
 		stopper: stop.NewStopper(),
 		wrapped: mockSender,
+		metrics: &metrics,
 		mu: new(syncutil.Mutex),
 	}, mockSender
 }
@@ -402,61 +405,78 @@ func TestTxnCommitterAsyncExplicitCommitTask(t *testing.T) {
 	<-explicitCommitCh
 }
 
-// TestTxnCommitterRetryAfterStaging verifies that txnCommitter returns a retry
-// error when a write performed in parallel with staging a transaction is pushed
-// to a timestamp above the staging timestamp.
+// TestTxnCommitterRetryAfterStaging verifies that txnCommitter performs a
+// parallel commit auto-retry when a write performed in parallel with staging a
+// transaction is pushed to a timestamp above the staging timestamp.
 func TestTxnCommitterRetryAfterStaging(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 	ctx := context.Background()
-	testutils.RunTrueAndFalse(t, "WriteTooOld", func(t *testing.T, writeTooOld bool) {
-		tc, mockSender := makeMockTxnCommitter()
-		defer tc.stopper.Stop(ctx)
+	tc, mockSender := makeMockTxnCommitter()
+	defer tc.stopper.Stop(ctx)
 
-		txn := makeTxnProto()
-		keyA := roachpb.Key("a")
+	txn := makeTxnProto()
+	keyA := roachpb.Key("a")
 
-		ba := &kvpb.BatchRequest{}
-		ba.Header = kvpb.Header{Txn: &txn}
-		putArgs := kvpb.PutRequest{RequestHeader: kvpb.RequestHeader{Key: keyA}}
-		etArgs := kvpb.EndTxnRequest{Commit: true}
-		putArgs.Sequence = 1
-		etArgs.Sequence = 2
-		etArgs.InFlightWrites = []roachpb.SequencedWrite{{Key: keyA, Sequence: 1}}
-		ba.Add(&putArgs, &etArgs)
+	ba := &kvpb.BatchRequest{}
+	ba.Header = kvpb.Header{Txn: &txn}
+	putArgs := kvpb.PutRequest{RequestHeader: kvpb.RequestHeader{Key: keyA}}
+	etArgs := kvpb.EndTxnRequest{Commit: true}
+	putArgs.Sequence = 1
+	etArgs.Sequence = 2
+	etArgs.InFlightWrites = []roachpb.SequencedWrite{{Key: keyA, Sequence: 1}}
+	ba.Add(&putArgs, &etArgs)
 
-		mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
-			require.Len(t, ba.Requests, 2)
-			require.IsType(t, &kvpb.PutRequest{}, ba.Requests[0].GetInner())
-			require.IsType(t, &kvpb.EndTxnRequest{}, ba.Requests[1].GetInner())
+	onFirstSend := func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
+		require.Len(t, ba.Requests, 2)
+		require.Equal(t, roachpb.PENDING, ba.Txn.Status)
+		require.IsType(t, &kvpb.PutRequest{}, ba.Requests[0].GetInner())
+		require.IsType(t, &kvpb.EndTxnRequest{}, ba.Requests[1].GetInner())
 
-			et := ba.Requests[1].GetInner().(*kvpb.EndTxnRequest)
-			require.True(t, et.Commit)
-			require.Len(t, et.InFlightWrites, 1)
-			require.Equal(t, roachpb.SequencedWrite{Key: keyA, Sequence: 1}, et.InFlightWrites[0])
+		et := ba.Requests[1].GetEndTxn()
+		require.True(t, et.Commit)
+		require.Nil(t, et.LockSpans)
+		require.Len(t, et.InFlightWrites, 1)
+		require.Equal(t, roachpb.SequencedWrite{Key: keyA, Sequence: 1}, et.InFlightWrites[0])
 
-			br := ba.CreateReply()
-			br.Txn = ba.Txn
-			br.Txn.Status = roachpb.STAGING
-			br.Responses[1].GetInner().(*kvpb.EndTxnResponse).StagingTimestamp = br.Txn.WriteTimestamp
-
-			// Pretend the PutRequest was split and sent to a different Range. It
-			// could hit the timestamp cache, or a WriteTooOld error (which sets the
-			// WriteTooOld flag). The intent will be written but the response
-			// transaction's timestamp will be larger than the staging timestamp.
-			br.Txn.WriteTooOld = writeTooOld
-			br.Txn.WriteTimestamp = br.Txn.WriteTimestamp.Add(1, 0)
-			return br, nil
-		})
+		br := ba.CreateReply()
+		br.Txn = ba.Txn.Clone()
+		br.Txn.Status = roachpb.STAGING
+		br.Responses[1].GetEndTxn().StagingTimestamp = br.Txn.WriteTimestamp
+
+		// Pretend the PutRequest was split and sent to a different Range. It
+		// could hit the timestamp cache or a WriteTooOld error (which sets the
+		// WriteTooOld flag which is stripped by the txnSpanRefresher). The intent
+		// will be written but the response transaction's timestamp will be larger
+		// than the staging timestamp.
+		br.Txn.WriteTimestamp = br.Txn.WriteTimestamp.Add(1, 0)
+		return br, nil
+	}
+	sawRetry := false
+	onRetry := func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
+		sawRetry = true
+		require.Len(t, ba.Requests, 1)
+		require.Equal(t, roachpb.PENDING, ba.Txn.Status)
+		require.IsType(t, &kvpb.EndTxnRequest{}, ba.Requests[0].GetInner())
 
-		br, pErr := tc.SendLocked(ctx, ba)
-		require.Nil(t, br)
-		require.NotNil(t, pErr)
-		require.IsType(t, &kvpb.TransactionRetryError{}, pErr.GetDetail())
-		expReason := kvpb.RETRY_SERIALIZABLE
-		if writeTooOld {
-			expReason = kvpb.RETRY_WRITE_TOO_OLD
-		}
-		require.Equal(t, expReason, pErr.GetDetail().(*kvpb.TransactionRetryError).Reason)
-	})
+		et := ba.Requests[0].GetEndTxn()
+		require.True(t, et.Commit)
+		require.Nil(t, et.InFlightWrites)
+		require.Len(t, et.LockSpans, 1)
+		require.Equal(t, roachpb.Span{Key: keyA}, et.LockSpans[0])
+
+		br := ba.CreateReply()
+		br.Txn = ba.Txn.Clone()
+		br.Txn.Status = roachpb.COMMITTED
+		return br, nil
+	}
+	mockSender.ChainMockSend(onFirstSend, onRetry)
+
+	br, pErr := tc.SendLocked(ctx, ba)
+	require.Nil(t, pErr)
+	require.NotNil(t, br)
+	require.NotNil(t, br.Txn)
+	require.Equal(t, roachpb.COMMITTED, br.Txn.Status)
+	require.True(t, sawRetry)
+	require.Equal(t, int64(1), tc.metrics.ParallelCommitAutoRetries.Count())
 }
diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go
index c4e5bdc94b29..0b01acf3e683 100644
--- a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go
+++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go
@@ -23,6 +23,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
 	"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/errors"
 	"github.com/cockroachdb/redact"
 	"github.com/google/btree"
 )
@@ -363,10 +364,10 @@ func (tp *txnPipeliner) attachLocksToEndTxn(
 	}
 	et := args.(*kvpb.EndTxnRequest)
 	if len(et.LockSpans) > 0 {
-		return ba, kvpb.NewErrorf("client must not pass intents to EndTxn")
+		return ba, kvpb.NewError(errors.AssertionFailedf("client must not pass intents to EndTxn"))
 	}
 	if len(et.InFlightWrites) > 0 {
-		return ba, kvpb.NewErrorf("client must not pass in-flight writes to EndTxn")
+		return ba, kvpb.NewError(errors.AssertionFailedf("client must not pass in-flight writes to EndTxn"))
 	}
 
 	// Populate et.LockSpans and et.InFlightWrites.
diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go
index c7622c968f58..54248d6ddcf2 100644
--- a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go
+++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go
@@ -300,6 +300,18 @@ func (sr *txnSpanRefresher) maybeRefreshAndRetrySend(
 	refreshFrom := txn.ReadTimestamp
 	refreshToTxn := txn.Clone()
 	refreshToTxn.Refresh(refreshTS)
+	switch refreshToTxn.Status {
+	case roachpb.PENDING:
+	case roachpb.STAGING:
+		// If the batch resulted in an error but the EndTxn request succeeded,
+		// staging the transaction record in the process, downgrade the status
+		// back to PENDING. Even though the transaction record may have a status
+		// of STAGING, we know that the transaction failed to implicitly commit.
+		refreshToTxn.Status = roachpb.PENDING
+	default:
+		return nil, kvpb.NewError(errors.AssertionFailedf(
+			"unexpected txn status during refresh: %v", refreshToTxn))
+	}
 	log.VEventf(ctx, 2, "trying to refresh to %s because of %s",
 		refreshToTxn.ReadTimestamp, pErr)
 
diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go
index cd570842d7e6..08e5b231d68e 100644
--- a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go
+++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go
@@ -350,6 +350,102 @@ func TestTxnSpanRefresherRefreshesTransactions(t *testing.T) {
 	}
 }
 
+// TestTxnSpanRefresherDowngradesStagingTxnStatus tests that the txnSpanRefresher
+// tolerates retry errors with a STAGING transaction status. In such cases, it
+// will downgrade the status to PENDING before refreshing and retrying, because
+// the presence of an error proves that the transaction failed to implicitly
+// commit.
+func TestTxnSpanRefresherDowngradesStagingTxnStatus(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+	ctx := context.Background()
+	tsr, mockSender := makeMockTxnSpanRefresher()
+
+	txn := makeTxnProto()
+	keyA, keyB := roachpb.Key("a"), roachpb.Key("b")
+	conflictTs := txn.WriteTimestamp.Add(15, 0)
+
+	// Collect some refresh spans.
+	ba := &kvpb.BatchRequest{}
+	ba.Header = kvpb.Header{Txn: &txn}
+	scanArgs := kvpb.ScanRequest{RequestHeader: kvpb.RequestHeader{Key: keyA, EndKey: keyB}}
+	ba.Add(&scanArgs)
+
+	br, pErr := tsr.SendLocked(ctx, ba)
+	require.Nil(t, pErr)
+	require.NotNil(t, br)
+	require.Equal(t, []roachpb.Span{scanArgs.Span()}, tsr.refreshFootprint.asSlice())
+	require.False(t, tsr.refreshInvalid)
+	require.Zero(t, tsr.refreshedTimestamp)
+
+	// Hook up a chain of mocking functions.
+	onPutAndEndTxn := func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
+		require.Len(t, ba.Requests, 2)
+		require.IsType(t, &kvpb.PutRequest{}, ba.Requests[0].GetInner())
+		require.IsType(t, &kvpb.EndTxnRequest{}, ba.Requests[1].GetInner())
+		require.True(t, ba.Requests[1].GetEndTxn().IsParallelCommit())
+
+		// Return a write-too-old error with a STAGING status, emulating a
+		// successful EndTxn request and a failed Put request. This mixed success
+		// state is possible if the requests were split across ranges.
+		pErrTxn := ba.Txn.Clone()
+		pErrTxn.Status = roachpb.STAGING
+		pErr := &kvpb.WriteTooOldError{ActualTimestamp: conflictTs}
+		return nil, kvpb.NewErrorWithTxn(pErr, pErrTxn)
+	}
+	onRefresh := func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
+		require.Equal(t, roachpb.PENDING, ba.Txn.Status) // downgraded
+		require.Len(t, ba.Requests, 1)
+		require.Equal(t, conflictTs, ba.Txn.ReadTimestamp)
+		require.IsType(t, &kvpb.RefreshRangeRequest{}, ba.Requests[0].GetInner())
+
+		refReq := ba.Requests[0].GetRefreshRange()
+		require.Equal(t, scanArgs.Span(), refReq.Span())
+		require.Equal(t, txn.ReadTimestamp, refReq.RefreshFrom)
+
+		br = ba.CreateReply()
+		br.Txn = ba.Txn.Clone()
+		return br, nil
+	}
+	onPut := func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
+		require.Equal(t, roachpb.PENDING, ba.Txn.Status) // downgraded
+		require.Len(t, ba.Requests, 1)
+		require.IsType(t, &kvpb.PutRequest{}, ba.Requests[0].GetInner())
+
+		br = ba.CreateReply()
+		br.Txn = ba.Txn.Clone()
+		return br, nil
+	}
+	onEndTxn := func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
+		require.Equal(t, roachpb.PENDING, ba.Txn.Status) // downgraded
+		require.Len(t, ba.Requests, 1)
+		require.IsType(t, &kvpb.EndTxnRequest{}, ba.Requests[0].GetInner())
+		require.False(t, ba.Requests[0].GetEndTxn().IsParallelCommit())
+
+		br = ba.CreateReply()
+		br.Txn = ba.Txn.Clone()
+		br.Txn.Status = roachpb.COMMITTED
+		return br, nil
+	}
+	mockSender.ChainMockSend(onPutAndEndTxn, onRefresh, onPut, onEndTxn)
+
+	// Send a request that will hit a write-too-old error while also returning a
+	// STAGING transaction status.
+	ba.Requests = nil
+	putArgs := kvpb.PutRequest{RequestHeader: kvpb.RequestHeader{Key: keyB}}
+	etArgs := kvpb.EndTxnRequest{Commit: true}
+	putArgs.Sequence = 1
+	etArgs.Sequence = 2
+	etArgs.InFlightWrites = []roachpb.SequencedWrite{{Key: keyB, Sequence: 1}}
+	ba.Add(&putArgs, &etArgs)
+
+	br, pErr = tsr.SendLocked(ctx, ba)
+	require.Nil(t, pErr)
+	require.NotNil(t, br)
+	require.NotNil(t, br.Txn)
+	require.Equal(t, roachpb.COMMITTED, br.Txn.Status)
+}
+
 // TestTxnSpanRefresherMaxRefreshAttempts tests that the txnSpanRefresher
 // attempts some number of retries before giving up and passing retryable
 // errors back up the stack.
diff --git a/pkg/kv/kvclient/kvcoord/txn_metrics.go b/pkg/kv/kvclient/kvcoord/txn_metrics.go
index ac3c8f2fcd15..d5f2ff42e434 100644
--- a/pkg/kv/kvclient/kvcoord/txn_metrics.go
+++ b/pkg/kv/kvclient/kvcoord/txn_metrics.go
@@ -19,11 +19,12 @@ import (
 
 // TxnMetrics holds all metrics relating to KV transactions.
 type TxnMetrics struct {
-	Aborts *metric.Counter
-	Commits *metric.Counter
-	Commits1PC *metric.Counter // Commits which finished in a single phase
-	ParallelCommits *metric.Counter // Commits which entered the STAGING state
-	CommitWaits *metric.Counter // Commits that waited for linearizability
+	Aborts *metric.Counter
+	Commits *metric.Counter
+	Commits1PC *metric.Counter // Commits which finished in a single phase
+	ParallelCommits *metric.Counter // Commits which entered the STAGING state
+	ParallelCommitAutoRetries *metric.Counter // Commits which were retried after entering the STAGING state
+	CommitWaits *metric.Counter // Commits that waited for linearizability
 
 	ClientRefreshSuccess *metric.Counter
 	ClientRefreshFail *metric.Counter
@@ -82,6 +83,12 @@ var (
 		Measurement: "KV Transactions",
 		Unit: metric.Unit_COUNT,
 	}
+	metaParallelCommitAutoRetries = metric.Metadata{
+		Name: "txn.parallelcommits.auto_retries",
+		Help: "Number of commit tries after failed parallel commit attempts",
+		Measurement: "Retries",
+		Unit: metric.Unit_COUNT,
+	}
 	metaCommitWaitCount = metric.Metadata{
 		Name: "txn.commit_waits",
 		Help: "Number of KV transactions that had to commit-wait on commit " +
@@ -275,6 +282,7 @@ func MakeTxnMetrics(histogramWindow time.Duration) TxnMetrics {
 		Commits: metric.NewCounter(metaCommitsRates),
 		Commits1PC: metric.NewCounter(metaCommits1PCRates),
 		ParallelCommits: metric.NewCounter(metaParallelCommitsRates),
+		ParallelCommitAutoRetries: metric.NewCounter(metaParallelCommitAutoRetries),
 		CommitWaits: metric.NewCounter(metaCommitWaitCount),
 		ClientRefreshSuccess: metric.NewCounter(metaClientRefreshSuccess),
 		ClientRefreshFail: metric.NewCounter(metaClientRefreshFail),
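For readers skimming the patch, the core decision the committer now makes after a parallel commit attempt can be summarized in a short, self-contained Go sketch. This is only an illustration under simplified, hypothetical types (timestamp, isImplicitlyCommitted are invented here, not part of the patch); the authoritative logic is isTxnCommitImplicit and retryTxnCommitAfterFailedParallelCommit in txn_interceptor_committer.go above.

// Sketch only: simplified stand-ins for the kvpb/roachpb types used above.
package main

import "fmt"

type timestamp struct{ wallNanos int64 }

func (t timestamp) Less(o timestamp) bool { return t.wallNanos < o.wallNanos }

// isImplicitlyCommitted mirrors isTxnCommitImplicit: a parallel commit only
// qualifies for the implicit commit condition if no concurrent write was
// pushed above the timestamp at which the transaction record was staged.
func isImplicitlyCommitted(stagingTS, txnWriteTS timestamp) bool {
	return !stagingTS.Less(txnWriteTS)
}

func main() {
	stagingTS := timestamp{wallNanos: 100}
	pushedWriteTS := timestamp{wallNanos: 150} // a concurrent write was pushed

	if isImplicitlyCommitted(stagingTS, pushedWriteTS) {
		fmt.Println("implicitly committed; make the commit explicit asynchronously")
	} else {
		// New behavior in this patch: re-issue only the EndTxn (a "parallel
		// commit auto-retry") instead of returning a retry error to the client.
		fmt.Println("not implicitly committed; re-issue EndTxn to commit explicitly")
	}
}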