Skip to content

Commit

Permalink
*: support a region divided into multiple regions and fix scatter reg…
Browse files Browse the repository at this point in the history
…ion timeout issues (#12343)
  • Loading branch information
zimulala authored and sre-bot committed Sep 25, 2019
1 parent d013b31 commit 4120985
Show file tree
Hide file tree
Showing 16 changed files with 297 additions and 163 deletions.
39 changes: 20 additions & 19 deletions ddl/split_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,20 +70,20 @@ func splitPreSplitedTable(store kv.SplitableStore, tbInfo *model.TableInfo, scat
// And the max _tidb_rowid is 9223372036854775807, it won't be negative number.

// Split table region.
regionIDs := make([]uint64, 0, 1<<(tbInfo.PreSplitRegions)+len(tbInfo.Indices))
step := int64(1 << (tbInfo.ShardRowIDBits - tbInfo.PreSplitRegions))
max := int64(1 << tbInfo.ShardRowIDBits)
splitTableKeys := make([][]byte, 0, 1<<(tbInfo.PreSplitRegions))
for p := int64(step); p < max; p += step {
recordID := p << (64 - tbInfo.ShardRowIDBits - 1)
recordPrefix := tablecodec.GenTableRecordPrefix(tbInfo.ID)
key := tablecodec.EncodeRecordKey(recordPrefix, recordID)
regionID, err := store.SplitRegion(key, scatter)
if err != nil {
logutil.Logger(context.Background()).Warn("[ddl] pre split table region failed", zap.Int64("recordID", recordID),
zap.Error(err))
} else {
regionIDs = append(regionIDs, regionID)
}
splitTableKeys = append(splitTableKeys, key)
}
var err error
regionIDs, err := store.SplitRegions(context.Background(), splitTableKeys, scatter)
if err != nil {
logutil.Logger(context.Background()).Warn("[ddl] pre split table region failed",
zap.Stringer("table", tbInfo.Name), zap.Int("successful region count", len(regionIDs)), zap.Error(err))
}
regionIDs = append(regionIDs, splitIndexRegion(store, tbInfo, scatter)...)
if scatter {
Expand All @@ -93,26 +93,27 @@ func splitPreSplitedTable(store kv.SplitableStore, tbInfo *model.TableInfo, scat

func splitRecordRegion(store kv.SplitableStore, tableID int64, scatter bool) uint64 {
tableStartKey := tablecodec.GenTablePrefix(tableID)
regionID, err := store.SplitRegion(tableStartKey, scatter)
regionIDs, err := store.SplitRegions(context.Background(), [][]byte{tableStartKey}, scatter)
if err != nil {
// It will be automatically split by TiKV later.
logutil.Logger(context.Background()).Warn("[ddl] split table region failed", zap.Error(err))
}
return regionID
if len(regionIDs) == 1 {
return regionIDs[0]
}
return 0
}

func splitIndexRegion(store kv.SplitableStore, tblInfo *model.TableInfo, scatter bool) []uint64 {
regionIDs := make([]uint64, 0, len(tblInfo.Indices))
splitKeys := make([][]byte, 0, len(tblInfo.Indices))
for _, idx := range tblInfo.Indices {
indexPrefix := tablecodec.EncodeTableIndexPrefix(tblInfo.ID, idx.ID)
regionID, err := store.SplitRegion(indexPrefix, scatter)
if err != nil {
logutil.Logger(context.Background()).Warn("[ddl] pre split table index region failed",
zap.Stringer("table", tblInfo.Name),
zap.Stringer("index", idx.Name),
zap.Error(err))
}
regionIDs = append(regionIDs, regionID)
splitKeys = append(splitKeys, indexPrefix)
}
regionIDs, err := store.SplitRegions(context.Background(), splitKeys, scatter)
if err != nil {
logutil.Logger(context.Background()).Warn("[ddl] pre split table index region failed",
zap.Stringer("table", tblInfo.Name), zap.Int("successful region count", len(regionIDs)), zap.Error(err))
}
return regionIDs
}
Expand Down
20 changes: 15 additions & 5 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1963,7 +1963,7 @@ func (s *testSuite) TestPointGetRepeatableRead(c *C) {
}

func (s *testSuite) TestSplitRegionTimeout(c *C) {
c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/mockSplitRegionTimeout", `return(true)`), IsNil)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/MockSplitRegionTimeout", `return(true)`), IsNil)
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
Expand All @@ -1972,7 +1972,12 @@ func (s *testSuite) TestSplitRegionTimeout(c *C) {
tk.MustExec(`set @@tidb_wait_split_region_timeout=1`)
// result 0 0 means split 0 region and 0 region finish scatter regions before timeout.
tk.MustQuery(`split table t between (0) and (10000) regions 10`).Check(testkit.Rows("0 0"))
c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/mockSplitRegionTimeout"), IsNil)
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/MockSplitRegionTimeout"), IsNil)

// Test scatter regions timeout.
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/MockScatterRegionTimeout", `return(true)`), IsNil)
tk.MustQuery(`split table t between (0) and (10000) regions 10`).Check(testkit.Rows("10 1"))
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/MockScatterRegionTimeout"), IsNil)
}

func (s *testSuite) TestRow(c *C) {
Expand Down Expand Up @@ -3849,7 +3854,7 @@ func (s *testSuite) TestReadPartitionedTable(c *C) {
func (s *testSuite) TestSplitRegion(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("drop table if exists t, t1")
tk.MustExec("create table t(a varchar(100),b int, index idx1(b,a))")
tk.MustExec(`split table t index idx1 by (10000,"abcd"),(10000000);`)
_, err := tk.Exec(`split table t index idx1 by ("abcd");`)
Expand Down Expand Up @@ -3891,7 +3896,7 @@ func (s *testSuite) TestSplitRegion(c *C) {

// Test for split table region.
tk.MustExec(`split table t between (0) and (1000000000) regions 10`)
// Check the ower value is more than the upper value.
// Check the lower value is more than the upper value.
_, err = tk.Exec(`split table t between (2) and (1) regions 10`)
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "Split table `t` region lower value 2 should less than the upper value 1")
Expand Down Expand Up @@ -3926,8 +3931,13 @@ func (s *testSuite) TestSplitRegion(c *C) {
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "Split table `t` region step value should more than 1000, step 10 is invalid")

// Test split region by syntax
// Test split region by syntax.
tk.MustExec(`split table t by (0),(1000),(1000000)`)

// Test split region twice to test for multiple batch split region requests.
tk.MustExec("create table t1(a int, b int)")
tk.MustQuery("split table t1 between(0) and (10000) regions 10;").Check(testkit.Rows("9 1"))
tk.MustQuery("split table t1 between(10) and (10010) regions 5;").Check(testkit.Rows("4 1"))
}

func (s *testSuite) TestShowTableRegion(c *C) {
Expand Down
103 changes: 42 additions & 61 deletions executor/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

"github.com/cznic/mathutil"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
Expand All @@ -44,12 +43,13 @@ import (
type SplitIndexRegionExec struct {
baseExecutor

tableInfo *model.TableInfo
indexInfo *model.IndexInfo
lower []types.Datum
upper []types.Datum
num int
valueLists [][]types.Datum
tableInfo *model.TableInfo
indexInfo *model.IndexInfo
lower []types.Datum
upper []types.Datum
num int
valueLists [][]types.Datum
splitIdxKeys [][]byte

done bool
splitRegionResult
Expand All @@ -61,8 +61,9 @@ type splitRegionResult struct {
}

// Open implements the Executor Open interface.
func (e *SplitIndexRegionExec) Open(ctx context.Context) error {
return e.splitIndexRegion(ctx)
func (e *SplitIndexRegionExec) Open(ctx context.Context) (err error) {
e.splitIdxKeys, err = e.getSplitIdxKeys()
return err
}

// Next implements the Executor Next interface.
Expand All @@ -71,8 +72,12 @@ func (e *SplitIndexRegionExec) Next(ctx context.Context, chk *chunk.Chunk) error
if e.done {
return nil
}
appendSplitRegionResultToChunk(chk, e.splitRegions, e.finishScatterNum)
e.done = true
if err := e.splitIndexRegion(ctx); err != nil {
return err
}

appendSplitRegionResultToChunk(chk, e.splitRegions, e.finishScatterNum)
return nil
}

Expand All @@ -86,35 +91,22 @@ func (e *SplitIndexRegionExec) splitIndexRegion(ctx context.Context) error {
if !ok {
return nil
}
splitIdxKeys, err := e.getSplitIdxKeys()
if err != nil {
return err
}

start := time.Now()
ctxWithTimeout, cancel := context.WithTimeout(ctx, e.ctx.GetSessionVars().GetSplitRegionTimeout())
defer cancel()
regionIDs := make([]uint64, 0, len(splitIdxKeys))
for _, idxKey := range splitIdxKeys {
if isCtxDone(ctxWithTimeout) {
break
}

regionID, err := s.SplitRegion(idxKey, true)
if err != nil {
logutil.Logger(context.Background()).Warn("split table index region failed",
zap.String("table", e.tableInfo.Name.L),
zap.String("index", e.indexInfo.Name.L),
zap.Error(err))
continue
}
if regionID == 0 {
continue
}
regionIDs = append(regionIDs, regionID)

regionIDs, err := s.SplitRegions(context.Background(), e.splitIdxKeys, true)
if err != nil {
logutil.Logger(context.Background()).Warn("split table index region failed",
zap.String("table", e.tableInfo.Name.L),
zap.String("index", e.indexInfo.Name.L),
zap.Error(err))
}
e.splitRegions = len(regionIDs)
if e.splitRegions == 0 {
return nil
}

if !e.ctx.GetSessionVars().WaitSplitRegionFinish {
return nil
}
Expand Down Expand Up @@ -258,14 +250,16 @@ type SplitTableRegionExec struct {
upper types.Datum
num int
valueLists [][]types.Datum
splitKeys [][]byte

done bool
splitRegionResult
}

// Open implements the Executor Open interface.
func (e *SplitTableRegionExec) Open(ctx context.Context) error {
return e.splitTableRegion(ctx)
func (e *SplitTableRegionExec) Open(ctx context.Context) (err error) {
e.splitKeys, err = e.getSplitTableKeys()
return err
}

// Next implements the Executor Next interface.
Expand All @@ -274,8 +268,12 @@ func (e *SplitTableRegionExec) Next(ctx context.Context, chk *chunk.Chunk) error
if e.done {
return nil
}
appendSplitRegionResultToChunk(chk, e.splitRegions, e.finishScatterNum)
e.done = true

if err := e.splitTableRegion(ctx); err != nil {
return err
}
appendSplitRegionResultToChunk(chk, e.splitRegions, e.finishScatterNum)
return nil
}

Expand All @@ -290,34 +288,17 @@ func (e *SplitTableRegionExec) splitTableRegion(ctx context.Context) error {
ctxWithTimeout, cancel := context.WithTimeout(ctx, e.ctx.GetSessionVars().GetSplitRegionTimeout())
defer cancel()

splitKeys, err := e.getSplitTableKeys()
regionIDs, err := s.SplitRegions(ctxWithTimeout, e.splitKeys, true)
if err != nil {
return err
}
regionIDs := make([]uint64, 0, len(splitKeys))
for _, key := range splitKeys {
failpoint.Inject("mockSplitRegionTimeout", func(val failpoint.Value) {
if val.(bool) {
time.Sleep(time.Second*1 + time.Millisecond*10)
}
})
if isCtxDone(ctxWithTimeout) {
break
}
regionID, err := s.SplitRegion(key, true)
if err != nil {
logutil.Logger(context.Background()).Warn("split table region failed",
zap.String("table", e.tableInfo.Name.L),
zap.Error(err))
continue
}
if regionID == 0 {
continue
}
regionIDs = append(regionIDs, regionID)

logutil.Logger(context.Background()).Warn("split table region failed",
zap.String("table", e.tableInfo.Name.L),
zap.Error(err))
}
e.splitRegions = len(regionIDs)
if e.splitRegions == 0 {
return nil
}

if !e.ctx.GetSessionVars().WaitSplitRegionFinish {
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ require (
github.com/pingcap/errors v0.11.4
github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e
github.com/pingcap/kvproto v0.0.0-20190703131923-d9830856b531
github.com/pingcap/kvproto v0.0.0-20190918085321-44e3817e1f18
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596
github.com/pingcap/parser v0.0.0-20190910041007-2a177b291004
github.com/pingcap/pd v0.0.0-20190711034019-ee98bf9063e9
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,8 @@ github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c/go.mod h1:DNS3Qg
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rGrobssy1nVy2VaVpNCuLpCbr+FEaTA8=
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20190516013202-4cf58ad90b6c/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20190703131923-d9830856b531 h1:8xk2HobDwClB5E3Hv9TEPiS7K7bv3ykWHLyZzuUYywI=
github.com/pingcap/kvproto v0.0.0-20190703131923-d9830856b531/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20190918085321-44e3817e1f18 h1:5vQV8S/8B9nE+I+0Me6vZGyASeXl/QymwqtaOL5e5ZA=
github.com/pingcap/kvproto v0.0.0-20190918085321-44e3817e1f18/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/log v0.0.0-20190214045112-b37da76f67a7/go.mod h1:xsfkWVaFVV5B8e1K9seWfyJWFrIhbtUTAD8NV1Pq3+w=
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596 h1:t2OQTpPJnrPDGlvA+3FwJptMTt6MEPdzK1Wt99oaefQ=
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw=
Expand Down
2 changes: 1 addition & 1 deletion kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ type Iterator interface {

// SplitableStore is the kv store which supports split regions.
type SplitableStore interface {
SplitRegion(splitKey Key, scatter bool) (regionID uint64, err error)
SplitRegions(ctx context.Context, splitKey [][]byte, scatter bool) (regionID []uint64, err error)
WaitScatterRegionFinish(regionID uint64, backOff int) error
CheckRegionInScattering(regionID uint64) (bool, error)
}
27 changes: 19 additions & 8 deletions store/mockstore/mocktikv/rpc.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -592,14 +592,25 @@ func (h *rpcHandler) handleKvRawScan(req *kvrpcpb.RawScanRequest) *kvrpcpb.RawSc
}

func (h *rpcHandler) handleSplitRegion(req *kvrpcpb.SplitRegionRequest) *kvrpcpb.SplitRegionResponse {
key := NewMvccKey(req.GetSplitKey())
region, _ := h.cluster.GetRegionByKey(key)
if bytes.Equal(region.GetStartKey(), key) {
return &kvrpcpb.SplitRegionResponse{}
}
newRegionID, newPeerIDs := h.cluster.AllocID(), h.cluster.AllocIDs(len(region.Peers))
newRegion := h.cluster.SplitRaw(region.GetId(), newRegionID, key, newPeerIDs, newPeerIDs[0])
return &kvrpcpb.SplitRegionResponse{Left: newRegion.Meta}
keys := req.GetSplitKeys()
resp := &kvrpcpb.SplitRegionResponse{Regions: make([]*metapb.Region, 0, len(keys)+1)}
for i, key := range keys {
k := NewMvccKey(key)
region, _ := h.cluster.GetRegionByKey(k)
if bytes.Equal(region.GetStartKey(), key) {
continue
}
if i == 0 {
// Set the leftmost region.
resp.Regions = append(resp.Regions, region)
}
newRegionID, newPeerIDs := h.cluster.AllocID(), h.cluster.AllocIDs(len(region.Peers))
newRegion := h.cluster.SplitRaw(region.GetId(), newRegionID, k, newPeerIDs, newPeerIDs[0])
// The mocktikv should return a deep copy of meta info to avoid data race
metaCloned := proto.Clone(newRegion.Meta)
resp.Regions = append(resp.Regions, metaCloned.(*metapb.Region))
}
return resp
}

// RPCClient sends kv RPC calls to mock cluster. RPCClient mocks the behavior of
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ func (c *twoPhaseCommitter) doActionOnKeys(bo *Backoffer, action twoPhaseCommitA
if len(keys) == 0 {
return nil
}
groups, firstRegion, err := c.store.regionCache.GroupKeysByRegion(bo, keys)
groups, firstRegion, err := c.store.regionCache.GroupKeysByRegion(bo, keys, nil)
if err != nil {
return errors.Trace(err)
}
Expand Down
1 change: 1 addition & 0 deletions store/tikv/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ const (
deleteRangeOneRegionMaxBackoff = 100000
rawkvMaxBackoff = 20000
splitRegionBackoff = 20000
maxSplitRegionsBackoff = 120000
scatterRegionBackoff = 20000
waitScatterRegionFinishBackoff = 120000
locateRegionMaxBackoff = 20000
Expand Down
4 changes: 2 additions & 2 deletions store/tikv/rawkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ func (c *RawKVClient) sendReq(key []byte, req *tikvrpc.Request, reverse bool) (*
}

func (c *RawKVClient) sendBatchReq(bo *Backoffer, keys [][]byte, cmdType tikvrpc.CmdType) (*tikvrpc.Response, error) { // split the keys
groups, _, err := c.regionCache.GroupKeysByRegion(bo, keys)
groups, _, err := c.regionCache.GroupKeysByRegion(bo, keys, nil)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -570,7 +570,7 @@ func (c *RawKVClient) sendBatchPut(bo *Backoffer, keys, values [][]byte) error {
for i, key := range keys {
keyToValue[string(key)] = values[i]
}
groups, _, err := c.regionCache.GroupKeysByRegion(bo, keys)
groups, _, err := c.regionCache.GroupKeysByRegion(bo, keys, nil)
if err != nil {
return errors.Trace(err)
}
Expand Down
Loading

0 comments on commit 4120985

Please sign in to comment.