[WIP] first draft of more generic timeouts throughout the request lifecycle #2842

Closed
wants to merge 1 commit
aiohttp/client.py (128 changes: 52 additions & 76 deletions)
@@ -10,6 +10,7 @@
 import warnings
 from collections.abc import Coroutine
 
+import attr
 from multidict import CIMultiDict, MultiDict, MultiDictProxy, istr
 from yarl import URL
 
@@ -31,7 +32,7 @@
 from .http_websocket import WSHandshakeError, ws_ext_gen, ws_ext_parse
 from .streams import FlowControlDataQueue
 from .tcp_helpers import tcp_cork, tcp_nodelay
-from .tracing import Trace
+from .lifecycle import RequestLifecycle, RequestTimeouts
 
 
 __all__ = (client_exceptions.__all__ +  # noqa
@@ -41,7 +42,7 @@
 
 
 # 5 Minute default read and connect timeout
-DEFAULT_TIMEOUT = 5 * 60
+DEFAULT_TIMEOUTS = RequestTimeouts(uber_timeout=5 * 60)

Contributor:
Sounds like this should be request_timeout instead of uber.

Author:
I thought the old timeout worked the way uber_timeout works now, so this seemed best suited for keeping backwards compatibility. If changing the way things work is okay, I'd be happy to use a more sensible timeout for the default. ;)
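
For context, a minimal sketch of what the attrs-based RequestTimeouts container might look like. Only uber_timeout and connection_acquiring_timeout appear in this diff; the defaults and the frozen=True choice here are assumptions:

import attr

@attr.s(frozen=True)
class RequestTimeouts:
    # total budget for the whole request lifecycle, in seconds;
    # None disables the limit
    uber_timeout = attr.ib(default=None)
    # time allowed for acquiring a connection from the connector pool
    connection_acquiring_timeout = attr.ib(default=None)

# frozen instances are copied with selected fields overridden:
DEFAULT_TIMEOUTS = RequestTimeouts(uber_timeout=5 * 60)
with_conn_limit = attr.evolve(DEFAULT_TIMEOUTS, connection_acquiring_timeout=30)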



class ClientSession:
@@ -69,7 +70,7 @@ def __init__(self, *, connector=None, loop=None, cookies=None,
                  ws_response_class=ClientWebSocketResponse,
                  version=http.HttpVersion11,
                  cookie_jar=None, connector_owner=True, raise_for_status=False,
-                 read_timeout=sentinel, conn_timeout=None,
+                 timeout=None, read_timeout=sentinel, conn_timeout=None,
                  auto_decompress=True, trust_env=False,
                  trace_configs=None):
 
@@ -116,13 +117,21 @@ def __init__(self, *, connector=None, loop=None, cookies=None,
         self._default_auth = auth
         self._version = version
         self._json_serialize = json_serialize
-        self._read_timeout = (read_timeout if read_timeout is not sentinel
-                              else DEFAULT_TIMEOUT)
-        self._conn_timeout = conn_timeout
         self._raise_for_status = raise_for_status
         self._auto_decompress = auto_decompress
         self._trust_env = trust_env
 
+        if ((read_timeout is not sentinel or conn_timeout is not None)
+                and timeout is not None):
+            raise ValueError(
+                "Cannot specify a RequestTimeouts config via the `timeout` "
+                "parameter together with the legacy parameters `read_timeout` "
+                "or `conn_timeout`. Please merge the timeout values into the "
+                "timeout config object.")
+        elif timeout:
+            self._timeout = timeout
+        else:
+            self._timeout = attr.evolve(
+                DEFAULT_TIMEOUTS, connection_acquiring_timeout=conn_timeout,
+                uber_timeout=(read_timeout if read_timeout is not sentinel
+                              else DEFAULT_TIMEOUTS.uber_timeout))
+
         # Convert to list of tuples
         if headers:
             headers = CIMultiDict(headers)
@@ -191,18 +200,14 @@ async def _request(self, method, url, *,
                        read_until_eof=True,
                        proxy=None,
                        proxy_auth=None,
-                       timeout=sentinel,
+                       timeout=sentinel,  # sentinel -> inherit from session, None -> disable
                        verify_ssl=None,
                        fingerprint=None,
                        ssl_context=None,
                        ssl=None,
                        proxy_headers=None,
                        trace_request_ctx=None):
 
-        # NOTE: timeout clamps existing connect and read timeouts. We cannot
-        # set the default to None because we need to detect if the user wants
-        # to use the existing timeouts by setting timeout to None.
-
         if self.closed:
             raise RuntimeError('Session is closed')
 
@@ -242,33 +247,13 @@
         except ValueError:
             raise InvalidURL(proxy)
 
-        # timeout is cumulative for all request operations
-        # (request, redirects, responses, data consuming)
-        tm = TimeoutHandle(
-            self._loop,
-            timeout if timeout is not sentinel else self._read_timeout)
-        handle = tm.start()
-
-        traces = [
-            Trace(
-                self,
-                trace_config,
-                trace_config.trace_config_ctx(
-                    trace_request_ctx=trace_request_ctx)
-            )
-            for trace_config in self._trace_configs
-        ]
-
-        for trace in traces:
-            await trace.send_request_start(
-                method,
-                url,
-                headers
-            )
-
-        timer = tm.timer()
+        if timeout == sentinel:
+            timeout = self._timeout
+        elif isinstance(timeout, int):
+            timeout = attr.evolve(self._timeout, uber_timeout=timeout)
+        lifecycle = RequestLifecycle(self, self._loop, self._trace_configs,
+                                     trace_request_ctx, timeout)
         try:
-            with timer:
+            with lifecycle.request_timer_context:
                 while True:
                     url, auth_from_url = strip_auth_from_url(url)
                     if auth and auth_from_url:
@@ -307,17 +292,16 @@
                     compress=compress, chunked=chunked,
                     expect100=expect100, loop=self._loop,
                     response_class=self._response_class,
-                    proxy=proxy, proxy_auth=proxy_auth, timer=timer,
+                    proxy=proxy, proxy_auth=proxy_auth, lifecycle=lifecycle,
                     session=self, auto_decompress=self._auto_decompress,
-                    ssl=ssl, proxy_headers=proxy_headers, traces=traces)
+                    ssl=ssl, proxy_headers=proxy_headers)
 
                 # connection timeout
                 try:
-                    with CeilTimeout(self._conn_timeout, loop=self._loop):
-                        conn = await self._connector.connect(
-                            req,
-                            traces=traces
-                        )
+                    conn = await self._connector.connect(
+                        req,
+                        lifecycle=lifecycle
+                    )
                 except asyncio.TimeoutError as exc:
                     raise ServerTimeoutError(
                         'Connection timeout '
@@ -347,13 +331,12 @@
                 if resp.status in (
                         301, 302, 303, 307, 308) and allow_redirects:
 
-                    for trace in traces:
-                        await trace.send_request_redirect(
-                            method,
-                            url,
-                            headers,
-                            resp
-                        )
+                    await lifecycle.send_request_redirect(
+                        method,
+                        url,
+                        headers,
+                        resp
+                    )
 
                     redirects += 1
                     history.append(resp)
@@ -411,37 +394,30 @@
                 resp.raise_for_status()
 
             # register connection
-            if handle is not None:
-                if resp.connection is not None:
-                    resp.connection.add_callback(handle.cancel)
-                else:
-                    handle.cancel()
+            # XXX this will only be required if there are still valid open
+            # timeouts after request_end
+            if resp.connection is not None:
+                resp.connection.add_callback(lifecycle.clear_timeouts)
+            else:
+                lifecycle.clear_timeouts()
 
             resp._history = tuple(history)
 
-            for trace in traces:
-                await trace.send_request_end(
-                    method,
-                    url,
-                    headers,
-                    resp
-                )
+            await lifecycle.send_request_end(
+                method,
+                url,
+                headers,
+                resp
+            )
             return resp

         except BaseException as e:
-            # cleanup timer
-            tm.close()
-            if handle:
-                handle.cancel()
-                handle = None
-
-            for trace in traces:
-                await trace.send_request_exception(
-                    method,
-                    url,
-                    headers,
-                    e
-                )
+            await lifecycle.send_request_exception(
+                method,
+                url,
+                headers,
+                e
+            )
+            lifecycle.clear_timeouts()

Contributor:
Can we do this as part of a context? It's too easy to get wrong otherwise.

Author (N-Coder, Mar 17, 2018):
I'm not sure whether it is required at all: clear_timeouts is analogous to the old handle.cancel, which was e.g. also used in resp.connection.add_callback(handle.cancel). As timeouts are automatically cancelled once the end signal is emitted, we probably don't need any clean-up any more. But I'm not 100% sure whether there were any side effects of resp.connection.add_callback that I'm not aware of right now.

Contributor:
Cool, ya, let's figure that out.
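
For reference, a class-based async context manager along the lines the reviewer suggests could look like the sketch below. LifecycleScope is a hypothetical name, and the sketch only assumes that clear_timeouts is safe to call on every exit path:

class LifecycleScope:
    """Hypothetical wrapper that guarantees timeout cleanup."""

    def __init__(self, lifecycle):
        self._lifecycle = lifecycle

    async def __aenter__(self):
        return self._lifecycle

    async def __aexit__(self, exc_type, exc, tb):
        # runs on success and failure alike, so cleanup cannot be forgotten
        self._lifecycle.clear_timeouts()
        return False  # never suppress the exception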

             raise

     def ws_connect(self, url, *,
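
To summarize the precedence rules the client.py changes above implement, here is a hypothetical usage sketch against this branch; the aiohttp.lifecycle import path follows the `from .lifecycle import ...` line in this diff:

import aiohttp
from aiohttp.lifecycle import RequestTimeouts  # module added by this PR

async def main():
    # a session-wide config replaces the legacy read_timeout/conn_timeout;
    # passing both the new and the legacy parameters raises ValueError
    session = aiohttp.ClientSession(
        timeout=RequestTimeouts(uber_timeout=60))

    # default (sentinel) -> inherit the session's RequestTimeouts
    resp = await session.get('http://example.com/')

    # a bare int is merged into the session config via attr.evolve,
    # overriding only uber_timeout
    resp = await session.get('http://example.com/', timeout=10)

    # None -> disable timeouts for this request
    resp = await session.get('http://example.com/', timeout=None)

    await session.close()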
aiohttp/client_reqrep.py (34 changes: 14 additions & 20 deletions)
@@ -175,10 +175,8 @@ def __init__(self, method, url, *,
                  chunked=None, expect100=False,
                  loop=None, response_class=None,
                  proxy=None, proxy_auth=None,
-                 timer=None, session=None, auto_decompress=True,
-                 ssl=None,
-                 proxy_headers=None,
-                 traces=None):
+                 lifecycle=None, session=None, auto_decompress=True,
+                 ssl=None, proxy_headers=None):
 
         if loop is None:
             loop = asyncio.get_event_loop()
@@ -199,7 +197,7 @@ def __init__(self, method, url, *,
         self.loop = loop
         self.length = None
         self.response_class = response_class or ClientResponse
-        self._timer = timer if timer is not None else TimerNoop()
+        self._lifecycle = lifecycle
         self._auto_decompress = auto_decompress
         self._ssl = ssl
 
@@ -219,9 +217,6 @@ def __init__(self, method, url, *,
         if data or self.method not in self.GET_METHODS:
             self.update_transfer_encoding()
         self.update_expect_continue(expect100)
-        if traces is None:
-            traces = []
-        self._traces = traces
 
     def is_ssl(self):
         return self.url.scheme in ('https', 'wss')
@@ -527,10 +522,10 @@ async def send(self, conn):
 
         self.response = self.response_class(
             self.method, self.original_url,
-            writer=self._writer, continue100=self._continue, timer=self._timer,
+            writer=self._writer, continue100=self._continue,
             request_info=self.request_info,
             auto_decompress=self._auto_decompress,
-            traces=self._traces,
+            lifecycle=self._lifecycle,
             loop=self.loop,
             session=self._session
         )
@@ -550,8 +545,8 @@ def terminate(self):
             self._writer = None
 
     async def _on_chunk_request_sent(self, chunk):
-        for trace in self._traces:
-            await trace.send_request_chunk_sent(chunk)
+        if self._lifecycle:
+            await self._lifecycle.send_request_chunk_sent(chunk)


class ClientResponse(HeadersMixin):
@@ -573,9 +568,9 @@ class ClientResponse(HeadersMixin):
     _closed = True  # to allow __del__ for non-initialized properly response
 
     def __init__(self, method, url, *,
-                 writer, continue100, timer,
+                 writer, continue100, lifecycle,
                  request_info, auto_decompress,
-                 traces, loop, session):
+                 loop, session):
         assert isinstance(url, URL)
 
         self.method = method
@@ -589,10 +584,9 @@ def __init__(self, method, url, *,
         self._closed = True
         self._history = ()
         self._request_info = request_info
-        self._timer = timer if timer is not None else TimerNoop()
+        self._lifecycle = lifecycle
         self._auto_decompress = auto_decompress  # True by default
         self._cache = {}  # required for @reify method decorator
-        self._traces = traces
         self._loop = loop
         self._session = session  # store a reference to session #1985
         if loop.get_debug():
@@ -683,12 +677,12 @@ async def start(self, connection, read_until_eof=False):
         self._connection = connection
 
         connection.protocol.set_response_params(
-            timer=self._timer,
+            timer=self._lifecycle.request_timer_context,
             skip_payload=self.method.lower() == 'head',
             read_until_eof=read_until_eof,
            auto_decompress=self._auto_decompress)
 
-        with self._timer:
+        with self._lifecycle.request_timer_context:
             while True:
                 # read response
                 try:
@@ -813,8 +807,8 @@ async def read(self):
         if self._body is None:
             try:
                 self._body = await self.content.read()
-                for trace in self._traces:
-                    await trace.send_response_chunk_received(self._body)
+                if self._lifecycle:
+                    await self._lifecycle.send_response_chunk_received(
+                        self._body)
             except BaseException:
                 self.close()
                 raise
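
From the call sites in this diff, RequestLifecycle evidently replaces the per-call-site `for trace in traces` loops by fanning each signal out to the configured trace configs itself. aiohttp/lifecycle.py is not part of the diff shown here, so the following dispatch sketch is an assumption built on the existing Trace machinery:

from .tracing import Trace  # the pre-existing per-config trace wrapper

class RequestLifecycle:
    """Hypothetical sketch: per-request bundle of tracing and timeout state."""

    def __init__(self, session, loop, trace_configs, trace_request_ctx,
                 timeouts):
        self._timeouts = timeouts
        self._traces = [
            Trace(session, cfg,
                  cfg.trace_config_ctx(trace_request_ctx=trace_request_ctx))
            for cfg in trace_configs
        ]

    async def send_request_end(self, method, url, headers, resp):
        # fan the signal out to every configured trace, as the old
        # inline loops in client.py did
        for trace in self._traces:
            await trace.send_request_end(method, url, headers, resp)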