Skip to content

Commit

Permalink
libpq: Improve idle state handling in pipeline mode
Browse files Browse the repository at this point in the history
We were going into IDLE state too soon when executing queries via
PQsendQuery in pipeline mode, causing several scenarios to misbehave in
different ways -- most notably, as reported by Daniele Varrazzo, that a
warning message is produced by libpq:
  message type 0x33 arrived from server while idle
But it is also possible, if queries are sent and results consumed not in
lockstep, for the expected mediating NULL result values from PQgetResult
to be lost (a problem which has not been reported, but which is more
serious).

Fix this by introducing two new concepts: one is a command queue element
PGQUERY_CLOSE to tell libpq to wait for the CloseComplete server
response to the Close message that is sent by PQsendQuery.  Because the
application is not expecting any PGresult from this, the mechanism to
consume it is a bit hackish.

The other concept, authored by Horiguchi-san, is a PGASYNC_PIPELINE_IDLE
state for libpq's state machine to differentiate "really idle" from
merely "the idle state that occurs in between reading results from the
server for elements in the pipeline".  This makes libpq not go fully
IDLE when the libpq command queue contains entries; in normal cases, we
only go IDLE once at the end of the pipeline, when the server response
to the final SYNC message is received.  (However, there are corner cases
it doesn't fix, such as terminating the query sequence by
PQsendFlushRequest instead of PQpipelineSync; this sort of scenario is
what requires PGQUERY_CLOSE bit above.)

This last bit helps make the libpq state machine clearer; in particular
we can get rid of an ugly hack in pqParseInput3 to avoid considering
IDLE as such when the command queue contains entries.

A new test mode is added to libpq_pipeline.c to tickle some related
problematic cases.

Reported-by: Daniele Varrazzo <[email protected]>
Co-authored-by: Kyotaro Horiguchi <[email protected]>
Discussion: https://postgr.es/m/CA+mi_8bvD0_CW3sumgwPvWdNzXY32itoG_16tDYRu_1S2gV2iw@mail.gmail.com
(cherry picked from commit 054325c5eeb3140a067ba66735c3d811163ecd6a)
  • Loading branch information
alvherre authored and kuntalghosh committed Nov 28, 2023
1 parent 1191db6 commit 3589139
Show file tree
Hide file tree
Showing 6 changed files with 424 additions and 35 deletions.
112 changes: 96 additions & 16 deletions src/interfaces/libpq/fe-exec.c
Original file line number Diff line number Diff line change
Expand Up @@ -1376,7 +1376,8 @@ pqAppendCmdQueueEntry(PGconn *conn, PGcmdQueueEntry *entry)
* itself consume commands from the queue; if we're in any other
* state, we don't have to do anything.
*/
if (conn->asyncStatus == PGASYNC_IDLE)
if (conn->asyncStatus == PGASYNC_IDLE ||
conn->asyncStatus == PGASYNC_PIPELINE_IDLE)
pqPipelineProcessQueue(conn);
break;
}
Expand Down Expand Up @@ -1432,6 +1433,7 @@ static int
PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
{
PGcmdQueueEntry *entry = NULL;
PGcmdQueueEntry *entry2 = NULL;

if (!PQsendQueryStart(conn, newQuery))
return 0;
Expand All @@ -1447,6 +1449,12 @@ PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
entry = pqAllocCmdQueueEntry(conn);
if (entry == NULL)
return 0; /* error msg already set */
if (conn->pipelineStatus != PQ_PIPELINE_OFF)
{
entry2 = pqAllocCmdQueueEntry(conn);
if (entry2 == NULL)
goto sendFailed;
}

/* Send the query message(s) */
if (conn->pipelineStatus == PQ_PIPELINE_OFF)
Expand Down Expand Up @@ -1516,6 +1524,20 @@ PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)

/* OK, it's launched! */
pqAppendCmdQueueEntry(conn, entry);

/*
* When pipeline mode is in use, we need a second entry in the command
* queue to represent Close Portal message. This allows us later to wait
* for the CloseComplete message to be received before getting in IDLE
* state.
*/
if (conn->pipelineStatus != PQ_PIPELINE_OFF)
{
entry2->queryclass = PGQUERY_CLOSE;
entry2->query = NULL;
pqAppendCmdQueueEntry(conn, entry2);
}

return 1;

sendFailed:
Expand Down Expand Up @@ -1763,11 +1785,13 @@ PQsendQueryStart(PGconn *conn, bool newQuery)
switch (conn->asyncStatus)
{
case PGASYNC_IDLE:
case PGASYNC_PIPELINE_IDLE:
case PGASYNC_READY:
case PGASYNC_READY_MORE:
case PGASYNC_BUSY:
/* ok to queue */
break;

case PGASYNC_COPY_IN:
case PGASYNC_COPY_OUT:
case PGASYNC_COPY_BOTH:
Expand Down Expand Up @@ -2140,16 +2164,21 @@ PQgetResult(PGconn *conn)
{
case PGASYNC_IDLE:
res = NULL; /* query is complete */
if (conn->pipelineStatus != PQ_PIPELINE_OFF)
{
/*
* We're about to return the NULL that terminates the round of
* results from the current query; prepare to send the results
* of the next query when we're called next.
*/
pqPipelineProcessQueue(conn);
}
break;
case PGASYNC_PIPELINE_IDLE:
Assert(conn->pipelineStatus != PQ_PIPELINE_OFF);

/*
* We're about to return the NULL that terminates the round of
* results from the current query; prepare to send the results
* of the next query, if any, when we're called next. If there's
* no next element in the command queue, this gets us in IDLE
* state.
*/
pqPipelineProcessQueue(conn);
res = NULL; /* query is complete */
break;

case PGASYNC_READY:

/*
Expand All @@ -2170,7 +2199,7 @@ PQgetResult(PGconn *conn)
* We're about to send the results of the current query. Set
* us idle now, and ...
*/
conn->asyncStatus = PGASYNC_IDLE;
conn->asyncStatus = PGASYNC_PIPELINE_IDLE;

/*
* ... in cases when we're sending a pipeline-sync result,
Expand Down Expand Up @@ -2216,6 +2245,22 @@ PQgetResult(PGconn *conn)
break;
}

/* If the next command we expect is CLOSE, read and consume it */
if (conn->asyncStatus == PGASYNC_PIPELINE_IDLE &&
conn->cmd_queue_head &&
conn->cmd_queue_head->queryclass == PGQUERY_CLOSE)
{
if (res && res->resultStatus != PGRES_FATAL_ERROR)
{
conn->asyncStatus = PGASYNC_BUSY;
parseInput(conn);
conn->asyncStatus = PGASYNC_PIPELINE_IDLE;
}
else
/* we won't ever see the Close */
pqCommandQueueAdvance(conn);
}

/* Time to fire PGEVT_RESULTCREATE events, if there are any */
if (res && res->nEvents > 0)
(void) PQfireResultCreateEvents(conn, res);
Expand Down Expand Up @@ -3009,7 +3054,10 @@ PQexitPipelineMode(PGconn *conn)
if (!conn)
return 0;

if (conn->pipelineStatus == PQ_PIPELINE_OFF)
if (conn->pipelineStatus == PQ_PIPELINE_OFF &&
(conn->asyncStatus == PGASYNC_IDLE ||
conn->asyncStatus == PGASYNC_PIPELINE_IDLE) &&
conn->cmd_queue_head == NULL)
return 1;

switch (conn->asyncStatus)
Expand All @@ -3026,9 +3074,16 @@ PQexitPipelineMode(PGconn *conn)
libpq_gettext("cannot exit pipeline mode while busy\n"));
return 0;

default:
case PGASYNC_IDLE:
case PGASYNC_PIPELINE_IDLE:
/* OK */
break;

case PGASYNC_COPY_IN:
case PGASYNC_COPY_OUT:
case PGASYNC_COPY_BOTH:
appendPQExpBufferStr(&conn->errorMessage,
libpq_gettext("cannot exit pipeline mode while in COPY\n"));
}

/* still work to process */
Expand Down Expand Up @@ -3065,6 +3120,10 @@ pqCommandQueueAdvance(PGconn *conn)
prevquery = conn->cmd_queue_head;
conn->cmd_queue_head = conn->cmd_queue_head->next;

/* If the queue is now empty, reset the tail too */
if (conn->cmd_queue_head == NULL)
conn->cmd_queue_tail = NULL;

/* and make it recyclable */
prevquery->next = NULL;
pqRecycleCmdQueueEntry(conn, prevquery);
Expand All @@ -3087,15 +3146,35 @@ pqPipelineProcessQueue(PGconn *conn)
case PGASYNC_BUSY:
/* client still has to process current query or results */
return;

case PGASYNC_IDLE:
/*
* If we're in IDLE mode and there's some command in the queue,
* get us into PIPELINE_IDLE mode and process normally. Otherwise
* there's nothing for us to do.
*/
if (conn->cmd_queue_head != NULL)
{
conn->asyncStatus = PGASYNC_PIPELINE_IDLE;
break;
}
return;

case PGASYNC_PIPELINE_IDLE:
Assert(conn->pipelineStatus != PQ_PIPELINE_OFF);
/* next query please */
break;
}

/* Nothing to do if not in pipeline mode, or queue is empty */
if (conn->pipelineStatus == PQ_PIPELINE_OFF ||
conn->cmd_queue_head == NULL)
/*
* If there are no further commands to process in the queue, get us in
* "real idle" mode now.
*/
if (conn->cmd_queue_head == NULL)
{
conn->asyncStatus = PGASYNC_IDLE;
return;
}

/*
* Reset the error state. This and the next couple of steps correspond to
Expand Down Expand Up @@ -3188,6 +3267,7 @@ PQpipelineSync(PGconn *conn)
case PGASYNC_READY_MORE:
case PGASYNC_BUSY:
case PGASYNC_IDLE:
case PGASYNC_PIPELINE_IDLE:
/* OK to send sync */
break;
}
Expand Down
30 changes: 17 additions & 13 deletions src/interfaces/libpq/fe-protocol3.c
Original file line number Diff line number Diff line change
Expand Up @@ -158,18 +158,6 @@ pqParseInput3(PGconn *conn)
if (conn->asyncStatus != PGASYNC_IDLE)
return;

/*
* We're also notionally not-IDLE when in pipeline mode the state
* says "idle" (so we have completed receiving the results of one
* query from the server and dispatched them to the application)
* but another query is queued; yield back control to caller so
* that they can initiate processing of the next query in the
* queue.
*/
if (conn->pipelineStatus != PQ_PIPELINE_OFF &&
conn->cmd_queue_head != NULL)
return;

/*
* Unexpected message in IDLE state; need to recover somehow.
* ERROR messages are handled using the notice processor;
Expand Down Expand Up @@ -296,8 +284,24 @@ pqParseInput3(PGconn *conn)
}
break;
case '2': /* Bind Complete */
/* Nothing to do for this message type */
break;
case '3': /* Close Complete */
/* Nothing to do for these message types */
/*
* If we get CloseComplete when waiting for it, consume
* the queue element and keep going. A result is not
* expected from this message; it is just there so that
* we know to wait for it when PQsendQuery is used in
* pipeline mode, before going in IDLE state. Failing to
* do this makes us receive CloseComplete when IDLE, which
* creates problems.
*/
if (conn->cmd_queue_head &&
conn->cmd_queue_head->queryclass == PGQUERY_CLOSE)
{
pqCommandQueueAdvance(conn);
}

break;
case 'S': /* parameter status */
if (getParameterStatus(conn))
Expand Down
6 changes: 4 additions & 2 deletions src/interfaces/libpq/libpq-int.h
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,8 @@ typedef enum
* query */
PGASYNC_COPY_IN, /* Copy In data transfer in progress */
PGASYNC_COPY_OUT, /* Copy Out data transfer in progress */
PGASYNC_COPY_BOTH /* Copy In/Out data transfer in progress */
PGASYNC_COPY_BOTH, /* Copy In/Out data transfer in progress */
PGASYNC_PIPELINE_IDLE, /* "Idle" between commands in pipeline mode */
} PGAsyncStatusType;

/* Target server type (decoded value of target_session_attrs) */
Expand Down Expand Up @@ -311,7 +312,8 @@ typedef enum
PGQUERY_EXTENDED, /* full Extended protocol (PQexecParams) */
PGQUERY_PREPARE, /* Parse only (PQprepare) */
PGQUERY_DESCRIBE, /* Describe Statement or Portal */
PGQUERY_SYNC /* Sync (at end of a pipeline) */
PGQUERY_SYNC, /* Sync (at end of a pipeline) */
PGQUERY_CLOSE
} PGQueryClass;

/*
Expand Down
Loading

0 comments on commit 3589139

Please sign in to comment.