From c049ea2355d63844792e60f4f37ecc035b191fdd Mon Sep 17 00:00:00 2001 From: kpango Date: Tue, 20 Feb 2024 18:29:55 +0900 Subject: [PATCH] resolve kvs already closed before last saving Signed-off-by: kpango --- internal/core/algorithm/ngt/ngt.go | 2 -- pkg/agent/core/ngt/service/ngt.go | 14 +++++++++++-- pkg/agent/internal/kvs/kvs.go | 32 +++++++++++++----------------- 3 files changed, 26 insertions(+), 22 deletions(-) diff --git a/internal/core/algorithm/ngt/ngt.go b/internal/core/algorithm/ngt/ngt.go index 51454a3c544..0dc22156513 100644 --- a/internal/core/algorithm/ngt/ngt.go +++ b/internal/core/algorithm/ngt/ngt.go @@ -710,9 +710,7 @@ func (n *ngt) Remove(id uint) error { return n.newGoError(ne) } n.PutErrorBuffer(ne) - n.cnt.Add(^uint64(0)) - return nil } diff --git a/pkg/agent/core/ngt/service/ngt.go b/pkg/agent/core/ngt/service/ngt.go index 4429d4ab7bc..a4e30bf30e9 100644 --- a/pkg/agent/core/ngt/service/ngt.go +++ b/pkg/agent/core/ngt/service/ngt.go @@ -1741,8 +1741,18 @@ func (n *ngt) GetDimensionSize() int { func (n *ngt) Close(ctx context.Context) (err error) { defer n.core.Close() - - err = n.kvs.Close() + defer func() { + kerr := n.kvs.Close() + if kerr != nil && + !errors.Is(err, context.Canceled) && + !errors.Is(err, context.DeadlineExceeded) { + if err != nil { + err = errors.Join(kerr, err) + } else { + err = kerr + } + } + }() if len(n.path) != 0 { if n.isReadReplica { log.Info("skip create and save index operation on close because this is read replica") diff --git a/pkg/agent/internal/kvs/kvs.go b/pkg/agent/internal/kvs/kvs.go index f5dedb13dc5..8c886a2c425 100644 --- a/pkg/agent/internal/kvs/kvs.go +++ b/pkg/agent/internal/kvs/kvs.go @@ -20,6 +20,8 @@ import ( "context" "sync/atomic" + "github.com/vdaas/vald/internal/errors" + "github.com/vdaas/vald/internal/log" "github.com/vdaas/vald/internal/safety" "github.com/vdaas/vald/internal/sync" "github.com/vdaas/vald/internal/sync/errgroup" @@ -53,7 +55,6 @@ type bidi struct { l uint64 ou [slen]*sync.Map[uint32, valueStructOu] uo [slen]*sync.Map[string, ValueStructUo] - eg errgroup.Group } const ( @@ -79,14 +80,6 @@ func New(opts ...Option) BidiMap { b.uo[i] = new(sync.Map[string, ValueStructUo]) } - if b.eg == nil { - b.eg, _ = errgroup.New(context.Background()) - } - - if b.concurrency > 0 { - b.eg.SetLimit(b.concurrency) - } - return b } @@ -151,24 +144,30 @@ func (b *bidi) DeleteInverse(val uint32) (key string, ok bool) { // Range retrieves all set keys and values and calls the callback function f. func (b *bidi) Range(ctx context.Context, f func(string, uint32, int64) bool) { - var wg sync.WaitGroup + eg, ctx = errgroup.New(ctx) + if b.concurrency > 0 { + eg.SetLimit(b.concurrency) + } for i := range b.uo { idx := i - wg.Add(1) - b.eg.Go(safety.RecoverFunc(func() (err error) { + eg.Go(safety.RecoverFunc(func() (err error) { b.uo[idx].Range(func(uuid string, val ValueStructUo) bool { select { case <-ctx.Done(): + err = ctx.Err() return false default: return f(uuid, val.value, val.timestamp) } }) - wg.Done() + if err != nil && + (errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)) { + return err + } return nil })) } - wg.Wait() + eg.Wait() } // Len returns the length of the cache that is set in the bidi. @@ -180,10 +179,7 @@ func (b *bidi) Len() uint64 { } func (b *bidi) Close() error { - if b == nil { - return nil - } - return b.eg.Wait() + return nil } func getShardID(key string) (id uint64) {