From 9a675f05e3bad20bc8ac58a857e50126267d9abe Mon Sep 17 00:00:00 2001 From: Paul Bardea Date: Wed, 25 Nov 2020 16:55:19 -0500 Subject: [PATCH] importccl: rename import processor receiver Release note: None --- pkg/ccl/importccl/import_processor.go | 46 +++++++++++++-------------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/pkg/ccl/importccl/import_processor.go b/pkg/ccl/importccl/import_processor.go index a95edbc6ec48..d9ec09333547 100644 --- a/pkg/ccl/importccl/import_processor.go +++ b/pkg/ccl/importccl/import_processor.go @@ -88,62 +88,62 @@ func newReadImportDataProcessor( } // Start is part of the RowSource interface. -func (cp *readImportDataProcessor) Start(ctx context.Context) context.Context { +func (idp *readImportDataProcessor) Start(ctx context.Context) context.Context { // We don't have to worry about this go routine leaking because next we loop over progCh // which is closed only after the go routine returns. go func() { - defer close(cp.progCh) - cp.summary, cp.importErr = runImport(ctx, cp.flowCtx, &cp.spec, cp.progCh) + defer close(idp.progCh) + idp.summary, idp.importErr = runImport(ctx, idp.flowCtx, &idp.spec, idp.progCh) }() - return cp.StartInternal(ctx, readImportDataProcessorName) + return idp.StartInternal(ctx, readImportDataProcessorName) } // Next is part of the RowSource interface. -func (cp *readImportDataProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) { - if cp.State != execinfra.StateRunning { - return nil, cp.DrainHelper() +func (idp *readImportDataProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) { + if idp.State != execinfra.StateRunning { + return nil, idp.DrainHelper() } - for prog := range cp.progCh { + for prog := range idp.progCh { p := prog return nil, &execinfrapb.ProducerMetadata{BulkProcessorProgress: &p} } - if cp.importErr != nil { - cp.MoveToDraining(cp.importErr) - return nil, cp.DrainHelper() + if idp.importErr != nil { + idp.MoveToDraining(idp.importErr) + return nil, idp.DrainHelper() } - if cp.summary == nil { + if idp.summary == nil { err := errors.Newf("no summary generated by %s", readImportDataProcessorName) - cp.MoveToDraining(err) - return nil, cp.DrainHelper() + idp.MoveToDraining(err) + return nil, idp.DrainHelper() } // Once the import is done, send back to the controller the serialized // summary of the import operation. For more info see roachpb.BulkOpSummary. - countsBytes, err := protoutil.Marshal(cp.summary) + countsBytes, err := protoutil.Marshal(idp.summary) if err != nil { - cp.MoveToDraining(err) - return nil, cp.DrainHelper() + idp.MoveToDraining(err) + return nil, idp.DrainHelper() } - cp.MoveToDraining(nil /* err */) + idp.MoveToDraining(nil /* err */) return rowenc.EncDatumRow{ rowenc.DatumToEncDatum(types.Bytes, tree.NewDBytes(tree.DBytes(countsBytes))), rowenc.DatumToEncDatum(types.Bytes, tree.NewDBytes(tree.DBytes([]byte{}))), - }, cp.DrainHelper() + }, idp.DrainHelper() } // ConsumerDone is part of the RowSource interface. -func (cp *readImportDataProcessor) ConsumerDone() { - cp.MoveToDraining(nil /* err */) +func (idp *readImportDataProcessor) ConsumerDone() { + idp.MoveToDraining(nil /* err */) } // ConsumerClosed is part of the RowSource interface. -func (cp *readImportDataProcessor) ConsumerClosed() { +func (idp *readImportDataProcessor) ConsumerClosed() { // The consumer is done, Next() will not be called again. - cp.InternalClose() + idp.InternalClose() } func injectTimeIntoEvalCtx(ctx *tree.EvalContext, walltime int64) {