diff --git a/charts/vald/templates/agent/readreplica/svc.yaml b/charts/vald/templates/agent/readreplica/svc.yaml index 441c00c5cb2..e4d94b78d49 100644 --- a/charts/vald/templates/agent/readreplica/svc.yaml +++ b/charts/vald/templates/agent/readreplica/svc.yaml @@ -50,6 +50,7 @@ spec: app: {{ $readreplica.name }}-{{ $id }} app.kubernetes.io/name: {{ $valdname }} type: ClusterIP + clusterIP: None {{- if $agent.externalTrafficPolicy }} externalTrafficPolicy: {{ $agent.externalTrafficPolicy }} {{- end }} diff --git a/charts/vald/templates/gateway/lb/configmap.yaml b/charts/vald/templates/gateway/lb/configmap.yaml index 430040b9296..6be0a7a5060 100644 --- a/charts/vald/templates/gateway/lb/configmap.yaml +++ b/charts/vald/templates/gateway/lb/configmap.yaml @@ -16,6 +16,8 @@ {{- $gateway := .Values.gateway.lb -}} {{- $agent := .Values.agent -}} {{- $discoverer := .Values.discoverer -}} +{{- $readreplica := .Values.agent.readreplica -}} +{{- $release := .Release -}} {{- if $gateway.enabled }} apiVersion: v1 kind: ConfigMap @@ -49,6 +51,7 @@ data: agent_namespace: {{ $gateway.gateway_config.agent_namespace | quote }} node_name: {{ $gateway.gateway_config.node_name | quote }} index_replica: {{ $gateway.gateway_config.index_replica }} + readreplica_replicas: {{ $readreplica.replica }} discoverer: duration: {{ $gateway.gateway_config.discoverer.duration }} client: @@ -64,4 +67,20 @@ data: agent_client_options: {{- include "vald.grpc.client.addrs" (dict "Valued" $gateway.gateway_config.discoverer.agent_client_options.addrs) | nindent 10 }} {{- include "vald.grpc.client" (dict "Values" $gateway.gateway_config.discoverer.agent_client_options "default" .Values.defaults.grpc.client) | nindent 10 }} + {{- if $readreplica.enabled }} + read_replica_client: + client: + {{- $discovererClient := $gateway.gateway_config.discoverer.client }} + {{- $readReplicaPort := $agent.server_config.servers.grpc.port }} + {{- $defaultReadReplicaPort := default 
.Values.defaults.server_config.servers.grpc.port $readReplicaPort }} + {{- $readReplicaAddrs := list }} + {{- range $i := until (int $agent.minReplicas) }} + {{- $addr := printf "%s-%d.%s.svc.cluster.local:%d" $readreplica.name $i $release.Namespace (int64 $defaultReadReplicaPort) }} + {{- $readReplicaAddrs = append $readReplicaAddrs $addr }} + {{- end }} + {{- $readReplicaAddrs := dict "Values" $discovererClient.addrs "default" $readReplicaAddrs }} + {{- include "vald.grpc.client.addrs" $readReplicaAddrs | nindent 10 }} + {{- $readReplicaGRPCclient := dict "Values" $discovererClient "default" .Values.defaults.grpc.client }} + {{- include "vald.grpc.client" $readReplicaGRPCclient | nindent 10 }} + {{- end }} {{- end }} diff --git a/internal/client/v1/client/discoverer/discover.go b/internal/client/v1/client/discoverer/discover.go index 89a43f2a8cb..43fd6f526ba 100644 --- a/internal/client/v1/client/discoverer/discover.go +++ b/internal/client/v1/client/discoverer/discover.go @@ -37,7 +37,9 @@ import ( type Client interface { Start(ctx context.Context) (<-chan error, error) GetAddrs(ctx context.Context) []string + GetReadAddrs(ctx context.Context) []string GetClient() grpc.Client + GetReadClient() grpc.Client } type client struct { @@ -56,6 +58,11 @@ type client struct { name string namespace string nodeName string + // read replica related below + readClient grpc.Client + readAddrs atomic.Pointer[[]string] + readReplicaReplicas uint64 + roundRobin atomic.Uint64 } func New(opts ...Option) (d Client, err error) { @@ -74,6 +81,14 @@ func (c *client) Start(ctx context.Context) (<-chan error, error) { return nil, err } + var rrech <-chan error + if c.readClient != nil { + rrech, err = c.readClient.StartConnectionMonitor(ctx) + if err != nil { + return nil, err + } + } + ech := make(chan error, 100) addrs, err := c.dnsDiscovery(ctx, ech) if err != nil { @@ -134,6 +149,7 @@ func (c *client) Start(ctx context.Context) (<-chan error, error) { return finalize() case err = 
<-dech: case err = <-aech: + case err = <-rrech: case <-dt.C: err = c.discover(ctx, ech) } @@ -168,12 +184,45 @@ func (c *client) GetAddrs(ctx context.Context) (addrs []string) { return addrs } +func (c *client) GetReadAddrs(ctx context.Context) []string { + a := c.readAddrs.Load() + if a == nil { + return nil + } + return *a +} + func (c *client) GetClient() grpc.Client { return c.client } +func (c *client) GetReadClient() grpc.Client { + // read replica + // 1. primary + svc cluster IP *n でここで比率を見て呼び分ける + // read replicaのreplica数がHPAなどで高速に変動する場合に、実装がシンプルそうなのでこちらが良さそう + // 2. primary + ipsで単純にラウンドロビン + + // round robin with c.client and c.readClient everytime it's called + // with a ratio of primary + read replica deployment replicas + // TODO: is this atomic operation really worth it? + var new uint64 + for { + cur := c.roundRobin.Load() + new = (cur + 1) % (c.readReplicaReplicas + 1) + if c.roundRobin.CompareAndSwap(cur, new) { + break + } + } + if new == 0 || c.readClient == nil { + return c.client + // return c.readClient // DEBUG: + } + return c.readClient +} + func (c *client) connect(ctx context.Context, addr string) (err error) { if c.autoconn && c.client != nil { + // FIXME: この中で重複管理されている _, err = c.client.Connect(ctx, addr) if err != nil { return err @@ -323,6 +372,7 @@ func (c *client) discoverAddrs(ctx context.Context, nodes *payload.Info_Nodes, e len(node.GetPods().GetPods()) > i && len(node.GetPods().GetPods()[i].GetIp()) != 0 { addr := net.JoinHostPort(node.GetPods().GetPods()[i].GetIp(), uint16(c.port)) + // FIXME: ここで実際にgrpcサーバーへ接続する if err = c.connect(ctx, addr); err != nil { select { case <-ctx.Done(): diff --git a/internal/client/v1/client/discoverer/option.go b/internal/client/v1/client/discoverer/option.go index aca62c61f2d..23bdff62296 100644 --- a/internal/client/v1/client/discoverer/option.go +++ b/internal/client/v1/client/discoverer/option.go @@ -68,6 +68,13 @@ func WithDiscovererClient(gc grpc.Client) Option { } } +func 
WithReadReplicaClient(gc grpc.Client) Option { + return func(c *client) error { + c.readClient = gc + return nil + } +} + func WithDiscoverDuration(dur string) Option { return func(c *client) error { d, err := timeutil.Parse(dur) @@ -142,3 +149,10 @@ func WithErrGroup(eg errgroup.Group) Option { return nil } } + +func WithReadReplicaReplicas(num uint64) Option { + return func(c *client) error { + c.readReplicaReplicas = num + return nil + } +} diff --git a/internal/config/lb.go b/internal/config/lb.go index 0e6a429f2f4..467098e43a3 100644 --- a/internal/config/lb.go +++ b/internal/config/lb.go @@ -37,6 +37,12 @@ type LB struct { // IndexReplica represents index replication count IndexReplica int `json:"index_replica" yaml:"index_replica"` + // ReadReplicaReplicas represents replica count of read replica Deployment + ReadReplicaReplicas uint64 `json:"read_replica_replicas" yaml:"read_replica_replicas"` + + // ReadReplicaClient represents read replica client configuration + ReadReplicaClient ReadReplicaClient `json:"read_replica_client" yaml:"read_replica_client"` + // Discoverer represent agent discoverer service configuration Discoverer *DiscovererClient `json:"discoverer" yaml:"discoverer"` @@ -56,3 +62,26 @@ func (g *LB) Bind() *LB { } return g } + +// ReadReplicaClient +type ReadReplicaClient struct { + Duration string `json:"duration" yaml:"duration"` + Client *GRPCClient `json:"client" yaml:"client"` + AgentClientOptions *GRPCClient `json:"agent_client_options" yaml:"agent_client_options"` +} + +// Bind binds the actual data from the ReadReplicaClient receiver field. 
+func (d *ReadReplicaClient) Bind() *ReadReplicaClient { + d.Duration = GetActualValue(d.Duration) + if d.Client != nil { + d.Client.Bind() + } else { + d.Client = newGRPCClientConfig() + } + if d.AgentClientOptions != nil { + d.AgentClientOptions.Bind() + } else { + d.AgentClientOptions = newGRPCClientConfig() + } + return d +} diff --git a/pkg/gateway/lb/handler/grpc/aggregation.go b/pkg/gateway/lb/handler/grpc/aggregation.go index 58f749eee04..26b51bf5472 100644 --- a/pkg/gateway/lb/handler/grpc/aggregation.go +++ b/pkg/gateway/lb/handler/grpc/aggregation.go @@ -33,6 +33,7 @@ import ( "github.com/vdaas/vald/internal/net/grpc/status" "github.com/vdaas/vald/internal/observability/trace" "github.com/vdaas/vald/internal/sync" + "github.com/vdaas/vald/pkg/gateway/lb/service" ) type Aggregator interface { @@ -68,7 +69,7 @@ func (s *server) aggregationSearch(ctx context.Context, aggr Aggregator, cfg *pa ctx, cancel := context.WithTimeout(ctx, timeout) aggr.Start(ctx) - err = s.gateway.BroadCast(ctx, func(ctx context.Context, target string, vc vald.Client, copts ...grpc.CallOption) error { + err = s.gateway.BroadCast(ctx, service.READ, func(ctx context.Context, target string, vc vald.Client, copts ...grpc.CallOption) error { sctx, sspan := trace.StartSpan(grpc.WrapGRPCMethod(ctx, "BroadCast/"+target), apiName+"/aggregationSearch/"+target) defer func() { if sspan != nil { diff --git a/pkg/gateway/lb/handler/grpc/handler.go b/pkg/gateway/lb/handler/grpc/handler.go index e8fde265a6a..95fa86b1dc0 100644 --- a/pkg/gateway/lb/handler/grpc/handler.go +++ b/pkg/gateway/lb/handler/grpc/handler.go @@ -95,7 +95,7 @@ func (s *server) exists(ctx context.Context, uuid string) (id *payload.Object_ID defer close(ich) defer close(ech) var once sync.Once - ech <- s.gateway.BroadCast(ctx, func(ctx context.Context, target string, vc vald.Client, copts ...grpc.CallOption) error { + ech <- s.gateway.BroadCast(ctx, service.READ, func(ctx context.Context, target string, vc vald.Client, copts 
...grpc.CallOption) error { sctx, sspan := trace.StartSpan(grpc.WrapGRPCMethod(ctx, "BroadCast/"+target), apiName+"/exists/BroadCast/"+target) defer func() { if sspan != nil { @@ -1653,7 +1653,7 @@ func (s *server) Update(ctx context.Context, req *payload.Update_Request) (res * Ips: make([]string, 0, s.replica), } ) - err = s.gateway.BroadCast(ctx, func(ctx context.Context, target string, vc vald.Client, copts ...grpc.CallOption) (err error) { + err = s.gateway.BroadCast(ctx, service.WRITE, func(ctx context.Context, target string, vc vald.Client, copts ...grpc.CallOption) (err error) { ctx, span := trace.StartSpan(grpc.WrapGRPCMethod(ctx, "BroadCast/"+target), apiName+"/"+vald.UpdateRPCName+"/"+target) defer func() { if span != nil { @@ -2562,7 +2562,7 @@ func (s *server) Remove(ctx context.Context, req *payload.Remove_Request) (locs Ips: make([]string, 0, s.replica), } ls := make([]string, 0, s.replica) - err = s.gateway.BroadCast(ctx, func(ctx context.Context, target string, vc vald.Client, copts ...grpc.CallOption) (err error) { + err = s.gateway.BroadCast(ctx, service.WRITE, func(ctx context.Context, target string, vc vald.Client, copts ...grpc.CallOption) (err error) { ctx, span := trace.StartSpan(grpc.WrapGRPCMethod(ctx, "BroadCast/"+target), apiName+"/"+vald.RemoveRPCName+"/"+target) defer func() { if span != nil { @@ -2778,7 +2778,7 @@ func (s *server) RemoveByTimestamp(ctx context.Context, req *payload.Remove_Time visited := make(map[string]int) // map[uuid: position of locs] locs = new(payload.Object_Locations) - err := s.gateway.BroadCast(ctx, func(ctx context.Context, target string, vc vald.Client, copts ...grpc.CallOption) (err error) { + err := s.gateway.BroadCast(ctx, service.WRITE, func(ctx context.Context, target string, vc vald.Client, copts ...grpc.CallOption) (err error) { sctx, sspan := trace.StartSpan(grpc.WithGRPCMethod(ctx, "BroadCast/"+target), apiName+"/removeByTimestamp/BroadCast/"+target) defer func() { if sspan != nil { @@ -2905,7 
// BroadCastKind tells Gateway.BroadCast whether the broadcast operation is a
// read or a write, which determines the agent client the broadcast runs on.
type BroadCastKind int

const (
	// READ selects the client returned by the discoverer's GetReadClient,
	// which may be either the primary agent client or the read replica client.
	READ = BroadCastKind(iota)
	// WRITE selects the client returned by the discoverer's GetClient, i.e.
	// the primary agent client.
	WRITE
)
*gateway) BroadCast(ctx context.Context, +// FIXME: そもそも呼ばれているメソッドがread系かwrite系かをどこかで判断する必要があるが、 +// 今後もメソッドは増えていくのだからBroadCastとBroadCastReadみたいに分けて、呼び出し側で判断するべき +func (g *gateway) BroadCast(ctx context.Context, kind BroadCastKind, f func(ctx context.Context, target string, ac vald.Client, copts ...grpc.CallOption) error, ) (err error) { fctx, span := trace.StartSpan(ctx, "vald/gateway-lb/service/Gateway.BroadCast") @@ -69,7 +77,20 @@ func (g *gateway) BroadCast(ctx context.Context, span.End() } }() - return g.client.GetClient().RangeConcurrent(fctx, -1, func(ictx context.Context, + + // select read or write clinet which is connected to primary agent or read replica agent + // GetReadClient includes the client to the primary agent so + // it works even when there is no read replica + var client grpc.Client + switch kind { + case READ: + client = g.client.GetReadClient() + case WRITE: + client = g.client.GetClient() + } + + // FIXME: broadcastはsvc経由でロードバランスされてるっぽい + return client.RangeConcurrent(fctx, -1, func(ictx context.Context, addr string, conn *grpc.ClientConn, copts ...grpc.CallOption, ) (err error) { select { @@ -85,6 +106,7 @@ func (g *gateway) BroadCast(ctx context.Context, }) } +// DoMultiはWrite系しか使用しないので、普通にGetClientしていれば良い。現状Update or Insertのみ func (g *gateway) DoMulti(ctx context.Context, num int, f func(ctx context.Context, target string, ac vald.Client, copts ...grpc.CallOption) error, ) (err error) { diff --git a/pkg/gateway/lb/usecase/vald.go b/pkg/gateway/lb/usecase/vald.go index 4b2eadef88c..27bf60f39bc 100644 --- a/pkg/gateway/lb/usecase/vald.go +++ b/pkg/gateway/lb/usecase/vald.go @@ -55,6 +55,16 @@ func New(cfg *config.Data) (r runner.Runner, err error) { if err != nil { return nil, err } + + rrOpts, err := cfg.Gateway.ReadReplicaClient.Client.Opts() + if err != nil { + return nil, err + } + rrOpts = append(rrOpts, + grpc.WithErrGroup(eg), + grpc.WithConnectionPoolSize(int(cfg.Gateway.ReadReplicaReplicas)), + ) + // skipcq: CRT-D0001 dopts := 
append( cOpts, @@ -75,9 +85,11 @@ func New(cfg *config.Data) (r runner.Runner, err error) { discoverer.WithPort(cfg.Gateway.AgentPort), discoverer.WithServiceDNSARecord(cfg.Gateway.AgentDNS), discoverer.WithDiscovererClient(grpc.New(dopts...)), + discoverer.WithReadReplicaClient(grpc.New(rrOpts...)), discoverer.WithDiscoverDuration(cfg.Gateway.Discoverer.Duration), discoverer.WithOptions(aopts...), discoverer.WithNodeName(cfg.Gateway.NodeName), + discoverer.WithReadReplicaReplicas(cfg.Gateway.ReadReplicaReplicas), ) if err != nil { return nil, err