diff --git a/docs-requirements.txt b/docs-requirements.txt index 84f80b30e59..230c76149cc 100644 --- a/docs-requirements.txt +++ b/docs-requirements.txt @@ -4,6 +4,7 @@ sphinx-autodoc-typehints~=1.10.2 # Required by ext packages asgiref~=3.0 +asyncpg>=0.12.0 ddtrace>=0.34.0 aiohttp~= 3.0 Deprecated>=1.2.6 diff --git a/docs/ext/asyncpg/asyncpg.rst b/docs/ext/asyncpg/asyncpg.rst new file mode 100644 index 00000000000..3a4a9b3c4e9 --- /dev/null +++ b/docs/ext/asyncpg/asyncpg.rst @@ -0,0 +1,10 @@ +opentelemetry.ext.asyncpg package +================================= + +Module contents +--------------- + +.. automodule:: opentelemetry.ext.asyncpg + :members: + :undoc-members: + :show-inheritance: diff --git a/ext/opentelemetry-ext-asyncpg/CHANGELOG.md b/ext/opentelemetry-ext-asyncpg/CHANGELOG.md new file mode 100644 index 00000000000..f4390b09ab1 --- /dev/null +++ b/ext/opentelemetry-ext-asyncpg/CHANGELOG.md @@ -0,0 +1,5 @@ +# Changelog + +## Unreleased + +- Initial Release ([#814](https://github.com/open-telemetry/opentelemetry-python/pull/814)) diff --git a/ext/opentelemetry-ext-asyncpg/README.rst b/ext/opentelemetry-ext-asyncpg/README.rst new file mode 100644 index 00000000000..f852bfdbb27 --- /dev/null +++ b/ext/opentelemetry-ext-asyncpg/README.rst @@ -0,0 +1,23 @@ +OpenTelemetry asyncpg Integration +================================= + +|pypi| + +.. |pypi| image:: https://badge.fury.io/py/opentelemetry-ext-asyncpg.svg + :target: https://pypi.org/project/opentelemetry-ext-asyncpg/ + +This library allows tracing PostgreSQL queries made by the +`asyncpg `_ library. + +Installation +------------ + +:: + + pip install opentelemetry-ext-asyncpg + +References +---------- + +* `OpenTelemetry asyncpg Integration `_ +* `OpenTelemetry Project `_ diff --git a/ext/opentelemetry-ext-asyncpg/setup.cfg b/ext/opentelemetry-ext-asyncpg/setup.cfg new file mode 100644 index 00000000000..df00ed7db4c --- /dev/null +++ b/ext/opentelemetry-ext-asyncpg/setup.cfg @@ -0,0 +1,55 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +[metadata] +name = opentelemetry-ext-asyncpg +description = OpenTelemetry instrumentation for AsyncPG +long_description = file: README.rst +long_description_content_type = text/x-rst +author = OpenTelemetry Authors +author_email = cncf-opentelemetry-contributors@lists.cncf.io +url = https://github.com/open-telemetry/opentelemetry-python/ext/opentelemetry-ext-asyncpg +platforms = any +license = Apache-2.0 +classifiers = + Development Status :: 4 - Beta + Intended Audience :: Developers + License :: OSI Approved :: Apache Software License + Programming Language :: Python + Programming Language :: Python :: 3 + Programming Language :: Python :: 3.5 + Programming Language :: Python :: 3.6 + Programming Language :: Python :: 3.7 + Programming Language :: Python :: 3.8 + +[options] +python_requires = >=3.5 +package_dir= + =src +packages=find_namespace: +install_requires = + opentelemetry-api == 0.10.dev0 + opentelemetry-instrumentation == 0.10.dev0 + asyncpg >= 0.12.0 + +[options.extras_require] +test = + opentelemetry-test == 0.10.dev0 + +[options.packages.find] +where = src + +[options.entry_points] +opentelemetry_instrumentor = + asyncpg = opentelemetry.ext.asyncpg:AsyncPGInstrumentor diff --git a/ext/opentelemetry-ext-asyncpg/setup.py b/ext/opentelemetry-ext-asyncpg/setup.py new file mode 100644 index 00000000000..8172205c1a9 --- /dev/null +++ b/ext/opentelemetry-ext-asyncpg/setup.py @@ -0,0 +1,26 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import os + +import setuptools + +BASE_DIR = os.path.dirname(__file__) +VERSION_FILENAME = os.path.join( + BASE_DIR, "src", "opentelemetry", "ext", "asyncpg", "version.py" +) +PACKAGE_INFO = {} +with open(VERSION_FILENAME) as f: + exec(f.read(), PACKAGE_INFO) + +setuptools.setup(version=PACKAGE_INFO["__version__"]) diff --git a/ext/opentelemetry-ext-asyncpg/src/opentelemetry/ext/asyncpg/__init__.py b/ext/opentelemetry-ext-asyncpg/src/opentelemetry/ext/asyncpg/__init__.py new file mode 100644 index 00000000000..c373d7194dc --- /dev/null +++ b/ext/opentelemetry-ext-asyncpg/src/opentelemetry/ext/asyncpg/__init__.py @@ -0,0 +1,136 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +This library allows tracing PostgreSQL queries made by the +`asyncpg `_ library. + +Usage +----- + +.. code-block:: python + + import asyncpg + from opentelemetry.ext.asyncpg import AsyncPGInstrumentor + + # You can optionally pass a custom TracerProvider to AsyncPGInstrumentor.instrument() + AsyncPGInstrumentor().instrument() + conn = await asyncpg.connect(user='user', password='password', + database='database', host='127.0.0.1') + values = await conn.fetch('''SELECT 42;''') + +API +--- +""" + +import asyncpg +import wrapt +from asyncpg import exceptions + +from opentelemetry import trace +from opentelemetry.ext.asyncpg.version import __version__ +from opentelemetry.instrumentation.instrumentor import BaseInstrumentor +from opentelemetry.instrumentation.utils import unwrap +from opentelemetry.trace import SpanKind +from opentelemetry.trace.status import Status, StatusCanonicalCode + +_APPLIED = "_opentelemetry_tracer" + + +def _exception_to_canonical_code(exc: Exception) -> StatusCanonicalCode: + if isinstance( + exc, (exceptions.InterfaceError, exceptions.SyntaxOrAccessError), + ): + return StatusCanonicalCode.INVALID_ARGUMENT + if isinstance(exc, exceptions.IdleInTransactionSessionTimeoutError): + return StatusCanonicalCode.DEADLINE_EXCEEDED + return StatusCanonicalCode.UNKNOWN + + +def _hydrate_span_from_args(connection, query, parameters) -> dict: + span_attributes = {"db.type": "sql"} + + params = getattr(connection, "_params", None) + span_attributes["db.instance"] = getattr(params, "database", None) + span_attributes["db.user"] = getattr(params, "user", None) + + if query is not None: + span_attributes["db.statement"] = query + + if parameters is not None and len(parameters) > 0: + span_attributes["db.statement.parameters"] = str(parameters) + + return span_attributes + + +async def _do_execute(func, instance, args, kwargs): + span_attributes = _hydrate_span_from_args(instance, args[0], args[1:]) + tracer = getattr(asyncpg, _APPLIED) + + exception = None + + with tracer.start_as_current_span( + "postgresql", kind=SpanKind.CLIENT + ) as span: + + for attribute, value in span_attributes.items(): + span.set_attribute(attribute, value) + + try: + result = await func(*args, **kwargs) + except Exception as exc: # pylint: disable=W0703 + exception = exc + raise + finally: + if exception is not None: + span.set_status( + Status(_exception_to_canonical_code(exception)) + ) + else: + span.set_status(Status(StatusCanonicalCode.OK)) + + return result + + +class AsyncPGInstrumentor(BaseInstrumentor): + def _instrument(self, **kwargs): + tracer_provider = kwargs.get( + "tracer_provider", trace.get_tracer_provider() + ) + setattr( + asyncpg, + _APPLIED, + tracer_provider.get_tracer("asyncpg", __version__), + ) + for method in [ + "Connection.execute", + "Connection.executemany", + "Connection.fetch", + "Connection.fetchval", + "Connection.fetchrow", + ]: + wrapt.wrap_function_wrapper( + "asyncpg.connection", method, _do_execute + ) + + def _uninstrument(self, **__): + delattr(asyncpg, _APPLIED) + for method in [ + "execute", + "executemany", + "fetch", + "fetchval", + "fetchrow", + ]: + unwrap(asyncpg.Connection, method) diff --git a/ext/opentelemetry-ext-asyncpg/src/opentelemetry/ext/asyncpg/version.py b/ext/opentelemetry-ext-asyncpg/src/opentelemetry/ext/asyncpg/version.py new file mode 100644 index 00000000000..6d4fefa599e --- /dev/null +++ b/ext/opentelemetry-ext-asyncpg/src/opentelemetry/ext/asyncpg/version.py @@ -0,0 +1,15 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +__version__ = "0.10.dev0" diff --git a/ext/opentelemetry-ext-asyncpg/tests/__init__.py b/ext/opentelemetry-ext-asyncpg/tests/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/ext/opentelemetry-ext-asyncpg/tests/test_asyncpg_wrapper.py b/ext/opentelemetry-ext-asyncpg/tests/test_asyncpg_wrapper.py new file mode 100644 index 00000000000..cd0d8e35f09 --- /dev/null +++ b/ext/opentelemetry-ext-asyncpg/tests/test_asyncpg_wrapper.py @@ -0,0 +1,35 @@ +import asyncpg +from asyncpg import Connection + +from opentelemetry.ext.asyncpg import AsyncPGInstrumentor +from opentelemetry.test.test_base import TestBase + + +class TestAsyncPGInstrumentation(TestBase): + def test_instrumentation_flags(self): + AsyncPGInstrumentor().instrument() + self.assertTrue(hasattr(asyncpg, "_opentelemetry_tracer")) + AsyncPGInstrumentor().uninstrument() + self.assertFalse(hasattr(asyncpg, "_opentelemetry_tracer")) + + def test_duplicated_instrumentation(self): + AsyncPGInstrumentor().instrument() + AsyncPGInstrumentor().instrument() + AsyncPGInstrumentor().instrument() + AsyncPGInstrumentor().uninstrument() + for method_name in ["execute", "fetch"]: + method = getattr(Connection, method_name, None) + self.assertFalse( + hasattr(method, "_opentelemetry_ext_asyncpg_applied") + ) + + def test_duplicated_uninstrumentation(self): + AsyncPGInstrumentor().instrument() + AsyncPGInstrumentor().uninstrument() + AsyncPGInstrumentor().uninstrument() + AsyncPGInstrumentor().uninstrument() + for method_name in ["execute", "fetch"]: + method = getattr(Connection, method_name, None) + self.assertFalse( + hasattr(method, "_opentelemetry_ext_asyncpg_applied") + ) diff --git a/ext/opentelemetry-ext-docker-tests/tests/asyncpg/test_asyncpg_functional.py b/ext/opentelemetry-ext-docker-tests/tests/asyncpg/test_asyncpg_functional.py new file mode 100644 index 00000000000..408e50feb89 --- /dev/null +++ b/ext/opentelemetry-ext-docker-tests/tests/asyncpg/test_asyncpg_functional.py @@ -0,0 +1,240 @@ +import asyncio +import os + +import asyncpg +import pytest + +from opentelemetry.ext.asyncpg import AsyncPGInstrumentor +from opentelemetry.test.test_base import TestBase +from opentelemetry.trace.status import StatusCanonicalCode + +POSTGRES_HOST = os.getenv("POSTGRESQL_HOST ", "localhost") +POSTGRES_PORT = int(os.getenv("POSTGRESQL_PORT ", "5432")) +POSTGRES_DB_NAME = os.getenv("POSTGRESQL_DB_NAME ", "opentelemetry-tests") +POSTGRES_PASSWORD = os.getenv("POSTGRESQL_HOST ", "testpassword") +POSTGRES_USER = os.getenv("POSTGRESQL_HOST ", "testuser") + + +def _await(coro): + loop = asyncio.get_event_loop() + return loop.run_until_complete(coro) + + +class TestFunctionalPsycopg(TestBase): + @classmethod + def setUpClass(cls): + super().setUpClass() + cls._connection = None + cls._cursor = None + cls._tracer = cls.tracer_provider.get_tracer(__name__) + AsyncPGInstrumentor().instrument(tracer_provider=cls.tracer_provider) + cls._connection = _await( + asyncpg.connect( + database=POSTGRES_DB_NAME, + user=POSTGRES_USER, + password=POSTGRES_PASSWORD, + host=POSTGRES_HOST, + port=POSTGRES_PORT, + ) + ) + + @classmethod + def tearDownClass(cls): + AsyncPGInstrumentor().uninstrument() + + @pytest.mark.asyncpg + def test_instrumented_execute_method_without_arguments(self, *_, **__): + _await(self._connection.execute("SELECT 42;")) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + self.assertEqual( + StatusCanonicalCode.OK, spans[0].status.canonical_code + ) + self.assertEqual( + spans[0].attributes, + { + "db.type": "sql", + "db.user": POSTGRES_USER, + "db.instance": POSTGRES_DB_NAME, + "db.statement": "SELECT 42;", + }, + ) + + @pytest.mark.asyncpg + def test_instrumented_execute_method_with_arguments(self, *_, **__): + _await(self._connection.execute("SELECT $1;", "1")) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + self.assertEqual( + StatusCanonicalCode.OK, spans[0].status.canonical_code + ) + self.assertEqual( + spans[0].attributes, + { + "db.type": "sql", + "db.user": POSTGRES_USER, + "db.statement.parameters": "('1',)", + "db.instance": POSTGRES_DB_NAME, + "db.statement": "SELECT $1;", + }, + ) + + @pytest.mark.asyncpg + def test_instrumented_fetch_method_without_arguments(self, *_, **__): + _await(self._connection.fetch("SELECT 42;")) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + self.assertEqual( + spans[0].attributes, + { + "db.type": "sql", + "db.user": POSTGRES_USER, + "db.instance": POSTGRES_DB_NAME, + "db.statement": "SELECT 42;", + }, + ) + + @pytest.mark.asyncpg + def test_instrumented_fetch_method_with_arguments(self, *_, **__): + _await(self._connection.fetch("SELECT $1;", "1")) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + self.assertEqual( + spans[0].attributes, + { + "db.type": "sql", + "db.user": POSTGRES_USER, + "db.statement.parameters": "('1',)", + "db.instance": POSTGRES_DB_NAME, + "db.statement": "SELECT $1;", + }, + ) + + @pytest.mark.asyncpg + def test_instrumented_executemany_method_with_arguments(self, *_, **__): + _await(self._connection.executemany("SELECT $1;", [["1"], ["2"]])) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + self.assertEqual( + { + "db.type": "sql", + "db.statement": "SELECT $1;", + "db.statement.parameters": "([['1'], ['2']],)", + "db.user": POSTGRES_USER, + "db.instance": POSTGRES_DB_NAME, + }, + spans[0].attributes, + ) + + @pytest.mark.asyncpg + def test_instrumented_execute_interface_error_method(self, *_, **__): + with self.assertRaises(asyncpg.InterfaceError): + _await(self._connection.execute("SELECT 42;", 1, 2, 3)) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + self.assertEqual( + spans[0].attributes, + { + "db.type": "sql", + "db.instance": POSTGRES_DB_NAME, + "db.user": POSTGRES_USER, + "db.statement.parameters": "(1, 2, 3)", + "db.statement": "SELECT 42;", + }, + ) + + @pytest.mark.asyncpg + def test_instrumented_transaction_method(self, *_, **__): + async def _transaction_execute(): + async with self._connection.transaction(): + await self._connection.execute("SELECT 42;") + + _await(_transaction_execute()) + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(3, len(spans)) + self.assertEqual( + { + "db.instance": POSTGRES_DB_NAME, + "db.user": POSTGRES_USER, + "db.type": "sql", + "db.statement": "BEGIN;", + }, + spans[0].attributes, + ) + self.assertEqual( + StatusCanonicalCode.OK, spans[0].status.canonical_code + ) + self.assertEqual( + { + "db.instance": POSTGRES_DB_NAME, + "db.user": POSTGRES_USER, + "db.type": "sql", + "db.statement": "SELECT 42;", + }, + spans[1].attributes, + ) + self.assertEqual( + StatusCanonicalCode.OK, spans[1].status.canonical_code + ) + self.assertEqual( + { + "db.instance": POSTGRES_DB_NAME, + "db.user": POSTGRES_USER, + "db.type": "sql", + "db.statement": "COMMIT;", + }, + spans[2].attributes, + ) + self.assertEqual( + StatusCanonicalCode.OK, spans[2].status.canonical_code + ) + + @pytest.mark.asyncpg + def test_instrumented_failed_transaction_method(self, *_, **__): + async def _transaction_execute(): + async with self._connection.transaction(): + await self._connection.execute("SELECT 42::uuid;") + + with self.assertRaises(asyncpg.CannotCoerceError): + _await(_transaction_execute()) + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(3, len(spans)) + self.assertEqual( + { + "db.instance": POSTGRES_DB_NAME, + "db.user": POSTGRES_USER, + "db.type": "sql", + "db.statement": "BEGIN;", + }, + spans[0].attributes, + ) + self.assertEqual( + StatusCanonicalCode.OK, spans[0].status.canonical_code + ) + self.assertEqual( + { + "db.instance": POSTGRES_DB_NAME, + "db.user": POSTGRES_USER, + "db.type": "sql", + "db.statement": "SELECT 42::uuid;", + }, + spans[1].attributes, + ) + self.assertEqual( + StatusCanonicalCode.INVALID_ARGUMENT, + spans[1].status.canonical_code, + ) + self.assertEqual( + { + "db.instance": POSTGRES_DB_NAME, + "db.user": POSTGRES_USER, + "db.type": "sql", + "db.statement": "ROLLBACK;", + }, + spans[2].attributes, + ) + self.assertEqual( + StatusCanonicalCode.OK, spans[2].status.canonical_code + ) diff --git a/tox.ini b/tox.ini index 771305f2c1d..e4ad8e17098 100644 --- a/tox.ini +++ b/tox.ini @@ -116,6 +116,10 @@ envlist = py3{5,6,7,8}-test-ext-asgi pypy3-test-ext-asgi + ; opentelemetry-ext-asyncpg + py3{5,6,7,8}-test-ext-asyncpg + ; ext-asyncpg intentionally excluded from pypy3 + ; opentelemetry-ext-sqlite3 py3{4,5,6,7,8}-test-ext-sqlite3 pypy3-test-ext-sqlite3 @@ -217,6 +221,7 @@ changedir = test-ext-pymysql: ext/opentelemetry-ext-pymysql/tests test-ext-pyramid: ext/opentelemetry-ext-pyramid/tests test-ext-asgi: ext/opentelemetry-ext-asgi/tests + test-ext-asyncpg: ext/opentelemetry-ext-asyncpg/tests test-ext-sqlite3: ext/opentelemetry-ext-sqlite3/tests test-ext-wsgi: ext/opentelemetry-ext-wsgi/tests test-ext-zipkin: ext/opentelemetry-ext-zipkin/tests @@ -253,6 +258,8 @@ commands_pre = wsgi,flask,django,pyramid: pip install {toxinidir}/ext/opentelemetry-ext-wsgi asgi,starlette: pip install {toxinidir}/ext/opentelemetry-ext-asgi + asyncpg: pip install {toxinidir}/ext/opentelemetry-ext-asyncpg + boto: pip install {toxinidir}/ext/opentelemetry-ext-boto[test] flask: pip install {toxinidir}/ext/opentelemetry-ext-flask[test] @@ -388,6 +395,7 @@ commands = [testenv:docker-tests] deps = pytest + asyncpg==0.20.1 docker-compose >= 1.25.2 mysql-connector-python ~= 8.0 pymongo ~= 3.1 @@ -405,6 +413,7 @@ commands_pre = -e {toxinidir}/opentelemetry-sdk \ -e {toxinidir}/opentelemetry-instrumentation \ -e {toxinidir}/tests/util \ + -e {toxinidir}/ext/opentelemetry-ext-asyncpg \ -e {toxinidir}/ext/opentelemetry-ext-celery \ -e {toxinidir}/ext/opentelemetry-ext-dbapi \ -e {toxinidir}/ext/opentelemetry-ext-mysql \