diff --git a/internal/config/benchmark.go b/internal/config/benchmark.go index 83fb41b2dde..6cc8ba41294 100644 --- a/internal/config/benchmark.go +++ b/internal/config/benchmark.go @@ -32,11 +32,13 @@ type BenchmarkJob struct { UpsertConfig *UpsertConfig `json:"upsert_config,omitempty" yaml:"upsert_config"` SearchConfig *SearchConfig `json:"search_config,omitempty" yaml:"search_config"` RemoveConfig *RemoveConfig `json:"remove_config,omitempty" yaml:"remove_config"` + ObjectConfig *ObjectConfig `json:"object_config,omitempty" yaml:"object_config"` ClientConfig *GRPCClient `json:"client_config,omitempty" yaml:"client_config"` Rules []*BenchmarkJobRule `json:"rules,omitempty" yaml:"rules"` BeforeJobName string `json:"before_job_name,omitempty" yaml:"before_job_name"` BeforeJobNamespace string `json:"before_job_namespace,omitempty" yaml:"before_job_namespace"` Client client.Client `json:"client,omitempty" yaml:"client"` + RPC int `json:"rpc,omitempty" yaml:"rpc"` } // BenchmarkScenario represents the configuration for the internal benchmark scenario. @@ -80,14 +82,16 @@ type InsertConfig struct { // UpdateConfig defines the desired state of update config type UpdateConfig struct { - SkipStrictExistCheck bool `json:"skip_strict_exist_check,omitempty"` - Timestamp string `json:"timestamp,omitempty"` + SkipStrictExistCheck bool `json:"skip_strict_exist_check,omitempty"` + Timestamp string `json:"timestamp,omitempty"` + DisableBalancedUpdate bool `json:"disable_balanced_update,omitempty"` } // UpsertConfig defines the desired state of upsert config type UpsertConfig struct { - SkipStrictExistCheck bool `json:"skip_strict_exist_check,omitempty"` - Timestamp string `json:"timestamp,omitempty"` + SkipStrictExistCheck bool `json:"skip_strict_exist_check,omitempty"` + Timestamp string `json:"timestamp,omitempty"` + DisableBalancedUpdate bool `json:"disable_balanced_update,omitempty"` } // SearchConfig defines the desired state of search config @@ -105,6 +109,22 @@ type RemoveConfig struct { Timestamp string `json:"timestamp,omitempty"` } +// ObjectConfig defines the desired state of object config +type ObjectConfig struct { + FilterConfig FilterConfig `json:"filter_config,omitempty" yaml:"filter_config"` +} + +// FilterTarget defines the desired state of filter target +type FilterTarget struct { + Host string `json:"host,omitempty" yaml:"host"` + Port int32 `json:"port,omitempty" yaml:"port"` +} + +// FilterConfig defines the desired state of filter config +type FilterConfig struct { + Targets []*FilterTarget `json:"target,omitempty" yaml:"target"` +} + // Bind binds the actual data from the Job receiver fields. func (b *BenchmarkJob) Bind() *BenchmarkJob { b.JobType = GetActualValue(b.JobType) diff --git a/internal/errors/benchmark.go b/internal/errors/benchmark.go index 368d8a24270..61202025fb9 100644 --- a/internal/errors/benchmark.go +++ b/internal/errors/benchmark.go @@ -29,4 +29,9 @@ var ( ErrFailedToCreateJob = func(err error, jn string) error { return Wrapf(err, "could not create job: %s ", jn) } + + // ErrMismatchAtomics represents a function to generate an error that mismatch each atomic.Pointer stored corresponding to benchmark tasks. + ErrMismatchAtomics = func(job, benchjob, benchscenario interface{}) error { + return Errorf("mismatch atomics: job=%v\tbenchjob=%v\tbenchscenario=%v", job, benchjob, benchscenario) + } ) diff --git a/internal/k8s/crd/benchmark/valdbenchmarkjob.yaml b/internal/k8s/crd/benchmark/valdbenchmarkjob.yaml index cdefc9dcaa8..14ecbcb93ca 100644 --- a/internal/k8s/crd/benchmark/valdbenchmarkjob.yaml +++ b/internal/k8s/crd/benchmark/valdbenchmarkjob.yaml @@ -129,6 +129,8 @@ spec: - type type: object type: array + rpc: + type: integer insert_config: description: InsertConfig defines the desired state of insert config properties: @@ -168,6 +170,8 @@ spec: type: boolean timestamp: type: string + disable_balanced_update: + type: boolean type: object upsert_config: description: UpsertConfig defines the desired state of upsert config @@ -176,6 +180,8 @@ spec: type: boolean timestamp: type: string + disable_balanced_update: + type: boolean type: object client_config: description: ClientConfig represents the configurations for gRPC client. diff --git a/internal/k8s/crd/benchmark/valdbenchmarkscenario.yaml b/internal/k8s/crd/benchmark/valdbenchmarkscenario.yaml index 135dee3c302..f2eeafe5e3f 100644 --- a/internal/k8s/crd/benchmark/valdbenchmarkscenario.yaml +++ b/internal/k8s/crd/benchmark/valdbenchmarkscenario.yaml @@ -167,6 +167,8 @@ spec: - type type: object type: array + rpc: + type: integer insert_config: description: InsertConfig defines the desired state of insert config properties: @@ -206,6 +208,8 @@ spec: type: boolean timestamp: type: string + disable_balanced_update: + type: boolean type: object upsert_config: description: UpsertConfig defines the desired state of upsert config @@ -214,6 +218,8 @@ spec: type: boolean timestamp: type: string + disable_balanced_update: + type: boolean type: object client_config: description: ClientConfig represents the configurations for gRPC client. diff --git a/internal/k8s/rbac/benchmark/operator/clusterrole.yaml b/internal/k8s/rbac/benchmark/operator/clusterrole.yaml index acd2ffe14c4..dd91cc7529b 100644 --- a/internal/k8s/rbac/benchmark/operator/clusterrole.yaml +++ b/internal/k8s/rbac/benchmark/operator/clusterrole.yaml @@ -32,6 +32,18 @@ rules: - patch - update - watch + - apiGroups: + - batch + resources: + - jobs + verbs: + - create + - delete + - get + - list + - patch + - update + - watch - apiGroups: - vald.vdaas.org resources: diff --git a/internal/k8s/vald/benchmark/api/v1/job_types.go b/internal/k8s/vald/benchmark/api/v1/job_types.go index 40c71232d7b..e8dc0ea6043 100644 --- a/internal/k8s/vald/benchmark/api/v1/job_types.go +++ b/internal/k8s/vald/benchmark/api/v1/job_types.go @@ -23,19 +23,21 @@ import ( ) type BenchmarkJobSpec struct { - Target *BenchmarkTarget `json:"target,omitempty"` - Dataset *BenchmarkDataset `json:"dataset,omitempty"` - Dimension int `json:"dimension,omitempty"` - Replica int `json:"replica,omitempty"` - Repetition int `json:"repetition,omitempty"` - JobType string `json:"job_type,omitempty"` - InsertConfig *config.InsertConfig `json:"insert_config,omitempty"` - UpdateConfig *config.UpdateConfig `json:"update_config,omitempty"` - UpsertConfig *config.UpsertConfig `json:"upsert_config,omitempty"` - SearchConfig *config.SearchConfig `json:"search_config,omitempty"` - RemoveConfig *config.RemoveConfig `json:"remove_config,omitempty"` - ClientConfig *config.GRPCClient `json:"client_config,omitempty"` - Rules []*config.BenchmarkJobRule `json:"rules,omitempty"` + Target *BenchmarkTarget `json:"target,omitempty" yaml:"target"` + Dataset *BenchmarkDataset `json:"dataset,omitempty" yaml:"dataset"` + Dimension int `json:"dimension,omitempty" yaml:"dimension"` + Replica int `json:"replica,omitempty" yaml:"replica"` + Repetition int `json:"repetition,omitempty" yaml:"repetition"` + JobType string `json:"job_type,omitempty" yaml:"job_type"` + InsertConfig *config.InsertConfig `json:"insert_config,omitempty" yaml:"insert_config"` + UpdateConfig *config.UpdateConfig `json:"update_config,omitempty" yaml:"update_config"` + UpsertConfig *config.UpsertConfig `json:"upsert_config,omitempty" yaml:"upsert_config"` + SearchConfig *config.SearchConfig `json:"search_config,omitempty" yaml:"search_config"` + RemoveConfig *config.RemoveConfig `json:"remove_config,omitempty" yaml:"remove_config"` + ObjectConfig *config.ObjectConfig `json:"object_config,omitempty" yaml:"object_config"` + ClientConfig *config.GRPCClient `json:"client_config,omitempty" yaml:"client_config"` + Rules []*config.BenchmarkJobRule `json:"rules,omitempty" yaml:"rules"` + RPC int `json:"rpc,omitempty" yaml:"rpc"` } type BenchmarkJobStatus string diff --git a/internal/test/data/hdf5/hdf5.go b/internal/test/data/hdf5/hdf5.go index e83965ec8c1..7c9936a0c57 100644 --- a/internal/test/data/hdf5/hdf5.go +++ b/internal/test/data/hdf5/hdf5.go @@ -33,6 +33,7 @@ type Data interface { Read() error GetName() DatasetName GetPath() string + GetByGroupName(name string) [][]float32 GetTrain() [][]float32 GetTest() [][]float32 GetNeighbors() [][]int @@ -164,6 +165,27 @@ func (d *data) GetPath() string { return d.path } +// TODO: Apply generics +func (d *data) GetByGroupName(name string) [][]float32 { + switch name { + case "train": + return d.GetTrain() + case "test": + return d.GetTest() + case "neighbors": + l := d.GetNeighbors() + r := make([][]float32, 0) + for x := range l { + for y, z := range l[x] { + r[x][y] = float32(z) + } + } + return r + default: + return nil + } +} + func (d *data) GetTrain() [][]float32 { return d.train } diff --git a/internal/timeutil/rate/rate.go b/internal/timeutil/rate/rate.go new file mode 100644 index 00000000000..75adc2d4314 --- /dev/null +++ b/internal/timeutil/rate/rate.go @@ -0,0 +1,62 @@ +// +// Copyright (C) 2019-2023 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package rate + +import ( + "context" + "runtime" + + "go.uber.org/ratelimit" + "golang.org/x/time/rate" +) + +type Limiter interface { + Wait(ctx context.Context) error +} + +type limiter struct { + isStd bool + uber ratelimit.Limiter + std *rate.Limiter +} + +func NewLimiter(cnt int) Limiter { + if runtime.GOMAXPROCS(0) >= 32 { + return &limiter{ + isStd: true, + std: rate.NewLimiter(rate.Limit(cnt), 1), + } + } + return &limiter{ + isStd: false, + uber: ratelimit.New(cnt, ratelimit.WithoutSlack), + } +} + +func (l *limiter) Wait(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + if l.isStd { + return l.std.Wait(ctx) + } else { + l.uber.Take() + } + } + return nil +} diff --git a/pkg/tools/benchmark/job/config/config.go b/pkg/tools/benchmark/job/config/config.go index 34aa8e34feb..4b01aa90571 100644 --- a/pkg/tools/benchmark/job/config/config.go +++ b/pkg/tools/benchmark/job/config/config.go @@ -108,7 +108,9 @@ func NewConfig(ctx context.Context, path string) (cfg *Config, err error) { cfg.Job.UpsertConfig = jobResource.Spec.UpsertConfig cfg.Job.SearchConfig = jobResource.Spec.SearchConfig cfg.Job.RemoveConfig = jobResource.Spec.RemoveConfig + cfg.Job.ObjectConfig = jobResource.Spec.ObjectConfig cfg.Job.ClientConfig = jobResource.Spec.ClientConfig + cfg.Job.RPC = jobResource.Spec.RPC if annotations := jobResource.GetAnnotations(); annotations != nil { cfg.Job.BeforeJobName = annotations["before-job-name"] cfg.Job.BeforeJobNamespace = annotations["before-job-namespace"] diff --git a/pkg/tools/benchmark/job/service/insert.go b/pkg/tools/benchmark/job/service/insert.go new file mode 100644 index 00000000000..f2b13c2b891 --- /dev/null +++ b/pkg/tools/benchmark/job/service/insert.go @@ -0,0 +1,86 @@ +// +// Copyright (C) 2019-2022 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +// Package service manages the main logic of benchmark job. +package service + +import ( + "context" + "strconv" + + "github.com/vdaas/vald/apis/grpc/v1/payload" + "github.com/vdaas/vald/internal/errors" + "github.com/vdaas/vald/internal/log" +) + +func (j *job) insert(ctx context.Context, ech chan error) error { + log.Info("[benchmark job] Start benchmarking insert") + if j.insertConfig == nil { + err := errors.NewErrInvalidOption("insertConfig", j.insertConfig) + select { + case <-ctx.Done(): + if err != context.Canceled { + ech <- errors.Wrap(err, ctx.Err().Error()) + } else { + ech <- err + } + case ech <- err: + } + return err + } + + // create data + vecs := j.genVec(j.dataset) + timestamp, _ := strconv.Atoi(j.insertConfig.Timestamp) + cfg := &payload.Insert_Config{ + SkipStrictExistCheck: j.insertConfig.SkipStrictExistCheck, + Timestamp: int64(timestamp), + } + for i := 0; i < len(vecs); i++ { + log.Infof("[benchmark job] Start insert: iter = %d", i) + err := j.limiter.Wait(ctx) + if err != nil { + if errors.Is(err, context.Canceled) { + ech <- err + } + } + id := i + j.dataset.Indexes + res, err := j.client.Insert(ctx, &payload.Insert_Request{ + Vector: &payload.Object_Vector{ + Id: strconv.Itoa(id), + Vector: vecs[i], + }, + Config: cfg, + }) + if err != nil { + select { + case <-ctx.Done(): + if errors.Is(err, context.Canceled) { + ech <- errors.Wrap(err, ctx.Err().Error()) + } else { + ech <- err + } + case ech <- err: + break + } + } + // TODO: send metrics to the Prometeus + log.Infof("[benchmark job] Finish insert: iter= %d \n%v\n", i, res) + } + + log.Info("[benchmark job] Finish benchmarking insert") + return nil +} diff --git a/pkg/tools/benchmark/job/service/job.go b/pkg/tools/benchmark/job/service/job.go index 69c0d43f0ee..a6184a4e356 100644 --- a/pkg/tools/benchmark/job/service/job.go +++ b/pkg/tools/benchmark/job/service/job.go @@ -19,6 +19,7 @@ package service import ( "context" + "math" "os" "reflect" "syscall" @@ -34,6 +35,7 @@ import ( "github.com/vdaas/vald/internal/log" "github.com/vdaas/vald/internal/safety" "github.com/vdaas/vald/internal/test/data/hdf5" + "github.com/vdaas/vald/internal/timeutil/rate" ) type Job interface { @@ -46,15 +48,33 @@ type jobType int const ( USERDEFINED jobType = iota + INSERT SEARCH + UPDATE + UPSERT + REMOVE + GETOBJECT + EXISTS ) func (jt jobType) String() string { switch jt { case USERDEFINED: return "userdefined" + case INSERT: + return "insert" case SEARCH: return "search" + case UPDATE: + return "update" + case UPSERT: + return "upsert" + case REMOVE: + return "remove" + case GETOBJECT: + return "getObject" + case EXISTS: + return "exists" } return "" } @@ -70,12 +90,15 @@ type job struct { upsertConfig *config.UpsertConfig searchConfig *config.SearchConfig removeConfig *config.RemoveConfig + objectConfig *config.ObjectConfig client vald.Client hdf5 hdf5.Data beforeJobName string beforeJobNamespace string k8sClient client.Client beforeJobDur time.Duration + limiter rate.Limiter + rpc int } func New(opts ...Option) (Job, error) { @@ -91,12 +114,27 @@ func New(opts ...Option) (Job, error) { opt := WithJobFunc(j.jobFunc) err := opt(j) return nil, errors.ErrOptionFailed(err, reflect.ValueOf(opt)) + case INSERT: + j.jobFunc = j.insert case SEARCH: j.jobFunc = j.search + case UPDATE: + j.jobFunc = j.update + case UPSERT: + j.jobFunc = j.upsert + case REMOVE: + j.jobFunc = j.remove + case GETOBJECT: + j.jobFunc = j.getObject + case EXISTS: + j.jobFunc = j.exists } } else if j.jobType != USERDEFINED { log.Warnf("[benchmark job] userdefined jobFunc is set but jobType is set %s", j.jobType.String()) } + if j.rpc > 0 { + j.limiter = rate.NewLimiter(j.rpc) + } return j, nil } @@ -208,16 +246,21 @@ func calcRecall(linearRes, searchRes []*payload.Object_Distance) (recall float64 return recall / float64(len(linearRes)) } -func genVec(data [][]float32, cfg *config.BenchmarkDataset) [][]float32 { +func (j *job) genVec(cfg *config.BenchmarkDataset) [][]float32 { start := cfg.Range.Start end := cfg.Range.End + // If (Range.End - Range.Start) is smaller than Indexes, Indexes are prioritized based on Range.Start. if (end - start) < cfg.Indexes { end = cfg.Indexes } num := end - start + 1 - if len(data) < num { - num = len(data) - end = start + num + 1 + data := j.hdf5.GetByGroupName(cfg.Group) + if n := math.Ceil(float64(num) / float64(len(data))); n > 1 { + var def [][]float32 + for i := 0; i < int(n-1); i++ { + def = append(def, data...) + } + data = append(data, def...) } vectors := data[start : end+1] return vectors diff --git a/pkg/tools/benchmark/job/service/object.go b/pkg/tools/benchmark/job/service/object.go new file mode 100644 index 00000000000..2f81c7b4604 --- /dev/null +++ b/pkg/tools/benchmark/job/service/object.go @@ -0,0 +1,113 @@ +// +// Copyright (C) 2019-2022 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +// Package service manages the main logic of benchmark job. +package service + +import ( + "context" + "strconv" + + "github.com/vdaas/vald/apis/grpc/v1/payload" + "github.com/vdaas/vald/internal/errors" + "github.com/vdaas/vald/internal/log" +) + +func (j *job) exists(ctx context.Context, ech chan error) error { + log.Info("[benchmark job] Start benchmarking exists") + // create data + for i := j.dataset.Range.Start; i <= j.dataset.Range.End; i++ { + err := j.limiter.Wait(ctx) + if err != nil { + if errors.Is(err, context.Canceled) { + ech <- err + } + } + res, err := j.client.Exists(ctx, &payload.Object_ID{ + Id: strconv.Itoa(i), + }) + log.Debug(res) + if err != nil { + select { + case <-ctx.Done(): + if errors.Is(err, context.Canceled) { + ech <- errors.Wrap(err, ctx.Err().Error()) + } else { + ech <- err + } + case ech <- err: + break + } + } + // log.Infof("[benchmark job] iter=%d, bN=%d, Id=%s, latency=%d", i, b.N, res.Id, latency.Microseconds()) + // TODO: send metrics to the Prometeus + // log.Infof("[benchmark job] Finish exists: iter= %d \t%v", i, bres, bres) + } + log.Info("[benchmark job] Finish benchmarking exists") + return nil +} + +func (j *job) getObject(ctx context.Context, ech chan error) error { + log.Info("[benchmark job] Start benchmarking getObject") + if j.objectConfig == nil { + log.Warnf("[benchmark job] No get object config is set: %v", j.objectConfig) + } + + // create data + vecs := j.genVec(j.dataset) + for i := 0; i < len(vecs); i++ { + log.Infof("[benchmark job] Start getObject: iter = %d", i) + err := j.limiter.Wait(ctx) + if err != nil { + if errors.Is(err, context.Canceled) { + ech <- err + } + } + id := i + j.dataset.Indexes + res, err := j.client.GetObject(ctx, &payload.Object_VectorRequest{ + Id: &payload.Object_ID{ + Id: strconv.Itoa(id), + }, + Filters: &payload.Filter_Config{ + Targets: func() []*payload.Filter_Target { + t := make([]*payload.Filter_Target, len(j.objectConfig.FilterConfig.Targets)) + for i, target := range j.objectConfig.FilterConfig.Targets { + t[i] = &payload.Filter_Target{ + Host: target.Host, + Port: uint32(target.Port), + } + } + return t + }(), + }, + }) + log.Debug(res) + if err != nil { + select { + case <-ctx.Done(): + if errors.Is(err, context.Canceled) { + ech <- errors.Wrap(err, ctx.Err().Error()) + } else { + ech <- err + } + case ech <- err: + break + } + } + } + log.Info("[benchmark job] Finish benchmarking getObject") + return nil +} diff --git a/pkg/tools/benchmark/job/service/option.go b/pkg/tools/benchmark/job/service/option.go index 998f9f87001..905583eaf5f 100644 --- a/pkg/tools/benchmark/job/service/option.go +++ b/pkg/tools/benchmark/job/service/option.go @@ -35,6 +35,7 @@ var defaultOpts = []Option{ // TODO: set default config for client WithDimension(748), WithBeforeJobDuration("30s"), + WithRPC(100), } // WithDimension sets the vector's dimension for running benchmark job with dataset. @@ -97,6 +98,16 @@ func WithRemoveConfig(c *config.RemoveConfig) Option { } } +// WithObjectConfig sets the get object API config for running get object request job. +func WithObjectConfig(c *config.ObjectConfig) Option { + return func(j *job) error { + if c != nil { + j.objectConfig = c + } + return nil + } +} + // WithValdClient sets the Vald client for sending request to the target Vald cluster. func WithValdClient(c vald.Client) Option { return func(j *job) error { @@ -147,8 +158,20 @@ func WithJobTypeByString(t string) Option { switch t { case "userdefined": jt = USERDEFINED + case "insert": + jt = INSERT case "search": jt = SEARCH + case "update": + jt = UPDATE + case "upsert": + jt = UPSERT + case "remove": + jt = REMOVE + case "getObject": + jt = GETOBJECT + case "exists": + jt = EXISTS } return WithJobType(jt) } @@ -156,14 +179,10 @@ func WithJobTypeByString(t string) Option { // WithJobType sets the jobType for running benchmark job. func WithJobType(jt jobType) Option { return func(j *job) error { - switch jt { - case USERDEFINED: - j.jobType = jt - case SEARCH: - j.jobType = jt - default: - return errors.NewErrInvalidOption("jobType", jt) + if len(jt.String()) == 0 { + return errors.NewErrInvalidOption("jobType", jt.String()) } + j.jobType = jt return nil } } @@ -223,3 +242,13 @@ func WithK8sClient(cli client.Client) Option { return nil } } + +// WithRPC sets the rpc for sending request per seconds to the target Vald cluster. +func WithRPC(rpc int) Option { + return func(j *job) error { + if rpc > 0 { + j.rpc = rpc + } + return nil + } +} diff --git a/pkg/tools/benchmark/job/service/remove.go b/pkg/tools/benchmark/job/service/remove.go new file mode 100644 index 00000000000..18727a8186e --- /dev/null +++ b/pkg/tools/benchmark/job/service/remove.go @@ -0,0 +1,83 @@ +// +// Copyright (C) 2019-2022 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +// Package service manages the main logic of benchmark job. +package service + +import ( + "context" + "strconv" + + "github.com/vdaas/vald/apis/grpc/v1/payload" + "github.com/vdaas/vald/internal/errors" + "github.com/vdaas/vald/internal/log" +) + +func (j *job) remove(ctx context.Context, ech chan error) error { + log.Info("[benchmark job] Start benchmarking remove") + if j.removeConfig == nil { + err := errors.NewErrInvalidOption("removeConfig", j.removeConfig) + select { + case <-ctx.Done(): + if err != context.Canceled { + ech <- errors.Wrap(err, ctx.Err().Error()) + } else { + ech <- err + } + case ech <- err: + } + return err + } + + // create data + vecs := j.genVec(j.dataset) + timestamp, _ := strconv.Atoi(j.removeConfig.Timestamp) + cfg := &payload.Remove_Config{ + SkipStrictExistCheck: j.removeConfig.SkipStrictExistCheck, + Timestamp: int64(timestamp), + } + for i := 0; i < len(vecs); i++ { + log.Infof("[benchmark job] Start remove: iter = %d", i) + err := j.limiter.Wait(ctx) + if err != nil { + if errors.Is(err, context.Canceled) { + ech <- err + } + } + res, err := j.client.Remove(ctx, &payload.Remove_Request{ + Id: &payload.Object_ID{ + Id: strconv.Itoa(i + j.dataset.Indexes), + }, + Config: cfg, + }) + if err != nil { + select { + case <-ctx.Done(): + if errors.Is(err, context.Canceled) { + ech <- errors.Wrap(err, ctx.Err().Error()) + } else { + ech <- err + } + case ech <- err: + break + } + } + log.Infof("[benchmark job] Finish remove: iter= %d \n%v\n", i, res) + } + + log.Info("[benchmark job] Finish benchmarking remove") + return nil +} diff --git a/pkg/tools/benchmark/job/service/search.go b/pkg/tools/benchmark/job/service/search.go index 2d14a6d84bf..56a38922ffc 100644 --- a/pkg/tools/benchmark/job/service/search.go +++ b/pkg/tools/benchmark/job/service/search.go @@ -19,7 +19,6 @@ package service import ( "context" - "testing" "time" "github.com/vdaas/vald/apis/grpc/v1/payload" @@ -42,9 +41,8 @@ func (j *job) search(ctx context.Context, ech chan error) error { } return err } - // create data - vecs := genVec(j.hdf5.GetTest(), j.dataset) + vecs := j.genVec(j.dataset) timeout, _ := time.ParseDuration(j.searchConfig.Timeout) cfg := &payload.Search_Config{ Num: uint32(j.searchConfig.Num), @@ -53,9 +51,9 @@ func (j *job) search(ctx context.Context, ech chan error) error { Epsilon: float32(j.searchConfig.Epsilon), Timeout: timeout.Nanoseconds(), } + lres := make([]*payload.Search_Response, len(vecs)) for i := 0; i < len(vecs); i++ { - log.Infof("[benchmark job] Start search: iter = %d", i) - lres, err := j.client.LinearSearch(ctx, &payload.Search_Request{ + res, err := j.client.LinearSearch(ctx, &payload.Search_Request{ Vector: vecs[i], Config: cfg, }) @@ -71,35 +69,42 @@ func (j *job) search(ctx context.Context, ech chan error) error { } return err } - bres := testing.Benchmark(func(b *testing.B) { - b.Helper() - b.ResetTimer() - start := time.Now() - sres, err := j.client.Search(ctx, &payload.Search_Request{ - Vector: vecs[i], - Config: cfg, - }) - if err != nil { - select { - case <-ctx.Done(): - if errors.Is(err, context.Canceled) { - ech <- errors.Wrap(err, ctx.Err().Error()) - } else { - ech <- err - } - case ech <- err: - break + lres[i] = res + } + // TODO: apply rpc from crd setting params + sres := make([]*payload.Search_Response, len(vecs)) + log.Infof("[benchmark job] Start search") + for i := 0; i < len(vecs); i++ { + err := j.limiter.Wait(ctx) + if err != nil { + errors.Is(context.Canceled, err) + ech <- err + break + } + res, err := j.client.Search(ctx, &payload.Search_Request{ + Vector: vecs[i], + Config: cfg, + }) + log.Infof("[benchmark job] search %d", i) + if err != nil { + select { + case <-ctx.Done(): + if errors.Is(err, context.Canceled) { + ech <- errors.Wrap(err, ctx.Err().Error()) + } else { + ech <- err } + case ech <- err: + break } - latency := time.Since(start) - recall := calcRecall(lres.Results, sres.Results) - b.ReportMetric(recall, "recall") - b.ReportMetric(float64(latency.Microseconds()), "latency") - }) - // TODO: send metrics to the Prometeus - log.Infof("[benchmark job] Finish search bench: iter= %d \n%#v\n", i, bres) + } + sres[i] = res + } + recall := make([]float64, len(vecs)) + for i := 0; i < len(vecs); i++ { + recall[i] = calcRecall(lres[i].Results, sres[i].Results) + log.Info("[branch job] search recall: ", recall[i]) } - log.Info("[benchmark job] Finish benchmarking search") return nil } diff --git a/pkg/tools/benchmark/job/service/update.go b/pkg/tools/benchmark/job/service/update.go new file mode 100644 index 00000000000..9570671cdd1 --- /dev/null +++ b/pkg/tools/benchmark/job/service/update.go @@ -0,0 +1,86 @@ +// +// Copyright (C) 2019-2022 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +// Package service manages the main logic of benchmark job. +package service + +import ( + "context" + "strconv" + + "github.com/vdaas/vald/apis/grpc/v1/payload" + "github.com/vdaas/vald/internal/errors" + "github.com/vdaas/vald/internal/log" +) + +func (j *job) update(ctx context.Context, ech chan error) error { + log.Info("[benchmark job] Start benchmarking update") + if j.updateConfig == nil { + err := errors.NewErrInvalidOption("updateConfig", j.updateConfig) + select { + case <-ctx.Done(): + if err != context.Canceled { + ech <- errors.Wrap(err, ctx.Err().Error()) + } else { + ech <- err + } + case ech <- err: + } + return err + } + + // create data + vecs := j.genVec(j.dataset) + timestamp, _ := strconv.Atoi(j.updateConfig.Timestamp) + cfg := &payload.Update_Config{ + SkipStrictExistCheck: j.updateConfig.SkipStrictExistCheck, + Timestamp: int64(timestamp), + DisableBalancedUpdate: j.updateConfig.DisableBalancedUpdate, + } + for i := 0; i < len(vecs); i++ { + log.Infof("[benchmark job] Start update: iter = %d", i) + err := j.limiter.Wait(ctx) + if err != nil { + if errors.Is(err, context.Canceled) { + ech <- err + } + } + + id := i + j.dataset.Indexes + res, err := j.client.Update(ctx, &payload.Update_Request{ + Vector: &payload.Object_Vector{ + Id: strconv.Itoa(id), + Vector: vecs[i], + }, + Config: cfg, + }) + if err != nil { + select { + case <-ctx.Done(): + if errors.Is(err, context.Canceled) { + ech <- errors.Wrap(err, ctx.Err().Error()) + } else { + ech <- err + } + case ech <- err: + break + } + } + log.Infof("[benchmark job] iter=%d, Name=%s, Uuid=%s, Ips=%v", i, res.Name, res.Uuid, res.Ips) + } + log.Info("[benchmark job] Finish benchmarking upsert") + return nil +} diff --git a/pkg/tools/benchmark/job/service/upsert.go b/pkg/tools/benchmark/job/service/upsert.go new file mode 100644 index 00000000000..0660b8beafd --- /dev/null +++ b/pkg/tools/benchmark/job/service/upsert.go @@ -0,0 +1,86 @@ +// +// Copyright (C) 2019-2022 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +// Package service manages the main logic of benchmark job. +package service + +import ( + "context" + "strconv" + + "github.com/vdaas/vald/apis/grpc/v1/payload" + "github.com/vdaas/vald/internal/errors" + "github.com/vdaas/vald/internal/log" +) + +func (j *job) upsert(ctx context.Context, ech chan error) error { + log.Info("[benchmark job] Start benchmarking upsert") + if j.upsertConfig == nil { + err := errors.NewErrInvalidOption("upsertConfig", j.upsertConfig) + select { + case <-ctx.Done(): + if err != context.Canceled { + ech <- errors.Wrap(err, ctx.Err().Error()) + } else { + ech <- err + } + case ech <- err: + } + return err + } + + // create data + vecs := j.genVec(j.dataset) + timestamp, _ := strconv.Atoi(j.upsertConfig.Timestamp) + cfg := &payload.Upsert_Config{ + SkipStrictExistCheck: j.upsertConfig.SkipStrictExistCheck, + Timestamp: int64(timestamp), + DisableBalancedUpdate: j.upsertConfig.DisableBalancedUpdate, + } + for i := 0; i < len(vecs); i++ { + log.Infof("[benchmark job] Start upsert: iter = %d", i) + err := j.limiter.Wait(ctx) + if err != nil { + if errors.Is(err, context.Canceled) { + ech <- err + } + } + id := i + j.dataset.Indexes + res, err := j.client.Upsert(ctx, &payload.Upsert_Request{ + Vector: &payload.Object_Vector{ + Id: strconv.Itoa(id), + Vector: vecs[i], + }, + Config: cfg, + }) + if err != nil { + select { + case <-ctx.Done(): + if errors.Is(err, context.Canceled) { + ech <- errors.Wrap(err, ctx.Err().Error()) + } else { + ech <- err + } + case ech <- err: + break + } + } + log.Infof("[benchmark job] iter=%d, Name=%s, Uuid=%s, Ips=%v", i, res.Name, res.Uuid, res.Ips) + } + + log.Info("[benchmark job] Finish benchmarking upsert") + return nil +} diff --git a/pkg/tools/benchmark/job/usecase/benchmarkd.go b/pkg/tools/benchmark/job/usecase/benchmarkd.go index 4bd9917b324..3c79c5e3bfe 100644 --- a/pkg/tools/benchmark/job/usecase/benchmarkd.go +++ b/pkg/tools/benchmark/job/usecase/benchmarkd.go @@ -99,10 +99,12 @@ func New(cfg *config.Config) (r runner.Runner, err error) { service.WithUpsertConfig(cfg.Job.UpsertConfig), service.WithSearchConfig(cfg.Job.SearchConfig), service.WithRemoveConfig(cfg.Job.RemoveConfig), + service.WithObjectConfig(cfg.Job.ObjectConfig), service.WithHdf5(d), service.WithBeforeJobName(cfg.Job.BeforeJobName), service.WithBeforeJobNamespace(cfg.Job.BeforeJobNamespace), service.WithK8sClient(cfg.Job.Client), + service.WithRPC(cfg.Job.RPC), ) if err != nil { return nil, err diff --git a/pkg/tools/benchmark/operator/service/operator.go b/pkg/tools/benchmark/operator/service/operator.go index 9dc749c386f..e10ae862973 100644 --- a/pkg/tools/benchmark/operator/service/operator.go +++ b/pkg/tools/benchmark/operator/service/operator.go @@ -161,6 +161,7 @@ func (o *operator) jobReconcile(ctx context.Context, jobList map[string][]job.Jo if len(jobList) == 0 { log.Info("[reconcile job] no job is founded") o.jobs.Store(&map[string]string{}) + log.Debug("[reconcile job] finish") return } cjobs := o.getAtomicJob() @@ -215,8 +216,9 @@ func (o *operator) jobReconcile(ctx context.Context, jobList map[string][]job.Jo func (o *operator) benchJobReconcile(ctx context.Context, benchJobList map[string]v1.ValdBenchmarkJob) { log.Debugf("[reconcile benchmark job resource] job list: %#v", benchJobList) if len(benchJobList) == 0 { - o.benchjobs.Store(&(map[string]*v1.ValdBenchmarkJob{})) log.Info("[reconcile benchmark job resource] job resource not found") + o.benchjobs.Store(&(map[string]*v1.ValdBenchmarkJob{})) + log.Debug("[reconcile benchmark job resource] finish") return } cbjl := o.getAtomicBenchJob() @@ -227,7 +229,11 @@ func (o *operator) benchJobReconcile(ctx context.Context, benchJobList map[strin for k := range benchJobList { // update scenario status job := benchJobList[k] - if scenarios := o.getAtomicScenario(); scenarios != nil { + hasOwner := false + if len(job.GetOwnerReferences()) > 0 { + hasOwner = true + } + if scenarios := o.getAtomicScenario(); scenarios != nil && hasOwner { on := job.GetOwnerReferences()[0].Name if _, ok := scenarios[on]; ok { if scenarios[on].BenchJobStatus == nil { @@ -282,8 +288,7 @@ func (o *operator) benchScenarioReconcile(ctx context.Context, scenarioList map[ if len(scenarioList) == 0 { log.Info("[reconcile benchmark scenario resource]: scenario not found") o.scenarios.Store(&(map[string]*scenario{})) - o.benchjobs.Store(&(map[string]*v1.ValdBenchmarkJob{})) - o.jobs.Store(&(map[string]string{})) + log.Debug("[reconcile benchmark scenario resource] finish") return } cbsl := o.getAtomicScenario() @@ -536,16 +541,44 @@ func (o *operator) checkJobsStatus(ctx context.Context, jobs map[string]string) return err } -func (o *operator) initAtomics() { - if cbsl := o.getAtomicScenario(); len(cbsl) > 0 { - o.scenarios.Store(&(map[string]*scenario{})) - } - if cbjl := o.getAtomicBenchJob(); len(cbjl) > 0 { - o.benchjobs.Store(&(map[string]*v1.ValdBenchmarkJob{})) +// checkAtomics checks each atomic keeps consistency. +func (o *operator) checkAtomics() error { + cjl := o.getAtomicJob() + cbjl := o.getAtomicBenchJob() + cbsl := o.getAtomicScenario() + if len(cjl) == 0 { + if len(cbjl) > 0 || len(cbsl) > 0 { + log.Error("mismatch atomics: job=%v, benchjob=%v, scenario=%v", cjl, cbjl, cbsl) + return errors.ErrMismatchAtomics(cjl, cbjl, cbsl) + } + return nil + } else if len(cbjl) == 0 { + log.Error("mismatch atomics: job=%v, benchjob=%v, scenario=%v", cjl, cbjl, cbsl) + return errors.ErrMismatchAtomics(cjl, cbjl, cbsl) + } + jobCounter := len(cjl) + scenarioBenchCounter := 0 + for sc := range cbsl { + scenarioBenchCounter += len(cbsl[sc].BenchJobStatus) + } + for jobName := range cjl { + if benchJob := cbjl[jobName]; benchJob != nil { + jobCounter-- + if owner := benchJob.GetOwnerReferences(); len(owner) > 0 { + scenarioName := owner[0].Name + if scenario := cbsl[scenarioName]; scenario != nil { + if _, ok := scenario.BenchJobStatus[benchJob.GetName()]; ok { + scenarioBenchCounter-- + } + } + } + } } - if cjl := o.getAtomicJob(); len(cjl) > 0 { - o.jobs.Store(&(map[string]string{})) + if jobCounter != 0 || scenarioBenchCounter != 0 { + log.Error("mismatch atomics: job=%v, benchjob=%v, scenario=%v", cjl, cbjl, cbsl) + return errors.ErrMismatchAtomics(cjl, cbjl, cbsl) } + return nil } func (o *operator) PreStart(ctx context.Context) error { @@ -568,13 +601,13 @@ func (o *operator) Start(ctx context.Context) (<-chan error, error) { case <-ctx.Done(): return nil case <-rcticker.C: - cbsl := o.getAtomicScenario() - if cbsl == nil { - log.Info("benchmark scenario resource is empty") - // clear atomic pointer - o.initAtomics() - continue - } else { + // check mismatch atomic + err = o.checkAtomics() + if err != nil { + ech <- err + } + // determine whether benchmark scenario status should be updated. + if cbsl := o.getAtomicScenario(); cbsl != nil { scenarioStatus := make(map[string]v1.ValdBenchmarkScenarioStatus) for name, scenario := range cbsl { if scenario.Crd.Status != v1.BenchmarkScenarioCompleted {