Skip to content

Commit

Permalink
Refactor ramping VUs executor
Browse files Browse the repository at this point in the history
RampingVUs.Run method was complex and this refactors it by adding
rampingVUsRunState to share run state between functions used by
the Run method.

+ Makes the Run method easier to read and understand
+ Makes it explicit which Goroutines are being fired in the Run
+ Separates responsibilities to other parts like:
  + rampingVUsRunState and its methods.
  + Moves trackProgress Goroutine from the makeProgressFn
    to the Run method
  + Makes initializeVUs to only handle initializing GRs
  + Uses two strategy functions to make them reusable
    and clearer. handleVUs and handleRemainingVUs use them.
+ Makes the handleVUs algorithm easier to understand and
  manage.

`go get` was failing on Go tip.
  • Loading branch information
inancgumus committed Sep 30, 2021
1 parent 75d30e6 commit a61714c
Show file tree
Hide file tree
Showing 2 changed files with 152 additions and 121 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/xk6.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ jobs:
cd .github/workflows/xk6-tests
# Temporary hack to ignore the v0.33.0 tag change
export GONOSUMDB="go.k6.io/k6"
go get go.k6.io/xk6/cmd/xk6@master
go install go.k6.io/xk6/cmd/xk6@latest
if [ "${{ github.event_name }}" == "pull_request" -a \
"${{ github.event.pull_request.head.repo.full_name }}" != "${{ github.repository }}" ]; then
export XK6_K6_REPO="github.com/${{ github.event.pull_request.head.repo.full_name }}"
Expand Down
271 changes: 151 additions & 120 deletions lib/executor/ramping_vus.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,154 +493,185 @@ var _ lib.Executor = &RampingVUs{}

// Run constantly loops through as many iterations as possible on a variable
// number of VUs for the specified stages.
//
// TODO: split up? since this does a ton of things, unfortunately I can't think
// of a less complex way to implement it (besides the old "increment by 100ms
// and see what happens)... :/ so maybe see how it can be split?
// nolint:funlen,gocognit,cyclop
func (vlv RampingVUs) Run(
parentCtx context.Context, out chan<- stats.SampleContainer, _ *metrics.BuiltinMetrics,
) (err error) {
rawExecutionSteps := vlv.config.getRawExecutionSteps(vlv.executionState.ExecutionTuple, true)
regularDuration, isFinal := lib.GetEndOffset(rawExecutionSteps)
if !isFinal {
return fmt.Errorf("%s expected raw end offset at %s to be final", vlv.config.GetName(), regularDuration)
func (vlv RampingVUs) Run(ctx context.Context, _ chan<- stats.SampleContainer, _ *metrics.BuiltinMetrics) error {
stepsRaw := vlv.config.getRawExecutionSteps(vlv.executionState.ExecutionTuple, true)
regDur, finalRaw := lib.GetEndOffset(stepsRaw)
if !finalRaw {
return fmt.Errorf("%s expected raw end offset at %s to be final", vlv.config.GetName(), regDur)
}

gracefulExecutionSteps := vlv.config.GetExecutionRequirements(vlv.executionState.ExecutionTuple)
maxDuration, isFinal := lib.GetEndOffset(gracefulExecutionSteps)
if !isFinal {
return fmt.Errorf("%s expected graceful end offset at %s to be final", vlv.config.GetName(), maxDuration)
stepsGraceful := vlv.config.GetExecutionRequirements(vlv.executionState.ExecutionTuple)
maxDur, finalGraceful := lib.GetEndOffset(stepsGraceful)
if !finalGraceful {
return fmt.Errorf("%s expected graceful end offset at %s to be final", vlv.config.GetName(), maxDur)
}
maxVUs := lib.GetMaxPlannedVUs(gracefulExecutionSteps)
gracefulStop := maxDuration - regularDuration

startTime, maxDurationCtx, regDurationCtx, cancel := getDurationContexts(parentCtx, regularDuration, gracefulStop)
startMaxVUs := lib.GetMaxPlannedVUs(stepsGraceful)
startTime, maxDurCtx, regDurCtx, cancel := getDurationContexts(ctx, regDur, maxDur-regDur)
defer cancel()

activeVUs := &sync.WaitGroup{}
defer activeVUs.Wait()

// Make sure the log and the progress bar have accurate information
vlv.logger.WithFields(logrus.Fields{
"type": vlv.config.GetType(), "startVUs": vlv.config.GetStartVUs(vlv.executionState.ExecutionTuple), "maxVUs": maxVUs,
"duration": regularDuration, "numStages": len(vlv.config.Stages),
},
).Debug("Starting executor run...")

activeVUsCount := new(int64)
vusFmt := pb.GetFixedLengthIntFormat(int64(maxVUs))
regularDurationStr := pb.GetFixedLengthDuration(regularDuration, regularDuration)
progressFn := func() (float64, []string) {
spent := time.Since(startTime)
currentlyActiveVUs := atomic.LoadInt64(activeVUsCount)
vus := fmt.Sprintf(vusFmt+"/"+vusFmt+" VUs", currentlyActiveVUs, maxVUs)
if spent > regularDuration {
return 1, []string{vus, regularDuration.String()}
}
progVUs := fmt.Sprintf(vusFmt+"/"+vusFmt+" VUs", currentlyActiveVUs, maxVUs)
progDur := pb.GetFixedLengthDuration(spent, regularDuration) + "/" + regularDurationStr
return float64(spent) / float64(regularDuration), []string{progVUs, progDur}
"type": vlv.config.GetType(),
"startVUs": vlv.config.GetStartVUs(vlv.executionState.ExecutionTuple),
"maxVUs": startMaxVUs,
"duration": regDur,
"numStages": len(vlv.config.Stages),
}).Debug("Starting executor run...")

runState := &rampingVUsRunState{
executor: vlv,
wg: new(sync.WaitGroup),
vuHandles: make([]*vuHandle, startMaxVUs),
maxVUs: startMaxVUs,
activeVUsCount: new(int64),
started: startTime,
stepsRaw: stepsRaw,
stepsGraceful: stepsGraceful,
runIteration: getIterationRunner(vlv.executionState, vlv.logger),
}
vlv.progress.Modify(pb.WithProgress(progressFn))
go trackProgress(parentCtx, maxDurationCtx, regDurationCtx, vlv, progressFn)

// Actually schedule the VUs and iterations, likely the most complicated
// executor among all of them...
runIteration := getIterationRunner(vlv.executionState, vlv.logger)
getVU := func() (lib.InitializedVU, error) {
initVU, err := vlv.executionState.GetPlannedVU(vlv.logger, false)
if err != nil {
vlv.logger.WithError(err).Error("Cannot get a VU from the buffer")
cancel()
} else {
activeVUs.Add(1)
atomic.AddInt64(activeVUsCount, 1)
vlv.executionState.ModCurrentlyActiveVUsCount(+1)
}
return initVU, err
}
returnVU := func(initVU lib.InitializedVU) {
vlv.executionState.ReturnVU(initVU, false)
atomic.AddInt64(activeVUsCount, -1)
activeVUs.Done()
vlv.executionState.ModCurrentlyActiveVUsCount(-1)
}

maxDurationCtx = lib.WithScenarioState(maxDurationCtx, &lib.ScenarioState{
progressFn := runState.makeProgressFn(regDur)
maxDurCtx = lib.WithScenarioState(maxDurCtx, &lib.ScenarioState{
Name: vlv.config.Name,
Executor: vlv.config.Type,
StartTime: startTime,
StartTime: runState.started,
ProgressFn: progressFn,
})
vlv.progress.Modify(pb.WithProgress(progressFn))
go trackProgress(ctx, maxDurCtx, regDurCtx, vlv, progressFn)

vuHandles := make([]*vuHandle, maxVUs)
for i := uint64(0); i < maxVUs; i++ {
vuHandle := newStoppedVUHandle(
maxDurationCtx, getVU, returnVU, vlv.nextIterationCounters,
&vlv.config.BaseConfig, vlv.logger.WithField("vuNum", i))
go vuHandle.runLoopsIfPossible(runIteration)
vuHandles[i] = vuHandle
defer runState.wg.Wait()
runState.initializeVUs(maxDurCtx, cancel)
for i := uint64(0); i < runState.maxVUs; i++ {
go runState.vuHandles[i].runLoopsIfPossible(runState.runIteration)
}
runState.handleVUs(ctx)
go runState.handleRemainingVUs(ctx)

// 0 <= currentScheduledVUs <= currentMaxAllowedVUs <= maxVUs
var currentScheduledVUs, currentMaxAllowedVUs uint64
return nil
}

handleNewScheduledVUs := func(newScheduledVUs uint64) {
if newScheduledVUs > currentScheduledVUs {
for vuNum := currentScheduledVUs; vuNum < newScheduledVUs; vuNum++ {
_ = vuHandles[vuNum].start() // TODO handle error
}
} else {
for vuNum := newScheduledVUs; vuNum < currentScheduledVUs; vuNum++ {
vuHandles[vuNum].gracefulStop()
}
// rampingVUsRunState is created and initialized by the Run() method
// of the ramping VUs executor. It is used to track and modify various
// details of the execution.
type rampingVUsRunState struct {
executor RampingVUs
vuHandles []*vuHandle // handles for manipulating and tracking all of the VUs
maxVUs uint64 // the scaled number of initially configured MaxVUs
activeVUsCount *int64 // the current number of active VUs, used only for the progress display
started time.Time
stepsRaw, stepsGraceful []lib.ExecutionStep
wg *sync.WaitGroup

runIteration func(context.Context, lib.ActiveVU) bool // a helper closure function that runs a single iteration
}

func (rs rampingVUsRunState) makeProgressFn(total time.Duration) (progressFn func() (float64, []string)) {
vusFmt := pb.GetFixedLengthIntFormat(int64(rs.maxVUs))
regDuration := pb.GetFixedLengthDuration(total, total)

return func() (float64, []string) {
spent := time.Since(rs.started)
cur := atomic.LoadInt64(rs.activeVUsCount)
progVUs := fmt.Sprintf(vusFmt+"/"+vusFmt+" VUs", cur, rs.maxVUs)
if spent > total {
return 1, []string{progVUs, total.String()}
}
currentScheduledVUs = newScheduledVUs
progDur := pb.GetFixedLengthDuration(spent, total) + "/" + regDuration
return float64(spent) / float64(total), []string{progVUs, progDur}
}
}

handleNewMaxAllowedVUs := func(newMaxAllowedVUs uint64) {
if newMaxAllowedVUs < currentMaxAllowedVUs {
for vuNum := newMaxAllowedVUs; vuNum < currentMaxAllowedVUs; vuNum++ {
vuHandles[vuNum].hardStop()
}
func (rs rampingVUsRunState) initializeVUs(ctx context.Context, cancel func()) {
getVU := func() (lib.InitializedVU, error) {
pvu, err := rs.executor.executionState.GetPlannedVU(rs.executor.logger, false)
if err != nil {
rs.executor.logger.WithError(err).Error("Cannot get a VU from the buffer")
cancel()
return pvu, err
}
currentMaxAllowedVUs = newMaxAllowedVUs
rs.wg.Add(1)
atomic.AddInt64(rs.activeVUsCount, 1)
rs.executor.executionState.ModCurrentlyActiveVUsCount(+1)
return pvu, err
}
returnVU := func(initVU lib.InitializedVU) {
rs.executor.executionState.ReturnVU(initVU, false)
atomic.AddInt64(rs.activeVUsCount, -1)
rs.wg.Done()
rs.executor.executionState.ModCurrentlyActiveVUsCount(-1)
}
for i := uint64(0); i < rs.maxVUs; i++ {
rs.vuHandles[i] = newStoppedVUHandle(
ctx, getVU, returnVU, rs.executor.nextIterationCounters,
&rs.executor.config.BaseConfig, rs.executor.logger.WithField("vuNum", i))
}
}

wait := waiter(parentCtx, startTime)
// iterate over rawExecutionSteps and gracefulExecutionSteps in order by TimeOffset
// giving rawExecutionSteps precedence.
// we stop iterating once rawExecutionSteps are over as we need to run the remaining
// gracefulExecutionSteps concurrently while waiting for VUs to stop in order to not wait until
// the end of gracefulStop timeouts
i, j := 0, 0
for i != len(rawExecutionSteps) {
if rawExecutionSteps[i].TimeOffset > gracefulExecutionSteps[j].TimeOffset {
if wait(gracefulExecutionSteps[j].TimeOffset) {
func (rs rampingVUsRunState) handleVUs(ctx context.Context) {
// iterate over stepsRaw and stepsGraceful in order by TimeOffset
// giving stepsRaw precedence.
// we stop iterating once stepsRaw are over as we need to run the remaining
// stepsGraceful concurrently while waiting for VUs to stop in order to not wait until
// the end of gracefulStop (= maxDur-regDur) timeouts
var (
handleNewMaxAllowedVUs = rs.maxAllowedVUsHandlerStrategy()
handleNewScheduledVUs = rs.scheduledVUsHandlerStrategy()
wait = waiter(ctx, rs.started)
)
for i, j := 0, 0; i != len(rs.stepsRaw); {
r, g := rs.stepsRaw[i], rs.stepsGraceful[j]
if g.TimeOffset < r.TimeOffset {
j++
if wait(g.TimeOffset) {
return
}
handleNewMaxAllowedVUs(gracefulExecutionSteps[j].PlannedVUs)
j++
handleNewMaxAllowedVUs(g)
} else {
if wait(rawExecutionSteps[i].TimeOffset) {
i++
if wait(r.TimeOffset) {
return
}
handleNewScheduledVUs(rawExecutionSteps[i].PlannedVUs)
i++
handleNewScheduledVUs(r)
}
}
}

go func() { // iterate over the remaining gracefulExecutionSteps
for _, step := range gracefulExecutionSteps[j:] {
if wait(step.TimeOffset) {
return
}
handleNewMaxAllowedVUs(step.PlannedVUs)
// TODO: removing this has no effect on tests?
func (rs rampingVUsRunState) handleRemainingVUs(ctx context.Context) {
var (
handleNewMaxAllowedVUs = rs.maxAllowedVUsHandlerStrategy()
wait = waiter(ctx, rs.started)
)
for _, s := range rs.stepsGraceful {
if wait(s.TimeOffset) {
return
}
}()
handleNewMaxAllowedVUs(s)
}
}

return nil
func (rs rampingVUsRunState) maxAllowedVUsHandlerStrategy() func(lib.ExecutionStep) {
var cur uint64
return func(graceful lib.ExecutionStep) {
pv := graceful.PlannedVUs
for ; pv < cur; cur-- {
rs.vuHandles[cur-1].hardStop()
}
cur = pv
}
}

func (rs rampingVUsRunState) scheduledVUsHandlerStrategy() func(lib.ExecutionStep) {
var cur uint64
return func(raw lib.ExecutionStep) {
pv := raw.PlannedVUs
for ; cur < pv; cur++ {
rs.vuHandles[cur].start()
}
for ; pv < cur; cur-- {
rs.vuHandles[cur-1].gracefulStop()
}
cur = pv
}
}

// waiter returns a function that will sleep/wait for the required time since the startTime and then
Expand All @@ -651,9 +682,9 @@ func (vlv RampingVUs) Run(
func waiter(ctx context.Context, startTime time.Time) func(offset time.Duration) bool {
timer := time.NewTimer(time.Hour * 24)
return func(offset time.Duration) bool {
offsetDiff := offset - time.Since(startTime)
if offsetDiff > 0 { // wait until time of event arrives // TODO have a mininum
timer.Reset(offsetDiff)
diff := offset - time.Since(startTime)
if diff > 0 { // wait until time of event arrives // TODO have a mininum
timer.Reset(diff)
select {
case <-ctx.Done():
return true // exit if context is cancelled
Expand Down

0 comments on commit a61714c

Please sign in to comment.