From 2f0a029fcac9f7017a8fbe9994af7e46865e7250 Mon Sep 17 00:00:00 2001 From: Andrei Matei Date: Tue, 24 Mar 2020 17:45:01 -0400 Subject: [PATCH] kvclient/kvcoord: inhibit parallel commit when retrying EndTxn request The scenario that this patch addresses is the following (from #46431): 1. txn1 sends Put(a) + Put(b) + EndTxn 2. DistSender splits the Put(a) from the rest. 3. Put(a) succeeds, but the rest catches some retriable error. 4. TxnCoordSender gets the retriable error. The fact that a sub-batch succeeded is lost. We used to care about that fact, but we've successively gotten rid of that tracking across #35140 and #44661. 5. we refresh everything that came before this batch. The refresh succeeds. 6. we re-send the batch. It gets split again. The part with the EndTxn executes first. The transaction is now STAGING. More than that, the txn is in fact implicitly committed - the intent on a is already there since the previous attempt and, because it's at a lower timestamp than the txn record, it counts as golden for the purposes of verifying the implicit commit condition. 7. some other transaction wonders in, sees that txn1 is in its way, and transitions it to explicitly committed. 8. the Put(a) now tries to evaluate. It gets really confused. I guess that different things can happen; none of them good. One thing that I believe we've observed in #46299 is that, if there's another txn's intent there already, the Put will try to push it, enter the txnWaitQueue, eventually observe that its own txn is committed and return an error. The client thus gets an error (and a non-ambiguous one to boot) although the txn is committed. Even worse perhaps, I think it's possible for a request to return wrong results instead of an error. This patch fixes it by inhibiting the parallel commit when the EndTxn batch is retried. This way, there's never a STAGING record. Release note (bug fix): A rare bug causing errors to be returned for successfully committed transactions was fixed. The most common error message was "TransactionStatusError: already committed". Release justification: serious bug fix Fixes #46341 --- .../kvcoord/dist_sender_server_test.go | 276 +++++++++++++++++- .../kvcoord/txn_interceptor_committer.go | 46 ++- .../kvcoord/txn_interceptor_committer_test.go | 45 +++ pkg/kv/kvserver/batcheval/cmd_push_txn.go | 8 +- 4 files changed, 366 insertions(+), 9 deletions(-) diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go index 1a338f912179..4343653cce7f 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go @@ -38,7 +38,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/pkg/errors" + "github.com/cockroachdb/cockroach/pkg/util/tracing" + "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" ) @@ -2630,6 +2631,279 @@ func TestTxnCoordSenderRetries(t *testing.T) { } } +type pushExpectation int + +const ( + // expectPusheeTxnRecovery means we're expecting transaction recovery to be + // performed (after finding a STAGING txn record). + expectPusheeTxnRecovery pushExpectation = iota + // expectPusheeTxnRecordNotFound means we're expecting the push to not find the + // pushee txn record. + expectPusheeTxnRecordNotFound + // dontExpectAnything means we're not going to check the state in which the + // pusher found the pushee's txn record. + dontExpectAnything +) + +type expectedTxnResolution int + +const ( + expectAborted expectedTxnResolution = iota + expectCommitted +) + +// checkPushResult pushes the specified txn and checks that the pushee's +// resolution is the expected one. +func checkPushResult( + ctx context.Context, + db *kv.DB, + txn roachpb.Transaction, + expResolution expectedTxnResolution, + pushExpectation pushExpectation, +) error { + pushReq := roachpb.PushTxnRequest{ + RequestHeader: roachpb.RequestHeader{ + Key: txn.Key, + }, + PusheeTxn: txn.TxnMeta, + PushTo: hlc.Timestamp{}, + PushType: roachpb.PUSH_ABORT, + // We're going to Force the push in order to not wait for the pushee to + // expire. + Force: true, + } + ba := roachpb.BatchRequest{} + ba.Add(&pushReq) + + recCtx, collectRec, cancel := tracing.ContextWithRecordingSpan(ctx, "test trace") + defer cancel() + + resp, pErr := db.NonTransactionalSender().Send(recCtx, ba) + if pErr != nil { + return pErr.GoError() + } + + var statusErr error + pusheeStatus := resp.Responses[0].GetPushTxn().PusheeTxn.Status + switch pusheeStatus { + case roachpb.ABORTED: + if expResolution != expectAborted { + statusErr = errors.Errorf("transaction unexpectedly aborted") + } + case roachpb.COMMITTED: + if expResolution != expectCommitted { + statusErr = errors.Errorf("transaction unexpectedly committed") + } + default: + return errors.Errorf("unexpected txn status: %s", pusheeStatus) + } + + // Verify that we're not fooling ourselves and that checking for the implicit + // commit actually caused the txn recovery procedure to run. + recording := collectRec() + var resolutionErr error + switch pushExpectation { + case expectPusheeTxnRecovery: + expMsg := fmt.Sprintf("recovered txn %s", txn.ID.Short()) + if _, ok := recording.FindLogMessage(expMsg); !ok { + resolutionErr = errors.Errorf( + "recovery didn't run as expected (missing \"%s\"). recording: %s", + expMsg, recording) + } + case expectPusheeTxnRecordNotFound: + expMsg := "pushee txn record not found" + if _, ok := recording.FindLogMessage(expMsg); !ok { + resolutionErr = errors.Errorf( + "push didn't run as expected (missing \"%s\"). recording: %s", + expMsg, recording) + } + case dontExpectAnything: + } + + return errors.CombineErrors(statusErr, resolutionErr) +} + +// Test that, even though at the kvserver level requests are not idempotent +// across an EndTxn, a TxnCoordSender retry of the final batch after a refresh +// still works fine. We check that a transaction is not considered implicitly +// committed through a combination of writes from a previous attempt of the +// EndTxn batch and a STAGING txn record written by a newer attempt of that +// batch. +// Namely, the scenario is as follows: +// 1. client sends CPut(a) + CPut(b) + EndTxn. The CPut(a) is split by the +// DistSender from the rest. Note that the parallel commit mechanism is in +// effect here. +// 2. One of the two sides gets a WriteTooOldError, the other succeeds. +// The client needs to refresh. +// 3. The refresh succeeds. +// 4. The client resends the whole batch (note that we don't keep track of the +// previous partial success). +// 5. The batch is split again, and one of the two sides fails. +// +// This tests checks that, for the different combinations of failures across the +// two attempts of the request, the transaction is not erroneously considered to +// be committed. We don't want an intent laid down by the first attempt to +// satisfy a STAGING record from the 2nd attempt, or the other way around (an +// intent written in the 2nd attempt satisfying a STAGING record written on the +// first attempt). See subtests for more details. +func TestTxnCoordSenderRetriesAcrossEndTxn(t *testing.T) { + defer leaktest.AfterTest(t)() + + var filterFn atomic.Value + var storeKnobs kvserver.StoreTestingKnobs + storeKnobs.EvalKnobs.TestingEvalFilter = + func(fArgs storagebase.FilterArgs) *roachpb.Error { + fnVal := filterFn.Load() + if fn, ok := fnVal.(func(storagebase.FilterArgs) *roachpb.Error); ok && fn != nil { + return fn(fArgs) + } + return nil + } + + // The left side is CPut(a), the right side is CPut(b)+EndTxn(STAGING). + type side int + const ( + left side = iota + right + ) + + testCases := []struct { + // sidePushedOnFirstAttempt controls which sub-batch will return a + // WriteTooOldError on the first attempt. + sidePushedOnFirstAttempt side + sideRejectedOnSecondAttempt side + txnRecExpectation pushExpectation + }{ + { + // On the first attempt, the left side succeeds in laying down an intent, + // while the right side fails. On the 2nd attempt, the right side succeeds + // while the left side fails. + // + // The point of this test is to check that the txn is not considered to be + // implicitly committed at this point. Handling this scenario requires + // special care. If we didn't do anything, then we'd end up with a STAGING + // txn record (from the second attempt of the request) and an intent on + // "a" from the first attempt. That intent would have a lower timestamp + // than the txn record and so the txn would be considered explicitly + // committed. If the txn were to be considered implicitly committed, and + // the intent on "a" was resolved, then write on a (when it eventually + // evaluates) might return wrong results, or be pushed, or generally get + // very confused about how its own transaction got committed already. + // + // We handle this scenario by disabling the parallel commit on the + // request's 2nd attempt. Thus, the EndTxn will be split from all the + // other requests, and the txn record is never written if anything fails. + sidePushedOnFirstAttempt: right, + sideRejectedOnSecondAttempt: left, + // The first attempt of right side contains a parallel commit (i.e. an + // EndTxn), but fails. The 2nd attempt of the right side will no longer + // contain an EndTxn, as explained above. So we expect the txn record to + // not exist. + txnRecExpectation: expectPusheeTxnRecordNotFound, + }, + { + // On the first attempt, the right side succeed in writing a STAGING txn + // record, but the left side fails. On the second attempt, the right side + // is rejected. + // + // The point of this test is to check that the txn is not considered + // implicitly committed at this point. All the intents are in place for + // the txn to be considered committed, but we rely on the fact that the + // intent on "a" has a timestamp that's too high (it gets the timestamp + // from the 2nd attempt, after a refresh, but the STAGING txn record has + // an older timestamp). If the txn were to be considered implicitly + // committed, it'd be bad as we are returning an error to the client + // telling it that the EndTxn failed. + sidePushedOnFirstAttempt: left, + sideRejectedOnSecondAttempt: right, + // The first attempt of the right side writes a STAGING txn record, so we + // expect to perform txn recovery. + txnRecExpectation: expectPusheeTxnRecovery, + }, + } + + for _, tc := range testCases { + t.Run("", func(t *testing.T) { + s, _, db := serverutils.StartServer(t, + base.TestServerArgs{Knobs: base.TestingKnobs{Store: &storeKnobs}}) + ctx := context.Background() + defer s.Stopper().Stop(ctx) + + keyA, keyA1, keyB, keyB1 := roachpb.Key("a"), roachpb.Key("a1"), roachpb.Key("b"), roachpb.Key("b1") + require.NoError(t, setupMultipleRanges(ctx, db, string(keyB))) + + origValA := roachpb.MakeValueFromString("initA") + require.NoError(t, db.Put(ctx, keyA, &origValA)) + origValB := roachpb.MakeValueFromString("initA") + require.NoError(t, db.Put(ctx, keyB, &origValB)) + + txn := db.NewTxn(ctx, "test txn") + + // Do a write to anchor the txn on b's range. + require.NoError(t, txn.Put(ctx, keyB1, "b1")) + + // Take a snapshot of the txn early. We'll use it when verifying if the txn is + // implicitly committed. If we didn't use this early snapshot and, instead, + // used the transaction with a bumped timestamp, then the push code would + // infer that the txn is not implicitly committed without actually running the + // recovery procedure. Using this snapshot mimics a pusher that ran into an + // old intent. + origTxn := txn.TestingCloneTxn() + + // Do a read to prevent the txn for performing server-side refreshes. + _, err := txn.Get(ctx, keyA1) + require.NoError(t, err) + + // After the txn started, do a conflicting read. This will cause one of + // the txn's upcoming CPuts to return a WriteTooOldError on the first + // attempt, causing in turn to refresh and a retry. Note that, being + // CPuts, the pushed writes don't defer the error by returning the + // WriteTooOld flag instead of a WriteTooOldError. + var readKey roachpb.Key + if tc.sidePushedOnFirstAttempt == left { + readKey = keyA + } else { + readKey = keyB + } + _, err = db.Get(ctx, readKey) + require.NoError(t, err) + + b := txn.NewBatch() + b.CPut(keyA, "a", &origValA) + b.CPut(keyB, "b", &origValB) + + var secondAttemptRejectKey roachpb.Key + if tc.sideRejectedOnSecondAttempt == left { + secondAttemptRejectKey = keyA + } else { + secondAttemptRejectKey = keyB + } + + // Install a filter which will reject requests touching + // secondAttemptRejectKey on the retry. + var count int32 + filterFn.Store(func(args storagebase.FilterArgs) *roachpb.Error { + put, ok := args.Req.(*roachpb.ConditionalPutRequest) + if !ok { + return nil + } + if !put.Key.Equal(secondAttemptRejectKey) { + return nil + } + count++ + // Reject the right request on the 2nd attempt. + if count == 2 { + return roachpb.NewErrorf("injected error; test rejecting request") + } + return nil + }) + + require.Error(t, txn.CommitInBatch(ctx, b), "injected") + require.NoError(t, checkPushResult(ctx, db, *origTxn, expectAborted, tc.txnRecExpectation)) + }) + } +} + // Test that we're being smart about the timestamp ranges that need to be // refreshed: when span are refreshed, they only need to be checked for writes // above the previous time when they've been refreshed, not from the diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_committer.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_committer.go index 0c00d7438eef..1053c088d15e 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_committer.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_committer.go @@ -138,8 +138,34 @@ func (tc *txnCommitter) SendLocked( // set. This is the only place where EndTxnRequest.Key is assigned, but we // could be dealing with a re-issued batch after a refresh. Remember, the // committer is below the span refresh on the interceptor stack. + var etAttempt endTxnAttempt if et.Key == nil { et.Key = ba.Txn.Key + etAttempt = endTxnFirstAttempt + } else { + // If this is a retry, we'll disable parallel commit. Since the previous + // attempt might have partially succeeded (i.e. the batch might have been + // split into sub-batches and some of them might have evaluated + // successfully), there might be intents laying around. If we'd perform a + // parallel commit, and the batch gets split again, and the STAGING txn + // record were written before we evaluate some of the other sub-batche. We + // could technically enter the "implicitly committed" state before all the + // sub-batches are evaluated and this is problematic: there's a race between + // evaluating those requests and other pushers coming along and + // transitioning the txn to explicitly committed (and cleaning up all the + // intents), and the evaluations of the outstanding sub-batches. If the + // randos win, then the re-evaluations will fail because we don't have + // idempotency of evaluations across a txn commit (for example, the + // re-evaluations might notice that their transaction is already committed + // and get confused). + etAttempt = endTxnRetry + if len(et.InFlightWrites) > 0 { + // Make a copy of the EndTxn, since we're going to change it below to + // disable the parallel commit. + etCpy := *et + ba.Requests[len(ba.Requests)-1].SetInner(&etCpy) + et = &etCpy + } } // Determine whether the commit request can be run in parallel with the rest @@ -147,7 +173,7 @@ func (tc *txnCommitter) SendLocked( // attached to the EndTxn request to the LockSpans and clear the in-flight // write set; no writes will be in-flight concurrently with the EndTxn // request. - if len(et.InFlightWrites) > 0 && !tc.canCommitInParallel(ctx, ba, et) { + if len(et.InFlightWrites) > 0 && !tc.canCommitInParallel(ctx, ba, et, etAttempt) { // NB: when parallel commits is disabled, this is the best place to // detect whether the batch has only distinct spans. We can set this // flag based on whether any of previously declared in-flight writes @@ -274,17 +300,33 @@ func (tc *txnCommitter) sendLockedWithElidedEndTxn( return br, nil } +// endTxnAttempt specifies whether it's the first time that we're attempting to +// evaluate an EndTxn request or whether it's a retry (i.e. after a successful +// refresh). There are some precautions we need to take when sending out +// retries. +type endTxnAttempt int + +const ( + endTxnFirstAttempt endTxnAttempt = iota + endTxnRetry +) + // canCommitInParallel determines whether the batch can issue its committing // EndTxn in parallel with the rest of its requests and with any in-flight // writes, which all should have corresponding QueryIntent requests in the // batch. func (tc *txnCommitter) canCommitInParallel( - ctx context.Context, ba roachpb.BatchRequest, et *roachpb.EndTxnRequest, + ctx context.Context, ba roachpb.BatchRequest, et *roachpb.EndTxnRequest, etAttempt endTxnAttempt, ) bool { if !parallelCommitsEnabled.Get(&tc.st.SV) { return false } + if etAttempt == endTxnRetry { + log.VEventf(ctx, 2, "retrying batch not eligible for parallel commit") + return false + } + // We're trying to parallel commit, not parallel abort. if !et.Commit { return false diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_committer_test.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_committer_test.go index 172b6b34510a..f986b6dccf64 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_committer_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_committer_test.go @@ -459,3 +459,48 @@ func TestTxnCommitterRetryAfterStaging(t *testing.T) { require.Equal(t, expReason, pErr.GetDetail().(*roachpb.TransactionRetryError).Reason) }) } + +// Test that parallel commits are inhibited on retries (i.e. after a successful +// refresh caused by a parallel-commit batch). See comments in the interceptor +// about why this is necessary. +func TestTxnCommitterNoParallelCommitsOnRetry(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + tc, mockSender := makeMockTxnCommitter() + defer tc.stopper.Stop(ctx) + + txn := makeTxnProto() + keyA := roachpb.Key("a") + + var ba roachpb.BatchRequest + ba.Header = roachpb.Header{Txn: &txn} + putArgs := roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyA}} + etArgs := roachpb.EndTxnRequest{Commit: true} + putArgs.Sequence = 1 + etArgs.Sequence = 2 + etArgs.InFlightWrites = []roachpb.SequencedWrite{{Key: keyA, Sequence: 1}} + + // Pretend that this is a retry of the request (after a successful refresh). Having the key + // assigned is how the interceptor distinguishes retries. + etArgs.Key = keyA + + ba.Add(&putArgs, &etArgs) + + mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { + require.Len(t, ba.Requests, 2) + require.IsType(t, &roachpb.PutRequest{}, ba.Requests[0].GetInner()) + require.IsType(t, &roachpb.EndTxnRequest{}, ba.Requests[1].GetInner()) + + et := ba.Requests[1].GetInner().(*roachpb.EndTxnRequest) + require.True(t, et.Commit) + require.Len(t, et.InFlightWrites, 0, "expected parallel commit to be inhibited") + + br := ba.CreateReply() + br.Txn = ba.Txn + br.Txn.Status = roachpb.COMMITTED + return br, nil + }) + + _, pErr := tc.SendLocked(ctx, ba) + require.Nil(t, pErr) +} diff --git a/pkg/kv/kvserver/batcheval/cmd_push_txn.go b/pkg/kv/kvserver/batcheval/cmd_push_txn.go index de0141360a76..778b86a2a27a 100644 --- a/pkg/kv/kvserver/batcheval/cmd_push_txn.go +++ b/pkg/kv/kvserver/batcheval/cmd_push_txn.go @@ -260,17 +260,13 @@ func PushTxn( recoverOnFailedPush := cArgs.EvalCtx.EvalKnobs().RecoverIndeterminateCommitsOnFailedPushes if reply.PusheeTxn.Status == roachpb.STAGING && (pusherWins || recoverOnFailedPush) { err := roachpb.NewIndeterminateCommitError(reply.PusheeTxn) - if log.V(1) { - log.Infof(ctx, "%v", err) - } + log.VEventf(ctx, 1, "%v", err) return result.Result{}, err } if !pusherWins { err := roachpb.NewTransactionPushError(reply.PusheeTxn) - if log.V(1) { - log.Infof(ctx, "%v", err) - } + log.VEventf(ctx, 1, "%v", err) return result.Result{}, err }