Skip to content

Commit

Permalink
kv: remove PrepareRetryableError from the (*kv.Txn) API
Browse files Browse the repository at this point in the history
This error was never what anybody wanted. It has the odd side-effect
of leaving the transaction usable -- and, worse, the (*kv.Txn).exec loop would
retry but would not restart or replace the transaction if such an error were
returned.

Release justification: important correctness change as part of making the
SQL-over-HTTP APIs work for index recommendations.

Release note: None
  • Loading branch information
ajwerner committed Aug 18, 2022
1 parent 6bfcc60 commit bd8bc01
Show file tree
Hide file tree
Showing 10 changed files with 101 additions and 56 deletions.
1 change: 1 addition & 0 deletions pkg/kv/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ go_test(
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util/encoding",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
Expand Down
9 changes: 7 additions & 2 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -1272,8 +1272,13 @@ func (tc *TxnCoordSender) TestingCloneTxn() *roachpb.Transaction {
func (tc *TxnCoordSender) PrepareRetryableError(ctx context.Context, msg string) error {
tc.mu.Lock()
defer tc.mu.Unlock()
return roachpb.NewTransactionRetryWithProtoRefreshError(
msg, tc.mu.txn.ID, tc.mu.txn)
abortedError := roachpb.NewTransactionAbortedError(
roachpb.ABORT_REASON_UNKNOWN,
)
retryErr := tc.handleRetryableErrLocked(ctx,
roachpb.NewErrorWithTxn(abortedError, tc.mu.txn.Clone()))
retryErr.Msg = msg
return retryErr
}

// Step is part of the TxnSender interface.
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/mock_transactional_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,8 @@ func (m *MockTransactionalSender) DisablePipelining() error { return nil }

// PrepareRetryableError is part of the TxnSender interface.
func (m *MockTransactionalSender) PrepareRetryableError(ctx context.Context, msg string) error {
return roachpb.NewTransactionRetryWithProtoRefreshError(msg, m.txn.ID, *m.txn.Clone())
retryErr := roachpb.NewTransactionRetryWithProtoRefreshError(msg, m.txn.ID, *m.txn.Clone())
return retryErr
}

// Step is part of the TxnSender interface.
Expand Down
4 changes: 3 additions & 1 deletion pkg/kv/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,9 @@ type TxnSender interface {

// PrepareRetryableError generates a
// TransactionRetryWithProtoRefreshError with a payload initialized
// from this txn.
// from this txn. The method also moves the transaction into the
// txnRestart state, forcing the client to handle the error before
// being able to use the transaction again.
PrepareRetryableError(ctx context.Context, msg string) error

// TestingCloneTxn returns a clone of the transaction's current
Expand Down
25 changes: 2 additions & 23 deletions pkg/kv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1446,38 +1446,17 @@ func (txn *Txn) SetFixedTimestamp(ctx context.Context, ts hlc.Timestamp) error {
}

// GenerateForcedRetryableError returns a TransactionRetryWithProtoRefreshError that will
// cause the txn to be retried.
// cause the txn to be aborted and retried.
//
// The transaction's epoch is bumped, simulating to an extent what the
// TxnCoordSender does on retriable errors. The transaction's timestamp is
// bumped to the current clock timestamp.
//
// TODO(andrei): This method should take in an up-to-date timestamp, but
// unfortunately its callers don't currently have that handy.
// TxnCoordSender does on aborted errors.
func (txn *Txn) GenerateForcedRetryableError(ctx context.Context, msg string) error {
txn.mu.Lock()
defer txn.mu.Unlock()
now := txn.db.clock.NowAsClockTimestamp()
txn.mu.sender.ManualRestart(ctx, txn.mu.userPriority, now.ToTimestamp())
txn.resetDeadlineLocked()
return txn.mu.sender.PrepareRetryableError(ctx, msg)
}

// PrepareRetryableError returns a
// TransactionRetryWithProtoRefreshError that will cause the txn to be
// retried. The current txn parameters are used. The txn remains valid
// for use.
func (txn *Txn) PrepareRetryableError(ctx context.Context, msg string) error {
if txn.typ != RootTxn {
return errors.WithContextTags(
errors.AssertionFailedf("PrepareRetryableError() called on leaf txn"), ctx)
}

txn.mu.Lock()
defer txn.mu.Unlock()
return txn.mu.sender.PrepareRetryableError(ctx, msg)
}

// ManualRestart bumps the transactions epoch, and can upgrade the timestamp.
// An uninitialized timestamp can be passed to leave the timestamp alone.
//
Expand Down
61 changes: 61 additions & 0 deletions pkg/kv/txn_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand All @@ -37,6 +38,66 @@ import (
"golang.org/x/sync/errgroup"
)

// TestGenerateForcedRetryableErrorInTxn exercises the behavior of
// generating a forced retryable error inside (*DB).Txn, optionally rolling
// the transaction back first, and then propagating the error to the retry
// loop. This mirrors the logic used by the connExecutor when discovering a
// violation of the two-version invariant, but with the retry management
// happening in the kv package as opposed to the manual management that
// happens in the sql layer.
func TestGenerateForcedRetryableErrorInTxn(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()

ts, _, db := serverutils.StartServer(t, base.TestServerArgs{})
defer ts.Stopper().Stop(ctx)

k, err := ts.ScratchRange()
require.NoError(t, err)
// mkKey builds a key under the scratch range prefix k. The full slice
// expression caps the capacity at len(k), so any append performed by the
// encoder must copy rather than write into k's backing array.
mkKey := func(s string) roachpb.Key {
return encoding.EncodeStringAscending(k[:len(k):len(k)], s)
}
// checkKey reads the key through db (outside of the transaction under
// test) and asserts that it holds the integer value exp.
checkKey := func(t *testing.T, s string, exp int64) {
got, err := db.Get(ctx, mkKey(s))
require.NoError(t, err)
require.Equal(t, exp, got.ValueInt())
}
// i counts how many times the closure passed to db.Txn has been invoked.
var i int
require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
// Ensure that at the start of every attempt, there is no data. This
// also verifies that the write to "a" from an earlier, retried attempt
// is not visible.
{
got, err := txn.Get(ctx, mkKey("a"))
if err != nil {
return err
}
if got.Exists() {
return errors.AssertionFailedf("expected key to not exist, got %v", got)
}
}
// Perform a write to key "a".
if err := txn.Put(ctx, mkKey("a"), 1); err != nil {
return err
}
// Force exactly two retries by propagating a retry error on the first
// two attempts. On the first attempt (i == 1) the transaction is
// explicitly rolled back before the error is generated, to ensure that
// cleaning up first does not cause problems. On the second attempt
// (i == 2) no rollback is issued, to make sure the transaction's writes
// are no longer visible on the next attempt even without explicit
// cleanup.
if i++; i < 3 {
if i == 1 {
if err := txn.Rollback(ctx); err != nil {
return err
}
}

return txn.GenerateForcedRetryableError(ctx, "force retry")
}
// Third attempt: write "b" and let db.Txn finish the transaction.
return txn.Put(ctx, mkKey("b"), 2)
}))
// Both writes from the final attempt must be readable afterwards.
checkKey(t, "a", 1)
checkKey(t, "b", 2)
}

// Test the behavior of a txn.Rollback() issued after txn.Commit() failing with
// an ambiguous error.
func TestRollbackAfterAmbiguousCommit(t *testing.T) {
Expand Down
28 changes: 13 additions & 15 deletions pkg/sql/catalog/descs/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,25 +281,20 @@ func CheckTwoVersionInvariant(
return false, descsCol.MaybeUpdateDeadline(ctx, txn)
}

// Restart the transaction so that it is able to replay itself at a newer timestamp
// Release the rest of our leases on unmodified descriptors so we don't hold
// up schema changes there and potentially create a deadlock.
descsCol.ReleaseLeases(ctx)
// Abort the transaction so that it is able to replay itself at a newer timestamp
// with the hope that the next time around there will be leases only at the current
// version.
retryErr := txn.PrepareRetryableError(ctx,
fmt.Sprintf(
`cannot publish new versions for descriptors: %v, old versions still in use`,
descs))
// We cleanup the transaction and create a new transaction after
//
// We clean up the transaction and create a new transaction after
// waiting for the invariant to be satisfied because the wait time
// might be extensive and intents can block out leases being created
// on a descriptor.
//
// TODO(vivek): Change this to restart a txn while fixing #20526 . All the
// descriptor intents can be laid down here after the invariant
// has been checked.
txn.CleanupOnError(ctx, retryErr)
// Release the rest of our leases on unmodified descriptors so we don't hold
// up schema changes there and potentially create a deadlock.
descsCol.ReleaseLeases(ctx)
if err := txn.Rollback(ctx); err != nil {
return true, err
}

// Wait until all older version leases have been released or expired.
for r := retry.StartWithCtx(ctx, base.DefaultRetryOptions()); r.Next(); {
Expand All @@ -316,7 +311,10 @@ func CheckTwoVersionInvariant(
onRetryBackoff()
}
}
return true, retryErr
return true, txn.GenerateForcedRetryableError(ctx,
fmt.Sprintf(
`cannot publish new versions for descriptors: %v, old versions still in use`,
descs))
}

// CheckSpanCountLimit checks whether committing the set of uncommitted tables
Expand Down
3 changes: 1 addition & 2 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -714,9 +714,8 @@ func (ex *connExecutor) execStmtInOpenState(
IsCommit: fsm.FromBool(isCommit(ast)),
CanAutoRetry: fsm.FromBool(canAutoRetry),
}
txn.ManualRestart(ctx, ex.server.cfg.Clock.Now())
payload := eventRetriableErrPayload{
err: txn.PrepareRetryableError(ctx, "serializable transaction timestamp pushed (detected by connExecutor)"),
err: txn.GenerateForcedRetryableError(ctx, "serializable transaction timestamp pushed (detected by connExecutor)"),
rewCap: rc,
}
return ev, payload, nil
Expand Down
7 changes: 3 additions & 4 deletions pkg/sql/schema_change_plan_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ package sql

import (
"context"
"fmt"
"reflect"
"time"

Expand Down Expand Up @@ -116,10 +115,10 @@ func (p *planner) waitForDescriptorSchemaChanges(

// Drop all leases and locks due to the current transaction, and, in the
// process, abort the transaction.
retryErr := p.txn.PrepareRetryableError(ctx,
fmt.Sprintf("schema change waiting for concurrent schema changes on descriptor %d", descID))
p.txn.CleanupOnError(ctx, retryErr)
p.Descriptors().ReleaseAll(ctx)
if err := p.txn.Rollback(ctx); err != nil {
return err
}

// Wait for the descriptor to no longer be claimed by a schema change.
start := timeutil.Now()
Expand Down
16 changes: 8 additions & 8 deletions pkg/sql/txn_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ func TestTransitions(t *testing.T) {
},
evFun: func(ts *txnState) (fsm.Event, fsm.EventPayload) {
b := eventRetriableErrPayload{
err: ts.mu.txn.PrepareRetryableError(ctx, "test retriable err"),
err: ts.mu.txn.GenerateForcedRetryableError(ctx, "test retriable err"),
rewCap: dummyRewCap,
}
return eventRetriableErr{CanAutoRetry: fsm.True, IsCommit: fsm.False}, b
Expand All @@ -436,7 +436,7 @@ func TestTransitions(t *testing.T) {
},
evFun: func(ts *txnState) (fsm.Event, fsm.EventPayload) {
b := eventRetriableErrPayload{
err: ts.mu.txn.PrepareRetryableError(ctx, "test retriable err"),
err: ts.mu.txn.GenerateForcedRetryableError(ctx, "test retriable err"),
rewCap: dummyRewCap,
}
return eventRetriableErr{CanAutoRetry: fsm.True, IsCommit: fsm.False}, b
Expand All @@ -462,7 +462,7 @@ func TestTransitions(t *testing.T) {
},
evFun: func(ts *txnState) (fsm.Event, fsm.EventPayload) {
b := eventRetriableErrPayload{
err: ts.mu.txn.PrepareRetryableError(ctx, "test retriable err"),
err: ts.mu.txn.GenerateForcedRetryableError(ctx, "test retriable err"),
rewCap: dummyRewCap,
}
return eventRetriableErr{CanAutoRetry: fsm.True, IsCommit: fsm.True}, b
Expand All @@ -487,7 +487,7 @@ func TestTransitions(t *testing.T) {
},
evFun: func(ts *txnState) (fsm.Event, fsm.EventPayload) {
b := eventRetriableErrPayload{
err: ts.mu.txn.PrepareRetryableError(ctx, "test retriable err"),
err: ts.mu.txn.GenerateForcedRetryableError(ctx, "test retriable err"),
rewCap: dummyRewCap,
}
return eventRetriableErr{CanAutoRetry: fsm.True, IsCommit: fsm.True}, b
Expand All @@ -511,7 +511,7 @@ func TestTransitions(t *testing.T) {
},
evFun: func(ts *txnState) (fsm.Event, fsm.EventPayload) {
b := eventRetriableErrPayload{
err: ts.mu.txn.PrepareRetryableError(ctx, "test retriable err"),
err: ts.mu.txn.GenerateForcedRetryableError(ctx, "test retriable err"),
rewCap: rewindCapability{},
}
return eventRetriableErr{CanAutoRetry: fsm.False, IsCommit: fsm.False}, b
Expand All @@ -536,7 +536,7 @@ func TestTransitions(t *testing.T) {
},
evFun: func(ts *txnState) (fsm.Event, fsm.EventPayload) {
b := eventRetriableErrPayload{
err: ts.mu.txn.PrepareRetryableError(ctx, "test retriable err"),
err: ts.mu.txn.GenerateForcedRetryableError(ctx, "test retriable err"),
rewCap: rewindCapability{},
}
return eventRetriableErr{CanAutoRetry: fsm.False, IsCommit: fsm.False}, b
Expand Down Expand Up @@ -565,7 +565,7 @@ func TestTransitions(t *testing.T) {
},
evFun: func(ts *txnState) (fsm.Event, fsm.EventPayload) {
b := eventRetriableErrPayload{
err: ts.mu.txn.PrepareRetryableError(ctx, "test retriable err"),
err: ts.mu.txn.GenerateForcedRetryableError(ctx, "test retriable err"),
rewCap: rewindCapability{},
}
return eventRetriableErr{CanAutoRetry: fsm.False, IsCommit: fsm.True}, b
Expand Down Expand Up @@ -605,7 +605,7 @@ func TestTransitions(t *testing.T) {
},
evFun: func(ts *txnState) (fsm.Event, fsm.EventPayload) {
b := eventRetriableErrPayload{
err: ts.mu.txn.PrepareRetryableError(ctx, "test retriable err"),
err: ts.mu.txn.GenerateForcedRetryableError(ctx, "test retriable err"),
rewCap: rewindCapability{},
}
return eventRetriableErr{CanAutoRetry: fsm.False, IsCommit: fsm.False}, b
Expand Down

0 comments on commit bd8bc01

Please sign in to comment.