From 0b945bfac352b93844de36203de0cc76f0edff85 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Tue, 24 Mar 2020 18:50:33 -0400 Subject: [PATCH] sql: use user transaction if we have one to prepare queries Prepartion of certain queries requires performing reads against the database. If the user has already laid down intents, these reads may become part of a dependency cycle. Prior to this commit, these reads would be on a different transaction and thus this cycle would not be detected by our deadlock detection mechanism. This change opts to use the user's transaction for planning if there is one and thus will properly interact with deadlock detection. Fixes #46447. Release justification: fixes for high-priority or high-severity bugs in existing functionality Release note: None --- pkg/sql/conn_executor_prepare.go | 33 ++++-- pkg/sql/conn_executor_test.go | 189 +++++++++++++++++++++++++++++++ 2 files changed, 213 insertions(+), 9 deletions(-) diff --git a/pkg/sql/conn_executor_prepare.go b/pkg/sql/conn_executor_prepare.go index d741698f857b..6a1db7130bdd 100644 --- a/pkg/sql/conn_executor_prepare.go +++ b/pkg/sql/conn_executor_prepare.go @@ -32,7 +32,7 @@ func (ex *connExecutor) execPrepare( ) (fsm.Event, fsm.EventPayload) { retErr := func(err error) (fsm.Event, fsm.EventPayload) { - return eventNonRetriableErr{IsCommit: fsm.False}, eventNonRetriableErrPayload{err: err} + return ex.makeErrEvent(err, parseCmd.AST) } // The anonymous statement can be overwritten. @@ -160,12 +160,19 @@ func (ex *connExecutor) prepare( if err := tree.ProcessPlaceholderAnnotations(stmt.AST, placeholderHints); err != nil { return nil, err } + // Preparing needs a transaction because it needs to retrieve db/table - // descriptors for type checking. - // TODO(andrei): Needing a transaction for preparing seems non-sensical, as - // the prepared statement outlives the txn. I hope that it's not used for - // anything other than getting a timestamp. - txn := kv.NewTxn(ctx, ex.server.cfg.DB, ex.server.cfg.NodeID.Get()) + // descriptors for type checking. If we already have an open transaction for + // this planner, use it. Using the user's transaction here is critical for + // proper deadlock detection. At the time of writing it is the case that any + // data read on behalf of this transaction is not cached for use in other + // transactions. It's critical that this fact remain true but nothing really + // enforces it. If we create a new transaction (newTxn is true), we'll need to + // finish it before we return. + newTxn, txn := false, ex.state.mu.txn + if txn == nil || !txn.IsOpen() { + newTxn, txn = true, kv.NewTxn(ctx, ex.server.cfg.DB, ex.server.cfg.NodeID.Get()) + } ex.statsCollector.reset(&ex.server.sqlStats, ex.appStats, &ex.phaseTimes) p := &ex.planner @@ -174,11 +181,19 @@ func (ex *connExecutor) prepare( p.semaCtx.Annotations = tree.MakeAnnotations(stmt.NumAnnotations) flags, err := ex.populatePrepared(ctx, txn, placeholderHints, p) if err != nil { - txn.CleanupOnError(ctx, err) + // NB: if this is not a new transaction then let the connExecutor state + // machine decide whether we should clean up intents; we may be restarting + // and want to leave them in place. + if newTxn { + txn.CleanupOnError(ctx, err) + } return nil, err } - if err := txn.CommitOrCleanup(ctx); err != nil { - return nil, err + if newTxn { + // Clean up the newly created transaction if we made one. + if err := txn.CommitOrCleanup(ctx); err != nil { + return nil, err + } } // Account for the memory used by this prepared statement. diff --git a/pkg/sql/conn_executor_test.go b/pkg/sql/conn_executor_test.go index 82654b27abce..2976320b496a 100644 --- a/pkg/sql/conn_executor_test.go +++ b/pkg/sql/conn_executor_test.go @@ -28,6 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/row" "github.com/cockroachdb/cockroach/pkg/sql/rowexec" "github.com/cockroachdb/cockroach/pkg/sql/tests" @@ -38,6 +39,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/jackc/pgx" "github.com/lib/pq" "github.com/stretchr/testify/require" ) @@ -558,3 +560,190 @@ func TestQueryProgress(t *testing.T) { // flaky, we just make sure we see one of 4x% or 5x% require.Regexp(t, `executing \([45]\d\.`, progress) } + +// This test ensures that when in an explicit transaction, statement preparation +// uses the user's transaction and thus properly interacts with deadlock +// detection. +func TestPrepareInExplicitTransactionDoesNotDeadlock(t *testing.T) { + defer leaktest.AfterTest(t)() + + s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{}) + defer s.Stopper().Stop(context.Background()) + + testDB := sqlutils.MakeSQLRunner(sqlDB) + testDB.Exec(t, "CREATE TABLE foo (i INT PRIMARY KEY)") + testDB.Exec(t, "CREATE TABLE bar (i INT PRIMARY KEY)") + + tx1, err := sqlDB.Begin() + require.NoError(t, err) + + tx2, err := sqlDB.Begin() + require.NoError(t, err) + + // So now I really want to try to have a deadlock. + + _, err = tx1.Exec("ALTER TABLE foo ADD COLUMN j INT NOT NULL") + require.NoError(t, err) + + _, err = tx2.Exec("ALTER TABLE bar ADD COLUMN j INT NOT NULL") + require.NoError(t, err) + + // Now we want tx2 to get blocked on tx1 and stay blocked, then we want to + // push tx1 above tx2 and have it get blocked in planning. + errCh := make(chan error) + go func() { + _, err := tx2.Exec("ALTER TABLE foo ADD COLUMN k INT NOT NULL") + errCh <- err + }() + select { + case <-time.After(time.Millisecond): + case err := <-errCh: + t.Fatalf("expected the transaction to block, got %v", err) + default: + } + + // Read from foo so that we can push tx1 above tx2. + testDB.Exec(t, "SELECT count(*) FROM foo") + + // Write into foo to push tx1 + _, err = tx1.Exec("INSERT INTO foo VALUES (1)") + require.NoError(t, err) + + // Plan a query which will resolve bar during planning time, this would block + // and deadlock if it were run on a new transaction. + _, err = tx1.Prepare("SELECT NULL FROM [SHOW COLUMNS FROM bar] LIMIT 1") + require.NoError(t, err) + + // Try to commit tx1. Either it should get a RETRY_SERIALIZABLE error or + // tx2 should. Ensure that either one or both of them does. + if tx1Err := tx1.Commit(); tx1Err == nil { + // tx1 committed successfully, ensure tx2 failed. + tx2ExecErr := <-errCh + require.Regexp(t, "RETRY_SERIALIZABLE", tx2ExecErr) + _ = tx2.Rollback() + } else { + require.Regexp(t, "RETRY_SERIALIZABLE", tx1Err) + tx2ExecErr := <-errCh + require.NoError(t, tx2ExecErr) + if tx2CommitErr := tx2.Commit(); tx2CommitErr != nil { + require.Regexp(t, "RETRY_SERIALIZABLE", tx2CommitErr) + } + } +} + +// This test ensures that when in an explicit transaction and statement +// preparation uses the user's transaction, errors during those planning queries +// are handled correctly. +func TestErrorDuringPrepareInExplicitTransactionPropagates(t *testing.T) { + defer leaktest.AfterTest(t)() + + filter := newDynamicRequestFilter() + s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + TestingRequestFilter: filter.filter, + }, + }, + }) + defer s.Stopper().Stop(context.Background()) + + testDB := sqlutils.MakeSQLRunner(sqlDB) + testDB.Exec(t, "CREATE TABLE foo (i INT PRIMARY KEY)") + testDB.Exec(t, "CREATE TABLE bar (i INT PRIMARY KEY)") + + // This test will create an explicit transaction that encounters an error on + // a latter statement during planning of SHOW COLUMNS. The planning for this + // SHOW COLUMNS will be run in the user's transaction. The test will inject + // errors into the execution of that planning query and ensure that the user's + // transaction state evolves appropriately. + + // Use pgx so that we can introspect error codes returned from cockroach. + pgURL, cleanup := sqlutils.PGUrl(t, s.ServingSQLAddr(), "", url.User("root")) + defer cleanup() + conf, err := pgx.ParseConnectionString(pgURL.String()) + require.NoError(t, err) + conn, err := pgx.Connect(conf) + require.NoError(t, err) + + tx, err := conn.Begin() + require.NoError(t, err) + + _, err = tx.Exec("SAVEPOINT cockroach_restart") + require.NoError(t, err) + + // Do something with the user's transaction so that we'll use the user + // transaction in the planning of the below `SHOW COLUMNS`. + _, err = tx.Exec("INSERT INTO foo VALUES (1)") + require.NoError(t, err) + + // Inject an error that will happen during planning. + filter.setFilter(func(ctx context.Context, ba roachpb.BatchRequest) *roachpb.Error { + if ba.Txn == nil { + return nil + } + if req, ok := ba.GetArg(roachpb.Get); ok { + get := req.(*roachpb.GetRequest) + _, tableID, err := keys.DecodeTablePrefix(get.Key) + if err != nil || tableID != keys.NamespaceTableID { + err = nil + return nil + } + return roachpb.NewError(roachpb.NewTransactionRetryWithProtoRefreshError( + "boom", ba.Txn.ID, *ba.Txn)) + } + return nil + }) + + // Plan a query will get a restart error during planning. + _, err = tx.Prepare("show_columns", "SELECT NULL FROM [SHOW COLUMNS FROM bar] LIMIT 1") + require.Regexp(t, + "restart transaction: TransactionRetryWithProtoRefreshError: boom", err) + pgErr, ok := err.(pgx.PgError) + require.True(t, ok) + require.Equal(t, pgcode.SerializationFailure, pgErr.Code) + + // Clear the error producing filter, restart the transaction, and run it to + // completion. + filter.setFilter(nil) + + _, err = tx.Exec("ROLLBACK TO SAVEPOINT cockroach_restart") + require.NoError(t, err) + + _, err = tx.Exec("INSERT INTO foo VALUES (1)") + require.NoError(t, err) + _, err = tx.Prepare("show_columns", "SELECT NULL FROM [SHOW COLUMNS FROM bar] LIMIT 1") + require.NoError(t, err) + require.NoError(t, tx.Commit()) +} + +// dynamicRequestFilter exposes a filter method which is a +// storagebase.ReplicaRequestFilter but can be set dynamically. +type dynamicRequestFilter struct { + v atomic.Value +} + +func newDynamicRequestFilter() *dynamicRequestFilter { + f := &dynamicRequestFilter{} + f.v.Store(storagebase.ReplicaRequestFilter(noopRequestFilter)) + return f +} + +func (f *dynamicRequestFilter) setFilter(filter storagebase.ReplicaRequestFilter) { + if filter == nil { + f.v.Store(storagebase.ReplicaRequestFilter(noopRequestFilter)) + } else { + f.v.Store(filter) + } +} + +// noopRequestFilter is a storagebase.ReplicaRequestFilter. +func (f *dynamicRequestFilter) filter( + ctx context.Context, request roachpb.BatchRequest, +) *roachpb.Error { + return f.v.Load().(storagebase.ReplicaRequestFilter)(ctx, request) +} + +// noopRequestFilter is a storagebase.ReplicaRequestFilter that does nothing. +func noopRequestFilter(ctx context.Context, request roachpb.BatchRequest) *roachpb.Error { + return nil +}