Skip to content

Commit

Permalink
Make code more readable based on PR reviews
Browse files Browse the repository at this point in the history
  • Loading branch information
na-- committed Mar 29, 2022
1 parent 9d58579 commit 25144a6
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 32 deletions.
10 changes: 1 addition & 9 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,15 +153,7 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) error {

// We do this here so we can get any output URLs below.
initBar.Modify(pb.WithConstProgress(0, "Starting outputs"))
// TODO: re-enable the code below
/*
outputManager := output.NewManager(outputs, logger, func(err error) {
if err != nil {
logger.WithError(err).Error("Received error to stop from output")
}
runCancel()
})
*/
// TODO: directly create the MutputManager here, not in the Engine
err = engine.OutputManager.StartOutputs()
if err != nil {
return err
Expand Down
6 changes: 3 additions & 3 deletions core/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func (e *Engine) startBackgroundProcesses(
for {
select {
case <-ticker.C:
thresholdsTainted, shouldAbort := e.MetricsEngine.ProcessThresholds()
thresholdsTainted, shouldAbort := e.MetricsEngine.EvaluateThresholds()
e.thresholdsTaintedLock.Lock()
e.thresholdsTainted = thresholdsTainted
e.thresholdsTaintedLock.Unlock()
Expand Down Expand Up @@ -261,7 +261,7 @@ func (e *Engine) processMetrics(globalCtx context.Context, processMetricsAfterRu

if !e.runtimeOptions.NoThresholds.Bool {
// Process the thresholds one final time
thresholdsTainted, _ := e.MetricsEngine.ProcessThresholds()
thresholdsTainted, _ := e.MetricsEngine.EvaluateThresholds()
e.thresholdsTaintedLock.Lock()
e.thresholdsTainted = thresholdsTainted
e.thresholdsTaintedLock.Unlock()
Expand Down Expand Up @@ -300,7 +300,7 @@ func (e *Engine) processMetrics(globalCtx context.Context, processMetricsAfterRu
if !e.runtimeOptions.NoThresholds.Bool {
// Ensure the ingester flushes any buffered metrics
_ = e.ingester.Stop()
thresholdsTainted, _ := e.MetricsEngine.ProcessThresholds()
thresholdsTainted, _ := e.MetricsEngine.EvaluateThresholds()
e.thresholdsTaintedLock.Lock()
e.thresholdsTainted = thresholdsTainted
e.thresholdsTaintedLock.Unlock()
Expand Down
8 changes: 4 additions & 4 deletions core/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type ExecutionScheduler struct {

// TODO: remove these when we don't have separate Init() and Run() methods
// and can use a context + a WaitGroup (or something like that)
stopVusEmission, vusEmissionStopped chan struct{}
stopVUsEmission, vusEmissionStopped chan struct{}
}

// Check to see if we implement the lib.ExecutionScheduler interface
Expand Down Expand Up @@ -119,7 +119,7 @@ func NewExecutionScheduler(
maxPossibleVUs: maxPossibleVUs,
state: executionState,

stopVusEmission: make(chan struct{}),
stopVUsEmission: make(chan struct{}),
vusEmissionStopped: make(chan struct{}),
}, nil
}
Expand Down Expand Up @@ -273,7 +273,7 @@ func (e *ExecutionScheduler) emitVUsAndVUsMax(ctx context.Context, out chan<- st
emitMetrics()
case <-ctx.Done():
return
case <-e.stopVusEmission:
case <-e.stopVUsEmission:
return
}
}
Expand Down Expand Up @@ -399,7 +399,7 @@ func (e *ExecutionScheduler) runExecutor(
//nolint:funlen
func (e *ExecutionScheduler) Run(globalCtx, runCtx context.Context, engineOut chan<- stats.SampleContainer) error {
defer func() {
close(e.stopVusEmission)
close(e.stopVUsEmission)
<-e.vusEmissionStopped
}()

Expand Down
34 changes: 18 additions & 16 deletions metrics/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (me *MetricsEngine) GetIngester() output.Output {
}
}

func (me *MetricsEngine) getOrInitPotentialSubmetric(name string) (*stats.Metric, error) {
func (me *MetricsEngine) getThresholdMetricOrSubmetric(name string) (*stats.Metric, error) {
// TODO: replace with strings.Cut after Go 1.18
nameParts := strings.SplitN(name, "{", 2)

Expand All @@ -83,10 +83,11 @@ func (me *MetricsEngine) getOrInitPotentialSubmetric(name string) (*stats.Metric
return metric, nil
}

if nameParts[1][len(nameParts[1])-1] != '}' {
submetricDefinition := nameParts[1]
if submetricDefinition[len(submetricDefinition)-1] != '}' {
return nil, fmt.Errorf("missing ending bracket, sub-metric format needs to be 'metric{key:value}'")
}
sm, err := metric.AddSubmetric(nameParts[1][:len(nameParts[1])-1])
sm, err := metric.AddSubmetric(submetricDefinition[:len(submetricDefinition)-1])
if err != nil {
return nil, err
}
Expand All @@ -102,7 +103,7 @@ func (me *MetricsEngine) markObserved(metric *stats.Metric) {

func (me *MetricsEngine) initSubMetricsAndThresholds() error {
for metricName, thresholds := range me.options.Thresholds {
metric, err := me.getOrInitPotentialSubmetric(metricName)
metric, err := me.getThresholdMetricOrSubmetric(metricName)

if me.runtimeOptions.NoThresholds.Bool {
if err != nil {
Expand Down Expand Up @@ -130,7 +131,7 @@ func (me *MetricsEngine) initSubMetricsAndThresholds() error {
// TODO: refactor out of here when https://github.com/grafana/k6/issues/1321
// lands and there is a better way to enable a metric with tag
if me.options.SystemTags.Has(stats.TagExpectedResponse) {
_, err := me.getOrInitPotentialSubmetric("http_req_duration{expected_response:true}")
_, err := me.getThresholdMetricOrSubmetric("http_req_duration{expected_response:true}")
if err != nil {
return err // shouldn't happen, but ¯\_(ツ)_/¯
}
Expand All @@ -139,10 +140,10 @@ func (me *MetricsEngine) initSubMetricsAndThresholds() error {
return nil
}

// ProcessThresholds processes all of the thresholds.
// EvaluateThresholds processes all of the thresholds.
//
// TODO: refactor, make private, optimize
func (me *MetricsEngine) ProcessThresholds() (thresholdsTainted, shouldAbort bool) {
func (me *MetricsEngine) EvaluateThresholds() (thresholdsTainted, shouldAbort bool) {
me.MetricsLock.Lock()
defer me.MetricsLock.Unlock()

Expand All @@ -154,19 +155,20 @@ func (me *MetricsEngine) ProcessThresholds() (thresholdsTainted, shouldAbort boo
}
m.Tainted = null.BoolFrom(false)

me.logger.WithField("m", m.Name).Debug("running thresholds")
me.logger.WithField("metric_name", m.Name).Debug("running thresholds")
succ, err := m.Thresholds.Run(m.Sink, t)
if err != nil {
me.logger.WithField("m", m.Name).WithError(err).Error("Threshold error")
me.logger.WithField("metric_name", m.Name).WithError(err).Error("Threshold error")
continue
}
if !succ {
me.logger.WithField("m", m.Name).Debug("Thresholds failed")
m.Tainted = null.BoolFrom(true)
thresholdsTainted = true
if m.Thresholds.Abort {
shouldAbort = true
}
if succ {
continue // threshold passed
}
me.logger.WithField("metric_name", m.Name).Debug("Thresholds failed")
m.Tainted = null.BoolFrom(true)
thresholdsTainted = true
if m.Thresholds.Abort {
shouldAbort = true
}
}

Expand Down

0 comments on commit 25144a6

Please sign in to comment.