diff --git a/src/textual/_animator.py b/src/textual/_animator.py index 65b00244b3e..0c031b046fe 100644 --- a/src/textual/_animator.py +++ b/src/textual/_animator.py @@ -179,10 +179,13 @@ def animate( raise AttributeError( f"Can't animate attribute {attribute!r} on {obj!r}; attribute does not exist" ) + assert not all( + (duration, speed) + ), "An Animation should have a duration OR a speed, received both" if final_value is ...: final_value = value - start_time = self._timer.get_time() + start_time = self._get_time() animation_key = (id(obj), attribute) @@ -233,9 +236,15 @@ def __call__(self) -> None: if not self._animations: self._timer.pause() else: - animation_time = self._timer.get_time() + animation_time = self._get_time() animation_keys = list(self._animations.keys()) for animation_key in animation_keys: animation = self._animations[animation_key] if animation(animation_time): del self._animations[animation_key] + + def _get_time(self) -> float: + """Get the current wall clock time, via the internal Timer.""" + # N.B. We could remove this method and always call `self._timer.get_time()` internally, + # but it's handy to have in mocking situations + return self._timer.get_time() diff --git a/src/textual/_timer.py b/src/textual/_timer.py index a3a49c168d7..8b55e0122c2 100644 --- a/src/textual/_timer.py +++ b/src/textual/_timer.py @@ -19,6 +19,9 @@ TimerCallback = Union[Callable[[], Awaitable[None]], Callable[[], None]] +# /!\ This should only be changed in an "integration tests" context, in which we mock time +_TIMERS_CAN_SKIP: bool = True + class EventTargetGone(Exception): pass @@ -27,8 +30,6 @@ class EventTargetGone(Exception): @rich_repr class Timer: _timer_count: int = 1 - # Used to mock Timers' behaviour in a Textual app's integration test: - _instances: weakref.WeakSet[Timer] = weakref.WeakSet() def __init__( self, @@ -64,7 +65,6 @@ def __init__( self._repeat = repeat self._skip = skip self._active = Event() - Timer._instances.add(self) if not pause: self._active.set() @@ -126,11 +126,10 @@ async def _run(self) -> None: try: while _repeat is None or count <= _repeat: next_timer = start + ((count + 1) * _interval) - now = self.get_time() - if self._skip and next_timer < now: + if self._skip and _TIMERS_CAN_SKIP and next_timer < self.get_time(): count += 1 continue - wait_time = max(0, next_timer - now) + wait_time = max(0, next_timer - self.get_time()) if wait_time: await self._sleep(wait_time) count += 1 diff --git a/src/textual/screen.py b/src/textual/screen.py index 9375cd17fcf..cad5be6c39f 100644 --- a/src/textual/screen.py +++ b/src/textual/screen.py @@ -1,5 +1,7 @@ from __future__ import annotations +import sys + from rich.console import RenderableType import rich.repr from rich.style import Style @@ -12,6 +14,14 @@ from .reactive import Reactive from .widget import Widget +if sys.version_info >= (3, 8): + from typing import Final +else: + from typing_extensions import Final + +# Screen updates will be batched so that they don't happen more often than 20 times per second: +UPDATE_PERIOD: Final = 1 / 20 + @rich.repr.auto class Screen(Widget): @@ -154,7 +164,9 @@ async def handle_layout(self, message: messages.Layout) -> None: self.refresh_layout() def on_mount(self, event: events.Mount) -> None: - self._update_timer = self.set_interval(1 / 20, self._on_update, pause=True) + self._update_timer = self.set_interval( + UPDATE_PERIOD, self._on_update, pause=True + ) async def on_resize(self, event: events.Resize) -> None: self.size_updated(event.size, event.virtual_size, event.container_size) diff --git a/tests/test_animator.py b/tests/test_animator.py index 84a242fee65..fd3f7c03890 100644 --- a/tests/test_animator.py +++ b/tests/test_animator.py @@ -177,12 +177,12 @@ def __init__(self, *args) -> None: self._time = 0.0 self._on_animation_frame_called = False - def get_time(self): - return self._time - def on_animation_frame(self): self._on_animation_frame_called = True + def _get_time(self): + return self._time + def test_animator(): diff --git a/tests/test_integration_scrolling.py b/tests/test_integration_scrolling.py index 60cf1154a0d..038270954ae 100644 --- a/tests/test_integration_scrolling.py +++ b/tests/test_integration_scrolling.py @@ -1,17 +1,8 @@ from __future__ import annotations - -import sys from typing import Sequence, cast -if sys.version_info >= (3, 8): - from typing import Literal -else: - from typing_extensions import Literal # pragma: no cover - - import pytest -from sandbox.vertical_container import VerticalContainer from tests.utilities.test_app import AppTest from textual.app import ComposeResult from textual.geometry import Size @@ -31,23 +22,19 @@ "scroll_to_animate", "waiting_duration", "last_screen_expected_placeholder_ids", - "last_screen_expected_out_of_viewport_placeholder_ids", ), ( - [SCREEN_SIZE, 10, None, None, 0.01, (0, 1, 2, 3, 4), "others"], - [SCREEN_SIZE, 10, "placeholder_3", False, 0.01, (0, 1, 2, 3, 4), "others"], - [SCREEN_SIZE, 10, "placeholder_5", False, 0.01, (1, 2, 3, 4, 5), "others"], - [SCREEN_SIZE, 10, "placeholder_7", False, 0.01, (3, 4, 5, 6, 7), "others"], - [SCREEN_SIZE, 10, "placeholder_9", False, 0.01, (5, 6, 7, 8, 9), "others"], + [SCREEN_SIZE, 10, None, None, 0.01, (0, 1, 2, 3, 4)], + [SCREEN_SIZE, 10, "placeholder_3", False, 0.01, (0, 1, 2, 3, 4)], + [SCREEN_SIZE, 10, "placeholder_5", False, 0.01, (1, 2, 3, 4, 5)], + [SCREEN_SIZE, 10, "placeholder_7", False, 0.01, (3, 4, 5, 6, 7)], + [SCREEN_SIZE, 10, "placeholder_9", False, 0.01, (5, 6, 7, 8, 9)], # N.B. Scroll duration is hard-coded to 0.2 in the `scroll_to_widget` method atm # Waiting for this duration should allow us to see the scroll finished: - [SCREEN_SIZE, 10, "placeholder_9", True, 0.21, (5, 6, 7, 8, 9), "others"], + [SCREEN_SIZE, 10, "placeholder_9", True, 0.21, (5, 6, 7, 8, 9)], # After having waited for approximately half of the scrolling duration, we should # see the middle Placeholders as we're scrolling towards the last of them. - # The state of the screen at this "halfway there" timing looks to not be deterministic though, - # depending on the environment - so let's only assert stuff for the middle placeholders - # and the first and last ones, but without being too specific about the others: - [SCREEN_SIZE, 10, "placeholder_9", True, 0.1, (6, 7, 8), (1, 2, 5, 9)], + [SCREEN_SIZE, 10, "placeholder_9", True, 0.1, (4, 5, 6, 7, 8)], ), ) async def test_scroll_to_widget( @@ -57,9 +44,19 @@ async def test_scroll_to_widget( scroll_to_placeholder_id: str | None, waiting_duration: float | None, last_screen_expected_placeholder_ids: Sequence[int], - last_screen_expected_out_of_viewport_placeholder_ids: Sequence[int] - | Literal["others"], ): + class VerticalContainer(Widget): + CSS = """ + VerticalContainer { + layout: vertical; + overflow: hidden auto; + } + VerticalContainer Placeholder { + margin: 1 0; + height: 5; + } + """ + class MyTestApp(AppTest): CSS = """ Placeholder { @@ -77,7 +74,7 @@ def compose(self) -> ComposeResult: app = MyTestApp(size=screen_size, test_name="scroll_to_widget") - async with app.in_running_state(waiting_duration_post_yield=waiting_duration or 0): + async with app.in_running_state(waiting_duration_after_yield=waiting_duration or 0): if scroll_to_placeholder_id: target_widget_container = cast(Widget, app.query("#root").first()) target_widget = cast( @@ -93,24 +90,24 @@ def compose(self) -> ComposeResult: id_: f"placeholder_{id_}" in last_display_capture for id_ in range(placeholders_count) } - print(f"placeholders_visibility_by_id={placeholders_visibility_by_id}") + # Let's start by checking placeholders that should be visible: for placeholder_id in last_screen_expected_placeholder_ids: - assert ( - placeholders_visibility_by_id[placeholder_id] is True - ), f"Placeholder '{placeholder_id}' should be visible but isn't" + assert placeholders_visibility_by_id[placeholder_id] is True, ( + f"Placeholder '{placeholder_id}' should be visible but isn't" + f" :: placeholders_visibility_by_id={placeholders_visibility_by_id}" + ) # Ok, now for placeholders that should *not* be visible: - if last_screen_expected_out_of_viewport_placeholder_ids == "others": - # We're simply going to check that all the placeholders that are not in - # `last_screen_expected_placeholder_ids` are not on the screen: - last_screen_expected_out_of_viewport_placeholder_ids = sorted( - tuple( - set(range(placeholders_count)) - - set(last_screen_expected_placeholder_ids) - ) + # We're simply going to check that all the placeholders that are not in + # `last_screen_expected_placeholder_ids` are not on the screen: + last_screen_expected_out_of_viewport_placeholder_ids = sorted( + tuple( + set(range(placeholders_count)) - set(last_screen_expected_placeholder_ids) ) + ) for placeholder_id in last_screen_expected_out_of_viewport_placeholder_ids: - assert ( - placeholders_visibility_by_id[placeholder_id] is False - ), f"Placeholder '{placeholder_id}' should not be visible but is" + assert placeholders_visibility_by_id[placeholder_id] is False, ( + f"Placeholder '{placeholder_id}' should not be visible but is" + f" :: placeholders_visibility_by_id={placeholders_visibility_by_id}" + ) diff --git a/tests/utilities/test_app.py b/tests/utilities/test_app.py index 18bfb53e5b1..ec6291de9b2 100644 --- a/tests/utilities/test_app.py +++ b/tests/utilities/test_app.py @@ -3,19 +3,25 @@ import asyncio import contextlib import io +import sys +from math import ceil from pathlib import Path from time import monotonic -from typing import AsyncContextManager, cast, ContextManager, Callable +from typing import AsyncContextManager, cast, ContextManager from unittest import mock from rich.console import Console from textual import events -from textual._timer import Timer from textual.app import App, ComposeResult from textual.driver import Driver from textual.geometry import Size +if sys.version_info >= (3, 8): + from typing import Protocol +else: + from typing_extensions import Protocol + # N.B. These classes would better be named TestApp/TestConsole/TestDriver/etc, # but it makes pytest emit warning as it will try to collect them as classes containing test cases :-/ @@ -24,6 +30,12 @@ CLEAR_SCREEN_SEQUENCE = "\x1bP=1s\x1b\\" +class MockedTimeMoveClockForward(Protocol): + async def __call__(self, *, seconds: float) -> tuple[float, int]: + """Returns the new current (mocked) monotonic time and the number of activated Timers""" + ... + + class AppTest(App): def __init__( self, @@ -41,6 +53,9 @@ def __init__( log_color_system="256", ) + # Let's disable all features by default + self.features = frozenset() + # We need this so the `CLEAR_SCREEN_SEQUENCE` is always sent for a screen refresh, # whatever the environment: self._sync_available = True @@ -61,57 +76,86 @@ def compose(self) -> ComposeResult: def in_running_state( self, *, + time_mocking_ticks_granularity_fps: int = 60, # i.e. when moving forward by 1 second we'll do it though 60 ticks waiting_duration_after_initialisation: float = 0.1, - waiting_duration_post_yield: float = 0, - time_acceleration: bool = True, - time_acceleration_factor: float = 10, - # force_timers_tick_after_yield: bool = True, - ) -> AsyncContextManager: + waiting_duration_after_yield: float = 0, + ) -> AsyncContextManager[MockedTimeMoveClockForward]: async def run_app() -> None: await self.process_messages() - if time_acceleration: - waiting_duration_after_initialisation /= time_acceleration_factor - waiting_duration_post_yield /= time_acceleration_factor - - time_acceleration_context: ContextManager = ( - textual_timers_accelerate_time(acceleration_factor=time_acceleration_factor) - if time_acceleration - else contextlib.nullcontext() - ) - @contextlib.asynccontextmanager async def get_running_state_context_manager(): self._set_active() - with time_acceleration_context: + + with mock_textual_timers( + ticks_granularity_fps=time_mocking_ticks_granularity_fps + ) as move_time_forward: run_task = asyncio.create_task(run_app()) - timeout_before_yielding_task = asyncio.create_task( - asyncio.sleep(waiting_duration_after_initialisation) - ) - done, pending = await asyncio.wait( - ( - run_task, - timeout_before_yielding_task, - ), - return_when=asyncio.FIRST_COMPLETED, - ) - if run_task in done or run_task not in pending: - raise RuntimeError( - "TestApp is no longer running after its initialization period" - ) - yield - waiting_duration = max( - waiting_duration_post_yield or 0, - self.screen._update_timer._interval, - ) - await asyncio.sleep(waiting_duration) + await asyncio.sleep(0.001) + # timeout_before_yielding_task = asyncio.create_task( + # asyncio.sleep(waiting_duration_after_initialisation) + # ) + # done, pending = await asyncio.wait( + # ( + # run_task, + # timeout_before_yielding_task, + # ), + # return_when=asyncio.FIRST_COMPLETED, + # ) + # if run_task in done or run_task not in pending: + # raise RuntimeError( + # "TestApp is no longer running after its initialization period" + # ) + + await move_time_forward(seconds=waiting_duration_after_initialisation) + + assert self._driver is not None + + self.force_screen_update() + + yield move_time_forward + + await move_time_forward(seconds=waiting_duration_after_yield) + + self.force_screen_update() + # waiting_duration = max( + # waiting_duration_post_yield or 0, + # self.screen._update_timer._interval, + # ) + # await asyncio.sleep(waiting_duration) + # if force_timers_tick_after_yield: # await textual_timers_force_tick() + assert not run_task.done() await self.shutdown() return get_running_state_context_manager() + async def boot_and_shutdown( + self, + *, + waiting_duration_after_initialisation: float = 0.001, + waiting_duration_before_shutdown: float = 0, + ): + """Just a commodity shortcut for `async with app.in_running_state(): pass`, for simple cases""" + async with self.in_running_state( + waiting_duration_after_initialisation=waiting_duration_after_initialisation, + waiting_duration_after_yield=waiting_duration_before_shutdown, + ): + pass + + def force_screen_update(self, *, repaint: bool = True, layout: bool = True) -> None: + try: + self.screen.refresh(repaint=repaint, layout=layout) + self.screen._on_update() + except IndexError: + pass # the app may not have a screen yet + + def on_exception(self, error: Exception) -> None: + # In tests we want the errors to be raised, rather than printed to a Console + raise error + def run(self): raise NotImplementedError( "Use `async with my_test_app.in_running_state()` rather than `my_test_app.run()`" @@ -185,43 +229,71 @@ def stop_application_mode(self) -> None: pass -async def textual_timers_force_tick() -> None: - timer_instances_tick_tasks: list[asyncio.Task] = [] - for timer in Timer._instances: - task = asyncio.create_task(timer._tick(next_timer=0, count=0)) - timer_instances_tick_tasks.append(task) - await asyncio.wait(timer_instances_tick_tasks) +def mock_textual_timers( + *, + ticks_granularity_fps: int = 60, +) -> ContextManager[MockedTimeMoveClockForward]: + single_tick_duration = 1.0 / ticks_granularity_fps + pending_sleep_events: list[tuple[float, asyncio.Event]] = [] -def textual_timers_accelerate_time( - *, acceleration_factor: float = 10 -) -> ContextManager: @contextlib.contextmanager - def accelerate_time_for_timer_context_manager(): - starting_time = monotonic() + def mock_textual_timers_context_manager(): + # N.B. `start_time` is not used, but it is useful to have when we set breakpoints there :-) + start_time = current_time = monotonic() # Our replacement for "textual._timer.Timer._sleep": - async def timer_sleep(duration: float) -> None: - await asyncio.sleep(duration / acceleration_factor) - - # Our replacement for "textual._timer.Timer.get_time": - def timer_get_time() -> float: - real_now = monotonic() - real_elapsed_time = real_now - starting_time - accelerated_elapsed_time = real_elapsed_time * acceleration_factor - print( - f"timer_get_time:: accelerated_elapsed_time={accelerated_elapsed_time}" - ) - return starting_time + accelerated_elapsed_time - - with mock.patch("textual._timer.Timer._sleep") as timer_sleep_mock, mock.patch( - "textual._timer.Timer.get_time" - ) as timer_get_time_mock, mock.patch( - "textual.message.Message._get_time" - ) as message_get_time_mock: - timer_sleep_mock.side_effect = timer_sleep - timer_get_time_mock.side_effect = timer_get_time - message_get_time_mock.side_effect = timer_get_time - yield - - return accelerate_time_for_timer_context_manager() + async def sleep_mock(duration: float) -> None: + event = asyncio.Event() + target_event_monotonic_time = current_time + duration + pending_sleep_events.append((target_event_monotonic_time, event)) + # Ok, let's wait for this Event + # - which can only be "unlocked" by calls to `move_clock_forward()` + await event.wait() + + # Our replacement for "textual._timer.Timer.get_time" and "textual.message.Message._get_time": + def get_time_mock() -> float: + return current_time + + async def move_clock_forward(*, seconds: float) -> tuple[float, int]: + nonlocal current_time, start_time + + ticks_count = ceil(seconds * ticks_granularity_fps) + activated_timers_count_total = 0 + for tick_counter in range(ticks_count): + current_time += single_tick_duration + activated_timers_count_total += check_sleep_timers_to_activate() + + # Let's give an opportunity to asyncio-related stuff to happen, + # now that we unlocked some occurrences of `await sleep(duration)`: + await asyncio.sleep(0.0001) + + return current_time, activated_timers_count_total + + def check_sleep_timers_to_activate() -> int: + nonlocal pending_sleep_events + + activated_timers_count = 0 + for i, (target_event_monotonic_time, event) in enumerate( + pending_sleep_events + ): + if target_event_monotonic_time < current_time: + continue + # Right, let's release this waiting event! + event.set() + activated_timers_count += 1 + # ...and remove it from our pending sleep events list: + del pending_sleep_events[i] + + return activated_timers_count + + with mock.patch("textual._timer._TIMERS_CAN_SKIP", new=False), mock.patch( + "textual._timer.Timer._sleep", side_effect=sleep_mock + ), mock.patch( + "textual._timer.Timer.get_time", side_effect=get_time_mock + ), mock.patch( + "textual.message.Message._get_time", side_effect=get_time_mock + ): + yield move_clock_forward + + return mock_textual_timers_context_manager()