Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-23.1: sql: fix cleanup of not exhausted pausable portals in some cases #110666

Merged
merged 3 commits into from
Sep 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type generativeSplitAndScatterProcessor struct {
chunkEntrySplitAndScatterers []splitAndScatterer

// cancelScatterAndWaitForWorker cancels the scatter goroutine and waits for
// it to finish.
// it to finish. It can be called multiple times.
cancelScatterAndWaitForWorker func()

doneScatterCh chan entryNode
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/split_and_scatter_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ type splitAndScatterProcessor struct {

scatterer splitAndScatterer
// cancelScatterAndWaitForWorker cancels the scatter goroutine and waits for
// it to finish.
// it to finish. It can be called multiples times.
cancelScatterAndWaitForWorker func()

doneScatterCh chan entryNode
Expand Down
7 changes: 7 additions & 0 deletions pkg/sql/colflow/vectorized_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,10 @@ func (f *vectorizedFlow) Setup(
// Resume is part of the Flow interface.
func (f *vectorizedFlow) Resume(recv execinfra.RowReceiver) {
if f.batchFlowCoordinator != nil {
// Resume is expected to be called only for pausable portals, for which
// we must be using limitedCommandResult which currently doesn't
// implement the execinfra.BatchReceiver interface, so we shouldn't have
// a batch flow coordinator here.
recv.Push(
nil, /* row */
&execinfrapb.ProducerMetadata{
Expand Down Expand Up @@ -392,6 +396,9 @@ func (f *vectorizedFlow) Cleanup(ctx context.Context) {
// as well as closes all the closers.
f.creator.cleanup(ctx)

// Ensure that the "head" processor is always closed.
f.ConsumerClosedOnHeadProc()

f.tempStorage.Lock()
created := f.tempStorage.path != ""
f.tempStorage.Unlock()
Expand Down
16 changes: 2 additions & 14 deletions pkg/sql/execinfra/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/logcrash"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
)

// RowChannelBufSize is the default buffer size of a RowChannel.
Expand Down Expand Up @@ -166,7 +164,7 @@ type RowSource interface {
ConsumerDone()

// ConsumerClosed informs the source that the consumer is done and will not
// make any more calls to Next(). Must only be called once on a given
// make any more calls to Next(). Must be called at least once on a given
// RowSource.
//
// Like ConsumerDone(), if the consumer of the source stops consuming rows
Expand Down Expand Up @@ -422,16 +420,6 @@ func (rb *rowSourceBase) consumerDone() {
uint32(NeedMoreRows), uint32(DrainRequested))
}

// consumerClosed helps processors implement RowSource.ConsumerClosed. The name
// is only used for debug messages.
func (rb *rowSourceBase) consumerClosed(name string) {
status := ConsumerStatus(atomic.LoadUint32((*uint32)(&rb.ConsumerStatus)))
if status == ConsumerClosed {
logcrash.ReportOrPanic(context.Background(), nil, "%s already closed", redact.Safe(name))
}
atomic.StoreUint32((*uint32)(&rb.ConsumerStatus), uint32(ConsumerClosed))
}

// RowChannel is a thin layer over a RowChannelMsg channel, which can be used to
// transfer rows between goroutines.
type RowChannel struct {
Expand Down Expand Up @@ -532,7 +520,7 @@ func (rc *RowChannel) ConsumerDone() {

// ConsumerClosed is part of the RowSource interface.
func (rc *RowChannel) ConsumerClosed() {
rc.consumerClosed("RowChannel")
atomic.StoreUint32((*uint32)(&rc.ConsumerStatus), uint32(ConsumerClosed))
numSenders := atomic.LoadInt32(&rc.numSenders)
// Drain (at most) numSenders messages in case senders are blocked trying to
// emit a row.
Expand Down
38 changes: 38 additions & 0 deletions pkg/sql/flowinfra/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,10 @@ type FlowBase struct {
// goroutines.
startedGoroutines bool

// headProcStarted tracks whether Start was called on the "head" processor
// in Run.
headProcStarted bool

// inboundStreams are streams that receive data from other hosts; this map
// is to be passed to FlowRegistry.RegisterFlow. This map is populated in
// Flow.Setup(), so it is safe to lookup into concurrently later.
Expand Down Expand Up @@ -572,6 +576,7 @@ func (f *FlowBase) Run(ctx context.Context, noWait bool) {
}
f.resumeCtx = ctx
log.VEventf(ctx, 1, "running %T in the flow's goroutine", headProc)
f.headProcStarted = true
headProc.Run(ctx, headOutput)
}

Expand Down Expand Up @@ -656,6 +661,39 @@ func (f *FlowBase) GetOnCleanupFns() (startCleanup, endCleanup func()) {
return onCleanupStart, onCleanupEnd
}

// ConsumerClosedOnHeadProc calls ConsumerClosed method on the "head" processor
// of this flow to make sure that all resources are released. This is needed for
// pausable portal execution model where execinfra.Run might never call
// ConsumerClosed on the source (i.e. the "head" processor).
//
// The method is only called if:
// - the flow is local (pausable portals currently don't support DistSQL)
// - there is exactly 1 processor in the flow that runs in its own goroutine
// (which is always the case for pausable portal model at this time)
// - Start was called on that processor (ConsumerClosed is only valid to be
// called after Start)
// - that single processor implements execinfra.RowSource interface (those
// processors that don't implement it shouldn't be running through pausable
// portal model).
//
// Otherwise, this method is a noop.
func (f *FlowBase) ConsumerClosedOnHeadProc() {
if !f.IsLocal() {
return
}
if len(f.processors) != 1 {
return
}
if !f.headProcStarted {
return
}
rs, ok := f.processors[0].(execinfra.RowSource)
if !ok {
return
}
rs.ConsumerClosed()
}

// Cleanup is part of the Flow interface.
// NOTE: this implements only the shared cleanup logic between row-based and
// vectorized flows.
Expand Down
99 changes: 99 additions & 0 deletions pkg/sql/pgwire/testdata/pgtest/multiple_active_portals
Original file line number Diff line number Diff line change
Expand Up @@ -1306,3 +1306,102 @@ ReadyForQuery
{"Type":"ReadyForQuery","TxStatus":"I"}

subtest end

subtest incomplete_processor_cleanup

send
Query {"String": "DEALLOCATE ALL;"}
----

until
ReadyForQuery
----
{"Type":"CommandComplete","CommandTag":"DEALLOCATE ALL"}
{"Type":"ReadyForQuery","TxStatus":"I"}

send
Query {"String": "CREATE TABLE t110486_1 (k1 PRIMARY KEY) AS VALUES (1), (2), (3);"}
Query {"String": "CREATE TABLE t110486_2 (k2 PRIMARY KEY) AS VALUES (1), (2), (3);"}
----

until
ReadyForQuery
ReadyForQuery
----
{"Type":"CommandComplete","CommandTag":"CREATE TABLE AS"}
{"Type":"ReadyForQuery","TxStatus":"I"}
{"Type":"CommandComplete","CommandTag":"CREATE TABLE AS"}
{"Type":"ReadyForQuery","TxStatus":"I"}

# Run the query that uses a row-by-row processor (the join reader) which
# previously wasn't cleaned up properly. This first execution doesn't trigger
# #110846 bug because it is effectively run in EXPLAIN ANALYZE flavor due to txn
# stats sampling (we prefer to run it explicitly rather than disable the txn
# stats sampling).
send
Parse {"Name": "q1", "Query": "SELECT * FROM t110486_1 INNER LOOKUP JOIN t110486_2 ON k1 = k2;"}
Bind {"DestinationPortal": "p1", "PreparedStatement": "q1"}
Execute {"Portal": "p1", "MaxRows": 1}
Sync
----

until
ReadyForQuery
----
{"Type":"ParseComplete"}
{"Type":"BindComplete"}
{"Type":"DataRow","Values":[{"text":"1"},{"text":"1"}]}
{"Type":"PortalSuspended"}
{"Type":"ReadyForQuery","TxStatus":"I"}

# Now run the query again (not in EXPLAIN ANALYZE flavor)
send
Bind {"DestinationPortal": "p2", "PreparedStatement": "q1"}
Execute {"Portal": "p2", "MaxRows": 1}
Sync
----

until
ReadyForQuery
----
{"Type":"BindComplete"}
{"Type":"DataRow","Values":[{"text":"1"},{"text":"1"}]}
{"Type":"PortalSuspended"}
{"Type":"ReadyForQuery","TxStatus":"I"}

# Do the same again using the row-based engine directly.
send crdb_only
Query {"String": "SET vectorize = off;"}
----

until crdb_only ignore=NoticeResponse
ReadyForQuery
----
{"Type":"CommandComplete","CommandTag":"SET"}
{"Type":"ReadyForQuery","TxStatus":"I"}

send
Bind {"DestinationPortal": "p3", "PreparedStatement": "q1"}
Execute {"Portal": "p3", "MaxRows": 1}
Sync
----

until
ReadyForQuery
----
{"Type":"BindComplete"}
{"Type":"DataRow","Values":[{"text":"1"},{"text":"1"}]}
{"Type":"PortalSuspended"}
{"Type":"ReadyForQuery","TxStatus":"I"}

send crdb_only
Query {"String": "RESET vectorize;"}
----

until crdb_only ignore=NoticeResponse
ReadyForQuery
----
{"Type":"CommandComplete","CommandTag":"RESET"}
{"Type":"ReadyForQuery","TxStatus":"I"}

subtest end
2 changes: 2 additions & 0 deletions pkg/sql/rowflow/row_based_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,8 @@ func (f *rowBasedFlow) Cleanup(ctx context.Context) {
startCleanup, endCleanup := f.FlowBase.GetOnCleanupFns()
startCleanup()
defer endCleanup()
// Ensure that the "head" processor is always closed.
f.ConsumerClosedOnHeadProc()
f.FlowBase.Cleanup(ctx)
f.Release()
}
Expand Down
1 change: 0 additions & 1 deletion pkg/testutils/distsqlutils/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ go_library(
"//pkg/sql/execinfrapb",
"//pkg/sql/rowenc",
"//pkg/sql/types",
"//pkg/util/log",
"//pkg/util/syncutil",
],
)
13 changes: 1 addition & 12 deletions pkg/testutils/distsqlutils/row_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
)

Expand Down Expand Up @@ -71,9 +70,6 @@ type RowBufferArgs struct {
// OnConsumerDone, if specified, is called as the first thing in the
// ConsumerDone() method.
OnConsumerDone func(*RowBuffer)
// OnConsumerClose, if specified, is called as the first thing in the
// ConsumerClosed() method.
OnConsumerClosed func(*RowBuffer)
// OnNext, if specified, is called as the first thing in the Next() method.
// If it returns an empty row and metadata, then RowBuffer.Next() is allowed
// to run normally. Otherwise, the values are returned from RowBuffer.Next().
Expand Down Expand Up @@ -196,16 +192,9 @@ func (rb *RowBuffer) ConsumerDone() {
}
}

// ConsumerClosed is part of the RowSource interface.
// ConsumerClosed is part of the execinfra.RowSource interface.
func (rb *RowBuffer) ConsumerClosed() {
status := execinfra.ConsumerStatus(atomic.LoadUint32((*uint32)(&rb.ConsumerStatus)))
if status == execinfra.ConsumerClosed {
log.Fatalf(context.Background(), "RowBuffer already closed")
}
atomic.StoreUint32((*uint32)(&rb.ConsumerStatus), uint32(execinfra.ConsumerClosed))
if rb.args.OnConsumerClosed != nil {
rb.args.OnConsumerClosed(rb)
}
}

// NextNoMeta is a version of Next which fails the test if
Expand Down