diff --git a/exporter/opentelemetry-exporter-prometheus-remote-write/src/opentelemetry/exporter/prometheus_remote_write/__init__.py b/exporter/opentelemetry-exporter-prometheus-remote-write/src/opentelemetry/exporter/prometheus_remote_write/__init__.py
index c4c013bf56..ee4dcc1fd3 100644
--- a/exporter/opentelemetry-exporter-prometheus-remote-write/src/opentelemetry/exporter/prometheus_remote_write/__init__.py
+++ b/exporter/opentelemetry-exporter-prometheus-remote-write/src/opentelemetry/exporter/prometheus_remote_write/__init__.py
@@ -153,131 +153,144 @@ def export(
     def shutdown(self) -> None:
         raise NotImplementedError()
 
-    def convert_to_timeseries(
+    def _convert_to_timeseries(
         self, export_records: Sequence[ExportRecord]
     ) -> Sequence[TimeSeries]:
         converter_map = {
-            MinMaxSumCountAggregator: self.convert_from_min_max_sum_count,
-            SumAggregator: self.convert_from_sum,
-            HistogramAggregator: self.convert_from_histogram,
-            LastValueAggregator: self.convert_from_last_value,
-            ValueObserverAggregator: self.convert_from_last_value,
+            MinMaxSumCountAggregator: self._convert_from_min_max_sum_count,
+            SumAggregator: self._convert_from_sum,
+            HistogramAggregator: self._convert_from_histogram,
+            LastValueAggregator: self._convert_from_last_value,
+            ValueObserverAggregator: self._convert_from_value_observer,
         }
         timeseries = []
         for export_record in export_records:
             aggregator_type = type(export_record.aggregator)
             converter = converter_map.get(aggregator_type)
-            if not converter:
+            if converter is None:
                 raise ValueError(
                     str(aggregator_type) + " conversion is not supported"
                 )
             timeseries.extend(converter(export_record))
         return timeseries
 
-    def convert_from_sum(self, sum_record: ExportRecord) -> TimeSeries:
-        name = sum_record.instrument.name
-        value = sum_record.aggregator.checkpoint
-        return [self.create_timeseries(sum_record, name, value)]
+    def _convert_from_sum(self, sum_record: ExportRecord) -> TimeSeries:
+        return [
+            self._create_timeseries(
+                sum_record,
+                sum_record.instrument.name,
+                sum_record.aggregator.checkpoint,
+            )
+        ]
 
-    def convert_from_min_max_sum_count(
+    def _convert_from_min_max_sum_count(
         self, min_max_sum_count_record: ExportRecord
     ) -> TimeSeries:
         timeseries = []
-        agg_types = ["min", "max", "sum", "count"]
-        for agg_type in agg_types:
+        for agg_type in ["min", "max", "sum", "count"]:
             name = min_max_sum_count_record.instrument.name + "_" + agg_type
             value = getattr(
                 min_max_sum_count_record.aggregator.checkpoint, agg_type
             )
             timeseries.append(
-                self.create_timeseries(min_max_sum_count_record, name, value)
+                self._create_timeseries(min_max_sum_count_record, name, value)
             )
         return timeseries
 
-    def convert_from_histogram(
+    def _convert_from_histogram(
         self, histogram_record: ExportRecord
     ) -> TimeSeries:
         count = 0
         timeseries = []
         for bound in histogram_record.aggregator.checkpoint.keys():
-            bb = "+Inf" if bound == float("inf") else str(bound)
-            name = (
-                histogram_record.instrument.name + '_bucket{le="' + bb + '"}'
-            )
+            bound_str = "+Inf" if bound == float("inf") else str(bound)
             value = histogram_record.aggregator.checkpoint[bound]
             timeseries.append(
-                self.create_timeseries(histogram_record, name, value)
+                self._create_timeseries(
+                    histogram_record,
+                    histogram_record.instrument.name,
+                    value,
+                    labels=[("le", bound_str)],
+                )
            )
            count += value
        name = histogram_record.instrument.name + "_count"
        timeseries.append(
-            self.create_timeseries(histogram_record, name, float(count))
+            self._create_timeseries(histogram_record, name, float(count))
        )
        return timeseries
 
-    def convert_from_last_value(
+    def _convert_from_last_value(
         self, last_value_record: ExportRecord
     ) -> TimeSeries:
-        name = last_value_record.instrument.name
-        value = last_value_record.aggregator.checkpoint
-        return [self.create_timeseries(last_value_record, name, value)]
+        return [
+            self._create_timeseries(
+                last_value_record,
+                last_value_record.instrument.name,
+                last_value_record.aggregator.checkpoint,
+            )
+        ]
 
-    def convert_from_value_observer(
+    def _convert_from_value_observer(
         self, value_observer_record: ExportRecord
     ) -> TimeSeries:
         timeseries = []
-        agg_types = ["min", "max", "sum", "count", "last"]
-        for agg_type in agg_types:
-            name = value_observer_record.instrument.name + "_" + agg_type
-            value = getattr(
-                value_observer_record.aggregator.checkpoint, agg_type
-            )
+        for agg_type in ["min", "max", "sum", "count", "last"]:
             timeseries.append(
-                self.create_timeseries(value_observer_record, name, value)
+                self._create_timeseries(
+                    value_observer_record,
+                    value_observer_record.instrument.name + "_" + agg_type,
+                    getattr(
+                        value_observer_record.aggregator.checkpoint, agg_type
+                    ),
+                )
             )
         return timeseries
 
     # TODO: Implement convert from quantile once supported by SDK for Prometheus Summaries
-    def convert_from_quantile(
+    def _convert_from_quantile(
         self, summary_record: ExportRecord
     ) -> TimeSeries:
         raise NotImplementedError()
 
     # pylint: disable=no-member
-    def create_timeseries(
-        self, export_record: ExportRecord, name, value: float
+    def _create_timeseries(
+        self, export_record: ExportRecord, name, value: float, labels=None
     ) -> TimeSeries:
         timeseries = TimeSeries()
-        # Add name label, record labels and resource labels
-        timeseries.labels.append(self.create_label("__name__", name))
-        resource_attributes = export_record.resource.attributes
-        for label_name, label_value in resource_attributes.items():
-            timeseries.labels.append(
-                self.create_label(label_name, label_value)
-            )
-        for label in export_record.labels:
-            if label[0] not in resource_attributes.keys():
-                timeseries.labels.append(self.create_label(label[0], label[1]))
-        # Add sample
-        timeseries.samples.append(
-            self.create_sample(
-                export_record.aggregator.last_update_timestamp, value
-            )
-        )
-        return timeseries
+        seen = set()
+
+        def add_label(label_name, label_value):
+            # Label name must contain only alphanumeric characters and underscores
+            label_name = re.sub("[^\\w_]", "_", label_name)
+            if label_name not in seen:
+                label = Label()
+                label.name = label_name
+                label.value = label_value
+                timeseries.labels.append(label)
+                seen.add(label_name)
+
+        add_label("__name__", name)
+        if labels:
+            for [label_name, label_value] in labels:
+                add_label(label_name, label_value)
+        if export_record.resource.attributes:
+            for (
+                label_name,
+                label_value,
+            ) in export_record.resource.attributes.items():
+                add_label(label_name, label_value)
+        if export_record.labels:
+            for [label_name, label_value] in export_record.labels:
+                add_label(label_name, label_value)
 
-    def create_sample(self, timestamp: int, value: float) -> Sample:
         sample = Sample()
-        sample.timestamp = int(timestamp / 1000000)
+        sample.timestamp = int(
+            export_record.aggregator.last_update_timestamp / 1000000
+        )
         sample.value = value
-        return sample
-
-    def create_label(self, name: str, value: str) -> Label:
-        label = Label()
-        # Label name must contain only alphanumeric characters and underscores
-        label.name = re.sub("[^0-9a-zA-Z_]+", "_", name)
-        label.value = value
-        return label
+        timeseries.samples.append(sample)
+        return timeseries
 
     def build_message(self, timeseries: Sequence[TimeSeries]) -> bytes:
         raise NotImplementedError()
diff --git a/exporter/opentelemetry-exporter-prometheus-remote-write/tests/test_prometheus_remote_write_exporter.py b/exporter/opentelemetry-exporter-prometheus-remote-write/tests/test_prometheus_remote_write_exporter.py
index c837556f30..b1dcf656a9 100644
--- a/exporter/opentelemetry-exporter-prometheus-remote-write/tests/test_prometheus_remote_write_exporter.py
+++ b/exporter/opentelemetry-exporter-prometheus-remote-write/tests/test_prometheus_remote_write_exporter.py
@@ -13,12 +13,14 @@
 # limitations under the License.
 
 import unittest
-from unittest import mock
+from unittest.mock import MagicMock, Mock, patch
 
 from opentelemetry.exporter.prometheus_remote_write import (
     PrometheusRemoteWriteMetricsExporter,
 )
 from opentelemetry.exporter.prometheus_remote_write.gen.types_pb2 import (
+    Label,
+    Sample,
     TimeSeries,
 )
 from opentelemetry.sdk.metrics import Counter
@@ -130,185 +132,233 @@ def test_invalid_tls_config_key_only_param(self):
 class TestConversion(unittest.TestCase):
     # Initializes test data that is reused across tests
     def setUp(self):
-        self._test_metric = Counter(
-            "testname", "testdesc", "testunit", int, None
-        )
-        self._exporter = PrometheusRemoteWriteMetricsExporter(
+        self.exporter = PrometheusRemoteWriteMetricsExporter(
             endpoint="/prom/test_endpoint"
         )
 
-        def generate_record(aggregator_type):
-            return ExportRecord(
-                self._test_metric, None, aggregator_type(), Resource({}),
-            )
-
-        self._generate_record = generate_record
-
-        def converter_method(record, name, value):
-            return (type(record.aggregator), name, value)
-
-        self._converter_mock = mock.MagicMock(return_value=converter_method)
-
     # Ensures conversion to timeseries function works with valid aggregation types
     def test_valid_convert_to_timeseries(self):
-        timeseries_mock_method = mock.Mock(return_value=["test_value"])
-        self._exporter.convert_from_sum = timeseries_mock_method
-        self._exporter.convert_from_min_max_sum_count = timeseries_mock_method
-        self._exporter.convert_from_histogram = timeseries_mock_method
-        self._exporter.convert_from_last_value = timeseries_mock_method
-        self._exporter.convert_from_value_observer = timeseries_mock_method
         test_records = [
-            self._generate_record(SumAggregator),
-            self._generate_record(MinMaxSumCountAggregator),
-            self._generate_record(HistogramAggregator),
-            self._generate_record(LastValueAggregator),
-            self._generate_record(ValueObserverAggregator),
+            ExportRecord(
+                Counter("testname", "testdesc", "testunit", int, None),
+                None,
+                SumAggregator(),
+                Resource({}),
+            ),
+            ExportRecord(
+                Counter("testname", "testdesc", "testunit", int, None),
+                None,
+                MinMaxSumCountAggregator(),
+                Resource({}),
+            ),
+            ExportRecord(
+                Counter("testname", "testdesc", "testunit", int, None),
+                None,
+                HistogramAggregator(),
+                Resource({}),
+            ),
+            ExportRecord(
+                Counter("testname", "testdesc", "testunit", int, None),
+                None,
+                LastValueAggregator(),
+                Resource({}),
+            ),
+            ExportRecord(
+                Counter("testname", "testdesc", "testunit", int, None),
+                None,
+                ValueObserverAggregator(),
+                Resource({}),
+            ),
         ]
-        data = self._exporter.convert_to_timeseries(test_records)
-        self.assertEqual(len(data), 5)
+        for record in test_records:
+            record.aggregator.update(5)
+            record.aggregator.take_checkpoint()
+        data = self.exporter._convert_to_timeseries(test_records)
+        self.assertEqual(len(data), 14)
         for timeseries in data:
-            self.assertEqual(timeseries, "test_value")
-
-        no_type_records = [self._generate_record(lambda: None)]
-        with self.assertRaises(ValueError):
-            self._exporter.convert_to_timeseries(no_type_records)
+            self.assertIsInstance(timeseries, TimeSeries)
 
     # Ensures conversion to timeseries fails for unsupported aggregation types
     def test_invalid_convert_to_timeseries(self):
-        no_type_records = [self._generate_record(lambda: None)]
         with self.assertRaises(ValueError):
-            self._exporter.convert_to_timeseries(no_type_records)
+            self.exporter._convert_to_timeseries(
+                [ExportRecord(None, None, None, Resource({}))]
+            )
 
     # Ensures sum aggregator is correctly converted to timeseries
     def test_convert_from_sum(self):
-        sum_record = self._generate_record(SumAggregator)
+        sum_record = ExportRecord(
+            Counter("testname", "testdesc", "testunit", int, None),
+            None,
+            SumAggregator(),
+            Resource({}),
+        )
         sum_record.aggregator.update(3)
         sum_record.aggregator.update(2)
         sum_record.aggregator.take_checkpoint()
-        self._exporter.create_timeseries = self._converter_mock()
-        timeseries = self._exporter.convert_from_sum(sum_record)
-        self.assertEqual(timeseries[0], (SumAggregator, "testname", 5))
+        expected_timeseries = self.exporter._create_timeseries(
+            sum_record, "testname", 5.0
+        )
+        timeseries = self.exporter._convert_from_sum(sum_record)
+        self.assertEqual(timeseries[0], expected_timeseries)
 
     # Ensures sum min_max_count aggregator is correctly converted to timeseries
     def test_convert_from_min_max_sum_count(self):
-        min_max_sum_count_record = self._generate_record(
-            MinMaxSumCountAggregator
+        min_max_sum_count_record = ExportRecord(
+            Counter("testname", "testdesc", "testunit", int, None),
+            None,
+            MinMaxSumCountAggregator(),
+            Resource({}),
         )
         min_max_sum_count_record.aggregator.update(5)
         min_max_sum_count_record.aggregator.update(1)
         min_max_sum_count_record.aggregator.take_checkpoint()
-        self._exporter.create_timeseries = self._converter_mock()
-        timeseries = self._exporter.convert_from_min_max_sum_count(
-            min_max_sum_count_record
+        expected_min_timeseries = self.exporter._create_timeseries(
+            min_max_sum_count_record, "testname_min", 1.0
         )
-        self.assertEqual(
-            timeseries[0], (MinMaxSumCountAggregator, "testname_min", 1)
+        expected_max_timeseries = self.exporter._create_timeseries(
+            min_max_sum_count_record, "testname_max", 5.0
         )
-        self.assertEqual(
-            timeseries[1], (MinMaxSumCountAggregator, "testname_max", 5)
+        expected_sum_timeseries = self.exporter._create_timeseries(
+            min_max_sum_count_record, "testname_sum", 6.0
        )
-        self.assertEqual(
-            timeseries[2], (MinMaxSumCountAggregator, "testname_sum", 6)
+        expected_count_timeseries = self.exporter._create_timeseries(
+            min_max_sum_count_record, "testname_count", 2.0
        )
-        self.assertEqual(
-            timeseries[3], (MinMaxSumCountAggregator, "testname_count", 2)
+
+        timeseries = self.exporter._convert_from_min_max_sum_count(
+            min_max_sum_count_record
         )
+        self.assertEqual(timeseries[0], expected_min_timeseries)
+        self.assertEqual(timeseries[1], expected_max_timeseries)
+        self.assertEqual(timeseries[2], expected_sum_timeseries)
+        self.assertEqual(timeseries[3], expected_count_timeseries)
 
     # Ensures histogram aggregator is correctly converted to timeseries
     def test_convert_from_histogram(self):
-        histogram_record = self._generate_record(HistogramAggregator)
+        histogram_record = ExportRecord(
+            Counter("testname", "testdesc", "testunit", int, None),
+            None,
+            HistogramAggregator(),
+            Resource({}),
+        )
         histogram_record.aggregator.update(5)
         histogram_record.aggregator.update(2)
         histogram_record.aggregator.update(-1)
         histogram_record.aggregator.take_checkpoint()
-        self._exporter.create_timeseries = self._converter_mock()
-        timeseries = self._exporter.convert_from_histogram(histogram_record)
-        self.assertEqual(
-            timeseries[0], (HistogramAggregator, 'testname_bucket{le="0"}', 1)
+        expected_le_0_timeseries = self.exporter._create_timeseries(
+            histogram_record, "testname", 1.0, [("le", "0")]
         )
-        self.assertEqual(
-            timeseries[1],
-            (HistogramAggregator, 'testname_bucket{le="+Inf"}', 2),
+        expected_le_inf_timeseries = self.exporter._create_timeseries(
+            histogram_record, "testname", 2.0, [("le", "+Inf")]
+        )
+        expected_count_timeseries = self.exporter._create_timeseries(
+            histogram_record, "testname_count", 3.0
         )
+        timeseries = self.exporter._convert_from_histogram(histogram_record)
+        self.assertEqual(timeseries[0], expected_le_0_timeseries)
         self.assertEqual(
-            timeseries[2], (HistogramAggregator, "testname_count", 3)
+            timeseries[1],
+            expected_le_inf_timeseries,
         )
+        self.assertEqual(timeseries[2], expected_count_timeseries)
 
     # Ensures last value aggregator is correctly converted to timeseries
     def test_convert_from_last_value(self):
-        last_value_record = self._generate_record(LastValueAggregator)
+        last_value_record = ExportRecord(
+            Counter("testname", "testdesc", "testunit", int, None),
+            None,
+            LastValueAggregator(),
+            Resource({}),
+        )
         last_value_record.aggregator.update(1)
         last_value_record.aggregator.update(5)
         last_value_record.aggregator.take_checkpoint()
-        self._exporter.create_timeseries = self._converter_mock()
-        timeseries = self._exporter.convert_from_last_value(last_value_record)
-        self.assertEqual(timeseries[0], (LastValueAggregator, "testname", 5))
+        expected_timeseries = self.exporter._create_timeseries(
+            last_value_record, "testname", 5.0
+        )
+        timeseries = self.exporter._convert_from_last_value(last_value_record)
+        self.assertEqual(timeseries[0], expected_timeseries)
 
     # Ensures value observer aggregator is correctly converted to timeseries
     def test_convert_from_value_observer(self):
-        value_observer_record = self._generate_record(ValueObserverAggregator)
+        value_observer_record = ExportRecord(
+            Counter("testname", "testdesc", "testunit", int, None),
+            None,
+            ValueObserverAggregator(),
+            Resource({}),
+        )
         value_observer_record.aggregator.update(5)
         value_observer_record.aggregator.update(1)
         value_observer_record.aggregator.update(2)
         value_observer_record.aggregator.take_checkpoint()
-        self._exporter.create_timeseries = self._converter_mock()
-        timeseries = self._exporter.convert_from_value_observer(
-            value_observer_record
+        expected_min_timeseries = self.exporter._create_timeseries(
+            value_observer_record, "testname_min", 1.0
         )
-        self.assertEqual(
-            timeseries[0], (ValueObserverAggregator, "testname_min", 1)
+        expected_max_timeseries = self.exporter._create_timeseries(
+            value_observer_record, "testname_max", 5.0
         )
-        self.assertEqual(
-            timeseries[1], (ValueObserverAggregator, "testname_max", 5)
+        expected_sum_timeseries = self.exporter._create_timeseries(
+            value_observer_record, "testname_sum", 8.0
         )
-        self.assertEqual(
-            timeseries[2], (ValueObserverAggregator, "testname_sum", 8)
+        expected_count_timeseries = self.exporter._create_timeseries(
+            value_observer_record, "testname_count", 3.0
         )
-        self.assertEqual(
-            timeseries[3], (ValueObserverAggregator, "testname_count", 3)
+        expected_last_timeseries = self.exporter._create_timeseries(
+            value_observer_record, "testname_last", 2.0
         )
-        self.assertEqual(
-            timeseries[4], (ValueObserverAggregator, "testname_last", 2)
+        timeseries = self.exporter._convert_from_value_observer(
+            value_observer_record
         )
+        self.assertEqual(timeseries[0], expected_min_timeseries)
+        self.assertEqual(timeseries[1], expected_max_timeseries)
+        self.assertEqual(timeseries[2], expected_sum_timeseries)
+        self.assertEqual(timeseries[3], expected_count_timeseries)
+        self.assertEqual(timeseries[4], expected_last_timeseries)
 
     # Ensures quantile aggregator is correctly converted to timeseries
-    # TODO: Add test once method is implemented
-    def test_convert_from_quantile(self):
-        pass
+    # TODO: Add test_convert_from_quantile once method is implemented
 
     # Ensures timeseries produced contains appropriate sample and labels
     def test_create_timeseries(self):
+        def create_label(name, value):
+            label = Label()
+            label.name = name
+            label.value = value
+            return label
+
         sum_aggregator = SumAggregator()
         sum_aggregator.update(5)
         sum_aggregator.take_checkpoint()
-        sum_aggregator.last_update_timestamp = 10
         export_record = ExportRecord(
-            self._test_metric,
+            Counter("testname", "testdesc", "testunit", int, None),
             get_dict_as_key({"record_name": "record_value"}),
             sum_aggregator,
             Resource({"resource_name": "resource_value"}),
         )
         expected_timeseries = TimeSeries()
+        expected_timeseries.labels.append(create_label("__name__", "testname"))
         expected_timeseries.labels.append(
-            self._exporter.create_label("__name__", "testname")
+            create_label("resource_name", "resource_value")
         )
         expected_timeseries.labels.append(
-            self._exporter.create_label("resource_name", "resource_value")
+            create_label("record_name", "record_value")
         )
-        expected_timeseries.labels.append(
-            self._exporter.create_label("record_name", "record_value")
-        )
-        expected_timeseries.samples.append(
-            self._exporter.create_sample(10, 5.0),
-        )
-        timeseries = self._exporter.create_timeseries(
-            export_record, "testname", 5.0,
+
+        sample = expected_timeseries.samples.add()
+        sample.timestamp = int(sum_aggregator.last_update_timestamp / 1000000)
+        sample.value = 5.0
+
+        timeseries = self.exporter._create_timeseries(
+            export_record,
+            "testname",
+            5.0,
         )
         self.assertEqual(timeseries, expected_timeseries)
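
For reference, a minimal sketch of how the renamed private helpers behave after this change, mirroring the updated tests. The SDK import paths, the "service.name" attribute, and the endpoint value are assumptions used only for illustration; they are not part of the change set.

from opentelemetry.exporter.prometheus_remote_write import (
    PrometheusRemoteWriteMetricsExporter,
)
from opentelemetry.sdk.metrics import Counter
from opentelemetry.sdk.metrics.export import ExportRecord
from opentelemetry.sdk.metrics.export.aggregate import SumAggregator
from opentelemetry.sdk.resources import Resource

exporter = PrometheusRemoteWriteMetricsExporter(endpoint="/prom/test_endpoint")

aggregator = SumAggregator()
aggregator.update(5)
aggregator.take_checkpoint()

record = ExportRecord(
    Counter("testname", "testdesc", "testunit", int, None),
    None,
    aggregator,
    # "service.name" is an assumed attribute chosen to show label sanitization.
    Resource({"service.name": "my-service"}),
)

# _convert_to_timeseries dispatches on the aggregator type; for a SumAggregator
# it yields one TimeSeries whose labels pass through _create_timeseries, so
# "service.name" is rewritten to "service_name", duplicate label names are
# skipped, and the sample timestamp is converted to milliseconds.
(series,) = exporter._convert_to_timeseries([record])
print(series.labels, series.samples)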