Skip to content

Commit

Permalink
fix: use address cache insted of connectedAddr method
Browse files Browse the repository at this point in the history
Signed-off-by: hlts2 <[email protected]>
  • Loading branch information
hlts2 committed Dec 5, 2023
1 parent f5afe4c commit 6c2789c
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 97 deletions.
2 changes: 1 addition & 1 deletion internal/observability/metrics/gateway/mirror/mirror.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (mm *mirrorMetrics) Register(m metrics.Meter) error {
targetGauge,
},
func(ctx context.Context) {
mm.mirr.RangeAllMirrorAddr(func(addr string, _ any) bool {
mm.mirr.RangeMirrorAddr(func(addr string, _ any) bool {
targetGauge.Observe(ctx, 1, attribute.String(targetAddrKey, addr))
return true
})
Expand Down
2 changes: 1 addition & 1 deletion pkg/gateway/mirror/handler/grpc/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (s *server) Register(ctx context.Context, req *payload.Mirror_Targets) (*pa
}

// Get own address and the addresses of other mirror gateways to which this gateway is currently connected.
tgts, err := s.mirror.MirrorTargets()
tgts, err := s.mirror.MirrorTargets(ctx)
if err != nil {
err = status.WrapWithInternal(vald.RegisterRPCName+" API failed to get connected vald gateway targets", err,
&errdetails.BadRequest{
Expand Down
2 changes: 1 addition & 1 deletion pkg/gateway/mirror/service/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func (d *discovery) startSync(ctx context.Context, prev map[string]target.Target
}

Check warning on line 221 in pkg/gateway/mirror/service/discovery.go

View check run for this annotation

Codecov / codecov/patch

pkg/gateway/mirror/service/discovery.go#L216-L221

Added lines #L216 - L221 were not covered by tests
}

d.mirr.RangeAllMirrorAddr(func(addr string, _ any) bool {
d.mirr.RangeMirrorAddr(func(addr string, _ any) bool {
connected := d.mirr.IsConnected(ctx, addr)
if name, ok := curAddrs[addr]; ok {
if st := target.MirrorTargetPhaseConnected; connected && current[name].Phase != st {
Expand Down
49 changes: 26 additions & 23 deletions pkg/gateway/mirror/service/mirror.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ type Mirror interface {
Connect(ctx context.Context, targets ...*payload.Mirror_Target) error
Disconnect(ctx context.Context, targets ...*payload.Mirror_Target) error
IsConnected(ctx context.Context, addr string) bool
MirrorTargets() ([]*payload.Mirror_Target, error)
RangeAllMirrorAddr(f func(addr string, _ any) bool)
MirrorTargets(ctx context.Context) ([]*payload.Mirror_Target, error)
RangeMirrorAddr(f func(addr string, _ any) bool)
}

type mirr struct {
Expand Down Expand Up @@ -110,7 +110,7 @@ func (m *mirr) Start(ctx context.Context) <-chan error {
select {
case <-ctx.Done():
case <-tic.C:
tgt, err := m.MirrorTargets()
tgt, err := m.MirrorTargets(ctx)
if err != nil {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -139,7 +139,6 @@ func (m *mirr) Start(ctx context.Context) <-chan error {
case <-ctx.Done():
return ctx.Err()
case ech <- err:

Check warning on line 141 in pkg/gateway/mirror/service/mirror.go

View check run for this annotation

Codecov / codecov/patch

pkg/gateway/mirror/service/mirror.go#L136-L141

Added lines #L136 - L141 were not covered by tests
break
}
}
}
Expand Down Expand Up @@ -168,7 +167,7 @@ func (m *mirr) registers(ctx context.Context, tgts *payload.Mirror_Targets) ([]*
exists := make(map[string]bool)
var mu sync.Mutex

err := m.gateway.DoMulti(ctx, m.connectedOtherMirrorAddrs(), func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error {
err := m.gateway.DoMulti(ctx, m.connectedOtherMirrorAddrs(ctx), func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error {
ctx, span := trace.StartSpan(ctx, "vald/gateway/mirror/service/Mirror.registers/"+target)
defer func() {
if span != nil {
Expand Down Expand Up @@ -307,23 +306,29 @@ func (m *mirr) IsConnected(ctx context.Context, addr string) bool {

// MirrorTargets returns the Mirror targets, including the address of this gateway and the addresses of other Mirror Gateways
// to which this gateway is currently connected.
func (m *mirr) MirrorTargets() ([]*payload.Mirror_Target, error) {
addrs := m.gateway.GRPCClient().ConnectedAddrs()
tgts := make([]*payload.Mirror_Target, 0, len(addrs)+1)
tgts = append(tgts, m.selfMirrTgts...)
for _, addr := range addrs {
if !m.isGatewayAddr(addr) {
host, port, err := net.SplitHostPort(addr)
func (m *mirr) MirrorTargets(ctx context.Context) (tgts []*payload.Mirror_Target, err error) {
tgts = make([]*payload.Mirror_Target, 0, m.addrl.Len())
m.RangeMirrorAddr(func(addr string, _ any) bool {
if m.IsConnected(ctx, addr) {
var (
host string
port uint16
)
host, port, err = net.SplitHostPort(addr)
if err != nil {
return nil, err
return false
}

Check warning on line 320 in pkg/gateway/mirror/service/mirror.go

View check run for this annotation

Codecov / codecov/patch

pkg/gateway/mirror/service/mirror.go#L319-L320

Added lines #L319 - L320 were not covered by tests
tgts = append(tgts, &payload.Mirror_Target{
Host: host,
Port: uint32(port),
})
}
return true
})
if err != nil {
return nil, err
}

Check warning on line 330 in pkg/gateway/mirror/service/mirror.go

View check run for this annotation

Codecov / codecov/patch

pkg/gateway/mirror/service/mirror.go#L329-L330

Added lines #L329 - L330 were not covered by tests
return tgts, nil
return append(tgts, m.selfMirrTgts...), nil
}

func (m *mirr) isSelfMirrorAddr(addr string) bool {
Expand All @@ -337,21 +342,19 @@ func (m *mirr) isGatewayAddr(addr string) bool {
}

// connectedOtherMirrorAddrs returns the addresses of other Mirror Gateways to which this gateway is currently connected.
func (m *mirr) connectedOtherMirrorAddrs() []string {
connectedAddrs := m.gateway.GRPCClient().ConnectedAddrs()
addrs := make([]string, 0, len(connectedAddrs))
for _, addr := range connectedAddrs {
if !m.isSelfMirrorAddr(addr) &&
!m.isGatewayAddr(addr) {
func (m *mirr) connectedOtherMirrorAddrs(ctx context.Context) (addrs []string) {
m.RangeMirrorAddr(func(addr string, _ any) bool {
if m.IsConnected(ctx, addr) {
addrs = append(addrs, addr)
}
}
return true
})
return addrs
}

func (m *mirr) RangeAllMirrorAddr(f func(addr string, _ any) bool) {
func (m *mirr) RangeMirrorAddr(f func(addr string, _ any) bool) {
m.addrl.Range(func(addr string, value any) bool {
if !m.isGatewayAddr(addr) {
if !m.isGatewayAddr(addr) && !m.isSelfMirrorAddr(addr) {
if !f(addr, value) {
return false
}

Check warning on line 360 in pkg/gateway/mirror/service/mirror.go

View check run for this annotation

Codecov / codecov/patch

pkg/gateway/mirror/service/mirror.go#L359-L360

Added lines #L359 - L360 were not covered by tests
Expand Down
Loading

0 comments on commit 6c2789c

Please sign in to comment.