Skip to content

Commit

Permalink
Instrumentation for asyncpg (#814)
Browse files Browse the repository at this point in the history
Co-authored-by: Yusuke Tsutsumi <[email protected]>
  • Loading branch information
HiveTraum and toumorokoshi authored Jun 17, 2020
1 parent 9d74918 commit a7f0b75
Show file tree
Hide file tree
Showing 12 changed files with 555 additions and 0 deletions.
1 change: 1 addition & 0 deletions docs-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ sphinx-autodoc-typehints~=1.10.2

# Required by ext packages
asgiref~=3.0
asyncpg>=0.12.0
ddtrace>=0.34.0
aiohttp~= 3.0
Deprecated>=1.2.6
Expand Down
10 changes: 10 additions & 0 deletions docs/ext/asyncpg/asyncpg.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
opentelemetry.ext.asyncpg package
=================================

Module contents
---------------

.. automodule:: opentelemetry.ext.asyncpg
:members:
:undoc-members:
:show-inheritance:
5 changes: 5 additions & 0 deletions ext/opentelemetry-ext-asyncpg/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Changelog

## Unreleased

- Initial Release ([#814](https://github.com/open-telemetry/opentelemetry-python/pull/814))
23 changes: 23 additions & 0 deletions ext/opentelemetry-ext-asyncpg/README.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
OpenTelemetry asyncpg Integration
=================================

|pypi|

.. |pypi| image:: https://badge.fury.io/py/opentelemetry-ext-asyncpg.svg
:target: https://pypi.org/project/opentelemetry-ext-asyncpg/

This library allows tracing PostgreSQL queries made by the
`asyncpg <https://magicstack.github.io/asyncpg/current/>`_ library.

Installation
------------

::

pip install opentelemetry-ext-asyncpg

References
----------

* `OpenTelemetry asyncpg Integration <https://opentelemetry-python.readthedocs.io/en/latest/ext/asyncpg/asyncpg.html>`_
* `OpenTelemetry Project <https://opentelemetry.io/>`_
55 changes: 55 additions & 0 deletions ext/opentelemetry-ext-asyncpg/setup.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
[metadata]
name = opentelemetry-ext-asyncpg
description = OpenTelemetry instrumentation for AsyncPG
long_description = file: README.rst
long_description_content_type = text/x-rst
author = OpenTelemetry Authors
author_email = [email protected]
url = https://github.com/open-telemetry/opentelemetry-python/ext/opentelemetry-ext-asyncpg
platforms = any
license = Apache-2.0
classifiers =
Development Status :: 4 - Beta
Intended Audience :: Developers
License :: OSI Approved :: Apache Software License
Programming Language :: Python
Programming Language :: Python :: 3
Programming Language :: Python :: 3.5
Programming Language :: Python :: 3.6
Programming Language :: Python :: 3.7
Programming Language :: Python :: 3.8

[options]
python_requires = >=3.5
package_dir=
=src
packages=find_namespace:
install_requires =
opentelemetry-api == 0.10.dev0
opentelemetry-instrumentation == 0.10.dev0
asyncpg >= 0.12.0

[options.extras_require]
test =
opentelemetry-test == 0.10.dev0

[options.packages.find]
where = src

[options.entry_points]
opentelemetry_instrumentor =
asyncpg = opentelemetry.ext.asyncpg:AsyncPGInstrumentor
26 changes: 26 additions & 0 deletions ext/opentelemetry-ext-asyncpg/setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os

import setuptools

BASE_DIR = os.path.dirname(__file__)
VERSION_FILENAME = os.path.join(
BASE_DIR, "src", "opentelemetry", "ext", "asyncpg", "version.py"
)
PACKAGE_INFO = {}
with open(VERSION_FILENAME) as f:
exec(f.read(), PACKAGE_INFO)

setuptools.setup(version=PACKAGE_INFO["__version__"])
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This library allows tracing PostgreSQL queries made by the
`asyncpg <https://magicstack.github.io/asyncpg/current/>`_ library.
Usage
-----
.. code-block:: python
import asyncpg
from opentelemetry.ext.asyncpg import AsyncPGInstrumentor
# You can optionally pass a custom TracerProvider to AsyncPGInstrumentor.instrument()
AsyncPGInstrumentor().instrument()
conn = await asyncpg.connect(user='user', password='password',
database='database', host='127.0.0.1')
values = await conn.fetch('''SELECT 42;''')
API
---
"""

import asyncpg
import wrapt
from asyncpg import exceptions

from opentelemetry import trace
from opentelemetry.ext.asyncpg.version import __version__
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.trace import SpanKind
from opentelemetry.trace.status import Status, StatusCanonicalCode

_APPLIED = "_opentelemetry_tracer"


def _exception_to_canonical_code(exc: Exception) -> StatusCanonicalCode:
if isinstance(
exc, (exceptions.InterfaceError, exceptions.SyntaxOrAccessError),
):
return StatusCanonicalCode.INVALID_ARGUMENT
if isinstance(exc, exceptions.IdleInTransactionSessionTimeoutError):
return StatusCanonicalCode.DEADLINE_EXCEEDED
return StatusCanonicalCode.UNKNOWN


def _hydrate_span_from_args(connection, query, parameters) -> dict:
span_attributes = {"db.type": "sql"}

params = getattr(connection, "_params", None)
span_attributes["db.instance"] = getattr(params, "database", None)
span_attributes["db.user"] = getattr(params, "user", None)

if query is not None:
span_attributes["db.statement"] = query

if parameters is not None and len(parameters) > 0:
span_attributes["db.statement.parameters"] = str(parameters)

return span_attributes


async def _do_execute(func, instance, args, kwargs):
span_attributes = _hydrate_span_from_args(instance, args[0], args[1:])
tracer = getattr(asyncpg, _APPLIED)

exception = None

with tracer.start_as_current_span(
"postgresql", kind=SpanKind.CLIENT
) as span:

for attribute, value in span_attributes.items():
span.set_attribute(attribute, value)

try:
result = await func(*args, **kwargs)
except Exception as exc: # pylint: disable=W0703
exception = exc
raise
finally:
if exception is not None:
span.set_status(
Status(_exception_to_canonical_code(exception))
)
else:
span.set_status(Status(StatusCanonicalCode.OK))

return result


class AsyncPGInstrumentor(BaseInstrumentor):
def _instrument(self, **kwargs):
tracer_provider = kwargs.get(
"tracer_provider", trace.get_tracer_provider()
)
setattr(
asyncpg,
_APPLIED,
tracer_provider.get_tracer("asyncpg", __version__),
)
for method in [
"Connection.execute",
"Connection.executemany",
"Connection.fetch",
"Connection.fetchval",
"Connection.fetchrow",
]:
wrapt.wrap_function_wrapper(
"asyncpg.connection", method, _do_execute
)

def _uninstrument(self, **__):
delattr(asyncpg, _APPLIED)
for method in [
"execute",
"executemany",
"fetch",
"fetchval",
"fetchrow",
]:
unwrap(asyncpg.Connection, method)
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

__version__ = "0.10.dev0"
Empty file.
35 changes: 35 additions & 0 deletions ext/opentelemetry-ext-asyncpg/tests/test_asyncpg_wrapper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import asyncpg
from asyncpg import Connection

from opentelemetry.ext.asyncpg import AsyncPGInstrumentor
from opentelemetry.test.test_base import TestBase


class TestAsyncPGInstrumentation(TestBase):
def test_instrumentation_flags(self):
AsyncPGInstrumentor().instrument()
self.assertTrue(hasattr(asyncpg, "_opentelemetry_tracer"))
AsyncPGInstrumentor().uninstrument()
self.assertFalse(hasattr(asyncpg, "_opentelemetry_tracer"))

def test_duplicated_instrumentation(self):
AsyncPGInstrumentor().instrument()
AsyncPGInstrumentor().instrument()
AsyncPGInstrumentor().instrument()
AsyncPGInstrumentor().uninstrument()
for method_name in ["execute", "fetch"]:
method = getattr(Connection, method_name, None)
self.assertFalse(
hasattr(method, "_opentelemetry_ext_asyncpg_applied")
)

def test_duplicated_uninstrumentation(self):
AsyncPGInstrumentor().instrument()
AsyncPGInstrumentor().uninstrument()
AsyncPGInstrumentor().uninstrument()
AsyncPGInstrumentor().uninstrument()
for method_name in ["execute", "fetch"]:
method = getattr(Connection, method_name, None)
self.assertFalse(
hasattr(method, "_opentelemetry_ext_asyncpg_applied")
)
Loading

0 comments on commit a7f0b75

Please sign in to comment.