Skip to content

Commit

Permalink
kv: unify client and server-side refresh code paths
Browse files Browse the repository at this point in the history
This commit is a pure refactor that unifies various code paths that perform
transaction refreshing. It consolidates the conditions used to check whether a
refresh is permitted for a Transaction, the mechanism through which a
Transaction protobuf is updated during a Refresh (`Transaction.Refresh`), and,
most importantly, it consolidates the logic that determines the necessary
refresh timestamp from a `roachpb.Error` (`TransactionRefreshTimestamp`).

This lays the groundwork to eventually be able to perform server-side refreshes
for `ReadWithinUncertaintyIntervalError`s. However, there is a bit of complexity
with doing so because of read latches, which is mentioned in a comment.
  • Loading branch information
nvanbenschoten committed Dec 7, 2021
1 parent 39923c0 commit 0acc506
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 68 deletions.
36 changes: 28 additions & 8 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,11 +295,18 @@ func (sr *txnSpanRefresher) sendLockedWithRefreshAttempts(
func (sr *txnSpanRefresher) maybeRefreshAndRetrySend(
ctx context.Context, ba roachpb.BatchRequest, pErr *roachpb.Error, maxRefreshAttempts int,
) (*roachpb.BatchResponse, *roachpb.Error) {
// Check for an error which can be retried after updating spans.
canRefreshTxn, refreshTxn := roachpb.CanTransactionRefresh(ctx, pErr)
if !canRefreshTxn || !sr.canAutoRetry {
txn := pErr.GetTxn()
if txn == nil || !sr.canForwardReadTimestamp(txn) {
return nil, pErr
}
// Check for an error which can be refreshed away to avoid a client-side
// transaction restart.
ok, refreshTS := roachpb.TransactionRefreshTimestamp(pErr)
if !ok {
return nil, pErr
}
refreshTxn := txn.Clone()
refreshTxn.Refresh(refreshTS)
log.VEventf(ctx, 2, "trying to refresh to %s because of %s", refreshTxn.ReadTimestamp, pErr)

// Try updating the txn spans so we can retry.
Expand Down Expand Up @@ -445,10 +452,14 @@ func (sr *txnSpanRefresher) maybeRefreshPreemptivelyLocked(
return ba, nil
}

canRefreshTxn, refreshTxn := roachpb.PrepareTransactionForRefresh(ba.Txn, ba.Txn.WriteTimestamp)
if !canRefreshTxn || !sr.canAutoRetry {
return roachpb.BatchRequest{}, newRetryErrorOnFailedPreemptiveRefresh(ba.Txn)
// If the transaction cannot change its read timestamp, no refresh is
// possible.
if !sr.canForwardReadTimestamp(ba.Txn) {
return ba, newRetryErrorOnFailedPreemptiveRefresh(ba.Txn)
}

refreshTxn := ba.Txn.Clone()
refreshTxn.Refresh(ba.Txn.WriteTimestamp)
log.VEventf(ctx, 2, "preemptively refreshing to timestamp %s before issuing %s",
refreshTxn.ReadTimestamp, ba)

Expand Down Expand Up @@ -570,6 +581,15 @@ func (sr *txnSpanRefresher) appendRefreshSpans(
return nil
}

// canForwardReadTimestampWithoutRefresh returns whether the transaction can
// forward its read timestamp after refreshing all the reads that has performed
// to this point. This requires that the transaction's timestamp has not leaked.
// It also requires that the txnSpanRefresher has been configured to allow
// auto-retries.
func (sr *txnSpanRefresher) canForwardReadTimestamp(txn *roachpb.Transaction) bool {
return !txn.CommitTimestampFixed && sr.canAutoRetry
}

// canForwardReadTimestampWithoutRefresh returns whether the transaction can
// forward its read timestamp without refreshing any read spans. This allows for
// the "server-side refresh" optimization, where batches are re-evaluated at a
Expand All @@ -581,9 +601,9 @@ func (sr *txnSpanRefresher) appendRefreshSpans(
// is required.
//
// Note that when deciding whether a transaction can be bumped to a particular
// timestamp, the transaction's deadling must also be taken into account.
// timestamp, the transaction's deadline must also be taken into account.
func (sr *txnSpanRefresher) canForwardReadTimestampWithoutRefresh(txn *roachpb.Transaction) bool {
return sr.canAutoRetry && !sr.refreshInvalid && sr.refreshFootprint.empty() && !txn.CommitTimestampFixed
return sr.canForwardReadTimestamp(txn) && !sr.refreshInvalid && sr.refreshFootprint.empty()
}

// forwardRefreshTimestampOnResponse updates the refresher's tracked
Expand Down
11 changes: 6 additions & 5 deletions pkg/kv/kvserver/batcheval/cmd_end_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,10 +387,11 @@ func EndTxn(
return txnResult, nil
}

// IsEndTxnExceedingDeadline returns true if the transaction exceeded its
// deadline.
func IsEndTxnExceedingDeadline(t hlc.Timestamp, args *roachpb.EndTxnRequest) bool {
return args.Deadline != nil && args.Deadline.LessEq(t)
// IsEndTxnExceedingDeadline returns true if the transaction's provisional
// commit timestamp exceeded its deadline. If so, the transaction should not be
// allowed to commit.
func IsEndTxnExceedingDeadline(commitTS hlc.Timestamp, deadline *hlc.Timestamp) bool {
return deadline != nil && deadline.LessEq(commitTS)
}

// IsEndTxnTriggeringRetryError returns true if the EndTxnRequest cannot be
Expand All @@ -415,7 +416,7 @@ func IsEndTxnTriggeringRetryError(
}

// A transaction must obey its deadline, if set.
if !retry && IsEndTxnExceedingDeadline(txn.WriteTimestamp, args) {
if !retry && IsEndTxnExceedingDeadline(txn.WriteTimestamp, args.Deadline) {
exceededBy := txn.WriteTimestamp.GoTime().Sub(args.Deadline.GoTime())
extraMsg = fmt.Sprintf(
"txn timestamp pushed too much; deadline exceeded by %s (%s > %s)",
Expand Down
17 changes: 9 additions & 8 deletions pkg/kv/kvserver/replica_batch_updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,18 +182,21 @@ func maybeBumpReadTimestampToWriteTimestamp(
if ba.Txn == nil {
return false
}
if !ba.CanForwardReadTimestamp {
return false
}
if ba.Txn.ReadTimestamp == ba.Txn.WriteTimestamp {
return false
}
arg, ok := ba.GetArg(roachpb.EndTxn)
if !ok {
return false
}
etArg := arg.(*roachpb.EndTxnRequest)
if ba.CanForwardReadTimestamp && !batcheval.IsEndTxnExceedingDeadline(ba.Txn.WriteTimestamp, etArg) {
return tryBumpBatchTimestamp(ctx, ba, ba.Txn.WriteTimestamp, latchSpans)
et := arg.(*roachpb.EndTxnRequest)
if batcheval.IsEndTxnExceedingDeadline(ba.Txn.WriteTimestamp, et.Deadline) {
return false
}
return false
return tryBumpBatchTimestamp(ctx, ba, ba.Txn.WriteTimestamp, latchSpans)
}

// tryBumpBatchTimestamp attempts to bump ba's read and write timestamps to ts.
Expand Down Expand Up @@ -230,7 +233,7 @@ func tryBumpBatchTimestamp(
log.Fatalf(ctx, "trying to bump to %s <= ba.Timestamp: %s", ts, ba.Timestamp)
}
ba.Timestamp = ts
if txn := ba.Txn; txn == nil {
if ba.Txn == nil {
return true
}
if ts.Less(ba.Txn.ReadTimestamp) || ts.Less(ba.Txn.WriteTimestamp) {
Expand All @@ -240,8 +243,6 @@ func tryBumpBatchTimestamp(
log.VEventf(ctx, 2, "bumping batch timestamp to: %s from read: %s, write: %s)",
ts, ba.Txn.ReadTimestamp, ba.Txn.WriteTimestamp)
ba.Txn = ba.Txn.Clone()
ba.Txn.ReadTimestamp = ts
ba.Txn.WriteTimestamp = ts
ba.Txn.WriteTooOld = false
ba.Txn.Refresh(ts)
return true
}
54 changes: 29 additions & 25 deletions pkg/kv/kvserver/replica_evaluate.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,41 +544,45 @@ func canDoServersideRetry(
deadline = et.Deadline
}
}

var newTimestamp hlc.Timestamp
if ba.Txn != nil {
if pErr != nil {
// TODO(nvanbenschoten): I am intentionally not allowing server-side
// refreshes of ReadWithinUncertaintyIntervalErrors for now, even though
// that is the eventual goal here. I'd like to lift this limitation in a
// dedicated commit. The commit will likely need to be accompanied by an
// above-latching retry loop, because read latches will usually prevent
// below-latch retries of ReadWithinUncertaintyIntervalErrors. See the
// comment in tryBumpBatchTimestamp.
if _, ok := pErr.GetDetail().(*roachpb.ReadWithinUncertaintyIntervalError); ok {
return false
}

if pErr != nil {
var ok bool
ok, newTimestamp = roachpb.TransactionRefreshTimestamp(pErr)
if !ok {
return false
}
} else {
if !br.Txn.WriteTooOld {
log.Fatalf(ctx, "expected the WriteTooOld flag to be set")
}
newTimestamp = br.Txn.WriteTimestamp
}
} else {
if pErr == nil {
log.Fatalf(ctx, "canDoServersideRetry called for non-txn request without error")
}
switch tErr := pErr.GetDetail().(type) {
case *roachpb.WriteTooOldError:
// Locking scans hit WriteTooOld errors if they encounter values at
// timestamps higher than their read timestamps. The encountered
// timestamps are guaranteed to be greater than the txn's read
// timestamp, but not its write timestamp. So, when determining what
// the new timestamp should be, we make sure to not regress the
// txn's write timestamp.
newTimestamp = tErr.ActualTimestamp
if ba.Txn != nil {
newTimestamp.Forward(pErr.GetTxn().WriteTimestamp)
}
case *roachpb.TransactionRetryError:
if ba.Txn == nil {
// TODO(andrei): I don't know if TransactionRetryError is possible for
// non-transactional batches, but some tests inject them for 1PC
// transactions. I'm not sure how to deal with them, so let's not retry.
return false
}
newTimestamp = pErr.GetTxn().WriteTimestamp
default:
// TODO(andrei): Handle other retriable errors too.
return false
}
} else {
if !br.Txn.WriteTooOld {
log.Fatalf(ctx, "programming error: expected the WriteTooOld flag to be set")
}
newTimestamp = br.Txn.WriteTimestamp
}

if deadline != nil && deadline.LessEq(newTimestamp) {
if batcheval.IsEndTxnExceedingDeadline(newTimestamp, deadline) {
return false
}
return tryBumpBatchTimestamp(ctx, ba, newTimestamp, latchSpans)
Expand Down
31 changes: 9 additions & 22 deletions pkg/roachpb/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -1494,33 +1494,20 @@ func PrepareTransactionForRetry(
return txn
}

// PrepareTransactionForRefresh returns whether the transaction can be refreshed
// to the specified timestamp to avoid a client-side transaction restart. If
// true, returns a cloned, updated Transaction object with the provisional
// commit timestamp and read timestamp set appropriately.
func PrepareTransactionForRefresh(txn *Transaction, timestamp hlc.Timestamp) (bool, *Transaction) {
if txn.CommitTimestampFixed {
return false, nil
}
newTxn := txn.Clone()
newTxn.Refresh(timestamp)
return true, newTxn
}

// CanTransactionRefresh returns whether the transaction specified in the
// supplied error can be retried at a refreshed timestamp to avoid a client-side
// transaction restart. If true, returns a cloned, updated Transaction object
// with the provisional commit timestamp and read timestamp set appropriately.
func CanTransactionRefresh(ctx context.Context, pErr *Error) (bool, *Transaction) {
// TransactionRefreshTimestamp returns whether the supplied error is a retry
// error that can be discarded if the transaction in the error is refreshed. If
// true, the function returns the timestamp that the Transaction object should
// be refreshed at in order to discard the error and avoid a restart.
func TransactionRefreshTimestamp(pErr *Error) (bool, hlc.Timestamp) {
txn := pErr.GetTxn()
if txn == nil {
return false, nil
return false, hlc.Timestamp{}
}
timestamp := txn.WriteTimestamp
switch err := pErr.GetDetail().(type) {
case *TransactionRetryError:
if err.Reason != RETRY_SERIALIZABLE && err.Reason != RETRY_WRITE_TOO_OLD {
return false, nil
return false, hlc.Timestamp{}
}
case *WriteTooOldError:
// TODO(andrei): Chances of success for on write-too-old conditions might be
Expand All @@ -1532,9 +1519,9 @@ func CanTransactionRefresh(ctx context.Context, pErr *Error) (bool, *Transaction
case *ReadWithinUncertaintyIntervalError:
timestamp.Forward(readWithinUncertaintyIntervalRetryTimestamp(err))
default:
return false, nil
return false, hlc.Timestamp{}
}
return PrepareTransactionForRefresh(txn, timestamp)
return true, timestamp
}

func readWithinUncertaintyIntervalRetryTimestamp(
Expand Down

0 comments on commit 0acc506

Please sign in to comment.