globalsort: opt merge step mem to run on 8c (#51924)
ref #50752
D3Hunter authored Mar 22, 2024
1 parent f23c6fc commit df79e5c
Showing 21 changed files with 280 additions and 107 deletions.
3 changes: 0 additions & 3 deletions br/pkg/lightning/backend/external/bench_test.go
@@ -500,11 +500,8 @@ func mergeStep(t *testing.T, s *mergeTestSuite) {
datas,
s.store,
int64(5*size.MB),
64*1024,
mergeOutput,
DefaultBlockSize,
DefaultMemSizeLimit,
8*1024,
onClose,
s.concurrency,
s.mergeIterHotspot,
12 changes: 5 additions & 7 deletions br/pkg/lightning/backend/external/byte_reader.go
@@ -208,20 +208,18 @@ func (r *byteReader) switchToConcurrentReader() error {
// containing those bytes. The content of returned slice may be changed after
// next call.
func (r *byteReader) readNBytes(n int) ([]byte, error) {
readLen, bs := r.next(n)
if readLen == n && len(bs) == 1 {
return bs[0], nil
}
// need to flatten bs
if n <= 0 {
return nil, errors.Errorf("illegal n (%d) when reading from external storage", n)
}
if n > int(size.GB) {
return nil, errors.Errorf("read %d bytes from external storage, exceed max limit %d", n, size.GB)
}
if n <= 0 {
return nil, errors.Errorf("illegal n (%d) when reading from external storage", n)

readLen, bs := r.next(n)
if readLen == n && len(bs) == 1 {
return bs[0], nil
}
// need to flatten bs
auxBuf := make([]byte, n)
for _, b := range bs {
copy(auxBuf[len(auxBuf)-n:], b)
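The write-offset arithmetic in that flattening loop is easy to misread; the standalone sketch below (not part of the patch) shows the same idea, assuming the elided rest of the loop decrements n by len(b) after each copy, so that len(auxBuf)-n is the offset that advances through the buffer.

// flattenChunks is a sketch of the flattening step in readNBytes, assuming the
// loop body also does n -= len(b): n counts the bytes still to copy, so
// len(auxBuf)-n is the current write offset into auxBuf.
func flattenChunks(bs [][]byte, n int) []byte {
	auxBuf := make([]byte, n)
	for _, b := range bs {
		copy(auxBuf[len(auxBuf)-n:], b)
		n -= len(b)
	}
	return auxBuf
}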
2 changes: 2 additions & 0 deletions br/pkg/lightning/backend/external/iter.go
@@ -516,6 +516,8 @@ func NewMergeKVIter(
outerConcurrency = 1
}
concurrentReaderConcurrency := max(256/outerConcurrency, 8)
// TODO: the merge-sort step passes outerConcurrency=0, so this bufSize might be
// too large when checkHotspot = true (add-index).
largeBufSize := ConcurrentReaderBufferSizePerConc * concurrentReaderConcurrency
memPool := membuf.NewPool(
membuf.WithBlockNum(1), // currently only one reader will become hotspot
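For reference, the buffer sizing above works out as in the sketch below. ConcurrentReaderBufferSizePerConc is defined elsewhere in the package, so it is given a placeholder value here, and the function name is illustrative only.

// largeBufSizeFor is a sketch of the sizing logic in NewMergeKVIter. The
// constant below is a placeholder, not the package's real value. With
// outerConcurrency clamped to 1 (the merge-sort step passes 0),
// concurrentReaderConcurrency becomes 256, which is why the TODO above warns
// the buffer may be oversized.
const concurrentReaderBufferSizePerConcSketch = 4 << 20 // assumed 4 MiB

func largeBufSizeFor(outerConcurrency int) int {
	if outerConcurrency <= 0 {
		outerConcurrency = 1
	}
	concurrentReaderConcurrency := max(256/outerConcurrency, 8)
	return concurrentReaderBufferSizePerConcSketch * concurrentReaderConcurrency
}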
7 changes: 7 additions & 0 deletions br/pkg/lightning/backend/external/kv_reader.go
@@ -19,12 +19,19 @@ import (
"encoding/binary"
"io"

"github.com/docker/go-units"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/pkg/util/logutil"
"go.uber.org/zap"
)

var (
// defaultReadBufferSize is the default read buffer size of kvReader. The buffer is
// split into 3 parts: 2 for prefetching from storage, 1 for reading by the user.
defaultReadBufferSize = 64 * units.KiB
)

type kvReader struct {
byteReader *byteReader
}
64 changes: 36 additions & 28 deletions br/pkg/lightning/backend/external/merge.go
@@ -3,40 +3,52 @@ package external
import (
"context"

"github.com/docker/go-units"
"github.com/google/uuid"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/size"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

var (
// MaxMergingFilesPerThread is the maximum number of files that can be merged by a
// single thread. This value comes from the fact that 16 threads can comfortably merge
// 4k files in parallel, so we set it to 4000/16 = 250.
MaxMergingFilesPerThread = 250
// MinUploadPartSize is the minimum size of each part when uploading files to
// external storage, which is 5MiB for both S3 and GCS.
MinUploadPartSize int64 = 5 * units.MiB
)

// MergeOverlappingFiles reads from given files whose key range may overlap
// and writes to new sorted, nonoverlapping files.
func MergeOverlappingFiles(
ctx context.Context,
paths []string,
store storage.ExternalStorage,
partSize int64,
readBufferSize int,
newFilePrefix string,
blockSize int,
memSizeLimit uint64,
writeBatchCount uint64,
onClose OnCloseFunc,
concurrency int,
checkHotspot bool,
) error {
dataFilesSlice := splitDataFiles(paths, concurrency)
// During the encode&sort step, the writer limit is aligned to the block size, so we
// need to align this too. The max additional written size per file is max-block-size;
// for max-block-size = 32MiB, adding (max-block-size * MaxMergingFilesPerThread)/10000 ~ 1MiB
// to the part size is enough.
partSize = max(MinUploadPartSize, partSize+units.MiB)

logutil.Logger(ctx).Info("start to merge overlapping files",
zap.Int("file-count", len(paths)),
zap.Int("file-groups", len(dataFilesSlice)),
zap.Int("concurrency", concurrency))
zap.Int("concurrency", concurrency),
zap.Int64("part-size", partSize))
eg, egCtx := errgroup.WithContext(ctx)
eg.SetLimit(concurrency)
partSize = max(int64(5*size.MB), partSize+int64(1*size.MB))
for _, files := range dataFilesSlice {
files := files
eg.Go(func() error {
@@ -45,12 +57,9 @@ func MergeOverlappingFiles(
files,
store,
partSize,
readBufferSize,
newFilePrefix,
uuid.New().String(),
memSizeLimit,
blockSize,
writeBatchCount,
onClose,
checkHotspot,
)
@@ -59,10 +68,11 @@ func MergeOverlappingFiles(
return eg.Wait()
}

// split input data files into max 'concurrency' shares evenly, if there are not
// enough files, merge at least 2 files in one batch.
// split input data files into multiple shares evenly, with at most
// MaxMergingFilesPerThread files in each share. If there are not enough files,
// merge at least 2 files in one batch.
func splitDataFiles(paths []string, concurrency int) [][]string {
shares := concurrency
shares := max((len(paths)+MaxMergingFilesPerThread-1)/MaxMergingFilesPerThread, concurrency)
if len(paths) < 2*concurrency {
shares = max(1, len(paths)/2)
}
@@ -91,30 +101,29 @@ func splitDataFiles(paths []string, concurrency int) [][]string {
// accurately, here we only consider the memory used by our code, the estimate max
// memory usage of this function is:
//
// memSizeLimit
// + 20 * partSize
// + 20 * 5MiB(stat file, we might not use all part, as stat file is quite small)
// + readBufferSize * len(paths)
// defaultOneWriterMemSizeLimit
// + MaxMergingFilesPerThread * (X + defaultReadBufferSize)
// + maxUploadWorkersPerThread * (data-part-size + 5MiB(stat-part-size))
// + memory taken by concurrent reading if check-hotspot is enabled
//
// memSizeLimit = 256 MiB now.
// partSize = index-kv-data-file-size / (10000 / MergeSortOverlapThreshold) for import into.
// readBufferSize = 64 KiB now.
// len(paths) >= kv-files-in-subtask(suppose MergeSortOverlapThreshold) / concurrency
// where X is the memory used by each read connection: GCP uses HTTP/2, so X might be
// 4 MiB or more; S3 uses HTTP/1, so it's smaller.
//
// with current default values, on a machine with 2 GiB per core, the estimated max
// memory usage for import into is:
//
// 128 + 250 * (4 + 64/1024) + 8 * (25.6 + 5) ~ 1.36 GiB
// where 25.6 is the max part-size when there is only data kv: 1024*250/10000 = 25.6 MiB
//
// TODO: seems it might OOM if partSize = 256 / (10000/4000) = 100 MiB, when write
// external storage is slow.
// for add-index, it uses more memory as check-hotspot is enabled.
func mergeOverlappingFilesInternal(
ctx context.Context,
paths []string,
store storage.ExternalStorage,
partSize int64,
readBufferSize int,
newFilePrefix string,
writerID string,
memSizeLimit uint64,
blockSize int,
writeBatchCount uint64,
onClose OnCloseFunc,
checkHotspot bool,
) (err error) {
@@ -127,7 +136,7 @@ func mergeOverlappingFilesInternal(
}()

zeroOffsets := make([]uint64, len(paths))
iter, err := NewMergeKVIter(ctx, paths, zeroOffsets, store, readBufferSize, checkHotspot, 0)
iter, err := NewMergeKVIter(ctx, paths, zeroOffsets, store, defaultReadBufferSize, checkHotspot, 0)
if err != nil {
return err
}
@@ -139,9 +148,8 @@ func mergeOverlappingFilesInternal(
}()

writer := NewWriterBuilder().
SetMemorySizeLimit(memSizeLimit).
SetMemorySizeLimit(defaultOneWriterMemSizeLimit).
SetBlockSize(blockSize).
SetWriterBatchCount(writeBatchCount).
SetOnCloseFunc(onClose).
BuildOneFile(store, newFilePrefix, writerID)
err = writer.Init(ctx, partSize)
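To sanity-check the estimate in the new doc comment, the numbers plug in as below. This is only a back-of-the-envelope sketch: the per-connection read memory X is assumed to be 4 MiB (the GCP/HTTP2 case mentioned in the comment) and maxUploadWorkersPerThread is taken as 8, matching the 8 * (25.6 + 5) term.

package main

import "fmt"

// Rough reproduction of the arithmetic in the mergeOverlappingFilesInternal
// doc comment; X and the upload worker count are assumptions taken from that
// comment, not from the code.
func main() {
	const (
		oneWriterMemMiB = 128.0                // defaultOneWriterMemSizeLimit
		filesPerThread  = 250.0                // MaxMergingFilesPerThread
		readConnMiB     = 4.0                  // X, assumed per-connection memory (HTTP/2)
		readBufMiB      = 64.0 / 1024          // defaultReadBufferSize = 64 KiB
		uploadWorkers   = 8.0                  // maxUploadWorkersPerThread per the comment
		dataPartMiB     = 1024 * 250 / 10000.0 // max data part-size = 25.6 MiB
		statPartMiB     = 5.0                  // MinUploadPartSize
	)
	total := oneWriterMemMiB +
		filesPerThread*(readConnMiB+readBufMiB) +
		uploadWorkers*(dataPartMiB+statPartMiB)
	// Prints roughly 1388.4 MiB, i.e. about 1.36 GiB, matching the comment.
	fmt.Printf("estimated max memory for one merge call: %.1f MiB (%.2f GiB)\n", total, total/1024)
}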
23 changes: 22 additions & 1 deletion br/pkg/lightning/backend/external/merge_test.go
@@ -22,7 +22,7 @@ import (
)

func TestSplitDataFiles(t *testing.T) {
allPaths := make([]string, 0, 100)
allPaths := make([]string, 0, 110)
for i := 0; i < cap(allPaths); i++ {
allPaths = append(allPaths, fmt.Sprintf("%d", i))
}
@@ -101,4 +101,25 @@ func TestSplitDataFiles(t *testing.T) {
require.Equal(t, c.result, result)
})
}

bak := MaxMergingFilesPerThread
t.Cleanup(func() {
MaxMergingFilesPerThread = bak
})
MaxMergingFilesPerThread = 10
require.Equal(t, [][]string{
allPaths[:10], allPaths[10:19], allPaths[19:28], allPaths[28:37],
allPaths[37:46], allPaths[46:55], allPaths[55:64], allPaths[64:73],
allPaths[73:82], allPaths[82:91],
}, splitDataFiles(allPaths[:91], 8))
require.Equal(t, [][]string{
allPaths[:10], allPaths[10:20], allPaths[20:30], allPaths[30:40],
allPaths[40:50], allPaths[50:60], allPaths[60:70], allPaths[70:80],
allPaths[80:90], allPaths[90:99],
}, splitDataFiles(allPaths[:99], 8))
require.Equal(t, [][]string{
allPaths[:10], allPaths[10:20], allPaths[20:29], allPaths[29:38],
allPaths[38:47], allPaths[47:56], allPaths[56:65], allPaths[65:74],
allPaths[74:83], allPaths[83:92], allPaths[92:101],
}, splitDataFiles(allPaths[:101], 8))
}
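The grouping these new cases assert can be reproduced with the sketch below; it illustrates the splitting policy implied by splitDataFiles and the assertions above, and is not the actual implementation.

// splitEvenlySketch mirrors the behaviour checked above: the number of shares is
// the larger of ceil(len(paths)/maxPerThread) and concurrency, small inputs are
// merged at least two files at a time, and the remainder goes to the first
// len(paths)%shares shares (one extra path each).
func splitEvenlySketch(paths []string, concurrency, maxPerThread int) [][]string {
	shares := max((len(paths)+maxPerThread-1)/maxPerThread, concurrency)
	if len(paths) < 2*concurrency {
		shares = max(1, len(paths)/2)
	}
	if shares <= 0 {
		return nil
	}
	base, extra := len(paths)/shares, len(paths)%shares
	result := make([][]string, 0, shares)
	start := 0
	for i := 0; i < shares && start < len(paths); i++ {
		n := base
		if i < extra {
			n++
		}
		result = append(result, paths[start:start+n])
		start += n
	}
	return result
}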
15 changes: 12 additions & 3 deletions br/pkg/lightning/backend/external/onefile_writer.go
@@ -20,15 +20,20 @@ import (
"path/filepath"
"slices"

"github.com/docker/go-units"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/membuf"
"github.com/pingcap/tidb/br/pkg/storage"
tidbkv "github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/size"
"go.uber.org/zap"
)

// defaultOneWriterMemSizeLimit is the memory size limit for one writer. The one-file
// writer streams data out, so this memory limit is only used to avoid allocating too
// many times for each KV pair.
var defaultOneWriterMemSizeLimit uint64 = 128 * units.MiB

// OneFileWriter is used to write data into external storage
// with only one file for data and stat.
type OneFileWriter struct {
@@ -64,12 +69,16 @@ func (w *OneFileWriter) initWriter(ctx context.Context, partSize int64) (
err error,
) {
w.dataFile = filepath.Join(w.filenamePrefix, "one-file")
w.dataWriter, err = w.store.Create(ctx, w.dataFile, &storage.WriterOption{Concurrency: 20, PartSize: partSize})
w.dataWriter, err = w.store.Create(ctx, w.dataFile, &storage.WriterOption{
Concurrency: maxUploadWorkersPerThread,
PartSize: partSize})
if err != nil {
return err
}
w.statFile = filepath.Join(w.filenamePrefix+statSuffix, "one-file")
w.statWriter, err = w.store.Create(ctx, w.statFile, &storage.WriterOption{Concurrency: 20, PartSize: int64(5 * size.MB)})
w.statWriter, err = w.store.Create(ctx, w.statFile, &storage.WriterOption{
Concurrency: maxUploadWorkersPerThread,
PartSize: MinUploadPartSize})
if err != nil {
w.logger.Info("create stat writer failed",
zap.Error(err))
23 changes: 16 additions & 7 deletions br/pkg/lightning/backend/external/onefile_writer_test.go
@@ -194,18 +194,22 @@ func TestMergeOverlappingFilesInternal(t *testing.T) {
require.NoError(t, writer.WriteRow(ctx, key, val, dbkv.IntHandle(i)))
}
require.NoError(t, writer.Close(ctx))

readBufSizeBak := defaultReadBufferSize
memLimitBak := defaultOneWriterMemSizeLimit
t.Cleanup(func() {
defaultReadBufferSize = readBufSizeBak
defaultOneWriterMemSizeLimit = memLimitBak
})
defaultReadBufferSize = 100
defaultOneWriterMemSizeLimit = 1000
require.NoError(t, mergeOverlappingFilesInternal(
ctx,
[]string{"/test/0/0", "/test/0/1", "/test/0/2", "/test/0/3", "/test/0/4"},
memStore,
int64(5*size.MB),
100,
"/test2",
"mergeID",
1000,
1000,
8*1024,
nil,
true,
))
@@ -297,17 +301,22 @@ func TestOnefileWriterManyRows(t *testing.T) {
onClose := func(summary *WriterSummary) {
resSummary = summary
}
readBufSizeBak := defaultReadBufferSize
memLimitBak := defaultOneWriterMemSizeLimit
t.Cleanup(func() {
defaultReadBufferSize = readBufSizeBak
defaultOneWriterMemSizeLimit = memLimitBak
})
defaultReadBufferSize = 100
defaultOneWriterMemSizeLimit = 1000
require.NoError(t, mergeOverlappingFilesInternal(
ctx,
[]string{"/test/0/one-file"},
memStore,
int64(5*size.MB),
100,
"/test2",
"mergeID",
1000,
1000,
8*1024,
onClose,
true,
))
11 changes: 8 additions & 3 deletions br/pkg/lightning/backend/external/sort_test.go
@@ -161,17 +161,22 @@ func TestGlobalSortLocalWithMerge(t *testing.T) {
mergeMemSize := (rand.Intn(10) + 1) * 100
// use random mergeMemSize to test different memLimit of writer.
// reproduce one bug, see https://github.com/pingcap/tidb/issues/49590
bufSizeBak := defaultReadBufferSize
memLimitBak := defaultOneWriterMemSizeLimit
t.Cleanup(func() {
defaultReadBufferSize = bufSizeBak
defaultOneWriterMemSizeLimit = memLimitBak
})
defaultReadBufferSize = 100
defaultOneWriterMemSizeLimit = uint64(mergeMemSize)
for _, group := range dataGroup {
require.NoError(t, MergeOverlappingFiles(
ctx,
group,
memStore,
int64(5*size.MB),
100,
"/test2",
mergeMemSize,
uint64(mergeMemSize),
8*1024,
closeFn,
1,
true,
1 change: 0 additions & 1 deletion br/pkg/lightning/backend/external/split_test.go
@@ -386,7 +386,6 @@ func Test3KFilesRangeSplitter(t *testing.T) {
w := NewWriterBuilder().
SetMemorySizeLimit(DefaultMemSizeLimit).
SetBlockSize(32*units.MiB). // dataKVGroupBlockSize
SetWriterBatchCount(8*1024).
SetPropKeysDistance(8*1024).
SetPropSizeDistance(size.MB).
SetOnCloseFunc(onClose).