From 888bed91695da7466a27683898c4b674538289dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mauricio=20V=C3=A1squez?= Date: Mon, 2 Mar 2020 17:02:03 -0500 Subject: [PATCH] sdk: Implement observer instrument (#425) Observer instruments are used to capture a current set of values at a point in time [1]. This commit extends the Meter interface to allow to register an observer instrument by pasing a callback that will be executed at collection time. The logic inside collection is updated to consider these instruments and a new ObserverAggregator is implemented. [1] https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/api-metrics.md#observer-instruments --- examples/metrics/observer_example.py | 72 ++++++++ examples/metrics/record.py | 16 +- examples/metrics/simple_example.py | 27 ++- .../opentelemetry/ext/prometheus/__init__.py | 14 +- .../src/opentelemetry/metrics/__init__.py | 127 +++++++------ .../tests/metrics/test_metrics.py | 15 -- .../tests/test_implementation.py | 7 + .../src/opentelemetry/sdk/metrics/__init__.py | 136 ++++++++++---- .../sdk/metrics/export/aggregate.py | 29 +++ .../sdk/metrics/export/batcher.py | 5 +- .../tests/metrics/export/test_export.py | 87 +++++++++ .../tests/metrics/test_metrics.py | 168 +++++++++++------- tox.ini | 5 +- 13 files changed, 517 insertions(+), 191 deletions(-) create mode 100644 examples/metrics/observer_example.py diff --git a/examples/metrics/observer_example.py b/examples/metrics/observer_example.py new file mode 100644 index 00000000000..aff25ee476c --- /dev/null +++ b/examples/metrics/observer_example.py @@ -0,0 +1,72 @@ +# Copyright 2020, OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +""" +This example shows how the Observer metric instrument can be used to capture +asynchronous metrics data. +""" +import psutil + +from opentelemetry import metrics +from opentelemetry.sdk.metrics import LabelSet, MeterProvider +from opentelemetry.sdk.metrics.export import ConsoleMetricsExporter +from opentelemetry.sdk.metrics.export.batcher import UngroupedBatcher +from opentelemetry.sdk.metrics.export.controller import PushController + +# Configure a stateful batcher +batcher = UngroupedBatcher(stateful=True) + +metrics.set_preferred_meter_provider_implementation(lambda _: MeterProvider()) +meter = metrics.get_meter(__name__) + +# Exporter to export metrics to the console +exporter = ConsoleMetricsExporter() + +# Configure a push controller +controller = PushController(meter=meter, exporter=exporter, interval=2) + + +# Callback to gather cpu usage +def get_cpu_usage_callback(observer): + for (number, percent) in enumerate(psutil.cpu_percent(percpu=True)): + label_set = meter.get_label_set({"cpu_number": str(number)}) + observer.observe(percent, label_set) + + +meter.register_observer( + callback=get_cpu_usage_callback, + name="cpu_percent", + description="per-cpu usage", + unit="1", + value_type=float, + label_keys=("cpu_number",), +) + + +# Callback to gather RAM memory usage +def get_ram_usage_callback(observer): + ram_percent = psutil.virtual_memory().percent + observer.observe(ram_percent, LabelSet()) + + +meter.register_observer( + callback=get_ram_usage_callback, + name="ram_percent", + description="RAM memory usage", + unit="1", + value_type=float, + label_keys=(), +) + +input("Press a key to finish...\n") diff --git a/examples/metrics/record.py b/examples/metrics/record.py index f008ff67466..a376b2aafc0 100644 --- a/examples/metrics/record.py +++ b/examples/metrics/record.py @@ -32,15 +32,25 @@ exporter = ConsoleMetricsExporter() # controller collects metrics created from meter and exports it via the # exporter every interval -controller = PushController(meter, exporter, 5) +controller = PushController(meter=meter, exporter=exporter, interval=5) # Example to show how to record using the meter counter = meter.create_metric( - "requests", "number of requests", 1, int, Counter, ("environment",) + name="requests", + description="number of requests", + unit="1", + value_type=int, + metric_type=Counter, + label_keys=("environment",), ) counter2 = meter.create_metric( - "clicks", "number of clicks", 1, int, Counter, ("environment",) + name="clicks", + description="number of clicks", + unit="1", + value_type=int, + metric_type=Counter, + label_keys=("environment",), ) # Labelsets are used to identify key-values that are associated with a specific diff --git a/examples/metrics/simple_example.py b/examples/metrics/simple_example.py index 75879155176..2b8f5cfac8b 100644 --- a/examples/metrics/simple_example.py +++ b/examples/metrics/simple_example.py @@ -62,15 +62,30 @@ def usage(argv): # Metric instruments allow to capture measurements requests_counter = meter.create_metric( - "requests", "number of requests", 1, int, Counter, ("environment",) + name="requests", + description="number of requests", + unit="1", + value_type=int, + metric_type=Counter, + label_keys=("environment",), ) clicks_counter = meter.create_metric( - "clicks", "number of clicks", 1, int, Counter, ("environment",) + name="clicks", + description="number of clicks", + unit="1", + value_type=int, + metric_type=Counter, + label_keys=("environment",), ) requests_size = meter.create_metric( - "requests_size", "size of requests", 1, int, Measure, ("environment",) + name="requests_size", + description="size of requests", + unit="1", + value_type=int, + metric_type=Measure, + label_keys=("environment",), ) # Labelsets are used to identify key-values that are associated with a specific @@ -82,21 +97,15 @@ def usage(argv): # Update the metric instruments using the direct calling convention requests_size.record(100, staging_label_set) requests_counter.add(25, staging_label_set) -# Sleep for 5 seconds, exported value should be 25 time.sleep(5) requests_size.record(5000, staging_label_set) requests_counter.add(50, staging_label_set) -# Exported value should be 75 time.sleep(5) requests_size.record(2, testing_label_set) requests_counter.add(35, testing_label_set) -# There should be two exported values 75 and 35, one for each labelset time.sleep(5) clicks_counter.add(5, staging_label_set) -# There should be three exported values, labelsets can be reused for different -# metrics but will be recorded seperately, 75, 35 and 5 - time.sleep(5) diff --git a/ext/opentelemetry-ext-prometheus/src/opentelemetry/ext/prometheus/__init__.py b/ext/opentelemetry-ext-prometheus/src/opentelemetry/ext/prometheus/__init__.py index 5b4a17a5569..ebe68e3f4dd 100644 --- a/ext/opentelemetry-ext-prometheus/src/opentelemetry/ext/prometheus/__init__.py +++ b/ext/opentelemetry-ext-prometheus/src/opentelemetry/ext/prometheus/__init__.py @@ -24,11 +24,10 @@ REGISTRY, CollectorRegistry, CounterMetricFamily, - GaugeMetricFamily, UnknownMetricFamily, ) -from opentelemetry.metrics import Counter, Gauge, Measure, Metric +from opentelemetry.metrics import Counter, Measure, Metric from opentelemetry.sdk.metrics.export import ( MetricRecord, MetricsExporter, @@ -112,17 +111,6 @@ def _translate_to_prometheus(self, metric_record: MetricRecord): prometheus_metric.add_metric( labels=label_values, value=metric_record.aggregator.checkpoint ) - - elif isinstance(metric_record.metric, Gauge): - prometheus_metric = GaugeMetricFamily( - name=metric_name, - documentation=metric_record.metric.description, - labels=label_keys, - ) - prometheus_metric.add_metric( - labels=label_values, value=metric_record.aggregator.checkpoint - ) - # TODO: Add support for histograms when supported in OT elif isinstance(metric_record.metric, Measure): prometheus_metric = UnknownMetricFamily( diff --git a/opentelemetry-api/src/opentelemetry/metrics/__init__.py b/opentelemetry-api/src/opentelemetry/metrics/__init__.py index c1b330551aa..3ba9bcad009 100644 --- a/opentelemetry-api/src/opentelemetry/metrics/__init__.py +++ b/opentelemetry-api/src/opentelemetry/metrics/__init__.py @@ -50,13 +50,6 @@ def add(self, value: ValueT) -> None: value: The value to add to the handle. """ - def set(self, value: ValueT) -> None: - """No-op implementation of `GaugeHandle` set. - - Args: - value: The value to set to the handle. - """ - def record(self, value: ValueT) -> None: """No-op implementation of `MeasureHandle` record. @@ -74,15 +67,6 @@ def add(self, value: ValueT) -> None: """ -class GaugeHandle: - def set(self, value: ValueT) -> None: - """Sets the current value of the handle to ``value``. - - Args: - value: The value to set to the handle. - """ - - class MeasureHandle: def record(self, value: ValueT) -> None: """Records the given ``value`` to this handle. @@ -124,7 +108,7 @@ def get_handle(self, label_set: LabelSet) -> "object": Handles are useful to reduce the cost of repeatedly recording a metric with a pre-defined set of label values. All metric kinds (counter, - gauge, measure) support declaring a set of required label keys. The + measure) support declaring a set of required label keys. The values corresponding to these keys should be specified in every handle. "Unspecified" label values, in cases where a handle is requested but a value was not provided are permitted. @@ -153,14 +137,6 @@ def add(self, value: ValueT, label_set: LabelSet) -> None: label_set: `LabelSet` to associate with the returned handle. """ - def set(self, value: ValueT, label_set: LabelSet) -> None: - """No-op implementation of `Gauge` set. - - Args: - value: The value to set the gauge metric to. - label_set: `LabelSet` to associate with the returned handle. - """ - def record(self, value: ValueT, label_set: LabelSet) -> None: """No-op implementation of `Measure` record. @@ -186,28 +162,6 @@ def add(self, value: ValueT, label_set: LabelSet) -> None: """ -class Gauge(Metric): - """A gauge type metric that expresses a pre-calculated value. - - Gauge metrics have a value that is either ``Set`` by explicit - instrumentation or observed through a callback. This kind of metric - should be used when the metric cannot be expressed as a sum or because - the measurement interval is arbitrary. - """ - - def get_handle(self, label_set: LabelSet) -> "GaugeHandle": - """Gets a `GaugeHandle`.""" - return GaugeHandle() - - def set(self, value: ValueT, label_set: LabelSet) -> None: - """Sets the value of the gauge to ``value``. - - Args: - value: The value to set the gauge metric to. - label_set: `LabelSet` to associate with the returned handle. - """ - - class Measure(Metric): """A measure type metric that represent raw stats that are recorded. @@ -227,6 +181,37 @@ def record(self, value: ValueT, label_set: LabelSet) -> None: """ +class Observer(abc.ABC): + """An observer type metric instrument used to capture a current set of values. + + + Observer instruments are asynchronous, a callback is invoked with the + observer instrument as argument allowing the user to capture multiple + values per collection interval. + """ + + @abc.abstractmethod + def observe(self, value: ValueT, label_set: LabelSet) -> None: + """Captures ``value`` to the observer. + + Args: + value: The value to capture to this observer metric. + label_set: `LabelSet` associated to ``value``. + """ + + +class DefaultObserver(Observer): + """No-op implementation of ``Observer``.""" + + def observe(self, value: ValueT, label_set: LabelSet) -> None: + """Captures ``value`` to the observer. + + Args: + value: The value to capture to this observer metric. + label_set: `LabelSet` associated to ``value``. + """ + + class MeterProvider(abc.ABC): @abc.abstractmethod def get_meter( @@ -277,15 +262,16 @@ def get_meter( return DefaultMeter() -MetricT = TypeVar("MetricT", Counter, Gauge, Measure) +MetricT = TypeVar("MetricT", Counter, Measure, Observer) +ObserverCallbackT = Callable[[Observer], None] # pylint: disable=unused-argument class Meter(abc.ABC): """An interface to allow the recording of metrics. - `Metric` s are used for recording pre-defined aggregation (gauge and - counter), or raw values (measure) in which the aggregation and labels + `Metric` s are used for recording pre-defined aggregation (counter), + or raw values (measure) in which the aggregation and labels for the exported metric are deferred. """ @@ -325,7 +311,8 @@ def create_metric( Args: name: The name of the metric. description: Human-readable description of the metric. - unit: Unit of the metric values. + unit: Unit of the metric values following the UCUM convention + (https://unitsofmeasure.org/ucum.html). value_type: The type of values being recorded by the metric. metric_type: The type of metric being created. label_keys: The keys for the labels with dynamic values. @@ -333,6 +320,32 @@ def create_metric( Returns: A new ``metric_type`` metric with values of ``value_type``. """ + @abc.abstractmethod + def register_observer( + self, + callback: ObserverCallbackT, + name: str, + description: str, + unit: str, + value_type: Type[ValueT], + label_keys: Sequence[str] = (), + enabled: bool = True, + ) -> "Observer": + """Registers an ``Observer`` metric instrument. + + Args: + callback: Callback invoked each collection interval with the + observer as argument. + name: The name of the metric. + description: Human-readable description of the metric. + unit: Unit of the metric values following the UCUM convention + (https://unitsofmeasure.org/ucum.html). + value_type: The type of values being recorded by the metric. + label_keys: The keys for the labels with dynamic values. + enabled: Whether to report the metric by default. + Returns: A new ``Observer`` metric instrument. + """ + @abc.abstractmethod def get_label_set(self, labels: Dict[str, str]) -> "LabelSet": """Gets a `LabelSet` with the given labels. @@ -367,6 +380,18 @@ def create_metric( # pylint: disable=no-self-use return DefaultMetric() + def register_observer( + self, + callback: ObserverCallbackT, + name: str, + description: str, + unit: str, + value_type: Type[ValueT], + label_keys: Sequence[str] = (), + enabled: bool = True, + ) -> "Observer": + return DefaultObserver() + def get_label_set(self, labels: Dict[str, str]) -> "LabelSet": # pylint: disable=no-self-use return DefaultLabelSet() diff --git a/opentelemetry-api/tests/metrics/test_metrics.py b/opentelemetry-api/tests/metrics/test_metrics.py index 788ce57680a..45913ca6720 100644 --- a/opentelemetry-api/tests/metrics/test_metrics.py +++ b/opentelemetry-api/tests/metrics/test_metrics.py @@ -36,17 +36,6 @@ def test_counter_add(self): label_set = metrics.LabelSet() counter.add(1, label_set) - def test_gauge(self): - gauge = metrics.Gauge() - label_set = metrics.LabelSet() - handle = gauge.get_handle(label_set) - self.assertIsInstance(handle, metrics.GaugeHandle) - - def test_gauge_set(self): - gauge = metrics.Gauge() - label_set = metrics.LabelSet() - gauge.set(1, label_set) - def test_measure(self): measure = metrics.Measure() label_set = metrics.LabelSet() @@ -65,10 +54,6 @@ def test_counter_handle(self): handle = metrics.CounterHandle() handle.add(1) - def test_gauge_handle(self): - handle = metrics.GaugeHandle() - handle.set(1) - def test_measure_handle(self): handle = metrics.MeasureHandle() handle.record(1) diff --git a/opentelemetry-api/tests/test_implementation.py b/opentelemetry-api/tests/test_implementation.py index 0035803bd26..7271eb51399 100644 --- a/opentelemetry-api/tests/test_implementation.py +++ b/opentelemetry-api/tests/test_implementation.py @@ -13,6 +13,7 @@ # limitations under the License. import unittest +from unittest import mock from opentelemetry import metrics, trace @@ -80,6 +81,12 @@ def test_create_metric(self): metric = meter.create_metric("", "", "", float, metrics.Counter) self.assertIsInstance(metric, metrics.DefaultMetric) + def test_register_observer(self): + meter = metrics.DefaultMeter() + callback = mock.Mock() + observer = meter.register_observer(callback, "", "", "", int, (), True) + self.assertIsInstance(observer, metrics.DefaultObserver) + def test_get_label_set(self): meter = metrics.DefaultMeter() label_set = meter.get_label_set({}) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py index fdf145d87ca..fc0fe6ae521 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py @@ -100,13 +100,6 @@ def add(self, value: metrics_api.ValueT) -> None: self.update(value) -class GaugeHandle(metrics_api.GaugeHandle, BaseHandle): - def set(self, value: metrics_api.ValueT) -> None: - """See `opentelemetry.metrics.GaugeHandle.set`.""" - if self._validate_update(value): - self.update(value) - - class MeasureHandle(metrics_api.MeasureHandle, BaseHandle): def record(self, value: metrics_api.ValueT) -> None: """See `opentelemetry.metrics.MeasureHandle.record`.""" @@ -158,7 +151,7 @@ def get_handle(self, label_set: LabelSet) -> BaseHandle: return handle def __repr__(self): - return '{}(name="{}", description={})'.format( + return '{}(name="{}", description="{}")'.format( type(self).__name__, self.name, self.description ) @@ -198,14 +191,24 @@ def add(self, value: metrics_api.ValueT, label_set: LabelSet) -> None: UPDATE_FUNCTION = add -class Gauge(Metric, metrics_api.Gauge): - """See `opentelemetry.metrics.Gauge`. - """ +class Measure(Metric, metrics_api.Measure): + """See `opentelemetry.metrics.Measure`.""" + + HANDLE_TYPE = MeasureHandle + + def record(self, value: metrics_api.ValueT, label_set: LabelSet) -> None: + """See `opentelemetry.metrics.Measure.record`.""" + self.get_handle(label_set).record(value) + + UPDATE_FUNCTION = record - HANDLE_TYPE = GaugeHandle + +class Observer(metrics_api.Observer): + """See `opentelemetry.metrics.Observer`.""" def __init__( self, + callback: metrics_api.ObserverCallbackT, name: str, description: str, unit: str, @@ -214,40 +217,59 @@ def __init__( label_keys: Sequence[str] = (), enabled: bool = True, ): - super().__init__( - name, - description, - unit, - value_type, - meter, - label_keys=label_keys, - enabled=enabled, - ) - - def set(self, value: metrics_api.ValueT, label_set: LabelSet) -> None: - """See `opentelemetry.metrics.Gauge.set`.""" - self.get_handle(label_set).set(value) - - UPDATE_FUNCTION = set - + self.callback = callback + self.name = name + self.description = description + self.unit = unit + self.value_type = value_type + self.meter = meter + self.label_keys = label_keys + self.enabled = enabled -class Measure(Metric, metrics_api.Measure): - """See `opentelemetry.metrics.Measure`.""" + self.aggregators = {} - HANDLE_TYPE = MeasureHandle + def observe(self, value: metrics_api.ValueT, label_set: LabelSet) -> None: + if not self.enabled: + return + if not isinstance(value, self.value_type): + logger.warning( + "Invalid value passed for %s.", self.value_type.__name__ + ) + return - def record(self, value: metrics_api.ValueT, label_set: LabelSet) -> None: - """See `opentelemetry.metrics.Measure.record`.""" - self.get_handle(label_set).record(value) + if label_set not in self.aggregators: + # TODO: how to cleanup aggregators? + self.aggregators[label_set] = self.meter.batcher.aggregator_for( + self.__class__ + ) + aggregator = self.aggregators[label_set] + aggregator.update(value) + + def run(self) -> bool: + try: + self.callback(self) + # pylint: disable=broad-except + except Exception as exc: + logger.warning( + "Exception while executing observer callback: %s.", exc + ) + return False + return True - UPDATE_FUNCTION = record + def __repr__(self): + return '{}(name="{}", description="{}")'.format( + type(self).__name__, self.name, self.description + ) class Record: """Container class used for processing in the `Batcher`""" def __init__( - self, metric: Metric, label_set: LabelSet, aggregator: Aggregator + self, + metric: metrics_api.MetricT, + label_set: LabelSet, + aggregator: Aggregator, ): self.metric = metric self.label_set = label_set @@ -271,6 +293,7 @@ def __init__( ): self.instrumentation_info = instrumentation_info self.metrics = set() + self.observers = set() self.batcher = UngroupedBatcher(stateful) def collect(self) -> None: @@ -280,6 +303,11 @@ def collect(self) -> None: each aggregator belonging to the metrics that were created with this meter instance. """ + + self._collect_metrics() + self._collect_observers() + + def _collect_metrics(self) -> None: for metric in self.metrics: if metric.enabled: for label_set, handle in metric.handles.items(): @@ -289,6 +317,19 @@ def collect(self) -> None: # Applies different batching logic based on type of batcher self.batcher.process(record) + def _collect_observers(self) -> None: + for observer in self.observers: + if not observer.enabled: + continue + + # TODO: capture timestamp? + if not observer.run(): + continue + + for label_set, aggregator in observer.aggregators.items(): + record = Record(observer, label_set, aggregator) + self.batcher.process(record) + def record_batch( self, label_set: LabelSet, @@ -322,6 +363,29 @@ def create_metric( self.metrics.add(metric) return metric + def register_observer( + self, + callback: metrics_api.ObserverCallbackT, + name: str, + description: str, + unit: str, + value_type: Type[metrics_api.ValueT], + label_keys: Sequence[str] = (), + enabled: bool = True, + ) -> metrics_api.Observer: + ob = Observer( + callback, + name, + description, + unit, + value_type, + self, + label_keys, + enabled, + ) + self.observers.add(ob) + return ob + def get_label_set(self, labels: Dict[str, str]): """See `opentelemetry.metrics.Meter.create_metric`. diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py index f082cce8919..5b730cc8040 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py @@ -111,3 +111,32 @@ def merge(self, other): self.checkpoint = self._merge_checkpoint( self.checkpoint, other.checkpoint ) + + +class ObserverAggregator(Aggregator): + """Same as MinMaxSumCount but also with last value.""" + + _TYPE = namedtuple("minmaxsumcountlast", "min max sum count last") + + def __init__(self): + super().__init__() + self.mmsc = MinMaxSumCountAggregator() + self.current = None + self.checkpoint = self._TYPE(None, None, None, 0, None) + + def update(self, value): + self.mmsc.update(value) + self.current = value + + def take_checkpoint(self): + self.mmsc.take_checkpoint() + self.checkpoint = self._TYPE(*(self.mmsc.checkpoint + (self.current,))) + + def merge(self, other): + self.mmsc.merge(other.mmsc) + self.checkpoint = self._TYPE( + *( + self.mmsc.checkpoint + + (other.checkpoint.last or self.checkpoint.last,) + ) + ) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/batcher.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/batcher.py index 86ddc3fcc16..f4418c61399 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/batcher.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/batcher.py @@ -15,12 +15,13 @@ import abc from typing import Sequence, Type -from opentelemetry.metrics import Counter, Measure, MetricT +from opentelemetry.metrics import Counter, Measure, MetricT, Observer from opentelemetry.sdk.metrics.export import MetricRecord from opentelemetry.sdk.metrics.export.aggregate import ( Aggregator, CounterAggregator, MinMaxSumCountAggregator, + ObserverAggregator, ) @@ -50,6 +51,8 @@ def aggregator_for(self, metric_type: Type[MetricT]) -> Aggregator: return CounterAggregator() if issubclass(metric_type, Measure): return MinMaxSumCountAggregator() + if issubclass(metric_type, Observer): + return ObserverAggregator() # TODO: Add other aggregators return CounterAggregator() diff --git a/opentelemetry-sdk/tests/metrics/export/test_export.py b/opentelemetry-sdk/tests/metrics/export/test_export.py index cd12bcbb6bf..3aab1632ec3 100644 --- a/opentelemetry-sdk/tests/metrics/export/test_export.py +++ b/opentelemetry-sdk/tests/metrics/export/test_export.py @@ -25,6 +25,7 @@ from opentelemetry.sdk.metrics.export.aggregate import ( CounterAggregator, MinMaxSumCountAggregator, + ObserverAggregator, ) from opentelemetry.sdk.metrics.export.batcher import UngroupedBatcher from opentelemetry.sdk.metrics.export.controller import PushController @@ -425,6 +426,92 @@ def test_concurrent_update_and_checkpoint(self): self.assertEqual(checkpoint_total, fut.result()) +class TestObserverAggregator(unittest.TestCase): + def test_update(self): + observer = ObserverAggregator() + # test current values without any update + self.assertEqual( + observer.mmsc.current, (None, None, None, 0), + ) + self.assertIsNone(observer.current) + + # call update with some values + values = (3, 50, 3, 97, 27) + for val in values: + observer.update(val) + + self.assertEqual( + observer.mmsc.current, + (min(values), max(values), sum(values), len(values)), + ) + + self.assertEqual(observer.current, values[-1]) + + def test_checkpoint(self): + observer = ObserverAggregator() + + # take checkpoint wihtout any update + observer.take_checkpoint() + self.assertEqual( + observer.checkpoint, (None, None, None, 0, None), + ) + + # call update with some values + values = (3, 50, 3, 97) + for val in values: + observer.update(val) + + observer.take_checkpoint() + self.assertEqual( + observer.checkpoint, + (min(values), max(values), sum(values), len(values), values[-1]), + ) + + def test_merge(self): + observer1 = ObserverAggregator() + observer2 = ObserverAggregator() + + mmsc_checkpoint1 = MinMaxSumCountAggregator._TYPE(3, 150, 101, 3) + mmsc_checkpoint2 = MinMaxSumCountAggregator._TYPE(1, 33, 44, 2) + + checkpoint1 = ObserverAggregator._TYPE(*(mmsc_checkpoint1 + (23,))) + + checkpoint2 = ObserverAggregator._TYPE(*(mmsc_checkpoint2 + (27,))) + + observer1.mmsc.checkpoint = mmsc_checkpoint1 + observer2.mmsc.checkpoint = mmsc_checkpoint2 + + observer1.checkpoint = checkpoint1 + observer2.checkpoint = checkpoint2 + + observer1.merge(observer2) + + self.assertEqual( + observer1.checkpoint, + ( + min(checkpoint1.min, checkpoint2.min), + max(checkpoint1.max, checkpoint2.max), + checkpoint1.sum + checkpoint2.sum, + checkpoint1.count + checkpoint2.count, + checkpoint2.last, + ), + ) + + def test_merge_with_empty(self): + observer1 = ObserverAggregator() + observer2 = ObserverAggregator() + + mmsc_checkpoint1 = MinMaxSumCountAggregator._TYPE(3, 150, 101, 3) + checkpoint1 = ObserverAggregator._TYPE(*(mmsc_checkpoint1 + (23,))) + + observer1.mmsc.checkpoint = mmsc_checkpoint1 + observer1.checkpoint = checkpoint1 + + observer1.merge(observer2) + + self.assertEqual(observer1.checkpoint, checkpoint1) + + class TestController(unittest.TestCase): def test_push_controller(self): meter = mock.Mock() diff --git a/opentelemetry-sdk/tests/metrics/test_metrics.py b/opentelemetry-sdk/tests/metrics/test_metrics.py index daba171c51c..ea20cdd5930 100644 --- a/opentelemetry-sdk/tests/metrics/test_metrics.py +++ b/opentelemetry-sdk/tests/metrics/test_metrics.py @@ -62,6 +62,23 @@ def test_collect_disabled_metric(self): meter.collect() self.assertFalse(batcher_mock.process.called) + def test_collect_observers(self): + meter = metrics.MeterProvider().get_meter(__name__) + batcher_mock = mock.Mock() + meter.batcher = batcher_mock + + def callback(observer): + self.assertIsInstance(observer, metrics_api.Observer) + observer.observe(45, meter.get_label_set(())) + + observer = metrics.Observer( + callback, "name", "desc", "unit", int, meter, (), True + ) + + meter.observers.add(observer) + meter.collect() + self.assertTrue(batcher_mock.process.called) + def test_record_batch(self): meter = metrics.MeterProvider().get_meter(__name__) label_keys = ("key1",) @@ -82,14 +99,12 @@ def test_record_batch_multiple(self): counter = metrics.Counter( "name", "desc", "unit", float, meter, label_keys ) - gauge = metrics.Gauge("name", "desc", "unit", int, meter, label_keys) measure = metrics.Measure( "name", "desc", "unit", float, meter, label_keys ) - record_tuples = [(counter, 1.0), (gauge, 5), (measure, 3.0)] + record_tuples = [(counter, 1.0), (measure, 3.0)] meter.record_batch(label_set, record_tuples) self.assertEqual(counter.get_handle(label_set).aggregator.current, 1.0) - self.assertEqual(gauge.get_handle(label_set).aggregator.current, 5.0) self.assertEqual( measure.get_handle(label_set).aggregator.current, (3.0, 3.0, 3.0, 1), @@ -115,28 +130,39 @@ def test_create_metric(self): counter = meter.create_metric( "name", "desc", "unit", int, metrics.Counter, () ) - self.assertTrue(isinstance(counter, metrics.Counter)) + self.assertIsInstance(counter, metrics.Counter) self.assertEqual(counter.value_type, int) self.assertEqual(counter.name, "name") - def test_create_gauge(self): - meter = metrics.MeterProvider().get_meter(__name__) - gauge = meter.create_metric( - "name", "desc", "unit", float, metrics.Gauge, () - ) - self.assertTrue(isinstance(gauge, metrics.Gauge)) - self.assertEqual(gauge.value_type, float) - self.assertEqual(gauge.name, "name") - def test_create_measure(self): meter = metrics.MeterProvider().get_meter(__name__) measure = meter.create_metric( "name", "desc", "unit", float, metrics.Measure, () ) - self.assertTrue(isinstance(measure, metrics.Measure)) + self.assertIsInstance(measure, metrics.Measure) self.assertEqual(measure.value_type, float) self.assertEqual(measure.name, "name") + def test_register_observer(self): + meter = metrics.MeterProvider().get_meter(__name__) + + callback = mock.Mock() + + observer = meter.register_observer( + callback, "name", "desc", "unit", int, (), True + ) + + self.assertIsInstance(observer, metrics_api.Observer) + self.assertEqual(len(meter.observers), 1) + + self.assertEqual(observer.callback, callback) + self.assertEqual(observer.name, "name") + self.assertEqual(observer.description, "desc") + self.assertEqual(observer.unit, "unit") + self.assertEqual(observer.value_type, int) + self.assertEqual(observer.label_keys, ()) + self.assertTrue(observer.enabled) + def test_get_label_set(self): meter = metrics.MeterProvider().get_meter(__name__) kvp = {"environment": "staging", "a": "z"} @@ -155,7 +181,7 @@ def test_get_label_set_empty(self): class TestMetric(unittest.TestCase): def test_get_handle(self): meter = metrics.MeterProvider().get_meter(__name__) - metric_types = [metrics.Counter, metrics.Gauge, metrics.Measure] + metric_types = [metrics.Counter, metrics.Measure] for _type in metric_types: metric = _type("name", "desc", "unit", int, meter, ("key",)) kvp = {"key": "value"} @@ -176,20 +202,6 @@ def test_add(self): self.assertEqual(handle.aggregator.current, 5) -class TestGauge(unittest.TestCase): - def test_set(self): - meter = metrics.MeterProvider().get_meter(__name__) - metric = metrics.Gauge("name", "desc", "unit", int, meter, ("key",)) - kvp = {"key": "value"} - label_set = meter.get_label_set(kvp) - handle = metric.get_handle(label_set) - metric.set(3, label_set) - self.assertEqual(handle.aggregator.current, 3) - metric.set(2, label_set) - # TODO: Fix once other aggregators implemented - self.assertEqual(handle.aggregator.current, 5) - - class TestMeasure(unittest.TestCase): def test_record(self): meter = metrics.MeterProvider().get_meter(__name__) @@ -206,6 +218,72 @@ def test_record(self): ) +class TestObserver(unittest.TestCase): + def test_observe(self): + meter = metrics.MeterProvider().get_meter(__name__) + observer = metrics.Observer( + None, "name", "desc", "unit", int, meter, ("key",), True + ) + kvp = {"key": "value"} + label_set = meter.get_label_set(kvp) + values = (37, 42, 7, 21) + for val in values: + observer.observe(val, label_set) + self.assertEqual( + observer.aggregators[label_set].mmsc.current, + (min(values), max(values), sum(values), len(values)), + ) + + self.assertEqual(observer.aggregators[label_set].current, values[-1]) + + def test_observe_disabled(self): + meter = metrics.MeterProvider().get_meter(__name__) + observer = metrics.Observer( + None, "name", "desc", "unit", int, meter, ("key",), False + ) + kvp = {"key": "value"} + label_set = meter.get_label_set(kvp) + observer.observe(37, label_set) + self.assertEqual(len(observer.aggregators), 0) + + @mock.patch("opentelemetry.sdk.metrics.logger") + def test_observe_incorrect_type(self, logger_mock): + meter = metrics.MeterProvider().get_meter(__name__) + observer = metrics.Observer( + None, "name", "desc", "unit", int, meter, ("key",), True + ) + kvp = {"key": "value"} + label_set = meter.get_label_set(kvp) + observer.observe(37.0, label_set) + self.assertEqual(len(observer.aggregators), 0) + self.assertTrue(logger_mock.warning.called) + + def test_run(self): + meter = metrics.MeterProvider().get_meter(__name__) + + callback = mock.Mock() + observer = metrics.Observer( + callback, "name", "desc", "unit", int, meter, (), True + ) + + self.assertTrue(observer.run()) + callback.assert_called_once_with(observer) + + @mock.patch("opentelemetry.sdk.metrics.logger") + def test_run_exception(self, logger_mock): + meter = metrics.MeterProvider().get_meter(__name__) + + callback = mock.Mock() + callback.side_effect = Exception("We have a problem!") + + observer = metrics.Observer( + callback, "name", "desc", "unit", int, meter, (), True + ) + + self.assertFalse(observer.run()) + self.assertTrue(logger_mock.warning.called) + + class TestCounterHandle(unittest.TestCase): def test_add(self): aggregator = export.aggregate.CounterAggregator() @@ -237,38 +315,6 @@ def test_update(self, time_mock): self.assertEqual(handle.aggregator.current, 4.0) -# TODO: fix tests once aggregator implemented -class TestGaugeHandle(unittest.TestCase): - def test_set(self): - aggregator = export.aggregate.CounterAggregator() - handle = metrics.GaugeHandle(int, True, aggregator) - handle.set(3) - self.assertEqual(handle.aggregator.current, 3) - - def test_set_disabled(self): - aggregator = export.aggregate.CounterAggregator() - handle = metrics.GaugeHandle(int, False, aggregator) - handle.set(3) - self.assertEqual(handle.aggregator.current, 0) - - @mock.patch("opentelemetry.sdk.metrics.logger") - def test_set_incorrect_type(self, logger_mock): - aggregator = export.aggregate.CounterAggregator() - handle = metrics.GaugeHandle(int, True, aggregator) - handle.set(3.0) - self.assertEqual(handle.aggregator.current, 0) - self.assertTrue(logger_mock.warning.called) - - @mock.patch("opentelemetry.sdk.metrics.time_ns") - def test_update(self, time_mock): - aggregator = export.aggregate.CounterAggregator() - handle = metrics.GaugeHandle(int, True, aggregator) - time_mock.return_value = 123 - handle.update(4.0) - self.assertEqual(handle.last_update_timestamp, 123) - self.assertEqual(handle.aggregator.current, 4.0) - - class TestMeasureHandle(unittest.TestCase): def test_record(self): aggregator = export.aggregate.MinMaxSumCountAggregator() diff --git a/tox.ini b/tox.ini index 3588ca69495..0423a6cc7ea 100644 --- a/tox.ini +++ b/tox.ini @@ -187,6 +187,7 @@ deps = flake8 isort black + psutil commands_pre = python scripts/eachdist.py install --editable @@ -238,7 +239,7 @@ deps = docker-compose >= 1.25.2 pymongo ~= 3.1 -changedir = +changedir = ext/opentelemetry-ext-docker-tests/tests commands_pre = @@ -246,7 +247,7 @@ commands_pre = -e {toxinidir}/opentelemetry-sdk \ -e {toxinidir}/ext/opentelemetry-ext-pymongo - docker-compose up -d -commands = +commands = pytest {posargs} commands_post =