diff --git a/internal/config/benchmark.go b/internal/config/benchmark.go index 83fb41b2dde..b2682cf8cbf 100644 --- a/internal/config/benchmark.go +++ b/internal/config/benchmark.go @@ -80,14 +80,16 @@ type InsertConfig struct { // UpdateConfig defines the desired state of update config type UpdateConfig struct { - SkipStrictExistCheck bool `json:"skip_strict_exist_check,omitempty"` - Timestamp string `json:"timestamp,omitempty"` + SkipStrictExistCheck bool `json:"skip_strict_exist_check,omitempty"` + Timestamp string `json:"timestamp,omitempty"` + DisableBalancedUpdate bool `json:"disable_balanced_update,omitempty"` } // UpsertConfig defines the desired state of upsert config type UpsertConfig struct { - SkipStrictExistCheck bool `json:"skip_strict_exist_check,omitempty"` - Timestamp string `json:"timestamp,omitempty"` + SkipStrictExistCheck bool `json:"skip_strict_exist_check,omitempty"` + Timestamp string `json:"timestamp,omitempty"` + DisableBalancedUpdate bool `json:"disable_balanced_update,omitempty"` } // SearchConfig defines the desired state of search config diff --git a/internal/errors/benchmark.go b/internal/errors/benchmark.go index 368d8a24270..61202025fb9 100644 --- a/internal/errors/benchmark.go +++ b/internal/errors/benchmark.go @@ -29,4 +29,9 @@ var ( ErrFailedToCreateJob = func(err error, jn string) error { return Wrapf(err, "could not create job: %s ", jn) } + + // ErrMismatchAtomics represents a function to generate an error that mismatch each atomic.Pointer stored corresponding to benchmark tasks. + ErrMismatchAtomics = func(job, benchjob, benchscenario interface{}) error { + return Errorf("mismatch atomics: job=%v\tbenchjob=%v\tbenchscenario=%v", job, benchjob, benchscenario) + } ) diff --git a/internal/k8s/crd/benchmark/valdbenchmarkjob.yaml b/internal/k8s/crd/benchmark/valdbenchmarkjob.yaml index cdefc9dcaa8..6accb8e0eab 100644 --- a/internal/k8s/crd/benchmark/valdbenchmarkjob.yaml +++ b/internal/k8s/crd/benchmark/valdbenchmarkjob.yaml @@ -168,6 +168,8 @@ spec: type: boolean timestamp: type: string + disable_balanced_update: + type: boolean type: object upsert_config: description: UpsertConfig defines the desired state of upsert config @@ -176,6 +178,8 @@ spec: type: boolean timestamp: type: string + disable_balanced_update: + type: boolean type: object client_config: description: ClientConfig represents the configurations for gRPC client. diff --git a/internal/k8s/crd/benchmark/valdbenchmarkscenario.yaml b/internal/k8s/crd/benchmark/valdbenchmarkscenario.yaml index 135dee3c302..bc36d49dfb8 100644 --- a/internal/k8s/crd/benchmark/valdbenchmarkscenario.yaml +++ b/internal/k8s/crd/benchmark/valdbenchmarkscenario.yaml @@ -206,6 +206,8 @@ spec: type: boolean timestamp: type: string + disable_balanced_update: + type: boolean type: object upsert_config: description: UpsertConfig defines the desired state of upsert config @@ -214,6 +216,8 @@ spec: type: boolean timestamp: type: string + disable_balanced_update: + type: boolean type: object client_config: description: ClientConfig represents the configurations for gRPC client. diff --git a/internal/test/data/hdf5/hdf5.go b/internal/test/data/hdf5/hdf5.go index e83965ec8c1..7c9936a0c57 100644 --- a/internal/test/data/hdf5/hdf5.go +++ b/internal/test/data/hdf5/hdf5.go @@ -33,6 +33,7 @@ type Data interface { Read() error GetName() DatasetName GetPath() string + GetByGroupName(name string) [][]float32 GetTrain() [][]float32 GetTest() [][]float32 GetNeighbors() [][]int @@ -164,6 +165,27 @@ func (d *data) GetPath() string { return d.path } +// TODO: Apply generics +func (d *data) GetByGroupName(name string) [][]float32 { + switch name { + case "train": + return d.GetTrain() + case "test": + return d.GetTest() + case "neighbors": + l := d.GetNeighbors() + r := make([][]float32, 0) + for x := range l { + for y, z := range l[x] { + r[x][y] = float32(z) + } + } + return r + default: + return nil + } +} + func (d *data) GetTrain() [][]float32 { return d.train } diff --git a/internal/timeutil/rate/rate.go b/internal/timeutil/rate/rate.go new file mode 100644 index 00000000000..75adc2d4314 --- /dev/null +++ b/internal/timeutil/rate/rate.go @@ -0,0 +1,62 @@ +// +// Copyright (C) 2019-2023 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package rate + +import ( + "context" + "runtime" + + "go.uber.org/ratelimit" + "golang.org/x/time/rate" +) + +type Limiter interface { + Wait(ctx context.Context) error +} + +type limiter struct { + isStd bool + uber ratelimit.Limiter + std *rate.Limiter +} + +func NewLimiter(cnt int) Limiter { + if runtime.GOMAXPROCS(0) >= 32 { + return &limiter{ + isStd: true, + std: rate.NewLimiter(rate.Limit(cnt), 1), + } + } + return &limiter{ + isStd: false, + uber: ratelimit.New(cnt, ratelimit.WithoutSlack), + } +} + +func (l *limiter) Wait(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + if l.isStd { + return l.std.Wait(ctx) + } else { + l.uber.Take() + } + } + return nil +} diff --git a/pkg/tools/benchmark/job/service/insert.go b/pkg/tools/benchmark/job/service/insert.go new file mode 100644 index 00000000000..f2b13c2b891 --- /dev/null +++ b/pkg/tools/benchmark/job/service/insert.go @@ -0,0 +1,86 @@ +// +// Copyright (C) 2019-2022 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +// Package service manages the main logic of benchmark job. +package service + +import ( + "context" + "strconv" + + "github.com/vdaas/vald/apis/grpc/v1/payload" + "github.com/vdaas/vald/internal/errors" + "github.com/vdaas/vald/internal/log" +) + +func (j *job) insert(ctx context.Context, ech chan error) error { + log.Info("[benchmark job] Start benchmarking insert") + if j.insertConfig == nil { + err := errors.NewErrInvalidOption("insertConfig", j.insertConfig) + select { + case <-ctx.Done(): + if err != context.Canceled { + ech <- errors.Wrap(err, ctx.Err().Error()) + } else { + ech <- err + } + case ech <- err: + } + return err + } + + // create data + vecs := j.genVec(j.dataset) + timestamp, _ := strconv.Atoi(j.insertConfig.Timestamp) + cfg := &payload.Insert_Config{ + SkipStrictExistCheck: j.insertConfig.SkipStrictExistCheck, + Timestamp: int64(timestamp), + } + for i := 0; i < len(vecs); i++ { + log.Infof("[benchmark job] Start insert: iter = %d", i) + err := j.limiter.Wait(ctx) + if err != nil { + if errors.Is(err, context.Canceled) { + ech <- err + } + } + id := i + j.dataset.Indexes + res, err := j.client.Insert(ctx, &payload.Insert_Request{ + Vector: &payload.Object_Vector{ + Id: strconv.Itoa(id), + Vector: vecs[i], + }, + Config: cfg, + }) + if err != nil { + select { + case <-ctx.Done(): + if errors.Is(err, context.Canceled) { + ech <- errors.Wrap(err, ctx.Err().Error()) + } else { + ech <- err + } + case ech <- err: + break + } + } + // TODO: send metrics to the Prometeus + log.Infof("[benchmark job] Finish insert: iter= %d \n%v\n", i, res) + } + + log.Info("[benchmark job] Finish benchmarking insert") + return nil +} diff --git a/pkg/tools/benchmark/job/service/job.go b/pkg/tools/benchmark/job/service/job.go index 69c0d43f0ee..e4db8d169cc 100644 --- a/pkg/tools/benchmark/job/service/job.go +++ b/pkg/tools/benchmark/job/service/job.go @@ -34,6 +34,7 @@ import ( "github.com/vdaas/vald/internal/log" "github.com/vdaas/vald/internal/safety" "github.com/vdaas/vald/internal/test/data/hdf5" + "github.com/vdaas/vald/internal/timeutil/rate" ) type Job interface { @@ -46,15 +47,33 @@ type jobType int const ( USERDEFINED jobType = iota + INSERT SEARCH + UPDATE + UPSERT + REMOVE + GETOBJECT + EXISTS ) func (jt jobType) String() string { switch jt { case USERDEFINED: return "userdefined" + case INSERT: + return "insert" case SEARCH: return "search" + case UPDATE: + return "update" + case UPSERT: + return "upsert" + case REMOVE: + return "remove" + case GETOBJECT: + return "getObject" + case EXISTS: + return "exists" } return "" } @@ -76,6 +95,7 @@ type job struct { beforeJobNamespace string k8sClient client.Client beforeJobDur time.Duration + limiter rate.Limiter } func New(opts ...Option) (Job, error) { @@ -91,12 +111,25 @@ func New(opts ...Option) (Job, error) { opt := WithJobFunc(j.jobFunc) err := opt(j) return nil, errors.ErrOptionFailed(err, reflect.ValueOf(opt)) + case INSERT: + j.jobFunc = j.insert case SEARCH: j.jobFunc = j.search + case UPDATE: + j.jobFunc = j.update + case UPSERT: + j.jobFunc = j.upsert + case REMOVE: + j.jobFunc = j.remove + case GETOBJECT: + j.jobFunc = j.getObject + case EXISTS: + j.jobFunc = j.exists } } else if j.jobType != USERDEFINED { log.Warnf("[benchmark job] userdefined jobFunc is set but jobType is set %s", j.jobType.String()) } + j.limiter = rate.NewLimiter(1000) return j, nil } @@ -208,13 +241,14 @@ func calcRecall(linearRes, searchRes []*payload.Object_Distance) (recall float64 return recall / float64(len(linearRes)) } -func genVec(data [][]float32, cfg *config.BenchmarkDataset) [][]float32 { +func (j *job) genVec(cfg *config.BenchmarkDataset) [][]float32 { start := cfg.Range.Start end := cfg.Range.End if (end - start) < cfg.Indexes { end = cfg.Indexes } num := end - start + 1 + data := j.hdf5.GetByGroupName(cfg.Group) if len(data) < num { num = len(data) end = start + num + 1 diff --git a/pkg/tools/benchmark/job/service/object.go b/pkg/tools/benchmark/job/service/object.go new file mode 100644 index 00000000000..bc66586013f --- /dev/null +++ b/pkg/tools/benchmark/job/service/object.go @@ -0,0 +1,113 @@ +// +// Copyright (C) 2019-2022 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +// Package service manages the main logic of benchmark job. +package service + +import ( + "context" + "strconv" + + "github.com/vdaas/vald/apis/grpc/v1/payload" + "github.com/vdaas/vald/internal/errors" + "github.com/vdaas/vald/internal/log" +) + +func (j *job) exists(ctx context.Context, ech chan error) error { + log.Info("[benchmark job] Start benchmarking exists") + // create data + for i := j.dataset.Range.Start; i <= j.dataset.Range.End; i++ { + err := j.limiter.Wait(ctx) + if err != nil { + if errors.Is(err, context.Canceled) { + ech <- err + } + } + res, err := j.client.Exists(ctx, &payload.Object_ID{ + Id: strconv.Itoa(i), + }) + log.Debug(res) + if err != nil { + select { + case <-ctx.Done(): + if errors.Is(err, context.Canceled) { + ech <- errors.Wrap(err, ctx.Err().Error()) + } else { + ech <- err + } + case ech <- err: + break + } + } + // log.Infof("[benchmark job] iter=%d, bN=%d, Id=%s, latency=%d", i, b.N, res.Id, latency.Microseconds()) + // TODO: send metrics to the Prometeus + // log.Infof("[benchmark job] Finish exists: iter= %d \t%v", i, bres, bres) + } + log.Info("[benchmark job] Finish benchmarking exists") + return nil +} + +func (j *job) getObject(ctx context.Context, ech chan error) error { + log.Info("[benchmark job] Start benchmarking getObject") + // TODO: Set object api config + // if j.upsertConfig == nil { + // err := errors.NewErrInvalidOption("upsertConfig", j.upsertConfig) + // select { + // case <-ctx.Done(): + // if err != context.Canceled { + // ech <- errors.Wrap(err, ctx.Err().Error()) + // } else { + // ech <- err + // } + // case ech <- err: + // } + // return err + // } + + // create data + vecs := j.genVec(j.dataset) + for i := 0; i < len(vecs); i++ { + log.Infof("[benchmark job] Start getObject: iter = %d", i) + err := j.limiter.Wait(ctx) + if err != nil { + if errors.Is(err, context.Canceled) { + ech <- err + } + } + id := i + j.dataset.Indexes + res, err := j.client.GetObject(ctx, &payload.Object_VectorRequest{ + Id: &payload.Object_ID{ + Id: strconv.Itoa(id), + }, + Filters: &payload.Filter_Config{}, + }) + log.Debug(res) + if err != nil { + select { + case <-ctx.Done(): + if errors.Is(err, context.Canceled) { + ech <- errors.Wrap(err, ctx.Err().Error()) + } else { + ech <- err + } + case ech <- err: + break + } + } + } + log.Info("[benchmark job] Finish benchmarking getObject") + return nil +} diff --git a/pkg/tools/benchmark/job/service/option.go b/pkg/tools/benchmark/job/service/option.go index 998f9f87001..330bad527e3 100644 --- a/pkg/tools/benchmark/job/service/option.go +++ b/pkg/tools/benchmark/job/service/option.go @@ -147,8 +147,20 @@ func WithJobTypeByString(t string) Option { switch t { case "userdefined": jt = USERDEFINED + case "insert": + jt = INSERT case "search": jt = SEARCH + case "update": + jt = UPDATE + case "upsert": + jt = UPSERT + case "remove": + jt = REMOVE + case "getObject": + jt = GETOBJECT + case "exists": + jt = EXISTS } return WithJobType(jt) } @@ -156,14 +168,10 @@ func WithJobTypeByString(t string) Option { // WithJobType sets the jobType for running benchmark job. func WithJobType(jt jobType) Option { return func(j *job) error { - switch jt { - case USERDEFINED: - j.jobType = jt - case SEARCH: - j.jobType = jt - default: - return errors.NewErrInvalidOption("jobType", jt) + if len(jt.String()) == 0 { + return errors.NewErrInvalidOption("jobType", jt.String()) } + j.jobType = jt return nil } } diff --git a/pkg/tools/benchmark/job/service/remove.go b/pkg/tools/benchmark/job/service/remove.go new file mode 100644 index 00000000000..18727a8186e --- /dev/null +++ b/pkg/tools/benchmark/job/service/remove.go @@ -0,0 +1,83 @@ +// +// Copyright (C) 2019-2022 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +// Package service manages the main logic of benchmark job. +package service + +import ( + "context" + "strconv" + + "github.com/vdaas/vald/apis/grpc/v1/payload" + "github.com/vdaas/vald/internal/errors" + "github.com/vdaas/vald/internal/log" +) + +func (j *job) remove(ctx context.Context, ech chan error) error { + log.Info("[benchmark job] Start benchmarking remove") + if j.removeConfig == nil { + err := errors.NewErrInvalidOption("removeConfig", j.removeConfig) + select { + case <-ctx.Done(): + if err != context.Canceled { + ech <- errors.Wrap(err, ctx.Err().Error()) + } else { + ech <- err + } + case ech <- err: + } + return err + } + + // create data + vecs := j.genVec(j.dataset) + timestamp, _ := strconv.Atoi(j.removeConfig.Timestamp) + cfg := &payload.Remove_Config{ + SkipStrictExistCheck: j.removeConfig.SkipStrictExistCheck, + Timestamp: int64(timestamp), + } + for i := 0; i < len(vecs); i++ { + log.Infof("[benchmark job] Start remove: iter = %d", i) + err := j.limiter.Wait(ctx) + if err != nil { + if errors.Is(err, context.Canceled) { + ech <- err + } + } + res, err := j.client.Remove(ctx, &payload.Remove_Request{ + Id: &payload.Object_ID{ + Id: strconv.Itoa(i + j.dataset.Indexes), + }, + Config: cfg, + }) + if err != nil { + select { + case <-ctx.Done(): + if errors.Is(err, context.Canceled) { + ech <- errors.Wrap(err, ctx.Err().Error()) + } else { + ech <- err + } + case ech <- err: + break + } + } + log.Infof("[benchmark job] Finish remove: iter= %d \n%v\n", i, res) + } + + log.Info("[benchmark job] Finish benchmarking remove") + return nil +} diff --git a/pkg/tools/benchmark/job/service/search.go b/pkg/tools/benchmark/job/service/search.go index 2d14a6d84bf..56a38922ffc 100644 --- a/pkg/tools/benchmark/job/service/search.go +++ b/pkg/tools/benchmark/job/service/search.go @@ -19,7 +19,6 @@ package service import ( "context" - "testing" "time" "github.com/vdaas/vald/apis/grpc/v1/payload" @@ -42,9 +41,8 @@ func (j *job) search(ctx context.Context, ech chan error) error { } return err } - // create data - vecs := genVec(j.hdf5.GetTest(), j.dataset) + vecs := j.genVec(j.dataset) timeout, _ := time.ParseDuration(j.searchConfig.Timeout) cfg := &payload.Search_Config{ Num: uint32(j.searchConfig.Num), @@ -53,9 +51,9 @@ func (j *job) search(ctx context.Context, ech chan error) error { Epsilon: float32(j.searchConfig.Epsilon), Timeout: timeout.Nanoseconds(), } + lres := make([]*payload.Search_Response, len(vecs)) for i := 0; i < len(vecs); i++ { - log.Infof("[benchmark job] Start search: iter = %d", i) - lres, err := j.client.LinearSearch(ctx, &payload.Search_Request{ + res, err := j.client.LinearSearch(ctx, &payload.Search_Request{ Vector: vecs[i], Config: cfg, }) @@ -71,35 +69,42 @@ func (j *job) search(ctx context.Context, ech chan error) error { } return err } - bres := testing.Benchmark(func(b *testing.B) { - b.Helper() - b.ResetTimer() - start := time.Now() - sres, err := j.client.Search(ctx, &payload.Search_Request{ - Vector: vecs[i], - Config: cfg, - }) - if err != nil { - select { - case <-ctx.Done(): - if errors.Is(err, context.Canceled) { - ech <- errors.Wrap(err, ctx.Err().Error()) - } else { - ech <- err - } - case ech <- err: - break + lres[i] = res + } + // TODO: apply rpc from crd setting params + sres := make([]*payload.Search_Response, len(vecs)) + log.Infof("[benchmark job] Start search") + for i := 0; i < len(vecs); i++ { + err := j.limiter.Wait(ctx) + if err != nil { + errors.Is(context.Canceled, err) + ech <- err + break + } + res, err := j.client.Search(ctx, &payload.Search_Request{ + Vector: vecs[i], + Config: cfg, + }) + log.Infof("[benchmark job] search %d", i) + if err != nil { + select { + case <-ctx.Done(): + if errors.Is(err, context.Canceled) { + ech <- errors.Wrap(err, ctx.Err().Error()) + } else { + ech <- err } + case ech <- err: + break } - latency := time.Since(start) - recall := calcRecall(lres.Results, sres.Results) - b.ReportMetric(recall, "recall") - b.ReportMetric(float64(latency.Microseconds()), "latency") - }) - // TODO: send metrics to the Prometeus - log.Infof("[benchmark job] Finish search bench: iter= %d \n%#v\n", i, bres) + } + sres[i] = res + } + recall := make([]float64, len(vecs)) + for i := 0; i < len(vecs); i++ { + recall[i] = calcRecall(lres[i].Results, sres[i].Results) + log.Info("[branch job] search recall: ", recall[i]) } - log.Info("[benchmark job] Finish benchmarking search") return nil } diff --git a/pkg/tools/benchmark/job/service/update.go b/pkg/tools/benchmark/job/service/update.go new file mode 100644 index 00000000000..9570671cdd1 --- /dev/null +++ b/pkg/tools/benchmark/job/service/update.go @@ -0,0 +1,86 @@ +// +// Copyright (C) 2019-2022 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +// Package service manages the main logic of benchmark job. +package service + +import ( + "context" + "strconv" + + "github.com/vdaas/vald/apis/grpc/v1/payload" + "github.com/vdaas/vald/internal/errors" + "github.com/vdaas/vald/internal/log" +) + +func (j *job) update(ctx context.Context, ech chan error) error { + log.Info("[benchmark job] Start benchmarking update") + if j.updateConfig == nil { + err := errors.NewErrInvalidOption("updateConfig", j.updateConfig) + select { + case <-ctx.Done(): + if err != context.Canceled { + ech <- errors.Wrap(err, ctx.Err().Error()) + } else { + ech <- err + } + case ech <- err: + } + return err + } + + // create data + vecs := j.genVec(j.dataset) + timestamp, _ := strconv.Atoi(j.updateConfig.Timestamp) + cfg := &payload.Update_Config{ + SkipStrictExistCheck: j.updateConfig.SkipStrictExistCheck, + Timestamp: int64(timestamp), + DisableBalancedUpdate: j.updateConfig.DisableBalancedUpdate, + } + for i := 0; i < len(vecs); i++ { + log.Infof("[benchmark job] Start update: iter = %d", i) + err := j.limiter.Wait(ctx) + if err != nil { + if errors.Is(err, context.Canceled) { + ech <- err + } + } + + id := i + j.dataset.Indexes + res, err := j.client.Update(ctx, &payload.Update_Request{ + Vector: &payload.Object_Vector{ + Id: strconv.Itoa(id), + Vector: vecs[i], + }, + Config: cfg, + }) + if err != nil { + select { + case <-ctx.Done(): + if errors.Is(err, context.Canceled) { + ech <- errors.Wrap(err, ctx.Err().Error()) + } else { + ech <- err + } + case ech <- err: + break + } + } + log.Infof("[benchmark job] iter=%d, Name=%s, Uuid=%s, Ips=%v", i, res.Name, res.Uuid, res.Ips) + } + log.Info("[benchmark job] Finish benchmarking upsert") + return nil +} diff --git a/pkg/tools/benchmark/job/service/upsert.go b/pkg/tools/benchmark/job/service/upsert.go new file mode 100644 index 00000000000..0660b8beafd --- /dev/null +++ b/pkg/tools/benchmark/job/service/upsert.go @@ -0,0 +1,86 @@ +// +// Copyright (C) 2019-2022 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +// Package service manages the main logic of benchmark job. +package service + +import ( + "context" + "strconv" + + "github.com/vdaas/vald/apis/grpc/v1/payload" + "github.com/vdaas/vald/internal/errors" + "github.com/vdaas/vald/internal/log" +) + +func (j *job) upsert(ctx context.Context, ech chan error) error { + log.Info("[benchmark job] Start benchmarking upsert") + if j.upsertConfig == nil { + err := errors.NewErrInvalidOption("upsertConfig", j.upsertConfig) + select { + case <-ctx.Done(): + if err != context.Canceled { + ech <- errors.Wrap(err, ctx.Err().Error()) + } else { + ech <- err + } + case ech <- err: + } + return err + } + + // create data + vecs := j.genVec(j.dataset) + timestamp, _ := strconv.Atoi(j.upsertConfig.Timestamp) + cfg := &payload.Upsert_Config{ + SkipStrictExistCheck: j.upsertConfig.SkipStrictExistCheck, + Timestamp: int64(timestamp), + DisableBalancedUpdate: j.upsertConfig.DisableBalancedUpdate, + } + for i := 0; i < len(vecs); i++ { + log.Infof("[benchmark job] Start upsert: iter = %d", i) + err := j.limiter.Wait(ctx) + if err != nil { + if errors.Is(err, context.Canceled) { + ech <- err + } + } + id := i + j.dataset.Indexes + res, err := j.client.Upsert(ctx, &payload.Upsert_Request{ + Vector: &payload.Object_Vector{ + Id: strconv.Itoa(id), + Vector: vecs[i], + }, + Config: cfg, + }) + if err != nil { + select { + case <-ctx.Done(): + if errors.Is(err, context.Canceled) { + ech <- errors.Wrap(err, ctx.Err().Error()) + } else { + ech <- err + } + case ech <- err: + break + } + } + log.Infof("[benchmark job] iter=%d, Name=%s, Uuid=%s, Ips=%v", i, res.Name, res.Uuid, res.Ips) + } + + log.Info("[benchmark job] Finish benchmarking upsert") + return nil +} diff --git a/pkg/tools/benchmark/operator/service/operator.go b/pkg/tools/benchmark/operator/service/operator.go index 9dc749c386f..e10ae862973 100644 --- a/pkg/tools/benchmark/operator/service/operator.go +++ b/pkg/tools/benchmark/operator/service/operator.go @@ -161,6 +161,7 @@ func (o *operator) jobReconcile(ctx context.Context, jobList map[string][]job.Jo if len(jobList) == 0 { log.Info("[reconcile job] no job is founded") o.jobs.Store(&map[string]string{}) + log.Debug("[reconcile job] finish") return } cjobs := o.getAtomicJob() @@ -215,8 +216,9 @@ func (o *operator) jobReconcile(ctx context.Context, jobList map[string][]job.Jo func (o *operator) benchJobReconcile(ctx context.Context, benchJobList map[string]v1.ValdBenchmarkJob) { log.Debugf("[reconcile benchmark job resource] job list: %#v", benchJobList) if len(benchJobList) == 0 { - o.benchjobs.Store(&(map[string]*v1.ValdBenchmarkJob{})) log.Info("[reconcile benchmark job resource] job resource not found") + o.benchjobs.Store(&(map[string]*v1.ValdBenchmarkJob{})) + log.Debug("[reconcile benchmark job resource] finish") return } cbjl := o.getAtomicBenchJob() @@ -227,7 +229,11 @@ func (o *operator) benchJobReconcile(ctx context.Context, benchJobList map[strin for k := range benchJobList { // update scenario status job := benchJobList[k] - if scenarios := o.getAtomicScenario(); scenarios != nil { + hasOwner := false + if len(job.GetOwnerReferences()) > 0 { + hasOwner = true + } + if scenarios := o.getAtomicScenario(); scenarios != nil && hasOwner { on := job.GetOwnerReferences()[0].Name if _, ok := scenarios[on]; ok { if scenarios[on].BenchJobStatus == nil { @@ -282,8 +288,7 @@ func (o *operator) benchScenarioReconcile(ctx context.Context, scenarioList map[ if len(scenarioList) == 0 { log.Info("[reconcile benchmark scenario resource]: scenario not found") o.scenarios.Store(&(map[string]*scenario{})) - o.benchjobs.Store(&(map[string]*v1.ValdBenchmarkJob{})) - o.jobs.Store(&(map[string]string{})) + log.Debug("[reconcile benchmark scenario resource] finish") return } cbsl := o.getAtomicScenario() @@ -536,16 +541,44 @@ func (o *operator) checkJobsStatus(ctx context.Context, jobs map[string]string) return err } -func (o *operator) initAtomics() { - if cbsl := o.getAtomicScenario(); len(cbsl) > 0 { - o.scenarios.Store(&(map[string]*scenario{})) - } - if cbjl := o.getAtomicBenchJob(); len(cbjl) > 0 { - o.benchjobs.Store(&(map[string]*v1.ValdBenchmarkJob{})) +// checkAtomics checks each atomic keeps consistency. +func (o *operator) checkAtomics() error { + cjl := o.getAtomicJob() + cbjl := o.getAtomicBenchJob() + cbsl := o.getAtomicScenario() + if len(cjl) == 0 { + if len(cbjl) > 0 || len(cbsl) > 0 { + log.Error("mismatch atomics: job=%v, benchjob=%v, scenario=%v", cjl, cbjl, cbsl) + return errors.ErrMismatchAtomics(cjl, cbjl, cbsl) + } + return nil + } else if len(cbjl) == 0 { + log.Error("mismatch atomics: job=%v, benchjob=%v, scenario=%v", cjl, cbjl, cbsl) + return errors.ErrMismatchAtomics(cjl, cbjl, cbsl) + } + jobCounter := len(cjl) + scenarioBenchCounter := 0 + for sc := range cbsl { + scenarioBenchCounter += len(cbsl[sc].BenchJobStatus) + } + for jobName := range cjl { + if benchJob := cbjl[jobName]; benchJob != nil { + jobCounter-- + if owner := benchJob.GetOwnerReferences(); len(owner) > 0 { + scenarioName := owner[0].Name + if scenario := cbsl[scenarioName]; scenario != nil { + if _, ok := scenario.BenchJobStatus[benchJob.GetName()]; ok { + scenarioBenchCounter-- + } + } + } + } } - if cjl := o.getAtomicJob(); len(cjl) > 0 { - o.jobs.Store(&(map[string]string{})) + if jobCounter != 0 || scenarioBenchCounter != 0 { + log.Error("mismatch atomics: job=%v, benchjob=%v, scenario=%v", cjl, cbjl, cbsl) + return errors.ErrMismatchAtomics(cjl, cbjl, cbsl) } + return nil } func (o *operator) PreStart(ctx context.Context) error { @@ -568,13 +601,13 @@ func (o *operator) Start(ctx context.Context) (<-chan error, error) { case <-ctx.Done(): return nil case <-rcticker.C: - cbsl := o.getAtomicScenario() - if cbsl == nil { - log.Info("benchmark scenario resource is empty") - // clear atomic pointer - o.initAtomics() - continue - } else { + // check mismatch atomic + err = o.checkAtomics() + if err != nil { + ech <- err + } + // determine whether benchmark scenario status should be updated. + if cbsl := o.getAtomicScenario(); cbsl != nil { scenarioStatus := make(map[string]v1.ValdBenchmarkScenarioStatus) for name, scenario := range cbsl { if scenario.Crd.Status != v1.BenchmarkScenarioCompleted {