Skip to content

Commit

Permalink
allocsim: Print number of range relocations in output
Browse files Browse the repository at this point in the history
Also add a second config with multiple nodes per locality.
  • Loading branch information
a-robinson committed Jul 7, 2017
1 parent eec63ac commit aee49a6
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 33 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"NumWorkers": 32,
"Localities": [
{
"Name": "1",
"NumNodes": 3
},
{
"Name": "2",
"NumNodes": 3
},
{
"Name": "3",
"NumNodes": 3
}
]
}
72 changes: 39 additions & 33 deletions pkg/cmd/allocsim/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,14 +121,19 @@ type allocSim struct {
}
ranges struct {
syncutil.Mutex
count int
replicas []int
leases []int
leaseTransfers []int
stats allocStats
}
localities []Locality
}

type allocStats struct {
count int
replicas []int
leases []int
replicaAdds []int
leaseTransfers []int
}

func newAllocSim(c *localcluster.Cluster) *allocSim {
return &allocSim{
Cluster: c,
Expand Down Expand Up @@ -225,10 +230,13 @@ func (a *allocSim) roundRobinWorker(startNum, workers int) {
}
}

func (a *allocSim) rangeInfo() (total int, replicas, leases, leaseTransfers []int) {
replicas = make([]int, len(a.Nodes))
leases = make([]int, len(a.Nodes))
leaseTransfers = make([]int, len(a.Nodes))
func (a *allocSim) rangeInfo() allocStats {
stats := allocStats{
replicas: make([]int, len(a.Nodes)),
replicaAdds: make([]int, len(a.Nodes)),
leases: make([]int, len(a.Nodes)),
leaseTransfers: make([]int, len(a.Nodes)),
}

// Retrieve the metrics for each node and extract the replica and leaseholder
// counts.
Expand All @@ -251,33 +259,33 @@ func (a *allocSim) rangeInfo() (total int, replicas, leases, leaseTransfers []in
for _, v := range stores {
storeMetrics := v.(map[string]interface{})
if v, ok := storeMetrics["replicas"]; ok {
replicas[i] += int(v.(float64))
stats.replicas[i] += int(v.(float64))
}
if v, ok := storeMetrics["replicas.leaseholders"]; ok {
leases[i] += int(v.(float64))
stats.leases[i] += int(v.(float64))
}
if v, ok := storeMetrics["range.adds"]; ok {
stats.replicaAdds[i] += int(v.(float64))
}
if v, ok := storeMetrics["leases.transfers.success"]; ok {
leaseTransfers[i] += int(v.(float64))
stats.leaseTransfers[i] += int(v.(float64))
}
}
}(i)
}
wg.Wait()

for _, v := range replicas {
total += v
for _, v := range stats.replicas {
stats.count += v
}
return total, replicas, leases, leaseTransfers
return stats
}

func (a *allocSim) rangeStats(d time.Duration) {
for {
count, replicas, leases, leaseTransfers := a.rangeInfo()
stats := a.rangeInfo()
a.ranges.Lock()
a.ranges.count = count
a.ranges.replicas = replicas
a.ranges.leases = leases
a.ranges.leaseTransfers = leaseTransfers
a.ranges.stats = stats
a.ranges.Unlock()

time.Sleep(d)
Expand All @@ -300,14 +308,15 @@ func formatHeader(header string, numberNodes int, localities []Locality) string
}

func (a *allocSim) monitor(d time.Duration) {
formatNodes := func(replicas, leases, leaseTransfers []int) string {
formatNodes := func(stats allocStats) string {
var buf bytes.Buffer
for i := range replicas {
for i := range stats.replicas {
alive := a.Nodes[i].Alive()
if !alive {
_, _ = buf.WriteString("\033[0;31;49m")
}
fmt.Fprintf(&buf, "%*s", len(padding), fmt.Sprintf("%d/%d/%d", replicas[i], leases[i], leaseTransfers[i]))
fmt.Fprintf(&buf, "%*s", len(padding), fmt.Sprintf("%d/%d/%d/%d",
stats.replicas[i], stats.leases[i], stats.replicaAdds[i], stats.leaseTransfers[i]))
if !alive {
_, _ = buf.WriteString("\033[0m")
}
Expand All @@ -329,14 +338,11 @@ func (a *allocSim) monitor(d time.Duration) {
totalLatencyNanos := atomic.LoadUint64(&a.stats.totalLatencyNanos)

a.ranges.Lock()
ranges := a.ranges.count
replicas := a.ranges.replicas
leases := a.ranges.leases
leaseTransfers := a.ranges.leaseTransfers
rangeStats := a.ranges.stats
a.ranges.Unlock()

if ticks%20 == 0 || numReplicas != len(replicas) {
numReplicas = len(replicas)
if ticks%20 == 0 || numReplicas != len(rangeStats.replicas) {
numReplicas = len(rangeStats.replicas)
fmt.Println(formatHeader("_elapsed__ops/sec__average__latency___errors_replicas", numReplicas, a.localities))
}

Expand All @@ -347,7 +353,7 @@ func (a *allocSim) monitor(d time.Duration) {
fmt.Printf("%8s %8.1f %8.1f %6.1fms %8d %8d%s\n",
time.Duration(now.Sub(start).Seconds()+0.5)*time.Second,
float64(ops-lastOps)/elapsed, float64(ops)/now.Sub(start).Seconds(), avgLatency,
atomic.LoadUint64(&a.stats.errors), ranges, formatNodes(replicas, leases, leaseTransfers))
atomic.LoadUint64(&a.stats.errors), rangeStats.count, formatNodes(rangeStats))
lastTime = now
lastOps = ops
}
Expand All @@ -360,7 +366,7 @@ func (a *allocSim) finalStatus() {
// TODO(bram): With the addition of localities, these stats will have to be
// updated.

fmt.Println(formatHeader("___stats___________________________", len(a.ranges.replicas), a.localities))
fmt.Println(formatHeader("___stats___________________________", len(a.ranges.stats.replicas), a.localities))

genStats := func(name string, counts []int) {
var total float64
Expand All @@ -380,8 +386,8 @@ func (a *allocSim) finalStatus() {
}
fmt.Println(buf.String())
}
genStats("replicas", a.ranges.replicas)
genStats("leases", a.ranges.leases)
genStats("replicas", a.ranges.stats.replicas)
genStats("leases", a.ranges.stats.leases)
}

func handleStart() bool {
Expand Down Expand Up @@ -510,7 +516,7 @@ func main() {
os.Exit(exitStatus)
}()

allNodeArgs := append(flag.Args(), "--vmodule=allocator=1")
allNodeArgs := append(flag.Args(), "--vmodule=allocator=3,allocator_scorer=3")
c.Start("allocsim", *workers, os.Args[0], allNodeArgs, perNodeArgs, perNodeEnv)
c.UpdateZoneConfig(1, 1<<20)
_, err := c.DB[0].Exec("SET CLUSTER SETTING kv.raft_log.synchronize = false;")
Expand Down

0 comments on commit aee49a6

Please sign in to comment.