Skip to content

Commit

Permalink
Add gache's generic Map as internal/sync.Map and replace standard syn…
Browse files Browse the repository at this point in the history
…c.Map with it (#2115)

* add internal/sync map.go

* replace sync.Map with vald generic sync map

* replace indexmap with valdsync.Map

* replace nodemap with valdsync.Map

* replace podmetricsmap with valdsync.Map

* replace indexInfos with valdsync.Map

* replace ou

* replace uo

* replace other expunges

* remove redundant type

* style: Format code with prettier and gofumpt

* remove unused definition

---------

Co-authored-by: deepsource-autofix[bot] <62050782+deepsource-autofix[bot]@users.noreply.github.com>
  • Loading branch information
ykadowak and deepsource-autofix[bot] authored Jul 11, 2023
1 parent 9774624 commit e52eb4a
Show file tree
Hide file tree
Showing 33 changed files with 174 additions and 7,538 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ require (
github.com/klauspost/compress v1.15.9
github.com/kpango/fastime v1.1.9
github.com/kpango/fuid v0.0.0-00010101000000-000000000000
github.com/kpango/gache/v2 v2.0.0-00010101000000-000000000000
github.com/kpango/gache/v2 v2.0.9
github.com/kpango/glg v1.6.15
github.com/leanovate/gopter v0.0.0-00010101000000-000000000000
github.com/lucasb-eyer/go-colorful v0.0.0-00010101000000-000000000000
Expand Down Expand Up @@ -457,7 +457,7 @@ require (
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/cpuid/v2 v2.2.4 // indirect
github.com/klauspost/cpuid/v2 v2.2.5 // indirect
github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
Expand Down
17 changes: 5 additions & 12 deletions internal/circuitbreaker/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/log"
valdsync "github.com/vdaas/vald/internal/sync"
)

// NOTE: This variable is for observability package.
Expand All @@ -36,7 +37,7 @@ type CircuitBreaker interface {
}

type breakerManager struct {
m sync.Map // breaker group. key: string, value: *breaker.
m valdsync.Map[string, *breaker]
opts []BreakerOption
}

Expand Down Expand Up @@ -72,24 +73,16 @@ func (bm *breakerManager) Do(ctx context.Context, key string, fn func(ctx contex
mu.Unlock()
}()

var br *breaker
// Pre-loading to prevent a lot of object generation.
obj, ok := bm.m.Load(key)
br, ok := bm.m.Load(key)
if !ok {
br, err = newBreaker(key, bm.opts...)
if err != nil {
return nil, err
}
obj, _ = bm.m.LoadOrStore(key, br)
}
br, ok = obj.(*breaker)
if !ok {
br, err = newBreaker(key, bm.opts...)
if err != nil {
return nil, err
}
bm.m.Store(key, br)
br, _ = bm.m.LoadOrStore(key, br)
}

val, st, err = br.do(ctx, fn)
if err != nil {
switch st {
Expand Down
4 changes: 2 additions & 2 deletions internal/client/v1/client/discoverer/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package discoverer
import (
"context"
"reflect"
"sync"
"sync/atomic"
"time"

Expand All @@ -32,6 +31,7 @@ import (
"github.com/vdaas/vald/internal/net"
"github.com/vdaas/vald/internal/net/grpc"
"github.com/vdaas/vald/internal/safety"
valdsync "github.com/vdaas/vald/internal/sync"
)

type Client interface {
Expand Down Expand Up @@ -344,7 +344,7 @@ func (c *client) disconnectOldAddrs(ctx context.Context, oldAddrs, connectedAddr
if !c.autoconn {
return nil
}
var cur sync.Map
var cur valdsync.Map[string, any] // TODO: Does this have to be a sync.Map not a map?
for _, addr := range connectedAddrs {
cur.Store(addr, struct{}{})
}
Expand Down
4 changes: 2 additions & 2 deletions internal/client/v1/client/filter/egress/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ package egress
import (
"context"
"reflect"
"sync"

"github.com/vdaas/vald/apis/grpc/v1/filter/egress"
"github.com/vdaas/vald/apis/grpc/v1/payload"
"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/net/grpc"
"github.com/vdaas/vald/internal/observability/trace"
"github.com/vdaas/vald/internal/strings"
valdsync "github.com/vdaas/vald/internal/sync"
)

type Client interface {
Expand All @@ -40,7 +40,7 @@ type Client interface {

type client struct {
addrs []string
cl sync.Map
cl valdsync.Map[string, any]
c grpc.Client
}

Expand Down
4 changes: 2 additions & 2 deletions internal/client/v1/client/filter/ingress/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ package ingress
import (
"context"
"reflect"
"sync"

"github.com/vdaas/vald/apis/grpc/v1/filter/ingress"
"github.com/vdaas/vald/apis/grpc/v1/payload"
"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/net/grpc"
"github.com/vdaas/vald/internal/observability/trace"
"github.com/vdaas/vald/internal/strings"
valdsync "github.com/vdaas/vald/internal/sync"
)

type Client interface {
Expand All @@ -40,7 +40,7 @@ type Client interface {

type client struct {
addrs []string
cl sync.Map
cl valdsync.Map[string, any]
c grpc.Client
}

Expand Down
12 changes: 5 additions & 7 deletions internal/net/dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/vdaas/vald/internal/net/control"
"github.com/vdaas/vald/internal/observability/trace"
"github.com/vdaas/vald/internal/safety"
valdsync "github.com/vdaas/vald/internal/sync"
"github.com/vdaas/vald/internal/tls"
)

Expand All @@ -59,7 +60,7 @@ type dialer struct {
ctrl control.SocketController
sockFlg control.SocketFlag
dialerDualStack bool
addrs sync.Map
addrs valdsync.Map[string, *addrInfo]
der *net.Dialer
dialer func(ctx context.Context, network, addr string) (Conn, error)
}
Expand Down Expand Up @@ -254,12 +255,9 @@ func (d *dialer) cachedDialer(ctx context.Context, network, addr string) (conn C
isIP: isV4 || isV6,
})
} else {
info, ok := ai.(*addrInfo)
if ok {
host = info.host
port = info.port
isIP = info.isIP
}
host = ai.host
port = ai.port
isIP = ai.isIP
}

if d.enableDNSCache && !isIP {
Expand Down
15 changes: 6 additions & 9 deletions internal/net/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package grpc
import (
"context"
"math"
"sync"
"sync/atomic"
"time"

Expand All @@ -38,6 +37,7 @@ import (
"github.com/vdaas/vald/internal/safety"
"github.com/vdaas/vald/internal/singleflight"
"github.com/vdaas/vald/internal/strings"
valdsync "github.com/vdaas/vald/internal/sync"
"google.golang.org/grpc"
gbackoff "google.golang.org/grpc/backoff"
)
Expand Down Expand Up @@ -110,7 +110,7 @@ type gRPCClient struct {
gbo gbackoff.Config // grpc's original backoff configuration
mcd time.Duration // minimum connection timeout duration
group singleflight.Group[pool.Conn]
crl sync.Map // connection request list
crl valdsync.Map[string, bool] // connection request list

ech <-chan error
monitorRunning atomic.Bool
Expand Down Expand Up @@ -302,18 +302,15 @@ func (g *gRPCClient) StartConnectionMonitor(ctx context.Context) (<-chan error,
}
}
clctx, cancel := context.WithTimeout(ctx, reconnLimitDuration)
g.crl.Range(func(a, bo interface{}) bool {
g.crl.Range(func(addr string, enabled bool) bool {
select {
case <-clctx.Done():
return false
default:
defer g.crl.Delete(a)
addr, ok := a.(string)
if !ok {
return true
}
defer g.crl.Delete(addr)

var p pool.Conn
if enabled, ok := bo.(bool); ok && enabled && g.bo != nil {
if enabled && g.bo != nil {
_, err = g.bo.Do(clctx, func(ictx context.Context) (r interface{}, ret bool, err error) {
p, err = g.Connect(ictx, addr)
return nil, err != nil, err
Expand Down
9 changes: 5 additions & 4 deletions internal/net/grpc/pool/pool_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/vdaas/vald/internal/log"
"github.com/vdaas/vald/internal/log/level"
"github.com/vdaas/vald/internal/net"
valdsync "github.com/vdaas/vald/internal/sync"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
Expand Down Expand Up @@ -133,7 +134,7 @@ func Benchmark_StaticDial(b *testing.B) {
b.Error(err)
}

conns := new(sync.Map)
conns := new(valdsync.Map[string, *grpc.ClientConn])
conns.Store(DefaultServerAddr, conn)

b.StopTimer()
Expand All @@ -143,7 +144,7 @@ func Benchmark_StaticDial(b *testing.B) {
for i := 0; i < b.N; i++ {
val, ok := conns.Load(DefaultServerAddr)
if ok {
do(b, val.(*ClientConn))
do(b, val)
}
}
b.StopTimer()
Expand Down Expand Up @@ -190,7 +191,7 @@ func BenchmarkParallel_StaticDial(b *testing.B) {
b.Error(err)
}

conns := new(sync.Map)
conns := new(valdsync.Map[string, *grpc.ClientConn])
conns.Store(DefaultServerAddr, conn)

b.StopTimer()
Expand All @@ -201,7 +202,7 @@ func BenchmarkParallel_StaticDial(b *testing.B) {
for pb.Next() {
val, ok := conns.Load(DefaultServerAddr)
if ok {
do(b, val.(*ClientConn))
do(b, val)
}
}
})
Expand Down
7 changes: 4 additions & 3 deletions internal/singleflight/singleflight.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"context"
"sync"
"sync/atomic"

valdsync "github.com/vdaas/vald/internal/sync"
)

type call[V any] struct {
Expand All @@ -36,7 +38,7 @@ type Group[V any] interface {
}

type group[V any] struct {
m sync.Map
m valdsync.Map[string, *call[V]]
}

// New returns Group implementation.
Expand All @@ -49,8 +51,7 @@ func New[V any]() Group[V] {
// If duplicate comes, the duplicated call with the same key will wait for the first caller return.
// It returns the result and the error of the given function, and whether the result is shared from the first caller.
func (g *group[V]) Do(_ context.Context, key string, fn func() (V, error)) (v V, shared bool, err error) {
actual, loaded := g.m.LoadOrStore(key, new(call[V]))
c := actual.(*call[V])
c, loaded := g.m.LoadOrStore(key, new(call[V]))
if loaded {
atomic.AddUint64(&c.dups, 1)
c.wg.Wait()
Expand Down
7 changes: 7 additions & 0 deletions internal/sync/map.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package sync

import gache "github.com/kpango/gache/v2"

type Map[K comparable, V any] struct {
gache.Map[K, V]
}
Loading

0 comments on commit e52eb4a

Please sign in to comment.