importccl: refactor import processor to use ProcessorBase #57124

Merged 2 commits on Dec 14, 2020

Changes from all commits
pkg/ccl/importccl/import_processor.go (72 additions, 33 deletions)

@@ -41,78 +41,117 @@ var csvOutputTypes = []*types.T{
 	types.Bytes,
 }
 
+const readImportDataProcessorName = "readImportDataProcessor"
+
+// readImportDataProcessor is a processor that does not take any inputs. It
+// starts a worker goroutine in Start(), which emits progress updates over an
+// internally maintained channel. Next() will read from this channel until
+// exhausted and then emit the summary that the worker goroutine returns. The
+// processor is built this way in order to manage parallelism internally.
 type readImportDataProcessor struct {
+	execinfra.ProcessorBase
+
 	flowCtx *execinfra.FlowCtx
 	spec    execinfrapb.ReadImportDataSpec
 	output  execinfra.RowReceiver
-}
 
-var _ execinfra.Processor = &readImportDataProcessor{}
+	progCh chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress
 
-func (cp *readImportDataProcessor) OutputTypes() []*types.T {
-	return csvOutputTypes
+	importErr error
+	summary   *roachpb.BulkOpSummary
 }
 
-func injectTimeIntoEvalCtx(ctx *tree.EvalContext, walltime int64) {
-	sec := walltime / int64(time.Second)
-	nsec := walltime % int64(time.Second)
-	unixtime := timeutil.Unix(sec, nsec)
-	ctx.StmtTimestamp = unixtime
-	ctx.TxnTimestamp = unixtime
-}
+var _ execinfra.Processor = &readImportDataProcessor{}
+var _ execinfra.RowSource = &readImportDataProcessor{}
 
 func newReadImportDataProcessor(
 	flowCtx *execinfra.FlowCtx,
 	processorID int32,
 	spec execinfrapb.ReadImportDataSpec,
+	post *execinfrapb.PostProcessSpec,
 	output execinfra.RowReceiver,
 ) (execinfra.Processor, error) {
 	cp := &readImportDataProcessor{
 		flowCtx: flowCtx,
 		spec:    spec,
 		output:  output,
+		progCh:  make(chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress),
+	}
+	if err := cp.Init(cp, post, csvOutputTypes, flowCtx, processorID, output, nil, /* memMonitor */
+		execinfra.ProcStateOpts{
+			// This processor doesn't have any inputs to drain.
+			InputsToDrain: nil,
+		}); err != nil {
+		return nil, err
 	}
 	return cp, nil
 }
 
-func (cp *readImportDataProcessor) Run(ctx context.Context) {
-	ctx, span := tracing.ChildSpan(ctx, "readImportDataProcessor")
-	defer span.Finish()
-	defer cp.output.ProducerDone()
-
-	progCh := make(chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress)
-
-	var summary *roachpb.BulkOpSummary
-	var err error
+// Start is part of the RowSource interface.
+func (idp *readImportDataProcessor) Start(ctx context.Context) context.Context {
 	// We don't have to worry about this go routine leaking because next we loop over progCh
 	// which is closed only after the go routine returns.
 	go func() {
-		defer close(progCh)
-		summary, err = runImport(ctx, cp.flowCtx, &cp.spec, progCh)
+		defer close(idp.progCh)
+		idp.summary, idp.importErr = runImport(ctx, idp.flowCtx, &idp.spec, idp.progCh)
 	}()
+	return idp.StartInternal(ctx, readImportDataProcessorName)
+}
 
-	for prog := range progCh {
-		// Take a copy so that we can send the progress address to the output processor.
+// Next is part of the RowSource interface.
+func (idp *readImportDataProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) {
+	if idp.State != execinfra.StateRunning {
+		return nil, idp.DrainHelper()
+	}
+
+	for prog := range idp.progCh {
 		p := prog
-		cp.output.Push(nil, &execinfrapb.ProducerMetadata{BulkProcessorProgress: &p})
+		return nil, &execinfrapb.ProducerMetadata{BulkProcessorProgress: &p}
 	}
 
-	if err != nil {
-		cp.output.Push(nil, &execinfrapb.ProducerMetadata{Err: err})
-		return
+	if idp.importErr != nil {
+		idp.MoveToDraining(idp.importErr)
+		return nil, idp.DrainHelper()
+	}
+
+	if idp.summary == nil {
+		err := errors.Newf("no summary generated by %s", readImportDataProcessorName)
+		idp.MoveToDraining(err)
+		return nil, idp.DrainHelper()
 	}
 
 	// Once the import is done, send back to the controller the serialized
 	// summary of the import operation. For more info see roachpb.BulkOpSummary.
-	countsBytes, err := protoutil.Marshal(summary)
+	countsBytes, err := protoutil.Marshal(idp.summary)
 	if err != nil {
-		cp.output.Push(nil, &execinfrapb.ProducerMetadata{Err: err})
-		return
+		idp.MoveToDraining(err)
+		return nil, idp.DrainHelper()
 	}
-	cp.output.Push(rowenc.EncDatumRow{
+
+	idp.MoveToDraining(nil /* err */)
+	return rowenc.EncDatumRow{
 		rowenc.DatumToEncDatum(types.Bytes, tree.NewDBytes(tree.DBytes(countsBytes))),
 		rowenc.DatumToEncDatum(types.Bytes, tree.NewDBytes(tree.DBytes([]byte{}))),
-	}, nil)
+	}, idp.DrainHelper()
+}
+
+// ConsumerDone is part of the RowSource interface.
+func (idp *readImportDataProcessor) ConsumerDone() {
+	idp.MoveToDraining(nil /* err */)
+}
+
+// ConsumerClosed is part of the RowSource interface.
+func (idp *readImportDataProcessor) ConsumerClosed() {
+	// The consumer is done, Next() will not be called again.
+	idp.InternalClose()
+}
+
+func injectTimeIntoEvalCtx(ctx *tree.EvalContext, walltime int64) {
+	sec := walltime / int64(time.Second)
+	nsec := walltime % int64(time.Second)
+	unixtime := timeutil.Unix(sec, nsec)
+	ctx.StmtTimestamp = unixtime
+	ctx.TxnTimestamp = unixtime
+}
 
 func makeInputConverter(
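The core of the refactor is visible in Start and Next above: the worker goroutine owns the progress channel and closes it when runImport returns, so Next can range over the channel and then emit the summary exactly once. Below is a standalone sketch of that same worker-goroutine/progress-channel lifecycle using only the Go standard library; Progress, Summary, and the work callback are hypothetical stand-ins for the bulk-processor progress metadata, roachpb.BulkOpSummary, and runImport, not CockroachDB APIs.

package main

import "fmt"

// Progress and Summary stand in for the processor's progress metadata and
// roachpb.BulkOpSummary; both are hypothetical types for this sketch.
type Progress struct{ FilesDone int }
type Summary struct{ Rows int }

// importProcessor mirrors the refactored readImportDataProcessor: Start kicks
// off a worker goroutine that reports over progCh, and Next drains progCh
// before emitting the final summary exactly once.
type importProcessor struct {
	progCh    chan Progress
	summary   *Summary
	importErr error
	drained   bool
}

func newImportProcessor() *importProcessor {
	return &importProcessor{progCh: make(chan Progress)}
}

// Start launches the worker. The goroutine cannot leak: it owns progCh and
// closes it when done, and Next receives from progCh until it is closed.
func (p *importProcessor) Start(work func(chan<- Progress) (*Summary, error)) {
	go func() {
		defer close(p.progCh)
		p.summary, p.importErr = work(p.progCh)
	}()
}

// Next returns one progress update per call while the worker runs, then the
// summary (or the worker's error), then nothing once exhausted.
func (p *importProcessor) Next() (*Progress, *Summary, error) {
	if p.drained {
		return nil, nil, nil
	}
	if prog, ok := <-p.progCh; ok {
		return &prog, nil, nil
	}
	// progCh is closed: the worker has finished, and because a receive from a
	// closed channel happens after the close, its results are safely visible.
	p.drained = true
	return nil, p.summary, p.importErr
}

func main() {
	p := newImportProcessor()
	p.Start(func(progCh chan<- Progress) (*Summary, error) {
		for i := 1; i <= 3; i++ {
			progCh <- Progress{FilesDone: i}
		}
		return &Summary{Rows: 42}, nil
	})
	for {
		prog, sum, err := p.Next()
		switch {
		case err != nil:
			fmt.Println("error:", err)
			return
		case prog != nil:
			fmt.Println("progress:", prog.FilesDone)
		case sum != nil:
			fmt.Println("summary rows:", sum.Rows)
		default:
			return // exhausted
		}
	}
}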
pkg/ccl/importccl/import_processor_test.go (5 additions, 3 deletions)

@@ -301,8 +301,9 @@ func TestImportIgnoresProcessedFiles(t *testing.T) {
 	for _, testCase := range tests {
 		t.Run(fmt.Sprintf("processes-files-once-%s", testCase.name), func(t *testing.T) {
 			spec := setInputOffsets(t, testCase.spec.getConverterSpec(), testCase.inputOffsets)
+			post := execinfrapb.PostProcessSpec{}
 
-			processor, err := newReadImportDataProcessor(flowCtx, 0, *spec, &errorReportingRowReceiver{t})
+			processor, err := newReadImportDataProcessor(flowCtx, 0, *spec, &post, &errorReportingRowReceiver{t})
 
 			if err != nil {
 				t.Fatalf("Could not create data processor: %v", err)
@@ -573,9 +574,10 @@ func setImportReaderParallelism(parallelism int32) func() {
 	factory := rowexec.NewReadImportDataProcessor
 	rowexec.NewReadImportDataProcessor = func(
 		flowCtx *execinfra.FlowCtx, processorID int32,
-		spec execinfrapb.ReadImportDataSpec, output execinfra.RowReceiver) (execinfra.Processor, error) {
+		spec execinfrapb.ReadImportDataSpec, post *execinfrapb.PostProcessSpec,
+		output execinfra.RowReceiver) (execinfra.Processor, error) {
 		spec.ReaderParallelism = parallelism
-		return factory(flowCtx, processorID, spec, output)
+		return factory(flowCtx, processorID, spec, post, output)
 	}
 
 	return func() {
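setImportReaderParallelism works by saving the injected constructor, installing a wrapper that rewrites the spec, and returning a closure that undoes the swap. A minimal, dependency-free sketch of that save-wrap-restore pattern follows; makeThing and withParallelism are hypothetical names invented for the sketch.

package main

import "fmt"

// makeThing is a package-level factory variable, analogous to
// rowexec.NewReadImportDataProcessor.
var makeThing = func(parallelism int32) string {
	return fmt.Sprintf("thing with parallelism=%d", parallelism)
}

// withParallelism overrides makeThing to force a setting and returns a
// cleanup func that restores the original, mirroring setImportReaderParallelism.
func withParallelism(parallelism int32) func() {
	orig := makeThing
	makeThing = func(_ int32) string {
		// Delegate to the saved original with the forced value.
		return orig(parallelism)
	}
	return func() { makeThing = orig }
}

func main() {
	defer withParallelism(11)()
	fmt.Println(makeThing(64)) // prints "thing with parallelism=11"
}

The defer withParallelism(n)() idiom keeps the override scoped to a single test, so later tests see the original factory again.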
pkg/sql/execinfra/processorsbase.go (3 additions, 3 deletions)

@@ -386,7 +386,7 @@ type ProcessorConstructor func(
 //   FlowCtx *FlowCtx, l RowSource, r RowSource, post *PostProcessSpec, output RowReceiver,
 // ) (*concatProcessor, error) {
 //   p := &concatProcessor{l: l, r: r}
-//   if err := p.init(
+//   if err := p.Init(
 //     post, l.OutputTypes(), FlowCtx, output,
 //     // We pass the inputs to the helper, to be consumed by DrainHelper() later.
 //     ProcStateOpts{
@@ -410,11 +410,11 @@ type ProcessorConstructor func(
 // }
 //
 // // Next is part of the RowSource interface.
-// func (p *concatProcessor) Next() (sqlbase.EncDatumRow, *execinfrapb.ProducerMetadata) {
+// func (p *concatProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) {
 //   // Loop while we haven't produced a row or a metadata record. We loop around
 //   // in several cases, including when the filtering rejected a row coming.
 //   for p.State == StateRunning {
-//     var row sqlbase.EncDatumRow
+//     var row rowenc.EncDatumRow
 //     var meta *ProducerMetadata
 //     if !p.leftConsumed {
 //       row, meta = p.l.Next()
pkg/sql/rowexec/processors.go (2 additions, 2 deletions)

@@ -232,7 +232,7 @@ func NewProcessor(
 		if NewReadImportDataProcessor == nil {
 			return nil, errors.New("ReadImportData processor unimplemented")
 		}
-		return NewReadImportDataProcessor(flowCtx, processorID, *core.ReadImport, outputs[0])
+		return NewReadImportDataProcessor(flowCtx, processorID, *core.ReadImport, post, outputs[0])
 	}
 	if core.BackupData != nil {
 		if err := checkNumInOut(inputs, outputs, 0, 1); err != nil {
@@ -348,7 +348,7 @@ func NewProcessor(
 // NewReadImportDataProcessor is implemented in the non-free (CCL) codebase and then injected here via runtime initialization.
-var NewReadImportDataProcessor func(*execinfra.FlowCtx, int32, execinfrapb.ReadImportDataSpec, execinfra.RowReceiver) (execinfra.Processor, error)
+var NewReadImportDataProcessor func(*execinfra.FlowCtx, int32, execinfrapb.ReadImportDataSpec, *execinfrapb.PostProcessSpec, execinfra.RowReceiver) (execinfra.Processor, error)
 
 // NewBackupDataProcessor is implemented in the non-free (CCL) codebase and then injected here via runtime initialization.
 var NewBackupDataProcessor func(*execinfra.FlowCtx, int32, execinfrapb.BackupDataSpec, execinfra.RowReceiver) (execinfra.Processor, error)
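The hook variable above is how the Apache-licensed rowexec package constructs processors whose implementations live in CCL code: the function variable defaults to nil, and the CCL package assigns it during package initialization. A minimal sketch of the pattern under hypothetical names (NewWidget, Widget); in CockroachDB the assignment lives in an init() of a separate CCL package that callers pull in via a blank import, whereas the sketch compresses everything into one file.

package main

import (
	"errors"
	"fmt"
)

// Widget is a hypothetical stand-in for the processor type.
type Widget struct{ name string }

// NewWidget is the injection point: the core code declares the hook but
// leaves it nil, exactly like rowexec.NewReadImportDataProcessor.
var NewWidget func(name string) (*Widget, error)

// newProcessor mirrors rowexec.NewProcessor's nil check: without the
// injected constructor, the feature is reported as unimplemented.
func newProcessor(name string) (*Widget, error) {
	if NewWidget == nil {
		return nil, errors.New("widget constructor unimplemented")
	}
	return NewWidget(name)
}

// In CockroachDB this init lives in the CCL package; importing that package
// for side effects is what wires the hook up at runtime.
func init() {
	NewWidget = func(name string) (*Widget, error) {
		return &Widget{name: name}, nil
	}
}

func main() {
	w, err := newProcessor("import")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("built widget:", w.name)
}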