Skip to content

Commit

Permalink
Add callback options with timeout to observable callbacks (open-telem…
Browse files Browse the repository at this point in the history
…etry#2664)

* Add callback options with timeout to observable callbacks

* remove debugging code

* changelog
  • Loading branch information
aabmass authored May 7, 2022
1 parent 07a5b64 commit db9d40b
Show file tree
Hide file tree
Showing 12 changed files with 140 additions and 51 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased](https://github.com/open-telemetry/opentelemetry-python/compare/v1.11.1-0.30b1...HEAD)

- Add CallbackOptions to observable instrument callback params
([#2664](https://github.com/open-telemetry/opentelemetry-python/pull/2664))
- Add timeouts to metric SDK
([#2653](https://github.com/open-telemetry/opentelemetry-python/pull/2653))
- Add variadic arguments to metric exporter/reader interfaces
Expand Down
16 changes: 10 additions & 6 deletions docs/examples/metrics/example.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
get_meter_provider,
set_meter_provider,
)
from opentelemetry._metrics._internal.instrument import CallbackOptions
from opentelemetry.exporter.otlp.proto.grpc._metric_exporter import (
OTLPMetricExporter,
)
Expand All @@ -17,15 +18,17 @@
set_meter_provider(provider)


def observable_counter_func() -> Iterable[Observation]:
def observable_counter_func(options: CallbackOptions) -> Iterable[Observation]:
yield Observation(1, {})


def observable_up_down_counter_func() -> Iterable[Observation]:
def observable_up_down_counter_func(
options: CallbackOptions,
) -> Iterable[Observation]:
yield Observation(-10, {})


def observable_gauge_func() -> Iterable[Observation]:
def observable_gauge_func(options: CallbackOptions) -> Iterable[Observation]:
yield Observation(9, {})


Expand All @@ -37,7 +40,8 @@ def observable_gauge_func() -> Iterable[Observation]:

# Async Counter
observable_counter = meter.create_observable_counter(
"observable_counter", observable_counter_func
"observable_counter",
[observable_counter_func],
)

# UpDownCounter
Expand All @@ -47,12 +51,12 @@ def observable_gauge_func() -> Iterable[Observation]:

# Async UpDownCounter
observable_updown_counter = meter.create_observable_up_down_counter(
"observable_updown_counter", observable_up_down_counter_func
"observable_updown_counter", [observable_up_down_counter_func]
)

# Histogram
histogram = meter.create_histogram("histogram")
histogram.record(99.9)

# Async Gauge
gauge = meter.create_observable_gauge("gauge", observable_gauge_func)
gauge = meter.create_observable_gauge("gauge", [observable_gauge_func])
15 changes: 9 additions & 6 deletions docs/getting_started/metrics_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from typing import Iterable

from opentelemetry._metrics import (
CallbackOptions,
Observation,
get_meter_provider,
set_meter_provider,
Expand All @@ -34,15 +35,17 @@
set_meter_provider(provider)


def observable_counter_func() -> Iterable[Observation]:
def observable_counter_func(options: CallbackOptions) -> Iterable[Observation]:
yield Observation(1, {})


def observable_up_down_counter_func() -> Iterable[Observation]:
def observable_up_down_counter_func(
options: CallbackOptions,
) -> Iterable[Observation]:
yield Observation(-10, {})


def observable_gauge_func() -> Iterable[Observation]:
def observable_gauge_func(options: CallbackOptions) -> Iterable[Observation]:
yield Observation(9, {})


Expand All @@ -54,7 +57,7 @@ def observable_gauge_func() -> Iterable[Observation]:

# Async Counter
observable_counter = meter.create_observable_counter(
"observable_counter", observable_counter_func
"observable_counter", [observable_counter_func]
)

# UpDownCounter
Expand All @@ -64,12 +67,12 @@ def observable_gauge_func() -> Iterable[Observation]:

# Async UpDownCounter
observable_updown_counter = meter.create_observable_up_down_counter(
"observable_updown_counter", observable_up_down_counter_func
"observable_updown_counter", [observable_up_down_counter_func]
)

# Histogram
histogram = meter.create_histogram("histogram")
histogram.record(99.9)

# Async Gauge
gauge = meter.create_observable_gauge("gauge", observable_gauge_func)
gauge = meter.create_observable_gauge("gauge", [observable_gauge_func])
3 changes: 3 additions & 0 deletions opentelemetry-api/src/opentelemetry/_metrics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
)
from opentelemetry._metrics._internal.instrument import (
Asynchronous,
CallbackOptions,
CallbackT,
Counter,
Histogram,
Expand All @@ -71,6 +72,7 @@
Counter,
Synchronous,
Asynchronous,
CallbackOptions,
get_meter_provider,
get_meter,
Histogram,
Expand All @@ -95,6 +97,7 @@
obj.__module__ = __name__

__all__ = [
"CallbackOptions",
"MeterProvider",
"NoOpMeterProvider",
"Meter",
Expand Down
21 changes: 17 additions & 4 deletions opentelemetry-api/src/opentelemetry/_metrics/_internal/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,12 +280,13 @@ def create_observable_counter(
"""Creates an `ObservableCounter` instrument
An observable counter observes a monotonically increasing count by calling provided
callbacks which returns multiple :class:`~opentelemetry._metrics.Observation`.
callbacks which accept a :class:`~opentelemetry._metrics.CallbackOptions` and return
multiple :class:`~opentelemetry._metrics.Observation`.
For example, an observable counter could be used to report system CPU
time periodically. Here is a basic implementation::
def cpu_time_callback() -> Iterable[Observation]:
def cpu_time_callback(options: CallbackOptions) -> Iterable[Observation]:
observations = []
with open("/proc/stat") as procstat:
procstat.readline() # skip the first line
Expand All @@ -308,7 +309,7 @@ def cpu_time_callback() -> Iterable[Observation]:
To reduce memory usage, you can use generator callbacks instead of
building the full list::
def cpu_time_callback() -> Iterable[Observation]:
def cpu_time_callback(options: CallbackOptions) -> Iterable[Observation]:
with open("/proc/stat") as procstat:
procstat.readline() # skip the first line
for line in procstat:
Expand All @@ -322,6 +323,8 @@ def cpu_time_callback() -> Iterable[Observation]:
callbacks, which each should return iterables of :class:`~opentelemetry._metrics.Observation`::
def cpu_time_callback(states_to_include: set[str]) -> Iterable[Iterable[Observation]]:
# accept options sent in from OpenTelemetry
options = yield
while True:
observations = []
with open("/proc/stat") as procstat:
Expand All @@ -334,7 +337,8 @@ def cpu_time_callback(states_to_include: set[str]) -> Iterable[Iterable[Observat
if "nice" in states_to_include:
observations.append(Observation(int(states[1]) // 100, {"cpu": cpu, "state": "nice"}))
# ... other states
yield observations
# yield the observations and receive the options for next iteration
options = yield observations
meter.create_observable_counter(
"system.cpu.time",
Expand All @@ -343,6 +347,15 @@ def cpu_time_callback(states_to_include: set[str]) -> Iterable[Iterable[Observat
description="CPU time"
)
The :class:`~opentelemetry._metrics.CallbackOptions` contain a timeout which the
callback should respect. For example if the callback does asynchronous work, like
making HTTP requests, it should respect the timeout::
def scrape_http_callback(options: CallbackOptions) -> Iterable[Observation]:
r = requests.get('http://scrapethis.com', timeout=options.timeout_millis / 10**3)
for value in r.json():
yield Observation(value)
Args:
name: The name of the instrument to be created
callbacks: A sequence of callbacks that return an iterable of
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@


from abc import ABC, abstractmethod
from dataclasses import dataclass
from logging import getLogger
from re import ASCII
from re import compile as re_compile
Expand All @@ -36,19 +37,31 @@
from opentelemetry._metrics._internal.observation import Observation
from opentelemetry.util.types import Attributes

InstrumentT = TypeVar("InstrumentT", bound="Instrument")
CallbackT = Union[
Callable[[], Iterable[Observation]],
Generator[Iterable[Observation], None, None],
]


_logger = getLogger(__name__)

_name_regex = re_compile(r"[a-zA-Z][-.\w]{0,62}", ASCII)
_unit_regex = re_compile(r"\w{0,63}", ASCII)


@dataclass(frozen=True)
class CallbackOptions:
"""Options for the callback
Args:
timeout_millis: Timeout for the callback's execution. If the callback does asynchronous
work (e.g. HTTP requests), it should respect this timeout.
"""

timeout_millis: float = 10_000


InstrumentT = TypeVar("InstrumentT", bound="Instrument")
CallbackT = Union[
Callable[[CallbackOptions], Iterable[Observation]],
Generator[Iterable[Observation], CallbackOptions, None],
]


class Instrument(ABC):
"""Abstract class that serves as base for all instruments."""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,15 @@
# pylint: disable=too-many-ancestors

from logging import getLogger
from typing import TYPE_CHECKING, Dict, Generator, Iterable, Optional, Union
from typing import (
TYPE_CHECKING,
Dict,
Generator,
Iterable,
List,
Optional,
Union,
)

from opentelemetry._metrics import CallbackT
from opentelemetry._metrics import Counter as APICounter
Expand All @@ -26,6 +34,7 @@
ObservableUpDownCounter as APIObservableUpDownCounter,
)
from opentelemetry._metrics import UpDownCounter as APIUpDownCounter
from opentelemetry._metrics._internal.instrument import CallbackOptions
from opentelemetry.sdk._metrics.measurement import Measurement
from opentelemetry.sdk.util.instrumentation import InstrumentationScope

Expand Down Expand Up @@ -93,32 +102,41 @@ def __init__(
self._measurement_consumer = measurement_consumer
super().__init__(name, callbacks, unit=unit, description=description)

self._callbacks = []
self._callbacks: List[CallbackT] = []

if callbacks is not None:

for callback in callbacks:

if isinstance(callback, Generator):

def inner(callback=callback) -> Iterable[Measurement]:
return next(callback)
# advance generator to it's first yield
next(callback)

def inner(
options: CallbackOptions,
callback=callback,
) -> Iterable[Measurement]:
try:
return callback.send(options)
except StopIteration:
return []

self._callbacks.append(inner)
else:
self._callbacks.append(callback)

def callback(self) -> Iterable[Measurement]:
def callback(
self, callback_options: CallbackOptions
) -> Iterable[Measurement]:
for callback in self._callbacks:
try:
for api_measurement in callback():
for api_measurement in callback(callback_options):
yield Measurement(
api_measurement.value,
instrument=self,
attributes=api_measurement.attributes,
)
except StopIteration:
pass
except Exception: # pylint: disable=broad-except
_logger.exception(
"Callback failed for instrument %s.", self.name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from threading import Lock
from typing import TYPE_CHECKING, Dict, Iterable, List, Mapping

from opentelemetry._metrics._internal.instrument import CallbackOptions
from opentelemetry.sdk._metrics._internal.metric_reader_storage import (
MetricReaderStorage,
)
Expand All @@ -27,7 +28,7 @@
from opentelemetry.sdk._metrics.point import AggregationTemporality, Metric

if TYPE_CHECKING:
from opentelemetry.sdk._metrics.instrument import _Asynchronous
from opentelemetry.sdk._metrics._internal.instrument import _Asynchronous


class MeasurementConsumer(ABC):
Expand Down Expand Up @@ -78,8 +79,10 @@ def collect(
) -> Iterable[Metric]:
with self._lock:
metric_reader_storage = self._reader_storages[metric_reader]
# for now, just use the defaults
callback_options = CallbackOptions()
for async_instrument in self._async_instruments:
for measurement in async_instrument.callback():
for measurement in async_instrument.callback(callback_options):
metric_reader_storage.consume_measurement(measurement)
return self._reader_storages[metric_reader].collect(
instrument_type_temporality
Expand Down
Loading

0 comments on commit db9d40b

Please sign in to comment.