From 7200e88453bcf59575ebf7a274084800c2e0d157 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Thu, 14 Sep 2023 04:33:42 +0000 Subject: [PATCH] sql: fix cleanup of not exhausted pausable portals in some cases Previously, if we executed a flow (either vectorized or row-based) that contains a row-by-row processor in the "pausable portal" model and didn't fully exhaust that flow (meaning that it could still produce more rows), this could result in an error when closing the portal because that processor might not be closed properly. Vectorized operators aren't affected by this, but row-by-row processors effectively require the `ConsumerClosed` method to be called on them, and this might not happen. In particular, it was assumed that this method would get called by the `execinfra.Run` loop, but for the pausable portal model we needed to adjust it so that the loop could be exited (and then re-entered) when switching to another portal; thus, we lost the guarantee that `ConsumerClosed` is always called. This commit fixes this problem by teaching both vectorized and row-based flows to always call `ConsumerClosed` on their "head" processor on `Cleanup` (which is called when closing the portal). Now, `Cleanup` calls `ConsumerClosed` unconditionally (when the flow _might_ be running in the pausable portal mode), and this is safe given the interface adjustment in the previous commit. Release note (bug fix): Previously, when evaluating some queries (most likely with a lookup join) via the "multiple active portals" execution mode (a preview feature that can be enabled via the `multiple_active_portals_enabled` session variable), CockroachDB could encounter an internal error like `unexpected 40960 leftover bytes` if that portal wasn't fully consumed. This is now fixed. 
--- pkg/sql/colflow/vectorized_flow.go | 7 ++ pkg/sql/flowinfra/flow.go | 24 +++++ .../testdata/pgtest/multiple_active_portals | 99 +++++++++++++++++++ pkg/sql/rowflow/row_based_flow.go | 2 + 4 files changed, 132 insertions(+) diff --git a/pkg/sql/colflow/vectorized_flow.go b/pkg/sql/colflow/vectorized_flow.go index ea243317fa48..612d1413c3e6 100644 --- a/pkg/sql/colflow/vectorized_flow.go +++ b/pkg/sql/colflow/vectorized_flow.go @@ -289,6 +289,10 @@ func (f *vectorizedFlow) Setup( // Resume is part of the Flow interface. func (f *vectorizedFlow) Resume(recv execinfra.RowReceiver) { if f.batchFlowCoordinator != nil { + // Resume is expected to be called only for pausable portals, for which + // we must be using limitedCommandResult which currently doesn't + // implement the execinfra.BatchReceiver interface, so we shouldn't have + // a batch flow coordinator here. recv.Push( nil, /* row */ &execinfrapb.ProducerMetadata{ @@ -392,6 +396,9 @@ func (f *vectorizedFlow) Cleanup(ctx context.Context) { // as well as closes all the closers. f.creator.cleanup(ctx) + // Ensure that the "head" processor is always closed. + f.ConsumerClosedOnHeadProc() + f.tempStorage.Lock() created := f.tempStorage.path != "" f.tempStorage.Unlock() diff --git a/pkg/sql/flowinfra/flow.go b/pkg/sql/flowinfra/flow.go index e078d014df91..4b3422f3e81d 100644 --- a/pkg/sql/flowinfra/flow.go +++ b/pkg/sql/flowinfra/flow.go @@ -656,6 +656,30 @@ func (f *FlowBase) GetOnCleanupFns() (startCleanup, endCleanup func()) { return onCleanupStart, onCleanupEnd } +// ConsumerClosedOnHeadProc calls ConsumerClosed method on the "head" processor +// of this flow to make sure that all resources are released. This is needed for +// pausable portal execution model where execinfra.Run might never call +// ConsumerClosed on the source (i.e. the "head" processor). 
+// +// ConsumerClosed is only called if: +// - there is exactly 1 processor in the flow that runs in its own goroutine +// (which is always the case for pausable portal model at this time) +// - that single processor implements execinfra.RowSource interface (those +// processors that don't implement it shouldn't be running through pausable +// portal model). +// +// Otherwise, this method is a noop. +func (f *FlowBase) ConsumerClosedOnHeadProc() { + if len(f.processors) != 1 { + return + } + rs, ok := f.processors[0].(execinfra.RowSource) + if !ok { + return + } + rs.ConsumerClosed() +} + // Cleanup is part of the Flow interface. // NOTE: this implements only the shared cleanup logic between row-based and // vectorized flows. diff --git a/pkg/sql/pgwire/testdata/pgtest/multiple_active_portals b/pkg/sql/pgwire/testdata/pgtest/multiple_active_portals index e8426aa017ed..65025589aa2e 100644 --- a/pkg/sql/pgwire/testdata/pgtest/multiple_active_portals +++ b/pkg/sql/pgwire/testdata/pgtest/multiple_active_portals @@ -1306,3 +1306,102 @@ ReadyForQuery {"Type":"ReadyForQuery","TxStatus":"I"} subtest end + +subtest incomplete_processor_cleanup + +send +Query {"String": "DEALLOCATE ALL;"} +---- + +until +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"DEALLOCATE ALL"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +send +Query {"String": "CREATE TABLE t110486_1 (k1 PRIMARY KEY) AS VALUES (1), (2), (3);"} +Query {"String": "CREATE TABLE t110486_2 (k2 PRIMARY KEY) AS VALUES (1), (2), (3);"} +---- + +until +ReadyForQuery +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"CREATE TABLE AS"} +{"Type":"ReadyForQuery","TxStatus":"I"} +{"Type":"CommandComplete","CommandTag":"CREATE TABLE AS"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +# Run the query that uses a row-by-row processor (the join reader) which +# previously wasn't cleaned up properly. 
This first execution doesn't trigger +# #110486 bug because it is effectively run in EXPLAIN ANALYZE flavor due to txn +# stats sampling (we prefer to run it explicitly rather than disable the txn +# stats sampling). +send +Parse {"Name": "q1", "Query": "SELECT * FROM t110486_1 INNER LOOKUP JOIN t110486_2 ON k1 = k2;"} +Bind {"DestinationPortal": "p1", "PreparedStatement": "q1"} +Execute {"Portal": "p1", "MaxRows": 1} +Sync +---- + +until +ReadyForQuery +---- +{"Type":"ParseComplete"} +{"Type":"BindComplete"} +{"Type":"DataRow","Values":[{"text":"1"},{"text":"1"}]} +{"Type":"PortalSuspended"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +# Now run the query again (not in EXPLAIN ANALYZE flavor) +send +Bind {"DestinationPortal": "p2", "PreparedStatement": "q1"} +Execute {"Portal": "p2", "MaxRows": 1} +Sync +---- + +until +ReadyForQuery +---- +{"Type":"BindComplete"} +{"Type":"DataRow","Values":[{"text":"1"},{"text":"1"}]} +{"Type":"PortalSuspended"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +# Do the same again using the row-based engine directly. 
+send crdb_only +Query {"String": "SET vectorize = off;"} +---- + +until crdb_only ignore=NoticeResponse +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"SET"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +send +Bind {"DestinationPortal": "p3", "PreparedStatement": "q1"} +Execute {"Portal": "p3", "MaxRows": 1} +Sync +---- + +until +ReadyForQuery +---- +{"Type":"BindComplete"} +{"Type":"DataRow","Values":[{"text":"1"},{"text":"1"}]} +{"Type":"PortalSuspended"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +send crdb_only +Query {"String": "RESET vectorize;"} +---- + +until crdb_only ignore=NoticeResponse +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"RESET"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +subtest end diff --git a/pkg/sql/rowflow/row_based_flow.go b/pkg/sql/rowflow/row_based_flow.go index 3ea73407daa4..3ddf30adf7c5 100644 --- a/pkg/sql/rowflow/row_based_flow.go +++ b/pkg/sql/rowflow/row_based_flow.go @@ -456,6 +456,8 @@ func (f *rowBasedFlow) Cleanup(ctx context.Context) { startCleanup, endCleanup := f.FlowBase.GetOnCleanupFns() startCleanup() defer endCleanup() + // Ensure that the "head" processor is always closed. + f.ConsumerClosedOnHeadProc() f.FlowBase.Cleanup(ctx) f.Release() }