diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go index 3b0d72c91eff..89c1f220b6b8 100644 --- a/pkg/ccl/backupccl/backup_test.go +++ b/pkg/ccl/backupccl/backup_test.go @@ -839,7 +839,7 @@ func checkInProgressBackupRestore( var allowResponse chan struct{} params := base.TestClusterArgs{} params.ServerArgs.Knobs.Store = &kvserver.StoreTestingKnobs{ - TestingResponseFilter: func(ba roachpb.BatchRequest, br *roachpb.BatchResponse) *roachpb.Error { + TestingResponseFilter: func(ctx context.Context, ba roachpb.BatchRequest, br *roachpb.BatchResponse) *roachpb.Error { for _, ru := range br.Responses { switch ru.GetInner().(type) { case *roachpb.ExportResponse, *roachpb.ImportResponse: @@ -3595,7 +3595,9 @@ func TestProtectedTimestampsDuringBackup(t *testing.T) { params := base.TestClusterArgs{} params.ServerArgs.ExternalIODir = dir params.ServerArgs.Knobs.Store = &kvserver.StoreTestingKnobs{ - TestingResponseFilter: func(ba roachpb.BatchRequest, br *roachpb.BatchResponse) *roachpb.Error { + TestingResponseFilter: func( + ctx context.Context, ba roachpb.BatchRequest, br *roachpb.BatchResponse, + ) *roachpb.Error { for _, ru := range br.Responses { switch ru.GetInner().(type) { case *roachpb.ExportResponse, *roachpb.ImportResponse: diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go index 116be2866487..2a179e2d0b7a 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go @@ -34,11 +34,11 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/kvclientutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" ) @@ -2883,98 +2883,6 @@ func TestTxnCoordSenderRetries(t *testing.T) { } } -type pushExpectation int - -const ( - // expectPusheeTxnRecovery means we're expecting transaction recovery to be - // performed (after finding a STAGING txn record). - expectPusheeTxnRecovery pushExpectation = iota - // expectPusheeTxnRecordNotFound means we're expecting the push to not find the - // pushee txn record. - expectPusheeTxnRecordNotFound - // dontExpectAnything means we're not going to check the state in which the - // pusher found the pushee's txn record. - dontExpectAnything -) - -type expectedTxnResolution int - -const ( - expectAborted expectedTxnResolution = iota - expectCommitted -) - -// checkPushResult pushes the specified txn and checks that the pushee's -// resolution is the expected one. -func checkPushResult( - ctx context.Context, - db *kv.DB, - txn roachpb.Transaction, - expResolution expectedTxnResolution, - pushExpectation pushExpectation, -) error { - pushReq := roachpb.PushTxnRequest{ - RequestHeader: roachpb.RequestHeader{ - Key: txn.Key, - }, - PusheeTxn: txn.TxnMeta, - PushTo: hlc.Timestamp{}, - PushType: roachpb.PUSH_ABORT, - // We're going to Force the push in order to not wait for the pushee to - // expire. - Force: true, - } - ba := roachpb.BatchRequest{} - ba.Add(&pushReq) - - recCtx, collectRec, cancel := tracing.ContextWithRecordingSpan(ctx, "test trace") - defer cancel() - - resp, pErr := db.NonTransactionalSender().Send(recCtx, ba) - if pErr != nil { - return pErr.GoError() - } - - var statusErr error - pusheeStatus := resp.Responses[0].GetPushTxn().PusheeTxn.Status - switch pusheeStatus { - case roachpb.ABORTED: - if expResolution != expectAborted { - statusErr = errors.Errorf("transaction unexpectedly aborted") - } - case roachpb.COMMITTED: - if expResolution != expectCommitted { - statusErr = errors.Errorf("transaction unexpectedly committed") - } - default: - return errors.Errorf("unexpected txn status: %s", pusheeStatus) - } - - // Verify that we're not fooling ourselves and that checking for the implicit - // commit actually caused the txn recovery procedure to run. - recording := collectRec() - var resolutionErr error - switch pushExpectation { - case expectPusheeTxnRecovery: - expMsg := fmt.Sprintf("recovered txn %s", txn.ID.Short()) - if _, ok := recording.FindLogMessage(expMsg); !ok { - resolutionErr = errors.Errorf( - "recovery didn't run as expected (missing \"%s\"). recording: %s", - expMsg, recording) - } - case expectPusheeTxnRecordNotFound: - expMsg := "pushee txn record not found" - if _, ok := recording.FindLogMessage(expMsg); !ok { - resolutionErr = errors.Errorf( - "push didn't run as expected (missing \"%s\"). recording: %s", - expMsg, recording) - } - case dontExpectAnything: - } - - return errors.CombineErrors(statusErr, resolutionErr) -} - // Test that, even though at the kvserver level requests are not idempotent // across an EndTxn, a TxnCoordSender retry of the final batch after a refresh // still works fine. We check that a transaction is not considered implicitly @@ -3024,7 +2932,7 @@ func TestTxnCoordSenderRetriesAcrossEndTxn(t *testing.T) { // WriteTooOldError on the first attempt. sidePushedOnFirstAttempt side sideRejectedOnSecondAttempt side - txnRecExpectation pushExpectation + txnRecExpectation kvclientutils.PushExpectation }{ { // On the first attempt, the left side succeeds in laying down an intent, @@ -3051,7 +2959,7 @@ func TestTxnCoordSenderRetriesAcrossEndTxn(t *testing.T) { // EndTxn), but fails. The 2nd attempt of the right side will no longer // contain an EndTxn, as explained above. So we expect the txn record to // not exist. - txnRecExpectation: expectPusheeTxnRecordNotFound, + txnRecExpectation: kvclientutils.ExpectPusheeTxnRecordNotFound, }, { // On the first attempt, the right side succeed in writing a STAGING txn @@ -3070,7 +2978,7 @@ func TestTxnCoordSenderRetriesAcrossEndTxn(t *testing.T) { sideRejectedOnSecondAttempt: right, // The first attempt of the right side writes a STAGING txn record, so we // expect to perform txn recovery. - txnRecExpectation: expectPusheeTxnRecovery, + txnRecExpectation: kvclientutils.ExpectPusheeTxnRecovery, }, } @@ -3151,7 +3059,9 @@ func TestTxnCoordSenderRetriesAcrossEndTxn(t *testing.T) { }) require.Error(t, txn.CommitInBatch(ctx, b), "injected") - require.NoError(t, checkPushResult(ctx, db, *origTxn, expectAborted, tc.txnRecExpectation)) + err = kvclientutils.CheckPushResult( + ctx, db, *origTxn, kvclientutils.ExpectAborted, tc.txnRecExpectation) + require.NoError(t, err) }) } } diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go index d7e878beb7a3..aa3bcb12b796 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go @@ -795,26 +795,37 @@ func (tc *TxnCoordSender) updateStateLocked( // Update our transaction with any information the error has. if errTxn := pErr.GetTxn(); errTxn != nil { - // Sanity checks. Finalized transactions are not supposed to get here. - if errTxn.Status != roachpb.PENDING { - if errTxn.Status == roachpb.COMMITTED { - // Finding out about our transaction being committed indicates a serious - // bug. Requests are not supposed to be sent on transactions after they - // are committed. - log.Errorf(ctx, "transaction unexpectedly committed: %s. ba: %s. txn: %s.", pErr, ba, errTxn) - } else if errTxn.Status == roachpb.ABORTED { - // We only expect TransactionAbortedError to carry an aborted txn. In - // particular, the heartbeater doesn't like running when the transaction - // is know to be aborted. - log.Fatalf(ctx, "unexpected error with ABORTED txn: (%T) %s. ba: %s. txn: %s.", pErr.GoError(), pErr, ba, errTxn) - } + if errTxn.Status == roachpb.COMMITTED { + sanityCheckCommittedErr(ctx, pErr, ba) } - tc.mu.txn.Update(errTxn) } return pErr } +// sanityCheckCommittedErr verifies the circumstances in which we're receiving +// an error indicating a COMMITTED transaction. Only rollbacks should be +// encountering such errors. Marking a transaction as explicitly-committed can +// also encounter these errors, but those errors don't make it to the +// TxnCoordSender. +func sanityCheckCommittedErr(ctx context.Context, pErr *roachpb.Error, ba roachpb.BatchRequest) { + errTxn := pErr.GetTxn() + if errTxn == nil || errTxn.Status != roachpb.COMMITTED { + // We shouldn't have been called. + return + } + // The only case in which an error can have a COMMITTED transaction in it is + // when the request was a rollback. Rollbacks can race with commits if a + // context timeout expires while a commit request is in flight. + if ba.IsSingleAbortTxnRequest() { + return + } + // Finding out about our transaction being committed indicates a serious bug. + // Requests are not supposed to be sent on transactions after they are + // committed. + log.Fatalf(ctx, "transaction unexpectedly committed: %s. ba: %s. txn: %s.", pErr, ba, errTxn) +} + // setTxnAnchorKey sets the key at which to anchor the transaction record. The // transaction anchor key defaults to the first key written in a transaction. func (tc *TxnCoordSender) setTxnAnchorKeyLocked(key roachpb.Key) error { diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go index 671b728faefb..6c57f190ebd1 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go @@ -646,7 +646,7 @@ func TestTxnCoordSenderGCWithAmbiguousResultErr(t *testing.T) { key := roachpb.Key("a") are := roachpb.NewAmbiguousResultError("very ambiguous") knobs := &kvserver.StoreTestingKnobs{ - TestingResponseFilter: func(ba roachpb.BatchRequest, br *roachpb.BatchResponse) *roachpb.Error { + TestingResponseFilter: func(ctx context.Context, ba roachpb.BatchRequest, br *roachpb.BatchResponse) *roachpb.Error { for _, req := range ba.Requests { if putReq, ok := req.GetInner().(*roachpb.PutRequest); ok && putReq.Key.Equal(key) { return roachpb.NewError(are) diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_heartbeater.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_heartbeater.go index f2ec557abd3a..3ae3309660bf 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_heartbeater.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_heartbeater.go @@ -287,9 +287,12 @@ func (h *txnHeartbeater) heartbeat(ctx context.Context) bool { } if h.mu.txn.Status != roachpb.PENDING { - log.Fatalf(ctx, - "txn committed or aborted but heartbeat loop hasn't been signaled to stop. txn: %s", - h.mu.txn) + if h.mu.txn.Status == roachpb.COMMITTED { + log.Fatalf(ctx, "txn committed but heartbeat loop hasn't been signaled to stop: %s", h.mu.txn) + } + // If the transaction is aborted, there's no point in heartbeating. The + // client needs to send a rollback. + return false } // Clone the txn in order to put it in the heartbeat request. diff --git a/pkg/kv/kvserver/client_merge_test.go b/pkg/kv/kvserver/client_merge_test.go index cdd9aee01295..e669061177e4 100644 --- a/pkg/kv/kvserver/client_merge_test.go +++ b/pkg/kv/kvserver/client_merge_test.go @@ -1527,7 +1527,7 @@ func TestStoreRangeMergeConcurrentRequests(t *testing.T) { var mtc *multiTestContext storeCfg.TestingKnobs.TestingResponseFilter = func( - ba roachpb.BatchRequest, _ *roachpb.BatchResponse, + ctx context.Context, ba roachpb.BatchRequest, _ *roachpb.BatchResponse, ) *roachpb.Error { del := ba.Requests[0].GetDelete() if del != nil && bytes.HasSuffix(del.Key, keys.LocalRangeDescriptorSuffix) && rand.Int()%4 == 0 { @@ -2777,7 +2777,9 @@ func TestStoreRangeMergeSlowWatcher(t *testing.T) { // This verifies that we're actually testing what we claim to. var sawMeta2Req int64 meta2CKey := keys.RangeMetaKey(cKey).AsRawKey() - storeCfg.TestingKnobs.TestingResponseFilter = func(ba roachpb.BatchRequest, br *roachpb.BatchResponse) *roachpb.Error { + storeCfg.TestingKnobs.TestingResponseFilter = func( + ctx context.Context, ba roachpb.BatchRequest, br *roachpb.BatchResponse, + ) *roachpb.Error { for i, req := range ba.Requests { if g := req.GetGet(); g != nil && g.Key.Equal(meta2CKey) && br.Responses[i].GetGet().Value == nil { atomic.StoreInt64(&sawMeta2Req, 1) diff --git a/pkg/kv/kvserver/client_replica_test.go b/pkg/kv/kvserver/client_replica_test.go index c973f3d6a66f..a4bf3c748302 100644 --- a/pkg/kv/kvserver/client_replica_test.go +++ b/pkg/kv/kvserver/client_replica_test.go @@ -2597,7 +2597,7 @@ func TestAdminRelocateRangeSafety(t *testing.T) { var useSeenAdd atomic.Value useSeenAdd.Store(false) seenAdd := make(chan struct{}, 1) - responseFilter := func(ba roachpb.BatchRequest, _ *roachpb.BatchResponse) *roachpb.Error { + responseFilter := func(ctx context.Context, ba roachpb.BatchRequest, _ *roachpb.BatchResponse) *roachpb.Error { if ba.IsSingleRequest() { changeReplicas, ok := ba.Requests[0].GetInner().(*roachpb.AdminChangeReplicasRequest) if ok && changeReplicas.Changes()[0].ChangeType == roachpb.ADD_REPLICA && useSeenAdd.Load().(bool) { diff --git a/pkg/kv/kvserver/client_split_test.go b/pkg/kv/kvserver/client_split_test.go index 762c4fba6459..0e1ff0888053 100644 --- a/pkg/kv/kvserver/client_split_test.go +++ b/pkg/kv/kvserver/client_split_test.go @@ -2813,7 +2813,7 @@ func TestStoreSplitRangeLookupRace(t *testing.T) { blockedRangeLookups := int32(0) rangeLookupIsBlocked := make(chan struct{}, 1) unblockRangeLookups := make(chan struct{}) - respFilter := func(ba roachpb.BatchRequest, _ *roachpb.BatchResponse) *roachpb.Error { + respFilter := func(ctx context.Context, ba roachpb.BatchRequest, _ *roachpb.BatchResponse) *roachpb.Error { select { case <-blockRangeLookups: if kv.TestingIsRangeLookup(ba) && diff --git a/pkg/kv/kvserver/consistency_queue_test.go b/pkg/kv/kvserver/consistency_queue_test.go index d72074e08eb3..dd7a96ee5129 100644 --- a/pkg/kv/kvserver/consistency_queue_test.go +++ b/pkg/kv/kvserver/consistency_queue_test.go @@ -137,7 +137,9 @@ func TestCheckConsistencyReplay(t *testing.T) { } // Arrange to trigger a retry when a ComputeChecksum request arrives. - storeCfg.TestingKnobs.TestingResponseFilter = func(ba roachpb.BatchRequest, br *roachpb.BatchResponse) *roachpb.Error { + storeCfg.TestingKnobs.TestingResponseFilter = func( + ctx context.Context, ba roachpb.BatchRequest, br *roachpb.BatchResponse, + ) *roachpb.Error { state.Lock() defer state.Unlock() if ba.IsSingleComputeChecksumRequest() && !state.forcedRetry { diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go index df188d1dc3bd..6a416b7efde8 100644 --- a/pkg/kv/kvserver/replica_send.go +++ b/pkg/kv/kvserver/replica_send.go @@ -111,7 +111,7 @@ func (r *Replica) sendWithRangeID( log.Eventf(ctx, "replica.Send got error: %s", pErr) } else { if filter := r.store.cfg.TestingKnobs.TestingResponseFilter; filter != nil { - pErr = filter(*ba, br) + pErr = filter(ctx, *ba, br) } } return br, pErr diff --git a/pkg/kv/kvserver/storagebase/base.go b/pkg/kv/kvserver/storagebase/base.go index 489bd26338b1..586d00fad8e5 100644 --- a/pkg/kv/kvserver/storagebase/base.go +++ b/pkg/kv/kvserver/storagebase/base.go @@ -99,7 +99,7 @@ type ReplicaApplyFilter func(args ApplyFilterArgs) (int, *roachpb.Error) // ReplicaResponseFilter is used in unittests to modify the outbound // response returned to a waiting client after a replica command has // been processed. This filter is invoked only by the command proposer. -type ReplicaResponseFilter func(roachpb.BatchRequest, *roachpb.BatchResponse) *roachpb.Error +type ReplicaResponseFilter func(context.Context, roachpb.BatchRequest, *roachpb.BatchResponse) *roachpb.Error // ContainsKey returns whether this range contains the specified key. func ContainsKey(desc *roachpb.RangeDescriptor, key roachpb.Key) bool { diff --git a/pkg/kv/main_test.go b/pkg/kv/main_test.go index bc6f075534e7..24a5781f4a5c 100644 --- a/pkg/kv/main_test.go +++ b/pkg/kv/main_test.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/security/securitytest" "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" ) func init() { @@ -26,6 +27,7 @@ func init() { func TestMain(m *testing.M) { serverutils.InitTestServerFactory(server.TestServerFactory) + serverutils.InitTestClusterFactory(testcluster.TestClusterFactory) os.Exit(m.Run()) } diff --git a/pkg/kv/txn_external_test.go b/pkg/kv/txn_external_test.go new file mode 100644 index 000000000000..d0ac50a92f0f --- /dev/null +++ b/pkg/kv/txn_external_test.go @@ -0,0 +1,219 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kv_test + +import ( + "context" + "sync/atomic" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/testutils/kvclientutils" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/stretchr/testify/require" +) + +// Test the behavior of a txn.Rollback() issued after txn.Commit() failing with +// an ambiguous error. +func TestRollbackAfterAmbiguousCommit(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + + testCases := []struct { + name string + // The status of the transaction record at the time when we issue the + // rollback. + txnStatus roachpb.TransactionStatus + // If txnStatus == COMMITTED, setting txnRecordCleanedUp will make us + // cleanup the transaction. The txn record will be deleted. + txnRecordCleanedUp bool + // The error that we expect from txn.Rollback(). + expRollbackErr string + // Is the transaction expected to be committed or not after the + // txn.Rollback() call returns? + expCommitted bool + }{ + { + name: "txn cleaned up", + txnStatus: roachpb.COMMITTED, + txnRecordCleanedUp: true, + // The transaction will be committed, but at the same time the rollback + // will also appear to succeed (it'll be a no-op). This behavior is + // undesired. See #48302. + expCommitted: true, + expRollbackErr: "", + }, + { + name: "COMMITTED", + txnStatus: roachpb.COMMITTED, + txnRecordCleanedUp: false, + expCommitted: true, + expRollbackErr: "already committed", + }, + { + name: "STAGING", + txnStatus: roachpb.STAGING, + // The rollback succeeds. This behavior is undersired. See #48301. + expCommitted: false, + expRollbackErr: "", + }, + } + for _, testCase := range testCases { + if testCase.txnRecordCleanedUp { + require.Equal(t, roachpb.COMMITTED, testCase.txnStatus) + } + t.Run(testCase.name, func(t *testing.T) { + var filterSet int64 + var key roachpb.Key + commitBlocked := make(chan struct{}) + args := base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + // We're going to block the commit of the test's txn, letting the + // test cancel the request's ctx while the request is blocked. + TestingResponseFilter: func(ctx context.Context, ba roachpb.BatchRequest, _ *roachpb.BatchResponse) *roachpb.Error { + if atomic.LoadInt64(&filterSet) == 0 { + return nil + } + req, ok := ba.GetArg(roachpb.EndTxn) + if !ok { + return nil + } + commit := req.(*roachpb.EndTxnRequest) + if commit.Key.Equal(key) && commit.Commit { + // Inform the test that the commit is blocked. + commitBlocked <- struct{}{} + // Block until the client interrupts the commit. The client will + // cancel its context, at which point gRPC will return an error + // to the client and marshall the cancelation to the server. + <-ctx.Done() + } + return nil + }, + }, + }, + } + tci := serverutils.StartTestCluster(t, 2, base.TestClusterArgs{ServerArgs: args}) + tc := tci.(*testcluster.TestCluster) + defer tc.Stopper().Stop(ctx) + + key = tc.ScratchRange(t) + atomic.StoreInt64(&filterSet, 1) + + // This test uses a cluster with two nodes. It'll create a transaction + // having as a coordinator the node that's *not* the leaseholder for the + // range the txn is writing to. This is in order for the context + // cancelation scheme that we're going to employ to work: we need a + // non-local RPC such that canceling a requests context triggers an error + // for the request's sender without synchronizing with the request's + // evaluation. + rdesc := tc.LookupRangeOrFatal(t, key) + leaseHolder, err := tc.FindRangeLeaseHolder(rdesc, nil /* hint */) + require.NoError(t, err) + var db *kv.DB + if leaseHolder.NodeID == 1 { + db = tc.Servers[1].DB() + } else { + db = tc.Servers[0].DB() + } + + txn := db.NewTxn(ctx, "test") + require.NoError(t, txn.Put(ctx, key, "val")) + + // If the test wants the transaction to be committed and cleaned up, we'll + // do a read on the key we just wrote. That will act as a pipeline stall, + // resolving the in-flight write and eliminating the need for the STAGING + // status. + if testCase.txnStatus == roachpb.COMMITTED && testCase.txnRecordCleanedUp { + v, err := txn.Get(ctx, key) + require.NoError(t, err) + val, err := v.Value.GetBytes() + require.NoError(t, err) + require.Equal(t, "val", string(val)) + } + + // Send a commit request. It's going to get blocked after being evaluated, + // at which point we're going to cancel the request's ctx. The ctx + // cancelation will cause gRPC to interrupt the in-flight request, and the + // DistSender to return an ambiguous error. The transaction will be + // committed, through. + commitCtx, cancelCommit := context.WithCancel(ctx) + commitCh := make(chan error) + go func() { + commitCh <- txn.Commit(commitCtx) + }() + + select { + case <-commitBlocked: + case <-time.After(10 * time.Second): + t.Fatalf("commit not blocked") + } + + cancelCommit() + commitErr := <-commitCh + require.IsType(t, &roachpb.AmbiguousResultError{}, commitErr) + require.Regexp(t, `result is ambiguous \(context done during DistSender.Send: context canceled\)`, + commitErr) + + // If the test wants the upcoming rollback to find a COMMITTED record, + // we'll perform transaction recovery. This will leave the transaction in + // the COMMITTED state, without cleaning it up. + if !testCase.txnRecordCleanedUp && testCase.txnStatus == roachpb.COMMITTED { + // Sanity check - verify that the txn is STAGING. + txnProto := txn.TestingCloneTxn() + queryTxn := roachpb.QueryTxnRequest{ + RequestHeader: roachpb.RequestHeader{ + Key: txnProto.Key, + }, + Txn: txnProto.TxnMeta, + } + b := kv.Batch{} + b.AddRawRequest(&queryTxn) + require.NoError(t, db.Run(ctx, &b)) + queryTxnRes := b.RawResponse().Responses[0].GetQueryTxn() + require.Equal(t, roachpb.STAGING, queryTxnRes.QueriedTxn.Status) + + // Perform transaction recovery. + require.NoError(t, kvclientutils.CheckPushResult(ctx, db, *txn.TestingCloneTxn(), + kvclientutils.ExpectCommitted, kvclientutils.ExpectPusheeTxnRecovery)) + } + + // Attempt the rollback and check its result. + rollbackErr := txn.Rollback(ctx) + if testCase.expRollbackErr == "" { + require.NoError(t, rollbackErr) + } else { + require.Regexp(t, testCase.expRollbackErr, rollbackErr) + } + + // Check the outcome of the transaction, independently from the outcome of + // the Rollback() above, by reading the value that it wrote. + committed := func() bool { + val, err := db.Get(ctx, key) + require.NoError(t, err) + if !val.Exists() { + return false + } + v, err := val.Value.GetBytes() + require.NoError(t, err) + require.Equal(t, "val", string(v)) + return true + }() + require.Equal(t, testCase.expCommitted, committed) + }) + } +} diff --git a/pkg/sql/ambiguous_commit_test.go b/pkg/sql/ambiguous_commit_test.go index 1df384f122a1..4d0e831b70e4 100644 --- a/pkg/sql/ambiguous_commit_test.go +++ b/pkg/sql/ambiguous_commit_test.go @@ -120,7 +120,9 @@ func TestAmbiguousCommit(t *testing.T) { if ambiguousSuccess { params.Knobs.Store = &kvserver.StoreTestingKnobs{ - TestingResponseFilter: func(args roachpb.BatchRequest, _ *roachpb.BatchResponse) *roachpb.Error { + TestingResponseFilter: func( + ctx context.Context, args roachpb.BatchRequest, _ *roachpb.BatchResponse, + ) *roachpb.Error { if req, ok := args.GetArg(roachpb.ConditionalPut); ok { return maybeRPCError(req.(*roachpb.ConditionalPutRequest)) } diff --git a/pkg/testutils/jobutils/jobs_verification.go b/pkg/testutils/jobutils/jobs_verification.go index f09c25883059..f4e8d50889a5 100644 --- a/pkg/testutils/jobutils/jobs_verification.go +++ b/pkg/testutils/jobutils/jobs_verification.go @@ -104,7 +104,7 @@ func RunJob( // related to bulk IO/backup/restore/import: Export, Import and AddSSTable. See // discussion on RunJob for where this might be useful. func BulkOpResponseFilter(allowProgressIota *chan struct{}) storagebase.ReplicaResponseFilter { - return func(ba roachpb.BatchRequest, br *roachpb.BatchResponse) *roachpb.Error { + return func(_ context.Context, ba roachpb.BatchRequest, br *roachpb.BatchResponse) *roachpb.Error { for _, ru := range br.Responses { switch ru.GetInner().(type) { case *roachpb.ExportResponse, *roachpb.ImportResponse, *roachpb.AddSSTableResponse: diff --git a/pkg/testutils/kvclientutils/txn_recovery.go b/pkg/testutils/kvclientutils/txn_recovery.go new file mode 100644 index 000000000000..44723e8883e1 --- /dev/null +++ b/pkg/testutils/kvclientutils/txn_recovery.go @@ -0,0 +1,124 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvclientutils + +import ( + "context" + "fmt" + + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/tracing" + "github.com/cockroachdb/errors" +) + +// PushExpectation expresses an expectation for CheckPushResult about what the +// push did. +type PushExpectation int + +const ( + // ExpectPusheeTxnRecovery means we're expecting transaction recovery to be + // performed (after finding a STAGING txn record). + ExpectPusheeTxnRecovery PushExpectation = iota + // ExpectPusheeTxnRecordNotFound means we're expecting the push to not find the + // pushee txn record. + ExpectPusheeTxnRecordNotFound + // DontExpectAnything means we're not going to check the state in which the + // pusher found the pushee's txn record. + DontExpectAnything +) + +// ExpectedTxnResolution expresses an expectation for CheckPushResult about the +// outcome of the push. +type ExpectedTxnResolution int + +const ( + // ExpectAborted means that the pushee is expected to have been aborted. Note + // that a committed txn that has been cleaned up also results in an ABORTED + // result for a pusher. + ExpectAborted ExpectedTxnResolution = iota + // ExpectCommitted means that the pushee is expected to have found the pushee + // to be committed - or STAGING in which case the push will have performed + // successful transaction recovery. + ExpectCommitted +) + +// CheckPushResult pushes the specified txn and checks that the pushee's +// resolution is the expected one. +func CheckPushResult( + ctx context.Context, + db *kv.DB, + txn roachpb.Transaction, + expResolution ExpectedTxnResolution, + pushExpectation PushExpectation, +) error { + pushReq := roachpb.PushTxnRequest{ + RequestHeader: roachpb.RequestHeader{ + Key: txn.Key, + }, + PusheeTxn: txn.TxnMeta, + PushTo: hlc.Timestamp{}, + PushType: roachpb.PUSH_ABORT, + // We're going to Force the push in order to not wait for the pushee to + // expire. + Force: true, + } + ba := roachpb.BatchRequest{} + ba.Add(&pushReq) + + recCtx, collectRec, cancel := tracing.ContextWithRecordingSpan(ctx, "test trace") + defer cancel() + + resp, pErr := db.NonTransactionalSender().Send(recCtx, ba) + if pErr != nil { + return pErr.GoError() + } + + var statusErr error + pusheeStatus := resp.Responses[0].GetPushTxn().PusheeTxn.Status + switch pusheeStatus { + case roachpb.ABORTED: + if expResolution != ExpectAborted { + statusErr = errors.Errorf("transaction unexpectedly aborted") + } + case roachpb.COMMITTED: + if expResolution != ExpectCommitted { + statusErr = errors.Errorf("transaction unexpectedly committed") + } + default: + return errors.Errorf("unexpected txn status: %s", pusheeStatus) + } + + // Verify that we're not fooling ourselves and that checking for the implicit + // commit actually caused the txn recovery procedure to run. + recording := collectRec() + var resolutionErr error + switch pushExpectation { + case ExpectPusheeTxnRecovery: + expMsg := fmt.Sprintf("recovered txn %s", txn.ID.Short()) + if _, ok := recording.FindLogMessage(expMsg); !ok { + resolutionErr = errors.Errorf( + "recovery didn't run as expected (missing \"%s\"). recording: %s", + expMsg, recording) + } + case ExpectPusheeTxnRecordNotFound: + expMsg := "pushee txn record not found" + if _, ok := recording.FindLogMessage(expMsg); !ok { + resolutionErr = errors.Errorf( + "push didn't run as expected (missing \"%s\"). recording: %s", + expMsg, recording) + } + case DontExpectAnything: + } + + return errors.CombineErrors(statusErr, resolutionErr) +} diff --git a/pkg/util/tracing/tracer.go b/pkg/util/tracing/tracer.go index 99e21a8da4ef..cb5dba7b57e5 100644 --- a/pkg/util/tracing/tracer.go +++ b/pkg/util/tracing/tracer.go @@ -839,7 +839,7 @@ func ContextWithRecordingSpan( ) (retCtx context.Context, getRecording func() Recording, cancel func()) { tr := NewTracer() sp := tr.StartSpan(opName, Recordable, LogTagsFromCtx(ctx)) - StartRecording(sp, SingleNodeRecording) + StartRecording(sp, SnowballRecording) ctx, cancelCtx := context.WithCancel(ctx) ctx = opentracing.ContextWithSpan(ctx, sp)