diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_ambiguous_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_ambiguous_test.go index 799bbc5fd92c..9931b5f87afc 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_ambiguous_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_ambiguous_test.go @@ -382,7 +382,7 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { }) } - getInBatch := func(ctx context.Context, txn *kv.Txn, keys ...roachpb.Key) []int64 { + getInBatch := func(t *testing.T, ctx context.Context, txn *kv.Txn, keys ...roachpb.Key) []int64 { batch := txn.NewBatch() for _, key := range keys { batch.GetForUpdate(key) @@ -440,8 +440,8 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { // Filtering and logging annotations for the request interceptor. tMu.Lock() tMu.filter = func(req *interceptedReq) bool { - // Log all requests on txn1 or txn2, except for heartbeats. - if (req.txnName == "txn1" || req.txnName == "txn2") && !req.ba.IsSingleHeartbeatTxnRequest() { + // Log all requests on txn1/txn2/txn3, except for heartbeats. 
+ if (req.txnName == "txn1" || req.txnName == "txn2" || req.txnName == "txn3") && !req.ba.IsSingleHeartbeatTxnRequest() { return true } @@ -477,10 +477,12 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { WHERE id IN ($1, $2)` const xferAmount = 10 + type opFn func(t *testing.T, name string) error + execWorkloadTxn := func(t *testing.T, name string) error { tCtx := context.Background() txn := db.NewTxn(tCtx, name) - vals := getInBatch(tCtx, txn, keyA, keyB) + vals := getInBatch(t, tCtx, txn, keyA, keyB) batch := txn.NewBatch() batch.Put(keyA, vals[0]-xferAmount) @@ -488,6 +490,14 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { return txn.CommitInBatch(tCtx, batch) } + execLeaseMover := func(t *testing.T, name string) error { + desc, err := tc.LookupRange(keyB) + assert.NoError(t, err) + t.Logf("Transferring r%d lease to n%d", desc.RangeID, tc.Target(0).NodeID) + assert.NoError(t, tc.TransferRangeLease(desc, tc.Target(0))) + return nil + } + waitUntilReady := func(t *testing.T, name string, readyCh chan struct{}) (finishedWithoutTimeout bool) { select { case <-readyCh: @@ -499,7 +509,7 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { return true } - runTxn := func(t *testing.T, name string, wg *sync.WaitGroup, readyCh, doneCh chan struct{}, resultCh chan error) { + runConcurrentOp := func(t *testing.T, name string, execOp opFn, wg *sync.WaitGroup, readyCh, doneCh chan struct{}, resultCh chan error) { defer wg.Done() if doneCh != nil { defer close(doneCh) @@ -507,7 +517,7 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { if !waitUntilReady(t, name, readyCh) { return } - err := execWorkloadTxn(t, name) + err := execOp(t, name) if resultCh != nil { resultCh <- err } else { @@ -515,21 +525,50 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { } } - runLeaseMover := func(t *testing.T, wg *sync.WaitGroup, readyCh chan struct{}, doneCh chan struct{}) { - defer wg.Done() - if !waitUntilReady(t, "lease mover", readyCh) 
{ - return - } - - desc, err := tc.LookupRange(keyB) - assert.NoError(t, err) - t.Logf("Transferring r%d lease to n%d", desc.RangeID, tc.Target(0).NodeID) - assert.NoError(t, tc.TransferRangeLease(desc, tc.Target(0))) - - close(doneCh) + // The set of test cases that use the same request scheduling. + basicVariants := []struct { + name string + txn2Ops opFn + txn2PutsAfterTxn1 bool + }{ + { + name: "writer reader conflict", + txn2Ops: func(t *testing.T, name string) error { + tCtx := context.Background() + + // txn2 just performs a simple Get on a conflicting key, causing + // it to issue a PushTxn for txn1 which will kick off recovery. + txn := db.NewTxn(tCtx, name) + _, err := txn.Get(tCtx, keyA) + assert.NoError(t, err) + assert.NoError(t, txn.Commit(ctx)) + return nil + }, + }, + { + name: "writer writer conflict", + txn2Ops: func(t *testing.T, name string) error { + tCtx := context.Background() + + // txn2 performs simple Puts on conflicting keys, causing it to issue a + // PushTxn for txn1 which will kick off recovery. + txn := db.NewTxn(tCtx, name) + batch := txn.NewBatch() + batch.Put(keyA, 0) + batch.Put(keyB, 0) + assert.NoError(t, txn.CommitInBatch(ctx, batch)) + t.Logf("txn2 finished here") + return nil + }, + }, + { + name: "workload conflict", + txn2Ops: execWorkloadTxn, + txn2PutsAfterTxn1: true, + }, } - t.Run("workload conflict", func(t *testing.T) { + runBasicTestCase := func(t *testing.T, txn2Ops opFn, txn2PutsAfterTxn1 bool) { defer initSubTest(t)() // Checkpoints in test. @@ -547,9 +586,9 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { // Concurrent transactions. 
var wg sync.WaitGroup wg.Add(3) - go runTxn(t, "txn1", &wg, txn1Ready, txn1Done, txn1ResultCh) - go runTxn(t, "txn2", &wg, txn2Ready, nil /* doneCh */, nil /* resultCh */) - go runLeaseMover(t, &wg, leaseMoveReady, leaseMoveComplete) + go runConcurrentOp(t, "txn1", execWorkloadTxn, &wg, txn1Ready, txn1Done, txn1ResultCh) + go runConcurrentOp(t, "txn2", txn2Ops, &wg, txn2Ready, nil /* doneCh */, nil /* resultCh */) + go runConcurrentOp(t, "lease mover", execLeaseMover, &wg, leaseMoveReady, leaseMoveComplete, nil /* resultCh */) // KV Request sequencing. tMu.Lock() @@ -570,8 +609,8 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { close(txn2Ready) } - // 5. txn2->n1: Get(a) -- Discovers txn1's locks, issues push request. - // 6. txn2->n2: Get(b) + // 5. txn2->n1: Get(a) OR Put(a) -- Discovers txn1's locks, issues push request. + // 6. txn2->n2: Get(b) OR Put(b) // 7. _->n1: PushTxn(txn2->txn1) -- Discovers txn1 in STAGING and starts // recovery. // 8. _->n1: RecoverTxn(txn1) -- Before sending, pause the request so we @@ -629,6 +668,301 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { req.pauseUntil(t, recoverComplete, cp) } + // + if txn2PutsAfterTxn1 && req.txnName == "txn2" && hasPut && cp == BeforeSending { + // While txn2's Puts can only occur after txn1 is marked as explicitly + // committed, if the Recovery and the subsequent txn2 Put(b) operations + // happen before txn1 retries its Put(b) on n1, it will encounter txn2's + // intent and get a WriteTooOld error instead of potentially being an + // idempotent replay. + <-txn1Done + } + + return nil + } + tMu.Unlock() + + // Start test, await concurrent operations and validate results. + close(txn1Ready) + err := <-txn1ResultCh + t.Logf("txn1 completed with err: %+v", err) + wg.Wait() + + // TODO(sarkesian): While we expect an AmbiguousResultError once the + // immediate changes outlined in #103817 are implemented, right now this is + // essentially validating the existence of the bug. 
This needs to be fixed, + // and we should not see an assertion failure from the transaction + // coordinator once fixed. + tErr := (*kvpb.TransactionStatusError)(nil) + require.ErrorAsf(t, err, &tErr, + "expected TransactionStatusError due to being already committed") + require.Equalf(t, kvpb.TransactionStatusError_REASON_TXN_COMMITTED, tErr.Reason, + "expected TransactionStatusError due to being already committed") + require.Truef(t, errors.HasAssertionFailure(err), + "expected AssertionFailedError due to sanity check on transaction already committed") + } + + for _, variant := range basicVariants { + t.Run(variant.name, func(t *testing.T) { + runBasicTestCase(t, variant.txn2Ops, variant.txn2PutsAfterTxn1) + }) + } + + // Test cases with custom request scheduling. + t.Run("recovery before refresh fails", func(t *testing.T) { + keyAPrime := roachpb.Key(encoding.EncodeBytesAscending(tablePrefix.Clone(), []byte("a'"))) + defer func() { + _, err := db.Del(ctx, keyAPrime) + require.NoError(t, err) + }() + + defer initSubTest(t)() + + // Checkpoints in test. + txn1Ready := make(chan struct{}) + txn2Ready := make(chan struct{}) + leaseMoveReady := make(chan struct{}) + leaseMoveComplete := make(chan struct{}) + txn1Done := make(chan struct{}) + + // Final result. + txn1ResultCh := make(chan error, 1) + + // Operation functions. + execTxn1 := func(t *testing.T, name string) error { + tCtx := context.Background() + txn := db.NewTxn(tCtx, name) + vals := getInBatch(t, tCtx, txn, keyA, keyB, keyAPrime) + + batch := txn.NewBatch() + batch.Put(keyA, vals[0]-xferAmount) + batch.Put(keyB, vals[1]+xferAmount) + return txn.CommitInBatch(tCtx, batch) + } + execTxn2 := func(t *testing.T, name string) error { + tCtx := context.Background() + txn := db.NewTxn(tCtx, name) + + // The intent from txn2 on a' will cause txn1's read refresh to fail. 
+ assert.NoError(t, txn.Put(tCtx, keyAPrime, 100)) + _ = getInBatch(t, tCtx, txn, keyA) + assert.NoError(t, txn.Commit(tCtx)) + return nil + } + + // Concurrent transactions. + var wg sync.WaitGroup + wg.Add(3) + go runConcurrentOp(t, "txn1", execTxn1, &wg, txn1Ready, txn1Done, txn1ResultCh) + go runConcurrentOp(t, "txn2", execTxn2, &wg, txn2Ready, nil /* doneCh */, nil /* resultCh */) + go runConcurrentOp(t, "lease mover", execLeaseMover, &wg, leaseMoveReady, leaseMoveComplete, nil /* resultCh */) + + // KV Request sequencing. + tMu.Lock() + tMu.maybeWait = func(cp InterceptPoint, req *interceptedReq, resp *interceptedResp) (override error) { + _, hasPut := req.ba.GetArg(kvpb.Put) + + // These conditions are checked in order of expected operations of the + // test. + + // 1. txn1->n1: Get(a) + // 2. txn1->n2: Get(b) + // 3. txn1->n1: Put(a), EndTxn(parallel commit) -- Puts txn1 in STAGING. + // 4. txn1->n2: Put(b) -- Send the request, but pause before returning + // the response so we can inject network failure. + if req.txnName == "txn1" && hasPut && req.toNodeID == tc.Server(1).NodeID() && cp == AfterSending { + // Once we have seen the write on txn1 to n2 that we will fail, we can + // move the lease. + close(txn2Ready) + } + + // 5. txn2->n1: Put(a') + // 6. txn2->n1: Get(a) -- Discovers txn1's locks, issues push request. + // 7. _->n1: PushTxn(txn2->txn1) -- Discovers txn1 in STAGING and starts + // recovery. + // 8. _->n1: RecoverTxn(txn1) -- Allow to proceed and finish. Since txn1 + // is in STAGING and has all of its writes, it is implicitly committed, + // so the recovery will succeed in marking it explicitly committed. 
+ if req.ba.IsSingleRecoverTxnRequest() && cp == AfterSending { + t.Logf("%s - complete, resp={%s}", req.prefix, resp) + close(leaseMoveReady) + } + + // + // n2: Put(b) to + // return with error> + if req.txnName == "txn1" && hasPut && req.toNodeID == tc.Server(1).NodeID() && cp == AfterSending { + // Hold the operation open until we are ready to retry on the new + // replica, after which we will return the injected failure. + req.pauseUntil(t, leaseMoveComplete, cp) + t.Logf("%s - injected RPC error", req.prefix) + return grpcstatus.Errorf(codes.Unavailable, "response jammed on n%d<-n%d", req.fromNodeID, req.toNodeID) + } + + // -- NB: If ambiguous errors were propagated, txn1 would end here. + // -- NB: When ambiguous errors are not propagated, txn1 continues with: + // + // 9. txn1->n1: Put(b) -- Retry on new leaseholder sees new lease start + // timestamp, and attempts to evaluate it as an idempotent replay, but at + // a higher timestamp, which breaks idempotency due to being on commit. + // 10. txn1->n1: Refresh(a) + // 11. txn1->n1: Refresh(b,c) -- This fails due to txn2's intent on c. + // Causes the transaction coordinator to return a retriable error, + // although the transaction has been actually committed during recovery; + // a highly problematic bug. + + // + // This way we can prevent txn1's intents from being resolved too early. + if _, ok := req.ba.GetArg(kvpb.PushTxn); ok && cp == AfterSending && + resp != nil && resp.err == nil { + req.pauseUntil(t, txn1Done, cp) + } + + return nil + } + tMu.Unlock() + + // Start test, await concurrent operations and validate results. + close(txn1Ready) + err := <-txn1ResultCh + t.Logf("txn1 completed with err: %+v", err) + wg.Wait() + + // TODO(sarkesian): While we expect an AmbiguousResultError once the + // immediate changes outlined in #103817 are implemented, right now this is + // essentially validating the existence of the bug. 
This needs to be fixed, + // and we should not see a transaction retry error from the transaction + // coordinator once fixed. + tErr := (*kvpb.TransactionRetryWithProtoRefreshError)(nil) + require.ErrorAsf(t, err, &tErr, + "expected incorrect TransactionRetryWithProtoRefreshError due to failed refresh") + require.False(t, tErr.PrevTxnAborted()) + require.Falsef(t, errors.HasAssertionFailure(err), + "expected no AssertionFailedError due to sanity check") + }) + t.Run("recovery after refresh fails", func(t *testing.T) { + keyC := roachpb.Key(encoding.EncodeBytesAscending(tablePrefix.Clone(), []byte("c"))) + defer func() { + _, err := db.Del(ctx, keyC) + require.NoError(t, err) + }() + + defer initSubTest(t)() + + // Checkpoints in test. + txn1Ready := make(chan struct{}) + otherTxnsReady := make(chan struct{}) + leaseMoveReady := make(chan struct{}) + leaseMoveComplete := make(chan struct{}) + recoverComplete := make(chan struct{}) + txn1Done := make(chan struct{}) + + // Final result. + txn1ResultCh := make(chan error, 1) + + execTxn1 := func(t *testing.T, name string) error { + tCtx := context.Background() + txn := db.NewTxn(tCtx, name) + vals := getInBatch(t, tCtx, txn, keyA, keyB, keyC) + + batch := txn.NewBatch() + batch.Put(keyA, vals[0]-xferAmount) + batch.Put(keyB, vals[1]+xferAmount) + return txn.CommitInBatch(tCtx, batch) + } + execOtherTxns := func(t *testing.T, name string) error { + tCtx := context.Background() + + // The intent from txn3 will cause txn1's read refresh to fail. + txn3 := db.NewTxn(tCtx, "txn3") + batch := txn3.NewBatch() + batch.Put(keyC, 100) + assert.NoError(t, txn3.CommitInBatch(tCtx, batch)) + + txn2 := db.NewTxn(tCtx, "txn2") + vals := getInBatch(t, tCtx, txn2, keyA, keyB) + + batch = txn2.NewBatch() + batch.Put(keyA, vals[0]-xferAmount) + batch.Put(keyB, vals[1]+xferAmount) + assert.NoError(t, txn2.CommitInBatch(tCtx, batch)) + return nil + } + + // Concurrent transactions. 
+ var wg sync.WaitGroup + wg.Add(3) + go runConcurrentOp(t, "txn1", execTxn1, &wg, txn1Ready, txn1Done, txn1ResultCh) + go runConcurrentOp(t, "txn2/txn3", execOtherTxns, &wg, otherTxnsReady, nil /* doneCh */, nil /* resultCh */) + go runConcurrentOp(t, "lease mover", execLeaseMover, &wg, leaseMoveReady, leaseMoveComplete, nil /* resultCh */) + + // KV Request sequencing. + tMu.Lock() + tMu.maybeWait = func(cp InterceptPoint, req *interceptedReq, resp *interceptedResp) (override error) { + _, hasPut := req.ba.GetArg(kvpb.Put) + + // These conditions are checked in order of expected operations of the + // test. + + // 1. txn1->n1: Get(a) + // 2. txn1->n2: Get(b), Get(c) + // 3. txn1->n1: Put(a), EndTxn(parallel commit) -- Puts txn1 in STAGING. + // 4. txn1->n2: Put(b) -- Send the request, but pause before returning the + // response so we can inject network failure. + if req.txnName == "txn1" && hasPut && req.toNodeID == tc.Server(1).NodeID() && cp == AfterSending { + // Once we have seen the write on txn1 to n2 that we will fail, txn2/txn3 can start. + close(otherTxnsReady) + } + + // 5. txn3->n2: Put(c), EndTxn(parallel commit) + // 6. txn2->n1: Get(a) -- Discovers txn1's locks, issues push request. + // 7. txn2->n2: Get(b) + // 8. _->n1: PushTxn(txn2->txn1) -- Discovers txn1 in STAGING and starts + // recovery. + // 9. _->n1: RecoverTxn(txn1) -- Before sending, pause the request so we + // can ensure it gets evaluated after txn1 retries (and refreshes), but + // before its final EndTxn. + if req.ba.IsSingleRecoverTxnRequest() && cp == BeforeSending { + // Once the RecoverTxn request is issued, as part of txn2's PushTxn + // request, the lease can be moved. + close(leaseMoveReady) + } + + // + // n2: Put(b) to + // return with error> + if req.txnName == "txn1" && hasPut && req.toNodeID == tc.Server(1).NodeID() && cp == AfterSending { + // Hold the operation open until we are ready to retry on the new + // replica, after which we will return the injected failure. 
+ req.pauseUntil(t, leaseMoveComplete, cp) + t.Logf("%s - injected RPC error", req.prefix) + return grpcstatus.Errorf(codes.Unavailable, "response jammed on n%d<-n%d", req.fromNodeID, req.toNodeID) + } + + // -- NB: If ambiguous errors were propagated, txn1 would end here. + // -- NB: When ambiguous errors are not propagated, txn1 continues with: + // + // 10. txn1->n1: Put(b) -- Retry on new leaseholder sees new lease start + // timestamp, and attempts to evaluate it as an idempotent replay, but at + // a higher timestamp, which breaks idempotency due to being on commit. + // 11. txn1->n1: Refresh(a) + // 12. txn1->n1: Refresh(b,c) -- This fails due to txn3's write on c. + // Causes the transaction coordinator to return a retriable error, + // although the transaction could be actually committed during recovery; + // a highly problematic bug. + // -- because txn1 + // is in STAGING and has all of its writes, it is implicitly committed, + // so the recovery will succeed in marking it explicitly committed. + if req.ba.IsSingleRecoverTxnRequest() && cp == BeforeSending { + // The RecoverTxn operation should be evaluated after txn1 completes, + // in this case with a problematic retriable error. + req.pauseUntil(t, txn1Done, cp) + } + if req.ba.IsSingleRecoverTxnRequest() && cp == AfterSending { + t.Logf("%s - complete, resp={%s}", req.prefix, resp) + close(recoverComplete) + } + // if req.txnName == "txn2" && hasPut && cp == BeforeSending { // While txn2's Puts can only occur after txn1 is marked as explicitly @@ -649,6 +983,565 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { t.Logf("txn1 completed with err: %+v", err) wg.Wait() + // TODO(sarkesian): While we expect an AmbiguousResultError once the + // immediate changes outlined in #103817 are implemented, right now this is + // essentially validating the existence of the bug. This needs to be fixed, + // and we should not see an transaction retry error from the transaction + // coordinator once fixed. 
+ tErr := (*kvpb.TransactionRetryWithProtoRefreshError)(nil) + require.ErrorAsf(t, err, &tErr, + "expected incorrect TransactionRetryWithProtoRefreshError due to failed refresh") + require.False(t, tErr.PrevTxnAborted()) + require.Falsef(t, errors.HasAssertionFailure(err), + "expected no AssertionFailedError due to sanity check") + }) + t.Run("recovery after transfer lease", func(t *testing.T) { + defer initSubTest(t)() + + // Checkpoints in test. + txn1Ready := make(chan struct{}) + txn2Ready := make(chan struct{}) + leaseMoveReady := make(chan struct{}) + leaseMoveComplete := make(chan struct{}) + recoverComplete := make(chan struct{}) + txn1Done := make(chan struct{}) + + // Final result. + txn1ResultCh := make(chan error, 1) + + // Concurrent transactions. + var wg sync.WaitGroup + wg.Add(3) + go runConcurrentOp(t, "txn1", execWorkloadTxn, &wg, txn1Ready, txn1Done, txn1ResultCh) + go runConcurrentOp(t, "txn2", execWorkloadTxn, &wg, txn2Ready, nil /* doneCh */, nil /* resultCh */) + go runConcurrentOp(t, "lease mover", execLeaseMover, &wg, leaseMoveReady, leaseMoveComplete, nil /* resultCh */) + + // KV Request sequencing. + tMu.Lock() + tMu.maybeWait = func(cp InterceptPoint, req *interceptedReq, resp *interceptedResp) (override error) { + _, hasPut := req.ba.GetArg(kvpb.Put) + + // These conditions are checked in order of expected operations of the + // test. + + // 1. txn1->n1: Get(a) + // 2. txn1->n2: Get(b) + // 3. txn1->n1: Put(a), EndTxn(parallel commit) -- Puts txn1 in STAGING. + // 4. txn1->n2: Put(b) -- Send the request, but pause before returning the + // response so we can inject network failure. + // + if req.txnName == "txn1" && hasPut && req.toNodeID == tc.Server(1).NodeID() && cp == AfterSending { + close(leaseMoveReady) + req.pauseUntil(t, leaseMoveComplete, cp) + close(txn2Ready) + } + + // 5. txn2->n1: Get(a) -- Discovers txn1's locks, issues push request. + // 6. txn2->n1: Get(b) + // 7. 
_->n1: PushTxn(txn2->txn1) -- Discovers txn1 in STAGING and starts + // recovery. + // 8. _->n1: RecoverTxn(txn1) -- Recovery should mark txn1 committed, but + // pauses before returning so that txn1's intents don't get cleaned up. + if req.ba.IsSingleRecoverTxnRequest() && cp == AfterSending { + close(recoverComplete) + } + + // n2: Put(b) to + // return with error> + if req.txnName == "txn1" && hasPut && req.toNodeID == tc.Server(1).NodeID() && cp == AfterSending { + // Hold the operation open until we are ready to retry on the new + // replica, after which we will return the injected failure. + req.pauseUntil(t, recoverComplete, cp) + t.Logf("%s - injected RPC error", req.prefix) + return grpcstatus.Errorf(codes.Unavailable, "response jammed on n%d<-n%d", req.fromNodeID, req.toNodeID) + } + + // -- NB: If ambiguous errors were propagated, txn1 would end here. + // -- NB: When ambiguous errors are not propagated, txn1 continues with: + // + // 9. txn1->n1: Put(b) -- Retry on new leaseholder sees new lease start + // timestamp, and attempts to evaluate it as an idempotent replay, but at + // a higher timestamp, which breaks idempotency due to being on commit. + // 10. txn1->n1: Refresh(a) + // 11. txn1->n1: Refresh(b) + // 12. txn1->n1: EndTxn(commit) -- Recovery has already completed, so this + // request fails with "transaction unexpectedly committed". + + // + if req.ba.IsSingleRecoverTxnRequest() && cp == AfterSending { + req.pauseUntil(t, txn1Done, cp) + t.Logf("%s - complete, resp={%s}", req.prefix, resp) + } + + return nil + } + tMu.Unlock() + + // Start test, await concurrent operations and validate results. + close(txn1Ready) + err := <-txn1ResultCh + t.Logf("txn1 completed with err: %+v", err) + wg.Wait() + + // TODO(sarkesian): While we expect an AmbiguousResultError once the + // immediate changes outlined in #103817 are implemented, right now this is + // essentially validating the existence of the bug. 
This needs to be fixed, + // and we should not see an assertion failure from the transaction + // coordinator once fixed. + tErr := (*kvpb.TransactionStatusError)(nil) + require.ErrorAsf(t, err, &tErr, + "expected TransactionStatusError due to being already committed") + require.Equalf(t, kvpb.TransactionStatusError_REASON_TXN_COMMITTED, tErr.Reason, + "expected TransactionStatusError due to being already committed") + require.Truef(t, errors.HasAssertionFailure(err), + "expected AssertionFailedError due to sanity check on transaction already committed") + }) + t.Run("retry sees other intent", func(t *testing.T) { + defer initSubTest(t)() + + // Checkpoints in test. + txn1Ready := make(chan struct{}) + txn2Ready := make(chan struct{}) + leaseMoveReady := make(chan struct{}) + leaseMoveComplete := make(chan struct{}) + recoverComplete := make(chan struct{}) + txn1Done := make(chan struct{}) + + // Final result. + txn1ResultCh := make(chan error, 1) + + // Concurrent transactions. + var wg sync.WaitGroup + wg.Add(3) + go runConcurrentOp(t, "txn1", execWorkloadTxn, &wg, txn1Ready, txn1Done, txn1ResultCh) + go runConcurrentOp(t, "txn2", execWorkloadTxn, &wg, txn2Ready, nil /* doneCh */, nil /* resultCh */) + go runConcurrentOp(t, "lease mover", execLeaseMover, &wg, leaseMoveReady, leaseMoveComplete, nil /* resultCh */) + + // KV Request sequencing. + tMu.Lock() + tMu.maybeWait = func(cp InterceptPoint, req *interceptedReq, resp *interceptedResp) (override error) { + _, hasPut := req.ba.GetArg(kvpb.Put) + + // These conditions are checked in order of expected operations of the test. + + // 1. txn1->n1: Get(a) + // 2. txn1->n2: Get(b) + // 3. txn1->n1: Put(a), EndTxn(parallel commit) -- Puts txn1 in STAGING. + // 4. txn1->n2: Put(b) -- Send the request, but pause before returning the + // response so we can inject network failure. 
+ // + if req.txnName == "txn1" && hasPut && req.toNodeID == tc.Server(1).NodeID() && cp == AfterSending { + close(leaseMoveReady) + req.pauseUntil(t, leaseMoveComplete, cp) + close(txn2Ready) + } + + // 5. txn2->n1: Get(a) -- Discovers txn1's locks, issues push request. + // 6. txn2->n1: Get(b) + // 7. _->n1: PushTxn(txn2->txn1) -- Discovers txn1 in STAGING and starts + // recovery. + // 8. _->n1: RecoverTxn(txn1) -- Recovery should mark txn1 committed. + // 9. txn2->n1: Put(a), EndTxn(parallel commit) -- Writes intent. + // 10. txn2->n1: Put(b) + // 11. txn2->n1: EndTxn(commit) + if req.txnName == "txn2" && req.ba.IsSingleEndTxnRequest() && cp == BeforeSending { + close(recoverComplete) + } + + // n2: Put(b) to + // return with error> + if req.txnName == "txn1" && hasPut && req.toNodeID == tc.Server(1).NodeID() && cp == AfterSending { + // Hold the operation open until we are ready to retry on the new + // replica, after which we will return the injected failure. + req.pauseUntil(t, recoverComplete, cp) + t.Logf("%s - injected RPC error", req.prefix) + return grpcstatus.Errorf(codes.Unavailable, "response jammed on n%d<-n%d", req.fromNodeID, req.toNodeID) + } + + // 9. txn1->n1: Put(b) -- Retry sees intent, so returns a WriteTooOld + // error. Since the transaction had an earlier ambiguous failure on a + // batch with a commit, it should propagate the ambiguous error. + + // + if req.txnName == "txn2" && req.ba.IsSingleEndTxnRequest() && cp == BeforeSending { + req.pauseUntil(t, txn1Done, cp) + } + + return nil + } + tMu.Unlock() + + // Start test, await concurrent operations and validate results. 
+ close(txn1Ready) + err := <-txn1ResultCh + t.Logf("txn1 completed with err: %+v", err) + wg.Wait() + + aErr := (*kvpb.AmbiguousResultError)(nil) + tErr := (*kvpb.TransactionStatusError)(nil) + require.ErrorAsf(t, err, &aErr, + "expected AmbiguousResultError due to encountering an intent on retry") + require.Falsef(t, errors.As(err, &tErr), + "did not expect TransactionStatusError due to being already committed") + require.Falsef(t, errors.HasAssertionFailure(err), + "expected no AssertionFailedError due to sanity check on transaction already committed") + }) + t.Run("recovery before retry at same timestamp", func(t *testing.T) { + defer initSubTest(t)() + + // Checkpoints in test. + txn1Ready := make(chan struct{}) + txn2Ready := make(chan struct{}) + recoverComplete := make(chan struct{}) + resolveIntentComplete := make(chan struct{}) + txn1Done := make(chan struct{}) + + // Final result. + txn1ResultCh := make(chan error, 1) + + // Place second range on n1 (same as first). + secondRange := tc.LookupRangeOrFatal(t, keyB) + tc.TransferRangeLeaseOrFatal(t, secondRange, tc.Target(0)) + requireRangeLease(t, secondRange, 0) + + // Operation functions. + execTxn2 := func(t *testing.T, name string) error { + tCtx := context.Background() + + // txn2 just performs a simple GetForUpdate on a conflicting key, causing + // it to issue a PushTxn for txn1 which will kick off recovery. + txn := db.NewTxn(tCtx, name) + _ = getInBatch(t, tCtx, txn, keyB) + assert.NoError(t, txn.Commit(ctx)) + return nil + } + + // Concurrent transactions. + var wg sync.WaitGroup + wg.Add(2) + go runConcurrentOp(t, "txn1", execWorkloadTxn, &wg, txn1Ready, txn1Done, txn1ResultCh) + go runConcurrentOp(t, "txn2", execTxn2, &wg, txn2Ready, nil /* doneCh */, nil /* resultCh */) + + // KV Request sequencing. 
+ var txn1KeyBWriteCount int64 + tMu.Lock() + tMu.maybeWait = func(cp InterceptPoint, req *interceptedReq, resp *interceptedResp) (override error) { + putReq, hasPut := req.ba.GetArg(kvpb.Put) + var keyBWriteCount int64 = -1 + + // These conditions are checked in order of expected operations of the + // test. + + // 1. txn1->n1: Get(a) + // 2. txn1->n1: Get(b) + // 3. txn1->n1: Put(a), EndTxn(parallel commit) -- Puts txn1 in STAGING. + // 4. txn1->n1: Put(b) -- Send the request, but pause before returning + // the response so we can inject network failure. + if req.txnName == "txn1" && hasPut && putReq.Header().Key.Equal(keyB) && cp == AfterSending { + keyBWriteCount = atomic.AddInt64(&txn1KeyBWriteCount, 1) + if keyBWriteCount == 1 { + close(txn2Ready) + } + } + + // 5. txn2->n1: Get(b) -- Discovers txn1's locks, issues push request. + // 6. _->n1: PushTxn(txn2->txn1) -- Discovers txn1 in STAGING and starts + // recovery. + // 7. _->n1: RecoverTxn(txn1) -- Allow to proceed. + if req.ba.IsSingleRecoverTxnRequest() && cp == AfterSending { + t.Logf("%s - complete, resp={%s}", req.prefix, resp) + close(recoverComplete) + } + + // 8. _->n1: ResolveIntent(txn1, b) + if riReq, ok := req.ba.GetArg(kvpb.ResolveIntent); ok && riReq.Header().Key.Equal(keyB) && cp == AfterSending { + t.Logf("%s - complete", req.prefix) + close(resolveIntentComplete) + } + + // n1: Put(b) to + // return with error> + if req.txnName == "txn1" && keyBWriteCount == 1 && cp == AfterSending { + // Hold the operation open until we are ready to retry, after which we + // will return the injected failure. + req.pauseUntil(t, resolveIntentComplete, cp) + t.Logf("%s - injected RPC error", req.prefix) + return grpcstatus.Errorf(codes.Unavailable, "response jammed on n%d<-n%d", req.fromNodeID, req.toNodeID) + } + + // 9. txn1->n1: Put(b) -- Retry gets WriteTooOld and cannot perform a + // serverside refresh as reads were already returned. Fails, and should + // propagate the ambiguous error. 
+ + // + if req.txnName == "txn2" && hasPut && cp == BeforeSending { + <-txn1Done + } + + return nil + } + tMu.Unlock() + + // Start test, await concurrent operations and validate results. + close(txn1Ready) + err := <-txn1ResultCh + t.Logf("txn1 completed with err: %+v", err) + wg.Wait() + + // TODO(sarkesian): Once we incorporate secondary errors into the + // AmbiguousResultError, check that we see the WriteTooOldError. + aErr := (*kvpb.AmbiguousResultError)(nil) + tErr := (*kvpb.TransactionStatusError)(nil) + require.ErrorAsf(t, err, &aErr, + "expected AmbiguousResultError due to encountering an intent on retry") + require.Falsef(t, errors.As(err, &tErr), + "did not expect TransactionStatusError due to being already committed") + require.Falsef(t, errors.HasAssertionFailure(err), + "expected no AssertionFailedError due to sanity check on transaction already committed") + }) + t.Run("recovery after retry at same timestamp", func(t *testing.T) { + defer initSubTest(t)() + + // Checkpoints in test. + txn1Ready := make(chan struct{}) + txn2Ready := make(chan struct{}) + putRetryReady := make(chan struct{}) + receivedFinalET := make(chan struct{}) + recoverComplete := make(chan struct{}) + txn1Done := make(chan struct{}) + + // Final result. + txn1ResultCh := make(chan error, 1) + + // Place second range on n1 (same as first). + secondRange := tc.LookupRangeOrFatal(t, keyB) + tc.TransferRangeLeaseOrFatal(t, secondRange, tc.Target(0)) + requireRangeLease(t, secondRange, 0) + + // Operation functions. + execTxn2 := func(t *testing.T, name string) error { + tCtx := context.Background() + + // txn2 just performs a simple GetForUpdate on a conflicting key, causing + // it to issue a PushTxn for txn1 which will kick off recovery. + txn := db.NewTxn(tCtx, name) + _ = getInBatch(t, tCtx, txn, keyB) + assert.NoError(t, txn.Commit(ctx)) + return nil + } + + // Concurrent transactions. 
+ var wg sync.WaitGroup + wg.Add(2) + go runConcurrentOp(t, "txn1", execWorkloadTxn, &wg, txn1Ready, txn1Done, txn1ResultCh) + go runConcurrentOp(t, "txn2", execTxn2, &wg, txn2Ready, nil /* doneCh */, nil /* resultCh */) + + // KV Request sequencing. + var txn1KeyBWriteCount int64 + tMu.Lock() + tMu.maybeWait = func(cp InterceptPoint, req *interceptedReq, resp *interceptedResp) (override error) { + putReq, hasPut := req.ba.GetArg(kvpb.Put) + var keyBWriteCount int64 = -1 + + // These conditions are checked in order of expected operations of the + // test. + + // 1. txn1->n1: Get(a) + // 2. txn1->n1: Get(b) + // 3. txn1->n1: Put(a), EndTxn(parallel commit) -- Puts txn1 in STAGING. + // 4. txn1->n1: Put(b) -- Send the request, but pause before returning + // the response so we can inject network failure. + if req.txnName == "txn1" && hasPut && putReq.Header().Key.Equal(keyB) && cp == AfterSending { + keyBWriteCount = atomic.AddInt64(&txn1KeyBWriteCount, 1) + if keyBWriteCount == 1 { + close(txn2Ready) + } + } + + // 5. txn2->n1: Get(b) -- Discovers txn1's locks, issues push request. + // 6. _->n1: PushTxn(txn2->txn1) -- Discovers txn1 in STAGING and starts + // recovery. + // 7. _->n1: RecoverTxn(txn1) -- Before sending, pause the request so we + // can ensure it gets evaluated after txn1 retries, but before its final + // EndTxn. + if req.ba.IsSingleRecoverTxnRequest() && cp == BeforeSending { + close(putRetryReady) + } + + // n1: Put(b) to + // return with error> + if req.txnName == "txn1" && keyBWriteCount == 1 && cp == AfterSending { + // Hold the operation open until we are ready to retry, after which we + // will return the injected failure. + req.pauseUntil(t, putRetryReady, cp) + t.Logf("%s - injected RPC error", req.prefix) + return grpcstatus.Errorf(codes.Unavailable, "response jammed on n%d<-n%d", req.fromNodeID, req.toNodeID) + } + + // 8. txn1->n1: Put(b) -- Retry gets evaluated as idempotent replay and + // correctly succeeds. 
+ // -- NB: In this case, txn1 returns without error here, as there is no
+ // need to refresh and the final EndTxn can be split off and run
+ // asynchronously.
+
+ // 9. txn1->n1: EndTxn(commit) -- Before sending, pause the request so
+ // that we can allow (7) RecoverTxn(txn1) to proceed, simulating a race
+ // in which the recovery wins.
+ if req.txnName == "txn1" && req.ba.IsSingleEndTxnRequest() && cp == BeforeSending {
+ close(receivedFinalET)
+ }
+
+ // <Allow (7) RecoverTxn(txn1) to proceed> -- because txn1
+ // is in STAGING and has all of its writes, it is implicitly committed,
+ // so the recovery will succeed in marking it explicitly committed.
+ if req.ba.IsSingleRecoverTxnRequest() && cp == BeforeSending {
+ req.pauseUntilFirst(t, receivedFinalET, txn1Done, cp)
+ }
+ if req.ba.IsSingleRecoverTxnRequest() && cp == AfterSending {
+ t.Logf("%s - complete, resp={%s}", req.prefix, resp)
+ close(recoverComplete)
+ }
+
+ // <Allow (9) txn1's EndTxn(commit) to proceed> -- even though the
+ // recovery won, this is allowed. See makeTxnCommitExplicitLocked(..).
+ if req.txnName == "txn1" && req.ba.IsSingleEndTxnRequest() && cp == BeforeSending {
+ req.pauseUntil(t, recoverComplete, cp)
+ }
+
+ // <Ensure txn2's writes (if any) wait until txn1 has completed.>
+ if req.txnName == "txn2" && hasPut && cp == BeforeSending {
+ <-txn1Done
+ }
+
+ return nil
+ }
+ tMu.Unlock()
+
+ // Start test, await concurrent operations and validate results.
+ close(txn1Ready)
+ err := <-txn1ResultCh
+ t.Logf("txn1 completed with err: %+v", err)
+ wg.Wait()
+
+ require.NoErrorf(t, err, "expected txn1 to succeed")
+ })
+ t.Run("recovery before retry with serverside refresh", func(t *testing.T) {
+ defer initSubTest(t)()
+
+ // Checkpoints in test.
+ txn1Ready := make(chan struct{})
+ txn2Ready := make(chan struct{})
+ recoverComplete := make(chan struct{})
+ resolveIntentComplete := make(chan struct{})
+ txn1Done := make(chan struct{})
+
+ // Final result.
+ txn1ResultCh := make(chan error, 1)
+
+ // Place second range on n1 (same as first).
+ secondRange := tc.LookupRangeOrFatal(t, keyB)
+ tc.TransferRangeLeaseOrFatal(t, secondRange, tc.Target(0))
+ requireRangeLease(t, secondRange, 0)
+
+ // Operation functions.
+ execTxn1 := func(t *testing.T, name string) error {
+ tCtx := context.Background()
+
+ // txn1 writes to keys a and b and commits them in a batch, putting it
+ // through the parallel commit (STAGING) path.
+ txn := db.NewTxn(tCtx, name)
+ batch := txn.NewBatch()
+ batch.Put(keyA, 100)
+ batch.Put(keyB, 100)
+ return txn.CommitInBatch(tCtx, batch)
+ }
+ execTxn2 := func(t *testing.T, name string) error {
+ tCtx := context.Background()
+
+ // txn2 just performs a simple GetForUpdate on a conflicting key, causing
+ // it to issue a PushTxn for txn1 which will kick off recovery.
+ txn := db.NewTxn(tCtx, name)
+ _ = getInBatch(t, tCtx, txn, keyB)
+ assert.NoError(t, txn.Commit(ctx))
+ return nil
+ }
+
+ // Concurrent transactions.
+ var wg sync.WaitGroup
+ wg.Add(2)
+ go runConcurrentOp(t, "txn1", execTxn1, &wg, txn1Ready, txn1Done, txn1ResultCh)
+ go runConcurrentOp(t, "txn2", execTxn2, &wg, txn2Ready, nil /* doneCh */, nil /* resultCh */)
+
+ // KV Request sequencing.
+ var txn1KeyBWriteCount int64
+ var txn1KeyBResolveIntentCount int64
+ tMu.Lock()
+ tMu.maybeWait = func(cp InterceptPoint, req *interceptedReq, resp *interceptedResp) (override error) {
+ putReq, hasPut := req.ba.GetArg(kvpb.Put)
+ var keyBWriteCount int64 = -1
+ var keyBResolveIntentCount int64 = -1
+
+ // These conditions are checked in order of expected operations of the
+ // test.
+
+ // 1. txn1->n1: Put(a), EndTxn(parallel commit) -- Puts txn1 in STAGING.
+ // 2. txn1->n1: Put(b) -- Send the request, but pause before returning
+ // the response so we can inject network failure.
+ if req.txnName == "txn1" && hasPut && putReq.Header().Key.Equal(keyB) && cp == AfterSending {
+ keyBWriteCount = atomic.AddInt64(&txn1KeyBWriteCount, 1)
+ if keyBWriteCount == 1 {
+ close(txn2Ready)
+ }
+ }
+
+ // 3.
+ // txn2->n1: Get(b) -- Discovers txn1's locks, issues push request.
+ // 4. _->n1: PushTxn(txn2->txn1) -- Discovers txn1 in STAGING and starts
+ // recovery.
+ // 5. _->n1: RecoverTxn(txn1) -- Allow to proceed.
+ if req.ba.IsSingleRecoverTxnRequest() && cp == AfterSending {
+ t.Logf("%s - complete, resp={%s}", req.prefix, resp)
+ close(recoverComplete)
+ }
+
+ // 6. _->n1: ResolveIntent(txn1, b)
+ if riReq, ok := req.ba.GetArg(kvpb.ResolveIntent); ok && riReq.Header().Key.Equal(keyB) && cp == AfterSending {
+ keyBResolveIntentCount = atomic.AddInt64(&txn1KeyBResolveIntentCount, 1)
+ if keyBResolveIntentCount == 1 {
+ t.Logf("%s - complete", req.prefix)
+ close(resolveIntentComplete)
+ }
+ }
+
+ // <Allow (2) txn1->n1: Put(b) to
+ // return with error>
+ if req.txnName == "txn1" && keyBWriteCount == 1 && cp == AfterSending {
+ // Hold the operation open until we are ready to retry, after which we
+ // will return the injected failure.
+ req.pauseUntil(t, resolveIntentComplete, cp)
+ t.Logf("%s - injected RPC error", req.prefix)
+ return grpcstatus.Errorf(codes.Unavailable, "response jammed on n%d<-n%d", req.fromNodeID, req.toNodeID)
+ }
+
+ // 7. txn1->n1: Put(b) -- Retry gets WriteTooOld, performs a
+ // serverside refresh, and succeeds.
+ // 8. txn1->n1: EndTxn(commit) -- Results in "transaction unexpectedly
+ // committed" due to the recovery completing first.
+
+ // <Ensure txn2's writes (if any) wait until txn1 has completed.>
+ if req.txnName == "txn2" && hasPut && cp == BeforeSending {
+ <-txn1Done
+ }
+
+ return nil
+ }
+ tMu.Unlock()
+
+ // Start test, await concurrent operations and validate results.
+ close(txn1Ready)
+ err := <-txn1ResultCh
+ t.Logf("txn1 completed with err: %+v", err)
+ wg.Wait()
+
 // TODO(sarkesian): While we expect an AmbiguousResultError once the
 // immediate changes outlined in #103817 are implemented, right now this is
 // essentially validating the existence of the bug. This needs to be fixed,