diff --git a/apis/grpc/v1/mirror/mirror.go b/apis/grpc/v1/mirror/mirror.go new file mode 100644 index 0000000000..70614d73ff --- /dev/null +++ b/apis/grpc/v1/mirror/mirror.go @@ -0,0 +1,23 @@ +// +// Copyright (C) 2019-2024 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// You may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +// Package mirror provides vald server interface +package mirror + +const ( + RPCServiceName = "Mirror" + RegisterRPCName = "Register" +) diff --git a/apis/grpc/v1/vald/vald.go b/apis/grpc/v1/vald/vald.go index 14477c37b6..e3d2a8c46d 100644 --- a/apis/grpc/v1/vald/vald.go +++ b/apis/grpc/v1/vald/vald.go @@ -18,7 +18,6 @@ package vald import ( - "github.com/vdaas/vald/apis/grpc/v1/mirror" grpc "google.golang.org/grpc" ) @@ -36,11 +35,6 @@ type ServerWithFilter interface { FilterServer } -type ServerWithMirror interface { - Server - mirror.MirrorServer -} - type UnimplementedValdServer struct { UnimplementedInsertServer UnimplementedUpdateServer @@ -55,11 +49,6 @@ type UnimplementedValdServerWithFilter struct { UnimplementedFilterServer } -type UnimplementedValdServerWithMirror struct { - UnimplementedValdServer - mirror.UnimplementedMirrorServer -} - type Client interface { InsertClient UpdateClient @@ -74,11 +63,6 @@ type ClientWithFilter interface { FilterClient } -type ClientWithMirror interface { - Client - mirror.MirrorClient -} - const PackageName = "vald.v1" const ( @@ -89,7 +73,6 @@ const ( RemoveRPCServiceName = "Remove" ObjectRPCServiceName = "Object" FilterRPCServiceName = "Filter" - MirrorRPCServiceName = "Mirror" ) const ( @@ -143,8 +126,6 @@ const ( GetTimestampRPCName = "GetTimestamp" StreamGetObjectRPCName = "StreamGetObject" StreamListObjectRPCName = "StreamListObject" - - RegisterRPCName = "Register" ) type client struct { @@ -156,11 +137,6 @@ type client struct { ObjectClient } -type clientWithMirror struct { - Client - mirror.MirrorClient -} - func RegisterValdServer(s *grpc.Server, srv Server) { RegisterInsertServer(s, srv) RegisterUpdateServer(s, srv) @@ -175,11 +151,6 @@ func RegisterValdServerWithFilter(s *grpc.Server, srv ServerWithFilter) { RegisterFilterServer(s, srv) } -func RegisterValdServerWithMirror(s *grpc.Server, srv ServerWithMirror) { - RegisterValdServer(s, srv) - mirror.RegisterMirrorServer(s, srv) -} - func NewValdClient(conn *grpc.ClientConn) Client { return &client{ NewInsertClient(conn), @@ -190,10 +161,3 @@ func NewValdClient(conn *grpc.ClientConn) Client { NewObjectClient(conn), } } - -func NewValdClientWithMirror(conn *grpc.ClientConn) ClientWithMirror { - return &clientWithMirror{ - Client: NewValdClient(conn), - MirrorClient: mirror.NewMirrorClient(conn), - } -} diff --git a/internal/client/v1/client/mirror/mirror.go b/internal/client/v1/client/mirror/mirror.go index c90b3b9c2b..616de1fabc 100644 --- a/internal/client/v1/client/mirror/mirror.go +++ b/internal/client/v1/client/mirror/mirror.go @@ -18,7 +18,6 @@ import ( "github.com/vdaas/vald/apis/grpc/v1/mirror" "github.com/vdaas/vald/apis/grpc/v1/payload" - "github.com/vdaas/vald/apis/grpc/v1/vald" "github.com/vdaas/vald/internal/errors" "github.com/vdaas/vald/internal/net/grpc" "github.com/vdaas/vald/internal/observability/trace" @@ -69,7 +68,7 @@ func (c *client) GRPCClient() grpc.Client { } func (c *client) Register(ctx context.Context, in *payload.Mirror_Targets, opts ...grpc.CallOption) (res *payload.Mirror_Targets, err error) { - ctx, span := trace.StartSpan(grpc.WrapGRPCMethod(ctx, "internal/client/"+vald.RegisterRPCName), apiName+"/"+vald.RegisterRPCName) + ctx, span := trace.StartSpan(grpc.WrapGRPCMethod(ctx, "internal/client/"+mirror.RegisterRPCName), apiName+"/"+mirror.RegisterRPCName) defer func() { if span != nil { span.End() diff --git a/internal/test/mock/client/mirror_client_mock.go b/internal/test/mock/client/mirror_client_mock.go index 10cd4563ac..96baff34be 100644 --- a/internal/test/mock/client/mirror_client_mock.go +++ b/internal/test/mock/client/mirror_client_mock.go @@ -16,13 +16,15 @@ package client import ( "context" + "github.com/vdaas/vald/apis/grpc/v1/mirror" "github.com/vdaas/vald/apis/grpc/v1/payload" "github.com/vdaas/vald/apis/grpc/v1/vald" "github.com/vdaas/vald/internal/net/grpc" ) type MirrorClientMock struct { - vald.ClientWithMirror + vald.Client + mirror.MirrorClient InsertFunc func(ctx context.Context, in *payload.Insert_Request, opts ...grpc.CallOption) (*payload.Object_Location, error) UpdateFunc func(ctx context.Context, in *payload.Update_Request, opts ...grpc.CallOption) (*payload.Object_Location, error) diff --git a/pkg/gateway/mirror/handler/grpc/handler.go b/pkg/gateway/mirror/handler/grpc/handler.go index 9a04ccc00e..c94d631dc9 100644 --- a/pkg/gateway/mirror/handler/grpc/handler.go +++ b/pkg/gateway/mirror/handler/grpc/handler.go @@ -20,6 +20,7 @@ import ( "reflect" "sync/atomic" + "github.com/vdaas/vald/apis/grpc/v1/mirror" "github.com/vdaas/vald/apis/grpc/v1/payload" "github.com/vdaas/vald/apis/grpc/v1/vald" "github.com/vdaas/vald/internal/errors" @@ -35,6 +36,11 @@ import ( "github.com/vdaas/vald/pkg/gateway/mirror/service" ) +type Server interface { + vald.Server + mirror.MirrorServer +} + type server struct { eg errgroup.Group gateway service.Gateway // Mirror gateway client service. @@ -43,13 +49,14 @@ type server struct { streamConcurrency int name string ip string - vald.UnimplementedValdServerWithMirror + vald.UnimplementedValdServer + mirror.UnimplementedMirrorServer } const apiName = "vald/gateway/mirror" // New returns a Vald server as gRPC handler with mirror using the provided options. -func New(opts ...Option) (vald.ServerWithMirror, error) { +func New(opts ...Option) (Server, error) { s := new(server) for _, opt := range append(defaultOptions, opts...) { if err := opt(s); err != nil { @@ -69,7 +76,7 @@ func New(opts ...Option) (vald.ServerWithMirror, error) { // The function connects to the mirror using the provided targets, and if successful, // returns the addresses of connected Mirror gateways. func (s *server) Register(ctx context.Context, req *payload.Mirror_Targets) (*payload.Mirror_Targets, error) { - ctx, span := trace.StartSpan(grpc.WithGRPCMethod(ctx, vald.PackageName+"."+vald.MirrorRPCServiceName+"/"+vald.RegisterRPCName), apiName+"/"+vald.RegisterRPCName) + ctx, span := trace.StartSpan(grpc.WithGRPCMethod(ctx, vald.PackageName+"."+mirror.RPCServiceName+"/"+mirror.RegisterRPCName), apiName+"/"+mirror.RegisterRPCName) defer func() { if span != nil { span.End() @@ -81,7 +88,7 @@ func (s *server) Register(ctx context.Context, req *payload.Mirror_Targets) (*pa ServingData: errdetails.Serialize(req), } resInfo := &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.RegisterRPCName, + ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + mirror.RegisterRPCName, ResourceName: fmt.Sprintf("%s: %s(%s)", apiName, s.name, s.ip), } var attrs trace.Attributes @@ -89,22 +96,22 @@ func (s *server) Register(ctx context.Context, req *payload.Mirror_Targets) (*pa switch { case errors.Is(err, context.Canceled): err = status.WrapWithCanceled( - vald.RegisterRPCName+" API canceld", err, reqInfo, resInfo, + mirror.RegisterRPCName+" API canceld", err, reqInfo, resInfo, ) attrs = trace.StatusCodeCancelled(err.Error()) case errors.Is(err, context.DeadlineExceeded): err = status.WrapWithCanceled( - vald.RegisterRPCName+" API deadline exceeded", err, reqInfo, resInfo, + mirror.RegisterRPCName+" API deadline exceeded", err, reqInfo, resInfo, ) attrs = trace.StatusCodeDeadlineExceeded(err.Error()) case errors.Is(err, errors.ErrGRPCClientConnNotFound("*")): err = status.WrapWithInternal( - vald.RegisterRPCName+" API connection not found", err, reqInfo, resInfo, + mirror.RegisterRPCName+" API connection not found", err, reqInfo, resInfo, ) attrs = trace.StatusCodeInternal(err.Error()) case errors.Is(err, errors.ErrTargetNotFound): err = status.WrapWithInvalidArgument( - vald.RegisterRPCName+" API target not found", err, reqInfo, resInfo, + mirror.RegisterRPCName+" API target not found", err, reqInfo, resInfo, ) attrs = trace.StatusCodeInvalidArgument(err.Error()) default: @@ -113,7 +120,7 @@ func (s *server) Register(ctx context.Context, req *payload.Mirror_Targets) (*pa msg string ) st, msg, err = status.ParseError(err, codes.Internal, - "failed to parse "+vald.RegisterRPCName+" gRPC error response", reqInfo, resInfo, + "failed to parse "+mirror.RegisterRPCName+" gRPC error response", reqInfo, resInfo, ) attrs = trace.FromGRPCStatus(st.Code(), msg) } @@ -129,7 +136,7 @@ func (s *server) Register(ctx context.Context, req *payload.Mirror_Targets) (*pa // Get own address and the addresses of other mirror gateways to which this gateway is currently connected. tgts, err := s.mirror.MirrorTargets(ctx) if err != nil { - err = status.WrapWithInternal(vald.RegisterRPCName+" API failed to get connected vald gateway targets", err, + err = status.WrapWithInternal(mirror.RegisterRPCName+" API failed to get connected vald gateway targets", err, &errdetails.BadRequest{ FieldViolations: []*errdetails.BadRequestFieldViolation{ { @@ -139,7 +146,7 @@ func (s *server) Register(ctx context.Context, req *payload.Mirror_Targets) (*pa }, }, &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.RegisterRPCName, + ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + mirror.RegisterRPCName, ResourceName: fmt.Sprintf("%s: %s(%s)", apiName, s.name, s.ip), }, ) @@ -165,7 +172,7 @@ func (s *server) Exists(ctx context.Context, meta *payload.Object_ID) (id *paylo } }() - _, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) { + _, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc service.MirrorClient, copts ...grpc.CallOption) (interface{}, error) { id, err = vc.Exists(ctx, meta, copts...) return id, err }) @@ -230,7 +237,7 @@ func (s *server) Search(ctx context.Context, req *payload.Search_Request) (res * } }() - _, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) { + _, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc service.MirrorClient, copts ...grpc.CallOption) (interface{}, error) { res, err = vc.Search(ctx, req, copts...) return res, err }) @@ -298,7 +305,7 @@ func (s *server) SearchByID(ctx context.Context, req *payload.Search_IDRequest) } }() - _, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) { + _, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc service.MirrorClient, copts ...grpc.CallOption) (interface{}, error) { res, err = vc.SearchByID(ctx, req, copts...) return res, err }) @@ -464,7 +471,7 @@ func (s *server) MultiSearch(ctx context.Context, req *payload.Search_MultiReque } }() - _, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) { + _, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc service.MirrorClient, copts ...grpc.CallOption) (interface{}, error) { res, err = vc.MultiSearch(ctx, req, copts...) return res, err }) @@ -529,7 +536,7 @@ func (s *server) MultiSearchByID(ctx context.Context, req *payload.Search_MultiI } }() - _, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) { + _, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc service.MirrorClient, copts ...grpc.CallOption) (interface{}, error) { res, err = vc.MultiSearchByID(ctx, req, copts...) return res, err }) @@ -594,7 +601,7 @@ func (s *server) LinearSearch(ctx context.Context, req *payload.Search_Request) } }() - _, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) { + _, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc service.MirrorClient, copts ...grpc.CallOption) (interface{}, error) { res, err = vc.LinearSearch(ctx, req, copts...) return res, err }) @@ -662,7 +669,7 @@ func (s *server) LinearSearchByID(ctx context.Context, req *payload.Search_IDReq } }() - _, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) { + _, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc service.MirrorClient, copts ...grpc.CallOption) (interface{}, error) { res, err = vc.LinearSearchByID(ctx, req, copts...) return res, err }) @@ -831,7 +838,7 @@ func (s *server) MultiLinearSearch(ctx context.Context, req *payload.Search_Mult } }() - _, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) { + _, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc service.MirrorClient, copts ...grpc.CallOption) (interface{}, error) { res, err = vc.MultiLinearSearch(ctx, req, copts...) return res, err }) @@ -896,7 +903,7 @@ func (s *server) MultiLinearSearchByID(ctx context.Context, req *payload.Search_ } }() - _, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) { + _, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc service.MirrorClient, copts ...grpc.CallOption) (interface{}, error) { res, err = vc.MultiLinearSearchByID(ctx, req, copts...) return res, err }) @@ -968,7 +975,7 @@ func (s *server) Insert(ctx context.Context, req *payload.Insert_Request) (loc * // So this component sends requests only to the Vald gateway (LB gateway) of its own cluster. if s.isProxied(ctx) { loc, err = s.doInsert(ctx, req, func(ctx context.Context) (*payload.Object_Location, error) { - _, derr := s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) { + _, derr := s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc service.MirrorClient, copts ...grpc.CallOption) (interface{}, error) { loc, err = vc.Insert(ctx, req, copts...) return loc, err }) @@ -1016,7 +1023,7 @@ func (s *server) handleInsert(ctx context.Context, req *payload.Insert_Request) Uuid: req.GetVector().GetId(), Ips: make([]string, 0), } - err = s.gateway.BroadCast(ctx, func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error { + err = s.gateway.BroadCast(ctx, func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error { ctx, span := trace.StartSpan(grpc.WrapGRPCMethod(ctx, "BroadCast/"+target), apiName+"/"+vald.InsertRPCName+"/"+target) defer func() { if span != nil { @@ -1180,7 +1187,7 @@ func (s *server) handleInsertResult( // skipcq: GO-R1005 Ips: make([]string, 0), } - err = s.gateway.DoMulti(ctx, alreadyExistsTgts, func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error { + err = s.gateway.DoMulti(ctx, alreadyExistsTgts, func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error { ctx, span := trace.StartSpan(grpc.WrapGRPCMethod(ctx, "DoMulti/"+target), apiName+"/"+vald.UpdateRPCName+"/"+target) defer func() { if span != nil { @@ -1512,7 +1519,7 @@ func (s *server) Update(ctx context.Context, req *payload.Update_Request) (loc * // So this component sends requests only to the Vald gateway (LB gateway) of its own cluster. if s.isProxied(ctx) { loc, err = s.doUpdate(ctx, req, func(ctx context.Context) (*payload.Object_Location, error) { - _, derr := s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) { + _, derr := s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc service.MirrorClient, copts ...grpc.CallOption) (interface{}, error) { loc, err = vc.Update(ctx, req, copts...) return loc, err }) @@ -1560,7 +1567,7 @@ func (s *server) handleUpdate(ctx context.Context, req *payload.Update_Request) Uuid: req.GetVector().GetId(), Ips: make([]string, 0), } - err = s.gateway.BroadCast(ctx, func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error { + err = s.gateway.BroadCast(ctx, func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error { ctx, span := trace.StartSpan(grpc.WrapGRPCMethod(ctx, "BroadCast/"+target), apiName+"/"+vald.UpdateRPCName+"/"+target) defer func() { if span != nil { @@ -1739,7 +1746,7 @@ func (s *server) handleUpdateResult( // skipcq: GO-R1005 Ips: make([]string, 0), } - err = s.gateway.DoMulti(ctx, notFoundTgts, func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error { + err = s.gateway.DoMulti(ctx, notFoundTgts, func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error { ctx, span := trace.StartSpan(grpc.WrapGRPCMethod(ctx, "BroadCast/"+target), apiName+"/"+vald.InsertRPCName+"/"+target) defer func() { if span != nil { @@ -2086,7 +2093,7 @@ func (s *server) Upsert(ctx context.Context, req *payload.Upsert_Request) (loc * // So this component sends requests only to the Vald gateway (LB gateway) of its own cluster. if s.isProxied(ctx) { loc, err = s.doUpsert(ctx, req, func(ctx context.Context) (*payload.Object_Location, error) { - s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) { + s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc service.MirrorClient, copts ...grpc.CallOption) (interface{}, error) { loc, err = vc.Upsert(ctx, req, copts...) return loc, err }) @@ -2135,7 +2142,7 @@ func (s *server) handleUpsert(ctx context.Context, req *payload.Upsert_Request) Uuid: req.GetVector().GetId(), Ips: make([]string, 0), } - err = s.gateway.BroadCast(ctx, func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error { + err = s.gateway.BroadCast(ctx, func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error { ctx, span := trace.StartSpan(grpc.WrapGRPCMethod(ctx, "BroadCast/"+target), apiName+"/"+vald.UpsertRPCName+"/"+target) defer func() { if span != nil { @@ -2466,7 +2473,7 @@ func (s *server) Remove(ctx context.Context, req *payload.Remove_Request) (loc * // So this component sends requests only to the Vald gateway (LB gateway) of its own cluster. if s.isProxied(ctx) { loc, err = s.doRemove(ctx, req, func(ctx context.Context) (*payload.Object_Location, error) { - s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) { + s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc service.MirrorClient, copts ...grpc.CallOption) (interface{}, error) { loc, err = vc.Remove(ctx, req, copts...) return loc, err }) @@ -2514,7 +2521,7 @@ func (s *server) handleRemove(ctx context.Context, req *payload.Remove_Request) Uuid: req.GetId().GetId(), Ips: make([]string, 0), } - err = s.gateway.BroadCast(ctx, func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error { + err = s.gateway.BroadCast(ctx, func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error { ctx, span := trace.StartSpan(grpc.WrapGRPCMethod(ctx, "BroadCast/"+target), apiName+"/"+vald.RemoveRPCName+"/"+target) defer func() { if span != nil { @@ -2841,7 +2848,7 @@ func (s *server) RemoveByTimestamp(ctx context.Context, req *payload.Remove_Time // So this component sends requests only to the Vald gateway (LB gateway) of its own cluster. if s.isProxied(ctx) { locs, err = s.doRemoveByTimestamp(ctx, req, func(ctx context.Context) (*payload.Object_Locations, error) { - _, derr := s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) { + _, derr := s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc service.MirrorClient, copts ...grpc.CallOption) (interface{}, error) { locs, err = vc.RemoveByTimestamp(ctx, req, copts...) return locs, err }) @@ -2887,7 +2894,7 @@ func (s *server) handleRemoveByTimestamp(ctx context.Context, req *payload.Remov var result sync.Map[string, *errorState] // map[target host: error state] locs = new(payload.Object_Locations) - err = s.gateway.BroadCast(ctx, func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error { + err = s.gateway.BroadCast(ctx, func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error { ctx, span := trace.StartSpan(grpc.WrapGRPCMethod(ctx, "BroadCast/"+target), apiName+"/"+vald.RemoveByTimestampRPCName+"/"+target) defer func() { if span != nil { @@ -3085,7 +3092,7 @@ func (s *server) GetObject(ctx context.Context, req *payload.Object_VectorReques } }() - _, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) { + _, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc service.MirrorClient, copts ...grpc.CallOption) (interface{}, error) { vec, err = vc.GetObject(ctx, req, copts...) return vec, err }) @@ -3200,7 +3207,7 @@ func (s *server) StreamListObject(req *payload.Object_List_Request, stream vald. } }() - _, err := s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (obj interface{}, err error) { + _, err := s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) (obj interface{}, err error) { ctx, span := trace.StartSpan(grpc.WrapGRPCMethod(ctx, "Do/"+target), apiName+"/"+vald.StreamListObjectRPCName+"/"+target) defer func() { if span != nil { diff --git a/pkg/gateway/mirror/handler/grpc/handler_test.go b/pkg/gateway/mirror/handler/grpc/handler_test.go index fb55eb1b53..84a24a1dd3 100644 --- a/pkg/gateway/mirror/handler/grpc/handler_test.go +++ b/pkg/gateway/mirror/handler/grpc/handler_test.go @@ -18,6 +18,7 @@ import ( "reflect" "testing" + "github.com/vdaas/vald/apis/grpc/v1/mirror" "github.com/vdaas/vald/apis/grpc/v1/payload" "github.com/vdaas/vald/apis/grpc/v1/vald" "github.com/vdaas/vald/internal/errors" @@ -42,14 +43,15 @@ func Test_server_Insert(t *testing.T) { // skipcq: GO-R1005 req *payload.Insert_Request } type fields struct { - eg errgroup.Group - gateway service.Gateway - mirror service.Mirror - vAddr string - streamConcurrency int - name string - ip string - UnimplementedValdServerWithMirror vald.UnimplementedValdServerWithMirror + eg errgroup.Group + gateway service.Gateway + mirror service.Mirror + vAddr string + streamConcurrency int + name string + ip string + UnimplementedValdServer vald.UnimplementedValdServer + UnimplementedMirrorServer mirror.UnimplementedMirrorServer } type want struct { wantCe *payload.Object_Location @@ -90,7 +92,7 @@ func Test_server_Insert(t *testing.T) { // skipcq: GO-R1005 targets := []string{ "vald-01", "vald-02", } - cmap := map[string]vald.ClientWithMirror{ + cmap := map[string]service.MirrorClient{ targets[0]: &clientmock.MirrorClientMock{ InsertFunc: func(_ context.Context, _ *payload.Insert_Request, _ ...grpc.CallOption) (*payload.Object_Location, error) { return loc, nil @@ -120,7 +122,7 @@ func Test_server_Insert(t *testing.T) { // skipcq: GO-R1005 FromForwardedContextFunc: func(_ context.Context) string { return "" }, - BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error { + BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error { for _, tgt := range targets { f(ctx, tgt, cmap[tgt]) } @@ -152,7 +154,7 @@ func Test_server_Insert(t *testing.T) { // skipcq: GO-R1005 targets := []string{ "vald-01", "vald-02", } - cmap := map[string]vald.ClientWithMirror{ + cmap := map[string]service.MirrorClient{ targets[0]: &clientmock.MirrorClientMock{ InsertFunc: func(_ context.Context, _ *payload.Insert_Request, _ ...grpc.CallOption) (*payload.Object_Location, error) { return &payload.Object_Location{ @@ -188,13 +190,13 @@ func Test_server_Insert(t *testing.T) { // skipcq: GO-R1005 FromForwardedContextFunc: func(_ context.Context) string { return "" }, - BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error { + BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error { for _, tgt := range targets { f(ctx, tgt, cmap[tgt]) } return nil }, - DoMultiFunc: func(ctx context.Context, targets []string, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error { + DoMultiFunc: func(ctx context.Context, targets []string, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error { for _, target := range targets { if c, ok := cmap[target]; !ok { return errors.ErrTargetNotFound @@ -226,7 +228,7 @@ func Test_server_Insert(t *testing.T) { // skipcq: GO-R1005 targets := []string{ "vald-01", "vald-02", } - cmap := map[string]vald.ClientWithMirror{ + cmap := map[string]service.MirrorClient{ targets[0]: &clientmock.MirrorClientMock{ InsertFunc: func(_ context.Context, _ *payload.Insert_Request, _ ...grpc.CallOption) (*payload.Object_Location, error) { return &payload.Object_Location{ @@ -262,13 +264,13 @@ func Test_server_Insert(t *testing.T) { // skipcq: GO-R1005 FromForwardedContextFunc: func(_ context.Context) string { return "" }, - BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error { + BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error { for _, tgt := range targets { f(ctx, tgt, cmap[tgt]) } return nil }, - DoMultiFunc: func(ctx context.Context, targets []string, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error { + DoMultiFunc: func(ctx context.Context, targets []string, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error { for _, target := range targets { if c, ok := cmap[target]; !ok { return errors.New("target not found") @@ -300,7 +302,7 @@ func Test_server_Insert(t *testing.T) { // skipcq: GO-R1005 targets := []string{ "vald-01", "vald-02", } - cmap := map[string]vald.ClientWithMirror{ + cmap := map[string]service.MirrorClient{ targets[0]: &clientmock.MirrorClientMock{ InsertFunc: func(_ context.Context, _ *payload.Insert_Request, _ ...grpc.CallOption) (*payload.Object_Location, error) { return nil, status.Error(codes.AlreadyExists, errors.ErrMetaDataAlreadyExists(uuid).Error()) @@ -330,7 +332,7 @@ func Test_server_Insert(t *testing.T) { // skipcq: GO-R1005 FromForwardedContextFunc: func(_ context.Context) string { return "" }, - BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error { + BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error { for _, tgt := range targets { f(ctx, tgt, cmap[tgt]) } @@ -359,7 +361,7 @@ func Test_server_Insert(t *testing.T) { // skipcq: GO-R1005 targets := []string{ "vald-01", "vald-02", } - cmap := map[string]vald.ClientWithMirror{ + cmap := map[string]service.MirrorClient{ targets[0]: &clientmock.MirrorClientMock{ InsertFunc: func(_ context.Context, _ *payload.Insert_Request, _ ...grpc.CallOption) (*payload.Object_Location, error) { return loc, nil @@ -389,7 +391,7 @@ func Test_server_Insert(t *testing.T) { // skipcq: GO-R1005 FromForwardedContextFunc: func(_ context.Context) string { return "" }, - BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error { + BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error { for _, tgt := range targets { f(ctx, tgt, cmap[tgt]) } @@ -414,7 +416,7 @@ func Test_server_Insert(t *testing.T) { // skipcq: GO-R1005 targets := []string{ "vald-01", "vald-02", } - cmap := map[string]vald.ClientWithMirror{ + cmap := map[string]service.MirrorClient{ targets[0]: &clientmock.MirrorClientMock{ InsertFunc: func(_ context.Context, _ *payload.Insert_Request, _ ...grpc.CallOption) (*payload.Object_Location, error) { return nil, status.Error(codes.Internal, errors.ErrCircuitBreakerHalfOpenFlowLimitation.Error()) @@ -444,7 +446,7 @@ func Test_server_Insert(t *testing.T) { // skipcq: GO-R1005 FromForwardedContextFunc: func(_ context.Context) string { return "" }, - BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error { + BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error { for _, tgt := range targets { f(ctx, tgt, cmap[tgt]) } @@ -472,7 +474,7 @@ func Test_server_Insert(t *testing.T) { // skipcq: GO-R1005 targets := []string{ "vald-01", "vald-02", } - cmap := map[string]vald.ClientWithMirror{ + cmap := map[string]service.MirrorClient{ targets[0]: &clientmock.MirrorClientMock{ InsertFunc: func(_ context.Context, _ *payload.Insert_Request, _ ...grpc.CallOption) (*payload.Object_Location, error) { return &payload.Object_Location{ @@ -508,13 +510,13 @@ func Test_server_Insert(t *testing.T) { // skipcq: GO-R1005 FromForwardedContextFunc: func(_ context.Context) string { return "" }, - BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error { + BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error { for _, tgt := range targets { f(ctx, tgt, cmap[tgt]) } return nil }, - DoMultiFunc: func(ctx context.Context, targets []string, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error { + DoMultiFunc: func(ctx context.Context, targets []string, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error { for _, target := range targets { if c, ok := cmap[target]; !ok { return errors.New("target not found") @@ -553,14 +555,15 @@ func Test_server_Insert(t *testing.T) { // skipcq: GO-R1005 checkFunc = defaultCheckFunc } s := &server{ - eg: test.fields.eg, - gateway: test.fields.gateway, - mirror: test.fields.mirror, - vAddr: test.fields.vAddr, - streamConcurrency: test.fields.streamConcurrency, - name: test.fields.name, - ip: test.fields.ip, - UnimplementedValdServerWithMirror: test.fields.UnimplementedValdServerWithMirror, + eg: test.fields.eg, + gateway: test.fields.gateway, + mirror: test.fields.mirror, + vAddr: test.fields.vAddr, + streamConcurrency: test.fields.streamConcurrency, + name: test.fields.name, + ip: test.fields.ip, + UnimplementedValdServer: test.fields.UnimplementedValdServer, + UnimplementedMirrorServer: test.fields.UnimplementedMirrorServer, } gotCe, err := s.Insert(test.args.ctx, test.args.req) @@ -582,14 +585,15 @@ func Test_server_Update(t *testing.T) { // skipcq: GO-R1005 req *payload.Update_Request } type fields struct { - eg errgroup.Group - gateway service.Gateway - mirror service.Mirror - vAddr string - streamConcurrency int - name string - ip string - UnimplementedValdServerWithMirror vald.UnimplementedValdServerWithMirror + eg errgroup.Group + gateway service.Gateway + mirror service.Mirror + vAddr string + streamConcurrency int + name string + ip string + UnimplementedValdServer vald.UnimplementedValdServer + UnimplementedMirrorServer mirror.UnimplementedMirrorServer } type want struct { wantLoc *payload.Object_Location @@ -630,7 +634,7 @@ func Test_server_Update(t *testing.T) { // skipcq: GO-R1005 targets := []string{ "vald-01", "vald-02", } - cmap := map[string]vald.ClientWithMirror{ + cmap := map[string]service.MirrorClient{ targets[0]: &clientmock.MirrorClientMock{ UpdateFunc: func(_ context.Context, _ *payload.Update_Request, _ ...grpc.CallOption) (*payload.Object_Location, error) { return loc, nil @@ -660,7 +664,7 @@ func Test_server_Update(t *testing.T) { // skipcq: GO-R1005 FromForwardedContextFunc: func(_ context.Context) string { return "" }, - BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error { + BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error { for _, tgt := range targets { f(ctx, tgt, cmap[tgt]) } @@ -692,7 +696,7 @@ func Test_server_Update(t *testing.T) { // skipcq: GO-R1005 targets := []string{ "vald-01", "vald-02", } - cmap := map[string]vald.ClientWithMirror{ + cmap := map[string]service.MirrorClient{ targets[0]: &clientmock.MirrorClientMock{ UpdateFunc: func(_ context.Context, _ *payload.Update_Request, _ ...grpc.CallOption) (*payload.Object_Location, error) { return loc, nil @@ -722,7 +726,7 @@ func Test_server_Update(t *testing.T) { // skipcq: GO-R1005 FromForwardedContextFunc: func(_ context.Context) string { return "" }, - BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error { + BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error { for _, tgt := range targets { f(ctx, tgt, cmap[tgt]) } @@ -754,7 +758,7 @@ func Test_server_Update(t *testing.T) { // skipcq: GO-R1005 targets := []string{ "vald-01", "vald-02", "vald-03", } - cmap := map[string]vald.ClientWithMirror{ + cmap := map[string]service.MirrorClient{ targets[0]: &clientmock.MirrorClientMock{ UpdateFunc: func(_ context.Context, _ *payload.Update_Request, _ ...grpc.CallOption) (*payload.Object_Location, error) { return loc, nil @@ -792,13 +796,13 @@ func Test_server_Update(t *testing.T) { // skipcq: GO-R1005 FromForwardedContextFunc: func(_ context.Context) string { return "" }, - BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error { + BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error { for _, tgt := range targets { f(ctx, tgt, cmap[tgt]) } return nil }, - DoMultiFunc: func(ctx context.Context, targets []string, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error { + DoMultiFunc: func(ctx context.Context, targets []string, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error { for _, target := range targets { if c, ok := cmap[target]; !ok { return errors.ErrTargetNotFound @@ -836,7 +840,7 @@ func Test_server_Update(t *testing.T) { // skipcq: GO-R1005 targets := []string{ "vald-01", "vald-02", "vald-03", } - cmap := map[string]vald.ClientWithMirror{ + cmap := map[string]service.MirrorClient{ targets[0]: &clientmock.MirrorClientMock{ UpdateFunc: func(_ context.Context, _ *payload.Update_Request, _ ...grpc.CallOption) (*payload.Object_Location, error) { return loc, nil @@ -874,13 +878,13 @@ func Test_server_Update(t *testing.T) { // skipcq: GO-R1005 FromForwardedContextFunc: func(_ context.Context) string { return "" }, - BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error { + BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error { for _, tgt := range targets { f(ctx, tgt, cmap[tgt]) } return nil }, - DoMultiFunc: func(ctx context.Context, targets []string, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error { + DoMultiFunc: func(ctx context.Context, targets []string, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error { for _, target := range targets { if c, ok := cmap[target]; !ok { return errors.ErrTargetNotFound @@ -914,7 +918,7 @@ func Test_server_Update(t *testing.T) { // skipcq: GO-R1005 targets := []string{ "vald-01", "vald-02", } - cmap := map[string]vald.ClientWithMirror{ + cmap := map[string]service.MirrorClient{ targets[0]: &clientmock.MirrorClientMock{ UpdateFunc: func(_ context.Context, _ *payload.Update_Request, _ ...grpc.CallOption) (*payload.Object_Location, error) { return nil, status.Error(codes.NotFound, errors.ErrObjectIDNotFound(uuid).Error()) @@ -944,7 +948,7 @@ func Test_server_Update(t *testing.T) { // skipcq: GO-R1005 FromForwardedContextFunc: func(_ context.Context) string { return "" }, - BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error { + BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error { for _, tgt := range targets { f(ctx, tgt, cmap[tgt]) } @@ -973,7 +977,7 @@ func Test_server_Update(t *testing.T) { // skipcq: GO-R1005 targets := []string{ "vald-01", "vald-02", } - cmap := map[string]vald.ClientWithMirror{ + cmap := map[string]service.MirrorClient{ targets[0]: &clientmock.MirrorClientMock{ UpdateFunc: func(_ context.Context, _ *payload.Update_Request, _ ...grpc.CallOption) (*payload.Object_Location, error) { return loc, nil @@ -1003,7 +1007,7 @@ func Test_server_Update(t *testing.T) { // skipcq: GO-R1005 FromForwardedContextFunc: func(_ context.Context) string { return "" }, - BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error { + BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error { for _, tgt := range targets { f(ctx, tgt, cmap[tgt]) } @@ -1028,7 +1032,7 @@ func Test_server_Update(t *testing.T) { // skipcq: GO-R1005 targets := []string{ "vald-01", "vald-02", } - cmap := map[string]vald.ClientWithMirror{ + cmap := map[string]service.MirrorClient{ targets[0]: &clientmock.MirrorClientMock{ UpdateFunc: func(_ context.Context, _ *payload.Update_Request, _ ...grpc.CallOption) (*payload.Object_Location, error) { return nil, status.Error(codes.Internal, errors.ErrCircuitBreakerHalfOpenFlowLimitation.Error()) @@ -1058,7 +1062,7 @@ func Test_server_Update(t *testing.T) { // skipcq: GO-R1005 FromForwardedContextFunc: func(_ context.Context) string { return "" }, - BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error { + BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error { for _, tgt := range targets { f(ctx, tgt, cmap[tgt]) } @@ -1086,7 +1090,7 @@ func Test_server_Update(t *testing.T) { // skipcq: GO-R1005 targets := []string{ "vald-01", "vald-02", "vald-03", } - cmap := map[string]vald.ClientWithMirror{ + cmap := map[string]service.MirrorClient{ targets[0]: &clientmock.MirrorClientMock{ UpdateFunc: func(_ context.Context, _ *payload.Update_Request, _ ...grpc.CallOption) (*payload.Object_Location, error) { return nil, status.Error(codes.AlreadyExists, errors.ErrMetaDataAlreadyExists(uuid).Error()) @@ -1124,13 +1128,13 @@ func Test_server_Update(t *testing.T) { // skipcq: GO-R1005 FromForwardedContextFunc: func(_ context.Context) string { return "" }, - BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error { + BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error { for _, tgt := range targets { f(ctx, tgt, cmap[tgt]) } return nil }, - DoMultiFunc: func(ctx context.Context, targets []string, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error { + DoMultiFunc: func(ctx context.Context, targets []string, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error { for _, target := range targets { if c, ok := cmap[target]; !ok { return errors.ErrTargetNotFound @@ -1163,7 +1167,7 @@ func Test_server_Update(t *testing.T) { // skipcq: GO-R1005 targets := []string{ "vald-01", "vald-02", "vald-03", } - cmap := map[string]vald.ClientWithMirror{ + cmap := map[string]service.MirrorClient{ targets[0]: &clientmock.MirrorClientMock{ UpdateFunc: func(_ context.Context, _ *payload.Update_Request, _ ...grpc.CallOption) (*payload.Object_Location, error) { return loc, nil @@ -1201,13 +1205,13 @@ func Test_server_Update(t *testing.T) { // skipcq: GO-R1005 FromForwardedContextFunc: func(_ context.Context) string { return "" }, - BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error { + BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error { for _, tgt := range targets { f(ctx, tgt, cmap[tgt]) } return nil }, - DoMultiFunc: func(ctx context.Context, targets []string, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error { + DoMultiFunc: func(ctx context.Context, targets []string, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error { for _, target := range targets { if c, ok := cmap[target]; !ok { return errors.New("target not found") @@ -1246,14 +1250,15 @@ func Test_server_Update(t *testing.T) { // skipcq: GO-R1005 checkFunc = defaultCheckFunc } s := &server{ - eg: test.fields.eg, - gateway: test.fields.gateway, - mirror: test.fields.mirror, - vAddr: test.fields.vAddr, - streamConcurrency: test.fields.streamConcurrency, - name: test.fields.name, - ip: test.fields.ip, - UnimplementedValdServerWithMirror: test.fields.UnimplementedValdServerWithMirror, + eg: test.fields.eg, + gateway: test.fields.gateway, + mirror: test.fields.mirror, + vAddr: test.fields.vAddr, + streamConcurrency: test.fields.streamConcurrency, + name: test.fields.name, + ip: test.fields.ip, + UnimplementedValdServer: test.fields.UnimplementedValdServer, + UnimplementedMirrorServer: test.fields.UnimplementedMirrorServer, } gotLoc, err := s.Update(test.args.ctx, test.args.req) @@ -1275,14 +1280,15 @@ func Test_server_Upsert(t *testing.T) { req *payload.Upsert_Request } type fields struct { - eg errgroup.Group - gateway service.Gateway - mirror service.Mirror - vAddr string - streamConcurrency int - name string - ip string - UnimplementedValdServerWithMirror vald.UnimplementedValdServerWithMirror + eg errgroup.Group + gateway service.Gateway + mirror service.Mirror + vAddr string + streamConcurrency int + name string + ip string + UnimplementedValdServer vald.UnimplementedValdServer + UnimplementedMirrorServer mirror.UnimplementedMirrorServer } type want struct { wantLoc *payload.Object_Location @@ -1323,7 +1329,7 @@ func Test_server_Upsert(t *testing.T) { targets := []string{ "vald-01", "vald-02", } - cmap := map[string]vald.ClientWithMirror{ + cmap := map[string]service.MirrorClient{ targets[0]: &clientmock.MirrorClientMock{ UpsertFunc: func(_ context.Context, _ *payload.Upsert_Request, _ ...grpc.CallOption) (*payload.Object_Location, error) { return loc, nil @@ -1353,7 +1359,7 @@ func Test_server_Upsert(t *testing.T) { FromForwardedContextFunc: func(_ context.Context) string { return "" }, - BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error { + BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error { for _, tgt := range targets { f(ctx, tgt, cmap[tgt]) } @@ -1385,7 +1391,7 @@ func Test_server_Upsert(t *testing.T) { targets := []string{ "vald-01", "vald-02", } - cmap := map[string]vald.ClientWithMirror{ + cmap := map[string]service.MirrorClient{ targets[0]: &clientmock.MirrorClientMock{ UpsertFunc: func(_ context.Context, _ *payload.Upsert_Request, _ ...grpc.CallOption) (*payload.Object_Location, error) { return loc, nil @@ -1415,7 +1421,7 @@ func Test_server_Upsert(t *testing.T) { FromForwardedContextFunc: func(_ context.Context) string { return "" }, - BroadCastFunc: func(ctx context.Context, f func(_ context.Context, _ string, _ vald.ClientWithMirror, _ ...grpc.CallOption) error) error { + BroadCastFunc: func(ctx context.Context, f func(_ context.Context, _ string, _ service.MirrorClient, _ ...grpc.CallOption) error) error { for _, tgt := range targets { f(ctx, tgt, cmap[tgt]) } @@ -1440,7 +1446,7 @@ func Test_server_Upsert(t *testing.T) { targets := []string{ "vald-01", "vald-02", } - cmap := map[string]vald.ClientWithMirror{ + cmap := map[string]service.MirrorClient{ targets[0]: &clientmock.MirrorClientMock{ UpsertFunc: func(_ context.Context, _ *payload.Upsert_Request, _ ...grpc.CallOption) (*payload.Object_Location, error) { return nil, status.Error(codes.AlreadyExists, errors.ErrMetaDataAlreadyExists(uuid).Error()) @@ -1470,7 +1476,7 @@ func Test_server_Upsert(t *testing.T) { FromForwardedContextFunc: func(_ context.Context) string { return "" }, - BroadCastFunc: func(ctx context.Context, f func(_ context.Context, _ string, _ vald.ClientWithMirror, _ ...grpc.CallOption) error) error { + BroadCastFunc: func(ctx context.Context, f func(_ context.Context, _ string, _ service.MirrorClient, _ ...grpc.CallOption) error) error { for _, tgt := range targets { f(ctx, tgt, cmap[tgt]) } @@ -1499,7 +1505,7 @@ func Test_server_Upsert(t *testing.T) { targets := []string{ "vald-01", "vald-02", } - cmap := map[string]vald.ClientWithMirror{ + cmap := map[string]service.MirrorClient{ targets[0]: &clientmock.MirrorClientMock{ UpsertFunc: func(_ context.Context, _ *payload.Upsert_Request, _ ...grpc.CallOption) (*payload.Object_Location, error) { return loc, nil @@ -1529,7 +1535,7 @@ func Test_server_Upsert(t *testing.T) { FromForwardedContextFunc: func(_ context.Context) string { return "" }, - BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error { + BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error { for _, tgt := range targets { f(ctx, tgt, cmap[tgt]) } @@ -1554,7 +1560,7 @@ func Test_server_Upsert(t *testing.T) { targets := []string{ "vald-01", "vald-02", } - cmap := map[string]vald.ClientWithMirror{ + cmap := map[string]service.MirrorClient{ targets[0]: &clientmock.MirrorClientMock{ UpsertFunc: func(_ context.Context, _ *payload.Upsert_Request, _ ...grpc.CallOption) (*payload.Object_Location, error) { return nil, status.Error(codes.Internal, errors.ErrCircuitBreakerHalfOpenFlowLimitation.Error()) @@ -1584,7 +1590,7 @@ func Test_server_Upsert(t *testing.T) { FromForwardedContextFunc: func(_ context.Context) string { return "" }, - BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error { + BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error { for _, tgt := range targets { f(ctx, tgt, cmap[tgt]) } @@ -1622,14 +1628,15 @@ func Test_server_Upsert(t *testing.T) { checkFunc = defaultCheckFunc } s := &server{ - eg: test.fields.eg, - gateway: test.fields.gateway, - mirror: test.fields.mirror, - vAddr: test.fields.vAddr, - streamConcurrency: test.fields.streamConcurrency, - name: test.fields.name, - ip: test.fields.ip, - UnimplementedValdServerWithMirror: test.fields.UnimplementedValdServerWithMirror, + eg: test.fields.eg, + gateway: test.fields.gateway, + mirror: test.fields.mirror, + vAddr: test.fields.vAddr, + streamConcurrency: test.fields.streamConcurrency, + name: test.fields.name, + ip: test.fields.ip, + UnimplementedValdServer: test.fields.UnimplementedValdServer, + UnimplementedMirrorServer: test.fields.UnimplementedMirrorServer, } gotLoc, err := s.Upsert(test.args.ctx, test.args.req) @@ -1651,14 +1658,15 @@ func Test_server_Remove(t *testing.T) { req *payload.Remove_Request } type fields struct { - eg errgroup.Group - gateway service.Gateway - mirror service.Mirror - vAddr string - streamConcurrency int - name string - ip string - UnimplementedValdServerWithMirror vald.UnimplementedValdServerWithMirror + eg errgroup.Group + gateway service.Gateway + mirror service.Mirror + vAddr string + streamConcurrency int + name string + ip string + UnimplementedValdServer vald.UnimplementedValdServer + UnimplementedMirrorServer mirror.UnimplementedMirrorServer } type want struct { wantLoc *payload.Object_Location @@ -1699,7 +1707,7 @@ func Test_server_Remove(t *testing.T) { targets := []string{ "vald-01", "vald-02", } - cmap := map[string]vald.ClientWithMirror{ + cmap := map[string]service.MirrorClient{ targets[0]: &clientmock.MirrorClientMock{ RemoveFunc: func(_ context.Context, _ *payload.Remove_Request, _ ...grpc.CallOption) (*payload.Object_Location, error) { return loc, nil @@ -1728,7 +1736,7 @@ func Test_server_Remove(t *testing.T) { FromForwardedContextFunc: func(_ context.Context) string { return "" }, - BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error { + BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error { for _, tgt := range targets { f(ctx, tgt, cmap[tgt]) } @@ -1760,7 +1768,7 @@ func Test_server_Remove(t *testing.T) { targets := []string{ "vald-01", "vald-02", } - cmap := map[string]vald.ClientWithMirror{ + cmap := map[string]service.MirrorClient{ targets[0]: &clientmock.MirrorClientMock{ RemoveFunc: func(_ context.Context, _ *payload.Remove_Request, _ ...grpc.CallOption) (*payload.Object_Location, error) { return loc, nil @@ -1789,7 +1797,7 @@ func Test_server_Remove(t *testing.T) { FromForwardedContextFunc: func(_ context.Context) string { return "" }, - BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error { + BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error { for _, tgt := range targets { f(ctx, tgt, cmap[tgt]) } @@ -1818,7 +1826,7 @@ func Test_server_Remove(t *testing.T) { targets := []string{ "vald-01", "vald-02", } - cmap := map[string]vald.ClientWithMirror{ + cmap := map[string]service.MirrorClient{ targets[0]: &clientmock.MirrorClientMock{ RemoveFunc: func(_ context.Context, _ *payload.Remove_Request, _ ...grpc.CallOption) (*payload.Object_Location, error) { return loc, nil @@ -1847,7 +1855,7 @@ func Test_server_Remove(t *testing.T) { FromForwardedContextFunc: func(_ context.Context) string { return "" }, - BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error { + BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error { for _, tgt := range targets { f(ctx, tgt, cmap[tgt]) } @@ -1872,7 +1880,7 @@ func Test_server_Remove(t *testing.T) { targets := []string{ "vald-01", "vald-02", } - cmap := map[string]vald.ClientWithMirror{ + cmap := map[string]service.MirrorClient{ targets[0]: &clientmock.MirrorClientMock{ RemoveFunc: func(_ context.Context, _ *payload.Remove_Request, _ ...grpc.CallOption) (*payload.Object_Location, error) { return nil, status.Error(codes.Internal, errors.ErrCircuitBreakerHalfOpenFlowLimitation.Error()) @@ -1901,7 +1909,7 @@ func Test_server_Remove(t *testing.T) { FromForwardedContextFunc: func(_ context.Context) string { return "" }, - BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error { + BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error { for _, tgt := range targets { f(ctx, tgt, cmap[tgt]) } @@ -1929,7 +1937,7 @@ func Test_server_Remove(t *testing.T) { targets := []string{ "vald-01", "vald-02", } - cmap := map[string]vald.ClientWithMirror{ + cmap := map[string]service.MirrorClient{ targets[0]: &clientmock.MirrorClientMock{ RemoveFunc: func(_ context.Context, _ *payload.Remove_Request, _ ...grpc.CallOption) (*payload.Object_Location, error) { return nil, status.Error(codes.NotFound, errors.ErrIndexNotFound.Error()) @@ -1958,7 +1966,7 @@ func Test_server_Remove(t *testing.T) { FromForwardedContextFunc: func(_ context.Context) string { return "" }, - BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error { + BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error { for _, tgt := range targets { f(ctx, tgt, cmap[tgt]) } @@ -1993,14 +2001,15 @@ func Test_server_Remove(t *testing.T) { checkFunc = defaultCheckFunc } s := &server{ - eg: test.fields.eg, - gateway: test.fields.gateway, - mirror: test.fields.mirror, - vAddr: test.fields.vAddr, - streamConcurrency: test.fields.streamConcurrency, - name: test.fields.name, - ip: test.fields.ip, - UnimplementedValdServerWithMirror: test.fields.UnimplementedValdServerWithMirror, + eg: test.fields.eg, + gateway: test.fields.gateway, + mirror: test.fields.mirror, + vAddr: test.fields.vAddr, + streamConcurrency: test.fields.streamConcurrency, + name: test.fields.name, + ip: test.fields.ip, + UnimplementedValdServer: test.fields.UnimplementedValdServer, + UnimplementedMirrorServer: test.fields.UnimplementedMirrorServer, } gotLoc, err := s.Remove(test.args.ctx, test.args.req) @@ -2021,14 +2030,15 @@ func Test_server_RemoveByTimestamp(t *testing.T) { req *payload.Remove_TimestampRequest } type fields struct { - eg errgroup.Group - gateway service.Gateway - mirror service.Mirror - vAddr string - streamConcurrency int - name string - ip string - UnimplementedValdServerWithMirror vald.UnimplementedValdServerWithMirror + eg errgroup.Group + gateway service.Gateway + mirror service.Mirror + vAddr string + streamConcurrency int + name string + ip string + UnimplementedValdServer vald.UnimplementedValdServer + UnimplementedMirrorServer mirror.UnimplementedMirrorServer } type want struct { wantLocs *payload.Object_Locations @@ -2076,7 +2086,7 @@ func Test_server_RemoveByTimestamp(t *testing.T) { targets := []string{ "vald-01", "vald-02", } - cmap := map[string]vald.ClientWithMirror{ + cmap := map[string]service.MirrorClient{ targets[0]: &clientmock.MirrorClientMock{ RemoveByTimestampFunc: func(_ context.Context, _ *payload.Remove_TimestampRequest, _ ...grpc.CallOption) (*payload.Object_Locations, error) { return &payload.Object_Locations{ @@ -2108,7 +2118,7 @@ func Test_server_RemoveByTimestamp(t *testing.T) { FromForwardedContextFunc: func(_ context.Context) string { return "" }, - BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error { + BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error { for _, tgt := range targets { f(ctx, tgt, cmap[tgt]) } @@ -2142,7 +2152,7 @@ func Test_server_RemoveByTimestamp(t *testing.T) { targets := []string{ "vald-01", "vald-02", } - cmap := map[string]vald.ClientWithMirror{ + cmap := map[string]service.MirrorClient{ targets[0]: &clientmock.MirrorClientMock{ RemoveByTimestampFunc: func(_ context.Context, _ *payload.Remove_TimestampRequest, _ ...grpc.CallOption) (*payload.Object_Locations, error) { return &payload.Object_Locations{ @@ -2170,7 +2180,7 @@ func Test_server_RemoveByTimestamp(t *testing.T) { FromForwardedContextFunc: func(_ context.Context) string { return "" }, - BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error { + BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error { for _, tgt := range targets { f(ctx, tgt, cmap[tgt]) } @@ -2198,7 +2208,7 @@ func Test_server_RemoveByTimestamp(t *testing.T) { targets := []string{ "vald-01", "vald-02", } - cmap := map[string]vald.ClientWithMirror{ + cmap := map[string]service.MirrorClient{ targets[0]: &clientmock.MirrorClientMock{ RemoveByTimestampFunc: func(_ context.Context, _ *payload.Remove_TimestampRequest, _ ...grpc.CallOption) (*payload.Object_Locations, error) { return nil, status.Error(codes.Internal, errors.ErrCircuitBreakerHalfOpenFlowLimitation.Error()) @@ -2222,7 +2232,7 @@ func Test_server_RemoveByTimestamp(t *testing.T) { FromForwardedContextFunc: func(_ context.Context) string { return "" }, - BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error { + BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error { for _, tgt := range targets { f(ctx, tgt, cmap[tgt]) } @@ -2251,7 +2261,7 @@ func Test_server_RemoveByTimestamp(t *testing.T) { targets := []string{ "vald-01", "vald-02", } - cmap := map[string]vald.ClientWithMirror{ + cmap := map[string]service.MirrorClient{ targets[0]: &clientmock.MirrorClientMock{ RemoveByTimestampFunc: func(_ context.Context, _ *payload.Remove_TimestampRequest, _ ...grpc.CallOption) (*payload.Object_Locations, error) { return nil, status.Error(codes.NotFound, errors.ErrObjectIDNotFound(uuid1).Error()) @@ -2275,7 +2285,7 @@ func Test_server_RemoveByTimestamp(t *testing.T) { FromForwardedContextFunc: func(_ context.Context) string { return "" }, - BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error { + BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error { for _, tgt := range targets { f(ctx, tgt, cmap[tgt]) } @@ -2310,14 +2320,15 @@ func Test_server_RemoveByTimestamp(t *testing.T) { checkFunc = defaultCheckFunc } s := &server{ - eg: test.fields.eg, - gateway: test.fields.gateway, - mirror: test.fields.mirror, - vAddr: test.fields.vAddr, - streamConcurrency: test.fields.streamConcurrency, - name: test.fields.name, - ip: test.fields.ip, - UnimplementedValdServerWithMirror: test.fields.UnimplementedValdServerWithMirror, + eg: test.fields.eg, + gateway: test.fields.gateway, + mirror: test.fields.mirror, + vAddr: test.fields.vAddr, + streamConcurrency: test.fields.streamConcurrency, + name: test.fields.name, + ip: test.fields.ip, + UnimplementedValdServer: test.fields.UnimplementedValdServer, + UnimplementedMirrorServer: test.fields.UnimplementedMirrorServer, } gotLocs, err := s.RemoveByTimestamp(test.args.ctx, test.args.req) diff --git a/pkg/gateway/mirror/handler/grpc/mock_test.go b/pkg/gateway/mirror/handler/grpc/mock_test.go index a09cbb567a..59ab5564a4 100644 --- a/pkg/gateway/mirror/handler/grpc/mock_test.go +++ b/pkg/gateway/mirror/handler/grpc/mock_test.go @@ -16,7 +16,6 @@ package grpc import ( "context" - "github.com/vdaas/vald/apis/grpc/v1/vald" "github.com/vdaas/vald/internal/net/grpc" "github.com/vdaas/vald/pkg/gateway/mirror/service" ) @@ -28,9 +27,9 @@ type gatewayMock struct { ForwardedContextFunc func(ctx context.Context, podName string) context.Context FromForwardedContextFunc func(ctx context.Context) string BroadCastFunc func(ctx context.Context, - f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error + f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error DoMultiFunc func(ctx context.Context, targets []string, - f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error + f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error } func (gm *gatewayMock) ForwardedContext(ctx context.Context, podName string) context.Context { @@ -42,13 +41,13 @@ func (gm *gatewayMock) FromForwardedContext(ctx context.Context) string { } func (gm *gatewayMock) BroadCast(ctx context.Context, - f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error, + f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error, ) error { return gm.BroadCastFunc(ctx, f) } func (gm *gatewayMock) DoMulti(ctx context.Context, targets []string, - f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error, + f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error, ) error { return gm.DoMultiFunc(ctx, targets, f) } diff --git a/pkg/gateway/mirror/handler/rest/handler.go b/pkg/gateway/mirror/handler/rest/handler.go index 70ed6dbff7..105cd3ec6f 100644 --- a/pkg/gateway/mirror/handler/rest/handler.go +++ b/pkg/gateway/mirror/handler/rest/handler.go @@ -17,9 +17,9 @@ import ( "net/http" "github.com/vdaas/vald/apis/grpc/v1/payload" - "github.com/vdaas/vald/apis/grpc/v1/vald" "github.com/vdaas/vald/internal/net/http/dump" "github.com/vdaas/vald/internal/net/http/json" + "github.com/vdaas/vald/pkg/gateway/mirror/handler/grpc" ) // Handler represents an interface for rest handler. @@ -48,7 +48,7 @@ type Handler interface { } type handler struct { - vald vald.ServerWithMirror + vald grpc.Server } // New returns a Vald server as rest handler with mirror using the provided options. diff --git a/pkg/gateway/mirror/handler/rest/option.go b/pkg/gateway/mirror/handler/rest/option.go index 56272ff02e..f2287b16eb 100644 --- a/pkg/gateway/mirror/handler/rest/option.go +++ b/pkg/gateway/mirror/handler/rest/option.go @@ -13,15 +13,13 @@ // limitations under the License. package rest -import ( - "github.com/vdaas/vald/apis/grpc/v1/vald" -) +import "github.com/vdaas/vald/pkg/gateway/mirror/handler/grpc" type Option func(*handler) var defaultOptions = []Option{} -func WithVald(v vald.ServerWithMirror) Option { +func WithVald(v grpc.Server) Option { return func(h *handler) { h.vald = v } diff --git a/pkg/gateway/mirror/service/gateway.go b/pkg/gateway/mirror/service/gateway.go index 632f9db8a6..d2a8b9dbd8 100644 --- a/pkg/gateway/mirror/service/gateway.go +++ b/pkg/gateway/mirror/service/gateway.go @@ -17,7 +17,6 @@ import ( "context" "reflect" - "github.com/vdaas/vald/apis/grpc/v1/vald" "github.com/vdaas/vald/internal/client/v1/client/mirror" "github.com/vdaas/vald/internal/errors" "github.com/vdaas/vald/internal/log" @@ -36,11 +35,11 @@ type Gateway interface { ForwardedContext(ctx context.Context, podName string) context.Context FromForwardedContext(ctx context.Context) string BroadCast(ctx context.Context, - f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error + f func(ctx context.Context, target string, vc MirrorClient, copts ...grpc.CallOption) error) error Do(ctx context.Context, target string, - f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error)) (interface{}, error) + f func(ctx context.Context, target string, vc MirrorClient, copts ...grpc.CallOption) (interface{}, error)) (interface{}, error) DoMulti(ctx context.Context, targets []string, - f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error + f func(ctx context.Context, target string, vc MirrorClient, copts ...grpc.CallOption) error) error GRPCClient() grpc.Client } @@ -105,7 +104,7 @@ func (*gateway) FromForwardedContext(ctx context.Context) string { // to interact with gRPC clients for multiple targets. // The provided function should handle the communication logic for a target. func (g *gateway) BroadCast(ctx context.Context, - f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error, + f func(ctx context.Context, target string, vc MirrorClient, copts ...grpc.CallOption) error, ) (err error) { ctx, span := trace.StartSpan(ctx, "vald/gateway/mirror/service/Gateway.BroadCast") defer func() { @@ -120,7 +119,7 @@ func (g *gateway) BroadCast(ctx context.Context, case <-ictx.Done(): return nil default: - return f(ictx, addr, vald.NewValdClientWithMirror(conn), copts...) + return f(ictx, addr, NewMirrorClient(conn), copts...) } }) } @@ -129,7 +128,7 @@ func (g *gateway) BroadCast(ctx context.Context, // It returns the result of the operation and any associated error. // The provided function should handle the communication logic for a target. func (g *gateway) Do(ctx context.Context, target string, - f func(ctx context.Context, addr string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error), + f func(ctx context.Context, addr string, vc MirrorClient, copts ...grpc.CallOption) (interface{}, error), ) (res interface{}, err error) { ctx, span := trace.StartSpan(ctx, "vald/gateway/mirror/service/Gateway.Do") defer func() { @@ -143,7 +142,7 @@ func (g *gateway) Do(ctx context.Context, target string, } return g.client.GRPCClient().Do(g.ForwardedContext(ctx, g.podName), target, func(ictx context.Context, conn *grpc.ClientConn, copts ...grpc.CallOption) (interface{}, error) { - return f(ictx, target, vald.NewValdClientWithMirror(conn), copts...) + return f(ictx, target, NewMirrorClient(conn), copts...) }, ) } @@ -152,7 +151,7 @@ func (g *gateway) Do(ctx context.Context, target string, // It returns an error if any of the operations fails. // The provided function should handle the communication logic for a target. func (g *gateway) DoMulti(ctx context.Context, targets []string, - f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error, + f func(ctx context.Context, target string, vc MirrorClient, copts ...grpc.CallOption) error, ) error { ctx, span := trace.StartSpan(ctx, "vald/gateway/mirror/service/Gateway.DoMulti") defer func() { @@ -170,7 +169,7 @@ func (g *gateway) DoMulti(ctx context.Context, targets []string, case <-ictx.Done(): return nil default: - return f(ictx, addr, vald.NewValdClientWithMirror(conn), copts...) + return f(ictx, addr, NewMirrorClient(conn), copts...) } }, ) diff --git a/pkg/gateway/mirror/service/gateway_mock_test.go b/pkg/gateway/mirror/service/gateway_mock_test.go index 9bd0641033..baaed0ae4c 100644 --- a/pkg/gateway/mirror/service/gateway_mock_test.go +++ b/pkg/gateway/mirror/service/gateway_mock_test.go @@ -16,7 +16,6 @@ package service import ( "context" - "github.com/vdaas/vald/apis/grpc/v1/vald" "github.com/vdaas/vald/internal/net/grpc" ) @@ -26,11 +25,11 @@ type GatewayMock struct { ForwardedContextFunc func(ctx context.Context, podName string) context.Context FromForwardedContextFunc func(ctx context.Context) string BroadCastFunc func(ctx context.Context, - f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error + f func(ctx context.Context, target string, vc MirrorClient, copts ...grpc.CallOption) error) error DoFunc func(ctx context.Context, target string, - f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error)) (interface{}, error) + f func(ctx context.Context, target string, vc MirrorClient, copts ...grpc.CallOption) (interface{}, error)) (interface{}, error) DoMultiFunc func(ctx context.Context, targets []string, - f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error + f func(ctx context.Context, target string, vc MirrorClient, copts ...grpc.CallOption) error) error GRPCClientFunc func() grpc.Client } @@ -46,21 +45,21 @@ func (gm *GatewayMock) FromForwardedContext(ctx context.Context) string { // BroadCast calls BroadCastFunc object. func (gm *GatewayMock) BroadCast(ctx context.Context, - f func(_ context.Context, _ string, _ vald.ClientWithMirror, _ ...grpc.CallOption) error, + f func(_ context.Context, _ string, _ MirrorClient, _ ...grpc.CallOption) error, ) error { return gm.BroadCastFunc(ctx, f) } // Do calls DoFunc object. func (gm *GatewayMock) Do(ctx context.Context, target string, - f func(_ context.Context, _ string, _ vald.ClientWithMirror, _ ...grpc.CallOption) (interface{}, error), + f func(_ context.Context, _ string, _ MirrorClient, _ ...grpc.CallOption) (interface{}, error), ) (interface{}, error) { return gm.DoFunc(ctx, target, f) } // DoMulti calls DoMultiFunc object. func (gm *GatewayMock) DoMulti(ctx context.Context, targets []string, - f func(_ context.Context, _ string, _ vald.ClientWithMirror, _ ...grpc.CallOption) error, + f func(_ context.Context, _ string, _ MirrorClient, _ ...grpc.CallOption) error, ) error { return gm.DoMultiFunc(ctx, targets, f) } diff --git a/pkg/gateway/mirror/service/mirror.go b/pkg/gateway/mirror/service/mirror.go index 892ef39f1a..c23eb8d510 100644 --- a/pkg/gateway/mirror/service/mirror.go +++ b/pkg/gateway/mirror/service/mirror.go @@ -18,6 +18,7 @@ import ( "reflect" "time" + "github.com/vdaas/vald/apis/grpc/v1/mirror" "github.com/vdaas/vald/apis/grpc/v1/payload" "github.com/vdaas/vald/apis/grpc/v1/vald" "github.com/vdaas/vald/internal/errors" @@ -45,6 +46,23 @@ type Mirror interface { RangeMirrorAddr(f func(addr string, _ any) bool) } +type MirrorClient interface { + vald.Client + mirror.MirrorClient +} + +type client struct { + vald.Client + mirror.MirrorClient +} + +func NewMirrorClient(conn *grpc.ClientConn) MirrorClient { + return &client{ + Client: vald.NewValdClient(conn), + MirrorClient: mirror.NewMirrorClient(conn), + } +} + type mirr struct { addrl sync.Map[string, any] // List of all connected addresses selfMirrTgts []*payload.Mirror_Target // Targets of self mirror gateway @@ -151,7 +169,7 @@ func (m *mirr) Start(ctx context.Context) <-chan error { // skipcq: GO-R1005 } func (m *mirr) registers(ctx context.Context, tgts *payload.Mirror_Targets) ([]*payload.Mirror_Target, error) { // skipcq: GO-R1005 - ctx, span := trace.StartSpan(grpc.WithGRPCMethod(ctx, vald.PackageName+"."+vald.MirrorRPCServiceName+"/"+vald.RegisterRPCName), "vald/gateway/mirror/service/Mirror.registers") + ctx, span := trace.StartSpan(grpc.WithGRPCMethod(ctx, vald.PackageName+"."+mirror.RPCServiceName+"/"+mirror.RegisterRPCName), "vald/gateway/mirror/service/Mirror.registers") defer func() { if span != nil { span.End() @@ -162,14 +180,14 @@ func (m *mirr) registers(ctx context.Context, tgts *payload.Mirror_Targets) ([]* ServingData: errdetails.Serialize(tgts), } resInfo := &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.RegisterRPCName, + ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + mirror.RegisterRPCName, } resTgts := make([]*payload.Mirror_Target, 0, len(tgts.GetTargets())) exists := make(map[string]bool) var result sync.Map[string, error] // map[target host: error] var mu sync.Mutex - err := m.gateway.DoMulti(ctx, m.connectedOtherMirrorAddrs(ctx), func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error { + err := m.gateway.DoMulti(ctx, m.connectedOtherMirrorAddrs(ctx), func(ctx context.Context, target string, vc MirrorClient, copts ...grpc.CallOption) error { ctx, span := trace.StartSpan(ctx, "vald/gateway/mirror/service/Mirror.registers/"+target) defer func() { if span != nil { @@ -183,22 +201,22 @@ func (m *mirr) registers(ctx context.Context, tgts *payload.Mirror_Targets) ([]* switch { case errors.Is(err, context.Canceled): err = status.WrapWithCanceled( - vald.RegisterRPCName+" API canceld", err, reqInfo, resInfo, + mirror.RegisterRPCName+" API canceld", err, reqInfo, resInfo, ) attrs = trace.StatusCodeCancelled(err.Error()) case errors.Is(err, context.DeadlineExceeded): err = status.WrapWithCanceled( - vald.RegisterRPCName+" API deadline exceeded", err, reqInfo, resInfo, + mirror.RegisterRPCName+" API deadline exceeded", err, reqInfo, resInfo, ) attrs = trace.StatusCodeDeadlineExceeded(err.Error()) case errors.Is(err, errors.ErrGRPCClientConnNotFound("*")): err = status.WrapWithInternal( - vald.RegisterRPCName+" API connection not found", err, reqInfo, resInfo, + mirror.RegisterRPCName+" API connection not found", err, reqInfo, resInfo, ) attrs = trace.StatusCodeInternal(err.Error()) case errors.Is(err, errors.ErrTargetNotFound): err = status.WrapWithInvalidArgument( - vald.RegisterRPCName+" API target not found", err, reqInfo, resInfo, + mirror.RegisterRPCName+" API target not found", err, reqInfo, resInfo, ) attrs = trace.StatusCodeInvalidArgument(err.Error()) default: @@ -207,7 +225,7 @@ func (m *mirr) registers(ctx context.Context, tgts *payload.Mirror_Targets) ([]* msg string ) st, msg, err = status.ParseError(err, codes.Internal, - "failed to parse "+vald.RegisterRPCName+" gRPC error response", reqInfo, resInfo, + "failed to parse "+mirror.RegisterRPCName+" gRPC error response", reqInfo, resInfo, ) attrs = trace.FromGRPCStatus(st.Code(), msg) @@ -251,14 +269,14 @@ func (m *mirr) registers(ctx context.Context, tgts *payload.Mirror_Targets) ([]* }) result.Range(func(target string, rerr error) bool { if rerr != nil { - err = errors.Join(err, errors.Wrapf(rerr, "failed to "+vald.RegisterRPCName+" API to %s", target)) + err = errors.Join(err, errors.Wrapf(rerr, "failed to "+mirror.RegisterRPCName+" API to %s", target)) } return true }) if err != nil { if errors.Is(err, errors.ErrGRPCClientConnNotFound("*")) { err = status.WrapWithInternal( - vald.RegisterRPCName+" API connection not found", err, reqInfo, resInfo, + mirror.RegisterRPCName+" API connection not found", err, reqInfo, resInfo, ) log.Warn(err) if span != nil { @@ -270,7 +288,7 @@ func (m *mirr) registers(ctx context.Context, tgts *payload.Mirror_Targets) ([]* } st, msg, err := status.ParseError(err, codes.Internal, - "failed to parse "+vald.RegisterRPCName+" gRPC error response", reqInfo, resInfo, + "failed to parse "+mirror.RegisterRPCName+" gRPC error response", reqInfo, resInfo, ) log.Warn(err) if span != nil { diff --git a/pkg/gateway/mirror/usecase/vald.go b/pkg/gateway/mirror/usecase/vald.go index 7937ba4728..453c7344c4 100644 --- a/pkg/gateway/mirror/usecase/vald.go +++ b/pkg/gateway/mirror/usecase/vald.go @@ -16,8 +16,9 @@ package usecase import ( "context" + "github.com/vdaas/vald/apis/grpc/v1/mirror" "github.com/vdaas/vald/apis/grpc/v1/vald" - "github.com/vdaas/vald/internal/client/v1/client/mirror" + client "github.com/vdaas/vald/internal/client/v1/client/mirror" "github.com/vdaas/vald/internal/errors" "github.com/vdaas/vald/internal/net" "github.com/vdaas/vald/internal/net/grpc" @@ -42,7 +43,7 @@ type run struct { dialer net.Dialer cfg *config.Data server starter.Server - client mirror.Client + client client.Client gateway service.Gateway mirror service.Mirror discover service.Discovery @@ -69,9 +70,9 @@ func New(cfg *config.Data) (r runner.Runner, err error) { // skipcq: CRT-D0001 cOpts = append(cOpts, grpc.WithErrGroup(eg)) - client, err := mirror.New( - mirror.WithAddrs(cfg.Mirror.Client.Addrs...), - mirror.WithClient(grpc.New(cOpts...)), + mClient, err := client.New( + client.WithAddrs(cfg.Mirror.Client.Addrs...), + client.WithClient(grpc.New(cOpts...)), ) if err != nil { return nil, err @@ -79,13 +80,13 @@ func New(cfg *config.Data) (r runner.Runner, err error) { gateway, err := service.NewGateway( service.WithErrGroup(eg), - service.WithMirrorClient(client), + service.WithMirrorClient(mClient), service.WithPodName(cfg.Mirror.PodName), ) if err != nil { return nil, err } - mirror, err := service.NewMirror( + m, err := service.NewMirror( service.WithErrorGroup(eg), service.WithRegisterDuration(cfg.Mirror.RegisterDuration), service.WithGatewayAddrs(cfg.Mirror.GatewayAddr), @@ -102,7 +103,7 @@ func New(cfg *config.Data) (r runner.Runner, err error) { service.WithDiscoverySelfMirrorAddrs(cfg.Mirror.SelfMirrorAddr), service.WithDiscoveryColocation(cfg.Mirror.Colocation), service.WithDiscoveryDialer(dialer), - service.WithDiscoveryMirror(mirror), + service.WithDiscoveryMirror(m), service.WithDiscoveryErrGroup(eg), ) if err != nil { @@ -113,7 +114,7 @@ func New(cfg *config.Data) (r runner.Runner, err error) { handler.WithValdAddr(cfg.Mirror.GatewayAddr), handler.WithErrGroup(eg), handler.WithGateway(gateway), - handler.WithMirror(mirror), + handler.WithMirror(m), handler.WithStreamConcurrency(cfg.Server.GetGRPCStreamConcurrency()), ) if err != nil { @@ -122,7 +123,8 @@ func New(cfg *config.Data) (r runner.Runner, err error) { grpcServerOptions := []server.Option{ server.WithGRPCRegistFunc(func(srv *grpc.Server) { - vald.RegisterValdServerWithMirror(srv, v) + vald.RegisterValdServer(srv, v) + mirror.RegisterMirrorServer(srv, v) }), server.WithPreStopFunction(func() error { return nil @@ -135,7 +137,7 @@ func New(cfg *config.Data) (r runner.Runner, err error) { cfg.Observability, bometrics.New(), cbmetrics.New(), - mirrmetrics.New(mirror), + mirrmetrics.New(m), ) if err != nil { return nil, err @@ -170,9 +172,9 @@ func New(cfg *config.Data) (r runner.Runner, err error) { dialer: dialer, cfg: cfg, server: srv, - client: client, + client: mClient, gateway: gateway, - mirror: mirror, + mirror: m, discover: discover, observability: obs, }, nil