Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: deal with retriable errors when using a new txn #46829

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 24 additions & 21 deletions pkg/sql/conn_executor_prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,34 +164,32 @@ func (ex *connExecutor) prepare(
// Preparing needs a transaction because it needs to retrieve db/table
// descriptors for type checking. If we already have an open transaction for
// this planner, use it. Using the user's transaction here is critical for
// proper deadlock detection. At the time of writing it is the case that any
// proper deadlock detection. At the time of writing, it is the case that any
// data read on behalf of this transaction is not cached for use in other
// transactions. It's critical that this fact remain true but nothing really
// enforces it. If we create a new transaction (newTxn is true), we'll need to
// finish it before we return.
newTxn, txn := false, ex.state.mu.txn
if txn == nil || !txn.IsOpen() {
newTxn, txn = true, kv.NewTxn(ctx, ex.server.cfg.DB, ex.server.cfg.NodeID.Get())

var flags planFlags
prepare := func(ctx context.Context, txn *kv.Txn) (err error) {
ex.statsCollector.reset(&ex.server.sqlStats, ex.appStats, &ex.phaseTimes)
p := &ex.planner
ex.resetPlanner(ctx, p, txn, ex.server.cfg.Clock.PhysicalTime() /* stmtTS */)
p.stmt = &stmt
p.semaCtx.Annotations = tree.MakeAnnotations(stmt.NumAnnotations)
flags, err = ex.populatePrepared(ctx, txn, placeholderHints, p)
return err
}

ex.statsCollector.reset(&ex.server.sqlStats, ex.appStats, &ex.phaseTimes)
p := &ex.planner
ex.resetPlanner(ctx, p, txn, ex.server.cfg.Clock.PhysicalTime() /* stmtTS */)
p.stmt = &stmt
p.semaCtx.Annotations = tree.MakeAnnotations(stmt.NumAnnotations)
flags, err := ex.populatePrepared(ctx, txn, placeholderHints, p)
if err != nil {
// NB: if this is not a new transaction then let the connExecutor state
// machine decide whether we should clean up intents; we may be restarting
// and want to leave them in place.
if newTxn {
txn.CleanupOnError(ctx, err)
if txn := ex.state.mu.txn; txn != nil && txn.IsOpen() {
// Use the existing transaction.
if err := prepare(ctx, txn); err != nil {
return nil, err
}
return nil, err
}
if newTxn {
// Clean up the newly created transaction if we made one.
if err := txn.CommitOrCleanup(ctx); err != nil {
} else {
// Use a new transaction. This will handle retriable errors here rather
// than bubbling them up to the connExecutor state machine.
if err := ex.server.cfg.DB.Txn(ctx, prepare); err != nil {
return nil, err
}
}
Expand All @@ -209,6 +207,11 @@ func (ex *connExecutor) prepare(
func (ex *connExecutor) populatePrepared(
ctx context.Context, txn *kv.Txn, placeholderHints tree.PlaceholderTypes, p *planner,
) (planFlags, error) {
if before := ex.server.cfg.TestingKnobs.BeforePrepare; before != nil {
if err := before(ctx, ex.planner.stmt.String(), txn); err != nil {
return 0, err
}
}
stmt := p.stmt
if err := p.semaCtx.Placeholders.Init(stmt.NumPlaceholders, placeholderHints); err != nil {
return 0, err
Expand Down
32 changes: 32 additions & 0 deletions pkg/sql/conn_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/storagebase"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -631,6 +632,37 @@ func TestPrepareInExplicitTransactionDoesNotDeadlock(t *testing.T) {
}
}

// TestRetriableErrorDuringPrepare ensures that when preparing and using a new
// transaction, retriable errors are handled properly and do not propagate to
// the user's transaction.
func TestRetriableErrorDuringPrepare(t *testing.T) {
defer leaktest.AfterTest(t)()
const uniqueString = "'a very unique string'"
var failed int64
const numToFail = 2 // only fail on the first two attempts
s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
Knobs: base.TestingKnobs{
SQLExecutor: &sql.ExecutorTestingKnobs{
BeforePrepare: func(ctx context.Context, stmt string, txn *kv.Txn) error {
if strings.Contains(stmt, uniqueString) && atomic.AddInt64(&failed, 1) <= numToFail {
return roachpb.NewTransactionRetryWithProtoRefreshError("boom",
txn.ID(), *txn.TestingCloneTxn())
}
return nil
},
},
},
})
defer s.Stopper().Stop(context.Background())

testDB := sqlutils.MakeSQLRunner(sqlDB)
testDB.Exec(t, "CREATE TABLE foo (i INT PRIMARY KEY)")

stmt, err := sqlDB.Prepare("SELECT " + uniqueString)
require.NoError(t, err)
defer func() { _ = stmt.Close() }()
}

// This test ensures that when in an explicit transaction and statement
// preparation uses the user's transaction, errors during those planning queries
// are handled correctly.
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,10 @@ type ExecutorTestingKnobs struct {
// statement has been executed.
StatementFilter StatementFilter

// BeforePrepare can be used to trap execution of SQL statement preparation.
// If a nil error is returned, planning continues as usual.
BeforePrepare func(ctx context.Context, stmt string, txn *kv.Txn) error

// BeforeExecute is called by the Executor before plan execution. It is useful
// for synchronizing statement execution.
BeforeExecute func(ctx context.Context, stmt string)
Expand Down