diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index e71b58f94ae..7a7b9354dcb 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -10,7 +10,7 @@ env: # Otherwise, set variable to the commit of your branch on # opentelemetry-python-contrib which is compatible with these Core repo # changes. - CONTRIB_REPO_SHA: fd12b1d624fe44ca17d2c88c0ace39dc80db85df + CONTRIB_REPO_SHA: b37945bdeaf49822b240281d493d053995cc2b7b jobs: build: diff --git a/.gitignore b/.gitignore index 5378aadb363..d1687658908 100644 --- a/.gitignore +++ b/.gitignore @@ -16,7 +16,9 @@ var sdist develop-eggs .installed.cfg +pyvenv.cfg lib +share/ lib64 __pycache__ venv*/ diff --git a/exporter/opentelemetry-exporter-opencensus/tests/test_otcollector_metrics_exporter.py b/exporter/opentelemetry-exporter-opencensus/tests/test_otcollector_metrics_exporter.py index b48a4a5a33d..3b40b8d75ab 100644 --- a/exporter/opentelemetry-exporter-opencensus/tests/test_otcollector_metrics_exporter.py +++ b/exporter/opentelemetry-exporter-opencensus/tests/test_otcollector_metrics_exporter.py @@ -91,13 +91,13 @@ def test_get_collector_metric_type(self): def test_get_collector_point(self): aggregator = aggregate.SumAggregator() int_counter = self._meter.create_counter( - "testName", "testDescription", "unit", int, + "testNameIntCounter", "testDescription", "unit", int, ) float_counter = self._meter.create_counter( - "testName", "testDescription", "unit", float, + "testNameFloatCounter", "testDescription", "unit", float, ) valuerecorder = self._meter.create_valuerecorder( - "testName", "testDescription", "unit", float, + "testNameValueRecorder", "testDescription", "unit", float, ) result = metrics_exporter.get_collector_point( ExportRecord( @@ -168,7 +168,7 @@ def test_export(self): def test_translate_to_collector(self): test_metric = self._meter.create_counter( - "testname", "testdesc", "unit", int, self._labels.keys() + "testcollector", "testdesc", "unit", int, self._labels.keys() ) aggregator = aggregate.SumAggregator() aggregator.update(123) @@ -185,7 +185,9 @@ def test_translate_to_collector(self): ) self.assertEqual(len(output_metrics), 1) self.assertIsInstance(output_metrics[0], metrics_pb2.Metric) - self.assertEqual(output_metrics[0].metric_descriptor.name, "testname") + self.assertEqual( + output_metrics[0].metric_descriptor.name, "testcollector" + ) self.assertEqual( output_metrics[0].metric_descriptor.description, "testdesc" ) diff --git a/opentelemetry-instrumentation/tests/test_metric.py b/opentelemetry-instrumentation/tests/test_metric.py index 8e676c737e2..c0bdcca15ad 100644 --- a/opentelemetry-instrumentation/tests/test_metric.py +++ b/opentelemetry-instrumentation/tests/test_metric.py @@ -72,7 +72,7 @@ def test_ctor(self): "measures the duration of the outbound HTTP request", ) - def test_ctor_types(self): + def test_ctor_type_client(self): meter = metrics_api.get_meter(__name__) recorder = HTTPMetricRecorder(meter, HTTPMetricType.CLIENT) self.assertEqual(recorder._http_type, HTTPMetricType.CLIENT) @@ -81,6 +81,8 @@ def test_ctor_types(self): ) self.assertIsNone(recorder._server_duration) + def test_ctor_type_server(self): + meter = metrics_api.get_meter(__name__) recorder = HTTPMetricRecorder(meter, HTTPMetricType.SERVER) self.assertEqual(recorder._http_type, HTTPMetricType.SERVER) self.assertTrue( @@ -88,6 +90,8 @@ def test_ctor_types(self): ) self.assertIsNone(recorder._client_duration) + def test_ctor_type_both(self): + meter = metrics_api.get_meter(__name__) recorder = HTTPMetricRecorder(meter, HTTPMetricType.BOTH) self.assertEqual(recorder._http_type, HTTPMetricType.BOTH) self.assertTrue( diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py index a09047e5231..ca0ec2f967c 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py @@ -15,7 +15,7 @@ import atexit import logging import threading -from typing import Dict, Sequence, Tuple, Type, TypeVar +from typing import Dict, Sequence, Tuple, Type, TypeVar, Union from opentelemetry import metrics as metrics_api from opentelemetry.sdk.metrics.export import ( @@ -356,12 +356,23 @@ def __init__( ): self.instrumentation_info = instrumentation_info self.processor = Processor(source.stateful, source.resource) - self.metrics = set() - self.observers = set() - self.metrics_lock = threading.Lock() - self.observers_lock = threading.Lock() + self.instruments = {} + self.instruments_lock = threading.Lock() self.view_manager = ViewManager() + def _register_instrument( + self, instrument: Union[metrics_api.Metric, metrics_api.Observer] + ): + name = instrument.name.strip().lower() + with self.instruments_lock: + if name in self.instruments: + raise ValueError( + "Multiple instruments can't be registered by the same name: ({})".format( + name + ) + ) + self.instruments[name] = instrument + def collect(self) -> None: """Collects all the metrics created with this `Meter` for export. @@ -369,46 +380,41 @@ def collect(self) -> None: each aggregator belonging to the metrics that were created with this meter instance. """ - - self._collect_metrics() - self._collect_observers() - - def _collect_metrics(self) -> None: - for metric in self.metrics: - if not metric.enabled: - continue - to_remove = [] - with metric.bound_instruments_lock: - for ( - labels, - bound_instrument, - ) in metric.bound_instruments.items(): - for view_data in bound_instrument.view_datas: + with self.instruments_lock: + for instrument in self.instruments.values(): + if not instrument.enabled: + continue + if isinstance(instrument, metrics_api.Metric): + to_remove = [] + with instrument.bound_instruments_lock: + for ( + labels, + bound_instrument, + ) in instrument.bound_instruments.items(): + for view_data in bound_instrument.view_datas: + accumulation = Accumulation( + instrument, + view_data.labels, + view_data.aggregator, + ) + self.processor.process(accumulation) + + if bound_instrument.ref_count() == 0: + to_remove.append(labels) + + # Remove handles that were released + for labels in to_remove: + del instrument.bound_instruments[labels] + elif isinstance(instrument, metrics_api.Observer): + if not instrument.run(): + continue + + for labels, aggregator in instrument.aggregators.items(): accumulation = Accumulation( - metric, view_data.labels, view_data.aggregator + instrument, labels, aggregator ) self.processor.process(accumulation) - if bound_instrument.ref_count() == 0: - to_remove.append(labels) - - # Remove handles that were released - for labels in to_remove: - del metric.bound_instruments[labels] - - def _collect_observers(self) -> None: - with self.observers_lock: - for observer in self.observers: - if not observer.enabled: - continue - - if not observer.run(): - continue - - for labels, aggregator in observer.aggregators.items(): - accumulation = Accumulation(observer, labels, aggregator) - self.processor.process(accumulation) - def record_batch( self, labels: Dict[str, str], @@ -432,8 +438,7 @@ def create_counter( counter = Counter( name, description, unit, value_type, self, enabled=enabled ) - with self.metrics_lock: - self.metrics.add(counter) + self._register_instrument(counter) return counter def create_updowncounter( @@ -448,8 +453,7 @@ def create_updowncounter( counter = UpDownCounter( name, description, unit, value_type, self, enabled=enabled ) - with self.metrics_lock: - self.metrics.add(counter) + self._register_instrument(counter) return counter def create_valuerecorder( @@ -464,8 +468,7 @@ def create_valuerecorder( recorder = ValueRecorder( name, description, unit, value_type, self, enabled=enabled ) - with self.metrics_lock: - self.metrics.add(recorder) + self._register_instrument(recorder) return recorder def register_sumobserver( @@ -488,8 +491,7 @@ def register_sumobserver( label_keys, enabled, ) - with self.observers_lock: - self.observers.add(ob) + self._register_instrument(ob) return ob def register_updownsumobserver( @@ -512,8 +514,7 @@ def register_updownsumobserver( label_keys, enabled, ) - with self.observers_lock: - self.observers.add(ob) + self._register_instrument(ob) return ob def register_valueobserver( @@ -536,13 +537,13 @@ def register_valueobserver( label_keys, enabled, ) - with self.observers_lock: - self.observers.add(ob) + self._register_instrument(ob) return ob def unregister_observer(self, observer: metrics_api.Observer) -> None: - with self.observers_lock: - self.observers.remove(observer) + name = observer.name.strip().lower() + with self.instruments_lock: + self.instruments.pop(name) def register_view(self, view): self.view_manager.register_view(view) diff --git a/opentelemetry-sdk/tests/metrics/test_metrics.py b/opentelemetry-sdk/tests/metrics/test_metrics.py index 1697d8e6c85..32c22c8c6bb 100644 --- a/opentelemetry-sdk/tests/metrics/test_metrics.py +++ b/opentelemetry-sdk/tests/metrics/test_metrics.py @@ -118,11 +118,9 @@ def callback(observer): self.assertIsInstance(observer, metrics_api.Observer) observer.observe(45, {}) - observer = metrics.ValueObserver( + meter.register_valueobserver( callback, "name", "desc", "unit", int, (), True ) - - meter.observers.add(observer) meter.collect() self.assertTrue(processor_mock.process.called) @@ -164,6 +162,23 @@ def test_create_counter(self): self.assertIs(meter_provider.resource, resource) self.assertEqual(counter.meter, meter) + def test_instrument_same_name_error(self): + resource = Mock(spec=resources.Resource) + meter_provider = metrics.MeterProvider(resource=resource) + meter = meter_provider.get_meter(__name__) + counter = meter.create_counter("name", "desc", "unit", int,) + self.assertIsInstance(counter, metrics.Counter) + self.assertEqual(counter.value_type, int) + self.assertEqual(counter.name, "name") + self.assertIs(meter_provider.resource, resource) + self.assertEqual(counter.meter, meter) + with self.assertRaises(ValueError) as ctx: + _ = meter.create_counter("naME", "desc", "unit", int,) + self.assertTrue( + "Multiple instruments can't be registered by the same name: (name)" + in str(ctx.exception) + ) + def test_create_updowncounter(self): meter = metrics.MeterProvider().get_meter(__name__) updowncounter = meter.create_updowncounter( @@ -193,7 +208,7 @@ def test_register_sumobserver(self): ) self.assertIsInstance(observer, metrics.SumObserver) - self.assertEqual(len(meter.observers), 1) + self.assertEqual(len(meter.instruments), 1) self.assertEqual(observer.callback, callback) self.assertEqual(observer.name, "name") @@ -213,7 +228,7 @@ def test_register_updownsumobserver(self): ) self.assertIsInstance(observer, metrics.UpDownSumObserver) - self.assertEqual(len(meter.observers), 1) + self.assertEqual(len(meter.instruments), 1) self.assertEqual(observer.callback, callback) self.assertEqual(observer.name, "name") @@ -233,7 +248,7 @@ def test_register_valueobserver(self): ) self.assertIsInstance(observer, metrics.ValueObserver) - self.assertEqual(len(meter.observers), 1) + self.assertEqual(len(meter.instruments), 1) self.assertEqual(observer.callback, callback) self.assertEqual(observer.name, "name") @@ -253,7 +268,27 @@ def test_unregister_observer(self): ) meter.unregister_observer(observer) - self.assertEqual(len(meter.observers), 0) + self.assertEqual(len(meter.instruments), 0) + + def test_unregister_and_reregister_observer(self): + meter = metrics.MeterProvider().get_meter(__name__) + + callback = Mock() + + observer = meter.register_valueobserver( + callback, + "nameCaSEinSENsitive", + "desc", + "unit", + int, + metrics.ValueObserver, + ) + + meter.unregister_observer(observer) + self.assertEqual(len(meter.instruments), 0) + observer = meter.register_valueobserver( + callback, "name", "desc", "unit", int, metrics.ValueObserver + ) class TestMetric(unittest.TestCase):