Skip to content

Commit

Permalink
Add _ViewInstrumentMatch (#2400)
Browse files Browse the repository at this point in the history
  • Loading branch information
ocelotl authored Feb 17, 2022
1 parent ec3053e commit a07e039
Show file tree
Hide file tree
Showing 2 changed files with 225 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from logging import getLogger
from threading import Lock
from typing import Iterable, Set

from opentelemetry.sdk._metrics.aggregation import (
_convert_aggregation_temporality,
)
from opentelemetry.sdk._metrics.measurement import Measurement
from opentelemetry.sdk._metrics.point import AggregationTemporality, Metric
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.util.instrumentation import InstrumentationInfo

_logger = getLogger(__name__)


class _ViewInstrumentMatch:
def __init__(
self,
name: str,
unit: str,
description: str,
aggregation: type,
instrumentation_info: InstrumentationInfo,
resource: Resource,
attribute_keys: Set[str] = None,
):
self._name = name
self._unit = unit
self._description = description
self._aggregation = aggregation
self._instrumentation_info = instrumentation_info
self._resource = resource
self._attribute_keys = attribute_keys
self._attributes_aggregation = {}
self._attributes_previous_point = {}
self._lock = Lock()

def consume_measurement(self, measurement: Measurement) -> None:

if self._attribute_keys is not None:

attributes = {}

for key, value in measurement.attributes.items():
if key in self._attribute_keys:
attributes[key] = value
else:
attributes = measurement.attributes

attributes = frozenset(attributes.items())

if attributes not in self._attributes_aggregation.keys():
with self._lock:
self._attributes_aggregation[attributes] = self._aggregation()

self._attributes_aggregation[attributes].aggregate(measurement.value)

def collect(self, temporality: int) -> Iterable[Metric]:

with self._lock:
for (
attributes,
aggregation,
) in self._attributes_aggregation.items():

previous_point = self._attributes_previous_point.get(
attributes
)

current_point = aggregation.collect()

# pylint: disable=assignment-from-none
self._attributes_previous_point[
attributes
] = _convert_aggregation_temporality(
previous_point,
current_point,
AggregationTemporality.CUMULATIVE,
)

if current_point is not None:

yield Metric(
attributes=dict(attributes),
description=self._description,
instrumentation_info=self._instrumentation_info,
name=self._name,
resource=self._resource,
unit=self._unit,
point=_convert_aggregation_temporality(
previous_point,
current_point,
temporality,
),
)
115 changes: 115 additions & 0 deletions opentelemetry-sdk/tests/metrics/test_view_instrument_match.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from unittest import TestCase
from unittest.mock import Mock

from opentelemetry.sdk._metrics._view_instrument_match import (
_ViewInstrumentMatch,
)
from opentelemetry.sdk._metrics.measurement import Measurement
from opentelemetry.sdk._metrics.point import Metric


class Test_ViewInstrumentMatch(TestCase):
@classmethod
def setUpClass(cls):

cls.mock_aggregation_instance = Mock()
cls.mock_aggregation_class = Mock(
return_value=cls.mock_aggregation_instance
)
cls.mock_resource = Mock()
cls.mock_instrumentation_info = Mock()

def test_consume_measurement(self):

view_instrument_match = _ViewInstrumentMatch(
"name",
"unit",
"description",
self.mock_aggregation_class,
self.mock_instrumentation_info,
self.mock_resource,
{"a", "c"},
)

view_instrument_match.consume_measurement(
Measurement(value=0, attributes={"c": "d", "f": "g"})
)
self.assertEqual(
view_instrument_match._attributes_aggregation,
{frozenset([("c", "d")]): self.mock_aggregation_instance},
)

view_instrument_match.consume_measurement(
Measurement(value=0, attributes={"w": "x", "y": "z"})
)

self.assertEqual(
view_instrument_match._attributes_aggregation,
{
frozenset(): self.mock_aggregation_instance,
frozenset([("c", "d")]): self.mock_aggregation_instance,
},
)

view_instrument_match = _ViewInstrumentMatch(
"name",
"unit",
"description",
self.mock_aggregation_class,
self.mock_instrumentation_info,
self.mock_resource,
)

view_instrument_match.consume_measurement(
Measurement(value=0, attributes={"c": "d", "f": "g"})
)
self.assertEqual(
view_instrument_match._attributes_aggregation,
{
frozenset(
[("c", "d"), ("f", "g")]
): self.mock_aggregation_instance
},
)

def test_collect(self):

view_instrument_match = _ViewInstrumentMatch(
"name",
"unit",
"description",
self.mock_aggregation_class,
self.mock_instrumentation_info,
self.mock_resource,
{"a", "c"},
)

view_instrument_match.consume_measurement(
Measurement(value=0, attributes={"c": "d", "f": "g"})
)
self.assertEqual(
next(view_instrument_match.collect(1)),
Metric(
attributes={"c": "d"},
description="description",
instrumentation_info=self.mock_instrumentation_info,
name="name",
resource=self.mock_resource,
unit="unit",
point=None,
),
)

0 comments on commit a07e039

Please sign in to comment.