diff --git a/pkg/agent/core/ngt/handler/grpc/index.go b/pkg/agent/core/ngt/handler/grpc/index.go index 62df71e1805..8e7f24d9f64 100644 --- a/pkg/agent/core/ngt/handler/grpc/index.go +++ b/pkg/agent/core/ngt/handler/grpc/index.go @@ -64,6 +64,9 @@ func (s *server) CreateIndex( }, }, info.Get())...) code = codes.FailedPrecondition + case errors.Is(err, errors.ErrFlushingIsInProgress): + err = status.WrapWithAborted("CreateIndex API aborted to process create indexes request due to flushing indices is in progress", err, details...) + code = codes.Aborted case errors.Is(err, context.Canceled): err = status.WrapWithCanceled(fmt.Sprintf("CreateIndex API canceled to create indexes pool_size = %d, error: %v", c.GetPoolSize(), err), err, details...) code = codes.Canceled @@ -149,6 +152,9 @@ func (s *server) CreateAndSaveIndex( }, }, info.Get())...) code = codes.FailedPrecondition + case errors.Is(err, errors.ErrFlushingIsInProgress): + err = status.WrapWithAborted("CreateAndSaveIndex API aborted to process create indexes request due to flushing indices is in progress", err, details...) + code = codes.Aborted case errors.Is(err, context.Canceled): err = status.WrapWithCanceled(fmt.Sprintf("CreateAndSaveIndex API canceled to create indexes pool_size = %d, error: %v", c.GetPoolSize(), err), err, details...) code = codes.Canceled diff --git a/pkg/agent/core/ngt/service/ngt.go b/pkg/agent/core/ngt/service/ngt.go index b6cc7205ccb..8a16fe978fd 100644 --- a/pkg/agent/core/ngt/service/ngt.go +++ b/pkg/agent/core/ngt/service/ngt.go @@ -264,6 +264,26 @@ func newNGT(cfg *config.NGT, opts ...Option) (n *ngt, err error) { return n, nil } +func (n *ngt) copyNGT(src *ngt) { + // instances + n.core = src.core + n.kvs = src.kvs + n.fmap = src.fmap + n.vq = src.vq + + // counters + n.wfci = src.wfci + n.nobic = src.nobic + n.nopvq = atomic.Uint64{} + + // paths + n.path = src.path + n.tmpPath = src.tmpPath + n.oldPath = src.oldPath + n.basePath = src.basePath + n.brokenPath = src.brokenPath +} + // migrate migrates the index directory from old to new under the input path if necessary. // Migration happens when the path is not empty and there is no `path/origin` directory, // which indicates that the user has NOT been using CoW mode and the index directory is not migrated yet. @@ -912,7 +932,7 @@ func (n *ngt) Start(ctx context.Context) <-chan error { } return ctx.Err() case <-tick.C: - if n.vq.IVQLen() >= n.alen { + if n.vq != nil && !n.IsFlushing() && n.vq.IVQLen() >= n.alen { err = n.CreateIndex(ctx, n.poolSize) } case <-limit.C: @@ -1258,14 +1278,12 @@ func (n *ngt) RegenerateIndexes(ctx context.Context) (err error) { if err != nil { log.Errorf("failed to flushing vector to ngt index in delete kvs. error: %v", err) } - n.kvs = nil - n.vq = nil // gc runtime.GC() atomic.AddUint64(&n.nogce, 1) - if n.inMem { + if !n.inMem { // delete file err = file.DeleteDir(ctx, n.path) if err != nil { @@ -1281,30 +1299,14 @@ func (n *ngt) RegenerateIndexes(ctx context.Context) (err error) { } } - nkvs := kvs.New(kvs.WithConcurrency(n.kvsdbConcurrency)) - - nvq, err := vqueue.New() - if err != nil { - log.Errorf("failed to create new vector vector queue. error: %v", err) - } - // renew instance nn, err := newNGT(n.cfg, n.opts...) if err != nil { return err } - nn.kvs = nkvs - nn.vq = nvq - - // Regenerate with flags set - nn.flushing.Store(true) - nn.indexing.Store(true) - defer nn.flushing.Store(false) - defer nn.indexing.Store(false) + n.copyNGT(nn) - n = nn - - return nil + return n.loadStatistics() } func (n *ngt) CreateIndex(ctx context.Context, poolSize uint32) (err error) { @@ -1315,8 +1317,11 @@ func (n *ngt) CreateIndex(ctx context.Context, poolSize uint32) (err error) { } }() - if n.isReadReplica { + switch { + case n.isReadReplica: return errors.ErrWriteOperationToReadReplica + case n.IsFlushing(): + return errors.ErrFlushingIsInProgress } ic := n.vq.IVQLen() + n.vq.DVQLen() @@ -1445,6 +1450,10 @@ func (n *ngt) CreateIndex(ctx context.Context, poolSize uint32) (err error) { return err } } + return n.loadStatistics() +} + +func (n *ngt) loadStatistics() error { if n.IsStatisticsEnabled() { log.Info("loading index statistics to cache") stats, err := n.core.GetGraphStatistics(core.AdditionalStatistics) @@ -1488,8 +1497,7 @@ func (n *ngt) CreateIndex(ctx context.Context, poolSize uint32) (err error) { IndegreeHistogram: stats.IndegreeHistogram, }) } - - return err + return nil } func (n *ngt) removeInvalidIndex(ctx context.Context) { @@ -1922,15 +1930,24 @@ func (n *ngt) gc() { } func (n *ngt) Len() uint64 { - return n.kvs.Len() + if n.kvs != nil && !n.IsFlushing() { + return n.kvs.Len() + } + return 0 } func (n *ngt) InsertVQueueBufferLen() uint64 { - return uint64(n.vq.IVQLen()) + if n.vq != nil && !n.IsFlushing() { + return uint64(n.vq.IVQLen()) + } + return 0 } func (n *ngt) DeleteVQueueBufferLen() uint64 { - return uint64(n.vq.DVQLen()) + if n.vq != nil && !n.IsFlushing() { + return uint64(n.vq.DVQLen()) + } + return 0 } func (n *ngt) GetDimensionSize() int {