Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: support a region divided into multiple regions #11739

Merged
merged 15 commits into from
Sep 3, 2019
Merged
40 changes: 22 additions & 18 deletions ddl/split_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package ddl

import (
"context"

"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/tablecodec"
Expand Down Expand Up @@ -68,19 +70,20 @@ func splitPreSplitedTable(store kv.SplitableStore, tbInfo *model.TableInfo, scat
// And the max _tidb_rowid is 9223372036854775807, it won't be negative number.

// Split table region.
regionIDs := make([]uint64, 0, 1<<(tbInfo.PreSplitRegions)+len(tbInfo.Indices))
step := int64(1 << (tbInfo.ShardRowIDBits - tbInfo.PreSplitRegions))
max := int64(1 << tbInfo.ShardRowIDBits)
splitTableKeys := make([][]byte, 0, 1<<(tbInfo.PreSplitRegions))
for p := int64(step); p < max; p += step {
recordID := p << (64 - tbInfo.ShardRowIDBits - 1)
recordPrefix := tablecodec.GenTableRecordPrefix(tbInfo.ID)
key := tablecodec.EncodeRecordKey(recordPrefix, recordID)
regionID, err := store.SplitRegion(key, scatter)
if err != nil {
logutil.BgLogger().Warn("[ddl] pre split table region failed", zap.Int64("recordID", recordID), zap.Error(err))
} else {
regionIDs = append(regionIDs, regionID)
}
splitTableKeys = append(splitTableKeys, key)
}
var err error
regionIDs, err := store.SplitRegions(context.Background(), splitTableKeys, scatter)
if err != nil {
logutil.BgLogger().Warn("[ddl] pre split some table regions failed",
zap.Stringer("table", tbInfo.Name), zap.Int("successful region count", len(regionIDs)), zap.Error(err))
}
regionIDs = append(regionIDs, splitIndexRegion(store, tbInfo, scatter)...)
if scatter {
Expand All @@ -90,26 +93,27 @@ func splitPreSplitedTable(store kv.SplitableStore, tbInfo *model.TableInfo, scat

func splitRecordRegion(store kv.SplitableStore, tableID int64, scatter bool) uint64 {
tableStartKey := tablecodec.GenTablePrefix(tableID)
regionID, err := store.SplitRegion(tableStartKey, scatter)
regionIDs, err := store.SplitRegions(context.Background(), [][]byte{tableStartKey}, scatter)
if err != nil {
// It will be automatically split by TiKV later.
logutil.BgLogger().Warn("[ddl] split table region failed", zap.Error(err))
}
return regionID
if len(regionIDs) == 1 {
return regionIDs[0]
}
return 0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we return an error here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's the original code and several functions on the upper layer also did not return an error message.

}

func splitIndexRegion(store kv.SplitableStore, tblInfo *model.TableInfo, scatter bool) []uint64 {
regionIDs := make([]uint64, 0, len(tblInfo.Indices))
splitKeys := make([][]byte, 0, len(tblInfo.Indices))
for _, idx := range tblInfo.Indices {
indexPrefix := tablecodec.EncodeTableIndexPrefix(tblInfo.ID, idx.ID)
regionID, err := store.SplitRegion(indexPrefix, scatter)
if err != nil {
logutil.BgLogger().Warn("[ddl] pre split table index region failed",
zap.Stringer("table", tblInfo.Name),
zap.Stringer("index", idx.Name),
zap.Error(err))
}
regionIDs = append(regionIDs, regionID)
splitKeys = append(splitKeys, indexPrefix)
}
regionIDs, err := store.SplitRegions(context.Background(), splitKeys, scatter)
if err != nil {
logutil.BgLogger().Warn("[ddl] pre split some table index regions failed",
zap.Stringer("table", tblInfo.Name), zap.Int("successful region count", len(regionIDs)), zap.Error(err))
}
return regionIDs
}
Expand Down
4 changes: 2 additions & 2 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2143,7 +2143,7 @@ func (s *testSuiteP1) TestBatchPointGetRepeatableRead(c *C) {
}

func (s *testSuite4) TestSplitRegionTimeout(c *C) {
c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/mockSplitRegionTimeout", `return(true)`), IsNil)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/MockSplitRegionTimeout", `return(true)`), IsNil)
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
Expand All @@ -2152,7 +2152,7 @@ func (s *testSuite4) TestSplitRegionTimeout(c *C) {
tk.MustExec(`set @@tidb_wait_split_region_timeout=1`)
// result 0 0 means split 0 region and 0 region finish scatter regions before timeout.
tk.MustQuery(`split table t between (0) and (10000) regions 10`).Check(testkit.Rows("0 0"))
c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/mockSplitRegionTimeout"), IsNil)
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/MockSplitRegionTimeout"), IsNil)
}

func (s *testSuiteP1) TestRow(c *C) {
Expand Down
61 changes: 19 additions & 42 deletions executor/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

"github.com/cznic/mathutil"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
Expand Down Expand Up @@ -93,27 +92,18 @@ func (e *SplitIndexRegionExec) splitIndexRegion(ctx context.Context) error {
start := time.Now()
ctxWithTimeout, cancel := context.WithTimeout(ctx, e.ctx.GetSessionVars().GetSplitRegionTimeout())
defer cancel()
regionIDs := make([]uint64, 0, len(splitIdxKeys))
for _, idxKey := range splitIdxKeys {
if isCtxDone(ctxWithTimeout) {
break
}

regionID, err := s.SplitRegion(idxKey, true)
if err != nil {
logutil.BgLogger().Warn("split table index region failed",
zap.String("table", e.tableInfo.Name.L),
zap.String("index", e.indexInfo.Name.L),
zap.Error(err))
continue
}
if regionID == 0 {
continue
}
regionIDs = append(regionIDs, regionID)

regionIDs, err := s.SplitRegions(context.Background(), splitIdxKeys, true)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not using ctxWithTimeout here? IMHO a timeout is still needed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original code hasn't and SplitRegions has itself timeout, so I don't add it.
If you feel the need, is there a suggested value?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, never mind

if err != nil {
logutil.BgLogger().Warn("split table index region failed",
zap.String("table", e.tableInfo.Name.L),
zap.String("index", e.indexInfo.Name.L),
zap.Error(err))
}
e.splitRegions = len(regionIDs)
if e.splitRegions == 0 {
return nil
}

if !e.ctx.GetSessionVars().WaitSplitRegionFinish {
return nil
}
Expand Down Expand Up @@ -283,30 +273,17 @@ func (e *SplitTableRegionExec) splitTableRegion(ctx context.Context) error {
if err != nil {
return err
}
regionIDs := make([]uint64, 0, len(splitKeys))
for _, key := range splitKeys {
failpoint.Inject("mockSplitRegionTimeout", func(val failpoint.Value) {
if val.(bool) {
time.Sleep(time.Second*1 + time.Millisecond*10)
}
})
if isCtxDone(ctxWithTimeout) {
break
}
regionID, err := s.SplitRegion(key, true)
if err != nil {
logutil.BgLogger().Warn("split table region failed",
zap.String("table", e.tableInfo.Name.L),
zap.Error(err))
continue
}
if regionID == 0 {
continue
}
regionIDs = append(regionIDs, regionID)

regionIDs, err := s.SplitRegions(ctxWithTimeout, splitKeys, true)
if err != nil {
logutil.BgLogger().Warn("split table region failed",
zap.String("table", e.tableInfo.Name.L),
zap.Error(err))
}
e.splitRegions = len(regionIDs)
if e.splitRegions == 0 {
return nil
}

if !e.ctx.GetSessionVars().WaitSplitRegionFinish {
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ require (
github.com/pingcap/errors v0.11.4
github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e
github.com/pingcap/kvproto v0.0.0-20190724165112-ec9df5f208a7
github.com/pingcap/kvproto v0.0.0-20190821201150-798d27658fae
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596
github.com/pingcap/parser v0.0.0-20190819021501-c5d6ce829420
github.com/pingcap/pd v0.0.0-20190712044914-75a1f9f3062b
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,8 @@ github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c/go.mod h1:DNS3Qg
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rGrobssy1nVy2VaVpNCuLpCbr+FEaTA8=
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20190516013202-4cf58ad90b6c/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20190724165112-ec9df5f208a7 h1:BMrtxXqQeZ9y27LN/V3PHA/tSyDWHK+90VLYaymrXQE=
github.com/pingcap/kvproto v0.0.0-20190724165112-ec9df5f208a7/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20190821201150-798d27658fae h1:WR4d5ga8zXT+QDWYFzzyA+PJMMszR0kQxyYMh6dvHPg=
github.com/pingcap/kvproto v0.0.0-20190821201150-798d27658fae/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/log v0.0.0-20190214045112-b37da76f67a7/go.mod h1:xsfkWVaFVV5B8e1K9seWfyJWFrIhbtUTAD8NV1Pq3+w=
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596 h1:t2OQTpPJnrPDGlvA+3FwJptMTt6MEPdzK1Wt99oaefQ=
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw=
Expand Down
2 changes: 1 addition & 1 deletion kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ type Iterator interface {

// SplitableStore is the kv store which supports split regions.
type SplitableStore interface {
SplitRegion(splitKey Key, scatter bool) (regionID uint64, err error)
SplitRegions(ctx context.Context, splitKey [][]byte, scatter bool) (regionID []uint64, err error)
WaitScatterRegionFinish(regionID uint64, backOff int) error
CheckRegionInScattering(regionID uint64) (bool, error)
}
25 changes: 17 additions & 8 deletions store/mockstore/mocktikv/rpc.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -604,14 +604,23 @@ func (h *rpcHandler) handleKvRawScan(req *kvrpcpb.RawScanRequest) *kvrpcpb.RawSc
}

func (h *rpcHandler) handleSplitRegion(req *kvrpcpb.SplitRegionRequest) *kvrpcpb.SplitRegionResponse {
key := NewMvccKey(req.GetSplitKey())
region, _ := h.cluster.GetRegionByKey(key)
if bytes.Equal(region.GetStartKey(), key) {
return &kvrpcpb.SplitRegionResponse{}
}
newRegionID, newPeerIDs := h.cluster.AllocID(), h.cluster.AllocIDs(len(region.Peers))
newRegion := h.cluster.SplitRaw(region.GetId(), newRegionID, key, newPeerIDs, newPeerIDs[0])
return &kvrpcpb.SplitRegionResponse{Left: newRegion.Meta}
keys := req.GetSplitKeys()
resp := &kvrpcpb.SplitRegionResponse{Regions: make([]*metapb.Region, 0, len(keys)+1)}
for i, key := range keys {
k := NewMvccKey(key)
region, _ := h.cluster.GetRegionByKey(k)
if bytes.Equal(region.GetStartKey(), key) {
continue
}
if i == 0 {
// Set the leftmost region.
resp.Regions = append(resp.Regions, region)
}
newRegionID, newPeerIDs := h.cluster.AllocID(), h.cluster.AllocIDs(len(region.Peers))
newRegion := h.cluster.SplitRaw(region.GetId(), newRegionID, k, newPeerIDs, newPeerIDs[0])
resp.Regions = append(resp.Regions, newRegion.Meta)
}
return resp
}

// RPCClient sends kv RPC calls to mock cluster. RPCClient mocks the behavior of
Expand Down
4 changes: 2 additions & 2 deletions store/tikv/scan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,12 @@ func (s *testScanSuite) TestScan(c *C) {
c.Assert(err, IsNil)

if rowNum > 123 {
_, err = s.store.SplitRegion(encodeKey(s.prefix, s08d("key", 123)), false)
_, err = s.store.SplitRegions(context.Background(), [][]byte{encodeKey(s.prefix, s08d("key", 123))}, false)
c.Assert(err, IsNil)
}

if rowNum > 456 {
_, err = s.store.SplitRegion(encodeKey(s.prefix, s08d("key", 456)), false)
_, err = s.store.SplitRegions(context.Background(), [][]byte{encodeKey(s.prefix, s08d("key", 456))}, false)
c.Assert(err, IsNil)
}

Expand Down
Loading