adding waitWhenFinished in global scope (kube-burner#378)
vishnuchalla authored Aug 10, 2023
1 parent 14649d1 commit e83d227
Showing 6 changed files with 80 additions and 17 deletions.
@@ -1,6 +1,7 @@
---
global:
  gc: {{.GC}}
  waitWhenFinished: true
  indexerConfig:
    esServers: ["{{.ES_SERVER}}"]
    insecureSkipVerify: true
8 changes: 6 additions & 2 deletions docs/reference/configuration.md
@@ -26,8 +26,12 @@ This section describes the global job configuration, which holds the following parameters
| `bearerToken` | Bearer token to access the Prometheus endpoint | String | "" |
| `metricsProfile` | Path to the metrics profile configuration file | String | "" |
| `metricsEndpoint` | Path to the metrics endpoint configuration file containing a list of target endpoints, flag has precedence | String | "" |
| `gc` | Garbage collect created namespaces | Boolean | false |
| `gcTimeout` | Garbage collection timeout | Duration | 1h |
| `GC` | Garbage collect created namespaces | Boolean | false |
| `GCTimeout` | Garbage collection timeout | Duration | 1h |
| `waitWhenFinished` | Wait for all pods to be running when all jobs are completed | Boolean | false |

!!! note
The precedence order to wait on resources is Global.waitWhenFinished > Job.waitWhenFinished > Job.podWait
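
For example, a minimal `global` section enabling this behavior could look like the sketch below (values are illustrative; the lowercase keys match the `yaml` tags in `pkg/config/types.go`):

```yaml
global:
  gc: true                # garbage collect created namespaces
  gcTimeout: 1h           # garbage collection timeout
  waitWhenFinished: true  # wait for all pods to be running once all jobs complete
```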

kube-burner connects k8s clusters using the following methods in this order:

10 changes: 6 additions & 4 deletions pkg/burner/create.go
@@ -91,7 +91,7 @@ func setupCreateJob(jobConfig config.Job) Executor {
}

// RunCreateJob executes a creation job
func (ex *Executor) RunCreateJob(iterationStart, iterationEnd int) {
func (ex *Executor) RunCreateJob(iterationStart, iterationEnd int, waitListNamespaces *[]string) {
nsLabels := map[string]string{
"kube-burner-job": ex.Name,
"kube-burner-uuid": ex.uuid,
@@ -108,6 +108,7 @@ func (ex *Executor) RunCreateJob(iterationStart, iterationEnd int) {
if err = createNamespace(ns, nsLabels); err != nil {
log.Fatal(err.Error())
}
*waitListNamespaces = append(*waitListNamespaces, ns)
}
// We have to add 1 since the iterations start from 1
iterationProgress := (iterationEnd - iterationStart) / 10
@@ -128,12 +129,13 @@ func (ex *Executor) RunCreateJob(iterationStart, iterationEnd int) {
continue
}
namespacesCreated[ns] = true
*waitListNamespaces = append(*waitListNamespaces, ns)
}
}
for objectIndex, obj := range ex.objects {
ex.replicaHandler(objectIndex, obj, ns, i, &wg)
}
if ex.PodWait {
if !ex.WaitWhenFinished && ex.PodWait {
if !ex.NamespacedIterations || !namespacesWaited[ns] {
log.Infof("Waiting up to %s for actions to be completed in namespace %s", ex.MaxWaitTimeout, ns)
wg.Wait()
@@ -148,7 +150,7 @@ func (ex *Executor) RunCreateJob(iterationStart, iterationEnd int) {
}
// Wait for all replicas to be created
wg.Wait()
if ex.WaitWhenFinished && !ex.PodWait {
if ex.WaitWhenFinished {
log.Infof("Waiting up to %s for actions to be completed", ex.MaxWaitTimeout)
// This semaphore is used to limit the maximum number of concurrent goroutines
sem := make(chan int, int(ClientSet.RESTClient().GetRateLimiter().QPS())*2)
@@ -314,7 +316,7 @@ func (ex *Executor) RunCreateJobWithChurn() {
CleanupNamespaces(ctx, metav1.ListOptions{LabelSelector: "churndelete=delete"}, true)
log.Info("Re-creating deleted objects")
// Re-create objects that were deleted
ex.RunCreateJob(randStart, numToChurn+randStart)
ex.RunCreateJob(randStart, numToChurn+randStart, &[]string{})
log.Infof("Sleeping for %v", ex.ChurnDelay)
time.Sleep(ex.ChurnDelay)
}
69 changes: 58 additions & 11 deletions pkg/burner/job.go
@@ -17,6 +17,8 @@ package burner
import (
"context"
"fmt"
"strconv"
"sync"
"time"

"github.com/cloud-bulldozer/go-commons/indexers"
@@ -77,17 +79,21 @@ func Run(configSpec config.Spec, prometheusClients []*prometheus.Prometheus, ale
var err error
var rc int
var prometheusJobList []prometheus.Job
var measurementStopFunctions []func() error
var jobList []Executor
res := make(chan int, 1)
uuid := configSpec.GlobalConfig.UUID
globalConfig := configSpec.GlobalConfig
globalWaitMap := make(map[string][]string)
executorMap := make(map[string]Executor)
log.Infof("🔥 Starting kube-burner (%s@%s) with UUID %s", version.Version, version.GitCommit, uuid)
go func() {
var innerRC int
measurements.NewMeasurementFactory(configSpec, indexer, metadata)
jobList = newExecutorList(configSpec, uuid, timeout)
// Iterate job list
for jobPosition, job := range jobList {
var waitListNamespaces []string
if job.QPS == 0 || job.Burst == 0 {
log.Infof("QPS or Burst rates not set, using default client-go values: %v %v", rest.DefaultQPS, rest.DefaultBurst)
} else {
@@ -123,7 +129,7 @@ func Run(configSpec config.Spec, prometheusClients []*prometheus.Prometheus, ale
log.Infof("Churn percent: %v", job.ChurnPercent)
log.Infof("Churn delay: %v", job.ChurnDelay)
}
job.RunCreateJob(0, job.JobIterations)
job.RunCreateJob(0, job.JobIterations, &waitListNamespaces)
// If object verification is enabled
if job.VerifyObjects && !job.Verify() {
errMsg := "Object verification failed"
@@ -137,6 +143,8 @@ func Run(configSpec config.Spec, prometheusClients []*prometheus.Prometheus, ale
if job.Churn {
job.RunCreateJobWithChurn()
}
globalWaitMap[strconv.Itoa(jobPosition)+job.Name] = waitListNamespaces
executorMap[strconv.Itoa(jobPosition)+job.Name] = job
case config.DeletionJob:
job.RunDeleteJob()
case config.PatchJob:
@@ -148,21 +156,39 @@ func Run(configSpec config.Spec, prometheusClients []*prometheus.Prometheus, ale
}

jobList[jobPosition].End = time.Now().UTC()
prometheusJob := prometheus.Job{
Start: jobList[jobPosition].Start,
End: jobList[jobPosition].End,
JobConfig: job.Job,
}
elapsedTime := prometheusJob.End.Sub(prometheusJob.Start).Round(time.Second)
// Don't append to the Prometheus jobList when Prometheus is not initialized
if len(prometheusClients) > 0 {
prometheusJob := prometheus.Job{
Start: jobList[jobPosition].Start,
End: jobList[jobPosition].End,
JobConfig: job.Job,
}
prometheusJobList = append(prometheusJobList, prometheusJob)
}
log.Infof("Job %s took %v", job.Name, elapsedTime)
// We stop and index measurements per job
if err = measurements.Stop(); err != nil {
log.Errorf("Failed measurements: %v", err.Error())
innerRC = 1
if !globalConfig.WaitWhenFinished {
elapsedTime := jobList[jobPosition].End.Sub(jobList[jobPosition].Start).Round(time.Second)
log.Infof("Job %s took %v", job.Name, elapsedTime)
if err = measurements.Stop(); err != nil {
log.Errorf("Failed measurements: %v", err.Error())
innerRC = 1
}
} else {
measurementStopFunctions = append(measurementStopFunctions, measurements.Stop)
}
}
if globalConfig.WaitWhenFinished {
runWaitList(globalWaitMap, executorMap)
endTime := time.Now().UTC()
for jobIndex, stopFunc := range measurementStopFunctions {
jobList[jobIndex].End = endTime
if len(prometheusJobList) > jobIndex {
prometheusJobList[jobIndex].End = endTime
}
if err := stopFunc(); err != nil {
log.Errorf("Failed measurements: %v", err.Error())
innerRC = 1
}
}
}
if globalConfig.IndexerConfig.Type != "" {
@@ -245,3 +271,24 @@ func newExecutorList(configSpec config.Spec, uuid string, timeout time.Duration)
}
return executorList
}

// runWaitList waits, at the end of the benchmark, for objects in the namespaces collected by each job
func runWaitList(globalWaitMap map[string][]string, executorMap map[string]Executor) {
var wg sync.WaitGroup
for executorUUID, namespaces := range globalWaitMap {
executor := executorMap[executorUUID]
log.Infof("Waiting up to %s for actions to be completed", executor.MaxWaitTimeout)
// This semaphore is used to limit the maximum number of concurrent goroutines
sem := make(chan int, int(ClientSet.RESTClient().GetRateLimiter().QPS())*2)
for _, ns := range namespaces {
sem <- 1
wg.Add(1)
go func(ns string) {
executor.waitForObjects(ns)
<-sem
wg.Done()
}(ns)
}
wg.Wait()
}
}
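
runWaitList bounds its concurrency with a buffered channel acting as a counting semaphore, sized from the client's rate-limiter QPS so that the number of in-flight waits stays proportional to the allowed request rate. A minimal standalone sketch of the same pattern, with the per-namespace wait replaced by a sleep and a hard-coded limit of 2 instead of the QPS-derived value:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	namespaces := []string{"ns-1", "ns-2", "ns-3", "ns-4", "ns-5"}
	// Buffered channel used as a counting semaphore: at most 2 waits run concurrently.
	sem := make(chan int, 2)
	var wg sync.WaitGroup
	for _, ns := range namespaces {
		sem <- 1 // acquire a slot; blocks while 2 goroutines are already in flight
		wg.Add(1)
		go func(ns string) {
			defer wg.Done()
			time.Sleep(100 * time.Millisecond) // stand-in for executor.waitForObjects(ns)
			fmt.Println("finished waiting on namespace", ns)
			<-sem // release the slot
		}(ns)
	}
	wg.Wait()
}
```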
7 changes: 7 additions & 0 deletions pkg/config/config.go
@@ -49,6 +49,7 @@ var configSpec = Spec{
MetricsDirectory: "collected-metrics",
TarballName: "kube-burner-metrics.tgz",
},
WaitWhenFinished: false,
},
}

@@ -86,9 +87,15 @@ func (j *Job) UnmarshalYAML(unmarshal func(interface{}) error) error {
ChurnDuration: 1 * time.Hour,
ChurnDelay: 5 * time.Minute,
}

if err := unmarshal(&raw); err != nil {
return err
}
// Apply global overrides: a global waitWhenFinished disables the per-job podWait and waitWhenFinished flags
if configSpec.GlobalConfig.WaitWhenFinished {
raw.PodWait = false
raw.WaitWhenFinished = false
}
// Convert raw to Job
*j = Job(raw)
return nil
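To make the effect of this override concrete, the sketch below shows a configuration in which the per-job flags are forced to false during unmarshalling, so a single wait runs at the end of the benchmark instead of per iteration or per job (job name and iteration count are placeholders; field names follow the Job and GlobalConfig structs in pkg/config):

```yaml
global:
  waitWhenFinished: true    # takes precedence over the per-job flags below
jobs:
  - name: example-job       # placeholder name
    jobIterations: 10
    podWait: true           # forced to false by the global override
    waitWhenFinished: true  # forced to false; a single global wait runs instead
```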
2 changes: 2 additions & 0 deletions pkg/config/types.go
@@ -64,6 +64,8 @@ type GlobalConfig struct {
AlertProfile string `yaml:"alertProfile"`
// GC garbage collect created namespaces
GC bool `yaml:"gc" json:"gc"`
// WaitWhenFinished Wait for pods to be running when all the jobs are completed
WaitWhenFinished bool `yaml:"waitWhenFinished" json:"waitWhenFinished,omitempty"`
// GCTimeout garbage collection timeout
GCTimeout time.Duration `yaml:"gcTimeout"`
}