Skip to content

Commit

Permalink
Add all needed spans, and add support of instrumentation and uninstru…
Browse files Browse the repository at this point in the history
…mentation
  • Loading branch information
oxeye-nikolay committed Sep 13, 2021
1 parent 6acb1c5 commit b561657
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 50 deletions.
Original file line number Diff line number Diff line change
@@ -1,2 +1,57 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Instrument `pika` to trace RabbitMQ applications.
Usage
-----
* Start broker backend
.. code-block:: python
docker run -p 5672:5672 rabbitmq
* Run instrumented task
.. code:: python
import pika
from opentelemetry.instrumentation.pika import PikaInstrumentation
connection = pika.BlockingConnection(pika.URLParameters('amqp://localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
pika_instrumentation = PikaInstrumentation()
pika_instrumentation.instrument(channel=channel)
channel.basic_publish(exchange='', routing_key='hello', body=b'Hello World!')
pika_instrumentation.uninstrument(channel=channel)
PikaInstrumentation also supports instrumentation without creating an object, and receiving a tracer_provider
.. code:: Python
PikaInstrumentation.instrument_channel(channel, tracer_provider=tracer_provider)
API
---
"""


from .version import __version__
from .pika_instrumentor import PikaInstrumentation
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Collection


_instruments = ("pika >= 1.1.0",)
_instruments: Collection[str] = ("pika >= 1.1.0",)
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import pika
from logging import getLogger
from opentelemetry import trace
from typing import Dict, Callable
from typing import Collection, Any
from pika.channel import Channel
from pika.adapters import BaseConnection
from typing import Dict, Callable, Optional, Collection, Any
from opentelemetry import trace
from opentelemetry.propagate import inject
from opentelemetry.instrumentation.pika import utils
from opentelemetry.trace import Tracer, TracerProvider
from opentelemetry.instrumentation.pika import __version__
from opentelemetry.semconv.trace import MessagingOperationValues
from opentelemetry.instrumentation.pika.package import _instruments
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
Expand All @@ -15,16 +16,18 @@
_LOG = getLogger(__name__)
CTX_KEY = "__otel_task_span"

FUNCTIONS_TO_UNINSTRUMENT = ["basic_publish"]

class PikaInstrumentation(BaseInstrumentor):

class PikaInstrumentation(BaseInstrumentor): # type: ignore
@staticmethod
def _instrument_consumers(
consumers_dict: Dict[str, Callable[..., Any]], tracer: Tracer
) -> Any:
for key, callback in consumers_dict.items():

def decorated_callback(
channel: pika.channel.Channel,
channel: Channel,
method: pika.spec.Basic.Deliver,
properties: pika.spec.BasicProperties,
body: bytes,
Expand All @@ -47,12 +50,16 @@ def decorated_callback(
consumers_dict[key] = decorated_callback

@staticmethod
def _instrument_publish(channel: Any, tracer: Tracer) -> None:
original_basic_publish = channel.basic_publish
def _instrument_basic_publish(channel: Channel, tracer: Tracer) -> None:
original_function = getattr(channel, "basic_publish")

def decorated_basic_publish(
exchange, routing_key, body, properties=None, mandatory=False
):
def decorated_function(
exchange: str,
routing_key: str,
body: bytes,
properties: pika.spec.BasicProperties = None,
mandatory: bool = False,
) -> Any:
if not properties:
properties = pika.spec.BasicProperties()
span = utils.get_span(
Expand All @@ -64,45 +71,69 @@ def decorated_basic_publish(
)
with trace.use_span(span, end_on_exit=True):
inject(properties.headers)
retval = original_basic_publish(
retval = original_function(
exchange, routing_key, body, properties, mandatory
)
return retval

decorated_basic_publish.__setattr__(
"_original_function", original_basic_publish
)
channel.basic_publish = decorated_basic_publish
decorated_function.__setattr__("_original_function", original_function)
channel.__setattr__("basic_publish", decorated_function)
channel.basic_publish = decorated_function

@staticmethod
def _instrument_channel_functions(
channel: Channel, tracer: Tracer
) -> None:
if hasattr(channel, "basic_publish"):
PikaInstrumentation._instrument_basic_publish(channel, tracer)

@staticmethod
def _uninstrument_channel_functions(channel: Channel) -> None:
for function_name in FUNCTIONS_TO_UNINSTRUMENT:
if not hasattr(channel, function_name):
continue
function = getattr(channel, function_name)
if hasattr(function, "_original_function"):
channel.__setattr__(function_name, function._original_function)

@staticmethod
def instrument_channel(
channel: Any, tracer_provider: TracerProvider
channel: Channel,
tracer_provider: Optional[TracerProvider] = None,
) -> None:
if not hasattr(channel, "_impl") or not isinstance(
channel._impl, pika.channel.Channel
channel._impl, Channel
):
_LOG.error("Could not find implementation for provided channel!")
return
tracer = trace.get_tracer(__name__, pika.__version__, tracer_provider)
tracer = trace.get_tracer(__name__, __version__, tracer_provider)
channel.__setattr__("__opentelemetry_tracer", tracer)
if channel._impl._consumers:
PikaInstrumentation._instrument_consumers(
channel._impl._consumers, tracer
)
PikaInstrumentation._instrument_publish(channel, tracer)
PikaInstrumentation._instrument_channel_functions(channel, tracer)

def _instrument(self, **kwargs: Dict[str, Any]) -> None:
channel: Channel = kwargs.get("channel", None)
if not channel or not isinstance(channel, Channel):
return
tracer_provider: TracerProvider = kwargs.get("tracer_provider", None)
PikaInstrumentation.instrument_channel(channel, tracer_provider=tracer_provider)

def _uninstrument(self, connection: Any, **kwargs: Dict[str, Any]) -> None:
if not hasattr(connection, "_impl") or not isinstance(
connection._impl, BaseConnection
def _uninstrument(self, **kwargs: Dict[str, Any]) -> None:
channel: Channel = kwargs.get("channel", None)
if not channel or not isinstance(channel, Channel):
return
if not hasattr(channel, "_impl") or not isinstance(
channel._impl, BaseConnection
):
_LOG.error("Could not find implementation for provided channel!")
return
for key, callback in connection._impl._consumers:
for key, callback in channel._impl._consumers:
if hasattr(callback, "_original_callback"):
connection._consumers[key] = callback._original_callback
if hasattr(connection.basic_publish, "_original_function"):
connection.basic_publish = (
connection.basic_publish._original_function
)
channel._consumers[key] = callback._original_callback
PikaInstrumentation._uninstrument_channel_functions(channel)

def instrumentation_dependencies(self) -> Collection[str]:
return _instruments
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
from typing import Optional
from pika.channel import Channel
from typing import Optional, List
from pika.spec import BasicProperties
from opentelemetry.trace import Tracer
from opentelemetry.trace.span import Span
from opentelemetry.propagate import extract
from opentelemetry.propagators.textmap import Getter
from opentelemetry.propagators.textmap import Getter, CarrierT
from opentelemetry.semconv.trace import (
SpanAttributes,
MessagingOperationValues,
)


class PikaGetter(Getter):
def get(self, carrier, key):
class PikaGetter(Getter): # type: ignore
def get(self, carrier: CarrierT, key: str) -> Optional[List[str]]:
value = carrier.get(key, None)
if value is None:
return None
return (value,)
return [value]

def keys(self, carrier):
def keys(self, carrier: CarrierT) -> List[str]:
return []


Expand All @@ -35,6 +35,7 @@ def get_span(
if properties.headers is None:
properties.headers = {}
ctx = extract(properties.headers, getter=pika_getter)
task_name = properties.type if properties.type else task_name
span = tracer.start_span(
context=ctx, name=generate_span_name(task_name, operation)
)
Expand All @@ -45,6 +46,8 @@ def get_span(
def generate_span_name(
task_name: str, operation: MessagingOperationValues
) -> str:
if not operation:
return f"{task_name} send"
return f"{task_name} {operation.value}"


Expand All @@ -61,18 +64,25 @@ def enrich_span(
else:
span.set_attribute(SpanAttributes.MESSAGING_TEMP_DESTINATION, True)
span.set_attribute(SpanAttributes.MESSAGING_DESTINATION, task_destination)
span.set_attribute(
SpanAttributes.MESSAGING_DESTINATION_KIND, properties.type
)
span.set_attribute(
SpanAttributes.MESSAGING_MESSAGE_ID, properties.message_id
)
span.set_attribute(
SpanAttributes.MESSAGING_CONVERSATION_ID, properties.correlation_id
)
span.set_attribute(
SpanAttributes.NET_PEER_NAME, channel.connection.params.host
)
span.set_attribute(
SpanAttributes.NET_PEER_PORT, channel.connection.params.port
)
if properties.message_id:
span.set_attribute(
SpanAttributes.MESSAGING_MESSAGE_ID, properties.message_id
)
if properties.correlation_id:
span.set_attribute(
SpanAttributes.MESSAGING_CONVERSATION_ID, properties.correlation_id
)
if not hasattr(channel.connection, "params"):
span.set_attribute(
SpanAttributes.NET_PEER_NAME, channel.connection._impl.params.host
)
span.set_attribute(
SpanAttributes.NET_PEER_PORT, channel.connection._impl.params.port
)
else:
span.set_attribute(
SpanAttributes.NET_PEER_NAME, channel.connection.params.host
)
span.set_attribute(
SpanAttributes.NET_PEER_PORT, channel.connection.params.port
)

0 comments on commit b561657

Please sign in to comment.