Skip to content

Commit

Permalink
kv: disallow parallel commit of batch with read-only requests
Browse files Browse the repository at this point in the history
Fixes cockroachdb#44315.

This commit fixes an issue where KV batches with read-only requests were allowed
to be issued as parallel commits. This should not have been allowed because the
outcome of these read-only requests, notably Get and Scan requests, is not taken
into consideration by the status resolution process for STAGING transactions.

This would have caused serious atomicity issues if parallel commit batches
(non-1PC committing batches) were issued with read-only requests by users of the
KV API. Luckily, this does not happen in practice because we never issue
batches like that, so there is no concern about production clusters hitting this
bug. Still, this is a very good catch: it was a serious footgun, and fixing it
hardens the KV API. We should also still backport this to v19.2 just to be
safe.

Release note: None
  • Loading branch information
nvanbenschoten committed Feb 4, 2020
1 parent 7362d0c commit ede3096
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 15 deletions.
56 changes: 41 additions & 15 deletions pkg/kv/txn_interceptor_committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,12 +142,12 @@ func (tc *txnCommitter) SendLocked(
et.Key = ba.Txn.Key
}

// Determine whether the commit can be run in parallel with the rest of the
// writes in the batch. If not, move the in-flight writes currently attached
// to the EndTransaction request to the IntentSpans and clear the in-flight
// write set; no writes will be in-flight concurrently with the EndTransaction
// Determine whether the commit request can be run in parallel with the rest
// of the requests in the batch. If not, move the in-flight writes currently
// attached to the EndTxn request to the IntentSpans and clear the in-flight
// write set; no writes will be in-flight concurrently with the EndTxn
// request.
if len(et.InFlightWrites) > 0 && !tc.canCommitInParallelWithWrites(ba, et) {
if len(et.InFlightWrites) > 0 && !tc.canCommitInParallel(ba, et) {
// NB: when parallel commits is disabled, this is the best place to
// detect whether the batch has only distinct spans. We can set this
// flag based on whether any of the previously declared in-flight writes
Expand Down Expand Up @@ -191,8 +191,8 @@ func (tc *txnCommitter) SendLocked(
// all in-flight writes will succeed with the EndTransaction and can
// decide to skip the STAGING state.
//
// This is also possible if we never attached any in-flight writes to the
// EndTransaction request, either because canCommitInParallelWithWrites
// This is also possible if we never attached any in-flight writes to
// the EndTransaction request, either because canCommitInParallel
// returned false or because there were no unproven in-flight writes
// (see txnPipeliner) and there were no writes in the batch request.
return br, nil
Expand Down Expand Up @@ -277,9 +277,11 @@ func (tc *txnCommitter) sendLockedWithElidedEndTransaction(
return br, nil
}

// canCommitInParallelWithWrites determines whether the batch can issue its
// committing EndTransaction in parallel with other in-flight writes.
func (tc *txnCommitter) canCommitInParallelWithWrites(
// canCommitInParallel determines whether the batch can issue its committing
// EndTxn in parallel with the rest of its requests and with any in-flight
// writes, which all should have corresponding QueryIntent requests in the
// batch.
func (tc *txnCommitter) canCommitInParallel(
ba roachpb.BatchRequest, et *roachpb.EndTransactionRequest,
) bool {
if !tc.st.Version.IsActive(cluster.VersionParallelCommits) {
Expand All @@ -301,13 +303,37 @@ func (tc *txnCommitter) canCommitInParallelWithWrites(
return false
}

// Similar to how we can't pipeline ranged writes, we also can't commit in
// parallel with them. The reason for this is that the status resolution
// process for STAGING transactions wouldn't know where to look for the
// intents.
// Check whether every request in the batch is compatible with a parallel
// commit. If any are incompatible then we cannot perform a parallel commit.
// We ignore the last request in the slice because we know it is the EndTxn.
for _, ru := range ba.Requests[:len(ba.Requests)-1] {
req := ru.GetInner()
if roachpb.IsTransactionWrite(req) && roachpb.IsRange(req) {
switch {
case roachpb.IsTransactionWrite(req):
if roachpb.IsRange(req) {
// Similar to how we can't pipeline ranged writes, we also can't
// commit in parallel with them. The reason for this is that the
// status resolution process for STAGING transactions wouldn't
// know where to look for the corresponding intents.
return false
}
// All other point writes are included in the EndTxn request's
// InFlightWrites set and are visible to the status resolution
// process for STAGING transactions. Populating InFlightWrites
// has already been done by the txnPipeliner.

case req.Method() == roachpb.QueryIntent:
// QueryIntent requests are compatible with parallel commits. The
// intents being queried are also attached to the EndTxn request's
// InFlightWrites set and are visible to the status resolution
// process for STAGING transactions. Populating InFlightWrites has
// already been done by the txnPipeliner.

default:
// All other request types, notably Get and Scan requests, are
// incompatible with parallel commits because their outcome is
// not taken into consideration by the status resolution process
// for STAGING transactions.
return false
}
}
Expand Down
29 changes: 29 additions & 0 deletions pkg/kv/txn_interceptor_committer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,35 @@ func TestTxnCommitterStripsInFlightWrites(t *testing.T) {
br, pErr = tc.SendLocked(ctx, ba)
require.Nil(t, pErr)
require.NotNil(t, br)

// Send the same batch but with a point read instead of a point write.
// In-flight writes should not be attached because read-only requests
// cannot be parallelized with a commit.
ba.Requests = nil
getArgs := roachpb.GetRequest{RequestHeader: roachpb.RequestHeader{Key: keyA}}
getArgs.Sequence = 2
etArgsCopy = etArgs
ba.Add(&getArgs, &qiArgs, &etArgsCopy)

mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
require.Len(t, ba.Requests, 3)
require.IsType(t, &roachpb.EndTransactionRequest{}, ba.Requests[2].GetInner())

et := ba.Requests[2].GetInner().(*roachpb.EndTransactionRequest)
require.True(t, et.Commit)
require.Len(t, et.IntentSpans, 2)
require.Equal(t, []roachpb.Span{{Key: keyA}, {Key: keyB}}, et.IntentSpans)
require.Len(t, et.InFlightWrites, 0)

br = ba.CreateReply()
br.Txn = ba.Txn
br.Txn.Status = roachpb.COMMITTED
return br, nil
})

br, pErr = tc.SendLocked(ctx, ba)
require.Nil(t, pErr)
require.NotNil(t, br)
}

// TestTxnCommitterAsyncExplicitCommitTask verifies that when txnCommitter
Expand Down

0 comments on commit ede3096

Please sign in to comment.