diff --git a/pkg/cli/systembench.go b/pkg/cli/systembench.go
index 215dfc508092..a4fe4d4f36ba 100644
--- a/pkg/cli/systembench.go
+++ b/pkg/cli/systembench.go
@@ -30,6 +30,17 @@ Runs the sequential disk write benchmark.
 	RunE: MaybeDecorateGRPCError(RunSeqWriteBench),
 }
 
+// A cpuBench command runs CPU benchmarks on cockroach.
+var cpuBench = &cobra.Command{
+	Use:   "cpu",
+	Short: "Runs the prime finding cpu benchmark.",
+	Long: `
+Runs the prime finding cpu benchmark.
+`,
+	Args: cobra.NoArgs,
+	RunE: MaybeDecorateGRPCError(RunCPUBench),
+}
+
 // RunSeqWriteBench runs a sequential write I/O benchmark.
 func RunSeqWriteBench(cmd *cobra.Command, args []string) error {
 	iOOpts := systembench.DiskOptions{
@@ -44,8 +55,20 @@ func RunSeqWriteBench(cmd *cobra.Command, args []string) error {
 	return systembench.Run(iOOpts)
 }
 
+// RunCPUBench runs the prime finding cpu benchmark.
+func RunCPUBench(cmd *cobra.Command, args []string) error {
+	cpuOptions := systembench.CPUOptions{
+		Concurrency: systemBenchCtx.concurrency,
+		Duration:    systemBenchCtx.duration,
+
+		Type: systembench.CPUPrimeTest,
+	}
+	return systembench.RunCPU(cpuOptions)
+}
+
 var systemBenchCmds = []*cobra.Command{
 	seqWriteBench,
+	cpuBench,
 }
 
 var systemBenchCmd = &cobra.Command{
diff --git a/pkg/cli/systembench/cpu_bench.go b/pkg/cli/systembench/cpu_bench.go
new file mode 100644
index 000000000000..e70307dadd3b
--- /dev/null
+++ b/pkg/cli/systembench/cpu_bench.go
@@ -0,0 +1,170 @@
+// Copyright 2018 The Cockroach Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+// implied. See the License for the specific language governing
+// permissions and limitations under the License.
+
+package systembench
+
+import (
+	"context"
+	"fmt"
+	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
+	"github.com/dustin/go-humanize"
+	"github.com/pkg/errors"
+	"golang.org/x/sync/errgroup"
+	"math"
+	"sync/atomic"
+	"time"
+)
+
+// CPUBenchmarkType represents a CPU benchmark.
+type CPUBenchmarkType int
+
+const (
+	// CPUPrimeTest identifies a prime factoring CPU test.
+	CPUPrimeTest CPUBenchmarkType = iota
+)
+
+// CPUOptions holds parameters for the test.
+type CPUOptions struct {
+	Concurrency int
+	Duration    time.Duration
+
+	Type CPUBenchmarkType
+}
+
+// workerCPUPrimes holds a latency histogram.
+type workerCPUPrimes struct {
+	latency *namedHistogram
+}
+
+// workerCPUPrimes implements the worker interface.
+func (w *workerCPUPrimes) run(ctx context.Context) error {
+	dividend := 1
+	// This is to make sure we're measuring the divisions
+	// vs the performance of locking.
+	batchSize := 1000
+	for {
+		start := timeutil.Now()
+		count := uint64(0)
+		for i := 0; i < batchSize; i++ {
+			limit := math.Sqrt(float64(dividend))
+			divisor := 1
+			for float64(divisor) <= limit {
+				if ctx.Err() != nil {
+					return ctx.Err()
+				}
+				remainder := dividend % divisor
+				count++
+
+				if remainder == 0 {
+					dividend++
+					break
+				}
+				divisor++
+			}
+		}
+		elapsed := timeutil.Since(start)
+		atomic.AddUint64(&numOps, count)
+		w.latency.Record(elapsed)
+	}
+}
+
+// workerCPUPrimes implements the worker interface.
+func (w *workerCPUPrimes) getLatencyHistogram() *namedHistogram {
+	return w.latency
+}
+
+// newWorkerCPUPrimes creates a worker that will verify prime numbers by doing standard
+// division of the number by all numbers between 2 and the square root of the number.
+// If any number gives a remainder of 0, the next number is calculated.
+func newWorkerCPUPrimes(
+	ctx context.Context, cpuOptions *CPUOptions, registry *histogramRegistry,
+) worker {
+	w := &workerCPUPrimes{}
+	w.latency = registry.Register("ops")
+	return w
+}
+
+// RunCPU runs cpu benchmarks specified by cpuOptions.
+func RunCPU(cpuOptions CPUOptions) error {
+	ctx := context.Background()
+	reg := newHistogramRegistry()
+
+	workers := make([]worker, cpuOptions.Concurrency)
+	var workerCreator func(ctx context.Context, cpuOptions *CPUOptions, registry *histogramRegistry) worker
+
+	switch cpuOptions.Type {
+	case CPUPrimeTest:
+		workerCreator = newWorkerCPUPrimes
+	default:
+		return errors.Errorf("Please specify a valid subtest.")
+	}
+
+	for i := range workers {
+		workers[i] = workerCreator(ctx, &cpuOptions, reg)
+	}
+
+	start := timeutil.Now()
+	lastNow := start
+	var lastOps uint64
+
+	return runTest(ctx, test{
+		init: func(g *errgroup.Group) {
+			for i := range workers {
+				w := workers[i] // Capture the loop variable for the goroutine below.
+				g.Go(func() error {
+					return w.run(ctx)
+				})
+			}
+		},
+
+		tick: func(elapsed time.Duration, i int) {
+			now := timeutil.Now()
+			ops := atomic.LoadUint64(&numOps)
+			elapsedSinceLastTick := now.Sub(lastNow)
+			if i%20 == 0 {
+				fmt.Println("_elapsed____ops/sec__p50(ms)__p95(ms)__p99(ms)_pMax(ms)")
+			}
+			reg.Tick(func(tick histogramTick) {
+				h := tick.Hist
+				fmt.Printf("%8s %10s %8.1f %8.1f %8.1f %8.1f\n",
+					time.Duration(timeutil.Since(start).Seconds()+0.5)*time.Second,
+					humanize.Comma(int64(float64(ops-lastOps)/elapsedSinceLastTick.Seconds())),
+					time.Duration(h.ValueAtQuantile(50)).Seconds()*1000,
+					time.Duration(h.ValueAtQuantile(95)).Seconds()*1000,
+					time.Duration(h.ValueAtQuantile(99)).Seconds()*1000,
+					time.Duration(h.ValueAtQuantile(100)).Seconds()*1000,
+				)
+			})
+			lastNow = now
+			lastOps = ops
+		},
+
+		done: func(elapsed time.Duration) {
+			startElapsed := timeutil.Since(start)
+			const totalHeader = "\n_elapsed____ops(total)__avg(ms)__p50(ms)__p95(ms)__p99(ms)_pMax(ms)"
+			fmt.Println(totalHeader + `__total`)
+			reg.Tick(func(tick histogramTick) {
+				h := tick.Cumulative
+				fmt.Printf("%8s %13s %8.1f %8.1f %8.1f %8.1f %8.1f\n",
+					time.Duration(startElapsed.Seconds())*time.Second,
+					humanize.Comma(int64(atomic.LoadUint64(&numOps))),
+					time.Duration(h.Mean()).Seconds()*1000,
+					time.Duration(h.ValueAtQuantile(50)).Seconds()*1000,
+					time.Duration(h.ValueAtQuantile(95)).Seconds()*1000,
+					time.Duration(h.ValueAtQuantile(99)).Seconds()*1000,
+					time.Duration(h.ValueAtQuantile(100)).Seconds()*1000)
+			})
+		},
+	}, cpuOptions.Duration)
+}
diff --git a/pkg/cli/systembench/disk_bench.go b/pkg/cli/systembench/disk_bench.go
index ac8dc16ef327..7d341ec287ed 100644
--- a/pkg/cli/systembench/disk_bench.go
+++ b/pkg/cli/systembench/disk_bench.go
@@ -18,10 +18,8 @@ import (
 	"context"
 	"fmt"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
-	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
 	"github.com/cockroachdb/cockroach/pkg/util/sysutil"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
-	"github.com/codahale/hdrhistogram"
 	"github.com/pkg/errors"
 	"golang.org/x/sync/errgroup"
 	"io/ioutil"
@@ -33,14 +31,9 @@ import (
 )
 
 const (
-	minLatency  = 100 * time.Microsecond
-	maxLatency  = 10 * time.Second
 	maxFileSize = 1 << 30 // A gigabyte
 )
 
-var numOps uint64
-var numBytes uint64
-
 // DiskBenchmarkType represents an I/O benchmark.
 type DiskBenchmarkType int
 
@@ -61,22 +54,10 @@ type DiskOptions struct {
 
 	Type DiskBenchmarkType
 }
 
-// latency is a histogram for the latency of a single worker.
-type latency struct {
-	syncutil.Mutex
-	*hdrhistogram.WindowedHistogram
-}
-
-// worker represents a single worker process generating load.
-type worker interface {
-	getLatencyHistogram() *latency
-	run(ctx context.Context) error
-}
-
 // workerSeqWrite holds a temp file and random byte array to write to
 // the temp file.
 type workerSeqWrite struct {
-	latency latency
+	latency *namedHistogram
 
 	data        []byte
 	tempFileDir string
@@ -85,7 +66,9 @@ type workerSeqWrite struct {
 
 // newWorkerSeqWrite creates a worker that writes writeSize sized
 // sequential blocks with fsync every syncInterval bytes to a file.
-func newWorkerSeqWrite(ctx context.Context, diskOptions *DiskOptions) worker {
+func newWorkerSeqWrite(
+	ctx context.Context, diskOptions *DiskOptions, registry *histogramRegistry,
+) worker {
 	data := make([]byte, diskOptions.WriteSize)
 	_, err := rand.Read(data)
 	if err != nil {
@@ -96,8 +79,7 @@ func newWorkerSeqWrite(ctx context.Context, diskOptions *DiskOptions) worker {
 		tempFileDir:  diskOptions.Dir,
 		syncInterval: diskOptions.SyncInterval,
 	}
-	w.latency.WindowedHistogram = hdrhistogram.NewWindowed(1,
-		minLatency.Nanoseconds(), maxLatency.Nanoseconds(), 1)
+	w.latency = registry.Register("ops")
 	return w
 }
 
@@ -149,18 +131,13 @@ func (w *workerSeqWrite) run(ctx context.Context) error {
 		bytes := uint64(writeSize)
 		atomic.AddUint64(&numOps, 1)
 		atomic.AddUint64(&numBytes, bytes)
-		elapsed := timeutil.Since(start)
-		w.latency.Lock()
-		if err := w.latency.Current.RecordValue(elapsed.Nanoseconds()); err != nil {
-			return err
-		}
-		w.latency.Unlock()
+		w.latency.Record(timeutil.Since(start))
 	}
 }
 
 // workerSeqWrite implements the worker interface.
-func (w *workerSeqWrite) getLatencyHistogram() *latency {
-	return &w.latency
+func (w *workerSeqWrite) getLatencyHistogram() *namedHistogram {
+	return w.latency
 }
 
 // newTempFile creates a temporary file, closes it and
@@ -183,6 +160,7 @@ func newTempFile(dir string) (*os.File, error) {
 // Run runs I/O benchmarks specified by diskOpts.
 func Run(diskOpts DiskOptions) error {
 	ctx := context.Background()
+	reg := newHistogramRegistry()
 
 	// Check if the directory exists.
 	_, err := os.Stat(diskOpts.Dir)
@@ -193,7 +171,7 @@ func Run(diskOpts DiskOptions) error {
 	log.Infof(ctx, "writing to %s\n", diskOpts.Dir)
 
 	workers := make([]worker, diskOpts.Concurrency)
-	var workerCreator func(ctx context.Context, diskOptions *DiskOptions) worker
+	var workerCreator func(ctx context.Context, diskOptions *DiskOptions, registry *histogramRegistry) worker
 
 	switch diskOpts.Type {
 	case SeqWriteTest:
@@ -209,7 +187,7 @@ func Run(diskOpts DiskOptions) error {
 	defer cancel()
 
 	for i := range workers {
-		workers[i] = workerCreator(ctx, &diskOpts)
+		workers[i] = workerCreator(ctx, &diskOpts, reg)
 	}
 
 	for i := range workers {
@@ -245,37 +223,9 @@ func Run(diskOpts DiskOptions) error {
 	var lastOps uint64
 	var lastBytes uint64
 
-	// This is a histogram that combines the output for multiple workers.
-	var cumulative *hdrhistogram.Histogram
-
 	for i := 0; ; i++ {
 		select {
 		case <-ticker.C:
-			var h *hdrhistogram.Histogram
-			for _, w := range workers {
-				workerLatency := w.getLatencyHistogram()
-				workerLatency.Lock()
-				m := workerLatency.Merge()
-				workerLatency.Rotate()
-				workerLatency.Unlock()
-				if h == nil {
-					h = m
-				} else {
-					h.Merge(m)
-				}
-			}
-
-			if cumulative == nil {
-				cumulative = h
-			} else {
-				cumulative.Merge(h)
-			}
-
-			p50 := h.ValueAtQuantile(50)
-			p95 := h.ValueAtQuantile(95)
-			p99 := h.ValueAtQuantile(99)
-			pMax := h.ValueAtQuantile(100)
-
 			now := timeutil.Now()
 			elapsed := now.Sub(lastNow)
 			ops := atomic.LoadUint64(&numOps)
@@ -284,15 +234,18 @@ func Run(diskOpts DiskOptions) error {
 			if i%20 == 0 {
 				fmt.Println("_elapsed____ops/sec___mb/sec__p50(ms)__p95(ms)__p99(ms)_pMax(ms)")
 			}
-			fmt.Printf("%8s %10.1f %8.1f %8.1f %8.1f %8.1f %8.1f\n",
-				time.Duration(timeutil.Since(start).Seconds()+0.5)*time.Second,
-				float64(ops-lastOps)/elapsed.Seconds(),
-				float64(bytes-lastBytes)/(1024.0*1024.0)/elapsed.Seconds(),
-				time.Duration(p50).Seconds()*1000,
-				time.Duration(p95).Seconds()*1000,
-				time.Duration(p99).Seconds()*1000,
-				time.Duration(pMax).Seconds()*1000,
-			)
+			reg.Tick(func(tick histogramTick) {
+				h := tick.Hist
+				fmt.Printf("%8s %10.1f %8.1f %8.1f %8.1f %8.1f %8.1f\n",
+					time.Duration(timeutil.Since(start).Seconds()+0.5)*time.Second,
+					float64(ops-lastOps)/elapsed.Seconds(),
+					float64(bytes-lastBytes)/(1024.0*1024.0)/elapsed.Seconds(),
+					time.Duration(h.ValueAtQuantile(50)).Seconds()*1000,
+					time.Duration(h.ValueAtQuantile(95)).Seconds()*1000,
+					time.Duration(h.ValueAtQuantile(99)).Seconds()*1000,
+					time.Duration(h.ValueAtQuantile(100)).Seconds()*1000,
+				)
+			})
 			lastNow = now
 			lastOps = ops
 			lastBytes = bytes
@@ -302,16 +255,18 @@ func Run(diskOpts DiskOptions) error {
 			startElapsed := timeutil.Since(start)
 			const totalHeader = "\n_elapsed____ops(total)__mb(total)__avg(ms)__p50(ms)__p95(ms)__p99(ms)_pMax(ms)"
 			fmt.Println(totalHeader + `__total`)
-			fmt.Printf("%8s %13d %10.1f %8.1f %8.1f %8.1f %8.1f %8.1f\n",
-				time.Duration(startElapsed.Seconds())*time.Second,
-				atomic.LoadUint64(&numOps),
-				float64(atomic.LoadUint64(&numBytes)/(1024.0*1024.0)),
-				time.Duration(cumulative.Mean()).Seconds()*1000,
-				time.Duration(cumulative.ValueAtQuantile(50)).Seconds()*1000,
-				time.Duration(cumulative.ValueAtQuantile(95)).Seconds()*1000,
-				time.Duration(cumulative.ValueAtQuantile(99)).Seconds()*1000,
-				time.Duration(cumulative.ValueAtQuantile(100)).Seconds()*1000,
-			)
+			reg.Tick(func(tick histogramTick) {
+				h := tick.Cumulative
+				fmt.Printf("%8s %13d %10.1f %8.1f %8.1f %8.1f %8.1f %8.1f\n",
+					time.Duration(startElapsed.Seconds())*time.Second,
+					atomic.LoadUint64(&numOps),
+					float64(atomic.LoadUint64(&numBytes)/(1024.0*1024.0)),
+					time.Duration(h.Mean()).Seconds()*1000,
+					time.Duration(h.ValueAtQuantile(50)).Seconds()*1000,
+					time.Duration(h.ValueAtQuantile(95)).Seconds()*1000,
+					time.Duration(h.ValueAtQuantile(99)).Seconds()*1000,
+					time.Duration(h.ValueAtQuantile(100)).Seconds()*1000)
+			})
 			return nil
 		case err := <-errs:
 			return err
diff --git a/pkg/cli/systembench/systembench_common.go b/pkg/cli/systembench/systembench_common.go
new file mode 100644
index 000000000000..dbcc30eb33e0
--- /dev/null
+++ b/pkg/cli/systembench/systembench_common.go
@@ -0,0 +1,28 @@
+// Copyright 2018 The Cockroach Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+// implied. See the License for the specific language governing
+// permissions and limitations under the License.
+
+package systembench
+
+import (
+	"context"
+)
+
+// worker represents a single worker process generating load.
+type worker interface {
+	getLatencyHistogram() *namedHistogram
+	run(ctx context.Context) error
+}
+
+var numOps uint64
+var numBytes uint64
diff --git a/pkg/cli/systembench/tests.go b/pkg/cli/systembench/tests.go
new file mode 100644
index 000000000000..aeee096e0274
--- /dev/null
+++ b/pkg/cli/systembench/tests.go
@@ -0,0 +1,221 @@
+// Copyright 2018 The Cockroach Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+// implied. See the License for the specific language governing
+// permissions and limitations under the License.
+
+package systembench
+
+import (
+	"context"
+	"fmt"
+	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
+	"github.com/cockroachdb/cockroach/pkg/util/sysutil"
+	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
+	"golang.org/x/sync/errgroup"
+	"os"
+	"os/signal"
+	"sort"
+	"time"
+
+	"github.com/codahale/hdrhistogram"
+)
+
+const (
+	minLatency = 10 * time.Microsecond
+	maxLatency = 10 * time.Second
+)
+
+func newHistogram() *hdrhistogram.Histogram {
+	return hdrhistogram.New(minLatency.Nanoseconds(), maxLatency.Nanoseconds(), 1)
+}
+
+type namedHistogram struct {
+	name string
+	mu   struct {
+		syncutil.Mutex
+		current *hdrhistogram.Histogram
+	}
+}
+
+func newNamedHistogram(name string) *namedHistogram {
+	w := &namedHistogram{name: name}
+	w.mu.current = newHistogram()
+	return w
+}
+
+func (w *namedHistogram) Record(elapsed time.Duration) {
+	if elapsed < minLatency {
+		elapsed = minLatency
+	} else if elapsed > maxLatency {
+		elapsed = maxLatency
+	}
+
+	w.mu.Lock()
+	err := w.mu.current.RecordValue(elapsed.Nanoseconds())
+	w.mu.Unlock()
+
+	if err != nil {
+		// Note that a histogram only drops recorded values that are out of range,
+		// but we clamp the latency value to the configured range to prevent such
+		// drops. This code path should never happen.
+		panic(fmt.Sprintf(`%s: recording value: %s`, w.name, err))
+	}
+}
+
+func (w *namedHistogram) tick(fn func(h *hdrhistogram.Histogram)) {
+	w.mu.Lock()
+	defer w.mu.Unlock()
+	h := w.mu.current
+	w.mu.current = newHistogram()
+	fn(h)
+}
+
+type histogramTick struct {
+	// Name is the name given to the histograms represented by this tick.
+	Name string
+	// Hist is the merged result of the represented histograms for this tick.
+	// Hist.TotalCount() is the number of operations that occurred for this tick.
+	Hist *hdrhistogram.Histogram
+	// Cumulative is the merged result of the represented histograms for all
+	// time. Cumulative.TotalCount() is the total number of operations that have
+	// occurred over all time.
+	Cumulative *hdrhistogram.Histogram
+	// Elapsed is the amount of time since the last tick.
+	Elapsed time.Duration
+	// Now is the time at which the tick was gathered. It covers the period
+	// [Now-Elapsed,Now).
+	Now time.Time
+}
+
+type histogramRegistry struct {
+	mu struct {
+		syncutil.Mutex
+		registered []*namedHistogram
+	}
+
+	start      time.Time
+	cumulative map[string]*hdrhistogram.Histogram
+	prevTick   map[string]time.Time
+}
+
+func newHistogramRegistry() *histogramRegistry {
+	return &histogramRegistry{
+		start:      timeutil.Now(),
+		cumulative: make(map[string]*hdrhistogram.Histogram),
+		prevTick:   make(map[string]time.Time),
+	}
+}
+
+func (w *histogramRegistry) Register(name string) *namedHistogram {
+	hist := newNamedHistogram(name)
+
+	w.mu.Lock()
+	w.mu.registered = append(w.mu.registered, hist)
+	w.mu.Unlock()
+
+	return hist
+}
+
+func (w *histogramRegistry) Tick(fn func(histogramTick)) {
+	w.mu.Lock()
+	registered := append([]*namedHistogram(nil), w.mu.registered...)
+	w.mu.Unlock()
+
+	merged := make(map[string]*hdrhistogram.Histogram)
+	var names []string
+	for _, hist := range registered {
+		hist.tick(func(h *hdrhistogram.Histogram) {
+			if m, ok := merged[hist.name]; ok {
+				m.Merge(h)
+			} else {
+				merged[hist.name] = h
+				names = append(names, hist.name)
+			}
+		})
+	}
+
+	now := timeutil.Now()
+	sort.Strings(names)
+	for _, name := range names {
+		mergedHist := merged[name]
+		if _, ok := w.cumulative[name]; !ok {
+			w.cumulative[name] = newHistogram()
+		}
+		w.cumulative[name].Merge(mergedHist)
+
+		prevTick, ok := w.prevTick[name]
+		if !ok {
+			prevTick = w.start
+		}
+		w.prevTick[name] = now
+		fn(histogramTick{
+			Name:       name,
+			Hist:       merged[name],
+			Cumulative: w.cumulative[name],
+			Elapsed:    now.Sub(prevTick),
+			Now:        now,
+		})
+	}
+}
+
+type test struct {
+	init func(g *errgroup.Group)
+	tick func(elapsed time.Duration, i int)
+	done func(elapsed time.Duration)
+}
+
+func runTest(ctx context.Context, t test, duration time.Duration) error {
+	g, ctx := errgroup.WithContext(ctx)
+
+	var cancel func()
+	_, cancel = context.WithCancel(ctx)
+	defer cancel()
+
+	t.init(g)
+
+	ticker := time.NewTicker(time.Second)
+	defer ticker.Stop()
+
+	errs := make(chan error, 1)
+	done := make(chan os.Signal, 3)
+	signal.Notify(done, os.Interrupt)
+
+	go func() {
+		if err := g.Wait(); err != nil {
+			errs <- err
+		} else {
+			done <- sysutil.Signal(0)
+		}
+	}()
+
+	if duration > 0 {
+		go func() {
+			time.Sleep(duration)
+			done <- sysutil.Signal(0)
+		}()
+	}
+
+	start := timeutil.Now()
+	for i := 0; ; i++ {
+		select {
+		case <-ticker.C:
+			t.tick(timeutil.Since(start), i)
+
+		case <-done:
+			cancel()
+			t.done(timeutil.Since(start))
+			return nil
+		case err := <-errs:
+			return err
+		}
+	}
+}
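
A minimal usage sketch of the exported API this change adds (systembench.CPUOptions, systembench.CPUPrimeTest, and systembench.RunCPU); the concurrency and duration values below are arbitrary examples, not defaults taken from the change:

package main

import (
	"log"
	"time"

	"github.com/cockroachdb/cockroach/pkg/cli/systembench"
)

func main() {
	// Run the prime-finding CPU benchmark on 4 goroutines for 10 seconds.
	opts := systembench.CPUOptions{
		Concurrency: 4,
		Duration:    10 * time.Second,
		Type:        systembench.CPUPrimeTest,
	}
	if err := systembench.RunCPU(opts); err != nil {
		log.Fatal(err)
	}
}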