diff --git a/aioredis/locks.py b/aioredis/locks.py new file mode 100644 index 000000000..c49e9b2a4 --- /dev/null +++ b/aioredis/locks.py @@ -0,0 +1,43 @@ +from asyncio.locks import Lock as _Lock +from asyncio import coroutine +from asyncio import futures + +from .util import create_future + +# Fixes an issue with all Python versions that leaves pending waiters +# without being awakened when the first waiter is canceled. +# Code adapted from the PR https://github.com/python/cpython/pull/1031 +# Waiting once it is merged to make a proper condition to relay on +# the stdlib implementation or this one patched + + +class Lock(_Lock): + @coroutine + def acquire(self): + """Acquire a lock. + This method blocks until the lock is unlocked, then sets it to + locked and returns True. + """ + if not self._locked and all(w.cancelled() for w in self._waiters): + self._locked = True + return True + + fut = create_future(self._loop) + + self._waiters.append(fut) + try: + yield from fut + self._locked = True + return True + except futures.CancelledError: + if not self._locked: # pragma: no cover + self._wake_up_first() + raise + finally: + self._waiters.remove(fut) + + def _wake_up_first(self): + """Wake up the first waiter who isn't cancelled.""" + for fut in self._waiters: + if not fut.done(): + fut.set_result(True) diff --git a/aioredis/pool.py b/aioredis/pool.py index c120ec093..a089072e9 100644 --- a/aioredis/pool.py +++ b/aioredis/pool.py @@ -9,6 +9,7 @@ from .util import async_task, _NOTSET from .errors import PoolClosedError from .abc import AbcPool +from .locks import Lock PY_35 = sys.version_info >= (3, 5) @@ -77,7 +78,7 @@ def __init__(self, address, db=None, password=None, encoding=None, self._pool = collections.deque(maxlen=maxsize) self._used = set() self._acquiring = 0 - self._cond = asyncio.Condition(loop=loop) + self._cond = asyncio.Condition(lock=Lock(loop=loop), loop=loop) self._close_state = asyncio.Event(loop=loop) self._close_waiter = async_task(self._do_close(), loop=loop) self._pubsub_conn = None diff --git a/tests/locks_test.py b/tests/locks_test.py new file mode 100644 index 000000000..385d11684 --- /dev/null +++ b/tests/locks_test.py @@ -0,0 +1,33 @@ +import asyncio +import pytest + +from aioredis.util import async_task +from aioredis.locks import Lock + + +@pytest.mark.run_loop +def test_finished_waiter_cancelled(loop): + lock = Lock(loop=loop) + + ta = async_task(lock.acquire(), loop=loop) + yield from asyncio.sleep(0, loop=loop) + assert lock.locked() + + tb = async_task(lock.acquire(), loop=loop) + yield from asyncio.sleep(0, loop=loop) + assert len(lock._waiters) == 1 + + # Create a second waiter, wake up the first, and cancel it. + # Without the fix, the second was not woken up and the lock + # will never be locked + async_task(lock.acquire(), loop=loop) + yield from asyncio.sleep(0, loop=loop) + lock.release() + tb.cancel() + + yield from asyncio.sleep(0, loop=loop) + assert ta.done() + assert tb.cancelled() + + yield from asyncio.sleep(0, loop=loop) + assert lock.locked()