diff --git a/core/local/local.go b/core/local/local.go index 7fecc4651eb..b0b8b994e1c 100644 --- a/core/local/local.go +++ b/core/local/local.go @@ -145,24 +145,20 @@ func (e *ExecutionScheduler) GetExecutionPlan() []lib.ExecutionStep { return e.executionPlan } -// initVU is just a helper method that's used to both initialize the planned VUs +// initVU is a helper method that's used to both initialize the planned VUs // in the Init() method, and also passed to executors so they can initialize // any unplanned VUs themselves. -// TODO: actually use the context... func (e *ExecutionScheduler) initVU( - _ context.Context, logger *logrus.Entry, engineOut chan<- stats.SampleContainer, -) (lib.VU, error) { - vu, err := e.runner.NewVU(engineOut) - if err != nil { - return nil, fmt.Errorf("error while initializing a VU: '%s'", err) - } - + samplesOut chan<- stats.SampleContainer, logger *logrus.Entry, +) (lib.InitializedVU, error) { // Get the VU ID here, so that the VUs are (mostly) ordered by their // number in the channel buffer vuID := e.state.GetUniqueVUIdentifier() - if err := vu.Reconfigure(int64(vuID)); err != nil { - return nil, fmt.Errorf("error while reconfiguring VU #%d: '%s'", vuID, err) + vu, err := e.runner.NewVU(int64(vuID), samplesOut) + if err != nil { + return nil, fmt.Errorf("error while initializing a VU: '%s'", err) } + logger.Debugf("Initialized VU #%d", vuID) return vu, nil } @@ -188,7 +184,8 @@ func (e *ExecutionScheduler) getRunStats() string { } func (e *ExecutionScheduler) initVUsConcurrently( - ctx context.Context, engineOut chan<- stats.SampleContainer, count uint64, concurrency int, logger *logrus.Entry, + ctx context.Context, samplesOut chan<- stats.SampleContainer, count uint64, + concurrency int, logger *logrus.Entry, ) chan error { doneInits := make(chan error, count) // poor man's early-return waitgroup limiter := make(chan struct{}) @@ -196,7 +193,7 @@ func (e *ExecutionScheduler) initVUsConcurrently( for i := 0; i < concurrency; i++ { go func() { for range limiter { - newVU, err := e.initVU(ctx, logger, engineOut) + newVU, err := e.initVU(samplesOut, logger) if err == nil { e.state.AddInitializedVU(newVU) } @@ -221,7 +218,7 @@ func (e *ExecutionScheduler) initVUsConcurrently( // Init concurrently initializes all of the planned VUs and then sequentially // initializes all of the configured executors. -func (e *ExecutionScheduler) Init(ctx context.Context, engineOut chan<- stats.SampleContainer) error { +func (e *ExecutionScheduler) Init(ctx context.Context, samplesOut chan<- stats.SampleContainer) error { logger := e.logger.WithField("phase", "local-execution-scheduler-init") vusToInitialize := lib.GetMaxPlannedVUs(e.executionPlan) @@ -234,7 +231,7 @@ func (e *ExecutionScheduler) Init(ctx context.Context, engineOut chan<- stats.Sa defer cancel() e.state.SetExecutionStatus(lib.ExecutionStatusInitVUs) - doneInits := e.initVUsConcurrently(subctx, engineOut, vusToInitialize, runtime.NumCPU(), logger) + doneInits := e.initVUsConcurrently(subctx, samplesOut, vusToInitialize, runtime.NumCPU(), logger) initializedVUs := new(uint64) vusFmt := pb.GetFixedLengthIntFormat(int64(vusToInitialize)) @@ -261,8 +258,8 @@ func (e *ExecutionScheduler) Init(ctx context.Context, engineOut chan<- stats.Sa } } - e.state.SetInitVUFunc(func(ctx context.Context, logger *logrus.Entry) (lib.VU, error) { - return e.initVU(ctx, logger, engineOut) + e.state.SetInitVUFunc(func(ctx context.Context, logger *logrus.Entry) (lib.InitializedVU, error) { + return e.initVU(samplesOut, logger) }) e.state.SetExecutionStatus(lib.ExecutionStatusInitExecutors) diff --git a/js/runner.go b/js/runner.go index aba2ca7d0df..88df420d89c 100644 --- a/js/runner.go +++ b/js/runner.go @@ -28,7 +28,6 @@ import ( "net/http" "net/http/cookiejar" "strconv" - "sync" "time" "github.com/dop251/goja" @@ -114,15 +113,17 @@ func (r *Runner) MakeArchive() *lib.Archive { return r.Bundle.makeArchive() } -func (r *Runner) NewVU(samplesOut chan<- stats.SampleContainer) (lib.VU, error) { - vu, err := r.newVU(samplesOut) +// NewVU returns a new initialized VU. +func (r *Runner) NewVU(id int64, samplesOut chan<- stats.SampleContainer) (lib.InitializedVU, error) { + vu, err := r.newVU(id, samplesOut) if err != nil { return nil, err } - return lib.VU(vu), nil + return lib.InitializedVU(vu), nil } -func (r *Runner) newVU(samplesOut chan<- stats.SampleContainer) (*VU, error) { +// nolint:funlen +func (r *Runner) newVU(id int64, samplesOut chan<- stats.SampleContainer) (*VU, error) { // Instantiate a new bundle, make a VU out of it. bi, err := r.Bundle.Instantiate() if err != nil { @@ -185,6 +186,7 @@ func (r *Runner) newVU(samplesOut chan<- stats.SampleContainer) (*VU, error) { } vu := &VU{ + ID: id, BundleInstance: *bi, Runner: r, Transport: transport, @@ -194,8 +196,8 @@ func (r *Runner) newVU(samplesOut chan<- stats.SampleContainer) (*VU, error) { Console: r.console, BPool: bpool.NewBufferPool(100), Samples: samplesOut, - m: &sync.Mutex{}, } + vu.Runtime.Set("__VU", vu.ID) vu.Runtime.Set("console", common.Bind(vu.Runtime, vu.Console, vu.Context)) common.BindToGlobal(vu.Runtime, map[string]interface{}{ "open": func() { @@ -206,11 +208,6 @@ func (r *Runner) newVU(samplesOut chan<- stats.SampleContainer) (*VU, error) { }, }) - // Give the VU an initial sense of identity. - if err := vu.Reconfigure(0); err != nil { - return nil, err - } - return vu, nil } @@ -301,7 +298,7 @@ func (r *Runner) SetOptions(opts lib.Options) error { // Runs an exported function in its own temporary VU, optionally with an argument. Execution is // interrupted if the context expires. No error is returned if the part does not exist. func (r *Runner) runPart(ctx context.Context, out chan<- stats.SampleContainer, name string, arg interface{}) (goja.Value, error) { - vu, err := r.newVU(out) + vu, err := r.newVU(0, out) if err != nil { return goja.Undefined(), err } @@ -356,13 +353,14 @@ func (r *Runner) timeoutErrorDuration(stage string) time.Duration { type VU struct { BundleInstance - Runner *Runner - Transport *http.Transport - Dialer *netext.Dialer - CookieJar *cookiejar.Jar - TLSConfig *tls.Config - ID int64 - Iteration int64 + Runner *Runner + RunContext *context.Context + Transport *http.Transport + Dialer *netext.Dialer + CookieJar *cookiejar.Jar + TLSConfig *tls.Config + ID int64 + Iteration int64 Console *console BPool *bpool.BufferPool @@ -370,51 +368,31 @@ type VU struct { Samples chan<- stats.SampleContainer setupData goja.Value - - // A VU will track the last context it was called with for cancellation. - // Note that interruptTrackedCtx is the context that is currently being tracked, while - // interruptCancel cancels an unrelated context that terminates the tracking goroutine - // without triggering an interrupt (for if the context changes). - // There are cleaner ways of handling the interruption problem, but this is a hot path that - // needs to be called thousands of times per second, which rules out anything that spawns a - // goroutine per call. - interruptTrackedCtx context.Context - interruptCancel context.CancelFunc - - m *sync.Mutex } -// Verify that VU implements lib.VU -var _ lib.VU = &VU{} +// Verify that interfaces are implemented +var _ lib.ActiveVU = &VU{} +var _ lib.InitializedVU = &VU{} -func (u *VU) Reconfigure(id int64) error { - u.ID = id - u.Iteration = 0 - u.Runtime.Set("__VU", u.ID) - return nil -} +// Activate the VU so it will be able to run code +func (u *VU) Activate(params *lib.VUActivationParams) lib.ActiveVU { + u.Runtime.ClearInterrupt() + u.RunContext = ¶ms.RunContext + // u.Env = params.Env -func (u *VU) RunOnce(ctx context.Context) error { - u.m.Lock() - defer u.m.Unlock() - // Track the context and interrupt JS execution if it's cancelled. - if u.interruptTrackedCtx != ctx { - interCtx, interCancel := context.WithCancel(context.Background()) - if u.interruptCancel != nil { - u.interruptCancel() + go func() { + <-params.RunContext.Done() + u.Runtime.Interrupt(errInterrupt) + if params.DeactivateCallback != nil { + params.DeactivateCallback() } - u.interruptCancel = interCancel - u.interruptTrackedCtx = ctx - defer interCancel() - go func() { - select { - case <-interCtx.Done(): - case <-ctx.Done(): - u.Runtime.Interrupt(errInterrupt) - } - }() - } + }() + + return lib.ActiveVU(u) +} +// RunOnce runs the default function once. +func (u *VU) RunOnce() error { // Unmarshall the setupData only the first time for each VU so that VUs are isolated but we // still don't use too much CPU in the middle test if u.setupData == nil { @@ -430,7 +408,7 @@ func (u *VU) RunOnce(ctx context.Context) error { } // Call the default function. - _, isFullIteration, totalTime, err := u.runFn(ctx, u.Runner.defaultGroup, true, u.Default, u.setupData) + _, isFullIteration, totalTime, err := u.runFn(*u.RunContext, u.Runner.defaultGroup, true, u.Default, u.setupData) // If MinIterationDuration is specified and the iteration wasn't cancelled // and was less than it, sleep for the remainder diff --git a/lib/execution.go b/lib/execution.go index df894a42671..3975db654ed 100644 --- a/lib/execution.go +++ b/lib/execution.go @@ -58,11 +58,11 @@ type ExecutionScheduler interface { GetExecutors() []Executor // Init initializes all executors, including all of their needed VUs. - Init(ctx context.Context, engineOut chan<- stats.SampleContainer) error + Init(ctx context.Context, samplesOut chan<- stats.SampleContainer) error // Run the ExecutionScheduler, funneling the generated metric samples // through the supplied out channel. - Run(ctx context.Context, engineOut chan<- stats.SampleContainer) error + Run(ctx context.Context, samplesOut chan<- stats.SampleContainer) error // Pause a test, or start/resume it. To check if a test is paused, use // GetState().IsPaused(). @@ -172,7 +172,7 @@ type ExecutionState struct { // directly with the channel. These methods will emit a warning or can even // return an error if retrieving a VU takes more than // MaxTimeToWaitForPlannedVU. - vus chan VU + vus chan InitializedVU // The current VU ID, used for the __VU execution context variable. Use the // GetUniqueVUIdentifier() to get unique values for each VU, starting from 1 @@ -277,7 +277,7 @@ func NewExecutionState(options Options, maxPlannedVUs, maxPossibleVUs uint64) *E return &ExecutionState{ Options: options, - vus: make(chan VU, maxPossibleVUs), + vus: make(chan InitializedVU, maxPossibleVUs), executionStatus: new(uint32), currentVUIdentifier: new(uint64), @@ -525,7 +525,7 @@ func (es *ExecutionState) ResumeNotify() <-chan struct{} { // executors might have to retrieve their reserved VUs without using them // immediately - for example, the externally-controlled executor when the // configured maxVUs number is greater than the configured starting VUs. -func (es *ExecutionState) GetPlannedVU(logger *logrus.Entry, modifyActiveVUCount bool) (VU, error) { +func (es *ExecutionState) GetPlannedVU(logger *logrus.Entry, modifyActiveVUCount bool) (InitializedVU, error) { for i := 1; i <= MaxRetriesGetPlannedVU; i++ { select { case vu := <-es.vus: @@ -562,7 +562,7 @@ func (es *ExecutionState) SetInitVUFunc(initVUFunc InitVUFunc) { // Executors are trusted to correctly declare their needs (via their // GetExecutionRequirements() methods) and then to never ask for more VUs than // they have specified in those requirements. -func (es *ExecutionState) GetUnplannedVU(ctx context.Context, logger *logrus.Entry) (VU, error) { +func (es *ExecutionState) GetUnplannedVU(ctx context.Context, logger *logrus.Entry) (InitializedVU, error) { remVUs := atomic.AddInt64(es.uninitializedUnplannedVUs, -1) if remVUs < 0 { logger.Debug("Reusing a previously initialized unplanned VU") @@ -571,16 +571,12 @@ func (es *ExecutionState) GetUnplannedVU(ctx context.Context, logger *logrus.Ent } logger.Debug("Initializing an unplanned VU, this may affect test results") - vu, err := es.InitializeNewVU(ctx, logger) - if err == nil { - es.ModCurrentlyActiveVUsCount(+1) - } - return vu, err + return es.InitializeNewVU(ctx, logger) } // InitializeNewVU creates and returns a brand new VU, updating the relevant // tracking counters. -func (es *ExecutionState) InitializeNewVU(ctx context.Context, logger *logrus.Entry) (VU, error) { +func (es *ExecutionState) InitializeNewVU(ctx context.Context, logger *logrus.Entry) (InitializedVU, error) { if es.initVUFunc == nil { return nil, fmt.Errorf("initVUFunc wasn't set in the execution state") } @@ -594,14 +590,14 @@ func (es *ExecutionState) InitializeNewVU(ctx context.Context, logger *logrus.En // AddInitializedVU is a helper function that adds VUs into the buffer and // increases the initialized VUs counter. -func (es *ExecutionState) AddInitializedVU(vu VU) { +func (es *ExecutionState) AddInitializedVU(vu InitializedVU) { es.vus <- vu es.ModInitializedVUsCount(+1) } // ReturnVU is a helper function that puts VUs back into the buffer and // decreases the active VUs counter. -func (es *ExecutionState) ReturnVU(vu VU, wasActive bool) { +func (es *ExecutionState) ReturnVU(vu InitializedVU, wasActive bool) { es.vus <- vu if wasActive { es.ModCurrentlyActiveVUsCount(-1) diff --git a/lib/executor/helpers.go b/lib/executor/helpers.go index 1894047f5ab..06e8115c4c6 100644 --- a/lib/executor/helpers.go +++ b/lib/executor/helpers.go @@ -32,7 +32,6 @@ import ( "github.com/loadimpact/k6/lib" "github.com/loadimpact/k6/lib/types" - "github.com/loadimpact/k6/stats" ) const minDuration = 1 * time.Second @@ -43,10 +42,10 @@ const minDuration = 1 * time.Second // // TODO: emit the end-of-test iteration metrics here (https://github.com/loadimpact/k6/issues/1250) func getIterationRunner( - executionState *lib.ExecutionState, logger *logrus.Entry, _ chan<- stats.SampleContainer, -) func(context.Context, lib.VU) { - return func(ctx context.Context, vu lib.VU) { - err := vu.RunOnce(ctx) + executionState *lib.ExecutionState, logger *logrus.Entry, +) func(context.Context, lib.ActiveVU) { + return func(ctx context.Context, vu lib.ActiveVU) { + err := vu.RunOnce() //TODO: track (non-ramp-down) errors from script iterations as a metric, // and have a default threshold that will abort the script when the error diff --git a/lib/executor/vu_handle.go b/lib/executor/vu_handle.go index c97e221096a..10d2e3f5971 100644 --- a/lib/executor/vu_handle.go +++ b/lib/executor/vu_handle.go @@ -37,8 +37,8 @@ import ( type vuHandle struct { mutex *sync.RWMutex parentCtx context.Context - getVU func() (lib.VU, error) - returnVU func(lib.VU) + getVU func() (lib.InitializedVU, error) + returnVU func(lib.InitializedVU) canStartIter chan struct{} @@ -48,7 +48,7 @@ type vuHandle struct { } func newStoppedVUHandle( - parentCtx context.Context, getVU func() (lib.VU, error), returnVU func(lib.VU), logger *logrus.Entry, + parentCtx context.Context, getVU func() (lib.InitializedVU, error), returnVU func(lib.InitializedVU), logger *logrus.Entry, ) *vuHandle { lock := &sync.RWMutex{} ctx, cancel := context.WithCancel(parentCtx) @@ -101,15 +101,11 @@ func (vh *vuHandle) hardStop() { //TODO: simplify this somehow - I feel like there should be a better way to //implement this logic... maybe with sync.Cond? -func (vh *vuHandle) runLoopsIfPossible(runIter func(context.Context, lib.VU)) { +func (vh *vuHandle) runLoopsIfPossible(runIter func(context.Context, lib.ActiveVU)) { executorDone := vh.parentCtx.Done() - var vu lib.VU - defer func() { - if vu != nil { - vh.returnVU(vu) - } - }() + var vu lib.ActiveVU + var deactivateVU func() mainLoop: for { @@ -127,12 +123,9 @@ mainLoop: return default: // We're not running, but the executor isn't done yet, so we wait - // for either one of those conditions. But before that, we'll return - // our VU to the pool, if we have it. - if vu != nil { - vh.returnVU(vu) - vu = nil - } + // for either one of those conditions. But before that, clear + // the VU reference to ensure we get a fresh one below. + vu = nil select { case <-canStartIter: // continue on, we were unblocked... @@ -154,13 +147,19 @@ mainLoop: default: } - // Ensure we have a VU + // Ensure we have an active VU if vu == nil { - freshVU, err := vh.getVU() + initVU, err := vh.getVU() if err != nil { return } - vu = freshVU + deactivateVU = func() { + vh.returnVU(initVU) + } + vu = initVU.Activate(&lib.VUActivationParams{ + RunContext: ctx, + DeactivateCallback: deactivateVU, + }) } runIter(ctx, vu) diff --git a/lib/executors.go b/lib/executors.go index ea914688131..52d665d9ba7 100644 --- a/lib/executors.go +++ b/lib/executors.go @@ -29,10 +29,11 @@ import ( "sync" "time" - "github.com/loadimpact/k6/stats" - "github.com/loadimpact/k6/ui/pb" "github.com/sirupsen/logrus" null "gopkg.in/guregu/null.v3" + + "github.com/loadimpact/k6/stats" + "github.com/loadimpact/k6/ui/pb" ) //TODO: remove globals and use some type of explicit dependency injection? @@ -110,7 +111,7 @@ type ExecutorConfig interface { // InitVUFunc is just a shorthand so we don't have to type the function // signature every time. -type InitVUFunc func(context.Context, *logrus.Entry) (VU, error) +type InitVUFunc func(context.Context, *logrus.Entry) (InitializedVU, error) // Executor is the interface all executors should implement type Executor interface { diff --git a/lib/runner.go b/lib/runner.go index 41b68285198..ff5e389e281 100644 --- a/lib/runner.go +++ b/lib/runner.go @@ -26,6 +26,32 @@ import ( "github.com/loadimpact/k6/stats" ) +// ActiveVU represents an actively running virtual user. +type ActiveVU interface { + // Runs the VU once. The only way to interrupt the execution is to cancel + // the context given to InitializedVU.Activate() + RunOnce() error +} + +// InitializedVU represents a virtual user ready for work. It needs to be +// activated (i.e. given a context) before it can actually be used. Activation +// also requires a callback function, which will be called when the supplied +// context is done. That way, VUs can be returned to a pool and reused. +type InitializedVU interface { + // Fully activate the VU so it will be able to run code + Activate(*VUActivationParams) ActiveVU +} + +// VUActivationParams are supplied by each executor when it retrieves a VU from +// the buffer pool and activates it for use. +type VUActivationParams struct { + RunContext context.Context + DeactivateCallback func() + // Env map[string]string + // Tags map[string]string + // Exec null.String +} + // A Runner is a factory for VUs. It should precompute as much as possible upon // creation (parse ASTs, load files into memory, etc.), so that spawning VUs // becomes as fast as possible. The Runner doesn't actually *do* anything in @@ -42,8 +68,7 @@ type Runner interface { // Spawns a new VU. It's fine to make this function rather heavy, if it means a performance // improvement at runtime. Remember, this is called once per VU and normally only at the start // of a test - RunOnce() may be called hundreds of thousands of times, and must be fast. - //TODO: pass context.Context, so initialization can be killed properly... - NewVU(out chan<- stats.SampleContainer) (VU, error) + NewVU(id int64, out chan<- stats.SampleContainer) (InitializedVU, error) // Runs pre-test setup, if applicable. Setup(ctx context.Context, out chan<- stats.SampleContainer) error @@ -66,15 +91,3 @@ type Runner interface { GetOptions() Options SetOptions(opts Options) error } - -// A VU is a Virtual User, that can be scheduled by an Executor. -type VU interface { - // Runs the VU once. The VU is responsible for handling the Halting Problem, eg. making sure - // that execution actually stops when the context is cancelled. - RunOnce(ctx context.Context) error - - // Assign the VU a new ID. Called by the Executor upon creation, but may be called multiple - // times if the VU is recycled because the test was scaled down and then back up. - //TODO: support reconfiguring of env vars, tags and exec - Reconfigure(id int64) error -}