Skip to content

Commit

Permalink
Throw an error when multiple instruments are registered by the same n…
Browse files Browse the repository at this point in the history
…ame (#1438)
  • Loading branch information
srikanthccv authored Dec 7, 2020
1 parent 6411755 commit 8d195e6
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 69 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ env:
# Otherwise, set variable to the commit of your branch on
# opentelemetry-python-contrib which is compatible with these Core repo
# changes.
CONTRIB_REPO_SHA: fd12b1d624fe44ca17d2c88c0ace39dc80db85df
CONTRIB_REPO_SHA: b37945bdeaf49822b240281d493d053995cc2b7b

jobs:
build:
Expand Down
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ var
sdist
develop-eggs
.installed.cfg
pyvenv.cfg
lib
share/
lib64
__pycache__
venv*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,13 @@ def test_get_collector_metric_type(self):
def test_get_collector_point(self):
aggregator = aggregate.SumAggregator()
int_counter = self._meter.create_counter(
"testName", "testDescription", "unit", int,
"testNameIntCounter", "testDescription", "unit", int,
)
float_counter = self._meter.create_counter(
"testName", "testDescription", "unit", float,
"testNameFloatCounter", "testDescription", "unit", float,
)
valuerecorder = self._meter.create_valuerecorder(
"testName", "testDescription", "unit", float,
"testNameValueRecorder", "testDescription", "unit", float,
)
result = metrics_exporter.get_collector_point(
ExportRecord(
Expand Down Expand Up @@ -168,7 +168,7 @@ def test_export(self):

def test_translate_to_collector(self):
test_metric = self._meter.create_counter(
"testname", "testdesc", "unit", int, self._labels.keys()
"testcollector", "testdesc", "unit", int, self._labels.keys()
)
aggregator = aggregate.SumAggregator()
aggregator.update(123)
Expand All @@ -185,7 +185,9 @@ def test_translate_to_collector(self):
)
self.assertEqual(len(output_metrics), 1)
self.assertIsInstance(output_metrics[0], metrics_pb2.Metric)
self.assertEqual(output_metrics[0].metric_descriptor.name, "testname")
self.assertEqual(
output_metrics[0].metric_descriptor.name, "testcollector"
)
self.assertEqual(
output_metrics[0].metric_descriptor.description, "testdesc"
)
Expand Down
6 changes: 5 additions & 1 deletion opentelemetry-instrumentation/tests/test_metric.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def test_ctor(self):
"measures the duration of the outbound HTTP request",
)

def test_ctor_types(self):
def test_ctor_type_client(self):
meter = metrics_api.get_meter(__name__)
recorder = HTTPMetricRecorder(meter, HTTPMetricType.CLIENT)
self.assertEqual(recorder._http_type, HTTPMetricType.CLIENT)
Expand All @@ -81,13 +81,17 @@ def test_ctor_types(self):
)
self.assertIsNone(recorder._server_duration)

def test_ctor_type_server(self):
meter = metrics_api.get_meter(__name__)
recorder = HTTPMetricRecorder(meter, HTTPMetricType.SERVER)
self.assertEqual(recorder._http_type, HTTPMetricType.SERVER)
self.assertTrue(
isinstance(recorder._server_duration, metrics.ValueRecorder)
)
self.assertIsNone(recorder._client_duration)

def test_ctor_type_both(self):
meter = metrics_api.get_meter(__name__)
recorder = HTTPMetricRecorder(meter, HTTPMetricType.BOTH)
self.assertEqual(recorder._http_type, HTTPMetricType.BOTH)
self.assertTrue(
Expand Down
111 changes: 56 additions & 55 deletions opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import atexit
import logging
import threading
from typing import Dict, Sequence, Tuple, Type, TypeVar
from typing import Dict, Sequence, Tuple, Type, TypeVar, Union

from opentelemetry import metrics as metrics_api
from opentelemetry.sdk.metrics.export import (
Expand Down Expand Up @@ -356,59 +356,65 @@ def __init__(
):
self.instrumentation_info = instrumentation_info
self.processor = Processor(source.stateful, source.resource)
self.metrics = set()
self.observers = set()
self.metrics_lock = threading.Lock()
self.observers_lock = threading.Lock()
self.instruments = {}
self.instruments_lock = threading.Lock()
self.view_manager = ViewManager()

def _register_instrument(
self, instrument: Union[metrics_api.Metric, metrics_api.Observer]
):
name = instrument.name.strip().lower()
with self.instruments_lock:
if name in self.instruments:
raise ValueError(
"Multiple instruments can't be registered by the same name: ({})".format(
name
)
)
self.instruments[name] = instrument

def collect(self) -> None:
"""Collects all the metrics created with this `Meter` for export.
Utilizes the processor to create checkpoints of the current values in
each aggregator belonging to the metrics that were created with this
meter instance.
"""

self._collect_metrics()
self._collect_observers()

def _collect_metrics(self) -> None:
for metric in self.metrics:
if not metric.enabled:
continue
to_remove = []
with metric.bound_instruments_lock:
for (
labels,
bound_instrument,
) in metric.bound_instruments.items():
for view_data in bound_instrument.view_datas:
with self.instruments_lock:
for instrument in self.instruments.values():
if not instrument.enabled:
continue
if isinstance(instrument, metrics_api.Metric):
to_remove = []
with instrument.bound_instruments_lock:
for (
labels,
bound_instrument,
) in instrument.bound_instruments.items():
for view_data in bound_instrument.view_datas:
accumulation = Accumulation(
instrument,
view_data.labels,
view_data.aggregator,
)
self.processor.process(accumulation)

if bound_instrument.ref_count() == 0:
to_remove.append(labels)

# Remove handles that were released
for labels in to_remove:
del instrument.bound_instruments[labels]
elif isinstance(instrument, metrics_api.Observer):
if not instrument.run():
continue

for labels, aggregator in instrument.aggregators.items():
accumulation = Accumulation(
metric, view_data.labels, view_data.aggregator
instrument, labels, aggregator
)
self.processor.process(accumulation)

if bound_instrument.ref_count() == 0:
to_remove.append(labels)

# Remove handles that were released
for labels in to_remove:
del metric.bound_instruments[labels]

def _collect_observers(self) -> None:
with self.observers_lock:
for observer in self.observers:
if not observer.enabled:
continue

if not observer.run():
continue

for labels, aggregator in observer.aggregators.items():
accumulation = Accumulation(observer, labels, aggregator)
self.processor.process(accumulation)

def record_batch(
self,
labels: Dict[str, str],
Expand All @@ -432,8 +438,7 @@ def create_counter(
counter = Counter(
name, description, unit, value_type, self, enabled=enabled
)
with self.metrics_lock:
self.metrics.add(counter)
self._register_instrument(counter)
return counter

def create_updowncounter(
Expand All @@ -448,8 +453,7 @@ def create_updowncounter(
counter = UpDownCounter(
name, description, unit, value_type, self, enabled=enabled
)
with self.metrics_lock:
self.metrics.add(counter)
self._register_instrument(counter)
return counter

def create_valuerecorder(
Expand All @@ -464,8 +468,7 @@ def create_valuerecorder(
recorder = ValueRecorder(
name, description, unit, value_type, self, enabled=enabled
)
with self.metrics_lock:
self.metrics.add(recorder)
self._register_instrument(recorder)
return recorder

def register_sumobserver(
Expand All @@ -488,8 +491,7 @@ def register_sumobserver(
label_keys,
enabled,
)
with self.observers_lock:
self.observers.add(ob)
self._register_instrument(ob)
return ob

def register_updownsumobserver(
Expand All @@ -512,8 +514,7 @@ def register_updownsumobserver(
label_keys,
enabled,
)
with self.observers_lock:
self.observers.add(ob)
self._register_instrument(ob)
return ob

def register_valueobserver(
Expand All @@ -536,13 +537,13 @@ def register_valueobserver(
label_keys,
enabled,
)
with self.observers_lock:
self.observers.add(ob)
self._register_instrument(ob)
return ob

def unregister_observer(self, observer: metrics_api.Observer) -> None:
with self.observers_lock:
self.observers.remove(observer)
name = observer.name.strip().lower()
with self.instruments_lock:
self.instruments.pop(name)

def register_view(self, view):
self.view_manager.register_view(view)
Expand Down
49 changes: 42 additions & 7 deletions opentelemetry-sdk/tests/metrics/test_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,9 @@ def callback(observer):
self.assertIsInstance(observer, metrics_api.Observer)
observer.observe(45, {})

observer = metrics.ValueObserver(
meter.register_valueobserver(
callback, "name", "desc", "unit", int, (), True
)

meter.observers.add(observer)
meter.collect()
self.assertTrue(processor_mock.process.called)

Expand Down Expand Up @@ -164,6 +162,23 @@ def test_create_counter(self):
self.assertIs(meter_provider.resource, resource)
self.assertEqual(counter.meter, meter)

def test_instrument_same_name_error(self):
resource = Mock(spec=resources.Resource)
meter_provider = metrics.MeterProvider(resource=resource)
meter = meter_provider.get_meter(__name__)
counter = meter.create_counter("name", "desc", "unit", int,)
self.assertIsInstance(counter, metrics.Counter)
self.assertEqual(counter.value_type, int)
self.assertEqual(counter.name, "name")
self.assertIs(meter_provider.resource, resource)
self.assertEqual(counter.meter, meter)
with self.assertRaises(ValueError) as ctx:
_ = meter.create_counter("naME", "desc", "unit", int,)
self.assertTrue(
"Multiple instruments can't be registered by the same name: (name)"
in str(ctx.exception)
)

def test_create_updowncounter(self):
meter = metrics.MeterProvider().get_meter(__name__)
updowncounter = meter.create_updowncounter(
Expand Down Expand Up @@ -193,7 +208,7 @@ def test_register_sumobserver(self):
)

self.assertIsInstance(observer, metrics.SumObserver)
self.assertEqual(len(meter.observers), 1)
self.assertEqual(len(meter.instruments), 1)

self.assertEqual(observer.callback, callback)
self.assertEqual(observer.name, "name")
Expand All @@ -213,7 +228,7 @@ def test_register_updownsumobserver(self):
)

self.assertIsInstance(observer, metrics.UpDownSumObserver)
self.assertEqual(len(meter.observers), 1)
self.assertEqual(len(meter.instruments), 1)

self.assertEqual(observer.callback, callback)
self.assertEqual(observer.name, "name")
Expand All @@ -233,7 +248,7 @@ def test_register_valueobserver(self):
)

self.assertIsInstance(observer, metrics.ValueObserver)
self.assertEqual(len(meter.observers), 1)
self.assertEqual(len(meter.instruments), 1)

self.assertEqual(observer.callback, callback)
self.assertEqual(observer.name, "name")
Expand All @@ -253,7 +268,27 @@ def test_unregister_observer(self):
)

meter.unregister_observer(observer)
self.assertEqual(len(meter.observers), 0)
self.assertEqual(len(meter.instruments), 0)

def test_unregister_and_reregister_observer(self):
meter = metrics.MeterProvider().get_meter(__name__)

callback = Mock()

observer = meter.register_valueobserver(
callback,
"nameCaSEinSENsitive",
"desc",
"unit",
int,
metrics.ValueObserver,
)

meter.unregister_observer(observer)
self.assertEqual(len(meter.instruments), 0)
observer = meter.register_valueobserver(
callback, "name", "desc", "unit", int, metrics.ValueObserver
)


class TestMetric(unittest.TestCase):
Expand Down

0 comments on commit 8d195e6

Please sign in to comment.