diff --git a/go.mod b/go.mod index aeccbce84f..bb2e23ef82 100755 --- a/go.mod +++ b/go.mod @@ -43,7 +43,7 @@ replace ( github.com/ajstarks/svgo => github.com/ajstarks/svgo v0.0.0-20211024235047-1546f124cd8b github.com/antihax/optional => github.com/antihax/optional v1.0.0 github.com/armon/go-socks5 => github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 - github.com/aws/aws-sdk-go => github.com/aws/aws-sdk-go v1.44.293 + github.com/aws/aws-sdk-go => github.com/aws/aws-sdk-go v1.44.294 github.com/aws/aws-sdk-go-v2 => github.com/aws/aws-sdk-go-v2 v1.18.1 github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream => github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.10 github.com/aws/aws-sdk-go-v2/config => github.com/aws/aws-sdk-go-v2/config v1.18.27 @@ -194,7 +194,7 @@ replace ( github.com/klauspost/cpuid/v2 => github.com/klauspost/cpuid/v2 v2.2.5 github.com/kpango/fastime => github.com/kpango/fastime v1.1.9 github.com/kpango/fuid => github.com/kpango/fuid v0.0.0-20221203053508-503b5ad89aa1 - github.com/kpango/gache => github.com/kpango/gache v1.2.8 + github.com/kpango/gache/v2 => github.com/kpango/gache/v2 v2.0.8 github.com/kpango/glg => github.com/kpango/glg v1.6.15 github.com/kr/fs => github.com/kr/fs v0.1.0 github.com/kr/pretty => github.com/kr/pretty v0.3.1 @@ -244,7 +244,7 @@ replace ( github.com/quasilyte/gogrep => github.com/quasilyte/gogrep v0.5.0 github.com/quasilyte/stdinfo => github.com/quasilyte/stdinfo v0.0.0-20220114132959-f7386bf02567 github.com/rogpeppe/fastuuid => github.com/rogpeppe/fastuuid v1.2.0 - github.com/rogpeppe/go-internal => github.com/rogpeppe/go-internal v1.10.0 + github.com/rogpeppe/go-internal => github.com/rogpeppe/go-internal v1.11.0 github.com/rs/xid => github.com/rs/xid v1.5.0 github.com/rs/zerolog => github.com/rs/zerolog v1.29.1 github.com/russross/blackfriday/v2 => github.com/russross/blackfriday/v2 v2.1.0 @@ -362,8 +362,8 @@ require ( github.com/klauspost/compress v1.15.9 github.com/kpango/fastime v1.1.9 github.com/kpango/fuid v0.0.0-00010101000000-000000000000 - github.com/kpango/gache v0.0.0-00010101000000-000000000000 - github.com/kpango/glg v1.6.14 + github.com/kpango/gache/v2 v2.0.0-00010101000000-000000000000 + github.com/kpango/glg v1.6.15 github.com/leanovate/gopter v0.0.0-00010101000000-000000000000 github.com/lucasb-eyer/go-colorful v0.0.0-00010101000000-000000000000 github.com/pierrec/lz4/v3 v3.0.0-00010101000000-000000000000 diff --git a/go.sum b/go.sum index 5c96c6a078..7c50c47120 100644 --- a/go.sum +++ b/go.sum @@ -201,8 +201,8 @@ github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kd github.com/apache/arrow/go/v12 v12.0.0/go.mod h1:d+tV/eHZZ7Dz7RPrFKtPK02tpr+c9/PEd/zm8mDS9Vg= github.com/apache/thrift v0.16.0/go.mod h1:PHK3hniurgQaNMZYaCLEqXKsYK8upmhPbmdP2FXSqgU= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio= -github.com/aws/aws-sdk-go v1.44.293 h1:oBPrQqsyMYe61Sl/xKVvQFflXjPwYH11aKi8QR3Nhts= -github.com/aws/aws-sdk-go v1.44.293/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI= +github.com/aws/aws-sdk-go v1.44.294 h1:3x7GaEth+pDU9HwFcAU0awZlEix5CEdyIZvV08SlHa8= +github.com/aws/aws-sdk-go v1.44.294/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI= github.com/aws/aws-sdk-go-v2 v1.18.1 h1:+tefE750oAb7ZQGzla6bLkOwfcQCEtC5y2RqoqCeqKo= github.com/aws/aws-sdk-go-v2 v1.18.1/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.10 h1:dK82zF6kkPeCo8J1e+tGx4JdvDIQzj7ygIoLg8WMuGs= @@ -503,8 +503,8 @@ github.com/kpango/fastime v1.1.9 h1:xVQHcqyPt5M69DyFH7g1EPRns1YQNap9d5eLhl/Jy84= github.com/kpango/fastime v1.1.9/go.mod h1:vyD7FnUn08zxY4b/QFBZVG+9EWMYsNl+QF0uE46urD4= github.com/kpango/fuid v0.0.0-20221203053508-503b5ad89aa1 h1:rxyM+7uaZQ35P9fbixdnld/h4AgEhODoubuy6A4nDdk= github.com/kpango/fuid v0.0.0-20221203053508-503b5ad89aa1/go.mod h1:CAYeq6us9NfnRkSz67/xKVIR6/vaY5ZQZRe6IVcaIKg= -github.com/kpango/gache v1.2.8 h1:+OjREOmuWO4qrJksDhzWJq80o9iwHiezdVmMR1jtCG0= -github.com/kpango/gache v1.2.8/go.mod h1:UyBo0IoPFDSJypK2haDXeV6PwHEmBcXQA0BLuOYEvWg= +github.com/kpango/gache/v2 v2.0.8 h1:lzT3xtkPTvDSPFnF6pC/x4ZDDQXbhkEFlwDx0YavQwM= +github.com/kpango/gache/v2 v2.0.8/go.mod h1:5AWVWlHau1dwI9Hzf+NZc4rPTwxM3SVwJQgob/OyAjQ= github.com/kpango/glg v1.6.15 h1:nw0xSxpSyrDIWHeb3dvnE08PW+SCbK+aYFETT75IeLA= github.com/kpango/glg v1.6.15/go.mod h1:cmsc7Yeu8AS3wHLmN7bhwENXOpxfq+QoqxCIk2FneRk= github.com/kpango/go-hostpool v0.0.0-20210303030322-aab80263dcd0 h1:orIEVdc68woWO1ZyYWEVOl5Kl33eDjP+kbxgbdpMwi4= @@ -604,8 +604,8 @@ github.com/rakyll/embedmd v0.0.0-20171029212350-c8060a0752a2/go.mod h1:7jOTMgqac github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= -github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= -github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= +github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= +github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA= github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= github.com/rs/zerolog v1.29.1/go.mod h1:Le6ESbR7hc+DP6Lt1THiV8CQSdkkNrd3R0XbEgp3ZBU= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= diff --git a/hack/go.mod.default b/hack/go.mod.default index f91d21d04b..270cfed58e 100755 --- a/hack/go.mod.default +++ b/hack/go.mod.default @@ -194,7 +194,7 @@ replace ( github.com/klauspost/cpuid/v2 => github.com/klauspost/cpuid/v2 latest github.com/kpango/fastime => github.com/kpango/fastime latest github.com/kpango/fuid => github.com/kpango/fuid latest - github.com/kpango/gache => github.com/kpango/gache latest + github.com/kpango/gache/v2 => github.com/kpango/gache/v2 latest github.com/kpango/glg => github.com/kpango/glg latest github.com/kr/fs => github.com/kr/fs latest github.com/kr/pretty => github.com/kr/pretty latest diff --git a/internal/cache/cache.go b/internal/cache/cache.go index ddb86935ca..f14ce31bc6 100644 --- a/internal/cache/cache.go +++ b/internal/cache/cache.go @@ -26,7 +26,7 @@ import ( "github.com/vdaas/vald/internal/errors" ) -type cache struct { +type cache[V any] struct { cacher cacher.Type expireDur time.Duration expireCheckDur time.Duration @@ -34,17 +34,17 @@ type cache struct { } // New returns the Cache instance or error. -func New(opts ...Option) (cc cacher.Cache, err error) { - c := new(cache) - for _, opt := range append(defaultOptions, opts...) { +func New[V any](opts ...Option[V]) (cc cacher.Cache[V], err error) { + c := new(cache[V]) + for _, opt := range append(defaultOptions[V](), opts...) { opt(c) } switch c.cacher { case cacher.GACHE: - return gache.New( - gache.WithExpireDuration(c.expireDur), - gache.WithExpireCheckDuration(c.expireCheckDur), - gache.WithExpiredHook(c.expiredHook), + return gache.New[V]( + gache.WithExpireDuration[V](c.expireDur), + gache.WithExpireCheckDuration[V](c.expireCheckDur), + gache.WithExpiredHook[V](c.expiredHook), ), nil default: return nil, errors.ErrInvalidCacherType diff --git a/internal/cache/cache_test.go b/internal/cache/cache_test.go index 7d00d52ca0..4a5186144b 100644 --- a/internal/cache/cache_test.go +++ b/internal/cache/cache_test.go @@ -33,21 +33,21 @@ var goleakIgnoreOptions = []goleak.Option{ func TestNew(t *testing.T) { type args struct { - opts []Option + opts []Option[any] } type want struct { - wantCc cacher.Cache + wantCc cacher.Cache[any] err error } type test struct { name string args args want want - checkFunc func(want, cacher.Cache, error) error + checkFunc func(want, cacher.Cache[any], error) error beforeFunc func(args) afterFunc func(args) } - defaultCheckFunc := func(w want, gotCc cacher.Cache, err error) error { + defaultCheckFunc := func(w want, gotCc cacher.Cache[any], err error) error { if !errors.Is(err, w.err) { return errors.Errorf("got_error: \"%#v\",\n\t\t\t\twant: \"%#v\"", err, w.err) } @@ -60,9 +60,9 @@ func TestNew(t *testing.T) { { name: "return gache cacher", args: args{ - opts: []Option{WithType("gache")}, + opts: []Option[any]{WithType[any]("gache")}, }, - checkFunc: func(w want, got cacher.Cache, err error) error { + checkFunc: func(w want, got cacher.Cache[any], err error) error { if err != nil { return err } @@ -76,7 +76,7 @@ func TestNew(t *testing.T) { { name: "return unknown error when type is unknown", args: args{ - opts: []Option{WithType("unknown")}, + opts: []Option[any]{WithType[any]("unknown")}, }, want: want{ err: errors.ErrInvalidCacherType, @@ -85,9 +85,9 @@ func TestNew(t *testing.T) { { name: "return cache when type is empty", args: args{ - opts: []Option{WithType("")}, + opts: []Option[any]{WithType[any]("")}, }, - checkFunc: func(w want, got cacher.Cache, err error) error { + checkFunc: func(w want, got cacher.Cache[any], err error) error { if err != nil { return err } @@ -101,7 +101,7 @@ func TestNew(t *testing.T) { { name: "return unknown error when type is dummy string", args: args{ - opts: []Option{WithType("dummy")}, + opts: []Option[any]{WithType[any]("dummy")}, }, want: want{ err: errors.ErrInvalidCacherType, @@ -124,7 +124,7 @@ func TestNew(t *testing.T) { checkFunc = defaultCheckFunc } - gotCc, err := New(test.args.opts...) + gotCc, err := New[any](test.args.opts...) if err := checkFunc(test.want, gotCc, err); err != nil { tt.Errorf("error = %v", err) } diff --git a/internal/cache/cacher/cacher.go b/internal/cache/cacher/cacher.go index 910d4a0844..0624115ec3 100644 --- a/internal/cache/cacher/cacher.go +++ b/internal/cache/cacher/cacher.go @@ -24,12 +24,12 @@ import ( ) // Cache represent the cache interface to store cache. -type Cache interface { +type Cache[V any] interface { Start(context.Context) - Get(string) (interface{}, bool) - Set(string, interface{}) + Get(string) (V, bool) + Set(string, V) Delete(string) - GetAndDelete(string) (interface{}, bool) + GetAndDelete(string) (V, bool) } // Type represents the cacher type. Currently it support GACHE only. diff --git a/internal/cache/gache/gache.go b/internal/cache/gache/gache.go index 94c9a84dce..094a1bf2c0 100644 --- a/internal/cache/gache/gache.go +++ b/internal/cache/gache/gache.go @@ -21,21 +21,21 @@ import ( "context" "time" - "github.com/kpango/gache" + gache "github.com/kpango/gache/v2" "github.com/vdaas/vald/internal/cache/cacher" ) -type cache struct { - gache gache.Gache +type cache[V any] struct { + gache gache.Gache[V] expireDur time.Duration expireCheckDur time.Duration expiredHook func(context.Context, string) } // New loads a cache model and returns a new cache struct. -func New(opts ...Option) cacher.Cache { - c := new(cache) - for _, opt := range append(defaultOptions(), opts...) { +func New[V any](opts ...Option[V]) cacher.Cache[V] { + c := new(cache[V]) + for _, opt := range append(defaultOptions[V](), opts...) { opt(c) } c.gache.SetDefaultExpire(c.expireDur) @@ -48,31 +48,31 @@ func New(opts ...Option) cacher.Cache { } // Start calls StartExpired func of c.gache. -func (c *cache) Start(ctx context.Context) { +func (c *cache[V]) Start(ctx context.Context) { c.gache.StartExpired(ctx, c.expireCheckDur) } // Get calls StartExpired func of c.gache and returns (interface{}, bool) according to key. -func (c *cache) Get(key string) (interface{}, bool) { +func (c *cache[V]) Get(key string) (V, bool) { return c.gache.Get(key) } // Set calls Set func of c.gache. -func (c *cache) Set(key string, val interface{}) { +func (c *cache[V]) Set(key string, val V) { c.gache.Set(key, val) } // Delete calls Delete func of c.gache. -func (c *cache) Delete(key string) { +func (c *cache[V]) Delete(key string) { c.gache.Delete(key) } // GetAndDelete returns (interface{}, bool) and delete value according to key when value of key is set. // When value of key is not set, returns (nil, false). -func (c *cache) GetAndDelete(key string) (interface{}, bool) { +func (c *cache[V]) GetAndDelete(key string) (V, bool) { v, ok := c.gache.Get(key) if !ok { - return nil, false + return *new(V), false } c.gache.Delete(key) return v, true diff --git a/internal/cache/gache/gache_test.go b/internal/cache/gache/gache_test.go index f8b3ec6b7b..d5724b13d6 100644 --- a/internal/cache/gache/gache_test.go +++ b/internal/cache/gache/gache_test.go @@ -23,7 +23,7 @@ import ( "testing" "time" - "github.com/kpango/gache" + gache "github.com/kpango/gache/v2" "github.com/vdaas/vald/internal/cache/cacher" "github.com/vdaas/vald/internal/errors" "github.com/vdaas/vald/internal/test/goleak" @@ -36,22 +36,22 @@ var goleakIgnoreOptions = []goleak.Option{ func TestNew(t *testing.T) { type args struct { - opts []Option + opts []Option[any] } type want struct { - wantC cacher.Cache + wantC cacher.Cache[any] } type test struct { name string args args want want - checkFunc func(want, cacher.Cache) error + checkFunc func(want, cacher.Cache[any]) error beforeFunc func(args) afterFunc func(args) } - defaultCheckFunc := func(w want, got cacher.Cache) error { - wc := reflect.ValueOf(w.wantC.(*cache)) - gc := reflect.ValueOf(got.(*cache)) + defaultCheckFunc := func(w want, got cacher.Cache[any]) error { + wc := reflect.ValueOf(w.wantC.(*cache[any])) + gc := reflect.ValueOf(got.(*cache[any])) flag := false for i := 0; i < reflect.Indirect(gc).NumField(); i++ { flag = reflect.DeepEqual(reflect.Indirect(gc).Field(i), reflect.Indirect(wc).Field(i)) @@ -63,8 +63,8 @@ func TestNew(t *testing.T) { } tests := []test{ func() test { - c := new(cache) - for _, opt := range defaultOptions() { + c := new(cache[any]) + for _, opt := range defaultOptions[any]() { opt(c) } c.gache.SetDefaultExpire(c.expireDur) @@ -77,8 +77,8 @@ func TestNew(t *testing.T) { }(), func() test { expiredHook := func(context.Context, string) {} - c := new(cache) - for _, opt := range append(defaultOptions(), WithExpiredHook(expiredHook)) { + c := new(cache[any]) + for _, opt := range append(defaultOptions[any](), WithExpiredHook[any](expiredHook)) { opt(c) } c.gache.SetDefaultExpire(c.expireDur) @@ -88,8 +88,8 @@ func TestNew(t *testing.T) { return test{ name: "set success when opts is not nil", args: args{ - opts: []Option{ - WithExpiredHook(expiredHook), + opts: []Option[any]{ + WithExpiredHook[any](expiredHook), }, }, want: want{ @@ -126,7 +126,7 @@ func Test_cache_Start(t *testing.T) { ctx context.Context } type fields struct { - gache gache.Gache + gache gache.Gache[any] expireDur time.Duration expireCheckDur time.Duration expiredHook func(context.Context, string) @@ -155,7 +155,7 @@ func Test_cache_Start(t *testing.T) { }(), }, fields: fields{ - gache: gache.New(), + gache: gache.New[any](), expireDur: 1 * time.Second, expireCheckDur: 1 * time.Second, expiredHook: nil, @@ -177,7 +177,7 @@ func Test_cache_Start(t *testing.T) { if test.checkFunc == nil { checkFunc = defaultCheckFunc } - c := &cache{ + c := &cache[any]{ gache: test.fields.gache, expireDur: test.fields.expireDur, expireCheckDur: test.fields.expireCheckDur, @@ -196,7 +196,7 @@ func Test_cache_Get(t *testing.T) { key string } type fields struct { - gache gache.Gache + gache gache.Gache[any] expireDur time.Duration expireCheckDur time.Duration expiredHook func(context.Context, string) @@ -211,7 +211,7 @@ func Test_cache_Get(t *testing.T) { fields fields want want checkFunc func(want, interface{}, bool) error - beforeFunc func(*testing.T, args, *cache) + beforeFunc func(*testing.T, args, *cache[any]) afterFunc func(args) } defaultCheckFunc := func(w want, got interface{}, got1 bool) error { @@ -230,7 +230,7 @@ func Test_cache_Get(t *testing.T) { key: "vdaas", }, fields: fields{ - gache: gache.New(), + gache: gache.New[any](), expireDur: 1 * time.Second, expireCheckDur: 1 * time.Second, expiredHook: nil, @@ -246,7 +246,7 @@ func Test_cache_Get(t *testing.T) { key: "vdaas", }, fields: fields{ - gache: gache.New(), + gache: gache.New[any](), expireDur: 1 * time.Second, expireCheckDur: 1 * time.Second, expiredHook: nil, @@ -255,7 +255,7 @@ func Test_cache_Get(t *testing.T) { want: "vald", want1: true, }, - beforeFunc: func(t *testing.T, args args, c *cache) { + beforeFunc: func(t *testing.T, args args, c *cache[any]) { t.Helper() c.Set(args.key, "vald") }, @@ -266,7 +266,7 @@ func Test_cache_Get(t *testing.T) { test := tc t.Run(test.name, func(tt *testing.T) { defer goleak.VerifyNone(tt, goleakIgnoreOptions...) - c := &cache{ + c := &cache[any]{ gache: test.fields.gache, expireDur: test.fields.expireDur, expireCheckDur: test.fields.expireCheckDur, @@ -296,7 +296,7 @@ func Test_cache_Set(t *testing.T) { val interface{} } type fields struct { - gache gache.Gache + gache gache.Gache[any] expireDur time.Duration expireCheckDur time.Duration expiredHook func(context.Context, string) @@ -311,11 +311,11 @@ func Test_cache_Set(t *testing.T) { args args fields fields want want - checkFunc func(want, *cache) error + checkFunc func(want, *cache[any]) error beforeFunc func(args) afterFunc func(args) } - defaultCheckFunc := func(w want, c *cache) error { + defaultCheckFunc := func(w want, c *cache[any]) error { got, got1 := c.Get(w.key) if !reflect.DeepEqual(got, w.want) { return errors.Errorf("got = %v, want = %v", got, w.want) @@ -333,7 +333,7 @@ func Test_cache_Set(t *testing.T) { val: "vald", }, fields: fields{ - gache: gache.New(), + gache: gache.New[any](), expireDur: 1 * time.Second, expireCheckDur: 1 * time.Second, expiredHook: nil, @@ -360,7 +360,7 @@ func Test_cache_Set(t *testing.T) { if test.checkFunc == nil { checkFunc = defaultCheckFunc } - c := &cache{ + c := &cache[any]{ gache: test.fields.gache, expireDur: test.fields.expireDur, expireCheckDur: test.fields.expireCheckDur, @@ -380,7 +380,7 @@ func Test_cache_Delete(t *testing.T) { key string } type fields struct { - gache gache.Gache + gache gache.Gache[any] expireDur time.Duration expireCheckDur time.Duration expiredHook func(context.Context, string) @@ -395,11 +395,11 @@ func Test_cache_Delete(t *testing.T) { args args fields fields want want - checkFunc func(want, *cache) error - beforeFunc func(*testing.T, args, *cache) + checkFunc func(want, *cache[any]) error + beforeFunc func(*testing.T, args, *cache[any]) afterFunc func(args) } - defaultCheckFunc := func(w want, c *cache) error { + defaultCheckFunc := func(w want, c *cache[any]) error { got, got1 := c.Get(w.key) if !reflect.DeepEqual(got, w.want) { return errors.Errorf("got = %v, want = %v", got, w.want) @@ -416,7 +416,7 @@ func Test_cache_Delete(t *testing.T) { key: "vdaas", }, fields: fields{ - gache: gache.New(), + gache: gache.New[any](), expireDur: 1 * time.Second, expireCheckDur: 1 * time.Second, expiredHook: nil, @@ -433,7 +433,7 @@ func Test_cache_Delete(t *testing.T) { key: "vdaas", }, fields: fields{ - gache: gache.New(), + gache: gache.New[any](), expireDur: 1 * time.Second, expireCheckDur: 1 * time.Second, expiredHook: nil, @@ -443,7 +443,7 @@ func Test_cache_Delete(t *testing.T) { want: nil, want1: false, }, - beforeFunc: func(t *testing.T, args args, c *cache) { + beforeFunc: func(t *testing.T, args args, c *cache[any]) { t.Helper() c.Set(args.key, "vald") }, @@ -454,7 +454,7 @@ func Test_cache_Delete(t *testing.T) { test := tc t.Run(test.name, func(tt *testing.T) { defer goleak.VerifyNone(tt, goleakIgnoreOptions...) - c := &cache{ + c := &cache[any]{ gache: test.fields.gache, expireDur: test.fields.expireDur, expireCheckDur: test.fields.expireCheckDur, @@ -484,7 +484,7 @@ func Test_cache_GetAndDelete(t *testing.T) { key string } type fields struct { - gache gache.Gache + gache gache.Gache[any] expireDur time.Duration expireCheckDur time.Duration expiredHook func(context.Context, string) @@ -499,7 +499,7 @@ func Test_cache_GetAndDelete(t *testing.T) { fields fields want want checkFunc func(want, interface{}, bool) error - beforeFunc func(*testing.T, args, *cache) + beforeFunc func(*testing.T, args, *cache[any]) afterFunc func(args) } defaultCheckFunc := func(w want, got interface{}, got1 bool) error { @@ -518,7 +518,7 @@ func Test_cache_GetAndDelete(t *testing.T) { key: "vdaas", }, fields: fields{ - gache: gache.New(), + gache: gache.New[any](), expireDur: 1 * time.Second, expireCheckDur: 1 * time.Second, expiredHook: nil, @@ -534,7 +534,7 @@ func Test_cache_GetAndDelete(t *testing.T) { key: "vdaas", }, fields: fields{ - gache: gache.New(), + gache: gache.New[any](), expireDur: 1 * time.Second, expireCheckDur: 1 * time.Second, expiredHook: nil, @@ -543,7 +543,7 @@ func Test_cache_GetAndDelete(t *testing.T) { want: "vald", want1: true, }, - beforeFunc: func(t *testing.T, args args, c *cache) { + beforeFunc: func(t *testing.T, args args, c *cache[any]) { t.Helper() c.Set(args.key, "vald") }, @@ -554,7 +554,7 @@ func Test_cache_GetAndDelete(t *testing.T) { test := tc t.Run(test.name, func(tt *testing.T) { defer goleak.VerifyNone(tt, goleakIgnoreOptions...) - c := &cache{ + c := &cache[any]{ gache: test.fields.gache, expireDur: test.fields.expireDur, expireCheckDur: test.fields.expireCheckDur, diff --git a/internal/cache/gache/option.go b/internal/cache/gache/option.go index 13116f2ccb..516ea8dfc3 100644 --- a/internal/cache/gache/option.go +++ b/internal/cache/gache/option.go @@ -21,29 +21,29 @@ import ( "context" "time" - "github.com/kpango/gache" + gache "github.com/kpango/gache/v2" ) // Option represents the functional option for cache. -type Option func(*cache) +type Option[V any] func(*cache[V]) // defaultOptions returns []Option with gache.New(). -func defaultOptions() []Option { - return []Option{ - WithGache(gache.New()), +func defaultOptions[V any]() []Option[V] { + return []Option[V]{ + WithGache(gache.New[V]()), } } // WithGache returns Option after set gache to cache. -func WithGache(g gache.Gache) Option { - return func(c *cache) { +func WithGache[V any](g gache.Gache[V]) Option[V] { + return func(c *cache[V]) { c.gache = g } } // WithExpiredHook returns Option after set expiredHook when f is not nil. -func WithExpiredHook(f func(context.Context, string)) Option { - return func(c *cache) { +func WithExpiredHook[V any](f func(context.Context, string)) Option[V] { + return func(c *cache[V]) { if f != nil { c.expiredHook = f } @@ -51,8 +51,8 @@ func WithExpiredHook(f func(context.Context, string)) Option { } // WithExpireDuration returns Option after set expireDur when dur is not 0. -func WithExpireDuration(dur time.Duration) Option { - return func(c *cache) { +func WithExpireDuration[V any](dur time.Duration) Option[V] { + return func(c *cache[V]) { if dur != 0 { c.expireDur = dur } @@ -60,8 +60,8 @@ func WithExpireDuration(dur time.Duration) Option { } // WithExpireCheckDuration returns Option after set expireCheckDur when dur is not 0. -func WithExpireCheckDuration(dur time.Duration) Option { - return func(c *cache) { +func WithExpireCheckDuration[V any](dur time.Duration) Option[V] { + return func(c *cache[V]) { if dur != 0 { c.expireCheckDur = dur } diff --git a/internal/cache/gache/option_test.go b/internal/cache/gache/option_test.go index e38fd8b53e..b3c12ebfb0 100644 --- a/internal/cache/gache/option_test.go +++ b/internal/cache/gache/option_test.go @@ -15,6 +15,7 @@ // // Package gache provides implementation of cache using gache + package gache import ( @@ -24,7 +25,7 @@ import ( "time" "github.com/google/go-cmp/cmp" - "github.com/kpango/gache" + gache "github.com/kpango/gache/v2" "github.com/vdaas/vald/internal/errors" "github.com/vdaas/vald/internal/test/goleak" ) @@ -32,22 +33,22 @@ import ( func TestDefaultOptions(t *testing.T) { type args struct{} type want struct { - want *cache + want *cache[any] } type test struct { name string args args want want - checkFunc func(want, *cache) error + checkFunc func(want, *cache[any]) error beforeFunc func(args) afterFunc func(args) } - defaultCheckFunc := func(w want, got *cache) error { + defaultCheckFunc := func(w want, got *cache[any]) error { opts := []cmp.Option{ cmp.AllowUnexported(*w.want), cmp.AllowUnexported(*got), - cmp.Comparer(func(want, got *cache) bool { + cmp.Comparer(func(want, got *cache[any]) bool { return want.gache != nil && got.gache != nil }), } @@ -61,8 +62,8 @@ func TestDefaultOptions(t *testing.T) { { name: "set succuess", want: want{ - want: &cache{ - gache: gache.New(), + want: &cache[any]{ + gache: gache.New[any](), }, }, }, @@ -82,8 +83,8 @@ func TestDefaultOptions(t *testing.T) { if test.checkFunc == nil { checkFunc = defaultCheckFunc } - g := new(cache) - for _, opt := range defaultOptions() { + g := new(cache[any]) + for _, opt := range defaultOptions[any]() { opt(g) } if err := checkFunc(test.want, g); err != nil { @@ -94,9 +95,9 @@ func TestDefaultOptions(t *testing.T) { } func TestWithGache(t *testing.T) { - type T = cache + type T = cache[any] type args struct { - g gache.Gache + g gache.Gache[any] } type want struct { want *T @@ -119,7 +120,7 @@ func TestWithGache(t *testing.T) { tests := []test{ func() test { - ga := gache.New() + ga := gache.New[any]() return test{ name: "set succuess when g is not nil", args: args{ @@ -167,7 +168,7 @@ func TestWithGache(t *testing.T) { } func TestWithExpiredHook(t *testing.T) { - type T = cache + type T = cache[any] type args struct { f func(context.Context, string) } @@ -235,7 +236,7 @@ func TestWithExpiredHook(t *testing.T) { if test.checkFunc == nil { checkFunc = defaultCheckFunc } - got := WithExpiredHook(test.args.f) + got := WithExpiredHook[any](test.args.f) want := new(T) got(want) if err := checkFunc(test.want, want); err != nil { @@ -246,7 +247,7 @@ func TestWithExpiredHook(t *testing.T) { } func TestWithExpireDuration(t *testing.T) { - type T = cache + type T = cache[any] type args struct { dur time.Duration } @@ -307,7 +308,7 @@ func TestWithExpireDuration(t *testing.T) { if test.checkFunc == nil { checkFunc = defaultCheckFunc } - got := WithExpireDuration(test.args.dur) + got := WithExpireDuration[any](test.args.dur) want := new(T) got(want) if err := checkFunc(test.want, want); err != nil { @@ -318,7 +319,7 @@ func TestWithExpireDuration(t *testing.T) { } func TestWithExpireCheckDuration(t *testing.T) { - type T = cache + type T = cache[any] type args struct { dur time.Duration } @@ -378,7 +379,7 @@ func TestWithExpireCheckDuration(t *testing.T) { if test.checkFunc == nil { checkFunc = defaultCheckFunc } - got := WithExpireCheckDuration(test.args.dur) + got := WithExpireCheckDuration[any](test.args.dur) want := new(T) got(want) if err := checkFunc(test.want, want); err != nil { @@ -392,16 +393,16 @@ func TestWithExpireCheckDuration(t *testing.T) { func Test_defaultOptions(t *testing.T) { type want struct { - want []Option + want []Option[any] } type test struct { name string want want - checkFunc func(want, []Option) error + checkFunc func(want, []Option[any]) error beforeFunc func(*testing.T) afterFunc func(*testing.T) } - defaultCheckFunc := func(w want, got []Option) error { + defaultCheckFunc := func(w want, got []Option[any]) error { if !reflect.DeepEqual(got, w.want) { return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", got, w.want) } @@ -457,7 +458,7 @@ func Test_defaultOptions(t *testing.T) { checkFunc = defaultCheckFunc } - got := defaultOptions() + got := defaultOptions[any]() if err := checkFunc(test.want, got); err != nil { tt.Errorf("error = %v", err) } diff --git a/internal/cache/option.go b/internal/cache/option.go index 9094ada587..932a7718b2 100644 --- a/internal/cache/option.go +++ b/internal/cache/option.go @@ -25,17 +25,19 @@ import ( ) // Option represents the functional option for cache. -type Option func(*cache) +type Option[V any] func(*cache[V]) -var defaultOptions = []Option{ - WithType(cacher.GACHE.String()), - WithExpireDuration("30m"), - WithExpireCheckDuration("5m"), +func defaultOptions[V any]() []Option[V] { + return []Option[V]{ + WithType[V](cacher.GACHE.String()), + WithExpireDuration[V]("30m"), + WithExpireCheckDuration[V]("5m"), + } } // WithExpiredHook returns Option after set expiredHook when f is not nil. -func WithExpiredHook(f func(context.Context, string)) Option { - return func(c *cache) { +func WithExpiredHook[V any](f func(context.Context, string)) Option[V] { + return func(c *cache[V]) { if f != nil { c.expiredHook = f } @@ -43,8 +45,8 @@ func WithExpiredHook(f func(context.Context, string)) Option { } // WithType returns Option after set cacher when len(mo string) is not nil. -func WithType(mo string) Option { - return func(c *cache) { +func WithType[V any](mo string) Option[V] { + return func(c *cache[V]) { if len(mo) == 0 { return } @@ -54,8 +56,8 @@ func WithType(mo string) Option { } // WithExpireDuration returns Option after set expireDur when dur is cprrect param. -func WithExpireDuration(dur string) Option { - return func(c *cache) { +func WithExpireDuration[V any](dur string) Option[V] { + return func(c *cache[V]) { if len(dur) == 0 { return } @@ -68,8 +70,8 @@ func WithExpireDuration(dur string) Option { } // WithExpireCheckDuration returns Option after set expireCheckDur when dur is cprrect param. -func WithExpireCheckDuration(dur string) Option { - return func(c *cache) { +func WithExpireCheckDuration[V any](dur string) Option[V] { + return func(c *cache[V]) { if len(dur) == 0 { return } diff --git a/internal/cache/option_test.go b/internal/cache/option_test.go index cc14060907..f41f6ccd15 100644 --- a/internal/cache/option_test.go +++ b/internal/cache/option_test.go @@ -33,17 +33,17 @@ func TestWithExpiredHook(t *testing.T) { f func(context.Context, string) } type want struct { - want *cache + want *cache[any] } type test struct { name string args args want want - checkFunc func(want, *cache) error + checkFunc func(want, *cache[any]) error beforeFunc func(args) afterFunc func(args) } - defaultCheckFunc := func(w want, got *cache) error { + defaultCheckFunc := func(w want, got *cache[any]) error { if reflect.ValueOf(w.want.expiredHook).Pointer() != reflect.ValueOf(got.expiredHook).Pointer() { return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", got, w.want) } @@ -58,7 +58,7 @@ func TestWithExpiredHook(t *testing.T) { f: fn, }, want: want{ - want: &cache{ + want: &cache[any]{ expiredHook: fn, }, }, @@ -71,7 +71,7 @@ func TestWithExpiredHook(t *testing.T) { f: nil, }, want: want{ - want: &cache{}, + want: &cache[any]{}, }, } }(), @@ -92,8 +92,8 @@ func TestWithExpiredHook(t *testing.T) { checkFunc = defaultCheckFunc } - got := new(cache) - opts := WithExpiredHook(test.args.f) + got := new(cache[any]) + opts := WithExpiredHook[any](test.args.f) opts(got) if err := checkFunc(test.want, got); err != nil { tt.Errorf("error = %v", err) @@ -107,17 +107,17 @@ func TestWithType(t *testing.T) { mo string } type want struct { - want *cache + want *cache[any] } type test struct { name string args args want want - checkFunc func(want, *cache) error + checkFunc func(want, *cache[any]) error beforeFunc func(args) afterFunc func(args) } - defaultCheckFunc := func(w want, got *cache) error { + defaultCheckFunc := func(w want, got *cache[any]) error { if !reflect.DeepEqual(got, w.want) { return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", got, w.want) } @@ -132,7 +132,7 @@ func TestWithType(t *testing.T) { mo: val, }, want: want{ - want: &cache{ + want: &cache[any]{ cacher: cacher.ToType(val), }, }, @@ -142,7 +142,7 @@ func TestWithType(t *testing.T) { return test{ name: "set success when len(mo) is 0", want: want{ - want: &cache{}, + want: &cache[any]{}, }, } }(), @@ -163,8 +163,8 @@ func TestWithType(t *testing.T) { checkFunc = defaultCheckFunc } - got := new(cache) - opts := WithType(test.args.mo) + got := new(cache[any]) + opts := WithType[any](test.args.mo) opts(got) if err := checkFunc(test.want, got); err != nil { tt.Errorf("error = %v", err) @@ -178,17 +178,17 @@ func TestWithExpireDuration(t *testing.T) { dur string } type want struct { - want *cache + want *cache[any] } type test struct { name string args args want want - checkFunc func(want, *cache) error + checkFunc func(want, *cache[any]) error beforeFunc func(args) afterFunc func(args) } - defaultCheckFunc := func(w want, got *cache) error { + defaultCheckFunc := func(w want, got *cache[any]) error { if !reflect.DeepEqual(got, w.want) { return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", got, w.want) } @@ -204,7 +204,7 @@ func TestWithExpireDuration(t *testing.T) { dur: val, }, want: want{ - want: &cache{ + want: &cache[any]{ expireDur: dur, }, }, @@ -214,7 +214,7 @@ func TestWithExpireDuration(t *testing.T) { return test{ name: "set success when dur is empty", want: want{ - want: &cache{}, + want: &cache[any]{}, }, } }(), @@ -226,7 +226,7 @@ func TestWithExpireDuration(t *testing.T) { dur: val, }, want: want{ - want: &cache{}, + want: &cache[any]{}, }, } }(), @@ -247,8 +247,8 @@ func TestWithExpireDuration(t *testing.T) { checkFunc = defaultCheckFunc } - got := new(cache) - opts := WithExpireDuration(test.args.dur) + got := new(cache[any]) + opts := WithExpireDuration[any](test.args.dur) opts(got) if err := checkFunc(test.want, got); err != nil { tt.Errorf("error = %v", err) @@ -262,17 +262,17 @@ func TestWithExpireCheckDuration(t *testing.T) { dur string } type want struct { - want *cache + want *cache[any] } type test struct { name string args args want want - checkFunc func(want, *cache) error + checkFunc func(want, *cache[any]) error beforeFunc func(args) afterFunc func(args) } - defaultCheckFunc := func(w want, got *cache) error { + defaultCheckFunc := func(w want, got *cache[any]) error { if !reflect.DeepEqual(got, w.want) { return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", got, w.want) } @@ -288,7 +288,7 @@ func TestWithExpireCheckDuration(t *testing.T) { dur: val, }, want: want{ - want: &cache{ + want: &cache[any]{ expireCheckDur: dur, }, }, @@ -298,7 +298,7 @@ func TestWithExpireCheckDuration(t *testing.T) { return test{ name: "set success when dur is empty", want: want{ - want: &cache{}, + want: &cache[any]{}, }, } }(), @@ -310,7 +310,7 @@ func TestWithExpireCheckDuration(t *testing.T) { dur: val, }, want: want{ - want: &cache{}, + want: &cache[any]{}, }, } }(), @@ -331,8 +331,8 @@ func TestWithExpireCheckDuration(t *testing.T) { checkFunc = defaultCheckFunc } - got := new(cache) - opts := WithExpireCheckDuration(test.args.dur) + got := new(cache[any]) + opts := WithExpireCheckDuration[any](test.args.dur) opts(got) if err := checkFunc(test.want, got); err != nil { tt.Errorf("error = %v", err) diff --git a/internal/circuitbreaker/manager.go b/internal/circuitbreaker/manager.go index a7846b9aaa..66d259c7eb 100644 --- a/internal/circuitbreaker/manager.go +++ b/internal/circuitbreaker/manager.go @@ -20,6 +20,7 @@ import ( "github.com/vdaas/vald/internal/errors" "github.com/vdaas/vald/internal/log" + valdsync "github.com/vdaas/vald/internal/sync" ) // NOTE: This variable is for observability package. @@ -36,7 +37,7 @@ type CircuitBreaker interface { } type breakerManager struct { - m sync.Map // breaker group. key: string, value: *breaker. + m valdsync.Map[string, *breaker] opts []BreakerOption } @@ -74,22 +75,15 @@ func (bm *breakerManager) Do(ctx context.Context, key string, fn func(ctx contex var br *breaker // Pre-loading to prevent a lot of object generation. - obj, ok := bm.m.Load(key) + br, ok := bm.m.Load(key) if !ok { br, err = newBreaker(key, bm.opts...) if err != nil { return nil, err } - obj, _ = bm.m.LoadOrStore(key, br) - } - br, ok = obj.(*breaker) - if !ok { - br, err = newBreaker(key, bm.opts...) - if err != nil { - return nil, err - } - bm.m.Store(key, br) + br, _ = bm.m.LoadOrStore(key, br) } + val, st, err = br.do(ctx, fn) if err != nil { switch st { diff --git a/internal/client/v1/client/discoverer/discover.go b/internal/client/v1/client/discoverer/discover.go index c553219c93..f4f2e6e8e1 100644 --- a/internal/client/v1/client/discoverer/discover.go +++ b/internal/client/v1/client/discoverer/discover.go @@ -20,7 +20,6 @@ package discoverer import ( "context" "reflect" - "sync" "sync/atomic" "time" @@ -32,6 +31,7 @@ import ( "github.com/vdaas/vald/internal/net" "github.com/vdaas/vald/internal/net/grpc" "github.com/vdaas/vald/internal/safety" + valdsync "github.com/vdaas/vald/internal/sync" ) type Client interface { @@ -344,7 +344,7 @@ func (c *client) disconnectOldAddrs(ctx context.Context, oldAddrs, connectedAddr if !c.autoconn { return nil } - var cur sync.Map + var cur valdsync.Map[string, any] // TODO: Does this have to be a sync.Map not a map? for _, addr := range connectedAddrs { cur.Store(addr, struct{}{}) } diff --git a/internal/client/v1/client/filter/egress/client.go b/internal/client/v1/client/filter/egress/client.go index 7c1f943102..77ea9793cf 100644 --- a/internal/client/v1/client/filter/egress/client.go +++ b/internal/client/v1/client/filter/egress/client.go @@ -20,7 +20,6 @@ package egress import ( "context" "reflect" - "sync" "github.com/vdaas/vald/apis/grpc/v1/filter/egress" "github.com/vdaas/vald/apis/grpc/v1/payload" @@ -28,6 +27,7 @@ import ( "github.com/vdaas/vald/internal/net/grpc" "github.com/vdaas/vald/internal/observability/trace" "github.com/vdaas/vald/internal/strings" + valdsync "github.com/vdaas/vald/internal/sync" ) type Client interface { @@ -40,7 +40,7 @@ type Client interface { type client struct { addrs []string - cl sync.Map + cl valdsync.Map[string, any] c grpc.Client } diff --git a/internal/client/v1/client/filter/ingress/client.go b/internal/client/v1/client/filter/ingress/client.go index 26e2f8120e..7222e0763d 100644 --- a/internal/client/v1/client/filter/ingress/client.go +++ b/internal/client/v1/client/filter/ingress/client.go @@ -20,7 +20,6 @@ package ingress import ( "context" "reflect" - "sync" "github.com/vdaas/vald/apis/grpc/v1/filter/ingress" "github.com/vdaas/vald/apis/grpc/v1/payload" @@ -28,6 +27,7 @@ import ( "github.com/vdaas/vald/internal/net/grpc" "github.com/vdaas/vald/internal/observability/trace" "github.com/vdaas/vald/internal/strings" + valdsync "github.com/vdaas/vald/internal/sync" ) type Client interface { @@ -40,7 +40,7 @@ type Client interface { type client struct { addrs []string - cl sync.Map + cl valdsync.Map[string, any] c grpc.Client } diff --git a/internal/net/dialer.go b/internal/net/dialer.go index 39251a9c7b..692cc1e058 100644 --- a/internal/net/dialer.go +++ b/internal/net/dialer.go @@ -33,6 +33,7 @@ import ( "github.com/vdaas/vald/internal/net/control" "github.com/vdaas/vald/internal/observability/trace" "github.com/vdaas/vald/internal/safety" + valdsync "github.com/vdaas/vald/internal/sync" "github.com/vdaas/vald/internal/tls" ) @@ -44,7 +45,7 @@ type Dialer interface { } type dialer struct { - dnsCache cacher.Cache + dnsCache cacher.Cache[*dialerCache] enableDNSCache bool dnsCachedOnce sync.Once tlsConfig *tls.Config @@ -59,7 +60,7 @@ type dialer struct { ctrl control.SocketController sockFlg control.SocketFlag dialerDualStack bool - addrs sync.Map + addrs valdsync.Map[string, *addrInfo] der *net.Dialer dialer func(ctx context.Context, network, addr string) (Conn, error) } @@ -121,9 +122,9 @@ func NewDialer(opts ...DialerOption) (der Dialer, err error) { } if d.dnsCache == nil { if d.dnsCache, err = cache.New( - cache.WithExpireDuration(d.dnsCacheExpirationStr), - cache.WithExpireCheckDuration(d.dnsRefreshDurationStr), - cache.WithExpiredHook(d.cacheExpireHook), + cache.WithExpireDuration[*dialerCache](d.dnsCacheExpirationStr), + cache.WithExpireCheckDuration[*dialerCache](d.dnsRefreshDurationStr), + cache.WithExpiredHook[*dialerCache](d.cacheExpireHook), ); err != nil { return nil, err } @@ -146,10 +147,8 @@ func (d *dialer) GetDialer() func(ctx context.Context, network, addr string) (Co func (d *dialer) lookup(ctx context.Context, host string) (dc *dialerCache, err error) { if d.enableDNSCache { - dnsCache, ok := d.dnsCache.Get(host) - if ok && dnsCache != nil { - dc, ok = dnsCache.(*dialerCache) - if ok && dc != nil && len(dc.ips) > 0 { + if dc, ok := d.dnsCache.Get(host); ok { + if dc != nil && len(dc.ips) > 0 { return dc, nil } } @@ -256,12 +255,9 @@ func (d *dialer) cachedDialer(ctx context.Context, network, addr string) (conn C isIP: isV4 || isV6, }) } else { - info, ok := ai.(*addrInfo) - if ok { - host = info.host - port = info.port - isIP = info.isIP - } + host = ai.host + port = ai.port + isIP = ai.isIP } if d.enableDNSCache && !isIP { diff --git a/internal/net/dialer_test.go b/internal/net/dialer_test.go index a228c56643..8ee8625c5a 100644 --- a/internal/net/dialer_test.go +++ b/internal/net/dialer_test.go @@ -273,7 +273,7 @@ func TestNewDialer(t *testing.T) { cmpopts.IgnoreFields(*want, "dialer", "der", "addrs", "dnsCachedOnce", "dnsCache", "ctrl", "tmu"), // skipcq: VET-V0008 cmp.AllowUnexported(*want), - cmp.Comparer(func(x, y cacher.Cache) bool { + cmp.Comparer(func(x, y cacher.Cache[*dialerCache]) bool { if x == nil && y == nil { return true } @@ -493,7 +493,7 @@ func Test_dialer_lookup(t *testing.T) { addr: "google.com", }, opts: []DialerOption{ - WithDNSCache(gache.New()), + WithDNSCache(gache.New[*dialerCache]()), WithDisableDialerDualStack(), }, checkFunc: func(ctx context.Context, w want, got *dialerCache, err error, d *dialer) error { @@ -533,8 +533,8 @@ func Test_dialer_lookup(t *testing.T) { addr: "addr", }, opts: []DialerOption{ - WithDNSCache(func() cacher.Cache { - g := gache.New() + WithDNSCache(func() cacher.Cache[*dialerCache] { + g := gache.New[*dialerCache]() g.Set("addr", &dialerCache{ ips: []string{"999.999.999.999"}, }) @@ -604,7 +604,7 @@ func Test_dialer_StartDialerCache(t *testing.T) { tests := []test{ func() test { addr := "localhost" - ips := []string{} + var ips []string return test{ name: "cache refresh when it is expired", @@ -629,7 +629,7 @@ func Test_dialer_StartDialerCache(t *testing.T) { if !ok { return errors.New("cache not found") } - if val == nil || len(val.(*dialerCache).ips) != ipLen { + if val == nil || len(val.ips) != ipLen { return errors.Errorf("cache is not correct, gotLen: %v, want: %v", val, ipLen) } return nil @@ -923,7 +923,7 @@ func Test_dialer_cachedDialer(t *testing.T) { w.WriteHeader(200) })) - c, err := cache.New() + c, err := cache.New[*dialerCache]() if err != nil { t.Error(err) } @@ -977,7 +977,7 @@ func Test_dialer_cachedDialer(t *testing.T) { })) srv.TLS.InsecureSkipVerify = true - c, err := cache.New() + c, err := cache.New[*dialerCache]() if err != nil { t.Error(err) } @@ -1033,7 +1033,7 @@ func Test_dialer_cachedDialer(t *testing.T) { func() test { addr := "invalid_ip" - c, err := cache.New() + c, err := cache.New[*dialerCache]() if err != nil { t.Error(err) } @@ -1071,7 +1071,7 @@ func Test_dialer_cachedDialer(t *testing.T) { }(), func() test { addr := "google.com" - c, err := cache.New() + c, err := cache.New[*dialerCache]() if err != nil { t.Error(err) } @@ -1124,7 +1124,7 @@ func Test_dialer_cachedDialer(t *testing.T) { addrs[i] = JoinHostPort(hosts[i], ports[i]) } - c, err := cache.New() + c, err := cache.New[*dialerCache]() if err != nil { t.Error(err) } @@ -1225,7 +1225,7 @@ func Test_dialer_cachedDialer(t *testing.T) { t.Error(err) } - c, err := cache.New() + c, err := cache.New[*dialerCache]() if err != nil { t.Error(err) } @@ -1255,8 +1255,8 @@ func Test_dialer_cachedDialer(t *testing.T) { return errors.New("conn is nil") } - c, _ := d.dnsCache.Get(host) - if dc := c.(*dialerCache); dc.cnt != 0 { + dc, _ := d.dnsCache.Get(host) + if dc.cnt != 0 { return errors.Errorf("count do not reset, cnt: %v", dc.cnt) } @@ -1830,7 +1830,7 @@ func Test_dialer_lookupIPAddrs(t *testing.T) { host string } type fields struct { - dnsCache cacher.Cache + dnsCache cacher.Cache[*dialerCache] enableDNSCache bool tlsConfig *tls.Config dnsRefreshDurationStr string diff --git a/internal/net/grpc/client.go b/internal/net/grpc/client.go index c4fe2ef3a1..95dfe07e85 100644 --- a/internal/net/grpc/client.go +++ b/internal/net/grpc/client.go @@ -20,7 +20,6 @@ package grpc import ( "context" "math" - "sync" "sync/atomic" "time" @@ -38,6 +37,7 @@ import ( "github.com/vdaas/vald/internal/safety" "github.com/vdaas/vald/internal/singleflight" "github.com/vdaas/vald/internal/strings" + valdsync "github.com/vdaas/vald/internal/sync" "google.golang.org/grpc" gbackoff "google.golang.org/grpc/backoff" ) @@ -110,7 +110,7 @@ type gRPCClient struct { gbo gbackoff.Config // grpc's original backoff configuration mcd time.Duration // minimum connection timeout duration group singleflight.Group[pool.Conn] - crl sync.Map // connection request list + crl valdsync.Map[string, bool] // connection request list ech <-chan error monitorRunning atomic.Bool @@ -302,18 +302,15 @@ func (g *gRPCClient) StartConnectionMonitor(ctx context.Context) (<-chan error, } } clctx, cancel := context.WithTimeout(ctx, reconnLimitDuration) - g.crl.Range(func(a, bo interface{}) bool { + g.crl.Range(func(addr string, enabled bool) bool { select { case <-clctx.Done(): return false default: - defer g.crl.Delete(a) - addr, ok := a.(string) - if !ok { - return true - } + defer g.crl.Delete(addr) + var p pool.Conn - if enabled, ok := bo.(bool); ok && enabled && g.bo != nil { + if enabled && g.bo != nil { _, err = g.bo.Do(clctx, func(ictx context.Context) (r interface{}, ret bool, err error) { p, err = g.Connect(ictx, addr) return nil, err != nil, err diff --git a/internal/net/grpc/pool/pool_bench_test.go b/internal/net/grpc/pool/pool_bench_test.go index 04efc2cc54..a8d7515c1f 100644 --- a/internal/net/grpc/pool/pool_bench_test.go +++ b/internal/net/grpc/pool/pool_bench_test.go @@ -27,6 +27,7 @@ import ( "github.com/vdaas/vald/internal/log" "github.com/vdaas/vald/internal/log/level" "github.com/vdaas/vald/internal/net" + valdsync "github.com/vdaas/vald/internal/sync" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) @@ -133,7 +134,7 @@ func Benchmark_StaticDial(b *testing.B) { b.Error(err) } - conns := new(sync.Map) + conns := new(valdsync.Map[string, *grpc.ClientConn]) conns.Store(DefaultServerAddr, conn) b.StopTimer() @@ -143,7 +144,7 @@ func Benchmark_StaticDial(b *testing.B) { for i := 0; i < b.N; i++ { val, ok := conns.Load(DefaultServerAddr) if ok { - do(b, val.(*ClientConn)) + do(b, val) } } b.StopTimer() @@ -190,7 +191,7 @@ func BenchmarkParallel_StaticDial(b *testing.B) { b.Error(err) } - conns := new(sync.Map) + conns := new(valdsync.Map[string, *grpc.ClientConn]) conns.Store(DefaultServerAddr, conn) b.StopTimer() @@ -201,7 +202,7 @@ func BenchmarkParallel_StaticDial(b *testing.B) { for pb.Next() { val, ok := conns.Load(DefaultServerAddr) if ok { - do(b, val.(*ClientConn)) + do(b, val) } } }) diff --git a/internal/net/option.go b/internal/net/option.go index 13d22e8c69..a9b31b173d 100644 --- a/internal/net/option.go +++ b/internal/net/option.go @@ -38,7 +38,7 @@ var defaultDialerOptions = []DialerOption{ } // WithDNSCache returns the functional option to set the cache. -func WithDNSCache(c cacher.Cache) DialerOption { +func WithDNSCache(c cacher.Cache[*dialerCache]) DialerOption { return func(d *dialer) { d.dnsCache = c if d.dnsCache != nil { diff --git a/internal/net/option_test.go b/internal/net/option_test.go index 79cda42e7a..6e9f14758d 100644 --- a/internal/net/option_test.go +++ b/internal/net/option_test.go @@ -34,7 +34,7 @@ func TestWithDNSCache(t *testing.T) { t.Parallel() type T = dialer type args struct { - c cacher.Cache + c cacher.Cache[*dialerCache] } type want struct { obj *T @@ -57,7 +57,7 @@ func TestWithDNSCache(t *testing.T) { tests := []test{ func() test { - c := gache.New() + c := gache.New[*dialerCache]() return test{ name: "set cache success", args: args{ diff --git a/internal/singleflight/singleflight.go b/internal/singleflight/singleflight.go index ac669c089d..94e72b149c 100644 --- a/internal/singleflight/singleflight.go +++ b/internal/singleflight/singleflight.go @@ -21,6 +21,8 @@ import ( "context" "sync" "sync/atomic" + + valdsync "github.com/vdaas/vald/internal/sync" ) type call[V any] struct { @@ -36,7 +38,7 @@ type Group[V any] interface { } type group[V any] struct { - m sync.Map + m valdsync.Map[string, *call[V]] } // New returns Group implementation. @@ -49,8 +51,7 @@ func New[V any]() Group[V] { // If duplicate comes, the duplicated call with the same key will wait for the first caller return. // It returns the result and the error of the given function, and whether the result is shared from the first caller. func (g *group[V]) Do(_ context.Context, key string, fn func() (V, error)) (v V, shared bool, err error) { - actual, loaded := g.m.LoadOrStore(key, new(call[V])) - c := actual.(*call[V]) + c, loaded := g.m.LoadOrStore(key, new(call[V])) if loaded { atomic.AddUint64(&c.dups, 1) c.wg.Wait() diff --git a/internal/sync/map.go b/internal/sync/map.go new file mode 100644 index 0000000000..25bb07f071 --- /dev/null +++ b/internal/sync/map.go @@ -0,0 +1,7 @@ +package sync + +import gache "github.com/kpango/gache/v2" + +type Map[K comparable, V any] struct { + gache.Map[K, V] +} diff --git a/pkg/gateway/lb/handler/grpc/aggregation.go b/pkg/gateway/lb/handler/grpc/aggregation.go index a3abe71bf5..3779201327 100644 --- a/pkg/gateway/lb/handler/grpc/aggregation.go +++ b/pkg/gateway/lb/handler/grpc/aggregation.go @@ -35,6 +35,7 @@ import ( "github.com/vdaas/vald/internal/observability/trace" "github.com/vdaas/vald/internal/safety" "github.com/vdaas/vald/internal/slices" + valdsync "github.com/vdaas/vald/internal/sync" ) type Aggregator interface { @@ -365,7 +366,7 @@ type valdStdAggr struct { dch chan DistPayload closed atomic.Bool maxDist atomic.Value - visited sync.Map + visited valdsync.Map[string, any] result []*payload.Object_Distance cancel context.CancelFunc } @@ -495,7 +496,7 @@ type valdPairingHeapAggr struct { num int ph *PairingHeap mu sync.Mutex - visited sync.Map + visited valdsync.Map[string, any] result []*payload.Object_Distance } diff --git a/pkg/gateway/lb/service/gateway.go b/pkg/gateway/lb/service/gateway.go index 72446dd708..1b046637e6 100644 --- a/pkg/gateway/lb/service/gateway.go +++ b/pkg/gateway/lb/service/gateway.go @@ -20,7 +20,6 @@ package service import ( "context" "reflect" - "sync" "sync/atomic" "github.com/vdaas/vald/apis/grpc/v1/vald" @@ -29,6 +28,7 @@ import ( "github.com/vdaas/vald/internal/errors" "github.com/vdaas/vald/internal/net/grpc" "github.com/vdaas/vald/internal/observability/trace" + valdsync "github.com/vdaas/vald/internal/sync" ) type Gateway interface { @@ -102,7 +102,7 @@ func (g *gateway) DoMulti(ctx context.Context, num int, } else { limit = uint32(num) } - var visited sync.Map + var visited valdsync.Map[string, any] err = g.client.GetClient().OrderedRange(sctx, addrs, func(ictx context.Context, addr string, conn *grpc.ClientConn, diff --git a/pkg/manager/index/service/indexer.go b/pkg/manager/index/service/indexer.go index 2251c85cfe..e57bd5ae80 100644 --- a/pkg/manager/index/service/indexer.go +++ b/pkg/manager/index/service/indexer.go @@ -21,7 +21,6 @@ import ( "context" "math" "reflect" - "sync" "sync/atomic" "time" @@ -36,6 +35,7 @@ import ( "github.com/vdaas/vald/internal/net/grpc/status" "github.com/vdaas/vald/internal/observability/trace" "github.com/vdaas/vald/internal/safety" + valdsync "github.com/vdaas/vald/internal/sync" ) type Indexer interface { @@ -54,7 +54,7 @@ type index struct { saveIndexDurationLimit time.Duration saveIndexWaitDuration time.Duration saveIndexTargetAddrCh chan string - schMap sync.Map + schMap valdsync.Map[string, any] concurrency int indexInfos indexInfos indexing atomic.Value // bool