Skip to content

Commit

Permalink
simulator: add case about import data
Browse files Browse the repository at this point in the history
  • Loading branch information
nolouch committed Oct 10, 2018
1 parent 25d9967 commit 5629efc
Show file tree
Hide file tree
Showing 9 changed files with 373 additions and 47 deletions.
5 changes: 5 additions & 0 deletions server/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -736,6 +736,11 @@ func (r *RegionsInfo) ScanRange(startKey []byte, limit int) []*RegionInfo {
return res
}

// ScanRangeWithIterator scans region with start key, until iterator returns false.
func (r *RegionsInfo) ScanRangeWithIterator(startKey []byte, iterator func(metaRegion *metapb.Region) bool) {
r.tree.scanRange(startKey, iterator)
}

// GetAdjacentRegions returns region's info that is adjacent with specific region
func (r *RegionsInfo) GetAdjacentRegions(region *RegionInfo) (*RegionInfo, *RegionInfo) {
metaPrev, metaNext := r.tree.getAdjacentRegions(region.meta)
Expand Down
83 changes: 78 additions & 5 deletions table/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ import (
)

var (
tablePrefix = []byte{'t'}
metaPrefix = []byte{'m'}
tablePrefix = []byte{'t'}
metaPrefix = []byte{'m'}
recordPrefix = []byte{'r'}
)

const (
Expand All @@ -38,7 +39,7 @@ type Key []byte

// TableID returns the table ID of the key, if the key is not table key, returns 0.
func (k Key) TableID() int64 {
_, key, err := decodeBytes(k)
_, key, err := DecodeBytes(k)
if err != nil {
// should never happen
return 0
Expand All @@ -54,13 +55,58 @@ func (k Key) TableID() int64 {

// IsMeta returns if the key is a meta key.
func (k Key) IsMeta() bool {
_, key, err := decodeBytes(k)
_, key, err := DecodeBytes(k)
if err != nil {
return false
}
return bytes.HasPrefix(key, metaPrefix)
}

var pads = make([]byte, encGroupSize)

// EncodeBytes guarantees the encoded value is in ascending order for comparison,
// encoding with the following rule:
// [group1][marker1]...[groupN][markerN]
// group is 8 bytes slice which is padding with 0.
// marker is `0xFF - padding 0 count`
// For example:
// [] -> [0, 0, 0, 0, 0, 0, 0, 0, 247]
// [1, 2, 3] -> [1, 2, 3, 0, 0, 0, 0, 0, 250]
// [1, 2, 3, 0] -> [1, 2, 3, 0, 0, 0, 0, 0, 251]
// [1, 2, 3, 4, 5, 6, 7, 8] -> [1, 2, 3, 4, 5, 6, 7, 8, 255, 0, 0, 0, 0, 0, 0, 0, 0, 247]
// Refer: https://github.com/facebook/mysql-5.6/wiki/MyRocks-record-format#memcomparable-format
func EncodeBytes(data []byte) Key {
// Allocate more space to avoid unnecessary slice growing.
// Assume that the byte slice size is about `(len(data) / encGroupSize + 1) * (encGroupSize + 1)` bytes,
// that is `(len(data) / 8 + 1) * 9` in our implement.
dLen := len(data)
result := make([]byte, 0, (dLen/encGroupSize+1)*(encGroupSize+1))
for idx := 0; idx <= dLen; idx += encGroupSize {
remain := dLen - idx
padCount := 0
if remain >= encGroupSize {
result = append(result, data[idx:idx+encGroupSize]...)
} else {
padCount = encGroupSize - remain
result = append(result, data[idx:]...)
result = append(result, pads[:padCount]...)
}

marker := encMarker - byte(padCount)
result = append(result, marker)
}
return result
}

// EncodeInt appends the encoded value to slice b and returns the appended slice.
// EncodeInt guarantees that the encoded value is in ascending order for comparison.
func EncodeInt(b []byte, v int64) []byte {
var data [8]byte
u := encodeIntToCmpUint(v)
binary.BigEndian.PutUint64(data[:], u)
return append(b, data[:]...)
}

// DecodeInt decodes value encoded by EncodeInt before.
// It returns the leftover un-decoded slice, decoded value if no error.
func DecodeInt(b []byte) ([]byte, int64, error) {
Expand All @@ -74,11 +120,20 @@ func DecodeInt(b []byte) ([]byte, int64, error) {
return b, v, nil
}

func encodeIntToCmpUint(v int64) uint64 {
return uint64(v) ^ signMask
}

func decodeCmpUintToInt(u uint64) int64 {
return int64(u ^ signMask)
}

func decodeBytes(b []byte) ([]byte, []byte, error) {
// DecodeBytes decodes bytes which is encoded by EncodeBytes before,
// returns the leftover bytes and decoded value if no error.
func DecodeBytes(b []byte) ([]byte, []byte, error) {
if len(b) == 0 {
return b, nil, nil
}
data := make([]byte, 0, len(b))
for {
if len(b) < encGroupSize+1 {
Expand Down Expand Up @@ -112,3 +167,21 @@ func decodeBytes(b []byte) ([]byte, []byte, error) {
}
return b, data, nil
}

// GenerateTableKey generates a table split key.
func GenerateTableKey(tableID int64) []byte {
buf := make([]byte, 0, len(tablePrefix)+8)
buf = append(buf, tablePrefix...)
buf = EncodeInt(buf, tableID)
return buf
}

// GenerateRowKey generates a row key.
func GenerateRowKey(tableID, rowID int64) []byte {
buf := make([]byte, 0, len(tablePrefix)+len(recordPrefix)+8*2)
buf = append(buf, tablePrefix...)
buf = EncodeInt(buf, tableID)
buf = append(buf, recordPrefix...)
buf = EncodeInt(buf, rowID)
return buf
}
35 changes: 5 additions & 30 deletions table/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,57 +23,32 @@ func TestTable(t *testing.T) {
TestingT(t)
}

var pads = make([]byte, encGroupSize)

var _ = Suite(&testCodecSuite{})

type testCodecSuite struct{}

func encodeBytes(data []byte) Key {
// Allocate more space to avoid unnecessary slice growing.
// Assume that the byte slice size is about `(len(data) / encGroupSize + 1) * (encGroupSize + 1)` bytes,
// that is `(len(data) / 8 + 1) * 9` in our implement.
dLen := len(data)
result := make([]byte, 0, (dLen/encGroupSize+1)*(encGroupSize+1))
for idx := 0; idx <= dLen; idx += encGroupSize {
remain := dLen - idx
padCount := 0
if remain >= encGroupSize {
result = append(result, data[idx:idx+encGroupSize]...)
} else {
padCount = encGroupSize - remain
result = append(result, data[idx:]...)
result = append(result, pads[:padCount]...)
}

marker := encMarker - byte(padCount)
result = append(result, marker)
}
return result
}

func (s *testCodecSuite) TestDecodeBytes(c *C) {
key := "abcdefghijklmnopqrstuvwxyz"
for i := 0; i < len(key); i++ {
_, k, err := decodeBytes(encodeBytes([]byte(key[:i])))
_, k, err := DecodeBytes(EncodeBytes([]byte(key[:i])))
c.Assert(err, IsNil)
c.Assert(string(k), Equals, key[:i])
}
}

func (s *testCodecSuite) TestTableID(c *C) {
key := encodeBytes([]byte("t\x80\x00\x00\x00\x00\x00\x00\xff"))
key := EncodeBytes([]byte("t\x80\x00\x00\x00\x00\x00\x00\xff"))
c.Assert(key.TableID(), Equals, int64(0xff))

key = encodeBytes([]byte("t\x80\x00\x00\x00\x00\x00\x00\xff_i\x01\x02"))
key = EncodeBytes([]byte("t\x80\x00\x00\x00\x00\x00\x00\xff_i\x01\x02"))
c.Assert(key.TableID(), Equals, int64(0xff))

key = []byte("t\x80\x00\x00\x00\x00\x00\x00\xff")
c.Assert(key.TableID(), Equals, int64(0))

key = encodeBytes([]byte("T\x00\x00\x00\x00\x00\x00\x00\xff"))
key = EncodeBytes([]byte("T\x00\x00\x00\x00\x00\x00\x00\xff"))
c.Assert(key.TableID(), Equals, int64(0))

key = encodeBytes([]byte("t\x80\x00\x00\x00\x00\x00\xff"))
key = EncodeBytes([]byte("t\x80\x00\x00\x00\x00\x00\xff"))
c.Assert(key.TableID(), Equals, int64(0))
}
4 changes: 2 additions & 2 deletions table/namespace_classifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,14 @@ func (s *testTableNamespaceSuite) TestTableNameSpaceGetRegionNamespace(c *C) {
{false, "t\x80\x00\x00\x00\x00\x00\x00\x03", "t\x80\x00\x00\x00\x00\x00\x00\x04", 3, false, "global"},
{false, "m\x80\x00\x00\x00\x00\x00\x00\x01", "", 0, true, "ns2"},
{false, "", "m\x80\x00\x00\x00\x00\x00\x00\x01", 0, false, "global"},
{true, string(encodeBytes([]byte("t\x80\x00\x00\x00\x00\x00\x00\x01"))), "", testTable1, false, "ns1"},
{true, string(EncodeBytes([]byte("t\x80\x00\x00\x00\x00\x00\x00\x01"))), "", testTable1, false, "ns1"},
{true, "t\x80\x00\x00\x00\x00\x00\x00\x01", "", 0, false, "global"}, // decode error
}
classifier := s.newClassifier(c)
for _, t := range testCases {
startKey, endKey := Key(t.startKey), Key(t.endKey)
if !t.endcoded {
startKey, endKey = encodeBytes(startKey), encodeBytes(endKey)
startKey, endKey = EncodeBytes(startKey), EncodeBytes(endKey)
}
c.Assert(startKey.TableID(), Equals, t.tableID)
c.Assert(startKey.IsMeta(), Equals, t.isMeta)
Expand Down
2 changes: 2 additions & 0 deletions tools/pd-simulator/simulator/cases/cases.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type Case struct {
RegionSplitSize int64
RegionSplitKeys int64
Events []EventInner
TableNumber int

Checker CheckerFunc // To check the schedule is finished.
}
Expand Down Expand Up @@ -83,6 +84,7 @@ var CaseMap = map[string]func() *Case{
"hot-read": newHotRead,
"hot-write": newHotWrite,
"makeup-down-replicas": newMakeupDownReplicas,
"import-data": newImportData,
}

// NewCase creates a new case.
Expand Down
121 changes: 121 additions & 0 deletions tools/pd-simulator/simulator/cases/import_data.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package cases

import (
"bytes"
"fmt"
"math/rand"
"time"

"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/pd/server/core"
"github.com/pingcap/pd/table"
"github.com/pingcap/pd/tools/pd-simulator/simulator/simutil"
)

func newImportData() *Case {
var simCase Case
var id idAllocator
// Initialize the cluster
for i := 1; i <= 10; i++ {
simCase.Stores = append(simCase.Stores, &Store{
ID: id.nextID(),
Status: metapb.StoreState_Up,
Capacity: 1 * TB,
Available: 900 * GB,
Version: "2.1.0",
})
}

for i := 0; i < 30; i++ {
storeIDs := rand.Perm(10)
peers := []*metapb.Peer{
{Id: id.nextID(), StoreId: uint64(storeIDs[0] + 1)},
{Id: id.nextID(), StoreId: uint64(storeIDs[1] + 1)},
{Id: id.nextID(), StoreId: uint64(storeIDs[2] + 1)},
}
simCase.Regions = append(simCase.Regions, Region{
ID: id.nextID(),
Peers: peers,
Leader: peers[0],
Size: 32 * MB,
Keys: 320000,
})
}
simCase.MaxID = id.maxID
simCase.RegionSplitSize = 64 * MB
simCase.RegionSplitKeys = 640000
simCase.TableNumber = 10
// Events description
e := &WriteFlowOnSpotInner{}
table2 := string(table.EncodeBytes(table.GenerateTableKey(2)))
table3 := string(table.EncodeBytes(table.GenerateTableKey(3)))
table5 := string(table.EncodeBytes(table.GenerateTableKey(5)))
e.Step = func(tick int64) map[string]int64 {
return map[string]int64{
table2: 4 * MB,
table3: 1 * MB,
table5: 16 * MB,
}
}
simCase.Events = []EventInner{e}

// Checker description
startTime := time.Now()
simCase.Checker = func(regions *core.RegionsInfo) bool {
leaderDstb := make(map[uint64]int)
peerDstb := make(map[uint64]int)
leaderTotal := 0
peerTotal := 0
res := make([]*core.RegionInfo, 0, 100)
regions.ScanRangeWithIterator([]byte(table2), func(region *metapb.Region) bool {
if bytes.Compare(region.EndKey, []byte(table3)) < 0 {
res = append(res, regions.GetRegion(region.GetId()))
return true
}
return false
})

for _, r := range res {
leaderDstb[r.GetLeader().GetStoreId()]++
leaderTotal++
for _, p := range r.GetPeers() {
peerDstb[p.GetStoreId()]++
peerTotal++
}
}
if leaderTotal == 0 || peerTotal == 0 {
simutil.Logger.Info("scan zore region")
return false
}
leaderLog := "leader distribute (table2)"
peerLog := "peer distribute (table2)"
for storeID := 1; storeID <= 10; storeID++ {
if leaderCount, ok := leaderDstb[uint64(storeID)]; ok {
leaderLog = fmt.Sprintf("%s [store %d]:%.2f%%", leaderLog, storeID, float64(leaderCount)/float64(leaderTotal)*100)
}
}
for storeID := 1; storeID <= 10; storeID++ {
if peerCount, ok := peerDstb[uint64(storeID)]; ok {
peerLog = fmt.Sprintf("%s [store %d]:%.2f%%", peerLog, storeID, float64(peerCount)/float64(peerTotal)*100)
}
}

simutil.Logger.Info(leaderLog)
simutil.Logger.Info(peerLog)
return startTime.Add(time.Minute).Before(time.Now())
}
return &simCase
}
2 changes: 1 addition & 1 deletion tools/pd-simulator/simulator/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (n *Node) stepTask() {
for _, task := range n.tasks {
task.Step(n.raftEngine)
if task.IsFinished() {
simutil.Logger.Infof("[store %d][region %d] task finished: %s final: %v", n.GetId(), task.RegionID(), task.Desc(), n.raftEngine.GetRegion(task.RegionID()))
simutil.Logger.Debugf("[store %d][region %d] task finished: %s final: %+v", n.GetId(), task.RegionID(), task.Desc(), n.raftEngine.GetRegion(task.RegionID()).GetMeta())
delete(n.tasks, task.RegionID())
}
}
Expand Down
Loading

0 comments on commit 5629efc

Please sign in to comment.