importccl: Direct-ingest uses two bulk adders instead of one. #39424

Merged · 1 commit · Aug 8, 2019
78 changes: 59 additions & 19 deletions pkg/ccl/importccl/read_import_proc.go
@@ -377,11 +377,18 @@ func (cp *readImportDataProcessor) doRun(ctx context.Context) error {

 	writeTS := hlc.Timestamp{WallTime: cp.spec.WalltimeNanos}
 	const bufferSize, flushSize = 64 << 20, 16 << 20
-	adder, err := cp.flowCtx.Cfg.BulkAdder(ctx, cp.flowCtx.Cfg.DB, bufferSize, flushSize, writeTS)
+
+	// We create two bulk adders so as to combat the excessive flushing of
+	// small SSTs which was observed when using a single adder for both
+	// primary and secondary index kvs. The number of secondary index kvs are
+	// small, and so we expect the indexAdder to flush much less frequently
+	// than the pkIndexAdder.
+	pkIndexAdder, err := cp.flowCtx.Cfg.BulkAdder(ctx, cp.flowCtx.Cfg.DB, bufferSize, flushSize, writeTS)
 	if err != nil {
 		return err
 	}
-	adder.SetDisallowShadowing(true)
+	pkIndexAdder.SetName("pkIndexAdder")
+	pkIndexAdder.SetDisallowShadowing(true)
 	// AddSSTable with disallowShadowing=true does not consider a KV with the
 	// same ts and value to be a collision. This is to support the resumption
 	// of IMPORT jobs which might re-import some already ingested, but not
@@ -390,16 +397,26 @@ func (cp *readImportDataProcessor) doRun(ctx context.Context) error {
 	// To provide a similar behavior with KVs within the same SST, we silently
 	// skip over duplicates with the same value, instead of throwing a
 	// uniqueness error.
-	adder.SkipLocalDuplicatesWithSameValues(true)
-	defer adder.Close(ctx)
+	pkIndexAdder.SkipLocalDuplicatesWithSameValues(true)
+	defer pkIndexAdder.Close(ctx)
+
+	indexAdder, err := cp.flowCtx.Cfg.BulkAdder(ctx, cp.flowCtx.Cfg.DB, bufferSize, flushSize, writeTS)
+	if err != nil {
+		return err
+	}
+	indexAdder.SetName("indexAdder")
+	indexAdder.SetDisallowShadowing(true)
+	indexAdder.SkipLocalDuplicatesWithSameValues(true)
+	defer indexAdder.Close(ctx)
 
 	// Drain the kvCh using the BulkAdder until it closes.
-	if err := ingestKvs(ctx, adder, kvCh); err != nil {
+	if err := ingestKvs(ctx, pkIndexAdder, indexAdder, kvCh); err != nil {
 		return err
 	}
 
-	added := adder.GetSummary()
-	countsBytes, err := protoutil.Marshal(&added)
+	addedSummary := pkIndexAdder.GetSummary()
+	addedSummary.Add(indexAdder.GetSummary())
+	countsBytes, err := protoutil.Marshal(&addedSummary)
 	if err != nil {
 		return err
 	}
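
The resumption-friendly duplicate semantics described in the comments above can be sketched in isolation. The toy `adder` below is invented for illustration (the real logic lives in the storage layer behind `SetDisallowShadowing` and `SkipLocalDuplicatesWithSameValues`), but it shows the rule both adders are configured with: re-adding an identical KV is silently skipped, while re-adding the same key with a different value is a collision.

```go
package main

import (
	"bytes"
	"fmt"
)

// kv is an illustrative key/value pair; the real code uses roachpb.KeyValue.
type kv struct{ key, value []byte }

// adder is a toy stand-in for a bulk adder configured with
// disallowShadowing=true and SkipLocalDuplicatesWithSameValues(true).
type adder struct {
	seen map[string][]byte
}

func (a *adder) add(k kv) error {
	if prev, ok := a.seen[string(k.key)]; ok {
		if bytes.Equal(prev, k.value) {
			// Same key, same value: silently skipped so a resumed IMPORT can
			// re-ingest already-written KVs without erroring.
			return nil
		}
		return fmt.Errorf("duplicate key %q with conflicting value", k.key)
	}
	a.seen[string(k.key)] = k.value
	return nil
}

func main() {
	a := &adder{seen: map[string][]byte{}}
	fmt.Println(a.add(kv{[]byte("k"), []byte("v")})) // <nil>
	fmt.Println(a.add(kv{[]byte("k"), []byte("v")})) // <nil>: same value, skipped
	fmt.Println(a.add(kv{[]byte("k"), []byte("w")})) // duplicate key error
}
```
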
@@ -521,7 +538,10 @@ func wrapRowErr(err error, file string, row int64, code, format string, args ...
 // ingestKvs drains kvs from the channel until it closes, ingesting them using
 // the BulkAdder. It handles the required buffering/sorting/etc.
 func ingestKvs(
-	ctx context.Context, adder storagebase.BulkAdder, kvCh <-chan []roachpb.KeyValue,
+	ctx context.Context,
+	pkIndexAdder storagebase.BulkAdder,
+	indexAdder storagebase.BulkAdder,
+	kvCh <-chan []roachpb.KeyValue,
 ) error {
 	// We insert splits at every index span of the table prior to the invocation
 	// of this method. Since the BulkAdder is split aware when constructing SSTs,
@@ -544,24 +564,44 @@ func ingestKvs(
 	// mentioned above, the KVs sent to the BulkAdder are no longer grouped which
 	// results in flushing a much larger number of small SSTs. This increases the
 	// number of L0 (and total) files, but with a lower memory usage.
-	//
-	// TODO(adityamaru): Once the roachtest import/experimental-direct-ingestion
-	// stabilizes, we can explore a two adder approach where we send primary
-	// indexes to one, and secondary indexes to the other. This should reduce the
-	// number of L0 (and total) files to be comparable to the previous
-	// implementation with pre-buffering.
 	for kvBatch := range kvCh {
 		for _, kv := range kvBatch {
-			if err := adder.Add(ctx, kv.Key, kv.Value.RawBytes); err != nil {
-				if _, ok := err.(storagebase.DuplicateKeyError); ok {
-					return errors.WithStack(err)
+			_, _, indexID, indexErr := sqlbase.DecodeTableIDIndexID(kv.Key)
+			if indexErr != nil {
+				return indexErr
+			}
+
+			// Decide which adder to send the KV to by extracting its index id.
+			//
+			// TODO(adityamaru): There is a potential optimization of plumbing the
+			// different putters, and differentiating based on their type. It might be
+			// more efficient than parsing every kv.
+			if indexID == 1 {
+				if err := pkIndexAdder.Add(ctx, kv.Key, kv.Value.RawBytes); err != nil {
+					if _, ok := err.(storagebase.DuplicateKeyError); ok {
+						return errors.WithStack(err)
+					}
+					return err
+				}
+			} else {
+				if err := indexAdder.Add(ctx, kv.Key, kv.Value.RawBytes); err != nil {
+					if _, ok := err.(storagebase.DuplicateKeyError); ok {
+						return errors.WithStack(err)
+					}
+					return err
 				}
-				return err
 			}
 		}
 	}
 
-	if err := adder.Flush(ctx); err != nil {
+	if err := pkIndexAdder.Flush(ctx); err != nil {
 		if err, ok := err.(storagebase.DuplicateKeyError); ok {
 			return errors.WithStack(err)
 		}
+		return err
+	}
+
+	if err := indexAdder.Flush(ctx); err != nil {
+		if err, ok := err.(storagebase.DuplicateKeyError); ok {
+			return errors.WithStack(err)
+		}
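
The core of the change is the per-KV dispatch in `ingestKvs`: KVs whose key decodes to index ID 1 (the primary index) go to `pkIndexAdder`, and all secondary-index KVs go to `indexAdder`, so each adder sees a homogeneous stream and flushes fewer, larger SSTs. A minimal sketch of that routing rule, assuming an invented `keyValue` type with the index ID already decoded (the real code extracts it from `kv.Key` via `sqlbase.DecodeTableIDIndexID`):

```go
package main

import "fmt"

// keyValue is an illustrative stand-in for roachpb.KeyValue.
type keyValue struct {
	indexID uint32 // decoded from the key in the real code
	key     string
}

// route sends primary-index KVs (index ID 1) to addPK and all secondary-index
// KVs to addIdx, mirroring the dispatch in ingestKvs.
func route(kvs []keyValue, addPK, addIdx func(keyValue) error) error {
	for _, kv := range kvs {
		target := addIdx
		if kv.indexID == 1 {
			target = addPK
		}
		if err := target(kv); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	kvs := []keyValue{
		{indexID: 1, key: "/Table/53/1/1/0"},
		{indexID: 2, key: "/Table/53/2/\"a\"/1/0"},
	}
	err := route(kvs,
		func(kv keyValue) error { fmt.Println("pkIndexAdder:", kv.key); return nil },
		func(kv keyValue) error { fmt.Println("indexAdder:", kv.key); return nil },
	)
	if err != nil {
		fmt.Println("ingest failed:", err)
	}
}
```

Since primary-index KVs typically dominate the stream, the pkIndexAdder buffer fills and flushes at a steady rate while the indexAdder flushes rarely, which is exactly the excessive small-SST flushing the commit comment calls out.
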
12 changes: 11 additions & 1 deletion pkg/storage/bulk/buffering_adder.go
@@ -40,6 +40,9 @@ type BufferingAdder struct {
 		total      int
 		bufferSize int
 	}
+
+	// name of the BufferingAdder for the purpose of logging only.
+	name string
 }
 
 // MakeBulkAdder makes a storagebase.BulkAdder that buffers and sorts K/Vs passed
@@ -72,10 +75,17 @@ func (b *BufferingAdder) SkipLocalDuplicatesWithSameValues(skip bool) {
 	b.sink.skipDuplicateKeysWithSameValue = skip
 }
 
+// SetName sets the name of the adder being used for the purpose of logging
+// stats.
+func (b *BufferingAdder) SetName(name string) {
+	b.name = name
+}
+
 // Close closes the underlying SST builder.
 func (b *BufferingAdder) Close(ctx context.Context) {
 	log.VEventf(ctx, 2,
-		"bulk adder ingested %s, flushed %d times, %d due to buffer size. Flushed %d files, %d due to ranges, %d due to sst size",
+		"bulk adder %s ingested %s, flushed %d times, %d due to buffer size. Flushed %d files, %d due to ranges, %d due to sst size",
+		b.name,
 		sz(b.sink.totalRows.DataSize),
 		b.flushCounts.total, b.flushCounts.bufferSize,
 		b.sink.flushCounts.total, b.sink.flushCounts.split, b.sink.flushCounts.sstSize,
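
To see what the `SetName` plumbing buys, here is a sketch of the changed `Close` log line with invented counter values; only the fields the diff touches are modeled, and the format string is copied from the diff:

```go
package main

import "fmt"

// bufferingAdder models only the fields used by the changed log line; the
// counter values below are fabricated for the example.
type bufferingAdder struct {
	name               string
	ingested           string
	flushes, byBufSize int
	files, byRange     int
	bySSTSize          int
}

func (b *bufferingAdder) SetName(name string) { b.name = name }

// Close logs the adder's stats, now prefixed with its name so the
// pkIndexAdder and indexAdder lines can be told apart in the logs.
func (b *bufferingAdder) Close() {
	fmt.Printf(
		"bulk adder %s ingested %s, flushed %d times, %d due to buffer size. Flushed %d files, %d due to ranges, %d due to sst size\n",
		b.name, b.ingested, b.flushes, b.byBufSize, b.files, b.byRange, b.bySSTSize)
}

func main() {
	b := &bufferingAdder{ingested: "1.0 GiB", flushes: 12, byBufSize: 10, files: 40, byRange: 25, bySSTSize: 15}
	b.SetName("indexAdder")
	b.Close()
}
```
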
2 changes: 2 additions & 0 deletions pkg/storage/storagebase/bulk_adder.go
@@ -46,6 +46,8 @@ type BulkAdder interface {
 	// SetDisallowShadowing sets the flag which controls whether shadowing of
 	// existing keys is permitted in the AddSSTable method.
 	SetDisallowShadowing(bool)
+	// SetName sets the name of the adder for the purpose of logging adder stats.
+	SetName(string)
 }
 
 // DuplicateKeyError represents a failed attempt to ingest the same key twice
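
Because `BulkAdder` is an interface, adding `SetName` obliges every implementation to provide it. A hypothetical no-op test double (not from the repository, and with the interface trimmed to a handful of methods for brevity) shows how small that burden is:

```go
package main

import (
	"context"
	"fmt"
)

// BulkAdder is trimmed for brevity; the real interface in
// pkg/storage/storagebase has additional methods such as GetSummary and
// SkipLocalDuplicatesWithSameValues.
type BulkAdder interface {
	Add(ctx context.Context, key, value []byte) error
	Flush(ctx context.Context) error
	SetDisallowShadowing(bool)
	SetName(string)
	Close(ctx context.Context)
}

// noopAdder is a hypothetical test double: it records the name and ignores
// everything else.
type noopAdder struct{ name string }

func (n *noopAdder) Add(ctx context.Context, key, value []byte) error { return nil }
func (n *noopAdder) Flush(ctx context.Context) error                  { return nil }
func (n *noopAdder) SetDisallowShadowing(bool)                        {}
func (n *noopAdder) SetName(name string)                              { n.name = name }
func (n *noopAdder) Close(ctx context.Context)                        {}

func main() {
	var a BulkAdder = &noopAdder{}
	a.SetName("testAdder")
	fmt.Println(a.(*noopAdder).name) // testAdder
}
```
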