From 8334f2abd5ce00bc219cb980dd789e6d4934c085 Mon Sep 17 00:00:00 2001
From: Raul Sevilla
Date: Tue, 15 Dec 2020 13:05:13 +0100
Subject: [PATCH] Latency thresholds feature

Signed-off-by: Raul Sevilla
---
 README.md                       |  4 +--
 cmd/kube-burner.go              |  5 ++-
 docs/index.md                   |  4 +--
 docs/measurements.md            | 44 +++++++++++++++++++++--
 pkg/burner/create.go            |  1 -
 pkg/burner/delete.go            | 13 +++---
 pkg/config/config.go            | 11 ++++++
 pkg/config/types.go             | 21 +++++++++--
 pkg/config/validations.go       | 47 +++++++++++++++++++++++++
 pkg/measurements/factory.go     | 12 +++++--
 pkg/measurements/pod_latency.go | 62 +++++++++++++++++++++++----------
 pkg/measurements/pprof.go       |  4 +--
 12 files changed, 186 insertions(+), 42 deletions(-)
 create mode 100644 pkg/config/validations.go

diff --git a/README.md b/README.md
index 835a80b2f..36998a5df 100644
--- a/README.md
+++ b/README.md
@@ -6,7 +6,7 @@
 
 # What is Kube-burner?
 
-Kube-burner is a tool aimed at stressing kubernetes clusters. The main functionallity it provides can be summareized these three steps:
+Kube-burner is a tool aimed at stressing Kubernetes clusters. The main functionality it provides can be summarized in these three steps:
 
 - Create/delete the objects declared in the jobs.
 - Collect desired on-cluster prometheus metrics.
@@ -20,7 +20,7 @@ Documentation is available at https://kube-burner.readthedocs.io/
 
 ## Downloading Kube-burner
 
-In case you want to start tinkering with `kube-burner` now:
+In case you want to start tinkering with Kube-burner now:
 
 - You can find the binaries in the [releases section of the repository](https://github.com/cloud-bulldozer/kube-burner/releases).
 - There's also a container image available at [quay](https://quay.io/repository/cloud-bulldozer/kube-burner?tab=tags).
diff --git a/cmd/kube-burner.go b/cmd/kube-burner.go
index d0bf33ffc..007c6526d 100644
--- a/cmd/kube-burner.go
+++ b/cmd/kube-burner.go
@@ -274,6 +274,7 @@ func steps(uuid string, p *prometheus.Prometheus, alertM *alerting.AlertManager)
 			measurements.NewMeasurementFactory(burner.RestConfig, job.Config, uuid, indexer)
 			measurements.Register()
 			measurements.Start()
+			job.Start = time.Now().UTC()
 			job.RunCreateJob()
 			if job.Config.VerifyObjects {
 				verification = job.Verify()
@@ -283,15 +284,17 @@ func steps(uuid string, p *prometheus.Prometheus, alertM *alerting.AlertManager)
 				}
 			}
 			// We stop and index measurements per job
-			measurements.Stop()
+			rc = measurements.Stop()
 			// Verification failed
 			if job.Config.VerifyObjects && !verification {
 				log.Error("Object verification failed")
 				rc = 1
 			}
 		case config.DeletionJob:
+			job.Start = time.Now().UTC()
 			job.RunDeleteJob()
 		}
+		job.End = time.Now().UTC()
 		elapsedTime := job.End.Sub(job.Start).Seconds()
 		log.Infof("Job %s took %.2f seconds", job.Config.Name, elapsedTime)
 		if config.ConfigSpec.GlobalConfig.IndexerConfig.Enabled {
diff --git a/docs/index.md b/docs/index.md
index 9ba7ff98f..06ca4653e 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -7,7 +7,7 @@
 
 # What's this?
 
-Kube-burner is a tool aimed at stressing kubernetes clusters. The main functionallity it provides can be summareized these three steps:
+Kube-burner is a tool aimed at stressing Kubernetes clusters. The main functionality it provides can be summarized in these three steps:
 
 - Create/delete the objects declared in the jobs.
 - Collect desired on-cluster prometheus metrics.
@@ -18,7 +18,7 @@ Kube-burner is a tool aimed at stressing kubernetes clusters. The main functiona
 
 # Downloading Kube-burner
 
-In case you want to start tinkering with `kube-burner` now:
+In case you want to start tinkering with Kube-burner now:
 
 - You can find the binaries in the [releases section of the repository](https://github.com/cloud-bulldozer/kube-burner/releases).
 - There's also a container image available at [quay](https://quay.io/repository/cloud-bulldozer/kube-burner?tab=tags).
diff --git a/docs/measurements.md b/docs/measurements.md
index 9a392634a..0b38e966c 100644
--- a/docs/measurements.md
+++ b/docs/measurements.md
@@ -33,11 +33,13 @@ Pod latency sample:
 }
 ```
 
+---
+
 Pod latency quantile sample:
 
 ```json
 {
-  "quantileName": "podReady",
+  "quantileName": "Ready",
   "uuid": "23c0b5fd-c17e-4326-a389-b3aebc774c82",
   "P99": 3774,
   "P95": 3510,
@@ -49,7 +51,7 @@ Pod latency quantile sample:
   "jobName": "kubelet-density"
 },
 {
-  "quantileName": "scheduling",
+  "quantileName": "PodScheduled",
   "uuid": "23c0b5fd-c17e-4326-a389-b3aebc774c82",
   "P99": 64,
   "P95": 8,
@@ -62,10 +64,48 @@ Pod latency quantile sample:
 }
 ```
 
+Where quantileName matches a pod condition and can be one of:
+- PodScheduled: the pod has been scheduled to a node.
+- ContainersReady: all containers in the pod are ready.
+- Initialized: all init containers in the pod have started successfully.
+- Ready: the pod is able to service requests and should be added to the load balancing pools of all matching services.
+
+And the metrics are:
+- P99: 99th percentile of the pod condition.
+- P95: 95th percentile of the pod condition.
+- P50: 50th percentile of the pod condition.
+- Max: maximum value of the condition.
+- Avg: average value of the condition.
+
 More information about the pod lifecycle can be found in the [kubernetes docs](https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/).
 
 **Note**: The __esIndex__ option can be used to configure the ES index where metrics will be indexed.
 
+### Pod latency thresholds
+
+It's possible to establish pod latency thresholds on the different pod conditions and metrics through the option `thresholds` of the podLatency measurement.
+
+For example, the configuration below establishes a threshold of 2000ms on the P99 metric of the `Ready` condition.
+
+```yaml
+  measurements:
+  - name: podLatency
+    esIndex: kube-burner-podlatency
+    thresholds:
+    - conditionType: Ready
+      metric: P99
+      threshold: 2000ms
+```
+
+Latency thresholds are evaluated at the end of each job, showing an informative message like the following:
+
+```
+INFO[2020-12-15 12:37:08] Evaluating latency thresholds
+WARN[2020-12-15 12:37:08] P99 Ready latency (2929ms) higher than configured threshold: 2000ms
+```
+
+**If any of the configured thresholds is not met, as in the example above, Kube-burner's return code will be 1.**
+
 ## Pprof collection
 
 This measurement takes care of collecting Go profiling information from pods. To do so, kube-burner connects to pods with the given labels running in certain namespaces. This measurement uses an implementation similar to `kubectl exec`, and as soon as it connects to one pod it executes the command `curl ` to get the pprof data. Pprof files are collected on a regular basis, given by the parameter `pprofInterval`, and stored in the directory configured by the parameter `pprofDirectory`, which by default is `pprof`.
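
To make the pprof parameters above concrete, here is a configuration sketch for this measurement. It is illustrative only: apart from `name`, the per-target fields (`namespace`, `labelSelector` and `url`) are assumptions inferred from the description above rather than taken verbatim from this patch.

```yaml
  measurements:
  - name: pprof
    pprofInterval: 5m      # how often pprof data is collected
    pprofDirectory: pprof  # where the collected files are stored
    pprofTargets:
    - name: kube-apiserver-heap
      namespace: openshift-kube-apiserver             # assumed: namespace the target pods run in
      labelSelector: {app: openshift-kube-apiserver}  # assumed: labels used to select the pods
      url: http://localhost:6060/debug/pprof/heap     # assumed: URL fetched with curl inside the pod
```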
diff --git a/pkg/burner/create.go b/pkg/burner/create.go
index be9a7cadc..9f290e5ab 100644
--- a/pkg/burner/create.go
+++ b/pkg/burner/create.go
@@ -154,7 +154,6 @@ func (ex *Executor) RunCreateJob() {
 			}
 		}
 	}
-	ex.End = time.Now().UTC()
 }
 
 func (ex *Executor) replicaHandler(objectIndex int, obj object, ns string, iteration int, wg *sync.WaitGroup) {
diff --git a/pkg/burner/delete.go b/pkg/burner/delete.go
index 5bc32e642..d49ef6b4e 100644
--- a/pkg/burner/delete.go
+++ b/pkg/burner/delete.go
@@ -56,11 +56,9 @@ func setupDeleteJob(jobConfig config.Job) Executor {
 // RunDeleteJob executes a deletion job
 func (ex *Executor) RunDeleteJob() {
 	var wg sync.WaitGroup
-	var err error
 	var resp *unstructured.UnstructuredList
 	log.Infof("Triggering job: %s", ex.Config.Name)
-	ex.Start = time.Now().UTC()
-	RestConfig, err = config.GetRestConfig(ex.Config.QPS, ex.Config.Burst)
+	RestConfig, err := config.GetRestConfig(ex.Config.QPS, ex.Config.Burst)
 	if err != nil {
 		log.Fatalf("Error creating restConfig for kube-burner: %s", err)
 	}
@@ -92,16 +90,16 @@ func (ex *Executor) RunDeleteJob() {
 			ex.limiter.Wait(context.TODO())
 			err := dynamicClient.Resource(obj.gvr).Namespace(item.GetNamespace()).Delete(context.TODO(), item.GetName(), metav1.DeleteOptions{})
 			if err != nil {
-				log.Errorf("Error found removing %s %s from ns %s: %s", item.GetKind(), item.GetName(), item.GetNamespace(), err)
+				log.Errorf("Error found removing %s %s: %s", item.GetKind(), item.GetName(), err)
 			} else {
-				log.Infof("Removing %s %s from ns %s", item.GetKind(), item.GetName(), item.GetNamespace())
+				log.Infof("Removing %s %s", item.GetKind(), item.GetName())
 			}
 		}(item)
 	}
 	if ex.Config.WaitForDeletion {
 		wg.Wait()
-		wait.PollImmediateInfinite(1*time.Second, func() (bool, error) {
-			resp, err := dynamicClient.Resource(obj.gvr).List(context.TODO(), listOptions)
+		wait.PollImmediateInfinite(2*time.Second, func() (bool, error) {
+			resp, err = dynamicClient.Resource(obj.gvr).List(context.TODO(), listOptions)
 			if err != nil {
 				return false, err
 			}
@@ -113,5 +111,4 @@ func (ex *Executor) RunDeleteJob() {
 			})
 		}
 	}
-	ex.End = time.Now().UTC()
 }
diff --git a/pkg/config/config.go b/pkg/config/config.go
index 6fd1acb4f..fecc73dc3 100644
--- a/pkg/config/config.go
+++ b/pkg/config/config.go
@@ -87,6 +87,17 @@ func Parse(c string, jobsRequired bool) error {
 		if err := validateDNS1123(); err != nil {
 			return err
 		}
+		for _, m := range ConfigSpec.GlobalConfig.Measurements {
+			switch m.Name {
+			case string(podLatency):
+				err = validatePodLatencyCfg(m)
+			case string(pprof):
+				err = validatePprofCfg(m)
+			}
+			if err != nil {
+				log.Fatalf("Config validation error: %s", err)
+			}
+		}
 	}
 	return nil
 }
diff --git a/pkg/config/types.go b/pkg/config/types.go
index f4f86a4a6..761cbdb6a 100644
--- a/pkg/config/types.go
+++ b/pkg/config/types.go
@@ -14,16 +14,25 @@
 
 package config
 
-import "time"
+import (
+	"time"
+
+	v1 "k8s.io/api/core/v1"
+)
 
 // JobType type of job
 type JobType string
 
+// MeasurementType type of measurement
+type MeasurementType string
+
 const (
 	// CreationJob used to create objects
 	CreationJob JobType = "create"
 	// DeletionJob used to delete objects
-	DeletionJob JobType = "delete"
+	DeletionJob JobType         = "delete"
+	podLatency  MeasurementType = "podLatency"
+	pprof       MeasurementType = "pprof"
 )
 
 // Spec configuration root
@@ -49,6 +58,12 @@ type IndexerConfig struct {
 	Enabled bool `yaml:"enabled"`
 }
 
+type LatencyThreshold struct {
+	ConditionType v1.PodConditionType `yaml:"conditionType"`
+	Metric        string              `yaml:"metric"`
+	Threshold     time.Duration       `yaml:"threshold"`
+}
+
 // PProftarget pprof targets to collect
 type PProftarget struct {
 	// Name pprof target name
@@ -70,6 +85,8 @@ type Measurement struct {
 	// ESIndex contains the ElasticSearch index used to
 	// index the metrics
 	ESIndex string `yaml:"esIndex"`
+	// LatencyThresholds config
+	LatencyThresholds []LatencyThreshold `yaml:"thresholds"`
 	// PPRofTargets targets config
 	PProfTargets []PProftarget `yaml:"pprofTargets"`
 	// PPRofInterval pprof collect interval
diff --git a/pkg/config/validations.go b/pkg/config/validations.go
new file mode 100644
index 000000000..fd3d96b66
--- /dev/null
+++ b/pkg/config/validations.go
@@ -0,0 +1,47 @@
+// Copyright 2020 The Kube-burner Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package config
+
+import (
+	"fmt"
+	"strings"
+
+	v1 "k8s.io/api/core/v1"
+)
+
+func validatePodLatencyCfg(cfg Measurement) error {
+	latencyMetrics := []string{"P99", "P95", "P50", "Avg", "Max"}
+	for _, th := range cfg.LatencyThresholds {
+		var metricFound bool // reset for each threshold entry
+		if th.ConditionType == v1.ContainersReady || th.ConditionType == v1.PodInitialized || th.ConditionType == v1.PodReady || th.ConditionType == v1.PodScheduled {
+			for _, lm := range latencyMetrics {
+				if th.Metric == lm {
+					metricFound = true
+					break
+				}
+			}
+			if !metricFound {
+				return fmt.Errorf("Unsupported metric %s in podLatency measurement, supported are: %s", th.Metric, strings.Join(latencyMetrics, ", "))
+			}
+		} else {
+			return fmt.Errorf("Unsupported pod condition type in podLatency measurement: %s", th.ConditionType)
+		}
+	}
+	return nil
+}
+
+func validatePprofCfg(cfg Measurement) error {
+	return nil
+}
diff --git a/pkg/measurements/factory.go b/pkg/measurements/factory.go
index 36adf5c74..8aee3a4d8 100644
--- a/pkg/measurements/factory.go
+++ b/pkg/measurements/factory.go
@@ -34,7 +34,7 @@ type measurementFactory struct {
 
 type measurement interface {
 	start()
-	stop() error
+	stop() (int, error)
 	setConfig(config.Measurement)
 }
 
@@ -85,11 +85,17 @@ func Start() {
 }
 
 // Stop stops registered measurements
-func Stop() {
+func Stop() int {
+	var err error
+	var r, rc int
 	for name, measurement := range factory.createFuncs {
 		log.Infof("Stopping measurement: %s", name)
-		if err := measurement.stop(); err != nil {
+		if r, err = measurement.stop(); err != nil {
 			log.Errorf("Error stopping measurement %s: %s", name, err)
 		}
+		if r != 0 {
+			rc = r
+		}
 	}
+	return rc
 }
diff --git a/pkg/measurements/pod_latency.go b/pkg/measurements/pod_latency.go
index af3791329..da8a7ecce 100644
--- a/pkg/measurements/pod_latency.go
+++ b/pkg/measurements/pod_latency.go
@@ -20,6 +20,7 @@ import (
 	"math"
 	"os"
 	"path"
+	"reflect"
 	"sort"
 	"strings"
 	"time"
@@ -49,16 +50,16 @@ type podMetric struct {
 }
 
 type podLatencyQuantiles struct {
-	QuantileName string    `json:"quantileName"`
-	UUID         string    `json:"uuid"`
-	P99          int       `json:"P99"`
-	P95          int       `json:"P95"`
-	P50          int       `json:"P50"`
-	Max          int       `json:"max"`
-	Avg          float64   `json:"avg"`
-	Timestamp    time.Time `json:"timestamp"`
-	MetricName   string    `json:"metricName"`
-	JobName      string    `json:"jobName"`
+	QuantileName v1.PodConditionType `json:"quantileName"`
+	UUID         string              `json:"uuid"`
+	P99          int                 `json:"P99"`
+	P95          int                 `json:"P95"`
+	P50          int                 `json:"P50"`
+	Max          int                 `json:"max"`
+	Avg          int                 `json:"avg"`
+	Timestamp    time.Time           `json:"timestamp"`
+	MetricName   string              `json:"metricName"`
+	JobName      string              `json:"jobName"`
 }
 
 var podQuantiles []interface{}
@@ -190,9 +191,10 @@ func (p *podLatency) startAndSync() error {
 }
 
 // Stop stops podLatency measurement
-func (p *podLatency) stop() error {
+func (p *podLatency) stop() (int, error) {
 	normalizeMetrics()
 	calcQuantiles()
+	rc := p.checkThreshold()
 	defer close(p.stopChannel)
 	timeoutCh := make(chan struct{})
 	timeoutTimer := time.AfterFunc(informerTimeout, func() {
@@ -200,7 +202,7 @@ func (p *podLatency) stop() error {
 	})
 	defer timeoutTimer.Stop()
 	if !cache.WaitForCacheSync(timeoutCh, p.informer.HasSynced) {
-		return fmt.Errorf("Pod-latency: Timed out waiting for caches to sync")
+		return rc, fmt.Errorf("Pod-latency: Timed out waiting for caches to sync")
 	}
 	if factory.globalConfig.WriteToFile {
 		if err := p.writeToFile(); err != nil {
@@ -210,7 +212,7 @@ func (p *podLatency) stop() error {
 	if factory.globalConfig.IndexerConfig.Enabled {
 		p.index()
 	}
-	return nil
+	return rc, nil
 }
 
 // index sends metrics to the configured indexer
@@ -235,12 +237,12 @@ func normalizeMetrics() {
 
 func calcQuantiles() {
 	quantiles := []float64{0.5, 0.95, 0.99}
-	quantileMap := map[string][]int{}
+	quantileMap := map[v1.PodConditionType][]int{}
 	for _, normLatency := range normLatencies {
-		quantileMap["scheduling"] = append(quantileMap["scheduling"], normLatency.(podMetric).SchedulingLatency)
-		quantileMap["containersReady"] = append(quantileMap["containersReady"], normLatency.(podMetric).ContainersReadyLatency)
-		quantileMap["initialized"] = append(quantileMap["initialized"], normLatency.(podMetric).InitializedLatency)
-		quantileMap["podReady"] = append(quantileMap["podReady"], normLatency.(podMetric).PodReadyLatency)
+		quantileMap[v1.PodScheduled] = append(quantileMap[v1.PodScheduled], normLatency.(podMetric).SchedulingLatency)
+		quantileMap[v1.ContainersReady] = append(quantileMap[v1.ContainersReady], normLatency.(podMetric).ContainersReadyLatency)
+		quantileMap[v1.PodInitialized] = append(quantileMap[v1.PodInitialized], normLatency.(podMetric).InitializedLatency)
+		quantileMap[v1.PodReady] = append(quantileMap[v1.PodReady], normLatency.(podMetric).PodReadyLatency)
 	}
 	for quantileName, v := range quantileMap {
 		podQ := podLatencyQuantiles{
@@ -263,7 +265,7 @@ func calcQuantiles() {
 		for _, n := range v {
 			sum += n
 		}
-		podQ.Avg = math.Round(100*float64(sum)/float64(length)) / 100
+		podQ.Avg = int(math.Round(float64(sum) / float64(length)))
 		podQuantiles = append(podQuantiles, podQ)
 	}
 }
@@ -278,3 +280,25 @@ func (plq *podLatencyQuantiles) setQuantile(quantile float64, qValue int) {
 		plq.P99 = qValue
 	}
 }
+
+func (p *podLatency) checkThreshold() int {
+	var rc int
+	log.Info("Evaluating latency thresholds")
+	for _, phase := range p.config.LatencyThresholds {
+		for _, pq := range podQuantiles {
+			if phase.ConditionType == pq.(podLatencyQuantiles).QuantileName {
+				// Required to access the field by name
+				r := reflect.ValueOf(pq.(podLatencyQuantiles))
+				v := r.FieldByName(phase.Metric).Int()
+				if v > phase.Threshold.Milliseconds() {
+					log.Warnf("%s %s latency (%dms) higher than configured threshold: %v", phase.Metric, phase.ConditionType, v, phase.Threshold)
+					rc = 1
+				} else {
+					log.Infof("%s %s latency (%dms) meets the configured threshold: %v", phase.Metric, phase.ConditionType, v, phase.Threshold)
+				}
+				break
+			}
+		}
+	}
+	return rc
+}
diff --git a/pkg/measurements/pprof.go b/pkg/measurements/pprof.go
index 811ff13f2..2c9b68d17 100644
--- a/pkg/measurements/pprof.go
+++ b/pkg/measurements/pprof.go
@@ -135,7 +135,7 @@ func (p *pprof) getPProf() {
 	wg.Wait()
 }
 
-func (p *pprof) stop() error {
+func (p *pprof) stop() (int, error) {
 	p.stopChannel <- true
-	return nil
+	return 0, nil
 }
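
A note on the reflection trick in `checkThreshold` above: since the metric name (`P99`, `Avg`, ...) arrives as a string from the YAML config, the matching struct field is looked up by name at runtime with `reflect.ValueOf(...).FieldByName(...)`. Below is a minimal standalone sketch of the same technique, using a hypothetical `quantiles` type that mirrors `podLatencyQuantiles`; the values are illustrative.

```go
package main

import (
	"fmt"
	"reflect"
)

// quantiles mirrors podLatencyQuantiles: one exported int field per supported metric.
type quantiles struct {
	P99 int
	P95 int
	P50 int
	Max int
	Avg int
}

func main() {
	q := quantiles{P99: 2929, P95: 2510, P50: 2086, Max: 3342, Avg: 2078}
	// The metric name comes from user configuration, so the field has to be
	// resolved by name at runtime. FieldByName only works on exported fields;
	// in the patch, validatePodLatencyCfg guarantees the name is one of the
	// supported metrics before this code path is reached.
	metric, thresholdMs := "P99", int64(2000)
	v := reflect.ValueOf(q).FieldByName(metric).Int()
	if v > thresholdMs {
		fmt.Printf("%s latency (%dms) higher than configured threshold: %dms\n", metric, v, thresholdMs)
	}
}
```

This is why the quantile struct keeps its metric fields exported and typed uniformly as `int`: `FieldByName(...).Int()` would panic on an unexported or non-integer field, which is also the reason the patch changes `Avg` from `float64` to `int`.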