Skip to content

Commit

Permalink
Merge branch 'thehesiod-aio-utils' into thehesiod-aiohttp
Browse files Browse the repository at this point in the history
  • Loading branch information
thehesiod authored Jun 22, 2017
2 parents 45b3f1b + e977aa1 commit 4eadc04
Show file tree
Hide file tree
Showing 15 changed files with 717 additions and 65 deletions.
15 changes: 15 additions & 0 deletions ddtrace/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,21 @@ def __init__(self):
self._finished_spans = 0
self._current_span = None
self._lock = threading.Lock()
self._base_parent_span_id = None
self._base_parent_trace_id = None

def set_base_parent_span_ids(self, trace_id, span_id):
"""
Will set the base parent trace/span IDs. These can be used as fall-backs
for distributed tracing in case a full Span is not available.
"""
with self._lock:
self._base_parent_trace_id = trace_id
self._base_parent_span_id = span_id

def get_base_parent_span_ids(self):
""" Returns tuple of base trace_id, span_id for distributed tracing."""
return self._base_parent_trace_id, self._base_parent_span_id

def get_current_span(self):
"""
Expand Down
156 changes: 142 additions & 14 deletions ddtrace/contrib/aiohttp/patch.py
Original file line number Diff line number Diff line change
@@ -1,39 +1,167 @@
import asyncio
import functools
import logging
import wrapt
from yarl import URL

from ...pin import Pin
from ddtrace.util import unwrap

from .middlewares import _SPAN_MIN_ERROR, PARENT_TRACE_HEADER_ID, \
PARENT_SPAN_HEADER_ID
from ...pin import Pin
from ...ext import http as ext_http
from ..httplib.patch import should_skip_request
import aiohttp.client


try:
# instrument external packages only if they're available
import aiohttp_jinja2
from .template import _trace_render_template

template_module = True
except ImportError:
template_module = False
_trace_render_template = None


log = logging.getLogger(__name__)


class _WrappedResponseClass(wrapt.ObjectProxy):
@asyncio.coroutine
def start(self, *args, **kwargs):
# This will get called once per connect
pin = Pin.get_from(self)

# This will parent correctly as we'll always have an enclosing span
with pin.tracer.trace('{}.start'.format(self.__class__.__name__),
span_type=ext_http.TYPE) as span:
_set_request_tags(span, self.url)
result = yield from self.__wrapped__.start(*args, **kwargs)
span.set_tag(ext_http.STATUS_CODE, self.status)
span.error = int(_SPAN_MIN_ERROR <= self.status)

return result

@asyncio.coroutine
def read(self, *args, **kwargs):
pin = Pin.get_from(self)
# This will not have a parent as the request completed
parent_span = getattr(self, '_datadog_span')
with pin.tracer.trace('{}.read'.format(self.__class__.__name__),
span_type=ext_http.TYPE) as span:
span.trace_id = parent_span.trace_id
span.parent_id = parent_span.span_id
_set_request_tags(span, self.url)
result = yield from self.__wrapped__.read(*args, **kwargs)
span.set_tag(ext_http.STATUS_CODE, self.status)
span.error = int(_SPAN_MIN_ERROR <= self.status)

return result


def _create_wrapped_response(client_session, cls, instance, args, kwargs):
obj = _WrappedResponseClass(cls(*args, **kwargs))
Pin.get_from(client_session).onto(obj)
span = getattr(client_session, '_datadog_span')
setattr(obj, '_datadog_span', span)
return obj


def _wrap_clientsession_init(func, instance, args, kwargs):
response_class = kwargs.get('response_class', aiohttp.client.ClientResponse)
wrapper = functools.partial(_create_wrapped_response, instance)
kwargs['response_class'] = wrapt.FunctionWrapper(response_class, wrapper)

return func(*args, **kwargs)


def _set_request_tags(span, url):
if (url.scheme == 'http' and url.port == 80) or \
(url.scheme == 'https' and url.port == 443):
port = ''
else:
port = url.port

def patch():
url_str = '{scheme}://{host}{port}{path}'.format(
scheme=url.scheme, host=url.host, port=port, path=url.path)
span.set_tag(ext_http.URL, url_str)
span.resource = url.path


@asyncio.coroutine
def _wrap_request(enable_distributed, func, instance, args, kwargs):
# Use any attached tracer if available, otherwise use the global tracer
pin = Pin.get_from(instance)
method, url = args[0], URL(args[1])

if should_skip_request(pin, url):
result = yield from func(*args, **kwargs)
return result

# Create a new span and attach to this instance (so we can
# retrieve/update/close later on the response)
# Note that we aren't tracing redirects
with pin.tracer.trace('ClientSession.request',
span_type=ext_http.TYPE) as span:
setattr(instance, '_datadog_span', span)

if enable_distributed:
headers = kwargs.get('headers', {})
headers[PARENT_TRACE_HEADER_ID] = str(span.trace_id)
headers[PARENT_SPAN_HEADER_ID] = str(span.span_id)
kwargs['headers'] = headers

_set_request_tags(span, url)
span.set_tag(ext_http.METHOD, method)

resp = yield from func(*args, **kwargs)

span.set_tag(ext_http.STATUS_CODE, resp.status)
span.error = int(_SPAN_MIN_ERROR <= resp.status)

return resp


def patch(tracer=None, enable_distributed=False):
"""
Patch aiohttp third party modules:
* aiohttp_jinja2
* aiohttp ClientSession request
:param tracer: tracer to use
:param enable_distributed: enable aiohttp client to set parent span IDs in
requests
"""
if template_module:
if getattr(aiohttp_jinja2, '__datadog_patch', False):
return

_w = wrapt.wrap_function_wrapper
if not getattr(aiohttp, '__datadog_patch', False):
setattr(aiohttp, '__datadog_patch', True)
pin = Pin(app='aiohttp', service=None, app_type=ext_http.TYPE,
tracer=tracer)
pin.onto(aiohttp.client.ClientSession)

_w('aiohttp.client', 'ClientSession.__init__', _wrap_clientsession_init)

wrapper = functools.partial(_wrap_request, enable_distributed)
_w('aiohttp.client', 'ClientSession._request', wrapper)

if _trace_render_template and \
not getattr(aiohttp_jinja2, '__datadog_patch', False):
setattr(aiohttp_jinja2, '__datadog_patch', True)

_w = wrapt.wrap_function_wrapper
_w('aiohttp_jinja2', 'render_template', _trace_render_template)
Pin(app='aiohttp', service=None, app_type='web').onto(aiohttp_jinja2)
Pin(app='aiohttp', service=None, app_type='web',
tracer=tracer).onto(aiohttp_jinja2)


def unpatch():
"""
Remove tracing from patched modules.
"""
if template_module:
if getattr(aiohttp_jinja2, '__datadog_patch', False):
setattr(aiohttp_jinja2, '__datadog_patch', False)
unwrap(aiohttp_jinja2, 'render_template')
if getattr(aiohttp, '__datadog_patch', False):
unwrap(aiohttp.client.ClientSession, '__init__')
unwrap(aiohttp.client.ClientSession, '_request')

if _trace_render_template and getattr(aiohttp_jinja2, '__datadog_patch',
False):
setattr(aiohttp_jinja2, '__datadog_patch', False)
unwrap(aiohttp_jinja2, 'render_template')
28 changes: 28 additions & 0 deletions ddtrace/contrib/aiopg/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
"""Instrument aiopg to report Postgres queries.
``patch`` will automatically patch your aiopg connection to make it work.
::
from ddtrace import Pin, patch
import aiopg
# If not patched yet, you can patch aiopg specifically
patch(aiopg=True)
# This will report a span with the default settings
async with aiopg.connect(DSN) as db:
with (await db.cursor()) as cursor:
await cursor.execute("select * from users where id = 1")
# Use a pin to specify metadata related to this connection
Pin.override(db, service='postgres-users')
"""
from ..util import require_modules

required_modules = ['aiopg']

with require_modules(required_modules) as missing_modules:
if not missing_modules:
from .patch import patch

__all__ = ['patch']
152 changes: 152 additions & 0 deletions ddtrace/contrib/aiopg/patch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
# 3p
import asyncio
import aiopg.connection
from aiopg.utils import _ContextManager
import functools
import wrapt
import psycopg2.extensions

from ddtrace.contrib import dbapi
from ddtrace.contrib.psycopg.patch import _patch_extensions, \
patch_conn as psycppg_patch_conn
from ddtrace.ext import sql
from ddtrace import Pin


# Original connect method, we don't want the _ContextManager
_connect = aiopg.connection._connect


class AIOTracedCursor(wrapt.ObjectProxy):
""" TracedCursor wraps a psql cursor and traces it's queries. """

_datadog_pin = None
_datadog_name = None

def __init__(self, cursor, pin):
super(AIOTracedCursor, self).__init__(cursor)
self._datadog_pin = pin
name = pin.app or 'sql'
self._datadog_name = '%s.query' % name

@asyncio.coroutine
def _trace_method(self, method, resource, extra_tags, *args, **kwargs):
pin = self._datadog_pin
if not pin or not pin.enabled():
result = yield from method(*args, **kwargs) # noqa: E999
return result
service = pin.service

with pin.tracer.trace(self._datadog_name, service=service,
resource=resource) as s:
s.span_type = sql.TYPE
s.set_tag(sql.QUERY, resource)
s.set_tags(pin.tags)

for k, v in extra_tags.items():
s.set_tag(k, v)

try:
result = yield from method(*args, **kwargs)
return result
finally:
s.set_metric("db.rowcount", self.rowcount)

@asyncio.coroutine
def executemany(self, query, *args, **kwargs):
# FIXME[matt] properly handle kwargs here. arg names can be different
# with different libs.
result = yield from self._trace_method(
self.__wrapped__.executemany, query, {'sql.executemany': 'true'},
query, *args, **kwargs) # noqa: E999
return result

@asyncio.coroutine
def execute(self, query, *args, **kwargs):
result = yield from self._trace_method(
self.__wrapped__.execute, query, {}, query, *args, **kwargs)
return result

@asyncio.coroutine
def callproc(self, proc, args):
result = yield from self._trace_method(
self.__wrapped__.callproc, proc, {}, proc, args) # noqa: E999
return result

# aiopg doesn't support __enter__/__exit__ however we're adding it here to
# support unittests with both styles
def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.__wrapped__.close()


class AIOTracedConnection(wrapt.ObjectProxy):
""" TracedConnection wraps a Connection with tracing code. """

_datadog_pin = None

def __init__(self, conn):
super(AIOTracedConnection, self).__init__(conn)
name = dbapi._get_vendor(conn)
Pin(service=name, app=name).onto(self)

def cursor(self, *args, **kwargs):
# unfortunately we also need to patch this method as otherwise "self"
# ends up being the aiopg connection object
coro = self._cursor(*args, **kwargs)
return _ContextManager(coro)

@asyncio.coroutine
def _cursor(self, *args, **kwargs):
cursor = yield from self.__wrapped__._cursor(*args, **kwargs) # noqa: E999
pin = self._datadog_pin
if not pin:
return cursor
return AIOTracedCursor(cursor, pin)


def patch(tracer=None):
""" Patch monkey patches psycopg's connection function
so that the connection's functions are traced.
"""
if getattr(aiopg, '_datadog_patch', False):
return
setattr(aiopg, '_datadog_patch', True)

wrapt.wrap_function_wrapper(aiopg.connection, '_connect', functools.partial(patched_connect, tracer=tracer))
_patch_extensions(_aiopg_extensions) # do this early just in case


def unpatch():
if getattr(aiopg, '_datadog_patch', False):
setattr(aiopg, '_datadog_patch', False)
aiopg.connection._connect = _connect


@asyncio.coroutine
def patched_connect(connect_func, _, args, kwargs, tracer=None):
conn = yield from connect_func(*args, **kwargs)
return psycppg_patch_conn(conn, tracer, traced_conn_cls=AIOTracedConnection)


def _extensions_register_type(func, _, args, kwargs):
def _unroll_args(obj, scope=None):
return obj, scope
obj, scope = _unroll_args(*args, **kwargs)

# register_type performs a c-level check of the object
# type so we must be sure to pass in the actual db connection
if scope and isinstance(scope, wrapt.ObjectProxy):
scope = scope.__wrapped__._conn

return func(obj, scope) if scope else func(obj)


# extension hooks
_aiopg_extensions = [
(psycopg2.extensions.register_type,
psycopg2.extensions, 'register_type',
_extensions_register_type),
]
Loading

0 comments on commit 4eadc04

Please sign in to comment.