Skip to content

Commit

Permalink
Merge pull request #110666 from cockroachdb/blathers/backport-release…
Browse files Browse the repository at this point in the history
…-23.1-110625
  • Loading branch information
yuzefovich authored Sep 27, 2023
2 parents 3237b98 + 86199fe commit 6a18a04
Show file tree
Hide file tree
Showing 9 changed files with 151 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type generativeSplitAndScatterProcessor struct {
chunkEntrySplitAndScatterers []splitAndScatterer

// cancelScatterAndWaitForWorker cancels the scatter goroutine and waits for
// it to finish.
// it to finish. It can be called multiple times.
cancelScatterAndWaitForWorker func()

doneScatterCh chan entryNode
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/split_and_scatter_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ type splitAndScatterProcessor struct {

scatterer splitAndScatterer
// cancelScatterAndWaitForWorker cancels the scatter goroutine and waits for
// it to finish.
// it to finish. It can be called multiples times.
cancelScatterAndWaitForWorker func()

doneScatterCh chan entryNode
Expand Down
7 changes: 7 additions & 0 deletions pkg/sql/colflow/vectorized_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,10 @@ func (f *vectorizedFlow) Setup(
// Resume is part of the Flow interface.
func (f *vectorizedFlow) Resume(recv execinfra.RowReceiver) {
if f.batchFlowCoordinator != nil {
// Resume is expected to be called only for pausable portals, for which
// we must be using limitedCommandResult which currently doesn't
// implement the execinfra.BatchReceiver interface, so we shouldn't have
// a batch flow coordinator here.
recv.Push(
nil, /* row */
&execinfrapb.ProducerMetadata{
Expand Down Expand Up @@ -392,6 +396,9 @@ func (f *vectorizedFlow) Cleanup(ctx context.Context) {
// as well as closes all the closers.
f.creator.cleanup(ctx)

// Ensure that the "head" processor is always closed.
f.ConsumerClosedOnHeadProc()

f.tempStorage.Lock()
created := f.tempStorage.path != ""
f.tempStorage.Unlock()
Expand Down
16 changes: 2 additions & 14 deletions pkg/sql/execinfra/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/logcrash"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
)

// RowChannelBufSize is the default buffer size of a RowChannel.
Expand Down Expand Up @@ -166,7 +164,7 @@ type RowSource interface {
ConsumerDone()

// ConsumerClosed informs the source that the consumer is done and will not
// make any more calls to Next(). Must only be called once on a given
// make any more calls to Next(). Must be called at least once on a given
// RowSource.
//
// Like ConsumerDone(), if the consumer of the source stops consuming rows
Expand Down Expand Up @@ -422,16 +420,6 @@ func (rb *rowSourceBase) consumerDone() {
uint32(NeedMoreRows), uint32(DrainRequested))
}

// consumerClosed helps processors implement RowSource.ConsumerClosed. The name
// is only used for debug messages.
func (rb *rowSourceBase) consumerClosed(name string) {
status := ConsumerStatus(atomic.LoadUint32((*uint32)(&rb.ConsumerStatus)))
if status == ConsumerClosed {
logcrash.ReportOrPanic(context.Background(), nil, "%s already closed", redact.Safe(name))
}
atomic.StoreUint32((*uint32)(&rb.ConsumerStatus), uint32(ConsumerClosed))
}

// RowChannel is a thin layer over a RowChannelMsg channel, which can be used to
// transfer rows between goroutines.
type RowChannel struct {
Expand Down Expand Up @@ -532,7 +520,7 @@ func (rc *RowChannel) ConsumerDone() {

// ConsumerClosed is part of the RowSource interface.
func (rc *RowChannel) ConsumerClosed() {
rc.consumerClosed("RowChannel")
atomic.StoreUint32((*uint32)(&rc.ConsumerStatus), uint32(ConsumerClosed))
numSenders := atomic.LoadInt32(&rc.numSenders)
// Drain (at most) numSenders messages in case senders are blocked trying to
// emit a row.
Expand Down
38 changes: 38 additions & 0 deletions pkg/sql/flowinfra/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,10 @@ type FlowBase struct {
// goroutines.
startedGoroutines bool

// headProcStarted tracks whether Start was called on the "head" processor
// in Run.
headProcStarted bool

// inboundStreams are streams that receive data from other hosts; this map
// is to be passed to FlowRegistry.RegisterFlow. This map is populated in
// Flow.Setup(), so it is safe to lookup into concurrently later.
Expand Down Expand Up @@ -572,6 +576,7 @@ func (f *FlowBase) Run(ctx context.Context, noWait bool) {
}
f.resumeCtx = ctx
log.VEventf(ctx, 1, "running %T in the flow's goroutine", headProc)
f.headProcStarted = true
headProc.Run(ctx, headOutput)
}

Expand Down Expand Up @@ -656,6 +661,39 @@ func (f *FlowBase) GetOnCleanupFns() (startCleanup, endCleanup func()) {
return onCleanupStart, onCleanupEnd
}

// ConsumerClosedOnHeadProc calls ConsumerClosed method on the "head" processor
// of this flow to make sure that all resources are released. This is needed for
// pausable portal execution model where execinfra.Run might never call
// ConsumerClosed on the source (i.e. the "head" processor).
//
// The method is only called if:
// - the flow is local (pausable portals currently don't support DistSQL)
// - there is exactly 1 processor in the flow that runs in its own goroutine
// (which is always the case for pausable portal model at this time)
// - Start was called on that processor (ConsumerClosed is only valid to be
// called after Start)
// - that single processor implements execinfra.RowSource interface (those
// processors that don't implement it shouldn't be running through pausable
// portal model).
//
// Otherwise, this method is a noop.
func (f *FlowBase) ConsumerClosedOnHeadProc() {
if !f.IsLocal() {
return
}
if len(f.processors) != 1 {
return
}
if !f.headProcStarted {
return
}
rs, ok := f.processors[0].(execinfra.RowSource)
if !ok {
return
}
rs.ConsumerClosed()
}

// Cleanup is part of the Flow interface.
// NOTE: this implements only the shared cleanup logic between row-based and
// vectorized flows.
Expand Down
99 changes: 99 additions & 0 deletions pkg/sql/pgwire/testdata/pgtest/multiple_active_portals
Original file line number Diff line number Diff line change
Expand Up @@ -1306,3 +1306,102 @@ ReadyForQuery
{"Type":"ReadyForQuery","TxStatus":"I"}

subtest end

subtest incomplete_processor_cleanup

send
Query {"String": "DEALLOCATE ALL;"}
----

until
ReadyForQuery
----
{"Type":"CommandComplete","CommandTag":"DEALLOCATE ALL"}
{"Type":"ReadyForQuery","TxStatus":"I"}

send
Query {"String": "CREATE TABLE t110486_1 (k1 PRIMARY KEY) AS VALUES (1), (2), (3);"}
Query {"String": "CREATE TABLE t110486_2 (k2 PRIMARY KEY) AS VALUES (1), (2), (3);"}
----

until
ReadyForQuery
ReadyForQuery
----
{"Type":"CommandComplete","CommandTag":"CREATE TABLE AS"}
{"Type":"ReadyForQuery","TxStatus":"I"}
{"Type":"CommandComplete","CommandTag":"CREATE TABLE AS"}
{"Type":"ReadyForQuery","TxStatus":"I"}

# Run the query that uses a row-by-row processor (the join reader) which
# previously wasn't cleaned up properly. This first execution doesn't trigger
# #110846 bug because it is effectively run in EXPLAIN ANALYZE flavor due to txn
# stats sampling (we prefer to run it explicitly rather than disable the txn
# stats sampling).
send
Parse {"Name": "q1", "Query": "SELECT * FROM t110486_1 INNER LOOKUP JOIN t110486_2 ON k1 = k2;"}
Bind {"DestinationPortal": "p1", "PreparedStatement": "q1"}
Execute {"Portal": "p1", "MaxRows": 1}
Sync
----

until
ReadyForQuery
----
{"Type":"ParseComplete"}
{"Type":"BindComplete"}
{"Type":"DataRow","Values":[{"text":"1"},{"text":"1"}]}
{"Type":"PortalSuspended"}
{"Type":"ReadyForQuery","TxStatus":"I"}

# Now run the query again (not in EXPLAIN ANALYZE flavor)
send
Bind {"DestinationPortal": "p2", "PreparedStatement": "q1"}
Execute {"Portal": "p2", "MaxRows": 1}
Sync
----

until
ReadyForQuery
----
{"Type":"BindComplete"}
{"Type":"DataRow","Values":[{"text":"1"},{"text":"1"}]}
{"Type":"PortalSuspended"}
{"Type":"ReadyForQuery","TxStatus":"I"}

# Do the same again using the row-based engine directly.
send crdb_only
Query {"String": "SET vectorize = off;"}
----

until crdb_only ignore=NoticeResponse
ReadyForQuery
----
{"Type":"CommandComplete","CommandTag":"SET"}
{"Type":"ReadyForQuery","TxStatus":"I"}

send
Bind {"DestinationPortal": "p3", "PreparedStatement": "q1"}
Execute {"Portal": "p3", "MaxRows": 1}
Sync
----

until
ReadyForQuery
----
{"Type":"BindComplete"}
{"Type":"DataRow","Values":[{"text":"1"},{"text":"1"}]}
{"Type":"PortalSuspended"}
{"Type":"ReadyForQuery","TxStatus":"I"}

send crdb_only
Query {"String": "RESET vectorize;"}
----

until crdb_only ignore=NoticeResponse
ReadyForQuery
----
{"Type":"CommandComplete","CommandTag":"RESET"}
{"Type":"ReadyForQuery","TxStatus":"I"}

subtest end
2 changes: 2 additions & 0 deletions pkg/sql/rowflow/row_based_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,8 @@ func (f *rowBasedFlow) Cleanup(ctx context.Context) {
startCleanup, endCleanup := f.FlowBase.GetOnCleanupFns()
startCleanup()
defer endCleanup()
// Ensure that the "head" processor is always closed.
f.ConsumerClosedOnHeadProc()
f.FlowBase.Cleanup(ctx)
f.Release()
}
Expand Down
1 change: 0 additions & 1 deletion pkg/testutils/distsqlutils/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ go_library(
"//pkg/sql/execinfrapb",
"//pkg/sql/rowenc",
"//pkg/sql/types",
"//pkg/util/log",
"//pkg/util/syncutil",
],
)
13 changes: 1 addition & 12 deletions pkg/testutils/distsqlutils/row_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
)

Expand Down Expand Up @@ -71,9 +70,6 @@ type RowBufferArgs struct {
// OnConsumerDone, if specified, is called as the first thing in the
// ConsumerDone() method.
OnConsumerDone func(*RowBuffer)
// OnConsumerClose, if specified, is called as the first thing in the
// ConsumerClosed() method.
OnConsumerClosed func(*RowBuffer)
// OnNext, if specified, is called as the first thing in the Next() method.
// If it returns an empty row and metadata, then RowBuffer.Next() is allowed
// to run normally. Otherwise, the values are returned from RowBuffer.Next().
Expand Down Expand Up @@ -196,16 +192,9 @@ func (rb *RowBuffer) ConsumerDone() {
}
}

// ConsumerClosed is part of the RowSource interface.
// ConsumerClosed is part of the execinfra.RowSource interface.
func (rb *RowBuffer) ConsumerClosed() {
status := execinfra.ConsumerStatus(atomic.LoadUint32((*uint32)(&rb.ConsumerStatus)))
if status == execinfra.ConsumerClosed {
log.Fatalf(context.Background(), "RowBuffer already closed")
}
atomic.StoreUint32((*uint32)(&rb.ConsumerStatus), uint32(execinfra.ConsumerClosed))
if rb.args.OnConsumerClosed != nil {
rb.args.OnConsumerClosed(rb)
}
}

// NextNoMeta is a version of Next which fails the test if
Expand Down

0 comments on commit 6a18a04

Please sign in to comment.