Skip to content

Commit

Permalink
Refactor VU context handling, introduce VU activation
Browse files Browse the repository at this point in the history
This cleans up how context was being handled for purposes of
interruption, and introduces a VU activation method that also handles
de-activation (i.e. returning the VU to the pool) via a callback passed
during execution.

See #1283
  • Loading branch information
Ivan Mirić committed Apr 3, 2020
1 parent 0837bcd commit fbf60b8
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 132 deletions.
31 changes: 14 additions & 17 deletions core/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,24 +148,20 @@ func (e *ExecutionScheduler) GetExecutionPlan() []lib.ExecutionStep {
return e.executionPlan
}

// initVU is just a helper method that's used to both initialize the planned VUs
// initVU is a helper method that's used to both initialize the planned VUs
// in the Init() method, and also passed to executors so they can initialize
// any unplanned VUs themselves.
// TODO: actually use the context...
func (e *ExecutionScheduler) initVU(
_ context.Context, logger *logrus.Entry, engineOut chan<- stats.SampleContainer,
) (lib.VU, error) {
vu, err := e.runner.NewVU(engineOut)
if err != nil {
return nil, fmt.Errorf("error while initializing a VU: '%s'", err)
}

samplesOut chan<- stats.SampleContainer, logger *logrus.Entry,
) (lib.InitializedVU, error) {
// Get the VU ID here, so that the VUs are (mostly) ordered by their
// number in the channel buffer
vuID := e.state.GetUniqueVUIdentifier()
if err := vu.Reconfigure(int64(vuID)); err != nil {
return nil, fmt.Errorf("error while reconfiguring VU #%d: '%s'", vuID, err)
vu, err := e.runner.NewVU(int64(vuID), samplesOut)
if err != nil {
return nil, fmt.Errorf("error while initializing a VU: '%s'", err)
}

logger.Debugf("Initialized VU #%d", vuID)
return vu, nil
}
Expand All @@ -191,15 +187,16 @@ func (e *ExecutionScheduler) getRunStats() string {
}

func (e *ExecutionScheduler) initVUsConcurrently(
ctx context.Context, engineOut chan<- stats.SampleContainer, count uint64, concurrency int, logger *logrus.Entry,
ctx context.Context, samplesOut chan<- stats.SampleContainer, count uint64,
concurrency int, logger *logrus.Entry,
) chan error {
doneInits := make(chan error, count) // poor man's early-return waitgroup
limiter := make(chan struct{})

for i := 0; i < concurrency; i++ {
go func() {
for range limiter {
newVU, err := e.initVU(ctx, logger, engineOut)
newVU, err := e.initVU(samplesOut, logger)
if err == nil {
e.state.AddInitializedVU(newVU)
}
Expand All @@ -224,7 +221,7 @@ func (e *ExecutionScheduler) initVUsConcurrently(

// Init concurrently initializes all of the planned VUs and then sequentially
// initializes all of the configured executors.
func (e *ExecutionScheduler) Init(ctx context.Context, engineOut chan<- stats.SampleContainer) error {
func (e *ExecutionScheduler) Init(ctx context.Context, samplesOut chan<- stats.SampleContainer) error {
logger := e.logger.WithField("phase", "local-execution-scheduler-init")

vusToInitialize := lib.GetMaxPlannedVUs(e.executionPlan)
Expand All @@ -237,7 +234,7 @@ func (e *ExecutionScheduler) Init(ctx context.Context, engineOut chan<- stats.Sa
defer cancel()

e.state.SetExecutionStatus(lib.ExecutionStatusInitVUs)
doneInits := e.initVUsConcurrently(subctx, engineOut, vusToInitialize, runtime.NumCPU(), logger)
doneInits := e.initVUsConcurrently(subctx, samplesOut, vusToInitialize, runtime.NumCPU(), logger)

initializedVUs := new(uint64)
vusFmt := pb.GetFixedLengthIntFormat(int64(vusToInitialize))
Expand All @@ -264,8 +261,8 @@ func (e *ExecutionScheduler) Init(ctx context.Context, engineOut chan<- stats.Sa
}
}

e.state.SetInitVUFunc(func(ctx context.Context, logger *logrus.Entry) (lib.VU, error) {
return e.initVU(ctx, logger, engineOut)
e.state.SetInitVUFunc(func(ctx context.Context, logger *logrus.Entry) (lib.InitializedVU, error) {
return e.initVU(samplesOut, logger)
})

e.state.SetExecutionStatus(lib.ExecutionStatusInitExecutors)
Expand Down
96 changes: 37 additions & 59 deletions js/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"net/http"
"net/http/cookiejar"
"strconv"
"sync"
"time"

"github.com/dop251/goja"
Expand Down Expand Up @@ -114,15 +113,17 @@ func (r *Runner) MakeArchive() *lib.Archive {
return r.Bundle.makeArchive()
}

func (r *Runner) NewVU(samplesOut chan<- stats.SampleContainer) (lib.VU, error) {
vu, err := r.newVU(samplesOut)
// NewVU returns a new initialized VU.
func (r *Runner) NewVU(id int64, samplesOut chan<- stats.SampleContainer) (lib.InitializedVU, error) {
vu, err := r.newVU(id, samplesOut)
if err != nil {
return nil, err
}
return lib.VU(vu), nil
return lib.InitializedVU(vu), nil
}

func (r *Runner) newVU(samplesOut chan<- stats.SampleContainer) (*VU, error) {
// nolint:funlen
func (r *Runner) newVU(id int64, samplesOut chan<- stats.SampleContainer) (*VU, error) {
// Instantiate a new bundle, make a VU out of it.
bi, err := r.Bundle.Instantiate()
if err != nil {
Expand Down Expand Up @@ -185,6 +186,7 @@ func (r *Runner) newVU(samplesOut chan<- stats.SampleContainer) (*VU, error) {
}

vu := &VU{
ID: id,
BundleInstance: *bi,
Runner: r,
Transport: transport,
Expand All @@ -194,8 +196,8 @@ func (r *Runner) newVU(samplesOut chan<- stats.SampleContainer) (*VU, error) {
Console: r.console,
BPool: bpool.NewBufferPool(100),
Samples: samplesOut,
m: &sync.Mutex{},
}
vu.Runtime.Set("__VU", vu.ID)
vu.Runtime.Set("console", common.Bind(vu.Runtime, vu.Console, vu.Context))
common.BindToGlobal(vu.Runtime, map[string]interface{}{
"open": func() {
Expand All @@ -206,11 +208,6 @@ func (r *Runner) newVU(samplesOut chan<- stats.SampleContainer) (*VU, error) {
},
})

// Give the VU an initial sense of identity.
if err := vu.Reconfigure(0); err != nil {
return nil, err
}

return vu, nil
}

Expand Down Expand Up @@ -301,7 +298,7 @@ func (r *Runner) SetOptions(opts lib.Options) error {
// Runs an exported function in its own temporary VU, optionally with an argument. Execution is
// interrupted if the context expires. No error is returned if the part does not exist.
func (r *Runner) runPart(ctx context.Context, out chan<- stats.SampleContainer, name string, arg interface{}) (goja.Value, error) {
vu, err := r.newVU(out)
vu, err := r.newVU(0, out)
if err != nil {
return goja.Undefined(), err
}
Expand Down Expand Up @@ -356,65 +353,46 @@ func (r *Runner) timeoutErrorDuration(stage string) time.Duration {
type VU struct {
BundleInstance

Runner *Runner
Transport *http.Transport
Dialer *netext.Dialer
CookieJar *cookiejar.Jar
TLSConfig *tls.Config
ID int64
Iteration int64
Runner *Runner
RunContext *context.Context
Transport *http.Transport
Dialer *netext.Dialer
CookieJar *cookiejar.Jar
TLSConfig *tls.Config
ID int64
Iteration int64

Console *console
BPool *bpool.BufferPool

Samples chan<- stats.SampleContainer

setupData goja.Value

// A VU will track the last context it was called with for cancellation.
// Note that interruptTrackedCtx is the context that is currently being tracked, while
// interruptCancel cancels an unrelated context that terminates the tracking goroutine
// without triggering an interrupt (for if the context changes).
// There are cleaner ways of handling the interruption problem, but this is a hot path that
// needs to be called thousands of times per second, which rules out anything that spawns a
// goroutine per call.
interruptTrackedCtx context.Context
interruptCancel context.CancelFunc

m *sync.Mutex
}

// Verify that VU implements lib.VU
var _ lib.VU = &VU{}
// Verify that interfaces are implemented
var _ lib.ActiveVU = &VU{}
var _ lib.InitializedVU = &VU{}

func (u *VU) Reconfigure(id int64) error {
u.ID = id
u.Iteration = 0
u.Runtime.Set("__VU", u.ID)
return nil
}
// Activate the VU so it will be able to run code
func (u *VU) Activate(params *lib.VUActivationParams) lib.ActiveVU {
u.Runtime.ClearInterrupt()
u.RunContext = &params.RunContext
// u.Env = params.Env

func (u *VU) RunOnce(ctx context.Context) error {
u.m.Lock()
defer u.m.Unlock()
// Track the context and interrupt JS execution if it's cancelled.
if u.interruptTrackedCtx != ctx {
interCtx, interCancel := context.WithCancel(context.Background())
if u.interruptCancel != nil {
u.interruptCancel()
go func() {
<-params.RunContext.Done()
u.Runtime.Interrupt(errInterrupt)
if params.DeactivateCallback != nil {
params.DeactivateCallback()
}
u.interruptCancel = interCancel
u.interruptTrackedCtx = ctx
defer interCancel()
go func() {
select {
case <-interCtx.Done():
case <-ctx.Done():
u.Runtime.Interrupt(errInterrupt)
}
}()
}
}()

return lib.ActiveVU(u)
}

// RunOnce runs the default function once.
func (u *VU) RunOnce() error {
// Unmarshall the setupData only the first time for each VU so that VUs are isolated but we
// still don't use too much CPU in the middle test
if u.setupData == nil {
Expand All @@ -430,7 +408,7 @@ func (u *VU) RunOnce(ctx context.Context) error {
}

// Call the default function.
_, isFullIteration, totalTime, err := u.runFn(ctx, u.Runner.defaultGroup, true, u.Default, u.setupData)
_, isFullIteration, totalTime, err := u.runFn(*u.RunContext, u.Runner.defaultGroup, true, u.Default, u.setupData)

// If MinIterationDuration is specified and the iteration wasn't cancelled
// and was less than it, sleep for the remainder
Expand Down
24 changes: 10 additions & 14 deletions lib/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,11 @@ type ExecutionScheduler interface {
GetExecutors() []Executor

// Init initializes all executors, including all of their needed VUs.
Init(ctx context.Context, engineOut chan<- stats.SampleContainer) error
Init(ctx context.Context, samplesOut chan<- stats.SampleContainer) error

// Run the ExecutionScheduler, funneling the generated metric samples
// through the supplied out channel.
Run(ctx context.Context, engineOut chan<- stats.SampleContainer) error
Run(ctx context.Context, samplesOut chan<- stats.SampleContainer) error

// Pause a test, or start/resume it. To check if a test is paused, use
// GetState().IsPaused().
Expand Down Expand Up @@ -175,7 +175,7 @@ type ExecutionState struct {
// directly with the channel. These methods will emit a warning or can even
// return an error if retrieving a VU takes more than
// MaxTimeToWaitForPlannedVU.
vus chan VU
vus chan InitializedVU

// The current VU ID, used for the __VU execution context variable. Use the
// GetUniqueVUIdentifier() to get unique values for each VU, starting from 1
Expand Down Expand Up @@ -280,7 +280,7 @@ func NewExecutionState(options Options, et *ExecutionTuple, maxPlannedVUs, maxPo

return &ExecutionState{
Options: options,
vus: make(chan VU, maxPossibleVUs),
vus: make(chan InitializedVU, maxPossibleVUs),

executionStatus: new(uint32),
currentVUIdentifier: new(uint64),
Expand Down Expand Up @@ -529,7 +529,7 @@ func (es *ExecutionState) ResumeNotify() <-chan struct{} {
// executors might have to retrieve their reserved VUs without using them
// immediately - for example, the externally-controlled executor when the
// configured maxVUs number is greater than the configured starting VUs.
func (es *ExecutionState) GetPlannedVU(logger *logrus.Entry, modifyActiveVUCount bool) (VU, error) {
func (es *ExecutionState) GetPlannedVU(logger *logrus.Entry, modifyActiveVUCount bool) (InitializedVU, error) {
for i := 1; i <= MaxRetriesGetPlannedVU; i++ {
select {
case vu := <-es.vus:
Expand Down Expand Up @@ -566,7 +566,7 @@ func (es *ExecutionState) SetInitVUFunc(initVUFunc InitVUFunc) {
// Executors are trusted to correctly declare their needs (via their
// GetExecutionRequirements() methods) and then to never ask for more VUs than
// they have specified in those requirements.
func (es *ExecutionState) GetUnplannedVU(ctx context.Context, logger *logrus.Entry) (VU, error) {
func (es *ExecutionState) GetUnplannedVU(ctx context.Context, logger *logrus.Entry) (InitializedVU, error) {
remVUs := atomic.AddInt64(es.uninitializedUnplannedVUs, -1)
if remVUs < 0 {
logger.Debug("Reusing a previously initialized unplanned VU")
Expand All @@ -575,16 +575,12 @@ func (es *ExecutionState) GetUnplannedVU(ctx context.Context, logger *logrus.Ent
}

logger.Debug("Initializing an unplanned VU, this may affect test results")
vu, err := es.InitializeNewVU(ctx, logger)
if err == nil {
es.ModCurrentlyActiveVUsCount(+1)
}
return vu, err
return es.InitializeNewVU(ctx, logger)
}

// InitializeNewVU creates and returns a brand new VU, updating the relevant
// tracking counters.
func (es *ExecutionState) InitializeNewVU(ctx context.Context, logger *logrus.Entry) (VU, error) {
func (es *ExecutionState) InitializeNewVU(ctx context.Context, logger *logrus.Entry) (InitializedVU, error) {
if es.initVUFunc == nil {
return nil, fmt.Errorf("initVUFunc wasn't set in the execution state")
}
Expand All @@ -598,14 +594,14 @@ func (es *ExecutionState) InitializeNewVU(ctx context.Context, logger *logrus.En

// AddInitializedVU is a helper function that adds VUs into the buffer and
// increases the initialized VUs counter.
func (es *ExecutionState) AddInitializedVU(vu VU) {
func (es *ExecutionState) AddInitializedVU(vu InitializedVU) {
es.vus <- vu
es.ModInitializedVUsCount(+1)
}

// ReturnVU is a helper function that puts VUs back into the buffer and
// decreases the active VUs counter.
func (es *ExecutionState) ReturnVU(vu VU, wasActive bool) {
func (es *ExecutionState) ReturnVU(vu InitializedVU, wasActive bool) {
es.vus <- vu
if wasActive {
es.ModCurrentlyActiveVUsCount(-1)
Expand Down
10 changes: 4 additions & 6 deletions lib/executor/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (

"github.com/loadimpact/k6/lib"
"github.com/loadimpact/k6/lib/types"
"github.com/loadimpact/k6/stats"
)

func sumStagesDuration(stages []Stage) (result time.Duration) {
Expand Down Expand Up @@ -81,10 +80,10 @@ func validateStages(stages []Stage) []error {
//
// TODO: emit the end-of-test iteration metrics here (https://github.com/loadimpact/k6/issues/1250)
func getIterationRunner(
executionState *lib.ExecutionState, logger *logrus.Entry, _ chan<- stats.SampleContainer,
) func(context.Context, lib.VU) {
return func(ctx context.Context, vu lib.VU) {
err := vu.RunOnce(ctx)
executionState *lib.ExecutionState, logger *logrus.Entry,
) func(context.Context, lib.ActiveVU) {
return func(ctx context.Context, vu lib.ActiveVU) {
err := vu.RunOnce()

//TODO: track (non-ramp-down) errors from script iterations as a metric,
// and have a default threshold that will abort the script when the error
Expand All @@ -101,7 +100,6 @@ func getIterationRunner(
} else {
logger.Error(err.Error())
}
//TODO: investigate context cancelled errors
}

//TODO: move emission of end-of-iteration metrics here?
Expand Down
Loading

0 comments on commit fbf60b8

Please sign in to comment.