From 4088cdd89d8b6d13ead8e000f680369ee8e60598 Mon Sep 17 00:00:00 2001 From: Arjun Narayan Date: Thu, 19 Apr 2018 15:29:52 -0400 Subject: [PATCH 1/2] distsqlrun: export distinct processor Prerequisite before embedding it in the planNodes in Local SQL. Release note: None --- pkg/sql/distsqlrun/distinct.go | 44 +++++++++++++++++----------------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/pkg/sql/distsqlrun/distinct.go b/pkg/sql/distsqlrun/distinct.go index d126300ae9a9..98e3281969be 100644 --- a/pkg/sql/distsqlrun/distinct.go +++ b/pkg/sql/distsqlrun/distinct.go @@ -25,7 +25,7 @@ import ( "github.com/pkg/errors" ) -type distinct struct { +type Distinct struct { processorBase input RowSource @@ -41,19 +41,19 @@ type distinct struct { scratch []byte } -// sortedDistinct is a specialized distinct that can be used when all of the +// SortedDistinct is a specialized distinct that can be used when all of the // distinct columns are also ordered. -type sortedDistinct struct { - distinct +type SortedDistinct struct { + Distinct } -var _ Processor = &distinct{} -var _ RowSource = &distinct{} +var _ Processor = &Distinct{} +var _ RowSource = &Distinct{} const distinctProcName = "distinct" -var _ Processor = &sortedDistinct{} -var _ RowSource = &sortedDistinct{} +var _ Processor = &SortedDistinct{} +var _ RowSource = &SortedDistinct{} const sortedDistinctProcName = "sorted distinct" @@ -82,7 +82,7 @@ func newDistinct( distinctCols.Add(int(col)) } - d := &distinct{ + d := &Distinct{ input: input, orderedCols: spec.OrderedColumns, distinctCols: distinctCols, @@ -106,8 +106,8 @@ func newDistinct( if allSorted { // We can use the faster sortedDistinct processor. - return &sortedDistinct{ - distinct: *d, + return &SortedDistinct{ + Distinct: *d, }, nil } @@ -115,13 +115,13 @@ func newDistinct( } // Start is part of the RowSource interface. -func (d *distinct) Start(ctx context.Context) context.Context { +func (d *Distinct) Start(ctx context.Context) context.Context { d.input.Start(ctx) return d.startInternal(ctx, distinctProcName) } // Run is part of the processor interface. -func (d *distinct) Run(ctx context.Context, wg *sync.WaitGroup) { +func (d *Distinct) Run(ctx context.Context, wg *sync.WaitGroup) { if d.out.output == nil { panic("distinct output not initialized for emitting rows") } @@ -133,13 +133,13 @@ func (d *distinct) Run(ctx context.Context, wg *sync.WaitGroup) { } // Start is part of the RowSource interface. -func (d *sortedDistinct) Start(ctx context.Context) context.Context { +func (d *SortedDistinct) Start(ctx context.Context) context.Context { d.input.Start(ctx) return d.startInternal(ctx, sortedDistinctProcName) } // Run is part of the processor interface. -func (d *sortedDistinct) Run(ctx context.Context, wg *sync.WaitGroup) { +func (d *SortedDistinct) Run(ctx context.Context, wg *sync.WaitGroup) { if d.out.output == nil { panic("distinct output not initialized for emitting rows") } @@ -150,7 +150,7 @@ func (d *sortedDistinct) Run(ctx context.Context, wg *sync.WaitGroup) { } } -func (d *distinct) matchLastGroupKey(row sqlbase.EncDatumRow) (bool, error) { +func (d *Distinct) matchLastGroupKey(row sqlbase.EncDatumRow) (bool, error) { if !d.haveLastGroupKey { return false, nil } @@ -167,7 +167,7 @@ func (d *distinct) matchLastGroupKey(row sqlbase.EncDatumRow) (bool, error) { // encode appends the encoding of non-ordered columns, which we use as a key in // our 'seen' set. -func (d *distinct) encode(appendTo []byte, row sqlbase.EncDatumRow) ([]byte, error) { +func (d *Distinct) encode(appendTo []byte, row sqlbase.EncDatumRow) ([]byte, error) { var err error for i, datum := range row { // Ignore columns that are not in the distinctCols, as if we are @@ -192,14 +192,14 @@ func (d *distinct) encode(appendTo []byte, row sqlbase.EncDatumRow) ([]byte, err return appendTo, nil } -func (d *distinct) close() { +func (d *Distinct) close() { // Need to close the mem accounting while the context is still valid. d.memAcc.Close(d.ctx) d.internalClose() } // Next is part of the RowSource interface. -func (d *distinct) Next() (sqlbase.EncDatumRow, *ProducerMetadata) { +func (d *Distinct) Next() (sqlbase.EncDatumRow, *ProducerMetadata) { for d.state == stateRunning { row, meta := d.input.Next() if meta != nil { @@ -266,7 +266,7 @@ func (d *distinct) Next() (sqlbase.EncDatumRow, *ProducerMetadata) { // // sortedDistinct is simpler than distinct. All it has to do is keep track // of the last row it saw, emitting if the new row is different. -func (d *sortedDistinct) Next() (sqlbase.EncDatumRow, *ProducerMetadata) { +func (d *SortedDistinct) Next() (sqlbase.EncDatumRow, *ProducerMetadata) { for d.state == stateRunning { row, meta := d.input.Next() if meta != nil { @@ -296,12 +296,12 @@ func (d *sortedDistinct) Next() (sqlbase.EncDatumRow, *ProducerMetadata) { } // ConsumerDone is part of the RowSource interface. -func (d *distinct) ConsumerDone() { +func (d *Distinct) ConsumerDone() { d.input.ConsumerDone() } // ConsumerClosed is part of the RowSource interface. -func (d *distinct) ConsumerClosed() { +func (d *Distinct) ConsumerClosed() { // The consumer is done, Next() will not be called again. d.close() } From c74fb21950722bade7f003ac174c7926c84b2455 Mon Sep 17 00:00:00 2001 From: Arjun Narayan Date: Fri, 16 Mar 2018 09:20:43 -0400 Subject: [PATCH 2/2] sql: use distsql for local distinct Use the distsql distinct processor for processing local sql queries. Closes #23901. Release note: none --- pkg/sql/distinct.go | 153 +++++++--------------------- pkg/sql/distsql_physical_planner.go | 37 ++++--- pkg/sql/distsqlrun/base.go | 6 ++ pkg/sql/distsqlrun/distinct.go | 6 +- pkg/sql/distsqlrun/distinct_test.go | 4 +- pkg/sql/distsqlrun/processors.go | 2 +- pkg/sql/plan_node_to_row_source.go | 103 +++++++++++++++++++ pkg/sql/row_source_to_plan_node.go | 86 ++++++++++++++++ 8 files changed, 262 insertions(+), 135 deletions(-) create mode 100644 pkg/sql/plan_node_to_row_source.go create mode 100644 pkg/sql/row_source_to_plan_node.go diff --git a/pkg/sql/distinct.go b/pkg/sql/distinct.go index a1377dc81140..0756b3dc54d2 100644 --- a/pkg/sql/distinct.go +++ b/pkg/sql/distinct.go @@ -15,14 +15,14 @@ package sql import ( - "bytes" "context" + "github.com/cockroachdb/cockroach/pkg/sql/distsqlrun" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sem/types" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/util" - "github.com/cockroachdb/cockroach/pkg/util/mon" + "github.com/pkg/errors" ) // distinctNode de-duplicates rows returned by a wrapped planNode. @@ -41,7 +41,7 @@ type distinctNode struct { // the DISTINCT ON () clause. distinctOnColIdxs util.FastIntSet - run distinctRun + run *rowSourceToPlanNode } // distinct constructs a distinctNode. @@ -187,132 +187,57 @@ func (p *planner) distinct( return plan, d, nil } -// distinctRun contains the run-time state of distinctNode during local execution. -type distinctRun struct { - // Encoding of the columnsInOrder columns for the previous row. - prefixSeen []byte - prefixMemAcc mon.BoundAccount - - // Encoding of the non-columnInOrder columns for rows sharing the same - // prefixSeen value. - suffixSeen map[string]struct{} - suffixMemAcc mon.BoundAccount -} - func (n *distinctNode) startExec(params runParams) error { - n.run.prefixMemAcc = params.EvalContext().Mon.MakeBoundAccount() - n.run.suffixMemAcc = params.EvalContext().Mon.MakeBoundAccount() - n.run.suffixSeen = make(map[string]struct{}) - return nil -} - -func (n *distinctNode) Next(params runParams) (bool, error) { - ctx := params.ctx + flowCtx := &distsqlrun.FlowCtx{ + EvalCtx: *params.EvalContext(), + } - for { - if err := params.p.cancelChecker.Check(); err != nil { - return false, err - } + cols := make([]int, len(planColumns(n.plan))) + for i := range cols { + cols[i] = i + } - next, err := n.plan.Next(params) - if !next { - return false, err - } + spec := createDistinctSpec(n, cols) - // Detect duplicates - prefix, suffix, err := n.encodeDistinctOnVals(n.plan.Values()) - if err != nil { - return false, err - } + input, err := makePlanNodeToRowSource(n.plan, params) + if err != nil { + return err + } + if len(spec.DistinctColumns) == 0 { + return errors.New("cannot initialize a distinctNode with 0 columns") + } - if !bytes.Equal(prefix, n.run.prefixSeen) { - // The prefix of the row which is ordered differs from the last row; - // reset our seen set. - if len(n.run.suffixSeen) > 0 { - n.run.suffixMemAcc.Clear(ctx) - n.run.suffixSeen = make(map[string]struct{}) - } - if err := n.run.prefixMemAcc.Resize( - ctx, int64(len(n.run.prefixSeen)), int64(len(prefix))); err != nil { - return false, err - } - n.run.prefixSeen = prefix - if suffix != nil { - if err := n.addSuffixSeen(ctx, &n.run.suffixMemAcc, string(suffix)); err != nil { - return false, err - } - } - return true, nil - } + post := &distsqlrun.PostProcessSpec{} // post is not used as we only use the processor for the core distinct logic. + var output distsqlrun.RowReceiver // output is never used as distinct is only run as a RowSource. - // The prefix of the row is the same as the last row; check - // to see if the suffix which is not ordered has been seen. - if suffix != nil { - sKey := string(suffix) - if _, ok := n.run.suffixSeen[sKey]; !ok { - if err := n.addSuffixSeen(ctx, &n.run.suffixMemAcc, sKey); err != nil { - return false, err - } - return true, nil - } - } + proc, err := distsqlrun.NewDistinct(flowCtx, 0 /* processorID */, spec, input, post, output) + if err != nil { + return err } -} -func (n *distinctNode) Values() tree.Datums { - // We return only the required columns set during planning. - // These columns are always at the beginning of the child row since - // we _append_ additional DISTINCT ON columns. - // See planner.distinct. - return n.plan.Values() -} + n.run = makeRowSourceToPlanNode(proc) -func (n *distinctNode) Close(ctx context.Context) { - n.plan.Close(ctx) - n.run.prefixSeen = nil - n.run.prefixMemAcc.Close(ctx) - n.run.suffixSeen = nil - n.run.suffixMemAcc.Close(ctx) -} + n.run.source.Start(params.ctx) -func (n *distinctNode) addSuffixSeen( - ctx context.Context, acc *mon.BoundAccount, sKey string, -) error { - sz := int64(len(sKey)) - if err := acc.Grow(ctx, sz); err != nil { - return err - } - n.run.suffixSeen[sKey] = struct{}{} return nil } -// TODO(irfansharif): This can be refactored away to use -// sqlbase.EncodeDatums([]byte, tree.Datums) -func (n *distinctNode) encodeDistinctOnVals(values tree.Datums) ([]byte, []byte, error) { - var prefix, suffix []byte - var err error - for i, val := range values { - // Only encode DISTINCT ON expressions/columns (if applicable). - if !n.distinctOnColIdxs.Empty() && !n.distinctOnColIdxs.Contains(i) { - continue - } +func (n *distinctNode) Next(params runParams) (bool, error) { + return n.run.Next(params) +} - if n.columnsInOrder != nil && n.columnsInOrder[i] { - if prefix == nil { - prefix = make([]byte, 0, 100) - } - prefix, err = sqlbase.EncodeDatum(prefix, val) - } else { - if suffix == nil { - suffix = make([]byte, 0, 100) - } - suffix, err = sqlbase.EncodeDatum(suffix, val) - } - if err != nil { - break - } +func (n *distinctNode) Values() tree.Datums { + return n.run.Values() +} + +func (n *distinctNode) Close(ctx context.Context) { + if n.run != nil { + n.run.Close(ctx) + } else { + // If we haven't gotten around to initializing n.run yet, then we still + // need to propagate the close message to our inputs - do so directly. + n.plan.Close(ctx) } - return prefix, suffix, err } // projectChildPropsToOnExprs takes the physical props (e.g. ordering info, diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go index 0b41d01525b2..35b2b5e1d7f4 100644 --- a/pkg/sql/distsql_physical_planner.go +++ b/pkg/sql/distsql_physical_planner.go @@ -1718,7 +1718,7 @@ func (dsp *DistSQLPlanner) createPlanForIndexJoin( } // getTypesForPlanResult returns the types of the elements in the result streams -// of a plan that corresponds to a given planNode. If planToSreamColMap is nil, +// of a plan that corresponds to a given planNode. If planToStreamColMap is nil, // a 1-1 mapping is assumed. func getTypesForPlanResult(node planNode, planToStreamColMap []int) ([]sqlbase.ColumnType, error) { nodeColumns := planColumns(node) @@ -2220,24 +2220,17 @@ func (dsp *DistSQLPlanner) createPlanForValues( }, nil } -func (dsp *DistSQLPlanner) createPlanForDistinct( - planCtx *planningCtx, n *distinctNode, -) (physicalPlan, error) { - plan, err := dsp.createPlanForNode(planCtx, n.plan) - if err != nil { - return physicalPlan{}, err - } - currentResultRouters := plan.ResultRouters +func createDistinctSpec(n *distinctNode, cols []int) *distsqlrun.DistinctSpec { var orderedColumns []uint32 for i, o := range n.columnsInOrder { if o { - orderedColumns = append(orderedColumns, uint32(plan.planToStreamColMap[i])) + orderedColumns = append(orderedColumns, uint32(cols[i])) } } var distinctColumns []uint32 if !n.distinctOnColIdxs.Empty() { - for planCol, streamCol := range plan.planToStreamColMap { + for planCol, streamCol := range cols { if streamCol != -1 && n.distinctOnColIdxs.Contains(planCol) { distinctColumns = append(distinctColumns, uint32(streamCol)) } @@ -2245,17 +2238,29 @@ func (dsp *DistSQLPlanner) createPlanForDistinct( } else { // If no distinct columns were specified, run distinct on the entire row. for planCol := range planColumns(n) { - if streamCol := plan.planToStreamColMap[planCol]; streamCol != -1 { + if streamCol := cols[planCol]; streamCol != -1 { distinctColumns = append(distinctColumns, uint32(streamCol)) } } } + return &distsqlrun.DistinctSpec{ + OrderedColumns: orderedColumns, + DistinctColumns: distinctColumns, + } +} + +func (dsp *DistSQLPlanner) createPlanForDistinct( + planCtx *planningCtx, n *distinctNode, +) (physicalPlan, error) { + plan, err := dsp.createPlanForNode(planCtx, n.plan) + if err != nil { + return physicalPlan{}, err + } + currentResultRouters := plan.ResultRouters + distinctSpec := distsqlrun.ProcessorCoreUnion{ - Distinct: &distsqlrun.DistinctSpec{ - OrderedColumns: orderedColumns, - DistinctColumns: distinctColumns, - }, + Distinct: createDistinctSpec(n, plan.planToStreamColMap), } if len(currentResultRouters) == 1 { diff --git a/pkg/sql/distsqlrun/base.go b/pkg/sql/distsqlrun/base.go index f9cec571fd14..e3adbe07ac7d 100644 --- a/pkg/sql/distsqlrun/base.go +++ b/pkg/sql/distsqlrun/base.go @@ -155,6 +155,12 @@ type RowSource interface { ConsumerClosed() } +// RowSourcedProcessor is the union of RowSource and Processor. +type RowSourcedProcessor interface { + RowSource + Run(_ context.Context, wg *sync.WaitGroup) +} + // Run reads records from the source and outputs them to the receiver, properly // draining the source of metadata and closing both the source and receiver. // diff --git a/pkg/sql/distsqlrun/distinct.go b/pkg/sql/distsqlrun/distinct.go index 98e3281969be..41d14b697558 100644 --- a/pkg/sql/distsqlrun/distinct.go +++ b/pkg/sql/distsqlrun/distinct.go @@ -25,6 +25,7 @@ import ( "github.com/pkg/errors" ) +// Distinct is the physical processor implementation of the DISTINCT relational operator. type Distinct struct { processorBase @@ -57,14 +58,15 @@ var _ RowSource = &SortedDistinct{} const sortedDistinctProcName = "sorted distinct" -func newDistinct( +// NewDistinct instantiates a new Distinct processor. +func NewDistinct( flowCtx *FlowCtx, processorID int32, spec *DistinctSpec, input RowSource, post *PostProcessSpec, output RowReceiver, -) (Processor, error) { +) (RowSourcedProcessor, error) { if len(spec.DistinctColumns) == 0 { return nil, errors.New("programming error: 0 distinct columns specified for distinct processor") } diff --git a/pkg/sql/distsqlrun/distinct_test.go b/pkg/sql/distsqlrun/distinct_test.go index 5aef99c6669c..7dc38fbfb677 100644 --- a/pkg/sql/distsqlrun/distinct_test.go +++ b/pkg/sql/distsqlrun/distinct_test.go @@ -123,7 +123,7 @@ func TestDistinct(t *testing.T) { EvalCtx: evalCtx, } - d, err := newDistinct(&flowCtx, 0 /* processorID */, &ds, in, &PostProcessSpec{}, out) + d, err := NewDistinct(&flowCtx, 0 /* processorID */, &ds, in, &PostProcessSpec{}, out) if err != nil { t.Fatal(err) } @@ -173,7 +173,7 @@ func benchmarkDistinct(b *testing.B, orderedColumns []uint32) { b.SetBytes(int64(8 * numRows * numCols)) b.ResetTimer() for i := 0; i < b.N; i++ { - d, err := newDistinct(flowCtx, 0 /* processorID */, spec, input, post, &RowDisposer{}) + d, err := NewDistinct(flowCtx, 0 /* processorID */, spec, input, post, &RowDisposer{}) if err != nil { b.Fatal(err) } diff --git a/pkg/sql/distsqlrun/processors.go b/pkg/sql/distsqlrun/processors.go index 70f77456a3b0..5296db8b0f5b 100644 --- a/pkg/sql/distsqlrun/processors.go +++ b/pkg/sql/distsqlrun/processors.go @@ -916,7 +916,7 @@ func newProcessor( if err := checkNumInOut(inputs, outputs, 1, 1); err != nil { return nil, err } - return newDistinct(flowCtx, processorID, core.Distinct, inputs[0], post, outputs[0]) + return NewDistinct(flowCtx, processorID, core.Distinct, inputs[0], post, outputs[0]) } if core.Aggregator != nil { if err := checkNumInOut(inputs, outputs, 1, 1); err != nil { diff --git a/pkg/sql/plan_node_to_row_source.go b/pkg/sql/plan_node_to_row_source.go new file mode 100644 index 000000000000..875881b6e3cc --- /dev/null +++ b/pkg/sql/plan_node_to_row_source.go @@ -0,0 +1,103 @@ +// Copyright 2018 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +package sql + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/sql/distsqlrun" + "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" +) + +type planNodeToRowSource struct { + running bool + + node planNode + params runParams + outputTypes []sqlbase.ColumnType + + // run time state machine values + ctx context.Context + row sqlbase.EncDatumRow +} + +func makePlanNodeToRowSource(source planNode, params runParams) (*planNodeToRowSource, error) { + nodeColumns := planColumns(source) + + types := make([]sqlbase.ColumnType, len(nodeColumns)) + for i := range nodeColumns { + colTyp, err := sqlbase.DatumTypeToColumnType(nodeColumns[i].Typ) + if err != nil { + return nil, err + } + types[i] = colTyp + } + row := make(sqlbase.EncDatumRow, len(nodeColumns)) + + return &planNodeToRowSource{ + node: source, + params: params, + outputTypes: types, + row: row, + running: true, + }, nil +} + +var _ distsqlrun.RowSource = &planNodeToRowSource{} + +func (p *planNodeToRowSource) OutputTypes() []sqlbase.ColumnType { + return p.outputTypes +} + +func (p *planNodeToRowSource) Start(ctx context.Context) context.Context { + p.ctx = ctx + return ctx +} + +func (p *planNodeToRowSource) internalClose() { + if p.running { + p.node.Close(p.ctx) + p.running = false + } +} + +func (p *planNodeToRowSource) Next() (sqlbase.EncDatumRow, *distsqlrun.ProducerMetadata) { + if !p.running { + return nil, nil + } + + valid, err := p.node.Next(p.params) + if err != nil { + p.internalClose() + return nil, &distsqlrun.ProducerMetadata{Err: err} + } + if !valid { + p.internalClose() + return nil, nil + } + + for i, datum := range p.node.Values() { + p.row[i] = sqlbase.DatumToEncDatum(p.outputTypes[i], datum) + } + return p.row, nil +} + +func (p *planNodeToRowSource) ConsumerDone() { + p.internalClose() +} + +func (p *planNodeToRowSource) ConsumerClosed() { + p.internalClose() +} diff --git a/pkg/sql/row_source_to_plan_node.go b/pkg/sql/row_source_to_plan_node.go new file mode 100644 index 000000000000..46dba9ae74c4 --- /dev/null +++ b/pkg/sql/row_source_to_plan_node.go @@ -0,0 +1,86 @@ +// Copyright 2018 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +package sql + +import ( + "context" + + "fmt" + + "github.com/cockroachdb/cockroach/pkg/sql/distsqlrun" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" +) + +// rowSourceToPlanNode wraps a RowSource and presents it as a PlanNode. It must +// be constructed with Create(), after which it is a PlanNode and can be treated +// as such. +type rowSourceToPlanNode struct { + source distsqlrun.RowSource + + // Temporary variables + row sqlbase.EncDatumRow + da sqlbase.DatumAlloc + datumRow tree.Datums +} + +var _ planNode = &rowSourceToPlanNode{} + +func makeRowSourceToPlanNode(s distsqlrun.RowSource) *rowSourceToPlanNode { + row := make(tree.Datums, len(s.OutputTypes())) + + return &rowSourceToPlanNode{ + source: s, + datumRow: row, + } +} + +func (r *rowSourceToPlanNode) Next(params runParams) (bool, error) { + for { + var p *distsqlrun.ProducerMetadata + r.row, p = r.source.Next() + + if p != nil { + if p.Err != nil { + return false, p.Err + } + if p.TraceData != nil { + // We drop trace metadata since we have no reasonable way to propagate + // it in local SQL execution. + continue + } + return false, fmt.Errorf("unexpected producer metadata: %+v", p) + } + + if r.row == nil { + return false, nil + } + + if err := sqlbase.EncDatumRowToDatums(r.source.OutputTypes(), r.datumRow, r.row, &r.da); err != nil { + return false, err + } + return true, nil + } +} + +func (r *rowSourceToPlanNode) Values() tree.Datums { + return r.datumRow +} + +func (r *rowSourceToPlanNode) Close(ctx context.Context) { + if r.source != nil { + r.source.ConsumerClosed() + } +}