From 366cd02c9ed6b288a8aad0a02bfea67c1ab5a0c9 Mon Sep 17 00:00:00 2001 From: Yusuke Kadowaki Date: Tue, 10 Oct 2023 10:20:16 +0900 Subject: [PATCH 1/4] Add index correction helm templates and E2E (#2200) * implement the initail framework * add corrector configuration * add corrector logic * add build make command for index correction binary * add Dockerfile for index correction * add Docker image for index job correction * add timer * fix tag align * tmp * fix log * temporally implement two versions of correct function * set eg limit from config * add stream list concurrency config * implement index id caching * add config to use cache or not * style: Format code with prettier and gofumpt * refactor availableAddrs * add kvs range duration * add leftAgentAddrs for performance * Revert "add kvs range duration" This reverts commit 5b647be6ccc0f9be7e78e38c89ea8897fa3ee574. * refactor * fix without cache bug * enable observability * refactor * SIGTERM after complete * add metrics server * add pcache * remove comment * [TEMP] use pcache * [TMP] use pcache * fix empty shard returns error * fix to use local map * [TMP] add prestop for pcache * [TEMP] add pcache config * style: Format code with prettier and gofumpt * [TEMP] add pcache log * fix map alloc size * [TMP] Add bbolt cache * update bbolt * fix bbolt bug * add bbolt test * [TEMP] use bbolt as persistent cache * style: Format code with prettier and gofumpt * add SetBatch to bbolt * use batch to write map to disk * style: Format code with prettier and gofumpt * delete the map elements on finalize * manually call GC after the map shrink * add limit to SetBatch goroutine number * stop unnecesarry GC * increase eg limit to the MaxBatchSize * use ch to set batch bbolt * fix servers shutdown properly * use internal/kvs/bbolt * refactor * always use bbolt cache for correction * update sample.yaml for correction * style: format code with Prettier and Gofumpt This commit fixes the style issues introduced in 319ec8b according 
to the output from Prettier and Gofumpt. Details: https://github.com/vdaas/vald/pull/2152 * use go std slices pkg * refactor * add comment * remove valdsync * use vald errgroup * refactor * Define ErrNoAvailableAgentToInsert * update comment in English * Apply new actions yaml format * Disable godox * style: format code with Prettier and Gofumpt This commit fixes the style issues introduced in c860ddc according to the output from Prettier and Gofumpt. Details: https://github.com/vdaas/vald/pull/2194 * remove comment * Apply format * Add type check for type assertion * use const to specify filemode * Add bbolt concurrency as config * fix var style * Suppress linter * fix comment * add test template * Refactor parameters for index correction * Refactor config * Add corrector test * style: format code with Prettier and Gofumpt This commit fixes the style issues introduced in 004bf81 according to the output from Prettier and Gofumpt. Details: https://github.com/vdaas/vald/pull/2194 * Add timestamp check * Apply format * fix schema type * Fix DeepSource errors * Fix misspell * Add type check * Remove unused config * Fix DeepSource error * Add required go:build e2e tag * Remove memo * Refactor comment * Add index job correction helm templates * Add more fields * Add index correction job E2E test * Add e2e action for job * [REVERT THIS] Temporally change version * Fix name and command * Apply format * update crd * Revert "[REVERT THIS] Temporally change version" This reverts commit 1801a63b2bb8826960c3596f42637933f0eab6e6. 
* Remove unused pkg * Remove experimental file * remove old workflow * Fix cron job name to new one * Update sample.yaml * fix build path * Fix corrector name * add e2e-jobs to slack notification * Update crds --------- Co-authored-by: deepsource-autofix[bot] <62050782+deepsource-autofix[bot]@users.noreply.github.com> --- .github/helm/values/values-lb.yaml | 5 + .github/workflows/e2e.yml | 37 + Makefile.d/build.mk | 2 +- Makefile.d/e2e.mk | 6 +- Makefile.d/k8s.mk | 14 +- .../vald-helm-operator/crds/valdrelease.yaml | 965 ++++++++++++++++++ .../vald/templates/agent/networkpolicy.yaml | 7 + .../templates/discoverer/networkpolicy.yaml | 7 + .../index/job/correction/configmap.yaml | 71 ++ .../index/job/correction/cronjob.yaml | 62 ++ charts/vald/values.yaml | 62 +- cmd/index/job/correction/sample.yaml | 5 +- tests/e2e/crud/crud_test.go | 36 + tests/e2e/operation/job.go | 121 +++ tests/e2e/operation/operation.go | 30 +- 15 files changed, 1410 insertions(+), 20 deletions(-) create mode 100644 charts/vald/templates/index/job/correction/configmap.yaml create mode 100644 charts/vald/templates/index/job/correction/cronjob.yaml create mode 100644 tests/e2e/operation/job.go diff --git a/.github/helm/values/values-lb.yaml b/.github/helm/values/values-lb.yaml index 591a269672..bc0043b6a9 100644 --- a/.github/helm/values/values-lb.yaml +++ b/.github/helm/values/values-lb.yaml @@ -69,3 +69,8 @@ manager: auto_index_duration_limit: 2m auto_index_check_duration: 30s auto_index_length: 1000 + corrector: + enabled: true + # suspend because you do not want corrector to start automatically in CI + # instead run it manually + suspend: true diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml index 924865531f..20a9e60c8c 100644 --- a/.github/workflows/e2e.yml +++ b/.github/workflows/e2e.yml @@ -227,6 +227,42 @@ jobs: env: POD_NAME: ${{ steps.deploy_vald.outputs.POD_NAME }} + e2e-jobs: + name: "E2E test (Jobs)" + needs: [dump-contexts-to-log] + runs-on: ubuntu-latest + 
timeout-minutes: 60 + steps: + - uses: actions/checkout@v3 + + - name: Set Git config + run: | + git config --global --add safe.directory ${GITHUB_WORKSPACE} + + - name: Setup E2E environment + id: setup_e2e + uses: ./.github/actions/setup-e2e + + - name: Deploy Vald + id: deploy_vald + uses: ./.github/actions/e2e-deploy-vald + with: + helm_extra_options: ${{ steps.setup_e2e.outputs.HELM_EXTRA_OPTIONS }} + values: .github/helm/values/values-lb.yaml + wait_for_selector: app=vald-lb-gateway + + - name: Run E2E Jobs + run: | + make hack/benchmark/assets/dataset/${{ env.DATASET }} + make E2E_BIND_PORT=8081 \ + E2E_INSERT_COUNT=10000\ + E2E_WAIT_FOR_CREATE_INDEX_DURATION=3m \ + E2E_TARGET_POD_NAME=${POD_NAME} \ + E2E_TARGET_NAMESPACE=default \ + e2e/index/job/correction + env: + POD_NAME: ${{ steps.deploy_vald.outputs.POD_NAME }} + e2e-agent-and-sidecar: name: "E2E Agent & Sidecar test" needs: [dump-contexts-to-log] @@ -278,6 +314,7 @@ jobs: - e2e-stream-crud - e2e-stream-crud-for-operator - e2e-stream-crud-skip-exist-check + - e2e-jobs runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 diff --git a/Makefile.d/build.mk b/Makefile.d/build.mk index 5f0d2089e1..f2f8275b46 100644 --- a/Makefile.d/build.mk +++ b/Makefile.d/build.mk @@ -209,7 +209,7 @@ cmd/manager/index/index: \ cmd/index/job/correction/index-correction: \ $(GO_SOURCES_INTERNAL) \ $(PBGOS) \ - $(shell find $(ROOTDIR)/cmd/index/job/correction/correction -type f -name '*.go' -not -name '*_test.go' -not -name 'doc.go') \ + $(shell find $(ROOTDIR)/cmd/index/job/correction -type f -name '*.go' -not -name '*_test.go' -not -name 'doc.go') \ $(shell find $(ROOTDIR)/pkg/index/job/correction -type f -name '*.go' -not -name '*_test.go' -not -name 'doc.go') $(eval CGO_ENABLED = 0) CGO_ENABLED=$(CGO_ENABLED) \ diff --git a/Makefile.d/e2e.mk b/Makefile.d/e2e.mk index 8cd6e43800..f830343788 100644 --- a/Makefile.d/e2e.mk +++ b/Makefile.d/e2e.mk @@ -69,6 +69,11 @@ e2e/remove/timestamp: e2e/insert/search: $(call 
run-e2e-crud-test,-run TestE2EInsertAndSearch) +.PHONY: e2e/index/job/correction +## run index correction job e2e +e2e/index/job/correction: + $(call run-e2e-crud-test,-run TestE2EIndexJobCorrection) + .PHONY: e2e/maxdim ## run e2e/maxdim e2e/maxdim: @@ -79,4 +84,3 @@ e2e/maxdim: e2e/sidecar: $(call run-e2e-sidecar-test,-run TestE2EForSidecar) - diff --git a/Makefile.d/k8s.mk b/Makefile.d/k8s.mk index 0962b95407..a3b0868056 100644 --- a/Makefile.d/k8s.mk +++ b/Makefile.d/k8s.mk @@ -23,22 +23,26 @@ k8s/manifest/clean: k8s/agent \ k8s/discoverer \ k8s/gateway \ - k8s/manager + k8s/manager \ + k8s/index .PHONY: k8s/manifest/update ## update k8s manifests using helm templates k8s/manifest/update: \ k8s/manifest/clean helm template \ - --values $(HELM_VALUES) \ - --output-dir $(TEMP_DIR) \ - charts/vald + --values $(HELM_VALUES) \ + --set defaults.image.tag=$(VERSION) \ + --output-dir $(TEMP_DIR) \ + charts/vald mkdir -p k8s/gateway mkdir -p k8s/manager + mkdir -p k8s/index/job mv $(TEMP_DIR)/vald/templates/agent k8s/agent mv $(TEMP_DIR)/vald/templates/discoverer k8s/discoverer mv $(TEMP_DIR)/vald/templates/gateway/lb k8s/gateway/lb mv $(TEMP_DIR)/vald/templates/manager/index k8s/manager/index + mv $(TEMP_DIR)/vald/templates/index/job/correction k8s/index/job/correction rm -rf $(TEMP_DIR) .PHONY: k8s/manifest/helm-operator/clean @@ -80,6 +84,7 @@ k8s/vald/deploy: kubectl apply -f $(TEMP_DIR)/vald/templates/agent || true kubectl apply -f $(TEMP_DIR)/vald/templates/discoverer || true kubectl apply -f $(TEMP_DIR)/vald/templates/gateway/lb || true + kubectl apply -f $(TEMP_DIR)/vald/templates/index/job/correction || true rm -rf $(TEMP_DIR) kubectl get pods -o jsonpath="{.items[*].spec.containers[*].image}" | tr " " "\n" @@ -97,6 +102,7 @@ k8s/vald/delete: --set manager.index.image.repository=$(CRORG)/$(MANAGER_INDEX_IMAGE) \ --output-dir $(TEMP_DIR) \ charts/vald + kubectl delete -f $(TEMP_DIR)/vald/templates/index/job/correction kubectl delete -f 
$(TEMP_DIR)/vald/templates/gateway/lb kubectl delete -f $(TEMP_DIR)/vald/templates/manager/index kubectl delete -f $(TEMP_DIR)/vald/templates/discoverer diff --git a/charts/vald-helm-operator/crds/valdrelease.yaml b/charts/vald-helm-operator/crds/valdrelease.yaml index 46c151ab94..3854881577 100644 --- a/charts/vald-helm-operator/crds/valdrelease.yaml +++ b/charts/vald-helm-operator/crds/valdrelease.yaml @@ -6068,6 +6068,971 @@ spec: annotations: type: object x-kubernetes-preserve-unknown-fields: true + corrector: + type: object + properties: + agent_namespace: + type: string + bbolt_async_write_concurrency: + type: integer + minimum: 1 + discoverer: + type: object + properties: + agent_client_options: + type: object + properties: + addrs: + type: array + items: + type: string + backoff: + type: object + properties: + backoff_factor: + type: number + backoff_time_limit: + type: string + enable_error_log: + type: boolean + initial_duration: + type: string + jitter_limit: + type: string + maximum_duration: + type: string + retry_count: + type: integer + call_option: + type: object + x-kubernetes-preserve-unknown-fields: true + circuit_breaker: + type: object + properties: + closed_error_rate: + type: number + closed_refresh_timeout: + type: string + half_open_error_rate: + type: number + min_samples: + type: integer + open_timeout: + type: string + connection_pool: + type: object + properties: + enable_dns_resolver: + type: boolean + enable_rebalance: + type: boolean + old_conn_close_duration: + type: string + rebalance_duration: + type: string + size: + type: integer + dial_option: + type: object + properties: + backoff_base_delay: + type: string + backoff_jitter: + type: number + backoff_max_delay: + type: string + backoff_multiplier: + type: number + enable_backoff: + type: boolean + initial_connection_window_size: + type: integer + initial_window_size: + type: integer + insecure: + type: boolean + interceptors: + type: array + items: + type: string + enum: + - 
TraceInterceptor + keepalive: + type: object + properties: + permit_without_stream: + type: boolean + time: + type: string + timeout: + type: string + max_msg_size: + type: integer + min_connection_timeout: + type: string + net: + type: object + properties: + dialer: + type: object + properties: + dual_stack_enabled: + type: boolean + keepalive: + type: string + timeout: + type: string + dns: + type: object + properties: + cache_enabled: + type: boolean + cache_expiration: + type: string + refresh_duration: + type: string + socket_option: + type: object + properties: + ip_recover_destination_addr: + type: boolean + ip_transparent: + type: boolean + reuse_addr: + type: boolean + reuse_port: + type: boolean + tcp_cork: + type: boolean + tcp_defer_accept: + type: boolean + tcp_fast_open: + type: boolean + tcp_no_delay: + type: boolean + tcp_quick_ack: + type: boolean + tls: + type: object + properties: + ca: + type: string + cert: + type: string + enabled: + type: boolean + insecure_skip_verify: + type: boolean + key: + type: string + read_buffer_size: + type: integer + timeout: + type: string + write_buffer_size: + type: integer + health_check_duration: + type: string + max_recv_msg_size: + type: integer + max_retry_rpc_buffer_size: + type: integer + max_send_msg_size: + type: integer + tls: + type: object + properties: + ca: + type: string + cert: + type: string + enabled: + type: boolean + insecure_skip_verify: + type: boolean + key: + type: string + wait_for_ready: + type: boolean + client: + type: object + properties: + addrs: + type: array + items: + type: string + backoff: + type: object + properties: + backoff_factor: + type: number + backoff_time_limit: + type: string + enable_error_log: + type: boolean + initial_duration: + type: string + jitter_limit: + type: string + maximum_duration: + type: string + retry_count: + type: integer + call_option: + type: object + x-kubernetes-preserve-unknown-fields: true + circuit_breaker: + type: object + properties: + 
closed_error_rate: + type: number + closed_refresh_timeout: + type: string + half_open_error_rate: + type: number + min_samples: + type: integer + open_timeout: + type: string + connection_pool: + type: object + properties: + enable_dns_resolver: + type: boolean + enable_rebalance: + type: boolean + old_conn_close_duration: + type: string + rebalance_duration: + type: string + size: + type: integer + dial_option: + type: object + properties: + backoff_base_delay: + type: string + backoff_jitter: + type: number + backoff_max_delay: + type: string + backoff_multiplier: + type: number + enable_backoff: + type: boolean + initial_connection_window_size: + type: integer + initial_window_size: + type: integer + insecure: + type: boolean + interceptors: + type: array + items: + type: string + enum: + - TraceInterceptor + keepalive: + type: object + properties: + permit_without_stream: + type: boolean + time: + type: string + timeout: + type: string + max_msg_size: + type: integer + min_connection_timeout: + type: string + net: + type: object + properties: + dialer: + type: object + properties: + dual_stack_enabled: + type: boolean + keepalive: + type: string + timeout: + type: string + dns: + type: object + properties: + cache_enabled: + type: boolean + cache_expiration: + type: string + refresh_duration: + type: string + socket_option: + type: object + properties: + ip_recover_destination_addr: + type: boolean + ip_transparent: + type: boolean + reuse_addr: + type: boolean + reuse_port: + type: boolean + tcp_cork: + type: boolean + tcp_defer_accept: + type: boolean + tcp_fast_open: + type: boolean + tcp_no_delay: + type: boolean + tcp_quick_ack: + type: boolean + tls: + type: object + properties: + ca: + type: string + cert: + type: string + enabled: + type: boolean + insecure_skip_verify: + type: boolean + key: + type: string + read_buffer_size: + type: integer + timeout: + type: string + write_buffer_size: + type: integer + health_check_duration: + type: string + 
max_recv_msg_size: + type: integer + max_retry_rpc_buffer_size: + type: integer + max_send_msg_size: + type: integer + tls: + type: object + properties: + ca: + type: string + cert: + type: string + enabled: + type: boolean + insecure_skip_verify: + type: boolean + key: + type: string + wait_for_ready: + type: boolean + duration: + type: string + enabled: + type: boolean + env: + type: array + items: + type: object + x-kubernetes-preserve-unknown-fields: true + image: + type: object + properties: + pullPolicy: + type: string + enum: + - Always + - Never + - IfNotPresent + repository: + type: string + tag: + type: string + name: + type: string + node_name: + type: string + observability: + type: object + properties: + enabled: + type: boolean + metrics: + type: object + properties: + enable_cgo: + type: boolean + enable_goroutine: + type: boolean + enable_memory: + type: boolean + enable_version_info: + type: boolean + version_info_labels: + type: array + items: + type: string + enum: + - vald_version + - server_name + - git_commit + - build_time + - go_version + - go_os + - go_arch + - cgo_enabled + - ngt_version + - build_cpu_info_flags + otlp: + type: object + properties: + attribute: + type: object + properties: + namespace: + type: string + node_name: + type: string + pod_name: + type: string + service_name: + type: string + collector_endpoint: + type: string + metrics_export_interval: + type: string + metrics_export_timeout: + type: string + trace_batch_timeout: + type: string + trace_export_timeout: + type: string + trace_max_export_batch_size: + type: integer + trace_max_queue_size: + type: integer + trace: + type: object + properties: + enabled: + type: boolean + schedule: + type: string + server_config: + type: object + properties: + full_shutdown_duration: + type: string + healths: + type: object + properties: + liveness: + type: object + properties: + enabled: + type: boolean + host: + type: string + livenessProbe: + type: object + properties: + 
failureThreshold: + type: integer + httpGet: + type: object + properties: + path: + type: string + port: + type: string + scheme: + type: string + initialDelaySeconds: + type: integer + periodSeconds: + type: integer + successThreshold: + type: integer + timeoutSeconds: + type: integer + port: + type: integer + maximum: 65535 + minimum: 0 + server: + type: object + properties: + http: + type: object + properties: + handler_timeout: + type: string + idle_timeout: + type: string + read_header_timeout: + type: string + read_timeout: + type: string + shutdown_duration: + type: string + write_timeout: + type: string + mode: + type: string + network: + type: string + enum: + - tcp + - tcp4 + - tcp6 + - udp + - udp4 + - udp6 + - unix + - unixgram + - unixpacket + probe_wait_time: + type: string + socket_option: + type: object + properties: + ip_recover_destination_addr: + type: boolean + ip_transparent: + type: boolean + reuse_addr: + type: boolean + reuse_port: + type: boolean + tcp_cork: + type: boolean + tcp_defer_accept: + type: boolean + tcp_fast_open: + type: boolean + tcp_no_delay: + type: boolean + tcp_quick_ack: + type: boolean + socket_path: + type: string + servicePort: + type: integer + maximum: 65535 + minimum: 0 + readiness: + type: object + properties: + enabled: + type: boolean + host: + type: string + port: + type: integer + maximum: 65535 + minimum: 0 + readinessProbe: + type: object + properties: + failureThreshold: + type: integer + httpGet: + type: object + properties: + path: + type: string + port: + type: string + scheme: + type: string + initialDelaySeconds: + type: integer + periodSeconds: + type: integer + successThreshold: + type: integer + timeoutSeconds: + type: integer + server: + type: object + properties: + http: + type: object + properties: + handler_timeout: + type: string + idle_timeout: + type: string + read_header_timeout: + type: string + read_timeout: + type: string + shutdown_duration: + type: string + write_timeout: + type: string 
+ mode: + type: string + network: + type: string + enum: + - tcp + - tcp4 + - tcp6 + - udp + - udp4 + - udp6 + - unix + - unixgram + - unixpacket + probe_wait_time: + type: string + socket_option: + type: object + properties: + ip_recover_destination_addr: + type: boolean + ip_transparent: + type: boolean + reuse_addr: + type: boolean + reuse_port: + type: boolean + tcp_cork: + type: boolean + tcp_defer_accept: + type: boolean + tcp_fast_open: + type: boolean + tcp_no_delay: + type: boolean + tcp_quick_ack: + type: boolean + socket_path: + type: string + servicePort: + type: integer + maximum: 65535 + minimum: 0 + startup: + type: object + properties: + enabled: + type: boolean + port: + type: integer + maximum: 65535 + minimum: 0 + startupProbe: + type: object + properties: + failureThreshold: + type: integer + httpGet: + type: object + properties: + path: + type: string + port: + type: string + scheme: + type: string + initialDelaySeconds: + type: integer + periodSeconds: + type: integer + successThreshold: + type: integer + timeoutSeconds: + type: integer + metrics: + type: object + properties: + pprof: + type: object + properties: + enabled: + type: boolean + host: + type: string + port: + type: integer + maximum: 65535 + minimum: 0 + server: + type: object + properties: + http: + type: object + properties: + handler_timeout: + type: string + idle_timeout: + type: string + read_header_timeout: + type: string + read_timeout: + type: string + shutdown_duration: + type: string + write_timeout: + type: string + mode: + type: string + network: + type: string + enum: + - tcp + - tcp4 + - tcp6 + - udp + - udp4 + - udp6 + - unix + - unixgram + - unixpacket + probe_wait_time: + type: string + socket_option: + type: object + properties: + ip_recover_destination_addr: + type: boolean + ip_transparent: + type: boolean + reuse_addr: + type: boolean + reuse_port: + type: boolean + tcp_cork: + type: boolean + tcp_defer_accept: + type: boolean + tcp_fast_open: + type: boolean 
+ tcp_no_delay: + type: boolean + tcp_quick_ack: + type: boolean + socket_path: + type: string + servicePort: + type: integer + maximum: 65535 + minimum: 0 + servers: + type: object + properties: + grpc: + type: object + properties: + enabled: + type: boolean + host: + type: string + port: + type: integer + maximum: 65535 + minimum: 0 + server: + type: object + properties: + grpc: + type: object + properties: + bidirectional_stream_concurrency: + type: integer + connection_timeout: + type: string + enable_reflection: + type: boolean + header_table_size: + type: integer + initial_conn_window_size: + type: integer + initial_window_size: + type: integer + interceptors: + type: array + items: + type: string + enum: + - RecoverInterceptor + - AccessLogInterceptor + - TraceInterceptor + - MetricInterceptor + keepalive: + type: object + properties: + max_conn_age: + type: string + max_conn_age_grace: + type: string + max_conn_idle: + type: string + min_time: + type: string + permit_without_stream: + type: boolean + time: + type: string + timeout: + type: string + max_header_list_size: + type: integer + max_receive_message_size: + type: integer + max_send_message_size: + type: integer + read_buffer_size: + type: integer + write_buffer_size: + type: integer + mode: + type: string + network: + type: string + enum: + - tcp + - tcp4 + - tcp6 + - udp + - udp4 + - udp6 + - unix + - unixgram + - unixpacket + probe_wait_time: + type: string + restart: + type: boolean + socket_option: + type: object + properties: + ip_recover_destination_addr: + type: boolean + ip_transparent: + type: boolean + reuse_addr: + type: boolean + reuse_port: + type: boolean + tcp_cork: + type: boolean + tcp_defer_accept: + type: boolean + tcp_fast_open: + type: boolean + tcp_no_delay: + type: boolean + tcp_quick_ack: + type: boolean + socket_path: + type: string + servicePort: + type: integer + maximum: 65535 + minimum: 0 + rest: + type: object + properties: + enabled: + type: boolean + host: + type: 
string + port: + type: integer + maximum: 65535 + minimum: 0 + server: + type: object + properties: + http: + type: object + properties: + handler_timeout: + type: string + idle_timeout: + type: string + read_header_timeout: + type: string + read_timeout: + type: string + shutdown_duration: + type: string + write_timeout: + type: string + mode: + type: string + network: + type: string + enum: + - tcp + - tcp4 + - tcp6 + - udp + - udp4 + - udp6 + - unix + - unixgram + - unixpacket + probe_wait_time: + type: string + socket_option: + type: object + properties: + ip_recover_destination_addr: + type: boolean + ip_transparent: + type: boolean + reuse_addr: + type: boolean + reuse_port: + type: boolean + tcp_cork: + type: boolean + tcp_defer_accept: + type: boolean + tcp_fast_open: + type: boolean + tcp_no_delay: + type: boolean + tcp_quick_ack: + type: boolean + socket_path: + type: string + servicePort: + type: integer + maximum: 65535 + minimum: 0 + tls: + type: object + properties: + ca: + type: string + cert: + type: string + enabled: + type: boolean + insecure_skip_verify: + type: boolean + key: + type: string + startingDeadlineSeconds: + type: integer + stream_list_concurrency: + type: integer + minimum: 1 + suspend: + type: boolean + ttlSecondsAfterFinished: + type: integer + version: + type: string + pattern: ^v[0-9]+\.[0-9]+\.[0-9]$ enabled: type: boolean env: diff --git a/charts/vald/templates/agent/networkpolicy.yaml b/charts/vald/templates/agent/networkpolicy.yaml index e6e1ef90bb..ef51ea5439 100644 --- a/charts/vald/templates/agent/networkpolicy.yaml +++ b/charts/vald/templates/agent/networkpolicy.yaml @@ -17,6 +17,7 @@ {{- $agent := .Values.agent -}} {{- $lb := .Values.gateway.lb -}} {{- $index := .Values.manager.index -}} +{{- $corrector := .Values.manager.index.corrector -}} {{- if .Values.defaults.networkPolicy.enabled }} apiVersion: networking.k8s.io/v1 kind: NetworkPolicy @@ -44,6 +45,12 @@ spec: podSelector: matchLabels: app: {{ $index.name }} + - 
namespaceSelector: + matchLabels: + kubernetes.io/metadata.name: {{ .Release.Namespace }} + podSelector: + matchLabels: + app: {{ $corrector.name }} {{- if .Values.defaults.networkPolicy.custom.ingress }} {{- toYaml .Values.defaults.networkPolicy.custom.ingress | nindent 4 }} {{- end }} diff --git a/charts/vald/templates/discoverer/networkpolicy.yaml b/charts/vald/templates/discoverer/networkpolicy.yaml index 065cc97b59..41803702dd 100644 --- a/charts/vald/templates/discoverer/networkpolicy.yaml +++ b/charts/vald/templates/discoverer/networkpolicy.yaml @@ -17,6 +17,7 @@ {{- $discoverer := .Values.discoverer -}} {{- $lb := .Values.gateway.lb -}} {{- $index := .Values.manager.index -}} +{{- $corrector := .Values.manager.index.corrector -}} {{- if .Values.defaults.networkPolicy.enabled }} apiVersion: networking.k8s.io/v1 kind: NetworkPolicy @@ -46,6 +47,12 @@ spec: podSelector: matchLabels: app: {{ $index.name }} + - namespaceSelector: + matchLabels: + kubernetes.io/metadata.name: {{ .Release.Namespace }} + podSelector: + matchLabels: + app: {{ $corrector.name }} {{- if .Values.defaults.networkPolicy.custom.ingress }} {{- toYaml .Values.defaults.networkPolicy.custom.ingress | nindent 4 }} {{- end }} diff --git a/charts/vald/templates/index/job/correction/configmap.yaml b/charts/vald/templates/index/job/correction/configmap.yaml new file mode 100644 index 0000000000..77db8328e2 --- /dev/null +++ b/charts/vald/templates/index/job/correction/configmap.yaml @@ -0,0 +1,71 @@ +# +# Copyright (C) 2019-2023 vdaas.org vald team +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# +{{- $corrector := .Values.manager.index.corrector -}} +{{- $gateway := .Values.gateway.lb -}} +{{- $index := .Values.manager.index -}} +{{- $agent := .Values.agent -}} +{{- $discoverer := .Values.discoverer -}} +{{- if $corrector.enabled }} +apiVersion: v1 +kind: ConfigMap +metadata: + name: {{ $corrector.name }}-config + labels: + app.kubernetes.io/name: {{ include "vald.name" . }} + helm.sh/chart: {{ include "vald.chart" . }} + app.kubernetes.io/managed-by: {{ .Release.Service }} + app.kubernetes.io/instance: {{ .Release.Name }} + app.kubernetes.io/version: {{ .Chart.Version }} + app.kubernetes.io/component: {{ $corrector.name }} +data: + config.yaml: | + --- + version: {{ $corrector.version }} + time_zone: {{ default .Values.defaults.time_zone $corrector.time_zone }} + logging: + {{- $logging := dict "Values" $corrector.logging "default" .Values.defaults.logging }} + {{- include "vald.logging" $logging | nindent 6 }} + server_config: + {{- $servers := dict "Values" $corrector.server_config "default" .Values.defaults.server_config }} + {{- include "vald.servers" $servers | nindent 6 }} + observability: + {{- $observability := dict "Values" $corrector.observability "default" .Values.defaults.observability }} + {{- include "vald.observability" $observability | nindent 6 }} + corrector: + agent_port: {{ default .Values.defaults.server_config.servers.grpc.port $agent.server_config.servers.grpc.port }} + agent_name: {{ $agent.name | quote }} + agent_dns: {{ $agent.name }}.{{ .Release.Namespace }}.svc.cluster.local + agent_namespace: {{ $index.indexer.agent_namespace | quote }} + node_name: {{ $index.indexer.node_name | quote }} + stream_list_concurrency: {{ $corrector.stream_list_concurrency }} + bbolt_async_write_concurrency: {{ $corrector.bbolt_async_write_concurrency }} + index_replica: {{ $gateway.gateway_config.index_replica }} + discoverer: + duration: {{ 
$corrector.discoverer.duration }} + client: + {{- $discovererClient := $index.indexer.discoverer.client }} + {{- $discovererServerPort := $discoverer.server_config.servers.grpc.port }} + {{- $defaultDiscovererHost := printf "%s.%s.svc.cluster.local" $discoverer.name .Release.Namespace }} + {{- $defaultDiscovererPort := default .Values.defaults.server_config.servers.grpc.port $discovererServerPort }} + {{- $defaultDiscovererAddr := (list (printf "%s:%d" $defaultDiscovererHost (int64 $defaultDiscovererPort))) }} + {{- $discovererAddrs := dict "Values" $discovererClient.addrs "default" $defaultDiscovererAddr }} + {{- include "vald.grpc.client.addrs" $discovererAddrs | nindent 10 }} + {{- $discovererGRPCclient := dict "Values" $discovererClient "default" .Values.defaults.grpc.client }} + {{- include "vald.grpc.client" $discovererGRPCclient | nindent 10 }} + agent_client_options: + {{- include "vald.grpc.client.addrs" (dict "Values" $corrector.discoverer.agent_client_options.addrs) | nindent 10 }} + {{- include "vald.grpc.client" (dict "Values" $corrector.discoverer.agent_client_options "default" .Values.defaults.grpc.client) | nindent 10 }} +{{- end }} diff --git a/charts/vald/templates/index/job/correction/cronjob.yaml b/charts/vald/templates/index/job/correction/cronjob.yaml new file mode 100644 index 0000000000..70c3f408f5 --- /dev/null +++ b/charts/vald/templates/index/job/correction/cronjob.yaml @@ -0,0 +1,62 @@ +# +# Copyright (C) 2019-2023 vdaas.org vald team +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# + +{{- $corrector := .Values.manager.index.corrector -}} +{{- if $corrector.enabled }} +apiVersion: batch/v1 +kind: CronJob +metadata: + name: {{ $corrector.name }} + labels: + app: {{ $corrector.name }} + app.kubernetes.io/name: {{ include "vald.name" . }} + helm.sh/chart: {{ include "vald.chart" . }} + app.kubernetes.io/managed-by: {{ .Release.Service }} + app.kubernetes.io/instance: {{ .Release.Name }} + app.kubernetes.io/version: {{ .Chart.Version }} +spec: + schedule: {{ $corrector.schedule }} + concurrencyPolicy: Forbid + suspend: {{ $corrector.suspend }} + startingDeadlineSeconds: {{ $corrector.startingDeadlineSeconds }} + jobTemplate: + spec: + ttlSecondsAfterFinished: {{ $corrector.ttlSecondsAfterFinished }} + template: + metadata: + labels: + app: {{ $corrector.name }} + spec: + containers: + - name: {{ $corrector.name }} + image: "{{ $corrector.image.repository }}:{{ default .Values.defaults.image.tag $corrector.image.tag }}" + imagePullPolicy: {{ $corrector.image.pullPolicy }} + volumeMounts: + - name: {{ $corrector.name }}-config + mountPath: /etc/server/ + {{- $servers := dict "Values" $corrector.server_config "default" .Values.defaults.server_config -}} + {{- include "vald.containerPorts" $servers | trim | nindent 14 }} + {{- if $corrector.env }} + env: + {{- toYaml $corrector.env | nindent 16 }} + {{- end }} + restartPolicy: OnFailure + volumes: + - name: {{ $corrector.name }}-config + configMap: + defaultMode: 420 + name: {{ $corrector.name }}-config +{{- end }} diff --git a/charts/vald/values.yaml b/charts/vald/values.yaml index 0cc8c02bbb..f095a921bf 100644 --- a/charts/vald/values.yaml +++ b/charts/vald/values.yaml @@ -2633,12 +2633,68 @@ manager: keepalive: 15m #indexer fetches uncommitted index length, which includes huge payload so we need to set keepalive longer than usual # @schema {"name": "manager.index.corrector", "type": "object"} 
corrector: + # @schema {"name": "manager.index.corrector.name", "type": "string"} + # manager.index.corrector.name -- name of index correction job + name: vald-index-correction + # @schema {"name": "manager.index.corrector.image", "alias": "image"} + image: + # manager.index.corrector.image.repository -- image repository + repository: vdaas/vald-index-correction + # manager.index.corrector.image.tag -- image tag (overrides defaults.image.tag) + tag: "" + # manager.index.corrector.image.pullPolicy -- image pull policy + pullPolicy: Always + # @schema {"name": "manager.index.corrector.server_config", "alias": "server_config"} + # manager.index.corrector.server_config -- server config (overrides defaults.server_config) + server_config: + servers: + rest: {} + grpc: {} + healths: + liveness: {} + readiness: {} + startup: {} + metrics: + pprof: {} + # @schema {"name": "manager.index.corrector.env", "alias": "env"} + # manager.index.corrector.env -- environment variables + env: + - name: MY_NODE_NAME + valueFrom: + fieldRef: + fieldPath: spec.nodeName + - name: MY_POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: MY_POD_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + # @schema {"name": "manager.index.corrector.observability", "alias": "observability"} + # manager.index.corrector.observability -- observability config (overrides defaults.observability) + observability: + otlp: + attribute: + service_name: vald-manager-index # @schema {"name": "manager.index.corrector.enabled", "type": "boolean"} # manager.index.corrector.enabled -- enable index correction CronJob enabled: false - # @schema {"name": "manager.index.corrector.check_duration", "type": "string"} - # manager.index.corrector.enabled -- check duration of index correction CronJob - check_duration: 24h + # @schema {"name": "manager.index.corrector.schedule", "type": "string"} + # manager.index.corrector.schedule -- CronJob schedule setting for index correction + schedule: "5 * * * *" 
+ # @schema {"name": "manager.index.corrector.suspend", "type": "boolean"} + # manager.index.corrector.suspend -- CronJob suspend setting for index correction + suspend: false + # @schema {"name": "manager.index.corrector.ttlSecondsAfterFinished", "type": "integer"} + # manager.index.corrector.ttlSecondsAfterFinished -- ttl setting for K8s completed jobs + ttlSecondsAfterFinished: 86400 + # @schema {"name": "manager.index.corrector.startingDeadlineSeconds", "type": "integer"} + # manager.index.corrector.startingDeadlineSeconds -- startingDeadlineSeconds setting for the index correction CronJob + startingDeadlineSeconds: 86400 + # @schema {"name": "manager.index.corrector.version", "alias": "version"} + # manager.index.corrector.version -- version of index manager config + version: v0.0.0 # @schema {"name": "manager.index.corrector.stream_list_concurrency", "type": "integer", "minimum": 1} # manager.index.corrector.stream_list_concurrency -- concurrency for stream list object rpc stream_list_concurrency: 200 diff --git a/cmd/index/job/correction/sample.yaml b/cmd/index/job/correction/sample.yaml index 09ad7dc5ca..bc3f144e41 100644 --- a/cmd/index/job/correction/sample.yaml +++ b/cmd/index/job/correction/sample.yaml @@ -70,8 +70,6 @@ server_config: cert: /path/to/cert enabled: false key: /path/to/key -gateway: - index_replica: 3 corrector: agent_port: 8081 agent_name: "vald-agent-ngt" @@ -80,6 +78,7 @@ corrector: node_name: "" stream_list_concurrency: 200 bbolt_async_write_concurrency: 2048 + index_replica: 3 discoverer: duration: 500ms client: @@ -215,7 +214,7 @@ observability: namespace: "_MY_POD_NAMESPACE_" pod_name: "_MY_POD_NAME_" node_name: "_MY_NODE_NAME_" - service_name: "vald-index-job-correction" + service_name: "vald-index-correction" metrics: enable_cgo: true enable_goroutine: true diff --git a/tests/e2e/crud/crud_test.go b/tests/e2e/crud/crud_test.go index 97e3428e08..f78a99c6aa 100644 --- a/tests/e2e/crud/crud_test.go +++ b/tests/e2e/crud/crud_test.go @@ -738,3 
+738,39 @@ func TestE2ECRUDWithSkipStrictExistCheck(t *testing.T) { t.Fatalf("an error occurred on #13: %s", err) } } + +// TestE2EIndexJobCorrection tests the index correction job. +// It inserts vectors, runs the index correction job, and then removes the vectors. +// TODO: Add index replica count check after inplementing StreamListObject in LB +func TestE2EIndexJobCorrection(t *testing.T) { + t.Cleanup(teardown) + ctx := context.Background() + + op, err := operation.New(host, port) + if err != nil { + t.Fatalf("an error occurred: %s", err) + } + + train := ds.Train[insertFrom : insertFrom+insertNum] + err = op.Insert(t, ctx, operation.Dataset{ + Train: train, + }) + if err != nil { + t.Fatalf("an error occurred: %s", err) + } + + sleep(t, waitAfterInsertDuration) + + exe := operation.NewCronJobExecutor("vald-index-correction") + err = exe.CreateAndWait(t, ctx, "correction-test") + if err != nil { + t.Fatalf("an error occurred: %s", err) + } + + err = op.Remove(t, ctx, operation.Dataset{ + Train: train, + }) + if err != nil { + t.Fatalf("an error occurred: %s", err) + } +} diff --git a/tests/e2e/operation/job.go b/tests/e2e/operation/job.go new file mode 100644 index 0000000000..9c3567929d --- /dev/null +++ b/tests/e2e/operation/job.go @@ -0,0 +1,121 @@ +//go:build e2e + +// Copyright (C) 2019-2023 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// You may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+package operation + +import ( + "context" + "fmt" + "os/exec" + "testing" +) + +func (j *cronJobExecute) CreateAndWait(t *testing.T, ctx context.Context, jobName string) error { + if err := createJob(t, jobName, j.cronJob); err != nil { + return err + } + + defer func() { + err := deleteJob(t, jobName) + if err != nil { + t.Errorf("failed to delete job: %s", err) + } + }() + + return waitJob(t, ctx, jobName) +} + +func createJob(t *testing.T, jobName, cronJobName string) error { + t.Helper() + t.Logf("creating job: %s from CronJob %s", jobName, cronJobName) + createCmd := fmt.Sprintf("kubectl create job %s --from=cronjob/%s", jobName, cronJobName) + cmd := exec.Command("sh", "-c", createCmd) + return execCmd(t, cmd) +} + +func deleteJob(t *testing.T, jobName string) error { + t.Helper() + t.Log("deleting correction job") + deleteKubeCmd := fmt.Sprintf("kubectl delete job %s", jobName) + cmd := exec.Command("sh", "-c", deleteKubeCmd) + return execCmd(t, cmd) +} + +func waitJob(t *testing.T, ctx context.Context, jobName string) error { + t.Helper() + t.Log("waiting for the correction job to complete or fail") + waitCompleteCmd := fmt.Sprintf("kubectl wait --timeout=-1s job/%s --for=condition=complete", jobName) + waitFailedCmd := fmt.Sprintf("kubectl wait --timeout=-1s job/%s --for=condition=failed", jobName) + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + complete := make(chan struct{}, 1) + failed := make(chan struct{}, 1) + ech := make(chan error, 2) + go func() { + cmd := exec.CommandContext(ctx, "sh", "-c", waitCompleteCmd) + err := execCmd(t, cmd) + if err != nil { + ech <- err + return + } + + complete <- struct{}{} + }() + + go func() { + cmd := exec.CommandContext(ctx, "sh", "-c", waitFailedCmd) + err := execCmd(t, cmd) + if err != nil { + ech <- err + return + } + + t.Logf("%s failed. 
dumping status", jobName) + dumpStatusCmd := fmt.Sprintf("kubectl get job %s -o yaml", jobName) + cmd = exec.Command("sh", "-c", dumpStatusCmd) + err = execCmd(t, cmd) + if err != nil { + t.Log("failed to dump status") + } + failed <- struct{}{} + }() + + select { + case <-ctx.Done(): + return ctx.Err() + case <-complete: + return nil + case <-failed: + return fmt.Errorf("correction job failed") + case err := <-ech: + return err + } +} + +func execCmd(t *testing.T, cmd *exec.Cmd) error { + t.Helper() + out, err := cmd.Output() + if err != nil { + if exitErr, ok := err.(*exec.ExitError); ok { + return fmt.Errorf("%s, %s, %w", string(out), string(exitErr.Stderr), err) + } else { + return fmt.Errorf("unexpected error on creating job: %w", err) + } + } + t.Log(string(out)) + return nil +} diff --git a/tests/e2e/operation/operation.go b/tests/e2e/operation/operation.go index 332f8fdaf0..6cfc078ee8 100644 --- a/tests/e2e/operation/operation.go +++ b/tests/e2e/operation/operation.go @@ -28,11 +28,6 @@ import ( "google.golang.org/grpc/keepalive" ) -type client struct { - host string - port int -} - type Dataset struct { Train [][]float32 Test [][]float32 @@ -138,9 +133,12 @@ type Client interface { IndexInfo(t *testing.T, ctx context.Context) (*payload.Info_Index_Count, error) } -const ( - defaultSearchTimeout = 4 * int64(time.Second) -) +type client struct { + host string + port int +} + +var _ Client = (*client)(nil) func New(host string, port int) (Client, error) { return &client{ @@ -229,3 +227,19 @@ func (c *client) recall(results []string, neighbors []int) (recall float64) { return recall / float64(len(neighbors)) } + +type JobExecutor interface { + CreateAndWait(t *testing.T, ctx context.Context, jobName string) error +} + +type cronJobExecute struct { + cronJob string +} + +var _ JobExecutor = (*cronJobExecute)(nil) + +func NewCronJobExecutor(cronJob string) JobExecutor { + return &cronJobExecute{ + cronJob: cronJob, + } +} From 
9f27ff206d3756a70226a72df7f0caec482ad6c1 Mon Sep 17 00:00:00 2001 From: Yusuke Kadowaki Date: Wed, 11 Oct 2023 09:45:27 +0900 Subject: [PATCH 2/4] Add StreamListObject to LB (#2203) * Add StreamListObject to LB * Add E2E for StreamListObject * Update error handling * Fix StreamListObject e2e verification * Update internal/errors/grpc.go Co-authored-by: Kiichiro YUKAWA --------- Co-authored-by: Kiichiro YUKAWA --- internal/errors/grpc.go | 10 +++ pkg/gateway/lb/handler/grpc/handler.go | 90 +++++++++++++++++++++++++- pkg/gateway/lb/service/gateway.go | 4 +- tests/e2e/crud/crud_test.go | 7 ++ tests/e2e/operation/operation.go | 1 + tests/e2e/operation/stream.go | 59 +++++++++++++++++ 6 files changed, 168 insertions(+), 3 deletions(-) diff --git a/internal/errors/grpc.go b/internal/errors/grpc.go index b14c754fd5..a5675de8d6 100644 --- a/internal/errors/grpc.go +++ b/internal/errors/grpc.go @@ -70,4 +70,14 @@ var ( ErrInvalidProtoMessageType = func(v interface{}) error { return Errorf("failed to marshal/unmarshal proto message, message type is %T (missing vtprotobuf/protobuf helpers)", v) } + + // ErrServerStreamClientRecv represents a function to generate an error that the gRPC client couldn't receive from stream. + ErrServerStreamClientRecv = func(err error) error { + return Wrap(err, "gRPC client failed to receive from stream") + } + + // ErrServerStreamServerSend represents a function to generate an error that the gRPC server couldn't send to stream. 
+ ErrServerStreamServerSend = func(err error) error { + return Wrap(err, "gRPC server failed to send to stream") + } ) diff --git a/pkg/gateway/lb/handler/grpc/handler.go b/pkg/gateway/lb/handler/grpc/handler.go index 27f5bf8c90..0554a9c2cb 100644 --- a/pkg/gateway/lb/handler/grpc/handler.go +++ b/pkg/gateway/lb/handler/grpc/handler.go @@ -20,6 +20,7 @@ package grpc import ( "context" "fmt" + "io" "slices" "strconv" "sync/atomic" @@ -2907,7 +2908,7 @@ func (s *server) getObject(ctx context.Context, uuid string) (vec *payload.Objec ech <- s.gateway.BroadCast(ctx, func(ctx context.Context, target string, vc vald.Client, copts ...grpc.CallOption) error { sctx, sspan := trace.StartSpan(grpc.WrapGRPCMethod(ctx, "BroadCast/"+target), apiName+"/getObject/BroadCast/"+target) defer func() { - if span != nil { + if sspan != nil { sspan.End() } }() @@ -3134,3 +3135,90 @@ func (s *server) StreamGetObject(stream vald.Object_StreamGetObjectServer) (err } return nil } + +func (s *server) StreamListObject(req *payload.Object_List_Request, stream vald.Object_StreamListObjectServer) error { + ctx, span := trace.StartSpan(grpc.WithGRPCMethod(stream.Context(), vald.PackageName+"."+vald.ObjectRPCServiceName+"/"+vald.StreamListObjectRPCName), apiName+"/"+vald.StreamListObjectRPCName) + defer func() { + if span != nil { + span.End() + } + }() + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + var rmu, smu sync.Mutex + err := s.gateway.BroadCast(ctx, func(ctx context.Context, target string, vc vald.Client, copts ...grpc.CallOption) error { + ctx, sspan := trace.StartSpan(grpc.WrapGRPCMethod(ctx, "BroadCast/"+target), apiName+"/"+vald.StreamListObjectRPCName+"/"+target) + defer func() { + if sspan != nil { + sspan.End() + } + }() + + client, err := vc.StreamListObject(ctx, req, copts...) 
+ if err != nil { + log.Errorf("failed to get StreamListObject client for agent(%s): %v", target, err) + return err + } + + eg, ctx := errgroup.WithContext(ctx) + ectx, ecancel := context.WithCancel(ctx) + defer ecancel() + eg.SetLimit(s.streamConcurrency) + + for { + select { + case <-ectx.Done(): + var err error + if !errors.Is(ctx.Err(), context.Canceled) { + err = errors.Join(err, ctx.Err()) + } + if egerr := eg.Wait(); egerr != nil { + err = errors.Join(err, egerr) + } + return err + default: + eg.Go(safety.RecoverFunc(func() error { + rmu.Lock() + res, err := client.Recv() + rmu.Unlock() + if err != nil { + if errors.Is(err, io.EOF) { + ecancel() + return nil + } + return errors.ErrServerStreamClientRecv(err) + } + + vec := res.GetVector() + if vec == nil { + st := res.GetStatus() + log.Warnf("received empty vector: code %v: details %v: message %v", + st.GetCode(), + st.GetDetails(), + st.GetMessage(), + ) + return nil + } + + smu.Lock() + err = stream.Send(res) + smu.Unlock() + if err != nil { + if sspan != nil { + st, msg, err := status.ParseError(err, codes.Internal, "failed to parse StreamListObject send gRPC error response") + sspan.RecordError(err) + sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) 
+ sspan.SetStatus(trace.StatusError, err.Error()) + } + return errors.ErrServerStreamServerSend(err) + } + + return nil + })) + } + } + }) + return err +} diff --git a/pkg/gateway/lb/service/gateway.go b/pkg/gateway/lb/service/gateway.go index a58cd8a5f3..7d32364df0 100644 --- a/pkg/gateway/lb/service/gateway.go +++ b/pkg/gateway/lb/service/gateway.go @@ -36,9 +36,9 @@ type Gateway interface { GetAgentCount(ctx context.Context) int Addrs(ctx context.Context) []string DoMulti(ctx context.Context, num int, - f func(ctx context.Context, tgt string, ac vald.Client, copts ...grpc.CallOption) error) error + f func(ctx context.Context, target string, ac vald.Client, copts ...grpc.CallOption) error) error BroadCast(ctx context.Context, - f func(ctx context.Context, tgt string, ac vald.Client, copts ...grpc.CallOption) error) error + f func(ctx context.Context, target string, ac vald.Client, copts ...grpc.CallOption) error) error } type gateway struct { diff --git a/tests/e2e/crud/crud_test.go b/tests/e2e/crud/crud_test.go index f78a99c6aa..b5a774b88b 100644 --- a/tests/e2e/crud/crud_test.go +++ b/tests/e2e/crud/crud_test.go @@ -375,6 +375,13 @@ func TestE2EStandardCRUD(t *testing.T) { t.Fatalf("an error occurred: %s", err) } + err = op.StreamListObject(t, ctx, operation.Dataset{ + Train: ds.Train[insertFrom : insertFrom+insertNum], + }) + if err != nil { + t.Fatalf("an error occurred: %s", err) + } + err = op.Update(t, ctx, operation.Dataset{ Train: ds.Train[updateFrom : updateFrom+updateNum], }) diff --git a/tests/e2e/operation/operation.go b/tests/e2e/operation/operation.go index 6cfc078ee8..24cc988f7e 100644 --- a/tests/e2e/operation/operation.go +++ b/tests/e2e/operation/operation.go @@ -127,6 +127,7 @@ type Client interface { MultiUpsert(t *testing.T, ctx context.Context, ds Dataset) error MultiRemove(t *testing.T, ctx context.Context, ds Dataset) error GetObject(t *testing.T, ctx context.Context, ds Dataset) error + StreamListObject(t *testing.T, ctx context.Context, 
ds Dataset) error Exists(t *testing.T, ctx context.Context, id string) error CreateIndex(t *testing.T, ctx context.Context) error SaveIndex(t *testing.T, ctx context.Context) error diff --git a/tests/e2e/operation/stream.go b/tests/e2e/operation/stream.go index 585039dc2b..dd37d6ad67 100644 --- a/tests/e2e/operation/stream.go +++ b/tests/e2e/operation/stream.go @@ -17,6 +17,7 @@ package operation import ( "context" + "fmt" "reflect" "strconv" "testing" @@ -1167,3 +1168,61 @@ func (c *client) GetObject( return rerr } + +func (c *client) StreamListObject( + t *testing.T, + ctx context.Context, + ds Dataset, +) error { + t.Log("StreamListObject operation started") + + client, err := c.getClient(ctx) + if err != nil { + return err + } + + sc, err := client.StreamListObject(ctx, &payload.Object_List_Request{}) + if err != nil { + return err + } + + // kv : [indexId]count + indexCnt := make(map[string]int) +exit_loop: + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + res, err := sc.Recv() + if err != nil { + if errors.Is(err, io.EOF) { + break exit_loop + } + return err + } + vec := res.GetVector() + if vec == nil { + st := res.GetStatus() + return fmt.Errorf("returned vector is empty: code: %v, msg: %v, details: %v", st.GetCode(), st.GetMessage(), st.GetDetails()) + } + indexCnt[vec.GetId()]++ + } + } + + if len(indexCnt) != len(ds.Train) { + return fmt.Errorf("the number of vectors returned is different: got %v, want %v", len(indexCnt), len(ds.Train)) + } + + replica := -1 + for k, v := range indexCnt { + if replica == -1 { + replica = v + continue + } + if v != replica { + return fmt.Errorf("the number of vectors returned is different at index id %v: got %v, want %v", k, v, replica) + } + } + return nil +} From 3af9773c4667344fa2b1366dd03c0b8c0f1955ce Mon Sep 17 00:00:00 2001 From: Hiroto Funakoshi Date: Wed, 11 Oct 2023 14:54:53 +0900 Subject: [PATCH 3/4] Add step to get k3s latest version (#2206) * feat: add k3s version Signed-off-by: hlts2 * 
fix: deleted unnecessary option and fix image name Signed-off-by: hlts2 * fix: fixed k3s version Signed-off-by: hlts2 * feat: add step to get latest tag Signed-off-by: hlts2 * fix: get tag from docker hub Signed-off-by: hlts2 * fix: bugfix syntax Signed-off-by: hlts2 * feat: add make command to update k3s version Signed-off-by: hlts2 --------- Signed-off-by: hlts2 --- .github/actions/setup-e2e/action.yaml | 1 - .github/actions/setup-k3d/action.yaml | 16 ++++++++++++++++ Makefile.d/dependencies.mk | 8 +++++++- versions/K3S_VERSION | 1 + 4 files changed, 24 insertions(+), 2 deletions(-) create mode 100644 versions/K3S_VERSION diff --git a/.github/actions/setup-e2e/action.yaml b/.github/actions/setup-e2e/action.yaml index 897f333926..0a4e51fb8f 100644 --- a/.github/actions/setup-e2e/action.yaml +++ b/.github/actions/setup-e2e/action.yaml @@ -81,7 +81,6 @@ runs: with: agents: 3 ingress_port: ${{ inputs.ingress_port }} - options: "--image docker.io/rancher/k3s:latest" - name: Check Kubernetes cluster shell: bash run: | diff --git a/.github/actions/setup-k3d/action.yaml b/.github/actions/setup-k3d/action.yaml index 079def0631..99e10bfa27 100644 --- a/.github/actions/setup-k3d/action.yaml +++ b/.github/actions/setup-k3d/action.yaml @@ -21,6 +21,10 @@ inputs: description: "k3d version" required: false default: "latest" + k3s_version: + description: "The k3s to use. 
The default version is `versions/K3S_VERSION`" + required: false + default: "" name: description: "Cluster name" required: false @@ -52,6 +56,16 @@ runs: env: K3D_VERSION: ${{ inputs.version }} + - name: Detect k3s version + id: k3s_version + shell: bash + run: | + K3S_VERSION=${K3S_VERSION:-`cat versions/K3S_VERSION`} + + echo "tag=${K3S_VERSION=$}" >> $GITHUB_OUTPUT + env: + K3S_VERSION: ${{ inputs.k3s_version }} + - name: Install k3d shell: bash run: | @@ -74,11 +88,13 @@ runs: if [ "${INGRESS_PORT}" != 0 ]; then OPTIONS="${OPTIONS} -p ${INGRESS_PORT}:80@loadbalancer" fi + OPTIONS="${OPTIONS} --image rancher/k3s:${K3S_VERSION}" echo "options=${OPTIONS}" >> $GITHUB_OUTPUT env: AGENTS: ${{ inputs.agents }} INGRESS_PORT: ${{ inputs.ingress_port }} OPTIONS: ${{ inputs.options }} + K3S_VERSION: ${{ steps.k3s_version.outputs.tag }} - name: Create k8s cluster shell: bash diff --git a/Makefile.d/dependencies.mk b/Makefile.d/dependencies.mk index 4f03a42add..7f528edc47 100644 --- a/Makefile.d/dependencies.mk +++ b/Makefile.d/dependencies.mk @@ -24,9 +24,10 @@ update/libs: \ update/helm-docs \ update/helm-operator \ update/jaeger-operator \ + update/k3s \ update/kind \ - update/kubectl \ update/kube-linter \ + update/kubectl \ update/ngt \ update/prometheus-stack \ update/protobuf \ @@ -77,6 +78,11 @@ go/example/deps: update/chaos-mesh: curl --silent https://api.github.com/repos/chaos-mesh/chaos-mesh/releases/latest | grep -Po '"tag_name": "\K.*?(?=")' | sed 's/v//g' > $(ROOTDIR)/versions/CHAOS_MESH_VERSION +.PHONY: update/k3s +## update k3s version +update/k3s: + curl --silent https://hub.docker.com/v2/repositories/rancher/k3s/tags | jq -r '.results[].name' | grep -E '.*-k3s1$$' | sort -V | tail -n 1 > $(ROOTDIR)/versions/K3S_VERSION + .PHONY: update/go ## update go version update/go: diff --git a/versions/K3S_VERSION b/versions/K3S_VERSION new file mode 100644 index 0000000000..079b9ecbc4 --- /dev/null +++ b/versions/K3S_VERSION @@ -0,0 +1 @@ +v1.28.2-k3s1 From 
b4028e780af500909738d57b2fa18611a3c55892 Mon Sep 17 00:00:00 2001 From: Yusuke Kadowaki Date: Mon, 16 Oct 2023 16:50:23 +0900 Subject: [PATCH 4/4] Add verification for index correction e2e and add clusterrole cronjobs for operator to deploy index correction (#2205) * implement the initail framework * add corrector configuration * add corrector logic * add build make command for index correction binary * add Dockerfile for index correction * add Docker image for index job correction * add timer * fix tag align * tmp * fix log * temporally implement two versions of correct function * set eg limit from config * add stream list concurrency config * implement index id caching * add config to use cache or not * style: Format code with prettier and gofumpt * refactor availableAddrs * add kvs range duration * add leftAgentAddrs for performance * Revert "add kvs range duration" This reverts commit 5b647be6ccc0f9be7e78e38c89ea8897fa3ee574. * refactor * fix without cache bug * enable observability * refactor * SIGTERM after complete * add metrics server * add pcache * remove comment * [TEMP] use pcache * [TMP] use pcache * fix empty shard returns error * fix to use local map * [TMP] add prestop for pcache * [TEMP] add pcache config * style: Format code with prettier and gofumpt * [TEMP] add pcache log * fix map alloc size * [TMP] Add bbolt cache * update bbolt * fix bbolt bug * add bbolt test * [TEMP] use bbolt as persistent cache * style: Format code with prettier and gofumpt * add SetBatch to bbolt * use batch to write map to disk * style: Format code with prettier and gofumpt * delete the map elements on finalize * manually call GC after the map shrink * add limit to SetBatch goroutine number * stop unnecesarry GC * increase eg limit to the MaxBatchSize * use ch to set batch bbolt * fix servers shutdown properly * use internal/kvs/bbolt * refactor * always use bbolt cache for correction * update sample.yaml for correction * style: format code with Prettier and Gofumpt This 
commit fixes the style issues introduced in 319ec8b according to the output from Prettier and Gofumpt. Details: https://github.com/vdaas/vald/pull/2152 * use go std slices pkg * refactor * add comment * remove valdsync * use vald errgroup * refactor * Define ErrNoAvailableAgentToInsert * update comment in English * Apply new actions yaml format * Disable godox * style: format code with Prettier and Gofumpt This commit fixes the style issues introduced in c860ddc according to the output from Prettier and Gofumpt. Details: https://github.com/vdaas/vald/pull/2194 * remove comment * Apply format * Add type check for type assertion * use const to specify filemode * Add bbolt concurrency as config * fix var style * Suppress linter * fix comment * add test template * Refactor parameters for index correction * Refactor config * Add corrector test * style: format code with Prettier and Gofumpt This commit fixes the style issues introduced in 004bf81 according to the output from Prettier and Gofumpt. Details: https://github.com/vdaas/vald/pull/2194 * Add timestamp check * Apply format * fix schema type * Fix DeepSource errors * Fix misspell * Add type check * Remove unused config * Fix DeepSource error * Add required go:build e2e tag * Remove memo * Refactor comment * Add index job correction helm templates * Add more fields * Add index correction job E2E test * Add e2e action for job * [REVERT THIS] Temporally change version * Fix name and command * Apply format * update crd * Revert "[REVERT THIS] Temporally change version" This reverts commit 1801a63b2bb8826960c3596f42637933f0eab6e6. 
* Remove unused pkg * Remove experimental file * remove old workflow * Fix cron job name to new one * Update sample.yaml * fix build path * Fix corrector name * add e2e-jobs to slack notification * Update crds * Add StreamListObject to LB * Add E2E for StreamListObject * Update error handling * Fix StreamListObject e2e verification * Add StreamListObject to LB * Add E2E for StreamListObject * Update error handling * Fix StreamListObject e2e verification * Update index correction e2e to verify correction result with StramListObject * Make it possible to deploy index correction cronjob from operator * Update operator manifests * Make schedule field empty so that a user has to specify manually * add default schedule of index correction --------- Co-authored-by: deepsource-autofix[bot] <62050782+deepsource-autofix[bot]@users.noreply.github.com> --- .github/helm/values/values-lb.yaml | 1 + .github/valdrelease/valdrelease.yaml | 4 + .../templates/clusterrole.yaml | 13 + charts/vald/values.yaml | 2 +- k8s/operator/helm/clusterrole.yaml | 12 + k8s/operator/helm/crds/valdrelease.yaml | 965 ++++++++++++++++++ tests/e2e/crud/crud_test.go | 58 +- tests/e2e/operation/job.go | 4 +- tests/e2e/operation/stream.go | 2 + 9 files changed, 1050 insertions(+), 11 deletions(-) diff --git a/.github/helm/values/values-lb.yaml b/.github/helm/values/values-lb.yaml index bc0043b6a9..7075b90708 100644 --- a/.github/helm/values/values-lb.yaml +++ b/.github/helm/values/values-lb.yaml @@ -74,3 +74,4 @@ manager: # suspend because you do not want corrector to start automatically in CI # instead run it manually suspend: true + schedule: "1 2 3 4 5" diff --git a/.github/valdrelease/valdrelease.yaml b/.github/valdrelease/valdrelease.yaml index d2df3e28d1..e49752f4b4 100644 --- a/.github/valdrelease/valdrelease.yaml +++ b/.github/valdrelease/valdrelease.yaml @@ -76,3 +76,7 @@ spec: auto_index_duration_limit: 2m auto_index_check_duration: 30s auto_index_length: 1000 + corrector: + enabled: true + 
suspend: true + schedule: "1 2 3 4 5" diff --git a/charts/vald-helm-operator/templates/clusterrole.yaml b/charts/vald-helm-operator/templates/clusterrole.yaml index c0403124b5..b027ea16a4 100644 --- a/charts/vald-helm-operator/templates/clusterrole.yaml +++ b/charts/vald-helm-operator/templates/clusterrole.yaml @@ -175,4 +175,17 @@ rules: - get - patch - update + - apiGroups: + - batch + resources: + - cronjobs + verbs: + - create + - delete + - get + - list + - patch + - update + - watch + {{- end }} diff --git a/charts/vald/values.yaml b/charts/vald/values.yaml index f095a921bf..46035e6197 100644 --- a/charts/vald/values.yaml +++ b/charts/vald/values.yaml @@ -2682,7 +2682,7 @@ manager: enabled: false # @schema {"name": "manager.index.corrector.schedule", "type": "string"} # manager.index.corrector.schedule -- CronJob schedule setting for index correction - schedule: "5 * * * *" + schedule: "6 3 * * *" # @schema {"name": "manager.index.corrector.suspend", "type": "boolean"} # manager.index.corrector.suspend -- CronJob suspend setting for index correction suspend: false diff --git a/k8s/operator/helm/clusterrole.yaml b/k8s/operator/helm/clusterrole.yaml index 5137df5e71..ab151aaa8f 100644 --- a/k8s/operator/helm/clusterrole.yaml +++ b/k8s/operator/helm/clusterrole.yaml @@ -175,3 +175,15 @@ rules: - get - patch - update + - apiGroups: + - batch + resources: + - cronjobs + verbs: + - create + - delete + - get + - list + - patch + - update + - watch diff --git a/k8s/operator/helm/crds/valdrelease.yaml b/k8s/operator/helm/crds/valdrelease.yaml index 46c151ab94..3854881577 100644 --- a/k8s/operator/helm/crds/valdrelease.yaml +++ b/k8s/operator/helm/crds/valdrelease.yaml @@ -6068,6 +6068,971 @@ spec: annotations: type: object x-kubernetes-preserve-unknown-fields: true + corrector: + type: object + properties: + agent_namespace: + type: string + bbolt_async_write_concurrency: + type: integer + minimum: 1 + discoverer: + type: object + properties: + agent_client_options: + 
type: object + properties: + addrs: + type: array + items: + type: string + backoff: + type: object + properties: + backoff_factor: + type: number + backoff_time_limit: + type: string + enable_error_log: + type: boolean + initial_duration: + type: string + jitter_limit: + type: string + maximum_duration: + type: string + retry_count: + type: integer + call_option: + type: object + x-kubernetes-preserve-unknown-fields: true + circuit_breaker: + type: object + properties: + closed_error_rate: + type: number + closed_refresh_timeout: + type: string + half_open_error_rate: + type: number + min_samples: + type: integer + open_timeout: + type: string + connection_pool: + type: object + properties: + enable_dns_resolver: + type: boolean + enable_rebalance: + type: boolean + old_conn_close_duration: + type: string + rebalance_duration: + type: string + size: + type: integer + dial_option: + type: object + properties: + backoff_base_delay: + type: string + backoff_jitter: + type: number + backoff_max_delay: + type: string + backoff_multiplier: + type: number + enable_backoff: + type: boolean + initial_connection_window_size: + type: integer + initial_window_size: + type: integer + insecure: + type: boolean + interceptors: + type: array + items: + type: string + enum: + - TraceInterceptor + keepalive: + type: object + properties: + permit_without_stream: + type: boolean + time: + type: string + timeout: + type: string + max_msg_size: + type: integer + min_connection_timeout: + type: string + net: + type: object + properties: + dialer: + type: object + properties: + dual_stack_enabled: + type: boolean + keepalive: + type: string + timeout: + type: string + dns: + type: object + properties: + cache_enabled: + type: boolean + cache_expiration: + type: string + refresh_duration: + type: string + socket_option: + type: object + properties: + ip_recover_destination_addr: + type: boolean + ip_transparent: + type: boolean + reuse_addr: + type: boolean + reuse_port: + type: boolean + 
tcp_cork: + type: boolean + tcp_defer_accept: + type: boolean + tcp_fast_open: + type: boolean + tcp_no_delay: + type: boolean + tcp_quick_ack: + type: boolean + tls: + type: object + properties: + ca: + type: string + cert: + type: string + enabled: + type: boolean + insecure_skip_verify: + type: boolean + key: + type: string + read_buffer_size: + type: integer + timeout: + type: string + write_buffer_size: + type: integer + health_check_duration: + type: string + max_recv_msg_size: + type: integer + max_retry_rpc_buffer_size: + type: integer + max_send_msg_size: + type: integer + tls: + type: object + properties: + ca: + type: string + cert: + type: string + enabled: + type: boolean + insecure_skip_verify: + type: boolean + key: + type: string + wait_for_ready: + type: boolean + client: + type: object + properties: + addrs: + type: array + items: + type: string + backoff: + type: object + properties: + backoff_factor: + type: number + backoff_time_limit: + type: string + enable_error_log: + type: boolean + initial_duration: + type: string + jitter_limit: + type: string + maximum_duration: + type: string + retry_count: + type: integer + call_option: + type: object + x-kubernetes-preserve-unknown-fields: true + circuit_breaker: + type: object + properties: + closed_error_rate: + type: number + closed_refresh_timeout: + type: string + half_open_error_rate: + type: number + min_samples: + type: integer + open_timeout: + type: string + connection_pool: + type: object + properties: + enable_dns_resolver: + type: boolean + enable_rebalance: + type: boolean + old_conn_close_duration: + type: string + rebalance_duration: + type: string + size: + type: integer + dial_option: + type: object + properties: + backoff_base_delay: + type: string + backoff_jitter: + type: number + backoff_max_delay: + type: string + backoff_multiplier: + type: number + enable_backoff: + type: boolean + initial_connection_window_size: + type: integer + initial_window_size: + type: integer + 
insecure: + type: boolean + interceptors: + type: array + items: + type: string + enum: + - TraceInterceptor + keepalive: + type: object + properties: + permit_without_stream: + type: boolean + time: + type: string + timeout: + type: string + max_msg_size: + type: integer + min_connection_timeout: + type: string + net: + type: object + properties: + dialer: + type: object + properties: + dual_stack_enabled: + type: boolean + keepalive: + type: string + timeout: + type: string + dns: + type: object + properties: + cache_enabled: + type: boolean + cache_expiration: + type: string + refresh_duration: + type: string + socket_option: + type: object + properties: + ip_recover_destination_addr: + type: boolean + ip_transparent: + type: boolean + reuse_addr: + type: boolean + reuse_port: + type: boolean + tcp_cork: + type: boolean + tcp_defer_accept: + type: boolean + tcp_fast_open: + type: boolean + tcp_no_delay: + type: boolean + tcp_quick_ack: + type: boolean + tls: + type: object + properties: + ca: + type: string + cert: + type: string + enabled: + type: boolean + insecure_skip_verify: + type: boolean + key: + type: string + read_buffer_size: + type: integer + timeout: + type: string + write_buffer_size: + type: integer + health_check_duration: + type: string + max_recv_msg_size: + type: integer + max_retry_rpc_buffer_size: + type: integer + max_send_msg_size: + type: integer + tls: + type: object + properties: + ca: + type: string + cert: + type: string + enabled: + type: boolean + insecure_skip_verify: + type: boolean + key: + type: string + wait_for_ready: + type: boolean + duration: + type: string + enabled: + type: boolean + env: + type: array + items: + type: object + x-kubernetes-preserve-unknown-fields: true + image: + type: object + properties: + pullPolicy: + type: string + enum: + - Always + - Never + - IfNotPresent + repository: + type: string + tag: + type: string + name: + type: string + node_name: + type: string + observability: + type: object + 
properties: + enabled: + type: boolean + metrics: + type: object + properties: + enable_cgo: + type: boolean + enable_goroutine: + type: boolean + enable_memory: + type: boolean + enable_version_info: + type: boolean + version_info_labels: + type: array + items: + type: string + enum: + - vald_version + - server_name + - git_commit + - build_time + - go_version + - go_os + - go_arch + - cgo_enabled + - ngt_version + - build_cpu_info_flags + otlp: + type: object + properties: + attribute: + type: object + properties: + namespace: + type: string + node_name: + type: string + pod_name: + type: string + service_name: + type: string + collector_endpoint: + type: string + metrics_export_interval: + type: string + metrics_export_timeout: + type: string + trace_batch_timeout: + type: string + trace_export_timeout: + type: string + trace_max_export_batch_size: + type: integer + trace_max_queue_size: + type: integer + trace: + type: object + properties: + enabled: + type: boolean + schedule: + type: string + server_config: + type: object + properties: + full_shutdown_duration: + type: string + healths: + type: object + properties: + liveness: + type: object + properties: + enabled: + type: boolean + host: + type: string + livenessProbe: + type: object + properties: + failureThreshold: + type: integer + httpGet: + type: object + properties: + path: + type: string + port: + type: string + scheme: + type: string + initialDelaySeconds: + type: integer + periodSeconds: + type: integer + successThreshold: + type: integer + timeoutSeconds: + type: integer + port: + type: integer + maximum: 65535 + minimum: 0 + server: + type: object + properties: + http: + type: object + properties: + handler_timeout: + type: string + idle_timeout: + type: string + read_header_timeout: + type: string + read_timeout: + type: string + shutdown_duration: + type: string + write_timeout: + type: string + mode: + type: string + network: + type: string + enum: + - tcp + - tcp4 + - tcp6 + - udp + - udp4 + 
- udp6 + - unix + - unixgram + - unixpacket + probe_wait_time: + type: string + socket_option: + type: object + properties: + ip_recover_destination_addr: + type: boolean + ip_transparent: + type: boolean + reuse_addr: + type: boolean + reuse_port: + type: boolean + tcp_cork: + type: boolean + tcp_defer_accept: + type: boolean + tcp_fast_open: + type: boolean + tcp_no_delay: + type: boolean + tcp_quick_ack: + type: boolean + socket_path: + type: string + servicePort: + type: integer + maximum: 65535 + minimum: 0 + readiness: + type: object + properties: + enabled: + type: boolean + host: + type: string + port: + type: integer + maximum: 65535 + minimum: 0 + readinessProbe: + type: object + properties: + failureThreshold: + type: integer + httpGet: + type: object + properties: + path: + type: string + port: + type: string + scheme: + type: string + initialDelaySeconds: + type: integer + periodSeconds: + type: integer + successThreshold: + type: integer + timeoutSeconds: + type: integer + server: + type: object + properties: + http: + type: object + properties: + handler_timeout: + type: string + idle_timeout: + type: string + read_header_timeout: + type: string + read_timeout: + type: string + shutdown_duration: + type: string + write_timeout: + type: string + mode: + type: string + network: + type: string + enum: + - tcp + - tcp4 + - tcp6 + - udp + - udp4 + - udp6 + - unix + - unixgram + - unixpacket + probe_wait_time: + type: string + socket_option: + type: object + properties: + ip_recover_destination_addr: + type: boolean + ip_transparent: + type: boolean + reuse_addr: + type: boolean + reuse_port: + type: boolean + tcp_cork: + type: boolean + tcp_defer_accept: + type: boolean + tcp_fast_open: + type: boolean + tcp_no_delay: + type: boolean + tcp_quick_ack: + type: boolean + socket_path: + type: string + servicePort: + type: integer + maximum: 65535 + minimum: 0 + startup: + type: object + properties: + enabled: + type: boolean + port: + type: integer + maximum: 
65535 + minimum: 0 + startupProbe: + type: object + properties: + failureThreshold: + type: integer + httpGet: + type: object + properties: + path: + type: string + port: + type: string + scheme: + type: string + initialDelaySeconds: + type: integer + periodSeconds: + type: integer + successThreshold: + type: integer + timeoutSeconds: + type: integer + metrics: + type: object + properties: + pprof: + type: object + properties: + enabled: + type: boolean + host: + type: string + port: + type: integer + maximum: 65535 + minimum: 0 + server: + type: object + properties: + http: + type: object + properties: + handler_timeout: + type: string + idle_timeout: + type: string + read_header_timeout: + type: string + read_timeout: + type: string + shutdown_duration: + type: string + write_timeout: + type: string + mode: + type: string + network: + type: string + enum: + - tcp + - tcp4 + - tcp6 + - udp + - udp4 + - udp6 + - unix + - unixgram + - unixpacket + probe_wait_time: + type: string + socket_option: + type: object + properties: + ip_recover_destination_addr: + type: boolean + ip_transparent: + type: boolean + reuse_addr: + type: boolean + reuse_port: + type: boolean + tcp_cork: + type: boolean + tcp_defer_accept: + type: boolean + tcp_fast_open: + type: boolean + tcp_no_delay: + type: boolean + tcp_quick_ack: + type: boolean + socket_path: + type: string + servicePort: + type: integer + maximum: 65535 + minimum: 0 + servers: + type: object + properties: + grpc: + type: object + properties: + enabled: + type: boolean + host: + type: string + port: + type: integer + maximum: 65535 + minimum: 0 + server: + type: object + properties: + grpc: + type: object + properties: + bidirectional_stream_concurrency: + type: integer + connection_timeout: + type: string + enable_reflection: + type: boolean + header_table_size: + type: integer + initial_conn_window_size: + type: integer + initial_window_size: + type: integer + interceptors: + type: array + items: + type: string + enum: + 
- RecoverInterceptor + - AccessLogInterceptor + - TraceInterceptor + - MetricInterceptor + keepalive: + type: object + properties: + max_conn_age: + type: string + max_conn_age_grace: + type: string + max_conn_idle: + type: string + min_time: + type: string + permit_without_stream: + type: boolean + time: + type: string + timeout: + type: string + max_header_list_size: + type: integer + max_receive_message_size: + type: integer + max_send_message_size: + type: integer + read_buffer_size: + type: integer + write_buffer_size: + type: integer + mode: + type: string + network: + type: string + enum: + - tcp + - tcp4 + - tcp6 + - udp + - udp4 + - udp6 + - unix + - unixgram + - unixpacket + probe_wait_time: + type: string + restart: + type: boolean + socket_option: + type: object + properties: + ip_recover_destination_addr: + type: boolean + ip_transparent: + type: boolean + reuse_addr: + type: boolean + reuse_port: + type: boolean + tcp_cork: + type: boolean + tcp_defer_accept: + type: boolean + tcp_fast_open: + type: boolean + tcp_no_delay: + type: boolean + tcp_quick_ack: + type: boolean + socket_path: + type: string + servicePort: + type: integer + maximum: 65535 + minimum: 0 + rest: + type: object + properties: + enabled: + type: boolean + host: + type: string + port: + type: integer + maximum: 65535 + minimum: 0 + server: + type: object + properties: + http: + type: object + properties: + handler_timeout: + type: string + idle_timeout: + type: string + read_header_timeout: + type: string + read_timeout: + type: string + shutdown_duration: + type: string + write_timeout: + type: string + mode: + type: string + network: + type: string + enum: + - tcp + - tcp4 + - tcp6 + - udp + - udp4 + - udp6 + - unix + - unixgram + - unixpacket + probe_wait_time: + type: string + socket_option: + type: object + properties: + ip_recover_destination_addr: + type: boolean + ip_transparent: + type: boolean + reuse_addr: + type: boolean + reuse_port: + type: boolean + tcp_cork: + type: 
boolean + tcp_defer_accept: + type: boolean + tcp_fast_open: + type: boolean + tcp_no_delay: + type: boolean + tcp_quick_ack: + type: boolean + socket_path: + type: string + servicePort: + type: integer + maximum: 65535 + minimum: 0 + tls: + type: object + properties: + ca: + type: string + cert: + type: string + enabled: + type: boolean + insecure_skip_verify: + type: boolean + key: + type: string + startingDeadlineSeconds: + type: integer + stream_list_concurrency: + type: integer + minimum: 1 + suspend: + type: boolean + ttlSecondsAfterFinished: + type: integer + version: + type: string + pattern: ^v[0-9]+\.[0-9]+\.[0-9]$ enabled: type: boolean env: diff --git a/tests/e2e/crud/crud_test.go b/tests/e2e/crud/crud_test.go index b5a774b88b..bb2ce4c2a5 100644 --- a/tests/e2e/crud/crud_test.go +++ b/tests/e2e/crud/crud_test.go @@ -24,6 +24,7 @@ import ( "flag" "fmt" "os" + "os/exec" "testing" "time" @@ -42,13 +43,14 @@ var ( port int ds *hdf5.Dataset - insertNum int - searchNum int - searchByIDNum int - getObjectNum int - updateNum int - upsertNum int - removeNum int + insertNum int + correctionInsertNum int + searchNum int + searchByIDNum int + getObjectNum int + updateNum int + upsertNum int + removeNum int insertFrom int searchFrom int @@ -73,6 +75,7 @@ func init() { flag.IntVar(&port, "port", 8081, "gRPC port") flag.IntVar(&insertNum, "insert-num", 10000, "number of id-vector pairs used for insert") + flag.IntVar(&correctionInsertNum, "correction-insert-num", 3000, "number of id-vector pairs used for insert") flag.IntVar(&searchNum, "search-num", 10000, "number of id-vector pairs used for search") flag.IntVar(&searchByIDNum, "search-by-id-num", 100, "number of id-vector pairs used for search-by-id") flag.IntVar(&getObjectNum, "get-object-num", 100, "number of id-vector pairs used for get-object") @@ -758,7 +761,9 @@ func TestE2EIndexJobCorrection(t *testing.T) { t.Fatalf("an error occurred: %s", err) } - train := ds.Train[insertFrom : insertFrom+insertNum] + // 
prepare train data + train := ds.Train[insertFrom : insertFrom+correctionInsertNum] + err = op.Insert(t, ctx, operation.Dataset{ Train: train, }) @@ -768,12 +773,49 @@ func TestE2EIndexJobCorrection(t *testing.T) { sleep(t, waitAfterInsertDuration) + t.Log("Test case 1: just execute index correction and check if replica number is correct after correction") exe := operation.NewCronJobExecutor("vald-index-correction") err = exe.CreateAndWait(t, ctx, "correction-test") if err != nil { t.Fatalf("an error occurred: %s", err) } + // check if replica number is correct + err = op.StreamListObject(t, ctx, operation.Dataset{ + Train: train, + }) + if err != nil { + t.Fatalf("an error occurred: %s", err) + } + + t.Log("Test case 2: execute index correction after one agent removed") + t.Log("removing vald-agent-ngt-0...") + cmd := exec.CommandContext(ctx, "sh", "-c", "kubectl delete pod vald-agent-ngt-0 && kubectl wait --for=condition=Ready pod/vald-agent-ngt-0") + out, err := cmd.Output() + if err != nil { + if exitErr, ok := err.(*exec.ExitError); ok { + t.Fatalf("%s, %s, %v", string(out), string(exitErr.Stderr), err) + } else { + t.Fatalf("unexpected error on creating job: %v", err) + } + } + t.Log(string(out)) + + // correct the deleted index + err = exe.CreateAndWait(t, ctx, "correction-test") + if err != nil { + t.Fatalf("an error occurred: %s", err) + } + + // check if replica number is correct + err = op.StreamListObject(t, ctx, operation.Dataset{ + Train: train, + }) + if err != nil { + t.Fatalf("an error occurred: %s", err) + } + + t.Log("Tear down. 
Removing all vectors...") err = op.Remove(t, ctx, operation.Dataset{ Train: train, }) diff --git a/tests/e2e/operation/job.go b/tests/e2e/operation/job.go index 9c3567929d..20b20fb17d 100644 --- a/tests/e2e/operation/job.go +++ b/tests/e2e/operation/job.go @@ -56,8 +56,8 @@ func deleteJob(t *testing.T, jobName string) error { func waitJob(t *testing.T, ctx context.Context, jobName string) error { t.Helper() t.Log("waiting for the correction job to complete or fail") - waitCompleteCmd := fmt.Sprintf("kubectl wait --timeout=-1s job/%s --for=condition=complete", jobName) - waitFailedCmd := fmt.Sprintf("kubectl wait --timeout=-1s job/%s --for=condition=failed", jobName) + waitCompleteCmd := fmt.Sprintf("kubectl wait --timeout=10m job/%s --for=condition=complete", jobName) + waitFailedCmd := fmt.Sprintf("kubectl wait --timeout=10m job/%s --for=condition=failed", jobName) ctx, cancel := context.WithCancel(ctx) defer cancel() diff --git a/tests/e2e/operation/stream.go b/tests/e2e/operation/stream.go index dd37d6ad67..fb6a46ac36 100644 --- a/tests/e2e/operation/stream.go +++ b/tests/e2e/operation/stream.go @@ -1224,5 +1224,7 @@ exit_loop: return fmt.Errorf("the number of vectors returned is different at index id %v: got %v, want %v", k, v, replica) } } + + t.Log("StreamListObject operation finished successfully and all vectors are returned with correct replica number") return nil }