Skip to content

Commit

Permalink
sql: make sure pgwire bind always happens in a transaction
Browse files Browse the repository at this point in the history
The approach of using a transaction matches what we do for pgwire Parse
messages already. This is important to make sure that user-defined types
are leased correctly.

This also updated the SQL EXECUTE command to resolve user-defined types
so that it gets the latest changes.

Release note (bug fix): Adding new values to a user-defined enum type
will previously would cause a prepared statement using that type to not
work. This now works as expected.
  • Loading branch information
rafiss committed Oct 19, 2021
1 parent c8c186f commit 9db9e35
Show file tree
Hide file tree
Showing 7 changed files with 234 additions and 47 deletions.
2 changes: 1 addition & 1 deletion pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ func (ex *connExecutor) execStmtInOpenState(
return makeErrEvent(err)
}
var err error
pinfo, err = fillInPlaceholders(ctx, ps, name, e.Params, ex.sessionData().SearchPath)
pinfo, err = ex.planner.fillInPlaceholders(ctx, ps, name, e.Params)
if err != nil {
return makeErrEvent(err)
}
Expand Down
76 changes: 53 additions & 23 deletions pkg/sql/conn_executor_prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,33 +372,63 @@ func (ex *connExecutor) execBind(
"expected %d arguments, got %d", numQArgs, len(bindCmd.Args)))
}

for i, arg := range bindCmd.Args {
k := tree.PlaceholderIdx(i)
t := ps.InferredTypes[i]
if arg == nil {
// nil indicates a NULL argument value.
qargs[k] = tree.DNull
} else {
typ, ok := types.OidToType[t]
if !ok {
var err error
typ, err = ex.planner.ResolveTypeByOID(ctx, t)
// We need to make sure type resolution happens within a transaction.
// Otherwise, for user-defined types we won't take the correct leases and
// will get back out of date type information.
// This code path is only used by the wire-level Bind command. The
// SQL EXECUTE command (which also needs to bind and resolve types) is
// handled separately in conn_executor_exec.
resolve := func(ctx context.Context, txn *kv.Txn) (err error) {
ex.statsCollector.Reset(ex.applicationStats, ex.phaseTimes)
p := &ex.planner
ex.resetPlanner(ctx, p, txn, ex.server.cfg.Clock.PhysicalTime() /* stmtTS */)

for i, arg := range bindCmd.Args {
k := tree.PlaceholderIdx(i)
t := ps.InferredTypes[i]
if arg == nil {
// nil indicates a NULL argument value.
qargs[k] = tree.DNull
} else {
typ, ok := types.OidToType[t]
if !ok {
var err error
typ, err = ex.planner.ResolveTypeByOID(ctx, t)
if err != nil {
return err
}
}
d, err := pgwirebase.DecodeDatum(
ex.planner.EvalContext(),
typ,
qArgFormatCodes[i],
arg,
)
if err != nil {
return nil, err
return pgerror.Wrapf(err, pgcode.ProtocolViolation, "error in argument for %s", k)
}
qargs[k] = d
}
d, err := pgwirebase.DecodeDatum(
ex.planner.EvalContext(),
typ,
qArgFormatCodes[i],
arg,
)
if err != nil {
return retErr(pgerror.Wrapf(err, pgcode.ProtocolViolation,
"error in argument for %s", k))
}
qargs[k] = d
}
return nil
}

if txn := ex.state.mu.txn; txn != nil && txn.IsOpen() {
// Use the existing transaction.
if err := resolve(ctx, txn); err != nil {
return retErr(err)
}
} else {
// Use a new transaction. This will handle retriable errors here rather
// than bubbling them up to the connExecutor state machine.
if err := ex.server.cfg.DB.Txn(ctx, resolve); err != nil {
return retErr(err)
}
// Prepare with an implicit transaction will end up creating
// a new transaction. Once this transaction is complete,
// we can safely release the leases, otherwise we will
// incorrectly hold leases for later operations.
ex.extraTxnState.descCollection.ReleaseAll(ctx)
}
}

Expand Down
20 changes: 13 additions & 7 deletions pkg/sql/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,16 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/errors"
)

// fillInPlaceholder helps with the EXECUTE foo(args) SQL statement: it takes in
// a prepared statement returning
// the referenced prepared statement and correctly updated placeholder info.
// See https://www.postgresql.org/docs/current/static/sql-execute.html for details.
func fillInPlaceholders(
ctx context.Context,
ps *PreparedStatement,
name string,
params tree.Exprs,
searchPath sessiondata.SearchPath,
func (p *planner) fillInPlaceholders(
ctx context.Context, ps *PreparedStatement, name string, params tree.Exprs,
) (*tree.PlaceholderInfo, error) {
if len(ps.Types) != len(params) {
return nil, pgerror.Newf(pgcode.Syntax,
Expand All @@ -47,6 +43,16 @@ func fillInPlaceholders(
if !ok {
return nil, errors.AssertionFailedf("no type for placeholder %s", idx)
}

// For user-defined types, we need to resolve the type to make sure we get
// the latest changes to the type.
if _, ok := types.OidToType[typ.Oid()]; !ok {
var err error
typ, err = p.ResolveTypeByOID(ctx, typ.Oid())
if err != nil {
return nil, err
}
}
typedExpr, err := schemaexpr.SanitizeVarFreeExpr(
ctx, e, typ, "EXECUTE parameter" /* context */, &semaCtx, tree.VolatilityVolatile,
)
Expand Down
24 changes: 24 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/enums
Original file line number Diff line number Diff line change
Expand Up @@ -1545,3 +1545,27 @@ ORDER BY a.id, b.id
{a,b} {a} false false false false
{a,b} {b} false true true false
{a,b} {a,b} true false true true

# Make sure that adding a new enum value works well with PREPARE/EXECUTE.
subtest regression_70378

statement ok
CREATE TYPE enum_70378 AS ENUM ('a', 'b');
CREATE TABLE enum_table (a enum_70378);
PREPARE q AS INSERT INTO enum_table VALUES($1);
EXECUTE q('a')

query T
SELECT * FROM enum_table
----
a

statement ok
ALTER TYPE enum_70378 ADD VALUE 'c';
EXECUTE q('c')

query T rowsort
SELECT * FROM enum_table
----
a
c
89 changes: 82 additions & 7 deletions pkg/sql/pgwire/testdata/pgtest/bind_and_resolve
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,13 @@ ReadyForQuery
# planner transaction but never set it. This was pretty much the only
# way you could do such a thing.

send
send crdb_only
Query {"String": "BEGIN AS OF SYSTEM TIME '1s'"}
Sync
----


# TODO(ajwerner): Why are there two ReadyForQuery?

until
# There are two ReadyForQuerys because a simply query was followed by Sync
until crdb_only
ErrorResponse
ReadyForQuery
ReadyForQuery
Expand All @@ -55,16 +53,93 @@ ReadyForQuery
{"Type":"ReadyForQuery","TxStatus":"I"}
{"Type":"ReadyForQuery","TxStatus":"I"}

send
send crdb_only
Bind {"DestinationPortal": "p7", "PreparedStatement": "s7", "ParameterFormatCodes": [0], "Parameters": [{"text":"T"}]}
Execute {"Portal": "p7"}
Sync
----

until
until crdb_only
ReadyForQuery
----
{"Type":"BindComplete"}
{"Type":"DataRow","Values":[{"text":"52"}]}
{"Type":"CommandComplete","CommandTag":"SELECT 1"}
{"Type":"ReadyForQuery","TxStatus":"I"}

send
Query {"String": "DROP TABLE IF EXISTS tab"}
----

until ignore=NoticeResponse
ReadyForQuery
----
{"Type":"CommandComplete","CommandTag":"DROP TABLE"}
{"Type":"ReadyForQuery","TxStatus":"I"}

send
Query {"String": "BEGIN"}
----

until ignore=NoticeResponse
ReadyForQuery
----
{"Type":"CommandComplete","CommandTag":"BEGIN"}
{"Type":"ReadyForQuery","TxStatus":"T"}

send
Parse {"Name": "s8", "Query": "SELECT relname FROM pg_class WHERE oid = $1::regclass"}
Bind {"DestinationPortal": "p8", "PreparedStatement": "s8", "ResultFormatCodes": [0], "Parameters": [{"text":"t"}]}
Sync
----

until
ReadyForQuery
----
{"Type":"ParseComplete"}
{"Type":"BindComplete"}
{"Type":"ReadyForQuery","TxStatus":"T"}

send
Query {"String": "ALTER TABLE t RENAME TO tab"}
----

until
ReadyForQuery
----
{"Type":"CommandComplete","CommandTag":"ALTER TABLE"}
{"Type":"ReadyForQuery","TxStatus":"T"}

send
Execute {"Portal": "p8"}
Sync
----

until noncrdb_only
ReadyForQuery
----
{"Type":"DataRow","Values":[{"text":"t"}]}
{"Type":"CommandComplete","CommandTag":"SELECT 1"}
{"Type":"ReadyForQuery","TxStatus":"T"}

# Currently, CRDB differs in that it returns the new table name here, but the
# important part of this test is that it asserts that binding the placeholder
# parameter occurred before the table rename, which matches the Postgres
# behavior.
# TODO(rafi): To be fully correct, we still should return table name 't' here.
until crdb_only
ReadyForQuery
----
{"Type":"DataRow","Values":[{"text":"tab"}]}
{"Type":"CommandComplete","CommandTag":"SELECT 1"}
{"Type":"ReadyForQuery","TxStatus":"T"}

send
Query {"String": "COMMIT"}
----

until ignore=NoticeResponse
ReadyForQuery
----
{"Type":"CommandComplete","CommandTag":"COMMIT"}
{"Type":"ReadyForQuery","TxStatus":"I"}
58 changes: 55 additions & 3 deletions pkg/sql/pgwire/testdata/pgtest/enum
Original file line number Diff line number Diff line change
Expand Up @@ -106,15 +106,30 @@ ReadyForQuery
{"Type":"CommandComplete","CommandTag":"INSERT 0 1"}
{"Type":"ReadyForQuery","TxStatus":"I"}

send noncrdb_only
Parse {"Name": "s1", "Query": "INSERT INTO tb VALUES ($1)"}
Bind {"DestinationPortal": "p", "PreparedStatement": "s1", "ParameterFormatCodes": [0], "Parameters": [{"text":"hi"}]}
Execute {"Portal": "p"}
Sync
----

until noncrdb_only
ReadyForQuery
----
{"Type":"ParseComplete"}
{"Type":"BindComplete"}
{"Type":"CommandComplete","CommandTag":"INSERT 0 1"}
{"Type":"ReadyForQuery","TxStatus":"I"}

# Ensure that our value was successfully inserted.
send crdb_only
send
Query {"String": "SELECT * FROM tb"}
----

until crdb_only
until ignore_type_oids ignore_table_oids ignore_data_type_sizes
ReadyForQuery
----
{"Type":"RowDescription","Fields":[{"Name":"x","TableOID":54,"TableAttributeNumber":1,"DataTypeOID":100052,"DataTypeSize":-1,"TypeModifier":-1,"Format":0}]}
{"Type":"RowDescription","Fields":[{"Name":"x","TableOID":0,"TableAttributeNumber":1,"DataTypeOID":0,"DataTypeSize":0,"TypeModifier":-1,"Format":0}]}
{"Type":"DataRow","Values":[{"text":"hi"}]}
{"Type":"CommandComplete","CommandTag":"SELECT 1"}
{"Type":"ReadyForQuery","TxStatus":"I"}
Expand Down Expand Up @@ -193,3 +208,40 @@ ReadyForQuery
{"Type":"BindComplete"}
{"Type":"CommandComplete","CommandTag":"INSERT 0 1"}
{"Type":"ReadyForQuery","TxStatus":"I"}

send
Query {"String": "ALTER TYPE te ADD VALUE 'hola'"}
----

until
ReadyForQuery
----
{"Type":"CommandComplete","CommandTag":"ALTER TYPE"}
{"Type":"ReadyForQuery","TxStatus":"I"}

# Reuse original prepared statement now that there's a new enum value.
send
Bind {"DestinationPortal": "p", "PreparedStatement": "s1", "ParameterFormatCodes": [0], "Parameters": [{"text":"hola"}]}
Execute {"Portal": "p"}
Sync
----

until
ReadyForQuery
----
{"Type":"BindComplete"}
{"Type":"CommandComplete","CommandTag":"INSERT 0 1"}
{"Type":"ReadyForQuery","TxStatus":"I"}

# Ensure that our value was successfully inserted.
send
Query {"String": "SELECT * FROM tb WHERE x = 'hola'"}
----

until ignore_type_oids ignore_table_oids ignore_data_type_sizes
ReadyForQuery
----
{"Type":"RowDescription","Fields":[{"Name":"x","TableOID":0,"TableAttributeNumber":1,"DataTypeOID":0,"DataTypeSize":0,"TypeModifier":-1,"Format":0}]}
{"Type":"DataRow","Values":[{"text":"hola"}]}
{"Type":"CommandComplete","CommandTag":"SELECT 1"}
{"Type":"ReadyForQuery","TxStatus":"I"}
12 changes: 6 additions & 6 deletions pkg/sql/tests/enum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,7 @@ func (i intItem) Less(o btree.Item) bool {
}

// TestEnumPlaceholderWithAsOfSystemTime is a regression test for an edge case
// with bind where we would not properly deal with leases involving types. At
// the time of writing this test, we still don't deal with such leases properly
// but we did fix any really dangerous hazards.
// with bind where we would not properly deal with leases involving types.
func TestEnumPlaceholderWithAsOfSystemTime(t *testing.T) {
defer leaktest.AfterTest(t)()

Expand All @@ -132,11 +130,13 @@ func TestEnumPlaceholderWithAsOfSystemTime(t *testing.T) {
// Before the commit which introduced this test, the below statement would
// crash the server.
q := fmt.Sprintf("SELECT k FROM tab AS OF SYSTEM TIME %s WHERE v = $1", afterInsert)
db.Exec(t, q, "a")
require.Equal(t, [][]string{{"1"}}, db.QueryStr(t, q, "a"))
db.Exec(t, "ALTER TYPE typ RENAME VALUE 'a' TO 'd'")
db.Exec(t, "ALTER TYPE typ RENAME VALUE 'b' TO 'a'")
got := db.QueryStr(t, q, "a")
require.Equal(t, [][]string{{"1"}}, got)
// The AOST does not apply to the transaction that binds 'a' to the
// placeholder.
require.Equal(t, [][]string{}, db.QueryStr(t, q, "a"))
require.Equal(t, [][]string{{"1"}}, db.QueryStr(t, q, "d"))
}

// TestEnumDropValueCheckConstraint tests that check constraints containing
Expand Down

0 comments on commit 9db9e35

Please sign in to comment.