Skip to content

Commit

Permalink
importccl: Direct-ingest uses two bulk adders instead of one.
Browse files Browse the repository at this point in the history
This is another change to stabilize direct ingest import before
it is made the default.
As a consequence of cockroachdb#39271, the number of files (L0 and total),
along with the cumulative compaction size increased drastically.
A consequence of no longer creating buckets of TableIDIndexID
before flushing is that the single bulk adder would receive a
mix of primary and secondary index entries. Since SSTs cannot
span across the splits we inserted between index spans, it would
create numerous, small secondary index SSTs along with the
bigger primary index SSTs, and flush on reaching its limit
(which would be often).

By introducing two adders, one for ingesting primary index data,
and the other for ingesting secondary index data we regain the
ability to make fewer, bigger secondary index SSTs and flush less
often. The peak mem is lower than what prebuffering used to
hit, while the number of files (L0 and total), and the cumulative
compaction size return to prebuffering levels.

Some stats below for a tpcc 1k, on a 1 node cluster.

With prebuffering:
Total Files : 7670
L0 Files : 1848
Cumulative Compaction (GB): 24.54GiB

Without prebuffering, one adder:
Total Files : 22420
L0 Files : 16900
Cumulative Compaction (GB): 52.43 GiB

Without prebuffering, two adders:
Total Files : 6805
L0 Files : 1078
Cumulative Compaction (GB): 18.89GiB

Release note: None
  • Loading branch information
adityamaru27 committed Aug 7, 2019
1 parent 11a6a13 commit 594cc63
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 18 deletions.
78 changes: 61 additions & 17 deletions pkg/ccl/importccl/read_import_proc.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,19 +377,40 @@ func (cp *readImportDataProcessor) doRun(ctx context.Context) error {

writeTS := hlc.Timestamp{WallTime: cp.spec.WalltimeNanos}
const bufferSize, flushSize = 64 << 20, 16 << 20
adder, err := cp.flowCtx.Cfg.BulkAdder(ctx, cp.flowCtx.Cfg.DB, bufferSize, flushSize, writeTS)

// We create two bulk adders so as to combat the excessive flushing of
// small SSTs which was observed when using a single adder for both
// primary and secondary index kvs. The number of secondary index kvs are
// small, and so we expect the indexAdder to flush much less frequently
// than the pkIndexAdder.
pkIndexAdder, err := cp.flowCtx.Cfg.BulkAdder(ctx, cp.flowCtx.Cfg.DB, bufferSize, flushSize, writeTS)
if err != nil {
return err
}
pkIndexAdder.SetName("pkIndexAdder")
defer pkIndexAdder.Close(ctx)

indexAdder, err := cp.flowCtx.Cfg.BulkAdder(ctx, cp.flowCtx.Cfg.DB, bufferSize, flushSize, writeTS)
if err != nil {
return err
}
defer adder.Close(ctx)
indexAdder.SetName("indexAdder")
defer indexAdder.Close(ctx)

// Drain the kvCh using the BulkAdder until it closes.
if err := ingestKvs(ctx, adder, kvCh); err != nil {
if err := ingestKvs(ctx, pkIndexAdder, indexAdder, kvCh); err != nil {
return err
}

added := adder.GetSummary()
countsBytes, err := protoutil.Marshal(&added)
pkAdded := pkIndexAdder.GetSummary()
indexAdded := indexAdder.GetSummary()
cumulativeAdded := &roachpb.BulkOpSummary{
DataSize: pkAdded.DataSize + indexAdded.DataSize,
Rows: pkAdded.Rows + indexAdded.Rows,
IndexEntries: pkAdded.IndexEntries + indexAdded.IndexEntries,
SystemRecords: pkAdded.SystemRecords + indexAdded.SystemRecords}

countsBytes, err := protoutil.Marshal(cumulativeAdded)
if err != nil {
return err
}
Expand Down Expand Up @@ -511,7 +532,10 @@ func wrapRowErr(err error, file string, row int64, code, format string, args ...
// ingestKvs drains kvs from the channel until it closes, ingesting them using
// the BulkAdder. It handles the required buffering/sorting/etc.
func ingestKvs(
ctx context.Context, adder storagebase.BulkAdder, kvCh <-chan []roachpb.KeyValue,
ctx context.Context,
pkIndexAdder storagebase.BulkAdder,
indexAdder storagebase.BulkAdder,
kvCh <-chan []roachpb.KeyValue,
) error {
// We insert splits at every index span of the table prior to the invocation
// of this method. Since the BulkAdder is split aware when constructing SSTs,
Expand All @@ -534,24 +558,44 @@ func ingestKvs(
// mentioned above, the KVs sent to the BulkAdder are no longer grouped which
// results in flushing a much larger number of small SSTs. This increases the
// number of L0 (and total) files, but with a lower memory usage.
//
// TODO(adityamaru): Once the roachtest import/experimental-direct-ingestion
// stabilizes, we can explore a two adder approach where we send primary
// indexes to one, and secondary indexes to the other. This should reduce the
// number of L0 (and total) files to be comparable to the previous
// implementation with pre-buffering.
for kvBatch := range kvCh {
for _, kv := range kvBatch {
if err := adder.Add(ctx, kv.Key, kv.Value.RawBytes); err != nil {
if _, ok := err.(storagebase.DuplicateKeyError); ok {
return errors.WithStack(err)
_, _, indexID, indexErr := sqlbase.DecodeTableIDIndexID(kv.Key)
if indexErr != nil {
return indexErr
}

// Decide which adder to send the KV to by extracting its index id.
//
// TODO(adityamaru): There is a potential optimization of plumbing the
// different putters, and differentiating based on their type. It might be
// more efficient than parsing every kv.
if indexID == 1 {
if err := pkIndexAdder.Add(ctx, kv.Key, kv.Value.RawBytes); err != nil {
if _, ok := err.(storagebase.DuplicateKeyError); ok {
return errors.WithStack(err)
}
return err
}
} else {
if err := indexAdder.Add(ctx, kv.Key, kv.Value.RawBytes); err != nil {
if _, ok := err.(storagebase.DuplicateKeyError); ok {
return errors.WithStack(err)
}
return err
}
return err
}
}
}

if err := adder.Flush(ctx); err != nil {
if err := pkIndexAdder.Flush(ctx); err != nil {
if err, ok := err.(storagebase.DuplicateKeyError); ok {
return errors.WithStack(err)
}
return err
}

if err := indexAdder.Flush(ctx); err != nil {
if err, ok := err.(storagebase.DuplicateKeyError); ok {
return errors.WithStack(err)
}
Expand Down
12 changes: 11 additions & 1 deletion pkg/storage/bulk/buffering_adder.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ type BufferingAdder struct {
total int
bufferSize int
}

// name of the BufferingAdder for the purpose of logging only.
name string
}

// MakeBulkAdder makes a storagebase.BulkAdder that buffers and sorts K/Vs passed
Expand All @@ -66,10 +69,17 @@ func (b *BufferingAdder) SkipLocalDuplicates(skip bool) {
b.sink.skipDuplicates = skip
}

// SetName sets the name of the adder being used for the purpose of logging
// stats.
func (b *BufferingAdder) SetName(name string) {
b.name = name
}

// Close closes the underlying SST builder.
func (b *BufferingAdder) Close(ctx context.Context) {
log.VEventf(ctx, 2,
"bulk adder ingested %s, flushed %d times, %d due to buffer size. Flushed %d files, %d due to ranges, %d due to sst size",
"bulk adder %s ingested %s, flushed %d times, %d due to buffer size. Flushed %d files, %d due to ranges, %d due to sst size",
b.name,
sz(b.sink.totalRows.DataSize),
b.flushCounts.total, b.flushCounts.bufferSize,
b.sink.flushCounts.total, b.sink.flushCounts.split, b.sink.flushCounts.sstSize,
Expand Down
2 changes: 2 additions & 0 deletions pkg/storage/storagebase/bulk_adder.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ type BulkAdder interface {
// SetDisallowShadowing sets the flag which controls whether shadowing of
// existing keys is permitted in the AddSSTable method.
SetDisallowShadowing(bool)
// SetName sets the name of the adder for the purpose of logging adder stats.
SetName(string)
}

// DuplicateKeyError represents a failed attempt to ingest the same key twice
Expand Down

0 comments on commit 594cc63

Please sign in to comment.