From da5c59f8727e713a301972b4c1c692eb5f7e33cf Mon Sep 17 00:00:00 2001 From: Nedyalko Andreev Date: Thu, 8 Apr 2021 15:02:23 +0300 Subject: [PATCH] Fix the double-counting of active VUs by the arrival-rate executors Closes https://github.com/k6io/k6/issues/1954 --- core/engine_test.go | 107 ++++++++++++++++++++++++++++++++++++++++++++ lib/execution.go | 5 ++- 2 files changed, 110 insertions(+), 2 deletions(-) diff --git a/core/engine_test.go b/core/engine_test.go index 157eced789c..88bfb94d2eb 100644 --- a/core/engine_test.go +++ b/core/engine_test.go @@ -381,6 +381,17 @@ func getMetricCount(mo *mockoutput.MockOutput, name string) (result uint) { return } +func getMetricMax(mo *mockoutput.MockOutput, name string) (result float64) { + for _, sc := range mo.SampleContainers { + for _, s := range sc.GetSamples() { + if s.Metric.Name == name && s.Value > result { + result = s.Value + } + } + } + return +} + const expectedHeaderMaxLength = 500 // FIXME: This test is too brittle, consider simplifying. @@ -975,3 +986,99 @@ func TestEngineRunsTeardownEvenAfterTestRunIsAborted(t *testing.T) { } assert.Equal(t, 1.0, count) } + +func TestActiveVUsCount(t *testing.T) { + t.Parallel() + + script := []byte(` + var sleep = require('k6').sleep; + + exports.options = { + scenarios: { + carr1: { + executor: 'constant-arrival-rate', + rate: 10, + preAllocatedVUs: 1, + maxVUs: 10, + startTime: '0s', + duration: '3s', + gracefulStop: '0s', + }, + carr2: { + executor: 'constant-arrival-rate', + rate: 10, + preAllocatedVUs: 1, + maxVUs: 10, + duration: '3s', + startTime: '3s', + gracefulStop: '0s', + }, + rarr: { + executor: 'ramping-arrival-rate', + startRate: 5, + stages: [ + { target: 10, duration: '2s' }, + { target: 0, duration: '2s' }, + ], + preAllocatedVUs: 1, + maxVUs: 10, + startTime: '6s', + gracefulStop: '0s', + }, + } + } + + exports.default = function () { + sleep(5); + } + `) + + logger := testutils.NewLogger(t) + logHook := testutils.SimpleLogrusHook{HookedLevels: logrus.AllLevels} + logger.AddHook(&logHook) + + rtOpts := lib.RuntimeOptions{CompatibilityMode: null.StringFrom("base")} + + runner, err := js.New(logger, &loader.SourceData{URL: &url.URL{Path: "/script.js"}, Data: script}, nil, rtOpts) + require.NoError(t, err) + + mockOutput := mockoutput.New() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + opts, err := executor.DeriveScenariosFromShortcuts(lib.Options{ + MetricSamplesBufferSize: null.NewInt(200, false), + }.Apply(runner.GetOptions())) + require.NoError(t, err) + require.Empty(t, opts.Validate()) + require.NoError(t, runner.SetOptions(opts)) + execScheduler, err := local.NewExecutionScheduler(runner, logger) + require.NoError(t, err) + engine, err := NewEngine(execScheduler, opts, rtOpts, []output.Output{mockOutput}, logger) + require.NoError(t, err) + run, waitFn, err := engine.Init(ctx, ctx) // no need for 2 different contexts + require.NoError(t, err) + + errC := make(chan error) + go func() { errC <- run() }() + + select { + case <-time.After(15 * time.Second): + t.Fatal("Test timed out") + case err := <-errC: + require.NoError(t, err) + cancel() + waitFn() + require.False(t, engine.IsTainted()) + } + + assert.Equal(t, 10.0, getMetricMax(mockOutput, metrics.VUs.Name)) + assert.Equal(t, 10.0, getMetricMax(mockOutput, metrics.VUsMax.Name)) + + logEntries := logHook.Drain() + assert.Len(t, logEntries, 3) + for _, logEntry := range logEntries { + assert.Equal(t, logrus.WarnLevel, logEntry.Level) + assert.Equal(t, "Insufficient VUs, reached 10 active VUs and cannot initialize more", logEntry.Message) + } +} diff --git a/lib/execution.go b/lib/execution.go index fbd77e56b11..b77a2fd9fdb 100644 --- a/lib/execution.go +++ b/lib/execution.go @@ -558,7 +558,8 @@ func (es *ExecutionState) SetInitVUFunc(initVUFunc InitVUFunc) { // GetUnplannedVU checks if any unplanned VUs remain to be initialized, and if // they do, it initializes one and returns it. If all unplanned VUs have already -// been initialized, it returns one from the global vus buffer. +// been initialized, it returns one from the global vus buffer, but doesn't +// automatically increment the active VUs counter in either case. // // IMPORTANT: GetUnplannedVU() doesn't do any checking if the requesting // executor is actually allowed to have the VU at this particular time. @@ -570,7 +571,7 @@ func (es *ExecutionState) GetUnplannedVU(ctx context.Context, logger *logrus.Ent if remVUs < 0 { logger.Debug("Reusing a previously initialized unplanned VU") atomic.AddInt64(es.uninitializedUnplannedVUs, 1) - return es.GetPlannedVU(logger, true) + return es.GetPlannedVU(logger, false) } logger.Debug("Initializing an unplanned VU, this may affect test results")