Skip to content

Commit

Permalink
Refactor ramping VUs executor
Browse files Browse the repository at this point in the history
RampingVUs.Run method was complex and this refactors it by adding
rampingVUsRunState to share run state between functions used by
the Run method.

+ Makes the Run method easier to read and understand
+ Makes it explicit which Goroutines are being fired in the Run
+ Separates responsibilities to other parts like:
  + rampingVUsRunState and its methods.
  + Moves trackProgress Goroutine from the makeProgressFn
    to the Run method
  + Makes initializeVUs to only handle initializing GRs
  + Uses two strategy functions to make them reusable
    and clearer. handleVUs and handleRemainingVUs use them.
+ Makes the handleVUs algorithm easier to understand and
  manage.
  • Loading branch information
inancgumus committed Sep 29, 2021
1 parent 36d699c commit 3282080
Showing 1 changed file with 160 additions and 120 deletions.
280 changes: 160 additions & 120 deletions lib/executor/ramping_vus.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,154 +493,194 @@ var _ lib.Executor = &RampingVUs{}

// Run constantly loops through as many iterations as possible on a variable
// number of VUs for the specified stages.
//
// TODO: split up? since this does a ton of things, unfortunately I can't think
// of a less complex way to implement it (besides the old "increment by 100ms
// and see what happens)... :/ so maybe see how it can be split?
// nolint:funlen,gocognit,cyclop
func (vlv RampingVUs) Run(
parentCtx context.Context, out chan<- stats.SampleContainer, _ *metrics.BuiltinMetrics,
) (err error) {
rawExecutionSteps := vlv.config.getRawExecutionSteps(vlv.executionState.ExecutionTuple, true)
regularDuration, isFinal := lib.GetEndOffset(rawExecutionSteps)
if !isFinal {
return fmt.Errorf("%s expected raw end offset at %s to be final", vlv.config.GetName(), regularDuration)
func (vlv RampingVUs) Run(ctx context.Context, _ chan<- stats.SampleContainer, _ *metrics.BuiltinMetrics) error {
stepsRaw := vlv.config.getRawExecutionSteps(vlv.executionState.ExecutionTuple, true)
regDur, finalRaw := lib.GetEndOffset(stepsRaw)
if !finalRaw {
return fmt.Errorf("%s expected raw end offset at %s to be final", vlv.config.GetName(), regDur)
}
stepsGraceful := vlv.config.GetExecutionRequirements(vlv.executionState.ExecutionTuple)
maxDur, finalGraceful := lib.GetEndOffset(stepsGraceful)
if !finalGraceful {
return fmt.Errorf("%s expected graceful end offset at %s to be final", vlv.config.GetName(), maxDur)
}
startMaxVUs := lib.GetMaxPlannedVUs(stepsGraceful)
startTime, maxDurCtx, regDurCtx, cancel := getDurationContexts(ctx, regDur, maxDur-regDur)
defer cancel()

gracefulExecutionSteps := vlv.config.GetExecutionRequirements(vlv.executionState.ExecutionTuple)
maxDuration, isFinal := lib.GetEndOffset(gracefulExecutionSteps)
if !isFinal {
return fmt.Errorf("%s expected graceful end offset at %s to be final", vlv.config.GetName(), maxDuration)
vlv.logger.WithFields(logrus.Fields{
"type": vlv.config.GetType(),
"startVUs": vlv.config.GetStartVUs(vlv.executionState.ExecutionTuple),
"maxVUs": startMaxVUs,
"duration": regDur,
"numStages": len(vlv.config.Stages),
}).Debug("Starting executor run...")

runState := &rampingVUsRunState{
executor: vlv,
wg: new(sync.WaitGroup),
vuHandles: make([]*vuHandle, startMaxVUs),
startMaxVUs: startMaxVUs,
activeVUsCount: new(int64),
started: startTime,
stepsRaw: stepsRaw,
stepsGraceful: stepsGraceful,
runIteration: getIterationRunner(vlv.executionState, vlv.logger),
}
maxVUs := lib.GetMaxPlannedVUs(gracefulExecutionSteps)
gracefulStop := maxDuration - regularDuration

startTime, maxDurationCtx, regDurationCtx, cancel := getDurationContexts(parentCtx, regularDuration, gracefulStop)
defer cancel()
progressFn := runState.makeProgressFn(regDur)
maxDurCtx = lib.WithScenarioState(maxDurCtx, &lib.ScenarioState{
Name: vlv.config.Name,
Executor: vlv.config.Type,
StartTime: runState.started,
ProgressFn: progressFn,
})
vlv.progress.Modify(pb.WithProgress(progressFn))
go trackProgress(ctx, maxDurCtx, regDurCtx, vlv, progressFn)

activeVUs := &sync.WaitGroup{}
defer activeVUs.Wait()
defer runState.wg.Wait()
runState.initializeVUs(maxDurCtx, cancel)
for i := uint64(0); i < runState.startMaxVUs; i++ {
go runState.vuHandles[i].runLoopsIfPossible(runState.runIteration)
}
runState.handleVUs(ctx)
go runState.handleRemainingVUs(ctx)

// Make sure the log and the progress bar have accurate information
vlv.logger.WithFields(logrus.Fields{
"type": vlv.config.GetType(), "startVUs": vlv.config.GetStartVUs(vlv.executionState.ExecutionTuple), "maxVUs": maxVUs,
"duration": regularDuration, "numStages": len(vlv.config.Stages),
},
).Debug("Starting executor run...")

activeVUsCount := new(int64)
vusFmt := pb.GetFixedLengthIntFormat(int64(maxVUs))
regularDurationStr := pb.GetFixedLengthDuration(regularDuration, regularDuration)
progressFn := func() (float64, []string) {
spent := time.Since(startTime)
currentlyActiveVUs := atomic.LoadInt64(activeVUsCount)
vus := fmt.Sprintf(vusFmt+"/"+vusFmt+" VUs", currentlyActiveVUs, maxVUs)
if spent > regularDuration {
return 1, []string{vus, regularDuration.String()}
return nil
}

// rampingVUsRunState is created and initialized by the Run() method
// of the ramping VUs executor. It is used to track and modify various
// details of the execution.
type rampingVUsRunState struct {
executor RampingVUs
vuHandles []*vuHandle // handles for manipulating and tracking all of the VUs
startMaxVUs uint64 // the scaled number of initially configured MaxVUs
activeVUsCount *int64 // the current number of active VUs, used only for the progress display
started time.Time
stepsRaw, stepsGraceful []lib.ExecutionStep
wg *sync.WaitGroup

runIteration func(context.Context, lib.ActiveVU) bool // a helper closure function that runs a single iteration
}

func (rs rampingVUsRunState) makeProgressFn(total time.Duration) (progressFn func() (float64, []string)) {
vusFmt := pb.GetFixedLengthIntFormat(int64(rs.startMaxVUs))
regDuration := pb.GetFixedLengthDuration(total, total)

return func() (float64, []string) {
spent := time.Since(rs.started)
cur := atomic.LoadInt64(rs.activeVUsCount)
progVUs := fmt.Sprintf(vusFmt+"/"+vusFmt+" VUs", cur, rs.startMaxVUs)
if spent > total {
return 1, []string{progVUs, total.String()}
}
progVUs := fmt.Sprintf(vusFmt+"/"+vusFmt+" VUs", currentlyActiveVUs, maxVUs)
progDur := pb.GetFixedLengthDuration(spent, regularDuration) + "/" + regularDurationStr
return float64(spent) / float64(regularDuration), []string{progVUs, progDur}
progDur := pb.GetFixedLengthDuration(spent, total) + "/" + regDuration
return float64(spent) / float64(total), []string{progVUs, progDur}
}
vlv.progress.Modify(pb.WithProgress(progressFn))
go trackProgress(parentCtx, maxDurationCtx, regDurationCtx, vlv, progressFn)
}

// Actually schedule the VUs and iterations, likely the most complicated
// executor among all of them...
runIteration := getIterationRunner(vlv.executionState, vlv.logger)
func (rs rampingVUsRunState) initializeVUs(ctx context.Context, cancel func()) {
getVU := func() (lib.InitializedVU, error) {
initVU, err := vlv.executionState.GetPlannedVU(vlv.logger, false)
pvu, err := rs.executor.executionState.GetPlannedVU(rs.executor.logger, false)
if err != nil {
vlv.logger.WithError(err).Error("Cannot get a VU from the buffer")
rs.executor.logger.WithError(err).Error("Cannot get a VU from the buffer")
cancel()
} else {
activeVUs.Add(1)
atomic.AddInt64(activeVUsCount, 1)
vlv.executionState.ModCurrentlyActiveVUsCount(+1)
return pvu, err
}
return initVU, err
rs.wg.Add(1)
atomic.AddInt64(rs.activeVUsCount, 1)
rs.executor.executionState.ModCurrentlyActiveVUsCount(+1)
return pvu, err
}
returnVU := func(initVU lib.InitializedVU) {
vlv.executionState.ReturnVU(initVU, false)
atomic.AddInt64(activeVUsCount, -1)
activeVUs.Done()
vlv.executionState.ModCurrentlyActiveVUsCount(-1)
rs.executor.executionState.ReturnVU(initVU, false)
atomic.AddInt64(rs.activeVUsCount, -1)
rs.wg.Done()
rs.executor.executionState.ModCurrentlyActiveVUsCount(-1)
}

maxDurationCtx = lib.WithScenarioState(maxDurationCtx, &lib.ScenarioState{
Name: vlv.config.Name,
Executor: vlv.config.Type,
StartTime: startTime,
ProgressFn: progressFn,
})

vuHandles := make([]*vuHandle, maxVUs)
for i := uint64(0); i < maxVUs; i++ {
vuHandle := newStoppedVUHandle(
maxDurationCtx, getVU, returnVU, vlv.nextIterationCounters,
&vlv.config.BaseConfig, vlv.logger.WithField("vuNum", i))
go vuHandle.runLoopsIfPossible(runIteration)
vuHandles[i] = vuHandle
for i := uint64(0); i < rs.startMaxVUs; i++ {
rs.vuHandles[i] = newStoppedVUHandle(
ctx, getVU, returnVU, rs.executor.nextIterationCounters,
&rs.executor.config.BaseConfig, rs.executor.logger.WithField("vuNum", i))
}
}

// 0 <= currentScheduledVUs <= currentMaxAllowedVUs <= maxVUs
var currentScheduledVUs, currentMaxAllowedVUs uint64

handleNewScheduledVUs := func(newScheduledVUs uint64) {
if newScheduledVUs > currentScheduledVUs {
for vuNum := currentScheduledVUs; vuNum < newScheduledVUs; vuNum++ {
_ = vuHandles[vuNum].start() // TODO handle error
}
func (rs rampingVUsRunState) handleVUs(ctx context.Context) {
// iterate over stepsRaw and stepsGraceful in order by TimeOffset
// giving stepsRaw precedence.
// we stop iterating once stepsRaw are over as we need to run the remaining
// stepsGraceful concurrently while waiting for VUs to stop in order to not wait until
// the end of gracefulStop (= maxDur-regDur) timeouts
var (
handleNewMaxAllowedVUs = rs.maxAllowedVUsHandlerStrategy()
handleNewScheduledVUs = rs.scheduledVUsHandlerStrategy()
wait = waiter(ctx, rs.started)
)
for len(rs.stepsRaw) > 0 {
var (
r, g = rs.stepsRaw[0], rs.stepsGraceful[0]
gfirst = r.TimeOffset > g.TimeOffset
offset = g.TimeOffset
)
if !gfirst {
offset = r.TimeOffset
}
if wait(offset) {
return
}
if gfirst {
handleNewMaxAllowedVUs(g)
rs.stepsGraceful = rs.stepsGraceful[1:]
} else {
for vuNum := newScheduledVUs; vuNum < currentScheduledVUs; vuNum++ {
vuHandles[vuNum].gracefulStop()
}
handleNewScheduledVUs(r)
rs.stepsRaw = rs.stepsRaw[1:]
}
}
}

// TODO: removing this has no effect on tests?
func (rs rampingVUsRunState) handleRemainingVUs(ctx context.Context) {
var (
handleNewMaxAllowedVUs = rs.maxAllowedVUsHandlerStrategy()
wait = waiter(ctx, rs.started)
)
for _, s := range rs.stepsGraceful {
if wait(s.TimeOffset) {
return
}
currentScheduledVUs = newScheduledVUs
handleNewMaxAllowedVUs(s)
}
}

handleNewMaxAllowedVUs := func(newMaxAllowedVUs uint64) {
if newMaxAllowedVUs < currentMaxAllowedVUs {
for vuNum := newMaxAllowedVUs; vuNum < currentMaxAllowedVUs; vuNum++ {
vuHandles[vuNum].hardStop()
func (rs rampingVUsRunState) maxAllowedVUsHandlerStrategy() func(lib.ExecutionStep) {
var max uint64
return func(graceful lib.ExecutionStep) {
cur := graceful.PlannedVUs
if cur < max {
for n := cur; n < max; n++ {
rs.vuHandles[n].hardStop()
}
}
currentMaxAllowedVUs = newMaxAllowedVUs
max = cur
}
}

wait := waiter(parentCtx, startTime)
// iterate over rawExecutionSteps and gracefulExecutionSteps in order by TimeOffset
// giving rawExecutionSteps precedence.
// we stop iterating once rawExecutionSteps are over as we need to run the remaining
// gracefulExecutionSteps concurrently while waiting for VUs to stop in order to not wait until
// the end of gracefulStop timeouts
i, j := 0, 0
for i != len(rawExecutionSteps) {
if rawExecutionSteps[i].TimeOffset > gracefulExecutionSteps[j].TimeOffset {
if wait(gracefulExecutionSteps[j].TimeOffset) {
return
func (rs rampingVUsRunState) scheduledVUsHandlerStrategy() func(lib.ExecutionStep) {
var sched uint64
return func(raw lib.ExecutionStep) {
cur := raw.PlannedVUs
if cur > sched {
for n := sched; n < cur; n++ {
_ = rs.vuHandles[n].start() // TODO handle error
}
handleNewMaxAllowedVUs(gracefulExecutionSteps[j].PlannedVUs)
j++
} else {
if wait(rawExecutionSteps[i].TimeOffset) {
return
for n := cur; n < sched; n++ {
rs.vuHandles[n].gracefulStop()
}
handleNewScheduledVUs(rawExecutionSteps[i].PlannedVUs)
i++
}
sched = cur
}

go func() { // iterate over the remaining gracefulExecutionSteps
for _, step := range gracefulExecutionSteps[j:] {
if wait(step.TimeOffset) {
return
}
handleNewMaxAllowedVUs(step.PlannedVUs)
}
}()

return nil
}

// waiter returns a function that will sleep/wait for the required time since the startTime and then
Expand All @@ -651,9 +691,9 @@ func (vlv RampingVUs) Run(
func waiter(ctx context.Context, startTime time.Time) func(offset time.Duration) bool {
timer := time.NewTimer(time.Hour * 24)
return func(offset time.Duration) bool {
offsetDiff := offset - time.Since(startTime)
if offsetDiff > 0 { // wait until time of event arrives // TODO have a mininum
timer.Reset(offsetDiff)
diff := offset - time.Since(startTime)
if diff > 0 { // wait until time of event arrives // TODO have a mininum
timer.Reset(diff)
select {
case <-ctx.Done():
return true // exit if context is cancelled
Expand Down

0 comments on commit 3282080

Please sign in to comment.