Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kv: unify client and server-side refresh code paths #73557

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 31 additions & 8 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,11 +295,18 @@ func (sr *txnSpanRefresher) sendLockedWithRefreshAttempts(
func (sr *txnSpanRefresher) maybeRefreshAndRetrySend(
ctx context.Context, ba roachpb.BatchRequest, pErr *roachpb.Error, maxRefreshAttempts int,
) (*roachpb.BatchResponse, *roachpb.Error) {
// Check for an error which can be retried after updating spans.
canRefreshTxn, refreshTxn := roachpb.CanTransactionRefresh(ctx, pErr)
if !canRefreshTxn || !sr.canAutoRetry {
txn := pErr.GetTxn()
if txn == nil || !sr.canForwardReadTimestamp(txn) {
return nil, pErr
}
// Check for an error which can be refreshed away to avoid a client-side
// transaction restart.
ok, refreshTS := roachpb.TransactionRefreshTimestamp(pErr)
if !ok {
return nil, pErr
}
refreshTxn := txn.Clone()
refreshTxn.Refresh(refreshTS)
log.VEventf(ctx, 2, "trying to refresh to %s because of %s", refreshTxn.ReadTimestamp, pErr)

// Try updating the txn spans so we can retry.
Expand Down Expand Up @@ -445,10 +452,14 @@ func (sr *txnSpanRefresher) maybeRefreshPreemptivelyLocked(
return ba, nil
}

canRefreshTxn, refreshTxn := roachpb.PrepareTransactionForRefresh(ba.Txn, ba.Txn.WriteTimestamp)
if !canRefreshTxn || !sr.canAutoRetry {
return roachpb.BatchRequest{}, newRetryErrorOnFailedPreemptiveRefresh(ba.Txn)
// If the transaction cannot change its read timestamp, no refresh is
// possible.
if !sr.canForwardReadTimestamp(ba.Txn) {
return ba, newRetryErrorOnFailedPreemptiveRefresh(ba.Txn)
}

refreshTxn := ba.Txn.Clone()
refreshTxn.Refresh(ba.Txn.WriteTimestamp)
log.VEventf(ctx, 2, "preemptively refreshing to timestamp %s before issuing %s",
refreshTxn.ReadTimestamp, ba)

Expand Down Expand Up @@ -477,6 +488,9 @@ func newRetryErrorOnFailedPreemptiveRefresh(txn *roachpb.Transaction) *roachpb.E
// than sr.refreshedTimestamp. All implicated timestamp caches are updated with
// the final transaction timestamp. Returns whether the refresh was successful
// or not.
//
// The provided transaction should be a Clone() of the original transaction with
// its ReadTimestamp adjusted by the Refresh() method.
func (sr *txnSpanRefresher) tryUpdatingTxnSpans(
ctx context.Context, refreshTxn *roachpb.Transaction,
) (ok bool) {
Expand Down Expand Up @@ -570,6 +584,15 @@ func (sr *txnSpanRefresher) appendRefreshSpans(
return nil
}

// canForwardReadTimestampWithoutRefresh returns whether the transaction can
// forward its read timestamp after refreshing all the reads that has performed
// to this point. This requires that the transaction's timestamp has not leaked.
// It also requires that the txnSpanRefresher has been configured to allow
// auto-retries.
func (sr *txnSpanRefresher) canForwardReadTimestamp(txn *roachpb.Transaction) bool {
return sr.canAutoRetry && !txn.CommitTimestampFixed
}

// canForwardReadTimestampWithoutRefresh returns whether the transaction can
// forward its read timestamp without refreshing any read spans. This allows for
// the "server-side refresh" optimization, where batches are re-evaluated at a
Expand All @@ -581,9 +604,9 @@ func (sr *txnSpanRefresher) appendRefreshSpans(
// is required.
//
// Note that when deciding whether a transaction can be bumped to a particular
// timestamp, the transaction's deadling must also be taken into account.
// timestamp, the transaction's deadline must also be taken into account.
func (sr *txnSpanRefresher) canForwardReadTimestampWithoutRefresh(txn *roachpb.Transaction) bool {
return sr.canAutoRetry && !sr.refreshInvalid && sr.refreshFootprint.empty() && !txn.CommitTimestampFixed
return sr.canForwardReadTimestamp(txn) && !sr.refreshInvalid && sr.refreshFootprint.empty()
}

// forwardRefreshTimestampOnResponse updates the refresher's tracked
Expand Down
11 changes: 6 additions & 5 deletions pkg/kv/kvserver/batcheval/cmd_end_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,10 +387,11 @@ func EndTxn(
return txnResult, nil
}

// IsEndTxnExceedingDeadline returns true if the transaction exceeded its
// deadline.
func IsEndTxnExceedingDeadline(t hlc.Timestamp, args *roachpb.EndTxnRequest) bool {
return args.Deadline != nil && args.Deadline.LessEq(t)
// IsEndTxnExceedingDeadline returns true if the transaction's provisional
// commit timestamp exceeded its deadline. If so, the transaction should not be
// allowed to commit.
func IsEndTxnExceedingDeadline(commitTS hlc.Timestamp, deadline *hlc.Timestamp) bool {
return deadline != nil && deadline.LessEq(commitTS)
}

// IsEndTxnTriggeringRetryError returns true if the EndTxnRequest cannot be
Expand All @@ -415,7 +416,7 @@ func IsEndTxnTriggeringRetryError(
}

// A transaction must obey its deadline, if set.
if !retry && IsEndTxnExceedingDeadline(txn.WriteTimestamp, args) {
if !retry && IsEndTxnExceedingDeadline(txn.WriteTimestamp, args.Deadline) {
exceededBy := txn.WriteTimestamp.GoTime().Sub(args.Deadline.GoTime())
extraMsg = fmt.Sprintf(
"txn timestamp pushed too much; deadline exceeded by %s (%s > %s)",
Expand Down
17 changes: 9 additions & 8 deletions pkg/kv/kvserver/replica_batch_updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,18 +182,21 @@ func maybeBumpReadTimestampToWriteTimestamp(
if ba.Txn == nil {
return false
}
if !ba.CanForwardReadTimestamp {
return false
}
if ba.Txn.ReadTimestamp == ba.Txn.WriteTimestamp {
return false
}
arg, ok := ba.GetArg(roachpb.EndTxn)
if !ok {
return false
}
etArg := arg.(*roachpb.EndTxnRequest)
if ba.CanForwardReadTimestamp && !batcheval.IsEndTxnExceedingDeadline(ba.Txn.WriteTimestamp, etArg) {
return tryBumpBatchTimestamp(ctx, ba, ba.Txn.WriteTimestamp, latchSpans)
et := arg.(*roachpb.EndTxnRequest)
if batcheval.IsEndTxnExceedingDeadline(ba.Txn.WriteTimestamp, et.Deadline) {
return false
}
return false
return tryBumpBatchTimestamp(ctx, ba, ba.Txn.WriteTimestamp, latchSpans)
}

// tryBumpBatchTimestamp attempts to bump ba's read and write timestamps to ts.
Expand Down Expand Up @@ -230,7 +233,7 @@ func tryBumpBatchTimestamp(
log.Fatalf(ctx, "trying to bump to %s <= ba.Timestamp: %s", ts, ba.Timestamp)
}
ba.Timestamp = ts
if txn := ba.Txn; txn == nil {
if ba.Txn == nil {
return true
}
if ts.Less(ba.Txn.ReadTimestamp) || ts.Less(ba.Txn.WriteTimestamp) {
Expand All @@ -240,8 +243,6 @@ func tryBumpBatchTimestamp(
log.VEventf(ctx, 2, "bumping batch timestamp to: %s from read: %s, write: %s)",
ts, ba.Txn.ReadTimestamp, ba.Txn.WriteTimestamp)
ba.Txn = ba.Txn.Clone()
ba.Txn.ReadTimestamp = ts
ba.Txn.WriteTimestamp = ts
ba.Txn.WriteTooOld = false
ba.Txn.Refresh(ts)
return true
}
54 changes: 29 additions & 25 deletions pkg/kv/kvserver/replica_evaluate.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,41 +544,45 @@ func canDoServersideRetry(
deadline = et.Deadline
}
}

var newTimestamp hlc.Timestamp
if ba.Txn != nil {
if pErr != nil {
// TODO(nvanbenschoten): This is intentionally not allowing server-side
// refreshes of ReadWithinUncertaintyIntervalErrors for now, even though
// that is the eventual goal here. Lifting that limitation will likely
// need to be accompanied by an above-latching retry loop, because read
// latches will usually prevent below-latch retries of
// ReadWithinUncertaintyIntervalErrors. See the comment in
// tryBumpBatchTimestamp.
if _, ok := pErr.GetDetail().(*roachpb.ReadWithinUncertaintyIntervalError); ok {
return false
}

if pErr != nil {
var ok bool
ok, newTimestamp = roachpb.TransactionRefreshTimestamp(pErr)
if !ok {
return false
}
} else {
if !br.Txn.WriteTooOld {
log.Fatalf(ctx, "expected the WriteTooOld flag to be set")
}
newTimestamp = br.Txn.WriteTimestamp
}
} else {
if pErr == nil {
log.Fatalf(ctx, "canDoServersideRetry called for non-txn request without error")
}
switch tErr := pErr.GetDetail().(type) {
case *roachpb.WriteTooOldError:
// Locking scans hit WriteTooOld errors if they encounter values at
// timestamps higher than their read timestamps. The encountered
// timestamps are guaranteed to be greater than the txn's read
// timestamp, but not its write timestamp. So, when determining what
// the new timestamp should be, we make sure to not regress the
// txn's write timestamp.
newTimestamp = tErr.ActualTimestamp
if ba.Txn != nil {
newTimestamp.Forward(pErr.GetTxn().WriteTimestamp)
}
case *roachpb.TransactionRetryError:
if ba.Txn == nil {
// TODO(andrei): I don't know if TransactionRetryError is possible for
// non-transactional batches, but some tests inject them for 1PC
// transactions. I'm not sure how to deal with them, so let's not retry.
return false
}
newTimestamp = pErr.GetTxn().WriteTimestamp
default:
// TODO(andrei): Handle other retriable errors too.
return false
}
} else {
if !br.Txn.WriteTooOld {
log.Fatalf(ctx, "programming error: expected the WriteTooOld flag to be set")
}
newTimestamp = br.Txn.WriteTimestamp
}

if deadline != nil && deadline.LessEq(newTimestamp) {
if batcheval.IsEndTxnExceedingDeadline(newTimestamp, deadline) {
return false
}
return tryBumpBatchTimestamp(ctx, ba, newTimestamp, latchSpans)
Expand Down
31 changes: 9 additions & 22 deletions pkg/roachpb/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -1494,33 +1494,20 @@ func PrepareTransactionForRetry(
return txn
}

// PrepareTransactionForRefresh returns whether the transaction can be refreshed
// to the specified timestamp to avoid a client-side transaction restart. If
// true, returns a cloned, updated Transaction object with the provisional
// commit timestamp and read timestamp set appropriately.
func PrepareTransactionForRefresh(txn *Transaction, timestamp hlc.Timestamp) (bool, *Transaction) {
if txn.CommitTimestampFixed {
return false, nil
}
newTxn := txn.Clone()
newTxn.Refresh(timestamp)
return true, newTxn
}

// CanTransactionRefresh returns whether the transaction specified in the
// supplied error can be retried at a refreshed timestamp to avoid a client-side
// transaction restart. If true, returns a cloned, updated Transaction object
// with the provisional commit timestamp and read timestamp set appropriately.
func CanTransactionRefresh(ctx context.Context, pErr *Error) (bool, *Transaction) {
// TransactionRefreshTimestamp returns whether the supplied error is a retry
// error that can be discarded if the transaction in the error is refreshed. If
// true, the function returns the timestamp that the Transaction object should
// be refreshed at in order to discard the error and avoid a restart.
func TransactionRefreshTimestamp(pErr *Error) (bool, hlc.Timestamp) {
txn := pErr.GetTxn()
if txn == nil {
return false, nil
return false, hlc.Timestamp{}
}
timestamp := txn.WriteTimestamp
switch err := pErr.GetDetail().(type) {
case *TransactionRetryError:
if err.Reason != RETRY_SERIALIZABLE && err.Reason != RETRY_WRITE_TOO_OLD {
return false, nil
return false, hlc.Timestamp{}
}
case *WriteTooOldError:
// TODO(andrei): Chances of success for on write-too-old conditions might be
Expand All @@ -1532,9 +1519,9 @@ func CanTransactionRefresh(ctx context.Context, pErr *Error) (bool, *Transaction
case *ReadWithinUncertaintyIntervalError:
timestamp.Forward(readWithinUncertaintyIntervalRetryTimestamp(err))
default:
return false, nil
return false, hlc.Timestamp{}
}
return PrepareTransactionForRefresh(txn, timestamp)
return true, timestamp
}

func readWithinUncertaintyIntervalRetryTimestamp(
Expand Down