Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[READY FOR REVIEW] [DO NOT MERGE] fix prepared statement cache invalidation #860

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ included below.
* pgtype has been spun off to a separate package (github.com/jackc/pgtype).
* pgproto3 has been spun off to a separate package (github.com/jackc/pgproto3/v2).
* Logical replication support has been spun off to a separate package (github.com/jackc/pglogrepl).
* Lower level PostgreSQL functionality is now implemented in a separate package (github.com/jackc/pgconn).
* Lower level PostgreSQL functionality is now implemented in a separate package (github.com/ethanpailes/pgconn).
* Tests are now configured with environment variables.
* Conn has an automatic statement cache by default.
* Batch interface has been simplified.
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ pgx follows semantic versioning for the documented public API on stable releases
pgx is the head of a family of PostgreSQL libraries. Many of these can be used independently. Many can also be accessed
from pgx for lower-level control.

### [github.com/jackc/pgconn](https://github.com/jackc/pgconn)
### [github.com/ethanpailes/pgconn](https://github.com/ethanpailes/pgconn)

`pgconn` is a lower-level PostgreSQL database driver that operates at nearly the same level as the C library `libpq`.

Expand Down
2 changes: 1 addition & 1 deletion batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package pgx
import (
"context"

"github.com/jackc/pgconn"
"github.com/ethanpailes/pgconn"
errors "golang.org/x/xerrors"
)

Expand Down
4 changes: 2 additions & 2 deletions batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (
"os"
"testing"

"github.com/jackc/pgconn"
"github.com/jackc/pgconn/stmtcache"
"github.com/ethanpailes/pgconn"
"github.com/ethanpailes/pgconn/stmtcache"
"github.com/jackc/pgx/v4"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down
4 changes: 2 additions & 2 deletions bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import (
"testing"
"time"

"github.com/jackc/pgconn"
"github.com/jackc/pgconn/stmtcache"
"github.com/ethanpailes/pgconn"
"github.com/ethanpailes/pgconn/stmtcache"
"github.com/jackc/pgtype"
"github.com/jackc/pgx/v4"
"github.com/stretchr/testify/require"
Expand Down
69 changes: 67 additions & 2 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (

errors "golang.org/x/xerrors"

"github.com/jackc/pgconn"
"github.com/jackc/pgconn/stmtcache"
"github.com/ethanpailes/pgconn"
"github.com/ethanpailes/pgconn/stmtcache"
"github.com/jackc/pgtype"
"github.com/jackc/pgx/v4/internal/sanitize"
)
Expand Down Expand Up @@ -74,6 +74,15 @@ type Conn struct {
wbuf []byte
preallocatedRows []connRows
eqb extendedQueryBuilder

// We track whether or not this connection has any outstanding un-`Close`d transactions
// because certain operations (like deallocating a prepared statement) are not guaranteed to
// succeed in the middle of a transaction.
inTx bool
// A queue of statements to clobber from the statement cache as soon as it is safe to do so
// (once we are not in a transaction or running batch sql).
stmtsToFlush []string

}

// Identifier a PostgreSQL identifier or name. Identifiers can be composed of
Expand Down Expand Up @@ -652,6 +661,16 @@ optionLoop:
rows.resultReader = c.pgConn.ExecParams(ctx, sql, c.eqb.paramValues, sd.ParamOIDs, c.eqb.paramFormats, resultFormats)
} else {
rows.resultReader = c.pgConn.ExecPrepared(ctx, sd.Name, c.eqb.paramValues, c.eqb.paramFormats, resultFormats)
if IsInvalidCachedStatementPlanError(rows.resultReader.Err()) {
rows.fatal(rows.resultReader.Err())
// If the statement has been invalidated out from under us, we won't retry, but we do
// clobber it out of the cache so that an application level retry has a prayer of
// succeeding.
err = c.invalidateStmt(ctx, sd.SQL) // already have an error to return
if err != nil {
rows.err = errors.Errorf("cleaning up from '%s': %s", rows.err.Error(), err.Error())
}
}
}

return rows, rows.err
Expand Down Expand Up @@ -790,3 +809,49 @@ func (c *Conn) sanitizeForSimpleQuery(sql string, args ...interface{}) (string,

return sanitize.SanitizeSQL(sql, valueArgs...)
}

// invalidateStmt purges the given prepared statement from the stmtcache, or queues it up
// for flushing if the connection is not in a good state to flush a prepared statement.
func (c *Conn) invalidateStmt(ctx context.Context, sql string) error {
// TODO(ethan): for some reason I'm trying to invalidate a statement when the connection
// is locked. Wut do, I don't think I can explictly unlock, and I'm not even
// really sure what this lock is protecting anyway. I think I may need to test
// if the connection is busy here as well, though I'm not really sure how I
// can arrange to be notified when the lock is released.
if c.inTx {
c.stmtsToFlush = append(c.stmtsToFlush, sql)
return nil
} else {
return c.stmtcache.ClearStmt(ctx, sql)
}
}

func (c *Conn) setInTx(ctx context.Context, value bool) error {
c.inTx = value

if !c.inTx {
// flush any outstanding statements
for _, sql := range c.stmtsToFlush {
err := c.stmtcache.ClearStmt(ctx, sql)
if err != nil {
return err
}
}
}

return nil
}

// IsInvalidCachedStatementPlanError inspects the given error and returns true if the
// error is due to the fact that we just tried to execute a cached prepared statement
// that is now invalid due to a schema change.
func IsInvalidCachedStatementPlanError(err error) bool {
pgErr, ok := err.(*pgconn.PgError)
if !ok {
return false
}

return pgErr.Severity == "ERROR" &&
pgErr.Code == "0A000" &&
pgErr.Message == "cached plan must not change result type"
}
131 changes: 129 additions & 2 deletions conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
"testing"
"time"

"github.com/jackc/pgconn"
"github.com/jackc/pgconn/stmtcache"
"github.com/ethanpailes/pgconn"
"github.com/ethanpailes/pgconn/stmtcache"
"github.com/jackc/pgtype"
"github.com/jackc/pgx/v4"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -879,3 +879,130 @@ func TestDomainType(t *testing.T) {
}
})
}

func TestStmtCacheInvalidationConn(t *testing.T) {
ctx := context.Background()

conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
defer closeConn(t, conn)

// create a table and fill it with some data
_, err := conn.Exec(ctx, `
DROP TABLE IF EXISTS drop_cols;
CREATE TABLE drop_cols (
id SERIAL PRIMARY KEY NOT NULL,
f1 int NOT NULL,
f2 int NOT NULL
);
`)
require.NoError(t, err)
_, err = conn.Exec(ctx, "INSERT INTO drop_cols (f1, f2) VALUES (1, 2)")
require.NoError(t, err)

getSQL := "SELECT * FROM drop_cols WHERE id = $1"

// This query will populate the statement cache. We don't care about the result.
rows, err := conn.Query(ctx, getSQL, 1)
require.NoError(t, err)
rows.Close()

// Now, change the schema of the table out from under the statement, making it invalid.
_, err = conn.Exec(ctx, "ALTER TABLE drop_cols DROP COLUMN f1")
require.NoError(t, err)

// We must get an error the first time we try to re-execute a bad statement.
// It is up to the application to determine if it wants to try again. We punt to
// the application because there is no clear recovery path in the case of failed transactions
// or batch operations and because automatic retry is tricky and we don't want to get
// it wrong at such an importaint layer of the stack.
rows, err = conn.Query(ctx, getSQL, 1)
if !pgx.IsInvalidCachedStatementPlanError(err) {
if err == nil {
t.Fatal("expected InvalidCachedStatementPlanError: no error")
} else {
t.Fatalf("expected InvalidCachedStatementPlanError, got: %s", err.Error())
}
}
rows.Close()

// On retry, the statement should have been flushed from the cache.
rows, err = conn.Query(ctx, getSQL, 1)
require.NoError(t, err)
rows.Next()
err = rows.Err()
require.NoError(t, err)
rows.Close()

ensureConnValid(t, conn)
}

func TestStmtCacheInvalidationTx(t *testing.T) {
ctx := context.Background()

conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
defer closeConn(t, conn)

// create a table and fill it with some data
_, err := conn.Exec(ctx, `
DROP TABLE IF EXISTS drop_cols;
CREATE TABLE drop_cols (
id SERIAL PRIMARY KEY NOT NULL,
f1 int NOT NULL,
f2 int NOT NULL
);
`)
require.NoError(t, err)
_, err = conn.Exec(ctx, "INSERT INTO drop_cols (f1, f2) VALUES (1, 2)")
require.NoError(t, err)

tx, err := conn.Begin(ctx)
require.NoError(t, err)

getSQL := "SELECT * FROM drop_cols WHERE id = $1"

// This query will populate the statement cache. We don't care about the result.
rows, err := tx.Query(ctx, getSQL, 1)
require.NoError(t, err)
rows.Close()

// Now, change the schema of the table out from under the statement, making it invalid.
_, err = tx.Exec(ctx, "ALTER TABLE drop_cols DROP COLUMN f1")
require.NoError(t, err)

// We must get an error the first time we try to re-execute a bad statement.
// It is up to the application to determine if it wants to try again. We punt to
// the application because there is no clear recovery path in the case of failed transactions
// or batch operations and because automatic retry is tricky and we don't want to get
// it wrong at such an importaint layer of the stack.
rows, err = tx.Query(ctx, getSQL, 1)
if !pgx.IsInvalidCachedStatementPlanError(err) {
if err == nil {
t.Fatal("expected InvalidCachedStatementPlanError: no error")
} else {
t.Fatalf("expected InvalidCachedStatementPlanError, got: %s", err.Error())
}
}
rows.Close()

rows, err = tx.Query(ctx, getSQL, 1)
require.NoError(t, err) // error does not pop up immediately
rows.Next()
err = rows.Err()
// Retries within the same transaction are errors (really anything except a rollbakc
// will be an error in this transaction).
require.Error(t, err)
rows.Close()

err = tx.Rollback(ctx)
require.NoError(t, err)

// once we've rolled back, retries will work
rows, err = conn.Query(ctx, getSQL, 1)
require.NoError(t, err)
rows.Next()
err = rows.Err()
require.NoError(t, err)
rows.Close()

ensureConnValid(t, conn)
}
2 changes: 1 addition & 1 deletion copy_from.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"fmt"
"io"

"github.com/jackc/pgconn"
"github.com/ethanpailes/pgconn"
"github.com/jackc/pgio"
errors "golang.org/x/xerrors"
)
Expand Down
2 changes: 1 addition & 1 deletion copy_from_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"testing"
"time"

"github.com/jackc/pgconn"
"github.com/ethanpailes/pgconn"
"github.com/jackc/pgx/v4"
"github.com/stretchr/testify/require"
errors "golang.org/x/xerrors"
Expand Down
2 changes: 1 addition & 1 deletion doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ go.uber.org/zap, github.com/rs/zerolog, and the testing log are provided in the

Lower Level PostgreSQL Functionality

pgx is implemented on top of github.com/jackc/pgconn a lower level PostgreSQL driver. The Conn.PgConn() method can be
pgx is implemented on top of github.com/ethanpailes/pgconn a lower level PostgreSQL driver. The Conn.PgConn() method can be
used to access this lower layer.

PgBouncer
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.12
require (
github.com/cockroachdb/apd v1.1.0
github.com/gofrs/uuid v3.2.0+incompatible
github.com/jackc/pgconn v1.7.1
github.com/ethanpailes/pgconn v1.7.101
github.com/jackc/pgio v1.0.0
github.com/jackc/pgproto3/v2 v2.0.5
github.com/jackc/pgtype v1.6.1
Expand Down
Loading