Skip to content

Commit

Permalink
executor: fix scatter region timeout issues and "show processlist" di…
Browse files Browse the repository at this point in the history
…splay issues (pingcap#12057)
  • Loading branch information
zimulala committed Sep 24, 2019
1 parent 7fa8297 commit 041b8e1
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 47 deletions.
7 changes: 6 additions & 1 deletion executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1973,6 +1973,11 @@ func (s *testSuite) TestSplitRegionTimeout(c *C) {
// result 0 0 means split 0 region and 0 region finish scatter regions before timeout.
tk.MustQuery(`split table t between (0) and (10000) regions 10`).Check(testkit.Rows("0 0"))
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/MockSplitRegionTimeout"), IsNil)

// Test scatter regions timeout.
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/MockScatterRegionTimeout", `return(true)`), IsNil)
tk.MustQuery(`split table t between (0) and (10000) regions 10`).Check(testkit.Rows("10 1"))
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/MockScatterRegionTimeout"), IsNil)
}

func (s *testSuite) TestRow(c *C) {
Expand Down Expand Up @@ -3891,7 +3896,7 @@ func (s *testSuite) TestSplitRegion(c *C) {

// Test for split table region.
tk.MustExec(`split table t between (0) and (1000000000) regions 10`)
// Check the ower value is more than the upper value.
// Check the lower value is more than the upper value.
_, err = tk.Exec(`split table t between (2) and (1) regions 10`)
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "Split table `t` region lower value 2 should less than the upper value 1")
Expand Down
48 changes: 26 additions & 22 deletions executor/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,13 @@ import (
type SplitIndexRegionExec struct {
baseExecutor

tableInfo *model.TableInfo
indexInfo *model.IndexInfo
lower []types.Datum
upper []types.Datum
num int
valueLists [][]types.Datum
tableInfo *model.TableInfo
indexInfo *model.IndexInfo
lower []types.Datum
upper []types.Datum
num int
valueLists [][]types.Datum
splitIdxKeys [][]byte

done bool
splitRegionResult
Expand All @@ -60,8 +61,9 @@ type splitRegionResult struct {
}

// Open implements the Executor Open interface.
func (e *SplitIndexRegionExec) Open(ctx context.Context) error {
return e.splitIndexRegion(ctx)
func (e *SplitIndexRegionExec) Open(ctx context.Context) (err error) {
e.splitIdxKeys, err = e.getSplitIdxKeys()
return err
}

// Next implements the Executor Next interface.
Expand All @@ -70,8 +72,12 @@ func (e *SplitIndexRegionExec) Next(ctx context.Context, chk *chunk.Chunk) error
if e.done {
return nil
}
appendSplitRegionResultToChunk(chk, e.splitRegions, e.finishScatterNum)
e.done = true
if err := e.splitIndexRegion(ctx); err != nil {
return err
}

appendSplitRegionResultToChunk(chk, e.splitRegions, e.finishScatterNum)
return nil
}

Expand All @@ -85,15 +91,11 @@ func (e *SplitIndexRegionExec) splitIndexRegion(ctx context.Context) error {
if !ok {
return nil
}
splitIdxKeys, err := e.getSplitIdxKeys()
if err != nil {
return err
}

start := time.Now()
ctxWithTimeout, cancel := context.WithTimeout(ctx, e.ctx.GetSessionVars().GetSplitRegionTimeout())
defer cancel()
regionIDs, err := s.SplitRegions(context.Background(), splitIdxKeys, true)
regionIDs, err := s.SplitRegions(context.Background(), e.splitIdxKeys, true)
if err != nil {
logutil.Logger(context.Background()).Warn("split table index region failed",
zap.String("table", e.tableInfo.Name.L),
Expand Down Expand Up @@ -248,14 +250,16 @@ type SplitTableRegionExec struct {
upper types.Datum
num int
valueLists [][]types.Datum
splitKeys [][]byte

done bool
splitRegionResult
}

// Open implements the Executor Open interface.
func (e *SplitTableRegionExec) Open(ctx context.Context) error {
return e.splitTableRegion(ctx)
func (e *SplitTableRegionExec) Open(ctx context.Context) (err error) {
e.splitKeys, err = e.getSplitTableKeys()
return err
}

// Next implements the Executor Next interface.
Expand All @@ -264,8 +268,12 @@ func (e *SplitTableRegionExec) Next(ctx context.Context, chk *chunk.Chunk) error
if e.done {
return nil
}
appendSplitRegionResultToChunk(chk, e.splitRegions, e.finishScatterNum)
e.done = true

if err := e.splitTableRegion(ctx); err != nil {
return err
}
appendSplitRegionResultToChunk(chk, e.splitRegions, e.finishScatterNum)
return nil
}

Expand All @@ -280,11 +288,7 @@ func (e *SplitTableRegionExec) splitTableRegion(ctx context.Context) error {
ctxWithTimeout, cancel := context.WithTimeout(ctx, e.ctx.GetSessionVars().GetSplitRegionTimeout())
defer cancel()

splitKeys, err := e.getSplitTableKeys()
if err != nil {
return err
}
regionIDs, err := s.SplitRegions(ctxWithTimeout, splitKeys, true)
regionIDs, err := s.SplitRegions(ctxWithTimeout, e.splitKeys, true)
if err != nil {
logutil.Logger(context.Background()).Warn("split table region failed",
zap.String("table", e.tableInfo.Name.L),
Expand Down
60 changes: 36 additions & 24 deletions store/tikv/split_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (s *tikvStore) splitBatchRegionsReq(bo *Backoffer, keys [][]byte, scatter b

var batches []batch
for regionID, groupKeys := range groups {
batches = appendKeyBatches(batches, regionID, groupKeys, rawBatchPairCount)
batches = appendKeyBatches(batches, regionID, groupKeys, rawBatchPutSize)
}

if len(batches) == 0 {
Expand All @@ -67,14 +67,13 @@ func (s *tikvStore) splitBatchRegionsReq(bo *Backoffer, keys [][]byte, scatter b
}
ch := make(chan singleBatchResp, len(batches))
for _, batch1 := range batches {
batch := batch1
go func() {
go func(b batch) {
backoffer, cancel := bo.Fork()
defer cancel()

util.WithRecovery(func() {
select {
case ch <- s.batchSendSingleRegion(backoffer, batch, scatter):
case ch <- s.batchSendSingleRegion(backoffer, b, scatter):
case <-bo.ctx.Done():
ch <- singleBatchResp{err: bo.ctx.Err()}
}
Expand All @@ -83,24 +82,25 @@ func (s *tikvStore) splitBatchRegionsReq(bo *Backoffer, keys [][]byte, scatter b
ch <- singleBatchResp{err: errors.Errorf("%v", r)}
}
})
}()
}(batch1)
}

srResp := &kvrpcpb.SplitRegionResponse{Regions: make([]*metapb.Region, 0, len(keys)*2)}
for i := 0; i < len(batches); i++ {
batchResp := <-ch
if batchResp.err != nil {
logutil.Logger(context.Background()).Debug("tikv store batch send failed",
zap.Error(batchResp.err))
logutil.Logger(context.Background()).Debug("batch split regions failed", zap.Error(batchResp.err))
if err == nil {
err = batchResp.err
}
continue
}

spResp := batchResp.resp.SplitRegion
regions := spResp.GetRegions()
srResp.Regions = append(srResp.Regions, regions...)
// If the split succeeds and the scatter fails, we also need to add the region IDs.
if batchResp.resp != nil {
spResp := batchResp.resp.SplitRegion
regions := spResp.GetRegions()
srResp.Regions = append(srResp.Regions, regions...)
}
}
return &tikvrpc.Response{SplitRegion: srResp}, errors.Trace(err)
}
Expand Down Expand Up @@ -150,35 +150,39 @@ func (s *tikvStore) batchSendSingleRegion(bo *Backoffer, batch batch, scatter bo
// so n-1 needs to be scattered to other stores.
spResp.Regions = regions[:len(regions)-1]
}
logutil.Logger(context.Background()).Info("batch split regions complete",
zap.Uint64("batch region ID", batch.regionID.id),
zap.Binary("first at", batch.keys[0]),
zap.Stringer("first new region left", spResp.Regions[0]),
zap.Int("new region count", len(spResp.Regions)))

if !scatter {
if len(spResp.Regions) == 0 {
return batchResp
}
logutil.Logger(context.Background()).Info("batch split regions complete",
zap.Uint64("batch region ID", batch.regionID.id),
zap.Binary("first at", batch.keys[0]),
zap.String("first new region left", spResp.Regions[0].String()),
zap.Int("new region count", len(spResp.Regions)))
return batchResp
}

for i, r := range spResp.Regions {
if err = s.scatterRegion(r.Id); err == nil {
logutil.Logger(context.Background()).Info("batch split regions, scatter a region complete",
logutil.Logger(context.Background()).Info("batch split regions, scatter region complete",
zap.Uint64("batch region ID", batch.regionID.id),
zap.Binary("at", batch.keys[i]),
zap.String("new region left", r.String()))
continue
}

logutil.Logger(context.Background()).Info("batch split regions, scatter a region failed",
logutil.Logger(context.Background()).Info("batch split regions, scatter region failed",
zap.Uint64("batch region ID", batch.regionID.id),
zap.Binary("at", batch.keys[i]),
zap.String("new region left", r.String()),
zap.Stringer("new region left", r),
zap.Error(err))
if batchResp.err == nil {
batchResp.err = err
}
if ErrPDServerTimeout.Equal(err) {
break
}
}
return batchResp
}
Expand All @@ -193,12 +197,19 @@ func (s *tikvStore) SplitRegions(ctx context.Context, splitKeys [][]byte, scatte
for _, r := range spResp.Regions {
regionIDs = append(regionIDs, r.Id)
}
logutil.Logger(context.Background()).Info("split regions complete", zap.Uint64s("region IDs", regionIDs))
logutil.Logger(context.Background()).Info("split regions complete",
zap.Int("region count", len(regionIDs)), zap.Uint64s("region IDs", regionIDs))
}
return regionIDs, errors.Trace(err)
}

func (s *tikvStore) scatterRegion(regionID uint64) error {
failpoint.Inject("MockScatterRegionTimeout", func(val failpoint.Value) {
if val.(bool) {
failpoint.Return(ErrPDServerTimeout)
}
})

logutil.Logger(context.Background()).Info("start scatter region",
zap.Uint64("regionID", regionID))
bo := NewBackoffer(context.Background(), scatterRegionBackoff)
Expand All @@ -207,12 +218,12 @@ func (s *tikvStore) scatterRegion(regionID uint64) error {
if err == nil {
break
}
err = bo.Backoff(BoRegionMiss, errors.New(err.Error()))
err = bo.Backoff(BoPDRPC, errors.New(err.Error()))
if err != nil {
return errors.Trace(err)
}
}
logutil.Logger(context.Background()).Info("scatter region complete",
logutil.Logger(context.Background()).Debug("scatter region complete",
zap.Uint64("regionID", regionID))
return nil
}
Expand All @@ -221,11 +232,12 @@ func (s *tikvStore) scatterRegion(regionID uint64) error {
// backOff is the back off time of the wait scatter region.(Milliseconds)
// if backOff <= 0, the default wait scatter back off time will be used.
func (s *tikvStore) WaitScatterRegionFinish(regionID uint64, backOff int) error {
logutil.Logger(context.Background()).Info("wait scatter region",
zap.Uint64("regionID", regionID))
if backOff <= 0 {
backOff = waitScatterRegionFinishBackoff
}
logutil.Logger(context.Background()).Info("wait scatter region",
zap.Uint64("regionID", regionID), zap.Int("backoff(ms)", backOff))

bo := NewBackoffer(context.Background(), backOff)
logFreq := 0
for {
Expand Down

0 comments on commit 041b8e1

Please sign in to comment.