Skip to content

Commit

Permalink
kvcoord: propagate ambiguous result errors that occur on commits
Browse files Browse the repository at this point in the history
Description TBD.

Depends on cockroachdb#107596.

Fixes: cockroachdb#103817

Release note (bug fix): TBD
  • Loading branch information
AlexTalks committed Jul 26, 2023
1 parent 33de74a commit 6622e35
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 27 deletions.
13 changes: 8 additions & 5 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -2293,6 +2293,8 @@ func (ds *DistSender) sendToReplicas(
return nil, err
}

log.VErrEventf(ctx, 2, "RPC error: %s", err)

// For most connection errors, we cannot tell whether or not the request
// may have succeeded on the remote server (exceptions are captured in the
// grpcutil.RequestDidNotStart function). We'll retry the request in order
Expand All @@ -2310,10 +2312,6 @@ func (ds *DistSender) sendToReplicas(
// ambiguity.
// 2) SQL recognizes AmbiguousResultErrors and gives them a special code
// (StatementCompletionUnknown).
// TODO(andrei): The use of this code is inconsistent because a) the
// DistSender tries to only return the code for commits, but it'll happily
// forward along AmbiguousResultErrors coming from the replica and b) we
// probably should be returning that code for non-commit statements too.
//
// We retry requests in order to avoid returning errors (in particular,
// AmbiguousResultError). Retrying the batch will either:
Expand Down Expand Up @@ -2342,8 +2340,13 @@ func (ds *DistSender) sendToReplicas(
//
if withCommit && !grpcutil.RequestDidNotStart(err) {
ambiguousError = err

// If this partial batch does not contain the EndTxn request, we need
// to ensure the ambiguous error is propagated to handle #103817.
if _, ok := ba.GetArg(kvpb.EndTxn); !ok {
return nil, kvpb.NewAmbiguousResultErrorf("error=%s [propagate]", ambiguousError)
}
}
log.VErrEventf(ctx, 2, "RPC error: %s", err)

// If the error wasn't just a context cancellation and the down replica
// is cached as the lease holder, evict it. The only other eviction
Expand Down
49 changes: 27 additions & 22 deletions pkg/kv/kvserver/client_unexpected_commit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,7 @@ func getJamKey(
// succeeds prior to the original transaction's retries, an ambiguous error
// should be raised.
//
// NB: This case encounters a known issue described in #103817 and seen in #67765,
// where it currently is surfaced as an assertion failure that will result in a
// node crash.
//
// TODO(sarkesian): Validate the ambiguous result error once the initial fix as
// outlined in #103817 has been resolved.
// NB: This case deals with a known issue described in #103817 and seen in #67765.
func TestTransactionUnexpectedlyCommitted(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand All @@ -94,7 +89,8 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) {
txn1Ready := make(chan struct{})
txn2Ready := make(chan struct{})
leaseMoveReady := make(chan struct{})
recoverReady := make(chan struct{})
txn1Done := make(chan struct{})
receivedETRetry := make(chan struct{})
recoverComplete := make(chan struct{})
networkManipReady := make(chan struct{})
networkManipComplete := make(chan struct{})
Expand Down Expand Up @@ -144,15 +140,16 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) {
// <transfer b's lease to n1>
// txn1: reattempt failed write b (n1) and attempt to finalize transaction
//
// TODO(sarkesian): We currently see these operations, though raising
// amgibuous errors will require updating this schedule of operations.
// We currently see these operations:
// txn1->n1: Get(a)
// txn1->n2: Get(b)
// txn1->n1: Put(a), EndTxn(parallel commit)
// txn1->n2: Put(b) -- network failure!
// txn2->n1: Get(a)
// _->n1: PushTxn(txn2->txn1)
// <transfer b's lease to n1>
// -- NB: When ambiguous errors get propagated, txn1 will end here.
// When ambiguous errors are not propagated, txn1 continues with:
// txn1->n1: Put(b) -- retry sees new lease start timestamp
// txn1->n1: Refresh(a)
// txn1->n1: Refresh(b)
Expand Down Expand Up @@ -204,7 +201,7 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) {

// Ensure that txn1's post-refresh EndTxn occurs after recovery.
if txnName == "txn1" && ba.IsSingleEndTxnRequest() {
close(recoverReady)
close(receivedETRetry)
<-recoverComplete
t.Logf("%sEndTxn op unpaused", tags.String())
}
Expand All @@ -214,7 +211,13 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) {
// Once the RecoverTxn request is issued, as part of txn2's PushTxn
// request, the lease can be moved.
close(leaseMoveReady)
<-recoverReady

// The RecoverTxn operation must be evaluated after txn1's Refreshes,
// or after txn1 completes with error.
select {
case <-receivedETRetry:
case <-txn1Done:
}
t.Logf("%sRecoverTxn op unpaused", tags.String())
}

Expand Down Expand Up @@ -436,18 +439,20 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) {
close(networkManipComplete)

// Await concurrent operations and validate results.
wg.Wait()
err := <-txn1ResultCh
t.Logf("txn1 completed with err: %s", err)
close(txn1Done)
wg.Wait()

// TODO(sarkesian): While we expect an AmbiguousResultError once the immediate
// changes outlined in #103817 are implemented, right now this is essentially
// validating the existence of the bug. This needs to be fixed, and we should
// expect no assertion failures here.
// NB: While ideally we would hope to see a successful commit
// without error, with the near-term solution outlined in #103817 we expect
// an AmbiguousResultError in this case.
aErr := (*kvpb.AmbiguousResultError)(nil)
tErr := (*kvpb.TransactionStatusError)(nil)
require.Truef(t, errors.HasAssertionFailure(err),
"expected AssertionFailedError due to sanity check on transaction already committed")
require.ErrorAsf(t, err, &tErr,
"expected TransactionStatusError due to being already committed")
require.Equalf(t, kvpb.TransactionStatusError_REASON_TXN_COMMITTED, tErr.Reason,
"expected TransactionStatusError due to being already committed")
require.ErrorAsf(t, err, &aErr,
"expected ambiguous result error due to RPC error")
require.Falsef(t, errors.As(err, &tErr),
"did not expect TransactionStatusError due to being already committed")
require.Falsef(t, errors.HasAssertionFailure(err),
"expected no AssertionFailedError due to sanity check on transaction already committed")
}

0 comments on commit 6622e35

Please sign in to comment.