Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

List kvs and vqueue data #2188

Merged
merged 4 commits into from
Sep 15, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions pkg/agent/core/ngt/service/ngt.go
Original file line number Diff line number Diff line change
Expand Up @@ -1688,10 +1688,29 @@ func (n *ngt) BrokenIndexCount() uint64 {
return atomic.LoadUint64(&n.nobic)
}

// ListObjectFunc applies the input function on each index stored in the kvs and vqueue.
// Use this function for performing something on each object while keeping memory usage low.
// If the vector exists in the vqueue, this vector is not indexed so the oid(object ID) is processed as 0.
// Entries are reported from the vqueue first and from the kvs afterwards; a kvs entry is skipped
// when the vqueue already reported the same uuid with a newer timestamp.
// Once f returns false during the vqueue pass, the kvs pass is not started.
func (n *ngt) ListObjectFunc(ctx context.Context, f func(uuid string, oid uint32, ts int64) bool) {
	// queued records the uuids reported from the vqueue whose queued timestamp is
	// newer than the one held by the kvs, so the kvs pass does not report them again.
	queued := make(map[string]struct{})
	// stopped remembers that f asked to end the iteration during the vqueue pass.
	stopped := false
	n.vq.Range(ctx, func(uuid string, _ []float32, ts int64) bool {
		if !f(uuid, 0, ts) {
			stopped = true
			return false
		}
		if _, kts, ok := n.kvs.Get(uuid); ok && ts > kts {
			queued[uuid] = struct{}{}
		}
		return true
	})
	if stopped {
		// f already returned false; do not call it again from the kvs pass.
		return
	}
	n.kvs.Range(ctx, func(uuid string, oid uint32, ts int64) bool {
		if _, seen := queued[uuid]; seen {
			return true
		}
		return f(uuid, oid, ts)
	})
}

func (n *ngt) toSearchResponse(sr []core.SearchResult) (res *payload.Search_Response, err error) {
Expand Down
14 changes: 14 additions & 0 deletions pkg/agent/core/ngt/service/vqueue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type Queue interface {
PushInsert(uuid string, vector []float32, date int64) error
PushDelete(uuid string, date int64) error
GetVector(uuid string) (vec []float32, timestamp int64, exists bool)
Range(ctx context.Context, f func(uuid string, vector []float32, ts int64) bool)
RangePopInsert(ctx context.Context, now int64, f func(uuid string, vector []float32, date int64) bool)
RangePopDelete(ctx context.Context, now int64, f func(uuid string) bool)
IVExists(uuid string) bool
Expand Down Expand Up @@ -255,6 +256,19 @@ func (v *vqueue) RangePopDelete(ctx context.Context, now int64, f func(uuid stri
}
}

// Range calls f for each entry held in v.il, passing the entry's uuid, vector and date.
// An entry is skipped when it is nil, or when v.dl holds an entry for the same uuid
// that is nil or whose date is not older than the v.il entry's date
// (presumably il/dl are the insert and delete queues — confirm against the struct definition).
// Iteration stops when f returns false or when ctx is done.
func (v *vqueue) Range(ctx context.Context, f func(uuid string, vector []float32, ts int64) bool) {
	v.il.Range(func(uuid string, idx *index) bool {
		// Honour cancellation between entries instead of ignoring ctx.
		select {
		case <-ctx.Done():
			return false
		default:
		}
		if idx == nil {
			return true
		}
		didx, ok := v.dl.Load(uuid)
		if !ok || (didx != nil && idx.date > didx.date) {
			return f(uuid, idx.vector, idx.date)
		}
		return true
	})
}

// IVQLen returns the number of uninserted indexes stored in the insert queue.
func (v *vqueue) IVQLen() (l int) {
return int(atomic.LoadUint64(&v.ic))
Expand Down