diff --git a/pkg/cmd/allocsim/configs/multiple-nodes-per-locality-low-latency.json b/pkg/cmd/allocsim/configs/multiple-nodes-per-locality-low-latency.json new file mode 100644 index 000000000000..f11a0e99a705 --- /dev/null +++ b/pkg/cmd/allocsim/configs/multiple-nodes-per-locality-low-latency.json @@ -0,0 +1,17 @@ +{ + "NumWorkers": 32, + "Localities": [ + { + "Name": "1", + "NumNodes": 3 + }, + { + "Name": "2", + "NumNodes": 3 + }, + { + "Name": "3", + "NumNodes": 3 + } + ] +} diff --git a/pkg/cmd/allocsim/main.go b/pkg/cmd/allocsim/main.go index caf30d3c3869..5170a1be4eb5 100644 --- a/pkg/cmd/allocsim/main.go +++ b/pkg/cmd/allocsim/main.go @@ -121,14 +121,19 @@ type allocSim struct { } ranges struct { syncutil.Mutex - count int - replicas []int - leases []int - leaseTransfers []int + stats allocStats } localities []Locality } +type allocStats struct { + count int + replicas []int + leases []int + replicaAdds []int + leaseTransfers []int +} + func newAllocSim(c *localcluster.Cluster) *allocSim { return &allocSim{ Cluster: c, @@ -225,10 +230,13 @@ func (a *allocSim) roundRobinWorker(startNum, workers int) { } } -func (a *allocSim) rangeInfo() (total int, replicas, leases, leaseTransfers []int) { - replicas = make([]int, len(a.Nodes)) - leases = make([]int, len(a.Nodes)) - leaseTransfers = make([]int, len(a.Nodes)) +func (a *allocSim) rangeInfo() allocStats { + stats := allocStats{ + replicas: make([]int, len(a.Nodes)), + replicaAdds: make([]int, len(a.Nodes)), + leases: make([]int, len(a.Nodes)), + leaseTransfers: make([]int, len(a.Nodes)), + } // Retrieve the metrics for each node and extract the replica and leaseholder // counts. @@ -251,33 +259,33 @@ func (a *allocSim) rangeInfo() (total int, replicas, leases, leaseTransfers []in for _, v := range stores { storeMetrics := v.(map[string]interface{}) if v, ok := storeMetrics["replicas"]; ok { - replicas[i] += int(v.(float64)) + stats.replicas[i] += int(v.(float64)) } if v, ok := storeMetrics["replicas.leaseholders"]; ok { - leases[i] += int(v.(float64)) + stats.leases[i] += int(v.(float64)) + } + if v, ok := storeMetrics["range.adds"]; ok { + stats.replicaAdds[i] += int(v.(float64)) } if v, ok := storeMetrics["leases.transfers.success"]; ok { - leaseTransfers[i] += int(v.(float64)) + stats.leaseTransfers[i] += int(v.(float64)) } } }(i) } wg.Wait() - for _, v := range replicas { - total += v + for _, v := range stats.replicas { + stats.count += v } - return total, replicas, leases, leaseTransfers + return stats } func (a *allocSim) rangeStats(d time.Duration) { for { - count, replicas, leases, leaseTransfers := a.rangeInfo() + stats := a.rangeInfo() a.ranges.Lock() - a.ranges.count = count - a.ranges.replicas = replicas - a.ranges.leases = leases - a.ranges.leaseTransfers = leaseTransfers + a.ranges.stats = stats a.ranges.Unlock() time.Sleep(d) @@ -300,14 +308,15 @@ func formatHeader(header string, numberNodes int, localities []Locality) string } func (a *allocSim) monitor(d time.Duration) { - formatNodes := func(replicas, leases, leaseTransfers []int) string { + formatNodes := func(stats allocStats) string { var buf bytes.Buffer - for i := range replicas { + for i := range stats.replicas { alive := a.Nodes[i].Alive() if !alive { _, _ = buf.WriteString("\033[0;31;49m") } - fmt.Fprintf(&buf, "%*s", len(padding), fmt.Sprintf("%d/%d/%d", replicas[i], leases[i], leaseTransfers[i])) + fmt.Fprintf(&buf, "%*s", len(padding), fmt.Sprintf("%d/%d/%d/%d", + stats.replicas[i], stats.leases[i], stats.replicaAdds[i], stats.leaseTransfers[i])) if !alive { _, _ = buf.WriteString("\033[0m") } @@ -329,14 +338,11 @@ func (a *allocSim) monitor(d time.Duration) { totalLatencyNanos := atomic.LoadUint64(&a.stats.totalLatencyNanos) a.ranges.Lock() - ranges := a.ranges.count - replicas := a.ranges.replicas - leases := a.ranges.leases - leaseTransfers := a.ranges.leaseTransfers + rangeStats := a.ranges.stats a.ranges.Unlock() - if ticks%20 == 0 || numReplicas != len(replicas) { - numReplicas = len(replicas) + if ticks%20 == 0 || numReplicas != len(rangeStats.replicas) { + numReplicas = len(rangeStats.replicas) fmt.Println(formatHeader("_elapsed__ops/sec__average__latency___errors_replicas", numReplicas, a.localities)) } @@ -347,7 +353,7 @@ func (a *allocSim) monitor(d time.Duration) { fmt.Printf("%8s %8.1f %8.1f %6.1fms %8d %8d%s\n", time.Duration(now.Sub(start).Seconds()+0.5)*time.Second, float64(ops-lastOps)/elapsed, float64(ops)/now.Sub(start).Seconds(), avgLatency, - atomic.LoadUint64(&a.stats.errors), ranges, formatNodes(replicas, leases, leaseTransfers)) + atomic.LoadUint64(&a.stats.errors), rangeStats.count, formatNodes(rangeStats)) lastTime = now lastOps = ops } @@ -360,7 +366,7 @@ func (a *allocSim) finalStatus() { // TODO(bram): With the addition of localities, these stats will have to be // updated. - fmt.Println(formatHeader("___stats___________________________", len(a.ranges.replicas), a.localities)) + fmt.Println(formatHeader("___stats___________________________", len(a.ranges.stats.replicas), a.localities)) genStats := func(name string, counts []int) { var total float64 @@ -380,8 +386,8 @@ func (a *allocSim) finalStatus() { } fmt.Println(buf.String()) } - genStats("replicas", a.ranges.replicas) - genStats("leases", a.ranges.leases) + genStats("replicas", a.ranges.stats.replicas) + genStats("leases", a.ranges.stats.leases) } func handleStart() bool { @@ -510,7 +516,7 @@ func main() { os.Exit(exitStatus) }() - allNodeArgs := append(flag.Args(), "--vmodule=allocator=1") + allNodeArgs := append(flag.Args(), "--vmodule=allocator=3,allocator_scorer=3") c.Start("allocsim", *workers, os.Args[0], allNodeArgs, perNodeArgs, perNodeEnv) c.UpdateZoneConfig(1, 1<<20) _, err := c.DB[0].Exec("SET CLUSTER SETTING kv.raft_log.synchronize = false;")