Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
invest removeInvalidIndex bug
Browse files Browse the repository at this point in the history
Signed-off-by: kpango <kpango@vdaas.org>
kpango committed Feb 20, 2024

Verified

This commit was signed with the committer’s verified signature.
kpango Yusuke Kato
1 parent f080436 commit 0d6fbec
Showing 3 changed files with 37 additions and 1 deletion.
14 changes: 14 additions & 0 deletions internal/core/algorithm/ngt/ngt.go
Original file line number Diff line number Diff line change
@@ -702,14 +702,21 @@ func (n *ngt) SaveIndexWithPath(idxPath string) error {

// Remove removes from NGT index.
func (n *ngt) Remove(id uint) error {
log.Debugf("[CHECK20240220]\tcore.Remove started\t oid: %d,\t", id)
ne := n.GetErrorBuffer()
log.Debugf("[CHECK20240220]\tcore.Remove get ebuf\toid: %d,\t", id)
log.Debugf("[CHECK20240220]\tcore.Remove pre-lock\toid: %d,\t", id)
n.lock(true)
log.Debugf("[CHECK20240220]\tcore.Remove locked\toid: %d,\t", id)
ret := C.ngt_remove_index(n.index, C.ObjectID(id), ne.err)
log.Debugf("[CHECK20240220]\tcore.Remove pre-unlock\toid: %d,\t", id)
n.unlock(true)
log.Debugf("[CHECK20240220]\tcore.Remove unlocked\toid: %d,\t", id)
if ret == ErrorCode {
return n.newGoError(ne)
}
n.PutErrorBuffer(ne)
log.Debugf("[CHECK20240220]\tcore.Remove return ebuf\toid: %d,\t", id)

n.cnt.Add(^uint64(0))

@@ -730,12 +737,18 @@ func (n *ngt) BulkRemove(ids ...uint) (errs error) {
// GetVector returns vector stored in NGT index.
func (n *ngt) GetVector(id uint) (ret []float32, err error) {
dimension := int(n.dimension)
log.Debugf("[CHECK20240220]\tcore.GetVector started\t oid: %d,\t", id)
ne := n.GetErrorBuffer()
log.Debugf("[CHECK20240220]\tcore.GetVector get ebuf\toid: %d,\t", id)
switch n.objectType {
case Float:
log.Debugf("[CHECK20240220]\tcore.GetVector pre-lock\toid: %d,\t", id)
n.rLock(false)
log.Debugf("[CHECK20240220]\tcore.GetVector locked\toid: %d,\t", id)
results := C.ngt_get_object_as_float(n.ospace, C.ObjectID(id), ne.err)
log.Debugf("[CHECK20240220]\tcore.GetVector pre-unlock\toid: %d,\t", id)
n.rUnlock(false)
log.Debugf("[CHECK20240220]\tcore.GetVector unlocked\toid: %d,\t", id)
if results == nil {
return nil, n.newGoError(ne)
}
@@ -768,6 +781,7 @@ func (n *ngt) GetVector(id uint) (ret []float32, err error) {
return nil, errors.ErrUnsupportedObjectType
}
n.PutErrorBuffer(ne)
log.Debugf("[CHECK20240220]\tcore.GetVector return ebuf\toid: %d,\t", id)
return ret, nil
}

6 changes: 6 additions & 0 deletions pkg/agent/core/ngt/service/ngt.go
Original file line number Diff line number Diff line change
@@ -1274,12 +1274,15 @@ func (n *ngt) CreateIndex(ctx context.Context, poolSize uint32) (err error) {
}

func (n *ngt) removeInvalidIndex(ctx context.Context) {
log.Debug("[CHECK20240220]\tremoveInvalidIndex started")
if n.kvs.Len() == 0 {
log.Debug("[CHECK20240220]\tremoveInvalidIndex kvs Len is empty")
return
}
var dcnt uint32
n.kvs.Range(ctx, func(uuid string, oid uint32, _ int64) bool {
vec, err := n.core.GetVector(uint(oid))
log.Debugf("[CHECK20240220]\tuuid: %s,\toid: %d,\terr: %v", uuid, oid, err)
if err != nil || vec == nil || len(vec) != n.dim {
log.Debugf("invalid index detected err: %v\tuuid: %s\toid: %d will remove", err, uuid, oid)
n.kvs.Delete(uuid)
@@ -1295,6 +1298,7 @@ func (n *ngt) removeInvalidIndex(ctx context.Context) {
return true
})
if atomic.LoadUint32(&dcnt) <= 0 {
log.Debug("[CHECK20240220]\tremoveInvalidIndex deleted count is empty")
return
}
var poolSize uint32
@@ -1303,6 +1307,8 @@ func (n *ngt) removeInvalidIndex(ctx context.Context) {
} else {
poolSize = atomic.LoadUint32(&dcnt)
}
log.Debug("[CHECK20240220]\tremoveInvalidIndex before cimu.Lock")
defer log.Debug("[CHECK20240220]\tremoveInvalidIndex after cimu.Lock")
n.cimu.Lock()
defer n.cimu.Unlock()
n.indexing.Store(true)
18 changes: 17 additions & 1 deletion pkg/agent/internal/kvs/kvs.go
Original file line number Diff line number Diff line change
@@ -20,6 +20,7 @@ import (
"context"
"sync/atomic"

"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/safety"
"github.com/vdaas/vald/internal/sync"
"github.com/vdaas/vald/internal/sync/errgroup"
@@ -154,21 +155,36 @@ func (b *bidi) Range(ctx context.Context, f func(string, uint32, int64) bool) {
var wg sync.WaitGroup
for i := range b.uo {
idx := i
log.Debugf("[CHECK20240220]\tkvs.Range In-Loop idx: %d", idx)

Check failure on line 158 in pkg/agent/internal/kvs/kvs.go

GitHub Actions / grpc-sequential

undefined: log

Check failure on line 158 in pkg/agent/internal/kvs/kvs.go

GitHub Actions / grpc-stream

undefined: log
wg.Add(1)
log.Debugf("[CHECK20240220]\tkvs.Range wg.Add idx: %d", idx)

Check failure on line 160 in pkg/agent/internal/kvs/kvs.go

GitHub Actions / grpc-sequential

undefined: log

Check failure on line 160 in pkg/agent/internal/kvs/kvs.go

GitHub Actions / grpc-stream

undefined: log
b.eg.Go(safety.RecoverFunc(func() (err error) {
log.Debugf("[CHECK20240220]\tkvs.Range inner-function idx: %d", idx)

Check failure on line 162 in pkg/agent/internal/kvs/kvs.go

GitHub Actions / grpc-sequential

undefined: log

Check failure on line 162 in pkg/agent/internal/kvs/kvs.go

GitHub Actions / grpc-stream

undefined: log
defer log.Debugf("[CHECK20240220]\tkvs.Range inner-function defer wg.Done idx: %d", idx)

Check failure on line 163 in pkg/agent/internal/kvs/kvs.go

GitHub Actions / grpc-sequential

undefined: log

Check failure on line 163 in pkg/agent/internal/kvs/kvs.go

GitHub Actions / grpc-stream

undefined: log
defer wg.Done()
b.uo[idx].Range(func(uuid string, val ValueStructUo) bool {
log.Debugf("[CHECK20240220]\tkvs.Range.uo[%d].Range", idx)

Check failure on line 166 in pkg/agent/internal/kvs/kvs.go

GitHub Actions / grpc-sequential

undefined: log

Check failure on line 166 in pkg/agent/internal/kvs/kvs.go

GitHub Actions / grpc-stream

undefined: log
select {
case <-ctx.Done():
log.Debugf("[CHECK20240220]\tkvs.Range.uo[%d].Range context Done", idx)

Check failure on line 169 in pkg/agent/internal/kvs/kvs.go

GitHub Actions / grpc-sequential

undefined: log

Check failure on line 169 in pkg/agent/internal/kvs/kvs.go

GitHub Actions / grpc-stream

undefined: log
err = ctx.Err()
return false
default:
log.Debugf("[CHECK20240220]\tkvs.Range.uo[%d].Range exec Function", idx)

Check failure on line 173 in pkg/agent/internal/kvs/kvs.go

GitHub Actions / grpc-sequential

undefined: log

Check failure on line 173 in pkg/agent/internal/kvs/kvs.go

GitHub Actions / grpc-stream

undefined: log
return f(uuid, val.value, val.timestamp)
}
})
wg.Done()
if err != nil &&
(errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)) {
log.Debugf("[CHECK20240220]\tkvs.Range return context error idx: %d", idx)

Check failure on line 179 in pkg/agent/internal/kvs/kvs.go

GitHub Actions / grpc-sequential

undefined: log

Check failure on line 179 in pkg/agent/internal/kvs/kvs.go

GitHub Actions / grpc-stream

undefined: log
return err
}
log.Debugf("[CHECK20240220]\tkvs.Range return no error idx: %d", idx)

Check failure on line 182 in pkg/agent/internal/kvs/kvs.go

GitHub Actions / grpc-sequential

undefined: log

Check failure on line 182 in pkg/agent/internal/kvs/kvs.go

GitHub Actions / grpc-stream

undefined: log
return nil
}))
}
wg.Wait()
log.Debug("[CHECK20240220]\tkvs.Range finished")

Check failure on line 187 in pkg/agent/internal/kvs/kvs.go

GitHub Actions / grpc-sequential

undefined: log

Check failure on line 187 in pkg/agent/internal/kvs/kvs.go

GitHub Actions / grpc-stream

undefined: log
}

// Len returns the length of the cache that is set in the bidi.

0 comments on commit 0d6fbec

Please sign in to comment.