diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go index bb654ca077ac..0e83ae90387a 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go @@ -1158,13 +1158,12 @@ func (tc *TxnCoordSender) RequiredFrontier() hlc.Timestamp { // ManualRestart is part of the kv.TxnSender interface. func (tc *TxnCoordSender) ManualRestart( - ctx context.Context, pri roachpb.UserPriority, ts hlc.Timestamp, -) { + ctx context.Context, pri roachpb.UserPriority, ts hlc.Timestamp, msg redact.RedactableString, +) error { tc.mu.Lock() defer tc.mu.Unlock() - - if tc.mu.txnState == txnFinalized { - log.Fatalf(ctx, "ManualRestart called on finalized txn: %s", tc.mu.txn) + if tc.mu.txnState != txnPending && tc.mu.txnState != txnRetryableError { + return errors.AssertionFailedf("cannot manually restart, current state: %s", tc.mu.txnState) } // Invalidate any writes performed by any workers after the retry updated @@ -1172,13 +1171,19 @@ func (tc *TxnCoordSender) ManualRestart( // have been performed at the wrong epoch). tc.mu.txn.Restart(pri, 0 /* upgradePriority */, ts) + pErr := kvpb.NewTransactionRetryWithProtoRefreshError( + msg, tc.mu.txn.ID, tc.mu.txn) + + // Move to a retryable error state, where all Send() calls fail until the + // state is cleared. + tc.mu.txnState = txnRetryableError + tc.mu.storedRetryableErr = pErr + + // Reset state as this manual restart incremented the transaction's epoch. for _, reqInt := range tc.interceptorStack { reqInt.epochBumpedLocked() } - - // The txn might have entered the txnError state after the epoch was bumped. - // Reset the state for the retry. - tc.mu.txnState = txnPending + return pErr } // IsSerializablePushAndRefreshNotPossible is part of the kv.TxnSender interface. @@ -1339,22 +1344,6 @@ func (tc *TxnCoordSender) TestingCloneTxn() *roachpb.Transaction { return tc.mu.txn.Clone() } -// PrepareRetryableError is part of the kv.TxnSender interface. -func (tc *TxnCoordSender) PrepareRetryableError( - ctx context.Context, msg redact.RedactableString, -) error { - tc.mu.Lock() - defer tc.mu.Unlock() - if tc.mu.txnState != txnPending { - return errors.AssertionFailedf("cannot set a retryable error. current state: %s", tc.mu.txnState) - } - pErr := kvpb.NewTransactionRetryWithProtoRefreshError( - msg, tc.mu.txn.ID, tc.mu.txn) - tc.mu.storedRetryableErr = pErr - tc.mu.txnState = txnRetryableError - return pErr -} - // Step is part of the TxnSender interface. func (tc *TxnCoordSender) Step(ctx context.Context) error { // TODO(nvanbenschoten): it should be possible to make this assertion, but diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go index 68e4a1cd4675..088f521a5594 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go @@ -661,7 +661,8 @@ func TestTxnCoordSenderCleanupOnCommitAfterRestart(t *testing.T) { } // Restart the transaction with a new epoch. - txn.Sender().ManualRestart(ctx, txn.UserPriority(), s.Clock.Now()) + require.Error(t, txn.Sender().ManualRestart(ctx, txn.UserPriority(), s.Clock.Now(), "force retry")) + txn.Sender().ClearTxnRetryableErr(ctx) // Now immediately commit. require.NoError(t, txn.Commit(ctx)) @@ -2770,14 +2771,16 @@ func TestTxnCoordSenderSetFixedTimestamp(t *testing.T) { before: func(t *testing.T, txn *kv.Txn) { _, err := txn.Get(ctx, "k") require.NoError(t, err) - txn.Sender().ManualRestart(ctx, txn.UserPriority(), txn.ReadTimestamp().Next()) + require.Error(t, txn.Sender().ManualRestart(ctx, txn.UserPriority(), txn.ReadTimestamp().Next(), "force retry")) + txn.Sender().ClearTxnRetryableErr(ctx) }, }, { name: "write before, in prior epoch", before: func(t *testing.T, txn *kv.Txn) { require.NoError(t, txn.Put(ctx, "k", "v")) - txn.Sender().ManualRestart(ctx, txn.UserPriority(), txn.ReadTimestamp().Next()) + require.Error(t, txn.Sender().ManualRestart(ctx, txn.UserPriority(), txn.ReadTimestamp().Next(), "force retry")) + txn.Sender().ClearTxnRetryableErr(ctx) }, }, { @@ -2786,7 +2789,8 @@ func TestTxnCoordSenderSetFixedTimestamp(t *testing.T) { _, err := txn.Get(ctx, "k") require.NoError(t, err) require.NoError(t, txn.Put(ctx, "k", "v")) - txn.Sender().ManualRestart(ctx, txn.UserPriority(), txn.ReadTimestamp().Next()) + require.Error(t, txn.Sender().ManualRestart(ctx, txn.UserPriority(), txn.ReadTimestamp().Next(), "force retry")) + txn.Sender().ClearTxnRetryableErr(ctx) }, }, } { diff --git a/pkg/kv/mock_transactional_sender.go b/pkg/kv/mock_transactional_sender.go index d4493a67c247..9d66051fcb50 100644 --- a/pkg/kv/mock_transactional_sender.go +++ b/pkg/kv/mock_transactional_sender.go @@ -145,9 +145,10 @@ func (m *MockTransactionalSender) RequiredFrontier() hlc.Timestamp { // ManualRestart is part of the TxnSender interface. func (m *MockTransactionalSender) ManualRestart( - ctx context.Context, pri roachpb.UserPriority, ts hlc.Timestamp, -) { + ctx context.Context, pri roachpb.UserPriority, ts hlc.Timestamp, msg redact.RedactableString, +) error { m.txn.Restart(pri, 0 /* upgradePriority */, ts) + return kvpb.NewTransactionRetryWithProtoRefreshError(msg, m.txn.ID, m.txn) } // IsSerializablePushAndRefreshNotPossible is part of the TxnSender interface. @@ -196,13 +197,6 @@ func (m *MockTransactionalSender) UpdateStateOnRemoteRetryableErr( // DisablePipelining is part of the kv.TxnSender interface. func (m *MockTransactionalSender) DisablePipelining() error { return nil } -// PrepareRetryableError is part of the kv.TxnSender interface. -func (m *MockTransactionalSender) PrepareRetryableError( - ctx context.Context, msg redact.RedactableString, -) error { - return kvpb.NewTransactionRetryWithProtoRefreshError(msg, m.txn.ID, *m.txn.Clone()) -} - // Step is part of the TxnSender interface. func (m *MockTransactionalSender) Step(_ context.Context) error { // At least one test (e.g sql/TestPortalsDestroyedOnTxnFinish) requires diff --git a/pkg/kv/sender.go b/pkg/kv/sender.go index da7faa8469fb..a9783eccd5f1 100644 --- a/pkg/kv/sender.go +++ b/pkg/kv/sender.go @@ -181,14 +181,15 @@ type TxnSender interface { // timestamp and priority. // An uninitialized timestamp can be passed to leave the timestamp // alone. + // Returns a TransactionRetryWithProtoRefreshError with a payload + // initialized from this txn, which must be cleared by a call to + // ClearTxnRetryableErr before continuing to use the TxnSender. // // Used by the SQL layer which sometimes knows that a transaction // will not be able to commit and prefers to restart early. - // It is also used after synchronizing concurrent actors using a txn - // when a retryable error is seen. - // TODO(andrei): this second use should go away once we move to a - // TxnAttempt model. - ManualRestart(context.Context, roachpb.UserPriority, hlc.Timestamp) + ManualRestart( + ctx context.Context, pri roachpb.UserPriority, ts hlc.Timestamp, msg redact.RedactableString, + ) error // UpdateStateOnRemoteRetryableErr updates the txn in response to an // error encountered when running a request through the txn. @@ -263,11 +264,6 @@ type TxnSender interface { // IsLocking returns whether the transaction has begun acquiring locks. IsLocking() bool - // PrepareRetryableError generates a - // TransactionRetryWithProtoRefreshError with a payload initialized - // from this txn. - PrepareRetryableError(ctx context.Context, msg redact.RedactableString) error - // TestingCloneTxn returns a clone of the transaction's current // proto. This is for use by tests only. Use // GetLeafTxnInitialState() instead when creating leaf transactions. diff --git a/pkg/kv/txn.go b/pkg/kv/txn.go index ff8abe97eeea..67c6c0f9b9c6 100644 --- a/pkg/kv/txn.go +++ b/pkg/kv/txn.go @@ -1393,23 +1393,24 @@ func (txn *Txn) SetFixedTimestamp(ctx context.Context, ts hlc.Timestamp) error { return txn.mu.sender.SetFixedTimestamp(ctx, ts) } -// GenerateForcedRetryableError returns a TransactionRetryWithProtoRefreshError that will -// cause the txn to be retried. +// GenerateForcedRetryableError returns a TransactionRetryWithProtoRefreshError +// that will cause the txn to be retried. // // The transaction's epoch is bumped, simulating to an extent what the // TxnCoordSender does on retriable errors. The transaction's timestamp is only // bumped to the extent that txn.ReadTimestamp is racheted up to txn.WriteTimestamp. // TODO(andrei): This method should take in an up-to-date timestamp, but // unfortunately its callers don't currently have that handy. +// +// As with other transaction retry errors, the caller must call PrepareForRetry +// before continuing to use the transaction. func (txn *Txn) GenerateForcedRetryableError( ctx context.Context, msg redact.RedactableString, ) error { txn.mu.Lock() defer txn.mu.Unlock() now := txn.db.clock.NowAsClockTimestamp() - txn.mu.sender.ManualRestart(ctx, txn.mu.userPriority, now.ToTimestamp()) - txn.resetDeadlineLocked() - return txn.mu.sender.PrepareRetryableError(ctx, msg) + return txn.mu.sender.ManualRestart(ctx, txn.mu.userPriority, now.ToTimestamp(), msg) } // IsSerializablePushAndRefreshNotPossible returns true if the transaction is