Skip to content

Commit

Permalink
WIP on pooling planNodeToRowSource
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzefovich committed Sep 28, 2022
1 parent d2bfab2 commit d2717cc
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 36 deletions.
1 change: 1 addition & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ go_library(
"//pkg/sql/execinfra",
"//pkg/sql/execinfra/execagg",
"//pkg/sql/execinfra/execopnode",
"//pkg/sql/execinfra/execreleasable",
"//pkg/sql/execinfrapb",
"//pkg/sql/execstats",
"//pkg/sql/faketreeeval",
Expand Down
8 changes: 3 additions & 5 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -3339,17 +3339,15 @@ func (dsp *DistSQLPlanner) wrapPlan(
// expecting is in fact RowsAffected. RowsAffected statements return a single
// row with the number of rows affected by the statement, and are the only
// types of statement where it's valid to invoke a plan's fast path.
wrapper, err := makePlanNodeToRowSource(n,
wrapper := newPlanNodeToRowSource(
n,
runParams{
extendedEvalCtx: &evalCtx,
p: planCtx.planner,
},
useFastPath,
firstNotWrapped,
)
if err != nil {
return nil, err
}
wrapper.firstNotWrapped = firstNotWrapped

localProcIdx := p.AddLocalProcessor(wrapper)
var input []execinfrapb.InputSyncSpec
Expand Down
113 changes: 82 additions & 31 deletions pkg/sql/plan_node_to_row_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ package sql

import (
"context"
"sync"

"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra/execopnode"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra/execreleasable"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand Down Expand Up @@ -44,33 +46,63 @@ type planNodeToRowSource struct {
row rowenc.EncDatumRow
}

var _ execinfra.LocalProcessor = &planNodeToRowSource{}
var _ execreleasable.Releasable = &planNodeToRowSource{}
var _ execopnode.OpNode = &planNodeToRowSource{}

func makePlanNodeToRowSource(
source planNode, params runParams, fastPath bool,
) (*planNodeToRowSource, error) {
var typs []*types.T
var planNodeToRowSourcePool = sync.Pool{
New: func() interface{} {
return &planNodeToRowSource{}
},
}

func newPlanNodeToRowSource(
source planNode, params runParams, fastPath bool, firstNotWrapped planNode,
) *planNodeToRowSource {
p := planNodeToRowSourcePool.Get().(*planNodeToRowSource)
p.Closed = false
*p = planNodeToRowSource{
ProcessorBase: p.ProcessorBase,
fastPath: fastPath,
node: source,
params: params,
outputTypes: p.outputTypes,
firstNotWrapped: firstNotWrapped,
row: p.row,
}
if fastPath {
// If our node is a "fast path node", it means that we're set up to
// just return a row count meaning we'll output a single row with a
// single INT column.
typs = []*types.T{types.Int}
if cap(p.outputTypes) >= 1 {
// Make sure to not lose larger slice for future reuse.
p.outputTypes = p.outputTypes[:1]
p.outputTypes[0] = types.Int
} else {
p.outputTypes = []*types.T{types.Int}
}
} else {
typs = getTypesFromResultColumns(planColumns(source))
resultColumns := planColumns(source)
if cap(p.outputTypes) >= len(resultColumns) {
p.outputTypes = p.outputTypes[:len(resultColumns)]
} else {
p.outputTypes = make([]*types.T, len(resultColumns))
}
for i := range resultColumns {
p.outputTypes[i] = resultColumns[i].Typ
}
}
row := make(rowenc.EncDatumRow, len(typs))

return &planNodeToRowSource{
node: source,
params: params,
outputTypes: typs,
row: row,
fastPath: fastPath,
}, nil
if p.row != nil && cap(p.row) >= len(p.outputTypes) {
// In some cases we might have no output columns, so nil row would have
// sufficient width, yet nil row is a special value, so we can only
// reuse the old row if it's non-nil.
p.row = p.row[:len(p.outputTypes)]
} else {
p.row = make(rowenc.EncDatumRow, len(p.outputTypes))
}
return p
}

var _ execinfra.LocalProcessor = &planNodeToRowSource{}

// MustBeStreaming implements the execinfra.Processor interface.
func (p *planNodeToRowSource) MustBeStreaming() bool {
// hookFnNode is special because it might be blocked forever if we decide to
Expand All @@ -90,26 +122,15 @@ func (p *planNodeToRowSource) InitWithOutput(
flowCtx,
// Note that we have already created a copy of the extendedEvalContext
// (which made a copy of the EvalContext) right before calling
// makePlanNodeToRowSource, so we can just use the eval context from the
// newPlanNodeToRowSource, so we can just use the eval context from the
// params.
p.params.EvalContext(),
0, /* processorID */
output,
nil, /* memMonitor */
execinfra.ProcStateOpts{
TrailingMetaCallback: func() []execinfrapb.ProducerMetadata {
var meta []execinfrapb.ProducerMetadata
if p.InternalClose() {
// Check if we're wrapping a mutation and emit the rows
// written metric if so.
if m, ok := p.node.(mutationPlanNode); ok {
metrics := execinfrapb.GetMetricsMeta()
metrics.RowsWritten = m.rowsWritten()
meta = []execinfrapb.ProducerMetadata{{Metrics: metrics}}
}
}
return meta
},
// Input to drain is added in SetInput.
TrailingMetaCallback: p.trailingMetaCallback,
},
)
}
Expand Down Expand Up @@ -217,6 +238,36 @@ func (p *planNodeToRowSource) forwardMetadata(metadata *execinfrapb.ProducerMeta
p.ProcessorBase.AppendTrailingMeta(*metadata)
}

func (p *planNodeToRowSource) trailingMetaCallback() []execinfrapb.ProducerMetadata {
var meta []execinfrapb.ProducerMetadata
if p.InternalClose() {
// Check if we're wrapping a mutation and emit the rows written metric
// if so.
if m, ok := p.node.(mutationPlanNode); ok {
metrics := execinfrapb.GetMetricsMeta()
metrics.RowsWritten = m.rowsWritten()
meta = []execinfrapb.ProducerMetadata{{Metrics: metrics}}
}
}
return meta
}

// Release releases this planNodeToRowSource back to the pool.
func (p *planNodeToRowSource) Release() {
p.ProcessorBase.Reset()
// Deeply reset the row. Note that we don't bother deeply resetting the
// types slice since the types are small objects.
for i := range p.row {
p.row[i] = rowenc.EncDatum{}
}
*p = planNodeToRowSource{
ProcessorBase: p.ProcessorBase,
outputTypes: p.outputTypes[:0],
row: p.row[:0],
}
planNodeToRowSourcePool.Put(p)
}

// ChildCount is part of the execopnode.OpNode interface.
func (p *planNodeToRowSource) ChildCount(verbose bool) int {
if _, ok := p.input.(execopnode.OpNode); ok {
Expand Down

0 comments on commit d2717cc

Please sign in to comment.