Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: support a region divided into multiple regions and fix scatter region timeout issues #12343

Merged
merged 3 commits into from
Sep 25, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 20 additions & 19 deletions ddl/split_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,20 +70,20 @@ func splitPreSplitedTable(store kv.SplitableStore, tbInfo *model.TableInfo, scat
// And the max _tidb_rowid is 9223372036854775807, it won't be negative number.

// Split table region.
regionIDs := make([]uint64, 0, 1<<(tbInfo.PreSplitRegions)+len(tbInfo.Indices))
step := int64(1 << (tbInfo.ShardRowIDBits - tbInfo.PreSplitRegions))
max := int64(1 << tbInfo.ShardRowIDBits)
splitTableKeys := make([][]byte, 0, 1<<(tbInfo.PreSplitRegions))
for p := int64(step); p < max; p += step {
recordID := p << (64 - tbInfo.ShardRowIDBits - 1)
recordPrefix := tablecodec.GenTableRecordPrefix(tbInfo.ID)
key := tablecodec.EncodeRecordKey(recordPrefix, recordID)
regionID, err := store.SplitRegion(key, scatter)
if err != nil {
logutil.Logger(context.Background()).Warn("[ddl] pre split table region failed", zap.Int64("recordID", recordID),
zap.Error(err))
} else {
regionIDs = append(regionIDs, regionID)
}
splitTableKeys = append(splitTableKeys, key)
}
var err error
regionIDs, err := store.SplitRegions(context.Background(), splitTableKeys, scatter)
if err != nil {
logutil.Logger(context.Background()).Warn("[ddl] pre split table region failed",
zap.Stringer("table", tbInfo.Name), zap.Int("successful region count", len(regionIDs)), zap.Error(err))
}
regionIDs = append(regionIDs, splitIndexRegion(store, tbInfo, scatter)...)
if scatter {
Expand All @@ -93,26 +93,27 @@ func splitPreSplitedTable(store kv.SplitableStore, tbInfo *model.TableInfo, scat

func splitRecordRegion(store kv.SplitableStore, tableID int64, scatter bool) uint64 {
tableStartKey := tablecodec.GenTablePrefix(tableID)
regionID, err := store.SplitRegion(tableStartKey, scatter)
regionIDs, err := store.SplitRegions(context.Background(), [][]byte{tableStartKey}, scatter)
if err != nil {
// It will be automatically split by TiKV later.
logutil.Logger(context.Background()).Warn("[ddl] split table region failed", zap.Error(err))
}
return regionID
if len(regionIDs) == 1 {
return regionIDs[0]
}
return 0
}

func splitIndexRegion(store kv.SplitableStore, tblInfo *model.TableInfo, scatter bool) []uint64 {
regionIDs := make([]uint64, 0, len(tblInfo.Indices))
splitKeys := make([][]byte, 0, len(tblInfo.Indices))
for _, idx := range tblInfo.Indices {
indexPrefix := tablecodec.EncodeTableIndexPrefix(tblInfo.ID, idx.ID)
regionID, err := store.SplitRegion(indexPrefix, scatter)
if err != nil {
logutil.Logger(context.Background()).Warn("[ddl] pre split table index region failed",
zap.Stringer("table", tblInfo.Name),
zap.Stringer("index", idx.Name),
zap.Error(err))
}
regionIDs = append(regionIDs, regionID)
splitKeys = append(splitKeys, indexPrefix)
}
regionIDs, err := store.SplitRegions(context.Background(), splitKeys, scatter)
if err != nil {
logutil.Logger(context.Background()).Warn("[ddl] pre split table index region failed",
zap.Stringer("table", tblInfo.Name), zap.Int("successful region count", len(regionIDs)), zap.Error(err))
}
return regionIDs
}
Expand Down
20 changes: 15 additions & 5 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1963,7 +1963,7 @@ func (s *testSuite) TestPointGetRepeatableRead(c *C) {
}

func (s *testSuite) TestSplitRegionTimeout(c *C) {
c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/mockSplitRegionTimeout", `return(true)`), IsNil)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/MockSplitRegionTimeout", `return(true)`), IsNil)
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
Expand All @@ -1972,7 +1972,12 @@ func (s *testSuite) TestSplitRegionTimeout(c *C) {
tk.MustExec(`set @@tidb_wait_split_region_timeout=1`)
// result 0 0 means split 0 region and 0 region finish scatter regions before timeout.
tk.MustQuery(`split table t between (0) and (10000) regions 10`).Check(testkit.Rows("0 0"))
c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/mockSplitRegionTimeout"), IsNil)
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/MockSplitRegionTimeout"), IsNil)

// Test scatter regions timeout.
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/MockScatterRegionTimeout", `return(true)`), IsNil)
tk.MustQuery(`split table t between (0) and (10000) regions 10`).Check(testkit.Rows("10 1"))
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/MockScatterRegionTimeout"), IsNil)
}

func (s *testSuite) TestRow(c *C) {
Expand Down Expand Up @@ -3849,7 +3854,7 @@ func (s *testSuite) TestReadPartitionedTable(c *C) {
func (s *testSuite) TestSplitRegion(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("drop table if exists t, t1")
tk.MustExec("create table t(a varchar(100),b int, index idx1(b,a))")
tk.MustExec(`split table t index idx1 by (10000,"abcd"),(10000000);`)
_, err := tk.Exec(`split table t index idx1 by ("abcd");`)
Expand Down Expand Up @@ -3891,7 +3896,7 @@ func (s *testSuite) TestSplitRegion(c *C) {

// Test for split table region.
tk.MustExec(`split table t between (0) and (1000000000) regions 10`)
// Check the ower value is more than the upper value.
// Check the lower value is more than the upper value.
_, err = tk.Exec(`split table t between (2) and (1) regions 10`)
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "Split table `t` region lower value 2 should less than the upper value 1")
Expand Down Expand Up @@ -3926,8 +3931,13 @@ func (s *testSuite) TestSplitRegion(c *C) {
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "Split table `t` region step value should more than 1000, step 10 is invalid")

// Test split region by syntax
// Test split region by syntax.
tk.MustExec(`split table t by (0),(1000),(1000000)`)

// Test split region twice to test for multiple batch split region requests.
tk.MustExec("create table t1(a int, b int)")
tk.MustQuery("split table t1 between(0) and (10000) regions 10;").Check(testkit.Rows("9 1"))
tk.MustQuery("split table t1 between(10) and (10010) regions 5;").Check(testkit.Rows("4 1"))
}

func (s *testSuite) TestShowTableRegion(c *C) {
Expand Down
103 changes: 42 additions & 61 deletions executor/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

"github.com/cznic/mathutil"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
Expand All @@ -44,12 +43,13 @@ import (
type SplitIndexRegionExec struct {
baseExecutor

tableInfo *model.TableInfo
indexInfo *model.IndexInfo
lower []types.Datum
upper []types.Datum
num int
valueLists [][]types.Datum
tableInfo *model.TableInfo
indexInfo *model.IndexInfo
lower []types.Datum
upper []types.Datum
num int
valueLists [][]types.Datum
splitIdxKeys [][]byte

done bool
splitRegionResult
Expand All @@ -61,8 +61,9 @@ type splitRegionResult struct {
}

// Open implements the Executor Open interface.
func (e *SplitIndexRegionExec) Open(ctx context.Context) error {
return e.splitIndexRegion(ctx)
func (e *SplitIndexRegionExec) Open(ctx context.Context) (err error) {
e.splitIdxKeys, err = e.getSplitIdxKeys()
return err
}

// Next implements the Executor Next interface.
Expand All @@ -71,8 +72,12 @@ func (e *SplitIndexRegionExec) Next(ctx context.Context, chk *chunk.Chunk) error
if e.done {
return nil
}
appendSplitRegionResultToChunk(chk, e.splitRegions, e.finishScatterNum)
e.done = true
if err := e.splitIndexRegion(ctx); err != nil {
return err
}

appendSplitRegionResultToChunk(chk, e.splitRegions, e.finishScatterNum)
return nil
}

Expand All @@ -86,35 +91,22 @@ func (e *SplitIndexRegionExec) splitIndexRegion(ctx context.Context) error {
if !ok {
return nil
}
splitIdxKeys, err := e.getSplitIdxKeys()
if err != nil {
return err
}

start := time.Now()
ctxWithTimeout, cancel := context.WithTimeout(ctx, e.ctx.GetSessionVars().GetSplitRegionTimeout())
defer cancel()
regionIDs := make([]uint64, 0, len(splitIdxKeys))
for _, idxKey := range splitIdxKeys {
if isCtxDone(ctxWithTimeout) {
break
}

regionID, err := s.SplitRegion(idxKey, true)
if err != nil {
logutil.Logger(context.Background()).Warn("split table index region failed",
zap.String("table", e.tableInfo.Name.L),
zap.String("index", e.indexInfo.Name.L),
zap.Error(err))
continue
}
if regionID == 0 {
continue
}
regionIDs = append(regionIDs, regionID)

regionIDs, err := s.SplitRegions(context.Background(), e.splitIdxKeys, true)
if err != nil {
logutil.Logger(context.Background()).Warn("split table index region failed",
zap.String("table", e.tableInfo.Name.L),
zap.String("index", e.indexInfo.Name.L),
zap.Error(err))
}
e.splitRegions = len(regionIDs)
if e.splitRegions == 0 {
return nil
}

if !e.ctx.GetSessionVars().WaitSplitRegionFinish {
return nil
}
Expand Down Expand Up @@ -258,14 +250,16 @@ type SplitTableRegionExec struct {
upper types.Datum
num int
valueLists [][]types.Datum
splitKeys [][]byte

done bool
splitRegionResult
}

// Open implements the Executor Open interface.
func (e *SplitTableRegionExec) Open(ctx context.Context) error {
return e.splitTableRegion(ctx)
func (e *SplitTableRegionExec) Open(ctx context.Context) (err error) {
e.splitKeys, err = e.getSplitTableKeys()
return err
}

// Next implements the Executor Next interface.
Expand All @@ -274,8 +268,12 @@ func (e *SplitTableRegionExec) Next(ctx context.Context, chk *chunk.Chunk) error
if e.done {
return nil
}
appendSplitRegionResultToChunk(chk, e.splitRegions, e.finishScatterNum)
e.done = true

if err := e.splitTableRegion(ctx); err != nil {
return err
}
appendSplitRegionResultToChunk(chk, e.splitRegions, e.finishScatterNum)
return nil
}

Expand All @@ -290,34 +288,17 @@ func (e *SplitTableRegionExec) splitTableRegion(ctx context.Context) error {
ctxWithTimeout, cancel := context.WithTimeout(ctx, e.ctx.GetSessionVars().GetSplitRegionTimeout())
defer cancel()

splitKeys, err := e.getSplitTableKeys()
regionIDs, err := s.SplitRegions(ctxWithTimeout, e.splitKeys, true)
if err != nil {
return err
}
regionIDs := make([]uint64, 0, len(splitKeys))
for _, key := range splitKeys {
failpoint.Inject("mockSplitRegionTimeout", func(val failpoint.Value) {
if val.(bool) {
time.Sleep(time.Second*1 + time.Millisecond*10)
}
})
if isCtxDone(ctxWithTimeout) {
break
}
regionID, err := s.SplitRegion(key, true)
if err != nil {
logutil.Logger(context.Background()).Warn("split table region failed",
zap.String("table", e.tableInfo.Name.L),
zap.Error(err))
continue
}
if regionID == 0 {
continue
}
regionIDs = append(regionIDs, regionID)

logutil.Logger(context.Background()).Warn("split table region failed",
zap.String("table", e.tableInfo.Name.L),
zap.Error(err))
}
e.splitRegions = len(regionIDs)
if e.splitRegions == 0 {
return nil
}

if !e.ctx.GetSessionVars().WaitSplitRegionFinish {
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ require (
github.com/pingcap/errors v0.11.4
github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e
github.com/pingcap/kvproto v0.0.0-20190703131923-d9830856b531
github.com/pingcap/kvproto v0.0.0-20190918085321-44e3817e1f18
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596
github.com/pingcap/parser v0.0.0-20190910041007-2a177b291004
github.com/pingcap/pd v0.0.0-20190711034019-ee98bf9063e9
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,8 @@ github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c/go.mod h1:DNS3Qg
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rGrobssy1nVy2VaVpNCuLpCbr+FEaTA8=
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20190516013202-4cf58ad90b6c/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20190703131923-d9830856b531 h1:8xk2HobDwClB5E3Hv9TEPiS7K7bv3ykWHLyZzuUYywI=
github.com/pingcap/kvproto v0.0.0-20190703131923-d9830856b531/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20190918085321-44e3817e1f18 h1:5vQV8S/8B9nE+I+0Me6vZGyASeXl/QymwqtaOL5e5ZA=
github.com/pingcap/kvproto v0.0.0-20190918085321-44e3817e1f18/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/log v0.0.0-20190214045112-b37da76f67a7/go.mod h1:xsfkWVaFVV5B8e1K9seWfyJWFrIhbtUTAD8NV1Pq3+w=
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596 h1:t2OQTpPJnrPDGlvA+3FwJptMTt6MEPdzK1Wt99oaefQ=
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw=
Expand Down
2 changes: 1 addition & 1 deletion kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ type Iterator interface {

// SplitableStore is the kv store which supports split regions.
type SplitableStore interface {
SplitRegion(splitKey Key, scatter bool) (regionID uint64, err error)
SplitRegions(ctx context.Context, splitKey [][]byte, scatter bool) (regionID []uint64, err error)
WaitScatterRegionFinish(regionID uint64, backOff int) error
CheckRegionInScattering(regionID uint64) (bool, error)
}
27 changes: 19 additions & 8 deletions store/mockstore/mocktikv/rpc.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -592,14 +592,25 @@ func (h *rpcHandler) handleKvRawScan(req *kvrpcpb.RawScanRequest) *kvrpcpb.RawSc
}

func (h *rpcHandler) handleSplitRegion(req *kvrpcpb.SplitRegionRequest) *kvrpcpb.SplitRegionResponse {
key := NewMvccKey(req.GetSplitKey())
region, _ := h.cluster.GetRegionByKey(key)
if bytes.Equal(region.GetStartKey(), key) {
return &kvrpcpb.SplitRegionResponse{}
}
newRegionID, newPeerIDs := h.cluster.AllocID(), h.cluster.AllocIDs(len(region.Peers))
newRegion := h.cluster.SplitRaw(region.GetId(), newRegionID, key, newPeerIDs, newPeerIDs[0])
return &kvrpcpb.SplitRegionResponse{Left: newRegion.Meta}
keys := req.GetSplitKeys()
resp := &kvrpcpb.SplitRegionResponse{Regions: make([]*metapb.Region, 0, len(keys)+1)}
for i, key := range keys {
k := NewMvccKey(key)
region, _ := h.cluster.GetRegionByKey(k)
if bytes.Equal(region.GetStartKey(), key) {
continue
}
if i == 0 {
// Set the leftmost region.
resp.Regions = append(resp.Regions, region)
}
newRegionID, newPeerIDs := h.cluster.AllocID(), h.cluster.AllocIDs(len(region.Peers))
newRegion := h.cluster.SplitRaw(region.GetId(), newRegionID, k, newPeerIDs, newPeerIDs[0])
// The mocktikv should return a deep copy of meta info to avoid data race
metaCloned := proto.Clone(newRegion.Meta)
resp.Regions = append(resp.Regions, metaCloned.(*metapb.Region))
}
return resp
}

// RPCClient sends kv RPC calls to mock cluster. RPCClient mocks the behavior of
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ func (c *twoPhaseCommitter) doActionOnKeys(bo *Backoffer, action twoPhaseCommitA
if len(keys) == 0 {
return nil
}
groups, firstRegion, err := c.store.regionCache.GroupKeysByRegion(bo, keys)
groups, firstRegion, err := c.store.regionCache.GroupKeysByRegion(bo, keys, nil)
if err != nil {
return errors.Trace(err)
}
Expand Down
1 change: 1 addition & 0 deletions store/tikv/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ const (
deleteRangeOneRegionMaxBackoff = 100000
rawkvMaxBackoff = 20000
splitRegionBackoff = 20000
maxSplitRegionsBackoff = 120000
scatterRegionBackoff = 20000
waitScatterRegionFinishBackoff = 120000
locateRegionMaxBackoff = 20000
Expand Down
4 changes: 2 additions & 2 deletions store/tikv/rawkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ func (c *RawKVClient) sendReq(key []byte, req *tikvrpc.Request, reverse bool) (*
}

func (c *RawKVClient) sendBatchReq(bo *Backoffer, keys [][]byte, cmdType tikvrpc.CmdType) (*tikvrpc.Response, error) { // split the keys
groups, _, err := c.regionCache.GroupKeysByRegion(bo, keys)
groups, _, err := c.regionCache.GroupKeysByRegion(bo, keys, nil)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -570,7 +570,7 @@ func (c *RawKVClient) sendBatchPut(bo *Backoffer, keys, values [][]byte) error {
for i, key := range keys {
keyToValue[string(key)] = values[i]
}
groups, _, err := c.regionCache.GroupKeysByRegion(bo, keys)
groups, _, err := c.regionCache.GroupKeysByRegion(bo, keys, nil)
if err != nil {
return errors.Trace(err)
}
Expand Down
Loading