Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(aiohttp): add client integration (backport #3362) #3407

Merged
merged 2 commits into from
Mar 9, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -469,10 +469,12 @@ jobs:

aiohttp:
<<: *machine_executor
parallelism: 6
steps:
- run_test:
pattern: 'aiohttp'
snapshot: true
docker_services: 'httpbin_local'

asgi:
<<: *contrib_job_small
Expand Down
40 changes: 38 additions & 2 deletions ddtrace/contrib/aiohttp/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,43 @@
"""
The ``aiohttp`` integration traces all requests defined in the application handlers.
The ``aiohttp`` integration traces requests made with the client or to the server.

Automatic instrumentation is not available for ``aiohttp.web.Application``, instead
The client is automatically instrumented while the server must be manually instrumented using middleware.

Client
******

Enabling
~~~~~~~~

The client integration is enabled automatically when using
:ref:`ddtrace-run<ddtracerun>` or :func:`patch_all()<ddtrace.patch_all>`.

Or use :func:`patch()<ddtrace.patch>` to manually enable the integration::
brettlangdon marked this conversation as resolved.
Show resolved Hide resolved

from ddtrace import patch
patch(aiohttp=True)


Global Configuration
~~~~~~~~~~~~~~~~~~~~

.. py:data:: ddtrace.config.aiohttp_client['distributed_tracing']

Include distributed tracing headers in requests sent from the aiohttp client.

This option can also be set with the ``DD_AIOHTTP_CLIENT_DISTRIBUTED_TRACING``
environment variable.

Default: ``True``


Server
******

Enabling
~~~~~~~~

Automatic instrumentation is not available for the server, instead
the provided ``trace_app`` function must be used::

from aiohttp import web
Expand Down
116 changes: 115 additions & 1 deletion ddtrace/contrib/aiohttp/patch.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,139 @@
import os

from yarl import URL

from ddtrace import config
from ddtrace.internal.logger import get_logger
from ddtrace.internal.utils import get_argument_value
from ddtrace.internal.utils.formats import asbool
from ddtrace.vendor import wrapt

from ...ext import SpanTypes
from ...internal.compat import parse
from ...pin import Pin
from ...propagation.http import HTTPPropagator
from ..trace_utils import ext_service
from ..trace_utils import set_http_meta
from ..trace_utils import unwrap
from ..trace_utils import with_traced_module as with_traced_module_sync
from ..trace_utils import wrap
from ..trace_utils_async import with_traced_module


log = get_logger(__name__)


# Server config
config._add(
"aiohttp",
dict(distributed_tracing=True),
)

config._add(
"aiohttp_client",
dict(
distributed_tracing=True,
distributed_tracing=asbool(os.getenv("DD_AIOHTTP_CLIENT_DISTRIBUTED_TRACING", True)),
),
)


class _WrappedConnectorClass(wrapt.ObjectProxy):
def __init__(self, obj, pin):
super().__init__(obj)
pin.onto(self)

async def connect(self, req, *args, **kwargs):
pin = Pin.get_from(self)
with pin.tracer.trace("%s.connect" % self.__class__.__name__):
result = await self.__wrapped__.connect(req, *args, **kwargs)
return result

async def _create_connection(self, req, *args, **kwargs):
pin = Pin.get_from(self)
with pin.tracer.trace("%s._create_connection" % self.__class__.__name__):
result = await self.__wrapped__._create_connection(req, *args, **kwargs)
return result


@with_traced_module
async def _traced_clientsession_request(aiohttp, pin, func, instance, args, kwargs):
method = get_argument_value(args, kwargs, 0, "method") # type: str
url = URL(get_argument_value(args, kwargs, 1, "url")) # type: URL
params = kwargs.get("params")
headers = kwargs.get("headers") or {}

with pin.tracer.trace(
"aiohttp.request", span_type=SpanTypes.HTTP, service=ext_service(pin, config.aiohttp_client)
) as span:
if pin._config["distributed_tracing"]:
HTTPPropagator.inject(span.context, headers)
kwargs["headers"] = headers

# Params can be included separate of the URL so the URL has to be constructed
# with the passed params.
url_str = str(url.update_query(params) if params else url)
parsed_url = parse.urlparse(url_str)
set_http_meta(
span,
config.aiohttp_client,
method=method,
url=url_str,
query=parsed_url.query,
request_headers=headers,
)
resp = await func(*args, **kwargs) # type: aiohttp.ClientResponse
set_http_meta(
span, config.aiohttp_client, response_headers=resp.headers, status_code=resp.status, status_msg=resp.reason
)
return resp


@with_traced_module_sync
def _traced_clientsession_init(aiohttp, pin, func, instance, args, kwargs):
func(*args, **kwargs)
instance._connector = _WrappedConnectorClass(instance._connector, pin)


def _patch_client(aiohttp):
Pin().onto(aiohttp)
pin = Pin(_config=config.aiohttp_client.copy())
pin.onto(aiohttp.ClientSession)

wrap("aiohttp", "ClientSession.__init__", _traced_clientsession_init(aiohttp))
wrap("aiohttp", "ClientSession._request", _traced_clientsession_request(aiohttp))


def patch():
# Legacy patch aiohttp_jinja2
from ddtrace.contrib.aiohttp_jinja2 import patch as aiohttp_jinja2_patch

aiohttp_jinja2_patch()

import aiohttp

if getattr(aiohttp, "_datadog_patch", False):
return

_patch_client(aiohttp)

setattr(aiohttp, "_datadog_patch", True)


def _unpatch_client(aiohttp):
unwrap(aiohttp.ClientSession, "__init__")
unwrap(aiohttp.ClientSession, "_request")


def unpatch():
from ddtrace.contrib.aiohttp_jinja2 import unpatch as aiohttp_jinja2_unpatch

aiohttp_jinja2_unpatch()

import aiohttp

if not getattr(aiohttp, "_datadog_patch", False):
return

_unpatch_client(aiohttp)

setattr(aiohttp, "_datadog_patch", False)
39 changes: 39 additions & 0 deletions ddtrace/contrib/trace_utils_async.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
"""
async tracing utils

Note that this module should only be imported in Python 3.5+.
"""
from ddtrace import Pin
from ddtrace.internal.logger import get_logger


log = get_logger(__name__)


def with_traced_module(func):
"""Async version of trace_utils.with_traced_module.
Usage::

@with_traced_module
async def my_traced_wrapper(django, pin, func, instance, args, kwargs):
# Do tracing stuff
pass

def patch():
import django
wrap(django.somefunc, my_traced_wrapper(django))
"""

def with_mod(mod):
async def wrapper(wrapped, instance, args, kwargs):
pin = Pin._find(instance, mod)
if pin and not pin.enabled():
return await wrapped(*args, **kwargs)
elif not pin:
log.debug("Pin not found for traced method %r", wrapped)
return await wrapped(*args, **kwargs)
return await func(mod, pin, wrapped, instance, args, kwargs)

return wrapper

return with_mod
4 changes: 3 additions & 1 deletion docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ contacting support.
+==================================================+===============+================+
| :ref:`aiobotocore` | >= 0.2.3 | No |
+--------------------------------------------------+---------------+----------------+
| :ref:`aiohttp` | >= 2.0 | No |
| :ref:`aiohttp` (client) | >= 2.0 | Yes |
+--------------------------------------------------+---------------+----------------+
| :ref:`aiohttp` (server) | >= 2.0 | No |
+--------------------------------------------------+---------------+----------------+
| :ref:`aiopg` | >= 0.12.0, | Yes |
| | <= 0.16 | |
Expand Down
6 changes: 6 additions & 0 deletions releasenotes/notes/aiohttp-98ae9ce70dda1dbc.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
features:
- |
aiohttp: add client integration. This integration traces requests made using the aiohttp client and includes support
for distributed tracing. See `the documentation <https://ddtrace.readthedocs.io/en/stable/integrations.html#aiohttp>`_
for more information.
31 changes: 27 additions & 4 deletions riotfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -1400,11 +1400,34 @@ def select_pys(min_version=MIN_PYTHON_VERSION, max_version=MAX_PYTHON_VERSION):
},
),
Venv(
# Python 3.5 is deprecated for aiohttp >= 3.0
pys=select_pys(min_version="3.6", max_version="3.9"),
# pytest-asyncio is incompatible with aiohttp 3.0+ in Python 3.6
pys="3.6",
pkgs={
"aiohttp": [
"~=3.0",
"~=3.2",
"~=3.4",
"~=3.6",
"~=3.8",
latest,
],
"aiohttp_jinja2": latest,
"yarl": "~=1.0",
},
),
Venv(
pys=select_pys(min_version="3.7", max_version="3.10"),
pkgs={
"aiohttp": ["~=3.0", "~=3.1", "~=3.2", "~=3.3", "~=3.4", "~=3.5", "~=3.6"],
"aiohttp_jinja2": "~=0.15",
"pytest-asyncio": [latest],
"aiohttp": [
"~=3.0",
"~=3.2",
"~=3.4",
"~=3.6",
"~=3.8",
latest,
],
"aiohttp_jinja2": latest,
"yarl": "~=1.0",
},
),
Expand Down
10 changes: 5 additions & 5 deletions tests/contrib/aiohttp/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import pytest

from ddtrace.contrib.aiohttp.middlewares import trace_app
from ddtrace.contrib.aiohttp.patch import patch
from ddtrace.contrib.aiohttp_jinja2.patch import patch as patch_jinja2
from ddtrace.internal.utils import version
from ddtrace.pin import Pin

Expand All @@ -24,7 +24,7 @@ def app_tracer(tracer, loop):

@pytest.fixture
def patched_app_tracer(app_tracer):
patch()
patch_jinja2()
app, tracer = app_tracer
Pin.override(aiohttp_jinja2, tracer=tracer)
return app, tracer
Expand All @@ -34,7 +34,7 @@ def patched_app_tracer(app_tracer):

@pytest.fixture
def untraced_app_tracer(tracer, loop):
patch()
patch_jinja2()
app = setup_app()
Pin.override(aiohttp_jinja2, tracer=tracer)
return app, tracer
Expand All @@ -53,7 +53,7 @@ async def app_tracer(tracer, loop):

@pytest.fixture
async def patched_app_tracer(app_tracer):
patch()
patch_jinja2()
app, tracer = app_tracer
Pin.override(aiohttp_jinja2, tracer=tracer)
return app, tracer
Expand All @@ -63,7 +63,7 @@ async def patched_app_tracer(app_tracer):

@pytest.fixture
async def untraced_app_tracer(tracer, loop):
patch()
patch_jinja2()
app = setup_app()
Pin.override(aiohttp_jinja2, tracer=tracer)
return app, tracer
Expand Down
Loading