Skip to content

Commit

Permalink
Merge branch 'master' into uniform
Browse files Browse the repository at this point in the history
  • Loading branch information
lhy1024 authored Aug 3, 2022
2 parents bd70bf4 + ed6e4b8 commit 6e98d3b
Showing 1 changed file with 168 additions and 46 deletions.
214 changes: 168 additions & 46 deletions tools/pd-heartbeat-bench/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"flag"
"fmt"
"log"
"math/rand"
"time"

"github.com/pingcap/kvproto/pkg/metapb"
Expand All @@ -29,13 +30,22 @@ import (

var (
pdAddr = flag.String("pd", "127.0.0.1:2379", "pd address")
storeCount = flag.Int("store", 20, "store count")
regionCount = flag.Uint64("region", 1000000, "region count")
keyLen = flag.Int("keylen", 56, "key length")
storeCount = flag.Int("store", 40, "store count")
regionCount = flag.Int("region", 1000000, "region count")
keyLen = flag.Int("key-len", 56, "key length")
replica = flag.Int("replica", 3, "replica count")
regionUpdateRatio = flag.Float64("region-update-ratio", 0.05, "ratio of the region need to update")
leaderUpdateRatio = flag.Float64("leader", 0.06, "ratio of the region leader need to update, they need save-tree")
epochUpdateRatio = flag.Float64("epoch", 0.04, "ratio of the region epoch need to update, they need save-kv")
spaceUpdateRatio = flag.Float64("space", 0.15, "ratio of the region space need to update")
flowUpdateRatio = flag.Float64("flow", 0.35, "ratio of the region flow need to update")
sample = flag.Bool("sample", false, "sample per second")
heartbeatRounds = flag.Int("heartbeat-rounds", 5, "total rounds of heartbeat")
heartbeatRounds = flag.Int("heartbeat-rounds", 4, "total rounds of heartbeat")
)

const (
bytesUnit = 1 << 23 // 8MB
keysUint = 1 << 13 // 8K
intervalUint = 60 // 60s
)

var clusterID uint64
Expand Down Expand Up @@ -136,61 +146,168 @@ func newEndKey(id uint64) []byte {
return k
}

// Store simulates a TiKV to heartbeat.
type Store struct {
id uint64
// Regions simulates all regions to heartbeat.
type Regions struct {
regions []*pdpb.RegionHeartbeatRequest

updateRound int

updateLeader []int
updateEpoch []int
updateSpace []int
updateFlow []int
}

func (rs *Regions) init() {
rs.regions = make([]*pdpb.RegionHeartbeatRequest, 0, *regionCount)
rs.updateRound = 0

// Generate regions
id := uint64(1)
now := uint64(time.Now().Unix())

for i := 0; i < *regionCount; i++ {
region := &pdpb.RegionHeartbeatRequest{
Header: header(),
Region: &metapb.Region{
Id: id,
StartKey: newStartKey(id),
EndKey: newEndKey(id),
RegionEpoch: &metapb.RegionEpoch{ConfVer: 2, Version: 1},
},
ApproximateSize: bytesUnit,
Interval: &pdpb.TimeInterval{
StartTimestamp: now,
EndTimestamp: now + intervalUint,
},
ApproximateKeys: keysUint,
Term: 1,
}
id += 1

peers := make([]*metapb.Peer, 0, *replica)
for j := 0; j < *replica; j++ {
peers = append(peers, &metapb.Peer{Id: id, StoreId: uint64((i+j)%*storeCount + 1)})
id += 1
}

region.Region.Peers = peers
region.Leader = peers[0]
rs.regions = append(rs.regions, region)
}

// Generate sample index
slice := make([]int, *regionCount)
for i := range slice {
slice[i] = i
}

rand.Seed(0) // Ensure consistent behavior multiple times
pick := func(ratio float64) []int {
rand.Shuffle(*regionCount, func(i, j int) {
slice[i], slice[j] = slice[j], slice[i]
})
return append(slice[:0:0], slice[0:int(float64(*regionCount)*ratio)]...)
}

rs.updateLeader = pick(*leaderUpdateRatio)
rs.updateEpoch = pick(*epochUpdateRatio)
rs.updateSpace = pick(*spaceUpdateRatio)
rs.updateFlow = pick(*flowUpdateRatio)
}

func (rs *Regions) update() {
rs.updateRound += 1

// update leader
for _, i := range rs.updateLeader {
region := rs.regions[i]
region.Leader = region.Region.Peers[rs.updateRound%*replica]
}
// update epoch
for _, i := range rs.updateEpoch {
region := rs.regions[i]
region.Region.RegionEpoch.Version += 1
}
// update space
for _, i := range rs.updateSpace {
region := rs.regions[i]
region.ApproximateSize += bytesUnit
region.ApproximateKeys += keysUint
}
// update flow
for _, i := range rs.updateFlow {
region := rs.regions[i]
region.BytesWritten += bytesUnit
region.BytesRead += bytesUnit
region.KeysWritten += keysUint
region.KeysRead += keysUint
}
// update interval
for _, region := range rs.regions {
region.Interval.StartTimestamp = region.Interval.EndTimestamp
region.Interval.EndTimestamp = region.Interval.StartTimestamp + intervalUint
}
}

// Run runs the store.
func (s *Store) Run(startNotifier chan report.Report, endNotifier chan struct{}) {
func (rs *Regions) send(storeID uint64, startNotifier chan report.Report, endNotifier chan struct{}) {
cli := newClient()
stream, err := cli.RegionHeartbeat(context.TODO())
if err != nil {
log.Fatal(err)
}
var peers []*metapb.Peer
for i := 0; i < *replica; i++ {
storeID := s.id + uint64(i)
if storeID > uint64(*storeCount) {
storeID -= uint64(*storeCount)
}
peers = append(peers, &metapb.Peer{Id: uint64(i + 1), StoreId: storeID})
}

count := 1
for r := range startNotifier {
startTime := time.Now()
for regionID := s.id; regionID <= *regionCount+uint64(*storeCount); regionID += uint64(*storeCount) {
updateRegionCount := uint64(float64(*regionCount) * (*regionUpdateRatio) / float64(*storeCount))
storeUpdateRegionMaxID := s.id + updateRegionCount*uint64(*storeCount)
meta := &metapb.Region{
Id: regionID,
Peers: peers,
RegionEpoch: &metapb.RegionEpoch{ConfVer: 2, Version: 1},
StartKey: newStartKey(regionID),
EndKey: newEndKey(regionID),
}
if regionID < storeUpdateRegionMaxID {
meta.RegionEpoch.Version = uint64(count)
count := 0
for _, region := range rs.regions {
if region.Leader.StoreId != storeID {
continue
}
count += 1
reqStart := time.Now()
err = stream.Send(&pdpb.RegionHeartbeatRequest{
Header: header(),
Region: meta,
Leader: peers[0],
})

err = stream.Send(region)
r.Results() <- report.Result{Start: reqStart, End: time.Now(), Err: err}
if err != nil {
log.Fatal(err)
}
}
log.Printf("store %v finish heartbeat, cost time: %v", s.id, time.Since(startTime))
count++
log.Printf("store %v finish heartbeat, count: %v, cost time: %v", storeID, count, time.Since(startTime))
endNotifier <- struct{}{}
}
}

func (rs *Regions) result(sec float64) string {
if rs.updateRound == 0 {
// There was no difference in the first round
return ""
}

updated := make(map[int]struct{})
for _, i := range rs.updateLeader {
updated[i] = struct{}{}
}
for _, i := range rs.updateEpoch {
updated[i] = struct{}{}
}
for _, i := range rs.updateSpace {
updated[i] = struct{}{}
}
for _, i := range rs.updateFlow {
updated[i] = struct{}{}
}
inactiveCount := *regionCount - len(updated)

ret := "Update speed of each category:\n"
ret += fmt.Sprintf(" Requests/sec: %12.4f\n", float64(*regionCount)/sec)
ret += fmt.Sprintf(" Save-Tree/sec: %12.4f\n", float64(len(rs.updateLeader))/sec)
ret += fmt.Sprintf(" Save-KV/sec: %12.4f\n", float64(len(rs.updateEpoch))/sec)
ret += fmt.Sprintf(" Save-Space/sec: %12.4f\n", float64(len(rs.updateSpace))/sec)
ret += fmt.Sprintf(" Save-Flow/sec: %12.4f\n", float64(len(rs.updateFlow))/sec)
ret += fmt.Sprintf(" Skip/sec: %12.4f\n", float64(inactiveCount)/sec)
return ret
}

func main() {
log.SetFlags(0)
flag.Parse()
Expand All @@ -203,30 +320,35 @@ func main() {
log.Println("finish put stores")
groupStartNotify := make([]chan report.Report, *storeCount+1)
groupEndNotify := make([]chan struct{}, *storeCount+1)
regions := new(Regions)
regions.init()

for i := 1; i <= *storeCount; i++ {
s := Store{id: uint64(i)}
startNotifier := make(chan report.Report)
endNotifier := make(chan struct{})
groupStartNotify[i] = startNotifier
groupEndNotify[i] = endNotifier
go s.Run(startNotifier, endNotifier)
go regions.send(uint64(i), startNotifier, endNotifier)
}

for i := 0; i < *heartbeatRounds; i++ {
log.Printf("\n--------- Bench heartbeat (Round %d) ----------\n", i+1)
report := newReport()
rs := report.Run()
repo := newReport()
rs := repo.Run()
// All stores start heartbeat.
startTime := time.Now()
for storeID := 1; storeID <= *storeCount; storeID++ {
startNotifier := groupStartNotify[storeID]
startNotifier <- report
startNotifier <- repo
}
// All stores finished heartbeat once.
for storeID := 1; storeID <= *storeCount; storeID++ {
<-groupEndNotify[storeID]
}

close(report.Results())
since := time.Since(startTime).Seconds()
close(repo.Results())
log.Println(<-rs)
log.Println(regions.result(since))
regions.update()
}
}

0 comments on commit 6e98d3b

Please sign in to comment.