diff --git a/.github/workflows/dockers-index-correction.yml b/.github/workflows/dockers-index-correction.yml deleted file mode 100644 index 3d6ada85a4..0000000000 --- a/.github/workflows/dockers-index-correction.yml +++ /dev/null @@ -1,78 +0,0 @@ -# -# Copyright (C) 2019-2023 vdaas.org vald team -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# You may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -name: "Build docker image: index-correction" -on: - push: - branches: - - main - tags: - - "*.*.*" - - "v*.*.*" - - "*.*.*-*" - - "v*.*.*-*" - paths: - - ".github/actions/docker-build/actions.yaml" - - ".github/workflows/dockers-index-correction.yml" - - "go.mod" - - "go.sum" - - "internal/**" - - "!internal/**/*_test.go" - - "!internal/db/**" - - "!internal/k8s/**" - - "apis/grpc/**" - - "pkg/index/job/correction/**" - - "cmd/index/job/correction/**" - - "dockers/index/job/correction/Dockerfile" - - "versions/GO_VERSION" - pull_request: - paths: - - ".github/actions/docker-build/actions.yaml" - - ".github/workflows/_docker-image.yaml" - - ".github/workflows/dockers-index-correction.yml" - - "go.mod" - - "go.sum" - - "internal/**" - - "!internal/**/*_test.go" - - "!internal/db/**" - - "!internal/k8s/**" - - "apis/grpc/**" - - "pkg/index/job/correction/**" - - "cmd/index/job/correction/**" - - "dockers/index/job/correction/Dockerfile" - - "versions/GO_VERSION" - pull_request_target: - paths: - - ".github/actions/docker-build/actions.yaml" - - ".github/workflows/_docker-image.yaml" - - ".github/workflows/dockers-index-correction.yml" - - "go.mod" - - "go.sum" - - "internal/**" - - "!internal/**/*_test.go" - - "!internal/db/**" - - "!internal/k8s/**" - - "apis/grpc/**" - - "pkg/index/job/correction/**" - - "cmd/index/job/correction/**" - - "dockers/index/job/correction/Dockerfile" - - "versions/GO_VERSION" - -jobs: - build: - uses: ./.github/workflows/_docker-image.yaml - with: - target: index-correction - secrets: inherit diff --git a/.golangci.yml b/.golangci.yml index 065e2b5550..a3bb179b38 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -51,6 +51,7 @@ linters: - gochecknoinits - goconst - godot + - godox - gofumpt - goimports - gomnd @@ -98,7 +99,6 @@ linters: # - gocognit # - gocritic # - gocyclo - # - godox # - goerr113 # - gofmt # - goheader diff --git a/Makefile b/Makefile index d859cef45d..03ec84dfa7 100644 --- a/Makefile +++ b/Makefile @@ -30,7 +30,6 @@ FILTER_GATEWAY_IMAGE = $(NAME)-filter-gateway HELM_OPERATOR_IMAGE = $(NAME)-helm-operator LB_GATEWAY_IMAGE = $(NAME)-lb-gateway LOADTEST_IMAGE = $(NAME)-loadtest -INDEX_CORRECTION_IMAGE = $(NAME)-index-correction MANAGER_INDEX_IMAGE = $(NAME)-manager-index MAINTAINER = "$(ORG).org $(NAME) team <$(NAME)@$(ORG).org>" diff --git a/Makefile.d/build.mk b/Makefile.d/build.mk index 5f0d2089e1..580cf34599 100644 --- a/Makefile.d/build.mk +++ b/Makefile.d/build.mk @@ -206,35 +206,6 @@ cmd/manager/index/index: \ $(dir $@)main.go $@ -version -cmd/index/job/correction/index-correction: \ - $(GO_SOURCES_INTERNAL) \ - $(PBGOS) \ - $(shell find $(ROOTDIR)/cmd/index/job/correction/correction -type f -name '*.go' -not -name '*_test.go' -not -name 'doc.go') \ - $(shell find $(ROOTDIR)/pkg/index/job/correction -type f -name '*.go' -not -name '*_test.go' -not -name 'doc.go') - $(eval CGO_ENABLED = 0) - CGO_ENABLED=$(CGO_ENABLED) \ - GO111MODULE=on \ - GOPRIVATE=$(GOPRIVATE) \ - go build \ - --ldflags "-w -extldflags=-static \ - -X '$(GOPKG)/internal/info.Version=$(VERSION)' \ - -X '$(GOPKG)/internal/info.GitCommit=$(GIT_COMMIT)' \ - -X '$(GOPKG)/internal/info.BuildTime=$(DATETIME)' \ - -X '$(GOPKG)/internal/info.GoVersion=$(GO_VERSION)' \ - -X '$(GOPKG)/internal/info.GoOS=$(GOOS)' \ - -X '$(GOPKG)/internal/info.GoArch=$(GOARCH)' \ - -X '$(GOPKG)/internal/info.CGOEnabled=$(CGO_ENABLED)' \ - -X '$(GOPKG)/internal/info.BuildCPUInfoFlags=$(CPU_INFO_FLAGS)' \ - -buildid=" \ - -mod=readonly \ - -modcacherw \ - -a \ - -tags "osusergo netgo static_build" \ - -trimpath \ - -o $@ \ - $(dir $@)main.go - $@ -version - .PHONY: binary/build/zip ## build all binaries and zip them binary/build/zip: \ diff --git a/Makefile.d/docker.mk b/Makefile.d/docker.mk index 5771c857e6..0997f7e1ef 100644 --- a/Makefile.d/docker.mk +++ b/Makefile.d/docker.mk @@ -188,17 +188,3 @@ docker/build/loadtest: -t $(ORG)/$(LOADTEST_IMAGE):$(TAG) . \ --build-arg MAINTAINER=$(MAINTAINER) \ --build-arg GO_VERSION=$(GO_VERSION) - -.PHONY: docker/name/index-correction -docker/name/index-correction: - @echo "$(ORG)/$(INDEX_CORRECTION_IMAGE)" - -.PHONY: docker/build/index-correction -## build index-correction image -docker/build/index-correction: - $(DOCKER) build \ - $(DOCKER_OPTS) \ - -f dockers/index/job/correction/Dockerfile \ - -t $(ORG)/$(INDEX_CORRECTION_IMAGE):$(TAG) . \ - --build-arg MAINTAINER=$(MAINTAINER) \ - --build-arg GO_VERSION=$(GO_VERSION) diff --git a/charts/vald/values.yaml b/charts/vald/values.yaml index 0cc8c02bbb..b177e5a4e1 100644 --- a/charts/vald/values.yaml +++ b/charts/vald/values.yaml @@ -2631,38 +2631,3 @@ manager: net: dialer: keepalive: 15m #indexer fetches uncommitted index length, which includes huge payload so we need to set keepalive longer than usual - # @schema {"name": "manager.index.corrector", "type": "object"} - corrector: - # @schema {"name": "manager.index.corrector.enabled", "type": "boolean"} - # manager.index.corrector.enabled -- enable index correction CronJob - enabled: false - # @schema {"name": "manager.index.corrector.check_duration", "type": "string"} - # manager.index.corrector.enabled -- check duration of index correction CronJob - check_duration: 24h - # @schema {"name": "manager.index.corrector.stream_list_concurrency", "type": "integer", "minimum": 1} - # manager.index.corrector.stream_list_concurrency -- concurrency for stream list object rpc - stream_list_concurrency: 200 - # @schema {"name": "manager.index.corrector.bbolt_async_write_concurrency", "type": "integer", "minimum": 1} - # manager.index.corrector.bbolt_async_write_concurrency -- concurrency for bbolt async write - bbolt_async_write_concurrency: 2048 - # @schema {"name": "manager.index.corrector.agent_namespace", "type": "string"} - # manager.index.corrector.agent_namespace -- namespace of agent pods to manage - agent_namespace: _MY_POD_NAMESPACE_ - # @schema {"name": "manager.index.corrector.node_name", "type": "string"} - # manager.index.corrector.node_name -- node name - node_name: "" # _MY_NODE_NAME_ - # @schema {"name": "manager.index.corrector.discoverer", "type": "object"} - discoverer: - # @schema {"name": "manager.index.corrector.discoverer.duration", "type": "string"} - # manager.index.corrector.discoverer.duration -- refresh duration to discover - duration: 500ms - # @schema {"name": "manager.index.corrector.discoverer.client", "alias": "grpc.client"} - # manager.index.corrector.discoverer.client -- gRPC client for discoverer (overrides defaults.grpc.client) - client: {} - # @schema {"name": "manager.index.corrector.discoverer.agent_client_options", "alias": "grpc.client"} - # manager.index.corrector.discoverer.agent_client_options -- gRPC client options for agents (overrides defaults.grpc.client) - agent_client_options: - dial_option: - net: - dialer: - keepalive: 15m #indexer fetches uncommitted index length, which includes huge payload so we need to set keepalive longer than usual diff --git a/cmd/index/job/correction/main.go b/cmd/index/job/correction/main.go deleted file mode 100644 index d549626691..0000000000 --- a/cmd/index/job/correction/main.go +++ /dev/null @@ -1,59 +0,0 @@ -// Copyright (C) 2019-2023 vdaas.org vald team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// You may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package main - -import ( - "context" - - "github.com/vdaas/vald/internal/errors" - "github.com/vdaas/vald/internal/info" - "github.com/vdaas/vald/internal/log" - "github.com/vdaas/vald/internal/runner" - "github.com/vdaas/vald/internal/safety" - "github.com/vdaas/vald/pkg/index/job/correction/config" - "github.com/vdaas/vald/pkg/index/job/correction/usecase" -) - -const ( - maxVersion = "v0.0.10" - minVersion = "v0.0.0" - name = "index correction job" -) - -func main() { - if err := safety.RecoverFunc(func() error { - return runner.Do( - context.Background(), - runner.WithName(name), - runner.WithVersion(info.Version, maxVersion, minVersion), - runner.WithConfigLoader(func(path string) (interface{}, *config.GlobalConfig, error) { - cfg, err := config.NewConfig(path) - if err != nil { - return nil, nil, errors.Wrap(err, "failed to load "+name+"'s configuration") - } - return cfg, &cfg.GlobalConfig, nil - }), - runner.WithDaemonInitializer(func(cfg interface{}) (runner.Runner, error) { - c, ok := cfg.(*config.Data) - if !ok { - return nil, errors.ErrInvalidConfig - } - return usecase.New(c) - }), - ) - })(); err != nil { - log.Fatal(err, info.Get()) - return - } -} diff --git a/cmd/index/job/correction/sample.yaml b/cmd/index/job/correction/sample.yaml deleted file mode 100644 index 09ad7dc5ca..0000000000 --- a/cmd/index/job/correction/sample.yaml +++ /dev/null @@ -1,234 +0,0 @@ -# -# Copyright (C) 2019-2023 vdaas.org vald team -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# You may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - ---- -version: v0.0.0 -time_zone: JST -logging: - format: raw - level: info - logger: glg -server_config: - servers: - - name: grpc - host: 0.0.0.0 - port: 8081 - grpc: - bidirectional_stream_concurrency: 20 - connection_timeout: "" - header_table_size: 0 - initial_conn_window_size: 0 - initial_window_size: 0 - interceptors: [] - keepalive: - max_conn_age: "" - max_conn_age_grace: "" - max_conn_idle: "" - time: "" - timeout: "" - max_header_list_size: 0 - max_receive_message_size: 0 - max_send_message_size: 0 - read_buffer_size: 0 - write_buffer_size: 0 - mode: GRPC - probe_wait_time: 3s - restart: true - health_check_servers: - - name: readiness - host: 0.0.0.0 - port: 3001 - http: - handler_timeout: "" - idle_timeout: "" - read_header_timeout: "" - read_timeout: "" - shutdown_duration: 0s - write_timeout: "" - mode: "" - probe_wait_time: 3s - metrics_servers: - startup_strategy: - - grpc - - readiness - full_shutdown_duration: 600s - tls: - ca: /path/to/ca - cert: /path/to/cert - enabled: false - key: /path/to/key -gateway: - index_replica: 3 -corrector: - agent_port: 8081 - agent_name: "vald-agent-ngt" - agent_dns: vald-agent-ngt.default.svc.cluster.local - agent_namespace: "default" - node_name: "" - stream_list_concurrency: 200 - bbolt_async_write_concurrency: 2048 - discoverer: - duration: 500ms - client: - addrs: - - vald-discoverer.default.svc.cluster.local:8081 - health_check_duration: "1s" - connection_pool: - enable_dns_resolver: true - enable_rebalance: true - old_conn_close_duration: 3s - rebalance_duration: 30m - size: 3 - backoff: - backoff_factor: 1.1 - backoff_time_limit: 5s - enable_error_log: true - initial_duration: 5ms - jitter_limit: 100ms - maximum_duration: 5s - retry_count: 100 - call_option: - max_recv_msg_size: 0 - max_retry_rpc_buffer_size: 0 - max_send_msg_size: 0 - wait_for_ready: true - dial_option: - backoff_base_delay: 1s - backoff_jitter: 0.2 - backoff_max_delay: 120s - backoff_multiplier: 1.6 - enable_backoff: false - initial_connection_window_size: 0 - initial_window_size: 0 - insecure: true - keepalive: - permit_without_stream: false - time: "" - timeout: "" - max_msg_size: 0 - min_connection_timeout: 20s - read_buffer_size: 0 - tcp: - dialer: - dual_stack_enabled: true - keepalive: "" - timeout: "" - dns: - cache_enabled: true - cache_expiration: 1h - refresh_duration: 30m - tls: - ca: /path/to/ca - cert: /path/to/cert - enabled: false - key: /path/to/key - timeout: "" - write_buffer_size: 0 - tls: - ca: /path/to/ca - cert: /path/to/cert - enabled: false - key: /path/to/key - agent_client_options: - addrs: [] - health_check_duration: "1s" - connection_pool: - enable_dns_resolver: true - enable_rebalance: true - old_conn_close_duration: 3s - rebalance_duration: 30m - size: 3 - backoff: - backoff_factor: 1.1 - backoff_time_limit: 5s - enable_error_log: true - initial_duration: 5ms - jitter_limit: 100ms - maximum_duration: 5s - retry_count: 100 - call_option: - max_recv_msg_size: 0 - max_retry_rpc_buffer_size: 0 - max_send_msg_size: 0 - wait_for_ready: true - dial_option: - write_buffer_size: 0 - read_buffer_size: 0 - initial_window_size: 0 - initial_connection_window_size: 0 - max_msg_size: 0 - backoff_max_delay: "120s" - backoff_base_delay: "1s" - backoff_multiplier: 1.6 - backoff_jitter: 0.2 - min_connection_timeout: "20s" - enable_backoff: false - insecure: true - timeout: "" - tcp: - dns: - cache_enabled: true - cache_expiration: 1h - refresh_duration: 30m - dialer: - timeout: "" - keepalive: "15m" - dual_stack_enabled: true - tls: - ca: /path/to/ca - cert: /path/to/cert - enabled: false - key: /path/to/key - keepalive: - permit_without_stream: false - time: "" - timeout: "" - tls: - ca: /path/to/ca - cert: /path/to/cert - enabled: false - key: /path/to/key -observability: - enabled: false - otlp: - collector_endpoint: "otel-collector.monitoring.svc.cluster.local:4317" - trace_batch_timeout: "1s" - trace_export_timeout: "1m" - trace_max_export_batch_size: 1024 - trace_max_queue_size: 256 - metrics_export_interval: "1s" - metrics_export_timeout: "1m" - attribute: - namespace: "_MY_POD_NAMESPACE_" - pod_name: "_MY_POD_NAME_" - node_name: "_MY_NODE_NAME_" - service_name: "vald-index-job-correction" - metrics: - enable_cgo: true - enable_goroutine: true - enable_memory: true - enable_version_info: true - version_info_labels: - - vald_version - - server_name - - git_commit - - build_time - - go_version - - go_os - - go_arch - - ngt_version - trace: - enabled: true diff --git a/dockers/index/job/correction/Dockerfile b/dockers/index/job/correction/Dockerfile deleted file mode 100644 index 1938e2531f..0000000000 --- a/dockers/index/job/correction/Dockerfile +++ /dev/null @@ -1,93 +0,0 @@ -# -# Copyright (C) 2019-2023 vdaas.org vald team -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# You may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -ARG GO_VERSION=latest -ARG DISTROLESS_IMAGE=gcr.io/distroless/static -ARG DISTROLESS_IMAGE_TAG=nonroot -ARG MAINTAINER="vdaas.org vald team " - -FROM golang:${GO_VERSION} AS golang - -FROM ubuntu:devel AS builder - -ENV GO111MODULE on -ENV DEBIAN_FRONTEND noninteractive -ENV INITRD No -ENV LANG en_US.UTF-8 -ENV GOROOT /opt/go -ENV GOPATH /go -ENV PATH ${PATH}:${GOROOT}/bin:${GOPATH}/bin -ENV ORG vdaas -ENV REPO vald -ENV PKG index/job/correction -ENV APP_NAME index-correction - -# skipcq: DOK-DL3008 -RUN apt-get update && apt-get install -y --no-install-recommends \ - ca-certificates \ - build-essential \ - curl \ - upx \ - git \ - && apt-get clean \ - && rm -rf /var/lib/apt/lists/* - -COPY --from=golang /usr/local/go $GOROOT -RUN mkdir -p "$GOPATH/src" - -WORKDIR ${GOPATH}/src/github.com/${ORG}/${REPO}/Makefile.d -COPY Makefile.d . -WORKDIR ${GOPATH}/src/github.com/${ORG}/${REPO} -COPY Makefile . -COPY .git . -COPY go.mod . -COPY go.sum . - -RUN make go/download - -WORKDIR ${GOPATH}/src/github.com/${ORG}/${REPO}/internal -COPY internal . - -WORKDIR ${GOPATH}/src/github.com/${ORG}/${REPO}/apis/grpc -COPY apis/grpc . - -WORKDIR ${GOPATH}/src/github.com/${ORG}/${REPO}/pkg/${PKG} -COPY pkg/${PKG} . - -WORKDIR ${GOPATH}/src/github.com/${ORG}/${REPO}/cmd/${PKG} -COPY cmd/${PKG} . - -WORKDIR ${GOPATH}/src/github.com/${ORG}/${REPO}/versions -COPY versions . - -WORKDIR ${GOPATH}/src/github.com/${ORG}/${REPO} -RUN make REPO=${ORG} NAME=${REPO} cmd/${PKG}/${APP_NAME} \ - && mv "cmd/${PKG}/${APP_NAME}" "/usr/bin/${APP_NAME}" - -WORKDIR ${GOPATH}/src/github.com/${ORG}/${REPO}/cmd/${PKG} -RUN cp sample.yaml /tmp/config.yaml - -FROM ${DISTROLESS_IMAGE}:${DISTROLESS_IMAGE_TAG} -LABEL maintainer="${MAINTAINER}" - -ENV APP_NAME index-correction - -COPY --from=builder /usr/bin/${APP_NAME} /go/bin/${APP_NAME} -COPY --from=builder /tmp/config.yaml /etc/server/config.yaml - -USER nonroot:nonroot - -ENTRYPOINT ["/go/bin/index-correction"] diff --git a/internal/config/corrector.go b/internal/config/corrector.go deleted file mode 100644 index 2b5b56beec..0000000000 --- a/internal/config/corrector.go +++ /dev/null @@ -1,79 +0,0 @@ -// -// Copyright (C) 2019-2023 vdaas.org vald team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// You may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// - -// Package config providers configuration type and load configuration logic -package config - -// Corrector represents the index correction configurations. -type Corrector struct { - // AgentPort represent agent port number - AgentPort int `json:"agent_port" yaml:"agent_port"` - - // AgentName represent agents meta_name for service discovery - AgentName string `json:"agent_name" yaml:"agent_name"` - - // AgentNamespace represent agent namespace location - AgentNamespace string `json:"agent_namespace" yaml:"agent_namespace"` - - // AgentDNS represent agents dns A record for service discovery - AgentDNS string `json:"agent_dns" yaml:"agent_dns"` - - // NodeName represents node name - NodeName string `json:"node_name" yaml:"node_name"` - - // StreamConcurrency represent stream concurrency for StreamListObject rpc client - // this directly affects the memory usage of this job - StreamListConcurrency int `json:"stream_list_concurrency" yaml:"stream_list_concurrency"` - - // BboltAsyncWriteConcurrency represent concurrency for bbolt async write - BboltAsyncWriteConcurrency int `json:"bbolt_async_write_concurrency" yaml:"bbolt_async_write_concurrency"` - - // IndexReplica represent index replica count. This should be equal to the lb setting - IndexReplica int `json:"index_replica" yaml:"index_replica"` - - // Discoverer represent agent discoverer service configuration - Discoverer *DiscovererClient `json:"discoverer" yaml:"discoverer"` -} - -// Bind binds the actual data from the Indexer receiver field. -func (c *Corrector) Bind() *Corrector { - c.AgentName = GetActualValue(c.AgentName) - c.AgentNamespace = GetActualValue(c.AgentNamespace) - c.AgentDNS = GetActualValue(c.AgentDNS) - c.NodeName = GetActualValue(c.NodeName) - - if c.Discoverer != nil { - c.Discoverer = c.Discoverer.Bind() - } - return c -} - -// GetStreamListConcurrency returns the StreamListConcurrency field value if set, otherwise 200 is set, -// since not setting this could use up all the available momory -func (c *Corrector) GetStreamListConcurrency() int { - if c != nil { - return c.StreamListConcurrency - } - return 200 //nolint:gomnd -} - -// GetBboltAsyncWriteConcurrency returns 2048 when not specified since not setting this could use up all the available momory -func (c *Corrector) GetBboltAsyncWriteConcurrency() int { - if c != nil { - return c.BboltAsyncWriteConcurrency - } - return 2048 //nolint:gomnd -} diff --git a/internal/config/corrector_test.go b/internal/config/corrector_test.go deleted file mode 100644 index a66af0181a..0000000000 --- a/internal/config/corrector_test.go +++ /dev/null @@ -1,379 +0,0 @@ -// Copyright (C) 2019-2023 vdaas.org vald team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// You may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package config - -// NOT IMPLEMENTED BELOW -// -// func TestCorrector_Bind(t *testing.T) { -// type fields struct { -// AgentPort int -// AgentName string -// AgentNamespace string -// AgentDNS string -// CreationPoolSize uint32 -// NodeName string -// StreamListConcurrency int -// BboltAsyncWriteConcurrency int -// Discoverer *DiscovererClient -// } -// type want struct { -// want *Corrector -// } -// type test struct { -// name string -// fields fields -// want want -// checkFunc func(want, *Corrector) error -// beforeFunc func(*testing.T) -// afterFunc func(*testing.T) -// } -// defaultCheckFunc := func(w want, got *Corrector) error { -// if !reflect.DeepEqual(got, w.want) { -// return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", got, w.want) -// } -// return nil -// } -// tests := []test{ -// // TODO test cases -// /* -// { -// name: "test_case_1", -// fields: fields { -// AgentPort:0, -// AgentName:"", -// AgentNamespace:"", -// AgentDNS:"", -// CreationPoolSize:0, -// NodeName:"", -// StreamListConcurrency:0, -// BboltAsyncWriteConcurrency:0, -// Discoverer:DiscovererClient{}, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T,) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T,) { -// t.Helper() -// }, -// }, -// */ -// -// // TODO test cases -// /* -// func() test { -// return test { -// name: "test_case_2", -// fields: fields { -// AgentPort:0, -// AgentName:"", -// AgentNamespace:"", -// AgentDNS:"", -// CreationPoolSize:0, -// NodeName:"", -// StreamListConcurrency:0, -// BboltAsyncWriteConcurrency:0, -// Discoverer:DiscovererClient{}, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T,) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T,) { -// t.Helper() -// }, -// } -// }(), -// */ -// } -// -// for _, tc := range tests { -// test := tc -// t.Run(test.name, func(tt *testing.T) { -// tt.Parallel() -// defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) -// if test.beforeFunc != nil { -// test.beforeFunc(tt) -// } -// if test.afterFunc != nil { -// defer test.afterFunc(tt) -// } -// checkFunc := test.checkFunc -// if test.checkFunc == nil { -// checkFunc = defaultCheckFunc -// } -// c := &Corrector{ -// AgentPort: test.fields.AgentPort, -// AgentName: test.fields.AgentName, -// AgentNamespace: test.fields.AgentNamespace, -// AgentDNS: test.fields.AgentDNS, -// CreationPoolSize: test.fields.CreationPoolSize, -// NodeName: test.fields.NodeName, -// StreamListConcurrency: test.fields.StreamListConcurrency, -// BboltAsyncWriteConcurrency: test.fields.BboltAsyncWriteConcurrency, -// Discoverer: test.fields.Discoverer, -// } -// -// got := c.Bind() -// if err := checkFunc(test.want, got); err != nil { -// tt.Errorf("error = %v", err) -// } -// -// }) -// } -// } -// -// func TestCorrector_GetStreamListConcurrency(t *testing.T) { -// type fields struct { -// AgentPort int -// AgentName string -// AgentNamespace string -// AgentDNS string -// CreationPoolSize uint32 -// NodeName string -// StreamListConcurrency int -// BboltAsyncWriteConcurrency int -// Discoverer *DiscovererClient -// } -// type want struct { -// want int -// } -// type test struct { -// name string -// fields fields -// want want -// checkFunc func(want, int) error -// beforeFunc func(*testing.T) -// afterFunc func(*testing.T) -// } -// defaultCheckFunc := func(w want, got int) error { -// if !reflect.DeepEqual(got, w.want) { -// return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", got, w.want) -// } -// return nil -// } -// tests := []test{ -// // TODO test cases -// /* -// { -// name: "test_case_1", -// fields: fields { -// AgentPort:0, -// AgentName:"", -// AgentNamespace:"", -// AgentDNS:"", -// CreationPoolSize:0, -// NodeName:"", -// StreamListConcurrency:0, -// BboltAsyncWriteConcurrency:0, -// Discoverer:DiscovererClient{}, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T,) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T,) { -// t.Helper() -// }, -// }, -// */ -// -// // TODO test cases -// /* -// func() test { -// return test { -// name: "test_case_2", -// fields: fields { -// AgentPort:0, -// AgentName:"", -// AgentNamespace:"", -// AgentDNS:"", -// CreationPoolSize:0, -// NodeName:"", -// StreamListConcurrency:0, -// BboltAsyncWriteConcurrency:0, -// Discoverer:DiscovererClient{}, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T,) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T,) { -// t.Helper() -// }, -// } -// }(), -// */ -// } -// -// for _, tc := range tests { -// test := tc -// t.Run(test.name, func(tt *testing.T) { -// tt.Parallel() -// defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) -// if test.beforeFunc != nil { -// test.beforeFunc(tt) -// } -// if test.afterFunc != nil { -// defer test.afterFunc(tt) -// } -// checkFunc := test.checkFunc -// if test.checkFunc == nil { -// checkFunc = defaultCheckFunc -// } -// c := &Corrector{ -// AgentPort: test.fields.AgentPort, -// AgentName: test.fields.AgentName, -// AgentNamespace: test.fields.AgentNamespace, -// AgentDNS: test.fields.AgentDNS, -// CreationPoolSize: test.fields.CreationPoolSize, -// NodeName: test.fields.NodeName, -// StreamListConcurrency: test.fields.StreamListConcurrency, -// BboltAsyncWriteConcurrency: test.fields.BboltAsyncWriteConcurrency, -// Discoverer: test.fields.Discoverer, -// } -// -// got := c.GetStreamListConcurrency() -// if err := checkFunc(test.want, got); err != nil { -// tt.Errorf("error = %v", err) -// } -// -// }) -// } -// } -// -// func TestCorrector_GetBboltAsyncWriteConcurrency(t *testing.T) { -// type fields struct { -// AgentPort int -// AgentName string -// AgentNamespace string -// AgentDNS string -// CreationPoolSize uint32 -// NodeName string -// StreamListConcurrency int -// BboltAsyncWriteConcurrency int -// Discoverer *DiscovererClient -// } -// type want struct { -// want int -// } -// type test struct { -// name string -// fields fields -// want want -// checkFunc func(want, int) error -// beforeFunc func(*testing.T) -// afterFunc func(*testing.T) -// } -// defaultCheckFunc := func(w want, got int) error { -// if !reflect.DeepEqual(got, w.want) { -// return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", got, w.want) -// } -// return nil -// } -// tests := []test{ -// // TODO test cases -// /* -// { -// name: "test_case_1", -// fields: fields { -// AgentPort:0, -// AgentName:"", -// AgentNamespace:"", -// AgentDNS:"", -// CreationPoolSize:0, -// NodeName:"", -// StreamListConcurrency:0, -// BboltAsyncWriteConcurrency:0, -// Discoverer:DiscovererClient{}, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T,) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T,) { -// t.Helper() -// }, -// }, -// */ -// -// // TODO test cases -// /* -// func() test { -// return test { -// name: "test_case_2", -// fields: fields { -// AgentPort:0, -// AgentName:"", -// AgentNamespace:"", -// AgentDNS:"", -// CreationPoolSize:0, -// NodeName:"", -// StreamListConcurrency:0, -// BboltAsyncWriteConcurrency:0, -// Discoverer:DiscovererClient{}, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T,) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T,) { -// t.Helper() -// }, -// } -// }(), -// */ -// } -// -// for _, tc := range tests { -// test := tc -// t.Run(test.name, func(tt *testing.T) { -// tt.Parallel() -// defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) -// if test.beforeFunc != nil { -// test.beforeFunc(tt) -// } -// if test.afterFunc != nil { -// defer test.afterFunc(tt) -// } -// checkFunc := test.checkFunc -// if test.checkFunc == nil { -// checkFunc = defaultCheckFunc -// } -// c := &Corrector{ -// AgentPort: test.fields.AgentPort, -// AgentName: test.fields.AgentName, -// AgentNamespace: test.fields.AgentNamespace, -// AgentDNS: test.fields.AgentDNS, -// CreationPoolSize: test.fields.CreationPoolSize, -// NodeName: test.fields.NodeName, -// StreamListConcurrency: test.fields.StreamListConcurrency, -// BboltAsyncWriteConcurrency: test.fields.BboltAsyncWriteConcurrency, -// Discoverer: test.fields.Discoverer, -// } -// -// got := c.GetBboltAsyncWriteConcurrency() -// if err := checkFunc(test.want, got); err != nil { -// tt.Errorf("error = %v", err) -// } -// -// }) -// } -// } diff --git a/internal/errors/corrector.go b/internal/errors/corrector.go deleted file mode 100644 index 757c66820e..0000000000 --- a/internal/errors/corrector.go +++ /dev/null @@ -1,27 +0,0 @@ -// -// Copyright (C) 2019-2023 vdaas.org vald team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// You may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// - -// Package errors provides error types and function -package errors - -// ErrIndexReplicaOne represents an error that nothing to correct when index replica is 1. -var ErrIndexReplicaOne = New("nothing to correct when index replica is 1") - -// ErrNoAvailableAgentToInsert represents an error that no available agent to insert replica. -var ErrNoAvailableAgentToInsert = New("no available agent to insert replica") - -// ErrFailedToCorrectReplicaNum represents an error that failed to correct replica number after correction process. -var ErrFailedToCorrectReplicaNum = New("failed to correct replica number after correction process") diff --git a/internal/net/grpc/context.go b/internal/net/grpc/context.go index 90e5ee32a6..a95bfca402 100644 --- a/internal/net/grpc/context.go +++ b/internal/net/grpc/context.go @@ -20,30 +20,28 @@ import ( type contextKey string -// GRPCMethodContextKey represents a context key for gRPC method. -// This is exported only for testing. -const GRPCMethodContextKey contextKey = "grpc_method" +const grpcMethodContextKey contextKey = "grpc_method" // WrapGRPCMethod returns a copy of parent in which the method associated with key (grpcMethodContextKey). func WrapGRPCMethod(ctx context.Context, method string) context.Context { m := FromGRPCMethod(ctx) if m == "" { - return context.WithValue(ctx, GRPCMethodContextKey, method) + return context.WithValue(ctx, grpcMethodContextKey, method) } if strings.HasSuffix(m, method) { return ctx } - return context.WithValue(ctx, GRPCMethodContextKey, m+"/"+method) + return context.WithValue(ctx, grpcMethodContextKey, m+"/"+method) } // WithGRPCMethod returns a copy of parent in which the method associated with key (grpcMethodContextKey). func WithGRPCMethod(ctx context.Context, method string) context.Context { - return context.WithValue(ctx, GRPCMethodContextKey, method) + return context.WithValue(ctx, grpcMethodContextKey, method) } // FromGRPCMethod returns the value associated with this context for key (grpcMethodContextKey). func FromGRPCMethod(ctx context.Context) string { - if v := ctx.Value(GRPCMethodContextKey); v != nil { + if v := ctx.Value(grpcMethodContextKey); v != nil { if method, ok := v.(string); ok { return method } diff --git a/internal/servers/server/server.go b/internal/servers/server/server.go index 7d094906c5..904e76d371 100644 --- a/internal/servers/server/server.go +++ b/internal/servers/server/server.go @@ -28,7 +28,6 @@ import ( "time" "github.com/vdaas/vald/internal/errors" - "github.com/vdaas/vald/internal/file" "github.com/vdaas/vald/internal/log" "github.com/vdaas/vald/internal/net" "github.com/vdaas/vald/internal/net/control" @@ -129,8 +128,6 @@ type grpcKeepalive struct { permitWithoutStream bool } -// New returns Server implementation. -// skipcq: GO-R1005 func New(opts ...Option) (Server, error) { srv := new(server) @@ -256,7 +253,6 @@ func (s *server) Name() string { return s.name } -// skipcq: GO-R1005 func (s *server) ListenAndServe(ctx context.Context, ech chan<- error) (err error) { if !s.IsRunning() { s.mu.Lock() @@ -278,9 +274,8 @@ func (s *server) ListenAndServe(ctx context.Context, ech chan<- error) (err erro return s.network.String() }(), func() string { if s.network == net.UNIX { - if s.socketPath == "" { - sockFile := strings.Join([]string{s.name, strconv.Itoa(os.Getpid()), "sock"}, ".") - s.socketPath = file.Join(os.TempDir(), sockFile) + if len(s.socketPath) == 0 { + s.socketPath = os.TempDir() + string(os.PathSeparator) + s.name + "." + strconv.Itoa(os.Getpid()) + ".sock" } return s.socketPath } @@ -339,12 +334,12 @@ func (s *server) ListenAndServe(ctx context.Context, ech chan<- error) (err erro s.mu.RUnlock() log.Infof("%s server %s stopped", s.mode.String(), s.name) } + return nil })) } return nil } -// skipcq: GO-R1005 func (s *server) Shutdown(ctx context.Context) (rerr error) { if !s.IsRunning() { return nil @@ -391,7 +386,7 @@ func (s *server) Shutdown(ctx context.Context) (rerr error) { } } - if s.socketPath != "" { + if len(s.socketPath) != 0 { defer func() { err := os.RemoveAll(s.socketPath) if err != nil { diff --git a/internal/test/mock/grpc_testify_mock.go b/internal/test/mock/grpc_testify_mock.go index 9083d65554..5fcd6a41fa 100644 --- a/internal/test/mock/grpc_testify_mock.go +++ b/internal/test/mock/grpc_testify_mock.go @@ -15,13 +15,9 @@ package mock import ( "context" - "fmt" "github.com/stretchr/testify/mock" "github.com/vdaas/vald/apis/grpc/v1/payload" - "github.com/vdaas/vald/internal/backoff" - "github.com/vdaas/vald/internal/net/grpc/pool" - "google.golang.org/grpc" "google.golang.org/grpc/metadata" ) @@ -67,139 +63,3 @@ func (losm *ListObjectStreamMock) Send(res *payload.Object_List_Response) error args := losm.Called(res) return args.Error(0) } - -type ClientInternal struct { - mock.Mock -} - -type ( - CallOption = grpc.CallOption - DialOption = pool.DialOption - ClientConn = pool.ClientConn -) - -func (c *ClientInternal) StartConnectionMonitor(ctx context.Context) (<-chan error, error) { - args := c.Called(ctx) - return args.Get(0).(<-chan error), args.Error(1) -} - -func (c *ClientInternal) Connect(ctx context.Context, addr string, dopts ...DialOption) (pool.Conn, error) { - args := c.Called(ctx, addr, dopts) - return args.Get(0).(pool.Conn), args.Error(1) -} - -func (c *ClientInternal) IsConnected(ctx context.Context, addr string) bool { - args := c.Called(ctx, addr) - return args.Bool(0) -} - -func (c *ClientInternal) Disconnect(ctx context.Context, addr string) error { - args := c.Called(ctx, addr) - return args.Error(0) -} - -func (c *ClientInternal) Range(ctx context.Context, - f func(ctx context.Context, - addr string, - conn *ClientConn, - copts ...CallOption) error, -) error { - args := c.Called(ctx, f) - return args.Error(0) -} - -func (c *ClientInternal) RangeConcurrent(ctx context.Context, - concurrency int, - f func(ctx context.Context, - addr string, - conn *ClientConn, - copts ...CallOption) error, -) error { - args := c.Called(ctx, concurrency, f) - return args.Error(0) -} - -func (c *ClientInternal) OrderedRange(ctx context.Context, - order []string, - f func(ctx context.Context, - addr string, - conn *ClientConn, - copts ...CallOption) error, -) error { - args := c.Called(ctx, order, f) - return args.Error(0) -} - -func (c *ClientInternal) OrderedRangeConcurrent(ctx context.Context, - order []string, - concurrency int, - f func(ctx context.Context, - addr string, - conn *ClientConn, - copts ...CallOption) error, -) error { - args := c.Called(ctx, order, concurrency, f) - return args.Error(0) -} - -func (c *ClientInternal) Do(ctx context.Context, addr string, - f func(ctx context.Context, - conn *ClientConn, - copts ...CallOption) (interface{}, error), -) (interface{}, error) { - args := c.Called(ctx, addr, f) - return args.Get(0), args.Error(1) -} - -func (c *ClientInternal) RoundRobin(ctx context.Context, f func(ctx context.Context, - conn *ClientConn, - copts ...CallOption) (interface{}, error), -) (interface{}, error) { - args := c.Called(ctx, f) - return args.Get(0), args.Error(1) -} - -func (c *ClientInternal) GetDialOption() []DialOption { - args := c.Called() - v, ok := args.Get(0).([]DialOption) - if !ok { - // panic here like testify mock does - panic(fmt.Sprintf("The provided arg(%v) is not type []DialOption", args.Get(0))) - } - return v -} - -func (c *ClientInternal) GetCallOption() []CallOption { - args := c.Called() - v, ok := args.Get(0).([]CallOption) - if !ok { - // panic here like testify mock does - panic(fmt.Sprintf("The provided arg(%v) is not type []CallOption", args.Get(0))) - } - return v -} - -func (c *ClientInternal) GetBackoff() backoff.Backoff { - args := c.Called() - v, ok := args.Get(0).(backoff.Backoff) - if !ok { - // panic here like testify mock does - panic(fmt.Sprintf("The provided arg(%v) is not type backoff.Backoff", args.Get(0))) - } - return v -} - -func (c *ClientInternal) ConnectedAddrs() []string { - args := c.Called() - v, ok := args.Get(0).([]string) - if !ok { - // panic here like testify mock does - panic(fmt.Sprintf("The provided arg(%v) is not type []string", args.Get(0))) - } - return v -} - -func (c *ClientInternal) Close(ctx context.Context) error { - args := c.Called(ctx) - return args.Error(0) -} diff --git a/pkg/index/job/correction/config/config.go b/pkg/index/job/correction/config/config.go deleted file mode 100644 index 70a48b5baa..0000000000 --- a/pkg/index/job/correction/config/config.go +++ /dev/null @@ -1,76 +0,0 @@ -// -// Copyright (C) 2019-2023 vdaas.org vald team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// You may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// - -// Package setting stores all server application settings -package config - -import ( - "github.com/vdaas/vald/internal/config" - "github.com/vdaas/vald/internal/errors" -) - -type GlobalConfig = config.GlobalConfig - -// Data represents a application setting data content (config.yaml). -// In K8s environment, this configuration is stored in K8s ConfigMap. -type Data struct { - config.GlobalConfig `json:",inline" yaml:",inline"` - - // Server represent all server configurations - Server *config.Servers `json:"server_config" yaml:"server_config"` - - // Observability represent observability configurations - Observability *config.Observability `json:"observability" yaml:"observability"` - - // Indexer represent agent auto indexing service configuration - Corrector *config.Corrector `json:"corrector" yaml:"corrector"` -} - -func NewConfig(path string) (cfg *Data, err error) { - cfg = new(Data) - - err = config.Read(path, &cfg) - - if err != nil { - return nil, err - } - - if cfg != nil { - cfg.Bind() - } else { - return nil, errors.ErrInvalidConfig - } - - if cfg.Server != nil { - cfg.Server = cfg.Server.Bind() - } else { - return nil, errors.ErrInvalidConfig - } - - if cfg.Observability != nil { - cfg.Observability = cfg.Observability.Bind() - } else { - cfg.Observability = new(config.Observability).Bind() - } - - if cfg.Corrector != nil { - cfg.Corrector = cfg.Corrector.Bind() - } else { - cfg.Corrector = new(config.Corrector).Bind() - } - - return cfg, nil -} diff --git a/pkg/index/job/correction/config/config_test.go b/pkg/index/job/correction/config/config_test.go deleted file mode 100644 index 0cf6858bf7..0000000000 --- a/pkg/index/job/correction/config/config_test.go +++ /dev/null @@ -1,106 +0,0 @@ -// Copyright (C) 2019-2023 vdaas.org vald team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// You may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package config - -// NOT IMPLEMENTED BELOW -// -// func TestNewConfig(t *testing.T) { -// type args struct { -// path string -// } -// type want struct { -// wantCfg *Data -// err error -// } -// type test struct { -// name string -// args args -// want want -// checkFunc func(want, *Data, error) error -// beforeFunc func(*testing.T, args) -// afterFunc func(*testing.T, args) -// } -// defaultCheckFunc := func(w want, gotCfg *Data, err error) error { -// if !errors.Is(err, w.err) { -// return errors.Errorf("got_error: \"%#v\",\n\t\t\t\twant: \"%#v\"", err, w.err) -// } -// if !reflect.DeepEqual(gotCfg, w.wantCfg) { -// return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", gotCfg, w.wantCfg) -// } -// return nil -// } -// tests := []test{ -// // TODO test cases -// /* -// { -// name: "test_case_1", -// args: args { -// path:"", -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// }, -// */ -// -// // TODO test cases -// /* -// func() test { -// return test { -// name: "test_case_2", -// args: args { -// path:"", -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// } -// }(), -// */ -// } -// -// for _, tc := range tests { -// test := tc -// t.Run(test.name, func(tt *testing.T) { -// tt.Parallel() -// defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) -// if test.beforeFunc != nil { -// test.beforeFunc(tt, test.args) -// } -// if test.afterFunc != nil { -// defer test.afterFunc(tt, test.args) -// } -// checkFunc := test.checkFunc -// if test.checkFunc == nil { -// checkFunc = defaultCheckFunc -// } -// -// gotCfg, err := NewConfig(test.args.path) -// if err := checkFunc(test.want, gotCfg, err); err != nil { -// tt.Errorf("error = %v", err) -// } -// -// }) -// } -// } diff --git a/pkg/index/job/correction/service/corrector.go b/pkg/index/job/correction/service/corrector.go deleted file mode 100644 index f5553c73ba..0000000000 --- a/pkg/index/job/correction/service/corrector.go +++ /dev/null @@ -1,596 +0,0 @@ -// Copyright (C) 2019-2023 vdaas.org vald team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// You may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package service - -import ( - "cmp" - "context" - "fmt" - "io" - "os" - "slices" - "sync/atomic" - "time" - - agent "github.com/vdaas/vald/apis/grpc/v1/agent/core" - "github.com/vdaas/vald/apis/grpc/v1/payload" - "github.com/vdaas/vald/apis/grpc/v1/vald" - "github.com/vdaas/vald/internal/client/v1/client/discoverer" - "github.com/vdaas/vald/internal/db/kvs/bbolt" - "github.com/vdaas/vald/internal/errors" - "github.com/vdaas/vald/internal/file" - "github.com/vdaas/vald/internal/log" - "github.com/vdaas/vald/internal/net/grpc" - "github.com/vdaas/vald/internal/net/grpc/codes" - "github.com/vdaas/vald/internal/net/grpc/status" - "github.com/vdaas/vald/internal/safety" - "github.com/vdaas/vald/internal/sync" - "github.com/vdaas/vald/internal/sync/errgroup" - "github.com/vdaas/vald/pkg/index/job/correction/config" -) - -type contextTimeKey string - -const ( - insertMethod = "core.v1.Vald/Insert" - updateMethod = "core.v1.Vald/Update" - deleteMethod = "core.v1.Vald/Delete" - correctionStartTimeKey contextTimeKey = "correctionStartTimeKey" -) - -type Corrector interface { - Start(ctx context.Context) (<-chan error, error) - PreStop(ctx context.Context) error -} - -type correct struct { - cfg *config.Data - discoverer discoverer.Client - agentAddrs []string - indexInfos sync.Map[string, *payload.Info_Index_Count] - uuidsCount uint32 - uncommittedUUIDsCount uint32 - checkedID bbolt.Bbolt -} - -const filemode = 0o600 - -func New(cfg *config.Data, discoverer discoverer.Client) (Corrector, error) { - d := file.Join(os.TempDir(), "bbolt") - file.MkdirAll(d, os.ModePerm) - dbfile := file.Join(d, "checkedid.db") - bolt, err := bbolt.New(dbfile, "", os.FileMode(filemode)) - if err != nil { - return nil, err - } - - return &correct{ - cfg: cfg, - discoverer: discoverer, - checkedID: bolt, - }, nil -} - -func (c *correct) Start(ctx context.Context) (<-chan error, error) { - // set current time to context - ctx = embedTime(ctx) - - dech, err := c.discoverer.Start(ctx) - if err != nil { - return nil, err - } - - // addrs is sorted by the memory usage of each agent(descending order) - // this is decending because it's supposed to be used for index manager to decide - // which pod to make a create index rpc(higher memory, first to commit) - c.agentAddrs = c.discoverer.GetAddrs(ctx) - log.Debug("agent addrs found:", c.agentAddrs) - - if l := len(c.agentAddrs); l <= 1 { - log.Warn("only %d agent found, there must be more than two agents for correction to happen", l) - return nil, err - } - - err = c.loadInfos(ctx) - if err != nil { - return nil, err - } - - c.indexInfos.Range(func(addr string, info *payload.Info_Index_Count) bool { - log.Infof("index info: addr(%s), stored(%d), uncommitted(%d)", addr, info.GetStored(), info.GetUncommitted()) - return true - }) - - log.Info("starting correction with bbolt disk cache...") - if err := c.correct(ctx); err != nil { - log.Errorf("there's some errors while correction: %v", err) - return nil, err - } - log.Info("correction finished successfully") - - return dech, nil -} - -func (c *correct) PreStop(_ context.Context) error { - log.Info("removing persistent cache files...") - return c.checkedID.Close(true) -} - -// skipcq: GO-R1005 -func (c *correct) correct(ctx context.Context) (err error) { - // leftAgentAddrs is the agents' addr that hasn't been corrected yet. - // This is used to know which agents possibly have the same index as the target replica. - // We can say this because, thanks to caching, there is no way that the target replica is - // in the agent that has already been corrected. - - // Vector with time after this should not be processed - correctionStartTime, err := correctionStartTime(ctx) - if err != nil { - log.Errorf("cannot determine correction start time: %w", err) - return err - } - - curTargetAgent := 0 - if err := c.discoverer.GetClient().OrderedRange(ctx, c.agentAddrs, - func(ctx context.Context, addr string, conn *grpc.ClientConn, copts ...grpc.CallOption) error { - // current address is the leftAgentAddrs[0] because this is OrderedRange and - // leftAgentAddrs is copied from c.agentAddrs - defer func() { - curTargetAgent++ - }() - - // context and errgroup for stream.Recv and correction - sctx, scancel := context.WithCancel(ctx) - defer scancel() - seg, sctx := errgroup.WithContext(sctx) - sconcurrency := c.cfg.Corrector.GetStreamListConcurrency() - seg.SetLimit(sconcurrency) - - // errgroup for bbolt AsyncSet - bolteg, ctx := errgroup.WithContext(ctx) - bconcurrency := c.cfg.Corrector.GetBboltAsyncWriteConcurrency() - bolteg.SetLimit(bconcurrency) - - var mu sync.Mutex - log.Infof("starting correction for agent %s, stream concurrency: %d, bbolt concurrency: %d", addr, sconcurrency, bconcurrency) - - vc := vald.NewValdClient(conn) - stream, err := vc.StreamListObject(ctx, &payload.Object_List_Request{}) - if err != nil { - return err - } - - // The number of items to be received in advance is not known in advance. - // This is because there is a possibility of new items being inserted during processing. - for { - select { - case <-sctx.Done(): - if !errors.Is(sctx.Err(), context.Canceled) { - log.Errorf("context done unexpectedly: %v", sctx.Err()) - } - - // Finalize - err = seg.Wait() - if err != nil { - log.Errorf("err group returned error: %v", err) - } - - berr := bolteg.Wait() - if berr != nil { - log.Errorf("bbolt err group returned error: %v", err) - err = errors.Join(err, berr) - } else { - log.Info("bbolt all batch finished") - } - - log.Infof("correction finished for agent %s", addr) - return err - - default: - seg.Go(safety.RecoverFunc(func() error { - mu.Lock() - // As long as we don't stream.Recv() from the stream, we do not consume the memory of the message. - // So by limiting the number of this errgroup.Go instances, we can limit the memory usage - // https://github.com/grpc/grpc-go/blob/33f9fa2e6e5bcf4cf8fe45133e23779ae6e43f6c/rpc_util.go#L795 - res, err := stream.Recv() - mu.Unlock() - - if errors.Is(err, io.EOF) { - scancel() - return nil - } - if err != nil { - log.Errorf("StreamListObject stream finished unexpectedly: %v", err) - return err - } - - vec := res.GetVector() - if vec == nil { - st := res.GetStatus() - log.Error(st.GetCode(), st.GetMessage(), st.GetDetails()) - // continue - return nil - } - - // skip if the vector is inserted after correction start - if vec.GetTimestamp() > correctionStartTime.UnixNano() { - log.Debugf("timestamp of vector(id: %s, timestamp: %v) is newer than correction start time(%v). skipping...", - vec.GetId(), - vec.GetTimestamp(), - correctionStartTime.UnixNano(), - ) - return nil - } - - // check if the index is already checked - id := vec.GetId() - _, ok, err := c.checkedID.Get([]byte(id)) - if err != nil { - log.Errorf("failed to perform Get from bbolt: %v", err) - } - if ok { - // already checked index - return nil - } - - if err := c.checkConsistency( - ctx, - &vectorReplica{ - addr: addr, - vec: vec, - }, - curTargetAgent, - ); err != nil { - log.Errorf("failed to check consistency: %v", err) - return nil // continue other processes - } - - // now this id is checked so set it to the disk cache - c.checkedID.AsyncSet(bolteg, []byte(id), nil) - - return nil - })) - } - } - }, - ); err != nil { - log.Errorf("failed to range over agents(%v): %v", c.agentAddrs, err) - return err - } - - return nil -} - -type vectorReplica struct { - addr string - vec *payload.Object_Vector -} - -// Validate len(addrs) >= 2 before calling this function -func (c *correct) checkConsistency(ctx context.Context, targetReplica *vectorReplica, targetAgentIdx int) error { - // leftAgentAddrs is the agents' addr that hasn't been corrected yet. - leftAgentAddrs := c.agentAddrs[targetAgentIdx+1:] - - // Vector with time after this should not be processed - correctionStartTime, err := correctionStartTime(ctx) - if err != nil { - log.Errorf("cannot determine correction start time: %w", err) - return err - } - - foundReplicas := make([]*vectorReplica, 0, len(c.agentAddrs)) - var mu sync.Mutex - if err := c.discoverer.GetClient().OrderedRangeConcurrent(ctx, leftAgentAddrs, len(leftAgentAddrs), - func(ctx context.Context, addr string, conn *grpc.ClientConn, copts ...grpc.CallOption) error { - vec, err := vald.NewValdClient(conn).GetObject(ctx, &payload.Object_VectorRequest{ - Id: &payload.Object_ID{ - Id: targetReplica.vec.GetId(), - }, - }) - if err != nil { - if st, ok := status.FromError(err); !ok { - log.Errorf("gRPC call returned not a gRPC status error: %v", err) - return err - } else if st.Code() == codes.NotFound { - // when replica of agent > index replica, this happens - return nil - } else { - log.Errorf("failed to GetObject with unexpected error. code: %v, message: %s", st.Code(), st.Message()) - return err - } - } - - // skip if the vector is inserted after correction start - if vec.GetTimestamp() > correctionStartTime.UnixNano() { - log.Debugf("timestamp of vector(id: %s, timestamp: %v) is newer than correction start time(%v). skipping...", - vec.GetId(), - vec.GetTimestamp(), - correctionStartTime.UnixNano(), - ) - return nil - } - - mu.Lock() - foundReplicas = append(foundReplicas, &vectorReplica{ - addr: addr, - vec: vec, - }) - mu.Unlock() - - return nil - }, - ); err != nil { - return err - } - - // check timestamps - if err := c.correctTimestamp(ctx, targetReplica, foundReplicas); err != nil { - return fmt.Errorf("failed to fix timestamp: %w", err) - } - - // check replica number - if err := c.correctReplica(ctx, targetReplica, foundReplicas); err != nil { - return fmt.Errorf("failed to fix index replica: %w", err) - } - - return nil -} - -func (c *correct) correctTimestamp(ctx context.Context, targetReplica *vectorReplica, foundReplicas []*vectorReplica) error { - if len(foundReplicas) == 0 { - // no replica found. nothing to do about timestamp - return nil - } - - // skipcq: CRT-D0001 - allReplicas := append(foundReplicas, targetReplica) - - // sort by timestamp - slices.SortFunc(allReplicas, func(i, j *vectorReplica) int { - // largest timestamp means the latest - return cmp.Compare(j.vec.GetTimestamp(), i.vec.GetTimestamp()) - }) - - latest := allReplicas[0] - latestTS := latest.vec.GetTimestamp() - for _, replica := range allReplicas { - if replica.vec.GetTimestamp() == latestTS { - // no inconsistency - continue - } - - // udate the vector with the new one - log.Infof("timestamp inconsistency detected with vector(id: %s, timestamp: %v). updating with the latest vector(id: %s, timestamp: %v)", - replica.vec.GetId(), - replica.vec.GetTimestamp(), - latest.vec.GetId(), - latest.vec.GetTimestamp(), - ) - if err := c.updateObject(ctx, replica.addr, latest.vec); err != nil { - return err - } - } - - return nil -} - -// correctReplica corrects the number of replicas of the target vector. -// skipcq: GO-R1005 -func (c *correct) correctReplica( - ctx context.Context, - targetReplica *vectorReplica, - foundReplicas []*vectorReplica, -) error { - // diff < 0 means there is less replica than the correct number - existReplica := len(foundReplicas) + 1 - diff := existReplica - c.cfg.Corrector.IndexReplica - if diff == 0 { - // replica number is correct - return nil - } - - // availableAddrs = c.agentAddrs - foundReplicas - targetReplica.addr - availableAddrs := make([]string, 0, len(c.agentAddrs)) - for _, addr := range c.agentAddrs { - if addr == targetReplica.addr { - continue - } - if slices.ContainsFunc(foundReplicas, func(replica *vectorReplica) bool { - return replica.addr == addr - }) { - continue - } - availableAddrs = append(availableAddrs, addr) - } - - // when there are less replicas than the correct number, add the extra replicas - if diff < 0 { - log.Infof("replica shortage of vector %s. inserting to other agents...", targetReplica.vec.GetId()) - if len(availableAddrs) == 0 { - return errors.ErrNoAvailableAgentToInsert - } - - // inserting with the reverse order of availableAddrs since the last agent has the lowest memory usage - for i := len(availableAddrs) - 1; i >= 0 && diff < 0; i-- { - addr := availableAddrs[i] - log.Infof("inserting replica to %s", addr) - if err := c.insertObject(ctx, addr, targetReplica.vec); err != nil { - log.Errorf("failed to insert object to agent(%s): %v", addr, err) - continue - } - diff++ - } - - if diff < 0 { - return errors.ErrFailedToCorrectReplicaNum - } - - return nil - } - - // when there are more replicas than the correct number, delete the extra replicas - log.Infof("replica oversupply of vector %s. deleting...", - targetReplica.vec.GetId()) - // delete from myself - if err := c.deleteObject(ctx, targetReplica.addr, targetReplica.vec); err != nil { - log.Errorf("failed to delete object from agent(%s): %v", targetReplica.addr, err) - } else { - diff-- - } - - // delte from others if there's more to delete - for _, replica := range foundReplicas { - if diff == 0 { - break - } - if err := c.deleteObject(ctx, replica.addr, replica.vec); err != nil { - log.Errorf("failed to delete object from agent(%s): %v", replica.addr, err) - continue - } - diff-- - } - - if diff > 0 { - return errors.ErrFailedToCorrectReplicaNum - } - - return nil -} - -func (c *correct) updateObject(ctx context.Context, addr string, vector *payload.Object_Vector) error { - res, err := c.discoverer.GetClient(). - Do(grpc.WithGRPCMethod(ctx, updateMethod), addr, func(ctx context.Context, conn *grpc.ClientConn, copts ...grpc.CallOption) (interface{}, error) { - // TODO: use UpdateTimestamp when it's implemented because here we just want to update only the timestamp but not the vector - return vald.NewUpdateClient(conn).Update(ctx, &payload.Update_Request{ - Vector: vector, - // TODO: this should be deleted after Config.Timestamp deprecation - Config: &payload.Update_Config{ - // TODO: Decrementing because it's gonna be incremented befor being pushed - // to vqueue in the agent. This is a not ideal workaround for the current vqueue implementation - // so we should consider refactoring vqueue. - Timestamp: vector.GetTimestamp() - 1, - }, - }, copts...) - }) - if err != nil { - return err - } - - if v, ok := res.(*payload.Object_Location); ok { - log.Infof("vector successfully updated. address: %s, uuid: %v", addr, v.GetUuid()) - } - - return nil -} - -func (c *correct) insertObject(ctx context.Context, addr string, vector *payload.Object_Vector) error { - res, err := c.discoverer.GetClient(). - Do(grpc.WithGRPCMethod(ctx, insertMethod), addr, func(ctx context.Context, conn *grpc.ClientConn, copts ...grpc.CallOption) (interface{}, error) { - return vald.NewInsertClient(conn).Insert(ctx, &payload.Insert_Request{ - Vector: vector, - // TODO: this should be deleted after Config.Timestamp deprecation - Config: &payload.Insert_Config{ - Timestamp: vector.GetTimestamp(), - }, - }, copts...) - }) - if err != nil { - return err - } - - if v, ok := res.(*payload.Object_Location); ok { - log.Infof("vector successfully inserted. address: %s, uuid: %v", addr, v.GetUuid()) - } - - return nil -} - -func (c *correct) deleteObject(ctx context.Context, addr string, vector *payload.Object_Vector) error { - res, err := c.discoverer.GetClient(). - Do(grpc.WithGRPCMethod(ctx, deleteMethod), addr, func(ctx context.Context, conn *grpc.ClientConn, copts ...grpc.CallOption) (interface{}, error) { - return vald.NewRemoveClient(conn).Remove(ctx, &payload.Remove_Request{ - Id: &payload.Object_ID{ - Id: vector.GetId(), - }, - }, copts...) - }) - if err != nil { - return err - } - - if v, ok := res.(*payload.Object_Location); ok { - log.Infof("vector successfully deleted. address: %s, uuid: %v", addr, v.GetUuid()) - } - - return nil -} - -func (c *correct) loadInfos(ctx context.Context) (err error) { - var u, ucu uint32 - var infoMap sync.Map[string, *payload.Info_Index_Count] - err = c.discoverer.GetClient().RangeConcurrent(ctx, len(c.discoverer.GetAddrs(ctx)), - func(ctx context.Context, - addr string, conn *grpc.ClientConn, copts ...grpc.CallOption, - ) (err error) { - select { - case <-ctx.Done(): - return nil - default: - info, err := agent.NewAgentClient(conn).IndexInfo(ctx, new(payload.Empty), copts...) - if err != nil { - log.Warnf("an error occurred while calling IndexInfo of %s: %s", addr, err) - return nil - } - infoMap.Store(addr, info) - atomic.AddUint32(&u, info.GetStored()) - atomic.AddUint32(&ucu, info.GetUncommitted()) - } - return nil - }) - if err != nil { - return err - } - atomic.StoreUint32(&c.uuidsCount, atomic.LoadUint32(&u)) - atomic.StoreUint32(&c.uncommittedUUIDsCount, atomic.LoadUint32(&ucu)) - c.indexInfos.Range(func(addr string, _ *payload.Info_Index_Count) bool { - info, ok := infoMap.Load(addr) - if !ok { - c.indexInfos.Delete(addr) - } - c.indexInfos.Store(addr, info) - infoMap.Delete(addr) - return true - }) - infoMap.Range(func(addr string, info *payload.Info_Index_Count) bool { - c.indexInfos.Store(addr, info) - return true - }) - return nil -} - -func embedTime(ctx context.Context) context.Context { - v := ctx.Value(correctionStartTimeKey) - if _, ok := v.(time.Time); ok { - return ctx - } - return context.WithValue(ctx, correctionStartTimeKey, time.Now()) -} - -func correctionStartTime(ctx context.Context) (time.Time, error) { - v := ctx.Value(correctionStartTimeKey) - if t, ok := v.(time.Time); ok { - return t, nil - } - return time.Time{}, fmt.Errorf("timeKey is not embedded in context") -} diff --git a/pkg/index/job/correction/service/corrector_test.go b/pkg/index/job/correction/service/corrector_test.go deleted file mode 100644 index 91a6b2fd4c..0000000000 --- a/pkg/index/job/correction/service/corrector_test.go +++ /dev/null @@ -1,814 +0,0 @@ -// Copyright (C) 2019-2023 vdaas.org vald team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// You may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package service - -import ( - "context" - "testing" - - tmock "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" - "github.com/vdaas/vald/apis/grpc/v1/payload" - iconfig "github.com/vdaas/vald/internal/config" - "github.com/vdaas/vald/internal/errors" - "github.com/vdaas/vald/internal/net/grpc" - "github.com/vdaas/vald/internal/test/mock" - "github.com/vdaas/vald/pkg/index/job/correction/config" -) - -type mockDiscovererClient struct { - client mock.ClientInternal -} - -func (*mockDiscovererClient) Start(context.Context) (<-chan error, error) { - return nil, nil -} - -func (*mockDiscovererClient) GetAddrs(context.Context) []string { - return nil -} - -func (m *mockDiscovererClient) GetClient() grpc.Client { - return &m.client -} - -func Test_correct_correctTimestamp(t *testing.T) { - t.Parallel() - - // This mock just returns nil and record args inside - m := mockDiscovererClient{} - m.client.On("Do", tmock.Anything, tmock.Anything, tmock.Anything).Return(nil, nil) - c := &correct{ - discoverer: &m, - } - - type args struct { - target *vectorReplica - found []*vectorReplica - } - - type want struct { - addrs []string - err error - } - - type test struct { - name string - args args - want want - } - - tests := []test{ - { - name: "nothing happens when no replica is found", - args: args{ - target: &vectorReplica{ - addr: "target", - vec: &payload.Object_Vector{ - Id: "target", - Timestamp: 100, - }, - }, - found: []*vectorReplica{}, - }, - want: want{ - addrs: nil, - err: nil, - }, - }, - { - name: "updates one found vec when found vecs are older than target", - args: args{ - target: &vectorReplica{ - addr: "target", - vec: &payload.Object_Vector{ - Id: "target", - Timestamp: 100, - }, - }, - found: []*vectorReplica{ - { - addr: "found", - vec: &payload.Object_Vector{ - Id: "found", - Timestamp: 99, - }, - }, - }, - }, - want: want{ - addrs: []string{"found"}, - err: nil, - }, - }, - { - name: "updates multiple found vecs when found vecs are older than target", - args: args{ - target: &vectorReplica{ - addr: "target", - vec: &payload.Object_Vector{ - Id: "target", - Timestamp: 100, - }, - }, - found: []*vectorReplica{ - { - addr: "found1", - vec: &payload.Object_Vector{ - Id: "found", - Timestamp: 99, - }, - }, - { - addr: "found2", - vec: &payload.Object_Vector{ - Id: "found", - Timestamp: 98, - }, - }, - }, - }, - want: want{ - addrs: []string{"found1", "found2"}, - err: nil, - }, - }, - { - name: "updates target vec when found vecs are newer than target", - args: args{ - target: &vectorReplica{ - addr: "target", - vec: &payload.Object_Vector{ - Id: "target", - Timestamp: 0, - }, - }, - found: []*vectorReplica{ - { - addr: "found1", - vec: &payload.Object_Vector{ - Id: "found", - Timestamp: 99, - }, - }, - }, - }, - want: want{ - addrs: []string{"target"}, - err: nil, - }, - }, - { - name: "updates target vec and one of found vecs with the latest found vec", - args: args{ - target: &vectorReplica{ - addr: "target", - vec: &payload.Object_Vector{ - Id: "target", - Timestamp: 0, - }, - }, - found: []*vectorReplica{ - { - addr: "found1", - vec: &payload.Object_Vector{ - Id: "found", - Timestamp: 99, - }, - }, - { - addr: "latest", - vec: &payload.Object_Vector{ - Id: "found", - Timestamp: 100, - }, - }, - }, - }, - want: want{ - addrs: []string{"target", "found1"}, - err: nil, - }, - }, - } - - for _, tc := range tests { - test := tc - t.Run(test.name, func(tt *testing.T) { - tt.Parallel() - err := c.correctTimestamp(context.Background(), test.args.target, test.args.found) - require.Equal(tt, test.want.err, err) - - for _, addr := range test.want.addrs { - // check if the agents which need to be corrected are called - // checking calling parameter, like timestamp, is impossible because its inside of the function arg - m.client.AssertCalled(tt, "Do", tmock.Anything, addr, tmock.Anything) - } - }) - } -} - -func Test_correct_correctReplica(t *testing.T) { - t.Parallel() - - // This mock just returns nil and record args inside - m := mockDiscovererClient{} - m.client.On("Do", tmock.Anything, tmock.Anything, tmock.Anything).Return(nil, nil) - - type args struct { - indexReplica int - target *vectorReplica - found []*vectorReplica - availableAddrs []string - } - - type addrMethod struct { - addr string - method string - } - - type want struct { - addrMethods []addrMethod - err error - } - - type test struct { - name string - args args - want want - } - - tests := []test{ - { - name: "nothing happens when replica number sutisfies", - args: args{ - indexReplica: 2, - target: &vectorReplica{ - addr: "target", - vec: &payload.Object_Vector{ - Id: "target", - }, - }, - found: []*vectorReplica{ - { - addr: "found", - vec: &payload.Object_Vector{ - Id: "found", - }, - }, - }, - availableAddrs: []string{}, - }, - want: want{ - addrMethods: nil, - err: nil, - }, - }, - { - name: "insert replica when replica number is not enough", - args: args{ - indexReplica: 2, - target: &vectorReplica{ - addr: "target", - vec: &payload.Object_Vector{ - Id: "target", - }, - }, - found: []*vectorReplica{}, - availableAddrs: []string{"available"}, - }, - want: want{ - addrMethods: []addrMethod{ - { - addr: "available", - method: insertMethod, - }, - }, - err: nil, - }, - }, - { - name: "insert replica to the agent with most memory available", - args: args{ - indexReplica: 2, - target: &vectorReplica{ - addr: "target", - vec: &payload.Object_Vector{ - Id: "target", - }, - }, - found: []*vectorReplica{}, - // this is supposed to be sorted by memory usage with descending order - availableAddrs: []string{"most memory used", "second memory used"}, - }, - want: want{ - addrMethods: []addrMethod{ - { - addr: "second memory used", - method: insertMethod, - }, - }, - err: nil, - }, - }, - { - name: "delete replica from myself when replica number is too much by one", - args: args{ - indexReplica: 2, - target: &vectorReplica{ - addr: "target", - vec: &payload.Object_Vector{ - Id: "target", - }, - }, - found: []*vectorReplica{ - { - addr: "found1", - }, - { - addr: "found2", - }, - }, - availableAddrs: []string{}, - }, - want: want{ - addrMethods: []addrMethod{ - { - addr: "target", - method: deleteMethod, - }, - }, - err: nil, - }, - }, - { - name: "delete replica from myself and most memory used agent when replica number is too much by more than one", - args: args{ - indexReplica: 2, - target: &vectorReplica{ - addr: "target", - vec: &payload.Object_Vector{ - Id: "target", - }, - }, - found: []*vectorReplica{ - { - addr: "found1", - }, - { - addr: "found2", - }, - { - addr: "found3", - }, - }, - availableAddrs: []string{}, - }, - want: want{ - addrMethods: []addrMethod{ - { - addr: "target", - method: deleteMethod, - }, - { - addr: "found1", - method: deleteMethod, - }, - }, - err: nil, - }, - }, - { - name: "return ErrNoAvailableAgentToInsert when availableAddrs is empty when insertion required", - args: args{ - indexReplica: 2, - target: &vectorReplica{ - addr: "target", - vec: &payload.Object_Vector{ - Id: "target", - }, - }, - found: []*vectorReplica{}, - availableAddrs: []string{}, - }, - want: want{ - addrMethods: nil, - err: errors.ErrNoAvailableAgentToInsert, - }, - }, - { - name: "return ErrFailedToCorrectReplicaNum when there is not enough number of availableAddrs", - args: args{ - indexReplica: 3, - target: &vectorReplica{ - addr: "target", - vec: &payload.Object_Vector{ - Id: "target", - }, - }, - found: []*vectorReplica{}, - availableAddrs: []string{"available"}, - }, - want: want{ - addrMethods: nil, - err: errors.ErrFailedToCorrectReplicaNum, - }, - }, - } - - for _, tc := range tests { - test := tc - c := &correct{ - discoverer: &m, - cfg: &config.Data{ - Corrector: &iconfig.Corrector{ - IndexReplica: test.args.indexReplica, - }, - }, - } - - // agentAddrs = availableAddrs + target.addr + found.addr - // skipcq: CRT-D0001 - c.agentAddrs = append(test.args.availableAddrs, test.args.target.addr) - for _, found := range test.args.found { - c.agentAddrs = append(c.agentAddrs, found.addr) - } - - t.Run(test.name, func(tt *testing.T) { - tt.Parallel() - err := c.correctReplica(context.Background(), test.args.target, test.args.found) - if test.want.err != nil { - require.ErrorIs(t, test.want.err, err) - } - - for _, am := range test.want.addrMethods { - // check if the agents which need to be corrected are called with the required method - // checking calling parameter, like timestamp, is impossible because its inside of the function arg - m.client.AssertCalled(tt, "Do", tmock.MatchedBy(func(ctx context.Context) bool { - method := ctx.Value(grpc.GRPCMethodContextKey) - val, ok := method.(string) - if !ok { - return false - } - return val == am.method - }), am.addr, tmock.Anything) - } - }) - } -} - -// NOT IMPLEMENTED BELOW -// -// func TestNew(t *testing.T) { -// type args struct { -// cfg *config.Data -// discoverer discoverer.Client -// } -// type want struct { -// want Corrector -// err error -// } -// type test struct { -// name string -// args args -// want want -// checkFunc func(want, Corrector, error) error -// beforeFunc func(*testing.T, args) -// afterFunc func(*testing.T, args) -// } -// defaultCheckFunc := func(w want, got Corrector, err error) error { -// if !errors.Is(err, w.err) { -// return errors.Errorf("got_error: \"%#v\",\n\t\t\t\twant: \"%#v\"", err, w.err) -// } -// if !reflect.DeepEqual(got, w.want) { -// return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", got, w.want) -// } -// return nil -// } -// tests := []test{ -// // TODO test cases -// /* -// { -// name: "test_case_1", -// args: args { -// cfg:nil, -// discoverer:nil, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// }, -// */ -// -// // TODO test cases -// /* -// func() test { -// return test { -// name: "test_case_2", -// args: args { -// cfg:nil, -// discoverer:nil, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// } -// }(), -// */ -// } -// -// for _, tc := range tests { -// test := tc -// t.Run(test.name, func(tt *testing.T) { -// tt.Parallel() -// defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) -// if test.beforeFunc != nil { -// test.beforeFunc(tt, test.args) -// } -// if test.afterFunc != nil { -// defer test.afterFunc(tt, test.args) -// } -// checkFunc := test.checkFunc -// if test.checkFunc == nil { -// checkFunc = defaultCheckFunc -// } -// -// got, err := New(test.args.cfg, test.args.discoverer) -// if err := checkFunc(test.want, got, err); err != nil { -// tt.Errorf("error = %v", err) -// } -// -// }) -// } -// } -// -// func Test_correct_Start(t *testing.T) { -// type args struct { -// ctx context.Context -// } -// type fields struct { -// cfg *config.Data -// discoverer discoverer.Client -// agentAddrs []string -// indexInfos sync.Map[string, *payload.Info_Index_Count] -// uuidsCount uint32 -// uncommittedUUIDsCount uint32 -// checkedID bbolt.Bbolt -// } -// type want struct { -// want <-chan error -// err error -// } -// type test struct { -// name string -// args args -// fields fields -// want want -// checkFunc func(want, <-chan error, error) error -// beforeFunc func(*testing.T, args) -// afterFunc func(*testing.T, args) -// } -// defaultCheckFunc := func(w want, got <-chan error, err error) error { -// if !errors.Is(err, w.err) { -// return errors.Errorf("got_error: \"%#v\",\n\t\t\t\twant: \"%#v\"", err, w.err) -// } -// if !reflect.DeepEqual(got, w.want) { -// return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", got, w.want) -// } -// return nil -// } -// tests := []test{ -// // TODO test cases -// /* -// { -// name: "test_case_1", -// args: args { -// ctx:nil, -// }, -// fields: fields { -// cfg:nil, -// discoverer:nil, -// agentAddrs:nil, -// indexInfos:nil, -// uuidsCount:0, -// uncommittedUUIDsCount:0, -// checkedID:nil, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// }, -// */ -// -// // TODO test cases -// /* -// func() test { -// return test { -// name: "test_case_2", -// args: args { -// ctx:nil, -// }, -// fields: fields { -// cfg:nil, -// discoverer:nil, -// agentAddrs:nil, -// indexInfos:nil, -// uuidsCount:0, -// uncommittedUUIDsCount:0, -// checkedID:nil, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// } -// }(), -// */ -// } -// -// for _, tc := range tests { -// test := tc -// t.Run(test.name, func(tt *testing.T) { -// tt.Parallel() -// defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) -// if test.beforeFunc != nil { -// test.beforeFunc(tt, test.args) -// } -// if test.afterFunc != nil { -// defer test.afterFunc(tt, test.args) -// } -// checkFunc := test.checkFunc -// if test.checkFunc == nil { -// checkFunc = defaultCheckFunc -// } -// c := &correct{ -// cfg: test.fields.cfg, -// discoverer: test.fields.discoverer, -// agentAddrs: test.fields.agentAddrs, -// indexInfos: test.fields.indexInfos, -// uuidsCount: test.fields.uuidsCount, -// uncommittedUUIDsCount: test.fields.uncommittedUUIDsCount, -// checkedID: test.fields.checkedID, -// } -// -// got, err := c.Start(test.args.ctx) -// if err := checkFunc(test.want, got, err); err != nil { -// tt.Errorf("error = %v", err) -// } -// -// }) -// } -// } -// -// func Test_correct_PreStop(t *testing.T) { -// type args struct { -// in0 context.Context -// } -// type fields struct { -// cfg *config.Data -// discoverer discoverer.Client -// agentAddrs []string -// indexInfos sync.Map[string, *payload.Info_Index_Count] -// uuidsCount uint32 -// uncommittedUUIDsCount uint32 -// checkedID bbolt.Bbolt -// } -// type want struct { -// err error -// } -// type test struct { -// name string -// args args -// fields fields -// want want -// checkFunc func(want, error) error -// beforeFunc func(*testing.T, args) -// afterFunc func(*testing.T, args) -// } -// defaultCheckFunc := func(w want, err error) error { -// if !errors.Is(err, w.err) { -// return errors.Errorf("got_error: \"%#v\",\n\t\t\t\twant: \"%#v\"", err, w.err) -// } -// return nil -// } -// tests := []test{ -// // TODO test cases -// /* -// { -// name: "test_case_1", -// args: args { -// in0:nil, -// }, -// fields: fields { -// cfg:nil, -// discoverer:nil, -// agentAddrs:nil, -// indexInfos:nil, -// uuidsCount:0, -// uncommittedUUIDsCount:0, -// checkedID:nil, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// }, -// */ -// -// // TODO test cases -// /* -// func() test { -// return test { -// name: "test_case_2", -// args: args { -// in0:nil, -// }, -// fields: fields { -// cfg:nil, -// discoverer:nil, -// agentAddrs:nil, -// indexInfos:nil, -// uuidsCount:0, -// uncommittedUUIDsCount:0, -// checkedID:nil, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// } -// }(), -// */ -// } -// -// for _, tc := range tests { -// test := tc -// t.Run(test.name, func(tt *testing.T) { -// tt.Parallel() -// defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) -// if test.beforeFunc != nil { -// test.beforeFunc(tt, test.args) -// } -// if test.afterFunc != nil { -// defer test.afterFunc(tt, test.args) -// } -// checkFunc := test.checkFunc -// if test.checkFunc == nil { -// checkFunc = defaultCheckFunc -// } -// c := &correct{ -// cfg: test.fields.cfg, -// discoverer: test.fields.discoverer, -// agentAddrs: test.fields.agentAddrs, -// indexInfos: test.fields.indexInfos, -// uuidsCount: test.fields.uuidsCount, -// uncommittedUUIDsCount: test.fields.uncommittedUUIDsCount, -// checkedID: test.fields.checkedID, -// } -// -// err := c.PreStop(test.args.in0) -// if err := checkFunc(test.want, err); err != nil { -// tt.Errorf("error = %v", err) -// } -// -// }) -// } -// } diff --git a/pkg/index/job/correction/usecase/corrector.go b/pkg/index/job/correction/usecase/corrector.go deleted file mode 100644 index 337b308f3c..0000000000 --- a/pkg/index/job/correction/usecase/corrector.go +++ /dev/null @@ -1,218 +0,0 @@ -// Copyright (C) 2019-2023 vdaas.org vald team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// You may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package usecase - -import ( - "context" - "os" - "syscall" - "time" - - "github.com/vdaas/vald/internal/client/v1/client/discoverer" - iconf "github.com/vdaas/vald/internal/config" - "github.com/vdaas/vald/internal/errors" - "github.com/vdaas/vald/internal/log" - "github.com/vdaas/vald/internal/net/grpc" - "github.com/vdaas/vald/internal/net/grpc/interceptor/server/recover" - "github.com/vdaas/vald/internal/observability" - "github.com/vdaas/vald/internal/runner" - "github.com/vdaas/vald/internal/safety" - "github.com/vdaas/vald/internal/servers/server" - "github.com/vdaas/vald/internal/servers/starter" - "github.com/vdaas/vald/internal/sync/errgroup" - "github.com/vdaas/vald/pkg/index/job/correction/config" - "github.com/vdaas/vald/pkg/index/job/correction/service" -) - -type run struct { - eg errgroup.Group - cfg *config.Data - observability observability.Observability - server starter.Server - corrector service.Corrector -} - -func New(cfg *config.Data) (r runner.Runner, err error) { - if cfg.Corrector.IndexReplica == 1 { - return nil, errors.ErrIndexReplicaOne - } - - eg := errgroup.Get() - - cOpts, err := cfg.Corrector.Discoverer.Client.Opts() - if err != nil { - return nil, err - } - // skipcq: CRT-D0001 - dopts := append( - cOpts, - grpc.WithErrGroup(eg)) - - acOpts, err := cfg.Corrector.Discoverer.AgentClientOptions.Opts() - if err != nil { - return nil, err - } - // skipcq: CRT-D0001 - aopts := append( - acOpts, - grpc.WithErrGroup(eg)) - - // Construct discoverer - discoverer, err := discoverer.New( - discoverer.WithAutoConnect(true), - discoverer.WithName(cfg.Corrector.AgentName), - discoverer.WithNamespace(cfg.Corrector.AgentNamespace), - discoverer.WithPort(cfg.Corrector.AgentPort), - discoverer.WithServiceDNSARecord(cfg.Corrector.AgentDNS), - discoverer.WithDiscovererClient(grpc.New(dopts...)), - discoverer.WithDiscoverDuration(cfg.Corrector.Discoverer.Duration), - discoverer.WithOptions(aopts...), - discoverer.WithNodeName(cfg.Corrector.NodeName), - discoverer.WithOnDiscoverFunc(func(ctx context.Context, c discoverer.Client, addrs []string) error { - last := len(addrs) - 1 - for i := 0; i < len(addrs)/2; i++ { - addrs[i], addrs[last-i] = addrs[last-i], addrs[i] - } - return nil - }), - ) - if err != nil { - return nil, err - } - - grpcServerOptions := []server.Option{ - server.WithGRPCOption( - grpc.ChainUnaryInterceptor(recover.RecoverInterceptor()), - grpc.ChainStreamInterceptor(recover.RecoverStreamInterceptor()), - ), - } - - // For health check and metrics - srv, err := starter.New(starter.WithConfig(cfg.Server), - starter.WithGRPC(func(sc *iconf.Server) []server.Option { - return grpcServerOptions - }), - ) - if err != nil { - return nil, err - } - - corrector, err := service.New(cfg, discoverer) - if err != nil { - return nil, err - } - - var obs observability.Observability - if cfg.Observability.Enabled { - obs, err = observability.NewWithConfig(cfg.Observability) - if err != nil { - log.Error("failed to initialize observability") - return nil, err - } - } - - return &run{ - eg: eg, - cfg: cfg, - observability: obs, - server: srv, - corrector: corrector, - }, nil -} - -func (r *run) PreStart(ctx context.Context) error { - if r.observability != nil { - return r.observability.PreStart(ctx) - } - return nil -} - -func (r *run) Start(ctx context.Context) (<-chan error, error) { - log.Info("starting servers") - ech := make(chan error, 3) //nolint:gomnd - var oech, nech, sech <-chan error - r.eg.Go(safety.RecoverFunc(func() (err error) { - defer close(ech) - if r.observability != nil { - oech = r.observability.Start(ctx) - } - sech = r.server.ListenAndServe(ctx) - for { - select { - case <-ctx.Done(): - return ctx.Err() - case err = <-oech: - case err = <-nech: - case err = <-sech: - } - if err != nil { - select { - case <-ctx.Done(): - return ctx.Err() - case ech <- err: - } - } - } - })) - - // main groutine to run the job - r.eg.Go(safety.RecoverFunc(func() (err error) { - defer func() { - log.Info("fiding my pid to kill myself") - p, err := os.FindProcess(os.Getpid()) - if err != nil { - // using Fatal to avoid this process to be zombie - // skipcq: RVV-A0003 - log.Fatalf("failed to find my pid to kill %v", err) - return - } - - log.Info("sending SIGTERM to myself to stop this job") - if err := p.Signal(syscall.SIGTERM); err != nil { - log.Error(err) - } - }() - - start := time.Now() - _, err = r.corrector.Start(ctx) - if err != nil { - log.Errorf("index correction process failed: %v", err) - return err - } - end := time.Since(start) - log.Infof("correction finished in %v", end) - return nil - })) - - return ech, nil -} - -func (r *run) PreStop(ctx context.Context) error { - r.corrector.PreStop(ctx) - return nil -} - -func (r *run) Stop(ctx context.Context) error { - if r.observability != nil { - r.observability.Stop(ctx) - } - if r.server != nil { - r.server.Shutdown(ctx) - } - return nil -} - -func (*run) PostStop(_ context.Context) error { - return nil -} diff --git a/pkg/index/job/correction/usecase/corrector_test.go b/pkg/index/job/correction/usecase/corrector_test.go deleted file mode 100644 index c8759d29e4..0000000000 --- a/pkg/index/job/correction/usecase/corrector_test.go +++ /dev/null @@ -1,106 +0,0 @@ -// Copyright (C) 2019-2023 vdaas.org vald team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// You may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package usecase - -// NOT IMPLEMENTED BELOW -// -// func TestNew(t *testing.T) { -// type args struct { -// cfg *config.Data -// } -// type want struct { -// wantR runner.Runner -// err error -// } -// type test struct { -// name string -// args args -// want want -// checkFunc func(want, runner.Runner, error) error -// beforeFunc func(*testing.T, args) -// afterFunc func(*testing.T, args) -// } -// defaultCheckFunc := func(w want, gotR runner.Runner, err error) error { -// if !errors.Is(err, w.err) { -// return errors.Errorf("got_error: \"%#v\",\n\t\t\t\twant: \"%#v\"", err, w.err) -// } -// if !reflect.DeepEqual(gotR, w.wantR) { -// return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", gotR, w.wantR) -// } -// return nil -// } -// tests := []test{ -// // TODO test cases -// /* -// { -// name: "test_case_1", -// args: args { -// cfg:nil, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// }, -// */ -// -// // TODO test cases -// /* -// func() test { -// return test { -// name: "test_case_2", -// args: args { -// cfg:nil, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// } -// }(), -// */ -// } -// -// for _, tc := range tests { -// test := tc -// t.Run(test.name, func(tt *testing.T) { -// tt.Parallel() -// defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) -// if test.beforeFunc != nil { -// test.beforeFunc(tt, test.args) -// } -// if test.afterFunc != nil { -// defer test.afterFunc(tt, test.args) -// } -// checkFunc := test.checkFunc -// if test.checkFunc == nil { -// checkFunc = defaultCheckFunc -// } -// -// gotR, err := New(test.args.cfg) -// if err := checkFunc(test.want, gotR, err); err != nil { -// tt.Errorf("error = %v", err) -// } -// -// }) -// } -// } diff --git a/tests/e2e/pkg/agent/core/ngt/service/ngt_e2s_test.go b/tests/e2e/pkg/agent/core/ngt/service/ngt_e2s_test.go index 81ee7cf22d..77528a2de0 100644 --- a/tests/e2e/pkg/agent/core/ngt/service/ngt_e2s_test.go +++ b/tests/e2e/pkg/agent/core/ngt/service/ngt_e2s_test.go @@ -1,5 +1,3 @@ -//go:build e2e - // // Copyright (C) 2019-2023 vdaas.org vald team //