From db9d40bcb9fe83fdea9afcd763338d92acfc78cd Mon Sep 17 00:00:00 2001 From: Aaron Abbott Date: Fri, 6 May 2022 20:19:39 -0400 Subject: [PATCH] Add callback options with timeout to observable callbacks (#2664) * Add callback options with timeout to observable callbacks * remove debugging code * changelog --- CHANGELOG.md | 2 ++ docs/examples/metrics/example.py | 16 +++++---- docs/getting_started/metrics_example.py | 15 ++++---- .../src/opentelemetry/_metrics/__init__.py | 3 ++ .../_metrics/_internal/__init__.py | 21 +++++++++--- .../_metrics/_internal/instrument.py | 27 +++++++++++---- .../sdk/_metrics/_internal/instrument.py | 34 ++++++++++++++----- .../_internal/measurement_consumer.py | 7 ++-- .../metrics/integration_test/test_cpu_time.py | 14 +++++--- .../tests/metrics/test_backward_compat.py | 18 ++++++++++ .../metrics/test_in_memory_metric_reader.py | 3 +- .../tests/metrics/test_instrument.py | 31 ++++++++++------- 12 files changed, 140 insertions(+), 51 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1614019b288..a3970db53b4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased](https://github.com/open-telemetry/opentelemetry-python/compare/v1.11.1-0.30b1...HEAD) +- Add CallbackOptions to observable instrument callback params + ([#2664](https://github.com/open-telemetry/opentelemetry-python/pull/2664)) - Add timeouts to metric SDK ([#2653](https://github.com/open-telemetry/opentelemetry-python/pull/2653)) - Add variadic arguments to metric exporter/reader interfaces diff --git a/docs/examples/metrics/example.py b/docs/examples/metrics/example.py index 11c2b8d8e04..9843d659534 100644 --- a/docs/examples/metrics/example.py +++ b/docs/examples/metrics/example.py @@ -5,6 +5,7 @@ get_meter_provider, set_meter_provider, ) +from opentelemetry._metrics._internal.instrument import CallbackOptions from opentelemetry.exporter.otlp.proto.grpc._metric_exporter import ( OTLPMetricExporter, ) @@ -17,15 +18,17 @@ set_meter_provider(provider) -def observable_counter_func() -> Iterable[Observation]: +def observable_counter_func(options: CallbackOptions) -> Iterable[Observation]: yield Observation(1, {}) -def observable_up_down_counter_func() -> Iterable[Observation]: +def observable_up_down_counter_func( + options: CallbackOptions, +) -> Iterable[Observation]: yield Observation(-10, {}) -def observable_gauge_func() -> Iterable[Observation]: +def observable_gauge_func(options: CallbackOptions) -> Iterable[Observation]: yield Observation(9, {}) @@ -37,7 +40,8 @@ def observable_gauge_func() -> Iterable[Observation]: # Async Counter observable_counter = meter.create_observable_counter( - "observable_counter", observable_counter_func + "observable_counter", + [observable_counter_func], ) # UpDownCounter @@ -47,7 +51,7 @@ def observable_gauge_func() -> Iterable[Observation]: # Async UpDownCounter observable_updown_counter = meter.create_observable_up_down_counter( - "observable_updown_counter", observable_up_down_counter_func + "observable_updown_counter", [observable_up_down_counter_func] ) # Histogram @@ -55,4 +59,4 @@ def observable_gauge_func() -> Iterable[Observation]: histogram.record(99.9) # Async Gauge -gauge = meter.create_observable_gauge("gauge", observable_gauge_func) +gauge = meter.create_observable_gauge("gauge", [observable_gauge_func]) diff --git a/docs/getting_started/metrics_example.py b/docs/getting_started/metrics_example.py index a1e1b7ff073..32342e72122 100644 --- a/docs/getting_started/metrics_example.py +++ b/docs/getting_started/metrics_example.py @@ -18,6 +18,7 @@ from typing import Iterable from opentelemetry._metrics import ( + CallbackOptions, Observation, get_meter_provider, set_meter_provider, @@ -34,15 +35,17 @@ set_meter_provider(provider) -def observable_counter_func() -> Iterable[Observation]: +def observable_counter_func(options: CallbackOptions) -> Iterable[Observation]: yield Observation(1, {}) -def observable_up_down_counter_func() -> Iterable[Observation]: +def observable_up_down_counter_func( + options: CallbackOptions, +) -> Iterable[Observation]: yield Observation(-10, {}) -def observable_gauge_func() -> Iterable[Observation]: +def observable_gauge_func(options: CallbackOptions) -> Iterable[Observation]: yield Observation(9, {}) @@ -54,7 +57,7 @@ def observable_gauge_func() -> Iterable[Observation]: # Async Counter observable_counter = meter.create_observable_counter( - "observable_counter", observable_counter_func + "observable_counter", [observable_counter_func] ) # UpDownCounter @@ -64,7 +67,7 @@ def observable_gauge_func() -> Iterable[Observation]: # Async UpDownCounter observable_updown_counter = meter.create_observable_up_down_counter( - "observable_updown_counter", observable_up_down_counter_func + "observable_updown_counter", [observable_up_down_counter_func] ) # Histogram @@ -72,4 +75,4 @@ def observable_gauge_func() -> Iterable[Observation]: histogram.record(99.9) # Async Gauge -gauge = meter.create_observable_gauge("gauge", observable_gauge_func) +gauge = meter.create_observable_gauge("gauge", [observable_gauge_func]) diff --git a/opentelemetry-api/src/opentelemetry/_metrics/__init__.py b/opentelemetry-api/src/opentelemetry/_metrics/__init__.py index eac37b2cfe3..c205adde0b7 100644 --- a/opentelemetry-api/src/opentelemetry/_metrics/__init__.py +++ b/opentelemetry-api/src/opentelemetry/_metrics/__init__.py @@ -49,6 +49,7 @@ ) from opentelemetry._metrics._internal.instrument import ( Asynchronous, + CallbackOptions, CallbackT, Counter, Histogram, @@ -71,6 +72,7 @@ Counter, Synchronous, Asynchronous, + CallbackOptions, get_meter_provider, get_meter, Histogram, @@ -95,6 +97,7 @@ obj.__module__ = __name__ __all__ = [ + "CallbackOptions", "MeterProvider", "NoOpMeterProvider", "Meter", diff --git a/opentelemetry-api/src/opentelemetry/_metrics/_internal/__init__.py b/opentelemetry-api/src/opentelemetry/_metrics/_internal/__init__.py index f7203859f06..b2c67af2268 100644 --- a/opentelemetry-api/src/opentelemetry/_metrics/_internal/__init__.py +++ b/opentelemetry-api/src/opentelemetry/_metrics/_internal/__init__.py @@ -280,12 +280,13 @@ def create_observable_counter( """Creates an `ObservableCounter` instrument An observable counter observes a monotonically increasing count by calling provided - callbacks which returns multiple :class:`~opentelemetry._metrics.Observation`. + callbacks which accept a :class:`~opentelemetry._metrics.CallbackOptions` and return + multiple :class:`~opentelemetry._metrics.Observation`. For example, an observable counter could be used to report system CPU time periodically. Here is a basic implementation:: - def cpu_time_callback() -> Iterable[Observation]: + def cpu_time_callback(options: CallbackOptions) -> Iterable[Observation]: observations = [] with open("/proc/stat") as procstat: procstat.readline() # skip the first line @@ -308,7 +309,7 @@ def cpu_time_callback() -> Iterable[Observation]: To reduce memory usage, you can use generator callbacks instead of building the full list:: - def cpu_time_callback() -> Iterable[Observation]: + def cpu_time_callback(options: CallbackOptions) -> Iterable[Observation]: with open("/proc/stat") as procstat: procstat.readline() # skip the first line for line in procstat: @@ -322,6 +323,8 @@ def cpu_time_callback() -> Iterable[Observation]: callbacks, which each should return iterables of :class:`~opentelemetry._metrics.Observation`:: def cpu_time_callback(states_to_include: set[str]) -> Iterable[Iterable[Observation]]: + # accept options sent in from OpenTelemetry + options = yield while True: observations = [] with open("/proc/stat") as procstat: @@ -334,7 +337,8 @@ def cpu_time_callback(states_to_include: set[str]) -> Iterable[Iterable[Observat if "nice" in states_to_include: observations.append(Observation(int(states[1]) // 100, {"cpu": cpu, "state": "nice"})) # ... other states - yield observations + # yield the observations and receive the options for next iteration + options = yield observations meter.create_observable_counter( "system.cpu.time", @@ -343,6 +347,15 @@ def cpu_time_callback(states_to_include: set[str]) -> Iterable[Iterable[Observat description="CPU time" ) + The :class:`~opentelemetry._metrics.CallbackOptions` contain a timeout which the + callback should respect. For example if the callback does asynchronous work, like + making HTTP requests, it should respect the timeout:: + + def scrape_http_callback(options: CallbackOptions) -> Iterable[Observation]: + r = requests.get('http://scrapethis.com', timeout=options.timeout_millis / 10**3) + for value in r.json(): + yield Observation(value) + Args: name: The name of the instrument to be created callbacks: A sequence of callbacks that return an iterable of diff --git a/opentelemetry-api/src/opentelemetry/_metrics/_internal/instrument.py b/opentelemetry-api/src/opentelemetry/_metrics/_internal/instrument.py index 254c03a5e6f..301ba1c3920 100644 --- a/opentelemetry-api/src/opentelemetry/_metrics/_internal/instrument.py +++ b/opentelemetry-api/src/opentelemetry/_metrics/_internal/instrument.py @@ -16,6 +16,7 @@ from abc import ABC, abstractmethod +from dataclasses import dataclass from logging import getLogger from re import ASCII from re import compile as re_compile @@ -36,19 +37,31 @@ from opentelemetry._metrics._internal.observation import Observation from opentelemetry.util.types import Attributes -InstrumentT = TypeVar("InstrumentT", bound="Instrument") -CallbackT = Union[ - Callable[[], Iterable[Observation]], - Generator[Iterable[Observation], None, None], -] - - _logger = getLogger(__name__) _name_regex = re_compile(r"[a-zA-Z][-.\w]{0,62}", ASCII) _unit_regex = re_compile(r"\w{0,63}", ASCII) +@dataclass(frozen=True) +class CallbackOptions: + """Options for the callback + + Args: + timeout_millis: Timeout for the callback's execution. If the callback does asynchronous + work (e.g. HTTP requests), it should respect this timeout. + """ + + timeout_millis: float = 10_000 + + +InstrumentT = TypeVar("InstrumentT", bound="Instrument") +CallbackT = Union[ + Callable[[CallbackOptions], Iterable[Observation]], + Generator[Iterable[Observation], CallbackOptions, None], +] + + class Instrument(ABC): """Abstract class that serves as base for all instruments.""" diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/instrument.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/instrument.py index 9e11e220594..37f010ced7f 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/instrument.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/instrument.py @@ -15,7 +15,15 @@ # pylint: disable=too-many-ancestors from logging import getLogger -from typing import TYPE_CHECKING, Dict, Generator, Iterable, Optional, Union +from typing import ( + TYPE_CHECKING, + Dict, + Generator, + Iterable, + List, + Optional, + Union, +) from opentelemetry._metrics import CallbackT from opentelemetry._metrics import Counter as APICounter @@ -26,6 +34,7 @@ ObservableUpDownCounter as APIObservableUpDownCounter, ) from opentelemetry._metrics import UpDownCounter as APIUpDownCounter +from opentelemetry._metrics._internal.instrument import CallbackOptions from opentelemetry.sdk._metrics.measurement import Measurement from opentelemetry.sdk.util.instrumentation import InstrumentationScope @@ -93,7 +102,7 @@ def __init__( self._measurement_consumer = measurement_consumer super().__init__(name, callbacks, unit=unit, description=description) - self._callbacks = [] + self._callbacks: List[CallbackT] = [] if callbacks is not None: @@ -101,24 +110,33 @@ def __init__( if isinstance(callback, Generator): - def inner(callback=callback) -> Iterable[Measurement]: - return next(callback) + # advance generator to it's first yield + next(callback) + + def inner( + options: CallbackOptions, + callback=callback, + ) -> Iterable[Measurement]: + try: + return callback.send(options) + except StopIteration: + return [] self._callbacks.append(inner) else: self._callbacks.append(callback) - def callback(self) -> Iterable[Measurement]: + def callback( + self, callback_options: CallbackOptions + ) -> Iterable[Measurement]: for callback in self._callbacks: try: - for api_measurement in callback(): + for api_measurement in callback(callback_options): yield Measurement( api_measurement.value, instrument=self, attributes=api_measurement.attributes, ) - except StopIteration: - pass except Exception: # pylint: disable=broad-except _logger.exception( "Callback failed for instrument %s.", self.name diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/measurement_consumer.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/measurement_consumer.py index ce9b1f0d5e3..d50d20f25ec 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/measurement_consumer.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/measurement_consumer.py @@ -16,6 +16,7 @@ from threading import Lock from typing import TYPE_CHECKING, Dict, Iterable, List, Mapping +from opentelemetry._metrics._internal.instrument import CallbackOptions from opentelemetry.sdk._metrics._internal.metric_reader_storage import ( MetricReaderStorage, ) @@ -27,7 +28,7 @@ from opentelemetry.sdk._metrics.point import AggregationTemporality, Metric if TYPE_CHECKING: - from opentelemetry.sdk._metrics.instrument import _Asynchronous + from opentelemetry.sdk._metrics._internal.instrument import _Asynchronous class MeasurementConsumer(ABC): @@ -78,8 +79,10 @@ def collect( ) -> Iterable[Metric]: with self._lock: metric_reader_storage = self._reader_storages[metric_reader] + # for now, just use the defaults + callback_options = CallbackOptions() for async_instrument in self._async_instruments: - for measurement in async_instrument.callback(): + for measurement in async_instrument.callback(callback_options): metric_reader_storage.consume_measurement(measurement) return self._reader_storages[metric_reader].collect( instrument_type_temporality diff --git a/opentelemetry-sdk/tests/metrics/integration_test/test_cpu_time.py b/opentelemetry-sdk/tests/metrics/integration_test/test_cpu_time.py index d1688e02d42..90fa4f7d293 100644 --- a/opentelemetry-sdk/tests/metrics/integration_test/test_cpu_time.py +++ b/opentelemetry-sdk/tests/metrics/integration_test/test_cpu_time.py @@ -17,7 +17,7 @@ from typing import Generator, Iterable, List from unittest import TestCase -from opentelemetry._metrics import Instrument, Observation +from opentelemetry._metrics import CallbackOptions, Instrument, Observation from opentelemetry.sdk._metrics import MeterProvider from opentelemetry.sdk._metrics.measurement import Measurement @@ -138,7 +138,9 @@ def create_measurements_expected( ] def test_cpu_time_callback(self): - def cpu_time_callback() -> Iterable[Observation]: + def cpu_time_callback( + options: CallbackOptions, + ) -> Iterable[Observation]: procstat = io.StringIO(self.procstat_str) procstat.readline() # skip the first line for line in procstat: @@ -180,7 +182,7 @@ def cpu_time_callback() -> Iterable[Observation]: unit="s", description="CPU time", ) - measurements = list(observable_counter.callback()) + measurements = list(observable_counter.callback(CallbackOptions())) self.assertEqual( measurements, self.create_measurements_expected(observable_counter) ) @@ -189,7 +191,9 @@ def test_cpu_time_generator(self): def cpu_time_generator() -> Generator[ Iterable[Observation], None, None ]: + options = yield while True: + self.assertIsInstance(options, CallbackOptions) measurements = [] procstat = io.StringIO(self.procstat_str) procstat.readline() # skip the first line @@ -250,7 +254,7 @@ def cpu_time_generator() -> Generator[ {"cpu": cpu, "state": "guest_nice"}, ) ) - yield measurements + options = yield measurements meter = MeterProvider().get_meter("name") observable_counter = meter.create_observable_counter( @@ -259,7 +263,7 @@ def cpu_time_generator() -> Generator[ unit="s", description="CPU time", ) - measurements = list(observable_counter.callback()) + measurements = list(observable_counter.callback(CallbackOptions())) self.assertEqual( measurements, self.create_measurements_expected(observable_counter) ) diff --git a/opentelemetry-sdk/tests/metrics/test_backward_compat.py b/opentelemetry-sdk/tests/metrics/test_backward_compat.py index 4b3b504d65d..4d814230cc2 100644 --- a/opentelemetry-sdk/tests/metrics/test_backward_compat.py +++ b/opentelemetry-sdk/tests/metrics/test_backward_compat.py @@ -28,7 +28,9 @@ from typing import Iterable, Sequence from unittest import TestCase +from opentelemetry._metrics import CallbackOptions, Observation from opentelemetry.sdk._metrics import MeterProvider +from opentelemetry.sdk._metrics._internal.export import InMemoryMetricReader from opentelemetry.sdk._metrics.export import ( MetricExporter, MetricExportResult, @@ -65,6 +67,10 @@ def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: self.collect() +def orig_callback(options: CallbackOptions) -> Iterable[Observation]: + yield Observation(2) + + class TestBackwardCompat(TestCase): def test_metric_exporter(self): exporter = OrigMetricExporter() @@ -87,3 +93,15 @@ def test_metric_reader(self): meter_provider.shutdown() except Exception: self.fail() + + def test_observable_callback(self): + reader = InMemoryMetricReader() + meter_provider = MeterProvider(metric_readers=[reader]) + # produce some data + meter_provider.get_meter("foo").create_counter("mycounter").add(12) + try: + metrics = reader.get_metrics() + except Exception: + self.fail() + + self.assertEqual(len(metrics), 1) diff --git a/opentelemetry-sdk/tests/metrics/test_in_memory_metric_reader.py b/opentelemetry-sdk/tests/metrics/test_in_memory_metric_reader.py index 58b2aad3e82..8ed026e2456 100644 --- a/opentelemetry-sdk/tests/metrics/test_in_memory_metric_reader.py +++ b/opentelemetry-sdk/tests/metrics/test_in_memory_metric_reader.py @@ -70,7 +70,8 @@ def test_integration(self): meter = MeterProvider(metric_readers=[reader]).get_meter("test_meter") counter1 = meter.create_counter("counter1") meter.create_observable_gauge( - "observable_gauge1", callbacks=[lambda: [Observation(value=12)]] + "observable_gauge1", + callbacks=[lambda options: [Observation(value=12)]], ) counter1.add(1, {"foo": "1"}) counter1.add(1, {"foo": "2"}) diff --git a/opentelemetry-sdk/tests/metrics/test_instrument.py b/opentelemetry-sdk/tests/metrics/test_instrument.py index 8b307918322..0402a3d6518 100644 --- a/opentelemetry-sdk/tests/metrics/test_instrument.py +++ b/opentelemetry-sdk/tests/metrics/test_instrument.py @@ -16,6 +16,7 @@ from unittest.mock import Mock from opentelemetry._metrics import Observation +from opentelemetry._metrics._internal.instrument import CallbackOptions from opentelemetry.sdk._metrics.instrument import ( Counter, Histogram, @@ -62,7 +63,7 @@ def test_add_non_monotonic(self): TEST_ATTRIBUTES = {"foo": "bar"} -def callable_callback_0(): +def callable_callback_0(options: CallbackOptions): return [ Observation(1, attributes=TEST_ATTRIBUTES), Observation(2, attributes=TEST_ATTRIBUTES), @@ -70,7 +71,7 @@ def callable_callback_0(): ] -def callable_callback_1(): +def callable_callback_1(options: CallbackOptions): return [ Observation(4, attributes=TEST_ATTRIBUTES), Observation(5, attributes=TEST_ATTRIBUTES), @@ -79,19 +80,25 @@ def callable_callback_1(): def generator_callback_0(): - yield [ + options = yield + assert isinstance(options, CallbackOptions) + options = yield [ Observation(1, attributes=TEST_ATTRIBUTES), Observation(2, attributes=TEST_ATTRIBUTES), Observation(3, attributes=TEST_ATTRIBUTES), ] + assert isinstance(options, CallbackOptions) def generator_callback_1(): - yield [ + options = yield + assert isinstance(options, CallbackOptions) + options = yield [ Observation(4, attributes=TEST_ATTRIBUTES), Observation(5, attributes=TEST_ATTRIBUTES), Observation(6, attributes=TEST_ATTRIBUTES), ] + assert isinstance(options, CallbackOptions) class TestObservableGauge(TestCase): @@ -105,7 +112,7 @@ def test_callable_callback_0(self): ) self.assertEqual( - list(observable_gauge.callback()), + list(observable_gauge.callback(CallbackOptions())), [ Measurement( 1, instrument=observable_gauge, attributes=TEST_ATTRIBUTES @@ -125,7 +132,7 @@ def test_callable_multiple_callable_callback(self): ) self.assertEqual( - list(observable_gauge.callback()), + list(observable_gauge.callback(CallbackOptions())), [ Measurement( 1, instrument=observable_gauge, attributes=TEST_ATTRIBUTES @@ -154,7 +161,7 @@ def test_generator_callback_0(self): ) self.assertEqual( - list(observable_gauge.callback()), + list(observable_gauge.callback(CallbackOptions())), [ Measurement( 1, instrument=observable_gauge, attributes=TEST_ATTRIBUTES @@ -178,7 +185,7 @@ def test_generator_multiple_generator_callback(self): ) self.assertEqual( - list(observable_gauge.callback()), + list(observable_gauge.callback(CallbackOptions())), [ Measurement( 1, instrument=observable_gauge, attributes=TEST_ATTRIBUTES @@ -209,7 +216,7 @@ def test_callable_callback_0(self): ) self.assertEqual( - list(observable_counter.callback()), + list(observable_counter.callback(CallbackOptions())), [ Measurement( 1, @@ -235,7 +242,7 @@ def test_generator_callback_0(self): ) self.assertEqual( - list(observable_counter.callback()), + list(observable_counter.callback(CallbackOptions())), [ Measurement( 1, @@ -263,7 +270,7 @@ def test_callable_callback_0(self): ) self.assertEqual( - list(observable_up_down_counter.callback()), + list(observable_up_down_counter.callback(CallbackOptions())), [ Measurement( 1, @@ -289,7 +296,7 @@ def test_generator_callback_0(self): ) self.assertEqual( - list(observable_up_down_counter.callback()), + list(observable_up_down_counter.callback(CallbackOptions())), [ Measurement( 1,