From 567c010f26a11882879242e2732361fb5bc014b7 Mon Sep 17 00:00:00 2001 From: Abhishek Ranjan Date: Thu, 19 Dec 2024 12:44:46 +0530 Subject: [PATCH] enable otel dial option to pass empty MetricOptions{} --- stats/opentelemetry/client_metrics.go | 25 +++++++++----- stats/opentelemetry/e2e_test.go | 48 +++++++++++++++++++++++++++ stats/opentelemetry/opentelemetry.go | 19 +++++++++++ stats/opentelemetry/server_metrics.go | 26 ++++++++++----- 4 files changed, 100 insertions(+), 18 deletions(-) diff --git a/stats/opentelemetry/client_metrics.go b/stats/opentelemetry/client_metrics.go index 4af7f933c8ba..7b419b1b6913 100644 --- a/stats/opentelemetry/client_metrics.go +++ b/stats/opentelemetry/client_metrics.go @@ -18,6 +18,7 @@ package opentelemetry import ( "context" + "go.opentelemetry.io/otel/sdk/metric" "sync/atomic" "time" @@ -38,29 +39,35 @@ type clientStatsHandler struct { clientMetrics clientMetrics } -func (h *clientStatsHandler) initializeMetrics() { - // Will set no metrics to record, logically making this stats handler a - // no-op. - if h.options.MetricsOptions.MeterProvider == nil { - return - } - +func (h *clientStatsHandler) setClientMetrics() (*estats.Metrics, otelmetric.Meter) { meter := h.options.MetricsOptions.MeterProvider.Meter("grpc-go", otelmetric.WithInstrumentationVersion(grpc.Version)) if meter == nil { - return + return nil, nil } metrics := h.options.MetricsOptions.Metrics if metrics == nil { metrics = DefaultMetrics() } - h.clientMetrics.attemptStarted = createInt64Counter(metrics.Metrics(), "grpc.client.attempt.started", meter, otelmetric.WithUnit("attempt"), otelmetric.WithDescription("Number of client call attempts started.")) h.clientMetrics.attemptDuration = createFloat64Histogram(metrics.Metrics(), "grpc.client.attempt.duration", meter, otelmetric.WithUnit("s"), otelmetric.WithDescription("End-to-end time taken to complete a client call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultLatencyBounds...)) h.clientMetrics.attemptSentTotalCompressedMessageSize = createInt64Histogram(metrics.Metrics(), "grpc.client.attempt.sent_total_compressed_message_size", meter, otelmetric.WithUnit("By"), otelmetric.WithDescription("Compressed message bytes sent per client call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultSizeBounds...)) h.clientMetrics.attemptRcvdTotalCompressedMessageSize = createInt64Histogram(metrics.Metrics(), "grpc.client.attempt.rcvd_total_compressed_message_size", meter, otelmetric.WithUnit("By"), otelmetric.WithDescription("Compressed message bytes received per call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultSizeBounds...)) h.clientMetrics.callDuration = createFloat64Histogram(metrics.Metrics(), "grpc.client.call.duration", meter, otelmetric.WithUnit("s"), otelmetric.WithDescription("Time taken by gRPC to complete an RPC from application's perspective."), otelmetric.WithExplicitBucketBoundaries(DefaultLatencyBounds...)) + return metrics, meter +} + +func (h *clientStatsHandler) initializeMetrics() { + // Will set no metrics to record, logically making this stats handler a + // no-op. + if h.options.MetricsOptions.MeterProvider == nil { + h.MetricsRecorder = &NoopMetricsRecorder{} + h.options.MetricsOptions.MeterProvider = metric.NewMeterProvider() + h.setClientMetrics() + return + } + metrics, meter := h.setClientMetrics() rm := ®istryMetrics{ optionalLabels: h.options.MetricsOptions.OptionalLabels, } diff --git a/stats/opentelemetry/e2e_test.go b/stats/opentelemetry/e2e_test.go index e56c0fe94805..e93772520371 100644 --- a/stats/opentelemetry/e2e_test.go +++ b/stats/opentelemetry/e2e_test.go @@ -582,3 +582,51 @@ func pollForWantMetrics(ctx context.Context, t *testing.T, reader *metric.Manual return fmt.Errorf("error waiting for metrics %v: %v", wantMetrics, ctx.Err()) } + +// TestMetricsDisabled verifies that RPCs call succeed as expected when +// metrics option is disabled in the OpenTelemetry instrumentation. +func (s) TestMetricsDisabled(t *testing.T) { + ss := &stubserver.StubServer{ + UnaryCallF: func(_ context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + return &testpb.SimpleResponse{Payload: &testpb.Payload{ + Body: make([]byte, len(in.GetPayload().GetBody())), + }}, nil + }, + FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error { + for { + _, err := stream.Recv() + if err == io.EOF { + return nil + } + } + }, + } + + otelOptions := opentelemetry.Options{ + MetricsOptions: opentelemetry.MetricsOptions{}, + } + + if err := ss.Start([]grpc.ServerOption{opentelemetry.ServerOption(otelOptions)}, opentelemetry.DialOption(otelOptions)); err != nil { + t.Fatalf("Error starting endpoint server: %v", err) + } + defer ss.Stop() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // Make two RPCs, a unary RPC and a streaming RPC. + if _, err := ss.Client.UnaryCall(ctx, &testpb.SimpleRequest{Payload: &testpb.Payload{ + Body: make([]byte, 10000), + }}); err != nil { + t.Fatalf("Unexpected error from UnaryCall: %v", err) + } + stream, err := ss.Client.FullDuplexCall(ctx) + if err != nil { + t.Fatalf("ss.Client.FullDuplexCall failed: %v", err) + } + + stream.CloseSend() + if _, err = stream.Recv(); err != io.EOF { + t.Fatalf("stream.Recv received an unexpected error: %v, expected an EOF error", err) + } +} diff --git a/stats/opentelemetry/opentelemetry.go b/stats/opentelemetry/opentelemetry.go index cc5ad387fb4c..c9fbd64e5080 100644 --- a/stats/opentelemetry/opentelemetry.go +++ b/stats/opentelemetry/opentelemetry.go @@ -388,3 +388,22 @@ var ( func DefaultMetrics() *estats.Metrics { return defaultPerCallMetrics.Join(estats.DefaultMetrics) } + +// NoopMetricsRecorder is a noop MetricsRecorder to be used in tests to prevent +// nil panics. +type NoopMetricsRecorder struct{} + +// RecordInt64Count is a noop implementation of RecordInt64Count. +func (r *NoopMetricsRecorder) RecordInt64Count(*estats.Int64CountHandle, int64, ...string) {} + +// RecordFloat64Count is a noop implementation of RecordFloat64Count. +func (r *NoopMetricsRecorder) RecordFloat64Count(*estats.Float64CountHandle, float64, ...string) {} + +// RecordInt64Histo is a noop implementation of RecordInt64Histo. +func (r *NoopMetricsRecorder) RecordInt64Histo(*estats.Int64HistoHandle, int64, ...string) {} + +// RecordFloat64Histo is a noop implementation of RecordFloat64Histo. +func (r *NoopMetricsRecorder) RecordFloat64Histo(*estats.Float64HistoHandle, float64, ...string) {} + +// RecordInt64Gauge is a noop implementation of RecordInt64Gauge. +func (r *NoopMetricsRecorder) RecordInt64Gauge(*estats.Int64GaugeHandle, int64, ...string) {} diff --git a/stats/opentelemetry/server_metrics.go b/stats/opentelemetry/server_metrics.go index eaea559b2c10..3f7946d84a07 100644 --- a/stats/opentelemetry/server_metrics.go +++ b/stats/opentelemetry/server_metrics.go @@ -18,6 +18,7 @@ package opentelemetry import ( "context" + "go.opentelemetry.io/otel/sdk/metric" "sync/atomic" "time" @@ -38,27 +39,34 @@ type serverStatsHandler struct { serverMetrics serverMetrics } -func (h *serverStatsHandler) initializeMetrics() { - // Will set no metrics to record, logically making this stats handler a - // no-op. - if h.options.MetricsOptions.MeterProvider == nil { - return - } - +func (h *serverStatsHandler) setServerMetrics() (*estats.Metrics, otelmetric.Meter) { meter := h.options.MetricsOptions.MeterProvider.Meter("grpc-go", otelmetric.WithInstrumentationVersion(grpc.Version)) if meter == nil { - return + return nil, nil } metrics := h.options.MetricsOptions.Metrics if metrics == nil { metrics = DefaultMetrics() } - h.serverMetrics.callStarted = createInt64Counter(metrics.Metrics(), "grpc.server.call.started", meter, otelmetric.WithUnit("call"), otelmetric.WithDescription("Number of server calls started.")) h.serverMetrics.callSentTotalCompressedMessageSize = createInt64Histogram(metrics.Metrics(), "grpc.server.call.sent_total_compressed_message_size", meter, otelmetric.WithUnit("By"), otelmetric.WithDescription("Compressed message bytes sent per server call."), otelmetric.WithExplicitBucketBoundaries(DefaultSizeBounds...)) h.serverMetrics.callRcvdTotalCompressedMessageSize = createInt64Histogram(metrics.Metrics(), "grpc.server.call.rcvd_total_compressed_message_size", meter, otelmetric.WithUnit("By"), otelmetric.WithDescription("Compressed message bytes received per server call."), otelmetric.WithExplicitBucketBoundaries(DefaultSizeBounds...)) h.serverMetrics.callDuration = createFloat64Histogram(metrics.Metrics(), "grpc.server.call.duration", meter, otelmetric.WithUnit("s"), otelmetric.WithDescription("End-to-end time taken to complete a call from server transport's perspective."), otelmetric.WithExplicitBucketBoundaries(DefaultLatencyBounds...)) + return metrics, meter +} + +func (h *serverStatsHandler) initializeMetrics() { + // Will set no metrics to record, logically making this stats handler a + // no-op. + if h.options.MetricsOptions.MeterProvider == nil { + h.MetricsRecorder = &NoopMetricsRecorder{} + h.options.MetricsOptions.MeterProvider = metric.NewMeterProvider() + h.setServerMetrics() + return + } + + metrics, meter := h.setServerMetrics() rm := ®istryMetrics{ optionalLabels: h.options.MetricsOptions.OptionalLabels, }