diff --git a/br/pkg/lightning/backend/external/bench_test.go b/br/pkg/lightning/backend/external/bench_test.go index 0664f4e3acea5..1d63872a005f6 100644 --- a/br/pkg/lightning/backend/external/bench_test.go +++ b/br/pkg/lightning/backend/external/bench_test.go @@ -500,11 +500,8 @@ func mergeStep(t *testing.T, s *mergeTestSuite) { datas, s.store, int64(5*size.MB), - 64*1024, mergeOutput, DefaultBlockSize, - DefaultMemSizeLimit, - 8*1024, onClose, s.concurrency, s.mergeIterHotspot, diff --git a/br/pkg/lightning/backend/external/byte_reader.go b/br/pkg/lightning/backend/external/byte_reader.go index 53df918b012ba..d2d46939f6aab 100644 --- a/br/pkg/lightning/backend/external/byte_reader.go +++ b/br/pkg/lightning/backend/external/byte_reader.go @@ -208,20 +208,18 @@ func (r *byteReader) switchToConcurrentReader() error { // containing those bytes. The content of returned slice may be changed after // next call. func (r *byteReader) readNBytes(n int) ([]byte, error) { - readLen, bs := r.next(n) - if readLen == n && len(bs) == 1 { - return bs[0], nil - } - // need to flatten bs if n <= 0 { return nil, errors.Errorf("illegal n (%d) when reading from external storage", n) } if n > int(size.GB) { return nil, errors.Errorf("read %d bytes from external storage, exceed max limit %d", n, size.GB) } - if n <= 0 { - return nil, errors.Errorf("illegal n (%d) when reading from external storage", n) + + readLen, bs := r.next(n) + if readLen == n && len(bs) == 1 { + return bs[0], nil } + // need to flatten bs auxBuf := make([]byte, n) for _, b := range bs { copy(auxBuf[len(auxBuf)-n:], b) diff --git a/br/pkg/lightning/backend/external/iter.go b/br/pkg/lightning/backend/external/iter.go index 5560bb463c059..4d9a7754fbcf6 100644 --- a/br/pkg/lightning/backend/external/iter.go +++ b/br/pkg/lightning/backend/external/iter.go @@ -516,6 +516,8 @@ func NewMergeKVIter( outerConcurrency = 1 } concurrentReaderConcurrency := max(256/outerConcurrency, 8) + // TODO: merge-sort step passes outerConcurrency=0, so this bufSize might be + // too large when checkHotspot = true(add-index). largeBufSize := ConcurrentReaderBufferSizePerConc * concurrentReaderConcurrency memPool := membuf.NewPool( membuf.WithBlockNum(1), // currently only one reader will become hotspot diff --git a/br/pkg/lightning/backend/external/kv_reader.go b/br/pkg/lightning/backend/external/kv_reader.go index def354b18f884..455abcda4ea91 100644 --- a/br/pkg/lightning/backend/external/kv_reader.go +++ b/br/pkg/lightning/backend/external/kv_reader.go @@ -19,12 +19,19 @@ import ( "encoding/binary" "io" + "github.com/docker/go-units" "github.com/pingcap/errors" "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/pkg/util/logutil" "go.uber.org/zap" ) +var ( + // default read buf size of kvReader, this buf is split into 3 parts, 2 for prefetch + // from storage, 1 for read by user. 
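Note on the byte_reader.go hunk above: it moves the argument checks ahead of the read, keeps the single-buffer fast path, and only falls back to copying when the requested bytes span several internal buffers. A minimal standalone sketch of that flattening fallback, using hypothetical names rather than the real byteReader API:

package main

import (
	"errors"
	"fmt"
)

// flattenN copies the first n bytes spread across bufs into one contiguous slice.
// It mirrors the fallback path of readNBytes, but is illustrative only.
func flattenN(bufs [][]byte, n int) ([]byte, error) {
	if n <= 0 {
		return nil, errors.New("illegal n")
	}
	// fast path: a single buffer already holds everything the caller asked for.
	if len(bufs) == 1 && len(bufs[0]) >= n {
		return bufs[0][:n], nil
	}
	aux := make([]byte, 0, n)
	for _, b := range bufs {
		if len(aux)+len(b) >= n {
			return append(aux, b[:n-len(aux)]...), nil
		}
		aux = append(aux, b...)
	}
	return nil, errors.New("not enough data in buffers")
}

func main() {
	out, err := flattenN([][]byte{[]byte("hel"), []byte("lo, world")}, 5)
	fmt.Println(string(out), err) // hello <nil>
}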
+	defaultReadBufferSize = 64 * units.KiB
+)
+
 type kvReader struct {
 	byteReader *byteReader
 }
diff --git a/br/pkg/lightning/backend/external/merge.go b/br/pkg/lightning/backend/external/merge.go
index d371ac7050718..41d92311ac2e3 100644
--- a/br/pkg/lightning/backend/external/merge.go
+++ b/br/pkg/lightning/backend/external/merge.go
@@ -3,15 +3,25 @@ package external
 import (
 	"context"
 
+	"github.com/docker/go-units"
 	"github.com/google/uuid"
 	"github.com/pingcap/tidb/br/pkg/lightning/log"
 	"github.com/pingcap/tidb/br/pkg/storage"
 	"github.com/pingcap/tidb/pkg/util/logutil"
-	"github.com/pingcap/tidb/pkg/util/size"
 	"go.uber.org/zap"
 	"golang.org/x/sync/errgroup"
 )
 
+var (
+	// MaxMergingFilesPerThread is the maximum number of files that can be merged by a
+	// single thread. In practice 16 threads can merge 4K files in parallel comfortably,
+	// so we set it to 250.
+	MaxMergingFilesPerThread = 250
+	// MinUploadPartSize is the minimum size of each part when uploading files to
+	// external storage, which is 5MiB for both S3 and GCS.
+	MinUploadPartSize int64 = 5 * units.MiB
+)
+
 // MergeOverlappingFiles reads from given files whose key range may overlap
 // and writes to new sorted, nonoverlapping files.
 func MergeOverlappingFiles(
@@ -19,24 +29,26 @@ func MergeOverlappingFiles(
 	paths []string,
 	store storage.ExternalStorage,
 	partSize int64,
-	readBufferSize int,
 	newFilePrefix string,
 	blockSize int,
-	memSizeLimit uint64,
-	writeBatchCount uint64,
 	onClose OnCloseFunc,
 	concurrency int,
 	checkHotspot bool,
 ) error {
 	dataFilesSlice := splitDataFiles(paths, concurrency)
+	// During the encode&sort step the writer limit is aligned to the block size, so we
+	// need to align this too. The max additional written size per file is max-block-size.
+	// For max-block-size = 32MiB, adding (max-block-size * MaxMergingFilesPerThread)/10000 ~ 1MiB
+	// to the part size is enough.
+	partSize = max(MinUploadPartSize, partSize+units.MiB)
 	logutil.Logger(ctx).Info("start to merge overlapping files",
 		zap.Int("file-count", len(paths)),
 		zap.Int("file-groups", len(dataFilesSlice)),
-		zap.Int("concurrency", concurrency))
+		zap.Int("concurrency", concurrency),
+		zap.Int64("part-size", partSize))
 	eg, egCtx := errgroup.WithContext(ctx)
 	eg.SetLimit(concurrency)
 
-	partSize = max(int64(5*size.MB), partSize+int64(1*size.MB))
 	for _, files := range dataFilesSlice {
 		files := files
 		eg.Go(func() error {
@@ -45,12 +57,9 @@ func MergeOverlappingFiles(
 				files,
 				store,
 				partSize,
-				readBufferSize,
 				newFilePrefix,
 				uuid.New().String(),
-				memSizeLimit,
 				blockSize,
-				writeBatchCount,
 				onClose,
 				checkHotspot,
 			)
@@ -59,10 +68,11 @@ func MergeOverlappingFiles(
 	return eg.Wait()
 }
 
-// split input data files into max 'concurrency' shares evenly, if there are not
-// enough files, merge at least 2 files in one batch.
+// split input data files evenly into multiple shares, with at most
+// MaxMergingFilesPerThread files in each share; if there are not enough
+// files, merge at least 2 files in one batch.
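The new splitDataFiles policy above caps each share at MaxMergingFilesPerThread files while still producing at least `concurrency` shares (and at least 2 files per share when files are scarce). A simplified sketch of how such an even split can be computed; this is not the actual implementation, but it reproduces the expectations added in merge_test.go below (e.g. 101 files with a cap of 10 and concurrency 8 yield shares of 10, 10, 9, 9, ...). It relies on the Go 1.21 `max` builtin, as the patch itself does.

package main

import "fmt"

// splitEvenly spreads paths over shares of near-equal size: at most maxPerThread
// files per share, at least `concurrency` shares, and at least 2 files per share
// when there are few files. Illustrative only.
func splitEvenly(paths []string, concurrency, maxPerThread int) [][]string {
	if len(paths) == 0 {
		return nil
	}
	shares := max((len(paths)+maxPerThread-1)/maxPerThread, concurrency)
	if len(paths) < 2*concurrency {
		shares = max(1, len(paths)/2)
	}
	base, extra := len(paths)/shares, len(paths)%shares
	result := make([][]string, 0, shares)
	for i, start := 0, 0; i < shares; i++ {
		n := base
		if i < extra {
			n++ // the first `extra` shares take one additional file
		}
		result = append(result, paths[start:start+n])
		start += n
	}
	return result
}

func main() {
	paths := make([]string, 101)
	for i := range paths {
		paths[i] = fmt.Sprintf("%d", i)
	}
	for _, share := range splitEvenly(paths, 8, 10) {
		fmt.Print(len(share), " ") // 10 10 9 9 9 9 9 9 9 9 9
	}
}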
 func splitDataFiles(paths []string, concurrency int) [][]string {
-	shares := concurrency
+	shares := max((len(paths)+MaxMergingFilesPerThread-1)/MaxMergingFilesPerThread, concurrency)
 	if len(paths) < 2*concurrency {
 		shares = max(1, len(paths)/2)
 	}
@@ -91,30 +101,29 @@ func splitDataFiles(paths []string, concurrency int) [][]string {
 // accurately, here we only consider the memory used by our code, the estimate max
 // memory usage of this function is:
 //
-//	memSizeLimit
-//	+ 20 * partSize
-//	+ 20 * 5MiB(stat file, we might not use all part, as stat file is quite small)
-//	+ readBufferSize * len(paths)
+//	defaultOneWriterMemSizeLimit
+//	+ MaxMergingFilesPerThread * (X + defaultReadBufferSize)
+//	+ maxUploadWorkersPerThread * (data-part-size + 5MiB(stat-part-size))
 //	+ memory taken by concurrent reading if check-hotspot is enabled
 //
-// memSizeLimit = 256 MiB now.
-// partSize = index-kv-data-file-size / (10000 / MergeSortOverlapThreshold) for import into.
-// readBufferSize = 64 KiB now.
-// len(paths) >= kv-files-in-subtask(suppose MergeSortOverlapThreshold) / concurrency
+// where X is the memory used by each read connection; GCP uses HTTP/2, so X might
+// be 4 MiB or more, while S3 uses HTTP/1 and needs less.
+//
+// With current default values, on a machine with 2 GiB per core, the estimated max
+// memory usage for IMPORT INTO is:
+//
+//	128 + 250 * (4 + 64/1024) + 8 * (25.6 + 5) ~ 1.36 GiB
+//	where 25.6 is the max part-size when there is only data KV: 1024*250/10000 = 25.6MiB
 //
-// TODO: seems it might OOM if partSize = 256 / (10000/4000) = 100 MiB, when write
-// external storage is slow.
+// For add-index, it uses more memory because check-hotspot is enabled.
 func mergeOverlappingFilesInternal(
 	ctx context.Context,
 	paths []string,
 	store storage.ExternalStorage,
 	partSize int64,
-	readBufferSize int,
 	newFilePrefix string,
 	writerID string,
-	memSizeLimit uint64,
 	blockSize int,
-	writeBatchCount uint64,
 	onClose OnCloseFunc,
 	checkHotspot bool,
 ) (err error) {
@@ -127,7 +136,7 @@ func mergeOverlappingFilesInternal(
 	}()
 
 	zeroOffsets := make([]uint64, len(paths))
-	iter, err := NewMergeKVIter(ctx, paths, zeroOffsets, store, readBufferSize, checkHotspot, 0)
+	iter, err := NewMergeKVIter(ctx, paths, zeroOffsets, store, defaultReadBufferSize, checkHotspot, 0)
 	if err != nil {
 		return err
 	}
@@ -139,9 +148,8 @@ func mergeOverlappingFilesInternal(
 	}()
 
 	writer := NewWriterBuilder().
-		SetMemorySizeLimit(memSizeLimit).
+		SetMemorySizeLimit(defaultOneWriterMemSizeLimit).
 		SetBlockSize(blockSize).
-		SetWriterBatchCount(writeBatchCount).
 		SetOnCloseFunc(onClose).
BuildOneFile(store, newFilePrefix, writerID) err = writer.Init(ctx, partSize) diff --git a/br/pkg/lightning/backend/external/merge_test.go b/br/pkg/lightning/backend/external/merge_test.go index f66fb2d1ec838..c2be6fe6689f4 100644 --- a/br/pkg/lightning/backend/external/merge_test.go +++ b/br/pkg/lightning/backend/external/merge_test.go @@ -22,7 +22,7 @@ import ( ) func TestSplitDataFiles(t *testing.T) { - allPaths := make([]string, 0, 100) + allPaths := make([]string, 0, 110) for i := 0; i < cap(allPaths); i++ { allPaths = append(allPaths, fmt.Sprintf("%d", i)) } @@ -101,4 +101,25 @@ func TestSplitDataFiles(t *testing.T) { require.Equal(t, c.result, result) }) } + + bak := MaxMergingFilesPerThread + t.Cleanup(func() { + MaxMergingFilesPerThread = bak + }) + MaxMergingFilesPerThread = 10 + require.Equal(t, [][]string{ + allPaths[:10], allPaths[10:19], allPaths[19:28], allPaths[28:37], + allPaths[37:46], allPaths[46:55], allPaths[55:64], allPaths[64:73], + allPaths[73:82], allPaths[82:91], + }, splitDataFiles(allPaths[:91], 8)) + require.Equal(t, [][]string{ + allPaths[:10], allPaths[10:20], allPaths[20:30], allPaths[30:40], + allPaths[40:50], allPaths[50:60], allPaths[60:70], allPaths[70:80], + allPaths[80:90], allPaths[90:99], + }, splitDataFiles(allPaths[:99], 8)) + require.Equal(t, [][]string{ + allPaths[:10], allPaths[10:20], allPaths[20:29], allPaths[29:38], + allPaths[38:47], allPaths[47:56], allPaths[56:65], allPaths[65:74], + allPaths[74:83], allPaths[83:92], allPaths[92:101], + }, splitDataFiles(allPaths[:101], 8)) } diff --git a/br/pkg/lightning/backend/external/onefile_writer.go b/br/pkg/lightning/backend/external/onefile_writer.go index 858f4c0a78ae5..57b0c5517149d 100644 --- a/br/pkg/lightning/backend/external/onefile_writer.go +++ b/br/pkg/lightning/backend/external/onefile_writer.go @@ -20,15 +20,20 @@ import ( "path/filepath" "slices" + "github.com/docker/go-units" "github.com/pingcap/errors" "github.com/pingcap/tidb/br/pkg/membuf" "github.com/pingcap/tidb/br/pkg/storage" tidbkv "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/util/logutil" - "github.com/pingcap/tidb/pkg/util/size" "go.uber.org/zap" ) +// defaultOneWriterMemSizeLimit is the memory size limit for one writer. OneWriter can write +// data in stream, this memory limit is only used to avoid allocating too many times +// for each KV pair. +var defaultOneWriterMemSizeLimit uint64 = 128 * units.MiB + // OneFileWriter is used to write data into external storage // with only one file for data and stat. 
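To make the memory budget in the mergeOverlappingFilesInternal comment above concrete, here is a small arithmetic sketch. The 4 MiB per-connection figure is the hedged HTTP/2-on-GCS guess from that comment, not a measured constant, and 25.6 MiB is the data-KV part size it cites:

package main

import "fmt"

func main() {
	const (
		oneWriterMemLimitMiB = 128.0         // defaultOneWriterMemSizeLimit
		filesPerThread       = 250.0         // MaxMergingFilesPerThread
		readBufMiB           = 64.0 / 1024.0 // defaultReadBufferSize = 64 KiB
		perConnMiB           = 4.0           // assumed cost of one HTTP/2 read connection
		uploadWorkers        = 8.0           // maxUploadWorkersPerThread
		dataPartMiB          = 25.6          // max data-KV part size (1024*250/10000)
		statPartMiB          = 5.0           // MinUploadPartSize for the stat file
	)
	total := oneWriterMemLimitMiB +
		filesPerThread*(perConnMiB+readBufMiB) +
		uploadWorkers*(dataPartMiB+statPartMiB)
	fmt.Printf("estimated per-thread merge memory: %.1f MiB (~%.2f GiB)\n", total, total/1024)
	// prints ~1388.4 MiB (~1.36 GiB), matching the figure in the comment above
}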
type OneFileWriter struct { @@ -64,12 +69,16 @@ func (w *OneFileWriter) initWriter(ctx context.Context, partSize int64) ( err error, ) { w.dataFile = filepath.Join(w.filenamePrefix, "one-file") - w.dataWriter, err = w.store.Create(ctx, w.dataFile, &storage.WriterOption{Concurrency: 20, PartSize: partSize}) + w.dataWriter, err = w.store.Create(ctx, w.dataFile, &storage.WriterOption{ + Concurrency: maxUploadWorkersPerThread, + PartSize: partSize}) if err != nil { return err } w.statFile = filepath.Join(w.filenamePrefix+statSuffix, "one-file") - w.statWriter, err = w.store.Create(ctx, w.statFile, &storage.WriterOption{Concurrency: 20, PartSize: int64(5 * size.MB)}) + w.statWriter, err = w.store.Create(ctx, w.statFile, &storage.WriterOption{ + Concurrency: maxUploadWorkersPerThread, + PartSize: MinUploadPartSize}) if err != nil { w.logger.Info("create stat writer failed", zap.Error(err)) diff --git a/br/pkg/lightning/backend/external/onefile_writer_test.go b/br/pkg/lightning/backend/external/onefile_writer_test.go index d55a46309c58f..2a4ddce61f281 100644 --- a/br/pkg/lightning/backend/external/onefile_writer_test.go +++ b/br/pkg/lightning/backend/external/onefile_writer_test.go @@ -194,18 +194,22 @@ func TestMergeOverlappingFilesInternal(t *testing.T) { require.NoError(t, writer.WriteRow(ctx, key, val, dbkv.IntHandle(i))) } require.NoError(t, writer.Close(ctx)) - + readBufSizeBak := defaultReadBufferSize + memLimitBak := defaultOneWriterMemSizeLimit + t.Cleanup(func() { + defaultReadBufferSize = readBufSizeBak + defaultOneWriterMemSizeLimit = memLimitBak + }) + defaultReadBufferSize = 100 + defaultOneWriterMemSizeLimit = 1000 require.NoError(t, mergeOverlappingFilesInternal( ctx, []string{"/test/0/0", "/test/0/1", "/test/0/2", "/test/0/3", "/test/0/4"}, memStore, int64(5*size.MB), - 100, "/test2", "mergeID", 1000, - 1000, - 8*1024, nil, true, )) @@ -297,17 +301,22 @@ func TestOnefileWriterManyRows(t *testing.T) { onClose := func(summary *WriterSummary) { resSummary = summary } + readBufSizeBak := defaultReadBufferSize + memLimitBak := defaultOneWriterMemSizeLimit + t.Cleanup(func() { + defaultReadBufferSize = readBufSizeBak + defaultOneWriterMemSizeLimit = memLimitBak + }) + defaultReadBufferSize = 100 + defaultOneWriterMemSizeLimit = 1000 require.NoError(t, mergeOverlappingFilesInternal( ctx, []string{"/test/0/one-file"}, memStore, int64(5*size.MB), - 100, "/test2", "mergeID", 1000, - 1000, - 8*1024, onClose, true, )) diff --git a/br/pkg/lightning/backend/external/sort_test.go b/br/pkg/lightning/backend/external/sort_test.go index 64e6b14d24432..b4a5c8e7d7db9 100644 --- a/br/pkg/lightning/backend/external/sort_test.go +++ b/br/pkg/lightning/backend/external/sort_test.go @@ -161,17 +161,22 @@ func TestGlobalSortLocalWithMerge(t *testing.T) { mergeMemSize := (rand.Intn(10) + 1) * 100 // use random mergeMemSize to test different memLimit of writer. 
 	// reproduce one bug, see https://github.com/pingcap/tidb/issues/49590
+	bufSizeBak := defaultReadBufferSize
+	memLimitBak := defaultOneWriterMemSizeLimit
+	t.Cleanup(func() {
+		defaultReadBufferSize = bufSizeBak
+		defaultOneWriterMemSizeLimit = memLimitBak
+	})
+	defaultReadBufferSize = 100
+	defaultOneWriterMemSizeLimit = uint64(mergeMemSize)
 	for _, group := range dataGroup {
 		require.NoError(t, MergeOverlappingFiles(
 			ctx,
 			group,
 			memStore,
 			int64(5*size.MB),
-			100,
 			"/test2",
 			mergeMemSize,
-			uint64(mergeMemSize),
-			8*1024,
 			closeFn,
 			1,
 			true,
diff --git a/br/pkg/lightning/backend/external/split_test.go b/br/pkg/lightning/backend/external/split_test.go
index 9141f07f534d8..1be37a755b847 100644
--- a/br/pkg/lightning/backend/external/split_test.go
+++ b/br/pkg/lightning/backend/external/split_test.go
@@ -386,7 +386,6 @@ func Test3KFilesRangeSplitter(t *testing.T) {
 	w := NewWriterBuilder().
 		SetMemorySizeLimit(DefaultMemSizeLimit).
 		SetBlockSize(32*units.MiB). // dataKVGroupBlockSize
-		SetWriterBatchCount(8*1024).
 		SetPropKeysDistance(8*1024).
 		SetPropSizeDistance(size.MB).
 		SetOnCloseFunc(onClose).
diff --git a/br/pkg/lightning/backend/external/writer.go b/br/pkg/lightning/backend/external/writer.go
index 5f6d0c77948be..48370e5952a00 100644
--- a/br/pkg/lightning/backend/external/writer.go
+++ b/br/pkg/lightning/backend/external/writer.go
@@ -44,6 +44,11 @@ var (
 	multiFileStatNum           = 500
 	defaultPropSizeDist        = 1 * size.MB
 	defaultPropKeysDist uint64 = 8 * 1024
+	// Tested on GCP 16c/32c nodes: 32~64 workers used up all network bandwidth for
+	// part sizes in the 5~20MiB range, but not all threads upload at the same time.
+	// This value might not be optimal.
+	// TODO: need data on AWS and other machine types.
+	maxUploadWorkersPerThread = 8
 
 	// MergeSortOverlapThreshold is the threshold of overlap between sorted kv files.
 	// if the overlap ratio is greater than this threshold, we will merge the files.
@@ -106,7 +111,6 @@ type WriterBuilder struct {
 	groupOffset     int
 	memSizeLimit    uint64
 	blockSize       int
-	writeBatchCount uint64
 	propSizeDist    uint64
 	propKeysDist    uint64
 	onClose         OnCloseFunc
@@ -116,12 +120,11 @@ type WriterBuilder struct {
 // NewWriterBuilder creates a WriterBuilder.
 func NewWriterBuilder() *WriterBuilder {
 	return &WriterBuilder{
-		memSizeLimit:    DefaultMemSizeLimit,
-		blockSize:       DefaultBlockSize,
-		writeBatchCount: 8 * 1024,
-		propSizeDist:    defaultPropSizeDist,
-		propKeysDist:    defaultPropKeysDist,
-		onClose:         dummyOnCloseFunc,
+		memSizeLimit: DefaultMemSizeLimit,
+		blockSize:    DefaultBlockSize,
+		propSizeDist: defaultPropSizeDist,
+		propKeysDist: defaultPropKeysDist,
+		onClose:      dummyOnCloseFunc,
 	}
 }
 
@@ -134,12 +137,6 @@ func (b *WriterBuilder) SetMemorySizeLimit(size uint64) *WriterBuilder {
 	return b
 }
 
-// SetWriterBatchCount sets the batch count of the writer.
-func (b *WriterBuilder) SetWriterBatchCount(count uint64) *WriterBuilder {
-	b.writeBatchCount = count
-	return b
-}
-
 // SetPropSizeDistance sets the distance of range size for each property.
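Because the read buffer size and one-file writer memory limit are now package-level defaults instead of parameters, the tests above (onefile_writer_test.go, sort_test.go) override them and restore the originals via t.Cleanup. A tiny generic helper showing the same save/override/restore pattern; it is purely illustrative and not part of this PR, and would live in a _test.go file of the external package:

package external

import "testing"

// overrideForTest swaps a package-level knob for the duration of one test and
// restores the old value when the test (and its subtests) finish.
func overrideForTest[T any](t *testing.T, target *T, value T) {
	t.Helper()
	old := *target
	t.Cleanup(func() { *target = old })
	*target = value
}

// usage inside a test, assuming the package-level vars introduced by this PR:
//	overrideForTest(t, &defaultReadBufferSize, 100)
//	overrideForTest(t, &defaultOneWriterMemSizeLimit, uint64(1000))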
func (b *WriterBuilder) SetPropSizeDistance(dist uint64) *WriterBuilder { b.propSizeDist = dist @@ -594,12 +591,18 @@ func (w *Writer) createStorageWriter(ctx context.Context) ( err error, ) { dataPath := filepath.Join(w.filenamePrefix, strconv.Itoa(w.currentSeq)) - dataWriter, err := w.store.Create(ctx, dataPath, &storage.WriterOption{Concurrency: 20, PartSize: (int64)(5 * size.MB)}) + dataWriter, err := w.store.Create(ctx, dataPath, &storage.WriterOption{ + Concurrency: 20, + PartSize: MinUploadPartSize, + }) if err != nil { return "", "", nil, nil, err } statPath := filepath.Join(w.filenamePrefix+statSuffix, strconv.Itoa(w.currentSeq)) - statsWriter, err := w.store.Create(ctx, statPath, &storage.WriterOption{Concurrency: 20, PartSize: (int64)(5 * size.MB)}) + statsWriter, err := w.store.Create(ctx, statPath, &storage.WriterOption{ + Concurrency: 20, + PartSize: MinUploadPartSize, + }) if err != nil { _ = dataWriter.Close(ctx) return "", "", nil, nil, err diff --git a/br/pkg/lightning/backend/external/writer_test.go b/br/pkg/lightning/backend/external/writer_test.go index eaa6180d41066..bc4f8b504dce5 100644 --- a/br/pkg/lightning/backend/external/writer_test.go +++ b/br/pkg/lightning/backend/external/writer_test.go @@ -80,7 +80,6 @@ func mergeOverlappingFilesImpl(ctx context.Context, SetMemorySizeLimit(memSizeLimit). SetBlockSize(blockSize). SetOnCloseFunc(onClose). - SetWriterBatchCount(writeBatchCount). SetPropSizeDistance(propSizeDist). SetPropKeysDistance(propKeysDist). Build(store, newFilePrefix, writerID) diff --git a/br/pkg/storage/gcs_extra.go b/br/pkg/storage/gcs_extra.go index 590beb9188bc3..f0961dbd89602 100644 --- a/br/pkg/storage/gcs_extra.go +++ b/br/pkg/storage/gcs_extra.go @@ -141,26 +141,31 @@ func (w *GCSWriter) readChunk(ch chan chunk) { break } - select { - case <-w.ctx.Done(): - data.cleanup() - w.err.CompareAndSwap(nil, w.ctx.Err()) - default: - part := &xmlMPUPart{ - uploadBase: w.uploadBase, - uploadID: w.uploadID, - buf: data.buf, - partNumber: data.num, - } - if w.err.Load() == nil { - if err := part.Upload(); err != nil { - w.err.Store(err) + func() { + activeUploadWorkerCnt.Add(1) + defer activeUploadWorkerCnt.Add(-1) + + select { + case <-w.ctx.Done(): + data.cleanup() + w.err.CompareAndSwap(nil, w.ctx.Err()) + default: + part := &xmlMPUPart{ + uploadBase: w.uploadBase, + uploadID: w.uploadID, + buf: data.buf, + partNumber: data.num, + } + if w.err.Load() == nil { + if err := part.Upload(); err != nil { + w.err.Store(err) + } } + part.buf = nil + w.appendMPUPart(part) + data.cleanup() } - part.buf = nil - w.appendMPUPart(part) - data.cleanup() - } + }() } } diff --git a/br/pkg/storage/helper.go b/br/pkg/storage/helper.go index e93292a3a6191..ae88ec9ae9d6c 100644 --- a/br/pkg/storage/helper.go +++ b/br/pkg/storage/helper.go @@ -4,6 +4,7 @@ package storage import ( "context" + "sync/atomic" "github.com/pingcap/tidb/pkg/sessionctx/variable" ) @@ -27,3 +28,12 @@ func ValidateCloudStorageURI(ctx context.Context, uri string) error { }) return err } + +// activeUploadWorkerCnt is the active upload worker count, it only works for GCS. +// For S3, we cannot get it. +var activeUploadWorkerCnt atomic.Int64 + +// GetActiveUploadWorkerCount returns the active upload worker count. 
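The gcs_extra.go change above wraps each part upload in a closure so the new activeUploadWorkerCnt counter is held for exactly the duration of one upload. The generic pattern, sketched standalone (the Sleep stands in for the real part upload):

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

var activeWorkers atomic.Int64 // analogous to activeUploadWorkerCnt

func uploadPart() {
	activeWorkers.Add(1)
	defer activeWorkers.Add(-1) // decremented even if the upload fails
	time.Sleep(50 * time.Millisecond)
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() { defer wg.Done(); uploadPart() }()
	}
	time.Sleep(10 * time.Millisecond)
	fmt.Println("active upload workers:", activeWorkers.Load()) // very likely 4
	wg.Wait()
}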
+func GetActiveUploadWorkerCount() int64 { + return activeUploadWorkerCnt.Load() +} diff --git a/pkg/ddl/backfilling_merge_sort.go b/pkg/ddl/backfilling_merge_sort.go index 8378f59559ce4..94095c8ef29d7 100644 --- a/pkg/ddl/backfilling_merge_sort.go +++ b/pkg/ddl/backfilling_merge_sort.go @@ -95,11 +95,8 @@ func (m *mergeSortExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta sm.DataFiles, store, int64(partSize), - 64*1024, prefix, external.DefaultBlockSize, - external.DefaultMemSizeLimit, - 8*1024, onClose, int(variable.GetDDLReorgWorkerCounter()), true) } diff --git a/pkg/disttask/framework/taskexecutor/BUILD.bazel b/pkg/disttask/framework/taskexecutor/BUILD.bazel index ec3257162530f..47a76a2fca0f9 100644 --- a/pkg/disttask/framework/taskexecutor/BUILD.bazel +++ b/pkg/disttask/framework/taskexecutor/BUILD.bazel @@ -14,6 +14,7 @@ go_library( deps = [ "//br/pkg/lightning/common", "//br/pkg/lightning/log", + "//br/pkg/storage", "//pkg/config", "//pkg/disttask/framework/handle", "//pkg/disttask/framework/proto", diff --git a/pkg/disttask/framework/taskexecutor/manager.go b/pkg/disttask/framework/taskexecutor/manager.go index 9aa2948936ee8..460bcb8d0ec2e 100644 --- a/pkg/disttask/framework/taskexecutor/manager.go +++ b/pkg/disttask/framework/taskexecutor/manager.go @@ -22,6 +22,7 @@ import ( "github.com/docker/go-units" "github.com/pingcap/errors" "github.com/pingcap/log" + litstorage "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/pkg/config" "github.com/pingcap/tidb/pkg/disttask/framework/handle" "github.com/pingcap/tidb/pkg/disttask/framework/proto" @@ -184,6 +185,7 @@ func (m *Manager) handleTasksLoop() { // service scope might change, so we call WithLabelValues every time. metrics.DistTaskUsedSlotsGauge.WithLabelValues(variable.ServiceScope.Load()). Set(float64(m.slotManager.usedSlots())) + metrics.GlobalSortUploadWorkerCount.Set(float64(litstorage.GetActiveUploadWorkerCount())) } } diff --git a/pkg/disttask/importinto/task_executor.go b/pkg/disttask/importinto/task_executor.go index caef4fca38f95..1d46293f08153 100644 --- a/pkg/disttask/importinto/task_executor.go +++ b/pkg/disttask/importinto/task_executor.go @@ -276,8 +276,11 @@ type mergeSortStepExecutor struct { // subtask of a task is run in serial now, so we don't need lock here. // change to SyncMap when we support parallel subtask in the future. subtaskSortedKVMeta *external.SortedKVMeta - partSize int64 - resource *proto.StepResource + // part-size for uploading merged files, it's calculated by: + // max(max-merged-files * max-file-size / max-part-num(10000), min-part-size) + dataKVPartSize int64 + indexKVPartSize int64 + resource *proto.StepResource } var _ execute.StepExecutor = &mergeSortStepExecutor{} @@ -291,14 +294,14 @@ func (m *mergeSortStepExecutor) Init(ctx context.Context) error { return err } m.controller = controller - // calculate part size of total-file-size / 10000, where 10000 = max part num - // TODO this 'min' is to simulate previous getWriterMemorySizeLimit logic - // to avoid breaks code in external package, there are too many magic numbers - // there, we can refine this later. - // TODO: maybe have a different part-size for index/data kv group. 
- _, perIndexKVMemSizePerCon := getWriterMemorySizeLimit(m.resource, &m.taskMeta.Plan) - perIndexKVMemSizePerCon = min(perIndexKVMemSizePerCon, external.DefaultMemSizeLimit) - m.partSize = int64(perIndexKVMemSizePerCon / 10000 * uint64(external.MergeSortOverlapThreshold)) + dataKVMemSizePerCon, perIndexKVMemSizePerCon := getWriterMemorySizeLimit(m.resource, &m.taskMeta.Plan) + m.dataKVPartSize = max(external.MinUploadPartSize, int64(dataKVMemSizePerCon*uint64(external.MaxMergingFilesPerThread)/10000)) + m.indexKVPartSize = max(external.MinUploadPartSize, int64(perIndexKVMemSizePerCon*uint64(external.MaxMergingFilesPerThread)/10000)) + + m.logger.Info("merge sort partSize", + zap.String("data-kv", units.BytesSize(float64(m.dataKVPartSize))), + zap.String("index-kv", units.BytesSize(float64(m.indexKVPartSize))), + ) return nil } @@ -324,18 +327,17 @@ func (m *mergeSortStepExecutor) RunSubtask(ctx context.Context, subtask *proto.S prefix := subtaskPrefix(m.taskID, subtask.ID) - logger.Info("merge sort partSize", zap.String("size", units.BytesSize(float64(m.partSize)))) - + partSize := m.dataKVPartSize + if sm.KVGroup != dataKVGroup { + partSize = m.indexKVPartSize + } err = external.MergeOverlappingFiles( logutil.WithFields(ctx, zap.String("kv-group", sm.KVGroup), zap.Int64("subtask-id", subtask.ID)), sm.DataFiles, m.controller.GlobalSortStore, - m.partSize, - 64*1024, + partSize, prefix, getKVGroupBlockSize(sm.KVGroup), - external.DefaultMemSizeLimit, - 8*1024, onClose, m.taskMeta.Plan.ThreadCnt, false) diff --git a/pkg/metrics/globalsort.go b/pkg/metrics/globalsort.go index 51d7210fcb440..71ecab3bd36d3 100644 --- a/pkg/metrics/globalsort.go +++ b/pkg/metrics/globalsort.go @@ -27,6 +27,8 @@ var ( GlobalSortReadFromCloudStorageRate *prometheus.HistogramVec // GlobalSortIngestWorkerCnt records the working number of ingest workers. GlobalSortIngestWorkerCnt *prometheus.GaugeVec + // GlobalSortUploadWorkerCount is the gauge of active parallel upload worker count. + GlobalSortUploadWorkerCount prometheus.Gauge ) // InitGlobalSortMetrics initializes defines global sort metrics. 
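The Init change above derives a separate upload part size per KV group from the writer memory budget: partSize = max(MinUploadPartSize, memPerConcurrency * MaxMergingFilesPerThread / 10000). A quick worked sketch; the 1 GiB and 128 MiB inputs are illustrative assumptions, not values computed by getWriterMemorySizeLimit:

package main

import (
	"fmt"

	"github.com/docker/go-units"
)

const (
	minUploadPartSize        = 5 * units.MiB // MinUploadPartSize
	maxMergingFilesPerThread = 250           // MaxMergingFilesPerThread
	maxPartNum               = 10000         // multipart upload part-count limit (10000 on S3)
)

func partSizeFor(memPerConcurrency uint64) int64 {
	return max(minUploadPartSize, int64(memPerConcurrency*maxMergingFilesPerThread/maxPartNum))
}

func main() {
	fmt.Println(partSizeFor(1 * units.GiB))   // data KV:  26843545 bytes, ~25.6 MiB
	fmt.Println(partSizeFor(128 * units.MiB)) // index KV: ~3.2 MiB, clamped up to 5242880 (5 MiB)
}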
@@ -69,4 +71,13 @@ func InitGlobalSortMetrics() { Name: "ingest_worker_cnt", Help: "ingest worker cnt", }, []string{LblType}) + + GlobalSortUploadWorkerCount = NewGauge( + prometheus.GaugeOpts{ + Namespace: "tidb", + Subsystem: "global_sort", + Name: "upload_worker_cnt", + Help: "Gauge of active parallel upload worker count.", + }, + ) } diff --git a/pkg/metrics/grafana/tidb.json b/pkg/metrics/grafana/tidb.json index 1838ada2fd85e..4752e2aa582e9 100644 --- a/pkg/metrics/grafana/tidb.json +++ b/pkg/metrics/grafana/tidb.json @@ -15862,7 +15862,7 @@ }, { "editorMode": "code", - "expr": "tidb_server_maxprocs{k8s_cluster=\"$k8s_cluster\", tidb_cluster=~\"$tidb_cluster.*\", instance=~\"$instance\", component=\"tidb\"}", + "expr": "tidb_server_maxprocs{k8s_cluster=\"$k8s_cluster\", tidb_cluster=~\"$tidb_cluster.*\", instance=~\"$instance\", job=\"tidb\"}", "hide": false, "legendFormat": "Capacity - {{instance}}", "range": true, @@ -23937,6 +23937,93 @@ "yaxis": { "align": false } + }, + { + "datasource": "${DS_TEST-CLUSTER}", + "description": "", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 6, + "w": 12, + "x": 12, + "y": 105 + }, + "id": 336, + "options": { + "legend": { + "calcs": [ + "min", + "max", + "mean" + ], + "displayMode": "table", + "placement": "right" + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "editorMode": "code", + "expr": "tidb_global_sort_upload_worker_cnt{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}", + "legendFormat": "{{instance}}", + "range": true, + "refId": "A" + } + ], + "title": "Parallel upload worker count", + "type": "timeseries" } ], "title": "Global Sort", diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index 8d1530c3eb080..31588c2660b3b 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -276,6 +276,7 @@ func RegisterMetrics() { prometheus.MustRegister(GlobalSortReadFromCloudStorageDuration) prometheus.MustRegister(GlobalSortReadFromCloudStorageRate) prometheus.MustRegister(GlobalSortIngestWorkerCnt) + prometheus.MustRegister(GlobalSortUploadWorkerCount) prometheus.MustRegister(AddIndexScanRate) prometheus.MustRegister(BindingCacheHitCounter)
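For callers, the net effect of this PR is a slimmer MergeOverlappingFiles signature: the read buffer size, writer memory limit and write batch count are now package-level defaults, leaving only the part size, prefix, block size, callback, concurrency and hotspot flag. A hedged sketch of a call site after the change; variable names are illustrative, see the updated callers in backfilling_merge_sort.go and task_executor.go above for the real ones:

package merging

import (
	"context"

	"github.com/pingcap/tidb/br/pkg/lightning/backend/external"
	"github.com/pingcap/tidb/br/pkg/storage"
)

// mergeSortedKVFiles shows the post-PR call shape only.
func mergeSortedKVFiles(
	ctx context.Context,
	dataFiles []string,
	store storage.ExternalStorage,
	partSize int64,
	prefix string,
	onClose external.OnCloseFunc,
	concurrency int,
) error {
	return external.MergeOverlappingFiles(
		ctx,
		dataFiles, // sorted-but-overlapping KV files produced by the encode step
		store,
		partSize, // >= external.MinUploadPartSize; read buffer and writer memory are internal defaults now
		prefix,
		external.DefaultBlockSize,
		onClose,
		concurrency,
		false, // checkHotspot: true for add-index, false for the IMPORT INTO merge step
	)
}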