diff --git a/internal/observability/metrics/gateway/mirror/mirror.go b/internal/observability/metrics/gateway/mirror/mirror.go index 1635e928b5..ae4b5d8d24 100644 --- a/internal/observability/metrics/gateway/mirror/mirror.go +++ b/internal/observability/metrics/gateway/mirror/mirror.go @@ -68,7 +68,7 @@ func (mm *mirrorMetrics) Register(m metrics.Meter) error { targetGauge, }, func(ctx context.Context) { - mm.mirr.RangeAllMirrorAddr(func(addr string, _ any) bool { + mm.mirr.RangeMirrorAddr(func(addr string, _ any) bool { targetGauge.Observe(ctx, 1, attribute.String(targetAddrKey, addr)) return true }) diff --git a/pkg/gateway/mirror/handler/grpc/handler.go b/pkg/gateway/mirror/handler/grpc/handler.go index f84b19e2a0..7f1bd598f1 100644 --- a/pkg/gateway/mirror/handler/grpc/handler.go +++ b/pkg/gateway/mirror/handler/grpc/handler.go @@ -123,7 +123,7 @@ func (s *server) Register(ctx context.Context, req *payload.Mirror_Targets) (*pa } // Get own address and the addresses of other mirror gateways to which this gateway is currently connected. - tgts, err := s.mirror.MirrorTargets() + tgts, err := s.mirror.MirrorTargets(ctx) if err != nil { err = status.WrapWithInternal(vald.RegisterRPCName+" API failed to get connected vald gateway targets", err, &errdetails.BadRequest{ diff --git a/pkg/gateway/mirror/service/discovery.go b/pkg/gateway/mirror/service/discovery.go index 57eed86a5b..3a6bffa754 100644 --- a/pkg/gateway/mirror/service/discovery.go +++ b/pkg/gateway/mirror/service/discovery.go @@ -221,7 +221,7 @@ func (d *discovery) startSync(ctx context.Context, prev map[string]target.Target } } - d.mirr.RangeAllMirrorAddr(func(addr string, _ any) bool { + d.mirr.RangeMirrorAddr(func(addr string, _ any) bool { connected := d.mirr.IsConnected(ctx, addr) if name, ok := curAddrs[addr]; ok { if st := target.MirrorTargetPhaseConnected; connected && current[name].Phase != st { diff --git a/pkg/gateway/mirror/service/mirror.go b/pkg/gateway/mirror/service/mirror.go index 786648f94a..3185ee6efe 100644 --- a/pkg/gateway/mirror/service/mirror.go +++ b/pkg/gateway/mirror/service/mirror.go @@ -41,8 +41,8 @@ type Mirror interface { Connect(ctx context.Context, targets ...*payload.Mirror_Target) error Disconnect(ctx context.Context, targets ...*payload.Mirror_Target) error IsConnected(ctx context.Context, addr string) bool - MirrorTargets() ([]*payload.Mirror_Target, error) - RangeAllMirrorAddr(f func(addr string, _ any) bool) + MirrorTargets(ctx context.Context) ([]*payload.Mirror_Target, error) + RangeMirrorAddr(f func(addr string, _ any) bool) } type mirr struct { @@ -110,7 +110,7 @@ func (m *mirr) Start(ctx context.Context) <-chan error { select { case <-ctx.Done(): case <-tic.C: - tgt, err := m.MirrorTargets() + tgt, err := m.MirrorTargets(ctx) if err != nil { select { case <-ctx.Done(): @@ -139,7 +139,6 @@ func (m *mirr) Start(ctx context.Context) <-chan error { case <-ctx.Done(): return ctx.Err() case ech <- err: - break } } } @@ -168,7 +167,7 @@ func (m *mirr) registers(ctx context.Context, tgts *payload.Mirror_Targets) ([]* exists := make(map[string]bool) var mu sync.Mutex - err := m.gateway.DoMulti(ctx, m.connectedOtherMirrorAddrs(), func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error { + err := m.gateway.DoMulti(ctx, m.connectedOtherMirrorAddrs(ctx), func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error { ctx, span := trace.StartSpan(ctx, "vald/gateway/mirror/service/Mirror.registers/"+target) defer func() { if span != nil { @@ -307,23 +306,29 @@ func (m *mirr) IsConnected(ctx context.Context, addr string) bool { // MirrorTargets returns the Mirror targets, including the address of this gateway and the addresses of other Mirror Gateways // to which this gateway is currently connected. -func (m *mirr) MirrorTargets() ([]*payload.Mirror_Target, error) { - addrs := m.gateway.GRPCClient().ConnectedAddrs() - tgts := make([]*payload.Mirror_Target, 0, len(addrs)+1) - tgts = append(tgts, m.selfMirrTgts...) - for _, addr := range addrs { - if !m.isGatewayAddr(addr) { - host, port, err := net.SplitHostPort(addr) +func (m *mirr) MirrorTargets(ctx context.Context) (tgts []*payload.Mirror_Target, err error) { + tgts = make([]*payload.Mirror_Target, 0, m.addrl.Len()) + m.RangeMirrorAddr(func(addr string, _ any) bool { + if m.IsConnected(ctx, addr) { + var ( + host string + port uint16 + ) + host, port, err = net.SplitHostPort(addr) if err != nil { - return nil, err + return false } tgts = append(tgts, &payload.Mirror_Target{ Host: host, Port: uint32(port), }) } + return true + }) + if err != nil { + return nil, err } - return tgts, nil + return append(tgts, m.selfMirrTgts...), nil } func (m *mirr) isSelfMirrorAddr(addr string) bool { @@ -337,21 +342,19 @@ func (m *mirr) isGatewayAddr(addr string) bool { } // connectedOtherMirrorAddrs returns the addresses of other Mirror Gateways to which this gateway is currently connected. -func (m *mirr) connectedOtherMirrorAddrs() []string { - connectedAddrs := m.gateway.GRPCClient().ConnectedAddrs() - addrs := make([]string, 0, len(connectedAddrs)) - for _, addr := range connectedAddrs { - if !m.isSelfMirrorAddr(addr) && - !m.isGatewayAddr(addr) { +func (m *mirr) connectedOtherMirrorAddrs(ctx context.Context) (addrs []string) { + m.RangeMirrorAddr(func(addr string, _ any) bool { + if m.IsConnected(ctx, addr) { addrs = append(addrs, addr) } - } + return true + }) return addrs } -func (m *mirr) RangeAllMirrorAddr(f func(addr string, _ any) bool) { +func (m *mirr) RangeMirrorAddr(f func(addr string, _ any) bool) { m.addrl.Range(func(addr string, value any) bool { - if !m.isGatewayAddr(addr) { + if !m.isGatewayAddr(addr) && !m.isSelfMirrorAddr(addr) { if !f(addr, value) { return false } diff --git a/pkg/gateway/mirror/service/mirror_test.go b/pkg/gateway/mirror/service/mirror_test.go index a88f9c3794..284a22a286 100644 --- a/pkg/gateway/mirror/service/mirror_test.go +++ b/pkg/gateway/mirror/service/mirror_test.go @@ -2,7 +2,6 @@ package service import ( "context" - "net" "reflect" "testing" @@ -281,9 +280,13 @@ func Test_mirr_Disconnect(t *testing.T) { } func Test_mirr_MirrorTargets(t *testing.T) { + type args struct { + ctx context.Context + } type fields struct { - gatewayAddr string - gateway Gateway + gatewayAddr string + selfMirrAddr string + gateway Gateway } type want struct { want []*payload.Mirror_Target @@ -291,10 +294,11 @@ func Test_mirr_MirrorTargets(t *testing.T) { } type test struct { name string + args args fields fields want want checkFunc func(want, []*payload.Mirror_Target, error) error - beforeFunc func(*testing.T) + beforeFunc func(*testing.T, Mirror) afterFunc func(*testing.T) } defaultCheckFunc := func(w want, got []*payload.Mirror_Target, err error) error { @@ -308,67 +312,67 @@ func Test_mirr_MirrorTargets(t *testing.T) { } tests := []test{ func() test { - gatewayAddr := "192.168.1.3:8081" - connectedAddrs := []string{ - "192.168.1.2:8081", // mirror gateway address. - "192.168.2.2:8081", // mirror gateway address. - gatewayAddr, + gatewayAddr := "192.168.1.2:8081" + selfMirrorAddr := "192.168.1.3:8081" + connectTargets := []*payload.Mirror_Target{ + { + Host: "192.168.1.2", // gateway addresses + Port: 8081, + }, + { + Host: "192.168.2.2", // other mirror address + Port: 8081, + }, + { + Host: "192.168.3.2", // other mirror address + Port: 8081, + }, } + connected := make(map[string]bool) return test{ name: "returns only the addresses of the mirror gateways", + args: args{ + ctx: context.Background(), + }, fields: fields{ - gatewayAddr: gatewayAddr, + gatewayAddr: gatewayAddr, + selfMirrAddr: selfMirrorAddr, gateway: &GatewayMock{ GRPCClientFunc: func() grpc.Client { return &grpcmock.GRPCClientMock{ - ConnectedAddrsFunc: func() []string { - return connectedAddrs + ConnectFunc: func(_ context.Context, addr string, _ ...grpc.DialOption) (conn pool.Conn, err error) { + connected[addr] = true + return conn, err + }, + IsConnectedFunc: func(_ context.Context, addr string) bool { + return connected[addr] }, } }, }, }, + beforeFunc: func(t *testing.T, m Mirror) { + t.Helper() + if err := m.Connect(context.Background(), connectTargets...); err != nil { + t.Fatal(err) + } + }, want: want{ want: []*payload.Mirror_Target{ { - Host: "192.168.1.2", + Host: "192.168.2.2", // other mirror address Port: 8081, }, { - Host: "192.168.2.2", + Host: "192.168.3.2", // other mirror address Port: 8081, }, - }, - }, - } - }(), - func() test { - gatewayAddr := "192.168.1.3:8081" - connectedAddrs := []string{ - "192.168.1.2:8081", // mirror gateway address. - "192.168.2.2", // mirror gateway address. - gatewayAddr, - } - return test{ - name: "returns an error when there is invalid address", - fields: fields{ - gatewayAddr: gatewayAddr, - gateway: &GatewayMock{ - GRPCClientFunc: func() grpc.Client { - return &grpcmock.GRPCClientMock{ - ConnectedAddrsFunc: func() []string { - return connectedAddrs - }, - } + { + Host: "192.168.1.3", // self mirror address + Port: 8081, }, }, }, - want: want{ - err: &net.AddrError{ - Err: "missing port in address", - Addr: "192.168.2.2", - }, - }, } }(), } @@ -378,8 +382,18 @@ func Test_mirr_MirrorTargets(t *testing.T) { t.Run(test.name, func(tt *testing.T) { tt.Parallel() defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) + + m, err := NewMirror( + WithSelfMirrorAddrs(test.fields.selfMirrAddr), + WithGatewayAddrs(test.fields.gatewayAddr), + WithGateway(test.fields.gateway), + ) + if err != nil { + t.Fatal(err) + } + if test.beforeFunc != nil { - test.beforeFunc(tt) + test.beforeFunc(tt, m) } if test.afterFunc != nil { defer test.afterFunc(tt) @@ -389,15 +403,7 @@ func Test_mirr_MirrorTargets(t *testing.T) { checkFunc = defaultCheckFunc } - m, err := NewMirror( - WithGatewayAddrs(test.fields.gatewayAddr), - WithGateway(test.fields.gateway), - ) - if err != nil { - t.Fatal(err) - } - - got, err := m.MirrorTargets() + got, err := m.MirrorTargets(test.args.ctx) if err := checkFunc(test.want, got, err); err != nil { tt.Errorf("error = %v", err) } @@ -406,6 +412,9 @@ func Test_mirr_MirrorTargets(t *testing.T) { } func Test_mirr_connectedOtherMirrorAddrs(t *testing.T) { + type args struct { + ctx context.Context + } type fields struct { gatewayAddr string selfMirrAddr string @@ -416,10 +425,11 @@ func Test_mirr_connectedOtherMirrorAddrs(t *testing.T) { } type test struct { name string + args args fields fields want want checkFunc func(want, []string) error - beforeFunc func(*testing.T) + beforeFunc func(*testing.T, Mirror) afterFunc func(*testing.T) } defaultCheckFunc := func(w want, got []string) error { @@ -430,28 +440,47 @@ func Test_mirr_connectedOtherMirrorAddrs(t *testing.T) { } tests := []test{ func() test { - gatewayAddr := "192.168.1.3:8081" - selfMirrorAddr := "192.168.1.2:8081" - connectedAddrs := []string{ - selfMirrorAddr, - "192.168.2.2:8081", // othre mirror gateway address. - gatewayAddr, + gatewayAddr := "192.168.1.2:8081" + selfMirrorAddr := "192.168.1.3:8081" + connectTargets := []*payload.Mirror_Target{ + { + Host: "192.168.1.2", // gateway addresses + Port: 8081, + }, + { + Host: "192.168.2.2", // other mirror address + Port: 8081, + }, } + connected := make(map[string]bool) return test{ name: "returns only the address of the other mirror gateway", + args: args{ + ctx: context.Background(), + }, fields: fields{ selfMirrAddr: selfMirrorAddr, gatewayAddr: gatewayAddr, gateway: &GatewayMock{ GRPCClientFunc: func() grpc.Client { return &grpcmock.GRPCClientMock{ - ConnectedAddrsFunc: func() []string { - return connectedAddrs + ConnectFunc: func(_ context.Context, addr string, _ ...grpc.DialOption) (conn pool.Conn, err error) { + connected[addr] = true + return conn, err + }, + IsConnectedFunc: func(_ context.Context, addr string) bool { + return connected[addr] }, } }, }, }, + beforeFunc: func(t *testing.T, m Mirror) { + t.Helper() + if err := m.Connect(context.Background(), connectTargets...); err != nil { + t.Fatal(err) + } + }, want: want{ want: []string{ "192.168.2.2:8081", @@ -466,8 +495,18 @@ func Test_mirr_connectedOtherMirrorAddrs(t *testing.T) { t.Run(test.name, func(tt *testing.T) { tt.Parallel() defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) + + m, err := NewMirror( + WithSelfMirrorAddrs(test.fields.selfMirrAddr), + WithGatewayAddrs(test.fields.gatewayAddr), + WithGateway(test.fields.gateway), + ) + if err != nil { + t.Fatal(err) + } + if test.beforeFunc != nil { - test.beforeFunc(tt) + test.beforeFunc(tt, m) } if test.afterFunc != nil { defer test.afterFunc(tt) @@ -477,16 +516,8 @@ func Test_mirr_connectedOtherMirrorAddrs(t *testing.T) { checkFunc = defaultCheckFunc } - m, err := NewMirror( - WithSelfMirrorAddrs(test.fields.selfMirrAddr), - WithGatewayAddrs(test.fields.gatewayAddr), - WithGateway(test.fields.gateway), - ) - if err != nil { - t.Fatal(err) - } if mirr, ok := m.(*mirr); ok { - got := mirr.connectedOtherMirrorAddrs() + got := mirr.connectedOtherMirrorAddrs(test.args.ctx) if err := checkFunc(test.want, got); err != nil { tt.Errorf("error = %v", err) }