Skip to content

Commit

Permalink
✨ impl benchmark jobs
Browse files Browse the repository at this point in the history
Signed-off-by: vankichi <[email protected]>
  • Loading branch information
vankichi committed Mar 16, 2023
1 parent 1035390 commit 9e6c81e
Show file tree
Hide file tree
Showing 15 changed files with 736 additions and 60 deletions.
10 changes: 6 additions & 4 deletions internal/config/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,16 @@ type InsertConfig struct {

// UpdateConfig defines the desired state of update config
type UpdateConfig struct {
SkipStrictExistCheck bool `json:"skip_strict_exist_check,omitempty"`
Timestamp string `json:"timestamp,omitempty"`
SkipStrictExistCheck bool `json:"skip_strict_exist_check,omitempty"`
Timestamp string `json:"timestamp,omitempty"`
DisableBalancedUpdate bool `json:"disable_balanced_update,omitempty"`
}

// UpsertConfig defines the desired state of upsert config
type UpsertConfig struct {
SkipStrictExistCheck bool `json:"skip_strict_exist_check,omitempty"`
Timestamp string `json:"timestamp,omitempty"`
SkipStrictExistCheck bool `json:"skip_strict_exist_check,omitempty"`
Timestamp string `json:"timestamp,omitempty"`
DisableBalancedUpdate bool `json:"disable_balanced_update,omitempty"`
}

// SearchConfig defines the desired state of search config
Expand Down
5 changes: 5 additions & 0 deletions internal/errors/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,9 @@ var (
ErrFailedToCreateJob = func(err error, jn string) error {
return Wrapf(err, "could not create job: %s ", jn)
}

// ErrMismatchAtomics represents a function to generate an error that mismatch each atomic.Pointer stored corresponding to benchmark tasks.
ErrMismatchAtomics = func(job, benchjob, benchscenario interface{}) error {
return Errorf("mismatch atomics: job=%v\tbenchjob=%v\tbenchscenario=%v", job, benchjob, benchscenario)
}
)
4 changes: 4 additions & 0 deletions internal/k8s/crd/benchmark/valdbenchmarkjob.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ spec:
type: boolean
timestamp:
type: string
disable_balanced_update:
type: boolean
type: object
upsert_config:
description: UpsertConfig defines the desired state of upsert config
Expand All @@ -176,6 +178,8 @@ spec:
type: boolean
timestamp:
type: string
disable_balanced_update:
type: boolean
type: object
client_config:
description: ClientConfig represents the configurations for gRPC client.
Expand Down
4 changes: 4 additions & 0 deletions internal/k8s/crd/benchmark/valdbenchmarkscenario.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,8 @@ spec:
type: boolean
timestamp:
type: string
disable_balanced_update:
type: boolean
type: object
upsert_config:
description: UpsertConfig defines the desired state of upsert config
Expand All @@ -214,6 +216,8 @@ spec:
type: boolean
timestamp:
type: string
disable_balanced_update:
type: boolean
type: object
client_config:
description: ClientConfig represents the configurations for gRPC client.
Expand Down
22 changes: 22 additions & 0 deletions internal/test/data/hdf5/hdf5.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type Data interface {
Read() error
GetName() DatasetName
GetPath() string
GetByGroupName(name string) [][]float32
GetTrain() [][]float32
GetTest() [][]float32
GetNeighbors() [][]int
Expand Down Expand Up @@ -164,6 +165,27 @@ func (d *data) GetPath() string {
return d.path
}

// TODO: Apply generics
func (d *data) GetByGroupName(name string) [][]float32 {
switch name {
case "train":
return d.GetTrain()
case "test":
return d.GetTest()
case "neighbors":
l := d.GetNeighbors()
r := make([][]float32, 0)
for x := range l {
for y, z := range l[x] {
r[x][y] = float32(z)
}
}
return r
default:
return nil
}
}

func (d *data) GetTrain() [][]float32 {
return d.train
}
Expand Down
62 changes: 62 additions & 0 deletions internal/timeutil/rate/rate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
//
// Copyright (C) 2019-2023 vdaas.org vald team <[email protected]>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

package rate

import (
"context"
"runtime"

"go.uber.org/ratelimit"
"golang.org/x/time/rate"
)

type Limiter interface {
Wait(ctx context.Context) error
}

type limiter struct {
isStd bool
uber ratelimit.Limiter
std *rate.Limiter
}

func NewLimiter(cnt int) Limiter {
if runtime.GOMAXPROCS(0) >= 32 {
return &limiter{
isStd: true,
std: rate.NewLimiter(rate.Limit(cnt), 1),
}
}
return &limiter{
isStd: false,
uber: ratelimit.New(cnt, ratelimit.WithoutSlack),
}
}

func (l *limiter) Wait(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
if l.isStd {
return l.std.Wait(ctx)
} else {
l.uber.Take()
}
}
return nil
}
92 changes: 92 additions & 0 deletions pkg/tools/benchmark/job/service/insert.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
//
// Copyright (C) 2019-2022 vdaas.org vald team <[email protected]>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

// Package service manages the main logic of benchmark job.
package service

import (
"context"
"strconv"
"testing"
"time"

"github.com/vdaas/vald/apis/grpc/v1/payload"
"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/log"
)

func (j *job) insert(ctx context.Context, ech chan error) error {
log.Info("[benchmark job] Start benchmarking insert")
if j.insertConfig == nil {
err := errors.NewErrInvalidOption("insertConfig", j.insertConfig)
select {
case <-ctx.Done():
if err != context.Canceled {
ech <- errors.Wrap(err, ctx.Err().Error())
} else {
ech <- err
}
case ech <- err:
}
return err
}

// create data
vecs := j.genVec(j.dataset)
timestamp, _ := strconv.Atoi(j.insertConfig.Timestamp)
cfg := &payload.Insert_Config{
SkipStrictExistCheck: j.insertConfig.SkipStrictExistCheck,
Timestamp: int64(timestamp),
}
for i := 0; i < len(vecs); i++ {
log.Infof("[benchmark job] Start insert: iter = %d", i)
bres := testing.Benchmark(func(b *testing.B) {
b.Helper()
b.ResetTimer()
b.StartTimer()
start := time.Now()
id := i + j.dataset.Indexes
res, err := j.client.Insert(ctx, &payload.Insert_Request{
Vector: &payload.Object_Vector{
Id: strconv.Itoa(id),
Vector: vecs[i],
},
Config: cfg,
})
if err != nil {
select {
case <-ctx.Done():
if errors.Is(err, context.Canceled) {
ech <- errors.Wrap(err, ctx.Err().Error())
} else {
ech <- err
}
case ech <- err:
break
}
}
b.StopTimer()
latency := time.Since(start)
b.ReportMetric(float64(latency.Microseconds()), "latency")
log.Infof("[benchmark job] iter=%d, bN=%d, Name=%s, Uuid=%s, Ips=%v, latency=%d", i, b.N, res.Name, res.Uuid, res.Ips, latency.Microseconds())
})
// TODO: send metrics to the Prometeus
log.Infof("[benchmark job] Finish insert: iter= %d \n%v\n", i, bres)
}

log.Info("[benchmark job] Finish benchmarking insert")
return nil
}
33 changes: 32 additions & 1 deletion pkg/tools/benchmark/job/service/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,33 @@ type jobType int

const (
USERDEFINED jobType = iota
INSERT
SEARCH
UPDATE
UPSERT
REMOVE
GETOBJECT
EXISTS
)

func (jt jobType) String() string {
switch jt {
case USERDEFINED:
return "userdefined"
case INSERT:
return "insert"
case SEARCH:
return "search"
case UPDATE:
return "update"
case UPSERT:
return "upsert"
case REMOVE:
return "remove"
case GETOBJECT:
return "getObject"
case EXISTS:
return "exists"
}
return ""
}
Expand Down Expand Up @@ -91,8 +109,20 @@ func New(opts ...Option) (Job, error) {
opt := WithJobFunc(j.jobFunc)
err := opt(j)
return nil, errors.ErrOptionFailed(err, reflect.ValueOf(opt))
case INSERT:
j.jobFunc = j.insert
case SEARCH:
j.jobFunc = j.search
case UPDATE:
j.jobFunc = j.update
case UPSERT:
j.jobFunc = j.upsert
case REMOVE:
j.jobFunc = j.remove
case GETOBJECT:
j.jobFunc = j.getObject
case EXISTS:
j.jobFunc = j.exists
}
} else if j.jobType != USERDEFINED {
log.Warnf("[benchmark job] userdefined jobFunc is set but jobType is set %s", j.jobType.String())
Expand Down Expand Up @@ -208,13 +238,14 @@ func calcRecall(linearRes, searchRes []*payload.Object_Distance) (recall float64
return recall / float64(len(linearRes))
}

func genVec(data [][]float32, cfg *config.BenchmarkDataset) [][]float32 {
func (j *job) genVec(cfg *config.BenchmarkDataset) [][]float32 {
start := cfg.Range.Start
end := cfg.Range.End
if (end - start) < cfg.Indexes {
end = cfg.Indexes
}
num := end - start + 1
data := j.hdf5.GetByGroupName(cfg.Group)
if len(data) < num {
num = len(data)
end = start + num + 1
Expand Down
Loading

0 comments on commit 9e6c81e

Please sign in to comment.