Commit
Merge #39271
39271: importccl: Remove pre-buffering stage from direct ingest IMPORT r=dt a=adityamaru27

This change removes the pre-buffering step from the direct ingest
IMPORT code path. Previously, we would create a separate bucket for
each table's primary data and, when a bucket filled up, flush it to
the BulkAdder. Running a tpcc 1K import on 3 nodes OOM'ed as a
result of this buffering.
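
For context, the sketch below is a minimal, self-contained model of the pattern being removed (simplified stand-in types, not the actual importccl code): one bucket per TableID/IndexID prefix, flushed once it crosses a size threshold, mirroring the removed 48MB `sortBatchSize` and the `storagebase.BulkAdder` usage in the diff.

```go
package main

import "fmt"

// kv is a simplified stand-in for roachpb.KeyValue; prefix stands in for the
// TableID/IndexID prefix the removed code peeled off with encoding.PeekLength.
type kv struct {
	prefix, key string
	value       []byte
}

// prebuffer mimics the removed stage: one bucket per prefix, flushed to the
// adder (here just a callback) once the bucket exceeds flushSize bytes.
type prebuffer struct {
	flushSize int64
	buckets   map[string][]kv
	sizes     map[string]int64
	add       func(kv) error // stand-in for BulkAdder.Add
}

func (b *prebuffer) ingest(k kv) error {
	b.buckets[k.prefix] = append(b.buckets[k.prefix], k)
	b.sizes[k.prefix] += int64(len(k.key) + len(k.value))
	if b.sizes[k.prefix] > b.flushSize {
		return b.flush(k.prefix)
	}
	return nil
}

// flush hands the bucket's keys to the adder together, so keys sharing a
// TableID/IndexID prefix reach AddSSTable in one contiguous run.
func (b *prebuffer) flush(prefix string) error {
	for _, k := range b.buckets[prefix] {
		if err := b.add(k); err != nil {
			return err
		}
	}
	b.buckets[prefix] = b.buckets[prefix][:0]
	b.sizes[prefix] = 0
	return nil
}

func main() {
	b := &prebuffer{
		flushSize: 48 << 20, // the removed code's sortBatchSize
		buckets:   map[string][]kv{},
		sizes:     map[string]int64{},
		add:       func(k kv) error { fmt.Println("add", k.key); return nil },
	}
	_ = b.ingest(kv{prefix: "/Table/53/1", key: "/Table/53/1/1", value: []byte("row")})
	for p := range b.buckets { // final drain, as in the removed code
		_ = b.flush(p)
	}
}
```

Because every bucket stays resident until it crosses the threshold (or the input ends), the total buffered size across buckets is unbounded, which is what OOM'ed the 3-node tpcc 1K import.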

The two big wins we got from this pre-buffering stage were:
1. We avoided worst-case overlapping behavior in the AddSSTable
calls, because keys sharing the same TableIDIndexID prefix were
flushed together.

2. Secondary index KVs, which were few and filled their bucket
infrequently, were flushed only a few times, resulting in fewer
L0 (and total) files.

To resolve this OOM, we now take advantage of the split keys we
insert across AllIndexSpans of each table during IMPORT. Since the
BulkAdder is split-aware and does not allow SSTables to span across
splits, we already achieve the non-overlapping property we strive
for (the first win above). The downside is that we lose the second
win, as the KVs fed to the BulkAdder are no longer grouped. This
results in a larger number of smaller SSTs being flushed, causing a
spike in the L0 (and total) file count, but lower overall memory
usage.
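
The sketch below is a toy model of the property being relied on (it is not the real storage-layer code, and the key strings are made up): because the adder never lets a flushed batch span a pre-inserted split key, SSTables stay confined to a single index span regardless of how the incoming KVs are ordered.

```go
package main

import (
	"fmt"
	"sort"
)

// splitAwareBuffer is a toy model of a split-aware adder: a batch is flushed
// whenever the next key would land in a different split region, so no flushed
// batch ever spans one of the pre-inserted split keys.
type splitAwareBuffer struct {
	splits []string // sorted split keys, e.g. one per table index span
	batch  []string
}

// region returns the index of the split region that key falls into.
func (b *splitAwareBuffer) region(key string) int {
	return sort.SearchStrings(b.splits, key)
}

func (b *splitAwareBuffer) add(key string) {
	if len(b.batch) > 0 && b.region(key) != b.region(b.batch[0]) {
		b.flush()
	}
	b.batch = append(b.batch, key)
}

func (b *splitAwareBuffer) flush() {
	if len(b.batch) == 0 {
		return
	}
	fmt.Printf("flush %d keys, all within split region %d\n",
		len(b.batch), b.region(b.batch[0]))
	b.batch = b.batch[:0]
}

func main() {
	// Splits inserted before ingestion at the index span boundaries of a
	// hypothetical table 53 (primary index 1, secondary index 2).
	b := &splitAwareBuffer{splits: []string{"/Table/53/2", "/Table/53/3"}}
	for _, k := range []string{"/Table/53/1/a", "/Table/53/1/b", "/Table/53/2/x"} {
		b.add(k)
	}
	b.flush()
}
```

With a split at every index span boundary, an interleaved stream of primary and secondary index keys simply forces more, smaller flushes; it can never produce a batch that straddles two index spans.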

This change also ENABLES the `import/experimental-direct-ingestion`
roachtest.

TODO: We are currently experimenting with using two adders, one for
primary indexes and one for secondary indexes. This helps us achieve
the second win as well. A follow-up PR will come once the roachtest
stabilizes.
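
A hypothetical shape of that experiment, with simplified stand-in types (`toyAdder` is not the real `storagebase.BulkAdder`, and `isPrimaryIndexKey` is a made-up placeholder for decoding the IndexID from the key prefix):

```go
package main

import (
	"fmt"
	"strings"
)

// toyAdder stands in for storagebase.BulkAdder; giving secondary index KVs
// their own adder means their (sparser) buffer fills and flushes less often.
type toyAdder struct{ name string }

func (a *toyAdder) Add(key string, _ []byte) error {
	fmt.Printf("%s adder: %s\n", a.name, key)
	return nil
}

// isPrimaryIndexKey is a placeholder: real code would decode the IndexID from
// the key prefix rather than match a hard-coded string.
func isPrimaryIndexKey(key string) bool {
	return strings.HasPrefix(key, "/Table/53/1/")
}

func main() {
	pkAdder := &toyAdder{name: "primary"}
	idxAdder := &toyAdder{name: "secondary"}
	for _, key := range []string{"/Table/53/1/a", "/Table/53/2/x", "/Table/53/1/b"} {
		adder := pkAdder
		if !isPrimaryIndexKey(key) {
			adder = idxAdder
		}
		_ = adder.Add(key, nil)
	}
}
```

Separating the two streams would restore the second win: the secondary index adder sees far fewer KVs, so it buffers longer and flushes fewer, larger SSTs.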

Release note: None

Co-authored-by: Aditya Maru <[email protected]>
craig[bot] and adityamaru27 committed Aug 4, 2019
2 parents ea50f7c + c689774 commit 836c28d
Showing 2 changed files with 29 additions and 59 deletions.
87 changes: 29 additions & 58 deletions pkg/ccl/importccl/read_import_proc.go
@@ -34,7 +34,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
@@ -514,70 +513,42 @@ func wrapRowErr(err error, file string, row int64, code, format string, args ...
func ingestKvs(
ctx context.Context, adder storagebase.BulkAdder, kvCh <-chan []roachpb.KeyValue,
) error {
const sortBatchSize = 48 << 20 // 48MB

// TODO(dt): buffer to disk instead of all in-mem.

// Batching all kvs together leads to worst case overlap behavior in the
// resulting AddSSTable calls, leading to compactions and potentially L0
// stalls. Instead maintain a separate buffer for each table's primary data.
// This optimizes for the case when the data arriving to IMPORT is already
// sorted by primary key, leading to no overlapping AddSSTable requests. Given
// that many workloads (and actual imported data) will be sorted by primary
// key, it makes sense to try to exploit this.
// We insert splits at every index span of the table prior to the invocation
// of this method. Since the BulkAdder is split aware when constructing SSTs,
// there is no risk of worst case overlap behavior in the resulting AddSSTable
// calls.
//
// TODO(dan): This was merged because it stabilized direct ingest IMPORT, but
// we may be able to do something simpler (such as chunking along index
// boundaries in flush) or more general (such as chunking based on the common
// prefix of the last N kvs).
kvsByTableIDIndexID := make(map[string]roachpb.KeyValueByKey)
sizeByTableIDIndexID := make(map[string]int64)

flush := func(ctx context.Context, buf roachpb.KeyValueByKey) error {
if len(buf) == 0 {
return nil
}
for i := range buf {
if err := adder.Add(ctx, buf[i].Key, buf[i].Value.RawBytes); err != nil {
// NB: We are getting rid of the pre-buffering stage which constructed
// separate buckets for each table's primary data, and flushed to the
// BulkAdder when the bucket was full. This is because a tpcc 1k IMPORT would
// OOM when maintaining this buffer. Two big wins we got from this
// pre-buffering stage were:
//
// 1. We avoided worst case overlapping behavior in the AddSSTable calls as a
// result of flushing keys with the same TableIDIndexID prefix, together.
//
// 2. Secondary index KVs which were few and filled the bucket infrequently
// were flushed rarely, resulting in fewer L0 (and total) files.
//
// While we continue to achieve the first property as a result of the splits
// mentioned above, the KVs sent to the BulkAdder are no longer grouped which
// results in flushing a much larger number of small SSTs. This increases the
// number of L0 (and total) files, but with a lower memory usage.
//
// TODO(adityamaru): Once the roachtest import/experimental-direct-ingestion
// stabilizes, we can explore a two adder approach where we send primary
// indexes to one, and secondary indexes to the other. This should reduce the
// number of L0 (and total) files to be comparable to the previous
// implementation with pre-buffering.
for kvBatch := range kvCh {
for _, kv := range kvBatch {
if err := adder.Add(ctx, kv.Key, kv.Value.RawBytes); err != nil {
if _, ok := err.(storagebase.DuplicateKeyError); ok {
return errors.WithStack(err)
}
return err
}
}
return nil
}

for kvBatch := range kvCh {
for _, kv := range kvBatch {
tableLen, err := encoding.PeekLength(kv.Key)
if err != nil {
return err
}
indexLen, err := encoding.PeekLength(kv.Key[tableLen:])
if err != nil {
return err
}
bufKey := kv.Key[:tableLen+indexLen]
kvsByTableIDIndexID[string(bufKey)] = append(kvsByTableIDIndexID[string(bufKey)], kv)
sizeByTableIDIndexID[string(bufKey)] += int64(len(kv.Key) + len(kv.Value.RawBytes))

// TODO(dan): Prevent unbounded memory usage by flushing the largest
// buffer when the total size of all buffers exceeds some threshold.
if s := sizeByTableIDIndexID[string(bufKey)]; s > sortBatchSize {
buf := kvsByTableIDIndexID[string(bufKey)]
if err := flush(ctx, buf); err != nil {
return err
}
kvsByTableIDIndexID[string(bufKey)] = buf[:0]
sizeByTableIDIndexID[string(bufKey)] = 0
}
}
}
for _, buf := range kvsByTableIDIndexID {
if err := flush(ctx, buf); err != nil {
return err
}
}

if err := adder.Flush(ctx); err != nil {
1 change: 0 additions & 1 deletion pkg/cmd/roachtest/import.go
@@ -97,7 +97,6 @@ func registerImportTPCC(r *testRegistry) {
})
r.Add(testSpec{
Name: `import/experimental-direct-ingestion`,
Skip: `bricks cluster`,
MinVersion: `v19.1.0`,
Cluster: makeClusterSpec(3, cpu(16)),
Timeout: 2 * time.Hour,
