Skip to content

Commit

Permalink
Client signals #2313 (#2429)
Browse files Browse the repository at this point in the history
* Client signals #2313

Implementation of the client signals exposed by the `ClientSession`
class, to get a list of the all signals implementation please visit
the documentation.

* Removed not already implemented signal

* Removed support for on_content and on_headers signals

* fixed small sutff

* Get rid of no longer used helpers.create_future

* Update 2313.feature

* on_request_start receives the whole URL object

* on_request_end and error flavor receive the method, URL and headers

* TraceConfig as a way to configure the ClientSession tracing

* Fix flake import issues

* Increase dns tracing coverage

* Tracing signals are explicits

* Removed not used session kw argument

* Accept multiple TraceConfig objects for the ClientSession class

* Renamed trace context vars

* Fix invalid test func definition

* Fixed docstring params codification
  • Loading branch information
pfreixes authored and asvetlov committed Nov 18, 2017
1 parent 992499d commit ca8878a
Show file tree
Hide file tree
Showing 16 changed files with 1,199 additions and 32 deletions.
1 change: 1 addition & 0 deletions CHANGES/2313.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ClientSession publishes a set of signals to track the HTTP request execution.
2 changes: 2 additions & 0 deletions aiohttp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from .payload import * # noqa
from .payload_streamer import * # noqa
from .resolver import * # noqa
from .tracing import * # noqa

try:
from .worker import GunicornWebWorker, GunicornUVLoopWebWorker # noqa
Expand All @@ -30,6 +31,7 @@
payload.__all__ + # noqa
payload_streamer.__all__ + # noqa
streams.__all__ + # noqa
tracing.__all__ + # noqa
('hdrs', 'HttpVersion', 'HttpVersion10', 'HttpVersion11',
'WSMsgType', 'WSCloseCode',
'WebSocketError', 'WSMessage',
Expand Down
59 changes: 55 additions & 4 deletions aiohttp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from .http import WS_KEY, WebSocketReader, WebSocketWriter
from .http_websocket import WSHandshakeError, ws_ext_gen, ws_ext_parse
from .streams import FlowControlDataQueue
from .tracing import Trace


__all__ = (client_exceptions.__all__ + # noqa
Expand Down Expand Up @@ -57,7 +58,8 @@ def __init__(self, *, connector=None, loop=None, cookies=None,
version=http.HttpVersion11,
cookie_jar=None, connector_owner=True, raise_for_status=False,
read_timeout=sentinel, conn_timeout=None,
auto_decompress=True, trust_env=False):
auto_decompress=True, trust_env=False,
trace_configs=None):

implicit_loop = False
if loop is None:
Expand Down Expand Up @@ -96,6 +98,7 @@ def __init__(self, *, connector=None, loop=None, cookies=None,

if cookies is not None:
self._cookie_jar.update_cookies(cookies)

self._connector = connector
self._connector_owner = connector_owner
self._default_auth = auth
Expand Down Expand Up @@ -124,6 +127,10 @@ def __init__(self, *, connector=None, loop=None, cookies=None,
self._response_class = response_class
self._ws_response_class = ws_response_class

self._trace_configs = trace_configs or []
for trace_config in self._trace_configs:
trace_config.freeze()

def __del__(self, _warnings=warnings):
if not self.closed:
_warnings.warn("Unclosed client session {!r}".format(self),
Expand Down Expand Up @@ -159,7 +166,8 @@ def _request(self, method, url, *,
verify_ssl=None,
fingerprint=None,
ssl_context=None,
proxy_headers=None):
proxy_headers=None,
trace_request_ctx=None):

# NOTE: timeout clamps existing connect and read timeouts. We cannot
# set the default to None because we need to detect if the user wants
Expand Down Expand Up @@ -216,6 +224,22 @@ def _request(self, method, url, *,
handle = tm.start()

url = URL(url)

traces = [
Trace(
trace_config,
self,
trace_request_ctx=trace_request_ctx)
for trace_config in self._trace_configs
]

for trace in traces:
yield from trace.send_request_start(
method,
url,
headers
)

timer = tm.timer()
try:
with timer:
Expand Down Expand Up @@ -264,7 +288,10 @@ def _request(self, method, url, *,
# connection timeout
try:
with CeilTimeout(self._conn_timeout, loop=self._loop):
conn = yield from self._connector.connect(req)
conn = yield from self._connector.connect(
req,
traces=traces
)
except asyncio.TimeoutError as exc:
raise ServerTimeoutError(
'Connection timeout '
Expand All @@ -289,6 +316,15 @@ def _request(self, method, url, *,
# redirects
if resp.status in (
301, 302, 303, 307, 308) and allow_redirects:

for trace in traces:
yield from trace.send_request_redirect(
method,
url,
headers,
resp
)

redirects += 1
history.append(resp)
if max_redirects and redirects >= max_redirects:
Expand Down Expand Up @@ -352,15 +388,30 @@ def _request(self, method, url, *,
handle.cancel()

resp._history = tuple(history)

for trace in traces:
yield from trace.send_request_end(
method,
url,
headers,
resp
)
return resp

except Exception:
except Exception as e:
# cleanup timer
tm.close()
if handle:
handle.cancel()
handle = None

for trace in traces:
yield from trace.send_request_exception(
method,
url,
headers,
e
)
raise

def ws_connect(self, url, *,
Expand Down
3 changes: 1 addition & 2 deletions aiohttp/client_reqrep.py
Original file line number Diff line number Diff line change
Expand Up @@ -472,8 +472,7 @@ def send(self, conn):
self.method, self.original_url,
writer=self._writer, continue100=self._continue, timer=self._timer,
request_info=self.request_info,
auto_decompress=self._auto_decompress
)
auto_decompress=self._auto_decompress)

self.response._post_init(self.loop, self._session)
return self.response
Expand Down
93 changes: 80 additions & 13 deletions aiohttp/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ def closed(self):
"""
return self._closed

async def connect(self, req):
async def connect(self, req, traces=None):
"""Get from pool or create new connection."""
key = req.connection_key

Expand Down Expand Up @@ -375,6 +375,11 @@ async def connect(self, req):
# This connection will now count towards the limit.
waiters = self._waiters[key]
waiters.append(fut)

if traces:
for trace in traces:
await trace.send_connection_queued_start()

try:
await fut
finally:
Expand All @@ -383,13 +388,25 @@ async def connect(self, req):
if not waiters:
del self._waiters[key]

if traces:
for trace in traces:
await trace.send_connection_queued_end()

proto = self._get(key)
if proto is None:
placeholder = _TransportPlaceholder()
self._acquired.add(placeholder)
self._acquired_per_host[key].add(placeholder)

if traces:
for trace in traces:
await trace.send_connection_create_start()

try:
proto = await self._create_connection(req)
proto = await self._create_connection(
req,
traces=traces
)
if self._closed:
proto.close()
raise ClientConnectionError("Connector is closed.")
Expand All @@ -405,6 +422,14 @@ async def connect(self, req):
self._acquired.remove(placeholder)
self._acquired_per_host[key].remove(placeholder)

if traces:
for trace in traces:
await trace.send_connection_create_end()
else:
if traces:
for trace in traces:
await trace.send_connection_reuseconn()

self._acquired.add(proto)
self._acquired_per_host[key].add(proto)
return Connection(self, key, proto, self._loop)
Expand Down Expand Up @@ -497,7 +522,7 @@ def _release(self, key, protocol, *, should_close=False):
self._cleanup_handle = helpers.weakref_handle(
self, '_cleanup', self._keepalive_timeout, self._loop)

async def _create_connection(self, req):
async def _create_connection(self, req, traces=None):
raise NotImplementedError()


Expand Down Expand Up @@ -685,31 +710,63 @@ def clear_dns_cache(self, host=None, port=None):
else:
self._cached_hosts.clear()

async def _resolve_host(self, host, port):
async def _resolve_host(self, host, port, traces=None):
if is_ip_address(host):
return [{'hostname': host, 'host': host, 'port': port,
'family': self._family, 'proto': 0, 'flags': 0}]

if not self._use_dns_cache:
return (await self._resolver.resolve(

if traces:
for trace in traces:
await trace.send_dns_resolvehost_start()

res = (await self._resolver.resolve(
host, port, family=self._family))

if traces:
for trace in traces:
await trace.send_dns_resolvehost_end()

return res

key = (host, port)

if (key in self._cached_hosts) and \
(not self._cached_hosts.expired(key)):

if traces:
for trace in traces:
await trace.send_dns_cache_hit()

return self._cached_hosts.next_addrs(key)

if key in self._throttle_dns_events:
if traces:
for trace in traces:
await trace.send_dns_cache_hit()
await self._throttle_dns_events[key].wait()
else:
self._throttle_dns_events[key] = EventResultOrError(self._loop)
if traces:
for trace in traces:
await trace.send_dns_cache_miss()
self._throttle_dns_events[key] = \
EventResultOrError(self._loop)
try:

if traces:
for trace in traces:
await trace.send_dns_resolvehost_start()

addrs = await \
asyncio.shield(self._resolver.resolve(host,
port,
family=self._family),
loop=self._loop)
if traces:
for trace in traces:
await trace.send_dns_resolvehost_end()

self._cached_hosts.add(key, addrs)
self._throttle_dns_events[key].set()
except Exception as e:
Expand All @@ -722,15 +779,21 @@ async def _resolve_host(self, host, port):

return self._cached_hosts.next_addrs(key)

async def _create_connection(self, req):
async def _create_connection(self, req, traces=None):
"""Create connection.
Has same keyword arguments as BaseEventLoop.create_connection.
"""
if req.proxy:
_, proto = await self._create_proxy_connection(req)
_, proto = await self._create_proxy_connection(
req,
traces=None
)
else:
_, proto = await self._create_direct_connection(req)
_, proto = await self._create_direct_connection(
req,
traces=None
)

return proto

Expand Down Expand Up @@ -787,12 +850,16 @@ async def _wrap_create_connection(self, *args,
raise client_error(req.connection_key, exc) from exc

async def _create_direct_connection(self, req,
*, client_error=ClientConnectorError):
*, client_error=ClientConnectorError,
traces=None):
sslcontext = self._get_ssl_context(req)
fingerprint, hashfunc = self._get_fingerprint_and_hashfunc(req)

try:
hosts = await self._resolve_host(req.url.raw_host, req.port)
hosts = await self._resolve_host(
req.url.raw_host,
req.port,
traces=traces)
except OSError as exc:
# in case of proxy it is not ClientProxyConnectionError
# it is problem of resolving proxy ip itself
Expand Down Expand Up @@ -841,7 +908,7 @@ async def _create_direct_connection(self, req,
else:
raise last_exc

async def _create_proxy_connection(self, req):
async def _create_proxy_connection(self, req, traces=None):
headers = {}
if req.proxy_headers is not None:
headers = req.proxy_headers
Expand Down Expand Up @@ -943,7 +1010,7 @@ def path(self):
"""Path to unix socket."""
return self._path

async def _create_connection(self, req):
async def _create_connection(self, req, traces=None):
try:
_, proto = await self._loop.create_unix_connection(
self._factory, self._path)
Expand Down
Loading

0 comments on commit ca8878a

Please sign in to comment.