diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/__init__.py index 626806cee27..57c44160c8a 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/__init__.py @@ -45,6 +45,7 @@ SynchronousMeasurementConsumer, ) from opentelemetry.sdk._metrics.metric_reader import MetricReader +from opentelemetry.sdk._metrics.sdk_configuration import SdkConfiguration from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.util.instrumentation import InstrumentationInfo @@ -156,8 +157,12 @@ def __init__( self._lock = Lock() self._meter_lock = Lock() self._atexit_handler = None - - self._measurement_consumer = SynchronousMeasurementConsumer() + self._sdk_config = SdkConfiguration( + resource=resource, metric_readers=metric_readers + ) + self._measurement_consumer = SynchronousMeasurementConsumer( + sdk_config=self._sdk_config + ) if shutdown_on_exit: self._atexit_handler = register(self.shutdown) @@ -165,10 +170,9 @@ def __init__( self._meters = {} self._metric_readers = metric_readers - for metric_reader in self._metric_readers: + for metric_reader in self._sdk_config.metric_readers: metric_reader._register_measurement_consumer(self) - self._resource = resource self._shutdown = False def force_flush(self) -> bool: @@ -177,7 +181,7 @@ def force_flush(self) -> bool: metric_reader_result = True - for metric_reader in self._metric_readers: + for metric_reader in self._sdk_config.metric_readers: metric_reader_result = ( metric_reader_result and metric_reader.force_flush() ) @@ -196,7 +200,7 @@ def shutdown(self): result = True - for metric_reader in self._metric_readers: + for metric_reader in self._sdk_config.metric_readers: result = result and metric_reader.shutdown() if not result: diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/measurement_consumer.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/measurement_consumer.py index d8c75139cf3..b602185bc93 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/measurement_consumer.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/measurement_consumer.py @@ -13,12 +13,17 @@ # limitations under the License. from abc import ABC, abstractmethod -from typing import TYPE_CHECKING, Iterable +from threading import Lock +from typing import TYPE_CHECKING, Iterable, List, Mapping from opentelemetry.sdk._metrics.aggregation import AggregationTemporality from opentelemetry.sdk._metrics.measurement import Measurement from opentelemetry.sdk._metrics.metric_reader import MetricReader +from opentelemetry.sdk._metrics.metric_reader_storage import ( + MetricReaderStorage, +) from opentelemetry.sdk._metrics.point import Metric +from opentelemetry.sdk._metrics.sdk_configuration import SdkConfiguration if TYPE_CHECKING: from opentelemetry.sdk._metrics.instrument import _Asynchronous @@ -41,15 +46,31 @@ def collect( class SynchronousMeasurementConsumer(MeasurementConsumer): + def __init__(self, sdk_config: SdkConfiguration) -> None: + self._lock = Lock() + self._sdk_config = sdk_config + # should never be mutated + self._reader_storages: Mapping[MetricReader, MetricReaderStorage] = { + reader: MetricReaderStorage(sdk_config) + for reader in sdk_config.metric_readers + } + self._async_instruments: List["_Asynchronous"] = [] + def consume_measurement(self, measurement: Measurement) -> None: - pass + for reader_storage in self._reader_storages.values(): + reader_storage.consume_measurement(measurement) def register_asynchronous_instrument( self, instrument: "_Asynchronous" ) -> None: - pass + with self._lock: + self._async_instruments.append(instrument) def collect( self, metric_reader: MetricReader, temporality: AggregationTemporality ) -> Iterable[Metric]: - pass + with self._lock: + for async_instrument in self._async_instruments: + for measurement in async_instrument.callback(): + self.consume_measurement(measurement) + return self._reader_storages[metric_reader].collect(temporality) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader_storage.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader_storage.py new file mode 100644 index 00000000000..ffb204f447e --- /dev/null +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader_storage.py @@ -0,0 +1,34 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Iterable + +from opentelemetry.sdk._metrics.aggregation import AggregationTemporality +from opentelemetry.sdk._metrics.measurement import Measurement +from opentelemetry.sdk._metrics.point import Metric +from opentelemetry.sdk._metrics.sdk_configuration import SdkConfiguration + + +# TODO: #2378 +class MetricReaderStorage: + """The SDK's storage for a given reader""" + + def __init__(self, sdk_config: SdkConfiguration) -> None: + pass + + def consume_measurement(self, measurement: Measurement) -> None: + pass + + def collect(self, temporality: AggregationTemporality) -> Iterable[Metric]: + pass diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/sdk_configuration.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/sdk_configuration.py new file mode 100644 index 00000000000..3b077bdea13 --- /dev/null +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/sdk_configuration.py @@ -0,0 +1,13 @@ +from dataclasses import dataclass +from typing import Sequence + +from opentelemetry.sdk._metrics.metric_reader import MetricReader +from opentelemetry.sdk.resources import Resource + + +@dataclass +class SdkConfiguration: + resource: Resource + # TODO: once views are added + # views: Sequence[View] + metric_readers: Sequence[MetricReader] diff --git a/opentelemetry-sdk/tests/metrics/test_measurement_consumer.py b/opentelemetry-sdk/tests/metrics/test_measurement_consumer.py index f1a96cadc1c..0b7072f9e46 100644 --- a/opentelemetry-sdk/tests/metrics/test_measurement_consumer.py +++ b/opentelemetry-sdk/tests/metrics/test_measurement_consumer.py @@ -13,87 +13,78 @@ # limitations under the License. from unittest import TestCase -from unittest.mock import Mock, patch +from unittest.mock import MagicMock, Mock, patch -from opentelemetry.sdk._metrics import MeterProvider from opentelemetry.sdk._metrics.measurement_consumer import ( MeasurementConsumer, SynchronousMeasurementConsumer, ) +from opentelemetry.sdk._metrics.point import AggregationTemporality +from opentelemetry.sdk._metrics.sdk_configuration import SdkConfiguration +@patch("opentelemetry.sdk._metrics.measurement_consumer.MetricReaderStorage") class TestSynchronousMeasurementConsumer(TestCase): - def test_parent(self): + def test_parent(self, _): self.assertIsInstance( - SynchronousMeasurementConsumer(), MeasurementConsumer + SynchronousMeasurementConsumer(MagicMock()), MeasurementConsumer ) - @patch("opentelemetry.sdk._metrics.SynchronousMeasurementConsumer") - def test_measurement_consumer_class( - self, mock_serial_measurement_consumer - ): - MeterProvider() - - mock_serial_measurement_consumer.assert_called() - - @patch("opentelemetry.sdk._metrics.SynchronousMeasurementConsumer") - def test_register_asynchronous_instrument( - self, mock_serial_measurement_consumer - ): - - meter_provider = MeterProvider() - - meter_provider._measurement_consumer.register_asynchronous_instrument.assert_called_with( - meter_provider.get_meter("name").create_observable_counter( - "name", Mock() - ) - ) - meter_provider._measurement_consumer.register_asynchronous_instrument.assert_called_with( - meter_provider.get_meter("name").create_observable_up_down_counter( - "name", Mock() - ) - ) - meter_provider._measurement_consumer.register_asynchronous_instrument.assert_called_with( - meter_provider.get_meter("name").create_observable_gauge( - "name", Mock() - ) + def test_creates_metric_reader_storages(self, MockMetricReaderStorage): + """It should create one MetricReaderStorage per metric reader passed in the SdkConfiguration""" + reader_mocks = [Mock() for _ in range(5)] + SynchronousMeasurementConsumer( + SdkConfiguration(resource=Mock(), metric_readers=reader_mocks) ) + self.assertEqual(len(MockMetricReaderStorage.mock_calls), 5) - @patch("opentelemetry.sdk._metrics.SynchronousMeasurementConsumer") - def test_consume_measurement_counter( - self, mock_serial_measurement_consumer + def test_measurements_passed_to_each_reader_storage( + self, MockMetricReaderStorage ): + reader_mocks = [Mock() for _ in range(5)] + reader_storage_mocks = [Mock() for _ in range(5)] + MockMetricReaderStorage.side_effect = reader_storage_mocks - meter_provider = MeterProvider() - counter = meter_provider.get_meter("name").create_counter("name") - - counter.add(1) - - meter_provider._measurement_consumer.consume_measurement.assert_called() - - @patch("opentelemetry.sdk._metrics.SynchronousMeasurementConsumer") - def test_consume_measurement_up_down_counter( - self, mock_serial_measurement_consumer - ): - - meter_provider = MeterProvider() - counter = meter_provider.get_meter("name").create_up_down_counter( - "name" + consumer = SynchronousMeasurementConsumer( + SdkConfiguration(resource=Mock(), metric_readers=reader_mocks) ) + measurement_mock = Mock() + consumer.consume_measurement(measurement_mock) - counter.add(1) + for rs_mock in reader_storage_mocks: + rs_mock.consume_measurement.assert_called_once_with( + measurement_mock + ) - meter_provider._measurement_consumer.consume_measurement.assert_called() + def test_collect_passed_to_reader_stage(self, MockMetricReaderStorage): + """Its collect() method should defer to the underlying MetricReaderStorage""" + reader_mocks = [Mock() for _ in range(5)] + reader_storage_mocks = [Mock() for _ in range(5)] + MockMetricReaderStorage.side_effect = reader_storage_mocks - @patch("opentelemetry.sdk._metrics.SynchronousMeasurementConsumer") - def test_consume_measurement_histogram( - self, mock_serial_measurement_consumer - ): + consumer = SynchronousMeasurementConsumer( + SdkConfiguration(resource=Mock(), metric_readers=reader_mocks) + ) + for r_mock, rs_mock in zip(reader_mocks, reader_storage_mocks): + rs_mock.collect.assert_not_called() + consumer.collect(r_mock, AggregationTemporality.CUMULATIVE) + rs_mock.collect.assert_called_once_with( + AggregationTemporality.CUMULATIVE + ) - meter_provider = MeterProvider() - counter = meter_provider.get_meter("name").create_histogram("name") + def test_collect_calls_async_instruments(self, _): + """Its collect() method should invoke async instruments""" + reader_mock = Mock() + consumer = SynchronousMeasurementConsumer( + SdkConfiguration(resource=Mock(), metric_readers=[reader_mock]) + ) + async_instrument_mocks = [MagicMock() for _ in range(5)] + for i_mock in async_instrument_mocks: + consumer.register_asynchronous_instrument(i_mock) - counter.record(1) + consumer.collect(reader_mock, AggregationTemporality.CUMULATIVE) - meter_provider._measurement_consumer.consume_measurement.assert_called() + # it should call async instruments + for i_mock in async_instrument_mocks: + i_mock.callback.assert_called_once() diff --git a/opentelemetry-sdk/tests/metrics/test_metrics.py b/opentelemetry-sdk/tests/metrics/test_metrics.py index 20714167023..bfe1d0df9d9 100644 --- a/opentelemetry-sdk/tests/metrics/test_metrics.py +++ b/opentelemetry-sdk/tests/metrics/test_metrics.py @@ -15,7 +15,7 @@ from logging import WARNING from unittest import TestCase -from unittest.mock import Mock +from unittest.mock import Mock, patch from opentelemetry.sdk._metrics import Meter, MeterProvider from opentelemetry.sdk._metrics.instrument import ( @@ -26,14 +26,11 @@ ObservableUpDownCounter, UpDownCounter, ) -from opentelemetry.sdk._metrics.measurement_consumer import ( - SynchronousMeasurementConsumer, -) from opentelemetry.sdk.resources import Resource class TestMeterProvider(TestCase): - def test_meter_provider_resource(self): + def test_resource(self): """ `MeterProvider` provides a way to allow a `Resource` to be specified. """ @@ -41,12 +38,17 @@ def test_meter_provider_resource(self): meter_provider_0 = MeterProvider() meter_provider_1 = MeterProvider() - self.assertIs(meter_provider_0._resource, meter_provider_1._resource) - self.assertIsInstance(meter_provider_0._resource, Resource) - self.assertIsInstance(meter_provider_1._resource, Resource) + self.assertIs( + meter_provider_0._sdk_config.resource, + meter_provider_1._sdk_config.resource, + ) + self.assertIsInstance(meter_provider_0._sdk_config.resource, Resource) + self.assertIsInstance(meter_provider_1._sdk_config.resource, Resource) resource = Resource({"key": "value"}) - self.assertIs(MeterProvider(resource=resource)._resource, resource) + self.assertIs( + MeterProvider(resource=resource)._sdk_config.resource, resource + ) def test_get_meter(self): """ @@ -103,10 +105,76 @@ def test_shutdown_subsequent_calls(self): with self.assertLogs(level=WARNING): meter_provider.shutdown() + @patch("opentelemetry.sdk._metrics.SynchronousMeasurementConsumer") + def test_creates_sync_measurement_consumer( + self, mock_sync_measurement_consumer + ): + MeterProvider() + mock_sync_measurement_consumer.assert_called() + + @patch("opentelemetry.sdk._metrics.SynchronousMeasurementConsumer") + def test_register_asynchronous_instrument( + self, mock_sync_measurement_consumer + ): + + meter_provider = MeterProvider() + + meter_provider._measurement_consumer.register_asynchronous_instrument.assert_called_with( + meter_provider.get_meter("name").create_observable_counter( + "name", Mock() + ) + ) + meter_provider._measurement_consumer.register_asynchronous_instrument.assert_called_with( + meter_provider.get_meter("name").create_observable_up_down_counter( + "name", Mock() + ) + ) + meter_provider._measurement_consumer.register_asynchronous_instrument.assert_called_with( + meter_provider.get_meter("name").create_observable_gauge( + "name", Mock() + ) + ) + + @patch("opentelemetry.sdk._metrics.SynchronousMeasurementConsumer") + def test_consume_measurement_counter(self, mock_sync_measurement_consumer): + sync_consumer_instance = mock_sync_measurement_consumer() + meter_provider = MeterProvider() + counter = meter_provider.get_meter("name").create_counter("name") + + counter.add(1) + + sync_consumer_instance.consume_measurement.assert_called() + + @patch("opentelemetry.sdk._metrics.SynchronousMeasurementConsumer") + def test_consume_measurement_up_down_counter( + self, mock_sync_measurement_consumer + ): + sync_consumer_instance = mock_sync_measurement_consumer() + meter_provider = MeterProvider() + counter = meter_provider.get_meter("name").create_up_down_counter( + "name" + ) + + counter.add(1) + + sync_consumer_instance.consume_measurement.assert_called() + + @patch("opentelemetry.sdk._metrics.SynchronousMeasurementConsumer") + def test_consume_measurement_histogram( + self, mock_sync_measurement_consumer + ): + sync_consumer_instance = mock_sync_measurement_consumer() + meter_provider = MeterProvider() + counter = meter_provider.get_meter("name").create_histogram("name") + + counter.record(1) + + sync_consumer_instance.consume_measurement.assert_called() + class TestMeter(TestCase): def setUp(self): - self.meter = Meter(Mock(), SynchronousMeasurementConsumer()) + self.meter = Meter(Mock(), Mock()) def test_create_counter(self): counter = self.meter.create_counter(