Skip to content

Commit

Permalink
add histogram
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex Boten committed Feb 25, 2022
1 parent ee2046e commit aad4633
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,26 +65,15 @@
import collections
import logging
import re
from typing import Iterable, Optional, Sequence, Union
from typing import Optional, Sequence, Tuple

from prometheus_client.core import (
REGISTRY,
CounterMetricFamily,
GaugeMetricFamily,
HistogramMetricFamily,
)
from prometheus_client import core

from opentelemetry.sdk._metrics.export import (
MetricExporter,
MetricExportResult,
)
from opentelemetry.sdk._metrics.point import (
AggregationTemporality,
Gauge,
Histogram,
Metric,
Sum,
)
from opentelemetry.sdk._metrics.point import Gauge, Histogram, Metric, Sum

logger = logging.getLogger(__name__)

Expand All @@ -99,14 +88,14 @@ class PrometheusMetricExporter(MetricExporter):

def __init__(self, prefix: str = ""):
self._collector = CustomCollector(prefix)
REGISTRY.register(self._collector)
core.REGISTRY.register(self._collector)

def export(self, export_records: Sequence[Metric]) -> MetricExportResult:
self._collector.add_metrics_data(export_records)
return MetricExportResult.SUCCESS

def shutdown(self) -> None:
REGISTRY.unregister(self._collector)
core.REGISTRY.unregister(self._collector)


class CustomCollector:
Expand All @@ -124,7 +113,7 @@ def __init__(self, prefix: str = ""):
def add_metrics_data(self, export_records: Sequence[Metric]) -> None:
self._metrics_to_export.append(export_records)

def collect(self):
def collect(self) -> None:
"""Collect fetches the metrics from OpenTelemetry
and delivers them as Prometheus Metrics.
Collect is invoked every time a prometheus.Gatherer is run
Expand All @@ -139,49 +128,71 @@ def collect(self):
if prometheus_metric is not None:
yield prometheus_metric

def _translate_to_prometheus(self, export_record: Metric):
def _convert_buckets(self, metric: Metric) -> Sequence[Tuple[str, int]]:
buckets = []
total_count = 0
for i in range(0, len(metric.point.bucket_counts)):
total_count += metric.point.bucket_counts[i]
buckets.append(
(
f"{metric.point.explicit_bounds[i]}",
total_count,
)
)
return buckets

def _translate_to_prometheus(
self, metric: Metric
) -> Optional[core.Metric]:
prometheus_metric = None
label_values = []
label_keys = []
for key, value in export_record.attributes.items():
for key, value in metric.attributes.items():
label_keys.append(self._sanitize(key))
label_values.append(str(value))

metric_name = ""
if self._prefix != "":
metric_name = self._prefix + "_"
metric_name += self._sanitize(export_record.name)

description = export_record.description or ""
if isinstance(export_record.point, Sum):
prometheus_metric = CounterMetricFamily(
name=metric_name, documentation=description, labels=label_keys
metric_name += self._sanitize(metric.name)

description = metric.description or ""
if isinstance(metric.point, Sum):
prometheus_metric = core.CounterMetricFamily(
name=metric_name,
documentation=description,
labels=label_keys,
unit=metric.unit,
)
prometheus_metric.add_metric(
labels=label_values, value=export_record.point.value
labels=label_values, value=metric.point.value
)
elif isinstance(export_record.point, Gauge):
prometheus_metric = GaugeMetricFamily(
name=metric_name, documentation=description, labels=label_keys
elif isinstance(metric.point, Gauge):
prometheus_metric = core.GaugeMetricFamily(
name=metric_name,
documentation=description,
labels=label_keys,
unit=metric.unit,
)
prometheus_metric.add_metric(
labels=label_values, value=export_record.point.value
labels=label_values, value=metric.point.value
)
elif isinstance(metric.point, Histogram):
value = metric.point.sum
prometheus_metric = core.HistogramMetricFamily(
name=metric_name,
documentation=description,
labels=label_keys,
unit=metric.unit,
)
buckets = self._convert_buckets(metric)
prometheus_metric.add_metric(
labels=label_values, buckets=buckets, sum_value=value
)
# TODO: Add support for histograms when supported in OT
# elif isinstance(export_record.point, Histogram):
# value = export_record.point.sum
# prometheus_metric = HistogramMetricFamily(
# name=metric_name,
# documentation=description,
# labels=label_keys,
# )
# prometheus_metric.add_metric(labels=label_values, buckets=export_record.point.explicit_bounds, sum_value=value)
# TODO: add support for Summary once implemented
# elif isinstance(export_record.point, Summary):
else:
logger.warning(
"Unsupported metric type. %s", type(export_record.point)
)
logger.warning("Unsupported metric type. %s", type(metric.point))
return prometheus_metric

def _sanitize(self, key: str) -> str:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,7 @@
)
from opentelemetry.sdk._metrics import MeterProvider
from opentelemetry.sdk._metrics.export import MetricExportResult
from opentelemetry.sdk._metrics.point import (
AggregationTemporality,
Histogram,
Metric,
)
from opentelemetry.sdk._metrics.point import AggregationTemporality, Histogram
from opentelemetry.sdk.util import get_dict_as_key
from opentelemetry.test.metrictestutil import (
_generate_gauge,
Expand All @@ -41,16 +37,6 @@

class TestPrometheusMetricExporter(unittest.TestCase):
def setUp(self):
set_meter_provider(MeterProvider())
self._meter = get_meter_provider().get_meter(__name__)
self._test_metric = self._meter.create_counter(
"testname",
description="testdesc",
unit="unit",
)
labels = {"environment": "staging"}
self._labels_key = get_dict_as_key(labels)

self._mock_registry_register = mock.Mock()
self._registry_register_patch = mock.patch(
"prometheus_client.core.REGISTRY.register",
Expand Down Expand Up @@ -82,7 +68,6 @@ def test_export(self):
self.assertEqual(len(exporter._collector._metrics_to_export), 1)
self.assertIs(result, MetricExportResult.SUCCESS)

# # TODO: Add unit test for histogram
def test_histogram_to_prometheus(self):
record = _generate_metric(
"test@name",
Expand All @@ -94,14 +79,15 @@ def test_histogram_to_prometheus(self):
explicit_bounds=[123.0, 456.0],
aggregation_temporality=AggregationTemporality.DELTA,
),
attributes={"histo": 1},
)

# collector = CustomCollector("testprefix")
# collector.add_metrics_data([record])
# result_bytes = generate_latest(collector)
# result = result_bytes.decode("utf-8")
# self.assertIn("testprefix_test_name_count 2.0", result)
# self.assertIn("testprefix_test_name_sum 579.0", result)
collector = CustomCollector("testprefix")
collector.add_metrics_data([record])
result_bytes = generate_latest(collector)
result = result_bytes.decode("utf-8")
self.assertIn('testprefix_test_name_s_sum{histo="1"} 579.0', result)
self.assertIn('testprefix_test_name_s_count{histo="1"} 2.0', result)

def test_sum_to_prometheus(self):
labels = {"environment@": "staging", "os": "Windows"}
Expand All @@ -117,7 +103,9 @@ def test_sum_to_prometheus(self):

for prometheus_metric in collector.collect():
self.assertEqual(type(prometheus_metric), CounterMetricFamily)
self.assertEqual(prometheus_metric.name, "testprefix_test_sum")
self.assertEqual(
prometheus_metric.name, "testprefix_test_sum_testunit"
)
self.assertEqual(prometheus_metric.documentation, "testdesc")
self.assertTrue(len(prometheus_metric.samples) == 1)
self.assertEqual(prometheus_metric.samples[0].value, 123)
Expand All @@ -143,7 +131,9 @@ def test_gauge_to_prometheus(self):

for prometheus_metric in collector.collect():
self.assertEqual(type(prometheus_metric), GaugeMetricFamily)
self.assertEqual(prometheus_metric.name, "testprefix_test_gauge")
self.assertEqual(
prometheus_metric.name, "testprefix_test_gauge_testunit"
)
self.assertEqual(prometheus_metric.documentation, "testdesc")
self.assertTrue(len(prometheus_metric.samples) == 1)
self.assertEqual(prometheus_metric.samples[0].value, 123)
Expand Down

0 comments on commit aad4633

Please sign in to comment.