diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
index aa4fb4377977..154a624adf72 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
@@ -3215,8 +3215,8 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 			allIsoLevels: &expect{
 				expServerRefresh:               false,
 				expClientRefreshSuccess:        true,
-				expClientAutoRetryAfterRefresh: false,
-				expParallelCommitAutoRetry:     true,
+				expClientAutoRetryAfterRefresh: true,
+				expParallelCommitAutoRetry:     false,
 			},
 		},
 		{
diff --git a/pkg/kv/kvclient/kvcoord/txn_test.go b/pkg/kv/kvclient/kvcoord/txn_test.go
index c8a50a1dd098..3032b67b37aa 100644
--- a/pkg/kv/kvclient/kvcoord/txn_test.go
+++ b/pkg/kv/kvclient/kvcoord/txn_test.go
@@ -786,49 +786,6 @@ func TestTxnCommitTimestampAdvancedByRefresh(t *testing.T) {
 	require.NoError(t, err)
 }
 
-// Test that in some write too old situations (i.e. when the server returns the
-// WriteTooOld flag set and then the client fails to refresh), intents are
-// properly left behind.
-func TestTxnLeavesIntentBehindAfterWriteTooOldError(t *testing.T) {
-	defer leaktest.AfterTest(t)()
-	defer log.Scope(t).Close(t)
-	ctx := context.Background()
-	s := createTestDB(t)
-	defer s.Stop()
-
-	key := []byte("b")
-
-	txn := s.DB.NewTxn(ctx, "test txn")
-	// Perform a Get so that the transaction can't refresh.
-	_, err := txn.Get(ctx, key)
-	require.NoError(t, err)
-
-	// Another guy writes at a higher timestamp.
-	require.NoError(t, s.DB.Put(ctx, key, "newer value"))
-
-	// Now we write and expect a WriteTooOld.
-	intentVal := []byte("test")
-	err = txn.Put(ctx, key, intentVal)
-	require.IsType(t, &kvpb.TransactionRetryWithProtoRefreshError{}, err)
-	require.Regexp(t, "WriteTooOld", err)
-
-	// Check that the intent was left behind.
-	b := kv.Batch{}
-	b.Header.ReadConsistency = kvpb.READ_UNCOMMITTED
-	b.Get(key)
-	require.NoError(t, s.DB.Run(ctx, &b))
-	getResp := b.RawResponse().Responses[0].GetGet()
-	require.NotNil(t, getResp)
-	intent := getResp.IntentValue
-	require.NotNil(t, intent)
-	intentBytes, err := intent.GetBytes()
-	require.NoError(t, err)
-	require.Equal(t, intentVal, intentBytes)
-
-	// Cleanup.
-	require.NoError(t, txn.Rollback(ctx))
-}
-
 // Test that a transaction can be used after a CPut returns a
 // ConditionFailedError. This is not generally allowed for other errors, but
 // ConditionFailedError is special.
diff --git a/pkg/kv/kvpb/api.go b/pkg/kv/kvpb/api.go
index 27c97f3163c0..a4e98f68c19c 100644
--- a/pkg/kv/kvpb/api.go
+++ b/pkg/kv/kvpb/api.go
@@ -98,23 +98,6 @@ func IsReadOnly(args Request) bool {
 	return (flags&isRead) != 0 && (flags&isWrite) == 0
 }
 
-// IsBlindWrite returns true iff the request is a blind-write. A request is a
-// blind-write if it is a write that does not observe any key-value state when
-// modifying that state (such as puts and deletes). This is in contrast with
-// read-write requests, which do observe key-value state when modifying the
-// state and may base their modifications off of this existing state (such as
-// conditional puts and increments).
-//
-// As a result of being "blind", blind-writes are allowed to be more freely
-// re-ordered with other writes. In practice, this means that they can be
-// evaluated with a read timestamp below another write and a write timestamp
-// above that write without needing to re-evaluate. This allows the WriteTooOld
-// error that is generated during such an occurrence to be deferred.
-func IsBlindWrite(args Request) bool {
-	flags := args.flags()
-	return (flags&isRead) == 0 && (flags&isWrite) != 0
-}
-
 // IsTransactional returns true if the request may be part of a
 // transaction.
 func IsTransactional(args Request) bool {
diff --git a/pkg/kv/kvserver/replica_evaluate.go b/pkg/kv/kvserver/replica_evaluate.go
index 7a5044aba281..18511beedf24 100644
--- a/pkg/kv/kvserver/replica_evaluate.go
+++ b/pkg/kv/kvserver/replica_evaluate.go
@@ -215,25 +215,10 @@ func evaluateBatch(
 
 	var mergedResult result.Result
 
-	// WriteTooOldErrors have particular handling. When a request encounters the
-	// error, we'd like to lay down an intent in order to avoid writers being
-	// starved. So, for blind writes, we swallow the error and instead we set the
-	// WriteTooOld flag on the response. For non-blind writes (e.g. CPut), we
-	// can't do that and so we just return the WriteTooOldError - see note on
-	// IsRead() stanza below. Upon receiving either a WriteTooOldError or a
-	// response with the WriteTooOld flag set, the client will attempt to bump
-	// the txn's read timestamp through a refresh. If successful, the client
-	// will retry this batch (in both cases).
-	//
-	// In any case, evaluation of the current batch always continue after a
-	// WriteTooOldError in order to find out if there's more conflicts and chose
-	// a final write timestamp.
-	var writeTooOldState struct {
-		err *kvpb.WriteTooOldError
-		// cantDeferWTOE is set when a WriteTooOldError cannot be deferred past the
-		// end of the current batch.
-		cantDeferWTOE bool
-	}
+	// WriteTooOldErrors have particular handling. Evaluation of the current batch
+	// continues after a WriteTooOldError in order to find out if there are more
+	// conflicts and to choose a final write timestamp to return.
+	var deferredWriteTooOldErr *kvpb.WriteTooOldError
 
 	// Only collect the scan stats if the tracing is enabled.
 	var ss *kvpb.ScanStats
@@ -254,6 +239,17 @@ func evaluateBatch(
 		// Execute the command.
 		args := union.GetInner()
 
+		if deferredWriteTooOldErr != nil && args.Method() == kvpb.EndTxn {
+			// ... unless we have been deferring a WriteTooOld error and have now
+			// reached an EndTxn request. In such cases, break and return the error.
+			// The transaction needs to handle the WriteTooOld error before it tries
+			// to commit. This short-circuiting is not necessary for correctness as
+			// the write batch will be discarded in favor of the deferred error, but
+			// we don't want to bother with potentially expensive EndTxn evaluation if
+			// we know the result will be thrown away.
+			break
+		}
+
 		if baHeader.Txn != nil {
 			// Set the Request's sequence number on the TxnMeta for this
 			// request. The MVCC layer (currently) uses TxnMeta to
@@ -325,79 +321,6 @@ func evaluateBatch(
 			reply.SetHeader(headerCopy)
 		}
 
-		if err != nil {
-			// If an EndTxn wants to restart because of a write too old, we
-			// might have a better error to return to the client.
-			if retErr := (*kvpb.TransactionRetryError)(nil); errors.As(err, &retErr) &&
-				retErr.Reason == kvpb.RETRY_WRITE_TOO_OLD &&
-				args.Method() == kvpb.EndTxn && writeTooOldState.err != nil {
-				err = writeTooOldState.err
-				// Don't defer this error. We could perhaps rely on the client observing
-				// the WriteTooOld flag and retry the batch, but we choose not too.
-				writeTooOldState.cantDeferWTOE = true
-			} else if wtoErr := (*kvpb.WriteTooOldError)(nil); errors.As(err, &wtoErr) {
-				// We got a WriteTooOldError. We continue on to run all
-				// commands in the batch in order to determine the highest
-				// timestamp for more efficient retries. If the batch is
-				// transactional, we continue to lay down intents so that
-				// other concurrent overlapping transactions are forced
-				// through intent resolution and the chances of this batch
-				// succeeding when it will be retried are increased.
-				if writeTooOldState.err != nil {
-					writeTooOldState.err.ActualTimestamp.Forward(
-						wtoErr.ActualTimestamp)
-				} else {
-					writeTooOldState.err = wtoErr
-				}
-
-				// For read-write requests that observe key-value state, we don't have
-				// the option of leaving an intent behind when they encounter a
-				// WriteTooOldError, so we have to return an error instead of a response
-				// with the WriteTooOld flag set (which would also leave intents
-				// behind). These requests need to be re-evaluated at the bumped
-				// timestamp in order for their results to be valid. The current
-				// evaluation resulted in an result that could well be different from
-				// what the request would return if it were evaluated at the bumped
-				// timestamp, which would cause the request to be rejected if it were
-				// sent again with the same sequence number after a refresh.
-				//
-				// Similarly, for read-only requests that encounter a WriteTooOldError,
-				// we don't have the option of returning a response with the WriteTooOld
-				// flag set because a response is not even generated in tandem with the
-				// WriteTooOldError. We could fix this and then allow WriteTooOldErrors
-				// to be deferred in these cases, but doing so would buy more into the
-				// extremely error-prone approach of retuning responses and errors
-				// together throughout the MVCC read path. Doing so is not desirable as
-				// it has repeatedly caused bugs in the past. Instead, we'd like to get
-				// rid of this pattern entirely and instead address the TODO below.
-				//
-				// TODO(andrei): What we really want to do here is either speculatively
-				// evaluate the request at the bumped timestamp and return that
-				// speculative result, or leave behind an unreplicated lock that won't
-				// prevent the request for evaluating again at the same sequence number
-				// but at a bumped timestamp.
-				if !kvpb.IsBlindWrite(args) {
-					writeTooOldState.cantDeferWTOE = true
-				}
-
-				if baHeader.Txn != nil {
-					log.VEventf(ctx, 2, "setting WriteTooOld because of key: %s. wts: %s -> %s",
-						args.Header().Key, baHeader.Txn.WriteTimestamp, wtoErr.ActualTimestamp)
-					baHeader.Txn.WriteTimestamp.Forward(wtoErr.ActualTimestamp)
-					baHeader.Txn.WriteTooOld = true
-				} else {
-					// For non-transactional requests, there's nowhere to defer the error
-					// to. And the request has to fail because non-transactional batches
-					// should read and write at the same timestamp.
-					writeTooOldState.cantDeferWTOE = true
-				}
-
-				// Clear error; we're done processing the WTOE for now and we'll return
-				// to considering it below after we've evaluated all requests.
-				err = nil
-			}
-		}
-
 		// Even on error, we need to propagate the result of evaluation.
 		//
 		// TODO(tbg): find out if that's true and why and improve the comment.
@@ -409,12 +332,38 @@ func evaluateBatch(
 			)
 		}
 
+		// Handle errors thrown by evaluation, either eagerly or through deferral.
 		if err != nil {
-			pErr := kvpb.NewErrorWithTxn(err, baHeader.Txn)
-			// Initialize the error index.
-			pErr.SetErrorIndex(int32(index))
+			var wtoErr *kvpb.WriteTooOldError
+			switch {
+			case errors.As(err, &wtoErr):
+				// We got a WriteTooOldError. We continue on to run all commands in the
+				// batch in order to determine the highest timestamp for more efficient
+				// retries.
+				if deferredWriteTooOldErr != nil {
+					deferredWriteTooOldErr.ActualTimestamp.Forward(wtoErr.ActualTimestamp)
+				} else {
+					deferredWriteTooOldErr = wtoErr
+				}
+
+				if baHeader.Txn != nil {
+					log.VEventf(ctx, 2, "advancing write timestamp due to "+
+						"WriteTooOld error on key: %s. wts: %s -> %s",
+						args.Header().Key, baHeader.Txn.WriteTimestamp, wtoErr.ActualTimestamp)
+					baHeader.Txn.WriteTimestamp.Forward(wtoErr.ActualTimestamp)
+				}
 
-			return nil, mergedResult, pErr
+				// Clear error and fall through to the success path; we're done
+				// processing the error for now. We'll return it below after we've
+				// evaluated all requests.
+				err = nil
+
+			default:
+				// For all other error types, immediately propagate the error.
+				pErr := kvpb.NewErrorWithTxn(err, baHeader.Txn)
+				pErr.SetErrorIndex(int32(index))
+				return nil, mergedResult, pErr
+			}
 		}
 
 		// If the last request was carried out with a limit, subtract the number
@@ -449,21 +398,17 @@
 	}
 
 	// If we made it here, there was no error during evaluation, with the exception of
-	// a deferred WTOE. If it can't be deferred - return it now; otherwise it is swallowed.
-	// Note that we don't attach an Index to the returned Error.
+	// a deferred WriteTooOld error. Return that now.
 	//
 	// TODO(tbg): we could attach the index of the first WriteTooOldError seen, but does
 	// that buy us anything?
-	if writeTooOldState.cantDeferWTOE {
+	if deferredWriteTooOldErr != nil {
 		// NB: we can't do any error wrapping here yet due to compatibility with 20.2 nodes;
 		// there needs to be an ErrorDetail here.
-		return nil, mergedResult, kvpb.NewErrorWithTxn(writeTooOldState.err, baHeader.Txn)
+		// TODO(nvanbenschoten): this comment is now stale. Address it.
+		return nil, mergedResult, kvpb.NewErrorWithTxn(deferredWriteTooOldErr, baHeader.Txn)
 	}
 
-	// The batch evaluation will not return an error (i.e. either everything went
-	// fine or we're deferring a WriteTooOldError by having bumped
-	// baHeader.Txn.WriteTimestamp).
-
 	// Update the batch response timestamp field to the timestamp at which the
 	// batch's reads were evaluated.
 	if baHeader.Txn != nil {
@@ -553,12 +498,13 @@ func evaluateCommand(
 	return pd, err
 }
 
-// canDoServersideRetry looks at the error produced by evaluating ba (or the
-// WriteTooOldFlag in br.Txn if there's no error) and decides if it's possible
-// to retry the batch evaluation at a higher timestamp. Retrying is sometimes
-// possible in case of some retriable errors which ask for higher timestamps:
-// for transactional requests, retrying is possible if the transaction had not
-// performed any prior reads that need refreshing.
+// canDoServersideRetry looks at the error produced by evaluating ba and decides
+// if it's possible to retry the batch evaluation at a higher timestamp.
+//
+// Retrying is sometimes possible in case of some retriable errors which ask for
+// higher timestamps. For transactional requests, retrying is possible if the
+// transaction had not performed any prior reads that need refreshing. For
+// non-transactional requests, retrying is always possible.
 //
 // This function is called both below and above latching, which is indicated by
 // the concurrency guard argument. The concurrency guard, if not nil, indicates
@@ -578,10 +524,12 @@ func canDoServersideRetry(
 	ctx context.Context,
 	pErr *kvpb.Error,
 	ba *kvpb.BatchRequest,
-	br *kvpb.BatchResponse,
 	g *concurrency.Guard,
 	deadline hlc.Timestamp,
 ) bool {
+	if pErr == nil {
+		log.Fatalf(ctx, "canDoServersideRetry called without error")
+	}
 	if ba.Txn != nil {
 		if !ba.CanForwardReadTimestamp {
 			return false
@@ -597,22 +545,12 @@ func canDoServersideRetry(
 
 	var newTimestamp hlc.Timestamp
 	if ba.Txn != nil {
-		if pErr != nil {
-			var ok bool
-			ok, newTimestamp = kvpb.TransactionRefreshTimestamp(pErr)
-			if !ok {
-				return false
-			}
-		} else {
-			if !br.Txn.WriteTooOld {
-				log.Fatalf(ctx, "expected the WriteTooOld flag to be set")
-			}
-			newTimestamp = br.Txn.WriteTimestamp
+		var ok bool
+		ok, newTimestamp = kvpb.TransactionRefreshTimestamp(pErr)
+		if !ok {
+			return false
 		}
 	} else {
-		if pErr == nil {
-			log.Fatalf(ctx, "canDoServersideRetry called for non-txn request without error")
-		}
 		switch tErr := pErr.GetDetail().(type) {
 		case *kvpb.WriteTooOldError:
 			newTimestamp = tErr.RetryTimestamp()
diff --git a/pkg/kv/kvserver/replica_read.go b/pkg/kv/kvserver/replica_read.go
index 999b49da1772..deea21189649 100644
--- a/pkg/kv/kvserver/replica_read.go
+++ b/pkg/kv/kvserver/replica_read.go
@@ -470,7 +470,7 @@ func (r *Replica) executeReadOnlyBatchWithServersideRefreshes(
 	// retry at a higher timestamp because it is not isolated at higher
 	// timestamps.
 	latchesHeld := g != nil
-	if !latchesHeld || !canDoServersideRetry(ctx, pErr, ba, br, g, hlc.Timestamp{}) {
+	if !latchesHeld || !canDoServersideRetry(ctx, pErr, ba, g, hlc.Timestamp{}) {
 		// TODO(aayush,arul): These metrics are incorrect at the moment since
 		// hitting this branch does not mean that we won't serverside retry, it
 		// just means that we will have to reacquire latches.
diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go
index c86a2691192e..fdc15eddba5c 100644
--- a/pkg/kv/kvserver/replica_send.go
+++ b/pkg/kv/kvserver/replica_send.go
@@ -806,7 +806,7 @@ func (r *Replica) handleReadWithinUncertaintyIntervalError(
 	// Attempt a server-side retry of the request. Note that we pass nil for
 	// latchSpans, because we have already released our latches and plan to
 	// re-acquire them if the retry is allowed.
-	if !canDoServersideRetry(ctx, pErr, ba, nil /* br */, nil /* g */, hlc.Timestamp{} /* deadline */) {
+	if !canDoServersideRetry(ctx, pErr, ba, nil /* g */, hlc.Timestamp{} /* deadline */) {
 		r.store.Metrics().ReadWithinUncertaintyIntervalErrorServerSideRetryFailure.Inc(1)
 		return nil, pErr
 	}
diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go
index ab3c459b03d5..c737db4e2dc1 100644
--- a/pkg/kv/kvserver/replica_test.go
+++ b/pkg/kv/kvserver/replica_test.go
@@ -4733,13 +4733,19 @@ func TestRPCRetryProtectionInTxn(t *testing.T) {
 
 	// Replay the request. It initially tries to execute as a 1PC transaction,
 	// but will fail because of a WriteTooOldError that pushes the transaction.
-	// This forces the txn to execute normally, at which point it fails because
-	// the EndTxn is detected to be a duplicate.
+	// This forces the txn to execute normally, at which point it fails, either
+	// because of a WriteTooOld error, if it cannot server-side refresh, or
+	// because the EndTxn is detected to be a duplicate, if it can server-side
+	// refresh to avoid the WriteTooOld error. Either way, it fails.
 	_, pErr = tc.Sender().Send(ctx, ba)
 	require.NotNil(t, pErr)
-	require.Regexp(t,
-		`TransactionAbortedError\(ABORT_REASON_RECORD_ALREADY_WRITTEN_POSSIBLE_REPLAY\)`,
-		pErr)
+	var expRx string
+	if noPriorReads {
+		expRx = `TransactionAbortedError\(ABORT_REASON_RECORD_ALREADY_WRITTEN_POSSIBLE_REPLAY\)`
+	} else {
+		expRx = `WriteTooOldError`
+	}
+	require.Regexp(t, expRx, pErr)
 	})
 }
 
@@ -4817,7 +4823,7 @@ func TestBatchRetryCantCommitIntents(t *testing.T) {
 	// Send a put for keyA.
 	ba := &kvpb.BatchRequest{}
 	put := putArgs(key, []byte("value"))
-	ba.Header = kvpb.Header{Txn: txn}
+	ba.Header = kvpb.Header{Txn: txn, CanForwardReadTimestamp: true}
 	ba.Add(&put)
 	assignSeqNumsForReqs(txn, &put)
 	if err := ba.SetActiveTimestamp(tc.Clock()); err != nil {
@@ -4866,10 +4872,12 @@ func TestBatchRetryCantCommitIntents(t *testing.T) {
 	}
 
 	// Now replay put for key A; this succeeds as there's nothing to detect
-	// the replay. The WriteTooOld flag will be set though.
+	// the replay and server-side refreshes were permitted. The transaction's
+	// timestamp will be pushed due to the server-side refresh.
+	preReplayTxn := ba.Txn.Clone()
 	br, pErr = tc.Sender().Send(ctx, ba)
 	require.NoError(t, pErr.GoError())
-	require.True(t, br.Txn.WriteTooOld)
+	require.True(t, preReplayTxn.ReadTimestamp.Less(br.Txn.ReadTimestamp))
 
 	// Intent should have been created.
 	gArgs := getArgs(key)
diff --git a/pkg/kv/kvserver/replica_write.go b/pkg/kv/kvserver/replica_write.go
index 9c8f75973d70..ace73130dfa3 100644
--- a/pkg/kv/kvserver/replica_write.go
+++ b/pkg/kv/kvserver/replica_write.go
@@ -667,21 +667,13 @@ func (r *Replica) evaluateWriteBatchWithServersideRefreshes(
 
 		batch, br, res, pErr = r.evaluateWriteBatchWrapper(ctx, idKey, rec, ms, ba, g, st, ui)
 
-		var success bool
-		if pErr == nil {
-			wto := br.Txn != nil && br.Txn.WriteTooOld
-			success = !wto
-		} else {
-			success = false
-		}
-
 		// Allow one retry only; a non-txn batch containing overlapping
 		// spans will always experience WriteTooOldError.
-		if success || retries > 0 {
+		if pErr == nil || retries > 0 {
 			break
 		}
 		// If we can retry, set a higher batch timestamp and continue.
-		if !canDoServersideRetry(ctx, pErr, ba, br, g, deadline) {
+		if !canDoServersideRetry(ctx, pErr, ba, g, deadline) {
 			r.store.Metrics().WriteEvaluationServerSideRetryFailure.Inc(1)
 			break
 		} else {
diff --git a/pkg/kv/kvserver/txn_recovery_integration_test.go b/pkg/kv/kvserver/txn_recovery_integration_test.go
index ab4b25a05f8e..af78ce8fa482 100644
--- a/pkg/kv/kvserver/txn_recovery_integration_test.go
+++ b/pkg/kv/kvserver/txn_recovery_integration_test.go
@@ -44,12 +44,6 @@ func TestTxnRecoveryFromStaging(t *testing.T) {
 		// implicitCommit says whether we expect the transaction to satisfy the
 		// implicit-commit condition.
 		implicitCommit bool
-		// If implicitCommit is false, writeTooOld dictates what kind of push will
-		// be experienced by one of the txn's intents. An intent being pushed is the
-		// reason why the implicit-commit condition is expected to fail. We simulate
-		// both pushes by the timestamp cache, and by deferred write-too-old
-		// conditions.
-		writeTooOld bool
		// futureWrites dictates whether the transaction has been writing at the
 		// present time or whether it has been writing into the future with a
 		// synthetic timestamp.
@@ -60,11 +54,6 @@ func TestTxnRecoveryFromStaging(t *testing.T) {
 		},
 		{
 			implicitCommit: false,
-			writeTooOld:    false,
-		},
-		{
-			implicitCommit: false,
-			writeTooOld:    true,
 		},
 		{
 			implicitCommit: true,
@@ -72,16 +61,10 @@ func TestTxnRecoveryFromStaging(t *testing.T) {
 		},
 		{
 			implicitCommit: false,
-			writeTooOld:    false,
-			futureWrites:   true,
-		},
-		{
-			implicitCommit: false,
-			writeTooOld:    true,
 			futureWrites:   true,
 		},
 	} {
-		name := fmt.Sprintf("%d-commit:%t,writeTooOld:%t,futureWrites:%t", i, tc.implicitCommit, tc.writeTooOld, tc.futureWrites)
+		name := fmt.Sprintf("%d-commit:%t,futureWrites:%t", i, tc.implicitCommit, tc.futureWrites)
 		t.Run(name, func(t *testing.T) {
 			stopper := stop.NewStopper()
 			defer stopper.Stop(ctx)
@@ -120,16 +103,9 @@ func TestTxnRecoveryFromStaging(t *testing.T) {
 			// commit state.
 			conflictH := kvpb.Header{Timestamp: txn.WriteTimestamp.Next()}
 			if !tc.implicitCommit {
-				if !tc.writeTooOld {
-					gArgs := getArgs(keyB)
-					if _, pErr := kv.SendWrappedWith(ctx, store.TestSender(), conflictH, &gArgs); pErr != nil {
-						t.Fatal(pErr)
-					}
-				} else {
-					pArgs = putArgs(keyB, []byte("pusher val"))
-					if _, pErr := kv.SendWrappedWith(ctx, store.TestSender(), conflictH, &pArgs); pErr != nil {
-						t.Fatal(pErr)
-					}
+				gArgs := getArgs(keyB)
+				if _, pErr := kv.SendWrappedWith(ctx, store.TestSender(), conflictH, &gArgs); pErr != nil {
+					t.Fatal(pErr)
 				}
 			}
 
diff --git a/pkg/roachpb/data.proto b/pkg/roachpb/data.proto
index 9054cd8bdd84..75069b6a1b3b 100644
--- a/pkg/roachpb/data.proto
+++ b/pkg/roachpb/data.proto
@@ -454,6 +454,9 @@ message Transaction {
   // with a bumped write timestamp, so this flag is only telling us that a
   // refresh is less likely to succeed than in other cases where
   // ReadTimestamp != WriteTimestamp.
+  //
+  // TODO(nvanbenschoten): this flag is no longer assigned by v23.2 nodes.
+  // Remove it when compatibility with v23.1 nodes is no longer a concern.
   bool write_too_old = 12;
   // Set of spans that the transaction has acquired locks within. These are
   // spans which must be resolved on txn completion. Note that these spans
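
The following self-contained Go sketch models the deferred WriteTooOldError control flow that the replica_evaluate.go hunks above adopt. It uses simplified stand-in types (timestamp, writeTooOldError, request) rather than the real kvpb/kvserver APIs, so it is only an approximation of the batch evaluation loop: WriteTooOldErrors are accumulated and deferred while the rest of the batch keeps evaluating, any other error fails the batch immediately, and a pending deferred error short-circuits EndTxn evaluation.

// Sketch only: all types below are hypothetical stand-ins, not CockroachDB APIs.
package main

import (
	"errors"
	"fmt"
)

type timestamp int64

// forward bumps the timestamp to o if o is larger, mirroring hlc.Timestamp.Forward.
func (t *timestamp) forward(o timestamp) {
	if o > *t {
		*t = o
	}
}

type writeTooOldError struct{ actualTimestamp timestamp }

func (e *writeTooOldError) Error() string {
	return fmt.Sprintf("WriteTooOldError: must write at timestamp >= %d", e.actualTimestamp)
}

type request struct {
	isEndTxn bool
	eval     func() error
}

// evaluateBatch mirrors the new control flow: WriteTooOldErrors are deferred
// and accumulated so later requests still evaluate (and can lay down intents),
// all other errors fail the batch eagerly, and a pending deferred error
// short-circuits EndTxn evaluation.
func evaluateBatch(reqs []request, txnWriteTS *timestamp) error {
	var deferredWTOE *writeTooOldError
	for _, req := range reqs {
		if deferredWTOE != nil && req.isEndTxn {
			// The deferred error will be returned anyway; skip EndTxn evaluation.
			break
		}
		err := req.eval()
		if err == nil {
			continue
		}
		var wtoErr *writeTooOldError
		if errors.As(err, &wtoErr) {
			// Defer the error, remember the highest conflicting timestamp, and
			// keep evaluating the rest of the batch.
			if deferredWTOE != nil {
				deferredWTOE.actualTimestamp.forward(wtoErr.actualTimestamp)
			} else {
				deferredWTOE = wtoErr
			}
			txnWriteTS.forward(wtoErr.actualTimestamp)
			continue
		}
		// Any other error propagates immediately.
		return err
	}
	if deferredWTOE != nil {
		return deferredWTOE
	}
	return nil
}

func main() {
	wts := timestamp(10)
	err := evaluateBatch([]request{
		{eval: func() error { return &writeTooOldError{actualTimestamp: 15} }},
		{eval: func() error { return nil }},
		{isEndTxn: true, eval: func() error { return nil }},
	}, &wts)
	fmt.Println(err, wts) // deferred error is returned; write timestamp forwarded to 15
}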