Skip to content

Commit

Permalink
Make vald-readreplica values.yaml to symbolic link (#2286)
Browse files Browse the repository at this point in the history
* remove values.yaml

* Add symlink to values.yaml
  • Loading branch information
ykadowak committed Jan 16, 2024
1 parent 958f7f4 commit 7d6fce3
Show file tree
Hide file tree
Showing 10 changed files with 146 additions and 10 deletions.
19 changes: 19 additions & 0 deletions charts/vald/templates/gateway/lb/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
{{- $gateway := .Values.gateway.lb -}}
{{- $agent := .Values.agent -}}
{{- $discoverer := .Values.discoverer -}}
{{- $readreplica := .Values.agent.readreplica -}}
{{- $release := .Release -}}
{{- if $gateway.enabled }}
apiVersion: v1
kind: ConfigMap
Expand Down Expand Up @@ -49,6 +51,7 @@ data:
agent_namespace: {{ $gateway.gateway_config.agent_namespace | quote }}
node_name: {{ $gateway.gateway_config.node_name | quote }}
index_replica: {{ $gateway.gateway_config.index_replica }}
read_replica_replicas: {{ $readreplica.replica }}
discoverer:
duration: {{ $gateway.gateway_config.discoverer.duration }}
client:
Expand All @@ -64,4 +67,20 @@ data:
agent_client_options:
{{- include "vald.grpc.client.addrs" (dict "Valued" $gateway.gateway_config.discoverer.agent_client_options.addrs) | nindent 10 }}
{{- include "vald.grpc.client" (dict "Values" $gateway.gateway_config.discoverer.agent_client_options "default" .Values.defaults.grpc.client) | nindent 10 }}
{{- if $readreplica.enabled }}
read_replica_client:
client:
{{- $discovererClient := $gateway.gateway_config.discoverer.client }}
{{- $readReplicaPort := $agent.server_config.servers.grpc.port }}
{{- $defaultReadReplicaPort := default .Values.defaults.server_config.servers.grpc.port $readReplicaPort }}
{{- $readReplicaAddrs := list }}
{{- range $i := until (int $agent.minReplicas) }}
{{- $addr := printf "%s-%d.%s.svc.cluster.local:%d" $readreplica.name $i $release.Namespace (int64 $defaultReadReplicaPort) }}
{{- $readReplicaAddrs = append $readReplicaAddrs $addr }}
{{- end }}
{{- $readReplicaAddrs := dict "Values" $discovererClient.addrs "default" $readReplicaAddrs }}
{{- include "vald.grpc.client.addrs" $readReplicaAddrs | nindent 10 }}
{{- $readReplicaGRPCclient := dict "Values" $discovererClient "default" .Values.defaults.grpc.client }}
{{- include "vald.grpc.client" $readReplicaGRPCclient | nindent 10 }}
{{- end }}
{{- end }}
37 changes: 37 additions & 0 deletions internal/client/v1/client/discoverer/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type Client interface {
Start(ctx context.Context) (<-chan error, error)
GetAddrs(ctx context.Context) []string
GetClient() grpc.Client
GetReadClient() grpc.Client
}

type client struct {
Expand All @@ -56,6 +57,10 @@ type client struct {
name string
namespace string
nodeName string
// read replica related below
readClient grpc.Client
readReplicaReplicas uint64
roundRobin atomic.Uint64
}

func New(opts ...Option) (d Client, err error) {
Expand All @@ -74,6 +79,14 @@ func (c *client) Start(ctx context.Context) (<-chan error, error) {
return nil, err
}

var rrech <-chan error
if c.readClient != nil {
rrech, err = c.readClient.StartConnectionMonitor(ctx)
if err != nil {
return nil, err
}
}

ech := make(chan error, 100)
addrs, err := c.dnsDiscovery(ctx, ech)
if err != nil {
Expand Down Expand Up @@ -134,6 +147,7 @@ func (c *client) Start(ctx context.Context) (<-chan error, error) {
return finalize()
case err = <-dech:
case err = <-aech:
case err = <-rrech:
case <-dt.C:
err = c.discover(ctx, ech)
}
Expand Down Expand Up @@ -172,6 +186,29 @@ func (c *client) GetClient() grpc.Client {
return c.client
}

func (c *client) GetReadClient() grpc.Client {
// read replica
// 1. primary + svc cluster IP *n でここで比率を見て呼び分ける
// read replicaのreplica数がHPAなどで高速に変動する場合に、実装がシンプルそうなのでこちらが良さそう
// 2. primary + ipsで単純にラウンドロビン

// round robin with c.client and c.readClient everytime it's called
// with a ratio of primary + read replica deployment replicas
// TODO: is this atomic operation really worth it?
var new uint64
for {
cur := c.roundRobin.Load()
new = (cur + 1) % (c.readReplicaReplicas + 1)
if c.roundRobin.CompareAndSwap(cur, new) {
break
}
}
if new == 0 || c.readClient == nil {
return c.client
}
return c.readClient
}

func (c *client) connect(ctx context.Context, addr string) (err error) {
if c.autoconn && c.client != nil {
_, err = c.client.Connect(ctx, addr)
Expand Down
14 changes: 14 additions & 0 deletions internal/client/v1/client/discoverer/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,13 @@ func WithDiscovererClient(gc grpc.Client) Option {
}
}

func WithReadReplicaClient(gc grpc.Client) Option {
return func(c *client) error {
c.readClient = gc
return nil
}
}

func WithDiscoverDuration(dur string) Option {
return func(c *client) error {
d, err := timeutil.Parse(dur)
Expand Down Expand Up @@ -142,3 +149,10 @@ func WithErrGroup(eg errgroup.Group) Option {
return nil
}
}

func WithReadReplicaReplicas(num uint64) Option {
return func(c *client) error {
c.readReplicaReplicas = num
return nil
}
}
29 changes: 29 additions & 0 deletions internal/config/lb.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ type LB struct {
// IndexReplica represents index replication count
IndexReplica int `json:"index_replica" yaml:"index_replica"`

// ReadReplicaReplicas represents replica count of read replica Deployment
ReadReplicaReplicas uint64 `json:"read_replica_replicas" yaml:"read_replica_replicas"`

// ReadReplicaClient represents read replica client configuration
ReadReplicaClient ReadReplicaClient `json:"read_replica_client" yaml:"read_replica_client"`

// Discoverer represent agent discoverer service configuration
Discoverer *DiscovererClient `json:"discoverer" yaml:"discoverer"`

Expand All @@ -56,3 +62,26 @@ func (g *LB) Bind() *LB {
}
return g
}

// ReadReplicaClient
type ReadReplicaClient struct {
Duration string `json:"duration" yaml:"duration"`
Client *GRPCClient `json:"client" yaml:"client"`
AgentClientOptions *GRPCClient `json:"agent_client_options" yaml:"agent_client_options"`
}

// Bind binds the actual data from the ReadReplicaClient receiver field.
func (d *ReadReplicaClient) Bind() *ReadReplicaClient {
d.Duration = GetActualValue(d.Duration)
if d.Client != nil {
d.Client.Bind()
} else {
d.Client = newGRPCClientConfig()
}
if d.AgentClientOptions != nil {
d.AgentClientOptions.Bind()
} else {
d.AgentClientOptions = newGRPCClientConfig()
}
return d
}
3 changes: 2 additions & 1 deletion pkg/gateway/lb/handler/grpc/aggregation.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/vdaas/vald/internal/net/grpc/status"
"github.com/vdaas/vald/internal/observability/trace"
"github.com/vdaas/vald/internal/sync"
"github.com/vdaas/vald/pkg/gateway/lb/service"
)

type Aggregator interface {
Expand Down Expand Up @@ -68,7 +69,7 @@ func (s *server) aggregationSearch(ctx context.Context, aggr Aggregator, cfg *pa

ctx, cancel := context.WithTimeout(ctx, timeout)
aggr.Start(ctx)
err = s.gateway.BroadCast(ctx, func(ctx context.Context, target string, vc vald.Client, copts ...grpc.CallOption) error {
err = s.gateway.BroadCast(ctx, service.READ, func(ctx context.Context, target string, vc vald.Client, copts ...grpc.CallOption) error {
sctx, sspan := trace.StartSpan(grpc.WrapGRPCMethod(ctx, "BroadCast/"+target), apiName+"/aggregationSearch/"+target)
defer func() {
if sspan != nil {
Expand Down
12 changes: 6 additions & 6 deletions pkg/gateway/lb/handler/grpc/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (s *server) exists(ctx context.Context, uuid string) (id *payload.Object_ID
defer close(ich)
defer close(ech)
var once sync.Once
ech <- s.gateway.BroadCast(ctx, func(ctx context.Context, target string, vc vald.Client, copts ...grpc.CallOption) error {
ech <- s.gateway.BroadCast(ctx, service.READ, func(ctx context.Context, target string, vc vald.Client, copts ...grpc.CallOption) error {
sctx, sspan := trace.StartSpan(grpc.WrapGRPCMethod(ctx, "BroadCast/"+target), apiName+"/exists/BroadCast/"+target)
defer func() {
if sspan != nil {
Expand Down Expand Up @@ -1653,7 +1653,7 @@ func (s *server) Update(ctx context.Context, req *payload.Update_Request) (res *
Ips: make([]string, 0, s.replica),
}
)
err = s.gateway.BroadCast(ctx, func(ctx context.Context, target string, vc vald.Client, copts ...grpc.CallOption) (err error) {
err = s.gateway.BroadCast(ctx, service.WRITE, func(ctx context.Context, target string, vc vald.Client, copts ...grpc.CallOption) (err error) {
ctx, span := trace.StartSpan(grpc.WrapGRPCMethod(ctx, "BroadCast/"+target), apiName+"/"+vald.UpdateRPCName+"/"+target)
defer func() {
if span != nil {
Expand Down Expand Up @@ -2562,7 +2562,7 @@ func (s *server) Remove(ctx context.Context, req *payload.Remove_Request) (locs
Ips: make([]string, 0, s.replica),
}
ls := make([]string, 0, s.replica)
err = s.gateway.BroadCast(ctx, func(ctx context.Context, target string, vc vald.Client, copts ...grpc.CallOption) (err error) {
err = s.gateway.BroadCast(ctx, service.WRITE, func(ctx context.Context, target string, vc vald.Client, copts ...grpc.CallOption) (err error) {
ctx, span := trace.StartSpan(grpc.WrapGRPCMethod(ctx, "BroadCast/"+target), apiName+"/"+vald.RemoveRPCName+"/"+target)
defer func() {
if span != nil {
Expand Down Expand Up @@ -2778,7 +2778,7 @@ func (s *server) RemoveByTimestamp(ctx context.Context, req *payload.Remove_Time
visited := make(map[string]int) // map[uuid: position of locs]
locs = new(payload.Object_Locations)

err := s.gateway.BroadCast(ctx, func(ctx context.Context, target string, vc vald.Client, copts ...grpc.CallOption) (err error) {
err := s.gateway.BroadCast(ctx, service.WRITE, func(ctx context.Context, target string, vc vald.Client, copts ...grpc.CallOption) (err error) {
sctx, sspan := trace.StartSpan(grpc.WithGRPCMethod(ctx, "BroadCast/"+target), apiName+"/removeByTimestamp/BroadCast/"+target)
defer func() {
if sspan != nil {
Expand Down Expand Up @@ -2905,7 +2905,7 @@ func (s *server) getObject(ctx context.Context, uuid string) (vec *payload.Objec
defer close(vch)
defer close(ech)
var once sync.Once
ech <- s.gateway.BroadCast(ctx, func(ctx context.Context, target string, vc vald.Client, copts ...grpc.CallOption) error {
ech <- s.gateway.BroadCast(ctx, service.READ, func(ctx context.Context, target string, vc vald.Client, copts ...grpc.CallOption) error {
sctx, sspan := trace.StartSpan(grpc.WrapGRPCMethod(ctx, "BroadCast/"+target), apiName+"/getObject/BroadCast/"+target)
defer func() {
if sspan != nil {
Expand Down Expand Up @@ -3143,7 +3143,7 @@ func (s *server) StreamListObject(req *payload.Object_List_Request, stream vald.
defer cancel()

var rmu, smu sync.Mutex
err := s.gateway.BroadCast(ctx, func(ctx context.Context, target string, vc vald.Client, copts ...grpc.CallOption) error {
err := s.gateway.BroadCast(ctx, service.READ, func(ctx context.Context, target string, vc vald.Client, copts ...grpc.CallOption) error {
ctx, sspan := trace.StartSpan(grpc.WrapGRPCMethod(ctx, "BroadCast/"+target), apiName+"/"+vald.StreamListObjectRPCName+"/"+target)
defer func() {
if sspan != nil {
Expand Down
25 changes: 22 additions & 3 deletions pkg/gateway/lb/service/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,17 @@ type Gateway interface {
Addrs(ctx context.Context) []string
DoMulti(ctx context.Context, num int,
f func(ctx context.Context, target string, ac vald.Client, copts ...grpc.CallOption) error) error
BroadCast(ctx context.Context,
BroadCast(ctx context.Context, kind BroadCastKind,
f func(ctx context.Context, target string, ac vald.Client, copts ...grpc.CallOption) error) error
}

type BroadCastKind int

const (
READ BroadCastKind = iota
WRITE
)

type gateway struct {
client discoverer.Client
eg errgroup.Group
Expand All @@ -60,7 +67,7 @@ func (g *gateway) Start(ctx context.Context) (<-chan error, error) {
return g.client.Start(ctx)
}

func (g *gateway) BroadCast(ctx context.Context,
func (g *gateway) BroadCast(ctx context.Context, kind BroadCastKind,
f func(ctx context.Context, target string, ac vald.Client, copts ...grpc.CallOption) error,
) (err error) {
fctx, span := trace.StartSpan(ctx, "vald/gateway-lb/service/Gateway.BroadCast")
Expand All @@ -69,7 +76,19 @@ func (g *gateway) BroadCast(ctx context.Context,
span.End()
}
}()
return g.client.GetClient().RangeConcurrent(fctx, -1, func(ictx context.Context,

// select read or write clinet which is connected to primary agent or read replica agent
// GetReadClient includes the client to the primary agent so
// it works even when there is no read replica
var client grpc.Client
switch kind {
case READ:
client = g.client.GetReadClient()
case WRITE:
client = g.client.GetClient()
}

return client.RangeConcurrent(fctx, -1, func(ictx context.Context,
addr string, conn *grpc.ClientConn, copts ...grpc.CallOption,
) (err error) {
select {
Expand Down
12 changes: 12 additions & 0 deletions pkg/gateway/lb/usecase/vald.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,16 @@ func New(cfg *config.Data) (r runner.Runner, err error) {
if err != nil {
return nil, err
}

rrOpts, err := cfg.Gateway.ReadReplicaClient.Client.Opts()
if err != nil {
return nil, err
}
rrOpts = append(rrOpts,
grpc.WithErrGroup(eg),
grpc.WithConnectionPoolSize(int(cfg.Gateway.ReadReplicaReplicas)),
)

// skipcq: CRT-D0001
dopts := append(
cOpts,
Expand All @@ -75,9 +85,11 @@ func New(cfg *config.Data) (r runner.Runner, err error) {
discoverer.WithPort(cfg.Gateway.AgentPort),
discoverer.WithServiceDNSARecord(cfg.Gateway.AgentDNS),
discoverer.WithDiscovererClient(grpc.New(dopts...)),
discoverer.WithReadReplicaClient(grpc.New(rrOpts...)),
discoverer.WithDiscoverDuration(cfg.Gateway.Discoverer.Duration),
discoverer.WithOptions(aopts...),
discoverer.WithNodeName(cfg.Gateway.NodeName),
discoverer.WithReadReplicaReplicas(cfg.Gateway.ReadReplicaReplicas),
)
if err != nil {
return nil, err
Expand Down
4 changes: 4 additions & 0 deletions pkg/index/job/correction/service/corrector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ func (m *mockDiscovererClient) GetClient() grpc.Client {
return &m.client
}

func (m *mockDiscovererClient) GetReadClient() grpc.Client {
return &m.client
}

func Test_correct_correctTimestamp(t *testing.T) {
t.Parallel()

Expand Down
1 change: 1 addition & 0 deletions pkg/index/job/readreplica/rotate/service/rotator.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ func (r *rotator) createPVC(ctx context.Context, newSnapShot string, deployment
Kind: cur.Spec.DataSource.Kind,
APIGroup: cur.Spec.DataSource.APIGroup,
},
StorageClassName: cur.Spec.StorageClassName,
},
}

Expand Down

0 comments on commit 7d6fce3

Please sign in to comment.