From 191cdaa730bdebaca75235c51912d9e62779f451 Mon Sep 17 00:00:00 2001 From: Raul Sevilla Date: Wed, 7 Dec 2022 14:05:00 +0100 Subject: [PATCH] Benchmark timeout flag Add a --timeout flag to kube-burner so we can exit the benchmark after a preconfigured timeout. Signed-off-by: Raul Sevilla --- cmd/kube-burner/kube-burner.go | 4 +- cmd/kube-burner/ocp.go | 4 +- docs/cli.md | 1 + pkg/burner/job.go | 206 +++++++++++++++++---------------- pkg/burner/waiters.go | 4 +- pkg/indexers/factory.go | 4 +- pkg/workloads/helpers.go | 6 +- test/run-ocp.sh | 11 +- 8 files changed, 133 insertions(+), 107 deletions(-) diff --git a/cmd/kube-burner/kube-burner.go b/cmd/kube-burner/kube-burner.go index 65b4c5a5b..f84df7e80 100644 --- a/cmd/kube-burner/kube-burner.go +++ b/cmd/kube-burner/kube-burner.go @@ -83,6 +83,7 @@ func initCmd() *cobra.Command { var prometheusStep time.Duration var prometheusClient *prometheus.Prometheus var alertM *alerting.AlertManager + var timeout time.Duration cmd := &cobra.Command{ Use: "init", Short: "Launch benchmark", @@ -121,7 +122,7 @@ func initCmd() *cobra.Command { } } } - rc, err := burner.Run(configSpec, uuid, prometheusClient, alertM) + rc, err := burner.Run(configSpec, uuid, prometheusClient, alertM, timeout) if err != nil { log.Fatalf(err.Error()) } @@ -137,6 +138,7 @@ func initCmd() *cobra.Command { cmd.Flags().StringVarP(&alertProfile, "alert-profile", "a", "", "Alert profile file or URL") cmd.Flags().BoolVar(&skipTLSVerify, "skip-tls-verify", true, "Verify prometheus TLS certificate") cmd.Flags().DurationVarP(&prometheusStep, "step", "s", 30*time.Second, "Prometheus step size") + cmd.Flags().DurationVarP(&timeout, "timeout", "", 2*time.Hour, "Benchmark timeout") cmd.Flags().StringVarP(&configFile, "config", "c", "", "Config file path or URL") cmd.Flags().StringVarP(&configMap, "configmap", "", "", "Configmap holding all the configuration: config.yml, metrics.yml and alerts.yml. metrics and alerts are optional") cmd.Flags().StringVarP(&namespace, "namespace", "", "default", "Namespace where the configmap is") diff --git a/cmd/kube-burner/ocp.go b/cmd/kube-burner/ocp.go index f99381855..210603140 100644 --- a/cmd/kube-burner/ocp.go +++ b/cmd/kube-burner/ocp.go @@ -19,6 +19,7 @@ import ( "fmt" "log" "strings" + "time" _ "embed" @@ -42,6 +43,7 @@ func openShiftCmd() *cobra.Command { esIndex := ocpCmd.PersistentFlags().String("es-index", "", "Elastic Search index") alerting := ocpCmd.PersistentFlags().Bool("alerting", true, "Enable alerting") uuid := ocpCmd.PersistentFlags().String("uuid", uid.NewV4().String(), "Benchmark UUID") + timeout := ocpCmd.PersistentFlags().Duration("timeout", 2*time.Hour, "Benchmark timeout") qps := ocpCmd.PersistentFlags().Int("qps", 20, "QPS") burst := ocpCmd.PersistentFlags().Int("burst", 20, "Burst") gc := ocpCmd.PersistentFlags().Bool("gc", true, "Garbage collect created namespaces") @@ -58,7 +60,7 @@ func openShiftCmd() *cobra.Command { "GC": fmt.Sprintf("%v", *gc), } discoveryAgent := discovery.NewDiscoveryAgent() - wh = workloads.NewWorkloadHelper(envVars, *alerting, OCPConfig, discoveryAgent) + wh = workloads.NewWorkloadHelper(envVars, *alerting, OCPConfig, discoveryAgent, *timeout) wh.Metadata.UUID = *uuid if *esServer != "" { err := wh.GatherMetadata() diff --git a/docs/cli.md b/docs/cli.md index 0ac60fe6b..9780db222 100644 --- a/docs/cli.md +++ b/docs/cli.md @@ -44,6 +44,7 @@ This option is meant to run Kube-burner benchmark, and it supports the these fla - password: Prometheus password for basic authentication. 
- skip-tls-verify: Skip TLS verification for prometheus. Default `true` - step: Prometheus step size. Default `30s` +- timeout: Kube-burner benchmark global timeout. When timing out, return code is 2. Default `2h` **Note**: Both basic authentication and Bearer authentication need credentials able to query the given Prometheus API. diff --git a/pkg/burner/job.go b/pkg/burner/job.go index 7e98315aa..1b49466bf 100644 --- a/pkg/burner/job.go +++ b/pkg/burner/job.go @@ -71,120 +71,132 @@ var ClientSet *kubernetes.Clientset var dynamicClient dynamic.Interface var restConfig *rest.Config -func Run(configSpec config.Spec, uuid string, p *prometheus.Prometheus, alertM *alerting.AlertManager) (int, error) { - var rc int +//nolint:gocyclo +func Run(configSpec config.Spec, uuid string, p *prometheus.Prometheus, alertM *alerting.AlertManager, timeout time.Duration) (int, error) { var err error + var rc int var measurementsWg sync.WaitGroup var indexer *indexers.Indexer + res := make(chan int, 1) log.Infof("🔥 Starting kube-burner (%s@%s) with UUID %s", version.Version, version.GitCommit, uuid) - if configSpec.GlobalConfig.IndexerConfig.Enabled { - indexer, err = indexers.NewIndexer(configSpec) - if err != nil { - return 1, err - } - } - measurements.NewMeasurementFactory(configSpec, uuid, indexer) - jobList := newExecutorList(configSpec, uuid) - // Iterate job list - for jobPosition, job := range jobList { - if job.Config.QPS == 0 || job.Config.Burst == 0 { - log.Infof("QPS or Burst rates not set, using default client-go values: %v %v", rest.DefaultQPS, rest.DefaultBurst) - } else { - log.Infof("QPS: %v", job.Config.QPS) - log.Infof("Burst: %v", job.Config.Burst) - } - ClientSet, restConfig, err = config.GetClientSet(job.Config.QPS, job.Config.Burst) - if err != nil { - log.Fatalf("Error creating clientSet: %s", err) - } - dynamicClient = dynamic.NewForConfigOrDie(restConfig) - if job.Config.PreLoadImages { - preLoadImages(job) - } - prometheusJob := prometheus.Job{ - Start: time.Now().UTC(), + go func(chan int) { + var innerRC int + if configSpec.GlobalConfig.IndexerConfig.Enabled { + indexer, err = indexers.NewIndexer(configSpec) + if err != nil { + log.Fatal(err) + } } - jobList[jobPosition].Start = time.Now().UTC() - log.Infof("Triggering job: %s", job.Config.Name) - measurements.SetJobConfig(&job.Config) - switch job.Config.JobType { - case config.CreationJob: - job.Cleanup() - measurements.Start(&measurementsWg) - measurementsWg.Wait() - job.RunCreateJob(1, job.Config.JobIterations) - // If object verification is enabled - if job.Config.VerifyObjects && !job.Verify() { - errMsg := "Object verification failed" - // If errorOnVerify is enabled. Set RC to 1 - if job.Config.ErrorOnVerify { - errMsg += ". 
Setting return code to 1" - rc = 1 - } - log.Error(errMsg) + measurements.NewMeasurementFactory(configSpec, uuid, indexer) + jobList := newExecutorList(configSpec, uuid) + // Iterate job list + for jobPosition, job := range jobList { + if job.Config.QPS == 0 || job.Config.Burst == 0 { + log.Infof("QPS or Burst rates not set, using default client-go values: %v %v", rest.DefaultQPS, rest.DefaultBurst) + } else { + log.Infof("QPS: %v", job.Config.QPS) + log.Infof("Burst: %v", job.Config.Burst) } - if job.Config.Churn { - job.RunCreateJobWithChurn() + ClientSet, restConfig, err = config.GetClientSet(job.Config.QPS, job.Config.Burst) + if err != nil { + log.Fatalf("Error creating clientSet: %s", err) } - // We stop and index measurements per job - if measurements.Stop() == 1 { - rc = 1 + dynamicClient = dynamic.NewForConfigOrDie(restConfig) + if job.Config.PreLoadImages { + preLoadImages(job) } - case config.DeletionJob: - job.RunDeleteJob() - case config.PatchJob: - job.RunPatchJob() - } - if job.Config.JobPause > 0 { - log.Infof("Pausing for %v before finishing job", job.Config.JobPause) - time.Sleep(job.Config.JobPause) - } - prometheusJob.End = time.Now().UTC() - elapsedTime := prometheusJob.End.Sub(prometheusJob.Start).Seconds() - // Don't append to Prometheus jobList when prometheus it's not initialized - if p != nil { - p.JobList = append(p.JobList, prometheusJob) - } - log.Infof("Job %s took %.2f seconds", job.Config.Name, elapsedTime) - } - if configSpec.GlobalConfig.IndexerConfig.Enabled { - for _, job := range jobList { - elapsedTime := job.End.Sub(job.Start).Seconds() - err := indexMetadataInfo(configSpec, indexer, uuid, elapsedTime, job.Config, job.Start) - if err != nil { - log.Errorf(err.Error()) + prometheusJob := prometheus.Job{ + Start: time.Now().UTC(), + } + jobList[jobPosition].Start = time.Now().UTC() + log.Infof("Triggering job: %s", job.Config.Name) + measurements.SetJobConfig(&job.Config) + switch job.Config.JobType { + case config.CreationJob: + job.Cleanup() + measurements.Start(&measurementsWg) + measurementsWg.Wait() + job.RunCreateJob(1, job.Config.JobIterations) + // If object verification is enabled + if job.Config.VerifyObjects && !job.Verify() { + errMsg := "Object verification failed" + // If errorOnVerify is enabled. Set RC to 1 + if job.Config.ErrorOnVerify { + errMsg += ". 
Setting return code to 1" + innerRC = 1 + } + log.Error(errMsg) + } + if job.Config.Churn { + job.RunCreateJobWithChurn() + } + // We stop and index measurements per job + if measurements.Stop() == 1 { + innerRC = 1 + } + case config.DeletionJob: + job.RunDeleteJob() + case config.PatchJob: + job.RunPatchJob() + } + if job.Config.JobPause > 0 { + log.Infof("Pausing for %v before finishing job", job.Config.JobPause) + time.Sleep(job.Config.JobPause) } + prometheusJob.End = time.Now().UTC() + elapsedTime := prometheusJob.End.Sub(prometheusJob.Start).Seconds() + // Don't append to Prometheus jobList when prometheus it's not initialized + if p != nil { + p.JobList = append(p.JobList, prometheusJob) + } + log.Infof("Job %s took %.2f seconds", job.Config.Name, elapsedTime) } - } - if p != nil { - log.Infof("Waiting %v extra before scraping prometheus", p.Step) - time.Sleep(p.Step) - // Update end time of last job - jobList[len(jobList)-1].End = time.Now().UTC() - // If alertManager is configured - if alertM != nil { - log.Infof("Evaluating alerts") - if alertM.Evaluate(jobList[0].Start, jobList[len(jobList)-1].End) == 1 { - rc = 1 + if configSpec.GlobalConfig.IndexerConfig.Enabled { + for _, job := range jobList { + elapsedTime := job.End.Sub(job.Start).Seconds() + err := indexMetadataInfo(configSpec, indexer, uuid, elapsedTime, job.Config, job.Start) + if err != nil { + log.Errorf(err.Error()) + } } } - // If prometheus is enabled query metrics from the start of the first job to the end of the last one - if len(p.MetricProfile) > 0 { - if err := p.ScrapeJobsMetrics(indexer); err != nil { - log.Error(err.Error()) + if p != nil { + log.Infof("Waiting %v extra before scraping prometheus", p.Step) + time.Sleep(p.Step) + // Update end time of last job + jobList[len(jobList)-1].End = time.Now().UTC() + // If alertManager is configured + if alertM != nil { + log.Infof("Evaluating alerts") + if alertM.Evaluate(jobList[0].Start, jobList[len(jobList)-1].End) == 1 { + innerRC = 1 + } } - if configSpec.GlobalConfig.WriteToFile && configSpec.GlobalConfig.CreateTarball { - err = prometheus.CreateTarball(configSpec.GlobalConfig.MetricsDirectory) - if err != nil { + // If prometheus is enabled query metrics from the start of the first job to the end of the last one + if len(p.MetricProfile) > 0 { + if err := p.ScrapeJobsMetrics(indexer); err != nil { log.Error(err.Error()) } + if configSpec.GlobalConfig.WriteToFile && configSpec.GlobalConfig.CreateTarball { + err = prometheus.CreateTarball(configSpec.GlobalConfig.MetricsDirectory) + if err != nil { + log.Error(err.Error()) + } + } } } - } - log.Infof("Finished execution with UUID: %s", uuid) - if configSpec.GlobalConfig.GC { - log.Info("Garbage collecting created namespaces") - CleanupNamespaces(v1.ListOptions{LabelSelector: fmt.Sprintf("kube-burner-uuid=%v", uuid)}) + log.Infof("Finished execution with UUID: %s", uuid) + if configSpec.GlobalConfig.GC { + log.Info("Garbage collecting created namespaces") + CleanupNamespaces(v1.ListOptions{LabelSelector: fmt.Sprintf("kube-burner-uuid=%v", uuid)}) + } + res <- innerRC + }(res) + select { + case rc = <-res: + case <-time.After(timeout): + log.Errorf("%v timeout reached", timeout) + rc = 2 } log.Info("👋 Exiting kube-burner") return rc, nil diff --git a/pkg/burner/waiters.go b/pkg/burner/waiters.go index 3fdb7d7f0..2c869d092 100644 --- a/pkg/burner/waiters.go +++ b/pkg/burner/waiters.go @@ -57,9 +57,7 @@ func (ex *Executor) waitForObjects(ns string) { go waitForDS(ns, ex.Config.MaxWaitTimeout, &wg) case "Pod": go 
waitForPod(ns, ex.Config.MaxWaitTimeout, &wg) - case "Build": - go waitForBuild(ns, ex.Config.MaxWaitTimeout, obj.replicas, &wg) - case "BuildConfig": + case "Build", "BuildConfig": go waitForBuild(ns, ex.Config.MaxWaitTimeout, obj.replicas, &wg) case "VirtualMachine": go waitForVM(ns, ex.Config.MaxWaitTimeout, &wg) diff --git a/pkg/indexers/factory.go b/pkg/indexers/factory.go index b7da200f8..71f6111c2 100644 --- a/pkg/indexers/factory.go +++ b/pkg/indexers/factory.go @@ -15,6 +15,8 @@ package indexers import ( + "fmt" + "github.com/cloud-bulldozer/kube-burner/log" "github.com/cloud-bulldozer/kube-burner/pkg/config" ) @@ -39,7 +41,7 @@ func NewIndexer(configSpec config.Spec) (*Indexer, error) { return &indexer, err } } else { - log.Fatalf("Indexer not found: %s", cfg.Type) + return &indexer, fmt.Errorf("Indexer not found: %s", cfg.Type) } return &indexer, nil } diff --git a/pkg/workloads/helpers.go b/pkg/workloads/helpers.go index ed1c116a0..dc0f7f4c6 100644 --- a/pkg/workloads/helpers.go +++ b/pkg/workloads/helpers.go @@ -45,6 +45,7 @@ type WorkloadHelper struct { envVars map[string]string prometheusURL string prometheusToken string + timeout time.Duration Metadata clusterMetadata alerting bool ocpConfig embed.FS @@ -72,12 +73,13 @@ type clusterMetadata struct { } // NewWorkloadHelper initializes workloadHelper -func NewWorkloadHelper(envVars map[string]string, alerting bool, ocpConfig embed.FS, da discovery.Agent) WorkloadHelper { +func NewWorkloadHelper(envVars map[string]string, alerting bool, ocpConfig embed.FS, da discovery.Agent, timeout time.Duration) WorkloadHelper { return WorkloadHelper{ envVars: envVars, alerting: alerting, ocpConfig: ocpConfig, discoveryAgent: da, + timeout: timeout, } } @@ -182,7 +184,7 @@ func (wh *WorkloadHelper) run(workload string) { log.Fatal(err) } } - rc, err = burner.Run(configSpec, wh.Metadata.UUID, p, alertM) + rc, err = burner.Run(configSpec, wh.Metadata.UUID, p, alertM, wh.timeout) if err != nil { log.Fatal(err) } diff --git a/test/run-ocp.sh b/test/run-ocp.sh index 9139bf7ef..a2fa57cce 100755 --- a/test/run-ocp.sh +++ b/test/run-ocp.sh @@ -1,8 +1,11 @@ #!/bin/bash -trap 'cleanup' ERR +trap 'die' ERR -cleanup() { +die() { + if [ -n $1 ]; then + echo $1 + fi oc delete ns -l kube-burner-uuid=${UUID} --ignore-not-found exit 1 } @@ -22,3 +25,7 @@ kube-burner ocp cluster-density --iterations=3 --churn-duration=5m ${COMMON_FLAG echo "Running node-density-cni wrapper" kube-burner ocp node-density-cni --pods-per-node=75 --gc=false --uuid=${UUID} oc delete ns -l kube-burner-uuid=${UUID} +rc=$(kube-burner ocp cluster-density --iterations=1 --churn-duration=5m ${COMMON_FLAGS} --timeout=1s) +if [ ${rc} != 2 ]; then + die "Kube-burner timed out but its exit code was ${rc} != 2" +fi
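
Usage sketch: the new --timeout flag is surfaced to callers through the process exit status, which the docs change above states is 2 when the benchmark times out. The lines below are an illustrative, minimal shell check assuming a cluster-density run similar to the one in test/run-ocp.sh; the workload and flag values are placeholders and omit the COMMON_FLAGS used there.

#!/bin/bash
# Trigger a run that cannot finish within the configured timeout.
kube-burner ocp cluster-density --iterations=1 --timeout=1s
rc=$?
# The timeout is reported via the exit status: 2 means the benchmark was cut short by --timeout.
if [ "${rc}" -eq 2 ]; then
  echo "kube-burner timed out as expected (exit code ${rc})"
else
  echo "unexpected exit code: ${rc}"
fi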