diff --git a/pkg/agent/core/ngt/service/ngt.go b/pkg/agent/core/ngt/service/ngt.go index 3d815db0bd..1e1a75d291 100644 --- a/pkg/agent/core/ngt/service/ngt.go +++ b/pkg/agent/core/ngt/service/ngt.go @@ -1688,10 +1688,29 @@ func (n *ngt) BrokenIndexCount() uint64 { return atomic.LoadUint64(&n.nobic) } -// ListObjectFunc applies the input function on each index stored in the kvs. +// ListObjectFunc applies the input function on each index stored in the kvs and vqueue. // Use this function for performing something on each object with caring about the memory usage. +// If the vector exists in the vqueue, this vector is not indexed so the oid(object ID) is processed as 0. func (n *ngt) ListObjectFunc(ctx context.Context, f func(uuid string, oid uint32, ts int64) bool) { - n.kvs.Range(ctx, f) + dup := make(map[string]bool) + n.vq.Range(ctx, func(uuid string, vec []float32, ts int64) (ok bool) { + ok = f(uuid, 0, ts) + if !ok { + return false + } + var kts int64 + _, kts, ok = n.kvs.Get(uuid) + if ok && ts > kts { + dup[uuid] = true + } + return true + }) + n.kvs.Range(ctx, func(uuid string, oid uint32, ts int64) (ok bool) { + if dup[uuid] { + return true + } + return f(uuid, oid, ts) + }) } func (n *ngt) toSearchResponse(sr []core.SearchResult) (res *payload.Search_Response, err error) { diff --git a/pkg/agent/core/ngt/service/vqueue/queue.go b/pkg/agent/core/ngt/service/vqueue/queue.go index ee99f3758c..5d97511f6c 100644 --- a/pkg/agent/core/ngt/service/vqueue/queue.go +++ b/pkg/agent/core/ngt/service/vqueue/queue.go @@ -34,6 +34,7 @@ type Queue interface { PushInsert(uuid string, vector []float32, date int64) error PushDelete(uuid string, date int64) error GetVector(uuid string) (vec []float32, timestamp int64, exists bool) + Range(ctx context.Context, f func(uuid string, vector []float32, ts int64) bool) RangePopInsert(ctx context.Context, now int64, f func(uuid string, vector []float32, date int64) bool) RangePopDelete(ctx context.Context, now int64, f func(uuid string) bool) IVExists(uuid string) bool @@ -255,6 +256,20 @@ func (v *vqueue) RangePopDelete(ctx context.Context, now int64, f func(uuid stri } } +// Range calls f sequentially for each key and value present in the vqueue. +func (v *vqueue) Range(ctx context.Context, f func(uuid string, vector []float32, ts int64) bool) { + v.il.Range(func(uuid string, idx *index) bool { + if idx == nil { + return true + } + didx, ok := v.dl.Load(uuid) + if !ok || (didx != nil && idx.date > didx.date) { + return f(uuid, idx.vector, idx.date) + } + return true + }) +} + // IVQLen returns the number of uninserted indexes stored in the insert queue. func (v *vqueue) IVQLen() (l int) { return int(atomic.LoadUint64(&v.ic))