Skip to content

Commit

Permalink
resolve kvs already closed before last saving
Browse files Browse the repository at this point in the history
Signed-off-by: kpango <[email protected]>
  • Loading branch information
kpango committed Feb 20, 2024
1 parent f080436 commit eb0bd29
Show file tree
Hide file tree
Showing 9 changed files with 51 additions and 49 deletions.
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,13 @@ replace (
github.com/akrylysov/pogreb => github.com/akrylysov/pogreb v0.10.2
github.com/antihax/optional => github.com/antihax/optional v1.0.0
github.com/armon/go-socks5 => github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5
github.com/aws/aws-sdk-go => github.com/aws/aws-sdk-go v1.50.20
github.com/aws/aws-sdk-go => github.com/aws/aws-sdk-go v1.50.21
github.com/aws/aws-sdk-go-v2 => github.com/aws/aws-sdk-go-v2 v1.25.0
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream => github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.0
github.com/aws/aws-sdk-go-v2/config => github.com/aws/aws-sdk-go-v2/config v1.27.0
github.com/aws/aws-sdk-go-v2/credentials => github.com/aws/aws-sdk-go-v2/credentials v1.17.0
github.com/aws/aws-sdk-go-v2/feature/ec2/imds => github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.15.0
github.com/aws/aws-sdk-go-v2/feature/s3/manager => github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.1
github.com/aws/aws-sdk-go-v2/feature/s3/manager => github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.2
github.com/aws/aws-sdk-go-v2/internal/configsources => github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.0
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 => github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.0
github.com/aws/aws-sdk-go-v2/internal/ini => github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0
Expand All @@ -59,7 +59,7 @@ replace (
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url => github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.0
github.com/aws/aws-sdk-go-v2/service/internal/s3shared => github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.0
github.com/aws/aws-sdk-go-v2/service/kms => github.com/aws/aws-sdk-go-v2/service/kms v1.28.1
github.com/aws/aws-sdk-go-v2/service/s3 => github.com/aws/aws-sdk-go-v2/service/s3 v1.50.0
github.com/aws/aws-sdk-go-v2/service/s3 => github.com/aws/aws-sdk-go-v2/service/s3 v1.50.1
github.com/aws/aws-sdk-go-v2/service/secretsmanager => github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.27.1
github.com/aws/aws-sdk-go-v2/service/sns => github.com/aws/aws-sdk-go-v2/service/sns v1.28.0
github.com/aws/aws-sdk-go-v2/service/sqs => github.com/aws/aws-sdk-go-v2/service/sqs v1.30.1
Expand Down
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,8 @@ github.com/apache/arrow/go/v14 v14.0.2/go.mod h1:u3fgh3EdgN/YQ8cVQRguVW3R+seMybF
github.com/apache/thrift v0.17.0/go.mod h1:OLxhMRJxomX+1I/KUw03qoV3mMz16BwaKI+d4fPBx7Q=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
github.com/aws/aws-sdk-go v1.50.20 h1:xfAnSDVf/azIWTVQXQODp89bubvCS85r70O3nuQ4dnE=
github.com/aws/aws-sdk-go v1.50.20/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk=
github.com/aws/aws-sdk-go v1.50.21 h1:W8awpwiInOt4qHQE6JghRYQJhHcf/cDJS3mlZYqioSQ=
github.com/aws/aws-sdk-go v1.50.21/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk=
github.com/aws/aws-sdk-go-v2 v1.25.0 h1:sv7+1JVJxOu/dD/sz/csHX7jFqmP001TIY7aytBWDSQ=
github.com/aws/aws-sdk-go-v2 v1.25.0/go.mod h1:G104G1Aho5WqF+SR3mDIobTABQzpYV0WxMsKxlMggOA=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.0 h1:2UO6/nT1lCZq1LqM67Oa4tdgP1CvL1sLSxvuD+VrOeE=
Expand All @@ -197,8 +197,8 @@ github.com/aws/aws-sdk-go-v2/credentials v1.17.0 h1:lMW2x6sKBsiAJrpi1doOXqWFyEPo
github.com/aws/aws-sdk-go-v2/credentials v1.17.0/go.mod h1:uT41FIH8cCIxOdUYIL0PYyHlL1NoneDuDSCwg5VE/5o=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.15.0 h1:xWCwjjvVz2ojYTP4kBKUuUh9ZrXfcAXpflhOUUeXg1k=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.15.0/go.mod h1:j3fACuqXg4oMTQOR2yY7m0NmJY0yBK4L4sLsRXq1Ins=
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.1 h1:FqtJUSBgT2yfZ8kZhTi9AO131qMLOzb4MiH4riAM8XM=
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.1/go.mod h1:G3V4qNUPMHKrXW/l149QXmHjf1vlMWBO4UuGPCK4a/c=
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.2 h1:VEekE/fJWqAWYozxFQ07B+h8NdvTPAYhV13xIBenuO0=
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.2/go.mod h1:8vozqAHmDNmoD4YbuDKIfpnLbByzngczL4My1RELLVo=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.0 h1:NPs/EqVO+ajwOoq56EfcGKa3L3ruWuazkIw1BqxwOPw=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.0/go.mod h1:D+duLy2ylgatV+yTlQ8JTuLfDD0BnFvnQRc+o6tbZ4M=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.0 h1:ks7KGMVUMoDzcxNWUlEdI+/lokMFD136EL6DWmUOV80=
Expand All @@ -215,8 +215,8 @@ github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.0 h1:SHN/umDLT
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.0/go.mod h1:l8gPU5RYGOFHJqWEpPMoRTP0VoaWQSkJdKo+hwWnnDA=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.0 h1:l5puwOHr7IxECuPMIuZG7UKOzAnF24v6t4l+Z5Moay4=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.0/go.mod h1:Oov79flWa/n7Ni+lQC3z+VM7PoRM47omRqbJU9B5Y7E=
github.com/aws/aws-sdk-go-v2/service/s3 v1.50.0 h1:jZAdMD1ioZdqirzzVVRhpHHWJmcGGCn8JqDYBs5nmYA=
github.com/aws/aws-sdk-go-v2/service/s3 v1.50.0/go.mod h1:1o/W6JFUuREj2ExoQ21vHJgO7wakvjhol91M9eknFgs=
github.com/aws/aws-sdk-go-v2/service/s3 v1.50.1 h1:bjpWJEXch7moIt3PX2r5XpGROsletl7enqG1Q3Te1Dc=
github.com/aws/aws-sdk-go-v2/service/s3 v1.50.1/go.mod h1:1o/W6JFUuREj2ExoQ21vHJgO7wakvjhol91M9eknFgs=
github.com/aws/aws-sdk-go-v2/service/sso v1.19.0 h1:u6OkVDxtBPnxPkZ9/63ynEe+8kHbtS5IfaC4PzVxzWM=
github.com/aws/aws-sdk-go-v2/service/sso v1.19.0/go.mod h1:YqbU3RS/pkDVu+v+Nwxvn0i1WB0HkNWEePWbmODEbbs=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.21.5 h1:2k9KmFawS63euAkY4/ixVNsYYwrwnd5fIvgEKkfZFNM=
Expand Down
2 changes: 0 additions & 2 deletions internal/core/algorithm/ngt/ngt.go
Original file line number Diff line number Diff line change
Expand Up @@ -710,9 +710,7 @@ func (n *ngt) Remove(id uint) error {
return n.newGoError(ne)
}
n.PutErrorBuffer(ne)

n.cnt.Add(^uint64(0))

return nil
}

Expand Down
17 changes: 15 additions & 2 deletions pkg/agent/core/ngt/service/ngt.go
Original file line number Diff line number Diff line change
Expand Up @@ -1697,8 +1697,11 @@ func (n *ngt) IsIndexing() bool {

func (n *ngt) UUIDs(ctx context.Context) (uuids []string) {
uuids = make([]string, 0, n.kvs.Len())
var mu sync.Mutex
n.kvs.Range(ctx, func(uuid string, oid uint32, _ int64) bool {
mu.Lock()
uuids = append(uuids, uuid)
mu.Unlock()
return true
})
return uuids
Expand Down Expand Up @@ -1741,8 +1744,18 @@ func (n *ngt) GetDimensionSize() int {

func (n *ngt) Close(ctx context.Context) (err error) {
defer n.core.Close()

err = n.kvs.Close()
defer func() {
kerr := n.kvs.Close()
if kerr != nil &&
!errors.Is(err, context.Canceled) &&
!errors.Is(err, context.DeadlineExceeded) {
if err != nil {
err = errors.Join(kerr, err)
} else {
err = kerr
}
}
}()
if len(n.path) != 0 {
if n.isReadReplica {
log.Info("skip create and save index operation on close because this is read replica")
Expand Down
34 changes: 16 additions & 18 deletions pkg/agent/internal/kvs/kvs.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"sync/atomic"

"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/safety"
"github.com/vdaas/vald/internal/sync"
"github.com/vdaas/vald/internal/sync/errgroup"
Expand Down Expand Up @@ -53,7 +54,6 @@ type bidi struct {
l uint64
ou [slen]*sync.Map[uint32, valueStructOu]
uo [slen]*sync.Map[string, ValueStructUo]
eg errgroup.Group
}

const (
Expand All @@ -79,14 +79,6 @@ func New(opts ...Option) BidiMap {
b.uo[i] = new(sync.Map[string, ValueStructUo])
}

if b.eg == nil {
b.eg, _ = errgroup.New(context.Background())
}

if b.concurrency > 0 {
b.eg.SetLimit(b.concurrency)
}

return b
}

Expand Down Expand Up @@ -151,24 +143,33 @@ func (b *bidi) DeleteInverse(val uint32) (key string, ok bool) {

// Range retrieves all set keys and values and calls the callback function f.
func (b *bidi) Range(ctx context.Context, f func(string, uint32, int64) bool) {
var wg sync.WaitGroup
eg, ctx = errgroup.New(ctx)

Check failure on line 146 in pkg/agent/internal/kvs/kvs.go

View workflow job for this annotation

GitHub Actions / grpc-sequential

undefined: eg

Check failure on line 146 in pkg/agent/internal/kvs/kvs.go

View workflow job for this annotation

GitHub Actions / grpc-stream

undefined: eg
if b.concurrency > 0 {
eg.SetLimit(b.concurrency)

Check failure on line 148 in pkg/agent/internal/kvs/kvs.go

View workflow job for this annotation

GitHub Actions / grpc-sequential

undefined: eg

Check failure on line 148 in pkg/agent/internal/kvs/kvs.go

View workflow job for this annotation

GitHub Actions / grpc-stream

undefined: eg
}
for i := range b.uo {
idx := i
wg.Add(1)
b.eg.Go(safety.RecoverFunc(func() (err error) {
eg.Go(safety.RecoverFunc(func() (err error) {

Check failure on line 152 in pkg/agent/internal/kvs/kvs.go

View workflow job for this annotation

GitHub Actions / grpc-sequential

undefined: eg

Check failure on line 152 in pkg/agent/internal/kvs/kvs.go

View workflow job for this annotation

GitHub Actions / grpc-stream

undefined: eg
b.uo[idx].Range(func(uuid string, val ValueStructUo) bool {
select {
case <-ctx.Done():
err = ctx.Err()
return false
default:
return f(uuid, val.value, val.timestamp)
}
})
wg.Done()
if err != nil &&
(errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)) {
return err
}
return nil
}))
}
wg.Wait()
err := eg.Wait()

Check failure on line 169 in pkg/agent/internal/kvs/kvs.go

View workflow job for this annotation

GitHub Actions / grpc-sequential

undefined: eg

Check failure on line 169 in pkg/agent/internal/kvs/kvs.go

View workflow job for this annotation

GitHub Actions / grpc-stream

undefined: eg
if err != nil {
log.Error(err)

Check failure on line 171 in pkg/agent/internal/kvs/kvs.go

View workflow job for this annotation

GitHub Actions / grpc-sequential

undefined: log

Check failure on line 171 in pkg/agent/internal/kvs/kvs.go

View workflow job for this annotation

GitHub Actions / grpc-stream

undefined: log
}
}

// Len returns the length of the cache that is set in the bidi.
Expand All @@ -180,10 +181,7 @@ func (b *bidi) Len() uint64 {
}

func (b *bidi) Close() error {
if b == nil {
return nil
}
return b.eg.Wait()
return nil
}

func getShardID(key string) (id uint64) {
Expand Down
9 changes: 1 addition & 8 deletions pkg/agent/internal/kvs/kvs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ func TestNew(t *testing.T) {
l: 0,
ou: wantOu,
uo: wantUo,
eg: errgroup.Get(),
},
},
}
Expand Down Expand Up @@ -1676,12 +1675,10 @@ func Test_bidi_Range(t *testing.T) {
defer goleak.VerifyNone(tt, goleak.IgnoreCurrent())
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
eg, egctx := errgroup.New(ctx)
b := &bidi{
ou: test.fields.ou,
uo: test.fields.uo,
l: test.fields.l,
eg: eg,
}
if test.beforeFunc != nil {
test.beforeFunc(tt, test.args, b)
Expand All @@ -1694,7 +1691,7 @@ func Test_bidi_Range(t *testing.T) {
checkFunc = defaultCheckFunc
}

b.Range(egctx, test.args.f)
b.Range(ctx, test.args.f)
if err := checkFunc(test.want, b); err != nil {
tt.Errorf("error = %v", err)
}
Expand Down Expand Up @@ -1791,7 +1788,6 @@ func Test_bidi_Len(t *testing.T) {
// l uint64
// ou [slen]*sync.Map[uint32, valueStructOu]
// uo [slen]*sync.Map[string, ValueStructUo]
// eg errgroup.Group
// }
// type want struct {
// err error
Expand Down Expand Up @@ -1820,7 +1816,6 @@ func Test_bidi_Len(t *testing.T) {
// l:0,
// ou:nil,
// uo:nil,
// eg:nil,
// },
// want: want{},
// checkFunc: defaultCheckFunc,
Expand All @@ -1843,7 +1838,6 @@ func Test_bidi_Len(t *testing.T) {
// l:0,
// ou:nil,
// uo:nil,
// eg:nil,
// },
// want: want{},
// checkFunc: defaultCheckFunc,
Expand Down Expand Up @@ -1878,7 +1872,6 @@ func Test_bidi_Len(t *testing.T) {
// l: test.fields.l,
// ou: test.fields.ou,
// uo: test.fields.uo,
// eg: test.fields.eg,
// }
//
// err := b.Close()
Expand Down
16 changes: 8 additions & 8 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion versions/PROMETHEUS_STACK_VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
56.7.0
56.8.0
2 changes: 1 addition & 1 deletion versions/VALDCLI_VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
v1.7.10.Rev1
v1.7.11

0 comments on commit eb0bd29

Please sign in to comment.