-
Notifications
You must be signed in to change notification settings - Fork 416
/
middlewares.py
159 lines (129 loc) · 5.63 KB
/
middlewares.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
from ddtrace import config
from ddtrace.internal.constants import COMPONENT
from ddtrace.internal.schema.span_attribute_schema import SpanDirection
from .. import trace_utils
from ...constants import ANALYTICS_SAMPLE_RATE_KEY
from ...constants import SPAN_KIND
from ...constants import SPAN_MEASURED_KEY
from ...ext import SpanKind
from ...ext import SpanTypes
from ...ext import http
from ...internal.compat import stringify
from ...internal.schema import schematize_url_operation
from ..asyncio import context_provider
# Key under which ``trace_app`` stores the tracing settings dict on the aiohttp application.
CONFIG_KEY = "datadog_trace"
# Key under which the middleware stores the request span's context on each request.
REQUEST_CONTEXT_KEY = "datadog_context"
# Key under which the middleware stores a reference to the application's settings dict on each request.
REQUEST_CONFIG_KEY = "__datadog_trace_config"
# Key under which the middleware stores the request span itself; read back by ``on_prepare``.
REQUEST_SPAN_KEY = "__datadog_request_span"
async def trace_middleware(app, handler):
    """
    ``aiohttp`` middleware factory: returns a coroutine that traces ``handler``.

    For every request the returned coroutine activates any propagated
    distributed-tracing headers, opens a request span and stores the span,
    its ``Context`` and the application settings on the request object, so
    that application code and the ``on_prepare`` signal handler can reach them.
    The span is not finished here.

    :param app: aiohttp application holding the tracing settings under ``CONFIG_KEY``
    :param handler: the next request handler in the middleware chain
    """

    async def attach_context(request):
        # settings stored on the application under CONFIG_KEY
        settings = app[CONFIG_KEY]
        active_tracer = settings["tracer"]
        service_name = settings["service"]
        distributed_override = settings["distributed_tracing_enabled"]

        # continue the trace described by the incoming headers, if any
        trace_utils.activate_distributed_headers(
            active_tracer,
            int_config=config.aiohttp,
            request_headers=request.headers,
            override=distributed_override,
        )

        # open the span that covers the handler execution
        operation = schematize_url_operation("aiohttp.request", protocol="http", direction=SpanDirection.INBOUND)
        span = active_tracer.trace(operation, service=service_name, span_type=SpanTypes.WEB)
        span.set_tag(SPAN_MEASURED_KEY)
        span.set_tag_str(COMPONENT, config.aiohttp.integration_name)
        # mark the span as the server side of the request
        span.set_tag_str(SPAN_KIND, SpanKind.SERVER)

        # trace search sample rate
        # DEV: aiohttp keeps its own analytics setting, separate from the config api;
        #      an explicit True/False on the app wins over the global flag
        app_analytics = settings["analytics_enabled"]
        globally_enabled = config.analytics_enabled and app_analytics is not False
        if globally_enabled or app_analytics is True:
            span.set_tag(ANALYTICS_SAMPLE_RATE_KEY, settings.get("analytics_sample_rate", True))

        # expose the context, the span and the settings on the request;
        # the Context may be freely used by the application code
        request[REQUEST_CONTEXT_KEY] = span.context
        request[REQUEST_SPAN_KEY] = span
        request[REQUEST_CONFIG_KEY] = settings

        try:
            return await handler(request)
        except Exception:
            # record the traceback on the span, then let the error propagate
            span.set_traceback()
            raise

    return attach_context
async def on_prepare(request, response):
    """
    ``on_response_prepare`` signal handler that closes the request span
    opened by the trace middleware.

    It names the span's resource after the matched route (falling back to the
    response status), optionally tags the query string, records the HTTP
    metadata and finishes the span. Requests without a span are ignored.

    :param request: the aiohttp request being answered
    :param response: the response about to be sent
    """
    # safe-guard: nothing to do when the middleware did not create a span
    span = request.get(REQUEST_SPAN_KEY)
    if not span:
        return

    # the response status is the fallback resource name
    name = stringify(response.status)

    route_resource = request.match_info.route.resource
    if route_resource:
        # pick the first non-empty descriptor, in order of preference
        info = route_resource.get_info()
        for key in ("path", "formatter", "prefix"):
            candidate = info.get(key)
            if candidate:
                name = candidate
                break

    # the resource name is always prefixed by the http method
    span.resource = "{} {}".format(request.method, name)

    # DEV: aiohttp keeps its own setting, separate from the config api;
    #      the global http setting applies only when the app did not set one
    include_query = request[REQUEST_CONFIG_KEY].get("trace_query_string")
    include_query = config.http.trace_query_string if include_query is None else include_query
    if include_query:
        span.set_tag_str(http.QUERY_STRING, request.query_string)

    trace_utils.set_http_meta(
        span,
        config.aiohttp,
        method=request.method,
        url=str(request.url),  # DEV: request.url is a yarl's URL object
        status_code=response.status,
        request_headers=request.headers,
        response_headers=response.headers,
    )

    span.finish()
def trace_app(app, tracer, service="aiohttp-web"):
    """
    Instrument an ``aiohttp`` application so that its requests are traced
    with the given ``tracer``.

    Calling this more than once on the same application is a no-op.

    :param app: aiohttp application to trace
    :param tracer: tracer instance to use
    :param service: default service name, used when ``config._get_service`` has no other value
    """
    # safe-guard: an application is only ever instrumented once
    already_traced = getattr(app, "__datadog_trace", False)
    if already_traced:
        return
    setattr(app, "__datadog_trace", True)

    # datadog settings read by the middleware and the signal handler
    settings = {
        "tracer": tracer,
        "service": config._get_service(default=service),
        "distributed_tracing_enabled": None,
        "analytics_enabled": None,
        "analytics_sample_rate": 1.0,
    }
    app[CONFIG_KEY] = settings

    # the tracer must work with asynchronous Context propagation
    tracer.configure(context_provider=context_provider)

    # the tracing middleware goes first in the chain, and the signal handler
    # that closes the span is appended after the ones already registered
    app.middlewares.insert(0, trace_middleware)
    app.on_response_prepare.append(on_prepare)