Skip to content

Commit

Permalink
feat: add new handler base logic
Browse files Browse the repository at this point in the history
Signed-off-by: hlts2 <[email protected]>
  • Loading branch information
hlts2 committed Nov 21, 2023
1 parent ee0f48a commit 374dc57
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 18 deletions.
109 changes: 94 additions & 15 deletions pkg/gateway/mirror/handler/grpc/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package grpc
import (
"context"
"fmt"
"io"
"reflect"

"github.com/vdaas/vald/apis/grpc/v1/payload"
Expand Down Expand Up @@ -158,7 +159,7 @@ func (s *server) Exists(ctx context.Context, meta *payload.Object_ID) (id *paylo
}
}()

_, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) {
_, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) {
id, err = vc.Exists(ctx, meta, copts...)
return id, err
})
Expand Down Expand Up @@ -222,7 +223,7 @@ func (s *server) Search(ctx context.Context, req *payload.Search_Request) (res *
}
}()

_, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) {
_, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) {
res, err = vc.Search(ctx, req, copts...)
return res, err
})
Expand Down Expand Up @@ -289,7 +290,7 @@ func (s *server) SearchByID(ctx context.Context, req *payload.Search_IDRequest)
}
}()

_, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) {
_, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) {
res, err = vc.SearchByID(ctx, req, copts...)
return res, err
})
Expand Down Expand Up @@ -452,7 +453,7 @@ func (s *server) MultiSearch(ctx context.Context, req *payload.Search_MultiReque
}
}()

_, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) {
_, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) {
res, err = vc.MultiSearch(ctx, req, copts...)
return res, err
})
Expand Down Expand Up @@ -516,7 +517,7 @@ func (s *server) MultiSearchByID(ctx context.Context, req *payload.Search_MultiI
}
}()

_, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) {
_, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) {
res, err = vc.MultiSearchByID(ctx, req, copts...)
return res, err
})
Expand Down Expand Up @@ -580,7 +581,7 @@ func (s *server) LinearSearch(ctx context.Context, req *payload.Search_Request)
}
}()

_, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) {
_, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) {
res, err = vc.LinearSearch(ctx, req, copts...)
return res, err
})
Expand Down Expand Up @@ -647,7 +648,7 @@ func (s *server) LinearSearchByID(ctx context.Context, req *payload.Search_IDReq
}
}()

_, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) {
_, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) {
res, err = vc.LinearSearchByID(ctx, req, copts...)
return res, err
})
Expand Down Expand Up @@ -813,7 +814,7 @@ func (s *server) MultiLinearSearch(ctx context.Context, req *payload.Search_Mult
}
}()

_, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) {
_, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) {
res, err = vc.MultiLinearSearch(ctx, req, copts...)
return res, err
})
Expand Down Expand Up @@ -877,7 +878,7 @@ func (s *server) MultiLinearSearchByID(ctx context.Context, req *payload.Search_
}
}()

_, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) {
_, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) {
res, err = vc.MultiLinearSearchByID(ctx, req, copts...)
return res, err
})
Expand Down Expand Up @@ -947,7 +948,7 @@ func (s *server) Insert(ctx context.Context, req *payload.Insert_Request) (loc *
// So this component sends requests only to the Vald gateway (LB gateway) of its own cluster.
if len(reqSrcPodName) != 0 {
loc, err = s.doInsert(ctx, req, func(ctx context.Context) (*payload.Object_Location, error) {
_, derr := s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) {
_, derr := s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) {
loc, err = vc.Insert(ctx, req, copts...)
return loc, err
})
Expand Down Expand Up @@ -1445,7 +1446,7 @@ func (s *server) Update(ctx context.Context, req *payload.Update_Request) (loc *
// So this component sends requests only to the Vald gateway (LB gateway) of its own cluster.
if len(reqSrcPodName) != 0 {
loc, err = s.doUpdate(ctx, req, func(ctx context.Context) (*payload.Object_Location, error) {
_, derr := s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) {
_, derr := s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) {
loc, err = vc.Update(ctx, req, copts...)
return loc, err
})
Expand Down Expand Up @@ -1973,7 +1974,7 @@ func (s *server) Upsert(ctx context.Context, req *payload.Upsert_Request) (loc *
// So this component sends requests only to the Vald gateway (LB gateway) of its own cluster.
if len(reqSrcPodName) != 0 {
loc, err = s.doUpsert(ctx, req, func(ctx context.Context) (*payload.Object_Location, error) {
s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) {
s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) {
loc, err = vc.Upsert(ctx, req, copts...)
return loc, err
})
Expand Down Expand Up @@ -2334,7 +2335,7 @@ func (s *server) Remove(ctx context.Context, req *payload.Remove_Request) (loc *
// So this component sends requests only to the Vald gateway (LB gateway) of its own cluster.
if len(reqSrcPodName) != 0 {
loc, err = s.doRemove(ctx, req, func(ctx context.Context) (*payload.Object_Location, error) {
s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) {
s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) {
loc, err = vc.Remove(ctx, req, copts...)
return loc, err
})
Expand Down Expand Up @@ -2690,7 +2691,7 @@ func (s *server) RemoveByTimestamp(ctx context.Context, req *payload.Remove_Time
// So this component sends requests only to the Vald gateway (LB gateway) of its own cluster.
if len(reqSrcPodName) != 0 {
locs, err = s.doRemoveByTimestamp(ctx, req, func(ctx context.Context) (*payload.Object_Locations, error) {
_, derr := s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) {
_, derr := s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) {
locs, err = vc.RemoveByTimestamp(ctx, req, copts...)
return locs, err
})
Expand Down Expand Up @@ -2923,7 +2924,7 @@ func (s *server) GetObject(ctx context.Context, req *payload.Object_VectorReques
}
}()

_, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) {
_, err = s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, _ string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) {
vec, err = vc.GetObject(ctx, req, copts...)
return vec, err
})
Expand Down Expand Up @@ -3028,6 +3029,84 @@ func (s *server) StreamGetObject(stream vald.Object_StreamGetObjectServer) (err
return nil
}

func (s *server) StreamListObject(req *payload.Object_List_Request, stream vald.Object_StreamListObjectServer) error {
ctx, span := trace.StartSpan(grpc.WithGRPCMethod(stream.Context(), vald.PackageName+"."+vald.ObjectRPCServiceName+"/"+vald.StreamListObjectRPCName), apiName+"/"+vald.StreamListObjectRPCName)
defer func() {
if span != nil {
span.End()
}
}()

_, err := s.gateway.Do(ctx, s.vAddr, func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error) {
ctx, span := trace.StartSpan(grpc.WrapGRPCMethod(ctx, "Do/"+target), apiName+"/"+vald.StreamListObjectRPCName+"/"+target)
defer func() {
if span != nil {
span.End()
}
}()
client, err := vc.StreamListObject(ctx, req, copts...)
if err != nil {
return nil, err
}

ctx, cancel := context.WithCancel(ctx)
defer cancel()
eg, egctx := errgroup.WithContext(ctx)
eg.SetLimit(s.streamConcurrency)

var (
mu sync.Mutex
emu sync.Mutex
errs = make([]error, 0, s.streamConcurrency)
)
finalize := func() error {
err := eg.Wait()
if err != nil {

}
return nil
}
for {
select {
case <-egctx.Done():
return nil, finalize()
default:
res, err := client.Recv()
if err != nil {
if err != io.EOF && errors.Is(err, io.EOF) {
return nil, finalize()
}
return nil, errors.Join(err, finalize())
}
if res != nil {
eg.Go(safety.RecoverFunc(func() (err error) {
// TODO: add trace
mu.Lock()
err = stream.Send(res)
mu.Unlock()
if err != nil {
emu.Lock()
errs = append(errs, err)
emu.Unlock()
}
return nil
}))
}
}
}
})
if err != nil {
if span != nil {
st, msg, err := status.ParseError(err, codes.Internal, "failed to parse "+vald.StreamListObjectRPCName+" gRPC error response")
span.RecordError(err)
span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...)
span.SetStatus(trace.StatusError, err.Error())
}
return err
}
return nil
}

type errorState struct {
err error
code codes.Code
Expand Down
6 changes: 3 additions & 3 deletions pkg/gateway/mirror/service/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type Gateway interface {
BroadCast(ctx context.Context,
f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error
Do(ctx context.Context, target string,
f func(ctx context.Context, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error)) (interface{}, error)
f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error)) (interface{}, error)
DoMulti(ctx context.Context, targets []string,
f func(ctx context.Context, target string, vc vald.ClientWithMirror, copts ...grpc.CallOption) error) error
GRPCClient() grpc.Client
Expand Down Expand Up @@ -133,7 +133,7 @@ func (g *gateway) BroadCast(ctx context.Context,
// It returns the result of the operation and any associated error.
// The provided function should handle the communication logic for a target.
func (g *gateway) Do(ctx context.Context, target string,
f func(ctx context.Context, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error),
f func(ctx context.Context, addr string, vc vald.ClientWithMirror, copts ...grpc.CallOption) (interface{}, error),
) (res interface{}, err error) {
ctx, span := trace.StartSpan(ctx, "vald/gateway/mirror/service/Gateway.Do")
defer func() {
Expand All @@ -147,7 +147,7 @@ func (g *gateway) Do(ctx context.Context, target string,
}
return g.client.GRPCClient().Do(g.ForwardedContext(ctx, g.podName), target,
func(ictx context.Context, conn *grpc.ClientConn, copts ...grpc.CallOption) (interface{}, error) {
return f(ictx, vald.NewValdClientWithMirror(conn), copts...)
return f(ictx, target, vald.NewValdClientWithMirror(conn), copts...)
},
)
}
Expand Down

0 comments on commit 374dc57

Please sign in to comment.