From 258d22306121eaa21430b8dc8a56ea5fe9146800 Mon Sep 17 00:00:00 2001
From: vankichi
Date: Fri, 8 Mar 2024 10:08:24 +0900
Subject: [PATCH] :chart_with_upwards_trend: add bench job status

Signed-off-by: vankichi
---
 internal/config/benchmark.go                  |   2 +-
 .../metrics/tools/benchmark/benchmark.go      | 195 ++++++++++++++++++
 .../10-vald-benchmark-operator.yaml           |   2 +-
 .../benchmark/operator/service/operator.go    |  36 ++++
 .../benchmark/operator/usecase/benchmarkd.go  |   2 +
 5 files changed, 235 insertions(+), 2 deletions(-)
 create mode 100644 internal/observability/metrics/tools/benchmark/benchmark.go

diff --git a/internal/config/benchmark.go b/internal/config/benchmark.go
index cb7039dfb06..fd45f4f2933 100644
--- a/internal/config/benchmark.go
+++ b/internal/config/benchmark.go
@@ -234,7 +234,7 @@ func (b *BenchmarkScenario) Bind() *BenchmarkScenario {
 
 // BenchmarkJobImageInfo represents the docker image information for benchmark job.
 type BenchmarkJobImageInfo struct {
-	Image      string `info:"image" json:"image,omitempty" yaml:"image"`
+	Image      string `info:"image"       json:"image,omitempty"       yaml:"image"`
 	PullPolicy string `info:"pull_policy" json:"pull_policy,omitempty" yaml:"pull_policy"`
 }
 
diff --git a/internal/observability/metrics/tools/benchmark/benchmark.go b/internal/observability/metrics/tools/benchmark/benchmark.go
new file mode 100644
index 00000000000..ff64e8ddc56
--- /dev/null
+++ b/internal/observability/metrics/tools/benchmark/benchmark.go
@@ -0,0 +1,195 @@
+// Copyright (C) 2019-2024 vdaas.org vald team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// You may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package benchmark provides the OpenTelemetry metrics of the benchmark operator.
+package benchmark
+
+import (
+	"context"
+
+	v1 "github.com/vdaas/vald/internal/k8s/vald/benchmark/api/v1"
+	"github.com/vdaas/vald/internal/observability/metrics"
+	"github.com/vdaas/vald/pkg/tools/benchmark/operator/service"
+	api "go.opentelemetry.io/otel/metric"
+	view "go.opentelemetry.io/otel/sdk/metric"
+)
+
+const (
+	appliedScenarioCount            = "benchmark_operator_applied_scenario"
+	appliedScenarioCountDescription = "Benchmark Operator applied scenario count"
+
+	runningScenarioCount            = "benchmark_operator_running_scenario"
+	runningScenarioCountDescription = "Benchmark Operator running scenario count"
+
+	completeScenarioCount            = "benchmark_operator_complete_scenario"
+	completeScenarioCountDescription = "Benchmark Operator complete scenario count"
+
+	appliedBenchmarkJobCount            = "benchmark_operator_applied_benchmark_job"
+	appliedBenchmarkJobCountDescription = "Benchmark Operator applied benchmark job count"
+
+	runningBenchmarkJobCount            = "benchmark_operator_running_benchmark_job"
+	runningBenchmarkJobCountDescription = "Benchmark Operator running benchmark job count"
+
+	completeBenchmarkJobCount            = "benchmark_operator_complete_benchmark_job"
+	completeBenchmarkJobCountDescription = "Benchmark Operator complete benchmark job count"
+)
+
+// Keys of the per-state tallies built each time the operator is observed.
+const (
+	applied  = "applied"
+	running  = "running"
+	complete = "complete"
+)
+
+// operatorMetrics reports the scenario and benchmark job counts of an Operator.
+type operatorMetrics struct {
+	op service.Operator
+}
+
+// New returns a metrics.Metric that observes the given benchmark operator.
+func New(om service.Operator) metrics.Metric {
+	return &operatorMetrics{
+		op: om,
+	}
+}
+
+// lastValueView builds a view that reports only the most recent value
+// observed for the instrument matching name and description.
+func lastValueView(name, description string) metrics.View {
+	return view.NewView(
+		view.Instrument{
+			Name:        name,
+			Description: description,
+		},
+		view.Stream{
+			Aggregation: view.AggregationLastValue{},
+		},
+	)
+}
+
+// View returns one last-value view per scenario and benchmark job metric.
+func (om *operatorMetrics) View() ([]metrics.View, error) {
+	return []metrics.View{
+		lastValueView(appliedScenarioCount, appliedScenarioCountDescription),
+		lastValueView(runningScenarioCount, runningScenarioCountDescription),
+		lastValueView(completeScenarioCount, completeScenarioCountDescription),
+		lastValueView(appliedBenchmarkJobCount, appliedBenchmarkJobCountDescription),
+		lastValueView(runningBenchmarkJobCount, runningBenchmarkJobCountDescription),
+		lastValueView(completeBenchmarkJobCount, completeBenchmarkJobCountDescription),
+	}, nil
+}
+
+// Register creates the scenario and benchmark job instruments on m and
+// registers a callback that reports the operator's current counts on every
+// collection. The counts can go down as well as up, so they are exposed as
+// gauges rather than monotonic counters.
+func (om *operatorMetrics) Register(m metrics.Meter) error {
+	appliedScCount, err := m.Int64ObservableGauge(
+		appliedScenarioCount,
+		metrics.WithDescription(appliedScenarioCountDescription),
+		metrics.WithUnit(metrics.Dimensionless),
+	)
+	if err != nil {
+		return err
+	}
+	runningScCount, err := m.Int64ObservableGauge(
+		runningScenarioCount,
+		metrics.WithDescription(runningScenarioCountDescription),
+		metrics.WithUnit(metrics.Dimensionless),
+	)
+	if err != nil {
+		return err
+	}
+	completeScCount, err := m.Int64ObservableGauge(
+		completeScenarioCount,
+		metrics.WithDescription(completeScenarioCountDescription),
+		metrics.WithUnit(metrics.Dimensionless),
+	)
+	if err != nil {
+		return err
+	}
+
+	appliedBjCount, err := m.Int64ObservableGauge(
+		appliedBenchmarkJobCount,
+		metrics.WithDescription(appliedBenchmarkJobCountDescription),
+		metrics.WithUnit(metrics.Dimensionless),
+	)
+	if err != nil {
+		return err
+	}
+	runningBjCount, err := m.Int64ObservableGauge(
+		runningBenchmarkJobCount,
+		metrics.WithDescription(runningBenchmarkJobCountDescription),
+		metrics.WithUnit(metrics.Dimensionless),
+	)
+	if err != nil {
+		return err
+	}
+	completeBjCount, err := m.Int64ObservableGauge(
+		completeBenchmarkJobCount,
+		metrics.WithDescription(completeBenchmarkJobCountDescription),
+		metrics.WithUnit(metrics.Dimensionless),
+	)
+	if err != nil {
+		return err
+	}
+
+	_, err = m.RegisterCallback(
+		func(_ context.Context, o api.Observer) error {
+			// scenario status: every status other than Completed counts as running.
+			sst := map[string]int64{
+				applied:  0,
+				running:  0,
+				complete: 0,
+			}
+			for k, v := range om.op.LenBenchSC() {
+				sst[applied] += v
+				if k == v1.BenchmarkScenarioCompleted {
+					sst[complete] += v
+				} else {
+					sst[running] += v
+				}
+			}
+			o.ObserveInt64(appliedScCount, sst[applied])
+			o.ObserveInt64(runningScCount, sst[running])
+			o.ObserveInt64(completeScCount, sst[complete])
+
+			// benchmark job status: tallied into its own map, same rule as above.
+			bst := map[string]int64{
+				applied:  0,
+				running:  0,
+				complete: 0,
+			}
+			for k, v := range om.op.LenBenchBJ() {
+				bst[applied] += v
+				if k == v1.BenchmarkJobCompleted {
+					bst[complete] += v
+				} else {
+					bst[running] += v
+				}
+			}
+			o.ObserveInt64(appliedBjCount, bst[applied])
+			o.ObserveInt64(runningBjCount, bst[running])
+			o.ObserveInt64(completeBjCount, bst[complete])
+			return nil
+		},
+		appliedScCount,
+		runningScCount,
+		completeScCount,
+		appliedBjCount,
+		runningBjCount,
+		completeBjCount,
+	)
+	return err
+}
diff --git a/k8s/metrics/grafana/dashboards/10-vald-benchmark-operator.yaml b/k8s/metrics/grafana/dashboards/10-vald-benchmark-operator.yaml
index 8a2f64bbde9..5747988b61c 100644
--- a/k8s/metrics/grafana/dashboards/10-vald-benchmark-operator.yaml
+++ b/k8s/metrics/grafana/dashboards/10-vald-benchmark-operator.yaml
@@ -1574,7 +1574,7 @@ data:
       },
       "timezone": "",
       "title": "Vald Benchmark Operator",
-      "uid": "JkemcMB",
+      "uid": "fdewjfx1jkxz4b",
       "version": 1,
       "weekStart": ""
     }
diff --git a/pkg/tools/benchmark/operator/service/operator.go b/pkg/tools/benchmark/operator/service/operator.go
index 9ddf86a82a3..5100ec52f53 100644
--- a/pkg/tools/benchmark/operator/service/operator.go
+++ b/pkg/tools/benchmark/operator/service/operator.go
@@ -38,6 +38,8 @@ import (
 type Operator interface {
 	PreStart(context.Context) error
 	Start(context.Context) (<-chan error, error)
+	LenBenchSC() map[v1.ValdBenchmarkScenarioStatus]int64
+	LenBenchBJ() map[v1.BenchmarkJobStatus]int64
 }
 
 type scenario struct {
@@ -640,6 +642,40 @@ func (o *operator) checkAtomics() error {
 	return nil
 }
 
+// LenBenchSC counts the scenarios returned by getAtomicScenario, grouped by
+// status. The statuses listed below are always present in the result, with a
+// zero count when no scenario currently has that status.
+func (o *operator) LenBenchSC() map[v1.ValdBenchmarkScenarioStatus]int64 {
+	m := map[v1.ValdBenchmarkScenarioStatus]int64{
+		v1.BenchmarkScenarioAvailable: 0,
+		v1.BenchmarkScenarioHealthy:   0,
+		v1.BenchmarkScenarioNotReady:  0,
+		v1.BenchmarkScenarioCompleted: 0,
+	}
+	// Ranging over a nil collection is a no-op, so no nil guard is needed.
+	for _, s := range o.getAtomicScenario() {
+		m[s.Crd.Status]++
+	}
+	return m
+}
+
+// LenBenchBJ counts the benchmark jobs returned by getAtomicBenchJob, grouped
+// by status. The statuses listed below are always present in the result, with
+// a zero count when no benchmark job currently has that status.
+func (o *operator) LenBenchBJ() map[v1.BenchmarkJobStatus]int64 {
+	m := map[v1.BenchmarkJobStatus]int64{
+		v1.BenchmarkJobAvailable: 0,
+		v1.BenchmarkJobHealthy:   0,
+		v1.BenchmarkJobNotReady:  0,
+		v1.BenchmarkJobCompleted: 0,
+	}
+	// Ranging over a nil collection is a no-op, so no nil guard is needed.
+	for _, bj := range o.getAtomicBenchJob() {
+		m[bj.Status]++
+	}
+	return m
+}
+
 func (*operator) PreStart(context.Context) error {
 	log.Infof("[benchmark scenario operator] start vald benchmark scenario operator")
 	return nil
diff --git a/pkg/tools/benchmark/operator/usecase/benchmarkd.go b/pkg/tools/benchmark/operator/usecase/benchmarkd.go
index 134226967ac..5f4343f615d 100644
--- a/pkg/tools/benchmark/operator/usecase/benchmarkd.go
+++ b/pkg/tools/benchmark/operator/usecase/benchmarkd.go
@@ -29,6 +29,7 @@ import (
 	"github.com/vdaas/vald/internal/observability"
 	backoffmetrics "github.com/vdaas/vald/internal/observability/metrics/backoff"
 	infometrics "github.com/vdaas/vald/internal/observability/metrics/info"
+	benchmarkmetrics "github.com/vdaas/vald/internal/observability/metrics/tools/benchmark"
 	"github.com/vdaas/vald/internal/runner"
 	"github.com/vdaas/vald/internal/safety"
 	"github.com/vdaas/vald/internal/servers/server"
@@ -96,6 +97,7 @@ func New(cfg *config.Config) (r runner.Runner, err error) {
 	if cfg.Observability.Enabled {
 		obs, err = observability.NewWithConfig(
 			cfg.Observability,
+			benchmarkmetrics.New(operator),
 			infometrics.New("benchmark_operator_info", "Benchmark Operator info", *cfg.JobImage),
 			backoffmetrics.New(),
 		)