Skip to content

Commit

Permalink
add core ngt benchmark
Browse files Browse the repository at this point in the history
Signed-off-by: kpango <[email protected]>
  • Loading branch information
kpango committed Sep 21, 2023
1 parent 594b23b commit a1be637
Show file tree
Hide file tree
Showing 17 changed files with 518 additions and 91 deletions.
3 changes: 3 additions & 0 deletions charts/vald-helm-operator/crds/valdrelease.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,9 @@ spec:
type: boolean
enable_proactive_gc:
type: boolean
error_buffer_limit:
type: integer
minimum: 1
index_path:
type: string
initial_delay_max_duration:
Expand Down
1 change: 1 addition & 0 deletions charts/vald/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ Run the following command to install the chart,
| agent.ngt.enable_copy_on_write | bool | `false` | enable copy on write saving for more stable backup |
| agent.ngt.enable_in_memory_mode | bool | `true` | in-memory mode enabled |
| agent.ngt.enable_proactive_gc | bool | `false` | enable proactive GC call for reducing heap memory allocation |
| agent.ngt.error_buffer_limit | int | `10` | maximum size of the core ngt error buffer pool |
| agent.ngt.index_path | string | `""` | path to index data |
| agent.ngt.initial_delay_max_duration | string | `"3m"` | maximum duration for initial delay |
| agent.ngt.kvsdb.concurrency | int | `6` | kvsdb processing concurrency |
Expand Down
5 changes: 5 additions & 0 deletions charts/vald/values.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,11 @@
"type": "boolean",
"description": "enable proactive GC call for reducing heap memory allocation"
},
"error_buffer_limit": {
"type": "integer",
"description": "maximum size of the core ngt error buffer pool",
"minimum": 1
},
"index_path": {
"type": "string",
"description": "path to index data"
Expand Down
3 changes: 3 additions & 0 deletions charts/vald/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1723,6 +1723,9 @@ agent:
# @schema {"name": "agent.ngt.broken_index_history_limit", "type": "integer", "minimum": 0}
# agent.ngt.broken_index_history_limit -- maximum number of broken index generations to backup
broken_index_history_limit: 0
# @schema {"name": "agent.ngt.error_buffer_limit", "type": "integer", "minimum": 1}
# agent.ngt.error_buffer_limit -- maximum size of the core ngt error buffer pool
error_buffer_limit: 10
# @schema {"name": "agent.sidecar", "type": "object"}
sidecar:
# @schema {"name": "agent.sidecar.enabled", "type": "boolean"}
Expand Down
217 changes: 217 additions & 0 deletions cmd/tools/cli/benchmark/core/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
//
// Copyright (C) 2019-2023 vdaas.org vald team <[email protected]>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// You may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

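// Package main provides a benchmark command that repeatedly inserts, indexes and removes vectors
// with the core NGT binding (https://github.com/yahoojapan/NGT) while logging memory usage.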
package main

import (
"context"
"fmt"
"net/http"
"os"
"os/signal"
"runtime"
"strings"
"sync"
"syscall"
"time"

"github.com/vdaas/vald/internal/core/algorithm/ngt"
"github.com/vdaas/vald/internal/log"
"github.com/vdaas/vald/internal/net/http/metrics"
"gonum.org/v1/hdf5"
)

// main runs an endless insert / create-index / remove cycle against an NGT
// index built from the training vectors of the HDF5 file named by the
// DATA_PATH environment variable, logging memory figures after every phase.
//
// While the cycle runs, a pprof handler is served over HTTP on 0.0.0.0:6060.
// On SIGINT or SIGTERM the context is cancelled: the pprof server is shut
// down, the benchmark goroutine populates and indexes the data one last time,
// and main then keeps logging memory figures every 5 seconds for up to
// 30 minutes before returning.
func main() {
	const (
		poolSize        = 8
		pprofAddr       = "0.0.0.0:6060"
		shutdownTimeout = 10 * time.Second
		waitInterval    = 5 * time.Second
		waitLimit       = 30 * time.Minute
	)

	vectors, _, err := load(os.Getenv("DATA_PATH"))
	if err != nil {
		log.Fatal(err)
	}
	// The dimension is taken from the first vector, so an empty dataset
	// cannot be benchmarked.
	if len(vectors) == 0 {
		log.Fatal("no training vectors loaded from DATA_PATH")
	}
	n, err := ngt.New(
		ngt.WithDimension(len(vectors[0])),
		ngt.WithDefaultPoolSize(poolSize),
		ngt.WithObjectType(ngt.Float),
		ngt.WithDistanceType(ngt.L2),
	)
	if err != nil {
		log.Fatal(err)
	}
	statusPath := fmt.Sprintf("/proc/%d/status", os.Getpid())

	log.Infof("# of vectors: %v", len(vectors))

	// statusValue returns the second whitespace-separated field of a
	// /proc status line, or "" when the line carries no value.
	statusValue := func(line string) string {
		if fields := strings.Fields(line); len(fields) > 1 {
			return fields[1]
		}
		return ""
	}

	// output logs one tab-separated row: the given header, the VmPeak, VmHWM
	// and VmRSS values exactly as they appear in /proc/<pid>/status, and the
	// Go runtime's Alloc, TotalAlloc, HeapAlloc, HeapSys and HeapInuse
	// counters divided by 1024.
	output := func(header string) {
		buf, err := os.ReadFile(statusPath)
		if err != nil {
			log.Fatal(err)
		}
		var vmpeak, vmrss, vmhwm string
		for _, line := range strings.Split(string(buf), "\n") {
			switch {
			case strings.HasPrefix(line, "VmPeak"):
				vmpeak = statusValue(line)
			case strings.HasPrefix(line, "VmHWM"):
				vmhwm = statusValue(line)
			case strings.HasPrefix(line, "VmRSS"):
				vmrss = statusValue(line)
			}
		}

		var m runtime.MemStats
		runtime.ReadMemStats(&m)
		log.Infof("%v\t%v\t%v\t%v\t%v\t%v\t%v\t%v\t%v", header, vmpeak, vmhwm, vmrss, m.Alloc/1024, m.TotalAlloc/1024, m.HeapAlloc/1024, m.HeapSys/1024, m.HeapInuse/1024)
	}
	log.Info(" operation\tVmPeak\tVmHWM\tVmRSS\tAlloc\tTotalAlloc\tHeapAlloc\tHeapSys\tHeapInuse")
	output(" start")
	defer output(" end")

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	wg := sync.WaitGroup{}

	// pprof server: lives until the context is cancelled, then shuts down.
	wg.Add(1)
	go func() {
		// Deferred so that wg.Wait really waits for the shutdown to finish.
		defer wg.Done()
		srv := &http.Server{
			Addr:    pprofAddr,
			Handler: metrics.NewPProfHandler(),
		}
		go func() {
			// ListenAndServe always returns a non-nil error; ErrServerClosed
			// is the expected one after Shutdown.
			if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
				log.Errorf("pprof server on %s stopped: %v", pprofAddr, err)
			}
		}()
		<-ctx.Done()
		// Bound the shutdown so a long-running profile request cannot keep
		// wg.Wait blocked forever.
		sctx, scancel := context.WithTimeout(context.Background(), shutdownTimeout)
		defer scancel()
		if err := srv.Shutdown(sctx); err != nil {
			log.Errorf("pprof server shutdown: %v", err)
		}
	}()

	// Benchmark loop: insert -> create index -> remove, repeated until the
	// context is cancelled.
	wg.Add(1)
	go func() {
		defer wg.Done()
		ids := make([]uint, len(vectors))

		// insertAndIndex inserts every vector, remembering the returned ids,
		// and then builds the index.
		insertAndIndex := func() {
			for i, vector := range vectors {
				id, err := n.Insert(vector)
				if err != nil {
					log.Fatal(err)
				}
				ids[i] = id
			}
			output(" insert")

			if err := n.CreateIndex(poolSize); err != nil {
				log.Fatal(err)
			}
			output("create index")
		}

		// removeAll removes every id recorded by the last insertAndIndex.
		removeAll := func() {
			for _, id := range ids {
				if err := n.Remove(id); err != nil {
					log.Fatal(err)
				}
			}
			output(" remove")
		}

		for {
			select {
			case <-ctx.Done():
				// Final pass: leave the index populated and stop.
				insertAndIndex()
				return
			default:
				insertAndIndex()
				removeAll()
			}
		}
	}()

	// signal.Notify never blocks on send, so the channel needs a buffer or a
	// signal delivered before the receive below would be dropped.
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, os.Interrupt, syscall.SIGTERM)

	<-ch
	cancel()

	wg.Wait()

	// Keep reporting memory figures after the workload has stopped.
	end := time.NewTimer(waitLimit)
	defer end.Stop()
	ticker := time.NewTicker(waitInterval)
	defer ticker.Stop()

	for {
		select {
		case <-end.C:
			return
		case <-ticker.C:
			output(" waiting")
		}
	}
}

// load reads the "train" and "test" datasets from the HDF5 file at path and
// returns each of them as a slice of float32 vectors, one vector per dataset
// row. Both results are nil whenever a non-nil error is returned.
func load(path string) (train, test [][]float32, err error) {
	f, err := hdf5.OpenFile(path, hdf5.F_ACC_RDONLY)
	if err != nil {
		return nil, nil, fmt.Errorf("opening hdf5 file %q: %w", path, err)
	}
	defer f.Close()

	// readFn reads the dataset with the given name and splits its flat
	// contents into rows.
	readFn := func(name string) ([][]float32, error) {
		// The returned dataset must be closed once it is no longer needed.
		d, err := f.OpenDataset(name)
		if err != nil {
			return nil, fmt.Errorf("opening dataset %q: %w", name, err)
		}
		defer d.Close()

		// Space returns a copy of the dataset's dataspace; it is closed
		// separately from the dataset.
		sp := d.Space()
		defer sp.Close()

		// The first two extents are used as (number of rows, vector
		// dimension); anything with fewer extents cannot be split into rows.
		dims, _, err := sp.SimpleExtentDims()
		if err != nil {
			return nil, fmt.Errorf("reading extent of dataset %q: %w", name, err)
		}
		if len(dims) < 2 {
			return nil, fmt.Errorf("dataset %q has %d dimension(s), want at least 2", name, len(dims))
		}
		row, dim := int(dims[0]), int(dims[1])

		// The whole dataset is read as one flat float32 slice.
		// NOTE(review): assumes the stored element type is float32 (as for
		// fashion-mnist-784-euclidean.hdf5) — confirm for other datasets.
		vec := make([]float32, sp.SimpleExtentNPoints())
		if err := d.Read(&vec); err != nil {
			return nil, fmt.Errorf("reading dataset %q: %w", name, err)
		}
		if len(vec) < row*dim {
			return nil, fmt.Errorf("dataset %q holds %d values, want at least %d", name, len(vec), row*dim)
		}

		// Cut the flat slice into independent rows of length dim.
		vecs := make([][]float32, row)
		for i := range vecs {
			vecs[i] = make([]float32, dim)
			copy(vecs[i], vec[i*dim:(i+1)*dim])
		}

		return vecs, nil
	}

	train, err = readFn("train")
	if err != nil {
		return nil, nil, err
	}

	test, err = readFn("test")
	if err != nil {
		return nil, nil, err
	}

	return train, test, nil
}
Loading

0 comments on commit a1be637

Please sign in to comment.