Skip to content

Commit

Permalink
*: support a region divided into multiple regions (pingcap#11739)
Browse files Browse the repository at this point in the history
  • Loading branch information
zimulala committed Sep 24, 2019
1 parent 790b5d5 commit 7fa8297
Show file tree
Hide file tree
Showing 16 changed files with 252 additions and 139 deletions.
39 changes: 20 additions & 19 deletions ddl/split_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,20 +70,20 @@ func splitPreSplitedTable(store kv.SplitableStore, tbInfo *model.TableInfo, scat
// And the max _tidb_rowid is 9223372036854775807, it won't be negative number.

// Split table region.
regionIDs := make([]uint64, 0, 1<<(tbInfo.PreSplitRegions)+len(tbInfo.Indices))
step := int64(1 << (tbInfo.ShardRowIDBits - tbInfo.PreSplitRegions))
max := int64(1 << tbInfo.ShardRowIDBits)
splitTableKeys := make([][]byte, 0, 1<<(tbInfo.PreSplitRegions))
for p := int64(step); p < max; p += step {
recordID := p << (64 - tbInfo.ShardRowIDBits - 1)
recordPrefix := tablecodec.GenTableRecordPrefix(tbInfo.ID)
key := tablecodec.EncodeRecordKey(recordPrefix, recordID)
regionID, err := store.SplitRegion(key, scatter)
if err != nil {
logutil.Logger(context.Background()).Warn("[ddl] pre split table region failed", zap.Int64("recordID", recordID),
zap.Error(err))
} else {
regionIDs = append(regionIDs, regionID)
}
splitTableKeys = append(splitTableKeys, key)
}
var err error
regionIDs, err := store.SplitRegions(context.Background(), splitTableKeys, scatter)
if err != nil {
logutil.Logger(context.Background()).Warn("[ddl] pre split table region failed",
zap.Stringer("table", tbInfo.Name), zap.Int("successful region count", len(regionIDs)), zap.Error(err))
}
regionIDs = append(regionIDs, splitIndexRegion(store, tbInfo, scatter)...)
if scatter {
Expand All @@ -93,26 +93,27 @@ func splitPreSplitedTable(store kv.SplitableStore, tbInfo *model.TableInfo, scat

func splitRecordRegion(store kv.SplitableStore, tableID int64, scatter bool) uint64 {
tableStartKey := tablecodec.GenTablePrefix(tableID)
regionID, err := store.SplitRegion(tableStartKey, scatter)
regionIDs, err := store.SplitRegions(context.Background(), [][]byte{tableStartKey}, scatter)
if err != nil {
// It will be automatically split by TiKV later.
logutil.Logger(context.Background()).Warn("[ddl] split table region failed", zap.Error(err))
}
return regionID
if len(regionIDs) == 1 {
return regionIDs[0]
}
return 0
}

func splitIndexRegion(store kv.SplitableStore, tblInfo *model.TableInfo, scatter bool) []uint64 {
regionIDs := make([]uint64, 0, len(tblInfo.Indices))
splitKeys := make([][]byte, 0, len(tblInfo.Indices))
for _, idx := range tblInfo.Indices {
indexPrefix := tablecodec.EncodeTableIndexPrefix(tblInfo.ID, idx.ID)
regionID, err := store.SplitRegion(indexPrefix, scatter)
if err != nil {
logutil.Logger(context.Background()).Warn("[ddl] pre split table index region failed",
zap.Stringer("table", tblInfo.Name),
zap.Stringer("index", idx.Name),
zap.Error(err))
}
regionIDs = append(regionIDs, regionID)
splitKeys = append(splitKeys, indexPrefix)
}
regionIDs, err := store.SplitRegions(context.Background(), splitKeys, scatter)
if err != nil {
logutil.Logger(context.Background()).Warn("[ddl] pre split table index region failed",
zap.Stringer("table", tblInfo.Name), zap.Int("successful region count", len(regionIDs)), zap.Error(err))
}
return regionIDs
}
Expand Down
13 changes: 9 additions & 4 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1963,7 +1963,7 @@ func (s *testSuite) TestPointGetRepeatableRead(c *C) {
}

func (s *testSuite) TestSplitRegionTimeout(c *C) {
c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/mockSplitRegionTimeout", `return(true)`), IsNil)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/MockSplitRegionTimeout", `return(true)`), IsNil)
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
Expand All @@ -1972,7 +1972,7 @@ func (s *testSuite) TestSplitRegionTimeout(c *C) {
tk.MustExec(`set @@tidb_wait_split_region_timeout=1`)
// result 0 0 means split 0 region and 0 region finish scatter regions before timeout.
tk.MustQuery(`split table t between (0) and (10000) regions 10`).Check(testkit.Rows("0 0"))
c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/mockSplitRegionTimeout"), IsNil)
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/MockSplitRegionTimeout"), IsNil)
}

func (s *testSuite) TestRow(c *C) {
Expand Down Expand Up @@ -3849,7 +3849,7 @@ func (s *testSuite) TestReadPartitionedTable(c *C) {
func (s *testSuite) TestSplitRegion(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("drop table if exists t, t1")
tk.MustExec("create table t(a varchar(100),b int, index idx1(b,a))")
tk.MustExec(`split table t index idx1 by (10000,"abcd"),(10000000);`)
_, err := tk.Exec(`split table t index idx1 by ("abcd");`)
Expand Down Expand Up @@ -3926,8 +3926,13 @@ func (s *testSuite) TestSplitRegion(c *C) {
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "Split table `t` region step value should more than 1000, step 10 is invalid")

// Test split region by syntax
// Test split region by syntax.
tk.MustExec(`split table t by (0),(1000),(1000000)`)

// Test split region twice to test for multiple batch split region requests.
tk.MustExec("create table t1(a int, b int)")
tk.MustQuery("split table t1 between(0) and (10000) regions 10;").Check(testkit.Rows("9 1"))
tk.MustQuery("split table t1 between(10) and (10010) regions 5;").Check(testkit.Rows("4 1"))
}

func (s *testSuite) TestShowTableRegion(c *C) {
Expand Down
61 changes: 19 additions & 42 deletions executor/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

"github.com/cznic/mathutil"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
Expand Down Expand Up @@ -94,27 +93,18 @@ func (e *SplitIndexRegionExec) splitIndexRegion(ctx context.Context) error {
start := time.Now()
ctxWithTimeout, cancel := context.WithTimeout(ctx, e.ctx.GetSessionVars().GetSplitRegionTimeout())
defer cancel()
regionIDs := make([]uint64, 0, len(splitIdxKeys))
for _, idxKey := range splitIdxKeys {
if isCtxDone(ctxWithTimeout) {
break
}

regionID, err := s.SplitRegion(idxKey, true)
if err != nil {
logutil.Logger(context.Background()).Warn("split table index region failed",
zap.String("table", e.tableInfo.Name.L),
zap.String("index", e.indexInfo.Name.L),
zap.Error(err))
continue
}
if regionID == 0 {
continue
}
regionIDs = append(regionIDs, regionID)

regionIDs, err := s.SplitRegions(context.Background(), splitIdxKeys, true)
if err != nil {
logutil.Logger(context.Background()).Warn("split table index region failed",
zap.String("table", e.tableInfo.Name.L),
zap.String("index", e.indexInfo.Name.L),
zap.Error(err))
}
e.splitRegions = len(regionIDs)
if e.splitRegions == 0 {
return nil
}

if !e.ctx.GetSessionVars().WaitSplitRegionFinish {
return nil
}
Expand Down Expand Up @@ -294,30 +284,17 @@ func (e *SplitTableRegionExec) splitTableRegion(ctx context.Context) error {
if err != nil {
return err
}
regionIDs := make([]uint64, 0, len(splitKeys))
for _, key := range splitKeys {
failpoint.Inject("mockSplitRegionTimeout", func(val failpoint.Value) {
if val.(bool) {
time.Sleep(time.Second*1 + time.Millisecond*10)
}
})
if isCtxDone(ctxWithTimeout) {
break
}
regionID, err := s.SplitRegion(key, true)
if err != nil {
logutil.Logger(context.Background()).Warn("split table region failed",
zap.String("table", e.tableInfo.Name.L),
zap.Error(err))
continue
}
if regionID == 0 {
continue
}
regionIDs = append(regionIDs, regionID)

regionIDs, err := s.SplitRegions(ctxWithTimeout, splitKeys, true)
if err != nil {
logutil.Logger(context.Background()).Warn("split table region failed",
zap.String("table", e.tableInfo.Name.L),
zap.Error(err))
}
e.splitRegions = len(regionIDs)
if e.splitRegions == 0 {
return nil
}

if !e.ctx.GetSessionVars().WaitSplitRegionFinish {
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ require (
github.com/pingcap/errors v0.11.4
github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e
github.com/pingcap/kvproto v0.0.0-20190703131923-d9830856b531
github.com/pingcap/kvproto v0.0.0-20190918085321-44e3817e1f18
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596
github.com/pingcap/parser v0.0.0-20190910041007-2a177b291004
github.com/pingcap/pd v0.0.0-20190711034019-ee98bf9063e9
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,8 @@ github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c/go.mod h1:DNS3Qg
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rGrobssy1nVy2VaVpNCuLpCbr+FEaTA8=
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20190516013202-4cf58ad90b6c/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20190703131923-d9830856b531 h1:8xk2HobDwClB5E3Hv9TEPiS7K7bv3ykWHLyZzuUYywI=
github.com/pingcap/kvproto v0.0.0-20190703131923-d9830856b531/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20190918085321-44e3817e1f18 h1:5vQV8S/8B9nE+I+0Me6vZGyASeXl/QymwqtaOL5e5ZA=
github.com/pingcap/kvproto v0.0.0-20190918085321-44e3817e1f18/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/log v0.0.0-20190214045112-b37da76f67a7/go.mod h1:xsfkWVaFVV5B8e1K9seWfyJWFrIhbtUTAD8NV1Pq3+w=
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596 h1:t2OQTpPJnrPDGlvA+3FwJptMTt6MEPdzK1Wt99oaefQ=
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw=
Expand Down
2 changes: 1 addition & 1 deletion kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ type Iterator interface {

// SplitableStore is the kv store which supports split regions.
type SplitableStore interface {
SplitRegion(splitKey Key, scatter bool) (regionID uint64, err error)
SplitRegions(ctx context.Context, splitKey [][]byte, scatter bool) (regionID []uint64, err error)
WaitScatterRegionFinish(regionID uint64, backOff int) error
CheckRegionInScattering(regionID uint64) (bool, error)
}
27 changes: 19 additions & 8 deletions store/mockstore/mocktikv/rpc.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -592,14 +592,25 @@ func (h *rpcHandler) handleKvRawScan(req *kvrpcpb.RawScanRequest) *kvrpcpb.RawSc
}

func (h *rpcHandler) handleSplitRegion(req *kvrpcpb.SplitRegionRequest) *kvrpcpb.SplitRegionResponse {
key := NewMvccKey(req.GetSplitKey())
region, _ := h.cluster.GetRegionByKey(key)
if bytes.Equal(region.GetStartKey(), key) {
return &kvrpcpb.SplitRegionResponse{}
}
newRegionID, newPeerIDs := h.cluster.AllocID(), h.cluster.AllocIDs(len(region.Peers))
newRegion := h.cluster.SplitRaw(region.GetId(), newRegionID, key, newPeerIDs, newPeerIDs[0])
return &kvrpcpb.SplitRegionResponse{Left: newRegion.Meta}
keys := req.GetSplitKeys()
resp := &kvrpcpb.SplitRegionResponse{Regions: make([]*metapb.Region, 0, len(keys)+1)}
for i, key := range keys {
k := NewMvccKey(key)
region, _ := h.cluster.GetRegionByKey(k)
if bytes.Equal(region.GetStartKey(), key) {
continue
}
if i == 0 {
// Set the leftmost region.
resp.Regions = append(resp.Regions, region)
}
newRegionID, newPeerIDs := h.cluster.AllocID(), h.cluster.AllocIDs(len(region.Peers))
newRegion := h.cluster.SplitRaw(region.GetId(), newRegionID, k, newPeerIDs, newPeerIDs[0])
// The mocktikv should return a deep copy of meta info to avoid data race
metaCloned := proto.Clone(newRegion.Meta)
resp.Regions = append(resp.Regions, metaCloned.(*metapb.Region))
}
return resp
}

// RPCClient sends kv RPC calls to mock cluster. RPCClient mocks the behavior of
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ func (c *twoPhaseCommitter) doActionOnKeys(bo *Backoffer, action twoPhaseCommitA
if len(keys) == 0 {
return nil
}
groups, firstRegion, err := c.store.regionCache.GroupKeysByRegion(bo, keys)
groups, firstRegion, err := c.store.regionCache.GroupKeysByRegion(bo, keys, nil)
if err != nil {
return errors.Trace(err)
}
Expand Down
1 change: 1 addition & 0 deletions store/tikv/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ const (
deleteRangeOneRegionMaxBackoff = 100000
rawkvMaxBackoff = 20000
splitRegionBackoff = 20000
maxSplitRegionsBackoff = 120000
scatterRegionBackoff = 20000
waitScatterRegionFinishBackoff = 120000
locateRegionMaxBackoff = 20000
Expand Down
4 changes: 2 additions & 2 deletions store/tikv/rawkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ func (c *RawKVClient) sendReq(key []byte, req *tikvrpc.Request, reverse bool) (*
}

func (c *RawKVClient) sendBatchReq(bo *Backoffer, keys [][]byte, cmdType tikvrpc.CmdType) (*tikvrpc.Response, error) { // split the keys
groups, _, err := c.regionCache.GroupKeysByRegion(bo, keys)
groups, _, err := c.regionCache.GroupKeysByRegion(bo, keys, nil)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -570,7 +570,7 @@ func (c *RawKVClient) sendBatchPut(bo *Backoffer, keys, values [][]byte) error {
for i, key := range keys {
keyToValue[string(key)] = values[i]
}
groups, _, err := c.regionCache.GroupKeysByRegion(bo, keys)
groups, _, err := c.regionCache.GroupKeysByRegion(bo, keys, nil)
if err != nil {
return errors.Trace(err)
}
Expand Down
6 changes: 5 additions & 1 deletion store/tikv/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,8 @@ func (c *RegionCache) LocateRegionByID(bo *Backoffer, regionID uint64) (*KeyLoca
// GroupKeysByRegion separates keys into groups by their belonging Regions.
// Specially it also returns the first key's region which may be used as the
// 'PrimaryLockKey' and should be committed ahead of others.
func (c *RegionCache) GroupKeysByRegion(bo *Backoffer, keys [][]byte) (map[RegionVerID][][]byte, RegionVerID, error) {
// filter is used to filter some unwanted keys.
func (c *RegionCache) GroupKeysByRegion(bo *Backoffer, keys [][]byte, filter func(key, regionStartKey []byte) bool) (map[RegionVerID][][]byte, RegionVerID, error) {
groups := make(map[RegionVerID][][]byte)
var first RegionVerID
var lastLoc *KeyLocation
Expand All @@ -451,6 +452,9 @@ func (c *RegionCache) GroupKeysByRegion(bo *Backoffer, keys [][]byte) (map[Regio
if err != nil {
return nil, first, errors.Trace(err)
}
if filter != nil && filter(k, lastLoc.StartKey) {
continue
}
}
id := lastLoc.Region
if i == 0 {
Expand Down
6 changes: 3 additions & 3 deletions store/tikv/region_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,18 +310,18 @@ func (s *mockTikvGrpcServer) MvccGetByStartTs(context.Context, *kvrpcpb.MvccGetB
func (s *mockTikvGrpcServer) SplitRegion(context.Context, *kvrpcpb.SplitRegionRequest) (*kvrpcpb.SplitRegionResponse, error) {
return nil, errors.New("unreachable")
}

func (s *mockTikvGrpcServer) CoprocessorStream(*coprocessor.Request, tikvpb.Tikv_CoprocessorStreamServer) error {
return errors.New("unreachable")
}

func (s *mockTikvGrpcServer) BatchCommands(tikvpb.Tikv_BatchCommandsServer) error {
return errors.New("unreachable")
}

func (s *mockTikvGrpcServer) ReadIndex(context.Context, *kvrpcpb.ReadIndexRequest) (*kvrpcpb.ReadIndexResponse, error) {
return nil, errors.New("unreachable")
}
func (s *mockTikvGrpcServer) KvTxnHeartBeat(ctx context.Context, in *kvrpcpb.TxnHeartBeatRequest) (*kvrpcpb.TxnHeartBeatResponse, error) {
return nil, errors.New("unreachable")
}

func (s *testRegionRequestSuite) TestNoReloadRegionForGrpcWhenCtxCanceled(c *C) {
// prepare a mock tikv grpc server
Expand Down
4 changes: 2 additions & 2 deletions store/tikv/scan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,12 @@ func (s *testScanSuite) TestScan(c *C) {
c.Assert(err, IsNil)

if rowNum > 123 {
_, err = s.store.SplitRegion(encodeKey(s.prefix, s08d("key", 123)), false)
_, err = s.store.SplitRegions(context.Background(), [][]byte{encodeKey(s.prefix, s08d("key", 123))}, false)
c.Assert(err, IsNil)
}

if rowNum > 456 {
_, err = s.store.SplitRegion(encodeKey(s.prefix, s08d("key", 456)), false)
_, err = s.store.SplitRegions(context.Background(), [][]byte{encodeKey(s.prefix, s08d("key", 456))}, false)
c.Assert(err, IsNil)
}

Expand Down
2 changes: 1 addition & 1 deletion store/tikv/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (s *tikvSnapshot) BatchGet(keys []kv.Key) (map[string][]byte, error) {
}

func (s *tikvSnapshot) batchGetKeysByRegions(bo *Backoffer, keys [][]byte, collectF func(k, v []byte)) error {
groups, _, err := s.store.regionCache.GroupKeysByRegion(bo, keys)
groups, _, err := s.store.regionCache.GroupKeysByRegion(bo, keys, nil)
if err != nil {
return errors.Trace(err)
}
Expand Down
Loading

0 comments on commit 7fa8297

Please sign in to comment.