diff --git a/pkg/sql/colexec/columnarizer.go b/pkg/sql/colexec/columnarizer.go index 14ca8d930492..50afea5e6dd4 100644 --- a/pkg/sql/colexec/columnarizer.go +++ b/pkg/sql/colexec/columnarizer.go @@ -249,6 +249,13 @@ func (c *Columnarizer) DrainMeta(ctx context.Context) []execinfrapb.ProducerMeta if c.removedFromFlow { return nil } + if c.initStatus == colexecop.OperatorNotInitialized { + // The columnarizer wasn't initialized, so the wrapped processors might + // not have been started leaving them in an unsafe to drain state, so + // we skip the draining. Mostly likely this happened because a panic was + // encountered in Init. + return c.accumulatedMeta + } c.MoveToDraining(nil /* err */) for { meta := c.DrainHelper() diff --git a/pkg/sql/colexec/materializer.go b/pkg/sql/colexec/materializer.go index 23a8cfe7be51..bea449a137a3 100644 --- a/pkg/sql/colexec/materializer.go +++ b/pkg/sql/colexec/materializer.go @@ -274,7 +274,7 @@ func (m *Materializer) Start(ctx context.Context) { m.MoveToDraining(err) } else { // Note that we intentionally only start the drain helper if - // initialization was successful - no starting the helper will tell it + // initialization was successful - not starting the helper will tell it // to not drain the metadata sources (which have not been properly // initialized). m.drainHelper.Start(ctx) diff --git a/pkg/sql/colflow/colrpc/outbox.go b/pkg/sql/colflow/colrpc/outbox.go index accccb3880e5..696dc1eb6e47 100644 --- a/pkg/sql/colflow/colrpc/outbox.go +++ b/pkg/sql/colflow/colrpc/outbox.go @@ -52,7 +52,7 @@ type Outbox struct { // draining is an atomic that represents whether the Outbox is draining. draining uint32 - metadataSources []execinfrapb.MetadataSource + metadataSources execinfrapb.MetadataSources // closers is a slice of Closers that need to be Closed on termination. closers colexecop.Closers @@ -307,10 +307,8 @@ func (o *Outbox) sendMetadata(ctx context.Context, stream flowStreamClient, errT }, }) } - for _, src := range o.metadataSources { - for _, meta := range src.DrainMeta(ctx) { - msg.Data.Metadata = append(msg.Data.Metadata, execinfrapb.LocalMetaToRemoteProducerMeta(ctx, meta)) - } + for _, meta := range o.metadataSources.DrainMeta(ctx) { + msg.Data.Metadata = append(msg.Data.Metadata, execinfrapb.LocalMetaToRemoteProducerMeta(ctx, meta)) } if len(msg.Data.Metadata) == 0 { return nil diff --git a/pkg/sql/execinfrapb/BUILD.bazel b/pkg/sql/execinfrapb/BUILD.bazel index 6f77e231df38..8cb1511b22df 100644 --- a/pkg/sql/execinfrapb/BUILD.bazel +++ b/pkg/sql/execinfrapb/BUILD.bazel @@ -28,6 +28,7 @@ go_library( "//pkg/sql/catalog/descpb", "//pkg/sql/catalog/schemaexpr", "//pkg/sql/catalog/tabledesc", + "//pkg/sql/colexecerror", "//pkg/sql/parser", "//pkg/sql/pgwire/pgcode", "//pkg/sql/pgwire/pgerror", diff --git a/pkg/sql/execinfrapb/data.go b/pkg/sql/execinfrapb/data.go index 44eae6a79b99..2b66c84a250c 100644 --- a/pkg/sql/execinfrapb/data.go +++ b/pkg/sql/execinfrapb/data.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" + "github.com/cockroachdb/cockroach/pkg/sql/colexecerror" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/encoding" @@ -336,11 +337,18 @@ type MetadataSource interface { type MetadataSources []MetadataSource // DrainMeta calls DrainMeta on all MetadataSources and returns a single slice -// with all the accumulated metadata. +// with all the accumulated metadata. Note that this method wraps the draining +// with the panic-catcher so that the callers don't have to. func (s MetadataSources) DrainMeta(ctx context.Context) []ProducerMetadata { var result []ProducerMetadata - for _, src := range s { - result = append(result, src.DrainMeta(ctx)...) + if err := colexecerror.CatchVectorizedRuntimeError(func() { + for _, src := range s { + result = append(result, src.DrainMeta(ctx)...) + } + }); err != nil { + meta := GetProducerMeta() + meta.Err = err + result = append(result, *meta) } return result }