Skip to content

Commit

Permalink
Support older pika versions (#837)
Browse files Browse the repository at this point in the history
* feat: support older pika versions

* update tox.ini

* update changelog

* take version from pika

* avoid exception when property name changes

* add callback attr name test
  • Loading branch information
nozik authored Dec 25, 2021
1 parent c962da9 commit 26aa17f
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 14 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added

- `opentelemetry-instrumentation-aws-lambda` Adds support for configurable flush timeout via `OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_TIMEOUT` property. ([#825](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/825))
- `opentelemetry-instrumentation-pika` Adds support for versions between `0.12.0` to `1.0.0`. ([#837](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/837))

### Fixed

Expand Down
2 changes: 1 addition & 1 deletion instrumentation/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
| [opentelemetry-instrumentation-jinja2](./opentelemetry-instrumentation-jinja2) | jinja2 >= 2.7, < 4.0 |
| [opentelemetry-instrumentation-logging](./opentelemetry-instrumentation-logging) | logging |
| [opentelemetry-instrumentation-mysql](./opentelemetry-instrumentation-mysql) | mysql-connector-python ~= 8.0 |
| [opentelemetry-instrumentation-pika](./opentelemetry-instrumentation-pika) | pika >= 1.1.0 |
| [opentelemetry-instrumentation-pika](./opentelemetry-instrumentation-pika) | pika >= 0.12.0 |
| [opentelemetry-instrumentation-psycopg2](./opentelemetry-instrumentation-psycopg2) | psycopg2 >= 2.7.3.1 |
| [opentelemetry-instrumentation-pymemcache](./opentelemetry-instrumentation-pymemcache) | pymemcache ~= 1.3 |
| [opentelemetry-instrumentation-pymongo](./opentelemetry-instrumentation-pymongo) | pymongo ~= 3.1 |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@
# limitations under the License.
from typing import Collection

_instruments: Collection[str] = ("pika >= 1.1.0",)
_instruments: Collection[str] = ("pika >= 0.12.0",)
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
from logging import getLogger
from typing import Any, Collection, Dict, Optional

import pika
import wrapt
from packaging import version
from pika.adapters import BlockingConnection
from pika.adapters.blocking_connection import BlockingChannel

Expand All @@ -32,7 +34,18 @@
_FUNCTIONS_TO_UNINSTRUMENT = ["basic_publish"]


def _consumer_callback_attribute_name() -> str:
pika_version = version.parse(pika.__version__)
return (
"on_message_callback"
if pika_version >= version.parse("1.0.0")
else "consumer_cb"
)


class PikaInstrumentor(BaseInstrumentor): # type: ignore
CONSUMER_CALLBACK_ATTR = _consumer_callback_attribute_name()

# pylint: disable=attribute-defined-outside-init
@staticmethod
def _instrument_blocking_channel_consumers(
Expand All @@ -41,8 +54,12 @@ def _instrument_blocking_channel_consumers(
consume_hook: utils.HookT = utils.dummy_callback,
) -> Any:
for consumer_tag, consumer_info in channel._consumer_infos.items():
callback_attr = PikaInstrumentor.CONSUMER_CALLBACK_ATTR
consumer_callback = getattr(consumer_info, callback_attr, None)
if consumer_callback is None:
continue
decorated_callback = utils._decorate_callback(
consumer_info.on_message_callback,
consumer_callback,
tracer,
consumer_tag,
consume_hook,
Expand All @@ -51,9 +68,9 @@ def _instrument_blocking_channel_consumers(
setattr(
decorated_callback,
"_original_callback",
consumer_info.on_message_callback,
consumer_callback,
)
consumer_info.on_message_callback = decorated_callback
setattr(consumer_info, callback_attr, decorated_callback)

@staticmethod
def _instrument_basic_publish(
Expand Down Expand Up @@ -126,10 +143,12 @@ def uninstrument_channel(channel: BlockingChannel) -> None:
return

for consumers_tag, client_info in channel._consumer_infos.items():
if hasattr(client_info.on_message_callback, "_original_callback"):
callback_attr = PikaInstrumentor.CONSUMER_CALLBACK_ATTR
consumer_callback = getattr(client_info, callback_attr, None)
if hasattr(consumer_callback, "_original_callback"):
channel._consumer_infos[
consumers_tag
] = client_info.on_message_callback._original_callback
] = consumer_callback._original_callback
PikaInstrumentor._uninstrument_channel_functions(channel)

def _decorate_channel_function(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
from wrapt import BoundFunctionWrapper

from opentelemetry.instrumentation.pika import PikaInstrumentor
from opentelemetry.instrumentation.pika.pika_instrumentor import (
_consumer_callback_attribute_name,
)
from opentelemetry.instrumentation.pika.utils import dummy_callback
from opentelemetry.trace import Tracer

Expand All @@ -26,7 +29,8 @@ class TestPika(TestCase):
def setUp(self) -> None:
self.channel = mock.MagicMock(spec=Channel)
consumer_info = mock.MagicMock()
consumer_info.on_message_callback = mock.MagicMock()
callback_attr = PikaInstrumentor.CONSUMER_CALLBACK_ATTR
setattr(consumer_info, callback_attr, mock.MagicMock())
self.channel._consumer_infos = {"consumer-tag": consumer_info}
self.mock_callback = mock.MagicMock()

Expand Down Expand Up @@ -72,8 +76,11 @@ def test_instrument_consumers(
self, decorate_callback: mock.MagicMock
) -> None:
tracer = mock.MagicMock(spec=Tracer)
callback_attr = PikaInstrumentor.CONSUMER_CALLBACK_ATTR
expected_decoration_calls = [
mock.call(value.on_message_callback, tracer, key, dummy_callback)
mock.call(
getattr(value, callback_attr), tracer, key, dummy_callback
)
for key, value in self.channel._consumer_infos.items()
]
PikaInstrumentor._instrument_blocking_channel_consumers(
Expand Down Expand Up @@ -109,3 +116,13 @@ def test_uninstrument_channel_functions(self) -> None:
self.channel.basic_publish._original_function = original_function
PikaInstrumentor._uninstrument_channel_functions(self.channel)
self.assertEqual(self.channel.basic_publish, original_function)

def test_consumer_callback_attribute_name(self) -> None:
with mock.patch("pika.__version__", "1.0.0"):
self.assertEqual(
_consumer_callback_attribute_name(), "on_message_callback"
)
with mock.patch("pika.__version__", "0.12.0"):
self.assertEqual(
_consumer_callback_attribute_name(), "consumer_cb"
)
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@
"instrumentation": "opentelemetry-instrumentation-mysql==0.27b0",
},
"pika": {
"library": "pika >= 1.1.0",
"library": "pika >= 0.12.0",
"instrumentation": "opentelemetry-instrumentation-pika==0.27b0",
},
"psycopg2": {
Expand Down
10 changes: 6 additions & 4 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,8 @@ envlist =
pypy3-test-propagator-ot-trace

; opentelemetry-instrumentation-pika
py3{6,7,8,9,10}-test-instrumentation-pika
pypy3-test-instrumentation-pika
py3{6,7,8,9,10}-test-instrumentation-pika{0,1}
pypy3-test-instrumentation-pika{0,1}

lint
docker-tests
Expand Down Expand Up @@ -216,6 +216,8 @@ deps =
sqlalchemy11: sqlalchemy>=1.1,<1.2
sqlalchemy14: aiosqlite
sqlalchemy14: sqlalchemy~=1.4
pika0: pika>=0.12.0,<1.0.0
pika1: pika>=1.0.0

; FIXME: add coverage testing
; FIXME: add mypy testing
Expand Down Expand Up @@ -249,7 +251,7 @@ changedir =
test-instrumentation-jinja2: instrumentation/opentelemetry-instrumentation-jinja2/tests
test-instrumentation-logging: instrumentation/opentelemetry-instrumentation-logging/tests
test-instrumentation-mysql: instrumentation/opentelemetry-instrumentation-mysql/tests
test-instrumentation-pika: instrumentation/opentelemetry-instrumentation-pika/tests
test-instrumentation-pika{0,1}: instrumentation/opentelemetry-instrumentation-pika/tests
test-instrumentation-psycopg2: instrumentation/opentelemetry-instrumentation-psycopg2/tests
test-instrumentation-pymemcache: instrumentation/opentelemetry-instrumentation-pymemcache/tests
test-instrumentation-pymongo: instrumentation/opentelemetry-instrumentation-pymongo/tests
Expand Down Expand Up @@ -286,7 +288,7 @@ commands_pre =

celery: pip install {toxinidir}/instrumentation/opentelemetry-instrumentation-celery[test]

pika: pip install {toxinidir}/instrumentation/opentelemetry-instrumentation-pika[test]
pika{0,1}: pip install {toxinidir}/instrumentation/opentelemetry-instrumentation-pika[test]

grpc: pip install {toxinidir}/instrumentation/opentelemetry-instrumentation-grpc[test]

Expand Down

0 comments on commit 26aa17f

Please sign in to comment.