diff --git a/go.mod b/go.mod index 2c6aefe78c4..22b17160b39 100644 --- a/go.mod +++ b/go.mod @@ -44,13 +44,13 @@ replace ( github.com/akrylysov/pogreb => github.com/akrylysov/pogreb v0.10.2 github.com/antihax/optional => github.com/antihax/optional v1.0.0 github.com/armon/go-socks5 => github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 - github.com/aws/aws-sdk-go => github.com/aws/aws-sdk-go v1.50.20 + github.com/aws/aws-sdk-go => github.com/aws/aws-sdk-go v1.50.21 github.com/aws/aws-sdk-go-v2 => github.com/aws/aws-sdk-go-v2 v1.25.0 github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream => github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.0 github.com/aws/aws-sdk-go-v2/config => github.com/aws/aws-sdk-go-v2/config v1.27.0 github.com/aws/aws-sdk-go-v2/credentials => github.com/aws/aws-sdk-go-v2/credentials v1.17.0 github.com/aws/aws-sdk-go-v2/feature/ec2/imds => github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.15.0 - github.com/aws/aws-sdk-go-v2/feature/s3/manager => github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.1 + github.com/aws/aws-sdk-go-v2/feature/s3/manager => github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.2 github.com/aws/aws-sdk-go-v2/internal/configsources => github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.0 github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 => github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.0 github.com/aws/aws-sdk-go-v2/internal/ini => github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 @@ -59,7 +59,7 @@ replace ( github.com/aws/aws-sdk-go-v2/service/internal/presigned-url => github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.0 github.com/aws/aws-sdk-go-v2/service/internal/s3shared => github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.0 github.com/aws/aws-sdk-go-v2/service/kms => github.com/aws/aws-sdk-go-v2/service/kms v1.28.1 - github.com/aws/aws-sdk-go-v2/service/s3 => github.com/aws/aws-sdk-go-v2/service/s3 v1.50.0 + github.com/aws/aws-sdk-go-v2/service/s3 => github.com/aws/aws-sdk-go-v2/service/s3 v1.50.1 github.com/aws/aws-sdk-go-v2/service/secretsmanager => github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.27.1 github.com/aws/aws-sdk-go-v2/service/sns => github.com/aws/aws-sdk-go-v2/service/sns v1.28.0 github.com/aws/aws-sdk-go-v2/service/sqs => github.com/aws/aws-sdk-go-v2/service/sqs v1.30.1 diff --git a/go.sum b/go.sum index cfa743f25ef..859bb1728a4 100644 --- a/go.sum +++ b/go.sum @@ -185,8 +185,8 @@ github.com/apache/arrow/go/v14 v14.0.2/go.mod h1:u3fgh3EdgN/YQ8cVQRguVW3R+seMybF github.com/apache/thrift v0.17.0/go.mod h1:OLxhMRJxomX+1I/KUw03qoV3mMz16BwaKI+d4fPBx7Q= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= -github.com/aws/aws-sdk-go v1.50.20 h1:xfAnSDVf/azIWTVQXQODp89bubvCS85r70O3nuQ4dnE= -github.com/aws/aws-sdk-go v1.50.20/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk= +github.com/aws/aws-sdk-go v1.50.21 h1:W8awpwiInOt4qHQE6JghRYQJhHcf/cDJS3mlZYqioSQ= +github.com/aws/aws-sdk-go v1.50.21/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk= github.com/aws/aws-sdk-go-v2 v1.25.0 h1:sv7+1JVJxOu/dD/sz/csHX7jFqmP001TIY7aytBWDSQ= github.com/aws/aws-sdk-go-v2 v1.25.0/go.mod h1:G104G1Aho5WqF+SR3mDIobTABQzpYV0WxMsKxlMggOA= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.0 h1:2UO6/nT1lCZq1LqM67Oa4tdgP1CvL1sLSxvuD+VrOeE= @@ -197,8 +197,8 @@ github.com/aws/aws-sdk-go-v2/credentials v1.17.0 h1:lMW2x6sKBsiAJrpi1doOXqWFyEPo github.com/aws/aws-sdk-go-v2/credentials v1.17.0/go.mod h1:uT41FIH8cCIxOdUYIL0PYyHlL1NoneDuDSCwg5VE/5o= github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.15.0 h1:xWCwjjvVz2ojYTP4kBKUuUh9ZrXfcAXpflhOUUeXg1k= github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.15.0/go.mod h1:j3fACuqXg4oMTQOR2yY7m0NmJY0yBK4L4sLsRXq1Ins= -github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.1 h1:FqtJUSBgT2yfZ8kZhTi9AO131qMLOzb4MiH4riAM8XM= -github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.1/go.mod h1:G3V4qNUPMHKrXW/l149QXmHjf1vlMWBO4UuGPCK4a/c= +github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.2 h1:VEekE/fJWqAWYozxFQ07B+h8NdvTPAYhV13xIBenuO0= +github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.2/go.mod h1:8vozqAHmDNmoD4YbuDKIfpnLbByzngczL4My1RELLVo= github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.0 h1:NPs/EqVO+ajwOoq56EfcGKa3L3ruWuazkIw1BqxwOPw= github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.0/go.mod h1:D+duLy2ylgatV+yTlQ8JTuLfDD0BnFvnQRc+o6tbZ4M= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.0 h1:ks7KGMVUMoDzcxNWUlEdI+/lokMFD136EL6DWmUOV80= @@ -215,8 +215,8 @@ github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.0 h1:SHN/umDLT github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.0/go.mod h1:l8gPU5RYGOFHJqWEpPMoRTP0VoaWQSkJdKo+hwWnnDA= github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.0 h1:l5puwOHr7IxECuPMIuZG7UKOzAnF24v6t4l+Z5Moay4= github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.0/go.mod h1:Oov79flWa/n7Ni+lQC3z+VM7PoRM47omRqbJU9B5Y7E= -github.com/aws/aws-sdk-go-v2/service/s3 v1.50.0 h1:jZAdMD1ioZdqirzzVVRhpHHWJmcGGCn8JqDYBs5nmYA= -github.com/aws/aws-sdk-go-v2/service/s3 v1.50.0/go.mod h1:1o/W6JFUuREj2ExoQ21vHJgO7wakvjhol91M9eknFgs= +github.com/aws/aws-sdk-go-v2/service/s3 v1.50.1 h1:bjpWJEXch7moIt3PX2r5XpGROsletl7enqG1Q3Te1Dc= +github.com/aws/aws-sdk-go-v2/service/s3 v1.50.1/go.mod h1:1o/W6JFUuREj2ExoQ21vHJgO7wakvjhol91M9eknFgs= github.com/aws/aws-sdk-go-v2/service/sso v1.19.0 h1:u6OkVDxtBPnxPkZ9/63ynEe+8kHbtS5IfaC4PzVxzWM= github.com/aws/aws-sdk-go-v2/service/sso v1.19.0/go.mod h1:YqbU3RS/pkDVu+v+Nwxvn0i1WB0HkNWEePWbmODEbbs= github.com/aws/aws-sdk-go-v2/service/ssooidc v1.21.5 h1:2k9KmFawS63euAkY4/ixVNsYYwrwnd5fIvgEKkfZFNM= diff --git a/internal/core/algorithm/ngt/ngt.go b/internal/core/algorithm/ngt/ngt.go index 51454a3c544..0dc22156513 100644 --- a/internal/core/algorithm/ngt/ngt.go +++ b/internal/core/algorithm/ngt/ngt.go @@ -710,9 +710,7 @@ func (n *ngt) Remove(id uint) error { return n.newGoError(ne) } n.PutErrorBuffer(ne) - n.cnt.Add(^uint64(0)) - return nil } diff --git a/pkg/agent/core/ngt/service/ngt.go b/pkg/agent/core/ngt/service/ngt.go index 4429d4ab7bc..ddb05b288b1 100644 --- a/pkg/agent/core/ngt/service/ngt.go +++ b/pkg/agent/core/ngt/service/ngt.go @@ -1697,8 +1697,11 @@ func (n *ngt) IsIndexing() bool { func (n *ngt) UUIDs(ctx context.Context) (uuids []string) { uuids = make([]string, 0, n.kvs.Len()) + var mu sync.Mutex n.kvs.Range(ctx, func(uuid string, oid uint32, _ int64) bool { + mu.Lock() uuids = append(uuids, uuid) + mu.Unlock() return true }) return uuids @@ -1741,8 +1744,18 @@ func (n *ngt) GetDimensionSize() int { func (n *ngt) Close(ctx context.Context) (err error) { defer n.core.Close() - - err = n.kvs.Close() + defer func() { + kerr := n.kvs.Close() + if kerr != nil && + !errors.Is(err, context.Canceled) && + !errors.Is(err, context.DeadlineExceeded) { + if err != nil { + err = errors.Join(kerr, err) + } else { + err = kerr + } + } + }() if len(n.path) != 0 { if n.isReadReplica { log.Info("skip create and save index operation on close because this is read replica") diff --git a/pkg/agent/internal/kvs/kvs.go b/pkg/agent/internal/kvs/kvs.go index f5dedb13dc5..c17923d32d4 100644 --- a/pkg/agent/internal/kvs/kvs.go +++ b/pkg/agent/internal/kvs/kvs.go @@ -20,6 +20,7 @@ import ( "context" "sync/atomic" + "github.com/vdaas/vald/internal/errors" "github.com/vdaas/vald/internal/safety" "github.com/vdaas/vald/internal/sync" "github.com/vdaas/vald/internal/sync/errgroup" @@ -53,7 +54,6 @@ type bidi struct { l uint64 ou [slen]*sync.Map[uint32, valueStructOu] uo [slen]*sync.Map[string, ValueStructUo] - eg errgroup.Group } const ( @@ -79,14 +79,6 @@ func New(opts ...Option) BidiMap { b.uo[i] = new(sync.Map[string, ValueStructUo]) } - if b.eg == nil { - b.eg, _ = errgroup.New(context.Background()) - } - - if b.concurrency > 0 { - b.eg.SetLimit(b.concurrency) - } - return b } @@ -151,24 +143,33 @@ func (b *bidi) DeleteInverse(val uint32) (key string, ok bool) { // Range retrieves all set keys and values and calls the callback function f. func (b *bidi) Range(ctx context.Context, f func(string, uint32, int64) bool) { - var wg sync.WaitGroup + eg, ctx = errgroup.New(ctx) + if b.concurrency > 0 { + eg.SetLimit(b.concurrency) + } for i := range b.uo { idx := i - wg.Add(1) - b.eg.Go(safety.RecoverFunc(func() (err error) { + eg.Go(safety.RecoverFunc(func() (err error) { b.uo[idx].Range(func(uuid string, val ValueStructUo) bool { select { case <-ctx.Done(): + err = ctx.Err() return false default: return f(uuid, val.value, val.timestamp) } }) - wg.Done() + if err != nil && + (errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)) { + return err + } return nil })) } - wg.Wait() + err := eg.Wait() + if err != nil { + log.Error(err) + } } // Len returns the length of the cache that is set in the bidi. @@ -180,10 +181,7 @@ func (b *bidi) Len() uint64 { } func (b *bidi) Close() error { - if b == nil { - return nil - } - return b.eg.Wait() + return nil } func getShardID(key string) (id uint64) { diff --git a/pkg/agent/internal/kvs/kvs_test.go b/pkg/agent/internal/kvs/kvs_test.go index 0849e921ea4..0b6be3ebe8e 100644 --- a/pkg/agent/internal/kvs/kvs_test.go +++ b/pkg/agent/internal/kvs/kvs_test.go @@ -66,7 +66,6 @@ func TestNew(t *testing.T) { l: 0, ou: wantOu, uo: wantUo, - eg: errgroup.Get(), }, }, } @@ -1676,12 +1675,10 @@ func Test_bidi_Range(t *testing.T) { defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - eg, egctx := errgroup.New(ctx) b := &bidi{ ou: test.fields.ou, uo: test.fields.uo, l: test.fields.l, - eg: eg, } if test.beforeFunc != nil { test.beforeFunc(tt, test.args, b) @@ -1694,7 +1691,7 @@ func Test_bidi_Range(t *testing.T) { checkFunc = defaultCheckFunc } - b.Range(egctx, test.args.f) + b.Range(ctx, test.args.f) if err := checkFunc(test.want, b); err != nil { tt.Errorf("error = %v", err) } @@ -1791,7 +1788,6 @@ func Test_bidi_Len(t *testing.T) { // l uint64 // ou [slen]*sync.Map[uint32, valueStructOu] // uo [slen]*sync.Map[string, ValueStructUo] -// eg errgroup.Group // } // type want struct { // err error @@ -1820,7 +1816,6 @@ func Test_bidi_Len(t *testing.T) { // l:0, // ou:nil, // uo:nil, -// eg:nil, // }, // want: want{}, // checkFunc: defaultCheckFunc, @@ -1843,7 +1838,6 @@ func Test_bidi_Len(t *testing.T) { // l:0, // ou:nil, // uo:nil, -// eg:nil, // }, // want: want{}, // checkFunc: defaultCheckFunc, @@ -1878,7 +1872,6 @@ func Test_bidi_Len(t *testing.T) { // l: test.fields.l, // ou: test.fields.ou, // uo: test.fields.uo, -// eg: test.fields.eg, // } // // err := b.Close() diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 3a16f74f084..b961e6a9fe6 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -23,9 +23,9 @@ version = "0.1.0" [[package]] name = "anyhow" -version = "1.0.79" +version = "1.0.80" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "080e9890a082662b09c1ad45f567faeeb47f22b5fb23895fbe1e651e718e25ca" +checksum = "5ad32ce52e4161730f7098c077cd2ed6229b5804ccf99e5366be1ab72a98b4e1" [[package]] name = "async-stream" @@ -577,18 +577,18 @@ checksum = "7ffc183a10b4478d04cbbbfc96d0873219d962dd5accaff2ffbd4ceb7df837f4" [[package]] name = "serde" -version = "1.0.196" +version = "1.0.197" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "870026e60fa08c69f064aa766c10f10b1d62db9ccd4d0abb206472bee0ce3b32" +checksum = "3fb1c873e1b9b056a4dc4c0c198b24c3ffa059243875552b2bd0933b1aee4ce2" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.196" +version = "1.0.197" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33c85360c95e7d137454dc81d9a4ed2b8efd8fbe19cee57357b32b9771fccb67" +checksum = "7eb0b34b42edc17f6b7cac84a52a1c5f0e1bb2227e997ca9011ea3dd34e8610b" dependencies = [ "proc-macro2", "quote", @@ -616,9 +616,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.49" +version = "2.0.50" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "915aea9e586f80826ee59f8453c1101f9d1c4b3964cd2460185ee8e299ada496" +checksum = "74f1bdc9872430ce9b75da68329d1c1746faf50ffac5f19e02b71e37ff881ffb" dependencies = [ "proc-macro2", "quote", diff --git a/versions/PROMETHEUS_STACK_VERSION b/versions/PROMETHEUS_STACK_VERSION index 13f01c3f710..ed50120be4f 100644 --- a/versions/PROMETHEUS_STACK_VERSION +++ b/versions/PROMETHEUS_STACK_VERSION @@ -1 +1 @@ -56.7.0 +56.8.0 diff --git a/versions/VALDCLI_VERSION b/versions/VALDCLI_VERSION index e20d08587bd..992322844af 100644 --- a/versions/VALDCLI_VERSION +++ b/versions/VALDCLI_VERSION @@ -1 +1 @@ -v1.7.10.Rev1 +v1.7.11