From 421e19869c620d9a2b9375d8d1751d037edc8bfd Mon Sep 17 00:00:00 2001 From: hlts2 Date: Tue, 1 Oct 2024 19:46:17 +0900 Subject: [PATCH 01/11] fix: use FromError function when stream rpc error occurs Signed-off-by: hlts2 --- pkg/gateway/mirror/handler/grpc/handler.go | 63 +++++++++++++++------- 1 file changed, 45 insertions(+), 18 deletions(-) diff --git a/pkg/gateway/mirror/handler/grpc/handler.go b/pkg/gateway/mirror/handler/grpc/handler.go index 7065b1e177..3f2ff567cf 100644 --- a/pkg/gateway/mirror/handler/grpc/handler.go +++ b/pkg/gateway/mirror/handler/grpc/handler.go @@ -386,10 +386,13 @@ func (s *server) StreamSearch(stream vald.Search_StreamSearchServer) (err error) }() res, err := s.Search(ctx, req) if err != nil { - st, msg, err := status.ParseError(err, codes.Internal, "failed to parse "+vald.SearchRPCName+" gRPC error response") + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { + st = status.New(codes.Internal, "failed to parse "+vald.SearchRPCName+" gRPC error response") + } if sspan != nil { sspan.RecordError(err) - sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) sspan.SetStatus(trace.StatusError, err.Error()) } return &payload.Search_StreamResponse{ @@ -436,10 +439,13 @@ func (s *server) StreamSearchByID(stream vald.Search_StreamSearchByIDServer) (er }() res, err := s.SearchByID(ctx, req) if err != nil { - st, msg, err := status.ParseError(err, codes.Internal, "failed to parse "+vald.SearchByIDRPCName+" gRPC error response") + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { + st = status.New(codes.Internal, "failed to parse "+vald.SearchByIDRPCName+" gRPC error response") + } if sspan != nil { sspan.RecordError(err) - sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) sspan.SetStatus(trace.StatusError, err.Error()) } return &payload.Search_StreamResponse{ @@ -756,10 +762,13 @@ func (s *server) StreamLinearSearch(stream vald.Search_StreamLinearSearchServer) }() res, err := s.LinearSearch(ctx, req) if err != nil { - st, msg, err := status.ParseError(err, codes.Internal, "failed to parse "+vald.LinearSearchRPCName+" gRPC error response") + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { + st = status.New(codes.Internal, "failed to parse "+vald.LinearSearchRPCName+" gRPC error response") + } if sspan != nil { sspan.RecordError(err) - sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) sspan.SetStatus(trace.StatusError, err.Error()) } return &payload.Search_StreamResponse{ @@ -811,10 +820,13 @@ func (s *server) StreamLinearSearchByID( }() res, err := s.LinearSearchByID(ctx, req) if err != nil { - st, msg, err := status.ParseError(err, codes.Internal, "failed to parse "+vald.LinearSearchByIDRPCName+" gRPC error response") + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { + st = status.New(codes.Internal, "failed to parse "+vald.LinearSearchByIDRPCName+" gRPC error response") + } if sspan != nil { sspan.RecordError(err) - sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) sspan.SetStatus(trace.StatusError, err.Error()) } return &payload.Search_StreamResponse{ @@ -1428,10 +1440,13 @@ func (s *server) StreamInsert(stream vald.Insert_StreamInsertServer) (err error) }() res, err := s.Insert(ctx, req) if err != nil { - st, msg, err := status.ParseError(err, codes.Internal, "failed to parse "+vald.InsertRPCName+" gRPC error response") + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { + st = status.New(codes.Internal, "failed to parse "+vald.InsertRPCName+" gRPC error response") + } if sspan != nil { sspan.RecordError(err) - sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) sspan.SetStatus(trace.StatusError, err.Error()) } return &payload.Object_StreamLocation{ @@ -2013,10 +2028,13 @@ func (s *server) StreamUpdate(stream vald.Update_StreamUpdateServer) (err error) }() res, err := s.Update(ctx, req) if err != nil { - st, msg, err := status.ParseError(err, codes.Internal, "failed to parse "+vald.UpdateRPCName+" gRPC error response") + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { + st = status.New(codes.Internal, "failed to parse "+vald.UpdateRPCName+" gRPC error response") + } if sspan != nil { sspan.RecordError(err) - sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) sspan.SetStatus(trace.StatusError, err.Error()) } return &payload.Object_StreamLocation{ @@ -2403,10 +2421,13 @@ func (s *server) StreamUpsert(stream vald.Upsert_StreamUpsertServer) (err error) }() res, err := s.Upsert(ctx, req) if err != nil { - st, msg, err := status.ParseError(err, codes.Internal, "failed to parse "+vald.UpsertRPCName+" gRPC error response") + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { + st = status.New(codes.Internal, "failed to parse "+vald.UpsertRPCName+" gRPC error response") + } if sspan != nil { sspan.RecordError(err) - sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) sspan.SetStatus(trace.StatusError, err.Error()) } return &payload.Object_StreamLocation{ @@ -2788,10 +2809,13 @@ func (s *server) StreamRemove(stream vald.Remove_StreamRemoveServer) (err error) }() res, err := s.Remove(ctx, req) if err != nil { - st, msg, err := status.ParseError(err, codes.Internal, "failed to parse "+vald.RemoveRPCName+" gRPC error response") + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { + st = status.New(codes.Internal, "failed to parse "+vald.RemoveRPCName+" gRPC error response") + } if sspan != nil { sspan.RecordError(err) - sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) sspan.SetStatus(trace.StatusError, err.Error()) } return &payload.Object_StreamLocation{ @@ -3233,10 +3257,13 @@ func (s *server) StreamGetObject(stream vald.Object_StreamGetObjectServer) (err }() res, err := s.GetObject(ctx, req) if err != nil { - st, msg, err := status.ParseError(err, codes.Internal, "failed to parse "+vald.GetObjectRPCName+" gRPC error response") + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { + st = status.New(codes.Internal, "failed to parse "+vald.GetObjectRPCName+" gRPC error response") + } if sspan != nil { sspan.RecordError(err) - sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) sspan.SetStatus(trace.StatusError, err.Error()) } return &payload.Object_StreamVector{ From 595293e83da6f9271b846de7a5caf6296d636320 Mon Sep 17 00:00:00 2001 From: hlts2 Date: Tue, 1 Oct 2024 20:10:15 +0900 Subject: [PATCH 02/11] fix: delete ParseError of Register RPC error handling Signed-off-by: hlts2 --- pkg/gateway/mirror/handler/grpc/handler.go | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/pkg/gateway/mirror/handler/grpc/handler.go b/pkg/gateway/mirror/handler/grpc/handler.go index 3f2ff567cf..e9afec1735 100644 --- a/pkg/gateway/mirror/handler/grpc/handler.go +++ b/pkg/gateway/mirror/handler/grpc/handler.go @@ -117,14 +117,18 @@ func (s *server) Register( ) attrs = trace.StatusCodeInvalidArgument(err.Error()) default: - var ( - st *status.Status - msg string - ) - st, msg, err = status.ParseError(err, codes.Internal, - "failed to parse "+mirror.RegisterRPCName+" gRPC error response", reqInfo, resInfo, + err = status.WrapWithInternal( + mirror.RegisterRPCName+" API failed to connect mirror gateway targets", err, reqInfo, resInfo, + &errdetails.BadRequest{ + FieldViolations: []*errdetails.BadRequestFieldViolation{ + { + Field: "mirror gateway targets", + Description: err.Error(), + }, + }, + }, ) - attrs = trace.FromGRPCStatus(st.Code(), msg) + attrs = trace.StatusCodeInternal(err.Error()) } log.Warn(err) if span != nil { @@ -138,7 +142,7 @@ func (s *server) Register( // Get own address and the addresses of other mirror gateways to which this gateway is currently connected. tgts, err := s.mirror.MirrorTargets(ctx) if err != nil { - err = status.WrapWithInternal(mirror.RegisterRPCName+" API failed to get connected vald gateway targets", err, + err = status.WrapWithInternal(mirror.RegisterRPCName+" API failed to get connected mirror gateway targets", err, &errdetails.BadRequest{ FieldViolations: []*errdetails.BadRequestFieldViolation{ { From 3ceb863160c129c34d323bef533a03eb2ed060e0 Mon Sep 17 00:00:00 2001 From: hlts2 Date: Tue, 1 Oct 2024 22:40:35 +0900 Subject: [PATCH 03/11] fix: crud rpc error handling Signed-off-by: hlts2 --- pkg/gateway/mirror/handler/grpc/handler.go | 276 +++++++-------------- 1 file changed, 90 insertions(+), 186 deletions(-) diff --git a/pkg/gateway/mirror/handler/grpc/handler.go b/pkg/gateway/mirror/handler/grpc/handler.go index e9afec1735..f303b456a5 100644 --- a/pkg/gateway/mirror/handler/grpc/handler.go +++ b/pkg/gateway/mirror/handler/grpc/handler.go @@ -390,8 +390,9 @@ func (s *server) StreamSearch(stream vald.Search_StreamSearchServer) (err error) }() res, err := s.Search(ctx, req) if err != nil { - st, _ := status.FromError(err) - if st == nil || st.Message() == "" { + st, ok := status.FromError(err) + if !ok || st == nil || st.Message() == "" { + log.Errorf("gRPC call returned not a gRPC status error: %v", err) st = status.New(codes.Internal, "failed to parse "+vald.SearchRPCName+" gRPC error response") } if sspan != nil { @@ -443,8 +444,9 @@ func (s *server) StreamSearchByID(stream vald.Search_StreamSearchByIDServer) (er }() res, err := s.SearchByID(ctx, req) if err != nil { - st, _ := status.FromError(err) - if st == nil || st.Message() == "" { + st, ok := status.FromError(err) + if !ok || st == nil || st.Message() == "" { + log.Errorf("gRPC call returned not a gRPC status error: %v", err) st = status.New(codes.Internal, "failed to parse "+vald.SearchByIDRPCName+" gRPC error response") } if sspan != nil { @@ -766,8 +768,9 @@ func (s *server) StreamLinearSearch(stream vald.Search_StreamLinearSearchServer) }() res, err := s.LinearSearch(ctx, req) if err != nil { - st, _ := status.FromError(err) - if st == nil || st.Message() == "" { + st, ok := status.FromError(err) + if !ok || st == nil || st.Message() == "" { + log.Errorf("gRPC call returned not a gRPC status error: %v", err) st = status.New(codes.Internal, "failed to parse "+vald.LinearSearchRPCName+" gRPC error response") } if sspan != nil { @@ -824,8 +827,9 @@ func (s *server) StreamLinearSearchByID( }() res, err := s.LinearSearchByID(ctx, req) if err != nil { - st, _ := status.FromError(err) - if st == nil || st.Message() == "" { + st, ok := status.FromError(err) + if !ok || st == nil || st.Message() == "" { + log.Errorf("gRPC call returned not a gRPC status error: %v", err) st = status.New(codes.Internal, "failed to parse "+vald.LinearSearchByIDRPCName+" gRPC error response") } if sspan != nil { @@ -1018,22 +1022,13 @@ func (s *server) Insert( return loc, errors.Join(derr, err) }) if err != nil { - reqInfo := &errdetails.RequestInfo{ - RequestId: req.GetVector().GetId(), - } - resInfo := &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.InsertRPCName, - ResourceName: fmt.Sprintf("%s: %s(%s) to %s", apiName, s.name, s.ip, s.vAddr), - } - st, msg, err := status.ParseError(err, codes.Internal, - "failed to parse "+vald.InsertRPCName+" gRPC error response", reqInfo, resInfo, - ) - log.Warn(err) - if span != nil { + st, ok := status.FromError(err) + if ok && st != nil && st.Message() != "" && span != nil { span.RecordError(err) - span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) } + log.Warn(err) return nil, err } log.Debugf("Insert API succeeded to %#v", loc) @@ -1074,24 +1069,15 @@ func (s *server) handleInsert( return vc.Insert(ctx, req, copts...) }) if err != nil { - var ( - st *status.Status - msg string - ) - st, msg, err = status.ParseError(err, codes.Internal, - "failed to parse "+vald.InsertRPCName+" gRPC error response", - &errdetails.RequestInfo{ - RequestId: req.GetVector().GetId(), - }, - &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.InsertRPCName + ".BroadCast/" + target, - ResourceName: fmt.Sprintf("%s: %s(%s) to %s", apiName, s.name, s.ip, target), - }, - ) + st, ok := status.FromError(err) + if !ok || st == nil || st.Message() == "" { + log.Errorf("gRPC call returned not a gRPC status error: %v", err) + st = status.New(codes.Internal, "failed to parse "+vald.InsertRPCName+" gRPC error response") + } log.Warn(err) if span != nil { span.RecordError(err) - span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) } code = st.Code() @@ -1239,24 +1225,15 @@ func (s *server) handleInsertResult( return vc.Update(ctx, req, copts...) }) if err != nil { - var ( - st *status.Status - msg string - ) - st, msg, err = status.ParseError(err, codes.Internal, - "failed to parse "+vald.UpdateRPCName+" gRPC error response", - &errdetails.RequestInfo{ - RequestId: req.GetVector().GetId(), - }, - &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.UpdateRPCName + ".DoMulti/" + target, - ResourceName: fmt.Sprintf("%s: %s(%s) to %s", apiName, s.name, s.ip, target), - }, - ) + st, ok := status.FromError(err) + if !ok || st == nil || st.Message() == "" { + log.Errorf("gRPC call returned not a gRPC status error: %v", err) + st = status.New(codes.Internal, "failed to parse "+vald.UpdateRPCName+" gRPC error response") + } log.Warn(err) if span != nil { span.RecordError(err) - span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) } code = st.Code() @@ -1444,8 +1421,9 @@ func (s *server) StreamInsert(stream vald.Insert_StreamInsertServer) (err error) }() res, err := s.Insert(ctx, req) if err != nil { - st, _ := status.FromError(err) - if st == nil || st.Message() == "" { + st, ok := status.FromError(err) + if !ok || st == nil || st.Message() == "" { + log.Errorf("gRPC call returned not a gRPC status error: %v", err) st = status.New(codes.Internal, "failed to parse "+vald.InsertRPCName+" gRPC error response") } if sspan != nil { @@ -1576,22 +1554,13 @@ func (s *server) Update( return loc, errors.Join(derr, err) }) if err != nil { - reqInfo := &errdetails.RequestInfo{ - RequestId: req.GetVector().GetId(), - } - resInfo := &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.UpdateRPCName, - ResourceName: fmt.Sprintf("%s: %s(%s) to %s", apiName, s.name, s.ip, s.vAddr), - } - st, msg, err := status.ParseError(err, codes.Internal, - "failed to parse "+vald.UpdateRPCName+" gRPC error response", reqInfo, resInfo, - ) - log.Warn(err) - if span != nil { + st, ok := status.FromError(err) + if ok && st != nil && st.Message() != "" && span != nil { span.RecordError(err) - span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) } + log.Warn(err) return nil, err } log.Debugf("Update API succeeded to %#v", loc) @@ -1632,25 +1601,15 @@ func (s *server) handleUpdate( return vc.Update(ctx, req, copts...) }) if err != nil { - var ( - st *status.Status - msg string - ) - st, msg, err = status.ParseError(err, codes.Internal, - "failed to parse "+vald.UpdateRPCName+" gRPC error response", - &errdetails.RequestInfo{ - RequestId: req.GetVector().GetId(), - ServingData: errdetails.Serialize(req), - }, - &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.UpdateRPCName + ".BroadCast/" + target, - ResourceName: fmt.Sprintf("%s: %s(%s) to %s", apiName, s.name, s.ip, target), - }, - ) + st, ok := status.FromError(err) + if !ok || st == nil || st.Message() == "" { + log.Errorf("gRPC call returned not a gRPC status error: %v", err) + st = status.New(codes.Internal, "failed to parse "+vald.UpdateRPCName+" gRPC error response") + } log.Warn(err) if span != nil { span.RecordError(err) - span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) } code = st.Code() @@ -1812,25 +1771,15 @@ func (s *server) handleUpdateResult( return vc.Insert(ctx, req, copts...) }) if err != nil { - var ( - st *status.Status - msg string - ) - st, msg, err = status.ParseError(err, codes.Internal, - "failed to parse "+vald.InsertRPCName+" gRPC error response", - &errdetails.RequestInfo{ - RequestId: req.GetVector().GetId(), - ServingData: errdetails.Serialize(req), - }, - &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.InsertRPCName + ".BroadCast/" + target, - ResourceName: fmt.Sprintf("%s: %s(%s) to %s", apiName, s.name, s.ip, target), - }, - ) + st, ok := status.FromError(err) + if !ok || st == nil || st.Message() == "" { + log.Errorf("gRPC call returned not a gRPC status error: %v", err) + st = status.New(codes.Internal, "failed to parse "+vald.InsertRPCName+" gRPC error response") + } log.Warn(err) if span != nil { span.RecordError(err) - span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) } code = st.Code() @@ -2032,8 +1981,9 @@ func (s *server) StreamUpdate(stream vald.Update_StreamUpdateServer) (err error) }() res, err := s.Update(ctx, req) if err != nil { - st, _ := status.FromError(err) - if st == nil || st.Message() == "" { + st, ok := status.FromError(err) + if !ok || st == nil || st.Message() == "" { + log.Errorf("gRPC call returned not a gRPC status error: %v", err) st = status.New(codes.Internal, "failed to parse "+vald.UpdateRPCName+" gRPC error response") } if sspan != nil { @@ -2164,23 +2114,13 @@ func (s *server) Upsert( return loc, err }) if err != nil { - reqInfo := &errdetails.RequestInfo{ - RequestId: req.GetVector().GetId(), - ServingData: errdetails.Serialize(req), - } - resInfo := &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.UpsertRPCName, - ResourceName: fmt.Sprintf("%s: %s(%s) to %s", apiName, s.name, s.ip, s.vAddr), - } - st, msg, err := status.ParseError(err, codes.Internal, - "failed to parse "+vald.UpsertRPCName+" gRPC error response", reqInfo, resInfo, - ) - log.Warn(err) - if span != nil { + st, ok := status.FromError(err) + if ok && st != nil && st.Message() != "" && span != nil { span.RecordError(err) - span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) } + log.Warn(err) return nil, err } log.Debugf("Upsert API succeeded to %#v", loc) @@ -2221,25 +2161,15 @@ func (s *server) handleUpsert( return vc.Upsert(ctx, req, copts...) }) if err != nil { - var ( - st *status.Status - msg string - ) - st, msg, err = status.ParseError(err, codes.Internal, - "failed to parse "+vald.UpsertRPCName+" gRPC error response", - &errdetails.RequestInfo{ - RequestId: req.GetVector().GetId(), - ServingData: errdetails.Serialize(req), - }, - &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.UpsertRPCName + ".BroadCast/" + target, - ResourceName: fmt.Sprintf("%s: %s(%s) to %s", apiName, s.name, s.ip, target), - }, - ) + st, ok := status.FromError(err) + if !ok || st == nil || st.Message() == "" { + log.Errorf("gRPC call returned not a gRPC status error: %v", err) + st = status.New(codes.Internal, "failed to parse "+vald.UpsertRPCName+" gRPC error response") + } log.Warn(err) if span != nil { span.RecordError(err) - span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) } code = st.Code() @@ -2425,8 +2355,9 @@ func (s *server) StreamUpsert(stream vald.Upsert_StreamUpsertServer) (err error) }() res, err := s.Upsert(ctx, req) if err != nil { - st, _ := status.FromError(err) - if st == nil || st.Message() == "" { + st, ok := status.FromError(err) + if !ok || st == nil || st.Message() == "" { + log.Errorf("gRPC call returned not a gRPC status error: %v", err) st = status.New(codes.Internal, "failed to parse "+vald.UpsertRPCName+" gRPC error response") } if sspan != nil { @@ -2557,22 +2488,13 @@ func (s *server) Remove( return loc, err }) if err != nil { - reqInfo := &errdetails.RequestInfo{ - RequestId: req.GetId().GetId(), - } - resInfo := &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.RemoveRPCName, - ResourceName: fmt.Sprintf("%s: %s(%s) to %s", apiName, s.name, s.ip, s.vAddr), - } - st, msg, err := status.ParseError(err, codes.Internal, - "failed to parse "+vald.RemoveRPCName+" gRPC error response", reqInfo, resInfo, - ) - log.Warn(err) - if span != nil { + st, ok := status.FromError(err) + if ok && st != nil && st.Message() != "" && span != nil { span.RecordError(err) - span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) } + log.Warn(err) return nil, err } log.Debugf("Remove API remove succeeded to %#v", loc) @@ -2613,24 +2535,15 @@ func (s *server) handleRemove( return vc.Remove(ctx, req, copts...) }) if err != nil { - var ( - st *status.Status - msg string - ) - st, msg, err = status.ParseError(err, codes.Internal, - "failed to parse "+vald.RemoveRPCName+" gRPC error response", - &errdetails.RequestInfo{ - RequestId: req.GetId().GetId(), - }, - &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.RemoveRPCName + ".BroadCast/" + target, - ResourceName: fmt.Sprintf("%s: %s(%s) to %s", apiName, s.name, s.ip, target), - }, - ) + st, ok := status.FromError(err) + if !ok || st == nil || st.Message() == "" { + log.Errorf("gRPC call returned not a gRPC status error: %v", err) + st = status.New(codes.Internal, "failed to parse "+vald.RemoveRPCName+" gRPC error response") + } log.Warn(err) if span != nil { span.RecordError(err) - span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) } code = st.Code() @@ -2813,8 +2726,9 @@ func (s *server) StreamRemove(stream vald.Remove_StreamRemoveServer) (err error) }() res, err := s.Remove(ctx, req) if err != nil { - st, _ := status.FromError(err) - if st == nil || st.Message() == "" { + st, ok := status.FromError(err) + if !ok || st == nil || st.Message() == "" { + log.Errorf("gRPC call returned not a gRPC status error: %v", err) st = status.New(codes.Internal, "failed to parse "+vald.RemoveRPCName+" gRPC error response") } if sspan != nil { @@ -2945,20 +2859,15 @@ func (s *server) RemoveByTimestamp( return locs, errors.Join(derr, err) }) if err != nil { - reqInfo := &errdetails.RequestInfo{ - ServingData: errdetails.Serialize(req), + st, ok := status.FromError(err) + if !ok || st == nil || st.Message() == "" { + log.Errorf("gRPC call returned not a gRPC status error: %v", err) + st = status.New(codes.Internal, "failed to parse "+vald.InsertRPCName+" gRPC error response") } - resInfo := &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.RemoveByTimestampRPCName, - ResourceName: fmt.Sprintf("%s: %s(%s) to %s", apiName, s.name, s.ip, s.vAddr), - } - st, msg, err := status.ParseError(err, codes.Internal, - "failed to parse "+vald.RemoveRPCName+" gRPC error response", reqInfo, resInfo, - ) log.Warn(err) if span != nil { span.RecordError(err) - span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) } return nil, err @@ -2999,21 +2908,15 @@ func (s *server) handleRemoveByTimestamp( return vc.RemoveByTimestamp(ctx, req, copts...) }) if err != nil { - var ( - st *status.Status - msg string - ) - st, msg, err = status.ParseError(err, codes.Internal, - "failed to parse "+vald.RemoveByTimestampRPCName+" gRPC error response", - &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.RemoveByTimestampRPCName + ".BroadCast/" + target, - ResourceName: fmt.Sprintf("%s: %s(%s) to %s", apiName, s.name, s.ip, target), - }, - ) + st, ok := status.FromError(err) + if !ok || st == nil || st.Message() == "" { + log.Errorf("gRPC call returned not a gRPC status error: %v", err) + st = status.New(codes.Internal, "failed to parse "+vald.RemoveByTimestampRPCName+" gRPC error response") + } log.Warn(err) if span != nil { span.RecordError(err) - span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) } code = st.Code() @@ -3261,8 +3164,9 @@ func (s *server) StreamGetObject(stream vald.Object_StreamGetObjectServer) (err }() res, err := s.GetObject(ctx, req) if err != nil { - st, _ := status.FromError(err) - if st == nil || st.Message() == "" { + st, ok := status.FromError(err) + if !ok || st == nil || st.Message() == "" { + log.Errorf("gRPC call returned not a gRPC status error: %v", err) st = status.New(codes.Internal, "failed to parse "+vald.GetObjectRPCName+" gRPC error response") } if sspan != nil { From 51f4a0e2439fbec9a5149dec8b9dcc392aef3a8f Mon Sep 17 00:00:00 2001 From: hlts2 Date: Tue, 1 Oct 2024 23:05:17 +0900 Subject: [PATCH 04/11] fix: add code comment and refactor multi crud error handling Signed-off-by: hlts2 --- pkg/gateway/mirror/handler/grpc/handler.go | 57 +++++++++++----------- 1 file changed, 29 insertions(+), 28 deletions(-) diff --git a/pkg/gateway/mirror/handler/grpc/handler.go b/pkg/gateway/mirror/handler/grpc/handler.go index f303b456a5..2826823faf 100644 --- a/pkg/gateway/mirror/handler/grpc/handler.go +++ b/pkg/gateway/mirror/handler/grpc/handler.go @@ -392,6 +392,7 @@ func (s *server) StreamSearch(stream vald.Search_StreamSearchServer) (err error) if err != nil { st, ok := status.FromError(err) if !ok || st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Proto(), although it is unlikely to match this condition. log.Errorf("gRPC call returned not a gRPC status error: %v", err) st = status.New(codes.Internal, "failed to parse "+vald.SearchRPCName+" gRPC error response") } @@ -446,6 +447,7 @@ func (s *server) StreamSearchByID(stream vald.Search_StreamSearchByIDServer) (er if err != nil { st, ok := status.FromError(err) if !ok || st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Proto(), although it is unlikely to match this condition. log.Errorf("gRPC call returned not a gRPC status error: %v", err) st = status.New(codes.Internal, "failed to parse "+vald.SearchByIDRPCName+" gRPC error response") } @@ -770,6 +772,7 @@ func (s *server) StreamLinearSearch(stream vald.Search_StreamLinearSearchServer) if err != nil { st, ok := status.FromError(err) if !ok || st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Proto(), although it is unlikely to match this condition. log.Errorf("gRPC call returned not a gRPC status error: %v", err) st = status.New(codes.Internal, "failed to parse "+vald.LinearSearchRPCName+" gRPC error response") } @@ -829,6 +832,7 @@ func (s *server) StreamLinearSearchByID( if err != nil { st, ok := status.FromError(err) if !ok || st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Proto(), although it is unlikely to match this condition. log.Errorf("gRPC call returned not a gRPC status error: %v", err) st = status.New(codes.Internal, "failed to parse "+vald.LinearSearchByIDRPCName+" gRPC error response") } @@ -1071,6 +1075,7 @@ func (s *server) handleInsert( if err != nil { st, ok := status.FromError(err) if !ok || st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. log.Errorf("gRPC call returned not a gRPC status error: %v", err) st = status.New(codes.Internal, "failed to parse "+vald.InsertRPCName+" gRPC error response") } @@ -1227,6 +1232,7 @@ func (s *server) handleInsertResult( if err != nil { st, ok := status.FromError(err) if !ok || st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. log.Errorf("gRPC call returned not a gRPC status error: %v", err) st = status.New(codes.Internal, "failed to parse "+vald.UpdateRPCName+" gRPC error response") } @@ -1423,6 +1429,7 @@ func (s *server) StreamInsert(stream vald.Insert_StreamInsertServer) (err error) if err != nil { st, ok := status.FromError(err) if !ok || st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Proto(), although it is unlikely to match this condition. log.Errorf("gRPC call returned not a gRPC status error: %v", err) st = status.New(codes.Internal, "failed to parse "+vald.InsertRPCName+" gRPC error response") } @@ -1491,14 +1498,10 @@ func (s *server) MultiInsert( loc, err := s.Insert(ctx, req) if err != nil { - st, msg, err := status.ParseError(err, codes.Internal, "failed to parse "+vald.InsertRPCName+" gRPC error response", - &errdetails.RequestInfo{ - RequestId: req.GetVector().GetId(), - ServingData: errdetails.Serialize(req), - }) - if span != nil { + st, ok := status.FromError(err) + if ok && st != nil && st.Message() != "" && span != nil { span.RecordError(err) - span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) } emu.Lock() @@ -1603,6 +1606,7 @@ func (s *server) handleUpdate( if err != nil { st, ok := status.FromError(err) if !ok || st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. log.Errorf("gRPC call returned not a gRPC status error: %v", err) st = status.New(codes.Internal, "failed to parse "+vald.UpdateRPCName+" gRPC error response") } @@ -1773,6 +1777,7 @@ func (s *server) handleUpdateResult( if err != nil { st, ok := status.FromError(err) if !ok || st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. log.Errorf("gRPC call returned not a gRPC status error: %v", err) st = status.New(codes.Internal, "failed to parse "+vald.InsertRPCName+" gRPC error response") } @@ -1983,6 +1988,7 @@ func (s *server) StreamUpdate(stream vald.Update_StreamUpdateServer) (err error) if err != nil { st, ok := status.FromError(err) if !ok || st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Proto(), although it is unlikely to match this condition. log.Errorf("gRPC call returned not a gRPC status error: %v", err) st = status.New(codes.Internal, "failed to parse "+vald.UpdateRPCName+" gRPC error response") } @@ -2051,14 +2057,10 @@ func (s *server) MultiUpdate( loc, err := s.Update(ctx, req) if err != nil { - st, msg, err := status.ParseError(err, codes.Internal, "failed to parse "+vald.UpdateRPCName+" gRPC error response", - &errdetails.RequestInfo{ - RequestId: req.GetVector().GetId(), - ServingData: errdetails.Serialize(req), - }) - if span != nil { + st, ok := status.FromError(err) + if ok && st != nil && st.Message() != "" && span != nil { span.RecordError(err) - span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) } emu.Lock() @@ -2163,6 +2165,7 @@ func (s *server) handleUpsert( if err != nil { st, ok := status.FromError(err) if !ok || st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. log.Errorf("gRPC call returned not a gRPC status error: %v", err) st = status.New(codes.Internal, "failed to parse "+vald.UpsertRPCName+" gRPC error response") } @@ -2357,6 +2360,7 @@ func (s *server) StreamUpsert(stream vald.Upsert_StreamUpsertServer) (err error) if err != nil { st, ok := status.FromError(err) if !ok || st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Proto(), although it is unlikely to match this condition. log.Errorf("gRPC call returned not a gRPC status error: %v", err) st = status.New(codes.Internal, "failed to parse "+vald.UpsertRPCName+" gRPC error response") } @@ -2425,14 +2429,10 @@ func (s *server) MultiUpsert( loc, err := s.Upsert(ctx, req) if err != nil { - st, msg, err := status.ParseError(err, codes.Internal, "failed to parse "+vald.UpsertRPCName+" gRPC error response", - &errdetails.RequestInfo{ - RequestId: req.GetVector().GetId(), - ServingData: errdetails.Serialize(req), - }) - if span != nil { + st, ok := status.FromError(err) + if ok && st != nil && st.Message() != "" && span != nil { span.RecordError(err) - span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) } emu.Lock() @@ -2537,6 +2537,7 @@ func (s *server) handleRemove( if err != nil { st, ok := status.FromError(err) if !ok || st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. log.Errorf("gRPC call returned not a gRPC status error: %v", err) st = status.New(codes.Internal, "failed to parse "+vald.RemoveRPCName+" gRPC error response") } @@ -2728,6 +2729,7 @@ func (s *server) StreamRemove(stream vald.Remove_StreamRemoveServer) (err error) if err != nil { st, ok := status.FromError(err) if !ok || st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Proto(), although it is unlikely to match this condition. log.Errorf("gRPC call returned not a gRPC status error: %v", err) st = status.New(codes.Internal, "failed to parse "+vald.RemoveRPCName+" gRPC error response") } @@ -2796,14 +2798,10 @@ func (s *server) MultiRemove( loc, err := s.Remove(ctx, req) if err != nil { - st, msg, err := status.ParseError(err, codes.Internal, "failed to parse "+vald.RemoveRPCName+" gRPC error response", - &errdetails.RequestInfo{ - RequestId: req.GetId().GetId(), - ServingData: errdetails.Serialize(req), - }) - if span != nil { + st, ok := status.FromError(err) + if ok && st != nil && st.Message() != "" && span != nil { span.RecordError(err) - span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) } emu.Lock() @@ -2861,6 +2859,7 @@ func (s *server) RemoveByTimestamp( if err != nil { st, ok := status.FromError(err) if !ok || st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. log.Errorf("gRPC call returned not a gRPC status error: %v", err) st = status.New(codes.Internal, "failed to parse "+vald.InsertRPCName+" gRPC error response") } @@ -2910,6 +2909,7 @@ func (s *server) handleRemoveByTimestamp( if err != nil { st, ok := status.FromError(err) if !ok || st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. log.Errorf("gRPC call returned not a gRPC status error: %v", err) st = status.New(codes.Internal, "failed to parse "+vald.RemoveByTimestampRPCName+" gRPC error response") } @@ -3166,6 +3166,7 @@ func (s *server) StreamGetObject(stream vald.Object_StreamGetObjectServer) (err if err != nil { st, ok := status.FromError(err) if !ok || st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Proto(), although it is unlikely to match this condition. log.Errorf("gRPC call returned not a gRPC status error: %v", err) st = status.New(codes.Internal, "failed to parse "+vald.GetObjectRPCName+" gRPC error response") } From a41cfac1312e48e412b2bc7b54421d44fcea84b0 Mon Sep 17 00:00:00 2001 From: hlts2 Date: Wed, 2 Oct 2024 03:43:13 +0900 Subject: [PATCH 05/11] fix: deleted unnecessary error resource info Signed-off-by: hlts2 --- pkg/gateway/mirror/service/mirror.go | 40 +++++++++++----------------- 1 file changed, 16 insertions(+), 24 deletions(-) diff --git a/pkg/gateway/mirror/service/mirror.go b/pkg/gateway/mirror/service/mirror.go index ce4309b972..224ed3fbf4 100644 --- a/pkg/gateway/mirror/service/mirror.go +++ b/pkg/gateway/mirror/service/mirror.go @@ -26,7 +26,6 @@ import ( "github.com/vdaas/vald/internal/net" "github.com/vdaas/vald/internal/net/grpc" "github.com/vdaas/vald/internal/net/grpc/codes" - "github.com/vdaas/vald/internal/net/grpc/errdetails" "github.com/vdaas/vald/internal/net/grpc/status" "github.com/vdaas/vald/internal/observability/trace" "github.com/vdaas/vald/internal/sync" @@ -178,12 +177,6 @@ func (m *mirr) registers( } }() - reqInfo := &errdetails.RequestInfo{ - ServingData: errdetails.Serialize(tgts), - } - resInfo := &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + mirror.RegisterRPCName, - } resTgts := make([]*payload.Mirror_Target, 0, len(tgts.GetTargets())) exists := make(map[string]bool) var result sync.Map[string, error] // map[target host: error] @@ -203,37 +196,36 @@ func (m *mirr) registers( switch { case errors.Is(err, context.Canceled): err = status.WrapWithCanceled( - mirror.RegisterRPCName+" API canceld", err, reqInfo, resInfo, + mirror.RegisterRPCName+" API canceld", err, ) attrs = trace.StatusCodeCancelled(err.Error()) case errors.Is(err, context.DeadlineExceeded): err = status.WrapWithCanceled( - mirror.RegisterRPCName+" API deadline exceeded", err, reqInfo, resInfo, + mirror.RegisterRPCName+" API deadline exceeded", err, ) attrs = trace.StatusCodeDeadlineExceeded(err.Error()) case errors.Is(err, errors.ErrGRPCClientConnNotFound("*")): err = status.WrapWithInternal( - mirror.RegisterRPCName+" API connection not found", err, reqInfo, resInfo, + mirror.RegisterRPCName+" API connection not found", err, ) attrs = trace.StatusCodeInternal(err.Error()) case errors.Is(err, errors.ErrTargetNotFound): err = status.WrapWithInvalidArgument( - mirror.RegisterRPCName+" API target not found", err, reqInfo, resInfo, + mirror.RegisterRPCName+" API target not found", err, ) attrs = trace.StatusCodeInvalidArgument(err.Error()) default: - var ( - st *status.Status - msg string - ) - st, msg, err = status.ParseError(err, codes.Internal, - "failed to parse "+mirror.RegisterRPCName+" gRPC error response", reqInfo, resInfo, - ) - attrs = trace.FromGRPCStatus(st.Code(), msg) + st, ok := status.FromError(err) + if !ok || st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code() and st.Message(), although it is unlikely to match this condition. + log.Errorf("gRPC call returned not a gRPC status error: %v", err) + st = status.New(codes.Internal, "failed to parse "+mirror.RegisterRPCName+" gRPC error response") + } + attrs = trace.FromGRPCStatus(st.Code(), st.Message()) // When the ingress resource is deleted, the controller's default backend results(Unimplemented error) are returned so that the connection should be disconnected. // If it is a different namespace on the same cluster, the connection is automatically disconnected because the net.grpc health check fails. - if st != nil && st.Code() == codes.Unimplemented { + if st.Code() == codes.Unimplemented { host, port, err := net.SplitHostPort(target) if err != nil { log.Warn(err) @@ -247,7 +239,7 @@ func (m *mirr) registers( } } } - log.Error("failed to send Register API to %s\t: %v", target, err) + log.Errorf("failed to send Register API to %s\t: %v", target, err) if span != nil { span.RecordError(err) span.SetAttributes(attrs...) @@ -278,7 +270,7 @@ func (m *mirr) registers( if err != nil { if errors.Is(err, errors.ErrGRPCClientConnNotFound("*")) { err = status.WrapWithInternal( - mirror.RegisterRPCName+" API connection not found", err, reqInfo, resInfo, + mirror.RegisterRPCName+" API connection not found", err, ) log.Warn(err) if span != nil { @@ -288,11 +280,11 @@ func (m *mirr) registers( } return nil, err } + log.Error(err) st, msg, err := status.ParseError(err, codes.Internal, - "failed to parse "+mirror.RegisterRPCName+" gRPC error response", reqInfo, resInfo, + "failed to parse "+mirror.RegisterRPCName+" gRPC error response", ) - log.Warn(err) if span != nil { span.RecordError(err) span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) From 184aece4d607171ae7d5bff959ca3318126f505a Mon Sep 17 00:00:00 2001 From: hlts2 Date: Wed, 2 Oct 2024 03:54:16 +0900 Subject: [PATCH 06/11] fix: typo Signed-off-by: hlts2 --- pkg/gateway/mirror/service/mirror.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/gateway/mirror/service/mirror.go b/pkg/gateway/mirror/service/mirror.go index 224ed3fbf4..aa289c81d5 100644 --- a/pkg/gateway/mirror/service/mirror.go +++ b/pkg/gateway/mirror/service/mirror.go @@ -196,7 +196,7 @@ func (m *mirr) registers( switch { case errors.Is(err, context.Canceled): err = status.WrapWithCanceled( - mirror.RegisterRPCName+" API canceld", err, + mirror.RegisterRPCName+" API canceled", err, ) attrs = trace.StatusCodeCancelled(err.Error()) case errors.Is(err, context.DeadlineExceeded): From 906bcda488f97d231533557c73ce66b5e48ea511 Mon Sep 17 00:00:00 2001 From: hlts2 Date: Wed, 2 Oct 2024 14:52:16 +0900 Subject: [PATCH 07/11] fix: status code when the parse is failed Signed-off-by: hlts2 --- pkg/gateway/mirror/handler/grpc/handler.go | 32 +++++++++++----------- pkg/gateway/mirror/service/mirror.go | 2 +- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/pkg/gateway/mirror/handler/grpc/handler.go b/pkg/gateway/mirror/handler/grpc/handler.go index 2826823faf..fdfa5be08d 100644 --- a/pkg/gateway/mirror/handler/grpc/handler.go +++ b/pkg/gateway/mirror/handler/grpc/handler.go @@ -394,7 +394,7 @@ func (s *server) StreamSearch(stream vald.Search_StreamSearchServer) (err error) if !ok || st == nil || st.Message() == "" { // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Proto(), although it is unlikely to match this condition. log.Errorf("gRPC call returned not a gRPC status error: %v", err) - st = status.New(codes.Internal, "failed to parse "+vald.SearchRPCName+" gRPC error response") + st = status.New(codes.Unknown, "failed to parse "+vald.SearchRPCName+" gRPC error response") } if sspan != nil { sspan.RecordError(err) @@ -449,7 +449,7 @@ func (s *server) StreamSearchByID(stream vald.Search_StreamSearchByIDServer) (er if !ok || st == nil || st.Message() == "" { // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Proto(), although it is unlikely to match this condition. log.Errorf("gRPC call returned not a gRPC status error: %v", err) - st = status.New(codes.Internal, "failed to parse "+vald.SearchByIDRPCName+" gRPC error response") + st = status.New(codes.Unknown, "failed to parse "+vald.SearchByIDRPCName+" gRPC error response") } if sspan != nil { sspan.RecordError(err) @@ -774,7 +774,7 @@ func (s *server) StreamLinearSearch(stream vald.Search_StreamLinearSearchServer) if !ok || st == nil || st.Message() == "" { // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Proto(), although it is unlikely to match this condition. log.Errorf("gRPC call returned not a gRPC status error: %v", err) - st = status.New(codes.Internal, "failed to parse "+vald.LinearSearchRPCName+" gRPC error response") + st = status.New(codes.Unknown, "failed to parse "+vald.LinearSearchRPCName+" gRPC error response") } if sspan != nil { sspan.RecordError(err) @@ -834,7 +834,7 @@ func (s *server) StreamLinearSearchByID( if !ok || st == nil || st.Message() == "" { // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Proto(), although it is unlikely to match this condition. log.Errorf("gRPC call returned not a gRPC status error: %v", err) - st = status.New(codes.Internal, "failed to parse "+vald.LinearSearchByIDRPCName+" gRPC error response") + st = status.New(codes.Unknown, "failed to parse "+vald.LinearSearchByIDRPCName+" gRPC error response") } if sspan != nil { sspan.RecordError(err) @@ -1077,7 +1077,7 @@ func (s *server) handleInsert( if !ok || st == nil || st.Message() == "" { // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. log.Errorf("gRPC call returned not a gRPC status error: %v", err) - st = status.New(codes.Internal, "failed to parse "+vald.InsertRPCName+" gRPC error response") + st = status.New(codes.Unknown, "failed to parse "+vald.InsertRPCName+" gRPC error response") } log.Warn(err) if span != nil { @@ -1234,7 +1234,7 @@ func (s *server) handleInsertResult( if !ok || st == nil || st.Message() == "" { // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. log.Errorf("gRPC call returned not a gRPC status error: %v", err) - st = status.New(codes.Internal, "failed to parse "+vald.UpdateRPCName+" gRPC error response") + st = status.New(codes.Unknown, "failed to parse "+vald.UpdateRPCName+" gRPC error response") } log.Warn(err) if span != nil { @@ -1431,7 +1431,7 @@ func (s *server) StreamInsert(stream vald.Insert_StreamInsertServer) (err error) if !ok || st == nil || st.Message() == "" { // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Proto(), although it is unlikely to match this condition. log.Errorf("gRPC call returned not a gRPC status error: %v", err) - st = status.New(codes.Internal, "failed to parse "+vald.InsertRPCName+" gRPC error response") + st = status.New(codes.Unknown, "failed to parse "+vald.InsertRPCName+" gRPC error response") } if sspan != nil { sspan.RecordError(err) @@ -1608,7 +1608,7 @@ func (s *server) handleUpdate( if !ok || st == nil || st.Message() == "" { // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. log.Errorf("gRPC call returned not a gRPC status error: %v", err) - st = status.New(codes.Internal, "failed to parse "+vald.UpdateRPCName+" gRPC error response") + st = status.New(codes.Unknown, "failed to parse "+vald.UpdateRPCName+" gRPC error response") } log.Warn(err) if span != nil { @@ -1779,7 +1779,7 @@ func (s *server) handleUpdateResult( if !ok || st == nil || st.Message() == "" { // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. log.Errorf("gRPC call returned not a gRPC status error: %v", err) - st = status.New(codes.Internal, "failed to parse "+vald.InsertRPCName+" gRPC error response") + st = status.New(codes.Unknown, "failed to parse "+vald.InsertRPCName+" gRPC error response") } log.Warn(err) if span != nil { @@ -1990,7 +1990,7 @@ func (s *server) StreamUpdate(stream vald.Update_StreamUpdateServer) (err error) if !ok || st == nil || st.Message() == "" { // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Proto(), although it is unlikely to match this condition. log.Errorf("gRPC call returned not a gRPC status error: %v", err) - st = status.New(codes.Internal, "failed to parse "+vald.UpdateRPCName+" gRPC error response") + st = status.New(codes.Unknown, "failed to parse "+vald.UpdateRPCName+" gRPC error response") } if sspan != nil { sspan.RecordError(err) @@ -2167,7 +2167,7 @@ func (s *server) handleUpsert( if !ok || st == nil || st.Message() == "" { // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. log.Errorf("gRPC call returned not a gRPC status error: %v", err) - st = status.New(codes.Internal, "failed to parse "+vald.UpsertRPCName+" gRPC error response") + st = status.New(codes.Unknown, "failed to parse "+vald.UpsertRPCName+" gRPC error response") } log.Warn(err) if span != nil { @@ -2362,7 +2362,7 @@ func (s *server) StreamUpsert(stream vald.Upsert_StreamUpsertServer) (err error) if !ok || st == nil || st.Message() == "" { // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Proto(), although it is unlikely to match this condition. log.Errorf("gRPC call returned not a gRPC status error: %v", err) - st = status.New(codes.Internal, "failed to parse "+vald.UpsertRPCName+" gRPC error response") + st = status.New(codes.Unknown, "failed to parse "+vald.UpsertRPCName+" gRPC error response") } if sspan != nil { sspan.RecordError(err) @@ -2539,7 +2539,7 @@ func (s *server) handleRemove( if !ok || st == nil || st.Message() == "" { // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. log.Errorf("gRPC call returned not a gRPC status error: %v", err) - st = status.New(codes.Internal, "failed to parse "+vald.RemoveRPCName+" gRPC error response") + st = status.New(codes.Unknown, "failed to parse "+vald.RemoveRPCName+" gRPC error response") } log.Warn(err) if span != nil { @@ -2731,7 +2731,7 @@ func (s *server) StreamRemove(stream vald.Remove_StreamRemoveServer) (err error) if !ok || st == nil || st.Message() == "" { // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Proto(), although it is unlikely to match this condition. log.Errorf("gRPC call returned not a gRPC status error: %v", err) - st = status.New(codes.Internal, "failed to parse "+vald.RemoveRPCName+" gRPC error response") + st = status.New(codes.Unknown, "failed to parse "+vald.RemoveRPCName+" gRPC error response") } if sspan != nil { sspan.RecordError(err) @@ -2861,7 +2861,7 @@ func (s *server) RemoveByTimestamp( if !ok || st == nil || st.Message() == "" { // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. log.Errorf("gRPC call returned not a gRPC status error: %v", err) - st = status.New(codes.Internal, "failed to parse "+vald.InsertRPCName+" gRPC error response") + st = status.New(codes.Unknown, "failed to parse "+vald.InsertRPCName+" gRPC error response") } log.Warn(err) if span != nil { @@ -2911,7 +2911,7 @@ func (s *server) handleRemoveByTimestamp( if !ok || st == nil || st.Message() == "" { // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. log.Errorf("gRPC call returned not a gRPC status error: %v", err) - st = status.New(codes.Internal, "failed to parse "+vald.RemoveByTimestampRPCName+" gRPC error response") + st = status.New(codes.Unknown, "failed to parse "+vald.RemoveByTimestampRPCName+" gRPC error response") } log.Warn(err) if span != nil { diff --git a/pkg/gateway/mirror/service/mirror.go b/pkg/gateway/mirror/service/mirror.go index aa289c81d5..ee9e79b041 100644 --- a/pkg/gateway/mirror/service/mirror.go +++ b/pkg/gateway/mirror/service/mirror.go @@ -219,7 +219,7 @@ func (m *mirr) registers( if !ok || st == nil || st.Message() == "" { // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code() and st.Message(), although it is unlikely to match this condition. log.Errorf("gRPC call returned not a gRPC status error: %v", err) - st = status.New(codes.Internal, "failed to parse "+mirror.RegisterRPCName+" gRPC error response") + st = status.New(codes.Unknown, "failed to parse "+mirror.RegisterRPCName+" gRPC error response") } attrs = trace.FromGRPCStatus(st.Code(), st.Message()) From 31552df5856bf8f409a5f250afd8221d1d3c3caf Mon Sep 17 00:00:00 2001 From: hlts2 Date: Wed, 2 Oct 2024 16:43:48 +0900 Subject: [PATCH 08/11] fix: ignore parse success variable Signed-off-by: hlts2 --- pkg/gateway/mirror/handler/grpc/handler.go | 60 +++++++++++----------- pkg/gateway/mirror/service/mirror.go | 4 +- 2 files changed, 32 insertions(+), 32 deletions(-) diff --git a/pkg/gateway/mirror/handler/grpc/handler.go b/pkg/gateway/mirror/handler/grpc/handler.go index fdfa5be08d..15aefc1a94 100644 --- a/pkg/gateway/mirror/handler/grpc/handler.go +++ b/pkg/gateway/mirror/handler/grpc/handler.go @@ -390,8 +390,8 @@ func (s *server) StreamSearch(stream vald.Search_StreamSearchServer) (err error) }() res, err := s.Search(ctx, req) if err != nil { - st, ok := status.FromError(err) - if !ok || st == nil || st.Message() == "" { + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Proto(), although it is unlikely to match this condition. log.Errorf("gRPC call returned not a gRPC status error: %v", err) st = status.New(codes.Unknown, "failed to parse "+vald.SearchRPCName+" gRPC error response") @@ -445,8 +445,8 @@ func (s *server) StreamSearchByID(stream vald.Search_StreamSearchByIDServer) (er }() res, err := s.SearchByID(ctx, req) if err != nil { - st, ok := status.FromError(err) - if !ok || st == nil || st.Message() == "" { + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Proto(), although it is unlikely to match this condition. log.Errorf("gRPC call returned not a gRPC status error: %v", err) st = status.New(codes.Unknown, "failed to parse "+vald.SearchByIDRPCName+" gRPC error response") @@ -770,8 +770,8 @@ func (s *server) StreamLinearSearch(stream vald.Search_StreamLinearSearchServer) }() res, err := s.LinearSearch(ctx, req) if err != nil { - st, ok := status.FromError(err) - if !ok || st == nil || st.Message() == "" { + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Proto(), although it is unlikely to match this condition. log.Errorf("gRPC call returned not a gRPC status error: %v", err) st = status.New(codes.Unknown, "failed to parse "+vald.LinearSearchRPCName+" gRPC error response") @@ -830,8 +830,8 @@ func (s *server) StreamLinearSearchByID( }() res, err := s.LinearSearchByID(ctx, req) if err != nil { - st, ok := status.FromError(err) - if !ok || st == nil || st.Message() == "" { + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Proto(), although it is unlikely to match this condition. log.Errorf("gRPC call returned not a gRPC status error: %v", err) st = status.New(codes.Unknown, "failed to parse "+vald.LinearSearchByIDRPCName+" gRPC error response") @@ -1073,8 +1073,8 @@ func (s *server) handleInsert( return vc.Insert(ctx, req, copts...) }) if err != nil { - st, ok := status.FromError(err) - if !ok || st == nil || st.Message() == "" { + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. log.Errorf("gRPC call returned not a gRPC status error: %v", err) st = status.New(codes.Unknown, "failed to parse "+vald.InsertRPCName+" gRPC error response") @@ -1230,8 +1230,8 @@ func (s *server) handleInsertResult( return vc.Update(ctx, req, copts...) }) if err != nil { - st, ok := status.FromError(err) - if !ok || st == nil || st.Message() == "" { + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. log.Errorf("gRPC call returned not a gRPC status error: %v", err) st = status.New(codes.Unknown, "failed to parse "+vald.UpdateRPCName+" gRPC error response") @@ -1427,8 +1427,8 @@ func (s *server) StreamInsert(stream vald.Insert_StreamInsertServer) (err error) }() res, err := s.Insert(ctx, req) if err != nil { - st, ok := status.FromError(err) - if !ok || st == nil || st.Message() == "" { + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Proto(), although it is unlikely to match this condition. log.Errorf("gRPC call returned not a gRPC status error: %v", err) st = status.New(codes.Unknown, "failed to parse "+vald.InsertRPCName+" gRPC error response") @@ -1775,8 +1775,8 @@ func (s *server) handleUpdateResult( return vc.Insert(ctx, req, copts...) }) if err != nil { - st, ok := status.FromError(err) - if !ok || st == nil || st.Message() == "" { + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. log.Errorf("gRPC call returned not a gRPC status error: %v", err) st = status.New(codes.Unknown, "failed to parse "+vald.InsertRPCName+" gRPC error response") @@ -1986,8 +1986,8 @@ func (s *server) StreamUpdate(stream vald.Update_StreamUpdateServer) (err error) }() res, err := s.Update(ctx, req) if err != nil { - st, ok := status.FromError(err) - if !ok || st == nil || st.Message() == "" { + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Proto(), although it is unlikely to match this condition. log.Errorf("gRPC call returned not a gRPC status error: %v", err) st = status.New(codes.Unknown, "failed to parse "+vald.UpdateRPCName+" gRPC error response") @@ -2163,8 +2163,8 @@ func (s *server) handleUpsert( return vc.Upsert(ctx, req, copts...) }) if err != nil { - st, ok := status.FromError(err) - if !ok || st == nil || st.Message() == "" { + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. log.Errorf("gRPC call returned not a gRPC status error: %v", err) st = status.New(codes.Unknown, "failed to parse "+vald.UpsertRPCName+" gRPC error response") @@ -2358,8 +2358,8 @@ func (s *server) StreamUpsert(stream vald.Upsert_StreamUpsertServer) (err error) }() res, err := s.Upsert(ctx, req) if err != nil { - st, ok := status.FromError(err) - if !ok || st == nil || st.Message() == "" { + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Proto(), although it is unlikely to match this condition. log.Errorf("gRPC call returned not a gRPC status error: %v", err) st = status.New(codes.Unknown, "failed to parse "+vald.UpsertRPCName+" gRPC error response") @@ -2727,8 +2727,8 @@ func (s *server) StreamRemove(stream vald.Remove_StreamRemoveServer) (err error) }() res, err := s.Remove(ctx, req) if err != nil { - st, ok := status.FromError(err) - if !ok || st == nil || st.Message() == "" { + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Proto(), although it is unlikely to match this condition. log.Errorf("gRPC call returned not a gRPC status error: %v", err) st = status.New(codes.Unknown, "failed to parse "+vald.RemoveRPCName+" gRPC error response") @@ -2857,8 +2857,8 @@ func (s *server) RemoveByTimestamp( return locs, errors.Join(derr, err) }) if err != nil { - st, ok := status.FromError(err) - if !ok || st == nil || st.Message() == "" { + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. log.Errorf("gRPC call returned not a gRPC status error: %v", err) st = status.New(codes.Unknown, "failed to parse "+vald.InsertRPCName+" gRPC error response") @@ -2907,8 +2907,8 @@ func (s *server) handleRemoveByTimestamp( return vc.RemoveByTimestamp(ctx, req, copts...) }) if err != nil { - st, ok := status.FromError(err) - if !ok || st == nil || st.Message() == "" { + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. log.Errorf("gRPC call returned not a gRPC status error: %v", err) st = status.New(codes.Unknown, "failed to parse "+vald.RemoveByTimestampRPCName+" gRPC error response") @@ -3164,8 +3164,8 @@ func (s *server) StreamGetObject(stream vald.Object_StreamGetObjectServer) (err }() res, err := s.GetObject(ctx, req) if err != nil { - st, ok := status.FromError(err) - if !ok || st == nil || st.Message() == "" { + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Proto(), although it is unlikely to match this condition. log.Errorf("gRPC call returned not a gRPC status error: %v", err) st = status.New(codes.Internal, "failed to parse "+vald.GetObjectRPCName+" gRPC error response") diff --git a/pkg/gateway/mirror/service/mirror.go b/pkg/gateway/mirror/service/mirror.go index ee9e79b041..773e43035d 100644 --- a/pkg/gateway/mirror/service/mirror.go +++ b/pkg/gateway/mirror/service/mirror.go @@ -215,8 +215,8 @@ func (m *mirr) registers( ) attrs = trace.StatusCodeInvalidArgument(err.Error()) default: - st, ok := status.FromError(err) - if !ok || st == nil || st.Message() == "" { + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code() and st.Message(), although it is unlikely to match this condition. log.Errorf("gRPC call returned not a gRPC status error: %v", err) st = status.New(codes.Unknown, "failed to parse "+mirror.RegisterRPCName+" gRPC error response") From f2e676c69432ad9fe37efb262556e61da2d371e2 Mon Sep 17 00:00:00 2001 From: hlts2 Date: Tue, 8 Oct 2024 13:50:44 +0900 Subject: [PATCH 09/11] fix: use FromError function for stream error Signed-off-by: hlts2 --- pkg/gateway/mirror/handler/grpc/handler.go | 64 ++++++++++------------ 1 file changed, 30 insertions(+), 34 deletions(-) diff --git a/pkg/gateway/mirror/handler/grpc/handler.go b/pkg/gateway/mirror/handler/grpc/handler.go index 75c22acd8c..348a0aee0a 100644 --- a/pkg/gateway/mirror/handler/grpc/handler.go +++ b/pkg/gateway/mirror/handler/grpc/handler.go @@ -415,11 +415,10 @@ func (s *server) StreamSearch(stream vald.Search_StreamSearchServer) (err error) }, ) if err != nil { - st, msg, err := status.ParseError(err, codes.Internal, - "failed to parse "+vald.StreamSearchRPCName+" gRPC error response") - if span != nil { + st, _ := status.FromError(err) + if st != nil && span != nil { span.RecordError(err) - span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) } return err @@ -470,11 +469,10 @@ func (s *server) StreamSearchByID(stream vald.Search_StreamSearchByIDServer) (er }, ) if err != nil { - st, msg, err := status.ParseError(err, codes.Internal, - "failed to parse "+vald.StreamSearchByIDRPCName+" gRPC error response") - if span != nil { + st, _ := status.FromError(err) + if st != nil && span != nil { span.RecordError(err) - span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) } return err @@ -795,11 +793,10 @@ func (s *server) StreamLinearSearch(stream vald.Search_StreamLinearSearchServer) }, ) if err != nil { - st, msg, err := status.ParseError(err, codes.Internal, - "failed to parse "+vald.StreamLinearSearchRPCName+" gRPC error response") - if span != nil { + st, _ := status.FromError(err) + if st != nil && span != nil { span.RecordError(err) - span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) } return err @@ -855,11 +852,10 @@ func (s *server) StreamLinearSearchByID( }, ) if err != nil { - st, msg, err := status.ParseError(err, codes.Internal, - "failed to parse "+vald.StreamLinearSearchByIDRPCName+" gRPC error response") - if span != nil { + st, _ := status.FromError(err) + if st != nil && span != nil { span.RecordError(err) - span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) } return err @@ -1452,10 +1448,10 @@ func (s *server) StreamInsert(stream vald.Insert_StreamInsertServer) (err error) }, ) if err != nil { - st, msg, err := status.ParseError(err, codes.Internal, "failed to parse "+vald.StreamInsertRPCName+" gRPC error response") - if span != nil { + st, _ := status.FromError(err) + if st != nil && span != nil { span.RecordError(err) - span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) } return err @@ -2011,10 +2007,10 @@ func (s *server) StreamUpdate(stream vald.Update_StreamUpdateServer) (err error) }, ) if err != nil { - st, msg, err := status.ParseError(err, codes.Internal, "failed to parse "+vald.StreamUpdateRPCName+" gRPC error response") - if span != nil { + st, _ := status.FromError(err) + if st != nil && span != nil { span.RecordError(err) - span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) } return err @@ -2383,10 +2379,10 @@ func (s *server) StreamUpsert(stream vald.Upsert_StreamUpsertServer) (err error) }, ) if err != nil { - st, msg, err := status.ParseError(err, codes.Internal, "failed to parse "+vald.StreamUpsertRPCName+" gRPC error response") - if span != nil { + st, _ := status.FromError(err) + if st != nil && span != nil { span.RecordError(err) - span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) } return err @@ -2752,10 +2748,10 @@ func (s *server) StreamRemove(stream vald.Remove_StreamRemoveServer) (err error) }, ) if err != nil { - st, msg, err := status.ParseError(err, codes.Internal, "failed to parse "+vald.StreamRemoveRPCName+" gRPC error response") - if span != nil { + st, _ := status.FromError(err) + if st != nil && span != nil { span.RecordError(err) - span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) } return err @@ -3189,10 +3185,10 @@ func (s *server) StreamGetObject(stream vald.Object_StreamGetObjectServer) (err }, ) if err != nil { - st, msg, err := status.ParseError(err, codes.Internal, "failed to parse "+vald.StreamGetObjectRPCName+" gRPC error response") - if span != nil { + st, _ := status.FromError(err) + if st != nil && span != nil { span.RecordError(err) - span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) } return err @@ -3225,10 +3221,10 @@ func (s *server) StreamListObject( return obj, s.doStreamListObject(ctx, client, stream) }) if err != nil { - st, msg, err := status.ParseError(err, codes.Internal, "failed to parse "+vald.StreamListObjectRPCName+" gRPC error response") - if span != nil { + st, _ := status.FromError(err) + if st != nil && span != nil { span.RecordError(err) - span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) } return err From a9cbae8eaeb37545509c960de6c1d3cbb65cfbbc Mon Sep 17 00:00:00 2001 From: hlts2 Date: Tue, 8 Oct 2024 14:15:39 +0900 Subject: [PATCH 10/11] fix: error status handling Signed-off-by: hlts2 --- pkg/gateway/mirror/handler/grpc/handler.go | 162 ++++++++++++++++----- 1 file changed, 126 insertions(+), 36 deletions(-) diff --git a/pkg/gateway/mirror/handler/grpc/handler.go b/pkg/gateway/mirror/handler/grpc/handler.go index 348a0aee0a..b23dd824d8 100644 --- a/pkg/gateway/mirror/handler/grpc/handler.go +++ b/pkg/gateway/mirror/handler/grpc/handler.go @@ -416,7 +416,12 @@ func (s *server) StreamSearch(stream vald.Search_StreamSearchServer) (err error) ) if err != nil { st, _ := status.FromError(err) - if st != nil && span != nil { + if st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. + log.Errorf("gRPC call returned not a gRPC status error: %v", err) + st = status.New(codes.Unknown, "failed to parse "+vald.StreamSearchRPCName+" gRPC error response") + } + if span != nil { span.RecordError(err) span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) @@ -470,7 +475,12 @@ func (s *server) StreamSearchByID(stream vald.Search_StreamSearchByIDServer) (er ) if err != nil { st, _ := status.FromError(err) - if st != nil && span != nil { + if st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. + log.Errorf("gRPC call returned not a gRPC status error: %v", err) + st = status.New(codes.Unknown, "failed to parse "+vald.StreamSearchByIDRPCName+" gRPC error response") + } + if span != nil { span.RecordError(err) span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) @@ -770,7 +780,7 @@ func (s *server) StreamLinearSearch(stream vald.Search_StreamLinearSearchServer) if err != nil { st, _ := status.FromError(err) if st == nil || st.Message() == "" { - // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Proto(), although it is unlikely to match this condition. + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. log.Errorf("gRPC call returned not a gRPC status error: %v", err) st = status.New(codes.Unknown, "failed to parse "+vald.LinearSearchRPCName+" gRPC error response") } @@ -794,7 +804,12 @@ func (s *server) StreamLinearSearch(stream vald.Search_StreamLinearSearchServer) ) if err != nil { st, _ := status.FromError(err) - if st != nil && span != nil { + if st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. + log.Errorf("gRPC call returned not a gRPC status error: %v", err) + st = status.New(codes.Unknown, "failed to parse "+vald.StreamLinearSearchRPCName+" gRPC error response") + } + if span != nil { span.RecordError(err) span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) @@ -829,7 +844,7 @@ func (s *server) StreamLinearSearchByID( if err != nil { st, _ := status.FromError(err) if st == nil || st.Message() == "" { - // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Proto(), although it is unlikely to match this condition. + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. log.Errorf("gRPC call returned not a gRPC status error: %v", err) st = status.New(codes.Unknown, "failed to parse "+vald.LinearSearchByIDRPCName+" gRPC error response") } @@ -853,7 +868,12 @@ func (s *server) StreamLinearSearchByID( ) if err != nil { st, _ := status.FromError(err) - if st != nil && span != nil { + if st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. + log.Errorf("gRPC call returned not a gRPC status error: %v", err) + st = status.New(codes.Unknown, "failed to parse "+vald.StreamLinearSearchByIDRPCName+" gRPC error response") + } + if span != nil { span.RecordError(err) span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) @@ -1022,8 +1042,13 @@ func (s *server) Insert( return loc, errors.Join(derr, err) }) if err != nil { - st, ok := status.FromError(err) - if ok && st != nil && st.Message() != "" && span != nil { + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. + log.Errorf("gRPC call returned not a gRPC status error: %v", err) + st = status.New(codes.Unknown, "failed to parse "+vald.InsertRPCName+" gRPC error response") + } + if span != nil { span.RecordError(err) span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) @@ -1425,7 +1450,7 @@ func (s *server) StreamInsert(stream vald.Insert_StreamInsertServer) (err error) if err != nil { st, _ := status.FromError(err) if st == nil || st.Message() == "" { - // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Proto(), although it is unlikely to match this condition. + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. log.Errorf("gRPC call returned not a gRPC status error: %v", err) st = status.New(codes.Unknown, "failed to parse "+vald.InsertRPCName+" gRPC error response") } @@ -1449,7 +1474,12 @@ func (s *server) StreamInsert(stream vald.Insert_StreamInsertServer) (err error) ) if err != nil { st, _ := status.FromError(err) - if st != nil && span != nil { + if st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. + log.Errorf("gRPC call returned not a gRPC status error: %v", err) + st = status.New(codes.Unknown, "failed to parse "+vald.StreamInsertRPCName+" gRPC error response") + } + if span != nil { span.RecordError(err) span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) @@ -1494,8 +1524,13 @@ func (s *server) MultiInsert( loc, err := s.Insert(ctx, req) if err != nil { - st, ok := status.FromError(err) - if ok && st != nil && st.Message() != "" && span != nil { + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. + log.Errorf("gRPC call returned not a gRPC status error: %v", err) + st = status.New(codes.Unknown, "failed to parse "+vald.InsertRPCName+" gRPC error response") + } + if span != nil { span.RecordError(err) span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) @@ -1553,8 +1588,13 @@ func (s *server) Update( return loc, errors.Join(derr, err) }) if err != nil { - st, ok := status.FromError(err) - if ok && st != nil && st.Message() != "" && span != nil { + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. + log.Errorf("gRPC call returned not a gRPC status error: %v", err) + st = status.New(codes.Unknown, "failed to parse "+vald.UpdateRPCName+" gRPC error response") + } + if span != nil { span.RecordError(err) span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) @@ -1600,8 +1640,8 @@ func (s *server) handleUpdate( return vc.Update(ctx, req, copts...) }) if err != nil { - st, ok := status.FromError(err) - if !ok || st == nil || st.Message() == "" { + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. log.Errorf("gRPC call returned not a gRPC status error: %v", err) st = status.New(codes.Unknown, "failed to parse "+vald.UpdateRPCName+" gRPC error response") @@ -2008,7 +2048,12 @@ func (s *server) StreamUpdate(stream vald.Update_StreamUpdateServer) (err error) ) if err != nil { st, _ := status.FromError(err) - if st != nil && span != nil { + if st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. + log.Errorf("gRPC call returned not a gRPC status error: %v", err) + st = status.New(codes.Unknown, "failed to parse "+vald.StreamUpdateRPCName+" gRPC error response") + } + if span != nil { span.RecordError(err) span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) @@ -2053,8 +2098,13 @@ func (s *server) MultiUpdate( loc, err := s.Update(ctx, req) if err != nil { - st, ok := status.FromError(err) - if ok && st != nil && st.Message() != "" && span != nil { + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. + log.Errorf("gRPC call returned not a gRPC status error: %v", err) + st = status.New(codes.Unknown, "failed to parse "+vald.UpdateRPCName+" gRPC error response") + } + if span != nil { span.RecordError(err) span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) @@ -2112,8 +2162,13 @@ func (s *server) Upsert( return loc, err }) if err != nil { - st, ok := status.FromError(err) - if ok && st != nil && st.Message() != "" && span != nil { + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. + log.Errorf("gRPC call returned not a gRPC status error: %v", err) + st = status.New(codes.Unknown, "failed to parse "+vald.UpsertRPCName+" gRPC error response") + } + if span != nil { span.RecordError(err) span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) @@ -2356,7 +2411,7 @@ func (s *server) StreamUpsert(stream vald.Upsert_StreamUpsertServer) (err error) if err != nil { st, _ := status.FromError(err) if st == nil || st.Message() == "" { - // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Proto(), although it is unlikely to match this condition. + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. log.Errorf("gRPC call returned not a gRPC status error: %v", err) st = status.New(codes.Unknown, "failed to parse "+vald.UpsertRPCName+" gRPC error response") } @@ -2380,7 +2435,12 @@ func (s *server) StreamUpsert(stream vald.Upsert_StreamUpsertServer) (err error) ) if err != nil { st, _ := status.FromError(err) - if st != nil && span != nil { + if st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. + log.Errorf("gRPC call returned not a gRPC status error: %v", err) + st = status.New(codes.Unknown, "failed to parse "+vald.StreamUpsertRPCName+" gRPC error response") + } + if span != nil { span.RecordError(err) span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) @@ -2425,8 +2485,13 @@ func (s *server) MultiUpsert( loc, err := s.Upsert(ctx, req) if err != nil { - st, ok := status.FromError(err) - if ok && st != nil && st.Message() != "" && span != nil { + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. + log.Errorf("gRPC call returned not a gRPC status error: %v", err) + st = status.New(codes.Unknown, "failed to parse "+vald.UpsertRPCName+" gRPC error response") + } + if span != nil { span.RecordError(err) span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) @@ -2484,8 +2549,13 @@ func (s *server) Remove( return loc, err }) if err != nil { - st, ok := status.FromError(err) - if ok && st != nil && st.Message() != "" && span != nil { + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. + log.Errorf("gRPC call returned not a gRPC status error: %v", err) + st = status.New(codes.Unknown, "failed to parse "+vald.RemoveRPCName+" gRPC error response") + } + if span != nil { span.RecordError(err) span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) @@ -2531,8 +2601,8 @@ func (s *server) handleRemove( return vc.Remove(ctx, req, copts...) }) if err != nil { - st, ok := status.FromError(err) - if !ok || st == nil || st.Message() == "" { + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. log.Errorf("gRPC call returned not a gRPC status error: %v", err) st = status.New(codes.Unknown, "failed to parse "+vald.RemoveRPCName+" gRPC error response") @@ -2725,7 +2795,7 @@ func (s *server) StreamRemove(stream vald.Remove_StreamRemoveServer) (err error) if err != nil { st, _ := status.FromError(err) if st == nil || st.Message() == "" { - // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Proto(), although it is unlikely to match this condition. + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. log.Errorf("gRPC call returned not a gRPC status error: %v", err) st = status.New(codes.Unknown, "failed to parse "+vald.RemoveRPCName+" gRPC error response") } @@ -2749,7 +2819,12 @@ func (s *server) StreamRemove(stream vald.Remove_StreamRemoveServer) (err error) ) if err != nil { st, _ := status.FromError(err) - if st != nil && span != nil { + if st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. + log.Errorf("gRPC call returned not a gRPC status error: %v", err) + st = status.New(codes.Unknown, "failed to parse "+vald.StreamRemoveRPCName+" gRPC error response") + } + if span != nil { span.RecordError(err) span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) @@ -2794,8 +2869,13 @@ func (s *server) MultiRemove( loc, err := s.Remove(ctx, req) if err != nil { - st, ok := status.FromError(err) - if ok && st != nil && st.Message() != "" && span != nil { + st, _ := status.FromError(err) + if st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. + log.Errorf("gRPC call returned not a gRPC status error: %v", err) + st = status.New(codes.Unknown, "failed to parse "+vald.RemoveRPCName+" gRPC error response") + } + if span != nil { span.RecordError(err) span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) @@ -3162,7 +3242,7 @@ func (s *server) StreamGetObject(stream vald.Object_StreamGetObjectServer) (err if err != nil { st, _ := status.FromError(err) if st == nil || st.Message() == "" { - // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Proto(), although it is unlikely to match this condition. + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. log.Errorf("gRPC call returned not a gRPC status error: %v", err) st = status.New(codes.Internal, "failed to parse "+vald.GetObjectRPCName+" gRPC error response") } @@ -3186,7 +3266,12 @@ func (s *server) StreamGetObject(stream vald.Object_StreamGetObjectServer) (err ) if err != nil { st, _ := status.FromError(err) - if st != nil && span != nil { + if st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving ,st.Code(), although it is unlikely to match this condition. + log.Errorf("gRPC call returned not a gRPC status error: %v", err) + st = status.New(codes.Unknown, "failed to parse "+vald.StreamGetObjectRPCName+" gRPC error response") + } + if span != nil { span.RecordError(err) span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) @@ -3222,7 +3307,12 @@ func (s *server) StreamListObject( }) if err != nil { st, _ := status.FromError(err) - if st != nil && span != nil { + if st == nil || st.Message() == "" { + // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. + log.Errorf("gRPC call returned not a gRPC status error: %v", err) + st = status.New(codes.Unknown, "failed to parse "+vald.StreamListObjectRPCName+" gRPC error response") + } + if span != nil { span.RecordError(err) span.SetAttributes(trace.FromGRPCStatus(st.Code(), st.Message())...) span.SetStatus(trace.StatusError, err.Error()) From 6c1084d1af124ea69852717837824178649cbee6 Mon Sep 17 00:00:00 2001 From: hlts2 Date: Tue, 8 Oct 2024 14:19:12 +0900 Subject: [PATCH 11/11] fix: typo Signed-off-by: hlts2 --- pkg/gateway/mirror/handler/grpc/handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/gateway/mirror/handler/grpc/handler.go b/pkg/gateway/mirror/handler/grpc/handler.go index b23dd824d8..847aaac933 100644 --- a/pkg/gateway/mirror/handler/grpc/handler.go +++ b/pkg/gateway/mirror/handler/grpc/handler.go @@ -2937,7 +2937,7 @@ func (s *server) RemoveByTimestamp( if st == nil || st.Message() == "" { // This condition is implemented just in case to prevent nil pointer errors when retrieving st.Code(), although it is unlikely to match this condition. log.Errorf("gRPC call returned not a gRPC status error: %v", err) - st = status.New(codes.Unknown, "failed to parse "+vald.InsertRPCName+" gRPC error response") + st = status.New(codes.Unknown, "failed to parse "+vald.RemoveByTimestampRPCName+" gRPC error response") } log.Warn(err) if span != nil {