Skip to content

Commit

Permalink
Merge #39639
Browse files Browse the repository at this point in the history
39639: sql/pgwire: support explicitly closing in-progress portals r=mjibson a=mjibson

This is done by extending stmtBuf a bit so we can utilize its rewind
functionality when we see a portal close message.

Closes #39524

Release note (sql change): Support explicitly closing portals over the
wire after partial use.

Co-authored-by: Matt Jibson <[email protected]>
  • Loading branch information
craig[bot] and maddyblue committed Aug 14, 2019
2 parents 0ee3121 + 0d414c5 commit bad5e2e
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 69 deletions.
15 changes: 9 additions & 6 deletions pkg/sql/conn_io.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,12 +479,15 @@ func (buf *StmtBuf) ltrim(ctx context.Context, pos CmdPos) {
}
}

// AdvanceOne advances the cursor one Command over. The command over which the
// cursor will be positioned when this returns may not be in the buffer yet.
func (buf *StmtBuf) AdvanceOne() {
// AdvanceOne advances the cursor one Command over. The command over which
// the cursor will be positioned when this returns may not be in the buffer
// yet. The previous CmdPos is returned.
func (buf *StmtBuf) AdvanceOne() CmdPos {
buf.mu.Lock()
prev := buf.mu.curPos
buf.mu.curPos++
buf.mu.Unlock()
return prev
}

// seekToNextBatch moves the cursor position to the start of the next batch of
Expand Down Expand Up @@ -535,8 +538,8 @@ func (buf *StmtBuf) seekToNextBatch() error {
return nil
}

// rewind resets the buffer's position to pos.
func (buf *StmtBuf) rewind(ctx context.Context, pos CmdPos) {
// Rewind resets the buffer's position to pos.
func (buf *StmtBuf) Rewind(ctx context.Context, pos CmdPos) {
buf.mu.Lock()
defer buf.mu.Unlock()
if pos < buf.mu.startPos {
Expand Down Expand Up @@ -833,7 +836,7 @@ type rewindCapability struct {
// unlocks the respective ClientComm.
func (rc *rewindCapability) rewindAndUnlock(ctx context.Context) {
rc.cl.RTrim(ctx, rc.rewindPos)
rc.buf.rewind(ctx, rc.rewindPos)
rc.buf.Rewind(ctx, rc.rewindPos)
rc.cl.Close()
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/conn_io_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func TestStmtBuf(t *testing.T) {

// Now rewind.
expPos = 1
buf.rewind(ctx, expPos)
buf.Rewind(ctx, expPos)
cmd, pos, err = buf.CurCmd()
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -258,7 +258,7 @@ func TestStmtBufPreparedStmt(t *testing.T) {
assertPrepareStmt(t, cmd, "p2")

// Rewind to the first prepared stmt.
buf.rewind(ctx, CmdPos(1))
buf.Rewind(ctx, CmdPos(1))
cmd, _, err = buf.CurCmd()
if err != nil {
t.Fatal(err)
Expand Down
32 changes: 23 additions & 9 deletions pkg/sql/pgwire/command_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ func (r *limitedCommandResult) AddRow(ctx context.Context, row tree.Datums) erro
}
r.seenTuples = 0

return r.moreResultsNeeded()
return r.moreResultsNeeded(ctx)
}
if _ /* flushed */, err := r.conn.maybeFlush(r.pos); err != nil {
return err
Expand All @@ -381,26 +381,39 @@ func (r *limitedCommandResult) AddRow(ctx context.Context, row tree.Datums) erro
// moreResultsNeeded is a restricted connection handler that waits for more
// requests for rows from the active portal, during the "execute portal" flow
// when a limit has been specified.
func (r *limitedCommandResult) moreResultsNeeded() error {
func (r *limitedCommandResult) moreResultsNeeded(ctx context.Context) error {
// In an implicit transaction, a portal suspension is immediately
// followed by closing the portal.
if r.implicitTxn {
r.typ = noCompletionMsg
return sql.ErrLimitedResultClosed
}

r.conn.stmtBuf.AdvanceOne()
// Keep track of the previous CmdPos so we can rewind if needed.
prevPos := r.conn.stmtBuf.AdvanceOne()
for {
cmd, _, err := r.conn.stmtBuf.CurCmd()
cmd, curPos, err := r.conn.stmtBuf.CurCmd()
if err != nil {
return err
}
// TODO(mjibson): It would be nice to support the
// sql.DeletePreparedStmt type here (Close in PG parlance,
// which can delete portals or prepared statements). This would
// require having stmtbuf rewind by one. Again, this double
// state machine thing is not great.
switch c := cmd.(type) {
case sql.DeletePreparedStmt:
// The client wants to close a portal or statement. We
// support the case where it is exactly this
// portal. This is done by closing the portal in
// the same way implicit transactions do, but also
// rewinding the stmtBuf to still point to the portal
// close so that the state machine can do its part of
// the cleanup. We are in effect peeking to see if the
// next message is a delete portal.
if c.Type != pgwirebase.PreparePortal || c.Name != r.portalName {
return errors.WithDetail(sql.ErrLimitedResultNotSupported, "portals must be executed to completion")
}
r.typ = noCompletionMsg
// Rewind to before the delete so the AdvanceOne in
// connExecutor.execCmd ends up back on it.
r.conn.stmtBuf.Rewind(ctx, prevPos)
return sql.ErrLimitedResultClosed
case sql.ExecPortal:
// The happy case: the client wants more rows from the portal.
if c.Name != r.portalName {
Expand All @@ -425,5 +438,6 @@ func (r *limitedCommandResult) moreResultsNeeded() error {
// We got some other message, but we only support executing to completion.
return errors.WithDetail(sql.ErrLimitedResultNotSupported, "portals must be executed to completion")
}
prevPos = curPos
}
}
52 changes: 52 additions & 0 deletions pkg/sql/pgwire/testdata/pgtest/portals
Original file line number Diff line number Diff line change
Expand Up @@ -290,3 +290,55 @@ ReadyForQuery
{"Type":"DataRow","Values":[{"text":"here"}]}
{"Type":"CommandComplete","CommandTag":"SELECT 1"}
{"Type":"ReadyForQuery","TxStatus":"I"}

# Execute a portal partially and close it.

send
Query {"String": "BEGIN"}
Parse {"Query": "SELECT * FROM generate_series(1, 2)"}
Bind
Execute {"MaxRows": 1}
Sync
----

until
ReadyForQuery
ReadyForQuery
----
{"Type":"CommandComplete","CommandTag":"BEGIN"}
{"Type":"ReadyForQuery","TxStatus":"T"}
{"Type":"ParseComplete"}
{"Type":"BindComplete"}
{"Type":"DataRow","Values":[{"text":"1"}]}
{"Type":"PortalSuspended"}
{"Type":"ReadyForQuery","TxStatus":"T"}

# Close the empty portal then try to execute it. 80 = 'P'
send
Close {"ObjectType": 80}
Execute
Sync
----

until
ErrorResponse
ReadyForQuery
----
{"Type":"CloseComplete"}
{"Type":"ErrorResponse","Code":"34000"}
{"Type":"ReadyForQuery","TxStatus":"E"}

send
Query {"String": "ROLLBACK"}
Query {"String": "SELECT 'here'"}
----

until ignore=RowDescription
ReadyForQuery
ReadyForQuery
----
{"Type":"CommandComplete","CommandTag":"ROLLBACK"}
{"Type":"ReadyForQuery","TxStatus":"I"}
{"Type":"DataRow","Values":[{"text":"here"}]}
{"Type":"CommandComplete","CommandTag":"SELECT 1"}
{"Type":"ReadyForQuery","TxStatus":"I"}
52 changes: 0 additions & 52 deletions pkg/sql/pgwire/testdata/pgtest/portals_crbugs
Original file line number Diff line number Diff line change
Expand Up @@ -84,55 +84,3 @@ ReadyForQuery
{"Type":"DataRow","Values":[{"text":"here"}]}
{"Type":"CommandComplete","CommandTag":"SELECT 1"}
{"Type":"ReadyForQuery","TxStatus":"I"}

# Execute a portal partially and close it. In PG this works, but we
# don't support manually closing a portal early.

send
Query {"String": "BEGIN"}
Parse {"Query": "SELECT * FROM generate_series(1, 2)"}
Bind
Execute {"MaxRows": 1}
Sync
----

until
ReadyForQuery
ReadyForQuery
----
{"Type":"CommandComplete","CommandTag":"BEGIN"}
{"Type":"ReadyForQuery","TxStatus":"T"}
{"Type":"ParseComplete"}
{"Type":"BindComplete"}
{"Type":"DataRow","Values":[{"text":"1"}]}
{"Type":"PortalSuspended"}
{"Type":"ReadyForQuery","TxStatus":"T"}

# Close the empty portal then try to execute it. 80 = 'P'
send
Close {"ObjectType": 80}
Execute
Sync
----

until
ErrorResponse
ReadyForQuery
----
{"Type":"ErrorResponse","Code":"0A000"}
{"Type":"ReadyForQuery","TxStatus":"E"}

send
Query {"String": "ROLLBACK"}
Query {"String": "SELECT 'here'"}
----

until ignore=RowDescription
ReadyForQuery
ReadyForQuery
----
{"Type":"CommandComplete","CommandTag":"ROLLBACK"}
{"Type":"ReadyForQuery","TxStatus":"I"}
{"Type":"DataRow","Values":[{"text":"here"}]}
{"Type":"CommandComplete","CommandTag":"SELECT 1"}
{"Type":"ReadyForQuery","TxStatus":"I"}

0 comments on commit bad5e2e

Please sign in to comment.