From 0acc506802a5c2702ff0ab13281ae48af456adbf Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Tue, 7 Dec 2021 12:40:14 -0500 Subject: [PATCH] kv: unify client and server-side refresh code paths This commit is a pure refactor that unifies various code paths that perform transaction refreshing. It consolidates the conditions used to check whether a refresh is permitted for a Transaction, the mechanism through which a Transaction protobuf is updated during a Refresh (`Transaction.Refresh`), and, most importantly, it consolidates the logic that determines the necessary refresh timestamp from a `roachpb.Error` (`TransactionRefreshTimestamp`). This lays the groundwork to eventually be able to perform server-side refreshes for `ReadWithinUncertaintyIntervalError`s. However, there is a bit of complexity with doing so because of read latches, which is mentioned in a comment. --- .../kvcoord/txn_interceptor_span_refresher.go | 36 ++++++++++--- .../kvserver/batcheval/cmd_end_transaction.go | 11 ++-- pkg/kv/kvserver/replica_batch_updates.go | 17 +++--- pkg/kv/kvserver/replica_evaluate.go | 54 ++++++++++--------- pkg/roachpb/data.go | 31 ++++------- 5 files changed, 81 insertions(+), 68 deletions(-) diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go index 7755422b1e01..3f878f874578 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go @@ -295,11 +295,18 @@ func (sr *txnSpanRefresher) sendLockedWithRefreshAttempts( func (sr *txnSpanRefresher) maybeRefreshAndRetrySend( ctx context.Context, ba roachpb.BatchRequest, pErr *roachpb.Error, maxRefreshAttempts int, ) (*roachpb.BatchResponse, *roachpb.Error) { - // Check for an error which can be retried after updating spans. 
- canRefreshTxn, refreshTxn := roachpb.CanTransactionRefresh(ctx, pErr) - if !canRefreshTxn || !sr.canAutoRetry { + txn := pErr.GetTxn() + if txn == nil || !sr.canForwardReadTimestamp(txn) { return nil, pErr } + // Check for an error which can be refreshed away to avoid a client-side + // transaction restart. + ok, refreshTS := roachpb.TransactionRefreshTimestamp(pErr) + if !ok { + return nil, pErr + } + refreshTxn := txn.Clone() + refreshTxn.Refresh(refreshTS) log.VEventf(ctx, 2, "trying to refresh to %s because of %s", refreshTxn.ReadTimestamp, pErr) // Try updating the txn spans so we can retry. @@ -445,10 +452,14 @@ func (sr *txnSpanRefresher) maybeRefreshPreemptivelyLocked( return ba, nil } - canRefreshTxn, refreshTxn := roachpb.PrepareTransactionForRefresh(ba.Txn, ba.Txn.WriteTimestamp) - if !canRefreshTxn || !sr.canAutoRetry { - return roachpb.BatchRequest{}, newRetryErrorOnFailedPreemptiveRefresh(ba.Txn) + // If the transaction cannot change its read timestamp, no refresh is + // possible. + if !sr.canForwardReadTimestamp(ba.Txn) { + return ba, newRetryErrorOnFailedPreemptiveRefresh(ba.Txn) } + + refreshTxn := ba.Txn.Clone() + refreshTxn.Refresh(ba.Txn.WriteTimestamp) log.VEventf(ctx, 2, "preemptively refreshing to timestamp %s before issuing %s", refreshTxn.ReadTimestamp, ba) @@ -570,6 +581,15 @@ func (sr *txnSpanRefresher) appendRefreshSpans( return nil } +// canForwardReadTimestamp returns whether the transaction can forward its +// read timestamp after refreshing all the reads that it has performed +// to this point. This requires that the transaction's timestamp has not leaked. +// It also requires that the txnSpanRefresher has been configured to allow +// auto-retries. 
+func (sr *txnSpanRefresher) canForwardReadTimestamp(txn *roachpb.Transaction) bool { + return !txn.CommitTimestampFixed && sr.canAutoRetry +} + // canForwardReadTimestampWithoutRefresh returns whether the transaction can // forward its read timestamp without refreshing any read spans. This allows for // the "server-side refresh" optimization, where batches are re-evaluated at a @@ -581,9 +601,9 @@ func (sr *txnSpanRefresher) appendRefreshSpans( // is required. // // Note that when deciding whether a transaction can be bumped to a particular -// timestamp, the transaction's deadling must also be taken into account. +// timestamp, the transaction's deadline must also be taken into account. func (sr *txnSpanRefresher) canForwardReadTimestampWithoutRefresh(txn *roachpb.Transaction) bool { - return sr.canAutoRetry && !sr.refreshInvalid && sr.refreshFootprint.empty() && !txn.CommitTimestampFixed + return sr.canForwardReadTimestamp(txn) && !sr.refreshInvalid && sr.refreshFootprint.empty() } // forwardRefreshTimestampOnResponse updates the refresher's tracked diff --git a/pkg/kv/kvserver/batcheval/cmd_end_transaction.go b/pkg/kv/kvserver/batcheval/cmd_end_transaction.go index c02fb52db4a2..31211c7dcf63 100644 --- a/pkg/kv/kvserver/batcheval/cmd_end_transaction.go +++ b/pkg/kv/kvserver/batcheval/cmd_end_transaction.go @@ -387,10 +387,11 @@ func EndTxn( return txnResult, nil } -// IsEndTxnExceedingDeadline returns true if the transaction exceeded its -// deadline. -func IsEndTxnExceedingDeadline(t hlc.Timestamp, args *roachpb.EndTxnRequest) bool { - return args.Deadline != nil && args.Deadline.LessEq(t) +// IsEndTxnExceedingDeadline returns true if the transaction's provisional +// commit timestamp exceeded its deadline. If so, the transaction should not be +// allowed to commit. 
+func IsEndTxnExceedingDeadline(commitTS hlc.Timestamp, deadline *hlc.Timestamp) bool { + return deadline != nil && deadline.LessEq(commitTS) } // IsEndTxnTriggeringRetryError returns true if the EndTxnRequest cannot be @@ -415,7 +416,7 @@ func IsEndTxnTriggeringRetryError( } // A transaction must obey its deadline, if set. - if !retry && IsEndTxnExceedingDeadline(txn.WriteTimestamp, args) { + if !retry && IsEndTxnExceedingDeadline(txn.WriteTimestamp, args.Deadline) { exceededBy := txn.WriteTimestamp.GoTime().Sub(args.Deadline.GoTime()) extraMsg = fmt.Sprintf( "txn timestamp pushed too much; deadline exceeded by %s (%s > %s)", diff --git a/pkg/kv/kvserver/replica_batch_updates.go b/pkg/kv/kvserver/replica_batch_updates.go index d6f35fc8fa1c..70269d523d21 100644 --- a/pkg/kv/kvserver/replica_batch_updates.go +++ b/pkg/kv/kvserver/replica_batch_updates.go @@ -182,6 +182,9 @@ func maybeBumpReadTimestampToWriteTimestamp( if ba.Txn == nil { return false } + if !ba.CanForwardReadTimestamp { + return false + } if ba.Txn.ReadTimestamp == ba.Txn.WriteTimestamp { return false } @@ -189,11 +192,11 @@ func maybeBumpReadTimestampToWriteTimestamp( if !ok { return false } - etArg := arg.(*roachpb.EndTxnRequest) - if ba.CanForwardReadTimestamp && !batcheval.IsEndTxnExceedingDeadline(ba.Txn.WriteTimestamp, etArg) { - return tryBumpBatchTimestamp(ctx, ba, ba.Txn.WriteTimestamp, latchSpans) + et := arg.(*roachpb.EndTxnRequest) + if batcheval.IsEndTxnExceedingDeadline(ba.Txn.WriteTimestamp, et.Deadline) { + return false } - return false + return tryBumpBatchTimestamp(ctx, ba, ba.Txn.WriteTimestamp, latchSpans) } // tryBumpBatchTimestamp attempts to bump ba's read and write timestamps to ts. 
@@ -230,7 +233,7 @@ func tryBumpBatchTimestamp( log.Fatalf(ctx, "trying to bump to %s <= ba.Timestamp: %s", ts, ba.Timestamp) } ba.Timestamp = ts - if txn := ba.Txn; txn == nil { + if ba.Txn == nil { return true } if ts.Less(ba.Txn.ReadTimestamp) || ts.Less(ba.Txn.WriteTimestamp) { @@ -240,8 +243,6 @@ func tryBumpBatchTimestamp( log.VEventf(ctx, 2, "bumping batch timestamp to: %s from read: %s, write: %s)", ts, ba.Txn.ReadTimestamp, ba.Txn.WriteTimestamp) ba.Txn = ba.Txn.Clone() - ba.Txn.ReadTimestamp = ts - ba.Txn.WriteTimestamp = ts - ba.Txn.WriteTooOld = false + ba.Txn.Refresh(ts) return true } diff --git a/pkg/kv/kvserver/replica_evaluate.go b/pkg/kv/kvserver/replica_evaluate.go index 28dc9583dc10..9ece8125038a 100644 --- a/pkg/kv/kvserver/replica_evaluate.go +++ b/pkg/kv/kvserver/replica_evaluate.go @@ -544,41 +544,45 @@ func canDoServersideRetry( deadline = et.Deadline } } + var newTimestamp hlc.Timestamp + if ba.Txn != nil { + if pErr != nil { + // TODO(nvanbenschoten): I am intentionally not allowing server-side + // refreshes of ReadWithinUncertaintyIntervalErrors for now, even though + // that is the eventual goal here. I'd like to lift this limitation in a + // dedicated commit. The commit will likely need to be accompanied by an + // above-latching retry loop, because read latches will usually prevent + // below-latch retries of ReadWithinUncertaintyIntervalErrors. See the + // comment in tryBumpBatchTimestamp. 
+ if _, ok := pErr.GetDetail().(*roachpb.ReadWithinUncertaintyIntervalError); ok { + return false + } - if pErr != nil { + var ok bool + ok, newTimestamp = roachpb.TransactionRefreshTimestamp(pErr) + if !ok { + return false + } + } else { + if !br.Txn.WriteTooOld { + log.Fatalf(ctx, "expected the WriteTooOld flag to be set") + } + newTimestamp = br.Txn.WriteTimestamp + } + } else { + if pErr == nil { + log.Fatalf(ctx, "canDoServersideRetry called for non-txn request without error") + } switch tErr := pErr.GetDetail().(type) { case *roachpb.WriteTooOldError: - // Locking scans hit WriteTooOld errors if they encounter values at - // timestamps higher than their read timestamps. The encountered - // timestamps are guaranteed to be greater than the txn's read - // timestamp, but not its write timestamp. So, when determining what - // the new timestamp should be, we make sure to not regress the - // txn's write timestamp. newTimestamp = tErr.ActualTimestamp - if ba.Txn != nil { - newTimestamp.Forward(pErr.GetTxn().WriteTimestamp) - } - case *roachpb.TransactionRetryError: - if ba.Txn == nil { - // TODO(andrei): I don't know if TransactionRetryError is possible for - // non-transactional batches, but some tests inject them for 1PC - // transactions. I'm not sure how to deal with them, so let's not retry. - return false - } - newTimestamp = pErr.GetTxn().WriteTimestamp default: - // TODO(andrei): Handle other retriable errors too. 
return false } - } else { - if !br.Txn.WriteTooOld { - log.Fatalf(ctx, "programming error: expected the WriteTooOld flag to be set") - } - newTimestamp = br.Txn.WriteTimestamp } - if deadline != nil && deadline.LessEq(newTimestamp) { + if batcheval.IsEndTxnExceedingDeadline(newTimestamp, deadline) { return false } return tryBumpBatchTimestamp(ctx, ba, newTimestamp, latchSpans) diff --git a/pkg/roachpb/data.go b/pkg/roachpb/data.go index 673454cda1d7..e5eb08f6bfee 100644 --- a/pkg/roachpb/data.go +++ b/pkg/roachpb/data.go @@ -1494,33 +1494,20 @@ func PrepareTransactionForRetry( return txn } -// PrepareTransactionForRefresh returns whether the transaction can be refreshed -// to the specified timestamp to avoid a client-side transaction restart. If -// true, returns a cloned, updated Transaction object with the provisional -// commit timestamp and read timestamp set appropriately. -func PrepareTransactionForRefresh(txn *Transaction, timestamp hlc.Timestamp) (bool, *Transaction) { - if txn.CommitTimestampFixed { - return false, nil - } - newTxn := txn.Clone() - newTxn.Refresh(timestamp) - return true, newTxn -} - -// CanTransactionRefresh returns whether the transaction specified in the -// supplied error can be retried at a refreshed timestamp to avoid a client-side -// transaction restart. If true, returns a cloned, updated Transaction object -// with the provisional commit timestamp and read timestamp set appropriately. -func CanTransactionRefresh(ctx context.Context, pErr *Error) (bool, *Transaction) { +// TransactionRefreshTimestamp returns whether the supplied error is a retry +// error that can be discarded if the transaction in the error is refreshed. If +// true, the function returns the timestamp that the Transaction object should +// be refreshed at in order to discard the error and avoid a restart. 
+func TransactionRefreshTimestamp(pErr *Error) (bool, hlc.Timestamp) { txn := pErr.GetTxn() if txn == nil { - return false, nil + return false, hlc.Timestamp{} } timestamp := txn.WriteTimestamp switch err := pErr.GetDetail().(type) { case *TransactionRetryError: if err.Reason != RETRY_SERIALIZABLE && err.Reason != RETRY_WRITE_TOO_OLD { - return false, nil + return false, hlc.Timestamp{} } case *WriteTooOldError: // TODO(andrei): Chances of success for on write-too-old conditions might be @@ -1532,9 +1519,9 @@ func CanTransactionRefresh(ctx context.Context, pErr *Error) (bool, *Transaction case *ReadWithinUncertaintyIntervalError: timestamp.Forward(readWithinUncertaintyIntervalRetryTimestamp(err)) default: - return false, nil + return false, hlc.Timestamp{} } - return PrepareTransactionForRefresh(txn, timestamp) + return true, timestamp } func readWithinUncertaintyIntervalRetryTimestamp(