diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/__init__.py index b3232b367e8..b6837d1a38a 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/__init__.py @@ -159,7 +159,7 @@ def __init__( self._meter_lock = Lock() self._atexit_handler = None self._sdk_config = SdkConfiguration( - resource=resource, metric_readers=metric_readers + resource=resource, metric_readers=metric_readers, views=() ) self._measurement_consumer = SynchronousMeasurementConsumer( sdk_config=self._sdk_config diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/instrument.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/instrument.py index d9da9a067a0..bf0c4d5f404 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/instrument.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/instrument.py @@ -95,7 +95,7 @@ def add( ) return self._measurement_consumer.consume_measurement( - Measurement(amount, attributes) + Measurement(amount, self, attributes) ) @@ -104,7 +104,7 @@ def add( self, amount: Union[int, float], attributes: Dict[str, str] = None ): self._measurement_consumer.consume_measurement( - Measurement(amount, attributes) + Measurement(amount, self, attributes) ) @@ -127,7 +127,7 @@ def record( ) return self._measurement_consumer.consume_measurement( - Measurement(amount, attributes) + Measurement(amount, self, attributes) ) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/measurement.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/measurement.py index 18110b07430..6026787d827 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/measurement.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/measurement.py @@ -15,10 +15,12 @@ from dataclasses import dataclass from typing import Union +from opentelemetry._metrics.instrument import Instrument from opentelemetry.util.types import Attributes @dataclass(frozen=True) class Measurement: value: Union[int, float] + instrument: Instrument attributes: Attributes = None diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/measurement_consumer.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/measurement_consumer.py index b602185bc93..c4b67702760 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/measurement_consumer.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/measurement_consumer.py @@ -70,7 +70,8 @@ def collect( self, metric_reader: MetricReader, temporality: AggregationTemporality ) -> Iterable[Metric]: with self._lock: + metric_reader_storage = self._reader_storages[metric_reader] for async_instrument in self._async_instruments: for measurement in async_instrument.callback(): - self.consume_measurement(measurement) + metric_reader_storage.consume_measurement(measurement) return self._reader_storages[metric_reader].collect(temporality) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader_storage.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader_storage.py index ffb204f447e..18991f307c0 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader_storage.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader_storage.py @@ -12,23 +12,112 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Iterable +from threading import RLock +from typing import Dict, Iterable, List -from opentelemetry.sdk._metrics.aggregation import AggregationTemporality +from opentelemetry._metrics.instrument import Counter, Histogram, Instrument +from opentelemetry.sdk._metrics._view_instrument_match import ( + _ViewInstrumentMatch, +) +from opentelemetry.sdk._metrics.aggregation import ( + AggregationTemporality, + ExplicitBucketHistogramAggregation, + LastValueAggregation, + SumAggregation, +) from opentelemetry.sdk._metrics.measurement import Measurement from opentelemetry.sdk._metrics.point import Metric from opentelemetry.sdk._metrics.sdk_configuration import SdkConfiguration +from opentelemetry.sdk._metrics.view import View -# TODO: #2378 class MetricReaderStorage: """The SDK's storage for a given reader""" def __init__(self, sdk_config: SdkConfiguration) -> None: - pass + self._lock = RLock() + self._sdk_config = sdk_config + self._view_instrument_match: Dict[ + Instrument, List[_ViewInstrumentMatch] + ] = {} + + def _get_or_init_view_instrument_match( + self, instrument: Instrument + ) -> List["_ViewInstrumentMatch"]: + # Optimistically get the relevant views for the given instrument. Once set for a given + # instrument, the mapping will never change + if instrument in self._view_instrument_match: + return self._view_instrument_match[instrument] + + with self._lock: + # double check if it was set before we held the lock + if instrument in self._view_instrument_match: + return self._view_instrument_match[instrument] + + # not present, hold the lock and add a new mapping + matches = [] + for view in self._sdk_config.views: + if view.match(instrument): + # Note: if a view matches multiple instruments, this will create a separate + # _ViewInstrumentMatch per instrument. If the user's View configuration includes a + # name, this will cause multiple conflicting output streams. + matches.append( + _ViewInstrumentMatch( + name=view.name or instrument.name, + resource=self._sdk_config.resource, + instrumentation_info=None, + aggregation=view.aggregation, + unit=instrument.unit, + description=view.description, + ) + ) + + # if no view targeted the instrument, use the default + if not matches: + # TODO: the logic to select aggregation could be moved + if isinstance(instrument, Counter): + agg = SumAggregation(True, AggregationTemporality.DELTA) + elif isinstance(instrument, Histogram): + agg = ExplicitBucketHistogramAggregation() + else: + agg = LastValueAggregation() + matches.append( + _ViewInstrumentMatch( + resource=self._sdk_config.resource, + instrumentation_info=None, + aggregation=agg, + unit=instrument.unit, + description=instrument.description, + name=instrument.name, + ) + ) + self._view_instrument_match[instrument] = matches + return matches def consume_measurement(self, measurement: Measurement) -> None: - pass + for matches in self._get_or_init_view_instrument_match( + measurement.instrument + ): + matches.consume_measurement(measurement) def collect(self, temporality: AggregationTemporality) -> Iterable[Metric]: - pass + # use a list instead of yielding to prevent a slow reader from holding SDK locks + metrics: List[Metric] = [] + + # While holding the lock, new _ViewInstrumentMatch can't be added from another thread (so we are + # sure we collect all existing view). However, instruments can still send measurements + # that will make it into the individual aggregations; collection will acquire those + # locks iteratively to keep locking as fine-grained as possible. One side effect is + # that end times can be slightly skewed among the metric streams produced by the SDK, + # but we still align the output timestamps for a single instrument. + with self._lock: + for matches in self._view_instrument_match.values(): + for match in matches: + metrics.extend(match.collect(temporality)) + + return metrics + + +def default_view(instrument: Instrument) -> View: + # TODO: #2247 + return View() diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/sdk_configuration.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/sdk_configuration.py index 3b077bdea13..2c603b5e4da 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/sdk_configuration.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/sdk_configuration.py @@ -2,12 +2,12 @@ from typing import Sequence from opentelemetry.sdk._metrics.metric_reader import MetricReader +from opentelemetry.sdk._metrics.view import View from opentelemetry.sdk.resources import Resource @dataclass class SdkConfiguration: resource: Resource - # TODO: once views are added - # views: Sequence[View] metric_readers: Sequence[MetricReader] + views: Sequence[View] diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/view.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/view.py new file mode 100644 index 00000000000..e103425c03b --- /dev/null +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/view.py @@ -0,0 +1,20 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# TODO: #2247 +# pylint: disable=no-self-use +class View: + def match(self) -> bool: + return False diff --git a/opentelemetry-sdk/tests/metrics/test_aggregation.py b/opentelemetry-sdk/tests/metrics/test_aggregation.py index cf9c315ab3d..cbeb9bf4acd 100644 --- a/opentelemetry-sdk/tests/metrics/test_aggregation.py +++ b/opentelemetry-sdk/tests/metrics/test_aggregation.py @@ -16,7 +16,9 @@ from logging import WARNING from math import inf from time import sleep +from typing import Union from unittest import TestCase +from unittest.mock import Mock from opentelemetry.sdk._metrics.aggregation import ( AggregationTemporality, @@ -27,6 +29,13 @@ ) from opentelemetry.sdk._metrics.measurement import Measurement from opentelemetry.sdk._metrics.point import Gauge, Histogram, Sum +from opentelemetry.util.types import Attributes + + +def measurement( + value: Union[int, float], attributes: Attributes = None +) -> Measurement: + return Measurement(value, instrument=Mock(), attributes=attributes) class TestSynchronousSumAggregation(TestCase): @@ -39,9 +48,9 @@ def test_aggregate_delta(self): True, AggregationTemporality.DELTA ) - synchronous_sum_aggregation.aggregate(Measurement(1)) - synchronous_sum_aggregation.aggregate(Measurement(2)) - synchronous_sum_aggregation.aggregate(Measurement(3)) + synchronous_sum_aggregation.aggregate(measurement(1)) + synchronous_sum_aggregation.aggregate(measurement(2)) + synchronous_sum_aggregation.aggregate(measurement(3)) self.assertEqual(synchronous_sum_aggregation._value, 6) @@ -49,9 +58,9 @@ def test_aggregate_delta(self): True, AggregationTemporality.DELTA ) - synchronous_sum_aggregation.aggregate(Measurement(1)) - synchronous_sum_aggregation.aggregate(Measurement(-2)) - synchronous_sum_aggregation.aggregate(Measurement(3)) + synchronous_sum_aggregation.aggregate(measurement(1)) + synchronous_sum_aggregation.aggregate(measurement(-2)) + synchronous_sum_aggregation.aggregate(measurement(3)) self.assertEqual(synchronous_sum_aggregation._value, 2) @@ -64,9 +73,9 @@ def test_aggregate_cumulative(self): True, AggregationTemporality.CUMULATIVE ) - synchronous_sum_aggregation.aggregate(Measurement(1)) - synchronous_sum_aggregation.aggregate(Measurement(2)) - synchronous_sum_aggregation.aggregate(Measurement(3)) + synchronous_sum_aggregation.aggregate(measurement(1)) + synchronous_sum_aggregation.aggregate(measurement(2)) + synchronous_sum_aggregation.aggregate(measurement(3)) self.assertEqual(synchronous_sum_aggregation._value, 6) @@ -74,9 +83,9 @@ def test_aggregate_cumulative(self): True, AggregationTemporality.CUMULATIVE ) - synchronous_sum_aggregation.aggregate(Measurement(1)) - synchronous_sum_aggregation.aggregate(Measurement(-2)) - synchronous_sum_aggregation.aggregate(Measurement(3)) + synchronous_sum_aggregation.aggregate(measurement(1)) + synchronous_sum_aggregation.aggregate(measurement(-2)) + synchronous_sum_aggregation.aggregate(measurement(3)) self.assertEqual(synchronous_sum_aggregation._value, 2) @@ -89,13 +98,13 @@ def test_collect_delta(self): True, AggregationTemporality.DELTA ) - synchronous_sum_aggregation.aggregate(Measurement(1)) + synchronous_sum_aggregation.aggregate(measurement(1)) first_sum = synchronous_sum_aggregation.collect() self.assertEqual(first_sum.value, 1) self.assertTrue(first_sum.is_monotonic) - synchronous_sum_aggregation.aggregate(Measurement(1)) + synchronous_sum_aggregation.aggregate(measurement(1)) second_sum = synchronous_sum_aggregation.collect() self.assertEqual(second_sum.value, 1) @@ -114,13 +123,13 @@ def test_collect_cumulative(self): True, AggregationTemporality.CUMULATIVE ) - sum_aggregation.aggregate(Measurement(1)) + sum_aggregation.aggregate(measurement(1)) first_sum = sum_aggregation.collect() self.assertEqual(first_sum.value, 1) self.assertTrue(first_sum.is_monotonic) - sum_aggregation.aggregate(Measurement(1)) + sum_aggregation.aggregate(measurement(1)) second_sum = sum_aggregation.collect() self.assertEqual(second_sum.value, 2) @@ -144,13 +153,13 @@ def test_aggregate(self): last_value_aggregation = LastValueAggregation() - last_value_aggregation.aggregate(Measurement(1)) + last_value_aggregation.aggregate(measurement(1)) self.assertEqual(last_value_aggregation._value, 1) - last_value_aggregation.aggregate(Measurement(2)) + last_value_aggregation.aggregate(measurement(2)) self.assertEqual(last_value_aggregation._value, 2) - last_value_aggregation.aggregate(Measurement(3)) + last_value_aggregation.aggregate(measurement(3)) self.assertEqual(last_value_aggregation._value, 3) def test_collect(self): @@ -162,13 +171,13 @@ def test_collect(self): self.assertIsNone(last_value_aggregation.collect()) - last_value_aggregation.aggregate(Measurement(1)) + last_value_aggregation.aggregate(measurement(1)) first_gauge = last_value_aggregation.collect() self.assertIsInstance(first_gauge, Gauge) self.assertEqual(first_gauge.value, 1) - last_value_aggregation.aggregate(Measurement(1)) + last_value_aggregation.aggregate(measurement(1)) # CI fails the last assertion without this sleep(0.1) @@ -192,13 +201,13 @@ def test_aggregate(self): ExplicitBucketHistogramAggregation(boundaries=[0, 2, 4]) ) - explicit_bucket_histogram_aggregation.aggregate(Measurement(-1)) - explicit_bucket_histogram_aggregation.aggregate(Measurement(0)) - explicit_bucket_histogram_aggregation.aggregate(Measurement(1)) - explicit_bucket_histogram_aggregation.aggregate(Measurement(2)) - explicit_bucket_histogram_aggregation.aggregate(Measurement(3)) - explicit_bucket_histogram_aggregation.aggregate(Measurement(4)) - explicit_bucket_histogram_aggregation.aggregate(Measurement(5)) + explicit_bucket_histogram_aggregation.aggregate(measurement(-1)) + explicit_bucket_histogram_aggregation.aggregate(measurement(0)) + explicit_bucket_histogram_aggregation.aggregate(measurement(1)) + explicit_bucket_histogram_aggregation.aggregate(measurement(2)) + explicit_bucket_histogram_aggregation.aggregate(measurement(3)) + explicit_bucket_histogram_aggregation.aggregate(measurement(4)) + explicit_bucket_histogram_aggregation.aggregate(measurement(5)) # The first bucket keeps count of values between (-inf, 0] (-1 and 0) self.assertEqual( @@ -233,11 +242,11 @@ def test_min_max(self): ExplicitBucketHistogramAggregation() ) - explicit_bucket_histogram_aggregation.aggregate(Measurement(-1)) - explicit_bucket_histogram_aggregation.aggregate(Measurement(2)) - explicit_bucket_histogram_aggregation.aggregate(Measurement(7)) - explicit_bucket_histogram_aggregation.aggregate(Measurement(8)) - explicit_bucket_histogram_aggregation.aggregate(Measurement(9999)) + explicit_bucket_histogram_aggregation.aggregate(measurement(-1)) + explicit_bucket_histogram_aggregation.aggregate(measurement(2)) + explicit_bucket_histogram_aggregation.aggregate(measurement(7)) + explicit_bucket_histogram_aggregation.aggregate(measurement(8)) + explicit_bucket_histogram_aggregation.aggregate(measurement(9999)) self.assertEqual(explicit_bucket_histogram_aggregation._min, -1) self.assertEqual(explicit_bucket_histogram_aggregation._max, 9999) @@ -246,11 +255,11 @@ def test_min_max(self): ExplicitBucketHistogramAggregation(record_min_max=False) ) - explicit_bucket_histogram_aggregation.aggregate(Measurement(-1)) - explicit_bucket_histogram_aggregation.aggregate(Measurement(2)) - explicit_bucket_histogram_aggregation.aggregate(Measurement(7)) - explicit_bucket_histogram_aggregation.aggregate(Measurement(8)) - explicit_bucket_histogram_aggregation.aggregate(Measurement(9999)) + explicit_bucket_histogram_aggregation.aggregate(measurement(-1)) + explicit_bucket_histogram_aggregation.aggregate(measurement(2)) + explicit_bucket_histogram_aggregation.aggregate(measurement(7)) + explicit_bucket_histogram_aggregation.aggregate(measurement(8)) + explicit_bucket_histogram_aggregation.aggregate(measurement(9999)) self.assertEqual(explicit_bucket_histogram_aggregation._min, inf) self.assertEqual(explicit_bucket_histogram_aggregation._max, -inf) @@ -264,7 +273,7 @@ def test_collect(self): ExplicitBucketHistogramAggregation(boundaries=[0, 1, 2]) ) - explicit_bucket_histogram_aggregation.aggregate(Measurement(1)) + explicit_bucket_histogram_aggregation.aggregate(measurement(1)) first_histogram = explicit_bucket_histogram_aggregation.collect() self.assertEqual(first_histogram.bucket_counts, (0, 1, 0, 0)) @@ -272,7 +281,7 @@ def test_collect(self): # CI fails the last assertion without this sleep(0.1) - explicit_bucket_histogram_aggregation.aggregate(Measurement(1)) + explicit_bucket_histogram_aggregation.aggregate(measurement(1)) second_histogram = explicit_bucket_histogram_aggregation.collect() self.assertEqual(second_histogram.bucket_counts, (0, 1, 0, 0)) diff --git a/opentelemetry-sdk/tests/metrics/test_measurement_consumer.py b/opentelemetry-sdk/tests/metrics/test_measurement_consumer.py index 0b7072f9e46..aa481482ec1 100644 --- a/opentelemetry-sdk/tests/metrics/test_measurement_consumer.py +++ b/opentelemetry-sdk/tests/metrics/test_measurement_consumer.py @@ -35,7 +35,9 @@ def test_creates_metric_reader_storages(self, MockMetricReaderStorage): """It should create one MetricReaderStorage per metric reader passed in the SdkConfiguration""" reader_mocks = [Mock() for _ in range(5)] SynchronousMeasurementConsumer( - SdkConfiguration(resource=Mock(), metric_readers=reader_mocks) + SdkConfiguration( + resource=Mock(), metric_readers=reader_mocks, views=() + ) ) self.assertEqual(len(MockMetricReaderStorage.mock_calls), 5) @@ -47,7 +49,9 @@ def test_measurements_passed_to_each_reader_storage( MockMetricReaderStorage.side_effect = reader_storage_mocks consumer = SynchronousMeasurementConsumer( - SdkConfiguration(resource=Mock(), metric_readers=reader_mocks) + SdkConfiguration( + resource=Mock(), metric_readers=reader_mocks, views=() + ) ) measurement_mock = Mock() consumer.consume_measurement(measurement_mock) @@ -64,7 +68,9 @@ def test_collect_passed_to_reader_stage(self, MockMetricReaderStorage): MockMetricReaderStorage.side_effect = reader_storage_mocks consumer = SynchronousMeasurementConsumer( - SdkConfiguration(resource=Mock(), metric_readers=reader_mocks) + SdkConfiguration( + resource=Mock(), metric_readers=reader_mocks, views=() + ) ) for r_mock, rs_mock in zip(reader_mocks, reader_storage_mocks): rs_mock.collect.assert_not_called() @@ -73,14 +79,20 @@ def test_collect_passed_to_reader_stage(self, MockMetricReaderStorage): AggregationTemporality.CUMULATIVE ) - def test_collect_calls_async_instruments(self, _): - """Its collect() method should invoke async instruments""" + def test_collect_calls_async_instruments(self, MockMetricReaderStorage): + """Its collect() method should invoke async instruments and pass measurements to the + corresponding metric reader storage""" reader_mock = Mock() + reader_storage_mock = Mock() + MockMetricReaderStorage.return_value = reader_storage_mock consumer = SynchronousMeasurementConsumer( - SdkConfiguration(resource=Mock(), metric_readers=[reader_mock]) + SdkConfiguration( + resource=Mock(), metric_readers=[reader_mock], views=() + ) ) async_instrument_mocks = [MagicMock() for _ in range(5)] for i_mock in async_instrument_mocks: + i_mock.callback.return_value = [Mock()] consumer.register_asynchronous_instrument(i_mock) consumer.collect(reader_mock, AggregationTemporality.CUMULATIVE) @@ -88,3 +100,8 @@ def test_collect_calls_async_instruments(self, _): # it should call async instruments for i_mock in async_instrument_mocks: i_mock.callback.assert_called_once() + + # it should pass measurements to reader storage + self.assertEqual( + len(reader_storage_mock.consume_measurement.mock_calls), 5 + ) diff --git a/opentelemetry-sdk/tests/metrics/test_metric_reader_storage.py b/opentelemetry-sdk/tests/metrics/test_metric_reader_storage.py new file mode 100644 index 00000000000..a5b44b23331 --- /dev/null +++ b/opentelemetry-sdk/tests/metrics/test_metric_reader_storage.py @@ -0,0 +1,142 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from unittest.mock import Mock, patch + +from opentelemetry.sdk._metrics.measurement import Measurement +from opentelemetry.sdk._metrics.metric_reader_storage import ( + MetricReaderStorage, +) +from opentelemetry.sdk._metrics.point import AggregationTemporality +from opentelemetry.sdk._metrics.sdk_configuration import SdkConfiguration +from opentelemetry.test.concurrency_test import ConcurrencyTestBase, MockFunc + + +def mock_view_matching(*instruments) -> Mock: + mock = Mock() + mock.match.side_effect = lambda instrument: instrument in instruments + return mock + + +def mock_instrument() -> Mock: + instr = Mock() + instr.attributes = {} + return instr + + +@patch("opentelemetry.sdk._metrics.metric_reader_storage._ViewInstrumentMatch") +class TestMetricReaderStorage(ConcurrencyTestBase): + def test_creates_view_instrument_matches( + self, MockViewInstrumentMatch: Mock + ): + """It should create a MockViewInstrumentMatch when an instrument matches a view""" + instrument1 = Mock(name="instrument1") + instrument2 = Mock(name="instrument2") + + view1 = mock_view_matching(instrument1) + view2 = mock_view_matching(instrument1, instrument2) + storage = MetricReaderStorage( + SdkConfiguration( + resource=Mock(), metric_readers=(), views=(view1, view2) + ) + ) + + # instrument1 matches view1 and view2, so should create two ViewInstrumentMatch objects + storage.consume_measurement(Measurement(1, instrument1)) + self.assertEqual( + len(MockViewInstrumentMatch.call_args_list), + 2, + MockViewInstrumentMatch.mock_calls, + ) + # they should only be created the first time the instrument is seen + storage.consume_measurement(Measurement(1, instrument1)) + self.assertEqual(len(MockViewInstrumentMatch.call_args_list), 2) + + # instrument2 matches view2, so should create a single ViewInstrumentMatch + MockViewInstrumentMatch.call_args_list.clear() + storage.consume_measurement(Measurement(1, instrument2)) + self.assertEqual(len(MockViewInstrumentMatch.call_args_list), 1) + + def test_forwards_calls_to_view_instrument_match( + self, MockViewInstrumentMatch: Mock + ): + view_instrument_match1 = Mock() + view_instrument_match2 = Mock() + view_instrument_match3 = Mock() + MockViewInstrumentMatch.side_effect = [ + view_instrument_match1, + view_instrument_match2, + view_instrument_match3, + ] + + instrument1 = Mock(name="instrument1") + instrument2 = Mock(name="instrument2") + view1 = mock_view_matching(instrument1) + view2 = mock_view_matching(instrument1, instrument2) + storage = MetricReaderStorage( + SdkConfiguration( + resource=Mock(), metric_readers=(), views=(view1, view2) + ) + ) + + # Measurements from an instrument should be passed on to each ViewInstrumentMatch objects + # created for that instrument + measurement = Measurement(1, instrument1) + storage.consume_measurement(measurement) + view_instrument_match1.consume_measurement.assert_called_once_with( + measurement + ) + view_instrument_match2.consume_measurement.assert_called_once_with( + measurement + ) + view_instrument_match3.consume_measurement.assert_not_called() + + measurement = Measurement(1, instrument2) + storage.consume_measurement(measurement) + view_instrument_match3.consume_measurement.assert_called_once_with( + measurement + ) + + # collect() should call collect on all of its _ViewInstrumentMatch objects and combine them together + all_metrics = [Mock() for _ in range(6)] + view_instrument_match1.collect.return_value = all_metrics[:2] + view_instrument_match2.collect.return_value = all_metrics[2:4] + view_instrument_match3.collect.return_value = all_metrics[4:] + + result = storage.collect(AggregationTemporality.CUMULATIVE) + view_instrument_match1.collect.assert_called_once() + view_instrument_match2.collect.assert_called_once() + view_instrument_match3.collect.assert_called_once() + self.assertEqual(result, all_metrics) + + def test_race_concurrent_measurements(self, MockViewInstrumentMatch: Mock): + mock_view_instrument_match_ctor = MockFunc() + MockViewInstrumentMatch.side_effect = mock_view_instrument_match_ctor + + instrument1 = Mock(name="instrument1") + view1 = mock_view_matching(instrument1) + storage = MetricReaderStorage( + SdkConfiguration( + resource=Mock(), metric_readers=(), views=(view1,) + ) + ) + + def send_measurement(): + storage.consume_measurement(Measurement(1, instrument1)) + + # race sending many measurements concurrently + self.run_with_many_threads(send_measurement) + + # _ViewInstrumentMatch constructor should have only been called once + self.assertEqual(mock_view_instrument_match_ctor.call_count, 1) diff --git a/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py b/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py index 38f9249c434..c0747294035 100644 --- a/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py +++ b/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py @@ -45,8 +45,14 @@ def test_consume_measurement(self): {"a", "c"}, ) + instrument1 = Mock(name="instrument1") + view_instrument_match.consume_measurement( - Measurement(value=0, attributes={"c": "d", "f": "g"}) + Measurement( + value=0, + instrument=instrument1, + attributes={"c": "d", "f": "g"}, + ) ) self.assertEqual( view_instrument_match._attributes_aggregation, @@ -54,7 +60,11 @@ def test_consume_measurement(self): ) view_instrument_match.consume_measurement( - Measurement(value=0, attributes={"w": "x", "y": "z"}) + Measurement( + value=0, + instrument=instrument1, + attributes={"w": "x", "y": "z"}, + ) ) self.assertEqual( @@ -75,7 +85,11 @@ def test_consume_measurement(self): ) view_instrument_match.consume_measurement( - Measurement(value=0, attributes={"c": "d", "f": "g"}) + Measurement( + value=0, + instrument=instrument1, + attributes={"c": "d", "f": "g"}, + ) ) self.assertEqual( view_instrument_match._attributes_aggregation, @@ -95,7 +109,7 @@ def test_consume_measurement(self): self.mock_resource, ) view_instrument_match.consume_measurement( - Measurement(value=0, attributes=None) + Measurement(value=0, instrument=instrument1, attributes=None) ) self.assertEqual( view_instrument_match._attributes_aggregation, @@ -115,7 +129,11 @@ def test_collect(self): ) view_instrument_match.consume_measurement( - Measurement(value=0, attributes={"c": "d", "f": "g"}) + Measurement( + value=0, + instrument=Mock(name="instrument1"), + attributes={"c": "d", "f": "g"}, + ) ) self.assertEqual( next(view_instrument_match.collect(1)),