From 4a903566b3db5a006df778faa79cff0ffdf31a89 Mon Sep 17 00:00:00 2001 From: hlts2 Date: Mon, 9 Sep 2024 12:09:27 +0900 Subject: [PATCH 01/15] fix: bugfix flush logic Signed-off-by: hlts2 --- pkg/agent/core/ngt/service/ngt.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/agent/core/ngt/service/ngt.go b/pkg/agent/core/ngt/service/ngt.go index 095f0b6f0a..9809e9d6b7 100644 --- a/pkg/agent/core/ngt/service/ngt.go +++ b/pkg/agent/core/ngt/service/ngt.go @@ -1249,7 +1249,7 @@ func (n *ngt) RegenerateIndexes(ctx context.Context) (err error) { runtime.GC() atomic.AddUint64(&n.nogce, 1) - if n.inMem { + if !n.inMem { // delete file err = file.DeleteDir(ctx, n.path) if err != nil { From 833ccf76180585610cf79c721dc0d0ca1608a9bb Mon Sep 17 00:00:00 2001 From: hlts2 Date: Mon, 9 Sep 2024 14:04:30 +0900 Subject: [PATCH 02/15] fix: nil check for flushing Signed-off-by: hlts2 --- pkg/agent/core/ngt/service/ngt.go | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/pkg/agent/core/ngt/service/ngt.go b/pkg/agent/core/ngt/service/ngt.go index 9809e9d6b7..e96f4f5fd8 100644 --- a/pkg/agent/core/ngt/service/ngt.go +++ b/pkg/agent/core/ngt/service/ngt.go @@ -1941,15 +1941,24 @@ func (n *ngt) gc() { } func (n *ngt) Len() uint64 { - return n.kvs.Len() + if n != nil && n.kvs != nil { + return n.kvs.Len() + } + return 0 } func (n *ngt) InsertVQueueBufferLen() uint64 { - return uint64(n.vq.IVQLen()) + if n != nil && n.vq != nil { + return uint64(n.vq.IVQLen()) + } + return 0 } func (n *ngt) DeleteVQueueBufferLen() uint64 { - return uint64(n.vq.DVQLen()) + if n != nil && n.vq != nil { + return uint64(n.vq.DVQLen()) + } + return 0 } func (n *ngt) GetDimensionSize() int { From 50861c7550ea808c5c32945664232ed2420bbe56 Mon Sep 17 00:00:00 2001 From: hlts2 Date: Mon, 9 Sep 2024 14:07:39 +0900 Subject: [PATCH 03/15] fix: add flush check logic Signed-off-by: hlts2 --- pkg/agent/core/ngt/service/ngt.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/agent/core/ngt/service/ngt.go b/pkg/agent/core/ngt/service/ngt.go index e96f4f5fd8..bd932472ef 100644 --- a/pkg/agent/core/ngt/service/ngt.go +++ b/pkg/agent/core/ngt/service/ngt.go @@ -1941,21 +1941,21 @@ func (n *ngt) gc() { } func (n *ngt) Len() uint64 { - if n != nil && n.kvs != nil { + if n != nil && n.kvs != nil && !n.IsFlushing() { return n.kvs.Len() } return 0 } func (n *ngt) InsertVQueueBufferLen() uint64 { - if n != nil && n.vq != nil { + if n != nil && n.kvs != nil && !n.IsFlushing() { return uint64(n.vq.IVQLen()) } return 0 } func (n *ngt) DeleteVQueueBufferLen() uint64 { - if n != nil && n.vq != nil { + if n != nil && n.kvs != nil && !n.IsFlushing() { return uint64(n.vq.DVQLen()) } return 0 From f8b76c6232288983e369c022f6bf7a7cc8c9d1fc Mon Sep 17 00:00:00 2001 From: hlts2 Date: Mon, 9 Sep 2024 14:26:10 +0900 Subject: [PATCH 04/15] fix: nil check bug Signed-off-by: hlts2 --- pkg/agent/core/ngt/service/ngt.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/agent/core/ngt/service/ngt.go b/pkg/agent/core/ngt/service/ngt.go index bd932472ef..ee53a85bb7 100644 --- a/pkg/agent/core/ngt/service/ngt.go +++ b/pkg/agent/core/ngt/service/ngt.go @@ -1948,14 +1948,14 @@ func (n *ngt) Len() uint64 { } func (n *ngt) InsertVQueueBufferLen() uint64 { - if n != nil && n.kvs != nil && !n.IsFlushing() { + if n != nil && n.vq != nil && !n.IsFlushing() { return uint64(n.vq.IVQLen()) } return 0 } func (n *ngt) DeleteVQueueBufferLen() uint64 { - if n != nil && n.kvs != nil && !n.IsFlushing() { + if n != nil && n.vq != nil && !n.IsFlushing() { return uint64(n.vq.DVQLen()) } return 0 From d27ba65cf720930dc6e25d7a19126ecdd22388bd Mon Sep 17 00:00:00 2001 From: hlts2 Date: Mon, 9 Sep 2024 15:23:15 +0900 Subject: [PATCH 05/15] fix: add nil check Signed-off-by: hlts2 --- pkg/agent/core/ngt/service/ngt.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/agent/core/ngt/service/ngt.go b/pkg/agent/core/ngt/service/ngt.go index ee53a85bb7..b65f180b3f 100644 --- a/pkg/agent/core/ngt/service/ngt.go +++ b/pkg/agent/core/ngt/service/ngt.go @@ -908,7 +908,7 @@ func (n *ngt) Start(ctx context.Context) <-chan error { } return ctx.Err() case <-tick.C: - if n.vq.IVQLen() >= n.alen { + if n.vq != nil && !n.IsFlushing() && n.vq.IVQLen() >= n.alen { err = n.CreateIndex(ctx, n.poolSize) } case <-limit.C: From 3433f2858c60af0075eb04f42714360839809ac1 Mon Sep 17 00:00:00 2001 From: hlts2 Date: Mon, 9 Sep 2024 15:40:50 +0900 Subject: [PATCH 06/15] fix: return err when the flush process is executing Signed-off-by: hlts2 --- pkg/agent/core/ngt/service/ngt.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/pkg/agent/core/ngt/service/ngt.go b/pkg/agent/core/ngt/service/ngt.go index b65f180b3f..4c1ce19650 100644 --- a/pkg/agent/core/ngt/service/ngt.go +++ b/pkg/agent/core/ngt/service/ngt.go @@ -908,7 +908,7 @@ func (n *ngt) Start(ctx context.Context) <-chan error { } return ctx.Err() case <-tick.C: - if n.vq != nil && !n.IsFlushing() && n.vq.IVQLen() >= n.alen { + if n != nil && n.vq != nil && !n.IsFlushing() && n.vq.IVQLen() >= n.alen { err = n.CreateIndex(ctx, n.poolSize) } case <-limit.C: @@ -1299,8 +1299,11 @@ func (n *ngt) CreateIndex(ctx context.Context, poolSize uint32) (err error) { } }() - if n.isReadReplica { + switch { + case n.isReadReplica: return errors.ErrWriteOperationToReadReplica + case n.IsFlushing(): + return errors.ErrFlushingIsInProgress } ic := n.vq.IVQLen() + n.vq.DVQLen() From 42d055395ebbca6cebc1872afab485b1283de4ae Mon Sep 17 00:00:00 2001 From: hlts2 Date: Mon, 9 Sep 2024 16:03:39 +0900 Subject: [PATCH 07/15] fix: add error check for flushing Signed-off-by: hlts2 --- pkg/agent/core/ngt/handler/grpc/index.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/agent/core/ngt/handler/grpc/index.go b/pkg/agent/core/ngt/handler/grpc/index.go index 62df71e180..f23b8b69b6 100644 --- a/pkg/agent/core/ngt/handler/grpc/index.go +++ b/pkg/agent/core/ngt/handler/grpc/index.go @@ -64,6 +64,9 @@ func (s *server) CreateIndex( }, }, info.Get())...) code = codes.FailedPrecondition + case errors.Is(err, errors.ErrFlushingIsInProgress): + err = status.WrapWithAborted("CreateIndex API aborted to process create indexes request due to flushing indices is in progress", err, details...) + code = codes.Aborted case errors.Is(err, context.Canceled): err = status.WrapWithCanceled(fmt.Sprintf("CreateIndex API canceled to create indexes pool_size = %d, error: %v", c.GetPoolSize(), err), err, details...) code = codes.Canceled @@ -149,6 +152,9 @@ func (s *server) CreateAndSaveIndex( }, }, info.Get())...) code = codes.FailedPrecondition + case errors.Is(err, errors.ErrFlushingIsInProgress): + err = status.WrapWithAborted("CreateIndex API aborted to process create indexes request due to flushing indices is in progress", err, details...) + code = codes.Aborted case errors.Is(err, context.Canceled): err = status.WrapWithCanceled(fmt.Sprintf("CreateAndSaveIndex API canceled to create indexes pool_size = %d, error: %v", c.GetPoolSize(), err), err, details...) code = codes.Canceled From 050039335032b7fe7de1a1c399606ad3c72cfb1b Mon Sep 17 00:00:00 2001 From: hlts2 Date: Mon, 9 Sep 2024 16:05:50 +0900 Subject: [PATCH 08/15] fix: error message Signed-off-by: hlts2 --- pkg/agent/core/ngt/handler/grpc/index.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/agent/core/ngt/handler/grpc/index.go b/pkg/agent/core/ngt/handler/grpc/index.go index f23b8b69b6..8e7f24d9f6 100644 --- a/pkg/agent/core/ngt/handler/grpc/index.go +++ b/pkg/agent/core/ngt/handler/grpc/index.go @@ -153,7 +153,7 @@ func (s *server) CreateAndSaveIndex( }, info.Get())...) code = codes.FailedPrecondition case errors.Is(err, errors.ErrFlushingIsInProgress): - err = status.WrapWithAborted("CreateIndex API aborted to process create indexes request due to flushing indices is in progress", err, details...) + err = status.WrapWithAborted("CreateAndSaveIndex API aborted to process create indexes request due to flushing indices is in progress", err, details...) code = codes.Aborted case errors.Is(err, context.Canceled): err = status.WrapWithCanceled(fmt.Sprintf("CreateAndSaveIndex API canceled to create indexes pool_size = %d, error: %v", c.GetPoolSize(), err), err, details...) From 9d1f55020474fa4d54a891ff2406c08ea8da8b67 Mon Sep 17 00:00:00 2001 From: hlts2 Date: Mon, 9 Sep 2024 16:44:05 +0900 Subject: [PATCH 09/15] fix: disable kvs and vqueue initialization Signed-off-by: hlts2 --- pkg/agent/core/ngt/service/ngt.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/pkg/agent/core/ngt/service/ngt.go b/pkg/agent/core/ngt/service/ngt.go index 4c1ce19650..a0e5e93dc3 100644 --- a/pkg/agent/core/ngt/service/ngt.go +++ b/pkg/agent/core/ngt/service/ngt.go @@ -1242,8 +1242,8 @@ func (n *ngt) RegenerateIndexes(ctx context.Context) (err error) { if err != nil { log.Errorf("failed to flushing vector to ngt index in delete kvs. error: %v", err) } - n.kvs = nil - n.vq = nil + // n.kvs = nil + // n.vq = nil // gc runtime.GC() @@ -1265,20 +1265,20 @@ func (n *ngt) RegenerateIndexes(ctx context.Context) (err error) { } } - nkvs := kvs.New(kvs.WithConcurrency(n.kvsdbConcurrency)) - - nvq, err := vqueue.New() - if err != nil { - log.Errorf("failed to create new vector vector queue. error: %v", err) - } + // nkvs := kvs.New(kvs.WithConcurrency(n.kvsdbConcurrency)) + // + // nvq, err := vqueue.New() + // if err != nil { + // log.Errorf("failed to create new vector vector queue. error: %v", err) + // } // renew instance nn, err := newNGT(n.cfg, n.opts...) if err != nil { return err } - nn.kvs = nkvs - nn.vq = nvq + // nn.kvs = nkvs + // nn.vq = nvq // Regenerate with flags set nn.flushing.Store(true) From 9588ef336379dad69ad2dbc97866ef8af4a8c898 Mon Sep 17 00:00:00 2001 From: hlts2 Date: Mon, 9 Sep 2024 17:02:23 +0900 Subject: [PATCH 10/15] fix: disable commentout Signed-off-by: hlts2 --- pkg/agent/core/ngt/service/ngt.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/agent/core/ngt/service/ngt.go b/pkg/agent/core/ngt/service/ngt.go index a0e5e93dc3..b3d585261c 100644 --- a/pkg/agent/core/ngt/service/ngt.go +++ b/pkg/agent/core/ngt/service/ngt.go @@ -1242,8 +1242,8 @@ func (n *ngt) RegenerateIndexes(ctx context.Context) (err error) { if err != nil { log.Errorf("failed to flushing vector to ngt index in delete kvs. error: %v", err) } - // n.kvs = nil - // n.vq = nil + n.kvs = nil + n.vq = nil // gc runtime.GC() From c561c89cc5a31451f6d7a425c7ca6fbc10ad0ca3 Mon Sep 17 00:00:00 2001 From: hlts2 Date: Mon, 9 Sep 2024 17:15:57 +0900 Subject: [PATCH 11/15] fix: disable kvs and vq Signed-off-by: hlts2 --- pkg/agent/core/ngt/service/ngt.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/agent/core/ngt/service/ngt.go b/pkg/agent/core/ngt/service/ngt.go index b3d585261c..a0e5e93dc3 100644 --- a/pkg/agent/core/ngt/service/ngt.go +++ b/pkg/agent/core/ngt/service/ngt.go @@ -1242,8 +1242,8 @@ func (n *ngt) RegenerateIndexes(ctx context.Context) (err error) { if err != nil { log.Errorf("failed to flushing vector to ngt index in delete kvs. error: %v", err) } - n.kvs = nil - n.vq = nil + // n.kvs = nil + // n.vq = nil // gc runtime.GC() From b3151dc58e6c762b115bf72b03066f8c2905abb8 Mon Sep 17 00:00:00 2001 From: hlts2 Date: Mon, 9 Sep 2024 17:26:28 +0900 Subject: [PATCH 12/15] fix: nil set to kvs and vq Signed-off-by: hlts2 --- pkg/agent/core/ngt/service/ngt.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/agent/core/ngt/service/ngt.go b/pkg/agent/core/ngt/service/ngt.go index a0e5e93dc3..b3d585261c 100644 --- a/pkg/agent/core/ngt/service/ngt.go +++ b/pkg/agent/core/ngt/service/ngt.go @@ -1242,8 +1242,8 @@ func (n *ngt) RegenerateIndexes(ctx context.Context) (err error) { if err != nil { log.Errorf("failed to flushing vector to ngt index in delete kvs. error: %v", err) } - // n.kvs = nil - // n.vq = nil + n.kvs = nil + n.vq = nil // gc runtime.GC() From d4c79e259a184217eead1fd3faabeb3dd3b486b6 Mon Sep 17 00:00:00 2001 From: hlts2 Date: Tue, 10 Sep 2024 15:23:30 +0900 Subject: [PATCH 13/15] fix: copy ngt service object for flushing Signed-off-by: hlts2 --- pkg/agent/core/ngt/service/ngt.go | 49 +++++++++++++++++-------------- 1 file changed, 27 insertions(+), 22 deletions(-) diff --git a/pkg/agent/core/ngt/service/ngt.go b/pkg/agent/core/ngt/service/ngt.go index b3d585261c..4803f0db26 100644 --- a/pkg/agent/core/ngt/service/ngt.go +++ b/pkg/agent/core/ngt/service/ngt.go @@ -260,6 +260,26 @@ func newNGT(cfg *config.NGT, opts ...Option) (n *ngt, err error) { return n, nil } +func (n *ngt) copyNGT(obj *ngt) { + // instances + n.core = obj.core + n.kvs = obj.kvs + n.fmap = obj.fmap + n.vq = obj.vq + + // counters + n.wfci = obj.wfci + n.nobic = obj.nobic + n.nopvq = atomic.Uint64{} + + // paths + n.path = obj.path + n.tmpPath = obj.tmpPath + n.oldPath = obj.oldPath + n.basePath = obj.basePath + n.brokenPath = obj.brokenPath +} + // migrate migrates the index directory from old to new under the input path if necessary. // Migration happens when the path is not empty and there is no `path/origin` directory, // which indicates that the user has NOT been using CoW mode and the index directory is not migrated yet. @@ -1242,8 +1262,6 @@ func (n *ngt) RegenerateIndexes(ctx context.Context) (err error) { if err != nil { log.Errorf("failed to flushing vector to ngt index in delete kvs. error: %v", err) } - n.kvs = nil - n.vq = nil // gc runtime.GC() @@ -1265,30 +1283,14 @@ func (n *ngt) RegenerateIndexes(ctx context.Context) (err error) { } } - // nkvs := kvs.New(kvs.WithConcurrency(n.kvsdbConcurrency)) - // - // nvq, err := vqueue.New() - // if err != nil { - // log.Errorf("failed to create new vector vector queue. error: %v", err) - // } - // renew instance nn, err := newNGT(n.cfg, n.opts...) if err != nil { return err } - // nn.kvs = nkvs - // nn.vq = nvq - - // Regenerate with flags set - nn.flushing.Store(true) - nn.indexing.Store(true) - defer nn.flushing.Store(false) - defer nn.indexing.Store(false) + n.copyNGT(nn) - n = nn - - return nil + return n.loadStatistics() } func (n *ngt) CreateIndex(ctx context.Context, poolSize uint32) (err error) { @@ -1431,6 +1433,10 @@ func (n *ngt) CreateIndex(ctx context.Context, poolSize uint32) (err error) { return err } } + return n.loadStatistics() +} + +func (n *ngt) loadStatistics() error { if n.IsStatisticsEnabled() { log.Info("loading index statistics to cache") stats, err := n.core.GetGraphStatistics(core.AdditionalStatistics) @@ -1474,8 +1480,7 @@ func (n *ngt) CreateIndex(ctx context.Context, poolSize uint32) (err error) { IndegreeHistogram: stats.IndegreeHistogram, }) } - - return err + return nil } func (n *ngt) removeInvalidIndex(ctx context.Context) { From 35060540205166ea2466144c01e79abfc3f36806 Mon Sep 17 00:00:00 2001 From: hlts2 Date: Tue, 10 Sep 2024 15:25:44 +0900 Subject: [PATCH 14/15] fix: deleted unnecessary nil check Signed-off-by: hlts2 --- pkg/agent/core/ngt/service/ngt.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/agent/core/ngt/service/ngt.go b/pkg/agent/core/ngt/service/ngt.go index 4803f0db26..33c0bfa46e 100644 --- a/pkg/agent/core/ngt/service/ngt.go +++ b/pkg/agent/core/ngt/service/ngt.go @@ -928,7 +928,7 @@ func (n *ngt) Start(ctx context.Context) <-chan error { } return ctx.Err() case <-tick.C: - if n != nil && n.vq != nil && !n.IsFlushing() && n.vq.IVQLen() >= n.alen { + if n.vq != nil && !n.IsFlushing() && n.vq.IVQLen() >= n.alen { err = n.CreateIndex(ctx, n.poolSize) } case <-limit.C: @@ -1949,21 +1949,21 @@ func (n *ngt) gc() { } func (n *ngt) Len() uint64 { - if n != nil && n.kvs != nil && !n.IsFlushing() { + if n.kvs != nil && !n.IsFlushing() { return n.kvs.Len() } return 0 } func (n *ngt) InsertVQueueBufferLen() uint64 { - if n != nil && n.vq != nil && !n.IsFlushing() { + if n.vq != nil && !n.IsFlushing() { return uint64(n.vq.IVQLen()) } return 0 } func (n *ngt) DeleteVQueueBufferLen() uint64 { - if n != nil && n.vq != nil && !n.IsFlushing() { + if n.vq != nil && !n.IsFlushing() { return uint64(n.vq.DVQLen()) } return 0 From b768b5d5a343d5be4bc1e9c1cefc9b9bbcca8bae Mon Sep 17 00:00:00 2001 From: hlts2 Date: Tue, 10 Sep 2024 16:40:55 +0900 Subject: [PATCH 15/15] fix: variable name Signed-off-by: hlts2 --- pkg/agent/core/ngt/service/ngt.go | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/pkg/agent/core/ngt/service/ngt.go b/pkg/agent/core/ngt/service/ngt.go index 33c0bfa46e..09bf8577f5 100644 --- a/pkg/agent/core/ngt/service/ngt.go +++ b/pkg/agent/core/ngt/service/ngt.go @@ -260,24 +260,24 @@ func newNGT(cfg *config.NGT, opts ...Option) (n *ngt, err error) { return n, nil } -func (n *ngt) copyNGT(obj *ngt) { +func (n *ngt) copyNGT(src *ngt) { // instances - n.core = obj.core - n.kvs = obj.kvs - n.fmap = obj.fmap - n.vq = obj.vq + n.core = src.core + n.kvs = src.kvs + n.fmap = src.fmap + n.vq = src.vq // counters - n.wfci = obj.wfci - n.nobic = obj.nobic + n.wfci = src.wfci + n.nobic = src.nobic n.nopvq = atomic.Uint64{} // paths - n.path = obj.path - n.tmpPath = obj.tmpPath - n.oldPath = obj.oldPath - n.basePath = obj.basePath - n.brokenPath = obj.brokenPath + n.path = src.path + n.tmpPath = src.tmpPath + n.oldPath = src.oldPath + n.basePath = src.basePath + n.brokenPath = src.brokenPath } // migrate migrates the index directory from old to new under the input path if necessary.