From ba1719270b2e183eca78679015a3ace75cb4601e Mon Sep 17 00:00:00 2001 From: Kiichiro YUKAWA Date: Mon, 11 Sep 2023 19:49:18 +0900 Subject: [PATCH] :recycle: Apply new internal/sync for continous benchmark (#2175) Signed-off-by: vankichi --- .../workflows/dockers-benchmark-job-image.yml | 111 +++++------------- .../dockers-benchmark-operator-image.yaml | 110 +++++------------ .../k8s/vald/benchmark/api/v1/job_types.go | 2 +- pkg/tools/benchmark/job/router/option.go | 2 +- pkg/tools/benchmark/job/router/router.go | 2 +- pkg/tools/benchmark/job/service/insert.go | 10 +- pkg/tools/benchmark/job/service/job.go | 2 +- pkg/tools/benchmark/job/service/object.go | 20 +--- pkg/tools/benchmark/job/service/option.go | 2 +- pkg/tools/benchmark/job/service/remove.go | 10 +- pkg/tools/benchmark/job/service/search.go | 19 +-- pkg/tools/benchmark/job/service/update.go | 10 +- pkg/tools/benchmark/job/service/upsert.go | 9 +- pkg/tools/benchmark/job/usecase/benchmarkd.go | 2 +- pkg/tools/benchmark/operator/router/option.go | 2 +- pkg/tools/benchmark/operator/router/router.go | 2 +- .../benchmark/operator/service/operator.go | 2 +- .../benchmark/operator/service/option.go | 2 +- .../benchmark/operator/usecase/benchmarkd.go | 2 +- 19 files changed, 84 insertions(+), 237 deletions(-) diff --git a/.github/workflows/dockers-benchmark-job-image.yml b/.github/workflows/dockers-benchmark-job-image.yml index f38149a7c38..1b2142e6a6f 100644 --- a/.github/workflows/dockers-benchmark-job-image.yml +++ b/.github/workflows/dockers-benchmark-job-image.yml @@ -17,7 +17,7 @@ name: "Build docker image: benchmark-job" on: push: branches: - - master + - main tags: - "*.*.*" - "v*.*.*" @@ -25,109 +25,58 @@ on: - "v*.*.*-*" paths: - ".github/actions/docker-build/actions.yaml" + - ".github/workflows/_docker-image.yaml" - ".github/workflows/dockers-benchmak-job-image.yml" - "go.mod" - "go.sum" - "internal/**" - "!internal/**/*_test.go" - "!internal/db/**" - - "!internal/k8s/**" - "apis/grpc/**" + - "pkg/benchmark/operator/**" + - "cmd/benchmark/operator/**" - "pkg/benchmark/job/**" - "cmd/benchmark/job/**" - - "dockers/benchmark/job/Dockerfile" + - "dockers/tools/benchmark/job/Dockerfile" - "versions/GO_VERSION" - - "versions/NGT_VERSION" pull_request: paths: - ".github/actions/docker-build/actions.yaml" + - ".github/workflows/_docker-image.yaml" - ".github/workflows/dockers-benchmak-job-image.yml" - "go.mod" - "go.sum" - "internal/**" - "!internal/**/*_test.go" - "!internal/db/**" - - "!internal/k8s/**" - "apis/grpc/**" + - "pkg/benchmark/operator/**" + - "cmd/benchmark/operator/**" - "pkg/benchmark/job/**" - "cmd/benchmark/job/**" - - "dockers/benchmark/job/Dockerfile" + - "dockers/tools/benchmark/job/Dockerfile" + - "versions/GO_VERSION" + pull_request_target: + paths: + - ".github/actions/docker-build/actions.yaml" + - ".github/workflows/_docker-image.yaml" + - ".github/workflows/dockers-benchmak-job-image.yml" + - "go.mod" + - "go.sum" + - "internal/**" + - "!internal/**/*_test.go" + - "!internal/db/**" + - "apis/grpc/**" + - "pkg/benchmark/operator/**" + - "cmd/benchmark/operator/**" + - "pkg/benchmark/job/**" + - "cmd/benchmark/job/**" + - "dockers/tools/benchmark/job/Dockerfile" - "versions/GO_VERSION" - - "versions/NGT_VERSION" - -concurrency: - group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref != 'refs/heads/main' && github.ref || github.sha }}-${{ github.event_name }} - cancel-in-progress: true jobs: build: - strategy: - max-parallel: 4 - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v3 - - name: Setup QEMU - uses: docker/setup-qemu-action@v1 - with: - platforms: all - - name: Setup Docker Buildx - id: buildx - uses: docker/setup-buildx-action@v1 - with: - buildkitd-flags: "--debug" - - name: Login to DockerHub - uses: docker/login-action@v1 - with: - username: ${{ secrets.DOCKERHUB_USER }} - password: ${{ secrets.DOCKERHUB_PASS }} - - name: Login to GitHub Container Registry - uses: docker/login-action@v1 - with: - registry: ghcr.io - username: ${{ secrets.PACKAGE_USER }} - password: ${{ secrets.PACKAGE_TOKEN }} - - name: Build and Publish - id: build_and_publish - uses: ./.github/actions/docker-build - with: - target: benchmark-job - builder: ${{ steps.buildx.outputs.name }} - - name: Initialize CodeQL - if: startsWith( github.ref, 'refs/tags/') - uses: github/codeql-action/init@v2 - - name: Run vulnerability scanner (table) - if: startsWith( github.ref, 'refs/tags/') - uses: aquasecurity/trivy-action@master - with: - image-ref: "${{ steps.build_and_publish.outputs.IMAGE_NAME }}:${{ steps.build_and_publish.outputs.PRIMARY_TAG }}" - format: "table" - - name: Run vulnerability scanner (sarif) - if: startsWith( github.ref, 'refs/tags/') - uses: aquasecurity/trivy-action@master - with: - image-ref: "${{ steps.build_and_publish.outputs.IMAGE_NAME }}:${{ steps.build_and_publish.outputs.PRIMARY_TAG }}" - format: "template" - template: "@/contrib/sarif.tpl" - output: "trivy-results.sarif" - - name: Upload Trivy scan results to Security tab - if: startsWith( github.ref, 'refs/tags/') - uses: github/codeql-action/upload-sarif@v2 - with: - sarif_file: "trivy-results.sarif" - slack: - name: Slack notification - needs: build - runs-on: ubuntu-latest - if: github.ref == 'refs/heads/main' || startsWith( github.ref, 'refs/tags/') - steps: - - uses: technote-space/workflow-conclusion-action@v2 - with: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - - uses: 8398a7/action-slack@v3 - with: - author_name: benchmark-job image build - status: ${{ env.WORKFLOW_CONCLUSION }} - only_mention_fail: channel - env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - SLACK_WEBHOOK_URL: ${{ secrets.SLACK_NOTIFY_WEBHOOK_URL }} + uses: ./.github/workflows/_docker-image.yaml + with: + target: benchmark-job + secrets: inherit diff --git a/.github/workflows/dockers-benchmark-operator-image.yaml b/.github/workflows/dockers-benchmark-operator-image.yaml index ada420e0be7..279d559cf3f 100644 --- a/.github/workflows/dockers-benchmark-operator-image.yaml +++ b/.github/workflows/dockers-benchmark-operator-image.yaml @@ -17,7 +17,7 @@ name: "Build docker image: benchmark-operator" on: push: branches: - - master + - main tags: - "*.*.*" - "v*.*.*" @@ -25,22 +25,24 @@ on: - "v*.*.*-*" paths: - ".github/actions/docker-build/actions.yaml" + - ".github/workflows/_docker-image.yaml" - ".github/workflows/dockers-benchmak-operator-image.yml" - "go.mod" - "go.sum" - "internal/**" - "!internal/**/*_test.go" - "!internal/db/**" - - "!internal/k8s/**" - "apis/grpc/**" - "pkg/benchmark/operator/**" - "cmd/benchmark/operator/**" - - "dockers/benchmark/operator/Dockerfile" + - "pkg/benchmark/job/**" + - "cmd/benchmark/job/**" + - "dockers/tools/benchmark/operator/Dockerfile" - "versions/GO_VERSION" - - "versions/NGT_VERSION" pull_request: paths: - ".github/actions/docker-build/actions.yaml" + - ".github/workflows/_docker-image.yaml" - ".github/workflows/dockers-benchmak-operator-image.yml" - "go.mod" - "go.sum" @@ -50,83 +52,31 @@ on: - "apis/grpc/**" - "pkg/benchmark/operator/**" - "cmd/benchmark/operator/**" - - "dockers/benchmark/operator/Dockerfile" + - "pkg/benchmark/job/**" + - "cmd/benchmark/job/**" + - "dockers/tools/benchmark/operator/Dockerfile" + - "versions/GO_VERSION" + pull_request_target: + paths: + - ".github/actions/docker-build/actions.yaml" + - ".github/workflows/_docker-image.yaml" + - ".github/workflows/dockers-benchmak-operator-image.yml" + - "go.mod" + - "go.sum" + - "internal/**" + - "!internal/**/*_test.go" + - "!internal/db/**" + - "apis/grpc/**" + - "pkg/benchmark/operator/**" + - "cmd/benchmark/operator/**" + - "pkg/benchmark/job/**" + - "cmd/benchmark/job/**" + - "dockers/tools/benchmark/operator/Dockerfile" - "versions/GO_VERSION" - - "versions/NGT_VERSION" - -concurrency: - group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref != 'refs/heads/main' && github.ref || github.sha }}-${{ github.event_name }} - cancel-in-progress: true jobs: build: - strategy: - max-parallel: 4 - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v3 - - name: Setup QEMU - uses: docker/setup-qemu-action@v1 - with: - platforms: all - - name: Setup Docker Buildx - id: buildx - uses: docker/setup-buildx-action@v1 - with: - buildkitd-flags: "--debug" - - name: Login to DockerHub - uses: docker/login-action@v1 - with: - username: ${{ secrets.DOCKERHUB_USER }} - password: ${{ secrets.DOCKERHUB_PASS }} - - name: Login to GitHub Container Registry - uses: docker/login-action@v1 - with: - registry: ghcr.io - username: ${{ secrets.PACKAGE_USER }} - password: ${{ secrets.PACKAGE_TOKEN }} - - name: Build and Publish - id: build_and_publish - uses: ./.github/actions/docker-build - with: - target: benchmark-operator - builder: ${{ steps.buildx.outputs.name }} - - name: Initialize CodeQL - if: startsWith( github.ref, 'refs/tags/') - uses: github/codeql-action/init@v2 - - name: Run vulnerability scanner (table) - if: startsWith( github.ref, 'refs/tags/') - uses: aquasecurity/trivy-action@master - with: - image-ref: "${{ steps.build_and_publish.outputs.IMAGE_NAME }}:${{ steps.build_and_publish.outputs.PRIMARY_TAG }}" - format: "table" - - name: Run vulnerability scanner (sarif) - if: startsWith( github.ref, 'refs/tags/') - uses: aquasecurity/trivy-action@master - with: - image-ref: "${{ steps.build_and_publish.outputs.IMAGE_NAME }}:${{ steps.build_and_publish.outputs.PRIMARY_TAG }}" - format: "template" - template: "@/contrib/sarif.tpl" - output: "trivy-results.sarif" - - name: Upload Trivy scan results to Security tab - if: startsWith( github.ref, 'refs/tags/') - uses: github/codeql-action/upload-sarif@v2 - with: - sarif_file: "trivy-results.sarif" - slack: - name: Slack notification - needs: build - runs-on: ubuntu-latest - if: github.ref == 'refs/heads/main' || startsWith( github.ref, 'refs/tags/') - steps: - - uses: technote-space/workflow-conclusion-action@v2 - with: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - - uses: 8398a7/action-slack@v3 - with: - author_name: benchmark-operator image build - status: ${{ env.WORKFLOW_CONCLUSION }} - only_mention_fail: channel - env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - SLACK_WEBHOOK_URL: ${{ secrets.SLACK_NOTIFY_WEBHOOK_URL }} + uses: ./.github/workflows/_docker-image.yaml + with: + target: benchmark-operator + secrets: inherit diff --git a/internal/k8s/vald/benchmark/api/v1/job_types.go b/internal/k8s/vald/benchmark/api/v1/job_types.go index 9b8bbf1f0b9..7d401093809 100644 --- a/internal/k8s/vald/benchmark/api/v1/job_types.go +++ b/internal/k8s/vald/benchmark/api/v1/job_types.go @@ -23,7 +23,7 @@ import ( ) type BenchmarkJobSpec struct { - *config.GlobalConfig `json:",omitempty" yaml:""` + *config.GlobalConfig `json:",omitempty" yaml:""` ServerConfig *config.Servers `json:"server_config,omitempty" yaml:"server_config"` Target *BenchmarkTarget `json:"target,omitempty" yaml:"target"` Dataset *BenchmarkDataset `json:"dataset,omitempty" yaml:"dataset"` diff --git a/pkg/tools/benchmark/job/router/option.go b/pkg/tools/benchmark/job/router/option.go index 7053ee57ea2..e3cc0633f7f 100644 --- a/pkg/tools/benchmark/job/router/option.go +++ b/pkg/tools/benchmark/job/router/option.go @@ -18,7 +18,7 @@ package router import ( - "github.com/vdaas/vald/internal/errgroup" + "github.com/vdaas/vald/internal/sync/errgroup" "github.com/vdaas/vald/pkg/tools/benchmark/job/handler/rest" ) diff --git a/pkg/tools/benchmark/job/router/router.go b/pkg/tools/benchmark/job/router/router.go index 6b45c721843..3020c9f5066 100644 --- a/pkg/tools/benchmark/job/router/router.go +++ b/pkg/tools/benchmark/job/router/router.go @@ -20,9 +20,9 @@ package router import ( "net/http" - "github.com/vdaas/vald/internal/errgroup" "github.com/vdaas/vald/internal/net/http/middleware" "github.com/vdaas/vald/internal/net/http/routing" + "github.com/vdaas/vald/internal/sync/errgroup" "github.com/vdaas/vald/pkg/tools/benchmark/job/handler/rest" ) diff --git a/pkg/tools/benchmark/job/service/insert.go b/pkg/tools/benchmark/job/service/insert.go index 00ba632e801..d4074f292ab 100644 --- a/pkg/tools/benchmark/job/service/insert.go +++ b/pkg/tools/benchmark/job/service/insert.go @@ -25,7 +25,7 @@ import ( "github.com/vdaas/vald/apis/grpc/v1/payload" "github.com/vdaas/vald/internal/errors" "github.com/vdaas/vald/internal/log" - "golang.org/x/sync/errgroup" + "github.com/vdaas/vald/internal/sync/errgroup" ) func (j *job) insert(ctx context.Context, ech chan error) error { @@ -38,10 +38,8 @@ func (j *job) insert(ctx context.Context, ech chan error) error { if j.timestamp > int64(0) { cfg.Timestamp = j.timestamp } - eg, egctx := errgroup.WithContext(ctx) + eg, egctx := errgroup.New(ctx) eg.SetLimit(j.concurrencyLimit) - // eg, egctx := errgroup.New(ctx) - // eg.Limitation(j.concurrencyLimit) for i := j.dataset.Range.Start; i <= j.dataset.Range.End; i++ { iter := i eg.Go(func() error { @@ -50,7 +48,6 @@ func (j *job) insert(ctx context.Context, ech chan error) error { if err != nil { log.Errorf("[benchmark job] limiter error is detected: %s", err.Error()) if errors.Is(err, context.Canceled) { - // return errors.Join(err, context.Canceled) return nil } select { @@ -74,9 +71,6 @@ func (j *job) insert(ctx context.Context, ech chan error) error { log.Errorf("[benchmark job] context error is detected: %s\t%s", err.Error(), egctx.Err()) return errors.Join(err, egctx.Err()) default: - // if st, ok := status.FromError(err); ok { - // log.Warnf("[benchmark job] insert error is detected: code = %d, msg = %s", st.Code(), err.Error()) - // } } } // TODO: send metrics to the Prometeus diff --git a/pkg/tools/benchmark/job/service/job.go b/pkg/tools/benchmark/job/service/job.go index 168046e2b7a..3a399777373 100644 --- a/pkg/tools/benchmark/job/service/job.go +++ b/pkg/tools/benchmark/job/service/job.go @@ -28,13 +28,13 @@ import ( "github.com/vdaas/vald/apis/grpc/v1/payload" "github.com/vdaas/vald/internal/client/v1/client/vald" "github.com/vdaas/vald/internal/config" - "github.com/vdaas/vald/internal/errgroup" "github.com/vdaas/vald/internal/errors" "github.com/vdaas/vald/internal/k8s/client" v1 "github.com/vdaas/vald/internal/k8s/vald/benchmark/api/v1" "github.com/vdaas/vald/internal/log" "github.com/vdaas/vald/internal/rand" "github.com/vdaas/vald/internal/safety" + "github.com/vdaas/vald/internal/sync/errgroup" "github.com/vdaas/vald/internal/test/data/hdf5" "github.com/vdaas/vald/internal/timeutil/rate" ) diff --git a/pkg/tools/benchmark/job/service/object.go b/pkg/tools/benchmark/job/service/object.go index af3971798fe..542213de0ed 100644 --- a/pkg/tools/benchmark/job/service/object.go +++ b/pkg/tools/benchmark/job/service/object.go @@ -24,15 +24,13 @@ import ( "github.com/vdaas/vald/apis/grpc/v1/payload" "github.com/vdaas/vald/internal/errors" "github.com/vdaas/vald/internal/log" - "golang.org/x/sync/errgroup" + "github.com/vdaas/vald/internal/sync/errgroup" ) func (j *job) exists(ctx context.Context, ech chan error) error { log.Info("[benchmark job] Start benchmarking exists") - eg, egctx := errgroup.WithContext(ctx) + eg, egctx := errgroup.New(ctx) eg.SetLimit(j.concurrencyLimit) - // eg, egctx := errgroup.New(ctx) - // eg.Limitation(j.concurrencyLimit) for i := j.dataset.Range.Start; i <= j.dataset.Range.End; i++ { idx := i eg.Go(func() error { @@ -42,7 +40,6 @@ func (j *job) exists(ctx context.Context, ech chan error) error { log.Errorf("[benchmark job] limiter error is detected: %s", err.Error()) if errors.Is(err, context.Canceled) { return nil - // return errors.Join(err, context.Canceled) } select { case <-egctx.Done(): @@ -58,11 +55,7 @@ func (j *job) exists(ctx context.Context, ech chan error) error { case <-egctx.Done(): log.Errorf("[benchmark job] context error is detected: %s\t%s", err.Error(), egctx.Err()) return nil - // return errors.Join(err, egctx.Err()) default: - // if st, ok := status.FromError(err); ok { - // log.Warnf("[benchmark job] exists error is detected: code = %d, msg = %s", st.Code(), err.Error()) - // } } } log.Debugf("[benchmark job] Finish exists: iter= %d \n%v\n", idx, res) @@ -80,10 +73,8 @@ func (j *job) exists(ctx context.Context, ech chan error) error { func (j *job) getObject(ctx context.Context, ech chan error) error { log.Info("[benchmark job] Start benchmarking getObject") - eg, egctx := errgroup.WithContext(ctx) + eg, egctx := errgroup.New(ctx) eg.SetLimit(j.concurrencyLimit) - // eg, egctx := errgroup.New(ctx) - // eg.Limitation(j.concurrencyLimit) for i := j.dataset.Range.Start; i <= j.dataset.Range.End; i++ { log.Infof("[benchmark job] Start get object: iter = %d", i) ft := []*payload.Filter_Target{} @@ -102,7 +93,6 @@ func (j *job) getObject(ctx context.Context, ech chan error) error { if err != nil { log.Errorf("[benchmark job] limiter error is detected: %s", err.Error()) if errors.Is(err, context.Canceled) { - // return errors.Join(err, context.Canceled) return nil } select { @@ -123,12 +113,8 @@ func (j *job) getObject(ctx context.Context, ech chan error) error { select { case <-egctx.Done(): log.Errorf("[benchmark job] context error is detected: %s\t%s", err.Error(), egctx.Err()) - // return errors.Join(err, egctx.Err()) return nil default: - // if st, ok := status.FromError(err); ok { - // log.Warnf("[benchmark job] object error is detected: code = %d, msg = %s", st.Code(), err.Error()) - // } } } if res != nil { diff --git a/pkg/tools/benchmark/job/service/option.go b/pkg/tools/benchmark/job/service/option.go index 5b2d470fae0..e1d443d4163 100644 --- a/pkg/tools/benchmark/job/service/option.go +++ b/pkg/tools/benchmark/job/service/option.go @@ -22,9 +22,9 @@ import ( "github.com/vdaas/vald/internal/client/v1/client/vald" "github.com/vdaas/vald/internal/config" - "github.com/vdaas/vald/internal/errgroup" "github.com/vdaas/vald/internal/errors" "github.com/vdaas/vald/internal/k8s/client" + "github.com/vdaas/vald/internal/sync/errgroup" "github.com/vdaas/vald/internal/test/data/hdf5" "github.com/vdaas/vald/internal/timeutil" ) diff --git a/pkg/tools/benchmark/job/service/remove.go b/pkg/tools/benchmark/job/service/remove.go index be3c91c3fa7..79f465e4111 100644 --- a/pkg/tools/benchmark/job/service/remove.go +++ b/pkg/tools/benchmark/job/service/remove.go @@ -24,7 +24,7 @@ import ( "github.com/vdaas/vald/apis/grpc/v1/payload" "github.com/vdaas/vald/internal/errors" "github.com/vdaas/vald/internal/log" - "golang.org/x/sync/errgroup" + "github.com/vdaas/vald/internal/sync/errgroup" ) func (j *job) remove(ctx context.Context, ech chan error) error { @@ -35,10 +35,8 @@ func (j *job) remove(ctx context.Context, ech chan error) error { if j.timestamp > int64(0) { cfg.Timestamp = j.timestamp } - eg, egctx := errgroup.WithContext(ctx) + eg, egctx := errgroup.New(ctx) eg.SetLimit(j.concurrencyLimit) - // eg, egctx := errgroup.New(ctx) - // eg.Limitation(j.concurrencyLimit) for i := j.dataset.Range.Start; i <= j.dataset.Range.End; i++ { idx := i eg.Go(func() error { @@ -48,7 +46,6 @@ func (j *job) remove(ctx context.Context, ech chan error) error { log.Errorf("[benchmark job] limiter error is detected: %s", err.Error()) if errors.Is(err, context.Canceled) { return nil - // return errors.Join(err, context.Canceled) } select { case <-egctx.Done(): @@ -68,9 +65,6 @@ func (j *job) remove(ctx context.Context, ech chan error) error { log.Errorf("[benchmark job] context error is detected: %s\t%s", err.Error(), egctx.Err()) return errors.Join(err, egctx.Err()) default: - // if st, ok := status.FromError(err); ok { - // log.Warnf("[benchmark job] remove error is detected: code = %d, msg = %s", st.Code(), err.Error()) - // } } } log.Debugf("[benchmark job] Finish remove: iter= %d \n%v", idx, res) diff --git a/pkg/tools/benchmark/job/service/search.go b/pkg/tools/benchmark/job/service/search.go index c7d54937052..c59356ae866 100644 --- a/pkg/tools/benchmark/job/service/search.go +++ b/pkg/tools/benchmark/job/service/search.go @@ -24,7 +24,7 @@ import ( "github.com/vdaas/vald/apis/grpc/v1/payload" "github.com/vdaas/vald/internal/errors" "github.com/vdaas/vald/internal/log" - "golang.org/x/sync/errgroup" + "github.com/vdaas/vald/internal/sync/errgroup" ) func (j *job) search(ctx context.Context, ech chan error) error { @@ -47,10 +47,8 @@ func (j *job) search(ctx context.Context, ech chan error) error { }(), } sres := make([]*payload.Search_Response, j.dataset.Indexes) - eg, egctx := errgroup.WithContext(ctx) + eg, egctx := errgroup.New(ctx) eg.SetLimit(j.concurrencyLimit) - // eg, egctx := errgroup.New(ctx) - // eg.Limitation(j.concurrencyLimit) for i := j.dataset.Range.Start; i <= j.dataset.Range.End; i++ { iter := i eg.Go(func() error { @@ -60,7 +58,6 @@ func (j *job) search(ctx context.Context, ech chan error) error { log.Errorf("[benchmark job] limiter error is detected: %s", err.Error()) if errors.Is(err, context.Canceled) { return nil - // return errors.Join(err, context.Canceled) } select { case <-egctx.Done(): @@ -83,13 +80,7 @@ func (j *job) search(ctx context.Context, ech chan error) error { case <-egctx.Done(): log.Errorf("[benchmark job] context error is detected: %s\t%s", err.Error(), egctx.Err()) return nil - // return errors.Join(err, egctx.Err()) default: - // if st, ok := status.FromError(err); ok { - // if st.Code() != codes.NotFound { - // log.Warnf("[benchmark job] search error is detected: code = %d, msg = %s", st.Code(), err.Error()) - // } - // } } } if res != nil && j.searchConfig.EnableLinearSearch { @@ -116,7 +107,6 @@ func (j *job) search(ctx context.Context, ech chan error) error { if err != nil { log.Errorf("[benchmark job] limiter error is detected: %s", err.Error()) if errors.Is(err, context.Canceled) { - // return errors.Join(err, context.Canceled) return nil } select { @@ -142,11 +132,6 @@ func (j *job) search(ctx context.Context, ech chan error) error { log.Errorf("[benchmark job] context error is detected: %s\t%s", err.Error(), egctx.Err()) return errors.Join(err, egctx.Err()) default: - // if st, ok := status.FromError(err); ok { - // if st.Code() != codes.NotFound { - // log.Warnf("[benchmark job] linear search error is detected: code = %d, msg = %s", st.Code(), err.Error()) - // } - // } } } if res != nil { diff --git a/pkg/tools/benchmark/job/service/update.go b/pkg/tools/benchmark/job/service/update.go index 51a10a9ba2e..14848fcd656 100644 --- a/pkg/tools/benchmark/job/service/update.go +++ b/pkg/tools/benchmark/job/service/update.go @@ -25,7 +25,7 @@ import ( "github.com/vdaas/vald/apis/grpc/v1/payload" "github.com/vdaas/vald/internal/errors" "github.com/vdaas/vald/internal/log" - "golang.org/x/sync/errgroup" + "github.com/vdaas/vald/internal/sync/errgroup" ) func (j *job) update(ctx context.Context, ech chan error) error { @@ -39,10 +39,8 @@ func (j *job) update(ctx context.Context, ech chan error) error { if j.timestamp > int64(0) { cfg.Timestamp = j.timestamp } - eg, egctx := errgroup.WithContext(ctx) + eg, egctx := errgroup.New(ctx) eg.SetLimit(j.concurrencyLimit) - // eg, egctx := errgroup.New(ctx) - // eg.Limitation(j.concurrencyLimit) for i := j.dataset.Range.Start; i <= j.dataset.Range.End; i++ { iter := i eg.Go(func() error { @@ -51,7 +49,6 @@ func (j *job) update(ctx context.Context, ech chan error) error { if err != nil { log.Errorf("[benchmark job] limiter error is detected: %s", err.Error()) if errors.Is(err, context.Canceled) { - // return errors.Join(err, context.Canceled) return nil } select { @@ -75,9 +72,6 @@ func (j *job) update(ctx context.Context, ech chan error) error { log.Errorf("[benchmark job] context error is detected: %s\t%s", err.Error(), egctx.Err()) return errors.Join(err, egctx.Err()) default: - // if st, ok := status.FromError(err); ok { - // log.Warnf("[benchmark job] update error is detected: code = %d, msg = %s\n", st.Code(), err.Error()) - // } } } if res != nil { diff --git a/pkg/tools/benchmark/job/service/upsert.go b/pkg/tools/benchmark/job/service/upsert.go index 016c2963820..1aaf4c3d44a 100644 --- a/pkg/tools/benchmark/job/service/upsert.go +++ b/pkg/tools/benchmark/job/service/upsert.go @@ -25,7 +25,7 @@ import ( "github.com/vdaas/vald/apis/grpc/v1/payload" "github.com/vdaas/vald/internal/errors" "github.com/vdaas/vald/internal/log" - "golang.org/x/sync/errgroup" + "github.com/vdaas/vald/internal/sync/errgroup" ) func (j *job) upsert(ctx context.Context, ech chan error) error { @@ -39,10 +39,8 @@ func (j *job) upsert(ctx context.Context, ech chan error) error { if j.timestamp > int64(0) { cfg.Timestamp = j.timestamp } - eg, egctx := errgroup.WithContext(ctx) + eg, egctx := errgroup.New(ctx) eg.SetLimit(j.concurrencyLimit) - // eg, egctx := errgroup.New(ctx) - // eg.Limitation(j.concurrencyLimit) for i := j.dataset.Range.Start; i <= j.dataset.Range.End; i++ { iter := i eg.Go(func() error { @@ -74,9 +72,6 @@ func (j *job) upsert(ctx context.Context, ech chan error) error { log.Errorf("[benchmark job] context error is detected: %s\t%s", err.Error(), egctx.Err()) return errors.Join(err, egctx.Err()) default: - // if st, ok := status.FromError(err); ok { - // log.Warnf("[benchmark job] upsert error is detected: code = %d, msg = %s", st.Code(), err.Error()) - // } } } if res != nil { diff --git a/pkg/tools/benchmark/job/usecase/benchmarkd.go b/pkg/tools/benchmark/job/usecase/benchmarkd.go index 038f048c38d..c7e23e277ef 100644 --- a/pkg/tools/benchmark/job/usecase/benchmarkd.go +++ b/pkg/tools/benchmark/job/usecase/benchmarkd.go @@ -23,7 +23,6 @@ import ( "github.com/vdaas/vald/internal/client/v1/client/vald" iconf "github.com/vdaas/vald/internal/config" - "github.com/vdaas/vald/internal/errgroup" "github.com/vdaas/vald/internal/errors" "github.com/vdaas/vald/internal/log" "github.com/vdaas/vald/internal/net/grpc" @@ -34,6 +33,7 @@ import ( "github.com/vdaas/vald/internal/safety" "github.com/vdaas/vald/internal/servers/server" "github.com/vdaas/vald/internal/servers/starter" + "github.com/vdaas/vald/internal/sync/errgroup" "github.com/vdaas/vald/internal/test/data/hdf5" "github.com/vdaas/vald/pkg/tools/benchmark/job/config" handler "github.com/vdaas/vald/pkg/tools/benchmark/job/handler/grpc" diff --git a/pkg/tools/benchmark/operator/router/option.go b/pkg/tools/benchmark/operator/router/option.go index 681ce4bef7a..875d6f8241f 100644 --- a/pkg/tools/benchmark/operator/router/option.go +++ b/pkg/tools/benchmark/operator/router/option.go @@ -18,7 +18,7 @@ package router import ( - "github.com/vdaas/vald/internal/errgroup" + "github.com/vdaas/vald/internal/sync/errgroup" "github.com/vdaas/vald/pkg/tools/benchmark/operator/handler/rest" ) diff --git a/pkg/tools/benchmark/operator/router/router.go b/pkg/tools/benchmark/operator/router/router.go index 1303eeec8d6..505df5dce35 100644 --- a/pkg/tools/benchmark/operator/router/router.go +++ b/pkg/tools/benchmark/operator/router/router.go @@ -20,9 +20,9 @@ package router import ( "net/http" - "github.com/vdaas/vald/internal/errgroup" "github.com/vdaas/vald/internal/net/http/middleware" "github.com/vdaas/vald/internal/net/http/routing" + "github.com/vdaas/vald/internal/sync/errgroup" "github.com/vdaas/vald/pkg/tools/benchmark/operator/handler/rest" ) diff --git a/pkg/tools/benchmark/operator/service/operator.go b/pkg/tools/benchmark/operator/service/operator.go index c3c68728598..4c41078537e 100644 --- a/pkg/tools/benchmark/operator/service/operator.go +++ b/pkg/tools/benchmark/operator/service/operator.go @@ -24,7 +24,6 @@ import ( "sync/atomic" "time" - "github.com/vdaas/vald/internal/errgroup" "github.com/vdaas/vald/internal/errors" "github.com/vdaas/vald/internal/k8s" "github.com/vdaas/vald/internal/k8s/client" @@ -33,6 +32,7 @@ import ( benchjob "github.com/vdaas/vald/internal/k8s/vald/benchmark/job" benchscenario "github.com/vdaas/vald/internal/k8s/vald/benchmark/scenario" "github.com/vdaas/vald/internal/log" + "github.com/vdaas/vald/internal/sync/errgroup" ) type Operator interface { diff --git a/pkg/tools/benchmark/operator/service/option.go b/pkg/tools/benchmark/operator/service/option.go index f38c10f38ef..e1a426e4808 100644 --- a/pkg/tools/benchmark/operator/service/option.go +++ b/pkg/tools/benchmark/operator/service/option.go @@ -20,8 +20,8 @@ package service import ( "time" - "github.com/vdaas/vald/internal/errgroup" "github.com/vdaas/vald/internal/errors" + "github.com/vdaas/vald/internal/sync/errgroup" ) // Option represents the functional option for scenario struct. diff --git a/pkg/tools/benchmark/operator/usecase/benchmarkd.go b/pkg/tools/benchmark/operator/usecase/benchmarkd.go index e867473bc51..6a8f2148de3 100644 --- a/pkg/tools/benchmark/operator/usecase/benchmarkd.go +++ b/pkg/tools/benchmark/operator/usecase/benchmarkd.go @@ -22,7 +22,6 @@ import ( "os" iconf "github.com/vdaas/vald/internal/config" - "github.com/vdaas/vald/internal/errgroup" "github.com/vdaas/vald/internal/errors" "github.com/vdaas/vald/internal/log" "github.com/vdaas/vald/internal/net/grpc" @@ -33,6 +32,7 @@ import ( "github.com/vdaas/vald/internal/safety" "github.com/vdaas/vald/internal/servers/server" "github.com/vdaas/vald/internal/servers/starter" + "github.com/vdaas/vald/internal/sync/errgroup" "github.com/vdaas/vald/pkg/tools/benchmark/operator/config" handler "github.com/vdaas/vald/pkg/tools/benchmark/operator/handler/grpc" "github.com/vdaas/vald/pkg/tools/benchmark/operator/handler/rest"