Skip to content

Commit

Permalink
globalsort: split files evenly in MergeOverlappingFiles (#51878)
Browse files Browse the repository at this point in the history
ref #50752
  • Loading branch information
D3Hunter authored Mar 19, 2024
1 parent 9b6db9d commit 61c4714
Show file tree
Hide file tree
Showing 10 changed files with 151 additions and 42 deletions.
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/external/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ go_test(
"engine_test.go",
"file_test.go",
"iter_test.go",
"merge_test.go",
"misc_bench_test.go",
"onefile_writer_test.go",
"reader_test.go",
Expand Down
2 changes: 0 additions & 2 deletions br/pkg/lightning/backend/external/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,8 +505,6 @@ func mergeStep(t *testing.T, s *mergeTestSuite) {
DefaultBlockSize,
DefaultMemSizeLimit,
8*1024,
1*size.MB,
8*1024,
onClose,
s.concurrency,
s.mergeIterHotspot,
Expand Down
47 changes: 27 additions & 20 deletions br/pkg/lightning/backend/external/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,11 @@ func MergeOverlappingFiles(
blockSize int,
memSizeLimit uint64,
writeBatchCount uint64,
propSizeDist uint64,
propKeysDist uint64,
onClose OnCloseFunc,
concurrency int,
checkHotspot bool,
) error {
var dataFilesSlice [][]string
batchCount := 1
if len(paths) > concurrency {
batchCount = len(paths) / concurrency
}
for i := 0; i < len(paths); i += batchCount {
end := i + batchCount
if end > len(paths) {
end = len(paths)
}
dataFilesSlice = append(dataFilesSlice, paths[i:end])
}
dataFilesSlice := splitDataFiles(paths, concurrency)

logutil.Logger(ctx).Info("start to merge overlapping files",
zap.Int("file-count", len(paths)),
Expand All @@ -64,8 +51,6 @@ func MergeOverlappingFiles(
memSizeLimit,
blockSize,
writeBatchCount,
propSizeDist,
propKeysDist,
onClose,
checkHotspot,
)
Expand All @@ -74,6 +59,32 @@ func MergeOverlappingFiles(
return eg.Wait()
}

// splitDataFiles partitions paths into contiguous batches of near-equal size,
// one batch per merge worker.
//
// At most 'concurrency' batches are produced. When there are fewer than
// 2*concurrency files, the number of batches is reduced to len(paths)/2 (but
// never below 1) so that every batch merges at least 2 files whenever
// possible. Batch sizes differ by at most one, with the larger batches first.
//
// A non-positive concurrency is treated as 1 rather than panicking with a
// division by zero or a negative slice capacity. Empty paths yields an empty,
// non-nil result. The returned batches are sub-slices of paths and share its
// backing array.
func splitDataFiles(paths []string, concurrency int) [][]string {
	// A zero or negative concurrency would make 'shares' zero/negative below,
	// which panics in the division or in make; fall back to a single worker.
	concurrency = max(1, concurrency)

	shares := concurrency
	if len(paths) < 2*concurrency {
		// not enough files to give every worker 2 of them, use fewer batches.
		shares = max(1, len(paths)/2)
	}

	dataFilesSlice := make([][]string, 0, shares)
	batchCount := len(paths) / shares
	remainder := len(paths) % shares
	for start := 0; start < len(paths); {
		end := start + batchCount
		// the first 'remainder' batches take one extra file each, so the
		// batch sizes sum to exactly len(paths) and 'end' never overshoots.
		if remainder > 0 {
			end++
			remainder--
		}
		dataFilesSlice = append(dataFilesSlice, paths[start:end])
		start = end
	}
	return dataFilesSlice
}

// mergeOverlappingFilesInternal reads from given files whose key ranges may overlap
// and writes to one new sorted, nonoverlapping file.
// since some memory is taken by libraries, such as HTTP2, that we cannot calculate
Expand Down Expand Up @@ -104,8 +115,6 @@ func mergeOverlappingFilesInternal(
memSizeLimit uint64,
blockSize int,
writeBatchCount uint64,
propSizeDist uint64,
propKeysDist uint64,
onClose OnCloseFunc,
checkHotspot bool,
) (err error) {
Expand Down Expand Up @@ -133,8 +142,6 @@ func mergeOverlappingFilesInternal(
SetMemorySizeLimit(memSizeLimit).
SetBlockSize(blockSize).
SetWriterBatchCount(writeBatchCount).
SetPropKeysDistance(propKeysDist).
SetPropSizeDistance(propSizeDist).
SetOnCloseFunc(onClose).
BuildOneFile(store, newFilePrefix, writerID)
err = writer.Init(ctx, partSize)
Expand Down
104 changes: 104 additions & 0 deletions br/pkg/lightning/backend/external/merge_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package external

import (
"fmt"
"testing"

"github.com/stretchr/testify/require"
)

func TestSplitDataFiles(t *testing.T) {
allPaths := make([]string, 0, 100)
for i := 0; i < cap(allPaths); i++ {
allPaths = append(allPaths, fmt.Sprintf("%d", i))
}
cases := []struct {
paths []string
concurrency int
result [][]string
}{
{
paths: allPaths[:1],
concurrency: 1,
result: [][]string{allPaths[:1]},
},
{
paths: allPaths[:2],
concurrency: 1,
result: [][]string{allPaths[:2]},
},
{
paths: allPaths[:2],
concurrency: 4,
result: [][]string{allPaths[:2]},
},
{
paths: allPaths[:3],
concurrency: 4,
result: [][]string{allPaths[:3]},
},
{
paths: allPaths[:4],
concurrency: 4,
result: [][]string{allPaths[:2], allPaths[2:4]},
},
{
paths: allPaths[:5],
concurrency: 4,
result: [][]string{allPaths[:3], allPaths[3:5]},
},
{
paths: allPaths[:6],
concurrency: 4,
result: [][]string{allPaths[:2], allPaths[2:4], allPaths[4:6]},
},
{
paths: allPaths[:7],
concurrency: 4,
result: [][]string{allPaths[:3], allPaths[3:5], allPaths[5:7]},
},
{
paths: allPaths[:15],
concurrency: 4,
result: [][]string{allPaths[:4], allPaths[4:8], allPaths[8:12], allPaths[12:15]},
},
{
paths: allPaths[:83],
concurrency: 4,
result: [][]string{allPaths[:21], allPaths[21:42], allPaths[42:63], allPaths[63:83]},
},
{
paths: allPaths[:100],
concurrency: 4,
result: [][]string{allPaths[:25], allPaths[25:50], allPaths[50:75], allPaths[75:100]},
},
{
paths: allPaths[:100],
concurrency: 8,
result: [][]string{
allPaths[:13], allPaths[13:26], allPaths[26:39], allPaths[39:52],
allPaths[52:64], allPaths[64:76], allPaths[76:88], allPaths[88:100],
},
},
}
for i, c := range cases {
t.Run(fmt.Sprintf("case-%d", i), func(t *testing.T) {
result := splitDataFiles(c.paths, c.concurrency)
require.Equal(t, c.result, result)
})
}
}
8 changes: 2 additions & 6 deletions br/pkg/lightning/backend/external/onefile_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,14 +172,14 @@ func checkOneFileWriterStatWithDistance(t *testing.T, kvCnt int, keysDistance ui
}

func TestMergeOverlappingFilesInternal(t *testing.T) {
changePropDist(t, defaultPropSizeDist, 2)
// 1. Write to 5 files.
// 2. merge 5 files into one file.
// 3. read one file and check result.
// 4. check duplicate key.
ctx := context.Background()
memStore := storage.NewMemStorage()
writer := NewWriterBuilder().
SetPropKeysDistance(2).
SetMemorySizeLimit(1000).
SetKeyDuplicationEncoding(true).
Build(memStore, "/test", "0")
Expand All @@ -206,8 +206,6 @@ func TestMergeOverlappingFilesInternal(t *testing.T) {
1000,
1000,
8*1024,
1*size.MB,
2,
nil,
true,
))
Expand Down Expand Up @@ -256,14 +254,14 @@ func TestMergeOverlappingFilesInternal(t *testing.T) {
}

func TestOnefileWriterManyRows(t *testing.T) {
changePropDist(t, defaultPropSizeDist, 2)
// 1. write into one file with sorted order.
// 2. merge one file.
// 3. read kv file and check the result.
// 4. check the writeSummary.
ctx := context.Background()
memStore := storage.NewMemStorage()
writer := NewWriterBuilder().
SetPropKeysDistance(2).
SetMemorySizeLimit(1000).
BuildOneFile(memStore, "/test", "0")

Expand Down Expand Up @@ -310,8 +308,6 @@ func TestOnefileWriterManyRows(t *testing.T) {
1000,
1000,
8*1024,
1*size.MB,
2,
onClose,
true,
))
Expand Down
16 changes: 12 additions & 4 deletions br/pkg/lightning/backend/external/sort_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,17 @@ import (
"golang.org/x/exp/rand"
)

// changePropDist overrides the package-level defaultPropSizeDist and
// defaultPropKeysDist with the given values for the running test, and
// registers a cleanup on t that puts the previous values back.
func changePropDist(t *testing.T, sizeDist, keysDist uint64) {
	// remember the current defaults before overwriting them.
	prevSizeDist, prevKeysDist := defaultPropSizeDist, defaultPropKeysDist
	defaultPropSizeDist, defaultPropKeysDist = sizeDist, keysDist
	t.Cleanup(func() {
		defaultPropSizeDist, defaultPropKeysDist = prevSizeDist, prevKeysDist
	})
}

func TestGlobalSortLocalBasic(t *testing.T) {
// 1. write data step
seed := time.Now().Unix()
Expand Down Expand Up @@ -90,6 +101,7 @@ func TestGlobalSortLocalBasic(t *testing.T) {
}

func TestGlobalSortLocalWithMerge(t *testing.T) {
changePropDist(t, 100, 2)
// 1. write data step
seed := time.Now().Unix()
rand.Seed(uint64(seed))
Expand All @@ -99,8 +111,6 @@ func TestGlobalSortLocalWithMerge(t *testing.T) {
memSizeLimit := (rand.Intn(10) + 1) * 400

w := NewWriterBuilder().
SetPropSizeDistance(100).
SetPropKeysDistance(2).
SetMemorySizeLimit(uint64(memSizeLimit)).
SetBlockSize(memSizeLimit).
Build(memStore, "/test", "0")
Expand Down Expand Up @@ -162,8 +172,6 @@ func TestGlobalSortLocalWithMerge(t *testing.T) {
mergeMemSize,
uint64(mergeMemSize),
8*1024,
100,
2,
closeFn,
1,
true,
Expand Down
8 changes: 5 additions & 3 deletions br/pkg/lightning/backend/external/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ import (
)

var (
multiFileStatNum = 500
multiFileStatNum = 500
defaultPropSizeDist = 1 * size.MB
defaultPropKeysDist uint64 = 8 * 1024

// MergeSortOverlapThreshold is the threshold of overlap between sorted kv files.
// If the overlap ratio is greater than this threshold, we will merge the files.
Expand Down Expand Up @@ -117,8 +119,8 @@ func NewWriterBuilder() *WriterBuilder {
memSizeLimit: DefaultMemSizeLimit,
blockSize: DefaultBlockSize,
writeBatchCount: 8 * 1024,
propSizeDist: 1 * size.MB,
propKeysDist: 8 * 1024,
propSizeDist: defaultPropSizeDist,
propKeysDist: defaultPropKeysDist,
onClose: dummyOnCloseFunc,
}
}
Expand Down
3 changes: 0 additions & 3 deletions pkg/ddl/backfilling_merge_sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/size"
)

type mergeSortExecutor struct {
Expand Down Expand Up @@ -102,8 +101,6 @@ func (m *mergeSortExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta
external.DefaultBlockSize,
external.DefaultMemSizeLimit,
8*1024,
1*size.MB,
8*1024,
onClose,
int(variable.GetDDLReorgWorkerCounter()), true)
}
Expand Down
1 change: 0 additions & 1 deletion pkg/disttask/importinto/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ go_library(
"//pkg/util/etcd",
"//pkg/util/logutil",
"//pkg/util/promutil",
"//pkg/util/size",
"//pkg/util/sqlexec",
"@com_github_docker_go_units//:go-units",
"@com_github_go_sql_driver_mysql//:mysql",
Expand Down
3 changes: 0 additions & 3 deletions pkg/disttask/importinto/task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ import (
"github.com/pingcap/tidb/pkg/meta/autoid"
"github.com/pingcap/tidb/pkg/table/tables"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/size"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
Expand Down Expand Up @@ -337,8 +336,6 @@ func (m *mergeSortStepExecutor) RunSubtask(ctx context.Context, subtask *proto.S
getKVGroupBlockSize(sm.KVGroup),
external.DefaultMemSizeLimit,
8*1024,
1*size.MB,
8*1024,
onClose,
m.taskMeta.Plan.ThreadCnt,
false)
Expand Down

0 comments on commit 61c4714

Please sign in to comment.