Skip to content

Commit

Permalink
importccl: rename import processor receiver
Browse files Browse the repository at this point in the history
Release note: None
  • Loading branch information
pbardea committed Nov 25, 2020
1 parent 5507808 commit 9a675f0
Showing 1 changed file with 23 additions and 23 deletions.
46 changes: 23 additions & 23 deletions pkg/ccl/importccl/import_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,62 +88,62 @@ func newReadImportDataProcessor(
}

// Start is part of the RowSource interface.
func (cp *readImportDataProcessor) Start(ctx context.Context) context.Context {
func (idp *readImportDataProcessor) Start(ctx context.Context) context.Context {
// We don't have to worry about this go routine leaking because next we loop over progCh
// which is closed only after the go routine returns.
go func() {
defer close(cp.progCh)
cp.summary, cp.importErr = runImport(ctx, cp.flowCtx, &cp.spec, cp.progCh)
defer close(idp.progCh)
idp.summary, idp.importErr = runImport(ctx, idp.flowCtx, &idp.spec, idp.progCh)
}()
return cp.StartInternal(ctx, readImportDataProcessorName)
return idp.StartInternal(ctx, readImportDataProcessorName)
}

// Next is part of the RowSource interface.
func (cp *readImportDataProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) {
if cp.State != execinfra.StateRunning {
return nil, cp.DrainHelper()
func (idp *readImportDataProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) {
if idp.State != execinfra.StateRunning {
return nil, idp.DrainHelper()
}

for prog := range cp.progCh {
for prog := range idp.progCh {
p := prog
return nil, &execinfrapb.ProducerMetadata{BulkProcessorProgress: &p}
}

if cp.importErr != nil {
cp.MoveToDraining(cp.importErr)
return nil, cp.DrainHelper()
if idp.importErr != nil {
idp.MoveToDraining(idp.importErr)
return nil, idp.DrainHelper()
}

if cp.summary == nil {
if idp.summary == nil {
err := errors.Newf("no summary generated by %s", readImportDataProcessorName)
cp.MoveToDraining(err)
return nil, cp.DrainHelper()
idp.MoveToDraining(err)
return nil, idp.DrainHelper()
}

// Once the import is done, send back to the controller the serialized
// summary of the import operation. For more info see roachpb.BulkOpSummary.
countsBytes, err := protoutil.Marshal(cp.summary)
countsBytes, err := protoutil.Marshal(idp.summary)
if err != nil {
cp.MoveToDraining(err)
return nil, cp.DrainHelper()
idp.MoveToDraining(err)
return nil, idp.DrainHelper()
}

cp.MoveToDraining(nil /* err */)
idp.MoveToDraining(nil /* err */)
return rowenc.EncDatumRow{
rowenc.DatumToEncDatum(types.Bytes, tree.NewDBytes(tree.DBytes(countsBytes))),
rowenc.DatumToEncDatum(types.Bytes, tree.NewDBytes(tree.DBytes([]byte{}))),
}, cp.DrainHelper()
}, idp.DrainHelper()
}

// ConsumerDone is part of the RowSource interface.
func (cp *readImportDataProcessor) ConsumerDone() {
cp.MoveToDraining(nil /* err */)
func (idp *readImportDataProcessor) ConsumerDone() {
idp.MoveToDraining(nil /* err */)
}

// ConsumerClosed is part of the RowSource interface.
func (cp *readImportDataProcessor) ConsumerClosed() {
func (idp *readImportDataProcessor) ConsumerClosed() {
// The consumer is done, Next() will not be called again.
cp.InternalClose()
idp.InternalClose()
}

func injectTimeIntoEvalCtx(ctx *tree.EvalContext, walltime int64) {
Expand Down

0 comments on commit 9a675f0

Please sign in to comment.