Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kv: remove PrepareRetryableError from the (*kv.Txn) API #86361

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/kv/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ go_test(
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util/encoding",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
Expand Down
9 changes: 7 additions & 2 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -1272,8 +1272,13 @@ func (tc *TxnCoordSender) TestingCloneTxn() *roachpb.Transaction {
func (tc *TxnCoordSender) PrepareRetryableError(ctx context.Context, msg string) error {
tc.mu.Lock()
defer tc.mu.Unlock()
return roachpb.NewTransactionRetryWithProtoRefreshError(
msg, tc.mu.txn.ID, tc.mu.txn)
abortedError := roachpb.NewTransactionAbortedError(
roachpb.ABORT_REASON_UNKNOWN,
)
retryErr := tc.handleRetryableErrLocked(ctx,
roachpb.NewErrorWithTxn(abortedError, tc.mu.txn.Clone()))
retryErr.Msg = msg
return retryErr
}

// Step is part of the TxnSender interface.
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/mock_transactional_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,8 @@ func (m *MockTransactionalSender) DisablePipelining() error { return nil }

// PrepareRetryableError is part of the client.TxnSender interface.
func (m *MockTransactionalSender) PrepareRetryableError(ctx context.Context, msg string) error {
return roachpb.NewTransactionRetryWithProtoRefreshError(msg, m.txn.ID, *m.txn.Clone())
retryErr := roachpb.NewTransactionRetryWithProtoRefreshError(msg, m.txn.ID, *m.txn.Clone())
return retryErr
}

// Step is part of the TxnSender interface.
Expand Down
4 changes: 3 additions & 1 deletion pkg/kv/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,9 @@ type TxnSender interface {

// PrepareRetryableError generates a
// TransactionRetryWithProtoRefreshError with a payload initialized
// from this txn.
// from this txn. The method also moves the transaction into the
// txnRestart state, forcing the client to handle the error before
// being able to use the transaction again.
PrepareRetryableError(ctx context.Context, msg string) error

// TestingCloneTxn returns a clone of the transaction's current
Expand Down
41 changes: 15 additions & 26 deletions pkg/kv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -976,6 +976,7 @@ func (txn *Txn) exec(ctx context.Context, fn func(context.Context, *Txn) error)
}

var retryable bool
var retryErr *roachpb.TransactionRetryWithProtoRefreshError
if err != nil {
if errors.HasType(err, (*roachpb.UnhandledRetryableError)(nil)) {
if txn.typ == RootTxn {
Expand All @@ -984,8 +985,8 @@ func (txn *Txn) exec(ctx context.Context, fn func(context.Context, *Txn) error)
// applies only in the case where this is the root transaction.
log.Fatalf(ctx, "unexpected UnhandledRetryableError at the txn.exec() level: %s", err)
}
} else if t := (*roachpb.TransactionRetryWithProtoRefreshError)(nil); errors.As(err, &t) {
if !txn.IsRetryableErrMeantForTxn(*t) {
} else if errors.As(err, &retryErr) {
if !txn.IsRetryableErrMeantForTxn(*retryErr) {
// Make sure the txn record that err carries is for this txn.
// If it's not, we terminate the "retryable" character of the error. We
// might get a TransactionRetryWithProtoRefreshError if the closure ran another
Expand All @@ -1000,7 +1001,7 @@ func (txn *Txn) exec(ctx context.Context, fn func(context.Context, *Txn) error)
break
}

txn.PrepareForRetry(ctx)
txn.prepareForRetry(ctx, retryErr)
}

return err
Expand All @@ -1009,13 +1010,21 @@ func (txn *Txn) exec(ctx context.Context, fn func(context.Context, *Txn) error)
// PrepareForRetry needs to be called before a retry to perform some
// book-keeping and clear errors when possible.
//
// It delegates to prepareForRetry without supplying a retryable error
// of its own (the retryErr argument is nil).
func (txn *Txn) PrepareForRetry(ctx context.Context) {
txn.prepareForRetry(ctx, nil /* retryErr */)
}

func (txn *Txn) prepareForRetry(
ctx context.Context, retryErr *roachpb.TransactionRetryWithProtoRefreshError,
) {
// TODO(andrei): I think commit triggers are reset in the wrong place. See #18170.
txn.commitTriggers = nil

txn.mu.Lock()
defer txn.mu.Unlock()

retryErr := txn.mu.sender.GetTxnRetryableErr(ctx)
if retryErr == nil {
retryErr = txn.mu.sender.GetTxnRetryableErr(ctx)
}
if retryErr == nil {
return
}
Expand Down Expand Up @@ -1446,37 +1455,17 @@ func (txn *Txn) SetFixedTimestamp(ctx context.Context, ts hlc.Timestamp) error {
}

// GenerateForcedRetryableError returns a TransactionRetryWithProtoRefreshError that will
// cause the txn to be retried.
// cause the txn to be aborted and retried.
//
// The transaction's epoch is bumped, simulating to an extent what the
// TxnCoordSender does on retriable errors. The transaction's timestamp is only
// bumped to the extent that txn.ReadTimestamp is racheted up to txn.WriteTimestamp.
// TODO(andrei): This method should take in an up-to-date timestamp, but
// unfortunately its callers don't currently have that handy.
// TxnCoordSender does on aborted errors.
func (txn *Txn) GenerateForcedRetryableError(ctx context.Context, msg string) error {
txn.mu.Lock()
defer txn.mu.Unlock()
now := txn.db.clock.NowAsClockTimestamp()
txn.mu.sender.ManualRestart(ctx, txn.mu.userPriority, now.ToTimestamp())
txn.resetDeadlineLocked()
return txn.mu.sender.PrepareRetryableError(ctx, msg)
}

// PrepareRetryableError returns a
// TransactionRetryWithProtoRefreshError that will cause the txn to be
// retried. The current txn parameters are used. The txn remains valid
// for use.
func (txn *Txn) PrepareRetryableError(ctx context.Context, msg string) error {
if txn.typ != RootTxn {
return errors.WithContextTags(
errors.AssertionFailedf("PrepareRetryableError() called on leaf txn"), ctx)
}

txn.mu.Lock()
defer txn.mu.Unlock()
return txn.mu.sender.PrepareRetryableError(ctx, msg)
}

// ManualRestart bumps the transactions epoch, and can upgrade the timestamp.
// An uninitialized timestamp can be passed to leave the timestamp alone.
//
Expand Down
61 changes: 61 additions & 0 deletions pkg/kv/txn_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand All @@ -37,6 +38,66 @@ import (
"golang.org/x/sync/errgroup"
)

// TestGenerateForcedRetryableErrorInTxn exercises the behavior of
// generating a forced retryable error inside (*DB).Txn, optionally after
// rolling the transaction back, and then propagating that error to the
// retry loop. This mirrors the logic used by the connExecutor when
// discovering a violation of the two-version invariant, but with the
// retry management happening in the kv package as opposed to the manual
// management that happens in the sql layer.
func TestGenerateForcedRetryableErrorInTxn(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()

ts, _, db := serverutils.StartServer(t, base.TestServerArgs{})
defer ts.Stopper().Stop(ctx)

k, err := ts.ScratchRange()
require.NoError(t, err)
// mkKey builds a key under the scratch range prefix k. The full slice
// expression k[:len(k):len(k)] caps the capacity at len(k), so an append
// onto it has to allocate instead of writing into k's backing array
// (presumably EncodeStringAscending appends to its first argument).
mkKey := func(s string) roachpb.Key {
return encoding.EncodeStringAscending(k[:len(k):len(k)], s)
}
// checkKey reads the key for s outside of any transaction and requires
// that its integer value equals exp.
checkKey := func(t *testing.T, s string, exp int64) {
got, err := db.Get(ctx, mkKey(s))
require.NoError(t, err)
require.Equal(t, exp, got.ValueInt())
}
// i counts how many times the closure below has been invoked.
var i int
require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
// Ensure that at the start, there is no data.
{
got, err := txn.Get(ctx, mkKey("a"))
if err != nil {
return err
}
if got.Exists() {
return errors.AssertionFailedf("expected key to not exist, got %v", got)
}
}
// Perform a write to key "a".
if err := txn.Put(ctx, mkKey("a"), 1); err != nil {
return err
}
// Force exactly two retries by returning a forced retryable error on
// the first two attempts. The first attempt rolls the transaction back
// before generating the error, to ensure that doing so does not cause
// problems. The second attempt does not roll back, to make sure the
// transaction's writes are still no longer visible on the next attempt
// (checked by the read of "a" at the top of the closure).
if i++; i < 3 {
if i == 1 {
if err := txn.Rollback(ctx); err != nil {
return err
}
}

return txn.GenerateForcedRetryableError(ctx, "force retry")
}
// Third attempt: also write "b" and let the transaction commit.
return txn.Put(ctx, mkKey("b"), 2)
}))
// Both writes from the final, successful attempt must be visible.
checkKey(t, "a", 1)
checkKey(t, "b", 2)
}

// Test the behavior of a txn.Rollback() issued after txn.Commit() failing with
// an ambiguous error.
func TestRollbackAfterAmbiguousCommit(t *testing.T) {
Expand Down
28 changes: 13 additions & 15 deletions pkg/sql/catalog/descs/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,25 +281,20 @@ func CheckTwoVersionInvariant(
return false, descsCol.MaybeUpdateDeadline(ctx, txn)
}

// Restart the transaction so that it is able to replay itself at a newer timestamp
// Release the rest of our leases on unmodified descriptors so we don't hold
// up schema changes there and potentially create a deadlock.
descsCol.ReleaseLeases(ctx)
// Abort the transaction so that it is able to replay itself at a newer timestamp
// with the hope that the next time around there will be leases only at the current
// version.
retryErr := txn.PrepareRetryableError(ctx,
fmt.Sprintf(
`cannot publish new versions for descriptors: %v, old versions still in use`,
descs))
// We cleanup the transaction and create a new transaction after
//
// We clean up the transaction and create a new transaction after
// waiting for the invariant to be satisfied because the wait time
// might be extensive and intents can block out leases being created
// on a descriptor.
//
// TODO(vivek): Change this to restart a txn while fixing #20526 . All the
// descriptor intents can be laid down here after the invariant
// has been checked.
txn.CleanupOnError(ctx, retryErr)
// Release the rest of our leases on unmodified descriptors so we don't hold
// up schema changes there and potentially create a deadlock.
descsCol.ReleaseLeases(ctx)
if err := txn.Rollback(ctx); err != nil {
return true, err
}

// Wait until all older version leases have been released or expired.
for r := retry.StartWithCtx(ctx, base.DefaultRetryOptions()); r.Next(); {
Expand All @@ -316,7 +311,10 @@ func CheckTwoVersionInvariant(
onRetryBackoff()
}
}
return true, retryErr
return true, txn.GenerateForcedRetryableError(ctx,
fmt.Sprintf(
`cannot publish new versions for descriptors: %v, old versions still in use`,
descs))
}

// CheckSpanCountLimit checks whether committing the set of uncommitted tables
Expand Down
3 changes: 1 addition & 2 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -714,9 +714,8 @@ func (ex *connExecutor) execStmtInOpenState(
IsCommit: fsm.FromBool(isCommit(ast)),
CanAutoRetry: fsm.FromBool(canAutoRetry),
}
txn.ManualRestart(ctx, ex.server.cfg.Clock.Now())
payload := eventRetriableErrPayload{
err: txn.PrepareRetryableError(ctx, "serializable transaction timestamp pushed (detected by connExecutor)"),
err: txn.GenerateForcedRetryableError(ctx, "serializable transaction timestamp pushed (detected by connExecutor)"),
rewCap: rc,
}
return ev, payload, nil
Expand Down
7 changes: 3 additions & 4 deletions pkg/sql/schema_change_plan_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ package sql

import (
"context"
"fmt"
"reflect"
"time"

Expand Down Expand Up @@ -116,10 +115,10 @@ func (p *planner) waitForDescriptorSchemaChanges(

// Drop all leases and locks due to the current transaction, and, in the
// process, abort the transaction.
retryErr := p.txn.PrepareRetryableError(ctx,
fmt.Sprintf("schema change waiting for concurrent schema changes on descriptor %d", descID))
p.txn.CleanupOnError(ctx, retryErr)
p.Descriptors().ReleaseAll(ctx)
if err := p.txn.Rollback(ctx); err != nil {
return err
}

// Wait for the descriptor to no longer be claimed by a schema change.
start := timeutil.Now()
Expand Down
16 changes: 8 additions & 8 deletions pkg/sql/txn_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ func TestTransitions(t *testing.T) {
},
evFun: func(ts *txnState) (fsm.Event, fsm.EventPayload) {
b := eventRetriableErrPayload{
err: ts.mu.txn.PrepareRetryableError(ctx, "test retriable err"),
err: ts.mu.txn.GenerateForcedRetryableError(ctx, "test retriable err"),
rewCap: dummyRewCap,
}
return eventRetriableErr{CanAutoRetry: fsm.True, IsCommit: fsm.False}, b
Expand All @@ -436,7 +436,7 @@ func TestTransitions(t *testing.T) {
},
evFun: func(ts *txnState) (fsm.Event, fsm.EventPayload) {
b := eventRetriableErrPayload{
err: ts.mu.txn.PrepareRetryableError(ctx, "test retriable err"),
err: ts.mu.txn.GenerateForcedRetryableError(ctx, "test retriable err"),
rewCap: dummyRewCap,
}
return eventRetriableErr{CanAutoRetry: fsm.True, IsCommit: fsm.False}, b
Expand All @@ -462,7 +462,7 @@ func TestTransitions(t *testing.T) {
},
evFun: func(ts *txnState) (fsm.Event, fsm.EventPayload) {
b := eventRetriableErrPayload{
err: ts.mu.txn.PrepareRetryableError(ctx, "test retriable err"),
err: ts.mu.txn.GenerateForcedRetryableError(ctx, "test retriable err"),
rewCap: dummyRewCap,
}
return eventRetriableErr{CanAutoRetry: fsm.True, IsCommit: fsm.True}, b
Expand All @@ -487,7 +487,7 @@ func TestTransitions(t *testing.T) {
},
evFun: func(ts *txnState) (fsm.Event, fsm.EventPayload) {
b := eventRetriableErrPayload{
err: ts.mu.txn.PrepareRetryableError(ctx, "test retriable err"),
err: ts.mu.txn.GenerateForcedRetryableError(ctx, "test retriable err"),
rewCap: dummyRewCap,
}
return eventRetriableErr{CanAutoRetry: fsm.True, IsCommit: fsm.True}, b
Expand All @@ -511,7 +511,7 @@ func TestTransitions(t *testing.T) {
},
evFun: func(ts *txnState) (fsm.Event, fsm.EventPayload) {
b := eventRetriableErrPayload{
err: ts.mu.txn.PrepareRetryableError(ctx, "test retriable err"),
err: ts.mu.txn.GenerateForcedRetryableError(ctx, "test retriable err"),
rewCap: rewindCapability{},
}
return eventRetriableErr{CanAutoRetry: fsm.False, IsCommit: fsm.False}, b
Expand All @@ -536,7 +536,7 @@ func TestTransitions(t *testing.T) {
},
evFun: func(ts *txnState) (fsm.Event, fsm.EventPayload) {
b := eventRetriableErrPayload{
err: ts.mu.txn.PrepareRetryableError(ctx, "test retriable err"),
err: ts.mu.txn.GenerateForcedRetryableError(ctx, "test retriable err"),
rewCap: rewindCapability{},
}
return eventRetriableErr{CanAutoRetry: fsm.False, IsCommit: fsm.False}, b
Expand Down Expand Up @@ -565,7 +565,7 @@ func TestTransitions(t *testing.T) {
},
evFun: func(ts *txnState) (fsm.Event, fsm.EventPayload) {
b := eventRetriableErrPayload{
err: ts.mu.txn.PrepareRetryableError(ctx, "test retriable err"),
err: ts.mu.txn.GenerateForcedRetryableError(ctx, "test retriable err"),
rewCap: rewindCapability{},
}
return eventRetriableErr{CanAutoRetry: fsm.False, IsCommit: fsm.True}, b
Expand Down Expand Up @@ -605,7 +605,7 @@ func TestTransitions(t *testing.T) {
},
evFun: func(ts *txnState) (fsm.Event, fsm.EventPayload) {
b := eventRetriableErrPayload{
err: ts.mu.txn.PrepareRetryableError(ctx, "test retriable err"),
err: ts.mu.txn.GenerateForcedRetryableError(ctx, "test retriable err"),
rewCap: rewindCapability{},
}
return eventRetriableErr{CanAutoRetry: fsm.False, IsCommit: fsm.False}, b
Expand Down