Skip to content

Commit

Permalink
enhance: Use stats Handler to record request/response size metrics
Browse files Browse the repository at this point in the history
Related to milvus-io#36102

This PR use newly added `grpcSizeStatsHandler` to
reduce calling `proto.Size` since the request &
response size info is recorded by grpc framework.

Signed-off-by: Congqi Xia <[email protected]>
  • Loading branch information
congqixia committed Sep 9, 2024
1 parent 3123093 commit 39cefd4
Show file tree
Hide file tree
Showing 3 changed files with 190 additions and 35 deletions.
11 changes: 11 additions & 0 deletions internal/distributed/proxy/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,17 @@ func (s *Server) startExternalGrpc(grpcPort int, errChan chan error) {
grpc.MaxSendMsgSize(Params.ServerMaxSendSize.GetAsInt()),
unaryServerOption,
grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()),
grpc.StatsHandler(metrics.NewGRPCSizeStatsHandler().
// both inbound and outbound
WithTargetMethods(
milvuspb.MilvusService_Search_FullMethodName,
milvuspb.MilvusService_HybridSearch_FullMethodName,
milvuspb.MilvusService_Query_FullMethodName,
).
// inbound only
WithInboundRecord(milvuspb.MilvusService_Insert_FullMethodName,
milvuspb.MilvusService_Delete_FullMethodName,
milvuspb.MilvusService_Upsert_FullMethodName)),
}

if Params.TLSMode.GetAsInt() == 1 {
Expand Down
61 changes: 26 additions & 35 deletions internal/proxy/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -2505,9 +2505,10 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest)
)
method := "Insert"
tr := timerecord.NewTimeRecorder(method)
metrics.ProxyReceiveBytes.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10),
metrics.InsertLabel, request.GetCollectionName()).Add(float64(proto.Size(request)))
metrics.GetStats(ctx).
SetNodeID(paramtable.GetNodeID()).
SetInboundLabel(method).
SetCollectionName(request.GetCollectionName())
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc()

it := &insertTask{
Expand Down Expand Up @@ -2637,18 +2638,19 @@ func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest)
)
log.Debug("Start processing delete request in Proxy")
defer log.Debug("Finish processing delete request in Proxy")
method := "Delete"

metrics.ProxyReceiveBytes.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10),
metrics.DeleteLabel, request.GetCollectionName()).Add(float64(proto.Size(request)))
metrics.GetStats(ctx).
SetNodeID(paramtable.GetNodeID()).
SetInboundLabel(method).
SetCollectionName(request.GetCollectionName())

if err := merr.CheckHealthy(node.GetStateCode()); err != nil {
return &milvuspb.MutationResult{
Status: merr.Status(err),
}, nil
}

method := "Delete"
tr := timerecord.NewTimeRecorder(method)

metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
Expand Down Expand Up @@ -2747,9 +2749,11 @@ func (node *Proxy) Upsert(ctx context.Context, request *milvuspb.UpsertRequest)
method := "Upsert"
tr := timerecord.NewTimeRecorder(method)

metrics.ProxyReceiveBytes.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10),
metrics.UpsertLabel, request.GetCollectionName()).Add(float64(proto.Size(request)))
metrics.GetStats(ctx).
SetNodeID(paramtable.GetNodeID()).
SetInboundLabel(method).
SetCollectionName(request.GetCollectionName())

metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc()

request.Base = commonpbutil.NewMsgBase(
Expand Down Expand Up @@ -2908,12 +2912,10 @@ func (node *Proxy) Search(ctx context.Context, request *milvuspb.SearchRequest)
}

func (node *Proxy) search(ctx context.Context, request *milvuspb.SearchRequest) (*milvuspb.SearchResults, error) {
receiveSize := proto.Size(request)
metrics.ProxyReceiveBytes.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10),
metrics.SearchLabel,
request.GetCollectionName(),
).Add(float64(receiveSize))
metrics.GetStats(ctx).
SetNodeID(paramtable.GetNodeID()).
SetInboundLabel(metrics.SearchLabel).
SetCollectionName(request.GetCollectionName())

metrics.ProxyReceivedNQ.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10),
Expand Down Expand Up @@ -3095,8 +3097,6 @@ func (node *Proxy) search(ctx context.Context, request *milvuspb.SearchRequest)
if merr.Ok(qt.result.GetStatus()) {
metrics.ProxyReportValue.WithLabelValues(nodeID, hookutil.OpTypeSearch, dbName, username).Add(float64(v))
}

metrics.ProxyReadReqSendBytes.WithLabelValues(nodeID).Add(float64(sentSize))
}
return qt.result, nil
}
Expand All @@ -3120,12 +3120,10 @@ func (node *Proxy) HybridSearch(ctx context.Context, request *milvuspb.HybridSea
}

func (node *Proxy) hybridSearch(ctx context.Context, request *milvuspb.HybridSearchRequest) (*milvuspb.SearchResults, error) {
receiveSize := proto.Size(request)
metrics.ProxyReceiveBytes.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10),
metrics.HybridSearchLabel,
request.GetCollectionName(),
).Add(float64(receiveSize))
metrics.GetStats(ctx).
SetNodeID(paramtable.GetNodeID()).
SetInboundLabel(metrics.HybridSearchLabel).
SetCollectionName(request.GetCollectionName())

if err := merr.CheckHealthy(node.GetStateCode()); err != nil {
return &milvuspb.SearchResults{
Expand Down Expand Up @@ -3284,8 +3282,6 @@ func (node *Proxy) hybridSearch(ctx context.Context, request *milvuspb.HybridSea
if merr.Ok(qt.result.GetStatus()) {
metrics.ProxyReportValue.WithLabelValues(nodeID, hookutil.OpTypeHybridSearch, dbName, username).Add(float64(v))
}

metrics.ProxyReadReqSendBytes.WithLabelValues(nodeID).Add(float64(sentSize))
}
return qt.result, nil
}
Expand Down Expand Up @@ -3552,12 +3548,10 @@ func (node *Proxy) Query(ctx context.Context, request *milvuspb.QueryRequest) (*
}

subLabel := GetCollectionRateSubLabel(request)
receiveSize := proto.Size(request)
metrics.ProxyReceiveBytes.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10),
metrics.QueryLabel,
request.GetCollectionName(),
).Add(float64(receiveSize))
metrics.GetStats(ctx).
SetNodeID(paramtable.GetNodeID()).
SetInboundLabel(metrics.QueryLabel).
SetCollectionName(request.GetCollectionName())
metrics.ProxyReceivedNQ.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10),
metrics.SearchLabel,
Expand Down Expand Up @@ -3599,9 +3593,6 @@ func (node *Proxy) Query(ctx context.Context, request *milvuspb.QueryRequest) (*
request.GetCollectionName(),
).Inc()

sentSize := proto.Size(qt.result)
metrics.ProxyReadReqSendBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Add(float64(sentSize))

username := GetCurUserFromContextOrDefault(ctx)
nodeID := paramtable.GetStringNodeID()
v := hookutil.GetExtension().Report(map[string]any{
Expand Down
153 changes: 153 additions & 0 deletions pkg/metrics/grpc_stats_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metrics

import (
"context"
"strconv"

"google.golang.org/grpc/stats"

"github.com/milvus-io/milvus/pkg/util/typeutil"
)

// milvusGrpcKey is context key type.
type milvusGrpcKey struct{}

// GrpcStats stores the meta and payload size info
// it should be attached to context so that request sizing could be avoided
type GrpcStats struct {
fullMethodName string
collectionName string
inboundPayloadSize int
inboundLabel string
nodeID int64
}

func (s *GrpcStats) SetCollectionName(collName string) *GrpcStats {
if s == nil {
return s
}
s.collectionName = collName
return s
}

func (s *GrpcStats) SetInboundLabel(label string) *GrpcStats {
if s == nil {
return s
}
s.inboundLabel = label
return s
}

func (s *GrpcStats) SetNodeID(nodeID int64) *GrpcStats {
if s == nil {
return s
}
s.nodeID = nodeID
return s
}

func attachStats(ctx context.Context, stats *GrpcStats) context.Context {
return context.WithValue(ctx, milvusGrpcKey{}, stats)
}

func GetStats(ctx context.Context) *GrpcStats {
stats, ok := ctx.Value(milvusGrpcKey{}).(*GrpcStats)
if !ok {
return nil
}

return stats
}

// grpcSizeStatsHandler implementing stats.Handler
// this handler process grpc request & response related metrics logic
type grpcSizeStatsHandler struct {
outboundMethods typeutil.Set[string]
targetMethods typeutil.Set[string]
}

func NewGRPCSizeStatsHandler() *grpcSizeStatsHandler {
return &grpcSizeStatsHandler{
targetMethods: make(typeutil.Set[string]),
outboundMethods: make(typeutil.Set[string]),
}
}

func (h *grpcSizeStatsHandler) isTarget(method string) bool {
return h.targetMethods.Contain(method)
}

func (h *grpcSizeStatsHandler) shouldRecordOutbound(method string) bool {
return h.outboundMethods.Contain(method)
}

func (h *grpcSizeStatsHandler) WithTargetMethods(methods ...string) *grpcSizeStatsHandler {
h.targetMethods.Insert(methods...)
h.outboundMethods.Insert(methods...)
return h
}

func (h *grpcSizeStatsHandler) WithInboundRecord(methods ...string) *grpcSizeStatsHandler {
h.targetMethods.Insert(methods...)
return h
}

// TagConn exists to satisfy gRPC stats.Handler interface.
func (h *grpcSizeStatsHandler) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context {
return ctx
}

// HandleConn exists to satisfy gRPC stats.Handler interface.
func (h *grpcSizeStatsHandler) HandleConn(_ context.Context, _ stats.ConnStats) {}

func (h *grpcSizeStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
// if method is not target, just return origin ctx
if !h.isTarget(info.FullMethodName) {
return ctx
}
// attach stats
return attachStats(ctx, &GrpcStats{fullMethodName: info.FullMethodName})
}

// HandleRPC implements per-RPC stats instrumentation.
func (h *grpcSizeStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
mstats := GetStats(ctx)
// if no stats found, do nothing
if mstats == nil {
return
}

switch rs := rs.(type) {
case *stats.InPayload:
// store inbound payload size in stats, collection name could be fetch in service after
mstats.inboundPayloadSize = rs.Length
case *stats.OutPayload:
// all info set
// set metrics with inbound size and related meta
nodeIDValue := strconv.FormatInt(mstats.nodeID, 10)
ProxyReceiveBytes.WithLabelValues(
nodeIDValue,
mstats.inboundLabel, mstats.collectionName).Add(float64(mstats.inboundPayloadSize))
// set outbound payload size metrics for marked methods
if h.shouldRecordOutbound(mstats.fullMethodName) {
ProxyReadReqSendBytes.WithLabelValues(nodeIDValue).Add(float64(rs.Length))
}
default:
}
}

0 comments on commit 39cefd4

Please sign in to comment.