-
Notifications
You must be signed in to change notification settings - Fork 418
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
aiohttp client tracing #1372
aiohttp client tracing #1372
Changes from all commits
907a086
ca55ba4
26b708b
cd7e57f
1bb231a
837e8c2
4ebcc16
ad585cd
79d71be
a7da295
e715c9f
7649d89
c0ab3df
d5562e1
7b82dfc
8cc236e
2b7c8f1
ca15385
ef9a5a0
2c4fe87
da5dc97
9a5508e
df29727
13f8536
8c29099
93c13a7
98480dd
d3e49be
0ce4b56
b9ade72
28c67b6
64e71aa
90b65e3
950d65c
064e4d4
5afcdc0
b1257cc
78445de
69d59a9
34bb4c9
f82f666
8ec719d
f385deb
79ed2e0
e21d3ae
c58b95b
0107224
ecd49de
517d0be
2f1085e
3f7aa11
14823c3
8402096
4b29d60
77fdcc5
3b7ce2f
efb5e08
766609a
a74ff3d
2285b98
c54c031
421e88d
45b3f1b
2885f9b
984eb96
cb0e6a7
e977aa1
260e54b
f212020
4eadc04
249eea4
378c404
1f71ae9
caf025e
71511f3
2e86f11
e5924e7
5a39ced
eb046ce
d1eca92
e285168
e572209
e9d8056
d823662
dd47c2c
8a7ce47
a019984
c3b2cd9
ea3ba40
c0ede6e
69739aa
0c0f379
aa03185
8ebd4a2
5d747ba
be038a9
8b8c769
0f8e82c
58f78db
f3845d6
09813da
534d553
839e5dd
a6ab0c9
5af9b51
af07ff9
3f53e27
99fb053
d3bc98e
ae9521c
2232332
e90a0a8
d3b687d
3aca41e
4e4e7e3
50c487a
6619697
3021151
e13fb26
d776869
550b804
f2beee9
528bac5
3f169fa
ffeb9d3
ce78aad
51979e8
689c708
304c40a
e8d93c4
ed8996d
14a266f
dae156e
5da14d7
ff3e8f6
0f2a247
31df275
0028703
c86cf18
6dfb0b0
7566eb1
1315c93
85fa0cc
535cb59
1239e95
748a13d
3f89fe8
2b7629d
2b7c919
fc98f0a
be861e5
28b53b3
54eddb2
08cb583
ad8d45e
5023571
4dfe291
abb7853
5ba77f9
f422ad0
a732384
c7765fd
108e90a
5f4e19a
c24f472
f7276cb
54d7b2f
952e624
4a1d4b5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,21 +1,124 @@ | ||
import asyncio | ||
import functools | ||
|
||
from ..asyncio import context_provider | ||
from ...compat import stringify | ||
from ...constants import ANALYTICS_SAMPLE_RATE_KEY, SPAN_MEASURED_KEY | ||
from ...ext import SpanTypes, http | ||
from ...propagation.http import HTTPPropagator | ||
from ...pin import Pin | ||
from ...settings import config | ||
from ...context import Context | ||
from ...utils.wrappers import iswrapped | ||
from .patch import _WrappedStreamReader | ||
|
||
try: | ||
from aiohttp.web import middleware | ||
|
||
CONFIG_KEY = 'datadog_trace' | ||
REQUEST_CONTEXT_KEY = 'datadog_context' | ||
REQUEST_CONFIG_KEY = '__datadog_trace_config' | ||
REQUEST_SPAN_KEY = '__datadog_request_span' | ||
AIOHTTP_HAS_MIDDLEWARE = True | ||
except ImportError: | ||
AIOHTTP_HAS_MIDDLEWARE = False | ||
|
||
def middleware(f): | ||
return f | ||
|
||
@asyncio.coroutine | ||
def trace_middleware(app, handler): | ||
import aiohttp.streams | ||
|
||
|
||
CONFIG_KEY = "datadog_trace" | ||
REQUEST_CONTEXT_KEY = "datadog_context" | ||
REQUEST_CONFIG_KEY = "__datadog_trace_config" | ||
REQUEST_SPAN_KEY = "__datadog_request_span" | ||
|
||
propagator = HTTPPropagator() | ||
|
||
|
||
config._add("aiohttp_server", dict( | ||
service="aiohttp.server", | ||
distributed_tracing_enabled=True, | ||
)) | ||
|
||
|
||
@middleware | ||
async def trace_middleware_2x(request, handler, app=None): | ||
# application configs | ||
if app is None: | ||
app = request.app | ||
|
||
tracer = app[CONFIG_KEY]["tracer"] | ||
service = app[CONFIG_KEY]["service"] | ||
distributed_tracing = app[CONFIG_KEY]["distributed_tracing_enabled"] | ||
|
||
# Create a new context based on the propagated information. | ||
if distributed_tracing: | ||
context = propagator.extract(request.headers) | ||
|
||
if context.trace_id: | ||
tracer.context_provider.activate(context) | ||
else: | ||
# In case a non-distributed request comes after a distributed request we need to clear out | ||
# the previous context | ||
tracer.context_provider.activate(Context()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What's the reasoning here for clearing the context? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. updated description to state: # In case a non-distributed request comes after a distributed request we need to clear out
# the previous context There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @thehesiod This should be handled by the ContextVars-enabled default context manager:
|
||
|
||
# trace the handler | ||
request_span = tracer.trace("aiohttp.request", service=service, span_type=SpanTypes.WEB) | ||
# Unset resource as we'll be setting this in all cases | ||
request_span.resource = None | ||
|
||
request_span.set_tag(SPAN_MEASURED_KEY) | ||
|
||
# Configure trace search sample rate | ||
analytics_enabled = app[CONFIG_KEY]["analytics_enabled"] | ||
if (config.analytics_enabled and analytics_enabled is not False) or analytics_enabled is True: | ||
request_span.set_tag(ANALYTICS_SAMPLE_RATE_KEY, app[CONFIG_KEY].get("analytics_sample_rate", True)) | ||
|
||
# attach the context and the root span to the request; the Context | ||
# may be freely used by the application code | ||
request[REQUEST_CONTEXT_KEY] = request_span.context | ||
request[REQUEST_SPAN_KEY] = request_span | ||
request[REQUEST_CONFIG_KEY] = app[CONFIG_KEY] | ||
|
||
if request.match_info.route.resource: | ||
# collect the resource name based on http resource type | ||
res_info = request.match_info.route.resource.get_info() | ||
|
||
resource = None | ||
if res_info.get("path"): | ||
resource = res_info.get("path") | ||
elif res_info.get("formatter"): | ||
resource = res_info.get("formatter") | ||
elif res_info.get("prefix"): | ||
resource = res_info.get("prefix") | ||
|
||
if resource: | ||
# prefix the resource name by the http method | ||
resource = "{} {}".format(request.method, resource) | ||
request_span.resource = resource | ||
|
||
request_span.set_tag(http.METHOD, request.method) | ||
request_span.set_tag(http.URL, request.url.with_query(None)) | ||
trace_query_string = request[REQUEST_CONFIG_KEY].get("trace_query_string") | ||
if trace_query_string is None: | ||
trace_query_string = config._http.trace_query_string | ||
if trace_query_string: | ||
request_span.set_tag(http.QUERY_STRING, request.query_string) | ||
|
||
if not iswrapped(request._payload) and isinstance(request._payload, aiohttp.streams.StreamReader): | ||
tags = {tag: request_span.get_tag(tag) for tag in {http.URL, http.METHOD, http.QUERY_STRING} | ||
if request_span.get_tag(tag)} | ||
|
||
pin = Pin(service, tracer=tracer, tags=tags, _config=config.aiohttp_server) | ||
request._payload = _WrappedStreamReader(request._payload, pin, request_span.trace_id, request_span.span_id, | ||
request_span.resource) | ||
|
||
try: | ||
response = await handler(request) | ||
return response | ||
except Exception: | ||
request_span.set_traceback() | ||
raise | ||
|
||
|
||
async def trace_middleware_1x(app, handler): | ||
""" | ||
``aiohttp`` middleware that traces the handler execution. | ||
Because handlers are run in different tasks for each request, we attach the Context | ||
|
@@ -24,54 +127,13 @@ def trace_middleware(app, handler): | |
* the Task is used by the internal automatic instrumentation | ||
* the ``Context`` attached to the request can be freely used in the application code | ||
""" | ||
@asyncio.coroutine | ||
def attach_context(request): | ||
# application configs | ||
tracer = app[CONFIG_KEY]['tracer'] | ||
service = app[CONFIG_KEY]['service'] | ||
distributed_tracing = app[CONFIG_KEY]['distributed_tracing_enabled'] | ||
|
||
# Create a new context based on the propagated information. | ||
if distributed_tracing: | ||
propagator = HTTPPropagator() | ||
context = propagator.extract(request.headers) | ||
# Only need to active the new context if something was propagated | ||
if context.trace_id: | ||
tracer.context_provider.activate(context) | ||
|
||
# trace the handler | ||
request_span = tracer.trace( | ||
'aiohttp.request', | ||
service=service, | ||
span_type=SpanTypes.WEB, | ||
) | ||
request_span.set_tag(SPAN_MEASURED_KEY) | ||
|
||
# Configure trace search sample rate | ||
# DEV: aiohttp is special case maintains separate configuration from config api | ||
analytics_enabled = app[CONFIG_KEY]['analytics_enabled'] | ||
if (config.analytics_enabled and analytics_enabled is not False) or analytics_enabled is True: | ||
request_span.set_tag( | ||
ANALYTICS_SAMPLE_RATE_KEY, | ||
app[CONFIG_KEY].get('analytics_sample_rate', True) | ||
) | ||
|
||
# attach the context and the root span to the request; the Context | ||
# may be freely used by the application code | ||
request[REQUEST_CONTEXT_KEY] = request_span.context | ||
request[REQUEST_SPAN_KEY] = request_span | ||
request[REQUEST_CONFIG_KEY] = app[CONFIG_KEY] | ||
try: | ||
response = yield from handler(request) | ||
return response | ||
except Exception: | ||
request_span.set_traceback() | ||
raise | ||
return attach_context | ||
|
||
|
||
@asyncio.coroutine | ||
def on_prepare(request, response): | ||
return functools.partial(trace_middleware_2x, handler=handler, app=app) | ||
|
||
|
||
trace_middleware = trace_middleware_2x if AIOHTTP_HAS_MIDDLEWARE else trace_middleware_1x | ||
|
||
|
||
async def on_prepare(request, response): | ||
""" | ||
The on_prepare signal is used to close the request span that is created during | ||
the trace middleware execution. | ||
|
@@ -82,39 +144,18 @@ def on_prepare(request, response): | |
return | ||
|
||
# default resource name | ||
resource = stringify(response.status) | ||
|
||
if request.match_info.route.resource: | ||
# collect the resource name based on http resource type | ||
res_info = request.match_info.route.resource.get_info() | ||
|
||
if res_info.get('path'): | ||
resource = res_info.get('path') | ||
elif res_info.get('formatter'): | ||
resource = res_info.get('formatter') | ||
elif res_info.get('prefix'): | ||
resource = res_info.get('prefix') | ||
if not request_span.resource: | ||
request_span.resource = stringify(response.status) | ||
|
||
# prefix the resource name by the http method | ||
resource = '{} {}'.format(request.method, resource) | ||
request_span.set_tag(http.STATUS_CODE, response.status) | ||
|
||
if 500 <= response.status < 600: | ||
request_span.error = 1 | ||
|
||
request_span.resource = resource | ||
request_span.set_tag('http.method', request.method) | ||
request_span.set_tag('http.status_code', response.status) | ||
request_span.set_tag(http.URL, request.url.with_query(None)) | ||
# DEV: aiohttp is special case maintains separate configuration from config api | ||
trace_query_string = request[REQUEST_CONFIG_KEY].get('trace_query_string') | ||
if trace_query_string is None: | ||
trace_query_string = config._http.trace_query_string | ||
if trace_query_string: | ||
request_span.set_tag(http.QUERY_STRING, request.query_string) | ||
request_span.finish() | ||
|
||
|
||
def trace_app(app, tracer, service='aiohttp-web'): | ||
def trace_app(app, tracer, service="aiohttp-web"): | ||
""" | ||
Tracing function that patches the ``aiohttp`` application so that it will be | ||
traced using the given ``tracer``. | ||
|
@@ -125,17 +166,17 @@ def trace_app(app, tracer, service='aiohttp-web'): | |
""" | ||
|
||
# safe-guard: don't trace an application twice | ||
if getattr(app, '__datadog_trace', False): | ||
if getattr(app, "__datadog_trace", False): | ||
return | ||
setattr(app, '__datadog_trace', True) | ||
setattr(app, "__datadog_trace", True) | ||
|
||
# configure datadog settings | ||
app[CONFIG_KEY] = { | ||
'tracer': tracer, | ||
'service': config._get_service(default=service), | ||
'distributed_tracing_enabled': True, | ||
'analytics_enabled': None, | ||
'analytics_sample_rate': 1.0, | ||
"tracer": tracer, | ||
"service": config._get_service(default=service), | ||
"distributed_tracing_enabled": config.aiohttp_server.distributed_tracing_enabled, | ||
"analytics_enabled": None, | ||
"analytics_sample_rate": 1.0, | ||
} | ||
|
||
# the tracer must work with asynchronous Context propagation | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we instead make this two level?
ex:
conf.add("aiohttp", dict(server=dict(...), client=(...))
. otherwise is there any reason for using_
instead of.
?