Skip to content

Commit

Permalink
feat(aiohttp): add client integration (#3362)
Browse files Browse the repository at this point in the history
This PR adds support for aiohttp client.

Work was long ago started on this in (#294 (thank you @thehesiod!!!). There were a number of issues in the library that led to not being able to merge in the work such as async context management, service naming, integration configuration and more which have all since been addressed.

#294 and later #1372 included additional support for ClientResponse and StreamReader which are omitted here with the intention of introducing them as follow ups.

Co-authored-by: Alexander Mohr <[email protected]>
  • Loading branch information
Kyle-Verhoog and thehesiod authored Mar 9, 2022
1 parent 9dab78a commit 2ac1fc9
Show file tree
Hide file tree
Showing 18 changed files with 803 additions and 13 deletions.
2 changes: 2 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -469,10 +469,12 @@ jobs:

aiohttp:
<<: *machine_executor
parallelism: 6
steps:
- run_test:
pattern: 'aiohttp'
snapshot: true
docker_services: 'httpbin_local'

asgi:
<<: *contrib_job_small
Expand Down
40 changes: 38 additions & 2 deletions ddtrace/contrib/aiohttp/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,43 @@
"""
The ``aiohttp`` integration traces all requests defined in the application handlers.
The ``aiohttp`` integration traces requests made with the client or to the server.
Automatic instrumentation is not available for ``aiohttp.web.Application``, instead
The client is automatically instrumented while the server must be manually instrumented using middleware.
Client
******
Enabling
~~~~~~~~
The client integration is enabled automatically when using
:ref:`ddtrace-run<ddtracerun>` or :func:`patch_all()<ddtrace.patch_all>`.
Or use :func:`patch()<ddtrace.patch>` to manually enable the integration::
from ddtrace import patch
patch(aiohttp=True)
Global Configuration
~~~~~~~~~~~~~~~~~~~~
.. py:data:: ddtrace.config.aiohttp_client['distributed_tracing']
Include distributed tracing headers in requests sent from the aiohttp client.
This option can also be set with the ``DD_AIOHTTP_CLIENT_DISTRIBUTED_TRACING``
environment variable.
Default: ``True``
Server
******
Enabling
~~~~~~~~
Automatic instrumentation is not available for the server, instead
the provided ``trace_app`` function must be used::
from aiohttp import web
Expand Down
116 changes: 115 additions & 1 deletion ddtrace/contrib/aiohttp/patch.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,139 @@
import os

from yarl import URL

from ddtrace import config
from ddtrace.internal.logger import get_logger
from ddtrace.internal.utils import get_argument_value
from ddtrace.internal.utils.formats import asbool
from ddtrace.vendor import wrapt

from ...ext import SpanTypes
from ...internal.compat import parse
from ...pin import Pin
from ...propagation.http import HTTPPropagator
from ..trace_utils import ext_service
from ..trace_utils import set_http_meta
from ..trace_utils import unwrap
from ..trace_utils import with_traced_module as with_traced_module_sync
from ..trace_utils import wrap
from ..trace_utils_async import with_traced_module


log = get_logger(__name__)


# Server config
config._add(
"aiohttp",
dict(distributed_tracing=True),
)

config._add(
"aiohttp_client",
dict(
distributed_tracing=True,
distributed_tracing=asbool(os.getenv("DD_AIOHTTP_CLIENT_DISTRIBUTED_TRACING", True)),
),
)


class _WrappedConnectorClass(wrapt.ObjectProxy):
def __init__(self, obj, pin):
super().__init__(obj)
pin.onto(self)

async def connect(self, req, *args, **kwargs):
pin = Pin.get_from(self)
with pin.tracer.trace("%s.connect" % self.__class__.__name__):
result = await self.__wrapped__.connect(req, *args, **kwargs)
return result

async def _create_connection(self, req, *args, **kwargs):
pin = Pin.get_from(self)
with pin.tracer.trace("%s._create_connection" % self.__class__.__name__):
result = await self.__wrapped__._create_connection(req, *args, **kwargs)
return result


@with_traced_module
async def _traced_clientsession_request(aiohttp, pin, func, instance, args, kwargs):
method = get_argument_value(args, kwargs, 0, "method") # type: str
url = URL(get_argument_value(args, kwargs, 1, "url")) # type: URL
params = kwargs.get("params")
headers = kwargs.get("headers") or {}

with pin.tracer.trace(
"aiohttp.request", span_type=SpanTypes.HTTP, service=ext_service(pin, config.aiohttp_client)
) as span:
if pin._config["distributed_tracing"]:
HTTPPropagator.inject(span.context, headers)
kwargs["headers"] = headers

# Params can be included separate of the URL so the URL has to be constructed
# with the passed params.
url_str = str(url.update_query(params) if params else url)
parsed_url = parse.urlparse(url_str)
set_http_meta(
span,
config.aiohttp_client,
method=method,
url=url_str,
query=parsed_url.query,
request_headers=headers,
)
resp = await func(*args, **kwargs) # type: aiohttp.ClientResponse
set_http_meta(
span, config.aiohttp_client, response_headers=resp.headers, status_code=resp.status, status_msg=resp.reason
)
return resp


@with_traced_module_sync
def _traced_clientsession_init(aiohttp, pin, func, instance, args, kwargs):
func(*args, **kwargs)
instance._connector = _WrappedConnectorClass(instance._connector, pin)


def _patch_client(aiohttp):
Pin().onto(aiohttp)
pin = Pin(_config=config.aiohttp_client.copy())
pin.onto(aiohttp.ClientSession)

wrap("aiohttp", "ClientSession.__init__", _traced_clientsession_init(aiohttp))
wrap("aiohttp", "ClientSession._request", _traced_clientsession_request(aiohttp))


def patch():
# Legacy patch aiohttp_jinja2
from ddtrace.contrib.aiohttp_jinja2 import patch as aiohttp_jinja2_patch

aiohttp_jinja2_patch()

import aiohttp

if getattr(aiohttp, "_datadog_patch", False):
return

_patch_client(aiohttp)

setattr(aiohttp, "_datadog_patch", True)


def _unpatch_client(aiohttp):
unwrap(aiohttp.ClientSession, "__init__")
unwrap(aiohttp.ClientSession, "_request")


def unpatch():
from ddtrace.contrib.aiohttp_jinja2 import unpatch as aiohttp_jinja2_unpatch

aiohttp_jinja2_unpatch()

import aiohttp

if not getattr(aiohttp, "_datadog_patch", False):
return

_unpatch_client(aiohttp)

setattr(aiohttp, "_datadog_patch", False)
39 changes: 39 additions & 0 deletions ddtrace/contrib/trace_utils_async.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
"""
async tracing utils
Note that this module should only be imported in Python 3.5+.
"""
from ddtrace import Pin
from ddtrace.internal.logger import get_logger


log = get_logger(__name__)


def with_traced_module(func):
"""Async version of trace_utils.with_traced_module.
Usage::
@with_traced_module
async def my_traced_wrapper(django, pin, func, instance, args, kwargs):
# Do tracing stuff
pass
def patch():
import django
wrap(django.somefunc, my_traced_wrapper(django))
"""

def with_mod(mod):
async def wrapper(wrapped, instance, args, kwargs):
pin = Pin._find(instance, mod)
if pin and not pin.enabled():
return await wrapped(*args, **kwargs)
elif not pin:
log.debug("Pin not found for traced method %r", wrapped)
return await wrapped(*args, **kwargs)
return await func(mod, pin, wrapped, instance, args, kwargs)

return wrapper

return with_mod
4 changes: 3 additions & 1 deletion docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ contacting support.
+==================================================+===============+================+
| :ref:`aiobotocore` | >= 0.2.3 | No |
+--------------------------------------------------+---------------+----------------+
| :ref:`aiohttp` | >= 2.0 | No |
| :ref:`aiohttp` (client) | >= 2.0 | Yes |
+--------------------------------------------------+---------------+----------------+
| :ref:`aiohttp` (server) | >= 2.0 | No |
+--------------------------------------------------+---------------+----------------+
| :ref:`aiopg` | >= 0.12.0, | Yes |
| | <= 0.16 | |
Expand Down
6 changes: 6 additions & 0 deletions releasenotes/notes/aiohttp-98ae9ce70dda1dbc.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
features:
- |
aiohttp: add client integration. This integration traces requests made using the aiohttp client and includes support
for distributed tracing. See `the documentation <https://ddtrace.readthedocs.io/en/stable/integrations.html#aiohttp>`_
for more information.
31 changes: 27 additions & 4 deletions riotfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -1400,11 +1400,34 @@ def select_pys(min_version=MIN_PYTHON_VERSION, max_version=MAX_PYTHON_VERSION):
},
),
Venv(
# Python 3.5 is deprecated for aiohttp >= 3.0
pys=select_pys(min_version="3.6", max_version="3.9"),
# pytest-asyncio is incompatible with aiohttp 3.0+ in Python 3.6
pys="3.6",
pkgs={
"aiohttp": [
"~=3.0",
"~=3.2",
"~=3.4",
"~=3.6",
"~=3.8",
latest,
],
"aiohttp_jinja2": latest,
"yarl": "~=1.0",
},
),
Venv(
pys=select_pys(min_version="3.7", max_version="3.10"),
pkgs={
"aiohttp": ["~=3.0", "~=3.1", "~=3.2", "~=3.3", "~=3.4", "~=3.5", "~=3.6"],
"aiohttp_jinja2": "~=0.15",
"pytest-asyncio": [latest],
"aiohttp": [
"~=3.0",
"~=3.2",
"~=3.4",
"~=3.6",
"~=3.8",
latest,
],
"aiohttp_jinja2": latest,
"yarl": "~=1.0",
},
),
Expand Down
10 changes: 5 additions & 5 deletions tests/contrib/aiohttp/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import pytest

from ddtrace.contrib.aiohttp.middlewares import trace_app
from ddtrace.contrib.aiohttp.patch import patch
from ddtrace.contrib.aiohttp_jinja2.patch import patch as patch_jinja2
from ddtrace.internal.utils import version
from ddtrace.pin import Pin

Expand All @@ -24,7 +24,7 @@ def app_tracer(tracer, loop):

@pytest.fixture
def patched_app_tracer(app_tracer):
patch()
patch_jinja2()
app, tracer = app_tracer
Pin.override(aiohttp_jinja2, tracer=tracer)
return app, tracer
Expand All @@ -34,7 +34,7 @@ def patched_app_tracer(app_tracer):

@pytest.fixture
def untraced_app_tracer(tracer, loop):
patch()
patch_jinja2()
app = setup_app()
Pin.override(aiohttp_jinja2, tracer=tracer)
return app, tracer
Expand All @@ -53,7 +53,7 @@ async def app_tracer(tracer, loop):

@pytest.fixture
async def patched_app_tracer(app_tracer):
patch()
patch_jinja2()
app, tracer = app_tracer
Pin.override(aiohttp_jinja2, tracer=tracer)
return app, tracer
Expand All @@ -63,7 +63,7 @@ async def patched_app_tracer(app_tracer):

@pytest.fixture
async def untraced_app_tracer(tracer, loop):
patch()
patch_jinja2()
app = setup_app()
Pin.override(aiohttp_jinja2, tracer=tracer)
return app, tracer
Expand Down
Loading

0 comments on commit 2ac1fc9

Please sign in to comment.