diff --git a/ddl/split_region.go b/ddl/split_region.go index 2a252268f0c90..79472faad43ea 100644 --- a/ddl/split_region.go +++ b/ddl/split_region.go @@ -14,6 +14,8 @@ package ddl import ( + "context" + "github.com/pingcap/parser/model" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/tablecodec" @@ -68,19 +70,20 @@ func splitPreSplitedTable(store kv.SplitableStore, tbInfo *model.TableInfo, scat // And the max _tidb_rowid is 9223372036854775807, it won't be negative number. // Split table region. - regionIDs := make([]uint64, 0, 1<<(tbInfo.PreSplitRegions)+len(tbInfo.Indices)) step := int64(1 << (tbInfo.ShardRowIDBits - tbInfo.PreSplitRegions)) max := int64(1 << tbInfo.ShardRowIDBits) + splitTableKeys := make([][]byte, 0, 1<<(tbInfo.PreSplitRegions)) for p := int64(step); p < max; p += step { recordID := p << (64 - tbInfo.ShardRowIDBits - 1) recordPrefix := tablecodec.GenTableRecordPrefix(tbInfo.ID) key := tablecodec.EncodeRecordKey(recordPrefix, recordID) - regionID, err := store.SplitRegion(key, scatter) - if err != nil { - logutil.BgLogger().Warn("[ddl] pre split table region failed", zap.Int64("recordID", recordID), zap.Error(err)) - } else { - regionIDs = append(regionIDs, regionID) - } + splitTableKeys = append(splitTableKeys, key) + } + var err error + regionIDs, err := store.SplitRegions(context.Background(), splitTableKeys, scatter) + if err != nil { + logutil.BgLogger().Warn("[ddl] pre split some table regions failed", + zap.Stringer("table", tbInfo.Name), zap.Int("successful region count", len(regionIDs)), zap.Error(err)) } regionIDs = append(regionIDs, splitIndexRegion(store, tbInfo, scatter)...) if scatter { @@ -90,26 +93,27 @@ func splitPreSplitedTable(store kv.SplitableStore, tbInfo *model.TableInfo, scat func splitRecordRegion(store kv.SplitableStore, tableID int64, scatter bool) uint64 { tableStartKey := tablecodec.GenTablePrefix(tableID) - regionID, err := store.SplitRegion(tableStartKey, scatter) + regionIDs, err := store.SplitRegions(context.Background(), [][]byte{tableStartKey}, scatter) if err != nil { // It will be automatically split by TiKV later. 
logutil.BgLogger().Warn("[ddl] split table region failed", zap.Error(err)) } - return regionID + if len(regionIDs) == 1 { + return regionIDs[0] + } + return 0 } func splitIndexRegion(store kv.SplitableStore, tblInfo *model.TableInfo, scatter bool) []uint64 { - regionIDs := make([]uint64, 0, len(tblInfo.Indices)) + splitKeys := make([][]byte, 0, len(tblInfo.Indices)) for _, idx := range tblInfo.Indices { indexPrefix := tablecodec.EncodeTableIndexPrefix(tblInfo.ID, idx.ID) - regionID, err := store.SplitRegion(indexPrefix, scatter) - if err != nil { - logutil.BgLogger().Warn("[ddl] pre split table index region failed", - zap.Stringer("table", tblInfo.Name), - zap.Stringer("index", idx.Name), - zap.Error(err)) - } - regionIDs = append(regionIDs, regionID) + splitKeys = append(splitKeys, indexPrefix) + } + regionIDs, err := store.SplitRegions(context.Background(), splitKeys, scatter) + if err != nil { + logutil.BgLogger().Warn("[ddl] pre split some table index regions failed", + zap.Stringer("table", tblInfo.Name), zap.Int("successful region count", len(regionIDs)), zap.Error(err)) } return regionIDs } diff --git a/executor/executor_test.go b/executor/executor_test.go index aa851ff2da96c..f2eb2448d6bd9 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -2153,7 +2153,7 @@ func (s *testSuiteP1) TestBatchPointGetRepeatableRead(c *C) { } func (s *testSuite4) TestSplitRegionTimeout(c *C) { - c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/mockSplitRegionTimeout", `return(true)`), IsNil) + c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/MockSplitRegionTimeout", `return(true)`), IsNil) tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") tk.MustExec("drop table if exists t") @@ -2162,7 +2162,7 @@ func (s *testSuite4) TestSplitRegionTimeout(c *C) { tk.MustExec(`set @@tidb_wait_split_region_timeout=1`) // result 0 0 means split 0 region and 0 region finish scatter regions before timeout. tk.MustQuery(`split table t between (0) and (10000) regions 10`).Check(testkit.Rows("0 0")) - c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/mockSplitRegionTimeout"), IsNil) + c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/MockSplitRegionTimeout"), IsNil) } func (s *testSuiteP1) TestRow(c *C) { @@ -4050,7 +4050,7 @@ func (s *testSuiteP1) TestReadPartitionedTable(c *C) { func (s *testSuiteP1) TestSplitRegion(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") - tk.MustExec("drop table if exists t") + tk.MustExec("drop table if exists t, t1") tk.MustExec("create table t(a varchar(100),b int, index idx1(b,a))") tk.MustExec(`split table t index idx1 by (10000,"abcd"),(10000000);`) _, err := tk.Exec(`split table t index idx1 by ("abcd");`) @@ -4127,8 +4127,13 @@ func (s *testSuiteP1) TestSplitRegion(c *C) { c.Assert(err, NotNil) c.Assert(err.Error(), Equals, "Split table `t` region step value should more than 1000, step 10 is invalid") - // Test split region by syntax + // Test split region by syntax. tk.MustExec(`split table t by (0),(1000),(1000000)`) + + // Test split region twice to test for multiple batch split region requests. 
+ tk.MustExec("create table t1(a int, b int)") + tk.MustQuery("split table t1 between(0) and (10000) regions 10;").Check(testkit.Rows("9 1")) + tk.MustQuery("split table t1 between(10) and (10010) regions 5;").Check(testkit.Rows("4 1")) } func (s *testSuite) TestShowTableRegion(c *C) { diff --git a/executor/split.go b/executor/split.go index 90d481dba6d55..4c7fc72b9d9ce 100755 --- a/executor/split.go +++ b/executor/split.go @@ -23,7 +23,6 @@ import ( "github.com/cznic/mathutil" "github.com/pingcap/errors" - "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" @@ -94,27 +93,18 @@ func (e *SplitIndexRegionExec) splitIndexRegion(ctx context.Context) error { start := time.Now() ctxWithTimeout, cancel := context.WithTimeout(ctx, e.ctx.GetSessionVars().GetSplitRegionTimeout()) defer cancel() - regionIDs := make([]uint64, 0, len(splitIdxKeys)) - for _, idxKey := range splitIdxKeys { - if isCtxDone(ctxWithTimeout) { - break - } - - regionID, err := s.SplitRegion(idxKey, true) - if err != nil { - logutil.BgLogger().Warn("split table index region failed", - zap.String("table", e.tableInfo.Name.L), - zap.String("index", e.indexInfo.Name.L), - zap.Error(err)) - continue - } - if regionID == 0 { - continue - } - regionIDs = append(regionIDs, regionID) - + regionIDs, err := s.SplitRegions(context.Background(), splitIdxKeys, true) + if err != nil { + logutil.BgLogger().Warn("split table index region failed", + zap.String("table", e.tableInfo.Name.L), + zap.String("index", e.indexInfo.Name.L), + zap.Error(err)) } e.splitRegions = len(regionIDs) + if e.splitRegions == 0 { + return nil + } + if !e.ctx.GetSessionVars().WaitSplitRegionFinish { return nil } @@ -294,30 +284,17 @@ func (e *SplitTableRegionExec) splitTableRegion(ctx context.Context) error { if err != nil { return err } - regionIDs := make([]uint64, 0, len(splitKeys)) - for _, key := range splitKeys { - failpoint.Inject("mockSplitRegionTimeout", func(val failpoint.Value) { - if val.(bool) { - time.Sleep(time.Second*1 + time.Millisecond*10) - } - }) - if isCtxDone(ctxWithTimeout) { - break - } - regionID, err := s.SplitRegion(key, true) - if err != nil { - logutil.BgLogger().Warn("split table region failed", - zap.String("table", e.tableInfo.Name.L), - zap.Error(err)) - continue - } - if regionID == 0 { - continue - } - regionIDs = append(regionIDs, regionID) - + regionIDs, err := s.SplitRegions(ctxWithTimeout, splitKeys, true) + if err != nil { + logutil.BgLogger().Warn("split table region failed", + zap.String("table", e.tableInfo.Name.L), + zap.Error(err)) } e.splitRegions = len(regionIDs) + if e.splitRegions == 0 { + return nil + } + if !e.ctx.GetSessionVars().WaitSplitRegionFinish { return nil } diff --git a/go.mod b/go.mod index b7f913019c5f6..6860e5a241e18 100644 --- a/go.mod +++ b/go.mod @@ -39,7 +39,7 @@ require ( github.com/pingcap/errors v0.11.4 github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e - github.com/pingcap/kvproto v0.0.0-20190724165112-ec9df5f208a7 + github.com/pingcap/kvproto v0.0.0-20190821201150-798d27658fae github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596 github.com/pingcap/parser v0.0.0-20190902030720-275a827cf4e3 github.com/pingcap/pd v0.0.0-20190712044914-75a1f9f3062b diff --git a/go.sum b/go.sum index c628a3197a912..b878561e614ac 100644 --- a/go.sum +++ b/go.sum @@ -162,8 +162,8 @@ github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c/go.mod 
h1:DNS3Qg github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rGrobssy1nVy2VaVpNCuLpCbr+FEaTA8= github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= github.com/pingcap/kvproto v0.0.0-20190516013202-4cf58ad90b6c/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY= -github.com/pingcap/kvproto v0.0.0-20190724165112-ec9df5f208a7 h1:BMrtxXqQeZ9y27LN/V3PHA/tSyDWHK+90VLYaymrXQE= -github.com/pingcap/kvproto v0.0.0-20190724165112-ec9df5f208a7/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY= +github.com/pingcap/kvproto v0.0.0-20190821201150-798d27658fae h1:WR4d5ga8zXT+QDWYFzzyA+PJMMszR0kQxyYMh6dvHPg= +github.com/pingcap/kvproto v0.0.0-20190821201150-798d27658fae/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY= github.com/pingcap/log v0.0.0-20190214045112-b37da76f67a7/go.mod h1:xsfkWVaFVV5B8e1K9seWfyJWFrIhbtUTAD8NV1Pq3+w= github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596 h1:t2OQTpPJnrPDGlvA+3FwJptMTt6MEPdzK1Wt99oaefQ= github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw= diff --git a/kv/kv.go b/kv/kv.go index 24977006f9208..9af0b88baf950 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -332,7 +332,7 @@ type Iterator interface { // SplitableStore is the kv store which supports split regions. type SplitableStore interface { - SplitRegion(splitKey Key, scatter bool) (regionID uint64, err error) + SplitRegions(ctx context.Context, splitKey [][]byte, scatter bool) (regionID []uint64, err error) WaitScatterRegionFinish(regionID uint64, backOff int) error CheckRegionInScattering(regionID uint64) (bool, error) } diff --git a/store/mockstore/mocktikv/rpc.go b/store/mockstore/mocktikv/rpc.go old mode 100644 new mode 100755 index 8dd8109a523e2..fb798baffe395 --- a/store/mockstore/mocktikv/rpc.go +++ b/store/mockstore/mocktikv/rpc.go @@ -604,16 +604,25 @@ func (h *rpcHandler) handleKvRawScan(req *kvrpcpb.RawScanRequest) *kvrpcpb.RawSc } func (h *rpcHandler) handleSplitRegion(req *kvrpcpb.SplitRegionRequest) *kvrpcpb.SplitRegionResponse { - key := NewMvccKey(req.GetSplitKey()) - region, _ := h.cluster.GetRegionByKey(key) - if bytes.Equal(region.GetStartKey(), key) { - return &kvrpcpb.SplitRegionResponse{} - } - newRegionID, newPeerIDs := h.cluster.AllocID(), h.cluster.AllocIDs(len(region.Peers)) - newRegion := h.cluster.SplitRaw(region.GetId(), newRegionID, key, newPeerIDs, newPeerIDs[0]) - // The mocktikv should return a deep copy of meta info to avoid data race - metaCloned := proto.Clone(newRegion.Meta) - return &kvrpcpb.SplitRegionResponse{Left: metaCloned.(*metapb.Region)} + keys := req.GetSplitKeys() + resp := &kvrpcpb.SplitRegionResponse{Regions: make([]*metapb.Region, 0, len(keys)+1)} + for i, key := range keys { + k := NewMvccKey(key) + region, _ := h.cluster.GetRegionByKey(k) + if bytes.Equal(region.GetStartKey(), key) { + continue + } + if i == 0 { + // Set the leftmost region. + resp.Regions = append(resp.Regions, region) + } + newRegionID, newPeerIDs := h.cluster.AllocID(), h.cluster.AllocIDs(len(region.Peers)) + newRegion := h.cluster.SplitRaw(region.GetId(), newRegionID, k, newPeerIDs, newPeerIDs[0]) + // The mocktikv should return a deep copy of meta info to avoid data race + metaCloned := proto.Clone(newRegion.Meta) + resp.Regions = append(resp.Regions, metaCloned.(*metapb.Region)) + } + return resp } // RPCClient sends kv RPC calls to mock cluster. 
RPCClient mocks the behavior of diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index 05ba781a50ba8..4f3715ee11014 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -331,7 +331,7 @@ func (c *twoPhaseCommitter) doActionOnKeys(bo *Backoffer, action twoPhaseCommitA if len(keys) == 0 { return nil } - groups, firstRegion, err := c.store.regionCache.GroupKeysByRegion(bo, keys) + groups, firstRegion, err := c.store.regionCache.GroupKeysByRegion(bo, keys, nil) if err != nil { return errors.Trace(err) } diff --git a/store/tikv/backoff.go b/store/tikv/backoff.go index 4f3c88e8f6506..1eae476da5bad 100644 --- a/store/tikv/backoff.go +++ b/store/tikv/backoff.go @@ -219,6 +219,7 @@ const ( deleteRangeOneRegionMaxBackoff = 100000 rawkvMaxBackoff = 20000 splitRegionBackoff = 20000 + maxSplitRegionsBackoff = 120000 scatterRegionBackoff = 20000 waitScatterRegionFinishBackoff = 120000 locateRegionMaxBackoff = 20000 diff --git a/store/tikv/rawkv.go b/store/tikv/rawkv.go index c510dddba9d9f..c85489d7a3ea8 100644 --- a/store/tikv/rawkv.go +++ b/store/tikv/rawkv.go @@ -395,7 +395,7 @@ func (c *RawKVClient) sendReq(key []byte, req *tikvrpc.Request, reverse bool) (* } func (c *RawKVClient) sendBatchReq(bo *Backoffer, keys [][]byte, cmdType tikvrpc.CmdType) (*tikvrpc.Response, error) { // split the keys - groups, _, err := c.regionCache.GroupKeysByRegion(bo, keys) + groups, _, err := c.regionCache.GroupKeysByRegion(bo, keys, nil) if err != nil { return nil, errors.Trace(err) } @@ -544,7 +544,7 @@ func (c *RawKVClient) sendBatchPut(bo *Backoffer, keys, values [][]byte) error { for i, key := range keys { keyToValue[string(key)] = values[i] } - groups, _, err := c.regionCache.GroupKeysByRegion(bo, keys) + groups, _, err := c.regionCache.GroupKeysByRegion(bo, keys, nil) if err != nil { return errors.Trace(err) } diff --git a/store/tikv/region_cache.go b/store/tikv/region_cache.go index b6e312bd76377..3f1e60276d734 100644 --- a/store/tikv/region_cache.go +++ b/store/tikv/region_cache.go @@ -473,7 +473,8 @@ func (c *RegionCache) LocateRegionByID(bo *Backoffer, regionID uint64) (*KeyLoca // GroupKeysByRegion separates keys into groups by their belonging Regions. // Specially it also returns the first key's region which may be used as the // 'PrimaryLockKey' and should be committed ahead of others. -func (c *RegionCache) GroupKeysByRegion(bo *Backoffer, keys [][]byte) (map[RegionVerID][][]byte, RegionVerID, error) { +// filter is used to filter some unwanted keys. 
+func (c *RegionCache) GroupKeysByRegion(bo *Backoffer, keys [][]byte, filter func(key, regionStartKey []byte) bool) (map[RegionVerID][][]byte, RegionVerID, error) { groups := make(map[RegionVerID][][]byte) var first RegionVerID var lastLoc *KeyLocation @@ -484,6 +485,9 @@ func (c *RegionCache) GroupKeysByRegion(bo *Backoffer, keys [][]byte) (map[Regio if err != nil { return nil, first, errors.Trace(err) } + if filter != nil && filter(k, lastLoc.StartKey) { + continue + } } id := lastLoc.Region if i == 0 { diff --git a/store/tikv/scan_test.go b/store/tikv/scan_test.go index 0deeb0751ef48..fb99bffd33454 100644 --- a/store/tikv/scan_test.go +++ b/store/tikv/scan_test.go @@ -91,12 +91,12 @@ func (s *testScanSuite) TestScan(c *C) { c.Assert(err, IsNil) if rowNum > 123 { - _, err = s.store.SplitRegion(encodeKey(s.prefix, s08d("key", 123)), false) + _, err = s.store.SplitRegions(context.Background(), [][]byte{encodeKey(s.prefix, s08d("key", 123))}, false) c.Assert(err, IsNil) } if rowNum > 456 { - _, err = s.store.SplitRegion(encodeKey(s.prefix, s08d("key", 456)), false) + _, err = s.store.SplitRegions(context.Background(), [][]byte{encodeKey(s.prefix, s08d("key", 456))}, false) c.Assert(err, IsNil) } diff --git a/store/tikv/snapshot.go b/store/tikv/snapshot.go index 875b45ae45212..e13fb0674d7cb 100644 --- a/store/tikv/snapshot.go +++ b/store/tikv/snapshot.go @@ -114,7 +114,7 @@ func (s *tikvSnapshot) BatchGet(ctx context.Context, keys []kv.Key) (map[string] } func (s *tikvSnapshot) batchGetKeysByRegions(bo *Backoffer, keys [][]byte, collectF func(k, v []byte)) error { - groups, _, err := s.store.regionCache.GroupKeysByRegion(bo, keys) + groups, _, err := s.store.regionCache.GroupKeysByRegion(bo, keys, nil) if err != nil { return errors.Trace(err) } diff --git a/store/tikv/split_region.go b/store/tikv/split_region.go index 055eda0516962..bf1ac13d9ad26 100644 --- a/store/tikv/split_region.go +++ b/store/tikv/split_region.go @@ -16,70 +16,187 @@ package tikv import ( "bytes" "context" + "math" + "time" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/kvrpcpb" + "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/tikv/tikvrpc" + "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/logutil" "go.uber.org/zap" ) -// SplitRegion splits the region contains splitKey into 2 regions: [start, -// splitKey) and [splitKey, end). -func (s *tikvStore) SplitRegion(splitKey kv.Key, scatter bool) (regionID uint64, err error) { - logutil.BgLogger().Info("start split region", - zap.Stringer("at", splitKey)) - bo := NewBackoffer(context.Background(), splitRegionBackoff) - sender := NewRegionRequestSender(s.regionCache, s.client) +func equalRegionStartKey(key, regionStartKey []byte) bool { + if bytes.Equal(key, regionStartKey) { + return true + } + return false +} + +func (s *tikvStore) splitBatchRegionsReq(bo *Backoffer, keys [][]byte, scatter bool) (*tikvrpc.Response, error) { + // equalRegionStartKey is used to filter split keys. + // If the split key is equal to the start key of the region, then the key has been split, we need to skip the split key. 
+ groups, _, err := s.regionCache.GroupKeysByRegion(bo, keys, equalRegionStartKey) + if err != nil { + return nil, errors.Trace(err) + } + + var batches []batch + for regionID, groupKeys := range groups { + batches = appendKeyBatches(batches, regionID, groupKeys, rawBatchPairCount) + } + + if len(batches) == 0 { + return nil, nil + } + // The first time it enters this function. + if bo.totalSleep == 0 { + logutil.BgLogger().Info("split batch regions request", + zap.Int("split key count", len(keys)), + zap.Int("batch count", len(batches)), + zap.Uint64("first batch, region ID", batches[0].regionID.id), + zap.Stringer("first split key", kv.Key(batches[0].keys[0]))) + } + if len(batches) == 1 { + resp := s.batchSendSingleRegion(bo, batches[0], scatter) + return resp.resp, errors.Trace(resp.err) + } + ch := make(chan singleBatchResp, len(batches)) + for _, batch1 := range batches { + batch := batch1 + go func() { + backoffer, cancel := bo.Fork() + defer cancel() + + util.WithRecovery(func() { + select { + case ch <- s.batchSendSingleRegion(backoffer, batch, scatter): + case <-bo.ctx.Done(): + ch <- singleBatchResp{err: bo.ctx.Err()} + } + }, func(r interface{}) { + if r != nil { + ch <- singleBatchResp{err: errors.Errorf("%v", r)} + } + }) + }() + } + + srResp := &kvrpcpb.SplitRegionResponse{Regions: make([]*metapb.Region, 0, len(keys)*2)} + for i := 0; i < len(batches); i++ { + batchResp := <-ch + if batchResp.err != nil { + logutil.BgLogger().Debug("tikv store batch send failed", + zap.Error(batchResp.err)) + if err == nil { + err = batchResp.err + } + continue + } + + spResp := batchResp.resp.Resp.(*kvrpcpb.SplitRegionResponse) + regions := spResp.GetRegions() + srResp.Regions = append(srResp.Regions, regions...) + } + return &tikvrpc.Response{Resp: srResp}, errors.Trace(err) +} + +func (s *tikvStore) batchSendSingleRegion(bo *Backoffer, batch batch, scatter bool) singleBatchResp { + failpoint.Inject("MockSplitRegionTimeout", func(val failpoint.Value) { + if val.(bool) { + time.Sleep(time.Second*1 + time.Millisecond*10) + } + }) + req := tikvrpc.NewRequest(tikvrpc.CmdSplitRegion, &kvrpcpb.SplitRegionRequest{ - SplitKey: splitKey, + SplitKeys: batch.keys, }, kvrpcpb.Context{ Priority: kvrpcpb.CommandPri_Normal, }) - for { - loc, err := s.regionCache.LocateKey(bo, splitKey) - if err != nil { - return 0, errors.Trace(err) - } - if bytes.Equal(splitKey, loc.StartKey) { - logutil.BgLogger().Info("skip split region", - zap.Stringer("at", splitKey)) - return 0, nil - } - res, err := sender.SendReq(bo, req, loc.Region, readTimeoutShort) + + sender := NewRegionRequestSender(s.regionCache, s.client) + resp, err := sender.SendReq(bo, req, batch.regionID, readTimeoutShort) + + batchResp := singleBatchResp{resp: resp} + if err != nil { + batchResp.err = errors.Trace(err) + return batchResp + } + regionErr, err := resp.GetRegionError() + if err != nil { + batchResp.err = errors.Trace(err) + return batchResp + } + if regionErr != nil { + err := bo.Backoff(BoRegionMiss, errors.New(regionErr.String())) if err != nil { - return 0, errors.Trace(err) + batchResp.err = errors.Trace(err) + return batchResp } - regionErr, err := res.GetRegionError() - if err != nil { - return 0, errors.Trace(err) + resp, err = s.splitBatchRegionsReq(bo, batch.keys, scatter) + batchResp.resp = resp + batchResp.err = err + return batchResp + } + + spResp := resp.Resp.(*kvrpcpb.SplitRegionResponse) + regions := spResp.GetRegions() + if len(regions) > 0 { + // Divide a region into n, one of them may not need to be scattered, + // so 
n-1 needs to be scattered to other stores. + spResp.Regions = regions[:len(regions)-1] + } + if !scatter { + if len(spResp.Regions) == 0 { + return batchResp } - if regionErr != nil { - err := bo.Backoff(BoRegionMiss, errors.New(regionErr.String())) - if err != nil { - return 0, errors.Trace(err) - } + logutil.BgLogger().Info("batch split regions complete", + zap.Uint64("batch region ID", batch.regionID.id), + zap.Stringer("first at", kv.Key(batch.keys[0])), + zap.Stringer("first new region left", logutil.Hex(spResp.Regions[0])), + zap.Int("new region count", len(spResp.Regions))) + return batchResp + } + + for i, r := range spResp.Regions { + if err = s.scatterRegion(r.Id); err == nil { + logutil.BgLogger().Info("batch split regions, scatter a region complete", + zap.Uint64("batch region ID", batch.regionID.id), + zap.Stringer("at", kv.Key(batch.keys[i])), + zap.Stringer("new region left", logutil.Hex(r))) continue } - splitRegion := res.Resp.(*kvrpcpb.SplitRegionResponse) - logutil.BgLogger().Info("split region complete", - zap.Stringer("at", splitKey), - zap.Stringer("new region left", logutil.Hex(splitRegion.GetLeft())), - zap.Stringer("new region right", logutil.Hex(splitRegion.GetRight()))) - left := splitRegion.GetLeft() - if left == nil { - return 0, nil + + logutil.BgLogger().Info("batch split regions, scatter a region failed", + zap.Uint64("batch region ID", batch.regionID.id), + zap.Stringer("at", kv.Key(batch.keys[i])), + zap.Stringer("new region left", logutil.Hex(r)), + zap.Error(err)) + if batchResp.err == nil { + batchResp.err = err } - if scatter { - err = s.scatterRegion(left.Id) - if err != nil { - return 0, errors.Trace(err) - } + } + return batchResp +} + +// SplitRegions splits regions by splitKeys. +func (s *tikvStore) SplitRegions(ctx context.Context, splitKeys [][]byte, scatter bool) (regionIDs []uint64, err error) { + bo := NewBackoffer(ctx, int(math.Min(float64(len(splitKeys))*splitRegionBackoff, maxSplitRegionsBackoff))) + resp, err := s.splitBatchRegionsReq(bo, splitKeys, scatter) + regionIDs = make([]uint64, 0, len(splitKeys)) + if resp != nil && resp.Resp != nil { + spResp := resp.Resp.(*kvrpcpb.SplitRegionResponse) + for _, r := range spResp.Regions { + regionIDs = append(regionIDs, r.Id) } - return left.Id, nil + logutil.BgLogger().Info("split regions complete", zap.Uint64s("region IDs", regionIDs)) } + return regionIDs, errors.Trace(err) } func (s *tikvStore) scatterRegion(regionID uint64) error { @@ -88,14 +205,13 @@ func (s *tikvStore) scatterRegion(regionID uint64) error { bo := NewBackoffer(context.Background(), scatterRegionBackoff) for { err := s.pdClient.ScatterRegion(context.Background(), regionID) + if err == nil { + break + } + err = bo.Backoff(BoRegionMiss, errors.New(err.Error())) if err != nil { - err = bo.Backoff(BoRegionMiss, errors.New(err.Error())) - if err != nil { - return errors.Trace(err) - } - continue + return errors.Trace(err) } - break } logutil.BgLogger().Info("scatter region complete", zap.Uint64("regionID", regionID)) diff --git a/store/tikv/split_test.go b/store/tikv/split_test.go index fe5caa718891e..1f70994501a4a 100644 --- a/store/tikv/split_test.go +++ b/store/tikv/split_test.go @@ -61,7 +61,7 @@ func (s *testSplitSuite) TestSplitBatchGet(c *C) { snapshot := newTiKVSnapshot(s.store, kv.Version{Ver: txn.StartTS()}, 0) keys := [][]byte{{'a'}, {'b'}, {'c'}} - _, region, err := s.store.regionCache.GroupKeysByRegion(s.bo, keys) + _, region, err := s.store.regionCache.GroupKeysByRegion(s.bo, keys, nil) c.Assert(err, IsNil) batch 
:= batchKeys{ region: region,
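
Note for readers of this patch: the core interface change is in kv/kv.go, where the single-key SplitRegion method on kv.SplitableStore is replaced by a batched SplitRegions method. Below is a minimal sketch of how calling code uses the new signature; splitAndLog, its parameters, and its log message are illustrative assumptions for this note, not code from the patch itself.

package example

import (
	"context"

	"github.com/pingcap/tidb/kv"
	"github.com/pingcap/tidb/util/logutil"
	"go.uber.org/zap"
)

// splitAndLog is a hypothetical caller, shown only to illustrate the new
// batched interface introduced by this patch; it is not part of the change.
func splitAndLog(store kv.SplitableStore, splitKeys [][]byte, scatter bool) []uint64 {
	// Before this patch the caller issued one request per key:
	//   regionID, err := store.SplitRegion(key, scatter)
	// Now all keys are passed at once; the store groups them by region and
	// sends batched SplitRegion requests. The returned slice holds the IDs of
	// the regions that were split successfully, and err reports the first
	// failure encountered, so a partial success is still usable.
	regionIDs, err := store.SplitRegions(context.Background(), splitKeys, scatter)
	if err != nil {
		logutil.BgLogger().Warn("some split keys failed",
			zap.Int("successful region count", len(regionIDs)), zap.Error(err))
	}
	return regionIDs
}

The design choice, as visible in store/tikv/split_region.go above, is to push the batching into tikvStore: keys are grouped by region via GroupKeysByRegion (using the equalRegionStartKey filter to drop keys that already start a region), each batch is sent concurrently, and the total backoff is bounded by min(len(splitKeys)*splitRegionBackoff, maxSplitRegionsBackoff). This is also why the per-key loop and the split-region timeout failpoint move out of the executor and into store/tikv.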