From bfe9068ee00ed761cb60e5f609af93e4c2e7cc99 Mon Sep 17 00:00:00 2001
From: vankichi
Date: Thu, 9 Mar 2023 11:07:59 +0900
Subject: [PATCH] :sparkles: impl benchmark jobs

Signed-off-by: vankichi
---
 internal/config/benchmark.go                  | 10 +-
 internal/errors/benchmark.go                  |  5 +
 .../k8s/crd/benchmark/valdbenchmarkjob.yaml   |  4 +
 .../crd/benchmark/valdbenchmarkscenario.yaml  |  4 +
 internal/test/data/hdf5/hdf5.go               | 22 +++++
 pkg/tools/benchmark/job/service/insert.go     | 92 ++++++++++++++++++
 pkg/tools/benchmark/job/service/job.go        | 23 ++++-
 pkg/tools/benchmark/job/service/option.go     | 18 ++--
 pkg/tools/benchmark/job/service/remove.go     | 90 ++++++++++++++++++
 pkg/tools/benchmark/job/service/search.go     | 11 ++-
 pkg/tools/benchmark/job/service/update.go     | 93 +++++++++++++++++++
 pkg/tools/benchmark/job/service/upsert.go     | 93 +++++++++++++++++++
 .../benchmark/operator/service/operator.go    | 71 ++++++++++----
 13 files changed, 502 insertions(+), 34 deletions(-)
 create mode 100644 pkg/tools/benchmark/job/service/insert.go
 create mode 100644 pkg/tools/benchmark/job/service/remove.go
 create mode 100644 pkg/tools/benchmark/job/service/update.go
 create mode 100644 pkg/tools/benchmark/job/service/upsert.go

diff --git a/internal/config/benchmark.go b/internal/config/benchmark.go
index 83fb41b2dde..b2682cf8cbf 100644
--- a/internal/config/benchmark.go
+++ b/internal/config/benchmark.go
@@ -80,14 +80,16 @@ type InsertConfig struct {
 
 // UpdateConfig defines the desired state of update config
 type UpdateConfig struct {
-	SkipStrictExistCheck bool   `json:"skip_strict_exist_check,omitempty"`
-	Timestamp            string `json:"timestamp,omitempty"`
+	SkipStrictExistCheck  bool   `json:"skip_strict_exist_check,omitempty"`
+	Timestamp             string `json:"timestamp,omitempty"`
+	DisableBalancedUpdate bool   `json:"disable_balanced_update,omitempty"`
 }
 
 // UpsertConfig defines the desired state of upsert config
 type UpsertConfig struct {
-	SkipStrictExistCheck bool   `json:"skip_strict_exist_check,omitempty"`
-	Timestamp            string `json:"timestamp,omitempty"`
+	SkipStrictExistCheck  bool   `json:"skip_strict_exist_check,omitempty"`
+	Timestamp             string `json:"timestamp,omitempty"`
+	DisableBalancedUpdate bool   `json:"disable_balanced_update,omitempty"`
 }
 
 // SearchConfig defines the desired state of search config
diff --git a/internal/errors/benchmark.go b/internal/errors/benchmark.go
index 368d8a24270..61202025fb9 100644
--- a/internal/errors/benchmark.go
+++ b/internal/errors/benchmark.go
@@ -29,4 +29,9 @@ var (
 	ErrFailedToCreateJob = func(err error, jn string) error {
 		return Wrapf(err, "could not create job: %s ", jn)
 	}
+
+	// ErrMismatchAtomics represents a function to generate an error reporting that the job, benchmark job, and benchmark scenario values stored in the atomic pointers do not match each other.
+	ErrMismatchAtomics = func(job, benchjob, benchscenario interface{}) error {
+		return Errorf("mismatch atomics: job=%v\tbenchjob=%v\tbenchscenario=%v", job, benchjob, benchscenario)
+	}
 )
diff --git a/internal/k8s/crd/benchmark/valdbenchmarkjob.yaml b/internal/k8s/crd/benchmark/valdbenchmarkjob.yaml
index cdefc9dcaa8..6accb8e0eab 100644
--- a/internal/k8s/crd/benchmark/valdbenchmarkjob.yaml
+++ b/internal/k8s/crd/benchmark/valdbenchmarkjob.yaml
@@ -168,6 +168,8 @@ spec:
                       type: boolean
                     timestamp:
                       type: string
+                    disable_balanced_update:
+                      type: boolean
                   type: object
                 upsert_config:
                   description: UpsertConfig defines the desired state of upsert config
@@ -176,6 +178,8 @@ spec:
                       type: boolean
                     timestamp:
                       type: string
+                    disable_balanced_update:
+                      type: boolean
                   type: object
                 client_config:
                   description: ClientConfig represents the configurations for gRPC client.
diff --git a/internal/k8s/crd/benchmark/valdbenchmarkscenario.yaml b/internal/k8s/crd/benchmark/valdbenchmarkscenario.yaml
index 135dee3c302..bc36d49dfb8 100644
--- a/internal/k8s/crd/benchmark/valdbenchmarkscenario.yaml
+++ b/internal/k8s/crd/benchmark/valdbenchmarkscenario.yaml
@@ -206,6 +206,8 @@ spec:
                             type: boolean
                           timestamp:
                             type: string
+                          disable_balanced_update:
+                            type: boolean
                         type: object
                       upsert_config:
                         description: UpsertConfig defines the desired state of upsert config
@@ -214,6 +216,8 @@ spec:
                             type: boolean
                           timestamp:
                             type: string
+                          disable_balanced_update:
+                            type: boolean
                         type: object
                       client_config:
                         description: ClientConfig represents the configurations for gRPC client.
diff --git a/internal/test/data/hdf5/hdf5.go b/internal/test/data/hdf5/hdf5.go
index e83965ec8c1..7c9936a0c57 100644
--- a/internal/test/data/hdf5/hdf5.go
+++ b/internal/test/data/hdf5/hdf5.go
@@ -33,6 +33,7 @@ type Data interface {
 	Read() error
 	GetName() DatasetName
 	GetPath() string
+	GetByGroupName(name string) [][]float32
 	GetTrain() [][]float32
 	GetTest() [][]float32
 	GetNeighbors() [][]int
@@ -164,6 +165,27 @@ func (d *data) GetPath() string {
 	return d.path
 }
 
+// GetByGroupName returns the vectors of the group "train", "test" or "neighbors" (converted to float32), or nil for any other name. TODO: Apply generics
+func (d *data) GetByGroupName(name string) [][]float32 {
+	switch name {
+	case "train":
+		return d.GetTrain()
+	case "test":
+		return d.GetTest()
+	case "neighbors":
+		l := d.GetNeighbors()
+		r := make([][]float32, len(l)) // one (initially nil) row per neighbor list; indexing a zero-length slice would panic
+		for x := range l {
+			for _, z := range l[x] {
+				r[x] = append(r[x], float32(z))
+			}
+		}
+		return r
+	default:
+		return nil
+	}
+}
+
 func (d *data) GetTrain() [][]float32 {
 	return d.train
 }
diff --git a/pkg/tools/benchmark/job/service/insert.go b/pkg/tools/benchmark/job/service/insert.go
new file mode 100644
index 00000000000..23ea5da2f0c
--- /dev/null
+++ b/pkg/tools/benchmark/job/service/insert.go
@@ -0,0 +1,92 @@
+//
+// Copyright (C) 2019-2022 vdaas.org vald team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+// Package service manages the main logic of benchmark job.
+package service
+
+import (
+	"context"
+	"strconv"
+	"testing"
+	"time"
+
+	"github.com/vdaas/vald/apis/grpc/v1/payload"
+	"github.com/vdaas/vald/internal/errors"
+	"github.com/vdaas/vald/internal/log"
+)
+
+// insert benchmarks the Insert RPC: for every vector generated from the configured dataset it runs
+// testing.Benchmark around one Insert request whose object ID is "loop index + dataset.Indexes".
+// It returns an error only when insertConfig is nil; request errors are reported through ech.
+func (j *job) insert(ctx context.Context, ech chan error) error {
+	log.Info("[benchmark job] Start benchmarking insert")
+	if j.insertConfig == nil {
+		err := errors.NewErrInvalidOption("insertConfig", j.insertConfig)
+		select {
+		case <-ctx.Done():
+			// err was created just above and can never be context.Canceled, so it is always annotated with the context error.
+			ech <- errors.Wrap(err, ctx.Err().Error())
+		case ech <- err:
+		}
+		return err
+	}
+
+	// create data: the vectors to send, taken from the dataset group configured for this job
+	vecs := j.genVec(j.dataset)
+	timestamp, _ := strconv.Atoi(j.insertConfig.Timestamp)
+	cfg := &payload.Insert_Config{
+		SkipStrictExistCheck: j.insertConfig.SkipStrictExistCheck,
+		Timestamp:            int64(timestamp),
+	}
+	for i := 0; i < len(vecs); i++ {
+		log.Infof("[benchmark job] Start insert: iter = %d", i)
+		bres := testing.Benchmark(func(b *testing.B) {
+			b.Helper()
+			b.ResetTimer()
+			b.StartTimer()
+			start := time.Now()
+			id := i + j.dataset.Indexes
+			res, err := j.client.Insert(ctx, &payload.Insert_Request{
+				Vector: &payload.Object_Vector{
+					Id:     strconv.Itoa(id),
+					Vector: vecs[i],
+				},
+				Config: cfg,
+			})
+			if err != nil {
+				select {
+				case <-ctx.Done():
+					if errors.Is(err, context.Canceled) {
+						ech <- errors.Wrap(err, ctx.Err().Error())
+					} else {
+						ech <- err
+					}
+				case ech <- err:
+				}
+				return // res is not usable after a failed request; skip the metric/log lines that dereference it
+			}
+			b.StopTimer()
+			latency := time.Since(start)
+			b.ReportMetric(float64(latency.Microseconds()), "latency")
+			log.Infof("[benchmark job] iter=%d, bN=%d, Name=%s, Uuid=%s, Ips=%v, latency=%d", i, b.N, res.Name, res.Uuid, res.Ips, latency.Microseconds())
+		})
+		// TODO: send metrics to the Prometheus
+		log.Infof("[benchmark job] Finish insert: iter= %d \n%v\n", i, bres)
+	}
+
+	log.Info("[benchmark job] Finish benchmarking insert")
+	return nil
+}
diff --git a/pkg/tools/benchmark/job/service/job.go b/pkg/tools/benchmark/job/service/job.go
index 69c0d43f0ee..5995d6be727 100644
--- a/pkg/tools/benchmark/job/service/job.go
+++ b/pkg/tools/benchmark/job/service/job.go
@@ -46,15 +46,27 @@ type jobType int
 
 const (
 	USERDEFINED jobType = iota
+	INSERT
 	SEARCH
+	UPDATE
+	UPSERT
+	REMOVE
 )
 
 func (jt jobType) String() string {
 	switch jt {
 	case USERDEFINED:
 		return "userdefined"
+	case INSERT:
+		return "insert"
 	case SEARCH:
 		return "search"
+	case UPDATE:
+		return "update"
+	case UPSERT:
+		return "upsert"
+	case REMOVE:
+		return "remove"
 	}
 	return ""
 }
@@ -91,8 +103,16 @@ func New(opts ...Option) (Job, error) {
 			opt := WithJobFunc(j.jobFunc)
 			err := opt(j)
 			return nil, errors.ErrOptionFailed(err, reflect.ValueOf(opt))
+		case INSERT:
+			j.jobFunc = j.insert
 		case SEARCH:
 			j.jobFunc = j.search
+		case UPDATE:
+			j.jobFunc = j.update
+		case UPSERT:
+			j.jobFunc = j.upsert
+		case REMOVE:
+			j.jobFunc = j.remove
 		}
 	} else if j.jobType != USERDEFINED {
 		log.Warnf("[benchmark job] userdefined jobFunc is set but jobType is set %s", j.jobType.String())
@@ -208,13 +228,14 @@ func calcRecall(linearRes, searchRes []*payload.Object_Distance) (recall float64
 	return recall / float64(len(linearRes))
 }
 
-func genVec(data [][]float32, cfg *config.BenchmarkDataset) [][]float32 {
+func (j *job) genVec(cfg *config.BenchmarkDataset) [][]float32 {
 	start := cfg.Range.Start
 	end := cfg.Range.End
 	if (end - start) < cfg.Indexes {
 		end = cfg.Indexes
 	}
 	num := end - start + 1
+	data := j.hdf5.GetByGroupName(cfg.Group)
 	if len(data) < num {
 		num = len(data)
 		end = start + num + 1
diff --git a/pkg/tools/benchmark/job/service/option.go b/pkg/tools/benchmark/job/service/option.go
index 998f9f87001..d4384112299 100644
--- a/pkg/tools/benchmark/job/service/option.go
+++ b/pkg/tools/benchmark/job/service/option.go
@@ -147,8 +147,16 @@ func WithJobTypeByString(t string) Option {
 	switch t {
 	case "userdefined":
 		jt = USERDEFINED
+	case "insert":
+		jt = INSERT
 	case "search":
 		jt = SEARCH
+	case "update":
+		jt = UPDATE
+	case "upsert":
+		jt = UPSERT
+	case "remove":
+		jt = REMOVE
 	}
 	return WithJobType(jt)
 }
@@ -156,14 +164,10 @@ func WithJobTypeByString(t string) Option {
 // WithJobType sets the jobType for running benchmark job.
 func WithJobType(jt jobType) Option {
 	return func(j *job) error {
-		switch jt {
-		case USERDEFINED:
-			j.jobType = jt
-		case SEARCH:
-			j.jobType = jt
-		default:
-			return errors.NewErrInvalidOption("jobType", jt)
+		if len(jt.String()) == 0 { // String() returns "" for every value that is not a known job type
+			return errors.NewErrInvalidOption("jobType", int(jt)) // report the raw value: its String() form is always empty here
 		}
+		j.jobType = jt
 		return nil
 	}
 }
diff --git a/pkg/tools/benchmark/job/service/remove.go b/pkg/tools/benchmark/job/service/remove.go
new file mode 100644
index 00000000000..f453489a910
--- /dev/null
+++ b/pkg/tools/benchmark/job/service/remove.go
@@ -0,0 +1,90 @@
+//
+// Copyright (C) 2019-2022 vdaas.org vald team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+// Package service manages the main logic of benchmark job.
+package service
+
+import (
+	"context"
+	"strconv"
+	"testing"
+	"time"
+
+	"github.com/vdaas/vald/apis/grpc/v1/payload"
+	"github.com/vdaas/vald/internal/errors"
+	"github.com/vdaas/vald/internal/log"
+)
+
+// remove benchmarks the Remove RPC: for every vector generated from the configured dataset it runs
+// testing.Benchmark around one Remove request whose object ID is "loop index + dataset.Indexes".
+// It returns an error only when removeConfig is nil; request errors are reported through ech.
+func (j *job) remove(ctx context.Context, ech chan error) error {
+	log.Info("[benchmark job] Start benchmarking remove")
+	if j.removeConfig == nil {
+		err := errors.NewErrInvalidOption("removeConfig", j.removeConfig)
+		select {
+		case <-ctx.Done():
+			// err was created just above and can never be context.Canceled, so it is always annotated with the context error.
+			ech <- errors.Wrap(err, ctx.Err().Error())
+		case ech <- err:
+		}
+		return err
+	}
+
+	// create data: only the number of generated vectors is used here, to decide how many IDs are removed
+	vecs := j.genVec(j.dataset)
+	timestamp, _ := strconv.Atoi(j.removeConfig.Timestamp)
+	cfg := &payload.Remove_Config{
+		SkipStrictExistCheck: j.removeConfig.SkipStrictExistCheck,
+		Timestamp:            int64(timestamp),
+	}
+	for i := 0; i < len(vecs); i++ {
+		log.Infof("[benchmark job] Start remove: iter = %d", i)
+		bres := testing.Benchmark(func(b *testing.B) {
+			b.Helper()
+			b.ResetTimer()
+			b.StartTimer()
+			start := time.Now()
+			res, err := j.client.Remove(ctx, &payload.Remove_Request{
+				Id: &payload.Object_ID{
+					Id: strconv.Itoa(i + j.dataset.Indexes),
+				},
+				Config: cfg,
+			})
+			if err != nil {
+				select {
+				case <-ctx.Done():
+					if errors.Is(err, context.Canceled) {
+						ech <- errors.Wrap(err, ctx.Err().Error())
+					} else {
+						ech <- err
+					}
+				case ech <- err:
+				}
+				return // res is not usable after a failed request; skip the metric/log lines that dereference it
+			}
+			b.StopTimer()
+			latency := time.Since(start)
+			b.ReportMetric(float64(latency.Microseconds()), "latency")
+			log.Infof("[benchmark job] iter=%d, b=%d, Name=%s, Uuid=%s, Ips=%v, latency=%d", i, b.N, res.Name, res.Uuid, res.Ips, latency.Microseconds())
+		})
+		// TODO: send metrics to the Prometheus
+		log.Infof("[benchmark job] Finish remove: iter= %d \n%v\n", i, bres)
+	}
+
+	log.Info("[benchmark job] Finish benchmarking remove")
+	return nil
+}
diff --git a/pkg/tools/benchmark/job/service/search.go b/pkg/tools/benchmark/job/service/search.go
index 2d14a6d84bf..1ebfd5a4ad9 100644
--- a/pkg/tools/benchmark/job/service/search.go
+++ b/pkg/tools/benchmark/job/service/search.go
@@ -44,7 +44,7 @@ func (j *job) search(ctx context.Context, ech chan error) error {
 	}
 
 	// create data
-	vecs := genVec(j.hdf5.GetTest(), j.dataset)
+	vecs := j.genVec(j.dataset)
 	timeout, _ := time.ParseDuration(j.searchConfig.Timeout)
 	cfg := &payload.Search_Config{
 		Num:     uint32(j.searchConfig.Num),
@@ -72,8 +72,10 @@ func (j *job) search(ctx context.Context, ech chan error) error {
 			return err
 		}
 		bres := testing.Benchmark(func(b *testing.B) {
+			log.Debugf("[benchmark job] Start search bench: iter=%d, bN=%d", i, b.N)
 			b.Helper()
 			b.ResetTimer()
+			b.StartTimer()
 			start := time.Now()
 			sres, err := j.client.Search(ctx, &payload.Search_Request{
 				Vector: vecs[i],
@@ -91,13 +93,16 @@ func (j *job) search(ctx context.Context, ech chan error) error {
 					break
 				}
 			}
-			latency := time.Since(start)
+			b.StopTimer()
+			latency := time.Since(start) // taken before calcRecall so the recall computation is not counted as search latency
 			recall := calcRecall(lres.Results, sres.Results)
 			b.ReportMetric(recall, "recall")
 			b.ReportMetric(float64(latency.Microseconds()), "latency")
+			log.Infof("[benchmark job] iter=%d, bN=%d, recall=%f", i, b.N, recall)
+
 		})
 		// TODO: send metrics to the Prometeus
-		log.Infof("[benchmark job] Finish search bench: iter= %d \n%#v\n", i, bres)
+		log.Infof("[benchmark job] Finish search bench: iter= %d \n%v\n", i, bres)
 	}
 
 	log.Info("[benchmark job] Finish benchmarking search")
diff --git a/pkg/tools/benchmark/job/service/update.go b/pkg/tools/benchmark/job/service/update.go
new file mode 100644
index 00000000000..93d1bf31704
--- /dev/null
+++ b/pkg/tools/benchmark/job/service/update.go
@@ -0,0 +1,93 @@
+//
+// Copyright (C) 2019-2022 vdaas.org vald team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+// Package service manages the main logic of benchmark job.
+package service
+
+import (
+	"context"
+	"strconv"
+	"testing"
+	"time"
+
+	"github.com/vdaas/vald/apis/grpc/v1/payload"
+	"github.com/vdaas/vald/internal/errors"
+	"github.com/vdaas/vald/internal/log"
+)
+
+// update benchmarks the Update RPC: for every vector generated from the configured dataset it runs
+// testing.Benchmark around one Update request whose object ID is "loop index + dataset.Indexes".
+// It returns an error only when updateConfig is nil; request errors are reported through ech.
+func (j *job) update(ctx context.Context, ech chan error) error {
+	log.Info("[benchmark job] Start benchmarking update")
+	if j.updateConfig == nil {
+		err := errors.NewErrInvalidOption("updateConfig", j.updateConfig)
+		select {
+		case <-ctx.Done():
+			// err was created just above and can never be context.Canceled, so it is always annotated with the context error.
+			ech <- errors.Wrap(err, ctx.Err().Error())
+		case ech <- err:
+		}
+		return err
+	}
+
+	// create data: the vectors to send, taken from the dataset group configured for this job
+	vecs := j.genVec(j.dataset)
+	timestamp, _ := strconv.Atoi(j.updateConfig.Timestamp)
+	cfg := &payload.Update_Config{
+		SkipStrictExistCheck:  j.updateConfig.SkipStrictExistCheck,
+		Timestamp:             int64(timestamp),
+		DisableBalancedUpdate: j.updateConfig.DisableBalancedUpdate,
+	}
+	for i := 0; i < len(vecs); i++ {
+		log.Infof("[benchmark job] Start update: iter = %d", i)
+		bres := testing.Benchmark(func(b *testing.B) {
+			b.Helper()
+			b.ResetTimer()
+			b.StartTimer()
+			start := time.Now()
+			id := i + j.dataset.Indexes
+			res, err := j.client.Update(ctx, &payload.Update_Request{
+				Vector: &payload.Object_Vector{
+					Id:     strconv.Itoa(id),
+					Vector: vecs[i],
+				},
+				Config: cfg,
+			})
+			if err != nil {
+				select {
+				case <-ctx.Done():
+					if errors.Is(err, context.Canceled) {
+						ech <- errors.Wrap(err, ctx.Err().Error())
+					} else {
+						ech <- err
+					}
+				case ech <- err:
+				}
+				return // res is not usable after a failed request; skip the metric/log lines that dereference it
+			}
+			b.StopTimer()
+			latency := time.Since(start)
+			b.ReportMetric(float64(latency.Microseconds()), "latency")
+			log.Infof("[benchmark job] iter=%d, bN=%d, Name=%s, Uuid=%s, Ips=%v, latency=%d", i, b.N, res.Name, res.Uuid, res.Ips, latency.Microseconds())
+		})
+		// TODO: send metrics to the Prometheus
+		log.Infof("[benchmark job] Finish update: iter= %d \t%v", i, bres)
+	}
+
+	log.Info("[benchmark job] Finish benchmarking update")
+	return nil
+}
diff --git a/pkg/tools/benchmark/job/service/upsert.go b/pkg/tools/benchmark/job/service/upsert.go
new file mode 100644
index 00000000000..d523e5af418
--- /dev/null
+++ b/pkg/tools/benchmark/job/service/upsert.go
@@ -0,0 +1,93 @@
+//
+// Copyright (C) 2019-2022 vdaas.org vald team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+// Package service manages the main logic of benchmark job.
+package service
+
+import (
+	"context"
+	"strconv"
+	"testing"
+	"time"
+
+	"github.com/vdaas/vald/apis/grpc/v1/payload"
+	"github.com/vdaas/vald/internal/errors"
+	"github.com/vdaas/vald/internal/log"
+)
+
+// upsert benchmarks the Upsert RPC: for every vector generated from the configured dataset it runs
+// testing.Benchmark around one Upsert request whose object ID is "loop index + dataset.Indexes".
+// It returns an error only when upsertConfig is nil; request errors are reported through ech.
+func (j *job) upsert(ctx context.Context, ech chan error) error {
+	log.Info("[benchmark job] Start benchmarking upsert")
+	if j.upsertConfig == nil {
+		err := errors.NewErrInvalidOption("upsertConfig", j.upsertConfig)
+		select {
+		case <-ctx.Done():
+			// err was created just above and can never be context.Canceled, so it is always annotated with the context error.
+			ech <- errors.Wrap(err, ctx.Err().Error())
+		case ech <- err:
+		}
+		return err
+	}
+
+	// create data: the vectors to send, taken from the dataset group configured for this job
+	vecs := j.genVec(j.dataset)
+	timestamp, _ := strconv.Atoi(j.upsertConfig.Timestamp)
+	cfg := &payload.Upsert_Config{
+		SkipStrictExistCheck:  j.upsertConfig.SkipStrictExistCheck,
+		Timestamp:             int64(timestamp),
+		DisableBalancedUpdate: j.upsertConfig.DisableBalancedUpdate,
+	}
+	for i := 0; i < len(vecs); i++ {
+		log.Infof("[benchmark job] Start upsert: iter = %d", i)
+		bres := testing.Benchmark(func(b *testing.B) {
+			b.Helper()
+			b.ResetTimer()
+			b.StartTimer()
+			start := time.Now()
+			id := i + j.dataset.Indexes
+			res, err := j.client.Upsert(ctx, &payload.Upsert_Request{
+				Vector: &payload.Object_Vector{
+					Id:     strconv.Itoa(id),
+					Vector: vecs[i],
+				},
+				Config: cfg,
+			})
+			if err != nil {
+				select {
+				case <-ctx.Done():
+					if errors.Is(err, context.Canceled) {
+						ech <- errors.Wrap(err, ctx.Err().Error())
+					} else {
+						ech <- err
+					}
+				case ech <- err:
+				}
+				return // res is not usable after a failed request; skip the metric/log lines that dereference it
+			}
+			b.StopTimer()
+			latency := time.Since(start)
+			b.ReportMetric(float64(latency.Microseconds()), "latency")
+			log.Infof("[benchmark job] iter=%d, bN=%d, Name=%s, Uuid=%s, Ips=%v, latency=%d", i, b.N, res.Name, res.Uuid, res.Ips, latency.Microseconds())
+		})
+		// TODO: send metrics to the Prometheus
+		log.Infof("[benchmark job] Finish upsert: iter= %d \t%v", i, bres)
+	}
+
+	log.Info("[benchmark job] Finish benchmarking upsert")
+	return nil
+}
diff --git a/pkg/tools/benchmark/operator/service/operator.go b/pkg/tools/benchmark/operator/service/operator.go
index 9dc749c386f..e10ae862973 100644
--- a/pkg/tools/benchmark/operator/service/operator.go
+++ b/pkg/tools/benchmark/operator/service/operator.go
@@ -161,6 +161,7 @@ func (o *operator) jobReconcile(ctx context.Context, jobList map[string][]job.Jo
 	if len(jobList) == 0 {
 		log.Info("[reconcile job] no job is founded")
 		o.jobs.Store(&map[string]string{})
+		log.Debug("[reconcile job] finish")
 		return
 	}
 	cjobs := o.getAtomicJob()
@@ -215,8 +216,9 @@ func (o *operator) jobReconcile(ctx context.Context, jobList map[string][]job.Jo
 func (o *operator) benchJobReconcile(ctx context.Context, benchJobList map[string]v1.ValdBenchmarkJob) {
 	log.Debugf("[reconcile benchmark job resource] job list: %#v", benchJobList)
 	if len(benchJobList) == 0 {
-		o.benchjobs.Store(&(map[string]*v1.ValdBenchmarkJob{}))
 		log.Info("[reconcile benchmark job resource] job resource not found")
+		o.benchjobs.Store(&(map[string]*v1.ValdBenchmarkJob{}))
+		log.Debug("[reconcile benchmark job resource] finish")
 		return
 	}
 	cbjl := o.getAtomicBenchJob()
@@ -227,7 +229,11 @@ func (o *operator) benchJobReconcile(ctx context.Context, benchJobList map[strin
 	for k := range benchJobList {
 		// update scenario status
 		job := benchJobList[k]
-		if scenarios := o.getAtomicScenario(); scenarios != nil {
+		hasOwner := false
+		if len(job.GetOwnerReferences()) > 0 {
+			hasOwner = true
+		}
+		if scenarios := o.getAtomicScenario(); scenarios != nil && hasOwner {
 			on := job.GetOwnerReferences()[0].Name
 			if _, ok := scenarios[on]; ok {
 				if scenarios[on].BenchJobStatus == nil {
@@ -282,8 +288,7 @@ func (o *operator) benchScenarioReconcile(ctx context.Context, scenarioList map[
 	if len(scenarioList) == 0 {
 		log.Info("[reconcile benchmark scenario resource]: scenario not found")
 		o.scenarios.Store(&(map[string]*scenario{}))
-		o.benchjobs.Store(&(map[string]*v1.ValdBenchmarkJob{}))
-		o.jobs.Store(&(map[string]string{}))
+		log.Debug("[reconcile benchmark scenario resource] finish")
 		return
 	}
 	cbsl := o.getAtomicScenario()
@@ -536,16 +541,44 @@ func (o *operator) checkJobsStatus(ctx context.Context, jobs map[string]string)
 	return err
 }
 
-func (o *operator) initAtomics() {
-	if cbsl := o.getAtomicScenario(); len(cbsl) > 0 {
-		o.scenarios.Store(&(map[string]*scenario{}))
-	}
-	if cbjl := o.getAtomicBenchJob(); len(cbjl) > 0 {
-		o.benchjobs.Store(&(map[string]*v1.ValdBenchmarkJob{}))
+// checkAtomics verifies that the job, benchmark job, and scenario maps held in the atomics agree with each other and returns ErrMismatchAtomics when they do not.
+func (o *operator) checkAtomics() error {
+	cjl := o.getAtomicJob()
+	cbjl := o.getAtomicBenchJob()
+	cbsl := o.getAtomicScenario()
+	if len(cjl) == 0 {
+		if len(cbjl) > 0 || len(cbsl) > 0 {
+			log.Errorf("mismatch atomics: job=%v, benchjob=%v, scenario=%v", cjl, cbjl, cbsl)
+			return errors.ErrMismatchAtomics(cjl, cbjl, cbsl)
+		}
+		return nil
+	} else if len(cbjl) == 0 {
+		log.Errorf("mismatch atomics: job=%v, benchjob=%v, scenario=%v", cjl, cbjl, cbsl)
+		return errors.ErrMismatchAtomics(cjl, cbjl, cbsl)
+	}
+	jobCounter := len(cjl)
+	scenarioBenchCounter := 0
+	for sc := range cbsl {
+		scenarioBenchCounter += len(cbsl[sc].BenchJobStatus)
+	}
+	for jobName := range cjl {
+		if benchJob := cbjl[jobName]; benchJob != nil {
+			jobCounter--
+			if owner := benchJob.GetOwnerReferences(); len(owner) > 0 {
+				scenarioName := owner[0].Name
+				if scenario := cbsl[scenarioName]; scenario != nil {
+					if _, ok := scenario.BenchJobStatus[benchJob.GetName()]; ok {
+						scenarioBenchCounter--
+					}
+				}
+			}
+		}
 	}
-	if cjl := o.getAtomicJob(); len(cjl) > 0 {
-		o.jobs.Store(&(map[string]string{}))
+	if jobCounter != 0 || scenarioBenchCounter != 0 {
+		log.Errorf("mismatch atomics: job=%v, benchjob=%v, scenario=%v", cjl, cbjl, cbsl)
+		return errors.ErrMismatchAtomics(cjl, cbjl, cbsl)
 	}
+	return nil
 }
 
 func (o *operator) PreStart(ctx context.Context) error {
@@ -568,13 +601,13 @@ func (o *operator) Start(ctx context.Context) (<-chan error, error) {
 			case <-ctx.Done():
 				return nil
 			case <-rcticker.C:
-				cbsl := o.getAtomicScenario()
-				if cbsl == nil {
-					log.Info("benchmark scenario resource is empty")
-					// clear atomic pointer
-					o.initAtomics()
-					continue
-				} else {
+				// check mismatch atomic
+				err = o.checkAtomics()
+				if err != nil {
+					ech <- err
+				}
+				// determine whether benchmark scenario status should be updated.
+				if cbsl := o.getAtomicScenario(); cbsl != nil {
 					scenarioStatus := make(map[string]v1.ValdBenchmarkScenarioStatus)
 					for name, scenario := range cbsl {
 						if scenario.Crd.Status != v1.BenchmarkScenarioCompleted {