Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Refactor ramping VUs executor #2155

Merged
merged 10 commits into from
Nov 29, 2021
293 changes: 176 additions & 117 deletions lib/executor/ramping_vus.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ func (vlvc RampingVUsConfig) GetExecutionRequirements(et *lib.ExecutionTuple) []

// NewExecutor creates a new RampingVUs executor
func (vlvc RampingVUsConfig) NewExecutor(es *lib.ExecutionState, logger *logrus.Entry) (lib.Executor, error) {
return RampingVUs{
return &RampingVUs{
BaseExecutor: NewBaseExecutor(vlvc, es, logger),
config: vlvc,
}, nil
Expand All @@ -486,174 +486,233 @@ func (vlvc RampingVUsConfig) HasWork(et *lib.ExecutionTuple) bool {
// RampingVUs is the executor that ramps the number of active VUs up and
// down according to the configured stages. The raw and graceful execution
// steps are precalculated once (in Init) and reused by Run.
type RampingVUs struct {
	*BaseExecutor
	config RampingVUsConfig

	// rawSteps holds the scheduling steps for starting/stopping VUs;
	// gracefulSteps additionally accounts for the gracefulStop/gracefulRampDown
	// timeouts. Both are populated by Init before Run is called.
	rawSteps, gracefulSteps []lib.ExecutionStep
}

// Make sure we implement the lib.Executor interface.
var _ lib.Executor = &RampingVUs{}

// Init precalculates the raw and graceful execution steps from the
// executor's configuration and execution tuple, so Run doesn't have to
// recompute them. It never returns an error.
func (vlv *RampingVUs) Init(_ context.Context) error {
	et := vlv.executionState.ExecutionTuple
	vlv.rawSteps = vlv.config.getRawExecutionSteps(et, true)
	vlv.gracefulSteps = vlv.config.GetExecutionRequirements(et)
	return nil
}

// Run constantly loops through as many iterations as possible on a variable
// number of VUs for the specified stages.
//
// TODO: split up? since this does a ton of things, unfortunately I can't think
// of a less complex way to implement it (besides the old "increment by 100ms
// and see what happens)... :/ so maybe see how it can be split?
// nolint:funlen,gocognit,cyclop
func (vlv RampingVUs) Run(
parentCtx context.Context, out chan<- stats.SampleContainer, _ *metrics.BuiltinMetrics,
) (err error) {
rawExecutionSteps := vlv.config.getRawExecutionSteps(vlv.executionState.ExecutionTuple, true)
regularDuration, isFinal := lib.GetEndOffset(rawExecutionSteps)
func (vlv *RampingVUs) Run(ctx context.Context, _ chan<- stats.SampleContainer, _ *metrics.BuiltinMetrics) error {
regularDuration, isFinal := lib.GetEndOffset(vlv.rawSteps)
if !isFinal {
return fmt.Errorf("%s expected raw end offset at %s to be final", vlv.config.GetName(), regularDuration)
}

gracefulExecutionSteps := vlv.config.GetExecutionRequirements(vlv.executionState.ExecutionTuple)
maxDuration, isFinal := lib.GetEndOffset(gracefulExecutionSteps)
maxDuration, isFinal := lib.GetEndOffset(vlv.gracefulSteps)
if !isFinal {
return fmt.Errorf("%s expected graceful end offset at %s to be final", vlv.config.GetName(), maxDuration)
}
maxVUs := lib.GetMaxPlannedVUs(gracefulExecutionSteps)
gracefulStop := maxDuration - regularDuration

startTime, maxDurationCtx, regDurationCtx, cancel := getDurationContexts(parentCtx, regularDuration, gracefulStop)
startTime, maxDurationCtx, regularDurationCtx, cancel := getDurationContexts(
ctx, regularDuration, maxDuration-regularDuration,
)
defer cancel()

activeVUs := &sync.WaitGroup{}
defer activeVUs.Wait()
maxVUs := lib.GetMaxPlannedVUs(vlv.gracefulSteps)

// Make sure the log and the progress bar have accurate information
vlv.logger.WithFields(logrus.Fields{
"type": vlv.config.GetType(), "startVUs": vlv.config.GetStartVUs(vlv.executionState.ExecutionTuple), "maxVUs": maxVUs,
"duration": regularDuration, "numStages": len(vlv.config.Stages),
},
).Debug("Starting executor run...")

activeVUsCount := new(int64)
vusFmt := pb.GetFixedLengthIntFormat(int64(maxVUs))
regularDurationStr := pb.GetFixedLengthDuration(regularDuration, regularDuration)
progressFn := func() (float64, []string) {
spent := time.Since(startTime)
currentlyActiveVUs := atomic.LoadInt64(activeVUsCount)
vus := fmt.Sprintf(vusFmt+"/"+vusFmt+" VUs", currentlyActiveVUs, maxVUs)
if spent > regularDuration {
return 1, []string{vus, regularDuration.String()}
}
progVUs := fmt.Sprintf(vusFmt+"/"+vusFmt+" VUs", currentlyActiveVUs, maxVUs)
progDur := pb.GetFixedLengthDuration(spent, regularDuration) + "/" + regularDurationStr
return float64(spent) / float64(regularDuration), []string{progVUs, progDur}
}
vlv.progress.Modify(pb.WithProgress(progressFn))
go trackProgress(parentCtx, maxDurationCtx, regDurationCtx, vlv, progressFn)

// Actually schedule the VUs and iterations, likely the most complicated
// executor among all of them...
runIteration := getIterationRunner(vlv.executionState, vlv.logger)
getVU := func() (lib.InitializedVU, error) {
initVU, err := vlv.executionState.GetPlannedVU(vlv.logger, false)
if err != nil {
vlv.logger.WithError(err).Error("Cannot get a VU from the buffer")
cancel()
} else {
activeVUs.Add(1)
atomic.AddInt64(activeVUsCount, 1)
vlv.executionState.ModCurrentlyActiveVUsCount(+1)
}
return initVU, err
}
returnVU := func(initVU lib.InitializedVU) {
vlv.executionState.ReturnVU(initVU, false)
atomic.AddInt64(activeVUsCount, -1)
activeVUs.Done()
vlv.executionState.ModCurrentlyActiveVUsCount(-1)
"type": vlv.config.GetType(),
"startVUs": vlv.config.GetStartVUs(vlv.executionState.ExecutionTuple),
"maxVUs": maxVUs,
"duration": regularDuration,
"numStages": len(vlv.config.Stages),
}).Debug("Starting executor run...")

runState := &rampingVUsRunState{
executor: vlv,
vuHandles: make([]*vuHandle, maxVUs),
maxVUs: maxVUs,
activeVUsCount: new(int64),
started: startTime,
runIteration: getIterationRunner(vlv.executionState, vlv.logger),
}

progressFn := runState.makeProgressFn(regularDuration)
maxDurationCtx = lib.WithScenarioState(maxDurationCtx, &lib.ScenarioState{
Name: vlv.config.Name,
Executor: vlv.config.Type,
StartTime: startTime,
StartTime: runState.started,
ProgressFn: progressFn,
})
vlv.progress.Modify(pb.WithProgress(progressFn))
go trackProgress(ctx, maxDurationCtx, regularDurationCtx, vlv, progressFn)

vuHandles := make([]*vuHandle, maxVUs)
for i := uint64(0); i < maxVUs; i++ {
vuHandle := newStoppedVUHandle(
maxDurationCtx, getVU, returnVU, vlv.nextIterationCounters,
&vlv.config.BaseConfig, vlv.logger.WithField("vuNum", i))
go vuHandle.runLoopsIfPossible(runIteration)
vuHandles[i] = vuHandle
}
defer runState.wg.Wait()
// this will populate stopped VUs and run runLoopsIfPossible on each VU
// handle in a new goroutine
runState.runLoopsIfPossible(maxDurationCtx, cancel)

// 0 <= currentScheduledVUs <= currentMaxAllowedVUs <= maxVUs
var currentScheduledVUs, currentMaxAllowedVUs uint64
var (
handleNewMaxAllowedVUs = runState.maxAllowedVUsHandlerStrategy()
handleNewScheduledVUs = runState.scheduledVUsHandlerStrategy()
)
handledGracefulSteps := runState.iterateSteps(
ctx,
handleNewMaxAllowedVUs,
handleNewScheduledVUs,
)
go runState.runRemainingGracefulSteps(
ctx,
handleNewMaxAllowedVUs,
handledGracefulSteps,
)
return nil
}

handleNewScheduledVUs := func(newScheduledVUs uint64) {
if newScheduledVUs > currentScheduledVUs {
for vuNum := currentScheduledVUs; vuNum < newScheduledVUs; vuNum++ {
_ = vuHandles[vuNum].start() // TODO handle error
}
} else {
for vuNum := newScheduledVUs; vuNum < currentScheduledVUs; vuNum++ {
vuHandles[vuNum].gracefulStop()
}
// rampingVUsRunState is created and initialized by the Run() method
// of the ramping VUs executor. It is used to track and modify various
// details of the execution.
type rampingVUsRunState struct {
	executor       *RampingVUs
	vuHandles      []*vuHandle // handles for manipulating and tracking all of the VUs
	maxVUs         uint64      // the scaled number of initially configured MaxVUs
	activeVUsCount *int64      // the current number of active VUs, used only for the progress display
	started        time.Time   // when the executor run began; offsets are measured from here
	wg             sync.WaitGroup // tracks VUs taken from the buffer; waited on before Run's contexts are torn down

	runIteration func(context.Context, lib.ActiveVU) bool // a helper closure function that runs a single iteration
}

// makeProgressFn returns a progress-bar callback reporting the currently
// active VUs out of the maximum, plus the elapsed time out of the regular
// (non-graceful) duration. Once the regular duration is exceeded it reports
// a completed (1.0) progress.
func (rs *rampingVUsRunState) makeProgressFn(regular time.Duration) func() (float64, []string) {
	vusFmt := pb.GetFixedLengthIntFormat(int64(rs.maxVUs))
	fixedRegular := pb.GetFixedLengthDuration(regular, regular)

	return func() (float64, []string) {
		elapsed := time.Since(rs.started)
		active := atomic.LoadInt64(rs.activeVUsCount)
		vus := fmt.Sprintf(vusFmt+"/"+vusFmt+" VUs", active, rs.maxVUs)
		if elapsed > regular {
			return 1, []string{vus, regular.String()}
		}
		elapsedStr := pb.GetFixedLengthDuration(elapsed, regular) + "/" + fixedRegular
		return float64(elapsed) / float64(regular), []string{vus, elapsedStr}
	}
}

handleNewMaxAllowedVUs := func(newMaxAllowedVUs uint64) {
if newMaxAllowedVUs < currentMaxAllowedVUs {
for vuNum := newMaxAllowedVUs; vuNum < currentMaxAllowedVUs; vuNum++ {
vuHandles[vuNum].hardStop()
}
func (rs *rampingVUsRunState) runLoopsIfPossible(ctx context.Context, cancel func()) {
getVU := func() (lib.InitializedVU, error) {
pvu, err := rs.executor.executionState.GetPlannedVU(rs.executor.logger, false)
if err != nil {
rs.executor.logger.WithError(err).Error("Cannot get a VU from the buffer")
cancel()
return pvu, err
}
currentMaxAllowedVUs = newMaxAllowedVUs
rs.wg.Add(1)
atomic.AddInt64(rs.activeVUsCount, 1)
rs.executor.executionState.ModCurrentlyActiveVUsCount(+1)
return pvu, err
}
returnVU := func(initVU lib.InitializedVU) {
rs.executor.executionState.ReturnVU(initVU, false)
atomic.AddInt64(rs.activeVUsCount, -1)
rs.wg.Done()
rs.executor.executionState.ModCurrentlyActiveVUsCount(-1)
}
for i := uint64(0); i < rs.maxVUs; i++ {
rs.vuHandles[i] = newStoppedVUHandle(
ctx, getVU, returnVU, rs.executor.nextIterationCounters,
&rs.executor.config.BaseConfig, rs.executor.logger.WithField("vuNum", i))
go rs.vuHandles[i].runLoopsIfPossible(rs.runIteration)
}
}

wait := waiter(parentCtx, startTime)
// iterate over rawExecutionSteps and gracefulExecutionSteps in order by TimeOffset
// giving rawExecutionSteps precedence.
// we stop iterating once rawExecutionSteps are over as we need to run the remaining
// gracefulExecutionSteps concurrently while waiting for VUs to stop in order to not wait until
// the end of gracefulStop timeouts
// iterateSteps iterates over rawSteps and gracefulSteps in order according to
// their TimeOffsets, prioritizing rawSteps. It stops iterating once rawSteps
// are over. And it returns the number of handled gracefulSteps.
func (rs *rampingVUsRunState) iterateSteps(
ctx context.Context,
handleNewMaxAllowedVUs, handleNewScheduledVUs func(lib.ExecutionStep),
) (handledGracefulSteps int) {
wait := waiter(ctx, rs.started)
i, j := 0, 0
for i != len(rawExecutionSteps) {
if rawExecutionSteps[i].TimeOffset > gracefulExecutionSteps[j].TimeOffset {
if wait(gracefulExecutionSteps[j].TimeOffset) {
return
for i != len(rs.executor.rawSteps) {
r, g := rs.executor.rawSteps[i], rs.executor.gracefulSteps[j]
if g.TimeOffset < r.TimeOffset {
if wait(g.TimeOffset) {
break
}
handleNewMaxAllowedVUs(gracefulExecutionSteps[j].PlannedVUs)
handleNewMaxAllowedVUs(g)
j++
} else {
if wait(rawExecutionSteps[i].TimeOffset) {
return
if wait(r.TimeOffset) {
break
}
handleNewScheduledVUs(rawExecutionSteps[i].PlannedVUs)
handleNewScheduledVUs(r)
i++
}
}
return j
}

go func() { // iterate over the remaining gracefulExecutionSteps
for _, step := range gracefulExecutionSteps[j:] {
if wait(step.TimeOffset) {
return
}
handleNewMaxAllowedVUs(step.PlannedVUs)
// runRemainingGracefulSteps runs the remaining gracefulSteps concurrently
// before the gracefulStop timeout period stops VUs.
//
// This way we will have run the gracefulSteps at the same time while
// waiting for the VUs to finish.
//
// (gracefulStop = maxDuration-regularDuration)
func (rs *rampingVUsRunState) runRemainingGracefulSteps(
ctx context.Context,
handleNewMaxAllowedVUs func(lib.ExecutionStep),
handledGracefulSteps int,
) {
wait := waiter(ctx, rs.started)
for _, s := range rs.executor.gracefulSteps[handledGracefulSteps:] {
if wait(s.TimeOffset) {
return
}
}()
handleNewMaxAllowedVUs(s)
}
}

return nil
func (rs *rampingVUsRunState) maxAllowedVUsHandlerStrategy() func(lib.ExecutionStep) {
var cur uint64 // current number of planned graceful VUs
return func(graceful lib.ExecutionStep) {
pv := graceful.PlannedVUs
for ; pv < cur; cur-- {
rs.vuHandles[cur-1].hardStop()
}
cur = pv
}
}

// scheduledVUsHandlerStrategy returns a closure that reconciles the number
// of running VU handles with each raw step's planned count: it starts
// handles to ramp up and gracefully stops them (highest index first) to
// ramp down, remembering the current count between calls.
func (rs *rampingVUsRunState) scheduledVUsHandlerStrategy() func(lib.ExecutionStep) {
	var cur uint64 // current number of planned raw VUs
	return func(raw lib.ExecutionStep) {
		target := raw.PlannedVUs
		for cur < target {
			_ = rs.vuHandles[cur].start() // TODO: handle the error
			cur++
		}
		for cur > target {
			cur--
			rs.vuHandles[cur].gracefulStop()
		}
	}
}

// waiter returns a function that will sleep/wait for the required time since the startTime and then
// return. If the context was done before that it will return true otherwise it will return false
// TODO use elsewhere
// TODO set startTime here?
// TODO set start here?
// TODO move it to a struct type or something and benchmark if that makes a difference
func waiter(ctx context.Context, startTime time.Time) func(offset time.Duration) bool {
func waiter(ctx context.Context, start time.Time) func(offset time.Duration) bool {
timer := time.NewTimer(time.Hour * 24)
return func(offset time.Duration) bool {
offsetDiff := offset - time.Since(startTime)
if offsetDiff > 0 { // wait until time of event arrives // TODO have a mininum
timer.Reset(offsetDiff)
diff := offset - time.Since(start)
if diff > 0 { // wait until time of event arrives // TODO have a mininum
timer.Reset(diff)
select {
case <-ctx.Done():
return true // exit if context is cancelled
Expand Down
Loading