From 02f9e15b874eb70e446b7f19677416a355b7e1a3 Mon Sep 17 00:00:00 2001 From: David Howden Date: Fri, 6 Sep 2024 08:18:41 +1000 Subject: [PATCH 1/2] Allow setting per-request custom metrics in gRPC stats handler. Similar to https://github.com/open-telemetry/opentelemetry-go-contrib/pull/5876 but for gRPC. --- .../google.golang.org/grpc/otelgrpc/config.go | 35 ++++++++++++++----- .../grpc/otelgrpc/stats_handler.go | 18 ++++++++-- 2 files changed, 42 insertions(+), 11 deletions(-) diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/config.go b/instrumentation/google.golang.org/grpc/otelgrpc/config.go index 18436eaedff..dbfc68fd8c5 100644 --- a/instrumentation/google.golang.org/grpc/otelgrpc/config.go +++ b/instrumentation/google.golang.org/grpc/otelgrpc/config.go @@ -4,6 +4,8 @@ package otelgrpc // import "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" import ( + "context" + "google.golang.org/grpc/stats" "go.opentelemetry.io/otel" @@ -36,14 +38,15 @@ type Filter func(*stats.RPCTagInfo) bool // config is a group of options for this instrumentation. type config struct { - Filter Filter - InterceptorFilter InterceptorFilter - Propagators propagation.TextMapPropagator - TracerProvider trace.TracerProvider - MeterProvider metric.MeterProvider - SpanStartOptions []trace.SpanStartOption - SpanAttributes []attribute.KeyValue - MetricAttributes []attribute.KeyValue + Filter Filter + InterceptorFilter InterceptorFilter + Propagators propagation.TextMapPropagator + TracerProvider trace.TracerProvider + MeterProvider metric.MeterProvider + SpanStartOptions []trace.SpanStartOption + SpanAttributes []attribute.KeyValue + MetricAttributes []attribute.KeyValue + MetricAttributesFn func(ctx context.Context, payload any) []attribute.KeyValue ReceivedEvent bool SentEvent bool @@ -285,3 +288,19 @@ func (o metricAttributesOption) apply(c *config) { func WithMetricAttributes(a ...attribute.KeyValue) Option { return metricAttributesOption{a: a} } + +type metricAttributesFnOption struct { + f func(ctx context.Context, payload any) []attribute.KeyValue +} + +func (o metricAttributesFnOption) apply(c *config) { + if o.f != nil { + c.MetricAttributesFn = o.f + } +} + +// WithMetricAttributesFn returns an Option to add custom attributes to the metrics +// based on the incoming request. +func WithMetricAttributesFn(f func(ctx context.Context, payload any) []attribute.KeyValue) Option { + return metricAttributesFnOption{f: f} +} diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go b/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go index fbcbfb84e04..491bd37dd7c 100644 --- a/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go +++ b/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go @@ -147,10 +147,19 @@ func (c *config) handleRPC(ctx context.Context, rs stats.RPCStats, isServer bool } switch rs := rs.(type) { - case *stats.Begin: case *stats.InPayload: if gctx != nil { - messageId = atomic.AddInt64(&gctx.messagesReceived, 1) + // Run this once on InPayload and record the attributes for the entire RPC. + if c.MetricAttributesFn != nil { + fnMetricAttrs := c.MetricAttributesFn(ctx, rs.Payload) + + // Record them for the entire RPC. + gctx.metricAttrs = append(gctx.metricAttrs, fnMetricAttrs...) + + // For this run we need to manually add them to the metricAttrs slice. + metricAttrs = append(metricAttrs, fnMetricAttrs...) + } + c.rpcRequestSize.Record(ctx, int64(rs.Length), metric.WithAttributeSet(attribute.NewSet(metricAttrs...))) } @@ -164,6 +173,7 @@ func (c *config) handleRPC(ctx context.Context, rs stats.RPCStats, isServer bool ), ) } + case *stats.OutPayload: if gctx != nil { messageId = atomic.AddInt64(&gctx.messagesSent, 1) @@ -180,11 +190,12 @@ func (c *config) handleRPC(ctx context.Context, rs stats.RPCStats, isServer bool ), ) } - case *stats.OutTrailer: + case *stats.OutHeader: if p, ok := peer.FromContext(ctx); ok { span.SetAttributes(peerAttr(p.Addr.String())...) } + case *stats.End: var rpcStatusAttr attribute.KeyValue @@ -216,6 +227,7 @@ func (c *config) handleRPC(ctx context.Context, rs stats.RPCStats, isServer bool c.rpcRequestsPerRPC.Record(ctx, atomic.LoadInt64(&gctx.messagesReceived), recordOpts...) c.rpcResponsesPerRPC.Record(ctx, atomic.LoadInt64(&gctx.messagesSent), recordOpts...) } + default: return } From bdb739e7c4cd4689487def6731e4731a3b4e0c44 Mon Sep 17 00:00:00 2001 From: David Howden Date: Fri, 6 Sep 2024 21:39:38 +1000 Subject: [PATCH 2/2] fix stmt accidentally removed --- .../google.golang.org/grpc/otelgrpc/stats_handler.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go b/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go index 491bd37dd7c..8e7927614ea 100644 --- a/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go +++ b/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go @@ -149,6 +149,8 @@ func (c *config) handleRPC(ctx context.Context, rs stats.RPCStats, isServer bool switch rs := rs.(type) { case *stats.InPayload: if gctx != nil { + messageId = atomic.AddInt64(&gctx.messagesReceived, 1) + // Run this once on InPayload and record the attributes for the entire RPC. if c.MetricAttributesFn != nil { fnMetricAttrs := c.MetricAttributesFn(ctx, rs.Payload)