Skip to content

Commit

Permalink
fix flush.go & ngt.go
Browse files Browse the repository at this point in the history
Signed-off-by: aknishid <[email protected]>
  • Loading branch information
aknishid committed Apr 25, 2023
1 parent 9858128 commit a8c8cb6
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 11 deletions.
24 changes: 22 additions & 2 deletions pkg/agent/core/ngt/handler/grpc/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/vdaas/vald/apis/grpc/v1/payload"
"github.com/vdaas/vald/apis/grpc/v1/vald"
"github.com/vdaas/vald/internal/observability/trace"
"sync/atomic"
)

func (s *server) Flush(ctx context.Context, req *payload.Flush_Request) (*payload.Info_Index_Count, error) {
Expand All @@ -35,7 +36,7 @@ func (s *server) Flush(ctx context.Context, req *payload.Flush_Request) (*payloa
span.End()
}
}()
err := s.ngt.RegenerateIndex(ctx)
err := s.ngt.RegenerateIndexes(ctx)
if err != nil {
var attrs []attribute.KeyValue
if errors.Is(err, errors.ErrFlushingIsInProgress) {
Expand Down Expand Up @@ -68,5 +69,24 @@ func (s *server) Flush(ctx context.Context, req *payload.Flush_Request) (*payloa
}
return nil, err
}
return nil, err

var (
stored uint32
uncommited uint32
indexing atomic.Value
saving atomic.Value
)
stored = 0
uncommited = 0
indexing.Store(false)
saving.Store(false)

cnts := &payload.Info_Index_Count{
Stored: atomic.LoadUint32(&stored),
Uncommitted: atomic.LoadUint32(&uncommited),
Indexing: indexing.Load().(bool),
Saving: saving.Load().(bool),
}

return cnts, nil
}
13 changes: 4 additions & 9 deletions pkg/agent/core/ngt/service/ngt.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type NGT interface {
DeleteWithTime(uuid string, t int64) (err error)
DeleteMultiple(uuids ...string) (err error)
DeleteMultipleWithTime(uuids []string, t int64) (err error)
ReganarateIndecies(ctx context.Context) (err error)
RegenerateIndexes(ctx context.Context) (err error)
GetObject(uuid string) (vec []float32, err error)
CreateIndex(ctx context.Context, poolSize uint32) (err error)
SaveIndex(ctx context.Context) (err error)
Expand Down Expand Up @@ -943,7 +943,7 @@ func (n *ngt) deleteMultiple(uuids []string, now int64, validation bool) (err er
return err
}

func (n *ngt) ReganarateIndecies(ctx context.Context) (err error) {
func (n *ngt) RegenerateIndexes(ctx context.Context) (err error) {
if n.IsFlushing() {
return errors.ErrFlushingIsInProgress
}
Expand Down Expand Up @@ -976,14 +976,9 @@ func (n *ngt) ReganarateIndecies(ctx context.Context) (err error) {
if err != nil {
log.Errorf("failed to flushing vector to ngt index in delete kvs. error: %v", err)
}
n.kvs = nil
n.kvs = kvs.New(kvs.WithConcurrency(n.kvsdbConcurrency))

// delete ngt
n.core.CloseWithoutSaveIndex()
n.core = nil

// delete vq
n.vq = nil
n.vq, err = vqueue.New()

// gc
runtime.GC()
Expand Down

0 comments on commit a8c8cb6

Please sign in to comment.