Skip to content

Commit

Permalink
📈 add bench job status
Browse files Browse the repository at this point in the history
Signed-off-by: vankichi <[email protected]>
  • Loading branch information
vankichi committed Mar 13, 2024
1 parent f0ebf37 commit cb0c47f
Show file tree
Hide file tree
Showing 5 changed files with 238 additions and 2 deletions.
2 changes: 1 addition & 1 deletion internal/config/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func (b *BenchmarkScenario) Bind() *BenchmarkScenario {

// BenchmarkJobImageInfo represents the docker image information for benchmark job.
type BenchmarkJobImageInfo struct {
Image string `info:"image" json:"image,omitempty" yaml:"image"`
Image string `info:"image" json:"image,omitempty" yaml:"image"`
PullPolicy string `info:"pull_policy" json:"pull_policy,omitempty" yaml:"pull_policy"`
}

Expand Down
194 changes: 194 additions & 0 deletions internal/observability/metrics/tools/benchmark/benchmark.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
// Copyright (C) 2019-2024 vdaas.org vald team <[email protected]>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// You may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package benchmark

import (
"context"

v1 "github.com/vdaas/vald/internal/k8s/vald/benchmark/api/v1"
"github.com/vdaas/vald/internal/observability/metrics"
"github.com/vdaas/vald/pkg/tools/benchmark/operator/service"
api "go.opentelemetry.io/otel/metric"
view "go.opentelemetry.io/otel/sdk/metric"
)

const (
appliedScenarioCount = "benchmark_operator_applied_scenario"
appliedScenarioCountDescription = "Benchmark Operator applied scenario count"

runningScenarioCount = "benchmark_operator_running_scenario"
runningScenarioCountDescription = "Benchmark Operator running scenario count"

completeScenarioCount = "benchmark_operator_complete_scenario"
completeScenarioCountDescription = "Benchmark Operator complete scenario count"

appliedBenchmarkJobCount = "benchmark_operator_applied_benchmark_job"
appliedBenchmarkJobCountDescription = "Benchmark Operator applied benchmark job count"

runningBenchmarkJobCount = "benchmark_operator_running_benchmark_job"
runningBenchmarkJobCountDescription = "Benchmark Operator running benchmark job count"

completeBenchmarkJobCount = "benchmark_operator_complete_benchmark_job"
completeBenchmarkJobCountDescription = "Benchmark Operator complete benchmark job count"
)

const (
applied = "applied"
running = "running"
complete = "complete"
)

type operatorMetrics struct {
op service.Operator
}

func New(om service.Operator) metrics.Metric {
return &operatorMetrics{
op: om,
}
}

// TODO: implement here
func (om *operatorMetrics) View() ([]metrics.View, error) {
return []metrics.View{
view.NewView(
view.Instrument{
Name: appliedScenarioCount,
Description: appliedScenarioCountDescription,
},
view.Stream{
Aggregation: view.AggregationLastValue{},
},
),
view.NewView(
view.Instrument{
Name: runningScenarioCount,
Description: runningScenarioCountDescription,
},
view.Stream{
Aggregation: view.AggregationLastValue{},
},
),
view.NewView(
view.Instrument{
Name: completeScenarioCount,
Description: completeScenarioCountDescription,
},
view.Stream{
Aggregation: view.AggregationLastValue{},
},
),
}, nil
}

// TODO: implement here
func (om *operatorMetrics) Register(m metrics.Meter) error {
appliedScCount, err := m.Int64ObservableCounter(
appliedScenarioCount,
metrics.WithDescription(appliedScenarioCountDescription),
metrics.WithUnit(metrics.Dimensionless),
)
if err != nil {
return err
}
runningScCount, err := m.Int64ObservableCounter(
runningScenarioCount,
metrics.WithDescription(runningScenarioCountDescription),
metrics.WithUnit(metrics.Dimensionless),
)
if err != nil {
return err
}
completeScCount, err := m.Int64ObservableCounter(
completeScenarioCount,
metrics.WithDescription(completeScenarioCountDescription),
metrics.WithUnit(metrics.Dimensionless),
)
if err != nil {
return err
}

appliedBjCount, err := m.Int64ObservableCounter(
appliedBenchmarkJobCount,
metrics.WithDescription(appliedScenarioCountDescription),
metrics.WithUnit(metrics.Dimensionless),
)
if err != nil {
return err
}
runningBjCount, err := m.Int64ObservableCounter(
runningBenchmarkJobCount,
metrics.WithDescription(runningScenarioCountDescription),
metrics.WithUnit(metrics.Dimensionless),
)
if err != nil {
return err
}
completeBjCount, err := m.Int64ObservableCounter(
completeBenchmarkJobCount,
metrics.WithDescription(completeScenarioCountDescription),
metrics.WithUnit(metrics.Dimensionless),
)
if err != nil {
return err
}

_, err = m.RegisterCallback(
func(_ context.Context, o api.Observer) error {
// scenario status
sst := map[string]int64{
applied: 0,
running: 0,
complete: 0,
}
for k, v := range om.op.LenBenchSC() {
sst[applied] += v
if k == v1.BenchmarkScenarioCompleted {
sst[complete] += v
} else {
sst[running] += v
}
}
o.ObserveInt64(appliedScCount, sst[applied])
o.ObserveInt64(runningScCount, sst[running])
o.ObserveInt64(completeScCount, sst[complete])

// benchmark job status
bst := map[string]int64{
applied: 0,
running: 0,
complete: 0,
}
for k, v := range om.op.LenBenchBJ() {
sst[applied] += v
if k == v1.BenchmarkJobCompleted {
sst[complete] += v
} else {
sst[running] += v
}
}
o.ObserveInt64(appliedBjCount, bst[applied])
o.ObserveInt64(runningBjCount, bst[running])
o.ObserveInt64(completeBjCount, bst[complete])
return nil
},
appliedScCount,
runningScCount,
completeScCount,
appliedBjCount,
runningBjCount,
completeBjCount,
)
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -1574,7 +1574,7 @@ data:
},
"timezone": "",
"title": "Vald Benchmark Operator",
"uid": "JkemcMB",
"uid": "fdewjfx1jkxz4b",
"version": 1,
"weekStart": ""
}
40 changes: 40 additions & 0 deletions pkg/tools/benchmark/operator/service/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ import (
type Operator interface {
PreStart(context.Context) error
Start(context.Context) (<-chan error, error)
LenBenchSC() map[v1.ValdBenchmarkScenarioStatus]int64
LenBenchBJ() map[v1.BenchmarkJobStatus]int64
}

type scenario struct {
Expand Down Expand Up @@ -640,6 +642,44 @@ func (o *operator) checkAtomics() error {
return nil
}

func (o *operator) LenBenchSC() map[v1.ValdBenchmarkScenarioStatus]int64 {
m := map[v1.ValdBenchmarkScenarioStatus]int64{
v1.BenchmarkScenarioAvailable: 0,
v1.BenchmarkScenarioHealthy: 0,
v1.BenchmarkScenarioNotReady: 0,
v1.BenchmarkScenarioCompleted: 0,
}
if sc := o.getAtomicScenario(); sc != nil {
for _, s := range sc {
if _, ok := m[s.Crd.Status]; ok {
m[s.Crd.Status] += 1
} else {
m[s.Crd.Status] = 1
}
}
}
return m
}

func (o *operator) LenBenchBJ() map[v1.BenchmarkJobStatus]int64 {
m := map[v1.BenchmarkJobStatus]int64{
v1.BenchmarkJobAvailable: 0,
v1.BenchmarkJobHealthy: 0,
v1.BenchmarkJobNotReady: 0,
v1.BenchmarkJobCompleted: 0,
}
if bjs := o.getAtomicBenchJob(); bjs != nil {
for _, bj := range bjs {
if _, ok := m[bj.Status]; ok {
m[bj.Status] += 1
} else {
m[bj.Status] = 1
}
}
}
return m
}

func (*operator) PreStart(context.Context) error {
log.Infof("[benchmark scenario operator] start vald benchmark scenario operator")
return nil
Expand Down
2 changes: 2 additions & 0 deletions pkg/tools/benchmark/operator/usecase/benchmarkd.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/vdaas/vald/internal/observability"
backoffmetrics "github.com/vdaas/vald/internal/observability/metrics/backoff"
infometrics "github.com/vdaas/vald/internal/observability/metrics/info"
benchmarkmetrics "github.com/vdaas/vald/internal/observability/metrics/tools/benchmark"
"github.com/vdaas/vald/internal/runner"
"github.com/vdaas/vald/internal/safety"
"github.com/vdaas/vald/internal/servers/server"
Expand Down Expand Up @@ -96,6 +97,7 @@ func New(cfg *config.Config) (r runner.Runner, err error) {
if cfg.Observability.Enabled {
obs, err = observability.NewWithConfig(
cfg.Observability,
benchmarkmetrics.New(operator),
infometrics.New("benchmark_operator_info", "Benchmark Operator info", *cfg.JobImage),
backoffmetrics.New(),
)
Expand Down

0 comments on commit cb0c47f

Please sign in to comment.