diff --git a/README.md b/README.md
index bc4970e1dd8..ddf4a723894 100755
--- a/README.md
+++ b/README.md
@@ -237,10 +237,16 @@ Docker images tagging policy:
## Vald Users
-
-
-
-
+
+
+
+
+
+
## Contribution
diff --git a/apis/grpc/v1/mirror/mirror.go b/apis/grpc/v1/mirror/mirror.go
new file mode 100644
index 00000000000..70614d73ffa
--- /dev/null
+++ b/apis/grpc/v1/mirror/mirror.go
@@ -0,0 +1,23 @@
+//
+// Copyright (C) 2019-2024 vdaas.org vald team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// You may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+// Package mirror provides vald server interface
+package mirror
+
+const (
+ RPCServiceName = "Mirror"
+ RegisterRPCName = "Register"
+)
diff --git a/apis/grpc/v1/vald/vald.go b/apis/grpc/v1/vald/vald.go
index 14477c37b6f..e3d2a8c46d5 100644
--- a/apis/grpc/v1/vald/vald.go
+++ b/apis/grpc/v1/vald/vald.go
@@ -18,7 +18,6 @@
package vald
import (
- "github.com/vdaas/vald/apis/grpc/v1/mirror"
grpc "google.golang.org/grpc"
)
@@ -36,11 +35,6 @@ type ServerWithFilter interface {
FilterServer
}
-type ServerWithMirror interface {
- Server
- mirror.MirrorServer
-}
-
type UnimplementedValdServer struct {
UnimplementedInsertServer
UnimplementedUpdateServer
@@ -55,11 +49,6 @@ type UnimplementedValdServerWithFilter struct {
UnimplementedFilterServer
}
-type UnimplementedValdServerWithMirror struct {
- UnimplementedValdServer
- mirror.UnimplementedMirrorServer
-}
-
type Client interface {
InsertClient
UpdateClient
@@ -74,11 +63,6 @@ type ClientWithFilter interface {
FilterClient
}
-type ClientWithMirror interface {
- Client
- mirror.MirrorClient
-}
-
const PackageName = "vald.v1"
const (
@@ -89,7 +73,6 @@ const (
RemoveRPCServiceName = "Remove"
ObjectRPCServiceName = "Object"
FilterRPCServiceName = "Filter"
- MirrorRPCServiceName = "Mirror"
)
const (
@@ -143,8 +126,6 @@ const (
GetTimestampRPCName = "GetTimestamp"
StreamGetObjectRPCName = "StreamGetObject"
StreamListObjectRPCName = "StreamListObject"
-
- RegisterRPCName = "Register"
)
type client struct {
@@ -156,11 +137,6 @@ type client struct {
ObjectClient
}
-type clientWithMirror struct {
- Client
- mirror.MirrorClient
-}
-
func RegisterValdServer(s *grpc.Server, srv Server) {
RegisterInsertServer(s, srv)
RegisterUpdateServer(s, srv)
@@ -175,11 +151,6 @@ func RegisterValdServerWithFilter(s *grpc.Server, srv ServerWithFilter) {
RegisterFilterServer(s, srv)
}
-func RegisterValdServerWithMirror(s *grpc.Server, srv ServerWithMirror) {
- RegisterValdServer(s, srv)
- mirror.RegisterMirrorServer(s, srv)
-}
-
func NewValdClient(conn *grpc.ClientConn) Client {
return &client{
NewInsertClient(conn),
@@ -190,10 +161,3 @@ func NewValdClient(conn *grpc.ClientConn) Client {
NewObjectClient(conn),
}
}
-
-func NewValdClientWithMirror(conn *grpc.ClientConn) ClientWithMirror {
- return &clientWithMirror{
- Client: NewValdClient(conn),
- MirrorClient: mirror.NewMirrorClient(conn),
- }
-}
diff --git a/assets/image/vald-users/japansearch_color.png b/assets/image/vald-users/japansearch_color.png
new file mode 100644
index 00000000000..d1aa3924047
Binary files /dev/null and b/assets/image/vald-users/japansearch_color.png differ
diff --git a/assets/image/vald-users/japansearch_color.png.webp b/assets/image/vald-users/japansearch_color.png.webp
deleted file mode 100644
index f78ca9c2f32..00000000000
Binary files a/assets/image/vald-users/japansearch_color.png.webp and /dev/null differ
diff --git a/assets/image/vald-users/lycorp.png b/assets/image/vald-users/lycorp.png
new file mode 100644
index 00000000000..1ee060ebbfc
Binary files /dev/null and b/assets/image/vald-users/lycorp.png differ
diff --git a/assets/image/vald-users/lycorp_black.png b/assets/image/vald-users/lycorp_black.png
new file mode 100644
index 00000000000..1ee060ebbfc
Binary files /dev/null and b/assets/image/vald-users/lycorp_black.png differ
diff --git a/assets/image/vald-users/lycorp_white.png b/assets/image/vald-users/lycorp_white.png
new file mode 100644
index 00000000000..c3438fad7dd
Binary files /dev/null and b/assets/image/vald-users/lycorp_white.png differ
diff --git a/assets/image/vald-users/yahoojapan.svg b/assets/image/vald-users/yahoojapan.svg
deleted file mode 100644
index 515341157ef..00000000000
--- a/assets/image/vald-users/yahoojapan.svg
+++ /dev/null
@@ -1 +0,0 @@
-
\ No newline at end of file
diff --git a/internal/client/v1/client/mirror/mirror.go b/internal/client/v1/client/mirror/mirror.go
index c90b3b9c2b0..616de1fabc3 100644
--- a/internal/client/v1/client/mirror/mirror.go
+++ b/internal/client/v1/client/mirror/mirror.go
@@ -18,7 +18,6 @@ import (
"github.com/vdaas/vald/apis/grpc/v1/mirror"
"github.com/vdaas/vald/apis/grpc/v1/payload"
- "github.com/vdaas/vald/apis/grpc/v1/vald"
"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/net/grpc"
"github.com/vdaas/vald/internal/observability/trace"
@@ -69,7 +68,7 @@ func (c *client) GRPCClient() grpc.Client {
}
func (c *client) Register(ctx context.Context, in *payload.Mirror_Targets, opts ...grpc.CallOption) (res *payload.Mirror_Targets, err error) {
- ctx, span := trace.StartSpan(grpc.WrapGRPCMethod(ctx, "internal/client/"+vald.RegisterRPCName), apiName+"/"+vald.RegisterRPCName)
+ ctx, span := trace.StartSpan(grpc.WrapGRPCMethod(ctx, "internal/client/"+mirror.RegisterRPCName), apiName+"/"+mirror.RegisterRPCName)
defer func() {
if span != nil {
span.End()
diff --git a/internal/test/mock/client/mirror_client_mock.go b/internal/test/mock/client/mirror_client_mock.go
index 10cd4563ac1..96baff34bee 100644
--- a/internal/test/mock/client/mirror_client_mock.go
+++ b/internal/test/mock/client/mirror_client_mock.go
@@ -16,13 +16,15 @@ package client
import (
"context"
+ "github.com/vdaas/vald/apis/grpc/v1/mirror"
"github.com/vdaas/vald/apis/grpc/v1/payload"
"github.com/vdaas/vald/apis/grpc/v1/vald"
"github.com/vdaas/vald/internal/net/grpc"
)
type MirrorClientMock struct {
- vald.ClientWithMirror
+ vald.Client
+ mirror.MirrorClient
InsertFunc func(ctx context.Context, in *payload.Insert_Request, opts ...grpc.CallOption) (*payload.Object_Location, error)
UpdateFunc func(ctx context.Context, in *payload.Update_Request, opts ...grpc.CallOption) (*payload.Object_Location, error)
diff --git a/pkg/gateway/mirror/handler/grpc/handler.go b/pkg/gateway/mirror/handler/grpc/handler.go
index 9a04ccc00e3..c94d631dc99 100644
--- a/pkg/gateway/mirror/handler/grpc/handler.go
+++ b/pkg/gateway/mirror/handler/grpc/handler.go
@@ -20,6 +20,7 @@ import (
"reflect"
"sync/atomic"
+ "github.com/vdaas/vald/apis/grpc/v1/mirror"
"github.com/vdaas/vald/apis/grpc/v1/payload"
"github.com/vdaas/vald/apis/grpc/v1/vald"
"github.com/vdaas/vald/internal/errors"
@@ -35,6 +36,11 @@ import (
"github.com/vdaas/vald/pkg/gateway/mirror/service"
)
+type Server interface {
+ vald.Server
+ mirror.MirrorServer
+}
+
type server struct {
eg errgroup.Group
gateway service.Gateway // Mirror gateway client service.
@@ -43,13 +49,14 @@ type server struct {
streamConcurrency int
name string
ip string
- vald.UnimplementedValdServerWithMirror
+ vald.UnimplementedValdServer
+ mirror.UnimplementedMirrorServer
}
const apiName = "vald/gateway/mirror"
// New returns a Vald server as gRPC handler with mirror using the provided options.
-func New(opts ...Option) (vald.ServerWithMirror, error) {
+func New(opts ...Option) (Server, error) {
s := new(server)
for _, opt := range append(defaultOptions, opts...) {
if err := opt(s); err != nil {
@@ -69,7 +76,7 @@ func New(opts ...Option) (vald.ServerWithMirror, error) {
// The function connects to the mirror using the provided targets, and if successful,
// returns the addresses of connected Mirror gateways.
func (s *server) Register(ctx context.Context, req *payload.Mirror_Targets) (*payload.Mirror_Targets, error) {
- ctx, span := trace.StartSpan(grpc.WithGRPCMethod(ctx, vald.PackageName+"."+vald.MirrorRPCServiceName+"/"+vald.RegisterRPCName), apiName+"/"+vald.RegisterRPCName)
+ ctx, span := trace.StartSpan(grpc.WithGRPCMethod(ctx, vald.PackageName+"."+mirror.RPCServiceName+"/"+mirror.RegisterRPCName), apiName+"/"+mirror.RegisterRPCName)
defer func() {
if span != nil {
span.End()
@@ -81,7 +88,7 @@ func (s *server) Register(ctx context.Context, req *payload.Mirror_Targets) (*pa
ServingData: errdetails.Serialize(req),
}
resInfo := &errdetails.ResourceInfo{
- ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.RegisterRPCName,
+ ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + mirror.RegisterRPCName,
ResourceName: fmt.Sprintf("%s: %s(%s)", apiName, s.name, s.ip),
}
var attrs trace.Attributes
@@ -89,22 +96,22 @@ func (s *server) Register(ctx context.Context, req *payload.Mirror_Targets) (*pa
switch {
case errors.Is(err, context.Canceled):
err = status.WrapWithCanceled(
- vald.RegisterRPCName+" API canceld", err, reqInfo, resInfo,
+ mirror.RegisterRPCName+" API canceld", err, reqInfo, resInfo,
)
attrs = trace.StatusCodeCancelled(err.Error())
case errors.Is(err, context.DeadlineExceeded):
err = status.WrapWithCanceled(
- vald.RegisterRPCName+" API deadline exceeded", err, reqInfo, resInfo,
+ mirror.RegisterRPCName+" API deadline exceeded", err, reqInfo, resInfo,
)
attrs = trace.StatusCodeDeadlineExceeded(err.Error())
case errors.Is(err, errors.ErrGRPCClientConnNotFound("*")):
err = status.WrapWithInternal(
- vald.RegisterRPCName+" API connection not found", err, reqInfo, resInfo,
+ mirror.RegisterRPCName+" API connection not found", err, reqInfo, resInfo,
)
attrs = trace.StatusCodeInternal(err.Error())
case errors.Is(err, errors.ErrTargetNotFound):
err = status.WrapWithInvalidArgument(
- vald.RegisterRPCName+" API target not found", err, reqInfo, resInfo,
+ mirror.RegisterRPCName+" API target not found", err, reqInfo, resInfo,
)
attrs = trace.StatusCodeInvalidArgument(err.Error())
default:
@@ -113,7 +120,7 @@ func (s *server) Register(ctx context.Context, req *payload.Mirror_Targets) (*pa
msg string
)
st, msg, err = status.ParseError(err, codes.Internal,
- "failed to parse "+vald.RegisterRPCName+" gRPC error response", reqInfo, resInfo,
+ "failed to parse "+mirror.RegisterRPCName+" gRPC error response", reqInfo, resInfo,
)
attrs = trace.FromGRPCStatus(st.Code(), msg)
}
@@ -129,7 +136,7 @@ func (s *server) Register(ctx context.Context, req *payload.Mirror_Targets) (*pa
// Get own address and the addresses of other mirror gateways to which this gateway is currently connected.
tgts, err := s.mirror.MirrorTargets(ctx)
if err != nil {
- err = status.WrapWithInternal(vald.RegisterRPCName+" API failed to get connected vald gateway targets", err,
+ err = status.WrapWithInternal(mirror.RegisterRPCName+" API failed to get connected vald gateway targets", err,
&errdetails.BadRequest{
FieldViolations: []*errdetails.BadRequestFieldViolation{
{
@@ -139,7 +146,7 @@ func (s *server) Register(ctx context.Context, req *payload.Mirror_Targets) (*pa
},
},
&errdetails.ResourceInfo{
- ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.RegisterRPCName,
+ ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + mirror.RegisterRPCName,
ResourceName: fmt.Sprintf("%s: %s(%s)", apiName, s.name, s.ip),
},
)
@@ -165,7 +172,7 @@ func (s *server) Exists(ctx context.Context, meta *payload.Object_ID) (id *paylo
}
}()
- _, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) {
+ _, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc service.MirrorClient, copts ...grpc.CallOption) (interface{}, error) {
id, err = vc.Exists(ctx, meta, copts...)
return id, err
})
@@ -230,7 +237,7 @@ func (s *server) Search(ctx context.Context, req *payload.Search_Request) (res *
}
}()
- _, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) {
+ _, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc service.MirrorClient, copts ...grpc.CallOption) (interface{}, error) {
res, err = vc.Search(ctx, req, copts...)
return res, err
})
@@ -298,7 +305,7 @@ func (s *server) SearchByID(ctx context.Context, req *payload.Search_IDRequest)
}
}()
- _, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) {
+ _, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc service.MirrorClient, copts ...grpc.CallOption) (interface{}, error) {
res, err = vc.SearchByID(ctx, req, copts...)
return res, err
})
@@ -464,7 +471,7 @@ func (s *server) MultiSearch(ctx context.Context, req *payload.Search_MultiReque
}
}()
- _, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) {
+ _, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc service.MirrorClient, copts ...grpc.CallOption) (interface{}, error) {
res, err = vc.MultiSearch(ctx, req, copts...)
return res, err
})
@@ -529,7 +536,7 @@ func (s *server) MultiSearchByID(ctx context.Context, req *payload.Search_MultiI
}
}()
- _, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) {
+ _, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc service.MirrorClient, copts ...grpc.CallOption) (interface{}, error) {
res, err = vc.MultiSearchByID(ctx, req, copts...)
return res, err
})
@@ -594,7 +601,7 @@ func (s *server) LinearSearch(ctx context.Context, req *payload.Search_Request)
}
}()
- _, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) {
+ _, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc service.MirrorClient, copts ...grpc.CallOption) (interface{}, error) {
res, err = vc.LinearSearch(ctx, req, copts...)
return res, err
})
@@ -662,7 +669,7 @@ func (s *server) LinearSearchByID(ctx context.Context, req *payload.Search_IDReq
}
}()
- _, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) {
+ _, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc service.MirrorClient, copts ...grpc.CallOption) (interface{}, error) {
res, err = vc.LinearSearchByID(ctx, req, copts...)
return res, err
})
@@ -831,7 +838,7 @@ func (s *server) MultiLinearSearch(ctx context.Context, req *payload.Search_Mult
}
}()
- _, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) {
+ _, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc service.MirrorClient, copts ...grpc.CallOption) (interface{}, error) {
res, err = vc.MultiLinearSearch(ctx, req, copts...)
return res, err
})
@@ -896,7 +903,7 @@ func (s *server) MultiLinearSearchByID(ctx context.Context, req *payload.Search_
}
}()
- _, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) {
+ _, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc service.MirrorClient, copts ...grpc.CallOption) (interface{}, error) {
res, err = vc.MultiLinearSearchByID(ctx, req, copts...)
return res, err
})
@@ -968,7 +975,7 @@ func (s *server) Insert(ctx context.Context, req *payload.Insert_Request) (loc *
// So this component sends requests only to the Vald gateway (LB gateway) of its own cluster.
if s.isProxied(ctx) {
loc, err = s.doInsert(ctx, req, func(ctx context.Context) (*payload.Object_Location, error) {
- _, derr := s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) {
+ _, derr := s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc service.MirrorClient, copts ...grpc.CallOption) (interface{}, error) {
loc, err = vc.Insert(ctx, req, copts...)
return loc, err
})
@@ -1016,7 +1023,7 @@ func (s *server) handleInsert(ctx context.Context, req *payload.Insert_Request)
Uuid: req.GetVector().GetId(),
Ips: make([]string, 0),
}
- err = s.gateway.BroadCast(ctx, func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error {
+ err = s.gateway.BroadCast(ctx, func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error {
ctx, span := trace.StartSpan(grpc.WrapGRPCMethod(ctx, "BroadCast/"+target), apiName+"/"+vald.InsertRPCName+"/"+target)
defer func() {
if span != nil {
@@ -1180,7 +1187,7 @@ func (s *server) handleInsertResult( // skipcq: GO-R1005
Ips: make([]string, 0),
}
- err = s.gateway.DoMulti(ctx, alreadyExistsTgts, func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error {
+ err = s.gateway.DoMulti(ctx, alreadyExistsTgts, func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error {
ctx, span := trace.StartSpan(grpc.WrapGRPCMethod(ctx, "DoMulti/"+target), apiName+"/"+vald.UpdateRPCName+"/"+target)
defer func() {
if span != nil {
@@ -1512,7 +1519,7 @@ func (s *server) Update(ctx context.Context, req *payload.Update_Request) (loc *
// So this component sends requests only to the Vald gateway (LB gateway) of its own cluster.
if s.isProxied(ctx) {
loc, err = s.doUpdate(ctx, req, func(ctx context.Context) (*payload.Object_Location, error) {
- _, derr := s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) {
+ _, derr := s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc service.MirrorClient, copts ...grpc.CallOption) (interface{}, error) {
loc, err = vc.Update(ctx, req, copts...)
return loc, err
})
@@ -1560,7 +1567,7 @@ func (s *server) handleUpdate(ctx context.Context, req *payload.Update_Request)
Uuid: req.GetVector().GetId(),
Ips: make([]string, 0),
}
- err = s.gateway.BroadCast(ctx, func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error {
+ err = s.gateway.BroadCast(ctx, func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error {
ctx, span := trace.StartSpan(grpc.WrapGRPCMethod(ctx, "BroadCast/"+target), apiName+"/"+vald.UpdateRPCName+"/"+target)
defer func() {
if span != nil {
@@ -1739,7 +1746,7 @@ func (s *server) handleUpdateResult( // skipcq: GO-R1005
Ips: make([]string, 0),
}
- err = s.gateway.DoMulti(ctx, notFoundTgts, func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error {
+ err = s.gateway.DoMulti(ctx, notFoundTgts, func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error {
ctx, span := trace.StartSpan(grpc.WrapGRPCMethod(ctx, "BroadCast/"+target), apiName+"/"+vald.InsertRPCName+"/"+target)
defer func() {
if span != nil {
@@ -2086,7 +2093,7 @@ func (s *server) Upsert(ctx context.Context, req *payload.Upsert_Request) (loc *
// So this component sends requests only to the Vald gateway (LB gateway) of its own cluster.
if s.isProxied(ctx) {
loc, err = s.doUpsert(ctx, req, func(ctx context.Context) (*payload.Object_Location, error) {
- s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) {
+ s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc service.MirrorClient, copts ...grpc.CallOption) (interface{}, error) {
loc, err = vc.Upsert(ctx, req, copts...)
return loc, err
})
@@ -2135,7 +2142,7 @@ func (s *server) handleUpsert(ctx context.Context, req *payload.Upsert_Request)
Uuid: req.GetVector().GetId(),
Ips: make([]string, 0),
}
- err = s.gateway.BroadCast(ctx, func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error {
+ err = s.gateway.BroadCast(ctx, func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error {
ctx, span := trace.StartSpan(grpc.WrapGRPCMethod(ctx, "BroadCast/"+target), apiName+"/"+vald.UpsertRPCName+"/"+target)
defer func() {
if span != nil {
@@ -2466,7 +2473,7 @@ func (s *server) Remove(ctx context.Context, req *payload.Remove_Request) (loc *
// So this component sends requests only to the Vald gateway (LB gateway) of its own cluster.
if s.isProxied(ctx) {
loc, err = s.doRemove(ctx, req, func(ctx context.Context) (*payload.Object_Location, error) {
- s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) {
+ s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc service.MirrorClient, copts ...grpc.CallOption) (interface{}, error) {
loc, err = vc.Remove(ctx, req, copts...)
return loc, err
})
@@ -2514,7 +2521,7 @@ func (s *server) handleRemove(ctx context.Context, req *payload.Remove_Request)
Uuid: req.GetId().GetId(),
Ips: make([]string, 0),
}
- err = s.gateway.BroadCast(ctx, func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error {
+ err = s.gateway.BroadCast(ctx, func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error {
ctx, span := trace.StartSpan(grpc.WrapGRPCMethod(ctx, "BroadCast/"+target), apiName+"/"+vald.RemoveRPCName+"/"+target)
defer func() {
if span != nil {
@@ -2841,7 +2848,7 @@ func (s *server) RemoveByTimestamp(ctx context.Context, req *payload.Remove_Time
// So this component sends requests only to the Vald gateway (LB gateway) of its own cluster.
if s.isProxied(ctx) {
locs, err = s.doRemoveByTimestamp(ctx, req, func(ctx context.Context) (*payload.Object_Locations, error) {
- _, derr := s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) {
+ _, derr := s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc service.MirrorClient, copts ...grpc.CallOption) (interface{}, error) {
locs, err = vc.RemoveByTimestamp(ctx, req, copts...)
return locs, err
})
@@ -2887,7 +2894,7 @@ func (s *server) handleRemoveByTimestamp(ctx context.Context, req *payload.Remov
var result sync.Map[string, *errorState] // map[target host: error state]
locs = new(payload.Object_Locations)
- err = s.gateway.BroadCast(ctx, func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error {
+ err = s.gateway.BroadCast(ctx, func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error {
ctx, span := trace.StartSpan(grpc.WrapGRPCMethod(ctx, "BroadCast/"+target), apiName+"/"+vald.RemoveByTimestampRPCName+"/"+target)
defer func() {
if span != nil {
@@ -3085,7 +3092,7 @@ func (s *server) GetObject(ctx context.Context, req *payload.Object_VectorReques
}
}()
- _, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) {
+ _, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc service.MirrorClient, copts ...grpc.CallOption) (interface{}, error) {
vec, err = vc.GetObject(ctx, req, copts...)
return vec, err
})
@@ -3200,7 +3207,7 @@ func (s *server) StreamListObject(req *payload.Object_List_Request, stream vald.
}
}()
- _, err := s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (obj interface{}, err error) {
+ _, err := s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) (obj interface{}, err error) {
ctx, span := trace.StartSpan(grpc.WrapGRPCMethod(ctx, "Do/"+target), apiName+"/"+vald.StreamListObjectRPCName+"/"+target)
defer func() {
if span != nil {
diff --git a/pkg/gateway/mirror/handler/grpc/handler_test.go b/pkg/gateway/mirror/handler/grpc/handler_test.go
index fb55eb1b53c..84a24a1dd3a 100644
--- a/pkg/gateway/mirror/handler/grpc/handler_test.go
+++ b/pkg/gateway/mirror/handler/grpc/handler_test.go
@@ -18,6 +18,7 @@ import (
"reflect"
"testing"
+ "github.com/vdaas/vald/apis/grpc/v1/mirror"
"github.com/vdaas/vald/apis/grpc/v1/payload"
"github.com/vdaas/vald/apis/grpc/v1/vald"
"github.com/vdaas/vald/internal/errors"
@@ -42,14 +43,15 @@ func Test_server_Insert(t *testing.T) { // skipcq: GO-R1005
req *payload.Insert_Request
}
type fields struct {
- eg errgroup.Group
- gateway service.Gateway
- mirror service.Mirror
- vAddr string
- streamConcurrency int
- name string
- ip string
- UnimplementedValdServerWithMirror vald.UnimplementedValdServerWithMirror
+ eg errgroup.Group
+ gateway service.Gateway
+ mirror service.Mirror
+ vAddr string
+ streamConcurrency int
+ name string
+ ip string
+ UnimplementedValdServer vald.UnimplementedValdServer
+ UnimplementedMirrorServer mirror.UnimplementedMirrorServer
}
type want struct {
wantCe *payload.Object_Location
@@ -90,7 +92,7 @@ func Test_server_Insert(t *testing.T) { // skipcq: GO-R1005
targets := []string{
"vald-01", "vald-02",
}
- cmap := map[string]vald.ClientWithMirror{
+ cmap := map[string]service.MirrorClient{
targets[0]: &clientmock.MirrorClientMock{
InsertFunc: func(_ context.Context, _ *payload.Insert_Request, _ ...grpc.CallOption) (*payload.Object_Location, error) {
return loc, nil
@@ -120,7 +122,7 @@ func Test_server_Insert(t *testing.T) { // skipcq: GO-R1005
FromForwardedContextFunc: func(_ context.Context) string {
return ""
},
- BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error {
+ BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error {
for _, tgt := range targets {
f(ctx, tgt, cmap[tgt])
}
@@ -152,7 +154,7 @@ func Test_server_Insert(t *testing.T) { // skipcq: GO-R1005
targets := []string{
"vald-01", "vald-02",
}
- cmap := map[string]vald.ClientWithMirror{
+ cmap := map[string]service.MirrorClient{
targets[0]: &clientmock.MirrorClientMock{
InsertFunc: func(_ context.Context, _ *payload.Insert_Request, _ ...grpc.CallOption) (*payload.Object_Location, error) {
return &payload.Object_Location{
@@ -188,13 +190,13 @@ func Test_server_Insert(t *testing.T) { // skipcq: GO-R1005
FromForwardedContextFunc: func(_ context.Context) string {
return ""
},
- BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error {
+ BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error {
for _, tgt := range targets {
f(ctx, tgt, cmap[tgt])
}
return nil
},
- DoMultiFunc: func(ctx context.Context, targets []string, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error {
+ DoMultiFunc: func(ctx context.Context, targets []string, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error {
for _, target := range targets {
if c, ok := cmap[target]; !ok {
return errors.ErrTargetNotFound
@@ -226,7 +228,7 @@ func Test_server_Insert(t *testing.T) { // skipcq: GO-R1005
targets := []string{
"vald-01", "vald-02",
}
- cmap := map[string]vald.ClientWithMirror{
+ cmap := map[string]service.MirrorClient{
targets[0]: &clientmock.MirrorClientMock{
InsertFunc: func(_ context.Context, _ *payload.Insert_Request, _ ...grpc.CallOption) (*payload.Object_Location, error) {
return &payload.Object_Location{
@@ -262,13 +264,13 @@ func Test_server_Insert(t *testing.T) { // skipcq: GO-R1005
FromForwardedContextFunc: func(_ context.Context) string {
return ""
},
- BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error {
+ BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error {
for _, tgt := range targets {
f(ctx, tgt, cmap[tgt])
}
return nil
},
- DoMultiFunc: func(ctx context.Context, targets []string, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error {
+ DoMultiFunc: func(ctx context.Context, targets []string, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error {
for _, target := range targets {
if c, ok := cmap[target]; !ok {
return errors.New("target not found")
@@ -300,7 +302,7 @@ func Test_server_Insert(t *testing.T) { // skipcq: GO-R1005
targets := []string{
"vald-01", "vald-02",
}
- cmap := map[string]vald.ClientWithMirror{
+ cmap := map[string]service.MirrorClient{
targets[0]: &clientmock.MirrorClientMock{
InsertFunc: func(_ context.Context, _ *payload.Insert_Request, _ ...grpc.CallOption) (*payload.Object_Location, error) {
return nil, status.Error(codes.AlreadyExists, errors.ErrMetaDataAlreadyExists(uuid).Error())
@@ -330,7 +332,7 @@ func Test_server_Insert(t *testing.T) { // skipcq: GO-R1005
FromForwardedContextFunc: func(_ context.Context) string {
return ""
},
- BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error {
+ BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error {
for _, tgt := range targets {
f(ctx, tgt, cmap[tgt])
}
@@ -359,7 +361,7 @@ func Test_server_Insert(t *testing.T) { // skipcq: GO-R1005
targets := []string{
"vald-01", "vald-02",
}
- cmap := map[string]vald.ClientWithMirror{
+ cmap := map[string]service.MirrorClient{
targets[0]: &clientmock.MirrorClientMock{
InsertFunc: func(_ context.Context, _ *payload.Insert_Request, _ ...grpc.CallOption) (*payload.Object_Location, error) {
return loc, nil
@@ -389,7 +391,7 @@ func Test_server_Insert(t *testing.T) { // skipcq: GO-R1005
FromForwardedContextFunc: func(_ context.Context) string {
return ""
},
- BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error {
+ BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error {
for _, tgt := range targets {
f(ctx, tgt, cmap[tgt])
}
@@ -414,7 +416,7 @@ func Test_server_Insert(t *testing.T) { // skipcq: GO-R1005
targets := []string{
"vald-01", "vald-02",
}
- cmap := map[string]vald.ClientWithMirror{
+ cmap := map[string]service.MirrorClient{
targets[0]: &clientmock.MirrorClientMock{
InsertFunc: func(_ context.Context, _ *payload.Insert_Request, _ ...grpc.CallOption) (*payload.Object_Location, error) {
return nil, status.Error(codes.Internal, errors.ErrCircuitBreakerHalfOpenFlowLimitation.Error())
@@ -444,7 +446,7 @@ func Test_server_Insert(t *testing.T) { // skipcq: GO-R1005
FromForwardedContextFunc: func(_ context.Context) string {
return ""
},
- BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error {
+ BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error {
for _, tgt := range targets {
f(ctx, tgt, cmap[tgt])
}
@@ -472,7 +474,7 @@ func Test_server_Insert(t *testing.T) { // skipcq: GO-R1005
targets := []string{
"vald-01", "vald-02",
}
- cmap := map[string]vald.ClientWithMirror{
+ cmap := map[string]service.MirrorClient{
targets[0]: &clientmock.MirrorClientMock{
InsertFunc: func(_ context.Context, _ *payload.Insert_Request, _ ...grpc.CallOption) (*payload.Object_Location, error) {
return &payload.Object_Location{
@@ -508,13 +510,13 @@ func Test_server_Insert(t *testing.T) { // skipcq: GO-R1005
FromForwardedContextFunc: func(_ context.Context) string {
return ""
},
- BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error {
+ BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error {
for _, tgt := range targets {
f(ctx, tgt, cmap[tgt])
}
return nil
},
- DoMultiFunc: func(ctx context.Context, targets []string, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error {
+ DoMultiFunc: func(ctx context.Context, targets []string, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error {
for _, target := range targets {
if c, ok := cmap[target]; !ok {
return errors.New("target not found")
@@ -553,14 +555,15 @@ func Test_server_Insert(t *testing.T) { // skipcq: GO-R1005
checkFunc = defaultCheckFunc
}
s := &server{
- eg: test.fields.eg,
- gateway: test.fields.gateway,
- mirror: test.fields.mirror,
- vAddr: test.fields.vAddr,
- streamConcurrency: test.fields.streamConcurrency,
- name: test.fields.name,
- ip: test.fields.ip,
- UnimplementedValdServerWithMirror: test.fields.UnimplementedValdServerWithMirror,
+ eg: test.fields.eg,
+ gateway: test.fields.gateway,
+ mirror: test.fields.mirror,
+ vAddr: test.fields.vAddr,
+ streamConcurrency: test.fields.streamConcurrency,
+ name: test.fields.name,
+ ip: test.fields.ip,
+ UnimplementedValdServer: test.fields.UnimplementedValdServer,
+ UnimplementedMirrorServer: test.fields.UnimplementedMirrorServer,
}
gotCe, err := s.Insert(test.args.ctx, test.args.req)
@@ -582,14 +585,15 @@ func Test_server_Update(t *testing.T) { // skipcq: GO-R1005
req *payload.Update_Request
}
type fields struct {
- eg errgroup.Group
- gateway service.Gateway
- mirror service.Mirror
- vAddr string
- streamConcurrency int
- name string
- ip string
- UnimplementedValdServerWithMirror vald.UnimplementedValdServerWithMirror
+ eg errgroup.Group
+ gateway service.Gateway
+ mirror service.Mirror
+ vAddr string
+ streamConcurrency int
+ name string
+ ip string
+ UnimplementedValdServer vald.UnimplementedValdServer
+ UnimplementedMirrorServer mirror.UnimplementedMirrorServer
}
type want struct {
wantLoc *payload.Object_Location
@@ -630,7 +634,7 @@ func Test_server_Update(t *testing.T) { // skipcq: GO-R1005
targets := []string{
"vald-01", "vald-02",
}
- cmap := map[string]vald.ClientWithMirror{
+ cmap := map[string]service.MirrorClient{
targets[0]: &clientmock.MirrorClientMock{
UpdateFunc: func(_ context.Context, _ *payload.Update_Request, _ ...grpc.CallOption) (*payload.Object_Location, error) {
return loc, nil
@@ -660,7 +664,7 @@ func Test_server_Update(t *testing.T) { // skipcq: GO-R1005
FromForwardedContextFunc: func(_ context.Context) string {
return ""
},
- BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error {
+ BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error {
for _, tgt := range targets {
f(ctx, tgt, cmap[tgt])
}
@@ -692,7 +696,7 @@ func Test_server_Update(t *testing.T) { // skipcq: GO-R1005
targets := []string{
"vald-01", "vald-02",
}
- cmap := map[string]vald.ClientWithMirror{
+ cmap := map[string]service.MirrorClient{
targets[0]: &clientmock.MirrorClientMock{
UpdateFunc: func(_ context.Context, _ *payload.Update_Request, _ ...grpc.CallOption) (*payload.Object_Location, error) {
return loc, nil
@@ -722,7 +726,7 @@ func Test_server_Update(t *testing.T) { // skipcq: GO-R1005
FromForwardedContextFunc: func(_ context.Context) string {
return ""
},
- BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error {
+ BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error {
for _, tgt := range targets {
f(ctx, tgt, cmap[tgt])
}
@@ -754,7 +758,7 @@ func Test_server_Update(t *testing.T) { // skipcq: GO-R1005
targets := []string{
"vald-01", "vald-02", "vald-03",
}
- cmap := map[string]vald.ClientWithMirror{
+ cmap := map[string]service.MirrorClient{
targets[0]: &clientmock.MirrorClientMock{
UpdateFunc: func(_ context.Context, _ *payload.Update_Request, _ ...grpc.CallOption) (*payload.Object_Location, error) {
return loc, nil
@@ -792,13 +796,13 @@ func Test_server_Update(t *testing.T) { // skipcq: GO-R1005
FromForwardedContextFunc: func(_ context.Context) string {
return ""
},
- BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error {
+ BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error {
for _, tgt := range targets {
f(ctx, tgt, cmap[tgt])
}
return nil
},
- DoMultiFunc: func(ctx context.Context, targets []string, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error {
+ DoMultiFunc: func(ctx context.Context, targets []string, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error {
for _, target := range targets {
if c, ok := cmap[target]; !ok {
return errors.ErrTargetNotFound
@@ -836,7 +840,7 @@ func Test_server_Update(t *testing.T) { // skipcq: GO-R1005
targets := []string{
"vald-01", "vald-02", "vald-03",
}
- cmap := map[string]vald.ClientWithMirror{
+ cmap := map[string]service.MirrorClient{
targets[0]: &clientmock.MirrorClientMock{
UpdateFunc: func(_ context.Context, _ *payload.Update_Request, _ ...grpc.CallOption) (*payload.Object_Location, error) {
return loc, nil
@@ -874,13 +878,13 @@ func Test_server_Update(t *testing.T) { // skipcq: GO-R1005
FromForwardedContextFunc: func(_ context.Context) string {
return ""
},
- BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error {
+ BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error {
for _, tgt := range targets {
f(ctx, tgt, cmap[tgt])
}
return nil
},
- DoMultiFunc: func(ctx context.Context, targets []string, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error {
+ DoMultiFunc: func(ctx context.Context, targets []string, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error {
for _, target := range targets {
if c, ok := cmap[target]; !ok {
return errors.ErrTargetNotFound
@@ -914,7 +918,7 @@ func Test_server_Update(t *testing.T) { // skipcq: GO-R1005
targets := []string{
"vald-01", "vald-02",
}
- cmap := map[string]vald.ClientWithMirror{
+ cmap := map[string]service.MirrorClient{
targets[0]: &clientmock.MirrorClientMock{
UpdateFunc: func(_ context.Context, _ *payload.Update_Request, _ ...grpc.CallOption) (*payload.Object_Location, error) {
return nil, status.Error(codes.NotFound, errors.ErrObjectIDNotFound(uuid).Error())
@@ -944,7 +948,7 @@ func Test_server_Update(t *testing.T) { // skipcq: GO-R1005
FromForwardedContextFunc: func(_ context.Context) string {
return ""
},
- BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error {
+ BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error {
for _, tgt := range targets {
f(ctx, tgt, cmap[tgt])
}
@@ -973,7 +977,7 @@ func Test_server_Update(t *testing.T) { // skipcq: GO-R1005
targets := []string{
"vald-01", "vald-02",
}
- cmap := map[string]vald.ClientWithMirror{
+ cmap := map[string]service.MirrorClient{
targets[0]: &clientmock.MirrorClientMock{
UpdateFunc: func(_ context.Context, _ *payload.Update_Request, _ ...grpc.CallOption) (*payload.Object_Location, error) {
return loc, nil
@@ -1003,7 +1007,7 @@ func Test_server_Update(t *testing.T) { // skipcq: GO-R1005
FromForwardedContextFunc: func(_ context.Context) string {
return ""
},
- BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error {
+ BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error {
for _, tgt := range targets {
f(ctx, tgt, cmap[tgt])
}
@@ -1028,7 +1032,7 @@ func Test_server_Update(t *testing.T) { // skipcq: GO-R1005
targets := []string{
"vald-01", "vald-02",
}
- cmap := map[string]vald.ClientWithMirror{
+ cmap := map[string]service.MirrorClient{
targets[0]: &clientmock.MirrorClientMock{
UpdateFunc: func(_ context.Context, _ *payload.Update_Request, _ ...grpc.CallOption) (*payload.Object_Location, error) {
return nil, status.Error(codes.Internal, errors.ErrCircuitBreakerHalfOpenFlowLimitation.Error())
@@ -1058,7 +1062,7 @@ func Test_server_Update(t *testing.T) { // skipcq: GO-R1005
FromForwardedContextFunc: func(_ context.Context) string {
return ""
},
- BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error {
+ BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error {
for _, tgt := range targets {
f(ctx, tgt, cmap[tgt])
}
@@ -1086,7 +1090,7 @@ func Test_server_Update(t *testing.T) { // skipcq: GO-R1005
targets := []string{
"vald-01", "vald-02", "vald-03",
}
- cmap := map[string]vald.ClientWithMirror{
+ cmap := map[string]service.MirrorClient{
targets[0]: &clientmock.MirrorClientMock{
UpdateFunc: func(_ context.Context, _ *payload.Update_Request, _ ...grpc.CallOption) (*payload.Object_Location, error) {
return nil, status.Error(codes.AlreadyExists, errors.ErrMetaDataAlreadyExists(uuid).Error())
@@ -1124,13 +1128,13 @@ func Test_server_Update(t *testing.T) { // skipcq: GO-R1005
FromForwardedContextFunc: func(_ context.Context) string {
return ""
},
- BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error {
+ BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error {
for _, tgt := range targets {
f(ctx, tgt, cmap[tgt])
}
return nil
},
- DoMultiFunc: func(ctx context.Context, targets []string, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error {
+ DoMultiFunc: func(ctx context.Context, targets []string, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error {
for _, target := range targets {
if c, ok := cmap[target]; !ok {
return errors.ErrTargetNotFound
@@ -1163,7 +1167,7 @@ func Test_server_Update(t *testing.T) { // skipcq: GO-R1005
targets := []string{
"vald-01", "vald-02", "vald-03",
}
- cmap := map[string]vald.ClientWithMirror{
+ cmap := map[string]service.MirrorClient{
targets[0]: &clientmock.MirrorClientMock{
UpdateFunc: func(_ context.Context, _ *payload.Update_Request, _ ...grpc.CallOption) (*payload.Object_Location, error) {
return loc, nil
@@ -1201,13 +1205,13 @@ func Test_server_Update(t *testing.T) { // skipcq: GO-R1005
FromForwardedContextFunc: func(_ context.Context) string {
return ""
},
- BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error {
+ BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error {
for _, tgt := range targets {
f(ctx, tgt, cmap[tgt])
}
return nil
},
- DoMultiFunc: func(ctx context.Context, targets []string, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error {
+ DoMultiFunc: func(ctx context.Context, targets []string, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error {
for _, target := range targets {
if c, ok := cmap[target]; !ok {
return errors.New("target not found")
@@ -1246,14 +1250,15 @@ func Test_server_Update(t *testing.T) { // skipcq: GO-R1005
checkFunc = defaultCheckFunc
}
s := &server{
- eg: test.fields.eg,
- gateway: test.fields.gateway,
- mirror: test.fields.mirror,
- vAddr: test.fields.vAddr,
- streamConcurrency: test.fields.streamConcurrency,
- name: test.fields.name,
- ip: test.fields.ip,
- UnimplementedValdServerWithMirror: test.fields.UnimplementedValdServerWithMirror,
+ eg: test.fields.eg,
+ gateway: test.fields.gateway,
+ mirror: test.fields.mirror,
+ vAddr: test.fields.vAddr,
+ streamConcurrency: test.fields.streamConcurrency,
+ name: test.fields.name,
+ ip: test.fields.ip,
+ UnimplementedValdServer: test.fields.UnimplementedValdServer,
+ UnimplementedMirrorServer: test.fields.UnimplementedMirrorServer,
}
gotLoc, err := s.Update(test.args.ctx, test.args.req)
@@ -1275,14 +1280,15 @@ func Test_server_Upsert(t *testing.T) {
req *payload.Upsert_Request
}
type fields struct {
- eg errgroup.Group
- gateway service.Gateway
- mirror service.Mirror
- vAddr string
- streamConcurrency int
- name string
- ip string
- UnimplementedValdServerWithMirror vald.UnimplementedValdServerWithMirror
+ eg errgroup.Group
+ gateway service.Gateway
+ mirror service.Mirror
+ vAddr string
+ streamConcurrency int
+ name string
+ ip string
+ UnimplementedValdServer vald.UnimplementedValdServer
+ UnimplementedMirrorServer mirror.UnimplementedMirrorServer
}
type want struct {
wantLoc *payload.Object_Location
@@ -1323,7 +1329,7 @@ func Test_server_Upsert(t *testing.T) {
targets := []string{
"vald-01", "vald-02",
}
- cmap := map[string]vald.ClientWithMirror{
+ cmap := map[string]service.MirrorClient{
targets[0]: &clientmock.MirrorClientMock{
UpsertFunc: func(_ context.Context, _ *payload.Upsert_Request, _ ...grpc.CallOption) (*payload.Object_Location, error) {
return loc, nil
@@ -1353,7 +1359,7 @@ func Test_server_Upsert(t *testing.T) {
FromForwardedContextFunc: func(_ context.Context) string {
return ""
},
- BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error {
+ BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error {
for _, tgt := range targets {
f(ctx, tgt, cmap[tgt])
}
@@ -1385,7 +1391,7 @@ func Test_server_Upsert(t *testing.T) {
targets := []string{
"vald-01", "vald-02",
}
- cmap := map[string]vald.ClientWithMirror{
+ cmap := map[string]service.MirrorClient{
targets[0]: &clientmock.MirrorClientMock{
UpsertFunc: func(_ context.Context, _ *payload.Upsert_Request, _ ...grpc.CallOption) (*payload.Object_Location, error) {
return loc, nil
@@ -1415,7 +1421,7 @@ func Test_server_Upsert(t *testing.T) {
FromForwardedContextFunc: func(_ context.Context) string {
return ""
},
- BroadCastFunc: func(ctx context.Context, f func(_ context.Context, _ string, _ vald.ClientWithMirror, _ ...grpc.CallOption) error) error {
+ BroadCastFunc: func(ctx context.Context, f func(_ context.Context, _ string, _ service.MirrorClient, _ ...grpc.CallOption) error) error {
for _, tgt := range targets {
f(ctx, tgt, cmap[tgt])
}
@@ -1440,7 +1446,7 @@ func Test_server_Upsert(t *testing.T) {
targets := []string{
"vald-01", "vald-02",
}
- cmap := map[string]vald.ClientWithMirror{
+ cmap := map[string]service.MirrorClient{
targets[0]: &clientmock.MirrorClientMock{
UpsertFunc: func(_ context.Context, _ *payload.Upsert_Request, _ ...grpc.CallOption) (*payload.Object_Location, error) {
return nil, status.Error(codes.AlreadyExists, errors.ErrMetaDataAlreadyExists(uuid).Error())
@@ -1470,7 +1476,7 @@ func Test_server_Upsert(t *testing.T) {
FromForwardedContextFunc: func(_ context.Context) string {
return ""
},
- BroadCastFunc: func(ctx context.Context, f func(_ context.Context, _ string, _ vald.ClientWithMirror, _ ...grpc.CallOption) error) error {
+ BroadCastFunc: func(ctx context.Context, f func(_ context.Context, _ string, _ service.MirrorClient, _ ...grpc.CallOption) error) error {
for _, tgt := range targets {
f(ctx, tgt, cmap[tgt])
}
@@ -1499,7 +1505,7 @@ func Test_server_Upsert(t *testing.T) {
targets := []string{
"vald-01", "vald-02",
}
- cmap := map[string]vald.ClientWithMirror{
+ cmap := map[string]service.MirrorClient{
targets[0]: &clientmock.MirrorClientMock{
UpsertFunc: func(_ context.Context, _ *payload.Upsert_Request, _ ...grpc.CallOption) (*payload.Object_Location, error) {
return loc, nil
@@ -1529,7 +1535,7 @@ func Test_server_Upsert(t *testing.T) {
FromForwardedContextFunc: func(_ context.Context) string {
return ""
},
- BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error {
+ BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error {
for _, tgt := range targets {
f(ctx, tgt, cmap[tgt])
}
@@ -1554,7 +1560,7 @@ func Test_server_Upsert(t *testing.T) {
targets := []string{
"vald-01", "vald-02",
}
- cmap := map[string]vald.ClientWithMirror{
+ cmap := map[string]service.MirrorClient{
targets[0]: &clientmock.MirrorClientMock{
UpsertFunc: func(_ context.Context, _ *payload.Upsert_Request, _ ...grpc.CallOption) (*payload.Object_Location, error) {
return nil, status.Error(codes.Internal, errors.ErrCircuitBreakerHalfOpenFlowLimitation.Error())
@@ -1584,7 +1590,7 @@ func Test_server_Upsert(t *testing.T) {
FromForwardedContextFunc: func(_ context.Context) string {
return ""
},
- BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error {
+ BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error {
for _, tgt := range targets {
f(ctx, tgt, cmap[tgt])
}
@@ -1622,14 +1628,15 @@ func Test_server_Upsert(t *testing.T) {
checkFunc = defaultCheckFunc
}
s := &server{
- eg: test.fields.eg,
- gateway: test.fields.gateway,
- mirror: test.fields.mirror,
- vAddr: test.fields.vAddr,
- streamConcurrency: test.fields.streamConcurrency,
- name: test.fields.name,
- ip: test.fields.ip,
- UnimplementedValdServerWithMirror: test.fields.UnimplementedValdServerWithMirror,
+ eg: test.fields.eg,
+ gateway: test.fields.gateway,
+ mirror: test.fields.mirror,
+ vAddr: test.fields.vAddr,
+ streamConcurrency: test.fields.streamConcurrency,
+ name: test.fields.name,
+ ip: test.fields.ip,
+ UnimplementedValdServer: test.fields.UnimplementedValdServer,
+ UnimplementedMirrorServer: test.fields.UnimplementedMirrorServer,
}
gotLoc, err := s.Upsert(test.args.ctx, test.args.req)
@@ -1651,14 +1658,15 @@ func Test_server_Remove(t *testing.T) {
req *payload.Remove_Request
}
type fields struct {
- eg errgroup.Group
- gateway service.Gateway
- mirror service.Mirror
- vAddr string
- streamConcurrency int
- name string
- ip string
- UnimplementedValdServerWithMirror vald.UnimplementedValdServerWithMirror
+ eg errgroup.Group
+ gateway service.Gateway
+ mirror service.Mirror
+ vAddr string
+ streamConcurrency int
+ name string
+ ip string
+ UnimplementedValdServer vald.UnimplementedValdServer
+ UnimplementedMirrorServer mirror.UnimplementedMirrorServer
}
type want struct {
wantLoc *payload.Object_Location
@@ -1699,7 +1707,7 @@ func Test_server_Remove(t *testing.T) {
targets := []string{
"vald-01", "vald-02",
}
- cmap := map[string]vald.ClientWithMirror{
+ cmap := map[string]service.MirrorClient{
targets[0]: &clientmock.MirrorClientMock{
RemoveFunc: func(_ context.Context, _ *payload.Remove_Request, _ ...grpc.CallOption) (*payload.Object_Location, error) {
return loc, nil
@@ -1728,7 +1736,7 @@ func Test_server_Remove(t *testing.T) {
FromForwardedContextFunc: func(_ context.Context) string {
return ""
},
- BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error {
+ BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error {
for _, tgt := range targets {
f(ctx, tgt, cmap[tgt])
}
@@ -1760,7 +1768,7 @@ func Test_server_Remove(t *testing.T) {
targets := []string{
"vald-01", "vald-02",
}
- cmap := map[string]vald.ClientWithMirror{
+ cmap := map[string]service.MirrorClient{
targets[0]: &clientmock.MirrorClientMock{
RemoveFunc: func(_ context.Context, _ *payload.Remove_Request, _ ...grpc.CallOption) (*payload.Object_Location, error) {
return loc, nil
@@ -1789,7 +1797,7 @@ func Test_server_Remove(t *testing.T) {
FromForwardedContextFunc: func(_ context.Context) string {
return ""
},
- BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error {
+ BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error {
for _, tgt := range targets {
f(ctx, tgt, cmap[tgt])
}
@@ -1818,7 +1826,7 @@ func Test_server_Remove(t *testing.T) {
targets := []string{
"vald-01", "vald-02",
}
- cmap := map[string]vald.ClientWithMirror{
+ cmap := map[string]service.MirrorClient{
targets[0]: &clientmock.MirrorClientMock{
RemoveFunc: func(_ context.Context, _ *payload.Remove_Request, _ ...grpc.CallOption) (*payload.Object_Location, error) {
return loc, nil
@@ -1847,7 +1855,7 @@ func Test_server_Remove(t *testing.T) {
FromForwardedContextFunc: func(_ context.Context) string {
return ""
},
- BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error {
+ BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error {
for _, tgt := range targets {
f(ctx, tgt, cmap[tgt])
}
@@ -1872,7 +1880,7 @@ func Test_server_Remove(t *testing.T) {
targets := []string{
"vald-01", "vald-02",
}
- cmap := map[string]vald.ClientWithMirror{
+ cmap := map[string]service.MirrorClient{
targets[0]: &clientmock.MirrorClientMock{
RemoveFunc: func(_ context.Context, _ *payload.Remove_Request, _ ...grpc.CallOption) (*payload.Object_Location, error) {
return nil, status.Error(codes.Internal, errors.ErrCircuitBreakerHalfOpenFlowLimitation.Error())
@@ -1901,7 +1909,7 @@ func Test_server_Remove(t *testing.T) {
FromForwardedContextFunc: func(_ context.Context) string {
return ""
},
- BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error {
+ BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error {
for _, tgt := range targets {
f(ctx, tgt, cmap[tgt])
}
@@ -1929,7 +1937,7 @@ func Test_server_Remove(t *testing.T) {
targets := []string{
"vald-01", "vald-02",
}
- cmap := map[string]vald.ClientWithMirror{
+ cmap := map[string]service.MirrorClient{
targets[0]: &clientmock.MirrorClientMock{
RemoveFunc: func(_ context.Context, _ *payload.Remove_Request, _ ...grpc.CallOption) (*payload.Object_Location, error) {
return nil, status.Error(codes.NotFound, errors.ErrIndexNotFound.Error())
@@ -1958,7 +1966,7 @@ func Test_server_Remove(t *testing.T) {
FromForwardedContextFunc: func(_ context.Context) string {
return ""
},
- BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error {
+ BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error {
for _, tgt := range targets {
f(ctx, tgt, cmap[tgt])
}
@@ -1993,14 +2001,15 @@ func Test_server_Remove(t *testing.T) {
checkFunc = defaultCheckFunc
}
s := &server{
- eg: test.fields.eg,
- gateway: test.fields.gateway,
- mirror: test.fields.mirror,
- vAddr: test.fields.vAddr,
- streamConcurrency: test.fields.streamConcurrency,
- name: test.fields.name,
- ip: test.fields.ip,
- UnimplementedValdServerWithMirror: test.fields.UnimplementedValdServerWithMirror,
+ eg: test.fields.eg,
+ gateway: test.fields.gateway,
+ mirror: test.fields.mirror,
+ vAddr: test.fields.vAddr,
+ streamConcurrency: test.fields.streamConcurrency,
+ name: test.fields.name,
+ ip: test.fields.ip,
+ UnimplementedValdServer: test.fields.UnimplementedValdServer,
+ UnimplementedMirrorServer: test.fields.UnimplementedMirrorServer,
}
gotLoc, err := s.Remove(test.args.ctx, test.args.req)
@@ -2021,14 +2030,15 @@ func Test_server_RemoveByTimestamp(t *testing.T) {
req *payload.Remove_TimestampRequest
}
type fields struct {
- eg errgroup.Group
- gateway service.Gateway
- mirror service.Mirror
- vAddr string
- streamConcurrency int
- name string
- ip string
- UnimplementedValdServerWithMirror vald.UnimplementedValdServerWithMirror
+ eg errgroup.Group
+ gateway service.Gateway
+ mirror service.Mirror
+ vAddr string
+ streamConcurrency int
+ name string
+ ip string
+ UnimplementedValdServer vald.UnimplementedValdServer
+ UnimplementedMirrorServer mirror.UnimplementedMirrorServer
}
type want struct {
wantLocs *payload.Object_Locations
@@ -2076,7 +2086,7 @@ func Test_server_RemoveByTimestamp(t *testing.T) {
targets := []string{
"vald-01", "vald-02",
}
- cmap := map[string]vald.ClientWithMirror{
+ cmap := map[string]service.MirrorClient{
targets[0]: &clientmock.MirrorClientMock{
RemoveByTimestampFunc: func(_ context.Context, _ *payload.Remove_TimestampRequest, _ ...grpc.CallOption) (*payload.Object_Locations, error) {
return &payload.Object_Locations{
@@ -2108,7 +2118,7 @@ func Test_server_RemoveByTimestamp(t *testing.T) {
FromForwardedContextFunc: func(_ context.Context) string {
return ""
},
- BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error {
+ BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error {
for _, tgt := range targets {
f(ctx, tgt, cmap[tgt])
}
@@ -2142,7 +2152,7 @@ func Test_server_RemoveByTimestamp(t *testing.T) {
targets := []string{
"vald-01", "vald-02",
}
- cmap := map[string]vald.ClientWithMirror{
+ cmap := map[string]service.MirrorClient{
targets[0]: &clientmock.MirrorClientMock{
RemoveByTimestampFunc: func(_ context.Context, _ *payload.Remove_TimestampRequest, _ ...grpc.CallOption) (*payload.Object_Locations, error) {
return &payload.Object_Locations{
@@ -2170,7 +2180,7 @@ func Test_server_RemoveByTimestamp(t *testing.T) {
FromForwardedContextFunc: func(_ context.Context) string {
return ""
},
- BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error {
+ BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error {
for _, tgt := range targets {
f(ctx, tgt, cmap[tgt])
}
@@ -2198,7 +2208,7 @@ func Test_server_RemoveByTimestamp(t *testing.T) {
targets := []string{
"vald-01", "vald-02",
}
- cmap := map[string]vald.ClientWithMirror{
+ cmap := map[string]service.MirrorClient{
targets[0]: &clientmock.MirrorClientMock{
RemoveByTimestampFunc: func(_ context.Context, _ *payload.Remove_TimestampRequest, _ ...grpc.CallOption) (*payload.Object_Locations, error) {
return nil, status.Error(codes.Internal, errors.ErrCircuitBreakerHalfOpenFlowLimitation.Error())
@@ -2222,7 +2232,7 @@ func Test_server_RemoveByTimestamp(t *testing.T) {
FromForwardedContextFunc: func(_ context.Context) string {
return ""
},
- BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error {
+ BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error {
for _, tgt := range targets {
f(ctx, tgt, cmap[tgt])
}
@@ -2251,7 +2261,7 @@ func Test_server_RemoveByTimestamp(t *testing.T) {
targets := []string{
"vald-01", "vald-02",
}
- cmap := map[string]vald.ClientWithMirror{
+ cmap := map[string]service.MirrorClient{
targets[0]: &clientmock.MirrorClientMock{
RemoveByTimestampFunc: func(_ context.Context, _ *payload.Remove_TimestampRequest, _ ...grpc.CallOption) (*payload.Object_Locations, error) {
return nil, status.Error(codes.NotFound, errors.ErrObjectIDNotFound(uuid1).Error())
@@ -2275,7 +2285,7 @@ func Test_server_RemoveByTimestamp(t *testing.T) {
FromForwardedContextFunc: func(_ context.Context) string {
return ""
},
- BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error {
+ BroadCastFunc: func(ctx context.Context, f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error {
for _, tgt := range targets {
f(ctx, tgt, cmap[tgt])
}
@@ -2310,14 +2320,15 @@ func Test_server_RemoveByTimestamp(t *testing.T) {
checkFunc = defaultCheckFunc
}
s := &server{
- eg: test.fields.eg,
- gateway: test.fields.gateway,
- mirror: test.fields.mirror,
- vAddr: test.fields.vAddr,
- streamConcurrency: test.fields.streamConcurrency,
- name: test.fields.name,
- ip: test.fields.ip,
- UnimplementedValdServerWithMirror: test.fields.UnimplementedValdServerWithMirror,
+ eg: test.fields.eg,
+ gateway: test.fields.gateway,
+ mirror: test.fields.mirror,
+ vAddr: test.fields.vAddr,
+ streamConcurrency: test.fields.streamConcurrency,
+ name: test.fields.name,
+ ip: test.fields.ip,
+ UnimplementedValdServer: test.fields.UnimplementedValdServer,
+ UnimplementedMirrorServer: test.fields.UnimplementedMirrorServer,
}
gotLocs, err := s.RemoveByTimestamp(test.args.ctx, test.args.req)
diff --git a/pkg/gateway/mirror/handler/grpc/mock_test.go b/pkg/gateway/mirror/handler/grpc/mock_test.go
index a09cbb567a8..59ab5564a4f 100644
--- a/pkg/gateway/mirror/handler/grpc/mock_test.go
+++ b/pkg/gateway/mirror/handler/grpc/mock_test.go
@@ -16,7 +16,6 @@ package grpc
import (
"context"
- "github.com/vdaas/vald/apis/grpc/v1/vald"
"github.com/vdaas/vald/internal/net/grpc"
"github.com/vdaas/vald/pkg/gateway/mirror/service"
)
@@ -28,9 +27,9 @@ type gatewayMock struct {
ForwardedContextFunc func(ctx context.Context, podName string) context.Context
FromForwardedContextFunc func(ctx context.Context) string
BroadCastFunc func(ctx context.Context,
- f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error
+ f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error
DoMultiFunc func(ctx context.Context, targets []string,
- f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error
+ f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error) error
}
func (gm *gatewayMock) ForwardedContext(ctx context.Context, podName string) context.Context {
@@ -42,13 +41,13 @@ func (gm *gatewayMock) FromForwardedContext(ctx context.Context) string {
}
func (gm *gatewayMock) BroadCast(ctx context.Context,
- f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error,
+ f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error,
) error {
return gm.BroadCastFunc(ctx, f)
}
func (gm *gatewayMock) DoMulti(ctx context.Context, targets []string,
- f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error,
+ f func(ctx context.Context, target string, vc service.MirrorClient, copts ...grpc.CallOption) error,
) error {
return gm.DoMultiFunc(ctx, targets, f)
}
diff --git a/pkg/gateway/mirror/handler/rest/handler.go b/pkg/gateway/mirror/handler/rest/handler.go
index 70ed6dbff7a..105cd3ec6f5 100644
--- a/pkg/gateway/mirror/handler/rest/handler.go
+++ b/pkg/gateway/mirror/handler/rest/handler.go
@@ -17,9 +17,9 @@ import (
"net/http"
"github.com/vdaas/vald/apis/grpc/v1/payload"
- "github.com/vdaas/vald/apis/grpc/v1/vald"
"github.com/vdaas/vald/internal/net/http/dump"
"github.com/vdaas/vald/internal/net/http/json"
+ "github.com/vdaas/vald/pkg/gateway/mirror/handler/grpc"
)
// Handler represents an interface for rest handler.
@@ -48,7 +48,7 @@ type Handler interface {
}
type handler struct {
- vald vald.ServerWithMirror
+ vald grpc.Server
}
// New returns a Vald server as rest handler with mirror using the provided options.
diff --git a/pkg/gateway/mirror/handler/rest/option.go b/pkg/gateway/mirror/handler/rest/option.go
index 56272ff02e5..f2287b16ebc 100644
--- a/pkg/gateway/mirror/handler/rest/option.go
+++ b/pkg/gateway/mirror/handler/rest/option.go
@@ -13,15 +13,13 @@
// limitations under the License.
package rest
-import (
- "github.com/vdaas/vald/apis/grpc/v1/vald"
-)
+import "github.com/vdaas/vald/pkg/gateway/mirror/handler/grpc"
type Option func(*handler)
var defaultOptions = []Option{}
-func WithVald(v vald.ServerWithMirror) Option {
+func WithVald(v grpc.Server) Option {
return func(h *handler) {
h.vald = v
}
diff --git a/pkg/gateway/mirror/service/gateway.go b/pkg/gateway/mirror/service/gateway.go
index 632f9db8a67..d2a8b9dbd80 100644
--- a/pkg/gateway/mirror/service/gateway.go
+++ b/pkg/gateway/mirror/service/gateway.go
@@ -17,7 +17,6 @@ import (
"context"
"reflect"
- "github.com/vdaas/vald/apis/grpc/v1/vald"
"github.com/vdaas/vald/internal/client/v1/client/mirror"
"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/log"
@@ -36,11 +35,11 @@ type Gateway interface {
ForwardedContext(ctx context.Context, podName string) context.Context
FromForwardedContext(ctx context.Context) string
BroadCast(ctx context.Context,
- f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error
+ f func(ctx context.Context, target string, vc MirrorClient, copts ...grpc.CallOption) error) error
Do(ctx context.Context, target string,
- f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error)) (interface{}, error)
+ f func(ctx context.Context, target string, vc MirrorClient, copts ...grpc.CallOption) (interface{}, error)) (interface{}, error)
DoMulti(ctx context.Context, targets []string,
- f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error
+ f func(ctx context.Context, target string, vc MirrorClient, copts ...grpc.CallOption) error) error
GRPCClient() grpc.Client
}
@@ -105,7 +104,7 @@ func (*gateway) FromForwardedContext(ctx context.Context) string {
// to interact with gRPC clients for multiple targets.
// The provided function should handle the communication logic for a target.
func (g *gateway) BroadCast(ctx context.Context,
- f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error,
+ f func(ctx context.Context, target string, vc MirrorClient, copts ...grpc.CallOption) error,
) (err error) {
ctx, span := trace.StartSpan(ctx, "vald/gateway/mirror/service/Gateway.BroadCast")
defer func() {
@@ -120,7 +119,7 @@ func (g *gateway) BroadCast(ctx context.Context,
case <-ictx.Done():
return nil
default:
- return f(ictx, addr, vald.NewValdClientWithMirror(conn), copts...)
+ return f(ictx, addr, NewMirrorClient(conn), copts...)
}
})
}
@@ -129,7 +128,7 @@ func (g *gateway) BroadCast(ctx context.Context,
// It returns the result of the operation and any associated error.
// The provided function should handle the communication logic for a target.
func (g *gateway) Do(ctx context.Context, target string,
- f func(ctx context.Context, addr string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error),
+ f func(ctx context.Context, addr string, vc MirrorClient, copts ...grpc.CallOption) (interface{}, error),
) (res interface{}, err error) {
ctx, span := trace.StartSpan(ctx, "vald/gateway/mirror/service/Gateway.Do")
defer func() {
@@ -143,7 +142,7 @@ func (g *gateway) Do(ctx context.Context, target string,
}
return g.client.GRPCClient().Do(g.ForwardedContext(ctx, g.podName), target,
func(ictx context.Context, conn *grpc.ClientConn, copts ...grpc.CallOption) (interface{}, error) {
- return f(ictx, target, vald.NewValdClientWithMirror(conn), copts...)
+ return f(ictx, target, NewMirrorClient(conn), copts...)
},
)
}
@@ -152,7 +151,7 @@ func (g *gateway) Do(ctx context.Context, target string,
// It returns an error if any of the operations fails.
// The provided function should handle the communication logic for a target.
func (g *gateway) DoMulti(ctx context.Context, targets []string,
- f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error,
+ f func(ctx context.Context, target string, vc MirrorClient, copts ...grpc.CallOption) error,
) error {
ctx, span := trace.StartSpan(ctx, "vald/gateway/mirror/service/Gateway.DoMulti")
defer func() {
@@ -170,7 +169,7 @@ func (g *gateway) DoMulti(ctx context.Context, targets []string,
case <-ictx.Done():
return nil
default:
- return f(ictx, addr, vald.NewValdClientWithMirror(conn), copts...)
+ return f(ictx, addr, NewMirrorClient(conn), copts...)
}
},
)
diff --git a/pkg/gateway/mirror/service/gateway_mock_test.go b/pkg/gateway/mirror/service/gateway_mock_test.go
index 9bd0641033d..baaed0ae4c0 100644
--- a/pkg/gateway/mirror/service/gateway_mock_test.go
+++ b/pkg/gateway/mirror/service/gateway_mock_test.go
@@ -16,7 +16,6 @@ package service
import (
"context"
- "github.com/vdaas/vald/apis/grpc/v1/vald"
"github.com/vdaas/vald/internal/net/grpc"
)
@@ -26,11 +25,11 @@ type GatewayMock struct {
ForwardedContextFunc func(ctx context.Context, podName string) context.Context
FromForwardedContextFunc func(ctx context.Context) string
BroadCastFunc func(ctx context.Context,
- f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error
+ f func(ctx context.Context, target string, vc MirrorClient, copts ...grpc.CallOption) error) error
DoFunc func(ctx context.Context, target string,
- f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error)) (interface{}, error)
+ f func(ctx context.Context, target string, vc MirrorClient, copts ...grpc.CallOption) (interface{}, error)) (interface{}, error)
DoMultiFunc func(ctx context.Context, targets []string,
- f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error
+ f func(ctx context.Context, target string, vc MirrorClient, copts ...grpc.CallOption) error) error
GRPCClientFunc func() grpc.Client
}
@@ -46,21 +45,21 @@ func (gm *GatewayMock) FromForwardedContext(ctx context.Context) string {
// BroadCast calls BroadCastFunc object.
func (gm *GatewayMock) BroadCast(ctx context.Context,
- f func(_ context.Context, _ string, _ vald.ClientWithMirror, _ ...grpc.CallOption) error,
+ f func(_ context.Context, _ string, _ MirrorClient, _ ...grpc.CallOption) error,
) error {
return gm.BroadCastFunc(ctx, f)
}
// Do calls DoFunc object.
func (gm *GatewayMock) Do(ctx context.Context, target string,
- f func(_ context.Context, _ string, _ vald.ClientWithMirror, _ ...grpc.CallOption) (interface{}, error),
+ f func(_ context.Context, _ string, _ MirrorClient, _ ...grpc.CallOption) (interface{}, error),
) (interface{}, error) {
return gm.DoFunc(ctx, target, f)
}
// DoMulti calls DoMultiFunc object.
func (gm *GatewayMock) DoMulti(ctx context.Context, targets []string,
- f func(_ context.Context, _ string, _ vald.ClientWithMirror, _ ...grpc.CallOption) error,
+ f func(_ context.Context, _ string, _ MirrorClient, _ ...grpc.CallOption) error,
) error {
return gm.DoMultiFunc(ctx, targets, f)
}
diff --git a/pkg/gateway/mirror/service/mirror.go b/pkg/gateway/mirror/service/mirror.go
index 892ef39f1a0..c23eb8d510a 100644
--- a/pkg/gateway/mirror/service/mirror.go
+++ b/pkg/gateway/mirror/service/mirror.go
@@ -18,6 +18,7 @@ import (
"reflect"
"time"
+ "github.com/vdaas/vald/apis/grpc/v1/mirror"
"github.com/vdaas/vald/apis/grpc/v1/payload"
"github.com/vdaas/vald/apis/grpc/v1/vald"
"github.com/vdaas/vald/internal/errors"
@@ -45,6 +46,23 @@ type Mirror interface {
RangeMirrorAddr(f func(addr string, _ any) bool)
}
+type MirrorClient interface {
+ vald.Client
+ mirror.MirrorClient
+}
+
+type client struct {
+ vald.Client
+ mirror.MirrorClient
+}
+
+func NewMirrorClient(conn *grpc.ClientConn) MirrorClient {
+ return &client{
+ Client: vald.NewValdClient(conn),
+ MirrorClient: mirror.NewMirrorClient(conn),
+ }
+}
+
type mirr struct {
addrl sync.Map[string, any] // List of all connected addresses
selfMirrTgts []*payload.Mirror_Target // Targets of self mirror gateway
@@ -151,7 +169,7 @@ func (m *mirr) Start(ctx context.Context) <-chan error { // skipcq: GO-R1005
}
func (m *mirr) registers(ctx context.Context, tgts *payload.Mirror_Targets) ([]*payload.Mirror_Target, error) { // skipcq: GO-R1005
- ctx, span := trace.StartSpan(grpc.WithGRPCMethod(ctx, vald.PackageName+"."+vald.MirrorRPCServiceName+"/"+vald.RegisterRPCName), "vald/gateway/mirror/service/Mirror.registers")
+ ctx, span := trace.StartSpan(grpc.WithGRPCMethod(ctx, vald.PackageName+"."+mirror.RPCServiceName+"/"+mirror.RegisterRPCName), "vald/gateway/mirror/service/Mirror.registers")
defer func() {
if span != nil {
span.End()
@@ -162,14 +180,14 @@ func (m *mirr) registers(ctx context.Context, tgts *payload.Mirror_Targets) ([]*
ServingData: errdetails.Serialize(tgts),
}
resInfo := &errdetails.ResourceInfo{
- ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.RegisterRPCName,
+ ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + mirror.RegisterRPCName,
}
resTgts := make([]*payload.Mirror_Target, 0, len(tgts.GetTargets()))
exists := make(map[string]bool)
var result sync.Map[string, error] // map[target host: error]
var mu sync.Mutex
- err := m.gateway.DoMulti(ctx, m.connectedOtherMirrorAddrs(ctx), func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error {
+ err := m.gateway.DoMulti(ctx, m.connectedOtherMirrorAddrs(ctx), func(ctx context.Context, target string, vc MirrorClient, copts ...grpc.CallOption) error {
ctx, span := trace.StartSpan(ctx, "vald/gateway/mirror/service/Mirror.registers/"+target)
defer func() {
if span != nil {
@@ -183,22 +201,22 @@ func (m *mirr) registers(ctx context.Context, tgts *payload.Mirror_Targets) ([]*
switch {
case errors.Is(err, context.Canceled):
err = status.WrapWithCanceled(
- vald.RegisterRPCName+" API canceld", err, reqInfo, resInfo,
+ mirror.RegisterRPCName+" API canceld", err, reqInfo, resInfo,
)
attrs = trace.StatusCodeCancelled(err.Error())
case errors.Is(err, context.DeadlineExceeded):
err = status.WrapWithCanceled(
- vald.RegisterRPCName+" API deadline exceeded", err, reqInfo, resInfo,
+ mirror.RegisterRPCName+" API deadline exceeded", err, reqInfo, resInfo,
)
attrs = trace.StatusCodeDeadlineExceeded(err.Error())
case errors.Is(err, errors.ErrGRPCClientConnNotFound("*")):
err = status.WrapWithInternal(
- vald.RegisterRPCName+" API connection not found", err, reqInfo, resInfo,
+ mirror.RegisterRPCName+" API connection not found", err, reqInfo, resInfo,
)
attrs = trace.StatusCodeInternal(err.Error())
case errors.Is(err, errors.ErrTargetNotFound):
err = status.WrapWithInvalidArgument(
- vald.RegisterRPCName+" API target not found", err, reqInfo, resInfo,
+ mirror.RegisterRPCName+" API target not found", err, reqInfo, resInfo,
)
attrs = trace.StatusCodeInvalidArgument(err.Error())
default:
@@ -207,7 +225,7 @@ func (m *mirr) registers(ctx context.Context, tgts *payload.Mirror_Targets) ([]*
msg string
)
st, msg, err = status.ParseError(err, codes.Internal,
- "failed to parse "+vald.RegisterRPCName+" gRPC error response", reqInfo, resInfo,
+ "failed to parse "+mirror.RegisterRPCName+" gRPC error response", reqInfo, resInfo,
)
attrs = trace.FromGRPCStatus(st.Code(), msg)
@@ -251,14 +269,14 @@ func (m *mirr) registers(ctx context.Context, tgts *payload.Mirror_Targets) ([]*
})
result.Range(func(target string, rerr error) bool {
if rerr != nil {
- err = errors.Join(err, errors.Wrapf(rerr, "failed to "+vald.RegisterRPCName+" API to %s", target))
+ err = errors.Join(err, errors.Wrapf(rerr, "failed to "+mirror.RegisterRPCName+" API to %s", target))
}
return true
})
if err != nil {
if errors.Is(err, errors.ErrGRPCClientConnNotFound("*")) {
err = status.WrapWithInternal(
- vald.RegisterRPCName+" API connection not found", err, reqInfo, resInfo,
+ mirror.RegisterRPCName+" API connection not found", err, reqInfo, resInfo,
)
log.Warn(err)
if span != nil {
@@ -270,7 +288,7 @@ func (m *mirr) registers(ctx context.Context, tgts *payload.Mirror_Targets) ([]*
}
st, msg, err := status.ParseError(err, codes.Internal,
- "failed to parse "+vald.RegisterRPCName+" gRPC error response", reqInfo, resInfo,
+ "failed to parse "+mirror.RegisterRPCName+" gRPC error response", reqInfo, resInfo,
)
log.Warn(err)
if span != nil {
diff --git a/pkg/gateway/mirror/usecase/vald.go b/pkg/gateway/mirror/usecase/vald.go
index 7937ba47285..453c7344c4d 100644
--- a/pkg/gateway/mirror/usecase/vald.go
+++ b/pkg/gateway/mirror/usecase/vald.go
@@ -16,8 +16,9 @@ package usecase
import (
"context"
+ "github.com/vdaas/vald/apis/grpc/v1/mirror"
"github.com/vdaas/vald/apis/grpc/v1/vald"
- "github.com/vdaas/vald/internal/client/v1/client/mirror"
+ client "github.com/vdaas/vald/internal/client/v1/client/mirror"
"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/net"
"github.com/vdaas/vald/internal/net/grpc"
@@ -42,7 +43,7 @@ type run struct {
dialer net.Dialer
cfg *config.Data
server starter.Server
- client mirror.Client
+ client client.Client
gateway service.Gateway
mirror service.Mirror
discover service.Discovery
@@ -69,9 +70,9 @@ func New(cfg *config.Data) (r runner.Runner, err error) {
// skipcq: CRT-D0001
cOpts = append(cOpts, grpc.WithErrGroup(eg))
- client, err := mirror.New(
- mirror.WithAddrs(cfg.Mirror.Client.Addrs...),
- mirror.WithClient(grpc.New(cOpts...)),
+ mClient, err := client.New(
+ client.WithAddrs(cfg.Mirror.Client.Addrs...),
+ client.WithClient(grpc.New(cOpts...)),
)
if err != nil {
return nil, err
@@ -79,13 +80,13 @@ func New(cfg *config.Data) (r runner.Runner, err error) {
gateway, err := service.NewGateway(
service.WithErrGroup(eg),
- service.WithMirrorClient(client),
+ service.WithMirrorClient(mClient),
service.WithPodName(cfg.Mirror.PodName),
)
if err != nil {
return nil, err
}
- mirror, err := service.NewMirror(
+ m, err := service.NewMirror(
service.WithErrorGroup(eg),
service.WithRegisterDuration(cfg.Mirror.RegisterDuration),
service.WithGatewayAddrs(cfg.Mirror.GatewayAddr),
@@ -102,7 +103,7 @@ func New(cfg *config.Data) (r runner.Runner, err error) {
service.WithDiscoverySelfMirrorAddrs(cfg.Mirror.SelfMirrorAddr),
service.WithDiscoveryColocation(cfg.Mirror.Colocation),
service.WithDiscoveryDialer(dialer),
- service.WithDiscoveryMirror(mirror),
+ service.WithDiscoveryMirror(m),
service.WithDiscoveryErrGroup(eg),
)
if err != nil {
@@ -113,7 +114,7 @@ func New(cfg *config.Data) (r runner.Runner, err error) {
handler.WithValdAddr(cfg.Mirror.GatewayAddr),
handler.WithErrGroup(eg),
handler.WithGateway(gateway),
- handler.WithMirror(mirror),
+ handler.WithMirror(m),
handler.WithStreamConcurrency(cfg.Server.GetGRPCStreamConcurrency()),
)
if err != nil {
@@ -122,7 +123,8 @@ func New(cfg *config.Data) (r runner.Runner, err error) {
grpcServerOptions := []server.Option{
server.WithGRPCRegistFunc(func(srv *grpc.Server) {
- vald.RegisterValdServerWithMirror(srv, v)
+ vald.RegisterValdServer(srv, v)
+ mirror.RegisterMirrorServer(srv, v)
}),
server.WithPreStopFunction(func() error {
return nil
@@ -135,7 +137,7 @@ func New(cfg *config.Data) (r runner.Runner, err error) {
cfg.Observability,
bometrics.New(),
cbmetrics.New(),
- mirrmetrics.New(mirror),
+ mirrmetrics.New(m),
)
if err != nil {
return nil, err
@@ -170,9 +172,9 @@ func New(cfg *config.Data) (r runner.Runner, err error) {
dialer: dialer,
cfg: cfg,
server: srv,
- client: client,
+ client: mClient,
gateway: gateway,
- mirror: mirror,
+ mirror: m,
discover: discover,
observability: obs,
}, nil