Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

distsqlrun: clean up the materializer #39386

Merged
merged 1 commit into from
Aug 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 0 additions & 10 deletions pkg/sql/distsqlrun/column_exec_setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,17 +66,12 @@ func wrapRowSource(
// to this operator (e.g. streamIDToOp).
toWrapInput = c.input
} else {
outputToInputColIdx := make([]int, len(inputTypes))
for i := range outputToInputColIdx {
outputToInputColIdx[i] = i
}
var err error
toWrapInput, err = newMaterializer(
flowCtx,
processorID,
input,
inputTypes,
outputToInputColIdx,
&distsqlpb.PostProcessSpec{},
nil, /* output */
nil, /* metadataSourcesQueue */
Expand Down Expand Up @@ -1337,10 +1332,6 @@ func (s *vectorizedFlowCreator) setupOutput(
}
// Make the materializer, which will write to the given receiver.
columnTypes := s.syncFlowConsumer.Types()
outputToInputColIdx := make([]int, len(columnTypes))
for i := range outputToInputColIdx {
outputToInputColIdx[i] = i
}
var outputStatsToTrace func()
if s.recordingStats {
// Make a copy given that vectorizedStatsCollectorsQueue is reset and
Expand All @@ -1357,7 +1348,6 @@ func (s *vectorizedFlowCreator) setupOutput(
pspec.ProcessorID,
op,
columnTypes,
outputToInputColIdx,
&distsqlpb.PostProcessSpec{},
s.syncFlowConsumer,
metadataSourcesQueue,
Expand Down
5 changes: 0 additions & 5 deletions pkg/sql/distsqlrun/columnar_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,16 +87,11 @@ func verifyColOperator(
return err
}

outputToInputColIdx := make([]int, len(outputTypes))
for i := range outputTypes {
outputToInputColIdx[i] = i
}
outColOp, err := newMaterializer(
flowCtx,
int32(len(inputs))+2,
result.op,
outputTypes,
outputToInputColIdx,
&distsqlpb.PostProcessSpec{},
nil, /* output */
nil, /* metadataSourcesQueue */
Expand Down
17 changes: 10 additions & 7 deletions pkg/sql/distsqlrun/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ func (f *Flow) setupProcessors(ctx context.Context, inputSyncs [][]RowSource) er
func (f *Flow) setup(ctx context.Context, spec *distsqlpb.FlowSpec) error {
f.spec = spec
if f.isVectorized {
log.VEventf(ctx, 1, "setting up vectorize flow %d", f.id)
log.VEventf(ctx, 1, "setting up vectorize flow %s", f.id.Short())
acc := f.EvalCtx.Mon.MakeBoundAccount()
f.vectorizedBoundAccount = &acc
err := f.setupVectorizedFlow(ctx, f.vectorizedBoundAccount)
Expand All @@ -481,8 +481,10 @@ func (f *Flow) setup(ctx context.Context, spec *distsqlpb.FlowSpec) error {

// startInternal starts the flow. All processors are started, each in their own
// goroutine. The caller must forward any returned error to syncFlowConsumer if
// set.
func (f *Flow) startInternal(ctx context.Context, doneFn func()) error {
// set. A new context is derived and returned, and it must be used after this
// method returns so that all components running in their own goroutines can
// listen for a cancellation on the same context.
func (f *Flow) startInternal(ctx context.Context, doneFn func()) (context.Context, error) {
f.doneFn = doneFn
log.VEventf(
ctx, 1, "starting (%d processors, %d startables)", len(f.processors), len(f.startables),
Expand All @@ -503,7 +505,7 @@ func (f *Flow) startInternal(ctx context.Context, doneFn func()) error {
if err := f.flowRegistry.RegisterFlow(
ctx, f.id, f, f.inboundStreams, settingFlowStreamTimeout.Get(&f.FlowCtx.Cfg.Settings.SV),
); err != nil {
return err
return ctx, err
}
}

Expand All @@ -523,7 +525,7 @@ func (f *Flow) startInternal(ctx context.Context, doneFn func()) error {
}(i)
}
f.startedGoroutines = len(f.startables) > 0 || len(f.processors) > 0 || !f.isLocal()
return nil
return ctx, nil
}

// isLocal returns whether this flow does not have any remote execution.
Expand All @@ -540,7 +542,7 @@ func (f *Flow) isLocal() bool {
// setup error is pushed to the syncFlowConsumer. In this case, a subsequent
// call to f.Wait() will not block.
func (f *Flow) Start(ctx context.Context, doneFn func()) error {
if err := f.startInternal(ctx, doneFn); err != nil {
if _, err := f.startInternal(ctx, doneFn); err != nil {
// For sync flows, the error goes to the consumer.
if f.syncFlowConsumer != nil {
f.syncFlowConsumer.Push(nil /* row */, &distsqlpb.ProducerMetadata{Err: err})
Expand Down Expand Up @@ -569,7 +571,8 @@ func (f *Flow) Run(ctx context.Context, doneFn func()) error {
headProc = f.processors[len(f.processors)-1]
f.processors = f.processors[:len(f.processors)-1]

if err := f.startInternal(ctx, doneFn); err != nil {
var err error
if ctx, err = f.startInternal(ctx, doneFn); err != nil {
// For sync flows, the error goes to the consumer.
if f.syncFlowConsumer != nil {
f.syncFlowConsumer.Push(nil /* row */, &distsqlpb.ProducerMetadata{Err: err})
Expand Down
40 changes: 7 additions & 33 deletions pkg/sql/distsqlrun/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,6 @@ type materializer struct {

da sqlbase.DatumAlloc

// outputToInputColIdx is a mapping from output row index to the operator's
// internal column schema. For example, if the input operator had 2 columns
// [a, b], and the desired output was just [b], outputToInputColIdx would be
// [1]: mapping the 0th column of the output row schema onto the 1st column
// of the operator's row schema.
outputToInputColIdx []int

// runtime fields --

// curIdx represents the current index into the column batch: the next row the
Expand All @@ -53,10 +46,6 @@ type materializer struct {
outputRow sqlbase.EncDatumRow
outputMetadata *distsqlpb.ProducerMetadata

// ctxCancel will cancel the context that is passed to the input (which will
// pass it down further). This allows for the cancellation of the tree rooted
// at this materializer when it is closed.
ctxCancel context.CancelFunc
// cancelFlow will return a function to cancel the context of the flow. It is
// a function in order to be lazily evaluated, since the context cancellation
// function is only available when Starting. This function differs from
Expand All @@ -83,19 +72,15 @@ func newMaterializer(
processorID int32,
input exec.Operator,
typs []types.T,
// TODO(yuzefovich): I feel like we should remove outputToInputColIdx
// argument since it's always {0, 1, ..., len(typs)-1}.
outputToInputColIdx []int,
post *distsqlpb.PostProcessSpec,
output RowReceiver,
metadataSourcesQueue []distsqlpb.MetadataSource,
outputStatsToTrace func(),
cancelFlow func() context.CancelFunc,
) (*materializer, error) {
m := &materializer{
input: input,
outputToInputColIdx: outputToInputColIdx,
row: make(sqlbase.EncDatumRow, len(outputToInputColIdx)),
input: input,
row: make(sqlbase.EncDatumRow, len(typs)),
}

if err := m.ProcessorBase.Init(
Expand Down Expand Up @@ -139,17 +124,7 @@ func (m *materializer) Child(nth int) exec.OpNode {

func (m *materializer) Start(ctx context.Context) context.Context {
m.input.Init()
ctx = m.ProcessorBase.StartInternal(ctx, materializerProcName)
// In the general case, the ctx that is passed is related to the "flow
// context" that will be canceled by m.cancelFlow. However, in some cases (like
// when there is a subquery), it appears as if the subquery flow context is not
// related to the flow context of the main query, so calling m.cancelFlow will
// not shut down the subquery tree. To work around this, we always use another
// context and get another cancellation function, and we will trigger both
// upon exit from the materializer.
// TODO(yuzefovich): figure out what the problem is here.
m.Ctx, m.ctxCancel = context.WithCancel(ctx)
return m.Ctx
return m.ProcessorBase.StartInternal(ctx, materializerProcName)
}

// nextAdapter calls next() and saves the returned results in m. For internal
Expand Down Expand Up @@ -182,17 +157,17 @@ func (m *materializer) next() (sqlbase.EncDatumRow, *distsqlpb.ProducerMetadata)
m.curIdx++

typs := m.OutputTypes()
for outIdx, cIdx := range m.outputToInputColIdx {
col := m.batch.ColVec(cIdx)
for colIdx := 0; colIdx < len(typs); colIdx++ {
col := m.batch.ColVec(colIdx)
// TODO(asubiotto): we shouldn't have to do this check. Figure out who's
// not setting nulls.
if col.MaybeHasNulls() {
if col.Nulls().NullAt(rowIdx) {
m.row[outIdx].Datum = tree.DNull
m.row[colIdx].Datum = tree.DNull
continue
}
}
m.row[outIdx].Datum = exec.PhysicalTypeColElemToDatum(col, rowIdx, m.da, typs[outIdx])
m.row[colIdx].Datum = exec.PhysicalTypeColElemToDatum(col, rowIdx, m.da, typs[colIdx])
}
return m.ProcessRowHelper(m.row), nil
}
Expand All @@ -209,7 +184,6 @@ func (m *materializer) Next() (sqlbase.EncDatumRow, *distsqlpb.ProducerMetadata)

func (m *materializer) InternalClose() bool {
if m.ProcessorBase.InternalClose() {
m.ctxCancel()
if m.cancelFlow != nil {
m.cancelFlow()()
}
Expand Down
3 changes: 0 additions & 3 deletions pkg/sql/distsqlrun/materializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ func TestColumnarizeMaterialize(t *testing.T) {
1, /* processorID */
c,
typs,
[]int{0, 1},
&distsqlpb.PostProcessSpec{},
nil, /* output */
nil, /* metadataSourcesQueue */
Expand Down Expand Up @@ -139,7 +138,6 @@ func TestMaterializeTypes(t *testing.T) {
1, /* processorID */
c,
types,
outputToInputColIdx,
&distsqlpb.PostProcessSpec{},
nil, /* output */
nil, /* metadataSourcesQueue */
Expand Down Expand Up @@ -194,7 +192,6 @@ func BenchmarkColumnarizeMaterialize(b *testing.B) {
1, /* processorID */
c,
types,
[]int{0, 1},
&distsqlpb.PostProcessSpec{},
nil, /* output */
nil, /* metadataSourcesQueue */
Expand Down
2 changes: 0 additions & 2 deletions pkg/sql/distsqlrun/vectorized_error_propagation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ func TestVectorizedErrorPropagation(t *testing.T) {
1, /* processorID */
vee,
types,
[]int{0},
&distsqlpb.PostProcessSpec{},
nil, /* output */
nil, /* metadataSourceQueue */
Expand Down Expand Up @@ -116,7 +115,6 @@ func TestNonVectorizedErrorPropagation(t *testing.T) {
1, /* processorID */
nvee,
types,
[]int{0},
&distsqlpb.PostProcessSpec{},
nil, /* output */
nil, /* metadataSourceQueue */
Expand Down
1 change: 0 additions & 1 deletion pkg/sql/distsqlrun/vectorized_flow_shutdown_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,6 @@ func TestVectorizedFlowShutdown(t *testing.T) {
1, /* processorID */
materializerInput,
semtyps,
[]int{0},
&distsqlpb.PostProcessSpec{},
nil, /* output */
materializerMetadataSources,
Expand Down
1 change: 0 additions & 1 deletion pkg/sql/distsqlrun/vectorized_meta_propagation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ func TestVectorizedMetaPropagation(t *testing.T) {
2, /* processorID */
noop,
types,
[]int{0},
&distsqlpb.PostProcessSpec{},
nil, /* output */
[]distsqlpb.MetadataSource{col},
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/exec/colrpc/outbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ func (o *Outbox) Run(

log.VEvent(ctx, 2, "Outbox starting normal operation")
o.runWithStream(ctx, stream, cancelFn)
log.VEvent(ctx, 2, "Outbox exiting")
}

// handleStreamErr is a utility method used to handle an error when calling
Expand Down