Skip to content

Commit

Permalink
Merge #39418 #39424
Browse files Browse the repository at this point in the history
39418: stats: truncate large datums when sampling for histogram r=rytaft a=rytaft

This commit adds logic to truncate long bit arrays, byte strings,
strings, and collated strings during sampling for histogram creation.
We do this to avoid using excessive memory or disk space during
sampling and storage of the final histogram.

Release note: None

39424: importccl: Direct-ingest uses two bulk adders instead of one. r=adityamaru27 a=adityamaru27

This is another change to stabilize direct ingest import before
it is made the default.
As a consequence of #39271, the number of files (L0 and total),
along with the cumulative compaction size increased drastically.
A consequence of no longer creating buckets of TableIDIndexID
before flushing is that the single bulk adder would receive a
mix of primary and secondary index entries. Since SSTs cannot
span across the splits we inserted between index spans, it would
create numerous, small secondary index SSTs along with the
bigger primary index SSTs, and flush on reaching its limit
(which would be often).

By introducing two adders, one for ingesting primary index data,
and the other for ingesting secondary index data we regain the
ability to make fewer, bigger secondary index SSTs and flush less
often. The peak mem is lower than what prebuffering used to
hit, while the number of files (L0 and total), and the cumulative
compaction size return to prebuffering levels.

Some stats below for a tpcc 1k, on a 1 node cluster.

With prebuffering:
Total Files : 7670
L0 Files : 1848
Cumulative Compaction (GB): 24.54GiB

Without prebuffering, one adder:
Total Files : 22420
L0 Files : 16900
Cumulative Compaction (GB): 52.43 GiB

Without prebuffering, two adders:
Total Files : 6805
L0 Files : 1078
Cumulative Compaction (GB): 18.89GiB

Release note: None

Co-authored-by: Rebecca Taft <[email protected]>
Co-authored-by: Aditya Maru <[email protected]>
  • Loading branch information
3 people committed Aug 8, 2019
3 parents d7ec277 + 0c524f1 + e0d214f commit e7c418c
Show file tree
Hide file tree
Showing 7 changed files with 189 additions and 30 deletions.
78 changes: 59 additions & 19 deletions pkg/ccl/importccl/read_import_proc.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,11 +377,18 @@ func (cp *readImportDataProcessor) doRun(ctx context.Context) error {

writeTS := hlc.Timestamp{WallTime: cp.spec.WalltimeNanos}
const bufferSize, flushSize = 64 << 20, 16 << 20
adder, err := cp.flowCtx.Cfg.BulkAdder(ctx, cp.flowCtx.Cfg.DB, bufferSize, flushSize, writeTS)

// We create two bulk adders so as to combat the excessive flushing of
// small SSTs which was observed when using a single adder for both
// primary and secondary index kvs. The number of secondary index kvs are
// small, and so we expect the indexAdder to flush much less frequently
// than the pkIndexAdder.
pkIndexAdder, err := cp.flowCtx.Cfg.BulkAdder(ctx, cp.flowCtx.Cfg.DB, bufferSize, flushSize, writeTS)
if err != nil {
return err
}
adder.SetDisallowShadowing(true)
pkIndexAdder.SetName("pkIndexAdder")
pkIndexAdder.SetDisallowShadowing(true)
// AddSSTable with disallowShadowing=true does not consider a KV with the
// same ts and value to be a collision. This is to support the resumption
// of IMPORT jobs which might re-import some already ingested, but not
Expand All @@ -390,16 +397,26 @@ func (cp *readImportDataProcessor) doRun(ctx context.Context) error {
// To provide a similar behavior with KVs within the same SST, we silently
// skip over duplicates with the same value, instead of throwing a
// uniqueness error.
adder.SkipLocalDuplicatesWithSameValues(true)
defer adder.Close(ctx)
pkIndexAdder.SkipLocalDuplicatesWithSameValues(true)
defer pkIndexAdder.Close(ctx)

indexAdder, err := cp.flowCtx.Cfg.BulkAdder(ctx, cp.flowCtx.Cfg.DB, bufferSize, flushSize, writeTS)
if err != nil {
return err
}
indexAdder.SetName("indexAdder")
indexAdder.SetDisallowShadowing(true)
indexAdder.SkipLocalDuplicatesWithSameValues(true)
defer indexAdder.Close(ctx)

// Drain the kvCh using the BulkAdder until it closes.
if err := ingestKvs(ctx, adder, kvCh); err != nil {
if err := ingestKvs(ctx, pkIndexAdder, indexAdder, kvCh); err != nil {
return err
}

added := adder.GetSummary()
countsBytes, err := protoutil.Marshal(&added)
addedSummary := pkIndexAdder.GetSummary()
addedSummary.Add(indexAdder.GetSummary())
countsBytes, err := protoutil.Marshal(&addedSummary)
if err != nil {
return err
}
Expand Down Expand Up @@ -521,7 +538,10 @@ func wrapRowErr(err error, file string, row int64, code, format string, args ...
// ingestKvs drains kvs from the channel until it closes, ingesting them using
// the BulkAdder. It handles the required buffering/sorting/etc.
func ingestKvs(
ctx context.Context, adder storagebase.BulkAdder, kvCh <-chan []roachpb.KeyValue,
ctx context.Context,
pkIndexAdder storagebase.BulkAdder,
indexAdder storagebase.BulkAdder,
kvCh <-chan []roachpb.KeyValue,
) error {
// We insert splits at every index span of the table prior to the invocation
// of this method. Since the BulkAdder is split aware when constructing SSTs,
Expand All @@ -544,24 +564,44 @@ func ingestKvs(
// mentioned above, the KVs sent to the BulkAdder are no longer grouped which
// results in flushing a much larger number of small SSTs. This increases the
// number of L0 (and total) files, but with a lower memory usage.
//
// TODO(adityamaru): Once the roachtest import/experimental-direct-ingestion
// stabilizes, we can explore a two adder approach where we send primary
// indexes to one, and secondary indexes to the other. This should reduce the
// number of L0 (and total) files to be comparable to the previous
// implementation with pre-buffering.
for kvBatch := range kvCh {
for _, kv := range kvBatch {
if err := adder.Add(ctx, kv.Key, kv.Value.RawBytes); err != nil {
if _, ok := err.(storagebase.DuplicateKeyError); ok {
return errors.WithStack(err)
_, _, indexID, indexErr := sqlbase.DecodeTableIDIndexID(kv.Key)
if indexErr != nil {
return indexErr
}

// Decide which adder to send the KV to by extracting its index id.
//
// TODO(adityamaru): There is a potential optimization of plumbing the
// different putters, and differentiating based on their type. It might be
// more efficient than parsing every kv.
if indexID == 1 {
if err := pkIndexAdder.Add(ctx, kv.Key, kv.Value.RawBytes); err != nil {
if _, ok := err.(storagebase.DuplicateKeyError); ok {
return errors.WithStack(err)
}
return err
}
} else {
if err := indexAdder.Add(ctx, kv.Key, kv.Value.RawBytes); err != nil {
if _, ok := err.(storagebase.DuplicateKeyError); ok {
return errors.WithStack(err)
}
return err
}
return err
}
}
}

if err := adder.Flush(ctx); err != nil {
if err := pkIndexAdder.Flush(ctx); err != nil {
if err, ok := err.(storagebase.DuplicateKeyError); ok {
return errors.WithStack(err)
}
return err
}

if err := indexAdder.Flush(ctx); err != nil {
if err, ok := err.(storagebase.DuplicateKeyError); ok {
return errors.WithStack(err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/distsqlrun/sample_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func (s *sampleAggregator) mainLoop(ctx context.Context) (earlyExit bool, err er
return false, errors.NewAssertionErrorWithWrappedErrf(err, "decoding rank column")
}
// Retain the rows with the top ranks.
if err := s.sr.SampleRow(ctx, row[:s.rankCol], uint64(rank)); err != nil {
if err := s.sr.SampleRow(ctx, s.evalCtx, row[:s.rankCol], uint64(rank)); err != nil {
return false, err
}
continue
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/distsqlrun/sampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ func (s *samplerProcessor) mainLoop(ctx context.Context) (earlyExit bool, err er

// Use Int63 so we don't have headaches converting to DInt.
rank := uint64(rng.Int63())
if err := s.sr.SampleRow(ctx, row, rank); err != nil {
if err := s.sr.SampleRow(ctx, s.evalCtx, row, rank); err != nil {
return false, err
}
}
Expand Down
79 changes: 74 additions & 5 deletions pkg/sql/stats/row_sampling.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"container/heap"
"context"

"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/mon"
Expand Down Expand Up @@ -81,7 +82,7 @@ func (sr *SampleReservoir) Pop() interface{} { panic("unimplemented") }

// SampleRow looks at a row and either drops it or adds it to the reservoir.
func (sr *SampleReservoir) SampleRow(
ctx context.Context, row sqlbase.EncDatumRow, rank uint64,
ctx context.Context, evalCtx *tree.EvalContext, row sqlbase.EncDatumRow, rank uint64,
) error {
if len(sr.samples) < cap(sr.samples) {
// We haven't accumulated enough rows yet, just append.
Expand All @@ -94,7 +95,7 @@ func (sr *SampleReservoir) SampleRow(
return err
}
}
if err := sr.copyRow(ctx, rowCopy, row); err != nil {
if err := sr.copyRow(ctx, evalCtx, rowCopy, row); err != nil {
return err
}
sr.samples = append(sr.samples, SampledRow{Row: rowCopy, Rank: rank})
Expand All @@ -106,7 +107,7 @@ func (sr *SampleReservoir) SampleRow(
}
// Replace the max rank if ours is smaller.
if len(sr.samples) > 0 && rank < sr.samples[0].Rank {
if err := sr.copyRow(ctx, sr.samples[0].Row, row); err != nil {
if err := sr.copyRow(ctx, evalCtx, sr.samples[0].Row, row); err != nil {
return err
}
sr.samples[0].Rank = rank
Expand All @@ -120,7 +121,9 @@ func (sr *SampleReservoir) Get() []SampledRow {
return sr.samples
}

func (sr *SampleReservoir) copyRow(ctx context.Context, dst, src sqlbase.EncDatumRow) error {
func (sr *SampleReservoir) copyRow(
ctx context.Context, evalCtx *tree.EvalContext, dst, src sqlbase.EncDatumRow,
) error {
for i := range src {
// Copy only the decoded datum to ensure that we remove any reference to
// the encoded bytes. The encoded bytes would have been scanned in a batch
Expand All @@ -131,8 +134,14 @@ func (sr *SampleReservoir) copyRow(ctx context.Context, dst, src sqlbase.EncDatu
}
beforeSize := dst[i].Size()
dst[i] = sqlbase.DatumToEncDatum(&sr.colTypes[i], src[i].Datum)
afterSize := dst[i].Size()
if afterSize > uintptr(maxBytesPerSample) {
dst[i].Datum = truncateDatum(evalCtx, dst[i].Datum, maxBytesPerSample)
afterSize = dst[i].Size()
}

// Perform memory accounting.
if afterSize := dst[i].Size(); sr.memAcc != nil && afterSize > beforeSize {
if sr.memAcc != nil && afterSize > beforeSize {
if err := sr.memAcc.Grow(ctx, int64(afterSize-beforeSize)); err != nil {
return err
}
Expand All @@ -141,3 +150,63 @@ func (sr *SampleReservoir) copyRow(ctx context.Context, dst, src sqlbase.EncDatu
}
return nil
}

const maxBytesPerSample = 400

// truncateDatum truncates large datums to avoid using excessive memory or disk
// space. It performs a best-effort attempt to return a datum that is similar
// to d using at most maxBytes bytes.
//
// For example, if maxBytes=10, "Cockroach Labs" would be truncated to
// "Cockroach ".
func truncateDatum(evalCtx *tree.EvalContext, d tree.Datum, maxBytes int) tree.Datum {
switch t := d.(type) {
case *tree.DBitArray:
b := tree.DBitArray{BitArray: t.ToWidth(uint(maxBytes * 8))}
return &b

case *tree.DBytes:
// Make a copy so the memory from the original byte string can be garbage
// collected.
b := make([]byte, maxBytes)
copy(b, *t)
return tree.NewDBytes(tree.DBytes(b))

case *tree.DString:
return tree.NewDString(truncateString(string(*t), maxBytes))

case *tree.DCollatedString:
contents := truncateString(t.Contents, maxBytes)

// Note: this will end up being larger than maxBytes due to the key and
// locale, so this is just a best-effort attempt to limit the size.
return tree.NewDCollatedString(contents, t.Locale, &evalCtx.CollationEnv)

default:
// It's not easy to truncate other types (e.g. Decimal).
// TODO(rytaft): If the total memory limit is exceeded then the histogram
// should not be constructed.
return d
}
}

// truncateString truncates long strings to the longest valid substring that is
// less than maxBytes bytes. It is rune-aware so it does not cut unicode
// characters in half.
func truncateString(s string, maxBytes int) string {
last := 0
// For strings, range skips from rune to rune and i is the byte index of
// the current rune.
for i := range s {
if i > maxBytes {
break
}
last = i
}

// Copy the truncated string so that the memory from the longer string can
// be garbage collected.
b := make([]byte, last)
copy(b, s)
return string(b)
}
44 changes: 41 additions & 3 deletions pkg/sql/stats/row_sampling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"sort"
"testing"

"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/sql/types"
Expand All @@ -25,13 +26,13 @@ import (

// runSampleTest feeds rows with the given ranks through a reservoir
// of a given size and verifies the results are correct.
func runSampleTest(t *testing.T, numSamples int, ranks []int) {
func runSampleTest(t *testing.T, evalCtx *tree.EvalContext, numSamples int, ranks []int) {
ctx := context.Background()
var sr SampleReservoir
sr.Init(numSamples, []types.T{*types.Int}, nil /* memAcc */)
for _, r := range ranks {
d := sqlbase.DatumToEncDatum(types.Int, tree.NewDInt(tree.DInt(r)))
if err := sr.SampleRow(ctx, sqlbase.EncDatumRow{d}, uint64(r)); err != nil {
if err := sr.SampleRow(ctx, evalCtx, sqlbase.EncDatumRow{d}, uint64(r)); err != nil {
t.Errorf("%v", err)
}
}
Expand Down Expand Up @@ -62,6 +63,7 @@ func runSampleTest(t *testing.T, numSamples int, ranks []int) {
}

func TestSampleReservoir(t *testing.T) {
evalCtx := tree.MakeTestingEvalContext(cluster.MakeTestingClusterSettings())
for _, n := range []int{10, 100, 1000, 10000} {
rng, _ := randutil.NewPseudoRand()
ranks := make([]int, n)
Expand All @@ -70,8 +72,44 @@ func TestSampleReservoir(t *testing.T) {
}
for _, k := range []int{1, 5, 10, 100} {
t.Run(fmt.Sprintf("%d/%d", n, k), func(t *testing.T) {
runSampleTest(t, k, ranks)
runSampleTest(t, &evalCtx, k, ranks)
})
}
}
}

func TestTruncateDatum(t *testing.T) {
evalCtx := tree.MakeTestingEvalContext(cluster.MakeTestingClusterSettings())
runTest := func(d, expected tree.Datum) {
actual := truncateDatum(&evalCtx, d, 10 /* maxBytes */)
if actual.Compare(&evalCtx, expected) != 0 {
t.Fatalf("expected %s but found %s", expected.String(), actual.String())
}
}

original1, err := tree.ParseDBitArray("0110110101111100001100110110101111100001100110110101111" +
"10000110011011010111110000110011011010111110000110011011010111110000110")
if err != nil {
t.Fatal(err)
}
expected1, err := tree.ParseDBitArray("0110110101111100001100110110101111100001100110110101111" +
"1000011001101101011111000")
if err != nil {
t.Fatal(err)
}
runTest(original1, expected1)

original2 := tree.DBytes("deadbeef1234567890")
expected2 := tree.DBytes("deadbeef12")
runTest(&original2, &expected2)

original3 := tree.DString("Hello 世界")
expected3 := tree.DString("Hello 世")
runTest(&original3, &expected3)

original4 := tree.NewDCollatedString(`IT was lovely summer weather in the country, and the golden
corn, the green oats, and the haystacks piled up in the meadows looked beautiful`,
"en_US", &tree.CollationEnvironment{})
expected4 := tree.NewDCollatedString("IT was lov", "en_US", &tree.CollationEnvironment{})
runTest(original4, expected4)
}
12 changes: 11 additions & 1 deletion pkg/storage/bulk/buffering_adder.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ type BufferingAdder struct {
total int
bufferSize int
}

// name of the BufferingAdder for the purpose of logging only.
name string
}

// MakeBulkAdder makes a storagebase.BulkAdder that buffers and sorts K/Vs passed
Expand Down Expand Up @@ -72,10 +75,17 @@ func (b *BufferingAdder) SkipLocalDuplicatesWithSameValues(skip bool) {
b.sink.skipDuplicateKeysWithSameValue = skip
}

// SetName sets the name of the adder being used for the purpose of logging
// stats.
func (b *BufferingAdder) SetName(name string) {
b.name = name
}

// Close closes the underlying SST builder.
func (b *BufferingAdder) Close(ctx context.Context) {
log.VEventf(ctx, 2,
"bulk adder ingested %s, flushed %d times, %d due to buffer size. Flushed %d files, %d due to ranges, %d due to sst size",
"bulk adder %s ingested %s, flushed %d times, %d due to buffer size. Flushed %d files, %d due to ranges, %d due to sst size",
b.name,
sz(b.sink.totalRows.DataSize),
b.flushCounts.total, b.flushCounts.bufferSize,
b.sink.flushCounts.total, b.sink.flushCounts.split, b.sink.flushCounts.sstSize,
Expand Down
2 changes: 2 additions & 0 deletions pkg/storage/storagebase/bulk_adder.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ type BulkAdder interface {
// SetDisallowShadowing sets the flag which controls whether shadowing of
// existing keys is permitted in the AddSSTable method.
SetDisallowShadowing(bool)
// SetName sets the name of the adder for the purpose of logging adder stats.
SetName(string)
}

// DuplicateKeyError represents a failed attempt to ingest the same key twice
Expand Down

0 comments on commit e7c418c

Please sign in to comment.