diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go index 1a338f912179..4343653cce7f 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go @@ -38,7 +38,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/pkg/errors" + "github.com/cockroachdb/cockroach/pkg/util/tracing" + "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" ) @@ -2630,6 +2631,279 @@ func TestTxnCoordSenderRetries(t *testing.T) { } } +type pushExpectation int + +const ( + // expectPusheeTxnRecovery means we're expecting transaction recovery to be + // performed (after finding a STAGING txn record). + expectPusheeTxnRecovery pushExpectation = iota + // expectPusheeTxnRecordNotFound means we're expecting the push to not find the + // pushee txn record. + expectPusheeTxnRecordNotFound + // dontExpectAnything means we're not going to check the state in which the + // pusher found the pushee's txn record. + dontExpectAnything +) + +type expectedTxnResolution int + +const ( + expectAborted expectedTxnResolution = iota + expectCommitted +) + +// checkPushResult pushes the specified txn and checks that the pushee's +// resolution is the expected one. +func checkPushResult( + ctx context.Context, + db *kv.DB, + txn roachpb.Transaction, + expResolution expectedTxnResolution, + pushExpectation pushExpectation, +) error { + pushReq := roachpb.PushTxnRequest{ + RequestHeader: roachpb.RequestHeader{ + Key: txn.Key, + }, + PusheeTxn: txn.TxnMeta, + PushTo: hlc.Timestamp{}, + PushType: roachpb.PUSH_ABORT, + // We're going to Force the push in order to not wait for the pushee to + // expire. + Force: true, + } + ba := roachpb.BatchRequest{} + ba.Add(&pushReq) + + recCtx, collectRec, cancel := tracing.ContextWithRecordingSpan(ctx, "test trace") + defer cancel() + + resp, pErr := db.NonTransactionalSender().Send(recCtx, ba) + if pErr != nil { + return pErr.GoError() + } + + var statusErr error + pusheeStatus := resp.Responses[0].GetPushTxn().PusheeTxn.Status + switch pusheeStatus { + case roachpb.ABORTED: + if expResolution != expectAborted { + statusErr = errors.Errorf("transaction unexpectedly aborted") + } + case roachpb.COMMITTED: + if expResolution != expectCommitted { + statusErr = errors.Errorf("transaction unexpectedly committed") + } + default: + return errors.Errorf("unexpected txn status: %s", pusheeStatus) + } + + // Verify that we're not fooling ourselves and that checking for the implicit + // commit actually caused the txn recovery procedure to run. + recording := collectRec() + var resolutionErr error + switch pushExpectation { + case expectPusheeTxnRecovery: + expMsg := fmt.Sprintf("recovered txn %s", txn.ID.Short()) + if _, ok := recording.FindLogMessage(expMsg); !ok { + resolutionErr = errors.Errorf( + "recovery didn't run as expected (missing \"%s\"). recording: %s", + expMsg, recording) + } + case expectPusheeTxnRecordNotFound: + expMsg := "pushee txn record not found" + if _, ok := recording.FindLogMessage(expMsg); !ok { + resolutionErr = errors.Errorf( + "push didn't run as expected (missing \"%s\"). recording: %s", + expMsg, recording) + } + case dontExpectAnything: + } + + return errors.CombineErrors(statusErr, resolutionErr) +} + +// Test that, even though at the kvserver level requests are not idempotent +// across an EndTxn, a TxnCoordSender retry of the final batch after a refresh +// still works fine. We check that a transaction is not considered implicitly +// committed through a combination of writes from a previous attempt of the +// EndTxn batch and a STAGING txn record written by a newer attempt of that +// batch. +// Namely, the scenario is as follows: +// 1. client sends CPut(a) + CPut(b) + EndTxn. The CPut(a) is split by the +// DistSender from the rest. Note that the parallel commit mechanism is in +// effect here. +// 2. One of the two sides gets a WriteTooOldError, the other succeeds. +// The client needs to refresh. +// 3. The refresh succeeds. +// 4. The client resends the whole batch (note that we don't keep track of the +// previous partial success). +// 5. The batch is split again, and one of the two sides fails. +// +// This tests checks that, for the different combinations of failures across the +// two attempts of the request, the transaction is not erroneously considered to +// be committed. We don't want an intent laid down by the first attempt to +// satisfy a STAGING record from the 2nd attempt, or the other way around (an +// intent written in the 2nd attempt satisfying a STAGING record written on the +// first attempt). See subtests for more details. +func TestTxnCoordSenderRetriesAcrossEndTxn(t *testing.T) { + defer leaktest.AfterTest(t)() + + var filterFn atomic.Value + var storeKnobs kvserver.StoreTestingKnobs + storeKnobs.EvalKnobs.TestingEvalFilter = + func(fArgs storagebase.FilterArgs) *roachpb.Error { + fnVal := filterFn.Load() + if fn, ok := fnVal.(func(storagebase.FilterArgs) *roachpb.Error); ok && fn != nil { + return fn(fArgs) + } + return nil + } + + // The left side is CPut(a), the right side is CPut(b)+EndTxn(STAGING). + type side int + const ( + left side = iota + right + ) + + testCases := []struct { + // sidePushedOnFirstAttempt controls which sub-batch will return a + // WriteTooOldError on the first attempt. + sidePushedOnFirstAttempt side + sideRejectedOnSecondAttempt side + txnRecExpectation pushExpectation + }{ + { + // On the first attempt, the left side succeeds in laying down an intent, + // while the right side fails. On the 2nd attempt, the right side succeeds + // while the left side fails. + // + // The point of this test is to check that the txn is not considered to be + // implicitly committed at this point. Handling this scenario requires + // special care. If we didn't do anything, then we'd end up with a STAGING + // txn record (from the second attempt of the request) and an intent on + // "a" from the first attempt. That intent would have a lower timestamp + // than the txn record and so the txn would be considered explicitly + // committed. If the txn were to be considered implicitly committed, and + // the intent on "a" was resolved, then write on a (when it eventually + // evaluates) might return wrong results, or be pushed, or generally get + // very confused about how its own transaction got committed already. + // + // We handle this scenario by disabling the parallel commit on the + // request's 2nd attempt. Thus, the EndTxn will be split from all the + // other requests, and the txn record is never written if anything fails. + sidePushedOnFirstAttempt: right, + sideRejectedOnSecondAttempt: left, + // The first attempt of right side contains a parallel commit (i.e. an + // EndTxn), but fails. The 2nd attempt of the right side will no longer + // contain an EndTxn, as explained above. So we expect the txn record to + // not exist. + txnRecExpectation: expectPusheeTxnRecordNotFound, + }, + { + // On the first attempt, the right side succeed in writing a STAGING txn + // record, but the left side fails. On the second attempt, the right side + // is rejected. + // + // The point of this test is to check that the txn is not considered + // implicitly committed at this point. All the intents are in place for + // the txn to be considered committed, but we rely on the fact that the + // intent on "a" has a timestamp that's too high (it gets the timestamp + // from the 2nd attempt, after a refresh, but the STAGING txn record has + // an older timestamp). If the txn were to be considered implicitly + // committed, it'd be bad as we are returning an error to the client + // telling it that the EndTxn failed. + sidePushedOnFirstAttempt: left, + sideRejectedOnSecondAttempt: right, + // The first attempt of the right side writes a STAGING txn record, so we + // expect to perform txn recovery. + txnRecExpectation: expectPusheeTxnRecovery, + }, + } + + for _, tc := range testCases { + t.Run("", func(t *testing.T) { + s, _, db := serverutils.StartServer(t, + base.TestServerArgs{Knobs: base.TestingKnobs{Store: &storeKnobs}}) + ctx := context.Background() + defer s.Stopper().Stop(ctx) + + keyA, keyA1, keyB, keyB1 := roachpb.Key("a"), roachpb.Key("a1"), roachpb.Key("b"), roachpb.Key("b1") + require.NoError(t, setupMultipleRanges(ctx, db, string(keyB))) + + origValA := roachpb.MakeValueFromString("initA") + require.NoError(t, db.Put(ctx, keyA, &origValA)) + origValB := roachpb.MakeValueFromString("initA") + require.NoError(t, db.Put(ctx, keyB, &origValB)) + + txn := db.NewTxn(ctx, "test txn") + + // Do a write to anchor the txn on b's range. + require.NoError(t, txn.Put(ctx, keyB1, "b1")) + + // Take a snapshot of the txn early. We'll use it when verifying if the txn is + // implicitly committed. If we didn't use this early snapshot and, instead, + // used the transaction with a bumped timestamp, then the push code would + // infer that the txn is not implicitly committed without actually running the + // recovery procedure. Using this snapshot mimics a pusher that ran into an + // old intent. + origTxn := txn.TestingCloneTxn() + + // Do a read to prevent the txn for performing server-side refreshes. + _, err := txn.Get(ctx, keyA1) + require.NoError(t, err) + + // After the txn started, do a conflicting read. This will cause one of + // the txn's upcoming CPuts to return a WriteTooOldError on the first + // attempt, causing in turn to refresh and a retry. Note that, being + // CPuts, the pushed writes don't defer the error by returning the + // WriteTooOld flag instead of a WriteTooOldError. + var readKey roachpb.Key + if tc.sidePushedOnFirstAttempt == left { + readKey = keyA + } else { + readKey = keyB + } + _, err = db.Get(ctx, readKey) + require.NoError(t, err) + + b := txn.NewBatch() + b.CPut(keyA, "a", &origValA) + b.CPut(keyB, "b", &origValB) + + var secondAttemptRejectKey roachpb.Key + if tc.sideRejectedOnSecondAttempt == left { + secondAttemptRejectKey = keyA + } else { + secondAttemptRejectKey = keyB + } + + // Install a filter which will reject requests touching + // secondAttemptRejectKey on the retry. + var count int32 + filterFn.Store(func(args storagebase.FilterArgs) *roachpb.Error { + put, ok := args.Req.(*roachpb.ConditionalPutRequest) + if !ok { + return nil + } + if !put.Key.Equal(secondAttemptRejectKey) { + return nil + } + count++ + // Reject the right request on the 2nd attempt. + if count == 2 { + return roachpb.NewErrorf("injected error; test rejecting request") + } + return nil + }) + + require.Error(t, txn.CommitInBatch(ctx, b), "injected") + require.NoError(t, checkPushResult(ctx, db, *origTxn, expectAborted, tc.txnRecExpectation)) + }) + } +} + // Test that we're being smart about the timestamp ranges that need to be // refreshed: when span are refreshed, they only need to be checked for writes // above the previous time when they've been refreshed, not from the diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_committer.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_committer.go index 0c00d7438eef..1053c088d15e 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_committer.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_committer.go @@ -138,8 +138,34 @@ func (tc *txnCommitter) SendLocked( // set. This is the only place where EndTxnRequest.Key is assigned, but we // could be dealing with a re-issued batch after a refresh. Remember, the // committer is below the span refresh on the interceptor stack. + var etAttempt endTxnAttempt if et.Key == nil { et.Key = ba.Txn.Key + etAttempt = endTxnFirstAttempt + } else { + // If this is a retry, we'll disable parallel commit. Since the previous + // attempt might have partially succeeded (i.e. the batch might have been + // split into sub-batches and some of them might have evaluated + // successfully), there might be intents laying around. If we'd perform a + // parallel commit, and the batch gets split again, and the STAGING txn + // record were written before we evaluate some of the other sub-batche. We + // could technically enter the "implicitly committed" state before all the + // sub-batches are evaluated and this is problematic: there's a race between + // evaluating those requests and other pushers coming along and + // transitioning the txn to explicitly committed (and cleaning up all the + // intents), and the evaluations of the outstanding sub-batches. If the + // randos win, then the re-evaluations will fail because we don't have + // idempotency of evaluations across a txn commit (for example, the + // re-evaluations might notice that their transaction is already committed + // and get confused). + etAttempt = endTxnRetry + if len(et.InFlightWrites) > 0 { + // Make a copy of the EndTxn, since we're going to change it below to + // disable the parallel commit. + etCpy := *et + ba.Requests[len(ba.Requests)-1].SetInner(&etCpy) + et = &etCpy + } } // Determine whether the commit request can be run in parallel with the rest @@ -147,7 +173,7 @@ func (tc *txnCommitter) SendLocked( // attached to the EndTxn request to the LockSpans and clear the in-flight // write set; no writes will be in-flight concurrently with the EndTxn // request. - if len(et.InFlightWrites) > 0 && !tc.canCommitInParallel(ctx, ba, et) { + if len(et.InFlightWrites) > 0 && !tc.canCommitInParallel(ctx, ba, et, etAttempt) { // NB: when parallel commits is disabled, this is the best place to // detect whether the batch has only distinct spans. We can set this // flag based on whether any of previously declared in-flight writes @@ -274,17 +300,33 @@ func (tc *txnCommitter) sendLockedWithElidedEndTxn( return br, nil } +// endTxnAttempt specifies whether it's the first time that we're attempting to +// evaluate an EndTxn request or whether it's a retry (i.e. after a successful +// refresh). There are some precautions we need to take when sending out +// retries. +type endTxnAttempt int + +const ( + endTxnFirstAttempt endTxnAttempt = iota + endTxnRetry +) + // canCommitInParallel determines whether the batch can issue its committing // EndTxn in parallel with the rest of its requests and with any in-flight // writes, which all should have corresponding QueryIntent requests in the // batch. func (tc *txnCommitter) canCommitInParallel( - ctx context.Context, ba roachpb.BatchRequest, et *roachpb.EndTxnRequest, + ctx context.Context, ba roachpb.BatchRequest, et *roachpb.EndTxnRequest, etAttempt endTxnAttempt, ) bool { if !parallelCommitsEnabled.Get(&tc.st.SV) { return false } + if etAttempt == endTxnRetry { + log.VEventf(ctx, 2, "retrying batch not eligible for parallel commit") + return false + } + // We're trying to parallel commit, not parallel abort. if !et.Commit { return false diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_committer_test.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_committer_test.go index 172b6b34510a..f986b6dccf64 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_committer_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_committer_test.go @@ -459,3 +459,48 @@ func TestTxnCommitterRetryAfterStaging(t *testing.T) { require.Equal(t, expReason, pErr.GetDetail().(*roachpb.TransactionRetryError).Reason) }) } + +// Test that parallel commits are inhibited on retries (i.e. after a successful +// refresh caused by a parallel-commit batch). See comments in the interceptor +// about why this is necessary. +func TestTxnCommitterNoParallelCommitsOnRetry(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + tc, mockSender := makeMockTxnCommitter() + defer tc.stopper.Stop(ctx) + + txn := makeTxnProto() + keyA := roachpb.Key("a") + + var ba roachpb.BatchRequest + ba.Header = roachpb.Header{Txn: &txn} + putArgs := roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyA}} + etArgs := roachpb.EndTxnRequest{Commit: true} + putArgs.Sequence = 1 + etArgs.Sequence = 2 + etArgs.InFlightWrites = []roachpb.SequencedWrite{{Key: keyA, Sequence: 1}} + + // Pretend that this is a retry of the request (after a successful refresh). Having the key + // assigned is how the interceptor distinguishes retries. + etArgs.Key = keyA + + ba.Add(&putArgs, &etArgs) + + mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { + require.Len(t, ba.Requests, 2) + require.IsType(t, &roachpb.PutRequest{}, ba.Requests[0].GetInner()) + require.IsType(t, &roachpb.EndTxnRequest{}, ba.Requests[1].GetInner()) + + et := ba.Requests[1].GetInner().(*roachpb.EndTxnRequest) + require.True(t, et.Commit) + require.Len(t, et.InFlightWrites, 0, "expected parallel commit to be inhibited") + + br := ba.CreateReply() + br.Txn = ba.Txn + br.Txn.Status = roachpb.COMMITTED + return br, nil + }) + + _, pErr := tc.SendLocked(ctx, ba) + require.Nil(t, pErr) +} diff --git a/pkg/kv/kvserver/batcheval/cmd_push_txn.go b/pkg/kv/kvserver/batcheval/cmd_push_txn.go index de0141360a76..778b86a2a27a 100644 --- a/pkg/kv/kvserver/batcheval/cmd_push_txn.go +++ b/pkg/kv/kvserver/batcheval/cmd_push_txn.go @@ -260,17 +260,13 @@ func PushTxn( recoverOnFailedPush := cArgs.EvalCtx.EvalKnobs().RecoverIndeterminateCommitsOnFailedPushes if reply.PusheeTxn.Status == roachpb.STAGING && (pusherWins || recoverOnFailedPush) { err := roachpb.NewIndeterminateCommitError(reply.PusheeTxn) - if log.V(1) { - log.Infof(ctx, "%v", err) - } + log.VEventf(ctx, 1, "%v", err) return result.Result{}, err } if !pusherWins { err := roachpb.NewTransactionPushError(reply.PusheeTxn) - if log.V(1) { - log.Infof(ctx, "%v", err) - } + log.VEventf(ctx, 1, "%v", err) return result.Result{}, err }