Skip to content

Commit

Permalink
Add index correction internal logic (#2194)
Browse files Browse the repository at this point in the history
* implement the initail framework

* add corrector configuration

* add corrector logic

* add build make command for index correction binary

* add Dockerfile for index correction

* add Docker image for index job correction

* add timer

* fix tag align

* tmp

* fix log

* temporally implement two versions of correct function

* set eg limit from config

* add stream list concurrency config

* implement index id caching

* add config to use cache or not

* style: Format code with prettier and gofumpt

* refactor availableAddrs

* add kvs range duration

* add leftAgentAddrs for performance

* Revert "add kvs range duration"

This reverts commit 5b647be.

* refactor

* fix without cache bug

* enable observability

* refactor

* SIGTERM after complete

* add metrics server

* add pcache

* remove comment

* [TEMP] use pcache

* [TMP] use pcache

* fix empty shard returns error

* fix to use local map

* [TMP] add prestop for pcache

* [TEMP] add pcache config

* style: Format code with prettier and gofumpt

* [TEMP] add pcache log

* fix map alloc size

* [TMP] Add bbolt cache

* update bbolt

* fix bbolt bug

* add bbolt test

* [TEMP] use bbolt as persistent cache

* style: Format code with prettier and gofumpt

* add SetBatch to bbolt

* use batch to write map to disk

* style: Format code with prettier and gofumpt

* delete the map elements on finalize

* manually call GC after the map shrink

* add limit to SetBatch goroutine number

* stop unnecesarry GC

* increase eg limit to the MaxBatchSize

* use ch to set batch bbolt

* fix servers shutdown properly

* use internal/kvs/bbolt

* refactor

* always use bbolt cache for correction

* update sample.yaml for correction

* style: format code with Prettier and Gofumpt

This commit fixes the style issues introduced in 319ec8b according to the output
from Prettier and Gofumpt.

Details: #2152

* use go std slices pkg

* refactor

* add comment

* remove valdsync

* use vald errgroup

* refactor

* Define ErrNoAvailableAgentToInsert

* update comment in English

* Apply new actions yaml format

* Disable godox

* style: format code with Prettier and Gofumpt

This commit fixes the style issues introduced in c860ddc according to the output
from Prettier and Gofumpt.

Details: #2194

* remove comment

* Apply format

* Add type check for type assertion

* use const to specify filemode

* Add bbolt concurrency as config

* fix var style

* Suppress linter

* fix comment

* add test template

* Refactor parameters for index correction

* Refactor config

* Add corrector test

* style: format code with Prettier and Gofumpt

This commit fixes the style issues introduced in 004bf81 according to the output
from Prettier and Gofumpt.

Details: #2194

* Add timestamp check

* Apply format

* fix schema type

* Fix DeepSource errors

* Fix misspell

* Add type check

* Remove unused config

* Fix DeepSource error

* Add required go:build e2e tag

* Remove memo

* Refactor comment

* Remove TODO comment that is already done

* Remove unused config

* Add comment to errors

* change app name

* replace filepath pkg with internal file

replace filepath pkg with internal file

refactor

* Refactor

refactor

* Fix gRPC spelling

* Remove memo

---------

Co-authored-by: deepsource-autofix[bot] <62050782+deepsource-autofix[bot]@users.noreply.github.com>
  • Loading branch information
2 people authored and kmrmt committed Dec 12, 2023
1 parent 37f9f2b commit 514958c
Show file tree
Hide file tree
Showing 22 changed files with 3,103 additions and 10 deletions.
78 changes: 78 additions & 0 deletions .github/workflows/dockers-index-correction.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
#
# Copyright (C) 2019-2023 vdaas.org vald team <[email protected]>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# You may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
name: "Build docker image: index-correction"
on:
push:
branches:
- main
tags:
- "*.*.*"
- "v*.*.*"
- "*.*.*-*"
- "v*.*.*-*"
paths:
- ".github/actions/docker-build/actions.yaml"
- ".github/workflows/dockers-index-correction.yml"
- "go.mod"
- "go.sum"
- "internal/**"
- "!internal/**/*_test.go"
- "!internal/db/**"
- "!internal/k8s/**"
- "apis/grpc/**"
- "pkg/index/job/correction/**"
- "cmd/index/job/correction/**"
- "dockers/index/job/correction/Dockerfile"
- "versions/GO_VERSION"
pull_request:
paths:
- ".github/actions/docker-build/actions.yaml"
- ".github/workflows/_docker-image.yaml"
- ".github/workflows/dockers-index-correction.yml"
- "go.mod"
- "go.sum"
- "internal/**"
- "!internal/**/*_test.go"
- "!internal/db/**"
- "!internal/k8s/**"
- "apis/grpc/**"
- "pkg/index/job/correction/**"
- "cmd/index/job/correction/**"
- "dockers/index/job/correction/Dockerfile"
- "versions/GO_VERSION"
pull_request_target:
paths:
- ".github/actions/docker-build/actions.yaml"
- ".github/workflows/_docker-image.yaml"
- ".github/workflows/dockers-index-correction.yml"
- "go.mod"
- "go.sum"
- "internal/**"
- "!internal/**/*_test.go"
- "!internal/db/**"
- "!internal/k8s/**"
- "apis/grpc/**"
- "pkg/index/job/correction/**"
- "cmd/index/job/correction/**"
- "dockers/index/job/correction/Dockerfile"
- "versions/GO_VERSION"

jobs:
build:
uses: ./.github/workflows/_docker-image.yaml
with:
target: index-correction
secrets: inherit
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ linters:
- gochecknoinits
- goconst
- godot
- godox
- gofumpt
- goimports
- gomnd
Expand Down Expand Up @@ -99,6 +98,7 @@ linters:
# - gocognit
# - gocritic
# - gocyclo
# - godox
# - goerr113
# - gofmt
# - goheader
Expand Down
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ FILTER_GATEWAY_IMAGE = $(NAME)-filter-gateway
HELM_OPERATOR_IMAGE = $(NAME)-helm-operator
LB_GATEWAY_IMAGE = $(NAME)-lb-gateway
LOADTEST_IMAGE = $(NAME)-loadtest
INDEX_CORRECTION_IMAGE = $(NAME)-index-correction
MANAGER_INDEX_IMAGE = $(NAME)-manager-index
MAINTAINER = "$(ORG).org $(NAME) team <$(NAME)@$(ORG).org>"

Expand Down
29 changes: 29 additions & 0 deletions Makefile.d/build.mk
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,35 @@ cmd/manager/index/index: \
$(dir $@)main.go
$@ -version

cmd/index/job/correction/index-correction: \
$(GO_SOURCES_INTERNAL) \
$(PBGOS) \
$(shell find $(ROOTDIR)/cmd/index/job/correction/correction -type f -name '*.go' -not -name '*_test.go' -not -name 'doc.go') \
$(shell find $(ROOTDIR)/pkg/index/job/correction -type f -name '*.go' -not -name '*_test.go' -not -name 'doc.go')
$(eval CGO_ENABLED = 0)
CGO_ENABLED=$(CGO_ENABLED) \
GO111MODULE=on \
GOPRIVATE=$(GOPRIVATE) \
go build \
--ldflags "-w -extldflags=-static \
-X '$(GOPKG)/internal/info.Version=$(VERSION)' \
-X '$(GOPKG)/internal/info.GitCommit=$(GIT_COMMIT)' \
-X '$(GOPKG)/internal/info.BuildTime=$(DATETIME)' \
-X '$(GOPKG)/internal/info.GoVersion=$(GO_VERSION)' \
-X '$(GOPKG)/internal/info.GoOS=$(GOOS)' \
-X '$(GOPKG)/internal/info.GoArch=$(GOARCH)' \
-X '$(GOPKG)/internal/info.CGOEnabled=$(CGO_ENABLED)' \
-X '$(GOPKG)/internal/info.BuildCPUInfoFlags=$(CPU_INFO_FLAGS)' \
-buildid=" \
-mod=readonly \
-modcacherw \
-a \
-tags "osusergo netgo static_build" \
-trimpath \
-o $@ \
$(dir $@)main.go
$@ -version

.PHONY: binary/build/zip
## build all binaries and zip them
binary/build/zip: \
Expand Down
14 changes: 14 additions & 0 deletions Makefile.d/docker.mk
Original file line number Diff line number Diff line change
Expand Up @@ -188,3 +188,17 @@ docker/build/loadtest:
-t $(ORG)/$(LOADTEST_IMAGE):$(TAG) . \
--build-arg MAINTAINER=$(MAINTAINER) \
--build-arg GO_VERSION=$(GO_VERSION)

.PHONY: docker/name/index-correction
docker/name/index-correction:
@echo "$(ORG)/$(INDEX_CORRECTION_IMAGE)"

.PHONY: docker/build/index-correction
## build index-correction image
docker/build/index-correction:
$(DOCKER) build \
$(DOCKER_OPTS) \
-f dockers/index/job/correction/Dockerfile \
-t $(ORG)/$(INDEX_CORRECTION_IMAGE):$(TAG) . \
--build-arg MAINTAINER=$(MAINTAINER) \
--build-arg GO_VERSION=$(GO_VERSION)
35 changes: 35 additions & 0 deletions charts/vald/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2631,3 +2631,38 @@ manager:
net:
dialer:
keepalive: 15m #indexer fetches uncommitted index length, which includes huge payload so we need to set keepalive longer than usual
# @schema {"name": "manager.index.corrector", "type": "object"}
corrector:
# @schema {"name": "manager.index.corrector.enabled", "type": "boolean"}
# manager.index.corrector.enabled -- enable index correction CronJob
enabled: false
# @schema {"name": "manager.index.corrector.check_duration", "type": "string"}
# manager.index.corrector.enabled -- check duration of index correction CronJob
check_duration: 24h
# @schema {"name": "manager.index.corrector.stream_list_concurrency", "type": "integer", "minimum": 1}
# manager.index.corrector.stream_list_concurrency -- concurrency for stream list object rpc
stream_list_concurrency: 200
# @schema {"name": "manager.index.corrector.bbolt_async_write_concurrency", "type": "integer", "minimum": 1}
# manager.index.corrector.bbolt_async_write_concurrency -- concurrency for bbolt async write
bbolt_async_write_concurrency: 2048
# @schema {"name": "manager.index.corrector.agent_namespace", "type": "string"}
# manager.index.corrector.agent_namespace -- namespace of agent pods to manage
agent_namespace: _MY_POD_NAMESPACE_
# @schema {"name": "manager.index.corrector.node_name", "type": "string"}
# manager.index.corrector.node_name -- node name
node_name: "" # _MY_NODE_NAME_
# @schema {"name": "manager.index.corrector.discoverer", "type": "object"}
discoverer:
# @schema {"name": "manager.index.corrector.discoverer.duration", "type": "string"}
# manager.index.corrector.discoverer.duration -- refresh duration to discover
duration: 500ms
# @schema {"name": "manager.index.corrector.discoverer.client", "alias": "grpc.client"}
# manager.index.corrector.discoverer.client -- gRPC client for discoverer (overrides defaults.grpc.client)
client: {}
# @schema {"name": "manager.index.corrector.discoverer.agent_client_options", "alias": "grpc.client"}
# manager.index.corrector.discoverer.agent_client_options -- gRPC client options for agents (overrides defaults.grpc.client)
agent_client_options:
dial_option:
net:
dialer:
keepalive: 15m #indexer fetches uncommitted index length, which includes huge payload so we need to set keepalive longer than usual
59 changes: 59 additions & 0 deletions cmd/index/job/correction/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright (C) 2019-2023 vdaas.org vald team <[email protected]>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// You may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main

import (
"context"

"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/info"
"github.com/vdaas/vald/internal/log"
"github.com/vdaas/vald/internal/runner"
"github.com/vdaas/vald/internal/safety"
"github.com/vdaas/vald/pkg/index/job/correction/config"
"github.com/vdaas/vald/pkg/index/job/correction/usecase"
)

const (
maxVersion = "v0.0.10"
minVersion = "v0.0.0"
name = "index correction job"
)

func main() {
if err := safety.RecoverFunc(func() error {
return runner.Do(
context.Background(),
runner.WithName(name),
runner.WithVersion(info.Version, maxVersion, minVersion),
runner.WithConfigLoader(func(path string) (interface{}, *config.GlobalConfig, error) {
cfg, err := config.NewConfig(path)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to load "+name+"'s configuration")
}
return cfg, &cfg.GlobalConfig, nil
}),
runner.WithDaemonInitializer(func(cfg interface{}) (runner.Runner, error) {
c, ok := cfg.(*config.Data)
if !ok {
return nil, errors.ErrInvalidConfig
}
return usecase.New(c)
}),
)
})(); err != nil {
log.Fatal(err, info.Get())
return
}
}
Loading

0 comments on commit 514958c

Please sign in to comment.