diff --git a/pkg/ccl/importccl/import_processor.go b/pkg/ccl/importccl/import_processor.go
index 12819117b4c2..3a321e126097 100644
--- a/pkg/ccl/importccl/import_processor.go
+++ b/pkg/ccl/importccl/import_processor.go
@@ -68,19 +68,38 @@ func (cp *readImportDataProcessor) Run(ctx context.Context) {
 	defer tracing.FinishSpan(span)
 	defer cp.output.ProducerDone()
 
-	group := ctxgroup.WithContext(ctx)
 	kvCh := make(chan row.KVBatch, 10)
-	group.GoCtx(func(ctx context.Context) error { return runImport(ctx, cp.flowCtx, &cp.spec, kvCh) })
+	progCh := make(chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress)
+
+	var summary *roachpb.BulkOpSummary
+	var err error
+	// We don't have to worry about this goroutine leaking because next we
+	// loop over progCh, which is closed only after the goroutine returns.
+	go func() {
+		defer close(progCh)
+		summary, err = runImport(ctx, cp.flowCtx, &cp.spec, progCh, kvCh)
+	}()
+
+	for prog := range progCh {
+		cp.output.Push(nil, &execinfrapb.ProducerMetadata{BulkProcessorProgress: &prog})
+	}
 
-	// Ingest the KVs that the producer emitted to the chan and the row result
-	// at the end is one row containing an encoded BulkOpSummary.
-	group.GoCtx(func(ctx context.Context) error {
-		return cp.ingestKvs(ctx, kvCh)
-	})
+	if err != nil {
+		cp.output.Push(nil, &execinfrapb.ProducerMetadata{Err: err})
+		return
+	}
 
-	if err := group.Wait(); err != nil {
+	// Once the import is done, send back to the controller the serialized
+	// summary of the import operation. For more info see roachpb.BulkOpSummary.
+	countsBytes, err := protoutil.Marshal(summary)
+	if err != nil {
 		cp.output.Push(nil, &execinfrapb.ProducerMetadata{Err: err})
+		return
 	}
+	cp.output.Push(sqlbase.EncDatumRow{
+		sqlbase.DatumToEncDatum(types.Bytes, tree.NewDBytes(tree.DBytes(countsBytes))),
+		sqlbase.DatumToEncDatum(types.Bytes, tree.NewDBytes(tree.DBytes([]byte{}))),
+	}, nil)
 }
 
 func makeInputConverter(
@@ -133,13 +152,19 @@ func makeInputConverter(
 
 // ingestKvs drains kvs from the channel until it closes, ingesting them using
 // the BulkAdder. It handles the required buffering/sorting/etc.
-func (cp *readImportDataProcessor) ingestKvs(ctx context.Context, kvCh <-chan row.KVBatch) error {
+func ingestKvs(
+	ctx context.Context,
+	flowCtx *execinfra.FlowCtx,
+	spec *execinfrapb.ReadImportDataSpec,
+	progCh chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress,
+	kvCh <-chan row.KVBatch,
+) (*roachpb.BulkOpSummary, error) {
 	ctx, span := tracing.ChildSpan(ctx, "ingestKVs")
 	defer tracing.FinishSpan(span)
 
-	writeTS := hlc.Timestamp{WallTime: cp.spec.WalltimeNanos}
+	writeTS := hlc.Timestamp{WallTime: spec.WalltimeNanos}
 
-	flushSize := storageccl.MaxImportBatchSize(cp.flowCtx.Cfg.Settings)
+	flushSize := storageccl.MaxImportBatchSize(flowCtx.Cfg.Settings)
 
 	// We create two bulk adders so as to combat the excessive flushing of small
 	// SSTs which was observed when using a single adder for both primary and
@@ -150,8 +175,8 @@ func (cp *readImportDataProcessor) ingestKvs(ctx context.Context, kvCh <-chan ro
 	// of the pkIndexAdder buffer be set below that of the indexAdder buffer.
 	// Otherwise, as a consequence of filling up faster the pkIndexAdder buffer
 	// will hog memory as it tries to grow more aggressively.
-	minBufferSize, maxBufferSize, stepSize := storageccl.ImportBufferConfigSizes(cp.flowCtx.Cfg.Settings, true /* isPKAdder */)
-	pkIndexAdder, err := cp.flowCtx.Cfg.BulkAdder(ctx, cp.flowCtx.Cfg.DB, writeTS, storagebase.BulkAdderOptions{
+	minBufferSize, maxBufferSize, stepSize := storageccl.ImportBufferConfigSizes(flowCtx.Cfg.Settings, true /* isPKAdder */)
+	pkIndexAdder, err := flowCtx.Cfg.BulkAdder(ctx, flowCtx.Cfg.DB, writeTS, storagebase.BulkAdderOptions{
 		Name:              "pkAdder",
 		DisallowShadowing: true,
 		SkipDuplicates:    true,
@@ -161,12 +186,12 @@ func (cp *readImportDataProcessor) ingestKvs(ctx context.Context, kvCh <-chan ro
 		SSTSize:           uint64(flushSize),
 	})
 	if err != nil {
-		return err
+		return nil, err
 	}
 	defer pkIndexAdder.Close(ctx)
 
-	minBufferSize, maxBufferSize, stepSize = storageccl.ImportBufferConfigSizes(cp.flowCtx.Cfg.Settings, false /* isPKAdder */)
-	indexAdder, err := cp.flowCtx.Cfg.BulkAdder(ctx, cp.flowCtx.Cfg.DB, writeTS, storagebase.BulkAdderOptions{
+	minBufferSize, maxBufferSize, stepSize = storageccl.ImportBufferConfigSizes(flowCtx.Cfg.Settings, false /* isPKAdder */)
+	indexAdder, err := flowCtx.Cfg.BulkAdder(ctx, flowCtx.Cfg.DB, writeTS, storagebase.BulkAdderOptions{
 		Name:              "indexAdder",
 		DisallowShadowing: true,
 		SkipDuplicates:    true,
@@ -176,7 +201,7 @@ func (cp *readImportDataProcessor) ingestKvs(ctx context.Context, kvCh <-chan ro
 		SSTSize:           uint64(flushSize),
 	})
 	if err != nil {
-		return err
+		return nil, err
 	}
 	defer indexAdder.Close(ctx)
 
@@ -188,11 +213,11 @@ func (cp *readImportDataProcessor) ingestKvs(ctx context.Context, kvCh <-chan ro
 	// - idxFlushedRow contains `writtenRow` as of the last index adder flush.
 	// In pkFlushedRow, idxFlushedRow and writtenFraction values are written via
 	// `atomic` so the progress-reporting goroutine can read them.
-	writtenRow := make([]uint64, len(cp.spec.Uri))
-	writtenFraction := make([]uint32, len(cp.spec.Uri))
+	writtenRow := make([]uint64, len(spec.Uri))
+	writtenFraction := make([]uint32, len(spec.Uri))
 
-	pkFlushedRow := make([]uint64, len(cp.spec.Uri))
-	idxFlushedRow := make([]uint64, len(cp.spec.Uri))
+	pkFlushedRow := make([]uint64, len(spec.Uri))
+	idxFlushedRow := make([]uint64, len(spec.Uri))
 
 	// When the PK adder flushes, everything written has been flushed, so we set
 	// pkFlushedRow to writtenRow. Additionally if the indexAdder is empty then we
@@ -214,9 +239,9 @@ func (cp *readImportDataProcessor) ingestKvs(ctx context.Context, kvCh <-chan ro
 	})
 
 	// offsets maps input file ID to a slot in our progress tracking slices.
-	offsets := make(map[int32]int, len(cp.spec.Uri))
+	offsets := make(map[int32]int, len(spec.Uri))
 	var offset int
-	for i := range cp.spec.Uri {
+	for i := range spec.Uri {
 		offsets[i] = offset
 		offset++
 	}
@@ -250,7 +275,7 @@ func (cp *readImportDataProcessor) ingestKvs(ctx context.Context, kvCh <-chan ro
 					}
 					prog.CompletedFraction[file] = math.Float32frombits(atomic.LoadUint32(&writtenFraction[offset]))
 				}
-				cp.output.Push(nil, &execinfrapb.ProducerMetadata{BulkProcessorProgress: &prog})
+				progCh <- prog
 			}
 		}
 	})
@@ -314,43 +339,26 @@
 	})
 
 	if err := g.Wait(); err != nil {
-		return err
+		return nil, err
 	}
 
 	if err := pkIndexAdder.Flush(ctx); err != nil {
 		if err, ok := err.(storagebase.DuplicateKeyError); ok {
-			return errors.Wrap(err, "duplicate key in primary index")
+			return nil, errors.Wrap(err, "duplicate key in primary index")
 		}
-		return err
+		return nil, err
 	}
 
 	if err := indexAdder.Flush(ctx); err != nil {
 		if err, ok := err.(storagebase.DuplicateKeyError); ok {
-			return errors.Wrap(err, "duplicate key in index")
+			return nil, errors.Wrap(err, "duplicate key in index")
 		}
-		return err
-	}
-
-	var prog execinfrapb.RemoteProducerMetadata_BulkProcessorProgress
-	prog.CompletedRow = make(map[int32]uint64)
-	prog.CompletedFraction = make(map[int32]float32)
-	for i := range cp.spec.Uri {
-		prog.CompletedFraction[i] = 1.0
-		prog.CompletedRow[i] = math.MaxUint64
+		return nil, err
 	}
-	cp.output.Push(nil, &execinfrapb.ProducerMetadata{BulkProcessorProgress: &prog})
 
 	addedSummary := pkIndexAdder.GetSummary()
 	addedSummary.Add(indexAdder.GetSummary())
-	countsBytes, err := protoutil.Marshal(&addedSummary)
-	if err != nil {
-		return err
-	}
-	cp.output.Push(sqlbase.EncDatumRow{
-		sqlbase.DatumToEncDatum(types.Bytes, tree.NewDBytes(tree.DBytes(countsBytes))),
-		sqlbase.DatumToEncDatum(types.Bytes, tree.NewDBytes(tree.DBytes([]byte{}))),
-	}, nil)
-	return nil
+	return &addedSummary, nil
 }
 
 func init() {
diff --git a/pkg/ccl/importccl/read_import_base.go b/pkg/ccl/importccl/read_import_base.go
index 60d0cf3c9ec4..dd888b31b189 100644
--- a/pkg/ccl/importccl/read_import_base.go
+++ b/pkg/ccl/importccl/read_import_base.go
@@ -15,6 +15,7 @@ import (
 	"context"
 	"io"
 	"io/ioutil"
+	"math"
 	"net/url"
 	"strings"
 
@@ -35,13 +36,14 @@ func runImport(
 	ctx context.Context,
 	flowCtx *execinfra.FlowCtx,
 	spec *execinfrapb.ReadImportDataSpec,
+	progCh chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress,
 	kvCh chan row.KVBatch,
-) error {
+) (*roachpb.BulkOpSummary, error) {
 	group := ctxgroup.WithContext(ctx)
 	conv, err := makeInputConverter(spec, flowCtx.NewEvalCtx(), kvCh)
 	if err != nil {
-		return err
+		return nil, err
 	}
 	conv.start(group)
 
@@ -53,7 +55,29 @@ func runImport(
 		return conv.readFiles(ctx, spec.Uri, spec.Format, flowCtx.Cfg.ExternalStorage)
 	})
 
-	return group.Wait()
+	// Ingest the KVs that the producer emitted to the chan; the row result
+	// at the end is one row containing an encoded BulkOpSummary.
+	var summary *roachpb.BulkOpSummary
+	group.GoCtx(func(ctx context.Context) error {
+		summary, err = ingestKvs(ctx, flowCtx, spec, progCh, kvCh)
+		if err != nil {
+			return err
+		}
+		var prog execinfrapb.RemoteProducerMetadata_BulkProcessorProgress
+		prog.CompletedRow = make(map[int32]uint64)
+		prog.CompletedFraction = make(map[int32]float32)
+		for i := range spec.Uri {
+			prog.CompletedFraction[i] = 1.0
+			prog.CompletedRow[i] = math.MaxUint64
+		}
+		progCh <- prog
+		return nil
+	})
+	if err := group.Wait(); err != nil {
+		return nil, err
+	}
+
+	return summary, nil
 }
 
 type readFileFunc func(context.Context, *fileReader, int32, string, chan string) error
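
Note on the `Run` change above: the producer goroutine owns `progCh`, closes it with `defer`, and the consumer's `range` loop over that channel is what makes the later reads of `summary` and `err` safe. Below is a minimal, self-contained sketch of that pattern; `run`, `progress`, and the printed strings are illustrative stand-ins, not APIs from this codebase.

```go
package main

import "fmt"

// progress is a stand-in for the processor's BulkProcessorProgress metadata.
type progress struct {
	completedFraction float32
}

// run is a stand-in for runImport: it emits progress while working and
// returns a final summary once done.
func run(progCh chan<- progress) (string, error) {
	for i := 1; i <= 3; i++ {
		progCh <- progress{completedFraction: float32(i) / 3}
	}
	return "bulk op summary", nil
}

func main() {
	progCh := make(chan progress)

	var summary string
	var err error
	// The goroutine cannot leak: main ranges over progCh below, and progCh
	// is closed (via defer) only after run has returned and stored its
	// results into summary and err.
	go func() {
		defer close(progCh)
		summary, err = run(progCh)
	}()

	// Forward progress updates while the work is still running.
	for prog := range progCh {
		fmt.Printf("progress: %.0f%%\n", prog.completedFraction*100)
	}

	// The close happens-before the range loop terminates, so summary and
	// err are safely visible here without extra locking.
	if err != nil {
		fmt.Println("import failed:", err)
		return
	}
	fmt.Println("done:", summary)
}
```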
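`ingestKvs` publishes per-file progress through atomically updated slices (`writtenRow`, `writtenFraction`) that a timer-driven goroutine snapshots and sends on `progCh`. The sketch below shows just that mechanism; the file count, tick interval, and batch sizes are made-up values, and floats are round-tripped through their bit patterns as in the diff.

```go
package main

import (
	"fmt"
	"math"
	"sync/atomic"
	"time"
)

func main() {
	// Per-file progress slots, mirroring writtenRow/writtenFraction: the
	// ingest loop stores values, a ticker goroutine loads them.
	const numFiles = 2
	writtenRow := make([]uint64, numFiles)
	writtenFraction := make([]uint32, numFiles) // float32 bits

	done := make(chan struct{})
	go func() {
		tick := time.NewTicker(50 * time.Millisecond)
		defer tick.Stop()
		for {
			select {
			case <-done:
				return
			case <-tick.C:
				// Snapshot progress without locks via atomic loads.
				for i := 0; i < numFiles; i++ {
					rows := atomic.LoadUint64(&writtenRow[i])
					frac := math.Float32frombits(atomic.LoadUint32(&writtenFraction[i]))
					fmt.Printf("file %d: row %d (%.0f%%)\n", i, rows, frac*100)
				}
			}
		}
	}()

	// Simulated ingest loop: publish progress through atomic stores.
	for step := 1; step <= 4; step++ {
		for i := 0; i < numFiles; i++ {
			atomic.AddUint64(&writtenRow[i], 25)
			atomic.StoreUint32(&writtenFraction[i], math.Float32bits(float32(step)/4))
		}
		time.Sleep(60 * time.Millisecond)
	}
	close(done)
}
```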
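`runImport` coordinates the input reader and `ingestKvs` with `ctxgroup`, CockroachDB's thin wrapper around `golang.org/x/sync/errgroup`. The following sketch reproduces the same producer/consumer shape with `errgroup` directly; the channel payload and the summary type are placeholders, not the real KVBatch or BulkOpSummary.

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	g, ctx := errgroup.WithContext(context.Background())

	kvCh := make(chan []byte, 10)

	// Producer: reads input and emits batches, closing the channel when
	// done so the consumer's range loop terminates.
	g.Go(func() error {
		defer close(kvCh)
		for i := 0; i < 3; i++ {
			select {
			case kvCh <- []byte(fmt.Sprintf("batch-%d", i)):
			case <-ctx.Done():
				return ctx.Err()
			}
		}
		return nil
	})

	// Consumer: stands in for ingestKvs; the summary it builds is only
	// read after Wait has established that both goroutines finished.
	var summary int
	g.Go(func() error {
		for batch := range kvCh {
			summary += len(batch)
		}
		return nil
	})

	if err := g.Wait(); err != nil {
		fmt.Println("import failed:", err)
		return
	}
	fmt.Println("ingested bytes:", summary)
}
```

As in the diff, any error from either goroutine cancels the shared context and surfaces from `Wait`, so the caller sees a single error path regardless of which side failed.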