diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index ddc2d7a82d..833acd981b 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -6,7 +6,7 @@ on: - 'release/*' pull_request: env: - CORE_REPO_SHA: 10208c1be1e720925a80a66f711b8afbe67537f4 + CORE_REPO_SHA: adad94bfa69520cb4cbabca714827fd14503baf0 jobs: build: diff --git a/CHANGELOG.md b/CHANGELOG.md index 7f6d8db4b9..5db67907db 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -53,6 +53,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `opentelemetry-instrumentation-asgi`, `opentelemetry-instrumentation-aiohttp-client`, `openetelemetry-instrumentation-fastapi`, `opentelemetry-instrumentation-starlette`, `opentelemetry-instrumentation-urllib`, `opentelemetry-instrumentation-urllib3` Added `request_hook` and `response_hook` callbacks ([#576](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/576)) +- `opentelemetry-instrumentation-pika` added RabbitMQ's pika module instrumentation. + ([#680](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/680)) ### Changed diff --git a/instrumentation/README.md b/instrumentation/README.md index fba704c81d..2117799389 100644 --- a/instrumentation/README.md +++ b/instrumentation/README.md @@ -19,6 +19,7 @@ | [opentelemetry-instrumentation-jinja2](./opentelemetry-instrumentation-jinja2) | jinja2~=2.7 | | [opentelemetry-instrumentation-logging](./opentelemetry-instrumentation-logging) | logging | | [opentelemetry-instrumentation-mysql](./opentelemetry-instrumentation-mysql) | mysql-connector-python ~= 8.0 | +| [opentelemetry-instrumentation-pika](./opentelemetry-instrumentation-pika) | pika >= 1.1.0 | | [opentelemetry-instrumentation-psycopg2](./opentelemetry-instrumentation-psycopg2) | psycopg2 >= 2.7.3.1 | | [opentelemetry-instrumentation-pymemcache](./opentelemetry-instrumentation-pymemcache) | pymemcache ~= 1.3 | | [opentelemetry-instrumentation-pymongo](./opentelemetry-instrumentation-pymongo) | pymongo ~= 3.1 | diff --git a/instrumentation/opentelemetry-instrumentation-pika/README.rst b/instrumentation/opentelemetry-instrumentation-pika/README.rst new file mode 100644 index 0000000000..0f6ab3fd78 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-pika/README.rst @@ -0,0 +1,70 @@ +OpenTelemetry pika Instrumentation +================================== + +|pypi| + +.. |pypi| image:: https://badge.fury.io/py/opentelemetry-instrumentation-pika.svg + :target: https://pypi.org/project/opentelemetry-instrumentation-pika/ + +This library allows tracing requests made by the pika library. + +Installation +------------ + +:: + + pip install opentelemetry-instrumentation-pika + +Usage +----- + +* Start broker backend + +.. code-block:: python + + docker run -p 5672:5672 rabbitmq + +* Run instrumented task + +.. code-block:: python + + import pika + from opentelemetry.instrumentation.pika import PikaInstrumentor + + PikaInstrumentor().instrument() + + connection = pika.BlockingConnection(pika.URLParameters('amqp://localhost')) + channel = connection.channel() + channel.queue_declare(queue='hello') + channel.basic_publish(exchange='', routing_key='hello', body=b'Hello World!') + +* PikaInstrumentor also supports instrumentation of a single channel + +.. code-block:: python + + import pika + from opentelemetry.instrumentation.pika import PikaInstrumentor + + connection = pika.BlockingConnection(pika.URLParameters('amqp://localhost')) + channel = connection.channel() + channel.queue_declare(queue='hello') + + pika_instrumentation = PikaInstrumentor() + pika_instrumentation.instrument_channel(channel=channel) + + + channel.basic_publish(exchange='', routing_key='hello', body=b'Hello World!') + + pika_instrumentation.uninstrument_channel(channel=channel) + +* PikaInstrumentor also supports instrumentation without creating an object, and receiving a tracer_provider + +.. code-block:: python + + PikaInstrumentor.instrument_channel(channel, tracer_provider=tracer_provider) + +References +---------- + +* `OpenTelemetry pika/ Tracing `_ +* `OpenTelemetry Project `_ diff --git a/instrumentation/opentelemetry-instrumentation-pika/setup.cfg b/instrumentation/opentelemetry-instrumentation-pika/setup.cfg new file mode 100644 index 0000000000..18a9101433 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-pika/setup.cfg @@ -0,0 +1,56 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +[metadata] +name = opentelemetry-instrumentation-pika +description = OpenTelemetry pika instrumentation +long_description = file: README.rst +long_description_content_type = text/x-rst +author = OpenTelemetry Authors +author_email = cncf-opentelemetry-contributors@lists.cncf.io +url = https://github.com/open-telemetry/opentelemetry-python-contrib/instrumentation/opentelemetry-instrumentation-pika +platforms = any +license = Apache-2.0 +classifiers = + Development Status :: 4 - Beta + Intended Audience :: Developers + License :: OSI Approved :: Apache Software License + Programming Language :: Python + Programming Language :: Python :: 3 + Programming Language :: Python :: 3.6 + Programming Language :: Python :: 3.7 + Programming Language :: Python :: 3.8 + +[options] +python_requires = >=3.6 +package_dir= + =src +packages=find_namespace: + +install_requires = + opentelemetry-api ~= 1.5 + wrapt >= 1.0.0, < 2.0.0 + +[options.extras_require] +test = + pytest + wrapt >= 1.0.0, < 2.0.0 + opentelemetry-test == 0.24b0 + +[options.packages.find] +where = src + +[options.entry_points] +opentelemetry_instrumentor = + pika = opentelemetry.instrumentation.pika:PikaInstrumentor diff --git a/instrumentation/opentelemetry-instrumentation-pika/setup.py b/instrumentation/opentelemetry-instrumentation-pika/setup.py new file mode 100644 index 0000000000..ac600392c1 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-pika/setup.py @@ -0,0 +1,89 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# DO NOT EDIT. THIS FILE WAS AUTOGENERATED FROM templates/instrumentation_setup.py.txt. +# RUN `python scripts/generate_setup.py` TO REGENERATE. + + +import distutils.cmd +import json +import os +from configparser import ConfigParser + +import setuptools + +config = ConfigParser() +config.read("setup.cfg") + +# We provide extras_require parameter to setuptools.setup later which +# overwrites the extra_require section from setup.cfg. To support extra_require +# secion in setup.cfg, we load it here and merge it with the extra_require param. +extras_require = {} +if "options.extras_require" in config: + for key, value in config["options.extras_require"].items(): + extras_require[key] = [v for v in value.split("\n") if v.strip()] + +BASE_DIR = os.path.dirname(__file__) +PACKAGE_INFO = {} + +VERSION_FILENAME = os.path.join( + BASE_DIR, "src", "opentelemetry", "instrumentation", "pika", "version.py" +) +with open(VERSION_FILENAME, encoding="utf-8") as f: + exec(f.read(), PACKAGE_INFO) + +PACKAGE_FILENAME = os.path.join( + BASE_DIR, "src", "opentelemetry", "instrumentation", "pika", "package.py" +) +with open(PACKAGE_FILENAME, encoding="utf-8") as f: + exec(f.read(), PACKAGE_INFO) + +# Mark any instruments/runtime dependencies as test dependencies as well. +extras_require["instruments"] = PACKAGE_INFO["_instruments"] +test_deps = extras_require.get("test", []) +for dep in extras_require["instruments"]: + test_deps.append(dep) + +extras_require["test"] = test_deps + + +class JSONMetadataCommand(distutils.cmd.Command): + + description = ( + "print out package metadata as JSON. This is used by OpenTelemetry dev scripts to ", + "auto-generate code in other places", + ) + user_options = [] + + def initialize_options(self): + pass + + def finalize_options(self): + pass + + def run(self): + metadata = { + "name": config["metadata"]["name"], + "version": PACKAGE_INFO["__version__"], + "instruments": PACKAGE_INFO["_instruments"], + } + print(json.dumps(metadata)) + + +setuptools.setup( + cmdclass={"meta": JSONMetadataCommand}, + version=PACKAGE_INFO["__version__"], + extras_require=extras_require, +) diff --git a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/__init__.py b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/__init__.py new file mode 100644 index 0000000000..55ff695820 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/__init__.py @@ -0,0 +1,73 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Instrument `pika` to trace RabbitMQ applications. + +Usage +----- + +* Start broker backend + +.. code-block:: python + + docker run -p 5672:5672 rabbitmq + +* Run instrumented task + +.. code-block:: python + + import pika + from opentelemetry.instrumentation.pika import PikaInstrumentor + + PikaInstrumentor().instrument() + + connection = pika.BlockingConnection(pika.URLParameters('amqp://localhost')) + channel = connection.channel() + channel.queue_declare(queue='hello') + channel.basic_publish(exchange='', routing_key='hello', body=b'Hello World!') + +* PikaInstrumentor also supports instrumentation of a single channel + +.. code-block:: python + + import pika + from opentelemetry.instrumentation.pika import PikaInstrumentor + + connection = pika.BlockingConnection(pika.URLParameters('amqp://localhost')) + channel = connection.channel() + channel.queue_declare(queue='hello') + + pika_instrumentation = PikaInstrumentor() + pika_instrumentation.instrument_channel(channel=channel) + + + channel.basic_publish(exchange='', routing_key='hello', body=b'Hello World!') + + pika_instrumentation.uninstrument_channel(channel=channel) + +* PikaInstrumentor also supports instrumentation without creating an object, and receiving a tracer_provider + +.. code-block:: python + + PikaInstrumentor.instrument_channel(channel, tracer_provider=tracer_provider) + +API +--- +""" +# pylint: disable=import-error + +from .pika_instrumentor import PikaInstrumentor +from .version import __version__ + +__all__ = ["PikaInstrumentor", "__version__"] diff --git a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/package.py b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/package.py new file mode 100644 index 0000000000..27ceebbac7 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/package.py @@ -0,0 +1,16 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from typing import Collection + +_instruments: Collection[str] = ("pika >= 1.1.0",) diff --git a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py new file mode 100644 index 0000000000..a48e46034e --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py @@ -0,0 +1,134 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from logging import getLogger +from typing import Any, Callable, Collection, Dict, Optional + +import wrapt +from pika.adapters import BlockingConnection +from pika.channel import Channel + +from opentelemetry import trace +from opentelemetry.instrumentation.instrumentor import BaseInstrumentor +from opentelemetry.instrumentation.pika import utils +from opentelemetry.instrumentation.pika.package import _instruments +from opentelemetry.instrumentation.pika.version import __version__ +from opentelemetry.instrumentation.utils import unwrap +from opentelemetry.trace import Tracer, TracerProvider + +_LOG = getLogger(__name__) +_CTX_KEY = "__otel_task_span" + +_FUNCTIONS_TO_UNINSTRUMENT = ["basic_publish"] + + +class PikaInstrumentor(BaseInstrumentor): # type: ignore + # pylint: disable=attribute-defined-outside-init + @staticmethod + def _instrument_consumers( + consumers_dict: Dict[str, Callable[..., Any]], tracer: Tracer + ) -> Any: + for key, callback in consumers_dict.items(): + decorated_callback = utils._decorate_callback( + callback, tracer, key + ) + setattr(decorated_callback, "_original_callback", callback) + consumers_dict[key] = decorated_callback + + @staticmethod + def _instrument_basic_publish(channel: Channel, tracer: Tracer) -> None: + original_function = getattr(channel, "basic_publish") + decorated_function = utils._decorate_basic_publish( + original_function, channel, tracer + ) + setattr(decorated_function, "_original_function", original_function) + channel.__setattr__("basic_publish", decorated_function) + channel.basic_publish = decorated_function + + @staticmethod + def _instrument_channel_functions( + channel: Channel, tracer: Tracer + ) -> None: + if hasattr(channel, "basic_publish"): + PikaInstrumentor._instrument_basic_publish(channel, tracer) + + @staticmethod + def _uninstrument_channel_functions(channel: Channel) -> None: + for function_name in _FUNCTIONS_TO_UNINSTRUMENT: + if not hasattr(channel, function_name): + continue + function = getattr(channel, function_name) + if hasattr(function, "_original_function"): + channel.__setattr__(function_name, function._original_function) + + @staticmethod + def instrument_channel( + channel: Channel, tracer_provider: Optional[TracerProvider] = None, + ) -> None: + if not hasattr(channel, "_is_instrumented_by_opentelemetry"): + channel._is_instrumented_by_opentelemetry = False + if channel._is_instrumented_by_opentelemetry: + _LOG.warning( + "Attempting to instrument Pika channel while already instrumented!" + ) + return + tracer = trace.get_tracer(__name__, __version__, tracer_provider) + if not hasattr(channel, "_impl"): + _LOG.error("Could not find implementation for provided channel!") + return + if channel._impl._consumers: + PikaInstrumentor._instrument_consumers( + channel._impl._consumers, tracer + ) + PikaInstrumentor._instrument_channel_functions(channel, tracer) + + @staticmethod + def uninstrument_channel(channel: Channel) -> None: + if ( + not hasattr(channel, "_is_instrumented_by_opentelemetry") + or not channel._is_instrumented_by_opentelemetry + ): + _LOG.error( + "Attempting to uninstrument Pika channel while already uninstrumented!" + ) + return + if not hasattr(channel, "_impl"): + _LOG.error("Could not find implementation for provided channel!") + return + for key, callback in channel._impl._consumers.items(): + if hasattr(callback, "_original_callback"): + channel._impl._consumers[key] = callback._original_callback + PikaInstrumentor._uninstrument_channel_functions(channel) + + def _decorate_channel_function( + self, tracer_provider: Optional[TracerProvider] + ) -> None: + def wrapper(wrapped, instance, args, kwargs): + channel = wrapped(*args, **kwargs) + self.instrument_channel(channel, tracer_provider=tracer_provider) + return channel + + wrapt.wrap_function_wrapper(BlockingConnection, "channel", wrapper) + + def _instrument(self, **kwargs: Dict[str, Any]) -> None: + tracer_provider: TracerProvider = kwargs.get("tracer_provider", None) + self.__setattr__("__opentelemetry_tracer_provider", tracer_provider) + self._decorate_channel_function(tracer_provider) + + def _uninstrument(self, **kwargs: Dict[str, Any]) -> None: + if hasattr(self, "__opentelemetry_tracer_provider"): + delattr(self, "__opentelemetry_tracer_provider") + unwrap(BlockingConnection, "channel") + + def instrumentation_dependencies(self) -> Collection[str]: + return _instruments diff --git a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py new file mode 100644 index 0000000000..7ad1440572 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py @@ -0,0 +1,160 @@ +from typing import Any, Callable, List, Optional + +from pika.channel import Channel +from pika.spec import Basic, BasicProperties + +from opentelemetry import context, propagate, trace +from opentelemetry.instrumentation.utils import _SUPPRESS_INSTRUMENTATION_KEY +from opentelemetry.propagators.textmap import CarrierT, Getter +from opentelemetry.semconv.trace import ( + MessagingOperationValues, + SpanAttributes, +) +from opentelemetry.trace import Tracer +from opentelemetry.trace.span import Span + + +class _PikaGetter(Getter): # type: ignore + def get(self, carrier: CarrierT, key: str) -> Optional[List[str]]: + value = carrier.get(key, None) + if value is None: + return None + return [value] + + def keys(self, carrier: CarrierT) -> List[str]: + return [] + + +_pika_getter = _PikaGetter() + + +def _decorate_callback( + callback: Callable[[Channel, Basic.Deliver, BasicProperties, bytes], Any], + tracer: Tracer, + task_name: str, +): + def decorated_callback( + channel: Channel, + method: Basic.Deliver, + properties: BasicProperties, + body: bytes, + ) -> Any: + if not properties: + properties = BasicProperties() + span = _get_span( + tracer, + channel, + properties, + task_name=task_name, + operation=MessagingOperationValues.RECEIVE, + ) + with trace.use_span(span, end_on_exit=True): + propagate.inject(properties.headers) + retval = callback(channel, method, properties, body) + return retval + + return decorated_callback + + +def _decorate_basic_publish( + original_function: Callable[[str, str, bytes, BasicProperties, bool], Any], + channel: Channel, + tracer: Tracer, +): + def decorated_function( + exchange: str, + routing_key: str, + body: bytes, + properties: BasicProperties = None, + mandatory: bool = False, + ) -> Any: + if not properties: + properties = BasicProperties() + span = _get_span( + tracer, + channel, + properties, + task_name="(temporary)", + operation=None, + ) + if not span: + return original_function( + exchange, routing_key, body, properties, mandatory + ) + with trace.use_span(span, end_on_exit=True): + if span.is_recording(): + propagate.inject(properties.headers) + retval = original_function( + exchange, routing_key, body, properties, mandatory + ) + return retval + + return decorated_function + + +def _get_span( + tracer: Tracer, + channel: Channel, + properties: BasicProperties, + task_name: str, + operation: Optional[MessagingOperationValues] = None, +) -> Optional[Span]: + if properties.headers is None: + properties.headers = {} + ctx = propagate.extract(properties.headers, getter=_pika_getter) + if context.get_value("suppress_instrumentation") or context.get_value( + _SUPPRESS_INSTRUMENTATION_KEY + ): + return None + task_name = properties.type if properties.type else task_name + span = tracer.start_span( + context=ctx, name=_generate_span_name(task_name, operation) + ) + if span.is_recording(): + _enrich_span(span, channel, properties, task_name, operation) + return span + + +def _generate_span_name( + task_name: str, operation: Optional[MessagingOperationValues] +) -> str: + if not operation: + return f"{task_name} send" + return f"{task_name} {operation.value}" + + +def _enrich_span( + span: Span, + channel: Channel, + properties: BasicProperties, + task_destination: str, + operation: Optional[MessagingOperationValues] = None, +) -> None: + span.set_attribute(SpanAttributes.MESSAGING_SYSTEM, "rabbitmq") + if operation: + span.set_attribute(SpanAttributes.MESSAGING_OPERATION, operation.value) + else: + span.set_attribute(SpanAttributes.MESSAGING_TEMP_DESTINATION, True) + span.set_attribute(SpanAttributes.MESSAGING_DESTINATION, task_destination) + if properties.message_id: + span.set_attribute( + SpanAttributes.MESSAGING_MESSAGE_ID, properties.message_id + ) + if properties.correlation_id: + span.set_attribute( + SpanAttributes.MESSAGING_CONVERSATION_ID, properties.correlation_id + ) + if not hasattr(channel.connection, "params"): + span.set_attribute( + SpanAttributes.NET_PEER_NAME, channel.connection._impl.params.host + ) + span.set_attribute( + SpanAttributes.NET_PEER_PORT, channel.connection._impl.params.port + ) + else: + span.set_attribute( + SpanAttributes.NET_PEER_NAME, channel.connection.params.host + ) + span.set_attribute( + SpanAttributes.NET_PEER_PORT, channel.connection.params.port + ) diff --git a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/version.py b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/version.py new file mode 100644 index 0000000000..d33bd87ce4 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/version.py @@ -0,0 +1,15 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +__version__ = "0.24b0" diff --git a/instrumentation/opentelemetry-instrumentation-pika/tests/__init__.py b/instrumentation/opentelemetry-instrumentation-pika/tests/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/instrumentation/opentelemetry-instrumentation-pika/tests/test_getter.py b/instrumentation/opentelemetry-instrumentation-pika/tests/test_getter.py new file mode 100644 index 0000000000..db74aa3f51 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-pika/tests/test_getter.py @@ -0,0 +1,37 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from unittest import TestCase + +from opentelemetry.instrumentation.pika.utils import _PikaGetter + + +class TestPikaGetter(TestCase): + def setUp(self) -> None: + self.getter = _PikaGetter() + + def test_get_none(self) -> None: + carrier = {} + value = self.getter.get(carrier, "test") + self.assertIsNone(value) + + def test_get_value(self) -> None: + key = "test" + value = "value" + carrier = {key: value} + val = self.getter.get(carrier, key) + self.assertEqual(val, [value]) + + def test_keys(self): + keys = self.getter.keys({}) + self.assertEqual(keys, []) diff --git a/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py b/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py new file mode 100644 index 0000000000..508d49c3bd --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py @@ -0,0 +1,104 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from unittest import TestCase, mock + +from pika.adapters import BaseConnection, BlockingConnection +from pika.channel import Channel +from wrapt import BoundFunctionWrapper + +from opentelemetry.instrumentation.pika import PikaInstrumentor +from opentelemetry.trace import Tracer + + +class TestPika(TestCase): + def setUp(self) -> None: + self.channel = mock.MagicMock(spec=Channel) + self.channel._impl = mock.MagicMock(spec=BaseConnection) + self.mock_callback = mock.MagicMock() + self.channel._impl._consumers = {"mock_key": self.mock_callback} + + def test_instrument_api(self) -> None: + instrumentation = PikaInstrumentor() + instrumentation.instrument() + self.assertTrue( + isinstance(BlockingConnection.channel, BoundFunctionWrapper) + ) + assert hasattr( + instrumentation, "__opentelemetry_tracer_provider" + ), "Tracer not stored for the object!" + instrumentation.uninstrument(channel=self.channel) + self.assertFalse( + isinstance(BlockingConnection.channel, BoundFunctionWrapper) + ) + + @mock.patch( + "opentelemetry.instrumentation.pika.PikaInstrumentor._instrument_channel_functions" + ) + @mock.patch( + "opentelemetry.instrumentation.pika.PikaInstrumentor._instrument_consumers" + ) + def test_instrument( + self, + instrument_consumers: mock.MagicMock, + instrument_channel_functions: mock.MagicMock, + ): + PikaInstrumentor.instrument_channel(channel=self.channel) + assert hasattr( + self.channel, "_is_instrumented_by_opentelemetry" + ), "channel is not marked as instrumented!" + instrument_consumers.assert_called_once() + instrument_channel_functions.assert_called_once() + + @mock.patch("opentelemetry.instrumentation.pika.utils._decorate_callback") + def test_instrument_consumers( + self, decorate_callback: mock.MagicMock + ) -> None: + tracer = mock.MagicMock(spec=Tracer) + expected_decoration_calls = [ + mock.call(value, tracer, key) + for key, value in self.channel._impl._consumers.items() + ] + PikaInstrumentor._instrument_consumers( + self.channel._impl._consumers, tracer + ) + decorate_callback.assert_has_calls( + calls=expected_decoration_calls, any_order=True + ) + assert all( + hasattr(callback, "_original_callback") + for callback in self.channel._impl._consumers.values() + ) + + @mock.patch( + "opentelemetry.instrumentation.pika.utils._decorate_basic_publish" + ) + def test_instrument_basic_publish( + self, decorate_basic_publish: mock.MagicMock + ) -> None: + tracer = mock.MagicMock(spec=Tracer) + original_function = self.channel.basic_publish + PikaInstrumentor._instrument_basic_publish(self.channel, tracer) + decorate_basic_publish.assert_called_once_with( + original_function, self.channel, tracer + ) + self.assertEqual( + self.channel.basic_publish, decorate_basic_publish.return_value + ) + + def test_uninstrument_channel_functions(self) -> None: + original_function = self.channel.basic_publish + self.channel.basic_publish = mock.MagicMock() + self.channel.basic_publish._original_function = original_function + PikaInstrumentor._uninstrument_channel_functions(self.channel) + self.assertEqual(self.channel.basic_publish, original_function) diff --git a/instrumentation/opentelemetry-instrumentation-pika/tests/test_utils.py b/instrumentation/opentelemetry-instrumentation-pika/tests/test_utils.py new file mode 100644 index 0000000000..d8ce6d5364 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-pika/tests/test_utils.py @@ -0,0 +1,163 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from unittest import TestCase, mock + +from opentelemetry.instrumentation.pika import utils +from opentelemetry.semconv.trace import SpanAttributes +from opentelemetry.trace import Span, Tracer + + +class TestUtils(TestCase): + @staticmethod + @mock.patch("opentelemetry.context.get_value") + @mock.patch("opentelemetry.instrumentation.pika.utils._generate_span_name") + @mock.patch("opentelemetry.instrumentation.pika.utils._enrich_span") + @mock.patch("opentelemetry.propagate.extract") + def test_get_span( + extract: mock.MagicMock, + enrich_span: mock.MagicMock, + generate_span_name: mock.MagicMock, + get_value: mock.MagicMock, + ) -> None: + tracer = mock.MagicMock(spec=Tracer) + channel = mock.MagicMock() + properties = mock.MagicMock() + task_name = "test.test" + get_value.return_value = None + _ = utils._get_span(tracer, channel, properties, task_name) + extract.assert_called_once() + generate_span_name.assert_called_once() + tracer.start_span.assert_called_once_with( + context=extract.return_value, name=generate_span_name.return_value + ) + enrich_span.assert_called_once() + + @mock.patch("opentelemetry.context.get_value") + @mock.patch("opentelemetry.instrumentation.pika.utils._generate_span_name") + @mock.patch("opentelemetry.instrumentation.pika.utils._enrich_span") + @mock.patch("opentelemetry.propagate.extract") + def test_get_span_suppressed( + self, + extract: mock.MagicMock, + enrich_span: mock.MagicMock, + generate_span_name: mock.MagicMock, + get_value: mock.MagicMock, + ) -> None: + tracer = mock.MagicMock(spec=Tracer) + channel = mock.MagicMock() + properties = mock.MagicMock() + task_name = "test.test" + get_value.return_value = True + span = utils._get_span(tracer, channel, properties, task_name) + self.assertEqual(span, None) + extract.assert_called_once() + generate_span_name.assert_not_called() + + def test_generate_span_name_no_operation(self) -> None: + task_name = "test.test" + operation = None + span_name = utils._generate_span_name(task_name, operation) + self.assertEqual(span_name, f"{task_name} send") + + def test_generate_span_name_with_operation(self) -> None: + task_name = "test.test" + operation = mock.MagicMock() + operation.value = "process" + span_name = utils._generate_span_name(task_name, operation) + self.assertEqual(span_name, f"{task_name} {operation.value}") + + @staticmethod + def test_enrich_span_basic_values() -> None: + channel = mock.MagicMock() + properties = mock.MagicMock() + task_destination = "test.test" + span = mock.MagicMock(spec=Span) + utils._enrich_span(span, channel, properties, task_destination) + span.set_attribute.assert_has_calls( + any_order=True, + calls=[ + mock.call(SpanAttributes.MESSAGING_SYSTEM, "rabbitmq"), + mock.call(SpanAttributes.MESSAGING_TEMP_DESTINATION, True), + mock.call( + SpanAttributes.MESSAGING_DESTINATION, task_destination + ), + mock.call( + SpanAttributes.MESSAGING_MESSAGE_ID, properties.message_id + ), + mock.call( + SpanAttributes.MESSAGING_CONVERSATION_ID, + properties.correlation_id, + ), + mock.call( + SpanAttributes.NET_PEER_NAME, + channel.connection.params.host, + ), + mock.call( + SpanAttributes.NET_PEER_PORT, + channel.connection.params.port, + ), + ], + ) + + @staticmethod + def test_enrich_span_with_operation() -> None: + channel = mock.MagicMock() + properties = mock.MagicMock() + task_destination = "test.test" + operation = mock.MagicMock() + span = mock.MagicMock(spec=Span) + utils._enrich_span( + span, channel, properties, task_destination, operation + ) + span.set_attribute.assert_has_calls( + any_order=True, + calls=[ + mock.call(SpanAttributes.MESSAGING_OPERATION, operation.value) + ], + ) + + @staticmethod + def test_enrich_span_without_operation() -> None: + channel = mock.MagicMock() + properties = mock.MagicMock() + task_destination = "test.test" + span = mock.MagicMock(spec=Span) + utils._enrich_span(span, channel, properties, task_destination) + span.set_attribute.assert_has_calls( + any_order=True, + calls=[mock.call(SpanAttributes.MESSAGING_TEMP_DESTINATION, True)], + ) + + @staticmethod + def test_enrich_span_unique_connection() -> None: + channel = mock.MagicMock() + properties = mock.MagicMock() + task_destination = "test.test" + span = mock.MagicMock(spec=Span) + # We do this to create the behaviour of hasattr(channel.connection, "params") == False + del channel.connection.params + utils._enrich_span(span, channel, properties, task_destination) + span.set_attribute.assert_has_calls( + any_order=True, + calls=[ + mock.call( + SpanAttributes.NET_PEER_NAME, + channel.connection._impl.params.host, + ), + mock.call( + SpanAttributes.NET_PEER_PORT, + channel.connection._impl.params.port, + ), + ], + ) diff --git a/tox.ini b/tox.ini index 645ed88e6e..d681345fb6 100644 --- a/tox.ini +++ b/tox.ini @@ -153,6 +153,10 @@ envlist = py3{6,7,8,9}-test-propagator-ot-trace pypy3-test-propagator-ot-trace + ; opentelemetry-instrumentation-pika + py3{6,7,8,9}-test-instrumentation-pika + pypy3-test-instrumentation-pika + lint docker-tests docs @@ -210,6 +214,7 @@ changedir = test-instrumentation-jinja2: instrumentation/opentelemetry-instrumentation-jinja2/tests test-instrumentation-logging: instrumentation/opentelemetry-instrumentation-logging/tests test-instrumentation-mysql: instrumentation/opentelemetry-instrumentation-mysql/tests + test-instrumentation-pika: instrumentation/opentelemetry-instrumentation-pika/tests test-instrumentation-psycopg2: instrumentation/opentelemetry-instrumentation-psycopg2/tests test-instrumentation-pymemcache: instrumentation/opentelemetry-instrumentation-pymemcache/tests test-instrumentation-pymongo: instrumentation/opentelemetry-instrumentation-pymongo/tests @@ -242,6 +247,8 @@ commands_pre = celery: pip install {toxinidir}/instrumentation/opentelemetry-instrumentation-celery[test] + pika: pip install {toxinidir}/instrumentation/opentelemetry-instrumentation-pika[test] + grpc: pip install {toxinidir}/instrumentation/opentelemetry-instrumentation-grpc[test] falcon{2,3},flask,django,pyramid,tornado,starlette,fastapi,aiohttp,asgi,requests,urllib,urllib3,wsgi: pip install {toxinidir}/util/opentelemetry-util-http[test] @@ -372,6 +379,7 @@ commands_pre = python -m pip install -e {toxinidir}/instrumentation/opentelemetry-instrumentation-flask[test] python -m pip install -e {toxinidir}/instrumentation/opentelemetry-instrumentation-sqlalchemy[test] python -m pip install -e {toxinidir}/instrumentation/opentelemetry-instrumentation-celery[test] + python -m pip install -e {toxinidir}/instrumentation/opentelemetry-instrumentation-pika[test] python -m pip install -e {toxinidir}/instrumentation/opentelemetry-instrumentation-sklearn[test] python -m pip install -e {toxinidir}/instrumentation/opentelemetry-instrumentation-redis[test] python -m pip install -e {toxinidir}/instrumentation/opentelemetry-instrumentation-fastapi[test] @@ -430,6 +438,7 @@ commands_pre = "{env:CORE_REPO}#egg=opentelemetry-test&subdirectory=tests/util" \ -e {toxinidir}/instrumentation/opentelemetry-instrumentation-asyncpg \ -e {toxinidir}/instrumentation/opentelemetry-instrumentation-celery \ + -e {toxinidir}/instrumentation/opentelemetry-instrumentation-pika \ -e {toxinidir}/instrumentation/opentelemetry-instrumentation-dbapi \ -e {toxinidir}/instrumentation/opentelemetry-instrumentation-mysql \ -e {toxinidir}/instrumentation/opentelemetry-instrumentation-psycopg2 \