diff --git a/charts/vald/templates/gateway/lb/configmap.yaml b/charts/vald/templates/gateway/lb/configmap.yaml index 0facd73355..7c840cb0c2 100644 --- a/charts/vald/templates/gateway/lb/configmap.yaml +++ b/charts/vald/templates/gateway/lb/configmap.yaml @@ -16,6 +16,8 @@ {{- $gateway := .Values.gateway.lb -}} {{- $agent := .Values.agent -}} {{- $discoverer := .Values.discoverer -}} +{{- $readreplica := .Values.agent.readreplica -}} +{{- $release := .Release -}} {{- if $gateway.enabled }} apiVersion: v1 kind: ConfigMap @@ -49,6 +51,7 @@ data: agent_namespace: {{ $gateway.gateway_config.agent_namespace | quote }} node_name: {{ $gateway.gateway_config.node_name | quote }} index_replica: {{ $gateway.gateway_config.index_replica }} + read_replica_replicas: {{ $readreplica.replica }} discoverer: duration: {{ $gateway.gateway_config.discoverer.duration }} client: @@ -64,4 +67,20 @@ data: agent_client_options: {{- include "vald.grpc.client.addrs" (dict "Valued" $gateway.gateway_config.discoverer.agent_client_options.addrs) | nindent 10 }} {{- include "vald.grpc.client" (dict "Values" $gateway.gateway_config.discoverer.agent_client_options "default" .Values.defaults.grpc.client) | nindent 10 }} + {{- if $readreplica.enabled }} + read_replica_client: + client: + {{- $discovererClient := $gateway.gateway_config.discoverer.client }} + {{- $readReplicaPort := $agent.server_config.servers.grpc.port }} + {{- $defaultReadReplicaPort := default .Values.defaults.server_config.servers.grpc.port $readReplicaPort }} + {{- $readReplicaAddrs := list }} + {{- range $i := until (int $agent.minReplicas) }} + {{- $addr := printf "%s-%d.%s.svc.cluster.local:%d" $readreplica.name $i $release.Namespace (int64 $defaultReadReplicaPort) }} + {{- $readReplicaAddrs = append $readReplicaAddrs $addr }} + {{- end }} + {{- $readReplicaAddrs := dict "Values" $discovererClient.addrs "default" $readReplicaAddrs }} + {{- include "vald.grpc.client.addrs" $readReplicaAddrs | nindent 10 }} + {{- $readReplicaGRPCclient := dict "Values" $discovererClient "default" .Values.defaults.grpc.client }} + {{- include "vald.grpc.client" $readReplicaGRPCclient | nindent 10 }} + {{- end }} {{- end }} diff --git a/internal/client/v1/client/discoverer/discover.go b/internal/client/v1/client/discoverer/discover.go index 2c2c5c398a..5b928d6a99 100644 --- a/internal/client/v1/client/discoverer/discover.go +++ b/internal/client/v1/client/discoverer/discover.go @@ -38,6 +38,7 @@ type Client interface { Start(ctx context.Context) (<-chan error, error) GetAddrs(ctx context.Context) []string GetClient() grpc.Client + GetReadClient() grpc.Client } type client struct { @@ -56,6 +57,10 @@ type client struct { name string namespace string nodeName string + // read replica related below + readClient grpc.Client + readReplicaReplicas uint64 + roundRobin atomic.Uint64 } func New(opts ...Option) (d Client, err error) { @@ -74,6 +79,14 @@ func (c *client) Start(ctx context.Context) (<-chan error, error) { return nil, err } + var rrech <-chan error + if c.readClient != nil { + rrech, err = c.readClient.StartConnectionMonitor(ctx) + if err != nil { + return nil, err + } + } + ech := make(chan error, 100) addrs, err := c.dnsDiscovery(ctx, ech) if err != nil { @@ -134,6 +147,7 @@ func (c *client) Start(ctx context.Context) (<-chan error, error) { return finalize() case err = <-dech: case err = <-aech: + case err = <-rrech: case <-dt.C: err = c.discover(ctx, ech) } @@ -172,6 +186,29 @@ func (c *client) GetClient() grpc.Client { return c.client } +func (c *client) GetReadClient() grpc.Client { + // read replica + // 1. primary + svc cluster IP *n でここで比率を見て呼び分ける + // read replicaのreplica数がHPAなどで高速に変動する場合に、実装がシンプルそうなのでこちらが良さそう + // 2. primary + ipsで単純にラウンドロビン + + // round robin with c.client and c.readClient everytime it's called + // with a ratio of primary + read replica deployment replicas + // TODO: is this atomic operation really worth it? + var new uint64 + for { + cur := c.roundRobin.Load() + new = (cur + 1) % (c.readReplicaReplicas + 1) + if c.roundRobin.CompareAndSwap(cur, new) { + break + } + } + if new == 0 || c.readClient == nil { + return c.client + } + return c.readClient +} + func (c *client) connect(ctx context.Context, addr string) (err error) { if c.autoconn && c.client != nil { _, err = c.client.Connect(ctx, addr) diff --git a/internal/client/v1/client/discoverer/option.go b/internal/client/v1/client/discoverer/option.go index 6159d8b72e..632bfc2d29 100644 --- a/internal/client/v1/client/discoverer/option.go +++ b/internal/client/v1/client/discoverer/option.go @@ -68,6 +68,13 @@ func WithDiscovererClient(gc grpc.Client) Option { } } +func WithReadReplicaClient(gc grpc.Client) Option { + return func(c *client) error { + c.readClient = gc + return nil + } +} + func WithDiscoverDuration(dur string) Option { return func(c *client) error { d, err := timeutil.Parse(dur) @@ -142,3 +149,10 @@ func WithErrGroup(eg errgroup.Group) Option { return nil } } + +func WithReadReplicaReplicas(num uint64) Option { + return func(c *client) error { + c.readReplicaReplicas = num + return nil + } +} diff --git a/internal/config/lb.go b/internal/config/lb.go index b479400b2d..c1e134de9d 100644 --- a/internal/config/lb.go +++ b/internal/config/lb.go @@ -37,6 +37,12 @@ type LB struct { // IndexReplica represents index replication count IndexReplica int `json:"index_replica" yaml:"index_replica"` + // ReadReplicaReplicas represents replica count of read replica Deployment + ReadReplicaReplicas uint64 `json:"read_replica_replicas" yaml:"read_replica_replicas"` + + // ReadReplicaClient represents read replica client configuration + ReadReplicaClient ReadReplicaClient `json:"read_replica_client" yaml:"read_replica_client"` + // Discoverer represent agent discoverer service configuration Discoverer *DiscovererClient `json:"discoverer" yaml:"discoverer"` @@ -56,3 +62,26 @@ func (g *LB) Bind() *LB { } return g } + +// ReadReplicaClient +type ReadReplicaClient struct { + Duration string `json:"duration" yaml:"duration"` + Client *GRPCClient `json:"client" yaml:"client"` + AgentClientOptions *GRPCClient `json:"agent_client_options" yaml:"agent_client_options"` +} + +// Bind binds the actual data from the ReadReplicaClient receiver field. +func (d *ReadReplicaClient) Bind() *ReadReplicaClient { + d.Duration = GetActualValue(d.Duration) + if d.Client != nil { + d.Client.Bind() + } else { + d.Client = newGRPCClientConfig() + } + if d.AgentClientOptions != nil { + d.AgentClientOptions.Bind() + } else { + d.AgentClientOptions = newGRPCClientConfig() + } + return d +} diff --git a/pkg/gateway/lb/handler/grpc/aggregation.go b/pkg/gateway/lb/handler/grpc/aggregation.go index 4c7403767e..99f9d45fcc 100644 --- a/pkg/gateway/lb/handler/grpc/aggregation.go +++ b/pkg/gateway/lb/handler/grpc/aggregation.go @@ -33,6 +33,7 @@ import ( "github.com/vdaas/vald/internal/net/grpc/status" "github.com/vdaas/vald/internal/observability/trace" "github.com/vdaas/vald/internal/sync" + "github.com/vdaas/vald/pkg/gateway/lb/service" ) type Aggregator interface { @@ -68,7 +69,7 @@ func (s *server) aggregationSearch(ctx context.Context, aggr Aggregator, cfg *pa ctx, cancel := context.WithTimeout(ctx, timeout) aggr.Start(ctx) - err = s.gateway.BroadCast(ctx, func(ctx context.Context, target string, vc vald.Client, copts ...grpc.CallOption) error { + err = s.gateway.BroadCast(ctx, service.READ, func(ctx context.Context, target string, vc vald.Client, copts ...grpc.CallOption) error { sctx, sspan := trace.StartSpan(grpc.WrapGRPCMethod(ctx, "BroadCast/"+target), apiName+"/aggregationSearch/"+target) defer func() { if sspan != nil { diff --git a/pkg/gateway/lb/handler/grpc/handler.go b/pkg/gateway/lb/handler/grpc/handler.go index 27b48274f4..7f0678c3ba 100644 --- a/pkg/gateway/lb/handler/grpc/handler.go +++ b/pkg/gateway/lb/handler/grpc/handler.go @@ -95,7 +95,7 @@ func (s *server) exists(ctx context.Context, uuid string) (id *payload.Object_ID defer close(ich) defer close(ech) var once sync.Once - ech <- s.gateway.BroadCast(ctx, func(ctx context.Context, target string, vc vald.Client, copts ...grpc.CallOption) error { + ech <- s.gateway.BroadCast(ctx, service.READ, func(ctx context.Context, target string, vc vald.Client, copts ...grpc.CallOption) error { sctx, sspan := trace.StartSpan(grpc.WrapGRPCMethod(ctx, "BroadCast/"+target), apiName+"/exists/BroadCast/"+target) defer func() { if sspan != nil { @@ -1653,7 +1653,7 @@ func (s *server) Update(ctx context.Context, req *payload.Update_Request) (res * Ips: make([]string, 0, s.replica), } ) - err = s.gateway.BroadCast(ctx, func(ctx context.Context, target string, vc vald.Client, copts ...grpc.CallOption) (err error) { + err = s.gateway.BroadCast(ctx, service.WRITE, func(ctx context.Context, target string, vc vald.Client, copts ...grpc.CallOption) (err error) { ctx, span := trace.StartSpan(grpc.WrapGRPCMethod(ctx, "BroadCast/"+target), apiName+"/"+vald.UpdateRPCName+"/"+target) defer func() { if span != nil { @@ -2562,7 +2562,7 @@ func (s *server) Remove(ctx context.Context, req *payload.Remove_Request) (locs Ips: make([]string, 0, s.replica), } ls := make([]string, 0, s.replica) - err = s.gateway.BroadCast(ctx, func(ctx context.Context, target string, vc vald.Client, copts ...grpc.CallOption) (err error) { + err = s.gateway.BroadCast(ctx, service.WRITE, func(ctx context.Context, target string, vc vald.Client, copts ...grpc.CallOption) (err error) { ctx, span := trace.StartSpan(grpc.WrapGRPCMethod(ctx, "BroadCast/"+target), apiName+"/"+vald.RemoveRPCName+"/"+target) defer func() { if span != nil { @@ -2778,7 +2778,7 @@ func (s *server) RemoveByTimestamp(ctx context.Context, req *payload.Remove_Time visited := make(map[string]int) // map[uuid: position of locs] locs = new(payload.Object_Locations) - err := s.gateway.BroadCast(ctx, func(ctx context.Context, target string, vc vald.Client, copts ...grpc.CallOption) (err error) { + err := s.gateway.BroadCast(ctx, service.WRITE, func(ctx context.Context, target string, vc vald.Client, copts ...grpc.CallOption) (err error) { sctx, sspan := trace.StartSpan(grpc.WithGRPCMethod(ctx, "BroadCast/"+target), apiName+"/removeByTimestamp/BroadCast/"+target) defer func() { if sspan != nil { @@ -2905,7 +2905,7 @@ func (s *server) getObject(ctx context.Context, uuid string) (vec *payload.Objec defer close(vch) defer close(ech) var once sync.Once - ech <- s.gateway.BroadCast(ctx, func(ctx context.Context, target string, vc vald.Client, copts ...grpc.CallOption) error { + ech <- s.gateway.BroadCast(ctx, service.READ, func(ctx context.Context, target string, vc vald.Client, copts ...grpc.CallOption) error { sctx, sspan := trace.StartSpan(grpc.WrapGRPCMethod(ctx, "BroadCast/"+target), apiName+"/getObject/BroadCast/"+target) defer func() { if sspan != nil { @@ -3143,7 +3143,7 @@ func (s *server) StreamListObject(req *payload.Object_List_Request, stream vald. defer cancel() var rmu, smu sync.Mutex - err := s.gateway.BroadCast(ctx, func(ctx context.Context, target string, vc vald.Client, copts ...grpc.CallOption) error { + err := s.gateway.BroadCast(ctx, service.READ, func(ctx context.Context, target string, vc vald.Client, copts ...grpc.CallOption) error { ctx, sspan := trace.StartSpan(grpc.WrapGRPCMethod(ctx, "BroadCast/"+target), apiName+"/"+vald.StreamListObjectRPCName+"/"+target) defer func() { if sspan != nil { diff --git a/pkg/gateway/lb/service/gateway.go b/pkg/gateway/lb/service/gateway.go index 64f9f0dc9b..e2787bffaf 100644 --- a/pkg/gateway/lb/service/gateway.go +++ b/pkg/gateway/lb/service/gateway.go @@ -37,10 +37,17 @@ type Gateway interface { Addrs(ctx context.Context) []string DoMulti(ctx context.Context, num int, f func(ctx context.Context, target string, ac vald.Client, copts ...grpc.CallOption) error) error - BroadCast(ctx context.Context, + BroadCast(ctx context.Context, kind BroadCastKind, f func(ctx context.Context, target string, ac vald.Client, copts ...grpc.CallOption) error) error } +type BroadCastKind int + +const ( + READ BroadCastKind = iota + WRITE +) + type gateway struct { client discoverer.Client eg errgroup.Group @@ -60,7 +67,7 @@ func (g *gateway) Start(ctx context.Context) (<-chan error, error) { return g.client.Start(ctx) } -func (g *gateway) BroadCast(ctx context.Context, +func (g *gateway) BroadCast(ctx context.Context, kind BroadCastKind, f func(ctx context.Context, target string, ac vald.Client, copts ...grpc.CallOption) error, ) (err error) { fctx, span := trace.StartSpan(ctx, "vald/gateway-lb/service/Gateway.BroadCast") @@ -69,7 +76,19 @@ func (g *gateway) BroadCast(ctx context.Context, span.End() } }() - return g.client.GetClient().RangeConcurrent(fctx, -1, func(ictx context.Context, + + // select read or write clinet which is connected to primary agent or read replica agent + // GetReadClient includes the client to the primary agent so + // it works even when there is no read replica + var client grpc.Client + switch kind { + case READ: + client = g.client.GetReadClient() + case WRITE: + client = g.client.GetClient() + } + + return client.RangeConcurrent(fctx, -1, func(ictx context.Context, addr string, conn *grpc.ClientConn, copts ...grpc.CallOption, ) (err error) { select { diff --git a/pkg/gateway/lb/usecase/vald.go b/pkg/gateway/lb/usecase/vald.go index b66d9068cf..070d862770 100644 --- a/pkg/gateway/lb/usecase/vald.go +++ b/pkg/gateway/lb/usecase/vald.go @@ -55,6 +55,16 @@ func New(cfg *config.Data) (r runner.Runner, err error) { if err != nil { return nil, err } + + rrOpts, err := cfg.Gateway.ReadReplicaClient.Client.Opts() + if err != nil { + return nil, err + } + rrOpts = append(rrOpts, + grpc.WithErrGroup(eg), + grpc.WithConnectionPoolSize(int(cfg.Gateway.ReadReplicaReplicas)), + ) + // skipcq: CRT-D0001 dopts := append( cOpts, @@ -75,9 +85,11 @@ func New(cfg *config.Data) (r runner.Runner, err error) { discoverer.WithPort(cfg.Gateway.AgentPort), discoverer.WithServiceDNSARecord(cfg.Gateway.AgentDNS), discoverer.WithDiscovererClient(grpc.New(dopts...)), + discoverer.WithReadReplicaClient(grpc.New(rrOpts...)), discoverer.WithDiscoverDuration(cfg.Gateway.Discoverer.Duration), discoverer.WithOptions(aopts...), discoverer.WithNodeName(cfg.Gateway.NodeName), + discoverer.WithReadReplicaReplicas(cfg.Gateway.ReadReplicaReplicas), ) if err != nil { return nil, err diff --git a/pkg/index/job/correction/service/corrector_test.go b/pkg/index/job/correction/service/corrector_test.go index 43ad7517f1..2329cbc2e8 100644 --- a/pkg/index/job/correction/service/corrector_test.go +++ b/pkg/index/job/correction/service/corrector_test.go @@ -41,6 +41,10 @@ func (m *mockDiscovererClient) GetClient() grpc.Client { return &m.client } +func (m *mockDiscovererClient) GetReadClient() grpc.Client { + return &m.client +} + func Test_correct_correctTimestamp(t *testing.T) { t.Parallel() diff --git a/pkg/index/job/readreplica/rotate/service/rotator.go b/pkg/index/job/readreplica/rotate/service/rotator.go index 9a845f4c28..b5bd90b67e 100644 --- a/pkg/index/job/readreplica/rotate/service/rotator.go +++ b/pkg/index/job/readreplica/rotate/service/rotator.go @@ -241,6 +241,7 @@ func (r *rotator) createPVC(ctx context.Context, newSnapShot string, deployment Kind: cur.Spec.DataSource.Kind, APIGroup: cur.Spec.DataSource.APIGroup, }, + StorageClassName: cur.Spec.StorageClassName, }, }