From d4c79e259a184217eead1fd3faabeb3dd3b486b6 Mon Sep 17 00:00:00 2001 From: hlts2 Date: Tue, 10 Sep 2024 15:23:30 +0900 Subject: [PATCH] fix: copy ngt service object for flushing Signed-off-by: hlts2 --- pkg/agent/core/ngt/service/ngt.go | 49 +++++++++++++++++-------------- 1 file changed, 27 insertions(+), 22 deletions(-) diff --git a/pkg/agent/core/ngt/service/ngt.go b/pkg/agent/core/ngt/service/ngt.go index b3d585261c..4803f0db26 100644 --- a/pkg/agent/core/ngt/service/ngt.go +++ b/pkg/agent/core/ngt/service/ngt.go @@ -260,6 +260,26 @@ func newNGT(cfg *config.NGT, opts ...Option) (n *ngt, err error) { return n, nil } +func (n *ngt) copyNGT(obj *ngt) { + // instances + n.core = obj.core + n.kvs = obj.kvs + n.fmap = obj.fmap + n.vq = obj.vq + + // counters + n.wfci = obj.wfci + n.nobic = obj.nobic + n.nopvq = atomic.Uint64{} + + // paths + n.path = obj.path + n.tmpPath = obj.tmpPath + n.oldPath = obj.oldPath + n.basePath = obj.basePath + n.brokenPath = obj.brokenPath +} + // migrate migrates the index directory from old to new under the input path if necessary. // Migration happens when the path is not empty and there is no `path/origin` directory, // which indicates that the user has NOT been using CoW mode and the index directory is not migrated yet. @@ -1242,8 +1262,6 @@ func (n *ngt) RegenerateIndexes(ctx context.Context) (err error) { if err != nil { log.Errorf("failed to flushing vector to ngt index in delete kvs. error: %v", err) } - n.kvs = nil - n.vq = nil // gc runtime.GC() @@ -1265,30 +1283,14 @@ func (n *ngt) RegenerateIndexes(ctx context.Context) (err error) { } } - // nkvs := kvs.New(kvs.WithConcurrency(n.kvsdbConcurrency)) - // - // nvq, err := vqueue.New() - // if err != nil { - // log.Errorf("failed to create new vector vector queue. error: %v", err) - // } - // renew instance nn, err := newNGT(n.cfg, n.opts...) if err != nil { return err } - // nn.kvs = nkvs - // nn.vq = nvq - - // Regenerate with flags set - nn.flushing.Store(true) - nn.indexing.Store(true) - defer nn.flushing.Store(false) - defer nn.indexing.Store(false) + n.copyNGT(nn) - n = nn - - return nil + return n.loadStatistics() } func (n *ngt) CreateIndex(ctx context.Context, poolSize uint32) (err error) { @@ -1431,6 +1433,10 @@ func (n *ngt) CreateIndex(ctx context.Context, poolSize uint32) (err error) { return err } } + return n.loadStatistics() +} + +func (n *ngt) loadStatistics() error { if n.IsStatisticsEnabled() { log.Info("loading index statistics to cache") stats, err := n.core.GetGraphStatistics(core.AdditionalStatistics) @@ -1474,8 +1480,7 @@ func (n *ngt) CreateIndex(ctx context.Context, poolSize uint32) (err error) { IndegreeHistogram: stats.IndegreeHistogram, }) } - - return err + return nil } func (n *ngt) removeInvalidIndex(ctx context.Context) {