diff --git a/pkg/gateway/mirror/handler/grpc/handler.go b/pkg/gateway/mirror/handler/grpc/handler.go index cd3a43cf7c..232d5d3c6a 100644 --- a/pkg/gateway/mirror/handler/grpc/handler.go +++ b/pkg/gateway/mirror/handler/grpc/handler.go @@ -16,6 +16,7 @@ package grpc import ( "context" "fmt" + "io" "reflect" "github.com/vdaas/vald/apis/grpc/v1/payload" @@ -158,7 +159,7 @@ func (s *server) Exists(ctx context.Context, meta *payload.Object_ID) (id *paylo } }() - _, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) { + _, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) { id, err = vc.Exists(ctx, meta, copts...) return id, err }) @@ -222,7 +223,7 @@ func (s *server) Search(ctx context.Context, req *payload.Search_Request) (res * } }() - _, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) { + _, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) { res, err = vc.Search(ctx, req, copts...) return res, err }) @@ -289,7 +290,7 @@ func (s *server) SearchByID(ctx context.Context, req *payload.Search_IDRequest) } }() - _, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) { + _, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) { res, err = vc.SearchByID(ctx, req, copts...) return res, err }) @@ -452,7 +453,7 @@ func (s *server) MultiSearch(ctx context.Context, req *payload.Search_MultiReque } }() - _, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) { + _, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) { res, err = vc.MultiSearch(ctx, req, copts...) return res, err }) @@ -516,7 +517,7 @@ func (s *server) MultiSearchByID(ctx context.Context, req *payload.Search_MultiI } }() - _, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) { + _, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) { res, err = vc.MultiSearchByID(ctx, req, copts...) return res, err }) @@ -580,7 +581,7 @@ func (s *server) LinearSearch(ctx context.Context, req *payload.Search_Request) } }() - _, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) { + _, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) { res, err = vc.LinearSearch(ctx, req, copts...) return res, err }) @@ -647,7 +648,7 @@ func (s *server) LinearSearchByID(ctx context.Context, req *payload.Search_IDReq } }() - _, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) { + _, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) { res, err = vc.LinearSearchByID(ctx, req, copts...) return res, err }) @@ -813,7 +814,7 @@ func (s *server) MultiLinearSearch(ctx context.Context, req *payload.Search_Mult } }() - _, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) { + _, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) { res, err = vc.MultiLinearSearch(ctx, req, copts...) return res, err }) @@ -877,7 +878,7 @@ func (s *server) MultiLinearSearchByID(ctx context.Context, req *payload.Search_ } }() - _, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) { + _, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) { res, err = vc.MultiLinearSearchByID(ctx, req, copts...) return res, err }) @@ -947,7 +948,7 @@ func (s *server) Insert(ctx context.Context, req *payload.Insert_Request) (loc * // So this component sends requests only to the Vald gateway (LB gateway) of its own cluster. if len(reqSrcPodName) != 0 { loc, err = s.doInsert(ctx, req, func(ctx context.Context) (*payload.Object_Location, error) { - _, derr := s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) { + _, derr := s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) { loc, err = vc.Insert(ctx, req, copts...) return loc, err }) @@ -1445,7 +1446,7 @@ func (s *server) Update(ctx context.Context, req *payload.Update_Request) (loc * // So this component sends requests only to the Vald gateway (LB gateway) of its own cluster. if len(reqSrcPodName) != 0 { loc, err = s.doUpdate(ctx, req, func(ctx context.Context) (*payload.Object_Location, error) { - _, derr := s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) { + _, derr := s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) { loc, err = vc.Update(ctx, req, copts...) return loc, err }) @@ -1973,7 +1974,7 @@ func (s *server) Upsert(ctx context.Context, req *payload.Upsert_Request) (loc * // So this component sends requests only to the Vald gateway (LB gateway) of its own cluster. if len(reqSrcPodName) != 0 { loc, err = s.doUpsert(ctx, req, func(ctx context.Context) (*payload.Object_Location, error) { - s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) { + s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) { loc, err = vc.Upsert(ctx, req, copts...) return loc, err }) @@ -2334,7 +2335,7 @@ func (s *server) Remove(ctx context.Context, req *payload.Remove_Request) (loc * // So this component sends requests only to the Vald gateway (LB gateway) of its own cluster. if len(reqSrcPodName) != 0 { loc, err = s.doRemove(ctx, req, func(ctx context.Context) (*payload.Object_Location, error) { - s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) { + s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) { loc, err = vc.Remove(ctx, req, copts...) return loc, err }) @@ -2690,7 +2691,7 @@ func (s *server) RemoveByTimestamp(ctx context.Context, req *payload.Remove_Time // So this component sends requests only to the Vald gateway (LB gateway) of its own cluster. if len(reqSrcPodName) != 0 { locs, err = s.doRemoveByTimestamp(ctx, req, func(ctx context.Context) (*payload.Object_Locations, error) { - _, derr := s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) { + _, derr := s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) { locs, err = vc.RemoveByTimestamp(ctx, req, copts...) return locs, err }) @@ -2923,7 +2924,7 @@ func (s *server) GetObject(ctx context.Context, req *payload.Object_VectorReques } }() - _, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) { + _, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) { vec, err = vc.GetObject(ctx, req, copts...) return vec, err }) @@ -3028,6 +3029,84 @@ func (s *server) StreamGetObject(stream vald.Object_StreamGetObjectServer) (err return nil } +func (s *server) StreamListObject(req *payload.Object_List_Request, stream vald.Object_StreamListObjectServer) error { + ctx, span := trace.StartSpan(grpc.WithGRPCMethod(stream.Context(), vald.PackageName+"."+vald.ObjectRPCServiceName+"/"+vald.StreamListObjectRPCName), apiName+"/"+vald.StreamListObjectRPCName) + defer func() { + if span != nil { + span.End() + } + }() + + _, err := s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) { + ctx, span := trace.StartSpan(grpc.WrapGRPCMethod(ctx, "Do/"+target), apiName+"/"+vald.StreamListObjectRPCName+"/"+target) + defer func() { + if span != nil { + span.End() + } + }() + client, err := vc.StreamListObject(ctx, req, copts...) + if err != nil { + return nil, err + } + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + eg, egctx := errgroup.WithContext(ctx) + eg.SetLimit(s.streamConcurrency) + + var ( + mu sync.Mutex + emu sync.Mutex + errs = make([]error, 0, s.streamConcurrency) + ) + finalize := func() error { + err := eg.Wait() + if err != nil { + + } + return nil + } + for { + select { + case <-egctx.Done(): + return nil, finalize() + default: + res, err := client.Recv() + if err != nil { + if err != io.EOF && errors.Is(err, io.EOF) { + return nil, finalize() + } + return nil, errors.Join(err, finalize()) + } + if res != nil { + eg.Go(safety.RecoverFunc(func() (err error) { + // TODO: add trace + mu.Lock() + err = stream.Send(res) + mu.Unlock() + if err != nil { + emu.Lock() + errs = append(errs, err) + emu.Unlock() + } + return nil + })) + } + } + } + }) + if err != nil { + if span != nil { + st, msg, err := status.ParseError(err, codes.Internal, "failed to parse "+vald.StreamListObjectRPCName+" gRPC error response") + span.RecordError(err) + span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + span.SetStatus(trace.StatusError, err.Error()) + } + return err + } + return nil +} + type errorState struct { err error code codes.Code diff --git a/pkg/gateway/mirror/service/gateway.go b/pkg/gateway/mirror/service/gateway.go index f4a91613e4..9699bc4f96 100644 --- a/pkg/gateway/mirror/service/gateway.go +++ b/pkg/gateway/mirror/service/gateway.go @@ -42,7 +42,7 @@ type Gateway interface { BroadCast(ctx context.Context, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error Do(ctx context.Context, target string, - f func(ctx context.Context, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error)) (interface{}, error) + f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error)) (interface{}, error) DoMulti(ctx context.Context, targets []string, f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error GRPCClient() grpc.Client @@ -133,7 +133,7 @@ func (g *gateway) BroadCast(ctx context.Context, // It returns the result of the operation and any associated error. // The provided function should handle the communication logic for a target. func (g *gateway) Do(ctx context.Context, target string, - f func(ctx context.Context, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error), + f func(ctx context.Context, addr string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error), ) (res interface{}, err error) { ctx, span := trace.StartSpan(ctx, "vald/gateway/mirror/service/Gateway.Do") defer func() { @@ -147,7 +147,7 @@ func (g *gateway) Do(ctx context.Context, target string, } return g.client.GRPCClient().Do(g.ForwardedContext(ctx, g.podName), target, func(ictx context.Context, conn *grpc.ClientConn, copts ...grpc.CallOption) (interface{}, error) { - return f(ictx, vald.NewValdClientWithMirror(conn), copts...) + return f(ictx, target, vald.NewValdClientWithMirror(conn), copts...) }, ) }