From 872975b6be5b8022ee7501221bdb6261624f035e Mon Sep 17 00:00:00 2001 From: Leighton Chen Date: Fri, 25 Sep 2020 07:31:35 -0700 Subject: [PATCH 1/6] Adding metric collection as part of instrumentations - Requests (#1116) --- docs/examples/basic_meter/http.py | 42 +++++++ docs/instrumentation/instrumentation.rst | 1 + docs/instrumentation/metric.rst | 7 ++ .../CHANGELOG.md | 2 + .../instrumentation/requests/__init__.py | 103 ++++++++++++------ .../tests/test_requests_integration.py | 57 ++++++++++ opentelemetry-instrumentation/CHANGELOG.md | 2 + opentelemetry-instrumentation/setup.cfg | 1 + .../opentelemetry/instrumentation/metric.py | 85 +++++++++++++++ .../tests/test_metric.py | 87 +++++++++++++++ tox.ini | 2 +- 11 files changed, 353 insertions(+), 36 deletions(-) create mode 100644 docs/examples/basic_meter/http.py create mode 100644 docs/instrumentation/metric.rst create mode 100644 opentelemetry-instrumentation/src/opentelemetry/instrumentation/metric.py create mode 100644 opentelemetry-instrumentation/tests/test_metric.py diff --git a/docs/examples/basic_meter/http.py b/docs/examples/basic_meter/http.py new file mode 100644 index 00000000000..8fd6c6294c1 --- /dev/null +++ b/docs/examples/basic_meter/http.py @@ -0,0 +1,42 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +""" +This module shows how you can enable collection and exporting of http metrics +related to instrumentations. +""" +import requests + +from opentelemetry import metrics +from opentelemetry.instrumentation.requests import RequestsInstrumentor +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import ConsoleMetricsExporter + +# Sets the global MeterProvider instance +metrics.set_meter_provider(MeterProvider()) + +# Exporter to export metrics to the console +exporter = ConsoleMetricsExporter() + +# Instrument the requests library +RequestsInstrumentor().instrument() + +# Indicate to start collecting and exporting requests related metrics +metrics.get_meter_provider().start_pipeline( + RequestsInstrumentor().meter, exporter, 5 +) + +response = requests.get("http://example.com") + +input("...\n") diff --git a/docs/instrumentation/instrumentation.rst b/docs/instrumentation/instrumentation.rst index 9c01b6b6f4f..16b82a20934 100644 --- a/docs/instrumentation/instrumentation.rst +++ b/docs/instrumentation/instrumentation.rst @@ -13,3 +13,4 @@ Submodules :maxdepth: 1 instrumentor + metric diff --git a/docs/instrumentation/metric.rst b/docs/instrumentation/metric.rst new file mode 100644 index 00000000000..6a69eeeca50 --- /dev/null +++ b/docs/instrumentation/metric.rst @@ -0,0 +1,7 @@ +opentelemetry.instrumentation.metric package +============================================ + +.. automodule:: opentelemetry.instrumentation.metric + :members: + :undoc-members: + :show-inheritance: diff --git a/instrumentation/opentelemetry-instrumentation-requests/CHANGELOG.md b/instrumentation/opentelemetry-instrumentation-requests/CHANGELOG.md index 5b876bd4ad3..c2e4dcdbb92 100644 --- a/instrumentation/opentelemetry-instrumentation-requests/CHANGELOG.md +++ b/instrumentation/opentelemetry-instrumentation-requests/CHANGELOG.md @@ -10,6 +10,8 @@ Released 2020-09-17 ([#1040](https://github.com/open-telemetry/opentelemetry-python/pull/1040)) - Drop support for Python 3.4 ([#1099](https://github.com/open-telemetry/opentelemetry-python/pull/1099)) +- Add support for http metrics + ([#1116](https://github.com/open-telemetry/opentelemetry-python/pull/1116)) ## Version 0.12b0 diff --git a/instrumentation/opentelemetry-instrumentation-requests/src/opentelemetry/instrumentation/requests/__init__.py b/instrumentation/opentelemetry-instrumentation-requests/src/opentelemetry/instrumentation/requests/__init__.py index fef67c5d0da..d0336184ed2 100644 --- a/instrumentation/opentelemetry-instrumentation-requests/src/opentelemetry/instrumentation/requests/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-requests/src/opentelemetry/instrumentation/requests/__init__.py @@ -25,7 +25,7 @@ import opentelemetry.instrumentation.requests # You can optionally pass a custom TracerProvider to - RequestInstrumentor.instrument() + # RequestInstrumentor.instrument() opentelemetry.instrumentation.requests.RequestsInstrumentor().instrument() response = requests.get(url="https://www.example.org/") @@ -43,6 +43,10 @@ from opentelemetry import context, propagators from opentelemetry.instrumentation.instrumentor import BaseInstrumentor +from opentelemetry.instrumentation.metric import ( + HTTPMetricRecorder, + MetricMixin, +) from opentelemetry.instrumentation.requests.version import __version__ from opentelemetry.instrumentation.utils import http_status_to_canonical_code from opentelemetry.trace import SpanKind, get_tracer @@ -54,6 +58,7 @@ # pylint: disable=unused-argument +# pylint: disable=R0915 def _instrument(tracer_provider=None, span_callback=None): """Enables tracing of all requests calls that go through :code:`requests.session.Session.request` (this includes @@ -118,43 +123,66 @@ def _instrumented_requests_call( exception = None + recorder = RequestsInstrumentor().metric_recorder + + labels = {} + labels["http.method"] = method + labels["http.url"] = url + with get_tracer( __name__, __version__, tracer_provider ).start_as_current_span(span_name, kind=SpanKind.CLIENT) as span: - if span.is_recording(): - span.set_attribute("component", "http") - span.set_attribute("http.method", method.upper()) - span.set_attribute("http.url", url) - - headers = get_or_create_headers() - propagators.inject(type(headers).__setitem__, headers) - - token = context.attach( - context.set_value(_SUPPRESS_REQUESTS_INSTRUMENTATION_KEY, True) - ) - try: - result = call_wrapped() # *** PROCEED - except Exception as exc: # pylint: disable=W0703 - exception = exc - result = getattr(exc, "response", None) - finally: - context.detach(token) - - if exception is not None and span.is_recording(): - span.set_status( - Status(_exception_to_canonical_code(exception)) + with recorder.record_duration(labels): + if span.is_recording(): + span.set_attribute("component", "http") + span.set_attribute("http.method", method) + span.set_attribute("http.url", url) + + headers = get_or_create_headers() + propagators.inject(type(headers).__setitem__, headers) + + token = context.attach( + context.set_value( + _SUPPRESS_REQUESTS_INSTRUMENTATION_KEY, True + ) ) - span.record_exception(exception) - - if result is not None and span.is_recording(): - span.set_attribute("http.status_code", result.status_code) - span.set_attribute("http.status_text", result.reason) - span.set_status( - Status(http_status_to_canonical_code(result.status_code)) - ) - - if span_callback is not None: - span_callback(span, result) + try: + result = call_wrapped() # *** PROCEED + except Exception as exc: # pylint: disable=W0703 + exception = exc + result = getattr(exc, "response", None) + finally: + context.detach(token) + + if exception is not None and span.is_recording(): + span.set_status( + Status(_exception_to_canonical_code(exception)) + ) + span.record_exception(exception) + + if result is not None: + if span.is_recording(): + span.set_attribute( + "http.status_code", result.status_code + ) + span.set_attribute("http.status_text", result.reason) + span.set_status( + Status( + http_status_to_canonical_code( + result.status_code + ) + ) + ) + labels["http.status_code"] = str(result.status_code) + labels["http.status_text"] = result.reason + if result.raw and result.raw.version: + labels["http.flavor"] = ( + str(result.raw.version)[:1] + + "." + + str(result.raw.version)[:-1] + ) + if span_callback is not None: + span_callback(span, result) if exception is not None: raise exception.with_traceback(exception.__traceback__) @@ -202,7 +230,7 @@ def _exception_to_canonical_code(exc: Exception) -> StatusCanonicalCode: return StatusCanonicalCode.UNKNOWN -class RequestsInstrumentor(BaseInstrumentor): +class RequestsInstrumentor(BaseInstrumentor, MetricMixin): """An instrumentor for requests See `BaseInstrumentor` """ @@ -219,6 +247,11 @@ def _instrument(self, **kwargs): tracer_provider=kwargs.get("tracer_provider"), span_callback=kwargs.get("span_callback"), ) + self.init_metrics( + __name__, __version__, + ) + # pylint: disable=W0201 + self.metric_recorder = HTTPMetricRecorder(self.meter, SpanKind.CLIENT) def _uninstrument(self, **kwargs): _uninstrument() diff --git a/instrumentation/opentelemetry-instrumentation-requests/tests/test_requests_integration.py b/instrumentation/opentelemetry-instrumentation-requests/tests/test_requests_integration.py index c3457b73923..2d3636284bb 100644 --- a/instrumentation/opentelemetry-instrumentation-requests/tests/test_requests_integration.py +++ b/instrumentation/opentelemetry-instrumentation-requests/tests/test_requests_integration.py @@ -22,6 +22,7 @@ from opentelemetry import context, propagators, trace from opentelemetry.instrumentation.requests import RequestsInstrumentor from opentelemetry.sdk import resources +from opentelemetry.sdk.util import get_dict_as_key from opentelemetry.test.mock_textmap import MockTextMapPropagator from opentelemetry.test.test_base import TestBase from opentelemetry.trace.status import StatusCanonicalCode @@ -88,6 +89,27 @@ def test_basic(self): span, opentelemetry.instrumentation.requests ) + self.assertIsNotNone(RequestsInstrumentor().meter) + self.assertEqual(len(RequestsInstrumentor().meter.metrics), 1) + recorder = RequestsInstrumentor().meter.metrics.pop() + match_key = get_dict_as_key( + { + "http.flavor": "1.1", + "http.method": "GET", + "http.status_code": "200", + "http.status_text": "OK", + "http.url": "http://httpbin.org/status/200", + } + ) + for key in recorder.bound_instruments.keys(): + self.assertEqual(key, match_key) + # pylint: disable=protected-access + bound = recorder.bound_instruments.get(key) + for view_data in bound.view_datas: + self.assertEqual(view_data.labels, key) + self.assertEqual(view_data.aggregator.current.count, 1) + self.assertGreater(view_data.aggregator.current.sum, 0) + def test_not_foundbasic(self): url_404 = "http://httpbin.org/status/404" httpretty.register_uri( @@ -246,6 +268,23 @@ def test_requests_exception_without_response(self, *_, **__): span.status.canonical_code, StatusCanonicalCode.UNKNOWN ) + self.assertIsNotNone(RequestsInstrumentor().meter) + self.assertEqual(len(RequestsInstrumentor().meter.metrics), 1) + recorder = RequestsInstrumentor().meter.metrics.pop() + match_key = get_dict_as_key( + { + "http.method": "GET", + "http.url": "http://httpbin.org/status/200", + } + ) + for key in recorder.bound_instruments.keys(): + self.assertEqual(key, match_key) + # pylint: disable=protected-access + bound = recorder.bound_instruments.get(key) + for view_data in bound.view_datas: + self.assertEqual(view_data.labels, key) + self.assertEqual(view_data.aggregator.current.count, 1) + mocked_response = requests.Response() mocked_response.status_code = 500 mocked_response.reason = "Internal Server Error" @@ -272,6 +311,24 @@ def test_requests_exception_with_response(self, *_, **__): self.assertEqual( span.status.canonical_code, StatusCanonicalCode.INTERNAL ) + self.assertIsNotNone(RequestsInstrumentor().meter) + self.assertEqual(len(RequestsInstrumentor().meter.metrics), 1) + recorder = RequestsInstrumentor().meter.metrics.pop() + match_key = get_dict_as_key( + { + "http.method": "GET", + "http.status_code": "500", + "http.status_text": "Internal Server Error", + "http.url": "http://httpbin.org/status/200", + } + ) + for key in recorder.bound_instruments.keys(): + self.assertEqual(key, match_key) + # pylint: disable=protected-access + bound = recorder.bound_instruments.get(key) + for view_data in bound.view_datas: + self.assertEqual(view_data.labels, key) + self.assertEqual(view_data.aggregator.current.count, 1) @mock.patch("requests.adapters.HTTPAdapter.send", side_effect=Exception) def test_requests_basic_exception(self, *_, **__): diff --git a/opentelemetry-instrumentation/CHANGELOG.md b/opentelemetry-instrumentation/CHANGELOG.md index 37f50051030..13c01cc32a6 100644 --- a/opentelemetry-instrumentation/CHANGELOG.md +++ b/opentelemetry-instrumentation/CHANGELOG.md @@ -10,6 +10,8 @@ Released 2020-09-17 - Drop support for Python 3.4 ([#1099](https://github.com/open-telemetry/opentelemetry-python/pull/1099)) +- Add support for http metrics + ([#1116](https://github.com/open-telemetry/opentelemetry-python/pull/1116)) ## 0.9b0 diff --git a/opentelemetry-instrumentation/setup.cfg b/opentelemetry-instrumentation/setup.cfg index 6042f043d90..167036238f8 100644 --- a/opentelemetry-instrumentation/setup.cfg +++ b/opentelemetry-instrumentation/setup.cfg @@ -42,6 +42,7 @@ zip_safe = False include_package_data = True install_requires = opentelemetry-api == 0.14.dev0 + opentelemetry-sdk == 0.14.dev0 wrapt >= 1.0.0, < 2.0.0 [options.packages.find] diff --git a/opentelemetry-instrumentation/src/opentelemetry/instrumentation/metric.py b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/metric.py new file mode 100644 index 00000000000..74445cbff1b --- /dev/null +++ b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/metric.py @@ -0,0 +1,85 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# type: ignore + +""" +OpenTelemetry Instrumentation Metric mixin +""" +import enum +from contextlib import contextmanager +from time import time +from typing import Dict, Optional + +from opentelemetry import metrics +from opentelemetry.sdk.metrics import ValueRecorder + + +class HTTPMetricType(enum.Enum): + CLIENT = 0 + SERVER = 1 + # TODO: Add both + + +class MetricMixin: + """Used to record metrics related to instrumentations.""" + + def init_metrics(self, name: str, version: str): + self._meter = metrics.get_meter(name, version) + + @property + def meter(self): + return self._meter + + +class MetricRecorder: + """Base class for metric recorders of different types.""" + + def __init__(self, meter: Optional[metrics.Meter] = None): + self._meter = meter + + +class HTTPMetricRecorder(MetricRecorder): + """Metric recorder for http instrumentations. Tracks duration.""" + + def __init__( + self, meter: Optional[metrics.Meter], http_type: HTTPMetricType, + ): + super().__init__(meter) + self._http_type = http_type + if self._meter: + self._duration = self._meter.create_metric( + name="{}.{}.duration".format( + "http", self._http_type.name.lower() + ), + description="measures the duration of the {} HTTP request".format( + "inbound" + if self._http_type is HTTPMetricType.SERVER + else "outbound" + ), + unit="ms", + value_type=float, + metric_type=ValueRecorder, + ) + + # Conventions for recording duration can be found at: + # https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/metrics/semantic_conventions/http-metrics.md + @contextmanager + def record_duration(self, labels: Dict[str, str]): + start_time = time() + try: + yield start_time + finally: + if self._meter: + elapsed_time = (time() - start_time) * 1000 + self._duration.record(elapsed_time, labels) diff --git a/opentelemetry-instrumentation/tests/test_metric.py b/opentelemetry-instrumentation/tests/test_metric.py new file mode 100644 index 00000000000..ea8724fc889 --- /dev/null +++ b/opentelemetry-instrumentation/tests/test_metric.py @@ -0,0 +1,87 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# type: ignore + +from unittest import TestCase, mock + +from opentelemetry import metrics as metrics_api +from opentelemetry.instrumentation.metric import ( + HTTPMetricRecorder, + HTTPMetricType, + MetricMixin, +) +from opentelemetry.metrics import set_meter_provider +from opentelemetry.sdk import metrics +from opentelemetry.sdk.util import get_dict_as_key + + +# pylint: disable=protected-access +class TestMetricMixin(TestCase): + @classmethod + def setUpClass(cls): + metrics_api._METER_PROVIDER = None + set_meter_provider(metrics.MeterProvider()) + + @classmethod + def tearDownClass(cls): + metrics_api._METER_PROVIDER = None + + def test_init(self): + mixin = MetricMixin() + mixin.init_metrics("test", 1.0) + meter = mixin.meter + self.assertTrue(isinstance(meter, metrics.Meter)) + self.assertEqual(meter.instrumentation_info.name, "test") + self.assertEqual(meter.instrumentation_info.version, 1.0) + + +class TestHTTPMetricRecorder(TestCase): + @classmethod + def setUpClass(cls): + metrics_api._METER_PROVIDER = None + set_meter_provider(metrics.MeterProvider()) + + @classmethod + def tearDownClass(cls): + metrics_api._METER_PROVIDER = None + + def test_ctor(self): + meter = metrics_api.get_meter(__name__) + recorder = HTTPMetricRecorder(meter, HTTPMetricType.CLIENT) + # pylint: disable=protected-access + self.assertEqual(recorder._http_type, HTTPMetricType.CLIENT) + self.assertTrue(isinstance(recorder._duration, metrics.ValueRecorder)) + self.assertEqual(recorder._duration.name, "http.client.duration") + self.assertEqual( + recorder._duration.description, + "measures the duration of the outbound HTTP request", + ) + + def test_record_duration(self): + meter = metrics_api.get_meter(__name__) + recorder = HTTPMetricRecorder(meter, HTTPMetricType.CLIENT) + labels = {"test": "asd"} + with mock.patch("time.time") as time_patch: + time_patch.return_value = 5.0 + with recorder.record_duration(labels): + labels["test2"] = "asd2" + match_key = get_dict_as_key({"test": "asd", "test2": "asd2"}) + for key in recorder._duration.bound_instruments.keys(): + self.assertEqual(key, match_key) + # pylint: disable=protected-access + bound = recorder._duration.bound_instruments.get(key) + for view_data in bound.view_datas: + self.assertEqual(view_data.labels, key) + self.assertEqual(view_data.aggregator.current.count, 1) + self.assertGreaterEqual(view_data.aggregator.current.sum, 0) diff --git a/tox.ini b/tox.ini index b477d5ba740..806656ced2c 100644 --- a/tox.ini +++ b/tox.ini @@ -396,8 +396,8 @@ deps = commands_pre = pip install -e {toxinidir}/opentelemetry-api \ - -e {toxinidir}/opentelemetry-instrumentation \ -e {toxinidir}/opentelemetry-sdk \ + -e {toxinidir}/opentelemetry-instrumentation \ -e {toxinidir}/instrumentation/opentelemetry-instrumentation-requests \ -e {toxinidir}/instrumentation/opentelemetry-instrumentation-wsgi \ -e {toxinidir}/instrumentation/opentelemetry-instrumentation-flask From b2559409b2bf82e693f3e68ed890dd7fd1fa8eae Mon Sep 17 00:00:00 2001 From: Owais Lone Date: Fri, 25 Sep 2020 21:31:41 +0530 Subject: [PATCH 2/6] Zipkin: Fix OTLP events to Zipkin annotations translation (#1161) --- .../CHANGELOG.md | 1 + .../opentelemetry/exporter/zipkin/__init__.py | 29 +++++++++++-------- .../tests/test_zipkin_exporter.py | 10 ++++++- 3 files changed, 27 insertions(+), 13 deletions(-) diff --git a/exporter/opentelemetry-exporter-zipkin/CHANGELOG.md b/exporter/opentelemetry-exporter-zipkin/CHANGELOG.md index a980e345684..3801c81dc92 100644 --- a/exporter/opentelemetry-exporter-zipkin/CHANGELOG.md +++ b/exporter/opentelemetry-exporter-zipkin/CHANGELOG.md @@ -4,6 +4,7 @@ - Zipkin exporter now accepts a ``max_tag_value_length`` attribute to customize the maximum allowed size a tag value can have. ([#1151](https://github.com/open-telemetry/opentelemetry-python/pull/1151)) +- Fixed OTLP events to Zipkin annotations translation. ([#1161](https://github.com/open-telemetry/opentelemetry-python/pull/1161)) ## Version 0.13b0 diff --git a/exporter/opentelemetry-exporter-zipkin/src/opentelemetry/exporter/zipkin/__init__.py b/exporter/opentelemetry-exporter-zipkin/src/opentelemetry/exporter/zipkin/__init__.py index 5e544275b33..bacfcc278d3 100644 --- a/exporter/opentelemetry-exporter-zipkin/src/opentelemetry/exporter/zipkin/__init__.py +++ b/exporter/opentelemetry-exporter-zipkin/src/opentelemetry/exporter/zipkin/__init__.py @@ -237,20 +237,25 @@ def _extract_tags_from_span(self, span: Span): tags.update(self._extract_tags_from_dict(span.resource.attributes)) return tags - def _extract_annotations_from_events( - self, events - ): # pylint: disable=R0201 - return ( - [ + def _extract_annotations_from_events(self, events): + if not events: + return None + + annotations = [] + for event in events: + attrs = {} + for key, value in event.attributes.items(): + if isinstance(value, str): + value = value[: self.max_tag_value_length] + attrs[key] = value + + annotations.append( { - "timestamp": _nsec_to_usec_round(e.timestamp), - "value": e.name, + "timestamp": _nsec_to_usec_round(event.timestamp), + "value": json.dumps({event.name: attrs}), } - for e in events - ] - if events - else None - ) + ) + return annotations def _nsec_to_usec_round(nsec): diff --git a/exporter/opentelemetry-exporter-zipkin/tests/test_zipkin_exporter.py b/exporter/opentelemetry-exporter-zipkin/tests/test_zipkin_exporter.py index 635594868f2..1979d8459f6 100644 --- a/exporter/opentelemetry-exporter-zipkin/tests/test_zipkin_exporter.py +++ b/exporter/opentelemetry-exporter-zipkin/tests/test_zipkin_exporter.py @@ -223,7 +223,15 @@ def test_export(self): "annotations": [ { "timestamp": event_timestamp // 10 ** 3, - "value": "event0", + "value": json.dumps( + { + "event0": { + "annotation_bool": True, + "annotation_string": "annotation_test", + "key_float": 0.3, + } + } + ), } ], "debug": True, From c534a2c9f823311fa6a693004dedccee0156017f Mon Sep 17 00:00:00 2001 From: Owais Lone Date: Tue, 29 Sep 2020 08:17:16 +0530 Subject: [PATCH 3/6] Zipkin: More deterministic unit test for comparing zipkin annotations (#1168) Zipkin annotation values are strings containing JSON documents. We cannot have deterministic ordering of event attributes as they may come in any order and python versions older than 3.7 don't have ordered dicts. We extract the annotations from exported spans, parse the JSON documents into Python dicts and then compare them. --- .../tests/test_zipkin_exporter.py | 41 +++++++++++-------- 1 file changed, 24 insertions(+), 17 deletions(-) diff --git a/exporter/opentelemetry-exporter-zipkin/tests/test_zipkin_exporter.py b/exporter/opentelemetry-exporter-zipkin/tests/test_zipkin_exporter.py index 1979d8459f6..6120f2dd6eb 100644 --- a/exporter/opentelemetry-exporter-zipkin/tests/test_zipkin_exporter.py +++ b/exporter/opentelemetry-exporter-zipkin/tests/test_zipkin_exporter.py @@ -98,9 +98,8 @@ def test_constructor_explicit(self): self.assertEqual(exporter.ipv6, ipv6) self.assertEqual(exporter.url, url) - # pylint: disable=too-many-locals + # pylint: disable=too-many-locals,too-many-statements def test_export(self): - span_names = ("test1", "test2", "test3", "test4") trace_id = 0x6E0C63257DE34C926F9EFCD03927272E span_id = 0x34BF92DEEFC58C92 @@ -204,7 +203,7 @@ def test_export(self): local_endpoint = {"serviceName": service_name, "port": 9411} exporter = ZipkinSpanExporter(service_name) - expected = [ + expected_spans = [ { "traceId": format(trace_id, "x"), "id": format(span_id, "x"), @@ -220,22 +219,20 @@ def test_export(self): "otel.status_code": "2", "otel.status_description": "Example description", }, + "debug": True, + "parentId": format(parent_id, "x"), "annotations": [ { "timestamp": event_timestamp // 10 ** 3, - "value": json.dumps( - { - "event0": { - "annotation_bool": True, - "annotation_string": "annotation_test", - "key_float": 0.3, - } + "value": { + "event0": { + "annotation_bool": True, + "annotation_string": "annotation_test", + "key_float": 0.3, } - ), + }, } ], - "debug": True, - "parentId": format(parent_id, "x"), }, { "traceId": format(trace_id, "x"), @@ -289,11 +286,21 @@ def test_export(self): status = exporter.export(otel_spans) self.assertEqual(SpanExportResult.SUCCESS, status) - mock_post.assert_called_with( - url="http://localhost:9411/api/v2/spans", - data=json.dumps(expected), - headers={"Content-Type": "application/json"}, + # pylint: disable=unsubscriptable-object + kwargs = mock_post.call_args[1] + + self.assertEqual(kwargs["url"], "http://localhost:9411/api/v2/spans") + actual_spans = sorted( + json.loads(kwargs["data"]), key=lambda span: span["timestamp"] ) + for expected, actual in zip(expected_spans, actual_spans): + expected_annotations = expected.pop("annotations", None) + actual_annotations = actual.pop("annotations", None) + if actual_annotations: + for annotation in actual_annotations: + annotation["value"] = json.loads(annotation["value"]) + self.assertEqual(expected, actual) + self.assertEqual(expected_annotations, actual_annotations) # pylint: disable=too-many-locals def test_zero_padding(self): From 1522b44d98bd8da7b6390a7dcdff5d585c29caae Mon Sep 17 00:00:00 2001 From: Owais Lone Date: Tue, 29 Sep 2020 10:22:11 +0530 Subject: [PATCH 4/6] Added ability to extract span attributes from django request objects. (#1154) OTEL_PYTHON_DJANGO_TRACED_REQUEST_ATTRS env var can be set to a command separated list of attributes names that will be extracted from Django's request object and set as attributes on spans. --- .../CHANGELOG.md | 1 + .../README.rst | 15 +++++++++++ .../instrumentation/django/middleware.py | 11 ++++++++ .../tests/test_middleware.py | 27 +++++++++++++++++++ 4 files changed, 54 insertions(+) diff --git a/instrumentation/opentelemetry-instrumentation-django/CHANGELOG.md b/instrumentation/opentelemetry-instrumentation-django/CHANGELOG.md index d3de446730a..30b13c4a223 100644 --- a/instrumentation/opentelemetry-instrumentation-django/CHANGELOG.md +++ b/instrumentation/opentelemetry-instrumentation-django/CHANGELOG.md @@ -3,6 +3,7 @@ ## Unreleased - Changed span name extraction from request to comply semantic convention ([#992](https://github.com/open-telemetry/opentelemetry-python/pull/992)) +- Added support for `OTEL_PYTHON_DJANGO_TRACED_REQUEST_ATTRS` ([#1154](https://github.com/open-telemetry/opentelemetry-python/pull/1154)) ## Version 0.13b0 diff --git a/instrumentation/opentelemetry-instrumentation-django/README.rst b/instrumentation/opentelemetry-instrumentation-django/README.rst index 5cb570c7e9f..a2b98cabf40 100644 --- a/instrumentation/opentelemetry-instrumentation-django/README.rst +++ b/instrumentation/opentelemetry-instrumentation-django/README.rst @@ -30,6 +30,21 @@ For example, will exclude requests such as ``https://site/client/123/info`` and ``https://site/xyz/healthcheck``. +Request attributes +******************** +To extract certain attributes from Django's request object and use them as span attributes, set the environment variable ``OTEL_PYTHON_DJANGO_TRACED_REQUEST_ATTRS`` to a comma +delimited list of request attribute names. + +For example, + +:: + + export OTEL_PYTHON_DJANGO_TRACED_REQUEST_ATTRS='path_info,content_type' + +will extract path_info and content_type attributes from every traced request and add them as span attritbues. + +Django Request object reference: https://docs.djangoproject.com/en/3.1/ref/request-response/#attributes + References ---------- diff --git a/instrumentation/opentelemetry-instrumentation-django/src/opentelemetry/instrumentation/django/middleware.py b/instrumentation/opentelemetry-instrumentation-django/src/opentelemetry/instrumentation/django/middleware.py index 59f7e6e6229..f468053168d 100644 --- a/instrumentation/opentelemetry-instrumentation-django/src/opentelemetry/instrumentation/django/middleware.py +++ b/instrumentation/opentelemetry-instrumentation-django/src/opentelemetry/instrumentation/django/middleware.py @@ -58,6 +58,13 @@ class _DjangoMiddleware(MiddlewareMixin): else: _excluded_urls = ExcludeList(_excluded_urls) + _traced_request_attrs = [ + attr.strip() + for attr in (Configuration().DJANGO_TRACED_REQUEST_ATTRS or "").split( + "," + ) + ] + @staticmethod def _get_span_name(request): try: @@ -95,6 +102,10 @@ def process_request(self, request): tracer = get_tracer(__name__, __version__) attributes = collect_request_attributes(environ) + for attr in self._traced_request_attrs: + value = getattr(request, attr, None) + if value is not None: + attributes[attr] = str(value) span = tracer.start_span( self._get_span_name(request), diff --git a/instrumentation/opentelemetry-instrumentation-django/tests/test_middleware.py b/instrumentation/opentelemetry-instrumentation-django/tests/test_middleware.py index ee82c5d7d9f..378139d1c53 100644 --- a/instrumentation/opentelemetry-instrumentation-django/tests/test_middleware.py +++ b/instrumentation/opentelemetry-instrumentation-django/tests/test_middleware.py @@ -174,3 +174,30 @@ def test_span_name_404(self): span = span_list[0] self.assertEqual(span.name, "HTTP GET") + + def test_traced_request_attrs(self): + with patch( + "opentelemetry.instrumentation.django.middleware._DjangoMiddleware._traced_request_attrs", + [], + ): + Client().get("/span_name/1234/", CONTENT_TYPE="test/ct") + span_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(span_list), 1) + + span = span_list[0] + self.assertNotIn("path_info", span.attributes) + self.assertNotIn("content_type", span.attributes) + self.memory_exporter.clear() + + with patch( + "opentelemetry.instrumentation.django.middleware._DjangoMiddleware._traced_request_attrs", + ["path_info", "content_type", "non_existing_variable"], + ): + Client().get("/span_name/1234/", CONTENT_TYPE="test/ct") + span_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(span_list), 1) + + span = span_list[0] + self.assertEqual(span.attributes["path_info"], "/span_name/1234/") + self.assertEqual(span.attributes["content_type"], "test/ct") + self.assertNotIn("non_existing_variable", span.attributes) From 28e3a39ebd8b841834a28186bd7179c6c7190c47 Mon Sep 17 00:00:00 2001 From: Owais Lone Date: Tue, 29 Sep 2020 19:58:20 +0530 Subject: [PATCH 5/6] Added ability to extract span attributes from falcon request objects. (#1158) --- .../CHANGELOG.md | 2 ++ .../README.rst | 15 +++++++++ .../instrumentation/falcon/__init__.py | 32 +++++++++++++++---- .../tests/test_falcon.py | 17 ++++++++++ 4 files changed, 60 insertions(+), 6 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-falcon/CHANGELOG.md b/instrumentation/opentelemetry-instrumentation-falcon/CHANGELOG.md index e14c730f74b..f398b7460da 100644 --- a/instrumentation/opentelemetry-instrumentation-falcon/CHANGELOG.md +++ b/instrumentation/opentelemetry-instrumentation-falcon/CHANGELOG.md @@ -2,6 +2,8 @@ ## Unreleased +- Added support for `OTEL_PYTHON_FALCON_TRACED_REQUEST_ATTRS` ([#1158](https://github.com/open-telemetry/opentelemetry-python/pull/1158)) + ## Version 0.13b0 Released 2020-09-17 diff --git a/instrumentation/opentelemetry-instrumentation-falcon/README.rst b/instrumentation/opentelemetry-instrumentation-falcon/README.rst index f7d5a99d951..8230deaf764 100644 --- a/instrumentation/opentelemetry-instrumentation-falcon/README.rst +++ b/instrumentation/opentelemetry-instrumentation-falcon/README.rst @@ -31,6 +31,21 @@ For example, will exclude requests such as ``https://site/client/123/info`` and ``https://site/xyz/healthcheck``. +Request attributes +******************** +To extract certain attributes from Falcon's request object and use them as span attributes, set the environment variable ``OTEL_PYTHON_FALCON_TRACED_REQUEST_ATTRS`` to a comma +delimited list of request attribute names. + +For example, + +:: + + export OTEL_PYTHON_FALCON_TRACED_REQUEST_ATTRS='query_string,uri_template' + +will extract path_info and content_type attributes from every traced request and add them as span attritbues. + +Falcon Request object reference: https://falcon.readthedocs.io/en/stable/api/request_and_response.html#id1 + References ---------- diff --git a/instrumentation/opentelemetry-instrumentation-falcon/src/opentelemetry/instrumentation/falcon/__init__.py b/instrumentation/opentelemetry-instrumentation-falcon/src/opentelemetry/instrumentation/falcon/__init__.py index 1e67d6101bd..660fc23063c 100644 --- a/instrumentation/opentelemetry-instrumentation-falcon/src/opentelemetry/instrumentation/falcon/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-falcon/src/opentelemetry/instrumentation/falcon/__init__.py @@ -50,6 +50,7 @@ def on_get(self, req, resp): import opentelemetry.instrumentation.wsgi as otel_wsgi from opentelemetry import configuration, context, propagators, trace +from opentelemetry.configuration import Configuration from opentelemetry.instrumentation.falcon.version import __version__ from opentelemetry.instrumentation.instrumentor import BaseInstrumentor from opentelemetry.instrumentation.utils import http_status_to_canonical_code @@ -92,13 +93,16 @@ def _uninstrument(self, **kwargs): class _InstrumentedFalconAPI(falcon.API): def __init__(self, *args, **kwargs): - mw = kwargs.pop("middleware", []) - if not isinstance(mw, (list, tuple)): - mw = [mw] + middlewares = kwargs.pop("middleware", []) + if not isinstance(middlewares, (list, tuple)): + middlewares = [middlewares] self._tracer = trace.get_tracer(__name__, __version__) - mw.insert(0, _TraceMiddleware(self._tracer)) - kwargs["middleware"] = mw + trace_middleware = _TraceMiddleware( + self._tracer, kwargs.get("traced_request_attributes") + ) + middlewares.insert(0, trace_middleware) + kwargs["middleware"] = middlewares super().__init__(*args, **kwargs) def __call__(self, env, start_response): @@ -144,8 +148,24 @@ def _start_response(status, response_headers, *args, **kwargs): class _TraceMiddleware: # pylint:disable=R0201,W0613 - def __init__(self, tracer=None): + def __init__(self, tracer=None, traced_request_attrs=None): self.tracer = tracer + self._traced_request_attrs = traced_request_attrs or [ + attr.strip() + for attr in ( + Configuration().FALCON_TRACED_REQUEST_ATTRS or "" + ).split(",") + ] + + def process_request(self, req, resp): + span = req.env.get(_ENVIRON_SPAN_KEY) + if not span: + return + + for attr in self._traced_request_attrs: + value = getattr(req, attr, None) + if value is not None: + span.set_attribute(attr, str(value)) def process_resource(self, req, resp, resource, params): span = req.env.get(_ENVIRON_SPAN_KEY) diff --git a/instrumentation/opentelemetry-instrumentation-falcon/tests/test_falcon.py b/instrumentation/opentelemetry-instrumentation-falcon/tests/test_falcon.py index a3d2c5d8d81..5e27e4aacbd 100644 --- a/instrumentation/opentelemetry-instrumentation-falcon/tests/test_falcon.py +++ b/instrumentation/opentelemetry-instrumentation-falcon/tests/test_falcon.py @@ -171,3 +171,20 @@ def test_exclude_lists(self): self.client().simulate_get(path="/hello") span_list = self.memory_exporter.get_finished_spans() self.assertEqual(len(span_list), 1) + + def test_traced_request_attributes(self): + self.client().simulate_get(path="/hello?q=abc") + span = self.memory_exporter.get_finished_spans()[0] + self.assertNotIn("query_string", span.attributes) + self.memory_exporter.clear() + + middleware = self.app._middleware[0][ # pylint:disable=W0212 + 0 + ].__self__ + with patch.object( + middleware, "_traced_request_attrs", ["query_string"] + ): + self.client().simulate_get(path="/hello?q=abc") + span = self.memory_exporter.get_finished_spans()[0] + self.assertIn("query_string", span.attributes) + self.assertEqual(span.attributes["query_string"], "q=abc") From 9be899e84ccb5edcc9830680d5b00029d2583c28 Mon Sep 17 00:00:00 2001 From: Owais Lone Date: Tue, 29 Sep 2020 21:06:05 +0530 Subject: [PATCH 6/6] Added context propagation support to celery instrumentation (#1135) --- .../CHANGELOG.md | 3 + .../README.rst | 22 +++++- .../setup.cfg | 1 + .../instrumentation/celery/__init__.py | 42 ++++++++-- .../tests/celery_test_tasks.py | 29 +++++++ .../tests/test_tasks.py | 78 +++++++++++++++++++ .../tests/celery/test_celery_functional.py | 62 ++++++++------- .../tests/mysql/test_mysql_functional.py | 15 ++-- .../tests/postgres/test_aiopg_functional.py | 18 ++--- .../tests/postgres/test_psycopg_functional.py | 15 ++-- .../tests/pymongo/test_pymongo_functional.py | 12 +-- .../tests/pymysql/test_pymysql_functional.py | 12 +-- 12 files changed, 227 insertions(+), 82 deletions(-) create mode 100644 instrumentation/opentelemetry-instrumentation-celery/tests/celery_test_tasks.py create mode 100644 instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py diff --git a/instrumentation/opentelemetry-instrumentation-celery/CHANGELOG.md b/instrumentation/opentelemetry-instrumentation-celery/CHANGELOG.md index e164a89134b..da615bcc7b3 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/CHANGELOG.md +++ b/instrumentation/opentelemetry-instrumentation-celery/CHANGELOG.md @@ -2,6 +2,9 @@ ## Unreleased +- Span operation names now include the task type. ([#1135](https://github.com/open-telemetry/opentelemetry-python/pull/1135)) +- Added automatic context propagation. ([#1135](https://github.com/open-telemetry/opentelemetry-python/pull/1135)) + ## Version 0.12b0 Released 2020-08-14 diff --git a/instrumentation/opentelemetry-instrumentation-celery/README.rst b/instrumentation/opentelemetry-instrumentation-celery/README.rst index 42fe6646d1c..307fd352b97 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/README.rst +++ b/instrumentation/opentelemetry-instrumentation-celery/README.rst @@ -29,11 +29,20 @@ Usage .. code-block:: python + from opentelemetry import trace + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.export import BatchExportSpanProcessor from opentelemetry.instrumentation.celery import CeleryInstrumentor - CeleryInstrumentor().instrument() - from celery import Celery + from celery.signals import worker_process_init + + @worker_process_init.connect(weak=False) + def init_celery_tracing(*args, **kwargs): + trace.set_tracer_provider(TracerProvider()) + span_processor = BatchExportSpanProcessor(ConsoleSpanExporter()) + trace.get_tracer_provider().add_span_processor(span_processor) + CeleryInstrumentor().instrument() app = Celery("tasks", broker="amqp://localhost") @@ -43,6 +52,15 @@ Usage add.delay(42, 50) + +Setting up tracing +-------------------- + +When tracing a celery worker process, tracing and instrumention both must be initialized after the celery worker +process is initialized. This is required for any tracing components that might use threading to work correctly +such as the BatchExportSpanProcessor. Celery provides a signal called ``worker_process_init`` that can be used to +accomplish this as shown in the example above. + References ---------- * `OpenTelemetry Celery Instrumentation `_ diff --git a/instrumentation/opentelemetry-instrumentation-celery/setup.cfg b/instrumentation/opentelemetry-instrumentation-celery/setup.cfg index 79b63d928a9..b5a039de119 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/setup.cfg +++ b/instrumentation/opentelemetry-instrumentation-celery/setup.cfg @@ -46,6 +46,7 @@ install_requires = [options.extras_require] test = pytest + celery ~= 4.0 opentelemetry-test == 0.14.dev0 [options.packages.find] diff --git a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py index 7e2551142e4..4768e93d18e 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py @@ -30,11 +30,20 @@ .. code:: python + from opentelemetry import trace + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.export import BatchExportSpanProcessor from opentelemetry.instrumentation.celery import CeleryInstrumentor - CeleryInstrumentor().instrument() - from celery import Celery + from celery.signals import worker_process_init + + @worker_process_init.connect(weak=False) + def init_celery_tracing(*args, **kwargs): + trace.set_tracer_provider(TracerProvider()) + span_processor = BatchExportSpanProcessor(ConsoleSpanExporter()) + trace.get_tracer_provider().add_span_processor(span_processor) + CeleryInstrumentor().instrument() app = Celery("tasks", broker="amqp://localhost") @@ -50,13 +59,15 @@ def add(x, y): import logging import signal +from collections.abc import Iterable from celery import signals # pylint: disable=no-name-in-module -from opentelemetry import trace +from opentelemetry import propagators, trace from opentelemetry.instrumentation.celery import utils from opentelemetry.instrumentation.celery.version import __version__ from opentelemetry.instrumentation.instrumentor import BaseInstrumentor +from opentelemetry.trace.propagation import get_current_span from opentelemetry.trace.status import Status, StatusCanonicalCode logger = logging.getLogger(__name__) @@ -106,9 +117,16 @@ def _trace_prerun(self, *args, **kwargs): if task is None or task_id is None: return + request = task.request + tracectx = propagators.extract(carrier_extractor, request) or {} + parent = get_current_span(tracectx) + logger.debug("prerun signal start task_id=%s", task_id) - span = self._tracer.start_span(task.name, kind=trace.SpanKind.CONSUMER) + operation_name = "{0}/{1}".format(_TASK_RUN, task.name) + span = self._tracer.start_span( + operation_name, parent=parent, kind=trace.SpanKind.CONSUMER + ) activation = self._tracer.use_span(span, end_on_exit=True) activation.__enter__() @@ -146,7 +164,10 @@ def _trace_before_publish(self, *args, **kwargs): if task is None or task_id is None: return - span = self._tracer.start_span(task.name, kind=trace.SpanKind.PRODUCER) + operation_name = "{0}/{1}".format(_TASK_APPLY_ASYNC, task.name) + span = self._tracer.start_span( + operation_name, kind=trace.SpanKind.PRODUCER + ) # apply some attributes here because most of the data is not available span.set_attribute(_TASK_TAG_KEY, _TASK_APPLY_ASYNC) @@ -158,6 +179,10 @@ def _trace_before_publish(self, *args, **kwargs): activation.__enter__() utils.attach_span(task, task_id, (span, activation), is_publish=True) + headers = kwargs.get("headers") + if headers: + propagators.inject(type(headers).__setitem__, headers) + @staticmethod def _trace_after_publish(*args, **kwargs): task = utils.retrieve_task_from_sender(kwargs) @@ -221,3 +246,10 @@ def _trace_retry(*args, **kwargs): # Use `str(reason)` instead of `reason.message` in case we get # something that isn't an `Exception` span.set_attribute(_TASK_RETRY_REASON_KEY, str(reason)) + + +def carrier_extractor(carrier, key): + value = getattr(carrier, key, []) + if isinstance(value, str) or not isinstance(value, Iterable): + value = (value,) + return value diff --git a/instrumentation/opentelemetry-instrumentation-celery/tests/celery_test_tasks.py b/instrumentation/opentelemetry-instrumentation-celery/tests/celery_test_tasks.py new file mode 100644 index 00000000000..d9660412f04 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-celery/tests/celery_test_tasks.py @@ -0,0 +1,29 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from celery import Celery + + +class Config: + result_backend = "rpc" + broker_backend = "memory" + + +app = Celery(broker="memory:///") +app.config_from_object(Config) + + +@app.task +def task_add(num_a, num_b): + return num_a + num_b diff --git a/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py b/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py new file mode 100644 index 00000000000..3a05ebf331a --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py @@ -0,0 +1,78 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import threading +import time + +from opentelemetry.instrumentation.celery import CeleryInstrumentor +from opentelemetry.test.test_base import TestBase +from opentelemetry.trace import SpanKind + +from .celery_test_tasks import app, task_add + + +class TestCeleryInstrumentation(TestBase): + def setUp(self): + super().setUp() + self._worker = app.Worker(app=app, pool="solo", concurrency=1) + self._thread = threading.Thread(target=self._worker.start) + self._thread.daemon = True + self._thread.start() + + def tearDown(self): + super().tearDown() + self._worker.stop() + self._thread.join() + + def test_task(self): + CeleryInstrumentor().instrument() + + result = task_add.delay(1, 2) + while not result.ready(): + time.sleep(0.05) + + spans = self.sorted_spans(self.memory_exporter.get_finished_spans()) + self.assertEqual(len(spans), 2) + + consumer, producer = spans + + self.assertEqual(consumer.name, "run/tests.celery_test_tasks.task_add") + self.assertEqual(consumer.kind, SpanKind.CONSUMER) + self.assert_span_has_attributes( + consumer, + { + "celery.action": "run", + "celery.state": "SUCCESS", + "messaging.destination": "celery", + "celery.task_name": "tests.celery_test_tasks.task_add", + }, + ) + + self.assertEqual( + producer.name, "apply_async/tests.celery_test_tasks.task_add" + ) + self.assertEqual(producer.kind, SpanKind.PRODUCER) + self.assert_span_has_attributes( + producer, + { + "celery.action": "apply_async", + "celery.task_name": "tests.celery_test_tasks.task_add", + "messaging.destination_kind": "queue", + "messaging.destination": "celery", + }, + ) + + self.assertNotEqual(consumer.parent, producer.context) + self.assertEqual(consumer.parent.span_id, producer.context.span_id) + self.assertEqual(consumer.context.trace_id, producer.context.trace_id) diff --git a/tests/opentelemetry-docker-tests/tests/celery/test_celery_functional.py b/tests/opentelemetry-docker-tests/tests/celery/test_celery_functional.py index f18c6cdba14..c4be6762ea0 100644 --- a/tests/opentelemetry-docker-tests/tests/celery/test_celery_functional.py +++ b/tests/opentelemetry-docker-tests/tests/celery/test_celery_functional.py @@ -46,21 +46,21 @@ def fn_task(): async_span, run_span = spans - assert ( - async_span.instrumentation_info.name - == opentelemetry.instrumentation.celery.__name__ + assert run_span.parent == async_span.context + assert run_span.parent.span_id == async_span.context.span_id + assert run_span.context.trace_id == async_span.context.trace_id + + assert async_span.instrumentation_info.name == "apply_async/{0}".format( + opentelemetry.instrumentation.celery.__name__ ) - assert ( - async_span.instrumentation_info.version - == opentelemetry.instrumentation.celery.__version__ + assert async_span.instrumentation_info.version == "apply_async/{0}".format( + opentelemetry.instrumentation.celery.__version__ ) - assert ( - run_span.instrumentation_info.name - == opentelemetry.instrumentation.celery.__name__ + assert run_span.instrumentation_info.name == "run/{0}".format( + opentelemetry.instrumentation.celery.__name__ ) - assert ( - run_span.instrumentation_info.version - == opentelemetry.instrumentation.celery.__version__ + assert run_span.instrumentation_info.version == "run/{0}".format( + opentelemetry.instrumentation.celery.__version__ ) @@ -103,7 +103,7 @@ def fn_task(): span = spans[0] assert span.status.is_ok is True - assert span.name == "test_celery_functional.fn_task" + assert span.name == "run/test_celery_functional.fn_task" assert span.attributes.get("messaging.message_id") == t.task_id assert ( span.attributes.get("celery.task_name") @@ -128,7 +128,7 @@ def fn_task(self): span = spans[0] assert span.status.is_ok is True - assert span.name == "test_celery_functional.fn_task" + assert span.name == "run/test_celery_functional.fn_task" assert span.attributes.get("messaging.message_id") == t.task_id assert ( span.attributes.get("celery.task_name") @@ -157,7 +157,10 @@ def fn_task_parameters(user, force_logout=False): assert run_span.context.trace_id != async_span.context.trace_id assert async_span.status.is_ok is True - assert async_span.name == "test_celery_functional.fn_task_parameters" + assert ( + async_span.name + == "apply_async/test_celery_functional.fn_task_parameters" + ) assert async_span.attributes.get("celery.action") == "apply_async" assert async_span.attributes.get("messaging.message_id") == result.task_id assert ( @@ -209,7 +212,10 @@ def fn_task_parameters(user, force_logout=False): assert run_span.context.trace_id != async_span.context.trace_id assert async_span.status.is_ok is True - assert async_span.name == "test_celery_functional.fn_task_parameters" + assert ( + async_span.name + == "apply_async/test_celery_functional.fn_task_parameters" + ) assert async_span.attributes.get("celery.action") == "apply_async" assert async_span.attributes.get("messaging.message_id") == result.task_id assert ( @@ -218,7 +224,7 @@ def fn_task_parameters(user, force_logout=False): ) assert run_span.status.is_ok is True - assert run_span.name == "test_celery_functional.fn_task_parameters" + assert run_span.name == "run/test_celery_functional.fn_task_parameters" assert run_span.attributes.get("celery.action") == "run" assert run_span.attributes.get("celery.state") == "SUCCESS" assert run_span.attributes.get("messaging.message_id") == result.task_id @@ -244,7 +250,7 @@ def fn_exception(): span = spans[0] assert span.status.is_ok is False - assert span.name == "test_celery_functional.fn_exception" + assert span.name == "run/test_celery_functional.fn_exception" assert span.attributes.get("celery.action") == "run" assert span.attributes.get("celery.state") == "FAILURE" assert ( @@ -273,7 +279,7 @@ def fn_exception(): assert span.status.is_ok is True assert span.status.canonical_code == StatusCanonicalCode.OK - assert span.name == "test_celery_functional.fn_exception" + assert span.name == "run/test_celery_functional.fn_exception" assert span.attributes.get("celery.action") == "run" assert span.attributes.get("celery.state") == "FAILURE" assert ( @@ -300,7 +306,7 @@ def fn_exception(): assert span.status.is_ok is True assert span.status.canonical_code == StatusCanonicalCode.OK - assert span.name == "test_celery_functional.fn_exception" + assert span.name == "run/test_celery_functional.fn_exception" assert span.attributes.get("celery.action") == "run" assert span.attributes.get("celery.state") == "RETRY" assert ( @@ -332,7 +338,7 @@ def run(self): span = spans[0] assert span.status.is_ok is True - assert span.name == "test_celery_functional.BaseTask" + assert span.name == "run/test_celery_functional.BaseTask" assert ( span.attributes.get("celery.task_name") == "test_celery_functional.BaseTask" @@ -364,7 +370,7 @@ def run(self): span = spans[0] assert span.status.is_ok is False - assert span.name == "test_celery_functional.BaseTask" + assert span.name == "run/test_celery_functional.BaseTask" assert ( span.attributes.get("celery.task_name") == "test_celery_functional.BaseTask" @@ -401,7 +407,7 @@ def run(self): assert span.status.is_ok is True assert span.status.canonical_code == StatusCanonicalCode.OK - assert span.name == "test_celery_functional.BaseTask" + assert span.name == "run/test_celery_functional.BaseTask" assert span.attributes.get("celery.action") == "run" assert span.attributes.get("celery.state") == "FAILURE" assert span.attributes.get("messaging.message_id") == result.task_id @@ -423,7 +429,7 @@ def add(x, y): span = spans[0] assert span.status.is_ok is True - assert span.name == "test_celery_functional.add" + assert span.name == "run/test_celery_functional.add" assert ( span.attributes.get("celery.task_name") == "test_celery_functional.add" ) @@ -471,7 +477,7 @@ class CelerySubClass(CelerySuperClass): async_span, async_run_span, run_span = spans assert run_span.status.is_ok is True - assert run_span.name == "test_celery_functional.CelerySubClass" + assert run_span.name == "run/test_celery_functional.CelerySubClass" assert ( run_span.attributes.get("celery.task_name") == "test_celery_functional.CelerySubClass" @@ -481,7 +487,7 @@ class CelerySubClass(CelerySuperClass): assert run_span.attributes.get("messaging.message_id") == result.task_id assert async_run_span.status.is_ok is True - assert async_run_span.name == "test_celery_functional.CelerySubClass" + assert async_run_span.name == "run/test_celery_functional.CelerySubClass" assert ( async_run_span.attributes.get("celery.task_name") == "test_celery_functional.CelerySubClass" @@ -493,7 +499,9 @@ class CelerySubClass(CelerySuperClass): ) assert async_span.status.is_ok is True - assert async_span.name == "test_celery_functional.CelerySubClass" + assert ( + async_span.name == "apply_async/test_celery_functional.CelerySubClass" + ) assert ( async_span.attributes.get("celery.task_name") == "test_celery_functional.CelerySubClass" diff --git a/tests/opentelemetry-docker-tests/tests/mysql/test_mysql_functional.py b/tests/opentelemetry-docker-tests/tests/mysql/test_mysql_functional.py index 4116f4a19e5..5be0be9f0ec 100644 --- a/tests/opentelemetry-docker-tests/tests/mysql/test_mysql_functional.py +++ b/tests/opentelemetry-docker-tests/tests/mysql/test_mysql_functional.py @@ -75,15 +75,13 @@ def validate_spans(self): self.assertEqual(db_span.attributes["net.peer.port"], MYSQL_PORT) def test_execute(self): - """Should create a child span for execute - """ + """Should create a child span for execute""" with self._tracer.start_as_current_span("rootSpan"): self._cursor.execute("CREATE TABLE IF NOT EXISTS test (id INT)") self.validate_spans() def test_execute_with_connection_context_manager(self): - """Should create a child span for execute with connection context - """ + """Should create a child span for execute with connection context""" with self._tracer.start_as_current_span("rootSpan"): with self._connection as conn: cursor = conn.cursor() @@ -91,16 +89,14 @@ def test_execute_with_connection_context_manager(self): self.validate_spans() def test_execute_with_cursor_context_manager(self): - """Should create a child span for execute with cursor context - """ + """Should create a child span for execute with cursor context""" with self._tracer.start_as_current_span("rootSpan"): with self._connection.cursor() as cursor: cursor.execute("CREATE TABLE IF NOT EXISTS test (id INT)") self.validate_spans() def test_executemany(self): - """Should create a child span for executemany - """ + """Should create a child span for executemany""" with self._tracer.start_as_current_span("rootSpan"): data = (("1",), ("2",), ("3",)) stmt = "INSERT INTO test (id) VALUES (%s)" @@ -108,8 +104,7 @@ def test_executemany(self): self.validate_spans() def test_callproc(self): - """Should create a child span for callproc - """ + """Should create a child span for callproc""" with self._tracer.start_as_current_span("rootSpan"), self.assertRaises( Exception ): diff --git a/tests/opentelemetry-docker-tests/tests/postgres/test_aiopg_functional.py b/tests/opentelemetry-docker-tests/tests/postgres/test_aiopg_functional.py index 9eb209636d9..e7a0d39b51e 100644 --- a/tests/opentelemetry-docker-tests/tests/postgres/test_aiopg_functional.py +++ b/tests/opentelemetry-docker-tests/tests/postgres/test_aiopg_functional.py @@ -85,8 +85,7 @@ def validate_spans(self): self.assertEqual(child_span.attributes["net.peer.port"], POSTGRES_PORT) def test_execute(self): - """Should create a child span for execute method - """ + """Should create a child span for execute method""" with self._tracer.start_as_current_span("rootSpan"): async_call( self._cursor.execute( @@ -96,8 +95,7 @@ def test_execute(self): self.validate_spans() def test_executemany(self): - """Should create a child span for executemany - """ + """Should create a child span for executemany""" with pytest.raises(psycopg2.ProgrammingError): with self._tracer.start_as_current_span("rootSpan"): data = (("1",), ("2",), ("3",)) @@ -106,8 +104,7 @@ def test_executemany(self): self.validate_spans() def test_callproc(self): - """Should create a child span for callproc - """ + """Should create a child span for callproc""" with self._tracer.start_as_current_span("rootSpan"), self.assertRaises( Exception ): @@ -169,8 +166,7 @@ def validate_spans(self): self.assertEqual(child_span.attributes["net.peer.port"], POSTGRES_PORT) def test_execute(self): - """Should create a child span for execute method - """ + """Should create a child span for execute method""" with self._tracer.start_as_current_span("rootSpan"): async_call( self._cursor.execute( @@ -180,8 +176,7 @@ def test_execute(self): self.validate_spans() def test_executemany(self): - """Should create a child span for executemany - """ + """Should create a child span for executemany""" with pytest.raises(psycopg2.ProgrammingError): with self._tracer.start_as_current_span("rootSpan"): data = (("1",), ("2",), ("3",)) @@ -190,8 +185,7 @@ def test_executemany(self): self.validate_spans() def test_callproc(self): - """Should create a child span for callproc - """ + """Should create a child span for callproc""" with self._tracer.start_as_current_span("rootSpan"), self.assertRaises( Exception ): diff --git a/tests/opentelemetry-docker-tests/tests/postgres/test_psycopg_functional.py b/tests/opentelemetry-docker-tests/tests/postgres/test_psycopg_functional.py index 8a703b00944..27391647818 100644 --- a/tests/opentelemetry-docker-tests/tests/postgres/test_psycopg_functional.py +++ b/tests/opentelemetry-docker-tests/tests/postgres/test_psycopg_functional.py @@ -77,8 +77,7 @@ def validate_spans(self): self.assertEqual(child_span.attributes["net.peer.port"], POSTGRES_PORT) def test_execute(self): - """Should create a child span for execute method - """ + """Should create a child span for execute method""" with self._tracer.start_as_current_span("rootSpan"): self._cursor.execute( "CREATE TABLE IF NOT EXISTS test (id integer)" @@ -86,8 +85,7 @@ def test_execute(self): self.validate_spans() def test_execute_with_connection_context_manager(self): - """Should create a child span for execute with connection context - """ + """Should create a child span for execute with connection context""" with self._tracer.start_as_current_span("rootSpan"): with self._connection as conn: cursor = conn.cursor() @@ -95,8 +93,7 @@ def test_execute_with_connection_context_manager(self): self.validate_spans() def test_execute_with_cursor_context_manager(self): - """Should create a child span for execute with cursor context - """ + """Should create a child span for execute with cursor context""" with self._tracer.start_as_current_span("rootSpan"): with self._connection.cursor() as cursor: cursor.execute("CREATE TABLE IF NOT EXISTS test (id INT)") @@ -104,8 +101,7 @@ def test_execute_with_cursor_context_manager(self): self.assertTrue(cursor.closed) def test_executemany(self): - """Should create a child span for executemany - """ + """Should create a child span for executemany""" with self._tracer.start_as_current_span("rootSpan"): data = (("1",), ("2",), ("3",)) stmt = "INSERT INTO test (id) VALUES (%s)" @@ -113,8 +109,7 @@ def test_executemany(self): self.validate_spans() def test_callproc(self): - """Should create a child span for callproc - """ + """Should create a child span for callproc""" with self._tracer.start_as_current_span("rootSpan"), self.assertRaises( Exception ): diff --git a/tests/opentelemetry-docker-tests/tests/pymongo/test_pymongo_functional.py b/tests/opentelemetry-docker-tests/tests/pymongo/test_pymongo_functional.py index 8c52ad06564..acb60178d06 100644 --- a/tests/opentelemetry-docker-tests/tests/pymongo/test_pymongo_functional.py +++ b/tests/opentelemetry-docker-tests/tests/pymongo/test_pymongo_functional.py @@ -64,8 +64,7 @@ def validate_spans(self): ) def test_insert(self): - """Should create a child span for insert - """ + """Should create a child span for insert""" with self._tracer.start_as_current_span("rootSpan"): self._collection.insert_one( {"name": "testName", "value": "testValue"} @@ -73,8 +72,7 @@ def test_insert(self): self.validate_spans() def test_update(self): - """Should create a child span for update - """ + """Should create a child span for update""" with self._tracer.start_as_current_span("rootSpan"): self._collection.update_one( {"name": "testName"}, {"$set": {"value": "someOtherValue"}} @@ -82,15 +80,13 @@ def test_update(self): self.validate_spans() def test_find(self): - """Should create a child span for find - """ + """Should create a child span for find""" with self._tracer.start_as_current_span("rootSpan"): self._collection.find_one() self.validate_spans() def test_delete(self): - """Should create a child span for delete - """ + """Should create a child span for delete""" with self._tracer.start_as_current_span("rootSpan"): self._collection.delete_one({"name": "testName"}) self.validate_spans() diff --git a/tests/opentelemetry-docker-tests/tests/pymysql/test_pymysql_functional.py b/tests/opentelemetry-docker-tests/tests/pymysql/test_pymysql_functional.py index 7b0cb5b0c03..c5c4d4f4497 100644 --- a/tests/opentelemetry-docker-tests/tests/pymysql/test_pymysql_functional.py +++ b/tests/opentelemetry-docker-tests/tests/pymysql/test_pymysql_functional.py @@ -72,23 +72,20 @@ def validate_spans(self): self.assertEqual(db_span.attributes["net.peer.port"], MYSQL_PORT) def test_execute(self): - """Should create a child span for execute - """ + """Should create a child span for execute""" with self._tracer.start_as_current_span("rootSpan"): self._cursor.execute("CREATE TABLE IF NOT EXISTS test (id INT)") self.validate_spans() def test_execute_with_cursor_context_manager(self): - """Should create a child span for execute with cursor context - """ + """Should create a child span for execute with cursor context""" with self._tracer.start_as_current_span("rootSpan"): with self._connection.cursor() as cursor: cursor.execute("CREATE TABLE IF NOT EXISTS test (id INT)") self.validate_spans() def test_executemany(self): - """Should create a child span for executemany - """ + """Should create a child span for executemany""" with self._tracer.start_as_current_span("rootSpan"): data = (("1",), ("2",), ("3",)) stmt = "INSERT INTO test (id) VALUES (%s)" @@ -96,8 +93,7 @@ def test_executemany(self): self.validate_spans() def test_callproc(self): - """Should create a child span for callproc - """ + """Should create a child span for callproc""" with self._tracer.start_as_current_span("rootSpan"), self.assertRaises( Exception ):