Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add gache's generic Map as internal/sync.Map and replace standard sync.Map with it #2115

Merged
merged 13 commits into from
Jul 11, 2023
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ require (
github.com/klauspost/compress v1.15.9
github.com/kpango/fastime v1.1.9
github.com/kpango/fuid v0.0.0-00010101000000-000000000000
github.com/kpango/gache/v2 v2.0.0-00010101000000-000000000000
github.com/kpango/gache/v2 v2.0.9
github.com/kpango/glg v1.6.15
github.com/leanovate/gopter v0.0.0-00010101000000-000000000000
github.com/lucasb-eyer/go-colorful v0.0.0-00010101000000-000000000000
Expand Down Expand Up @@ -457,7 +457,7 @@ require (
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/cpuid/v2 v2.2.4 // indirect
github.com/klauspost/cpuid/v2 v2.2.5 // indirect
github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
Expand Down
17 changes: 5 additions & 12 deletions internal/circuitbreaker/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/log"
valdsync "github.com/vdaas/vald/internal/sync"
)

// NOTE: This variable is for observability package.
Expand All @@ -36,7 +37,7 @@ type CircuitBreaker interface {
}

type breakerManager struct {
m sync.Map // breaker group. key: string, value: *breaker.
m valdsync.Map[string, *breaker]
opts []BreakerOption
}

Expand Down Expand Up @@ -72,24 +73,16 @@ func (bm *breakerManager) Do(ctx context.Context, key string, fn func(ctx contex
mu.Unlock()
}()

var br *breaker
// Pre-loading to prevent a lot of object generation.
obj, ok := bm.m.Load(key)
br, ok := bm.m.Load(key)
if !ok {
br, err = newBreaker(key, bm.opts...)
if err != nil {
return nil, err
}
obj, _ = bm.m.LoadOrStore(key, br)
}
br, ok = obj.(*breaker)
if !ok {
br, err = newBreaker(key, bm.opts...)
if err != nil {
return nil, err
}
bm.m.Store(key, br)
br, _ = bm.m.LoadOrStore(key, br)
}

val, st, err = br.do(ctx, fn)
if err != nil {
switch st {
Expand Down
4 changes: 2 additions & 2 deletions internal/client/v1/client/discoverer/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package discoverer
import (
"context"
"reflect"
"sync"
"sync/atomic"
"time"

Expand All @@ -32,6 +31,7 @@ import (
"github.com/vdaas/vald/internal/net"
"github.com/vdaas/vald/internal/net/grpc"
"github.com/vdaas/vald/internal/safety"
valdsync "github.com/vdaas/vald/internal/sync"
)

type Client interface {
Expand Down Expand Up @@ -344,7 +344,7 @@ func (c *client) disconnectOldAddrs(ctx context.Context, oldAddrs, connectedAddr
if !c.autoconn {
return nil
}
var cur sync.Map
var cur valdsync.Map[string, any] // TODO: Does this have to be a sync.Map not a map?
ykadowak marked this conversation as resolved.
Show resolved Hide resolved
ykadowak marked this conversation as resolved.
Show resolved Hide resolved
for _, addr := range connectedAddrs {
cur.Store(addr, struct{}{})
}
Expand Down
4 changes: 2 additions & 2 deletions internal/client/v1/client/filter/egress/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ package egress
import (
"context"
"reflect"
"sync"

"github.com/vdaas/vald/apis/grpc/v1/filter/egress"
"github.com/vdaas/vald/apis/grpc/v1/payload"
"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/net/grpc"
"github.com/vdaas/vald/internal/observability/trace"
"github.com/vdaas/vald/internal/strings"
valdsync "github.com/vdaas/vald/internal/sync"
)

type Client interface {
Expand All @@ -40,7 +40,7 @@ type Client interface {

type client struct {
addrs []string
cl sync.Map
cl valdsync.Map[string, any]
c grpc.Client
}

Expand Down
4 changes: 2 additions & 2 deletions internal/client/v1/client/filter/ingress/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ package ingress
import (
"context"
"reflect"
"sync"

"github.com/vdaas/vald/apis/grpc/v1/filter/ingress"
"github.com/vdaas/vald/apis/grpc/v1/payload"
"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/net/grpc"
"github.com/vdaas/vald/internal/observability/trace"
"github.com/vdaas/vald/internal/strings"
valdsync "github.com/vdaas/vald/internal/sync"
)

type Client interface {
Expand All @@ -40,7 +40,7 @@ type Client interface {

type client struct {
addrs []string
cl sync.Map
cl valdsync.Map[string, any]
c grpc.Client
}

Expand Down
12 changes: 5 additions & 7 deletions internal/net/dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/vdaas/vald/internal/net/control"
"github.com/vdaas/vald/internal/observability/trace"
"github.com/vdaas/vald/internal/safety"
valdsync "github.com/vdaas/vald/internal/sync"
"github.com/vdaas/vald/internal/tls"
)

Expand All @@ -59,7 +60,7 @@ type dialer struct {
ctrl control.SocketController
sockFlg control.SocketFlag
dialerDualStack bool
addrs sync.Map
addrs valdsync.Map[string, *addrInfo]
der *net.Dialer
dialer func(ctx context.Context, network, addr string) (Conn, error)
}
Expand Down Expand Up @@ -254,12 +255,9 @@ func (d *dialer) cachedDialer(ctx context.Context, network, addr string) (conn C
isIP: isV4 || isV6,
})
} else {
info, ok := ai.(*addrInfo)
if ok {
host = info.host
port = info.port
isIP = info.isIP
}
host = ai.host
port = ai.port
isIP = ai.isIP
}

if d.enableDNSCache && !isIP {
Expand Down
15 changes: 6 additions & 9 deletions internal/net/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package grpc
import (
"context"
"math"
"sync"
"sync/atomic"
"time"

Expand All @@ -38,6 +37,7 @@ import (
"github.com/vdaas/vald/internal/safety"
"github.com/vdaas/vald/internal/singleflight"
"github.com/vdaas/vald/internal/strings"
valdsync "github.com/vdaas/vald/internal/sync"
"google.golang.org/grpc"
gbackoff "google.golang.org/grpc/backoff"
)
Expand Down Expand Up @@ -110,7 +110,7 @@ type gRPCClient struct {
gbo gbackoff.Config // grpc's original backoff configuration
mcd time.Duration // minimum connection timeout duration
group singleflight.Group[pool.Conn]
crl sync.Map // connection request list
crl valdsync.Map[string, bool] // connection request list

ech <-chan error
monitorRunning atomic.Bool
Expand Down Expand Up @@ -302,18 +302,15 @@ func (g *gRPCClient) StartConnectionMonitor(ctx context.Context) (<-chan error,
}
}
clctx, cancel := context.WithTimeout(ctx, reconnLimitDuration)
g.crl.Range(func(a, bo interface{}) bool {
g.crl.Range(func(addr string, enabled bool) bool {
select {
case <-clctx.Done():
return false
default:
defer g.crl.Delete(a)
addr, ok := a.(string)
if !ok {
return true
}
defer g.crl.Delete(addr)

var p pool.Conn
if enabled, ok := bo.(bool); ok && enabled && g.bo != nil {
if enabled && g.bo != nil {
_, err = g.bo.Do(clctx, func(ictx context.Context) (r interface{}, ret bool, err error) {
p, err = g.Connect(ictx, addr)
return nil, err != nil, err
Expand Down
9 changes: 5 additions & 4 deletions internal/net/grpc/pool/pool_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/vdaas/vald/internal/log"
"github.com/vdaas/vald/internal/log/level"
"github.com/vdaas/vald/internal/net"
valdsync "github.com/vdaas/vald/internal/sync"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
Expand Down Expand Up @@ -133,7 +134,7 @@ func Benchmark_StaticDial(b *testing.B) {
b.Error(err)
}

conns := new(sync.Map)
conns := new(valdsync.Map[string, *grpc.ClientConn])
conns.Store(DefaultServerAddr, conn)

b.StopTimer()
Expand All @@ -143,7 +144,7 @@ func Benchmark_StaticDial(b *testing.B) {
for i := 0; i < b.N; i++ {
val, ok := conns.Load(DefaultServerAddr)
if ok {
do(b, val.(*ClientConn))
do(b, val)
}
}
b.StopTimer()
Expand Down Expand Up @@ -190,7 +191,7 @@ func BenchmarkParallel_StaticDial(b *testing.B) {
b.Error(err)
}

conns := new(sync.Map)
conns := new(valdsync.Map[string, *grpc.ClientConn])
conns.Store(DefaultServerAddr, conn)

b.StopTimer()
Expand All @@ -201,7 +202,7 @@ func BenchmarkParallel_StaticDial(b *testing.B) {
for pb.Next() {
val, ok := conns.Load(DefaultServerAddr)
if ok {
do(b, val.(*ClientConn))
do(b, val)
}
}
})
Expand Down
7 changes: 4 additions & 3 deletions internal/singleflight/singleflight.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"context"
"sync"
"sync/atomic"

valdsync "github.com/vdaas/vald/internal/sync"
)

type call[V any] struct {
Expand All @@ -36,7 +38,7 @@ type Group[V any] interface {
}

type group[V any] struct {
m sync.Map
m valdsync.Map[string, *call[V]]
}

// New returns Group implementation.
Expand All @@ -49,8 +51,7 @@ func New[V any]() Group[V] {
// If duplicate comes, the duplicated call with the same key will wait for the first caller return.
// It returns the result and the error of the given function, and whether the result is shared from the first caller.
func (g *group[V]) Do(_ context.Context, key string, fn func() (V, error)) (v V, shared bool, err error) {
actual, loaded := g.m.LoadOrStore(key, new(call[V]))
c := actual.(*call[V])
c, loaded := g.m.LoadOrStore(key, new(call[V]))
if loaded {
atomic.AddUint64(&c.dups, 1)
c.wg.Wait()
Expand Down
7 changes: 7 additions & 0 deletions internal/sync/map.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package sync

import gache "github.com/kpango/gache/v2"
ykadowak marked this conversation as resolved.
Show resolved Hide resolved

type Map[K comparable, V any] struct {
gache.Map[K, V]
}
Loading