diff --git a/conn.go b/conn.go index 4362eef6b..5eb76ad03 100644 --- a/conn.go +++ b/conn.go @@ -652,6 +652,13 @@ optionLoop: rows.resultReader = c.pgConn.ExecParams(ctx, sql, c.eqb.paramValues, sd.ParamOIDs, c.eqb.paramFormats, resultFormats) } else { rows.resultReader = c.pgConn.ExecPrepared(ctx, sd.Name, c.eqb.paramValues, c.eqb.paramFormats, resultFormats) + readerErr := rows.resultReader.Err() + if readerErr != nil { + err = c.stmtcache.StatementErrored(ctx, sql, readerErr) + if err != nil { + rows.err = errors.Errorf("cleaning up from '%s': %s", readerErr.Error(), err.Error()) + } + } } return rows, rows.err diff --git a/conn_test.go b/conn_test.go index 8a604acd9..d0c33a385 100644 --- a/conn_test.go +++ b/conn_test.go @@ -879,3 +879,131 @@ func TestDomainType(t *testing.T) { } }) } + +func TestStmtCacheInvalidationConn(t *testing.T) { + ctx := context.Background() + + conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE")) + defer closeConn(t, conn) + + // create a table and fill it with some data + _, err := conn.Exec(ctx, ` + DROP TABLE IF EXISTS drop_cols; + CREATE TABLE drop_cols ( + id SERIAL PRIMARY KEY NOT NULL, + f1 int NOT NULL, + f2 int NOT NULL + ); + `) + require.NoError(t, err) + _, err = conn.Exec(ctx, "INSERT INTO drop_cols (f1, f2) VALUES (1, 2)") + require.NoError(t, err) + + getSQL := "SELECT * FROM drop_cols WHERE id = $1" + + // This query will populate the statement cache. We don't care about the result. + rows, err := conn.Query(ctx, getSQL, 1) + require.NoError(t, err) + rows.Close() + + // Now, change the schema of the table out from under the statement, making it invalid. + _, err = conn.Exec(ctx, "ALTER TABLE drop_cols DROP COLUMN f1") + require.NoError(t, err) + + // We must get an error the first time we try to re-execute a bad statement. + // It is up to the application to determine if it wants to try again. We punt to + // the application because there is no clear recovery path in the case of failed transactions + // or batch operations and because automatic retry is tricky and we don't want to get + // it wrong at such an importaint layer of the stack. + rows, err = conn.Query(ctx, getSQL, 1) + if err == nil { + t.Fatal("expected InvalidCachedStatementPlanError: no error") + } + if !strings.Contains(err.Error(), "cached plan must not change result type") { + t.Fatalf("expected InvalidCachedStatementPlanError, got: %s", err.Error()) + } + rows.Close() + + // On retry, the statement should have been flushed from the cache. + rows, err = conn.Query(ctx, getSQL, 1) + require.NoError(t, err) + rows.Next() + err = rows.Err() + require.NoError(t, err) + rows.Close() + + ensureConnValid(t, conn) +} + +func TestStmtCacheInvalidationTx(t *testing.T) { + ctx := context.Background() + + conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE")) + defer closeConn(t, conn) + + // create a table and fill it with some data + _, err := conn.Exec(ctx, ` + DROP TABLE IF EXISTS drop_cols; + CREATE TABLE drop_cols ( + id SERIAL PRIMARY KEY NOT NULL, + f1 int NOT NULL, + f2 int NOT NULL + ); + `) + require.NoError(t, err) + _, err = conn.Exec(ctx, "INSERT INTO drop_cols (f1, f2) VALUES (1, 2)") + require.NoError(t, err) + + tx, err := conn.Begin(ctx) + require.NoError(t, err) + + getSQL := "SELECT * FROM drop_cols WHERE id = $1" + + // This query will populate the statement cache. We don't care about the result. + rows, err := tx.Query(ctx, getSQL, 1) + require.NoError(t, err) + rows.Close() + + // Now, change the schema of the table out from under the statement, making it invalid. + _, err = tx.Exec(ctx, "ALTER TABLE drop_cols DROP COLUMN f1") + require.NoError(t, err) + + // We must get an error the first time we try to re-execute a bad statement. + // It is up to the application to determine if it wants to try again. We punt to + // the application because there is no clear recovery path in the case of failed transactions + // or batch operations and because automatic retry is tricky and we don't want to get + // it wrong at such an importaint layer of the stack. + rows, err = tx.Query(ctx, getSQL, 1) + require.NoError(t, err) + rows.Next() + err = rows.Err() + if err == nil { + t.Fatal("expected InvalidCachedStatementPlanError: no error") + } + if !strings.Contains(err.Error(), "cached plan must not change result type") { + t.Fatalf("expected InvalidCachedStatementPlanError, got: %s", err.Error()) + } + rows.Close() + + rows, err = tx.Query(ctx, getSQL, 1) + require.NoError(t, err) // error does not pop up immediately + rows.Next() + err = rows.Err() + // Retries within the same transaction are errors (really anything except a rollbakc + // will be an error in this transaction). + require.Error(t, err) + rows.Close() + + err = tx.Rollback(ctx) + require.NoError(t, err) + + // once we've rolled back, retries will work + rows, err = conn.Query(ctx, getSQL, 1) + require.NoError(t, err) + rows.Next() + err = rows.Err() + require.NoError(t, err) + rows.Close() + + ensureConnValid(t, conn) +}