Commit dffc015
57124: importccl: refactor import processor to use ProcessorBase r=pbardea a=pbardea

This commit refactors the import processor to use ProcessorBase.
Although this processor doesn't have any inputs, and the main utility
that ProcessorBase provides is the management of draining inputs, there
is still value in embedding a ProcessorBase: as tracing and other
infrastructural improvements are made to ProcessorBase, the import
processor automatically picks up those changes.

The processor works by kicking off a goroutine in Start() that begins
the import and sends progress updates over an internally maintained
channel. Next() continually reads from that channel until it is fully
consumed, and then emits the summary returned by the goroutine.

Import is built this way so that it can manage concurrency internally
to the processor.

Relates to cockroachdb#57293.

Release note: None

Co-authored-by: Paul Bardea <[email protected]>
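
For readers less familiar with this pattern, here is a minimal, self-contained sketch of the same shape in plain Go. The progress, summary, importer, and runImport names below are hypothetical stand-ins for the execinfrapb/roachpb types and the importccl helper, not the actual CockroachDB APIs:

package main

import "fmt"

// progress and summary stand in for the real progress-metadata and
// roachpb.BulkOpSummary types (hypothetical).
type progress struct{ pct float64 }
type summary struct{ rows int }

type importer struct {
	progCh    chan progress
	sum       *summary
	importErr error
}

// start mirrors Start(): the worker goroutine owns progCh and closes it
// when the import finishes, so consumers can range over the channel
// without leaking the goroutine.
func (i *importer) start() {
	go func() {
		defer close(i.progCh)
		i.sum, i.importErr = runImport(i.progCh)
	}()
}

// next mirrors Next(): drain progress updates first, then emit the
// final summary exactly once.
func (i *importer) next() (interface{}, bool) {
	for p := range i.progCh {
		// One progress update per call; the range only falls through
		// once the worker has closed the channel.
		return p, true
	}
	if i.importErr != nil || i.sum == nil {
		// A real implementation would surface importErr to the caller.
		return nil, false
	}
	s := *i.sum
	i.sum = nil
	return s, true
}

// runImport stands in for the actual import work.
func runImport(progCh chan<- progress) (*summary, error) {
	for pct := 25.0; pct <= 100; pct += 25 {
		progCh <- progress{pct: pct}
	}
	return &summary{rows: 42}, nil
}

func main() {
	imp := &importer{progCh: make(chan progress)}
	imp.start()
	for out, ok := imp.next(); ok; out, ok = imp.next() {
		fmt.Printf("%+v\n", out)
	}
}

The key property, in both the sketch and the real processor, is that only the worker goroutine writes to (and closes) the progress channel; closing it is also what publishes the summary and error fields safely to the reader.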
craig[bot] and pbardea committed Dec 14, 2020
2 parents 943364e + 9a675f0 commit dffc015
Showing 4 changed files with 82 additions and 41 deletions.
105 changes: 72 additions & 33 deletions pkg/ccl/importccl/import_processor.go
@@ -40,78 +40,117 @@ var csvOutputTypes = []*types.T{
 	types.Bytes,
 }
 
+const readImportDataProcessorName = "readImportDataProcessor"
+
+// readImportDataProcessor is a processor that does not take any inputs. It
+// starts a worker goroutine in Start(), which emits progress updates over an
+// internally maintained channel. Next() will read from this channel until
+// exhausted and then emit the summary that the worker goroutine returns. The
+// processor is built this way in order to manage parallelism internally.
 type readImportDataProcessor struct {
+	execinfra.ProcessorBase
+
 	flowCtx *execinfra.FlowCtx
 	spec    execinfrapb.ReadImportDataSpec
 	output  execinfra.RowReceiver
-}
 
-var _ execinfra.Processor = &readImportDataProcessor{}
+	progCh chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress
 
-func (cp *readImportDataProcessor) OutputTypes() []*types.T {
-	return csvOutputTypes
+	importErr error
+	summary   *roachpb.BulkOpSummary
 }
 
-func injectTimeIntoEvalCtx(ctx *tree.EvalContext, walltime int64) {
-	sec := walltime / int64(time.Second)
-	nsec := walltime % int64(time.Second)
-	unixtime := timeutil.Unix(sec, nsec)
-	ctx.StmtTimestamp = unixtime
-	ctx.TxnTimestamp = unixtime
-}
+var _ execinfra.Processor = &readImportDataProcessor{}
+var _ execinfra.RowSource = &readImportDataProcessor{}
 
 func newReadImportDataProcessor(
 	flowCtx *execinfra.FlowCtx,
 	processorID int32,
 	spec execinfrapb.ReadImportDataSpec,
+	post *execinfrapb.PostProcessSpec,
 	output execinfra.RowReceiver,
 ) (execinfra.Processor, error) {
 	cp := &readImportDataProcessor{
 		flowCtx: flowCtx,
 		spec:    spec,
 		output:  output,
+		progCh:  make(chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress),
+	}
+	if err := cp.Init(cp, post, csvOutputTypes, flowCtx, processorID, output, nil, /* memMonitor */
+		execinfra.ProcStateOpts{
+			// This processor doesn't have any inputs to drain.
+			InputsToDrain: nil,
+		}); err != nil {
+		return nil, err
 	}
 	return cp, nil
 }
 
-func (cp *readImportDataProcessor) Run(ctx context.Context) {
-	ctx, span := tracing.ChildSpan(ctx, "readImportDataProcessor")
-	defer span.Finish()
-	defer cp.output.ProducerDone()
-
-	progCh := make(chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress)
-
-	var summary *roachpb.BulkOpSummary
-	var err error
+// Start is part of the RowSource interface.
+func (idp *readImportDataProcessor) Start(ctx context.Context) context.Context {
 	// We don't have to worry about this go routine leaking because next we loop over progCh
 	// which is closed only after the go routine returns.
 	go func() {
-		defer close(progCh)
-		summary, err = runImport(ctx, cp.flowCtx, &cp.spec, progCh)
+		defer close(idp.progCh)
+		idp.summary, idp.importErr = runImport(ctx, idp.flowCtx, &idp.spec, idp.progCh)
 	}()
+	return idp.StartInternal(ctx, readImportDataProcessorName)
+}
 
-	for prog := range progCh {
-		// Take a copy so that we can send the progress address to the output processor.
+// Next is part of the RowSource interface.
+func (idp *readImportDataProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) {
+	if idp.State != execinfra.StateRunning {
+		return nil, idp.DrainHelper()
+	}
+
+	for prog := range idp.progCh {
 		p := prog
-		cp.output.Push(nil, &execinfrapb.ProducerMetadata{BulkProcessorProgress: &p})
+		return nil, &execinfrapb.ProducerMetadata{BulkProcessorProgress: &p}
 	}
 
-	if err != nil {
-		cp.output.Push(nil, &execinfrapb.ProducerMetadata{Err: err})
-		return
+	if idp.importErr != nil {
+		idp.MoveToDraining(idp.importErr)
+		return nil, idp.DrainHelper()
+	}
+
+	if idp.summary == nil {
+		err := errors.Newf("no summary generated by %s", readImportDataProcessorName)
+		idp.MoveToDraining(err)
+		return nil, idp.DrainHelper()
 	}
 
 	// Once the import is done, send back to the controller the serialized
 	// summary of the import operation. For more info see roachpb.BulkOpSummary.
-	countsBytes, err := protoutil.Marshal(summary)
+	countsBytes, err := protoutil.Marshal(idp.summary)
 	if err != nil {
-		cp.output.Push(nil, &execinfrapb.ProducerMetadata{Err: err})
-		return
+		idp.MoveToDraining(err)
+		return nil, idp.DrainHelper()
 	}
-	cp.output.Push(rowenc.EncDatumRow{
+
+	idp.MoveToDraining(nil /* err */)
+	return rowenc.EncDatumRow{
 		rowenc.DatumToEncDatum(types.Bytes, tree.NewDBytes(tree.DBytes(countsBytes))),
 		rowenc.DatumToEncDatum(types.Bytes, tree.NewDBytes(tree.DBytes([]byte{}))),
-	}, nil)
+	}, idp.DrainHelper()
+}
+
+// ConsumerDone is part of the RowSource interface.
+func (idp *readImportDataProcessor) ConsumerDone() {
+	idp.MoveToDraining(nil /* err */)
+}
+
+// ConsumerClosed is part of the RowSource interface.
+func (idp *readImportDataProcessor) ConsumerClosed() {
+	// The consumer is done, Next() will not be called again.
+	idp.InternalClose()
+}
+
+func injectTimeIntoEvalCtx(ctx *tree.EvalContext, walltime int64) {
+	sec := walltime / int64(time.Second)
+	nsec := walltime % int64(time.Second)
+	unixtime := timeutil.Unix(sec, nsec)
+	ctx.StmtTimestamp = unixtime
+	ctx.TxnTimestamp = unixtime
+}
 
 func makeInputConverter(
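
One detail of this hunk worth spelling out: the final row emitted by Next() is just a serialized proto placed into a bytes column, which the coordinator decodes on receipt. Here is a rough single-file illustration of that round trip, using encoding/json purely as a stand-in for protoutil.Marshal and a hypothetical bulkOpSummary struct in place of roachpb.BulkOpSummary:

package main

import (
	"encoding/json"
	"fmt"
	"log"
)

// bulkOpSummary is a hypothetical stand-in for roachpb.BulkOpSummary.
type bulkOpSummary struct {
	Rows      int64 `json:"rows"`
	DataBytes int64 `json:"dataBytes"`
}

func main() {
	// Producer side: serialize the summary into the first bytes column
	// of the final row (the real code uses protoutil.Marshal).
	countsBytes, err := json.Marshal(bulkOpSummary{Rows: 1000, DataBytes: 1 << 20})
	if err != nil {
		log.Fatal(err)
	}
	row := [][]byte{countsBytes, {}} // two bytes columns, like csvOutputTypes

	// Coordinator side: pull the column back out and decode it.
	var s bulkOpSummary
	if err := json.Unmarshal(row[0], &s); err != nil {
		log.Fatal(err)
	}
	fmt.Printf("import finished: %+v\n", s)
}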
8 changes: 5 additions & 3 deletions pkg/ccl/importccl/import_processor_test.go
@@ -301,8 +301,9 @@ func TestImportIgnoresProcessedFiles(t *testing.T) {
 	for _, testCase := range tests {
 		t.Run(fmt.Sprintf("processes-files-once-%s", testCase.name), func(t *testing.T) {
 			spec := setInputOffsets(t, testCase.spec.getConverterSpec(), testCase.inputOffsets)
+			post := execinfrapb.PostProcessSpec{}
 
-			processor, err := newReadImportDataProcessor(flowCtx, 0, *spec, &errorReportingRowReceiver{t})
+			processor, err := newReadImportDataProcessor(flowCtx, 0, *spec, &post, &errorReportingRowReceiver{t})
 
 			if err != nil {
 				t.Fatalf("Could not create data processor: %v", err)
@@ -573,9 +574,10 @@ func setImportReaderParallelism(parallelism int32) func() {
 	factory := rowexec.NewReadImportDataProcessor
 	rowexec.NewReadImportDataProcessor = func(
 		flowCtx *execinfra.FlowCtx, processorID int32,
-		spec execinfrapb.ReadImportDataSpec, output execinfra.RowReceiver) (execinfra.Processor, error) {
+		spec execinfrapb.ReadImportDataSpec, post *execinfrapb.PostProcessSpec,
+		output execinfra.RowReceiver) (execinfra.Processor, error) {
 		spec.ReaderParallelism = parallelism
-		return factory(flowCtx, processorID, spec, output)
+		return factory(flowCtx, processorID, spec, post, output)
 	}
 
 	return func() {
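
The setImportReaderParallelism hunk above also illustrates a common Go testing idiom: save a package-level function variable, swap in a wrapper that tweaks the arguments, and return a closure that restores the original. A minimal sketch of that idiom under hypothetical names (newProcessor and spec are not the rowexec API):

package main

import "fmt"

type spec struct{ parallelism int32 }

// newProcessor is a package-level function variable, interceptable by
// tests the same way rowexec.NewReadImportDataProcessor is above.
var newProcessor = func(s spec) string {
	return fmt.Sprintf("processor with parallelism %d", s.parallelism)
}

// withParallelism overrides newProcessor and returns a closure that
// restores the original, suitable for: defer withParallelism(3)()
func withParallelism(p int32) func() {
	saved := newProcessor
	newProcessor = func(s spec) string {
		s.parallelism = p // force the override before delegating
		return saved(s)
	}
	return func() { newProcessor = saved }
}

func main() {
	defer withParallelism(3)()
	fmt.Println(newProcessor(spec{parallelism: 1})) // prints parallelism 3
}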
6 changes: 3 additions & 3 deletions pkg/sql/execinfra/processorsbase.go
@@ -359,7 +359,7 @@ type ProcessorConstructor func(
 //   FlowCtx *FlowCtx, l RowSource, r RowSource, post *PostProcessSpec, output RowReceiver,
 // ) (*concatProcessor, error) {
 //   p := &concatProcessor{l: l, r: r}
-//   if err := p.init(
+//   if err := p.Init(
 //     post, l.OutputTypes(), FlowCtx, output,
 //     // We pass the inputs to the helper, to be consumed by DrainHelper() later.
 //     ProcStateOpts{
@@ -383,11 +383,11 @@ type ProcessorConstructor func(
 // }
 //
 // // Next is part of the RowSource interface.
-// func (p *concatProcessor) Next() (sqlbase.EncDatumRow, *execinfrapb.ProducerMetadata) {
+// func (p *concatProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) {
 //   // Loop while we haven't produced a row or a metadata record. We loop around
 //   // in several cases, including when the filtering rejected a row coming.
 //   for p.State == StateRunning {
-//     var row sqlbase.EncDatumRow
+//     var row rowenc.EncDatumRow
 //     var meta *ProducerMetadata
 //     if !p.leftConsumed {
 //       row, meta = p.l.Next()
4 changes: 2 additions & 2 deletions pkg/sql/rowexec/processors.go
@@ -238,7 +238,7 @@ func NewProcessor(
 		if NewReadImportDataProcessor == nil {
 			return nil, errors.New("ReadImportData processor unimplemented")
 		}
-		return NewReadImportDataProcessor(flowCtx, processorID, *core.ReadImport, outputs[0])
+		return NewReadImportDataProcessor(flowCtx, processorID, *core.ReadImport, post, outputs[0])
 	}
 	if core.BackupData != nil {
 		if err := checkNumInOut(inputs, outputs, 0, 1); err != nil {
@@ -354,7 +354,7 @@ func NewProcessor(
 }
 
 // NewReadImportDataProcessor is implemented in the non-free (CCL) codebase and then injected here via runtime initialization.
-var NewReadImportDataProcessor func(*execinfra.FlowCtx, int32, execinfrapb.ReadImportDataSpec, execinfra.RowReceiver) (execinfra.Processor, error)
+var NewReadImportDataProcessor func(*execinfra.FlowCtx, int32, execinfrapb.ReadImportDataSpec, *execinfrapb.PostProcessSpec, execinfra.RowReceiver) (execinfra.Processor, error)
 
 // NewBackupDataProcessor is implemented in the non-free (CCL) codebase and then injected here via runtime initialization.
 var NewBackupDataProcessor func(*execinfra.FlowCtx, int32, execinfrapb.BackupDataSpec, execinfra.RowReceiver) (execinfra.Processor, error)
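
The nil-until-assigned function variables in this last hunk are a small dependency-injection scheme: the open-source rowexec package declares the hook and fails gracefully if it is unset, and the CCL package assigns the real implementation at init() time. A single-file approximation, with the hypothetical names newFancyProcessor and build collapsed into one package for the demo:

package main

import (
	"errors"
	"fmt"
)

// newFancyProcessor is nil until an optional module registers an
// implementation, mirroring rowexec.NewReadImportDataProcessor.
var newFancyProcessor func(spec string) (string, error)

// build fails gracefully when no implementation was injected, like
// the "processor unimplemented" branch in NewProcessor above.
func build(spec string) (string, error) {
	if newFancyProcessor == nil {
		return "", errors.New("FancyProcessor unimplemented")
	}
	return newFancyProcessor(spec)
}

// In CockroachDB the CCL package performs this assignment in an
// init(); it works the same way in a single file.
func init() {
	newFancyProcessor = func(spec string) (string, error) {
		return "fancy: " + spec, nil
	}
}

func main() {
	out, err := build("read-import")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(out)
}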
