Skip to content

Commit

Permalink
Fix the double-counting of active VUs by the arrival-rate executors
Browse files Browse the repository at this point in the history
Closes #1954
  • Loading branch information
na-- committed Apr 8, 2021
1 parent 96b67c4 commit da5c59f
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 2 deletions.
107 changes: 107 additions & 0 deletions core/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,17 @@ func getMetricCount(mo *mockoutput.MockOutput, name string) (result uint) {
return
}

func getMetricMax(mo *mockoutput.MockOutput, name string) (result float64) {
for _, sc := range mo.SampleContainers {
for _, s := range sc.GetSamples() {
if s.Metric.Name == name && s.Value > result {
result = s.Value
}
}
}
return
}

const expectedHeaderMaxLength = 500

// FIXME: This test is too brittle, consider simplifying.
Expand Down Expand Up @@ -975,3 +986,99 @@ func TestEngineRunsTeardownEvenAfterTestRunIsAborted(t *testing.T) {
}
assert.Equal(t, 1.0, count)
}

func TestActiveVUsCount(t *testing.T) {
t.Parallel()

script := []byte(`
var sleep = require('k6').sleep;
exports.options = {
scenarios: {
carr1: {
executor: 'constant-arrival-rate',
rate: 10,
preAllocatedVUs: 1,
maxVUs: 10,
startTime: '0s',
duration: '3s',
gracefulStop: '0s',
},
carr2: {
executor: 'constant-arrival-rate',
rate: 10,
preAllocatedVUs: 1,
maxVUs: 10,
duration: '3s',
startTime: '3s',
gracefulStop: '0s',
},
rarr: {
executor: 'ramping-arrival-rate',
startRate: 5,
stages: [
{ target: 10, duration: '2s' },
{ target: 0, duration: '2s' },
],
preAllocatedVUs: 1,
maxVUs: 10,
startTime: '6s',
gracefulStop: '0s',
},
}
}
exports.default = function () {
sleep(5);
}
`)

logger := testutils.NewLogger(t)
logHook := testutils.SimpleLogrusHook{HookedLevels: logrus.AllLevels}
logger.AddHook(&logHook)

rtOpts := lib.RuntimeOptions{CompatibilityMode: null.StringFrom("base")}

runner, err := js.New(logger, &loader.SourceData{URL: &url.URL{Path: "/script.js"}, Data: script}, nil, rtOpts)
require.NoError(t, err)

mockOutput := mockoutput.New()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

opts, err := executor.DeriveScenariosFromShortcuts(lib.Options{
MetricSamplesBufferSize: null.NewInt(200, false),
}.Apply(runner.GetOptions()))
require.NoError(t, err)
require.Empty(t, opts.Validate())
require.NoError(t, runner.SetOptions(opts))
execScheduler, err := local.NewExecutionScheduler(runner, logger)
require.NoError(t, err)
engine, err := NewEngine(execScheduler, opts, rtOpts, []output.Output{mockOutput}, logger)
require.NoError(t, err)
run, waitFn, err := engine.Init(ctx, ctx) // no need for 2 different contexts
require.NoError(t, err)

errC := make(chan error)
go func() { errC <- run() }()

select {
case <-time.After(15 * time.Second):
t.Fatal("Test timed out")
case err := <-errC:
require.NoError(t, err)
cancel()
waitFn()
require.False(t, engine.IsTainted())
}

assert.Equal(t, 10.0, getMetricMax(mockOutput, metrics.VUs.Name))
assert.Equal(t, 10.0, getMetricMax(mockOutput, metrics.VUsMax.Name))

logEntries := logHook.Drain()
assert.Len(t, logEntries, 3)
for _, logEntry := range logEntries {
assert.Equal(t, logrus.WarnLevel, logEntry.Level)
assert.Equal(t, "Insufficient VUs, reached 10 active VUs and cannot initialize more", logEntry.Message)
}
}
5 changes: 3 additions & 2 deletions lib/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,8 @@ func (es *ExecutionState) SetInitVUFunc(initVUFunc InitVUFunc) {

// GetUnplannedVU checks if any unplanned VUs remain to be initialized, and if
// they do, it initializes one and returns it. If all unplanned VUs have already
// been initialized, it returns one from the global vus buffer.
// been initialized, it returns one from the global vus buffer, but doesn't
// automatically increment the active VUs counter in either case.
//
// IMPORTANT: GetUnplannedVU() doesn't do any checking if the requesting
// executor is actually allowed to have the VU at this particular time.
Expand All @@ -570,7 +571,7 @@ func (es *ExecutionState) GetUnplannedVU(ctx context.Context, logger *logrus.Ent
if remVUs < 0 {
logger.Debug("Reusing a previously initialized unplanned VU")
atomic.AddInt64(es.uninitializedUnplannedVUs, 1)
return es.GetPlannedVU(logger, true)
return es.GetPlannedVU(logger, false)
}

logger.Debug("Initializing an unplanned VU, this may affect test results")
Expand Down

0 comments on commit da5c59f

Please sign in to comment.