diff --git a/pkg/kv/txn_interceptor_committer.go b/pkg/kv/txn_interceptor_committer.go index 53e18bcc877c..167ef96523d2 100644 --- a/pkg/kv/txn_interceptor_committer.go +++ b/pkg/kv/txn_interceptor_committer.go @@ -142,11 +142,12 @@ func (tc *txnCommitter) SendLocked( et.Key = ba.Txn.Key } - // Determine whether the commit can be run in parallel with the rest of the - // writes in the batch. If not, move the in-flight writes currently attached - // to the EndTxn request to the IntentSpans and clear the in-flight write - // set; no writes will be in-flight concurrently with the EndTxn request. - if len(et.InFlightWrites) > 0 && !tc.canCommitInParallelWithWrites(ctx, ba, et) { + // Determine whether the commit request can be run in parallel with the rest + // of the requests in the batch. If not, move the in-flight writes currently + // attached to the EndTxn request to the IntentSpans and clear the in-flight + // write set; no writes will be in-flight concurrently with the EndTxn + // request. + if len(et.InFlightWrites) > 0 && !tc.canCommitInParallel(ctx, ba, et) { // NB: when parallel commits is disabled, this is the best place to // detect whether the batch has only distinct spans. We can set this // flag based on whether any of previously declared in-flight writes @@ -191,9 +192,9 @@ func (tc *txnCommitter) SendLocked( // the STAGING state. // // This is also possible if we never attached any in-flight writes to - // the EndTxn request, either because canCommitInParallelWithWrites - // returned false or because there were no unproven in-flight writes - // (see txnPipeliner) and there were no writes in the batch request. + // the EndTxn request, either because canCommitInParallel returned false + // or because there were no unproven in-flight writes (see txnPipeliner) + // and there were no writes in the batch request. return br, nil default: return nil, roachpb.NewErrorf("unexpected response status without error: %v", br.Txn) @@ -273,9 +274,11 @@ func (tc *txnCommitter) sendLockedWithElidedEndTxn( return br, nil } -// canCommitInParallelWithWrites determines whether the batch can issue its -// committing EndTxn in parallel with other in-flight writes. -func (tc *txnCommitter) canCommitInParallelWithWrites( +// canCommitInParallel determines whether the batch can issue its committing +// EndTxn in parallel with the rest of its requests and with any in-flight +// writes, which all should have corresponding QueryIntent requests in the +// batch. +func (tc *txnCommitter) canCommitInParallel( ctx context.Context, ba roachpb.BatchRequest, et *roachpb.EndTxnRequest, ) bool { if !cluster.Version.IsActive(ctx, tc.st, cluster.VersionParallelCommits) { @@ -297,13 +300,37 @@ func (tc *txnCommitter) canCommitInParallelWithWrites( return false } - // Similar to how we can't pipeline ranged writes, we also can't commit in - // parallel with them. The reason for this is that the status resolution - // process for STAGING transactions wouldn't know where to look for the - // intents. + // Check whether every request in the batch is compatable with a parallel + // commit. If any are incompatible then we cannot perform a parallel commit. + // We ignore the last request in the slice because we know it is the EndTxn. for _, ru := range ba.Requests[:len(ba.Requests)-1] { req := ru.GetInner() - if roachpb.IsTransactionWrite(req) && roachpb.IsRange(req) { + switch { + case roachpb.IsTransactionWrite(req): + if roachpb.IsRange(req) { + // Similar to how we can't pipeline ranged writes, we also can't + // commit in parallel with them. The reason for this is that the + // status resolution process for STAGING transactions wouldn't + // know where to look for the corresponding intents. + return false + } + // All other point writes are included in the EndTxn request's + // InFlightWrites set and are visible to the status resolution + // process for STAGING transactions. Populating InFlightWrites + // has already been done by the txnPipeliner. + + case req.Method() == roachpb.QueryIntent: + // QueryIntent requests are compatable with parallel commits. The + // intents being queried are also attached to the EndTxn request's + // InFlightWrites set and are visible to the status resolution + // process for STAGING transactions. Populating InFlightWrites has + // already been done by the txnPipeliner. + + default: + // All other request types, notably Get and Scan requests, are + // incompatible with parallel commits because their outcome is + // not taken into consideration by the status resolution process + // for STAGING transactions. return false } } diff --git a/pkg/kv/txn_interceptor_committer_test.go b/pkg/kv/txn_interceptor_committer_test.go index 19e697959d3d..75df66cf0f17 100644 --- a/pkg/kv/txn_interceptor_committer_test.go +++ b/pkg/kv/txn_interceptor_committer_test.go @@ -295,6 +295,35 @@ func TestTxnCommitterStripsInFlightWrites(t *testing.T) { br, pErr = tc.SendLocked(ctx, ba) require.Nil(t, pErr) require.NotNil(t, br) + + // Send the same batch but with a point read instead of a point write. + // In-flight writes should not be attached because read-only requests + // cannot be parallelized with a commit. + ba.Requests = nil + getArgs := roachpb.GetRequest{RequestHeader: roachpb.RequestHeader{Key: keyA}} + getArgs.Sequence = 2 + etArgsCopy = etArgs + ba.Add(&getArgs, &qiArgs, &etArgsCopy) + + mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { + require.Len(t, ba.Requests, 3) + require.IsType(t, &roachpb.EndTxnRequest{}, ba.Requests[2].GetInner()) + + et := ba.Requests[2].GetInner().(*roachpb.EndTxnRequest) + require.True(t, et.Commit) + require.Len(t, et.IntentSpans, 2) + require.Equal(t, []roachpb.Span{{Key: keyA}, {Key: keyB}}, et.IntentSpans) + require.Len(t, et.InFlightWrites, 0) + + br = ba.CreateReply() + br.Txn = ba.Txn + br.Txn.Status = roachpb.COMMITTED + return br, nil + }) + + br, pErr = tc.SendLocked(ctx, ba) + require.Nil(t, pErr) + require.NotNil(t, br) } // TestTxnCommitterAsyncExplicitCommitTask verifies that when txnCommitter