Skip to content

Commit

Permalink
add core ngt benchmark
Browse files Browse the repository at this point in the history
add check program

Signed-off-by: kpango <[email protected]>
  • Loading branch information
kpango committed Sep 20, 2023
1 parent 594b23b commit b2e7fb6
Show file tree
Hide file tree
Showing 7 changed files with 406 additions and 19 deletions.
3 changes: 3 additions & 0 deletions charts/vald/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1723,6 +1723,9 @@ agent:
# @schema {"name": "agent.ngt.broken_index_history_limit", "type": "integer", "minimum": 0}
# agent.ngt.broken_index_history_limit -- maximum number of broken index generations to backup
broken_index_history_limit: 0
# @schema {"name": "agent.ngt.error_buffer_limit", "type": "integer", "minimum": 1}
# agent.ngt.error_buffer_limit -- maximum number of core ngt error buffer pool size limit
error_buffer_limit: 10
# @schema {"name": "agent.sidecar", "type": "object"}
sidecar:
# @schema {"name": "agent.sidecar.enabled", "type": "boolean"}
Expand Down
190 changes: 190 additions & 0 deletions cmd/tools/cli/benchmark/core/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
//
// Copyright (C) 2019-2023 vdaas.org vald team <[email protected]>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// You may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

// Package ngt provides implementation of Go API for https://github.com/yahoojapan/NGT
package main

import (
"context"
"fmt"
"net/http"
"os"
"os/signal"
"runtime"
"strings"
"sync"
"syscall"

"github.com/vdaas/vald/internal/core/algorithm/ngt"
"github.com/vdaas/vald/internal/log"
"github.com/vdaas/vald/internal/net/http/metrics"
"gonum.org/v1/hdf5"
)

func main() {
vectors, _, _ := load(os.Getenv("DATA_PATH"))
n, _ := ngt.New(
ngt.WithDimension(len(vectors[0])),
ngt.WithDefaultPoolSize(8),
ngt.WithObjectType(ngt.Float),
ngt.WithDistanceType(ngt.L2),
)
pid := os.Getpid()

log.Infof("# of vectors: %v", len(vectors))
output := func(header string) {
status := fmt.Sprintf("/proc/%d/status", pid)
buf, err := os.ReadFile(status)
if err != nil {
log.Fatal(err)
}
var vmpeak, vmrss, vmhwm string
for _, line := range strings.Split(string(buf), "\n") {
switch {
case strings.HasPrefix(line, "VmPeak"):
vmpeak = strings.Fields(line)[1]
case strings.HasPrefix(line, "VmHWM"):
vmhwm = strings.Fields(line)[1]
case strings.HasPrefix(line, "VmRSS"):
vmrss = strings.Fields(line)[1]
}
}

var m runtime.MemStats
runtime.ReadMemStats(&m)
log.Infof("%v\t%v\t%v\t%v\t%v\t%v\t%v\t%v\t%v", header, vmpeak, vmhwm, vmrss, m.Alloc/1024, m.TotalAlloc/1024, m.HeapAlloc/1024, m.HeapSys/1024, m.HeapInuse/1024)
}
log.Info(" operation\tVmPeak\tVmHWM\tVmRSS\tAlloc\tTotalAlloc\tHeapAlloc\tHeapSys\tHeapInuse")
output(" start")
defer output(" end")
ctx, cancel := context.WithCancel(context.Background())
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
wg.Done()
srv := &http.Server{
Addr: "0.0.0.0:6060",
Handler: metrics.NewPProfHandler(),
}
go srv.ListenAndServe()
<-ctx.Done()
srv.Shutdown(context.Background())
}()
wg.Add(1)
go func() {
for {
select {
case <-ctx.Done():
wg.Done()
return
default:
ids := make([]uint, len(vectors))
for i, vector := range vectors {
id, err := n.Insert(vector)
if err != nil {
log.Fatal(err)
}
ids[i] = id
}
output(" insert")

if err := n.CreateIndex(8); err != nil {
log.Fatal(err)
}
output("create index")

for _, id := range ids {
if err := n.Remove(id); err != nil {
log.Fatal(err)
}
}
output(" remove")
}
}
}()

ch := make(chan os.Signal)
signal.Notify(ch, os.Interrupt, syscall.SIGTERM)

<-ch
cancel()

wg.Wait()
}

// load function loads training and test vector from hdf file. The size of ids is same to the number of training data.
// Each id, which is an element of ids, will be set a random number.
func load(path string) (train, test [][]float32, err error) {
var f *hdf5.File
f, err = hdf5.OpenFile(path, hdf5.F_ACC_RDONLY)
if err != nil {
return nil, nil, err
}
defer f.Close()

// readFn function reads vectors of the hierarchy with the given the name.
readFn := func(name string) ([][]float32, error) {
// Opens and returns a named Dataset.
// The returned dataset must be closed by the user when it is no longer needed.
d, err := f.OpenDataset(name)
if err != nil {
return nil, err
}
defer d.Close()

// Space returns an identifier for a copy of the dataspace for a dataset.
sp := d.Space()
defer sp.Close()

// SimpleExtentDims returns dataspace dimension size and maximum size.
dims, _, _ := sp.SimpleExtentDims()
row, dim := int(dims[0]), int(dims[1])

// Gets the stored vector. All are represented as one-dimensional arrays.
// The type of the slice depends on your dataset.
// For fashion-mnist-784-euclidean.hdf5, the datatype is float32.
vec := make([]float32, sp.SimpleExtentNPoints())
if err := d.Read(&vec); err != nil {
return nil, err
}

// Converts a one-dimensional array to a two-dimensional array.
// Use the `dim` variable as a separator.
vecs := make([][]float32, row)
for i := 0; i < row; i++ {
vecs[i] = make([]float32, dim)
for j := 0; j < dim; j++ {
vecs[i][j] = float32(vec[i*dim+j])
}
}

return vecs, nil
}

// Gets vector of `train` hierarchy.
train, err = readFn("train")
if err != nil {
return nil, nil, err
}

// Gets vector of `test` hierarchy.
test, err = readFn("test")
if err != nil {
return nil, nil, err
}

return
}
3 changes: 3 additions & 0 deletions internal/config/ngt.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ type NGT struct {

// BrokenIndexHistoryLimit represents the maximum number of broken index generations that will be backed up
BrokenIndexHistoryLimit int `yaml:"broken_index_history_limit" json:"broken_index_history_limit,omitempty"`

// ErrorBufferLimit represents the maximum number of core ngt error buffer pool size limit
ErrorBufferLimit uint64 `yaml:"error_buffer_limit" json:"error_buffer_limit,omitempty"`
}

// KVSDB represent the ngt vector bidirectional kv store configuration
Expand Down
52 changes: 33 additions & 19 deletions internal/core/algorithm/ngt/ngt.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,11 @@ type (
radius float32
epsilon float32
poolSize uint32
cnt uint64
cnt atomic.Uint64
prop C.NGTProperty
epool sync.Pool
epool sync.Pool // NGT error buffer pool
eps atomic.Uint64 // NGT error buffer pool size
epl uint64 // NGT error buffer pool size limit
index C.NGTIndex
ospace C.NGTObjectSpace
mu *sync.RWMutex
Expand Down Expand Up @@ -276,7 +278,7 @@ func (n *ngt) setup() error {
},
}

for i := 0; i < 20; i++ {
for i := uint64(0); i < n.epl; i++ {
n.PutErrorBuffer(C.ngt_create_error_object())
}

Expand Down Expand Up @@ -411,7 +413,7 @@ func (n *ngt) Search(ctx context.Context, vec []float32, size int, epsilon, radi

rsize := int(C.ngt_get_result_size(results, ebuf))
if rsize <= 0 {
if atomic.LoadUint64(&n.cnt) == 0 {
if n.cnt.Load() == 0 {
n.PutErrorBuffer(ebuf)
return nil, errors.ErrSearchResultEmptyButNoDataStored
}
Expand Down Expand Up @@ -476,7 +478,7 @@ func (n *ngt) LinearSearch(ctx context.Context, vec []float32, size int) (result

rsize := int(C.ngt_get_result_size(results, ebuf))
if rsize <= 0 {
if atomic.LoadUint64(&n.cnt) == 0 {
if n.cnt.Load() == 0 {
n.PutErrorBuffer(ebuf)
return nil, errors.ErrSearchResultEmptyButNoDataStored
}
Expand Down Expand Up @@ -515,16 +517,20 @@ func (n *ngt) Insert(vec []float32) (id uint, err error) {
return 0, errors.ErrIncompatibleDimensionSize(len(vec), dim)
}

cvec := (*C.float)(unsafe.Pointer(&vec[0]))
ebuf := n.GetErrorBuffer()
n.lock(true)
id = uint(C.ngt_insert_index_as_float(n.index, (*C.float)(&vec[0]), C.uint32_t(n.dimension), ebuf))
id = uint(C.ngt_insert_index_as_float(n.index, cvec, C.uint32_t(dim), ebuf))
n.unlock(true)
// cvec = nil
// clear(vec)
vec = vec[:0:0]
vec = nil
if id == 0 {
return 0, n.newGoError(ebuf)
}
n.PutErrorBuffer(ebuf)
atomic.AddUint64(&n.cnt, 1)
n.cnt.Add(1)

return id, nil
}
Expand Down Expand Up @@ -676,7 +682,7 @@ func (n *ngt) Remove(id uint) error {
}
n.PutErrorBuffer(ebuf)

atomic.AddUint64(&n.cnt, ^uint64(0))
n.cnt.Add(^uint64(0))

return nil
}
Expand Down Expand Up @@ -742,32 +748,30 @@ func (n *ngt) newGoError(ebuf C.NGTError) (err error) {
n.PutErrorBuffer(ebuf)
return nil
}
n.PutErrorBuffer(C.ngt_create_error_object())
if n.epl == 0 || n.eps.Load() < n.epl {
n.PutErrorBuffer(C.ngt_create_error_object())
}
C.ngt_destroy_error_object(ebuf)
return errors.NewNGTError(msg)
}

// Close NGT index.
func (n *ngt) Close() {
if n.index != nil {
C.ngt_close_index(n.index)
n.index = nil
n.prop = nil
n.ospace = nil
}
}

func (n *ngt) GetErrorBuffer() (ebuf C.NGTError) {
var ok bool
ebuf, ok = n.epool.Get().(C.NGTError)
if !ok {
ebuf = C.ngt_create_error_object()
}
n.eps.Add(^uint64(0))
return ebuf
}

func (n *ngt) PutErrorBuffer(ebuf C.NGTError) {
if n.epl != 0 && n.eps.Load() > n.epl {
C.ngt_destroy_error_object(ebuf)
return
}
n.epool.Put(ebuf)
n.eps.Add(1)
}

func (n *ngt) lock(cLock bool) {
Expand Down Expand Up @@ -797,3 +801,13 @@ func (n *ngt) rUnlock(cLock bool) {
n.cmu.RUnlock()
}
}

// Close NGT index.
func (n *ngt) Close() {
if n.index != nil {
C.ngt_close_index(n.index)
n.index = nil
n.prop = nil
n.ospace = nil
}
}
Loading

0 comments on commit b2e7fb6

Please sign in to comment.