diff --git a/tests/test_aioca.py b/tests/test_aioca.py index 0a26bb0..a3f5b7b 100644 --- a/tests/test_aioca.py +++ b/tests/test_aioca.py @@ -11,6 +11,7 @@ from typing import Callable, List, Tuple, Union import pytest +from _pytest.unraisableexception import catch_unraisable_exception from epicscorelibs.ca import cadef, dbr from aioca import ( @@ -530,41 +531,46 @@ async def monitor_for_a_bit(callback: Callable, ioc) -> Subscription: return m -def test_closing_event_loop(ioc: subprocess.Popen, capsys) -> None: - def closed_messages(text): - return [x for x in text.splitlines() if x.endswith("Event loop is closed")] - - q: queue.Queue = queue.Queue() - m = run(monitor_for_a_bit(q.put, ioc)) - # Most of we get the initial value 0, then an update of 1 - # On travis sometimes the IOC is too late to startup and we - # get 1 and 2 - updates = (q.get_nowait(), q.get_nowait()) - assert updates in [(0, 1), (1, 2)] - assert q.qsize() == 0 - assert len(m.pending_values) == 0 +def test_closing_event_loop( + ioc: subprocess.Popen, +) -> None: + with catch_unraisable_exception() as cm: + q: queue.Queue = queue.Queue() + m = run(monitor_for_a_bit(q.put, ioc)) + # Most of we get the initial value 0, then an update of 1 + # On travis sometimes the IOC is too late to startup and we + # get 1 and 2 + updates = (q.get_nowait(), q.get_nowait()) + assert updates in [(0, 1), (1, 2)] + assert q.qsize() == 0 + assert len(m.pending_values) == 0 - time.sleep(2.0) - m.close() - time.sleep(1.0) + time.sleep(2.0) + m.close() + time.sleep(1.0) - # We should have 2 more updates that didn't make it to the queue - # because loop closed - assert q.qsize() == 0 - assert len(m.pending_values) == 2 + # We should have 2 more updates that didn't make it to the queue + # because loop closed + assert q.qsize() == 0 + assert len(m.pending_values) == 2 - # Check that there are no more updates - time.sleep(2.0) - assert len(m.pending_values) == 2 + # Check that there are no more updates + time.sleep(2.0) + assert len(m.pending_values) == 2 - ioc.communicate("exit") - time.sleep(0.5) - # There should be no more updates, but one error from the close message - assert q.qsize() == 0 - assert len(m.pending_values) == 2 - captured = capsys.readouterr() - assert captured.out == "" - assert len(closed_messages(captured.err)) == 1, captured.err + ioc.communicate("exit") + time.sleep(0.5) + # There should be no more updates, but one error from the close message + assert q.qsize() == 0 + assert len(m.pending_values) == 2 + + # We expect there to be an unraiseable exception, caused by trying dispatch a + # function to a closed event loop. + # Detecting this is a bit hacky - pytest has code to catch unraiseable + # exceptions, but no publicly declared API to examine them nor configuration + # settings to do anything but ignore them. + # So we use an internal API to examine it. + assert "Event loop is closed" in cm.unraisable.exc_value.args @pytest.mark.asyncio