Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Feature/index/correction job #2152

Closed
wants to merge 66 commits into from
Closed
Show file tree
Hide file tree
Changes from 56 commits
Commits
Show all changes
66 commits
Select commit Hold shift + click to select a range
a1ee9d8
implement the initail framework
ykadowak Aug 4, 2023
8a1221d
add corrector configuration
ykadowak Aug 7, 2023
fa1eb75
add corrector logic
ykadowak Aug 14, 2023
39e7d71
add build make command for index correction binary
ykadowak Aug 14, 2023
2bd41e9
add Dockerfile for index correction
ykadowak Aug 14, 2023
6ef29e3
add Docker image for index job correction
ykadowak Aug 14, 2023
c7e1ff2
add timer
ykadowak Aug 15, 2023
22dcaaa
fix tag align
ykadowak Aug 15, 2023
d51e26b
tmp
ykadowak Aug 16, 2023
6aa8fde
fix log
ykadowak Aug 16, 2023
41faffd
temporally implement two versions of correct function
ykadowak Aug 17, 2023
8f5bcf0
set eg limit from config
ykadowak Aug 18, 2023
065e9e2
add stream list concurrency config
ykadowak Aug 18, 2023
8562a99
implement index id caching
ykadowak Aug 21, 2023
81345f3
add config to use cache or not
ykadowak Aug 22, 2023
77c33f0
style: Format code with prettier and gofumpt
deepsource-autofix[bot] Aug 22, 2023
72c3956
refactor availableAddrs
ykadowak Aug 23, 2023
5ffd9b3
add kvs range duration
ykadowak Aug 24, 2023
5d3c6c7
add leftAgentAddrs for performance
ykadowak Aug 25, 2023
3b3710b
Revert "add kvs range duration"
ykadowak Aug 25, 2023
6b3934e
refactor
ykadowak Aug 28, 2023
bc591ae
fix without cache bug
ykadowak Aug 28, 2023
5209a72
enable observability
ykadowak Aug 28, 2023
a0cb7aa
refactor
ykadowak Aug 29, 2023
b453e61
SIGTERM after complete
ykadowak Aug 29, 2023
db6868d
add metrics server
ykadowak Sep 4, 2023
7a630b3
add pcache
ykadowak Sep 5, 2023
ca84f56
remove comment
ykadowak Sep 5, 2023
feda428
[TEMP] use pcache
ykadowak Sep 5, 2023
c229a87
[TMP] use pcache
ykadowak Sep 6, 2023
fa0b68b
fix empty shard returns error
ykadowak Sep 6, 2023
06b312b
fix to use local map
ykadowak Sep 6, 2023
490345f
[TMP] add prestop for pcache
ykadowak Sep 6, 2023
05838c3
[TEMP] add pcache config
ykadowak Sep 6, 2023
9805e7e
style: Format code with prettier and gofumpt
deepsource-autofix[bot] Sep 6, 2023
a0d698e
[TEMP] add pcache log
ykadowak Sep 6, 2023
9e5f2ae
fix map alloc size
ykadowak Sep 6, 2023
3728276
[TMP] Add bbolt cache
ykadowak Sep 8, 2023
1678ff7
update bbolt
ykadowak Sep 8, 2023
4f87601
fix bbolt bug
ykadowak Sep 8, 2023
01225aa
add bbolt test
ykadowak Sep 8, 2023
428ee35
[TEMP] use bbolt as persistent cache
ykadowak Sep 8, 2023
81b800e
style: Format code with prettier and gofumpt
deepsource-autofix[bot] Sep 8, 2023
bfc73c4
add SetBatch to bbolt
ykadowak Sep 8, 2023
d4f3695
use batch to write map to disk
ykadowak Sep 8, 2023
9975237
style: Format code with prettier and gofumpt
deepsource-autofix[bot] Sep 8, 2023
2b00e05
delete the map elements on finalize
ykadowak Sep 8, 2023
a611e88
manually call GC after the map shrink
ykadowak Sep 8, 2023
e21441c
add limit to SetBatch goroutine number
ykadowak Sep 8, 2023
66bc79c
stop unnecesarry GC
ykadowak Sep 8, 2023
3a85405
increase eg limit to the MaxBatchSize
ykadowak Sep 8, 2023
16d2986
use ch to set batch bbolt
ykadowak Sep 11, 2023
919c9f9
fix servers shutdown properly
ykadowak Sep 15, 2023
9bd57c0
use internal/kvs/bbolt
ykadowak Sep 15, 2023
55c6586
refactor
ykadowak Sep 15, 2023
bc92686
always use bbolt cache for correction
ykadowak Sep 19, 2023
319ec8b
update sample.yaml for correction
ykadowak Sep 19, 2023
eb5cdb0
style: format code with Prettier and Gofumpt
deepsource-autofix[bot] Sep 19, 2023
d3616e5
use go std slices pkg
ykadowak Sep 19, 2023
89e2339
refactor
ykadowak Sep 19, 2023
e2eeabc
add comment
ykadowak Sep 19, 2023
9a9e622
remove valdsync
ykadowak Sep 19, 2023
4c22bcc
use vald errgroup
ykadowak Sep 19, 2023
e0ef7a8
Merge branch 'main' into feature/index/correction-job
ykadowak Sep 19, 2023
4dc4801
refactor
ykadowak Sep 19, 2023
9cbf664
style: format code with Prettier and Gofumpt
deepsource-autofix[bot] Sep 19, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
188 changes: 188 additions & 0 deletions .github/workflows/dockers-index-job-correction.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
#
# Copyright (C) 2019-2023 vdaas.org vald team <[email protected]>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
name: "Build docker image: index-job-correction"
on:
push:
branches:
- main
tags:
- "*.*.*"
- "v*.*.*"
- "*.*.*-*"
- "v*.*.*-*"
paths:
- ".github/actions/docker-build/actions.yaml"
- ".github/workflows/dockers-index-job-correction.yml"
- "go.mod"
- "go.sum"
- "internal/**"
- "!internal/**/*_test.go"
- "!internal/db/**"
- "!internal/k8s/**"
- "apis/grpc/**"
- "pkg/index/job/correction/**"
- "cmd/index/job/correction/**"
- "dockers/index/job/correction/Dockerfile"
- "versions/GO_VERSION"
pull_request:
paths:
- ".github/actions/docker-build/actions.yaml"
- ".github/workflows/dockers-index-job-correction.yml"
- "go.mod"
- "go.sum"
- "internal/**"
- "!internal/**/*_test.go"
- "!internal/db/**"
- "!internal/k8s/**"
- "apis/grpc/**"
- "pkg/index/job/correction/**"
- "cmd/index/job/correction/**"
- "dockers/index/job/correction/Dockerfile"
- "versions/GO_VERSION"
pull_request_target:
paths:
- ".github/actions/docker-build/actions.yaml"
- ".github/workflows/dockers-index-job-correction.yml"
- "go.mod"
- "go.sum"
- "internal/**"
- "!internal/**/*_test.go"
- "!internal/db/**"
- "!internal/k8s/**"
- "apis/grpc/**"
- "pkg/index/job/correction/**"
- "cmd/index/job/correction/**"
- "dockers/index/job/correction/Dockerfile"
- "versions/GO_VERSION"

concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref != 'refs/heads/main' && github.ref || github.sha }}-${{ github.event_name }}
cancel-in-progress: true

jobs:
dump_contexts_to_log:
runs-on: ubuntu-latest
steps:
- name: Dump GitHub context
id: github_context_step
run: echo $JSON
env:
JSON: ${{ toJSON(github) }}
- name: Dump job context
run: echo $JSON
env:
JSON: ${{ toJSON(job) }}
- name: Dump steps context
run: echo $JSON
env:
JSON: ${{ toJSON(steps) }}
- name: Dump runner context
run: echo $JSON
env:
JSON: ${{ toJSON(runner) }}
- name: Dump strategy context
run: echo $JSON
env:
JSON: ${{ toJSON(strategy) }}
- name: Dump matrix context
run: echo $JSON
env:
JSON: ${{ toJSON(matrix) }}
build:
strategy:
max-parallel: 4
runs-on: ubuntu-latest
if: ${{ (github.event_name == 'pull_request' && github.event.pull_request.head.repo.fork == false) || (github.event.pull_request.head.repo.fork == true && github.event_name == 'pull_request_target' && contains(github.event.pull_request.labels.*.name, 'ci/approved')) || (github.event_name == 'push' && github.ref == 'refs/heads/main') || startsWith( github.ref, 'refs/tags/') }}
steps:
- name: Get ref
id: ref
run: |
if [ ${{ github.event.pull_request.head.sha }} != "" ]; then
echo ref=${{ github.event.pull_request.head.sha }} >> $GITHUB_OUTPUT
else
echo ref=${{ github.sha }} >> $GITHUB_OUTPUT
fi
- uses: actions/checkout@v3
with:
ref: ${{ steps.ref.outputs.ref }}
- name: set git config
run: |
git config --global --add safe.directory ${GITHUB_WORKSPACE}
- name: Setup QEMU
uses: docker/setup-qemu-action@v2
with:
platforms: all
- name: Setup Docker Buildx
id: buildx
uses: docker/setup-buildx-action@v2
with:
buildkitd-flags: "--debug"
- name: Login to DockerHub
uses: docker/login-action@v2
with:
username: ${{ secrets.DOCKERHUB_USER }}
password: ${{ secrets.DOCKERHUB_PASS }}
- name: Login to GitHub Container Registry
uses: docker/login-action@v2
with:
registry: ghcr.io
username: ${{ secrets.PACKAGE_USER }}
password: ${{ secrets.PACKAGE_TOKEN }}
- name: Build and Publish
id: build_and_publish
uses: ./.github/actions/docker-build
with:
target: index-job-correction
builder: ${{ steps.buildx.outputs.name }}
- name: Initialize CodeQL
if: startsWith( github.ref, 'refs/tags/')
uses: github/codeql-action/init@v2
- name: Run vulnerability scanner (table)
if: startsWith( github.ref, 'refs/tags/')
uses: aquasecurity/trivy-action@master
with:
image-ref: "${{ steps.build_and_publish.outputs.IMAGE_NAME }}:${{ steps.build_and_publish.outputs.PRIMARY_TAG }}"
format: "table"
- name: Run vulnerability scanner (sarif)
if: startsWith( github.ref, 'refs/tags/')
uses: aquasecurity/trivy-action@master
with:
image-ref: "${{ steps.build_and_publish.outputs.IMAGE_NAME }}:${{ steps.build_and_publish.outputs.PRIMARY_TAG }}"
format: "template"
template: "@/contrib/sarif.tpl"
output: "trivy-results.sarif"
- name: Upload Trivy scan results to Security tab
if: startsWith( github.ref, 'refs/tags/')
uses: github/codeql-action/upload-sarif@v2
with:
sarif_file: "trivy-results.sarif"
slack:
name: Slack notification
needs: build
runs-on: ubuntu-latest
if: github.ref == 'refs/heads/main' || startsWith( github.ref, 'refs/tags/')
steps:
- uses: technote-space/workflow-conclusion-action@v2
with:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- uses: 8398a7/action-slack@v3
with:
author_name: index-job-correction image build
status: ${{ env.WORKFLOW_CONCLUSION }}
only_mention_fail: channel
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_NOTIFY_WEBHOOK_URL }}
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ FILTER_GATEWAY_IMAGE = $(NAME)-filter-gateway
HELM_OPERATOR_IMAGE = $(NAME)-helm-operator
LB_GATEWAY_IMAGE = $(NAME)-lb-gateway
LOADTEST_IMAGE = $(NAME)-loadtest
INDEX_JOB_CORRECTION_IMAGE = $(NAME)-index-job-correction
MANAGER_INDEX_IMAGE = $(NAME)-manager-index
MAINTAINER = "$(ORG).org $(NAME) team <$(NAME)@$(ORG).org>"

Expand Down
29 changes: 29 additions & 0 deletions Makefile.d/build.mk
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,35 @@ cmd/manager/index/index: \
$(dir $@)main.go
$@ -version

cmd/index/job/correction/correction: \
$(GO_SOURCES_INTERNAL) \
$(PBGOS) \
$(shell find $(ROOTDIR)/cmd/index/job/correction/correction -type f -name '*.go' -not -name '*_test.go' -not -name 'doc.go') \
$(shell find $(ROOTDIR)/pkg/index/job/correction -type f -name '*.go' -not -name '*_test.go' -not -name 'doc.go')
$(eval CGO_ENABLED = 0)
CGO_ENABLED=$(CGO_ENABLED) \
GO111MODULE=on \
GOPRIVATE=$(GOPRIVATE) \
go build \
--ldflags "-w -extldflags=-static \
-X '$(GOPKG)/internal/info.Version=$(VERSION)' \
-X '$(GOPKG)/internal/info.GitCommit=$(GIT_COMMIT)' \
-X '$(GOPKG)/internal/info.BuildTime=$(DATETIME)' \
-X '$(GOPKG)/internal/info.GoVersion=$(GO_VERSION)' \
-X '$(GOPKG)/internal/info.GoOS=$(GOOS)' \
-X '$(GOPKG)/internal/info.GoArch=$(GOARCH)' \
-X '$(GOPKG)/internal/info.CGOEnabled=$(CGO_ENABLED)' \
-X '$(GOPKG)/internal/info.BuildCPUInfoFlags=$(CPU_INFO_FLAGS)' \
-buildid=" \
-mod=readonly \
-modcacherw \
-a \
-tags "osusergo netgo static_build" \
-trimpath \
-o $@ \
$(dir $@)main.go
$@ -version

.PHONY: binary/build/zip
## build all binaries and zip them
binary/build/zip: \
Expand Down
14 changes: 14 additions & 0 deletions Makefile.d/docker.mk
Original file line number Diff line number Diff line change
Expand Up @@ -188,3 +188,17 @@ docker/build/loadtest:
-t $(ORG)/$(LOADTEST_IMAGE):$(TAG) . \
--build-arg MAINTAINER=$(MAINTAINER) \
--build-arg GO_VERSION=$(GO_VERSION)

.PHONY: docker/name/index-job-correction
docker/name/index-job-correction:
@echo "$(ORG)/$(INDEX_JOB_CORRECTION_IMAGE)"

.PHONY: docker/build/index-job-correction
## build index-job-correction image
docker/build/index-job-correction:
$(DOCKER) build \
$(DOCKER_OPTS) \
-f dockers/index/job/correction/Dockerfile \
-t $(ORG)/$(INDEX_JOB_CORRECTION_IMAGE):$(TAG) . \
--build-arg MAINTAINER=$(MAINTAINER) \
--build-arg GO_VERSION=$(GO_VERSION)
26 changes: 26 additions & 0 deletions charts/vald/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2631,3 +2631,29 @@ manager:
net:
dialer:
keepalive: 15m #indexer fetches uncommitted index length, which includes huge payload so we need to set keepalive longer than usual
# @schema {"name": "manager.index.corrector", "type": "object"}
corrector:
# @schema {"name": "manager.index.corrector.agent_namespace", "type": "string"}
# manager.index.corrector.agent_namespace -- namespace of agent pods to manage
agent_namespace: _MY_POD_NAMESPACE_
# @schema {"name": "manager.index.corrector.node_name", "type": "string"}
# manager.index.corrector.node_name -- node name
node_name: "" # _MY_NODE_NAME_
# @schema {"name": "manager.index.corrector.concurrency", "type": "integer", "minimum": 1}
# manager.index.corrector.concurrency -- concurrency
concurrency: 1
# @schema {"name": "manager.index.corrector.discoverer", "type": "object"}
discoverer:
# @schema {"name": "manager.index.corrector.discoverer.duration", "type": "string"}
# manager.index.corrector.discoverer.duration -- refresh duration to discover
duration: 500ms
# @schema {"name": "manager.index.corrector.discoverer.client", "alias": "grpc.client"}
# manager.index.corrector.discoverer.client -- gRPC client for discoverer (overrides defaults.grpc.client)
client: {}
# @schema {"name": "manager.index.corrector.discoverer.agent_client_options", "alias": "grpc.client"}
# manager.index.corrector.discoverer.agent_client_options -- gRPC client options for agents (overrides defaults.grpc.client)
agent_client_options:
dial_option:
net:
dialer:
keepalive: 15m #indexer fetches uncommitted index length, which includes huge payload so we need to set keepalive longer than usual
55 changes: 55 additions & 0 deletions cmd/index/job/correction/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright (C) 2019-2023 vdaas.org vald team <[email protected]>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main

import (
"context"

"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/info"
"github.com/vdaas/vald/internal/log"
"github.com/vdaas/vald/internal/runner"
"github.com/vdaas/vald/internal/safety"
"github.com/vdaas/vald/pkg/index/job/correction/config"
"github.com/vdaas/vald/pkg/index/job/correction/usecase"
)

const (
maxVersion = "v0.0.10"
minVersion = "v0.0.0"
name = "index correction job"
)

func main() {
if err := safety.RecoverFunc(func() error {
return runner.Do(
context.Background(),
runner.WithName(name),
runner.WithVersion(info.Version, maxVersion, minVersion),
runner.WithConfigLoader(func(path string) (interface{}, *config.GlobalConfig, error) {
cfg, err := config.NewConfig(path)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to load "+name+"'s configuration")
}
return cfg, &cfg.GlobalConfig, nil
}),
runner.WithDaemonInitializer(func(cfg interface{}) (runner.Runner, error) {
return usecase.New(cfg.(*config.Data))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚫 [golangci] reported by reviewdog 🐶
type assertion must be checked (forcetypeassert)

}),
)
})(); err != nil {
log.Fatal(err, info.Get())
return
}
}
Loading