From e734ac3c4fc0681c3117386d3b4955ef667bb615 Mon Sep 17 00:00:00 2001 From: kpango Date: Tue, 20 Feb 2024 19:35:14 +0900 Subject: [PATCH] resolve kvs already closed before last saving Signed-off-by: kpango --- Makefile | 3 + go.mod | 6 +- go.sum | 12 ++-- internal/core/algorithm/ngt/ngt.go | 2 - pkg/agent/core/ngt/service/ngt.go | 17 +++++- pkg/agent/internal/kvs/kvs.go | 35 ++++++----- pkg/agent/internal/kvs/kvs_test.go | 9 +-- pkg/agent/internal/kvs/option.go | 11 ---- pkg/agent/internal/kvs/option_test.go | 86 --------------------------- rust/Cargo.lock | 16 ++--- versions/PROMETHEUS_STACK_VERSION | 2 +- versions/VALDCLI_VERSION | 2 +- 12 files changed, 55 insertions(+), 146 deletions(-) diff --git a/Makefile b/Makefile index fd999e9086..1b6821d242 100644 --- a/Makefile +++ b/Makefile @@ -369,6 +369,7 @@ clean-generated: mv $(ROOTDIR)/apis/grpc/v1/vald/vald.go $(TEMP_DIR)/vald.go mv $(ROOTDIR)/apis/grpc/v1/agent/core/agent.go $(TEMP_DIR)/agent.go mv $(ROOTDIR)/apis/grpc/v1/payload/interface.go $(TEMP_DIR)/interface.go + mv $(ROOTDIR)/apis/grpc/v1/mirror/mirror.go $(TEMP_DIR)/mirror.go rm -rf \ $(ROOTDIR)/*.log \ $(ROOTDIR)/*.svg \ @@ -384,6 +385,8 @@ clean-generated: mv $(TEMP_DIR)/agent.go $(ROOTDIR)/apis/grpc/v1/agent/core/agent.go mkdir -p $(ROOTDIR)/apis/grpc/v1/payload mv $(TEMP_DIR)/interface.go $(ROOTDIR)/apis/grpc/v1/payload/interface.go + mkdir -p $(ROOTDIR)/apis/grpc/v1/mirror + mv $(TEMP_DIR)/mirror.go $(ROOTDIR)/apis/grpc/v1/mirror/mirror.go .PHONY: license ## add license to files diff --git a/go.mod b/go.mod index 2c6aefe78c..22b17160b3 100644 --- a/go.mod +++ b/go.mod @@ -44,13 +44,13 @@ replace ( github.com/akrylysov/pogreb => github.com/akrylysov/pogreb v0.10.2 github.com/antihax/optional => github.com/antihax/optional v1.0.0 github.com/armon/go-socks5 => github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 - github.com/aws/aws-sdk-go => github.com/aws/aws-sdk-go v1.50.20 + github.com/aws/aws-sdk-go => github.com/aws/aws-sdk-go v1.50.21 github.com/aws/aws-sdk-go-v2 => github.com/aws/aws-sdk-go-v2 v1.25.0 github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream => github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.0 github.com/aws/aws-sdk-go-v2/config => github.com/aws/aws-sdk-go-v2/config v1.27.0 github.com/aws/aws-sdk-go-v2/credentials => github.com/aws/aws-sdk-go-v2/credentials v1.17.0 github.com/aws/aws-sdk-go-v2/feature/ec2/imds => github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.15.0 - github.com/aws/aws-sdk-go-v2/feature/s3/manager => github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.1 + github.com/aws/aws-sdk-go-v2/feature/s3/manager => github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.2 github.com/aws/aws-sdk-go-v2/internal/configsources => github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.0 github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 => github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.0 github.com/aws/aws-sdk-go-v2/internal/ini => github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 @@ -59,7 +59,7 @@ replace ( github.com/aws/aws-sdk-go-v2/service/internal/presigned-url => github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.0 github.com/aws/aws-sdk-go-v2/service/internal/s3shared => github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.0 github.com/aws/aws-sdk-go-v2/service/kms => github.com/aws/aws-sdk-go-v2/service/kms v1.28.1 - github.com/aws/aws-sdk-go-v2/service/s3 => github.com/aws/aws-sdk-go-v2/service/s3 v1.50.0 + github.com/aws/aws-sdk-go-v2/service/s3 => github.com/aws/aws-sdk-go-v2/service/s3 v1.50.1 github.com/aws/aws-sdk-go-v2/service/secretsmanager => github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.27.1 github.com/aws/aws-sdk-go-v2/service/sns => github.com/aws/aws-sdk-go-v2/service/sns v1.28.0 github.com/aws/aws-sdk-go-v2/service/sqs => github.com/aws/aws-sdk-go-v2/service/sqs v1.30.1 diff --git a/go.sum b/go.sum index cfa743f25e..859bb1728a 100644 --- a/go.sum +++ b/go.sum @@ -185,8 +185,8 @@ github.com/apache/arrow/go/v14 v14.0.2/go.mod h1:u3fgh3EdgN/YQ8cVQRguVW3R+seMybF github.com/apache/thrift v0.17.0/go.mod h1:OLxhMRJxomX+1I/KUw03qoV3mMz16BwaKI+d4fPBx7Q= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= -github.com/aws/aws-sdk-go v1.50.20 h1:xfAnSDVf/azIWTVQXQODp89bubvCS85r70O3nuQ4dnE= -github.com/aws/aws-sdk-go v1.50.20/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk= +github.com/aws/aws-sdk-go v1.50.21 h1:W8awpwiInOt4qHQE6JghRYQJhHcf/cDJS3mlZYqioSQ= +github.com/aws/aws-sdk-go v1.50.21/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk= github.com/aws/aws-sdk-go-v2 v1.25.0 h1:sv7+1JVJxOu/dD/sz/csHX7jFqmP001TIY7aytBWDSQ= github.com/aws/aws-sdk-go-v2 v1.25.0/go.mod h1:G104G1Aho5WqF+SR3mDIobTABQzpYV0WxMsKxlMggOA= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.0 h1:2UO6/nT1lCZq1LqM67Oa4tdgP1CvL1sLSxvuD+VrOeE= @@ -197,8 +197,8 @@ github.com/aws/aws-sdk-go-v2/credentials v1.17.0 h1:lMW2x6sKBsiAJrpi1doOXqWFyEPo github.com/aws/aws-sdk-go-v2/credentials v1.17.0/go.mod h1:uT41FIH8cCIxOdUYIL0PYyHlL1NoneDuDSCwg5VE/5o= github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.15.0 h1:xWCwjjvVz2ojYTP4kBKUuUh9ZrXfcAXpflhOUUeXg1k= github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.15.0/go.mod h1:j3fACuqXg4oMTQOR2yY7m0NmJY0yBK4L4sLsRXq1Ins= -github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.1 h1:FqtJUSBgT2yfZ8kZhTi9AO131qMLOzb4MiH4riAM8XM= -github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.1/go.mod h1:G3V4qNUPMHKrXW/l149QXmHjf1vlMWBO4UuGPCK4a/c= +github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.2 h1:VEekE/fJWqAWYozxFQ07B+h8NdvTPAYhV13xIBenuO0= +github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.2/go.mod h1:8vozqAHmDNmoD4YbuDKIfpnLbByzngczL4My1RELLVo= github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.0 h1:NPs/EqVO+ajwOoq56EfcGKa3L3ruWuazkIw1BqxwOPw= github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.0/go.mod h1:D+duLy2ylgatV+yTlQ8JTuLfDD0BnFvnQRc+o6tbZ4M= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.0 h1:ks7KGMVUMoDzcxNWUlEdI+/lokMFD136EL6DWmUOV80= @@ -215,8 +215,8 @@ github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.0 h1:SHN/umDLT github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.0/go.mod h1:l8gPU5RYGOFHJqWEpPMoRTP0VoaWQSkJdKo+hwWnnDA= github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.0 h1:l5puwOHr7IxECuPMIuZG7UKOzAnF24v6t4l+Z5Moay4= github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.0/go.mod h1:Oov79flWa/n7Ni+lQC3z+VM7PoRM47omRqbJU9B5Y7E= -github.com/aws/aws-sdk-go-v2/service/s3 v1.50.0 h1:jZAdMD1ioZdqirzzVVRhpHHWJmcGGCn8JqDYBs5nmYA= -github.com/aws/aws-sdk-go-v2/service/s3 v1.50.0/go.mod h1:1o/W6JFUuREj2ExoQ21vHJgO7wakvjhol91M9eknFgs= +github.com/aws/aws-sdk-go-v2/service/s3 v1.50.1 h1:bjpWJEXch7moIt3PX2r5XpGROsletl7enqG1Q3Te1Dc= +github.com/aws/aws-sdk-go-v2/service/s3 v1.50.1/go.mod h1:1o/W6JFUuREj2ExoQ21vHJgO7wakvjhol91M9eknFgs= github.com/aws/aws-sdk-go-v2/service/sso v1.19.0 h1:u6OkVDxtBPnxPkZ9/63ynEe+8kHbtS5IfaC4PzVxzWM= github.com/aws/aws-sdk-go-v2/service/sso v1.19.0/go.mod h1:YqbU3RS/pkDVu+v+Nwxvn0i1WB0HkNWEePWbmODEbbs= github.com/aws/aws-sdk-go-v2/service/ssooidc v1.21.5 h1:2k9KmFawS63euAkY4/ixVNsYYwrwnd5fIvgEKkfZFNM= diff --git a/internal/core/algorithm/ngt/ngt.go b/internal/core/algorithm/ngt/ngt.go index 51454a3c54..0dc2215651 100644 --- a/internal/core/algorithm/ngt/ngt.go +++ b/internal/core/algorithm/ngt/ngt.go @@ -710,9 +710,7 @@ func (n *ngt) Remove(id uint) error { return n.newGoError(ne) } n.PutErrorBuffer(ne) - n.cnt.Add(^uint64(0)) - return nil } diff --git a/pkg/agent/core/ngt/service/ngt.go b/pkg/agent/core/ngt/service/ngt.go index 4429d4ab7b..ddb05b288b 100644 --- a/pkg/agent/core/ngt/service/ngt.go +++ b/pkg/agent/core/ngt/service/ngt.go @@ -1697,8 +1697,11 @@ func (n *ngt) IsIndexing() bool { func (n *ngt) UUIDs(ctx context.Context) (uuids []string) { uuids = make([]string, 0, n.kvs.Len()) + var mu sync.Mutex n.kvs.Range(ctx, func(uuid string, oid uint32, _ int64) bool { + mu.Lock() uuids = append(uuids, uuid) + mu.Unlock() return true }) return uuids @@ -1741,8 +1744,18 @@ func (n *ngt) GetDimensionSize() int { func (n *ngt) Close(ctx context.Context) (err error) { defer n.core.Close() - - err = n.kvs.Close() + defer func() { + kerr := n.kvs.Close() + if kerr != nil && + !errors.Is(err, context.Canceled) && + !errors.Is(err, context.DeadlineExceeded) { + if err != nil { + err = errors.Join(kerr, err) + } else { + err = kerr + } + } + }() if len(n.path) != 0 { if n.isReadReplica { log.Info("skip create and save index operation on close because this is read replica") diff --git a/pkg/agent/internal/kvs/kvs.go b/pkg/agent/internal/kvs/kvs.go index f5dedb13dc..f0c58612b9 100644 --- a/pkg/agent/internal/kvs/kvs.go +++ b/pkg/agent/internal/kvs/kvs.go @@ -20,6 +20,8 @@ import ( "context" "sync/atomic" + "github.com/vdaas/vald/internal/errors" + "github.com/vdaas/vald/internal/log" "github.com/vdaas/vald/internal/safety" "github.com/vdaas/vald/internal/sync" "github.com/vdaas/vald/internal/sync/errgroup" @@ -53,7 +55,6 @@ type bidi struct { l uint64 ou [slen]*sync.Map[uint32, valueStructOu] uo [slen]*sync.Map[string, ValueStructUo] - eg errgroup.Group } const ( @@ -79,14 +80,6 @@ func New(opts ...Option) BidiMap { b.uo[i] = new(sync.Map[string, ValueStructUo]) } - if b.eg == nil { - b.eg, _ = errgroup.New(context.Background()) - } - - if b.concurrency > 0 { - b.eg.SetLimit(b.concurrency) - } - return b } @@ -151,24 +144,33 @@ func (b *bidi) DeleteInverse(val uint32) (key string, ok bool) { // Range retrieves all set keys and values and calls the callback function f. func (b *bidi) Range(ctx context.Context, f func(string, uint32, int64) bool) { - var wg sync.WaitGroup + eg, ctx := errgroup.New(ctx) + if b.concurrency > 0 { + eg.SetLimit(b.concurrency) + } for i := range b.uo { idx := i - wg.Add(1) - b.eg.Go(safety.RecoverFunc(func() (err error) { + eg.Go(safety.RecoverFunc(func() (err error) { b.uo[idx].Range(func(uuid string, val ValueStructUo) bool { select { case <-ctx.Done(): + err = ctx.Err() return false default: return f(uuid, val.value, val.timestamp) } }) - wg.Done() + if err != nil && + (errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)) { + return err + } return nil })) } - wg.Wait() + err := eg.Wait() + if err != nil { + log.Error(err) + } } // Len returns the length of the cache that is set in the bidi. @@ -180,10 +182,7 @@ func (b *bidi) Len() uint64 { } func (b *bidi) Close() error { - if b == nil { - return nil - } - return b.eg.Wait() + return nil } func getShardID(key string) (id uint64) { diff --git a/pkg/agent/internal/kvs/kvs_test.go b/pkg/agent/internal/kvs/kvs_test.go index 0849e921ea..0b6be3ebe8 100644 --- a/pkg/agent/internal/kvs/kvs_test.go +++ b/pkg/agent/internal/kvs/kvs_test.go @@ -66,7 +66,6 @@ func TestNew(t *testing.T) { l: 0, ou: wantOu, uo: wantUo, - eg: errgroup.Get(), }, }, } @@ -1676,12 +1675,10 @@ func Test_bidi_Range(t *testing.T) { defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - eg, egctx := errgroup.New(ctx) b := &bidi{ ou: test.fields.ou, uo: test.fields.uo, l: test.fields.l, - eg: eg, } if test.beforeFunc != nil { test.beforeFunc(tt, test.args, b) @@ -1694,7 +1691,7 @@ func Test_bidi_Range(t *testing.T) { checkFunc = defaultCheckFunc } - b.Range(egctx, test.args.f) + b.Range(ctx, test.args.f) if err := checkFunc(test.want, b); err != nil { tt.Errorf("error = %v", err) } @@ -1791,7 +1788,6 @@ func Test_bidi_Len(t *testing.T) { // l uint64 // ou [slen]*sync.Map[uint32, valueStructOu] // uo [slen]*sync.Map[string, ValueStructUo] -// eg errgroup.Group // } // type want struct { // err error @@ -1820,7 +1816,6 @@ func Test_bidi_Len(t *testing.T) { // l:0, // ou:nil, // uo:nil, -// eg:nil, // }, // want: want{}, // checkFunc: defaultCheckFunc, @@ -1843,7 +1838,6 @@ func Test_bidi_Len(t *testing.T) { // l:0, // ou:nil, // uo:nil, -// eg:nil, // }, // want: want{}, // checkFunc: defaultCheckFunc, @@ -1878,7 +1872,6 @@ func Test_bidi_Len(t *testing.T) { // l: test.fields.l, // ou: test.fields.ou, // uo: test.fields.uo, -// eg: test.fields.eg, // } // // err := b.Close() diff --git a/pkg/agent/internal/kvs/option.go b/pkg/agent/internal/kvs/option.go index 0f00ae1786..06198c69dd 100644 --- a/pkg/agent/internal/kvs/option.go +++ b/pkg/agent/internal/kvs/option.go @@ -18,8 +18,6 @@ package kvs import ( "runtime" - - "github.com/vdaas/vald/internal/sync/errgroup" ) // Option represents the functional option for bidi. @@ -29,15 +27,6 @@ var defaultOptions = []Option{ WithConcurrency(runtime.GOMAXPROCS(-1) * 10), } -// WithErrGroup returns the option to set the errgroup. -func WithErrGroup(eg errgroup.Group) Option { - return func(b *bidi) { - if eg != nil { - b.eg = eg - } - } -} - // WithConcurrency returns the option to set the concurrency. func WithConcurrency(c int) Option { return func(b *bidi) { diff --git a/pkg/agent/internal/kvs/option_test.go b/pkg/agent/internal/kvs/option_test.go index e57f3508ee..864e36a907 100644 --- a/pkg/agent/internal/kvs/option_test.go +++ b/pkg/agent/internal/kvs/option_test.go @@ -18,92 +18,6 @@ package kvs // NOT IMPLEMENTED BELOW // -// func TestWithErrGroup(t *testing.T) { -// type args struct { -// eg errgroup.Group -// } -// type want struct { -// want Option -// } -// type test struct { -// name string -// args args -// want want -// checkFunc func(want, Option) error -// beforeFunc func(*testing.T, args) -// afterFunc func(*testing.T, args) -// } -// defaultCheckFunc := func(w want, got Option) error { -// if !reflect.DeepEqual(got, w.want) { -// return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", got, w.want) -// } -// return nil -// } -// tests := []test{ -// // TODO test cases -// /* -// { -// name: "test_case_1", -// args: args { -// eg:nil, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// }, -// */ -// -// // TODO test cases -// /* -// func() test { -// return test { -// name: "test_case_2", -// args: args { -// eg:nil, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// } -// }(), -// */ -// } -// -// for _, tc := range tests { -// test := tc -// t.Run(test.name, func(tt *testing.T) { -// tt.Parallel() -// defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) -// if test.beforeFunc != nil { -// test.beforeFunc(tt, test.args) -// } -// if test.afterFunc != nil { -// defer test.afterFunc(tt, test.args) -// } -// checkFunc := test.checkFunc -// if test.checkFunc == nil { -// checkFunc = defaultCheckFunc -// } -// -// got := WithErrGroup(test.args.eg) -// if err := checkFunc(test.want, got); err != nil { -// tt.Errorf("error = %v", err) -// } -// -// }) -// } -// } -// // func TestWithConcurrency(t *testing.T) { // type args struct { // c int diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 3a16f74f08..b961e6a9fe 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -23,9 +23,9 @@ version = "0.1.0" [[package]] name = "anyhow" -version = "1.0.79" +version = "1.0.80" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "080e9890a082662b09c1ad45f567faeeb47f22b5fb23895fbe1e651e718e25ca" +checksum = "5ad32ce52e4161730f7098c077cd2ed6229b5804ccf99e5366be1ab72a98b4e1" [[package]] name = "async-stream" @@ -577,18 +577,18 @@ checksum = "7ffc183a10b4478d04cbbbfc96d0873219d962dd5accaff2ffbd4ceb7df837f4" [[package]] name = "serde" -version = "1.0.196" +version = "1.0.197" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "870026e60fa08c69f064aa766c10f10b1d62db9ccd4d0abb206472bee0ce3b32" +checksum = "3fb1c873e1b9b056a4dc4c0c198b24c3ffa059243875552b2bd0933b1aee4ce2" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.196" +version = "1.0.197" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33c85360c95e7d137454dc81d9a4ed2b8efd8fbe19cee57357b32b9771fccb67" +checksum = "7eb0b34b42edc17f6b7cac84a52a1c5f0e1bb2227e997ca9011ea3dd34e8610b" dependencies = [ "proc-macro2", "quote", @@ -616,9 +616,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.49" +version = "2.0.50" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "915aea9e586f80826ee59f8453c1101f9d1c4b3964cd2460185ee8e299ada496" +checksum = "74f1bdc9872430ce9b75da68329d1c1746faf50ffac5f19e02b71e37ff881ffb" dependencies = [ "proc-macro2", "quote", diff --git a/versions/PROMETHEUS_STACK_VERSION b/versions/PROMETHEUS_STACK_VERSION index 13f01c3f71..ed50120be4 100644 --- a/versions/PROMETHEUS_STACK_VERSION +++ b/versions/PROMETHEUS_STACK_VERSION @@ -1 +1 @@ -56.7.0 +56.8.0 diff --git a/versions/VALDCLI_VERSION b/versions/VALDCLI_VERSION index e20d08587b..992322844a 100644 --- a/versions/VALDCLI_VERSION +++ b/versions/VALDCLI_VERSION @@ -1 +1 @@ -v1.7.10.Rev1 +v1.7.11