Skip to content

Commit

Permalink
[m3nsch] support generating new unique metrics
Browse files Browse the repository at this point in the history
`m3nsch` supports generating load for a defined set of metrics, but does
not currently support generating new uniques over time. This allows
specifying a percentage of generated metrics that will be unique.
  • Loading branch information
schallert committed Feb 25, 2019
1 parent 0647a12 commit 40ec4e1
Show file tree
Hide file tree
Showing 9 changed files with 148 additions and 61 deletions.
5 changes: 5 additions & 0 deletions src/cmd/services/m3nsch_client/cmd/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ func (w *cliWorkload) validate() error {
if w.Namespace == "" {
multiErr = multiErr.Add(fmt.Errorf("namespace must be set"))
}
if w.UniqueAmplifier < 0.0 || w.UniqueAmplifier > 1.0 {
multiErr = multiErr.Add(fmt.Errorf("unique-amplifier must be between 0.0 and 1.0 (is %f)", w.UniqueAmplifier))
}
return multiErr.FinalError()
}

Expand All @@ -68,4 +71,6 @@ func registerWorkloadFlags(flags *pflag.FlagSet, workload *cliWorkload) {
`aggregate workload cardinality`)
flags.IntVarP(&workload.IngressQPS, "ingress-qps", "i", 1000,
`aggregate workload ingress qps`)
flags.Float64VarP(&workload.UniqueAmplifier, "unique-amplifier", "u", 0.0,
`% of generatic metrics as float [0.0,1.0] that will be unique`)
}
45 changes: 33 additions & 12 deletions src/m3nsch/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,15 @@ package agent
import (
"errors"
"fmt"
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/m3db/m3/src/dbnode/client"
"github.com/m3db/m3/src/m3nsch"
"github.com/m3db/m3/src/m3nsch/datums"
"github.com/m3db/m3x/clock"
"github.com/m3db/m3x/ident"
"github.com/m3db/m3x/instrument"
xlog "github.com/m3db/m3x/log"
Expand All @@ -43,17 +46,19 @@ var (

type m3nschAgent struct {
sync.RWMutex
token string // workload token
workload m3nsch.Workload // workload to operate upon
registry datums.Registry // workload fake metric registry
session client.Session // m3db session to operate upon
agentStatus m3nsch.Status // agent status
opts m3nsch.AgentOptions // agent options
logger xlog.Logger // logger
metrics agentMetrics // agent performance metrics
workerChans workerChannels // worker-idx -> channel for worker notification
workerWg sync.WaitGroup // used to track when workers are finished
params workerParams // worker params
token string // workload token
workload m3nsch.Workload // workload to operate upon
registry datums.Registry // workload fake metric registry
session client.Session // m3db session to operate upon
agentStatus m3nsch.Status // agent status
opts m3nsch.AgentOptions // agent options
logger xlog.Logger // logger
metrics agentMetrics // agent performance metrics
workerChans workerChannels // worker-idx -> channel for worker notification
workerWg sync.WaitGroup // used to track when workers are finished
params workerParams // worker params
lastStartTime int64 // last time a workload was started as unix epoch
nowFn clock.NowFn
}

type workerParams struct {
Expand All @@ -72,6 +77,7 @@ func New(
registry: registry,
opts: opts,
logger: opts.InstrumentOptions().Logger(),
nowFn: time.Now,
params: workerParams{
fn: workerWriteFn,
},
Expand Down Expand Up @@ -215,6 +221,7 @@ func (ms *m3nschAgent) Start() error {
concurrency := ms.opts.Concurrency()
ms.workerChans = newWorkerChannels(concurrency)
ms.agentStatus = m3nsch.StatusRunning
atomic.StoreInt64(&ms.lastStartTime, ms.nowFn().Unix())
ms.workerWg.Add(concurrency)
for i := 0; i < concurrency; i++ {
go ms.runWorker(i, ms.workerChans[i])
Expand Down Expand Up @@ -299,14 +306,28 @@ func (ms *m3nschAgent) runWorker(workerIdx int, workerCh chan workerNotification
case <-tickLoop.C:
fakeNow = fakeNow.Add(tickPeriod)
metric := ms.nextWorkerMetric(workerIdx)
start := time.Now()
start := ms.nowFn()

// If configured to generate uniques over time, modify the metric to add
// cardinality.
if u := ms.workload.UniqueAmplifier; u > 0 {
lastStart := time.Unix(atomic.LoadInt64(&ms.lastStartTime), 0)
suffix := "/" + metricUniqueSuffix(lastStart, ms.nowFn(), u)
metric.name += suffix
}

err := ms.params.fn(workerIdx, ms.session, namespace, metric, fakeNow, timeUnit)
elapsed := time.Since(start)
methodMetrics.ReportSuccessOrError(err, elapsed)
}
}
}

func metricUniqueSuffix(startTime, now time.Time, uniqueAmplifier float64) string {
n := now.Sub(startTime).Seconds() * uniqueAmplifier
return strconv.Itoa(int(n))
}

type generatedMetric struct {
name string
timeseries datums.SyntheticTimeSeries
Expand Down
20 changes: 20 additions & 0 deletions src/m3nsch/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/m3db/m3x/instrument"
xtime "github.com/m3db/m3x/time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -318,3 +319,22 @@ func TestTransitions(t *testing.T) {
err = agent.Stop()
require.NoError(t, err)
}

func TestMetricUniqueSuffix(t *testing.T) {
start := time.Unix(0, 0)
for u, exp := range map[float64]int{
0: 1,
0.2: 2,
0.5: 5,
1.0: 10,
} {
uniques := make(map[string]struct{})
for i := 0; i < 10; i++ {
now := start.Add(time.Duration(i) * time.Second)
n := metricUniqueSuffix(start, now, u)
uniques[n] = struct{}{}
}
l := len(uniques)
assert.Equal(t, exp, l, "expected to see %d unique metrics", l)
}
}
2 changes: 2 additions & 0 deletions src/m3nsch/examples/sample_workload.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//+build never

// Copyright (c) 2018 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
Expand Down
11 changes: 6 additions & 5 deletions src/m3nsch/generated/convert/to_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,12 @@ func ToM3nschWorkload(workload *proto.Workload) (m3nsch.Workload, error) {
}

return m3nsch.Workload{
BaseTime: toTimeFromProtoTimestamp(workload.BaseTime),
MetricPrefix: workload.MetricPrefix,
Namespace: workload.Namespace,
Cardinality: int(workload.Cardinality),
IngressQPS: int(workload.IngressQPS),
BaseTime: toTimeFromProtoTimestamp(workload.BaseTime),
MetricPrefix: workload.MetricPrefix,
Namespace: workload.Namespace,
Cardinality: int(workload.Cardinality),
IngressQPS: int(workload.IngressQPS),
UniqueAmplifier: workload.UniqueAmplifier,
}, nil
}

Expand Down
1 change: 1 addition & 0 deletions src/m3nsch/generated/convert/to_proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,6 @@ func ToProtoWorkload(mw m3nsch.Workload) proto.Workload {
w.IngressQPS = int32(mw.IngressQPS)
w.MetricPrefix = mw.MetricPrefix
w.Namespace = mw.Namespace
w.UniqueAmplifier = mw.UniqueAmplifier
return w
}
119 changes: 75 additions & 44 deletions src/m3nsch/generated/proto/m3nsch/m3nsch.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/m3nsch/generated/proto/m3nsch/m3nsch.proto
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,5 @@ message Workload {
string namespace = 3;
int32 cardinality = 4;
int32 ingressQPS = 5;
double uniqueAmplifier = 6;
}
5 changes: 5 additions & 0 deletions src/m3nsch/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ type Workload struct {
// MetricStartIdx is an offset to control metric numbering. Can be safely ignored
// by external callers.
MetricStartIdx int

// UniqueAmplifier is the percentage of unique metrics generated as a float
// between 0.0 and 1.0 that will be unique. This allows for generating metrics
// with steady cardinality rate over time.
UniqueAmplifier float64
}

// Coordinator refers to the process responsible for synchronizing load generation.
Expand Down

0 comments on commit 40ec4e1

Please sign in to comment.