Merge pull request #41909 from spaskob/import-refactor
importccl: refactor out all business logic from the processor
spaskob authored Oct 24, 2019
2 parents fad4071 + d993f9e commit 7ac14eb
Showing 2 changed files with 82 additions and 50 deletions.
102 changes: 55 additions & 47 deletions pkg/ccl/importccl/import_processor.go
@@ -68,19 +68,38 @@ func (cp *readImportDataProcessor) Run(ctx context.Context) {
	defer tracing.FinishSpan(span)
	defer cp.output.ProducerDone()

-	group := ctxgroup.WithContext(ctx)
	kvCh := make(chan row.KVBatch, 10)
-	group.GoCtx(func(ctx context.Context) error { return runImport(ctx, cp.flowCtx, &cp.spec, kvCh) })
+	progCh := make(chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress)

+	var summary *roachpb.BulkOpSummary
+	var err error
+	// We don't have to worry about this go routine leaking because next we loop over progCh
+	// which is closed only after the go routine returns.
+	go func() {
+		defer close(progCh)
+		summary, err = runImport(ctx, cp.flowCtx, &cp.spec, progCh, kvCh)
+	}()
+
+	for prog := range progCh {
+		cp.output.Push(nil, &execinfrapb.ProducerMetadata{BulkProcessorProgress: &prog})
+	}

-	// Ingest the KVs that the producer emitted to the chan and the row result
-	// at the end is one row containing an encoded BulkOpSummary.
-	group.GoCtx(func(ctx context.Context) error {
-		return cp.ingestKvs(ctx, kvCh)
-	})
+	if err != nil {
+		cp.output.Push(nil, &execinfrapb.ProducerMetadata{Err: err})
+		return
+	}

-	if err := group.Wait(); err != nil {
+	// Once the import is done, send back to the controller the serialized
+	// summary of the import operation. For more info see roachpb.BulkOpSummary.
+	countsBytes, err := protoutil.Marshal(summary)
+	if err != nil {
		cp.output.Push(nil, &execinfrapb.ProducerMetadata{Err: err})
+		return
	}
+	cp.output.Push(sqlbase.EncDatumRow{
+		sqlbase.DatumToEncDatum(types.Bytes, tree.NewDBytes(tree.DBytes(countsBytes))),
+		sqlbase.DatumToEncDatum(types.Bytes, tree.NewDBytes(tree.DBytes([]byte{}))),
+	}, nil)
}

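The refactored Run is now a pure producer/consumer loop: runImport runs in a goroutine that closes progCh when it returns, so draining progCh doubles as waiting for the import to finish. A minimal, self-contained sketch of that shape (all names here are illustrative, not CockroachDB APIs):

```go
package main

import "fmt"

// produce stands in for runImport: it emits progress updates on progCh
// and returns a final result and error, captured by the caller.
func produce(progCh chan<- int) (string, error) {
	for i := 1; i <= 3; i++ {
		progCh <- i
	}
	return "summary", nil
}

func main() {
	progCh := make(chan int)

	var summary string
	var err error
	// The goroutine cannot leak: main drains progCh, and progCh is
	// closed only after produce has returned.
	go func() {
		defer close(progCh)
		summary, err = produce(progCh)
	}()

	for prog := range progCh {
		fmt.Println("progress:", prog)
	}

	// Safe to read summary and err here: the writes happen before
	// close(progCh), which happens before the range loop exits.
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("result:", summary)
}
```

Because close(progCh) happens after the assignments to summary and err, the channel close also publishes those writes to the reader; no extra locking is needed.
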
func makeInputConverter(
@@ -133,13 +152,19 @@ func makeInputConverter(

// ingestKvs drains kvs from the channel until it closes, ingesting them using
// the BulkAdder. It handles the required buffering/sorting/etc.
-func (cp *readImportDataProcessor) ingestKvs(ctx context.Context, kvCh <-chan row.KVBatch) error {
+func ingestKvs(
+	ctx context.Context,
+	flowCtx *execinfra.FlowCtx,
+	spec *execinfrapb.ReadImportDataSpec,
+	progCh chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress,
+	kvCh <-chan row.KVBatch,
+) (*roachpb.BulkOpSummary, error) {
	ctx, span := tracing.ChildSpan(ctx, "ingestKVs")
	defer tracing.FinishSpan(span)

-	writeTS := hlc.Timestamp{WallTime: cp.spec.WalltimeNanos}
+	writeTS := hlc.Timestamp{WallTime: spec.WalltimeNanos}

-	flushSize := storageccl.MaxImportBatchSize(cp.flowCtx.Cfg.Settings)
+	flushSize := storageccl.MaxImportBatchSize(flowCtx.Cfg.Settings)

	// We create two bulk adders so as to combat the excessive flushing of small
	// SSTs which was observed when using a single adder for both primary and
@@ -150,8 +175,8 @@ func (cp *readImportDataProcessor) ingestKvs(ctx context.Context, kvCh <-chan ro
	// of the pkIndexAdder buffer be set below that of the indexAdder buffer.
	// Otherwise, as a consequence of filling up faster the pkIndexAdder buffer
	// will hog memory as it tries to grow more aggressively.
-	minBufferSize, maxBufferSize, stepSize := storageccl.ImportBufferConfigSizes(cp.flowCtx.Cfg.Settings, true /* isPKAdder */)
-	pkIndexAdder, err := cp.flowCtx.Cfg.BulkAdder(ctx, cp.flowCtx.Cfg.DB, writeTS, storagebase.BulkAdderOptions{
+	minBufferSize, maxBufferSize, stepSize := storageccl.ImportBufferConfigSizes(flowCtx.Cfg.Settings, true /* isPKAdder */)
+	pkIndexAdder, err := flowCtx.Cfg.BulkAdder(ctx, flowCtx.Cfg.DB, writeTS, storagebase.BulkAdderOptions{
		Name:              "pkAdder",
		DisallowShadowing: true,
		SkipDuplicates:    true,
@@ -161,12 +186,12 @@ func (cp *readImportDataProcessor) ingestKvs(ctx context.Context, kvCh <-chan ro
		SSTSize:           uint64(flushSize),
	})
	if err != nil {
-		return err
+		return nil, err
	}
	defer pkIndexAdder.Close(ctx)

-	minBufferSize, maxBufferSize, stepSize = storageccl.ImportBufferConfigSizes(cp.flowCtx.Cfg.Settings, false /* isPKAdder */)
-	indexAdder, err := cp.flowCtx.Cfg.BulkAdder(ctx, cp.flowCtx.Cfg.DB, writeTS, storagebase.BulkAdderOptions{
+	minBufferSize, maxBufferSize, stepSize = storageccl.ImportBufferConfigSizes(flowCtx.Cfg.Settings, false /* isPKAdder */)
+	indexAdder, err := flowCtx.Cfg.BulkAdder(ctx, flowCtx.Cfg.DB, writeTS, storagebase.BulkAdderOptions{
		Name:              "indexAdder",
		DisallowShadowing: true,
		SkipDuplicates:    true,
@@ -176,7 +201,7 @@ func (cp *readImportDataProcessor) ingestKvs(ctx context.Context, kvCh <-chan ro
		SSTSize:           uint64(flushSize),
	})
	if err != nil {
-		return err
+		return nil, err
	}
	defer indexAdder.Close(ctx)

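Per the comment introducing the two adders, separate buffers keep the low-volume secondary-index stream from forcing premature flushes of the high-volume primary-key stream. A toy sketch of that design choice, with two independently sized buffers flushing on their own schedules (types, names, and sizes are invented for illustration):

```go
package main

import "fmt"

// bufferedAdder stands in for a BulkAdder: it buffers KVs and flushes
// them as one batch ("SST") when its own limit is reached.
type bufferedAdder struct {
	name  string
	limit int
	buf   []string
}

func (a *bufferedAdder) add(kv string) {
	a.buf = append(a.buf, kv)
	if len(a.buf) >= a.limit {
		a.flush()
	}
}

func (a *bufferedAdder) flush() {
	if len(a.buf) == 0 {
		return
	}
	fmt.Printf("%s: flushed %d kvs as one batch\n", a.name, len(a.buf))
	a.buf = a.buf[:0]
}

func main() {
	// With a shared buffer, every index KV would count against the PK
	// stream's budget; with separate adders each flushes independently.
	pk := &bufferedAdder{name: "pkAdder", limit: 4}
	idx := &bufferedAdder{name: "indexAdder", limit: 8}

	for i := 0; i < 10; i++ {
		pk.add(fmt.Sprintf("pk-%d", i))
		if i%5 == 0 {
			idx.add(fmt.Sprintf("idx-%d", i))
		}
	}
	pk.flush()
	idx.flush()
}
```
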
@@ -188,11 +213,11 @@ func (cp *readImportDataProcessor) ingestKvs(ctx context.Context, kvCh <-chan ro
	// - idxFlushedRow contains `writtenRow` as of the last index adder flush.
	// In pkFlushedRow, idxFlushedRow and writtenFraction values are written via
	// `atomic` so the progress reporting goroutine can read them.
-	writtenRow := make([]uint64, len(cp.spec.Uri))
-	writtenFraction := make([]uint32, len(cp.spec.Uri))
+	writtenRow := make([]uint64, len(spec.Uri))
+	writtenFraction := make([]uint32, len(spec.Uri))

-	pkFlushedRow := make([]uint64, len(cp.spec.Uri))
-	idxFlushedRow := make([]uint64, len(cp.spec.Uri))
+	pkFlushedRow := make([]uint64, len(spec.Uri))
+	idxFlushedRow := make([]uint64, len(spec.Uri))

	// When the PK adder flushes, everything written has been flushed, so we set
	// pkFlushedRow to writtenRow. Additionally if the indexAdder is empty then we
@@ -214,9 +239,9 @@ func (cp *readImportDataProcessor) ingestKvs(ctx context.Context, kvCh <-chan ro
	})

	// offsets maps input file ID to a slot in our progress tracking slices.
-	offsets := make(map[int32]int, len(cp.spec.Uri))
+	offsets := make(map[int32]int, len(spec.Uri))
	var offset int
-	for i := range cp.spec.Uri {
+	for i := range spec.Uri {
offsets[i] = offset
offset++
}
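These slices are written by the ingest loop and read by the progress-reporting goroutine, hence the atomic access. Go has no atomic float32, so the completed fraction is stored as its IEEE-754 bit pattern in a uint32 — the math.Float32frombits round trip visible in the next hunk. A runnable sketch of the trick:

```go
package main

import (
	"fmt"
	"math"
	"sync/atomic"
)

func main() {
	const numFiles = 2
	writtenRow := make([]uint64, numFiles)
	writtenFraction := make([]uint32, numFiles)

	// Writer side: store the float32 as its bit pattern.
	atomic.StoreUint64(&writtenRow[0], 128)
	atomic.StoreUint32(&writtenFraction[0], math.Float32bits(0.25))

	// Reader side (the progress goroutine): decode it back.
	rows := atomic.LoadUint64(&writtenRow[0])
	frac := math.Float32frombits(atomic.LoadUint32(&writtenFraction[0]))
	fmt.Printf("file 0: %d rows written, %.0f%% done\n", rows, frac*100)
}
```
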
@@ -250,7 +275,7 @@ func (cp *readImportDataProcessor) ingestKvs(ctx context.Context, kvCh <-chan ro
				}
				prog.CompletedFraction[file] = math.Float32frombits(atomic.LoadUint32(&writtenFraction[offset]))
			}
-			cp.output.Push(nil, &execinfrapb.ProducerMetadata{BulkProcessorProgress: &prog})
+			progCh <- prog
}
}
})
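The only change in this goroutine is the destination: progress updates now go to progCh instead of being pushed to cp.output directly, which keeps ingestKvs free of processor plumbing. A standalone sketch of a ticker-driven reporter in the same shape (the interval, counters, and wiring are invented for the example):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// reportProgress snapshots shared state via read() on every tick and
// sends an update on progCh, exiting when the context is cancelled.
func reportProgress(ctx context.Context, progCh chan<- float32, read func() float32) {
	tick := time.NewTicker(10 * time.Millisecond)
	defer tick.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-tick.C:
			select {
			case progCh <- read():
			case <-ctx.Done():
				return
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Millisecond)
	defer cancel()

	progCh := make(chan float32)
	var done float32
	go reportProgress(ctx, progCh, func() float32 { done += 0.25; return done })

	for {
		select {
		case <-ctx.Done():
			return
		case p := <-progCh:
			fmt.Printf("progress: %.0f%%\n", p*100)
		}
	}
}
```
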
@@ -314,43 +339,26 @@ func (cp *readImportDataProcessor) ingestKvs(ctx context.Context, kvCh <-chan ro
	})

	if err := g.Wait(); err != nil {
-		return err
+		return nil, err
	}

	if err := pkIndexAdder.Flush(ctx); err != nil {
		if err, ok := err.(storagebase.DuplicateKeyError); ok {
-			return errors.Wrap(err, "duplicate key in primary index")
+			return nil, errors.Wrap(err, "duplicate key in primary index")
		}
-		return err
+		return nil, err
	}

	if err := indexAdder.Flush(ctx); err != nil {
		if err, ok := err.(storagebase.DuplicateKeyError); ok {
-			return errors.Wrap(err, "duplicate key in index")
+			return nil, errors.Wrap(err, "duplicate key in index")
		}
-		return err
-	}
-
-	var prog execinfrapb.RemoteProducerMetadata_BulkProcessorProgress
-	prog.CompletedRow = make(map[int32]uint64)
-	prog.CompletedFraction = make(map[int32]float32)
-	for i := range cp.spec.Uri {
-		prog.CompletedFraction[i] = 1.0
-		prog.CompletedRow[i] = math.MaxUint64
+		return nil, err
	}
-	cp.output.Push(nil, &execinfrapb.ProducerMetadata{BulkProcessorProgress: &prog})

	addedSummary := pkIndexAdder.GetSummary()
	addedSummary.Add(indexAdder.GetSummary())
-	countsBytes, err := protoutil.Marshal(&addedSummary)
-	if err != nil {
-		return err
-	}
-	cp.output.Push(sqlbase.EncDatumRow{
-		sqlbase.DatumToEncDatum(types.Bytes, tree.NewDBytes(tree.DBytes(countsBytes))),
-		sqlbase.DatumToEncDatum(types.Bytes, tree.NewDBytes(tree.DBytes([]byte{}))),
-	}, nil)
-	return nil
+	return &addedSummary, nil
}

func init() {
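With the output-pushing removed, every exit path in ingestKvs now returns (summary, error), and the DuplicateKeyError classification happens at flush time. A self-contained sketch of that classify-then-wrap pattern; the error type here is a stand-in for storagebase.DuplicateKeyError, and stdlib errors.As plus fmt.Errorf substitute for the type assertion and errors.Wrap used in the diff:

```go
package main

import (
	"errors"
	"fmt"
)

// duplicateKeyError stands in for storagebase.DuplicateKeyError.
type duplicateKeyError struct{ key string }

func (e duplicateKeyError) Error() string { return "duplicate key: " + e.key }

// flush stands in for pkIndexAdder.Flush.
func flush() error { return duplicateKeyError{key: "/Table/53/1/42"} }

// ingest mirrors the tail of ingestKvs: classify the flush error, add
// context to the interesting case, and hand it to the caller rather
// than pushing anything to an output.
func ingest() (string, error) {
	if err := flush(); err != nil {
		var dup duplicateKeyError
		if errors.As(err, &dup) {
			return "", fmt.Errorf("duplicate key in primary index: %w", err)
		}
		return "", err
	}
	return "summary", nil
}

func main() {
	if _, err := ingest(); err != nil {
		fmt.Println(err) // duplicate key in primary index: duplicate key: /Table/53/1/42
	}
}
```
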
30 changes: 27 additions & 3 deletions pkg/ccl/importccl/read_import_base.go
@@ -15,6 +15,7 @@ import (
	"context"
	"io"
	"io/ioutil"
+	"math"
	"net/url"
	"strings"

@@ -35,13 +36,14 @@ func runImport(
	ctx context.Context,
	flowCtx *execinfra.FlowCtx,
	spec *execinfrapb.ReadImportDataSpec,
+	progCh chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress,
	kvCh chan row.KVBatch,
-) error {
+) (*roachpb.BulkOpSummary, error) {
	group := ctxgroup.WithContext(ctx)

	conv, err := makeInputConverter(spec, flowCtx.NewEvalCtx(), kvCh)
	if err != nil {
-		return err
+		return nil, err
	}
	conv.start(group)

@@ -53,7 +55,29 @@
		return conv.readFiles(ctx, spec.Uri, spec.Format, flowCtx.Cfg.ExternalStorage)
	})

-	return group.Wait()
+	// Ingest the KVs that the producer emitted to the chan and the row result
+	// at the end is one row containing an encoded BulkOpSummary.
+	var summary *roachpb.BulkOpSummary
+	group.GoCtx(func(ctx context.Context) error {
+		summary, err = ingestKvs(ctx, flowCtx, spec, progCh, kvCh)
+		if err != nil {
+			return err
+		}
+		var prog execinfrapb.RemoteProducerMetadata_BulkProcessorProgress
+		prog.CompletedRow = make(map[int32]uint64)
+		prog.CompletedFraction = make(map[int32]float32)
+		for i := range spec.Uri {
+			prog.CompletedFraction[i] = 1.0
+			prog.CompletedRow[i] = math.MaxUint64
+		}
+		progCh <- prog
+		return nil
+	})
+	if err := group.Wait(); err != nil {
+		return nil, err
+	}
+
+	return summary, nil
}

type readFileFunc func(context.Context, *fileReader, int32, string, chan string) error
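After this change runImport owns the full producer/consumer lifecycle: readFiles feeds kvCh, ingestKvs drains it, a final 100% progress message is sent, and the summary is returned only after the group finishes. A runnable sketch of the same orchestration built on golang.org/x/sync/errgroup (which CockroachDB's ctxgroup wraps); all names and values are illustrative:

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// runImportSketch mirrors the new runImport: a producer goroutine emits
// KV batches, a consumer ingests them into a summary, and the caller
// sees the summary only after both goroutines have finished.
func runImportSketch(ctx context.Context) (int, error) {
	g, ctx := errgroup.WithContext(ctx)
	kvCh := make(chan int)

	// Producer: stands in for conv.readFiles.
	g.Go(func() error {
		defer close(kvCh)
		for i := 1; i <= 5; i++ {
			select {
			case kvCh <- i:
			case <-ctx.Done():
				return ctx.Err()
			}
		}
		return nil
	})

	// Consumer: stands in for ingestKvs. summary is safe to read after
	// g.Wait establishes the necessary happens-before edge.
	var summary int
	g.Go(func() error {
		for kv := range kvCh {
			summary += kv
		}
		return nil
	})

	if err := g.Wait(); err != nil {
		return 0, err
	}
	return summary, nil
}

func main() {
	sum, err := runImportSketch(context.Background())
	fmt.Println(sum, err) // 15 <nil>
}
```
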
