Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
63103: sql: wrap CastExpr within IndirectionExpr in a ParenExpr r=ajwerner,rafiss a=the-ericwang35

Fixes #63097.

When we type check ParenExpr, we remove the parentheses.
This can cause issues for IndirectionExprs.

For example, something like `('{a}'::_typ)[1]` would
turn into `'{a}'::_typ[1]` after type checking, resulting
in the `[1]` being interpreted as part of the type.
This would result in errors when trying to use these
expressions in default expressions.
This patch checks for this specific case, and if
it finds that there is a CastExpr within an IndirecionExpr,
it will wrap it in a ParenExpr.

Release note: None

63108: colexec: wrap DrainMeta with panic-catcher and protect columnarizer r=yuzefovich a=yuzefovich

**colexec: wrap DrainMeta with panic-catcher and protect columnarizer**

Previously, in some edge cases (like when a panic is encountered during
`Operator.Init`) the metadata sources could have been uninitialized, so
when we tried to drain them, we'd encounter a crash. In order to avoid
that in the future, now all root components will wrap the draining with
the panic-catcher. Additionally, we now protect the columnarizer in this
case explicitly - if it wasn't initialized, it won't drain the wrapped
processor in `DrainMeta`.

Fixes: #62514.

Release note: None

**rowexec: remove redundant implementations of MetadataSource interface**

Previously, some row-by-row processors implemented
`execinfra.MetadataSource` interface. The idea behind that originally
was to allow for wrapped processors to return their metadata in the
vectorized flow, but nothing explicit is actually needed because every
wrapped processor has a columnarizer after it which will drain the
processor according to row-by-row model (by moving into draining state
and exhausting the trailing meta). This commit removes those redundant
implementations.

This allows us to move the interface into `colexecop` package where it
belongs.

Release note: None

63169: README.md: fix indentation for Deployment section r=ajwerner a=ajwerner

It was formatted as preformatted as opposed to the list it was trying to be.

Release note: None

Co-authored-by: Eric Wang <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: Andrew Werner <[email protected]>
  • Loading branch information
4 people committed Apr 7, 2021
4 parents aa4d1f0 + 9b62b56 + 70f2639 + c1029fe commit fe1327d
Show file tree
Hide file tree
Showing 36 changed files with 208 additions and 155 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@ CockroachDB supports the PostgreSQL wire protocol, so you can use any available

## Deployment

- [CockroachCloud](https://www.cockroachlabs.com/docs/cockroachcloud/quickstart) - Steps to deploy a [free CockroachCloud cluster](https://cockroachlabs.cloud/signup?referralId=githubquickstart) on public Cloud platforms.
- [Manual](https://www.cockroachlabs.com/docs/stable/manual-deployment.html) - Steps to deploy a CockroachDB cluster manually on multiple machines.
- [Cloud](https://www.cockroachlabs.com/docs/stable/cloud-deployment.html) - Guides for deploying CockroachDB on various cloud platforms.
- [Orchestration](https://www.cockroachlabs.com/docs/stable/orchestration.html) - Guides for running CockroachDB with popular open-source orchestration systems.
- [CockroachCloud](https://www.cockroachlabs.com/docs/cockroachcloud/quickstart) - Steps to deploy a [free CockroachCloud cluster](https://cockroachlabs.cloud/signup?referralId=githubquickstart) on public Cloud platforms.
- [Manual](https://www.cockroachlabs.com/docs/stable/manual-deployment.html) - Steps to deploy a CockroachDB cluster manually on multiple machines.
- [Cloud](https://www.cockroachlabs.com/docs/stable/cloud-deployment.html) - Guides for deploying CockroachDB on various cloud platforms.
- [Orchestration](https://www.cockroachlabs.com/docs/stable/orchestration.html) - Guides for running CockroachDB with popular open-source orchestration systems.

## Need Help?

Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/colexec/colbuilder/execplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func wrapRowSources(
c.MarkAsRemovedFromFlow()
toWrapInputs = append(toWrapInputs, c.Input())
} else {
var metadataSources execinfrapb.MetadataSources
var metadataSources colexecop.MetadataSources
if len(args.MetadataSources) > i {
// In some testing paths, MetadataSources might be left unset,
// so we check whether the slice has ith element. In the
Expand Down Expand Up @@ -616,7 +616,7 @@ func (r opResult) createAndWrapRowSource(
if args.TestingKnobs.PlanInvariantsCheckers {
r.Op = colexec.NewInvariantsChecker(r.Op)
}
r.MetadataSources = append(r.MetadataSources, r.Op.(execinfrapb.MetadataSource))
r.MetadataSources = append(r.MetadataSources, r.Op.(colexecop.MetadataSource))
r.ToClose = append(r.ToClose, c)
r.Releasables = append(r.Releasables, releasables...)
return nil
Expand Down Expand Up @@ -765,7 +765,7 @@ func NewColOperator(
result.Op = colexec.NewInvariantsChecker(result.Op)
}
result.KVReader = scanOp
result.MetadataSources = append(result.MetadataSources, result.Op.(execinfrapb.MetadataSource))
result.MetadataSources = append(result.MetadataSources, result.Op.(colexecop.MetadataSource))
result.Releasables = append(result.Releasables, scanOp)

// We want to check for cancellation once per input batch, and
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/colexec/colexecargs/op_creation.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type NewColOperatorArgs struct {
// NewColOperator call creates a colexec.Materializer, then it will take
// over the responsibility of draining the sources; otherwise, they will be
// returned in NewColOperatorResult.
MetadataSources []execinfrapb.MetadataSources
MetadataSources []colexecop.MetadataSources
DiskQueueCfg colcontainer.DiskQueueCfg
FDSemaphore semaphore.Semaphore
ExprHelper *ExprHelper
Expand Down Expand Up @@ -97,7 +97,7 @@ type NewColOperatorResult struct {
// created during NewColOperator call, but it also might include all of the
// sources from NewColOperatorArgs.MetadataSources if no metadata draining
// component was created.
MetadataSources execinfrapb.MetadataSources
MetadataSources colexecop.MetadataSources
// ToClose is a slice of components that need to be Closed.
ToClose []colexecop.Closer
OpMonitors []*mon.BytesMonitor
Expand Down
12 changes: 12 additions & 0 deletions pkg/sql/colexec/colexectestutils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/colexecop"
"github.com/cockroachdb/cockroach/pkg/sql/colmem"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand Down Expand Up @@ -1477,3 +1478,14 @@ func GenerateBatchSize() int {
}
return coldata.BatchSize()
}

// CallbackMetadataSource is a utility struct that implements the
// colexecop.MetadataSource interface by calling a provided callback.
type CallbackMetadataSource struct {
DrainMetaCb func(context.Context) []execinfrapb.ProducerMetadata
}

// DrainMeta is part of the colexecop.MetadataSource interface.
func (s CallbackMetadataSource) DrainMeta(ctx context.Context) []execinfrapb.ProducerMetadata {
return s.DrainMetaCb(ctx)
}
14 changes: 10 additions & 4 deletions pkg/sql/colexec/columnarizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,16 +239,22 @@ func (c *Columnarizer) Run(context.Context) {
}

var (
_ colexecop.Operator = &Columnarizer{}
_ execinfrapb.MetadataSource = &Columnarizer{}
_ colexecop.Closer = &Columnarizer{}
_ colexecop.DrainableOperator = &Columnarizer{}
_ colexecop.Closer = &Columnarizer{}
)

// DrainMeta is part of the MetadataSource interface.
// DrainMeta is part of the colexecop.MetadataSource interface.
func (c *Columnarizer) DrainMeta(ctx context.Context) []execinfrapb.ProducerMetadata {
if c.removedFromFlow {
return nil
}
if c.initStatus == colexecop.OperatorNotInitialized {
// The columnarizer wasn't initialized, so the wrapped processors might
// not have been started leaving them in an unsafe to drain state, so
// we skip the draining. Mostly likely this happened because a panic was
// encountered in Init.
return c.accumulatedMeta
}
c.MoveToDraining(nil /* err */)
for {
meta := c.DrainHelper()
Expand Down
9 changes: 4 additions & 5 deletions pkg/sql/colexec/invariants_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,17 @@ type InvariantsChecker struct {
colexecop.NonExplainable

initStatus colexecop.OperatorInitStatus
metadataSource execinfrapb.MetadataSource
metadataSource colexecop.MetadataSource
}

var _ colexecop.Operator = &InvariantsChecker{}
var _ execinfrapb.MetadataSource
var _ colexecop.DrainableOperator = &InvariantsChecker{}

// NewInvariantsChecker creates a new InvariantsChecker.
func NewInvariantsChecker(input colexecop.Operator) *InvariantsChecker {
c := &InvariantsChecker{
OneInputNode: colexecop.OneInputNode{Input: input},
}
if ms, ok := input.(execinfrapb.MetadataSource); ok {
if ms, ok := input.(colexecop.MetadataSource); ok {
c.metadataSource = ms
}
return c
Expand Down Expand Up @@ -99,7 +98,7 @@ func (i *InvariantsChecker) Next(ctx context.Context) coldata.Batch {
return b
}

// DrainMeta implements the execinfrapb.MetadataSource interface.
// DrainMeta implements the colexecop.MetadataSource interface.
func (i *InvariantsChecker) DrainMeta(ctx context.Context) []execinfrapb.ProducerMetadata {
if shortCircuit := i.assertInitWasCalled(); shortCircuit {
return nil
Expand Down
8 changes: 4 additions & 4 deletions pkg/sql/colexec/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ type drainHelper struct {
ctx context.Context

getStats func() []*execinfrapb.ComponentStats
sources execinfrapb.MetadataSources
sources colexecop.MetadataSources

bufferedMeta []execinfrapb.ProducerMetadata
}
Expand All @@ -92,7 +92,7 @@ var drainHelperPool = sync.Pool{
}

func newDrainHelper(
getStats func() []*execinfrapb.ComponentStats, sources execinfrapb.MetadataSources,
getStats func() []*execinfrapb.ComponentStats, sources colexecop.MetadataSources,
) *drainHelper {
d := drainHelperPool.Get().(*drainHelper)
d.getStats = getStats
Expand Down Expand Up @@ -199,7 +199,7 @@ func NewMaterializer(
typs []*types.T,
output execinfra.RowReceiver,
getStats func() []*execinfrapb.ComponentStats,
metadataSources []execinfrapb.MetadataSource,
metadataSources []colexecop.MetadataSource,
toClose []colexecop.Closer,
cancelFlow func() context.CancelFunc,
) (*Materializer, error) {
Expand Down Expand Up @@ -274,7 +274,7 @@ func (m *Materializer) Start(ctx context.Context) {
m.MoveToDraining(err)
} else {
// Note that we intentionally only start the drain helper if
// initialization was successful - no starting the helper will tell it
// initialization was successful - not starting the helper will tell it
// to not drain the metadata sources (which have not been properly
// initialized).
m.drainHelper.Start(ctx)
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/colexec/materializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func TestMaterializerNextErrorAfterConsumerDone(t *testing.T) {
defer leaktest.AfterTest(t)()

testError := errors.New("test-induced error")
metadataSource := &execinfrapb.CallbackMetadataSource{DrainMetaCb: func(_ context.Context) []execinfrapb.ProducerMetadata {
metadataSource := &colexectestutils.CallbackMetadataSource{DrainMetaCb: func(_ context.Context) []execinfrapb.ProducerMetadata {
colexecerror.InternalError(testError)
// Unreachable
return nil
Expand All @@ -209,7 +209,7 @@ func TestMaterializerNextErrorAfterConsumerDone(t *testing.T) {
nil, /* typ */
nil, /* output */
nil, /* getStats */
[]execinfrapb.MetadataSource{metadataSource},
[]colexecop.MetadataSource{metadataSource},
nil, /* toClose */
nil, /* cancelFlow */
)
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/colexec/parallel_unordered_synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (s *ParallelUnorderedSynchronizer) Child(nth int, verbose bool) execinfra.O

// SynchronizerInput is a wrapper over a colexecop.Operator that a
// synchronizer goroutine will be calling Next on. An accompanying
// []execinfrapb.MetadataSource may also be specified, in which case
// colexecop.MetadataSources may also be specified, in which case
// DrainMeta will be called from the same goroutine.
type SynchronizerInput struct {
// Op is the input Operator.
Expand All @@ -123,7 +123,7 @@ type SynchronizerInput struct {
StatsCollectors []VectorizedStatsCollector
// MetadataSources are metadata sources in the input tree that should be
// drained in the same goroutine as Op.
MetadataSources execinfrapb.MetadataSources
MetadataSources colexecop.MetadataSources
// ToClose are Closers in the input tree that should be closed in the same
// goroutine as Op.
ToClose colexecop.Closers
Expand Down Expand Up @@ -358,7 +358,7 @@ func (s *ParallelUnorderedSynchronizer) notifyInputToReadNextBatch(inputIdx int)
}
}

// DrainMeta is part of the MetadataSource interface.
// DrainMeta is part of the colexecop.MetadataSource interface.
func (s *ParallelUnorderedSynchronizer) DrainMeta(
ctx context.Context,
) []execinfrapb.ProducerMetadata {
Expand Down
5 changes: 3 additions & 2 deletions pkg/sql/colexec/parallel_unordered_synchronizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/col/coldatatestutils"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexectestutils"
"github.com/cockroachdb/cockroach/pkg/sql/colexecerror"
"github.com/cockroachdb/cockroach/pkg/sql/colexecop"
"github.com/cockroachdb/cockroach/pkg/sql/colmem"
Expand Down Expand Up @@ -59,8 +60,8 @@ func TestParallelUnorderedSynchronizer(t *testing.T) {
source.ResetBatchesToReturn(numBatches)
inputs[i].Op = source
inputIdx := i
inputs[i].MetadataSources = []execinfrapb.MetadataSource{
execinfrapb.CallbackMetadataSource{DrainMetaCb: func(_ context.Context) []execinfrapb.ProducerMetadata {
inputs[i].MetadataSources = []colexecop.MetadataSource{
colexectestutils.CallbackMetadataSource{DrainMetaCb: func(_ context.Context) []execinfrapb.ProducerMetadata {
return []execinfrapb.ProducerMetadata{{Err: errors.Errorf("input %d test-induced metadata", inputIdx)}}
}},
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colexec/serial_unordered_synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (s *SerialUnorderedSynchronizer) Next(ctx context.Context) coldata.Batch {
}
}

// DrainMeta is part of the MetadataSource interface.
// DrainMeta is part of the colexecop.MetadataSource interface.
func (s *SerialUnorderedSynchronizer) DrainMeta(
ctx context.Context,
) []execinfrapb.ProducerMetadata {
Expand Down
5 changes: 3 additions & 2 deletions pkg/sql/colexec/serial_unordered_synchronizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/col/coldatatestutils"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexectestutils"
"github.com/cockroachdb/cockroach/pkg/sql/colexecop"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/types"
Expand Down Expand Up @@ -44,8 +45,8 @@ func TestSerialUnorderedSynchronizer(t *testing.T) {
inputIdx := i
inputs[i] = SynchronizerInput{
Op: source,
MetadataSources: []execinfrapb.MetadataSource{
execinfrapb.CallbackMetadataSource{
MetadataSources: []colexecop.MetadataSource{
colexectestutils.CallbackMetadataSource{
DrainMetaCb: func(_ context.Context) []execinfrapb.ProducerMetadata {
return []execinfrapb.ProducerMetadata{{Err: errors.Errorf("input %d test-induced metadata", inputIdx)}}
},
Expand Down
35 changes: 34 additions & 1 deletion pkg/sql/colexecop/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type Operator interface {
// DrainMeta may not be called concurrently.
type DrainableOperator interface {
Operator
execinfrapb.MetadataSource
MetadataSource
}

// KVReader is an operator that performs KV reads.
Expand Down Expand Up @@ -310,3 +310,36 @@ func (n *noopOperator) Reset(ctx context.Context) {
r.Reset(ctx)
}
}

// MetadataSource is an interface implemented by processors and columnar
// operators that can produce metadata.
type MetadataSource interface {
// DrainMeta returns all the metadata produced by the processor or operator.
// It will be called exactly once, usually, when the processor or operator
// has finished doing its computations. This is a signal that the output
// requires no more rows to be returned.
// Implementers can choose what to do on subsequent calls (if such occur).
// TODO(yuzefovich): modify the contract to require returning nil on all
// calls after the first one.
DrainMeta(context.Context) []execinfrapb.ProducerMetadata
}

// MetadataSources is a slice of MetadataSource.
type MetadataSources []MetadataSource

// DrainMeta calls DrainMeta on all MetadataSources and returns a single slice
// with all the accumulated metadata. Note that this method wraps the draining
// with the panic-catcher so that the callers don't have to.
func (s MetadataSources) DrainMeta(ctx context.Context) []execinfrapb.ProducerMetadata {
var result []execinfrapb.ProducerMetadata
if err := colexecerror.CatchVectorizedRuntimeError(func() {
for _, src := range s {
result = append(result, src.DrainMeta(ctx)...)
}
}); err != nil {
meta := execinfrapb.GetProducerMeta()
meta.Err = err
result = append(result, *meta)
}
return result
}
2 changes: 1 addition & 1 deletion pkg/sql/colfetcher/colbatch_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (s *ColBatchScan) Next(ctx context.Context) coldata.Batch {
return bat
}

// DrainMeta is part of the MetadataSource interface.
// DrainMeta is part of the colexecop.MetadataSource interface.
func (s *ColBatchScan) DrainMeta(ctx context.Context) []execinfrapb.ProducerMetadata {
s.mu.Lock()
initialized := s.mu.init
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/colflow/colrpc/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ go_test(
"//pkg/col/colserde",
"//pkg/roachpb",
"//pkg/settings/cluster",
"//pkg/sql/colexec/colexectestutils",
"//pkg/sql/colexec/colexecutils",
"//pkg/sql/colexecerror",
"//pkg/sql/colexecop",
Expand Down
5 changes: 3 additions & 2 deletions pkg/sql/colflow/colrpc/colrpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/col/coldatatestutils"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexectestutils"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecutils"
"github.com/cockroachdb/cockroach/pkg/sql/colexecerror"
"github.com/cockroachdb/cockroach/pkg/sql/colexecop"
Expand Down Expand Up @@ -504,8 +505,8 @@ func TestOutboxInboxMetadataPropagation(t *testing.T) {
}
outbox, err := NewOutbox(
colmem.NewAllocator(ctx, &outboxMemAcc, coldata.StandardColumnFactory),
input, typs, nil /* getStats */, []execinfrapb.MetadataSource{
execinfrapb.CallbackMetadataSource{
input, typs, nil /* getStats */, []colexecop.MetadataSource{
colexectestutils.CallbackMetadataSource{
DrainMetaCb: func(context.Context) []execinfrapb.ProducerMetadata {
return expectedMetadata
},
Expand Down
10 changes: 4 additions & 6 deletions pkg/sql/colflow/colrpc/outbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type Outbox struct {

// draining is an atomic that represents whether the Outbox is draining.
draining uint32
metadataSources []execinfrapb.MetadataSource
metadataSources colexecop.MetadataSources
// closers is a slice of Closers that need to be Closed on termination.
closers colexecop.Closers

Expand Down Expand Up @@ -81,7 +81,7 @@ func NewOutbox(
input colexecop.Operator,
typs []*types.T,
getStats func() []*execinfrapb.ComponentStats,
metadataSources []execinfrapb.MetadataSource,
metadataSources []colexecop.MetadataSource,
toClose []colexecop.Closer,
) (*Outbox, error) {
c, err := colserde.NewArrowBatchConverter(typs)
Expand Down Expand Up @@ -307,10 +307,8 @@ func (o *Outbox) sendMetadata(ctx context.Context, stream flowStreamClient, errT
},
})
}
for _, src := range o.metadataSources {
for _, meta := range src.DrainMeta(ctx) {
msg.Data.Metadata = append(msg.Data.Metadata, execinfrapb.LocalMetaToRemoteProducerMeta(ctx, meta))
}
for _, meta := range o.metadataSources.DrainMeta(ctx) {
msg.Data.Metadata = append(msg.Data.Metadata, execinfrapb.LocalMetaToRemoteProducerMeta(ctx, meta))
}
if len(msg.Data.Metadata) == 0 {
return nil
Expand Down
Loading

0 comments on commit fe1327d

Please sign in to comment.