diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_ambiguous_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_ambiguous_test.go
index 8097129bab8e..34c19b9924b0 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender_ambiguous_test.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender_ambiguous_test.go
@@ -207,6 +207,9 @@ type interceptorTestConfig struct {
 	// Incremented atomically by concurrent operations holding the read lock.
 	lastInterceptedOpID int64
 
+	// modifyReq defines a function that can modify the intercepted BatchRequest.
+	modifyReq func(req *interceptedReq) (modified bool)
+
 	// filter defines a function that should return true for requests the test
 	// cares about - this includes logging, blocking, or overriding responses. All
 	// requests that return true should be logged.
@@ -316,6 +319,10 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) {
 			}
 		}
 
+		if tMu.modifyReq != nil && tMu.modifyReq(req) {
+			tMu.Logf("%s modified", req)
+		}
+
 		return nil
 	},
 	afterSend: func(ctx context.Context, req *interceptedReq, resp *interceptedResp) (overrideResp *interceptedResp) {
@@ -720,6 +727,152 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) {
 
 	// Test cases with custom request scheduling.
 
+	// NB: This test can be removed for versions >=24.1.
+	t.Run("mixed version", func(t *testing.T) {
+		defer initSubTest(t)()
+
+		// Checkpoints in test.
+		txn1Ready := make(chan struct{})
+		txn2Ready := make(chan struct{})
+		leaseMoveReady := make(chan struct{})
+		leaseMoveComplete := make(chan struct{})
+		receivedFinalET := make(chan struct{})
+		recoverComplete := make(chan struct{})
+		txn1Done := make(chan struct{})
+
+		// Final result.
+		txn1ResultCh := make(chan error, 1)
+
+		// Concurrent transactions.
+		var wg sync.WaitGroup
+		wg.Add(3)
+		go runConcurrentOp(t, "txn1", execWorkloadTxn, &wg, txn1Ready, txn1Done, txn1ResultCh)
+		go runConcurrentOp(t, "txn2", execWorkloadTxn, &wg, txn2Ready, nil /* doneCh */, nil /* resultCh */)
+		go runConcurrentOp(t, "lease mover", execLeaseMover, &wg, leaseMoveReady, leaseMoveComplete, nil /* resultCh */)
+
+		// KV Request sequencing.
+		tMu.Lock()
+		tMu.modifyReq = func(req *interceptedReq) (modified bool) {
+			// In order to simulate a "mixed version" scenario, let's simply drop the
+			// AmbiguousReplayProtection flag from the batch request, as if the
+			// coordinator never sent it.
+			if req.ba.AmbiguousReplayProtection {
+				req.ba.AmbiguousReplayProtection = false
+				return true
+			}
+			return false
+		}
+		tMu.maybeWait = func(cp InterceptPoint, req *interceptedReq, resp *interceptedResp) (override error) {
+			_, hasPut := req.ba.GetArg(kvpb.Put)
+
+			// These conditions are checked in order of expected operations of the
+			// test.
+
+			// 1. txn1->n1: Get(a)
+			// 2. txn1->n2: Get(b)
+			// 3. txn1->n1: Put(a), EndTxn(parallel commit) -- Puts txn1 in STAGING.
+			// 4. txn1->n2: Put(b) -- Send the request, but pause before returning
+			// the response so we can inject network failure.
+			if req.txnName == "txn1" && hasPut && req.toNodeID == tc.Server(1).NodeID() && cp == AfterSending {
+				// Once we have seen the write on txn1 to n2 that we will fail, txn2
+				// can start.
+				close(txn2Ready)
+			}
+
+			// 5. txn2->n1: Get(a) OR Put(a) -- Discovers txn1's locks, issues push request.
+			// 6. txn2->n2: Get(b) OR Put(b)
+			// 7. _->n1: PushTxn(txn2->txn1) -- Discovers txn1 in STAGING and starts
+			// recovery.
+			// 8. _->n1: RecoverTxn(txn1) -- Before sending, pause the request so we
+			// can ensure it gets evaluated after txn1 retries (and refreshes), but
+			// before its final EndTxn.
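+			// (The RecoverTxn request is held further below, via pauseUntilFirst,
+			// until txn1 has either reached its final EndTxn or finished with an
+			// error, which is what lets the recovery win the race staged here.)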
+			if req.ba.IsSingleRecoverTxnRequest() && cp == BeforeSending {
+				// Once the RecoverTxn request is issued, as part of txn2's PushTxn
+				// request, the lease can be moved.
+				close(leaseMoveReady)
+			}
+
+			// <transfer b's lease to n1>
+			// <allow (4) txn1->n2: Put(b) to
+			// return with error>
+			if req.txnName == "txn1" && hasPut && req.toNodeID == tc.Server(1).NodeID() && cp == AfterSending {
+				// Hold the operation open until we are ready to retry on the new
+				// replica, after which we will return the injected failure.
+				req.pauseUntil(t, leaseMoveComplete, cp)
+				t.Logf("%s - injected RPC error", req.prefix)
+				return grpcstatus.Errorf(codes.Unavailable, "response jammed on n%d<-n%d", req.fromNodeID, req.toNodeID)
+			}
+
+			// 9. txn1->n1: Put(b) -- Retry on new leaseholder sees new lease start
+			// timestamp, and attempts to evaluate it as an idempotent replay, but at
+			// a higher timestamp, which breaks idempotency due to being on commit.
+
+			// -- NB: With ambiguous replay protection, txn1 should end here.
+			// -- NB: Without ambiguous replay protection, txn1 would continue with:
+
+			// 10. txn1->n1: Refresh(a)
+			// 11. txn1->n1: Refresh(b)
+			// 12. txn1->n1: EndTxn(commit) -- Before sending, pause the request so
+			// that we can allow (8) RecoverTxn(txn1) to proceed, simulating a race
+			// in which the recovery wins.
+			if req.txnName == "txn1" && req.ba.IsSingleEndTxnRequest() && cp == BeforeSending {
+				close(receivedFinalET)
+			}
+
+			// <allow (8) RecoverTxn(txn1) to proceed> -- because txn1
+			// is in STAGING and has all of its writes, it is implicitly committed,
+			// so the recovery will succeed in marking it explicitly committed.
+			if req.ba.IsSingleRecoverTxnRequest() && cp == BeforeSending {
+				// The RecoverTxn operation is evaluated after txn1's Refreshes,
+				// or after txn1 completes with error.
+				req.pauseUntilFirst(t, receivedFinalET, txn1Done, cp)
+			}
+			if req.ba.IsSingleRecoverTxnRequest() && cp == AfterSending {
+				t.Logf("%s - complete, resp={%s}", req.prefix, resp)
+				close(recoverComplete)
+			}
+
+			// <allow (12) EndTxn(commit) to proceed> -- Results in
+			// "transaction unexpectedly committed" due to the recovery completing
+			// first.
+			if req.txnName == "txn1" && req.ba.IsSingleEndTxnRequest() && cp == BeforeSending {
+				req.pauseUntil(t, recoverComplete, cp)
+			}
+
+			// <hold txn2's Puts until txn1 is done>
+			if req.txnName == "txn2" && hasPut && cp == BeforeSending {
+				// While txn2's Puts can only occur after txn1 is marked as explicitly
+				// committed, if the Recovery and the subsequent txn2 Put(b) operations
+				// happen before txn1 retries its Put(b) on n1, it will encounter txn2's
+				// intent and get a WriteTooOld error instead of potentially being an
+				// idempotent replay.
+				<-txn1Done
+			}
+
+			return nil
+		}
+		tMu.Unlock()
+
+		// Start test, await concurrent operations and validate results.
+		close(txn1Ready)
+		err := <-txn1ResultCh
+		t.Logf("txn1 completed with err: %v", err)
+		wg.Wait()
+
+		// When the AmbiguousReplayProtection flag is sent and received, we expect
+		// an AmbiguousResultError. Since the client here never sends the flag,
+		// we instead expect the previous, incorrect TransactionStatusError with
+		// REASON_TXN_COMMITTED to be returned, ensuring that a client that does
+		// not send the flag sees no new, unexpected behavior.
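+		// NB: errors.As (via require.ErrorAsf) requires a pointer to a concrete
+		// error type as its target; the typed-nil *TransactionStatusError below
+		// is the destination that err is unwrapped into before checking Reason.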
+		tErr := (*kvpb.TransactionStatusError)(nil)
+		require.ErrorAsf(t, err, &tErr,
+			"expected TransactionStatusError due to being already committed")
+		require.Equalf(t, kvpb.TransactionStatusError_REASON_TXN_COMMITTED, tErr.Reason,
+			"expected TransactionStatusError due to being already committed")
+		require.Truef(t, errors.HasAssertionFailure(err),
+			"expected AssertionFailedError due to sanity check on transaction already committed")
+	})
+
 	// The txn coordinator shouldn't respond with an incorrect retryable failure
 	// based on a refresh as the transaction may have already been committed.
 	t.Run("recovery before refresh fails", func(t *testing.T) {