Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

simulator: add case about import data #1263

Merged
merged 12 commits into from
Jan 17, 2019
5 changes: 5 additions & 0 deletions server/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -742,6 +742,11 @@ func (r *RegionsInfo) ScanRange(startKey []byte, limit int) []*RegionInfo {
return res
}

// ScanRangeWithIterator scans regions beginning at startKey and hands each one
// to iterator; the scan stops as soon as iterator returns false.
// The work is delegated to the region tree's scanRange.
func (r *RegionsInfo) ScanRangeWithIterator(startKey []byte, iterator func(metaRegion *metapb.Region) bool) {
r.tree.scanRange(startKey, iterator)
}

// GetAdjacentRegions returns region's info that is adjacent with specific region
func (r *RegionsInfo) GetAdjacentRegions(region *RegionInfo) (*RegionInfo, *RegionInfo) {
metaPrev, metaNext := r.tree.getAdjacentRegions(region.meta)
Expand Down
81 changes: 76 additions & 5 deletions table/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ import (
)

var (
tablePrefix = []byte{'t'}
metaPrefix = []byte{'m'}
tablePrefix = []byte{'t'}
metaPrefix = []byte{'m'}
recordPrefix = []byte{'r'}
)

const (
Expand All @@ -38,7 +39,7 @@ type Key []byte

// TableID returns the table ID of the key. If the key is not a table key, it returns 0.
func (k Key) TableID() int64 {
_, key, err := decodeBytes(k)
_, key, err := DecodeBytes(k)
if err != nil {
// should never happen
return 0
Expand All @@ -54,13 +55,58 @@ func (k Key) TableID() int64 {

// IsMeta reports whether the key is a meta key.
func (k Key) IsMeta() bool {
_, key, err := decodeBytes(k)
_, key, err := DecodeBytes(k)
if err != nil {
return false
}
return bytes.HasPrefix(key, metaPrefix)
}

var pads = make([]byte, encGroupSize)

// EncodeBytes encodes data so that the encoded values sort in the same
// ascending order as the raw inputs when compared byte by byte.
// The output is a sequence of [group][marker] pairs:
//   [group1][marker1]...[groupN][markerN]
// Each group is an 8-byte chunk of data, right-padded with zeros when the
// remaining input is shorter than 8 bytes.
// Each marker is `0xFF - number of padding zeros` in its group.
// The last group always carries at least one padding byte, so input whose
// length is a multiple of 8 is followed by one extra all-zero group.
// For example:
//   [] -> [0, 0, 0, 0, 0, 0, 0, 0, 247]
//   [1, 2, 3] -> [1, 2, 3, 0, 0, 0, 0, 0, 250]
//   [1, 2, 3, 0] -> [1, 2, 3, 0, 0, 0, 0, 0, 251]
//   [1, 2, 3, 4, 5, 6, 7, 8] -> [1, 2, 3, 4, 5, 6, 7, 8, 255, 0, 0, 0, 0, 0, 0, 0, 0, 247]
// Refer: https://github.com/facebook/mysql-5.6/wiki/MyRocks-record-format#memcomparable-format
func EncodeBytes(data []byte) Key {
	// One group is emitted per loop iteration, and the loop runs
	// len(data)/encGroupSize + 1 times, so reserve room for all of them up
	// front to avoid growing the slice.
	groupCount := len(data)/encGroupSize + 1
	encoded := make([]byte, 0, groupCount*(encGroupSize+1))

	for offset := 0; offset <= len(data); offset += encGroupSize {
		// Take at most one group's worth of the remaining input.
		chunk := data[offset:]
		if len(chunk) > encGroupSize {
			chunk = chunk[:encGroupSize]
		}
		padding := encGroupSize - len(chunk)

		encoded = append(encoded, chunk...)
		encoded = append(encoded, pads[:padding]...)
		encoded = append(encoded, encMarker-byte(padding))
	}
	return encoded
}

// EncodeInt converts v with encodeIntToCmpUint, writes the result as 8
// big-endian bytes, appends those bytes to b and returns the extended slice.
// The encoded value is in ascending order for comparison.
func EncodeInt(b []byte, v int64) []byte {
	var buf [8]byte
	binary.BigEndian.PutUint64(buf[:], encodeIntToCmpUint(v))
	return append(b, buf[:]...)
}

// DecodeInt decodes value encoded by EncodeInt before.
// It returns the leftover un-decoded slice, decoded value if no error.
func DecodeInt(b []byte) ([]byte, int64, error) {
Expand All @@ -74,11 +120,18 @@ func DecodeInt(b []byte) ([]byte, int64, error) {
return b, v, nil
}

// encodeIntToCmpUint reinterprets v as a uint64 and XORs it with signMask.
// NOTE(review): signMask is declared outside this view; presumably it is the
// sign bit, which would make the result order-preserving for signed values.
func encodeIntToCmpUint(v int64) uint64 {
	bits := uint64(v)
	return bits ^ signMask
}

// decodeCmpUintToInt is the inverse of encodeIntToCmpUint: it XORs u with
// signMask and reinterprets the result as an int64.
func decodeCmpUintToInt(u uint64) int64 {
	bits := u ^ signMask
	return int64(bits)
}

func decodeBytes(b []byte) ([]byte, []byte, error) {
// DecodeBytes decodes bytes that were previously encoded by EncodeBytes.
// It returns the leftover bytes and the decoded value if there is no error.
func DecodeBytes(b []byte) ([]byte, []byte, error) {

data := make([]byte, 0, len(b))
for {
if len(b) < encGroupSize+1 {
Expand Down Expand Up @@ -112,3 +165,21 @@ func decodeBytes(b []byte) ([]byte, []byte, error) {
}
return b, data, nil
}

// GenerateTableKey generates a table split key: tablePrefix followed by the
// EncodeInt encoding of tableID.
func GenerateTableKey(tableID int64) []byte {
	// Reserve the prefix plus the 8 bytes EncodeInt appends.
	key := append(make([]byte, 0, len(tablePrefix)+8), tablePrefix...)
	return EncodeInt(key, tableID)
}

// GenerateRowKey generates a row key laid out as
// tablePrefix, EncodeInt(tableID), recordPrefix, EncodeInt(rowID).
func GenerateRowKey(tableID, rowID int64) []byte {
	// Two prefixes plus two 8-byte encoded integers.
	size := len(tablePrefix) + len(recordPrefix) + 8*2
	key := make([]byte, 0, size)
	key = EncodeInt(append(key, tablePrefix...), tableID)
	key = EncodeInt(append(key, recordPrefix...), rowID)
	return key
}
35 changes: 5 additions & 30 deletions table/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,57 +23,32 @@ func TestTable(t *testing.T) {
TestingT(t)
}

var pads = make([]byte, encGroupSize)

var _ = Suite(&testCodecSuite{})

type testCodecSuite struct{}

func encodeBytes(data []byte) Key {
// Allocate more space to avoid unnecessary slice growing.
// Assume that the byte slice size is about `(len(data) / encGroupSize + 1) * (encGroupSize + 1)` bytes,
// that is `(len(data) / 8 + 1) * 9` in our implement.
dLen := len(data)
result := make([]byte, 0, (dLen/encGroupSize+1)*(encGroupSize+1))
for idx := 0; idx <= dLen; idx += encGroupSize {
remain := dLen - idx
padCount := 0
if remain >= encGroupSize {
result = append(result, data[idx:idx+encGroupSize]...)
} else {
padCount = encGroupSize - remain
result = append(result, data[idx:]...)
result = append(result, pads[:padCount]...)
}

marker := encMarker - byte(padCount)
result = append(result, marker)
}
return result
}

func (s *testCodecSuite) TestDecodeBytes(c *C) {
key := "abcdefghijklmnopqrstuvwxyz"
for i := 0; i < len(key); i++ {
_, k, err := decodeBytes(encodeBytes([]byte(key[:i])))
_, k, err := DecodeBytes(EncodeBytes([]byte(key[:i])))
c.Assert(err, IsNil)
c.Assert(string(k), Equals, key[:i])
}
}

func (s *testCodecSuite) TestTableID(c *C) {
key := encodeBytes([]byte("t\x80\x00\x00\x00\x00\x00\x00\xff"))
key := EncodeBytes([]byte("t\x80\x00\x00\x00\x00\x00\x00\xff"))
c.Assert(key.TableID(), Equals, int64(0xff))

key = encodeBytes([]byte("t\x80\x00\x00\x00\x00\x00\x00\xff_i\x01\x02"))
key = EncodeBytes([]byte("t\x80\x00\x00\x00\x00\x00\x00\xff_i\x01\x02"))
c.Assert(key.TableID(), Equals, int64(0xff))

key = []byte("t\x80\x00\x00\x00\x00\x00\x00\xff")
c.Assert(key.TableID(), Equals, int64(0))

key = encodeBytes([]byte("T\x00\x00\x00\x00\x00\x00\x00\xff"))
key = EncodeBytes([]byte("T\x00\x00\x00\x00\x00\x00\x00\xff"))
c.Assert(key.TableID(), Equals, int64(0))

key = encodeBytes([]byte("t\x80\x00\x00\x00\x00\x00\xff"))
key = EncodeBytes([]byte("t\x80\x00\x00\x00\x00\x00\xff"))
c.Assert(key.TableID(), Equals, int64(0))
}
4 changes: 2 additions & 2 deletions table/namespace_classifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,14 @@ func (s *testTableNamespaceSuite) TestTableNameSpaceGetRegionNamespace(c *C) {
{false, "t\x80\x00\x00\x00\x00\x00\x00\x03", "t\x80\x00\x00\x00\x00\x00\x00\x04", 3, false, "global"},
{false, "m\x80\x00\x00\x00\x00\x00\x00\x01", "", 0, true, "ns2"},
{false, "", "m\x80\x00\x00\x00\x00\x00\x00\x01", 0, false, "global"},
{true, string(encodeBytes([]byte("t\x80\x00\x00\x00\x00\x00\x00\x01"))), "", testTable1, false, "ns1"},
{true, string(EncodeBytes([]byte("t\x80\x00\x00\x00\x00\x00\x00\x01"))), "", testTable1, false, "ns1"},
{true, "t\x80\x00\x00\x00\x00\x00\x00\x01", "", 0, false, "global"}, // decode error
}
classifier := s.newClassifier(c)
for _, t := range testCases {
startKey, endKey := Key(t.startKey), Key(t.endKey)
if !t.endcoded {
startKey, endKey = encodeBytes(startKey), encodeBytes(endKey)
startKey, endKey = EncodeBytes(startKey), EncodeBytes(endKey)
}
c.Assert(startKey.TableID(), Equals, t.tableID)
c.Assert(startKey.IsMeta(), Equals, t.isMeta)
Expand Down
2 changes: 2 additions & 0 deletions tools/pd-simulator/simulator/cases/cases.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type Case struct {
RegionSplitSize int64
RegionSplitKeys int64
Events []EventDescriptor
TableNumber int

Checker CheckerFunc // To check the schedule is finished.
}
Expand Down Expand Up @@ -97,6 +98,7 @@ var CaseMap = map[string]func() *Case{
"hot-read": newHotRead,
"hot-write": newHotWrite,
"makeup-down-replicas": newMakeupDownReplicas,
"import-data": newImportData,
}

// NewCase creates a new case.
Expand Down
141 changes: 141 additions & 0 deletions tools/pd-simulator/simulator/cases/import_data.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package cases

import (
"bytes"
"fmt"
"math/rand"

"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/pd/server/core"
"github.com/pingcap/pd/table"
"github.com/pingcap/pd/tools/pd-simulator/simulator/simutil"
)

// newImportData builds the simulator case registered under "import-data".
//
// The case consists of:
//   - 10 stores, each Up with 1TB capacity and 900GB available;
//   - 40 regions of 32MB / 320000 keys, each with three peers placed on
//     store IDs 1-3 (the leader is the peer on the first store of a random
//     permutation that is drawn once and shared by every region);
//   - a write flow that targets the keys of tables 3 and 5 for the first 100
//     ticks and tables 2, 3 and 5 afterwards;
//   - a checker that logs the leader/peer distribution of table 2 and of the
//     whole cluster, and reports completion once no store holds more than
//     13.8% of all peers.
//
// NOTE(review): the checker addresses stores by the literal IDs 1..10, so it
// assumes IDAllocator hands those IDs to the stores created here — confirm.
func newImportData() *Case {
	var simCase Case
	// Initialize the cluster
	for i := 1; i <= 10; i++ {
		simCase.Stores = append(simCase.Stores, &Store{
			ID:        IDAllocator.nextID(),
			Status:    metapb.StoreState_Up,
			Capacity:  1 * TB,
			Available: 900 * GB,
			Version:   "2.1.0",
		})
	}

	// The permutation is drawn once, outside the loop, so every region starts
	// with the same three stores and the same leader store.
	storeIDs := rand.Perm(3)
	for i := 0; i < 40; i++ {
		peers := []*metapb.Peer{
			{Id: IDAllocator.nextID(), StoreId: uint64(storeIDs[0] + 1)},
			{Id: IDAllocator.nextID(), StoreId: uint64(storeIDs[1] + 1)},
			{Id: IDAllocator.nextID(), StoreId: uint64(storeIDs[2] + 1)},
		}
		simCase.Regions = append(simCase.Regions, Region{
			ID:     IDAllocator.nextID(),
			Peers:  peers,
			Leader: peers[0],
			Size:   32 * MB,
			Keys:   320000,
		})
	}

	simCase.RegionSplitSize = 64 * MB
	simCase.RegionSplitKeys = 640000
	simCase.TableNumber = 10
	// Events description
	e := &WriteFlowOnSpotDescriptor{}
	table2 := string(table.EncodeBytes(table.GenerateTableKey(2)))
	table3 := string(table.EncodeBytes(table.GenerateTableKey(3)))
	table5 := string(table.EncodeBytes(table.GenerateTableKey(5)))
	// Step maps an encoded key to the amount written at that key per tick.
	e.Step = func(tick int64) map[string]int64 {
		if tick < 100 {
			return map[string]int64{
				table3: 4 * MB,
				table5: 32 * MB,
			}
		}
		return map[string]int64{
			table2: 2 * MB,
			table3: 4 * MB,
			table5: 16 * MB,
		}
	}
	simCase.Events = []EventDescriptor{e}

	// Checker description
	simCase.Checker = func(regions *core.RegionsInfo) bool {
		leaderDist := make(map[uint64]int)
		peerDist := make(map[uint64]int)
		leaderTotal := 0
		peerTotal := 0

		// Collect the regions scanned from table 2's key whose end key sorts
		// strictly before table 3's key; the scan stops at the first region
		// that does not satisfy this.
		res := make([]*core.RegionInfo, 0, 100)
		regions.ScanRangeWithIterator([]byte(table2), func(region *metapb.Region) bool {
			if bytes.Compare(region.EndKey, []byte(table3)) < 0 {
				res = append(res, regions.GetRegion(region.GetId()))
				return true
			}
			return false
		})

		for _, r := range res {
			leaderTotal++
			leaderDist[r.GetLeader().GetStoreId()]++
			for _, p := range r.GetPeers() {
				peerDist[p.GetStoreId()]++
				peerTotal++
			}
		}
		// Nothing collected for table 2 yet: not finished, and skip the report.
		if leaderTotal == 0 || peerTotal == 0 {
			return false
		}

		tableLeaderLog := fmt.Sprintf("%d leader:", leaderTotal)
		tablePeerLog := fmt.Sprintf("%d peer: ", peerTotal)
		for storeID := 1; storeID <= 10; storeID++ {
			if leaderCount, ok := leaderDist[uint64(storeID)]; ok {
				tableLeaderLog = fmt.Sprintf("%s [store %d]:%.2f%%", tableLeaderLog, storeID, float64(leaderCount)/float64(leaderTotal)*100)
			}
		}
		for storeID := 1; storeID <= 10; storeID++ {
			if peerCount, ok := peerDist[uint64(storeID)]; ok {
				tablePeerLog = fmt.Sprintf("%s [store %d]:%.2f%%", tablePeerLog, storeID, float64(peerCount)/float64(peerTotal)*100)
			}
		}

		regionTotal := regions.GetRegionCount()
		totalLeaderLog := fmt.Sprintf("%d leader:", regionTotal)
		totalPeerLog := fmt.Sprintf("%d peer:", regionTotal*3)
		isEnd := true
		for storeID := uint64(1); storeID <= 10; storeID++ {
			totalLeaderLog = fmt.Sprintf("%s [store %d]:%.2f%%", totalLeaderLog, storeID, float64(regions.GetStoreLeaderCount(storeID))/float64(regionTotal)*100)
			// Share of all peers (3 per region) held by this store, in percent.
			regionProp := float64(regions.GetStoreRegionCount(storeID)) / float64(regionTotal*3) * 100
			// A store above 13.8% means the case has not finished yet.
			if regionProp > 13.8 {
				isEnd = false
			}
			totalPeerLog = fmt.Sprintf("%s [store %d]:%.2f%%", totalPeerLog, storeID, regionProp)
		}
		simutil.Logger.Infof(`
******* table 2 *******
%s
%s
******* global *******
%s
%s
`, tableLeaderLog, tablePeerLog, totalLeaderLog, totalPeerLog)
		return isEnd
	}
	return &simCase
}
1 change: 1 addition & 0 deletions tools/pd-simulator/simulator/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func (e *WriteFlowOnSpot) Run(raft *RaftEngine, tickCount int64) bool {
res := e.descriptor.Step(tickCount)
for key, size := range res {
region := raft.SearchRegion([]byte(key))
simutil.Logger.Debugf("search the region: %v", region.GetMeta())
if region == nil {
simutil.Logger.Errorf("region not found for key %s", key)
continue
Expand Down
2 changes: 1 addition & 1 deletion tools/pd-simulator/simulator/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (n *Node) stepTask() {
for _, task := range n.tasks {
task.Step(n.raftEngine)
if task.IsFinished() {
simutil.Logger.Infof("[store %d][region %d] task finished: %s final: %v", n.GetId(), task.RegionID(), task.Desc(), n.raftEngine.GetRegion(task.RegionID()))
simutil.Logger.Debugf("[store %d][region %d] task finished: %s final: %+v", n.GetId(), task.RegionID(), task.Desc(), n.raftEngine.GetRegion(task.RegionID()).GetMeta())
delete(n.tasks, task.RegionID())
}
}
Expand Down
Loading