diff --git a/charts/vald/templates/gateway/mirror/configmap.yaml b/charts/vald/templates/gateway/mirror/configmap.yaml index a8641c4f42..ca7ccedff5 100644 --- a/charts/vald/templates/gateway/mirror/configmap.yaml +++ b/charts/vald/templates/gateway/mirror/configmap.yaml @@ -43,7 +43,7 @@ data: {{- include "vald.observability" $observability | nindent 6 }} gateway: pod_name: {{ $gateway.gateway_config.pod_name }} - advertise_interval: {{ $gateway.gateway_config.advertise_interval }} + register_duration: {{ $gateway.gateway_config.register_duration }} namespace: {{ $gateway.gateway_config.namespace }} discovery_duration: {{ $gateway.gateway_config.discovery_duration }} colocation: {{ $gateway.gateway_config.colocation }} diff --git a/charts/vald/values.yaml b/charts/vald/values.yaml index 2b27c8e7f4..5149940109 100644 --- a/charts/vald/values.yaml +++ b/charts/vald/values.yaml @@ -1716,9 +1716,9 @@ gateway: # @schema {"name": "gateway.mirror.gateway_config.pod_name", "type": "string"} # gateway.mirror.gateway_config.pod_name -- self mirror gateway pod name pod_name: _MY_POD_NAME_ - # @schema {"name": "gateway.mirror.gateway_config.advertise_interval", "type": "string"} - # gateway.mirror.gateway_config.advertise_interval -- interval to advertise mirror-gateway information to other mirror-gateway. - advertise_interval: "1s" + # @schema {"name": "gateway.mirror.gateway_config.register_duration", "type": "string"} + # gateway.mirror.gateway_config.register_duration -- duration to register mirror-gateway. 
+ register_duration: "1s" # @schema {"name": "gateway.mirror.gateway_config.namespace", "type": "string"} # gateway.mirror.gateway_config.namespace -- namespace to discovery namespace: _MY_POD_NAMESPACE_ diff --git a/internal/config/mirror.go b/internal/config/mirror.go index a9d069f946..3d816480fe 100644 --- a/internal/config/mirror.go +++ b/internal/config/mirror.go @@ -25,8 +25,8 @@ type Mirror struct { GatewayAddr string `json:"gateway_addr" yaml:"gateway_addr"` // PodName represents the mirror gateway pod name. PodName string `json:"pod_name" yaml:"pod_name"` - // AdvertiseInterval represents the interval to advertise addresses of Mirror Gateway to other Mirror Gateway. - AdvertiseInterval string `json:"advertise_interval" yaml:"advertise_interval"` + // RegisterDuration represents the duration to register Mirror Gateway. + RegisterDuration string `json:"register_duration" yaml:"register_duration"` // Namespace represents the target namespace to discover ValdMirrorTarget resource. Namespace string `json:"namespace" yaml:"namespace"` // DiscoveryDuration represents the duration to discover. 
@@ -43,7 +43,7 @@ func (m *Mirror) Bind() *Mirror { m.SelfMirrorAddr = GetActualValue(m.SelfMirrorAddr) m.GatewayAddr = GetActualValue(m.GatewayAddr) m.PodName = GetActualValue(m.PodName) - m.AdvertiseInterval = GetActualValue(m.AdvertiseInterval) + m.RegisterDuration = GetActualValue(m.RegisterDuration) m.Namespace = GetActualValue(m.Namespace) m.DiscoveryDuration = GetActualValue(m.DiscoveryDuration) m.Colocation = GetActualValue(m.Colocation) diff --git a/pkg/gateway/mirror/handler/grpc/handler.go b/pkg/gateway/mirror/handler/grpc/handler.go index 228b3fc0c0..014b0fa551 100644 --- a/pkg/gateway/mirror/handler/grpc/handler.go +++ b/pkg/gateway/mirror/handler/grpc/handler.go @@ -119,19 +119,11 @@ func (s *server) Register(ctx context.Context, req *payload.Mirror_Targets) (*pa } return nil, err } - return req, nil -} -func (s *server) Advertise(ctx context.Context, req *payload.Mirror_Targets) (res *payload.Mirror_Targets, err error) { - ctx, span := trace.StartSpan(grpc.WithGRPCMethod(ctx, vald.PackageName+"."+vald.MirrorRPCServiceName+"/"+vald.AdvertiseRPCName), apiName+"/"+vald.AdvertiseRPCName) - defer func() { - if span != nil { - span.End() - } - }() + // Get own address and the addresses of other mirror gateways to which this gateway is currently connected. tgts, err := s.mirror.MirrorTargets() if err != nil { - err = status.WrapWithInternal(vald.AdvertiseRPCName+" API failed to get connected vald gateway targets", err, + err = status.WrapWithInternal(vald.RegisterRPCName+" API failed to get connected vald gateway targets", err, &errdetails.BadRequest{ FieldViolations: []*errdetails.BadRequestFieldViolation{ { @@ -141,7 +133,7 @@ func (s *server) Advertise(ctx context.Context, req *payload.Mirror_Targets) (re }, }, &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.AdvertiseRPCName, + ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." 
+ vald.RegisterRPCName, ResourceName: fmt.Sprintf("%s: %s(%s)", apiName, s.name, s.ip), }, ) diff --git a/pkg/gateway/mirror/service/mirror.go b/pkg/gateway/mirror/service/mirror.go index 03e9b6fc1f..97d4a81c66 100644 --- a/pkg/gateway/mirror/service/mirror.go +++ b/pkg/gateway/mirror/service/mirror.go @@ -35,7 +35,7 @@ import ( // Mirror manages other mirror gateway connection. // If there is a new Mirror Gateway components, registers new connection. type Mirror interface { - Start(ctx context.Context) (<-chan error, error) + Start(ctx context.Context) <-chan error Connect(ctx context.Context, targets ...*payload.Mirror_Target) error Disconnect(ctx context.Context, targets ...*payload.Mirror_Target) error IsConnected(ctx context.Context, addr string) bool @@ -50,7 +50,7 @@ type mirr struct { selfMirrAddrl sync.Map[string, any] // List of self Mirror gateway addresses gwAddrl sync.Map[string, any] // List of Vald Gateway addresses eg errgroup.Group - advertiseDur time.Duration + registerDur time.Duration gateway Gateway } @@ -87,37 +87,8 @@ func NewMirror(opts ...MirrorOption) (_ Mirror, err error) { return m, err } -func (m *mirr) Start(ctx context.Context) (<-chan error, error) { - ech := make(chan error, 100) - - aech, err := m.startAdvertise(ctx) - if err != nil { - close(ech) - return nil, err - } - - m.eg.Go(func() (err error) { - defer close(ech) - for { - select { - case <-ctx.Done(): - return ctx.Err() - case err = <-aech: - } - if err != nil { - select { - case <-ctx.Done(): - case ech <- err: - } - err = nil - } - } - }) - return ech, nil -} - -func (m *mirr) startAdvertise(ctx context.Context) (<-chan error, error) { - ctx, span := trace.StartSpan(ctx, "vald/gateway/mirror/service/Mirror.startAdvertise") +func (m *mirr) Start(ctx context.Context) <-chan error { + ctx, span := trace.StartSpan(ctx, "vald/gateway/mirror/service/Mirror.Start") defer func() { if span != nil { span.End() @@ -125,96 +96,55 @@ func (m *mirr) startAdvertise(ctx context.Context) 
(<-chan error, error) { }() ech := make(chan error, 100) - err := m.registers(ctx, &payload.Mirror_Targets{ - Targets: m.selfMirrTgts, - }) - if err != nil && - !errors.Is(err, errors.ErrTargetNotFound) && - !errors.Is(err, errors.ErrGRPCClientConnNotFound("*")) { - var attrs trace.Attributes - - switch { - case errors.Is(err, context.Canceled): - err = status.WrapWithCanceled( - vald.InsertRPCName+" API canceld", err, - ) - attrs = trace.StatusCodeCancelled(err.Error()) - case errors.Is(err, context.DeadlineExceeded): - err = status.WrapWithDeadlineExceeded( - vald.InsertRPCName+" API deadline exceeded", err, - ) - attrs = trace.StatusCodeDeadlineExceeded(err.Error()) - default: - var ( - st *status.Status - msg string - ) - st, msg, err = status.ParseError(err, codes.Internal, "failed to parse "+vald.RegisterRPCName+" gRPC error response") - if span != nil { - span.RecordError(err) - span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) - span.SetStatus(trace.StatusError, err.Error()) - } - } - log.Warn(err) - if span != nil { - span.RecordError(err) - span.SetAttributes(attrs...) 
- span.SetStatus(trace.StatusError, err.Error()) - } - close(ech) - return nil, err - } - - m.eg.Go(func() (err error) { - tic := time.NewTicker(m.advertiseDur) + m.eg.Go(func() error { + tic := time.NewTicker(m.registerDur) defer close(ech) defer tic.Stop() for { select { case <-ctx.Done(): - return err + return ctx.Err() case <-tic.C: - resTgts, err := m.advertises(ctx, new(payload.Mirror_Targets)) - if err != nil || len(resTgts) == 0 { - if err == nil { - err = errors.ErrTargetNotFound - } + tgt, err := m.MirrorTargets() + if err != nil { select { case <-ctx.Done(): - return ctx.Err() + return ctx.Err() case ech <- err: break } } - if err = m.Connect(ctx, resTgts...); err != nil { + + resTgts, err := m.registers(ctx, &payload.Mirror_Targets{Targets: tgt}) + if err != nil || len(resTgts) == 0 { + if !errors.Is(err, errors.ErrTargetNotFound) && len(resTgts) == 0 { + err = errors.Join(err, errors.ErrTargetNotFound) + } else if len(resTgts) == 0 { + err = errors.ErrTargetNotFound + } select { case <-ctx.Done(): return ctx.Err() case ech <- err: - break } } - - if err := m.registers(ctx, &payload.Mirror_Targets{ - Targets: append(resTgts, m.selfMirrTgts...), - }); err != nil { - select { - case <-ctx.Done(): - return ctx.Err() - case ech <- err: - break + if len(resTgts) > 0 { + if err := m.Connect(ctx, resTgts...); err != nil { + select { + case <-ctx.Done(): + return ctx.Err() + case ech <- err: + break + } } } log.Debugf("[mirror]: connected mirror gateway targets: %v", m.gateway.GRPCClient().ConnectedAddrs()) } } }) - return ech, nil + return ech } -func (m *mirr) registers(ctx context.Context, tgts *payload.Mirror_Targets) error { +func (m *mirr) registers(ctx context.Context, tgts *payload.Mirror_Targets) ([]*payload.Mirror_Target, error) { ctx, span := trace.StartSpan(grpc.WithGRPCMethod(ctx, vald.PackageName+"."+vald.MirrorRPCServiceName+"/"+vald.RegisterRPCName), "vald/gateway/mirror/service/Mirror.registers") defer func() { if span != nil { @@ -227,8 +157,11 @@ func (m *mirr) registers(ctx
context.Context, tgts *payload.Mirror_Targets) erro resInfo := &errdetails.ResourceInfo{ ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.RegisterRPCName, } + resTgts := make([]*payload.Mirror_Target, 0, len(tgts.GetTargets())) + exists := make(map[string]struct{}) + var mu sync.Mutex - return m.gateway.DoMulti(ctx, m.connectedMirrorAddrs(), func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error { + err := m.gateway.DoMulti(ctx, m.connectedMirrorAddrs(), func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error { ctx, span := trace.StartSpan(ctx, "vald/gateway/mirror/service/Mirror.registers/"+target) defer func() { if span != nil { @@ -236,7 +169,7 @@ func (m *mirr) registers(ctx context.Context, tgts *payload.Mirror_Targets) erro } }() - _, err := vc.Register(ctx, tgts, copts...) + res, err := vc.Register(ctx, tgts, copts...) if err != nil { var attrs trace.Attributes switch { @@ -264,71 +197,6 @@ func (m *mirr) registers(ctx context.Context, tgts *payload.Mirror_Targets) erro "failed to parse "+vald.RegisterRPCName+" gRPC error response", reqInfo, resInfo, ) attrs = trace.FromGRPCStatus(st.Code(), msg) - } - log.Error("failed to send Register API to %s\t: %v", target, err) - if span != nil { - span.RecordError(err) - span.SetAttributes(attrs...) 
- span.SetStatus(trace.StatusError, err.Error()) - } - return err - } - return nil - }) -} - -func (m *mirr) advertises(ctx context.Context, tgts *payload.Mirror_Targets) ([]*payload.Mirror_Target, error) { - ctx, span := trace.StartSpan(grpc.WithGRPCMethod(ctx, vald.PackageName+"."+vald.MirrorRPCServiceName+"/"+vald.AdvertiseRPCName), "vald/gateway/vald/service/Mirror.advertises") - defer func() { - if span != nil { - span.End() - } - }() - reqInfo := &errdetails.RequestInfo{ - ServingData: errdetails.Serialize(tgts), - } - resInfo := &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.AdvertiseRPCName, - } - resTgts := make([]*payload.Mirror_Target, 0, len(tgts.GetTargets())) - exists := make(map[string]struct{}) - var mu sync.Mutex - - err := m.gateway.DoMulti(ctx, m.connectedMirrorAddrs(), func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error { - ctx, span := trace.StartSpan(ctx, "vald/gateway/mirror/service/Mirror.advertises/"+target) - defer func() { - if span != nil { - span.End() - } - }() - res, err := vc.Advertise(ctx, tgts) - if err != nil { - var attrs trace.Attributes - switch { - case errors.Is(err, context.Canceled): - err = status.WrapWithCanceled( - vald.AdvertiseRPCName+" API canceld", err, reqInfo, resInfo, - ) - attrs = trace.StatusCodeCancelled(err.Error()) - case errors.Is(err, context.DeadlineExceeded): - err = status.WrapWithCanceled( - vald.AdvertiseRPCName+" API deadline exceeded", err, reqInfo, resInfo, - ) - attrs = trace.StatusCodeDeadlineExceeded(err.Error()) - case errors.Is(err, errors.ErrGRPCClientConnNotFound("*")): - err = status.WrapWithInternal( - vald.AdvertiseRPCName+" API connection not found", err, reqInfo, resInfo, - ) - attrs = trace.StatusCodeInternal(err.Error()) - default: - var ( - st *status.Status - msg string - ) - st, msg, err = status.ParseError(err, codes.Internal, - "failed to parse "+vald.AdvertiseRPCName+" gRPC error 
response", reqInfo, resInfo, - ) - attrs = trace.FromGRPCStatus(st.Code(), msg) // When ingress is deleted, the controller's default backend results(Unimplemented error) are returned so that the connection should be disconnected. // If it is a different namespace on the same cluster, the connection is automatically disconnected because the net.grpc health check fails. @@ -346,7 +214,7 @@ func (m *mirr) advertises(ctx context.Context, tgts *payload.Mirror_Targets) ([] } } } - log.Errorf("failed to process advertise requst to %s\terror: %s", target, err.Error()) + log.Errorf("failed to send Register API to %s\t: %v", target, err) if span != nil { span.RecordError(err) span.SetAttributes(attrs...) @@ -356,8 +224,8 @@ func (m *mirr) advertises(ctx context.Context, tgts *payload.Mirror_Targets) ([] } if res != nil && len(res.GetTargets()) > 0 { for _, tgt := range res.GetTargets() { - mu.Lock() addr := net.JoinHostPort(tgt.Host, uint16(tgt.Port)) + mu.Lock() if _, ok := exists[addr]; !ok { exists[addr] = struct{}{} - resTgts = append(resTgts, res.GetTargets()...) + resTgts = append(resTgts, tgt) @@ -431,6 +299,7 @@ func (m *mirr) Exist(_ context.Context, addr string) bool { return ok } +// MirrorTargets returns own address and the addresses of other mirror gateways to which this gateway is currently connected. func (m *mirr) MirrorTargets() ([]*payload.Mirror_Target, error) { addrs := m.gateway.GRPCClient().ConnectedAddrs() tgts := make([]*payload.Mirror_Target, 0, len(addrs)+1) @@ -460,6 +329,7 @@ func (m *mirr) isGatewayAddr(addr string) bool { return ok } +// connectedMirrorAddrs returns the addresses of other mirror gateways to which this gateway is currently connected.
func (m *mirr) connectedMirrorAddrs() []string { connectedAddrs := m.gateway.GRPCClient().ConnectedAddrs() addrs := make([]string, 0, len(connectedAddrs)) diff --git a/pkg/gateway/mirror/service/mirror_option.go b/pkg/gateway/mirror/service/mirror_option.go index fe0b67c83a..3a43016925 100644 --- a/pkg/gateway/mirror/service/mirror_option.go +++ b/pkg/gateway/mirror/service/mirror_option.go @@ -23,7 +23,7 @@ import ( type MirrorOption func(m *mirr) error var defaultMirrOpts = []MirrorOption{ - WithAdvertiseInterval("1s"), + WithRegisterDuration("1s"), } func WithErrorGroup(eg errgroup.Group) MirrorOption { @@ -68,16 +68,16 @@ func WithGateway(g Gateway) MirrorOption { } } -func WithAdvertiseInterval(s string) MirrorOption { +func WithRegisterDuration(s string) MirrorOption { return func(m *mirr) error { if len(s) == 0 { - return errors.NewErrInvalidOption("advertiseInterval", s) + return errors.NewErrInvalidOption("registerDuration", s) } dur, err := time.ParseDuration(s) if err != nil { - return errors.NewErrInvalidOption("advertiseInterval", s, err) + return errors.NewErrInvalidOption("registerDuration", s, err) } - m.advertiseDur = dur + m.registerDur = dur return nil } } diff --git a/pkg/gateway/mirror/usecase/vald.go b/pkg/gateway/mirror/usecase/vald.go index 844accf600..5e4fb5d342 100644 --- a/pkg/gateway/mirror/usecase/vald.go +++ b/pkg/gateway/mirror/usecase/vald.go @@ -83,7 +83,7 @@ func New(cfg *config.Data) (r runner.Runner, err error) { } mirr, err := service.NewMirror( service.WithErrorGroup(eg), - service.WithAdvertiseInterval(cfg.Mirror.AdvertiseInterval), + service.WithRegisterDuration(cfg.Mirror.RegisterDuration), service.WithValdAddrs(cfg.Mirror.GatewayAddr), service.WithSelfMirrorAddrs(cfg.Mirror.SelfMirrorAddr), service.WithGateway(gw), @@ -198,11 +198,7 @@ func (r *run) Start(ctx context.Context) (<-chan error, error) { } } if r.mirr != nil { - mech, err = r.mirr.Start(ctx) - if err != nil { - close(ech) - return nil, err - } + mech = 
r.mirr.Start(ctx) } if r.dsc != nil { dech, err = r.dsc.Start(ctx)