Skip to content

Commit

Permalink
[exporter-otlp-proto-common] Include metric info when an exception oc…
Browse files Browse the repository at this point in the history
…curs during encoding

Also remove redundant str(exception) from the exception message
  • Loading branch information
pmcollins committed Aug 28, 2024
1 parent ea36c5d commit ce12a2e
Show file tree
Hide file tree
Showing 3 changed files with 185 additions and 144 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -173,152 +173,26 @@ def _get_aggregation(
return instrument_class_aggregation


def encode_metrics(data: MetricsData) -> ExportMetricsServiceRequest:
resource_metrics_dict = {}

for resource_metrics in data.resource_metrics:

resource = resource_metrics.resource
class EncodingException(Exception):
"""
Raised by encode_metrics() when an exception is caught during encoding. Contains the problematic metric so
the misbehaving metric name and details can be logged during exception handling.
"""

# It is safe to assume that each entry in data.resource_metrics is
# associated with an unique resource.
scope_metrics_dict = {}
def __init__(self, original_exception, metric):
super().__init__()
self.original_exception = original_exception
self.metric = metric

resource_metrics_dict[resource] = scope_metrics_dict

for scope_metrics in resource_metrics.scope_metrics:

instrumentation_scope = scope_metrics.scope

# The SDK groups metrics in instrumentation scopes already so
# there is no need to check for existing instrumentation scopes
# here.
pb2_scope_metrics = pb2.ScopeMetrics(
scope=InstrumentationScope(
name=instrumentation_scope.name,
version=instrumentation_scope.version,
)
)
def __str__(self):
return f"{self.metric}\n{self.original_exception}"

scope_metrics_dict[instrumentation_scope] = pb2_scope_metrics

for metric in scope_metrics.metrics:
pb2_metric = pb2.Metric(
name=metric.name,
description=metric.description,
unit=metric.unit,
)
def encode_metrics(data: MetricsData) -> ExportMetricsServiceRequest:
resource_metrics_dict = {}

if isinstance(metric.data, Gauge):
for data_point in metric.data.data_points:
pt = pb2.NumberDataPoint(
attributes=_encode_attributes(
data_point.attributes
),
time_unix_nano=data_point.time_unix_nano,
)
if isinstance(data_point.value, int):
pt.as_int = data_point.value
else:
pt.as_double = data_point.value
pb2_metric.gauge.data_points.append(pt)

elif isinstance(metric.data, HistogramType):
for data_point in metric.data.data_points:
pt = pb2.HistogramDataPoint(
attributes=_encode_attributes(
data_point.attributes
),
time_unix_nano=data_point.time_unix_nano,
start_time_unix_nano=(
data_point.start_time_unix_nano
),
count=data_point.count,
sum=data_point.sum,
bucket_counts=data_point.bucket_counts,
explicit_bounds=data_point.explicit_bounds,
max=data_point.max,
min=data_point.min,
)
pb2_metric.histogram.aggregation_temporality = (
metric.data.aggregation_temporality
)
pb2_metric.histogram.data_points.append(pt)

elif isinstance(metric.data, Sum):
for data_point in metric.data.data_points:
pt = pb2.NumberDataPoint(
attributes=_encode_attributes(
data_point.attributes
),
start_time_unix_nano=(
data_point.start_time_unix_nano
),
time_unix_nano=data_point.time_unix_nano,
)
if isinstance(data_point.value, int):
pt.as_int = data_point.value
else:
pt.as_double = data_point.value
# note that because sum is a message type, the
# fields must be set individually rather than
# instantiating a pb2.Sum and setting it once
pb2_metric.sum.aggregation_temporality = (
metric.data.aggregation_temporality
)
pb2_metric.sum.is_monotonic = metric.data.is_monotonic
pb2_metric.sum.data_points.append(pt)

elif isinstance(metric.data, ExponentialHistogramType):
for data_point in metric.data.data_points:

if data_point.positive.bucket_counts:
positive = pb2.ExponentialHistogramDataPoint.Buckets(
offset=data_point.positive.offset,
bucket_counts=data_point.positive.bucket_counts,
)
else:
positive = None

if data_point.negative.bucket_counts:
negative = pb2.ExponentialHistogramDataPoint.Buckets(
offset=data_point.negative.offset,
bucket_counts=data_point.negative.bucket_counts,
)
else:
negative = None

pt = pb2.ExponentialHistogramDataPoint(
attributes=_encode_attributes(
data_point.attributes
),
time_unix_nano=data_point.time_unix_nano,
start_time_unix_nano=(
data_point.start_time_unix_nano
),
count=data_point.count,
sum=data_point.sum,
scale=data_point.scale,
zero_count=data_point.zero_count,
positive=positive,
negative=negative,
flags=data_point.flags,
max=data_point.max,
min=data_point.min,
)
pb2_metric.exponential_histogram.aggregation_temporality = (
metric.data.aggregation_temporality
)
pb2_metric.exponential_histogram.data_points.append(pt)

else:
_logger.warning(
"unsupported data type %s",
metric.data.__class__.__name__,
)
continue

pb2_scope_metrics.metrics.append(pb2_metric)
for resource_metrics in data.resource_metrics:
_encode_resource_metrics(resource_metrics, resource_metrics_dict)

resource_data = []
for (
Expand All @@ -334,5 +208,137 @@ def encode_metrics(data: MetricsData) -> ExportMetricsServiceRequest:
schema_url=sdk_resource.schema_url,
)
)
resource_metrics = resource_data
return ExportMetricsServiceRequest(resource_metrics=resource_metrics)
return ExportMetricsServiceRequest(resource_metrics=resource_data)


def _encode_resource_metrics(resource_metrics, resource_metrics_dict):
resource = resource_metrics.resource
# It is safe to assume that each entry in data.resource_metrics is
# associated with an unique resource.
scope_metrics_dict = {}
resource_metrics_dict[resource] = scope_metrics_dict
for scope_metrics in resource_metrics.scope_metrics:
instrumentation_scope = scope_metrics.scope

# The SDK groups metrics in instrumentation scopes already so
# there is no need to check for existing instrumentation scopes
# here.
pb2_scope_metrics = pb2.ScopeMetrics(
scope=InstrumentationScope(
name=instrumentation_scope.name,
version=instrumentation_scope.version,
)
)

scope_metrics_dict[instrumentation_scope] = pb2_scope_metrics

for metric in scope_metrics.metrics:
pb2_metric = pb2.Metric(
name=metric.name,
description=metric.description,
unit=metric.unit,
)

try:
_encode_metric(metric, pb2_metric)
except Exception as ex:
# `from None` so we don't get "During handling of the above exception, another exception occurred:"
raise EncodingException(ex, metric) from None

pb2_scope_metrics.metrics.append(pb2_metric)


def _encode_metric(metric, pb2_metric):
if isinstance(metric.data, Gauge):
for data_point in metric.data.data_points:
pt = pb2.NumberDataPoint(
attributes=_encode_attributes(data_point.attributes),
time_unix_nano=data_point.time_unix_nano,
)
if isinstance(data_point.value, int):
pt.as_int = data_point.value
else:
pt.as_double = data_point.value
pb2_metric.gauge.data_points.append(pt)

elif isinstance(metric.data, HistogramType):
for data_point in metric.data.data_points:
pt = pb2.HistogramDataPoint(
attributes=_encode_attributes(data_point.attributes),
time_unix_nano=data_point.time_unix_nano,
start_time_unix_nano=data_point.start_time_unix_nano,
count=data_point.count,
sum=data_point.sum,
bucket_counts=data_point.bucket_counts,
explicit_bounds=data_point.explicit_bounds,
max=data_point.max,
min=data_point.min,
)
pb2_metric.histogram.aggregation_temporality = (
metric.data.aggregation_temporality
)
pb2_metric.histogram.data_points.append(pt)

elif isinstance(metric.data, Sum):
for data_point in metric.data.data_points:
pt = pb2.NumberDataPoint(
attributes=_encode_attributes(data_point.attributes),
start_time_unix_nano=data_point.start_time_unix_nano,
time_unix_nano=data_point.time_unix_nano,
)
if isinstance(data_point.value, int):
pt.as_int = data_point.value
else:
pt.as_double = data_point.value
# note that because sum is a message type, the
# fields must be set individually rather than
# instantiating a pb2.Sum and setting it once
pb2_metric.sum.aggregation_temporality = (
metric.data.aggregation_temporality
)
pb2_metric.sum.is_monotonic = metric.data.is_monotonic
pb2_metric.sum.data_points.append(pt)

elif isinstance(metric.data, ExponentialHistogramType):
for data_point in metric.data.data_points:

if data_point.positive.bucket_counts:
positive = pb2.ExponentialHistogramDataPoint.Buckets(
offset=data_point.positive.offset,
bucket_counts=data_point.positive.bucket_counts,
)
else:
positive = None

if data_point.negative.bucket_counts:
negative = pb2.ExponentialHistogramDataPoint.Buckets(
offset=data_point.negative.offset,
bucket_counts=data_point.negative.bucket_counts,
)
else:
negative = None

pt = pb2.ExponentialHistogramDataPoint(
attributes=_encode_attributes(data_point.attributes),
time_unix_nano=data_point.time_unix_nano,
start_time_unix_nano=data_point.start_time_unix_nano,
count=data_point.count,
sum=data_point.sum,
scale=data_point.scale,
zero_count=data_point.zero_count,
positive=positive,
negative=negative,
flags=data_point.flags,
max=data_point.max,
min=data_point.min,
)
pb2_metric.exponential_histogram.aggregation_temporality = (
metric.data.aggregation_temporality
)
pb2_metric.exponential_histogram.data_points.append(pt)

else:
_logger.warning(
"unsupported data type %s",
metric.data.__class__.__name__,
)
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
# pylint: disable=protected-access
import unittest

from opentelemetry.exporter.otlp.proto.common._internal.metrics_encoder import (
EncodingException,
)
from opentelemetry.exporter.otlp.proto.common.metrics_encoder import (
encode_metrics,
)
Expand Down Expand Up @@ -814,3 +817,35 @@ def test_encode_exponential_histogram(self):
# pylint: disable=protected-access
actual = encode_metrics(metrics_data)
self.assertEqual(expected, actual)

def test_encoding_exception_reraise(self):
# this number is too big to fit in a signed 64-bit proto field and causes a ValueError
big_number = 2**63
metrics_data = MetricsData(
resource_metrics=[
ResourceMetrics(
resource=Resource(
attributes={},
schema_url="resource_schema_url",
),
scope_metrics=[
ScopeMetrics(
scope=SDKInstrumentationScope(
name="first_name",
version="first_version",
schema_url="insrumentation_scope_schema_url",
),
metrics=[_generate_sum("sum_double", big_number)],
schema_url="instrumentation_scope_schema_url",
)
],
schema_url="resource_schema_url",
)
]
)
with self.assertRaises(EncodingException) as context:
encode_metrics(metrics_data)

# assert that the EncodingException wraps the metric and original exception
assert isinstance(context.exception.metric, Metric)
assert isinstance(context.exception.original_exception, ValueError)
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@ def _receive_metrics(
metrics_data, timeout_millis=timeout_millis
)
except Exception as e:
_logger.exception("Exception while exporting metrics %s", str(e))
_logger.exception("Exception while exporting metrics")
detach(token)

def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
Expand Down

0 comments on commit ce12a2e

Please sign in to comment.