From 6c4949c6f4fb53ac3f557c824f929a7bccbd1025 Mon Sep 17 00:00:00 2001 From: Diego Hurtado Date: Fri, 21 Jan 2022 10:12:06 -0600 Subject: [PATCH 1/4] Add _ViewInstrumentMatch Fixes #2300 --- .../sdk/_metrics/_view_instrument_match.py | 123 ++++++++++++++++++ .../metrics/test_view_instrument_match.py | 69 ++++++++++ 2 files changed, 192 insertions(+) create mode 100644 opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_view_instrument_match.py create mode 100644 opentelemetry-sdk/tests/metrics/test_view_instrument_match.py diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_view_instrument_match.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_view_instrument_match.py new file mode 100644 index 00000000000..22da89529f5 --- /dev/null +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_view_instrument_match.py @@ -0,0 +1,123 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from logging import getLogger +from threading import Lock +from typing import Callable, Dict, Iterable, List, Optional + +from opentelemetry.sdk._metrics.aggregation import Aggregation, _PointVarT +from opentelemetry.sdk._metrics.measurement import Measurement +from opentelemetry.sdk._metrics.point import AggregationTemporality, Metric +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.util.instrumentation import InstrumentationInfo + +_logger = getLogger(__name__) + + +class _ViewInstrumentMatch: + def __init__( + self, + name: str, + unit: str, + description: str, + attribute_keys: Dict[str, str], + extra_dimensions: List[str], + aggregation: Aggregation, + exemplar_reservoir: Callable, + resource: Resource, + instrumentation_info: InstrumentationInfo, + ): + self._name = name + self._unit = unit + self._description = description + self._resource = resource + self._instrumentation_info = instrumentation_info + + if attribute_keys is None: + self._attribute_keys = set() + else: + self._attribute_keys = set(attribute_keys.items()) + + self._extra_dimensions = extra_dimensions + self._aggregation = aggregation + self._exemplar_reservoir = exemplar_reservoir + self._attributes_aggregation = {} + self._attributes_previous_point = {} + self._lock = Lock() + + def consume_measurement(self, measurement: Measurement) -> None: + if measurement.attributes is None: + measurement_attributes = set() + + else: + measurement_attributes = set(measurement.attributes.items()) + + attributes = frozenset( + measurement_attributes.intersection(self._attribute_keys) + ) + + # What if attributes == frozenset()? + + if attributes not in self._attributes_aggregation.keys(): + with self._lock: + self._attributes_aggregation[attributes] = self._aggregation + + self._attributes_aggregation[attributes].aggregate(measurement.value) + + def collect(self, temporality: int) -> Iterable[Metric]: + with self._lock: + for ( + attributes, + aggregation, + ) in self._attributes_aggregation.items(): + + previous_point = self._attributes_previous_point.get( + attributes + ) + + current_point = aggregation.collect() + + # pylint: disable=assignment-from-none + self._attributes_previous_point[ + attributes + ] = _convert_aggregation_temporality( + previous_point, + current_point, + AggregationTemporality.CUMULATIVE, + ) + + if current_point is not None: + + yield Metric( + attributes=dict(attributes), + description=self._description, + instrumentation_info=self._instrumentation_info, + name=self._name, + resource=self._resource, + unit=self._unit, + point=_convert_aggregation_temporality( + previous_point, + current_point, + temporality, + ), + ) + + +def _convert_aggregation_temporality( + previous_point: Optional[_PointVarT], + current_point: _PointVarT, + aggregation_temporality: int, +) -> _PointVarT: + return None diff --git a/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py b/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py new file mode 100644 index 00000000000..9fd90d5b8a9 --- /dev/null +++ b/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py @@ -0,0 +1,69 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from unittest import TestCase +from unittest.mock import Mock + +from opentelemetry.sdk._metrics._view_instrument_match import ( + _ViewInstrumentMatch, +) +from opentelemetry.sdk._metrics.measurement import Measurement +from opentelemetry.sdk._metrics.point import Metric + + +class Test_ViewInstrumentMatch(TestCase): + @classmethod + def setUpClass(cls): + + cls.mock_aggregation = Mock() + cls.mock_exemplar_reservoir = Mock() + cls.mock_resource = Mock() + cls.mock_instrumentation_info = Mock() + + cls.view_instrument_match = _ViewInstrumentMatch( + "name", + "unit", + "description", + {"a": "b", "c": "d"}, + ["a", "b", "c"], + cls.mock_aggregation, + cls.mock_exemplar_reservoir, + cls.mock_resource, + cls.mock_instrumentation_info, + ) + cls.view_instrument_match.consume_measurement( + Measurement(value=0, attributes={"c": "d", "f": "g"}) + ) + + def test_consume_measurement(self): + + self.assertEqual( + self.view_instrument_match._attributes_aggregation, + {frozenset([("c", "d")]): self.mock_aggregation}, + ) + + def test_collect(self): + + self.assertEqual( + next(self.view_instrument_match.collect(1)), + Metric( + attributes={"c": "d"}, + description="description", + instrumentation_info=self.mock_instrumentation_info, + name="name", + resource=self.mock_resource, + unit="unit", + point=None, + ), + ) From 05222d19dfaff2bd47e919f64d386184d09ea53d Mon Sep 17 00:00:00 2001 From: Diego Hurtado Date: Mon, 14 Feb 2022 13:06:46 -0600 Subject: [PATCH 2/4] WIP --- .../sdk/_metrics/_view_instrument_match.py | 22 +++++-------------- 1 file changed, 6 insertions(+), 16 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_view_instrument_match.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_view_instrument_match.py index 22da89529f5..e3a7cc4d0e4 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_view_instrument_match.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_view_instrument_match.py @@ -15,9 +15,11 @@ from logging import getLogger from threading import Lock -from typing import Callable, Dict, Iterable, List, Optional +from typing import Callable, Dict, Iterable, List, Set -from opentelemetry.sdk._metrics.aggregation import Aggregation, _PointVarT +from opentelemetry.sdk._metrics.aggregation import ( + Aggregation, _convert_aggregation_temporality +) from opentelemetry.sdk._metrics.measurement import Measurement from opentelemetry.sdk._metrics.point import AggregationTemporality, Metric from opentelemetry.sdk.resources import Resource @@ -31,13 +33,9 @@ def __init__( self, name: str, unit: str, - description: str, - attribute_keys: Dict[str, str], - extra_dimensions: List[str], + attribute_keys: Set[str] = None, aggregation: Aggregation, exemplar_reservoir: Callable, - resource: Resource, - instrumentation_info: InstrumentationInfo, ): self._name = name self._unit = unit @@ -58,7 +56,7 @@ def __init__( self._lock = Lock() def consume_measurement(self, measurement: Measurement) -> None: - if measurement.attributes is None: + if measurement.attributes is not None: measurement_attributes = set() else: @@ -113,11 +111,3 @@ def collect(self, temporality: int) -> Iterable[Metric]: temporality, ), ) - - -def _convert_aggregation_temporality( - previous_point: Optional[_PointVarT], - current_point: _PointVarT, - aggregation_temporality: int, -) -> _PointVarT: - return None From c39aaa216b6d0900d8115ffa26655f539e4a7c38 Mon Sep 17 00:00:00 2001 From: Diego Hurtado Date: Tue, 15 Feb 2022 16:38:10 -0600 Subject: [PATCH 3/4] Fix issues --- .../sdk/_metrics/_view_instrument_match.py | 47 ++++++------ .../metrics/test_view_instrument_match.py | 73 +++++++++++++++---- 2 files changed, 84 insertions(+), 36 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_view_instrument_match.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_view_instrument_match.py index e3a7cc4d0e4..8f5664b5e5a 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_view_instrument_match.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_view_instrument_match.py @@ -15,10 +15,10 @@ from logging import getLogger from threading import Lock -from typing import Callable, Dict, Iterable, List, Set +from typing import Iterable, Set from opentelemetry.sdk._metrics.aggregation import ( - Aggregation, _convert_aggregation_temporality + _convert_aggregation_temporality, ) from opentelemetry.sdk._metrics.measurement import Measurement from opentelemetry.sdk._metrics.point import AggregationTemporality, Metric @@ -33,48 +33,51 @@ def __init__( self, name: str, unit: str, + description: str, + aggregation: type, + instrumentation_info: InstrumentationInfo, + resource: Resource, attribute_keys: Set[str] = None, - aggregation: Aggregation, - exemplar_reservoir: Callable, ): self._name = name self._unit = unit self._description = description - self._resource = resource - self._instrumentation_info = instrumentation_info - - if attribute_keys is None: - self._attribute_keys = set() - else: - self._attribute_keys = set(attribute_keys.items()) - - self._extra_dimensions = extra_dimensions self._aggregation = aggregation - self._exemplar_reservoir = exemplar_reservoir + self._instrumentation_info = instrumentation_info + self._resource = resource + self._attribute_keys = attribute_keys self._attributes_aggregation = {} self._attributes_previous_point = {} self._lock = Lock() def consume_measurement(self, measurement: Measurement) -> None: - if measurement.attributes is not None: - measurement_attributes = set() + if self._attribute_keys is not None: + + attributes = {} + + for key, value in measurement.attributes.items(): + if key in self._attribute_keys: + attributes[key] = value else: - measurement_attributes = set(measurement.attributes.items()) + attributes = measurement.attributes - attributes = frozenset( - measurement_attributes.intersection(self._attribute_keys) - ) + if not attributes: - # What if attributes == frozenset()? + _logger.warning("Empty measurement attributes found") + + return + + attributes = frozenset(attributes.items()) if attributes not in self._attributes_aggregation.keys(): with self._lock: - self._attributes_aggregation[attributes] = self._aggregation + self._attributes_aggregation[attributes] = self._aggregation() self._attributes_aggregation[attributes].aggregate(measurement.value) def collect(self, temporality: int) -> Iterable[Metric]: + with self._lock: for ( attributes, diff --git a/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py b/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py index 9fd90d5b8a9..6827dd1ec77 100644 --- a/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py +++ b/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from logging import WARNING from unittest import TestCase from unittest.mock import Mock @@ -26,37 +27,81 @@ class Test_ViewInstrumentMatch(TestCase): @classmethod def setUpClass(cls): - cls.mock_aggregation = Mock() - cls.mock_exemplar_reservoir = Mock() + cls.mock_aggregation_instance = Mock() + cls.mock_aggregation_class = Mock( + return_value=cls.mock_aggregation_instance + ) cls.mock_resource = Mock() cls.mock_instrumentation_info = Mock() - cls.view_instrument_match = _ViewInstrumentMatch( + def test_consume_measurement(self): + + view_instrument_match = _ViewInstrumentMatch( "name", "unit", "description", - {"a": "b", "c": "d"}, - ["a", "b", "c"], - cls.mock_aggregation, - cls.mock_exemplar_reservoir, - cls.mock_resource, - cls.mock_instrumentation_info, + self.mock_aggregation_class, + self.mock_instrumentation_info, + self.mock_resource, + {"a", "c"}, ) - cls.view_instrument_match.consume_measurement( + + view_instrument_match.consume_measurement( Measurement(value=0, attributes={"c": "d", "f": "g"}) ) + self.assertEqual( + view_instrument_match._attributes_aggregation, + {frozenset([("c", "d")]): self.mock_aggregation_instance}, + ) - def test_consume_measurement(self): + with self.assertLogs(level=WARNING): + view_instrument_match.consume_measurement( + Measurement(value=0, attributes={"w": "x", "y": "z"}) + ) + + self.assertEqual( + view_instrument_match._attributes_aggregation, + {frozenset([("c", "d")]): self.mock_aggregation_instance}, + ) + + view_instrument_match = _ViewInstrumentMatch( + "name", + "unit", + "description", + self.mock_aggregation_class, + self.mock_instrumentation_info, + self.mock_resource, + ) + view_instrument_match.consume_measurement( + Measurement(value=0, attributes={"c": "d", "f": "g"}) + ) self.assertEqual( - self.view_instrument_match._attributes_aggregation, - {frozenset([("c", "d")]): self.mock_aggregation}, + view_instrument_match._attributes_aggregation, + { + frozenset( + [("c", "d"), ("f", "g")] + ): self.mock_aggregation_instance + }, ) def test_collect(self): + view_instrument_match = _ViewInstrumentMatch( + "name", + "unit", + "description", + self.mock_aggregation_class, + self.mock_instrumentation_info, + self.mock_resource, + {"a", "c"}, + ) + + view_instrument_match.consume_measurement( + Measurement(value=0, attributes={"c": "d", "f": "g"}) + ) self.assertEqual( - next(self.view_instrument_match.collect(1)), + next(view_instrument_match.collect(1)), Metric( attributes={"c": "d"}, description="description", From 6635294b16c114a64c5ee6ecd206125d488c6cbd Mon Sep 17 00:00:00 2001 From: Diego Hurtado Date: Thu, 17 Feb 2022 12:19:50 -0600 Subject: [PATCH 4/4] Allow empty frozensets for attributes --- .../sdk/_metrics/_view_instrument_match.py | 6 ------ .../tests/metrics/test_view_instrument_match.py | 13 +++++++------ 2 files changed, 7 insertions(+), 12 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_view_instrument_match.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_view_instrument_match.py index 8f5664b5e5a..0fcb254cb52 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_view_instrument_match.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_view_instrument_match.py @@ -62,12 +62,6 @@ def consume_measurement(self, measurement: Measurement) -> None: else: attributes = measurement.attributes - if not attributes: - - _logger.warning("Empty measurement attributes found") - - return - attributes = frozenset(attributes.items()) if attributes not in self._attributes_aggregation.keys(): diff --git a/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py b/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py index 6827dd1ec77..769a0c3dd79 100644 --- a/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py +++ b/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from logging import WARNING from unittest import TestCase from unittest.mock import Mock @@ -54,14 +53,16 @@ def test_consume_measurement(self): {frozenset([("c", "d")]): self.mock_aggregation_instance}, ) - with self.assertLogs(level=WARNING): - view_instrument_match.consume_measurement( - Measurement(value=0, attributes={"w": "x", "y": "z"}) - ) + view_instrument_match.consume_measurement( + Measurement(value=0, attributes={"w": "x", "y": "z"}) + ) self.assertEqual( view_instrument_match._attributes_aggregation, - {frozenset([("c", "d")]): self.mock_aggregation_instance}, + { + frozenset(): self.mock_aggregation_instance, + frozenset([("c", "d")]): self.mock_aggregation_instance, + }, ) view_instrument_match = _ViewInstrumentMatch(