From 5d12e94703e245830cafadef866c0f4e23d9f513 Mon Sep 17 00:00:00 2001 From: Hiroto Funakoshi Date: Wed, 9 Oct 2024 08:25:46 +0900 Subject: [PATCH] Fix gRPC error handling for mirror-gateway handler (#2665) (#2681) * fix: use FromError function when stream rpc error occurs * fix: delete ParseError of Register RPC error handling * fix: crud rpc error handling * fix: add code comment and refactor multi crud error handling * fix: deleted unnecessary error resource info * fix: typo * fix: status code when the parse is failed * fix: ignore parse success variable * fix: use FromError function for stream error * fix: error status handling * fix: typo --------- Signed-off-by: hlts2 Co-authored-by: Kosuke Morimoto Co-authored-by: Yusuke Kato --- pkg/gateway/mirror/handler/grpc/handler.go | 490 +++++++++++---------- pkg/gateway/mirror/service/mirror.go | 40 +- 2 files changed, 272 insertions(+), 258 deletions(-) diff --git a/pkg/gateway/mirror/handler/grpc/handler.go b/pkg/gateway/mirror/handler/grpc/handler.go index c424ac6a65..847aaac933 100644 --- a/pkg/gateway/mirror/handler/grpc/handler.go +++ b/pkg/gateway/mirror/handler/grpc/handler.go @@ -117,14 +117,18 @@ func (s *server) Register( ) attrs = trace.StatusCodeInvalidArgument(err.Error()) default: - var ( - st *status.Status - msg string - ) - st, msg, err = status.ParseError(err, codes.Internal, - "failed to parse "+mirror.RegisterRPCName+" gRPC error response", reqInfo, resInfo, + err = status.WrapWithInternal( + mirror.RegisterRPCName+" API failed to connect mirror gateway targets", err, reqInfo, resInfo, + &errdetails.BadRequest{ + FieldViolations: []*errdetails.BadRequestFieldViolation{ + { + Field: "mirror gateway targets", + Description: err.Error(), + }, + }, + }, ) - attrs = trace.FromGRPCStatus(st.Code(), msg) + attrs = trace.StatusCodeInternal(err.Error()) } log.Warn(err) if span != nil { @@ -138,7 +142,7 @@ func (s *server) Register( // Get own address and the addresses of other mirror gateways to which this gateway is currently connected. tgts, err := s.mirror.MirrorTargets(ctx) if err != nil { - err = status.WrapWithInternal(mirror.RegisterRPCName+" API failed to get connected vald gateway targets", err, + err = status.WrapWithInternal(mirror.RegisterRPCName+" API failed to get connected mirror gateway targets", err, &errdetails.BadRequest{ FieldViolations: []*errdetails.BadRequestFieldViolation{ { @@ -386,10 +390,15 @@ func (s *server) StreamSearch(stream vald.Search_StreamSearchServer) (err error) }() res, err := s.Search(ctx, req) if err != nil { - st, msg, err := status.ParseError(err, codes.Internal, "failed to parse "+vald.SearchRPCName+" gRPC error response") + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Proto(), although it is unlikely to match this condition. + log.Errorf("gRPC call returned not a gRPC status error: %v", err) + st = status.New(codes.Unknown, "failed to parse "+vald.SearchRPCName+" gRPC error response") + } if sspan != nil { sspan.RecordError(err) - sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) sspan.SetStatus(trace.StatusError, err.Error()) } return &payload.Search_StreamResponse{ @@ -406,11 +415,15 @@ func (s *server) StreamSearch(stream vald.Search_StreamSearchServer) (err error) }, ) if err != nil { - st, msg, err := status.ParseError(err, codes.Internal, - "failed to parse "+vald.StreamSearchRPCName+" gRPC error response") + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. + log.Errorf("gRPC call returned not a gRPC status error: %v", err) + st = status.New(codes.Unknown, "failed to parse "+vald.StreamSearchRPCName+" gRPC error response") + } if span != nil { span.RecordError(err) - span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) } return err @@ -436,10 +449,15 @@ func (s *server) StreamSearchByID(stream vald.Search_StreamSearchByIDServer) (er }() res, err := s.SearchByID(ctx, req) if err != nil { - st, msg, err := status.ParseError(err, codes.Internal, "failed to parse "+vald.SearchByIDRPCName+" gRPC error response") + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Proto(), although it is unlikely to match this condition. + log.Errorf("gRPC call returned not a gRPC status error: %v", err) + st = status.New(codes.Unknown, "failed to parse "+vald.SearchByIDRPCName+" gRPC error response") + } if sspan != nil { sspan.RecordError(err) - sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) sspan.SetStatus(trace.StatusError, err.Error()) } return &payload.Search_StreamResponse{ @@ -456,11 +474,15 @@ func (s *server) StreamSearchByID(stream vald.Search_StreamSearchByIDServer) (er }, ) if err != nil { - st, msg, err := status.ParseError(err, codes.Internal, - "failed to parse "+vald.StreamSearchByIDRPCName+" gRPC error response") + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. + log.Errorf("gRPC call returned not a gRPC status error: %v", err) + st = status.New(codes.Unknown, "failed to parse "+vald.StreamSearchByIDRPCName+" gRPC error response") + } if span != nil { span.RecordError(err) - span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) } return err @@ -756,10 +778,15 @@ func (s *server) StreamLinearSearch(stream vald.Search_StreamLinearSearchServer) }() res, err := s.LinearSearch(ctx, req) if err != nil { - st, msg, err := status.ParseError(err, codes.Internal, "failed to parse "+vald.LinearSearchRPCName+" gRPC error response") + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. + log.Errorf("gRPC call returned not a gRPC status error: %v", err) + st = status.New(codes.Unknown, "failed to parse "+vald.LinearSearchRPCName+" gRPC error response") + } if sspan != nil { sspan.RecordError(err) - sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) sspan.SetStatus(trace.StatusError, err.Error()) } return &payload.Search_StreamResponse{ @@ -776,11 +803,15 @@ func (s *server) StreamLinearSearch(stream vald.Search_StreamLinearSearchServer) }, ) if err != nil { - st, msg, err := status.ParseError(err, codes.Internal, - "failed to parse "+vald.StreamLinearSearchRPCName+" gRPC error response") + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. + log.Errorf("gRPC call returned not a gRPC status error: %v", err) + st = status.New(codes.Unknown, "failed to parse "+vald.StreamLinearSearchRPCName+" gRPC error response") + } if span != nil { span.RecordError(err) - span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) } return err @@ -811,10 +842,15 @@ func (s *server) StreamLinearSearchByID( }() res, err := s.LinearSearchByID(ctx, req) if err != nil { - st, msg, err := status.ParseError(err, codes.Internal, "failed to parse "+vald.LinearSearchByIDRPCName+" gRPC error response") + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. + log.Errorf("gRPC call returned not a gRPC status error: %v", err) + st = status.New(codes.Unknown, "failed to parse "+vald.LinearSearchByIDRPCName+" gRPC error response") + } if sspan != nil { sspan.RecordError(err) - sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) sspan.SetStatus(trace.StatusError, err.Error()) } return &payload.Search_StreamResponse{ @@ -831,11 +867,15 @@ func (s *server) StreamLinearSearchByID( }, ) if err != nil { - st, msg, err := status.ParseError(err, codes.Internal, - "failed to parse "+vald.StreamLinearSearchByIDRPCName+" gRPC error response") + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. + log.Errorf("gRPC call returned not a gRPC status error: %v", err) + st = status.New(codes.Unknown, "failed to parse "+vald.StreamLinearSearchByIDRPCName+" gRPC error response") + } if span != nil { span.RecordError(err) - span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) } return err @@ -1002,22 +1042,18 @@ func (s *server) Insert( return loc, errors.Join(derr, err) }) if err != nil { - reqInfo := &errdetails.RequestInfo{ - RequestId: req.GetVector().GetId(), - } - resInfo := &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.InsertRPCName, - ResourceName: fmt.Sprintf("%s: %s(%s) to %s", apiName, s.name, s.ip, s.vAddr), + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. + log.Errorf("gRPC call returned not a gRPC status error: %v", err) + st = status.New(codes.Unknown, "failed to parse "+vald.InsertRPCName+" gRPC error response") } - st, msg, err := status.ParseError(err, codes.Internal, - "failed to parse "+vald.InsertRPCName+" gRPC error response", reqInfo, resInfo, - ) - log.Warn(err) if span != nil { span.RecordError(err) - span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) } + log.Warn(err) return nil, err } log.Debugf("Insert API succeeded to %#v", loc) @@ -1058,24 +1094,16 @@ func (s *server) handleInsert( return vc.Insert(ctx, req, copts...) }) if err != nil { - var ( - st *status.Status - msg string - ) - st, msg, err = status.ParseError(err, codes.Internal, - "failed to parse "+vald.InsertRPCName+" gRPC error response", - &errdetails.RequestInfo{ - RequestId: req.GetVector().GetId(), - }, - &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.InsertRPCName + ".BroadCast/" + target, - ResourceName: fmt.Sprintf("%s: %s(%s) to %s", apiName, s.name, s.ip, target), - }, - ) + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. + log.Errorf("gRPC call returned not a gRPC status error: %v", err) + st = status.New(codes.Unknown, "failed to parse "+vald.InsertRPCName+" gRPC error response") + } log.Warn(err) if span != nil { span.RecordError(err) - span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) } code = st.Code() @@ -1223,24 +1251,16 @@ func (s *server) handleInsertResult( return vc.Update(ctx, req, copts...) }) if err != nil { - var ( - st *status.Status - msg string - ) - st, msg, err = status.ParseError(err, codes.Internal, - "failed to parse "+vald.UpdateRPCName+" gRPC error response", - &errdetails.RequestInfo{ - RequestId: req.GetVector().GetId(), - }, - &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.UpdateRPCName + ".DoMulti/" + target, - ResourceName: fmt.Sprintf("%s: %s(%s) to %s", apiName, s.name, s.ip, target), - }, - ) + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. + log.Errorf("gRPC call returned not a gRPC status error: %v", err) + st = status.New(codes.Unknown, "failed to parse "+vald.UpdateRPCName+" gRPC error response") + } log.Warn(err) if span != nil { span.RecordError(err) - span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) } code = st.Code() @@ -1428,10 +1448,15 @@ func (s *server) StreamInsert(stream vald.Insert_StreamInsertServer) (err error) }() res, err := s.Insert(ctx, req) if err != nil { - st, msg, err := status.ParseError(err, codes.Internal, "failed to parse "+vald.InsertRPCName+" gRPC error response") + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. + log.Errorf("gRPC call returned not a gRPC status error: %v", err) + st = status.New(codes.Unknown, "failed to parse "+vald.InsertRPCName+" gRPC error response") + } if sspan != nil { sspan.RecordError(err) - sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) sspan.SetStatus(trace.StatusError, err.Error()) } return &payload.Object_StreamLocation{ @@ -1448,10 +1473,15 @@ func (s *server) StreamInsert(stream vald.Insert_StreamInsertServer) (err error) }, ) if err != nil { - st, msg, err := status.ParseError(err, codes.Internal, "failed to parse "+vald.StreamInsertRPCName+" gRPC error response") + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. + log.Errorf("gRPC call returned not a gRPC status error: %v", err) + st = status.New(codes.Unknown, "failed to parse "+vald.StreamInsertRPCName+" gRPC error response") + } if span != nil { span.RecordError(err) - span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) } return err @@ -1494,14 +1524,15 @@ func (s *server) MultiInsert( loc, err := s.Insert(ctx, req) if err != nil { - st, msg, err := status.ParseError(err, codes.Internal, "failed to parse "+vald.InsertRPCName+" gRPC error response", - &errdetails.RequestInfo{ - RequestId: req.GetVector().GetId(), - ServingData: errdetails.Serialize(req), - }) + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. + log.Errorf("gRPC call returned not a gRPC status error: %v", err) + st = status.New(codes.Unknown, "failed to parse "+vald.InsertRPCName+" gRPC error response") + } if span != nil { span.RecordError(err) - span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) } emu.Lock() @@ -1557,22 +1588,18 @@ func (s *server) Update( return loc, errors.Join(derr, err) }) if err != nil { - reqInfo := &errdetails.RequestInfo{ - RequestId: req.GetVector().GetId(), - } - resInfo := &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.UpdateRPCName, - ResourceName: fmt.Sprintf("%s: %s(%s) to %s", apiName, s.name, s.ip, s.vAddr), + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. + log.Errorf("gRPC call returned not a gRPC status error: %v", err) + st = status.New(codes.Unknown, "failed to parse "+vald.UpdateRPCName+" gRPC error response") } - st, msg, err := status.ParseError(err, codes.Internal, - "failed to parse "+vald.UpdateRPCName+" gRPC error response", reqInfo, resInfo, - ) - log.Warn(err) if span != nil { span.RecordError(err) - span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) } + log.Warn(err) return nil, err } log.Debugf("Update API succeeded to %#v", loc) @@ -1613,25 +1640,16 @@ func (s *server) handleUpdate( return vc.Update(ctx, req, copts...) }) if err != nil { - var ( - st *status.Status - msg string - ) - st, msg, err = status.ParseError(err, codes.Internal, - "failed to parse "+vald.UpdateRPCName+" gRPC error response", - &errdetails.RequestInfo{ - RequestId: req.GetVector().GetId(), - ServingData: errdetails.Serialize(req), - }, - &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.UpdateRPCName + ".BroadCast/" + target, - ResourceName: fmt.Sprintf("%s: %s(%s) to %s", apiName, s.name, s.ip, target), - }, - ) + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. + log.Errorf("gRPC call returned not a gRPC status error: %v", err) + st = status.New(codes.Unknown, "failed to parse "+vald.UpdateRPCName+" gRPC error response") + } log.Warn(err) if span != nil { span.RecordError(err) - span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) } code = st.Code() @@ -1793,25 +1811,16 @@ func (s *server) handleUpdateResult( return vc.Insert(ctx, req, copts...) }) if err != nil { - var ( - st *status.Status - msg string - ) - st, msg, err = status.ParseError(err, codes.Internal, - "failed to parse "+vald.InsertRPCName+" gRPC error response", - &errdetails.RequestInfo{ - RequestId: req.GetVector().GetId(), - ServingData: errdetails.Serialize(req), - }, - &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.InsertRPCName + ".BroadCast/" + target, - ResourceName: fmt.Sprintf("%s: %s(%s) to %s", apiName, s.name, s.ip, target), - }, - ) + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. + log.Errorf("gRPC call returned not a gRPC status error: %v", err) + st = status.New(codes.Unknown, "failed to parse "+vald.InsertRPCName+" gRPC error response") + } log.Warn(err) if span != nil { span.RecordError(err) - span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) } code = st.Code() @@ -2013,10 +2022,15 @@ func (s *server) StreamUpdate(stream vald.Update_StreamUpdateServer) (err error) }() res, err := s.Update(ctx, req) if err != nil { - st, msg, err := status.ParseError(err, codes.Internal, "failed to parse "+vald.UpdateRPCName+" gRPC error response") + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Proto(), although it is unlikely to match this condition. + log.Errorf("gRPC call returned not a gRPC status error: %v", err) + st = status.New(codes.Unknown, "failed to parse "+vald.UpdateRPCName+" gRPC error response") + } if sspan != nil { sspan.RecordError(err) - sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) sspan.SetStatus(trace.StatusError, err.Error()) } return &payload.Object_StreamLocation{ @@ -2033,10 +2047,15 @@ func (s *server) StreamUpdate(stream vald.Update_StreamUpdateServer) (err error) }, ) if err != nil { - st, msg, err := status.ParseError(err, codes.Internal, "failed to parse "+vald.StreamUpdateRPCName+" gRPC error response") + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. + log.Errorf("gRPC call returned not a gRPC status error: %v", err) + st = status.New(codes.Unknown, "failed to parse "+vald.StreamUpdateRPCName+" gRPC error response") + } if span != nil { span.RecordError(err) - span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) } return err @@ -2079,14 +2098,15 @@ func (s *server) MultiUpdate( loc, err := s.Update(ctx, req) if err != nil { - st, msg, err := status.ParseError(err, codes.Internal, "failed to parse "+vald.UpdateRPCName+" gRPC error response", - &errdetails.RequestInfo{ - RequestId: req.GetVector().GetId(), - ServingData: errdetails.Serialize(req), - }) + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. + log.Errorf("gRPC call returned not a gRPC status error: %v", err) + st = status.New(codes.Unknown, "failed to parse "+vald.UpdateRPCName+" gRPC error response") + } if span != nil { span.RecordError(err) - span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) } emu.Lock() @@ -2142,23 +2162,18 @@ func (s *server) Upsert( return loc, err }) if err != nil { - reqInfo := &errdetails.RequestInfo{ - RequestId: req.GetVector().GetId(), - ServingData: errdetails.Serialize(req), - } - resInfo := &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.UpsertRPCName, - ResourceName: fmt.Sprintf("%s: %s(%s) to %s", apiName, s.name, s.ip, s.vAddr), + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. + log.Errorf("gRPC call returned not a gRPC status error: %v", err) + st = status.New(codes.Unknown, "failed to parse "+vald.UpsertRPCName+" gRPC error response") } - st, msg, err := status.ParseError(err, codes.Internal, - "failed to parse "+vald.UpsertRPCName+" gRPC error response", reqInfo, resInfo, - ) - log.Warn(err) if span != nil { span.RecordError(err) - span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) } + log.Warn(err) return nil, err } log.Debugf("Upsert API succeeded to %#v", loc) @@ -2199,25 +2214,16 @@ func (s *server) handleUpsert( return vc.Upsert(ctx, req, copts...) }) if err != nil { - var ( - st *status.Status - msg string - ) - st, msg, err = status.ParseError(err, codes.Internal, - "failed to parse "+vald.UpsertRPCName+" gRPC error response", - &errdetails.RequestInfo{ - RequestId: req.GetVector().GetId(), - ServingData: errdetails.Serialize(req), - }, - &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.UpsertRPCName + ".BroadCast/" + target, - ResourceName: fmt.Sprintf("%s: %s(%s) to %s", apiName, s.name, s.ip, target), - }, - ) + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. + log.Errorf("gRPC call returned not a gRPC status error: %v", err) + st = status.New(codes.Unknown, "failed to parse "+vald.UpsertRPCName+" gRPC error response") + } log.Warn(err) if span != nil { span.RecordError(err) - span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) } code = st.Code() @@ -2403,10 +2409,15 @@ func (s *server) StreamUpsert(stream vald.Upsert_StreamUpsertServer) (err error) }() res, err := s.Upsert(ctx, req) if err != nil { - st, msg, err := status.ParseError(err, codes.Internal, "failed to parse "+vald.UpsertRPCName+" gRPC error response") + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. + log.Errorf("gRPC call returned not a gRPC status error: %v", err) + st = status.New(codes.Unknown, "failed to parse "+vald.UpsertRPCName+" gRPC error response") + } if sspan != nil { sspan.RecordError(err) - sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) sspan.SetStatus(trace.StatusError, err.Error()) } return &payload.Object_StreamLocation{ @@ -2423,10 +2434,15 @@ func (s *server) StreamUpsert(stream vald.Upsert_StreamUpsertServer) (err error) }, ) if err != nil { - st, msg, err := status.ParseError(err, codes.Internal, "failed to parse "+vald.StreamUpsertRPCName+" gRPC error response") + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. + log.Errorf("gRPC call returned not a gRPC status error: %v", err) + st = status.New(codes.Unknown, "failed to parse "+vald.StreamUpsertRPCName+" gRPC error response") + } if span != nil { span.RecordError(err) - span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) } return err @@ -2469,14 +2485,15 @@ func (s *server) MultiUpsert( loc, err := s.Upsert(ctx, req) if err != nil { - st, msg, err := status.ParseError(err, codes.Internal, "failed to parse "+vald.UpsertRPCName+" gRPC error response", - &errdetails.RequestInfo{ - RequestId: req.GetVector().GetId(), - ServingData: errdetails.Serialize(req), - }) + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. + log.Errorf("gRPC call returned not a gRPC status error: %v", err) + st = status.New(codes.Unknown, "failed to parse "+vald.UpsertRPCName+" gRPC error response") + } if span != nil { span.RecordError(err) - span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) } emu.Lock() @@ -2532,22 +2549,18 @@ func (s *server) Remove( return loc, err }) if err != nil { - reqInfo := &errdetails.RequestInfo{ - RequestId: req.GetId().GetId(), - } - resInfo := &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.RemoveRPCName, - ResourceName: fmt.Sprintf("%s: %s(%s) to %s", apiName, s.name, s.ip, s.vAddr), + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. + log.Errorf("gRPC call returned not a gRPC status error: %v", err) + st = status.New(codes.Unknown, "failed to parse "+vald.RemoveRPCName+" gRPC error response") } - st, msg, err := status.ParseError(err, codes.Internal, - "failed to parse "+vald.RemoveRPCName+" gRPC error response", reqInfo, resInfo, - ) - log.Warn(err) if span != nil { span.RecordError(err) - span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) } + log.Warn(err) return nil, err } log.Debugf("Remove API remove succeeded to %#v", loc) @@ -2588,24 +2601,16 @@ func (s *server) handleRemove( return vc.Remove(ctx, req, copts...) }) if err != nil { - var ( - st *status.Status - msg string - ) - st, msg, err = status.ParseError(err, codes.Internal, - "failed to parse "+vald.RemoveRPCName+" gRPC error response", - &errdetails.RequestInfo{ - RequestId: req.GetId().GetId(), - }, - &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.RemoveRPCName + ".BroadCast/" + target, - ResourceName: fmt.Sprintf("%s: %s(%s) to %s", apiName, s.name, s.ip, target), - }, - ) + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. + log.Errorf("gRPC call returned not a gRPC status error: %v", err) + st = status.New(codes.Unknown, "failed to parse "+vald.RemoveRPCName+" gRPC error response") + } log.Warn(err) if span != nil { span.RecordError(err) - span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) } code = st.Code() @@ -2788,10 +2793,15 @@ func (s *server) StreamRemove(stream vald.Remove_StreamRemoveServer) (err error) }() res, err := s.Remove(ctx, req) if err != nil { - st, msg, err := status.ParseError(err, codes.Internal, "failed to parse "+vald.RemoveRPCName+" gRPC error response") + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. + log.Errorf("gRPC call returned not a gRPC status error: %v", err) + st = status.New(codes.Unknown, "failed to parse "+vald.RemoveRPCName+" gRPC error response") + } if sspan != nil { sspan.RecordError(err) - sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) sspan.SetStatus(trace.StatusError, err.Error()) } return &payload.Object_StreamLocation{ @@ -2808,10 +2818,15 @@ func (s *server) StreamRemove(stream vald.Remove_StreamRemoveServer) (err error) }, ) if err != nil { - st, msg, err := status.ParseError(err, codes.Internal, "failed to parse "+vald.StreamRemoveRPCName+" gRPC error response") + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. + log.Errorf("gRPC call returned not a gRPC status error: %v", err) + st = status.New(codes.Unknown, "failed to parse "+vald.StreamRemoveRPCName+" gRPC error response") + } if span != nil { span.RecordError(err) - span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) } return err @@ -2854,14 +2869,15 @@ func (s *server) MultiRemove( loc, err := s.Remove(ctx, req) if err != nil { - st, msg, err := status.ParseError(err, codes.Internal, "failed to parse "+vald.RemoveRPCName+" gRPC error response", - &errdetails.RequestInfo{ - RequestId: req.GetId().GetId(), - ServingData: errdetails.Serialize(req), - }) + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. + log.Errorf("gRPC call returned not a gRPC status error: %v", err) + st = status.New(codes.Unknown, "failed to parse "+vald.RemoveRPCName+" gRPC error response") + } if span != nil { span.RecordError(err) - span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) } emu.Lock() @@ -2917,20 +2933,16 @@ func (s *server) RemoveByTimestamp( return locs, errors.Join(derr, err) }) if err != nil { - reqInfo := &errdetails.RequestInfo{ - ServingData: errdetails.Serialize(req), + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. + log.Errorf("gRPC call returned not a gRPC status error: %v", err) + st = status.New(codes.Unknown, "failed to parse "+vald.RemoveByTimestampRPCName+" gRPC error response") } - resInfo := &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.RemoveByTimestampRPCName, - ResourceName: fmt.Sprintf("%s: %s(%s) to %s", apiName, s.name, s.ip, s.vAddr), - } - st, msg, err := status.ParseError(err, codes.Internal, - "failed to parse "+vald.RemoveRPCName+" gRPC error response", reqInfo, resInfo, - ) log.Warn(err) if span != nil { span.RecordError(err) - span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) } return nil, err @@ -2971,21 +2983,16 @@ func (s *server) handleRemoveByTimestamp( return vc.RemoveByTimestamp(ctx, req, copts...) }) if err != nil { - var ( - st *status.Status - msg string - ) - st, msg, err = status.ParseError(err, codes.Internal, - "failed to parse "+vald.RemoveByTimestampRPCName+" gRPC error response", - &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.RemoveByTimestampRPCName + ".BroadCast/" + target, - ResourceName: fmt.Sprintf("%s: %s(%s) to %s", apiName, s.name, s.ip, target), - }, - ) + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. + log.Errorf("gRPC call returned not a gRPC status error: %v", err) + st = status.New(codes.Unknown, "failed to parse "+vald.RemoveByTimestampRPCName+" gRPC error response") + } log.Warn(err) if span != nil { span.RecordError(err) - span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) } code = st.Code() @@ -3233,10 +3240,15 @@ func (s *server) StreamGetObject(stream vald.Object_StreamGetObjectServer) (err }() res, err := s.GetObject(ctx, req) if err != nil { - st, msg, err := status.ParseError(err, codes.Internal, "failed to parse "+vald.GetObjectRPCName+" gRPC error response") + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. + log.Errorf("gRPC call returned not a gRPC status error: %v", err) + st = status.New(codes.Internal, "failed to parse "+vald.GetObjectRPCName+" gRPC error response") + } if sspan != nil { sspan.RecordError(err) - sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) sspan.SetStatus(trace.StatusError, err.Error()) } return &payload.Object_StreamVector{ @@ -3253,10 +3265,15 @@ func (s *server) StreamGetObject(stream vald.Object_StreamGetObjectServer) (err }, ) if err != nil { - st, msg, err := status.ParseError(err, codes.Internal, "failed to parse "+vald.StreamGetObjectRPCName+" gRPC error response") + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving ,st.Code(), although it is unlikely to match this condition. + log.Errorf("gRPC call returned not a gRPC status error: %v", err) + st = status.New(codes.Unknown, "failed to parse "+vald.StreamGetObjectRPCName+" gRPC error response") + } if span != nil { span.RecordError(err) - span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) } return err @@ -3289,10 +3306,15 @@ func (s *server) StreamListObject( return obj, s.doStreamListObject(ctx, client, stream) }) if err != nil { - st, msg, err := status.ParseError(err, codes.Internal, "failed to parse "+vald.StreamListObjectRPCName+" gRPC error response") + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. + log.Errorf("gRPC call returned not a gRPC status error: %v", err) + st = status.New(codes.Unknown, "failed to parse "+vald.StreamListObjectRPCName+" gRPC error response") + } if span != nil { span.RecordError(err) - span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) } return err diff --git a/pkg/gateway/mirror/service/mirror.go b/pkg/gateway/mirror/service/mirror.go index ce4309b972..773e43035d 100644 --- a/pkg/gateway/mirror/service/mirror.go +++ b/pkg/gateway/mirror/service/mirror.go @@ -26,7 +26,6 @@ import ( "github.com/vdaas/vald/internal/net" "github.com/vdaas/vald/internal/net/grpc" "github.com/vdaas/vald/internal/net/grpc/codes" - "github.com/vdaas/vald/internal/net/grpc/errdetails" "github.com/vdaas/vald/internal/net/grpc/status" "github.com/vdaas/vald/internal/observability/trace" "github.com/vdaas/vald/internal/sync" @@ -178,12 +177,6 @@ func (m *mirr) registers( } }() - reqInfo := &errdetails.RequestInfo{ - ServingData: errdetails.Serialize(tgts), - } - resInfo := &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + mirror.RegisterRPCName, - } resTgts := make([]*payload.Mirror_Target, 0, len(tgts.GetTargets())) exists := make(map[string]bool) var result sync.Map[string, error] // map[target host: error] @@ -203,37 +196,36 @@ func (m *mirr) registers( switch { case errors.Is(err, context.Canceled): err = status.WrapWithCanceled( - mirror.RegisterRPCName+" API canceld", err, reqInfo, resInfo, + mirror.RegisterRPCName+" API canceled", err, ) attrs = trace.StatusCodeCancelled(err.Error()) case errors.Is(err, context.DeadlineExceeded): err = status.WrapWithCanceled( - mirror.RegisterRPCName+" API deadline exceeded", err, reqInfo, resInfo, + mirror.RegisterRPCName+" API deadline exceeded", err, ) attrs = trace.StatusCodeDeadlineExceeded(err.Error()) case errors.Is(err, errors.ErrGRPCClientConnNotFound("*")): err = status.WrapWithInternal( - mirror.RegisterRPCName+" API connection not found", err, reqInfo, resInfo, + mirror.RegisterRPCName+" API connection not found", err, ) attrs = trace.StatusCodeInternal(err.Error()) case errors.Is(err, errors.ErrTargetNotFound): err = status.WrapWithInvalidArgument( - mirror.RegisterRPCName+" API target not found", err, reqInfo, resInfo, + mirror.RegisterRPCName+" API target not found", err, ) attrs = trace.StatusCodeInvalidArgument(err.Error()) default: - var ( - st *status.Status - msg string - ) - st, msg, err = status.ParseError(err, codes.Internal, - "failed to parse "+mirror.RegisterRPCName+" gRPC error response", reqInfo, resInfo, - ) - attrs = trace.FromGRPCStatus(st.Code(), msg) + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code() and st.Message(), although it is unlikely to match this condition. + log.Errorf("gRPC call returned not a gRPC status error: %v", err) + st = status.New(codes.Unknown, "failed to parse "+mirror.RegisterRPCName+" gRPC error response") + } + attrs = trace.FromGRPCStatus(st.Code(), st.Message()) // When the ingress resource is deleted, the controller's default backend results(Unimplemented error) are returned so that the connection should be disconnected. // If it is a different namespace on the same cluster, the connection is automatically disconnected because the net.grpc health check fails. - if st != nil && st.Code() == codes.Unimplemented { + if st.Code() == codes.Unimplemented { host, port, err := net.SplitHostPort(target) if err != nil { log.Warn(err) @@ -247,7 +239,7 @@ func (m *mirr) registers( } } } - log.Error("failed to send Register API to %s\t: %v", target, err) + log.Errorf("failed to send Register API to %s\t: %v", target, err) if span != nil { span.RecordError(err) span.SetAttributes(attrs...) @@ -278,7 +270,7 @@ func (m *mirr) registers( if err != nil { if errors.Is(err, errors.ErrGRPCClientConnNotFound("*")) { err = status.WrapWithInternal( - mirror.RegisterRPCName+" API connection not found", err, reqInfo, resInfo, + mirror.RegisterRPCName+" API connection not found", err, ) log.Warn(err) if span != nil { @@ -288,11 +280,11 @@ func (m *mirr) registers( } return nil, err } + log.Error(err) st, msg, err := status.ParseError(err, codes.Internal, - "failed to parse "+mirror.RegisterRPCName+" gRPC error response", reqInfo, resInfo, + "failed to parse "+mirror.RegisterRPCName+" gRPC error response", ) - log.Warn(err) if span != nil { span.RecordError(err) span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...)