From 875b779cec90fc8025f9ce828817e3c135c40abd Mon Sep 17 00:00:00 2001 From: Lars Kanis Date: Sun, 3 Oct 2021 21:27:33 +0200 Subject: [PATCH] Add support for pipeline mode of PostgreSQL-14 Fixes #401 --- ext/extconf.rb | 1 + ext/pg.c | 34 ++++++++++ ext/pg_connection.c | 131 +++++++++++++++++++++++++++++++++++++ ext/pg_result.c | 9 +++ spec/helpers.rb | 11 ++++ spec/pg/connection_spec.rb | 115 ++++++++++++++++++++++++++++++++ 6 files changed, 301 insertions(+) diff --git a/ext/extconf.rb b/ext/extconf.rb index 566e73116..a96950974 100755 --- a/ext/extconf.rb +++ b/ext/extconf.rb @@ -146,6 +146,7 @@ module PG have_func 'PQresultVerboseErrorMessage' # since PostgreSQL-9.6 have_func 'PQencryptPasswordConn' # since PostgreSQL-10 have_func 'PQresultMemorySize' # since PostgreSQL-12 +have_func 'PQenterPipelineMode' # since PostgreSQL-14 have_func 'timegm' have_func 'rb_gc_adjust_memory_usage' # since ruby-2.4 have_func 'rb_gc_mark_movable' # since ruby-2.7 diff --git a/ext/pg.c b/ext/pg.c index 18af87bcd..020604bcb 100644 --- a/ext/pg.c +++ b/ext/pg.c @@ -526,6 +526,19 @@ Init_pg_ext() /* Result#result_status constant - Single tuple from larger resultset. */ rb_define_const(rb_mPGconstants, "PGRES_SINGLE_TUPLE", INT2FIX(PGRES_SINGLE_TUPLE)); +#ifdef HAVE_PQENTERPIPELINEMODE + /* Result#result_status constant - The PG::Result represents a synchronization point in pipeline mode, requested by Connection#pipeline_sync. + * + * This status occurs only when pipeline mode has been selected. */ + rb_define_const(rb_mPGconstants, "PGRES_PIPELINE_SYNC", INT2FIX(PGRES_PIPELINE_SYNC)); + + /* Result#result_status constant - The PG::Result represents a pipeline that has received an error from the server. + * + * Connection#get_result must be called repeatedly, and each time it will return this status code until the end of the current pipeline, at which point it will return PG::PGRES_PIPELINE_SYNC and normal processing can resume. + */ + rb_define_const(rb_mPGconstants, "PGRES_PIPELINE_ABORTED", INT2FIX(PGRES_PIPELINE_ABORTED)); +#endif + /****** Result CONSTANTS: result error field codes ******/ /* Result#result_error_field argument constant @@ -645,6 +658,27 @@ Init_pg_ext() rb_define_const(rb_mPGconstants, "PG_DIAG_CONSTRAINT_NAME", INT2FIX(PG_DIAG_CONSTRAINT_NAME)); #endif +#ifdef HAVE_PQENTERPIPELINEMODE + /* Connection#pipeline_status constant + * + * The libpq connection is in pipeline mode. + */ + rb_define_const(rb_mPGconstants, "PQ_PIPELINE_ON", INT2FIX(PQ_PIPELINE_ON)); + + /* Connection#pipeline_status constant + * + * The libpq connection is not in pipeline mode. + */ + rb_define_const(rb_mPGconstants, "PQ_PIPELINE_OFF", INT2FIX(PQ_PIPELINE_OFF)); + + /* Connection#pipeline_status constant + * + * The libpq connection is in pipeline mode and an error occurred while processing the current pipeline. + * The aborted flag is cleared when PQgetResult returns a result of type PGRES_PIPELINE_SYNC. + */ + rb_define_const(rb_mPGconstants, "PQ_PIPELINE_ABORTED", INT2FIX(PQ_PIPELINE_ABORTED)); +#endif + /* Invalid OID constant */ rb_define_const(rb_mPGconstants, "INVALID_OID", INT2FIX(InvalidOid)); rb_define_const(rb_mPGconstants, "InvalidOid", INT2FIX(InvalidOid)); diff --git a/ext/pg_connection.c b/ext/pg_connection.c index 17db6ea14..08c8230f1 100644 --- a/ext/pg_connection.c +++ b/ext/pg_connection.c @@ -1564,6 +1564,9 @@ pgconn_describe_portal(self, stmt_name) * * +PGRES_NONFATAL_ERROR+ * * +PGRES_FATAL_ERROR+ * * +PGRES_COPY_BOTH+ + * * +PGRES_SINGLE_TUPLE+ + * * +PGRES_PIPELINE_SYNC+ + * * +PGRES_PIPELINE_ABORTED+ */ static VALUE pgconn_make_empty_pgresult(VALUE self, VALUE status) @@ -3561,6 +3564,126 @@ pgconn_ssl_attribute_names(VALUE self) #endif +#ifdef HAVE_PQENTERPIPELINEMODE +/* + * call-seq: + * conn.pipeline_status -> Integer + * + * Returns the current pipeline mode status of the libpq connection. + * + * PQpipelineStatus can return one of the following values: + * + * * PQ_PIPELINE_ON - The libpq connection is in pipeline mode. + * * PQ_PIPELINE_OFF - The libpq connection is not in pipeline mode. + * * PQ_PIPELINE_ABORTED - The libpq connection is in pipeline mode and an error occurred while processing the current pipeline. + * The aborted flag is cleared when PQgetResult returns a result of type PGRES_PIPELINE_SYNC. + * + * Available since PostgreSQL-14 + */ +static VALUE +pgconn_pipeline_status(VALUE self) +{ + int res = PQpipelineStatus(pg_get_pgconn(self)); + return INT2FIX(res); +} + + +/* + * call-seq: + * conn.enter_pipeline_mode -> nil + * + * Causes a connection to enter pipeline mode if it is currently idle or already in pipeline mode. + * + * Raises PG::Error and has no effect if the connection is not currently idle, i.e., it has a result ready, or it is waiting for more input from the server, etc. + * This function does not actually send anything to the server, it just changes the libpq connection state. + * + * Available since PostgreSQL-14 + */ +static VALUE +pgconn_enter_pipeline_mode(VALUE self) +{ + PGconn *conn = pg_get_pgconn(self); + int res = PQenterPipelineMode(conn); + if( res == 1 ) { + return Qnil; + } else { + rb_raise(rb_ePGerror, "%s", PQerrorMessage(conn)); + } +} + +/* + * call-seq: + * conn.exit_pipeline_mode -> nil + * + * Causes a connection to exit pipeline mode if it is currently in pipeline mode with an empty queue and no pending results. + * + * Takes no action if not in pipeline mode. + * Raises PG::Error if the current statement isn't finished processing, or PQgetResult has not been called to collect results from all previously sent query. + * + * Available since PostgreSQL-14 + */ +static VALUE +pgconn_exit_pipeline_mode(VALUE self) +{ + PGconn *conn = pg_get_pgconn(self); + int res = PQexitPipelineMode(conn); + if( res == 1 ) { + return Qnil; + } else { + rb_raise(rb_ePGerror, "%s", PQerrorMessage(conn)); + } +} + + +/* + * call-seq: + * conn.pipeline_sync -> nil + * + * Marks a synchronization point in a pipeline by sending a sync message and flushing the send buffer. + * This serves as the delimiter of an implicit transaction and an error recovery point; see Section 34.5.1.3 of the PostgreSQL documentation. + * + * Raises PG::Error if the connection is not in pipeline mode or sending a sync message failed. + * + * Available since PostgreSQL-14 + */ +static VALUE +pgconn_pipeline_sync(VALUE self) +{ + PGconn *conn = pg_get_pgconn(self); + int res = PQpipelineSync(conn); + if( res == 1 ) { + return Qnil; + } else { + rb_raise(rb_ePGerror, "%s", PQerrorMessage(conn)); + } +} + +/* + * call-seq: + * conn.pipeline_sync -> nil + * + * Sends a request for the server to flush its output buffer. + * + * The server flushes its output buffer automatically as a result of Connection#pipeline_sync being called, or on any request when not in pipeline mode. + * This function is useful to cause the server to flush its output buffer in pipeline mode without establishing a synchronization point. + * Note that the request is not itself flushed to the server automatically; use Connection#flush if necessary. + * + * Available since PostgreSQL-14 + */ +static VALUE +pgconn_send_flush_request(VALUE self) +{ + PGconn *conn = pg_get_pgconn(self); + int res = PQsendFlushRequest(conn); + if( res == 1 ) { + return Qnil; + } else { + rb_raise(rb_ePGerror, "%s", PQerrorMessage(conn)); + } +} + +#endif + /************************************************************************** * LARGE OBJECT SUPPORT **************************************************************************/ @@ -4414,6 +4537,14 @@ init_pg_connection() rb_define_method(rb_cPGconn, "ssl_attribute_names", pgconn_ssl_attribute_names, 0); #endif +#ifdef HAVE_PQENTERPIPELINEMODE + rb_define_method(rb_cPGconn, "pipeline_status", pgconn_pipeline_status, 0); + rb_define_method(rb_cPGconn, "enter_pipeline_mode", pgconn_enter_pipeline_mode, 0); + rb_define_method(rb_cPGconn, "exit_pipeline_mode", pgconn_exit_pipeline_mode, 0); + rb_define_method(rb_cPGconn, "pipeline_sync", pgconn_pipeline_sync, 0); + rb_define_method(rb_cPGconn, "send_flush_request", pgconn_send_flush_request, 0); +#endif + /****** PG::Connection INSTANCE METHODS: Large Object Support ******/ rb_define_method(rb_cPGconn, "lo_creat", pgconn_locreat, -1); rb_define_alias(rb_cPGconn, "locreat", "lo_creat"); diff --git a/ext/pg_result.c b/ext/pg_result.c index ff6820248..041f8974c 100644 --- a/ext/pg_result.c +++ b/ext/pg_result.c @@ -318,10 +318,16 @@ pg_result_check( VALUE self ) case PGRES_SINGLE_TUPLE: case PGRES_EMPTY_QUERY: case PGRES_COMMAND_OK: +#ifdef HAVE_PQENTERPIPELINEMODE + case PGRES_PIPELINE_SYNC: +#endif return self; case PGRES_BAD_RESPONSE: case PGRES_FATAL_ERROR: case PGRES_NONFATAL_ERROR: +#ifdef HAVE_PQENTERPIPELINEMODE + case PGRES_PIPELINE_ABORTED: +#endif error = rb_str_new2( PQresultErrorMessage(this->pgresult) ); break; default: @@ -518,6 +524,9 @@ static void pgresult_init_fnames(VALUE self) * * +PGRES_NONFATAL_ERROR+ * * +PGRES_FATAL_ERROR+ * * +PGRES_COPY_BOTH+ + * * +PGRES_SINGLE_TUPLE+ + * * +PGRES_PIPELINE_SYNC+ + * * +PGRES_PIPELINE_ABORTED+ */ static VALUE pgresult_result_status(VALUE self) diff --git a/spec/helpers.rb b/spec/helpers.rb index 83f106845..032defa9b 100644 --- a/spec/helpers.rb +++ b/spec/helpers.rb @@ -33,6 +33,17 @@ def self::included( mod ) [@conn.escape_string(desc.slice(-60))] example.run ensure + if @conn.respond_to?(:exit_pipeline_mode) && + @conn.pipeline_status != PG::PQ_PIPELINE_OFF + @conn.pipeline_sync + # Fetch results until two successive nil's + loop do + unless @conn.get_result + break unless @conn.get_result + end + end + @conn.exit_pipeline_mode + end @conn.exec( 'ROLLBACK' ) unless example.metadata[:without_transaction] end end diff --git a/spec/pg/connection_spec.rb b/spec/pg/connection_spec.rb index 3a311f29d..dc4bd1974 100644 --- a/spec/pg/connection_spec.rb +++ b/spec/pg/connection_spec.rb @@ -1528,6 +1528,121 @@ def interrupt_thread(exc=nil) end + context "pipeline mode", :postgresql_14 do + + describe "pipeline_status" do + it "can enter and exit the pipeline mode" do + @conn.enter_pipeline_mode + expect( @conn.pipeline_status ).to eq( PG::PQ_PIPELINE_ON ) + @conn.exit_pipeline_mode + expect( @conn.pipeline_status ).to eq( PG::PQ_PIPELINE_OFF ) + end + end + + describe "enter_pipeline_mode" do + it "does nothing if already in pipeline mode" do + @conn.enter_pipeline_mode + @conn.enter_pipeline_mode + expect( @conn.pipeline_status ).to eq( PG::PQ_PIPELINE_ON ) + end + + it "raises an error when called with pending results" do + @conn.send_query "select 1" + expect { + @conn.enter_pipeline_mode + }.to raise_error(PG::Error) + @conn.get_last_result + end + end + + describe "exit_pipeline_mode" do + it "does nothing if not in pipeline mode" do + @conn.exit_pipeline_mode + expect( @conn.pipeline_status ).to eq( PG::PQ_PIPELINE_OFF ) + end + + it "raises an error when called with pending results" do + @conn.enter_pipeline_mode + @conn.send_query "select 1" + expect { + @conn.exit_pipeline_mode + }.to raise_error(PG::Error) + @conn.send_flush_request + @conn.get_last_result + end + end + + describe "pipeline_sync" do + it "sends a sync message" do + @conn.enter_pipeline_mode + @conn.send_query "select 6" + @conn.pipeline_sync + expect( @conn.get_result.result_status ).to eq( PG::PGRES_TUPLES_OK ) + expect( @conn.get_result ).to be_nil + expect( @conn.get_result.result_status ).to eq( PG::PGRES_PIPELINE_SYNC ) + expect( @conn.get_result ).to be_nil + expect( @conn.get_result ).to be_nil + @conn.exit_pipeline_mode + end + + it "raises an error when not in pipeline mode" do + expect { + @conn.pipeline_sync + }.to raise_error(PG::Error) + end + end + + describe "send_flush_request" do + it "flushs all results" do + @conn.enter_pipeline_mode + @conn.send_query "select 1" + @conn.send_flush_request + @conn.flush + expect( @conn.get_result.result_status ).to eq( PG::PGRES_TUPLES_OK ) + expect( @conn.get_result ).to be_nil + expect( @conn.get_result ).to be_nil + end + + it "raises an error when called with pending results" do + @conn.send_query "select 1" + expect { + @conn.send_flush_request + }.to raise_error(PG::Error) + end + end + + describe "get_last_result" do + it "delivers PGRES_PIPELINE_SYNC" do + @conn.enter_pipeline_mode + @conn.send_query "select 6" + @conn.pipeline_sync + expect( @conn.get_last_result.values ).to eq( [["6"]] ) + expect( @conn.get_last_result.result_status ).to eq( PG::PGRES_PIPELINE_SYNC ) + @conn.exit_pipeline_mode + end + + it "raises an error for PGRES_PIPELINE_ABORT" do + @conn.enter_pipeline_mode + @conn.send_query("garbage") + @conn.send_query("SELECT 7") + @conn.pipeline_sync + begin + @conn.get_last_result + rescue PG::SyntaxError => err1 + end + expect( err1.result.result_status ).to eq( PG::PGRES_FATAL_ERROR ) + begin + @conn.get_last_result + rescue PG::UnableToSend => err2 + end + expect( err2.result.result_status ).to eq( PG::PGRES_PIPELINE_ABORTED ) + expect( @conn.pipeline_status ).to eq( PG::PQ_PIPELINE_ABORTED ) + expect( @conn.get_last_result.result_status ).to eq( PG::PGRES_PIPELINE_SYNC ) + @conn.exit_pipeline_mode + end + end + end + context "multinationalization support" do describe "rubyforge #22925: m17n support" do