global sort: global sort whole process in ut (#49419)
ref #48952
ywqzzy authored Dec 14, 2023
1 parent 0110acb commit d076bc1
Showing 4 changed files with 312 additions and 83 deletions.
4 changes: 4 additions & 0 deletions br/pkg/lightning/backend/external/BUILD.bazel
@@ -15,6 +15,7 @@ go_library(
"reader.go",
"split.go",
"stat_reader.go",
"testutil.go",
"util.go",
"writer.go",
],
@@ -40,6 +41,7 @@ go_library(
"@com_github_jfcg_sorty_v2//:sorty",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//require",
"@org_golang_x_sync//errgroup",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
@@ -60,6 +62,7 @@ go_test(
"iter_test.go",
"onefile_writer_test.go",
"reader_test.go",
"sort_test.go",
"split_test.go",
"util_test.go",
"writer_test.go",
@@ -83,6 +86,7 @@ go_test(
"@com_github_aws_aws_sdk_go//service/s3",
"@com_github_cockroachdb_pebble//:pebble",
"@com_github_docker_go_units//:go-units",
"@com_github_google_uuid//:uuid",
"@com_github_jfcg_sorty_v2//:sorty",
"@com_github_johannesboyne_gofakes3//:gofakes3",
"@com_github_johannesboyne_gofakes3//backend/s3mem",
92 changes: 9 additions & 83 deletions br/pkg/lightning/backend/external/reader_test.go
@@ -18,99 +18,18 @@ import (
"bytes"
"context"
"fmt"
"math"
"slices"
"testing"
"time"

"github.com/jfcg/sorty/v2"
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/membuf"
"github.com/pingcap/tidb/br/pkg/storage"
dbkv "github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/util/size"
"github.com/stretchr/testify/require"
"golang.org/x/exp/rand"
)

func testReadAndCompare(
ctx context.Context,
t *testing.T,
kvs []common.KvPair,
store storage.ExternalStorage,
memSizeLimit int) {
datas, stats, err := GetAllFileNames(ctx, store, "")
require.NoError(t, err)

splitter, err := NewRangeSplitter(
ctx,
datas,
stats,
store,
int64(memSizeLimit), // make the group small for testing
math.MaxInt64,
4*1024*1024*1024,
math.MaxInt64,
true,
)
require.NoError(t, err)

bufPool := membuf.NewPool()
loaded := &memKVsAndBuffers{}
curStart := kvs[0].Key
kvIdx := 0

for {
endKeyOfGroup, dataFilesOfGroup, statFilesOfGroup, _, err := splitter.SplitOneRangesGroup()
require.NoError(t, err)
curEnd := endKeyOfGroup
if len(endKeyOfGroup) == 0 {
curEnd = dbkv.Key(kvs[len(kvs)-1].Key).Next()
}

err = readAllData(
ctx,
store,
dataFilesOfGroup,
statFilesOfGroup,
curStart,
curEnd,
bufPool,
loaded,
)

require.NoError(t, err)
// check kvs sorted
sorty.MaxGor = uint64(8)
sorty.Sort(len(loaded.keys), func(i, k, r, s int) bool {
if bytes.Compare(loaded.keys[i], loaded.keys[k]) < 0 { // strict comparator like < or >
if r != s {
loaded.keys[r], loaded.keys[s] = loaded.keys[s], loaded.keys[r]
loaded.values[r], loaded.values[s] = loaded.values[s], loaded.values[r]
}
return true
}
return false
})
for i, key := range loaded.keys {
require.EqualValues(t, kvs[kvIdx].Key, key)
require.EqualValues(t, kvs[kvIdx].Val, loaded.values[i])
kvIdx++
}

// release
loaded.keys = nil
loaded.values = nil
loaded.memKVBuffers = nil
copy(curStart, curEnd)

if len(endKeyOfGroup) == 0 {
break
}
}
}

func TestReadAllDataBasic(t *testing.T) {
seed := time.Now().Unix()
rand.Seed(uint64(seed))
@@ -144,7 +63,10 @@ func TestReadAllDataBasic(t *testing.T) {
return bytes.Compare(i.Key, j.Key)
})

testReadAndCompare(ctx, t, kvs, memStore, memSizeLimit)
datas, stats, err := GetAllFileNames(ctx, memStore, "")
require.NoError(t, err)

testReadAndCompare(ctx, t, kvs, memStore, datas, stats, kvs[0].Key, memSizeLimit)
}

func TestReadAllOneFile(t *testing.T) {
@@ -179,5 +101,9 @@ func TestReadAllOneFile(t *testing.T) {
slices.SortFunc(kvs, func(i, j common.KvPair) int {
return bytes.Compare(i.Key, j.Key)
})
testReadAndCompare(ctx, t, kvs, memStore, memSizeLimit)

datas, stats, err := GetAllFileNames(ctx, memStore, "")
require.NoError(t, err)

testReadAndCompare(ctx, t, kvs, memStore, datas, stats, kvs[0].Key, memSizeLimit)
}
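
The testReadAndCompare helper removed from this file now takes the data/stat file lists and the start key from the caller; its new home is the testutil.go added to the go_library above, which is not rendered on this page. A minimal sketch of what the relocated helper presumably looks like, with the extra datas, stats, and startKey parameters inferred from the updated call sites and the body carried over from the code deleted above:

// Illustrative reconstruction only, not the actual testutil.go contents.
// Lives in package external and reuses the same imports as the removed code
// above (bytes, context, math, testing, sorty, common, membuf, storage, dbkv, require).
func testReadAndCompare(
	ctx context.Context,
	t *testing.T,
	kvs []common.KvPair,
	store storage.ExternalStorage,
	datas []string,
	stats []string,
	startKey dbkv.Key,
	memSizeLimit int,
) {
	splitter, err := NewRangeSplitter(
		ctx,
		datas,
		stats,
		store,
		int64(memSizeLimit), // keep each range group small for testing
		math.MaxInt64,
		4*1024*1024*1024,
		math.MaxInt64,
		true,
	)
	require.NoError(t, err)

	bufPool := membuf.NewPool()
	loaded := &memKVsAndBuffers{}
	curStart := startKey.Clone()
	kvIdx := 0

	for {
		endKeyOfGroup, dataFilesOfGroup, statFilesOfGroup, _, err := splitter.SplitOneRangesGroup()
		require.NoError(t, err)
		curEnd := dbkv.Key(endKeyOfGroup).Clone()
		if len(endKeyOfGroup) == 0 {
			curEnd = dbkv.Key(kvs[len(kvs)-1].Key).Next()
		}

		err = readAllData(ctx, store, dataFilesOfGroup, statFilesOfGroup, curStart, curEnd, bufPool, loaded)
		require.NoError(t, err)

		// sort what was read back, then compare it against the expected kvs
		sorty.MaxGor = uint64(8)
		sorty.Sort(len(loaded.keys), func(i, k, r, s int) bool {
			if bytes.Compare(loaded.keys[i], loaded.keys[k]) < 0 { // strict comparator
				if r != s {
					loaded.keys[r], loaded.keys[s] = loaded.keys[s], loaded.keys[r]
					loaded.values[r], loaded.values[s] = loaded.values[s], loaded.values[r]
				}
				return true
			}
			return false
		})
		for i, key := range loaded.keys {
			require.EqualValues(t, kvs[kvIdx].Key, key)
			require.EqualValues(t, kvs[kvIdx].Val, loaded.values[i])
			kvIdx++
		}

		// release the loaded batch and advance to the next range group
		loaded.keys = nil
		loaded.values = nil
		loaded.memKVBuffers = nil
		curStart = curEnd

		if len(endKeyOfGroup) == 0 {
			break
		}
	}
}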
170 changes: 170 additions & 0 deletions br/pkg/lightning/backend/external/sort_test.go
@@ -0,0 +1,170 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package external

import (
"bytes"
"context"
"slices"
"testing"
"time"

"github.com/google/uuid"
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/storage"
dbkv "github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/util/size"
"github.com/stretchr/testify/require"
"golang.org/x/exp/rand"
)

func TestGlobalSortLocalBasic(t *testing.T) {
// 1. write data step
seed := time.Now().Unix()
rand.Seed(uint64(seed))
t.Logf("seed: %d", seed)
ctx := context.Background()
memStore := storage.NewMemStorage()
memSizeLimit := (rand.Intn(10) + 1) * 400
lastStepDatas := make([]string, 0, 10)
lastStepStats := make([]string, 0, 10)
var startKey, endKey dbkv.Key

closeFn := func(s *WriterSummary) {
for _, stat := range s.MultipleFilesStats {
for i := range stat.Filenames {
lastStepDatas = append(lastStepDatas, stat.Filenames[i][0])
lastStepStats = append(lastStepStats, stat.Filenames[i][1])
}
}
if len(startKey) == 0 && len(endKey) == 0 {
startKey = s.Min.Clone()
endKey = s.Max.Clone().Next()
}
startKey = BytesMin(startKey, s.Min.Clone())
endKey = BytesMax(endKey, s.Max.Clone().Next())
}

w := NewWriterBuilder().
SetPropSizeDistance(100).
SetPropKeysDistance(2).
SetMemorySizeLimit(uint64(memSizeLimit)).
SetBlockSize(memSizeLimit).
SetOnCloseFunc(closeFn).
Build(memStore, "/test", "0")

writer := NewEngineWriter(w)
kvCnt := rand.Intn(10) + 10000
kvs := make([]common.KvPair, kvCnt)
for i := 0; i < kvCnt; i++ {
kvs[i] = common.KvPair{
Key: []byte(uuid.New().String()),
Val: []byte("56789"),
}
}
slices.SortFunc(kvs, func(i, j common.KvPair) int {
return bytes.Compare(i.Key, j.Key)
})

require.NoError(t, writer.AppendRows(ctx, nil, kv.MakeRowsFromKvPairs(kvs)))
_, err := writer.Close(ctx)
require.NoError(t, err)

// 2. read and sort step
testReadAndCompare(ctx, t, kvs, memStore, lastStepDatas, lastStepStats, startKey, memSizeLimit)
}

func TestGlobalSortLocalWithMerge(t *testing.T) {
// 1. write data step
seed := time.Now().Unix()
rand.Seed(uint64(seed))
t.Logf("seed: %d", seed)
ctx := context.Background()
memStore := storage.NewMemStorage()
memSizeLimit := (rand.Intn(10) + 1) * 400

w := NewWriterBuilder().
SetPropSizeDistance(100).
SetPropKeysDistance(2).
SetMemorySizeLimit(uint64(memSizeLimit)).
SetBlockSize(memSizeLimit).
Build(memStore, "/test", "0")

writer := NewEngineWriter(w)
kvCnt := rand.Intn(10) + 10000
kvs := make([]common.KvPair, kvCnt)
for i := 0; i < kvCnt; i++ {
kvs[i] = common.KvPair{
Key: []byte(uuid.New().String()),
Val: []byte("56789"),
}
}

slices.SortFunc(kvs, func(i, j common.KvPair) int {
return bytes.Compare(i.Key, j.Key)
})

require.NoError(t, writer.AppendRows(ctx, nil, kv.MakeRowsFromKvPairs(kvs)))
_, err := writer.Close(ctx)
require.NoError(t, err)

// 2. merge step
datas, stats, err := GetAllFileNames(ctx, memStore, "")
require.NoError(t, err)

dataGroup, _ := splitDataAndStatFiles(datas, stats)

lastStepDatas := make([]string, 0, 10)
lastStepStats := make([]string, 0, 10)
var startKey, endKey dbkv.Key

closeFn := func(s *WriterSummary) {
for _, stat := range s.MultipleFilesStats {
for i := range stat.Filenames {
lastStepDatas = append(lastStepDatas, stat.Filenames[i][0])
lastStepStats = append(lastStepStats, stat.Filenames[i][1])
}

}
if len(startKey) == 0 && len(endKey) == 0 {
startKey = s.Min.Clone()
endKey = s.Max.Clone().Next()
}
startKey = BytesMin(startKey, s.Min.Clone())
endKey = BytesMax(endKey, s.Max.Clone().Next())
}

for _, group := range dataGroup {
MergeOverlappingFiles(
ctx,
group,
memStore,
int64(5*size.MB),
100,
"/test2",
100,
8*1024,
100,
2,
closeFn,
1,
true,
)
}

// 3. read and sort step
testReadAndCompare(ctx, t, kvs, memStore, lastStepDatas, lastStepStats, startKey, memSizeLimit)
}
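
The closeFn callbacks above track the overall key range of the written files via BytesMin and BytesMax. Those helpers are defined elsewhere in this package and are not part of this diff; a minimal sketch of the assumed behavior, simply picking the smaller or larger of two byte slices:

// Illustrative sketch only; the real helpers live elsewhere in package external.
func BytesMin(a, b []byte) []byte {
	if bytes.Compare(a, b) < 0 {
		return a
	}
	return b
}

func BytesMax(a, b []byte) []byte {
	if bytes.Compare(a, b) > 0 {
		return a
	}
	return b
}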
(The fourth changed file, presumably the new br/pkg/lightning/backend/external/testutil.go registered in BUILD.bazel above, did not render on this page.)
