Skip to content

Commit

Permalink
Merge 9d1900b into 586f0e5
Browse files Browse the repository at this point in the history
  • Loading branch information
inancgumus authored Oct 4, 2021
2 parents 586f0e5 + 9d1900b commit db88710
Show file tree
Hide file tree
Showing 2 changed files with 152 additions and 121 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/xk6.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ jobs:
cd .github/workflows/xk6-tests
# Temporary hack to ignore the v0.33.0 tag change
export GONOSUMDB="go.k6.io/k6"
go get go.k6.io/xk6/cmd/xk6@master
go install go.k6.io/xk6/cmd/xk6@latest
if [ "${{ github.event_name }}" == "pull_request" -a \
"${{ github.event.pull_request.head.repo.full_name }}" != "${{ github.repository }}" ]; then
export XK6_K6_REPO="github.com/${{ github.event.pull_request.head.repo.full_name }}"
Expand Down
271 changes: 151 additions & 120 deletions lib/executor/ramping_vus.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,154 +493,185 @@ var _ lib.Executor = &RampingVUs{}

// Run constantly loops through as many iterations as possible on a variable
// number of VUs for the specified stages.
//
// TODO: split up? since this does a ton of things, unfortunately I can't think
// of a less complex way to implement it (besides the old "increment by 100ms
// and see what happens)... :/ so maybe see how it can be split?
// nolint:funlen,gocognit,cyclop
func (vlv RampingVUs) Run(
parentCtx context.Context, out chan<- stats.SampleContainer, _ *metrics.BuiltinMetrics,
) (err error) {
rawExecutionSteps := vlv.config.getRawExecutionSteps(vlv.executionState.ExecutionTuple, true)
regularDuration, isFinal := lib.GetEndOffset(rawExecutionSteps)
if !isFinal {
return fmt.Errorf("%s expected raw end offset at %s to be final", vlv.config.GetName(), regularDuration)
func (vlv RampingVUs) Run(ctx context.Context, _ chan<- stats.SampleContainer, _ *metrics.BuiltinMetrics) error {
rawSteps := vlv.config.getRawExecutionSteps(vlv.executionState.ExecutionTuple, true)
regDur, finalRaw := lib.GetEndOffset(rawSteps)
if !finalRaw {
return fmt.Errorf("%s expected raw end offset at %s to be final", vlv.config.GetName(), regDur)
}

gracefulExecutionSteps := vlv.config.GetExecutionRequirements(vlv.executionState.ExecutionTuple)
maxDuration, isFinal := lib.GetEndOffset(gracefulExecutionSteps)
if !isFinal {
return fmt.Errorf("%s expected graceful end offset at %s to be final", vlv.config.GetName(), maxDuration)
gracefulSteps := vlv.config.GetExecutionRequirements(vlv.executionState.ExecutionTuple)
maxDur, finalGraceful := lib.GetEndOffset(gracefulSteps)
if !finalGraceful {
return fmt.Errorf("%s expected graceful end offset at %s to be final", vlv.config.GetName(), maxDur)
}
maxVUs := lib.GetMaxPlannedVUs(gracefulExecutionSteps)
gracefulStop := maxDuration - regularDuration

startTime, maxDurationCtx, regDurationCtx, cancel := getDurationContexts(parentCtx, regularDuration, gracefulStop)
startMaxVUs := lib.GetMaxPlannedVUs(gracefulSteps)
startTime, maxDurCtx, regDurCtx, cancel := getDurationContexts(ctx, regDur, maxDur-regDur)
defer cancel()

activeVUs := &sync.WaitGroup{}
defer activeVUs.Wait()

// Make sure the log and the progress bar have accurate information
vlv.logger.WithFields(logrus.Fields{
"type": vlv.config.GetType(), "startVUs": vlv.config.GetStartVUs(vlv.executionState.ExecutionTuple), "maxVUs": maxVUs,
"duration": regularDuration, "numStages": len(vlv.config.Stages),
},
).Debug("Starting executor run...")

activeVUsCount := new(int64)
vusFmt := pb.GetFixedLengthIntFormat(int64(maxVUs))
regularDurationStr := pb.GetFixedLengthDuration(regularDuration, regularDuration)
progressFn := func() (float64, []string) {
spent := time.Since(startTime)
currentlyActiveVUs := atomic.LoadInt64(activeVUsCount)
vus := fmt.Sprintf(vusFmt+"/"+vusFmt+" VUs", currentlyActiveVUs, maxVUs)
if spent > regularDuration {
return 1, []string{vus, regularDuration.String()}
}
progVUs := fmt.Sprintf(vusFmt+"/"+vusFmt+" VUs", currentlyActiveVUs, maxVUs)
progDur := pb.GetFixedLengthDuration(spent, regularDuration) + "/" + regularDurationStr
return float64(spent) / float64(regularDuration), []string{progVUs, progDur}
"type": vlv.config.GetType(),
"startVUs": vlv.config.GetStartVUs(vlv.executionState.ExecutionTuple),
"maxVUs": startMaxVUs,
"duration": regDur,
"numStages": len(vlv.config.Stages),
}).Debug("Starting executor run...")

runState := &rampingVUsRunState{
executor: vlv,
wg: new(sync.WaitGroup),
vuHandles: make([]*vuHandle, startMaxVUs),
maxVUs: startMaxVUs,
activeVUsCount: new(int64),
started: startTime,
rawSteps: rawSteps,
gracefulSteps: gracefulSteps,
runIteration: getIterationRunner(vlv.executionState, vlv.logger),
}
vlv.progress.Modify(pb.WithProgress(progressFn))
go trackProgress(parentCtx, maxDurationCtx, regDurationCtx, vlv, progressFn)

// Actually schedule the VUs and iterations, likely the most complicated
// executor among all of them...
runIteration := getIterationRunner(vlv.executionState, vlv.logger)
getVU := func() (lib.InitializedVU, error) {
initVU, err := vlv.executionState.GetPlannedVU(vlv.logger, false)
if err != nil {
vlv.logger.WithError(err).Error("Cannot get a VU from the buffer")
cancel()
} else {
activeVUs.Add(1)
atomic.AddInt64(activeVUsCount, 1)
vlv.executionState.ModCurrentlyActiveVUsCount(+1)
}
return initVU, err
}
returnVU := func(initVU lib.InitializedVU) {
vlv.executionState.ReturnVU(initVU, false)
atomic.AddInt64(activeVUsCount, -1)
activeVUs.Done()
vlv.executionState.ModCurrentlyActiveVUsCount(-1)
}

maxDurationCtx = lib.WithScenarioState(maxDurationCtx, &lib.ScenarioState{
progressFn := runState.makeProgressFn(regDur)
maxDurCtx = lib.WithScenarioState(maxDurCtx, &lib.ScenarioState{
Name: vlv.config.Name,
Executor: vlv.config.Type,
StartTime: startTime,
StartTime: runState.started,
ProgressFn: progressFn,
})
vlv.progress.Modify(pb.WithProgress(progressFn))
go trackProgress(ctx, maxDurCtx, regDurCtx, vlv, progressFn)

vuHandles := make([]*vuHandle, maxVUs)
for i := uint64(0); i < maxVUs; i++ {
vuHandle := newStoppedVUHandle(
maxDurationCtx, getVU, returnVU, vlv.nextIterationCounters,
&vlv.config.BaseConfig, vlv.logger.WithField("vuNum", i))
go vuHandle.runLoopsIfPossible(runIteration)
vuHandles[i] = vuHandle
defer runState.wg.Wait()
runState.populateVUs(maxDurCtx, cancel)
for i := uint64(0); i < runState.maxVUs; i++ {
go runState.vuHandles[i].runLoopsIfPossible(runState.runIteration)
}
runState.handleVUs(ctx)
go runState.handleRemainingVUs(ctx)

// 0 <= currentScheduledVUs <= currentMaxAllowedVUs <= maxVUs
var currentScheduledVUs, currentMaxAllowedVUs uint64
return nil
}

handleNewScheduledVUs := func(newScheduledVUs uint64) {
if newScheduledVUs > currentScheduledVUs {
for vuNum := currentScheduledVUs; vuNum < newScheduledVUs; vuNum++ {
_ = vuHandles[vuNum].start() // TODO handle error
}
} else {
for vuNum := newScheduledVUs; vuNum < currentScheduledVUs; vuNum++ {
vuHandles[vuNum].gracefulStop()
}
// rampingVUsRunState is created and initialized by the Run() method
// of the ramping VUs executor. It is used to track and modify various
// details of the execution.
type rampingVUsRunState struct {
executor RampingVUs
vuHandles []*vuHandle // handles for manipulating and tracking all of the VUs
maxVUs uint64 // the scaled number of initially configured MaxVUs
activeVUsCount *int64 // the current number of active VUs, used only for the progress display
started time.Time
rawSteps, gracefulSteps []lib.ExecutionStep
wg *sync.WaitGroup

runIteration func(context.Context, lib.ActiveVU) bool // a helper closure function that runs a single iteration
}

func (rs rampingVUsRunState) makeProgressFn(total time.Duration) (progressFn func() (float64, []string)) {
vusFmt := pb.GetFixedLengthIntFormat(int64(rs.maxVUs))
regDuration := pb.GetFixedLengthDuration(total, total)

return func() (float64, []string) {
spent := time.Since(rs.started)
cur := atomic.LoadInt64(rs.activeVUsCount)
progVUs := fmt.Sprintf(vusFmt+"/"+vusFmt+" VUs", cur, rs.maxVUs)
if spent > total {
return 1, []string{progVUs, total.String()}
}
currentScheduledVUs = newScheduledVUs
progDur := pb.GetFixedLengthDuration(spent, total) + "/" + regDuration
return float64(spent) / float64(total), []string{progVUs, progDur}
}
}

handleNewMaxAllowedVUs := func(newMaxAllowedVUs uint64) {
if newMaxAllowedVUs < currentMaxAllowedVUs {
for vuNum := newMaxAllowedVUs; vuNum < currentMaxAllowedVUs; vuNum++ {
vuHandles[vuNum].hardStop()
}
func (rs rampingVUsRunState) populateVUs(ctx context.Context, cancel func()) {
getVU := func() (lib.InitializedVU, error) {
pvu, err := rs.executor.executionState.GetPlannedVU(rs.executor.logger, false)
if err != nil {
rs.executor.logger.WithError(err).Error("Cannot get a VU from the buffer")
cancel()
return pvu, err
}
currentMaxAllowedVUs = newMaxAllowedVUs
rs.wg.Add(1)
atomic.AddInt64(rs.activeVUsCount, 1)
rs.executor.executionState.ModCurrentlyActiveVUsCount(+1)
return pvu, err
}
returnVU := func(initVU lib.InitializedVU) {
rs.executor.executionState.ReturnVU(initVU, false)
atomic.AddInt64(rs.activeVUsCount, -1)
rs.wg.Done()
rs.executor.executionState.ModCurrentlyActiveVUsCount(-1)
}
for i := uint64(0); i < rs.maxVUs; i++ {
rs.vuHandles[i] = newStoppedVUHandle(
ctx, getVU, returnVU, rs.executor.nextIterationCounters,
&rs.executor.config.BaseConfig, rs.executor.logger.WithField("vuNum", i))
}
}

wait := waiter(parentCtx, startTime)
// iterate over rawExecutionSteps and gracefulExecutionSteps in order by TimeOffset
// giving rawExecutionSteps precedence.
// we stop iterating once rawExecutionSteps are over as we need to run the remaining
// gracefulExecutionSteps concurrently while waiting for VUs to stop in order to not wait until
// the end of gracefulStop timeouts
i, j := 0, 0
for i != len(rawExecutionSteps) {
if rawExecutionSteps[i].TimeOffset > gracefulExecutionSteps[j].TimeOffset {
if wait(gracefulExecutionSteps[j].TimeOffset) {
func (rs rampingVUsRunState) handleVUs(ctx context.Context) {
// iterate over rawSteps and gracefulSteps in order by TimeOffset
// giving rawSteps precedence.
// we stop iterating once rawSteps are over as we need to run the remaining
// gracefulSteps concurrently while waiting for VUs to stop in order to not wait until
// the end of gracefulStop (= maxDur-regDur) timeouts
var (
handleNewMaxAllowedVUs = rs.maxAllowedVUsHandlerStrategy()
handleNewScheduledVUs = rs.scheduledVUsHandlerStrategy()
wait = waiter(ctx, rs.started)
)
for i, j := 0, 0; i != len(rs.rawSteps); {
r, g := rs.rawSteps[i], rs.gracefulSteps[j]
if g.TimeOffset < r.TimeOffset {
j++
if wait(g.TimeOffset) {
return
}
handleNewMaxAllowedVUs(gracefulExecutionSteps[j].PlannedVUs)
j++
handleNewMaxAllowedVUs(g)
} else {
if wait(rawExecutionSteps[i].TimeOffset) {
i++
if wait(r.TimeOffset) {
return
}
handleNewScheduledVUs(rawExecutionSteps[i].PlannedVUs)
i++
handleNewScheduledVUs(r)
}
}
}

go func() { // iterate over the remaining gracefulExecutionSteps
for _, step := range gracefulExecutionSteps[j:] {
if wait(step.TimeOffset) {
return
}
handleNewMaxAllowedVUs(step.PlannedVUs)
// TODO: removing this has no effect on tests?
func (rs rampingVUsRunState) handleRemainingVUs(ctx context.Context) {
var (
handleNewMaxAllowedVUs = rs.maxAllowedVUsHandlerStrategy()
wait = waiter(ctx, rs.started)
)
for _, s := range rs.gracefulSteps {
if wait(s.TimeOffset) {
return
}
}()
handleNewMaxAllowedVUs(s)
}
}

return nil
func (rs rampingVUsRunState) maxAllowedVUsHandlerStrategy() func(lib.ExecutionStep) {
var cur uint64
return func(graceful lib.ExecutionStep) {
pv := graceful.PlannedVUs
for ; pv < cur; cur-- {
rs.vuHandles[cur-1].hardStop()
}
cur = pv
}
}

func (rs rampingVUsRunState) scheduledVUsHandlerStrategy() func(lib.ExecutionStep) {
var cur uint64
return func(raw lib.ExecutionStep) {
pv := raw.PlannedVUs
for ; cur < pv; cur++ {
_ = rs.vuHandles[cur].start() // TODO: handle the error
}
for ; pv < cur; cur-- {
rs.vuHandles[cur-1].gracefulStop()
}
cur = pv
}
}

// waiter returns a function that will sleep/wait for the required time since the startTime and then
Expand All @@ -651,9 +682,9 @@ func (vlv RampingVUs) Run(
func waiter(ctx context.Context, startTime time.Time) func(offset time.Duration) bool {
timer := time.NewTimer(time.Hour * 24)
return func(offset time.Duration) bool {
offsetDiff := offset - time.Since(startTime)
if offsetDiff > 0 { // wait until time of event arrives // TODO have a mininum
timer.Reset(offsetDiff)
diff := offset - time.Since(startTime)
if diff > 0 { // wait until time of event arrives // TODO have a mininum
timer.Reset(diff)
select {
case <-ctx.Done():
return true // exit if context is cancelled
Expand Down

0 comments on commit db88710

Please sign in to comment.