From d6953b41f30c35c019780fa27f839fae226389a6 Mon Sep 17 00:00:00 2001 From: Kiichiro YUKAWA Date: Fri, 23 Jun 2023 16:35:41 +0900 Subject: [PATCH] Refactor helm template and operator logic (#2043) * Implement base of continuous benchmark tool (#1776) * Create Continuous Bench Search Job tool (#1733) * :sparkles: create bench job search tools Signed-off-by: vankichi * :sparkles: add load hdf5 functions Signed-off-by: vankichi * :recycle: fix format Signed-off-by: vankichi * :recycle: fix docker and use hdf5 data Signed-off-by: vankichi * :recycle: fix Signed-off-by: vankichi * :recycle: refactor benchmark job Signed-off-by: vankichi * :recycle: fix Signed-off-by: vankichi * :recycle: fix proto Signed-off-by: vankichi * :green_heart: add benchmark job image build ci Signed-off-by: vankichi * :green_heart: invest Signed-off-by: vankichi * Revert ":green_heart: invest" This reverts commit f0f585ccf71b1c95a88559941557a27774096e69. * Apply suggestions from code review Co-authored-by: Yusuke Kato * :recycle: apply code review Signed-off-by: vankichi * Apply suggestions from code review Co-authored-by: Hiroto Funakoshi * :sparkles: apply from feedback Signed-off-by: vankichi * Update internal/config/benchmark.go Co-authored-by: Yusuke Kato * :recycle: change directory path Signed-off-by: vankichi Signed-off-by: vankichi Co-authored-by: Yusuke Kato Co-authored-by: Hiroto Funakoshi * Add crds for continuous benchmark tools (#1789) * :sparkles: add crds for continuous benchmark operator Signed-off-by: vankichi * :sparkles: add benchmark operator/job scheme Signed-off-by: vankichi * :sparkles: rename package names and add doc.go Signed-off-by: vankichi * :sparkles: create runtime object Signed-off-by: vankichi * Apply suggestions from code review Co-authored-by: Yusuke Kato * :recycle: apply feedback Signed-off-by: vankichi Signed-off-by: vankichi Co-authored-by: Yusuke Kato * Add Job reconciler & Change directory constitution of internal/k8s for benchmark (#1825) * :sparkles: :recycle: add Job reconciler & use scenario instead of operator Signed-off-by: vankichi * :recycle: fix format & rename file Signed-off-by: vankichi Signed-off-by: vankichi * Add benchmark operator framework (#1916) * :sparkles: impl benchmark reconciler Signed-off-by: vankichi * :sparkles: create benchmark operator framework Signed-off-by: vankichi * :recycle: remove unness changes Signed-off-by: vankichi Signed-off-by: vankichi * Format code with prettier and gofumpt * impl reconcile logic for create benchmark job (#1923) * :sparkles: impl reconcile logic for create benchmark job Signed-off-by: vankichi * Format code with prettier and gofumpt * :recycle: fix Signed-off-by: vankichi * :recycle: refactor continuous benchmark's crds Signed-off-by: vankichi * :recycle: resolve error due to update conn bench crds for pkg/tools/benchmark/job Signed-off-by: vankichi * :recycle: refactor continuous benchmark job logic Signed-off-by: vankichi * Format code with prettier and gofumpt * :recycle: update charts Signed-off-by: vankichi * Format code with prettier and gofumpt * :recycle: rafactor con bench config and bug fix reconcile logic Signed-off-by: vankichi * :bug: Bugfix: fix typo and recall function logic Signed-off-by: vankichi * :recycle: refactor pkg benchmark job Signed-off-by: vankichi --------- Signed-off-by: vankichi Co-authored-by: deepsource-autofix[bot] <62050782+deepsource-autofix[bot]@users.noreply.github.com> --------- Signed-off-by: vankichi Co-authored-by: Yusuke Kato Co-authored-by: Hiroto Funakoshi Co-authored-by: deepsource-autofix[bot] <62050782+deepsource-autofix[bot]@users.noreply.github.com> * :sparkles: impl status handle of continuous benchmark crds (#1955) Signed-off-by: vankichi * Impl benchmark jobs (#1977) * Implement base of continuous benchmark tool (#1776) * Create Continuous Bench Search Job tool (#1733) * :sparkles: create bench job search tools Signed-off-by: vankichi * :sparkles: add load hdf5 functions Signed-off-by: vankichi * :recycle: fix format Signed-off-by: vankichi * :recycle: fix docker and use hdf5 data Signed-off-by: vankichi * :recycle: fix Signed-off-by: vankichi * :recycle: refactor benchmark job Signed-off-by: vankichi * :recycle: fix Signed-off-by: vankichi * :recycle: fix proto Signed-off-by: vankichi * :green_heart: add benchmark job image build ci Signed-off-by: vankichi * :green_heart: invest Signed-off-by: vankichi * Revert ":green_heart: invest" This reverts commit f0f585ccf71b1c95a88559941557a27774096e69. * Apply suggestions from code review Co-authored-by: Yusuke Kato * :recycle: apply code review Signed-off-by: vankichi * Apply suggestions from code review Co-authored-by: Hiroto Funakoshi * :sparkles: apply from feedback Signed-off-by: vankichi * Update internal/config/benchmark.go Co-authored-by: Yusuke Kato * :recycle: change directory path Signed-off-by: vankichi Signed-off-by: vankichi Co-authored-by: Yusuke Kato Co-authored-by: Hiroto Funakoshi * Add crds for continuous benchmark tools (#1789) * :sparkles: add crds for continuous benchmark operator Signed-off-by: vankichi * :sparkles: add benchmark operator/job scheme Signed-off-by: vankichi * :sparkles: rename package names and add doc.go Signed-off-by: vankichi * :sparkles: create runtime object Signed-off-by: vankichi * Apply suggestions from code review Co-authored-by: Yusuke Kato * :recycle: apply feedback Signed-off-by: vankichi Signed-off-by: vankichi Co-authored-by: Yusuke Kato * Add Job reconciler & Change directory constitution of internal/k8s for benchmark (#1825) * :sparkles: :recycle: add Job reconciler & use scenario instead of operator Signed-off-by: vankichi * :recycle: fix format & rename file Signed-off-by: vankichi Signed-off-by: vankichi * Add benchmark operator framework (#1916) * :sparkles: impl benchmark reconciler Signed-off-by: vankichi * :sparkles: create benchmark operator framework Signed-off-by: vankichi * :recycle: remove unness changes Signed-off-by: vankichi Signed-off-by: vankichi * Format code with prettier and gofumpt * impl reconcile logic for create benchmark job (#1923) * :sparkles: impl reconcile logic for create benchmark job Signed-off-by: vankichi * Format code with prettier and gofumpt * :recycle: fix Signed-off-by: vankichi * :recycle: refactor continuous benchmark's crds Signed-off-by: vankichi * :recycle: resolve error due to update conn bench crds for pkg/tools/benchmark/job Signed-off-by: vankichi * :recycle: refactor continuous benchmark job logic Signed-off-by: vankichi * Format code with prettier and gofumpt * :recycle: update charts Signed-off-by: vankichi * Format code with prettier and gofumpt * :recycle: rafactor con bench config and bug fix reconcile logic Signed-off-by: vankichi * :bug: Bugfix: fix typo and recall function logic Signed-off-by: vankichi * :recycle: refactor pkg benchmark job Signed-off-by: vankichi --------- Signed-off-by: vankichi Co-authored-by: deepsource-autofix[bot] <62050782+deepsource-autofix[bot]@users.noreply.github.com> --------- Signed-off-by: vankichi Co-authored-by: Yusuke Kato Co-authored-by: Hiroto Funakoshi Co-authored-by: deepsource-autofix[bot] <62050782+deepsource-autofix[bot]@users.noreply.github.com> * :sparkles: impl benchmark jobs Signed-off-by: vankichi * :recycle: apply feedback Signed-off-by: vankichi --------- Signed-off-by: vankichi Co-authored-by: Yusuke Kato Co-authored-by: Hiroto Funakoshi Co-authored-by: deepsource-autofix[bot] <62050782+deepsource-autofix[bot]@users.noreply.github.com> * :recycle: Refactor helm template and operator logic Signed-off-by: vankichi * :recycle: Add download original dataset URL option Signed-off-by: vankichi * :recycle: Set docker image location at the benchmark operator configmap and use it when information is set Signed-off-by: vankichi * add search algorithm benchmark and update search aggregation algo Signed-off-by: kpango * :sparkles: Add search result aggregation option Signed-off-by: vankichi * style: Format code with prettier and gofumpt * Improve job performance (#2061) * :bug: Fix job function to apply rate limiter * :recycle: Add pyroscope setting Signed-off-by: vankichi * :recycle: fix Signed-off-by: vankichi * :bug: Fix build error Signed-off-by: vankichi --------- Signed-off-by: vankichi * :bug: Fix docker file and add concurrencyLimit for job goroutine Signed-off-by: vankichi * :recycle: Fix job_template.go by feedback Signed-off-by: vankichi * :recycle: Fix job logic by feedback Signed-off-by: vankichi * :recycle: Fix Signed-off-by: vankichi --------- Signed-off-by: vankichi Signed-off-by: kpango Co-authored-by: Yusuke Kato Co-authored-by: Hiroto Funakoshi Co-authored-by: deepsource-autofix[bot] <62050782+deepsource-autofix[bot]@users.noreply.github.com> --- .../workflows/dockers-benchmark-job-image.yml | 1 + .../crds/valdbenchmarkjob.yaml | 26 +- .../crds/valdbenchmarkoperatorrelease.yaml | 13 + .../crds/valdbenchmarkscenario.yaml | 10 +- .../job-values.schema.json | 38 +- .../scenario-values.schema.json | 9 +- .../schemas/job-values.yaml | 21 +- .../schemas/scenario-values.yaml | 9 +- .../templates/clusterrole.yaml | 1 + .../templates/configmap.yaml | 3 + .../templates/deployment.yaml | 14 + .../values.schema.json | 21 +- charts/vald-benchmark-operator/values.yaml | 12 + .../values/benchmark-job.yaml | 1 + .../values/benchmark-scenario.yaml | 4 +- dockers/tools/benchmark/job/Dockerfile | 46 ++- dockers/tools/benchmark/operator/Dockerfile | 17 +- go.mod | 1 + internal/config/benchmark.go | 30 +- internal/k8s/client/client.go | 6 + internal/k8s/job/job.go | 3 +- .../k8s/vald/benchmark/api/v1/job_types.go | 32 +- internal/k8s/vald/benchmark/job/job.go | 3 +- .../k8s/vald/benchmark/job/job_template.go | 72 +++- .../vald/benchmark/job/job_template_option.go | 76 +++- .../k8s/vald/benchmark/scenario/scenario.go | 6 +- internal/test/data/hdf5/hdf5.go | 54 +-- internal/test/data/hdf5/hdf5_test.go | 64 ++- internal/test/data/hdf5/option.go | 4 + internal/test/data/hdf5/option_test.go | 387 ++++++++++++++++++ k8s/tools/benchmark/operator/clusterrole.yaml | 1 + k8s/tools/benchmark/operator/configmap.yaml | 3 + .../operator/crds/valdbenchmarkjob.yaml | 26 +- .../crds/valdbenchmarkoperatorrelease.yaml | 13 + .../operator/crds/valdbenchmarkscenario.yaml | 10 +- k8s/tools/benchmark/operator/deployment.yaml | 5 + pkg/tools/benchmark/job/config/config.go | 2 + pkg/tools/benchmark/job/service/insert.go | 77 ++-- pkg/tools/benchmark/job/service/job.go | 83 ++-- pkg/tools/benchmark/job/service/object.go | 143 ++++--- pkg/tools/benchmark/job/service/option.go | 18 +- pkg/tools/benchmark/job/service/remove.go | 71 ++-- pkg/tools/benchmark/job/service/search.go | 181 +++++--- pkg/tools/benchmark/job/service/update.go | 80 ++-- pkg/tools/benchmark/job/service/upsert.go | 78 ++-- pkg/tools/benchmark/job/usecase/benchmarkd.go | 36 +- pkg/tools/benchmark/operator/config/config.go | 9 + .../benchmark/operator/service/operator.go | 135 ++++-- .../benchmark/operator/service/option.go | 22 + .../benchmark/operator/usecase/benchmarkd.go | 6 + 50 files changed, 1513 insertions(+), 470 deletions(-) create mode 100644 internal/test/data/hdf5/option_test.go diff --git a/.github/workflows/dockers-benchmark-job-image.yml b/.github/workflows/dockers-benchmark-job-image.yml index 13d8aa8a16..f38149a7c3 100644 --- a/.github/workflows/dockers-benchmark-job-image.yml +++ b/.github/workflows/dockers-benchmark-job-image.yml @@ -47,6 +47,7 @@ on: - "internal/**" - "!internal/**/*_test.go" - "!internal/db/**" + - "!internal/k8s/**" - "apis/grpc/**" - "pkg/benchmark/job/**" - "cmd/benchmark/job/**" diff --git a/charts/vald-benchmark-operator/crds/valdbenchmarkjob.yaml b/charts/vald-benchmark-operator/crds/valdbenchmarkjob.yaml index de85714816..d85ea0c12b 100644 --- a/charts/vald-benchmark-operator/crds/valdbenchmarkjob.yaml +++ b/charts/vald-benchmark-operator/crds/valdbenchmarkjob.yaml @@ -240,6 +240,10 @@ spec: type: string wait_for_ready: type: boolean + concurrency_limit: + type: integer + maximum: 65535 + minimum: 0 dataset: type: object properties: @@ -252,6 +256,7 @@ spec: name: type: string enum: + - original - fashion-mnist range: type: object @@ -262,6 +267,11 @@ spec: start: type: integer minimum: 1 + required: + - start + - end + url: + type: string required: - name - indexes @@ -285,7 +295,7 @@ spec: - upsert - search - remove - - get_object + - getobject - exists object_config: type: object @@ -319,6 +329,16 @@ spec: search_config: type: object properties: + aggregation_algorithm: + type: string + enum: + - Unknown + - ConcurrentQueue + - SortSlice + - SortPoolSlice + - PairingHeap + enable_linear_search: + type: boolean epsilon: type: number min_num: @@ -342,6 +362,10 @@ spec: required: - host - port + ttl_seconds_after_finished: + type: integer + maximum: 65535 + minimum: 0 update_config: type: object properties: diff --git a/charts/vald-benchmark-operator/crds/valdbenchmarkoperatorrelease.yaml b/charts/vald-benchmark-operator/crds/valdbenchmarkoperatorrelease.yaml index 3451a48b38..28ab299701 100644 --- a/charts/vald-benchmark-operator/crds/valdbenchmarkoperatorrelease.yaml +++ b/charts/vald-benchmark-operator/crds/valdbenchmarkoperatorrelease.yaml @@ -81,6 +81,19 @@ spec: type: string tag: type: string + job_image: + type: object + properties: + pullPolicy: + type: string + enum: + - Always + - Never + - IfNotPresent + repository: + type: string + tag: + type: string logging: type: object properties: diff --git a/charts/vald-benchmark-operator/crds/valdbenchmarkscenario.yaml b/charts/vald-benchmark-operator/crds/valdbenchmarkscenario.yaml index f705e7df6b..871f59b390 100644 --- a/charts/vald-benchmark-operator/crds/valdbenchmarkscenario.yaml +++ b/charts/vald-benchmark-operator/crds/valdbenchmarkscenario.yaml @@ -25,8 +25,8 @@ spec: plural: valdbenchmarkscenarios singular: valdbenchmarkscenario shortNames: - - vbo - - vbos + - vbs + - vbss scope: Namespaced versions: - name: v1 @@ -74,6 +74,7 @@ spec: name: type: string enum: + - original - fashion-mnist range: type: object @@ -84,6 +85,11 @@ spec: start: type: integer minimum: 1 + required: + - start + - end + url: + type: string required: - name - indexes diff --git a/charts/vald-benchmark-operator/job-values.schema.json b/charts/vald-benchmark-operator/job-values.schema.json index 24f2f5b91f..40759dd01b 100644 --- a/charts/vald-benchmark-operator/job-values.schema.json +++ b/charts/vald-benchmark-operator/job-values.schema.json @@ -288,6 +288,12 @@ "wait_for_ready": { "type": "boolean" } } }, + "concurrency_limit": { + "type": "integer", + "description": "concurrency_limit represents the goroutine limit count. It affects the job performance.", + "maximum": 65535, + "minimum": 0 + }, "dataset": { "type": "object", "description": "dataset information", @@ -305,7 +311,7 @@ "name": { "type": "string", "description": "the name of dataset", - "enum": ["fashion-mnist"] + "enum": ["original", "fashion-mnist"] }, "range": { "type": "object", @@ -321,7 +327,12 @@ "description": "start index number", "minimum": 1 } - } + }, + "required": ["start", "end"] + }, + "url": { + "type": "string", + "description": "the dataset url which is used for executing benchmark job with user defined hdf5 file" } }, "required": ["name", "indexes", "group", "range"] @@ -351,7 +362,7 @@ "upsert", "search", "remove", - "get_object", + "getobject", "exists" ] }, @@ -404,6 +415,21 @@ "type": "object", "description": "upsert config", "properties": { + "aggregation_algorithm": { + "type": "string", + "description": "search result aggregation algorithm", + "enum": [ + "Unknown", + "ConcurrentQueue", + "SortSlice", + "SortPoolSlice", + "PairingHeap" + ] + }, + "enable_linear_search": { + "type": "boolean", + "description": "enable linear search for calculation recall" + }, "epsilon": { "type": "number", "description": "epsilon" }, "min_num": { "type": "integer", @@ -435,6 +461,12 @@ }, "required": ["host", "port"] }, + "ttl_seconds_after_finished": { + "type": "integer", + "description": "limits the lifetime of a Job that has finished execution.", + "maximum": 65535, + "minimum": 0 + }, "update_config": { "type": "object", "description": "update config", diff --git a/charts/vald-benchmark-operator/scenario-values.schema.json b/charts/vald-benchmark-operator/scenario-values.schema.json index e3e549e69d..c4aadae03c 100644 --- a/charts/vald-benchmark-operator/scenario-values.schema.json +++ b/charts/vald-benchmark-operator/scenario-values.schema.json @@ -20,7 +20,7 @@ "name": { "type": "string", "description": "the name of dataset", - "enum": ["fashion-mnist"] + "enum": ["original", "fashion-mnist"] }, "range": { "type": "object", @@ -36,7 +36,12 @@ "description": "start index number", "minimum": 1 } - } + }, + "required": ["start", "end"] + }, + "url": { + "type": "string", + "description": "the dataset url which is used for executing benchmark job with user defined hdf5 file" } }, "required": ["name", "indexes", "group", "range"] diff --git a/charts/vald-benchmark-operator/schemas/job-values.yaml b/charts/vald-benchmark-operator/schemas/job-values.yaml index 47fafd98b0..021418e9f5 100644 --- a/charts/vald-benchmark-operator/schemas/job-values.yaml +++ b/charts/vald-benchmark-operator/schemas/job-values.yaml @@ -27,7 +27,7 @@ target: # @schema {"name": "dataset", "type": "object", "required": ["name", "indexes", "group", "range"]} # dataset -- dataset information dataset: - # @schema {"name": "dataset.name", "type": "string", "enum": ["fashion-mnist"] } + # @schema {"name": "dataset.name", "type": "string", "enum": ["original", "fashion-mnist"] } # dataset.name -- the name of dataset name: "fashion-mnist" # @schema {"name": "dataset.indexes", "type": "integer", "minimum": 0} @@ -36,7 +36,7 @@ dataset: # @schema {"name": "dataset.group", "type": "string", "minLength": 1} # dataset.group -- the hdf5 group name of dataset group: "test" - # @schema {"name": "dataset.range", "type": "object", "range": ["start", "port"]} + # @schema {"name": "dataset.range", "type": "object", "required": ["start", "end"]} # dataset.range -- the data range of indexes range: # @schema {"name": "dataset.range.start", "type": "integer", "minimum": 1} @@ -45,6 +45,9 @@ dataset: # @schema {"name": "dataset.range.end", "type": "integer", "minimum": 1} # dataset.range.end -- end index number end: 1000 + # @schema {"name": "dataset.url", "type": "string"} + # dataset.url -- the dataset url which is used for executing benchmark job with user defined hdf5 file + url: "" # @schema {"name": "dimension", "type": "integer", "minimum": 1} # dimension -- vector dimension dimension: 784 @@ -54,7 +57,7 @@ replica: 1 # @schema {"name": "repetition", "type": "integer", "minimum": 1} # repetition -- the number of repeat job repetition: 1 -# @schema {"name": "job_type", "type": "string", "enum": ["insert", "update", "upsert", "search", "remove", "get_object", "exists"]} +# @schema {"name": "job_type", "type": "string", "enum": ["insert", "update", "upsert", "search", "remove", "getobject", "exists"]} # job_type -- job type name job_type: "search" # @schema {"name": "insert_config", "type": "object"} @@ -108,6 +111,12 @@ search_config: # @schema {"name": "search_config.timeout", "type": "string"} # search_config.timeout -- search operation timeout timeout: "10s" + # @schema {"name": "search_config.enable_linear_search", "type": "boolean"} + # search_config.enable_linear_search -- enable linear search for calculation recall + enable_linear_search: true + # @schema {"name": "search_config.aggregation_algorithm", "type": "string", "enum": ["Unknown", "ConcurrentQueue", "SortSlice", "SortPoolSlice", "PairingHeap"]} + # search_config.aggregation_algorithm -- search result aggregation algorithm + aggregation_algorithm: "Unknown" # @schema {"name": "remove_config", "type": "object"} # remove_config -- remove config @@ -358,3 +367,9 @@ rules: [] # @schema {"name": "rps", "type": "integer", "minimum": 0, "maximum": 65535} # rps -- desired request per sec rps: 1000 +# @schema {"name": "concurrency_limit", "type": "integer", "minimum": 0, "maximum": 65535} +# concurrency_limit -- concurrency_limit represents the goroutine limit count. It affects the job performance. +concurrency_limit: 200 +# @schema {"name": "ttl_seconds_after_finished", "type": "integer", "minimum": 0, "maximum": 65535} +# ttl_seconds_after_finished -- limits the lifetime of a Job that has finished execution. +ttl_seconds_after_finished: 10 diff --git a/charts/vald-benchmark-operator/schemas/scenario-values.yaml b/charts/vald-benchmark-operator/schemas/scenario-values.yaml index dedc6b51b5..e33b76f5f5 100644 --- a/charts/vald-benchmark-operator/schemas/scenario-values.yaml +++ b/charts/vald-benchmark-operator/schemas/scenario-values.yaml @@ -28,7 +28,7 @@ target: # @schema {"name": "dataset", "type": "object", "required": ["name", "indexes", "group", "range"]} # dataset -- dataset information dataset: - # @schema {"name": "dataset.name", "type": "string", "enum": ["fashion-mnist"] } + # @schema {"name": "dataset.name", "type": "string", "enum": ["original", "fashion-mnist"] } # dataset.name -- the name of dataset name: "fashion-mnist" # @schema {"name": "dataset.indexes", "type": "integer", "minimum": 0} @@ -37,7 +37,7 @@ dataset: # @schema {"name": "dataset.group", "type": "string", "minLength": 1} # dataset.group -- the hdf5 group name of dataset group: "test" - # @schema {"name": "dataset.range", "type": "object", "range": ["start", "port"]} + # @schema {"name": "dataset.range", "type": "object", "required": ["start", "end"]} # dataset.range -- the data range of indexes range: # @schema {"name": "dataset.range.start", "type": "integer", "minimum": 1} @@ -46,6 +46,9 @@ dataset: # @schema {"name": "dataset.range.end", "type": "integer", "minimum": 1} # dataset.range.end -- end index number end: 1000 + # @schema {"name": "dataset.url", "type": "string"} + # dataset.url -- the dataset url which is used for executing benchmark job with user defined hdf5 file + url: "" # @schema {"name": "jobs", "type": "array", "items": {"type": "object"}} jobs: @@ -80,6 +83,7 @@ jobs: num: 10 min_num: 10 timeout: "10s" + enable_linear_search: true remove_config: skip_strict_exist_check: false timestamp: "" @@ -167,3 +171,4 @@ jobs: insecure_skip_verify: false rules: [] rps: 1000 + ttl_seconds_after_finished: 100 diff --git a/charts/vald-benchmark-operator/templates/clusterrole.yaml b/charts/vald-benchmark-operator/templates/clusterrole.yaml index 63fc0c6812..cd41992a5a 100644 --- a/charts/vald-benchmark-operator/templates/clusterrole.yaml +++ b/charts/vald-benchmark-operator/templates/clusterrole.yaml @@ -55,6 +55,7 @@ rules: verbs: - create - delete + - deletecollection - get - list - patch diff --git a/charts/vald-benchmark-operator/templates/configmap.yaml b/charts/vald-benchmark-operator/templates/configmap.yaml index ceb92f2b55..f00359380f 100644 --- a/charts/vald-benchmark-operator/templates/configmap.yaml +++ b/charts/vald-benchmark-operator/templates/configmap.yaml @@ -38,3 +38,6 @@ data: observability: {{- $observability := dict "Values" .Values.observability}} {{- include "vald.observability" $observability | nindent 6 }} + job_image: + image: "{{ .Values.job_image.repository }}:{{ .Values.job_image.tag }}" + pullPolicy: {{ .Values.job_image.pullPolicy }} diff --git a/charts/vald-benchmark-operator/templates/deployment.yaml b/charts/vald-benchmark-operator/templates/deployment.yaml index bff799a214..5be94d8829 100644 --- a/charts/vald-benchmark-operator/templates/deployment.yaml +++ b/charts/vald-benchmark-operator/templates/deployment.yaml @@ -44,7 +44,16 @@ spec: app.kubernetes.io/component: benchmark-operator {{- with .Values.podAnnotations }} annotations: + {{- if .Values.podAnnotations }} {{- toYaml . | nindent 8 }} + {{- end }} + {{- if .Values.server_config.metrics.pprof.enabeld }} + pyroscope.io/scrape: "true" + pyroscope.io/application-name: {{ .Values.name }} + pyroscope.io/profile-cpu-enabled: "true" + pyroscope.io/profile-mem-enabled: "true" + pyroscope.io/port: "{{ .Values.server_config.metrics.pprof.port }}" + {{- end}} {{- end }} spec: serviceAccountName: {{ .Values.serviceAccount.name }} @@ -113,6 +122,11 @@ spec: volumeMounts: - name: {{ .Values.name }}-config mountPath: /etc/server + env: + - name: JOB_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace terminationMessagePath: /dev/termination-log terminationMessagePolicy: File restartPolicy: Always diff --git a/charts/vald-benchmark-operator/values.schema.json b/charts/vald-benchmark-operator/values.schema.json index 6ed6f1764f..57fb5cd25b 100644 --- a/charts/vald-benchmark-operator/values.schema.json +++ b/charts/vald-benchmark-operator/values.schema.json @@ -16,8 +16,25 @@ "description": "image pull policy", "enum": ["Always", "Never", "IfNotPresent"] }, - "repository": { "type": "string", "description": "image repository" }, - "tag": { "type": "string", "description": "image tag" } + "repository": { + "type": "string", + "description": "job image repository" + }, + "tag": { + "type": "string", + "description": "image tag for job docker image" + } + } + }, + "job_image": { + "type": "object", + "properties": { + "pullPolicy": { + "type": "string", + "enum": ["Always", "Never", "IfNotPresent"] + }, + "repository": { "type": "string" }, + "tag": { "type": "string" } } }, "logging": { diff --git a/charts/vald-benchmark-operator/values.yaml b/charts/vald-benchmark-operator/values.yaml index 587bcd9121..a6c2b16bef 100644 --- a/charts/vald-benchmark-operator/values.yaml +++ b/charts/vald-benchmark-operator/values.yaml @@ -42,6 +42,18 @@ image: # image.pullPolicy -- image pull policy pullPolicy: Always +# @schema {"name": "job_image", "type": "object"} +job_image: + # @schema {"name": "job_image.repository", "type": "string"} + # image.repository -- job image repository + repository: vdaas/vald-benchmark-job + # @schema {"name": "job_image.tag", "type": "string"} + # image.tag -- image tag for job docker image + tag: v1.7.5 + # @schema {"name": "job_image.pullPolicy", "type": "string", "enum": ["Always", "Never", "IfNotPresent"]} + # image.pullPolicy -- image pull policy + pullPolicy: Always + # @schema {"name": "rbac", "type": "object"} rbac: # @schema {"name": "rbac.create", "type": "boolean"} diff --git a/charts/vald-benchmark-operator/values/benchmark-job.yaml b/charts/vald-benchmark-operator/values/benchmark-job.yaml index c8c13e1f3a..424845cba6 100644 --- a/charts/vald-benchmark-operator/values/benchmark-job.yaml +++ b/charts/vald-benchmark-operator/values/benchmark-job.yaml @@ -40,6 +40,7 @@ spec: num: 10 min_num: 10 timeout: "1m" + enable_linear_search: true target: host: "vald-lb-gateway.default.svc.cluster.local" port: 8081 diff --git a/charts/vald-benchmark-operator/values/benchmark-scenario.yaml b/charts/vald-benchmark-operator/values/benchmark-scenario.yaml index 4a0068eb20..0460e1a151 100644 --- a/charts/vald-benchmark-operator/values/benchmark-scenario.yaml +++ b/charts/vald-benchmark-operator/values/benchmark-scenario.yaml @@ -89,6 +89,7 @@ spec: num: 10 min_num: 10 timeout: "1m" + enable_linear_search: true client_config: health_check_duration: "10s" rps: 2000 @@ -111,7 +112,7 @@ spec: rps: 1000 - job_type: "search" dimension: 784 - repetition: 1 + repetition: 2 replica: 1 rules: [] dataset: @@ -127,6 +128,7 @@ spec: num: 10 min_num: 10 timeout: "1m" + enable_linear_search: false client_config: health_check_duration: "10s" rps: 4000 diff --git a/dockers/tools/benchmark/job/Dockerfile b/dockers/tools/benchmark/job/Dockerfile index 4c378a90a6..a6251e3075 100644 --- a/dockers/tools/benchmark/job/Dockerfile +++ b/dockers/tools/benchmark/job/Dockerfile @@ -19,27 +19,39 @@ ARG DISTROLESS_IMAGE_TAG=nonroot ARG UPX_OPTIONS=-9 ARG MAINTAINER="vdaas.org vald team " -FROM golang:${GO_VERSION} AS builder +FROM ubuntu:devel AS builder ARG UPX_OPTIONS ENV GO111MODULE on +ENV DEBIAN_FRONTEND noninteractive +ENV INITRD No ENV LANG en_US.UTF-8 +ENV GOROOT /opt/go +ENV GOPATH /go +ENV PATH ${PATH}:${GOROOT}/bin:${GOPATH}/bin ENV ORG vdaas ENV REPO vald ENV APP_NAME job ENV PKG tools/benchmark/${APP_NAME} +# skipcq: DOK-DL3008 RUN apt-get update && apt-get install -y --no-install-recommends \ + ca-certificates \ + build-essential \ + libhdf5-dev \ + curl \ upx \ git \ - libhdf5-dev \ && ldconfig \ && apt-get clean \ && rm -rf /var/lib/apt/lists/* RUN mkdir -p ${GOPATH}/src +COPY --from=golang /usr/local/go $GOROOT +RUN mkdir -p "$GOPATH/src" + WORKDIR ${GOPATH}/src/github.com/${ORG}/${REPO} COPY go.mod . @@ -90,6 +102,36 @@ COPY --from=builder /lib/x86_64-linux-gnu/libc.so.6 /lib/x86_64-linux-gnu/ COPY --from=builder /lib/x86_64-linux-gnu/libz.so.1 /lib/x86_64-linux-gnu/ COPY --from=builder /lib/x86_64-linux-gnu/libdl.so.2 /lib/x86_64-linux-gnu/ COPY --from=builder /lib/x86_64-linux-gnu/libm.so.6 /lib/x86_64-linux-gnu/ +COPY --from=builder /lib/x86_64-linux-gnu/libcrypto.so.3 /lib/x86_64-linux-gnu/ +COPY --from=builder /lib/x86_64-linux-gnu/libcurl.so.4 /lib/x86_64-linux-gnu/ +COPY --from=builder /lib/x86_64-linux-gnu/libnghttp2.so.14 /lib/x86_64-linux-gnu/ +COPY --from=builder /lib/x86_64-linux-gnu/libidn2.so.0 /lib/x86_64-linux-gnu/ +COPY --from=builder /lib/x86_64-linux-gnu/librtmp.so.1 /lib/x86_64-linux-gnu/ +COPY --from=builder /lib/x86_64-linux-gnu/libssh.so.4 /lib/x86_64-linux-gnu/ +COPY --from=builder /lib/x86_64-linux-gnu/libpsl.so.5 /lib/x86_64-linux-gnu/ +COPY --from=builder /lib/x86_64-linux-gnu/libssl.so.3 /lib/x86_64-linux-gnu/ +COPY --from=builder /lib/x86_64-linux-gnu/libgssapi_krb5.so.2 /lib/x86_64-linux-gnu/ +COPY --from=builder /lib/x86_64-linux-gnu/libldap.so.2 /lib/x86_64-linux-gnu/ +COPY --from=builder /lib/x86_64-linux-gnu/liblber.so.2 /lib/x86_64-linux-gnu/ +COPY --from=builder /lib/x86_64-linux-gnu/libzstd.so.1 /lib/x86_64-linux-gnu/ +COPY --from=builder /lib/x86_64-linux-gnu/libbrotlidec.so.1 /lib/x86_64-linux-gnu/ +COPY --from=builder /lib/x86_64-linux-gnu/libunistring.so.2 /lib/x86_64-linux-gnu/ +COPY --from=builder /lib/x86_64-linux-gnu/libgnutls.so.30 /lib/x86_64-linux-gnu/ +COPY --from=builder /lib/x86_64-linux-gnu/libhogweed.so.6 /lib/x86_64-linux-gnu/ +COPY --from=builder /lib/x86_64-linux-gnu/libnettle.so.8 /lib/x86_64-linux-gnu/ +COPY --from=builder /lib/x86_64-linux-gnu/libgmp.so.10 /lib/x86_64-linux-gnu/ +COPY --from=builder /lib/x86_64-linux-gnu/libkrb5.so.3 /lib/x86_64-linux-gnu/ +COPY --from=builder /lib/x86_64-linux-gnu/libk5crypto.so.3 /lib/x86_64-linux-gnu/ +COPY --from=builder /lib/x86_64-linux-gnu/libkrb5support.so.0 /lib/x86_64-linux-gnu/ +COPY --from=builder /lib/x86_64-linux-gnu/libcom_err.so.2 /lib/x86_64-linux-gnu/ +COPY --from=builder /lib/x86_64-linux-gnu/libsasl2.so.2 /lib/x86_64-linux-gnu/ +COPY --from=builder /lib/x86_64-linux-gnu/libbrotlicommon.so.1 /lib/x86_64-linux-gnu/ +COPY --from=builder /lib/x86_64-linux-gnu/libp11-kit.so.0 /lib/x86_64-linux-gnu/ +COPY --from=builder /lib/x86_64-linux-gnu/libtasn1.so.6 /lib/x86_64-linux-gnu/ +COPY --from=builder /lib/x86_64-linux-gnu/libkeyutils.so.1 /lib/x86_64-linux-gnu/ +COPY --from=builder /lib/x86_64-linux-gnu/libresolv.so.2 /lib/x86_64-linux-gnu/ +COPY --from=builder /lib/x86_64-linux-gnu/libffi.so.8 /lib/x86_64-linux-gnu/ +# COPY --from=builder /lib/x86_64-linux-gnu/ /lib/x86_64-linux-gnu/ COPY --from=builder /lib64/ld-linux-x86-64.so.2 /lib64/ COPY --from=builder /usr/bin/${APP_NAME} /go/bin/${APP_NAME} diff --git a/dockers/tools/benchmark/operator/Dockerfile b/dockers/tools/benchmark/operator/Dockerfile index 48f0e1dc54..b930f92d04 100644 --- a/dockers/tools/benchmark/operator/Dockerfile +++ b/dockers/tools/benchmark/operator/Dockerfile @@ -20,18 +20,26 @@ ARG DISTROLESS_IMAGE_TAG=nonroot ARG UPX_OPTIONS=-9 ARG MAINTAINER="vdaas.org vald team " -FROM golang:${GO_VERSION} AS builder - -ARG UPX_OPTIONS +FROM ubuntu:devel AS builder ENV GO111MODULE on +ENV DEBIAN_FRONTEND noninteractive +ENV INITRD No ENV LANG en_US.UTF-8 +ENV GOROOT /opt/go +ENV GOPATH /go +ENV PATH ${PATH}:${GOROOT}/bin:${GOPATH}/bin +# ARG UPX_OPTIONS ENV ORG vdaas ENV REPO vald ENV APP_NAME operator ENV PKG tools/benchmark/${APP_NAME} +# skipcq: DOK-DL3008 RUN apt-get update && apt-get install -y --no-install-recommends \ + ca-certificates \ + build-essential \ + curl \ upx \ git \ && apt-get clean \ @@ -39,6 +47,9 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ RUN mkdir -p ${GOPATH}/src +COPY --from=golang /usr/local/go $GOROOT +RUN mkdir -p "$GOPATH/src" + WORKDIR ${GOPATH}/src/github.com/${ORG}/${REPO} COPY go.mod . diff --git a/go.mod b/go.mod index d4c5284809..93a4e53c0d 100755 --- a/go.mod +++ b/go.mod @@ -396,6 +396,7 @@ require ( golang.org/x/text v0.13.0 golang.org/x/tools v0.14.0 golang.org/x/time v0.3.0 + golang.org/x/time v0.3.0 gonum.org/v1/hdf5 v0.0.0-00010101000000-000000000000 gonum.org/v1/plot v0.10.1 google.golang.org/genproto/googleapis/api v0.0.0-20231012201019-e917dd12ba7a diff --git a/internal/config/benchmark.go b/internal/config/benchmark.go index 0361ddb294..84ec55b58a 100644 --- a/internal/config/benchmark.go +++ b/internal/config/benchmark.go @@ -36,6 +36,7 @@ type BenchmarkJob struct { BeforeJobName string `json:"before_job_name,omitempty" yaml:"before_job_name"` BeforeJobNamespace string `json:"before_job_namespace,omitempty" yaml:"before_job_namespace"` RPS int `json:"rps,omitempty" yaml:"rps"` + ConcurrencyLimit int `json:"concurrency_limit,omitempty" yaml:"concurrency_limit"` } // BenchmarkScenario represents the configuration for the internal benchmark scenario. @@ -62,10 +63,13 @@ type BenchmarkDataset struct { Group string `json:"group,omitempty"` Indexes int `json:"indexes,omitempty"` Range *BenchmarkDatasetRange `json:"range,omitempty"` + URL string `json:"url,omitempty"` } func (d *BenchmarkDataset) Bind() *BenchmarkDataset { d.Name = GetActualValue(d.Name) + d.Group = GetActualValue(d.Group) + d.URL = GetActualValue(d.URL) return d } @@ -124,15 +128,18 @@ func (cfg *UpsertConfig) Bind() *UpsertConfig { // SearchConfig defines the desired state of search config type SearchConfig struct { - Epsilon float32 `json:"epsilon,omitempty"` - Radius float32 `json:"radius,omitempty"` - Num int32 `json:"num,omitempty"` - MinNum int32 `json:"min_num,omitempty"` - Timeout string `json:"timeout,omitempty"` + Epsilon float32 `json:"epsilon,omitempty"` + Radius float32 `json:"radius,omitempty"` + Num int32 `json:"num,omitempty"` + MinNum int32 `json:"min_num,omitempty"` + Timeout string `json:"timeout,omitempty"` + EnableLinearSearch bool `json:"enable_linear_search,omitempty"` + AggregationAlgorithm string `json:"aggregation_algorithm,omitempty"` } func (cfg *SearchConfig) Bind() *SearchConfig { cfg.Timeout = GetActualValue(cfg.Timeout) + cfg.AggregationAlgorithm = GetActualValue(cfg.AggregationAlgorithm) return cfg } @@ -225,3 +232,16 @@ func (b *BenchmarkJob) Bind() *BenchmarkJob { func (b *BenchmarkScenario) Bind() *BenchmarkScenario { return b } + +// BenchmarkJobImageInfo represents the docker image information for benchmark job. +type BenchmarkJobImageInfo struct { + Image string `json:"image,omitempty" yaml:"image"` + PullPolicy string `json:"pull_policy,omitempty" yaml:"pull_policy"` +} + +// Bind binds the actual data from the BenchmarkJobImageInfo receiver fields. +func (b *BenchmarkJobImageInfo) Bind() *BenchmarkJobImageInfo { + b.Image = GetActualValue(b.Image) + b.PullPolicy = GetActualValue(b.PullPolicy) + return b +} diff --git a/internal/k8s/client/client.go b/internal/k8s/client/client.go index 2b3647ebd1..057932a200 100644 --- a/internal/k8s/client/client.go +++ b/internal/k8s/client/client.go @@ -20,6 +20,7 @@ package client import ( "context" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" ctrl "sigs.k8s.io/controller-runtime" cli "sigs.k8s.io/controller-runtime/pkg/client" @@ -30,11 +31,16 @@ type ( Object = cli.Object ObjectKey = cli.ObjectKey DeleteAllOfOptions = cli.DeleteAllOfOptions + DeleteOptions = cli.DeleteOptions ListOptions = cli.ListOptions MatchingLabels = cli.MatchingLabels InNamespace = cli.InNamespace ) +const ( + DeletePropagationBackground = metav1.DeletePropagationBackground +) + type Client interface { // Get retrieves an obj for the given object key from the Kubernetes Cluster. // obj must be a struct pointer so that obj can be updated with the response diff --git a/internal/k8s/job/job.go b/internal/k8s/job/job.go index aa07449d08..97ec3de0eb 100644 --- a/internal/k8s/job/job.go +++ b/internal/k8s/job/job.go @@ -30,7 +30,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" - "sigs.k8s.io/controller-runtime/pkg/source" ) // JobWatcher is a type alias for k8s resource controller. @@ -153,7 +152,7 @@ func (r *reconciler) Owns() (client.Object, []builder.OwnsOption) { // Watches returns the kind of the job and the event handler. // It will always return nil. -func (r *reconciler) Watches() (*source.Kind, handler.EventHandler, []builder.WatchesOption) { +func (r *reconciler) Watches() (client.Object, handler.EventHandler, []builder.WatchesOption) { // return &source.Kind{Type: new(corev1.Pod)}, &handler.EnqueueRequestForObject{} return nil, nil, nil } diff --git a/internal/k8s/vald/benchmark/api/v1/job_types.go b/internal/k8s/vald/benchmark/api/v1/job_types.go index 135d1e42e9..729bf27629 100644 --- a/internal/k8s/vald/benchmark/api/v1/job_types.go +++ b/internal/k8s/vald/benchmark/api/v1/job_types.go @@ -23,21 +23,23 @@ import ( ) type BenchmarkJobSpec struct { - Target *BenchmarkTarget `json:"target,omitempty" yaml:"target"` - Dataset *BenchmarkDataset `json:"dataset,omitempty" yaml:"dataset"` - Dimension int `json:"dimension,omitempty" yaml:"dimension"` - Replica int `json:"replica,omitempty" yaml:"replica"` - Repetition int `json:"repetition,omitempty" yaml:"repetition"` - JobType string `json:"job_type,omitempty" yaml:"job_type"` - InsertConfig *config.InsertConfig `json:"insert_config,omitempty" yaml:"insert_config"` - UpdateConfig *config.UpdateConfig `json:"update_config,omitempty" yaml:"update_config"` - UpsertConfig *config.UpsertConfig `json:"upsert_config,omitempty" yaml:"upsert_config"` - SearchConfig *config.SearchConfig `json:"search_config,omitempty" yaml:"search_config"` - RemoveConfig *config.RemoveConfig `json:"remove_config,omitempty" yaml:"remove_config"` - ObjectConfig *config.ObjectConfig `json:"object_config,omitempty" yaml:"object_config"` - ClientConfig *config.GRPCClient `json:"client_config,omitempty" yaml:"client_config"` - Rules []*config.BenchmarkJobRule `json:"rules,omitempty" yaml:"rules"` - RPS int `json:"rps,omitempty" yaml:"rps"` + Target *BenchmarkTarget `json:"target,omitempty"` + Dataset *BenchmarkDataset `json:"dataset,omitempty"` + Dimension int `json:"dimension,omitempty"` + Replica int `json:"replica,omitempty"` + Repetition int `json:"repetition,omitempty"` + JobType string `json:"job_type,omitempty"` + InsertConfig *config.InsertConfig `json:"insert_config,omitempty"` + UpdateConfig *config.UpdateConfig `json:"update_config,omitempty"` + UpsertConfig *config.UpsertConfig `json:"upsert_config,omitempty"` + SearchConfig *config.SearchConfig `json:"search_config,omitempty"` + RemoveConfig *config.RemoveConfig `json:"remove_config,omitempty"` + ObjectConfig *config.ObjectConfig `json:"object_config,omitempty"` + ClientConfig *config.GRPCClient `json:"client_config,omitempty"` + Rules []*config.BenchmarkJobRule `json:"rules,omitempty"` + RPS int `json:"rps,omitempty"` + ConcurrencyLimit int `json:"concurrency_limit,omitempty"` + TTLSecondsAfterFinished int `json:"ttl_seconds_after_finished,omitempty"` } type BenchmarkJobStatus string diff --git a/internal/k8s/vald/benchmark/job/job.go b/internal/k8s/vald/benchmark/job/job.go index 87a7efe22a..6f81ee01fa 100644 --- a/internal/k8s/vald/benchmark/job/job.go +++ b/internal/k8s/vald/benchmark/job/job.go @@ -31,7 +31,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/scheme" - "sigs.k8s.io/controller-runtime/pkg/source" ) type BenchmarkJobWatcher k8s.ResourceController @@ -134,7 +133,7 @@ func (r *reconciler) Owns() (client.Object, []builder.OwnsOption) { return nil, nil } -func (r *reconciler) Watches() (*source.Kind, handler.EventHandler, []builder.WatchesOption) { +func (r *reconciler) Watches() (client.Object, handler.EventHandler, []builder.WatchesOption) { // return &source.Kind{Type: new(corev1.Pod)}, &handler.EnqueueRequestForObject{} return nil, nil, nil } diff --git a/internal/k8s/vald/benchmark/job/job_template.go b/internal/k8s/vald/benchmark/job/job_template.go index 0368e6a5b5..6b1926f0dc 100644 --- a/internal/k8s/vald/benchmark/job/job_template.go +++ b/internal/k8s/vald/benchmark/job/job_template.go @@ -18,37 +18,71 @@ package job import ( - batchv1 "k8s.io/api/batch/v1" + jobs "github.com/vdaas/vald/internal/k8s/job" corev1 "k8s.io/api/core/v1" ) -type benchmarkJobTemplate = batchv1.Job +type ( + ImagePullPolicy corev1.PullPolicy + RestartPolicy corev1.RestartPolicy +) const ( - SvcAccountName = "vald-benchmark-operator" - ContainerName = "vald-benchmark-job" - // TODO: Fix - ContainerImage = "vdaas/vald-benchmark-job:pr-2027" + PullAlways ImagePullPolicy = "Always" + PullNever ImagePullPolicy = "Never" + PullIfNotPresent ImagePullPolicy = "PullIfNotPresent" + + RestartPolicyAlways RestartPolicy = "Always" + RestartPolicyOnFailure RestartPolicy = "OnFailure" + RestartPolicyNever RestartPolicy = "Never" +) - RestartPolicyAlways corev1.RestartPolicy = "Always" - RestartPolicyOnFailure corev1.RestartPolicy = "OnFailure" - RestartPolicyNever corev1.RestartPolicy = "Never" +const ( + svcAccount = "vald-benchmark-operator" ) -// NewBenchmarkJobTemplate creates the job template for crating k8s job resource. -func NewBenchmarkJobTemplate(opts ...BenchmarkJobOption) (benchmarkJobTemplate, error) { - jobTmpl := new(benchmarkJobTemplate) +type BenchmarkJobTpl interface { + CreateJobTpl(opts ...BenchmarkJobOption) (jobs.Job, error) +} + +type benchmarkJobTpl struct { + containerName string + containerImageName string + imagePullPolicy ImagePullPolicy + jobTpl jobs.Job +} + +func NewBenchmarkJob(opts ...BenchmarkJobTplOption) (BenchmarkJobTpl, error) { + bjTpl := new(benchmarkJobTpl) + for _, opt := range append(defaultBenchmarkJobTplOpts, opts...) { + err := opt(bjTpl) + if err != nil { + return nil, err + } + } + return bjTpl, nil +} + +func (b *benchmarkJobTpl) CreateJobTpl(opts ...BenchmarkJobOption) (jobs.Job, error) { for _, opt := range append(defaultBenchmarkJobOpts, opts...) { - err := opt(jobTmpl) + err := opt(&b.jobTpl) if err != nil { - return *jobTmpl, err + return b.jobTpl, err } } - jobTmpl.Spec.Template.Spec.Containers = []corev1.Container{ + // TODO: check enable pprof flag + b.jobTpl.Spec.Template.Annotations = map[string]string{ + "pyroscope.io/scrape": "true", + "pyroscope.io/application-name": "benchmark-job", + "pyroscope.io/profile-cpu-enabled": "true", + "pyroscope.io/profile-mem-enabled": "true", + "pyroscope.io/port": "6060", + } + b.jobTpl.Spec.Template.Spec.Containers = []corev1.Container{ { - Name: ContainerName, - Image: ContainerImage, - ImagePullPolicy: corev1.PullAlways, + Name: b.containerName, + Image: b.containerImageName, + ImagePullPolicy: corev1.PullPolicy(b.imagePullPolicy), LivenessProbe: &corev1.Probe{ InitialDelaySeconds: int32(60), PeriodSeconds: int32(10), @@ -107,5 +141,5 @@ func NewBenchmarkJobTemplate(opts ...BenchmarkJobOption) (benchmarkJobTemplate, }, }, } - return *jobTmpl, nil + return b.jobTpl, nil } diff --git a/internal/k8s/vald/benchmark/job/job_template_option.go b/internal/k8s/vald/benchmark/job/job_template_option.go index f5cc78f875..4ec09250d3 100644 --- a/internal/k8s/vald/benchmark/job/job_template_option.go +++ b/internal/k8s/vald/benchmark/job/job_template_option.go @@ -18,20 +18,60 @@ package job import ( "github.com/vdaas/vald/internal/k8s" + jobs "github.com/vdaas/vald/internal/k8s/job" corev1 "k8s.io/api/core/v1" ) +type BenchmarkJobTplOption func(b *benchmarkJobTpl) error + +var defaultBenchmarkJobTplOpts = []BenchmarkJobTplOption{ + WithContainerName("vald-benchmark-job"), + WithContainerImage("vdaas/vald-benchmark-job"), + WithImagePullPolicy(PullAlways), +} + +// WithContainerName sets the docker container name of benchmark job. +func WithContainerName(name string) BenchmarkJobTplOption { + return func(b *benchmarkJobTpl) error { + if len(name) > 0 { + b.containerName = name + } + return nil + } +} + +// WithContainerImage sets the docker image path for benchmark job. +func WithContainerImage(name string) BenchmarkJobTplOption { + return func(b *benchmarkJobTpl) error { + if len(name) > 0 { + b.containerImageName = name + } + return nil + } +} + +// WithImagePullPolicy sets the docker image pull policy for benchmark job. +func WithImagePullPolicy(p ImagePullPolicy) BenchmarkJobTplOption { + return func(b *benchmarkJobTpl) error { + if len(p) == 0 { + return nil + } + b.imagePullPolicy = p + return nil + } +} + // BenchmarkJobOption represents the option for create benchmark job template. -type BenchmarkJobOption func(b *benchmarkJobTemplate) error +type BenchmarkJobOption func(b *jobs.Job) error var defaultBenchmarkJobOpts = []BenchmarkJobOption{ - WithSvcAccountName(SvcAccountName), + WithSvcAccountName(svcAccount), WithRestartPolicy(RestartPolicyNever), } // WithSvcAccountName sets the service account name for benchmark job. func WithSvcAccountName(name string) BenchmarkJobOption { - return func(b *benchmarkJobTemplate) error { + return func(b *jobs.Job) error { if len(name) > 0 { b.Spec.Template.Spec.ServiceAccountName = name } @@ -40,10 +80,10 @@ func WithSvcAccountName(name string) BenchmarkJobOption { } // WithRestartPolicy sets the job restart policy for benchmark job. -func WithRestartPolicy(rp corev1.RestartPolicy) BenchmarkJobOption { - return func(b *benchmarkJobTemplate) error { +func WithRestartPolicy(rp RestartPolicy) BenchmarkJobOption { + return func(b *jobs.Job) error { if len(rp) > 0 { - b.Spec.Template.Spec.RestartPolicy = rp + b.Spec.Template.Spec.RestartPolicy = corev1.RestartPolicy(rp) } return nil } @@ -51,7 +91,7 @@ func WithRestartPolicy(rp corev1.RestartPolicy) BenchmarkJobOption { // WithBackoffLimit sets the job backoff limit for benchmark job. func WithBackoffLimit(bo int32) BenchmarkJobOption { - return func(b *benchmarkJobTemplate) error { + return func(b *jobs.Job) error { b.Spec.BackoffLimit = &bo return nil } @@ -59,7 +99,7 @@ func WithBackoffLimit(bo int32) BenchmarkJobOption { // WithName sets the job name of benchmark job. func WithName(name string) BenchmarkJobOption { - return func(b *benchmarkJobTemplate) error { + return func(b *jobs.Job) error { b.Name = name return nil } @@ -67,7 +107,7 @@ func WithName(name string) BenchmarkJobOption { // WithNamespace specify namespace where job will execute. func WithNamespace(ns string) BenchmarkJobOption { - return func(b *benchmarkJobTemplate) error { + return func(b *jobs.Job) error { b.Namespace = ns return nil } @@ -75,7 +115,7 @@ func WithNamespace(ns string) BenchmarkJobOption { // WithOwnerRef sets the OwnerReference to the job resource. func WithOwnerRef(refs []k8s.OwnerReference) BenchmarkJobOption { - return func(b *benchmarkJobTemplate) error { + return func(b *jobs.Job) error { if len(refs) > 0 { b.OwnerReferences = refs } @@ -85,7 +125,7 @@ func WithOwnerRef(refs []k8s.OwnerReference) BenchmarkJobOption { // WithCompletions sets the job completion. func WithCompletions(com int32) BenchmarkJobOption { - return func(b *benchmarkJobTemplate) error { + return func(b *jobs.Job) error { if com > 1 { b.Spec.Completions = &com } @@ -95,7 +135,7 @@ func WithCompletions(com int32) BenchmarkJobOption { // WithParallelism sets the job parallelism. func WithParallelism(parallelism int32) BenchmarkJobOption { - return func(b *benchmarkJobTemplate) error { + return func(b *jobs.Job) error { if parallelism > 1 { b.Spec.Parallelism = ¶llelism } @@ -105,10 +145,20 @@ func WithParallelism(parallelism int32) BenchmarkJobOption { // WithLabel sets the label to the job resource. func WithLabel(label map[string]string) BenchmarkJobOption { - return func(b *benchmarkJobTemplate) error { + return func(b *jobs.Job) error { if len(label) > 0 { b.Labels = label } return nil } } + +// WithTTLSecondsAfterFinished sets the TTLSecondsAfterFinished to the job template. +func WithTTLSecondsAfterFinished(ttl int32) BenchmarkJobOption { + return func(b *jobs.Job) error { + if ttl > 0 { + b.Spec.TTLSecondsAfterFinished = &ttl + } + return nil + } +} diff --git a/internal/k8s/vald/benchmark/scenario/scenario.go b/internal/k8s/vald/benchmark/scenario/scenario.go index cde92a808d..d03d1e37da 100644 --- a/internal/k8s/vald/benchmark/scenario/scenario.go +++ b/internal/k8s/vald/benchmark/scenario/scenario.go @@ -29,7 +29,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" - "sigs.k8s.io/controller-runtime/pkg/source" ) type BenchmarkScenarioWatcher k8s.ResourceController @@ -122,7 +121,8 @@ func (r *reconciler) Owns() (client.Object, []builder.OwnsOption) { return nil, nil } -func (r *reconciler) Watches() (*source.Kind, handler.EventHandler, []builder.WatchesOption) { +func (r *reconciler) Watches() (client.Object, handler.EventHandler, []builder.WatchesOption) { // return &source.Kind{Type: new(corev1.Pod)}, &handler.EnqueueRequestForObject{} - return &source.Kind{Type: new(v1.ValdBenchmarkScenario)}, &handler.EnqueueRequestForObject{}, nil + // return &source.Kind{Type: new(v1.ValdBenchmarkScenario)}, &handler.EnqueueRequestForObject{}, nil + return nil, &handler.EnqueueRequestForObject{}, nil } diff --git a/internal/test/data/hdf5/hdf5.go b/internal/test/data/hdf5/hdf5.go index f178a52c2e..b308687cf0 100644 --- a/internal/test/data/hdf5/hdf5.go +++ b/internal/test/data/hdf5/hdf5.go @@ -28,8 +28,8 @@ import ( ) type Data interface { - Download() error - Read() error + Download(url string) error + Read(key Hdf5Key) error GetName() DatasetName GetPath() string GetByGroupName(name string) [][]float32 @@ -41,13 +41,16 @@ type Data interface { type DatasetName int const ( - FashionMNIST784Euclidean DatasetName = iota + Original DatasetName = iota + FashionMNIST784Euclidean ) func (d DatasetName) String() string { switch d { + case Original: + return "original" case FashionMNIST784Euclidean: - return "fashion-mnist-784-euc" + return "fashion-mnist" default: return "" } @@ -68,16 +71,16 @@ func (d DatasetUrl) String() string { } } -type hdf5Key int +type Hdf5Key int const ( - Train hdf5Key = iota + Train Hdf5Key = iota + 1 Test Neighors ) -func (h hdf5Key) String() string { - switch h { +func (key Hdf5Key) String() string { + switch key { case Train: return "train" case Test: @@ -109,8 +112,10 @@ func New(opts ...Option) (Data, error) { // Get downloads the hdf5 file. // https://github.com/erikbern/ann-benchmarks/#data-sets -func (d *data) Download() error { +func (d *data) Download(url string) error { switch d.name { + case Original: + return downloadFile(url, d.path) case FashionMNIST784Euclidean: return downloadFile(FashionMNIST784EuclideanUrl.String(), d.path) default: @@ -118,26 +123,29 @@ func (d *data) Download() error { } } -func (d *data) Read() error { +func (d *data) Read(key Hdf5Key) error { f, err := hdf5.OpenFile(d.path, hdf5.F_ACC_RDONLY) if err != nil { return err } defer f.Close() - // load training data - train, err := ReadDatasetF32(f, Train) - if err != nil { - return err + if key != Test { + // load training data + train, err := ReadDatasetF32(f, Train) + if err != nil { + return err + } + d.train = train } - d.train = train - - // load test data - test, err := ReadDatasetF32(f, Test) - if err != nil { - return err + if key != Train { + // load test data + test, err := ReadDatasetF32(f, Test) + if err != nil { + return err + } + d.test = test } - d.test = test // load neighbors neighbors32, err := ReadDatasetI32(f, Neighors) @@ -230,7 +238,7 @@ func downloadFile(url, path string) error { return nil } -func ReadDatasetF32(file *hdf5.File, key hdf5Key) ([][]float32, error) { +func ReadDatasetF32(file *hdf5.File, key Hdf5Key) ([][]float32, error) { data, err := file.OpenDataset(key.String()) if err != nil { return nil, err @@ -259,7 +267,7 @@ func ReadDatasetF32(file *hdf5.File, key hdf5Key) ([][]float32, error) { return vecs, nil } -func ReadDatasetI32(file *hdf5.File, key hdf5Key) ([][]int32, error) { +func ReadDatasetI32(file *hdf5.File, key Hdf5Key) ([][]int32, error) { data, err := file.OpenDataset(key.String()) if err != nil { return nil, err diff --git a/internal/test/data/hdf5/hdf5_test.go b/internal/test/data/hdf5/hdf5_test.go index d308048781..940a76021b 100644 --- a/internal/test/data/hdf5/hdf5_test.go +++ b/internal/test/data/hdf5/hdf5_test.go @@ -47,37 +47,19 @@ func TestDatasetName_String(t *testing.T) { return nil } tests := []test{ - // TODO test cases - /* - { - name: "test_case_1", - want: want{}, - checkFunc: defaultCheckFunc, - beforeFunc: func(t *testing.T,) { - t.Helper() - }, - afterFunc: func(t *testing.T,) { - t.Helper() - }, - }, - */ - - // TODO test cases - /* - func() test { - return test { - name: "test_case_2", - want: want{}, - checkFunc: defaultCheckFunc, - beforeFunc: func(t *testing.T,) { - t.Helper() - }, - afterFunc: func(t *testing.T,) { - t.Helper() - }, - } - }(), - */ + // { + // name: "set original", + // want: want{ + // want: "original", + // }, + // checkFunc: defaultCheckFunc, + // beforeFunc: func(t *testing.T,) { + // t.Helper() + // }, + // afterFunc: func(t *testing.T,) { + // t.Helper() + // }, + // }, } for _, tc := range tests { @@ -180,13 +162,13 @@ func TestDatasetUrl_String(t *testing.T) { } } -func Test_hdf5Key_String(t *testing.T) { +func Test_Hdf5Key_String(t *testing.T) { type want struct { want string } type test struct { name string - h hdf5Key + h Hdf5Key want want checkFunc func(want, string) error beforeFunc func(*testing.T) @@ -353,12 +335,16 @@ func Test_data_Download(t *testing.T) { test [][]float32 neighbors [][]int } + type args struct { + url string + } type want struct { err error } type test struct { name string fields fields + args args want want checkFunc func(want, error) error beforeFunc func(*testing.T) @@ -440,7 +426,7 @@ func Test_data_Download(t *testing.T) { neighbors: test.fields.neighbors, } - err := d.Download() + err := d.Download(test.args.url) if err := checkFunc(test.want, err); err != nil { tt.Errorf("error = %v", err) } @@ -456,6 +442,9 @@ func Test_data_Read(t *testing.T) { test [][]float32 neighbors [][]int } + type args struct { + key Hdf5Key + } type want struct { err error } @@ -463,6 +452,7 @@ func Test_data_Read(t *testing.T) { name string fields fields want want + args args checkFunc func(want, error) error beforeFunc func(*testing.T) afterFunc func(*testing.T) @@ -544,7 +534,7 @@ func Test_data_Read(t *testing.T) { neighbors: test.fields.neighbors, } - err := d.Read() + err := d.Read(test.args.key) if err := checkFunc(test.want, err); err != nil { tt.Errorf("error = %v", err) } @@ -1265,7 +1255,7 @@ func Test_downloadFile(t *testing.T) { func TestReadDatasetF32(t *testing.T) { type args struct { file *hdf5.File - key hdf5Key + key Hdf5Key } type want struct { want [][]float32 @@ -1357,7 +1347,7 @@ func TestReadDatasetF32(t *testing.T) { func TestReadDatasetI32(t *testing.T) { type args struct { file *hdf5.File - key hdf5Key + key Hdf5Key } type want struct { want [][]int32 diff --git a/internal/test/data/hdf5/option.go b/internal/test/data/hdf5/option.go index 6fee9d0ae3..4d02b281f2 100644 --- a/internal/test/data/hdf5/option.go +++ b/internal/test/data/hdf5/option.go @@ -31,6 +31,8 @@ var defaultOptions = []Option{ func WithNameByString(n string) Option { var name DatasetName switch n { + case Original.String(): + name = Original case FashionMNIST784Euclidean.String(): name = FashionMNIST784Euclidean } @@ -40,6 +42,8 @@ func WithNameByString(n string) Option { func WithName(dn DatasetName) Option { return func(d *data) error { switch dn { + case Original: + d.name = dn case FashionMNIST784Euclidean: d.name = dn default: diff --git a/internal/test/data/hdf5/option_test.go b/internal/test/data/hdf5/option_test.go new file mode 100644 index 0000000000..e239c3361d --- /dev/null +++ b/internal/test/data/hdf5/option_test.go @@ -0,0 +1,387 @@ +// +// Copyright (C) 2019-2023 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +// Package hdf5 is load hdf5 file +package hdf5 + +import ( + "reflect" + "testing" + + "github.com/vdaas/vald/internal/errors" + "github.com/vdaas/vald/internal/test/goleak" +) + +func TestWithNameByString(t *testing.T) { + // Change interface type to the type of object you are testing + type args struct { + n string + } + type want struct { + obj data + } + type test struct { + name string + args args + want want + checkFunc func(want want, got data) error + beforeFunc func(*testing.T) + afterFunc func(*testing.T) + } + + // Uncomment this block if the option returns an error, otherwise delete it + /* + defaultCheckFunc := func(w want, obj *T, err error) error { + if !errors.Is(err, w.err) { + return errors.Errorf("got_error: \"%#v\",\n\t\t\t\twant: \"%#v\"", err, w.err) + } + if !reflect.DeepEqual(obj, w.obj) { + return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", obj, w.obj) + } + return nil + } + */ + + defaultCheckFunc := func(w want, obj data) error { + if !reflect.DeepEqual(obj, w.obj) { + return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", obj, w.obj) + } + return nil + } + + tests := []test{ + { + name: "set original", + args: args{ + n: "original", + }, + want: want{ + obj: data{ + name: Original, + }, + }, + beforeFunc: func(t *testing.T) { + t.Helper() + }, + afterFunc: func(t *testing.T) { + t.Helper() + }, + }, + { + name: "set fashion-mnist", + args: args{ + n: "fashion-mnist", + }, + want: want{ + obj: data{ + name: FashionMNIST784Euclidean, + }, + }, + beforeFunc: func(t *testing.T) { + t.Helper() + }, + afterFunc: func(t *testing.T) { + t.Helper() + }, + }, + } + + for _, tc := range tests { + test := tc + t.Run(test.name, func(tt *testing.T) { + tt.Parallel() + defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) + if test.beforeFunc != nil { + test.beforeFunc(tt) + } + if test.afterFunc != nil { + defer test.afterFunc(tt) + } + + checkFunc := test.checkFunc + if test.checkFunc == nil { + checkFunc = defaultCheckFunc + } + + got := WithNameByString(test.args.n) + obj := new(data) + _ = got(obj) + if err := checkFunc(test.want, *obj); err != nil { + tt.Errorf("error = %v", err) + } + }) + } +} + +func TestWithName(t *testing.T) { + // Change interface type to the type of object you are testing + type T = interface{} + type args struct { + dn DatasetName + } + type want struct { + obj *T + // Uncomment this line if the option returns an error, otherwise delete it + // err error + } + type test struct { + name string + args args + want want + // Use the first line if the option returns an error. otherwise use the second line + // checkFunc func(want, *T, error) error + // checkFunc func(want, *T) error + beforeFunc func(*testing.T, args) + afterFunc func(*testing.T, args) + } + + // Uncomment this block if the option returns an error, otherwise delete it + /* + defaultCheckFunc := func(w want, obj *T, err error) error { + if !errors.Is(err, w.err) { + return errors.Errorf("got_error: \"%#v\",\n\t\t\t\twant: \"%#v\"", err, w.err) + } + if !reflect.DeepEqual(obj, w.obj) { + return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", obj, w.obj) + } + return nil + } + */ + + // Uncomment this block if the option do not returns an error, otherwise delete it + /* + defaultCheckFunc := func(w want, obj *T) error { + if !reflect.DeepEqual(obj, w.obj) { + return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", obj, w.obj) + } + return nil + } + */ + + tests := []test{ + // TODO test cases + /* + { + name: "test_case_1", + args: args { + dn:nil, + }, + want: want { + obj: new(T), + }, + beforeFunc: func(t *testing.T, args args) { + t.Helper() + }, + afterFunc: func(t *testing.T, args args) { + t.Helper() + }, + }, + */ + + // TODO test cases + /* + func() test { + return test { + name: "test_case_2", + args: args { + dn:nil, + }, + want: want { + obj: new(T), + }, + beforeFunc: func(t *testing.T, args args) { + t.Helper() + }, + afterFunc: func(t *testing.T, args args) { + t.Helper() + }, + } + }(), + */ + } + + for _, tc := range tests { + test := tc + t.Run(test.name, func(tt *testing.T) { + tt.Parallel() + defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) + if test.beforeFunc != nil { + test.beforeFunc(tt, test.args) + } + if test.afterFunc != nil { + defer test.afterFunc(tt, test.args) + } + + // Uncomment this block if the option returns an error, otherwise delete it + /* + checkFunc := test.checkFunc + if test.checkFunc == nil { + checkFunc = defaultCheckFunc + } + + got := WithName(test.args.dn) + obj := new(T) + if err := checkFunc(test.want, obj, got(obj)); err != nil { + tt.Errorf("error = %v", err) + } + */ + + // Uncomment this block if the option do not return an error, otherwise delete it + /* + checkFunc := test.checkFunc + if test.checkFunc == nil { + checkFunc = defaultCheckFunc + } + got := WithName(test.args.dn) + obj := new(T) + got(obj) + if err := checkFunc(test.want, obj); err != nil { + tt.Errorf("error = %v", err) + } + */ + }) + } +} + +func TestWithFilePath(t *testing.T) { + // Change interface type to the type of object you are testing + type T = interface{} + type args struct { + f string + } + type want struct { + obj *T + // Uncomment this line if the option returns an error, otherwise delete it + // err error + } + type test struct { + name string + args args + want want + // Use the first line if the option returns an error. otherwise use the second line + // checkFunc func(want, *T, error) error + // checkFunc func(want, *T) error + beforeFunc func(*testing.T, args) + afterFunc func(*testing.T, args) + } + + // Uncomment this block if the option returns an error, otherwise delete it + /* + defaultCheckFunc := func(w want, obj *T, err error) error { + if !errors.Is(err, w.err) { + return errors.Errorf("got_error: \"%#v\",\n\t\t\t\twant: \"%#v\"", err, w.err) + } + if !reflect.DeepEqual(obj, w.obj) { + return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", obj, w.obj) + } + return nil + } + */ + + // Uncomment this block if the option do not returns an error, otherwise delete it + /* + defaultCheckFunc := func(w want, obj *T) error { + if !reflect.DeepEqual(obj, w.obj) { + return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", obj, w.obj) + } + return nil + } + */ + + tests := []test{ + // TODO test cases + /* + { + name: "test_case_1", + args: args { + f:"", + }, + want: want { + obj: new(T), + }, + beforeFunc: func(t *testing.T, args args) { + t.Helper() + }, + afterFunc: func(t *testing.T, args args) { + t.Helper() + }, + }, + */ + + // TODO test cases + /* + func() test { + return test { + name: "test_case_2", + args: args { + f:"", + }, + want: want { + obj: new(T), + }, + beforeFunc: func(t *testing.T, args args) { + t.Helper() + }, + afterFunc: func(t *testing.T, args args) { + t.Helper() + }, + } + }(), + */ + } + + for _, tc := range tests { + test := tc + t.Run(test.name, func(tt *testing.T) { + tt.Parallel() + defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) + if test.beforeFunc != nil { + test.beforeFunc(tt, test.args) + } + if test.afterFunc != nil { + defer test.afterFunc(tt, test.args) + } + + // Uncomment this block if the option returns an error, otherwise delete it + /* + checkFunc := test.checkFunc + if test.checkFunc == nil { + checkFunc = defaultCheckFunc + } + + got := WithFilePath(test.args.f) + obj := new(T) + if err := checkFunc(test.want, obj, got(obj)); err != nil { + tt.Errorf("error = %v", err) + } + */ + + // Uncomment this block if the option do not return an error, otherwise delete it + /* + checkFunc := test.checkFunc + if test.checkFunc == nil { + checkFunc = defaultCheckFunc + } + got := WithFilePath(test.args.f) + obj := new(T) + got(obj) + if err := checkFunc(test.want, obj); err != nil { + tt.Errorf("error = %v", err) + } + */ + }) + } +} diff --git a/k8s/tools/benchmark/operator/clusterrole.yaml b/k8s/tools/benchmark/operator/clusterrole.yaml index 70ac264c95..4d3fc54948 100644 --- a/k8s/tools/benchmark/operator/clusterrole.yaml +++ b/k8s/tools/benchmark/operator/clusterrole.yaml @@ -55,6 +55,7 @@ rules: verbs: - create - delete + - deletecollection - get - list - patch diff --git a/k8s/tools/benchmark/operator/configmap.yaml b/k8s/tools/benchmark/operator/configmap.yaml index 05f7b34594..7f6f7a492a 100644 --- a/k8s/tools/benchmark/operator/configmap.yaml +++ b/k8s/tools/benchmark/operator/configmap.yaml @@ -138,3 +138,6 @@ data: trace: enabled: false sampling_rate: 1 + job_image: + image: "vdaas/vald-benchmark-job:v1.7.5" + pullPolicy: Always diff --git a/k8s/tools/benchmark/operator/crds/valdbenchmarkjob.yaml b/k8s/tools/benchmark/operator/crds/valdbenchmarkjob.yaml index de85714816..d85ea0c12b 100644 --- a/k8s/tools/benchmark/operator/crds/valdbenchmarkjob.yaml +++ b/k8s/tools/benchmark/operator/crds/valdbenchmarkjob.yaml @@ -240,6 +240,10 @@ spec: type: string wait_for_ready: type: boolean + concurrency_limit: + type: integer + maximum: 65535 + minimum: 0 dataset: type: object properties: @@ -252,6 +256,7 @@ spec: name: type: string enum: + - original - fashion-mnist range: type: object @@ -262,6 +267,11 @@ spec: start: type: integer minimum: 1 + required: + - start + - end + url: + type: string required: - name - indexes @@ -285,7 +295,7 @@ spec: - upsert - search - remove - - get_object + - getobject - exists object_config: type: object @@ -319,6 +329,16 @@ spec: search_config: type: object properties: + aggregation_algorithm: + type: string + enum: + - Unknown + - ConcurrentQueue + - SortSlice + - SortPoolSlice + - PairingHeap + enable_linear_search: + type: boolean epsilon: type: number min_num: @@ -342,6 +362,10 @@ spec: required: - host - port + ttl_seconds_after_finished: + type: integer + maximum: 65535 + minimum: 0 update_config: type: object properties: diff --git a/k8s/tools/benchmark/operator/crds/valdbenchmarkoperatorrelease.yaml b/k8s/tools/benchmark/operator/crds/valdbenchmarkoperatorrelease.yaml index 3451a48b38..28ab299701 100644 --- a/k8s/tools/benchmark/operator/crds/valdbenchmarkoperatorrelease.yaml +++ b/k8s/tools/benchmark/operator/crds/valdbenchmarkoperatorrelease.yaml @@ -81,6 +81,19 @@ spec: type: string tag: type: string + job_image: + type: object + properties: + pullPolicy: + type: string + enum: + - Always + - Never + - IfNotPresent + repository: + type: string + tag: + type: string logging: type: object properties: diff --git a/k8s/tools/benchmark/operator/crds/valdbenchmarkscenario.yaml b/k8s/tools/benchmark/operator/crds/valdbenchmarkscenario.yaml index f705e7df6b..871f59b390 100644 --- a/k8s/tools/benchmark/operator/crds/valdbenchmarkscenario.yaml +++ b/k8s/tools/benchmark/operator/crds/valdbenchmarkscenario.yaml @@ -25,8 +25,8 @@ spec: plural: valdbenchmarkscenarios singular: valdbenchmarkscenario shortNames: - - vbo - - vbos + - vbs + - vbss scope: Namespaced versions: - name: v1 @@ -74,6 +74,7 @@ spec: name: type: string enum: + - original - fashion-mnist range: type: object @@ -84,6 +85,11 @@ spec: start: type: integer minimum: 1 + required: + - start + - end + url: + type: string required: - name - indexes diff --git a/k8s/tools/benchmark/operator/deployment.yaml b/k8s/tools/benchmark/operator/deployment.yaml index 7930e06334..520609518b 100644 --- a/k8s/tools/benchmark/operator/deployment.yaml +++ b/k8s/tools/benchmark/operator/deployment.yaml @@ -98,6 +98,11 @@ spec: volumeMounts: - name: vald-benchmark-operator-config mountPath: /etc/server + env: + - name: JOB_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace terminationMessagePath: /dev/termination-log terminationMessagePolicy: File restartPolicy: Always diff --git a/pkg/tools/benchmark/job/config/config.go b/pkg/tools/benchmark/job/config/config.go index ad1cdb839a..3b1492e25d 100644 --- a/pkg/tools/benchmark/job/config/config.go +++ b/pkg/tools/benchmark/job/config/config.go @@ -104,6 +104,7 @@ func NewConfig(ctx context.Context, path string) (cfg *Config, err error) { } else { cfg.Job.Target = (*config.BenchmarkTarget)(jobResource.Spec.Target) cfg.Job.Dataset = (*config.BenchmarkDataset)(jobResource.Spec.Dataset) + cfg.Job.Dimension = jobResource.Spec.Dimension cfg.Job.Replica = jobResource.Spec.Replica cfg.Job.Repetition = jobResource.Spec.Repetition cfg.Job.JobType = jobResource.Spec.JobType @@ -116,6 +117,7 @@ func NewConfig(ctx context.Context, path string) (cfg *Config, err error) { cfg.Job.ObjectConfig = jobResource.Spec.ObjectConfig cfg.Job.ClientConfig = jobResource.Spec.ClientConfig cfg.Job.RPS = jobResource.Spec.RPS + cfg.Job.ConcurrencyLimit = jobResource.Spec.ConcurrencyLimit if annotations := jobResource.GetAnnotations(); annotations != nil { cfg.Job.BeforeJobName = annotations[JOBNAME_ANNOTATION] cfg.Job.BeforeJobNamespace = annotations[JOBNAMESPACE_ANNOTATION] diff --git a/pkg/tools/benchmark/job/service/insert.go b/pkg/tools/benchmark/job/service/insert.go index 946710df4b..69d16ecc27 100644 --- a/pkg/tools/benchmark/job/service/insert.go +++ b/pkg/tools/benchmark/job/service/insert.go @@ -19,58 +19,73 @@ package service import ( "context" + "math" "strconv" "github.com/vdaas/vald/apis/grpc/v1/payload" + "github.com/vdaas/vald/internal/errgroup" "github.com/vdaas/vald/internal/errors" "github.com/vdaas/vald/internal/log" - "github.com/vdaas/vald/internal/net/grpc/status" ) func (j *job) insert(ctx context.Context, ech chan error) error { log.Info("[benchmark job] Start benchmarking insert") // create data - vecs := j.genVec(j.dataset) + vecs := j.hdf5.GetByGroupName(j.dataset.Group) cfg := &payload.Insert_Config{ SkipStrictExistCheck: j.insertConfig.SkipStrictExistCheck, } if j.timestamp > int64(0) { cfg.Timestamp = j.timestamp } - for i := 0; i < len(vecs); i++ { - log.Infof("[benchmark job] Start insert: iter = %d", i) - err := j.limiter.Wait(ctx) - if err != nil { - if errors.Is(err, context.Canceled) { - return errors.Join(err, context.Canceled) - } - ech <- err - } - res, err := j.client.Insert(ctx, &payload.Insert_Request{ - Vector: &payload.Object_Vector{ - Id: strconv.Itoa(i), - Vector: vecs[i], - }, - Config: cfg, - }) - if err != nil { - select { - case <-ctx.Done(): + eg, egctx := errgroup.New(ctx) + eg.Limitation(j.concurrencyLimit) + for i := j.dataset.Range.Start; i <= j.dataset.Range.End; i++ { + iter := i + eg.Go(func() error { + log.Debugf("[benchmark job] Start insert: iter = %d", iter) + err := j.limiter.Wait(egctx) + if err != nil { + log.Errorf("[benchmark job] limiter error is detected: %s", err.Error()) if errors.Is(err, context.Canceled) { - return errors.Join(err, context.Canceled) + // return errors.Join(err, context.Canceled) + return nil } select { - case <-ctx.Done(): - return errors.Join(err, context.Canceled) - case ech <- errors.Join(err, ctx.Err()): + case <-egctx.Done(): + return egctx.Err() + case ech <- err: } - default: - st, _ := status.FromError(err) - log.Warnf("[benchmark job] insert error is detected: code = %d, msg = %s", st.Code(), err.Error()) } - } - // TODO: send metrics to the Prometeus - log.Infof("[benchmark job] Finish insert: iter= %d \n%v\n", i, res) + loopCnt := math.Floor(float64(iter-1) / float64(len(vecs))) + idx := iter - 1 - (len(vecs) * int(loopCnt)) + res, err := j.client.Insert(egctx, &payload.Insert_Request{ + Vector: &payload.Object_Vector{ + Id: strconv.Itoa(iter), + Vector: vecs[idx], + }, + Config: cfg, + }) + if err != nil { + select { + case <-egctx.Done(): + log.Errorf("[benchmark job] context error is detected: %s\t%s", err.Error(), egctx.Err()) + return errors.Join(err, egctx.Err()) + default: + // if st, ok := status.FromError(err); ok { + // log.Warnf("[benchmark job] insert error is detected: code = %d, msg = %s", st.Code(), err.Error()) + // } + } + } + // TODO: send metrics to the Prometeus + log.Debugf("[benchmark job] Finish insert: iter= %d \n%v\n", iter, res) + return nil + }) + } + err := eg.Wait() + if err != nil { + log.Warnf("[benchmark job] insert error is detected: err = %s", err.Error()) + return err } log.Info("[benchmark job] Finish benchmarking insert") return nil diff --git a/pkg/tools/benchmark/job/service/job.go b/pkg/tools/benchmark/job/service/job.go index 2736ad898c..2f115ea157 100644 --- a/pkg/tools/benchmark/job/service/job.go +++ b/pkg/tools/benchmark/job/service/job.go @@ -19,7 +19,6 @@ package service import ( "context" - "math" "os" "reflect" "strconv" @@ -100,6 +99,7 @@ type job struct { beforeJobDur time.Duration limiter rate.Limiter rps int + concurrencyLimit int timeout time.Duration timestamp int64 } @@ -186,20 +186,65 @@ func New(opts ...Option) (Job, error) { if j.rps > 0 { j.limiter = rate.NewLimiter(j.rps) } + // If (Range.End - Range.Start) is smaller than Indexes, Indexes are prioritized based on Range.Start. + if (j.dataset.Range.End - j.dataset.Range.Start + 1) < j.dataset.Indexes { + j.dataset.Range.End = j.dataset.Range.Start + j.dataset.Indexes + } + return j, nil } func (j *job) PreStart(ctx context.Context) error { - log.Infof("[benchmark job] start download dataset of %s", j.hdf5.GetName().String()) - if err := j.hdf5.Download(); err != nil { - return err + if j.jobType != GETOBJECT && j.jobType != EXISTS && j.jobType != REMOVE { + log.Infof("[benchmark job] start download dataset of %s", j.hdf5.GetName().String()) + if err := j.hdf5.Download(j.dataset.URL); err != nil { + return err + } + log.Infof("[benchmark job] success download dataset of %s", j.hdf5.GetName().String()) + log.Infof("[benchmark job] start load dataset of %s", j.hdf5.GetName().String()) + var key hdf5.Hdf5Key + switch j.dataset.Group { + case "train": + key = hdf5.Train + case "test": + key = hdf5.Test + case "neighbors": + key = hdf5.Neighors + default: + } + if err := j.hdf5.Read(key); err != nil { + return err + } + log.Infof("[benchmark job] success load dataset of %s", j.hdf5.GetName().String()) } - log.Infof("[benchmark job] success download dataset of %s", j.hdf5.GetName().String()) - log.Infof("[benchmark job] start load dataset of %s", j.hdf5.GetName().String()) - if err := j.hdf5.Read(); err != nil { - return err + // Wait for beforeJob completed if exists + if len(j.beforeJobName) != 0 { + var jobResource v1.ValdBenchmarkJob + log.Info("[benchmark job] check before benchjob is completed or not...") + j.eg.Go(safety.RecoverFunc(func() error { + dt := time.NewTicker(j.beforeJobDur) + defer dt.Stop() + for { + select { + case <-ctx.Done(): + return nil + case <-dt.C: + err := j.k8sClient.Get(ctx, j.beforeJobName, j.beforeJobNamespace, &jobResource) + if err != nil { + return err + } + if jobResource.Status == v1.BenchmarkJobCompleted { + log.Infof("[benchmark job ] before job (%s) is completed, job service will start soon.", j.beforeJobName) + return nil + } + log.Infof("[benchmark job] before job (%s/%s) is not completed...", j.beforeJobName, jobResource.Status) + } + } + })) + if err := j.eg.Wait(); err != nil { + return err + } } - log.Infof("[benchmark job] success load dataset of %s", j.hdf5.GetName().String()) // Wait for beforeJob completed if exists if len(j.beforeJobName) != 0 { var jobResource v1.ValdBenchmarkJob @@ -272,7 +317,6 @@ func (j *job) Start(ctx context.Context) (<-chan error, error) { } return }) - return ech, nil } @@ -301,22 +345,3 @@ func calcRecall(linearRes, searchRes *payload.Search_Response) (recall float64) } return recall / float64(len(lres)) } - -func (j *job) genVec(cfg *config.BenchmarkDataset) [][]float32 { - start := cfg.Range.Start - end := cfg.Range.End - // If (Range.End - Range.Start) is smaller than Indexes, Indexes are prioritized based on Range.Start. - if (end - start + 1) < cfg.Indexes { - end = cfg.Range.Start + cfg.Indexes - } - data := j.hdf5.GetByGroupName(cfg.Group) - if n := math.Ceil(float64(end) / float64(len(data))); n > 1 { - var def [][]float32 - for i := 0; i < int(n-1); i++ { - def = append(def, data...) - } - data = append(data, def...) - } - vectors := data[start-1 : end] - return vectors -} diff --git a/pkg/tools/benchmark/job/service/object.go b/pkg/tools/benchmark/job/service/object.go index c36ebd9fdb..eec7ea773f 100644 --- a/pkg/tools/benchmark/job/service/object.go +++ b/pkg/tools/benchmark/job/service/object.go @@ -22,43 +22,55 @@ import ( "strconv" "github.com/vdaas/vald/apis/grpc/v1/payload" + "github.com/vdaas/vald/internal/errgroup" "github.com/vdaas/vald/internal/errors" "github.com/vdaas/vald/internal/log" - "github.com/vdaas/vald/internal/net/grpc/status" ) func (j *job) exists(ctx context.Context, ech chan error) error { log.Info("[benchmark job] Start benchmarking exists") - for i := 0; i < j.dataset.Indexes; i++ { - err := j.limiter.Wait(ctx) - if err != nil { - if errors.Is(err, context.Canceled) { - return errors.Join(err, context.Canceled) - } - ech <- err - } - res, err := j.client.Exists(ctx, &payload.Object_ID{ - Id: strconv.Itoa(i), - }) - if err != nil { - select { - case <-ctx.Done(): + eg, egctx := errgroup.New(ctx) + eg.Limitation(j.concurrencyLimit) + for i := j.dataset.Range.Start; i <= j.dataset.Range.End; i++ { + idx := i + eg.Go(func() error { + log.Debugf("[benchmark job] Start exists: iter = %d", i) + err := j.limiter.Wait(egctx) + if err != nil { + log.Errorf("[benchmark job] limiter error is detected: %s", err.Error()) if errors.Is(err, context.Canceled) { - return errors.Join(err, context.Canceled) + return nil + // return errors.Join(err, context.Canceled) } select { - case <-ctx.Done(): - return errors.Join(err, context.Canceled) - case ech <- errors.Join(err, ctx.Err()): + case <-egctx.Done(): + return egctx.Err() + case ech <- err: } - default: - st, _ := status.FromError(err) - log.Warnf("[benchmark job] exists error is detected: code = %d, msg = %s", st.Code(), err.Error()) } - } - if res != nil { - log.Infof("[benchmark exists job] iter=%d, Id=%s", i, res.GetId()) - } + res, err := j.client.Exists(egctx, &payload.Object_ID{ + Id: strconv.Itoa(idx), + }) + if err != nil { + select { + case <-egctx.Done(): + log.Errorf("[benchmark job] context error is detected: %s\t%s", err.Error(), egctx.Err()) + return nil + // return errors.Join(err, egctx.Err()) + default: + // if st, ok := status.FromError(err); ok { + // log.Warnf("[benchmark job] exists error is detected: code = %d, msg = %s", st.Code(), err.Error()) + // } + } + } + log.Debugf("[benchmark job] Finish exists: iter= %d \n%v\n", idx, res) + return nil + }) + } + err := eg.Wait() + if err != nil { + log.Warnf("[benchmark job] exists RPC error is detected: err = %s", err.Error()) + return err } log.Info("[benchmark job] Finish benchmarking exists") return nil @@ -66,17 +78,10 @@ func (j *job) exists(ctx context.Context, ech chan error) error { func (j *job) getObject(ctx context.Context, ech chan error) error { log.Info("[benchmark job] Start benchmarking getObject") - // create data - vecs := j.genVec(j.dataset) - for i := 0; i < len(vecs); i++ { - log.Infof("[benchmark job] Start getObject: iter = %d", i) - err := j.limiter.Wait(ctx) - if err != nil { - if errors.Is(err, context.Canceled) { - return errors.Join(err, context.Canceled) - } - ech <- err - } + eg, egctx := errgroup.New(ctx) + eg.Limitation(j.concurrencyLimit) + for i := j.dataset.Range.Start; i <= j.dataset.Range.End; i++ { + log.Infof("[benchmark job] Start get object: iter = %d", i) ft := []*payload.Filter_Target{} if j.objectConfig != nil { for i, target := range j.objectConfig.FilterConfig.Targets { @@ -86,33 +91,53 @@ func (j *job) getObject(ctx context.Context, ech chan error) error { } } } - res, err := j.client.GetObject(ctx, &payload.Object_VectorRequest{ - Id: &payload.Object_ID{ - Id: strconv.Itoa(i), - }, - Filters: &payload.Filter_Config{ - Targets: ft, - }, - }) - if res != nil { - log.Infof("[benchmark get object job] iter=%d, Id=%s, Vec=%v", i, res.GetId(), res.GetVector()) - } - if err != nil { - select { - case <-ctx.Done(): + idx := i + eg.Go(func() error { + log.Debugf("[benchmark job] Start get object: iter = %d", idx) + err := j.limiter.Wait(egctx) + if err != nil { + log.Errorf("[benchmark job] limiter error is detected: %s", err.Error()) if errors.Is(err, context.Canceled) { - return errors.Join(err, context.Canceled) + // return errors.Join(err, context.Canceled) + return nil } select { - case <-ctx.Done(): - return errors.Join(err, context.Canceled) - case ech <- errors.Join(err, ctx.Err()): + case <-egctx.Done(): + return egctx.Err() + case ech <- err: } - default: - st, _ := status.FromError(err) - log.Warnf("[benchmark job] get object error is detected: code = %d, msg = %s", st.Code(), err.Error()) } - } + res, err := j.client.GetObject(egctx, &payload.Object_VectorRequest{ + Id: &payload.Object_ID{ + Id: strconv.Itoa(idx), + }, + Filters: &payload.Filter_Config{ + Targets: ft, + }, + }) + if err != nil { + select { + case <-egctx.Done(): + log.Errorf("[benchmark job] context error is detected: %s\t%s", err.Error(), egctx.Err()) + // return errors.Join(err, egctx.Err()) + return nil + default: + // if st, ok := status.FromError(err); ok { + // log.Warnf("[benchmark job] object error is detected: code = %d, msg = %s", st.Code(), err.Error()) + // } + } + } + if res != nil { + log.Infof("[benchmark get object job] iter=%d, Id=%s, Vec=%v", idx, res.GetId(), res.GetVector()) + } + log.Debugf("[benchmark job] Finish get object: iter= %d \n%v\n", idx, res) + return nil + }) + } + err := eg.Wait() + if err != nil { + log.Warnf("[benchmark job] object error is detected: err = %s", err.Error()) + return err } log.Info("[benchmark job] Finish benchmarking getObject") return nil diff --git a/pkg/tools/benchmark/job/service/option.go b/pkg/tools/benchmark/job/service/option.go index 772e638a6f..5b2d470fae 100644 --- a/pkg/tools/benchmark/job/service/option.go +++ b/pkg/tools/benchmark/job/service/option.go @@ -35,7 +35,8 @@ var defaultOpts = []Option{ // TODO: set default config for client WithDimension(748), WithBeforeJobDuration("30s"), - WithRPS(100), + WithRPS(1000), + WithConcurencyLimit(200), } // WithDimension sets the vector's dimension for running benchmark job with dataset. @@ -141,12 +142,15 @@ func WithHdf5(d hdf5.Data) Option { } } -// WithDataset sets the config.BenchmarkDataset including benchmakr dataset name, group name of hdf5.Data, the number of index, start range and end range. +// WithDataset sets the config.BenchmarkDataset including benchmark dataset name, group name of hdf5.Data, the number of index, start range and end range, and original URL which is used for download user defined hdf5. func WithDataset(d *config.BenchmarkDataset) Option { return func(j *job) error { if d == nil { return errors.NewErrInvalidOption("dataset", d) } + if d.Name == hdf5.Original.String() && len(d.URL) == 0 { + return errors.NewErrInvalidOption("dataset", d) + } j.dataset = d return nil } @@ -252,3 +256,13 @@ func WithRPS(rps int) Option { return nil } } + +// WithConcurencyLimit sets the goroutine limit for sending request to the target cluster. +func WithConcurencyLimit(limit int) Option { + return func(j *job) error { + if limit > 0 { + j.concurrencyLimit = limit + } + return nil + } +} diff --git a/pkg/tools/benchmark/job/service/remove.go b/pkg/tools/benchmark/job/service/remove.go index 7e6db88e12..66c604a6dd 100644 --- a/pkg/tools/benchmark/job/service/remove.go +++ b/pkg/tools/benchmark/job/service/remove.go @@ -22,55 +22,64 @@ import ( "strconv" "github.com/vdaas/vald/apis/grpc/v1/payload" + "github.com/vdaas/vald/internal/errgroup" "github.com/vdaas/vald/internal/errors" "github.com/vdaas/vald/internal/log" - "github.com/vdaas/vald/internal/net/grpc/status" ) func (j *job) remove(ctx context.Context, ech chan error) error { log.Info("[benchmark job] Start benchmarking remove") - // create data - vecs := j.genVec(j.dataset) cfg := &payload.Remove_Config{ SkipStrictExistCheck: j.removeConfig.SkipStrictExistCheck, } if j.timestamp > int64(0) { cfg.Timestamp = j.timestamp } - for i := 0; i < len(vecs); i++ { - log.Infof("[benchmark job] Start remove: iter = %d", i) - err := j.limiter.Wait(ctx) - if err != nil { - if errors.Is(err, context.Canceled) { - return errors.Join(err, context.Canceled) - } - ech <- err - } - res, err := j.client.Remove(ctx, &payload.Remove_Request{ - Id: &payload.Object_ID{ - Id: strconv.Itoa(i), - }, - Config: cfg, - }) - if err != nil { - select { - case <-ctx.Done(): + eg, egctx := errgroup.New(ctx) + eg.Limitation(j.concurrencyLimit) + for i := j.dataset.Range.Start; i <= j.dataset.Range.End; i++ { + idx := i + eg.Go(func() error { + log.Debugf("[benchmark job] Start remove: iter = %d", i) + err := j.limiter.Wait(egctx) + if err != nil { + log.Errorf("[benchmark job] limiter error is detected: %s", err.Error()) if errors.Is(err, context.Canceled) { - return errors.Join(err, context.Canceled) + return nil + // return errors.Join(err, context.Canceled) + } + select { + case <-egctx.Done(): + return egctx.Err() + case ech <- err: } + } + res, err := j.client.Remove(egctx, &payload.Remove_Request{ + Id: &payload.Object_ID{ + Id: strconv.Itoa(idx), + }, + Config: cfg, + }) + if err != nil { select { - case <-ctx.Done(): - return errors.Join(err, context.Canceled) - case ech <- errors.Join(err, ctx.Err()): + case <-egctx.Done(): + log.Errorf("[benchmark job] context error is detected: %s\t%s", err.Error(), egctx.Err()) + return errors.Join(err, egctx.Err()) + default: + // if st, ok := status.FromError(err); ok { + // log.Warnf("[benchmark job] remove error is detected: code = %d, msg = %s", st.Code(), err.Error()) + // } } - default: - st, _ := status.FromError(err) - log.Warnf("[benchmark job] remove error is detected: code = %d, msg = %s", st.Code(), err.Error()) } - } - log.Infof("[benchmark job] Finish remove: iter= %d \n%v", i, res) + log.Debugf("[benchmark job] Finish remove: iter= %d \n%v", idx, res) + return nil + }) + } + err := eg.Wait() + if err != nil { + log.Warnf("[benchmark job] remove error is detected: err = %s", err.Error()) + return err } - log.Info("[benchmark job] Finish benchmarking remove") return nil } diff --git a/pkg/tools/benchmark/job/service/search.go b/pkg/tools/benchmark/job/service/search.go index dbc9bcbc35..d8fb0d25f9 100644 --- a/pkg/tools/benchmark/job/service/search.go +++ b/pkg/tools/benchmark/job/service/search.go @@ -19,98 +19,151 @@ package service import ( "context" + "math" "github.com/vdaas/vald/apis/grpc/v1/payload" + "github.com/vdaas/vald/internal/errgroup" "github.com/vdaas/vald/internal/errors" "github.com/vdaas/vald/internal/log" - "github.com/vdaas/vald/internal/net/grpc/codes" - "github.com/vdaas/vald/internal/net/grpc/status" ) func (j *job) search(ctx context.Context, ech chan error) error { log.Info("[benchmark job] Start benchmarking search") // create data - vecs := j.genVec(j.dataset) + vecs := j.hdf5.GetByGroupName(j.dataset.Group) cfg := &payload.Search_Config{ Num: uint32(j.searchConfig.Num), MinNum: uint32(j.searchConfig.MinNum), Radius: float32(j.searchConfig.Radius), Epsilon: float32(j.searchConfig.Epsilon), Timeout: j.timeout.Nanoseconds(), + AggregationAlgorithm: func() payload.Search_AggregationAlgorithm { + if len(j.searchConfig.AggregationAlgorithm) > 0 { + if v, ok := payload.Search_AggregationAlgorithm_value[j.searchConfig.AggregationAlgorithm]; ok { + return payload.Search_AggregationAlgorithm(v) + } + } + return 0 + }(), } - lres := make([]*payload.Search_Response, len(vecs)) - for i := 0; i < len(vecs); i++ { - if len(vecs[i]) != j.dimension { - log.Warn("len(vecs) ", len(vecs[i]), "is not matched with ", j.dimension) - continue - } - res, err := j.client.LinearSearch(ctx, &payload.Search_Request{ - Vector: vecs[i], - Config: cfg, - }) - if err != nil { - select { - case <-ctx.Done(): + sres := make([]*payload.Search_Response, j.dataset.Indexes) + eg, egctx := errgroup.New(ctx) + eg.Limitation(j.concurrencyLimit) + for i := j.dataset.Range.Start; i <= j.dataset.Range.End; i++ { + iter := i + eg.Go(func() error { + log.Debugf("[benchmark job] Start search: iter = %d", iter) + err := j.limiter.Wait(egctx) + if err != nil { + log.Errorf("[benchmark job] limiter error is detected: %s", err.Error()) if errors.Is(err, context.Canceled) { - return errors.Join(err, context.Canceled) + return nil + // return errors.Join(err, context.Canceled) } select { - case <-ctx.Done(): - return errors.Join(err, context.Canceled) - case ech <- errors.Join(err, ctx.Err()): + case <-egctx.Done(): + return egctx.Err() + case ech <- err: } - default: - st, _ := status.FromError(err) - if st.Code() != codes.NotFound { - log.Warnf("[benchmark job] linear search error is detected: code = %d, msg = %s", st.Code(), err.Error()) + } + loopCnt := math.Floor(float64(iter-1) / float64(len(vecs))) + idx := iter - 1 - (len(vecs) * int(loopCnt)) + if len(vecs[idx]) != j.dimension { + log.Warn("len(vecs) ", len(vecs[iter]), "is not matched with ", j.dimension) + return nil + } + res, err := j.client.Search(egctx, &payload.Search_Request{ + Vector: vecs[idx], + Config: cfg, + }) + if err != nil { + select { + case <-egctx.Done(): + log.Errorf("[benchmark job] context error is detected: %s\t%s", err.Error(), egctx.Err()) + return nil + // return errors.Join(err, egctx.Err()) + default: + // if st, ok := status.FromError(err); ok { + // if st.Code() != codes.NotFound { + // log.Warnf("[benchmark job] search error is detected: code = %d, msg = %s", st.Code(), err.Error()) + // } + // } } } - } - lres[i] = res - } - sres := make([]*payload.Search_Response, len(vecs)) - log.Infof("[benchmark job] Start search") - for i := 0; i < len(vecs); i++ { - if len(vecs[i]) != j.dimension { - log.Warn("len(vecs) ", len(vecs[i]), "is not matched with ", j.dimension) - continue - } - err := j.limiter.Wait(ctx) - if err != nil { - if errors.Is(err, context.Canceled) { - return errors.Join(err, context.Canceled) + if res != nil && j.searchConfig.EnableLinearSearch { + sres[iter-j.dataset.Range.Start] = res } - ech <- err - } - res, err := j.client.Search(ctx, &payload.Search_Request{ - Vector: vecs[i], - Config: cfg, + log.Debugf("[benchmark job] Finish search: iter = %d, len = %d", iter, len(res.Results)) + return nil }) - log.Infof("[benchmark job] search %d", i) - if err != nil { - select { - case <-ctx.Done(): - if errors.Is(err, context.Canceled) { - return errors.Join(err, context.Canceled) + } + err := eg.Wait() + if err != nil { + log.Warnf("[benchmark job] search error is detected: err = %s", err.Error()) + return err + } + if j.searchConfig.EnableLinearSearch { + lres := make([]*payload.Search_Response, j.dataset.Indexes) + for i := j.dataset.Range.Start; i <= j.dataset.Range.End; i++ { + iter := i + eg.Go(func() error { + err := j.limiter.Wait(egctx) + if err != nil { + log.Errorf("[benchmark job] limiter error is detected: %s", err.Error()) + if errors.Is(err, context.Canceled) { + // return errors.Join(err, context.Canceled) + return nil + } + select { + case <-egctx.Done(): + return egctx.Err() + case ech <- err: + } } - select { - case <-ctx.Done(): - return errors.Join(err, context.Canceled) - case ech <- errors.Join(err, ctx.Err()): + log.Debugf("[benchmark job] Start linear search: iter = %d", iter) + loopCnt := math.Floor(float64(i-1) / float64(len(vecs))) + idx := iter - 1 - (len(vecs) * int(loopCnt)) + if len(vecs[idx]) != j.dimension { + log.Warn("len(vecs) ", len(vecs[idx]), "is not matched with ", j.dimension) + return nil } - default: - st, _ := status.FromError(err) - if st.Code() != codes.NotFound { - log.Warnf("[benchmark job] search error is detected: code = %d, msg = %s", st.Code(), err.Error()) + res, err := j.client.LinearSearch(egctx, &payload.Search_Request{ + Vector: vecs[idx], + Config: cfg, + }) + if err != nil { + select { + case <-egctx.Done(): + log.Errorf("[benchmark job] context error is detected: %s\t%s", err.Error(), egctx.Err()) + return errors.Join(err, egctx.Err()) + default: + // if st, ok := status.FromError(err); ok { + // if st.Code() != codes.NotFound { + // log.Warnf("[benchmark job] linear search error is detected: code = %d, msg = %s", st.Code(), err.Error()) + // } + // } + } } - } + if res != nil { + lres[idx-j.dataset.Range.Start] = res + } + log.Debugf("[benchmark job] Finish linear search: iter = %d", iter) + return nil + }) } - sres[i] = res - } - recall := make([]float64, len(vecs)) - for i := 0; i < len(vecs); i++ { - recall[i] = calcRecall(lres[i], sres[i]) - log.Info("[branch job] search recall: ", recall[i]) + err := eg.Wait() + if err != nil { + log.Warnf("[benchmark job] linear search error is detected: err = %s", err.Error()) + return err + } + recall := make([]float64, j.dataset.Indexes) + cnt := float64(0) + for i := 0; i < j.dataset.Indexes; i++ { + recall[i] = calcRecall(lres[i], sres[i]) + log.Info("[branch job] search recall: ", recall[i]) + cnt += recall[i] + } + log.Info("[benchmark job] Total search recall: ", (cnt / float64(len(vecs)))) } log.Info("[benchmark job] Finish benchmarking search") return nil diff --git a/pkg/tools/benchmark/job/service/update.go b/pkg/tools/benchmark/job/service/update.go index ac7a59a596..2c7ab12c49 100644 --- a/pkg/tools/benchmark/job/service/update.go +++ b/pkg/tools/benchmark/job/service/update.go @@ -19,18 +19,19 @@ package service import ( "context" + "math" "strconv" "github.com/vdaas/vald/apis/grpc/v1/payload" + "github.com/vdaas/vald/internal/errgroup" "github.com/vdaas/vald/internal/errors" "github.com/vdaas/vald/internal/log" - "github.com/vdaas/vald/internal/net/grpc/status" ) func (j *job) update(ctx context.Context, ech chan error) error { log.Info("[benchmark job] Start benchmarking update") // create data - vecs := j.genVec(j.dataset) + vecs := j.hdf5.GetByGroupName(j.dataset.Group) cfg := &payload.Update_Config{ SkipStrictExistCheck: j.updateConfig.SkipStrictExistCheck, DisableBalancedUpdate: j.updateConfig.DisableBalancedUpdate, @@ -38,41 +39,56 @@ func (j *job) update(ctx context.Context, ech chan error) error { if j.timestamp > int64(0) { cfg.Timestamp = j.timestamp } - for i := 0; i < len(vecs); i++ { - log.Infof("[benchmark job] Start update: iter = %d", i) - err := j.limiter.Wait(ctx) - if err != nil { - if errors.Is(err, context.Canceled) { - return errors.Join(err, context.Canceled) - } - ech <- err - } - res, err := j.client.Update(ctx, &payload.Update_Request{ - Vector: &payload.Object_Vector{ - Id: strconv.Itoa(i), - Vector: vecs[i], - }, - Config: cfg, - }) - if err != nil { - select { - case <-ctx.Done(): + eg, egctx := errgroup.New(ctx) + eg.Limitation(j.concurrencyLimit) + for i := j.dataset.Range.Start; i <= j.dataset.Range.End; i++ { + iter := i + eg.Go(func() error { + log.Debugf("[benchmark job] Start update: iter = %d", iter) + err := j.limiter.Wait(egctx) + if err != nil { + log.Errorf("[benchmark job] limiter error is detected: %s", err.Error()) if errors.Is(err, context.Canceled) { - return errors.Join(err, context.Canceled) + // return errors.Join(err, context.Canceled) + return nil + } + select { + case <-egctx.Done(): + return egctx.Err() + case ech <- err: } + } + loopCnt := math.Floor(float64(iter-1) / float64(len(vecs))) + idx := iter - 1 - (len(vecs) * int(loopCnt)) + res, err := j.client.Update(egctx, &payload.Update_Request{ + Vector: &payload.Object_Vector{ + Id: strconv.Itoa(iter), + Vector: vecs[idx], + }, + Config: cfg, + }) + if err != nil { select { - case <-ctx.Done(): - return errors.Join(err, context.Canceled) - case ech <- errors.Join(err, ctx.Err()): + case <-egctx.Done(): + log.Errorf("[benchmark job] context error is detected: %s\t%s", err.Error(), egctx.Err()) + return errors.Join(err, egctx.Err()) + default: + // if st, ok := status.FromError(err); ok { + // log.Warnf("[benchmark job] update error is detected: code = %d, msg = %s\n", st.Code(), err.Error()) + // } } - default: - st, _ := status.FromError(err) - log.Warnf("[benchmark job] update error is detected: code = %d, msg = %s\n", st.Code(), err.Error()) } - } - if res != nil { - log.Infof("[benchmark job] iter=%d, Name=%s, Uuid=%s, Ips=%v", i, res.Name, res.Uuid, res.Ips) - } + if res != nil { + log.Infof("[benchmark job] iter=%d, Name=%s, Uuid=%s, Ips=%v", iter, res.Name, res.Uuid, res.Ips) + } + log.Debugf("[benchmark job] Finish update: iter = %d", iter) + return nil + }) + } + err := eg.Wait() + if err != nil { + log.Warnf("[benchmark job] update error is detected: err = %s", err.Error()) + return err } log.Info("[benchmark job] Finish benchmarking upsert") return nil diff --git a/pkg/tools/benchmark/job/service/upsert.go b/pkg/tools/benchmark/job/service/upsert.go index 4b3d664a83..a31f337972 100644 --- a/pkg/tools/benchmark/job/service/upsert.go +++ b/pkg/tools/benchmark/job/service/upsert.go @@ -19,18 +19,19 @@ package service import ( "context" + "math" "strconv" "github.com/vdaas/vald/apis/grpc/v1/payload" + "github.com/vdaas/vald/internal/errgroup" "github.com/vdaas/vald/internal/errors" "github.com/vdaas/vald/internal/log" - "github.com/vdaas/vald/internal/net/grpc/status" ) func (j *job) upsert(ctx context.Context, ech chan error) error { log.Info("[benchmark job] Start benchmarking upsert") // create data - vecs := j.genVec(j.dataset) + vecs := j.hdf5.GetByGroupName(j.dataset.Group) cfg := &payload.Upsert_Config{ SkipStrictExistCheck: j.upsertConfig.SkipStrictExistCheck, DisableBalancedUpdate: j.upsertConfig.DisableBalancedUpdate, @@ -38,43 +39,56 @@ func (j *job) upsert(ctx context.Context, ech chan error) error { if j.timestamp > int64(0) { cfg.Timestamp = j.timestamp } - for i := 0; i < len(vecs); i++ { - log.Infof("[benchmark job] Start upsert: iter = %d", i) - err := j.limiter.Wait(ctx) - if err != nil { - if errors.Is(err, context.Canceled) { - return errors.Join(err, context.Canceled) - } - ech <- err - } - res, err := j.client.Upsert(ctx, &payload.Upsert_Request{ - Vector: &payload.Object_Vector{ - Id: strconv.Itoa(i), - Vector: vecs[i], - }, - Config: cfg, - }) - if err != nil { - select { - case <-ctx.Done(): + eg, egctx := errgroup.New(ctx) + eg.Limitation(j.concurrencyLimit) + for i := j.dataset.Range.Start; i <= j.dataset.Range.End; i++ { + iter := i + eg.Go(func() error { + log.Debugf("[benchmark job] Start upsert: iter = %d", iter) + err := j.limiter.Wait(egctx) + if err != nil { + log.Errorf("[benchmark job] limiter error is detected: %s", err.Error()) if errors.Is(err, context.Canceled) { return errors.Join(err, context.Canceled) } select { - case <-ctx.Done(): - return errors.Join(err, context.Canceled) - case ech <- errors.Join(err, ctx.Err()): + case <-egctx.Done(): + return egctx.Err() + case ech <- err: + } + } + loopCnt := math.Floor(float64(iter-1) / float64(len(vecs))) + idx := iter - 1 - (len(vecs) * int(loopCnt)) + res, err := j.client.Upsert(egctx, &payload.Upsert_Request{ + Vector: &payload.Object_Vector{ + Id: strconv.Itoa(iter), + Vector: vecs[idx], + }, + Config: cfg, + }) + if err != nil { + select { + case <-egctx.Done(): + log.Errorf("[benchmark job] context error is detected: %s\t%s", err.Error(), egctx.Err()) + return errors.Join(err, egctx.Err()) + default: + // if st, ok := status.FromError(err); ok { + // log.Warnf("[benchmark job] upsert error is detected: code = %d, msg = %s", st.Code(), err.Error()) + // } } - default: - st, _ := status.FromError(err) - log.Warnf("[benchmark job] upsert error is detected: code = %d, msg = %s", st.Code(), err.Error()) } - } - if res != nil { - log.Infof("[benchmark job] iter=%d, Name=%s, Uuid=%s, Ips=%v", i, res.Name, res.Uuid, res.Ips) - } + if res != nil { + log.Infof("[benchmark job] iter=%d, Name=%s, Uuid=%s, Ips=%v", idx, res.Name, res.Uuid, res.Ips) + } + log.Debugf("[benchmark job] Finish upsert: iter = %d", iter) + return nil + }) + } + err := eg.Wait() + if err != nil { + log.Warnf("[benchmark job] upsert error is detected: err = %s", err.Error()) + return err } - log.Info("[benchmark job] Finish benchmarking upsert") return nil } diff --git a/pkg/tools/benchmark/job/usecase/benchmarkd.go b/pkg/tools/benchmark/job/usecase/benchmarkd.go index f714bf9563..400c6d580a 100644 --- a/pkg/tools/benchmark/job/usecase/benchmarkd.go +++ b/pkg/tools/benchmark/job/usecase/benchmarkd.go @@ -79,7 +79,6 @@ func New(cfg *config.Config) (r runner.Runner, err error) { if err != nil { return nil, err } - d, err := hdf5.New( hdf5.WithNameByString(cfg.Job.Dataset.Name), ) @@ -87,7 +86,6 @@ func New(cfg *config.Config) (r runner.Runner, err error) { return nil, err } log.Info("pkg/tools/benchmark/job/cmd success d") - job, err := service.New( service.WithErrGroup(eg), service.WithValdClient(vcli), @@ -105,6 +103,7 @@ func New(cfg *config.Config) (r runner.Runner, err error) { service.WithBeforeJobNamespace(cfg.Job.BeforeJobNamespace), service.WithK8sClient(cfg.K8sClient), service.WithRPS(cfg.Job.RPS), + service.WithConcurencyLimit(cfg.Job.ConcurrencyLimit), ) if err != nil { return nil, err @@ -144,6 +143,39 @@ func New(cfg *config.Config) (r runner.Runner, err error) { } } + if len(cfg.Server.MetricsServers) == 0 { + cfg.Server.MetricsServers = []*iconf.Server{ + { + Name: "pprof", + Host: "0.0.0.0", + Port: uint16(6060), + HTTP: &iconf.HTTP{ + HandlerTimeout: "5s", + IdleTimeout: "2s", + ReadHeaderTimeout: "1s", + ReadTimeout: "1s", + ShutdownDuration: "5s", + WriteTimeout: "1m", + }, + Mode: "REST", + Network: "tcp", + ProbeWaitTime: "3s", + SocketOption: &iconf.SocketOption{ + IPRecoverDestinationAddr: false, + IPTransparent: false, + ReuseAddr: true, + ReusePort: true, + TCPCork: false, + TCPDeferAccept: true, + TCPFastOpen: true, + TCPNoDelay: true, + TCPQuickAck: true, + }, + SocketPath: "", + }, + } + } + srv, err := starter.New( starter.WithConfig(cfg.Server), starter.WithREST(func(sc *iconf.Server) []server.Option { diff --git a/pkg/tools/benchmark/operator/config/config.go b/pkg/tools/benchmark/operator/config/config.go index 45cec91f51..cf9acbdfd1 100644 --- a/pkg/tools/benchmark/operator/config/config.go +++ b/pkg/tools/benchmark/operator/config/config.go @@ -37,6 +37,9 @@ type Config struct { // Scenario represents benchmark scenario configurations Scenario *config.BenchmarkScenario `json:"scenario" yaml:"scenario"` + + // JobImage represents the location of Docker image for benchmark job and its ImagePullPolicy + JobImage *config.BenchmarkJobImageInfo `json:"job_image" yaml:"job_image"` } // NewConfig represents the set config from the given setting file path. @@ -58,6 +61,12 @@ func NewConfig(path string) (cfg *Config, err error) { cfg.Observability = cfg.Observability.Bind() } + if cfg.JobImage != nil { + cfg.JobImage = cfg.JobImage.Bind() + } else { + cfg.JobImage = new(config.BenchmarkJobImageInfo) + } + if cfg.Scenario != nil { cfg.Scenario = cfg.Scenario.Bind() } else { diff --git a/pkg/tools/benchmark/operator/service/operator.go b/pkg/tools/benchmark/operator/service/operator.go index 57b6cb4d15..c3c6872859 100644 --- a/pkg/tools/benchmark/operator/service/operator.go +++ b/pkg/tools/benchmark/operator/service/operator.go @@ -47,19 +47,22 @@ type scenario struct { const ( Scenario = "scenario" + ScenarioKind = "ValdBenchmarkScenario" BenchmarkName = "benchmark-name" BeforeJobName = "before-job-name" BeforeJobNamespace = "before-job-namespace" ) type operator struct { - jobNamespace string - scenarios atomic.Pointer[map[string]*scenario] - benchjobs atomic.Pointer[map[string]*v1.ValdBenchmarkJob] - jobs atomic.Pointer[map[string]string] - rcd time.Duration // reconcile check duration - eg errgroup.Group - ctrl k8s.Controller + jobNamespace string + jobImage string + jobImagePullPolicy string + scenarios atomic.Pointer[map[string]*scenario] + benchjobs atomic.Pointer[map[string]*v1.ValdBenchmarkJob] + jobs atomic.Pointer[map[string]string] + rcd time.Duration // reconcile check duration + eg errgroup.Group + ctrl k8s.Controller } // New creates the new scenario struct to handle vald benchmark job scenario. @@ -251,6 +254,11 @@ func (o *operator) benchJobReconcile(ctx context.Context, benchJobList map[strin if err != nil { log.Warnf("[reconcile benchmark job resource] failed to delete old version job: job name=%s, version=%d\t%s", oldJob.GetName(), oldJob.GetGeneration(), err.Error()) } + // create new version job + err = o.createJob(ctx, job) + if err != nil { + log.Errorf("[reconcile benchmark job resource] failed to create new version job: %s", err.Error()) + } cbjl[k] = &job } } else if oldJob.Status == "" { @@ -326,7 +334,7 @@ func (o *operator) benchScenarioReconcile(ctx context.Context, scenarioList map[ // create new benchmark job resources of new version. jobNames, err := o.createBenchmarkJob(ctx, sc) if err != nil { - log.Errorf("[reconcile benchmark scenario resource] failed to create benchmark job resource: %s", err.Error()) + log.Errorf("[reconcile benchmark scenario resource] failed to create new version benchmark job resource: %s", err.Error()) } cbsl[name] = &scenario{ Crd: &sc, @@ -373,12 +381,18 @@ func (o *operator) deleteBenchmarkJob(ctx context.Context, name string, generati // deleteJob deletes job resource according to given benchmark job name and generation. func (o *operator) deleteJob(ctx context.Context, name string, generation int64) error { - opts := new(client.DeleteAllOfOptions) - client.MatchingLabels(map[string]string{ - BenchmarkName: name + strconv.Itoa(int(generation)), - }).ApplyToDeleteAllOf(opts) - client.InNamespace(o.jobNamespace).ApplyToDeleteAllOf(opts) - return o.ctrl.GetManager().GetClient().DeleteAllOf(ctx, &job.Job{}, opts) + cj := new(job.Job) + err := o.ctrl.GetManager().GetClient().Get(ctx, client.ObjectKey{ + Namespace: o.jobNamespace, + Name: name, + }, cj) + if err != nil { + return err + } + opts := new(client.DeleteOptions) + deleteProgation := client.DeletePropagationBackground + opts.PropagationPolicy = &deleteProgation + return o.ctrl.GetManager().GetClient().Delete(ctx, cj, opts) } // createBenchmarkJob creates the ValdBenchmarkJob crd for running job. @@ -434,10 +448,18 @@ func (o *operator) createBenchmarkJob(ctx context.Context, scenario v1.ValdBench // createJob creates benchmark job from benchmark job resource. func (o *operator) createJob(ctx context.Context, bjr v1.ValdBenchmarkJob) error { label := map[string]string{ - BenchmarkName: bjr.GetName() + strconv.Itoa(int(bjr.Generation)), + BenchmarkName: bjr.GetName() + strconv.Itoa(int(bjr.GetGeneration())), } - job, err := benchjob.NewBenchmarkJobTemplate( - benchjob.WithName(bjr.Name), + job, err := benchjob.NewBenchmarkJob( + benchjob.WithContainerName(bjr.GetName()), + benchjob.WithContainerImage(o.jobImage), + benchjob.WithImagePullPolicy(benchjob.ImagePullPolicy(o.jobImagePullPolicy)), + ) + if err != nil { + return err + } + tpl, err := job.CreateJobTpl( + benchjob.WithName(bjr.GetName()), benchjob.WithNamespace(bjr.Namespace), benchjob.WithLabel(label), benchjob.WithCompletions(int32(bjr.Spec.Repetition)), @@ -450,14 +472,15 @@ func (o *operator) createJob(ctx context.Context, bjr v1.ValdBenchmarkJob) error UID: bjr.UID, }, }), + benchjob.WithTTLSecondsAfterFinished(int32(bjr.Spec.TTLSecondsAfterFinished)), ) if err != nil { - return err + return nil } // create job c := o.ctrl.GetManager().GetClient() - if err = c.Create(ctx, &job); err != nil { - return errors.ErrFailedToCreateJob(err, job.GetName()) + if err = c.Create(ctx, &tpl); err != nil { + return errors.ErrFailedToCreateJob(err, tpl.GetName()) } return nil } @@ -546,37 +569,59 @@ func (o *operator) checkAtomics() error { cjl := o.getAtomicJob() cbjl := o.getAtomicBenchJob() cbsl := o.getAtomicScenario() - if len(cjl) == 0 { - if len(cbjl) > 0 || len(cbsl) > 0 { - log.Error("mismatch atomics: job=%v, benchjob=%v, scenario=%v", cjl, cbjl, cbsl) - return errors.ErrMismatchBenchmarkAtomics(cjl, cbjl, cbsl) - } - return nil - } else if len(cbjl) == 0 { - log.Error("mismatch atomics: job=%v, benchjob=%v, scenario=%v", cjl, cbjl, cbsl) + bjCompletedCnt := 0 + bjAvailableCnt := 0 + + if len(cbjl) == 0 && len(cbsl) > 0 && len(cjl) > 0 { + log.Errorf("mismatch atomics: job=%v, benchjob=%v, scenario=%v", cjl, cbjl, cbsl) return errors.ErrMismatchBenchmarkAtomics(cjl, cbjl, cbsl) } - jobCounter := len(cjl) - scenarioBenchCounter := 0 - for sc := range cbsl { - scenarioBenchCounter += len(cbsl[sc].BenchJobStatus) - } - for jobName := range cjl { - if benchJob := cbjl[jobName]; benchJob != nil { - jobCounter-- - if owner := benchJob.GetOwnerReferences(); len(owner) > 0 { - scenarioName := owner[0].Name - if scenario := cbsl[scenarioName]; scenario != nil { - if _, ok := scenario.BenchJobStatus[benchJob.GetName()]; ok { - scenarioBenchCounter-- - } + + for _, bj := range cbjl { + // check bench and job + if bj.Status == v1.BenchmarkJobCompleted { + bjCompletedCnt++ + } else { + bjAvailableCnt++ + if ns, ok := cjl[bj.GetName()]; !ok || ns != bj.GetNamespace() { + log.Errorf("mismatch atomics: job=%v, benchjob=%v, scenario=%v", cjl, cbjl, cbsl) + return errors.ErrMismatchBenchmarkAtomics(cjl, cbjl, cbsl) + } + } + // check scenario and bench + if owners := bj.GetOwnerReferences(); len(owners) > 0 { + var scenarioName string + for _, o := range owners { + if o.Kind == ScenarioKind { + scenarioName = o.Name + } + } + if sc := cbsl[scenarioName]; sc != nil { + if sc.BenchJobStatus[bj.Name] != bj.Status { + log.Errorf("mismatch atomics: job=%v, benchjob=%v, scenario=%v", cjl, cbjl, cbsl) + return errors.ErrMismatchBenchmarkAtomics(cjl, cbjl, cbsl) } + } else { + log.Errorf("mismatch atomics: job=%v, benchjob=%v, scenario=%v", cjl, cbjl, cbsl) + return errors.ErrMismatchBenchmarkAtomics(cjl, cbjl, cbsl) } } } - if jobCounter != 0 || scenarioBenchCounter != 0 { - log.Error("mismatch atomics: job=%v, benchjob=%v, scenario=%v", cjl, cbjl, cbsl) - return errors.ErrMismatchBenchmarkAtomics(cjl, cbjl, cbsl) + // check benchmarkjob status list and scenario benchmark job status list + if len(cbsl) > 0 { + for _, sc := range cbsl { + for _, status := range sc.BenchJobStatus { + if status == v1.BenchmarkJobCompleted { + bjCompletedCnt-- + } else { + bjAvailableCnt-- + } + } + } + if bjAvailableCnt != 0 || bjCompletedCnt != 0 { + log.Errorf("mismatch atomics: job=%v, benchjob=%v, scenario=%v", cjl, cbjl, cbsl) + return errors.ErrMismatchBenchmarkAtomics(cjl, cbjl, cbsl) + } } return nil } diff --git a/pkg/tools/benchmark/operator/service/option.go b/pkg/tools/benchmark/operator/service/option.go index a34242f5a5..f38c10f38e 100644 --- a/pkg/tools/benchmark/operator/service/option.go +++ b/pkg/tools/benchmark/operator/service/option.go @@ -28,6 +28,8 @@ import ( type Option func(o *operator) error var defaultOpts = []Option{ + WithJobImage("vdaas/vald-benchmark-job"), + WithJobImagePullPolicy("Always"), WithReconcileCheckDuration("10s"), WithJobNamespace("default"), } @@ -66,3 +68,23 @@ func WithJobNamespace(ns string) Option { return nil } } + +// WithJobImage sets the benchmark job docker image info. +func WithJobImage(image string) Option { + return func(o *operator) error { + if len(image) > 0 { + o.jobImage = image + } + return nil + } +} + +// WithJobImagePullPolicy sets the benchmark job docker image pullPolicy. +func WithJobImagePullPolicy(p string) Option { + return func(o *operator) error { + if len(p) > 0 { + o.jobImagePullPolicy = p + } + return nil + } +} diff --git a/pkg/tools/benchmark/operator/usecase/benchmarkd.go b/pkg/tools/benchmark/operator/usecase/benchmarkd.go index 9f77e9ae15..e867473bc5 100644 --- a/pkg/tools/benchmark/operator/usecase/benchmarkd.go +++ b/pkg/tools/benchmark/operator/usecase/benchmarkd.go @@ -19,6 +19,7 @@ package usecase import ( "context" + "os" iconf "github.com/vdaas/vald/internal/config" "github.com/vdaas/vald/internal/errgroup" @@ -48,6 +49,8 @@ type run struct { observability observability.Observability } +var JOB_NAMESPACE = os.Getenv("JOB_NAMESPACE") + func New(cfg *config.Config) (r runner.Runner, err error) { log.Info("pkg/tools/benchmark/scenario/cmd start") @@ -57,6 +60,9 @@ func New(cfg *config.Config) (r runner.Runner, err error) { operator, err := service.New( service.WithErrGroup(eg), + service.WithJobNamespace(JOB_NAMESPACE), + service.WithJobImage(cfg.JobImage.Image), + service.WithJobImagePullPolicy(cfg.JobImage.PullPolicy), ) if err != nil { return nil, err