Skip to content

Commit

Permalink
executor/split: return split result when do split region and r… (#11484)
Browse files Browse the repository at this point in the history
  • Loading branch information
crazycs520 authored and winkyao committed Jul 31, 2019
1 parent a93a97a commit 39b4cf0
Show file tree
Hide file tree
Showing 9 changed files with 143 additions and 63 deletions.
2 changes: 1 addition & 1 deletion ddl/split_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func splitIndexRegion(store kv.SplitableStore, tblInfo *model.TableInfo, scatter

func waitScatterRegionFinish(store kv.SplitableStore, regionIDs ...uint64) {
for _, regionID := range regionIDs {
err := store.WaitScatterRegionFinish(regionID)
err := store.WaitScatterRegionFinish(regionID, 0)
if err != nil {
logutil.Logger(context.Background()).Warn("[ddl] wait scatter region failed", zap.Uint64("regionID", regionID), zap.Error(err))
}
Expand Down
5 changes: 3 additions & 2 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1265,8 +1265,9 @@ func (b *executorBuilder) buildUnionAll(v *plannercore.PhysicalUnionAll) Executo
}

func (b *executorBuilder) buildSplitRegion(v *plannercore.SplitRegion) Executor {
base := newBaseExecutor(b.ctx, nil, v.ExplainID())
base.initCap = chunk.ZeroCapacity
base := newBaseExecutor(b.ctx, v.Schema(), v.ExplainID())
base.initCap = 1
base.maxChunkSize = 1
if v.IndexInfo != nil {
return &SplitIndexRegionExec{
baseExecutor: base,
Expand Down
19 changes: 6 additions & 13 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1976,16 +1976,9 @@ func (s *testSuite) TestSplitRegionTimeout(c *C) {
tk.MustExec("create table t(a varchar(100),b int, index idx1(b,a))")
tk.MustExec(`split table t index idx1 by (10000,"abcd"),(10000000);`)
tk.MustExec(`set @@tidb_wait_split_region_timeout=1`)
_, err := tk.Exec(`split table t between (0) and (10000) regions 10`)
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "split region timeout(1s)")
// result 0 0 means split 0 region and 0 region finish scatter regions before timeout.
tk.MustQuery(`split table t between (0) and (10000) regions 10`).Check(testkit.Rows("0 0"))
c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/mockSplitRegionTimeout"), IsNil)

c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/mockScatterRegionTimeout", `return(true)`), IsNil)
_, err = tk.Exec(`split table t between (0) and (10000) regions 10`)
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "wait split region scatter timeout(1s)")
c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/mockScatterRegionTimeout"), IsNil)
}

func (s *testSuite) TestRow(c *C) {
Expand Down Expand Up @@ -3952,7 +3945,7 @@ func (s *testSuite) TestShowTableRegion(c *C) {

// Test show table regions.
tk.MustExec(`split table t_regions1 by (0)`)
tk.MustExec(`split table t_regions between (-10000) and (10000) regions 4;`)
tk.MustQuery(`split table t_regions between (-10000) and (10000) regions 4;`).Check(testkit.Rows("3 1"))
re := tk.MustQuery("show table t_regions regions")
rows := re.Rows()
// Table t_regions should have 4 regions now.
Expand All @@ -3967,7 +3960,7 @@ func (s *testSuite) TestShowTableRegion(c *C) {
c.Assert(rows[3][1], Equals, fmt.Sprintf("t_%d_r_5000", tbl.Meta().ID))

// Test show table index regions.
tk.MustExec(`split table t_regions index idx between (-1000) and (1000) regions 4;`)
tk.MustQuery(`split table t_regions index idx between (-1000) and (1000) regions 4;`).Check(testkit.Rows("4 1"))
re = tk.MustQuery("show table t_regions index idx regions")
rows = re.Rows()
// The index `idx` of table t_regions should have 4 regions now.
Expand Down Expand Up @@ -3997,7 +3990,7 @@ func (s *testSuite) TestShowTableRegion(c *C) {

// Test show table regions.
tk.MustExec(`set @@session.tidb_wait_split_region_finish=1;`)
tk.MustExec(`split table t_regions between (0) and (10000) regions 4;`)
tk.MustQuery(`split table t_regions by (2500),(5000),(7500);`).Check(testkit.Rows("3 1"))
re = tk.MustQuery("show table t_regions regions")
rows = re.Rows()
// Table t_regions should have 4 regions now.
Expand All @@ -4010,7 +4003,7 @@ func (s *testSuite) TestShowTableRegion(c *C) {
c.Assert(rows[3][1], Equals, fmt.Sprintf("t_%d_r_7500", tbl.Meta().ID))

// Test show table index regions.
tk.MustExec(`split table t_regions index idx between (0) and (1000) regions 4;`)
tk.MustQuery(`split table t_regions index idx by (250),(500),(750);`).Check(testkit.Rows("4 1"))
re = tk.MustQuery("show table t_regions index idx regions")
rows = re.Rows()
// The index `idx` of table t_regions should have 4 regions now.
Expand Down
153 changes: 112 additions & 41 deletions executor/split.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
Expand All @@ -48,10 +49,37 @@ type SplitIndexRegionExec struct {
upper []types.Datum
num int
valueLists [][]types.Datum

done bool
splitRegionResult
}

type splitRegionResult struct {
splitRegions int
finishScatterNum int
}

// Open implements the Executor Open interface.
func (e *SplitIndexRegionExec) Open(ctx context.Context) error {
return e.splitIndexRegion(ctx)
}

// Next implements the Executor Next interface.
func (e *SplitIndexRegionExec) Next(ctx context.Context, _ *chunk.Chunk) error {
func (e *SplitIndexRegionExec) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
if e.done {
return nil
}
appendSplitRegionResultToChunk(chk, e.splitRegions, e.finishScatterNum)
e.done = true
return nil
}

// checkScatterRegionFinishBackOff is the back off time that used to check if a region has finished scattering before split region timeout.
const checkScatterRegionFinishBackOff = 50

// splitIndexRegion is used to split index regions.
func (e *SplitIndexRegionExec) splitIndexRegion(ctx context.Context) error {
store := e.ctx.GetStore()
s, ok := store.(kv.SplitableStore)
if !ok {
Expand All @@ -62,10 +90,15 @@ func (e *SplitIndexRegionExec) Next(ctx context.Context, _ *chunk.Chunk) error {
return err
}

start := time.Now()
ctxWithTimeout, cancel := context.WithTimeout(ctx, e.ctx.GetSessionVars().GetSplitRegionTimeout())
defer cancel()
regionIDs := make([]uint64, 0, len(splitIdxKeys))
for _, idxKey := range splitIdxKeys {
if isCtxDone(ctxWithTimeout) {
break
}

regionID, err := s.SplitRegion(idxKey, true)
if err != nil {
logutil.Logger(context.Background()).Warn("split table index region failed",
Expand All @@ -74,28 +107,17 @@ func (e *SplitIndexRegionExec) Next(ctx context.Context, _ *chunk.Chunk) error {
zap.Error(err))
continue
}
if regionID == 0 {
continue
}
regionIDs = append(regionIDs, regionID)

if isCtxDone(ctxWithTimeout) {
return errors.Errorf("wait split region timeout(%v)", e.ctx.GetSessionVars().GetSplitRegionTimeout())
}
}
e.splitRegions = len(regionIDs)
if !e.ctx.GetSessionVars().WaitSplitRegionFinish {
return nil
}
for _, regionID := range regionIDs {
err := s.WaitScatterRegionFinish(regionID)
if err != nil {
logutil.Logger(context.Background()).Warn("wait scatter region failed",
zap.Uint64("regionID", regionID),
zap.String("table", e.tableInfo.Name.L),
zap.String("index", e.indexInfo.Name.L),
zap.Error(err))
}
if isCtxDone(ctxWithTimeout) {
return errors.Errorf("wait split region timeout(%v)", e.ctx.GetSessionVars().GetSplitRegionTimeout())
}
}
e.finishScatterNum = waitScatterRegionFinish(ctxWithTimeout, e.ctx, start, s, regionIDs, e.tableInfo.Name.L, e.indexInfo.Name.L)
return nil
}

Expand Down Expand Up @@ -225,16 +247,35 @@ type SplitTableRegionExec struct {
upper types.Datum
num int
valueLists [][]types.Datum

done bool
splitRegionResult
}

// Open implements the Executor Open interface.
func (e *SplitTableRegionExec) Open(ctx context.Context) error {
return e.splitTableRegion(ctx)
}

// Next implements the Executor Next interface.
func (e *SplitTableRegionExec) Next(ctx context.Context, _ *chunk.Chunk) error {
func (e *SplitTableRegionExec) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
if e.done {
return nil
}
appendSplitRegionResultToChunk(chk, e.splitRegions, e.finishScatterNum)
e.done = true
return nil
}

func (e *SplitTableRegionExec) splitTableRegion(ctx context.Context) error {
store := e.ctx.GetStore()
s, ok := store.(kv.SplitableStore)
if !ok {
return nil
}

start := time.Now()
ctxWithTimeout, cancel := context.WithTimeout(ctx, e.ctx.GetSessionVars().GetSplitRegionTimeout())
defer cancel()

Expand All @@ -244,48 +285,78 @@ func (e *SplitTableRegionExec) Next(ctx context.Context, _ *chunk.Chunk) error {
}
regionIDs := make([]uint64, 0, len(splitKeys))
for _, key := range splitKeys {
failpoint.Inject("mockSplitRegionTimeout", func(val failpoint.Value) {
if val.(bool) {
time.Sleep(time.Second*1 + time.Millisecond*10)
}
})
if isCtxDone(ctxWithTimeout) {
break
}
regionID, err := s.SplitRegion(key, true)
if err != nil {
logutil.Logger(context.Background()).Warn("split table region failed",
zap.String("table", e.tableInfo.Name.L),
zap.Error(err))
continue
}
if regionID == 0 {
continue
}
regionIDs = append(regionIDs, regionID)

failpoint.Inject("mockSplitRegionTimeout", func(val failpoint.Value) {
if val.(bool) {
time.Sleep(time.Second * 1)
}
})

if isCtxDone(ctxWithTimeout) {
return errors.Errorf("split region timeout(%v)", e.ctx.GetSessionVars().GetSplitRegionTimeout())
}
}
e.splitRegions = len(regionIDs)
if !e.ctx.GetSessionVars().WaitSplitRegionFinish {
return nil
}

e.finishScatterNum = waitScatterRegionFinish(ctxWithTimeout, e.ctx, start, s, regionIDs, e.tableInfo.Name.L, "")
return nil
}

func waitScatterRegionFinish(ctxWithTimeout context.Context, sctx sessionctx.Context, startTime time.Time, store kv.SplitableStore, regionIDs []uint64, tableName, indexName string) int {
remainMillisecond := 0
finishScatterNum := 0
for _, regionID := range regionIDs {
err := s.WaitScatterRegionFinish(regionID)
if err != nil {
logutil.Logger(context.Background()).Warn("wait scatter region failed",
zap.Uint64("regionID", regionID),
zap.String("table", e.tableInfo.Name.L),
zap.Error(err))
if isCtxDone(ctxWithTimeout) {
// Do not break here for checking remain regions scatter finished with a very short backoff time.
// Consider this situation - Regions 1, 2, and 3 are to be split.
// Region 1 times out before scattering finishes, while Region 2 and Region 3 have finished scattering.
// In this case, we should return 2 Regions, instead of 0, have finished scattering.
remainMillisecond = checkScatterRegionFinishBackOff
} else {
remainMillisecond = int((sctx.GetSessionVars().GetSplitRegionTimeout().Seconds() - time.Since(startTime).Seconds()) * 1000)
}

failpoint.Inject("mockScatterRegionTimeout", func(val failpoint.Value) {
if val.(bool) {
time.Sleep(time.Second * 1)
err := store.WaitScatterRegionFinish(regionID, remainMillisecond)
if err == nil {
finishScatterNum++
} else {
if len(indexName) == 0 {
logutil.Logger(context.Background()).Warn("wait scatter region failed",
zap.Uint64("regionID", regionID),
zap.String("table", tableName),
zap.Error(err))
} else {
logutil.Logger(context.Background()).Warn("wait scatter region failed",
zap.Uint64("regionID", regionID),
zap.String("table", tableName),
zap.String("index", indexName),
zap.Error(err))
}
})

if isCtxDone(ctxWithTimeout) {
return errors.Errorf("wait split region scatter timeout(%v)", e.ctx.GetSessionVars().GetSplitRegionTimeout())
}
}
return nil
return finishScatterNum
}

func appendSplitRegionResultToChunk(chk *chunk.Chunk, totalRegions, finishScatterNum int) {
chk.AppendInt64(0, int64(totalRegions))
if finishScatterNum > 0 && totalRegions > 0 {
chk.AppendFloat64(1, float64(finishScatterNum)/float64(totalRegions))
} else {
chk.AppendFloat64(1, float64(0))
}
}

func isCtxDone(ctx context.Context) bool {
Expand Down
2 changes: 1 addition & 1 deletion kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,6 @@ type Iterator interface {
// SplitableStore is the kv store which supports split regions.
type SplitableStore interface {
SplitRegion(splitKey Key, scatter bool) (regionID uint64, err error)
WaitScatterRegionFinish(regionID uint64) error
WaitScatterRegionFinish(regionID uint64, backOff int) error
CheckRegionInScattering(regionID uint64) (bool, error)
}
9 changes: 9 additions & 0 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -999,6 +999,13 @@ func buildTableRegionsSchema() *expression.Schema {
return schema
}

func buildSplitRegionsSchema() *expression.Schema {
schema := expression.NewSchema(make([]*expression.Column, 0, 2)...)
schema.Append(buildColumn("", "TOTAL_SPLIT_REGION", mysql.TypeLonglong, 4))
schema.Append(buildColumn("", "SCATTER_FINISH_RATIO", mysql.TypeDouble, 8))
return schema
}

func buildShowDDLJobQueriesFields() *expression.Schema {
schema := expression.NewSchema(make([]*expression.Column, 0, 1)...)
schema.Append(buildColumn("", "QUERY", mysql.TypeVarchar, 256))
Expand Down Expand Up @@ -1687,6 +1694,7 @@ func (b *PlanBuilder) buildSplitIndexRegion(node *ast.SplitRegionStmt) (Plan, er
TableInfo: tblInfo,
IndexInfo: indexInfo,
}
p.SetSchema(buildSplitRegionsSchema())
// Split index regions by user specified value lists.
if len(node.SplitOpt.ValueLists) > 0 {
indexValues := make([][]types.Datum, 0, len(node.SplitOpt.ValueLists))
Expand Down Expand Up @@ -1801,6 +1809,7 @@ func (b *PlanBuilder) buildSplitTableRegion(node *ast.SplitRegionStmt) (Plan, er
p := &SplitRegion{
TableInfo: tblInfo,
}
p.SetSchema(buildSplitRegionsSchema())
if len(node.SplitOpt.ValueLists) > 0 {
values := make([][]types.Datum, 0, len(node.SplitOpt.ValueLists))
for i, valuesItem := range node.SplitOpt.ValueLists {
Expand Down
3 changes: 2 additions & 1 deletion store/mockstore/mocktikv/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,12 +319,13 @@ func (c *Cluster) Split(regionID, newRegionID uint64, key []byte, peerIDs []uint
}

// SplitRaw splits a Region at the key (not encoded) and creates new Region.
func (c *Cluster) SplitRaw(regionID, newRegionID uint64, rawKey []byte, peerIDs []uint64, leaderPeerID uint64) {
func (c *Cluster) SplitRaw(regionID, newRegionID uint64, rawKey []byte, peerIDs []uint64, leaderPeerID uint64) *Region {
c.Lock()
defer c.Unlock()

newRegion := c.regions[regionID].split(newRegionID, rawKey, peerIDs, leaderPeerID)
c.regions[newRegionID] = newRegion
return newRegion
}

// Merge merges 2 regions, their key ranges should be adjacent.
Expand Down
4 changes: 2 additions & 2 deletions store/mockstore/mocktikv/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,8 +597,8 @@ func (h *rpcHandler) handleSplitRegion(req *kvrpcpb.SplitRegionRequest) *kvrpcpb
return &kvrpcpb.SplitRegionResponse{}
}
newRegionID, newPeerIDs := h.cluster.AllocID(), h.cluster.AllocIDs(len(region.Peers))
h.cluster.SplitRaw(region.GetId(), newRegionID, key, newPeerIDs, newPeerIDs[0])
return &kvrpcpb.SplitRegionResponse{}
newRegion := h.cluster.SplitRaw(region.GetId(), newRegionID, key, newPeerIDs, newPeerIDs[0])
return &kvrpcpb.SplitRegionResponse{Left: newRegion.Meta}
}

// RPCClient sends kv RPC calls to mock cluster. RPCClient mocks the behavior of
Expand Down
9 changes: 7 additions & 2 deletions store/tikv/split_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,15 @@ func (s *tikvStore) scatterRegion(regionID uint64) error {
}

// WaitScatterRegionFinish implements SplitableStore interface.
func (s *tikvStore) WaitScatterRegionFinish(regionID uint64) error {
// backOff is the back off time of the wait scatter region.(Milliseconds)
// if backOff <= 0, the default wait scatter back off time will be used.
func (s *tikvStore) WaitScatterRegionFinish(regionID uint64, backOff int) error {
logutil.Logger(context.Background()).Info("wait scatter region",
zap.Uint64("regionID", regionID))
bo := NewBackoffer(context.Background(), waitScatterRegionFinishBackoff)
if backOff <= 0 {
backOff = waitScatterRegionFinishBackoff
}
bo := NewBackoffer(context.Background(), backOff)
logFreq := 0
for {
resp, err := s.pdClient.GetOperator(context.Background(), regionID)
Expand Down

0 comments on commit 39b4cf0

Please sign in to comment.