diff --git a/pkg/ccl/backupccl/generative_split_and_scatter_processor.go b/pkg/ccl/backupccl/generative_split_and_scatter_processor.go index 22d2589512b7..46c263608221 100644 --- a/pkg/ccl/backupccl/generative_split_and_scatter_processor.go +++ b/pkg/ccl/backupccl/generative_split_and_scatter_processor.go @@ -64,7 +64,7 @@ type generativeSplitAndScatterProcessor struct { chunkEntrySplitAndScatterers []splitAndScatterer // cancelScatterAndWaitForWorker cancels the scatter goroutine and waits for - // it to finish. + // it to finish. It can be called multiple times. cancelScatterAndWaitForWorker func() doneScatterCh chan entryNode diff --git a/pkg/ccl/backupccl/split_and_scatter_processor.go b/pkg/ccl/backupccl/split_and_scatter_processor.go index b51518dd6bba..10570fd8f812 100644 --- a/pkg/ccl/backupccl/split_and_scatter_processor.go +++ b/pkg/ccl/backupccl/split_and_scatter_processor.go @@ -195,7 +195,7 @@ type splitAndScatterProcessor struct { scatterer splitAndScatterer // cancelScatterAndWaitForWorker cancels the scatter goroutine and waits for - // it to finish. + // it to finish. It can be called multiples times. cancelScatterAndWaitForWorker func() doneScatterCh chan entryNode diff --git a/pkg/sql/execinfra/base.go b/pkg/sql/execinfra/base.go index fe4a2a9a747a..b001dbb34894 100644 --- a/pkg/sql/execinfra/base.go +++ b/pkg/sql/execinfra/base.go @@ -22,11 +22,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/rowenc" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/log/logcrash" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" - "github.com/cockroachdb/redact" ) // RowChannelBufSize is the default buffer size of a RowChannel. @@ -166,7 +164,7 @@ type RowSource interface { ConsumerDone() // ConsumerClosed informs the source that the consumer is done and will not - // make any more calls to Next(). Must only be called once on a given + // make any more calls to Next(). Must be called at least once on a given // RowSource. // // Like ConsumerDone(), if the consumer of the source stops consuming rows @@ -422,16 +420,6 @@ func (rb *rowSourceBase) consumerDone() { uint32(NeedMoreRows), uint32(DrainRequested)) } -// consumerClosed helps processors implement RowSource.ConsumerClosed. The name -// is only used for debug messages. -func (rb *rowSourceBase) consumerClosed(name string) { - status := ConsumerStatus(atomic.LoadUint32((*uint32)(&rb.ConsumerStatus))) - if status == ConsumerClosed { - logcrash.ReportOrPanic(context.Background(), nil, "%s already closed", redact.Safe(name)) - } - atomic.StoreUint32((*uint32)(&rb.ConsumerStatus), uint32(ConsumerClosed)) -} - // RowChannel is a thin layer over a RowChannelMsg channel, which can be used to // transfer rows between goroutines. type RowChannel struct { @@ -532,7 +520,7 @@ func (rc *RowChannel) ConsumerDone() { // ConsumerClosed is part of the RowSource interface. func (rc *RowChannel) ConsumerClosed() { - rc.consumerClosed("RowChannel") + atomic.StoreUint32((*uint32)(&rc.ConsumerStatus), uint32(ConsumerClosed)) numSenders := atomic.LoadInt32(&rc.numSenders) // Drain (at most) numSenders messages in case senders are blocked trying to // emit a row. diff --git a/pkg/testutils/distsqlutils/BUILD.bazel b/pkg/testutils/distsqlutils/BUILD.bazel index 7dd2e8384c64..f3cce3aa3762 100644 --- a/pkg/testutils/distsqlutils/BUILD.bazel +++ b/pkg/testutils/distsqlutils/BUILD.bazel @@ -10,7 +10,6 @@ go_library( "//pkg/sql/execinfrapb", "//pkg/sql/rowenc", "//pkg/sql/types", - "//pkg/util/log", "//pkg/util/syncutil", ], ) diff --git a/pkg/testutils/distsqlutils/row_buffer.go b/pkg/testutils/distsqlutils/row_buffer.go index cf16809dd6b5..818322f3d9e8 100644 --- a/pkg/testutils/distsqlutils/row_buffer.go +++ b/pkg/testutils/distsqlutils/row_buffer.go @@ -19,7 +19,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" "github.com/cockroachdb/cockroach/pkg/sql/types" - "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/syncutil" ) @@ -71,9 +70,6 @@ type RowBufferArgs struct { // OnConsumerDone, if specified, is called as the first thing in the // ConsumerDone() method. OnConsumerDone func(*RowBuffer) - // OnConsumerClose, if specified, is called as the first thing in the - // ConsumerClosed() method. - OnConsumerClosed func(*RowBuffer) // OnNext, if specified, is called as the first thing in the Next() method. // If it returns an empty row and metadata, then RowBuffer.Next() is allowed // to run normally. Otherwise, the values are returned from RowBuffer.Next(). @@ -196,16 +192,9 @@ func (rb *RowBuffer) ConsumerDone() { } } -// ConsumerClosed is part of the RowSource interface. +// ConsumerClosed is part of the execinfra.RowSource interface. func (rb *RowBuffer) ConsumerClosed() { - status := execinfra.ConsumerStatus(atomic.LoadUint32((*uint32)(&rb.ConsumerStatus))) - if status == execinfra.ConsumerClosed { - log.Fatalf(context.Background(), "RowBuffer already closed") - } atomic.StoreUint32((*uint32)(&rb.ConsumerStatus), uint32(execinfra.ConsumerClosed)) - if rb.args.OnConsumerClosed != nil { - rb.args.OnConsumerClosed(rb) - } } // NextNoMeta is a version of Next which fails the test if