Skip to content

Commit

Permalink
execinfra: relax RowSource.ConsumerClosed contract
Browse files Browse the repository at this point in the history
This commit adjusts RowSource.ConsumerClosed contract so that it's
possible for this method to be called multiple times. Previously, vast
majority of implementations already supported that (and some actually
assumed that), but there were a couple that would result in a panic or
an error if called multiple times - those have been relaxed.

This is needed for the follow-up commit which will introduce an
unconditional call of this method on the "head" processor of the flow
(needed for pausable portals execution model).

Release note: None
  • Loading branch information
yuzefovich committed Sep 14, 2023
1 parent 2694d15 commit 6ec6a0f
Show file tree
Hide file tree
Showing 5 changed files with 5 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type generativeSplitAndScatterProcessor struct {
chunkEntrySplitAndScatterers []splitAndScatterer

// cancelScatterAndWaitForWorker cancels the scatter goroutine and waits for
// it to finish.
// it to finish. It can be called multiple times.
cancelScatterAndWaitForWorker func()

doneScatterCh chan entryNode
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/split_and_scatter_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ type splitAndScatterProcessor struct {

scatterer splitAndScatterer
// cancelScatterAndWaitForWorker cancels the scatter goroutine and waits for
// it to finish.
// it to finish. It can be called multiples times.
cancelScatterAndWaitForWorker func()

doneScatterCh chan entryNode
Expand Down
16 changes: 2 additions & 14 deletions pkg/sql/execinfra/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/logcrash"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
)

// RowChannelBufSize is the default buffer size of a RowChannel.
Expand Down Expand Up @@ -166,7 +164,7 @@ type RowSource interface {
ConsumerDone()

// ConsumerClosed informs the source that the consumer is done and will not
// make any more calls to Next(). Must only be called once on a given
// make any more calls to Next(). Must be called at least once on a given
// RowSource.
//
// Like ConsumerDone(), if the consumer of the source stops consuming rows
Expand Down Expand Up @@ -422,16 +420,6 @@ func (rb *rowSourceBase) consumerDone() {
uint32(NeedMoreRows), uint32(DrainRequested))
}

// consumerClosed helps processors implement RowSource.ConsumerClosed. The name
// is only used for debug messages.
func (rb *rowSourceBase) consumerClosed(name string) {
status := ConsumerStatus(atomic.LoadUint32((*uint32)(&rb.ConsumerStatus)))
if status == ConsumerClosed {
logcrash.ReportOrPanic(context.Background(), nil, "%s already closed", redact.Safe(name))
}
atomic.StoreUint32((*uint32)(&rb.ConsumerStatus), uint32(ConsumerClosed))
}

// RowChannel is a thin layer over a RowChannelMsg channel, which can be used to
// transfer rows between goroutines.
type RowChannel struct {
Expand Down Expand Up @@ -532,7 +520,7 @@ func (rc *RowChannel) ConsumerDone() {

// ConsumerClosed is part of the RowSource interface.
func (rc *RowChannel) ConsumerClosed() {
rc.consumerClosed("RowChannel")
atomic.StoreUint32((*uint32)(&rc.ConsumerStatus), uint32(ConsumerClosed))
numSenders := atomic.LoadInt32(&rc.numSenders)
// Drain (at most) numSenders messages in case senders are blocked trying to
// emit a row.
Expand Down
1 change: 0 additions & 1 deletion pkg/testutils/distsqlutils/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ go_library(
"//pkg/sql/execinfrapb",
"//pkg/sql/rowenc",
"//pkg/sql/types",
"//pkg/util/log",
"//pkg/util/syncutil",
],
)
13 changes: 1 addition & 12 deletions pkg/testutils/distsqlutils/row_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
)

Expand Down Expand Up @@ -71,9 +70,6 @@ type RowBufferArgs struct {
// OnConsumerDone, if specified, is called as the first thing in the
// ConsumerDone() method.
OnConsumerDone func(*RowBuffer)
// OnConsumerClose, if specified, is called as the first thing in the
// ConsumerClosed() method.
OnConsumerClosed func(*RowBuffer)
// OnNext, if specified, is called as the first thing in the Next() method.
// If it returns an empty row and metadata, then RowBuffer.Next() is allowed
// to run normally. Otherwise, the values are returned from RowBuffer.Next().
Expand Down Expand Up @@ -196,16 +192,9 @@ func (rb *RowBuffer) ConsumerDone() {
}
}

// ConsumerClosed is part of the RowSource interface.
// ConsumerClosed is part of the execinfra.RowSource interface.
func (rb *RowBuffer) ConsumerClosed() {
status := execinfra.ConsumerStatus(atomic.LoadUint32((*uint32)(&rb.ConsumerStatus)))
if status == execinfra.ConsumerClosed {
log.Fatalf(context.Background(), "RowBuffer already closed")
}
atomic.StoreUint32((*uint32)(&rb.ConsumerStatus), uint32(execinfra.ConsumerClosed))
if rb.args.OnConsumerClosed != nil {
rb.args.OnConsumerClosed(rb)
}
}

// NextNoMeta is a version of Next which fails the test if
Expand Down

0 comments on commit 6ec6a0f

Please sign in to comment.