Skip to content

Commit

Permalink
Fix MinLoop.watch_pipe regression for the callback outcome not False (
Browse files Browse the repository at this point in the history
#848)

Start adding unittests for `MainLoop` since it had ~0% of real coverage

Fix: #847

Co-authored-by: Aleksei Stepanov <[email protected]>
  • Loading branch information
penguinolog and Aleksei Stepanov authored Feb 28, 2024
1 parent 9cee5b4 commit 210a79d
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 25 deletions.
117 changes: 117 additions & 0 deletions tests/test_main_loop.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
from __future__ import annotations

import concurrent.futures
import os
import socket
import sys
import threading
import typing
import unittest.mock

import urwid

if typing.TYPE_CHECKING:
from types import TracebackType

IS_WINDOWS = sys.platform == "win32"


class ClosingTemporaryFilesPair(typing.ContextManager[typing.Tuple[typing.TextIO, typing.TextIO]]):
"""File pair context manager that closes temporary files on exit.
Since `sys.stdout` is TextIO, tests have to use compatible api for the proper behavior imitation.
"""

__slots__ = ("rd_s", "wr_s", "rd_f", "wr_f")

def __init__(self) -> None:
self.rd_s: socket.socket | None = None
self.wr_s: socket.socket | None = None
self.rd_f: typing.TextIO | None = None
self.wr_f: typing.TextIO | None = None

def __enter__(self) -> tuple[typing.TextIO, typing.TextIO]:
self.rd_s, self.wr_s = socket.socketpair()
self.rd_f = os.fdopen(self.rd_s.fileno(), "r", encoding="utf-8", closefd=False)
self.wr_f = os.fdopen(self.wr_s.fileno(), "w", encoding="utf-8", closefd=False)
return self.rd_f, self.wr_f

def __exit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None:
"""Close everything explicit without waiting for garbage collected."""
if self.rd_f is not None and not self.rd_f.closed:
self.rd_f.close()
if self.rd_s is not None:
self.rd_s.close()
if self.wr_f is not None and not self.wr_f.closed:
self.wr_f.close()
if self.wr_s is not None:
self.wr_s.close()


def stop_screen_cb(*_args, **_kwargs) -> typing.NoReturn:
raise urwid.ExitMainLoop


class TestMainLoop(unittest.TestCase):
@unittest.skipIf(IS_WINDOWS, "selectors for pipe are not supported on Windows")
def test_watch_pipe(self):
"""Test watching pipe is stopped on explicit False only."""
evt = threading.Event() # We need thread synchronization
outcome: list[bytes] = []

def pipe_cb(data: bytes) -> typing.Any:
outcome.append(data)

if not evt.is_set():
evt.set()

if data == b"false":
return False
if data == b"true":
return True
if data == b"null":
return None
return object()

def pipe_writer(fd: int) -> None:
os.write(fd, b"something")
if evt.wait(0.1):
evt.clear()
os.write(fd, b"true")
if evt.wait(0.1):
evt.clear()
os.write(fd, b"null")
if evt.wait(0.1):
evt.clear()
os.write(fd, b"false")

with ClosingTemporaryFilesPair() as (
rd_r,
wr_r,
), ClosingTemporaryFilesPair() as (
rd_w,
wr_w,
), concurrent.futures.ThreadPoolExecutor(
max_workers=1,
) as executor, unittest.mock.patch(
"subprocess.Popen", # we want to be sure that nothing outside is called
autospec=True,
):
evl = urwid.MainLoop(
urwid.SolidFill(),
screen=urwid.display.raw.Screen(input=rd_r, output=wr_w), # We need screen which support mocked IO
handle_mouse=False, # Less external calls - better
)
evl.set_alarm_in(1, stop_screen_cb)
pipe_fd = evl.watch_pipe(pipe_cb)
executor.submit(pipe_writer, pipe_fd)

evl.run()
self.assertEqual([b"something", b"true", b"null", b"false"], outcome)
not_removed = evl.remove_watch_pipe(pipe_fd)
self.assertFalse(not_removed)
45 changes: 20 additions & 25 deletions urwid/event_loop/main_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ def pop_ups(self, pop_ups: bool) -> None:
else:
self._topmost_widget = self._widget

def _set_pop_ups(self, pop_ups) -> None:
def _set_pop_ups(self, pop_ups: bool) -> None:
warnings.warn(
f"method `{self.__class__.__name__}._set_pop_ups` is deprecated, "
f"please use `{self.__class__.__name__}.pop_ups` property",
Expand All @@ -214,7 +214,7 @@ def set_alarm_in(self, sec: float, callback: Callable[[Self, _T], typing.Any], u
"""
self.logger.debug(f"Setting alarm in {sec!r} seconds with callback {callback!r}")

def cb():
def cb() -> None:
callback(self, user_data)

return self.event_loop.alarm(sec, cb)
Expand All @@ -236,7 +236,7 @@ def set_alarm_at(self, tm: float, callback: Callable[[Self, _T], typing.Any], us
sec = tm - time.time()
self.logger.debug(f"Setting alarm in {sec!r} seconds with callback {callback!r}")

def cb():
def cb() -> None:
callback(self, user_data)

return self.event_loop.alarm(sec, cb)
Expand All @@ -250,33 +250,28 @@ def remove_alarm(self, handle) -> bool:

if not IS_WINDOWS:

def watch_pipe(self, callback: Callable[[bytes], bool]) -> int:
def watch_pipe(self, callback: Callable[[bytes], bool | None]) -> int:
"""
Create a pipe for use by a subprocess or thread to trigger a callback
in the process/thread running the main loop.
:param callback: function taking one parameter to call from within
the process/thread running the main loop
:param callback: function taking one parameter to call from within the process/thread running the main loop
:type callback: callable
This method returns a file descriptor attached to the write end of a
pipe. The read end of the pipe is added to the list of files
:attr:`event_loop` is watching. When data is written to the pipe the
callback function will be called and passed a single value containing
data read from the pipe.
This method returns a file descriptor attached to the write end of a pipe.
The read end of the pipe is added to the list of files :attr:`event_loop` is watching.
When data is written to the pipe the callback function will be called
and passed a single value containing data read from the pipe.
This method may be used any time you want to update widgets from
another thread or subprocess.
This method may be used any time you want to update widgets from another thread or subprocess.
Data may be written to the returned file descriptor with
``os.write(fd, data)``. Ensure that data is less than 512 bytes (or 4K
on Linux) so that the callback will be triggered just once with the
complete value of data passed in.
Data may be written to the returned file descriptor with ``os.write(fd, data)``.
Ensure that data is less than 512 bytes (or 4K on Linux)
so that the callback will be triggered just once with the complete value of data passed in.
If the callback returns ``False`` then the watch will be removed from
:attr:`event_loop` and the read end of the pipe will be closed. You
are responsible for closing the write end of the pipe with
``os.close(fd)``.
If the callback returns ``False`` then the watch will be removed from :attr:`event_loop`
and the read end of the pipe will be closed.
You are responsible for closing the write end of the pipe with ``os.close(fd)``.
"""
import fcntl

Expand All @@ -286,7 +281,7 @@ def watch_pipe(self, callback: Callable[[bytes], bool]) -> int:

def cb() -> None:
data = os.read(pipe_rd, PIPE_BUFFER_READ_SIZE)
if not callback(data):
if callback(data) is False:
self.event_loop.remove_watch_file(watch_handle)
os.close(pipe_rd)

Expand All @@ -296,9 +291,9 @@ def cb() -> None:

def remove_watch_pipe(self, write_fd: int) -> bool:
"""
Close the read end of the pipe and remove the watch created by
:meth:`watch_pipe`. You are responsible for closing the write end of
the pipe.
Close the read end of the pipe and remove the watch created by :meth:`watch_pipe`.
..note:: You are responsible for closing the write end of the pipe.
Returns ``True`` if the watch pipe exists, ``False`` otherwise
"""
Expand Down

0 comments on commit 210a79d

Please sign in to comment.