diff --git a/.github/workflows/dockers-benchmark-job-image.yml b/.github/workflows/dockers-benchmark-job-image.yml index b45c698aabd..28a239f96f5 100644 --- a/.github/workflows/dockers-benchmark-job-image.yml +++ b/.github/workflows/dockers-benchmark-job-image.yml @@ -1,5 +1,5 @@ # -# Copyright (C) 2019-2022 vdaas.org vald team +# Copyright (C) 2019-2023 vdaas.org vald team # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/apis/grpc/v1/benchmark/benchmark.pb.go b/apis/grpc/v1/benchmark/benchmark.pb.go index 62c2735b802..69a3a42d391 100644 --- a/apis/grpc/v1/benchmark/benchmark.pb.go +++ b/apis/grpc/v1/benchmark/benchmark.pb.go @@ -1,5 +1,5 @@ // -// Copyright (C) 2019-2022 vdaas.org vald team +// Copyright (C) 2019-2023 vdaas.org vald team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/apis/grpc/v1/benchmark/benchmark_vtproto.pb.go b/apis/grpc/v1/benchmark/benchmark_vtproto.pb.go index 450737e3544..d0c6147ab29 100644 --- a/apis/grpc/v1/benchmark/benchmark_vtproto.pb.go +++ b/apis/grpc/v1/benchmark/benchmark_vtproto.pb.go @@ -1,5 +1,5 @@ // -// Copyright (C) 2019-2022 vdaas.org vald team +// Copyright (C) 2019-2023 vdaas.org vald team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/apis/proto/v1/benchmark/benchmark.proto b/apis/proto/v1/benchmark/benchmark.proto index f798b27e050..cc1bbc95963 100644 --- a/apis/proto/v1/benchmark/benchmark.proto +++ b/apis/proto/v1/benchmark/benchmark.proto @@ -1,5 +1,5 @@ // -// Copyright (C) 2019-2022 vdaas.org vald team +// Copyright (C) 2019-2023 vdaas.org vald team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/cmd/tools/benchmark/job/main.go b/cmd/tools/benchmark/job/main.go index b63331da0f3..c7e31931de2 100644 --- a/cmd/tools/benchmark/job/main.go +++ b/cmd/tools/benchmark/job/main.go @@ -1,5 +1,5 @@ // -// Copyright (C) 2019-2022 vdaas.org vald team +// Copyright (C) 2019-2023 vdaas.org vald team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/cmd/tools/benchmark/job/sample.yaml b/cmd/tools/benchmark/job/sample.yaml index 76dd83f8ea2..b9ee53ab515 100644 --- a/cmd/tools/benchmark/job/sample.yaml +++ b/cmd/tools/benchmark/job/sample.yaml @@ -1,5 +1,5 @@ # -# Copyright (C) 2019-2022 vdaas.org vald team +# Copyright (C) 2019-2023 vdaas.org vald team # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/cmd/tools/benchmark/operator/main.go b/cmd/tools/benchmark/operator/main.go index 13660db9838..a6799e2922f 100644 --- a/cmd/tools/benchmark/operator/main.go +++ b/cmd/tools/benchmark/operator/main.go @@ -1,5 +1,5 @@ // -// Copyright (C) 2019-2022 vdaas.org vald team +// Copyright (C) 2019-2023 vdaas.org vald team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/go.mod b/go.mod index 664863bbe02..b4709760abf 100755 --- a/go.mod +++ b/go.mod @@ -386,6 +386,7 @@ require ( go.opentelemetry.io/otel/trace v1.11.2 go.uber.org/automaxprocs v0.0.0-00010101000000-000000000000 go.uber.org/goleak v1.2.1 + go.uber.org/ratelimit v0.2.0 go.uber.org/zap v1.24.0 gocloud.dev v0.0.0-00010101000000-000000000000 golang.org/x/net v0.15.0 @@ -394,6 +395,7 @@ require ( golang.org/x/sys v0.12.0 golang.org/x/text v0.13.0 golang.org/x/tools v0.13.0 + golang.org/x/time v0.3.0 gonum.org/v1/hdf5 v0.0.0-00010101000000-000000000000 gonum.org/v1/plot v0.10.1 google.golang.org/genproto/googleapis/api v0.0.0-20230913181813-007df8e322eb @@ -418,6 +420,7 @@ require ( github.com/Masterminds/goutils v1.1.1 // indirect github.com/Masterminds/semver v1.5.0 // indirect github.com/ajstarks/svgo v0.0.0-20211024235047-1546f124cd8b // indirect + github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/campoy/embedmd v1.0.0 // indirect github.com/cenkalti/backoff/v4 v4.1.3 // indirect diff --git a/go.sum b/go.sum index 69e26ece2d7..bf12cf592ce 100644 --- a/go.sum +++ b/go.sum @@ -169,6 +169,8 @@ github.com/ajstarks/svgo v0.0.0-20211024235047-1546f124cd8b h1:slYM766cy2nI3BwyR github.com/ajstarks/svgo v0.0.0-20211024235047-1546f124cd8b/go.mod h1:1KcenG0jGWcpt8ov532z81sp/kMMUG485J2InIOyADM= github.com/akavel/rsrc v0.8.0/go.mod h1:uLoCtb9J+EyAqh+26kdrTgmzRBFPGOolLWKpdxkKq+c= github.com/akavel/rsrc v0.10.2/go.mod h1:uLoCtb9J+EyAqh+26kdrTgmzRBFPGOolLWKpdxkKq+c= +github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129 h1:MzBOUgng9orim59UnfUTLRjMpd09C5uEVQ6RPGeCaVI= +github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129/go.mod h1:rFgpPQZYZ8vdbc+48xibu8ALc3yeyd64IhHS+PU6Yyg= github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/andybalholm/stroke v0.0.0-20221221101821-bd29b49d73f0/go.mod h1:ccdDYaY5+gO+cbnQdFxEXqfy0RkoV25H3jLXUDNM3wg= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= diff --git a/internal/config/benchmark.go b/internal/config/benchmark.go index 83fb41b2dde..0361ddb294f 100644 --- a/internal/config/benchmark.go +++ b/internal/config/benchmark.go @@ -1,5 +1,5 @@ // -// Copyright (C) 2019-2022 vdaas.org vald team +// Copyright (C) 2019-2023 vdaas.org vald team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -17,33 +17,32 @@ // Package config providers configuration type and load configuration logic package config -import "github.com/vdaas/vald/internal/k8s/client" - // BenchmarkJob represents the configuration for the internal benchmark search job. type BenchmarkJob struct { - Target *BenchmarkTarget `json:"target,omitempty" yaml:"target"` - Dataset *BenchmarkDataset `json:"dataset,omitempty" yaml:"dataset"` - Dimension int `json:"dimension,omitempty" yaml:"dimension"` - Replica int `json:"replica,omitempty" yaml:"replica"` - Repetition int `json:"repetition,omitempty" yaml:"repetition"` - JobType string `json:"job_type,omitempty" yaml:"job_type"` - InsertConfig *InsertConfig `json:"insert_config,omitempty" yaml:"insert_config"` - UpdateConfig *UpdateConfig `json:"update_config,omitempty" yaml:"update_config"` - UpsertConfig *UpsertConfig `json:"upsert_config,omitempty" yaml:"upsert_config"` - SearchConfig *SearchConfig `json:"search_config,omitempty" yaml:"search_config"` - RemoveConfig *RemoveConfig `json:"remove_config,omitempty" yaml:"remove_config"` - ClientConfig *GRPCClient `json:"client_config,omitempty" yaml:"client_config"` - Rules []*BenchmarkJobRule `json:"rules,omitempty" yaml:"rules"` - BeforeJobName string `json:"before_job_name,omitempty" yaml:"before_job_name"` + Target *BenchmarkTarget `json:"target,omitempty" yaml:"target"` + Dataset *BenchmarkDataset `json:"dataset,omitempty" yaml:"dataset"` + Dimension int `json:"dimension,omitempty" yaml:"dimension"` + Replica int `json:"replica,omitempty" yaml:"replica"` + Repetition int `json:"repetition,omitempty" yaml:"repetition"` + JobType string `json:"job_type,omitempty" yaml:"job_type"` + InsertConfig *InsertConfig `json:"insert_config,omitempty" yaml:"insert_config"` + UpdateConfig *UpdateConfig `json:"update_config,omitempty" yaml:"update_config"` + UpsertConfig *UpsertConfig `json:"upsert_config,omitempty" yaml:"upsert_config"` + SearchConfig *SearchConfig `json:"search_config,omitempty" yaml:"search_config"` + RemoveConfig *RemoveConfig `json:"remove_config,omitempty" yaml:"remove_config"` + ObjectConfig *ObjectConfig `json:"object_config,omitempty" yaml:"object_config"` + ClientConfig *GRPCClient `json:"client_config,omitempty" yaml:"client_config"` + Rules []*BenchmarkJobRule `json:"rules,omitempty" yaml:"rules"` + BeforeJobName string `json:"before_job_name,omitempty" yaml:"before_job_name"` BeforeJobNamespace string `json:"before_job_namespace,omitempty" yaml:"before_job_namespace"` - Client client.Client `json:"client,omitempty" yaml:"client"` + RPS int `json:"rps,omitempty" yaml:"rps"` } // BenchmarkScenario represents the configuration for the internal benchmark scenario. type BenchmarkScenario struct { - Target *BenchmarkTarget `json:"target" yaml:"target"` - Dataset *BenchmarkDataset `jon:"dataset" yaml:"dataset"` - Jobs []*BenchmarkJob `job:"jobs" yaml:jobs` + Target *BenchmarkTarget `json:"target,omitempty" yaml:"target"` + Dataset *BenchmarkDataset `json:"dataset,omitempty" yaml:"dataset"` + Jobs []*BenchmarkJob `json:"jobs,omitempty" yaml:"jobs"` } // BenchmarkTarget defines the desired state of BenchmarkTarget @@ -52,6 +51,11 @@ type BenchmarkTarget struct { Port int `json:"port,omitempty"` } +func (t *BenchmarkTarget) Bind() *BenchmarkTarget { + t.Host = GetActualValue(t.Host) + return t +} + // BenchmarkDataset defines the desired state of BenchmarkDateset type BenchmarkDataset struct { Name string `json:"name,omitempty"` @@ -60,6 +64,11 @@ type BenchmarkDataset struct { Range *BenchmarkDatasetRange `json:"range,omitempty"` } +func (d *BenchmarkDataset) Bind() *BenchmarkDataset { + d.Name = GetActualValue(d.Name) + return d +} + // BenchmarkDatasetRange defines the desired state of BenchmarkDatesetRange type BenchmarkDatasetRange struct { Start int `json:"start,omitempty"` @@ -72,22 +81,45 @@ type BenchmarkJobRule struct { Type string `json:"type,omitempty"` } +func (r *BenchmarkJobRule) Bind() *BenchmarkJobRule { + r.Name = GetActualValue(r.Name) + r.Type = GetActualValue(r.Type) + return r +} + // InsertConfig defines the desired state of insert config type InsertConfig struct { SkipStrictExistCheck bool `json:"skip_strict_exist_check,omitempty"` Timestamp string `json:"timestamp,omitempty"` } +func (cfg *InsertConfig) Bind() *InsertConfig { + cfg.Timestamp = GetActualValue(cfg.Timestamp) + return cfg +} + // UpdateConfig defines the desired state of update config type UpdateConfig struct { - SkipStrictExistCheck bool `json:"skip_strict_exist_check,omitempty"` - Timestamp string `json:"timestamp,omitempty"` + SkipStrictExistCheck bool `json:"skip_strict_exist_check,omitempty"` + Timestamp string `json:"timestamp,omitempty"` + DisableBalancedUpdate bool `json:"disable_balanced_update,omitempty"` +} + +func (cfg *UpdateConfig) Bind() *UpdateConfig { + cfg.Timestamp = GetActualValue(cfg.Timestamp) + return cfg } // UpsertConfig defines the desired state of upsert config type UpsertConfig struct { - SkipStrictExistCheck bool `json:"skip_strict_exist_check,omitempty"` - Timestamp string `json:"timestamp,omitempty"` + SkipStrictExistCheck bool `json:"skip_strict_exist_check,omitempty"` + Timestamp string `json:"timestamp,omitempty"` + DisableBalancedUpdate bool `json:"disable_balanced_update,omitempty"` +} + +func (cfg *UpsertConfig) Bind() *UpsertConfig { + cfg.Timestamp = GetActualValue(cfg.Timestamp) + return cfg } // SearchConfig defines the desired state of search config @@ -99,19 +131,93 @@ type SearchConfig struct { Timeout string `json:"timeout,omitempty"` } +func (cfg *SearchConfig) Bind() *SearchConfig { + cfg.Timeout = GetActualValue(cfg.Timeout) + return cfg +} + // RemoveConfig defines the desired state of remove config type RemoveConfig struct { SkipStrictExistCheck bool `json:"skip_strict_exist_check,omitempty"` Timestamp string `json:"timestamp,omitempty"` } +func (cfg *RemoveConfig) Bind() *RemoveConfig { + cfg.Timestamp = GetActualValue(cfg.Timestamp) + return cfg +} + +// ObjectConfig defines the desired state of object config +type ObjectConfig struct { + FilterConfig FilterConfig `json:"filter_config,omitempty" yaml:"filter_config"` +} + +func (cfg *ObjectConfig) Bind() *ObjectConfig { + cfg.FilterConfig = *cfg.FilterConfig.Bind() + return cfg +} + +// FilterTarget defines the desired state of filter target +type FilterTarget struct { + Host string `json:"host,omitempty" yaml:"host"` + Port int32 `json:"port,omitempty" yaml:"port"` +} + +func (cfg *FilterTarget) Bind() *FilterTarget { + cfg.Host = GetActualValue(cfg.Host) + return cfg +} + +// FilterConfig defines the desired state of filter config +type FilterConfig struct { + Targets []*FilterTarget `json:"target,omitempty" yaml:"target"` +} + +func (cfg *FilterConfig) Bind() *FilterConfig { + for i := 0; i < len(cfg.Targets); i++ { + cfg.Targets[i] = cfg.Targets[i].Bind() + } + return cfg +} + // Bind binds the actual data from the Job receiver fields. func (b *BenchmarkJob) Bind() *BenchmarkJob { b.JobType = GetActualValue(b.JobType) + b.BeforeJobName = GetActualValue(b.BeforeJobName) + b.BeforeJobNamespace = GetActualValue(b.BeforeJobNamespace) + if b.Target != nil { + b.Target = b.Target.Bind() + } + if b.Dataset != nil { + b.Dataset = b.Dataset.Bind() + } + if b.InsertConfig != nil { + b.InsertConfig = b.InsertConfig.Bind() + } + if b.UpdateConfig != nil { + b.UpdateConfig = b.UpdateConfig.Bind() + } + if b.UpsertConfig != nil { + b.UpsertConfig = b.UpsertConfig.Bind() + } + if b.SearchConfig != nil { + b.SearchConfig = b.SearchConfig.Bind() + } + if b.RemoveConfig != nil { + b.RemoveConfig = b.RemoveConfig.Bind() + } + if b.ObjectConfig != nil { + b.ObjectConfig = b.ObjectConfig.Bind() + } if b.ClientConfig != nil { b.ClientConfig = b.ClientConfig.Bind() } + if len(b.Rules) > 0 { + for i := 0; i < len(b.Rules); i++ { + b.Rules[i] = b.Rules[i].Bind() + } + } return b } diff --git a/internal/errors/benchmark.go b/internal/errors/benchmark.go index d4fa5cd3c4e..aa5d5c4b877 100644 --- a/internal/errors/benchmark.go +++ b/internal/errors/benchmark.go @@ -29,4 +29,9 @@ var ( ErrFailedToCreateJob = func(err error, jn string) error { return Wrapf(err, "could not create job: %s ", jn) } + + // ErrMismatchBenchmarkAtomics represents a function to generate an error that mismatch each atomic.Pointer stored corresponding to benchmark tasks. + ErrMismatchBenchmarkAtomics = func(job, benchjob, benchscenario interface{}) error { + return Errorf("mismatch atomics: job=%v\tbenchjob=%v\tbenchscenario=%v", job, benchjob, benchscenario) + } ) diff --git a/internal/k8s/client/client.go b/internal/k8s/client/client.go index 8cd8f7f6f87..2b3647ebd14 100644 --- a/internal/k8s/client/client.go +++ b/internal/k8s/client/client.go @@ -1,5 +1,5 @@ // -// Copyright (C) 2019-2022 vdaas.org vald team +// Copyright (C) 2019-2023 vdaas.org vald team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/internal/k8s/client/option.go b/internal/k8s/client/option.go index cae27f67758..6d2d9a624ba 100644 --- a/internal/k8s/client/option.go +++ b/internal/k8s/client/option.go @@ -1,5 +1,5 @@ // -// Copyright (C) 2019-2022 vdaas.org vald team +// Copyright (C) 2019-2023 vdaas.org vald team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/internal/k8s/crd/benchmark/valdbenchmarkjob.yaml b/internal/k8s/crd/benchmark/valdbenchmarkjob.yaml index cdefc9dcaa8..dbaf7547cc1 100644 --- a/internal/k8s/crd/benchmark/valdbenchmarkjob.yaml +++ b/internal/k8s/crd/benchmark/valdbenchmarkjob.yaml @@ -1,5 +1,5 @@ # -# Copyright (C) 2019-2022 vdaas.org vald team +# Copyright (C) 2019-2023 vdaas.org vald team # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -129,6 +129,8 @@ spec: - type type: object type: array + rps: + type: integer insert_config: description: InsertConfig defines the desired state of insert config properties: @@ -168,6 +170,8 @@ spec: type: boolean timestamp: type: string + disable_balanced_update: + type: boolean type: object upsert_config: description: UpsertConfig defines the desired state of upsert config @@ -176,6 +180,8 @@ spec: type: boolean timestamp: type: string + disable_balanced_update: + type: boolean type: object client_config: description: ClientConfig represents the configurations for gRPC client. diff --git a/internal/k8s/crd/benchmark/valdbenchmarkscenario.yaml b/internal/k8s/crd/benchmark/valdbenchmarkscenario.yaml index 135dee3c302..32010539ef1 100644 --- a/internal/k8s/crd/benchmark/valdbenchmarkscenario.yaml +++ b/internal/k8s/crd/benchmark/valdbenchmarkscenario.yaml @@ -1,5 +1,5 @@ # -# Copyright (C) 2019-2022 vdaas.org vald team +# Copyright (C) 2019-2023 vdaas.org vald team # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -167,6 +167,8 @@ spec: - type type: object type: array + rps: + type: integer insert_config: description: InsertConfig defines the desired state of insert config properties: @@ -206,6 +208,8 @@ spec: type: boolean timestamp: type: string + disable_balanced_update: + type: boolean type: object upsert_config: description: UpsertConfig defines the desired state of upsert config @@ -214,6 +218,8 @@ spec: type: boolean timestamp: type: string + disable_balanced_update: + type: boolean type: object client_config: description: ClientConfig represents the configurations for gRPC client. diff --git a/internal/k8s/job/job.go b/internal/k8s/job/job.go index 026baa69e2d..aa07449d08c 100644 --- a/internal/k8s/job/job.go +++ b/internal/k8s/job/job.go @@ -1,4 +1,4 @@ -// Copyright (C) 2019-2022 vdaas.org vald team +// Copyright (C) 2019-2023 vdaas.org vald team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -20,6 +20,9 @@ import ( "sync" "time" + "github.com/vdaas/vald/internal/errors" + "github.com/vdaas/vald/internal/k8s" + "github.com/vdaas/vald/internal/log" batchv1 "k8s.io/api/batch/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" "sigs.k8s.io/controller-runtime/pkg/builder" @@ -28,10 +31,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" - - "github.com/vdaas/vald/internal/errors" - "github.com/vdaas/vald/internal/k8s" - "github.com/vdaas/vald/internal/log" ) // JobWatcher is a type alias for k8s resource controller. diff --git a/internal/k8s/job/option.go b/internal/k8s/job/option.go index 283f4b156c5..7752092de48 100644 --- a/internal/k8s/job/option.go +++ b/internal/k8s/job/option.go @@ -1,4 +1,4 @@ -// Copyright (C) 2019-2022 vdaas.org vald team +// Copyright (C) 2019-2023 vdaas.org vald team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/internal/k8s/vald/benchmark/api/v1/info.go b/internal/k8s/vald/benchmark/api/v1/info.go index 1ad2f03a4a6..5fd71cf8cc8 100644 --- a/internal/k8s/vald/benchmark/api/v1/info.go +++ b/internal/k8s/vald/benchmark/api/v1/info.go @@ -1,5 +1,5 @@ // -// Copyright (C) 2019-2022 vdaas.org vald team +// Copyright (C) 2019-2023 vdaas.org vald team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/internal/k8s/vald/benchmark/api/v1/job_types.go b/internal/k8s/vald/benchmark/api/v1/job_types.go index 40c71232d7b..135d1e42e95 100644 --- a/internal/k8s/vald/benchmark/api/v1/job_types.go +++ b/internal/k8s/vald/benchmark/api/v1/job_types.go @@ -1,5 +1,5 @@ // -// Copyright (C) 2019-2022 vdaas.org vald team +// Copyright (C) 2019-2023 vdaas.org vald team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -23,19 +23,21 @@ import ( ) type BenchmarkJobSpec struct { - Target *BenchmarkTarget `json:"target,omitempty"` - Dataset *BenchmarkDataset `json:"dataset,omitempty"` - Dimension int `json:"dimension,omitempty"` - Replica int `json:"replica,omitempty"` - Repetition int `json:"repetition,omitempty"` - JobType string `json:"job_type,omitempty"` - InsertConfig *config.InsertConfig `json:"insert_config,omitempty"` - UpdateConfig *config.UpdateConfig `json:"update_config,omitempty"` - UpsertConfig *config.UpsertConfig `json:"upsert_config,omitempty"` - SearchConfig *config.SearchConfig `json:"search_config,omitempty"` - RemoveConfig *config.RemoveConfig `json:"remove_config,omitempty"` - ClientConfig *config.GRPCClient `json:"client_config,omitempty"` - Rules []*config.BenchmarkJobRule `json:"rules,omitempty"` + Target *BenchmarkTarget `json:"target,omitempty" yaml:"target"` + Dataset *BenchmarkDataset `json:"dataset,omitempty" yaml:"dataset"` + Dimension int `json:"dimension,omitempty" yaml:"dimension"` + Replica int `json:"replica,omitempty" yaml:"replica"` + Repetition int `json:"repetition,omitempty" yaml:"repetition"` + JobType string `json:"job_type,omitempty" yaml:"job_type"` + InsertConfig *config.InsertConfig `json:"insert_config,omitempty" yaml:"insert_config"` + UpdateConfig *config.UpdateConfig `json:"update_config,omitempty" yaml:"update_config"` + UpsertConfig *config.UpsertConfig `json:"upsert_config,omitempty" yaml:"upsert_config"` + SearchConfig *config.SearchConfig `json:"search_config,omitempty" yaml:"search_config"` + RemoveConfig *config.RemoveConfig `json:"remove_config,omitempty" yaml:"remove_config"` + ObjectConfig *config.ObjectConfig `json:"object_config,omitempty" yaml:"object_config"` + ClientConfig *config.GRPCClient `json:"client_config,omitempty" yaml:"client_config"` + Rules []*config.BenchmarkJobRule `json:"rules,omitempty" yaml:"rules"` + RPS int `json:"rps,omitempty" yaml:"rps"` } type BenchmarkJobStatus string diff --git a/internal/k8s/vald/benchmark/api/v1/scenario_types.go b/internal/k8s/vald/benchmark/api/v1/scenario_types.go index 8973a1e6b5b..ddedb493714 100644 --- a/internal/k8s/vald/benchmark/api/v1/scenario_types.go +++ b/internal/k8s/vald/benchmark/api/v1/scenario_types.go @@ -1,5 +1,5 @@ // -// Copyright (C) 2019-2022 vdaas.org vald team +// Copyright (C) 2019-2023 vdaas.org vald team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/internal/k8s/vald/benchmark/job/doc.go b/internal/k8s/vald/benchmark/job/doc.go index 678bce94ab7..0567d4f35bb 100644 --- a/internal/k8s/vald/benchmark/job/doc.go +++ b/internal/k8s/vald/benchmark/job/doc.go @@ -1,5 +1,5 @@ // -// Copyright (C) 2019-2022 vdaas.org vald team +// Copyright (C) 2019-2023 vdaas.org vald team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/internal/k8s/vald/benchmark/job/job.go b/internal/k8s/vald/benchmark/job/job.go index 3cfed17ed6d..87a7efe22a5 100644 --- a/internal/k8s/vald/benchmark/job/job.go +++ b/internal/k8s/vald/benchmark/job/job.go @@ -1,5 +1,5 @@ // -// Copyright (C) 2019-2022 vdaas.org vald team +// Copyright (C) 2019-2023 vdaas.org vald team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -38,7 +38,7 @@ type BenchmarkJobWatcher k8s.ResourceController var ( // GroupVersion is group version used to register these objects - GroupVersion = schema.GroupVersion{Group: "vald.benchmark.job", Version: "v1"} + GroupVersion = schema.GroupVersion{Group: "vald.vdaas.org", Version: "v1"} // SchemeBuilder is used to add go types to the GroupVersionKind scheme SchemeBuilder = &scheme.Builder{GroupVersion: GroupVersion} // AddToScheme adds the types in this group-version to the given scheme. diff --git a/internal/k8s/vald/benchmark/job/job_template.go b/internal/k8s/vald/benchmark/job/job_template.go index 2830e239311..884addcde59 100644 --- a/internal/k8s/vald/benchmark/job/job_template.go +++ b/internal/k8s/vald/benchmark/job/job_template.go @@ -1,5 +1,5 @@ // -// Copyright (C) 2019-2022 vdaas.org vald team +// Copyright (C) 2019-2023 vdaas.org vald team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/internal/k8s/vald/benchmark/job/job_template_option.go b/internal/k8s/vald/benchmark/job/job_template_option.go index 79b29a81b85..f5cc78f8756 100644 --- a/internal/k8s/vald/benchmark/job/job_template_option.go +++ b/internal/k8s/vald/benchmark/job/job_template_option.go @@ -1,5 +1,5 @@ // -// Copyright (C) 2019-2022 vdaas.org vald team +// Copyright (C) 2019-2023 vdaas.org vald team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/internal/k8s/vald/benchmark/job/option.go b/internal/k8s/vald/benchmark/job/option.go index bf834ec347d..aece8a869dc 100644 --- a/internal/k8s/vald/benchmark/job/option.go +++ b/internal/k8s/vald/benchmark/job/option.go @@ -1,5 +1,5 @@ // -// Copyright (C) 2019-2022 vdaas.org vald team +// Copyright (C) 2019-2023 vdaas.org vald team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/internal/k8s/vald/benchmark/scenario/doc.go b/internal/k8s/vald/benchmark/scenario/doc.go index c3800f67233..b93a52b69b7 100644 --- a/internal/k8s/vald/benchmark/scenario/doc.go +++ b/internal/k8s/vald/benchmark/scenario/doc.go @@ -1,5 +1,5 @@ // -// Copyright (C) 2019-2022 vdaas.org vald team +// Copyright (C) 2019-2023 vdaas.org vald team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/internal/k8s/vald/benchmark/scenario/option.go b/internal/k8s/vald/benchmark/scenario/option.go index fcbc83af47f..adda6adfeec 100644 --- a/internal/k8s/vald/benchmark/scenario/option.go +++ b/internal/k8s/vald/benchmark/scenario/option.go @@ -1,5 +1,5 @@ // -// Copyright (C) 2019-2022 vdaas.org vald team +// Copyright (C) 2019-2023 vdaas.org vald team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/internal/k8s/vald/benchmark/scenario/scenario.go b/internal/k8s/vald/benchmark/scenario/scenario.go index c3e99b49870..cde92a808de 100644 --- a/internal/k8s/vald/benchmark/scenario/scenario.go +++ b/internal/k8s/vald/benchmark/scenario/scenario.go @@ -1,5 +1,5 @@ // -// Copyright (C) 2019-2022 vdaas.org vald team +// Copyright (C) 2019-2023 vdaas.org vald team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/internal/test/data/hdf5/doc.go b/internal/test/data/hdf5/doc.go index f8650c005ef..0db75d556ed 100644 --- a/internal/test/data/hdf5/doc.go +++ b/internal/test/data/hdf5/doc.go @@ -1,5 +1,5 @@ // -// Copyright (C) 2019-2022 vdaas.org vald team +// Copyright (C) 2019-2023 vdaas.org vald team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/internal/test/data/hdf5/hdf5.go b/internal/test/data/hdf5/hdf5.go index e83965ec8c1..f178a52c2e1 100644 --- a/internal/test/data/hdf5/hdf5.go +++ b/internal/test/data/hdf5/hdf5.go @@ -1,5 +1,5 @@ // -// Copyright (C) 2019-2022 vdaas.org vald team +// Copyright (C) 2019-2023 vdaas.org vald team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -21,11 +21,10 @@ import ( "os" "reflect" - "gonum.org/v1/hdf5" - "github.com/vdaas/vald/internal/errors" "github.com/vdaas/vald/internal/io" "github.com/vdaas/vald/internal/net/http/client" + "gonum.org/v1/hdf5" ) type Data interface { @@ -33,6 +32,7 @@ type Data interface { Read() error GetName() DatasetName GetPath() string + GetByGroupName(name string) [][]float32 GetTrain() [][]float32 GetTest() [][]float32 GetNeighbors() [][]int @@ -164,6 +164,27 @@ func (d *data) GetPath() string { return d.path } +// TODO: Apply generics +func (d *data) GetByGroupName(name string) [][]float32 { + switch name { + case "train": + return d.GetTrain() + case "test": + return d.GetTest() + case "neighbors": + l := d.GetNeighbors() + r := make([][]float32, 0) + for x := range l { + for y, z := range l[x] { + r[x][y] = float32(z) + } + } + return r + default: + return nil + } +} + func (d *data) GetTrain() [][]float32 { return d.train } diff --git a/internal/test/data/hdf5/option.go b/internal/test/data/hdf5/option.go index c18bcbd5e44..6fee9d0ae37 100644 --- a/internal/test/data/hdf5/option.go +++ b/internal/test/data/hdf5/option.go @@ -1,5 +1,5 @@ // -// Copyright (C) 2019-2022 vdaas.org vald team +// Copyright (C) 2019-2023 vdaas.org vald team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/internal/timeutil/rate/rate.go b/internal/timeutil/rate/rate.go new file mode 100644 index 00000000000..045b999a9a9 --- /dev/null +++ b/internal/timeutil/rate/rate.go @@ -0,0 +1,61 @@ +// +// Copyright (C) 2019-2023 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package rate + +import ( + "context" + "runtime" + + "go.uber.org/ratelimit" + "golang.org/x/time/rate" +) + +type Limiter interface { + Wait(ctx context.Context) error +} + +type limiter struct { + isStd bool + uber ratelimit.Limiter + std *rate.Limiter +} + +func NewLimiter(cnt int) Limiter { + if runtime.GOMAXPROCS(0) >= 32 { + return &limiter{ + isStd: true, + std: rate.NewLimiter(rate.Limit(cnt), 1), + } + } + return &limiter{ + isStd: false, + uber: ratelimit.New(cnt, ratelimit.WithoutSlack), + } +} + +func (l *limiter) Wait(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + if l.isStd { + return l.std.Wait(ctx) + } + l.uber.Take() + } + return nil +} diff --git a/internal/k8s/rbac/benchmark/job/clusterrole.yaml b/k8s/tools/benchmark/job/clusterrole.yaml similarity index 91% rename from internal/k8s/rbac/benchmark/job/clusterrole.yaml rename to k8s/tools/benchmark/job/clusterrole.yaml index 4cdc233ec49..3fe77f9445c 100644 --- a/internal/k8s/rbac/benchmark/job/clusterrole.yaml +++ b/k8s/tools/benchmark/job/clusterrole.yaml @@ -1,5 +1,5 @@ # -# Copyright (C) 2019-2022 vdaas.org vald team +# Copyright (C) 2019-2023 vdaas.org vald team # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -16,9 +16,8 @@ apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRole metadata: - creationTimestamp: null - name: benchmark-job-role # TODO: fix - namespace: default # TODO: fix + name: benchmark-job-role + namespace: default rules: - apiGroups: - apps diff --git a/internal/k8s/rbac/benchmark/job/clusterrolebinding.yaml b/k8s/tools/benchmark/job/clusterrolebinding.yaml similarity index 80% rename from internal/k8s/rbac/benchmark/job/clusterrolebinding.yaml rename to k8s/tools/benchmark/job/clusterrolebinding.yaml index f856e1b7535..46b5b27c632 100644 --- a/internal/k8s/rbac/benchmark/job/clusterrolebinding.yaml +++ b/k8s/tools/benchmark/job/clusterrolebinding.yaml @@ -1,5 +1,5 @@ # -# Copyright (C) 2019-2022 vdaas.org vald team +# Copyright (C) 2019-2023 vdaas.org vald team # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -20,8 +20,8 @@ metadata: roleRef: apiGroup: rbac.authorization.k8s.io kind: ClusterRole - name: benchmark-job-role # TODO: fix + name: benchmark-job-role subjects: - kind: ServiceAccount - name: benchmark-job-service # TODO: fix - namespace: default # TODO: fix + name: benchmark-job-service + namespace: default diff --git a/internal/k8s/rbac/benchmark/job/serviceaccount.yaml b/k8s/tools/benchmark/job/serviceaccount.yaml similarity index 81% rename from internal/k8s/rbac/benchmark/job/serviceaccount.yaml rename to k8s/tools/benchmark/job/serviceaccount.yaml index 3bf6b046cab..419a7f8f375 100644 --- a/internal/k8s/rbac/benchmark/job/serviceaccount.yaml +++ b/k8s/tools/benchmark/job/serviceaccount.yaml @@ -1,5 +1,5 @@ # -# Copyright (C) 2019-2022 vdaas.org vald team +# Copyright (C) 2019-2023 vdaas.org vald team # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -16,5 +16,5 @@ apiVersoin: v1 kind: ServiceAccount metadata: - name: benchmark-job-service # TODO: fix - namespace: default # TODO: fix + name: benchmark-job-service + namespace: default diff --git a/internal/k8s/rbac/benchmark/job/svc.yaml b/k8s/tools/benchmark/job/svc.yaml similarity index 81% rename from internal/k8s/rbac/benchmark/job/svc.yaml rename to k8s/tools/benchmark/job/svc.yaml index ce79a170e1d..fd78cebd0e7 100644 --- a/internal/k8s/rbac/benchmark/job/svc.yaml +++ b/k8s/tools/benchmark/job/svc.yaml @@ -1,5 +1,5 @@ # -# Copyright (C) 2019-2022 vdaas.org vald team +# Copyright (C) 2019-2023 vdaas.org vald team # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -20,11 +20,11 @@ metadata: spec: ports: - name: prometheus - port: 4000 # TODO: fix - targetPort: 4000 # TODO: fix + port: 4000 # FIX ME + targetPort: 4000 # FIX ME protocol: TCP selector: - app.kubernetes.io/name: benchmark-job-svc # TODO: fix + app.kubernetes.io/name: benchmark-job-svc app.kubernetes.io/component: benchmark-job clusterIP: None type: ClusterIP diff --git a/internal/k8s/rbac/benchmark/operator/clusterrole.yaml b/k8s/tools/benchmark/operator/clusterrole.yaml similarity index 86% rename from internal/k8s/rbac/benchmark/operator/clusterrole.yaml rename to k8s/tools/benchmark/operator/clusterrole.yaml index acd2ffe14c4..0b648889895 100644 --- a/internal/k8s/rbac/benchmark/operator/clusterrole.yaml +++ b/k8s/tools/benchmark/operator/clusterrole.yaml @@ -1,5 +1,5 @@ # -# Copyright (C) 2019-2022 vdaas.org vald team +# Copyright (C) 2019-2023 vdaas.org vald team # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -16,9 +16,8 @@ apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRole metadata: - creationTimestamp: null - name: benchmark-scenario-role # TODO: fix - namespace: default # TODO: fix + name: benchmark-operator-role + namespace: default rules: - apiGroups: - apps @@ -32,6 +31,18 @@ rules: - patch - update - watch + - apiGroups: + - batch + resources: + - jobs + verbs: + - create + - delete + - get + - list + - patch + - update + - watch - apiGroups: - vald.vdaas.org resources: diff --git a/internal/k8s/rbac/benchmark/operator/clusterrolebinding.yaml b/k8s/tools/benchmark/operator/clusterrolebinding.yaml similarity index 76% rename from internal/k8s/rbac/benchmark/operator/clusterrolebinding.yaml rename to k8s/tools/benchmark/operator/clusterrolebinding.yaml index e87baf62420..d4506545169 100644 --- a/internal/k8s/rbac/benchmark/operator/clusterrolebinding.yaml +++ b/k8s/tools/benchmark/operator/clusterrolebinding.yaml @@ -1,5 +1,5 @@ # -# Copyright (C) 2019-2022 vdaas.org vald team +# Copyright (C) 2019-2023 vdaas.org vald team # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -16,12 +16,12 @@ apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRoleBinding metadata: - name: benchmark-scenario-rolebinding + name: benchmark-operator-rolebinding roleRef: apiGroup: rbac.authorization.k8s.io kind: ClusterRole - name: benchmark-scenario-role # TODO: fix + name: benchmark-operator-role subjects: - kind: ServiceAccount - name: vald-benchmark-operator # TODO: fix - namespace: default # TODO: fix + name: vald-benchmark-operator + namespace: default diff --git a/k8s/tools/benchmark/operator/configmap.yaml b/k8s/tools/benchmark/operator/configmap.yaml new file mode 100644 index 00000000000..7ac38b6e511 --- /dev/null +++ b/k8s/tools/benchmark/operator/configmap.yaml @@ -0,0 +1,166 @@ +# +# Copyright (C) 2019-2023 vdaas.org vald team +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +apiVersion: v1 +kind: ConfigMap +metadata: + name: vald-benchmark-operator-config + labels: + app.kubernetes.io/name: vald + app.kubernetes.io/component: vald-benchmark-operator +data: + config.yaml: | + --- + version: v0.0.0 + time_zone: JST + logging: + format: raw + level: debug + logger: glg + server_config: + servers: + - name: grpc + host: 0.0.0.0 + port: 8081 + grpc: + bidirectional_stream_concurrency: 20 + connection_timeout: "" + header_table_size: 0 + initial_conn_window_size: 0 + initial_window_size: 0 + interceptors: [] + keepalive: + max_conn_age: "" + max_conn_age_grace: "" + max_conn_idle: "" + time: "" + timeout: "" + max_header_list_size: 0 + max_receive_message_size: 0 + max_send_message_size: 0 + read_buffer_size: 0 + write_buffer_size: 0 + mode: GRPC + probe_wait_time: 3s + restart: true + health_check_servers: + - name: liveness + host: 0.0.0.0 + port: 3000 + http: + handler_timeout: "" + idle_timeout: "" + read_header_timeout: "" + read_timeout: "" + shutdown_duration: 5s + write_timeout: "" + mode: "" + probe_wait_time: 3s + - name: readiness + host: 0.0.0.0 + port: 3001 + http: + handler_timeout: "" + idle_timeout: "" + read_header_timeout: "" + read_timeout: "" + shutdown_duration: 0s + write_timeout: "" + mode: "" + probe_wait_time: 3s + metrics_servers: + startup_strategy: + - liveness + - grpc + - readiness + full_shutdown_duration: 600s + tls: + ca: /path/to/ca + cert: /path/to/cert + enabled: false + key: /path/to/key + observability: + enabled: false + collector: + duration: 5s + metrics: + enable_cgo: true + enable_goroutine: true + enable_memory: true + enable_version_info: true + version_info_labels: + - vald_version + - server_name + - git_commit + - build_time + - go_version + - go_os + - go_arch + - ngt_version + trace: + enabled: false + sampling_rate: 1 + prometheus: + enabled: false + endpoint: /metrics + namespace: vald + jaeger: + enabled: false + collector_endpoint: "" + agent_endpoint: "jaeger-agent.default.svc.cluster.local:6831" + username: "" + password: "" + service_name: "vald-benchmark-job" + buffer_max_count: 10 + stackdriver: + project_id: "" + client: + api_key: "" + audiences: [] + authentication_enabled: true + credentials_file: "" + credentials_json: "" + endpoint: "" + quota_project: "" + request_reason: "" + scopes: [] + telemetry_enabled: true + user_agent: "" + exporter: + bundle_count_threshold: 0 + bundle_delay_threshold: "0" + location: "" + metric_prefix: vald.vdaas.org + monitoring_enabled: false + number_of_workers: 1 + reporting_interval: 1m + skip_cmd: false + timeout: 5s + trace_spans_buffer_max_bytes: 0 + tracing_enabled: false + profiler: + enabled: false + service: "vald-benchmark-job" + service_version: "" + debug_logging: false + mutex_profiling: true + cpu_profiling: true + alloc_profiling: true + heap_profiling: true + goroutine_profiling: true + alloc_force_gc: false + api_addr: "" + instance: "" + zone: "" diff --git a/k8s/tools/benchmark/operator/deployment.yaml b/k8s/tools/benchmark/operator/deployment.yaml new file mode 100644 index 00000000000..a5a5bd241b8 --- /dev/null +++ b/k8s/tools/benchmark/operator/deployment.yaml @@ -0,0 +1,51 @@ +# +# Copyright (C) 2019-2023 vdaas.org vald team +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +apiVersion: apps/v1 +kind: Deployment +metadata: + name: vald-benchmark-operator + labels: + app: vald-benchmark-operator +spec: + replicas: 1 + selector: + matchLabels: + app: vald-benchmark-operator + template: + metadata: + labels: + app: vald-benchmark-operator + spec: + containers: + - name: vald-benchmark-operator + image: vdaas/vald-benchmark-operator:latest + imagePullPolicy: Always + volumeMounts: + - name: vald-benchmark-operator-config + mountPath: /etc/server/ + ports: + - name: liveness + protocol: TCP + containerPort: 3000 + - name: readiness + protocol: TCP + containerPort: 3001 + serviceAccountName: vald-benchmark-operator + volumes: + - name: vald-benchmark-operator-config + configMap: + defaultMode: 420 + name: vald-benchmark-operator-config diff --git a/internal/k8s/rbac/benchmark/operator/serviceaccount.yaml b/k8s/tools/benchmark/operator/serviceaccount.yaml similarity index 80% rename from internal/k8s/rbac/benchmark/operator/serviceaccount.yaml rename to k8s/tools/benchmark/operator/serviceaccount.yaml index 612c1bf84a4..617a413a166 100644 --- a/internal/k8s/rbac/benchmark/operator/serviceaccount.yaml +++ b/k8s/tools/benchmark/operator/serviceaccount.yaml @@ -1,5 +1,5 @@ # -# Copyright (C) 2019-2022 vdaas.org vald team +# Copyright (C) 2019-2023 vdaas.org vald team # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -16,5 +16,5 @@ apiVersion: v1 kind: ServiceAccount metadata: - name: vald-benchmark-operator # TODO: fix - namespace: default # TODO: fix + name: vald-benchmark-operator + namespace: default diff --git a/pkg/tools/benchmark/job/config/config.go b/pkg/tools/benchmark/job/config/config.go index 34aa8e34feb..ad1cdb839a9 100644 --- a/pkg/tools/benchmark/job/config/config.go +++ b/pkg/tools/benchmark/job/config/config.go @@ -1,5 +1,5 @@ // -// Copyright (C) 2019-2022 vdaas.org vald team +// Copyright (C) 2019-2023 vdaas.org vald team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -43,11 +43,16 @@ type Config struct { // Job represents benchmark job configurations Job *config.BenchmarkJob `json:"job" yaml:"job"` + + // K8sClient represents kubernetes clients + K8sClient client.Client } var ( - NAMESPACE = os.Getenv("CRD_NAMESPACE") - NAME = os.Getenv("CRD_NAME") + NAMESPACE = os.Getenv("CRD_NAMESPACE") + NAME = os.Getenv("CRD_NAME") + JOBNAME_ANNOTATION = "before-job-name" + JOBNAMESPACE_ANNOTATION = "before-job-namespace" ) // NewConfig represents the set config from the given setting file path. @@ -85,15 +90,15 @@ func NewConfig(ctx context.Context, path string) (cfg *Config, err error) { // Get config from applied ValdBenchmarkJob custom resource var jobResource v1.ValdBenchmarkJob - if cfg.Job.Client == nil { + if cfg.K8sClient == nil { c, err := client.New(client.WithSchemeBuilder(*v1.SchemeBuilder)) if err != nil { log.Error(err.Error()) return nil, err } - cfg.Job.Client = c + cfg.K8sClient = c } - err = cfg.Job.Client.Get(ctx, NAME, NAMESPACE, &jobResource) + err = cfg.K8sClient.Get(ctx, NAME, NAMESPACE, &jobResource) if err != nil { log.Warn(err.Error()) } else { @@ -108,10 +113,12 @@ func NewConfig(ctx context.Context, path string) (cfg *Config, err error) { cfg.Job.UpsertConfig = jobResource.Spec.UpsertConfig cfg.Job.SearchConfig = jobResource.Spec.SearchConfig cfg.Job.RemoveConfig = jobResource.Spec.RemoveConfig + cfg.Job.ObjectConfig = jobResource.Spec.ObjectConfig cfg.Job.ClientConfig = jobResource.Spec.ClientConfig + cfg.Job.RPS = jobResource.Spec.RPS if annotations := jobResource.GetAnnotations(); annotations != nil { - cfg.Job.BeforeJobName = annotations["before-job-name"] - cfg.Job.BeforeJobNamespace = annotations["before-job-namespace"] + cfg.Job.BeforeJobName = annotations[JOBNAME_ANNOTATION] + cfg.Job.BeforeJobNamespace = annotations[JOBNAMESPACE_ANNOTATION] } } diff --git a/pkg/tools/benchmark/job/config/config_test.go b/pkg/tools/benchmark/job/config/config_test.go index dc35aae48ba..57dcee5efdc 100644 --- a/pkg/tools/benchmark/job/config/config_test.go +++ b/pkg/tools/benchmark/job/config/config_test.go @@ -1,5 +1,5 @@ // -// Copyright (C) 2019-2022 vdaas.org vald team +// Copyright (C) 2019-2023 vdaas.org vald team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -20,12 +20,10 @@ package config import ( "context" "io/fs" - "os" "testing" "github.com/vdaas/vald/internal/config" "github.com/vdaas/vald/internal/errors" - "github.com/vdaas/vald/internal/file" "github.com/vdaas/vald/internal/test/comparator" "github.com/vdaas/vald/internal/test/goleak" ) @@ -59,26 +57,12 @@ func TestNewConfig(t *testing.T) { } tests := []test{ func() test { - name := "/home/vankichi/Documents/vald-read-test.yaml" - path := name + var path string return test{ name: "return error when can't read file", args: args{ path: path, }, - beforeFunc: func(t *testing.T, a args) { - t.Helper() - f, err := file.Open(a.path, os.O_CREATE, fs.ModeIrregular) - if err != nil { - if errors.Is(err, fs.ErrPermission) { - return - } - t.Error(err) - } - if err := f.Close(); err != nil { - t.Error(err) - } - }, checkFunc: func(w want, gotCfg *Config, err error) error { if errors.Is(err, fs.ErrPermission) { return nil @@ -86,17 +70,14 @@ func TestNewConfig(t *testing.T) { if !errors.Is(err, w.err) { return errors.Errorf("got_error: \"%#v\",\n\t\t\t\twant: \"%#v\"", err, w.err) } - return nil - }, - afterFunc: func(t *testing.T, a args) { - t.Helper() - if err := os.Remove(a.path); err != nil { - t.Fatal(err) + if gotCfg != nil { + return errors.Errorf("got cfg: \"%#v\",\n\t\t\t\twant: \"%#v\"", gotCfg, nil) } + return nil }, want: want{ wantCfg: nil, - err: errors.ErrUnsupportedConfigFileType(".yaml"), + err: errors.ErrPathNotSpecified, }, } }(), diff --git a/pkg/tools/benchmark/job/config/doc.go b/pkg/tools/benchmark/job/config/doc.go index 57ef74c464a..04fa0ebe9bf 100644 --- a/pkg/tools/benchmark/job/config/doc.go +++ b/pkg/tools/benchmark/job/config/doc.go @@ -1,5 +1,5 @@ // -// Copyright (C) 2019-2022 vdaas.org vald team +// Copyright (C) 2019-2023 vdaas.org vald team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/tools/benchmark/job/handler/doc.go b/pkg/tools/benchmark/job/handler/doc.go index f1014141ea4..825bcc7f61f 100644 --- a/pkg/tools/benchmark/job/handler/doc.go +++ b/pkg/tools/benchmark/job/handler/doc.go @@ -1,5 +1,5 @@ // -// Copyright (C) 2019-2022 vdaas.org vald team +// Copyright (C) 2019-2023 vdaas.org vald team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/tools/benchmark/job/handler/grpc/handler.go b/pkg/tools/benchmark/job/handler/grpc/handler.go index 85be642c7d8..7febc72d11d 100644 --- a/pkg/tools/benchmark/job/handler/grpc/handler.go +++ b/pkg/tools/benchmark/job/handler/grpc/handler.go @@ -1,5 +1,5 @@ // -// Copyright (C) 2019-2022 vdaas.org vald team +// Copyright (C) 2019-2023 vdaas.org vald team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/tools/benchmark/job/handler/grpc/option.go b/pkg/tools/benchmark/job/handler/grpc/option.go index 4319ef3a748..d5d9c32d1e5 100644 --- a/pkg/tools/benchmark/job/handler/grpc/option.go +++ b/pkg/tools/benchmark/job/handler/grpc/option.go @@ -1,5 +1,5 @@ // -// Copyright (C) 2019-2022 vdaas.org vald team +// Copyright (C) 2019-2023 vdaas.org vald team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/tools/benchmark/job/handler/rest/handler.go b/pkg/tools/benchmark/job/handler/rest/handler.go index f752dbbbbf4..1b9494c3d57 100644 --- a/pkg/tools/benchmark/job/handler/rest/handler.go +++ b/pkg/tools/benchmark/job/handler/rest/handler.go @@ -1,5 +1,5 @@ // -// Copyright (C) 2019-2022 vdaas.org vald team +// Copyright (C) 2019-2023 vdaas.org vald team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/tools/benchmark/job/handler/rest/option.go b/pkg/tools/benchmark/job/handler/rest/option.go index ca3aee436d8..054b120b948 100644 --- a/pkg/tools/benchmark/job/handler/rest/option.go +++ b/pkg/tools/benchmark/job/handler/rest/option.go @@ -1,5 +1,5 @@ // -// Copyright (C) 2019-2022 vdaas.org vald team +// Copyright (C) 2019-2023 vdaas.org vald team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/tools/benchmark/job/router/doc.go b/pkg/tools/benchmark/job/router/doc.go index 053c9aeb49c..ffa1ce388fc 100644 --- a/pkg/tools/benchmark/job/router/doc.go +++ b/pkg/tools/benchmark/job/router/doc.go @@ -1,5 +1,5 @@ // -// Copyright (C) 2019-2022 vdaas.org vald team +// Copyright (C) 2019-2023 vdaas.org vald team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/tools/benchmark/job/router/option.go b/pkg/tools/benchmark/job/router/option.go index 063f5ce6f2c..7053ee57ea2 100644 --- a/pkg/tools/benchmark/job/router/option.go +++ b/pkg/tools/benchmark/job/router/option.go @@ -1,5 +1,5 @@ // -// Copyright (C) 2019-2022 vdaas.org vald team +// Copyright (C) 2019-2023 vdaas.org vald team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/tools/benchmark/job/router/router.go b/pkg/tools/benchmark/job/router/router.go index e032578e97b..6b45c721843 100644 --- a/pkg/tools/benchmark/job/router/router.go +++ b/pkg/tools/benchmark/job/router/router.go @@ -1,5 +1,5 @@ // -// Copyright (C) 2019-2022 vdaas.org vald team +// Copyright (C) 2019-2023 vdaas.org vald team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/tools/benchmark/job/service/doc.go b/pkg/tools/benchmark/job/service/doc.go index eb86a28c5bf..4d4cfe1548d 100644 --- a/pkg/tools/benchmark/job/service/doc.go +++ b/pkg/tools/benchmark/job/service/doc.go @@ -1,5 +1,5 @@ // -// Copyright (C) 2019-2022 vdaas.org vald team +// Copyright (C) 2019-2023 vdaas.org vald team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/tools/benchmark/job/service/insert.go b/pkg/tools/benchmark/job/service/insert.go new file mode 100644 index 00000000000..946710df4b5 --- /dev/null +++ b/pkg/tools/benchmark/job/service/insert.go @@ -0,0 +1,77 @@ +// +// Copyright (C) 2019-2023 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +// Package service manages the main logic of benchmark job. +package service + +import ( + "context" + "strconv" + + "github.com/vdaas/vald/apis/grpc/v1/payload" + "github.com/vdaas/vald/internal/errors" + "github.com/vdaas/vald/internal/log" + "github.com/vdaas/vald/internal/net/grpc/status" +) + +func (j *job) insert(ctx context.Context, ech chan error) error { + log.Info("[benchmark job] Start benchmarking insert") + // create data + vecs := j.genVec(j.dataset) + cfg := &payload.Insert_Config{ + SkipStrictExistCheck: j.insertConfig.SkipStrictExistCheck, + } + if j.timestamp > int64(0) { + cfg.Timestamp = j.timestamp + } + for i := 0; i < len(vecs); i++ { + log.Infof("[benchmark job] Start insert: iter = %d", i) + err := j.limiter.Wait(ctx) + if err != nil { + if errors.Is(err, context.Canceled) { + return errors.Join(err, context.Canceled) + } + ech <- err + } + res, err := j.client.Insert(ctx, &payload.Insert_Request{ + Vector: &payload.Object_Vector{ + Id: strconv.Itoa(i), + Vector: vecs[i], + }, + Config: cfg, + }) + if err != nil { + select { + case <-ctx.Done(): + if errors.Is(err, context.Canceled) { + return errors.Join(err, context.Canceled) + } + select { + case <-ctx.Done(): + return errors.Join(err, context.Canceled) + case ech <- errors.Join(err, ctx.Err()): + } + default: + st, _ := status.FromError(err) + log.Warnf("[benchmark job] insert error is detected: code = %d, msg = %s", st.Code(), err.Error()) + } + } + // TODO: send metrics to the Prometeus + log.Infof("[benchmark job] Finish insert: iter= %d \n%v\n", i, res) + } + log.Info("[benchmark job] Finish benchmarking insert") + return nil +} diff --git a/pkg/tools/benchmark/job/service/job.go b/pkg/tools/benchmark/job/service/job.go index 69c0d43f0ee..2736ad898c9 100644 --- a/pkg/tools/benchmark/job/service/job.go +++ b/pkg/tools/benchmark/job/service/job.go @@ -1,5 +1,5 @@ // -// Copyright (C) 2019-2022 vdaas.org vald team +// Copyright (C) 2019-2023 vdaas.org vald team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -19,8 +19,10 @@ package service import ( "context" + "math" "os" "reflect" + "strconv" "syscall" "time" @@ -34,6 +36,7 @@ import ( "github.com/vdaas/vald/internal/log" "github.com/vdaas/vald/internal/safety" "github.com/vdaas/vald/internal/test/data/hdf5" + "github.com/vdaas/vald/internal/timeutil/rate" ) type Job interface { @@ -46,15 +49,33 @@ type jobType int const ( USERDEFINED jobType = iota + INSERT SEARCH + UPDATE + UPSERT + REMOVE + GETOBJECT + EXISTS ) func (jt jobType) String() string { switch jt { case USERDEFINED: return "userdefined" + case INSERT: + return "insert" case SEARCH: return "search" + case UPDATE: + return "update" + case UPSERT: + return "upsert" + case REMOVE: + return "remove" + case GETOBJECT: + return "getobject" + case EXISTS: + return "exists" } return "" } @@ -70,12 +91,17 @@ type job struct { upsertConfig *config.UpsertConfig searchConfig *config.SearchConfig removeConfig *config.RemoveConfig + objectConfig *config.ObjectConfig client vald.Client hdf5 hdf5.Data beforeJobName string beforeJobNamespace string k8sClient client.Client beforeJobDur time.Duration + limiter rate.Limiter + rps int + timeout time.Duration + timestamp int64 } func New(opts ...Option) (Job, error) { @@ -91,16 +117,90 @@ func New(opts ...Option) (Job, error) { opt := WithJobFunc(j.jobFunc) err := opt(j) return nil, errors.ErrOptionFailed(err, reflect.ValueOf(opt)) + case INSERT: + j.jobFunc = j.insert + if j.insertConfig == nil { + return nil, errors.NewErrInvalidOption("insert config", j.insertConfig) + } + ts, err := strconv.Atoi(j.insertConfig.Timestamp) + if err != nil { + log.Warn("[benchmark job]: ", errors.NewErrInvalidOption("insert config timestamp", j.insert, err).Error()) + } else { + j.timestamp = int64(ts) + } case SEARCH: j.jobFunc = j.search + if j.searchConfig == nil { + return nil, errors.NewErrInvalidOption("search config", j.searchConfig) + } + to, err := time.ParseDuration(j.searchConfig.Timeout) + if err != nil { + log.Warn("[benchmark job]: ", errors.NewErrInvalidOption("search config timeout", j.searchConfig.Timeout, err).Error()) + } else { + j.timeout = to + } + case UPDATE: + j.jobFunc = j.update + if j.updateConfig == nil { + return nil, errors.NewErrInvalidOption("update config", j.updateConfig) + } + ts, err := strconv.Atoi(j.updateConfig.Timestamp) + if err != nil { + log.Warn("[benchmark job]: ", errors.NewErrInvalidOption("update config timestamp", j.updateConfig.Timestamp, err).Error()) + } else { + j.timestamp = int64(ts) + } + case UPSERT: + j.jobFunc = j.upsert + if j.upsertConfig == nil { + return nil, errors.NewErrInvalidOption("upsert config", j.insertConfig) + } + ts, err := strconv.Atoi(j.upsertConfig.Timestamp) + if err != nil { + log.Warn("[benchmark job]: ", errors.NewErrInvalidOption("upsert config timestamp", j.upsertConfig.Timestamp, err).Error()) + } else { + j.timestamp = int64(ts) + } + case REMOVE: + j.jobFunc = j.remove + if j.removeConfig == nil { + return nil, errors.NewErrInvalidOption("insert config", j.insertConfig) + } + ts, err := strconv.Atoi(j.removeConfig.Timestamp) + if err != nil { + log.Warn("[benchmark job]: ", errors.NewErrInvalidOption("remove config timestamp", j.removeConfig.Timestamp, err).Error()) + } else { + j.timestamp = int64(ts) + } + case GETOBJECT: + j.jobFunc = j.getObject + if j.objectConfig == nil { + log.Warnf("[benchmark job] No get object config is set: %v", j.objectConfig) + } + case EXISTS: + j.jobFunc = j.exists } } else if j.jobType != USERDEFINED { log.Warnf("[benchmark job] userdefined jobFunc is set but jobType is set %s", j.jobType.String()) } + if j.rps > 0 { + j.limiter = rate.NewLimiter(j.rps) + } return j, nil } func (j *job) PreStart(ctx context.Context) error { + log.Infof("[benchmark job] start download dataset of %s", j.hdf5.GetName().String()) + if err := j.hdf5.Download(); err != nil { + return err + } + log.Infof("[benchmark job] success download dataset of %s", j.hdf5.GetName().String()) + log.Infof("[benchmark job] start load dataset of %s", j.hdf5.GetName().String()) + if err := j.hdf5.Read(); err != nil { + return err + } + log.Infof("[benchmark job] success load dataset of %s", j.hdf5.GetName().String()) + // Wait for beforeJob completed if exists if len(j.beforeJobName) != 0 { var jobResource v1.ValdBenchmarkJob log.Info("[benchmark job] check before benchjob is completed or not...") @@ -128,17 +228,6 @@ func (j *job) PreStart(ctx context.Context) error { return err } } - - log.Infof("[benchmark job] start download dataset of %s", j.hdf5.GetName().String()) - if err := j.hdf5.Download(); err != nil { - return err - } - log.Infof("[benchmark job] success download dataset of %s", j.hdf5.GetName().String()) - log.Infof("[benchmark job] start load dataset of %s", j.hdf5.GetName().String()) - if err := j.hdf5.Read(); err != nil { - return err - } - log.Infof("[benchmark job] success load dataset of %s", j.hdf5.GetName().String()) return nil } @@ -169,7 +258,7 @@ func (j *job) Start(ctx context.Context) (<-chan error, error) { if err != nil { select { case <-ctx.Done(): - ech <- errors.Wrap(err, ctx.Err().Error()) + ech <- errors.Join(err, ctx.Err()) case ech <- err: } } @@ -192,33 +281,42 @@ func (j *job) Stop(ctx context.Context) (err error) { return } -func calcRecall(linearRes, searchRes []*payload.Object_Distance) (recall float64) { - if len(linearRes) == 0 || len(searchRes) == 0 { +func calcRecall(linearRes, searchRes *payload.Search_Response) (recall float64) { + if linearRes == nil || searchRes == nil { + return + } + lres := linearRes.Results + sres := searchRes.Results + if len(lres) == 0 || len(sres) == 0 { return } linearIds := map[string]struct{}{} - for _, v := range linearRes { + for _, v := range lres { linearIds[v.Id] = struct{}{} } - for _, v := range searchRes { + for _, v := range sres { if _, ok := linearIds[v.Id]; ok { recall++ } } - return recall / float64(len(linearRes)) + return recall / float64(len(lres)) } -func genVec(data [][]float32, cfg *config.BenchmarkDataset) [][]float32 { +func (j *job) genVec(cfg *config.BenchmarkDataset) [][]float32 { start := cfg.Range.Start end := cfg.Range.End - if (end - start) < cfg.Indexes { - end = cfg.Indexes + // If (Range.End - Range.Start) is smaller than Indexes, Indexes are prioritized based on Range.Start. + if (end - start + 1) < cfg.Indexes { + end = cfg.Range.Start + cfg.Indexes } - num := end - start + 1 - if len(data) < num { - num = len(data) - end = start + num + 1 + data := j.hdf5.GetByGroupName(cfg.Group) + if n := math.Ceil(float64(end) / float64(len(data))); n > 1 { + var def [][]float32 + for i := 0; i < int(n-1); i++ { + def = append(def, data...) + } + data = append(data, def...) } - vectors := data[start : end+1] + vectors := data[start-1 : end] return vectors } diff --git a/pkg/tools/benchmark/job/service/object.go b/pkg/tools/benchmark/job/service/object.go new file mode 100644 index 00000000000..c36ebd9fdbe --- /dev/null +++ b/pkg/tools/benchmark/job/service/object.go @@ -0,0 +1,119 @@ +// +// Copyright (C) 2019-2023 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +// Package service manages the main logic of benchmark job. +package service + +import ( + "context" + "strconv" + + "github.com/vdaas/vald/apis/grpc/v1/payload" + "github.com/vdaas/vald/internal/errors" + "github.com/vdaas/vald/internal/log" + "github.com/vdaas/vald/internal/net/grpc/status" +) + +func (j *job) exists(ctx context.Context, ech chan error) error { + log.Info("[benchmark job] Start benchmarking exists") + for i := 0; i < j.dataset.Indexes; i++ { + err := j.limiter.Wait(ctx) + if err != nil { + if errors.Is(err, context.Canceled) { + return errors.Join(err, context.Canceled) + } + ech <- err + } + res, err := j.client.Exists(ctx, &payload.Object_ID{ + Id: strconv.Itoa(i), + }) + if err != nil { + select { + case <-ctx.Done(): + if errors.Is(err, context.Canceled) { + return errors.Join(err, context.Canceled) + } + select { + case <-ctx.Done(): + return errors.Join(err, context.Canceled) + case ech <- errors.Join(err, ctx.Err()): + } + default: + st, _ := status.FromError(err) + log.Warnf("[benchmark job] exists error is detected: code = %d, msg = %s", st.Code(), err.Error()) + } + } + if res != nil { + log.Infof("[benchmark exists job] iter=%d, Id=%s", i, res.GetId()) + } + } + log.Info("[benchmark job] Finish benchmarking exists") + return nil +} + +func (j *job) getObject(ctx context.Context, ech chan error) error { + log.Info("[benchmark job] Start benchmarking getObject") + // create data + vecs := j.genVec(j.dataset) + for i := 0; i < len(vecs); i++ { + log.Infof("[benchmark job] Start getObject: iter = %d", i) + err := j.limiter.Wait(ctx) + if err != nil { + if errors.Is(err, context.Canceled) { + return errors.Join(err, context.Canceled) + } + ech <- err + } + ft := []*payload.Filter_Target{} + if j.objectConfig != nil { + for i, target := range j.objectConfig.FilterConfig.Targets { + ft[i] = &payload.Filter_Target{ + Host: target.Host, + Port: uint32(target.Port), + } + } + } + res, err := j.client.GetObject(ctx, &payload.Object_VectorRequest{ + Id: &payload.Object_ID{ + Id: strconv.Itoa(i), + }, + Filters: &payload.Filter_Config{ + Targets: ft, + }, + }) + if res != nil { + log.Infof("[benchmark get object job] iter=%d, Id=%s, Vec=%v", i, res.GetId(), res.GetVector()) + } + if err != nil { + select { + case <-ctx.Done(): + if errors.Is(err, context.Canceled) { + return errors.Join(err, context.Canceled) + } + select { + case <-ctx.Done(): + return errors.Join(err, context.Canceled) + case ech <- errors.Join(err, ctx.Err()): + } + default: + st, _ := status.FromError(err) + log.Warnf("[benchmark job] get object error is detected: code = %d, msg = %s", st.Code(), err.Error()) + } + } + } + log.Info("[benchmark job] Finish benchmarking getObject") + return nil +} diff --git a/pkg/tools/benchmark/job/service/option.go b/pkg/tools/benchmark/job/service/option.go index 998f9f87001..772e638a6ff 100644 --- a/pkg/tools/benchmark/job/service/option.go +++ b/pkg/tools/benchmark/job/service/option.go @@ -1,5 +1,5 @@ // -// Copyright (C) 2019-2022 vdaas.org vald team +// Copyright (C) 2019-2023 vdaas.org vald team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -35,6 +35,7 @@ var defaultOpts = []Option{ // TODO: set default config for client WithDimension(748), WithBeforeJobDuration("30s"), + WithRPS(100), } // WithDimension sets the vector's dimension for running benchmark job with dataset. @@ -97,6 +98,16 @@ func WithRemoveConfig(c *config.RemoveConfig) Option { } } +// WithObjectConfig sets the get object API config for running get object request job. +func WithObjectConfig(c *config.ObjectConfig) Option { + return func(j *job) error { + if c != nil { + j.objectConfig = c + } + return nil + } +} + // WithValdClient sets the Vald client for sending request to the target Vald cluster. func WithValdClient(c vald.Client) Option { return func(j *job) error { @@ -147,8 +158,20 @@ func WithJobTypeByString(t string) Option { switch t { case "userdefined": jt = USERDEFINED + case "insert": + jt = INSERT case "search": jt = SEARCH + case "update": + jt = UPDATE + case "upsert": + jt = UPSERT + case "remove": + jt = REMOVE + case "getobject": + jt = GETOBJECT + case "exists": + jt = EXISTS } return WithJobType(jt) } @@ -156,14 +179,10 @@ func WithJobTypeByString(t string) Option { // WithJobType sets the jobType for running benchmark job. func WithJobType(jt jobType) Option { return func(j *job) error { - switch jt { - case USERDEFINED: - j.jobType = jt - case SEARCH: - j.jobType = jt - default: - return errors.NewErrInvalidOption("jobType", jt) + if len(jt.String()) == 0 { + return errors.NewErrInvalidOption("jobType", jt.String()) } + j.jobType = jt return nil } } @@ -223,3 +242,13 @@ func WithK8sClient(cli client.Client) Option { return nil } } + +// WithRPS sets the rpc for sending request per seconds to the target Vald cluster. +func WithRPS(rps int) Option { + return func(j *job) error { + if rps > 0 { + j.rps = rps + } + return nil + } +} diff --git a/pkg/tools/benchmark/job/service/remove.go b/pkg/tools/benchmark/job/service/remove.go new file mode 100644 index 00000000000..7e6db88e12b --- /dev/null +++ b/pkg/tools/benchmark/job/service/remove.go @@ -0,0 +1,76 @@ +// +// Copyright (C) 2019-2023 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +// Package service manages the main logic of benchmark job. +package service + +import ( + "context" + "strconv" + + "github.com/vdaas/vald/apis/grpc/v1/payload" + "github.com/vdaas/vald/internal/errors" + "github.com/vdaas/vald/internal/log" + "github.com/vdaas/vald/internal/net/grpc/status" +) + +func (j *job) remove(ctx context.Context, ech chan error) error { + log.Info("[benchmark job] Start benchmarking remove") + // create data + vecs := j.genVec(j.dataset) + cfg := &payload.Remove_Config{ + SkipStrictExistCheck: j.removeConfig.SkipStrictExistCheck, + } + if j.timestamp > int64(0) { + cfg.Timestamp = j.timestamp + } + for i := 0; i < len(vecs); i++ { + log.Infof("[benchmark job] Start remove: iter = %d", i) + err := j.limiter.Wait(ctx) + if err != nil { + if errors.Is(err, context.Canceled) { + return errors.Join(err, context.Canceled) + } + ech <- err + } + res, err := j.client.Remove(ctx, &payload.Remove_Request{ + Id: &payload.Object_ID{ + Id: strconv.Itoa(i), + }, + Config: cfg, + }) + if err != nil { + select { + case <-ctx.Done(): + if errors.Is(err, context.Canceled) { + return errors.Join(err, context.Canceled) + } + select { + case <-ctx.Done(): + return errors.Join(err, context.Canceled) + case ech <- errors.Join(err, ctx.Err()): + } + default: + st, _ := status.FromError(err) + log.Warnf("[benchmark job] remove error is detected: code = %d, msg = %s", st.Code(), err.Error()) + } + } + log.Infof("[benchmark job] Finish remove: iter= %d \n%v", i, res) + } + + log.Info("[benchmark job] Finish benchmarking remove") + return nil +} diff --git a/pkg/tools/benchmark/job/service/search.go b/pkg/tools/benchmark/job/service/search.go index 2d14a6d84bf..dbc9bcbc359 100644 --- a/pkg/tools/benchmark/job/service/search.go +++ b/pkg/tools/benchmark/job/service/search.go @@ -1,5 +1,5 @@ // -// Copyright (C) 2019-2022 vdaas.org vald team +// Copyright (C) 2019-2023 vdaas.org vald team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -19,87 +19,99 @@ package service import ( "context" - "testing" - "time" "github.com/vdaas/vald/apis/grpc/v1/payload" "github.com/vdaas/vald/internal/errors" "github.com/vdaas/vald/internal/log" + "github.com/vdaas/vald/internal/net/grpc/codes" + "github.com/vdaas/vald/internal/net/grpc/status" ) func (j *job) search(ctx context.Context, ech chan error) error { log.Info("[benchmark job] Start benchmarking search") - if j.searchConfig == nil { - err := errors.NewErrInvalidOption("searchConfig", j.searchConfig) - select { - case <-ctx.Done(): - if err != context.Canceled { - ech <- errors.Wrap(err, ctx.Err().Error()) - } else { - ech <- err - } - case ech <- err: - } - return err - } - // create data - vecs := genVec(j.hdf5.GetTest(), j.dataset) - timeout, _ := time.ParseDuration(j.searchConfig.Timeout) + vecs := j.genVec(j.dataset) cfg := &payload.Search_Config{ Num: uint32(j.searchConfig.Num), MinNum: uint32(j.searchConfig.MinNum), Radius: float32(j.searchConfig.Radius), Epsilon: float32(j.searchConfig.Epsilon), - Timeout: timeout.Nanoseconds(), + Timeout: j.timeout.Nanoseconds(), } + lres := make([]*payload.Search_Response, len(vecs)) for i := 0; i < len(vecs); i++ { - log.Infof("[benchmark job] Start search: iter = %d", i) - lres, err := j.client.LinearSearch(ctx, &payload.Search_Request{ + if len(vecs[i]) != j.dimension { + log.Warn("len(vecs) ", len(vecs[i]), "is not matched with ", j.dimension) + continue + } + res, err := j.client.LinearSearch(ctx, &payload.Search_Request{ Vector: vecs[i], Config: cfg, }) if err != nil { select { case <-ctx.Done(): - if !errors.Is(err, context.Canceled) { - ech <- errors.Wrap(err, ctx.Err().Error()) - } else { - ech <- err + if errors.Is(err, context.Canceled) { + return errors.Join(err, context.Canceled) + } + select { + case <-ctx.Done(): + return errors.Join(err, context.Canceled) + case ech <- errors.Join(err, ctx.Err()): + } + default: + st, _ := status.FromError(err) + if st.Code() != codes.NotFound { + log.Warnf("[benchmark job] linear search error is detected: code = %d, msg = %s", st.Code(), err.Error()) } - case ech <- err: } - return err } - bres := testing.Benchmark(func(b *testing.B) { - b.Helper() - b.ResetTimer() - start := time.Now() - sres, err := j.client.Search(ctx, &payload.Search_Request{ - Vector: vecs[i], - Config: cfg, - }) - if err != nil { + lres[i] = res + } + sres := make([]*payload.Search_Response, len(vecs)) + log.Infof("[benchmark job] Start search") + for i := 0; i < len(vecs); i++ { + if len(vecs[i]) != j.dimension { + log.Warn("len(vecs) ", len(vecs[i]), "is not matched with ", j.dimension) + continue + } + err := j.limiter.Wait(ctx) + if err != nil { + if errors.Is(err, context.Canceled) { + return errors.Join(err, context.Canceled) + } + ech <- err + } + res, err := j.client.Search(ctx, &payload.Search_Request{ + Vector: vecs[i], + Config: cfg, + }) + log.Infof("[benchmark job] search %d", i) + if err != nil { + select { + case <-ctx.Done(): + if errors.Is(err, context.Canceled) { + return errors.Join(err, context.Canceled) + } select { case <-ctx.Done(): - if errors.Is(err, context.Canceled) { - ech <- errors.Wrap(err, ctx.Err().Error()) - } else { - ech <- err - } - case ech <- err: - break + return errors.Join(err, context.Canceled) + case ech <- errors.Join(err, ctx.Err()): + } + default: + st, _ := status.FromError(err) + if st.Code() != codes.NotFound { + log.Warnf("[benchmark job] search error is detected: code = %d, msg = %s", st.Code(), err.Error()) } } - latency := time.Since(start) - recall := calcRecall(lres.Results, sres.Results) - b.ReportMetric(recall, "recall") - b.ReportMetric(float64(latency.Microseconds()), "latency") - }) - // TODO: send metrics to the Prometeus - log.Infof("[benchmark job] Finish search bench: iter= %d \n%#v\n", i, bres) + } + sres[i] = res + } + recall := make([]float64, len(vecs)) + for i := 0; i < len(vecs); i++ { + recall[i] = calcRecall(lres[i], sres[i]) + log.Info("[branch job] search recall: ", recall[i]) } - log.Info("[benchmark job] Finish benchmarking search") return nil } diff --git a/pkg/tools/benchmark/job/service/update.go b/pkg/tools/benchmark/job/service/update.go new file mode 100644 index 00000000000..ac7a59a5962 --- /dev/null +++ b/pkg/tools/benchmark/job/service/update.go @@ -0,0 +1,79 @@ +// +// Copyright (C) 2019-2023 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +// Package service manages the main logic of benchmark job. +package service + +import ( + "context" + "strconv" + + "github.com/vdaas/vald/apis/grpc/v1/payload" + "github.com/vdaas/vald/internal/errors" + "github.com/vdaas/vald/internal/log" + "github.com/vdaas/vald/internal/net/grpc/status" +) + +func (j *job) update(ctx context.Context, ech chan error) error { + log.Info("[benchmark job] Start benchmarking update") + // create data + vecs := j.genVec(j.dataset) + cfg := &payload.Update_Config{ + SkipStrictExistCheck: j.updateConfig.SkipStrictExistCheck, + DisableBalancedUpdate: j.updateConfig.DisableBalancedUpdate, + } + if j.timestamp > int64(0) { + cfg.Timestamp = j.timestamp + } + for i := 0; i < len(vecs); i++ { + log.Infof("[benchmark job] Start update: iter = %d", i) + err := j.limiter.Wait(ctx) + if err != nil { + if errors.Is(err, context.Canceled) { + return errors.Join(err, context.Canceled) + } + ech <- err + } + res, err := j.client.Update(ctx, &payload.Update_Request{ + Vector: &payload.Object_Vector{ + Id: strconv.Itoa(i), + Vector: vecs[i], + }, + Config: cfg, + }) + if err != nil { + select { + case <-ctx.Done(): + if errors.Is(err, context.Canceled) { + return errors.Join(err, context.Canceled) + } + select { + case <-ctx.Done(): + return errors.Join(err, context.Canceled) + case ech <- errors.Join(err, ctx.Err()): + } + default: + st, _ := status.FromError(err) + log.Warnf("[benchmark job] update error is detected: code = %d, msg = %s\n", st.Code(), err.Error()) + } + } + if res != nil { + log.Infof("[benchmark job] iter=%d, Name=%s, Uuid=%s, Ips=%v", i, res.Name, res.Uuid, res.Ips) + } + } + log.Info("[benchmark job] Finish benchmarking upsert") + return nil +} diff --git a/pkg/tools/benchmark/job/service/upsert.go b/pkg/tools/benchmark/job/service/upsert.go new file mode 100644 index 00000000000..4b3d664a83b --- /dev/null +++ b/pkg/tools/benchmark/job/service/upsert.go @@ -0,0 +1,80 @@ +// +// Copyright (C) 2019-2023 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +// Package service manages the main logic of benchmark job. +package service + +import ( + "context" + "strconv" + + "github.com/vdaas/vald/apis/grpc/v1/payload" + "github.com/vdaas/vald/internal/errors" + "github.com/vdaas/vald/internal/log" + "github.com/vdaas/vald/internal/net/grpc/status" +) + +func (j *job) upsert(ctx context.Context, ech chan error) error { + log.Info("[benchmark job] Start benchmarking upsert") + // create data + vecs := j.genVec(j.dataset) + cfg := &payload.Upsert_Config{ + SkipStrictExistCheck: j.upsertConfig.SkipStrictExistCheck, + DisableBalancedUpdate: j.upsertConfig.DisableBalancedUpdate, + } + if j.timestamp > int64(0) { + cfg.Timestamp = j.timestamp + } + for i := 0; i < len(vecs); i++ { + log.Infof("[benchmark job] Start upsert: iter = %d", i) + err := j.limiter.Wait(ctx) + if err != nil { + if errors.Is(err, context.Canceled) { + return errors.Join(err, context.Canceled) + } + ech <- err + } + res, err := j.client.Upsert(ctx, &payload.Upsert_Request{ + Vector: &payload.Object_Vector{ + Id: strconv.Itoa(i), + Vector: vecs[i], + }, + Config: cfg, + }) + if err != nil { + select { + case <-ctx.Done(): + if errors.Is(err, context.Canceled) { + return errors.Join(err, context.Canceled) + } + select { + case <-ctx.Done(): + return errors.Join(err, context.Canceled) + case ech <- errors.Join(err, ctx.Err()): + } + default: + st, _ := status.FromError(err) + log.Warnf("[benchmark job] upsert error is detected: code = %d, msg = %s", st.Code(), err.Error()) + } + } + if res != nil { + log.Infof("[benchmark job] iter=%d, Name=%s, Uuid=%s, Ips=%v", i, res.Name, res.Uuid, res.Ips) + } + } + + log.Info("[benchmark job] Finish benchmarking upsert") + return nil +} diff --git a/pkg/tools/benchmark/job/usecase/benchmarkd.go b/pkg/tools/benchmark/job/usecase/benchmarkd.go index 4bd9917b324..f714bf95636 100644 --- a/pkg/tools/benchmark/job/usecase/benchmarkd.go +++ b/pkg/tools/benchmark/job/usecase/benchmarkd.go @@ -1,5 +1,5 @@ // -// Copyright (C) 2019-2022 vdaas.org vald team +// Copyright (C) 2019-2023 vdaas.org vald team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -99,10 +99,12 @@ func New(cfg *config.Config) (r runner.Runner, err error) { service.WithUpsertConfig(cfg.Job.UpsertConfig), service.WithSearchConfig(cfg.Job.SearchConfig), service.WithRemoveConfig(cfg.Job.RemoveConfig), + service.WithObjectConfig(cfg.Job.ObjectConfig), service.WithHdf5(d), service.WithBeforeJobName(cfg.Job.BeforeJobName), service.WithBeforeJobNamespace(cfg.Job.BeforeJobNamespace), - service.WithK8sClient(cfg.Job.Client), + service.WithK8sClient(cfg.K8sClient), + service.WithRPS(cfg.Job.RPS), ) if err != nil { return nil, err diff --git a/pkg/tools/benchmark/operator/config/config.go b/pkg/tools/benchmark/operator/config/config.go index 915ec1251a4..45cec91f51e 100644 --- a/pkg/tools/benchmark/operator/config/config.go +++ b/pkg/tools/benchmark/operator/config/config.go @@ -1,5 +1,5 @@ // -// Copyright (C) 2019-2022 vdaas.org vald team +// Copyright (C) 2019-2023 vdaas.org vald team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/tools/benchmark/operator/config/doc.go b/pkg/tools/benchmark/operator/config/doc.go index 57ef74c464a..04fa0ebe9bf 100644 --- a/pkg/tools/benchmark/operator/config/doc.go +++ b/pkg/tools/benchmark/operator/config/doc.go @@ -1,5 +1,5 @@ // -// Copyright (C) 2019-2022 vdaas.org vald team +// Copyright (C) 2019-2023 vdaas.org vald team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/tools/benchmark/operator/handler/doc.go b/pkg/tools/benchmark/operator/handler/doc.go index f1014141ea4..825bcc7f61f 100644 --- a/pkg/tools/benchmark/operator/handler/doc.go +++ b/pkg/tools/benchmark/operator/handler/doc.go @@ -1,5 +1,5 @@ // -// Copyright (C) 2019-2022 vdaas.org vald team +// Copyright (C) 2019-2023 vdaas.org vald team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/tools/benchmark/operator/handler/grpc/handler.go b/pkg/tools/benchmark/operator/handler/grpc/handler.go index 8120e778b07..a11cf4133fa 100644 --- a/pkg/tools/benchmark/operator/handler/grpc/handler.go +++ b/pkg/tools/benchmark/operator/handler/grpc/handler.go @@ -1,5 +1,5 @@ // -// Copyright (C) 2019-2022 vdaas.org vald team +// Copyright (C) 2019-2023 vdaas.org vald team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/tools/benchmark/operator/handler/grpc/option.go b/pkg/tools/benchmark/operator/handler/grpc/option.go index 4319ef3a748..d5d9c32d1e5 100644 --- a/pkg/tools/benchmark/operator/handler/grpc/option.go +++ b/pkg/tools/benchmark/operator/handler/grpc/option.go @@ -1,5 +1,5 @@ // -// Copyright (C) 2019-2022 vdaas.org vald team +// Copyright (C) 2019-2023 vdaas.org vald team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/tools/benchmark/operator/handler/rest/handler.go b/pkg/tools/benchmark/operator/handler/rest/handler.go index f752dbbbbf4..1b9494c3d57 100644 --- a/pkg/tools/benchmark/operator/handler/rest/handler.go +++ b/pkg/tools/benchmark/operator/handler/rest/handler.go @@ -1,5 +1,5 @@ // -// Copyright (C) 2019-2022 vdaas.org vald team +// Copyright (C) 2019-2023 vdaas.org vald team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/tools/benchmark/operator/handler/rest/option.go b/pkg/tools/benchmark/operator/handler/rest/option.go index ca3aee436d8..054b120b948 100644 --- a/pkg/tools/benchmark/operator/handler/rest/option.go +++ b/pkg/tools/benchmark/operator/handler/rest/option.go @@ -1,5 +1,5 @@ // -// Copyright (C) 2019-2022 vdaas.org vald team +// Copyright (C) 2019-2023 vdaas.org vald team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/tools/benchmark/operator/router/doc.go b/pkg/tools/benchmark/operator/router/doc.go index 053c9aeb49c..ffa1ce388fc 100644 --- a/pkg/tools/benchmark/operator/router/doc.go +++ b/pkg/tools/benchmark/operator/router/doc.go @@ -1,5 +1,5 @@ // -// Copyright (C) 2019-2022 vdaas.org vald team +// Copyright (C) 2019-2023 vdaas.org vald team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/tools/benchmark/operator/router/option.go b/pkg/tools/benchmark/operator/router/option.go index fec24bfa260..681ce4bef7a 100644 --- a/pkg/tools/benchmark/operator/router/option.go +++ b/pkg/tools/benchmark/operator/router/option.go @@ -1,5 +1,5 @@ // -// Copyright (C) 2019-2022 vdaas.org vald team +// Copyright (C) 2019-2023 vdaas.org vald team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/tools/benchmark/operator/router/router.go b/pkg/tools/benchmark/operator/router/router.go index 9ca7f60c958..1303eeec8d6 100644 --- a/pkg/tools/benchmark/operator/router/router.go +++ b/pkg/tools/benchmark/operator/router/router.go @@ -1,5 +1,5 @@ // -// Copyright (C) 2019-2022 vdaas.org vald team +// Copyright (C) 2019-2023 vdaas.org vald team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/tools/benchmark/operator/service/doc.go b/pkg/tools/benchmark/operator/service/doc.go index eb86a28c5bf..4d4cfe1548d 100644 --- a/pkg/tools/benchmark/operator/service/doc.go +++ b/pkg/tools/benchmark/operator/service/doc.go @@ -1,5 +1,5 @@ // -// Copyright (C) 2019-2022 vdaas.org vald team +// Copyright (C) 2019-2023 vdaas.org vald team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/tools/benchmark/operator/service/operator.go b/pkg/tools/benchmark/operator/service/operator.go index 9dc749c386f..57b6cb4d152 100644 --- a/pkg/tools/benchmark/operator/service/operator.go +++ b/pkg/tools/benchmark/operator/service/operator.go @@ -1,5 +1,5 @@ // -// Copyright (C) 2019-2022 vdaas.org vald team +// Copyright (C) 2019-2023 vdaas.org vald team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -161,6 +161,7 @@ func (o *operator) jobReconcile(ctx context.Context, jobList map[string][]job.Jo if len(jobList) == 0 { log.Info("[reconcile job] no job is founded") o.jobs.Store(&map[string]string{}) + log.Debug("[reconcile job] finish") return } cjobs := o.getAtomicJob() @@ -215,8 +216,9 @@ func (o *operator) jobReconcile(ctx context.Context, jobList map[string][]job.Jo func (o *operator) benchJobReconcile(ctx context.Context, benchJobList map[string]v1.ValdBenchmarkJob) { log.Debugf("[reconcile benchmark job resource] job list: %#v", benchJobList) if len(benchJobList) == 0 { - o.benchjobs.Store(&(map[string]*v1.ValdBenchmarkJob{})) log.Info("[reconcile benchmark job resource] job resource not found") + o.benchjobs.Store(&(map[string]*v1.ValdBenchmarkJob{})) + log.Debug("[reconcile benchmark job resource] finish") return } cbjl := o.getAtomicBenchJob() @@ -227,7 +229,11 @@ func (o *operator) benchJobReconcile(ctx context.Context, benchJobList map[strin for k := range benchJobList { // update scenario status job := benchJobList[k] - if scenarios := o.getAtomicScenario(); scenarios != nil { + hasOwner := false + if len(job.GetOwnerReferences()) > 0 { + hasOwner = true + } + if scenarios := o.getAtomicScenario(); scenarios != nil && hasOwner { on := job.GetOwnerReferences()[0].Name if _, ok := scenarios[on]; ok { if scenarios[on].BenchJobStatus == nil { @@ -282,8 +288,7 @@ func (o *operator) benchScenarioReconcile(ctx context.Context, scenarioList map[ if len(scenarioList) == 0 { log.Info("[reconcile benchmark scenario resource]: scenario not found") o.scenarios.Store(&(map[string]*scenario{})) - o.benchjobs.Store(&(map[string]*v1.ValdBenchmarkJob{})) - o.jobs.Store(&(map[string]string{})) + log.Debug("[reconcile benchmark scenario resource] finish") return } cbsl := o.getAtomicScenario() @@ -536,16 +541,44 @@ func (o *operator) checkJobsStatus(ctx context.Context, jobs map[string]string) return err } -func (o *operator) initAtomics() { - if cbsl := o.getAtomicScenario(); len(cbsl) > 0 { - o.scenarios.Store(&(map[string]*scenario{})) - } - if cbjl := o.getAtomicBenchJob(); len(cbjl) > 0 { - o.benchjobs.Store(&(map[string]*v1.ValdBenchmarkJob{})) +// checkAtomics checks each atomic keeps consistency. +func (o *operator) checkAtomics() error { + cjl := o.getAtomicJob() + cbjl := o.getAtomicBenchJob() + cbsl := o.getAtomicScenario() + if len(cjl) == 0 { + if len(cbjl) > 0 || len(cbsl) > 0 { + log.Error("mismatch atomics: job=%v, benchjob=%v, scenario=%v", cjl, cbjl, cbsl) + return errors.ErrMismatchBenchmarkAtomics(cjl, cbjl, cbsl) + } + return nil + } else if len(cbjl) == 0 { + log.Error("mismatch atomics: job=%v, benchjob=%v, scenario=%v", cjl, cbjl, cbsl) + return errors.ErrMismatchBenchmarkAtomics(cjl, cbjl, cbsl) + } + jobCounter := len(cjl) + scenarioBenchCounter := 0 + for sc := range cbsl { + scenarioBenchCounter += len(cbsl[sc].BenchJobStatus) + } + for jobName := range cjl { + if benchJob := cbjl[jobName]; benchJob != nil { + jobCounter-- + if owner := benchJob.GetOwnerReferences(); len(owner) > 0 { + scenarioName := owner[0].Name + if scenario := cbsl[scenarioName]; scenario != nil { + if _, ok := scenario.BenchJobStatus[benchJob.GetName()]; ok { + scenarioBenchCounter-- + } + } + } + } } - if cjl := o.getAtomicJob(); len(cjl) > 0 { - o.jobs.Store(&(map[string]string{})) + if jobCounter != 0 || scenarioBenchCounter != 0 { + log.Error("mismatch atomics: job=%v, benchjob=%v, scenario=%v", cjl, cbjl, cbsl) + return errors.ErrMismatchBenchmarkAtomics(cjl, cbjl, cbsl) } + return nil } func (o *operator) PreStart(ctx context.Context) error { @@ -568,13 +601,13 @@ func (o *operator) Start(ctx context.Context) (<-chan error, error) { case <-ctx.Done(): return nil case <-rcticker.C: - cbsl := o.getAtomicScenario() - if cbsl == nil { - log.Info("benchmark scenario resource is empty") - // clear atomic pointer - o.initAtomics() - continue - } else { + // check mismatch atomic + err = o.checkAtomics() + if err != nil { + ech <- err + } + // determine whether benchmark scenario status should be updated. + if cbsl := o.getAtomicScenario(); cbsl != nil { scenarioStatus := make(map[string]v1.ValdBenchmarkScenarioStatus) for name, scenario := range cbsl { if scenario.Crd.Status != v1.BenchmarkScenarioCompleted { diff --git a/pkg/tools/benchmark/operator/service/option.go b/pkg/tools/benchmark/operator/service/option.go index da106afa7f1..a34242f5a5e 100644 --- a/pkg/tools/benchmark/operator/service/option.go +++ b/pkg/tools/benchmark/operator/service/option.go @@ -1,5 +1,5 @@ // -// Copyright (C) 2019-2022 vdaas.org vald team +// Copyright (C) 2019-2023 vdaas.org vald team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/tools/benchmark/operator/usecase/benchmarkd.go b/pkg/tools/benchmark/operator/usecase/benchmarkd.go index 6aa069b0c0b..9f77e9ae15d 100644 --- a/pkg/tools/benchmark/operator/usecase/benchmarkd.go +++ b/pkg/tools/benchmark/operator/usecase/benchmarkd.go @@ -1,5 +1,5 @@ // -// Copyright (C) 2019-2022 vdaas.org vald team +// Copyright (C) 2019-2023 vdaas.org vald team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -26,7 +26,6 @@ import ( "github.com/vdaas/vald/internal/log" "github.com/vdaas/vald/internal/net/grpc" "github.com/vdaas/vald/internal/net/grpc/interceptor/server/recover" - "github.com/vdaas/vald/internal/observability" infometrics "github.com/vdaas/vald/internal/observability/metrics/info" "github.com/vdaas/vald/internal/runner"