From ce12a2e5230a80243f8b21c12c4a203289864741 Mon Sep 17 00:00:00 2001 From: Pablo Collins Date: Mon, 26 Aug 2024 14:10:12 -0400 Subject: [PATCH] [exporter-otlp-proto-common] Include metric info when an exception occurs during encoding Also remove redundant str(exception) from the exception message --- .../_internal/metrics_encoder/__init__.py | 292 +++++++++--------- .../tests/test_metrics_encoder.py | 35 +++ .../sdk/metrics/_internal/export/__init__.py | 2 +- 3 files changed, 185 insertions(+), 144 deletions(-) diff --git a/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_internal/metrics_encoder/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_internal/metrics_encoder/__init__.py index 0d66fd28b70..0df1983e753 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_internal/metrics_encoder/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_internal/metrics_encoder/__init__.py @@ -173,152 +173,26 @@ def _get_aggregation( return instrument_class_aggregation -def encode_metrics(data: MetricsData) -> ExportMetricsServiceRequest: - resource_metrics_dict = {} - - for resource_metrics in data.resource_metrics: - - resource = resource_metrics.resource +class EncodingException(Exception): + """ + Raised by encode_metrics() when an exception is caught during encoding. Contains the problematic metric so + the misbehaving metric name and details can be logged during exception handling. + """ - # It is safe to assume that each entry in data.resource_metrics is - # associated with an unique resource. - scope_metrics_dict = {} + def __init__(self, original_exception, metric): + super().__init__() + self.original_exception = original_exception + self.metric = metric - resource_metrics_dict[resource] = scope_metrics_dict - - for scope_metrics in resource_metrics.scope_metrics: - - instrumentation_scope = scope_metrics.scope - - # The SDK groups metrics in instrumentation scopes already so - # there is no need to check for existing instrumentation scopes - # here. - pb2_scope_metrics = pb2.ScopeMetrics( - scope=InstrumentationScope( - name=instrumentation_scope.name, - version=instrumentation_scope.version, - ) - ) + def __str__(self): + return f"{self.metric}\n{self.original_exception}" - scope_metrics_dict[instrumentation_scope] = pb2_scope_metrics - for metric in scope_metrics.metrics: - pb2_metric = pb2.Metric( - name=metric.name, - description=metric.description, - unit=metric.unit, - ) +def encode_metrics(data: MetricsData) -> ExportMetricsServiceRequest: + resource_metrics_dict = {} - if isinstance(metric.data, Gauge): - for data_point in metric.data.data_points: - pt = pb2.NumberDataPoint( - attributes=_encode_attributes( - data_point.attributes - ), - time_unix_nano=data_point.time_unix_nano, - ) - if isinstance(data_point.value, int): - pt.as_int = data_point.value - else: - pt.as_double = data_point.value - pb2_metric.gauge.data_points.append(pt) - - elif isinstance(metric.data, HistogramType): - for data_point in metric.data.data_points: - pt = pb2.HistogramDataPoint( - attributes=_encode_attributes( - data_point.attributes - ), - time_unix_nano=data_point.time_unix_nano, - start_time_unix_nano=( - data_point.start_time_unix_nano - ), - count=data_point.count, - sum=data_point.sum, - bucket_counts=data_point.bucket_counts, - explicit_bounds=data_point.explicit_bounds, - max=data_point.max, - min=data_point.min, - ) - pb2_metric.histogram.aggregation_temporality = ( - metric.data.aggregation_temporality - ) - pb2_metric.histogram.data_points.append(pt) - - elif isinstance(metric.data, Sum): - for data_point in metric.data.data_points: - pt = pb2.NumberDataPoint( - attributes=_encode_attributes( - data_point.attributes - ), - start_time_unix_nano=( - data_point.start_time_unix_nano - ), - time_unix_nano=data_point.time_unix_nano, - ) - if isinstance(data_point.value, int): - pt.as_int = data_point.value - else: - pt.as_double = data_point.value - # note that because sum is a message type, the - # fields must be set individually rather than - # instantiating a pb2.Sum and setting it once - pb2_metric.sum.aggregation_temporality = ( - metric.data.aggregation_temporality - ) - pb2_metric.sum.is_monotonic = metric.data.is_monotonic - pb2_metric.sum.data_points.append(pt) - - elif isinstance(metric.data, ExponentialHistogramType): - for data_point in metric.data.data_points: - - if data_point.positive.bucket_counts: - positive = pb2.ExponentialHistogramDataPoint.Buckets( - offset=data_point.positive.offset, - bucket_counts=data_point.positive.bucket_counts, - ) - else: - positive = None - - if data_point.negative.bucket_counts: - negative = pb2.ExponentialHistogramDataPoint.Buckets( - offset=data_point.negative.offset, - bucket_counts=data_point.negative.bucket_counts, - ) - else: - negative = None - - pt = pb2.ExponentialHistogramDataPoint( - attributes=_encode_attributes( - data_point.attributes - ), - time_unix_nano=data_point.time_unix_nano, - start_time_unix_nano=( - data_point.start_time_unix_nano - ), - count=data_point.count, - sum=data_point.sum, - scale=data_point.scale, - zero_count=data_point.zero_count, - positive=positive, - negative=negative, - flags=data_point.flags, - max=data_point.max, - min=data_point.min, - ) - pb2_metric.exponential_histogram.aggregation_temporality = ( - metric.data.aggregation_temporality - ) - pb2_metric.exponential_histogram.data_points.append(pt) - - else: - _logger.warning( - "unsupported data type %s", - metric.data.__class__.__name__, - ) - continue - - pb2_scope_metrics.metrics.append(pb2_metric) + for resource_metrics in data.resource_metrics: + _encode_resource_metrics(resource_metrics, resource_metrics_dict) resource_data = [] for ( @@ -334,5 +208,137 @@ def encode_metrics(data: MetricsData) -> ExportMetricsServiceRequest: schema_url=sdk_resource.schema_url, ) ) - resource_metrics = resource_data - return ExportMetricsServiceRequest(resource_metrics=resource_metrics) + return ExportMetricsServiceRequest(resource_metrics=resource_data) + + +def _encode_resource_metrics(resource_metrics, resource_metrics_dict): + resource = resource_metrics.resource + # It is safe to assume that each entry in data.resource_metrics is + # associated with an unique resource. + scope_metrics_dict = {} + resource_metrics_dict[resource] = scope_metrics_dict + for scope_metrics in resource_metrics.scope_metrics: + instrumentation_scope = scope_metrics.scope + + # The SDK groups metrics in instrumentation scopes already so + # there is no need to check for existing instrumentation scopes + # here. + pb2_scope_metrics = pb2.ScopeMetrics( + scope=InstrumentationScope( + name=instrumentation_scope.name, + version=instrumentation_scope.version, + ) + ) + + scope_metrics_dict[instrumentation_scope] = pb2_scope_metrics + + for metric in scope_metrics.metrics: + pb2_metric = pb2.Metric( + name=metric.name, + description=metric.description, + unit=metric.unit, + ) + + try: + _encode_metric(metric, pb2_metric) + except Exception as ex: + # `from None` so we don't get "During handling of the above exception, another exception occurred:" + raise EncodingException(ex, metric) from None + + pb2_scope_metrics.metrics.append(pb2_metric) + + +def _encode_metric(metric, pb2_metric): + if isinstance(metric.data, Gauge): + for data_point in metric.data.data_points: + pt = pb2.NumberDataPoint( + attributes=_encode_attributes(data_point.attributes), + time_unix_nano=data_point.time_unix_nano, + ) + if isinstance(data_point.value, int): + pt.as_int = data_point.value + else: + pt.as_double = data_point.value + pb2_metric.gauge.data_points.append(pt) + + elif isinstance(metric.data, HistogramType): + for data_point in metric.data.data_points: + pt = pb2.HistogramDataPoint( + attributes=_encode_attributes(data_point.attributes), + time_unix_nano=data_point.time_unix_nano, + start_time_unix_nano=data_point.start_time_unix_nano, + count=data_point.count, + sum=data_point.sum, + bucket_counts=data_point.bucket_counts, + explicit_bounds=data_point.explicit_bounds, + max=data_point.max, + min=data_point.min, + ) + pb2_metric.histogram.aggregation_temporality = ( + metric.data.aggregation_temporality + ) + pb2_metric.histogram.data_points.append(pt) + + elif isinstance(metric.data, Sum): + for data_point in metric.data.data_points: + pt = pb2.NumberDataPoint( + attributes=_encode_attributes(data_point.attributes), + start_time_unix_nano=data_point.start_time_unix_nano, + time_unix_nano=data_point.time_unix_nano, + ) + if isinstance(data_point.value, int): + pt.as_int = data_point.value + else: + pt.as_double = data_point.value + # note that because sum is a message type, the + # fields must be set individually rather than + # instantiating a pb2.Sum and setting it once + pb2_metric.sum.aggregation_temporality = ( + metric.data.aggregation_temporality + ) + pb2_metric.sum.is_monotonic = metric.data.is_monotonic + pb2_metric.sum.data_points.append(pt) + + elif isinstance(metric.data, ExponentialHistogramType): + for data_point in metric.data.data_points: + + if data_point.positive.bucket_counts: + positive = pb2.ExponentialHistogramDataPoint.Buckets( + offset=data_point.positive.offset, + bucket_counts=data_point.positive.bucket_counts, + ) + else: + positive = None + + if data_point.negative.bucket_counts: + negative = pb2.ExponentialHistogramDataPoint.Buckets( + offset=data_point.negative.offset, + bucket_counts=data_point.negative.bucket_counts, + ) + else: + negative = None + + pt = pb2.ExponentialHistogramDataPoint( + attributes=_encode_attributes(data_point.attributes), + time_unix_nano=data_point.time_unix_nano, + start_time_unix_nano=data_point.start_time_unix_nano, + count=data_point.count, + sum=data_point.sum, + scale=data_point.scale, + zero_count=data_point.zero_count, + positive=positive, + negative=negative, + flags=data_point.flags, + max=data_point.max, + min=data_point.min, + ) + pb2_metric.exponential_histogram.aggregation_temporality = ( + metric.data.aggregation_temporality + ) + pb2_metric.exponential_histogram.data_points.append(pt) + + else: + _logger.warning( + "unsupported data type %s", + metric.data.__class__.__name__, + ) diff --git a/exporter/opentelemetry-exporter-otlp-proto-common/tests/test_metrics_encoder.py b/exporter/opentelemetry-exporter-otlp-proto-common/tests/test_metrics_encoder.py index 6d68b48f250..ef0ae98b501 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-common/tests/test_metrics_encoder.py +++ b/exporter/opentelemetry-exporter-otlp-proto-common/tests/test_metrics_encoder.py @@ -15,6 +15,9 @@ # pylint: disable=protected-access import unittest +from opentelemetry.exporter.otlp.proto.common._internal.metrics_encoder import ( + EncodingException, +) from opentelemetry.exporter.otlp.proto.common.metrics_encoder import ( encode_metrics, ) @@ -814,3 +817,35 @@ def test_encode_exponential_histogram(self): # pylint: disable=protected-access actual = encode_metrics(metrics_data) self.assertEqual(expected, actual) + + def test_encoding_exception_reraise(self): + # this number is too big to fit in a signed 64-bit proto field and causes a ValueError + big_number = 2**63 + metrics_data = MetricsData( + resource_metrics=[ + ResourceMetrics( + resource=Resource( + attributes={}, + schema_url="resource_schema_url", + ), + scope_metrics=[ + ScopeMetrics( + scope=SDKInstrumentationScope( + name="first_name", + version="first_version", + schema_url="insrumentation_scope_schema_url", + ), + metrics=[_generate_sum("sum_double", big_number)], + schema_url="instrumentation_scope_schema_url", + ) + ], + schema_url="resource_schema_url", + ) + ] + ) + with self.assertRaises(EncodingException) as context: + encode_metrics(metrics_data) + + # assert that the EncodingException wraps the metric and original exception + assert isinstance(context.exception.metric, Metric) + assert isinstance(context.exception.original_exception, ValueError) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py index eb946ec6479..5221738c83d 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py @@ -542,7 +542,7 @@ def _receive_metrics( metrics_data, timeout_millis=timeout_millis ) except Exception as e: - _logger.exception("Exception while exporting metrics %s", str(e)) + _logger.exception("Exception while exporting metrics") detach(token) def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: