Skip to content

Commit

Permalink
Added tests. Ready for PR
Browse files Browse the repository at this point in the history
  • Loading branch information
oxeye-nikolay committed Sep 14, 2021
1 parent b561657 commit 315b438
Show file tree
Hide file tree
Showing 12 changed files with 459 additions and 92 deletions.
41 changes: 38 additions & 3 deletions instrumentation/opentelemetry-instrumentation-pika/README.rst
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
OpenTelemetry <REPLACE ME> Instrumentation
===========================
OpenTelemetry pika Instrumentation
==================================

|pypi|

.. |pypi| image:: https://badge.fury.io/py/opentelemetry-instrumentation-pika.svg
:target: https://pypi.org/project/opentelemetry-instrumentation-pika/

This library allows tracing requests made by the <REPLACE ME> library.
This library allows tracing requests made by the pika library.

Installation
------------
Expand All @@ -15,6 +15,41 @@ Installation

pip install opentelemetry-instrumentation-pika

Usage
-----

* Start broker backend

.. code-block:: python
docker run -p 5672:5672 rabbitmq
* Run instrumented task

.. code-block:: python
import pika
from opentelemetry.instrumentation.pika import PikaInstrumentation
connection = pika.BlockingConnection(pika.URLParameters('amqp://localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
pika_instrumentation = PikaInstrumentation()
pika_instrumentation.instrument(channel=channel)
channel.basic_publish(exchange='', routing_key='hello', body=b'Hello World!')
pika_instrumentation.uninstrument(channel=channel)
* PikaInstrumentation also supports instrumentation without creating an object, and receiving a tracer_provider

.. code-block:: python
PikaInstrumentation.instrument_channel(channel, tracer_provider=tracer_provider)
References
----------
Expand Down
2 changes: 2 additions & 0 deletions instrumentation/opentelemetry-instrumentation-pika/setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ install_requires =

[options.extras_require]
test =
pytest
opentelemetry-test == 0.24b0

[options.packages.find]
where = src
Expand Down
7 changes: 1 addition & 6 deletions instrumentation/opentelemetry-instrumentation-pika/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,7 @@

BASE_DIR = os.path.dirname(__file__)
VERSION_FILENAME = os.path.join(
BASE_DIR,
"src",
"opentelemetry",
"instrumentation",
"pika",
"version.py",
BASE_DIR, "src", "opentelemetry", "instrumentation", "pika", "version.py",
)
PACKAGE_INFO = {}
with open(VERSION_FILENAME) as f:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@
* Run instrumented task
.. code:: python
.. code-block:: python
import pika
from opentelemetry.instrumentation.pika import PikaInstrumentation
Expand All @@ -43,15 +44,16 @@
pika_instrumentation.uninstrument(channel=channel)
PikaInstrumentation also supports instrumentation without creating an object, and receiving a tracer_provider
* PikaInstrumentation also supports instrumentation without creating an object, and receiving a tracer_provider
.. code-block:: python
.. code:: Python
PikaInstrumentation.instrument_channel(channel, tracer_provider=tracer_provider)
API
---
"""


from .version import __version__
from .pika_instrumentor import PikaInstrumentation
from .version import __version__
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,4 @@
# limitations under the License.
from typing import Collection


_instruments: Collection[str] = ("pika >= 1.1.0",)
Original file line number Diff line number Diff line change
@@ -1,17 +1,27 @@
import pika
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from logging import getLogger
from typing import Any, Callable, Collection, Dict, Optional

from pika.channel import Channel
from pika.adapters import BaseConnection
from typing import Dict, Callable, Optional, Collection, Any

from opentelemetry import trace
from opentelemetry.propagate import inject
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.pika import utils
from opentelemetry.trace import Tracer, TracerProvider
from opentelemetry.instrumentation.pika import __version__
from opentelemetry.semconv.trace import MessagingOperationValues
from opentelemetry.instrumentation.pika.package import _instruments
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor

from opentelemetry.instrumentation.pika.version import __version__
from opentelemetry.trace import Tracer, TracerProvider

_LOG = getLogger(__name__)
CTX_KEY = "__otel_task_span"
Expand All @@ -25,58 +35,17 @@ def _instrument_consumers(
consumers_dict: Dict[str, Callable[..., Any]], tracer: Tracer
) -> Any:
for key, callback in consumers_dict.items():

def decorated_callback(
channel: Channel,
method: pika.spec.Basic.Deliver,
properties: pika.spec.BasicProperties,
body: bytes,
) -> Any:
if not properties:
properties = pika.spec.BasicProperties()
span = utils.get_span(
tracer,
channel,
properties,
task_name=key,
operation=MessagingOperationValues.RECEIVE,
)
with trace.use_span(span, end_on_exit=True):
inject(properties.headers)
retval = callback(channel, method, properties, body)
return retval

decorated_callback.__setattr__("_original_callback", callback)
decorated_callback = utils.decorate_callback(callback, tracer, key)
setattr(decorated_callback, "_original_callback", callback)
consumers_dict[key] = decorated_callback

@staticmethod
def _instrument_basic_publish(channel: Channel, tracer: Tracer) -> None:
original_function = getattr(channel, "basic_publish")

def decorated_function(
exchange: str,
routing_key: str,
body: bytes,
properties: pika.spec.BasicProperties = None,
mandatory: bool = False,
) -> Any:
if not properties:
properties = pika.spec.BasicProperties()
span = utils.get_span(
tracer,
channel,
properties,
task_name="(temporary)",
operation=None,
)
with trace.use_span(span, end_on_exit=True):
inject(properties.headers)
retval = original_function(
exchange, routing_key, body, properties, mandatory
)
return retval

decorated_function.__setattr__("_original_function", original_function)
decorated_function = utils.decorate_basic_publish(
original_function, channel, tracer
)
setattr(decorated_function, "_original_function", original_function)
channel.__setattr__("basic_publish", decorated_function)
channel.basic_publish = decorated_function

Expand All @@ -98,12 +67,9 @@ def _uninstrument_channel_functions(channel: Channel) -> None:

@staticmethod
def instrument_channel(
channel: Channel,
tracer_provider: Optional[TracerProvider] = None,
channel: Channel, tracer_provider: Optional[TracerProvider] = None,
) -> None:
if not hasattr(channel, "_impl") or not isinstance(
channel._impl, Channel
):
if not hasattr(channel, "_impl"):
_LOG.error("Could not find implementation for provided channel!")
return
tracer = trace.get_tracer(__name__, __version__, tracer_provider)
Expand All @@ -119,20 +85,20 @@ def _instrument(self, **kwargs: Dict[str, Any]) -> None:
if not channel or not isinstance(channel, Channel):
return
tracer_provider: TracerProvider = kwargs.get("tracer_provider", None)
PikaInstrumentation.instrument_channel(channel, tracer_provider=tracer_provider)
PikaInstrumentation.instrument_channel(
channel, tracer_provider=tracer_provider
)

def _uninstrument(self, **kwargs: Dict[str, Any]) -> None:
channel: Channel = kwargs.get("channel", None)
if not channel or not isinstance(channel, Channel):
return
if not hasattr(channel, "_impl") or not isinstance(
channel._impl, BaseConnection
):
if not hasattr(channel, "_impl"):
_LOG.error("Could not find implementation for provided channel!")
return
for key, callback in channel._impl._consumers:
for key, callback in channel._impl._consumers.items():
if hasattr(callback, "_original_callback"):
channel._consumers[key] = callback._original_callback
channel._impl._consumers[key] = callback._original_callback
PikaInstrumentation._uninstrument_channel_functions(channel)

def instrumentation_dependencies(self) -> Collection[str]:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
from typing import Any, Callable, List, Optional

from pika.channel import Channel
from typing import Optional, List
from pika.spec import BasicProperties
from opentelemetry.trace import Tracer
from opentelemetry.trace.span import Span
from opentelemetry.propagate import extract
from opentelemetry.propagators.textmap import Getter, CarrierT
from pika.spec import Basic, BasicProperties

from opentelemetry import propagate, trace
from opentelemetry.propagators.textmap import CarrierT, Getter
from opentelemetry.semconv.trace import (
SpanAttributes,
MessagingOperationValues,
SpanAttributes,
)
from opentelemetry.trace import Tracer
from opentelemetry.trace.span import Span


class PikaGetter(Getter): # type: ignore
Expand All @@ -25,16 +27,75 @@ def keys(self, carrier: CarrierT) -> List[str]:
pika_getter = PikaGetter()


def decorate_callback(
callback: Callable[[Channel, Basic.Deliver, BasicProperties, bytes], Any],
tracer: Tracer,
task_name: str,
):
def decorated_callback(
channel: Channel,
method: Basic.Deliver,
properties: BasicProperties,
body: bytes,
) -> Any:
if not properties:
properties = BasicProperties()
span = get_span(
tracer,
channel,
properties,
task_name=task_name,
operation=MessagingOperationValues.RECEIVE,
)
with trace.use_span(span, end_on_exit=True):
propagate.inject(properties.headers)
retval = callback(channel, method, properties, body)
return retval

return decorated_callback


def decorate_basic_publish(
original_function: Callable[[str, str, bytes, BasicProperties, bool], Any],
channel: Channel,
tracer: Tracer,
):
def decorated_function(
exchange: str,
routing_key: str,
body: bytes,
properties: BasicProperties = None,
mandatory: bool = False,
) -> Any:
if not properties:
properties = BasicProperties()
span = get_span(
tracer,
channel,
properties,
task_name="(temporary)",
operation=None,
)
with trace.use_span(span, end_on_exit=True):
propagate.inject(properties.headers)
retval = original_function(
exchange, routing_key, body, properties, mandatory
)
return retval

return decorated_function


def get_span(
tracer: Tracer,
channel: Channel,
properties: BasicProperties,
task_name: str,
operation: Optional[MessagingOperationValues],
operation: Optional[MessagingOperationValues] = None,
) -> Span:
if properties.headers is None:
properties.headers = {}
ctx = extract(properties.headers, getter=pika_getter)
ctx = propagate.extract(properties.headers, getter=pika_getter)
task_name = properties.type if properties.type else task_name
span = tracer.start_span(
context=ctx, name=generate_span_name(task_name, operation)
Expand All @@ -44,7 +105,7 @@ def get_span(


def generate_span_name(
task_name: str, operation: MessagingOperationValues
task_name: str, operation: Optional[MessagingOperationValues]
) -> str:
if not operation:
return f"{task_name} send"
Expand All @@ -56,7 +117,7 @@ def enrich_span(
channel: Channel,
properties: BasicProperties,
task_destination: str,
operation: Optional[MessagingOperationValues],
operation: Optional[MessagingOperationValues] = None,
) -> None:
span.set_attribute(SpanAttributes.MESSAGING_SYSTEM, "rabbitmq")
if operation:
Expand Down
Empty file.
Loading

0 comments on commit 315b438

Please sign in to comment.