From 082d55355500e9a6e46a801f92e5f5b9e2d054b0 Mon Sep 17 00:00:00 2001 From: Diego Hurtado Date: Fri, 24 Sep 2021 18:17:11 +0200 Subject: [PATCH] Adds metrics API (#1887) * Adds metric prototype Fixes #1835 * Fix docs * Add API metrics doc * Add missing docs * Add files * Adding docs * Refactor to _initialize * Refactor initialize * Add more documentation * Add exporter test * Add process * Fix tests * Try to add aggregator_class argument Tests are failing here * Fix instrument parent classes * Test default aggregator * WIP * Add prototype test * Tests passing again * Use right counters * All tests passing * Rearrange instrument storage * Fix tests * Add HTTP server test * WIP * WIP * Add prototype * WIP * Fail the test * WIP * WIP * WIP * WIP * Add views * Discard instruments via views * Fix tests * WIP * WIP * Fix lint * WIP * Fix test * Fix lint * Fix method * Fix lint * Mypy workaround * Skip if 3.6 * Fix lint * Add reason * Fix 3.6 * Fix run * Fix lint * Remove SDK metrics * Remove SDK docs * Remove metrics * Remove assertnotraises mixin * Revert sdk docs conf * Remove SDK env var changes * Fix unit checking * Define positional-only arguments * Add Metrics plans * Add API tests * WIP * WIP test * WIP * WIP * WIP * Set provider test passing * Use a fixture * Add test for get_provider * Rename tests * WIP * WIP * WIP * WIP * Remove non specific requirement * Add meter requirements * Put all meter provider tests in one file * Add meter tests * Make attributes be passed as a dictionary * Make some interfaces private * Log an error instead * Remove ASCII flag * Add CHANGELOG entry * Add instrument tests * All tests passing * Add test * Add name tests * Add unit tests * Add description tests * Add counter tests * Add more tests * Add Histogram tests * Add observable gauge tests * Add updowncounter tests * Add observableupdowncounter tests * Fix lint * Fix docs * Fix lint * Ignore mypy * Remove useless pylint skip * Remove useless pylint skip * Remove useless pylint skip * Remove useless pylint skip * Remove useless pylint skip * Add locks to meter and meterprovider * Add lock to instruments * Fix fixmes * Fix lint * Add documentation placeholder * Remove blank line as requested. * Do not override Rlock * Remove unecessary super calls * Add missing super calls * Remove plan files * Add missing parameters * Rename observe to callback * Fix lint * Rename to secure_instrument_name * Remove locks * Fix lint * Remove args and kwargs * Remove implementation that gives meters access to meter provider * Allow creating async instruments with either a callback function or generator * add additional test with callback form of observable counter * add a test/example that reads measurements from proc stat * implement cpu time integration test with generator too Co-authored-by: Aaron Abbott --- .../src/opentelemetry/metrics/__init__.py | 384 ++++++++++++++++++ .../src/opentelemetry/metrics/instrument.py | 234 +++++++++++ .../src/opentelemetry/metrics/measurement.py | 39 ++ 3 files changed, 657 insertions(+) create mode 100644 opentelemetry-api/src/opentelemetry/metrics/__init__.py create mode 100644 opentelemetry-api/src/opentelemetry/metrics/instrument.py create mode 100644 opentelemetry-api/src/opentelemetry/metrics/measurement.py diff --git a/opentelemetry-api/src/opentelemetry/metrics/__init__.py b/opentelemetry-api/src/opentelemetry/metrics/__init__.py new file mode 100644 index 00000000000..83d210e063b --- /dev/null +++ b/opentelemetry-api/src/opentelemetry/metrics/__init__.py @@ -0,0 +1,384 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint: disable=too-many-ancestors +# type: ignore + +# FIXME enhance the documentation of this module +""" +This module provides abstract and concrete (but noop) classes that can be used +to generate metrics. +""" + + +from abc import ABC, abstractmethod +from logging import getLogger +from os import environ +from typing import Optional, cast + +from opentelemetry.environment_variables import OTEL_PYTHON_METER_PROVIDER +from opentelemetry.metrics.instrument import ( + Counter, + DefaultCounter, + DefaultHistogram, + DefaultObservableCounter, + DefaultObservableGauge, + DefaultObservableUpDownCounter, + DefaultUpDownCounter, + Histogram, + ObservableCounter, + ObservableGauge, + ObservableUpDownCounter, + UpDownCounter, +) +from opentelemetry.util._providers import _load_provider + +_logger = getLogger(__name__) + + +class MeterProvider(ABC): + @abstractmethod + def get_meter( + self, + name, + version=None, + schema_url=None, + ) -> "Meter": + if name is None or name == "": + _logger.warning("Invalid name: %s", name) + + +class _DefaultMeterProvider(MeterProvider): + def get_meter( + self, + name, + version=None, + schema_url=None, + ) -> "Meter": + super().get_meter(name, version=version, schema_url=schema_url) + return _DefaultMeter(name, version=version, schema_url=schema_url) + + +class ProxyMeterProvider(MeterProvider): + def get_meter( + self, + name, + version=None, + schema_url=None, + ) -> "Meter": + if _METER_PROVIDER: + return _METER_PROVIDER.get_meter( + name, version=version, schema_url=schema_url + ) + return ProxyMeter(name, version=version, schema_url=schema_url) + + +class Meter(ABC): + def __init__(self, name, version=None, schema_url=None): + super().__init__() + self._name = name + self._version = version + self._schema_url = schema_url + self._instrument_names = set() + + @property + def name(self): + return self._name + + @property + def version(self): + return self._version + + @property + def schema_url(self): + return self._schema_url + + def _secure_instrument_name(self, name): + name = name.lower() + + if name in self._instrument_names: + _logger.error("Instrument name %s has been used already", name) + + else: + self._instrument_names.add(name) + + @abstractmethod + def create_counter(self, name, unit="", description="") -> Counter: + self._secure_instrument_name(name) + + @abstractmethod + def create_up_down_counter( + self, name, unit="", description="" + ) -> UpDownCounter: + self._secure_instrument_name(name) + + @abstractmethod + def create_observable_counter( + self, name, callback, unit="", description="" + ) -> ObservableCounter: + """Creates an observable counter instrument + + An observable counter observes a monotonically increasing count by + calling a provided callback which returns multiple + :class:`~opentelemetry.metrics.measurement.Measurement`. + + For example, an observable counter could be used to report system CPU + time periodically. Here is a basic implementation:: + + def cpu_time_callback() -> Iterable[Measurement]: + measurements = [] + with open("/proc/stat") as procstat: + procstat.readline() # skip the first line + for line in procstat: + if not line.startswith("cpu"): break + cpu, *states = line.split() + measurements.append(Measurement(int(states[0]) // 100, {"cpu": cpu, "state": "user"})) + measurements.append(Measurement(int(states[1]) // 100, {"cpu": cpu, "state": "nice"})) + measurements.append(Measurement(int(states[2]) // 100, {"cpu": cpu, "state": "system"})) + # ... other states + return measurements + + meter.create_observable_counter( + "system.cpu.time", + callback=cpu_time_callback, + unit="s", + description="CPU time" + ) + + To reduce memory usage, you can use generator callbacks instead of + building the full list:: + + def cpu_time_callback() -> Iterable[Measurement]: + with open("/proc/stat") as procstat: + procstat.readline() # skip the first line + for line in procstat: + if not line.startswith("cpu"): break + cpu, *states = line.split() + yield Measurement(int(states[0]) // 100, {"cpu": cpu, "state": "user"}) + yield Measurement(int(states[1]) // 100, {"cpu": cpu, "state": "nice"}) + # ... other states + + Alternatively, you can pass a generator directly instead of a callback, + which should return iterables of + :class:`~opentelemetry.metrics.measurement.Measurement`:: + + def cpu_time_callback(states_to_include: set[str]) -> Iterable[Iterable[Measurement]]: + while True: + measurements = [] + with open("/proc/stat") as procstat: + procstat.readline() # skip the first line + for line in procstat: + if not line.startswith("cpu"): break + cpu, *states = line.split() + if "user" in states_to_include: + measurements.append(Measurement(int(states[0]) // 100, {"cpu": cpu, "state": "user"})) + if "nice" in states_to_include: + measurements.append(Measurement(int(states[1]) // 100, {"cpu": cpu, "state": "nice"})) + # ... other states + yield measurements + + meter.create_observable_counter( + "system.cpu.time", + callback=cpu_time_callback({"user", "system"}), + unit="s", + description="CPU time" + ) + + Args: + name: The name of the instrument to be created + callback: A callback that returns an iterable of + :class:`~opentelemetry.metrics.measurement.Measurement`. + Alternatively, can be a generator that yields iterables of + :class:`~opentelemetry.metrics.measurement.Measurement`. + unit: The unit for measurements this instrument reports. For + example, ``By`` for bytes. UCUM units are recommended. + description: A description for this instrument and what it measures. + """ + + self._secure_instrument_name(name) + + @abstractmethod + def create_histogram(self, name, unit="", description="") -> Histogram: + self._secure_instrument_name(name) + + @abstractmethod + def create_observable_gauge( + self, name, callback, unit="", description="" + ) -> ObservableGauge: + self._secure_instrument_name(name) + + @abstractmethod + def create_observable_up_down_counter( + self, name, callback, unit="", description="" + ) -> ObservableUpDownCounter: + self._secure_instrument_name(name) + + +class ProxyMeter(Meter): + def __init__( + self, + name, + version=None, + schema_url=None, + ): + super().__init__(name, version=version, schema_url=schema_url) + self._real_meter: Optional[Meter] = None + self._noop_meter = _DefaultMeter( + name, version=version, schema_url=schema_url + ) + + @property + def _meter(self) -> Meter: + if self._real_meter is not None: + return self._real_meter + + if _METER_PROVIDER: + self._real_meter = _METER_PROVIDER.get_meter( + self._name, + self._version, + ) + return self._real_meter + return self._noop_meter + + def create_counter(self, *args, **kwargs) -> Counter: + return self._meter.create_counter(*args, **kwargs) + + def create_up_down_counter(self, *args, **kwargs) -> UpDownCounter: + return self._meter.create_up_down_counter(*args, **kwargs) + + def create_observable_counter(self, *args, **kwargs) -> ObservableCounter: + return self._meter.create_observable_counter(*args, **kwargs) + + def create_histogram(self, *args, **kwargs) -> Histogram: + return self._meter.create_histogram(*args, **kwargs) + + def create_observable_gauge(self, *args, **kwargs) -> ObservableGauge: + return self._meter.create_observable_gauge(*args, **kwargs) + + def create_observable_up_down_counter( + self, *args, **kwargs + ) -> ObservableUpDownCounter: + return self._meter.create_observable_up_down_counter(*args, **kwargs) + + +class _DefaultMeter(Meter): + def create_counter(self, name, unit="", description="") -> Counter: + super().create_counter(name, unit=unit, description=description) + return DefaultCounter(name, unit=unit, description=description) + + def create_up_down_counter( + self, name, unit="", description="" + ) -> UpDownCounter: + super().create_up_down_counter( + name, unit=unit, description=description + ) + return DefaultUpDownCounter(name, unit=unit, description=description) + + def create_observable_counter( + self, name, callback, unit="", description="" + ) -> ObservableCounter: + super().create_observable_counter( + name, callback, unit=unit, description=description + ) + return DefaultObservableCounter( + name, + callback, + unit=unit, + description=description, + ) + + def create_histogram(self, name, unit="", description="") -> Histogram: + super().create_histogram(name, unit=unit, description=description) + return DefaultHistogram(name, unit=unit, description=description) + + def create_observable_gauge( + self, name, callback, unit="", description="" + ) -> ObservableGauge: + super().create_observable_gauge( + name, callback, unit=unit, description=description + ) + return DefaultObservableGauge( + name, + callback, + unit=unit, + description=description, + ) + + def create_observable_up_down_counter( + self, name, callback, unit="", description="" + ) -> ObservableUpDownCounter: + super().create_observable_up_down_counter( + name, callback, unit=unit, description=description + ) + return DefaultObservableUpDownCounter( + name, + callback, + unit=unit, + description=description, + ) + + +_METER_PROVIDER = None +_PROXY_METER_PROVIDER = None + + +def get_meter( + name: str, + version: str = "", + meter_provider: Optional[MeterProvider] = None, +) -> "Meter": + """Returns a `Meter` for use by the given instrumentation library. + + This function is a convenience wrapper for + opentelemetry.trace.MeterProvider.get_meter. + + If meter_provider is omitted the current configured one is used. + """ + if meter_provider is None: + meter_provider = get_meter_provider() + return meter_provider.get_meter(name, version) + + +def set_meter_provider(meter_provider: MeterProvider) -> None: + """Sets the current global :class:`~.MeterProvider` object. + + This can only be done once, a warning will be logged if any furter attempt + is made. + """ + global _METER_PROVIDER # pylint: disable=global-statement + + if _METER_PROVIDER is not None: + _logger.warning("Overriding of current MeterProvider is not allowed") + return + + _METER_PROVIDER = meter_provider + + +def get_meter_provider() -> MeterProvider: + """Gets the current global :class:`~.MeterProvider` object.""" + # pylint: disable=global-statement + global _METER_PROVIDER + global _PROXY_METER_PROVIDER + + if _METER_PROVIDER is None: + if OTEL_PYTHON_METER_PROVIDER not in environ.keys(): + if _PROXY_METER_PROVIDER is None: + _PROXY_METER_PROVIDER = ProxyMeterProvider() + return _PROXY_METER_PROVIDER + + _METER_PROVIDER = cast( + "MeterProvider", + _load_provider(OTEL_PYTHON_METER_PROVIDER, "meter_provider"), + ) + return _METER_PROVIDER diff --git a/opentelemetry-api/src/opentelemetry/metrics/instrument.py b/opentelemetry-api/src/opentelemetry/metrics/instrument.py new file mode 100644 index 00000000000..5d382056408 --- /dev/null +++ b/opentelemetry-api/src/opentelemetry/metrics/instrument.py @@ -0,0 +1,234 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint: disable=too-many-ancestors + +# type: ignore + +from abc import ABC, abstractmethod +from collections import abc as collections_abc +from logging import getLogger +from re import compile as compile_ +from typing import Callable, Generator, Iterable, Union + +from opentelemetry.metrics.measurement import Measurement + +_TInstrumentCallback = Callable[[], Iterable[Measurement]] +_TInstrumentCallbackGenerator = Generator[Iterable[Measurement], None, None] +TCallback = Union[_TInstrumentCallback, _TInstrumentCallbackGenerator] + + +_logger = getLogger(__name__) + + +class Instrument(ABC): + + _name_regex = compile_(r"[a-zA-Z][-.\w]{0,62}") + + @property + def name(self): + return self._name + + @property + def unit(self): + return self._unit + + @property + def description(self): + return self._description + + @abstractmethod + def __init__(self, name, unit="", description=""): + + if name is None or self._name_regex.fullmatch(name) is None: + _logger.error("Invalid instrument name %s", name) + + else: + self._name = name + + if unit is None: + self._unit = "" + elif len(unit) > 63: + _logger.error("unit must be 63 characters or shorter") + + elif any(ord(character) > 127 for character in unit): + _logger.error("unit must only contain ASCII characters") + else: + self._unit = unit + + if description is None: + description = "" + + self._description = description + + +class Synchronous(Instrument): + pass + + +class Asynchronous(Instrument): + @abstractmethod + def __init__( + self, + name, + callback: TCallback, + *args, + unit="", + description="", + **kwargs + ): + super().__init__( + name, *args, unit=unit, description=description, **kwargs + ) + + if isinstance(callback, collections_abc.Callable): + self._callback = callback + elif isinstance(callback, collections_abc.Generator): + self._callback = self._wrap_generator_callback(callback) + else: + _logger.error("callback must be a callable or generator") + + def _wrap_generator_callback( + self, + generator_callback: _TInstrumentCallbackGenerator, + ) -> _TInstrumentCallback: + """Wraps a generator style callback into a callable one""" + has_items = True + + def inner() -> Iterable[Measurement]: + nonlocal has_items + if not has_items: + return [] + + try: + return next(generator_callback) + except StopIteration: + has_items = False + _logger.error( + "callback generator for instrument %s ran out of measurements", + self._name, + ) + return [] + + return inner + + def callback(self): + measurements = self._callback() + if not isinstance(measurements, collections_abc.Iterable): + _logger.error( + "Callback must return an iterable of Measurement, got %s", + type(measurements), + ) + return + for measurement in measurements: + if not isinstance(measurement, Measurement): + _logger.error( + "Callback must return an iterable of Measurement, " + "iterable contained type %s", + type(measurement), + ) + yield measurement + + +class _Adding(Instrument): + pass + + +class _Grouping(Instrument): + pass + + +class _Monotonic(_Adding): + pass + + +class _NonMonotonic(_Adding): + pass + + +class Counter(_Monotonic, Synchronous): + @abstractmethod + def add(self, amount, attributes=None): + if amount < 0: + _logger.error("Amount must be non-negative") + + +class DefaultCounter(Counter): + def __init__(self, name, unit="", description=""): + super().__init__(name, unit=unit, description=description) + + def add(self, amount, attributes=None): + return super().add(amount, attributes=attributes) + + +class UpDownCounter(_NonMonotonic, Synchronous): + @abstractmethod + def add(self, amount, attributes=None): + pass + + +class DefaultUpDownCounter(UpDownCounter): + def __init__(self, name, unit="", description=""): + super().__init__(name, unit=unit, description=description) + + def add(self, amount, attributes=None): + return super().add(amount, attributes=attributes) + + +class ObservableCounter(_Monotonic, Asynchronous): + def callback(self): + measurements = super().callback() + + for measurement in measurements: + if measurement.value < 0: + _logger.error("Amount must be non-negative") + yield measurement + + +class DefaultObservableCounter(ObservableCounter): + def __init__(self, name, callback, unit="", description=""): + super().__init__(name, callback, unit=unit, description=description) + + +class ObservableUpDownCounter(_NonMonotonic, Asynchronous): + + pass + + +class DefaultObservableUpDownCounter(ObservableUpDownCounter): + def __init__(self, name, callback, unit="", description=""): + super().__init__(name, callback, unit=unit, description=description) + + +class Histogram(_Grouping, Synchronous): + @abstractmethod + def record(self, amount, attributes=None): + pass + + +class DefaultHistogram(Histogram): + def __init__(self, name, unit="", description=""): + super().__init__(name, unit=unit, description=description) + + def record(self, amount, attributes=None): + return super().record(amount, attributes=attributes) + + +class ObservableGauge(_Grouping, Asynchronous): + pass + + +class DefaultObservableGauge(ObservableGauge): + def __init__(self, name, callback, unit="", description=""): + super().__init__(name, callback, unit=unit, description=description) diff --git a/opentelemetry-api/src/opentelemetry/metrics/measurement.py b/opentelemetry-api/src/opentelemetry/metrics/measurement.py new file mode 100644 index 00000000000..6b5b081c266 --- /dev/null +++ b/opentelemetry-api/src/opentelemetry/metrics/measurement.py @@ -0,0 +1,39 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint: disable=too-many-ancestors +# type:ignore + + +from abc import ABC, abstractmethod + + +class Measurement(ABC): + @property + def value(self): + return self._value + + @property + def attributes(self): + return self._attributes + + @abstractmethod + def __init__(self, value, attributes=None): + self._value = value + self._attributes = attributes + + +class DefaultMeasurement(Measurement): + def __init__(self, value, attributes=None): + super().__init__(value, attributes=attributes)