From c3c5765c8e77c0da38fa538f05feea133b25f5f7 Mon Sep 17 00:00:00 2001 From: Hiroto Funakoshi Date: Wed, 11 Sep 2024 11:02:56 +0900 Subject: [PATCH] Bugfix NGT flush logic (#2598) * fix: bugfix flush logic Signed-off-by: hlts2 * fix: nil check for flushing Signed-off-by: hlts2 * fix: add flush check logic Signed-off-by: hlts2 * fix: nil check bug Signed-off-by: hlts2 * fix: add nil check Signed-off-by: hlts2 * fix: return err when the flush process is executing Signed-off-by: hlts2 * fix: add error check for flushing Signed-off-by: hlts2 * fix: error message Signed-off-by: hlts2 * fix: disable kvs and vqueue initialization Signed-off-by: hlts2 * fix: disable commentout Signed-off-by: hlts2 * fix: disable kvs and vq Signed-off-by: hlts2 * fix: nil set to kvs and vq Signed-off-by: hlts2 * fix: copy ngt service object for flushing Signed-off-by: hlts2 * fix: deleted unnecessary nil check Signed-off-by: hlts2 * fix: variable name Signed-off-by: hlts2 --------- Signed-off-by: hlts2 Co-authored-by: Yusuke Kato --- pkg/agent/core/ngt/handler/grpc/index.go | 6 ++ pkg/agent/core/ngt/service/ngt.go | 73 +++++++++++++++--------- 2 files changed, 51 insertions(+), 28 deletions(-) diff --git a/pkg/agent/core/ngt/handler/grpc/index.go b/pkg/agent/core/ngt/handler/grpc/index.go index 62df71e1805..8e7f24d9f64 100644 --- a/pkg/agent/core/ngt/handler/grpc/index.go +++ b/pkg/agent/core/ngt/handler/grpc/index.go @@ -64,6 +64,9 @@ func (s *server) CreateIndex( }, }, info.Get())...) code = codes.FailedPrecondition + case errors.Is(err, errors.ErrFlushingIsInProgress): + err = status.WrapWithAborted("CreateIndex API aborted to process create indexes request due to flushing indices is in progress", err, details...) + code = codes.Aborted case errors.Is(err, context.Canceled): err = status.WrapWithCanceled(fmt.Sprintf("CreateIndex API canceled to create indexes pool_size = %d, error: %v", c.GetPoolSize(), err), err, details...) code = codes.Canceled @@ -149,6 +152,9 @@ func (s *server) CreateAndSaveIndex( }, }, info.Get())...) code = codes.FailedPrecondition + case errors.Is(err, errors.ErrFlushingIsInProgress): + err = status.WrapWithAborted("CreateAndSaveIndex API aborted to process create indexes request due to flushing indices is in progress", err, details...) + code = codes.Aborted case errors.Is(err, context.Canceled): err = status.WrapWithCanceled(fmt.Sprintf("CreateAndSaveIndex API canceled to create indexes pool_size = %d, error: %v", c.GetPoolSize(), err), err, details...) code = codes.Canceled diff --git a/pkg/agent/core/ngt/service/ngt.go b/pkg/agent/core/ngt/service/ngt.go index 095f0b6f0a8..09bf8577f50 100644 --- a/pkg/agent/core/ngt/service/ngt.go +++ b/pkg/agent/core/ngt/service/ngt.go @@ -260,6 +260,26 @@ func newNGT(cfg *config.NGT, opts ...Option) (n *ngt, err error) { return n, nil } +func (n *ngt) copyNGT(src *ngt) { + // instances + n.core = src.core + n.kvs = src.kvs + n.fmap = src.fmap + n.vq = src.vq + + // counters + n.wfci = src.wfci + n.nobic = src.nobic + n.nopvq = atomic.Uint64{} + + // paths + n.path = src.path + n.tmpPath = src.tmpPath + n.oldPath = src.oldPath + n.basePath = src.basePath + n.brokenPath = src.brokenPath +} + // migrate migrates the index directory from old to new under the input path if necessary. // Migration happens when the path is not empty and there is no `path/origin` directory, // which indicates that the user has NOT been using CoW mode and the index directory is not migrated yet. @@ -908,7 +928,7 @@ func (n *ngt) Start(ctx context.Context) <-chan error { } return ctx.Err() case <-tick.C: - if n.vq.IVQLen() >= n.alen { + if n.vq != nil && !n.IsFlushing() && n.vq.IVQLen() >= n.alen { err = n.CreateIndex(ctx, n.poolSize) } case <-limit.C: @@ -1242,14 +1262,12 @@ func (n *ngt) RegenerateIndexes(ctx context.Context) (err error) { if err != nil { log.Errorf("failed to flushing vector to ngt index in delete kvs. error: %v", err) } - n.kvs = nil - n.vq = nil // gc runtime.GC() atomic.AddUint64(&n.nogce, 1) - if n.inMem { + if !n.inMem { // delete file err = file.DeleteDir(ctx, n.path) if err != nil { @@ -1265,30 +1283,14 @@ func (n *ngt) RegenerateIndexes(ctx context.Context) (err error) { } } - nkvs := kvs.New(kvs.WithConcurrency(n.kvsdbConcurrency)) - - nvq, err := vqueue.New() - if err != nil { - log.Errorf("failed to create new vector vector queue. error: %v", err) - } - // renew instance nn, err := newNGT(n.cfg, n.opts...) if err != nil { return err } - nn.kvs = nkvs - nn.vq = nvq - - // Regenerate with flags set - nn.flushing.Store(true) - nn.indexing.Store(true) - defer nn.flushing.Store(false) - defer nn.indexing.Store(false) + n.copyNGT(nn) - n = nn - - return nil + return n.loadStatistics() } func (n *ngt) CreateIndex(ctx context.Context, poolSize uint32) (err error) { @@ -1299,8 +1301,11 @@ func (n *ngt) CreateIndex(ctx context.Context, poolSize uint32) (err error) { } }() - if n.isReadReplica { + switch { + case n.isReadReplica: return errors.ErrWriteOperationToReadReplica + case n.IsFlushing(): + return errors.ErrFlushingIsInProgress } ic := n.vq.IVQLen() + n.vq.DVQLen() @@ -1428,6 +1433,10 @@ func (n *ngt) CreateIndex(ctx context.Context, poolSize uint32) (err error) { return err } } + return n.loadStatistics() +} + +func (n *ngt) loadStatistics() error { if n.IsStatisticsEnabled() { log.Info("loading index statistics to cache") stats, err := n.core.GetGraphStatistics(core.AdditionalStatistics) @@ -1471,8 +1480,7 @@ func (n *ngt) CreateIndex(ctx context.Context, poolSize uint32) (err error) { IndegreeHistogram: stats.IndegreeHistogram, }) } - - return err + return nil } func (n *ngt) removeInvalidIndex(ctx context.Context) { @@ -1941,15 +1949,24 @@ func (n *ngt) gc() { } func (n *ngt) Len() uint64 { - return n.kvs.Len() + if n.kvs != nil && !n.IsFlushing() { + return n.kvs.Len() + } + return 0 } func (n *ngt) InsertVQueueBufferLen() uint64 { - return uint64(n.vq.IVQLen()) + if n.vq != nil && !n.IsFlushing() { + return uint64(n.vq.IVQLen()) + } + return 0 } func (n *ngt) DeleteVQueueBufferLen() uint64 { - return uint64(n.vq.DVQLen()) + if n.vq != nil && !n.IsFlushing() { + return uint64(n.vq.DVQLen()) + } + return 0 } func (n *ngt) GetDimensionSize() int {