-
Notifications
You must be signed in to change notification settings - Fork 656
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add _ViewInstrumentMatch #2400
Add _ViewInstrumentMatch #2400
Changes from 3 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,116 @@ | ||
# Copyright The OpenTelemetry Authors | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
|
||
from logging import getLogger | ||
from threading import Lock | ||
from typing import Iterable, Set | ||
|
||
from opentelemetry.sdk._metrics.aggregation import ( | ||
_convert_aggregation_temporality, | ||
) | ||
from opentelemetry.sdk._metrics.measurement import Measurement | ||
from opentelemetry.sdk._metrics.point import AggregationTemporality, Metric | ||
from opentelemetry.sdk.resources import Resource | ||
from opentelemetry.sdk.util.instrumentation import InstrumentationInfo | ||
|
||
_logger = getLogger(__name__) | ||
|
||
|
||
class _ViewInstrumentMatch: | ||
def __init__( | ||
self, | ||
name: str, | ||
unit: str, | ||
description: str, | ||
aggregation: type, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What does |
||
instrumentation_info: InstrumentationInfo, | ||
resource: Resource, | ||
attribute_keys: Set[str] = None, | ||
): | ||
self._name = name | ||
self._unit = unit | ||
self._description = description | ||
self._aggregation = aggregation | ||
self._instrumentation_info = instrumentation_info | ||
self._resource = resource | ||
self._attribute_keys = attribute_keys | ||
self._attributes_aggregation = {} | ||
self._attributes_previous_point = {} | ||
ocelotl marked this conversation as resolved.
Show resolved
Hide resolved
|
||
self._lock = Lock() | ||
|
||
def consume_measurement(self, measurement: Measurement) -> None: | ||
|
||
if self._attribute_keys is not None: | ||
|
||
attributes = {} | ||
|
||
for key, value in measurement.attributes.items(): | ||
if key in self._attribute_keys: | ||
attributes[key] = value | ||
else: | ||
attributes = measurement.attributes | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this can be None There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Right, @codeboten fixed this in #2469. ✌️ |
||
|
||
if not attributes: | ||
|
||
_logger.warning("Empty measurement attributes found") | ||
|
||
return | ||
|
||
attributes = frozenset(attributes.items()) | ||
|
||
if attributes not in self._attributes_aggregation.keys(): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
with self._lock: | ||
self._attributes_aggregation[attributes] = self._aggregation() | ||
ocelotl marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
self._attributes_aggregation[attributes].aggregate(measurement.value) | ||
|
||
def collect(self, temporality: int) -> Iterable[Metric]: | ||
|
||
with self._lock: | ||
for ( | ||
attributes, | ||
aggregation, | ||
) in self._attributes_aggregation.items(): | ||
|
||
previous_point = self._attributes_previous_point.get( | ||
attributes | ||
) | ||
|
||
current_point = aggregation.collect() | ||
|
||
# pylint: disable=assignment-from-none | ||
self._attributes_previous_point[ | ||
attributes | ||
] = _convert_aggregation_temporality( | ||
previous_point, | ||
current_point, | ||
AggregationTemporality.CUMULATIVE, | ||
) | ||
|
||
if current_point is not None: | ||
|
||
yield Metric( | ||
attributes=dict(attributes), | ||
description=self._description, | ||
instrumentation_info=self._instrumentation_info, | ||
name=self._name, | ||
resource=self._resource, | ||
unit=self._unit, | ||
point=_convert_aggregation_temporality( | ||
previous_point, | ||
current_point, | ||
temporality, | ||
), | ||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,114 @@ | ||
# Copyright The OpenTelemetry Authors | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
from logging import WARNING | ||
from unittest import TestCase | ||
from unittest.mock import Mock | ||
|
||
from opentelemetry.sdk._metrics._view_instrument_match import ( | ||
_ViewInstrumentMatch, | ||
) | ||
from opentelemetry.sdk._metrics.measurement import Measurement | ||
from opentelemetry.sdk._metrics.point import Metric | ||
|
||
|
||
class Test_ViewInstrumentMatch(TestCase): | ||
@classmethod | ||
def setUpClass(cls): | ||
|
||
cls.mock_aggregation_instance = Mock() | ||
cls.mock_aggregation_class = Mock( | ||
return_value=cls.mock_aggregation_instance | ||
) | ||
cls.mock_resource = Mock() | ||
cls.mock_instrumentation_info = Mock() | ||
|
||
def test_consume_measurement(self): | ||
|
||
view_instrument_match = _ViewInstrumentMatch( | ||
"name", | ||
"unit", | ||
"description", | ||
self.mock_aggregation_class, | ||
self.mock_instrumentation_info, | ||
self.mock_resource, | ||
{"a", "c"}, | ||
) | ||
|
||
view_instrument_match.consume_measurement( | ||
Measurement(value=0, attributes={"c": "d", "f": "g"}) | ||
) | ||
self.assertEqual( | ||
view_instrument_match._attributes_aggregation, | ||
{frozenset([("c", "d")]): self.mock_aggregation_instance}, | ||
) | ||
|
||
with self.assertLogs(level=WARNING): | ||
view_instrument_match.consume_measurement( | ||
Measurement(value=0, attributes={"w": "x", "y": "z"}) | ||
) | ||
|
||
self.assertEqual( | ||
view_instrument_match._attributes_aggregation, | ||
{frozenset([("c", "d")]): self.mock_aggregation_instance}, | ||
) | ||
|
||
view_instrument_match = _ViewInstrumentMatch( | ||
"name", | ||
"unit", | ||
"description", | ||
self.mock_aggregation_class, | ||
self.mock_instrumentation_info, | ||
self.mock_resource, | ||
) | ||
|
||
view_instrument_match.consume_measurement( | ||
Measurement(value=0, attributes={"c": "d", "f": "g"}) | ||
) | ||
self.assertEqual( | ||
view_instrument_match._attributes_aggregation, | ||
{ | ||
frozenset( | ||
[("c", "d"), ("f", "g")] | ||
): self.mock_aggregation_instance | ||
}, | ||
) | ||
|
||
def test_collect(self): | ||
|
||
view_instrument_match = _ViewInstrumentMatch( | ||
"name", | ||
"unit", | ||
"description", | ||
self.mock_aggregation_class, | ||
self.mock_instrumentation_info, | ||
self.mock_resource, | ||
{"a", "c"}, | ||
) | ||
|
||
view_instrument_match.consume_measurement( | ||
Measurement(value=0, attributes={"c": "d", "f": "g"}) | ||
) | ||
self.assertEqual( | ||
next(view_instrument_match.collect(1)), | ||
Metric( | ||
attributes={"c": "d"}, | ||
description="description", | ||
instrumentation_info=self.mock_instrumentation_info, | ||
name="name", | ||
resource=self.mock_resource, | ||
unit="unit", | ||
point=None, | ||
), | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not just pass in the view here to get all of these properties? They are well defined there and then if more fields are added to the view, you don't need to update this constructor
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These attributes don't necessarily come from the view, they may come from the instrument, like this.