Skip to content

Commit

Permalink
add RemoveByTimestamp for mirror
Browse files Browse the repository at this point in the history
Signed-off-by: hlts2 <[email protected]>
  • Loading branch information
hlts2 committed Sep 27, 2023
1 parent 388f8ef commit 91deb87
Showing 1 changed file with 202 additions and 40 deletions.
242 changes: 202 additions & 40 deletions pkg/gateway/mirror/handler/grpc/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,10 +168,7 @@ func (s *server) Exists(ctx context.Context, meta *payload.Object_ID) (id *paylo

_, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) {
id, err = vc.Exists(ctx, meta, copts...)
if err != nil {
return nil, err
}
return id, nil
return id, err
})
if err != nil {
reqInfo := &errdetails.RequestInfo{
Expand Down Expand Up @@ -235,10 +232,7 @@ func (s *server) Search(ctx context.Context, req *payload.Search_Request) (res *

_, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) {
res, err = vc.Search(ctx, req, copts...)
if err != nil {
return nil, err
}
return res, nil
return res, err
})
if err != nil {
reqInfo := &errdetails.RequestInfo{
Expand Down Expand Up @@ -305,10 +299,7 @@ func (s *server) SearchByID(ctx context.Context, req *payload.Search_IDRequest)

_, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) {
res, err = vc.SearchByID(ctx, req, copts...)
if err != nil {
return nil, err
}
return res, nil
return res, err
})
if err != nil {
reqInfo := &errdetails.RequestInfo{
Expand Down Expand Up @@ -471,10 +462,7 @@ func (s *server) MultiSearch(ctx context.Context, req *payload.Search_MultiReque

_, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) {
res, err = vc.MultiSearch(ctx, req, copts...)
if err != nil {
return nil, err
}
return res, nil
return res, err
})
if err != nil {
reqInfo := &errdetails.RequestInfo{
Expand Down Expand Up @@ -538,10 +526,7 @@ func (s *server) MultiSearchByID(ctx context.Context, req *payload.Search_MultiI

_, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) {
res, err = vc.MultiSearchByID(ctx, req, copts...)
if err != nil {
return nil, err
}
return res, nil
return res, err
})
if err != nil {
reqInfo := &errdetails.RequestInfo{
Expand Down Expand Up @@ -605,10 +590,7 @@ func (s *server) LinearSearch(ctx context.Context, req *payload.Search_Request)

_, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) {
res, err = vc.LinearSearch(ctx, req, copts...)
if err != nil {
return nil, err
}
return res, nil
return res, err
})
if err != nil {
reqInfo := &errdetails.RequestInfo{
Expand Down Expand Up @@ -675,10 +657,7 @@ func (s *server) LinearSearchByID(ctx context.Context, req *payload.Search_IDReq

_, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) {
res, err = vc.LinearSearchByID(ctx, req, copts...)
if err != nil {
return nil, err
}
return res, nil
return res, err
})
if err != nil {
reqInfo := &errdetails.RequestInfo{
Expand Down Expand Up @@ -844,10 +823,7 @@ func (s *server) MultiLinearSearch(ctx context.Context, req *payload.Search_Mult

_, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) {
res, err = vc.MultiLinearSearch(ctx, req, copts...)
if err != nil {
return nil, err
}
return res, nil
return res, err
})
if err != nil {
reqInfo := &errdetails.RequestInfo{
Expand Down Expand Up @@ -911,10 +887,7 @@ func (s *server) MultiLinearSearchByID(ctx context.Context, req *payload.Search_

_, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) {
res, err = vc.MultiLinearSearchByID(ctx, req, copts...)
if err != nil {
return nil, err
}
return res, nil
return res, err
})
if err != nil {
reqInfo := &errdetails.RequestInfo{
Expand Down Expand Up @@ -983,10 +956,7 @@ func (s *server) Insert(ctx context.Context, req *payload.Insert_Request) (loc *
if len(reqSrcPodName) != 0 {
_, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) {
loc, err = vc.Insert(ctx, req, copts...)
if err != nil {
return nil, err
}
return loc, nil
return loc, err
})
if err != nil {
reqInfo := &errdetails.RequestInfo{
Expand Down Expand Up @@ -2959,6 +2929,198 @@ func (s *server) MultiRemove(ctx context.Context, reqs *payload.Remove_MultiRequ
return res, nil
}

func (s *server) RemoveByTimestamp(ctx context.Context, req *payload.Remove_TimestampRequest) (locs *payload.Object_Locations, err error) {
ctx, span := trace.StartSpan(grpc.WithGRPCMethod(ctx, vald.PackageName+"."+vald.RemoveRPCServiceName+"/"+vald.RemoveByTimestampRPCName), apiName+"/"+vald.RemoveByTimestampRPCName)
defer func() {
if span != nil {
span.End()
}
}()

reqSrcPodName := s.gateway.FromForwardedContext(ctx)

// When this condition is matched, the request is proxied to another Mirror gateway.
// So this component sends requests only to the Vald gateway (LB gateway) of its own cluster.
if len(reqSrcPodName) != 0 {
_, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, vc vald.ClientWithMirror, opts ...grpc.CallOption) (interface{}, error) {
locs, err = vc.RemoveByTimestamp(ctx, req, opts...)
return locs, err
})
if err != nil {
reqInfo := &errdetails.RequestInfo{
ServingData: errdetails.Serialize(req),
}
resInfo := &errdetails.ResourceInfo{
ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.RemoveByTimestampRPCName,
ResourceName: fmt.Sprintf("%s: %s(%s) to %s", apiName, s.name, s.ip, s.vAddr),
}
var attrs trace.Attributes

switch {
case errors.Is(err, context.Canceled):
err = status.WrapWithCanceled(
vald.RemoveByTimestampRPCName+" API canceld", err, reqInfo, resInfo,
)
attrs = trace.StatusCodeCancelled(err.Error())
case errors.Is(err, context.DeadlineExceeded):
err = status.WrapWithDeadlineExceeded(
vald.RemoveByTimestampRPCName+" API deadline exceeded", err, reqInfo, resInfo,
)
attrs = trace.StatusCodeDeadlineExceeded(err.Error())
case errors.Is(err, errors.ErrTargetNotFound):
err = status.WrapWithInternal(
vald.RemoveByTimestampRPCName+" API target not found", err, reqInfo, resInfo,
)
attrs = trace.StatusCodeInternal(err.Error())
case errors.Is(err, errors.ErrGRPCClientConnNotFound("*")):
err = status.WrapWithInternal(
vald.RemoveByTimestampRPCName+" API connection not found", err, reqInfo, resInfo,
)
attrs = trace.StatusCodeInternal(err.Error())
default:
var (
st *status.Status
msg string
)
st, msg, err = status.ParseError(err, codes.Internal,
"failed to parse "+vald.RemoveByTimestampRPCName+" gRPC error response", reqInfo, resInfo,
)
attrs = trace.FromGRPCStatus(st.Code(), msg)
}
log.Warn(err)
if span != nil {
span.RecordError(err)
span.SetAttributes(attrs...)
span.SetStatus(trace.StatusError, err.Error())
}
return nil, err
}
return locs, nil
}

var mu sync.Mutex
var result sync.Map[string, error]
locs = new(payload.Object_Locations)

err = s.gateway.BroadCast(ctx, func(ctx context.Context, target string, vc vald.ClientWithMirror, opts ...grpc.CallOption) error {
ctx, span := trace.StartSpan(grpc.WrapGRPCMethod(ctx, "BroadCast/"+target), apiName+"/"+vald.RemoveByTimestampRPCName+"/"+target)
defer func() {
if span != nil {
span.End()
}
}()

res, err := vc.RemoveByTimestamp(ctx, req, opts...)
if err != nil {
reqInfo := &errdetails.RequestInfo{
ServingData: errdetails.Serialize(req),
}
resInfo := &errdetails.ResourceInfo{
ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.RemoveByTimestampRPCName,
ResourceName: fmt.Sprintf("%s: %s(%s)", apiName, s.name, s.ip),
}
var attrs trace.Attributes
var code codes.Code

switch {
case errors.Is(err, context.Canceled):
err = status.WrapWithCanceled(
vald.RemoveByTimestampRPCName+" API canceld", err, reqInfo, resInfo,
)
attrs = trace.StatusCodeCancelled(err.Error())
case errors.Is(err, context.DeadlineExceeded):
err = status.WrapWithDeadlineExceeded(
vald.RemoveByTimestampRPCName+" API deadline exceeded", err, reqInfo, resInfo,
)
attrs = trace.StatusCodeDeadlineExceeded(err.Error())
case errors.Is(err, errors.ErrGRPCClientConnNotFound("*")):
err = status.WrapWithInternal(
vald.RemoveByTimestampRPCName+" API connection not found", err, reqInfo, resInfo,
)
attrs = trace.StatusCodeInternal(err.Error())
default:
var (
st *status.Status
msg string
)
st, msg, err = status.ParseError(err, codes.Internal,
"failed to parse "+vald.RemoveByTimestampRPCName+" gRPC error response", reqInfo, resInfo,
)
attrs = trace.FromGRPCStatus(st.Code(), msg)
code = st.Code()
}
log.Warn(err)
if span != nil {
span.RecordError(err)
span.SetAttributes(attrs...)
span.SetStatus(trace.StatusError, err.Error())
}
if code == codes.NotFound {
return nil
}
result.Store(target, err)
return err
}
mu.Lock()
locs.Locations = append(locs.Locations, res.GetLocations()...)
mu.Unlock()
return nil
})
if err != nil {
reqInfo := &errdetails.RequestInfo{
ServingData: errdetails.Serialize(req),
}
resInfo := &errdetails.ResourceInfo{
ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.RemoveByTimestampRPCName + ".BroadCast",
ResourceName: fmt.Sprintf("%s: %s(%s)", apiName, s.name, s.ip),
}
if errors.Is(err, errors.ErrGRPCClientConnNotFound("*")) {
err = status.WrapWithInternal(
vald.RemoveByTimestampRPCName+" API connection not found", err, reqInfo, resInfo,
)
log.Warn(err)
if span != nil {
span.RecordError(err)
span.SetAttributes(trace.StatusCodeInternal(err.Error())...)
span.SetStatus(trace.StatusError, err.Error())
}
return nil, err
}

// There is no possibility to reach this part, but we add error handling just in case.
st, msg, err := status.ParseError(err, codes.Internal,
"failed to parse "+vald.RemoveByTimestampRPCName+" gRPC error response", reqInfo, resInfo,
)
log.Warn(err)
if err != nil {
if span != nil {
span.RecordError(err)
span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...)
span.SetStatus(trace.StatusError, err.Error())
}
}
return nil, err
}

result.Range(func(_ string, rerr error) bool {
if rerr != nil {
err = errors.Join(err, rerr)
}
return true
})
if err != nil {
st, msg, err := status.ParseError(err, codes.Internal,
"failed to parse "+vald.RemoveByTimestampRPCName+" gRPC error response")
if err != nil {
span.RecordError(err)
span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...)
span.SetStatus(trace.StatusError, err.Error())
}
return nil, err
}
return locs, nil
}

func (s *server) GetObject(ctx context.Context, req *payload.Object_VectorRequest) (vec *payload.Object_Vector, err error) {
ctx, span := trace.StartSpan(grpc.WithGRPCMethod(ctx, vald.PackageName+"."+vald.ObjectRPCServiceName+"/"+vald.GetObjectRPCName), apiName+"/"+vald.GetObjectRPCName)
defer func() {
Expand Down

0 comments on commit 91deb87

Please sign in to comment.