From 760fa2843d8863ddc06bb32d15a09b0e97ae9636 Mon Sep 17 00:00:00 2001 From: congqixia Date: Mon, 9 Sep 2024 17:13:05 +0800 Subject: [PATCH] enhance: Use stats Handler to record request/response size metrics (#36107) Related to #36102 This PR use newly added `grpcSizeStatsHandler` to reduce calling `proto.Size` since the request & response size info is recorded by grpc framework. Signed-off-by: Congqi Xia --- internal/distributed/proxy/service.go | 11 ++ internal/proxy/impl.go | 61 +++++----- pkg/metrics/grpc_stats_handler.go | 153 ++++++++++++++++++++++++++ 3 files changed, 190 insertions(+), 35 deletions(-) create mode 100644 pkg/metrics/grpc_stats_handler.go diff --git a/internal/distributed/proxy/service.go b/internal/distributed/proxy/service.go index 9fd8c39e52a32..a98e8b0fbda52 100644 --- a/internal/distributed/proxy/service.go +++ b/internal/distributed/proxy/service.go @@ -323,6 +323,17 @@ func (s *Server) startExternalGrpc(grpcPort int, errChan chan error) { grpc.MaxSendMsgSize(Params.ServerMaxSendSize.GetAsInt()), unaryServerOption, grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()), + grpc.StatsHandler(metrics.NewGRPCSizeStatsHandler(). + // both inbound and outbound + WithTargetMethods( + milvuspb.MilvusService_Search_FullMethodName, + milvuspb.MilvusService_HybridSearch_FullMethodName, + milvuspb.MilvusService_Query_FullMethodName, + ). + // inbound only + WithInboundRecord(milvuspb.MilvusService_Insert_FullMethodName, + milvuspb.MilvusService_Delete_FullMethodName, + milvuspb.MilvusService_Upsert_FullMethodName)), } if Params.TLSMode.GetAsInt() == 1 { diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index 08d1f671f3f7c..f8be3b9fcf8c8 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -2504,9 +2504,10 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest) ) method := "Insert" tr := timerecord.NewTimeRecorder(method) - metrics.ProxyReceiveBytes.WithLabelValues( - strconv.FormatInt(paramtable.GetNodeID(), 10), - metrics.InsertLabel, request.GetCollectionName()).Add(float64(proto.Size(request))) + metrics.GetStats(ctx). + SetNodeID(paramtable.GetNodeID()). + SetInboundLabel(method). + SetCollectionName(request.GetCollectionName()) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc() it := &insertTask{ @@ -2630,10 +2631,12 @@ func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest) ) log.Debug("Start processing delete request in Proxy") defer log.Debug("Finish processing delete request in Proxy") + method := "Delete" - metrics.ProxyReceiveBytes.WithLabelValues( - strconv.FormatInt(paramtable.GetNodeID(), 10), - metrics.DeleteLabel, request.GetCollectionName()).Add(float64(proto.Size(request))) + metrics.GetStats(ctx). + SetNodeID(paramtable.GetNodeID()). + SetInboundLabel(method). + SetCollectionName(request.GetCollectionName()) if err := merr.CheckHealthy(node.GetStateCode()); err != nil { return &milvuspb.MutationResult{ @@ -2641,7 +2644,6 @@ func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest) }, nil } - method := "Delete" tr := timerecord.NewTimeRecorder(method) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, @@ -2740,9 +2742,11 @@ func (node *Proxy) Upsert(ctx context.Context, request *milvuspb.UpsertRequest) method := "Upsert" tr := timerecord.NewTimeRecorder(method) - metrics.ProxyReceiveBytes.WithLabelValues( - strconv.FormatInt(paramtable.GetNodeID(), 10), - metrics.UpsertLabel, request.GetCollectionName()).Add(float64(proto.Size(request))) + metrics.GetStats(ctx). + SetNodeID(paramtable.GetNodeID()). + SetInboundLabel(method). + SetCollectionName(request.GetCollectionName()) + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc() request.Base = commonpbutil.NewMsgBase( @@ -2895,12 +2899,10 @@ func (node *Proxy) Search(ctx context.Context, request *milvuspb.SearchRequest) } func (node *Proxy) search(ctx context.Context, request *milvuspb.SearchRequest) (*milvuspb.SearchResults, error) { - receiveSize := proto.Size(request) - metrics.ProxyReceiveBytes.WithLabelValues( - strconv.FormatInt(paramtable.GetNodeID(), 10), - metrics.SearchLabel, - request.GetCollectionName(), - ).Add(float64(receiveSize)) + metrics.GetStats(ctx). + SetNodeID(paramtable.GetNodeID()). + SetInboundLabel(metrics.SearchLabel). + SetCollectionName(request.GetCollectionName()) metrics.ProxyReceivedNQ.WithLabelValues( strconv.FormatInt(paramtable.GetNodeID(), 10), @@ -3082,8 +3084,6 @@ func (node *Proxy) search(ctx context.Context, request *milvuspb.SearchRequest) if merr.Ok(qt.result.GetStatus()) { metrics.ProxyReportValue.WithLabelValues(nodeID, hookutil.OpTypeSearch, dbName, username).Add(float64(v)) } - - metrics.ProxyReadReqSendBytes.WithLabelValues(nodeID).Add(float64(sentSize)) } return qt.result, nil } @@ -3107,12 +3107,10 @@ func (node *Proxy) HybridSearch(ctx context.Context, request *milvuspb.HybridSea } func (node *Proxy) hybridSearch(ctx context.Context, request *milvuspb.HybridSearchRequest) (*milvuspb.SearchResults, error) { - receiveSize := proto.Size(request) - metrics.ProxyReceiveBytes.WithLabelValues( - strconv.FormatInt(paramtable.GetNodeID(), 10), - metrics.HybridSearchLabel, - request.GetCollectionName(), - ).Add(float64(receiveSize)) + metrics.GetStats(ctx). + SetNodeID(paramtable.GetNodeID()). + SetInboundLabel(metrics.HybridSearchLabel). + SetCollectionName(request.GetCollectionName()) if err := merr.CheckHealthy(node.GetStateCode()); err != nil { return &milvuspb.SearchResults{ @@ -3271,8 +3269,6 @@ func (node *Proxy) hybridSearch(ctx context.Context, request *milvuspb.HybridSea if merr.Ok(qt.result.GetStatus()) { metrics.ProxyReportValue.WithLabelValues(nodeID, hookutil.OpTypeHybridSearch, dbName, username).Add(float64(v)) } - - metrics.ProxyReadReqSendBytes.WithLabelValues(nodeID).Add(float64(sentSize)) } return qt.result, nil } @@ -3532,12 +3528,10 @@ func (node *Proxy) Query(ctx context.Context, request *milvuspb.QueryRequest) (* } subLabel := GetCollectionRateSubLabel(request) - receiveSize := proto.Size(request) - metrics.ProxyReceiveBytes.WithLabelValues( - strconv.FormatInt(paramtable.GetNodeID(), 10), - metrics.QueryLabel, - request.GetCollectionName(), - ).Add(float64(receiveSize)) + metrics.GetStats(ctx). + SetNodeID(paramtable.GetNodeID()). + SetInboundLabel(metrics.QueryLabel). + SetCollectionName(request.GetCollectionName()) metrics.ProxyReceivedNQ.WithLabelValues( strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.SearchLabel, @@ -3580,9 +3574,6 @@ func (node *Proxy) Query(ctx context.Context, request *milvuspb.QueryRequest) (* request.GetCollectionName(), ).Inc() - sentSize := proto.Size(qt.result) - metrics.ProxyReadReqSendBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Add(float64(sentSize)) - username := GetCurUserFromContextOrDefault(ctx) nodeID := paramtable.GetStringNodeID() v := hookutil.GetExtension().Report(map[string]any{ diff --git a/pkg/metrics/grpc_stats_handler.go b/pkg/metrics/grpc_stats_handler.go new file mode 100644 index 0000000000000..0a473e83e88f8 --- /dev/null +++ b/pkg/metrics/grpc_stats_handler.go @@ -0,0 +1,153 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metrics + +import ( + "context" + "strconv" + + "google.golang.org/grpc/stats" + + "github.com/milvus-io/milvus/pkg/util/typeutil" +) + +// milvusGrpcKey is context key type. +type milvusGrpcKey struct{} + +// GrpcStats stores the meta and payload size info +// it should be attached to context so that request sizing could be avoided +type GrpcStats struct { + fullMethodName string + collectionName string + inboundPayloadSize int + inboundLabel string + nodeID int64 +} + +func (s *GrpcStats) SetCollectionName(collName string) *GrpcStats { + if s == nil { + return s + } + s.collectionName = collName + return s +} + +func (s *GrpcStats) SetInboundLabel(label string) *GrpcStats { + if s == nil { + return s + } + s.inboundLabel = label + return s +} + +func (s *GrpcStats) SetNodeID(nodeID int64) *GrpcStats { + if s == nil { + return s + } + s.nodeID = nodeID + return s +} + +func attachStats(ctx context.Context, stats *GrpcStats) context.Context { + return context.WithValue(ctx, milvusGrpcKey{}, stats) +} + +func GetStats(ctx context.Context) *GrpcStats { + stats, ok := ctx.Value(milvusGrpcKey{}).(*GrpcStats) + if !ok { + return nil + } + + return stats +} + +// grpcSizeStatsHandler implementing stats.Handler +// this handler process grpc request & response related metrics logic +type grpcSizeStatsHandler struct { + outboundMethods typeutil.Set[string] + targetMethods typeutil.Set[string] +} + +func NewGRPCSizeStatsHandler() *grpcSizeStatsHandler { + return &grpcSizeStatsHandler{ + targetMethods: make(typeutil.Set[string]), + outboundMethods: make(typeutil.Set[string]), + } +} + +func (h *grpcSizeStatsHandler) isTarget(method string) bool { + return h.targetMethods.Contain(method) +} + +func (h *grpcSizeStatsHandler) shouldRecordOutbound(method string) bool { + return h.outboundMethods.Contain(method) +} + +func (h *grpcSizeStatsHandler) WithTargetMethods(methods ...string) *grpcSizeStatsHandler { + h.targetMethods.Insert(methods...) + h.outboundMethods.Insert(methods...) + return h +} + +func (h *grpcSizeStatsHandler) WithInboundRecord(methods ...string) *grpcSizeStatsHandler { + h.targetMethods.Insert(methods...) + return h +} + +// TagConn exists to satisfy gRPC stats.Handler interface. +func (h *grpcSizeStatsHandler) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context { + return ctx +} + +// HandleConn exists to satisfy gRPC stats.Handler interface. +func (h *grpcSizeStatsHandler) HandleConn(_ context.Context, _ stats.ConnStats) {} + +func (h *grpcSizeStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context { + // if method is not target, just return origin ctx + if !h.isTarget(info.FullMethodName) { + return ctx + } + // attach stats + return attachStats(ctx, &GrpcStats{fullMethodName: info.FullMethodName}) +} + +// HandleRPC implements per-RPC stats instrumentation. +func (h *grpcSizeStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) { + mstats := GetStats(ctx) + // if no stats found, do nothing + if mstats == nil { + return + } + + switch rs := rs.(type) { + case *stats.InPayload: + // store inbound payload size in stats, collection name could be fetch in service after + mstats.inboundPayloadSize = rs.Length + case *stats.OutPayload: + // all info set + // set metrics with inbound size and related meta + nodeIDValue := strconv.FormatInt(mstats.nodeID, 10) + ProxyReceiveBytes.WithLabelValues( + nodeIDValue, + mstats.inboundLabel, mstats.collectionName).Add(float64(mstats.inboundPayloadSize)) + // set outbound payload size metrics for marked methods + if h.shouldRecordOutbound(mstats.fullMethodName) { + ProxyReadReqSendBytes.WithLabelValues(nodeIDValue).Add(float64(rs.Length)) + } + default: + } +}