Skip to content

Commit

Permalink
kv: remove PrepareRetryableError from the (*kv.Txn) API
Browse files Browse the repository at this point in the history
This error was never what anybody wanted. It has the odd side-effect
of leaving the transaction usable -- and, worse, the (*kv.Txn).exec loop would
retry but would not restart or replace the transaction if such an error were
returned.

Release justification: important correctness change as part of making the
SQL-over-HTTP APIs work for index recommendations.

Release note: None
  • Loading branch information
ajwerner committed Aug 18, 2022
1 parent 6bfcc60 commit e21b109
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 27 deletions.
1 change: 1 addition & 0 deletions pkg/kv/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ go_test(
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util/encoding",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
Expand Down
15 changes: 0 additions & 15 deletions pkg/kv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1463,21 +1463,6 @@ func (txn *Txn) GenerateForcedRetryableError(ctx context.Context, msg string) er
return txn.mu.sender.PrepareRetryableError(ctx, msg)
}

// PrepareRetryableError returns a
// TransactionRetryWithProtoRefreshError that will cause the txn to be
// retried. The current txn parameters are used. The txn remains valid
// for use.
func (txn *Txn) PrepareRetryableError(ctx context.Context, msg string) error {
if txn.typ != RootTxn {
return errors.WithContextTags(
errors.AssertionFailedf("PrepareRetryableError() called on leaf txn"), ctx)
}

txn.mu.Lock()
defer txn.mu.Unlock()
return txn.mu.sender.PrepareRetryableError(ctx, msg)
}

// ManualRestart bumps the transactions epoch, and can upgrade the timestamp.
// An uninitialized timestamp can be passed to leave the timestamp alone.
//
Expand Down
59 changes: 59 additions & 0 deletions pkg/kv/txn_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand All @@ -37,6 +38,64 @@ import (
"golang.org/x/sync/errgroup"
)

// TestPrepareRetryableErrorAndCleanupOnErrorInTxn exercises the behavior
// of creating a retryable error inside (*DB).Txn and then cleaning up
// the intents before propagating it to the loop. This is mirrors the
// logic used by the connExecutor when discovering a violation of the
// two-version invariant, but with the retry management happening in
// the kv package as opposed to the manual management that happens in the
// sql layer.
func TestGenerateForcedRetryableErrorInTxn(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()

ts, _, db := serverutils.StartServer(t, base.TestServerArgs{})
defer ts.Stopper().Stop(ctx)

k, err := ts.ScratchRange()
require.NoError(t, err)
mkKey := func(s string) roachpb.Key {
return encoding.EncodeStringAscending(k[:len(k):len(k)], s)
}
checkKey := func(t *testing.T, s string, exp int64) {
got, err := db.Get(ctx, mkKey(s))
require.NoError(t, err)
require.Equal(t, exp, got.ValueInt())
}
var i int
require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
// Ensure that at the start, there is no data.
{
got, err := txn.Get(ctx, mkKey("a"))
if err != nil {
return err
}
if got.Exists() {
return errors.AssertionFailedf("expected key to not exist, got %v", got)
}
}
// Perform a write to key "a".
if err := txn.Put(ctx, mkKey("a"), 1); err != nil {
return err
}
// Retry exactly by propagating a retry error two times, cleaning up
// on the second time to ensure that that does not cause problems, but
// not cleaning up the first time to make sure the transaction's writes
// are no longer visible.
if i++; i < 3 {
err := txn.GenerateForcedRetryableError(ctx, "force retry")
if i == 1 {
txn.CleanupOnError(ctx, err)
}
return err
}
return txn.Put(ctx, mkKey("b"), 2)
}))
checkKey(t, "a", 1)
checkKey(t, "b", 2)
}

// Test the behavior of a txn.Rollback() issued after txn.Commit() failing with
// an ambiguous error.
func TestRollbackAfterAmbiguousCommit(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/catalog/descs/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ func CheckTwoVersionInvariant(
// Restart the transaction so that it is able to replay itself at a newer timestamp
// with the hope that the next time around there will be leases only at the current
// version.
retryErr := txn.PrepareRetryableError(ctx,
retryErr := txn.GenerateForcedRetryableError(ctx,
fmt.Sprintf(
`cannot publish new versions for descriptors: %v, old versions still in use`,
descs))
Expand Down
3 changes: 1 addition & 2 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -714,9 +714,8 @@ func (ex *connExecutor) execStmtInOpenState(
IsCommit: fsm.FromBool(isCommit(ast)),
CanAutoRetry: fsm.FromBool(canAutoRetry),
}
txn.ManualRestart(ctx, ex.server.cfg.Clock.Now())
payload := eventRetriableErrPayload{
err: txn.PrepareRetryableError(ctx, "serializable transaction timestamp pushed (detected by connExecutor)"),
err: txn.GenerateForcedRetryableError(ctx, "serializable transaction timestamp pushed (detected by connExecutor)"),
rewCap: rc,
}
return ev, payload, nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/schema_change_plan_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (p *planner) waitForDescriptorSchemaChanges(

// Drop all leases and locks due to the current transaction, and, in the
// process, abort the transaction.
retryErr := p.txn.PrepareRetryableError(ctx,
retryErr := p.txn.GenerateForcedRetryableError(ctx,
fmt.Sprintf("schema change waiting for concurrent schema changes on descriptor %d", descID))
p.txn.CleanupOnError(ctx, retryErr)
p.Descriptors().ReleaseAll(ctx)
Expand Down
16 changes: 8 additions & 8 deletions pkg/sql/txn_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ func TestTransitions(t *testing.T) {
},
evFun: func(ts *txnState) (fsm.Event, fsm.EventPayload) {
b := eventRetriableErrPayload{
err: ts.mu.txn.PrepareRetryableError(ctx, "test retriable err"),
err: ts.mu.txn.GenerateForcedRetryableError(ctx, "test retriable err"),
rewCap: dummyRewCap,
}
return eventRetriableErr{CanAutoRetry: fsm.True, IsCommit: fsm.False}, b
Expand All @@ -436,7 +436,7 @@ func TestTransitions(t *testing.T) {
},
evFun: func(ts *txnState) (fsm.Event, fsm.EventPayload) {
b := eventRetriableErrPayload{
err: ts.mu.txn.PrepareRetryableError(ctx, "test retriable err"),
err: ts.mu.txn.GenerateForcedRetryableError(ctx, "test retriable err"),
rewCap: dummyRewCap,
}
return eventRetriableErr{CanAutoRetry: fsm.True, IsCommit: fsm.False}, b
Expand All @@ -462,7 +462,7 @@ func TestTransitions(t *testing.T) {
},
evFun: func(ts *txnState) (fsm.Event, fsm.EventPayload) {
b := eventRetriableErrPayload{
err: ts.mu.txn.PrepareRetryableError(ctx, "test retriable err"),
err: ts.mu.txn.GenerateForcedRetryableError(ctx, "test retriable err"),
rewCap: dummyRewCap,
}
return eventRetriableErr{CanAutoRetry: fsm.True, IsCommit: fsm.True}, b
Expand All @@ -487,7 +487,7 @@ func TestTransitions(t *testing.T) {
},
evFun: func(ts *txnState) (fsm.Event, fsm.EventPayload) {
b := eventRetriableErrPayload{
err: ts.mu.txn.PrepareRetryableError(ctx, "test retriable err"),
err: ts.mu.txn.GenerateForcedRetryableError(ctx, "test retriable err"),
rewCap: dummyRewCap,
}
return eventRetriableErr{CanAutoRetry: fsm.True, IsCommit: fsm.True}, b
Expand All @@ -511,7 +511,7 @@ func TestTransitions(t *testing.T) {
},
evFun: func(ts *txnState) (fsm.Event, fsm.EventPayload) {
b := eventRetriableErrPayload{
err: ts.mu.txn.PrepareRetryableError(ctx, "test retriable err"),
err: ts.mu.txn.GenerateForcedRetryableError(ctx, "test retriable err"),
rewCap: rewindCapability{},
}
return eventRetriableErr{CanAutoRetry: fsm.False, IsCommit: fsm.False}, b
Expand All @@ -536,7 +536,7 @@ func TestTransitions(t *testing.T) {
},
evFun: func(ts *txnState) (fsm.Event, fsm.EventPayload) {
b := eventRetriableErrPayload{
err: ts.mu.txn.PrepareRetryableError(ctx, "test retriable err"),
err: ts.mu.txn.GenerateForcedRetryableError(ctx, "test retriable err"),
rewCap: rewindCapability{},
}
return eventRetriableErr{CanAutoRetry: fsm.False, IsCommit: fsm.False}, b
Expand Down Expand Up @@ -565,7 +565,7 @@ func TestTransitions(t *testing.T) {
},
evFun: func(ts *txnState) (fsm.Event, fsm.EventPayload) {
b := eventRetriableErrPayload{
err: ts.mu.txn.PrepareRetryableError(ctx, "test retriable err"),
err: ts.mu.txn.GenerateForcedRetryableError(ctx, "test retriable err"),
rewCap: rewindCapability{},
}
return eventRetriableErr{CanAutoRetry: fsm.False, IsCommit: fsm.True}, b
Expand Down Expand Up @@ -605,7 +605,7 @@ func TestTransitions(t *testing.T) {
},
evFun: func(ts *txnState) (fsm.Event, fsm.EventPayload) {
b := eventRetriableErrPayload{
err: ts.mu.txn.PrepareRetryableError(ctx, "test retriable err"),
err: ts.mu.txn.GenerateForcedRetryableError(ctx, "test retriable err"),
rewCap: rewindCapability{},
}
return eventRetriableErr{CanAutoRetry: fsm.False, IsCommit: fsm.False}, b
Expand Down

0 comments on commit e21b109

Please sign in to comment.