Skip to content

Commit

Permalink
kvcoord: add "mixed version" test for AmbiguousReplayProtection flag
Browse files Browse the repository at this point in the history
This change builds upon the tests added to validate #107658, in this
case exercising the code path that would occur if the client did not
send this flag in a `BatchRequest`, or the server did not respect it. It
mocks a "mixed version" cluster by simply dropping the flag in
transport, and tests that the previous result - i.e. a
`TransactionStatusError` with `REASON_TXN_COMMITTED` - is returned,
without any other side effects or unexpected behavior.

Part of: #103817

Release note: None
  • Loading branch information
AlexTalks committed Oct 11, 2023
1 parent b0e0182 commit 0c7dddb
Showing 1 changed file with 153 additions and 0 deletions.
153 changes: 153 additions & 0 deletions pkg/kv/kvclient/kvcoord/dist_sender_ambiguous_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,9 @@ type interceptorTestConfig struct {
// Incremented atomically by concurrent operations holding the read lock.
lastInterceptedOpID int64

// modifyReq defines a function that can modify the intercepted BatchRequest.
modifyReq func(req *interceptedReq) (modified bool)

// filter defines a function that should return true for requests the test
// cares about - this includes logging, blocking, or overriding responses. All
// requests that return true should be logged.
Expand Down Expand Up @@ -316,6 +319,10 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) {
}
}

if tMu.modifyReq != nil && tMu.modifyReq(req) {
tMu.Logf("%s modified", req)
}

return nil
},
afterSend: func(ctx context.Context, req *interceptedReq, resp *interceptedResp) (overrideResp *interceptedResp) {
Expand Down Expand Up @@ -720,6 +727,152 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) {

// Test cases with custom request scheduling.

// NB: This test can be removed for versions >=24.1.
t.Run("mixed version", func(t *testing.T) {
defer initSubTest(t)()

// Checkpoints in test.
txn1Ready := make(chan struct{})
txn2Ready := make(chan struct{})
leaseMoveReady := make(chan struct{})
leaseMoveComplete := make(chan struct{})
receivedFinalET := make(chan struct{})
recoverComplete := make(chan struct{})
txn1Done := make(chan struct{})

// Final result.
txn1ResultCh := make(chan error, 1)

// Concurrent transactions.
var wg sync.WaitGroup
wg.Add(3)
go runConcurrentOp(t, "txn1", execWorkloadTxn, &wg, txn1Ready, txn1Done, txn1ResultCh)
go runConcurrentOp(t, "txn2", execWorkloadTxn, &wg, txn2Ready, nil /* doneCh */, nil /* resultCh */)
go runConcurrentOp(t, "lease mover", execLeaseMover, &wg, leaseMoveReady, leaseMoveComplete, nil /* resultCh */)

// KV Request sequencing.
tMu.Lock()
tMu.modifyReq = func(req *interceptedReq) (modified bool) {
// In order to simulate a "mixed version" scenario, let's simply drop the
// AmbiguousReplayProtection flag from the batch request, as if the
// coordinator never sent it.
if req.ba.AmbiguousReplayProtection {
req.ba.AmbiguousReplayProtection = false
return true
}
return false
}
tMu.maybeWait = func(cp InterceptPoint, req *interceptedReq, resp *interceptedResp) (override error) {
_, hasPut := req.ba.GetArg(kvpb.Put)

// These conditions are checked in order of expected operations of the
// test.

// 1. txn1->n1: Get(a)
// 2. txn1->n2: Get(b)
// 3. txn1->n1: Put(a), EndTxn(parallel commit) -- Puts txn1 in STAGING.
// 4. txn1->n2: Put(b) -- Send the request, but pause before returning
// the response so we can inject network failure.
if req.txnName == "txn1" && hasPut && req.toNodeID == tc.Server(1).NodeID() && cp == AfterSending {
// Once we have seen the write on txn1 to n2 that we will fail, txn2
// can start.
close(txn2Ready)
}

// 5. txn2->n1: Get(a) OR Put(a) -- Discovers txn1's locks, issues push request.
// 6. txn2->n2: Get(b) OR Put(b)
// 7. _->n1: PushTxn(txn2->txn1) -- Discovers txn1 in STAGING and starts
// recovery.
// 8. _->n1: RecoverTxn(txn1) -- Before sending, pause the request so we
// can ensure it gets evaluated after txn1 retries (and refreshes), but
// before its final EndTxn.
if req.ba.IsSingleRecoverTxnRequest() && cp == BeforeSending {
// Once the RecoverTxn request is issued, as part of txn2's PushTxn
// request, the lease can be moved.
close(leaseMoveReady)
}

// <transfer b's lease to n1>
// <inject a network failure and finally allow (4) txn1->n2: Put(b) to
// return with error>
if req.txnName == "txn1" && hasPut && req.toNodeID == tc.Server(1).NodeID() && cp == AfterSending {
// Hold the operation open until we are ready to retry on the new
// replica, after which we will return the injected failure.
req.pauseUntil(t, leaseMoveComplete, cp)
t.Logf("%s - injected RPC error", req.prefix)
return grpcstatus.Errorf(codes.Unavailable, "response jammed on n%d<-n%d", req.fromNodeID, req.toNodeID)
}

// 9. txn1->n1: Put(b) -- Retry on new leaseholder sees new lease start
// timestamp, and attempts to evaluate it as an idempotent replay, but at
// a higher timestamp, which breaks idempotency due to being on commit.

// -- NB: With ambiguous replay protection, txn1 should end here.
// -- NB: Without ambiguous replay protection, txn1 would continue with:

// 10. txn1->n1: Refresh(a)
// 11. txn1->n1: Refresh(b)
// 12. txn1->n1: EndTxn(commit) -- Before sending, pause the request so
// that we can allow (8) RecoverTxn(txn1) to proceed, simulating a race
// in which the recovery wins.
if req.txnName == "txn1" && req.ba.IsSingleEndTxnRequest() && cp == BeforeSending {
close(receivedFinalET)
}

// <allow (8) RecoverTxn(txn1) to proceed and finish> -- because txn1
// is in STAGING and has all of its writes, it is implicitly committed,
// so the recovery will succeed in marking it explicitly committed.
if req.ba.IsSingleRecoverTxnRequest() && cp == BeforeSending {
// The RecoverTxn operation is evaluated after txn1's Refreshes,
// or after txn1 completes with error.
req.pauseUntilFirst(t, receivedFinalET, txn1Done, cp)
}
if req.ba.IsSingleRecoverTxnRequest() && cp == AfterSending {
t.Logf("%s - complete, resp={%s}", req.prefix, resp)
close(recoverComplete)
}

// <allow (12) EndTxn(commit) to proceed and execute> -- Results in
// "transaction unexpectedly committed" due to the recovery completing
// first.
if req.txnName == "txn1" && req.ba.IsSingleEndTxnRequest() && cp == BeforeSending {
req.pauseUntil(t, recoverComplete, cp)
}

// <allow txn2's Puts to execute>
if req.txnName == "txn2" && hasPut && cp == BeforeSending {
// While txn2's Puts can only occur after txn1 is marked as explicitly
// committed, if the Recovery and the subsequent txn2 Put(b) operations
// happen before txn1 retries its Put(b) on n1, it will encounter txn2's
// intent and get a WriteTooOld error instead of potentially being an
// idempotent replay.
<-txn1Done
}

return nil
}
tMu.Unlock()

// Start test, await concurrent operations and validate results.
close(txn1Ready)
err := <-txn1ResultCh
t.Logf("txn1 completed with err: %v", err)
wg.Wait()

// While we expect an AmbiguousResultError if the AmbiguousReplayProtection
// flag is sent/received, in the interests of ensuring that the client not
// sending the flag does not produce unexpected behavior, we expect the
// previous, incorrect TransactionStatusError with REASON_TXN_COMMITTED
// to be returned.
tErr := (*kvpb.TransactionStatusError)(nil)
require.ErrorAsf(t, err, &tErr,
"expected TransactionStatusError due to being already committed")
require.Equalf(t, kvpb.TransactionStatusError_REASON_TXN_COMMITTED, tErr.Reason,
"expected TransactionStatusError due to being already committed")
require.Truef(t, errors.HasAssertionFailure(err),
"expected AssertionFailedError due to sanity check on transaction already committed")
})

// The txn coordinator shouldn't respond with an incorrect retryable failure
// based on a refresh as the transaction may have already been committed.
t.Run("recovery before refresh fails", func(t *testing.T) {
Expand Down

0 comments on commit 0c7dddb

Please sign in to comment.