Skip to content

Commit

Permalink
Merge branch 'main' into refactor/filter-grpc-status-handling
Browse files Browse the repository at this point in the history
  • Loading branch information
kmrmt authored Oct 8, 2024
2 parents 60ae3f9 + a62f834 commit b6422dd
Show file tree
Hide file tree
Showing 18 changed files with 381 additions and 449 deletions.
12 changes: 6 additions & 6 deletions pkg/agent/core/ngt/handler/grpc/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,10 @@ func (s *server) StreamInsert(stream vald.Insert_StreamInsertServer) (err error)
}()
res, err := s.Insert(ctx, req)
if err != nil {
st, msg, err := status.ParseError(err, codes.Internal, "failed to parse Insert gRPC error response")
if sspan != nil {
st, _ := status.FromError(err)
if st != nil && sspan != nil {
sspan.RecordError(err)
sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...)
sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...)
sspan.SetStatus(trace.StatusError, err.Error())
}
return &payload.Object_StreamLocation{
Expand All @@ -181,10 +181,10 @@ func (s *server) StreamInsert(stream vald.Insert_StreamInsertServer) (err error)
}, nil
})
if err != nil {
st, msg, err := status.ParseError(err, codes.Internal, "failed to parse StreamInsert gRPC error response")
if span != nil {
st, _ := status.FromError(err)
if st != nil && span != nil {
span.RecordError(err)
span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...)
span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...)
span.SetStatus(trace.StatusError, err.Error())
}
return err
Expand Down
90 changes: 30 additions & 60 deletions pkg/agent/core/ngt/handler/grpc/linear_search.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/vdaas/vald/internal/info"
"github.com/vdaas/vald/internal/log"
"github.com/vdaas/vald/internal/net/grpc"
"github.com/vdaas/vald/internal/net/grpc/codes"
"github.com/vdaas/vald/internal/net/grpc/errdetails"
"github.com/vdaas/vald/internal/net/grpc/status"
"github.com/vdaas/vald/internal/observability/attribute"
Expand Down Expand Up @@ -344,10 +343,10 @@ func (s *server) StreamLinearSearch(stream vald.Search_StreamLinearSearchServer)
}()
res, err := s.LinearSearch(ctx, req)
if err != nil {
st, msg, err := status.ParseError(err, codes.Internal, "failed to parse LinearSearch gRPC error response")
if sspan != nil {
st, _ := status.FromError(err)
if st != nil && sspan != nil {
sspan.RecordError(err)
sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...)
sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...)
sspan.SetStatus(trace.StatusError, err.Error())
}
return &payload.Search_StreamResponse{
Expand All @@ -363,11 +362,10 @@ func (s *server) StreamLinearSearch(stream vald.Search_StreamLinearSearchServer)
}, nil
})
if err != nil {
st, msg, err := status.ParseError(err, codes.Internal,
"failed to parse StreamLinearSearch gRPC error response")
if span != nil {
st, _ := status.FromError(err)
if st != nil && span != nil {
span.RecordError(err)
span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...)
span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...)
span.SetStatus(trace.StatusError, err.Error())
}
return err
Expand All @@ -394,10 +392,10 @@ func (s *server) StreamLinearSearchByID(
}()
res, err := s.LinearSearchByID(ctx, req)
if err != nil {
st, msg, err := status.ParseError(err, codes.Internal, "failed to parse LinearSearchByID gRPC error response")
if sspan != nil {
st, _ := status.FromError(err)
if st != nil && sspan != nil {
sspan.RecordError(err)
sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...)
sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...)
sspan.SetStatus(trace.StatusError, err.Error())
}
return &payload.Search_StreamResponse{
Expand All @@ -413,10 +411,10 @@ func (s *server) StreamLinearSearchByID(
}, nil
})
if err != nil {
st, msg, err := status.ParseError(err, codes.Internal, "failed to parse StreamLinearSearchByID gRPC error response")
if span != nil {
st, _ := status.FromError(err)
if st != nil && span != nil {
span.RecordError(err)
span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...)
span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...)
span.SetStatus(trace.StatusError, err.Error())
}
return err
Expand Down Expand Up @@ -454,15 +452,10 @@ func (s *server) MultiLinearSearch(
}()
r, err := s.LinearSearch(ctx, query)
if err != nil {
st, msg, err := status.ParseError(err, codes.Internal,
"failed to parse LinearSearch gRPC error response",
&errdetails.RequestInfo{
RequestId: query.GetConfig().GetRequestId(),
ServingData: errdetails.Serialize(query),
})
if sspan != nil {
st, _ := status.FromError(err)
if st != nil && sspan != nil {
sspan.RecordError(err)
sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...)
sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...)
sspan.SetStatus(trace.StatusError, err.Error())
}
mu.Lock()
Expand All @@ -480,22 +473,13 @@ func (s *server) MultiLinearSearch(
}
wg.Wait()
if errs != nil {
st, msg, err := status.ParseError(errs, codes.Internal,
"failed to parse MultiLinearSearch gRPC error response",
&errdetails.RequestInfo{
RequestId: strings.Join(rids, ","),
ServingData: errdetails.Serialize(reqs),
},
&errdetails.ResourceInfo{
ResourceType: ngtResourceType + "/ngt.MultiLinearSearch",
ResourceName: fmt.Sprintf("%s: %s(%s)", apiName, s.name, s.ip),
})
if span != nil {
span.RecordError(err)
span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...)
span.SetStatus(trace.StatusError, err.Error())
st, _ := status.FromError(errs)
if st != nil && span != nil {
span.RecordError(errs)
span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...)
span.SetStatus(trace.StatusError, errs.Error())
}
return nil, err
return nil, errs
}
return res, nil
}
Expand Down Expand Up @@ -530,15 +514,10 @@ func (s *server) MultiLinearSearchByID(
defer wg.Done()
r, err := s.LinearSearchByID(ctx, query)
if err != nil {
st, msg, err := status.ParseError(err, codes.Internal,
"failed to parse LinearSearchByID gRPC error response",
&errdetails.RequestInfo{
RequestId: query.GetConfig().GetRequestId(),
ServingData: errdetails.Serialize(query),
})
if sspan != nil {
st, _ := status.FromError(err)
if st != nil && sspan != nil {
sspan.RecordError(err)
sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...)
sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...)
sspan.SetStatus(trace.StatusError, err.Error())
}
mu.Lock()
Expand All @@ -556,22 +535,13 @@ func (s *server) MultiLinearSearchByID(
}
wg.Wait()
if errs != nil {
st, msg, err := status.ParseError(errs, codes.Internal,
"failed to parse MultiLinearSearchByID gRPC error response",
&errdetails.RequestInfo{
RequestId: strings.Join(rids, ","),
ServingData: errdetails.Serialize(reqs),
},
&errdetails.ResourceInfo{
ResourceType: ngtResourceType + "/ngt.MultiLinearSearchByID",
ResourceName: fmt.Sprintf("%s: %s(%s)", apiName, s.name, s.ip),
})
if span != nil {
span.RecordError(err)
span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...)
span.SetStatus(trace.StatusError, err.Error())
st, _ := status.FromError(errs)
if st != nil && span != nil {
span.RecordError(errs)
span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...)
span.SetStatus(trace.StatusError, errs.Error())
}
return nil, err
return nil, errs
}
return res, nil
}
12 changes: 6 additions & 6 deletions pkg/agent/core/ngt/handler/grpc/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,10 @@ func (s *server) StreamGetObject(stream vald.Object_StreamGetObjectServer) (err
}()
res, err := s.GetObject(ctx, req)
if err != nil {
st, msg, err := status.ParseError(err, codes.Internal, "failed to parse GetObject gRPC error response")
if sspan != nil {
st, _ := status.FromError(err)
if st != nil && sspan != nil {
sspan.RecordError(err)
sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...)
sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...)
sspan.SetStatus(trace.StatusError, err.Error())
}
return &payload.Object_StreamVector{
Expand All @@ -170,10 +170,10 @@ func (s *server) StreamGetObject(stream vald.Object_StreamGetObjectServer) (err
}, nil
})
if err != nil {
st, msg, err := status.ParseError(err, codes.Internal, "failed to parse StreamGetObject gRPC error response")
if span != nil {
st, _ := status.FromError(err)
if st != nil && span != nil {
span.RecordError(err)
span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...)
span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...)
span.SetStatus(trace.StatusError, err.Error())
}

Expand Down
36 changes: 13 additions & 23 deletions pkg/agent/core/ngt/handler/grpc/remove.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/vdaas/vald/internal/info"
"github.com/vdaas/vald/internal/log"
"github.com/vdaas/vald/internal/net/grpc"
"github.com/vdaas/vald/internal/net/grpc/codes"
"github.com/vdaas/vald/internal/net/grpc/errdetails"
"github.com/vdaas/vald/internal/net/grpc/status"
"github.com/vdaas/vald/internal/observability/attribute"
Expand Down Expand Up @@ -157,10 +156,10 @@ func (s *server) StreamRemove(stream vald.Remove_StreamRemoveServer) (err error)
}()
res, err := s.Remove(ctx, req)
if err != nil {
st, msg, err := status.ParseError(err, codes.Internal, "failed to parse Remove gRPC error response")
if sspan != nil {
st, _ := status.FromError(err)
if st != nil && sspan != nil {
sspan.RecordError(err)
sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...)
sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...)
sspan.SetStatus(trace.StatusError, err.Error())
}
return &payload.Object_StreamLocation{
Expand All @@ -176,10 +175,10 @@ func (s *server) StreamRemove(stream vald.Remove_StreamRemoveServer) (err error)
}, nil
})
if err != nil {
st, msg, err := status.ParseError(err, codes.Internal, "failed to parse StreamRemove gRPC error response")
if span != nil {
st, _ := status.FromError(err)
if st != nil && span != nil {
span.RecordError(err)
span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...)
span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...)
span.SetStatus(trace.StatusError, err.Error())
}
return err
Expand Down Expand Up @@ -314,23 +313,14 @@ func (s *server) RemoveByTimestamp(
return true
})
if errs != nil {
st, msg, err := status.ParseError(errs, codes.Internal,
"failed to parse "+vald.RemoveByTimestampRPCName+" gRPC error response",
&errdetails.RequestInfo{
ServingData: errdetails.Serialize(req),
},
&errdetails.ResourceInfo{
ResourceType: ngtResourceType + "/ngt.Remove",
ResourceName: fmt.Sprintf("%s: %s(%s)", apiName, s.name, s.ip),
},
)
log.Error(err)
if span != nil {
span.RecordError(err)
span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...)
span.SetStatus(trace.StatusError, err.Error())
st, _ := status.FromError(errs)
log.Error(errs)
if st != nil && span != nil {
span.RecordError(errs)
span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...)
span.SetStatus(trace.StatusError, errs.Error())
}
return nil, err
return nil, errs
}
if locs == nil || len(locs.GetLocations()) == 0 {
err := status.WrapWithNotFound(
Expand Down
Loading

0 comments on commit b6422dd

Please sign in to comment.