Skip to content

Commit

Permalink
Refactor metric format
Browse files Browse the repository at this point in the history
  • Loading branch information
ocelotl committed May 10, 2022
1 parent a821311 commit 64109da
Show file tree
Hide file tree
Showing 19 changed files with 921 additions and 1,040 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
from json import dumps
from logging import getLogger
from re import IGNORECASE, UNICODE, compile
from typing import Dict, Iterable, Sequence, Tuple, Union
from typing import Dict, Sequence, Tuple, Union

from prometheus_client.core import (
REGISTRY,
Expand All @@ -80,20 +80,23 @@
from opentelemetry.sdk._metrics.export import (
Gauge,
Histogram,
Metric,
HistogramDataPoint,
MetricReader,
Sum,
MetricsData
)

_logger = getLogger(__name__)


def _convert_buckets(metric: Metric) -> Sequence[Tuple[str, int]]:
def _convert_buckets(
bucket_counts: Sequence[int], explicit_bounds: Sequence[float]
) -> Sequence[Tuple[str, int]]:
buckets = []
total_count = 0
for upper_bound, count in zip(
chain(metric.point.explicit_bounds, ["+Inf"]),
metric.point.bucket_counts,
chain(explicit_bounds, ["+Inf"]),
bucket_counts,
):
total_count += count
buckets.append((f"{upper_bound}", total_count))
Expand All @@ -117,13 +120,13 @@ def __init__(self, prefix: str = "") -> None:

def _receive_metrics(
self,
metrics: Iterable[Metric],
metrics_data: MetricsData,
timeout_millis: float = 10_000,
**kwargs,
) -> None:
if metrics is None:
if metrics_data is None:
return
self._collector.add_metrics_data(metrics)
self._collector.add_metrics_data(metrics_data)

def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
REGISTRY.unregister(self._collector)
Expand All @@ -139,14 +142,14 @@ class _CustomCollector:
def __init__(self, prefix: str = ""):
self._prefix = prefix
self._callback = None
self._metrics_to_export = deque()
self._metrics_datas = deque()
self._non_letters_digits_underscore_re = compile(
r"[^\w]", UNICODE | IGNORECASE
)

def add_metrics_data(self, export_records: Sequence[Metric]) -> None:
def add_metrics_data(self, metrics_data: MetricsData) -> None:
"""Add metrics to Prometheus data"""
self._metrics_to_export.append(export_records)
self._metrics_datas.append(metrics_data)

def collect(self) -> None:
"""Collect fetches the metrics from OpenTelemetry
Expand All @@ -159,96 +162,153 @@ def collect(self) -> None:

metric_family_id_metric_family = {}

while self._metrics_to_export:
for export_record in self._metrics_to_export.popleft():
self._translate_to_prometheus(
export_record, metric_family_id_metric_family
)
while self._metrics_datas:
self._translate_to_prometheus(
self._metrics_datas.popleft(), metric_family_id_metric_family
)

if metric_family_id_metric_family:
for metric_family in metric_family_id_metric_family.values():
yield metric_family

def _translate_to_prometheus(
self,
metric: Metric,
metrics_data: MetricsData,
metric_family_id_metric_family: Dict[str, PrometheusMetric],
):
label_values = []
label_keys = []
for key, value in metric.attributes.items():
label_keys.append(self._sanitize(key))
label_values.append(self._check_value(value))

metric_name = ""
if self._prefix != "":
metric_name = self._prefix + "_"
metric_name += self._sanitize(metric.name)

description = metric.description or ""

metric_family_id = "|".join(
[metric_name, description, "%".join(label_keys), metric.unit]
)

if isinstance(metric.point, Sum):

metric_family_id = "|".join(
[metric_family_id, CounterMetricFamily.__name__]
)

if metric_family_id not in metric_family_id_metric_family.keys():
metric_family_id_metric_family[
metric_family_id
] = CounterMetricFamily(
name=metric_name,
documentation=description,
labels=label_keys,
unit=metric.unit,
metrics = []

for resource_metrics in metrics_data.resource_metrics:
for scope_metrics in resource_metrics.scope_metrics:
for metric in scope_metrics.metrics:
metrics.append(metric)

for metric in metrics:
# pylint: disable=too-many-locals,too-many-branches
label_keyss = []
label_valuess = []
values = []

for number_data_point in metric.data.data_points:
label_keys = []
label_values = []

for key, value in number_data_point.attributes.items():
label_keys.append(self._sanitize(key))
label_values.append(self._check_value(value))

label_keyss.append(label_keys)
label_valuess.append(label_values)
if isinstance(number_data_point, HistogramDataPoint):
values.append(
{
"bucket_counts": number_data_point.bucket_counts,
"explicit_bounds": (
number_data_point.explicit_bounds
),
"sum": number_data_point.sum,
}
)
else:
values.append(number_data_point.value)

metric_name = ""
if self._prefix != "":
metric_name = self._prefix + "_"
metric_name += self._sanitize(metric.name)

description = metric.description or ""

pre_metric_family_ids = []

for label_keys in label_keyss:
pre_metric_family_ids.append(
"|".join(
[
metric_name,
description,
"%".join(label_keys),
metric.unit,
]
)
)
metric_family_id_metric_family[metric_family_id].add_metric(
labels=label_values, value=metric.point.value
)
elif isinstance(metric.point, Gauge):

metric_family_id = "|".join(
[metric_family_id, GaugeMetricFamily.__name__]
)

if metric_family_id not in metric_family_id_metric_family.keys():
metric_family_id_metric_family[
metric_family_id
] = GaugeMetricFamily(
name=metric_name,
documentation=description,
labels=label_keys,
unit=metric.unit,
)
metric_family_id_metric_family[metric_family_id].add_metric(
labels=label_values, value=metric.point.value
)
elif isinstance(metric.point, Histogram):

metric_family_id = "|".join(
[metric_family_id, HistogramMetricFamily.__name__]
)

if metric_family_id not in metric_family_id_metric_family.keys():
metric_family_id_metric_family[
metric_family_id
] = HistogramMetricFamily(
name=metric_name,
documentation=description,
labels=label_keys,
unit=metric.unit,
)
metric_family_id_metric_family[metric_family_id].add_metric(
labels=label_values,
buckets=_convert_buckets(metric),
sum_value=metric.point.sum,
)
else:
_logger.warning("Unsupported metric type. %s", type(metric.point))
for pre_metric_family_id, label_values, value in zip(
pre_metric_family_ids, label_valuess, values
):
if isinstance(metric.data, Sum):

metric_family_id = "|".join(
[pre_metric_family_id, CounterMetricFamily.__name__]
)

if (
metric_family_id
not in metric_family_id_metric_family.keys()
):
metric_family_id_metric_family[
metric_family_id
] = CounterMetricFamily(
name=metric_name,
documentation=description,
labels=label_keys,
unit=metric.unit,
)
metric_family_id_metric_family[metric_family_id].add_metric(
labels=label_values, value=value
)
elif isinstance(metric.data, Gauge):

metric_family_id = "|".join(
[pre_metric_family_id, GaugeMetricFamily.__name__]
)

if (
metric_family_id
not in metric_family_id_metric_family.keys()
):
metric_family_id_metric_family[
metric_family_id
] = GaugeMetricFamily(
name=metric_name,
documentation=description,
labels=label_keys,
unit=metric.unit,
)
metric_family_id_metric_family[metric_family_id].add_metric(
labels=label_values, value=value
)
elif isinstance(metric.data, Histogram):

metric_family_id = "|".join(
[pre_metric_family_id, HistogramMetricFamily.__name__]
)

if (
metric_family_id
not in metric_family_id_metric_family.keys()
):
metric_family_id_metric_family[
metric_family_id
] = HistogramMetricFamily(
name=metric_name,
documentation=description,
labels=label_keys,
unit=metric.unit,
)
metric_family_id_metric_family[
metric_family_id
].add_metric(
labels=label_values,
buckets=_convert_buckets(
value["bucket_counts"], value["explicit_bounds"]
),
sum_value=value["sum"],
)
else:
_logger.warning(
"Unsupported metric data. %s", type(metric.data)
)

def _sanitize(self, key: str) -> str:
"""sanitize the given metric name or label according to Prometheus rule.
Expand Down
Loading

0 comments on commit 64109da

Please sign in to comment.