From c6897748a782339c65372b860cd00cce96cee788 Mon Sep 17 00:00:00 2001
From: Aditya Maru
Date: Fri, 2 Aug 2019 14:03:36 -0400
Subject: [PATCH] importccl: Remove pre-buffering stage from direct ingest IMPORT

This change removes the pre-buffering step in the direct ingest IMPORT
code path. Previously, we would create separate buckets for each table's
primary data, and when the bucket was full we would flush it to the
BulkAdder. Running an import of tpcc 1K on 3 nodes OOM'ed as a result of
this buffer.

Two big wins we got from this pre-buffering stage were:

1. We avoided worst case overlapping behavior in the AddSSTable calls as
   a result of flushing keys with the same TableIDIndexID prefix together.

2. Secondary index KVs, which were few and filled the bucket infrequently,
   were flushed only a few times, resulting in fewer L0 (and total) files.

In order to resolve this OOM, we decided to take advantage of the split
keys we insert across AllIndexSpans of each table during IMPORT. Since
the BulkAdder is split aware and does not allow SSTables to span across
splits, we already achieve the non-overlapping property we strive for (as
mentioned above). The downside is we lose the second win, as the KVs fed
to the BulkAdder are now ungrouped. This results in a larger number of
smaller SSTs being flushed, causing a spike in L0 and the total number of
files, but overall lower memory usage.

Some statistics for IMPORT tpcc 1k on a 1 node setup with and without
pre-buffering:

`without pre-buffering:`
`Time`: 1h19m
`Peak mem`: 6.2 GiB
`Cumulative Compaction (GB)`: 52.43 GB

`with pre-buffering:`
`Time`: 1h13m
`Peak mem`: 12.2 GiB
`Cumulative Compaction (GB)`: 24.54 GiB

This change also ENABLES the `import/experimental-direct-ingestion`
roachtest.

TODO: Currently experimenting with using two adders, one for primary
indexes and one for secondary indexes. This helps us achieve the second
win as well. Will have a follow-up PR once the roachtest stabilizes (see
the sketch after the diff below).

Release note: None
---
 pkg/ccl/importccl/read_import_proc.go | 87 +++++++++------------------
 pkg/cmd/roachtest/import.go           |  1 -
 2 files changed, 29 insertions(+), 59 deletions(-)

diff --git a/pkg/ccl/importccl/read_import_proc.go b/pkg/ccl/importccl/read_import_proc.go
index b63036d77b10..b8d1bb59de96 100644
--- a/pkg/ccl/importccl/read_import_proc.go
+++ b/pkg/ccl/importccl/read_import_proc.go
@@ -34,7 +34,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/types"
 	"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
 	"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
-	"github.com/cockroachdb/cockroach/pkg/util/encoding"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
@@ -514,70 +513,42 @@ func wrapRowErr(err error, file string, row int64, code, format string, args ...
 func ingestKvs(
 	ctx context.Context, adder storagebase.BulkAdder, kvCh <-chan []roachpb.KeyValue,
 ) error {
-	const sortBatchSize = 48 << 20 // 48MB
-
-	// TODO(dt): buffer to disk instead of all in-mem.
-
-	// Batching all kvs together leads to worst case overlap behavior in the
-	// resulting AddSSTable calls, leading to compactions and potentially L0
-	// stalls. Instead maintain a separate buffer for each table's primary data.
-	// This optimizes for the case when the data arriving to IMPORT is already
-	// sorted by primary key, leading to no overlapping AddSSTable requests. Given
-	// that many workloads (and actual imported data) will be sorted by primary
-	// key, it makes sense to try to exploit this.
+	// We insert splits at every index span of the table prior to the invocation
+	// of this method. Since the BulkAdder is split aware when constructing SSTs,
+	// there is no risk of worst case overlap behavior in the resulting AddSSTable
+	// calls.
 	//
-	// TODO(dan): This was merged because it stabilized direct ingest IMPORT, but
-	// we may be able to do something simpler (such as chunking along index
-	// boundaries in flush) or more general (such as chunking based on the common
-	// prefix of the last N kvs).
-	kvsByTableIDIndexID := make(map[string]roachpb.KeyValueByKey)
-	sizeByTableIDIndexID := make(map[string]int64)
-
-	flush := func(ctx context.Context, buf roachpb.KeyValueByKey) error {
-		if len(buf) == 0 {
-			return nil
-		}
-		for i := range buf {
-			if err := adder.Add(ctx, buf[i].Key, buf[i].Value.RawBytes); err != nil {
+	// NB: We are getting rid of the pre-buffering stage which constructed
+	// separate buckets for each table's primary data, and flushed to the
+	// BulkAdder when the bucket was full. This is because a tpcc 1k IMPORT would
+	// OOM when maintaining this buffer. Two big wins we got from this
+	// pre-buffering stage were:
+	//
+	// 1. We avoided worst case overlapping behavior in the AddSSTable calls as a
+	// result of flushing keys with the same TableIDIndexID prefix together.
+	//
+	// 2. Secondary index KVs, which were few and filled the bucket infrequently,
+	// were flushed rarely, resulting in fewer L0 (and total) files.
+	//
+	// While we continue to achieve the first property as a result of the splits
+	// mentioned above, the KVs sent to the BulkAdder are no longer grouped, which
+	// results in flushing a much larger number of small SSTs. This increases the
+	// number of L0 (and total) files, but with lower memory usage.
+	//
+	// TODO(adityamaru): Once the roachtest import/experimental-direct-ingestion
+	// stabilizes, we can explore a two adder approach where we send primary
+	// indexes to one, and secondary indexes to the other. This should reduce the
+	// number of L0 (and total) files to be comparable to the previous
+	// implementation with pre-buffering.
+	for kvBatch := range kvCh {
+		for _, kv := range kvBatch {
+			if err := adder.Add(ctx, kv.Key, kv.Value.RawBytes); err != nil {
 				if _, ok := err.(storagebase.DuplicateKeyError); ok {
 					return errors.WithStack(err)
 				}
 				return err
 			}
 		}
-		return nil
-	}
-
-	for kvBatch := range kvCh {
-		for _, kv := range kvBatch {
-			tableLen, err := encoding.PeekLength(kv.Key)
-			if err != nil {
-				return err
-			}
-			indexLen, err := encoding.PeekLength(kv.Key[tableLen:])
-			if err != nil {
-				return err
-			}
-			bufKey := kv.Key[:tableLen+indexLen]
-			kvsByTableIDIndexID[string(bufKey)] = append(kvsByTableIDIndexID[string(bufKey)], kv)
-			sizeByTableIDIndexID[string(bufKey)] += int64(len(kv.Key) + len(kv.Value.RawBytes))
-
-			// TODO(dan): Prevent unbounded memory usage by flushing the largest
-			// buffer when the total size of all buffers exceeds some threshold.
-			if s := sizeByTableIDIndexID[string(bufKey)]; s > sortBatchSize {
-				buf := kvsByTableIDIndexID[string(bufKey)]
-				if err := flush(ctx, buf); err != nil {
-					return err
-				}
-				kvsByTableIDIndexID[string(bufKey)] = buf[:0]
-				sizeByTableIDIndexID[string(bufKey)] = 0
-			}
-		}
-	}
-	for _, buf := range kvsByTableIDIndexID {
-		if err := flush(ctx, buf); err != nil {
-			return err
-		}
 	}
 
 	if err := adder.Flush(ctx); err != nil {
diff --git a/pkg/cmd/roachtest/import.go b/pkg/cmd/roachtest/import.go
index 1d8609df970a..0706d80dae9c 100644
--- a/pkg/cmd/roachtest/import.go
+++ b/pkg/cmd/roachtest/import.go
@@ -97,7 +97,6 @@ func registerImportTPCC(r *testRegistry) {
 	})
 	r.Add(testSpec{
 		Name:       `import/experimental-direct-ingestion`,
-		Skip:       `bricks cluster`,
 		MinVersion: `v19.1.0`,
 		Cluster:    makeClusterSpec(3, cpu(16)),
 		Timeout:    2 * time.Hour,
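
The following is a hedged sketch, not part of the commit above, of the two-adder
approach floated in the TODO(adityamaru) comment: primary index KVs go to one
BulkAdder and secondary index KVs to another, so the infrequent secondary index
data is buffered and flushed on its own. The helper name ingestKvsTwoAdders, the
routing on a decoded index ID, and the assumption that the primary index has
index ID 1 are illustrative only; the errors import is whichever errors package
read_import_proc.go already uses for errors.WithStack, and the richer error
handling after Flush in the real code is elided.

package importccl

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
	"github.com/cockroachdb/cockroach/pkg/util/encoding"
	"github.com/pkg/errors"
)

// ingestKvsTwoAdders is a hypothetical variant of ingestKvs that routes each
// KV to one of two BulkAdders based on the index it belongs to.
func ingestKvsTwoAdders(
	ctx context.Context, pkAdder, indexAdder storagebase.BulkAdder, kvCh <-chan []roachpb.KeyValue,
) error {
	for kvBatch := range kvCh {
		for _, kv := range kvBatch {
			// Keys are prefixed with /TableID/IndexID/...; peek past the table
			// ID and decode the index ID to decide which adder receives the KV.
			tableLen, err := encoding.PeekLength(kv.Key)
			if err != nil {
				return err
			}
			_, indexID, err := encoding.DecodeUvarintAscending(kv.Key[tableLen:])
			if err != nil {
				return err
			}
			adder := indexAdder
			if indexID == 1 {
				// Assumption: index ID 1 is the table's primary index.
				adder = pkAdder
			}
			if err := adder.Add(ctx, kv.Key, kv.Value.RawBytes); err != nil {
				if _, ok := err.(storagebase.DuplicateKeyError); ok {
					return errors.WithStack(err)
				}
				return err
			}
		}
	}
	// Flush both adders once the channel is drained.
	if err := pkAdder.Flush(ctx); err != nil {
		return err
	}
	return indexAdder.Flush(ctx)
}

Keeping secondary index KVs in their own adder would restore the second win the
commit message describes (fewer, larger secondary-index SSTs) without
reintroducing the unbounded in-memory buckets that caused the OOM.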