From e0d214f8527aa3a97424af7a2dadc604fa06ea65 Mon Sep 17 00:00:00 2001 From: Aditya Maru Date: Wed, 7 Aug 2019 16:07:55 -0400 Subject: [PATCH 1/2] importccl: Direct-ingest uses two bulk adders instead of one. This is another change to stabilize direct ingest import before it is made the default. As a consequence of #39271, the number of files (L0 and total), along with the cumulative compaction size increased drastically. A consequence of no longer creating buckets of TableIDIndexID before flushing is that the single bulk adder would receive a mix of primary and secondary index entries. Since SSTs cannot span across the splits we inserted between index spans, it would create numerous, small secondary index SSTs along with the bigger primary index SSTs, and flush on reaching its limit (which would be often). By introducing two adders, one for ingesting primary index data, and the other for ingesting secondary index data we regain the ability to make fewer, bigger secondary index SSTs and flush less often. The peak mem is lower than what prebuffering used to hit, while the number of files (L0 and total), and the cumulative compaction size return to prebuffering levels. Some stats below for a tpcc 1k, on a 1 node cluster. With prebuffering: Total Files : 7670 L0 Files : 1848 Cumulative Compaction (GB): 24.54GiB Without prebuffering, one adder: Total Files : 22420 L0 Files : 16900 Cumulative Compaction (GB): 52.43 GiB Without prebuffering, two adders: Total Files : 6805 L0 Files : 1078 Cumulative Compaction (GB): 18.89GiB Release note: None --- pkg/ccl/importccl/read_import_proc.go | 78 ++++++++++++++++++++------- pkg/storage/bulk/buffering_adder.go | 12 ++++- pkg/storage/storagebase/bulk_adder.go | 2 + 3 files changed, 72 insertions(+), 20 deletions(-) diff --git a/pkg/ccl/importccl/read_import_proc.go b/pkg/ccl/importccl/read_import_proc.go index 137c5ca02027..a763d413c3a6 100644 --- a/pkg/ccl/importccl/read_import_proc.go +++ b/pkg/ccl/importccl/read_import_proc.go @@ -377,11 +377,18 @@ func (cp *readImportDataProcessor) doRun(ctx context.Context) error { writeTS := hlc.Timestamp{WallTime: cp.spec.WalltimeNanos} const bufferSize, flushSize = 64 << 20, 16 << 20 - adder, err := cp.flowCtx.Cfg.BulkAdder(ctx, cp.flowCtx.Cfg.DB, bufferSize, flushSize, writeTS) + + // We create two bulk adders so as to combat the excessive flushing of + // small SSTs which was observed when using a single adder for both + // primary and secondary index kvs. The number of secondary index kvs are + // small, and so we expect the indexAdder to flush much less frequently + // than the pkIndexAdder. + pkIndexAdder, err := cp.flowCtx.Cfg.BulkAdder(ctx, cp.flowCtx.Cfg.DB, bufferSize, flushSize, writeTS) if err != nil { return err } - adder.SetDisallowShadowing(true) + pkIndexAdder.SetName("pkIndexAdder") + pkIndexAdder.SetDisallowShadowing(true) // AddSSTable with disallowShadowing=true does not consider a KV with the // same ts and value to be a collision. This is to support the resumption // of IMPORT jobs which might re-import some already ingested, but not @@ -390,16 +397,26 @@ func (cp *readImportDataProcessor) doRun(ctx context.Context) error { // To provide a similar behavior with KVs within the same SST, we silently // skip over duplicates with the same value, instead of throwing a // uniqueness error. - adder.SkipLocalDuplicatesWithSameValues(true) - defer adder.Close(ctx) + pkIndexAdder.SkipLocalDuplicatesWithSameValues(true) + defer pkIndexAdder.Close(ctx) + + indexAdder, err := cp.flowCtx.Cfg.BulkAdder(ctx, cp.flowCtx.Cfg.DB, bufferSize, flushSize, writeTS) + if err != nil { + return err + } + indexAdder.SetName("indexAdder") + indexAdder.SetDisallowShadowing(true) + indexAdder.SkipLocalDuplicatesWithSameValues(true) + defer indexAdder.Close(ctx) // Drain the kvCh using the BulkAdder until it closes. - if err := ingestKvs(ctx, adder, kvCh); err != nil { + if err := ingestKvs(ctx, pkIndexAdder, indexAdder, kvCh); err != nil { return err } - added := adder.GetSummary() - countsBytes, err := protoutil.Marshal(&added) + addedSummary := pkIndexAdder.GetSummary() + addedSummary.Add(indexAdder.GetSummary()) + countsBytes, err := protoutil.Marshal(&addedSummary) if err != nil { return err } @@ -521,7 +538,10 @@ func wrapRowErr(err error, file string, row int64, code, format string, args ... // ingestKvs drains kvs from the channel until it closes, ingesting them using // the BulkAdder. It handles the required buffering/sorting/etc. func ingestKvs( - ctx context.Context, adder storagebase.BulkAdder, kvCh <-chan []roachpb.KeyValue, + ctx context.Context, + pkIndexAdder storagebase.BulkAdder, + indexAdder storagebase.BulkAdder, + kvCh <-chan []roachpb.KeyValue, ) error { // We insert splits at every index span of the table prior to the invocation // of this method. Since the BulkAdder is split aware when constructing SSTs, @@ -544,24 +564,44 @@ func ingestKvs( // mentioned above, the KVs sent to the BulkAdder are no longer grouped which // results in flushing a much larger number of small SSTs. This increases the // number of L0 (and total) files, but with a lower memory usage. - // - // TODO(adityamaru): Once the roachtest import/experimental-direct-ingestion - // stabilizes, we can explore a two adder approach where we send primary - // indexes to one, and secondary indexes to the other. This should reduce the - // number of L0 (and total) files to be comparable to the previous - // implementation with pre-buffering. for kvBatch := range kvCh { for _, kv := range kvBatch { - if err := adder.Add(ctx, kv.Key, kv.Value.RawBytes); err != nil { - if _, ok := err.(storagebase.DuplicateKeyError); ok { - return errors.WithStack(err) + _, _, indexID, indexErr := sqlbase.DecodeTableIDIndexID(kv.Key) + if indexErr != nil { + return indexErr + } + + // Decide which adder to send the KV to by extracting its index id. + // + // TODO(adityamaru): There is a potential optimization of plumbing the + // different putters, and differentiating based on their type. It might be + // more efficient than parsing every kv. + if indexID == 1 { + if err := pkIndexAdder.Add(ctx, kv.Key, kv.Value.RawBytes); err != nil { + if _, ok := err.(storagebase.DuplicateKeyError); ok { + return errors.WithStack(err) + } + return err + } + } else { + if err := indexAdder.Add(ctx, kv.Key, kv.Value.RawBytes); err != nil { + if _, ok := err.(storagebase.DuplicateKeyError); ok { + return errors.WithStack(err) + } + return err } - return err } } } - if err := adder.Flush(ctx); err != nil { + if err := pkIndexAdder.Flush(ctx); err != nil { + if err, ok := err.(storagebase.DuplicateKeyError); ok { + return errors.WithStack(err) + } + return err + } + + if err := indexAdder.Flush(ctx); err != nil { if err, ok := err.(storagebase.DuplicateKeyError); ok { return errors.WithStack(err) } diff --git a/pkg/storage/bulk/buffering_adder.go b/pkg/storage/bulk/buffering_adder.go index 65cfc04033c9..96db372d701c 100644 --- a/pkg/storage/bulk/buffering_adder.go +++ b/pkg/storage/bulk/buffering_adder.go @@ -40,6 +40,9 @@ type BufferingAdder struct { total int bufferSize int } + + // name of the BufferingAdder for the purpose of logging only. + name string } // MakeBulkAdder makes a storagebase.BulkAdder that buffers and sorts K/Vs passed @@ -72,10 +75,17 @@ func (b *BufferingAdder) SkipLocalDuplicatesWithSameValues(skip bool) { b.sink.skipDuplicateKeysWithSameValue = skip } +// SetName sets the name of the adder being used for the purpose of logging +// stats. +func (b *BufferingAdder) SetName(name string) { + b.name = name +} + // Close closes the underlying SST builder. func (b *BufferingAdder) Close(ctx context.Context) { log.VEventf(ctx, 2, - "bulk adder ingested %s, flushed %d times, %d due to buffer size. Flushed %d files, %d due to ranges, %d due to sst size", + "bulk adder %s ingested %s, flushed %d times, %d due to buffer size. Flushed %d files, %d due to ranges, %d due to sst size", + b.name, sz(b.sink.totalRows.DataSize), b.flushCounts.total, b.flushCounts.bufferSize, b.sink.flushCounts.total, b.sink.flushCounts.split, b.sink.flushCounts.sstSize, diff --git a/pkg/storage/storagebase/bulk_adder.go b/pkg/storage/storagebase/bulk_adder.go index 2ce563063a4f..0646a51a9f06 100644 --- a/pkg/storage/storagebase/bulk_adder.go +++ b/pkg/storage/storagebase/bulk_adder.go @@ -46,6 +46,8 @@ type BulkAdder interface { // SetDisallowShadowing sets the flag which controls whether shadowing of // existing keys is permitted in the AddSSTable method. SetDisallowShadowing(bool) + // SetName sets the name of the adder for the purpose of logging adder stats. + SetName(string) } // DuplicateKeyError represents a failed attempt to ingest the same key twice From 0c524f1deae878aad68388fa0f3287d503a003cf Mon Sep 17 00:00:00 2001 From: Rebecca Taft Date: Wed, 7 Aug 2019 20:33:02 +0200 Subject: [PATCH 2/2] stats: truncate large datums when sampling for histogram This commit adds logic to truncate long bit arrays, byte strings, strings, and collated strings during sampling for histogram creation. We do this to avoid using excessive memory or disk space during sampling and storage of the final histogram. Release note: None --- pkg/sql/distsqlrun/sample_aggregator.go | 2 +- pkg/sql/distsqlrun/sampler.go | 2 +- pkg/sql/stats/row_sampling.go | 79 +++++++++++++++++++++++-- pkg/sql/stats/row_sampling_test.go | 44 +++++++++++++- 4 files changed, 117 insertions(+), 10 deletions(-) diff --git a/pkg/sql/distsqlrun/sample_aggregator.go b/pkg/sql/distsqlrun/sample_aggregator.go index bcc53de6bc23..fe0f685feae2 100644 --- a/pkg/sql/distsqlrun/sample_aggregator.go +++ b/pkg/sql/distsqlrun/sample_aggregator.go @@ -229,7 +229,7 @@ func (s *sampleAggregator) mainLoop(ctx context.Context) (earlyExit bool, err er return false, errors.NewAssertionErrorWithWrappedErrf(err, "decoding rank column") } // Retain the rows with the top ranks. - if err := s.sr.SampleRow(ctx, row[:s.rankCol], uint64(rank)); err != nil { + if err := s.sr.SampleRow(ctx, s.evalCtx, row[:s.rankCol], uint64(rank)); err != nil { return false, err } continue diff --git a/pkg/sql/distsqlrun/sampler.go b/pkg/sql/distsqlrun/sampler.go index 5dfe9636aa71..c7b725391773 100644 --- a/pkg/sql/distsqlrun/sampler.go +++ b/pkg/sql/distsqlrun/sampler.go @@ -310,7 +310,7 @@ func (s *samplerProcessor) mainLoop(ctx context.Context) (earlyExit bool, err er // Use Int63 so we don't have headaches converting to DInt. rank := uint64(rng.Int63()) - if err := s.sr.SampleRow(ctx, row, rank); err != nil { + if err := s.sr.SampleRow(ctx, s.evalCtx, row, rank); err != nil { return false, err } } diff --git a/pkg/sql/stats/row_sampling.go b/pkg/sql/stats/row_sampling.go index e47926db621a..986ad2ca42df 100644 --- a/pkg/sql/stats/row_sampling.go +++ b/pkg/sql/stats/row_sampling.go @@ -14,6 +14,7 @@ import ( "container/heap" "context" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/mon" @@ -81,7 +82,7 @@ func (sr *SampleReservoir) Pop() interface{} { panic("unimplemented") } // SampleRow looks at a row and either drops it or adds it to the reservoir. func (sr *SampleReservoir) SampleRow( - ctx context.Context, row sqlbase.EncDatumRow, rank uint64, + ctx context.Context, evalCtx *tree.EvalContext, row sqlbase.EncDatumRow, rank uint64, ) error { if len(sr.samples) < cap(sr.samples) { // We haven't accumulated enough rows yet, just append. @@ -94,7 +95,7 @@ func (sr *SampleReservoir) SampleRow( return err } } - if err := sr.copyRow(ctx, rowCopy, row); err != nil { + if err := sr.copyRow(ctx, evalCtx, rowCopy, row); err != nil { return err } sr.samples = append(sr.samples, SampledRow{Row: rowCopy, Rank: rank}) @@ -106,7 +107,7 @@ func (sr *SampleReservoir) SampleRow( } // Replace the max rank if ours is smaller. if len(sr.samples) > 0 && rank < sr.samples[0].Rank { - if err := sr.copyRow(ctx, sr.samples[0].Row, row); err != nil { + if err := sr.copyRow(ctx, evalCtx, sr.samples[0].Row, row); err != nil { return err } sr.samples[0].Rank = rank @@ -120,7 +121,9 @@ func (sr *SampleReservoir) Get() []SampledRow { return sr.samples } -func (sr *SampleReservoir) copyRow(ctx context.Context, dst, src sqlbase.EncDatumRow) error { +func (sr *SampleReservoir) copyRow( + ctx context.Context, evalCtx *tree.EvalContext, dst, src sqlbase.EncDatumRow, +) error { for i := range src { // Copy only the decoded datum to ensure that we remove any reference to // the encoded bytes. The encoded bytes would have been scanned in a batch @@ -131,8 +134,14 @@ func (sr *SampleReservoir) copyRow(ctx context.Context, dst, src sqlbase.EncDatu } beforeSize := dst[i].Size() dst[i] = sqlbase.DatumToEncDatum(&sr.colTypes[i], src[i].Datum) + afterSize := dst[i].Size() + if afterSize > uintptr(maxBytesPerSample) { + dst[i].Datum = truncateDatum(evalCtx, dst[i].Datum, maxBytesPerSample) + afterSize = dst[i].Size() + } + // Perform memory accounting. - if afterSize := dst[i].Size(); sr.memAcc != nil && afterSize > beforeSize { + if sr.memAcc != nil && afterSize > beforeSize { if err := sr.memAcc.Grow(ctx, int64(afterSize-beforeSize)); err != nil { return err } @@ -141,3 +150,63 @@ func (sr *SampleReservoir) copyRow(ctx context.Context, dst, src sqlbase.EncDatu } return nil } + +const maxBytesPerSample = 400 + +// truncateDatum truncates large datums to avoid using excessive memory or disk +// space. It performs a best-effort attempt to return a datum that is similar +// to d using at most maxBytes bytes. +// +// For example, if maxBytes=10, "Cockroach Labs" would be truncated to +// "Cockroach ". +func truncateDatum(evalCtx *tree.EvalContext, d tree.Datum, maxBytes int) tree.Datum { + switch t := d.(type) { + case *tree.DBitArray: + b := tree.DBitArray{BitArray: t.ToWidth(uint(maxBytes * 8))} + return &b + + case *tree.DBytes: + // Make a copy so the memory from the original byte string can be garbage + // collected. + b := make([]byte, maxBytes) + copy(b, *t) + return tree.NewDBytes(tree.DBytes(b)) + + case *tree.DString: + return tree.NewDString(truncateString(string(*t), maxBytes)) + + case *tree.DCollatedString: + contents := truncateString(t.Contents, maxBytes) + + // Note: this will end up being larger than maxBytes due to the key and + // locale, so this is just a best-effort attempt to limit the size. + return tree.NewDCollatedString(contents, t.Locale, &evalCtx.CollationEnv) + + default: + // It's not easy to truncate other types (e.g. Decimal). + // TODO(rytaft): If the total memory limit is exceeded then the histogram + // should not be constructed. + return d + } +} + +// truncateString truncates long strings to the longest valid substring that is +// less than maxBytes bytes. It is rune-aware so it does not cut unicode +// characters in half. +func truncateString(s string, maxBytes int) string { + last := 0 + // For strings, range skips from rune to rune and i is the byte index of + // the current rune. + for i := range s { + if i > maxBytes { + break + } + last = i + } + + // Copy the truncated string so that the memory from the longer string can + // be garbage collected. + b := make([]byte, last) + copy(b, s) + return string(b) +} diff --git a/pkg/sql/stats/row_sampling_test.go b/pkg/sql/stats/row_sampling_test.go index dc77a93034fe..41117793465a 100644 --- a/pkg/sql/stats/row_sampling_test.go +++ b/pkg/sql/stats/row_sampling_test.go @@ -17,6 +17,7 @@ import ( "sort" "testing" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/sql/types" @@ -25,13 +26,13 @@ import ( // runSampleTest feeds rows with the given ranks through a reservoir // of a given size and verifies the results are correct. -func runSampleTest(t *testing.T, numSamples int, ranks []int) { +func runSampleTest(t *testing.T, evalCtx *tree.EvalContext, numSamples int, ranks []int) { ctx := context.Background() var sr SampleReservoir sr.Init(numSamples, []types.T{*types.Int}, nil /* memAcc */) for _, r := range ranks { d := sqlbase.DatumToEncDatum(types.Int, tree.NewDInt(tree.DInt(r))) - if err := sr.SampleRow(ctx, sqlbase.EncDatumRow{d}, uint64(r)); err != nil { + if err := sr.SampleRow(ctx, evalCtx, sqlbase.EncDatumRow{d}, uint64(r)); err != nil { t.Errorf("%v", err) } } @@ -62,6 +63,7 @@ func runSampleTest(t *testing.T, numSamples int, ranks []int) { } func TestSampleReservoir(t *testing.T) { + evalCtx := tree.MakeTestingEvalContext(cluster.MakeTestingClusterSettings()) for _, n := range []int{10, 100, 1000, 10000} { rng, _ := randutil.NewPseudoRand() ranks := make([]int, n) @@ -70,8 +72,44 @@ func TestSampleReservoir(t *testing.T) { } for _, k := range []int{1, 5, 10, 100} { t.Run(fmt.Sprintf("%d/%d", n, k), func(t *testing.T) { - runSampleTest(t, k, ranks) + runSampleTest(t, &evalCtx, k, ranks) }) } } } + +func TestTruncateDatum(t *testing.T) { + evalCtx := tree.MakeTestingEvalContext(cluster.MakeTestingClusterSettings()) + runTest := func(d, expected tree.Datum) { + actual := truncateDatum(&evalCtx, d, 10 /* maxBytes */) + if actual.Compare(&evalCtx, expected) != 0 { + t.Fatalf("expected %s but found %s", expected.String(), actual.String()) + } + } + + original1, err := tree.ParseDBitArray("0110110101111100001100110110101111100001100110110101111" + + "10000110011011010111110000110011011010111110000110011011010111110000110") + if err != nil { + t.Fatal(err) + } + expected1, err := tree.ParseDBitArray("0110110101111100001100110110101111100001100110110101111" + + "1000011001101101011111000") + if err != nil { + t.Fatal(err) + } + runTest(original1, expected1) + + original2 := tree.DBytes("deadbeef1234567890") + expected2 := tree.DBytes("deadbeef12") + runTest(&original2, &expected2) + + original3 := tree.DString("Hello 世界") + expected3 := tree.DString("Hello 世") + runTest(&original3, &expected3) + + original4 := tree.NewDCollatedString(`IT was lovely summer weather in the country, and the golden +corn, the green oats, and the haystacks piled up in the meadows looked beautiful`, + "en_US", &tree.CollationEnvironment{}) + expected4 := tree.NewDCollatedString("IT was lov", "en_US", &tree.CollationEnvironment{}) + runTest(original4, expected4) +}