Skip to content

Commit

Permalink
Merge #105109
Browse files Browse the repository at this point in the history
105109: kv: remove `TxnCoordSender.PrepareRetryableError`, rationalize `ManualRestart` r=arulajmani a=nvanbenschoten

This commit cleans up the implementation of `GenerateForcedRetryableError` by removing the `TxnCoordSender.PrepareRetryableError` method and having `TxnCoordSender.ManualRestart` return a retryable error.

The cleanup also ensures that the `TxnCoordSender` is left in a `txnRetryableError` state after a `ManualRestart`, so that `Txn.PrepareForRetry` must be called before continuing to use the transaction.

The goal with both of these changes is to close the gap between the handling of error-driven txn restarts and manual restarts. It also reworks the code to construct the retry error in the same place that "handles" the error, which is important for future changes that plan to add more context to retry errors.

Release note: None
Epic: None

Co-authored-by: Nathan VanBenschoten <[email protected]>
  • Loading branch information
craig[bot] and nvanbenschoten committed Jul 25, 2023
2 parents 6c107b3 + fecca8d commit 8551d57
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 53 deletions.
39 changes: 14 additions & 25 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -1158,27 +1158,32 @@ func (tc *TxnCoordSender) RequiredFrontier() hlc.Timestamp {

// ManualRestart is part of the kv.TxnSender interface.
func (tc *TxnCoordSender) ManualRestart(
ctx context.Context, pri roachpb.UserPriority, ts hlc.Timestamp,
) {
ctx context.Context, pri roachpb.UserPriority, ts hlc.Timestamp, msg redact.RedactableString,
) error {
tc.mu.Lock()
defer tc.mu.Unlock()

if tc.mu.txnState == txnFinalized {
log.Fatalf(ctx, "ManualRestart called on finalized txn: %s", tc.mu.txn)
if tc.mu.txnState != txnPending && tc.mu.txnState != txnRetryableError {
return errors.AssertionFailedf("cannot manually restart, current state: %s", tc.mu.txnState)
}

// Invalidate any writes performed by any workers after the retry updated
// the txn's proto but before we synchronized (some of these writes might
// have been performed at the wrong epoch).
tc.mu.txn.Restart(pri, 0 /* upgradePriority */, ts)

pErr := kvpb.NewTransactionRetryWithProtoRefreshError(
msg, tc.mu.txn.ID, tc.mu.txn)

// Move to a retryable error state, where all Send() calls fail until the
// state is cleared.
tc.mu.txnState = txnRetryableError
tc.mu.storedRetryableErr = pErr

// Reset state as this manual restart incremented the transaction's epoch.
for _, reqInt := range tc.interceptorStack {
reqInt.epochBumpedLocked()
}

// The txn might have entered the txnError state after the epoch was bumped.
// Reset the state for the retry.
tc.mu.txnState = txnPending
return pErr
}

// IsSerializablePushAndRefreshNotPossible is part of the kv.TxnSender interface.
Expand Down Expand Up @@ -1339,22 +1344,6 @@ func (tc *TxnCoordSender) TestingCloneTxn() *roachpb.Transaction {
return tc.mu.txn.Clone()
}

// PrepareRetryableError is part of the kv.TxnSender interface.
func (tc *TxnCoordSender) PrepareRetryableError(
ctx context.Context, msg redact.RedactableString,
) error {
tc.mu.Lock()
defer tc.mu.Unlock()
if tc.mu.txnState != txnPending {
return errors.AssertionFailedf("cannot set a retryable error. current state: %s", tc.mu.txnState)
}
pErr := kvpb.NewTransactionRetryWithProtoRefreshError(
msg, tc.mu.txn.ID, tc.mu.txn)
tc.mu.storedRetryableErr = pErr
tc.mu.txnState = txnRetryableError
return pErr
}

// Step is part of the TxnSender interface.
func (tc *TxnCoordSender) Step(ctx context.Context) error {
// TODO(nvanbenschoten): it should be possible to make this assertion, but
Expand Down
12 changes: 8 additions & 4 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -661,7 +661,8 @@ func TestTxnCoordSenderCleanupOnCommitAfterRestart(t *testing.T) {
}

// Restart the transaction with a new epoch.
txn.Sender().ManualRestart(ctx, txn.UserPriority(), s.Clock.Now())
require.Error(t, txn.Sender().ManualRestart(ctx, txn.UserPriority(), s.Clock.Now(), "force retry"))
txn.Sender().ClearTxnRetryableErr(ctx)

// Now immediately commit.
require.NoError(t, txn.Commit(ctx))
Expand Down Expand Up @@ -2770,14 +2771,16 @@ func TestTxnCoordSenderSetFixedTimestamp(t *testing.T) {
before: func(t *testing.T, txn *kv.Txn) {
_, err := txn.Get(ctx, "k")
require.NoError(t, err)
txn.Sender().ManualRestart(ctx, txn.UserPriority(), txn.ReadTimestamp().Next())
require.Error(t, txn.Sender().ManualRestart(ctx, txn.UserPriority(), txn.ReadTimestamp().Next(), "force retry"))
txn.Sender().ClearTxnRetryableErr(ctx)
},
},
{
name: "write before, in prior epoch",
before: func(t *testing.T, txn *kv.Txn) {
require.NoError(t, txn.Put(ctx, "k", "v"))
txn.Sender().ManualRestart(ctx, txn.UserPriority(), txn.ReadTimestamp().Next())
require.Error(t, txn.Sender().ManualRestart(ctx, txn.UserPriority(), txn.ReadTimestamp().Next(), "force retry"))
txn.Sender().ClearTxnRetryableErr(ctx)
},
},
{
Expand All @@ -2786,7 +2789,8 @@ func TestTxnCoordSenderSetFixedTimestamp(t *testing.T) {
_, err := txn.Get(ctx, "k")
require.NoError(t, err)
require.NoError(t, txn.Put(ctx, "k", "v"))
txn.Sender().ManualRestart(ctx, txn.UserPriority(), txn.ReadTimestamp().Next())
require.Error(t, txn.Sender().ManualRestart(ctx, txn.UserPriority(), txn.ReadTimestamp().Next(), "force retry"))
txn.Sender().ClearTxnRetryableErr(ctx)
},
},
} {
Expand Down
12 changes: 3 additions & 9 deletions pkg/kv/mock_transactional_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,10 @@ func (m *MockTransactionalSender) RequiredFrontier() hlc.Timestamp {

// ManualRestart is part of the TxnSender interface.
func (m *MockTransactionalSender) ManualRestart(
ctx context.Context, pri roachpb.UserPriority, ts hlc.Timestamp,
) {
ctx context.Context, pri roachpb.UserPriority, ts hlc.Timestamp, msg redact.RedactableString,
) error {
m.txn.Restart(pri, 0 /* upgradePriority */, ts)
return kvpb.NewTransactionRetryWithProtoRefreshError(msg, m.txn.ID, m.txn)
}

// IsSerializablePushAndRefreshNotPossible is part of the TxnSender interface.
Expand Down Expand Up @@ -196,13 +197,6 @@ func (m *MockTransactionalSender) UpdateStateOnRemoteRetryableErr(
// DisablePipelining is part of the kv.TxnSender interface.
func (m *MockTransactionalSender) DisablePipelining() error { return nil }

// PrepareRetryableError is part of the kv.TxnSender interface.
func (m *MockTransactionalSender) PrepareRetryableError(
ctx context.Context, msg redact.RedactableString,
) error {
return kvpb.NewTransactionRetryWithProtoRefreshError(msg, m.txn.ID, *m.txn.Clone())
}

// Step is part of the TxnSender interface.
func (m *MockTransactionalSender) Step(_ context.Context) error {
// At least one test (e.g. sql/TestPortalsDestroyedOnTxnFinish) requires
Expand Down
16 changes: 6 additions & 10 deletions pkg/kv/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,14 +181,15 @@ type TxnSender interface {
// timestamp and priority.
// An uninitialized timestamp can be passed to leave the timestamp
// alone.
// Returns a TransactionRetryWithProtoRefreshError with a payload
// initialized from this txn, which must be cleared by a call to
// ClearTxnRetryableErr before continuing to use the TxnSender.
//
// Used by the SQL layer which sometimes knows that a transaction
// will not be able to commit and prefers to restart early.
// It is also used after synchronizing concurrent actors using a txn
// when a retryable error is seen.
// TODO(andrei): this second use should go away once we move to a
// TxnAttempt model.
ManualRestart(context.Context, roachpb.UserPriority, hlc.Timestamp)
ManualRestart(
ctx context.Context, pri roachpb.UserPriority, ts hlc.Timestamp, msg redact.RedactableString,
) error

// UpdateStateOnRemoteRetryableErr updates the txn in response to an
// error encountered when running a request through the txn.
Expand Down Expand Up @@ -263,11 +264,6 @@ type TxnSender interface {
// IsLocking returns whether the transaction has begun acquiring locks.
IsLocking() bool

// PrepareRetryableError generates a
// TransactionRetryWithProtoRefreshError with a payload initialized
// from this txn.
PrepareRetryableError(ctx context.Context, msg redact.RedactableString) error

// TestingCloneTxn returns a clone of the transaction's current
// proto. This is for use by tests only. Use
// GetLeafTxnInitialState() instead when creating leaf transactions.
Expand Down
11 changes: 6 additions & 5 deletions pkg/kv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1393,23 +1393,24 @@ func (txn *Txn) SetFixedTimestamp(ctx context.Context, ts hlc.Timestamp) error {
return txn.mu.sender.SetFixedTimestamp(ctx, ts)
}

// GenerateForcedRetryableError returns a TransactionRetryWithProtoRefreshError that will
// cause the txn to be retried.
// GenerateForcedRetryableError returns a TransactionRetryWithProtoRefreshError
// that will cause the txn to be retried.
//
// The transaction's epoch is bumped, simulating to an extent what the
// TxnCoordSender does on retriable errors. The transaction's timestamp is only
// bumped to the extent that txn.ReadTimestamp is ratcheted up to txn.WriteTimestamp.
// TODO(andrei): This method should take in an up-to-date timestamp, but
// unfortunately its callers don't currently have that handy.
//
// As with other transaction retry errors, the caller must call PrepareForRetry
// before continuing to use the transaction.
func (txn *Txn) GenerateForcedRetryableError(
ctx context.Context, msg redact.RedactableString,
) error {
txn.mu.Lock()
defer txn.mu.Unlock()
now := txn.db.clock.NowAsClockTimestamp()
txn.mu.sender.ManualRestart(ctx, txn.mu.userPriority, now.ToTimestamp())
txn.resetDeadlineLocked()
return txn.mu.sender.PrepareRetryableError(ctx, msg)
return txn.mu.sender.ManualRestart(ctx, txn.mu.userPriority, now.ToTimestamp(), msg)
}

// IsSerializablePushAndRefreshNotPossible returns true if the transaction is
Expand Down

0 comments on commit 8551d57

Please sign in to comment.