From 041b8e123fb34343b16b01103256c09030387425 Mon Sep 17 00:00:00 2001
From: Lynn
Date: Tue, 17 Sep 2019 12:00:15 +0800
Subject: [PATCH] executor: fix scatter region timeout issues and "show
 processlist" display issues (#12057)

---
 executor/executor_test.go  |  7 ++++-
 executor/split.go          | 48 ++++++++++++++++--------------
 store/tikv/split_region.go | 60 +++++++++++++++++++++++---------------
 3 files changed, 68 insertions(+), 47 deletions(-)

diff --git a/executor/executor_test.go b/executor/executor_test.go
index 582cfcf24bf21..0685150707ffd 100644
--- a/executor/executor_test.go
+++ b/executor/executor_test.go
@@ -1973,6 +1973,11 @@ func (s *testSuite) TestSplitRegionTimeout(c *C) {
 	// result 0 0 means split 0 region and 0 region finish scatter regions before timeout.
 	tk.MustQuery(`split table t between (0) and (10000) regions 10`).Check(testkit.Rows("0 0"))
 	c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/MockSplitRegionTimeout"), IsNil)
+
+	// Test scatter regions timeout.
+	c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/MockScatterRegionTimeout", `return(true)`), IsNil)
+	tk.MustQuery(`split table t between (0) and (10000) regions 10`).Check(testkit.Rows("10 1"))
+	c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/MockScatterRegionTimeout"), IsNil)
 }
 
 func (s *testSuite) TestRow(c *C) {
@@ -3891,7 +3896,7 @@ func (s *testSuite) TestSplitRegion(c *C) {
 
 	// Test for split table region.
 	tk.MustExec(`split table t between (0) and (1000000000) regions 10`)
-	// Check the ower value is more than the upper value.
+	// Check the lower value is more than the upper value.
 	_, err = tk.Exec(`split table t between (2) and (1) regions 10`)
 	c.Assert(err, NotNil)
 	c.Assert(err.Error(), Equals, "Split table `t` region lower value 2 should less than the upper value 1")
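The new TestSplitRegionTimeout case drives the MockScatterRegionTimeout failpoint added to store/tikv/split_region.go below; following the convention of the existing comment, the checked row "10 1" reads as 10 regions split, 1 region finished scattering before the injected timeout. As a minimal standalone sketch of the failpoint pattern in use here (the package, failpoint name, and error value are invented; in TiDB the Inject/Return markers only take effect once the source has been rewritten with failpoint-ctl, e.g. via `make failpoint-enable`):

    package demo

    import (
        "errors"

        "github.com/pingcap/failpoint"
    )

    var errMockTimeout = errors.New("mock scatter timeout")

    // scatterOnce mirrors scatterRegion below: an injectable failure point
    // guards the real work, so a test can force the timeout path on demand.
    func scatterOnce() error {
        failpoint.Inject("mockScatterTimeout", func(val failpoint.Value) {
            if val.(bool) {
                failpoint.Return(errMockTimeout)
            }
        })
        return nil // the real scatter call would go here
    }

A test then brackets the code under test with failpoint.Enable("<pkg path>/mockScatterTimeout", `return(true)`) and a matching failpoint.Disable, exactly as the hunk above does.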
diff --git a/executor/split.go b/executor/split.go
index 771b6effbb7a4..ce1afaa4288b3 100755
--- a/executor/split.go
+++ b/executor/split.go
@@ -43,12 +43,13 @@ import (
 type SplitIndexRegionExec struct {
 	baseExecutor
 
-	tableInfo  *model.TableInfo
-	indexInfo  *model.IndexInfo
-	lower      []types.Datum
-	upper      []types.Datum
-	num        int
-	valueLists [][]types.Datum
+	tableInfo    *model.TableInfo
+	indexInfo    *model.IndexInfo
+	lower        []types.Datum
+	upper        []types.Datum
+	num          int
+	valueLists   [][]types.Datum
+	splitIdxKeys [][]byte
 
 	done bool
 	splitRegionResult
@@ -60,8 +61,9 @@ type splitRegionResult struct {
 }
 
 // Open implements the Executor Open interface.
-func (e *SplitIndexRegionExec) Open(ctx context.Context) error {
-	return e.splitIndexRegion(ctx)
+func (e *SplitIndexRegionExec) Open(ctx context.Context) (err error) {
+	e.splitIdxKeys, err = e.getSplitIdxKeys()
+	return err
 }
 
 // Next implements the Executor Next interface.
@@ -70,8 +72,12 @@ func (e *SplitIndexRegionExec) Next(ctx context.Context, chk *chunk.Chunk) error
 	if e.done {
 		return nil
 	}
-	appendSplitRegionResultToChunk(chk, e.splitRegions, e.finishScatterNum)
 	e.done = true
+	if err := e.splitIndexRegion(ctx); err != nil {
+		return err
+	}
+
+	appendSplitRegionResultToChunk(chk, e.splitRegions, e.finishScatterNum)
 	return nil
 }
 
@@ -85,15 +91,11 @@ func (e *SplitIndexRegionExec) splitIndexRegion(ctx context.Context) error {
 	if !ok {
 		return nil
 	}
-	splitIdxKeys, err := e.getSplitIdxKeys()
-	if err != nil {
-		return err
-	}
 	start := time.Now()
 	ctxWithTimeout, cancel := context.WithTimeout(ctx, e.ctx.GetSessionVars().GetSplitRegionTimeout())
 	defer cancel()
-	regionIDs, err := s.SplitRegions(context.Background(), splitIdxKeys, true)
+	regionIDs, err := s.SplitRegions(context.Background(), e.splitIdxKeys, true)
 	if err != nil {
 		logutil.Logger(context.Background()).Warn("split table index region failed",
 			zap.String("table", e.tableInfo.Name.L),
@@ -248,14 +250,16 @@ type SplitTableRegionExec struct {
 	upper      types.Datum
 	num        int
 	valueLists [][]types.Datum
+	splitKeys  [][]byte
 
 	done bool
 	splitRegionResult
 }
 
 // Open implements the Executor Open interface.
-func (e *SplitTableRegionExec) Open(ctx context.Context) error {
-	return e.splitTableRegion(ctx)
+func (e *SplitTableRegionExec) Open(ctx context.Context) (err error) {
+	e.splitKeys, err = e.getSplitTableKeys()
+	return err
 }
 
 // Next implements the Executor Next interface.
@@ -264,8 +268,12 @@ func (e *SplitTableRegionExec) Next(ctx context.Context, chk *chunk.Chunk) error
 	if e.done {
 		return nil
 	}
-	appendSplitRegionResultToChunk(chk, e.splitRegions, e.finishScatterNum)
 	e.done = true
+
+	if err := e.splitTableRegion(ctx); err != nil {
+		return err
+	}
+	appendSplitRegionResultToChunk(chk, e.splitRegions, e.finishScatterNum)
 	return nil
 }
 
@@ -280,11 +288,7 @@ func (e *SplitTableRegionExec) splitTableRegion(ctx context.Context) error {
 	ctxWithTimeout, cancel := context.WithTimeout(ctx, e.ctx.GetSessionVars().GetSplitRegionTimeout())
 	defer cancel()
 
-	splitKeys, err := e.getSplitTableKeys()
-	if err != nil {
-		return err
-	}
-	regionIDs, err := s.SplitRegions(ctxWithTimeout, splitKeys, true)
+	regionIDs, err := s.SplitRegions(ctxWithTimeout, e.splitKeys, true)
 	if err != nil {
 		logutil.Logger(context.Background()).Warn("split table region failed",
 			zap.String("table", e.tableInfo.Name.L),
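Both split executors are reshaped the same way: Open now only precomputes the split keys, and the SplitRegions call moves into the first Next invocation, guarded by the done flag (note that done is set before the split, so a failed split is reported once rather than re-run on a later Next). Since Next runs as part of normal statement execution, a long-running split is now visible while it runs, which is presumably the "show processlist" fix named in the subject. A standalone sketch of the pattern, with all names invented:

    package demo

    import "context"

    type splitExec struct {
        keys [][]byte // precomputed cheaply in open
        done bool
    }

    // open stays local and fast: it only derives the keys to split at.
    func (e *splitExec) open() (err error) {
        e.keys, err = computeSplitKeys()
        return err
    }

    // next performs the expensive, cluster-facing work exactly once.
    func (e *splitExec) next(ctx context.Context) error {
        if e.done {
            return nil
        }
        e.done = true
        if err := doSplit(ctx, e.keys); err != nil {
            return err
        }
        // The real executors append the split/scatter counts to the chunk here.
        return nil
    }

    // Stubs standing in for key derivation and the region-split RPC.
    func computeSplitKeys() ([][]byte, error) { return [][]byte{[]byte("k")}, nil }

    func doSplit(ctx context.Context, keys [][]byte) error { return nil }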
diff --git a/store/tikv/split_region.go b/store/tikv/split_region.go
index 4d55d10d3a357..1538ea6476a3c 100644
--- a/store/tikv/split_region.go
+++ b/store/tikv/split_region.go
@@ -47,7 +47,7 @@ func (s *tikvStore) splitBatchRegionsReq(bo *Backoffer, keys [][]byte, scatter b
 
 	var batches []batch
 	for regionID, groupKeys := range groups {
-		batches = appendKeyBatches(batches, regionID, groupKeys, rawBatchPairCount)
+		batches = appendKeyBatches(batches, regionID, groupKeys, rawBatchPutSize)
 	}
 
 	if len(batches) == 0 {
@@ -67,14 +67,13 @@ func (s *tikvStore) splitBatchRegionsReq(bo *Backoffer, keys [][]byte, scatter b
 	}
 	ch := make(chan singleBatchResp, len(batches))
 	for _, batch1 := range batches {
-		batch := batch1
-		go func() {
+		go func(b batch) {
 			backoffer, cancel := bo.Fork()
 			defer cancel()
 
 			util.WithRecovery(func() {
 				select {
-				case ch <- s.batchSendSingleRegion(backoffer, batch, scatter):
+				case ch <- s.batchSendSingleRegion(backoffer, b, scatter):
 				case <-bo.ctx.Done():
 					ch <- singleBatchResp{err: bo.ctx.Err()}
 				}
@@ -83,24 +82,25 @@ func (s *tikvStore) splitBatchRegionsReq(bo *Backoffer, keys [][]byte, scatter b
 					ch <- singleBatchResp{err: errors.Errorf("%v", r)}
 				}
 			})
-		}()
+		}(batch1)
 	}
 
 	srResp := &kvrpcpb.SplitRegionResponse{Regions: make([]*metapb.Region, 0, len(keys)*2)}
 	for i := 0; i < len(batches); i++ {
 		batchResp := <-ch
 		if batchResp.err != nil {
-			logutil.Logger(context.Background()).Debug("tikv store batch send failed",
-				zap.Error(batchResp.err))
+			logutil.Logger(context.Background()).Debug("batch split regions failed", zap.Error(batchResp.err))
 			if err == nil {
 				err = batchResp.err
 			}
-			continue
 		}
-		spResp := batchResp.resp.SplitRegion
-		regions := spResp.GetRegions()
-		srResp.Regions = append(srResp.Regions, regions...)
+		// If the split succeeds and the scatter fails, we also need to add the region IDs.
+		if batchResp.resp != nil {
+			spResp := batchResp.resp.SplitRegion
+			regions := spResp.GetRegions()
+			srResp.Regions = append(srResp.Regions, regions...)
+		}
 	}
 	return &tikvrpc.Response{SplitRegion: srResp}, errors.Trace(err)
 }
 
@@ -150,35 +150,39 @@ func (s *tikvStore) batchSendSingleRegion(bo *Backoffer, batch batch, scatter bo
 		// so n-1 needs to be scattered to other stores.
 		spResp.Regions = regions[:len(regions)-1]
 	}
+	logutil.Logger(context.Background()).Info("batch split regions complete",
+		zap.Uint64("batch region ID", batch.regionID.id),
+		zap.Binary("first at", batch.keys[0]),
+		zap.Stringer("first new region left", spResp.Regions[0]),
+		zap.Int("new region count", len(spResp.Regions)))
+
 	if !scatter {
 		if len(spResp.Regions) == 0 {
 			return batchResp
 		}
-		logutil.Logger(context.Background()).Info("batch split regions complete",
-			zap.Uint64("batch region ID", batch.regionID.id),
-			zap.Binary("first at", batch.keys[0]),
-			zap.String("first new region left", spResp.Regions[0].String()),
-			zap.Int("new region count", len(spResp.Regions)))
 		return batchResp
 	}
 
 	for i, r := range spResp.Regions {
 		if err = s.scatterRegion(r.Id); err == nil {
-			logutil.Logger(context.Background()).Info("batch split regions, scatter a region complete",
+			logutil.Logger(context.Background()).Info("batch split regions, scatter region complete",
 				zap.Uint64("batch region ID", batch.regionID.id),
 				zap.Binary("at", batch.keys[i]),
 				zap.String("new region left", r.String()))
 			continue
 		}
 
-		logutil.Logger(context.Background()).Info("batch split regions, scatter a region failed",
+		logutil.Logger(context.Background()).Info("batch split regions, scatter region failed",
 			zap.Uint64("batch region ID", batch.regionID.id),
 			zap.Binary("at", batch.keys[i]),
-			zap.String("new region left", r.String()),
+			zap.Stringer("new region left", r),
			zap.Error(err))
 		if batchResp.err == nil {
 			batchResp.err = err
 		}
+		if ErrPDServerTimeout.Equal(err) {
+			break
+		}
 	}
 	return batchResp
 }
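Two details in the hunks above are easy to miss. First, the result loop no longer skips a batch on error: when a batch's split succeeded but its scatter failed, the batch comes back with both a usable resp and a non-nil err, so the old continue dropped those region IDs while the new nil check still collects them. Second, `go func(b batch) { ... }(batch1)` passes the loop variable to each goroutine as a parameter instead of shadowing it with `batch := batch1`; both forms correctly pin a per-iteration value (before Go 1.22 the loop variable is shared across iterations), the parameter form is simply the more idiomatic. A runnable sketch of that form, names invented:

    package main

    import "fmt"

    func process(b int) int { return b * b }

    func main() {
        batches := []int{1, 2, 3}
        ch := make(chan int, len(batches))
        for _, b := range batches {
            go func(b int) { // b is copied when the goroutine launches,
                ch <- process(b) // so each goroutine sees its own value
            }(b)
        }
        for range batches {
            fmt.Println(<-ch)
        }
    }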
@@ -193,12 +197,19 @@ func (s *tikvStore) SplitRegions(ctx context.Context, splitKeys [][]byte, scatte
 		for _, r := range spResp.Regions {
 			regionIDs = append(regionIDs, r.Id)
 		}
-		logutil.Logger(context.Background()).Info("split regions complete", zap.Uint64s("region IDs", regionIDs))
+		logutil.Logger(context.Background()).Info("split regions complete",
+			zap.Int("region count", len(regionIDs)), zap.Uint64s("region IDs", regionIDs))
 	}
 	return regionIDs, errors.Trace(err)
 }
 
 func (s *tikvStore) scatterRegion(regionID uint64) error {
+	failpoint.Inject("MockScatterRegionTimeout", func(val failpoint.Value) {
+		if val.(bool) {
+			failpoint.Return(ErrPDServerTimeout)
+		}
+	})
+
 	logutil.Logger(context.Background()).Info("start scatter region",
 		zap.Uint64("regionID", regionID))
 	bo := NewBackoffer(context.Background(), scatterRegionBackoff)
@@ -207,12 +218,12 @@ func (s *tikvStore) scatterRegion(regionID uint64) error {
 		if err == nil {
 			break
 		}
-		err = bo.Backoff(BoRegionMiss, errors.New(err.Error()))
+		err = bo.Backoff(BoPDRPC, errors.New(err.Error()))
 		if err != nil {
 			return errors.Trace(err)
 		}
 	}
-	logutil.Logger(context.Background()).Info("scatter region complete",
+	logutil.Logger(context.Background()).Debug("scatter region complete",
 		zap.Uint64("regionID", regionID))
 	return nil
 }
@@ -221,11 +232,12 @@ func (s *tikvStore) scatterRegion(regionID uint64) error {
 // backOff is the back off time of the wait scatter region.(Milliseconds)
 // if backOff <= 0, the default wait scatter back off time will be used.
 func (s *tikvStore) WaitScatterRegionFinish(regionID uint64, backOff int) error {
-	logutil.Logger(context.Background()).Info("wait scatter region",
-		zap.Uint64("regionID", regionID))
 	if backOff <= 0 {
 		backOff = waitScatterRegionFinishBackoff
 	}
+	logutil.Logger(context.Background()).Info("wait scatter region",
+		zap.Uint64("regionID", regionID), zap.Int("backoff(ms)", backOff))
+
 	bo := NewBackoffer(context.Background(), backOff)
 	logFreq := 0
 	for {
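The scatterRegion changes round out the fix: the injected MockScatterRegionTimeout failpoint gives the test a deterministic ErrPDServerTimeout, the backoff class moves from BoRegionMiss to BoPDRPC to match the failing call (a PD RPC rather than a region-cache miss), and batchSendSingleRegion above now breaks out of its scatter loop on that timeout instead of retrying every remaining region. The retry loop itself follows a common shape, sketched generically below; the names and budget are illustrative, not TiDB's Backoffer implementation:

    package demo

    import (
        "context"
        "fmt"
        "time"
    )

    // retryWithBackoff retries op, sleeping between attempts, until it
    // succeeds or the total sleep budget is exhausted.
    func retryWithBackoff(ctx context.Context, budget time.Duration, op func() error) error {
        sleep := 2 * time.Millisecond
        var spent time.Duration
        for {
            err := op()
            if err == nil {
                return nil
            }
            if spent >= budget {
                // Mirrors bo.Backoff returning an error once its budget is spent.
                return fmt.Errorf("backoff budget exhausted: %w", err)
            }
            select {
            case <-time.After(sleep):
                spent += sleep
                sleep *= 2 // grow the wait between attempts
            case <-ctx.Done():
                return ctx.Err()
            }
        }
    }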