diff --git a/.github/workflows/dockers-index-job-correction.yml b/.github/workflows/dockers-index-job-correction.yml new file mode 100644 index 0000000000..e67b051def --- /dev/null +++ b/.github/workflows/dockers-index-job-correction.yml @@ -0,0 +1,188 @@ +# +# Copyright (C) 2019-2023 vdaas.org vald team +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +name: "Build docker image: index-job-correction" +on: + push: + branches: + - main + tags: + - "*.*.*" + - "v*.*.*" + - "*.*.*-*" + - "v*.*.*-*" + paths: + - ".github/actions/docker-build/actions.yaml" + - ".github/workflows/dockers-index-job-correction.yml" + - "go.mod" + - "go.sum" + - "internal/**" + - "!internal/**/*_test.go" + - "!internal/db/**" + - "!internal/k8s/**" + - "apis/grpc/**" + - "pkg/index/job/correction/**" + - "cmd/index/job/correction/**" + - "dockers/index/job/correction/Dockerfile" + - "versions/GO_VERSION" + pull_request: + paths: + - ".github/actions/docker-build/actions.yaml" + - ".github/workflows/dockers-index-job-correction.yml" + - "go.mod" + - "go.sum" + - "internal/**" + - "!internal/**/*_test.go" + - "!internal/db/**" + - "!internal/k8s/**" + - "apis/grpc/**" + - "pkg/index/job/correction/**" + - "cmd/index/job/correction/**" + - "dockers/index/job/correction/Dockerfile" + - "versions/GO_VERSION" + pull_request_target: + paths: + - ".github/actions/docker-build/actions.yaml" + - ".github/workflows/dockers-index-job-correction.yml" + - "go.mod" + - "go.sum" + - "internal/**" + - 
"!internal/**/*_test.go" + - "!internal/db/**" + - "!internal/k8s/**" + - "apis/grpc/**" + - "pkg/index/job/correction/**" + - "cmd/index/job/correction/**" + - "dockers/index/job/correction/Dockerfile" + - "versions/GO_VERSION" + +concurrency: + group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref != 'refs/heads/main' && github.ref || github.sha }}-${{ github.event_name }} + cancel-in-progress: true + +jobs: + dump_contexts_to_log: + runs-on: ubuntu-latest + steps: + - name: Dump GitHub context + id: github_context_step + run: echo $JSON + env: + JSON: ${{ toJSON(github) }} + - name: Dump job context + run: echo $JSON + env: + JSON: ${{ toJSON(job) }} + - name: Dump steps context + run: echo $JSON + env: + JSON: ${{ toJSON(steps) }} + - name: Dump runner context + run: echo $JSON + env: + JSON: ${{ toJSON(runner) }} + - name: Dump strategy context + run: echo $JSON + env: + JSON: ${{ toJSON(strategy) }} + - name: Dump matrix context + run: echo $JSON + env: + JSON: ${{ toJSON(matrix) }} + build: + strategy: + max-parallel: 4 + runs-on: ubuntu-latest + if: ${{ (github.event_name == 'pull_request' && github.event.pull_request.head.repo.fork == false) || (github.event.pull_request.head.repo.fork == true && github.event_name == 'pull_request_target' && contains(github.event.pull_request.labels.*.name, 'ci/approved')) || (github.event_name == 'push' && github.ref == 'refs/heads/main') || startsWith( github.ref, 'refs/tags/') }} + steps: + - name: Get ref + id: ref + run: | + if [ ${{ github.event.pull_request.head.sha }} != "" ]; then + echo ref=${{ github.event.pull_request.head.sha }} >> $GITHUB_OUTPUT + else + echo ref=${{ github.sha }} >> $GITHUB_OUTPUT + fi + - uses: actions/checkout@v3 + with: + ref: ${{ steps.ref.outputs.ref }} + - name: set git config + run: | + git config --global --add safe.directory ${GITHUB_WORKSPACE} + - name: Setup QEMU + uses: docker/setup-qemu-action@v2 + with: + platforms: all + - name: Setup Docker Buildx + 
id: buildx + uses: docker/setup-buildx-action@v2 + with: + buildkitd-flags: "--debug" + - name: Login to DockerHub + uses: docker/login-action@v2 + with: + username: ${{ secrets.DOCKERHUB_USER }} + password: ${{ secrets.DOCKERHUB_PASS }} + - name: Login to GitHub Container Registry + uses: docker/login-action@v2 + with: + registry: ghcr.io + username: ${{ secrets.PACKAGE_USER }} + password: ${{ secrets.PACKAGE_TOKEN }} + - name: Build and Publish + id: build_and_publish + uses: ./.github/actions/docker-build + with: + target: index-job-correction + builder: ${{ steps.buildx.outputs.name }} + - name: Initialize CodeQL + if: startsWith( github.ref, 'refs/tags/') + uses: github/codeql-action/init@v2 + - name: Run vulnerability scanner (table) + if: startsWith( github.ref, 'refs/tags/') + uses: aquasecurity/trivy-action@master + with: + image-ref: "${{ steps.build_and_publish.outputs.IMAGE_NAME }}:${{ steps.build_and_publish.outputs.PRIMARY_TAG }}" + format: "table" + - name: Run vulnerability scanner (sarif) + if: startsWith( github.ref, 'refs/tags/') + uses: aquasecurity/trivy-action@master + with: + image-ref: "${{ steps.build_and_publish.outputs.IMAGE_NAME }}:${{ steps.build_and_publish.outputs.PRIMARY_TAG }}" + format: "template" + template: "@/contrib/sarif.tpl" + output: "trivy-results.sarif" + - name: Upload Trivy scan results to Security tab + if: startsWith( github.ref, 'refs/tags/') + uses: github/codeql-action/upload-sarif@v2 + with: + sarif_file: "trivy-results.sarif" + slack: + name: Slack notification + needs: build + runs-on: ubuntu-latest + if: github.ref == 'refs/heads/main' || startsWith( github.ref, 'refs/tags/') + steps: + - uses: technote-space/workflow-conclusion-action@v2 + with: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + - uses: 8398a7/action-slack@v3 + with: + author_name: index-job-correction image build + status: ${{ env.WORKFLOW_CONCLUSION }} + only_mention_fail: channel + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + 
+          SLACK_WEBHOOK_URL: ${{ secrets.SLACK_NOTIFY_WEBHOOK_URL }}
diff --git a/Makefile b/Makefile
index 03ec84dfa7..7933a89689 100644
--- a/Makefile
+++ b/Makefile
@@ -30,6 +30,7 @@ FILTER_GATEWAY_IMAGE = $(NAME)-filter-gateway
 HELM_OPERATOR_IMAGE = $(NAME)-helm-operator
 LB_GATEWAY_IMAGE = $(NAME)-lb-gateway
 LOADTEST_IMAGE = $(NAME)-loadtest
+INDEX_JOB_CORRECTION_IMAGE = $(NAME)-index-job-correction
 MANAGER_INDEX_IMAGE = $(NAME)-manager-index
 
 MAINTAINER = "$(ORG).org $(NAME) team <$(NAME)@$(ORG).org>"
diff --git a/Makefile.d/build.mk b/Makefile.d/build.mk
index 580cf34599..f62dbdf76a 100644
--- a/Makefile.d/build.mk
+++ b/Makefile.d/build.mk
@@ -206,6 +206,36 @@ cmd/manager/index/index: \
 	    $(dir $@)main.go
 	$@ -version
 
+# index correction job binary: rebuilt when the Go sources under cmd/index/job/correction or pkg/index/job/correction change
+cmd/index/job/correction/correction: \
+	$(GO_SOURCES_INTERNAL) \
+	$(PBGOS) \
+	$(shell find $(ROOTDIR)/cmd/index/job/correction -type f -name '*.go' -not -name '*_test.go' -not -name 'doc.go') \
+	$(shell find $(ROOTDIR)/pkg/index/job/correction -type f -name '*.go' -not -name '*_test.go' -not -name 'doc.go')
+	$(eval CGO_ENABLED = 0)
+	CGO_ENABLED=$(CGO_ENABLED) \
+	    GO111MODULE=on \
+	    GOPRIVATE=$(GOPRIVATE) \
+	    go build \
+	    --ldflags "-w -extldflags=-static \
+	    -X '$(GOPKG)/internal/info.Version=$(VERSION)' \
+	    -X '$(GOPKG)/internal/info.GitCommit=$(GIT_COMMIT)' \
+	    -X '$(GOPKG)/internal/info.BuildTime=$(DATETIME)' \
+	    -X '$(GOPKG)/internal/info.GoVersion=$(GO_VERSION)' \
+	    -X '$(GOPKG)/internal/info.GoOS=$(GOOS)' \
+	    -X '$(GOPKG)/internal/info.GoArch=$(GOARCH)' \
+	    -X '$(GOPKG)/internal/info.CGOEnabled=$(CGO_ENABLED)' \
+	    -X '$(GOPKG)/internal/info.BuildCPUInfoFlags=$(CPU_INFO_FLAGS)' \
+	    -buildid=" \
+	    -mod=readonly \
+	    -modcacherw \
+	    -a \
+	    -tags "osusergo netgo static_build" \
+	    -trimpath \
+	    -o $@ \
+	    $(dir $@)main.go
+	$@ -version
+
 .PHONY: binary/build/zip
 ## build all binaries and zip them
 binary/build/zip: \
diff --git a/Makefile.d/docker.mk b/Makefile.d/docker.mk
index 0997f7e1ef..eed7283a46 100644
--- a/Makefile.d/docker.mk
+++ 
b/Makefile.d/docker.mk @@ -188,3 +188,17 @@ docker/build/loadtest: -t $(ORG)/$(LOADTEST_IMAGE):$(TAG) . \ --build-arg MAINTAINER=$(MAINTAINER) \ --build-arg GO_VERSION=$(GO_VERSION) + +.PHONY: docker/name/index-job-correction +docker/name/index-job-correction: + @echo "$(ORG)/$(INDEX_JOB_CORRECTION_IMAGE)" + +.PHONY: docker/build/index-job-correction +## build index-job-correction image +docker/build/index-job-correction: + $(DOCKER) build \ + $(DOCKER_OPTS) \ + -f dockers/index/job/correction/Dockerfile \ + -t $(ORG)/$(INDEX_JOB_CORRECTION_IMAGE):$(TAG) . \ + --build-arg MAINTAINER=$(MAINTAINER) \ + --build-arg GO_VERSION=$(GO_VERSION) diff --git a/charts/vald/values.yaml b/charts/vald/values.yaml index b177e5a4e1..e619964f29 100644 --- a/charts/vald/values.yaml +++ b/charts/vald/values.yaml @@ -2631,3 +2631,29 @@ manager: net: dialer: keepalive: 15m #indexer fetches uncommitted index length, which includes huge payload so we need to set keepalive longer than usual + # @schema {"name": "manager.index.corrector", "type": "object"} + corrector: + # @schema {"name": "manager.index.corrector.agent_namespace", "type": "string"} + # manager.index.corrector.agent_namespace -- namespace of agent pods to manage + agent_namespace: _MY_POD_NAMESPACE_ + # @schema {"name": "manager.index.corrector.node_name", "type": "string"} + # manager.index.corrector.node_name -- node name + node_name: "" # _MY_NODE_NAME_ + # @schema {"name": "manager.index.corrector.concurrency", "type": "integer", "minimum": 1} + # manager.index.corrector.concurrency -- concurrency + concurrency: 1 + # @schema {"name": "manager.index.corrector.discoverer", "type": "object"} + discoverer: + # @schema {"name": "manager.index.corrector.discoverer.duration", "type": "string"} + # manager.index.corrector.discoverer.duration -- refresh duration to discover + duration: 500ms + # @schema {"name": "manager.index.corrector.discoverer.client", "alias": "grpc.client"} + # manager.index.corrector.discoverer.client -- 
gRPC client for discoverer (overrides defaults.grpc.client) + client: {} + # @schema {"name": "manager.index.corrector.discoverer.agent_client_options", "alias": "grpc.client"} + # manager.index.corrector.discoverer.agent_client_options -- gRPC client options for agents (overrides defaults.grpc.client) + agent_client_options: + dial_option: + net: + dialer: + keepalive: 15m #indexer fetches uncommitted index length, which includes huge payload so we need to set keepalive longer than usual diff --git a/cmd/index/job/correction/main.go b/cmd/index/job/correction/main.go new file mode 100644 index 0000000000..04a604a04b --- /dev/null +++ b/cmd/index/job/correction/main.go @@ -0,0 +1,55 @@ +// Copyright (C) 2019-2023 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+package main
+
+import (
+	"context"
+
+	"github.com/vdaas/vald/internal/errors"
+	"github.com/vdaas/vald/internal/info"
+	"github.com/vdaas/vald/internal/log"
+	"github.com/vdaas/vald/internal/runner"
+	"github.com/vdaas/vald/internal/safety"
+	"github.com/vdaas/vald/pkg/index/job/correction/config"
+	"github.com/vdaas/vald/pkg/index/job/correction/usecase"
+)
+
+const (
+	// maxVersion and minVersion are the version bounds handed to runner.WithVersion.
+	maxVersion = "v0.0.10"
+	minVersion = "v0.0.0"
+	// name is the job name handed to runner.WithName and used in the config-load error message.
+	name = "index correction job"
+)
+
+// main loads the configuration, builds the usecase from it and hands both to runner.Do.
+// Any error returned through safety.RecoverFunc is reported with log.Fatal.
+func main() {
+	if err := safety.RecoverFunc(func() error {
+		return runner.Do(
+			context.Background(),
+			runner.WithName(name),
+			runner.WithVersion(info.Version, maxVersion, minVersion),
+			runner.WithConfigLoader(func(path string) (any, *config.GlobalConfig, error) {
+				cfg, err := config.NewConfig(path)
+				if err != nil {
+					return nil, nil, errors.Wrap(err, "failed to load "+name+"'s configuration")
+				}
+				return cfg, &cfg.GlobalConfig, nil
+			}),
+			runner.WithDaemonInitializer(func(cfg any) (runner.Runner, error) {
+				// cfg is the first value returned by the config loader above, i.e. a *config.Data.
+				return usecase.New(cfg.(*config.Data))
+			}),
+		)
+	})(); err != nil {
+		log.Fatal(err, info.Get())
+	}
+}
diff --git a/cmd/index/job/correction/sample.yaml b/cmd/index/job/correction/sample.yaml
new file mode 100644
index 0000000000..87ddd08657
--- /dev/null
+++ b/cmd/index/job/correction/sample.yaml
@@ -0,0 +1,233 @@
+#
+# Copyright (C) 2019-2023 vdaas.org vald team
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# + +--- +version: v0.0.0 +time_zone: JST +logging: + format: raw + level: info + logger: glg +server_config: + servers: + - name: grpc + host: 0.0.0.0 + port: 8081 + grpc: + bidirectional_stream_concurrency: 20 + connection_timeout: "" + header_table_size: 0 + initial_conn_window_size: 0 + initial_window_size: 0 + interceptors: [] + keepalive: + max_conn_age: "" + max_conn_age_grace: "" + max_conn_idle: "" + time: "" + timeout: "" + max_header_list_size: 0 + max_receive_message_size: 0 + max_send_message_size: 0 + read_buffer_size: 0 + write_buffer_size: 0 + mode: GRPC + probe_wait_time: 3s + restart: true + health_check_servers: + - name: readiness + host: 0.0.0.0 + port: 3001 + http: + handler_timeout: "" + idle_timeout: "" + read_header_timeout: "" + read_timeout: "" + shutdown_duration: 0s + write_timeout: "" + mode: "" + probe_wait_time: 3s + metrics_servers: + startup_strategy: + - grpc + - readiness + full_shutdown_duration: 600s + tls: + ca: /path/to/ca + cert: /path/to/cert + enabled: false + key: /path/to/key +gateway: + index_replica: 3 +corrector: + agent_port: 8081 + agent_name: "vald-agent-ngt" + agent_dns: vald-agent-ngt.default.svc.cluster.local + agent_namespace: "default" + node_name: "" + stream_list_concurrency: 100 + discoverer: + duration: 500ms + client: + addrs: + - vald-discoverer.default.svc.cluster.local:8081 + health_check_duration: "1s" + connection_pool: + enable_dns_resolver: true + enable_rebalance: true + old_conn_close_duration: 3s + rebalance_duration: 30m + size: 3 + backoff: + backoff_factor: 1.1 + backoff_time_limit: 5s + enable_error_log: true + initial_duration: 5ms + jitter_limit: 100ms + maximum_duration: 5s + retry_count: 100 + call_option: + max_recv_msg_size: 0 + max_retry_rpc_buffer_size: 0 + max_send_msg_size: 0 + wait_for_ready: true + dial_option: + backoff_base_delay: 1s + backoff_jitter: 0.2 + backoff_max_delay: 120s + backoff_multiplier: 1.6 + enable_backoff: false + initial_connection_window_size: 0 + 
initial_window_size: 0 + insecure: true + keepalive: + permit_without_stream: false + time: "" + timeout: "" + max_msg_size: 0 + min_connection_timeout: 20s + read_buffer_size: 0 + tcp: + dialer: + dual_stack_enabled: true + keepalive: "" + timeout: "" + dns: + cache_enabled: true + cache_expiration: 1h + refresh_duration: 30m + tls: + ca: /path/to/ca + cert: /path/to/cert + enabled: false + key: /path/to/key + timeout: "" + write_buffer_size: 0 + tls: + ca: /path/to/ca + cert: /path/to/cert + enabled: false + key: /path/to/key + agent_client_options: + addrs: [] + health_check_duration: "1s" + connection_pool: + enable_dns_resolver: true + enable_rebalance: true + old_conn_close_duration: 3s + rebalance_duration: 30m + size: 3 + backoff: + backoff_factor: 1.1 + backoff_time_limit: 5s + enable_error_log: true + initial_duration: 5ms + jitter_limit: 100ms + maximum_duration: 5s + retry_count: 100 + call_option: + max_recv_msg_size: 0 + max_retry_rpc_buffer_size: 0 + max_send_msg_size: 0 + wait_for_ready: true + dial_option: + write_buffer_size: 0 + read_buffer_size: 0 + initial_window_size: 0 + initial_connection_window_size: 0 + max_msg_size: 0 + backoff_max_delay: "120s" + backoff_base_delay: "1s" + backoff_multiplier: 1.6 + backoff_jitter: 0.2 + min_connection_timeout: "20s" + enable_backoff: false + insecure: true + timeout: "" + tcp: + dns: + cache_enabled: true + cache_expiration: 1h + refresh_duration: 30m + dialer: + timeout: "" + keepalive: "15m" + dual_stack_enabled: true + tls: + ca: /path/to/ca + cert: /path/to/cert + enabled: false + key: /path/to/key + keepalive: + permit_without_stream: false + time: "" + timeout: "" + tls: + ca: /path/to/ca + cert: /path/to/cert + enabled: false + key: /path/to/key +observability: + enabled: false + otlp: + collector_endpoint: "otel-collector.monitoring.svc.cluster.local:4317" + trace_batch_timeout: "1s" + trace_export_timeout: "1m" + trace_max_export_batch_size: 1024 + trace_max_queue_size: 256 + 
metrics_export_interval: "1s" + metrics_export_timeout: "1m" + attribute: + namespace: "_MY_POD_NAMESPACE_" + pod_name: "_MY_POD_NAME_" + node_name: "_MY_NODE_NAME_" + service_name: "vald-index-job-correction" + metrics: + enable_cgo: true + enable_goroutine: true + enable_memory: true + enable_version_info: true + version_info_labels: + - vald_version + - server_name + - git_commit + - build_time + - go_version + - go_os + - go_arch + - ngt_version + trace: + enabled: true diff --git a/dockers/index/job/correction/Dockerfile b/dockers/index/job/correction/Dockerfile new file mode 100644 index 0000000000..0e8f717e7a --- /dev/null +++ b/dockers/index/job/correction/Dockerfile @@ -0,0 +1,93 @@ +# +# Copyright (C) 2019-2023 vdaas.org vald team +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +ARG GO_VERSION=latest +ARG DISTROLESS_IMAGE=gcr.io/distroless/static +ARG DISTROLESS_IMAGE_TAG=nonroot +ARG MAINTAINER="vdaas.org vald team " + +FROM golang:${GO_VERSION} AS golang + +FROM ubuntu:devel AS builder + +ENV GO111MODULE on +ENV DEBIAN_FRONTEND noninteractive +ENV INITRD No +ENV LANG en_US.UTF-8 +ENV GOROOT /opt/go +ENV GOPATH /go +ENV PATH ${PATH}:${GOROOT}/bin:${GOPATH}/bin +ENV ORG vdaas +ENV REPO vald +ENV PKG index/job/correction +ENV APP_NAME correction + +# skipcq: DOK-DL3008 +RUN apt-get update && apt-get install -y --no-install-recommends \ + ca-certificates \ + build-essential \ + curl \ + upx \ + git \ + && apt-get clean \ + && rm -rf /var/lib/apt/lists/* + +COPY --from=golang /usr/local/go $GOROOT +RUN mkdir -p "$GOPATH/src" + +WORKDIR ${GOPATH}/src/github.com/${ORG}/${REPO}/Makefile.d +COPY Makefile.d . +WORKDIR ${GOPATH}/src/github.com/${ORG}/${REPO} +COPY Makefile . +COPY .git . +COPY go.mod . +COPY go.sum . + +RUN make go/download + +WORKDIR ${GOPATH}/src/github.com/${ORG}/${REPO}/internal +COPY internal . + +WORKDIR ${GOPATH}/src/github.com/${ORG}/${REPO}/apis/grpc +COPY apis/grpc . + +WORKDIR ${GOPATH}/src/github.com/${ORG}/${REPO}/pkg/${PKG} +COPY pkg/${PKG} . + +WORKDIR ${GOPATH}/src/github.com/${ORG}/${REPO}/cmd/${PKG} +COPY cmd/${PKG} . + +WORKDIR ${GOPATH}/src/github.com/${ORG}/${REPO}/versions +COPY versions . 
+
+WORKDIR ${GOPATH}/src/github.com/${ORG}/${REPO}
+RUN make REPO=${ORG} NAME=${REPO} cmd/${PKG}/${APP_NAME} \
+    && mv "cmd/${PKG}/${APP_NAME}" "/usr/bin/${APP_NAME}"
+
+WORKDIR ${GOPATH}/src/github.com/${ORG}/${REPO}/cmd/${PKG}
+RUN cp sample.yaml /tmp/config.yaml
+
+FROM ${DISTROLESS_IMAGE}:${DISTROLESS_IMAGE_TAG}
+# ARGs declared before the first FROM are only visible to FROM lines;
+# re-declare MAINTAINER here so that the LABEL below receives its value.
+ARG MAINTAINER
+LABEL maintainer="${MAINTAINER}"
+
+ENV APP_NAME correction
+
+COPY --from=builder /usr/bin/${APP_NAME} /go/bin/${APP_NAME}
+COPY --from=builder /tmp/config.yaml /etc/server/config.yaml
+
+USER nonroot:nonroot
+
+ENTRYPOINT ["/go/bin/correction"]
diff --git a/internal/config/corrector.go b/internal/config/corrector.go
new file mode 100644
index 0000000000..6e4064a09d
--- /dev/null
+++ b/internal/config/corrector.go
@@ -0,0 +1,66 @@
+//
+// Copyright (C) 2019-2023 vdaas.org vald team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+// Package config provides configuration type and load configuration logic
+package config
+
+// Corrector represents the index correction configurations.
+type Corrector struct {
+	// AgentPort represents the agent port number.
+	AgentPort int `json:"agent_port" yaml:"agent_port"`
+
+	// AgentName represents the agents' meta_name for service discovery.
+	AgentName string `json:"agent_name" yaml:"agent_name"`
+
+	// AgentNamespace represents the namespace the agents are located in.
+	AgentNamespace string `json:"agent_namespace" yaml:"agent_namespace"`
+
+	// AgentDNS represents the agents' DNS A record for service discovery.
+	AgentDNS string `json:"agent_dns" yaml:"agent_dns"`
+
+	// CreationPoolSize is not touched by Bind.
+	// NOTE(review): presumably the pool size used for index creation; confirm against its callers.
+	CreationPoolSize uint32 `json:"creation_pool_size" yaml:"creation_pool_size"`
+
+	// NodeName represents the node name.
+	NodeName string `json:"node_name" yaml:"node_name"`
+
+	// StreamListConcurrency represents the stream concurrency for the StreamListObject rpc client.
+	// This directly affects the memory usage of this job.
+	StreamListConcurrency int `json:"stream_list_concurrency" yaml:"stream_list_concurrency"`
+
+	// Discoverer represents the agent discoverer service configuration.
+	Discoverer *DiscovererClient `json:"discoverer" yaml:"discoverer"`
+}
+
+// Bind resolves AgentName, AgentNamespace, AgentDNS and NodeName through GetActualValue,
+// binds the Discoverer configuration when it is set, and returns the receiver.
+func (c *Corrector) Bind() *Corrector {
+	c.AgentName = GetActualValue(c.AgentName)
+	c.AgentNamespace = GetActualValue(c.AgentNamespace)
+	c.AgentDNS = GetActualValue(c.AgentDNS)
+	c.NodeName = GetActualValue(c.NodeName)
+
+	if c.Discoverer != nil {
+		c.Discoverer = c.Discoverer.Bind()
+	}
+	return c
+}
+
+// GetStreamListConcurrency returns the StreamListConcurrency field value if set, -1 otherwise, which means no limit.
+func (c *Corrector) GetStreamListConcurrency() int {
+	// A zero (or negative) value means the field was not set: report "no limit" (-1)
+	// instead of handing a zero limit to the caller's errgroup.
+	if c != nil && c.StreamListConcurrency > 0 {
+		return c.StreamListConcurrency
+	}
+	return -1
+}
diff --git a/internal/errors/corrector.go b/internal/errors/corrector.go
new file mode 100644
index 0000000000..b47296e424
--- /dev/null
+++ b/internal/errors/corrector.go
@@ -0,0 +1,21 @@
+//
+// Copyright (C) 2019-2023 vdaas.org vald team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+// Package errors provides error types and function
+package errors
+
+// ErrIndexReplicaOne is the error reported when the index replica is 1, in which case there is nothing to correct.
+var ErrIndexReplicaOne = New("nothing to correct when index replica is 1")
diff --git a/internal/runner/runner.go b/internal/runner/runner.go
index fcf741c51d..4dccd14c90 100644
--- a/internal/runner/runner.go
+++ b/internal/runner/runner.go
@@ -215,6 +215,8 @@ func Run(ctx context.Context, run Runner, name string) (err error) {
 				emap[err.Error()]++
 			}
 
+			// wait for all the goroutines to finish.
+ // this errgroup is global across the program err = errgroup.Wait() if err != nil && !errors.Is(err, context.DeadlineExceeded) && diff --git a/internal/servers/server/server.go b/internal/servers/server/server.go index 904e76d371..9e70fb9512 100644 --- a/internal/servers/server/server.go +++ b/internal/servers/server/server.go @@ -334,7 +334,6 @@ func (s *server) ListenAndServe(ctx context.Context, ech chan<- error) (err erro s.mu.RUnlock() log.Infof("%s server %s stopped", s.mode.String(), s.name) } - return nil })) } return nil diff --git a/pkg/index/job/correction/config/config.go b/pkg/index/job/correction/config/config.go new file mode 100644 index 0000000000..d704bf59d0 --- /dev/null +++ b/pkg/index/job/correction/config/config.go @@ -0,0 +1,86 @@ +// +// Copyright (C) 2019-2023 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +// Package setting stores all server application settings +package config + +import ( + "github.com/vdaas/vald/internal/config" + "github.com/vdaas/vald/internal/errors" +) + +type GlobalConfig = config.GlobalConfig + +// Config represent a application setting data content (config.yaml). +// In K8s environment, this configuration is stored in K8s ConfigMap. 
+type Data struct {
+	config.GlobalConfig `json:",inline" yaml:",inline"`
+
+	// Server represents all server configurations.
+	Server *config.Servers `json:"server_config" yaml:"server_config"`
+
+	// Observability represents the observability configurations.
+	Observability *config.Observability `json:"observability" yaml:"observability"`
+
+	// Corrector represents the index correction job configuration.
+	Corrector *config.Corrector `json:"corrector" yaml:"corrector"`
+
+	// FIXME: how do we guarantee consistency with the LB-side configuration when loading it from here?
+	// Gateway represents the gateway (LB) service configuration.
+	Gateway *config.LB `json:"gateway" yaml:"gateway"`
+}
+
+// NewConfig reads the configuration file at path and returns the bound Data.
+// It returns errors.ErrInvalidConfig when the loaded data or its server_config
+// section is nil; the observability, corrector and gateway sections fall back
+// to freshly allocated, bound values when they are absent.
+func NewConfig(path string) (cfg *Data, err error) {
+	cfg = new(Data)
+
+	if err = config.Read(path, &cfg); err != nil {
+		return nil, err
+	}
+
+	// cfg was handed over by address, so it is re-checked before use.
+	if cfg == nil {
+		return nil, errors.ErrInvalidConfig
+	}
+	cfg.Bind()
+
+	// server_config is mandatory.
+	if cfg.Server == nil {
+		return nil, errors.ErrInvalidConfig
+	}
+	cfg.Server = cfg.Server.Bind()
+
+	// the remaining sections are optional.
+	if cfg.Observability == nil {
+		cfg.Observability = new(config.Observability)
+	}
+	cfg.Observability = cfg.Observability.Bind()
+
+	if cfg.Corrector == nil {
+		cfg.Corrector = new(config.Corrector)
+	}
+	cfg.Corrector = cfg.Corrector.Bind()
+
+	if cfg.Gateway == nil {
+		cfg.Gateway = new(config.LB)
+	}
+	cfg.Gateway = cfg.Gateway.Bind()
+
+	return cfg, nil
+}
diff --git a/pkg/index/job/correction/service/corrector.go b/pkg/index/job/correction/service/corrector.go
new file mode 100644
index 0000000000..a5e75eff2e
--- /dev/null
+++ b/pkg/index/job/correction/service/corrector.go
@@ -0,0 +1,549 @@
+// Copyright (C) 2019-2023 vdaas.org vald team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package service
+
+import (
+	"cmp"
+	"context"
+	"fmt"
+	"io"
+	"os"
+	"path/filepath"
+	"slices"
+	"sync/atomic"
+
+	agent "github.com/vdaas/vald/apis/grpc/v1/agent/core"
+	"github.com/vdaas/vald/apis/grpc/v1/payload"
+	"github.com/vdaas/vald/apis/grpc/v1/vald"
+	"github.com/vdaas/vald/internal/client/v1/client/discoverer"
+	"github.com/vdaas/vald/internal/db/kvs/bbolt"
+	"github.com/vdaas/vald/internal/errors"
+	"github.com/vdaas/vald/internal/file"
+	"github.com/vdaas/vald/internal/log"
+	"github.com/vdaas/vald/internal/net/grpc"
+	"github.com/vdaas/vald/internal/net/grpc/codes"
+	"github.com/vdaas/vald/internal/net/grpc/status"
+	"github.com/vdaas/vald/internal/sync"
+	"github.com/vdaas/vald/internal/sync/errgroup"
+	"github.com/vdaas/vald/pkg/index/job/correction/config"
+)
+
+// Corrector is the interface of the index correction job service.
+type Corrector interface {
+	Start(ctx context.Context) (<-chan error, error)
+	PreStop(ctx context.Context) error
+}
+
+// correct is the Corrector implementation returned by New.
+type correct struct {
+	cfg                   *config.Data
+	discoverer            discoverer.Client
+	agentAddrs            []string
+	indexInfos            sync.Map[string, *payload.Info_Index_Count]
+	uuidsCount            uint32
+	uncommittedUUIDsCount uint32
+	checkedID             bbolt.Bbolt
+}
+
+// New creates a Corrector that records the IDs it has already checked in a bbolt
+// database file (checkedid.db) placed in a "bbolt" directory under os.TempDir().
+func New(cfg *config.Data, discoverer discoverer.Client) (Corrector, error) {
+	d := filepath.Join(os.TempDir(), "bbolt")
+	// NOTE(review): the result of MkdirAll is ignored; presumably a failure here surfaces from bbolt.New below - confirm.
+	file.MkdirAll(d, os.ModePerm)
+	dbfile := filepath.Join(d, "checkedid.db")
+	bolt, err := bbolt.New(dbfile, "", os.FileMode(0o600))
+	if err != nil {
+		return nil, err
+	}
+
+	return &correct{
+		cfg:        cfg,
+		discoverer: discoverer,
+		checkedID:  bolt,
+	}, nil
+}
+
+// Start starts the discoverer, loads the index information of the discovered agents
+// and runs the correction over all of them before returning.
+// On success it returns the error channel of the discoverer.
+func (c *correct) Start(ctx context.Context) (<-chan error, error) {
+	dech, err := c.discoverer.Start(ctx)
+	if err != nil {
+		return nil, err
+	}
+
+	// addrs is sorted by the memory usage of each agent(descending order)
+	// this is descending because it's supposed to be used for index manager to decide
+	// which pod to make a create index rpc(higher memory, first to commit)
+	c.agentAddrs = c.discoverer.GetAddrs(ctx)
+	log.Debug("agent addrs found:", c.agentAddrs)
+
+	if l := len(c.agentAddrs); l <= 1 {
+		log.Warnf("only %d agent found, there must be at least two agents for correction to happen", l)
+		// NOTE(review): err is always nil here, so the caller receives (nil, nil); confirm that a silent no-op is intended.
+		return nil, err
+	}
+
+	err = c.loadInfos(ctx)
+	if err != nil {
+		return nil, err
+	}
+
+	// For debugging
+	c.indexInfos.Range(func(addr string, info *payload.Info_Index_Count) bool {
+		log.Debugf("index info: addr(%s), stored(%d), uncommitted(%d)", addr, info.GetStored(), info.GetUncommitted())
+		return true
+	})
+
+	log.Info("starting correction with bbolt disk cache...")
+	if err := c.correct(ctx); err != nil {
+		log.Errorf("there's some errors while correction: %v", err)
+		return nil, err
+	}
+	log.Info("correction finished successfully")
+
+	return dech, nil
+}
+
+// PreStop closes the bbolt cache of checked IDs.
+// NOTE(review): the true argument presumably also removes the database file, as the log message suggests - confirm.
+func (c *correct) PreStop(_ context.Context) error {
+	log.Info("removing persistent cache files...")
+	if err := c.checkedID.Close(true); err != nil {
+		return err
+	}
+	return nil
+}
+
+// correct visits the agents in the order of c.agentAddrs, streams every object of each agent
+// with StreamListObject and checks the consistency of each object that is not yet recorded
+// in the checked-ID cache.
+func (c *correct) correct(ctx context.Context) (err error) {
+	// leftAgentAddrs is the agents' addr that hasn't been corrected yet.
+	// This is used to know which agents possibly have the same index as the target replica.
+	// We can say this because, thanks to caching, there is no way that the target replica is
+	// in the agent that has already been corrected.
+	leftAgentAddrs := make([]string, len(c.agentAddrs))
+	n := copy(leftAgentAddrs, c.agentAddrs)
+	if n != len(c.agentAddrs) {
+		return fmt.Errorf("failed to copy agentAddrs")
+	}
+
+	if err := c.discoverer.GetClient().OrderedRange(ctx, c.agentAddrs,
+		func(ctx context.Context, addr string, conn *grpc.ClientConn, copts ...grpc.CallOption) error {
+			// current address is the leftAgentAddrs[0] because this is OrderedRange and
+			// leftAgentAddrs is copied from c.agentAddrs
+			leftAgentAddrs = leftAgentAddrs[1:]
+
+			vc := vald.NewValdClient(conn)
+			stream, err := vc.StreamListObject(ctx, &payload.Object_List_Request{})
+			if err != nil {
+				return err
+			}
+
+			// context and errgroup for stream.Recv and correction
+			sctx, scancel := context.WithCancel(ctx)
+			defer scancel()
+			seg, sctx := errgroup.WithContext(sctx)
+			concurrency := c.cfg.Corrector.GetStreamListConcurrency()
+			seg.SetLimit(concurrency)
+
+			// errgroup for bbolt AsyncSet
+			bolteg, ctx := errgroup.WithContext(ctx)
+			bolteg.SetLimit(2048)
+
+			var mu sync.Mutex
+			log.Infof("starting correction for agent %s, concurrency: %d", addr, concurrency)
+
+			// The number of objects to Recv cannot be known in advance, because new objects may be inserted during processing.
+			// TODO: such objects have to be detected by their timestamp and excluded.
+			for {
+				select {
+				case <-sctx.Done():
+					if !errors.Is(sctx.Err(), context.Canceled) {
+						log.Errorf("context done unexpectedly: %v", sctx.Err())
+					}
+					goto Finalize
+				default:
+					seg.Go(func() error {
+						mu.Lock()
+						// As long as we don't stream.Recv() from the stream, we do not consume the memory of the message.
+						// So by limiting the number of this errgroup.Go instances, we can limit the memory usage
+						// https://github.com/grpc/grpc-go/blob/33f9fa2e6e5bcf4cf8fe45133e23779ae6e43f6c/rpc_util.go#L795
+						res, err := stream.Recv()
+						mu.Unlock()
+
+						if errors.Is(err, io.EOF) {
+							log.Debugf("StreamListObject stream finished for agent %s", addr)
+							scancel()
+							return nil
+						}
+						if err != nil {
+							log.Errorf("StreamListObject stream finished unexpectedly: %v", err)
+							return err
+						}
+
+						if res.GetVector() == nil {
+							st := res.GetStatus()
+							log.Error(st.GetCode(), st.GetMessage(), st.GetDetails())
+							// continue
+							return nil
+						}
+
+						log.Debugf("received object in StreamListObject: agent(%s), id(%s), timestamp(%v)", addr, res.GetVector().GetId(), res.GetVector().GetTimestamp())
+
+						// check if the index is already checked
+						id := res.GetVector().GetId()
+						_, ok, err := c.checkedID.Get([]byte(id))
+						if err != nil {
+							log.Errorf("failed to perform Get from bbolt: %v", err)
+						}
+						if ok {
+							// already checked index
+							return nil
+						}
+
+						if err := c.checkConsistency(
+							ctx,
+							&vectorReplica{
+								addr: addr,
+								vec:  res.GetVector(),
+							},
+							leftAgentAddrs,
+						); err != nil {
+							log.Errorf("failed to check consistency: %v", err)
+							return nil // continue other processes
+						}
+
+						// now this id is checked so set it to the disk cache
+						c.checkedID.AsyncSet(bolteg, []byte(id), nil)
+
+						return nil
+					})
+				}
+			}
+
+		Finalize:
+			err = seg.Wait()
+			if err != nil {
+				log.Errorf("err group returned error: %v", err)
+			}
+
+			berr := bolteg.Wait()
+			if berr != nil {
+				// log the bbolt error itself, not the stream errgroup error
+				log.Errorf("bolt err group returned error: %v", berr)
+				err = errors.Join(err, berr)
+			}
+			log.Info("bbolt all batch finished")
+
+			log.Infof("correction finished for agent %s", addr)
+			return err
+		},
+	); err != nil {
+		log.Errorf("failed to range over agents(%v): %v", c.agentAddrs, err)
+		return err
+	}
+
+	return nil
+}
+
+// vectorReplica is an object vector together with the address of the agent it was read from.
+type vectorReplica struct {
+	addr string
+	vec  *payload.Object_Vector
+}
+
+// checkConsistency looks up the target replica's ID in every agent of leftAgentAddrs with GetObject,
+// then corrects the timestamps and the number of replicas based on what was found.
+// Validate len(addrs) >= 2 before calling this function
+func (c *correct) checkConsistency(ctx context.Context, targetReplica *vectorReplica, leftAgentAddrs []string) error {
+	// availableAddrs is the agents' addr that doesn't have the target replica thus is available to insert the replica
+	// to fix the index replica number if required.
+	availableAddrs := make([]string, 0, len(c.agentAddrs)-1)
+	for _, addr := range c.agentAddrs {
+		if addr != targetReplica.addr {
+			availableAddrs = append(availableAddrs, addr)
+		}
+	}
+
+	foundReplicas := make([]*vectorReplica, 0, len(availableAddrs))
+	var mu sync.Mutex
+	if err := c.discoverer.GetClient().OrderedRangeConcurrent(ctx, leftAgentAddrs, len(leftAgentAddrs),
+		func(ctx context.Context, addr string, conn *grpc.ClientConn, copts ...grpc.CallOption) error {
+			// To avoid GetObject to myself. To maintain backward compatibility for without cache operation
+			if addr == targetReplica.addr {
+				return nil
+			}
+
+			select {
+			case <-ctx.Done():
+				return ctx.Err()
+			default:
+			}
+			vc := vald.NewValdClient(conn)
+			v, err := vc.GetObject(ctx, &payload.Object_VectorRequest{
+				Id: &payload.Object_ID{
+					Id: targetReplica.vec.GetId(),
+				},
+			})
+			if err != nil {
+				if st, ok := status.FromError(err); !ok {
+					log.Errorf("gRPC call returned not a gRPC status error: %v", err)
+					return err
+				} else if st.Code() == codes.NotFound {
+					// when replica of agent > index replica, this happens
+					return nil
+				} else {
+					log.Errorf("failed to GetObject with unexpected error. code: %v, message: %s", st.Code(), st.Message())
+					return err
+				}
+			}
+
+			// the target replica is found in this agent with the addr
+			log.Debugf("object found: agent(%s), id(%v), timestamp(%v)", addr, v.GetId(), v.GetTimestamp())
+
+			mu.Lock()
+			foundReplicas = append(foundReplicas, &vectorReplica{
+				addr: addr,
+				vec:  v,
+			})
+
+			// Remove this addr from availableAddrs because this addr has the target replica
+			// and not available to insert the replica to fix the index replica number.
+			// slices.DeleteFunc returns the shortened slice, so it has to be assigned back.
+			availableAddrs = slices.DeleteFunc(availableAddrs, func(availableAddr string) bool {
+				return availableAddr == addr
+			})
+			mu.Unlock()
+
+			return nil
+		},
+	); err != nil {
+		return err
+	}
+
+	// check timestamps
+	if err := c.correctTimestamp(ctx, targetReplica, foundReplicas); err != nil {
+		return fmt.Errorf("failed to fix timestamp: %w", err)
+	}
+
+	// check replica number
+	if err := c.correctReplica(ctx, targetReplica, foundReplicas, availableAddrs); err != nil {
+		return fmt.Errorf("failed to fix index replica: %w", err)
+	}
+
+	return nil
+}
+
+func (c *correct) correctTimestamp(ctx context.Context, targetReplica *vectorReplica, foundReplicas []*vectorReplica) error {
+	if len(foundReplicas) == 0 {
+		// no replica found. nothing to do about timestamp
+		return nil
+	}
+
+	allReplicas := append(foundReplicas, targetReplica)
+
+	// sort by timestamp
+	slices.SortFunc(allReplicas, func(i, j *vectorReplica) int {
+		// largest timestamp means the latest
+		return cmp.Compare(j.vec.GetTimestamp(), i.vec.GetTimestamp())
+	})
+
+	latest := allReplicas[0]
+	latestTs := latest.vec.GetTimestamp()
+	for _, replica := range allReplicas {
+		if replica.vec.GetTimestamp() == latestTs {
+			// no inconsistency
+			continue
+		}
+
+		// udate the vector with the new one
+		log.Infof("timestamp inconsistency detected with vector(id: %s, timestamp: %v). 
updating with the latest vector(id: %s, timestamp: %v)", + replica.vec.GetId(), + replica.vec.GetTimestamp(), + latest.vec.GetId(), + latest.vec.GetTimestamp(), + ) + if err := c.updateObject(ctx, replica.addr, latest.vec); err != nil { + return err + } + } + + return nil +} + +func (c *correct) correctReplica( + ctx context.Context, + targetReplica *vectorReplica, + foundReplicas []*vectorReplica, + availableAddrs []string, +) error { + // diff < 0 means there is less replica than the correct number + existReplica := len(foundReplicas) + 1 + diff := existReplica - c.cfg.Gateway.IndexReplica + if diff == 0 { + // replica number is correct + return nil + } + + // when there are less replicas than the correct number, add the extra replicas + // TODO: refine this logic. pretty complicated + if diff < 0 { + log.Infof("replica shortage of vector %s. inserting to other agents...", + targetReplica.vec.GetId()) + if len(availableAddrs) == 0 { + // TODO: define errors in errors pkg + return fmt.Errorf("no available agent to insert replica") + } + + // inserting with the reverse order of availableAddrs since the last agent has the lowest memory usage + for i := len(availableAddrs) - 1; i >= 0 && diff < 0; i-- { + addr := availableAddrs[i] + log.Infof("inserting replica to %s", addr) + if err := c.insertObject(ctx, addr, targetReplica.vec); err != nil { + log.Errorf("failed to insert object to agent(%s): %v", addr, err) + continue + } + diff++ + } + + if diff < 0 { + return fmt.Errorf("failed to insert the sufficient amount of index to meet the replica setting") + } + + return nil + } + + // when there are more replicas than the correct number, delete the extra replicas + log.Infof("replica oversupply of vector %s. 
deleting...", + targetReplica.vec.GetId()) + // delete from myself + if err := c.deleteObject(ctx, targetReplica.addr, targetReplica.vec); err != nil { + log.Errorf("failed to delete object from agent(%s): %v", targetReplica.addr, err) + } else { + diff-- + } + + // delte from others if there's more to delete + for _, replica := range foundReplicas { + if diff == 0 { + break + } + if err := c.deleteObject(ctx, replica.addr, replica.vec); err != nil { + log.Errorf("failed to delete object from agent(%s): %v", replica.addr, err) + continue + } + diff-- + } + + if diff > 0 { + return fmt.Errorf("failed to delete the sufficient amount of index to meet the replica setting") + } + + return nil +} + +func (c *correct) updateObject(ctx context.Context, addr string, vector *payload.Object_Vector) error { + res, err := c.discoverer.GetClient(). + Do(grpc.WithGRPCMethod(ctx, "core.v1.Vald/Update"), addr, func(ctx context.Context, conn *grpc.ClientConn, copts ...grpc.CallOption) (interface{}, error) { + // TODO: use UpdateTimestamp when it's implemented because here we just want to update only the timestamp but not the vector + return vald.NewUpdateClient(conn).Update(ctx, &payload.Update_Request{ + Vector: vector, + // FIXME: this should be deleted after Config.Timestamp deprecation + Config: &payload.Update_Config{ + // TODO: Decrementing because it's gonna be incremented befor being pushed + // to vqueue in the agent. This is a not ideal workaround for the current vqueue implementation + // so we should consider refactoring vqueue. + Timestamp: vector.GetTimestamp() - 1, + }, + }, copts...) + }) + if err != nil { + return err + } + + if v, ok := res.(*payload.Object_Location); ok { + log.Infof("vector successfully updated. address: %s, uuid: %v", addr, v.GetUuid()) + } + + return nil +} + +func (c *correct) insertObject(ctx context.Context, addr string, vector *payload.Object_Vector) error { + res, err := c.discoverer.GetClient(). 
+ Do(grpc.WithGRPCMethod(ctx, "core.v1.Vald/Insert"), addr, func(ctx context.Context, conn *grpc.ClientConn, copts ...grpc.CallOption) (interface{}, error) { + return vald.NewInsertClient(conn).Insert(ctx, &payload.Insert_Request{ + Vector: vector, + // FIXME: this should be deleted after Config.Timestamp deprecation + Config: &payload.Insert_Config{ + Timestamp: vector.GetTimestamp(), + }, + }, copts...) + }) + if err != nil { + return err + } + + if v, ok := res.(*payload.Object_Location); ok { + log.Infof("vector successfully inserted. address: %s, uuid: %v", addr, v.GetUuid()) + } + + return nil +} + +func (c *correct) deleteObject(ctx context.Context, addr string, vector *payload.Object_Vector) error { + res, err := c.discoverer.GetClient(). + Do(grpc.WithGRPCMethod(ctx, "core.v1.Vald/Delete"), addr, func(ctx context.Context, conn *grpc.ClientConn, copts ...grpc.CallOption) (interface{}, error) { + return vald.NewRemoveClient(conn).Remove(ctx, &payload.Remove_Request{ + Id: &payload.Object_ID{ + Id: vector.GetId(), + }, + }, copts...) + }) + if err != nil { + return err + } + + if v, ok := res.(*payload.Object_Location); ok { + log.Infof("vector successfully deleted. address: %s, uuid: %v", addr, v.GetUuid()) + } + + return nil +} + +func (c *correct) loadInfos(ctx context.Context) (err error) { + var u, ucu uint32 + var infoMap sync.Map[string, *payload.Info_Index_Count] + err = c.discoverer.GetClient().RangeConcurrent(ctx, len(c.discoverer.GetAddrs(ctx)), + func(ctx context.Context, + addr string, conn *grpc.ClientConn, copts ...grpc.CallOption, + ) (err error) { + select { + case <-ctx.Done(): + return nil + default: + info, err := agent.NewAgentClient(conn).IndexInfo(ctx, new(payload.Empty), copts...) 
+ if err != nil { + log.Warnf("an error occurred while calling IndexInfo of %s: %s", addr, err) + return nil + } + infoMap.Store(addr, info) + atomic.AddUint32(&u, info.GetStored()) + atomic.AddUint32(&ucu, info.GetUncommitted()) + } + return nil + }) + if err != nil { + return err + } + atomic.StoreUint32(&c.uuidsCount, atomic.LoadUint32(&u)) + atomic.StoreUint32(&c.uncommittedUUIDsCount, atomic.LoadUint32(&ucu)) + c.indexInfos.Range(func(addr string, _ *payload.Info_Index_Count) bool { + info, ok := infoMap.Load(addr) + if !ok { + c.indexInfos.Delete(addr) + } + c.indexInfos.Store(addr, info) + infoMap.Delete(addr) + return true + }) + infoMap.Range(func(addr string, info *payload.Info_Index_Count) bool { + c.indexInfos.Store(addr, info) + return true + }) + return nil +} diff --git a/pkg/index/job/correction/usecase/corrector.go b/pkg/index/job/correction/usecase/corrector.go new file mode 100644 index 0000000000..e738bba803 --- /dev/null +++ b/pkg/index/job/correction/usecase/corrector.go @@ -0,0 +1,221 @@ +// Copyright (C) 2019-2023 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+package usecase + +import ( + "context" + "os" + "syscall" + "time" + + "github.com/vdaas/vald/internal/client/v1/client/discoverer" + iconf "github.com/vdaas/vald/internal/config" + "github.com/vdaas/vald/internal/errors" + "github.com/vdaas/vald/internal/log" + "github.com/vdaas/vald/internal/net/grpc" + "github.com/vdaas/vald/internal/net/grpc/interceptor/server/recover" + "github.com/vdaas/vald/internal/observability" + "github.com/vdaas/vald/internal/runner" + "github.com/vdaas/vald/internal/safety" + "github.com/vdaas/vald/internal/servers/server" + "github.com/vdaas/vald/internal/servers/starter" + "github.com/vdaas/vald/internal/sync/errgroup" + "github.com/vdaas/vald/pkg/index/job/correction/config" + "github.com/vdaas/vald/pkg/index/job/correction/service" +) + +type run struct { + eg errgroup.Group + cfg *config.Data + observability observability.Observability + server starter.Server + corrector service.Corrector +} + +func New(cfg *config.Data) (r runner.Runner, err error) { + if cfg.Gateway.IndexReplica == 1 { + return nil, errors.ErrIndexReplicaOne + } + + eg := errgroup.Get() + + cOpts, err := cfg.Corrector.Discoverer.Client.Opts() + if err != nil { + return nil, err + } + // skipcq: CRT-D0001 + dopts := append( + cOpts, + grpc.WithErrGroup(eg)) + + acOpts, err := cfg.Corrector.Discoverer.AgentClientOptions.Opts() + if err != nil { + return nil, err + } + // skipcq: CRT-D0001 + aopts := append( + acOpts, + grpc.WithErrGroup(eg)) + + // Construct discoverer + discoverer, err := discoverer.New( + discoverer.WithAutoConnect(true), + discoverer.WithName(cfg.Corrector.AgentName), + discoverer.WithNamespace(cfg.Corrector.AgentNamespace), + discoverer.WithPort(cfg.Corrector.AgentPort), + discoverer.WithServiceDNSARecord(cfg.Corrector.AgentDNS), + discoverer.WithDiscovererClient(grpc.New(dopts...)), + discoverer.WithDiscoverDuration(cfg.Corrector.Discoverer.Duration), + discoverer.WithOptions(aopts...), + discoverer.WithNodeName(cfg.Corrector.NodeName), + 
discoverer.WithOnDiscoverFunc(func(ctx context.Context, c discoverer.Client, addrs []string) error { + last := len(addrs) - 1 + for i := 0; i < len(addrs)/2; i++ { + addrs[i], addrs[last-i] = addrs[last-i], addrs[i] + } + return nil + }), + ) + if err != nil { + return nil, err + } + + grpcServerOptions := []server.Option{ + server.WithGRPCOption( + grpc.ChainUnaryInterceptor(recover.RecoverInterceptor()), + grpc.ChainStreamInterceptor(recover.RecoverStreamInterceptor()), + ), + } + + // For health check and metrics + srv, err := starter.New(starter.WithConfig(cfg.Server), + starter.WithGRPC(func(sc *iconf.Server) []server.Option { + return grpcServerOptions + }), + ) + if err != nil { + return nil, err + } + + corrector, err := service.New(cfg, discoverer) + if err != nil { + return nil, err + } + + var obs observability.Observability + if cfg.Observability.Enabled { + obs, err = observability.NewWithConfig(cfg.Observability) + if err != nil { + log.Error("failed to initialize observability") + return nil, err + } + } + + return &run{ + eg: eg, + cfg: cfg, + observability: obs, + server: srv, + corrector: corrector, + }, nil +} + +func (r *run) PreStart(ctx context.Context) error { + if r.observability != nil { + return r.observability.PreStart(ctx) + } + return nil +} + +func (r *run) Start(ctx context.Context) (<-chan error, error) { + // TODO: Set timeout? 
+ // ctx, cancel := context.WithTimeout(ctx, time.Microsecond*10) + // defer cancel() + + log.Info("starting servers") + ech := make(chan error, 3) + var oech, nech, sech <-chan error + r.eg.Go(safety.RecoverFunc(func() (err error) { + defer close(ech) + if r.observability != nil { + oech = r.observability.Start(ctx) + } + sech = r.server.ListenAndServe(ctx) + for { + select { + case <-ctx.Done(): + return ctx.Err() + case err = <-oech: + case err = <-nech: + case err = <-sech: + } + if err != nil { + select { + case <-ctx.Done(): + return ctx.Err() + case ech <- err: + } + } + } + })) + + // main groutine to run the job + r.eg.Go(safety.RecoverFunc(func() (err error) { + defer func() { + log.Info("fiding my pid to kill myself") + p, err := os.FindProcess(os.Getpid()) + if err != nil { + // using Fatal to avoid this process to be zombie + log.Fatalf("failed to find my pid to kill %v", err) + return + } + + log.Info("sending SIGTERM to myself to stop this job") + if err := p.Signal(syscall.SIGTERM); err != nil { + log.Error(err) + } + }() + + start := time.Now() + _, err = r.corrector.Start(ctx) + if err != nil { + log.Errorf("index correction process failed: %v", err) + return err + } + end := time.Since(start) + log.Infof("correction finished in %v", end) + return nil + })) + + return ech, nil +} + +func (r *run) PreStop(ctx context.Context) error { + r.corrector.PreStop(ctx) + return nil +} + +func (r *run) Stop(ctx context.Context) error { + if r.observability != nil { + r.observability.Stop(ctx) + } + if r.server != nil { + r.server.Shutdown(ctx) + } + return nil +} + +func (*run) PostStop(ctx context.Context) error { + return nil +}