Skip to content

Commit

Permalink
os.free nil pointer failure in ngt cgo due to create index hang up (#930)
Browse files Browse the repository at this point in the history

* os.free nil pointer failure in ngt cgo due to create index hang up

Signed-off-by: kpango <[email protected]>

* bugfix error on stream close

Signed-off-by: kpango <[email protected]>

* Apply suggestions from code review

Co-authored-by: Rintaro Okamura <[email protected]>

* downgrade NGT version to 1.12.1 from 1.12.2

Signed-off-by: kpango <[email protected]>

* remove optimizer from cgo build & small refactor for backoff

Signed-off-by: kpango <[email protected]>

* add gnu option for build flag

Signed-off-by: kpango <[email protected]>

Co-authored-by: Rintaro Okamura <[email protected]>
  • Loading branch information
kpango and rinx authored Jan 18, 2021
1 parent 2990012 commit c34930e
Show file tree
Hide file tree
Showing 12 changed files with 73 additions and 52 deletions.
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ PORT ?= 80
NUMBER ?= 10
DIMENSION ?= 6
NUMPANES ?= 4
MEAN ?= 0.0
STDDEV ?= 1.0

BODY = ""

Expand Down
2 changes: 1 addition & 1 deletion Makefile.d/build.mk
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ cmd/agent/core/ngt/ngt: \
GOPRIVATE=$(GOPRIVATE) \
go build \
--ldflags "-s -w -linkmode 'external' \
-extldflags '-static -fPIC -pthread -fopenmp -std=gnu++20 -lstdc++ -O3 -lm $(EXTLDFLAGS)' \
-extldflags '-static -fPIC -pthread -fopenmp -std=gnu++20 -lstdc++ -lm $(EXTLDFLAGS)' \
-X '$(GOPKG)/internal/info.Version=$(VERSION)' \
-X '$(GOPKG)/internal/info.GitCommit=$(GIT_COMMIT)' \
-X '$(GOPKG)/internal/info.BuildTime=$(DATETIME)' \
Expand Down
4 changes: 2 additions & 2 deletions Makefile.d/client.mk
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ endif
.PHONY: valdcli/xpanes/insert
## insert randomized vectors using valdcli and xpanes
valdcli/xpanes/insert:
xpanes -c "valdcli rand-vecs -n $(NUMBER) -d $(DIMENSION) --with-ids | valdcli -h $(HOST) -p $(PORT) stream-insert --elapsed-time" $$(seq 1 $(NUMPANES))
xpanes -c "valdcli rand-vecs -n $(NUMBER) -d $(DIMENSION) --gaussian --gaussian-mean $(MEAN) --gaussian-stddev $(STDDEV) --with-ids | valdcli -h $(HOST) -p $(PORT) stream-insert --elapsed-time" $$(seq 1 $(NUMPANES))

.PHONY: valdcli/xpanes/search
## search randomized vectors using valdcli and xpanes
valdcli/xpanes/search:
xpanes -c "valdcli rand-vecs -n $(NUMBER) -d $(DIMENSION) | valdcli -h $(HOST) -p $(PORT) stream-search --elapsed-time" $$(seq 1 $(NUMPANES))
xpanes -c "valdcli rand-vecs -n $(NUMBER) -d $(DIMENSION) --gaussian --gaussian-mean $(MEAN) --gaussian-stddev $(STDDEV) | valdcli -h $(HOST) -p $(PORT) stream-search --elapsed-time" $$(seq 1 $(NUMPANES))
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

[![License: Apache 2.0](https://img.shields.io/github/license/vdaas/vald.svg?style=flat-square)](https://opensource.org/licenses/Apache-2.0)
[![release](https://img.shields.io/github/release/vdaas/vald.svg?style=flat-square)](https://github.com/vdaas/vald/releases/latest)
[![GoDoc](https://img.shields.io/badge/godoc-reference-blue.svg?style=flat-square)](https://pkg.go.dev/github.com/vdaas/vald)
[![Go Reference](https://pkg.go.dev/badge/github.com/vdaas/vald.svg)](https://pkg.go.dev/github.com/vdaas/vald)
[![Codacy Badge](https://img.shields.io/codacy/grade/a6e544eee7bc49e08a000bb10ba3deed?style=flat-square)](https://www.codacy.com/app/i.can.feel.gravity/vald?utm_source=github.com&utm_medium=referral&utm_content=vdaas/vald&utm_campaign=Badge_Grade)
[![Go Report Card](https://goreportcard.com/badge/github.com/vdaas/vald?style=flat-square)](https://goreportcard.com/report/github.com/vdaas/vald)
[![DepShield Badge](https://depshield.sonatype.org/badges/vdaas/vald/depshield.svg?style=flat-square)](https://depshield.github.io)
Expand Down
2 changes: 1 addition & 1 deletion cmd/agent/core/ngt/sample.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ ngt:
default_epsilon: 0.01
default_pool_size: 10000
default_radius: -1
dimension: 4096
dimension: 6
distance_type: l2
enable_in_memory_mode: true
enable_proactive_gc: true
Expand Down
59 changes: 34 additions & 25 deletions internal/backoff/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,14 @@ func (b *backoff) Do(ctx context.Context, f func(ctx context.Context) (val inter
for cnt := 0; cnt < b.maxRetryCount; cnt++ {
select {
case <-dctx.Done():
return nil, errors.Wrap(err, dctx.Err().Error())
switch dctx.Err() {
case context.DeadlineExceeded:
return nil, errors.ErrBackoffTimeout(err)
case context.Canceled:
return nil, err
default:
return nil, errors.Wrap(err, dctx.Err().Error())
}
default:
res, ret, err = func() (val interface{}, retryable bool, err error) {
ssctx, span := trace.StartSpan(dctx, traceTag+"/"+strconv.Itoa(cnt+1))
Expand All @@ -100,34 +107,36 @@ func (b *backoff) Do(ctx context.Context, f func(ctx context.Context) (val inter
}()
return f(ssctx)
}()
if ret && err != nil {
if b.errLog {
log.Error(err)
if !ret {
return res, err
}
if err == nil {
return res, nil
}
if b.errLog {
log.Error(err)
}
timer.Reset(time.Duration(jdur))
select {
case <-dctx.Done():
switch dctx.Err() {
case context.DeadlineExceeded:
return nil, errors.ErrBackoffTimeout(err)
case context.Canceled:
return nil, err
default:
return nil, errors.Wrap(dctx.Err(), err.Error())
}
timer.Reset(time.Duration(jdur))
select {
case <-dctx.Done():
switch dctx.Err() {
case context.DeadlineExceeded:
return nil, errors.ErrBackoffTimeout(err)
case context.Canceled:
return nil, err
default:
return nil, errors.Wrap(dctx.Err(), err.Error())
}
case <-timer.C:
if dur >= b.durationLimit {
dur = b.maxDuration
jdur = b.maxDuration
} else {
dur *= b.backoffFactor
jdur = b.addJitter(dur)
}
continue
case <-timer.C:
if dur >= b.durationLimit {
dur = b.maxDuration
jdur = b.maxDuration
} else {
dur *= b.backoffFactor
jdur = b.addJitter(dur)
}
}
}
return res, err
}
return res, err
}
Expand Down
20 changes: 5 additions & 15 deletions internal/core/algorithm/ngt/ngt.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,24 +393,14 @@ func (n *ngt) BulkInsert(vecs [][]float32) ([]uint, []error) {
ids := make([]uint, 0, len(vecs))
errs := make([]error, 0, len(vecs))

dim := int(n.dimension)
var id uint
n.mu.Lock()
for _, vec := range vecs {
id = 0
if len(vec) != dim {
errs = append(errs, errors.ErrIncompatibleDimensionSize(len(vec), dim))
} else {
// n.mu.Lock()
id = uint(C.ngt_insert_index_as_float(n.index, (*C.float)(&vec[0]), C.uint32_t(n.dimension), n.ebuf))
// n.mu.Unlock()
if id == 0 {
errs = append(errs, n.newGoError(n.ebuf))
}
log.Infof("started to bulk insert %d of vectors", len(vecs))
for i, vec := range vecs {
id, err := n.Insert(vec)
if err != nil {
errs = append(errs, errors.Wrapf(err, "bulkinsert error detected index number: %d,\tid: %d", i, id))
}
ids = append(ids, id)
}
n.mu.Unlock()

return ids, errs
}
Expand Down
9 changes: 6 additions & 3 deletions internal/net/grpc/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ func BidirectionalStream(ctx context.Context, stream grpc.ServerStream,
gerrs.Roots = append(gerrs.Roots, gerr)
return true
})
if errs == nil {
return nil
}
st, err := status.New(status.Unknown, errs.Error()).WithDetails(gerrs)
if err != nil {
log.Warn(err)
Expand All @@ -102,11 +105,11 @@ func BidirectionalStream(ctx context.Context, stream grpc.ServerStream,
data := newData()
err = stream.RecvMsg(data)
if err != nil {
if err == io.EOF {
if err == io.EOF || errors.Is(err, io.EOF) {
return finalize()
}
log.Error(err)
continue
log.Errorf("failed to receive stream message %v", err)
return errors.Wrap(finalize(), err.Error())
}
if data != nil {
eg.Go(safety.RecoverWithoutPanicFunc(func() (err error) {
Expand Down
18 changes: 17 additions & 1 deletion internal/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"syscall"

"github.com/vdaas/vald/internal/config"
"github.com/vdaas/vald/internal/encoding/json"
"github.com/vdaas/vald/internal/errgroup"
"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/info"
Expand Down Expand Up @@ -95,6 +96,21 @@ func Do(ctx context.Context, opts ...Option) error {
log.Init()
}

log.Debugf("version info:\t\t%s\n\nconfiguration:\t\t%s\n\n",
func() string {
b, err := json.Marshal(info.Get())
if err != nil {
return "failed to serialize build information"
}
return string(b)
}(), func() string {
b, err := json.Marshal(cfg)
if err != nil {
return "failed to serialize configuration"
}
return string(b)
}())

// set location temporary for initialization logging
location.Set(ccfg.TZ)

Expand Down Expand Up @@ -197,7 +213,7 @@ func Run(ctx context.Context, run Runner, name string) (err error) {
}

err = errgroup.Wait()
if err != nil {
if err != nil && !errors.Is(err, context.Canceled) {
log.Error(errors.ErrRunnerWait(name, err))
if _, ok := emap[err.Error()]; !ok {
errs = append(errs, err)
Expand Down
4 changes: 2 additions & 2 deletions pkg/agent/core/ngt/service/ngt.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ func (n *ngt) Start(ctx context.Context) <-chan error {
select {
case <-ctx.Done():
err = n.CreateIndex(ctx, n.poolSize)
if err != nil {
if err != nil && !errors.Is(err, errors.ErrUncommittedIndexNotFound){
ech <- err
return errors.Wrap(ctx.Err(), err.Error())
}
Expand Down Expand Up @@ -757,7 +757,7 @@ func (n *ngt) CreateAndSaveIndex(ctx context.Context, poolSize uint32) (err erro
}()

err = n.CreateIndex(ctx, poolSize)
if err != nil && err != errors.ErrUncommittedIndexNotFound {
if err != nil {
return err
}
return n.SaveIndex(ctx)
Expand Down
1 change: 1 addition & 0 deletions pkg/gateway/lb/usecase/vald.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
// limitations under the License.
//

// Package usecase represents the gateway's usecase layer.
package usecase

import (
Expand Down
2 changes: 1 addition & 1 deletion versions/NGT_VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.12.2
1.12.1

0 comments on commit c34930e

Please sign in to comment.