From 6622e35e1375b2d16d4e4f9045f88de98cd9c580 Mon Sep 17 00:00:00 2001 From: Alex Sarkesian Date: Wed, 26 Jul 2023 02:29:52 -0400 Subject: [PATCH] kvcoord: propagate ambiguous result errors that occur on commits Description TBD. Depends on #107596. Fixes: #103817 Release note (bug fix): TBD --- pkg/kv/kvclient/kvcoord/dist_sender.go | 13 +++-- .../kvserver/client_unexpected_commit_test.go | 49 ++++++++++--------- 2 files changed, 35 insertions(+), 27 deletions(-) diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index 3a2b23a78fe5..8ff1e8bd70ef 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -2293,6 +2293,8 @@ func (ds *DistSender) sendToReplicas( return nil, err } + log.VErrEventf(ctx, 2, "RPC error: %s", err) + // For most connection errors, we cannot tell whether or not the request // may have succeeded on the remote server (exceptions are captured in the // grpcutil.RequestDidNotStart function). We'll retry the request in order @@ -2310,10 +2312,6 @@ func (ds *DistSender) sendToReplicas( // ambiguity. // 2) SQL recognizes AmbiguousResultErrors and gives them a special code // (StatementCompletionUnknown). - // TODO(andrei): The use of this code is inconsistent because a) the - // DistSender tries to only return the code for commits, but it'll happily - // forward along AmbiguousResultErrors coming from the replica and b) we - // probably should be returning that code for non-commit statements too. // // We retry requests in order to avoid returning errors (in particular, // AmbiguousResultError). Retrying the batch will either: @@ -2342,8 +2340,13 @@ func (ds *DistSender) sendToReplicas( // if withCommit && !grpcutil.RequestDidNotStart(err) { ambiguousError = err + + // If this partial batch does not contain the EndTxn request, we need + // to ensure the ambiguous error is propagated to handle #103817. + if _, ok := ba.GetArg(kvpb.EndTxn); !ok { + return nil, kvpb.NewAmbiguousResultErrorf("error=%s [propagate]", ambiguousError) + } } - log.VErrEventf(ctx, 2, "RPC error: %s", err) // If the error wasn't just a context cancellation and the down replica // is cached as the lease holder, evict it. The only other eviction diff --git a/pkg/kv/kvserver/client_unexpected_commit_test.go b/pkg/kv/kvserver/client_unexpected_commit_test.go index 7a9fd433db17..a86ab7948e72 100644 --- a/pkg/kv/kvserver/client_unexpected_commit_test.go +++ b/pkg/kv/kvserver/client_unexpected_commit_test.go @@ -68,12 +68,7 @@ func getJamKey( // succeeds prior to the original transaction's retries, an ambiguous error // should be raised. // -// NB: This case encounters a known issue described in #103817 and seen in #67765, -// where it currently is surfaced as an assertion failure that will result in a -// node crash. -// -// TODO(sarkesian): Validate the ambiguous result error once the initial fix as -// outlined in #103817 has been resolved. +// NB: This case deals with a known issue described in #103817 and seen in #67765. func TestTransactionUnexpectedlyCommitted(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -94,7 +89,8 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { txn1Ready := make(chan struct{}) txn2Ready := make(chan struct{}) leaseMoveReady := make(chan struct{}) - recoverReady := make(chan struct{}) + txn1Done := make(chan struct{}) + receivedETRetry := make(chan struct{}) recoverComplete := make(chan struct{}) networkManipReady := make(chan struct{}) networkManipComplete := make(chan struct{}) @@ -144,8 +140,7 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { // // txn1: reattempt failed write b (n1) and attempt to finalize transaction // - // TODO(sarkesian): We currently see these operations, though raising - // amgibuous errors will require updating this schedule of operations. + // We currently see these operations: // txn1->n1: Get(a) // txn1->n2: Get(b) // txn1->n1: Put(a), EndTxn(parallel commit) @@ -153,6 +148,8 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { // txn2->n1: Get(a) // _->n1: PushTxn(txn2->txn1) // + // -- NB: When ambiguous errors get propagated, txn1 will end here. + // When ambiguous errors are not propagated, txn1 continues with: // txn1->n1: Put(b) -- retry sees new lease start timestamp // txn1->n1: Refresh(a) // txn1->n1: Refresh(b) @@ -204,7 +201,7 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { // Ensure that txn1's post-refresh EndTxn occurs after recovery. if txnName == "txn1" && ba.IsSingleEndTxnRequest() { - close(recoverReady) + close(receivedETRetry) <-recoverComplete t.Logf("%sEndTxn op unpaused", tags.String()) } @@ -214,7 +211,13 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { // Once the RecoverTxn request is issued, as part of txn2's PushTxn // request, the lease can be moved. close(leaseMoveReady) - <-recoverReady + + // The RecoverTxn operation must be evaluated after txn1's Refreshes, + // or after txn1 completes with error. + select { + case <-receivedETRetry: + case <-txn1Done: + } t.Logf("%sRecoverTxn op unpaused", tags.String()) } @@ -436,18 +439,20 @@ func TestTransactionUnexpectedlyCommitted(t *testing.T) { close(networkManipComplete) // Await concurrent operations and validate results. - wg.Wait() err := <-txn1ResultCh + t.Logf("txn1 completed with err: %s", err) + close(txn1Done) + wg.Wait() - // TODO(sarkesian): While we expect an AmbiguousResultError once the immediate - // changes outlined in #103817 are implemented, right now this is essentially - // validating the existence of the bug. This needs to be fixed, and we should - // expect no assertion failures here. + // NB: While ideally we would hope to see a successful commit + // without error, with the near-term solution outlined in #103817 we expect + // an AmbiguousResultError in this case. + aErr := (*kvpb.AmbiguousResultError)(nil) tErr := (*kvpb.TransactionStatusError)(nil) - require.Truef(t, errors.HasAssertionFailure(err), - "expected AssertionFailedError due to sanity check on transaction already committed") - require.ErrorAsf(t, err, &tErr, - "expected TransactionStatusError due to being already committed") - require.Equalf(t, kvpb.TransactionStatusError_REASON_TXN_COMMITTED, tErr.Reason, - "expected TransactionStatusError due to being already committed") + require.ErrorAsf(t, err, &aErr, + "expected ambiguous result error due to RPC error") + require.Falsef(t, errors.As(err, &tErr), + "did not expect TransactionStatusError due to being already committed") + require.Falsef(t, errors.HasAssertionFailure(err), + "expected no AssertionFailedError due to sanity check on transaction already committed") }