diff --git a/server/core/region.go b/server/core/region.go
index 021ab5a0b27..06f4b3bd111 100644
--- a/server/core/region.go
+++ b/server/core/region.go
@@ -736,6 +736,11 @@ func (r *RegionsInfo) ScanRange(startKey []byte, limit int) []*RegionInfo {
 	return res
 }
 
+// ScanRangeWithIterator scans regions starting from startKey until iterator returns false.
+func (r *RegionsInfo) ScanRangeWithIterator(startKey []byte, iterator func(metaRegion *metapb.Region) bool) {
+	r.tree.scanRange(startKey, iterator)
+}
+
 // GetAdjacentRegions returns region's info that is adjacent with specific region
 func (r *RegionsInfo) GetAdjacentRegions(region *RegionInfo) (*RegionInfo, *RegionInfo) {
 	metaPrev, metaNext := r.tree.getAdjacentRegions(region.meta)
diff --git a/table/codec.go b/table/codec.go
index 8868bde9e1f..b25b390e432 100644
--- a/table/codec.go
+++ b/table/codec.go
@@ -21,8 +21,9 @@ import (
 )
 
 var (
-	tablePrefix = []byte{'t'}
-	metaPrefix  = []byte{'m'}
+	tablePrefix  = []byte{'t'}
+	metaPrefix   = []byte{'m'}
+	recordPrefix = []byte{'r'}
 )
 
 const (
@@ -38,7 +39,7 @@ type Key []byte
 
 // TableID returns the table ID of the key, if the key is not table key, returns 0.
 func (k Key) TableID() int64 {
-	_, key, err := decodeBytes(k)
+	_, key, err := DecodeBytes(k)
 	if err != nil {
 		// should never happen
 		return 0
@@ -54,13 +55,58 @@ func (k Key) TableID() int64 {
 
 // IsMeta returns if the key is a meta key.
 func (k Key) IsMeta() bool {
-	_, key, err := decodeBytes(k)
+	_, key, err := DecodeBytes(k)
 	if err != nil {
 		return false
 	}
 	return bytes.HasPrefix(key, metaPrefix)
 }
 
+var pads = make([]byte, encGroupSize)
+
+// EncodeBytes guarantees the encoded value is in ascending order for comparison,
+// encoding with the following rule:
+//  [group1][marker1]...[groupN][markerN]
+//  each group is an 8-byte slice, padded with 0 when the data runs out.
+//  marker is `0xFF - padding 0 count`
+// For example:
+//   [] -> [0, 0, 0, 0, 0, 0, 0, 0, 247]
+//   [1, 2, 3] -> [1, 2, 3, 0, 0, 0, 0, 0, 250]
+//   [1, 2, 3, 0] -> [1, 2, 3, 0, 0, 0, 0, 0, 251]
+//   [1, 2, 3, 4, 5, 6, 7, 8] -> [1, 2, 3, 4, 5, 6, 7, 8, 255, 0, 0, 0, 0, 0, 0, 0, 0, 247]
+// Refer: https://github.com/facebook/mysql-5.6/wiki/MyRocks-record-format#memcomparable-format
+func EncodeBytes(data []byte) Key {
+	// Allocate more space to avoid unnecessary slice growing.
+	// Assume that the byte slice size is about `(len(data) / encGroupSize + 1) * (encGroupSize + 1)` bytes,
+	// that is `(len(data) / 8 + 1) * 9` in our implementation.
+	dLen := len(data)
+	result := make([]byte, 0, (dLen/encGroupSize+1)*(encGroupSize+1))
+	for idx := 0; idx <= dLen; idx += encGroupSize {
+		remain := dLen - idx
+		padCount := 0
+		if remain >= encGroupSize {
+			result = append(result, data[idx:idx+encGroupSize]...)
+		} else {
+			padCount = encGroupSize - remain
+			result = append(result, data[idx:]...)
+			result = append(result, pads[:padCount]...)
+		}
+
+		marker := encMarker - byte(padCount)
+		result = append(result, marker)
+	}
+	return result
+}
+
+// EncodeInt appends the encoded value to slice b and returns the appended slice.
+// EncodeInt guarantees that the encoded value is in ascending order for comparison.
+func EncodeInt(b []byte, v int64) []byte {
+	var data [8]byte
+	u := encodeIntToCmpUint(v)
+	binary.BigEndian.PutUint64(data[:], u)
+	return append(b, data[:]...)
+}
+
 // DecodeInt decodes value encoded by EncodeInt before.
 // It returns the leftover un-decoded slice, decoded value if no error.
 func DecodeInt(b []byte) ([]byte, int64, error) {
@@ -74,11 +120,20 @@ func DecodeInt(b []byte) ([]byte, int64, error) {
 	return b, v, nil
 }
 
+func encodeIntToCmpUint(v int64) uint64 {
+	return uint64(v) ^ signMask
+}
+
 func decodeCmpUintToInt(u uint64) int64 {
 	return int64(u ^ signMask)
 }
 
-func decodeBytes(b []byte) ([]byte, []byte, error) {
+// DecodeBytes decodes bytes previously encoded by EncodeBytes and returns
+// the leftover bytes and the decoded value if there is no error.
+func DecodeBytes(b []byte) ([]byte, []byte, error) {
+	if len(b) == 0 {
+		return b, nil, nil
+	}
 	data := make([]byte, 0, len(b))
 	for {
 		if len(b) < encGroupSize+1 {
@@ -112,3 +167,21 @@ func decodeBytes(b []byte) ([]byte, []byte, error) {
 	}
 	return b, data, nil
 }
+
+// GenerateTableKey generates a table split key: tablePrefix followed by the EncodeInt-encoded table ID.
+func GenerateTableKey(tableID int64) []byte {
+	buf := make([]byte, 0, len(tablePrefix)+8)
+	buf = append(buf, tablePrefix...)
+	buf = EncodeInt(buf, tableID)
+	return buf
+}
+
+// GenerateRowKey generates a row key: the table key followed by recordPrefix and the EncodeInt-encoded row ID.
+func GenerateRowKey(tableID, rowID int64) []byte {
+	buf := make([]byte, 0, len(tablePrefix)+len(recordPrefix)+8*2)
+	buf = append(buf, tablePrefix...)
+	buf = EncodeInt(buf, tableID)
+	buf = append(buf, recordPrefix...)
+	buf = EncodeInt(buf, rowID)
+	return buf
+}
diff --git a/table/codec_test.go b/table/codec_test.go
index 1224a51cdd0..9eefdb4ed88 100644
--- a/table/codec_test.go
+++ b/table/codec_test.go
@@ -23,57 +23,32 @@ func TestTable(t *testing.T) {
 	TestingT(t)
 }
 
-var pads = make([]byte, encGroupSize)
-
 var _ = Suite(&testCodecSuite{})
 
 type testCodecSuite struct{}
 
-func encodeBytes(data []byte) Key {
-	// Allocate more space to avoid unnecessary slice growing.
-	// Assume that the byte slice size is about `(len(data) / encGroupSize + 1) * (encGroupSize + 1)` bytes,
-	// that is `(len(data) / 8 + 1) * 9` in our implement.
-	dLen := len(data)
-	result := make([]byte, 0, (dLen/encGroupSize+1)*(encGroupSize+1))
-	for idx := 0; idx <= dLen; idx += encGroupSize {
-		remain := dLen - idx
-		padCount := 0
-		if remain >= encGroupSize {
-			result = append(result, data[idx:idx+encGroupSize]...)
-		} else {
-			padCount = encGroupSize - remain
-			result = append(result, data[idx:]...)
-			result = append(result, pads[:padCount]...)
-		}
-
-		marker := encMarker - byte(padCount)
-		result = append(result, marker)
-	}
-	return result
-}
-
 func (s *testCodecSuite) TestDecodeBytes(c *C) {
 	key := "abcdefghijklmnopqrstuvwxyz"
 	for i := 0; i < len(key); i++ {
-		_, k, err := decodeBytes(encodeBytes([]byte(key[:i])))
+		_, k, err := DecodeBytes(EncodeBytes([]byte(key[:i])))
 		c.Assert(err, IsNil)
 		c.Assert(string(k), Equals, key[:i])
 	}
 }
 
 func (s *testCodecSuite) TestTableID(c *C) {
-	key := encodeBytes([]byte("t\x80\x00\x00\x00\x00\x00\x00\xff"))
+	key := EncodeBytes([]byte("t\x80\x00\x00\x00\x00\x00\x00\xff"))
 	c.Assert(key.TableID(), Equals, int64(0xff))
 
-	key = encodeBytes([]byte("t\x80\x00\x00\x00\x00\x00\x00\xff_i\x01\x02"))
+	key = EncodeBytes([]byte("t\x80\x00\x00\x00\x00\x00\x00\xff_i\x01\x02"))
 	c.Assert(key.TableID(), Equals, int64(0xff))
 
 	key = []byte("t\x80\x00\x00\x00\x00\x00\x00\xff")
 	c.Assert(key.TableID(), Equals, int64(0))
 
-	key = encodeBytes([]byte("T\x00\x00\x00\x00\x00\x00\x00\xff"))
+	key = EncodeBytes([]byte("T\x00\x00\x00\x00\x00\x00\x00\xff"))
 	c.Assert(key.TableID(), Equals, int64(0))
 
-	key = encodeBytes([]byte("t\x80\x00\x00\x00\x00\x00\xff"))
+	key = EncodeBytes([]byte("t\x80\x00\x00\x00\x00\x00\xff"))
 	c.Assert(key.TableID(), Equals, int64(0))
 }
diff --git a/table/namespace_classifier_test.go b/table/namespace_classifier_test.go
index 73fcfceb1bf..410c82c8ac4 100644
--- a/table/namespace_classifier_test.go
+++ b/table/namespace_classifier_test.go
@@ -112,14 +112,14 @@ func (s *testTableNamespaceSuite) TestTableNameSpaceGetRegionNamespace(c *C) {
 		{false, "t\x80\x00\x00\x00\x00\x00\x00\x03", "t\x80\x00\x00\x00\x00\x00\x00\x04", 3, false, "global"},
 		{false, "m\x80\x00\x00\x00\x00\x00\x00\x01", "", 0, true, "ns2"},
 		{false, "", "m\x80\x00\x00\x00\x00\x00\x00\x01", 0, false, "global"},
-		{true, string(encodeBytes([]byte("t\x80\x00\x00\x00\x00\x00\x00\x01"))), "", testTable1, false, "ns1"},
+		{true, string(EncodeBytes([]byte("t\x80\x00\x00\x00\x00\x00\x00\x01"))), "", testTable1, false, "ns1"},
 		{true, "t\x80\x00\x00\x00\x00\x00\x00\x01", "", 0, false, "global"}, // decode error
 	}
 	classifier := s.newClassifier(c)
 	for _, t := range testCases {
 		startKey, endKey := Key(t.startKey), Key(t.endKey)
 		if !t.endcoded {
-			startKey, endKey = encodeBytes(startKey), encodeBytes(endKey)
+			startKey, endKey = EncodeBytes(startKey), EncodeBytes(endKey)
 		}
 		c.Assert(startKey.TableID(), Equals, t.tableID)
 		c.Assert(startKey.IsMeta(), Equals, t.isMeta)
diff --git a/tools/pd-simulator/simulator/cases/cases.go b/tools/pd-simulator/simulator/cases/cases.go
index b354f0a42c0..95bc282827a 100644
--- a/tools/pd-simulator/simulator/cases/cases.go
+++ b/tools/pd-simulator/simulator/cases/cases.go
@@ -50,6 +50,7 @@ type Case struct {
 	RegionSplitSize int64
 	RegionSplitKeys int64
 	Events          []EventInner
+	TableNumber     int
 
 	Checker CheckerFunc // To check the schedule is finished.
 }
@@ -83,6 +84,7 @@ var CaseMap = map[string]func() *Case{
 	"hot-read":             newHotRead,
 	"hot-write":            newHotWrite,
 	"makeup-down-replicas": newMakeupDownReplicas,
+	"import-data":          newImportData,
 }
 
 // NewCase creates a new case.
diff --git a/tools/pd-simulator/simulator/cases/import_data.go b/tools/pd-simulator/simulator/cases/import_data.go
new file mode 100644
index 00000000000..606969117a4
--- /dev/null
+++ b/tools/pd-simulator/simulator/cases/import_data.go
@@ -0,0 +1,131 @@
+// Copyright 2018 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package cases
+
+import (
+	"bytes"
+	"fmt"
+	"math/rand"
+	"time"
+
+	"github.com/pingcap/kvproto/pkg/metapb"
+	"github.com/pingcap/pd/server/core"
+	"github.com/pingcap/pd/table"
+	"github.com/pingcap/pd/tools/pd-simulator/simulator/simutil"
+)
+
+// newImportData builds the "import-data" simulation case: 10 stores and 30
+// regions with three peers each on randomly chosen stores, and sets
+// TableNumber to 10. Its write-flow event reports 4 MB, 1 MB and 16 MB per
+// step for the encoded keys of tables 2, 3 and 5. The checker logs how the
+// leaders and peers of the regions scanned from the table 2 key are spread
+// over the stores and returns true once a minute has passed since creation.
+func newImportData() *Case {
+	var simCase Case
+	var id idAllocator
+	// Initialize the cluster
+	for i := 1; i <= 10; i++ {
+		simCase.Stores = append(simCase.Stores, &Store{
+			ID:        id.nextID(),
+			Status:    metapb.StoreState_Up,
+			Capacity:  1 * TB,
+			Available: 900 * GB,
+			Version:   "2.1.0",
+		})
+	}
+
+	for i := 0; i < 30; i++ {
+		storeIDs := rand.Perm(10)
+		peers := []*metapb.Peer{
+			{Id: id.nextID(), StoreId: uint64(storeIDs[0] + 1)},
+			{Id: id.nextID(), StoreId: uint64(storeIDs[1] + 1)},
+			{Id: id.nextID(), StoreId: uint64(storeIDs[2] + 1)},
+		}
+		simCase.Regions = append(simCase.Regions, Region{
+			ID:     id.nextID(),
+			Peers:  peers,
+			Leader: peers[0],
+			Size:   32 * MB,
+			Keys:   320000,
+		})
+	}
+	simCase.MaxID = id.maxID
+	simCase.RegionSplitSize = 64 * MB
+	simCase.RegionSplitKeys = 640000
+	simCase.TableNumber = 10
+	// Events description
+	e := &WriteFlowOnSpotInner{}
+	table2 := string(table.EncodeBytes(table.GenerateTableKey(2)))
+	table3 := string(table.EncodeBytes(table.GenerateTableKey(3)))
+	table5 := string(table.EncodeBytes(table.GenerateTableKey(5)))
+	e.Step = func(tick int64) map[string]int64 {
+		return map[string]int64{
+			table2: 4 * MB,
+			table3: 1 * MB,
+			table5: 16 * MB,
+		}
+	}
+	simCase.Events = []EventInner{e}
+
+	// Checker description
+	startTime := time.Now()
+	simCase.Checker = func(regions *core.RegionsInfo) bool {
+		leaderDstb := make(map[uint64]int)
+		peerDstb := make(map[uint64]int)
+		leaderTotal := 0
+		peerTotal := 0
+		res := make([]*core.RegionInfo, 0, 100)
+		// Collect the regions from the table 2 key onwards whose end key is
+		// still below the table 3 key.
+		regions.ScanRangeWithIterator([]byte(table2), func(region *metapb.Region) bool {
+			if bytes.Compare(region.EndKey, []byte(table3)) < 0 {
+				res = append(res, regions.GetRegion(region.GetId()))
+				return true
+			}
+			return false
+		})
+
+		for _, r := range res {
+			leaderDstb[r.GetLeader().GetStoreId()]++
+			leaderTotal++
+			for _, p := range r.GetPeers() {
+				peerDstb[p.GetStoreId()]++
+				peerTotal++
+			}
+		}
+		if leaderTotal == 0 || peerTotal == 0 {
+			// No region matched; report the case as not finished.
+			simutil.Logger.Info("scan zero region")
+			return false
+		}
+		leaderLog := "leader distribute (table2)"
+		peerLog := "peer distribute (table2)"
+		for storeID := 1; storeID <= 10; storeID++ {
+			if leaderCount, ok := leaderDstb[uint64(storeID)]; ok {
+				leaderLog = fmt.Sprintf("%s [store %d]:%.2f%%", leaderLog, storeID, float64(leaderCount)/float64(leaderTotal)*100)
+			}
+		}
+		for storeID := 1; storeID <= 10; storeID++ {
+			if peerCount, ok := peerDstb[uint64(storeID)]; ok {
+				peerLog = fmt.Sprintf("%s [store %d]:%.2f%%", peerLog, storeID, float64(peerCount)/float64(peerTotal)*100)
+			}
+		}
+
+		simutil.Logger.Info(leaderLog)
+		simutil.Logger.Info(peerLog)
+		// Finish one minute after the case was created.
+		return startTime.Add(time.Minute).Before(time.Now())
+	}
+	return &simCase
+}
diff --git a/tools/pd-simulator/simulator/node.go b/tools/pd-simulator/simulator/node.go
index c2c5814ea3a..1d8e01c756a 100644
--- a/tools/pd-simulator/simulator/node.go
+++ b/tools/pd-simulator/simulator/node.go
@@ -130,7 +130,7 @@ func (n *Node) stepTask() {
 	for _, task := range n.tasks {
 		task.Step(n.raftEngine)
 		if task.IsFinished() {
-			simutil.Logger.Infof("[store %d][region %d] task finished: %s final: %v", n.GetId(), task.RegionID(), task.Desc(), n.raftEngine.GetRegion(task.RegionID()))
+			simutil.Logger.Debugf("[store %d][region %d] task finished: %s final: %+v", n.GetId(), task.RegionID(), task.Desc(), n.raftEngine.GetRegion(task.RegionID()).GetMeta())
 			delete(n.tasks, task.RegionID())
} } diff --git a/tools/pd-simulator/simulator/raft.go b/tools/pd-simulator/simulator/raft.go index ba7c9fe48c8..4f5f130625a 100644 --- a/tools/pd-simulator/simulator/raft.go +++ b/tools/pd-simulator/simulator/raft.go @@ -14,6 +14,7 @@ package simulator import ( + "bytes" "context" "math/rand" "sort" @@ -21,6 +22,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/pd/server/core" + "github.com/pingcap/pd/table" "github.com/pingcap/pd/tools/pd-simulator/simulator/cases" "github.com/pingcap/pd/tools/pd-simulator/simulator/simutil" "github.com/pkg/errors" @@ -29,13 +31,14 @@ import ( // RaftEngine records all raft infomations. type RaftEngine struct { sync.RWMutex - regionsInfo *core.RegionsInfo - conn *Connection - regionChange map[uint64][]uint64 - schedulerStats *schedulerStatistics - regionSplitSize int64 - regionSplitKeys int64 - storeConfig *SimConfig + regionsInfo *core.RegionsInfo + conn *Connection + regionChange map[uint64][]uint64 + schedulerStats *schedulerStatistics + regionSplitSize int64 + regionSplitKeys int64 + storeConfig *SimConfig + useTiDBEncodeKey bool } // NewRaftEngine creates the initialized raft with the configuration. 
@@ -49,8 +52,14 @@ func NewRaftEngine(conf *cases.Case, conn *Connection, storeConfig *SimConfig) * regionSplitKeys: conf.RegionSplitKeys, storeConfig: storeConfig, } + var splitKeys []string + if conf.TableNumber > 0 { + splitKeys = generateTableKeys(conf.TableNumber, len(conf.Regions)-1) + r.useTiDBEncodeKey = true + } else { + splitKeys = generateKeys(len(conf.Regions) - 1) + } - splitKeys := generateKeys(len(conf.Regions) - 1) for i, region := range conf.Regions { meta := &metapb.Region{ Id: region.ID, @@ -125,7 +134,12 @@ func (r *RaftEngine) stepSplit(region *core.RegionInfo) { } } - splitKey := generateSplitKey(region.GetStartKey(), region.GetEndKey()) + var splitKey []byte + if r.useTiDBEncodeKey { + splitKey = generateMvccSplitKey(region.GetStartKey(), region.GetEndKey()) + } else { + splitKey = generateSplitKey(region.GetStartKey(), region.GetEndKey()) + } left := region.Clone( core.WithNewRegionID(ids[len(ids)-1]), core.WithNewPeerIds(ids[0:len(ids)-1]...), @@ -145,6 +159,7 @@ func (r *RaftEngine) stepSplit(region *core.RegionInfo) { r.SetRegion(right) r.SetRegion(left) + simutil.Logger.Debugf("[region %d] origin: %v split to left:%v, right:%v", region.GetID(), region.GetMeta(), left.GetMeta(), right.GetMeta()) r.recordRegionChange(left) r.recordRegionChange(right) } @@ -283,6 +298,30 @@ func generateKeys(size int) []string { return v } +func generateTableKeys(tableCount, size int) []string { + v := make([]string, 0, size) + groupNumber := size / tableCount + id := 0 + for size > 0 { + id++ + tableID := id + for rowID := 0; rowID < groupNumber && size > 0; rowID++ { + if rowID == 0 { + key := table.GenerateTableKey(int64(tableID)) + key = table.EncodeBytes(key) + v = append(v, string(key)) + size-- + continue + } + key := table.GenerateRowKey(int64(tableID), int64(rowID)) + key = table.EncodeBytes(key) + v = append(v, string(key)) + size-- + } + } + return v +} + func generateSplitKey(start, end []byte) []byte { var key []byte // lessThanEnd is set as 
true when the key is already less than end key. @@ -305,3 +344,63 @@ func generateSplitKey(start, end []byte) []byte { key = append(key, ('a'+'z')/2) return key } + +func mustDecodeMvccKey(key []byte) []byte { + left, res, err := table.DecodeBytes(key) + if len(left) > 0 { + simutil.Logger.Fatalf("Decode key left some bytes: %v", key) + } + if err != nil { + simutil.Logger.Fatalf("Decode key %v meet error: %v", res, err) + } + return res +} + +func generateMvccSplitKey(start, end []byte) []byte { + start = mustDecodeMvccKey(start) + end = mustDecodeMvccKey(end) + if len(start) == 0 && len(end) == 0 { + // suppose use table key with table ID: 0 + key := table.GenerateTableKey(int64(0)) + return table.EncodeBytes(key) + } + if len(end) == 0 { + end = make([]byte, len(start)) + } else if len(start) < len(end) { + pad := make([]byte, len(end)-len(start)) + start = append(start, pad...) + } else if len(end) < len(start) { + pad := make([]byte, len(start)-len(end)) + end = append(end, pad...) + } + borrow := false + var ( + d uint16 + e uint16 + ) + res := make([]byte, len(start)) + for i := 0; i < len(start); i++ { + s := start[i] + e = uint16(end[i]) + if borrow { + e += 0x100 + } + borrow = (uint16(s)+e)%2 != 0 + d = (uint16(s) + e) / 2 + if d >= 0x100 { + d -= 0x100 + for j := i - 1; j >= 0; j-- { + res[j]++ + if res[j] != 0 { + break + } + } + } + res[i] = byte(d) + } + if bytes.Equal(res, start) { + res = append(res, 0) + } + + return table.EncodeBytes(res) +} diff --git a/tools/pd-simulator/simulator/raft_test.go b/tools/pd-simulator/simulator/raft_test.go new file mode 100644 index 00000000000..0df0d7c045e --- /dev/null +++ b/tools/pd-simulator/simulator/raft_test.go @@ -0,0 +1,51 @@ +// Copyright 2017 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package simulator + +import ( + "bytes" + "strings" + "testing" +) + +func TestGenerateTableKeys(t *testing.T) { + tableCount := 3 + size := 10 + keys := generateTableKeys(tableCount, size) + if len(keys) != size { + t.Fatalf("keys length %d, expected %d", len(keys), size) + } + for i := 1; i < len(keys); i++ { + if strings.Compare(keys[i-1], keys[i]) >= 0 { + t.Fatal("not an increamental sequence") + } + splitKey := string(generateMvccSplitKey([]byte(keys[i-1]), []byte(keys[i]))) + if strings.Compare(keys[i-1], splitKey) >= 0 { + t.Fatalf("not expected split key") + } + if strings.Compare(splitKey, keys[i]) >= 0 { + t.Fatalf("not expected split key") + } + } + // empty key + startKey := []byte("") + endKey := []byte{116, 128, 0, 0, 0, 0, 0, 0, 255, 1, 0, 0, 0, 0, 0, 0, 0, 248} + splitKey := generateMvccSplitKey(startKey, endKey) + if bytes.Compare(startKey, splitKey) >= 0 { + t.Fatalf("not expected split key") + } + if bytes.Compare(splitKey, endKey) >= 0 { + t.Fatalf("not expected split key") + } +}