Skip to content

Commit

Permalink
Refactor metric format
Browse files Browse the repository at this point in the history
  • Loading branch information
ocelotl committed May 10, 2022
1 parent a821311 commit 7ad9465
Show file tree
Hide file tree
Showing 16 changed files with 627 additions and 967 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
from opentelemetry.sdk._metrics.export import (
Gauge,
Histogram,
HistogramDataPoint,
Metric,
MetricReader,
Sum,
Expand All @@ -88,12 +89,14 @@
_logger = getLogger(__name__)


def _convert_buckets(metric: Metric) -> Sequence[Tuple[str, int]]:
def _convert_buckets(
bucket_counts: Sequence[int], explicit_bounds: Sequence[float]
) -> Sequence[Tuple[str, int]]:
buckets = []
total_count = 0
for upper_bound, count in zip(
chain(metric.point.explicit_bounds, ["+Inf"]),
metric.point.bucket_counts,
chain(explicit_bounds, ["+Inf"]),
bucket_counts,
):
total_count += count
buckets.append((f"{upper_bound}", total_count))
Expand Down Expand Up @@ -174,11 +177,31 @@ def _translate_to_prometheus(
metric: Metric,
metric_family_id_metric_family: Dict[str, PrometheusMetric],
):
label_values = []
label_keys = []
for key, value in metric.attributes.items():
label_keys.append(self._sanitize(key))
label_values.append(self._check_value(value))
# pylint: disable=too-many-locals,too-many-branches
label_keyss = []
label_valuess = []
values = []

for number_data_point in metric.data.data_points:
label_keys = []
label_values = []

for key, value in number_data_point.attributes.items():
label_keys.append(self._sanitize(key))
label_values.append(self._check_value(value))

label_keyss.append(label_keys)
label_valuess.append(label_values)
if isinstance(number_data_point, HistogramDataPoint):
values.append(
{
"bucket_counts": number_data_point.bucket_counts,
"explicit_bounds": number_data_point.explicit_bounds,
"sum": number_data_point.sum,
}
)
else:
values.append(number_data_point.value)

metric_name = ""
if self._prefix != "":
Expand All @@ -187,68 +210,94 @@ def _translate_to_prometheus(

description = metric.description or ""

metric_family_id = "|".join(
[metric_name, description, "%".join(label_keys), metric.unit]
)
pre_metric_family_ids = []

for label_keys in label_keyss:
pre_metric_family_ids.append(
"|".join(
[
metric_name,
description,
"%".join(label_keys),
metric.unit,
]
)
)

if isinstance(metric.point, Sum):
for pre_metric_family_id, label_values, value in zip(
pre_metric_family_ids, label_valuess, values
):
if isinstance(metric.data, Sum):

metric_family_id = "|".join(
[metric_family_id, CounterMetricFamily.__name__]
)
metric_family_id = "|".join(
[pre_metric_family_id, CounterMetricFamily.__name__]
)

if metric_family_id not in metric_family_id_metric_family.keys():
metric_family_id_metric_family[
if (
metric_family_id
] = CounterMetricFamily(
name=metric_name,
documentation=description,
labels=label_keys,
unit=metric.unit,
not in metric_family_id_metric_family.keys()
):
metric_family_id_metric_family[
metric_family_id
] = CounterMetricFamily(
name=metric_name,
documentation=description,
labels=label_keys,
unit=metric.unit,
)
metric_family_id_metric_family[metric_family_id].add_metric(
labels=label_values, value=value
)
metric_family_id_metric_family[metric_family_id].add_metric(
labels=label_values, value=metric.point.value
)
elif isinstance(metric.point, Gauge):
elif isinstance(metric.data, Gauge):

metric_family_id = "|".join(
[metric_family_id, GaugeMetricFamily.__name__]
)
metric_family_id = "|".join(
[pre_metric_family_id, GaugeMetricFamily.__name__]
)

if metric_family_id not in metric_family_id_metric_family.keys():
metric_family_id_metric_family[
if (
metric_family_id
] = GaugeMetricFamily(
name=metric_name,
documentation=description,
labels=label_keys,
unit=metric.unit,
not in metric_family_id_metric_family.keys()
):
metric_family_id_metric_family[
metric_family_id
] = GaugeMetricFamily(
name=metric_name,
documentation=description,
labels=label_keys,
unit=metric.unit,
)
metric_family_id_metric_family[metric_family_id].add_metric(
labels=label_values, value=value
)
metric_family_id_metric_family[metric_family_id].add_metric(
labels=label_values, value=metric.point.value
)
elif isinstance(metric.point, Histogram):
elif isinstance(metric.data, Histogram):

metric_family_id = "|".join(
[metric_family_id, HistogramMetricFamily.__name__]
)
metric_family_id = "|".join(
[pre_metric_family_id, HistogramMetricFamily.__name__]
)

if metric_family_id not in metric_family_id_metric_family.keys():
metric_family_id_metric_family[
if (
metric_family_id
] = HistogramMetricFamily(
name=metric_name,
documentation=description,
labels=label_keys,
unit=metric.unit,
not in metric_family_id_metric_family.keys()
):
metric_family_id_metric_family[
metric_family_id
] = HistogramMetricFamily(
name=metric_name,
documentation=description,
labels=label_keys,
unit=metric.unit,
)
metric_family_id_metric_family[metric_family_id].add_metric(
labels=label_values,
buckets=_convert_buckets(
value["bucket_counts"], value["explicit_bounds"]
),
sum_value=value["sum"],
)
else:
_logger.warning(
"Unsupported metric data. %s", type(metric.data)
)
metric_family_id_metric_family[metric_family_id].add_metric(
labels=label_values,
buckets=_convert_buckets(metric),
sum_value=metric.point.sum,
)
else:
_logger.warning("Unsupported metric type. %s", type(metric.point))

def _sanitize(self, key: str) -> str:
"""sanitize the given metric name or label according to Prometheus rule.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,14 @@
PrometheusMetricReader,
_CustomCollector,
)
from opentelemetry.sdk._metrics.export import AggregationTemporality, Histogram
from opentelemetry.sdk._metrics.export import (
AggregationTemporality,
Histogram,
HistogramDataPoint,
Metric,
)
from opentelemetry.test.metrictestutil import (
_generate_gauge,
_generate_metric,
_generate_sum,
_generate_unsupported_metric,
)
Expand Down Expand Up @@ -57,19 +61,26 @@ def test_shutdown(self):
self.assertTrue(registry_unregister_patch.called)

def test_histogram_to_prometheus(self):
record = _generate_metric(
"test@name",
Histogram(
aggregation_temporality=AggregationTemporality.CUMULATIVE,
bucket_counts=[1, 3, 2],
explicit_bounds=[123.0, 456.0],
start_time_unix_nano=1641946016139533244,
max=457,
min=1,
sum=579.0,
time_unix_nano=1641946016139533244,
record = Metric(
name="test@name",
description="foo",
unit="s",
data=Histogram(
data_points=[
HistogramDataPoint(
attributes={"histo": 1},
start_time_unix_nano=1641946016139533244,
time_unix_nano=1641946016139533244,
count=6,
sum=579.0,
bucket_counts=[1, 3, 2],
explicit_bounds=[123.0, 456.0],
min=1,
max=457,
)
],
aggregation_temporality=AggregationTemporality.DELTA,
),
attributes={"histo": 1},
)

collector = _CustomCollector("testprefix")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,11 @@
Aggregation,
DefaultAggregation,
_Aggregation,
_convert_aggregation_temporality,
_PointVarT,
_SumAggregation,
)
from opentelemetry.sdk._metrics._internal.export import AggregationTemporality
from opentelemetry.sdk._metrics._internal.measurement import Measurement
from opentelemetry.sdk._metrics._internal.point import Metric
from opentelemetry.sdk._metrics._internal.sdk_configuration import (
SdkConfiguration,
)
from opentelemetry.sdk._metrics._internal.point import DataPointT
from opentelemetry.sdk._metrics._internal.view import View

_logger = getLogger(__name__)
Expand All @@ -42,17 +37,12 @@ def __init__(
self,
view: View,
instrument: Instrument,
sdk_config: SdkConfiguration,
instrument_class_temporality: Dict[type, AggregationTemporality],
instrument_class_aggregation: Dict[type, Aggregation],
):
self._view = view
self._instrument = instrument
self._sdk_config = sdk_config
self._attributes_aggregation: Dict[frozenset, _Aggregation] = {}
self._attributes_previous_point: Dict[frozenset, _PointVarT] = {}
self._lock = Lock()
self._instrument_class_temporality = instrument_class_temporality
self._instrument_class_aggregation = instrument_class_aggregation
self._name = self._view._name or self._instrument.name
self._description = (
Expand Down Expand Up @@ -124,46 +114,15 @@ def consume_measurement(self, measurement: Measurement) -> None:

self._attributes_aggregation[attributes].aggregate(measurement)

def collect(self) -> Iterable[Metric]:
def collect(
self, aggregation_temporality: AggregationTemporality
) -> Iterable[DataPointT]:

data_points = []
with self._lock:
for (
attributes,
aggregation,
) in self._attributes_aggregation.items():

previous_point = self._attributes_previous_point.get(
attributes
)

current_point = aggregation.collect()

# pylint: disable=assignment-from-none

self._attributes_previous_point[
attributes
] = _convert_aggregation_temporality(
previous_point,
current_point,
AggregationTemporality.CUMULATIVE,
for aggregation in self._attributes_aggregation.values():
data_points.append(
aggregation.collect(aggregation_temporality)
)

if current_point is not None:

yield Metric(
attributes=dict(attributes),
description=self._description,
instrumentation_scope=(
self._instrument.instrumentation_scope
),
name=self._name,
resource=self._sdk_config.resource,
unit=self._instrument.unit,
point=_convert_aggregation_temporality(
previous_point,
current_point,
self._instrument_class_temporality[
self._instrument.__class__
],
),
)
return data_points
Loading

0 comments on commit 7ad9465

Please sign in to comment.