Skip to content

Commit

Permalink
Add usage of wrapt according to CR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
oxeye-nikolay committed Sep 20, 2021
1 parent fedd768 commit e37b5fa
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 22 deletions.
20 changes: 16 additions & 4 deletions instrumentation/opentelemetry-instrumentation-pika/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,22 @@ Usage
docker run -p 5672:5672 rabbitmq
* Run instrumented task

.. code-block:: python
import pika
from opentelemetry.instrumentation.pika import PikaInstrumentor
PikaInstrumentor().instrument()
connection = pika.BlockingConnection(pika.URLParameters('amqp://localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='', routing_key='hello', body=b'Hello World!')
* PikaInstrumentor also supports instrumentation of a single channel

.. code-block:: python
import pika
Expand All @@ -37,13 +50,12 @@ Usage
channel.queue_declare(queue='hello')
pika_instrumentation = PikaInstrumentor()
pika_instrumentation.instrument(channel=channel)
pika_instrumentation.instrument_channel(channel=channel)
channel.basic_publish(exchange='', routing_key='hello', body=b'Hello World!')
pika_instrumentation.uninstrument(channel=channel)
pika_instrumentation.uninstrument_channel(channel=channel)
* PikaInstrumentor also supports instrumentation without creating an object, and receiving a tracer_provider

Expand Down
2 changes: 2 additions & 0 deletions instrumentation/opentelemetry-instrumentation-pika/setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,13 @@ packages=find_namespace:

install_requires =
opentelemetry-api ~= 1.5
wrapt >= 1.0.0, < 2.0.0
pika >= 1.1.0

[options.extras_require]
test =
pytest
wrapt >= 1.0.0, < 2.0.0
opentelemetry-test == 0.24b0

[options.packages.find]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,22 @@
docker run -p 5672:5672 rabbitmq
* Run instrumented task
.. code-block:: python
import pika
from opentelemetry.instrumentation.pika import PikaInstrumentor
PikaInstrumentor().instrument()
connection = pika.BlockingConnection(pika.URLParameters('amqp://localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='', routing_key='hello', body=b'Hello World!')
* PikaInstrumentor also supports instrumentation of a single channel
.. code-block:: python
import pika
Expand All @@ -36,13 +49,12 @@
channel.queue_declare(queue='hello')
pika_instrumentation = PikaInstrumentor()
pika_instrumentation.instrument(channel=channel)
pika_instrumentation.instrument_channel(channel=channel)
channel.basic_publish(exchange='', routing_key='hello', body=b'Hello World!')
pika_instrumentation.uninstrument(channel=channel)
pika_instrumentation.uninstrument_channel(channel=channel)
* PikaInstrumentor also supports instrumentation without creating an object, and receiving a tracer_provider
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from logging import getLogger
from typing import Any, Callable, Collection, Dict, Optional

import wrapt
from pika.adapters import BlockingConnection
from pika.channel import Channel

Expand All @@ -22,6 +23,7 @@
from opentelemetry.instrumentation.pika import utils
from opentelemetry.instrumentation.pika.package import _instruments
from opentelemetry.instrumentation.pika.version import __version__
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.trace import Tracer, TracerProvider

_LOG = getLogger(__name__)
Expand Down Expand Up @@ -93,25 +95,25 @@ def uninstrument_channel(channel: Channel) -> None:
if hasattr(callback, "_original_callback"):
channel._impl._consumers[key] = callback._original_callback
PikaInstrumentor._uninstrument_channel_functions(channel)
if hasattr(channel, "__opentelemetry_tracer"):
delattr(channel, "__opentelemetry_tracer")

def _decorate_channel_function(
self, tracer_provider: Optional[TracerProvider]
) -> None:
self.original_channel_func = BlockingConnection.channel

def _wrapper(*args, **kwargs):
channel = self.original_channel_func(*args, **kwargs)
def wrapper(wrapped, instance, args, kwargs):
channel = wrapped(*args, **kwargs)
self.instrument_channel(channel, tracer_provider=tracer_provider)
return channel

BlockingConnection.channel = _wrapper
wrapt.wrap_function_wrapper(BlockingConnection, "channel", wrapper)

def _instrument(self, **kwargs: Dict[str, Any]) -> None:
tracer_provider: TracerProvider = kwargs.get("tracer_provider", None)
self._decorate_channel_function(tracer_provider)

def _uninstrument(self, **kwargs: Dict[str, Any]) -> None:
BlockingConnection.channel = self.original_channel_func
unwrap(BlockingConnection, "channel")

def instrumentation_dependencies(self) -> Collection[str]:
return _instruments
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from pika.adapters import BaseConnection, BlockingConnection
from pika.channel import Channel
from wrapt import BoundFunctionWrapper

from opentelemetry.instrumentation.pika import PikaInstrumentor
from opentelemetry.trace import Tracer
Expand All @@ -28,15 +29,15 @@ def setUp(self) -> None:
self.channel._impl._consumers = {"mock_key": self.mock_callback}

def test_instrument_api(self) -> None:
original_channel = BlockingConnection.channel
instrumentation = PikaInstrumentor()
instrumentation.instrument()
self.assertTrue(hasattr(instrumentation, "original_channel_func"))
self.assertEqual(
original_channel, instrumentation.original_channel_func
self.assertTrue(
isinstance(BlockingConnection.channel, BoundFunctionWrapper)
)
instrumentation.uninstrument(channel=self.channel)
self.assertEqual(original_channel, BlockingConnection.channel)
self.assertFalse(
isinstance(BlockingConnection.channel, BoundFunctionWrapper)
)

@mock.patch(
"opentelemetry.instrumentation.pika.PikaInstrumentor._instrument_channel_functions"
Expand Down Expand Up @@ -91,9 +92,6 @@ def test_instrument_basic_publish(
self.assertEqual(
self.channel.basic_publish, decorate_basic_publish.return_value
)
self.assertEqual(
self.channel.basic_publish._original_function, original_function
)

def test_uninstrument_channel_functions(self) -> None:
original_function = self.channel.basic_publish
Expand Down

0 comments on commit e37b5fa

Please sign in to comment.