Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes some bugs in the limit connection feature #2964

Merged
merged 12 commits into from
May 4, 2018
1 change: 1 addition & 0 deletions CHANGES/2964.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixes some bugs in the limit connection feature
83 changes: 49 additions & 34 deletions aiohttp/connector.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
import functools
import random
import sys
import traceback
import warnings
Expand Down Expand Up @@ -359,9 +360,14 @@ def closed(self):
"""
return self._closed

async def connect(self, req, traces=None):
"""Get from pool or create new connection."""
key = req.connection_key
def _available_connections(self, key):
"""
Return number of available connections taking into account
the limit, limit_per_host and the connection key.

If it returns a value less than 1, it means that there are no
connections available.
"""

if self._limit:
# total calc available connections
Expand All @@ -380,6 +386,13 @@ async def connect(self, req, traces=None):
else:
available = 1

return available

async def connect(self, req, traces=None):
"""Get from pool or create new connection."""
key = req.connection_key
available = self._available_connections(key)

# Wait if there are no available connections.
if available <= 0:
fut = self._loop.create_future()
Expand All @@ -394,7 +407,7 @@ async def connect(self, req, traces=None):

try:
await fut
except BaseException:
except BaseException as e:
# remove a waiter even if it was cancelled, normally it's
# removed when it's notified
try:
Expand All @@ -405,7 +418,7 @@ async def connect(self, req, traces=None):
if not waiters:
del self._waiters[key]

raise
raise e

if traces:
for trace in traces:
Expand All @@ -430,12 +443,12 @@ async def connect(self, req, traces=None):
proto.close()
raise ClientConnectionError("Connector is closed.")
except BaseException:
# signal to waiter
if key in self._waiters:
waiters = self._waiters[key]
self._release_key_waiter(key, waiters)
if not self._closed:
self._acquired.remove(placeholder)
self._drop_acquired_per_host(key, placeholder)
self._release_waiter()
raise
finally:
else:
if not self._closed:
self._acquired.remove(placeholder)
self._drop_acquired_per_host(key, placeholder)
Expand Down Expand Up @@ -477,35 +490,37 @@ def _get(self, key):
del self._conns[key]
return None

def _release_key_waiter(self, key, waiters):
if not waiters:
return False
def _release_waiter(self):
"""
Iterates over all waiters until it finds one that is not finished and
belongs to a host that has available connections.
"""
if not self._waiters:
return

waiter = waiters.popleft()
if not waiter.done():
waiter.set_result(None)
# Shuffle the dict keys so that the queues are not iterated
# in the same order on every call.
queues = list(self._waiters.keys())
random.shuffle(queues)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need shuffling?
The operation is not free.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that hurts me. But without the shuffle the queues would always be iterated in the same order, creating a chance of starvation for certain queues.

We could try to get rid of that operation, but it will imply some important changes.

Copy link
Contributor Author

@pfreixes pfreixes May 2, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea of @thehesiod of having a "true" FIFO would get rid of this operation, but my gut feeling says that this FIFO queue would have enough overhead — it has to take care of the limit per host — to make it slower than a single shuffle call.

PD: a shuffle operation takes around 1 microsecond for a list of 10 elements.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

waiters is a deque.
deque.rotate() is cheap. Can we use it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remember that _waiters is the dictionary that has the queues. So, unfortunately we can't use it.

We might implement the FIFO on top a unique deque using the rotate method as a way to skip a waiter that can't be deallocated because of the limit per host restriction. I didn't know but the slice of the head element is O(1), so the check and later the skip can be done without losing the deque properties.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. The PR's code is ok to me.


if not waiters:
del self._waiters[key]
for key in queues:
if self._available_connections(key) < 1:
continue

return True
waiters = self._waiters[key]

def _release_waiter(self):
# always release only one waiter
while waiters:
waiter = waiters.popleft()
if not waiter.done():
waiter.set_result(None)

if self._limit:
# if we have limit and we have available
if self._limit - len(self._acquired) > 0:
for key, waiters in self._waiters.items():
if self._release_key_waiter(key, waiters):
break

elif self._limit_per_host:
# if we don't have a limit but do have a limit per host
# then release first available
for key, waiters in self._waiters.items():
if self._release_key_waiter(key, waiters):
break
if not waiters:
del self._waiters[key]

return

if not waiters:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The condition is always true.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean it would be nice to have a test case where waiters become non-empty after releasing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it doesn't make sense right now let me get rid of this part of the code and I will check the tests.

del self._waiters[key]

def _release_acquired(self, key, proto):
if self._closed:
Expand Down
56 changes: 49 additions & 7 deletions tests/test_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,32 @@ async def go(app):
loop.run_until_complete(runner.cleanup())


def test_connection_del(loop):
connector = mock.Mock()
key = mock.Mock()
protocol = mock.Mock()
loop.set_debug(1)
conn = Connection(connector, key, protocol, loop=loop)
exc_handler = mock.Mock()
loop.set_exception_handler(exc_handler)

with pytest.warns(ResourceWarning):
del conn
gc.collect()

connector._release.assert_called_with(
key,
protocol,
should_close=True
)
msg = {
'message': mock.ANY,
'client_connection': mock.ANY,
'source_traceback': mock.ANY
}
exc_handler.assert_called_with(loop, msg)


def test_del(loop):
conn = aiohttp.BaseConnector(loop=loop)
proto = mock.Mock(should_close=False)
Expand Down Expand Up @@ -292,18 +318,19 @@ def test_release_already_closed(loop):
assert not conn._release_acquired.called


def test_release_waiter(loop, key, key2):
def test_release_waiter_no_limit(loop, key, key2):
# limit is 0
conn = aiohttp.BaseConnector(limit=0, loop=loop)
w = mock.Mock()
w.done.return_value = False
conn._waiters[key].append(w)
conn._release_waiter()
assert len(conn._waiters) == 1
assert not w.done.called
assert len(conn._waiters) == 0
assert w.done.called
conn.close()

# release first available

def test_release_waiter_first_available(loop, key, key2):
conn = aiohttp.BaseConnector(loop=loop)
w1, w2 = mock.Mock(), mock.Mock()
w1.done.return_value = False
Expand All @@ -315,7 +342,8 @@ def test_release_waiter(loop, key, key2):
not w1.set_result.called and w2.set_result.called)
conn.close()

# limited available

def test_release_waiter_release_first(loop, key, key2):
conn = aiohttp.BaseConnector(loop=loop, limit=1)
w1, w2 = mock.Mock(), mock.Mock()
w1.done.return_value = False
Expand All @@ -326,15 +354,16 @@ def test_release_waiter(loop, key, key2):
assert not w2.set_result.called
conn.close()

# limited available

def test_release_waiter_skip_done_waiter(loop, key, key2):
conn = aiohttp.BaseConnector(loop=loop, limit=1)
w1, w2 = mock.Mock(), mock.Mock()
w1.done.return_value = True
w2.done.return_value = False
conn._waiters[key] = deque([w1, w2])
conn._release_waiter()
assert not w1.set_result.called
assert not w2.set_result.called
assert w2.set_result.called
conn.close()


Expand All @@ -352,6 +381,19 @@ def test_release_waiter_per_host(loop, key, key2):
conn.close()


def test_release_waiter_no_available(loop, key, key2):
# limit is 0
conn = aiohttp.BaseConnector(limit=0, loop=loop)
w = mock.Mock()
w.done.return_value = False
conn._waiters[key].append(w)
conn._available_connections = mock.Mock(return_value=0)
conn._release_waiter()
assert len(conn._waiters) == 1
assert not w.done.called
conn.close()


def test_release_close(loop):
conn = aiohttp.BaseConnector(loop=loop)
proto = mock.Mock(should_close=True)
Expand Down