diff --git a/br/pkg/conn/BUILD.bazel b/br/pkg/conn/BUILD.bazel index b784d5c080afa..2e035aaf2ea76 100644 --- a/br/pkg/conn/BUILD.bazel +++ b/br/pkg/conn/BUILD.bazel @@ -51,7 +51,7 @@ go_test( "//br/pkg/config", "//br/pkg/conn/util", "//br/pkg/pdutil", - "//br/pkg/utiltest", + "//br/pkg/restore/split", "//pkg/testkit/testsetup", "@com_github_docker_go_units//:go-units", "@com_github_pingcap_errors//:errors", diff --git a/br/pkg/conn/conn_test.go b/br/pkg/conn/conn_test.go index e15fc747eb837..43b0e7f9f0eb3 100644 --- a/br/pkg/conn/conn_test.go +++ b/br/pkg/conn/conn_test.go @@ -18,7 +18,7 @@ import ( "github.com/pingcap/tidb/br/pkg/conn" "github.com/pingcap/tidb/br/pkg/conn/util" "github.com/pingcap/tidb/br/pkg/pdutil" - "github.com/pingcap/tidb/br/pkg/utiltest" + "github.com/pingcap/tidb/br/pkg/restore/split" "github.com/stretchr/testify/require" "go.uber.org/multierr" "google.golang.org/grpc/codes" @@ -62,7 +62,7 @@ func TestGetAllTiKVStoresWithRetryCancel(t *testing.T) { }, } - fpdc := utiltest.NewFakePDClient(stores, false, nil) + fpdc := split.NewFakePDClient(stores, false, nil) _, err = conn.GetAllTiKVStoresWithRetry(ctx, fpdc, util.SkipTiFlash) require.Error(t, err) @@ -108,7 +108,7 @@ func TestGetAllTiKVStoresWithUnknown(t *testing.T) { }, } - fpdc := utiltest.NewFakePDClient(stores, false, nil) + fpdc := split.NewFakePDClient(stores, false, nil) _, err = conn.GetAllTiKVStoresWithRetry(ctx, fpdc, util.SkipTiFlash) require.Error(t, err) @@ -164,7 +164,7 @@ func TestCheckStoresAlive(t *testing.T) { }, } - fpdc := utiltest.NewFakePDClient(stores, false, nil) + fpdc := split.NewFakePDClient(stores, false, nil) kvStores, err := conn.GetAllTiKVStoresWithRetry(ctx, fpdc, util.SkipTiFlash) require.NoError(t, err) @@ -251,7 +251,7 @@ func TestGetAllTiKVStores(t *testing.T) { } for _, testCase := range testCases { - pdClient := utiltest.NewFakePDClient(testCase.stores, false, nil) + pdClient := split.NewFakePDClient(testCase.stores, false, nil) stores, err := util.GetAllTiKVStores(context.Background(), pdClient, testCase.storeBehavior) if len(testCase.expectedError) != 0 { require.Error(t, err) @@ -421,7 +421,7 @@ func TestGetMergeRegionSizeAndCount(t *testing.T) { pctx := context.Background() for _, ca := range cases { ctx, cancel := context.WithCancel(pctx) - pdCli := utiltest.NewFakePDClient(ca.stores, false, nil) + pdCli := split.NewFakePDClient(ca.stores, false, nil) require.Equal(t, len(ca.content), len(ca.stores)) count := 0 mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -586,7 +586,7 @@ func TestIsLogBackupEnabled(t *testing.T) { pctx := context.Background() for _, ca := range cases { ctx, cancel := context.WithCancel(pctx) - pdCli := utiltest.NewFakePDClient(ca.stores, false, nil) + pdCli := split.NewFakePDClient(ca.stores, false, nil) require.Equal(t, len(ca.content), len(ca.stores)) count := 0 mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { diff --git a/br/pkg/restore/BUILD.bazel b/br/pkg/restore/BUILD.bazel index 0b571e4889f13..5351b6ee4178d 100644 --- a/br/pkg/restore/BUILD.bazel +++ b/br/pkg/restore/BUILD.bazel @@ -52,7 +52,7 @@ go_test( "//br/pkg/conn", "//br/pkg/mock", "//br/pkg/pdutil", - "//br/pkg/utiltest", + "//br/pkg/restore/split", "//pkg/kv", "//pkg/parser/model", "//pkg/session", diff --git a/br/pkg/restore/import_mode_switcher_test.go b/br/pkg/restore/import_mode_switcher_test.go index 75d08c11a9033..41a2cc78f08e9 100644 --- 
a/br/pkg/restore/import_mode_switcher_test.go +++ b/br/pkg/restore/import_mode_switcher_test.go @@ -28,7 +28,7 @@ import ( "github.com/pingcap/tidb/br/pkg/conn" "github.com/pingcap/tidb/br/pkg/pdutil" "github.com/pingcap/tidb/br/pkg/restore" - "github.com/pingcap/tidb/br/pkg/utiltest" + "github.com/pingcap/tidb/br/pkg/restore/split" "github.com/stretchr/testify/require" "google.golang.org/grpc" ) @@ -74,13 +74,13 @@ func TestRestorePreWork(t *testing.T) { require.NoError(t, err) }() - pdClient := utiltest.NewFakePDClient([]*metapb.Store{ + pdClient := split.NewFakePDClient([]*metapb.Store{ { Id: 1, Address: fmt.Sprintf(":%d", 51111+port), }, }, false, nil) - pdHTTPCli := utiltest.NewFakePDHTTPClient() + pdHTTPCli := split.NewFakePDHTTPClient() mgr := &conn.Mgr{ PdController: pdutil.NewPdControllerWithPDClient( pdClient, pdHTTPCli, &semver.Version{Major: 4, Minor: 0, Patch: 9}), @@ -96,17 +96,17 @@ func TestRestorePreWork(t *testing.T) { _, ok := pdutil.Schedulers[key] require.True(t, ok) } - require.Equal(t, len(utiltest.ExistPDCfgGeneratorBefore), len(cfg.ScheduleCfg)) + require.Equal(t, len(split.ExistPDCfgGeneratorBefore), len(cfg.ScheduleCfg)) for key, value := range cfg.ScheduleCfg { - expectValue, ok := utiltest.ExistPDCfgGeneratorBefore[key] + expectValue, ok := split.ExistPDCfgGeneratorBefore[key] require.True(t, ok) require.Equal(t, expectValue, value) } cfgs, err := pdHTTPCli.GetConfig(context.TODO()) require.NoError(t, err) - require.Equal(t, len(utiltest.ExpectPDCfgGeneratorsResult), len(cfg.ScheduleCfg)) + require.Equal(t, len(split.ExpectPDCfgGeneratorsResult), len(cfg.ScheduleCfg)) for key, value := range cfgs { - expectValue, ok := utiltest.ExpectPDCfgGeneratorsResult[key[len("schedule."):]] + expectValue, ok := split.ExpectPDCfgGeneratorsResult[key[len("schedule."):]] require.True(t, ok) require.Equal(t, expectValue, value) } @@ -123,9 +123,9 @@ func TestRestorePreWork(t *testing.T) { { cfgs, err := pdHTTPCli.GetConfig(context.TODO()) require.NoError(t, err) - require.Equal(t, len(utiltest.ExistPDCfgGeneratorBefore), len(cfg.ScheduleCfg)) + require.Equal(t, len(split.ExistPDCfgGeneratorBefore), len(cfg.ScheduleCfg)) for key, value := range cfgs { - expectValue, ok := utiltest.ExistPDCfgGeneratorBefore[key[len("schedule."):]] + expectValue, ok := split.ExistPDCfgGeneratorBefore[key[len("schedule."):]] require.True(t, ok) require.Equal(t, expectValue, value) } diff --git a/br/pkg/restore/internal/log_split/BUILD.bazel b/br/pkg/restore/internal/log_split/BUILD.bazel deleted file mode 100644 index d929b04c003ad..0000000000000 --- a/br/pkg/restore/internal/log_split/BUILD.bazel +++ /dev/null @@ -1,54 +0,0 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") - -go_library( - name = "log_split", - srcs = [ - "split.go", - "sum_sorted.go", - ], - importpath = "github.com/pingcap/tidb/br/pkg/restore/internal/log_split", - visibility = ["//br/pkg/restore:__subpackages__"], - deps = [ - "//br/pkg/logutil", - "//br/pkg/restore/internal/snap_split", - "//br/pkg/restore/split", - "//br/pkg/restore/utils", - "//br/pkg/utils", - "//pkg/kv", - "//pkg/tablecodec", - "//pkg/util", - "//pkg/util/codec", - "@com_github_google_btree//:btree", - "@com_github_pingcap_errors//:errors", - "@com_github_pingcap_kvproto//pkg/brpb", - "@com_github_pingcap_log//:log", - "@org_golang_x_sync//errgroup", - "@org_uber_go_zap//:zap", - ], -) - -go_test( - name = "log_split_test", - timeout = "short", - srcs = [ - "export_test.go", - "split_test.go", - "sum_sorted_test.go", - ], - embed = 
[":log_split"], - flaky = True, - shard_count = 4, - deps = [ - "//br/pkg/restore/internal/snap_split", - "//br/pkg/restore/split", - "//br/pkg/restore/utils", - "//br/pkg/utiltest", - "//pkg/kv", - "//pkg/tablecodec", - "//pkg/util/codec", - "@com_github_docker_go_units//:go-units", - "@com_github_pingcap_kvproto//pkg/brpb", - "@com_github_pingcap_kvproto//pkg/import_sstpb", - "@com_github_stretchr_testify//require", - ], -) diff --git a/br/pkg/restore/internal/log_split/export_test.go b/br/pkg/restore/internal/log_split/export_test.go deleted file mode 100644 index bca3eada27bbb..0000000000000 --- a/br/pkg/restore/internal/log_split/export_test.go +++ /dev/null @@ -1,29 +0,0 @@ -// Copyright 2024 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package logsplit - -import restoreutils "github.com/pingcap/tidb/br/pkg/restore/utils" - -func NewSplitHelperIteratorForTest(helper *SplitHelper, tableID int64, rule *restoreutils.RewriteRules) *splitHelperIterator { - return &splitHelperIterator{ - tableSplitters: []*rewriteSplitter{ - { - tableID: tableID, - rule: rule, - splitter: helper, - }, - }, - } -} diff --git a/br/pkg/restore/internal/log_split/split.go b/br/pkg/restore/internal/log_split/split.go deleted file mode 100644 index eb9b3165ce761..0000000000000 --- a/br/pkg/restore/internal/log_split/split.go +++ /dev/null @@ -1,376 +0,0 @@ -// Copyright 2024 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package logsplit - -import ( - "bytes" - "context" - "sort" - "sync" - "time" - - "github.com/pingcap/errors" - backuppb "github.com/pingcap/kvproto/pkg/brpb" - "github.com/pingcap/log" - snapsplit "github.com/pingcap/tidb/br/pkg/restore/internal/snap_split" - "github.com/pingcap/tidb/br/pkg/restore/split" - restoreutils "github.com/pingcap/tidb/br/pkg/restore/utils" - "github.com/pingcap/tidb/pkg/tablecodec" - "github.com/pingcap/tidb/pkg/util" - "github.com/pingcap/tidb/pkg/util/codec" - "go.uber.org/zap" - "golang.org/x/sync/errgroup" -) - -type rewriteSplitter struct { - rewriteKey []byte - tableID int64 - rule *restoreutils.RewriteRules - splitter *SplitHelper -} - -type splitHelperIterator struct { - tableSplitters []*rewriteSplitter -} - -func (iter *splitHelperIterator) Traverse(fn func(v Valued, endKey []byte, rule *restoreutils.RewriteRules) bool) { - for _, entry := range iter.tableSplitters { - endKey := codec.EncodeBytes([]byte{}, tablecodec.EncodeTablePrefix(entry.tableID+1)) - rule := entry.rule - entry.splitter.Traverse(func(v Valued) bool { - return fn(v, endKey, rule) - }) - } -} - -type LogSplitHelper struct { - tableSplitter map[int64]*SplitHelper - rules map[int64]*restoreutils.RewriteRules - client split.SplitClient - pool *util.WorkerPool - eg *errgroup.Group - regionsCh chan []*split.RegionInfo - - splitThreSholdSize uint64 - splitThreSholdKeys int64 -} - -func NewLogSplitHelper(rules map[int64]*restoreutils.RewriteRules, client split.SplitClient, splitSize uint64, splitKeys int64) *LogSplitHelper { - return &LogSplitHelper{ - tableSplitter: make(map[int64]*SplitHelper), - rules: rules, - client: client, - pool: util.NewWorkerPool(128, "split region"), - eg: nil, - - splitThreSholdSize: splitSize, - splitThreSholdKeys: splitKeys, - } -} - -func (helper *LogSplitHelper) iterator() *splitHelperIterator { - tableSplitters := make([]*rewriteSplitter, 0, len(helper.tableSplitter)) - for tableID, splitter := range helper.tableSplitter { - delete(helper.tableSplitter, tableID) - rewriteRule, exists := helper.rules[tableID] - if !exists { - log.Info("skip splitting due to no table id matched", zap.Int64("tableID", tableID)) - continue - } - newTableID := restoreutils.GetRewriteTableID(tableID, rewriteRule) - if newTableID == 0 { - log.Warn("failed to get the rewrite table id", zap.Int64("tableID", tableID)) - continue - } - tableSplitters = append(tableSplitters, &rewriteSplitter{ - rewriteKey: codec.EncodeBytes([]byte{}, tablecodec.EncodeTablePrefix(newTableID)), - tableID: newTableID, - rule: rewriteRule, - splitter: splitter, - }) - } - sort.Slice(tableSplitters, func(i, j int) bool { - return bytes.Compare(tableSplitters[i].rewriteKey, tableSplitters[j].rewriteKey) < 0 - }) - return &splitHelperIterator{ - tableSplitters: tableSplitters, - } -} - -const splitFileThreshold = 1024 * 1024 // 1 MB - -func (helper *LogSplitHelper) skipFile(file *backuppb.DataFileInfo) bool { - _, exist := helper.rules[file.TableId] - return file.Length < splitFileThreshold || file.IsMeta || !exist -} - -func (helper *LogSplitHelper) Merge(file *backuppb.DataFileInfo) { - if helper.skipFile(file) { - return - } - splitHelper, exist := helper.tableSplitter[file.TableId] - if !exist { - splitHelper = NewSplitHelper() - helper.tableSplitter[file.TableId] = splitHelper - } - - splitHelper.Merge(Valued{ - Key: Span{ - StartKey: file.StartKey, - EndKey: file.EndKey, - }, - Value: Value{ - Size: file.Length, - Number: file.NumberOfEntries, - }, - }) -} - -type splitFunc = func(context.Context, 
*snapsplit.RegionSplitter, uint64, int64, *split.RegionInfo, []Valued) error - -func (helper *LogSplitHelper) splitRegionByPoints( - ctx context.Context, - regionSplitter *snapsplit.RegionSplitter, - initialLength uint64, - initialNumber int64, - region *split.RegionInfo, - valueds []Valued, -) error { - var ( - splitPoints [][]byte = make([][]byte, 0) - lastKey []byte = region.Region.StartKey - length uint64 = initialLength - number int64 = initialNumber - ) - for _, v := range valueds { - // decode will discard ts behind the key, which results in the same key for consecutive ranges - if !bytes.Equal(lastKey, v.GetStartKey()) && (v.Value.Size+length > helper.splitThreSholdSize || v.Value.Number+number > helper.splitThreSholdKeys) { - _, rawKey, _ := codec.DecodeBytes(v.GetStartKey(), nil) - splitPoints = append(splitPoints, rawKey) - length = 0 - number = 0 - } - lastKey = v.GetStartKey() - length += v.Value.Size - number += v.Value.Number - } - - if len(splitPoints) == 0 { - return nil - } - - helper.pool.ApplyOnErrorGroup(helper.eg, func() error { - newRegions, errSplit := regionSplitter.SplitWaitAndScatter(ctx, region, splitPoints) - if errSplit != nil { - log.Warn("failed to split the scaned region", zap.Error(errSplit)) - sort.Slice(splitPoints, func(i, j int) bool { - return bytes.Compare(splitPoints[i], splitPoints[j]) < 0 - }) - return regionSplitter.ExecuteSplit(ctx, splitPoints) - } - select { - case <-ctx.Done(): - return nil - case helper.regionsCh <- newRegions: - } - log.Info("split the region", zap.Uint64("region-id", region.Region.Id), zap.Int("split-point-number", len(splitPoints))) - return nil - }) - return nil -} - -// SplitPoint selects ranges overlapped with each region, and calls `splitF` to split the region -func SplitPoint( - ctx context.Context, - iter *splitHelperIterator, - client split.SplitClient, - splitF splitFunc, -) (err error) { - // common status - var ( - regionSplitter *snapsplit.RegionSplitter = snapsplit.NewRegionSplitter(client) - ) - // region traverse status - var ( - // the region buffer of each scan - regions []*split.RegionInfo = nil - regionIndex int = 0 - ) - // region split status - var ( - // range span +----------------+------+---+-------------+ - // region span +------------------------------------+ - // +initial length+ +end valued+ - // regionValueds is the ranges array overlapped with `regionInfo` - regionValueds []Valued = nil - // regionInfo is the region to be split - regionInfo *split.RegionInfo = nil - // intialLength is the length of the part of the first range overlapped with the region - initialLength uint64 = 0 - initialNumber int64 = 0 - ) - // range status - var ( - // regionOverCount is the number of regions overlapped with the range - regionOverCount uint64 = 0 - ) - - iter.Traverse(func(v Valued, endKey []byte, rule *restoreutils.RewriteRules) bool { - if v.Value.Number == 0 || v.Value.Size == 0 { - return true - } - var ( - vStartKey []byte - vEndKey []byte - ) - // use `vStartKey` and `vEndKey` to compare with region's key - vStartKey, vEndKey, err = restoreutils.GetRewriteEncodedKeys(v, rule) - if err != nil { - return false - } - // traverse to the first region overlapped with the range - for ; regionIndex < len(regions); regionIndex++ { - if bytes.Compare(vStartKey, regions[regionIndex].Region.EndKey) < 0 { - break - } - } - // cannot find any regions overlapped with the range - // need to scan regions again - if regionIndex == len(regions) { - regions = nil - } - regionOverCount = 0 - for { - if regionIndex >= 
len(regions) { - var startKey []byte - if len(regions) > 0 { - // has traversed over the region buffer, should scan from the last region's end-key of the region buffer - startKey = regions[len(regions)-1].Region.EndKey - } else { - // scan from the range's start-key - startKey = vStartKey - } - // scan at most 64 regions into the region buffer - regions, err = split.ScanRegionsWithRetry(ctx, client, startKey, endKey, 64) - if err != nil { - return false - } - regionIndex = 0 - } - - region := regions[regionIndex] - // this region must be overlapped with the range - regionOverCount++ - // the region is the last one overlapped with the range, - // should split the last recorded region, - // and then record this region as the region to be split - if bytes.Compare(vEndKey, region.Region.EndKey) < 0 { - endLength := v.Value.Size / regionOverCount - endNumber := v.Value.Number / int64(regionOverCount) - if len(regionValueds) > 0 && regionInfo != region { - // add a part of the range as the end part - if bytes.Compare(vStartKey, regionInfo.Region.EndKey) < 0 { - regionValueds = append(regionValueds, NewValued(vStartKey, regionInfo.Region.EndKey, Value{Size: endLength, Number: endNumber})) - } - // try to split the region - err = splitF(ctx, regionSplitter, initialLength, initialNumber, regionInfo, regionValueds) - if err != nil { - return false - } - regionValueds = make([]Valued, 0) - } - if regionOverCount == 1 { - // the region completely contains the range - regionValueds = append(regionValueds, Valued{ - Key: Span{ - StartKey: vStartKey, - EndKey: vEndKey, - }, - Value: v.Value, - }) - } else { - // the region is overlapped with the last part of the range - initialLength = endLength - initialNumber = endNumber - } - regionInfo = region - // try the next range - return true - } - - // try the next region - regionIndex++ - } - }) - - if err != nil { - return errors.Trace(err) - } - if len(regionValueds) > 0 { - // try to split the region - err = splitF(ctx, regionSplitter, initialLength, initialNumber, regionInfo, regionValueds) - if err != nil { - return errors.Trace(err) - } - } - - return nil -} - -func (helper *LogSplitHelper) Split(ctx context.Context) error { - var ectx context.Context - var wg sync.WaitGroup - helper.eg, ectx = errgroup.WithContext(ctx) - helper.regionsCh = make(chan []*split.RegionInfo, 1024) - wg.Add(1) - go func() { - defer wg.Done() - scatterRegions := make([]*split.RegionInfo, 0) - receiveNewRegions: - for { - select { - case <-ctx.Done(): - return - case newRegions, ok := <-helper.regionsCh: - if !ok { - break receiveNewRegions - } - - scatterRegions = append(scatterRegions, newRegions...) - } - } - - regionSplitter := snapsplit.NewRegionSplitter(helper.client) - // It is too expensive to stop recovery and wait for a small number of regions - // to complete scatter, so the maximum waiting time is reduced to 1 minute. 
- _ = regionSplitter.WaitForScatterRegionsTimeout(ctx, scatterRegions, time.Minute) - }() - - iter := helper.iterator() - if err := SplitPoint(ectx, iter, helper.client, helper.splitRegionByPoints); err != nil { - return errors.Trace(err) - } - - // wait for completion of splitting regions - if err := helper.eg.Wait(); err != nil { - return errors.Trace(err) - } - - // wait for completion of scattering regions - close(helper.regionsCh) - wg.Wait() - - return nil -} diff --git a/br/pkg/restore/internal/log_split/split_test.go b/br/pkg/restore/internal/log_split/split_test.go deleted file mode 100644 index acbf61ae12f29..0000000000000 --- a/br/pkg/restore/internal/log_split/split_test.go +++ /dev/null @@ -1,230 +0,0 @@ -// Copyright 2024 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package logsplit_test - -import ( - "context" - "fmt" - "testing" - - "github.com/docker/go-units" - backuppb "github.com/pingcap/kvproto/pkg/brpb" - "github.com/pingcap/kvproto/pkg/import_sstpb" - logsplit "github.com/pingcap/tidb/br/pkg/restore/internal/log_split" - snapsplit "github.com/pingcap/tidb/br/pkg/restore/internal/snap_split" - "github.com/pingcap/tidb/br/pkg/restore/split" - restoreutils "github.com/pingcap/tidb/br/pkg/restore/utils" - "github.com/pingcap/tidb/br/pkg/utiltest" - "github.com/pingcap/tidb/pkg/kv" - "github.com/pingcap/tidb/pkg/tablecodec" - "github.com/pingcap/tidb/pkg/util/codec" - "github.com/stretchr/testify/require" -) - -func keyWithTablePrefix(tableID int64, key string) []byte { - rawKey := append(tablecodec.GenTableRecordPrefix(tableID), []byte(key)...) 
- return codec.EncodeBytes([]byte{}, rawKey) -} - -func TestSplitPoint(t *testing.T) { - ctx := context.Background() - var oldTableID int64 = 50 - var tableID int64 = 100 - rewriteRules := &restoreutils.RewriteRules{ - Data: []*import_sstpb.RewriteRule{ - { - OldKeyPrefix: tablecodec.EncodeTablePrefix(oldTableID), - NewKeyPrefix: tablecodec.EncodeTablePrefix(tableID), - }, - }, - } - - // range: b c d e g i - // +---+ +---+ +---------+ - // +-------------+----------+---------+ - // region: a f h j - splitHelper := logsplit.NewSplitHelper() - splitHelper.Merge(logsplit.Valued{Key: logsplit.Span{StartKey: keyWithTablePrefix(oldTableID, "b"), EndKey: keyWithTablePrefix(oldTableID, "c")}, Value: logsplit.Value{Size: 100, Number: 100}}) - splitHelper.Merge(logsplit.Valued{Key: logsplit.Span{StartKey: keyWithTablePrefix(oldTableID, "d"), EndKey: keyWithTablePrefix(oldTableID, "e")}, Value: logsplit.Value{Size: 200, Number: 200}}) - splitHelper.Merge(logsplit.Valued{Key: logsplit.Span{StartKey: keyWithTablePrefix(oldTableID, "g"), EndKey: keyWithTablePrefix(oldTableID, "i")}, Value: logsplit.Value{Size: 300, Number: 300}}) - client := utiltest.NewFakeSplitClient() - client.AppendRegion(keyWithTablePrefix(tableID, "a"), keyWithTablePrefix(tableID, "f")) - client.AppendRegion(keyWithTablePrefix(tableID, "f"), keyWithTablePrefix(tableID, "h")) - client.AppendRegion(keyWithTablePrefix(tableID, "h"), keyWithTablePrefix(tableID, "j")) - client.AppendRegion(keyWithTablePrefix(tableID, "j"), keyWithTablePrefix(tableID+1, "a")) - - iter := logsplit.NewSplitHelperIteratorForTest(splitHelper, tableID, rewriteRules) - err := logsplit.SplitPoint(ctx, iter, client, func(ctx context.Context, rs *snapsplit.RegionSplitter, u uint64, o int64, ri *split.RegionInfo, v []logsplit.Valued) error { - require.Equal(t, u, uint64(0)) - require.Equal(t, o, int64(0)) - require.Equal(t, ri.Region.StartKey, keyWithTablePrefix(tableID, "a")) - require.Equal(t, ri.Region.EndKey, keyWithTablePrefix(tableID, "f")) - require.EqualValues(t, v[0].Key.StartKey, keyWithTablePrefix(tableID, "b")) - require.EqualValues(t, v[0].Key.EndKey, keyWithTablePrefix(tableID, "c")) - require.EqualValues(t, v[1].Key.StartKey, keyWithTablePrefix(tableID, "d")) - require.EqualValues(t, v[1].Key.EndKey, keyWithTablePrefix(tableID, "e")) - require.Equal(t, len(v), 2) - return nil - }) - require.NoError(t, err) -} - -func getCharFromNumber(prefix string, i int) string { - c := '1' + (i % 10) - b := '1' + (i%100)/10 - a := '1' + i/100 - return fmt.Sprintf("%s%c%c%c", prefix, a, b, c) -} - -func TestSplitPoint2(t *testing.T) { - ctx := context.Background() - var oldTableID int64 = 50 - var tableID int64 = 100 - rewriteRules := &restoreutils.RewriteRules{ - Data: []*import_sstpb.RewriteRule{ - { - OldKeyPrefix: tablecodec.EncodeTablePrefix(oldTableID), - NewKeyPrefix: tablecodec.EncodeTablePrefix(tableID), - }, - }, - } - - // range: b c d e f i j k l n - // +---+ +---+ +-----------------+ +----+ +--------+ - // +---------------+--+.....+----+------------+---------+ - // region: a g >128 h m o - splitHelper := logsplit.NewSplitHelper() - splitHelper.Merge(logsplit.Valued{Key: logsplit.Span{StartKey: keyWithTablePrefix(oldTableID, "b"), EndKey: keyWithTablePrefix(oldTableID, "c")}, Value: logsplit.Value{Size: 100, Number: 100}}) - splitHelper.Merge(logsplit.Valued{Key: logsplit.Span{StartKey: keyWithTablePrefix(oldTableID, "d"), EndKey: keyWithTablePrefix(oldTableID, "e")}, Value: logsplit.Value{Size: 200, Number: 200}}) - 
splitHelper.Merge(logsplit.Valued{Key: logsplit.Span{StartKey: keyWithTablePrefix(oldTableID, "f"), EndKey: keyWithTablePrefix(oldTableID, "i")}, Value: logsplit.Value{Size: 300, Number: 300}}) - splitHelper.Merge(logsplit.Valued{Key: logsplit.Span{StartKey: keyWithTablePrefix(oldTableID, "j"), EndKey: keyWithTablePrefix(oldTableID, "k")}, Value: logsplit.Value{Size: 200, Number: 200}}) - splitHelper.Merge(logsplit.Valued{Key: logsplit.Span{StartKey: keyWithTablePrefix(oldTableID, "l"), EndKey: keyWithTablePrefix(oldTableID, "n")}, Value: logsplit.Value{Size: 200, Number: 200}}) - client := utiltest.NewFakeSplitClient() - client.AppendRegion(keyWithTablePrefix(tableID, "a"), keyWithTablePrefix(tableID, "g")) - client.AppendRegion(keyWithTablePrefix(tableID, "g"), keyWithTablePrefix(tableID, getCharFromNumber("g", 0))) - for i := 0; i < 256; i++ { - client.AppendRegion(keyWithTablePrefix(tableID, getCharFromNumber("g", i)), keyWithTablePrefix(tableID, getCharFromNumber("g", i+1))) - } - client.AppendRegion(keyWithTablePrefix(tableID, getCharFromNumber("g", 256)), keyWithTablePrefix(tableID, "h")) - client.AppendRegion(keyWithTablePrefix(tableID, "h"), keyWithTablePrefix(tableID, "m")) - client.AppendRegion(keyWithTablePrefix(tableID, "m"), keyWithTablePrefix(tableID, "o")) - client.AppendRegion(keyWithTablePrefix(tableID, "o"), keyWithTablePrefix(tableID+1, "a")) - - firstSplit := true - iter := logsplit.NewSplitHelperIteratorForTest(splitHelper, tableID, rewriteRules) - err := logsplit.SplitPoint(ctx, iter, client, func(ctx context.Context, rs *snapsplit.RegionSplitter, u uint64, o int64, ri *split.RegionInfo, v []logsplit.Valued) error { - if firstSplit { - require.Equal(t, u, uint64(0)) - require.Equal(t, o, int64(0)) - require.Equal(t, ri.Region.StartKey, keyWithTablePrefix(tableID, "a")) - require.Equal(t, ri.Region.EndKey, keyWithTablePrefix(tableID, "g")) - require.EqualValues(t, v[0].Key.StartKey, keyWithTablePrefix(tableID, "b")) - require.EqualValues(t, v[0].Key.EndKey, keyWithTablePrefix(tableID, "c")) - require.EqualValues(t, v[1].Key.StartKey, keyWithTablePrefix(tableID, "d")) - require.EqualValues(t, v[1].Key.EndKey, keyWithTablePrefix(tableID, "e")) - require.EqualValues(t, v[2].Key.StartKey, keyWithTablePrefix(tableID, "f")) - require.EqualValues(t, v[2].Key.EndKey, keyWithTablePrefix(tableID, "g")) - require.Equal(t, v[2].Value.Size, uint64(1)) - require.Equal(t, v[2].Value.Number, int64(1)) - require.Equal(t, len(v), 3) - firstSplit = false - } else { - require.Equal(t, u, uint64(1)) - require.Equal(t, o, int64(1)) - require.Equal(t, ri.Region.StartKey, keyWithTablePrefix(tableID, "h")) - require.Equal(t, ri.Region.EndKey, keyWithTablePrefix(tableID, "m")) - require.EqualValues(t, v[0].Key.StartKey, keyWithTablePrefix(tableID, "j")) - require.EqualValues(t, v[0].Key.EndKey, keyWithTablePrefix(tableID, "k")) - require.EqualValues(t, v[1].Key.StartKey, keyWithTablePrefix(tableID, "l")) - require.EqualValues(t, v[1].Key.EndKey, keyWithTablePrefix(tableID, "m")) - require.Equal(t, v[1].Value.Size, uint64(100)) - require.Equal(t, v[1].Value.Number, int64(100)) - require.Equal(t, len(v), 2) - } - return nil - }) - require.NoError(t, err) -} - -func fakeFile(tableID, rowID int64, length uint64, num int64) *backuppb.DataFileInfo { - return &backuppb.DataFileInfo{ - StartKey: fakeRowKey(tableID, rowID), - EndKey: fakeRowKey(tableID, rowID+1), - TableId: tableID, - Length: length, - NumberOfEntries: num, - } -} - -func fakeRowKey(tableID, rowID int64) kv.Key { - return 
codec.EncodeBytes(nil, tablecodec.EncodeRecordKey(tablecodec.GenTableRecordPrefix(tableID), kv.IntHandle(rowID))) -} - -func TestLogSplitHelper(t *testing.T) { - ctx := context.Background() - rules := map[int64]*restoreutils.RewriteRules{ - 1: { - Data: []*import_sstpb.RewriteRule{ - { - OldKeyPrefix: tablecodec.GenTableRecordPrefix(1), - NewKeyPrefix: tablecodec.GenTableRecordPrefix(100), - }, - }, - }, - 2: { - Data: []*import_sstpb.RewriteRule{ - { - OldKeyPrefix: tablecodec.GenTableRecordPrefix(2), - NewKeyPrefix: tablecodec.GenTableRecordPrefix(200), - }, - }, - }, - } - oriRegions := [][]byte{ - {}, - codec.EncodeBytes(nil, tablecodec.EncodeTablePrefix(100)), - codec.EncodeBytes(nil, tablecodec.EncodeTablePrefix(200)), - codec.EncodeBytes(nil, tablecodec.EncodeTablePrefix(402)), - } - mockPDCli := split.NewMockPDClientForSplit() - mockPDCli.SetRegions(oriRegions) - client := split.NewClient(mockPDCli, nil, nil, 100, 4) - helper := logsplit.NewLogSplitHelper(rules, client, 4*units.MiB, 400) - - helper.Merge(fakeFile(1, 100, 100, 100)) - helper.Merge(fakeFile(1, 200, 2*units.MiB, 200)) - helper.Merge(fakeFile(2, 100, 3*units.MiB, 300)) - helper.Merge(fakeFile(3, 100, 10*units.MiB, 100000)) - // different regions, no split happens - err := helper.Split(ctx) - require.NoError(t, err) - regions, err := mockPDCli.ScanRegions(ctx, []byte{}, []byte{}, 0) - require.NoError(t, err) - require.Len(t, regions, 3) - require.Equal(t, []byte{}, regions[0].Meta.StartKey) - require.Equal(t, codec.EncodeBytes(nil, tablecodec.EncodeTablePrefix(100)), regions[1].Meta.StartKey) - require.Equal(t, codec.EncodeBytes(nil, tablecodec.EncodeTablePrefix(200)), regions[2].Meta.StartKey) - require.Equal(t, codec.EncodeBytes(nil, tablecodec.EncodeTablePrefix(402)), regions[2].Meta.EndKey) - - helper.Merge(fakeFile(1, 300, 3*units.MiB, 10)) - helper.Merge(fakeFile(1, 400, 4*units.MiB, 10)) - // trigger to split regions for table 1 - err = helper.Split(ctx) - require.NoError(t, err) - regions, err = mockPDCli.ScanRegions(ctx, []byte{}, []byte{}, 0) - require.NoError(t, err) - require.Len(t, regions, 4) - require.Equal(t, fakeRowKey(100, 400), kv.Key(regions[1].Meta.EndKey)) -} diff --git a/br/pkg/restore/internal/snap_split/BUILD.bazel b/br/pkg/restore/internal/snap_split/BUILD.bazel deleted file mode 100644 index ab6df360220d6..0000000000000 --- a/br/pkg/restore/internal/snap_split/BUILD.bazel +++ /dev/null @@ -1,31 +0,0 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") - -go_library( - name = "snap_split", - srcs = ["split.go"], - importpath = "github.com/pingcap/tidb/br/pkg/restore/internal/snap_split", - visibility = ["//br/pkg/restore:__subpackages__"], - deps = [ - "//br/pkg/restore/split", - "@com_github_pingcap_errors//:errors", - "@com_github_pingcap_log//:log", - "@org_uber_go_zap//:zap", - ], -) - -go_test( - name = "snap_split_test", - timeout = "short", - srcs = ["split_test.go"], - flaky = True, - shard_count = 4, - deps = [ - ":snap_split", - "//br/pkg/restore/split", - "//br/pkg/restore/utils", - "//br/pkg/rtree", - "//pkg/util/codec", - "@com_github_pingcap_kvproto//pkg/import_sstpb", - "@com_github_stretchr_testify//require", - ], -) diff --git a/br/pkg/restore/internal/snap_split/split.go b/br/pkg/restore/internal/snap_split/split.go deleted file mode 100644 index fca4a69cb5e6b..0000000000000 --- a/br/pkg/restore/internal/snap_split/split.go +++ /dev/null @@ -1,127 +0,0 @@ -// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0. 
- -package snapsplit - -import ( - "context" - "time" - - "github.com/pingcap/errors" - "github.com/pingcap/log" - "github.com/pingcap/tidb/br/pkg/restore/split" - "go.uber.org/zap" -) - -// RegionSplitter is a executor of region split by rules. -type RegionSplitter struct { - client split.SplitClient -} - -// NewRegionSplitter returns a new RegionSplitter. -func NewRegionSplitter(client split.SplitClient) *RegionSplitter { - return &RegionSplitter{ - client: client, - } -} - -// SplitWaitAndScatter expose the function `SplitWaitAndScatter` of split client. -func (rs *RegionSplitter) SplitWaitAndScatter(ctx context.Context, region *split.RegionInfo, keys [][]byte) ([]*split.RegionInfo, error) { - return rs.client.SplitWaitAndScatter(ctx, region, keys) -} - -// ExecuteSplit executes regions split and make sure new splitted regions are balance. -// It will split regions by the rewrite rules, -// then it will split regions by the end key of each range. -// tableRules includes the prefix of a table, since some ranges may have -// a prefix with record sequence or index sequence. -// note: all ranges and rewrite rules must have raw key. -func (rs *RegionSplitter) ExecuteSplit( - ctx context.Context, - sortedSplitKeys [][]byte, -) error { - if len(sortedSplitKeys) == 0 { - log.Info("skip split regions, no split keys") - return nil - } - - log.Info("execute split sorted keys", zap.Int("keys count", len(sortedSplitKeys))) - return rs.executeSplitByRanges(ctx, sortedSplitKeys) -} - -func (rs *RegionSplitter) executeSplitByRanges( - ctx context.Context, - sortedKeys [][]byte, -) error { - startTime := time.Now() - // Choose the rough region split keys, - // each splited region contains 128 regions to be splitted. - const regionIndexStep = 128 - - roughSortedSplitKeys := make([][]byte, 0, len(sortedKeys)/regionIndexStep+1) - for curRegionIndex := regionIndexStep; curRegionIndex < len(sortedKeys); curRegionIndex += regionIndexStep { - roughSortedSplitKeys = append(roughSortedSplitKeys, sortedKeys[curRegionIndex]) - } - if len(roughSortedSplitKeys) > 0 { - if err := rs.executeSplitByKeys(ctx, roughSortedSplitKeys); err != nil { - return errors.Trace(err) - } - } - log.Info("finish spliting regions roughly", zap.Duration("take", time.Since(startTime))) - - // Then send split requests to each TiKV. - if err := rs.executeSplitByKeys(ctx, sortedKeys); err != nil { - return errors.Trace(err) - } - - log.Info("finish spliting and scattering regions", zap.Duration("take", time.Since(startTime))) - return nil -} - -// executeSplitByKeys will split regions by **sorted** keys with following steps. -// 1. locate regions with correspond keys. -// 2. split these regions with correspond keys. -// 3. make sure new split regions are balanced. -func (rs *RegionSplitter) executeSplitByKeys( - ctx context.Context, - sortedKeys [][]byte, -) error { - startTime := time.Now() - scatterRegions, err := rs.client.SplitKeysAndScatter(ctx, sortedKeys) - if err != nil { - return errors.Trace(err) - } - if len(scatterRegions) > 0 { - log.Info("finish splitting and scattering regions. and starts to wait", zap.Int("regions", len(scatterRegions)), - zap.Duration("take", time.Since(startTime))) - rs.waitRegionsScattered(ctx, scatterRegions, split.ScatterWaitUpperInterval) - } else { - log.Info("finish splitting regions.", zap.Duration("take", time.Since(startTime))) - } - return nil -} - -// waitRegionsScattered try to wait mutilple regions scatterd in 3 minutes. 
-// this could timeout, but if many regions scatterd the restore could continue -// so we don't wait long time here. -func (rs *RegionSplitter) waitRegionsScattered(ctx context.Context, scatterRegions []*split.RegionInfo, timeout time.Duration) { - log.Info("start to wait for scattering regions", zap.Int("regions", len(scatterRegions))) - startTime := time.Now() - leftCnt := rs.WaitForScatterRegionsTimeout(ctx, scatterRegions, timeout) - if leftCnt == 0 { - log.Info("waiting for scattering regions done", - zap.Int("regions", len(scatterRegions)), - zap.Duration("take", time.Since(startTime))) - } else { - log.Warn("waiting for scattering regions timeout", - zap.Int("not scattered Count", leftCnt), - zap.Int("regions", len(scatterRegions)), - zap.Duration("take", time.Since(startTime))) - } -} - -func (rs *RegionSplitter) WaitForScatterRegionsTimeout(ctx context.Context, regionInfos []*split.RegionInfo, timeout time.Duration) int { - ctx2, cancel := context.WithTimeout(ctx, timeout) - defer cancel() - leftRegions, _ := rs.client.WaitRegionsScattered(ctx2, regionInfos) - return leftRegions -} diff --git a/br/pkg/restore/internal/snap_split/split_test.go b/br/pkg/restore/internal/snap_split/split_test.go deleted file mode 100644 index 0507950d589c5..0000000000000 --- a/br/pkg/restore/internal/snap_split/split_test.go +++ /dev/null @@ -1,158 +0,0 @@ -// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0. - -package snapsplit_test - -import ( - "bytes" - "context" - "sort" - "testing" - - "github.com/pingcap/kvproto/pkg/import_sstpb" - snapsplit "github.com/pingcap/tidb/br/pkg/restore/internal/snap_split" - "github.com/pingcap/tidb/br/pkg/restore/split" - restoreutils "github.com/pingcap/tidb/br/pkg/restore/utils" - "github.com/pingcap/tidb/br/pkg/rtree" - "github.com/pingcap/tidb/pkg/util/codec" - "github.com/stretchr/testify/require" -) - -func TestScanEmptyRegion(t *testing.T) { - mockPDCli := split.NewMockPDClientForSplit() - mockPDCli.SetRegions([][]byte{{}, {12}, {34}, {}}) - client := split.NewClient(mockPDCli, nil, nil, 100, 4) - keys := initKeys() - // make keys has only one - keys = keys[0:1] - regionSplitter := snapsplit.NewRegionSplitter(client) - - ctx := context.Background() - err := regionSplitter.ExecuteSplit(ctx, keys) - // should not return error with only one range entry - require.NoError(t, err) -} - -func TestSplitEmptyRegion(t *testing.T) { - mockPDCli := split.NewMockPDClientForSplit() - mockPDCli.SetRegions([][]byte{{}, {12}, {34}, {}}) - client := split.NewClient(mockPDCli, nil, nil, 100, 4) - regionSplitter := snapsplit.NewRegionSplitter(client) - err := regionSplitter.ExecuteSplit(context.Background(), nil) - require.NoError(t, err) -} - -// region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, ) -// range: [aaa, aae), [aae, aaz), [ccd, ccf), [ccf, ccj) -// rewrite rules: aa -> xx, cc -> bb -// expected regions after split: -// -// [, aay), [aay, bba), [bba, bbf), [bbf, bbh), [bbh, bbj), -// [bbj, cca), [cca, xxe), [xxe, xxz), [xxz, ) -func TestSplitAndScatter(t *testing.T) { - rangeBoundaries := [][]byte{[]byte(""), []byte("aay"), []byte("bba"), []byte("bbh"), []byte("cca"), []byte("")} - encodeBytes(rangeBoundaries) - mockPDCli := split.NewMockPDClientForSplit() - mockPDCli.SetRegions(rangeBoundaries) - client := split.NewClient(mockPDCli, nil, nil, 100, 4) - regionSplitter := snapsplit.NewRegionSplitter(client) - ctx := context.Background() - - ranges := initRanges() - rules := initRewriteRules() - splitKeys := make([][]byte, 0, len(ranges)) - for _, rg := 
range ranges { - tmp, err := restoreutils.RewriteRange(&rg, rules) - require.NoError(t, err) - splitKeys = append(splitKeys, tmp.EndKey) - } - sort.Slice(splitKeys, func(i, j int) bool { - return bytes.Compare(splitKeys[i], splitKeys[j]) < 0 - }) - err := regionSplitter.ExecuteSplit(ctx, splitKeys) - require.NoError(t, err) - regions := mockPDCli.Regions.ScanRange(nil, nil, 100) - expected := [][]byte{[]byte(""), []byte("aay"), []byte("bba"), []byte("bbf"), []byte("bbh"), []byte("bbj"), []byte("cca"), []byte("xxe"), []byte("xxz"), []byte("")} - encodeBytes(expected) - require.Len(t, regions, len(expected)-1) - for i, region := range regions { - require.Equal(t, expected[i], region.Meta.StartKey) - require.Equal(t, expected[i+1], region.Meta.EndKey) - } -} - -func encodeBytes(keys [][]byte) { - for i := range keys { - if len(keys[i]) == 0 { - continue - } - keys[i] = codec.EncodeBytes(nil, keys[i]) - } -} - -func TestRawSplit(t *testing.T) { - // Fix issue #36490. - splitKeys := [][]byte{{}} - ctx := context.Background() - rangeBoundaries := [][]byte{[]byte(""), []byte("aay"), []byte("bba"), []byte("bbh"), []byte("cca"), []byte("")} - mockPDCli := split.NewMockPDClientForSplit() - mockPDCli.SetRegions(rangeBoundaries) - client := split.NewClient(mockPDCli, nil, nil, 100, 4, split.WithRawKV()) - - regionSplitter := snapsplit.NewRegionSplitter(client) - err := regionSplitter.ExecuteSplit(ctx, splitKeys) - require.NoError(t, err) - - regions := mockPDCli.Regions.ScanRange(nil, nil, 100) - require.Len(t, regions, len(rangeBoundaries)-1) - for i, region := range regions { - require.Equal(t, rangeBoundaries[i], region.Meta.StartKey) - require.Equal(t, rangeBoundaries[i+1], region.Meta.EndKey) - } -} - -// keys: aae, aaz, ccf, ccj -func initKeys() [][]byte { - return [][]byte{ - []byte("aae"), - []byte("aaz"), - []byte("ccf"), - []byte("ccj"), - } -} - -// range: [aaa, aae), [aae, aaz), [ccd, ccf), [ccf, ccj) -func initRanges() []rtree.Range { - var ranges [4]rtree.Range - ranges[0] = rtree.Range{ - StartKey: []byte("aaa"), - EndKey: []byte("aae"), - } - ranges[1] = rtree.Range{ - StartKey: []byte("aae"), - EndKey: []byte("aaz"), - } - ranges[2] = rtree.Range{ - StartKey: []byte("ccd"), - EndKey: []byte("ccf"), - } - ranges[3] = rtree.Range{ - StartKey: []byte("ccf"), - EndKey: []byte("ccj"), - } - return ranges[:] -} - -func initRewriteRules() *restoreutils.RewriteRules { - var rules [2]*import_sstpb.RewriteRule - rules[0] = &import_sstpb.RewriteRule{ - OldKeyPrefix: []byte("aa"), - NewKeyPrefix: []byte("xx"), - } - rules[1] = &import_sstpb.RewriteRule{ - OldKeyPrefix: []byte("cc"), - NewKeyPrefix: []byte("bb"), - } - return &restoreutils.RewriteRules{ - Data: rules[:], - } -} diff --git a/br/pkg/restore/log_client/BUILD.bazel b/br/pkg/restore/log_client/BUILD.bazel index 5975b0726aa1d..d55c8066514f3 100644 --- a/br/pkg/restore/log_client/BUILD.bazel +++ b/br/pkg/restore/log_client/BUILD.bazel @@ -24,7 +24,6 @@ go_library( "//br/pkg/restore", "//br/pkg/restore/ingestrec", "//br/pkg/restore/internal/import_client", - "//br/pkg/restore/internal/log_split", "//br/pkg/restore/internal/rawkv", "//br/pkg/restore/split", "//br/pkg/restore/tiflashrec", diff --git a/br/pkg/restore/log_client/client.go b/br/pkg/restore/log_client/client.go index d208b58bb15d2..4160aa86a6048 100644 --- a/br/pkg/restore/log_client/client.go +++ b/br/pkg/restore/log_client/client.go @@ -46,7 +46,6 @@ import ( "github.com/pingcap/tidb/br/pkg/restore" "github.com/pingcap/tidb/br/pkg/restore/ingestrec" importclient 
"github.com/pingcap/tidb/br/pkg/restore/internal/import_client" - logsplit "github.com/pingcap/tidb/br/pkg/restore/internal/log_split" "github.com/pingcap/tidb/br/pkg/restore/internal/rawkv" "github.com/pingcap/tidb/br/pkg/restore/split" "github.com/pingcap/tidb/br/pkg/restore/tiflashrec" @@ -1719,7 +1718,7 @@ func (rc *LogClient) FailpointDoChecksumForLogRestore( type LogFilesIterWithSplitHelper struct { iter LogIter - helper *logsplit.LogSplitHelper + helper *split.LogSplitHelper buffer []*LogDataFileInfo next int } @@ -1729,7 +1728,7 @@ const SplitFilesBufferSize = 4096 func NewLogFilesIterWithSplitHelper(iter LogIter, rules map[int64]*restoreutils.RewriteRules, client split.SplitClient, splitSize uint64, splitKeys int64) LogIter { return &LogFilesIterWithSplitHelper{ iter: iter, - helper: logsplit.NewLogSplitHelper(rules, client, splitSize, splitKeys), + helper: split.NewLogSplitHelper(rules, client, splitSize, splitKeys), buffer: nil, next: 0, } diff --git a/br/pkg/restore/log_client/client_test.go b/br/pkg/restore/log_client/client_test.go index 0b1baf7af5675..6b16ec34c28ba 100644 --- a/br/pkg/restore/log_client/client_test.go +++ b/br/pkg/restore/log_client/client_test.go @@ -29,6 +29,7 @@ import ( "github.com/pingcap/tidb/br/pkg/gluetidb" "github.com/pingcap/tidb/br/pkg/mock" logclient "github.com/pingcap/tidb/br/pkg/restore/log_client" + "github.com/pingcap/tidb/br/pkg/restore/split" "github.com/pingcap/tidb/br/pkg/restore/utils" "github.com/pingcap/tidb/br/pkg/stream" "github.com/pingcap/tidb/br/pkg/utils/iter" @@ -90,7 +91,7 @@ func TestDeleteRangeQueryExec(t *testing.T) { m := mc g := gluetidb.New() client := logclient.NewRestoreClient( - utiltest.NewFakePDClient(nil, false, nil), nil, nil, keepalive.ClientParameters{}) + split.NewFakePDClient(nil, false, nil), nil, nil, keepalive.ClientParameters{}) err := client.Init(g, m.Storage) require.NoError(t, err) @@ -109,7 +110,7 @@ func TestDeleteRangeQuery(t *testing.T) { g := gluetidb.New() client := logclient.NewRestoreClient( - utiltest.NewFakePDClient(nil, false, nil), nil, nil, keepalive.ClientParameters{}) + split.NewFakePDClient(nil, false, nil), nil, nil, keepalive.ClientParameters{}) err := client.Init(g, m.Storage) require.NoError(t, err) @@ -1338,7 +1339,7 @@ func TestLogFilesIterWithSplitHelper(t *testing.T) { } mockIter := &mockLogIter{} ctx := context.Background() - logIter := logclient.NewLogFilesIterWithSplitHelper(mockIter, rewriteRulesMap, utiltest.NewFakeSplitClient(), 144*1024*1024, 1440000) + logIter := logclient.NewLogFilesIterWithSplitHelper(mockIter, rewriteRulesMap, split.NewFakeSplitClient(), 144*1024*1024, 1440000) next := 0 for r := logIter.TryNext(ctx); !r.Finished; r = logIter.TryNext(ctx) { require.NoError(t, r.Err) diff --git a/br/pkg/restore/log_client/import_retry_test.go b/br/pkg/restore/log_client/import_retry_test.go index 5c47f1f3acb27..bcde03c69c1ed 100644 --- a/br/pkg/restore/log_client/import_retry_test.go +++ b/br/pkg/restore/log_client/import_retry_test.go @@ -22,7 +22,6 @@ import ( logclient "github.com/pingcap/tidb/br/pkg/restore/log_client" "github.com/pingcap/tidb/br/pkg/restore/split" "github.com/pingcap/tidb/br/pkg/utils" - "github.com/pingcap/tidb/br/pkg/utiltest" "github.com/pingcap/tidb/pkg/store/pdtypes" "github.com/pingcap/tidb/pkg/util/codec" "github.com/stretchr/testify/require" @@ -92,12 +91,12 @@ func (c *TestClient) GetAllRegions() map[uint64]*split.RegionInfo { return c.regions } -func (c *TestClient) GetPDClient() *utiltest.FakePDClient { +func (c *TestClient) 
GetPDClient() *split.FakePDClient { stores := make([]*metapb.Store, 0, len(c.stores)) for _, store := range c.stores { stores = append(stores, store) } - return utiltest.NewFakePDClient(stores, false, nil) + return split.NewFakePDClient(stores, false, nil) } func (c *TestClient) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, error) { diff --git a/br/pkg/restore/misc_test.go b/br/pkg/restore/misc_test.go index b461b3e395ebd..37fe2c4544859 100644 --- a/br/pkg/restore/misc_test.go +++ b/br/pkg/restore/misc_test.go @@ -23,7 +23,7 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/tidb/br/pkg/mock" "github.com/pingcap/tidb/br/pkg/restore" - "github.com/pingcap/tidb/br/pkg/utiltest" + "github.com/pingcap/tidb/br/pkg/restore/split" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/session" @@ -107,7 +107,7 @@ func TestAssertUserDBsEmpty(t *testing.T) { func TestGetTSWithRetry(t *testing.T) { t.Run("PD leader is healthy:", func(t *testing.T) { retryTimes := -1000 - pDClient := utiltest.NewFakePDClient(nil, false, &retryTimes) + pDClient := split.NewFakePDClient(nil, false, &retryTimes) _, err := restore.GetTSWithRetry(context.Background(), pDClient) require.NoError(t, err) }) @@ -118,14 +118,14 @@ func TestGetTSWithRetry(t *testing.T) { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/br/pkg/utils/set-attempt-to-one")) }() retryTimes := -1000 - pDClient := utiltest.NewFakePDClient(nil, true, &retryTimes) + pDClient := split.NewFakePDClient(nil, true, &retryTimes) _, err := restore.GetTSWithRetry(context.Background(), pDClient) require.Error(t, err) }) t.Run("PD leader switch successfully", func(t *testing.T) { retryTimes := 0 - pDClient := utiltest.NewFakePDClient(nil, true, &retryTimes) + pDClient := split.NewFakePDClient(nil, true, &retryTimes) _, err := restore.GetTSWithRetry(context.Background(), pDClient) require.NoError(t, err) }) diff --git a/br/pkg/restore/snap_client/BUILD.bazel b/br/pkg/restore/snap_client/BUILD.bazel index e10541b3b4d00..d77985e92b808 100644 --- a/br/pkg/restore/snap_client/BUILD.bazel +++ b/br/pkg/restore/snap_client/BUILD.bazel @@ -26,7 +26,6 @@ go_library( "//br/pkg/restore/internal/import_client", "//br/pkg/restore/internal/prealloc_db", "//br/pkg/restore/internal/prealloc_table_id", - "//br/pkg/restore/internal/snap_split", "//br/pkg/restore/split", "//br/pkg/restore/utils", "//br/pkg/storage", @@ -93,9 +92,9 @@ go_test( "//br/pkg/mock", "//br/pkg/restore", "//br/pkg/restore/internal/import_client", + "//br/pkg/restore/split", "//br/pkg/restore/utils", "//br/pkg/utils", - "//br/pkg/utiltest", "//pkg/domain", "//pkg/kv", "//pkg/meta/model", diff --git a/br/pkg/restore/snap_client/client.go b/br/pkg/restore/snap_client/client.go index 8e255c98743db..c627ec8c416c1 100644 --- a/br/pkg/restore/snap_client/client.go +++ b/br/pkg/restore/snap_client/client.go @@ -183,7 +183,7 @@ func (rc *SnapClient) Close() { rc.closeConn() if err := rc.fileImporter.Close(); err != nil { - log.Warn("failed to close file improter") + log.Warn("failed to close file importer") } log.Info("Restore client closed") @@ -457,6 +457,7 @@ func (rc *SnapClient) initClients(ctx context.Context, backend *backuppb.Storage if isRawKvMode { splitClientOpts = append(splitClientOpts, split.WithRawKV()) } + metaClient := split.NewClient(rc.pdClient, rc.pdHTTPClient, rc.tlsConf, maxSplitKeysOnce, rc.storeCount+1, splitClientOpts...) 
importCli := importclient.NewImportClient(metaClient, rc.tlsConf, rc.keepaliveConf) rc.fileImporter, err = NewSnapFileImporter(ctx, metaClient, importCli, backend, isRawKvMode, isTxnKvMode, stores, rc.rewriteMode, rc.concurrencyPerStore) diff --git a/br/pkg/restore/snap_client/client_test.go b/br/pkg/restore/snap_client/client_test.go index 380e4421b68fd..dd919646ddf61 100644 --- a/br/pkg/restore/snap_client/client_test.go +++ b/br/pkg/restore/snap_client/client_test.go @@ -34,7 +34,7 @@ import ( "github.com/pingcap/tidb/br/pkg/mock" importclient "github.com/pingcap/tidb/br/pkg/restore/internal/import_client" snapclient "github.com/pingcap/tidb/br/pkg/restore/snap_client" - "github.com/pingcap/tidb/br/pkg/utiltest" + "github.com/pingcap/tidb/br/pkg/restore/split" "github.com/pingcap/tidb/pkg/meta/model" pmodel "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/parser/mysql" @@ -48,7 +48,7 @@ var mc *mock.Cluster func TestCreateTables(t *testing.T) { m := mc g := gluetidb.New() - client := snapclient.NewRestoreClient(m.PDClient, m.PDHTTPCli, nil, utiltest.DefaultTestKeepaliveCfg) + client := snapclient.NewRestoreClient(m.PDClient, m.PDHTTPCli, nil, split.DefaultTestKeepaliveCfg) err := client.Init(g, m.Storage) require.NoError(t, err) @@ -119,7 +119,7 @@ func TestNeedCheckTargetClusterFresh(t *testing.T) { defer cluster.Stop() g := gluetidb.New() - client := snapclient.NewRestoreClient(cluster.PDClient, cluster.PDHTTPCli, nil, utiltest.DefaultTestKeepaliveCfg) + client := snapclient.NewRestoreClient(cluster.PDClient, cluster.PDHTTPCli, nil, split.DefaultTestKeepaliveCfg) err := client.Init(g, cluster.Storage) require.NoError(t, err) @@ -149,7 +149,7 @@ func TestCheckTargetClusterFresh(t *testing.T) { defer cluster.Stop() g := gluetidb.New() - client := snapclient.NewRestoreClient(cluster.PDClient, cluster.PDHTTPCli, nil, utiltest.DefaultTestKeepaliveCfg) + client := snapclient.NewRestoreClient(cluster.PDClient, cluster.PDHTTPCli, nil, split.DefaultTestKeepaliveCfg) err := client.Init(g, cluster.Storage) require.NoError(t, err) @@ -166,7 +166,7 @@ func TestCheckTargetClusterFreshWithTable(t *testing.T) { defer cluster.Stop() g := gluetidb.New() - client := snapclient.NewRestoreClient(cluster.PDClient, cluster.PDHTTPCli, nil, utiltest.DefaultTestKeepaliveCfg) + client := snapclient.NewRestoreClient(cluster.PDClient, cluster.PDHTTPCli, nil, split.DefaultTestKeepaliveCfg) err := client.Init(g, cluster.Storage) require.NoError(t, err) @@ -201,7 +201,7 @@ func TestCheckTargetClusterFreshWithTable(t *testing.T) { func TestInitFullClusterRestore(t *testing.T) { cluster := mc g := gluetidb.New() - client := snapclient.NewRestoreClient(cluster.PDClient, cluster.PDHTTPCli, nil, utiltest.DefaultTestKeepaliveCfg) + client := snapclient.NewRestoreClient(cluster.PDClient, cluster.PDHTTPCli, nil, split.DefaultTestKeepaliveCfg) err := client.Init(g, cluster.Storage) require.NoError(t, err) @@ -309,7 +309,7 @@ func TestSetSpeedLimit(t *testing.T) { // 1. The cost of concurrent communication is expected to be less than the cost of serial communication. 
client := snapclient.NewRestoreClient( - utiltest.NewFakePDClient(mockStores, false, nil), nil, nil, utiltest.DefaultTestKeepaliveCfg) + split.NewFakePDClient(mockStores, false, nil), nil, nil, split.DefaultTestKeepaliveCfg) ctx := context.Background() recordStores = NewRecordStores() @@ -334,7 +334,7 @@ func TestSetSpeedLimit(t *testing.T) { recordStores = NewRecordStores() mockStores[5].Id = SET_SPEED_LIMIT_ERROR // setting a fault store client = snapclient.NewRestoreClient( - utiltest.NewFakePDClient(mockStores, false, nil), nil, nil, utiltest.DefaultTestKeepaliveCfg) + split.NewFakePDClient(mockStores, false, nil), nil, nil, split.DefaultTestKeepaliveCfg) // Concurrency needs to be less than the number of stores err = snapclient.MockCallSetSpeedLimit(ctx, FakeImporterClient{}, client, 2) diff --git a/br/pkg/restore/snap_client/import_test.go b/br/pkg/restore/snap_client/import_test.go index 762beb3784d22..324b2ec9a007a 100644 --- a/br/pkg/restore/snap_client/import_test.go +++ b/br/pkg/restore/snap_client/import_test.go @@ -24,8 +24,8 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" importclient "github.com/pingcap/tidb/br/pkg/restore/internal/import_client" snapclient "github.com/pingcap/tidb/br/pkg/restore/snap_client" + "github.com/pingcap/tidb/br/pkg/restore/split" restoreutils "github.com/pingcap/tidb/br/pkg/restore/utils" - "github.com/pingcap/tidb/br/pkg/utiltest" "github.com/pingcap/tidb/pkg/util/codec" "github.com/stretchr/testify/require" ) @@ -156,7 +156,7 @@ func (client *fakeImporterClient) MultiIngest( func TestSnapImporter(t *testing.T) { ctx := context.Background() - splitClient := utiltest.NewFakeSplitClient() + splitClient := split.NewFakeSplitClient() for _, region := range generateRegions() { splitClient.AppendPdRegion(region) } @@ -180,7 +180,7 @@ func TestSnapImporter(t *testing.T) { func TestSnapImporterRaw(t *testing.T) { ctx := context.Background() - splitClient := utiltest.NewFakeSplitClient() + splitClient := split.NewFakeSplitClient() for _, region := range generateRegions() { splitClient.AppendPdRegion(region) } diff --git a/br/pkg/restore/snap_client/placement_rule_manager_test.go b/br/pkg/restore/snap_client/placement_rule_manager_test.go index c078ebd6e48c4..1e60aaa5b93d1 100644 --- a/br/pkg/restore/snap_client/placement_rule_manager_test.go +++ b/br/pkg/restore/snap_client/placement_rule_manager_test.go @@ -24,8 +24,8 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/tidb/br/pkg/metautil" snapclient "github.com/pingcap/tidb/br/pkg/restore/snap_client" + "github.com/pingcap/tidb/br/pkg/restore/split" restoreutils "github.com/pingcap/tidb/br/pkg/restore/utils" - "github.com/pingcap/tidb/br/pkg/utiltest" "github.com/pingcap/tidb/pkg/meta/model" pmodel "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/tablecodec" @@ -93,8 +93,8 @@ func TestContextManagerOnlineNoStores(t *testing.T) { }, } - pdClient := utiltest.NewFakePDClient(stores, false, nil) - pdHTTPCli := utiltest.NewFakePDHTTPClient() + pdClient := split.NewFakePDClient(stores, false, nil) + pdHTTPCli := split.NewFakePDHTTPClient() placementRuleManager, err := snapclient.NewPlacementRuleManager(ctx, pdClient, pdHTTPCli, nil, true) require.NoError(t, err) tables := generateTables() @@ -234,9 +234,9 @@ func TestContextManagerOnlineLeave(t *testing.T) { stores := generateStores() regions := generateRegions() - pdClient := utiltest.NewFakePDClient(stores, false, nil) + pdClient := split.NewFakePDClient(stores, false, nil) pdClient.SetRegions(regions) - 
pdHTTPCli := utiltest.NewFakePDHTTPClient() + pdHTTPCli := split.NewFakePDHTTPClient() placementRuleManager, err := snapclient.NewPlacementRuleManager(ctx, pdClient, pdHTTPCli, nil, true) require.NoError(t, err) tables := generateTables() diff --git a/br/pkg/restore/snap_client/systable_restore_test.go b/br/pkg/restore/snap_client/systable_restore_test.go index 358f457c55007..1c1ccb629b112 100644 --- a/br/pkg/restore/snap_client/systable_restore_test.go +++ b/br/pkg/restore/snap_client/systable_restore_test.go @@ -23,8 +23,8 @@ import ( "github.com/pingcap/tidb/br/pkg/metautil" "github.com/pingcap/tidb/br/pkg/restore" snapclient "github.com/pingcap/tidb/br/pkg/restore/snap_client" + "github.com/pingcap/tidb/br/pkg/restore/split" "github.com/pingcap/tidb/br/pkg/utils" - "github.com/pingcap/tidb/br/pkg/utiltest" "github.com/pingcap/tidb/pkg/meta/model" pmodel "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/parser/mysql" @@ -35,7 +35,7 @@ import ( func TestCheckSysTableCompatibility(t *testing.T) { cluster := mc g := gluetidb.New() - client := snapclient.NewRestoreClient(cluster.PDClient, cluster.PDHTTPCli, nil, utiltest.DefaultTestKeepaliveCfg) + client := snapclient.NewRestoreClient(cluster.PDClient, cluster.PDHTTPCli, nil, split.DefaultTestKeepaliveCfg) err := client.Init(g, cluster.Storage) require.NoError(t, err) diff --git a/br/pkg/restore/snap_client/tikv_sender.go b/br/pkg/restore/snap_client/tikv_sender.go index a5f27a87c4050..85aeac0d76f24 100644 --- a/br/pkg/restore/snap_client/tikv_sender.go +++ b/br/pkg/restore/snap_client/tikv_sender.go @@ -28,7 +28,6 @@ import ( "github.com/pingcap/tidb/br/pkg/checkpoint" "github.com/pingcap/tidb/br/pkg/glue" "github.com/pingcap/tidb/br/pkg/logutil" - snapsplit "github.com/pingcap/tidb/br/pkg/restore/internal/snap_split" "github.com/pingcap/tidb/br/pkg/restore/split" restoreutils "github.com/pingcap/tidb/br/pkg/restore/utils" "github.com/pingcap/tidb/br/pkg/summary" @@ -341,7 +340,7 @@ func (rc *SnapClient) SplitPoints( splitClientOpts = append(splitClientOpts, split.WithRawKV()) } - splitter := snapsplit.NewRegionSplitter(split.NewClient( + splitter := split.NewRegionSplitter(split.NewClient( rc.pdClient, rc.pdHTTPClient, rc.tlsConf, @@ -350,7 +349,7 @@ func (rc *SnapClient) SplitPoints( splitClientOpts..., )) - return splitter.ExecuteSplit(ctx, sortedSplitKeys) + return splitter.ExecuteSortedKeys(ctx, sortedSplitKeys) } func getFileRangeKey(f string) string { diff --git a/br/pkg/restore/split/BUILD.bazel b/br/pkg/restore/split/BUILD.bazel index 2d7002b493ad2..e27bb1834d7ac 100644 --- a/br/pkg/restore/split/BUILD.bazel +++ b/br/pkg/restore/split/BUILD.bazel @@ -7,6 +7,7 @@ go_library( "mock_pd_client.go", "region.go", "split.go", + "sum_sorted.go", ], importpath = "github.com/pingcap/tidb/br/pkg/restore/split", visibility = ["//visibility:public"], @@ -14,19 +15,24 @@ go_library( "//br/pkg/conn/util", "//br/pkg/errors", "//br/pkg/logutil", + "//br/pkg/pdutil", + "//br/pkg/restore/utils", "//br/pkg/utils", "//pkg/kv", "//pkg/lightning/common", "//pkg/lightning/config", "//pkg/lightning/log", "//pkg/store/pdtypes", + "//pkg/tablecodec", "//pkg/util", "//pkg/util/codec", "//pkg/util/intest", "//pkg/util/redact", "@com_github_docker_go_units//:go-units", + "@com_github_google_btree//:btree", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", + "@com_github_pingcap_kvproto//pkg/brpb", "@com_github_pingcap_kvproto//pkg/errorpb", "@com_github_pingcap_kvproto//pkg/kvrpcpb", 
"@com_github_pingcap_kvproto//pkg/metapb", @@ -39,6 +45,7 @@ go_library( "@org_golang_google_grpc//codes", "@org_golang_google_grpc//credentials", "@org_golang_google_grpc//credentials/insecure", + "@org_golang_google_grpc//keepalive", "@org_golang_google_grpc//status", "@org_golang_x_sync//errgroup", "@org_uber_go_multierr//:multierr", @@ -52,12 +59,15 @@ go_test( srcs = [ "client_test.go", "split_test.go", + "sum_sorted_test.go", ], embed = [":split"], flaky = True, - shard_count = 19, + shard_count = 27, deps = [ "//br/pkg/errors", + "//br/pkg/restore/utils", + "//br/pkg/rtree", "//br/pkg/utils", "//pkg/kv", "//pkg/sessionctx/stmtctx", @@ -65,8 +75,11 @@ go_test( "//pkg/tablecodec", "//pkg/types", "//pkg/util/codec", + "@com_github_docker_go_units//:go-units", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", + "@com_github_pingcap_kvproto//pkg/brpb", + "@com_github_pingcap_kvproto//pkg/import_sstpb", "@com_github_pingcap_kvproto//pkg/kvrpcpb", "@com_github_pingcap_kvproto//pkg/metapb", "@com_github_pingcap_kvproto//pkg/pdpb", diff --git a/br/pkg/restore/split/mock_pd_client.go b/br/pkg/restore/split/mock_pd_client.go index 92cd055939926..6df6cd56f94b0 100644 --- a/br/pkg/restore/split/mock_pd_client.go +++ b/br/pkg/restore/split/mock_pd_client.go @@ -5,25 +5,172 @@ package split import ( "bytes" "context" + "math" "sync" + "time" "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/pingcap/tidb/br/pkg/pdutil" "github.com/pingcap/tidb/pkg/store/pdtypes" "github.com/pingcap/tidb/pkg/util/codec" pd "github.com/tikv/pd/client" + pdhttp "github.com/tikv/pd/client/http" "google.golang.org/grpc/codes" + "google.golang.org/grpc/keepalive" "google.golang.org/grpc/status" ) +type TestClient struct { + SplitClient + pd.Client + + mu sync.RWMutex + stores map[uint64]*metapb.Store + Regions map[uint64]*RegionInfo + RegionsInfo *pdtypes.RegionTree // For now it's only used in ScanRegions + nextRegionID uint64 + + scattered map[uint64]bool + InjectErr bool + InjectTimes int32 +} + +func NewTestClient( + stores map[uint64]*metapb.Store, + regions map[uint64]*RegionInfo, + nextRegionID uint64, +) *TestClient { + regionsInfo := &pdtypes.RegionTree{} + for _, regionInfo := range regions { + regionsInfo.SetRegion(pdtypes.NewRegionInfo(regionInfo.Region, regionInfo.Leader)) + } + return &TestClient{ + stores: stores, + Regions: regions, + RegionsInfo: regionsInfo, + nextRegionID: nextRegionID, + scattered: map[uint64]bool{}, + } +} + +func (c *TestClient) GetAllRegions() map[uint64]*RegionInfo { + c.mu.RLock() + defer c.mu.RUnlock() + return c.Regions +} + +func (c *TestClient) GetPDClient() *FakePDClient { + stores := make([]*metapb.Store, 0, len(c.stores)) + for _, store := range c.stores { + stores = append(stores, store) + } + return NewFakePDClient(stores, false, nil) +} + +func (c *TestClient) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, error) { + c.mu.RLock() + defer c.mu.RUnlock() + store, ok := c.stores[storeID] + if !ok { + return nil, errors.Errorf("store not found") + } + return store, nil +} + +func (c *TestClient) GetRegion(ctx context.Context, key []byte) (*RegionInfo, error) { + c.mu.RLock() + defer c.mu.RUnlock() + for _, region := range c.Regions { + if bytes.Compare(key, region.Region.StartKey) >= 0 && + (len(region.Region.EndKey) == 0 || bytes.Compare(key, region.Region.EndKey) < 0) { + return region, nil + } + } + return nil, 
errors.Errorf("region not found: key=%s", string(key)) +} + +func (c *TestClient) GetRegionByID(ctx context.Context, regionID uint64) (*RegionInfo, error) { + c.mu.RLock() + defer c.mu.RUnlock() + region, ok := c.Regions[regionID] + if !ok { + return nil, errors.Errorf("region not found: id=%d", regionID) + } + return region, nil +} + +func (c *TestClient) SplitWaitAndScatter(_ context.Context, _ *RegionInfo, keys [][]byte) ([]*RegionInfo, error) { + c.mu.Lock() + defer c.mu.Unlock() + newRegions := make([]*RegionInfo, 0) + for _, key := range keys { + var target *RegionInfo + splitKey := codec.EncodeBytes([]byte{}, key) + for _, region := range c.Regions { + if region.ContainsInterior(splitKey) { + target = region + } + } + if target == nil { + continue + } + newRegion := &RegionInfo{ + Region: &metapb.Region{ + Peers: target.Region.Peers, + Id: c.nextRegionID, + StartKey: target.Region.StartKey, + EndKey: splitKey, + }, + } + c.Regions[c.nextRegionID] = newRegion + c.nextRegionID++ + target.Region.StartKey = splitKey + c.Regions[target.Region.Id] = target + newRegions = append(newRegions, newRegion) + } + return newRegions, nil +} + +func (c *TestClient) GetOperator(context.Context, uint64) (*pdpb.GetOperatorResponse, error) { + return &pdpb.GetOperatorResponse{ + Header: new(pdpb.ResponseHeader), + }, nil +} + +func (c *TestClient) ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*RegionInfo, error) { + if c.InjectErr && c.InjectTimes > 0 { + c.InjectTimes -= 1 + return nil, status.Error(codes.Unavailable, "not leader") + } + if len(key) != 0 && bytes.Equal(key, endKey) { + return nil, status.Error(codes.Internal, "key and endKey are the same") + } + + infos := c.RegionsInfo.ScanRange(key, endKey, limit) + regions := make([]*RegionInfo, 0, len(infos)) + for _, info := range infos { + regions = append(regions, &RegionInfo{ + Region: info.Meta, + Leader: info.Leader, + }) + } + return regions, nil +} + +func (c *TestClient) WaitRegionsScattered(context.Context, []*RegionInfo) (int, error) { + return 0, nil +} + // MockPDClientForSplit is a mock PD client for testing split and scatter. 
type MockPDClientForSplit struct { pd.Client mu sync.Mutex + stores map[uint64]*metapb.Store Regions *pdtypes.RegionTree lastRegionID uint64 scanRegions struct { @@ -66,6 +213,13 @@ func (c *MockPDClientForSplit) SetRegions(boundaries [][]byte) []*metapb.Region return c.setRegions(boundaries) } +func (c *MockPDClientForSplit) SetStores(stores map[uint64]*metapb.Store) { + c.mu.Lock() + defer c.mu.Unlock() + + c.stores = stores +} + func (c *MockPDClientForSplit) setRegions(boundaries [][]byte) []*metapb.Region { ret := make([]*metapb.Region, 0, len(boundaries)-1) for i := 1; i < len(boundaries); i++ { @@ -236,3 +390,273 @@ func (c *MockPDClientForSplit) GetOperator(_ context.Context, regionID uint64) ( c.getOperator.responses[regionID] = c.getOperator.responses[regionID][1:] return ret, nil } + +func (c *MockPDClientForSplit) GetStore(_ context.Context, storeID uint64) (*metapb.Store, error) { + return c.stores[storeID], nil +} + +var DefaultTestKeepaliveCfg = keepalive.ClientParameters{ + Time: 3 * time.Second, + Timeout: 10 * time.Second, +} + +var ( + ExpectPDCfgGeneratorsResult = map[string]any{ + "merge-schedule-limit": 0, + "leader-schedule-limit": float64(40), + "region-schedule-limit": float64(40), + "max-snapshot-count": float64(40), + "enable-location-replacement": "false", + "max-pending-peer-count": uint64(math.MaxInt32), + } + + ExistPDCfgGeneratorBefore = map[string]any{ + "merge-schedule-limit": 100, + "leader-schedule-limit": float64(100), + "region-schedule-limit": float64(100), + "max-snapshot-count": float64(100), + "enable-location-replacement": "true", + "max-pending-peer-count": 100, + } +) + +type FakePDHTTPClient struct { + pdhttp.Client + + expireSchedulers map[string]time.Time + cfgs map[string]any + + rules map[string]*pdhttp.Rule +} + +func NewFakePDHTTPClient() *FakePDHTTPClient { + return &FakePDHTTPClient{ + expireSchedulers: make(map[string]time.Time), + cfgs: make(map[string]any), + + rules: make(map[string]*pdhttp.Rule), + } +} + +func (fpdh *FakePDHTTPClient) GetScheduleConfig(_ context.Context) (map[string]any, error) { + return ExistPDCfgGeneratorBefore, nil +} + +func (fpdh *FakePDHTTPClient) GetSchedulers(_ context.Context) ([]string, error) { + schedulers := make([]string, 0, len(pdutil.Schedulers)) + for scheduler := range pdutil.Schedulers { + schedulers = append(schedulers, scheduler) + } + return schedulers, nil +} + +func (fpdh *FakePDHTTPClient) SetSchedulerDelay(_ context.Context, key string, delay int64) error { + expireTime, ok := fpdh.expireSchedulers[key] + if ok { + if time.Now().Compare(expireTime) > 0 { + return errors.Errorf("the scheduler config set is expired") + } + if delay == 0 { + delete(fpdh.expireSchedulers, key) + } + } + if !ok && delay == 0 { + return errors.Errorf("set the nonexistent scheduler") + } + expireTime = time.Now().Add(time.Second * time.Duration(delay)) + fpdh.expireSchedulers[key] = expireTime + return nil +} + +func (fpdh *FakePDHTTPClient) SetConfig(_ context.Context, config map[string]any, ttl ...float64) error { + for key, value := range config { + fpdh.cfgs[key] = value + } + return nil +} + +func (fpdh *FakePDHTTPClient) GetConfig(_ context.Context) (map[string]any, error) { + return fpdh.cfgs, nil +} + +func (fpdh *FakePDHTTPClient) GetDelaySchedulers() map[string]struct{} { + delaySchedulers := make(map[string]struct{}) + for key, t := range fpdh.expireSchedulers { + now := time.Now() + if now.Compare(t) < 0 { + delaySchedulers[key] = struct{}{} + } + } + return delaySchedulers +} + +func (fpdh 
*FakePDHTTPClient) GetPlacementRule(_ context.Context, groupID string, ruleID string) (*pdhttp.Rule, error) { + rule, ok := fpdh.rules[ruleID] + if !ok { + rule = &pdhttp.Rule{ + GroupID: groupID, + ID: ruleID, + } + fpdh.rules[ruleID] = rule + } + return rule, nil +} + +func (fpdh *FakePDHTTPClient) SetPlacementRule(_ context.Context, rule *pdhttp.Rule) error { + fpdh.rules[rule.ID] = rule + return nil +} + +func (fpdh *FakePDHTTPClient) DeletePlacementRule(_ context.Context, groupID string, ruleID string) error { + delete(fpdh.rules, ruleID) + return nil +} + +type FakePDClient struct { + pd.Client + stores []*metapb.Store + regions []*pd.Region + + notLeader bool + retryTimes *int + + peerStoreId uint64 +} + +func NewFakePDClient(stores []*metapb.Store, notLeader bool, retryTime *int) *FakePDClient { + var retryTimeInternal int + if retryTime == nil { + retryTime = &retryTimeInternal + } + return &FakePDClient{ + stores: stores, + + notLeader: notLeader, + retryTimes: retryTime, + + peerStoreId: 0, + } +} + +func (fpdc *FakePDClient) SetRegions(regions []*pd.Region) { + fpdc.regions = regions +} + +func (fpdc *FakePDClient) GetAllStores(context.Context, ...pd.GetStoreOption) ([]*metapb.Store, error) { + return append([]*metapb.Store{}, fpdc.stores...), nil +} + +func (fpdc *FakePDClient) ScanRegions( + ctx context.Context, + key, endKey []byte, + limit int, + opts ...pd.GetRegionOption, +) ([]*pd.Region, error) { + regions := make([]*pd.Region, 0, len(fpdc.regions)) + fpdc.peerStoreId = fpdc.peerStoreId + 1 + peerStoreId := (fpdc.peerStoreId + 1) / 2 + for _, region := range fpdc.regions { + if len(endKey) != 0 && bytes.Compare(region.Meta.StartKey, endKey) >= 0 { + continue + } + if len(region.Meta.EndKey) != 0 && bytes.Compare(region.Meta.EndKey, key) <= 0 { + continue + } + region.Meta.Peers = []*metapb.Peer{{StoreId: peerStoreId}} + regions = append(regions, region) + } + return regions, nil +} + +func (fpdc *FakePDClient) BatchScanRegions( + ctx context.Context, + ranges []pd.KeyRange, + limit int, + opts ...pd.GetRegionOption, +) ([]*pd.Region, error) { + regions := make([]*pd.Region, 0, len(fpdc.regions)) + fpdc.peerStoreId = fpdc.peerStoreId + 1 + peerStoreId := (fpdc.peerStoreId + 1) / 2 + for _, region := range fpdc.regions { + inRange := false + for _, keyRange := range ranges { + if len(keyRange.EndKey) != 0 && bytes.Compare(region.Meta.StartKey, keyRange.EndKey) >= 0 { + continue + } + if len(region.Meta.EndKey) != 0 && bytes.Compare(region.Meta.EndKey, keyRange.StartKey) <= 0 { + continue + } + inRange = true + } + if inRange { + region.Meta.Peers = []*metapb.Peer{{StoreId: peerStoreId}} + regions = append(regions, region) + } + } + return regions, nil +} + +func (fpdc *FakePDClient) GetTS(ctx context.Context) (int64, int64, error) { + (*fpdc.retryTimes)++ + if *fpdc.retryTimes >= 3 { // the mock PD leader switched successfully + fpdc.notLeader = false + } + + if fpdc.notLeader { + return 0, 0, errors.Errorf( + "rpc error: code = Unknown desc = [PD:tso:ErrGenerateTimestamp]generate timestamp failed, " + + "requested pd is not leader of cluster", + ) + } + return 1, 1, nil +} + +type FakeSplitClient struct { + SplitClient + regions []*RegionInfo +} + +func NewFakeSplitClient() *FakeSplitClient { + return &FakeSplitClient{ + regions: make([]*RegionInfo, 0), + } +} + +func (f *FakeSplitClient) AppendRegion(startKey, endKey []byte) { + f.regions = append(f.regions, &RegionInfo{ + Region: &metapb.Region{ + StartKey: startKey, + EndKey: endKey, + }, + }) +} + +func (f 
*FakeSplitClient) AppendPdRegion(region *pd.Region) { + f.regions = append(f.regions, &RegionInfo{ + Region: region.Meta, + Leader: region.Leader, + }) +} + +func (f *FakeSplitClient) ScanRegions( + ctx context.Context, + startKey, endKey []byte, + limit int, +) ([]*RegionInfo, error) { + result := make([]*RegionInfo, 0) + count := 0 + for _, rng := range f.regions { + if bytes.Compare(rng.Region.StartKey, endKey) <= 0 && bytes.Compare(rng.Region.EndKey, startKey) > 0 { + result = append(result, rng) + count++ + } + if count >= limit { + break + } + } + return result, nil +} + +func (f *FakeSplitClient) WaitRegionsScattered(context.Context, []*RegionInfo) (int, error) { + return 0, nil +} diff --git a/br/pkg/restore/split/split.go b/br/pkg/restore/split/split.go index ce6faa90b209c..f7df83cd2e3e9 100644 --- a/br/pkg/restore/split/split.go +++ b/br/pkg/restore/split/split.go @@ -7,18 +7,25 @@ import ( "context" "encoding/hex" goerrors "errors" + "sort" + "sync" "time" "github.com/pingcap/errors" "github.com/pingcap/failpoint" + backuppb "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/log" berrors "github.com/pingcap/tidb/br/pkg/errors" "github.com/pingcap/tidb/br/pkg/logutil" + restoreutils "github.com/pingcap/tidb/br/pkg/restore/utils" "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/pkg/lightning/config" + "github.com/pingcap/tidb/pkg/tablecodec" + "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/codec" "github.com/pingcap/tidb/pkg/util/redact" "go.uber.org/zap" + "golang.org/x/sync/errgroup" ) var ( @@ -37,6 +44,461 @@ const ( ScanRegionPaginationLimit = 128 ) +type rewriteSplitter struct { + rewriteKey []byte + tableID int64 + rule *restoreutils.RewriteRules + splitter *SplitHelper +} + +type splitHelperIterator struct { + tableSplitters []*rewriteSplitter +} + +func (iter *splitHelperIterator) Traverse(fn func(v Valued, endKey []byte, rule *restoreutils.RewriteRules) bool) { + for _, entry := range iter.tableSplitters { + endKey := codec.EncodeBytes([]byte{}, tablecodec.EncodeTablePrefix(entry.tableID+1)) + rule := entry.rule + entry.splitter.Traverse(func(v Valued) bool { + return fn(v, endKey, rule) + }) + } +} + +type LogSplitHelper struct { + tableSplitter map[int64]*SplitHelper + rules map[int64]*restoreutils.RewriteRules + client SplitClient + pool *util.WorkerPool + eg *errgroup.Group + regionsCh chan []*RegionInfo + + splitThresholdSize uint64 + splitThresholdKeys int64 +} + +func NewLogSplitHelper(rules map[int64]*restoreutils.RewriteRules, client SplitClient, splitSize uint64, splitKeys int64) *LogSplitHelper { + return &LogSplitHelper{ + tableSplitter: make(map[int64]*SplitHelper), + rules: rules, + client: client, + pool: util.NewWorkerPool(128, "split region"), + eg: nil, + + splitThresholdSize: splitSize, + splitThresholdKeys: splitKeys, + } +} + +func (helper *LogSplitHelper) iterator() *splitHelperIterator { + tableSplitters := make([]*rewriteSplitter, 0, len(helper.tableSplitter)) + for tableID, splitter := range helper.tableSplitter { + delete(helper.tableSplitter, tableID) + rewriteRule, exists := helper.rules[tableID] + if !exists { + log.Info("skip splitting due to no table id matched", zap.Int64("tableID", tableID)) + continue + } + newTableID := restoreutils.GetRewriteTableID(tableID, rewriteRule) + if newTableID == 0 { + log.Warn("failed to get the rewrite table id", zap.Int64("tableID", tableID)) + continue + } + tableSplitters = append(tableSplitters, &rewriteSplitter{ + rewriteKey: codec.EncodeBytes([]byte{}, 
tablecodec.EncodeTablePrefix(newTableID)), + tableID: newTableID, + rule: rewriteRule, + splitter: splitter, + }) + } + sort.Slice(tableSplitters, func(i, j int) bool { + return bytes.Compare(tableSplitters[i].rewriteKey, tableSplitters[j].rewriteKey) < 0 + }) + return &splitHelperIterator{ + tableSplitters: tableSplitters, + } +} + +const splitFileThreshold = 1024 * 1024 // 1 MB + +func (helper *LogSplitHelper) skipFile(file *backuppb.DataFileInfo) bool { + _, exist := helper.rules[file.TableId] + return file.Length < splitFileThreshold || file.IsMeta || !exist +} + +func (helper *LogSplitHelper) Merge(file *backuppb.DataFileInfo) { + if helper.skipFile(file) { + return + } + splitHelper, exist := helper.tableSplitter[file.TableId] + if !exist { + splitHelper = NewSplitHelper() + helper.tableSplitter[file.TableId] = splitHelper + } + + splitHelper.Merge(Valued{ + Key: Span{ + StartKey: file.StartKey, + EndKey: file.EndKey, + }, + Value: Value{ + Size: file.Length, + Number: file.NumberOfEntries, + }, + }) +} + +type splitFunc = func(context.Context, *RegionSplitter, uint64, int64, *RegionInfo, []Valued) error + +func (helper *LogSplitHelper) splitRegionByPoints( + ctx context.Context, + regionSplitter *RegionSplitter, + initialLength uint64, + initialNumber int64, + region *RegionInfo, + valueds []Valued, +) error { + var ( + splitPoints [][]byte = make([][]byte, 0) + lastKey []byte = region.Region.StartKey + length uint64 = initialLength + number int64 = initialNumber + ) + for _, v := range valueds { + // decoding discards the ts behind the key, which can yield the same key for consecutive ranges + if !bytes.Equal(lastKey, v.GetStartKey()) && (v.Value.Size+length > helper.splitThresholdSize || v.Value.Number+number > helper.splitThresholdKeys) { + _, rawKey, _ := codec.DecodeBytes(v.GetStartKey(), nil) + splitPoints = append(splitPoints, rawKey) + length = 0 + number = 0 + } + lastKey = v.GetStartKey() + length += v.Value.Size + number += v.Value.Number + } + + if len(splitPoints) == 0 { + return nil + } + + helper.pool.ApplyOnErrorGroup(helper.eg, func() error { + newRegions, errSplit := regionSplitter.ExecuteOneRegion(ctx, region, splitPoints) + if errSplit != nil { + log.Warn("failed to split the scanned region", zap.Error(errSplit)) + sort.Slice(splitPoints, func(i, j int) bool { + return bytes.Compare(splitPoints[i], splitPoints[j]) < 0 + }) + return regionSplitter.ExecuteSortedKeys(ctx, splitPoints) + } + select { + case <-ctx.Done(): + return nil + case helper.regionsCh <- newRegions: + } + log.Info("split the region", zap.Uint64("region-id", region.Region.Id), zap.Int("split-point-number", len(splitPoints))) + return nil + }) + return nil +} + +// SplitPoint selects ranges overlapped with each region, and calls `splitF` to split the region +func SplitPoint( + ctx context.Context, + iter *splitHelperIterator, + client SplitClient, + splitF splitFunc, +) (err error) { + // common status + var ( + regionSplitter *RegionSplitter = NewRegionSplitter(client) + ) + // region traverse status + var ( + // the region buffer of each scan + regions []*RegionInfo = nil + regionIndex int = 0 + ) + // region split status + var ( + // range span +----------------+------+---+-------------+ + // region span +------------------------------------+ + // +initial length+ +end valued+ + // regionValueds is the ranges array overlapped with `regionInfo` + regionValueds []Valued = nil + // regionInfo is the region to be split + regionInfo *RegionInfo = nil + // initialLength is the length of the part of the 
first range overlapped with the region + initialLength uint64 = 0 + initialNumber int64 = 0 + ) + // range status + var ( + // regionOverCount is the number of regions overlapped with the range + regionOverCount uint64 = 0 + ) + + iter.Traverse(func(v Valued, endKey []byte, rule *restoreutils.RewriteRules) bool { + if v.Value.Number == 0 || v.Value.Size == 0 { + return true + } + var ( + vStartKey []byte + vEndKey []byte + ) + // use `vStartKey` and `vEndKey` to compare with region's key + vStartKey, vEndKey, err = restoreutils.GetRewriteEncodedKeys(v, rule) + if err != nil { + return false + } + // traverse to the first region overlapped with the range + for ; regionIndex < len(regions); regionIndex++ { + if bytes.Compare(vStartKey, regions[regionIndex].Region.EndKey) < 0 { + break + } + } + // cannot find any regions overlapped with the range + // need to scan regions again + if regionIndex == len(regions) { + regions = nil + } + regionOverCount = 0 + for { + if regionIndex >= len(regions) { + var startKey []byte + if len(regions) > 0 { + // has traversed over the region buffer, should scan from the last region's end-key of the region buffer + startKey = regions[len(regions)-1].Region.EndKey + } else { + // scan from the range's start-key + startKey = vStartKey + } + // scan at most 64 regions into the region buffer + regions, err = ScanRegionsWithRetry(ctx, client, startKey, endKey, 64) + if err != nil { + return false + } + regionIndex = 0 + } + + region := regions[regionIndex] + // this region must be overlapped with the range + regionOverCount++ + // the region is the last one overlapped with the range, + // should split the last recorded region, + // and then record this region as the region to be split + if bytes.Compare(vEndKey, region.Region.EndKey) < 0 { + endLength := v.Value.Size / regionOverCount + endNumber := v.Value.Number / int64(regionOverCount) + if len(regionValueds) > 0 && regionInfo != region { + // add a part of the range as the end part + if bytes.Compare(vStartKey, regionInfo.Region.EndKey) < 0 { + regionValueds = append(regionValueds, NewValued(vStartKey, regionInfo.Region.EndKey, Value{Size: endLength, Number: endNumber})) + } + // try to split the region + err = splitF(ctx, regionSplitter, initialLength, initialNumber, regionInfo, regionValueds) + if err != nil { + return false + } + regionValueds = make([]Valued, 0) + } + if regionOverCount == 1 { + // the region completely contains the range + regionValueds = append(regionValueds, Valued{ + Key: Span{ + StartKey: vStartKey, + EndKey: vEndKey, + }, + Value: v.Value, + }) + } else { + // the region is overlapped with the last part of the range + initialLength = endLength + initialNumber = endNumber + } + regionInfo = region + // try the next range + return true + } + + // try the next region + regionIndex++ + } + }) + + if err != nil { + return errors.Trace(err) + } + if len(regionValueds) > 0 { + // try to split the region + err = splitF(ctx, regionSplitter, initialLength, initialNumber, regionInfo, regionValueds) + if err != nil { + return errors.Trace(err) + } + } + + return nil +} + +func (helper *LogSplitHelper) Split(ctx context.Context) error { + var ectx context.Context + var wg sync.WaitGroup + helper.eg, ectx = errgroup.WithContext(ctx) + helper.regionsCh = make(chan []*RegionInfo, 1024) + wg.Add(1) + go func() { + defer wg.Done() + scatterRegions := make([]*RegionInfo, 0) + receiveNewRegions: + for { + select { + case <-ctx.Done(): + return + case newRegions, ok := <-helper.regionsCh: + if !ok { + 
break receiveNewRegions + } + + scatterRegions = append(scatterRegions, newRegions...) + } + } + + regionSplitter := NewRegionSplitter(helper.client) + // It is too expensive to stop recovery and wait for a small number of regions + // to complete scatter, so the maximum waiting time is reduced to 1 minute. + _ = regionSplitter.WaitForScatterRegionsTimeout(ctx, scatterRegions, time.Minute) + }() + + iter := helper.iterator() + if err := SplitPoint(ectx, iter, helper.client, helper.splitRegionByPoints); err != nil { + return errors.Trace(err) + } + + // wait for completion of splitting regions + if err := helper.eg.Wait(); err != nil { + return errors.Trace(err) + } + + // wait for completion of scattering regions + close(helper.regionsCh) + wg.Wait() + + return nil +} + +// RegionSplitter is an executor of region splits by rules. +type RegionSplitter struct { + client SplitClient +} + +// NewRegionSplitter returns a new RegionSplitter. +func NewRegionSplitter(client SplitClient) *RegionSplitter { + return &RegionSplitter{ + client: client, + } +} + +// ExecuteOneRegion exposes the `SplitWaitAndScatter` function of the split client. +func (rs *RegionSplitter) ExecuteOneRegion(ctx context.Context, region *RegionInfo, keys [][]byte) ([]*RegionInfo, error) { + return rs.client.SplitWaitAndScatter(ctx, region, keys) +} + +// ExecuteSortedKeys executes region splits and makes sure the newly split regions are balanced. +// It first splits regions by the rewrite rules, +// then splits them by the end key of each range. +// tableRules includes the prefix of a table, since some ranges may have +// a prefix with a record sequence or an index sequence. +// note: all ranges and rewrite rules must use raw keys. +func (rs *RegionSplitter) ExecuteSortedKeys( + ctx context.Context, + sortedSplitKeys [][]byte, +) error { + if len(sortedSplitKeys) == 0 { + log.Info("skip split regions, no split keys") + return nil + } + + log.Info("execute split sorted keys", zap.Int("keys count", len(sortedSplitKeys))) + return rs.executeSplitByRanges(ctx, sortedSplitKeys) +} + +func (rs *RegionSplitter) executeSplitByRanges( + ctx context.Context, + sortedKeys [][]byte, +) error { + startTime := time.Now() + // Choose the rough region split keys so that each roughly split region + // contains regionIndexStep (128) of the final split keys; e.g. with 300 + // sorted keys, the rough pass splits at keys[128] and keys[256], and the + // fine pass below then splits on all 300 keys. + const regionIndexStep = 128 + + roughSortedSplitKeys := make([][]byte, 0, len(sortedKeys)/regionIndexStep+1) + for curRegionIndex := regionIndexStep; curRegionIndex < len(sortedKeys); curRegionIndex += regionIndexStep { + roughSortedSplitKeys = append(roughSortedSplitKeys, sortedKeys[curRegionIndex]) + } + if len(roughSortedSplitKeys) > 0 { + if err := rs.executeSplitByKeys(ctx, roughSortedSplitKeys); err != nil { + return errors.Trace(err) + } + } + log.Info("finish splitting regions roughly", zap.Duration("take", time.Since(startTime))) + + // Then send split requests to each TiKV. + if err := rs.executeSplitByKeys(ctx, sortedKeys); err != nil { + return errors.Trace(err) + } + + log.Info("finish splitting and scattering regions", zap.Duration("take", time.Since(startTime))) + return nil +} + +// executeSplitByKeys will split regions by **sorted** keys with the following steps: +// 1. locate regions with corresponding keys. +// 2. split these regions with corresponding keys. +// 3. make sure new split regions are balanced. 
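+// A minimal usage sketch of the public entry point that funnels into this method, mirroring the tests below (assuming a split client built with NewClient and split keys that are already rewritten, encoded, and sorted): +// +// splitter := NewRegionSplitter(client) +// if err := splitter.ExecuteSortedKeys(ctx, sortedSplitKeys); err != nil { +// return errors.Trace(err) +// }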
+func (rs *RegionSplitter) executeSplitByKeys( + ctx context.Context, + sortedKeys [][]byte, +) error { + startTime := time.Now() + scatterRegions, err := rs.client.SplitKeysAndScatter(ctx, sortedKeys) + if err != nil { + return errors.Trace(err) + } + if len(scatterRegions) > 0 { + log.Info("finish splitting and scattering regions, start to wait", zap.Int("regions", len(scatterRegions)), + zap.Duration("take", time.Since(startTime))) + rs.waitRegionsScattered(ctx, scatterRegions, ScatterWaitUpperInterval) + } else { + log.Info("finish splitting regions", zap.Duration("take", time.Since(startTime))) + } + return nil +} + +// waitRegionsScattered tries to wait for multiple regions to finish scattering within 3 minutes. +// This could time out, but if most regions have scattered the restore can continue, +// so we don't wait too long here. +func (rs *RegionSplitter) waitRegionsScattered(ctx context.Context, scatterRegions []*RegionInfo, timeout time.Duration) { + log.Info("start to wait for scattering regions", zap.Int("regions", len(scatterRegions))) + startTime := time.Now() + leftCnt := rs.WaitForScatterRegionsTimeout(ctx, scatterRegions, timeout) + if leftCnt == 0 { + log.Info("waiting for scattering regions done", + zap.Int("regions", len(scatterRegions)), + zap.Duration("take", time.Since(startTime))) + } else { + log.Warn("waiting for scattering regions timeout", + zap.Int("not scattered Count", leftCnt), + zap.Int("regions", len(scatterRegions)), + zap.Duration("take", time.Since(startTime))) + } +} + +func (rs *RegionSplitter) WaitForScatterRegionsTimeout(ctx context.Context, regionInfos []*RegionInfo, timeout time.Duration) int { + ctx2, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + leftRegions, _ := rs.client.WaitRegionsScattered(ctx2, regionInfos) + return leftRegions +} + func checkRegionConsistency(startKey, endKey []byte, regions []*RegionInfo) error { // current pd can't guarantee the consistency of returned regions if len(regions) == 0 { diff --git a/br/pkg/restore/split/split_test.go b/br/pkg/restore/split/split_test.go index 2250f7a96635c..4acef5dac84b7 100644 --- a/br/pkg/restore/split/split_test.go +++ b/br/pkg/restore/split/split_test.go @@ -5,16 +5,23 @@ import ( "bytes" "context" goerrors "errors" + "fmt" "slices" + "sort" "testing" "time" + "github.com/docker/go-units" "github.com/pingcap/errors" "github.com/pingcap/failpoint" + backuppb "github.com/pingcap/kvproto/pkg/brpb" + "github.com/pingcap/kvproto/pkg/import_sstpb" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" berrors "github.com/pingcap/tidb/br/pkg/errors" + restoreutils "github.com/pingcap/tidb/br/pkg/restore/utils" + "github.com/pingcap/tidb/br/pkg/rtree" "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/store/pdtypes" @@ -756,3 +763,351 @@ func TestScanRegionsWithRetry(t *testing.T) { require.Equal(t, []byte("2"), regions[1].Region.StartKey) } } + +func TestScanEmptyRegion(t *testing.T) { + mockPDCli := NewMockPDClientForSplit() + mockPDCli.SetRegions([][]byte{{}, {12}, {34}, {}}) + client := NewClient(mockPDCli, nil, nil, 100, 4) + keys := initKeys() + // keep only the first key + keys = keys[0:1] + regionSplitter := NewRegionSplitter(client) + + ctx := context.Background() + err := regionSplitter.ExecuteSortedKeys(ctx, keys) + // should not return error with only one range entry + require.NoError(t, err) +} + +func TestSplitEmptyRegion(t *testing.T) { + mockPDCli := 
NewMockPDClientForSplit() + mockPDCli.SetRegions([][]byte{{}, {12}, {34}, {}}) + client := NewClient(mockPDCli, nil, nil, 100, 4) + regionSplitter := NewRegionSplitter(client) + err := regionSplitter.ExecuteSortedKeys(context.Background(), nil) + require.NoError(t, err) +} + +// region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, ) +// range: [aaa, aae), [aae, aaz), [ccd, ccf), [ccf, ccj) +// rewrite rules: aa -> xx, cc -> bb +// expected regions after split: +// +// [, aay), [aay, bba), [bba, bbf), [bbf, bbh), [bbh, bbj), +// [bbj, cca), [cca, xxe), [xxe, xxz), [xxz, ) +func TestSplitAndScatter(t *testing.T) { + rangeBoundaries := [][]byte{[]byte(""), []byte("aay"), []byte("bba"), []byte("bbh"), []byte("cca"), []byte("")} + encodeBytes(rangeBoundaries) + mockPDCli := NewMockPDClientForSplit() + mockPDCli.SetRegions(rangeBoundaries) + client := NewClient(mockPDCli, nil, nil, 100, 4) + regionSplitter := NewRegionSplitter(client) + ctx := context.Background() + + ranges := initRanges() + rules := initRewriteRules() + splitKeys := make([][]byte, 0, len(ranges)) + for _, rg := range ranges { + tmp, err := restoreutils.RewriteRange(&rg, rules) + require.NoError(t, err) + splitKeys = append(splitKeys, tmp.EndKey) + } + sort.Slice(splitKeys, func(i, j int) bool { + return bytes.Compare(splitKeys[i], splitKeys[j]) < 0 + }) + err := regionSplitter.ExecuteSortedKeys(ctx, splitKeys) + require.NoError(t, err) + regions := mockPDCli.Regions.ScanRange(nil, nil, 100) + expected := [][]byte{[]byte(""), []byte("aay"), []byte("bba"), []byte("bbf"), []byte("bbh"), []byte("bbj"), []byte("cca"), []byte("xxe"), []byte("xxz"), []byte("")} + encodeBytes(expected) + require.Len(t, regions, len(expected)-1) + for i, region := range regions { + require.Equal(t, expected[i], region.Meta.StartKey) + require.Equal(t, expected[i+1], region.Meta.EndKey) + } +} + +func encodeBytes(keys [][]byte) { + for i := range keys { + if len(keys[i]) == 0 { + continue + } + keys[i] = codec.EncodeBytes(nil, keys[i]) + } +} + +func TestRawSplit(t *testing.T) { + // Fix issue #36490. 
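+// Regression guard: with a raw-KV split client (WithRawKV) and a single empty split key, no split should occur; the assertions below expect the region layout to remain exactly rangeBoundaries.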
+ splitKeys := [][]byte{{}} + ctx := context.Background() + rangeBoundaries := [][]byte{[]byte(""), []byte("aay"), []byte("bba"), []byte("bbh"), []byte("cca"), []byte("")} + mockPDCli := NewMockPDClientForSplit() + mockPDCli.SetRegions(rangeBoundaries) + client := NewClient(mockPDCli, nil, nil, 100, 4, WithRawKV()) + + regionSplitter := NewRegionSplitter(client) + err := regionSplitter.ExecuteSortedKeys(ctx, splitKeys) + require.NoError(t, err) + + regions := mockPDCli.Regions.ScanRange(nil, nil, 100) + require.Len(t, regions, len(rangeBoundaries)-1) + for i, region := range regions { + require.Equal(t, rangeBoundaries[i], region.Meta.StartKey) + require.Equal(t, rangeBoundaries[i+1], region.Meta.EndKey) + } +} + +// keys: aae, aaz, ccf, ccj +func initKeys() [][]byte { + return [][]byte{ + []byte("aae"), + []byte("aaz"), + []byte("ccf"), + []byte("ccj"), + } +} + +// range: [aaa, aae), [aae, aaz), [ccd, ccf), [ccf, ccj) +func initRanges() []rtree.Range { + var ranges [4]rtree.Range + ranges[0] = rtree.Range{ + StartKey: []byte("aaa"), + EndKey: []byte("aae"), + } + ranges[1] = rtree.Range{ + StartKey: []byte("aae"), + EndKey: []byte("aaz"), + } + ranges[2] = rtree.Range{ + StartKey: []byte("ccd"), + EndKey: []byte("ccf"), + } + ranges[3] = rtree.Range{ + StartKey: []byte("ccf"), + EndKey: []byte("ccj"), + } + return ranges[:] +} + +func initRewriteRules() *restoreutils.RewriteRules { + var rules [2]*import_sstpb.RewriteRule + rules[0] = &import_sstpb.RewriteRule{ + OldKeyPrefix: []byte("aa"), + NewKeyPrefix: []byte("xx"), + } + rules[1] = &import_sstpb.RewriteRule{ + OldKeyPrefix: []byte("cc"), + NewKeyPrefix: []byte("bb"), + } + return &restoreutils.RewriteRules{ + Data: rules[:], + } +} + +func keyWithTablePrefix(tableID int64, key string) []byte { + rawKey := append(tablecodec.GenTableRecordPrefix(tableID), []byte(key)...) 
+ return codec.EncodeBytes([]byte{}, rawKey) +} + +func TestSplitPoint(t *testing.T) { + ctx := context.Background() + var oldTableID int64 = 50 + var tableID int64 = 100 + rewriteRules := &restoreutils.RewriteRules{ + Data: []*import_sstpb.RewriteRule{ + { + OldKeyPrefix: tablecodec.EncodeTablePrefix(oldTableID), + NewKeyPrefix: tablecodec.EncodeTablePrefix(tableID), + }, + }, + } + + // range: b c d e g i + // +---+ +---+ +---------+ + // +-------------+----------+---------+ + // region: a f h j + splitHelper := NewSplitHelper() + splitHelper.Merge(Valued{Key: Span{StartKey: keyWithTablePrefix(oldTableID, "b"), EndKey: keyWithTablePrefix(oldTableID, "c")}, Value: Value{Size: 100, Number: 100}}) + splitHelper.Merge(Valued{Key: Span{StartKey: keyWithTablePrefix(oldTableID, "d"), EndKey: keyWithTablePrefix(oldTableID, "e")}, Value: Value{Size: 200, Number: 200}}) + splitHelper.Merge(Valued{Key: Span{StartKey: keyWithTablePrefix(oldTableID, "g"), EndKey: keyWithTablePrefix(oldTableID, "i")}, Value: Value{Size: 300, Number: 300}}) + client := NewFakeSplitClient() + client.AppendRegion(keyWithTablePrefix(tableID, "a"), keyWithTablePrefix(tableID, "f")) + client.AppendRegion(keyWithTablePrefix(tableID, "f"), keyWithTablePrefix(tableID, "h")) + client.AppendRegion(keyWithTablePrefix(tableID, "h"), keyWithTablePrefix(tableID, "j")) + client.AppendRegion(keyWithTablePrefix(tableID, "j"), keyWithTablePrefix(tableID+1, "a")) + + iter := NewSplitHelperIteratorForTest(splitHelper, tableID, rewriteRules) + err := SplitPoint(ctx, iter, client, func(ctx context.Context, rs *RegionSplitter, u uint64, o int64, ri *RegionInfo, v []Valued) error { + require.Equal(t, u, uint64(0)) + require.Equal(t, o, int64(0)) + require.Equal(t, ri.Region.StartKey, keyWithTablePrefix(tableID, "a")) + require.Equal(t, ri.Region.EndKey, keyWithTablePrefix(tableID, "f")) + require.EqualValues(t, v[0].Key.StartKey, keyWithTablePrefix(tableID, "b")) + require.EqualValues(t, v[0].Key.EndKey, keyWithTablePrefix(tableID, "c")) + require.EqualValues(t, v[1].Key.StartKey, keyWithTablePrefix(tableID, "d")) + require.EqualValues(t, v[1].Key.EndKey, keyWithTablePrefix(tableID, "e")) + require.Equal(t, len(v), 2) + return nil + }) + require.NoError(t, err) +} + +func getCharFromNumber(prefix string, i int) string { + c := '1' + (i % 10) + b := '1' + (i%100)/10 + a := '1' + i/100 + return fmt.Sprintf("%s%c%c%c", prefix, a, b, c) +} + +func TestSplitPoint2(t *testing.T) { + ctx := context.Background() + var oldTableID int64 = 50 + var tableID int64 = 100 + rewriteRules := &restoreutils.RewriteRules{ + Data: []*import_sstpb.RewriteRule{ + { + OldKeyPrefix: tablecodec.EncodeTablePrefix(oldTableID), + NewKeyPrefix: tablecodec.EncodeTablePrefix(tableID), + }, + }, + } + + // range: b c d e f i j k l n + // +---+ +---+ +-----------------+ +----+ +--------+ + // +---------------+--+.....+----+------------+---------+ + // region: a g >128 h m o + splitHelper := NewSplitHelper() + splitHelper.Merge(Valued{Key: Span{StartKey: keyWithTablePrefix(oldTableID, "b"), EndKey: keyWithTablePrefix(oldTableID, "c")}, Value: Value{Size: 100, Number: 100}}) + splitHelper.Merge(Valued{Key: Span{StartKey: keyWithTablePrefix(oldTableID, "d"), EndKey: keyWithTablePrefix(oldTableID, "e")}, Value: Value{Size: 200, Number: 200}}) + splitHelper.Merge(Valued{Key: Span{StartKey: keyWithTablePrefix(oldTableID, "f"), EndKey: keyWithTablePrefix(oldTableID, "i")}, Value: Value{Size: 300, Number: 300}}) + splitHelper.Merge(Valued{Key: Span{StartKey: 
keyWithTablePrefix(oldTableID, "j"), EndKey: keyWithTablePrefix(oldTableID, "k")}, Value: Value{Size: 200, Number: 200}}) + splitHelper.Merge(Valued{Key: Span{StartKey: keyWithTablePrefix(oldTableID, "l"), EndKey: keyWithTablePrefix(oldTableID, "n")}, Value: Value{Size: 200, Number: 200}}) + client := NewFakeSplitClient() + client.AppendRegion(keyWithTablePrefix(tableID, "a"), keyWithTablePrefix(tableID, "g")) + client.AppendRegion(keyWithTablePrefix(tableID, "g"), keyWithTablePrefix(tableID, getCharFromNumber("g", 0))) + for i := 0; i < 256; i++ { + client.AppendRegion(keyWithTablePrefix(tableID, getCharFromNumber("g", i)), keyWithTablePrefix(tableID, getCharFromNumber("g", i+1))) + } + client.AppendRegion(keyWithTablePrefix(tableID, getCharFromNumber("g", 256)), keyWithTablePrefix(tableID, "h")) + client.AppendRegion(keyWithTablePrefix(tableID, "h"), keyWithTablePrefix(tableID, "m")) + client.AppendRegion(keyWithTablePrefix(tableID, "m"), keyWithTablePrefix(tableID, "o")) + client.AppendRegion(keyWithTablePrefix(tableID, "o"), keyWithTablePrefix(tableID+1, "a")) + + firstSplit := true + iter := NewSplitHelperIteratorForTest(splitHelper, tableID, rewriteRules) + err := SplitPoint(ctx, iter, client, func(ctx context.Context, rs *RegionSplitter, u uint64, o int64, ri *RegionInfo, v []Valued) error { + if firstSplit { + require.Equal(t, u, uint64(0)) + require.Equal(t, o, int64(0)) + require.Equal(t, ri.Region.StartKey, keyWithTablePrefix(tableID, "a")) + require.Equal(t, ri.Region.EndKey, keyWithTablePrefix(tableID, "g")) + require.EqualValues(t, v[0].Key.StartKey, keyWithTablePrefix(tableID, "b")) + require.EqualValues(t, v[0].Key.EndKey, keyWithTablePrefix(tableID, "c")) + require.EqualValues(t, v[1].Key.StartKey, keyWithTablePrefix(tableID, "d")) + require.EqualValues(t, v[1].Key.EndKey, keyWithTablePrefix(tableID, "e")) + require.EqualValues(t, v[2].Key.StartKey, keyWithTablePrefix(tableID, "f")) + require.EqualValues(t, v[2].Key.EndKey, keyWithTablePrefix(tableID, "g")) + require.Equal(t, v[2].Value.Size, uint64(1)) + require.Equal(t, v[2].Value.Number, int64(1)) + require.Equal(t, len(v), 3) + firstSplit = false + } else { + require.Equal(t, u, uint64(1)) + require.Equal(t, o, int64(1)) + require.Equal(t, ri.Region.StartKey, keyWithTablePrefix(tableID, "h")) + require.Equal(t, ri.Region.EndKey, keyWithTablePrefix(tableID, "m")) + require.EqualValues(t, v[0].Key.StartKey, keyWithTablePrefix(tableID, "j")) + require.EqualValues(t, v[0].Key.EndKey, keyWithTablePrefix(tableID, "k")) + require.EqualValues(t, v[1].Key.StartKey, keyWithTablePrefix(tableID, "l")) + require.EqualValues(t, v[1].Key.EndKey, keyWithTablePrefix(tableID, "m")) + require.Equal(t, v[1].Value.Size, uint64(100)) + require.Equal(t, v[1].Value.Number, int64(100)) + require.Equal(t, len(v), 2) + } + return nil + }) + require.NoError(t, err) +} + +func fakeFile(tableID, rowID int64, length uint64, num int64) *backuppb.DataFileInfo { + return &backuppb.DataFileInfo{ + StartKey: fakeRowKey(tableID, rowID), + EndKey: fakeRowKey(tableID, rowID+1), + TableId: tableID, + Length: length, + NumberOfEntries: num, + } +} + +func fakeRowKey(tableID, rowID int64) kv.Key { + return codec.EncodeBytes(nil, tablecodec.EncodeRecordKey(tablecodec.GenTableRecordPrefix(tableID), kv.IntHandle(rowID))) +} + +func TestLogSplitHelper(t *testing.T) { + ctx := context.Background() + rules := map[int64]*restoreutils.RewriteRules{ + 1: { + Data: []*import_sstpb.RewriteRule{ + { + OldKeyPrefix: tablecodec.GenTableRecordPrefix(1), + NewKeyPrefix: 
tablecodec.GenTableRecordPrefix(100), + }, + }, + }, + 2: { + Data: []*import_sstpb.RewriteRule{ + { + OldKeyPrefix: tablecodec.GenTableRecordPrefix(2), + NewKeyPrefix: tablecodec.GenTableRecordPrefix(200), + }, + }, + }, + } + oriRegions := [][]byte{ + {}, + codec.EncodeBytes(nil, tablecodec.EncodeTablePrefix(100)), + codec.EncodeBytes(nil, tablecodec.EncodeTablePrefix(200)), + codec.EncodeBytes(nil, tablecodec.EncodeTablePrefix(402)), + } + mockPDCli := NewMockPDClientForSplit() + mockPDCli.SetRegions(oriRegions) + client := NewClient(mockPDCli, nil, nil, 100, 4) + helper := NewLogSplitHelper(rules, client, 4*units.MiB, 400) + + helper.Merge(fakeFile(1, 100, 100, 100)) + helper.Merge(fakeFile(1, 200, 2*units.MiB, 200)) + helper.Merge(fakeFile(2, 100, 3*units.MiB, 300)) + helper.Merge(fakeFile(3, 100, 10*units.MiB, 100000)) + // different regions, no split happens + err := helper.Split(ctx) + require.NoError(t, err) + regions, err := mockPDCli.ScanRegions(ctx, []byte{}, []byte{}, 0) + require.NoError(t, err) + require.Len(t, regions, 3) + require.Equal(t, []byte{}, regions[0].Meta.StartKey) + require.Equal(t, codec.EncodeBytes(nil, tablecodec.EncodeTablePrefix(100)), regions[1].Meta.StartKey) + require.Equal(t, codec.EncodeBytes(nil, tablecodec.EncodeTablePrefix(200)), regions[2].Meta.StartKey) + require.Equal(t, codec.EncodeBytes(nil, tablecodec.EncodeTablePrefix(402)), regions[2].Meta.EndKey) + + helper.Merge(fakeFile(1, 300, 3*units.MiB, 10)) + helper.Merge(fakeFile(1, 400, 4*units.MiB, 10)) + // trigger to split regions for table 1 + err = helper.Split(ctx) + require.NoError(t, err) + regions, err = mockPDCli.ScanRegions(ctx, []byte{}, []byte{}, 0) + require.NoError(t, err) + require.Len(t, regions, 4) + require.Equal(t, fakeRowKey(100, 400), kv.Key(regions[1].Meta.EndKey)) +} + +func NewSplitHelperIteratorForTest(helper *SplitHelper, tableID int64, rule *restoreutils.RewriteRules) *splitHelperIterator { + return &splitHelperIterator{ + tableSplitters: []*rewriteSplitter{ + { + tableID: tableID, + rule: rule, + splitter: helper, + }, + }, + } +} diff --git a/br/pkg/restore/internal/log_split/sum_sorted.go b/br/pkg/restore/split/sum_sorted.go similarity index 99% rename from br/pkg/restore/internal/log_split/sum_sorted.go rename to br/pkg/restore/split/sum_sorted.go index fb5d3d8f9a0a2..1ab51588ba6ca 100644 --- a/br/pkg/restore/internal/log_split/sum_sorted.go +++ b/br/pkg/restore/split/sum_sorted.go @@ -1,5 +1,5 @@ // Copyright 2022 PingCAP, Inc. Licensed under Apache-2.0. -package logsplit +package split import ( "bytes" diff --git a/br/pkg/restore/internal/log_split/sum_sorted_test.go b/br/pkg/restore/split/sum_sorted_test.go similarity index 87% rename from br/pkg/restore/internal/log_split/sum_sorted_test.go rename to br/pkg/restore/split/sum_sorted_test.go index 634ed93f003b1..965a39d6242c4 100644 --- a/br/pkg/restore/internal/log_split/sum_sorted_test.go +++ b/br/pkg/restore/split/sum_sorted_test.go @@ -1,17 +1,17 @@ // Copyright 2022 PingCAP, Inc. Licensed under Apache-2.0. 
-package logsplit_test +package split_test import ( "fmt" "testing" - logsplit "github.com/pingcap/tidb/br/pkg/restore/internal/log_split" + split "github.com/pingcap/tidb/br/pkg/restore/split" "github.com/stretchr/testify/require" ) -func v(s, e string, val logsplit.Value) logsplit.Valued { - return logsplit.Valued{ - Key: logsplit.Span{ +func v(s, e string, val split.Value) split.Valued { + return split.Valued{ + Key: split.Span{ StartKey: []byte(s), EndKey: []byte(e), }, @@ -19,8 +19,8 @@ func v(s, e string, val logsplit.Value) logsplit.Valued { } } -func mb(b uint64) logsplit.Value { - return logsplit.Value{ +func mb(b uint64) split.Value { + return split.Value{ Size: b * 1024 * 1024, Number: int64(b), } @@ -32,12 +32,12 @@ func exportString(startKey, endKey, size string, number int) string { func TestSumSorted(t *testing.T) { cases := []struct { - values []logsplit.Valued + values []split.Valued result []uint64 strs []string }{ { - values: []logsplit.Valued{ + values: []split.Valued{ v("a", "f", mb(100)), v("a", "c", mb(200)), v("d", "g", mb(100)), @@ -50,7 +50,7 @@ func TestSumSorted(t *testing.T) { }, }, { - values: []logsplit.Valued{ + values: []split.Valued{ v("a", "f", mb(100)), v("a", "c", mb(200)), v("d", "f", mb(100)), @@ -63,7 +63,7 @@ func TestSumSorted(t *testing.T) { }, }, { - values: []logsplit.Valued{ + values: []split.Valued{ v("a", "f", mb(100)), v("a", "c", mb(200)), v("c", "f", mb(100)), @@ -76,7 +76,7 @@ func TestSumSorted(t *testing.T) { }, }, { - values: []logsplit.Valued{ + values: []split.Valued{ v("a", "f", mb(100)), v("a", "c", mb(200)), v("c", "f", mb(100)), @@ -91,7 +91,7 @@ func TestSumSorted(t *testing.T) { }, }, { - values: []logsplit.Valued{ + values: []split.Valued{ v("a", "f", mb(100)), v("a", "c", mb(200)), v("c", "f", mb(100)), @@ -108,7 +108,7 @@ func TestSumSorted(t *testing.T) { }, }, { - values: []logsplit.Valued{ + values: []split.Valued{ v("a", "f", mb(100)), v("a", "c", mb(200)), v("c", "f", mb(100)), @@ -125,7 +125,7 @@ func TestSumSorted(t *testing.T) { }, }, { - values: []logsplit.Valued{ + values: []split.Valued{ v("a", "f", mb(100)), v("a", "c", mb(200)), v("c", "f", mb(100)), @@ -142,7 +142,7 @@ func TestSumSorted(t *testing.T) { }, }, { - values: []logsplit.Valued{ + values: []split.Valued{ v("a", "f", mb(100)), v("a", "c", mb(200)), v("c", "f", mb(100)), @@ -159,7 +159,7 @@ func TestSumSorted(t *testing.T) { }, }, { - values: []logsplit.Valued{ + values: []split.Valued{ v("a", "f", mb(100)), v("a", "c", mb(200)), v("c", "f", mb(100)), @@ -176,7 +176,7 @@ func TestSumSorted(t *testing.T) { }, }, { - values: []logsplit.Valued{ + values: []split.Valued{ v("a", "f", mb(100)), v("a", "c", mb(200)), v("c", "f", mb(100)), @@ -195,14 +195,14 @@ func TestSumSorted(t *testing.T) { } for _, ca := range cases { - full := logsplit.NewSplitHelper() + full := split.NewSplitHelper() for i, v := range ca.values { require.Equal(t, ca.strs[i], v.String()) full.Merge(v) } i := 0 - full.Traverse(func(v logsplit.Valued) bool { + full.Traverse(func(v split.Valued) bool { require.Equal(t, mb(ca.result[i]), v.Value) i++ return true diff --git a/br/pkg/task/BUILD.bazel b/br/pkg/task/BUILD.bazel index 163d28fd6b0e2..77aba10ac0fcf 100644 --- a/br/pkg/task/BUILD.bazel +++ b/br/pkg/task/BUILD.bazel @@ -126,6 +126,7 @@ go_test( "//br/pkg/metautil", "//br/pkg/mock", "//br/pkg/restore/snap_client", + "//br/pkg/restore/split", "//br/pkg/restore/tiflashrec", "//br/pkg/storage", "//br/pkg/stream", diff --git a/br/pkg/task/restore_test.go b/br/pkg/task/restore_test.go 
index 4713e5a540ab7..86ceb3755ee09 100644 --- a/br/pkg/task/restore_test.go +++ b/br/pkg/task/restore_test.go @@ -19,6 +19,7 @@ import ( gluemock "github.com/pingcap/tidb/br/pkg/gluetidb/mock" "github.com/pingcap/tidb/br/pkg/metautil" "github.com/pingcap/tidb/br/pkg/mock" + "github.com/pingcap/tidb/br/pkg/restore/split" "github.com/pingcap/tidb/br/pkg/restore/tiflashrec" "github.com/pingcap/tidb/br/pkg/task" utiltest "github.com/pingcap/tidb/br/pkg/utiltest" @@ -58,7 +59,7 @@ func TestPreCheckTableTiFlashReplicas(t *testing.T) { }, } - pdClient := utiltest.NewFakePDClient(mockStores, false, nil) + pdClient := split.NewFakePDClient(mockStores, false, nil) tables := make([]*metautil.Table, 4) for i := 0; i < len(tables); i++ { diff --git a/br/pkg/utiltest/BUILD.bazel b/br/pkg/utiltest/BUILD.bazel index c8c70993f1488..97781c99c589d 100644 --- a/br/pkg/utiltest/BUILD.bazel +++ b/br/pkg/utiltest/BUILD.bazel @@ -2,23 +2,13 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library") go_library( name = "utiltest", - srcs = [ - "fake.go", - "suite.go", - ], + srcs = ["suite.go"], importpath = "github.com/pingcap/tidb/br/pkg/utiltest", visibility = ["//visibility:public"], deps = [ "//br/pkg/gluetidb/mock", "//br/pkg/mock", - "//br/pkg/pdutil", - "//br/pkg/restore/split", "//br/pkg/storage", - "@com_github_pingcap_kvproto//pkg/metapb", - "@com_github_pkg_errors//:errors", "@com_github_stretchr_testify//require", - "@com_github_tikv_pd_client//:client", - "@com_github_tikv_pd_client//http", - "@org_golang_google_grpc//keepalive", ], ) diff --git a/br/pkg/utiltest/fake.go b/br/pkg/utiltest/fake.go deleted file mode 100644 index 2d4cbaaef29ea..0000000000000 --- a/br/pkg/utiltest/fake.go +++ /dev/null @@ -1,296 +0,0 @@ -// Copyright 2024 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package utiltest - -import ( - "bytes" - "context" - "math" - "time" - - "github.com/pingcap/kvproto/pkg/metapb" - "github.com/pingcap/tidb/br/pkg/pdutil" - "github.com/pingcap/tidb/br/pkg/restore/split" - "github.com/pkg/errors" - pd "github.com/tikv/pd/client" - pdhttp "github.com/tikv/pd/client/http" - "google.golang.org/grpc/keepalive" -) - -var DefaultTestKeepaliveCfg = keepalive.ClientParameters{ - Time: 3 * time.Second, - Timeout: 10 * time.Second, -} - -var ( - ExpectPDCfgGeneratorsResult = map[string]any{ - "merge-schedule-limit": 0, - "leader-schedule-limit": float64(40), - "region-schedule-limit": float64(40), - "max-snapshot-count": float64(40), - "enable-location-replacement": "false", - "max-pending-peer-count": uint64(math.MaxInt32), - } - - ExistPDCfgGeneratorBefore = map[string]any{ - "merge-schedule-limit": 100, - "leader-schedule-limit": float64(100), - "region-schedule-limit": float64(100), - "max-snapshot-count": float64(100), - "enable-location-replacement": "true", - "max-pending-peer-count": 100, - } -) - -type FakePDHTTPClient struct { - pdhttp.Client - - expireSchedulers map[string]time.Time - cfgs map[string]any - - rules map[string]*pdhttp.Rule -} - -func NewFakePDHTTPClient() *FakePDHTTPClient { - return &FakePDHTTPClient{ - expireSchedulers: make(map[string]time.Time), - cfgs: make(map[string]any), - - rules: make(map[string]*pdhttp.Rule), - } -} - -func (fpdh *FakePDHTTPClient) GetScheduleConfig(_ context.Context) (map[string]any, error) { - return ExistPDCfgGeneratorBefore, nil -} - -func (fpdh *FakePDHTTPClient) GetSchedulers(_ context.Context) ([]string, error) { - schedulers := make([]string, 0, len(pdutil.Schedulers)) - for scheduler := range pdutil.Schedulers { - schedulers = append(schedulers, scheduler) - } - return schedulers, nil -} - -func (fpdh *FakePDHTTPClient) SetSchedulerDelay(_ context.Context, key string, delay int64) error { - expireTime, ok := fpdh.expireSchedulers[key] - if ok { - if time.Now().Compare(expireTime) > 0 { - return errors.Errorf("the scheduler config set is expired") - } - if delay == 0 { - delete(fpdh.expireSchedulers, key) - } - } - if !ok && delay == 0 { - return errors.Errorf("set the nonexistent scheduler") - } - expireTime = time.Now().Add(time.Second * time.Duration(delay)) - fpdh.expireSchedulers[key] = expireTime - return nil -} - -func (fpdh *FakePDHTTPClient) SetConfig(_ context.Context, config map[string]any, ttl ...float64) error { - for key, value := range config { - fpdh.cfgs[key] = value - } - return nil -} - -func (fpdh *FakePDHTTPClient) GetConfig(_ context.Context) (map[string]any, error) { - return fpdh.cfgs, nil -} - -func (fpdh *FakePDHTTPClient) GetDelaySchedulers() map[string]struct{} { - delaySchedulers := make(map[string]struct{}) - for key, t := range fpdh.expireSchedulers { - now := time.Now() - if now.Compare(t) < 0 { - delaySchedulers[key] = struct{}{} - } - } - return delaySchedulers -} - -func (fpdh *FakePDHTTPClient) GetPlacementRule(_ context.Context, groupID string, ruleID string) (*pdhttp.Rule, error) { - rule, ok := fpdh.rules[ruleID] - if !ok { - rule = &pdhttp.Rule{ - GroupID: groupID, - ID: ruleID, - } - fpdh.rules[ruleID] = rule - } - return rule, nil -} - -func (fpdh *FakePDHTTPClient) SetPlacementRule(_ context.Context, rule *pdhttp.Rule) error { - fpdh.rules[rule.ID] = rule - return nil -} - -func (fpdh *FakePDHTTPClient) DeletePlacementRule(_ context.Context, groupID string, ruleID string) error { - delete(fpdh.rules, ruleID) - return nil -} - -type FakePDClient struct { - 
pd.Client - stores []*metapb.Store - regions []*pd.Region - - notLeader bool - retryTimes *int - - peerStoreId uint64 -} - -func NewFakePDClient(stores []*metapb.Store, notLeader bool, retryTime *int) *FakePDClient { - var retryTimeInternal int - if retryTime == nil { - retryTime = &retryTimeInternal - } - return &FakePDClient{ - stores: stores, - - notLeader: notLeader, - retryTimes: retryTime, - - peerStoreId: 0, - } -} - -func (fpdc *FakePDClient) SetRegions(regions []*pd.Region) { - fpdc.regions = regions -} - -func (fpdc *FakePDClient) GetAllStores(context.Context, ...pd.GetStoreOption) ([]*metapb.Store, error) { - return append([]*metapb.Store{}, fpdc.stores...), nil -} - -func (fpdc *FakePDClient) ScanRegions( - ctx context.Context, - key, endKey []byte, - limit int, - opts ...pd.GetRegionOption, -) ([]*pd.Region, error) { - regions := make([]*pd.Region, 0, len(fpdc.regions)) - fpdc.peerStoreId = fpdc.peerStoreId + 1 - peerStoreId := (fpdc.peerStoreId + 1) / 2 - for _, region := range fpdc.regions { - if len(endKey) != 0 && bytes.Compare(region.Meta.StartKey, endKey) >= 0 { - continue - } - if len(region.Meta.EndKey) != 0 && bytes.Compare(region.Meta.EndKey, key) <= 0 { - continue - } - region.Meta.Peers = []*metapb.Peer{{StoreId: peerStoreId}} - regions = append(regions, region) - } - return regions, nil -} - -func (fpdc *FakePDClient) BatchScanRegions( - ctx context.Context, - ranges []pd.KeyRange, - limit int, - opts ...pd.GetRegionOption, -) ([]*pd.Region, error) { - regions := make([]*pd.Region, 0, len(fpdc.regions)) - fpdc.peerStoreId = fpdc.peerStoreId + 1 - peerStoreId := (fpdc.peerStoreId + 1) / 2 - for _, region := range fpdc.regions { - inRange := false - for _, keyRange := range ranges { - if len(keyRange.EndKey) != 0 && bytes.Compare(region.Meta.StartKey, keyRange.EndKey) >= 0 { - continue - } - if len(region.Meta.EndKey) != 0 && bytes.Compare(region.Meta.EndKey, keyRange.StartKey) <= 0 { - continue - } - inRange = true - } - if inRange { - region.Meta.Peers = []*metapb.Peer{{StoreId: peerStoreId}} - regions = append(regions, region) - } - } - return nil, nil -} - -func (fpdc *FakePDClient) GetTS(ctx context.Context) (int64, int64, error) { - (*fpdc.retryTimes)++ - if *fpdc.retryTimes >= 3 { // the mock PD leader switched successfully - fpdc.notLeader = false - } - - if fpdc.notLeader { - return 0, 0, errors.Errorf( - "rpc error: code = Unknown desc = [PD:tso:ErrGenerateTimestamp]generate timestamp failed, " + - "requested pd is not leader of cluster", - ) - } - return 1, 1, nil -} - -type FakeSplitClient struct { - split.SplitClient - regions []*split.RegionInfo -} - -func NewFakeSplitClient() *FakeSplitClient { - return &FakeSplitClient{ - regions: make([]*split.RegionInfo, 0), - } -} - -func (f *FakeSplitClient) AppendRegion(startKey, endKey []byte) { - f.regions = append(f.regions, &split.RegionInfo{ - Region: &metapb.Region{ - StartKey: startKey, - EndKey: endKey, - }, - }) -} - -func (f *FakeSplitClient) AppendPdRegion(region *pd.Region) { - f.regions = append(f.regions, &split.RegionInfo{ - Region: region.Meta, - Leader: region.Leader, - }) -} - -func (f *FakeSplitClient) ScanRegions( - ctx context.Context, - startKey, endKey []byte, - limit int, -) ([]*split.RegionInfo, error) { - result := make([]*split.RegionInfo, 0) - count := 0 - for _, rng := range f.regions { - if bytes.Compare(rng.Region.StartKey, endKey) <= 0 && bytes.Compare(rng.Region.EndKey, startKey) > 0 { - result = append(result, rng) - count++ - } - if count >= limit { - break - } - } - return 
result, nil -} - -func (f *FakeSplitClient) WaitRegionsScattered(context.Context, []*split.RegionInfo) (int, error) { - return 0, nil -} diff --git a/pkg/lightning/backend/local/BUILD.bazel b/pkg/lightning/backend/local/BUILD.bazel index 0c2f1811a34c9..80fd7691d13cc 100644 --- a/pkg/lightning/backend/local/BUILD.bazel +++ b/pkg/lightning/backend/local/BUILD.bazel @@ -129,7 +129,6 @@ go_test( "//br/pkg/mock/mocklocal", "//br/pkg/restore/split", "//br/pkg/storage", - "//br/pkg/utiltest", "//pkg/ddl", "//pkg/errno", "//pkg/keyspace", diff --git a/pkg/lightning/backend/local/local_check_test.go b/pkg/lightning/backend/local/local_check_test.go index 87dba07ef56b5..933a366f697a2 100644 --- a/pkg/lightning/backend/local/local_check_test.go +++ b/pkg/lightning/backend/local/local_check_test.go @@ -23,7 +23,7 @@ import ( "github.com/coreos/go-semver/semver" "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/metapb" - "github.com/pingcap/tidb/br/pkg/utiltest" + "github.com/pingcap/tidb/br/pkg/restore/split" "github.com/pingcap/tidb/pkg/lightning/backend" "github.com/pingcap/tidb/pkg/lightning/backend/local" "github.com/pingcap/tidb/pkg/lightning/common" @@ -100,7 +100,7 @@ func TestGetRegionSplitSizeKeys(t *testing.T) { } ctx, cancel := context.WithCancel(context.Background()) defer cancel() - cli := utiltest.NewFakePDClient(allStores, false, nil) + cli := split.NewFakePDClient(allStores, false, nil) defer func() { local.SetGetSplitConfFromStoreFunc(local.GetSplitConfFromStore) }()