diff --git a/tests/executor/test_smartq.py b/tests/executor/test_smartq.py index 2b7a139a8..bad82a4f7 100644 --- a/tests/executor/test_smartq.py +++ b/tests/executor/test_smartq.py @@ -109,3 +109,33 @@ async def invalid_worker(q): print("w end") assert outputs == {1, 2, 3} + + +@pytest.mark.asyncio +async def test_has_unassigned_items_doesnt_block(): + """Test if `has_unassigned_items` does not block if there are rescheduled items.""" + + loop = asyncio.get_event_loop() + + async def blocking_task_source(): + yield 1 + await asyncio.sleep(10) + assert False, "Task iterator should not block" + + q = SmartQueue(blocking_task_source()) + + async def worker(): + with q.new_consumer() as con: + async for handle in con: + print("Item retrieved:", handle.data) + # This will yield control to the other worker + await asyncio.sleep(0.1) + await q.reschedule(handle) + # After the item is rescheduled, the second worker should + # pick it up, instead of waiting until `blocking()` fails. + print("Item rescheduled") + return + + worker_1 = loop.create_task(worker()) + worker_2 = loop.create_task(worker()) + await asyncio.gather(worker_1, worker_2) diff --git a/yapapi/executor/_smartq.py b/yapapi/executor/_smartq.py index 710fb11c4..bd5e592d5 100644 --- a/yapapi/executor/_smartq.py +++ b/yapapi/executor/_smartq.py @@ -126,7 +126,13 @@ async def has_unassigned_items(self) -> bool: A queue has unassigned items iff the next call to `get()` will immediately return some item, without waiting for an item that is currently "in progress" to be rescheduled. """ - return await self.has_new_items() or bool(self._rescheduled_items) + while True: + if self._rescheduled_items: + return True + try: + return await asyncio.wait_for(self.has_new_items(), 1.0) + except asyncio.TimeoutError: + pass def new_consumer(self) -> "Consumer[Item]": return Consumer(self)