Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

importccl: refactor out all business logic from the processor #41909

Merged
merged 1 commit into from
Oct 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 55 additions & 47 deletions pkg/ccl/importccl/import_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,19 +68,38 @@ func (cp *readImportDataProcessor) Run(ctx context.Context) {
defer tracing.FinishSpan(span)
defer cp.output.ProducerDone()

group := ctxgroup.WithContext(ctx)
kvCh := make(chan row.KVBatch, 10)
group.GoCtx(func(ctx context.Context) error { return runImport(ctx, cp.flowCtx, &cp.spec, kvCh) })
progCh := make(chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress)

var summary *roachpb.BulkOpSummary
var err error
// We don't have to worry about this goroutine leaking because next we loop
// over progCh, which is closed only after the goroutine returns.
go func() {
defer close(progCh)
summary, err = runImport(ctx, cp.flowCtx, &cp.spec, progCh, kvCh)
}()

for prog := range progCh {
cp.output.Push(nil, &execinfrapb.ProducerMetadata{BulkProcessorProgress: &prog})
}

// Ingest the KVs that the producer emitted into the chan; the row result
// at the end is a single row containing an encoded BulkOpSummary.
group.GoCtx(func(ctx context.Context) error {
return cp.ingestKvs(ctx, kvCh)
})
if err != nil {
cp.output.Push(nil, &execinfrapb.ProducerMetadata{Err: err})
return
}

if err := group.Wait(); err != nil {
// Once the import is done, send back to the controller the serialized
// summary of the import operation. For more info see roachpb.BulkOpSummary.
countsBytes, err := protoutil.Marshal(summary)
if err != nil {
cp.output.Push(nil, &execinfrapb.ProducerMetadata{Err: err})
return
}
cp.output.Push(sqlbase.EncDatumRow{
sqlbase.DatumToEncDatum(types.Bytes, tree.NewDBytes(tree.DBytes(countsBytes))),
sqlbase.DatumToEncDatum(types.Bytes, tree.NewDBytes(tree.DBytes([]byte{}))),
}, nil)
}

func makeInputConverter(
Expand Down Expand Up @@ -133,13 +152,19 @@ func makeInputConverter(

// ingestKvs drains kvs from the channel until it closes, ingesting them using
// the BulkAdder. It handles the required buffering/sorting/etc.
func (cp *readImportDataProcessor) ingestKvs(ctx context.Context, kvCh <-chan row.KVBatch) error {
func ingestKvs(
ctx context.Context,
flowCtx *execinfra.FlowCtx,
spec *execinfrapb.ReadImportDataSpec,
progCh chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress,
kvCh <-chan row.KVBatch,
) (*roachpb.BulkOpSummary, error) {
ctx, span := tracing.ChildSpan(ctx, "ingestKVs")
defer tracing.FinishSpan(span)

writeTS := hlc.Timestamp{WallTime: cp.spec.WalltimeNanos}
writeTS := hlc.Timestamp{WallTime: spec.WalltimeNanos}

flushSize := storageccl.MaxImportBatchSize(cp.flowCtx.Cfg.Settings)
flushSize := storageccl.MaxImportBatchSize(flowCtx.Cfg.Settings)

// We create two bulk adders so as to combat the excessive flushing of small
// SSTs which was observed when using a single adder for both primary and
Expand All @@ -150,8 +175,8 @@ func (cp *readImportDataProcessor) ingestKvs(ctx context.Context, kvCh <-chan ro
// of the pkIndexAdder buffer be set below that of the indexAdder buffer.
// Otherwise, as a consequence of filling up faster the pkIndexAdder buffer
// will hog memory as it tries to grow more aggressively.
minBufferSize, maxBufferSize, stepSize := storageccl.ImportBufferConfigSizes(cp.flowCtx.Cfg.Settings, true /* isPKAdder */)
pkIndexAdder, err := cp.flowCtx.Cfg.BulkAdder(ctx, cp.flowCtx.Cfg.DB, writeTS, storagebase.BulkAdderOptions{
minBufferSize, maxBufferSize, stepSize := storageccl.ImportBufferConfigSizes(flowCtx.Cfg.Settings, true /* isPKAdder */)
pkIndexAdder, err := flowCtx.Cfg.BulkAdder(ctx, flowCtx.Cfg.DB, writeTS, storagebase.BulkAdderOptions{
Name: "pkAdder",
DisallowShadowing: true,
SkipDuplicates: true,
Expand All @@ -161,12 +186,12 @@ func (cp *readImportDataProcessor) ingestKvs(ctx context.Context, kvCh <-chan ro
SSTSize: uint64(flushSize),
})
if err != nil {
return err
return nil, err
}
defer pkIndexAdder.Close(ctx)

minBufferSize, maxBufferSize, stepSize = storageccl.ImportBufferConfigSizes(cp.flowCtx.Cfg.Settings, false /* isPKAdder */)
indexAdder, err := cp.flowCtx.Cfg.BulkAdder(ctx, cp.flowCtx.Cfg.DB, writeTS, storagebase.BulkAdderOptions{
minBufferSize, maxBufferSize, stepSize = storageccl.ImportBufferConfigSizes(flowCtx.Cfg.Settings, false /* isPKAdder */)
indexAdder, err := flowCtx.Cfg.BulkAdder(ctx, flowCtx.Cfg.DB, writeTS, storagebase.BulkAdderOptions{
Name: "indexAdder",
DisallowShadowing: true,
SkipDuplicates: true,
Expand All @@ -176,7 +201,7 @@ func (cp *readImportDataProcessor) ingestKvs(ctx context.Context, kvCh <-chan ro
SSTSize: uint64(flushSize),
})
if err != nil {
return err
return nil, err
}
defer indexAdder.Close(ctx)

Expand All @@ -188,11 +213,11 @@ func (cp *readImportDataProcessor) ingestKvs(ctx context.Context, kvCh <-chan ro
// - idxFlushedRow contains `writtenRow` as of the last index adder flush.
In pkFlushedRow, idxFlushedRow and writtenFraction values are written via
`atomic` so the progress-reporting goroutine can read them.
writtenRow := make([]uint64, len(cp.spec.Uri))
writtenFraction := make([]uint32, len(cp.spec.Uri))
writtenRow := make([]uint64, len(spec.Uri))
writtenFraction := make([]uint32, len(spec.Uri))

pkFlushedRow := make([]uint64, len(cp.spec.Uri))
idxFlushedRow := make([]uint64, len(cp.spec.Uri))
pkFlushedRow := make([]uint64, len(spec.Uri))
idxFlushedRow := make([]uint64, len(spec.Uri))

// When the PK adder flushes, everything written has been flushed, so we set
// pkFlushedRow to writtenRow. Additionally if the indexAdder is empty then we
Expand All @@ -214,9 +239,9 @@ func (cp *readImportDataProcessor) ingestKvs(ctx context.Context, kvCh <-chan ro
})

// offsets maps input file ID to a slot in our progress tracking slices.
offsets := make(map[int32]int, len(cp.spec.Uri))
offsets := make(map[int32]int, len(spec.Uri))
var offset int
for i := range cp.spec.Uri {
for i := range spec.Uri {
offsets[i] = offset
offset++
}
Expand Down Expand Up @@ -250,7 +275,7 @@ func (cp *readImportDataProcessor) ingestKvs(ctx context.Context, kvCh <-chan ro
}
prog.CompletedFraction[file] = math.Float32frombits(atomic.LoadUint32(&writtenFraction[offset]))
}
cp.output.Push(nil, &execinfrapb.ProducerMetadata{BulkProcessorProgress: &prog})
progCh <- prog
}
}
})
Expand Down Expand Up @@ -314,43 +339,26 @@ func (cp *readImportDataProcessor) ingestKvs(ctx context.Context, kvCh <-chan ro
})

if err := g.Wait(); err != nil {
return err
return nil, err
}

if err := pkIndexAdder.Flush(ctx); err != nil {
if err, ok := err.(storagebase.DuplicateKeyError); ok {
return errors.Wrap(err, "duplicate key in primary index")
return nil, errors.Wrap(err, "duplicate key in primary index")
}
return err
return nil, err
}

if err := indexAdder.Flush(ctx); err != nil {
if err, ok := err.(storagebase.DuplicateKeyError); ok {
return errors.Wrap(err, "duplicate key in index")
return nil, errors.Wrap(err, "duplicate key in index")
}
return err
}

var prog execinfrapb.RemoteProducerMetadata_BulkProcessorProgress
prog.CompletedRow = make(map[int32]uint64)
prog.CompletedFraction = make(map[int32]float32)
for i := range cp.spec.Uri {
prog.CompletedFraction[i] = 1.0
prog.CompletedRow[i] = math.MaxUint64
return nil, err
}
cp.output.Push(nil, &execinfrapb.ProducerMetadata{BulkProcessorProgress: &prog})

addedSummary := pkIndexAdder.GetSummary()
addedSummary.Add(indexAdder.GetSummary())
countsBytes, err := protoutil.Marshal(&addedSummary)
if err != nil {
return err
}
cp.output.Push(sqlbase.EncDatumRow{
sqlbase.DatumToEncDatum(types.Bytes, tree.NewDBytes(tree.DBytes(countsBytes))),
sqlbase.DatumToEncDatum(types.Bytes, tree.NewDBytes(tree.DBytes([]byte{}))),
}, nil)
return nil
return &addedSummary, nil
}

func init() {
Expand Down
30 changes: 27 additions & 3 deletions pkg/ccl/importccl/read_import_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"context"
"io"
"io/ioutil"
"math"
"net/url"
"strings"

Expand All @@ -35,13 +36,14 @@ func runImport(
ctx context.Context,
flowCtx *execinfra.FlowCtx,
spec *execinfrapb.ReadImportDataSpec,
progCh chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress,
kvCh chan row.KVBatch,
) error {
) (*roachpb.BulkOpSummary, error) {
group := ctxgroup.WithContext(ctx)

conv, err := makeInputConverter(spec, flowCtx.NewEvalCtx(), kvCh)
if err != nil {
return err
return nil, err
}
conv.start(group)

Expand All @@ -53,7 +55,29 @@ func runImport(
return conv.readFiles(ctx, spec.Uri, spec.Format, flowCtx.Cfg.ExternalStorage)
})

return group.Wait()
// Ingest the KVs that the producer emitted into the chan; the row result
// at the end is a single row containing an encoded BulkOpSummary.
var summary *roachpb.BulkOpSummary
group.GoCtx(func(ctx context.Context) error {
summary, err = ingestKvs(ctx, flowCtx, spec, progCh, kvCh)
if err != nil {
return err
}
var prog execinfrapb.RemoteProducerMetadata_BulkProcessorProgress
prog.CompletedRow = make(map[int32]uint64)
prog.CompletedFraction = make(map[int32]float32)
for i := range spec.Uri {
prog.CompletedFraction[i] = 1.0
prog.CompletedRow[i] = math.MaxUint64
}
progCh <- prog
return nil
})
if err := group.Wait(); err != nil {
return nil, err
}

return summary, nil
}

type readFileFunc func(context.Context, *fileReader, int32, string, chan string) error
Expand Down