From 8005c6e8cbe3ce0fa8c0808e3750ecc225dab9ed Mon Sep 17 00:00:00 2001 From: Lynn Date: Wed, 14 Aug 2019 12:36:20 +0800 Subject: [PATCH 01/12] *: support a region divided into multiple regions --- ddl/split_region.go | 39 ++++--- executor/executor_test.go | 4 +- executor/split.go | 61 ++++------ go.mod | 2 + go.sum | 5 +- kv/kv.go | 2 +- store/mockstore/mocktikv/rpc.go | 25 +++-- store/tikv/scan_test.go | 4 +- store/tikv/split_region.go | 190 ++++++++++++++++++++++++-------- 9 files changed, 208 insertions(+), 124 deletions(-) mode change 100644 => 100755 store/mockstore/mocktikv/rpc.go diff --git a/ddl/split_region.go b/ddl/split_region.go index a7c710b381315..c7aadeddd8677 100644 --- a/ddl/split_region.go +++ b/ddl/split_region.go @@ -14,6 +14,8 @@ package ddl import ( + "context" + "github.com/pingcap/parser/model" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/tablecodec" @@ -73,16 +75,18 @@ func splitPreSplitedTable(store kv.SplitableStore, tbInfo *model.TableInfo, scat // The highest bit is the symbol bit,and alloc _tidb_rowid will always be positive number. // So we only need to split the region for the positive number. max := int64(1 << (tbInfo.ShardRowIDBits - 1)) + splitTableKeys := make([][]byte, 0, 1<<(tbInfo.PreSplitRegions-1)) for p := int64(step); p < max; p += step { recordID := p << (64 - tbInfo.ShardRowIDBits) recordPrefix := tablecodec.GenTableRecordPrefix(tbInfo.ID) key := tablecodec.EncodeRecordKey(recordPrefix, recordID) - regionID, err := store.SplitRegion(key, scatter) - if err != nil { - logutil.BgLogger().Warn("[ddl] pre split table region failed", zap.Int64("recordID", recordID), zap.Error(err)) - } else { - regionIDs = append(regionIDs, regionID) - } + splitTableKeys = append(splitTableKeys, key) + } + var err error + regionIDs, err = store.SplitRegions(context.Background(), splitTableKeys, scatter) + if err != nil { + logutil.BgLogger().Warn("[ddl] pre split some table regions failed", + zap.Stringer("table", tbInfo.Name), zap.Int("successful region count", len(regionIDs)), zap.Error(err)) } regionIDs = append(regionIDs, splitIndexRegion(store, tbInfo, scatter)...) if scatter { @@ -92,26 +96,27 @@ func splitPreSplitedTable(store kv.SplitableStore, tbInfo *model.TableInfo, scat func splitRecordRegion(store kv.SplitableStore, tableID int64, scatter bool) uint64 { tableStartKey := tablecodec.GenTablePrefix(tableID) - regionID, err := store.SplitRegion(tableStartKey, scatter) + regionIDs, err := store.SplitRegions(context.Background(), [][]byte{tableStartKey}, scatter) if err != nil { // It will be automatically split by TiKV later. logutil.BgLogger().Warn("[ddl] split table region failed", zap.Error(err)) } - return regionID + if len(regionIDs) == 1 { + return regionIDs[0] + } + return 0 } func splitIndexRegion(store kv.SplitableStore, tblInfo *model.TableInfo, scatter bool) []uint64 { - regionIDs := make([]uint64, 0, len(tblInfo.Indices)) + splitKeys := make([][]byte, 0, len(tblInfo.Indices)) for _, idx := range tblInfo.Indices { indexPrefix := tablecodec.EncodeTableIndexPrefix(tblInfo.ID, idx.ID) - regionID, err := store.SplitRegion(indexPrefix, scatter) - if err != nil { - logutil.BgLogger().Warn("[ddl] pre split table index region failed", - zap.Stringer("table", tblInfo.Name), - zap.Stringer("index", idx.Name), - zap.Error(err)) - } - regionIDs = append(regionIDs, regionID) + splitKeys = append(splitKeys, indexPrefix) + } + regionIDs, err := store.SplitRegions(context.Background(), splitKeys, scatter) + if err != nil { + logutil.BgLogger().Warn("[ddl] pre split some table index regions failed", + zap.Stringer("table", tblInfo.Name), zap.Int("successful region count", len(regionIDs)), zap.Error(err)) } return regionIDs } diff --git a/executor/executor_test.go b/executor/executor_test.go index 348c7c896d1de..cb9ebc4fd717f 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -2108,7 +2108,7 @@ func (s *testSuiteP1) TestPointGetRepeatableRead(c *C) { } func (s *testSuite4) TestSplitRegionTimeout(c *C) { - c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/mockSplitRegionTimeout", `return(true)`), IsNil) + c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/MockSplitRegionTimeout", `return(true)`), IsNil) tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") tk.MustExec("drop table if exists t") @@ -2117,7 +2117,7 @@ func (s *testSuite4) TestSplitRegionTimeout(c *C) { tk.MustExec(`set @@tidb_wait_split_region_timeout=1`) // result 0 0 means split 0 region and 0 region finish scatter regions before timeout. tk.MustQuery(`split table t between (0) and (10000) regions 10`).Check(testkit.Rows("0 0")) - c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/mockSplitRegionTimeout"), IsNil) + c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/MockSplitRegionTimeout"), IsNil) } func (s *testSuiteP1) TestRow(c *C) { diff --git a/executor/split.go b/executor/split.go index eefda2fa65601..a33877acde12f 100755 --- a/executor/split.go +++ b/executor/split.go @@ -23,7 +23,6 @@ import ( "github.com/cznic/mathutil" "github.com/pingcap/errors" - "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" @@ -93,27 +92,18 @@ func (e *SplitIndexRegionExec) splitIndexRegion(ctx context.Context) error { start := time.Now() ctxWithTimeout, cancel := context.WithTimeout(ctx, e.ctx.GetSessionVars().GetSplitRegionTimeout()) defer cancel() - regionIDs := make([]uint64, 0, len(splitIdxKeys)) - for _, idxKey := range splitIdxKeys { - if isCtxDone(ctxWithTimeout) { - break - } - - regionID, err := s.SplitRegion(idxKey, true) - if err != nil { - logutil.BgLogger().Warn("split table index region failed", - zap.String("table", e.tableInfo.Name.L), - zap.String("index", e.indexInfo.Name.L), - zap.Error(err)) - continue - } - if regionID == 0 { - continue - } - regionIDs = append(regionIDs, regionID) - + regionIDs, err := s.SplitRegions(context.Background(), splitIdxKeys, true) + if err != nil { + logutil.BgLogger().Warn("split table index region failed", + zap.String("table", e.tableInfo.Name.L), + zap.String("index", e.indexInfo.Name.L), + zap.Error(err)) } e.splitRegions = len(regionIDs) + if e.splitRegions == 0 { + return nil + } + if !e.ctx.GetSessionVars().WaitSplitRegionFinish { return nil } @@ -283,30 +273,17 @@ func (e *SplitTableRegionExec) splitTableRegion(ctx context.Context) error { if err != nil { return err } - regionIDs := make([]uint64, 0, len(splitKeys)) - for _, key := range splitKeys { - failpoint.Inject("mockSplitRegionTimeout", func(val failpoint.Value) { - if val.(bool) { - time.Sleep(time.Second*1 + time.Millisecond*10) - } - }) - if isCtxDone(ctxWithTimeout) { - break - } - regionID, err := s.SplitRegion(key, true) - if err != nil { - logutil.BgLogger().Warn("split table region failed", - zap.String("table", e.tableInfo.Name.L), - zap.Error(err)) - continue - } - if regionID == 0 { - continue - } - regionIDs = append(regionIDs, regionID) - + regionIDs, err := s.SplitRegions(ctxWithTimeout, splitKeys, true) + if err != nil { + logutil.BgLogger().Warn("split table region failed", + zap.String("table", e.tableInfo.Name.L), + zap.Error(err)) } e.splitRegions = len(regionIDs) + if e.splitRegions == 0 { + return nil + } + if !e.ctx.GetSessionVars().WaitSplitRegionFinish { return nil } diff --git a/go.mod b/go.mod index 0a373f0dc22d6..4be435645b715 100644 --- a/go.mod +++ b/go.mod @@ -75,3 +75,5 @@ require ( sourcegraph.com/sourcegraph/appdash v0.0.0-20180531100431-4c381bd170b4 sourcegraph.com/sourcegraph/appdash-data v0.0.0-20151005221446-73f23eafcf67 ) + +replace github.com/pingcap/kvproto => github.com/disksing/kvproto v0.0.0-20190813043633-5071b679fe14 diff --git a/go.sum b/go.sum index b4170e6ca9006..f8e7083b566f4 100644 --- a/go.sum +++ b/go.sum @@ -40,6 +40,8 @@ github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumC github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/dgryski/go-farm v0.0.0-20190104051053-3adb47b1fb0f h1:dDxpBYafY/GYpcl+LS4Bn3ziLPuEdGRkRjYAbSlWxSA= github.com/dgryski/go-farm v0.0.0-20190104051053-3adb47b1fb0f/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= +github.com/disksing/kvproto v0.0.0-20190813043633-5071b679fe14 h1:QuZfs01/4Y5CM89f5WnVFqImWaCWTUwfaW5ViY1B2MM= +github.com/disksing/kvproto v0.0.0-20190813043633-5071b679fe14/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY= github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v0.0.0-20180421182945-02af3965c54e/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= @@ -159,9 +161,6 @@ github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c h1:hvQd3aOLKLF7x github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c/go.mod h1:DNS3Qg7bEDhU6EXNHF+XSv/PGznQaMJ5FWvctpm6pQI= github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rGrobssy1nVy2VaVpNCuLpCbr+FEaTA8= github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= -github.com/pingcap/kvproto v0.0.0-20190516013202-4cf58ad90b6c/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY= -github.com/pingcap/kvproto v0.0.0-20190703131923-d9830856b531 h1:8xk2HobDwClB5E3Hv9TEPiS7K7bv3ykWHLyZzuUYywI= -github.com/pingcap/kvproto v0.0.0-20190703131923-d9830856b531/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY= github.com/pingcap/log v0.0.0-20190214045112-b37da76f67a7/go.mod h1:xsfkWVaFVV5B8e1K9seWfyJWFrIhbtUTAD8NV1Pq3+w= github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596 h1:t2OQTpPJnrPDGlvA+3FwJptMTt6MEPdzK1Wt99oaefQ= github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw= diff --git a/kv/kv.go b/kv/kv.go index 895d8ebed1338..8520137383945 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -305,7 +305,7 @@ type Iterator interface { // SplitableStore is the kv store which supports split regions. type SplitableStore interface { - SplitRegion(splitKey Key, scatter bool) (regionID uint64, err error) + SplitRegions(ctx context.Context, splitKey [][]byte, scatter bool) (regionID []uint64, err error) WaitScatterRegionFinish(regionID uint64, backOff int) error CheckRegionInScattering(regionID uint64) (bool, error) } diff --git a/store/mockstore/mocktikv/rpc.go b/store/mockstore/mocktikv/rpc.go old mode 100644 new mode 100755 index 8cf5d481bd788..34343c983ba32 --- a/store/mockstore/mocktikv/rpc.go +++ b/store/mockstore/mocktikv/rpc.go @@ -603,14 +603,23 @@ func (h *rpcHandler) handleKvRawScan(req *kvrpcpb.RawScanRequest) *kvrpcpb.RawSc } func (h *rpcHandler) handleSplitRegion(req *kvrpcpb.SplitRegionRequest) *kvrpcpb.SplitRegionResponse { - key := NewMvccKey(req.GetSplitKey()) - region, _ := h.cluster.GetRegionByKey(key) - if bytes.Equal(region.GetStartKey(), key) { - return &kvrpcpb.SplitRegionResponse{} - } - newRegionID, newPeerIDs := h.cluster.AllocID(), h.cluster.AllocIDs(len(region.Peers)) - newRegion := h.cluster.SplitRaw(region.GetId(), newRegionID, key, newPeerIDs, newPeerIDs[0]) - return &kvrpcpb.SplitRegionResponse{Left: newRegion.Meta} + keys := req.GetSplitKeys() + resp := &kvrpcpb.SplitRegionResponse{Regions: make([]*metapb.Region, 0, len(keys)+1)} + for i, key := range keys { + k := NewMvccKey(key) + region, _ := h.cluster.GetRegionByKey(k) + if bytes.Equal(region.GetStartKey(), key) { + continue + } + if i == 0 { + // Set the leftmost region. + resp.Regions = append(resp.Regions, region) + } + newRegionID, newPeerIDs := h.cluster.AllocID(), h.cluster.AllocIDs(len(region.Peers)) + newRegion := h.cluster.SplitRaw(region.GetId(), newRegionID, k, newPeerIDs, newPeerIDs[0]) + resp.Regions = append(resp.Regions, newRegion.Meta) + } + return resp } // RPCClient sends kv RPC calls to mock cluster. RPCClient mocks the behavior of diff --git a/store/tikv/scan_test.go b/store/tikv/scan_test.go index 0deeb0751ef48..fb99bffd33454 100644 --- a/store/tikv/scan_test.go +++ b/store/tikv/scan_test.go @@ -91,12 +91,12 @@ func (s *testScanSuite) TestScan(c *C) { c.Assert(err, IsNil) if rowNum > 123 { - _, err = s.store.SplitRegion(encodeKey(s.prefix, s08d("key", 123)), false) + _, err = s.store.SplitRegions(context.Background(), [][]byte{encodeKey(s.prefix, s08d("key", 123))}, false) c.Assert(err, IsNil) } if rowNum > 456 { - _, err = s.store.SplitRegion(encodeKey(s.prefix, s08d("key", 456)), false) + _, err = s.store.SplitRegions(context.Background(), [][]byte{encodeKey(s.prefix, s08d("key", 456))}, false) c.Assert(err, IsNil) } diff --git a/store/tikv/split_region.go b/store/tikv/split_region.go index 701a2734f8f0a..4d3b1271319dd 100644 --- a/store/tikv/split_region.go +++ b/store/tikv/split_region.go @@ -17,70 +17,163 @@ import ( "bytes" "context" "encoding/hex" + "time" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/kvrpcpb" + "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" - "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/tikv/tikvrpc" "github.com/pingcap/tidb/util/logutil" "go.uber.org/zap" ) -// SplitRegion splits the region contains splitKey into 2 regions: [start, -// splitKey) and [splitKey, end). -func (s *tikvStore) SplitRegion(splitKey kv.Key, scatter bool) (regionID uint64, err error) { - logutil.BgLogger().Info("start split region", - zap.Binary("at", splitKey)) - bo := NewBackoffer(context.Background(), splitRegionBackoff) - sender := NewRegionRequestSender(s.regionCache, s.client) +func (s *tikvStore) spliteBatchRegionsReq(bo *Backoffer, keys [][]byte, scatter bool) (*tikvrpc.Response, error) { + groups, _, err := s.regionCache.GroupKeysByRegion(bo, keys) + if err != nil { + return nil, errors.Trace(err) + } + + var batches []batch + for regionID, groupKeys := range groups { + batches = appendKeyBatches(batches, regionID, groupKeys, rawBatchPairCount) + } + + if len(batches) == 0 { + return nil, nil + } + if len(batches) == 1 { + resp := s.batchSendSingleRegion(bo, batches[0], scatter) + return resp.resp, errors.Trace(resp.err) + } + ch := make(chan singleBatchResp, len(batches)) + for _, batch1 := range batches { + batch := batch1 + go func() { + backoffer, cancel := bo.Fork() + defer cancel() + + select { + case ch <- s.batchSendSingleRegion(backoffer, batch, scatter): + case <-bo.ctx.Done(): + ch <- singleBatchResp{err: bo.ctx.Err()} + } + }() + } + + srResp := &kvrpcpb.SplitRegionResponse{Regions: make([]*metapb.Region, 0, len(keys)*2)} + for i := 0; i < len(batches); i++ { + batchResp := <-ch + if batchResp.err != nil { + logutil.BgLogger().Debug("tikv store batch send failed", + zap.Error(batchResp.err)) + if err == nil { + err = batchResp.err + } + continue + } + + spResp := batchResp.resp.Resp.(*kvrpcpb.SplitRegionResponse) + regions := spResp.GetRegions() + srResp.Regions = append(srResp.Regions, regions[:len(regions)-1]...) + } + return &tikvrpc.Response{Resp: srResp}, errors.Trace(err) +} + +func (s *tikvStore) batchSendSingleRegion(bo *Backoffer, batch batch, scatter bool) singleBatchResp { + failpoint.Inject("MockSplitRegionTimeout", func(val failpoint.Value) { + if val.(bool) { + time.Sleep(time.Second*1 + time.Millisecond*10) + } + }) + req := tikvrpc.NewRequest(tikvrpc.CmdSplitRegion, &kvrpcpb.SplitRegionRequest{ - SplitKey: splitKey, + SplitKeys: batch.keys, }, kvrpcpb.Context{ Priority: kvrpcpb.CommandPri_Normal, }) - for { - loc, err := s.regionCache.LocateKey(bo, splitKey) - if err != nil { - return 0, errors.Trace(err) - } - if bytes.Equal(splitKey, loc.StartKey) { - logutil.BgLogger().Info("skip split region", - zap.String("at", hex.EncodeToString(splitKey))) - return 0, nil - } - res, err := sender.SendReq(bo, req, loc.Region, readTimeoutShort) + + sender := NewRegionRequestSender(s.regionCache, s.client) + resp, err := sender.SendReq(bo, req, batch.regionID, readTimeoutShort) + + batchResp := singleBatchResp{resp: resp} + if err != nil { + batchResp.err = errors.Trace(err) + return batchResp + } + regionErr, err := resp.GetRegionError() + if err != nil { + batchResp.err = errors.Trace(err) + return batchResp + } + if regionErr != nil { + err := bo.Backoff(BoRegionMiss, errors.New(regionErr.String())) if err != nil { - return 0, errors.Trace(err) + batchResp.err = errors.Trace(err) + return batchResp } - regionErr, err := res.GetRegionError() - if err != nil { - return 0, errors.Trace(err) + resp, err = s.spliteBatchRegionsReq(bo, batch.keys, scatter) + batchResp.resp = resp + batchResp.err = err + return batchResp + } + + spResp := resp.Resp.(*kvrpcpb.SplitRegionResponse) + left := spResp.GetLeft() + regions := spResp.GetRegions() + if left != nil { + spResp.Regions = []*metapb.Region{left} + } else if len(regions) > 0 { + spResp.Regions = regions[:len(regions)-1] + } + if !scatter { + if len(spResp.Regions) == 0 { + return batchResp } - if regionErr != nil { - err := bo.Backoff(BoRegionMiss, errors.New(regionErr.String())) - if err != nil { - return 0, errors.Trace(err) - } + logutil.BgLogger().Info("batch split regions complete", + zap.Uint64("batch region ID", batch.regionID.id), + zap.String("first at", hex.EncodeToString(batch.keys[0])), + zap.Stringer("first new region left", logutil.Hex(spResp.Regions[0])), + zap.Int("new region count", len(spResp.Regions)+1)) + return batchResp + } + + for i, r := range spResp.Regions { + err = s.scatterRegion(r.Id) + if err == nil { + logutil.BgLogger().Info("scatter a region complete", + zap.Uint64("batch region ID", batch.regionID.id), + zap.String("at", hex.EncodeToString(batch.keys[i])), + zap.Stringer("new region left", logutil.Hex(r))) continue } - splitRegion := res.Resp.(*kvrpcpb.SplitRegionResponse) - logutil.BgLogger().Info("split region complete", - zap.String("at", hex.EncodeToString(splitKey)), - zap.Stringer("new region left", logutil.Hex(splitRegion.GetLeft())), - zap.Stringer("new region right", logutil.Hex(splitRegion.GetRight()))) - left := splitRegion.GetLeft() - if left == nil { - return 0, nil + + logutil.BgLogger().Info("scatter a region failed", + zap.Uint64("batch region ID", batch.regionID.id), + zap.String("at", hex.EncodeToString(batch.keys[i])), + zap.Stringer("new region left", logutil.Hex(r)), + zap.Error(err)) + if batchResp.err == nil { + batchResp.err = err } - if scatter { - err = s.scatterRegion(left.Id) - if err != nil { - return 0, errors.Trace(err) - } + } + return batchResp +} + +// SplitRegions splits regions contains splitKeys into some egions. +func (s *tikvStore) SplitRegions(ctx context.Context, splitKeys [][]byte, scatter bool) (regionIDs []uint64, err error) { + // TODO: DO we need update this backoff? + bo := NewBackoffer(ctx, splitRegionBackoff) + resp, err := s.spliteBatchRegionsReq(bo, splitKeys, scatter) + regionIDs = make([]uint64, 0, len(splitKeys)) + if resp != nil && resp.Resp != nil { + spResp := resp.Resp.(*kvrpcpb.SplitRegionResponse) + for _, r := range spResp.Regions { + regionIDs = append(regionIDs, r.Id) } - return left.Id, nil } + return regionIDs, errors.Trace(err) } func (s *tikvStore) scatterRegion(regionID uint64) error { @@ -89,14 +182,13 @@ func (s *tikvStore) scatterRegion(regionID uint64) error { bo := NewBackoffer(context.Background(), scatterRegionBackoff) for { err := s.pdClient.ScatterRegion(context.Background(), regionID) + if err == nil { + break + } + err = bo.Backoff(BoRegionMiss, errors.New(err.Error())) if err != nil { - err = bo.Backoff(BoRegionMiss, errors.New(err.Error())) - if err != nil { - return errors.Trace(err) - } - continue + return errors.Trace(err) } - break } logutil.BgLogger().Info("scatter region complete", zap.Uint64("regionID", regionID)) From 211ea042a47f0cdd1162d3e5b2ec18965838b7cb Mon Sep 17 00:00:00 2001 From: Lynn Date: Thu, 15 Aug 2019 23:15:27 +0800 Subject: [PATCH 02/12] store: add logs --- store/tikv/split_region.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/store/tikv/split_region.go b/store/tikv/split_region.go index 4d3b1271319dd..71520dc374c0d 100644 --- a/store/tikv/split_region.go +++ b/store/tikv/split_region.go @@ -142,14 +142,14 @@ func (s *tikvStore) batchSendSingleRegion(bo *Backoffer, batch batch, scatter bo for i, r := range spResp.Regions { err = s.scatterRegion(r.Id) if err == nil { - logutil.BgLogger().Info("scatter a region complete", + logutil.BgLogger().Info("batch split regions, scatter a region complete", zap.Uint64("batch region ID", batch.regionID.id), zap.String("at", hex.EncodeToString(batch.keys[i])), zap.Stringer("new region left", logutil.Hex(r))) continue } - logutil.BgLogger().Info("scatter a region failed", + logutil.BgLogger().Info("batch split regions,scatter a region failed", zap.Uint64("batch region ID", batch.regionID.id), zap.String("at", hex.EncodeToString(batch.keys[i])), zap.Stringer("new region left", logutil.Hex(r)), @@ -172,6 +172,7 @@ func (s *tikvStore) SplitRegions(ctx context.Context, splitKeys [][]byte, scatte for _, r := range spResp.Regions { regionIDs = append(regionIDs, r.Id) } + logutil.BgLogger().Info("split regions complete", zap.Uint64s("region IDs", regionIDs)) } return regionIDs, errors.Trace(err) } From cfb0522829d3171d3e86a870d261099e7cddeddb Mon Sep 17 00:00:00 2001 From: Lynn Date: Thu, 22 Aug 2019 11:12:43 +0800 Subject: [PATCH 03/12] *: update the package of kvproto --- go.mod | 4 +--- go.sum | 6 ++---- store/tikv/split_region.go | 3 +-- 3 files changed, 4 insertions(+), 9 deletions(-) diff --git a/go.mod b/go.mod index a6441745c4878..14577adcf1575 100644 --- a/go.mod +++ b/go.mod @@ -39,7 +39,7 @@ require ( github.com/pingcap/errors v0.11.4 github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e - github.com/pingcap/kvproto v0.0.0-20190724165112-ec9df5f208a7 + github.com/pingcap/kvproto v0.0.0-20190821201150-798d27658fae github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596 github.com/pingcap/parser v0.0.0-20190819021501-c5d6ce829420 github.com/pingcap/pd v0.0.0-20190712044914-75a1f9f3062b @@ -75,5 +75,3 @@ require ( sourcegraph.com/sourcegraph/appdash v0.0.0-20180531100431-4c381bd170b4 sourcegraph.com/sourcegraph/appdash-data v0.0.0-20151005221446-73f23eafcf67 ) - -replace github.com/pingcap/kvproto => github.com/disksing/kvproto v0.0.0-20190813043633-5071b679fe14 diff --git a/go.sum b/go.sum index 74cb0a221aa72..fd730edebc718 100644 --- a/go.sum +++ b/go.sum @@ -40,8 +40,6 @@ github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumC github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/dgryski/go-farm v0.0.0-20190104051053-3adb47b1fb0f h1:dDxpBYafY/GYpcl+LS4Bn3ziLPuEdGRkRjYAbSlWxSA= github.com/dgryski/go-farm v0.0.0-20190104051053-3adb47b1fb0f/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= -github.com/disksing/kvproto v0.0.0-20190813043633-5071b679fe14 h1:QuZfs01/4Y5CM89f5WnVFqImWaCWTUwfaW5ViY1B2MM= -github.com/disksing/kvproto v0.0.0-20190813043633-5071b679fe14/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY= github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v0.0.0-20180421182945-02af3965c54e/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= @@ -162,8 +160,8 @@ github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c/go.mod h1:DNS3Qg github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rGrobssy1nVy2VaVpNCuLpCbr+FEaTA8= github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= github.com/pingcap/kvproto v0.0.0-20190516013202-4cf58ad90b6c/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY= -github.com/pingcap/kvproto v0.0.0-20190724165112-ec9df5f208a7 h1:BMrtxXqQeZ9y27LN/V3PHA/tSyDWHK+90VLYaymrXQE= -github.com/pingcap/kvproto v0.0.0-20190724165112-ec9df5f208a7/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY= +github.com/pingcap/kvproto v0.0.0-20190821201150-798d27658fae h1:WR4d5ga8zXT+QDWYFzzyA+PJMMszR0kQxyYMh6dvHPg= +github.com/pingcap/kvproto v0.0.0-20190821201150-798d27658fae/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY= github.com/pingcap/log v0.0.0-20190214045112-b37da76f67a7/go.mod h1:xsfkWVaFVV5B8e1K9seWfyJWFrIhbtUTAD8NV1Pq3+w= github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596 h1:t2OQTpPJnrPDGlvA+3FwJptMTt6MEPdzK1Wt99oaefQ= github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw= diff --git a/store/tikv/split_region.go b/store/tikv/split_region.go index b645d98dd86e5..8e9cf37700f5d 100644 --- a/store/tikv/split_region.go +++ b/store/tikv/split_region.go @@ -16,8 +16,6 @@ package tikv import ( "bytes" "context" - "encoding/hex" - "github.com/pingcap/tidb/kv" "time" "github.com/pingcap/errors" @@ -25,6 +23,7 @@ import ( "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/tikv/tikvrpc" "github.com/pingcap/tidb/util/logutil" "go.uber.org/zap" From f2beb3169245ba9f65cdfc56e338d3d3aeda2a98 Mon Sep 17 00:00:00 2001 From: Lynn Date: Thu, 22 Aug 2019 11:35:00 +0800 Subject: [PATCH 04/12] ddl: tiny update --- ddl/split_region.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/ddl/split_region.go b/ddl/split_region.go index 73fc2a953e3b9..79472faad43ea 100644 --- a/ddl/split_region.go +++ b/ddl/split_region.go @@ -70,7 +70,6 @@ func splitPreSplitedTable(store kv.SplitableStore, tbInfo *model.TableInfo, scat // And the max _tidb_rowid is 9223372036854775807, it won't be negative number. // Split table region. - regionIDs := make([]uint64, 0, 1<<(tbInfo.PreSplitRegions)+len(tbInfo.Indices)) step := int64(1 << (tbInfo.ShardRowIDBits - tbInfo.PreSplitRegions)) max := int64(1 << tbInfo.ShardRowIDBits) splitTableKeys := make([][]byte, 0, 1<<(tbInfo.PreSplitRegions)) @@ -81,7 +80,7 @@ func splitPreSplitedTable(store kv.SplitableStore, tbInfo *model.TableInfo, scat splitTableKeys = append(splitTableKeys, key) } var err error - regionIDs, err = store.SplitRegions(context.Background(), splitTableKeys, scatter) + regionIDs, err := store.SplitRegions(context.Background(), splitTableKeys, scatter) if err != nil { logutil.BgLogger().Warn("[ddl] pre split some table regions failed", zap.Stringer("table", tbInfo.Name), zap.Int("successful region count", len(regionIDs)), zap.Error(err)) From efbbcabcb9a3394eada6b0e902fdd201c371b2c6 Mon Sep 17 00:00:00 2001 From: Lynn Date: Tue, 27 Aug 2019 10:38:07 +0800 Subject: [PATCH 05/12] store: address comments --- store/tikv/split_region.go | 28 ++++++++++++++++++++-------- 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/store/tikv/split_region.go b/store/tikv/split_region.go index 8e9cf37700f5d..6bf53e4d660ed 100644 --- a/store/tikv/split_region.go +++ b/store/tikv/split_region.go @@ -16,6 +16,7 @@ package tikv import ( "bytes" "context" + "math" "time" "github.com/pingcap/errors" @@ -25,6 +26,7 @@ import ( "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/tikv/tikvrpc" + "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/logutil" "go.uber.org/zap" ) @@ -54,11 +56,17 @@ func (s *tikvStore) spliteBatchRegionsReq(bo *Backoffer, keys [][]byte, scatter backoffer, cancel := bo.Fork() defer cancel() - select { - case ch <- s.batchSendSingleRegion(backoffer, batch, scatter): - case <-bo.ctx.Done(): - ch <- singleBatchResp{err: bo.ctx.Err()} - } + util.WithRecovery(func() { + select { + case ch <- s.batchSendSingleRegion(backoffer, batch, scatter): + case <-bo.ctx.Done(): + ch <- singleBatchResp{err: bo.ctx.Err()} + } + }, func(r interface{}) { + if r != nil { + ch <- singleBatchResp{err: errors.Errorf("%v", r)} + } + }) }() } @@ -76,6 +84,8 @@ func (s *tikvStore) spliteBatchRegionsReq(bo *Backoffer, keys [][]byte, scatter spResp := batchResp.resp.Resp.(*kvrpcpb.SplitRegionResponse) regions := spResp.GetRegions() + // Divide a region into n, one can not need to be scattered, + // so n-1 needs to be scattered to other storage servers. srResp.Regions = append(srResp.Regions, regions[:len(regions)-1]...) } return &tikvrpc.Response{Resp: srResp}, errors.Trace(err) @@ -125,6 +135,8 @@ func (s *tikvStore) batchSendSingleRegion(bo *Backoffer, batch batch, scatter bo if left != nil { spResp.Regions = []*metapb.Region{left} } else if len(regions) > 0 { + // Divide a region into n, one can not need to be scattered, + // so n-1 needs to be scattered to other storage servers. spResp.Regions = regions[:len(regions)-1] } if !scatter { @@ -161,10 +173,10 @@ func (s *tikvStore) batchSendSingleRegion(bo *Backoffer, batch batch, scatter bo return batchResp } -// SplitRegions splits regions contains splitKeys into some egions. +// SplitRegions splits regions contains splitKeys into some regions. func (s *tikvStore) SplitRegions(ctx context.Context, splitKeys [][]byte, scatter bool) (regionIDs []uint64, err error) { - // TODO: DO we need update this backoff? - bo := NewBackoffer(ctx, splitRegionBackoff) + splitRegionsBo := splitRegionBackoff * math.Min(float64(len(splitKeys)), 10) + bo := NewBackoffer(ctx, int(splitRegionsBo)) resp, err := s.spliteBatchRegionsReq(bo, splitKeys, scatter) regionIDs = make([]uint64, 0, len(splitKeys)) if resp != nil && resp.Resp != nil { From c0420c130ed9cb0ce076b2589a33b0f9c7997570 Mon Sep 17 00:00:00 2001 From: Lynn Date: Wed, 28 Aug 2019 12:51:00 +0800 Subject: [PATCH 06/12] store/tikv: address comments --- store/tikv/split_region.go | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/store/tikv/split_region.go b/store/tikv/split_region.go index 6bf53e4d660ed..2cba1c79b9e91 100644 --- a/store/tikv/split_region.go +++ b/store/tikv/split_region.go @@ -31,7 +31,7 @@ import ( "go.uber.org/zap" ) -func (s *tikvStore) spliteBatchRegionsReq(bo *Backoffer, keys [][]byte, scatter bool) (*tikvrpc.Response, error) { +func (s *tikvStore) splitBatchRegionsReq(bo *Backoffer, keys [][]byte, scatter bool) (*tikvrpc.Response, error) { groups, _, err := s.regionCache.GroupKeysByRegion(bo, keys) if err != nil { return nil, errors.Trace(err) @@ -45,6 +45,14 @@ func (s *tikvStore) spliteBatchRegionsReq(bo *Backoffer, keys [][]byte, scatter if len(batches) == 0 { return nil, nil } + // The first time it enters this function. + if bo.totalSleep == 0 { + logutil.BgLogger().Info("split batch regions request", + zap.Int("split key count", len(keys)), + zap.Int("batch count", len(batches)), + zap.Uint64("first batch, region ID", batches[0].regionID.id), + zap.Stringer("first split key", kv.Key(batches[0].keys[0]))) + } if len(batches) == 1 { resp := s.batchSendSingleRegion(bo, batches[0], scatter) return resp.resp, errors.Trace(resp.err) @@ -130,11 +138,8 @@ func (s *tikvStore) batchSendSingleRegion(bo *Backoffer, batch batch, scatter bo } spResp := resp.Resp.(*kvrpcpb.SplitRegionResponse) - left := spResp.GetLeft() regions := spResp.GetRegions() - if left != nil { - spResp.Regions = []*metapb.Region{left} - } else if len(regions) > 0 { + if len(regions) > 0 { // Divide a region into n, one can not need to be scattered, // so n-1 needs to be scattered to other storage servers. spResp.Regions = regions[:len(regions)-1] From d1878833eb00c0776b03e8cc939c3bd53dda1566 Mon Sep 17 00:00:00 2001 From: Lynn Date: Wed, 28 Aug 2019 12:55:07 +0800 Subject: [PATCH 07/12] store/tikv: tiny update --- store/tikv/split_region.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/store/tikv/split_region.go b/store/tikv/split_region.go index 2cba1c79b9e91..4ef05c89689aa 100644 --- a/store/tikv/split_region.go +++ b/store/tikv/split_region.go @@ -131,7 +131,7 @@ func (s *tikvStore) batchSendSingleRegion(bo *Backoffer, batch batch, scatter bo batchResp.err = errors.Trace(err) return batchResp } - resp, err = s.spliteBatchRegionsReq(bo, batch.keys, scatter) + resp, err = s.splitBatchRegionsReq(bo, batch.keys, scatter) batchResp.resp = resp batchResp.err = err return batchResp @@ -182,7 +182,7 @@ func (s *tikvStore) batchSendSingleRegion(bo *Backoffer, batch batch, scatter bo func (s *tikvStore) SplitRegions(ctx context.Context, splitKeys [][]byte, scatter bool) (regionIDs []uint64, err error) { splitRegionsBo := splitRegionBackoff * math.Min(float64(len(splitKeys)), 10) bo := NewBackoffer(ctx, int(splitRegionsBo)) - resp, err := s.spliteBatchRegionsReq(bo, splitKeys, scatter) + resp, err := s.splitBatchRegionsReq(bo, splitKeys, scatter) regionIDs = make([]uint64, 0, len(splitKeys)) if resp != nil && resp.Resp != nil { spResp := resp.Resp.(*kvrpcpb.SplitRegionResponse) From 2c1f07bcc5a68e239e5137a4a8efcb39de28a1aa Mon Sep 17 00:00:00 2001 From: Lynn Date: Fri, 30 Aug 2019 11:06:36 +0800 Subject: [PATCH 08/12] *: address comments --- executor/executor_test.go | 9 +++++++-- store/tikv/split_region.go | 8 +++----- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/executor/executor_test.go b/executor/executor_test.go index f8ea98f0e8cbd..f2eb2448d6bd9 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -4050,7 +4050,7 @@ func (s *testSuiteP1) TestReadPartitionedTable(c *C) { func (s *testSuiteP1) TestSplitRegion(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") - tk.MustExec("drop table if exists t") + tk.MustExec("drop table if exists t, t1") tk.MustExec("create table t(a varchar(100),b int, index idx1(b,a))") tk.MustExec(`split table t index idx1 by (10000,"abcd"),(10000000);`) _, err := tk.Exec(`split table t index idx1 by ("abcd");`) @@ -4127,8 +4127,13 @@ func (s *testSuiteP1) TestSplitRegion(c *C) { c.Assert(err, NotNil) c.Assert(err.Error(), Equals, "Split table `t` region step value should more than 1000, step 10 is invalid") - // Test split region by syntax + // Test split region by syntax. tk.MustExec(`split table t by (0),(1000),(1000000)`) + + // Test split region twice to test for multiple batch split region requests. + tk.MustExec("create table t1(a int, b int)") + tk.MustQuery("split table t1 between(0) and (10000) regions 10;").Check(testkit.Rows("9 1")) + tk.MustQuery("split table t1 between(10) and (10010) regions 5;").Check(testkit.Rows("4 1")) } func (s *testSuite) TestShowTableRegion(c *C) { diff --git a/store/tikv/split_region.go b/store/tikv/split_region.go index 4ef05c89689aa..4b67333e07d78 100644 --- a/store/tikv/split_region.go +++ b/store/tikv/split_region.go @@ -92,9 +92,7 @@ func (s *tikvStore) splitBatchRegionsReq(bo *Backoffer, keys [][]byte, scatter b spResp := batchResp.resp.Resp.(*kvrpcpb.SplitRegionResponse) regions := spResp.GetRegions() - // Divide a region into n, one can not need to be scattered, - // so n-1 needs to be scattered to other storage servers. - srResp.Regions = append(srResp.Regions, regions[:len(regions)-1]...) + srResp.Regions = append(srResp.Regions, regions...) } return &tikvrpc.Response{Resp: srResp}, errors.Trace(err) } @@ -152,7 +150,7 @@ func (s *tikvStore) batchSendSingleRegion(bo *Backoffer, batch batch, scatter bo zap.Uint64("batch region ID", batch.regionID.id), zap.Stringer("first at", kv.Key(batch.keys[0])), zap.Stringer("first new region left", logutil.Hex(spResp.Regions[0])), - zap.Int("new region count", len(spResp.Regions)+1)) + zap.Int("new region count", len(spResp.Regions))) return batchResp } @@ -166,7 +164,7 @@ func (s *tikvStore) batchSendSingleRegion(bo *Backoffer, batch batch, scatter bo continue } - logutil.BgLogger().Info("batch split regions,scatter a region failed", + logutil.BgLogger().Info("batch split regions, scatter a region failed", zap.Uint64("batch region ID", batch.regionID.id), zap.Stringer("at", kv.Key(batch.keys[i])), zap.Stringer("new region left", logutil.Hex(r)), From ba9fa15b6276dbf6f8fb7828d9606c25360c2928 Mon Sep 17 00:00:00 2001 From: Lynn Date: Fri, 30 Aug 2019 21:15:22 +0800 Subject: [PATCH 09/12] store/tikv: address comments --- store/tikv/split_region.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/store/tikv/split_region.go b/store/tikv/split_region.go index 4b67333e07d78..bd74fe3b476e9 100644 --- a/store/tikv/split_region.go +++ b/store/tikv/split_region.go @@ -138,8 +138,8 @@ func (s *tikvStore) batchSendSingleRegion(bo *Backoffer, batch batch, scatter bo spResp := resp.Resp.(*kvrpcpb.SplitRegionResponse) regions := spResp.GetRegions() if len(regions) > 0 { - // Divide a region into n, one can not need to be scattered, - // so n-1 needs to be scattered to other storage servers. + // Divide a region into n, one of them may not need to be scattered, + // so n-1 needs to be scattered to other stores. spResp.Regions = regions[:len(regions)-1] } if !scatter { @@ -155,8 +155,7 @@ func (s *tikvStore) batchSendSingleRegion(bo *Backoffer, batch batch, scatter bo } for i, r := range spResp.Regions { - err = s.scatterRegion(r.Id) - if err == nil { + if err = s.scatterRegion(r.Id); err == nil { logutil.BgLogger().Info("batch split regions, scatter a region complete", zap.Uint64("batch region ID", batch.regionID.id), zap.Stringer("at", kv.Key(batch.keys[i])), @@ -176,9 +175,9 @@ func (s *tikvStore) batchSendSingleRegion(bo *Backoffer, batch batch, scatter bo return batchResp } -// SplitRegions splits regions contains splitKeys into some regions. +// SplitRegions splits regions by splitKeys. func (s *tikvStore) SplitRegions(ctx context.Context, splitKeys [][]byte, scatter bool) (regionIDs []uint64, err error) { - splitRegionsBo := splitRegionBackoff * math.Min(float64(len(splitKeys)), 10) + splitRegionsBo := splitRegionBackoff * math.Min(float64(len(splitKeys)), 6) bo := NewBackoffer(ctx, int(splitRegionsBo)) resp, err := s.splitBatchRegionsReq(bo, splitKeys, scatter) regionIDs = make([]uint64, 0, len(splitKeys)) From fde2a7c96247b3ddfaff96599f7eb7677df9cfb7 Mon Sep 17 00:00:00 2001 From: Lynn Date: Mon, 2 Sep 2019 13:32:33 +0800 Subject: [PATCH 10/12] store/tikv: address comment --- store/tikv/backoff.go | 1 + store/tikv/split_region.go | 3 +-- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/store/tikv/backoff.go b/store/tikv/backoff.go index 4f3c88e8f6506..1eae476da5bad 100644 --- a/store/tikv/backoff.go +++ b/store/tikv/backoff.go @@ -219,6 +219,7 @@ const ( deleteRangeOneRegionMaxBackoff = 100000 rawkvMaxBackoff = 20000 splitRegionBackoff = 20000 + maxSplitRegionsBackoff = 120000 scatterRegionBackoff = 20000 waitScatterRegionFinishBackoff = 120000 locateRegionMaxBackoff = 20000 diff --git a/store/tikv/split_region.go b/store/tikv/split_region.go index bd74fe3b476e9..e4053f6d570f3 100644 --- a/store/tikv/split_region.go +++ b/store/tikv/split_region.go @@ -177,8 +177,7 @@ func (s *tikvStore) batchSendSingleRegion(bo *Backoffer, batch batch, scatter bo // SplitRegions splits regions by splitKeys. func (s *tikvStore) SplitRegions(ctx context.Context, splitKeys [][]byte, scatter bool) (regionIDs []uint64, err error) { - splitRegionsBo := splitRegionBackoff * math.Min(float64(len(splitKeys)), 6) - bo := NewBackoffer(ctx, int(splitRegionsBo)) + bo := NewBackoffer(ctx, int(math.Min(float64(len(splitKeys)), maxSplitRegionsBackoff))) resp, err := s.splitBatchRegionsReq(bo, splitKeys, scatter) regionIDs = make([]uint64, 0, len(splitKeys)) if resp != nil && resp.Resp != nil { From dac8d5e0bae31a5e9a7c4736dfdab11ac5c1ff16 Mon Sep 17 00:00:00 2001 From: Lynn Date: Tue, 3 Sep 2019 13:00:59 +0800 Subject: [PATCH 11/12] store/tikv: filter split keys --- store/tikv/2pc.go | 2 +- store/tikv/rawkv.go | 4 ++-- store/tikv/region_cache.go | 6 +++++- store/tikv/scan_test.go | 2 ++ store/tikv/snapshot.go | 2 +- store/tikv/split_region.go | 13 +++++++++++-- store/tikv/split_test.go | 2 +- 7 files changed, 23 insertions(+), 8 deletions(-) diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index 05ba781a50ba8..4f3715ee11014 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -331,7 +331,7 @@ func (c *twoPhaseCommitter) doActionOnKeys(bo *Backoffer, action twoPhaseCommitA if len(keys) == 0 { return nil } - groups, firstRegion, err := c.store.regionCache.GroupKeysByRegion(bo, keys) + groups, firstRegion, err := c.store.regionCache.GroupKeysByRegion(bo, keys, nil) if err != nil { return errors.Trace(err) } diff --git a/store/tikv/rawkv.go b/store/tikv/rawkv.go index c510dddba9d9f..c85489d7a3ea8 100644 --- a/store/tikv/rawkv.go +++ b/store/tikv/rawkv.go @@ -395,7 +395,7 @@ func (c *RawKVClient) sendReq(key []byte, req *tikvrpc.Request, reverse bool) (* } func (c *RawKVClient) sendBatchReq(bo *Backoffer, keys [][]byte, cmdType tikvrpc.CmdType) (*tikvrpc.Response, error) { // split the keys - groups, _, err := c.regionCache.GroupKeysByRegion(bo, keys) + groups, _, err := c.regionCache.GroupKeysByRegion(bo, keys, nil) if err != nil { return nil, errors.Trace(err) } @@ -544,7 +544,7 @@ func (c *RawKVClient) sendBatchPut(bo *Backoffer, keys, values [][]byte) error { for i, key := range keys { keyToValue[string(key)] = values[i] } - groups, _, err := c.regionCache.GroupKeysByRegion(bo, keys) + groups, _, err := c.regionCache.GroupKeysByRegion(bo, keys, nil) if err != nil { return errors.Trace(err) } diff --git a/store/tikv/region_cache.go b/store/tikv/region_cache.go index b6e312bd76377..3f1e60276d734 100644 --- a/store/tikv/region_cache.go +++ b/store/tikv/region_cache.go @@ -473,7 +473,8 @@ func (c *RegionCache) LocateRegionByID(bo *Backoffer, regionID uint64) (*KeyLoca // GroupKeysByRegion separates keys into groups by their belonging Regions. // Specially it also returns the first key's region which may be used as the // 'PrimaryLockKey' and should be committed ahead of others. -func (c *RegionCache) GroupKeysByRegion(bo *Backoffer, keys [][]byte) (map[RegionVerID][][]byte, RegionVerID, error) { +// filter is used to filter some unwanted keys. +func (c *RegionCache) GroupKeysByRegion(bo *Backoffer, keys [][]byte, filter func(key, regionStartKey []byte) bool) (map[RegionVerID][][]byte, RegionVerID, error) { groups := make(map[RegionVerID][][]byte) var first RegionVerID var lastLoc *KeyLocation @@ -484,6 +485,9 @@ func (c *RegionCache) GroupKeysByRegion(bo *Backoffer, keys [][]byte) (map[Regio if err != nil { return nil, first, errors.Trace(err) } + if filter != nil && filter(k, lastLoc.StartKey) { + continue + } } id := lastLoc.Region if i == 0 { diff --git a/store/tikv/scan_test.go b/store/tikv/scan_test.go index fb99bffd33454..15d196a1e210a 100644 --- a/store/tikv/scan_test.go +++ b/store/tikv/scan_test.go @@ -91,11 +91,13 @@ func (s *testScanSuite) TestScan(c *C) { c.Assert(err, IsNil) if rowNum > 123 { + fmt.Println(s.prefix, s08d("key", 123)) _, err = s.store.SplitRegions(context.Background(), [][]byte{encodeKey(s.prefix, s08d("key", 123))}, false) c.Assert(err, IsNil) } if rowNum > 456 { + fmt.Println(s.prefix, s08d("key", 456)) _, err = s.store.SplitRegions(context.Background(), [][]byte{encodeKey(s.prefix, s08d("key", 456))}, false) c.Assert(err, IsNil) } diff --git a/store/tikv/snapshot.go b/store/tikv/snapshot.go index 875b45ae45212..e13fb0674d7cb 100644 --- a/store/tikv/snapshot.go +++ b/store/tikv/snapshot.go @@ -114,7 +114,7 @@ func (s *tikvSnapshot) BatchGet(ctx context.Context, keys []kv.Key) (map[string] } func (s *tikvSnapshot) batchGetKeysByRegions(bo *Backoffer, keys [][]byte, collectF func(k, v []byte)) error { - groups, _, err := s.store.regionCache.GroupKeysByRegion(bo, keys) + groups, _, err := s.store.regionCache.GroupKeysByRegion(bo, keys, nil) if err != nil { return errors.Trace(err) } diff --git a/store/tikv/split_region.go b/store/tikv/split_region.go index e4053f6d570f3..bf1ac13d9ad26 100644 --- a/store/tikv/split_region.go +++ b/store/tikv/split_region.go @@ -31,8 +31,17 @@ import ( "go.uber.org/zap" ) +func equalRegionStartKey(key, regionStartKey []byte) bool { + if bytes.Equal(key, regionStartKey) { + return true + } + return false +} + func (s *tikvStore) splitBatchRegionsReq(bo *Backoffer, keys [][]byte, scatter bool) (*tikvrpc.Response, error) { - groups, _, err := s.regionCache.GroupKeysByRegion(bo, keys) + // equalRegionStartKey is used to filter split keys. + // If the split key is equal to the start key of the region, then the key has been split, we need to skip the split key. + groups, _, err := s.regionCache.GroupKeysByRegion(bo, keys, equalRegionStartKey) if err != nil { return nil, errors.Trace(err) } @@ -177,7 +186,7 @@ func (s *tikvStore) batchSendSingleRegion(bo *Backoffer, batch batch, scatter bo // SplitRegions splits regions by splitKeys. func (s *tikvStore) SplitRegions(ctx context.Context, splitKeys [][]byte, scatter bool) (regionIDs []uint64, err error) { - bo := NewBackoffer(ctx, int(math.Min(float64(len(splitKeys)), maxSplitRegionsBackoff))) + bo := NewBackoffer(ctx, int(math.Min(float64(len(splitKeys))*splitRegionBackoff, maxSplitRegionsBackoff))) resp, err := s.splitBatchRegionsReq(bo, splitKeys, scatter) regionIDs = make([]uint64, 0, len(splitKeys)) if resp != nil && resp.Resp != nil { diff --git a/store/tikv/split_test.go b/store/tikv/split_test.go index fe5caa718891e..1f70994501a4a 100644 --- a/store/tikv/split_test.go +++ b/store/tikv/split_test.go @@ -61,7 +61,7 @@ func (s *testSplitSuite) TestSplitBatchGet(c *C) { snapshot := newTiKVSnapshot(s.store, kv.Version{Ver: txn.StartTS()}, 0) keys := [][]byte{{'a'}, {'b'}, {'c'}} - _, region, err := s.store.regionCache.GroupKeysByRegion(s.bo, keys) + _, region, err := s.store.regionCache.GroupKeysByRegion(s.bo, keys, nil) c.Assert(err, IsNil) batch := batchKeys{ region: region, From 883d0045241d816591f0a1842c4feeb4f389d049 Mon Sep 17 00:00:00 2001 From: Lynn Date: Tue, 3 Sep 2019 13:14:19 +0800 Subject: [PATCH 12/12] store: tiny update --- store/tikv/scan_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/store/tikv/scan_test.go b/store/tikv/scan_test.go index 15d196a1e210a..fb99bffd33454 100644 --- a/store/tikv/scan_test.go +++ b/store/tikv/scan_test.go @@ -91,13 +91,11 @@ func (s *testScanSuite) TestScan(c *C) { c.Assert(err, IsNil) if rowNum > 123 { - fmt.Println(s.prefix, s08d("key", 123)) _, err = s.store.SplitRegions(context.Background(), [][]byte{encodeKey(s.prefix, s08d("key", 123))}, false) c.Assert(err, IsNil) } if rowNum > 456 { - fmt.Println(s.prefix, s08d("key", 456)) _, err = s.store.SplitRegions(context.Background(), [][]byte{encodeKey(s.prefix, s08d("key", 456))}, false) c.Assert(err, IsNil) }