From 764a23494b82e163b92617377d0347cf72e304d2 Mon Sep 17 00:00:00 2001 From: Kyle Altendorf Date: Wed, 15 Mar 2023 11:32:46 -0400 Subject: [PATCH] tests: refactor test setups towards fixtures and hinting (#968) * refactor test setups towards fixtures and hinting * cleanup * emitter: EventEmitter * fixup protocol * if TYPE_CHECKING: for the TestEventQueue type alias * give in * isort * apply to other tests * fixup * rename tests.util to tests.utils * isort * flake8 * stop using a (nonexistant) emitter fixture * formatting --- src/watchdog/observers/__init__.py | 5 +- src/watchdog/observers/api.py | 7 +- src/watchdog/utils/__init__.py | 12 +++ tests/conftest.py | 29 ++++++ tests/test_emitter.py | 139 ++++++++++++----------------- tests/test_fsevents.py | 65 +++++--------- tests/test_inotify_c.py | 66 ++++---------- tests/test_observer.py | 3 +- tests/utils.py | 109 ++++++++++++++++++++++ 9 files changed, 257 insertions(+), 178 deletions(-) create mode 100644 tests/utils.py diff --git a/src/watchdog/observers/__init__.py b/src/watchdog/observers/__init__.py index 3c334026..339f9417 100644 --- a/src/watchdog/observers/__init__.py +++ b/src/watchdog/observers/__init__.py @@ -54,13 +54,12 @@ import sys import warnings -from typing import Type from watchdog.utils import UnsupportedLibc -from .api import BaseObserver +from .api import BaseObserverSubclassCallable -Observer: Type[BaseObserver] +Observer: BaseObserverSubclassCallable if sys.platform.startswith("linux"): diff --git a/src/watchdog/observers/api.py b/src/watchdog/observers/api.py index ca622432..2612f79a 100644 --- a/src/watchdog/observers/api.py +++ b/src/watchdog/observers/api.py @@ -19,7 +19,7 @@ import threading from pathlib import Path -from watchdog.utils import BaseThread +from watchdog.utils import BaseThread, Protocol from watchdog.utils.bricks import SkipRepeatsQueue DEFAULT_EMITTER_TIMEOUT = 1 # in seconds. @@ -379,3 +379,8 @@ def dispatch_events(self, event_queue): if handler in self._handlers.get(watch, []): handler.dispatch(event) event_queue.task_done() + + +class BaseObserverSubclassCallable(Protocol): + def __call__(self, timeout: float = ...) -> BaseObserver: + ... diff --git a/src/watchdog/utils/__init__.py b/src/watchdog/utils/__init__.py index bd8df58e..b2ad79f2 100644 --- a/src/watchdog/utils/__init__.py +++ b/src/watchdog/utils/__init__.py @@ -32,6 +32,7 @@ import sys import threading +from typing import TYPE_CHECKING class UnsupportedLibc(Exception): @@ -136,3 +137,14 @@ def load_class(dotted_path): raise AttributeError( f"Module {module_name} does not have class attribute {klass_name}" ) + + +if TYPE_CHECKING or sys.version_info >= (3, 8): + # using `as` to explicitly re-export this since this is a compatibility layer + from typing import Protocol as Protocol +else: + # Provide a dummy Protocol class when not available from stdlib. Should be used + # only for hinting. This could be had from typing_protocol, but not worth adding + # the _first_ dependency just for this. + class Protocol: + ... diff --git a/tests/conftest.py b/tests/conftest.py index 86968254..1011efd6 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,5 +1,6 @@ from __future__ import annotations +import contextlib import gc import os import threading @@ -7,6 +8,8 @@ import pytest +from .utils import ExpectEvent, Helper, P, StartWatching, TestEventQueue + @pytest.fixture() def p(tmpdir, *args): @@ -51,3 +54,29 @@ def no_warnings(recwarn): warnings.append("{w.filename}:{w.lineno} {w.message}".format(w=warning)) print(warnings) assert not warnings + + +@pytest.fixture(name="helper") +def helper_fixture(tmpdir): + with contextlib.closing(Helper(tmp=os.fspath(tmpdir))) as helper: + yield helper + + +@pytest.fixture(name="p") +def p_fixture(helper: Helper) -> P: + return helper.joinpath + + +@pytest.fixture(name="event_queue") +def event_queue_fixture(helper: Helper) -> TestEventQueue: + return helper.event_queue + + +@pytest.fixture(name="start_watching") +def start_watching_fixture(helper: Helper) -> StartWatching: + return helper.start_watching + + +@pytest.fixture(name="expect_event") +def expect_event_fixture(helper: Helper) -> ExpectEvent: + return helper.expect_event diff --git a/tests/test_emitter.py b/tests/test_emitter.py index 1f0ffb29..6b3f54f4 100644 --- a/tests/test_emitter.py +++ b/tests/test_emitter.py @@ -17,11 +17,8 @@ import logging import os import stat -import sys import time -from functools import partial -from queue import Empty, Queue -from typing import Type +from queue import Empty import pytest @@ -37,22 +34,10 @@ FileMovedEvent, FileOpenedEvent, ) -from watchdog.observers.api import EventEmitter, ObservedWatch from watchdog.utils import platform from .shell import mkdir, mkfile, mv, rm, touch - -Emitter: Type[EventEmitter] - -if sys.platform.startswith("linux"): - from watchdog.observers.inotify import InotifyEmitter as Emitter - from watchdog.observers.inotify import InotifyFullEmitter -elif sys.platform.startswith("darwin"): - from watchdog.observers.fsevents import FSEventsEmitter as Emitter -elif sys.platform.startswith("win"): - from watchdog.observers.read_directory_changes import WindowsApiEmitter as Emitter -elif sys.platform.startswith(("dragonfly", "freebsd", "netbsd", "openbsd", "bsd")): - from watchdog.observers.kqueue import KqueueEmitter as Emitter +from .utils import ExpectEvent, P, StartWatching, TestEventQueue logging.basicConfig(level=logging.DEBUG) logger = logging.getLogger(__name__) @@ -64,36 +49,6 @@ fsevents_logger.setLevel(logging.DEBUG) -@pytest.fixture(autouse=True) -def setup_teardown(tmpdir): - global p, emitter, event_queue - p = partial(os.path.join, tmpdir) - event_queue = Queue() - - yield - - emitter.stop() - emitter.join(5) - assert not emitter.is_alive() - - -def start_watching(path=None, use_full_emitter=False, recursive=True): - # todo: check if other platforms expect the trailing slash (e.g. `p('')`) - path = p() if path is None else path - global emitter - if platform.is_linux() and use_full_emitter: - emitter = InotifyFullEmitter( - event_queue, ObservedWatch(path, recursive=recursive) - ) - else: - emitter = Emitter(event_queue, ObservedWatch(path, recursive=recursive)) - - if platform.is_darwin(): - emitter.suppress_history = True - - emitter.start() - - def rerun_filter(exc, *args): time.sleep(5) if issubclass(exc[0], Empty) and platform.is_windows(): @@ -102,20 +57,8 @@ def rerun_filter(exc, *args): return False -def expect_event(expected_event, timeout=2): - """Utility function to wait up to `timeout` seconds for an `event_type` for `path` to show up in the queue. - - Provides some robustness for the otherwise flaky nature of asynchronous notifications. - """ - try: - event = event_queue.get(timeout=timeout)[0] - assert event == expected_event - except Empty: - raise - - @pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter) -def test_create(): +def test_create(p: P, event_queue: TestEventQueue, start_watching: StartWatching, expect_event: ExpectEvent) -> None: start_watching() open(p("a"), "a").close() @@ -138,7 +81,7 @@ def test_create(): not platform.is_linux(), reason="FileCloseEvent only supported in GNU/Linux" ) @pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter) -def test_close(): +def test_close(p: P, event_queue: TestEventQueue, start_watching: StartWatching) -> None: f_d = open(p("a"), "a") start_watching() f_d.close() @@ -163,7 +106,7 @@ def test_close(): platform.is_darwin() or platform.is_windows(), reason="Windows and macOS enforce proper encoding", ) -def test_create_wrong_encoding(): +def test_create_wrong_encoding(p: P, event_queue: TestEventQueue, start_watching: StartWatching) -> None: start_watching() open(p("a_\udce4"), "a").close() @@ -178,7 +121,7 @@ def test_create_wrong_encoding(): @pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter) -def test_delete(): +def test_delete(p: P, start_watching: StartWatching, expect_event: ExpectEvent) -> None: mkfile(p("a")) start_watching() @@ -191,7 +134,7 @@ def test_delete(): @pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter) -def test_modify(): +def test_modify(p: P, event_queue: TestEventQueue, start_watching: StartWatching, expect_event: ExpectEvent) -> None: mkfile(p("a")) start_watching() @@ -211,7 +154,7 @@ def test_modify(): @pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter) -def test_chmod(): +def test_chmod(p: P, start_watching: StartWatching, expect_event: ExpectEvent) -> None: mkfile(p("a")) start_watching() @@ -226,7 +169,7 @@ def test_chmod(): @pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter) -def test_move(): +def test_move(p: P, event_queue: TestEventQueue, start_watching: StartWatching, expect_event: ExpectEvent) -> None: mkdir(p("dir1")) mkdir(p("dir2")) mkfile(p("dir1", "a")) @@ -255,7 +198,12 @@ def test_move(): @pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter) -def test_case_change(): +def test_case_change( + p: P, + event_queue: TestEventQueue, + start_watching: StartWatching, + expect_event: ExpectEvent, +) -> None: mkdir(p("dir1")) mkdir(p("dir2")) mkfile(p("dir1", "file")) @@ -284,7 +232,7 @@ def test_case_change(): @pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter) -def test_move_to(): +def test_move_to(p: P, start_watching: StartWatching, expect_event: ExpectEvent) -> None: mkdir(p("dir1")) mkdir(p("dir2")) mkfile(p("dir1", "a")) @@ -301,7 +249,7 @@ def test_move_to(): @pytest.mark.skipif( not platform.is_linux(), reason="InotifyFullEmitter only supported in Linux" ) -def test_move_to_full(): +def test_move_to_full(p: P, event_queue: TestEventQueue, start_watching: StartWatching) -> None: mkdir(p("dir1")) mkdir(p("dir2")) mkfile(p("dir1", "a")) @@ -315,7 +263,7 @@ def test_move_to_full(): @pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter) -def test_move_from(): +def test_move_from(p: P, start_watching: StartWatching, expect_event: ExpectEvent) -> None: mkdir(p("dir1")) mkdir(p("dir2")) mkfile(p("dir1", "a")) @@ -332,7 +280,7 @@ def test_move_from(): @pytest.mark.skipif( not platform.is_linux(), reason="InotifyFullEmitter only supported in Linux" ) -def test_move_from_full(): +def test_move_from_full(p: P, event_queue: TestEventQueue, start_watching: StartWatching) -> None: mkdir(p("dir1")) mkdir(p("dir2")) mkfile(p("dir1", "a")) @@ -346,7 +294,7 @@ def test_move_from_full(): @pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter) -def test_separate_consecutive_moves(): +def test_separate_consecutive_moves(p: P, start_watching: StartWatching, expect_event: ExpectEvent) -> None: mkdir(p("dir1")) mkfile(p("dir1", "a")) mkfile(p("b")) @@ -376,9 +324,9 @@ def test_separate_consecutive_moves(): @pytest.mark.skipif( platform.is_bsd(), reason="BSD create another set of events for this test" ) -def test_delete_self(): +def test_delete_self(p: P, start_watching: StartWatching, expect_event: ExpectEvent) -> None: mkdir(p("dir1")) - start_watching(p("dir1")) + emitter = start_watching(p("dir1")) rm(p("dir1"), True) expect_event(DirDeletedEvent(p("dir1"))) emitter.join(5) @@ -389,7 +337,7 @@ def test_delete_self(): platform.is_windows() or platform.is_bsd(), reason="Windows|BSD create another set of events for this test", ) -def test_fast_subdirectory_creation_deletion(): +def test_fast_subdirectory_creation_deletion(p: P, event_queue: TestEventQueue, start_watching: StartWatching) -> None: root_dir = p("dir1") sub_dir = p("dir1", "subdir1") times = 30 @@ -423,7 +371,7 @@ def test_fast_subdirectory_creation_deletion(): @pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter) -def test_passing_unicode_should_give_unicode(): +def test_passing_unicode_should_give_unicode(p: P, event_queue: TestEventQueue, start_watching: StartWatching) -> None: start_watching(str(p())) mkfile(p("a")) event = event_queue.get(timeout=5)[0] @@ -434,7 +382,7 @@ def test_passing_unicode_should_give_unicode(): platform.is_windows(), reason="Windows ReadDirectoryChangesW supports only" " unicode for paths.", ) -def test_passing_bytes_should_give_bytes(): +def test_passing_bytes_should_give_bytes(p: P, event_queue: TestEventQueue, start_watching: StartWatching) -> None: start_watching(p().encode()) mkfile(p("a")) event = event_queue.get(timeout=5)[0] @@ -442,7 +390,7 @@ def test_passing_bytes_should_give_bytes(): @pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter) -def test_recursive_on(): +def test_recursive_on(p: P, event_queue: TestEventQueue, start_watching: StartWatching) -> None: mkdir(p("dir1", "dir2", "dir3"), True) start_watching() touch(p("dir1", "dir2", "dir3", "a")) @@ -468,7 +416,12 @@ def test_recursive_on(): @pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter) -def test_recursive_off(): +def test_recursive_off( + p: P, + event_queue: TestEventQueue, + start_watching: StartWatching, + expect_event: ExpectEvent, +) -> None: mkdir(p("dir1")) start_watching(recursive=False) touch(p("dir1", "a")) @@ -511,7 +464,12 @@ def test_recursive_off(): @pytest.mark.skipif( platform.is_windows(), reason="Windows create another set of events for this test" ) -def test_renaming_top_level_directory(): +def test_renaming_top_level_directory( + p: P, + event_queue: TestEventQueue, + start_watching: StartWatching, + expect_event: ExpectEvent, +) -> None: start_watching() mkdir(p("a")) @@ -572,7 +530,11 @@ def test_renaming_top_level_directory(): not platform.is_windows(), reason="Non-Windows create another set of events for this test", ) -def test_renaming_top_level_directory_on_windows(): +def test_renaming_top_level_directory_on_windows( + p: P, + event_queue: TestEventQueue, + start_watching: StartWatching, +) -> None: start_watching() mkdir(p("a")) @@ -631,7 +593,12 @@ def test_renaming_top_level_directory_on_windows(): @pytest.mark.skipif( platform.is_windows(), reason="Windows create another set of events for this test" ) -def test_move_nested_subdirectories(): +def test_move_nested_subdirectories( + p: P, + event_queue: TestEventQueue, + start_watching: StartWatching, + expect_event: ExpectEvent, +) -> None: mkdir(p("dir1/dir2/dir3"), parents=True) mkfile(p("dir1/dir2/dir3", "a")) start_watching() @@ -670,7 +637,11 @@ def test_move_nested_subdirectories(): not platform.is_windows(), reason="Non-Windows create another set of events for this test", ) -def test_move_nested_subdirectories_on_windows(): +def test_move_nested_subdirectories_on_windows( + p: P, + event_queue: TestEventQueue, + start_watching: StartWatching, +) -> None: mkdir(p("dir1/dir2/dir3"), parents=True) mkfile(p("dir1/dir2/dir3", "a")) start_watching(p("")) @@ -713,7 +684,7 @@ def test_move_nested_subdirectories_on_windows(): @pytest.mark.skipif( platform.is_bsd(), reason="BSD create another set of events for this test" ) -def test_file_lifecyle(): +def test_file_lifecyle(p: P, start_watching: StartWatching, expect_event: ExpectEvent) -> None: start_watching() mkfile(p("a")) diff --git a/tests/test_fsevents.py b/tests/test_fsevents.py index 01516141..bcec61a7 100644 --- a/tests/test_fsevents.py +++ b/tests/test_fsevents.py @@ -10,9 +10,7 @@ import logging import os import time -from functools import partial from os import mkdir, rmdir -from queue import Queue from random import random from threading import Thread from time import sleep @@ -22,41 +20,16 @@ from watchdog.events import FileSystemEventHandler from watchdog.observers import Observer -from watchdog.observers.api import ObservedWatch +from watchdog.observers.api import BaseObserver, ObservedWatch from watchdog.observers.fsevents import FSEventsEmitter -from .shell import mkdtemp, rm, touch +from .shell import touch +from .utils import P, StartWatching, TestEventQueue logging.basicConfig(level=logging.DEBUG) logger = logging.getLogger(__name__) -def setup_function(function): - global p, event_queue - tmpdir = os.path.realpath(mkdtemp()) - p = partial(os.path.join, tmpdir) - event_queue = Queue() - - -def teardown_function(function): - try: - emitter.stop() - emitter.join(5) - assert not emitter.is_alive() - except NameError: - pass # `name 'emitter' is not defined` unless we call `start_watching` - rm(p(""), recursive=True) - - -def start_watching(path=None, recursive=True, use_full_emitter=False): - global emitter - path = p("") if path is None else path - emitter = FSEventsEmitter( - event_queue, ObservedWatch(path, recursive=recursive), suppress_history=True - ) - emitter.start() - - @pytest.fixture def observer(): obs = Observer() @@ -100,7 +73,7 @@ def test_coalesced_event_check(event, expectation): assert event.is_coalesced == expectation -def test_add_watch_twice(observer): +def test_add_watch_twice(observer: BaseObserver, p: P) -> None: """Adding the same watch twice used to result in a null pointer return without an exception. See https://github.com/gorakhargosh/watchdog/issues/765 @@ -121,7 +94,11 @@ def callback(path, inodes, flags, ids): rmdir(a) -def test_watcher_deletion_while_receiving_events_1(caplog, observer): +def test_watcher_deletion_while_receiving_events_1( + caplog: pytest.LogCaptureFixture, + p: P, + start_watching: StartWatching, +) -> None: """ When the watcher is stopped while there are events, such exception could happen: @@ -145,7 +122,7 @@ def cb(*args): with caplog.at_level(logging.ERROR), patch.object( FSEventsEmitter, "events_callback", new=cb ): - start_watching(tmpdir) + emitter = start_watching(tmpdir) # Less than 100 is not enough events to trigger the error for n in range(100): touch(p("{}.txt".format(n))) @@ -153,7 +130,11 @@ def cb(*args): assert not caplog.records -def test_watcher_deletion_while_receiving_events_2(caplog): +def test_watcher_deletion_while_receiving_events_2( + caplog: pytest.LogCaptureFixture, + p: P, + start_watching: StartWatching, +) -> None: """Note: that test takes about 20 seconds to complete. Quite similar test to prevent another issue @@ -173,7 +154,7 @@ def test_watcher_deletion_while_receiving_events_2(caplog): def try_to_fail(): tmpdir = p() - start_watching(tmpdir) + emitter = start_watching(tmpdir) def create_files(): # Less than 2000 is not enough events to trigger the error @@ -204,7 +185,7 @@ def stop(em): assert not caplog.records -def test_remove_watch_twice(): +def test_remove_watch_twice(start_watching: StartWatching) -> None: """ ValueError: PyCapsule_GetPointer called with invalid PyCapsule object The above exception was the direct cause of the following exception: @@ -221,14 +202,14 @@ def on_thread_stop(self): (FSEvents.framework) FSEventStreamInvalidate(): failed assertion 'streamRef != NULL' (FSEvents.framework) FSEventStreamRelease(): failed assertion 'streamRef != NULL' """ - start_watching() + emitter = start_watching() # This one must work emitter.stop() # This is allowed to call several times .stop() emitter.stop() -def test_unschedule_removed_folder(observer): +def test_unschedule_removed_folder(observer: BaseObserver, p: P) -> None: """ TypeError: PyCObject_AsVoidPtr called with null pointer The above exception was the direct cause of the following exception: @@ -250,11 +231,11 @@ def on_thread_stop(self): observer.unschedule(w) -def test_converting_cfstring_to_pyunicode(): +def test_converting_cfstring_to_pyunicode(p: P, start_watching: StartWatching, event_queue: TestEventQueue) -> None: """See https://github.com/gorakhargosh/watchdog/issues/762""" tmpdir = p() - start_watching(tmpdir) + emitter = start_watching(tmpdir) dirname = "TéstClass" @@ -266,7 +247,7 @@ def test_converting_cfstring_to_pyunicode(): emitter.stop() -def test_recursive_check_accepts_relative_paths(): +def test_recursive_check_accepts_relative_paths(p: P) -> None: """See https://github.com/gorakhargosh/watchdog/issues/797 The test code provided in the defect observes the current working directory @@ -316,7 +297,7 @@ def done(self): observer.join() -def test_watchdog_recursive(): +def test_watchdog_recursive(p: P) -> None: """See https://github.com/gorakhargosh/watchdog/issues/706""" import os.path diff --git a/tests/test_inotify_c.py b/tests/test_inotify_c.py index cea8646b..e143fb50 100644 --- a/tests/test_inotify_c.py +++ b/tests/test_inotify_c.py @@ -7,52 +7,22 @@ if not platform.is_linux(): # noqa pytest.skip("GNU/Linux only.", allow_module_level=True) -import contextlib import ctypes import errno import logging import os import struct -from functools import partial -from queue import Queue from unittest.mock import patch from watchdog.events import DirCreatedEvent, DirDeletedEvent, DirModifiedEvent -from watchdog.observers.api import ObservedWatch -from watchdog.observers.inotify import InotifyEmitter, InotifyFullEmitter from watchdog.observers.inotify_c import Inotify, InotifyConstants, InotifyEvent -from .shell import mkdtemp, rm +from .utils import Helper, P, StartWatching, TestEventQueue logging.basicConfig(level=logging.DEBUG) logger = logging.getLogger(__name__) -def setup_function(function): - global p, event_queue - tmpdir = os.path.realpath(mkdtemp()) - p = partial(os.path.join, tmpdir) - event_queue = Queue() - - -@contextlib.contextmanager -def watching(path=None, use_full_emitter=False): - path = p("") if path is None else path - global emitter - Emitter = InotifyFullEmitter if use_full_emitter else InotifyEmitter - emitter = Emitter(event_queue, ObservedWatch(path, recursive=True)) - emitter.start() - yield - emitter.stop() - emitter.join(5) - - -def teardown_function(function): - rm(p(""), recursive=True) - with contextlib.suppress(NameError): - assert not emitter.is_alive() - - def struct_inotify(wd, mask, cookie=0, length=0, name=b""): assert len(name) <= length struct_format = ( @@ -66,7 +36,7 @@ def struct_inotify(wd, mask, cookie=0, length=0, name=b""): return struct.pack(struct_format, wd, mask, cookie, length, name) -def test_late_double_deletion(): +def test_late_double_deletion(helper: Helper, p: P, event_queue: TestEventQueue, start_watching: StartWatching) -> None: inotify_fd = type("FD", (object,), {})() inotify_fd.last = 0 inotify_fd.wds = [] @@ -125,7 +95,8 @@ def inotify_rm_watch(fd, wd): mock4 = patch.object(inotify_c, "inotify_add_watch", new=inotify_add_watch) mock5 = patch.object(inotify_c, "inotify_rm_watch", new=inotify_rm_watch) - with mock1, mock2, mock3, mock4, mock5, watching(p("")): + with mock1, mock2, mock3, mock4, mock5: + start_watching(p("")) # Watchdog Events for evt_cls in [DirCreatedEvent, DirDeletedEvent] * 2: event = event_queue.get(timeout=5)[0] @@ -134,6 +105,7 @@ def inotify_rm_watch(fd, wd): event = event_queue.get(timeout=5)[0] assert isinstance(event, DirModifiedEvent) assert event.src_path == p("").rstrip(os.path.sep) + helper.close() assert inotify_fd.last == 3 # Number of directories assert inotify_fd.buf == b"" # Didn't miss any event @@ -159,31 +131,31 @@ def test_raise_error(error, patterns): assert any(pattern in str(exc.value) for pattern in patterns) -def test_non_ascii_path(): +def test_non_ascii_path(p: P, event_queue: TestEventQueue, start_watching: StartWatching) -> None: """ Inotify can construct an event for a path containing non-ASCII. """ path = p("\N{SNOWMAN}") - with watching(p("")): - os.mkdir(path) - event, _ = event_queue.get(timeout=5) - assert isinstance(event.src_path, type("")) - assert event.src_path == path - # Just make sure it doesn't raise an exception. - assert repr(event) + start_watching(p("")) + os.mkdir(path) + event, _ = event_queue.get(timeout=5) + assert isinstance(event.src_path, type("")) + assert event.src_path == path + # Just make sure it doesn't raise an exception. + assert repr(event) -def test_watch_file(): +def test_watch_file(p: P, event_queue: TestEventQueue, start_watching: StartWatching) -> None: path = p("this_is_a_file") with open(path, "a"): pass - with watching(path): - os.remove(path) - event, _ = event_queue.get(timeout=5) - assert repr(event) + start_watching(path) + os.remove(path) + event, _ = event_queue.get(timeout=5) + assert repr(event) -def test_event_equality(): +def test_event_equality(p: P) -> None: wd_parent_dir = 42 filename = "file.ext" full_path = p(filename) diff --git a/tests/test_observer.py b/tests/test_observer.py index ef1ee91f..4c26df15 100644 --- a/tests/test_observer.py +++ b/tests/test_observer.py @@ -16,6 +16,7 @@ import contextlib import threading +from typing import Iterator from unittest.mock import patch import pytest @@ -25,7 +26,7 @@ @pytest.fixture -def observer(): +def observer() -> Iterator[BaseObserver]: obs = BaseObserver(EventEmitter) yield obs obs.stop() diff --git a/tests/utils.py b/tests/utils.py new file mode 100644 index 00000000..00dcf401 --- /dev/null +++ b/tests/utils.py @@ -0,0 +1,109 @@ +from __future__ import annotations + +import dataclasses +import os +import sys +from queue import Empty, Queue +from typing import List, Optional, Tuple, Type, Union + +from watchdog.events import FileSystemEvent +from watchdog.observers.api import EventEmitter, ObservedWatch +from watchdog.utils import Protocol + +Emitter: Type[EventEmitter] + +if sys.platform.startswith("linux"): + from watchdog.observers.inotify import InotifyEmitter as Emitter + from watchdog.observers.inotify import InotifyFullEmitter +elif sys.platform.startswith("darwin"): + from watchdog.observers.fsevents import FSEventsEmitter as Emitter +elif sys.platform.startswith("win"): + from watchdog.observers.read_directory_changes import WindowsApiEmitter as Emitter +elif sys.platform.startswith(("dragonfly", "freebsd", "netbsd", "openbsd", "bsd")): + from watchdog.observers.kqueue import KqueueEmitter as Emitter + + +class P(Protocol): + def __call__(self, *args: str) -> str: + ... + + +class StartWatching(Protocol): + def __call__( + self, + path: Optional[Union[str, bytes]] = ..., + use_full_emitter: bool = ..., + recursive: bool = ..., + ) -> EventEmitter: + ... + + +class ExpectEvent(Protocol): + def __call__(self, expected_event: FileSystemEvent, timeout: float = ...) -> None: + ... + + +TestEventQueue = Union["Queue[Tuple[FileSystemEvent, ObservedWatch]]"] + + +@dataclasses.dataclass() +class Helper: + tmp: str + emitters: List[EventEmitter] = dataclasses.field(default_factory=list) + event_queue: TestEventQueue = dataclasses.field(default_factory=Queue) + + def joinpath(self, *args: str) -> str: + return os.path.join(self.tmp, *args) + + def start_watching( + self, + path: Optional[Union[str, bytes]] = None, + use_full_emitter: bool = False, + recursive: bool = True, + ) -> EventEmitter: + # todo: check if other platforms expect the trailing slash (e.g. `p('')`) + path = self.tmp if path is None else path + + emitter: EventEmitter + if sys.platform.startswith("linux") and use_full_emitter: + emitter = InotifyFullEmitter( + self.event_queue, ObservedWatch(path, recursive=recursive) + ) + else: + emitter = Emitter(self.event_queue, ObservedWatch(path, recursive=recursive)) + + self.emitters.append(emitter) + + if sys.platform.startswith("darwin"): + # TODO: I think this could be better... .suppress_history should maybe + # become a common attribute. + from watchdog.observers.fsevents import FSEventsEmitter + assert isinstance(emitter, FSEventsEmitter) + emitter.suppress_history = True + + emitter.start() + + return emitter + + def expect_event(self, expected_event: FileSystemEvent, timeout: float = 2) -> None: + """Utility function to wait up to `timeout` seconds for an `event_type` for `path` to show up in the queue. + + Provides some robustness for the otherwise flaky nature of asynchronous notifications. + """ + try: + event = self.event_queue.get(timeout=timeout)[0] + assert event == expected_event + except Empty: + raise + + def close(self) -> None: + for emitter in self.emitters: + emitter.stop() + + for emitter in self.emitters: + if emitter.is_alive(): + emitter.join(5) + + alive = [emitter.is_alive() for emitter in self.emitters] + self.emitters = [] + assert alive == [False] * len(alive)