Skip to content

Commit

Permalink
[RESET THIS]
Browse files Browse the repository at this point in the history
  • Loading branch information
ykadowak committed Dec 20, 2023
1 parent 6f10e65 commit 86e8467
Show file tree
Hide file tree
Showing 9 changed files with 158 additions and 10 deletions.
1 change: 1 addition & 0 deletions charts/vald/templates/agent/readreplica/svc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ spec:
app: {{ $readreplica.name }}-{{ $id }}
app.kubernetes.io/name: {{ $valdname }}
type: ClusterIP
clusterIP: None
{{- if $agent.externalTrafficPolicy }}
externalTrafficPolicy: {{ $agent.externalTrafficPolicy }}
{{- end }}
Expand Down
19 changes: 19 additions & 0 deletions charts/vald/templates/gateway/lb/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
{{- $gateway := .Values.gateway.lb -}}
{{- $agent := .Values.agent -}}
{{- $discoverer := .Values.discoverer -}}
{{- $readreplica := .Values.agent.readreplica -}}
{{- $release := .Release -}}
{{- if $gateway.enabled }}
apiVersion: v1
kind: ConfigMap
Expand Down Expand Up @@ -49,6 +51,7 @@ data:
agent_namespace: {{ $gateway.gateway_config.agent_namespace | quote }}
node_name: {{ $gateway.gateway_config.node_name | quote }}
index_replica: {{ $gateway.gateway_config.index_replica }}
readreplica_replicas: {{ $readreplica.replica }}
discoverer:
duration: {{ $gateway.gateway_config.discoverer.duration }}
client:
Expand All @@ -64,4 +67,20 @@ data:
agent_client_options:
{{- include "vald.grpc.client.addrs" (dict "Valued" $gateway.gateway_config.discoverer.agent_client_options.addrs) | nindent 10 }}
{{- include "vald.grpc.client" (dict "Values" $gateway.gateway_config.discoverer.agent_client_options "default" .Values.defaults.grpc.client) | nindent 10 }}
{{- if $readreplica.enabled }}
read_replica_client:
client:
{{- $discovererClient := $gateway.gateway_config.discoverer.client }}
{{- $readReplicaPort := $agent.server_config.servers.grpc.port }}
{{- $defaultReadReplicaPort := default .Values.defaults.server_config.servers.grpc.port $readReplicaPort }}
{{- $readReplicaAddrs := list }}
{{- range $i := until (int $agent.minReplicas) }}
{{- $addr := printf "%s-%d.%s.svc.cluster.local:%d" $readreplica.name $i $release.Namespace (int64 $defaultReadReplicaPort) }}
{{- $readReplicaAddrs = append $readReplicaAddrs $addr }}
{{- end }}
{{- $readReplicaAddrs := dict "Values" $discovererClient.addrs "default" $readReplicaAddrs }}
{{- include "vald.grpc.client.addrs" $readReplicaAddrs | nindent 10 }}
{{- $readReplicaGRPCclient := dict "Values" $discovererClient "default" .Values.defaults.grpc.client }}
{{- include "vald.grpc.client" $readReplicaGRPCclient | nindent 10 }}
{{- end }}
{{- end }}
50 changes: 50 additions & 0 deletions internal/client/v1/client/discoverer/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ import (
type Client interface {
Start(ctx context.Context) (<-chan error, error)
GetAddrs(ctx context.Context) []string
GetReadAddrs(ctx context.Context) []string
GetClient() grpc.Client
GetReadClient() grpc.Client
}

type client struct {
Expand All @@ -56,6 +58,11 @@ type client struct {
name string
namespace string
nodeName string
// read replica related below
readClient grpc.Client
readAddrs atomic.Pointer[[]string]
readReplicaReplicas uint64
roundRobin atomic.Uint64
}

func New(opts ...Option) (d Client, err error) {
Expand All @@ -74,6 +81,14 @@ func (c *client) Start(ctx context.Context) (<-chan error, error) {
return nil, err
}

var rrech <-chan error
if c.readClient != nil {
rrech, err = c.readClient.StartConnectionMonitor(ctx)
if err != nil {
return nil, err
}
}

ech := make(chan error, 100)
addrs, err := c.dnsDiscovery(ctx, ech)
if err != nil {
Expand Down Expand Up @@ -134,6 +149,7 @@ func (c *client) Start(ctx context.Context) (<-chan error, error) {
return finalize()
case err = <-dech:
case err = <-aech:
case err = <-rrech:
case <-dt.C:
err = c.discover(ctx, ech)
}
Expand Down Expand Up @@ -168,12 +184,45 @@ func (c *client) GetAddrs(ctx context.Context) (addrs []string) {
return addrs
}

func (c *client) GetReadAddrs(ctx context.Context) []string {
a := c.readAddrs.Load()
if a == nil {
return nil
}
return *a
}

func (c *client) GetClient() grpc.Client {
return c.client
}

func (c *client) GetReadClient() grpc.Client {
// read replica
// 1. primary + svc cluster IP *n でここで比率を見て呼び分ける
// read replicaのreplica数がHPAなどで高速に変動する場合に、実装がシンプルそうなのでこちらが良さそう
// 2. primary + ipsで単純にラウンドロビン

// round robin with c.client and c.readClient everytime it's called
// with a ratio of primary + read replica deployment replicas
// TODO: is this atomic operation really worth it?
var new uint64
for {
cur := c.roundRobin.Load()
new = (cur + 1) % (c.readReplicaReplicas + 1)
if c.roundRobin.CompareAndSwap(cur, new) {
break
}
}
if new == 0 || c.readClient == nil {
return c.client
// return c.readClient // DEBUG:
}
return c.readClient
}

func (c *client) connect(ctx context.Context, addr string) (err error) {
if c.autoconn && c.client != nil {
// FIXME: この中で重複管理されている
_, err = c.client.Connect(ctx, addr)
if err != nil {
return err
Expand Down Expand Up @@ -323,6 +372,7 @@ func (c *client) discoverAddrs(ctx context.Context, nodes *payload.Info_Nodes, e
len(node.GetPods().GetPods()) > i &&
len(node.GetPods().GetPods()[i].GetIp()) != 0 {
addr := net.JoinHostPort(node.GetPods().GetPods()[i].GetIp(), uint16(c.port))
// FIXME: ここで実際にgrpcサーバーへ接続する
if err = c.connect(ctx, addr); err != nil {
select {
case <-ctx.Done():
Expand Down
14 changes: 14 additions & 0 deletions internal/client/v1/client/discoverer/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,13 @@ func WithDiscovererClient(gc grpc.Client) Option {
}
}

func WithReadReplicaClient(gc grpc.Client) Option {
return func(c *client) error {
c.readClient = gc
return nil
}
}

func WithDiscoverDuration(dur string) Option {
return func(c *client) error {
d, err := timeutil.Parse(dur)
Expand Down Expand Up @@ -142,3 +149,10 @@ func WithErrGroup(eg errgroup.Group) Option {
return nil
}
}

func WithReadReplicaReplicas(num uint64) Option {
return func(c *client) error {
c.readReplicaReplicas = num
return nil
}
}
29 changes: 29 additions & 0 deletions internal/config/lb.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ type LB struct {
// IndexReplica represents index replication count
IndexReplica int `json:"index_replica" yaml:"index_replica"`

// ReadReplicaReplicas represents replica count of read replica Deployment
ReadReplicaReplicas uint64 `json:"read_replica_replicas" yaml:"read_replica_replicas"`

// ReadReplicaClient represents read replica client configuration
ReadReplicaClient ReadReplicaClient `json:"read_replica_client" yaml:"read_replica_client"`

// Discoverer represent agent discoverer service configuration
Discoverer *DiscovererClient `json:"discoverer" yaml:"discoverer"`

Expand All @@ -56,3 +62,26 @@ func (g *LB) Bind() *LB {
}
return g
}

// ReadReplicaClient
type ReadReplicaClient struct {
Duration string `json:"duration" yaml:"duration"`
Client *GRPCClient `json:"client" yaml:"client"`
AgentClientOptions *GRPCClient `json:"agent_client_options" yaml:"agent_client_options"`
}

// Bind binds the actual data from the ReadReplicaClient receiver field.
func (d *ReadReplicaClient) Bind() *ReadReplicaClient {
d.Duration = GetActualValue(d.Duration)
if d.Client != nil {
d.Client.Bind()
} else {
d.Client = newGRPCClientConfig()
}
if d.AgentClientOptions != nil {
d.AgentClientOptions.Bind()
} else {
d.AgentClientOptions = newGRPCClientConfig()
}
return d
}
3 changes: 2 additions & 1 deletion pkg/gateway/lb/handler/grpc/aggregation.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/vdaas/vald/internal/net/grpc/status"
"github.com/vdaas/vald/internal/observability/trace"
"github.com/vdaas/vald/internal/sync"
"github.com/vdaas/vald/pkg/gateway/lb/service"
)

type Aggregator interface {
Expand Down Expand Up @@ -68,7 +69,7 @@ func (s *server) aggregationSearch(ctx context.Context, aggr Aggregator, cfg *pa

ctx, cancel := context.WithTimeout(ctx, timeout)
aggr.Start(ctx)
err = s.gateway.BroadCast(ctx, func(ctx context.Context, target string, vc vald.Client, copts ...grpc.CallOption) error {
err = s.gateway.BroadCast(ctx, service.READ, func(ctx context.Context, target string, vc vald.Client, copts ...grpc.CallOption) error {
sctx, sspan := trace.StartSpan(grpc.WrapGRPCMethod(ctx, "BroadCast/"+target), apiName+"/aggregationSearch/"+target)
defer func() {
if sspan != nil {
Expand Down
12 changes: 6 additions & 6 deletions pkg/gateway/lb/handler/grpc/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (s *server) exists(ctx context.Context, uuid string) (id *payload.Object_ID
defer close(ich)
defer close(ech)
var once sync.Once
ech <- s.gateway.BroadCast(ctx, func(ctx context.Context, target string, vc vald.Client, copts ...grpc.CallOption) error {
ech <- s.gateway.BroadCast(ctx, service.READ, func(ctx context.Context, target string, vc vald.Client, copts ...grpc.CallOption) error {
sctx, sspan := trace.StartSpan(grpc.WrapGRPCMethod(ctx, "BroadCast/"+target), apiName+"/exists/BroadCast/"+target)
defer func() {
if sspan != nil {
Expand Down Expand Up @@ -1653,7 +1653,7 @@ func (s *server) Update(ctx context.Context, req *payload.Update_Request) (res *
Ips: make([]string, 0, s.replica),
}
)
err = s.gateway.BroadCast(ctx, func(ctx context.Context, target string, vc vald.Client, copts ...grpc.CallOption) (err error) {
err = s.gateway.BroadCast(ctx, service.WRITE, func(ctx context.Context, target string, vc vald.Client, copts ...grpc.CallOption) (err error) {
ctx, span := trace.StartSpan(grpc.WrapGRPCMethod(ctx, "BroadCast/"+target), apiName+"/"+vald.UpdateRPCName+"/"+target)
defer func() {
if span != nil {
Expand Down Expand Up @@ -2562,7 +2562,7 @@ func (s *server) Remove(ctx context.Context, req *payload.Remove_Request) (locs
Ips: make([]string, 0, s.replica),
}
ls := make([]string, 0, s.replica)
err = s.gateway.BroadCast(ctx, func(ctx context.Context, target string, vc vald.Client, copts ...grpc.CallOption) (err error) {
err = s.gateway.BroadCast(ctx, service.WRITE, func(ctx context.Context, target string, vc vald.Client, copts ...grpc.CallOption) (err error) {
ctx, span := trace.StartSpan(grpc.WrapGRPCMethod(ctx, "BroadCast/"+target), apiName+"/"+vald.RemoveRPCName+"/"+target)
defer func() {
if span != nil {
Expand Down Expand Up @@ -2778,7 +2778,7 @@ func (s *server) RemoveByTimestamp(ctx context.Context, req *payload.Remove_Time
visited := make(map[string]int) // map[uuid: position of locs]
locs = new(payload.Object_Locations)

err := s.gateway.BroadCast(ctx, func(ctx context.Context, target string, vc vald.Client, copts ...grpc.CallOption) (err error) {
err := s.gateway.BroadCast(ctx, service.WRITE, func(ctx context.Context, target string, vc vald.Client, copts ...grpc.CallOption) (err error) {
sctx, sspan := trace.StartSpan(grpc.WithGRPCMethod(ctx, "BroadCast/"+target), apiName+"/removeByTimestamp/BroadCast/"+target)
defer func() {
if sspan != nil {
Expand Down Expand Up @@ -2905,7 +2905,7 @@ func (s *server) getObject(ctx context.Context, uuid string) (vec *payload.Objec
defer close(vch)
defer close(ech)
var once sync.Once
ech <- s.gateway.BroadCast(ctx, func(ctx context.Context, target string, vc vald.Client, copts ...grpc.CallOption) error {
ech <- s.gateway.BroadCast(ctx, service.READ, func(ctx context.Context, target string, vc vald.Client, copts ...grpc.CallOption) error {
sctx, sspan := trace.StartSpan(grpc.WrapGRPCMethod(ctx, "BroadCast/"+target), apiName+"/getObject/BroadCast/"+target)
defer func() {
if sspan != nil {
Expand Down Expand Up @@ -3143,7 +3143,7 @@ func (s *server) StreamListObject(req *payload.Object_List_Request, stream vald.
defer cancel()

var rmu, smu sync.Mutex
err := s.gateway.BroadCast(ctx, func(ctx context.Context, target string, vc vald.Client, copts ...grpc.CallOption) error {
err := s.gateway.BroadCast(ctx, service.READ, func(ctx context.Context, target string, vc vald.Client, copts ...grpc.CallOption) error {
ctx, sspan := trace.StartSpan(grpc.WrapGRPCMethod(ctx, "BroadCast/"+target), apiName+"/"+vald.StreamListObjectRPCName+"/"+target)
defer func() {
if sspan != nil {
Expand Down
28 changes: 25 additions & 3 deletions pkg/gateway/lb/service/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,16 @@ type Gateway interface {
Addrs(ctx context.Context) []string
DoMulti(ctx context.Context, num int,
f func(ctx context.Context, target string, ac vald.Client, copts ...grpc.CallOption) error) error
BroadCast(ctx context.Context,
BroadCast(ctx context.Context, kind BroadCastKind,
f func(ctx context.Context, target string, ac vald.Client, copts ...grpc.CallOption) error) error
}

type BroadCastKind int
const (
READ BroadCastKind = iota
WRITE
)

type gateway struct {
client discoverer.Client
eg errgroup.Group
Expand All @@ -60,7 +66,9 @@ func (g *gateway) Start(ctx context.Context) (<-chan error, error) {
return g.client.Start(ctx)
}

func (g *gateway) BroadCast(ctx context.Context,
// FIXME: そもそも呼ばれているメソッドがread系かwrite系かをどこかで判断する必要があるが、
// 今後もメソッドは増えていくのだからBroadCastとBroadCastReadみたいに分けて、呼び出し側で判断するべき
func (g *gateway) BroadCast(ctx context.Context, kind BroadCastKind,
f func(ctx context.Context, target string, ac vald.Client, copts ...grpc.CallOption) error,
) (err error) {
fctx, span := trace.StartSpan(ctx, "vald/gateway-lb/service/Gateway.BroadCast")
Expand All @@ -69,7 +77,20 @@ func (g *gateway) BroadCast(ctx context.Context,
span.End()
}
}()
return g.client.GetClient().RangeConcurrent(fctx, -1, func(ictx context.Context,

// select read or write clinet which is connected to primary agent or read replica agent
// GetReadClient includes the client to the primary agent so
// it works even when there is no read replica
var client grpc.Client
switch kind {
case READ:
client = g.client.GetReadClient()
case WRITE:
client = g.client.GetClient()
}

// FIXME: broadcastはsvc経由でロードバランスされてるっぽい
return client.RangeConcurrent(fctx, -1, func(ictx context.Context,
addr string, conn *grpc.ClientConn, copts ...grpc.CallOption,
) (err error) {
select {
Expand All @@ -85,6 +106,7 @@ func (g *gateway) BroadCast(ctx context.Context,
})
}

// DoMultiはWrite系しか使用しないので、普通にGetClientしていれば良い。現状Update or Insertのみ
func (g *gateway) DoMulti(ctx context.Context, num int,
f func(ctx context.Context, target string, ac vald.Client, copts ...grpc.CallOption) error,
) (err error) {
Expand Down
12 changes: 12 additions & 0 deletions pkg/gateway/lb/usecase/vald.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,16 @@ func New(cfg *config.Data) (r runner.Runner, err error) {
if err != nil {
return nil, err
}

rrOpts, err := cfg.Gateway.ReadReplicaClient.Client.Opts()
if err != nil {
return nil, err
}
rrOpts = append(rrOpts,
grpc.WithErrGroup(eg),
grpc.WithConnectionPoolSize(int(cfg.Gateway.ReadReplicaReplicas)),
)

// skipcq: CRT-D0001
dopts := append(
cOpts,
Expand All @@ -75,9 +85,11 @@ func New(cfg *config.Data) (r runner.Runner, err error) {
discoverer.WithPort(cfg.Gateway.AgentPort),
discoverer.WithServiceDNSARecord(cfg.Gateway.AgentDNS),
discoverer.WithDiscovererClient(grpc.New(dopts...)),
discoverer.WithReadReplicaClient(grpc.New(rrOpts...)),
discoverer.WithDiscoverDuration(cfg.Gateway.Discoverer.Duration),
discoverer.WithOptions(aopts...),
discoverer.WithNodeName(cfg.Gateway.NodeName),
discoverer.WithReadReplicaReplicas(cfg.Gateway.ReadReplicaReplicas),
)
if err != nil {
return nil, err
Expand Down

0 comments on commit 86e8467

Please sign in to comment.