From 80650fb933c51ed87b0b916dc28b1353478ca723 Mon Sep 17 00:00:00 2001 From: HighPon Date: Sat, 14 Dec 2024 16:02:21 +0000 Subject: [PATCH 1/2] fix: create templates for index-importation --- .../dockers-index-importation-image.yaml | 69 ++++++++++++++ Makefile | 1 + Makefile.d/build.mk | 10 +++ Makefile.d/docker.mk | 9 ++ cmd/index/job/importation/main.go | 26 ++++++ dockers/index/job/importation/Dockerfile | 89 +++++++++++++++++++ hack/docker/gen/main.go | 4 + 7 files changed, 208 insertions(+) create mode 100644 .github/workflows/dockers-index-importation-image.yaml create mode 100644 cmd/index/job/importation/main.go create mode 100644 dockers/index/job/importation/Dockerfile diff --git a/.github/workflows/dockers-index-importation-image.yaml b/.github/workflows/dockers-index-importation-image.yaml new file mode 100644 index 0000000000..cb5b6fc356 --- /dev/null +++ b/.github/workflows/dockers-index-importation-image.yaml @@ -0,0 +1,69 @@ +# +# Copyright (C) 2019-2024 vdaas.org vald team +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# DO_NOT_EDIT this workflow file is generated by https://github.com/vdaas/vald/blob/main/hack/docker/gen/main.go + + +name: 'Build docker image: index-importation' +on: + push: + branches: + - main + - release/v*.* + - '!release/v*.*.*' + tags: + - '*.*.*' + - '*.*.*-*' + - v*.*.* + - v*.*.*-* + pull_request: + paths: + - '!**/*_mock.go' + - '!**/*_test.go' + - .github/actions/docker-build/action.yaml + - .github/workflows/_docker-image.yaml + - .github/workflows/dockers-index-importation-image.yaml + - Makefile + - Makefile.d/** + - apis/proto/** + - cmd/index/job/importation/*.go + - dockers/index/job/importation/Dockerfile + - go.mod + - go.sum + - hack/docker/gen/main.go + - versions/GO_VERSION + pull_request_target: + paths: + - '!**/*_mock.go' + - '!**/*_test.go' + - .github/actions/docker-build/action.yaml + - .github/workflows/_docker-image.yaml + - .github/workflows/dockers-index-importation-image.yaml + - Makefile + - Makefile.d/** + - apis/proto/** + - cmd/index/job/importation/*.go + - dockers/index/job/importation/Dockerfile + - go.mod + - go.sum + - hack/docker/gen/main.go + - versions/GO_VERSION +jobs: + build: + uses: ./.github/workflows/_docker-image.yaml + with: + target: index-importation + secrets: inherit diff --git a/Makefile b/Makefile index 47acef0862..1fc3e8aff4 100644 --- a/Makefile +++ b/Makefile @@ -42,6 +42,7 @@ HELM_OPERATOR_IMAGE = $(NAME)-helm-operator INDEX_CORRECTION_IMAGE = $(NAME)-index-correction INDEX_CREATION_IMAGE = $(NAME)-index-creation INDEX_DELETION_IMAGE = $(NAME)-index-deletion +INDEX_IMPORTATION_IMAGE = $(NAME)-index-importation INDEX_OPERATOR_IMAGE = $(NAME)-index-operator INDEX_SAVE_IMAGE = $(NAME)-index-save LB_GATEWAY_IMAGE = $(NAME)-lb-gateway diff --git a/Makefile.d/build.mk b/Makefile.d/build.mk index 029ada10fe..a57ce81d9b 100644 --- a/Makefile.d/build.mk +++ b/Makefile.d/build.mk @@ -25,6 +25,7 @@ binary/build: \ cmd/index/job/correction/index-correction \ cmd/index/job/creation/index-creation \ cmd/index/job/deletion/index-deletion \ + cmd/index/job/importation/index-importation \ cmd/index/job/readreplica/rotate/readreplica-rotate \ cmd/index/job/save/index-save \ cmd/index/operator/index-operator \ @@ -85,6 +86,10 @@ cmd/index/job/deletion/index-deletion: $(eval CGO_ENABLED = 0) $(call go-build,index/job/deletion,,-static,,,$@) +cmd/index/job/importation/index-importation: + $(eval CGO_ENABLED = 0) + $(call go-build,index/job/importation,,-static,,,$@) + cmd/index/job/save/index-save: $(eval CGO_ENABLED = 0) $(call go-build,index/job/save,,-static,,,$@) @@ -134,6 +139,7 @@ binary/build/zip: \ artifacts/vald-index-correction-$(GOOS)-$(GOARCH).zip \ artifacts/vald-index-creation-$(GOOS)-$(GOARCH).zip \ artifacts/vald-index-deletion-$(GOOS)-$(GOARCH).zip \ + artifacts/vald-index-importation-$(GOOS)-$(GOARCH).zip \ artifacts/vald-index-operator-$(GOOS)-$(GOARCH).zip \ artifacts/vald-index-save-$(GOOS)-$(GOARCH).zip \ artifacts/vald-lb-gateway-$(GOOS)-$(GOARCH).zip \ @@ -197,6 +203,10 @@ artifacts/vald-index-deletion-$(GOOS)-$(GOARCH).zip: cmd/index/job/deletion/inde $(call mkdir, $(dir $@)) zip --junk-paths $@ $< +artifacts/vald-index-importation-$(GOOS)-$(GOARCH).zip: cmd/index/job/importation/index-importation + $(call mkdir, $(dir $@)) + zip --junk-paths $@ $< + artifacts/vald-index-save-$(GOOS)-$(GOARCH).zip: cmd/index/job/save/index-save $(call mkdir, $(dir $@)) zip --junk-paths $@ $< diff --git a/Makefile.d/docker.mk b/Makefile.d/docker.mk index 793ef66e27..93bae0dfba 100644 --- a/Makefile.d/docker.mk +++ b/Makefile.d/docker.mk @@ -37,6 +37,7 @@ docker/build: \ docker/build/index-correction \ docker/build/index-creation \ docker/build/index-deletion \ + docker/build/index-importation \ docker/build/index-operator \ docker/build/index-save \ docker/build/loadtest \ @@ -65,6 +66,7 @@ docker/xpanes/build: docker/build/index-correction \ docker/build/index-creation \ docker/build/index-deletion \ + docker/build/index-importation \ docker/build/index-operator \ docker/build/index-save \ docker/build/loadtest \ @@ -354,6 +356,13 @@ docker/build/index-deletion: IMAGE=$(INDEX_DELETION_IMAGE) \ docker/build/image +.PHONY: docker/build/index-importation +## build index-importation image +docker/build/index-importation: + @make DOCKERFILE="$(ROOTDIR)/dockers/index/job/importation/Dockerfile" \ + IMAGE=$(INDEX_IMPORTATION_IMAGE) \ + docker/build/image + .PHONY: docker/name/index-operator docker/name/index-operator: @echo "$(ORG)/$(INDEX_OPERATOR_IMAGE)" diff --git a/cmd/index/job/importation/main.go b/cmd/index/job/importation/main.go new file mode 100644 index 0000000000..ab8fb8146e --- /dev/null +++ b/cmd/index/job/importation/main.go @@ -0,0 +1,26 @@ +// Copyright (C) 2019-2024 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// You may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package main + +import "fmt" + +const ( + maxVersion = "v0.0.10" + minVersion = "v0.0.0" + name = "index importation job" +) + +func main() { + fmt.Println("hello world") +} diff --git a/dockers/index/job/importation/Dockerfile b/dockers/index/job/importation/Dockerfile new file mode 100644 index 0000000000..65b6c60c29 --- /dev/null +++ b/dockers/index/job/importation/Dockerfile @@ -0,0 +1,89 @@ +# syntax = docker/dockerfile:latest +# check=error=true +# +# Copyright (C) 2019-2024 vdaas.org vald team +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# DO_NOT_EDIT this Dockerfile is generated by https://github.com/vdaas/vald/blob/main/hack/docker/gen/main.go +ARG UPX_OPTIONS=-9 +# skipcq: DOK-DL3026,DOK-DL3007 +FROM ghcr.io/vdaas/vald/vald-buildbase:nightly AS builder +LABEL maintainer="vdaas.org vald team " +# skipcq: DOK-DL3002 +USER root:root +ARG TARGETARCH +ARG TARGETOS +ARG GO_VERSION +ARG RUST_VERSION +ENV APP_NAME=index-importation +ENV DEBIAN_FRONTEND=noninteractive +ENV GO111MODULE=on +ENV GOPATH=/go +ENV GOROOT=/opt/go +ENV HOME=/root +ENV INITRD=No +ENV LANG=en_US.UTF-8 +ENV LANGUAGE=en_US.UTF-8 +ENV LC_ALL=en_US.UTF-8 +ENV ORG=vdaas +ENV PKG=index/job/importation +ENV REPO=vald +ENV TZ=Etc/UTC +ENV USER=root +ENV PATH=${GOPATH}/bin:${GOROOT}/bin:/usr/local/bin:${PATH} +WORKDIR ${GOPATH}/src/github.com/${ORG}/${REPO} +SHELL ["/bin/bash", "-o", "pipefail", "-c"] +#skipcq: DOK-W1001, DOK-SC2046, DOK-SC2086, DOK-DL3008 +RUN --mount=type=bind,target=.,rw \ + --mount=type=tmpfs,target=/tmp \ + --mount=type=cache,target=/var/lib/apt,sharing=locked,id=${APP_NAME} \ + --mount=type=cache,target=/var/cache/apt,sharing=locked,id=${APP_NAME} \ + --mount=type=cache,target="${GOPATH}/pkg",id="go-build-${TARGETARCH}" \ + --mount=type=cache,target="${HOME}/.cache/go-build",id="go-build-${TARGETARCH}" \ + --mount=type=tmpfs,target="${GOPATH}/src" \ + set -ex \ + && echo 'Binary::apt::APT::Keep-Downloaded-Packages "true";' > /etc/apt/apt.conf.d/keep-cache \ + && echo 'APT::Install-Recommends "false";' > /etc/apt/apt.conf.d/no-install-recommends \ + && apt-get clean \ + && apt-get update -y \ + && apt-get upgrade -y \ + && apt-get install -y --no-install-recommends --fix-missing \ + build-essential \ + ca-certificates \ + curl \ + tzdata \ + locales \ + git \ + && ldconfig \ + && echo "${LANG} UTF-8" > /etc/locale.gen \ + && ln -fs /usr/share/zoneinfo/${TZ} /etc/localtime \ + && locale-gen ${LANGUAGE} \ + && update-locale LANG=${LANGUAGE} \ + && dpkg-reconfigure -f noninteractive tzdata \ + && apt-get clean \ + && apt-get autoclean -y \ + && apt-get autoremove -y \ + && make GOPATH="${GOPATH}" GOROOT="${GOROOT}" GO_VERSION="${GO_VERSION}" go/install \ + && make GOPATH="${GOPATH}" GOROOT="${GOROOT}" GO_VERSION="${GO_VERSION}" go/download \ + && make GOARCH="${TARGETARCH}" GOOS="${TARGETOS}" REPO="${ORG}" NAME="${REPO}" cmd/${PKG}/${APP_NAME} \ + && mv "cmd/${PKG}/${APP_NAME}" "/usr/bin/${APP_NAME}" +# skipcq: DOK-DL3026,DOK-DL3007 +FROM gcr.io/distroless/static:nonroot +LABEL maintainer="vdaas.org vald team " +COPY --from=builder /usr/bin/index-importation /usr/bin/index-importation +COPY cmd/index/job/importation/sample.yaml /etc/server/config.yaml +# skipcq: DOK-DL3002 +USER nonroot:nonroot +ENTRYPOINT ["/usr/bin/index-importation"] \ No newline at end of file diff --git a/hack/docker/gen/main.go b/hack/docker/gen/main.go index 4ba68db4a0..7646f757f6 100644 --- a/hack/docker/gen/main.go +++ b/hack/docker/gen/main.go @@ -735,6 +735,10 @@ func main() { AppName: "index-deletion", PackageDir: "index/job/deletion", }, + "vald-index-importation": { + AppName: "index-importation", + PackageDir: "index/job/importation", + }, "vald-readreplica-rotate": { AppName: "readreplica-rotate", PackageDir: "index/job/readreplica/rotate", From bc35f40ddbe8b21613f825e22df623ec9f05d86b Mon Sep 17 00:00:00 2001 From: HighPon Date: Sun, 15 Dec 2024 17:13:21 +0000 Subject: [PATCH 2/2] [WIP] feat: Implement vald index-importer --- cmd/index/job/importation/main.go | 38 +++- cmd/index/job/importation/sample.yaml | 191 +++++++++++++++++ internal/config/index_importer.go | 35 ++++ pkg/index/job/importation/config/config.go | 66 ++++++ pkg/index/job/importation/service/importer.go | 195 ++++++++++++++++++ pkg/index/job/importation/service/options.go | 71 +++++++ .../job/importation/usecase/importation.go | 183 ++++++++++++++++ 7 files changed, 777 insertions(+), 2 deletions(-) create mode 100644 cmd/index/job/importation/sample.yaml create mode 100644 internal/config/index_importer.go create mode 100644 pkg/index/job/importation/config/config.go create mode 100644 pkg/index/job/importation/service/importer.go create mode 100644 pkg/index/job/importation/service/options.go create mode 100644 pkg/index/job/importation/usecase/importation.go diff --git a/cmd/index/job/importation/main.go b/cmd/index/job/importation/main.go index ab8fb8146e..42909e8af7 100644 --- a/cmd/index/job/importation/main.go +++ b/cmd/index/job/importation/main.go @@ -13,7 +13,17 @@ // limitations under the License. package main -import "fmt" +import ( + "context" + "log" + + "github.com/vdaas/vald/internal/errors" + "github.com/vdaas/vald/internal/info" + "github.com/vdaas/vald/internal/runner" + "github.com/vdaas/vald/internal/safety" + "github.com/vdaas/vald/pkg/index/job/importation/config" + "github.com/vdaas/vald/pkg/index/job/importation/usecase" +) const ( maxVersion = "v0.0.10" @@ -22,5 +32,29 @@ const ( ) func main() { - fmt.Println("hello world") + if err := safety.RecoverFunc(func() error { + return runner.Do( + context.Background(), + runner.WithName(name), + runner.WithVersion(info.Version, maxVersion, minVersion), + runner.WithConfigLoader(func(path string) (any, *config.GlobalConfig, error) { + // cfg, err := config.NewConfig(path) + cfg, err := config.NewConfig("cmd/index/job/importation/sample.yaml") + if err != nil { + return nil, nil, errors.Wrap(err, "failed to load "+name+"'s configuration") + } + return cfg, &cfg.GlobalConfig, nil + }), + runner.WithDaemonInitializer(func(cfg any) (runner.Runner, error) { + c, ok := cfg.(*config.Data) + if !ok { + return nil, errors.ErrInvalidConfig + } + return usecase.New(c) + }), + ) + })(); err != nil { + log.Fatal(err, info.Get()) + return + } } diff --git a/cmd/index/job/importation/sample.yaml b/cmd/index/job/importation/sample.yaml new file mode 100644 index 0000000000..0ee953191b --- /dev/null +++ b/cmd/index/job/importation/sample.yaml @@ -0,0 +1,191 @@ +# +# Copyright (C) 2019-2024 vdaas.org vald team +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +version: v0.0.0 +time_zone: JST +logging: + format: raw + level: info + logger: glg +server_config: + servers: + - name: grpc + host: 0.0.0.0 + port: 8081 + grpc: + bidirectional_stream_concurrency: 20 + connection_timeout: "" + header_table_size: 0 + initial_conn_window_size: 0 + initial_window_size: 0 + interceptors: [] + keepalive: + max_conn_age: "" + max_conn_age_grace: "" + max_conn_idle: "" + time: "" + timeout: "" + max_header_list_size: 0 + max_receive_message_size: 0 + max_send_message_size: 0 + read_buffer_size: 0 + write_buffer_size: 0 + mode: GRPC + probe_wait_time: 3s + restart: true + health_check_servers: + - name: readiness + host: 0.0.0.0 + port: 3001 + http: + handler_timeout: "" + idle_timeout: "" + read_header_timeout: "" + read_timeout: "" + shutdown_duration: 0s + write_timeout: "" + mode: "" + probe_wait_time: 3s + metrics_servers: + startup_strategy: + - grpc + - readiness + full_shutdown_duration: 600s + tls: + ca: /path/to/ca + cert: /path/to/cert + enabled: false + key: /path/to/key +importer: + concurrency: 1 + index_path: "/var/export/index/1733964783.db" + gateway: + addrs: + - localhost:20000 + health_check_duration: "1s" + connection_pool: + enable_dns_resolver: true + enable_rebalance: true + old_conn_close_duration: 2m + rebalance_duration: 30m + size: 3 + backoff: + backoff_factor: 1.1 + backoff_time_limit: 5s + enable_error_log: true + initial_duration: 5ms + jitter_limit: 100ms + maximum_duration: 5s + retry_count: 100 + circuit_breaker: + closed_error_rate: 0.7 + closed_refresh_timeout: 10s + half_open_error_rate: 0.5 + min_samples: 1000 + open_timeout: 1s + call_option: + content_subtype: "" + max_recv_msg_size: 0 + max_retry_rpc_buffer_size: 0 + max_send_msg_size: 0 + wait_for_ready: true + dial_option: + authority: "" + backoff_base_delay: 1s + backoff_jitter: 0.2 + backoff_max_delay: 120s + backoff_multiplier: 1.6 + disable_retry: false + enable_backoff: false + idle_timeout: 1h + initial_connection_window_size: 2097152 + initial_window_size: 1048576 + insecure: true + interceptors: [] + keepalive: + permit_without_stream: false + time: "" + timeout: 30s + max_call_attempts: 0 + max_header_list_size: 0 + max_msg_size: 0 + min_connection_timeout: 20s + net: + dialer: + dual_stack_enabled: true + keepalive: "" + timeout: "" + dns: + cache_enabled: true + cache_expiration: 1h + refresh_duration: 30m + socket_option: + ip_recover_destination_addr: false + ip_transparent: false + reuse_addr: true + reuse_port: true + tcp_cork: false + tcp_defer_accept: false + tcp_fast_open: false + tcp_no_delay: false + tcp_quick_ack: false + tls: + ca: /path/to/ca + cert: /path/to/cert + enabled: false + insecure_skip_verify: false + key: /path/to/key + read_buffer_size: 0 + shared_write_buffer: false + timeout: "" + user_agent: Vald-gRPC + write_buffer_size: 0 + tls: + ca: /path/to/ca + cert: /path/to/cert + enabled: false + insecure_skip_verify: false + key: /path/to/key +observability: + enabled: false + otlp: + collector_endpoint: "otel-collector.monitoring.svc.cluster.local:4317" + trace_batch_timeout: "1s" + trace_export_timeout: "1m" + trace_max_export_batch_size: 1024 + trace_max_queue_size: 256 + metrics_export_interval: "1s" + metrics_export_timeout: "1m" + attribute: + namespace: "_MY_POD_NAMESPACE_" + pod_name: "_MY_POD_NAME_" + node_name: "_MY_NODE_NAME_" + service_name: "vald-index-deletion" + metrics: + enable_cgo: true + enable_goroutine: true + enable_memory: true + enable_version_info: true + version_info_labels: + - vald_version + - server_name + - git_commit + - build_time + - go_version + - go_os + - go_arch + - algorithm_info + trace: + enabled: true diff --git a/internal/config/index_importer.go b/internal/config/index_importer.go new file mode 100644 index 0000000000..6b666058e6 --- /dev/null +++ b/internal/config/index_importer.go @@ -0,0 +1,35 @@ +// Copyright (C) 2019-2024 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// You may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package config + +// IndexImporter represents the configurations for index importation. +type IndexImporter struct { + // Concurrency represents indexing concurrency. + Concurrency int `json:"concurrency" yaml:"concurrency"` + + // IndexPath represents the export index file path + IndexPath string `json:"index_path,omitempty" yaml:"index_path"` + + // Gateway represent gateway service configuration + Gateway *GRPCClient `json:"gateway" yaml:"gateway"` +} + +func (e *IndexImporter) Bind() *IndexImporter { + e.IndexPath = GetActualValue(e.IndexPath) + + if e.Gateway != nil { + e.Gateway = e.Gateway.Bind() + } + return e +} diff --git a/pkg/index/job/importation/config/config.go b/pkg/index/job/importation/config/config.go new file mode 100644 index 0000000000..62af419a90 --- /dev/null +++ b/pkg/index/job/importation/config/config.go @@ -0,0 +1,66 @@ +// Copyright (C) 2019-2024 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// You may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package config + +import ( + "github.com/vdaas/vald/internal/config" + "github.com/vdaas/vald/internal/errors" +) + +// GlobalConfig is a type alias of config.GlobalConfig representing application base configurations. +type GlobalConfig = config.GlobalConfig + +// Data represents the application configurations. +type Data struct { + // GlobalConfig represents application base configurations. + config.GlobalConfig `json:",inline" yaml:",inline"` + + // Server represent all server configurations + Server *config.Servers `json:"server_config" yaml:"server_config"` + + // Observability represents observability configurations. + Observability *config.Observability `json:"observability" yaml:"observability"` + + // Importer represents auto indexing service configurations. + Importer *config.IndexImporter `json:"importer" yaml:"importer"` +} + +// NewConfig load configurations from file path. +func NewConfig(path string) (cfg *Data, err error) { + cfg = new(Data) + + if err = config.Read(path, &cfg); err != nil { + return nil, err + } + + if cfg != nil { + cfg.Bind() + } else { + return nil, errors.ErrInvalidConfig + } + + if cfg.Observability != nil { + _ = cfg.Observability.Bind() + } else { + cfg.Observability = new(config.Observability).Bind() + } + + if cfg.Importer != nil { + cfg.Importer = cfg.Importer.Bind() + } else { + return nil, errors.ErrInvalidConfig + } + + return cfg, nil +} diff --git a/pkg/index/job/importation/service/importer.go b/pkg/index/job/importation/service/importer.go new file mode 100644 index 0000000000..cffc816b69 --- /dev/null +++ b/pkg/index/job/importation/service/importer.go @@ -0,0 +1,195 @@ +// Copyright (C) 2019-2024 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// You may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package service + +import ( + "context" + "reflect" + "time" + + "github.com/vdaas/vald/apis/grpc/v1/payload" + "github.com/vdaas/vald/apis/grpc/v1/vald" + vc "github.com/vdaas/vald/internal/client/v1/client/vald" + "github.com/vdaas/vald/internal/db/kvs/pogreb" + "github.com/vdaas/vald/internal/errors" + "github.com/vdaas/vald/internal/log" + igrpc "github.com/vdaas/vald/internal/net/grpc" + "github.com/vdaas/vald/internal/observability/trace" + "github.com/vdaas/vald/internal/safety" + "github.com/vdaas/vald/internal/sync/errgroup" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +const ( + apiName = "vald/index/job/export" + grpcMethodName = "vald.v1.StreamListObject/" + vald.StreamListObjectRPCName +) + +// Exporter represents an interface for exporting. +type Importer interface { + StartClient(ctx context.Context) (<-chan error, error) + Start(ctx context.Context) error + PreStop(ctx context.Context) error +} + +type importer struct { + eg errgroup.Group + gateway vc.Client + storedVector pogreb.DB + + streamListConcurrency int + backgroundSyncInterval time.Duration + backgroundCompactionInterval time.Duration + indexPath string + forceUpdate bool +} + +// New returns Importer object if no error occurs. +func New(opts ...Option) (Importer, error) { + i := new(importer) + for _, opt := range append(defaultOpts, opts...) { + if err := opt(i); err != nil { + oerr := errors.ErrOptionFailed(err, reflect.ValueOf(opt)) + e := &errors.ErrCriticalOption{} + if errors.As(oerr, &e) { + log.Error(err) + return nil, oerr + } + log.Warn(oerr) + } + } + + db, err := pogreb.New(pogreb.WithPath(i.indexPath)) + if err != nil { + log.Errorf("failed to open checked List kvs DB %s", i.indexPath) + return nil, err + } + i.storedVector = db + return i, nil +} + +// StartClient starts the gRPC client. +func (i *importer) StartClient(ctx context.Context) (<-chan error, error) { + ech := make(chan error, 1) + gch, err := i.gateway.Start(ctx) + if err != nil { + return nil, err + } + i.eg.Go(safety.RecoverFunc(func() (err error) { + defer close(ech) + for { + select { + case <-ctx.Done(): + return ctx.Err() + case err = <-gch: + } + if err != nil { + select { + case <-ctx.Done(): + return ctx.Err() + case ech <- err: + } + } + } + })) + return ech, nil +} + +func (i *importer) Start(ctx context.Context) error { + err := i.doImportIndex(ctx, + func(ctx context.Context, rc vald.ObjectClient, copts ...grpc.CallOption) (vald.Object_StreamListObjectClient, error) { + return rc.StreamListObject(ctx, &payload.Object_List_Request{}, copts...) + }, + ) + return err +} + +func (i *importer) doImportIndex( + ctx context.Context, + fn func(ctx context.Context, rc vald.ObjectClient, copts ...grpc.CallOption) (vald.Object_StreamListObjectClient, error), +) (errs error) { + log.Info("starting doImporterIndex") + ctx, span := trace.StartSpan(igrpc.WrapGRPCMethod(ctx, grpcMethodName), apiName+"/service/index.doExportIndex") + defer func() { + if span != nil { + span.End() + } + }() + + eg, egctx := errgroup.WithContext(ctx) + eg.SetLimit(i.streamListConcurrency) + ctx = context.WithoutCancel(egctx) + gatewayAddrs := i.gateway.GRPCClient().ConnectedAddrs() + if len(gatewayAddrs) < 0 { + log.Errorf("Active gateway is not found.: %v ", ctx.Err()) + } + + conn, err := grpc.NewClient(gatewayAddrs[0], grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + return err + } + + vcClient := vc.NewValdClient(conn) + grpcCallOpts := []grpc.CallOption{ + grpc.WaitForReady(true), + } + stream, err := vcClient.StreamInsert(ctx, grpcCallOpts...) + // stream, err := fn(ctx, vc.NewValdClient(conn), grpcCallOpts...) + log.Info("stream", stream) + if err != nil || stream == nil { + return err + } + + if err := i.storedVector.Range(ctx, func(key string, value []byte) bool { + eg.Go(safety.RecoverFunc(func() (err error) { + log.Info("sending object vector to stream", key) + objVec := new(payload.Object_Vector) + if err := objVec.UnmarshalVT(value); err != nil { + log.Errorf("failed to Unmarshal proto to payload.Object_Vector: %v", + err) + return err + } + + if err := stream.Send(&payload.Insert_Request{ + Vector: objVec, + Config: &payload.Insert_Config{ + SkipStrictExistCheck: !i.forceUpdate, + }, + }); err != nil { + log.Errorf("failed to send object vector to stream: %v", err) + return err + } + return nil + })) + return true + }); err != nil { + log.Errorf("failed to Range from check list: %v", err) + return err + } + + err = eg.Wait() + if err != nil { + log.Errorf("importer returned error status errgroup returned error: %v", ctx.Err()) + return err + } else { + log.Infof("importer finished") + } + return nil +} + +func (i *importer) PreStop(ctx context.Context) error { + log.Info("removing lock.") + return i.storedVector.Close(false) +} diff --git a/pkg/index/job/importation/service/options.go b/pkg/index/job/importation/service/options.go new file mode 100644 index 0000000000..6bd7cdb286 --- /dev/null +++ b/pkg/index/job/importation/service/options.go @@ -0,0 +1,71 @@ +// Copyright (C) 2019-2024 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// You may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package service + +import ( + "github.com/vdaas/vald/internal/client/v1/client/vald" + "github.com/vdaas/vald/internal/errors" + "github.com/vdaas/vald/internal/sync/errgroup" +) + +type Option func(_ *importer) error + +var defaultOpts = []Option{ + WithStreamListConcurrency(200), + WithIndexPath("/var/export/index"), + WithErrGroup(errgroup.Get()), +} + +// WithStreamListConcurrency returns Option that sets streamListConcurrency. +func WithStreamListConcurrency(num int) Option { + return func(e *importer) error { + if num <= 0 { + return errors.NewErrInvalidOption("streamListConcurrency", num) + } + e.streamListConcurrency = num + return nil + } +} + +// WithIndexPath returns Option that sets indexPath. +func WithIndexPath(path string) Option { + return func(e *importer) error { + if path == "" { + return errors.NewErrInvalidOption("indexPath", path) + } + e.indexPath = path + return nil + } +} + +// WithGateway returns Option that sets gateway client. +func WithGateway(client vald.Client) Option { + return func(e *importer) error { + if client == nil { + return errors.NewErrCriticalOption("gateway", client) + } + e.gateway = client + return nil + } +} + +// WithErrGroup returns Option that set errgroup. +func WithErrGroup(eg errgroup.Group) Option { + return func(e *importer) error { + if eg != nil { + e.eg = eg + } + return nil + } +} diff --git a/pkg/index/job/importation/usecase/importation.go b/pkg/index/job/importation/usecase/importation.go new file mode 100644 index 0000000000..1ddd76a833 --- /dev/null +++ b/pkg/index/job/importation/usecase/importation.go @@ -0,0 +1,183 @@ +// Copyright (C) 2019-2024 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// You may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package usecase + +import ( + "context" + "os" + "syscall" + + "github.com/vdaas/vald/internal/client/v1/client/vald" + "github.com/vdaas/vald/internal/errors" + "github.com/vdaas/vald/internal/log" + + "github.com/vdaas/vald/internal/net/grpc" + "github.com/vdaas/vald/internal/observability" + "github.com/vdaas/vald/internal/runner" + "github.com/vdaas/vald/internal/safety" + "github.com/vdaas/vald/internal/sync/errgroup" + "github.com/vdaas/vald/pkg/index/job/importation/config" + "github.com/vdaas/vald/pkg/index/job/importation/service" +) + +type run struct { + eg errgroup.Group + cfg *config.Data + observability observability.Observability + // server starter.Server + importer service.Importer +} + +// New returns Runner instance. +func New(cfg *config.Data) (_ runner.Runner, err error) { + eg := errgroup.Get() + + gOpts, err := cfg.Importer.Gateway.Opts() + if err != nil { + return nil, err + } + // skipcq: CRT-D0001 + gOpts = append(gOpts, grpc.WithErrGroup(eg)) + + gateway, err := vald.New(vald.WithClient(grpc.New(gOpts...))) + if err != nil { + return nil, err + } + + importer, err := service.New( + service.WithStreamListConcurrency(cfg.Importer.Concurrency), + service.WithIndexPath(cfg.Importer.IndexPath), + service.WithGateway(gateway), + ) + if err != nil { + return nil, err + } + + var obs observability.Observability + if cfg.Observability.Enabled { + obs, err = observability.NewWithConfig( + cfg.Observability, + ) + if err != nil { + return nil, err + } + } + + // grpcServerOptions := []server.Option{ + // server.WithGRPCOption( + // grpc.ChainUnaryInterceptor(recover.RecoverInterceptor()), + // grpc.ChainStreamInterceptor(recover.RecoverStreamInterceptor()), + // ), + // } + + // For health check and metrics + // srv, err := starter.New(starter.WithConfig(cfg.Server), + // starter.WithGRPC(func(_ *iconf.Server) []server.Option { + // return grpcServerOptions + // }), + // ) + // if err != nil { + // return nil, err + // } + + return &run{ + eg: eg, + cfg: cfg, + observability: obs, + // server: srv, + importer: importer, + }, nil +} + +// PreStart is a method called before execution of Start, and it invokes the PreStart method of observability. +func (r *run) PreStart(ctx context.Context) error { + if r.observability != nil { + return r.observability.PreStart(ctx) + } + return nil +} + +// Start is a method used to initiate an operation in the run, and it returns a channel for receiving errors +// during the operation and an error representing any initialization errors. +func (r *run) Start(ctx context.Context) (<-chan error, error) { + ech := make(chan error, 3) + var sech, oech, cech <-chan error + if r.observability != nil { + oech = r.observability.Start(ctx) + } + // sech = r.server.ListenAndServe(ctx) + cech, err := r.importer.StartClient(ctx) + if err != nil { + close(ech) + return nil, err + } + + r.eg.Go(safety.RecoverFunc(func() (err error) { + defer func() { + p, err := os.FindProcess(os.Getpid()) + if err != nil { + // using Fatal to avoid this process to be zombie + // skipcq: RVV-A0003 + log.Fatalf("failed to find my pid to kill %v", err) + return + } + log.Info("sending SIGTERM to myself to stop this job") + if err := p.Signal(syscall.SIGTERM); err != nil { + log.Error(err) + } + }() + return r.importer.Start(ctx) + })) + + r.eg.Go(safety.RecoverFunc(func() (err error) { + defer close(ech) + for { + select { + case <-ctx.Done(): + return ctx.Err() + case err = <-oech: + case err = <-sech: + case err = <-cech: + } + if err != nil { + select { + case <-ctx.Done(): + return errors.Join(ctx.Err(), err) + case ech <- err: + } + } + } + })) + return ech, nil +} + +// PreStop is a method called before execution of Stop. +func (r *run) PreStop(ctx context.Context) error { + return r.importer.PreStop(ctx) +} + +// Stop is a method used to stop an operation in the run. +func (r *run) Stop(ctx context.Context) (errs error) { + if r.observability != nil { + if err := r.observability.Stop(ctx); err != nil { + errs = errors.Join(errs, err) + } + } + return errs +} + +// PostStop is a method called after execution of Stop. +func (*run) PostStop(_ context.Context) error { + return nil +}