diff --git a/exporter/opentelemetry-exporter-prometheus-remote-write/CHANGELOG.md b/exporter/opentelemetry-exporter-prometheus-remote-write/CHANGELOG.md index 3b15e34e4c..323ab2817d 100644 --- a/exporter/opentelemetry-exporter-prometheus-remote-write/CHANGELOG.md +++ b/exporter/opentelemetry-exporter-prometheus-remote-write/CHANGELOG.md @@ -5,3 +5,5 @@ ((#180)[https://github.com/open-telemetry/opentelemetry-python-contrib/pull/180]) - Add Exporter constructor validation methods ((#206)[https://github.com/open-telemetry/opentelemetry-python-contrib/pull/206]) +- Add conversion to TimeSeries methods + ((#207)[https://github.com/open-telemetry/opentelemetry-python-contrib/pull/207]) diff --git a/exporter/opentelemetry-exporter-prometheus-remote-write/src/opentelemetry/exporter/prometheus_remote_write/__init__.py b/exporter/opentelemetry-exporter-prometheus-remote-write/src/opentelemetry/exporter/prometheus_remote_write/__init__.py index 9f21aa2ae6..eadacd6e01 100644 --- a/exporter/opentelemetry-exporter-prometheus-remote-write/src/opentelemetry/exporter/prometheus_remote_write/__init__.py +++ b/exporter/opentelemetry-exporter-prometheus-remote-write/src/opentelemetry/exporter/prometheus_remote_write/__init__.py @@ -12,8 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging +import re from typing import Dict, Sequence +from opentelemetry.exporter.prometheus_remote_write.gen.remote_pb2 import ( + WriteRequest, +) from opentelemetry.exporter.prometheus_remote_write.gen.types_pb2 import ( Label, Sample, @@ -24,6 +29,15 @@ MetricsExporter, MetricsExportResult, ) +from opentelemetry.sdk.metrics.export.aggregate import ( + HistogramAggregator, + LastValueAggregator, + MinMaxSumCountAggregator, + SumAggregator, + ValueObserverAggregator, +) + +logger = logging.getLogger(__name__) class PrometheusRemoteWriteMetricsExporter(MetricsExporter): @@ -55,6 +69,14 @@ def __init__( self.tls_config = tls_config self.proxies = proxies + self.converter_map = { + MinMaxSumCountAggregator: self._convert_from_min_max_sum_count, + SumAggregator: self._convert_from_sum, + HistogramAggregator: self._convert_from_histogram, + LastValueAggregator: self._convert_from_last_value, + ValueObserverAggregator: self._convert_from_value_observer, + } + @property def endpoint(self): return self._endpoint @@ -142,50 +164,145 @@ def export( def shutdown(self) -> None: raise NotImplementedError() - def convert_to_timeseries( + def _convert_to_timeseries( self, export_records: Sequence[ExportRecord] ) -> Sequence[TimeSeries]: - raise NotImplementedError() - - def convert_from_sum(self, sum_record: ExportRecord) -> TimeSeries: - raise NotImplementedError() + timeseries = [] + for export_record in export_records: + aggregator_type = type(export_record.aggregator) + converter = self.converter_map.get(aggregator_type) + if converter: + timeseries.extend(converter(export_record)) + else: + logger.warning( + "%s aggregator is not supported, record dropped", + aggregator_type, + ) + return timeseries - def convert_from_min_max_sum_count( + def _convert_from_sum( + self, sum_record: ExportRecord + ) -> Sequence[TimeSeries]: + return [ + self._create_timeseries( + sum_record, + sum_record.instrument.name + "_sum", + sum_record.aggregator.checkpoint, + ) + ] + + def _convert_from_min_max_sum_count( self, min_max_sum_count_record: ExportRecord - ) -> TimeSeries: - raise NotImplementedError() - - def convert_from_histogram( + ) -> Sequence[TimeSeries]: + timeseries = [] + for agg_type in ["min", "max", "sum", "count"]: + name = min_max_sum_count_record.instrument.name + "_" + agg_type + value = getattr( + min_max_sum_count_record.aggregator.checkpoint, agg_type + ) + timeseries.append( + self._create_timeseries(min_max_sum_count_record, name, value) + ) + return timeseries + + def _convert_from_histogram( self, histogram_record: ExportRecord - ) -> TimeSeries: - raise NotImplementedError() + ) -> Sequence[TimeSeries]: + timeseries = [] + for bound in histogram_record.aggregator.checkpoint.keys(): + bound_str = "+Inf" if bound == float("inf") else str(bound) + value = histogram_record.aggregator.checkpoint[bound] + timeseries.append( + self._create_timeseries( + histogram_record, + histogram_record.instrument.name + "_histogram", + value, + extra_label=("le", bound_str), + ) + ) + return timeseries - def convert_from_last_value( + def _convert_from_last_value( self, last_value_record: ExportRecord - ) -> TimeSeries: - raise NotImplementedError() - - def convert_from_value_observer( + ) -> Sequence[TimeSeries]: + return [ + self._create_timeseries( + last_value_record, + last_value_record.instrument.name + "_last", + last_value_record.aggregator.checkpoint, + ) + ] + + def _convert_from_value_observer( self, value_observer_record: ExportRecord - ) -> TimeSeries: - raise NotImplementedError() + ) -> Sequence[TimeSeries]: + timeseries = [] + for agg_type in ["min", "max", "sum", "count", "last"]: + timeseries.append( + self._create_timeseries( + value_observer_record, + value_observer_record.instrument.name + "_" + agg_type, + getattr( + value_observer_record.aggregator.checkpoint, agg_type + ), + ) + ) + return timeseries - def convert_from_quantile( + # TODO: Implement convert from quantile once supported by SDK for Prometheus Summaries + def _convert_from_quantile( self, summary_record: ExportRecord - ) -> TimeSeries: + ) -> Sequence[TimeSeries]: raise NotImplementedError() # pylint: disable=no-member - def create_timeseries( - self, export_record: ExportRecord, name, value: float + def _create_timeseries( + self, + export_record: ExportRecord, + name: str, + value: float, + extra_label: (str, str) = None, ) -> TimeSeries: - raise NotImplementedError() - - def create_sample(self, timestamp: int, value: float) -> Sample: - raise NotImplementedError() + timeseries = TimeSeries() + seen = set() + + def add_label(label_name: str, label_value: str): + # Label name must contain only alphanumeric characters and underscores + label_name = re.sub("[^\\w_]", "_", label_name) + if label_name not in seen: + label = Label() + label.name = label_name + label.value = label_value + timeseries.labels.append(label) + seen.add(label_name) + else: + logger.warning( + "Duplicate label with name %s and value %s", + label_name, + label_value, + ) - def create_label(self, name: str, value: str) -> Label: - raise NotImplementedError() + # The __name__ label is required by PromQL as its value appears as the metric_name + add_label("__name__", name) + if extra_label: + add_label(extra_label[0], extra_label[1]) + if export_record.resource.attributes: + for ( + label_name, + label_value, + ) in export_record.resource.attributes.items(): + add_label(label_name, str(label_value)) + if export_record.labels: + for [label_name, label_value] in export_record.labels: + add_label(label_name, label_value) + + sample = Sample() + sample.timestamp = int( + export_record.aggregator.last_update_timestamp / 1000000 + ) + sample.value = value + timeseries.samples.append(sample) + return timeseries def build_message(self, timeseries: Sequence[TimeSeries]) -> bytes: raise NotImplementedError() diff --git a/exporter/opentelemetry-exporter-prometheus-remote-write/tests/test_prometheus_remote_write_exporter.py b/exporter/opentelemetry-exporter-prometheus-remote-write/tests/test_prometheus_remote_write_exporter.py index 5208828fc1..1996f73417 100644 --- a/exporter/opentelemetry-exporter-prometheus-remote-write/tests/test_prometheus_remote_write_exporter.py +++ b/exporter/opentelemetry-exporter-prometheus-remote-write/tests/test_prometheus_remote_write_exporter.py @@ -13,10 +13,28 @@ # limitations under the License. import unittest +from logging import Logger +from unittest.mock import MagicMock, Mock, patch from opentelemetry.exporter.prometheus_remote_write import ( PrometheusRemoteWriteMetricsExporter, ) +from opentelemetry.exporter.prometheus_remote_write.gen.types_pb2 import ( + Label, + Sample, + TimeSeries, +) +from opentelemetry.sdk.metrics import Counter +from opentelemetry.sdk.metrics.export import ExportRecord, MetricsExportResult +from opentelemetry.sdk.metrics.export.aggregate import ( + HistogramAggregator, + LastValueAggregator, + MinMaxSumCountAggregator, + SumAggregator, + ValueObserverAggregator, +) +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.util import get_dict_as_key class TestValidation(unittest.TestCase): @@ -115,44 +133,228 @@ def test_invalid_tls_config_key_only_param(self): class TestConversion(unittest.TestCase): # Initializes test data that is reused across tests def setUp(self): - pass + self.exporter = PrometheusRemoteWriteMetricsExporter( + endpoint="/prom/test_endpoint" + ) # Ensures conversion to timeseries function works with valid aggregation types def test_valid_convert_to_timeseries(self): - pass + test_records = [ + ExportRecord( + Counter("testname", "testdesc", "testunit", int, None), + None, + SumAggregator(), + Resource({}), + ), + ExportRecord( + Counter("testname", "testdesc", "testunit", int, None), + None, + MinMaxSumCountAggregator(), + Resource({}), + ), + ExportRecord( + Counter("testname", "testdesc", "testunit", int, None), + None, + HistogramAggregator(), + Resource({}), + ), + ExportRecord( + Counter("testname", "testdesc", "testunit", int, None), + None, + LastValueAggregator(), + Resource({}), + ), + ExportRecord( + Counter("testname", "testdesc", "testunit", int, None), + None, + ValueObserverAggregator(), + Resource({}), + ), + ] + for record in test_records: + record.aggregator.update(5) + record.aggregator.take_checkpoint() + data = self.exporter._convert_to_timeseries(test_records) + self.assertIsInstance(data, list) + self.assertEqual(len(data), 13) + for timeseries in data: + self.assertIsInstance(timeseries, TimeSeries) # Ensures conversion to timeseries fails for unsupported aggregation types def test_invalid_convert_to_timeseries(self): - pass + data = self.exporter._convert_to_timeseries( + [ExportRecord(None, None, None, Resource({}))] + ) + self.assertIsInstance(data, list) + self.assertEqual(len(data), 0) # Ensures sum aggregator is correctly converted to timeseries def test_convert_from_sum(self): - pass + sum_record = ExportRecord( + Counter("testname", "testdesc", "testunit", int, None), + None, + SumAggregator(), + Resource({}), + ) + sum_record.aggregator.update(3) + sum_record.aggregator.update(2) + sum_record.aggregator.take_checkpoint() + + expected_timeseries = self.exporter._create_timeseries( + sum_record, "testname_sum", 5.0 + ) + timeseries = self.exporter._convert_from_sum(sum_record) + self.assertEqual(timeseries[0], expected_timeseries) # Ensures sum min_max_count aggregator is correctly converted to timeseries def test_convert_from_min_max_sum_count(self): - pass + min_max_sum_count_record = ExportRecord( + Counter("testname", "testdesc", "testunit", int, None), + None, + MinMaxSumCountAggregator(), + Resource({}), + ) + min_max_sum_count_record.aggregator.update(5) + min_max_sum_count_record.aggregator.update(1) + min_max_sum_count_record.aggregator.take_checkpoint() + + expected_min_timeseries = self.exporter._create_timeseries( + min_max_sum_count_record, "testname_min", 1.0 + ) + expected_max_timeseries = self.exporter._create_timeseries( + min_max_sum_count_record, "testname_max", 5.0 + ) + expected_sum_timeseries = self.exporter._create_timeseries( + min_max_sum_count_record, "testname_sum", 6.0 + ) + expected_count_timeseries = self.exporter._create_timeseries( + min_max_sum_count_record, "testname_count", 2.0 + ) + + timeseries = self.exporter._convert_from_min_max_sum_count( + min_max_sum_count_record + ) + self.assertEqual(timeseries[0], expected_min_timeseries) + self.assertEqual(timeseries[1], expected_max_timeseries) + self.assertEqual(timeseries[2], expected_sum_timeseries) + self.assertEqual(timeseries[3], expected_count_timeseries) # Ensures histogram aggregator is correctly converted to timeseries def test_convert_from_histogram(self): - pass + histogram_record = ExportRecord( + Counter("testname", "testdesc", "testunit", int, None), + None, + HistogramAggregator(), + Resource({}), + ) + histogram_record.aggregator.update(5) + histogram_record.aggregator.update(2) + histogram_record.aggregator.update(-1) + histogram_record.aggregator.take_checkpoint() + + expected_le_0_timeseries = self.exporter._create_timeseries( + histogram_record, "testname_histogram", 1.0, ("le", "0") + ) + expected_le_inf_timeseries = self.exporter._create_timeseries( + histogram_record, "testname_histogram", 2.0, ("le", "+Inf") + ) + timeseries = self.exporter._convert_from_histogram(histogram_record) + self.assertEqual(timeseries[0], expected_le_0_timeseries) + self.assertEqual(timeseries[1], expected_le_inf_timeseries) # Ensures last value aggregator is correctly converted to timeseries def test_convert_from_last_value(self): - pass + last_value_record = ExportRecord( + Counter("testname", "testdesc", "testunit", int, None), + None, + LastValueAggregator(), + Resource({}), + ) + last_value_record.aggregator.update(1) + last_value_record.aggregator.update(5) + last_value_record.aggregator.take_checkpoint() + + expected_timeseries = self.exporter._create_timeseries( + last_value_record, "testname_last", 5.0 + ) + timeseries = self.exporter._convert_from_last_value(last_value_record) + self.assertEqual(timeseries[0], expected_timeseries) # Ensures value observer aggregator is correctly converted to timeseries def test_convert_from_value_observer(self): - pass + value_observer_record = ExportRecord( + Counter("testname", "testdesc", "testunit", int, None), + None, + ValueObserverAggregator(), + Resource({}), + ) + value_observer_record.aggregator.update(5) + value_observer_record.aggregator.update(1) + value_observer_record.aggregator.update(2) + value_observer_record.aggregator.take_checkpoint() + + expected_min_timeseries = self.exporter._create_timeseries( + value_observer_record, "testname_min", 1.0 + ) + expected_max_timeseries = self.exporter._create_timeseries( + value_observer_record, "testname_max", 5.0 + ) + expected_sum_timeseries = self.exporter._create_timeseries( + value_observer_record, "testname_sum", 8.0 + ) + expected_count_timeseries = self.exporter._create_timeseries( + value_observer_record, "testname_count", 3.0 + ) + expected_last_timeseries = self.exporter._create_timeseries( + value_observer_record, "testname_last", 2.0 + ) + timeseries = self.exporter._convert_from_value_observer( + value_observer_record + ) + self.assertEqual(timeseries[0], expected_min_timeseries) + self.assertEqual(timeseries[1], expected_max_timeseries) + self.assertEqual(timeseries[2], expected_sum_timeseries) + self.assertEqual(timeseries[3], expected_count_timeseries) + self.assertEqual(timeseries[4], expected_last_timeseries) # Ensures quantile aggregator is correctly converted to timeseries - # TODO: Add test once method is implemented - def test_convert_from_quantile(self): - pass + # TODO: Add test_convert_from_quantile once method is implemented # Ensures timeseries produced contains appropriate sample and labels def test_create_timeseries(self): - pass + def create_label(name, value): + label = Label() + label.name = name + label.value = value + return label + + sum_aggregator = SumAggregator() + sum_aggregator.update(5) + sum_aggregator.take_checkpoint() + export_record = ExportRecord( + Counter("testname", "testdesc", "testunit", int, None), + get_dict_as_key({"record_name": "record_value"}), + sum_aggregator, + Resource({"resource_name": "resource_value"}), + ) + + expected_timeseries = TimeSeries() + expected_timeseries.labels.append(create_label("__name__", "testname")) + expected_timeseries.labels.append( + create_label("resource_name", "resource_value") + ) + expected_timeseries.labels.append( + create_label("record_name", "record_value") + ) + + sample = expected_timeseries.samples.add() + sample.timestamp = int(sum_aggregator.last_update_timestamp / 1000000) + sample.value = 5.0 + + timeseries = self.exporter._create_timeseries( + export_record, "testname", 5.0 + ) + self.assertEqual(timeseries, expected_timeseries) class TestExport(unittest.TestCase):