Skip to content

Commit

Permalink
Add support for pipeline mode of PostgreSQL-14
Browse files Browse the repository at this point in the history
Fixes ged#401
  • Loading branch information
larskanis committed Oct 4, 2021
1 parent 5895a37 commit 875b779
Show file tree
Hide file tree
Showing 6 changed files with 301 additions and 0 deletions.
1 change: 1 addition & 0 deletions ext/extconf.rb
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ module PG
have_func 'PQresultVerboseErrorMessage' # since PostgreSQL-9.6
have_func 'PQencryptPasswordConn' # since PostgreSQL-10
have_func 'PQresultMemorySize' # since PostgreSQL-12
have_func 'PQenterPipelineMode' # since PostgreSQL-14
have_func 'timegm'
have_func 'rb_gc_adjust_memory_usage' # since ruby-2.4
have_func 'rb_gc_mark_movable' # since ruby-2.7
Expand Down
34 changes: 34 additions & 0 deletions ext/pg.c
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,19 @@ Init_pg_ext()
/* Result#result_status constant - Single tuple from larger resultset. */
rb_define_const(rb_mPGconstants, "PGRES_SINGLE_TUPLE", INT2FIX(PGRES_SINGLE_TUPLE));

#ifdef HAVE_PQENTERPIPELINEMODE
/* Result#result_status constant - The PG::Result represents a synchronization point in pipeline mode, requested by Connection#pipeline_sync.
*
* This status occurs only when pipeline mode has been selected. */
rb_define_const(rb_mPGconstants, "PGRES_PIPELINE_SYNC", INT2FIX(PGRES_PIPELINE_SYNC));

/* Result#result_status constant - The PG::Result represents a pipeline that has received an error from the server.
*
* Connection#get_result must be called repeatedly, and each time it will return this status code until the end of the current pipeline, at which point it will return PG::PGRES_PIPELINE_SYNC and normal processing can resume.
*/
rb_define_const(rb_mPGconstants, "PGRES_PIPELINE_ABORTED", INT2FIX(PGRES_PIPELINE_ABORTED));
#endif

/****** Result CONSTANTS: result error field codes ******/

/* Result#result_error_field argument constant
Expand Down Expand Up @@ -645,6 +658,27 @@ Init_pg_ext()
rb_define_const(rb_mPGconstants, "PG_DIAG_CONSTRAINT_NAME", INT2FIX(PG_DIAG_CONSTRAINT_NAME));
#endif

#ifdef HAVE_PQENTERPIPELINEMODE
/* Connection#pipeline_status constant
*
* The libpq connection is in pipeline mode.
*/
rb_define_const(rb_mPGconstants, "PQ_PIPELINE_ON", INT2FIX(PQ_PIPELINE_ON));

/* Connection#pipeline_status constant
*
* The libpq connection is not in pipeline mode.
*/
rb_define_const(rb_mPGconstants, "PQ_PIPELINE_OFF", INT2FIX(PQ_PIPELINE_OFF));

/* Connection#pipeline_status constant
*
* The libpq connection is in pipeline mode and an error occurred while processing the current pipeline.
* The aborted flag is cleared when PQgetResult returns a result of type PGRES_PIPELINE_SYNC.
*/
rb_define_const(rb_mPGconstants, "PQ_PIPELINE_ABORTED", INT2FIX(PQ_PIPELINE_ABORTED));
#endif

/* Invalid OID constant */
rb_define_const(rb_mPGconstants, "INVALID_OID", INT2FIX(InvalidOid));
rb_define_const(rb_mPGconstants, "InvalidOid", INT2FIX(InvalidOid));
Expand Down
131 changes: 131 additions & 0 deletions ext/pg_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -1564,6 +1564,9 @@ pgconn_describe_portal(self, stmt_name)
* * +PGRES_NONFATAL_ERROR+
* * +PGRES_FATAL_ERROR+
* * +PGRES_COPY_BOTH+
* * +PGRES_SINGLE_TUPLE+
* * +PGRES_PIPELINE_SYNC+
* * +PGRES_PIPELINE_ABORTED+
*/
static VALUE
pgconn_make_empty_pgresult(VALUE self, VALUE status)
Expand Down Expand Up @@ -3561,6 +3564,126 @@ pgconn_ssl_attribute_names(VALUE self)
#endif


#ifdef HAVE_PQENTERPIPELINEMODE
/*
* call-seq:
* conn.pipeline_status -> Integer
*
* Returns the current pipeline mode status of the libpq connection.
*
* PQpipelineStatus can return one of the following values:
*
* * PQ_PIPELINE_ON - The libpq connection is in pipeline mode.
* * PQ_PIPELINE_OFF - The libpq connection is not in pipeline mode.
* * PQ_PIPELINE_ABORTED - The libpq connection is in pipeline mode and an error occurred while processing the current pipeline.
* The aborted flag is cleared when PQgetResult returns a result of type PGRES_PIPELINE_SYNC.
*
* Available since PostgreSQL-14
*/
static VALUE
pgconn_pipeline_status(VALUE self)
{
int res = PQpipelineStatus(pg_get_pgconn(self));
return INT2FIX(res);
}


/*
* call-seq:
* conn.enter_pipeline_mode -> nil
*
* Causes a connection to enter pipeline mode if it is currently idle or already in pipeline mode.
*
* Raises PG::Error and has no effect if the connection is not currently idle, i.e., it has a result ready, or it is waiting for more input from the server, etc.
* This function does not actually send anything to the server, it just changes the libpq connection state.
*
* Available since PostgreSQL-14
*/
static VALUE
pgconn_enter_pipeline_mode(VALUE self)
{
PGconn *conn = pg_get_pgconn(self);
int res = PQenterPipelineMode(conn);
if( res == 1 ) {
return Qnil;
} else {
rb_raise(rb_ePGerror, "%s", PQerrorMessage(conn));
}
}

/*
* call-seq:
* conn.exit_pipeline_mode -> nil
*
* Causes a connection to exit pipeline mode if it is currently in pipeline mode with an empty queue and no pending results.
*
* Takes no action if not in pipeline mode.
* Raises PG::Error if the current statement isn't finished processing, or PQgetResult has not been called to collect results from all previously sent query.
*
* Available since PostgreSQL-14
*/
static VALUE
pgconn_exit_pipeline_mode(VALUE self)
{
PGconn *conn = pg_get_pgconn(self);
int res = PQexitPipelineMode(conn);
if( res == 1 ) {
return Qnil;
} else {
rb_raise(rb_ePGerror, "%s", PQerrorMessage(conn));
}
}


/*
* call-seq:
* conn.pipeline_sync -> nil
*
* Marks a synchronization point in a pipeline by sending a sync message and flushing the send buffer.
* This serves as the delimiter of an implicit transaction and an error recovery point; see Section 34.5.1.3 of the PostgreSQL documentation.
*
* Raises PG::Error if the connection is not in pipeline mode or sending a sync message failed.
*
* Available since PostgreSQL-14
*/
static VALUE
pgconn_pipeline_sync(VALUE self)
{
PGconn *conn = pg_get_pgconn(self);
int res = PQpipelineSync(conn);
if( res == 1 ) {
return Qnil;
} else {
rb_raise(rb_ePGerror, "%s", PQerrorMessage(conn));
}
}

/*
* call-seq:
* conn.pipeline_sync -> nil
*
* Sends a request for the server to flush its output buffer.
*
* The server flushes its output buffer automatically as a result of Connection#pipeline_sync being called, or on any request when not in pipeline mode.
* This function is useful to cause the server to flush its output buffer in pipeline mode without establishing a synchronization point.
* Note that the request is not itself flushed to the server automatically; use Connection#flush if necessary.
*
* Available since PostgreSQL-14
*/
static VALUE
pgconn_send_flush_request(VALUE self)
{
PGconn *conn = pg_get_pgconn(self);
int res = PQsendFlushRequest(conn);
if( res == 1 ) {
return Qnil;
} else {
rb_raise(rb_ePGerror, "%s", PQerrorMessage(conn));
}
}

#endif

/**************************************************************************
* LARGE OBJECT SUPPORT
**************************************************************************/
Expand Down Expand Up @@ -4414,6 +4537,14 @@ init_pg_connection()
rb_define_method(rb_cPGconn, "ssl_attribute_names", pgconn_ssl_attribute_names, 0);
#endif

#ifdef HAVE_PQENTERPIPELINEMODE
rb_define_method(rb_cPGconn, "pipeline_status", pgconn_pipeline_status, 0);
rb_define_method(rb_cPGconn, "enter_pipeline_mode", pgconn_enter_pipeline_mode, 0);
rb_define_method(rb_cPGconn, "exit_pipeline_mode", pgconn_exit_pipeline_mode, 0);
rb_define_method(rb_cPGconn, "pipeline_sync", pgconn_pipeline_sync, 0);
rb_define_method(rb_cPGconn, "send_flush_request", pgconn_send_flush_request, 0);
#endif

/****** PG::Connection INSTANCE METHODS: Large Object Support ******/
rb_define_method(rb_cPGconn, "lo_creat", pgconn_locreat, -1);
rb_define_alias(rb_cPGconn, "locreat", "lo_creat");
Expand Down
9 changes: 9 additions & 0 deletions ext/pg_result.c
Original file line number Diff line number Diff line change
Expand Up @@ -318,10 +318,16 @@ pg_result_check( VALUE self )
case PGRES_SINGLE_TUPLE:
case PGRES_EMPTY_QUERY:
case PGRES_COMMAND_OK:
#ifdef HAVE_PQENTERPIPELINEMODE
case PGRES_PIPELINE_SYNC:
#endif
return self;
case PGRES_BAD_RESPONSE:
case PGRES_FATAL_ERROR:
case PGRES_NONFATAL_ERROR:
#ifdef HAVE_PQENTERPIPELINEMODE
case PGRES_PIPELINE_ABORTED:
#endif
error = rb_str_new2( PQresultErrorMessage(this->pgresult) );
break;
default:
Expand Down Expand Up @@ -518,6 +524,9 @@ static void pgresult_init_fnames(VALUE self)
* * +PGRES_NONFATAL_ERROR+
* * +PGRES_FATAL_ERROR+
* * +PGRES_COPY_BOTH+
* * +PGRES_SINGLE_TUPLE+
* * +PGRES_PIPELINE_SYNC+
* * +PGRES_PIPELINE_ABORTED+
*/
static VALUE
pgresult_result_status(VALUE self)
Expand Down
11 changes: 11 additions & 0 deletions spec/helpers.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,17 @@ def self::included( mod )
[@conn.escape_string(desc.slice(-60))]
example.run
ensure
if @conn.respond_to?(:exit_pipeline_mode) &&
@conn.pipeline_status != PG::PQ_PIPELINE_OFF
@conn.pipeline_sync
# Fetch results until two successive nil's
loop do
unless @conn.get_result
break unless @conn.get_result
end
end
@conn.exit_pipeline_mode
end
@conn.exec( 'ROLLBACK' ) unless example.metadata[:without_transaction]
end
end
Expand Down
115 changes: 115 additions & 0 deletions spec/pg/connection_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1528,6 +1528,121 @@ def interrupt_thread(exc=nil)

end

context "pipeline mode", :postgresql_14 do

describe "pipeline_status" do
it "can enter and exit the pipeline mode" do
@conn.enter_pipeline_mode
expect( @conn.pipeline_status ).to eq( PG::PQ_PIPELINE_ON )
@conn.exit_pipeline_mode
expect( @conn.pipeline_status ).to eq( PG::PQ_PIPELINE_OFF )
end
end

describe "enter_pipeline_mode" do
it "does nothing if already in pipeline mode" do
@conn.enter_pipeline_mode
@conn.enter_pipeline_mode
expect( @conn.pipeline_status ).to eq( PG::PQ_PIPELINE_ON )
end

it "raises an error when called with pending results" do
@conn.send_query "select 1"
expect {
@conn.enter_pipeline_mode
}.to raise_error(PG::Error)
@conn.get_last_result
end
end

describe "exit_pipeline_mode" do
it "does nothing if not in pipeline mode" do
@conn.exit_pipeline_mode
expect( @conn.pipeline_status ).to eq( PG::PQ_PIPELINE_OFF )
end

it "raises an error when called with pending results" do
@conn.enter_pipeline_mode
@conn.send_query "select 1"
expect {
@conn.exit_pipeline_mode
}.to raise_error(PG::Error)
@conn.send_flush_request
@conn.get_last_result
end
end

describe "pipeline_sync" do
it "sends a sync message" do
@conn.enter_pipeline_mode
@conn.send_query "select 6"
@conn.pipeline_sync
expect( @conn.get_result.result_status ).to eq( PG::PGRES_TUPLES_OK )
expect( @conn.get_result ).to be_nil
expect( @conn.get_result.result_status ).to eq( PG::PGRES_PIPELINE_SYNC )
expect( @conn.get_result ).to be_nil
expect( @conn.get_result ).to be_nil
@conn.exit_pipeline_mode
end

it "raises an error when not in pipeline mode" do
expect {
@conn.pipeline_sync
}.to raise_error(PG::Error)
end
end

describe "send_flush_request" do
it "flushs all results" do
@conn.enter_pipeline_mode
@conn.send_query "select 1"
@conn.send_flush_request
@conn.flush
expect( @conn.get_result.result_status ).to eq( PG::PGRES_TUPLES_OK )
expect( @conn.get_result ).to be_nil
expect( @conn.get_result ).to be_nil
end

it "raises an error when called with pending results" do
@conn.send_query "select 1"
expect {
@conn.send_flush_request
}.to raise_error(PG::Error)
end
end

describe "get_last_result" do
it "delivers PGRES_PIPELINE_SYNC" do
@conn.enter_pipeline_mode
@conn.send_query "select 6"
@conn.pipeline_sync
expect( @conn.get_last_result.values ).to eq( [["6"]] )
expect( @conn.get_last_result.result_status ).to eq( PG::PGRES_PIPELINE_SYNC )
@conn.exit_pipeline_mode
end

it "raises an error for PGRES_PIPELINE_ABORT" do
@conn.enter_pipeline_mode
@conn.send_query("garbage")
@conn.send_query("SELECT 7")
@conn.pipeline_sync
begin
@conn.get_last_result
rescue PG::SyntaxError => err1
end
expect( err1.result.result_status ).to eq( PG::PGRES_FATAL_ERROR )
begin
@conn.get_last_result
rescue PG::UnableToSend => err2
end
expect( err2.result.result_status ).to eq( PG::PGRES_PIPELINE_ABORTED )
expect( @conn.pipeline_status ).to eq( PG::PQ_PIPELINE_ABORTED )
expect( @conn.get_last_result.result_status ).to eq( PG::PGRES_PIPELINE_SYNC )
@conn.exit_pipeline_mode
end
end
end

context "multinationalization support" do

describe "rubyforge #22925: m17n support" do
Expand Down

0 comments on commit 875b779

Please sign in to comment.