Skip to content

Commit

Permalink
✨ impl benchmark jobs
Browse files Browse the repository at this point in the history
Signed-off-by: vankichi <[email protected]>
  • Loading branch information
vankichi committed Mar 29, 2023
1 parent 2a65ee1 commit fa03594
Show file tree
Hide file tree
Showing 19 changed files with 781 additions and 78 deletions.
28 changes: 24 additions & 4 deletions internal/config/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@ type BenchmarkJob struct {
UpsertConfig *UpsertConfig `json:"upsert_config,omitempty" yaml:"upsert_config"`
SearchConfig *SearchConfig `json:"search_config,omitempty" yaml:"search_config"`
RemoveConfig *RemoveConfig `json:"remove_config,omitempty" yaml:"remove_config"`
ObjectConfig *ObjectConfig `json:"object_config,omitempty" yaml:"object_config"`
ClientConfig *GRPCClient `json:"client_config,omitempty" yaml:"client_config"`
Rules []*BenchmarkJobRule `json:"rules,omitempty" yaml:"rules"`
BeforeJobName string `json:"before_job_name,omitempty" yaml:"before_job_name"`
BeforeJobNamespace string `json:"before_job_namespace,omitempty" yaml:"before_job_namespace"`
Client client.Client `json:"client,omitempty" yaml:"client"`
RPC int `json:"rpc,omitempty" yaml:"rpc"`
}

// BenchmarkScenario represents the configuration for the internal benchmark scenario.
Expand Down Expand Up @@ -80,14 +82,16 @@ type InsertConfig struct {

// UpdateConfig defines the desired state of update config.
// Every field is optional and is omitted from JSON output when it holds its zero value.
type UpdateConfig struct {
	// SkipStrictExistCheck is serialized as "skip_strict_exist_check".
	SkipStrictExistCheck bool `json:"skip_strict_exist_check,omitempty"`
	// Timestamp is kept in its textual form and serialized as "timestamp".
	Timestamp string `json:"timestamp,omitempty"`
	// DisableBalancedUpdate is serialized as "disable_balanced_update".
	DisableBalancedUpdate bool `json:"disable_balanced_update,omitempty"`
}

// UpsertConfig defines the desired state of upsert config.
// Every field is optional and is omitted from JSON output when it holds its zero value.
type UpsertConfig struct {
	// SkipStrictExistCheck is serialized as "skip_strict_exist_check".
	SkipStrictExistCheck bool `json:"skip_strict_exist_check,omitempty"`
	// Timestamp is kept in its textual form and serialized as "timestamp".
	Timestamp string `json:"timestamp,omitempty"`
	// DisableBalancedUpdate is serialized as "disable_balanced_update".
	DisableBalancedUpdate bool `json:"disable_balanced_update,omitempty"`
}

// SearchConfig defines the desired state of search config
Expand All @@ -105,6 +109,22 @@ type RemoveConfig struct {
Timestamp string `json:"timestamp,omitempty"`
}

// ObjectConfig defines the desired state of object config.
// It currently carries only the filter settings.
type ObjectConfig struct {
// FilterConfig is stored by value and serialized under the "filter_config" key.
FilterConfig FilterConfig `json:"filter_config,omitempty" yaml:"filter_config"`
}

// FilterTarget defines the desired state of filter target.
// A target is described by a host and a port.
type FilterTarget struct {
// Host is serialized under the "host" key.
Host string `json:"host,omitempty" yaml:"host"`
// Port is a 32-bit integer serialized under the "port" key.
Port int32 `json:"port,omitempty" yaml:"port"`
}

// FilterConfig defines the desired state of filter config.
type FilterConfig struct {
// Targets lists the filter targets; note that the serialized key is the singular "target".
Targets []*FilterTarget `json:"target,omitempty" yaml:"target"`
}

// Bind binds the actual data from the Job receiver fields.
func (b *BenchmarkJob) Bind() *BenchmarkJob {
b.JobType = GetActualValue(b.JobType)
Expand Down
5 changes: 5 additions & 0 deletions internal/errors/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,9 @@ var (
ErrFailedToCreateJob = func(err error, jn string) error {
return Wrapf(err, "could not create job: %s ", jn)
}

// ErrMismatchAtomics represents a function to generate an error that mismatch each atomic.Pointer stored corresponding to benchmark tasks.
ErrMismatchAtomics = func(job, benchjob, benchscenario interface{}) error {
return Errorf("mismatch atomics: job=%v\tbenchjob=%v\tbenchscenario=%v", job, benchjob, benchscenario)
}
)
6 changes: 6 additions & 0 deletions internal/k8s/crd/benchmark/valdbenchmarkjob.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ spec:
- type
type: object
type: array
rpc:
type: integer
insert_config:
description: InsertConfig defines the desired state of insert config
properties:
Expand Down Expand Up @@ -168,6 +170,8 @@ spec:
type: boolean
timestamp:
type: string
disable_balanced_update:
type: boolean
type: object
upsert_config:
description: UpsertConfig defines the desired state of upsert config
Expand All @@ -176,6 +180,8 @@ spec:
type: boolean
timestamp:
type: string
disable_balanced_update:
type: boolean
type: object
client_config:
description: ClientConfig represents the configurations for gRPC client.
Expand Down
6 changes: 6 additions & 0 deletions internal/k8s/crd/benchmark/valdbenchmarkscenario.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ spec:
- type
type: object
type: array
rpc:
type: integer
insert_config:
description: InsertConfig defines the desired state of insert config
properties:
Expand Down Expand Up @@ -206,6 +208,8 @@ spec:
type: boolean
timestamp:
type: string
disable_balanced_update:
type: boolean
type: object
upsert_config:
description: UpsertConfig defines the desired state of upsert config
Expand All @@ -214,6 +218,8 @@ spec:
type: boolean
timestamp:
type: string
disable_balanced_update:
type: boolean
type: object
client_config:
description: ClientConfig represents the configurations for gRPC client.
Expand Down
12 changes: 12 additions & 0 deletions internal/k8s/rbac/benchmark/operator/clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,18 @@ rules:
- patch
- update
- watch
- apiGroups:
- batch
resources:
- jobs
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- vald.vdaas.org
resources:
Expand Down
28 changes: 15 additions & 13 deletions internal/k8s/vald/benchmark/api/v1/job_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,21 @@ import (
)

// BenchmarkJobSpec holds the spec fields of a benchmark job resource.
// Each field is serialized under the snake_case key given in its json/yaml tag;
// JSON output omits fields that hold their zero value.
type BenchmarkJobSpec struct {
	Target       *BenchmarkTarget           `json:"target,omitempty" yaml:"target"`
	Dataset      *BenchmarkDataset          `json:"dataset,omitempty" yaml:"dataset"`
	Dimension    int                        `json:"dimension,omitempty" yaml:"dimension"`
	Replica      int                        `json:"replica,omitempty" yaml:"replica"`
	Repetition   int                        `json:"repetition,omitempty" yaml:"repetition"`
	JobType      string                     `json:"job_type,omitempty" yaml:"job_type"`
	InsertConfig *config.InsertConfig       `json:"insert_config,omitempty" yaml:"insert_config"`
	UpdateConfig *config.UpdateConfig       `json:"update_config,omitempty" yaml:"update_config"`
	UpsertConfig *config.UpsertConfig       `json:"upsert_config,omitempty" yaml:"upsert_config"`
	SearchConfig *config.SearchConfig       `json:"search_config,omitempty" yaml:"search_config"`
	RemoveConfig *config.RemoveConfig       `json:"remove_config,omitempty" yaml:"remove_config"`
	ObjectConfig *config.ObjectConfig       `json:"object_config,omitempty" yaml:"object_config"`
	ClientConfig *config.GRPCClient         `json:"client_config,omitempty" yaml:"client_config"`
	Rules        []*config.BenchmarkJobRule `json:"rules,omitempty" yaml:"rules"`
	RPC          int                        `json:"rpc,omitempty" yaml:"rpc"`
}

type BenchmarkJobStatus string
Expand Down
22 changes: 22 additions & 0 deletions internal/test/data/hdf5/hdf5.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type Data interface {
Read() error
GetName() DatasetName
GetPath() string
GetByGroupName(name string) [][]float32
GetTrain() [][]float32
GetTest() [][]float32
GetNeighbors() [][]int
Expand Down Expand Up @@ -164,6 +165,27 @@ func (d *data) GetPath() string {
return d.path
}

// GetByGroupName returns the dataset group selected by name as float32 rows.
//
// "train" and "test" return the slices from GetTrain and GetTest unchanged.
// "neighbors" returns a freshly allocated copy of GetNeighbors in which every
// int entry is converted to float32, keeping the shape of each row.
// Any other name returns nil.
//
// TODO: Apply generics
func (d *data) GetByGroupName(name string) [][]float32 {
	switch name {
	case "train":
		return d.GetTrain()
	case "test":
		return d.GetTest()
	case "neighbors":
		neighbors := d.GetNeighbors()
		// Rows and columns must be allocated before indexed assignment;
		// writing into a zero-length slice panics.
		converted := make([][]float32, len(neighbors))
		for i, row := range neighbors {
			converted[i] = make([]float32, len(row))
			for k, id := range row {
				converted[i][k] = float32(id)
			}
		}
		return converted
	default:
		return nil
	}
}

func (d *data) GetTrain() [][]float32 {
return d.train
}
Expand Down
62 changes: 62 additions & 0 deletions internal/timeutil/rate/rate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
//
// Copyright (C) 2019-2023 vdaas.org vald team <[email protected]>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

package rate

import (
"context"
"runtime"

"go.uber.org/ratelimit"
"golang.org/x/time/rate"
)

// Limiter is the rate limiter abstraction exposed by this package.
type Limiter interface {
// Wait returns a non-nil error when ctx is already done; otherwise it
// delegates to the underlying limiter implementation.
Wait(ctx context.Context) error
}

// limiter implements Limiter on top of one of two backends; only the backend
// selected by isStd is populated by NewLimiter.
type limiter struct {
// isStd reports whether std (true) or uber (false) is the active backend.
isStd bool
// uber is the go.uber.org/ratelimit backend.
uber ratelimit.Limiter
// std is the golang.org/x/time/rate backend.
std *rate.Limiter
}

// NewLimiter returns a Limiter configured with cnt as its rate.
//
// When runtime.GOMAXPROCS(0) is 32 or more the golang.org/x/time/rate backend
// is used with a burst of 1; otherwise the go.uber.org/ratelimit backend is
// used without slack.
//
// NOTE(review): cnt is handed to the backend unchanged, so non-positive values
// are not validated here; confirm callers always pass cnt > 0.
func NewLimiter(cnt int) Limiter {
	l := &limiter{isStd: runtime.GOMAXPROCS(0) >= 32}
	if l.isStd {
		l.std = rate.NewLimiter(rate.Limit(cnt), 1)
		return l
	}
	l.uber = ratelimit.New(cnt, ratelimit.WithoutSlack)
	return l
}

// Wait returns ctx.Err() immediately when ctx is already done. Otherwise it
// waits on the active backend: the std backend's Wait result is returned as-is,
// while the uber backend's Take is called and nil is returned afterwards.
//
// NOTE(review): Take receives no context, so cancellation is only checked
// before it is called on the uber path.
func (l *limiter) Wait(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
	}
	if l.isStd {
		return l.std.Wait(ctx)
	}
	l.uber.Take()
	return nil
}
2 changes: 2 additions & 0 deletions pkg/tools/benchmark/job/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,9 @@ func NewConfig(ctx context.Context, path string) (cfg *Config, err error) {
cfg.Job.UpsertConfig = jobResource.Spec.UpsertConfig
cfg.Job.SearchConfig = jobResource.Spec.SearchConfig
cfg.Job.RemoveConfig = jobResource.Spec.RemoveConfig
cfg.Job.ObjectConfig = jobResource.Spec.ObjectConfig
cfg.Job.ClientConfig = jobResource.Spec.ClientConfig
cfg.Job.RPC = jobResource.Spec.RPC
if annotations := jobResource.GetAnnotations(); annotations != nil {
cfg.Job.BeforeJobName = annotations["before-job-name"]
cfg.Job.BeforeJobNamespace = annotations["before-job-namespace"]
Expand Down
86 changes: 86 additions & 0 deletions pkg/tools/benchmark/job/service/insert.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
//
// Copyright (C) 2019-2022 vdaas.org vald team <[email protected]>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

// Package service manages the main logic of benchmark job.
package service

import (
"context"
"strconv"

"github.com/vdaas/vald/apis/grpc/v1/payload"
"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/log"
)

// insert runs the insert benchmark: it generates vectors from the job's
// dataset with j.genVec and sends one Insert request per vector through
// j.client, asking j.limiter for admission before every request.
//
// A failed request is reported on ech and the loop moves on to the next
// vector. The function stops early and returns a non-nil error when
// insertConfig is missing, when the limiter returns an error (including
// context cancellation), or when ctx is done while a request failure is
// being reported. It returns nil after all vectors have been processed.
func (j *job) insert(ctx context.Context, ech chan error) error {
	log.Info("[benchmark job] Start benchmarking insert")
	if j.insertConfig == nil {
		err := errors.NewErrInvalidOption("insertConfig", j.insertConfig)
		select {
		case <-ctx.Done():
			// Once ctx is done nobody may be draining ech, so never block on it here.
			return errors.Wrap(err, ctx.Err().Error())
		case ech <- err:
		}
		return err
	}

	// create data
	vecs := j.genVec(j.dataset)
	// Best effort: an empty or non-numeric timestamp yields 0, so the parse
	// error is not treated as fatal.
	timestamp, _ := strconv.ParseInt(j.insertConfig.Timestamp, 10, 64)
	cfg := &payload.Insert_Config{
		SkipStrictExistCheck: j.insertConfig.SkipStrictExistCheck,
		Timestamp:            timestamp,
	}
	for i := range vecs {
		log.Infof("[benchmark job] Start insert: iter = %d", i)
		if err := j.limiter.Wait(ctx); err != nil {
			// Without admission from the limiter the request must not be sent;
			// for a done ctx every remaining iteration would fail the same way.
			return err
		}
		id := i + j.dataset.Indexes
		res, err := j.client.Insert(ctx, &payload.Insert_Request{
			Vector: &payload.Object_Vector{
				Id:     strconv.Itoa(id),
				Vector: vecs[i],
			},
			Config: cfg,
		})
		if err != nil {
			select {
			case <-ctx.Done():
				// Stop instead of blocking on ech and looping over the remaining vectors.
				return errors.Wrap(err, ctx.Err().Error())
			case ech <- err:
			}
		}
		// TODO: send metrics to the Prometheus
		log.Infof("[benchmark job] Finish insert: iter= %d \n%v\n", i, res)
	}

	log.Info("[benchmark job] Finish benchmarking insert")
	return nil
}
Loading

0 comments on commit fa03594

Please sign in to comment.