Skip to content

Commit

Permalink
sql: use user transaction if we have one to prepare queries
Browse files Browse the repository at this point in the history
Prepartion of certain queries requires performing reads against the database.
If the user has already laid down intents, these reads may become part of a
dependency cycle. Prior to this commit, these reads would be on a different
transaction and thus this cycle would not be detected by our deadlock detection
mechanism.

This change opts to use the user's transaction for planning if there is one
and thus will properly interact with deadlock detection.

Fixes #46447.

Release justification: fixes for high-priority or high-severity bugs in
existing functionality

Release note: None
  • Loading branch information
ajwerner committed Mar 27, 2020
1 parent 31bce92 commit 0b945bf
Show file tree
Hide file tree
Showing 2 changed files with 213 additions and 9 deletions.
33 changes: 24 additions & 9 deletions pkg/sql/conn_executor_prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (ex *connExecutor) execPrepare(
) (fsm.Event, fsm.EventPayload) {

retErr := func(err error) (fsm.Event, fsm.EventPayload) {
return eventNonRetriableErr{IsCommit: fsm.False}, eventNonRetriableErrPayload{err: err}
return ex.makeErrEvent(err, parseCmd.AST)
}

// The anonymous statement can be overwritten.
Expand Down Expand Up @@ -160,12 +160,19 @@ func (ex *connExecutor) prepare(
if err := tree.ProcessPlaceholderAnnotations(stmt.AST, placeholderHints); err != nil {
return nil, err
}

// Preparing needs a transaction because it needs to retrieve db/table
// descriptors for type checking.
// TODO(andrei): Needing a transaction for preparing seems non-sensical, as
// the prepared statement outlives the txn. I hope that it's not used for
// anything other than getting a timestamp.
txn := kv.NewTxn(ctx, ex.server.cfg.DB, ex.server.cfg.NodeID.Get())
// descriptors for type checking. If we already have an open transaction for
// this planner, use it. Using the user's transaction here is critical for
// proper deadlock detection. At the time of writing it is the case that any
// data read on behalf of this transaction is not cached for use in other
// transactions. It's critical that this fact remain true but nothing really
// enforces it. If we create a new transaction (newTxn is true), we'll need to
// finish it before we return.
newTxn, txn := false, ex.state.mu.txn
if txn == nil || !txn.IsOpen() {
newTxn, txn = true, kv.NewTxn(ctx, ex.server.cfg.DB, ex.server.cfg.NodeID.Get())
}

ex.statsCollector.reset(&ex.server.sqlStats, ex.appStats, &ex.phaseTimes)
p := &ex.planner
Expand All @@ -174,11 +181,19 @@ func (ex *connExecutor) prepare(
p.semaCtx.Annotations = tree.MakeAnnotations(stmt.NumAnnotations)
flags, err := ex.populatePrepared(ctx, txn, placeholderHints, p)
if err != nil {
txn.CleanupOnError(ctx, err)
// NB: if this is not a new transaction then let the connExecutor state
// machine decide whether we should clean up intents; we may be restarting
// and want to leave them in place.
if newTxn {
txn.CleanupOnError(ctx, err)
}
return nil, err
}
if err := txn.CommitOrCleanup(ctx); err != nil {
return nil, err
if newTxn {
// Clean up the newly created transaction if we made one.
if err := txn.CommitOrCleanup(ctx); err != nil {
return nil, err
}
}

// Account for the memory used by this prepared statement.
Expand Down
189 changes: 189 additions & 0 deletions pkg/sql/conn_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/row"
"github.com/cockroachdb/cockroach/pkg/sql/rowexec"
"github.com/cockroachdb/cockroach/pkg/sql/tests"
Expand All @@ -38,6 +39,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/jackc/pgx"
"github.com/lib/pq"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -558,3 +560,190 @@ func TestQueryProgress(t *testing.T) {
// flaky, we just make sure we see one of 4x% or 5x%
require.Regexp(t, `executing \([45]\d\.`, progress)
}

// This test ensures that when in an explicit transaction, statement preparation
// uses the user's transaction and thus properly interacts with deadlock
// detection.
func TestPrepareInExplicitTransactionDoesNotDeadlock(t *testing.T) {
defer leaktest.AfterTest(t)()

s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{})
defer s.Stopper().Stop(context.Background())

testDB := sqlutils.MakeSQLRunner(sqlDB)
testDB.Exec(t, "CREATE TABLE foo (i INT PRIMARY KEY)")
testDB.Exec(t, "CREATE TABLE bar (i INT PRIMARY KEY)")

tx1, err := sqlDB.Begin()
require.NoError(t, err)

tx2, err := sqlDB.Begin()
require.NoError(t, err)

// So now I really want to try to have a deadlock.

_, err = tx1.Exec("ALTER TABLE foo ADD COLUMN j INT NOT NULL")
require.NoError(t, err)

_, err = tx2.Exec("ALTER TABLE bar ADD COLUMN j INT NOT NULL")
require.NoError(t, err)

// Now we want tx2 to get blocked on tx1 and stay blocked, then we want to
// push tx1 above tx2 and have it get blocked in planning.
errCh := make(chan error)
go func() {
_, err := tx2.Exec("ALTER TABLE foo ADD COLUMN k INT NOT NULL")
errCh <- err
}()
select {
case <-time.After(time.Millisecond):
case err := <-errCh:
t.Fatalf("expected the transaction to block, got %v", err)
default:
}

// Read from foo so that we can push tx1 above tx2.
testDB.Exec(t, "SELECT count(*) FROM foo")

// Write into foo to push tx1
_, err = tx1.Exec("INSERT INTO foo VALUES (1)")
require.NoError(t, err)

// Plan a query which will resolve bar during planning time, this would block
// and deadlock if it were run on a new transaction.
_, err = tx1.Prepare("SELECT NULL FROM [SHOW COLUMNS FROM bar] LIMIT 1")
require.NoError(t, err)

// Try to commit tx1. Either it should get a RETRY_SERIALIZABLE error or
// tx2 should. Ensure that either one or both of them does.
if tx1Err := tx1.Commit(); tx1Err == nil {
// tx1 committed successfully, ensure tx2 failed.
tx2ExecErr := <-errCh
require.Regexp(t, "RETRY_SERIALIZABLE", tx2ExecErr)
_ = tx2.Rollback()
} else {
require.Regexp(t, "RETRY_SERIALIZABLE", tx1Err)
tx2ExecErr := <-errCh
require.NoError(t, tx2ExecErr)
if tx2CommitErr := tx2.Commit(); tx2CommitErr != nil {
require.Regexp(t, "RETRY_SERIALIZABLE", tx2CommitErr)
}
}
}

// This test ensures that when in an explicit transaction and statement
// preparation uses the user's transaction, errors during those planning queries
// are handled correctly.
func TestErrorDuringPrepareInExplicitTransactionPropagates(t *testing.T) {
defer leaktest.AfterTest(t)()

filter := newDynamicRequestFilter()
s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
TestingRequestFilter: filter.filter,
},
},
})
defer s.Stopper().Stop(context.Background())

testDB := sqlutils.MakeSQLRunner(sqlDB)
testDB.Exec(t, "CREATE TABLE foo (i INT PRIMARY KEY)")
testDB.Exec(t, "CREATE TABLE bar (i INT PRIMARY KEY)")

// This test will create an explicit transaction that encounters an error on
// a latter statement during planning of SHOW COLUMNS. The planning for this
// SHOW COLUMNS will be run in the user's transaction. The test will inject
// errors into the execution of that planning query and ensure that the user's
// transaction state evolves appropriately.

// Use pgx so that we can introspect error codes returned from cockroach.
pgURL, cleanup := sqlutils.PGUrl(t, s.ServingSQLAddr(), "", url.User("root"))
defer cleanup()
conf, err := pgx.ParseConnectionString(pgURL.String())
require.NoError(t, err)
conn, err := pgx.Connect(conf)
require.NoError(t, err)

tx, err := conn.Begin()
require.NoError(t, err)

_, err = tx.Exec("SAVEPOINT cockroach_restart")
require.NoError(t, err)

// Do something with the user's transaction so that we'll use the user
// transaction in the planning of the below `SHOW COLUMNS`.
_, err = tx.Exec("INSERT INTO foo VALUES (1)")
require.NoError(t, err)

// Inject an error that will happen during planning.
filter.setFilter(func(ctx context.Context, ba roachpb.BatchRequest) *roachpb.Error {
if ba.Txn == nil {
return nil
}
if req, ok := ba.GetArg(roachpb.Get); ok {
get := req.(*roachpb.GetRequest)
_, tableID, err := keys.DecodeTablePrefix(get.Key)
if err != nil || tableID != keys.NamespaceTableID {
err = nil
return nil
}
return roachpb.NewError(roachpb.NewTransactionRetryWithProtoRefreshError(
"boom", ba.Txn.ID, *ba.Txn))
}
return nil
})

// Plan a query will get a restart error during planning.
_, err = tx.Prepare("show_columns", "SELECT NULL FROM [SHOW COLUMNS FROM bar] LIMIT 1")
require.Regexp(t,
"restart transaction: TransactionRetryWithProtoRefreshError: boom", err)
pgErr, ok := err.(pgx.PgError)
require.True(t, ok)
require.Equal(t, pgcode.SerializationFailure, pgErr.Code)

// Clear the error producing filter, restart the transaction, and run it to
// completion.
filter.setFilter(nil)

_, err = tx.Exec("ROLLBACK TO SAVEPOINT cockroach_restart")
require.NoError(t, err)

_, err = tx.Exec("INSERT INTO foo VALUES (1)")
require.NoError(t, err)
_, err = tx.Prepare("show_columns", "SELECT NULL FROM [SHOW COLUMNS FROM bar] LIMIT 1")
require.NoError(t, err)
require.NoError(t, tx.Commit())
}

// dynamicRequestFilter exposes a filter method which is a
// storagebase.ReplicaRequestFilter but can be set dynamically.
type dynamicRequestFilter struct {
v atomic.Value
}

func newDynamicRequestFilter() *dynamicRequestFilter {
f := &dynamicRequestFilter{}
f.v.Store(storagebase.ReplicaRequestFilter(noopRequestFilter))
return f
}

func (f *dynamicRequestFilter) setFilter(filter storagebase.ReplicaRequestFilter) {
if filter == nil {
f.v.Store(storagebase.ReplicaRequestFilter(noopRequestFilter))
} else {
f.v.Store(filter)
}
}

// noopRequestFilter is a storagebase.ReplicaRequestFilter.
func (f *dynamicRequestFilter) filter(
ctx context.Context, request roachpb.BatchRequest,
) *roachpb.Error {
return f.v.Load().(storagebase.ReplicaRequestFilter)(ctx, request)
}

// noopRequestFilter is a storagebase.ReplicaRequestFilter that does nothing.
func noopRequestFilter(ctx context.Context, request roachpb.BatchRequest) *roachpb.Error {
return nil
}

0 comments on commit 0b945bf

Please sign in to comment.