Skip to content

Commit

Permalink
Merge #44608
Browse files Browse the repository at this point in the history
44608: kv: disallow parallel commit of batch with read-only requests r=nvanbenschoten a=nvanbenschoten

Fixes #44315.

This commit fixes an issue where KV batches with read-only requests were allowed
to be issued as parallel commits. This should not have been allowed because the
outcome of these read-only requests, notably Get and Scan requests, is not taken
into consideration by the status resolution process for STAGING transactions.

This would have caused serious atomicity issues if parallel commit batches
(non-1PC committing batches) were issued with read-only requests by users of the
KV API. Luckily, in practice that wouldn't actually happen as we never issue
batches like that so there's no concern about production clusters hitting this
bug. Still, this is a very good catch as this was a serious footgun and fixing
this hardens the KV API. We should also still backport this to v19.2 just to be
safe.

Release note: None

Co-authored-by: Nathan VanBenschoten <[email protected]>
  • Loading branch information
craig[bot] and nvanbenschoten committed Feb 4, 2020
2 parents 7342f13 + 56a9934 commit 9bf1af0
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 16 deletions.
59 changes: 43 additions & 16 deletions pkg/kv/txn_interceptor_committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,12 @@ func (tc *txnCommitter) SendLocked(
et.Key = ba.Txn.Key
}

// Determine whether the commit can be run in parallel with the rest of the
// writes in the batch. If not, move the in-flight writes currently attached
// to the EndTxn request to the IntentSpans and clear the in-flight write
// set; no writes will be in-flight concurrently with the EndTxn request.
if len(et.InFlightWrites) > 0 && !tc.canCommitInParallelWithWrites(ctx, ba, et) {
// Determine whether the commit request can be run in parallel with the rest
// of the requests in the batch. If not, move the in-flight writes currently
// attached to the EndTxn request to the IntentSpans and clear the in-flight
// write set; no writes will be in-flight concurrently with the EndTxn
// request.
if len(et.InFlightWrites) > 0 && !tc.canCommitInParallel(ctx, ba, et) {
// NB: when parallel commits is disabled, this is the best place to
// detect whether the batch has only distinct spans. We can set this
// flag based on whether any of previously declared in-flight writes
Expand Down Expand Up @@ -191,9 +192,9 @@ func (tc *txnCommitter) SendLocked(
// the STAGING state.
//
// This is also possible if we never attached any in-flight writes to
// the EndTxn request, either because canCommitInParallelWithWrites
// returned false or because there were no unproven in-flight writes
// (see txnPipeliner) and there were no writes in the batch request.
// the EndTxn request, either because canCommitInParallel returned false
// or because there were no unproven in-flight writes (see txnPipeliner)
// and there were no writes in the batch request.
return br, nil
default:
return nil, roachpb.NewErrorf("unexpected response status without error: %v", br.Txn)
Expand Down Expand Up @@ -273,9 +274,11 @@ func (tc *txnCommitter) sendLockedWithElidedEndTxn(
return br, nil
}

// canCommitInParallelWithWrites determines whether the batch can issue its
// committing EndTxn in parallel with other in-flight writes.
func (tc *txnCommitter) canCommitInParallelWithWrites(
// canCommitInParallel determines whether the batch can issue its committing
// EndTxn in parallel with the rest of its requests and with any in-flight
// writes, which all should have corresponding QueryIntent requests in the
// batch.
func (tc *txnCommitter) canCommitInParallel(
ctx context.Context, ba roachpb.BatchRequest, et *roachpb.EndTxnRequest,
) bool {
if !cluster.Version.IsActive(ctx, tc.st, cluster.VersionParallelCommits) {
Expand All @@ -297,13 +300,37 @@ func (tc *txnCommitter) canCommitInParallelWithWrites(
return false
}

// Similar to how we can't pipeline ranged writes, we also can't commit in
// parallel with them. The reason for this is that the status resolution
// process for STAGING transactions wouldn't know where to look for the
// intents.
// Check whether every request in the batch is compatable with a parallel
// commit. If any are incompatible then we cannot perform a parallel commit.
// We ignore the last request in the slice because we know it is the EndTxn.
for _, ru := range ba.Requests[:len(ba.Requests)-1] {
req := ru.GetInner()
if roachpb.IsTransactionWrite(req) && roachpb.IsRange(req) {
switch {
case roachpb.IsTransactionWrite(req):
if roachpb.IsRange(req) {
// Similar to how we can't pipeline ranged writes, we also can't
// commit in parallel with them. The reason for this is that the
// status resolution process for STAGING transactions wouldn't
// know where to look for the corresponding intents.
return false
}
// All other point writes are included in the EndTxn request's
// InFlightWrites set and are visible to the status resolution
// process for STAGING transactions. Populating InFlightWrites
// has already been done by the txnPipeliner.

case req.Method() == roachpb.QueryIntent:
// QueryIntent requests are compatable with parallel commits. The
// intents being queried are also attached to the EndTxn request's
// InFlightWrites set and are visible to the status resolution
// process for STAGING transactions. Populating InFlightWrites has
// already been done by the txnPipeliner.

default:
// All other request types, notably Get and Scan requests, are
// incompatible with parallel commits because their outcome is
// not taken into consideration by the status resolution process
// for STAGING transactions.
return false
}
}
Expand Down
29 changes: 29 additions & 0 deletions pkg/kv/txn_interceptor_committer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,35 @@ func TestTxnCommitterStripsInFlightWrites(t *testing.T) {
br, pErr = tc.SendLocked(ctx, ba)
require.Nil(t, pErr)
require.NotNil(t, br)

// Send the same batch but with a point read instead of a point write.
// In-flight writes should not be attached because read-only requests
// cannot be parallelized with a commit.
ba.Requests = nil
getArgs := roachpb.GetRequest{RequestHeader: roachpb.RequestHeader{Key: keyA}}
getArgs.Sequence = 2
etArgsCopy = etArgs
ba.Add(&getArgs, &qiArgs, &etArgsCopy)

mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
require.Len(t, ba.Requests, 3)
require.IsType(t, &roachpb.EndTxnRequest{}, ba.Requests[2].GetInner())

et := ba.Requests[2].GetInner().(*roachpb.EndTxnRequest)
require.True(t, et.Commit)
require.Len(t, et.IntentSpans, 2)
require.Equal(t, []roachpb.Span{{Key: keyA}, {Key: keyB}}, et.IntentSpans)
require.Len(t, et.InFlightWrites, 0)

br = ba.CreateReply()
br.Txn = ba.Txn
br.Txn.Status = roachpb.COMMITTED
return br, nil
})

br, pErr = tc.SendLocked(ctx, ba)
require.Nil(t, pErr)
require.NotNil(t, br)
}

// TestTxnCommitterAsyncExplicitCommitTask verifies that when txnCommitter
Expand Down

0 comments on commit 9bf1af0

Please sign in to comment.