From 974c752fcbecdb5c751cd6b419ff28a69eb71e76 Mon Sep 17 00:00:00 2001 From: Alex Sarkesian Date: Wed, 2 Aug 2023 02:46:00 -0400 Subject: [PATCH] kvcoord: test additional ambiguous failure states This change builds on the testing introduced in #107323 with additional subtests evaluating the behavior of different race outcomes with contended transactions where the first transaction experiences an RPC failure (i.e. an ambiguous write). Part of: #103817 Release note: None --- .../kvcoord/dist_sender_ambiguous_test.go | 1143 ++++++++++++++++- 1 file changed, 1141 insertions(+), 2 deletions(-) diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_ambiguous_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_ambiguous_test.go index 85d38e7dbc70..5fba569a9850 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_ambiguous_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_ambiguous_test.go @@ -420,8 +420,8 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { // Filtering and logging annotations for the request interceptor. tMu.Lock() tMu.filter = func(req *interceptedReq) bool { - // Log all requests on txn1 or txn2, except for heartbeats. - if (req.txnName == "txn1" || req.txnName == "txn2") && !req.ba.IsSingleHeartbeatTxnRequest() { + // Log all requests on txn1/txn2/txn3, except for heartbeats. + if (req.txnName == "txn1" || req.txnName == "txn2" || req.txnName == "txn3") && !req.ba.IsSingleHeartbeatTxnRequest() { return true } @@ -509,6 +509,280 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { close(doneCh) } + t.Run("writer reader conflict", func(t *testing.T) { + initTest(t) + defer finishTest(t) + + // Checkpoints in test. + txn1Ready := make(chan struct{}) + txn2Ready := make(chan struct{}) + leaseMoveReady := make(chan struct{}) + leaseMoveComplete := make(chan struct{}) + receivedFinalET := make(chan struct{}) + recoverComplete := make(chan struct{}) + txn1Done := make(chan struct{}) + + // Final result. + txn1ResultCh := make(chan error, 1) + + // Concurrent transactions. + var wg sync.WaitGroup + wg.Add(3) + go runTxn(t, "txn1", &wg, txn1Ready, txn1Done, txn1ResultCh) + go func() { + defer wg.Done() + if !waitUntilReady(t, "txn2", txn2Ready) { + return + } + tCtx := context.Background() + + // txn2 just performs a simple GetForUpdate on a conflicting key, causing + // it to issue a PushTxn for txn1 which will kick off recovery. + txn := db.NewTxn(tCtx, "txn2") + _ = getInBatch(tCtx, txn, keyA) + assert.NoError(t, txn.Commit(ctx)) + }() + go runLeaseMover(t, &wg, leaseMoveReady, leaseMoveComplete) + + // KV Request sequencing. + tMu.Lock() + tMu.maybeWait = func(cp InterceptPoint, req *interceptedReq, resp *interceptedResp) (override error) { + _, hasPut := req.ba.GetArg(kvpb.Put) + + // These conditions are checked in order of expected operations of the + // test. + + // 1. txn1->n1: Get(a) + // 2. txn1->n2: Get(b) + // 3. txn1->n1: Put(a), EndTxn(parallel commit) -- Puts txn1 in STAGING. + // 4. txn1->n2: Put(b) -- Send the request, but pause before returning + // the response so we can inject network failure. + if req.txnName == "txn1" && hasPut && req.toNodeID == tc.Server(1).NodeID() && cp == AfterSending { + // Once we have seen the write on txn1 to n2 that we will fail, txn2 + // can start. + close(txn2Ready) + } + + // 5. txn2->n1: Get(a) -- Discovers txn1's locks, issues push request. + // 6. _->n1: PushTxn(txn2->txn1) -- Discovers txn1 in STAGING and starts + // recovery. + // 7. 
_->n1: RecoverTxn(txn1) -- Before sending, pause the request so we + // can ensure it gets evaluated after txn1 retries (and refreshes), but + // before its final EndTxn. + if req.ba.IsSingleRecoverTxnRequest() && cp == BeforeSending { + // Once the RecoverTxn request is issued, as part of txn2's PushTxn + // request, the lease can be moved. + close(leaseMoveReady) + } + + // + // n2: Put(b) to + // return with error> + if req.txnName == "txn1" && hasPut && req.toNodeID == tc.Server(1).NodeID() && cp == AfterSending { + // Hold the operation open until we are ready to retry on the new + // replica, after which we will return the injected failure. + req.pauseUntil(t, leaseMoveComplete, cp) + t.Logf("%s - injected RPC error", req.prefix) + return grpcstatus.Errorf(codes.Unavailable, "response jammed on n%d<-n%d", req.fromNodeID, req.toNodeID) + } + + // -- NB: If ambiguous errors were propagated, txn1 would end here. + // -- NB: When ambiguous errors are not propagated, txn1 continues with: + // + // 8. txn1->n1: Put(b) -- Retry on new leaseholder sees new lease start + // timestamp, and attempts to evaluate it as an idempotent replay, but at + // a higher timestamp, which breaks idempotency due to being on commit. + // 9. txn1->n1: Refresh(a) + // 10. txn1->n1: Refresh(b) + // 11. txn1->n1: EndTxn(commit) -- Before sending, pause the request so + // that we can allow (7) RecoverTxn(txn1) to proceed, simulating a race + // in which the recovery wins. + if req.txnName == "txn1" && req.ba.IsSingleEndTxnRequest() && cp == BeforeSending { + close(receivedFinalET) + } + + // -- because txn1 + // is in STAGING and has all of its writes, it is implicitly committed, + // so the recovery will succeed in marking it explicitly committed. + if req.ba.IsSingleRecoverTxnRequest() && cp == BeforeSending { + // The RecoverTxn operation is evaluated after txn1's Refreshes, + // or after txn1 completes with error. + req.pauseUntilFirst(t, receivedFinalET, txn1Done, cp) + } + if req.ba.IsSingleRecoverTxnRequest() && cp == AfterSending { + t.Logf("%s - complete, resp={%s}", req.prefix, resp) + close(recoverComplete) + } + + // -- Results in + // "transaction unexpectedly committed" due to the recovery completing + // first. + if req.txnName == "txn1" && req.ba.IsSingleEndTxnRequest() && cp == BeforeSending { + req.pauseUntil(t, recoverComplete, cp) + } + + return nil + } + tMu.Unlock() + + // Start test, await concurrent operations and validate results. + close(txn1Ready) + err := <-txn1ResultCh + t.Logf("txn1 completed with err: %+v", err) + wg.Wait() + + // TODO(sarkesian): While we expect an AmbiguousResultError once the + // immediate changes outlined in #103817 are implemented, right now this is + // essentially validating the existence of the bug. This needs to be fixed, + // and we should not see an assertion failure from the transaction + // coordinator once fixed. + tErr := (*kvpb.TransactionStatusError)(nil) + require.ErrorAsf(t, err, &tErr, + "expected TransactionStatusError due to being already committed") + require.Equalf(t, kvpb.TransactionStatusError_REASON_TXN_COMMITTED, tErr.Reason, + "expected TransactionStatusError due to being already committed") + require.Truef(t, errors.HasAssertionFailure(err), + "expected AssertionFailedError due to sanity check on transaction already committed") + }) + t.Run("writer writer conflict", func(t *testing.T) { + initTest(t) + defer finishTest(t) + + // Checkpoints in test. 
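+		// Each channel below acts as a one-shot checkpoint: one side calls
+		// close(ch) exactly once, and every waiter blocked on <-ch is released.
+		// A minimal sketch of the pattern used throughout these subtests:
+		//
+		//   ready := make(chan struct{})
+		//   go func() { <-ready /* proceed once released */ }()
+		//   close(ready) // releases all current and future waiters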
+ txn1Ready := make(chan struct{}) + txn2Ready := make(chan struct{}) + leaseMoveReady := make(chan struct{}) + leaseMoveComplete := make(chan struct{}) + receivedFinalET := make(chan struct{}) + recoverComplete := make(chan struct{}) + txn1Done := make(chan struct{}) + + // Final result. + txn1ResultCh := make(chan error, 1) + + // Concurrent transactions. + var wg sync.WaitGroup + wg.Add(3) + go runTxn(t, "txn1", &wg, txn1Ready, txn1Done, txn1ResultCh) + go func() { + defer wg.Done() + if !waitUntilReady(t, "txn2", txn2Ready) { + return + } + tCtx := context.Background() + + // txn2 performs simple Puts on conflicting keys, causing it to issue a + // PushTxn for txn1 which will kick off recovery. + txn := db.NewTxn(tCtx, "txn2") + batch := txn.NewBatch() + batch.Put(keyA, 0) + batch.Put(keyB, 0) + assert.NoError(t, txn.CommitInBatch(ctx, batch)) + t.Logf("txn2 finished here") + }() + go runLeaseMover(t, &wg, leaseMoveReady, leaseMoveComplete) + + // KV Request sequencing. + tMu.Lock() + tMu.maybeWait = func(cp InterceptPoint, req *interceptedReq, resp *interceptedResp) (override error) { + _, hasPut := req.ba.GetArg(kvpb.Put) + + // These conditions are checked in order of expected operations of the + // test. + + // 1. txn1->n1: Get(a) + // 2. txn1->n2: Get(b) + // 3. txn1->n1: Put(a), EndTxn(parallel commit) -- Puts txn1 in STAGING. + // 4. txn1->n2: Put(b) -- Send the request, but pause before returning + // the response so we can inject network failure. + if req.txnName == "txn1" && hasPut && req.toNodeID == tc.Server(1).NodeID() && cp == AfterSending { + // Once we have seen the write on txn1 to n2 that we will fail, txn2 + // can start. + close(txn2Ready) + } + + // 5. txn2->n1: Put(a) -- Discovers txn1's locks, issues push request. + // 6. txn2->n2: Put(b) + // 7. _->n1: PushTxn(txn2->txn1) -- Discovers txn1 in STAGING and starts + // recovery. + // 8. _->n1: RecoverTxn(txn1) -- Before sending, pause the request so we + // can ensure it gets evaluated after txn1 retries (and refreshes), but + // before its final EndTxn. + if req.ba.IsSingleRecoverTxnRequest() && cp == BeforeSending { + // Once the RecoverTxn request is issued, as part of txn2's PushTxn + // request, the lease can be moved. + close(leaseMoveReady) + } + + // + // n2: Put(b) to + // return with error> + if req.txnName == "txn1" && hasPut && req.toNodeID == tc.Server(1).NodeID() && cp == AfterSending { + // Hold the operation open until we are ready to retry on the new + // replica, after which we will return the injected failure. + req.pauseUntil(t, leaseMoveComplete, cp) + t.Logf("%s - injected RPC error", req.prefix) + return grpcstatus.Errorf(codes.Unavailable, "response jammed on n%d<-n%d", req.fromNodeID, req.toNodeID) + } + + // -- NB: If ambiguous errors were propagated, txn1 would end here. + // -- NB: When ambiguous errors are not propagated, txn1 continues with: + // + // 9. txn1->n1: Put(b) -- Retry on new leaseholder sees new lease start + // timestamp, and attempts to evaluate it as an idempotent replay, but at + // a higher timestamp, which breaks idempotency due to being on commit. + // 10. txn1->n1: Refresh(a) + // 11. txn1->n1: Refresh(b) + // 12. txn1->n1: EndTxn(commit) -- Before sending, pause the request so + // that we can allow (8) RecoverTxn(txn1) to proceed, simulating a race + // in which the recovery wins. 
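+			// (For reference: the parallel commit being raced here is the batch
+			// from step 3 above -- a Put(a) plus an EndTxn declaring the still
+			// in-flight write to (b). In rough sketch form, assuming kvpb/roachpb
+			// field names:
+			//
+			//   ba.Add(&kvpb.PutRequest{...}) // write to (a)
+			//   ba.Add(&kvpb.EndTxnRequest{
+			//       Commit:         true,
+			//       InFlightWrites: []roachpb.SequencedWrite{{Key: keyB}},
+			//   })
+			//
+			// A STAGING record whose in-flight writes all succeeded is implicitly
+			// committed; verifying exactly that is RecoverTxn's job.)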
+ if req.txnName == "txn1" && req.ba.IsSingleEndTxnRequest() && cp == BeforeSending { + close(receivedFinalET) + } + + // -- because txn1 + // is in STAGING and has all of its writes, it is implicitly committed, + // so the recovery will succeed in marking it explicitly committed. + if req.ba.IsSingleRecoverTxnRequest() && cp == BeforeSending { + // The RecoverTxn operation is evaluated after txn1's Refreshes, + // or after txn1 completes with error. + req.pauseUntilFirst(t, receivedFinalET, txn1Done, cp) + } + if req.ba.IsSingleRecoverTxnRequest() && cp == AfterSending { + t.Logf("%s - complete, resp={%s}", req.prefix, resp) + close(recoverComplete) + } + + // -- Results in + // "transaction unexpectedly committed" due to the recovery completing + // first. + if req.txnName == "txn1" && req.ba.IsSingleEndTxnRequest() && cp == BeforeSending { + req.pauseUntil(t, recoverComplete, cp) + } + + return nil + } + tMu.Unlock() + + // Start test, await concurrent operations and validate results. + close(txn1Ready) + err := <-txn1ResultCh + t.Logf("txn1 completed with err: %+v", err) + wg.Wait() + + // TODO(sarkesian): While we expect an AmbiguousResultError once the + // immediate changes outlined in #103817 are implemented, right now this is + // essentially validating the existence of the bug. This needs to be fixed, + // and we should not see an assertion failure from the transaction + // coordinator once fixed. + tErr := (*kvpb.TransactionStatusError)(nil) + require.ErrorAsf(t, err, &tErr, + "expected TransactionStatusError due to being already committed") + require.Equalf(t, kvpb.TransactionStatusError_REASON_TXN_COMMITTED, tErr.Reason, + "expected TransactionStatusError due to being already committed") + require.Truef(t, errors.HasAssertionFailure(err), + "expected AssertionFailedError due to sanity check on transaction already committed") + }) t.Run("workload conflict", func(t *testing.T) { initTest(t) defer finishTest(t) @@ -630,6 +904,871 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { t.Logf("txn1 completed with err: %+v", err) wg.Wait() + // TODO(sarkesian): While we expect an AmbiguousResultError once the + // immediate changes outlined in #103817 are implemented, right now this is + // essentially validating the existence of the bug. This needs to be fixed, + // and we should not see an assertion failure from the transaction + // coordinator once fixed. + tErr := (*kvpb.TransactionStatusError)(nil) + require.ErrorAsf(t, err, &tErr, + "expected TransactionStatusError due to being already committed") + require.Equalf(t, kvpb.TransactionStatusError_REASON_TXN_COMMITTED, tErr.Reason, + "expected TransactionStatusError due to being already committed") + require.Truef(t, errors.HasAssertionFailure(err), + "expected AssertionFailedError due to sanity check on transaction already committed") + }) + t.Run("recovery before refresh fails", func(t *testing.T) { + initTest(t) + + keyAPrime := roachpb.Key(encoding.EncodeBytesAscending(tablePrefix.Clone(), []byte("a'"))) + defer func() { + _, err := db.Del(ctx, keyAPrime) + require.NoError(t, err) + }() + + defer finishTest(t) + + // Checkpoints in test. + txn1Ready := make(chan struct{}) + txn2Ready := make(chan struct{}) + leaseMoveReady := make(chan struct{}) + leaseMoveComplete := make(chan struct{}) + txn1Done := make(chan struct{}) + + // Final result. + txn1ResultCh := make(chan error, 1) + + // Concurrent transactions. 
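+		// Three actors run concurrently in this subtest: txn1 (whose commit
+		// becomes ambiguous), txn2 (which leaves an intent on a' and pushes
+		// txn1), and the lease mover (which shifts range 2's lease so that
+		// txn1's retried write lands on a new leaseholder).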
+		var wg sync.WaitGroup
+		wg.Add(3)
+		go func() {
+			defer wg.Done()
+			defer close(txn1Done)
+			if !waitUntilReady(t, "txn1", txn1Ready) {
+				return
+			}
+			tCtx := context.Background()
+			txn := db.NewTxn(tCtx, "txn1")
+			vals := getInBatch(tCtx, txn, keyA, keyB, keyAPrime)
+
+			batch := txn.NewBatch()
+			batch.Put(keyA, vals[0]-xferAmount)
+			batch.Put(keyB, vals[1]+xferAmount)
+			txn1ResultCh <- txn.CommitInBatch(tCtx, batch)
+		}()
+		go func() {
+			defer wg.Done()
+			if !waitUntilReady(t, "txn2", txn2Ready) {
+				return
+			}
+			tCtx := context.Background()
+			txn := db.NewTxn(tCtx, "txn2")
+
+			// The intent from txn2 on a' will cause txn1's read refresh to fail.
+			assert.NoError(t, txn.Put(tCtx, keyAPrime, 100))
+			_ = getInBatch(tCtx, txn, keyA)
+			assert.NoError(t, txn.Commit(tCtx))
+		}()
+		go runLeaseMover(t, &wg, leaseMoveReady, leaseMoveComplete)
+
+		// KV Request sequencing.
+		tMu.Lock()
+		tMu.maybeWait = func(cp InterceptPoint, req *interceptedReq, resp *interceptedResp) (override error) {
+			_, hasPut := req.ba.GetArg(kvpb.Put)
+
+			// These conditions are checked in order of expected operations of the
+			// test.
+
+			// 1. txn1->n1: Get(a), Get(a')
+			// 2. txn1->n2: Get(b)
+			// 3. txn1->n1: Put(a), EndTxn(parallel commit) -- Puts txn1 in STAGING.
+			// 4. txn1->n2: Put(b) -- Send the request, but pause before returning
+			// the response so we can inject network failure.
+			if req.txnName == "txn1" && hasPut && req.toNodeID == tc.Server(1).NodeID() && cp == AfterSending {
+				// Once we have seen the write on txn1 to n2 that we will fail, txn2
+				// can start.
+				close(txn2Ready)
+			}
+
+			// 5. txn2->n1: Put(a')
+			// 6. txn2->n1: Get(a) -- Discovers txn1's locks, issues push request.
+			// 7. _->n1: PushTxn(txn2->txn1) -- Discovers txn1 in STAGING and starts
+			// recovery.
+			// 8. _->n1: RecoverTxn(txn1) -- Allow to proceed and finish. Since txn1
+			// is in STAGING and has all of its writes, it is implicitly committed,
+			// so the recovery will succeed in marking it explicitly committed.
+			if req.ba.IsSingleRecoverTxnRequest() && cp == AfterSending {
+				t.Logf("%s - complete, resp={%s}", req.prefix, resp)
+				close(leaseMoveReady)
+			}
+
+			// <Lease move complete; allow the jammed write from (4) txn1 ->
+			// n2: Put(b) to
+			// return with error>
+			if req.txnName == "txn1" && hasPut && req.toNodeID == tc.Server(1).NodeID() && cp == AfterSending {
+				// Hold the operation open until we are ready to retry on the new
+				// replica, after which we will return the injected failure.
+				req.pauseUntil(t, leaseMoveComplete, cp)
+				t.Logf("%s - injected RPC error", req.prefix)
+				return grpcstatus.Errorf(codes.Unavailable, "response jammed on n%d<-n%d", req.fromNodeID, req.toNodeID)
+			}
+
+			// -- NB: If ambiguous errors were propagated, txn1 would end here.
+			// -- NB: When ambiguous errors are not propagated, txn1 continues with:
+			//
+			// 9. txn1->n1: Put(b) -- Retry on new leaseholder sees new lease start
+			// timestamp, and attempts to evaluate it as an idempotent replay, but at
+			// a higher timestamp, which breaks idempotency due to being on commit.
+			// 10. txn1->n1: Refresh(a)
+			// 11. txn1->n1: Refresh(a',b) -- This fails due to txn2's intent on a'.
+			// Causes the transaction coordinator to return a retriable error,
+			// although the transaction has actually been committed during recovery;
+			// a highly problematic bug.
+
+			// <Hold the successful PushTxn response until txn1 completes.>
+			// This way we can prevent txn1's intents from being resolved too early.
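+			// (In sketch form, the chain held back by the pause below is:
+			//
+			//   PushTxn(txn2->txn1) returns success
+			//     -> ResolveIntent(txn1, a) -> txn2's Get(a) returns
+			//     -> txn2 commits, removing its intent on a'
+			//
+			// so pausing the PushTxn response keeps txn2's intent on a' live
+			// while txn1 refreshes, guaranteeing the refresh failure under test.)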
+			if _, ok := req.ba.GetArg(kvpb.PushTxn); ok && cp == AfterSending &&
+				resp != nil && resp.err == nil {
+				req.pauseUntil(t, txn1Done, cp)
+			}
+
+			return nil
+		}
+		tMu.Unlock()
+
+		// Start test, await concurrent operations and validate results.
+		close(txn1Ready)
+		err := <-txn1ResultCh
+		t.Logf("txn1 completed with err: %+v", err)
+		wg.Wait()
+
+		// TODO(sarkesian): While we expect an AmbiguousResultError once the
+		// immediate changes outlined in #103817 are implemented, right now this is
+		// essentially validating the existence of the bug. This needs to be fixed,
+		// and we should not see a transaction retry error from the transaction
+		// coordinator once fixed.
+		tErr := (*kvpb.TransactionRetryWithProtoRefreshError)(nil)
+		require.ErrorAsf(t, err, &tErr,
+			"expected incorrect TransactionRetryWithProtoRefreshError due to failed refresh")
+		require.False(t, tErr.PrevTxnAborted())
+		require.Falsef(t, errors.HasAssertionFailure(err),
+			"expected no AssertionFailedError due to sanity check")
+	})
+	t.Run("recovery after refresh fails", func(t *testing.T) {
+		initTest(t)
+
+		keyC := roachpb.Key(encoding.EncodeBytesAscending(tablePrefix.Clone(), []byte("c")))
+		defer func() {
+			_, err := db.Del(ctx, keyC)
+			require.NoError(t, err)
+		}()
+
+		defer finishTest(t)
+
+		// Checkpoints in test.
+		txn1Ready := make(chan struct{})
+		otherTxnsReady := make(chan struct{})
+		leaseMoveReady := make(chan struct{})
+		leaseMoveComplete := make(chan struct{})
+		recoverComplete := make(chan struct{})
+		txn1Done := make(chan struct{})
+
+		// Final result.
+		txn1ResultCh := make(chan error, 1)
+
+		// Concurrent transactions.
+		var wg sync.WaitGroup
+		wg.Add(3)
+		go func() {
+			defer wg.Done()
+			defer close(txn1Done)
+			if !waitUntilReady(t, "txn1", txn1Ready) {
+				return
+			}
+			tCtx := context.Background()
+			txn := db.NewTxn(tCtx, "txn1")
+			vals := getInBatch(tCtx, txn, keyA, keyB, keyC)
+
+			batch := txn.NewBatch()
+			batch.Put(keyA, vals[0]-xferAmount)
+			batch.Put(keyB, vals[1]+xferAmount)
+			txn1ResultCh <- txn.CommitInBatch(tCtx, batch)
+		}()
+		go func() {
+			defer wg.Done()
+			if !waitUntilReady(t, "txn2/txn3", otherTxnsReady) {
+				return
+			}
+			tCtx := context.Background()
+
+			// The committed write from txn3 on c will cause txn1's read refresh
+			// to fail.
+			txn3 := db.NewTxn(tCtx, "txn3")
+			batch := txn3.NewBatch()
+			batch.Put(keyC, 100)
+			assert.NoError(t, txn3.CommitInBatch(tCtx, batch))
+
+			txn2 := db.NewTxn(tCtx, "txn2")
+			vals := getInBatch(tCtx, txn2, keyA, keyB)
+
+			batch = txn2.NewBatch()
+			batch.Put(keyA, vals[0]-xferAmount)
+			batch.Put(keyB, vals[1]+xferAmount)
+			assert.NoError(t, txn2.CommitInBatch(tCtx, batch))
+		}()
+		go runLeaseMover(t, &wg, leaseMoveReady, leaseMoveComplete)
+
+		// KV Request sequencing.
+		tMu.Lock()
+		tMu.maybeWait = func(cp InterceptPoint, req *interceptedReq, resp *interceptedResp) (override error) {
+			_, hasPut := req.ba.GetArg(kvpb.Put)
+
+			// These conditions are checked in order of expected operations of the
+			// test.
+
+			// 1. txn1->n1: Get(a)
+			// 2. txn1->n2: Get(b), Get(c)
+			// 3. txn1->n1: Put(a), EndTxn(parallel commit) -- Puts txn1 in STAGING.
+			// 4. txn1->n2: Put(b) -- Send the request, but pause before returning the
+			// response so we can inject network failure.
+			if req.txnName == "txn1" && hasPut && req.toNodeID == tc.Server(1).NodeID() && cp == AfterSending {
+				// Once we have seen the write on txn1 to n2 that we will fail, txn2/txn3 can start.
+				close(otherTxnsReady)
+			}
+
+			// 5. txn3->n2: Put(c), EndTxn(parallel commit)
+			// 6. txn2->n1: Get(a) -- Discovers txn1's locks, issues push request.
+			// 7. txn2->n2: Get(b)
+			// 8. _->n1: PushTxn(txn2->txn1) -- Discovers txn1 in STAGING and starts
+			// recovery.
+			// 9. _->n1: RecoverTxn(txn1) -- Before sending, pause the request so we
+			// can ensure it gets evaluated after txn1 retries (and refreshes), but
+			// before its final EndTxn.
+			if req.ba.IsSingleRecoverTxnRequest() && cp == BeforeSending {
+				// Once the RecoverTxn request is issued, as part of txn2's PushTxn
+				// request, the lease can be moved.
+				close(leaseMoveReady)
+			}
+
+			// <Lease move complete; allow the jammed write from (4) txn1 ->
+			// n2: Put(b) to
+			// return with error>
+			if req.txnName == "txn1" && hasPut && req.toNodeID == tc.Server(1).NodeID() && cp == AfterSending {
+				// Hold the operation open until we are ready to retry on the new
+				// replica, after which we will return the injected failure.
+				req.pauseUntil(t, leaseMoveComplete, cp)
+				t.Logf("%s - injected RPC error", req.prefix)
+				return grpcstatus.Errorf(codes.Unavailable, "response jammed on n%d<-n%d", req.fromNodeID, req.toNodeID)
+			}
+
+			// -- NB: If ambiguous errors were propagated, txn1 would end here.
+			// -- NB: When ambiguous errors are not propagated, txn1 continues with:
+			//
+			// 10. txn1->n1: Put(b) -- Retry on new leaseholder sees new lease start
+			// timestamp, and attempts to evaluate it as an idempotent replay, but at
+			// a higher timestamp, which breaks idempotency due to being on commit.
+			// 11. txn1->n1: Refresh(a)
+			// 12. txn1->n1: Refresh(b,c) -- This fails due to txn3's write on c.
+			// Causes the transaction coordinator to return a retriable error,
+			// although the transaction could actually be committed during recovery;
+			// a highly problematic bug.
+			// <Allow (9) RecoverTxn(txn1) to proceed> -- because txn1
+			// is in STAGING and has all of its writes, it is implicitly committed,
+			// so the recovery will succeed in marking it explicitly committed.
+			if req.ba.IsSingleRecoverTxnRequest() && cp == BeforeSending {
+				// The RecoverTxn operation should be evaluated after txn1 completes,
+				// in this case with a problematic retriable error.
+				req.pauseUntil(t, txn1Done, cp)
+			}
+			if req.ba.IsSingleRecoverTxnRequest() && cp == AfterSending {
+				t.Logf("%s - complete, resp={%s}", req.prefix, resp)
+				close(recoverComplete)
+			}
+
+			// <Hold txn2's writes until txn1 has completed.>
+			if req.txnName == "txn2" && hasPut && cp == BeforeSending {
+				// While txn2's Puts can only occur after txn1 is marked as explicitly
+				// committed, if the Recovery and the subsequent txn2 Put(b) operations
+				// happen before txn1 retries its Put(b) on n1, it will encounter txn2's
+				// intent and get a WriteTooOld error instead of potentially being an
+				// idempotent replay.
+				<-txn1Done
+			}
+
+			return nil
+		}
+		tMu.Unlock()
+
+		// Start test, await concurrent operations and validate results.
+		close(txn1Ready)
+		err := <-txn1ResultCh
+		t.Logf("txn1 completed with err: %+v", err)
+		wg.Wait()
+
+		// TODO(sarkesian): While we expect an AmbiguousResultError once the
+		// immediate changes outlined in #103817 are implemented, right now this is
+		// essentially validating the existence of the bug. This needs to be fixed,
+		// and we should not see a transaction retry error from the transaction
+		// coordinator once fixed.
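+		// (A note on the assertion style used in these subtests: errors.As
+		// requires a non-nil pointer to a concrete error type, so a typed nil
+		// pointer is declared first, e.g.:
+		//
+		//   tErr := (*kvpb.TransactionRetryWithProtoRefreshError)(nil)
+		//   require.ErrorAsf(t, err, &tErr, "...")
+		//
+		// which both matches the error type and captures the error for further
+		// inspection, as with tErr.PrevTxnAborted() below.)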
+ tErr := (*kvpb.TransactionRetryWithProtoRefreshError)(nil) + require.ErrorAsf(t, err, &tErr, + "expected incorrect TransactionRetryWithProtoRefreshError due to failed refresh") + require.False(t, tErr.PrevTxnAborted()) + require.Falsef(t, errors.HasAssertionFailure(err), + "expected no AssertionFailedError due to sanity check") + }) + t.Run("recovery after transfer lease", func(t *testing.T) { + initTest(t) + defer finishTest(t) + + // Checkpoints in test. + txn1Ready := make(chan struct{}) + txn2Ready := make(chan struct{}) + leaseMoveReady := make(chan struct{}) + leaseMoveComplete := make(chan struct{}) + recoverComplete := make(chan struct{}) + txn1Done := make(chan struct{}) + + // Final result. + txn1ResultCh := make(chan error, 1) + + // Concurrent transactions. + var wg sync.WaitGroup + wg.Add(3) + go runTxn(t, "txn1", &wg, txn1Ready, txn1Done, txn1ResultCh) + go runTxn(t, "txn2", &wg, txn2Ready, nil /* doneCh */, nil /* resultCh */) + go runLeaseMover(t, &wg, leaseMoveReady, leaseMoveComplete) + + // KV Request sequencing. + tMu.Lock() + tMu.maybeWait = func(cp InterceptPoint, req *interceptedReq, resp *interceptedResp) (override error) { + _, hasPut := req.ba.GetArg(kvpb.Put) + + // These conditions are checked in order of expected operations of the + // test. + + // 1. txn1->n1: Get(a) + // 2. txn1->n2: Get(b) + // 3. txn1->n1: Put(a), EndTxn(parallel commit) -- Puts txn1 in STAGING. + // 4. txn1->n2: Put(b) -- Send the request, but pause before returning the + // response so we can inject network failure. + // + if req.txnName == "txn1" && hasPut && req.toNodeID == tc.Server(1).NodeID() && cp == AfterSending { + close(leaseMoveReady) + req.pauseUntil(t, leaseMoveComplete, cp) + close(txn2Ready) + } + + // 5. txn2->n1: Get(a) -- Discovers txn1's locks, issues push request. + // 6. txn2->n1: Get(b) + // 7. _->n1: PushTxn(txn2->txn1) -- Discovers txn1 in STAGING and starts + // recovery. + // 8. _->n1: RecoverTxn(txn1) -- Recovery should mark txn1 committed, but + // pauses before returning so that txn1's intents don't get cleaned up. + if req.ba.IsSingleRecoverTxnRequest() && cp == AfterSending { + close(recoverComplete) + } + + // n2: Put(b) to + // return with error> + if req.txnName == "txn1" && hasPut && req.toNodeID == tc.Server(1).NodeID() && cp == AfterSending { + // Hold the operation open until we are ready to retry on the new + // replica, after which we will return the injected failure. + req.pauseUntil(t, recoverComplete, cp) + t.Logf("%s - injected RPC error", req.prefix) + return grpcstatus.Errorf(codes.Unavailable, "response jammed on n%d<-n%d", req.fromNodeID, req.toNodeID) + } + + // -- NB: If ambiguous errors were propagated, txn1 would end here. + // -- NB: When ambiguous errors are not propagated, txn1 continues with: + // + // 9. txn1->n1: Put(b) -- Retry on new leaseholder sees new lease start + // timestamp, and attempts to evaluate it as an idempotent replay, but at + // a higher timestamp, which breaks idempotency due to being on commit. + // 10. txn1->n1: Refresh(a) + // 11. txn1->n1: Refresh(b) + // 12. txn1->n1: EndTxn(commit) -- Recovery has already completed, so this + // request fails with "transaction unexpectedly committed". + + // + if req.ba.IsSingleRecoverTxnRequest() && cp == AfterSending { + req.pauseUntil(t, txn1Done, cp) + t.Logf("%s - complete, resp={%s}", req.prefix, resp) + } + + return nil + } + tMu.Unlock() + + // Start test, await concurrent operations and validate results. 
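+		// Closing txn1Ready releases txn1's goroutine; all further ordering is
+		// driven by the interceptor checkpoints above.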
+ close(txn1Ready) + err := <-txn1ResultCh + t.Logf("txn1 completed with err: %+v", err) + wg.Wait() + + // TODO(sarkesian): While we expect an AmbiguousResultError once the + // immediate changes outlined in #103817 are implemented, right now this is + // essentially validating the existence of the bug. This needs to be fixed, + // and we should not see an assertion failure from the transaction + // coordinator once fixed. + tErr := (*kvpb.TransactionStatusError)(nil) + require.ErrorAsf(t, err, &tErr, + "expected TransactionStatusError due to being already committed") + require.Equalf(t, kvpb.TransactionStatusError_REASON_TXN_COMMITTED, tErr.Reason, + "expected TransactionStatusError due to being already committed") + require.Truef(t, errors.HasAssertionFailure(err), + "expected AssertionFailedError due to sanity check on transaction already committed") + }) + t.Run("retry sees other intent", func(t *testing.T) { + initTest(t) + defer finishTest(t) + + // Checkpoints in test. + txn1Ready := make(chan struct{}) + txn2Ready := make(chan struct{}) + leaseMoveReady := make(chan struct{}) + leaseMoveComplete := make(chan struct{}) + recoverComplete := make(chan struct{}) + txn1Done := make(chan struct{}) + + // Final result. + txn1ResultCh := make(chan error, 1) + + // Concurrent transactions. + var wg sync.WaitGroup + wg.Add(3) + go runTxn(t, "txn1", &wg, txn1Ready, txn1Done, txn1ResultCh) + go runTxn(t, "txn2", &wg, txn2Ready, nil /* doneCh */, nil /* resultCh */) + go runLeaseMover(t, &wg, leaseMoveReady, leaseMoveComplete) + + // KV Request sequencing. + tMu.Lock() + tMu.maybeWait = func(cp InterceptPoint, req *interceptedReq, resp *interceptedResp) (override error) { + _, hasPut := req.ba.GetArg(kvpb.Put) + + // These conditions are checked in order of expected operations of the test. + + // 1. txn1->n1: Get(a) + // 2. txn1->n2: Get(b) + // 3. txn1->n1: Put(a), EndTxn(parallel commit) -- Puts txn1 in STAGING. + // 4. txn1->n2: Put(b) -- Send the request, but pause before returning the + // response so we can inject network failure. + // + if req.txnName == "txn1" && hasPut && req.toNodeID == tc.Server(1).NodeID() && cp == AfterSending { + close(leaseMoveReady) + req.pauseUntil(t, leaseMoveComplete, cp) + close(txn2Ready) + } + + // 5. txn2->n1: Get(a) -- Discovers txn1's locks, issues push request. + // 6. txn2->n1: Get(b) + // 7. _->n1: PushTxn(txn2->txn1) -- Discovers txn1 in STAGING and starts + // recovery. + // 8. _->n1: RecoverTxn(txn1) -- Recovery should mark txn1 committed. + // 9. txn2->n1: Put(a), EndTxn(parallel commit) -- Writes intent. + // 10. txn2->n1: Put(b) + // 11. txn2->n1: EndTxn(commit) + if req.txnName == "txn2" && req.ba.IsSingleEndTxnRequest() && cp == BeforeSending { + close(recoverComplete) + } + + // n2: Put(b) to + // return with error> + if req.txnName == "txn1" && hasPut && req.toNodeID == tc.Server(1).NodeID() && cp == AfterSending { + // Hold the operation open until we are ready to retry on the new + // replica, after which we will return the injected failure. + req.pauseUntil(t, recoverComplete, cp) + t.Logf("%s - injected RPC error", req.prefix) + return grpcstatus.Errorf(codes.Unavailable, "response jammed on n%d<-n%d", req.fromNodeID, req.toNodeID) + } + + // 9. txn1->n1: Put(b) -- Retry sees intent, so returns a WriteTooOld + // error. Since the transaction had an earlier ambiguous failure on a + // batch with a commit, it should propagate the ambiguous error. 
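+			// In rough pseudocode (not the actual DistSender implementation),
+			// the behavior being exercised is:
+			//
+			//   if hadRPCFailureOnBatchWithCommit && !retryProvesOutcome {
+			//       return AmbiguousResultError{...}
+			//   }
+			//
+			// i.e. once a commit batch has seen an RPC failure, a WriteTooOld
+			// on the retry must surface as ambiguous rather than retriable.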
+ + // + if req.txnName == "txn2" && req.ba.IsSingleEndTxnRequest() && cp == BeforeSending { + req.pauseUntil(t, txn1Done, cp) + } + + return nil + } + tMu.Unlock() + + // Start test, await concurrent operations and validate results. + close(txn1Ready) + err := <-txn1ResultCh + t.Logf("txn1 completed with err: %+v", err) + wg.Wait() + + aErr := (*kvpb.AmbiguousResultError)(nil) + tErr := (*kvpb.TransactionStatusError)(nil) + require.ErrorAsf(t, err, &aErr, + "expected AmbiguousResultError due to encountering an intent on retry") + require.Falsef(t, errors.As(err, &tErr), + "did not expect TransactionStatusError due to being already committed") + require.Falsef(t, errors.HasAssertionFailure(err), + "expected no AssertionFailedError due to sanity check on transaction already committed") + }) + t.Run("recovery before retry at same timestamp", func(t *testing.T) { + initTest(t) + defer finishTest(t) + + // Checkpoints in test. + txn1Ready := make(chan struct{}) + txn2Ready := make(chan struct{}) + recoverComplete := make(chan struct{}) + resolveIntentComplete := make(chan struct{}) + txn1Done := make(chan struct{}) + + // Final result. + txn1ResultCh := make(chan error, 1) + + // Place second range on n1 (same as first). + secondRange := tc.LookupRangeOrFatal(t, keyB) + tc.TransferRangeLeaseOrFatal(t, secondRange, tc.Target(0)) + requireRangeLease(t, secondRange, 0) + + // Concurrent transactions. + var wg sync.WaitGroup + wg.Add(2) + go runTxn(t, "txn1", &wg, txn1Ready, txn1Done, txn1ResultCh) + go func() { + defer wg.Done() + if !waitUntilReady(t, "txn2", txn2Ready) { + return + } + tCtx := context.Background() + + // txn2 just performs a simple GetForUpdate on a conflicting key, causing + // it to issue a PushTxn for txn1 which will kick off recovery. + txn := db.NewTxn(tCtx, "txn2") + _ = getInBatch(tCtx, txn, keyB) + assert.NoError(t, txn.Commit(ctx)) + }() + + // KV Request sequencing. + var txn1KeyBWriteCount int64 + tMu.Lock() + tMu.maybeWait = func(cp InterceptPoint, req *interceptedReq, resp *interceptedResp) (override error) { + putReq, hasPut := req.ba.GetArg(kvpb.Put) + var keyBWriteCount int64 = -1 + + // These conditions are checked in order of expected operations of the + // test. + + // 1. txn1->n1: Get(a) + // 2. txn1->n1: Get(b) + // 3. txn1->n1: Put(a), EndTxn(parallel commit) -- Puts txn1 in STAGING. + // 4. txn1->n1: Put(b) -- Send the request, but pause before returning + // the response so we can inject network failure. + if req.txnName == "txn1" && hasPut && putReq.Header().Key.Equal(keyB) && cp == AfterSending { + keyBWriteCount = atomic.AddInt64(&txn1KeyBWriteCount, 1) + if keyBWriteCount == 1 { + close(txn2Ready) + } + } + + // 5. txn2->n1: Get(b) -- Discovers txn1's locks, issues push request. + // 6. _->n1: PushTxn(txn2->txn1) -- Discovers txn1 in STAGING and starts + // recovery. + // 7. _->n1: RecoverTxn(txn1) -- Allow to proceed. + if req.ba.IsSingleRecoverTxnRequest() && cp == AfterSending { + t.Logf("%s - complete, resp={%s}", req.prefix, resp) + close(recoverComplete) + } + + // 8. _->n1: ResolveIntent(txn1, b) + if riReq, ok := req.ba.GetArg(kvpb.ResolveIntent); ok && riReq.Header().Key.Equal(keyB) && cp == AfterSending { + t.Logf("%s - complete", req.prefix) + close(resolveIntentComplete) + } + + // n1: Put(b) to + // return with error> + if req.txnName == "txn1" && keyBWriteCount == 1 && cp == AfterSending { + // Hold the operation open until we are ready to retry, after which we + // will return the injected failure. 
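+				// Waiting on resolveIntentComplete (rather than recoverComplete)
+				// additionally guarantees that txn1's intent on (b) has already
+				// been resolved, so the retried Put(b) cannot be evaluated as an
+				// idempotent replay of the original write.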
+ req.pauseUntil(t, resolveIntentComplete, cp) + t.Logf("%s - injected RPC error", req.prefix) + return grpcstatus.Errorf(codes.Unavailable, "response jammed on n%d<-n%d", req.fromNodeID, req.toNodeID) + } + + // 9. txn1->n1: Put(b) -- Retry gets WriteTooOld and cannot perform a + // serverside refresh as reads were already returned. Fails, and should + // propagate the ambiguous error. + + // + if req.txnName == "txn2" && hasPut && cp == BeforeSending { + <-txn1Done + } + + return nil + } + tMu.Unlock() + + // Start test, await concurrent operations and validate results. + close(txn1Ready) + err := <-txn1ResultCh + t.Logf("txn1 completed with err: %+v", err) + wg.Wait() + + // TODO(sarkesian): Once we incorporate secondary errors into the + // AmbiguousResultError, check that we see the WriteTooOldError. + aErr := (*kvpb.AmbiguousResultError)(nil) + tErr := (*kvpb.TransactionStatusError)(nil) + require.ErrorAsf(t, err, &aErr, + "expected AmbiguousResultError due to encountering an intent on retry") + require.Falsef(t, errors.As(err, &tErr), + "did not expect TransactionStatusError due to being already committed") + require.Falsef(t, errors.HasAssertionFailure(err), + "expected no AssertionFailedError due to sanity check on transaction already committed") + }) + t.Run("recovery after retry at same timestamp", func(t *testing.T) { + initTest(t) + defer finishTest(t) + + // Checkpoints in test. + txn1Ready := make(chan struct{}) + txn2Ready := make(chan struct{}) + putRetryReady := make(chan struct{}) + receivedFinalET := make(chan struct{}) + recoverComplete := make(chan struct{}) + txn1Done := make(chan struct{}) + + // Final result. + txn1ResultCh := make(chan error, 1) + + // Place second range on n1 (same as first). + secondRange := tc.LookupRangeOrFatal(t, keyB) + tc.TransferRangeLeaseOrFatal(t, secondRange, tc.Target(0)) + requireRangeLease(t, secondRange, 0) + + // Concurrent transactions. + var wg sync.WaitGroup + wg.Add(2) + go runTxn(t, "txn1", &wg, txn1Ready, txn1Done, txn1ResultCh) + go func() { + defer wg.Done() + if !waitUntilReady(t, "txn2", txn2Ready) { + return + } + tCtx := context.Background() + + // txn2 just performs a simple GetForUpdate on a conflicting key, causing + // it to issue a PushTxn for txn1 which will kick off recovery. + txn := db.NewTxn(tCtx, "txn2") + _ = getInBatch(tCtx, txn, keyB) + assert.NoError(t, txn.Commit(ctx)) + }() + + // KV Request sequencing. + var txn1KeyBWriteCount int64 + tMu.Lock() + tMu.maybeWait = func(cp InterceptPoint, req *interceptedReq, resp *interceptedResp) (override error) { + putReq, hasPut := req.ba.GetArg(kvpb.Put) + var keyBWriteCount int64 = -1 + + // These conditions are checked in order of expected operations of the + // test. + + // 1. txn1->n1: Get(a) + // 2. txn1->n1: Get(b) + // 3. txn1->n1: Put(a), EndTxn(parallel commit) -- Puts txn1 in STAGING. + // 4. txn1->n1: Put(b) -- Send the request, but pause before returning + // the response so we can inject network failure. + if req.txnName == "txn1" && hasPut && putReq.Header().Key.Equal(keyB) && cp == AfterSending { + keyBWriteCount = atomic.AddInt64(&txn1KeyBWriteCount, 1) + if keyBWriteCount == 1 { + close(txn2Ready) + } + } + + // 5. txn2->n1: Get(b) -- Discovers txn1's locks, issues push request. + // 6. _->n1: PushTxn(txn2->txn1) -- Discovers txn1 in STAGING and starts + // recovery. + // 7. _->n1: RecoverTxn(txn1) -- Before sending, pause the request so we + // can ensure it gets evaluated after txn1 retries, but before its final + // EndTxn. 
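+			// (Closing putRetryReady below is what releases the jammed Put(b);
+			// the RecoverTxn request itself is then held further down until
+			// txn1's final EndTxn is in flight, so the two deliberately race.)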
+ if req.ba.IsSingleRecoverTxnRequest() && cp == BeforeSending { + close(putRetryReady) + } + + // n1: Put(b) to + // return with error> + if req.txnName == "txn1" && keyBWriteCount == 1 && cp == AfterSending { + // Hold the operation open until we are ready to retry, after which we + // will return the injected failure. + req.pauseUntil(t, putRetryReady, cp) + t.Logf("%s - injected RPC error", req.prefix) + return grpcstatus.Errorf(codes.Unavailable, "response jammed on n%d<-n%d", req.fromNodeID, req.toNodeID) + } + + // 8. txn1->n1: Put(b) -- Retry gets evaluated as idempotent replay and + // correctly succeeds. + // -- NB: In this case, txn1 returns without error here, as there is no + // need to refresh and the final EndTxn can be split off and run + // asynchronously. + + // 9. txn1->n1: EndTxn(commit) -- Before sending, pause the request so + // that we can allow (8) RecoverTxn(txn1) to proceed, simulating a race + // in which the recovery wins. + if req.txnName == "txn1" && req.ba.IsSingleEndTxnRequest() && cp == BeforeSending { + close(receivedFinalET) + } + + // -- because txn1 + // is in STAGING and has all of its writes, it is implicitly committed, + // so the recovery will succeed in marking it explicitly committed. + if req.ba.IsSingleRecoverTxnRequest() && cp == BeforeSending { + req.pauseUntilFirst(t, receivedFinalET, txn1Done, cp) + } + if req.ba.IsSingleRecoverTxnRequest() && cp == AfterSending { + t.Logf("%s - complete, resp={%s}", req.prefix, resp) + close(recoverComplete) + } + + // -- even though the + // recovery won, this is allowed. See makeTxnCommitExplicitLocked(..). + if req.txnName == "txn1" && req.ba.IsSingleEndTxnRequest() && cp == BeforeSending { + req.pauseUntil(t, recoverComplete, cp) + } + + // + if req.txnName == "txn2" && hasPut && cp == BeforeSending { + <-txn1Done + } + + return nil + } + tMu.Unlock() + + // Start test, await concurrent operations and validate results. + close(txn1Ready) + err := <-txn1ResultCh + t.Logf("txn1 completed with err: %+v", err) + wg.Wait() + + require.NoErrorf(t, err, "expected txn1 to succeed") + }) + t.Run("recovery before retry with serverside refresh", func(t *testing.T) { + initTest(t) + defer finishTest(t) + + // Checkpoints in test. + txn1Ready := make(chan struct{}) + txn2Ready := make(chan struct{}) + recoverComplete := make(chan struct{}) + resolveIntentComplete := make(chan struct{}) + txn1Done := make(chan struct{}) + + // Final result. + txn1ResultCh := make(chan error, 1) + + // Place second range on n1 (same as first). + secondRange := tc.LookupRangeOrFatal(t, keyB) + tc.TransferRangeLeaseOrFatal(t, secondRange, tc.Target(0)) + requireRangeLease(t, secondRange, 0) + + // Concurrent transactions. + var wg sync.WaitGroup + wg.Add(2) + go func() { + defer wg.Done() + defer close(txn1Done) + if !waitUntilReady(t, "txn1", txn1Ready) { + return + } + tCtx := context.Background() + txn := db.NewTxn(tCtx, "txn1") + batch := txn.NewBatch() + batch.Put(keyA, 100) + batch.Put(keyB, 100) + txn1ResultCh <- txn.CommitInBatch(tCtx, batch) + }() + go func() { + defer wg.Done() + if !waitUntilReady(t, "txn2", txn2Ready) { + return + } + tCtx := context.Background() + + // txn2 just performs a simple GetForUpdate on a conflicting key, causing + // it to issue a PushTxn for txn1 which will kick off recovery. + txn := db.NewTxn(tCtx, "txn2") + _ = getInBatch(tCtx, txn, keyB) + assert.NoError(t, txn.Commit(ctx)) + }() + + // KV Request sequencing. 
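+		// The maybeWait hook below is invoked by the test interceptor both
+		// before each request is sent (BeforeSending) and after its response
+		// arrives (AfterSending); returning a non-nil error from AfterSending
+		// replaces the real response with the injected RPC failure.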
+ var txn1KeyBWriteCount int64 + var txn1KeyBResolveIntentCount int64 + tMu.Lock() + tMu.maybeWait = func(cp InterceptPoint, req *interceptedReq, resp *interceptedResp) (override error) { + putReq, hasPut := req.ba.GetArg(kvpb.Put) + var keyBWriteCount int64 = -1 + var keyBResolveIntentCount int64 = -1 + + // These conditions are checked in order of expected operations of the + // test. + + // 1. txn1->n1: Put(a), EndTxn(parallel commit) -- Puts txn1 in STAGING. + // 2. txn1->n1: Put(b) -- Send the request, but pause before returning + // the response so we can inject network failure. + if req.txnName == "txn1" && hasPut && putReq.Header().Key.Equal(keyB) && cp == AfterSending { + keyBWriteCount = atomic.AddInt64(&txn1KeyBWriteCount, 1) + if keyBWriteCount == 1 { + close(txn2Ready) + } + } + + // 3. txn2->n1: Get(b) -- Discovers txn1's locks, issues push request. + // 4. _->n1: PushTxn(txn2->txn1) -- Discovers txn1 in STAGING and starts + // recovery. + // 5. _->n1: RecoverTxn(txn1) -- Allow to proceed. + if req.ba.IsSingleRecoverTxnRequest() && cp == AfterSending { + t.Logf("%s - complete, resp={%s}", req.prefix, resp) + close(recoverComplete) + } + + // 6. _->n1: ResolveIntent(txn1, b) + if riReq, ok := req.ba.GetArg(kvpb.ResolveIntent); ok && riReq.Header().Key.Equal(keyB) && cp == AfterSending { + keyBResolveIntentCount = atomic.AddInt64(&txn1KeyBResolveIntentCount, 1) + if keyBResolveIntentCount == 1 { + t.Logf("%s - complete", req.prefix) + close(resolveIntentComplete) + } + } + + // n1: Put(b) to + // return with error> + if req.txnName == "txn1" && keyBWriteCount == 1 && cp == AfterSending { + // Hold the operation open until we are ready to retry, after which we + // will return the injected failure. + req.pauseUntil(t, resolveIntentComplete, cp) + t.Logf("%s - injected RPC error", req.prefix) + return grpcstatus.Errorf(codes.Unavailable, "response jammed on n%d<-n%d", req.fromNodeID, req.toNodeID) + } + + // 7. txn1->n1: Put(b) -- Retry gets WriteTooOld, performs a + // serverside refresh, and succeeds. + // 8. txn1->n1: EndTxn(commit) -- Results in "transaction unexpectedly + // committed" due to the recovery completing first. + + // + if req.txnName == "txn2" && hasPut && cp == BeforeSending { + <-txn1Done + } + + return nil + } + tMu.Unlock() + + // Start test, await concurrent operations and validate results. + close(txn1Ready) + err := <-txn1ResultCh + t.Logf("txn1 completed with err: %+v", err) + wg.Wait() + // TODO(sarkesian): While we expect an AmbiguousResultError once the // immediate changes outlined in #103817 are implemented, right now this is // essentially validating the existence of the bug. This needs to be fixed,