diff --git a/.github/workflows/dockers-index-creation.yml b/.github/workflows/dockers-index-creation.yml deleted file mode 100644 index 1d09435f0a..0000000000 --- a/.github/workflows/dockers-index-creation.yml +++ /dev/null @@ -1,78 +0,0 @@ -# -# Copyright (C) 2019-2023 vdaas.org vald team -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# You may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -name: "Build docker image: index-creation" -on: - push: - branches: - - main - tags: - - "*.*.*" - - "v*.*.*" - - "*.*.*-*" - - "v*.*.*-*" - paths: - - ".github/actions/docker-build/actions.yaml" - - ".github/workflows/dockers-index-creation.yml" - - "go.mod" - - "go.sum" - - "internal/**" - - "!internal/**/*_test.go" - - "!internal/db/**" - - "!internal/k8s/**" - - "apis/grpc/**" - - "pkg/index/job/creation/**" - - "cmd/index/job/creation/**" - - "dockers/index/job/creation/Dockerfile" - - "versions/GO_VERSION" - pull_request: - paths: - - ".github/actions/docker-build/actions.yaml" - - ".github/workflows/_docker-image.yaml" - - ".github/workflows/dockers-index-creation.yml" - - "go.mod" - - "go.sum" - - "internal/**" - - "!internal/**/*_test.go" - - "!internal/db/**" - - "!internal/k8s/**" - - "apis/grpc/**" - - "pkg/index/job/creation/**" - - "cmd/index/job/creation/**" - - "dockers/index/job/creation/Dockerfile" - - "versions/GO_VERSION" - pull_request_target: - paths: - - ".github/actions/docker-build/actions.yaml" - - ".github/workflows/_docker-image.yaml" - - ".github/workflows/dockers-index-creation.yml" - - "go.mod" - - "go.sum" - - "internal/**" - - "!internal/**/*_test.go" - - "!internal/db/**" - - "!internal/k8s/**" - - "apis/grpc/**" - - "pkg/index/job/creation/**" - - "cmd/index/job/creation/**" - - "dockers/index/job/creation/Dockerfile" - - "versions/GO_VERSION" - -jobs: - build: - uses: ./.github/workflows/_docker-image.yaml - with: - target: index-creation - secrets: inherit diff --git a/Makefile b/Makefile index 06f7c4775b..01b35bbec9 100644 --- a/Makefile +++ b/Makefile @@ -30,8 +30,7 @@ FILTER_GATEWAY_IMAGE = $(NAME)-filter-gateway HELM_OPERATOR_IMAGE = $(NAME)-helm-operator LB_GATEWAY_IMAGE = $(NAME)-lb-gateway LOADTEST_IMAGE = $(NAME)-loadtest -INDEX_CORRECTION_IMAGE = $(NAME)-index-correction -INDEX_CREATION_IMAGE = $(NAME)-index-creation +INDEX_CORRECTION_IMAGE = $(NAME)-index-correction MANAGER_INDEX_IMAGE = $(NAME)-manager-index MAINTAINER = "$(ORG).org $(NAME) team <$(NAME)@$(ORG).org>" diff --git a/Makefile.d/build.mk b/Makefile.d/build.mk index aa6a58a999..f2f8275b46 100644 --- a/Makefile.d/build.mk +++ b/Makefile.d/build.mk @@ -235,35 +235,6 @@ cmd/index/job/correction/index-correction: \ $(dir $@)main.go $@ -version -cmd/index/job/creation/index-creation: \ - $(GO_SOURCES_INTERNAL) \ - $(PBGOS) \ - $(shell find $(ROOTDIR)/cmd/index/job/creation -type f -name '*.go' -not -name '*_test.go' -not -name 'doc.go') \ - $(shell find $(ROOTDIR)/pkg/index/job/creation -type f -name '*.go' -not -name '*_test.go' -not -name 'doc.go') - $(eval CGO_ENABLED = 0) - CGO_ENABLED=$(CGO_ENABLED) \ - GO111MODULE=on \ - GOPRIVATE=$(GOPRIVATE) \ - go build \ - --ldflags "-w -extldflags=-static \ - -X '$(GOPKG)/internal/info.Version=$(VERSION)' \ - -X '$(GOPKG)/internal/info.GitCommit=$(GIT_COMMIT)' \ - -X '$(GOPKG)/internal/info.BuildTime=$(DATETIME)' \ - -X '$(GOPKG)/internal/info.GoVersion=$(GO_VERSION)' \ - -X '$(GOPKG)/internal/info.GoOS=$(GOOS)' \ - -X '$(GOPKG)/internal/info.GoArch=$(GOARCH)' \ - -X '$(GOPKG)/internal/info.CGOEnabled=$(CGO_ENABLED)' \ - -X '$(GOPKG)/internal/info.BuildCPUInfoFlags=$(CPU_INFO_FLAGS)' \ - -buildid=" \ - -mod=readonly \ - -modcacherw \ - -a \ - -tags "osusergo netgo static_build" \ - -trimpath \ - -o $@ \ - $(dir $@)main.go - $@ -version - .PHONY: binary/build/zip ## build all binaries and zip them binary/build/zip: \ diff --git a/Makefile.d/docker.mk b/Makefile.d/docker.mk index 0d0876c121..5771c857e6 100644 --- a/Makefile.d/docker.mk +++ b/Makefile.d/docker.mk @@ -202,17 +202,3 @@ docker/build/index-correction: -t $(ORG)/$(INDEX_CORRECTION_IMAGE):$(TAG) . \ --build-arg MAINTAINER=$(MAINTAINER) \ --build-arg GO_VERSION=$(GO_VERSION) - -.PHONY: docker/name/index-creation -docker/name/index-creation: - @echo "$(ORG)/$(INDEX_CREATION_IMAGE)" - -.PHONY: docker/build/index-creation -## build index-creation image -docker/build/index-creation: - $(DOCKER) build \ - $(DOCKER_OPTS) \ - -f dockers/index/job/creation/Dockerfile \ - -t $(ORG)/$(INDEX_CREATION_IMAGE):$(TAG) . \ - --build-arg MAINTAINER=$(MAINTAINER) \ - --build-arg GO_VERSION=$(GO_VERSION) diff --git a/cmd/index/job/creation/main.go b/cmd/index/job/creation/main.go deleted file mode 100644 index 8b63241135..0000000000 --- a/cmd/index/job/creation/main.go +++ /dev/null @@ -1,59 +0,0 @@ -// Copyright (C) 2019-2023 vdaas.org vald team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// You may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package main - -import ( - "context" - - "github.com/vdaas/vald/internal/errors" - "github.com/vdaas/vald/internal/info" - "github.com/vdaas/vald/internal/log" - "github.com/vdaas/vald/internal/runner" - "github.com/vdaas/vald/internal/safety" - "github.com/vdaas/vald/pkg/index/job/creation/config" - "github.com/vdaas/vald/pkg/index/job/creation/usecase" -) - -const ( - maxVersion = "v0.0.10" - minVersion = "v0.0.0" - name = "index creation job" -) - -func main() { - if err := safety.RecoverFunc(func() error { - return runner.Do( - context.Background(), - runner.WithName(name), - runner.WithVersion(info.Version, maxVersion, minVersion), - runner.WithConfigLoader(func(path string) (interface{}, *config.GlobalConfig, error) { - cfg, err := config.NewConfig(path) - if err != nil { - return nil, nil, errors.Wrap(err, "failed to load "+name+"'s configuration") - } - return cfg, &cfg.GlobalConfig, nil - }), - runner.WithDaemonInitializer(func(cfg interface{}) (runner.Runner, error) { - c, ok := cfg.(*config.Data) - if !ok { - return nil, errors.ErrInvalidConfig - } - return usecase.New(c) - }), - ) - })(); err != nil { - log.Fatal(err, info.Get()) - return - } -} diff --git a/cmd/index/job/creation/sample.yaml b/cmd/index/job/creation/sample.yaml deleted file mode 100644 index 74302f6d68..0000000000 --- a/cmd/index/job/creation/sample.yaml +++ /dev/null @@ -1,231 +0,0 @@ -# -# Copyright (C) 2019-2023 vdaas.org vald team -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# You may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - ---- -version: v0.0.0 -time_zone: JST -logging: - format: raw - level: info - logger: glg -server_config: - servers: - - name: grpc - host: 0.0.0.0 - port: 8081 - grpc: - bidirectional_stream_concurrency: 20 - connection_timeout: "" - header_table_size: 0 - initial_conn_window_size: 0 - initial_window_size: 0 - interceptors: [] - keepalive: - max_conn_age: "" - max_conn_age_grace: "" - max_conn_idle: "" - time: "" - timeout: "" - max_header_list_size: 0 - max_receive_message_size: 0 - max_send_message_size: 0 - read_buffer_size: 0 - write_buffer_size: 0 - mode: GRPC - probe_wait_time: 3s - restart: true - health_check_servers: - - name: readiness - host: 0.0.0.0 - port: 3001 - http: - handler_timeout: "" - idle_timeout: "" - read_header_timeout: "" - read_timeout: "" - shutdown_duration: 0s - write_timeout: "" - mode: "" - probe_wait_time: 3s - metrics_servers: - startup_strategy: - - grpc - - readiness - full_shutdown_duration: 600s - tls: - ca: /path/to/ca - cert: /path/to/cert - enabled: false - key: /path/to/key -creator: - agent_port: 8081 - agent_name: "vald-agent-ngt" - agent_dns: vald-agent-ngt.default.svc.cluster.local - agent_namespace: "default" - node_name: "" - concurrency: 1 - discoverer: - duration: 500ms - client: - addrs: - - vald-discoverer.default.svc.cluster.local:8081 - health_check_duration: "1s" - connection_pool: - enable_dns_resolver: true - enable_rebalance: true - old_conn_close_duration: 3s - rebalance_duration: 30m - size: 3 - backoff: - backoff_factor: 1.1 - backoff_time_limit: 5s - enable_error_log: true - initial_duration: 5ms - jitter_limit: 100ms - maximum_duration: 5s - retry_count: 100 - call_option: - max_recv_msg_size: 0 - max_retry_rpc_buffer_size: 0 - max_send_msg_size: 0 - wait_for_ready: true - dial_option: - backoff_base_delay: 1s - backoff_jitter: 0.2 - backoff_max_delay: 120s - backoff_multiplier: 1.6 - enable_backoff: false - initial_connection_window_size: 0 - initial_window_size: 0 - insecure: true - keepalive: - permit_without_stream: false - time: "" - timeout: "" - max_msg_size: 0 - min_connection_timeout: 20s - read_buffer_size: 0 - tcp: - dialer: - dual_stack_enabled: true - keepalive: "" - timeout: "" - dns: - cache_enabled: true - cache_expiration: 1h - refresh_duration: 30m - tls: - ca: /path/to/ca - cert: /path/to/cert - enabled: false - key: /path/to/key - timeout: "" - write_buffer_size: 0 - tls: - ca: /path/to/ca - cert: /path/to/cert - enabled: false - key: /path/to/key - agent_client_options: - addrs: [] - health_check_duration: "1s" - connection_pool: - enable_dns_resolver: true - enable_rebalance: true - old_conn_close_duration: 3s - rebalance_duration: 30m - size: 3 - backoff: - backoff_factor: 1.1 - backoff_time_limit: 5s - enable_error_log: true - initial_duration: 5ms - jitter_limit: 100ms - maximum_duration: 5s - retry_count: 100 - call_option: - max_recv_msg_size: 0 - max_retry_rpc_buffer_size: 0 - max_send_msg_size: 0 - wait_for_ready: true - dial_option: - write_buffer_size: 0 - read_buffer_size: 0 - initial_window_size: 0 - initial_connection_window_size: 0 - max_msg_size: 0 - backoff_max_delay: "120s" - backoff_base_delay: "1s" - backoff_multiplier: 1.6 - backoff_jitter: 0.2 - min_connection_timeout: "20s" - enable_backoff: false - insecure: true - timeout: "" - tcp: - dns: - cache_enabled: true - cache_expiration: 1h - refresh_duration: 30m - dialer: - timeout: "" - keepalive: "15m" - dual_stack_enabled: true - tls: - ca: /path/to/ca - cert: /path/to/cert - enabled: false - key: /path/to/key - keepalive: - permit_without_stream: false - time: "" - timeout: "" - tls: - ca: /path/to/ca - cert: /path/to/cert - enabled: false - key: /path/to/key -observability: - enabled: false - otlp: - collector_endpoint: "otel-collector.monitoring.svc.cluster.local:4317" - trace_batch_timeout: "1s" - trace_export_timeout: "1m" - trace_max_export_batch_size: 1024 - trace_max_queue_size: 256 - metrics_export_interval: "1s" - metrics_export_timeout: "1m" - attribute: - namespace: "_MY_POD_NAMESPACE_" - pod_name: "_MY_POD_NAME_" - node_name: "_MY_NODE_NAME_" - service_name: "vald-index-creation" - metrics: - enable_cgo: true - enable_goroutine: true - enable_memory: true - enable_version_info: true - version_info_labels: - - vald_version - - server_name - - git_commit - - build_time - - go_version - - go_os - - go_arch - - ngt_version - trace: - enabled: true diff --git a/dockers/index/job/creation/Dockerfile b/dockers/index/job/creation/Dockerfile deleted file mode 100644 index f17fb867f9..0000000000 --- a/dockers/index/job/creation/Dockerfile +++ /dev/null @@ -1,93 +0,0 @@ -# -# Copyright (C) 2019-2023 vdaas.org vald team -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# You may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -ARG GO_VERSION=latest -ARG DISTROLESS_IMAGE=gcr.io/distroless/static -ARG DISTROLESS_IMAGE_TAG=nonroot -ARG MAINTAINER="vdaas.org vald team " - -FROM golang:${GO_VERSION} AS golang - -FROM ubuntu:devel AS builder - -ENV GO111MODULE on -ENV DEBIAN_FRONTEND noninteractive -ENV INITRD No -ENV LANG en_US.UTF-8 -ENV GOROOT /opt/go -ENV GOPATH /go -ENV PATH ${PATH}:${GOROOT}/bin:${GOPATH}/bin -ENV ORG vdaas -ENV REPO vald -ENV PKG index/job/creation -ENV APP_NAME index-creation - -# skipcq: DOK-DL3008 -RUN apt-get update && apt-get install -y --no-install-recommends \ - ca-certificates \ - build-essential \ - curl \ - upx \ - git \ - && apt-get clean \ - && rm -rf /var/lib/apt/lists/* - -COPY --from=golang /usr/local/go $GOROOT -RUN mkdir -p "$GOPATH/src" - -WORKDIR ${GOPATH}/src/github.com/${ORG}/${REPO}/Makefile.d -COPY Makefile.d . -WORKDIR ${GOPATH}/src/github.com/${ORG}/${REPO} -COPY Makefile . -COPY .git . -COPY go.mod . -COPY go.sum . - -RUN make go/download - -WORKDIR ${GOPATH}/src/github.com/${ORG}/${REPO}/internal -COPY internal . - -WORKDIR ${GOPATH}/src/github.com/${ORG}/${REPO}/apis/grpc -COPY apis/grpc . - -WORKDIR ${GOPATH}/src/github.com/${ORG}/${REPO}/pkg/${PKG} -COPY pkg/${PKG} . - -WORKDIR ${GOPATH}/src/github.com/${ORG}/${REPO}/cmd/${PKG} -COPY cmd/${PKG} . - -WORKDIR ${GOPATH}/src/github.com/${ORG}/${REPO}/versions -COPY versions . - -WORKDIR ${GOPATH}/src/github.com/${ORG}/${REPO} -RUN make REPO=${ORG} NAME=${REPO} cmd/${PKG}/${APP_NAME} \ - && mv "cmd/${PKG}/${APP_NAME}" "/usr/bin/${APP_NAME}" - -WORKDIR ${GOPATH}/src/github.com/${ORG}/${REPO}/cmd/${PKG} -RUN cp sample.yaml /tmp/config.yaml - -FROM ${DISTROLESS_IMAGE}:${DISTROLESS_IMAGE_TAG} -LABEL maintainer="${MAINTAINER}" - -ENV APP_NAME index-creation - -COPY --from=builder /usr/bin/${APP_NAME} /go/bin/${APP_NAME} -COPY --from=builder /tmp/config.yaml /etc/server/config.yaml - -USER nonroot:nonroot - -ENTRYPOINT ["/go/bin/index-creation"] diff --git a/internal/config/index_creation.go b/internal/config/index_creation.go deleted file mode 100644 index 8024fc068e..0000000000 --- a/internal/config/index_creation.go +++ /dev/null @@ -1,57 +0,0 @@ -// Copyright (C) 2019-2023 vdaas.org vald team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// You may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package config - -// IndexCreation represents the configurations for index creation. -type IndexCreation struct { - // AgentPort represent agent port number - AgentPort int `json:"agent_port" yaml:"agent_port"` - - // AgentName represent agents meta_name for service discovery - AgentName string `json:"agent_name" yaml:"agent_name"` - - // AgentNamespace represent agent namespace location - AgentNamespace string `json:"agent_namespace" yaml:"agent_namespace"` - - // AgentDNS represent agents dns A record for service discovery - AgentDNS string `json:"agent_dns" yaml:"agent_dns"` - - // NodeName represents node name - NodeName string `json:"node_name" yaml:"node_name"` - - // Concurrency represents indexing concurrency. - Concurrency int `json:"concurrency" yaml:"concurrency"` - - // CreationPoolSize represents batch pool size for indexing. - CreationPoolSize uint32 `json:"creation_pool_size" yaml:"creation_pool_size"` - - // TargetAddrs represents indexing target addresses. - TargetAddrs []string `json:"target_addrs" yaml:"target_addrs"` - - // Discoverer represents agent discoverer service configuration. - Discoverer *DiscovererClient `json:"discoverer" yaml:"discoverer"` -} - -func (ic *IndexCreation) Bind() *IndexCreation { - ic.AgentName = GetActualValue(ic.AgentName) - ic.AgentNamespace = GetActualValue(ic.AgentNamespace) - ic.AgentDNS = GetActualValue(ic.AgentDNS) - ic.NodeName = GetActualValue(ic.NodeName) - ic.TargetAddrs = GetActualValues(ic.TargetAddrs) - - if ic.Discoverer != nil { - ic.Discoverer.Bind() - } - return ic -} diff --git a/internal/config/index_creation_test.go b/internal/config/index_creation_test.go deleted file mode 100644 index 52d8e65ca2..0000000000 --- a/internal/config/index_creation_test.go +++ /dev/null @@ -1,137 +0,0 @@ -// Copyright (C) 2019-2023 vdaas.org vald team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// You may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package config - -// NOT IMPLEMENTED BELOW -// -// func TestIndexCreation_Bind(t *testing.T) { -// type fields struct { -// AgentPort int -// AgentName string -// AgentNamespace string -// AgentDNS string -// NodeName string -// Concurrency int -// CreationPoolSize uint32 -// TargetAddrs []string -// Discoverer *DiscovererClient -// } -// type want struct { -// want *IndexCreation -// } -// type test struct { -// name string -// fields fields -// want want -// checkFunc func(want, *IndexCreation) error -// beforeFunc func(*testing.T) -// afterFunc func(*testing.T) -// } -// defaultCheckFunc := func(w want, got *IndexCreation) error { -// if !reflect.DeepEqual(got, w.want) { -// return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", got, w.want) -// } -// return nil -// } -// tests := []test{ -// // TODO test cases -// /* -// { -// name: "test_case_1", -// fields: fields { -// AgentPort:0, -// AgentName:"", -// AgentNamespace:"", -// AgentDNS:"", -// NodeName:"", -// Concurrency:0, -// CreationPoolSize:0, -// TargetAddrs:nil, -// Discoverer:DiscovererClient{}, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T,) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T,) { -// t.Helper() -// }, -// }, -// */ -// -// // TODO test cases -// /* -// func() test { -// return test { -// name: "test_case_2", -// fields: fields { -// AgentPort:0, -// AgentName:"", -// AgentNamespace:"", -// AgentDNS:"", -// NodeName:"", -// Concurrency:0, -// CreationPoolSize:0, -// TargetAddrs:nil, -// Discoverer:DiscovererClient{}, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T,) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T,) { -// t.Helper() -// }, -// } -// }(), -// */ -// } -// -// for _, tc := range tests { -// test := tc -// t.Run(test.name, func(tt *testing.T) { -// tt.Parallel() -// defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) -// if test.beforeFunc != nil { -// test.beforeFunc(tt) -// } -// if test.afterFunc != nil { -// defer test.afterFunc(tt) -// } -// checkFunc := test.checkFunc -// if test.checkFunc == nil { -// checkFunc = defaultCheckFunc -// } -// ic := &IndexCreation{ -// AgentPort: test.fields.AgentPort, -// AgentName: test.fields.AgentName, -// AgentNamespace: test.fields.AgentNamespace, -// AgentDNS: test.fields.AgentDNS, -// NodeName: test.fields.NodeName, -// Concurrency: test.fields.Concurrency, -// CreationPoolSize: test.fields.CreationPoolSize, -// TargetAddrs: test.fields.TargetAddrs, -// Discoverer: test.fields.Discoverer, -// } -// -// got := ic.Bind() -// if err := checkFunc(test.want, got); err != nil { -// tt.Errorf("error = %v", err) -// } -// -// }) -// } -// } diff --git a/pkg/index/job/creation/config/config.go b/pkg/index/job/creation/config/config.go deleted file mode 100644 index 7755c1b4bc..0000000000 --- a/pkg/index/job/creation/config/config.go +++ /dev/null @@ -1,71 +0,0 @@ -// Copyright (C) 2019-2023 vdaas.org vald team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// You may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package config - -import ( - "github.com/vdaas/vald/internal/config" - "github.com/vdaas/vald/internal/errors" -) - -// GlobalConfig is a type alias of config.GlobalConfig representing application base configurations. -type GlobalConfig = config.GlobalConfig - -// Data represents the application configurations. -type Data struct { - // GlobalConfig represents application base configurations. - config.GlobalConfig `json:",inline" yaml:",inline"` - - // Server represent all server configurations - Server *config.Servers `json:"server_config" yaml:"server_config"` - - // Observability represents observability configurations. - Observability *config.Observability `json:"observability" yaml:"observability"` - - // Creation represents auto indexing service configurations. - Creation *config.IndexCreation `json:"creator" yaml:"creator"` -} - -// NewConfig load configurations from file path. -func NewConfig(path string) (cfg *Data, err error) { - cfg = new(Data) - - if err = config.Read(path, &cfg); err != nil { - return nil, err - } - - if cfg != nil { - _ = cfg.GlobalConfig.Bind() - } else { - return nil, errors.ErrInvalidConfig - } - - if cfg.Server != nil { - _ = cfg.Server.Bind() - } else { - return nil, errors.ErrInvalidConfig - } - - if cfg.Observability != nil { - _ = cfg.Observability.Bind() - } else { - cfg.Observability = new(config.Observability).Bind() - } - - if cfg.Creation != nil { - _ = cfg.Creation.Bind() - } else { - return nil, errors.ErrInvalidConfig - } - return cfg, nil -} diff --git a/pkg/index/job/creation/config/config_test.go b/pkg/index/job/creation/config/config_test.go deleted file mode 100644 index 0cf6858bf7..0000000000 --- a/pkg/index/job/creation/config/config_test.go +++ /dev/null @@ -1,106 +0,0 @@ -// Copyright (C) 2019-2023 vdaas.org vald team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// You may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package config - -// NOT IMPLEMENTED BELOW -// -// func TestNewConfig(t *testing.T) { -// type args struct { -// path string -// } -// type want struct { -// wantCfg *Data -// err error -// } -// type test struct { -// name string -// args args -// want want -// checkFunc func(want, *Data, error) error -// beforeFunc func(*testing.T, args) -// afterFunc func(*testing.T, args) -// } -// defaultCheckFunc := func(w want, gotCfg *Data, err error) error { -// if !errors.Is(err, w.err) { -// return errors.Errorf("got_error: \"%#v\",\n\t\t\t\twant: \"%#v\"", err, w.err) -// } -// if !reflect.DeepEqual(gotCfg, w.wantCfg) { -// return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", gotCfg, w.wantCfg) -// } -// return nil -// } -// tests := []test{ -// // TODO test cases -// /* -// { -// name: "test_case_1", -// args: args { -// path:"", -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// }, -// */ -// -// // TODO test cases -// /* -// func() test { -// return test { -// name: "test_case_2", -// args: args { -// path:"", -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// } -// }(), -// */ -// } -// -// for _, tc := range tests { -// test := tc -// t.Run(test.name, func(tt *testing.T) { -// tt.Parallel() -// defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) -// if test.beforeFunc != nil { -// test.beforeFunc(tt, test.args) -// } -// if test.afterFunc != nil { -// defer test.afterFunc(tt, test.args) -// } -// checkFunc := test.checkFunc -// if test.checkFunc == nil { -// checkFunc = defaultCheckFunc -// } -// -// gotCfg, err := NewConfig(test.args.path) -// if err := checkFunc(test.want, gotCfg, err); err != nil { -// tt.Errorf("error = %v", err) -// } -// -// }) -// } -// } diff --git a/pkg/index/job/creation/service/indexer.go b/pkg/index/job/creation/service/indexer.go deleted file mode 100644 index 73dfa221f0..0000000000 --- a/pkg/index/job/creation/service/indexer.go +++ /dev/null @@ -1,222 +0,0 @@ -// Copyright (C) 2019-2023 vdaas.org vald team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// You may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package service - -import ( - "context" - "reflect" - - agent "github.com/vdaas/vald/apis/grpc/v1/agent/core" - "github.com/vdaas/vald/apis/grpc/v1/payload" - "github.com/vdaas/vald/internal/client/v1/client/discoverer" - "github.com/vdaas/vald/internal/errors" - "github.com/vdaas/vald/internal/log" - "github.com/vdaas/vald/internal/net/grpc" - "github.com/vdaas/vald/internal/net/grpc/codes" - "github.com/vdaas/vald/internal/net/grpc/status" - "github.com/vdaas/vald/internal/observability/trace" - "github.com/vdaas/vald/internal/strings" - "github.com/vdaas/vald/internal/sync" -) - -const ( - apiName = "vald/index/job/create" - grpcMethodName = "core.v1.Agent/" + agent.CreateIndexRPCName -) - -// Indexer represents an interface for indexing. -type Indexer interface { - PreStart(ctx context.Context) (<-chan error, error) - Start(ctx context.Context) error -} - -type index struct { - client discoverer.Client - targetAddrs []string - targetAddrList map[string]bool - - creationPoolSize uint32 - concurrency int -} - -// New returns Indexer object if no error occurs. -func New(opts ...Option) (Indexer, error) { - idx := new(index) - for _, opt := range append(defaultOpts, opts...) { - if err := opt(idx); err != nil { - oerr := errors.ErrOptionFailed(err, reflect.ValueOf(opt)) - e := &errors.ErrCriticalOption{} - if errors.As(oerr, &e) { - log.Error(err) - return nil, oerr - } - log.Warn(oerr) - } - } - idx.targetAddrList = make(map[string]bool, len(idx.targetAddrs)) - for _, addr := range idx.targetAddrs { - idx.targetAddrList[addr] = true - } - return idx, nil -} - -// PreStart starts the preparation process. -func (idx *index) PreStart(ctx context.Context) (<-chan error, error) { - return idx.client.Start(ctx) -} - -// Start starts indexing process. -func (idx *index) Start(ctx context.Context) error { - ctx, span := trace.StartSpan(ctx, apiName+"/service/index.Start") - defer func() { - if span != nil { - span.End() - } - }() - - err := idx.doCreateIndex(ctx, - func(ctx context.Context, ac agent.AgentClient, copts ...grpc.CallOption) (*payload.Empty, error) { - return ac.CreateIndex(ctx, &payload.Control_CreateIndexRequest{ - PoolSize: idx.creationPoolSize, - }, copts...) - }, - ) - if err != nil { - var attrs trace.Attributes - switch { - case errors.Is(err, errors.ErrGRPCClientConnNotFound("*")): - err = status.WrapWithInternal( - agent.CreateIndexRPCName+" API connection not found", err, - ) - attrs = trace.StatusCodeInternal(err.Error()) - case errors.Is(err, errors.ErrGRPCTargetAddrNotFound): - err = status.WrapWithInternal( - agent.CreateIndexRPCName+" API connection target address \""+strings.Join(idx.targetAddrs, ",")+"\" not found", err, - ) - attrs = trace.StatusCodeInternal(err.Error()) - default: - var ( - st *status.Status - msg string - ) - st, msg, err = status.ParseError(err, codes.Internal, - "failed to parse "+agent.CreateIndexRPCName+" gRPC error response", - ) - attrs = trace.FromGRPCStatus(st.Code(), msg) - } - log.Warn(err) - if span != nil { - span.RecordError(err) - span.SetAttributes(attrs...) - span.SetStatus(trace.StatusError, err.Error()) - } - return err - } - return nil -} - -func (idx *index) doCreateIndex(ctx context.Context, fn func(_ context.Context, _ agent.AgentClient, _ ...grpc.CallOption) (*payload.Empty, error)) (errs error) { - ctx, span := trace.StartSpan(grpc.WrapGRPCMethod(ctx, grpcMethodName), apiName+"/service/index.doCreateIndex") - defer func() { - if span != nil { - span.End() - } - }() - - targetAddrs := idx.client.GetAddrs(ctx) - if len(idx.targetAddrs) != 0 { - targetAddrs = idx.extractTargetAddrs(targetAddrs) - - // If targetAddrs is empty, an invalid target addresses may be registered in targetAddrList. - if len(targetAddrs) == 0 { - return errors.ErrGRPCTargetAddrNotFound - } - } - log.Infof("target agent addrs: %v", targetAddrs) - - var emu sync.Mutex - err := idx.client.GetClient().OrderedRangeConcurrent(ctx, targetAddrs, idx.concurrency, - func(ctx context.Context, target string, conn *grpc.ClientConn, copts ...grpc.CallOption) error { - ctx, span := trace.StartSpan(grpc.WrapGRPCMethod(ctx, "OrderedRangeConcurrent/"+target), agent.CreateIndexRPCName+"/"+target) - defer func() { - if span != nil { - span.End() - } - }() - _, err := fn(ctx, agent.NewAgentClient(conn), copts...) - if err != nil { - var attrs trace.Attributes - switch { - case errors.Is(err, context.Canceled): - err = status.WrapWithCanceled( - agent.CreateIndexRPCName+" API canceld", err, - ) - attrs = trace.StatusCodeCancelled(err.Error()) - case errors.Is(err, context.DeadlineExceeded): - err = status.WrapWithCanceled( - agent.CreateIndexRPCName+" API deadline exceeded", err, - ) - attrs = trace.StatusCodeDeadlineExceeded(err.Error()) - case errors.Is(err, errors.ErrGRPCClientConnNotFound("*")): - err = status.WrapWithInternal( - agent.CreateIndexRPCName+" API connection not found", err, - ) - attrs = trace.StatusCodeInternal(err.Error()) - case errors.Is(err, errors.ErrTargetNotFound): - err = status.WrapWithInvalidArgument( - agent.CreateIndexRPCName+" API target not found", err, - ) - attrs = trace.StatusCodeInternal(err.Error()) - default: - var ( - st *status.Status - msg string - ) - st, msg, err = status.ParseError(err, codes.Internal, - "failed to parse "+agent.CreateIndexRPCName+" gRPC error response", - ) - if st != nil && err != nil && st.Code() == codes.FailedPrecondition { - log.Warnf("CreateIndex of %s skipped, message: %s, err: %v", target, st.Message(), errors.Join(st.Err(), err)) - return nil - } - attrs = trace.FromGRPCStatus(st.Code(), msg) - } - log.Warnf("an error occurred in (%s) during indexing: %v", target, err) - if span != nil { - span.RecordError(err) - span.SetAttributes(attrs...) - span.SetStatus(trace.StatusError, err.Error()) - } - emu.Lock() - errs = errors.Join(errs, err) - emu.Unlock() - } - return err - }, - ) - return errors.Join(err, errs) -} - -// extractTargetAddresses filters and extracts target addresses registered in targetAddrList from the given address list. -func (idx *index) extractTargetAddrs(addrs []string) []string { - res := make([]string, 0, len(addrs)) - for _, addr := range addrs { - if !idx.targetAddrList[addr] { - log.Warnf("the gRPC target address not found: %s", addr) - } else { - res = append(res, addr) - } - } - return res -} diff --git a/pkg/index/job/creation/service/indexer_test.go b/pkg/index/job/creation/service/indexer_test.go deleted file mode 100644 index 27dd95ec9d..0000000000 --- a/pkg/index/job/creation/service/indexer_test.go +++ /dev/null @@ -1,423 +0,0 @@ -// Copyright (C) 2019-2023 vdaas.org vald team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// You may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package service - -import ( - "context" - "testing" - - agent "github.com/vdaas/vald/apis/grpc/v1/agent/core" - "github.com/vdaas/vald/internal/client/v1/client/discoverer" - "github.com/vdaas/vald/internal/errors" - "github.com/vdaas/vald/internal/net/grpc" - "github.com/vdaas/vald/internal/net/grpc/codes" - "github.com/vdaas/vald/internal/net/grpc/status" - "github.com/vdaas/vald/internal/test/goleak" -) - -func Test_index_Start(t *testing.T) { - t.Parallel() - type args struct { - ctx context.Context - } - type fields struct { - client discoverer.Client - targetAddrs []string - targetAddrList map[string]bool - creationPoolSize uint32 - concurrency int - } - type want struct { - err error - } - type test struct { - name string - args args - fields fields - want want - checkFunc func(want, error) error - beforeFunc func(*testing.T, args) - afterFunc func(*testing.T, args) - } - defaultCheckFunc := func(w want, err error) error { - if !errors.Is(err, w.err) { - return errors.Errorf("got_error: \"%#v\",\n\t\t\t\twant: \"%#v\"", err, w.err) - } - return nil - } - tests := []test{ - func() test { - addrs := []string{ - "127.0.0.1:8080", - } - return test{ - name: "Success: when there is no error in the indexing request process", - args: args{ - ctx: context.Background(), - }, - - fields: fields{ - client: &mockDiscovererClient{ - GetAddrsFunc: func(_ context.Context) []string { - return addrs - }, - GetClientFunc: func() grpc.Client { - return &mockGrpcClient{ - OrderedRangeConcurrentFunc: func(_ context.Context, _ []string, _ int, - _ func(_ context.Context, _ string, _ *grpc.ClientConn, _ ...grpc.CallOption) error, - ) error { - return nil - }, - } - }, - }, - }, - } - }(), - func() test { - addrs := []string{ - "127.0.0.1:8080", - } - return test{ - name: "Fail: when there is an error wrapped with gRPC status in the indexing request process", - args: args{ - ctx: context.Background(), - }, - fields: fields{ - client: &mockDiscovererClient{ - GetAddrsFunc: func(_ context.Context) []string { - return addrs - }, - GetClientFunc: func() grpc.Client { - return &mockGrpcClient{ - OrderedRangeConcurrentFunc: func(_ context.Context, _ []string, _ int, - _ func(_ context.Context, _ string, _ *grpc.ClientConn, _ ...grpc.CallOption) error, - ) error { - return status.WrapWithInternal( - agent.CreateIndexRPCName+" API connection not found", - errors.ErrGRPCClientConnNotFound("*"), - ) - }, - } - }, - }, - }, - want: want{ - err: status.Error(codes.Internal, - agent.CreateIndexRPCName+" API connection not found"), - }, - } - }(), - func() test { - addrs := []string{ - "127.0.0.1:8080", - } - return test{ - name: "Fail: When the OrderedRangeConcurrent method returns a gRPC client conn not found error", - args: args{ - ctx: context.Background(), - }, - - fields: fields{ - client: &mockDiscovererClient{ - GetAddrsFunc: func(_ context.Context) []string { - return addrs - }, - GetClientFunc: func() grpc.Client { - return &mockGrpcClient{ - OrderedRangeConcurrentFunc: func(_ context.Context, _ []string, _ int, - _ func(_ context.Context, _ string, _ *grpc.ClientConn, _ ...grpc.CallOption) error, - ) error { - return errors.ErrGRPCClientConnNotFound("*") - }, - } - }, - }, - }, - want: want{ - err: status.Error(codes.Internal, - agent.CreateIndexRPCName+" API connection not found"), - }, - } - }(), - func() test { - targetAddrs := []string{ - "127.0.0.1:8080", - } - targetAddrList := map[string]bool{ - targetAddrs[0]: true, - } - return test{ - name: "Fail: when there is no address matching targetAddrList", - args: args{ - ctx: context.Background(), - }, - fields: fields{ - client: &mockDiscovererClient{ - GetAddrsFunc: func(_ context.Context) []string { - return nil - }, - }, - targetAddrs: targetAddrs, - targetAddrList: targetAddrList, - }, - want: want{ - err: status.Error(codes.Internal, - agent.CreateIndexRPCName+" API connection target address \"127.0.0.1:8080\" not found"), - }, - } - }(), - } - - for _, tc := range tests { - test := tc - t.Run(test.name, func(tt *testing.T) { - tt.Parallel() - defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) - if test.beforeFunc != nil { - test.beforeFunc(tt, test.args) - } - if test.afterFunc != nil { - defer test.afterFunc(tt, test.args) - } - checkFunc := test.checkFunc - if test.checkFunc == nil { - checkFunc = defaultCheckFunc - } - idx := &index{ - client: test.fields.client, - targetAddrs: test.fields.targetAddrs, - targetAddrList: test.fields.targetAddrList, - creationPoolSize: test.fields.creationPoolSize, - concurrency: test.fields.concurrency, - } - - err := idx.Start(test.args.ctx) - if err := checkFunc(test.want, err); err != nil { - tt.Errorf("error = %v", err) - } - }) - } -} - -// NOT IMPLEMENTED BELOW -// -// func TestNew(t *testing.T) { -// type args struct { -// opts []Option -// } -// type want struct { -// want Indexer -// err error -// } -// type test struct { -// name string -// args args -// want want -// checkFunc func(want, Indexer, error) error -// beforeFunc func(*testing.T, args) -// afterFunc func(*testing.T, args) -// } -// defaultCheckFunc := func(w want, got Indexer, err error) error { -// if !errors.Is(err, w.err) { -// return errors.Errorf("got_error: \"%#v\",\n\t\t\t\twant: \"%#v\"", err, w.err) -// } -// if !reflect.DeepEqual(got, w.want) { -// return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", got, w.want) -// } -// return nil -// } -// tests := []test{ -// // TODO test cases -// /* -// { -// name: "test_case_1", -// args: args { -// opts:nil, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// }, -// */ -// -// // TODO test cases -// /* -// func() test { -// return test { -// name: "test_case_2", -// args: args { -// opts:nil, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// } -// }(), -// */ -// } -// -// for _, tc := range tests { -// test := tc -// t.Run(test.name, func(tt *testing.T) { -// tt.Parallel() -// defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) -// if test.beforeFunc != nil { -// test.beforeFunc(tt, test.args) -// } -// if test.afterFunc != nil { -// defer test.afterFunc(tt, test.args) -// } -// checkFunc := test.checkFunc -// if test.checkFunc == nil { -// checkFunc = defaultCheckFunc -// } -// -// got, err := New(test.args.opts...) -// if err := checkFunc(test.want, got, err); err != nil { -// tt.Errorf("error = %v", err) -// } -// -// }) -// } -// } -// -// func Test_index_PreStart(t *testing.T) { -// type args struct { -// ctx context.Context -// } -// type fields struct { -// client discoverer.Client -// targetAddrs []string -// targetAddrList map[string]bool -// creationPoolSize uint32 -// concurrency int -// } -// type want struct { -// want <-chan error -// err error -// } -// type test struct { -// name string -// args args -// fields fields -// want want -// checkFunc func(want, <-chan error, error) error -// beforeFunc func(*testing.T, args) -// afterFunc func(*testing.T, args) -// } -// defaultCheckFunc := func(w want, got <-chan error, err error) error { -// if !errors.Is(err, w.err) { -// return errors.Errorf("got_error: \"%#v\",\n\t\t\t\twant: \"%#v\"", err, w.err) -// } -// if !reflect.DeepEqual(got, w.want) { -// return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", got, w.want) -// } -// return nil -// } -// tests := []test{ -// // TODO test cases -// /* -// { -// name: "test_case_1", -// args: args { -// ctx:nil, -// }, -// fields: fields { -// client:nil, -// targetAddrs:nil, -// targetAddrList:nil, -// creationPoolSize:0, -// concurrency:0, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// }, -// */ -// -// // TODO test cases -// /* -// func() test { -// return test { -// name: "test_case_2", -// args: args { -// ctx:nil, -// }, -// fields: fields { -// client:nil, -// targetAddrs:nil, -// targetAddrList:nil, -// creationPoolSize:0, -// concurrency:0, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// } -// }(), -// */ -// } -// -// for _, tc := range tests { -// test := tc -// t.Run(test.name, func(tt *testing.T) { -// tt.Parallel() -// defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) -// if test.beforeFunc != nil { -// test.beforeFunc(tt, test.args) -// } -// if test.afterFunc != nil { -// defer test.afterFunc(tt, test.args) -// } -// checkFunc := test.checkFunc -// if test.checkFunc == nil { -// checkFunc = defaultCheckFunc -// } -// idx := &index{ -// client: test.fields.client, -// targetAddrs: test.fields.targetAddrs, -// targetAddrList: test.fields.targetAddrList, -// creationPoolSize: test.fields.creationPoolSize, -// concurrency: test.fields.concurrency, -// } -// -// got, err := idx.PreStart(test.args.ctx) -// if err := checkFunc(test.want, got, err); err != nil { -// tt.Errorf("error = %v", err) -// } -// -// }) -// } -// } diff --git a/pkg/index/job/creation/service/mock_test.go b/pkg/index/job/creation/service/mock_test.go deleted file mode 100644 index 89c0f19aa5..0000000000 --- a/pkg/index/job/creation/service/mock_test.go +++ /dev/null @@ -1,57 +0,0 @@ -// Copyright (C) 2019-2023 vdaas.org vald team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// You may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package service - -import ( - "context" - - "github.com/vdaas/vald/internal/client/v1/client/discoverer" - "github.com/vdaas/vald/internal/net/grpc" -) - -type mockDiscovererClient struct { - discoverer.Client - GetAddrsFunc func(ctx context.Context) []string - GetClientFunc func() grpc.Client -} - -func (mc *mockDiscovererClient) GetAddrs(ctx context.Context) []string { - return mc.GetAddrsFunc(ctx) -} - -func (mc *mockDiscovererClient) GetClient() grpc.Client { - return mc.GetClientFunc() -} - -type mockGrpcClient struct { - grpc.Client - OrderedRangeConcurrentFunc func(ctx context.Context, - order []string, - concurrency int, - f func(ctx context.Context, - addr string, - conn *grpc.ClientConn, - copts ...grpc.CallOption) error) error -} - -func (mc *mockGrpcClient) OrderedRangeConcurrent(ctx context.Context, - order []string, - concurrency int, - f func(ctx context.Context, - addr string, - conn *grpc.ClientConn, - copts ...grpc.CallOption) error, -) error { - return mc.OrderedRangeConcurrentFunc(ctx, order, concurrency, f) -} diff --git a/pkg/index/job/creation/service/options.go b/pkg/index/job/creation/service/options.go deleted file mode 100644 index b6c7322a25..0000000000 --- a/pkg/index/job/creation/service/options.go +++ /dev/null @@ -1,70 +0,0 @@ -// Copyright (C) 2019-2023 vdaas.org vald team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// You may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package service - -import ( - "github.com/vdaas/vald/internal/client/v1/client/discoverer" - "github.com/vdaas/vald/internal/errors" -) - -// Option represents the functional option for index. -type Option func(_ *index) error - -var defaultOpts = []Option{ - WithIndexingConcurrency(1), - WithCreationPoolSize(1000), -} - -// WithDiscoverer returns Option that sets discoverer client. -func WithDiscoverer(client discoverer.Client) Option { - return func(idx *index) error { - if client == nil { - return errors.NewErrCriticalOption("discoverer", client) - } - idx.client = client - return nil - } -} - -// WithIndexingConcurrency returns Option that sets indexing concurrency. -func WithIndexingConcurrency(num int) Option { - return func(idx *index) error { - if num <= 0 { - return errors.NewErrInvalidOption("indexingConcurrency", num) - } - idx.concurrency = num - return nil - } -} - -// WithCreationPoolSize returns Option that sets indexing pool size. -func WithCreationPoolSize(size uint32) Option { - return func(idx *index) error { - if size <= 0 { - return errors.NewErrInvalidOption("creationPoolSize", size) - } - idx.creationPoolSize = size - return nil - } -} - -// WithTargetAddrs returns Option that sets indexing target addresses. -func WithTargetAddrs(addrs ...string) Option { - return func(idx *index) error { - if len(addrs) != 0 { - idx.targetAddrs = append(idx.targetAddrs, addrs...) - } - return nil - } -} diff --git a/pkg/index/job/creation/service/options_test.go b/pkg/index/job/creation/service/options_test.go deleted file mode 100644 index 56b7b34cd8..0000000000 --- a/pkg/index/job/creation/service/options_test.go +++ /dev/null @@ -1,360 +0,0 @@ -// Copyright (C) 2019-2023 vdaas.org vald team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// You may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package service - -// NOT IMPLEMENTED BELOW -// -// func TestWithDiscoverer(t *testing.T) { -// type args struct { -// client discoverer.Client -// } -// type want struct { -// want Option -// } -// type test struct { -// name string -// args args -// want want -// checkFunc func(want, Option) error -// beforeFunc func(*testing.T, args) -// afterFunc func(*testing.T, args) -// } -// defaultCheckFunc := func(w want, got Option) error { -// if !reflect.DeepEqual(got, w.want) { -// return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", got, w.want) -// } -// return nil -// } -// tests := []test{ -// // TODO test cases -// /* -// { -// name: "test_case_1", -// args: args { -// client:nil, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// }, -// */ -// -// // TODO test cases -// /* -// func() test { -// return test { -// name: "test_case_2", -// args: args { -// client:nil, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// } -// }(), -// */ -// } -// -// for _, tc := range tests { -// test := tc -// t.Run(test.name, func(tt *testing.T) { -// tt.Parallel() -// defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) -// if test.beforeFunc != nil { -// test.beforeFunc(tt, test.args) -// } -// if test.afterFunc != nil { -// defer test.afterFunc(tt, test.args) -// } -// checkFunc := test.checkFunc -// if test.checkFunc == nil { -// checkFunc = defaultCheckFunc -// } -// -// got := WithDiscoverer(test.args.client) -// if err := checkFunc(test.want, got); err != nil { -// tt.Errorf("error = %v", err) -// } -// -// }) -// } -// } -// -// func TestWithIndexingConcurrency(t *testing.T) { -// type args struct { -// num int -// } -// type want struct { -// want Option -// } -// type test struct { -// name string -// args args -// want want -// checkFunc func(want, Option) error -// beforeFunc func(*testing.T, args) -// afterFunc func(*testing.T, args) -// } -// defaultCheckFunc := func(w want, got Option) error { -// if !reflect.DeepEqual(got, w.want) { -// return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", got, w.want) -// } -// return nil -// } -// tests := []test{ -// // TODO test cases -// /* -// { -// name: "test_case_1", -// args: args { -// num:0, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// }, -// */ -// -// // TODO test cases -// /* -// func() test { -// return test { -// name: "test_case_2", -// args: args { -// num:0, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// } -// }(), -// */ -// } -// -// for _, tc := range tests { -// test := tc -// t.Run(test.name, func(tt *testing.T) { -// tt.Parallel() -// defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) -// if test.beforeFunc != nil { -// test.beforeFunc(tt, test.args) -// } -// if test.afterFunc != nil { -// defer test.afterFunc(tt, test.args) -// } -// checkFunc := test.checkFunc -// if test.checkFunc == nil { -// checkFunc = defaultCheckFunc -// } -// -// got := WithIndexingConcurrency(test.args.num) -// if err := checkFunc(test.want, got); err != nil { -// tt.Errorf("error = %v", err) -// } -// -// }) -// } -// } -// -// func TestWithCreationPoolSize(t *testing.T) { -// type args struct { -// size uint32 -// } -// type want struct { -// want Option -// } -// type test struct { -// name string -// args args -// want want -// checkFunc func(want, Option) error -// beforeFunc func(*testing.T, args) -// afterFunc func(*testing.T, args) -// } -// defaultCheckFunc := func(w want, got Option) error { -// if !reflect.DeepEqual(got, w.want) { -// return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", got, w.want) -// } -// return nil -// } -// tests := []test{ -// // TODO test cases -// /* -// { -// name: "test_case_1", -// args: args { -// size:0, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// }, -// */ -// -// // TODO test cases -// /* -// func() test { -// return test { -// name: "test_case_2", -// args: args { -// size:0, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// } -// }(), -// */ -// } -// -// for _, tc := range tests { -// test := tc -// t.Run(test.name, func(tt *testing.T) { -// tt.Parallel() -// defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) -// if test.beforeFunc != nil { -// test.beforeFunc(tt, test.args) -// } -// if test.afterFunc != nil { -// defer test.afterFunc(tt, test.args) -// } -// checkFunc := test.checkFunc -// if test.checkFunc == nil { -// checkFunc = defaultCheckFunc -// } -// -// got := WithCreationPoolSize(test.args.size) -// if err := checkFunc(test.want, got); err != nil { -// tt.Errorf("error = %v", err) -// } -// -// }) -// } -// } -// -// func TestWithTargetAddrs(t *testing.T) { -// type args struct { -// addrs []string -// } -// type want struct { -// want Option -// } -// type test struct { -// name string -// args args -// want want -// checkFunc func(want, Option) error -// beforeFunc func(*testing.T, args) -// afterFunc func(*testing.T, args) -// } -// defaultCheckFunc := func(w want, got Option) error { -// if !reflect.DeepEqual(got, w.want) { -// return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", got, w.want) -// } -// return nil -// } -// tests := []test{ -// // TODO test cases -// /* -// { -// name: "test_case_1", -// args: args { -// addrs:nil, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// }, -// */ -// -// // TODO test cases -// /* -// func() test { -// return test { -// name: "test_case_2", -// args: args { -// addrs:nil, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// } -// }(), -// */ -// } -// -// for _, tc := range tests { -// test := tc -// t.Run(test.name, func(tt *testing.T) { -// tt.Parallel() -// defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) -// if test.beforeFunc != nil { -// test.beforeFunc(tt, test.args) -// } -// if test.afterFunc != nil { -// defer test.afterFunc(tt, test.args) -// } -// checkFunc := test.checkFunc -// if test.checkFunc == nil { -// checkFunc = defaultCheckFunc -// } -// -// got := WithTargetAddrs(test.args.addrs...) -// if err := checkFunc(test.want, got); err != nil { -// tt.Errorf("error = %v", err) -// } -// -// }) -// } -// } diff --git a/pkg/index/job/creation/usecase/creation.go b/pkg/index/job/creation/usecase/creation.go deleted file mode 100644 index 7afa0f1958..0000000000 --- a/pkg/index/job/creation/usecase/creation.go +++ /dev/null @@ -1,214 +0,0 @@ -// Copyright (C) 2019-2023 vdaas.org vald team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// You may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package usecase - -import ( - "context" - "os" - "syscall" - - "github.com/vdaas/vald/internal/client/v1/client/discoverer" - iconfig "github.com/vdaas/vald/internal/config" - "github.com/vdaas/vald/internal/errors" - "github.com/vdaas/vald/internal/log" - "github.com/vdaas/vald/internal/net/grpc" - "github.com/vdaas/vald/internal/net/grpc/interceptor/server/recover" - "github.com/vdaas/vald/internal/observability" - "github.com/vdaas/vald/internal/runner" - "github.com/vdaas/vald/internal/safety" - "github.com/vdaas/vald/internal/servers/server" - "github.com/vdaas/vald/internal/servers/starter" - "github.com/vdaas/vald/internal/sync/errgroup" - "github.com/vdaas/vald/pkg/index/job/creation/config" - "github.com/vdaas/vald/pkg/index/job/creation/service" -) - -type run struct { - eg errgroup.Group - cfg *config.Data - observability observability.Observability - server starter.Server - indexer service.Indexer -} - -// New returns Runner instance. -func New(cfg *config.Data) (_ runner.Runner, err error) { - eg := errgroup.Get() - - dOpts, err := cfg.Creation.Discoverer.Client.Opts() - if err != nil { - return nil, err - } - // skipcq: CRT-D0001 - dOpts = append(dOpts, grpc.WithErrGroup(eg)) - - acOpts, err := cfg.Creation.Discoverer.AgentClientOptions.Opts() - if err != nil { - return nil, err - } - // skipcq: CRT-D0001 - acOpts = append(acOpts, grpc.WithErrGroup(eg)) - - discoverer, err := discoverer.New( - discoverer.WithAutoConnect(true), - discoverer.WithName(cfg.Creation.AgentName), - discoverer.WithNamespace(cfg.Creation.AgentNamespace), - discoverer.WithPort(cfg.Creation.AgentPort), - discoverer.WithServiceDNSARecord(cfg.Creation.AgentDNS), - discoverer.WithDiscovererClient(grpc.New(dOpts...)), - discoverer.WithDiscoverDuration(cfg.Creation.Discoverer.Duration), - discoverer.WithOptions(acOpts...), - discoverer.WithNodeName(cfg.Creation.NodeName), - discoverer.WithOnDiscoverFunc(func(ctx context.Context, c discoverer.Client, addrs []string) error { - last := len(addrs) - 1 - for i := 0; i < len(addrs)/2; i++ { - addrs[i], addrs[last-i] = addrs[last-i], addrs[i] - } - return nil - }), - ) - if err != nil { - return nil, err - } - - indexer, err := service.New( - service.WithDiscoverer(discoverer), - service.WithIndexingConcurrency(cfg.Creation.Concurrency), - service.WithCreationPoolSize(cfg.Creation.CreationPoolSize), - service.WithTargetAddrs(cfg.Creation.TargetAddrs...), - ) - if err != nil { - return nil, err - } - - srv, err := starter.New( - starter.WithConfig(cfg.Server), - starter.WithGRPC(func(cfg *iconfig.Server) []server.Option { - return []server.Option{ - server.WithGRPCOption( - grpc.ChainUnaryInterceptor(recover.RecoverInterceptor()), - grpc.ChainStreamInterceptor(recover.RecoverStreamInterceptor()), - ), - } - }), - ) - if err != nil { - return nil, err - } - - var obs observability.Observability - if cfg.Observability.Enabled { - obs, err = observability.NewWithConfig( - cfg.Observability, - ) - if err != nil { - return nil, err - } - } - - return &run{ - eg: eg, - cfg: cfg, - observability: obs, - server: srv, - indexer: indexer, - }, nil -} - -// PreStart is a method called before execution of Start, and it invokes the PreStart method of observability. -func (r *run) PreStart(ctx context.Context) error { - if r.observability != nil { - return r.observability.PreStart(ctx) - } - return nil -} - -// Start is a method used to initiate an operation in the run, and it returns a channel for receiving errors -// during the operation and an error representing any initialization errors. -func (r *run) Start(ctx context.Context) (<-chan error, error) { - ech := make(chan error, 4) - var sech, oech <-chan error - if r.observability != nil { - oech = r.observability.Start(ctx) - } - sech = r.server.ListenAndServe(ctx) - ipech, err := r.indexer.PreStart(ctx) - if err != nil { - close(ech) - return nil, err - } - - r.eg.Go(safety.RecoverFunc(func() (err error) { - defer func() { - p, err := os.FindProcess(os.Getpid()) - if err != nil { - // using Fatal to avoid this process to be zombie - // skipcq: RVV-A0003 - log.Fatalf("failed to find my pid to kill %v", err) - return - } - log.Info("sending SIGTERM to myself to stop this job") - if err := p.Signal(syscall.SIGTERM); err != nil { - log.Error(err) - } - }() - return r.indexer.Start(ctx) - })) - - r.eg.Go(safety.RecoverFunc(func() (err error) { - defer close(ech) - for { - select { - case <-ctx.Done(): - return ctx.Err() - case err = <-oech: - case err = <-sech: - case err = <-ipech: - } - if err != nil { - select { - case <-ctx.Done(): - return errors.Join(ctx.Err(), err) - case ech <- err: - } - } - } - })) - return ech, nil -} - -// PreStop is a method called before execution of Stop. -func (*run) PreStop(_ context.Context) error { - return nil -} - -// Stop is a method used to stop an operation in the run. -func (r *run) Stop(ctx context.Context) (errs error) { - if r.observability != nil { - if err := r.observability.Stop(ctx); err != nil { - errs = errors.Join(errs, err) - } - } - if r.server != nil { - if err := r.server.Shutdown(ctx); err != nil { - errs = errors.Join(errs, err) - } - } - return errs -} - -// PtopStop is a method called after execution of Stop. -func (*run) PostStop(_ context.Context) error { - return nil -} diff --git a/pkg/index/job/creation/usecase/creation_test.go b/pkg/index/job/creation/usecase/creation_test.go deleted file mode 100644 index 1eab852c2d..0000000000 --- a/pkg/index/job/creation/usecase/creation_test.go +++ /dev/null @@ -1,106 +0,0 @@ -// Copyright (C) 2019-2023 vdaas.org vald team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// You may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package usecase - -// NOT IMPLEMENTED BELOW -// -// func TestNew(t *testing.T) { -// type args struct { -// cfg *config.Data -// } -// type want struct { -// want runner.Runner -// err error -// } -// type test struct { -// name string -// args args -// want want -// checkFunc func(want, runner.Runner, error) error -// beforeFunc func(*testing.T, args) -// afterFunc func(*testing.T, args) -// } -// defaultCheckFunc := func(w want, got runner.Runner, err error) error { -// if !errors.Is(err, w.err) { -// return errors.Errorf("got_error: \"%#v\",\n\t\t\t\twant: \"%#v\"", err, w.err) -// } -// if !reflect.DeepEqual(got, w.want) { -// return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", got, w.want) -// } -// return nil -// } -// tests := []test{ -// // TODO test cases -// /* -// { -// name: "test_case_1", -// args: args { -// cfg:nil, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// }, -// */ -// -// // TODO test cases -// /* -// func() test { -// return test { -// name: "test_case_2", -// args: args { -// cfg:nil, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// } -// }(), -// */ -// } -// -// for _, tc := range tests { -// test := tc -// t.Run(test.name, func(tt *testing.T) { -// tt.Parallel() -// defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) -// if test.beforeFunc != nil { -// test.beforeFunc(tt, test.args) -// } -// if test.afterFunc != nil { -// defer test.afterFunc(tt, test.args) -// } -// checkFunc := test.checkFunc -// if test.checkFunc == nil { -// checkFunc = defaultCheckFunc -// } -// -// got, err := New(test.args.cfg) -// if err := checkFunc(test.want, got, err); err != nil { -// tt.Errorf("error = %v", err) -// } -// -// }) -// } -// }