From fb9437e4c89efe5b28494553926b63edf31d08e3 Mon Sep 17 00:00:00 2001
From: Nedyalko Andreev
Date: Wed, 9 Mar 2022 14:12:15 +0200
Subject: [PATCH] Move the Engine data crunching logic into a new component
 under metrics/

---
 api/v1/metric_routes.go              |  14 +-
 api/v1/metric_routes_test.go         |   8 +-
 api/v1/setup_teardown_routes_test.go |  30 +++-
 cmd/run.go                           |  29 ++--
 core/engine.go                       | 235 ++++++---------------------
 core/engine_test.go                  | 102 ++++++------
 js/runner_test.go                    |   2 +
 metrics/engine/engine.go             | 174 ++++++++++++++++++++
 metrics/engine/ingester.go           |  92 +++++++++++
 output/manager.go                    |  26 +++
 output/types.go                      |   2 +
 11 files changed, 452 insertions(+), 262 deletions(-)
 create mode 100644 metrics/engine/engine.go
 create mode 100644 metrics/engine/ingester.go

diff --git a/api/v1/metric_routes.go b/api/v1/metric_routes.go
index 6ffdd929d02..2855e4bcdeb 100644
--- a/api/v1/metric_routes.go
+++ b/api/v1/metric_routes.go
@@ -36,9 +36,9 @@ func handleGetMetrics(rw http.ResponseWriter, r *http.Request) {
 		t = engine.ExecutionScheduler.GetState().GetCurrentTestRunDuration()
 	}
 
-	engine.MetricsLock.Lock()
-	metrics := newMetricsJSONAPI(engine.Metrics, t)
-	engine.MetricsLock.Unlock()
+	engine.MetricsEngine.MetricsLock.Lock()
+	metrics := newMetricsJSONAPI(engine.MetricsEngine.ObservedMetrics, t)
+	engine.MetricsEngine.MetricsLock.Unlock()
 
 	data, err := json.Marshal(metrics)
 	if err != nil {
@@ -56,13 +56,17 @@ func handleGetMetric(rw http.ResponseWriter, r *http.Request, id string) {
 		t = engine.ExecutionScheduler.GetState().GetCurrentTestRunDuration()
 	}
 
-	metric, ok := engine.Metrics[id]
+	engine.MetricsEngine.MetricsLock.Lock()
+	metric, ok := engine.MetricsEngine.ObservedMetrics[id]
 	if !ok {
+		engine.MetricsEngine.MetricsLock.Unlock()
 		apiError(rw, "Not Found", "No metric with that ID was found", http.StatusNotFound)
 		return
 	}
+	wrappedMetric := newMetricEnvelope(metric, t)
+	engine.MetricsEngine.MetricsLock.Unlock()
 
-	data, err := json.Marshal(newMetricEnvelope(metric, t))
+	data, err := json.Marshal(wrappedMetric)
 	if err != nil {
 		apiError(rw, "Encoding error", err.Error(), http.StatusInternalServerError)
 		return
diff --git a/api/v1/metric_routes_test.go b/api/v1/metric_routes_test.go
index f0c4d2340cb..80d37fe4ca6 100644
--- a/api/v1/metric_routes_test.go
+++ b/api/v1/metric_routes_test.go
@@ -52,10 +52,10 @@ func TestGetMetrics(t *testing.T) {
 	engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, nil, logger, registry)
 	require.NoError(t, err)
 
-	engine.Metrics = map[string]*stats.Metric{
+	engine.MetricsEngine.ObservedMetrics = map[string]*stats.Metric{
 		"my_metric": stats.New("my_metric", stats.Trend, stats.Time),
 	}
-	engine.Metrics["my_metric"].Tainted = null.BoolFrom(true)
+	engine.MetricsEngine.ObservedMetrics["my_metric"].Tainted = null.BoolFrom(true)
 
 	rw := httptest.NewRecorder()
 	NewHandler().ServeHTTP(rw, newRequestWithEngine(engine, "GET", "/v1/metrics", nil))
@@ -112,10 +112,10 @@ func TestGetMetric(t *testing.T) {
 	engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, nil, logger, registry)
 	require.NoError(t, err)
 
-	engine.Metrics = map[string]*stats.Metric{
+	engine.MetricsEngine.ObservedMetrics = map[string]*stats.Metric{
 		"my_metric": stats.New("my_metric", stats.Trend, stats.Time),
 	}
-	engine.Metrics["my_metric"].Tainted = null.BoolFrom(true)
+	engine.MetricsEngine.ObservedMetrics["my_metric"].Tainted = null.BoolFrom(true)
 
 	t.Run("nonexistent", func(t *testing.T) {
 		t.Parallel()
diff --git a/api/v1/setup_teardown_routes_test.go b/api/v1/setup_teardown_routes_test.go
index e4bb27fd1d7..8e18bd1ce50 100644
--- a/api/v1/setup_teardown_routes_test.go
+++ b/api/v1/setup_teardown_routes_test.go
@@ -24,6 +24,7 @@ import (
 	"bytes"
 	"context"
 	"encoding/json"
+	"fmt"
 	"net/http"
 	"net/http/httptest"
 	"net/url"
@@ -133,12 +134,14 @@ func TestSetupData(t *testing.T) {
 			},
 		},
 	}
-	logger := logrus.New()
-	logger.SetOutput(testutils.NewTestOutput(t))
-	registry := metrics.NewRegistry()
-	builtinMetrics := metrics.RegisterBuiltinMetrics(registry)
-	for _, testCase := range testCases {
-		testCase := testCase
+
+	runTestCase := func(t *testing.T, tcid int) {
+		testCase := testCases[tcid]
+		logger := logrus.New()
+		logger.SetOutput(testutils.NewTestOutput(t))
+		registry := metrics.NewRegistry()
+		builtinMetrics := metrics.RegisterBuiltinMetrics(registry)
+
 		t.Run(testCase.name, func(t *testing.T) {
 			t.Parallel()
@@ -164,14 +167,17 @@ func TestSetupData(t *testing.T) {
 			engine, err := core.NewEngine(execScheduler, runner.GetOptions(), lib.RuntimeOptions{}, nil, logger, registry)
 			require.NoError(t, err)
 
+			require.NoError(t, engine.OutputManager.StartOutputs())
+			defer engine.OutputManager.StopOutputs()
+
 			globalCtx, globalCancel := context.WithCancel(context.Background())
 			runCtx, runCancel := context.WithCancel(globalCtx)
 			run, wait, err := engine.Init(globalCtx, runCtx)
+			require.NoError(t, err)
+
+			defer wait()
 			defer globalCancel()
 
-			require.NoError(t, err)
-
 			errC := make(chan error)
 			go func() { errC <- run() }()
@@ -211,4 +217,12 @@ func TestSetupData(t *testing.T) {
 			}
 		})
 	}
+
+	for id := range testCases {
+		id := id
+		t.Run(fmt.Sprintf("testcase_%d", id), func(t *testing.T) {
+			t.Parallel()
+			runTestCase(t, id)
+		})
+	}
 }
diff --git a/cmd/run.go b/cmd/run.go
index e64d4f3a2cc..2e868fcd448 100644
--- a/cmd/run.go
+++ b/cmd/run.go
@@ -45,7 +45,6 @@ import (
 	"go.k6.io/k6/js/common"
 	"go.k6.io/k6/lib"
 	"go.k6.io/k6/lib/consts"
-	"go.k6.io/k6/output"
 	"go.k6.io/k6/ui/pb"
 )
 
@@ -120,7 +119,10 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) error {
 		return err
 	}
 
-	// TODO: remove
+	// TODO: create a MetricsEngine here and add its ingester to the list of
+	// outputs (unless both NoThresholds and NoSummary were enabled)
+
+	// TODO: remove this completely
 	// Create the engine.
 	initBar.Modify(pb.WithConstProgress(0, "Init engine"))
 	engine, err := core.NewEngine(
@@ -151,17 +153,20 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) error {
 
 	// We do this here so we can get any output URLs below.
 	initBar.Modify(pb.WithConstProgress(0, "Starting outputs"))
-	outputManager := output.NewManager(outputs, logger, func(err error) {
-		if err != nil {
-			logger.WithError(err).Error("Received error to stop from output")
-		}
-		runCancel()
-	})
-	err = outputManager.StartOutputs()
+	// TODO: re-enable the code below
+	/*
+		outputManager := output.NewManager(outputs, logger, func(err error) {
+			if err != nil {
+				logger.WithError(err).Error("Received error to stop from output")
+			}
+			runCancel()
+		})
+	*/
+	err = engine.OutputManager.StartOutputs()
 	if err != nil {
 		return err
 	}
-	defer outputManager.StopOutputs()
+	defer engine.OutputManager.StopOutputs()
 
 	printExecutionDescription(
 		c.gs, "local", args[0], "", conf, execScheduler.GetState().ExecutionTuple, executionPlan, outputs,
@@ -234,8 +239,9 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) error {
 
 	// Handle the end-of-test summary.
 	if !test.runtimeOptions.NoSummary.Bool {
+		engine.MetricsEngine.MetricsLock.Lock() // TODO: refactor so this is not needed
 		summaryResult, err := test.initRunner.HandleSummary(globalCtx, &lib.Summary{
-			Metrics:         engine.Metrics,
+			Metrics:         engine.MetricsEngine.ObservedMetrics,
 			RootGroup:       execScheduler.GetRunner().GetDefaultGroup(),
 			TestRunDuration: executionState.GetCurrentTestRunDuration(),
 			NoColor:         c.gs.flags.noColor,
@@ -244,6 +250,7 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) error {
 				IsStdErrTTY: c.gs.stdErr.isTTY,
 			},
 		})
+		engine.MetricsEngine.MetricsLock.Unlock()
 		if err == nil {
 			err = handleSummaryResult(c.gs.fs, c.gs.stdOut, c.gs.stdErr, summaryResult)
 		}
diff --git a/core/engine.go b/core/engine.go
index a6a1f1c864f..165414dde6b 100644
--- a/core/engine.go
+++ b/core/engine.go
@@ -23,18 +23,16 @@ package core
 import (
 	"context"
 	"errors"
-	"fmt"
-	"strings"
 	"sync"
 	"time"
 
 	"github.com/sirupsen/logrus"
-	"gopkg.in/guregu/null.v3"
 
 	"go.k6.io/k6/errext"
 	"go.k6.io/k6/js/common"
 	"go.k6.io/k6/lib"
 	"go.k6.io/k6/metrics"
+	"go.k6.io/k6/metrics/engine"
 	"go.k6.io/k6/output"
 	"go.k6.io/k6/stats"
 )
@@ -54,28 +52,25 @@ type Engine struct {
 	// expects to be able to get information from the Engine and is initialized
 	// before the Init() call...
 
+	// TODO: completely remove the engine and use all of these separately, in a
+	// much more composable and testable manner
 	ExecutionScheduler lib.ExecutionScheduler
-	executionState     *lib.ExecutionState
+	MetricsEngine      *engine.MetricsEngine
+	OutputManager      *output.Manager
 
-	options        lib.Options
 	runtimeOptions lib.RuntimeOptions
-	outputs        []output.Output
+
+	ingester output.Output
 
 	logger   *logrus.Entry
 	stopOnce sync.Once
 	stopChan chan struct{}
 
-	Metrics     map[string]*stats.Metric // TODO: refactor, this doesn't need to be a map
-	MetricsLock sync.Mutex
-
-	registry *metrics.Registry
-	Samples  chan stats.SampleContainer
-
-	// These can be both top-level metrics or sub-metrics
-	metricsWithThresholds []*stats.Metric
+	Samples chan stats.SampleContainer
 
 	// Are thresholds tainted?
-	thresholdsTainted bool
+	thresholdsTaintedLock sync.Mutex
+	thresholdsTainted     bool
 }
 
 // NewEngine instantiates a new Engine, without doing any heavy initialization.
@@ -89,89 +84,32 @@ func NewEngine(
 	e := &Engine{
 		ExecutionScheduler: ex,
-		executionState:     ex.GetState(),
-		options:            opts,
 		runtimeOptions:     rtOpts,
-		outputs:            outputs,
-		Metrics:            make(map[string]*stats.Metric),
 		Samples:            make(chan stats.SampleContainer, opts.MetricSamplesBufferSize.Int64),
 		stopChan:           make(chan struct{}),
 		logger:             logger.WithField("component", "engine"),
-		registry:           registry,
-	}
-
-	if !(e.runtimeOptions.NoSummary.Bool && e.runtimeOptions.NoThresholds.Bool) {
-		err := e.initSubMetricsAndThresholds()
-		if err != nil {
-			return nil, err
-		}
-	}
-
-	return e, nil
-}
-
-func (e *Engine) getOrInitPotentialSubmetric(name string) (*stats.Metric, error) {
-	// TODO: replace with strings.Cut after Go 1.18
-	nameParts := strings.SplitN(name, "{", 2)
-
-	metric := e.registry.Get(nameParts[0])
-	if metric == nil {
-		return nil, fmt.Errorf("metric '%s' does not exist in the script", nameParts[0])
-	}
-	if len(nameParts) == 1 { // no sub-metric
-		return metric, nil
 	}
-	if nameParts[1][len(nameParts[1])-1] != '}' {
-		return nil, fmt.Errorf("missing ending bracket, sub-metric format needs to be 'metric{key:value}'")
-	}
-	sm, err := metric.AddSubmetric(nameParts[1][:len(nameParts[1])-1])
+	me, err := engine.NewMetricsEngine(registry, ex.GetState(), opts, rtOpts, logger)
 	if err != nil {
 		return nil, err
 	}
-	return sm.Metric, nil
-}
+	e.MetricsEngine = me
 
-func (e *Engine) initSubMetricsAndThresholds() error {
-	for metricName, thresholds := range e.options.Thresholds {
-		metric, err := e.getOrInitPotentialSubmetric(metricName)
-
-		if e.runtimeOptions.NoThresholds.Bool {
-			if err != nil {
-				e.logger.WithError(err).Warnf("Invalid metric '%s' in threshold definitions", metricName)
-			}
-			continue
-		}
-
-		if err != nil {
-			return fmt.Errorf("invalid metric '%s' in threshold definitions: %w", metricName, err)
-		}
-
-		metric.Thresholds = thresholds
-		e.metricsWithThresholds = append(e.metricsWithThresholds, metric)
-
-		// Mark the metric (and the parent metricq, if we're dealing with a
-		// submetric) as observed, so they are shown in the end-of-test summary,
-		// even if they don't have any metric samples during the test run
-		metric.Observed = true
-		e.Metrics[metric.Name] = metric
-		if metric.Sub != nil {
-			metric.Sub.Metric.Observed = true
-			e.Metrics[metric.Sub.Metric.Name] = metric.Sub.Metric
-		}
+	if !(rtOpts.NoSummary.Bool && rtOpts.NoThresholds.Bool) {
+		e.ingester = me.GetIngester()
+		outputs = append(outputs, e.ingester)
 	}
 
-	// TODO: refactor out of here when https://github.com/grafana/k6/issues/1321
-	// lands and there is a better way to enable a metric with tag
-	if e.options.SystemTags.Has(stats.TagExpectedResponse) {
-		_, err := e.getOrInitPotentialSubmetric("http_req_duration{expected_response:true}")
+	e.OutputManager = output.NewManager(outputs, logger, func(err error) {
 		if err != nil {
-			return err // shouldn't happen, but ¯\_(ツ)_/¯
+			logger.WithError(err).Error("Received error to stop from output")
 		}
-	}
+		e.Stop()
+	})
 
-	return nil
+	return e, nil
 }
 
 // Init is used to initialize the execution scheduler and all metrics processing
@@ -253,27 +191,27 @@ func (e *Engine) startBackgroundProcesses(
 				var serr errext.Exception
 				switch {
 				case errors.As(err, &serr):
-					e.setRunStatus(lib.RunStatusAbortedScriptError)
+					e.OutputManager.SetRunStatus(lib.RunStatusAbortedScriptError)
 				case common.IsInterruptError(err):
-					e.setRunStatus(lib.RunStatusAbortedUser)
+					e.OutputManager.SetRunStatus(lib.RunStatusAbortedUser)
 				default:
-					e.setRunStatus(lib.RunStatusAbortedSystem)
+					e.OutputManager.SetRunStatus(lib.RunStatusAbortedSystem)
 				}
 			} else {
 				e.logger.Debug("run: execution scheduler terminated")
-				e.setRunStatus(lib.RunStatusFinished)
+				e.OutputManager.SetRunStatus(lib.RunStatusFinished)
 			}
 		case <-runCtx.Done():
 			e.logger.Debug("run: context expired; exiting...")
-			e.setRunStatus(lib.RunStatusAbortedUser)
+			e.OutputManager.SetRunStatus(lib.RunStatusAbortedUser)
 		case <-e.stopChan:
 			runSubCancel()
 			e.logger.Debug("run: stopped by user; exiting...")
-			e.setRunStatus(lib.RunStatusAbortedUser)
+			e.OutputManager.SetRunStatus(lib.RunStatusAbortedUser)
 		case <-thresholdAbortChan:
 			e.logger.Debug("run: stopped by thresholds; exiting...")
 			runSubCancel()
-			e.setRunStatus(lib.RunStatusAbortedThreshold)
+			e.OutputManager.SetRunStatus(lib.RunStatusAbortedThreshold)
 		}
 	}()
 
@@ -289,7 +227,11 @@ func (e *Engine) startBackgroundProcesses(
 		for {
 			select {
 			case <-ticker.C:
-				if e.processThresholds() {
+				thresholdsTainted, shouldAbort := e.MetricsEngine.ProcessThresholds()
+				e.thresholdsTaintedLock.Lock()
+				e.thresholdsTainted = thresholdsTainted
+				e.thresholdsTaintedLock.Unlock()
+				if shouldAbort {
 					close(thresholdAbortChan)
 					return
 				}
@@ -315,10 +257,14 @@ func (e *Engine) processMetrics(globalCtx context.Context, processMetricsAfterRu
 		for sc := range e.Samples {
 			sampleContainers = append(sampleContainers, sc)
 		}
-		e.processSamples(sampleContainers)
+		e.OutputManager.AddMetricSamples(sampleContainers)
 
 		if !e.runtimeOptions.NoThresholds.Bool {
-			e.processThresholds() // Process the thresholds one final time
+			// Process the thresholds one final time
+			thresholdsTainted, _ := e.MetricsEngine.ProcessThresholds()
+			e.thresholdsTaintedLock.Lock()
+			e.thresholdsTainted = thresholdsTainted
+			e.thresholdsTaintedLock.Unlock()
 		}
 	}()
 
@@ -328,7 +274,7 @@
 	e.logger.Debug("Metrics processing started...")
 	processSamples := func() {
 		if len(sampleContainers) > 0 {
-			e.processSamples(sampleContainers)
+			e.OutputManager.AddMetricSamples(sampleContainers)
 			// Make the new container with the same size as the previous
 			// one, assuming that we produce roughly the same amount of
 			// metrics data between ticks...
@@ -352,7 +298,12 @@ func (e *Engine) processMetrics(globalCtx context.Context, processMetricsAfterRu
 			e.logger.Debug("Processing metrics and thresholds after the test run has ended...")
 			processSamples()
 			if !e.runtimeOptions.NoThresholds.Bool {
-				e.processThresholds()
+				// Ensure the ingester flushes any buffered metrics
+				_ = e.ingester.Stop()
+				thresholdsTainted, _ := e.MetricsEngine.ProcessThresholds()
+				e.thresholdsTaintedLock.Lock()
+				e.thresholdsTainted = thresholdsTainted
+				e.thresholdsTaintedLock.Unlock()
 			}
 			processMetricsAfterRun <- struct{}{}
 
@@ -364,15 +315,9 @@ func (e *Engine) processMetrics(globalCtx context.Context, processMetricsAfterRu
 	}
 }
 
-func (e *Engine) setRunStatus(status lib.RunStatus) {
-	for _, out := range e.outputs {
-		if statUpdOut, ok := out.(output.WithRunStatusUpdates); ok {
-			statUpdOut.SetRunStatus(status)
-		}
-	}
-}
-
 func (e *Engine) IsTainted() bool {
+	e.thresholdsTaintedLock.Lock()
+	defer e.thresholdsTaintedLock.Unlock()
 	return e.thresholdsTainted
 }
 
@@ -392,89 +337,3 @@ func (e *Engine) IsStopped() bool {
 		return false
 	}
 }
-
-func (e *Engine) processThresholds() (shouldAbort bool) {
-	e.MetricsLock.Lock()
-	defer e.MetricsLock.Unlock()
-
-	t := e.executionState.GetCurrentTestRunDuration()
-
-	e.thresholdsTainted = false
-	for _, m := range e.metricsWithThresholds {
-		if len(m.Thresholds.Thresholds) == 0 {
-			continue
-		}
-		m.Tainted = null.BoolFrom(false)
-
-		e.logger.WithField("m", m.Name).Debug("running thresholds")
-		succ, err := m.Thresholds.Run(m.Sink, t)
-		if err != nil {
-			e.logger.WithField("m", m.Name).WithError(err).Error("Threshold error")
-			continue
-		}
-		if !succ {
-			e.logger.WithField("m", m.Name).Debug("Thresholds failed")
-			m.Tainted = null.BoolFrom(true)
-			e.thresholdsTainted = true
-			if m.Thresholds.Abort {
-				shouldAbort = true
-			}
-		}
-	}
-
-	return shouldAbort
-}
-
-func (e *Engine) processMetricsInSamples(sampleContainers []stats.SampleContainer) {
-	for _, sampleContainer := range sampleContainers {
-		samples := sampleContainer.GetSamples()
-
-		if len(samples) == 0 {
-			continue
-		}
-
-		for _, sample := range samples {
-			m := sample.Metric // this should have come from the Registry, no need to look it up
-			if !m.Observed {
-				// But we need to add it here, so we can show data in the
-				// end-of-test summary for this metric
-				e.Metrics[m.Name] = m
-				m.Observed = true
-			}
-			m.Sink.Add(sample) // add its value to its own sink
-
-			// and also add it to any submetrics that match
-			for _, sm := range m.Submetrics {
-				if !sample.Tags.Contains(sm.Tags) {
-					continue
-				}
-				if !sm.Metric.Observed {
-					// But we need to add it here, so we can show data in the
-					// end-of-test summary for this metric
-					e.Metrics[sm.Metric.Name] = sm.Metric
-					sm.Metric.Observed = true
-				}
-				sm.Metric.Sink.Add(sample)
-			}
-		}
-	}
-}
-
-func (e *Engine) processSamples(sampleContainers []stats.SampleContainer) {
-	if len(sampleContainers) == 0 {
-		return
-	}
-
-	// TODO: optimize this...
-	e.MetricsLock.Lock()
-	defer e.MetricsLock.Unlock()
-
-	// TODO: run this and the below code in goroutines?
-	if !(e.runtimeOptions.NoSummary.Bool && e.runtimeOptions.NoThresholds.Bool) {
-		e.processMetricsInSamples(sampleContainers)
-	}
-
-	for _, out := range e.outputs {
-		out.AddMetricSamples(sampleContainers)
-	}
-}
diff --git a/core/engine_test.go b/core/engine_test.go
index 8b445dd317f..7f45d4bf19d 100644
--- a/core/engine_test.go
+++ b/core/engine_test.go
@@ -84,6 +84,7 @@ func newTestEngineWithRegistry( //nolint:golint
 	engine, err = NewEngine(execScheduler, opts, lib.RuntimeOptions{}, outputs, logger, registry)
 	require.NoError(t, err)
 
+	require.NoError(t, engine.OutputManager.StartOutputs())
 	run, waitFn, err := engine.Init(globalCtx, runCtx)
 	require.NoError(t, err)
 
@@ -94,6 +95,7 @@ func newTestEngineWithRegistry( //nolint:golint
 		}
 		globalCancel()
 		waitFn()
+		engine.OutputManager.StopOutputs()
 	}
 }
 
@@ -249,7 +251,7 @@ func TestEngineOutput(t *testing.T) {
 			cSamples = append(cSamples, sample)
 		}
 	}
-	metric := e.Metrics["test_metric"]
+	metric := e.MetricsEngine.ObservedMetrics["test_metric"]
 	if assert.NotNil(t, metric) {
 		sink := metric.Sink.(*stats.TrendSink)
 		if assert.NotNil(t, sink) {
@@ -271,13 +273,15 @@ func TestEngine_processSamples(t *testing.T) {
 		require.NoError(t, err)
 
 		e, _, wait := newTestEngineWithRegistry(t, nil, nil, nil, lib.Options{}, registry)
-		defer wait()
 
-		e.processSamples(
+		e.OutputManager.AddMetricSamples(
 			[]stats.SampleContainer{stats.Sample{Metric: metric, Value: 1.25, Tags: stats.IntoSampleTags(&map[string]string{"a": "1"})}},
 		)
 
-		assert.IsType(t, &stats.GaugeSink{}, e.Metrics["my_metric"].Sink)
+		e.Stop()
+		wait()
+
+		assert.IsType(t, &stats.GaugeSink{}, e.MetricsEngine.ObservedMetrics["my_metric"].Sink)
 	})
 	t.Run("submetric", func(t *testing.T) {
 		t.Parallel()
@@ -295,19 +299,20 @@ func TestEngine_processSamples(t *testing.T) {
 				"my_metric{a:1}": ths,
 			},
 		}, registry)
-		defer wait()
-		assert.Len(t, e.metricsWithThresholds, 1)
-		sms := e.metricsWithThresholds[0]
-		assert.Equal(t, "my_metric{a:1}", sms.Name)
-		assert.EqualValues(t, map[string]string{"a": "1"}, sms.Sub.Tags.CloneTags())
-
-		e.processSamples(
+		e.OutputManager.AddMetricSamples(
 			[]stats.SampleContainer{stats.Sample{Metric: metric, Value: 1.25, Tags: stats.IntoSampleTags(&map[string]string{"a": "1", "b": "2"})}},
 		)
 
-		assert.IsType(t, &stats.GaugeSink{}, e.Metrics["my_metric"].Sink)
-		assert.IsType(t, &stats.GaugeSink{}, e.Metrics["my_metric{a:1}"].Sink)
+		e.Stop()
+		wait()
+
+		assert.Len(t, e.MetricsEngine.ObservedMetrics, 2)
+		sms := e.MetricsEngine.ObservedMetrics["my_metric{a:1}"]
+		assert.EqualValues(t, map[string]string{"a": "1"}, sms.Sub.Tags.CloneTags())
+
+		assert.IsType(t, &stats.GaugeSink{}, e.MetricsEngine.ObservedMetrics["my_metric"].Sink)
+		assert.IsType(t, &stats.GaugeSink{}, e.MetricsEngine.ObservedMetrics["my_metric{a:1}"].Sink)
 	})
 }
 
@@ -329,12 +334,13 @@ func TestEngineThresholdsWillAbort(t *testing.T) {
 	thresholds := map[string]stats.Thresholds{metric.Name: ths}
 
 	e, _, wait := newTestEngineWithRegistry(t, nil, nil, nil, lib.Options{Thresholds: thresholds}, registry)
-	defer wait()
 
-	e.processSamples(
+	e.OutputManager.AddMetricSamples(
 		[]stats.SampleContainer{stats.Sample{Metric: metric, Value: 1.25, Tags: stats.IntoSampleTags(&map[string]string{"a": "1"})}},
 	)
-	assert.True(t, e.processThresholds())
+	e.Stop()
+	wait()
+	assert.True(t, e.thresholdsTainted)
 }
 
 func TestEngineAbortedByThresholds(t *testing.T) {
@@ -384,32 +390,30 @@ func TestEngine_processThresholds(t *testing.T) {
 	t.Parallel()
 
 	testdata := map[string]struct {
-		pass  bool
-		ths   map[string][]string
-		abort bool
+		pass bool
+		ths  map[string][]string
 	}{
-		"passing":  {true, map[string][]string{"my_metric": {"value<2"}}, false},
-		"failing":  {false, map[string][]string{"my_metric": {"value>1.25"}}, false},
-		"aborting": {false, map[string][]string{"my_metric": {"value>1.25"}}, true},
-
-		"submetric,match,passing":   {true, map[string][]string{"my_metric{a:1}": {"value<2"}}, false},
-		"submetric,match,failing":   {false, map[string][]string{"my_metric{a:1}": {"value>1.25"}}, false},
-		"submetric,nomatch,passing": {true, map[string][]string{"my_metric{a:2}": {"value<2"}}, false},
-		"submetric,nomatch,failing": {false, map[string][]string{"my_metric{a:2}": {"value>1.25"}}, false},
-
-		"unused,passing":      {true, map[string][]string{"unused_counter": {"count==0"}}, false},
-		"unused,failing":      {false, map[string][]string{"unused_counter": {"count>1"}}, false},
-		"unused,subm,passing": {true, map[string][]string{"unused_counter{a:2}": {"count<1"}}, false},
-		"unused,subm,failing": {false, map[string][]string{"unused_counter{a:2}": {"count>1"}}, false},
-
-		"used,passing":               {true, map[string][]string{"used_counter": {"count==2"}}, false},
-		"used,failing":               {false, map[string][]string{"used_counter": {"count<1"}}, false},
-		"used,subm,passing":          {true, map[string][]string{"used_counter{b:1}": {"count==2"}}, false},
-		"used,not-subm,passing":      {true, map[string][]string{"used_counter{b:2}": {"count==0"}}, false},
-		"used,invalid-subm,passing1": {true, map[string][]string{"used_counter{c:''}": {"count==0"}}, false},
-		"used,invalid-subm,failing1": {false, map[string][]string{"used_counter{c:''}": {"count>0"}}, false},
-		"used,invalid-subm,passing2": {true, map[string][]string{"used_counter{c:}": {"count==0"}}, false},
-		"used,invalid-subm,failing2": {false, map[string][]string{"used_counter{c:}": {"count>0"}}, false},
+		"passing": {true, map[string][]string{"my_metric": {"value<2"}}},
+		"failing": {false, map[string][]string{"my_metric": {"value>1.25"}}},
+
+		"submetric,match,passing":   {true, map[string][]string{"my_metric{a:1}": {"value<2"}}},
+		"submetric,match,failing":   {false, map[string][]string{"my_metric{a:1}": {"value>1.25"}}},
+		"submetric,nomatch,passing": {true, map[string][]string{"my_metric{a:2}": {"value<2"}}},
+		"submetric,nomatch,failing": {false, map[string][]string{"my_metric{a:2}": {"value>1.25"}}},
+
+		"unused,passing":      {true, map[string][]string{"unused_counter": {"count==0"}}},
+		"unused,failing":      {false, map[string][]string{"unused_counter": {"count>1"}}},
+		"unused,subm,passing": {true, map[string][]string{"unused_counter{a:2}": {"count<1"}}},
+		"unused,subm,failing": {false, map[string][]string{"unused_counter{a:2}": {"count>1"}}},
+
+		"used,passing":               {true, map[string][]string{"used_counter": {"count==2"}}},
+		"used,failing":               {false, map[string][]string{"used_counter": {"count<1"}}},
+		"used,subm,passing":          {true, map[string][]string{"used_counter{b:1}": {"count==2"}}},
+		"used,not-subm,passing":      {true, map[string][]string{"used_counter{b:2}": {"count==0"}}},
+		"used,invalid-subm,passing1": {true, map[string][]string{"used_counter{c:''}": {"count==0"}}},
+		"used,invalid-subm,failing1": {false, map[string][]string{"used_counter{c:''}": {"count>0"}}},
+		"used,invalid-subm,passing2": {true, map[string][]string{"used_counter{c:}": {"count==0"}}},
+		"used,invalid-subm,failing2": {false, map[string][]string{"used_counter{c:}": {"count>0"}}},
 	}
 
 	for name, data := range testdata {
@@ -430,21 +434,25 @@
 			ths := stats.NewThresholds(srcs)
 			gotParseErr := ths.Parse()
 			require.NoError(t, gotParseErr)
-			ths.Thresholds[0].AbortOnFail = data.abort
 			thresholds[m] = ths
 		}
 
-		e, _, wait := newTestEngineWithRegistry(t, nil, nil, nil, lib.Options{Thresholds: thresholds}, registry)
-		defer wait()
+		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+		defer cancel()
+		e, run, wait := newTestEngineWithRegistry(
+			t, ctx, &minirunner.MiniRunner{}, nil, lib.Options{Thresholds: thresholds}, registry,
+		)
 
-		e.processSamples(
+		e.OutputManager.AddMetricSamples(
 			[]stats.SampleContainer{
 				stats.Sample{Metric: gaugeMetric, Value: 1.25, Tags: stats.IntoSampleTags(&map[string]string{"a": "1"})},
 				stats.Sample{Metric: counterMetric, Value: 2, Tags: stats.IntoSampleTags(&map[string]string{"b": "1"})},
 			},
 		)
 
-		assert.Equal(t, data.abort, e.processThresholds())
+		require.NoError(t, run())
+		wait()
+		assert.Equal(t, data.pass, !e.IsTainted())
 	})
 }
 
@@ -1289,6 +1297,7 @@ func TestActiveVUsCount(t *testing.T) {
 	require.NoError(t, err)
 	engine, err := NewEngine(execScheduler, opts, rtOpts, []output.Output{mockOutput}, logger, registry)
 	require.NoError(t, err)
+	require.NoError(t, engine.OutputManager.StartOutputs())
 
 	run, waitFn, err := engine.Init(ctx, ctx) // no need for 2 different contexts
 	require.NoError(t, err)
@@ -1302,6 +1311,7 @@ func TestActiveVUsCount(t *testing.T) {
 	require.NoError(t, err)
 	cancel()
 	waitFn()
+	engine.OutputManager.StopOutputs()
 
 	require.False(t, engine.IsTainted())
 }
diff --git a/js/runner_test.go b/js/runner_test.go
index 06938b55b6d..3a52d57d704 100644
--- a/js/runner_test.go
+++ b/js/runner_test.go
@@ -314,6 +314,8 @@ func TestSetupDataIsolation(t *testing.T) {
 		execScheduler, options, lib.RuntimeOptions{}, []output.Output{mockOutput}, testutils.NewLogger(t), registry,
 	)
 	require.NoError(t, err)
+	require.NoError(t, engine.OutputManager.StartOutputs())
+	defer engine.OutputManager.StopOutputs()
 
 	ctx, cancel := context.WithCancel(context.Background())
 	run, wait, err := engine.Init(ctx, ctx)
diff --git a/metrics/engine/engine.go b/metrics/engine/engine.go
new file mode 100644
index 00000000000..cfad98094dc
--- /dev/null
+++ b/metrics/engine/engine.go
@@ -0,0 +1,174 @@
+// Package engine contains the internal metrics engine responsible for
+// aggregating metrics during the test and evaluating thresholds against them.
+package engine
+
+import (
+	"fmt"
+	"strings"
+	"sync"
+
+	"github.com/sirupsen/logrus"
+	"go.k6.io/k6/lib"
+	"go.k6.io/k6/metrics"
+	"go.k6.io/k6/output"
+	"go.k6.io/k6/stats"
+	"gopkg.in/guregu/null.v3"
+)
+
+// MetricsEngine is the internal metrics engine that k6 uses to keep track of
+// aggregated metric sample values. They are used to generate the end-of-test
+// summary and to evaluate the test thresholds.
+type MetricsEngine struct {
+	registry       *metrics.Registry
+	executionState *lib.ExecutionState
+	options        lib.Options
+	runtimeOptions lib.RuntimeOptions
+	logger         logrus.FieldLogger
+
+	// These can be both top-level metrics or sub-metrics
+	metricsWithThresholds []*stats.Metric
+
+	// TODO: completely refactor:
+	// - make these private,
+	// - do not use an unnecessary map for the observed metrics
+	// - have one lock per metric instead of a global one, when
+	//   the metrics are decoupled from their types
+	MetricsLock     sync.Mutex
+	ObservedMetrics map[string]*stats.Metric
+}
+
+// NewMetricsEngine creates a new metrics Engine with the given parameters.
+func NewMetricsEngine(
+	registry *metrics.Registry, executionState *lib.ExecutionState,
+	opts lib.Options, rtOpts lib.RuntimeOptions, logger logrus.FieldLogger,
+) (*MetricsEngine, error) {
+	me := &MetricsEngine{
+		registry:       registry,
+		executionState: executionState,
+		options:        opts,
+		runtimeOptions: rtOpts,
+		logger:         logger.WithField("component", "metrics-engine"),
+
+		ObservedMetrics: make(map[string]*stats.Metric),
+	}
+
+	if !(me.runtimeOptions.NoSummary.Bool && me.runtimeOptions.NoThresholds.Bool) {
+		err := me.initSubMetricsAndThresholds()
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	return me, nil
+}
+
+// GetIngester returns a pseudo-Output that uses the given metric samples to
+// update the engine's inner state.
+func (me *MetricsEngine) GetIngester() output.Output {
+	return &outputIngester{
+		logger:        me.logger.WithField("component", "metrics-engine-ingester"),
+		metricsEngine: me,
+	}
+}
+
+func (me *MetricsEngine) getOrInitPotentialSubmetric(name string) (*stats.Metric, error) {
+	// TODO: replace with strings.Cut after Go 1.18
+	nameParts := strings.SplitN(name, "{", 2)
+
+	metric := me.registry.Get(nameParts[0])
+	if metric == nil {
+		return nil, fmt.Errorf("metric '%s' does not exist in the script", nameParts[0])
+	}
+	if len(nameParts) == 1 { // no sub-metric
+		return metric, nil
+	}
+
+	if nameParts[1][len(nameParts[1])-1] != '}' {
+		return nil, fmt.Errorf("missing ending bracket, sub-metric format needs to be 'metric{key:value}'")
+	}
+	sm, err := metric.AddSubmetric(nameParts[1][:len(nameParts[1])-1])
+	if err != nil {
+		return nil, err
+	}
+	return sm.Metric, nil
+}
+
+func (me *MetricsEngine) markObserved(metric *stats.Metric) {
+	if !metric.Observed {
+		metric.Observed = true
+		me.ObservedMetrics[metric.Name] = metric
+	}
+}
+
+func (me *MetricsEngine) initSubMetricsAndThresholds() error {
+	for metricName, thresholds := range me.options.Thresholds {
+		metric, err := me.getOrInitPotentialSubmetric(metricName)
+
+		if me.runtimeOptions.NoThresholds.Bool {
+			if err != nil {
+				me.logger.WithError(err).Warnf("Invalid metric '%s' in threshold definitions", metricName)
+			}
+			continue
+		}
+
+		if err != nil {
+			return fmt.Errorf("invalid metric '%s' in threshold definitions: %w", metricName, err)
+		}
+
+		metric.Thresholds = thresholds
+		me.metricsWithThresholds = append(me.metricsWithThresholds, metric)
+
+		// Mark the metric (and the parent metric, if we're dealing with a
+		// submetric) as observed, so they are shown in the end-of-test summary,
+		// even if they don't have any metric samples during the test run
+		me.markObserved(metric)
+		if metric.Sub != nil {
+			me.markObserved(metric.Sub.Metric)
+		}
+	}
+
+	// TODO: refactor out of here when https://github.com/grafana/k6/issues/1321
+	// lands and there is a better way to enable a metric with tag
+	if me.options.SystemTags.Has(stats.TagExpectedResponse) {
+		_, err := me.getOrInitPotentialSubmetric("http_req_duration{expected_response:true}")
+		if err != nil {
+			return err // shouldn't happen, but ¯\_(ツ)_/¯
+		}
+	}
+
+	return nil
+}
+
+// ProcessThresholds processes all of the thresholds.
+//
+// TODO: refactor, make private, optimize
+func (me *MetricsEngine) ProcessThresholds() (thresholdsTainted, shouldAbort bool) {
+	me.MetricsLock.Lock()
+	defer me.MetricsLock.Unlock()
+
+	t := me.executionState.GetCurrentTestRunDuration()
+
+	for _, m := range me.metricsWithThresholds {
+		if len(m.Thresholds.Thresholds) == 0 {
+			continue
+		}
+		m.Tainted = null.BoolFrom(false)
+
+		me.logger.WithField("m", m.Name).Debug("running thresholds")
+		succ, err := m.Thresholds.Run(m.Sink, t)
+		if err != nil {
+			me.logger.WithField("m", m.Name).WithError(err).Error("Threshold error")
+			continue
+		}
+		if !succ {
+			me.logger.WithField("m", m.Name).Debug("Thresholds failed")
+			m.Tainted = null.BoolFrom(true)
+			thresholdsTainted = true
+			if m.Thresholds.Abort {
+				shouldAbort = true
+			}
+		}
+	}
+
+	return thresholdsTainted, shouldAbort
+}
diff --git a/metrics/engine/ingester.go b/metrics/engine/ingester.go
new file mode 100644
index 00000000000..87bdceeadf2
--- /dev/null
+++ b/metrics/engine/ingester.go
@@ -0,0 +1,92 @@
+package engine
+
+import (
+	"time"
+
+	"github.com/sirupsen/logrus"
+	"go.k6.io/k6/output"
+)
+
+const collectRate = 50 * time.Millisecond
+
+var _ output.Output = &outputIngester{}
+
+// outputIngester implements the output.Output interface and can be used to
+// "feed" the MetricsEngine data from a `k6 run` test run.
+type outputIngester struct {
+	output.SampleBuffer
+	logger logrus.FieldLogger
+
+	metricsEngine   *MetricsEngine
+	periodicFlusher *output.PeriodicFlusher
+}
+
+// Description returns a human-readable description of the output.
+func (oi *outputIngester) Description() string {
+	return "engine"
+}
+
+// Start the engine by initializing a new output.PeriodicFlusher
+func (oi *outputIngester) Start() error {
+	oi.logger.Debug("Starting...")
+
+	pf, err := output.NewPeriodicFlusher(collectRate, oi.flushMetrics)
+	if err != nil {
+		return err
+	}
+	oi.logger.Debug("Started!")
+	oi.periodicFlusher = pf
+
+	return nil
+}
+
+// Stop flushes any remaining metrics and stops the goroutine.
+func (oi *outputIngester) Stop() error {
+	oi.logger.Debug("Stopping...")
+	defer oi.logger.Debug("Stopped!")
+	oi.periodicFlusher.Stop()
+	return nil
+}
+
+// flushMetrics writes the buffered metric samples to the MetricsEngine.
+func (oi *outputIngester) flushMetrics() {
+	sampleContainers := oi.GetBufferedSamples()
+	if len(sampleContainers) == 0 {
+		return
+	}
+
+	oi.metricsEngine.MetricsLock.Lock()
+	defer oi.metricsEngine.MetricsLock.Unlock()
+
+	// TODO: split metric samples in buckets with a *stats.Metric key; this will
+	// allow us to have a per-bucket lock, instead of one global one, and it
+	// will allow us to split apart the metric Name and Type from its Sink and
+	// Observed fields...
+	//
+	// And, to further optimize things, if every metric (and sub-metric) had a
+	// sequential integer ID, we would be able to use a slice for these buckets
+	// and eliminate the map lookups altogether!
+	for _, sampleContainer := range sampleContainers {
+		samples := sampleContainer.GetSamples()
+
+		if len(samples) == 0 {
+			continue
+		}
+
+		for _, sample := range samples {
+			m := sample.Metric               // this should have come from the Registry, no need to look it up
+			oi.metricsEngine.markObserved(m) // mark it as observed so it shows in the end-of-test summary
+			m.Sink.Add(sample)               // finally, add its value to its own sink
+
+			// and also add it to any submetrics that match the metric sample
+			for _, sm := range m.Submetrics {
+				if !sample.Tags.Contains(sm.Tags) {
+					continue
+				}
+				oi.metricsEngine.markObserved(sm.Metric)
+				sm.Metric.Sink.Add(sample)
+			}
+		}
+	}
+}
diff --git a/output/manager.go b/output/manager.go
index 18aa6cc3f15..fdb88743e19 100644
--- a/output/manager.go
+++ b/output/manager.go
@@ -2,6 +2,8 @@ package output
 
 import (
 	"github.com/sirupsen/logrus"
+	"go.k6.io/k6/lib"
+	"go.k6.io/k6/stats"
 )
 
 // Manager can be used to manage multiple outputs at the same time.
@@ -53,3 +55,27 @@ func (om *Manager) stopOutputs(upToID int) {
 		}
 	}
 }
+
+// SetRunStatus checks which outputs implement the WithRunStatusUpdates
+// interface and sets the provided RunStatus to them.
+func (om *Manager) SetRunStatus(status lib.RunStatus) {
+	for _, out := range om.outputs {
+		if statUpdOut, ok := out.(WithRunStatusUpdates); ok {
+			statUpdOut.SetRunStatus(status)
+		}
+	}
+}
+
+// AddMetricSamples is a temporary method to make the Manager usable in the
+// current Engine. It needs to be replaced with the full metric pump.
+//
+// TODO: refactor
+func (om *Manager) AddMetricSamples(sampleContainers []stats.SampleContainer) {
+	if len(sampleContainers) == 0 {
+		return
+	}
+
+	for _, out := range om.outputs {
+		out.AddMetricSamples(sampleContainers)
+	}
+}
diff --git a/output/types.go b/output/types.go
index eb623102823..42227e9e8f1 100644
--- a/output/types.go
+++ b/output/types.go
@@ -53,6 +53,8 @@ type Params struct {
 	ExecutionPlan []lib.ExecutionStep
 }
 
+// TODO: make v2 with buffered channels?
+
 // An Output abstracts the process of funneling samples to an external storage
 // backend, such as a file or something like an InfluxDB instance.
 //
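
Note on the intended follow-up wiring: the TODO added to cmd/run.go above describes the end state, where the command itself builds the MetricsEngine, appends its ingester to the configured outputs, and drives everything through an output.Manager, without going through core.Engine at all. A rough sketch of that wiring, using only the APIs introduced in this patch, could look like the code below. The helper name, its package, and the runCancel callback are illustrative assumptions, not part of the patch.

package example

import (
	"github.com/sirupsen/logrus"

	"go.k6.io/k6/lib"
	"go.k6.io/k6/metrics"
	"go.k6.io/k6/metrics/engine"
	"go.k6.io/k6/output"
)

// setupMetricsAndOutputs is a hypothetical helper (not part of this patch)
// showing how a caller could wire the new pieces together directly: create a
// MetricsEngine, append its ingester to the configured outputs, and manage
// everything through an output.Manager.
func setupMetricsAndOutputs(
	registry *metrics.Registry, execState *lib.ExecutionState,
	opts lib.Options, rtOpts lib.RuntimeOptions,
	outputs []output.Output, logger logrus.FieldLogger, runCancel func(),
) (*engine.MetricsEngine, *output.Manager, error) {
	metricsEngine, err := engine.NewMetricsEngine(registry, execState, opts, rtOpts, logger)
	if err != nil {
		return nil, nil, err
	}

	// The ingester is just another output, so the metrics engine only
	// receives samples when the summary or the thresholds actually need them.
	if !(rtOpts.NoSummary.Bool && rtOpts.NoThresholds.Bool) {
		outputs = append(outputs, metricsEngine.GetIngester())
	}

	outputManager := output.NewManager(outputs, logger, func(err error) {
		if err != nil {
			logger.WithError(err).Error("Received error to stop from output")
		}
		runCancel() // an output asked us to stop the whole test run
	})
	if err := outputManager.StartOutputs(); err != nil {
		return nil, nil, err
	}
	return metricsEngine, outputManager, nil
}

The stop callback mirrors the one this patch installs in NewEngine(): any output that reports a fatal error aborts the whole test run.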