diff --git a/pkg/ccl/importccl/import_processor.go b/pkg/ccl/importccl/import_processor.go
index 23871e66b7d8..b1a1c2bbeefb 100644
--- a/pkg/ccl/importccl/import_processor.go
+++ b/pkg/ccl/importccl/import_processor.go
@@ -40,78 +40,117 @@ var csvOutputTypes = []*types.T{
 	types.Bytes,
 }
 
+const readImportDataProcessorName = "readImportDataProcessor"
+
+// readImportDataProcessor is a processor that does not take any inputs. It
+// starts a worker goroutine in Start(), which emits progress updates over an
+// internally maintained channel. Next() will read from this channel until
+// exhausted and then emit the summary that the worker goroutine returns. The
+// processor is built this way in order to manage parallelism internally.
 type readImportDataProcessor struct {
+	execinfra.ProcessorBase
+
 	flowCtx *execinfra.FlowCtx
 	spec    execinfrapb.ReadImportDataSpec
 	output  execinfra.RowReceiver
-}
 
-var _ execinfra.Processor = &readImportDataProcessor{}
+	progCh chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress
 
-func (cp *readImportDataProcessor) OutputTypes() []*types.T {
-	return csvOutputTypes
+	importErr error
+	summary   *roachpb.BulkOpSummary
 }
 
-func injectTimeIntoEvalCtx(ctx *tree.EvalContext, walltime int64) {
-	sec := walltime / int64(time.Second)
-	nsec := walltime % int64(time.Second)
-	unixtime := timeutil.Unix(sec, nsec)
-	ctx.StmtTimestamp = unixtime
-	ctx.TxnTimestamp = unixtime
-}
+var _ execinfra.Processor = &readImportDataProcessor{}
+var _ execinfra.RowSource = &readImportDataProcessor{}
 
 func newReadImportDataProcessor(
 	flowCtx *execinfra.FlowCtx,
 	processorID int32,
 	spec execinfrapb.ReadImportDataSpec,
+	post *execinfrapb.PostProcessSpec,
 	output execinfra.RowReceiver,
 ) (execinfra.Processor, error) {
 	cp := &readImportDataProcessor{
 		flowCtx: flowCtx,
 		spec:    spec,
 		output:  output,
+		progCh:  make(chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress),
+	}
+	if err := cp.Init(cp, post, csvOutputTypes, flowCtx, processorID, output, nil, /* memMonitor */
+		execinfra.ProcStateOpts{
+			// This processor doesn't have any inputs to drain.
+			InputsToDrain: nil,
+		}); err != nil {
+		return nil, err
 	}
 	return cp, nil
 }
 
-func (cp *readImportDataProcessor) Run(ctx context.Context) {
-	ctx, span := tracing.ChildSpan(ctx, "readImportDataProcessor")
-	defer span.Finish()
-	defer cp.output.ProducerDone()
-
-	progCh := make(chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress)
-
-	var summary *roachpb.BulkOpSummary
-	var err error
+// Start is part of the RowSource interface.
+func (idp *readImportDataProcessor) Start(ctx context.Context) context.Context {
 	// We don't have to worry about this go routine leaking because next we loop over progCh
 	// which is closed only after the go routine returns.
 	go func() {
-		defer close(progCh)
-		summary, err = runImport(ctx, cp.flowCtx, &cp.spec, progCh)
+		defer close(idp.progCh)
+		idp.summary, idp.importErr = runImport(ctx, idp.flowCtx, &idp.spec, idp.progCh)
 	}()
+	return idp.StartInternal(ctx, readImportDataProcessorName)
+}
 
-	for prog := range progCh {
-		// Take a copy so that we can send the progress address to the output processor.
+// Next is part of the RowSource interface.
+func (idp *readImportDataProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) {
+	if idp.State != execinfra.StateRunning {
+		return nil, idp.DrainHelper()
+	}
+
+	for prog := range idp.progCh {
 		p := prog
-		cp.output.Push(nil, &execinfrapb.ProducerMetadata{BulkProcessorProgress: &p})
+		return nil, &execinfrapb.ProducerMetadata{BulkProcessorProgress: &p}
 	}
 
-	if err != nil {
-		cp.output.Push(nil, &execinfrapb.ProducerMetadata{Err: err})
-		return
+	if idp.importErr != nil {
+		idp.MoveToDraining(idp.importErr)
+		return nil, idp.DrainHelper()
+	}
+
+	if idp.summary == nil {
+		err := errors.Newf("no summary generated by %s", readImportDataProcessorName)
+		idp.MoveToDraining(err)
+		return nil, idp.DrainHelper()
 	}
 
 	// Once the import is done, send back to the controller the serialized
 	// summary of the import operation. For more info see roachpb.BulkOpSummary.
-	countsBytes, err := protoutil.Marshal(summary)
+	countsBytes, err := protoutil.Marshal(idp.summary)
 	if err != nil {
-		cp.output.Push(nil, &execinfrapb.ProducerMetadata{Err: err})
-		return
+		idp.MoveToDraining(err)
+		return nil, idp.DrainHelper()
 	}
-	cp.output.Push(rowenc.EncDatumRow{
+
+	idp.MoveToDraining(nil /* err */)
+	return rowenc.EncDatumRow{
 		rowenc.DatumToEncDatum(types.Bytes, tree.NewDBytes(tree.DBytes(countsBytes))),
 		rowenc.DatumToEncDatum(types.Bytes, tree.NewDBytes(tree.DBytes([]byte{}))),
-	}, nil)
+	}, idp.DrainHelper()
+}
+
+// ConsumerDone is part of the RowSource interface.
+func (idp *readImportDataProcessor) ConsumerDone() {
+	idp.MoveToDraining(nil /* err */)
+}
+
+// ConsumerClosed is part of the RowSource interface.
+func (idp *readImportDataProcessor) ConsumerClosed() {
+	// The consumer is done, Next() will not be called again.
+	idp.InternalClose()
+}
+
+func injectTimeIntoEvalCtx(ctx *tree.EvalContext, walltime int64) {
+	sec := walltime / int64(time.Second)
+	nsec := walltime % int64(time.Second)
+	unixtime := timeutil.Unix(sec, nsec)
+	ctx.StmtTimestamp = unixtime
+	ctx.TxnTimestamp = unixtime
 }
 
 func makeInputConverter(
diff --git a/pkg/ccl/importccl/import_processor_test.go b/pkg/ccl/importccl/import_processor_test.go
index a94494069a81..49714ef07b80 100644
--- a/pkg/ccl/importccl/import_processor_test.go
+++ b/pkg/ccl/importccl/import_processor_test.go
@@ -301,8 +301,9 @@ func TestImportIgnoresProcessedFiles(t *testing.T) {
 	for _, testCase := range tests {
 		t.Run(fmt.Sprintf("processes-files-once-%s", testCase.name), func(t *testing.T) {
 			spec := setInputOffsets(t, testCase.spec.getConverterSpec(), testCase.inputOffsets)
+			post := execinfrapb.PostProcessSpec{}
 
-			processor, err := newReadImportDataProcessor(flowCtx, 0, *spec, &errorReportingRowReceiver{t})
+			processor, err := newReadImportDataProcessor(flowCtx, 0, *spec, &post, &errorReportingRowReceiver{t})
 
 			if err != nil {
 				t.Fatalf("Could not create data processor: %v", err)
@@ -573,9 +574,10 @@ func setImportReaderParallelism(parallelism int32) func() {
 	factory := rowexec.NewReadImportDataProcessor
 	rowexec.NewReadImportDataProcessor = func(
 		flowCtx *execinfra.FlowCtx, processorID int32,
-		spec execinfrapb.ReadImportDataSpec, output execinfra.RowReceiver) (execinfra.Processor, error) {
+		spec execinfrapb.ReadImportDataSpec, post *execinfrapb.PostProcessSpec,
+		output execinfra.RowReceiver) (execinfra.Processor, error) {
 		spec.ReaderParallelism = parallelism
-		return factory(flowCtx, processorID, spec, output)
+		return factory(flowCtx, processorID, spec, post, output)
 	}
 
 	return func() {
diff --git a/pkg/sql/execinfra/processorsbase.go b/pkg/sql/execinfra/processorsbase.go
index ecf1f2877277..ebd882e8bb5a 100644
--- a/pkg/sql/execinfra/processorsbase.go
+++ b/pkg/sql/execinfra/processorsbase.go
@@ -359,7 +359,7 @@ type ProcessorConstructor func(
 //   FlowCtx *FlowCtx, l RowSource, r RowSource, post *PostProcessSpec, output RowReceiver,
 // ) (*concatProcessor, error) {
 //   p := &concatProcessor{l: l, r: r}
-//   if err := p.init(
+//   if err := p.Init(
 //     post, l.OutputTypes(), FlowCtx, output,
 //     // We pass the inputs to the helper, to be consumed by DrainHelper() later.
 //     ProcStateOpts{
@@ -383,11 +383,11 @@ type ProcessorConstructor func(
 // }
 //
 // // Next is part of the RowSource interface.
-// func (p *concatProcessor) Next() (sqlbase.EncDatumRow, *execinfrapb.ProducerMetadata) {
+// func (p *concatProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) {
 //   // Loop while we haven't produced a row or a metadata record. We loop around
 //   // in several cases, including when the filtering rejected a row coming.
 //   for p.State == StateRunning {
-//     var row sqlbase.EncDatumRow
+//     var row rowenc.EncDatumRow
 //     var meta *ProducerMetadata
 //     if !p.leftConsumed {
 //       row, meta = p.l.Next()
diff --git a/pkg/sql/rowexec/processors.go b/pkg/sql/rowexec/processors.go
index 5ed316c1c1bc..b375814a81ae 100644
--- a/pkg/sql/rowexec/processors.go
+++ b/pkg/sql/rowexec/processors.go
@@ -238,7 +238,7 @@ func NewProcessor(
 		if NewReadImportDataProcessor == nil {
 			return nil, errors.New("ReadImportData processor unimplemented")
 		}
-		return NewReadImportDataProcessor(flowCtx, processorID, *core.ReadImport, outputs[0])
+		return NewReadImportDataProcessor(flowCtx, processorID, *core.ReadImport, post, outputs[0])
 	}
 	if core.BackupData != nil {
 		if err := checkNumInOut(inputs, outputs, 0, 1); err != nil {
@@ -354,7 +354,7 @@ func NewProcessor(
 }
 
 // NewReadImportDataProcessor is implemented in the non-free (CCL) codebase and then injected here via runtime initialization.
-var NewReadImportDataProcessor func(*execinfra.FlowCtx, int32, execinfrapb.ReadImportDataSpec, execinfra.RowReceiver) (execinfra.Processor, error)
+var NewReadImportDataProcessor func(*execinfra.FlowCtx, int32, execinfrapb.ReadImportDataSpec, *execinfrapb.PostProcessSpec, execinfra.RowReceiver) (execinfra.Processor, error)
 
 // NewBackupDataProcessor is implemented in the non-free (CCL) codebase and then injected here via runtime initialization.
 var NewBackupDataProcessor func(*execinfra.FlowCtx, int32, execinfrapb.BackupDataSpec, execinfra.RowReceiver) (execinfra.Processor, error)
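Reviewer note on the new control flow: Start() now launches the import worker and Next() pulls from progCh, so the progress pushes that Run() used to perform become metadata records returned to the consumer. The snippet below is a standalone toy reduction of that shape, not code from this PR; channelProcessor, progress, and summary are hypothetical names, and ProcessorBase's draining states are compressed into a done flag.

```go
package main

import (
	"errors"
	"fmt"
)

// progress and summary stand in for BulkProcessorProgress metadata and
// roachpb.BulkOpSummary; every name in this file is hypothetical.
type progress struct{ resumePos int64 }
type summary struct{ rows int64 }

// channelProcessor mirrors the shape of the rewritten
// readImportDataProcessor: Start launches a worker that sends progress over
// progCh and records a final summary, and Next drains progCh before emitting
// that summary exactly once.
type channelProcessor struct {
	progCh  chan progress
	summary *summary
	workErr error
	done    bool
}

func newChannelProcessor() *channelProcessor {
	return &channelProcessor{progCh: make(chan progress)}
}

func (p *channelProcessor) Start() {
	// As in the PR, the goroutine cannot leak: Next ranges over progCh,
	// which is closed only after the worker returns, and the writes to
	// p.summary/p.workErr happen before that close.
	go func() {
		defer close(p.progCh)
		for i := int64(1); i <= 3; i++ {
			p.progCh <- progress{resumePos: i}
		}
		p.summary = &summary{rows: 3}
	}()
}

// Next returns (progress, nil, nil) while work is ongoing, then the summary
// exactly once, then (nil, nil, nil).
func (p *channelProcessor) Next() (*progress, *summary, error) {
	if p.done {
		return nil, nil, nil
	}
	for prog := range p.progCh {
		prog := prog // copy, since we hand out its address
		return &prog, nil, nil
	}
	p.done = true
	switch {
	case p.workErr != nil:
		return nil, nil, p.workErr
	case p.summary == nil:
		return nil, nil, errors.New("worker produced no summary")
	}
	return nil, p.summary, nil
}

func main() {
	p := newChannelProcessor()
	p.Start()
	for {
		prog, sum, err := p.Next()
		switch {
		case err != nil:
			fmt.Println("error:", err)
			return
		case prog != nil:
			fmt.Println("progress at", prog.resumePos)
		case sum != nil:
			fmt.Println("imported rows:", sum.rows)
		default:
			return
		}
	}
}
```

The invariant carried over from the PR is that the worker closes the channel only after recording the summary, so reading summary/workErr once the range loop exits is race-free.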
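Next() also leans on ProcessorBase's state machine: MoveToDraining records a terminal error and takes the processor out of StateRunning, after which DrainHelper surfaces trailing metadata until the processor is exhausted. The toy sketch below illustrates that contract under hypothetical names; it is not the real execinfra API.

```go
package main

import "fmt"

// state mirrors execinfra's processor states in miniature.
type state int

const (
	stateRunning state = iota
	stateDraining
	stateExhausted
)

// base sketches the contract ProcessorBase offers implementors: after
// moveToDraining records a terminal error, drainHelper hands out the pending
// error as trailing metadata once and then marks the processor exhausted.
type base struct {
	st       state
	drainErr error
}

func (b *base) moveToDraining(err error) {
	if b.st == stateRunning {
		b.st = stateDraining
		b.drainErr = err
	}
}

func (b *base) drainHelper() error {
	if b.st == stateDraining && b.drainErr != nil {
		err := b.drainErr
		b.drainErr = nil
		return err
	}
	b.st = stateExhausted
	return nil
}

func main() {
	b := &base{st: stateRunning}
	b.moveToDraining(fmt.Errorf("import failed"))
	fmt.Println(b.drainHelper()) // "import failed", surfaced as metadata
	fmt.Println(b.drainHelper()) // <nil>: the processor is now exhausted
}
```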
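Finally, the processors.go hunks show why the signature change must be mirrored in two places: rowexec only holds a nil function variable, and the CCL package injects the real constructor at init time. Below is a minimal standalone illustration of that injection pattern, again with hypothetical names rather than the actual rowexec/execinfra types.

```go
package main

import (
	"errors"
	"fmt"
)

// processor and spec are hypothetical stand-ins for execinfra.Processor and
// execinfrapb.ReadImportDataSpec.
type processor interface{ name() string }
type spec struct{ tables []string }

// newReadImportDataProcessorHook is nil in the "free" build; the licensed
// package assigns it from an init function, mirroring how rowexec's
// NewReadImportDataProcessor variable is filled in at runtime.
var newReadImportDataProcessorHook func(spec) (processor, error)

type importProc struct{}

func (importProc) name() string { return "readImportDataProcessor" }

// init plays the role of the CCL package's injection hook.
func init() {
	newReadImportDataProcessorHook = func(spec) (processor, error) {
		return importProc{}, nil
	}
}

// newProcessor mirrors rowexec.NewProcessor's nil check before delegating.
func newProcessor(s spec) (processor, error) {
	if newReadImportDataProcessorHook == nil {
		return nil, errors.New("ReadImportData processor unimplemented")
	}
	return newReadImportDataProcessorHook(s)
}

func main() {
	p, err := newProcessor(spec{tables: []string{"t"}})
	if err != nil {
		panic(err)
	}
	fmt.Println(p.name())
}
```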