Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement SynchronousMeasurementConsumer #2388

Merged
merged 5 commits into from
Jan 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions opentelemetry-sdk/src/opentelemetry/sdk/_metrics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
SynchronousMeasurementConsumer,
)
from opentelemetry.sdk._metrics.metric_reader import MetricReader
from opentelemetry.sdk._metrics.sdk_configuration import SdkConfiguration
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.util.instrumentation import InstrumentationInfo

Expand Down Expand Up @@ -156,19 +157,22 @@ def __init__(
self._lock = Lock()
self._meter_lock = Lock()
self._atexit_handler = None

self._measurement_consumer = SynchronousMeasurementConsumer()
self._sdk_config = SdkConfiguration(
resource=resource, metric_readers=metric_readers
)
self._measurement_consumer = SynchronousMeasurementConsumer(
sdk_config=self._sdk_config
)

if shutdown_on_exit:
self._atexit_handler = register(self.shutdown)

self._meters = {}
self._metric_readers = metric_readers

for metric_reader in self._metric_readers:
for metric_reader in self._sdk_config.metric_readers:
metric_reader._register_measurement_consumer(self)

self._resource = resource
self._shutdown = False

def force_flush(self) -> bool:
Expand All @@ -177,7 +181,7 @@ def force_flush(self) -> bool:

metric_reader_result = True

for metric_reader in self._metric_readers:
for metric_reader in self._sdk_config.metric_readers:
metric_reader_result = (
metric_reader_result and metric_reader.force_flush()
)
Expand All @@ -196,7 +200,7 @@ def shutdown(self):

result = True

for metric_reader in self._metric_readers:
for metric_reader in self._sdk_config.metric_readers:
result = result and metric_reader.shutdown()

if not result:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,17 @@
# limitations under the License.

from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Iterable
from threading import Lock
from typing import TYPE_CHECKING, Iterable, List, Mapping

from opentelemetry.sdk._metrics.aggregation import AggregationTemporality
from opentelemetry.sdk._metrics.measurement import Measurement
from opentelemetry.sdk._metrics.metric_reader import MetricReader
from opentelemetry.sdk._metrics.metric_reader_storage import (
MetricReaderStorage,
)
from opentelemetry.sdk._metrics.point import Metric
from opentelemetry.sdk._metrics.sdk_configuration import SdkConfiguration

if TYPE_CHECKING:
from opentelemetry.sdk._metrics.instrument import _Asynchronous
Expand All @@ -41,15 +46,31 @@ def collect(


class SynchronousMeasurementConsumer(MeasurementConsumer):
def __init__(self, sdk_config: SdkConfiguration) -> None:
self._lock = Lock()
self._sdk_config = sdk_config
# should never be mutated
self._reader_storages: Mapping[MetricReader, MetricReaderStorage] = {
reader: MetricReaderStorage(sdk_config)
for reader in sdk_config.metric_readers
}
self._async_instruments: List["_Asynchronous"] = []

def consume_measurement(self, measurement: Measurement) -> None:
pass
for reader_storage in self._reader_storages.values():
reader_storage.consume_measurement(measurement)

def register_asynchronous_instrument(
self, instrument: "_Asynchronous"
) -> None:
pass
with self._lock:
self._async_instruments.append(instrument)

def collect(
lzchen marked this conversation as resolved.
Show resolved Hide resolved
self, metric_reader: MetricReader, temporality: AggregationTemporality
) -> Iterable[Metric]:
pass
with self._lock:
for async_instrument in self._async_instruments:
for measurement in async_instrument.callback():
self.consume_measurement(measurement)
return self._reader_storages[metric_reader].collect(temporality)
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Iterable

from opentelemetry.sdk._metrics.aggregation import AggregationTemporality
from opentelemetry.sdk._metrics.measurement import Measurement
from opentelemetry.sdk._metrics.point import Metric
from opentelemetry.sdk._metrics.sdk_configuration import SdkConfiguration


# TODO: #2378
class MetricReaderStorage:
ocelotl marked this conversation as resolved.
Show resolved Hide resolved
"""The SDK's storage for a given reader"""

def __init__(self, sdk_config: SdkConfiguration) -> None:
pass

def consume_measurement(self, measurement: Measurement) -> None:
pass

def collect(self, temporality: AggregationTemporality) -> Iterable[Metric]:
pass
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from dataclasses import dataclass
from typing import Sequence

from opentelemetry.sdk._metrics.metric_reader import MetricReader
from opentelemetry.sdk.resources import Resource


@dataclass
class SdkConfiguration:
resource: Resource
# TODO: once views are added
# views: Sequence[View]
metric_readers: Sequence[MetricReader]
ocelotl marked this conversation as resolved.
Show resolved Hide resolved
113 changes: 52 additions & 61 deletions opentelemetry-sdk/tests/metrics/test_measurement_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,87 +13,78 @@
# limitations under the License.

from unittest import TestCase
from unittest.mock import Mock, patch
from unittest.mock import MagicMock, Mock, patch

from opentelemetry.sdk._metrics import MeterProvider
from opentelemetry.sdk._metrics.measurement_consumer import (
MeasurementConsumer,
SynchronousMeasurementConsumer,
)
from opentelemetry.sdk._metrics.point import AggregationTemporality
from opentelemetry.sdk._metrics.sdk_configuration import SdkConfiguration


@patch("opentelemetry.sdk._metrics.measurement_consumer.MetricReaderStorage")
class TestSynchronousMeasurementConsumer(TestCase):
def test_parent(self):
def test_parent(self, _):

self.assertIsInstance(
SynchronousMeasurementConsumer(), MeasurementConsumer
SynchronousMeasurementConsumer(MagicMock()), MeasurementConsumer
)

@patch("opentelemetry.sdk._metrics.SynchronousMeasurementConsumer")
lzchen marked this conversation as resolved.
Show resolved Hide resolved
def test_measurement_consumer_class(
self, mock_serial_measurement_consumer
):
MeterProvider()

mock_serial_measurement_consumer.assert_called()

@patch("opentelemetry.sdk._metrics.SynchronousMeasurementConsumer")
def test_register_asynchronous_instrument(
self, mock_serial_measurement_consumer
):

meter_provider = MeterProvider()

meter_provider._measurement_consumer.register_asynchronous_instrument.assert_called_with(
meter_provider.get_meter("name").create_observable_counter(
"name", Mock()
)
)
meter_provider._measurement_consumer.register_asynchronous_instrument.assert_called_with(
meter_provider.get_meter("name").create_observable_up_down_counter(
"name", Mock()
)
)
meter_provider._measurement_consumer.register_asynchronous_instrument.assert_called_with(
meter_provider.get_meter("name").create_observable_gauge(
"name", Mock()
)
def test_creates_metric_reader_storages(self, MockMetricReaderStorage):
"""It should create one MetricReaderStorage per metric reader passed in the SdkConfiguration"""
reader_mocks = [Mock() for _ in range(5)]
SynchronousMeasurementConsumer(
SdkConfiguration(resource=Mock(), metric_readers=reader_mocks)
)
self.assertEqual(len(MockMetricReaderStorage.mock_calls), 5)

@patch("opentelemetry.sdk._metrics.SynchronousMeasurementConsumer")
def test_consume_measurement_counter(
self, mock_serial_measurement_consumer
def test_measurements_passed_to_each_reader_storage(
self, MockMetricReaderStorage
):
reader_mocks = [Mock() for _ in range(5)]
reader_storage_mocks = [Mock() for _ in range(5)]
MockMetricReaderStorage.side_effect = reader_storage_mocks

meter_provider = MeterProvider()
counter = meter_provider.get_meter("name").create_counter("name")

counter.add(1)

meter_provider._measurement_consumer.consume_measurement.assert_called()

@patch("opentelemetry.sdk._metrics.SynchronousMeasurementConsumer")
def test_consume_measurement_up_down_counter(
self, mock_serial_measurement_consumer
):

meter_provider = MeterProvider()
counter = meter_provider.get_meter("name").create_up_down_counter(
"name"
consumer = SynchronousMeasurementConsumer(
SdkConfiguration(resource=Mock(), metric_readers=reader_mocks)
)
measurement_mock = Mock()
consumer.consume_measurement(measurement_mock)

counter.add(1)
for rs_mock in reader_storage_mocks:
rs_mock.consume_measurement.assert_called_once_with(
measurement_mock
)

meter_provider._measurement_consumer.consume_measurement.assert_called()
def test_collect_passed_to_reader_stage(self, MockMetricReaderStorage):
"""Its collect() method should defer to the underlying MetricReaderStorage"""
reader_mocks = [Mock() for _ in range(5)]
reader_storage_mocks = [Mock() for _ in range(5)]
MockMetricReaderStorage.side_effect = reader_storage_mocks

@patch("opentelemetry.sdk._metrics.SynchronousMeasurementConsumer")
def test_consume_measurement_histogram(
self, mock_serial_measurement_consumer
):
consumer = SynchronousMeasurementConsumer(
SdkConfiguration(resource=Mock(), metric_readers=reader_mocks)
)
for r_mock, rs_mock in zip(reader_mocks, reader_storage_mocks):
rs_mock.collect.assert_not_called()
consumer.collect(r_mock, AggregationTemporality.CUMULATIVE)
rs_mock.collect.assert_called_once_with(
AggregationTemporality.CUMULATIVE
)

meter_provider = MeterProvider()
counter = meter_provider.get_meter("name").create_histogram("name")
def test_collect_calls_async_instruments(self, _):
"""Its collect() method should invoke async instruments"""
reader_mock = Mock()
consumer = SynchronousMeasurementConsumer(
SdkConfiguration(resource=Mock(), metric_readers=[reader_mock])
)
async_instrument_mocks = [MagicMock() for _ in range(5)]
for i_mock in async_instrument_mocks:
consumer.register_asynchronous_instrument(i_mock)

counter.record(1)
consumer.collect(reader_mock, AggregationTemporality.CUMULATIVE)

meter_provider._measurement_consumer.consume_measurement.assert_called()
# it should call async instruments
for i_mock in async_instrument_mocks:
i_mock.callback.assert_called_once()
Loading