Skip to content

Commit

Permalink
otelgrpc: Add metrics support to NewServerHandler and NewClientHandler (
Browse files Browse the repository at this point in the history
  • Loading branch information
fatsheep9146 authored Nov 6, 2023
1 parent 23181f7 commit 4c540e0
Show file tree
Hide file tree
Showing 8 changed files with 863 additions and 53 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Add metrics support (No-op, OTLP and Prometheus) to `go.opentelemetry.io/contrib/exporters/autoexport`. (#4229, #4479)
- Add support for `console` span exporter and metrics exporter in `go.opentelemetry.io/contrib/exporters/autoexport`. (#4486)
- Set unit and description on all instruments in `go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp`. (#4500)
- Add metric support for `grpc.StatsHandler` in `go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc`. (#4356)

### Changed

Expand Down
48 changes: 44 additions & 4 deletions instrumentation/google.golang.org/grpc/otelgrpc/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,14 @@ type config struct {
ReceivedEvent bool
SentEvent bool

meter metric.Meter
rpcServerDuration metric.Int64Histogram
tracer trace.Tracer
meter metric.Meter

rpcDuration metric.Float64Histogram
rpcRequestSize metric.Int64Histogram
rpcResponseSize metric.Int64Histogram
rpcRequestsPerRPC metric.Int64Histogram
rpcResponsesPerRPC metric.Int64Histogram
}

// Option applies an option value for a config.
Expand All @@ -56,7 +62,7 @@ type Option interface {
}

// newConfig returns a config configured with all the passed Options.
func newConfig(opts []Option) *config {
func newConfig(opts []Option, role string) *config {
c := &config{
Propagators: otel.GetTextMapPropagator(),
TracerProvider: otel.GetTracerProvider(),
Expand All @@ -66,19 +72,53 @@ func newConfig(opts []Option) *config {
o.apply(c)
}

c.tracer = c.TracerProvider.Tracer(
instrumentationName,
trace.WithInstrumentationVersion(SemVersion()),
)

c.meter = c.MeterProvider.Meter(
instrumentationName,
metric.WithInstrumentationVersion(Version()),
metric.WithSchemaURL(semconv.SchemaURL),
)

var err error
c.rpcServerDuration, err = c.meter.Int64Histogram("rpc.server.duration",
c.rpcDuration, err = c.meter.Float64Histogram("rpc."+role+".duration",
metric.WithDescription("Measures the duration of inbound RPC."),
metric.WithUnit("ms"))
if err != nil {
otel.Handle(err)
}

c.rpcRequestSize, err = c.meter.Int64Histogram("rpc."+role+".request.size",
metric.WithDescription("Measures size of RPC request messages (uncompressed)."),
metric.WithUnit("By"))
if err != nil {
otel.Handle(err)
}

c.rpcResponseSize, err = c.meter.Int64Histogram("rpc."+role+".response.size",
metric.WithDescription("Measures size of RPC response messages (uncompressed)."),
metric.WithUnit("By"))
if err != nil {
otel.Handle(err)
}

c.rpcRequestsPerRPC, err = c.meter.Int64Histogram("rpc."+role+".requests_per_rpc",
metric.WithDescription("Measures the number of messages received per RPC. Should be 1 for all non-streaming RPCs."),
metric.WithUnit("{count}"))
if err != nil {
otel.Handle(err)
}

c.rpcResponsesPerRPC, err = c.meter.Int64Histogram("rpc."+role+".responses_per_rpc",
metric.WithDescription("Measures the number of messages received per RPC. Should be 1 for all non-streaming RPCs."),
metric.WithUnit("{count}"))
if err != nil {
otel.Handle(err)
}

return c
}

Expand Down
10 changes: 5 additions & 5 deletions instrumentation/google.golang.org/grpc/otelgrpc/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ var (
// UnaryClientInterceptor returns a grpc.UnaryClientInterceptor suitable
// for use in a grpc.Dial call.
func UnaryClientInterceptor(opts ...Option) grpc.UnaryClientInterceptor {
cfg := newConfig(opts)
cfg := newConfig(opts, "client")
tracer := cfg.TracerProvider.Tracer(
instrumentationName,
trace.WithInstrumentationVersion(Version()),
Expand Down Expand Up @@ -255,7 +255,7 @@ func (w *clientStream) sendStreamEvent(eventType streamEventType, err error) {
// StreamClientInterceptor returns a grpc.StreamClientInterceptor suitable
// for use in a grpc.Dial call.
func StreamClientInterceptor(opts ...Option) grpc.StreamClientInterceptor {
cfg := newConfig(opts)
cfg := newConfig(opts, "client")
tracer := cfg.TracerProvider.Tracer(
instrumentationName,
trace.WithInstrumentationVersion(Version()),
Expand Down Expand Up @@ -325,7 +325,7 @@ func StreamClientInterceptor(opts ...Option) grpc.StreamClientInterceptor {
// UnaryServerInterceptor returns a grpc.UnaryServerInterceptor suitable
// for use in a grpc.NewServer call.
func UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor {
cfg := newConfig(opts)
cfg := newConfig(opts, "server")
tracer := cfg.TracerProvider.Tracer(
instrumentationName,
trace.WithInstrumentationVersion(Version()),
Expand Down Expand Up @@ -387,7 +387,7 @@ func UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor {

elapsedTime := time.Since(before).Milliseconds()
attr = append(attr, grpcStatusCodeAttr)
cfg.rpcServerDuration.Record(ctx, elapsedTime, metric.WithAttributes(attr...))
cfg.rpcDuration.Record(ctx, float64(elapsedTime), metric.WithAttributes(attr...))

return resp, err
}
Expand Down Expand Up @@ -446,7 +446,7 @@ func wrapServerStream(ctx context.Context, ss grpc.ServerStream, cfg *config) *s
// StreamServerInterceptor returns a grpc.StreamServerInterceptor suitable
// for use in a grpc.NewServer call.
func StreamServerInterceptor(opts ...Option) grpc.StreamServerInterceptor {
cfg := newConfig(opts)
cfg := newConfig(opts, "server")
tracer := cfg.TracerProvider.Tracer(
instrumentationName,
trace.WithInstrumentationVersion(Version()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (s *metadataSupplier) Keys() []string {
// requests.
// Deprecated: Unnecessary public func.
func Inject(ctx context.Context, md *metadata.MD, opts ...Option) {
c := newConfig(opts)
c := newConfig(opts, "")
c.Propagators.Inject(ctx, &metadataSupplier{
metadata: md,
})
Expand All @@ -78,7 +78,7 @@ func inject(ctx context.Context, propagators propagation.TextMapPropagator) cont
// This function is meant to be used on incoming requests.
// Deprecated: Unnecessary public func.
func Extract(ctx context.Context, md *metadata.MD, opts ...Option) (baggage.Baggage, trace.SpanContext) {
c := newConfig(opts)
c := newConfig(opts, "")
ctx = c.Propagators.Extract(ctx, &metadataSupplier{
metadata: md,
})
Expand Down
116 changes: 80 additions & 36 deletions instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@ package otelgrpc // import "go.opentelemetry.io/contrib/instrumentation/google.g
import (
"context"
"sync/atomic"
"time"

grpc_codes "google.golang.org/grpc/codes"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"

"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/internal"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/metric"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
"go.opentelemetry.io/otel/trace"
)
Expand All @@ -33,24 +36,32 @@ type gRPCContextKey struct{}
type gRPCContext struct {
messagesReceived int64
messagesSent int64
metricAttrs []attribute.KeyValue
}

type serverHandler struct {
*config
}

// NewServerHandler creates a stats.Handler for gRPC server.
func NewServerHandler(opts ...Option) stats.Handler {
h := &serverHandler{
config: newConfig(opts),
config: newConfig(opts, "server"),
}

h.tracer = h.config.TracerProvider.Tracer(
instrumentationName,
trace.WithInstrumentationVersion(SemVersion()),
)
return h
}

type serverHandler struct {
*config
tracer trace.Tracer
// TagConn can attach some information to the given context.
func (h *serverHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
span := trace.SpanFromContext(ctx)
attrs := peerAttr(peerFromCtx(ctx))
span.SetAttributes(attrs...)
return ctx
}

// HandleConn processes the Conn stats.
func (h *serverHandler) HandleConn(ctx context.Context, info stats.ConnStats) {
}

// TagRPC can attach some information to the given context.
Expand All @@ -66,46 +77,30 @@ func (h *serverHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) cont
trace.WithAttributes(attrs...),
)

gctx := gRPCContext{}
gctx := gRPCContext{
metricAttrs: attrs,
}
return context.WithValue(ctx, gRPCContextKey{}, &gctx)
}

// HandleRPC processes the RPC stats.
func (h *serverHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
handleRPC(ctx, rs)
}

// TagConn can attach some information to the given context.
func (h *serverHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
span := trace.SpanFromContext(ctx)
attrs := peerAttr(peerFromCtx(ctx))
span.SetAttributes(attrs...)
return ctx
h.handleRPC(ctx, rs)
}

// HandleConn processes the Conn stats.
func (h *serverHandler) HandleConn(ctx context.Context, info stats.ConnStats) {
type clientHandler struct {
*config
}

// NewClientHandler creates a stats.Handler for gRPC client.
func NewClientHandler(opts ...Option) stats.Handler {
h := &clientHandler{
config: newConfig(opts),
config: newConfig(opts, "client"),
}

h.tracer = h.config.TracerProvider.Tracer(
instrumentationName,
trace.WithInstrumentationVersion(SemVersion()),
)

return h
}

type clientHandler struct {
*config
tracer trace.Tracer
}

// TagRPC can attach some information to the given context.
func (h *clientHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
name, attrs := internal.ParseFullMethod(info.FullMethodName)
Expand All @@ -117,14 +112,16 @@ func (h *clientHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) cont
trace.WithAttributes(attrs...),
)

gctx := gRPCContext{}
gctx := gRPCContext{
metricAttrs: attrs,
}

return inject(context.WithValue(ctx, gRPCContextKey{}, &gctx), h.config.Propagators)
}

// HandleRPC processes the RPC stats.
func (h *clientHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
handleRPC(ctx, rs)
h.handleRPC(ctx, rs)
}

// TagConn can attach some information to the given context.
Expand All @@ -140,17 +137,22 @@ func (h *clientHandler) HandleConn(context.Context, stats.ConnStats) {
// no-op
}

func handleRPC(ctx context.Context, rs stats.RPCStats) {
func (c *config) handleRPC(ctx context.Context, rs stats.RPCStats) {
span := trace.SpanFromContext(ctx)
gctx, _ := ctx.Value(gRPCContextKey{}).(*gRPCContext)
var messageId int64
metricAttrs := make([]attribute.KeyValue, 0, len(gctx.metricAttrs)+1)
metricAttrs = append(metricAttrs, gctx.metricAttrs...)
wctx := withoutCancel(ctx)

switch rs := rs.(type) {
case *stats.Begin:
case *stats.InPayload:
if gctx != nil {
messageId = atomic.AddInt64(&gctx.messagesReceived, 1)
c.rpcRequestSize.Record(wctx, int64(rs.Length), metric.WithAttributes(metricAttrs...))
}

span.AddEvent("message",
trace.WithAttributes(
semconv.MessageTypeReceived,
Expand All @@ -162,6 +164,7 @@ func handleRPC(ctx context.Context, rs stats.RPCStats) {
case *stats.OutPayload:
if gctx != nil {
messageId = atomic.AddInt64(&gctx.messagesSent, 1)
c.rpcResponseSize.Record(wctx, int64(rs.Length), metric.WithAttributes(metricAttrs...))
}

span.AddEvent("message",
Expand All @@ -172,16 +175,57 @@ func handleRPC(ctx context.Context, rs stats.RPCStats) {
semconv.MessageUncompressedSizeKey.Int(rs.Length),
),
)
case *stats.OutTrailer:
case *stats.End:
var rpcStatusAttr attribute.KeyValue

if rs.Error != nil {
s, _ := status.FromError(rs.Error)
span.SetStatus(codes.Error, s.Message())
span.SetAttributes(statusCodeAttr(s.Code()))
rpcStatusAttr = semconv.RPCGRPCStatusCodeKey.Int(int(s.Code()))
} else {
span.SetAttributes(statusCodeAttr(grpc_codes.OK))
rpcStatusAttr = semconv.RPCGRPCStatusCodeKey.Int(int(grpc_codes.OK))
}
span.SetAttributes(rpcStatusAttr)
span.End()

metricAttrs = append(metricAttrs, rpcStatusAttr)
c.rpcDuration.Record(wctx, float64(rs.EndTime.Sub(rs.BeginTime)), metric.WithAttributes(metricAttrs...))
c.rpcRequestsPerRPC.Record(wctx, gctx.messagesReceived, metric.WithAttributes(metricAttrs...))
c.rpcResponsesPerRPC.Record(wctx, gctx.messagesSent, metric.WithAttributes(metricAttrs...))

default:
return
}
}

func withoutCancel(parent context.Context) context.Context {
if parent == nil {
panic("cannot create context from nil parent")
}
return withoutCancelCtx{parent}
}

type withoutCancelCtx struct {
c context.Context
}

func (withoutCancelCtx) Deadline() (deadline time.Time, ok bool) {
return
}

func (withoutCancelCtx) Done() <-chan struct{} {
return nil
}

func (withoutCancelCtx) Err() error {
return nil
}

func (w withoutCancelCtx) Value(key any) any {
return w.c.Value(key)
}

func (w withoutCancelCtx) String() string {
return "withoutCancel"
}
Loading

0 comments on commit 4c540e0

Please sign in to comment.