Skip to content

Commit

Permalink
Don't wait for new items if there are rescheduled ones in SmartQueue
Browse files Browse the repository at this point in the history
  • Loading branch information
azawlocki committed May 24, 2021
1 parent 03c5f55 commit 9cbee0f
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 1 deletion.
30 changes: 30 additions & 0 deletions tests/executor/test_smartq.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,33 @@ async def invalid_worker(q):
print("w end")

assert outputs == {1, 2, 3}


@pytest.mark.asyncio
async def test_has_unassigned_items_doesnt_block():
"""Test if `has_unassigned_items` does not block if there are rescheduled items."""

loop = asyncio.get_event_loop()

async def blocking_task_source():
yield 1
await asyncio.sleep(10)
assert False, "Task iterator should not block"

q = SmartQueue(blocking_task_source())

async def worker():
with q.new_consumer() as con:
async for handle in con:
print("Item retrieved:", handle.data)
# This will yield control to the other worker
await asyncio.sleep(0.1)
await q.reschedule(handle)
# After the item is rescheduled, the second worker should
# pick it up, instead of waiting until `blocking()` fails.
print("Item rescheduled")
return

worker_1 = loop.create_task(worker())
worker_2 = loop.create_task(worker())
await asyncio.gather(worker_1, worker_2)
8 changes: 7 additions & 1 deletion yapapi/executor/_smartq.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,13 @@ async def has_unassigned_items(self) -> bool:
A queue has unassigned items iff the next call to `get()` will immediately return
some item, without waiting for an item that is currently "in progress" to be rescheduled.
"""
return await self.has_new_items() or bool(self._rescheduled_items)
while True:
if self._rescheduled_items:
return True
try:
return await asyncio.wait_for(self.has_new_items(), 1.0)
except asyncio.TimeoutError:
pass

def new_consumer(self) -> "Consumer[Item]":
return Consumer(self)
Expand Down

0 comments on commit 9cbee0f

Please sign in to comment.