diff --git a/pkg/sql/conn_executor_prepare.go b/pkg/sql/conn_executor_prepare.go index d741698f857b..6a1db7130bdd 100644 --- a/pkg/sql/conn_executor_prepare.go +++ b/pkg/sql/conn_executor_prepare.go @@ -32,7 +32,7 @@ func (ex *connExecutor) execPrepare( ) (fsm.Event, fsm.EventPayload) { retErr := func(err error) (fsm.Event, fsm.EventPayload) { - return eventNonRetriableErr{IsCommit: fsm.False}, eventNonRetriableErrPayload{err: err} + return ex.makeErrEvent(err, parseCmd.AST) } // The anonymous statement can be overwritten. @@ -160,12 +160,19 @@ func (ex *connExecutor) prepare( if err := tree.ProcessPlaceholderAnnotations(stmt.AST, placeholderHints); err != nil { return nil, err } + // Preparing needs a transaction because it needs to retrieve db/table - // descriptors for type checking. - // TODO(andrei): Needing a transaction for preparing seems non-sensical, as - // the prepared statement outlives the txn. I hope that it's not used for - // anything other than getting a timestamp. - txn := kv.NewTxn(ctx, ex.server.cfg.DB, ex.server.cfg.NodeID.Get()) + // descriptors for type checking. If we already have an open transaction for + // this planner, use it. Using the user's transaction here is critical for + // proper deadlock detection. At the time of writing it is the case that any + // data read on behalf of this transaction is not cached for use in other + // transactions. It's critical that this fact remain true but nothing really + // enforces it. If we create a new transaction (newTxn is true), we'll need to + // finish it before we return. 
+ newTxn, txn := false, ex.state.mu.txn + if txn == nil || !txn.IsOpen() { + newTxn, txn = true, kv.NewTxn(ctx, ex.server.cfg.DB, ex.server.cfg.NodeID.Get()) + } ex.statsCollector.reset(&ex.server.sqlStats, ex.appStats, &ex.phaseTimes) p := &ex.planner @@ -174,11 +181,19 @@ func (ex *connExecutor) prepare( p.semaCtx.Annotations = tree.MakeAnnotations(stmt.NumAnnotations) flags, err := ex.populatePrepared(ctx, txn, placeholderHints, p) if err != nil { - txn.CleanupOnError(ctx, err) + // NB: if this is not a new transaction then let the connExecutor state + // machine decide whether we should clean up intents; we may be restarting + // and want to leave them in place. + if newTxn { + txn.CleanupOnError(ctx, err) + } return nil, err } - if err := txn.CommitOrCleanup(ctx); err != nil { - return nil, err + if newTxn { + // Clean up the newly created transaction if we made one. + if err := txn.CommitOrCleanup(ctx); err != nil { + return nil, err + } } // Account for the memory used by this prepared statement. 
diff --git a/pkg/sql/conn_executor_test.go b/pkg/sql/conn_executor_test.go
index 82654b27abce..2976320b496a 100644
--- a/pkg/sql/conn_executor_test.go
+++ b/pkg/sql/conn_executor_test.go
@@ -28,6 +28,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/security"
 	"github.com/cockroachdb/cockroach/pkg/sql"
+	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
 	"github.com/cockroachdb/cockroach/pkg/sql/row"
 	"github.com/cockroachdb/cockroach/pkg/sql/rowexec"
 	"github.com/cockroachdb/cockroach/pkg/sql/tests"
@@ -38,6 +39,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
+	"github.com/jackc/pgx"
 	"github.com/lib/pq"
 	"github.com/stretchr/testify/require"
 )
@@ -558,3 +560,190 @@ func TestQueryProgress(t *testing.T) {
 	// flaky, we just make sure we see one of 4x% or 5x%
 	require.Regexp(t, `executing \([45]\d\.`, progress)
 }
+
+// TestPrepareInExplicitTransactionDoesNotDeadlock ensures that, within an
+// explicit transaction, statement preparation uses the user's transaction and
+// thus properly interacts with deadlock detection.
+func TestPrepareInExplicitTransactionDoesNotDeadlock(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{})
+	defer s.Stopper().Stop(context.Background())
+
+	testDB := sqlutils.MakeSQLRunner(sqlDB)
+	testDB.Exec(t, "CREATE TABLE foo (i INT PRIMARY KEY)")
+	testDB.Exec(t, "CREATE TABLE bar (i INT PRIMARY KEY)")
+
+	tx1, err := sqlDB.Begin()
+	require.NoError(t, err)
+
+	tx2, err := sqlDB.Begin()
+	require.NoError(t, err)
+
+	// Have each transaction alter a different table so they can block each other.
+
+	_, err = tx1.Exec("ALTER TABLE foo ADD COLUMN j INT NOT NULL")
+	require.NoError(t, err)
+
+	_, err = tx2.Exec("ALTER TABLE bar ADD COLUMN j INT NOT NULL")
+	require.NoError(t, err)
+
+	// Now we want tx2 to get blocked on tx1 and stay blocked, then we want to
+	// push tx1 above tx2 and have it get blocked in planning.
+	errCh := make(chan error, 1) // buffered: the sender must never get stuck
+	go func() {
+		_, err := tx2.Exec("ALTER TABLE foo ADD COLUMN k INT NOT NULL")
+		errCh <- err
+	}()
+	// Give tx2 a moment to return early; with no default case this select really waits.
+	select {
+	case <-time.After(time.Millisecond):
+	case err := <-errCh:
+		t.Fatalf("expected the transaction to block, got %v", err)
+	}
+
+	// Read from foo so that we can push tx1 above tx2.
+	testDB.Exec(t, "SELECT count(*) FROM foo")
+
+	// Write into foo to push tx1
+	_, err = tx1.Exec("INSERT INTO foo VALUES (1)")
+	require.NoError(t, err)
+
+	// Plan a query which will resolve bar during planning time, this would block
+	// and deadlock if it were run on a new transaction.
+	_, err = tx1.Prepare("SELECT NULL FROM [SHOW COLUMNS FROM bar] LIMIT 1")
+	require.NoError(t, err)
+
+	// Try to commit tx1. Either it should get a RETRY_SERIALIZABLE error or
+	// tx2 should. Ensure that either one or both of them does.
+	if tx1Err := tx1.Commit(); tx1Err == nil {
+		// tx1 committed successfully, ensure tx2 failed.
+		tx2ExecErr := <-errCh
+		require.Regexp(t, "RETRY_SERIALIZABLE", tx2ExecErr)
+		_ = tx2.Rollback()
+	} else {
+		require.Regexp(t, "RETRY_SERIALIZABLE", tx1Err)
+		tx2ExecErr := <-errCh
+		require.NoError(t, tx2ExecErr)
+		if tx2CommitErr := tx2.Commit(); tx2CommitErr != nil {
+			require.Regexp(t, "RETRY_SERIALIZABLE", tx2CommitErr)
+		}
+	}
+}
+
+// TestErrorDuringPrepareInExplicitTransactionPropagates ensures that when
+// statement preparation uses the user's explicit transaction, errors hit by
+// the planning queries are handled correctly.
+func TestErrorDuringPrepareInExplicitTransactionPropagates(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	filter := newDynamicRequestFilter()
+	s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
+		Knobs: base.TestingKnobs{
+			Store: &kvserver.StoreTestingKnobs{
+				TestingRequestFilter: filter.filter,
+			},
+		},
+	})
+	defer s.Stopper().Stop(context.Background())
+
+	testDB := sqlutils.MakeSQLRunner(sqlDB)
+	testDB.Exec(t, "CREATE TABLE foo (i INT PRIMARY KEY)")
+	testDB.Exec(t, "CREATE TABLE bar (i INT PRIMARY KEY)")
+
+	// This test will create an explicit transaction that encounters an error on
+	// a later statement during planning of SHOW COLUMNS. The planning for this
+	// SHOW COLUMNS will be run in the user's transaction. The test will inject
+	// errors into the execution of that planning query and ensure that the user's
+	// transaction state evolves appropriately.
+
+	// Use pgx so that we can introspect error codes returned from cockroach.
+	pgURL, cleanup := sqlutils.PGUrl(t, s.ServingSQLAddr(), "", url.User("root"))
+	defer cleanup()
+	conf, err := pgx.ParseConnectionString(pgURL.String())
+	require.NoError(t, err)
+	conn, err := pgx.Connect(conf)
+	require.NoError(t, err)
+	// Best-effort close so the connection does not outlive the test.
+	defer func() { _ = conn.Close() }()
+
+	tx, err := conn.Begin()
+	require.NoError(t, err)
+
+	_, err = tx.Exec("SAVEPOINT cockroach_restart")
+	require.NoError(t, err)
+
+	// Do something with the user's transaction so that we'll use the user
+	// transaction in the planning of the below `SHOW COLUMNS`.
+	_, err = tx.Exec("INSERT INTO foo VALUES (1)")
+	require.NoError(t, err)
+
+	// Inject an error that will happen during planning.
+	filter.setFilter(func(ctx context.Context, ba roachpb.BatchRequest) *roachpb.Error {
+		if ba.Txn == nil {
+			return nil
+		}
+		if req, ok := ba.GetArg(roachpb.Get); ok {
+			get := req.(*roachpb.GetRequest)
+			_, tableID, err := keys.DecodeTablePrefix(get.Key)
+			if err != nil || tableID != keys.NamespaceTableID {
+				return nil
+			}
+			return roachpb.NewError(roachpb.NewTransactionRetryWithProtoRefreshError(
+				"boom", ba.Txn.ID, *ba.Txn))
+		}
+		return nil
+	})
+
+	// Plan a query which will get a restart error during planning.
+	_, err = tx.Prepare("show_columns", "SELECT NULL FROM [SHOW COLUMNS FROM bar] LIMIT 1")
+	require.Regexp(t, "restart transaction: TransactionRetryWithProtoRefreshError: boom", err)
+	pgErr, ok := err.(pgx.PgError)
+	require.True(t, ok)
+	require.Equal(t, pgcode.SerializationFailure, pgErr.Code)
+
+	// Clear the error-producing filter, restart the transaction, and finish it.
+	filter.setFilter(nil)
+
+	_, err = tx.Exec("ROLLBACK TO SAVEPOINT cockroach_restart")
+	require.NoError(t, err)
+
+	_, err = tx.Exec("INSERT INTO foo VALUES (1)")
+	require.NoError(t, err)
+	_, err = tx.Prepare("show_columns", "SELECT NULL FROM [SHOW COLUMNS FROM bar] LIMIT 1")
+	require.NoError(t, err)
+	require.NoError(t, tx.Commit())
+}
+
+// dynamicRequestFilter exposes a filter method which is a
+// storagebase.ReplicaRequestFilter but can be set dynamically.
+type dynamicRequestFilter struct {
+	v atomic.Value
+}
+
+// newDynamicRequestFilter returns a dynamicRequestFilter holding the no-op filter.
+func newDynamicRequestFilter() *dynamicRequestFilter {
+	f := &dynamicRequestFilter{}
+	f.v.Store(storagebase.ReplicaRequestFilter(noopRequestFilter))
+	return f
+}
+
+// setFilter installs filter for subsequent requests; nil restores the no-op filter.
+func (f *dynamicRequestFilter) setFilter(filter storagebase.ReplicaRequestFilter) {
+	if filter == nil {
+		filter = noopRequestFilter
+	}
+	f.v.Store(filter)
+}
+
+// filter is a storagebase.ReplicaRequestFilter that delegates to the stored filter.
+func (f *dynamicRequestFilter) filter(ctx context.Context, ba roachpb.BatchRequest) *roachpb.Error {
+	// Load whatever filter is currently stored in f.v and hand the batch to it.
+	current := f.v.Load().(storagebase.ReplicaRequestFilter)
+	return current(ctx, ba)
+}
+
+// noopRequestFilter is a storagebase.ReplicaRequestFilter which never returns an error.
+func noopRequestFilter(_ context.Context, _ roachpb.BatchRequest) *roachpb.Error {
+	return nil
+}