diff --git a/pkg/sql/colflow/vectorized_flow.go b/pkg/sql/colflow/vectorized_flow.go index ea243317fa48..612d1413c3e6 100644 --- a/pkg/sql/colflow/vectorized_flow.go +++ b/pkg/sql/colflow/vectorized_flow.go @@ -289,6 +289,10 @@ func (f *vectorizedFlow) Setup( // Resume is part of the Flow interface. func (f *vectorizedFlow) Resume(recv execinfra.RowReceiver) { if f.batchFlowCoordinator != nil { + // Resume is expected to be called only for pausable portals, for which + // we must be using limitedCommandResult which currently doesn't + // implement the execinfra.BatchReceiver interface, so we shouldn't have + // a batch flow coordinator here. recv.Push( nil, /* row */ &execinfrapb.ProducerMetadata{ @@ -392,6 +396,9 @@ func (f *vectorizedFlow) Cleanup(ctx context.Context) { // as well as closes all the closers. f.creator.cleanup(ctx) + // Ensure that the "head" processor is always closed. + f.ConsumerClosedOnHeadProc() + f.tempStorage.Lock() created := f.tempStorage.path != "" f.tempStorage.Unlock() diff --git a/pkg/sql/flowinfra/flow.go b/pkg/sql/flowinfra/flow.go index e078d014df91..4b3422f3e81d 100644 --- a/pkg/sql/flowinfra/flow.go +++ b/pkg/sql/flowinfra/flow.go @@ -656,6 +656,30 @@ func (f *FlowBase) GetOnCleanupFns() (startCleanup, endCleanup func()) { return onCleanupStart, onCleanupEnd } +// ConsumerClosedOnHeadProc calls ConsumerClosed method on the "head" processor +// of this flow to make sure that all resources are released. This is needed for +// pausable portal execution model where execinfra.Run might never call +// ConsumerClosed on the source (i.e. the "head" processor). +// +// The method is only called if: +// - there is exactly 1 processor in the flow that runs in its own goroutine +// (which is always the case for pausable portal model at this time) +// - that single processor implements execinfra.RowSource interface (those +// processors that don't implement it shouldn't be running through pausable +// portal model). +// +// Otherwise, this method is a noop. +func (f *FlowBase) ConsumerClosedOnHeadProc() { + if len(f.processors) != 1 { + return + } + rs, ok := f.processors[0].(execinfra.RowSource) + if !ok { + return + } + rs.ConsumerClosed() +} + // Cleanup is part of the Flow interface. // NOTE: this implements only the shared cleanup logic between row-based and // vectorized flows. diff --git a/pkg/sql/pgwire/testdata/pgtest/multiple_active_portals b/pkg/sql/pgwire/testdata/pgtest/multiple_active_portals index e8426aa017ed..65025589aa2e 100644 --- a/pkg/sql/pgwire/testdata/pgtest/multiple_active_portals +++ b/pkg/sql/pgwire/testdata/pgtest/multiple_active_portals @@ -1306,3 +1306,102 @@ ReadyForQuery {"Type":"ReadyForQuery","TxStatus":"I"} subtest end + +subtest incomplete_processor_cleanup + +send +Query {"String": "DEALLOCATE ALL;"} +---- + +until +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"DEALLOCATE ALL"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +send +Query {"String": "CREATE TABLE t110486_1 (k1 PRIMARY KEY) AS VALUES (1), (2), (3);"} +Query {"String": "CREATE TABLE t110486_2 (k2 PRIMARY KEY) AS VALUES (1), (2), (3);"} +---- + +until +ReadyForQuery +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"CREATE TABLE AS"} +{"Type":"ReadyForQuery","TxStatus":"I"} +{"Type":"CommandComplete","CommandTag":"CREATE TABLE AS"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +# Run the query that uses a row-by-row processor (the join reader) which +# previously wasn't cleaned up properly. This first execution doesn't trigger +# #110846 bug because it is effectively run in EXPLAIN ANALYZE flavor due to txn +# stats sampling (we prefer to run it explicitly rather than disable the txn +# stats sampling). +send +Parse {"Name": "q1", "Query": "SELECT * FROM t110486_1 INNER LOOKUP JOIN t110486_2 ON k1 = k2;"} +Bind {"DestinationPortal": "p1", "PreparedStatement": "q1"} +Execute {"Portal": "p1", "MaxRows": 1} +Sync +---- + +until +ReadyForQuery +---- +{"Type":"ParseComplete"} +{"Type":"BindComplete"} +{"Type":"DataRow","Values":[{"text":"1"},{"text":"1"}]} +{"Type":"PortalSuspended"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +# Now run the query again (not in EXPLAIN ANALYZE flavor) +send +Bind {"DestinationPortal": "p2", "PreparedStatement": "q1"} +Execute {"Portal": "p2", "MaxRows": 1} +Sync +---- + +until +ReadyForQuery +---- +{"Type":"BindComplete"} +{"Type":"DataRow","Values":[{"text":"1"},{"text":"1"}]} +{"Type":"PortalSuspended"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +# Do the same again using the row-based engine directly. +send crdb_only +Query {"String": "SET vectorize = off;"} +---- + +until crdb_only ignore=NoticeResponse +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"SET"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +send +Bind {"DestinationPortal": "p3", "PreparedStatement": "q1"} +Execute {"Portal": "p3", "MaxRows": 1} +Sync +---- + +until +ReadyForQuery +---- +{"Type":"BindComplete"} +{"Type":"DataRow","Values":[{"text":"1"},{"text":"1"}]} +{"Type":"PortalSuspended"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +send crdb_only +Query {"String": "RESET vectorize;"} +---- + +until crdb_only ignore=NoticeResponse +ReadyForQuery +---- +{"Type":"CommandComplete","CommandTag":"RESET"} +{"Type":"ReadyForQuery","TxStatus":"I"} + +subtest end diff --git a/pkg/sql/rowflow/row_based_flow.go b/pkg/sql/rowflow/row_based_flow.go index 3ea73407daa4..3ddf30adf7c5 100644 --- a/pkg/sql/rowflow/row_based_flow.go +++ b/pkg/sql/rowflow/row_based_flow.go @@ -456,6 +456,8 @@ func (f *rowBasedFlow) Cleanup(ctx context.Context) { startCleanup, endCleanup := f.FlowBase.GetOnCleanupFns() startCleanup() defer endCleanup() + // Ensure that the "head" processor is always closed. + f.ConsumerClosedOnHeadProc() f.FlowBase.Cleanup(ctx) f.Release() }