Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[bugfix] resolve agent kvsdb already closed before last saving #2390

Merged
merged 1 commit into from
Feb 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,7 @@ clean-generated:
mv $(ROOTDIR)/apis/grpc/v1/vald/vald.go $(TEMP_DIR)/vald.go
mv $(ROOTDIR)/apis/grpc/v1/agent/core/agent.go $(TEMP_DIR)/agent.go
mv $(ROOTDIR)/apis/grpc/v1/payload/interface.go $(TEMP_DIR)/interface.go
mv $(ROOTDIR)/apis/grpc/v1/mirror/mirror.go $(TEMP_DIR)/mirror.go
rm -rf \
$(ROOTDIR)/*.log \
$(ROOTDIR)/*.svg \
Expand All @@ -384,6 +385,8 @@ clean-generated:
mv $(TEMP_DIR)/agent.go $(ROOTDIR)/apis/grpc/v1/agent/core/agent.go
mkdir -p $(ROOTDIR)/apis/grpc/v1/payload
mv $(TEMP_DIR)/interface.go $(ROOTDIR)/apis/grpc/v1/payload/interface.go
mkdir -p $(ROOTDIR)/apis/grpc/v1/mirror
mv $(TEMP_DIR)/mirror.go $(ROOTDIR)/apis/grpc/v1/mirror/mirror.go

.PHONY: license
## add license to files
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,13 @@ replace (
github.com/akrylysov/pogreb => github.com/akrylysov/pogreb v0.10.2
github.com/antihax/optional => github.com/antihax/optional v1.0.0
github.com/armon/go-socks5 => github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5
github.com/aws/aws-sdk-go => github.com/aws/aws-sdk-go v1.50.20
github.com/aws/aws-sdk-go => github.com/aws/aws-sdk-go v1.50.21
github.com/aws/aws-sdk-go-v2 => github.com/aws/aws-sdk-go-v2 v1.25.0
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream => github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.0
github.com/aws/aws-sdk-go-v2/config => github.com/aws/aws-sdk-go-v2/config v1.27.0
github.com/aws/aws-sdk-go-v2/credentials => github.com/aws/aws-sdk-go-v2/credentials v1.17.0
github.com/aws/aws-sdk-go-v2/feature/ec2/imds => github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.15.0
github.com/aws/aws-sdk-go-v2/feature/s3/manager => github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.1
github.com/aws/aws-sdk-go-v2/feature/s3/manager => github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.2
github.com/aws/aws-sdk-go-v2/internal/configsources => github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.0
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 => github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.0
github.com/aws/aws-sdk-go-v2/internal/ini => github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0
Expand All @@ -59,7 +59,7 @@ replace (
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url => github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.0
github.com/aws/aws-sdk-go-v2/service/internal/s3shared => github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.0
github.com/aws/aws-sdk-go-v2/service/kms => github.com/aws/aws-sdk-go-v2/service/kms v1.28.1
github.com/aws/aws-sdk-go-v2/service/s3 => github.com/aws/aws-sdk-go-v2/service/s3 v1.50.0
github.com/aws/aws-sdk-go-v2/service/s3 => github.com/aws/aws-sdk-go-v2/service/s3 v1.50.1
github.com/aws/aws-sdk-go-v2/service/secretsmanager => github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.27.1
github.com/aws/aws-sdk-go-v2/service/sns => github.com/aws/aws-sdk-go-v2/service/sns v1.28.0
github.com/aws/aws-sdk-go-v2/service/sqs => github.com/aws/aws-sdk-go-v2/service/sqs v1.30.1
Expand Down
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,8 @@ github.com/apache/arrow/go/v14 v14.0.2/go.mod h1:u3fgh3EdgN/YQ8cVQRguVW3R+seMybF
github.com/apache/thrift v0.17.0/go.mod h1:OLxhMRJxomX+1I/KUw03qoV3mMz16BwaKI+d4fPBx7Q=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
github.com/aws/aws-sdk-go v1.50.20 h1:xfAnSDVf/azIWTVQXQODp89bubvCS85r70O3nuQ4dnE=
github.com/aws/aws-sdk-go v1.50.20/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk=
github.com/aws/aws-sdk-go v1.50.21 h1:W8awpwiInOt4qHQE6JghRYQJhHcf/cDJS3mlZYqioSQ=
github.com/aws/aws-sdk-go v1.50.21/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk=
github.com/aws/aws-sdk-go-v2 v1.25.0 h1:sv7+1JVJxOu/dD/sz/csHX7jFqmP001TIY7aytBWDSQ=
github.com/aws/aws-sdk-go-v2 v1.25.0/go.mod h1:G104G1Aho5WqF+SR3mDIobTABQzpYV0WxMsKxlMggOA=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.0 h1:2UO6/nT1lCZq1LqM67Oa4tdgP1CvL1sLSxvuD+VrOeE=
Expand All @@ -197,8 +197,8 @@ github.com/aws/aws-sdk-go-v2/credentials v1.17.0 h1:lMW2x6sKBsiAJrpi1doOXqWFyEPo
github.com/aws/aws-sdk-go-v2/credentials v1.17.0/go.mod h1:uT41FIH8cCIxOdUYIL0PYyHlL1NoneDuDSCwg5VE/5o=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.15.0 h1:xWCwjjvVz2ojYTP4kBKUuUh9ZrXfcAXpflhOUUeXg1k=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.15.0/go.mod h1:j3fACuqXg4oMTQOR2yY7m0NmJY0yBK4L4sLsRXq1Ins=
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.1 h1:FqtJUSBgT2yfZ8kZhTi9AO131qMLOzb4MiH4riAM8XM=
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.1/go.mod h1:G3V4qNUPMHKrXW/l149QXmHjf1vlMWBO4UuGPCK4a/c=
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.2 h1:VEekE/fJWqAWYozxFQ07B+h8NdvTPAYhV13xIBenuO0=
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.2/go.mod h1:8vozqAHmDNmoD4YbuDKIfpnLbByzngczL4My1RELLVo=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.0 h1:NPs/EqVO+ajwOoq56EfcGKa3L3ruWuazkIw1BqxwOPw=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.0/go.mod h1:D+duLy2ylgatV+yTlQ8JTuLfDD0BnFvnQRc+o6tbZ4M=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.0 h1:ks7KGMVUMoDzcxNWUlEdI+/lokMFD136EL6DWmUOV80=
Expand All @@ -215,8 +215,8 @@ github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.0 h1:SHN/umDLT
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.0/go.mod h1:l8gPU5RYGOFHJqWEpPMoRTP0VoaWQSkJdKo+hwWnnDA=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.0 h1:l5puwOHr7IxECuPMIuZG7UKOzAnF24v6t4l+Z5Moay4=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.0/go.mod h1:Oov79flWa/n7Ni+lQC3z+VM7PoRM47omRqbJU9B5Y7E=
github.com/aws/aws-sdk-go-v2/service/s3 v1.50.0 h1:jZAdMD1ioZdqirzzVVRhpHHWJmcGGCn8JqDYBs5nmYA=
github.com/aws/aws-sdk-go-v2/service/s3 v1.50.0/go.mod h1:1o/W6JFUuREj2ExoQ21vHJgO7wakvjhol91M9eknFgs=
github.com/aws/aws-sdk-go-v2/service/s3 v1.50.1 h1:bjpWJEXch7moIt3PX2r5XpGROsletl7enqG1Q3Te1Dc=
github.com/aws/aws-sdk-go-v2/service/s3 v1.50.1/go.mod h1:1o/W6JFUuREj2ExoQ21vHJgO7wakvjhol91M9eknFgs=
github.com/aws/aws-sdk-go-v2/service/sso v1.19.0 h1:u6OkVDxtBPnxPkZ9/63ynEe+8kHbtS5IfaC4PzVxzWM=
github.com/aws/aws-sdk-go-v2/service/sso v1.19.0/go.mod h1:YqbU3RS/pkDVu+v+Nwxvn0i1WB0HkNWEePWbmODEbbs=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.21.5 h1:2k9KmFawS63euAkY4/ixVNsYYwrwnd5fIvgEKkfZFNM=
Expand Down
2 changes: 0 additions & 2 deletions internal/core/algorithm/ngt/ngt.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

/*
#cgo LDFLAGS: -lngt
#include <NGT/Capi.h>

Check failure on line 22 in internal/core/algorithm/ngt/ngt.go

View workflow job for this annotation

GitHub Actions / Analyze (go)

fatal error: NGT/Capi.h: No such file or directory
#include <stdlib.h>
*/
import "C"
Expand Down Expand Up @@ -710,9 +710,7 @@
return n.newGoError(ne)
}
n.PutErrorBuffer(ne)

n.cnt.Add(^uint64(0))

return nil
}

Expand Down
17 changes: 15 additions & 2 deletions pkg/agent/core/ngt/service/ngt.go
Original file line number Diff line number Diff line change
Expand Up @@ -1697,8 +1697,11 @@

func (n *ngt) UUIDs(ctx context.Context) (uuids []string) {
uuids = make([]string, 0, n.kvs.Len())
var mu sync.Mutex

Check warning on line 1700 in pkg/agent/core/ngt/service/ngt.go

View check run for this annotation

Codecov / codecov/patch

pkg/agent/core/ngt/service/ngt.go#L1700

Added line #L1700 was not covered by tests
n.kvs.Range(ctx, func(uuid string, oid uint32, _ int64) bool {
mu.Lock()

Check warning on line 1702 in pkg/agent/core/ngt/service/ngt.go

View check run for this annotation

Codecov / codecov/patch

pkg/agent/core/ngt/service/ngt.go#L1702

Added line #L1702 was not covered by tests
uuids = append(uuids, uuid)
mu.Unlock()

Check warning on line 1704 in pkg/agent/core/ngt/service/ngt.go

View check run for this annotation

Codecov / codecov/patch

pkg/agent/core/ngt/service/ngt.go#L1704

Added line #L1704 was not covered by tests
return true
})
return uuids
Expand Down Expand Up @@ -1741,8 +1744,18 @@

func (n *ngt) Close(ctx context.Context) (err error) {
defer n.core.Close()

err = n.kvs.Close()
defer func() {
kerr := n.kvs.Close()
if kerr != nil &&
!errors.Is(err, context.Canceled) &&
!errors.Is(err, context.DeadlineExceeded) {
if err != nil {
err = errors.Join(kerr, err)
} else {
err = kerr
}

Check warning on line 1756 in pkg/agent/core/ngt/service/ngt.go

View check run for this annotation

Codecov / codecov/patch

pkg/agent/core/ngt/service/ngt.go#L1752-L1756

Added lines #L1752 - L1756 were not covered by tests
}
}()
if len(n.path) != 0 {
if n.isReadReplica {
log.Info("skip create and save index operation on close because this is read replica")
Expand Down
35 changes: 17 additions & 18 deletions pkg/agent/internal/kvs/kvs.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"context"
"sync/atomic"

"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/log"
"github.com/vdaas/vald/internal/safety"
"github.com/vdaas/vald/internal/sync"
"github.com/vdaas/vald/internal/sync/errgroup"
Expand Down Expand Up @@ -53,7 +55,6 @@ type bidi struct {
l uint64
ou [slen]*sync.Map[uint32, valueStructOu]
uo [slen]*sync.Map[string, ValueStructUo]
eg errgroup.Group
}

const (
Expand All @@ -79,14 +80,6 @@ func New(opts ...Option) BidiMap {
b.uo[i] = new(sync.Map[string, ValueStructUo])
}

if b.eg == nil {
b.eg, _ = errgroup.New(context.Background())
}

if b.concurrency > 0 {
b.eg.SetLimit(b.concurrency)
}

return b
}

Expand Down Expand Up @@ -151,24 +144,33 @@ func (b *bidi) DeleteInverse(val uint32) (key string, ok bool) {

// Range retrieves all set keys and values and calls the callback function f.
func (b *bidi) Range(ctx context.Context, f func(string, uint32, int64) bool) {
var wg sync.WaitGroup
eg, ctx := errgroup.New(ctx)
if b.concurrency > 0 {
eg.SetLimit(b.concurrency)
}
for i := range b.uo {
idx := i
wg.Add(1)
b.eg.Go(safety.RecoverFunc(func() (err error) {
eg.Go(safety.RecoverFunc(func() (err error) {
b.uo[idx].Range(func(uuid string, val ValueStructUo) bool {
select {
case <-ctx.Done():
err = ctx.Err()
return false
default:
return f(uuid, val.value, val.timestamp)
}
})
wg.Done()
if err != nil &&
(errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)) {
return err
}
return nil
}))
}
wg.Wait()
err := eg.Wait()
if err != nil {
log.Error(err)
}
}

// Len returns the length of the cache that is set in the bidi.
Expand All @@ -180,10 +182,7 @@ func (b *bidi) Len() uint64 {
}

func (b *bidi) Close() error {
if b == nil {
return nil
}
return b.eg.Wait()
return nil
}

func getShardID(key string) (id uint64) {
Expand Down
9 changes: 1 addition & 8 deletions pkg/agent/internal/kvs/kvs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ func TestNew(t *testing.T) {
l: 0,
ou: wantOu,
uo: wantUo,
eg: errgroup.Get(),
},
},
}
Expand Down Expand Up @@ -1676,12 +1675,10 @@ func Test_bidi_Range(t *testing.T) {
defer goleak.VerifyNone(tt, goleak.IgnoreCurrent())
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
eg, egctx := errgroup.New(ctx)
b := &bidi{
ou: test.fields.ou,
uo: test.fields.uo,
l: test.fields.l,
eg: eg,
}
if test.beforeFunc != nil {
test.beforeFunc(tt, test.args, b)
Expand All @@ -1694,7 +1691,7 @@ func Test_bidi_Range(t *testing.T) {
checkFunc = defaultCheckFunc
}

b.Range(egctx, test.args.f)
b.Range(ctx, test.args.f)
if err := checkFunc(test.want, b); err != nil {
tt.Errorf("error = %v", err)
}
Expand Down Expand Up @@ -1791,7 +1788,6 @@ func Test_bidi_Len(t *testing.T) {
// l uint64
// ou [slen]*sync.Map[uint32, valueStructOu]
// uo [slen]*sync.Map[string, ValueStructUo]
// eg errgroup.Group
// }
// type want struct {
// err error
Expand Down Expand Up @@ -1820,7 +1816,6 @@ func Test_bidi_Len(t *testing.T) {
// l:0,
// ou:nil,
// uo:nil,
// eg:nil,
// },
// want: want{},
// checkFunc: defaultCheckFunc,
Expand All @@ -1843,7 +1838,6 @@ func Test_bidi_Len(t *testing.T) {
// l:0,
// ou:nil,
// uo:nil,
// eg:nil,
// },
// want: want{},
// checkFunc: defaultCheckFunc,
Expand Down Expand Up @@ -1878,7 +1872,6 @@ func Test_bidi_Len(t *testing.T) {
// l: test.fields.l,
// ou: test.fields.ou,
// uo: test.fields.uo,
// eg: test.fields.eg,
// }
//
// err := b.Close()
Expand Down
11 changes: 0 additions & 11 deletions pkg/agent/internal/kvs/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ package kvs

import (
"runtime"

"github.com/vdaas/vald/internal/sync/errgroup"
)

// Option represents the functional option for bidi.
Expand All @@ -29,15 +27,6 @@ var defaultOptions = []Option{
WithConcurrency(runtime.GOMAXPROCS(-1) * 10),
}

// WithErrGroup returns the option to set the errgroup.
func WithErrGroup(eg errgroup.Group) Option {
return func(b *bidi) {
if eg != nil {
b.eg = eg
}
}
}

// WithConcurrency returns the option to set the concurrency.
func WithConcurrency(c int) Option {
return func(b *bidi) {
Expand Down
86 changes: 0 additions & 86 deletions pkg/agent/internal/kvs/option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,92 +18,6 @@ package kvs

// NOT IMPLEMENTED BELOW
//
// func TestWithErrGroup(t *testing.T) {
// type args struct {
// eg errgroup.Group
// }
// type want struct {
// want Option
// }
// type test struct {
// name string
// args args
// want want
// checkFunc func(want, Option) error
// beforeFunc func(*testing.T, args)
// afterFunc func(*testing.T, args)
// }
// defaultCheckFunc := func(w want, got Option) error {
// if !reflect.DeepEqual(got, w.want) {
// return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", got, w.want)
// }
// return nil
// }
// tests := []test{
// // TODO test cases
// /*
// {
// name: "test_case_1",
// args: args {
// eg:nil,
// },
// want: want{},
// checkFunc: defaultCheckFunc,
// beforeFunc: func(t *testing.T, args args) {
// t.Helper()
// },
// afterFunc: func(t *testing.T, args args) {
// t.Helper()
// },
// },
// */
//
// // TODO test cases
// /*
// func() test {
// return test {
// name: "test_case_2",
// args: args {
// eg:nil,
// },
// want: want{},
// checkFunc: defaultCheckFunc,
// beforeFunc: func(t *testing.T, args args) {
// t.Helper()
// },
// afterFunc: func(t *testing.T, args args) {
// t.Helper()
// },
// }
// }(),
// */
// }
//
// for _, tc := range tests {
// test := tc
// t.Run(test.name, func(tt *testing.T) {
// tt.Parallel()
// defer goleak.VerifyNone(tt, goleak.IgnoreCurrent())
// if test.beforeFunc != nil {
// test.beforeFunc(tt, test.args)
// }
// if test.afterFunc != nil {
// defer test.afterFunc(tt, test.args)
// }
// checkFunc := test.checkFunc
// if test.checkFunc == nil {
// checkFunc = defaultCheckFunc
// }
//
// got := WithErrGroup(test.args.eg)
// if err := checkFunc(test.want, got); err != nil {
// tt.Errorf("error = %v", err)
// }
//
// })
// }
// }
//
// func TestWithConcurrency(t *testing.T) {
// type args struct {
// c int
Expand Down
Loading
Loading