From 39b4cf0c53055025810a92867466e917f98bdd1b Mon Sep 17 00:00:00 2001 From: crazycs Date: Wed, 31 Jul 2019 11:13:30 +0800 Subject: [PATCH] =?UTF-8?q?executor/split:=20return=20split=20result=20whe?= =?UTF-8?q?n=20do=20split=20region=20and=20r=E2=80=A6=20(#11484)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ddl/split_region.go | 2 +- executor/builder.go | 5 +- executor/executor_test.go | 19 ++-- executor/split.go | 153 ++++++++++++++++++++-------- kv/kv.go | 2 +- planner/core/planbuilder.go | 9 ++ store/mockstore/mocktikv/cluster.go | 3 +- store/mockstore/mocktikv/rpc.go | 4 +- store/tikv/split_region.go | 9 +- 9 files changed, 143 insertions(+), 63 deletions(-) mode change 100644 => 100755 executor/split.go diff --git a/ddl/split_region.go b/ddl/split_region.go index 031f24ffb975b..861594ba76608 100644 --- a/ddl/split_region.go +++ b/ddl/split_region.go @@ -121,7 +121,7 @@ func splitIndexRegion(store kv.SplitableStore, tblInfo *model.TableInfo, scatter func waitScatterRegionFinish(store kv.SplitableStore, regionIDs ...uint64) { for _, regionID := range regionIDs { - err := store.WaitScatterRegionFinish(regionID) + err := store.WaitScatterRegionFinish(regionID, 0) if err != nil { logutil.Logger(context.Background()).Warn("[ddl] wait scatter region failed", zap.Uint64("regionID", regionID), zap.Error(err)) } diff --git a/executor/builder.go b/executor/builder.go index a31d958aafabc..88c02a2dbae11 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -1265,8 +1265,9 @@ func (b *executorBuilder) buildUnionAll(v *plannercore.PhysicalUnionAll) Executo } func (b *executorBuilder) buildSplitRegion(v *plannercore.SplitRegion) Executor { - base := newBaseExecutor(b.ctx, nil, v.ExplainID()) - base.initCap = chunk.ZeroCapacity + base := newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()) + base.initCap = 1 + base.maxChunkSize = 1 if v.IndexInfo != nil { return &SplitIndexRegionExec{ baseExecutor: base, diff --git a/executor/executor_test.go b/executor/executor_test.go index 36636c431832b..ef0dc69f208e5 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -1976,16 +1976,9 @@ func (s *testSuite) TestSplitRegionTimeout(c *C) { tk.MustExec("create table t(a varchar(100),b int, index idx1(b,a))") tk.MustExec(`split table t index idx1 by (10000,"abcd"),(10000000);`) tk.MustExec(`set @@tidb_wait_split_region_timeout=1`) - _, err := tk.Exec(`split table t between (0) and (10000) regions 10`) - c.Assert(err, NotNil) - c.Assert(err.Error(), Equals, "split region timeout(1s)") + // result 0 0 means split 0 region and 0 region finish scatter regions before timeout. + tk.MustQuery(`split table t between (0) and (10000) regions 10`).Check(testkit.Rows("0 0")) c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/mockSplitRegionTimeout"), IsNil) - - c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/mockScatterRegionTimeout", `return(true)`), IsNil) - _, err = tk.Exec(`split table t between (0) and (10000) regions 10`) - c.Assert(err, NotNil) - c.Assert(err.Error(), Equals, "wait split region scatter timeout(1s)") - c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/mockScatterRegionTimeout"), IsNil) } func (s *testSuite) TestRow(c *C) { @@ -3952,7 +3945,7 @@ func (s *testSuite) TestShowTableRegion(c *C) { // Test show table regions. tk.MustExec(`split table t_regions1 by (0)`) - tk.MustExec(`split table t_regions between (-10000) and (10000) regions 4;`) + tk.MustQuery(`split table t_regions between (-10000) and (10000) regions 4;`).Check(testkit.Rows("3 1")) re := tk.MustQuery("show table t_regions regions") rows := re.Rows() // Table t_regions should have 4 regions now. @@ -3967,7 +3960,7 @@ func (s *testSuite) TestShowTableRegion(c *C) { c.Assert(rows[3][1], Equals, fmt.Sprintf("t_%d_r_5000", tbl.Meta().ID)) // Test show table index regions. - tk.MustExec(`split table t_regions index idx between (-1000) and (1000) regions 4;`) + tk.MustQuery(`split table t_regions index idx between (-1000) and (1000) regions 4;`).Check(testkit.Rows("4 1")) re = tk.MustQuery("show table t_regions index idx regions") rows = re.Rows() // The index `idx` of table t_regions should have 4 regions now. @@ -3997,7 +3990,7 @@ func (s *testSuite) TestShowTableRegion(c *C) { // Test show table regions. tk.MustExec(`set @@session.tidb_wait_split_region_finish=1;`) - tk.MustExec(`split table t_regions between (0) and (10000) regions 4;`) + tk.MustQuery(`split table t_regions by (2500),(5000),(7500);`).Check(testkit.Rows("3 1")) re = tk.MustQuery("show table t_regions regions") rows = re.Rows() // Table t_regions should have 4 regions now. @@ -4010,7 +4003,7 @@ func (s *testSuite) TestShowTableRegion(c *C) { c.Assert(rows[3][1], Equals, fmt.Sprintf("t_%d_r_7500", tbl.Meta().ID)) // Test show table index regions. - tk.MustExec(`split table t_regions index idx between (0) and (1000) regions 4;`) + tk.MustQuery(`split table t_regions index idx by (250),(500),(750);`).Check(testkit.Rows("4 1")) re = tk.MustQuery("show table t_regions index idx regions") rows = re.Rows() // The index `idx` of table t_regions should have 4 regions now. diff --git a/executor/split.go b/executor/split.go old mode 100644 new mode 100755 index afb2459d8a29a..68e8cd0f5a864 --- a/executor/split.go +++ b/executor/split.go @@ -28,6 +28,7 @@ import ( "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/tablecodec" @@ -48,10 +49,37 @@ type SplitIndexRegionExec struct { upper []types.Datum num int valueLists [][]types.Datum + + done bool + splitRegionResult +} + +type splitRegionResult struct { + splitRegions int + finishScatterNum int +} + +// Open implements the Executor Open interface. +func (e *SplitIndexRegionExec) Open(ctx context.Context) error { + return e.splitIndexRegion(ctx) } // Next implements the Executor Next interface. -func (e *SplitIndexRegionExec) Next(ctx context.Context, _ *chunk.Chunk) error { +func (e *SplitIndexRegionExec) Next(ctx context.Context, chk *chunk.Chunk) error { + chk.Reset() + if e.done { + return nil + } + appendSplitRegionResultToChunk(chk, e.splitRegions, e.finishScatterNum) + e.done = true + return nil +} + +// checkScatterRegionFinishBackOff is the back off time that used to check if a region has finished scattering before split region timeout. +const checkScatterRegionFinishBackOff = 50 + +// splitIndexRegion is used to split index regions. +func (e *SplitIndexRegionExec) splitIndexRegion(ctx context.Context) error { store := e.ctx.GetStore() s, ok := store.(kv.SplitableStore) if !ok { @@ -62,10 +90,15 @@ func (e *SplitIndexRegionExec) Next(ctx context.Context, _ *chunk.Chunk) error { return err } + start := time.Now() ctxWithTimeout, cancel := context.WithTimeout(ctx, e.ctx.GetSessionVars().GetSplitRegionTimeout()) defer cancel() regionIDs := make([]uint64, 0, len(splitIdxKeys)) for _, idxKey := range splitIdxKeys { + if isCtxDone(ctxWithTimeout) { + break + } + regionID, err := s.SplitRegion(idxKey, true) if err != nil { logutil.Logger(context.Background()).Warn("split table index region failed", @@ -74,28 +107,17 @@ func (e *SplitIndexRegionExec) Next(ctx context.Context, _ *chunk.Chunk) error { zap.Error(err)) continue } + if regionID == 0 { + continue + } regionIDs = append(regionIDs, regionID) - if isCtxDone(ctxWithTimeout) { - return errors.Errorf("wait split region timeout(%v)", e.ctx.GetSessionVars().GetSplitRegionTimeout()) - } } + e.splitRegions = len(regionIDs) if !e.ctx.GetSessionVars().WaitSplitRegionFinish { return nil } - for _, regionID := range regionIDs { - err := s.WaitScatterRegionFinish(regionID) - if err != nil { - logutil.Logger(context.Background()).Warn("wait scatter region failed", - zap.Uint64("regionID", regionID), - zap.String("table", e.tableInfo.Name.L), - zap.String("index", e.indexInfo.Name.L), - zap.Error(err)) - } - if isCtxDone(ctxWithTimeout) { - return errors.Errorf("wait split region timeout(%v)", e.ctx.GetSessionVars().GetSplitRegionTimeout()) - } - } + e.finishScatterNum = waitScatterRegionFinish(ctxWithTimeout, e.ctx, start, s, regionIDs, e.tableInfo.Name.L, e.indexInfo.Name.L) return nil } @@ -225,16 +247,35 @@ type SplitTableRegionExec struct { upper types.Datum num int valueLists [][]types.Datum + + done bool + splitRegionResult +} + +// Open implements the Executor Open interface. +func (e *SplitTableRegionExec) Open(ctx context.Context) error { + return e.splitTableRegion(ctx) } // Next implements the Executor Next interface. -func (e *SplitTableRegionExec) Next(ctx context.Context, _ *chunk.Chunk) error { +func (e *SplitTableRegionExec) Next(ctx context.Context, chk *chunk.Chunk) error { + chk.Reset() + if e.done { + return nil + } + appendSplitRegionResultToChunk(chk, e.splitRegions, e.finishScatterNum) + e.done = true + return nil +} + +func (e *SplitTableRegionExec) splitTableRegion(ctx context.Context) error { store := e.ctx.GetStore() s, ok := store.(kv.SplitableStore) if !ok { return nil } + start := time.Now() ctxWithTimeout, cancel := context.WithTimeout(ctx, e.ctx.GetSessionVars().GetSplitRegionTimeout()) defer cancel() @@ -244,6 +285,14 @@ func (e *SplitTableRegionExec) Next(ctx context.Context, _ *chunk.Chunk) error { } regionIDs := make([]uint64, 0, len(splitKeys)) for _, key := range splitKeys { + failpoint.Inject("mockSplitRegionTimeout", func(val failpoint.Value) { + if val.(bool) { + time.Sleep(time.Second*1 + time.Millisecond*10) + } + }) + if isCtxDone(ctxWithTimeout) { + break + } regionID, err := s.SplitRegion(key, true) if err != nil { logutil.Logger(context.Background()).Warn("split table region failed", @@ -251,41 +300,63 @@ func (e *SplitTableRegionExec) Next(ctx context.Context, _ *chunk.Chunk) error { zap.Error(err)) continue } + if regionID == 0 { + continue + } regionIDs = append(regionIDs, regionID) - failpoint.Inject("mockSplitRegionTimeout", func(val failpoint.Value) { - if val.(bool) { - time.Sleep(time.Second * 1) - } - }) - - if isCtxDone(ctxWithTimeout) { - return errors.Errorf("split region timeout(%v)", e.ctx.GetSessionVars().GetSplitRegionTimeout()) - } } + e.splitRegions = len(regionIDs) if !e.ctx.GetSessionVars().WaitSplitRegionFinish { return nil } + + e.finishScatterNum = waitScatterRegionFinish(ctxWithTimeout, e.ctx, start, s, regionIDs, e.tableInfo.Name.L, "") + return nil +} + +func waitScatterRegionFinish(ctxWithTimeout context.Context, sctx sessionctx.Context, startTime time.Time, store kv.SplitableStore, regionIDs []uint64, tableName, indexName string) int { + remainMillisecond := 0 + finishScatterNum := 0 for _, regionID := range regionIDs { - err := s.WaitScatterRegionFinish(regionID) - if err != nil { - logutil.Logger(context.Background()).Warn("wait scatter region failed", - zap.Uint64("regionID", regionID), - zap.String("table", e.tableInfo.Name.L), - zap.Error(err)) + if isCtxDone(ctxWithTimeout) { + // Do not break here for checking remain regions scatter finished with a very short backoff time. + // Consider this situation - Regions 1, 2, and 3 are to be split. + // Region 1 times out before scattering finishes, while Region 2 and Region 3 have finished scattering. + // In this case, we should return 2 Regions, instead of 0, have finished scattering. + remainMillisecond = checkScatterRegionFinishBackOff + } else { + remainMillisecond = int((sctx.GetSessionVars().GetSplitRegionTimeout().Seconds() - time.Since(startTime).Seconds()) * 1000) } - failpoint.Inject("mockScatterRegionTimeout", func(val failpoint.Value) { - if val.(bool) { - time.Sleep(time.Second * 1) + err := store.WaitScatterRegionFinish(regionID, remainMillisecond) + if err == nil { + finishScatterNum++ + } else { + if len(indexName) == 0 { + logutil.Logger(context.Background()).Warn("wait scatter region failed", + zap.Uint64("regionID", regionID), + zap.String("table", tableName), + zap.Error(err)) + } else { + logutil.Logger(context.Background()).Warn("wait scatter region failed", + zap.Uint64("regionID", regionID), + zap.String("table", tableName), + zap.String("index", indexName), + zap.Error(err)) } - }) - - if isCtxDone(ctxWithTimeout) { - return errors.Errorf("wait split region scatter timeout(%v)", e.ctx.GetSessionVars().GetSplitRegionTimeout()) } } - return nil + return finishScatterNum +} + +func appendSplitRegionResultToChunk(chk *chunk.Chunk, totalRegions, finishScatterNum int) { + chk.AppendInt64(0, int64(totalRegions)) + if finishScatterNum > 0 && totalRegions > 0 { + chk.AppendFloat64(1, float64(finishScatterNum)/float64(totalRegions)) + } else { + chk.AppendFloat64(1, float64(0)) + } } func isCtxDone(ctx context.Context) bool { diff --git a/kv/kv.go b/kv/kv.go index a646b19b50a02..87d993717fb27 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -299,6 +299,6 @@ type Iterator interface { // SplitableStore is the kv store which supports split regions. type SplitableStore interface { SplitRegion(splitKey Key, scatter bool) (regionID uint64, err error) - WaitScatterRegionFinish(regionID uint64) error + WaitScatterRegionFinish(regionID uint64, backOff int) error CheckRegionInScattering(regionID uint64) (bool, error) } diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index 545f1fc933a64..4c7287d2cc503 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -999,6 +999,13 @@ func buildTableRegionsSchema() *expression.Schema { return schema } +func buildSplitRegionsSchema() *expression.Schema { + schema := expression.NewSchema(make([]*expression.Column, 0, 2)...) + schema.Append(buildColumn("", "TOTAL_SPLIT_REGION", mysql.TypeLonglong, 4)) + schema.Append(buildColumn("", "SCATTER_FINISH_RATIO", mysql.TypeDouble, 8)) + return schema +} + func buildShowDDLJobQueriesFields() *expression.Schema { schema := expression.NewSchema(make([]*expression.Column, 0, 1)...) schema.Append(buildColumn("", "QUERY", mysql.TypeVarchar, 256)) @@ -1687,6 +1694,7 @@ func (b *PlanBuilder) buildSplitIndexRegion(node *ast.SplitRegionStmt) (Plan, er TableInfo: tblInfo, IndexInfo: indexInfo, } + p.SetSchema(buildSplitRegionsSchema()) // Split index regions by user specified value lists. if len(node.SplitOpt.ValueLists) > 0 { indexValues := make([][]types.Datum, 0, len(node.SplitOpt.ValueLists)) @@ -1801,6 +1809,7 @@ func (b *PlanBuilder) buildSplitTableRegion(node *ast.SplitRegionStmt) (Plan, er p := &SplitRegion{ TableInfo: tblInfo, } + p.SetSchema(buildSplitRegionsSchema()) if len(node.SplitOpt.ValueLists) > 0 { values := make([][]types.Datum, 0, len(node.SplitOpt.ValueLists)) for i, valuesItem := range node.SplitOpt.ValueLists { diff --git a/store/mockstore/mocktikv/cluster.go b/store/mockstore/mocktikv/cluster.go index 88b3195c8ffa4..460fffd6d39b8 100644 --- a/store/mockstore/mocktikv/cluster.go +++ b/store/mockstore/mocktikv/cluster.go @@ -319,12 +319,13 @@ func (c *Cluster) Split(regionID, newRegionID uint64, key []byte, peerIDs []uint } // SplitRaw splits a Region at the key (not encoded) and creates new Region. -func (c *Cluster) SplitRaw(regionID, newRegionID uint64, rawKey []byte, peerIDs []uint64, leaderPeerID uint64) { +func (c *Cluster) SplitRaw(regionID, newRegionID uint64, rawKey []byte, peerIDs []uint64, leaderPeerID uint64) *Region { c.Lock() defer c.Unlock() newRegion := c.regions[regionID].split(newRegionID, rawKey, peerIDs, leaderPeerID) c.regions[newRegionID] = newRegion + return newRegion } // Merge merges 2 regions, their key ranges should be adjacent. diff --git a/store/mockstore/mocktikv/rpc.go b/store/mockstore/mocktikv/rpc.go index 5e9d253cc4ffe..e4919318048fa 100644 --- a/store/mockstore/mocktikv/rpc.go +++ b/store/mockstore/mocktikv/rpc.go @@ -597,8 +597,8 @@ func (h *rpcHandler) handleSplitRegion(req *kvrpcpb.SplitRegionRequest) *kvrpcpb return &kvrpcpb.SplitRegionResponse{} } newRegionID, newPeerIDs := h.cluster.AllocID(), h.cluster.AllocIDs(len(region.Peers)) - h.cluster.SplitRaw(region.GetId(), newRegionID, key, newPeerIDs, newPeerIDs[0]) - return &kvrpcpb.SplitRegionResponse{} + newRegion := h.cluster.SplitRaw(region.GetId(), newRegionID, key, newPeerIDs, newPeerIDs[0]) + return &kvrpcpb.SplitRegionResponse{Left: newRegion.Meta} } // RPCClient sends kv RPC calls to mock cluster. RPCClient mocks the behavior of diff --git a/store/tikv/split_region.go b/store/tikv/split_region.go index 8851b9078bf29..4a84f29ddb774 100644 --- a/store/tikv/split_region.go +++ b/store/tikv/split_region.go @@ -104,10 +104,15 @@ func (s *tikvStore) scatterRegion(regionID uint64) error { } // WaitScatterRegionFinish implements SplitableStore interface. -func (s *tikvStore) WaitScatterRegionFinish(regionID uint64) error { +// backOff is the back off time of the wait scatter region.(Milliseconds) +// if backOff <= 0, the default wait scatter back off time will be used. +func (s *tikvStore) WaitScatterRegionFinish(regionID uint64, backOff int) error { logutil.Logger(context.Background()).Info("wait scatter region", zap.Uint64("regionID", regionID)) - bo := NewBackoffer(context.Background(), waitScatterRegionFinishBackoff) + if backOff <= 0 { + backOff = waitScatterRegionFinishBackoff + } + bo := NewBackoffer(context.Background(), backOff) logFreq := 0 for { resp, err := s.pdClient.GetOperator(context.Background(), regionID)