Skip to content

Commit

Permalink
sql: fix cleanup of not exhausted pausable portals in some cases
Browse files Browse the repository at this point in the history
Previously, if we executed a flow (either vectorized or row-based) that
contains a row-by-row processor in "pausable portal" model and didn't
fully exhaust that flow (meaning that it could still produce more rows),
this could result in an error when closing the portal because that
processor might not be closed properly. Vectorized operators aren't
affected by this, but row-by-row processors effectively require
`ConsumerClosed` method to be called on them, and this might not happen.
In particular, it was assumed that this method would get called by
`execinfra.Run` loop, but for pausable portal model we needed to adjust
it so that the loop could be exited (and then re-entered) when switching
to another portal; thus, we lost the guarantee that `ConsumerClosed` is
always called.

This commit fixes this problem by teaching both vectorized and row-based
flows to always call `ConsumerClosed` on their "head" processor on
`Cleanup` (which is called when closing the portal). Now, `Cleanup`
calls `ConsumerClosed` unconditionally (when the flow _might_ be running
in the pausable portal mode), and this is safe given the interface
adjustment in the previous commit.

Release note (bug fix): Previously, when evaluating some queries (most
likely with a lookup join) via "multiple active portals" execution
mode (preview feature that can be enabled via
`multiple_active_portals_enabled` session variable) CockroachDB could
encounter an internal error like `unexpected 40960 leftover bytes` if
that portal wasn't fully consumed. This is now fixed.
  • Loading branch information
yuzefovich committed Sep 14, 2023
1 parent 7ea6764 commit 568cf9b
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 0 deletions.
7 changes: 7 additions & 0 deletions pkg/sql/colflow/vectorized_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,10 @@ func (f *vectorizedFlow) Setup(
// Resume is part of the Flow interface.
func (f *vectorizedFlow) Resume(recv execinfra.RowReceiver) {
if f.batchFlowCoordinator != nil {
// Resume is expected to be called only for pausable portals, for which
// we must be using limitedCommandResult which currently doesn't
// implement the execinfra.BatchReceiver interface, so we shouldn't have
// a batch flow coordinator here.
recv.Push(
nil, /* row */
&execinfrapb.ProducerMetadata{
Expand Down Expand Up @@ -384,6 +388,9 @@ func (f *vectorizedFlow) Cleanup(ctx context.Context) {
// as well as closes all the closers.
f.creator.cleanup(ctx)

// Ensure that the "head" processor is always closed.
f.ConsumerClosedOnHeadProc()

f.tempStorage.Lock()
created := f.tempStorage.path != ""
f.tempStorage.Unlock()
Expand Down
24 changes: 24 additions & 0 deletions pkg/sql/flowinfra/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,30 @@ func (f *FlowBase) GetOnCleanupFns() (startCleanup, endCleanup func()) {
return onCleanupStart, onCleanupEnd
}

// ConsumerClosedOnHeadProc calls ConsumerClosed method on the "head" processor
// of this flow to make sure that all resources are released. This is needed for
// pausable portal execution model where execinfra.Run might never call
// ConsumerClosed on the source (i.e. the "head" processor).
//
// The method is only called if:
// - there is exactly 1 processor in the flow that runs in its own goroutine
// (which is always the case for pausable portal model at this time)
// - that single processor implements execinfra.RowSource interface (those
// processors that don't implement it shouldn't be running through pausable
// portal model).
//
// Otherwise, this method is a noop.
func (f *FlowBase) ConsumerClosedOnHeadProc() {
if len(f.processors) != 1 {
return
}
rs, ok := f.processors[0].(execinfra.RowSource)
if !ok {
return
}
rs.ConsumerClosed()
}

// Cleanup is part of the Flow interface.
// NOTE: this implements only the shared cleanup logic between row-based and
// vectorized flows.
Expand Down
99 changes: 99 additions & 0 deletions pkg/sql/pgwire/testdata/pgtest/multiple_active_portals
Original file line number Diff line number Diff line change
Expand Up @@ -1306,3 +1306,102 @@ ReadyForQuery
{"Type":"ReadyForQuery","TxStatus":"I"}

subtest end

subtest incomplete_processor_cleanup

send
Query {"String": "DEALLOCATE ALL;"}
----

until
ReadyForQuery
----
{"Type":"CommandComplete","CommandTag":"DEALLOCATE ALL"}
{"Type":"ReadyForQuery","TxStatus":"I"}

send
Query {"String": "CREATE TABLE t110486_1 (k1 PRIMARY KEY) AS VALUES (1), (2), (3);"}
Query {"String": "CREATE TABLE t110486_2 (k2 PRIMARY KEY) AS VALUES (1), (2), (3);"}
----

until
ReadyForQuery
ReadyForQuery
----
{"Type":"CommandComplete","CommandTag":"CREATE TABLE AS"}
{"Type":"ReadyForQuery","TxStatus":"I"}
{"Type":"CommandComplete","CommandTag":"CREATE TABLE AS"}
{"Type":"ReadyForQuery","TxStatus":"I"}

# Run the query that uses a row-by-row processor (the join reader) which
# previously wasn't cleaned up properly. This first execution doesn't trigger
# #110846 bug because it is effectively run in EXPLAIN ANALYZE flavor due to txn
# stats sampling (we prefer to run it explicitly rather than disable the txn
# stats sampling).
send
Parse {"Name": "q1", "Query": "SELECT * FROM t110486_1 INNER LOOKUP JOIN t110486_2 ON k1 = k2;"}
Bind {"DestinationPortal": "p1", "PreparedStatement": "q1"}
Execute {"Portal": "p1", "MaxRows": 1}
Sync
----

until
ReadyForQuery
----
{"Type":"ParseComplete"}
{"Type":"BindComplete"}
{"Type":"DataRow","Values":[{"text":"1"},{"text":"1"}]}
{"Type":"PortalSuspended"}
{"Type":"ReadyForQuery","TxStatus":"I"}

# Now run the query again (not in EXPLAIN ANALYZE flavor)
send
Bind {"DestinationPortal": "p2", "PreparedStatement": "q1"}
Execute {"Portal": "p2", "MaxRows": 1}
Sync
----

until
ReadyForQuery
----
{"Type":"BindComplete"}
{"Type":"DataRow","Values":[{"text":"1"},{"text":"1"}]}
{"Type":"PortalSuspended"}
{"Type":"ReadyForQuery","TxStatus":"I"}

# Do the same again using the row-based engine directly.
send crdb_only
Query {"String": "SET vectorize = off;"}
----

until crdb_only ignore=NoticeResponse
ReadyForQuery
----
{"Type":"CommandComplete","CommandTag":"SET"}
{"Type":"ReadyForQuery","TxStatus":"I"}

send
Bind {"DestinationPortal": "p3", "PreparedStatement": "q1"}
Execute {"Portal": "p3", "MaxRows": 1}
Sync
----

until
ReadyForQuery
----
{"Type":"BindComplete"}
{"Type":"DataRow","Values":[{"text":"1"},{"text":"1"}]}
{"Type":"PortalSuspended"}
{"Type":"ReadyForQuery","TxStatus":"I"}

send crdb_only
Query {"String": "RESET vectorize;"}
----

until crdb_only ignore=NoticeResponse
ReadyForQuery
----
{"Type":"CommandComplete","CommandTag":"RESET"}
{"Type":"ReadyForQuery","TxStatus":"I"}

subtest end
2 changes: 2 additions & 0 deletions pkg/sql/rowflow/row_based_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,8 @@ func (f *rowBasedFlow) Cleanup(ctx context.Context) {
startCleanup, endCleanup := f.FlowBase.GetOnCleanupFns()
startCleanup()
defer endCleanup()
// Ensure that the "head" processor is always closed.
f.ConsumerClosedOnHeadProc()
f.FlowBase.Cleanup(ctx)
f.Release()
}
Expand Down

0 comments on commit 568cf9b

Please sign in to comment.