diff --git a/.gitignore b/.gitignore index 8d7ae16e..ed8f776e 100644 --- a/.gitignore +++ b/.gitignore @@ -24,6 +24,7 @@ develop-eggs/ dist/ eggs/ parts/ +__pycache__/ MANIFEST # Project files for VS Code, idea, eclipse, and netbeans diff --git a/pyproject.toml b/pyproject.toml index 0633c721..e9043448 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,7 +9,7 @@ ignore_missing_imports = true follow_imports = "skip" # Ensure full coverage -#disallow_untyped_defs = true [TODO] +disallow_untyped_defs = true disallow_incomplete_defs = true disallow_untyped_calls = true diff --git a/src/watchdog/events.py b/src/watchdog/events.py index 82804e0a..a7d6d31f 100644 --- a/src/watchdog/events.py +++ b/src/watchdog/events.py @@ -100,9 +100,13 @@ import os.path import re from dataclasses import dataclass, field +from typing import TYPE_CHECKING from watchdog.utils.patterns import match_any_paths +if TYPE_CHECKING: + from collections.abc import Generator + EVENT_TYPE_MOVED = "moved" EVENT_TYPE_DELETED = "deleted" EVENT_TYPE_CREATED = "created" @@ -121,8 +125,8 @@ class FileSystemEvent: can be used as keys in dictionaries or be added to sets. """ - src_path: str - dest_path: str = "" + src_path: bytes | str + dest_path: bytes | str = "" event_type: str = field(default="", init=False) is_directory: bool = field(default=False, init=False) @@ -309,10 +313,10 @@ class PatternMatchingEventHandler(FileSystemEventHandler): def __init__( self, *, - patterns=None, - ignore_patterns=None, - ignore_directories=False, - case_sensitive=False, + patterns: list[str] | None = None, + ignore_patterns: list[str] | None = None, + ignore_directories: bool = False, + case_sensitive: bool = False, ): super().__init__() @@ -322,28 +326,28 @@ def __init__( self._case_sensitive = case_sensitive @property - def patterns(self): + def patterns(self) -> list[str] | None: """(Read-only) Patterns to allow matching event paths. """ return self._patterns @property - def ignore_patterns(self): + def ignore_patterns(self) -> list[str] | None: """(Read-only) Patterns to ignore matching event paths. """ return self._ignore_patterns @property - def ignore_directories(self): + def ignore_directories(self) -> bool: """(Read-only) ``True`` if directories should be ignored; ``False`` otherwise. """ return self._ignore_directories @property - def case_sensitive(self): + def case_sensitive(self) -> bool: """(Read-only) ``True`` if path names should be matched sensitive to case; ``False`` otherwise. @@ -384,10 +388,10 @@ class RegexMatchingEventHandler(FileSystemEventHandler): def __init__( self, *, - regexes=None, - ignore_regexes=None, - ignore_directories=False, - case_sensitive=False, + regexes: list[str] | None = None, + ignore_regexes: list[str] | None = None, + ignore_directories: bool = False, + case_sensitive: bool = False, ): super().__init__() @@ -407,28 +411,28 @@ def __init__( self._case_sensitive = case_sensitive @property - def regexes(self): + def regexes(self) -> list[re.Pattern[str]]: """(Read-only) Regexes to allow matching event paths. """ return self._regexes @property - def ignore_regexes(self): + def ignore_regexes(self) -> list[re.Pattern[str]]: """(Read-only) Regexes to ignore matching event paths. """ return self._ignore_regexes @property - def ignore_directories(self): + def ignore_directories(self) -> bool: """(Read-only) ``True`` if directories should be ignored; ``False`` otherwise. """ return self._ignore_directories @property - def case_sensitive(self): + def case_sensitive(self) -> bool: """(Read-only) ``True`` if path names should be matched sensitive to case; ``False`` otherwise. @@ -506,7 +510,10 @@ def on_opened(self, event: FileOpenedEvent) -> None: self.logger.info("Opened file: %s", event.src_path) -def generate_sub_moved_events(src_dir_path, dest_dir_path): +def generate_sub_moved_events( + src_dir_path: bytes | str, + dest_dir_path: bytes | str, +) -> Generator[DirMovedEvent | FileMovedEvent]: """Generates an event list of :class:`DirMovedEvent` and :class:`FileMovedEvent` objects for all the files and directories within the given moved directory that were moved along with the directory. @@ -519,18 +526,18 @@ def generate_sub_moved_events(src_dir_path, dest_dir_path): An iterable of file system events of type :class:`DirMovedEvent` and :class:`FileMovedEvent`. """ - for root, directories, filenames in os.walk(dest_dir_path): + for root, directories, filenames in os.walk(dest_dir_path): # type: ignore[type-var] for directory in directories: - full_path = os.path.join(root, directory) + full_path = os.path.join(root, directory) # type: ignore[call-overload] renamed_path = full_path.replace(dest_dir_path, src_dir_path) if src_dir_path else "" yield DirMovedEvent(renamed_path, full_path, is_synthetic=True) for filename in filenames: - full_path = os.path.join(root, filename) + full_path = os.path.join(root, filename) # type: ignore[call-overload] renamed_path = full_path.replace(dest_dir_path, src_dir_path) if src_dir_path else "" yield FileMovedEvent(renamed_path, full_path, is_synthetic=True) -def generate_sub_created_events(src_dir_path): +def generate_sub_created_events(src_dir_path: bytes | str) -> Generator[DirCreatedEvent | FileCreatedEvent]: """Generates an event list of :class:`DirCreatedEvent` and :class:`FileCreatedEvent` objects for all the files and directories within the given moved directory that were moved along with the directory. @@ -541,8 +548,10 @@ def generate_sub_created_events(src_dir_path): An iterable of file system events of type :class:`DirCreatedEvent` and :class:`FileCreatedEvent`. """ - for root, directories, filenames in os.walk(src_dir_path): + for root, directories, filenames in os.walk(src_dir_path): # type: ignore[type-var] for directory in directories: - yield DirCreatedEvent(os.path.join(root, directory), is_synthetic=True) + full_path = os.path.join(root, directory) # type: ignore[call-overload] + yield DirCreatedEvent(full_path, is_synthetic=True) for filename in filenames: - yield FileCreatedEvent(os.path.join(root, filename), is_synthetic=True) + full_path = os.path.join(root, filename) # type: ignore[call-overload] + yield FileCreatedEvent(full_path, is_synthetic=True) diff --git a/src/watchdog/observers/api.py b/src/watchdog/observers/api.py index 7097f55d..7bb5e52a 100644 --- a/src/watchdog/observers/api.py +++ b/src/watchdog/observers/api.py @@ -19,10 +19,14 @@ import queue import threading from pathlib import Path +from typing import TYPE_CHECKING from watchdog.utils import BaseThread from watchdog.utils.bricks import SkipRepeatsQueue +if TYPE_CHECKING: + from watchdog.events import FileSystemEvent, FileSystemEventHandler + DEFAULT_EMITTER_TIMEOUT = 1 # in seconds. DEFAULT_OBSERVER_TIMEOUT = 1 # in seconds. @@ -47,42 +51,46 @@ class ObservedWatch: Optional collection of :class:`watchdog.events.FileSystemEvent` to watch """ - def __init__(self, path, *, recursive, event_filter=None): + def __init__(self, path: str | Path, *, recursive: bool, event_filter: list[FileSystemEvent] | None = None): self._path = str(path) if isinstance(path, Path) else path self._is_recursive = recursive self._event_filter = frozenset(event_filter) if event_filter is not None else None @property - def path(self): + def path(self) -> str: """The path that this watch monitors.""" return self._path @property - def is_recursive(self): + def is_recursive(self) -> bool: """Determines whether subdirectories are watched for the path.""" return self._is_recursive @property - def event_filter(self): + def event_filter(self) -> frozenset[FileSystemEvent] | None: """Collection of event types watched for the path""" return self._event_filter @property - def key(self): + def key(self) -> tuple[str, bool, frozenset[FileSystemEvent] | None]: return self.path, self.is_recursive, self.event_filter - def __eq__(self, watch): + def __eq__(self, watch: object) -> bool: + if not isinstance(watch, ObservedWatch): + return NotImplemented return self.key == watch.key - def __ne__(self, watch): + def __ne__(self, watch: object) -> bool: + if not isinstance(watch, ObservedWatch): + return NotImplemented return self.key != watch.key - def __hash__(self): + def __hash__(self) -> int: return hash(self.key) - def __repr__(self): + def __repr__(self) -> str: if self.event_filter is not None: - event_filter_str = "|".join(sorted(_cls.__name__ for _cls in self.event_filter)) + event_filter_str = "|".join(sorted(_cls.__name__ for _cls in self.event_filter)) # type: ignore[attr-defined] event_filter_str = f", event_filter={event_filter_str}" else: event_filter_str = "" @@ -112,7 +120,14 @@ class EventEmitter(BaseThread): Iterable[:class:`watchdog.events.FileSystemEvent`] | None """ - def __init__(self, event_queue, watch, timeout=DEFAULT_EMITTER_TIMEOUT, event_filter=None): + def __init__( + self, + event_queue: EventQueue, + watch: ObservedWatch, + *, + timeout: int = DEFAULT_EMITTER_TIMEOUT, + event_filter: list[FileSystemEvent] | None = None, + ) -> None: super().__init__() self._event_queue = event_queue self._watch = watch @@ -120,16 +135,16 @@ def __init__(self, event_queue, watch, timeout=DEFAULT_EMITTER_TIMEOUT, event_fi self._event_filter = frozenset(event_filter) if event_filter is not None else None @property - def timeout(self): + def timeout(self) -> int: """Blocking timeout for reading events.""" return self._timeout @property - def watch(self): + def watch(self) -> ObservedWatch: """The watch associated with this emitter.""" return self._watch - def queue_event(self, event): + def queue_event(self, event: FileSystemEvent) -> None: """Queues a single event. :param event: @@ -138,10 +153,10 @@ def queue_event(self, event): An instance of :class:`watchdog.events.FileSystemEvent` or a subclass. """ - if self._event_filter is None or any(isinstance(event, cls) for cls in self._event_filter): + if self._event_filter is None or any(isinstance(event, cls) for cls in self._event_filter): # type: ignore[arg-type] self._event_queue.put((event, self.watch)) - def queue_events(self, timeout): + def queue_events(self, timeout: int) -> None: """Override this method to populate the event queue with events per interval period. @@ -152,7 +167,7 @@ def queue_events(self, timeout): ``float`` """ - def run(self): + def run(self) -> None: while self.should_keep_running(): self.queue_events(self.timeout) @@ -171,30 +186,30 @@ class EventDispatcher(BaseThread): stop_event = object() """Event inserted into the queue to signal a requested stop.""" - def __init__(self, timeout=DEFAULT_OBSERVER_TIMEOUT): + def __init__(self, *, timeout: int = DEFAULT_OBSERVER_TIMEOUT) -> None: super().__init__() self._event_queue = EventQueue() self._timeout = timeout @property - def timeout(self): + def timeout(self) -> int: """Timeout value to construct emitters with.""" return self._timeout - def stop(self): + def stop(self) -> None: BaseThread.stop(self) with contextlib.suppress(queue.Full): self.event_queue.put_nowait(EventDispatcher.stop_event) @property - def event_queue(self): + def event_queue(self) -> EventQueue: """The event queue which is populated with file system events by emitters and from which events are dispatched by a dispatcher thread. """ return self._event_queue - def dispatch_events(self, event_queue): + def dispatch_events(self, event_queue: EventQueue) -> None: """Override this method to consume events from an event queue, blocking on the queue for the specified timeout before raising :class:`queue.Empty`. @@ -206,7 +221,7 @@ def dispatch_events(self, event_queue): :class:`queue.Empty` """ - def run(self): + def run(self) -> None: while self.should_keep_running(): try: self.dispatch_events(self.event_queue) @@ -217,27 +232,27 @@ def run(self): class BaseObserver(EventDispatcher): """Base observer.""" - def __init__(self, emitter_class, *, timeout=DEFAULT_OBSERVER_TIMEOUT): - super().__init__(timeout) + def __init__(self, emitter_class: type[EventEmitter], *, timeout: int = DEFAULT_OBSERVER_TIMEOUT) -> None: + super().__init__(timeout=timeout) self._emitter_class = emitter_class self._lock = threading.RLock() - self._watches = set() - self._handlers = {} - self._emitters = set() - self._emitter_for_watch = {} + self._watches: set[ObservedWatch] = set() + self._handlers: dict[ObservedWatch, set[FileSystemEventHandler]] = {} + self._emitters: set[EventEmitter] = set() + self._emitter_for_watch: dict[ObservedWatch, EventEmitter] = {} - def _add_emitter(self, emitter): + def _add_emitter(self, emitter: EventEmitter) -> None: self._emitter_for_watch[emitter.watch] = emitter self._emitters.add(emitter) - def _remove_emitter(self, emitter): + def _remove_emitter(self, emitter: EventEmitter) -> None: del self._emitter_for_watch[emitter.watch] self._emitters.remove(emitter) emitter.stop() with contextlib.suppress(RuntimeError): emitter.join() - def _clear_emitters(self): + def _clear_emitters(self) -> None: for emitter in self._emitters: emitter.stop() for emitter in self._emitters: @@ -246,20 +261,20 @@ def _clear_emitters(self): self._emitters.clear() self._emitter_for_watch.clear() - def _add_handler_for_watch(self, event_handler, watch): + def _add_handler_for_watch(self, event_handler: FileSystemEventHandler, watch: ObservedWatch) -> None: if watch not in self._handlers: self._handlers[watch] = set() self._handlers[watch].add(event_handler) - def _remove_handlers_for_watch(self, watch): + def _remove_handlers_for_watch(self, watch: ObservedWatch) -> None: del self._handlers[watch] @property - def emitters(self): + def emitters(self) -> set[EventEmitter]: """Returns event emitter created by this observer.""" return self._emitters - def start(self): + def start(self) -> None: for emitter in self._emitters.copy(): try: emitter.start() @@ -268,7 +283,14 @@ def start(self): raise super().start() - def schedule(self, event_handler, path, *, recursive=False, event_filter=None): + def schedule( + self, + event_handler: FileSystemEventHandler, + path: str, + *, + recursive: bool = False, + event_filter: list[FileSystemEvent] | None = None, + ) -> ObservedWatch: """Schedules watching a path and calls appropriate methods specified in the given event handler in response to file system events. @@ -308,7 +330,7 @@ def schedule(self, event_handler, path, *, recursive=False, event_filter=None): self._watches.add(watch) return watch - def add_handler_for_watch(self, event_handler, watch): + def add_handler_for_watch(self, event_handler: FileSystemEventHandler, watch: ObservedWatch) -> None: """Adds a handler for the given watch. :param event_handler: @@ -326,7 +348,7 @@ def add_handler_for_watch(self, event_handler, watch): with self._lock: self._add_handler_for_watch(event_handler, watch) - def remove_handler_for_watch(self, event_handler, watch): + def remove_handler_for_watch(self, event_handler: FileSystemEventHandler, watch: ObservedWatch) -> None: """Removes a handler for the given watch. :param event_handler: @@ -344,7 +366,7 @@ def remove_handler_for_watch(self, event_handler, watch): with self._lock: self._handlers[watch].remove(event_handler) - def unschedule(self, watch): + def unschedule(self, watch: ObservedWatch) -> None: """Unschedules a watch. :param watch: @@ -359,7 +381,7 @@ def unschedule(self, watch): self._remove_emitter(emitter) self._watches.remove(watch) - def unschedule_all(self): + def unschedule_all(self) -> None: """Unschedules all watches and detaches all associated event handlers. """ @@ -368,10 +390,10 @@ def unschedule_all(self): self._clear_emitters() self._watches.clear() - def on_thread_stop(self): + def on_thread_stop(self) -> None: self.unschedule_all() - def dispatch_events(self, event_queue): + def dispatch_events(self, event_queue: EventQueue) -> None: entry = event_queue.get(block=True) if entry is EventDispatcher.stop_event: return diff --git a/src/watchdog/observers/fsevents.py b/src/watchdog/observers/fsevents.py index a0672961..5aad8783 100644 --- a/src/watchdog/observers/fsevents.py +++ b/src/watchdog/observers/fsevents.py @@ -27,6 +27,7 @@ import threading import time import unicodedata +from typing import TYPE_CHECKING import _watchdog_fsevents as _fsevents @@ -45,6 +46,11 @@ from watchdog.observers.api import DEFAULT_EMITTER_TIMEOUT, DEFAULT_OBSERVER_TIMEOUT, BaseObserver, EventEmitter from watchdog.utils.dirsnapshot import DirectorySnapshot +if TYPE_CHECKING: + from watchdog.events import FileSystemEvent, FileSystemEventHandler + from watchdog.observers.api import EventQueue, ObservedWatch + + logger = logging.getLogger("fsevents") @@ -73,26 +79,26 @@ class FSEventsEmitter(EventEmitter): def __init__( self, - event_queue, - watch, + event_queue: EventQueue, + watch: ObservedWatch, *, - timeout=DEFAULT_EMITTER_TIMEOUT, - event_filter=None, - suppress_history=False, - ): + timeout: int = DEFAULT_EMITTER_TIMEOUT, + event_filter: list[FileSystemEvent] | None = None, + suppress_history: bool = False, + ) -> None: super().__init__(event_queue, watch, timeout=timeout, event_filter=event_filter) - self._fs_view = set() + self._fs_view: set[int] = set() self.suppress_history = suppress_history self._start_time = 0.0 - self._starting_state = None + self._starting_state: DirectorySnapshot | None = None self._lock = threading.Lock() self._absolute_watch_path = os.path.realpath(os.path.abspath(os.path.expanduser(self.watch.path))) - def on_thread_stop(self): + def on_thread_stop(self) -> None: _fsevents.remove_watch(self.watch) _fsevents.stop(self) - def queue_event(self, event): + def queue_event(self, event: FileSystemEvent) -> None: # fsevents defaults to be recursive, so if the watch was meant to be non-recursive then we need to drop # all the events here which do not have a src_path / dest_path that matches the watched path if self._watch.is_recursive or not self._is_recursive_event(event): @@ -101,7 +107,7 @@ def queue_event(self, event): else: logger.debug("drop event %s", event) - def _is_recursive_event(self, event): + def _is_recursive_event(self, event: FileSystemEvent) -> bool: src_path = event.src_path if event.is_directory else os.path.dirname(event.src_path) if src_path == self._absolute_watch_path: return False @@ -115,28 +121,35 @@ def _is_recursive_event(self, event): return True - def _queue_created_event(self, event, src_path, dirname): + def _queue_created_event(self, event: FileSystemEvent, src_path: bytes | str, dirname: bytes | str) -> None: cls = DirCreatedEvent if event.is_directory else FileCreatedEvent self.queue_event(cls(src_path)) self.queue_event(DirModifiedEvent(dirname)) - def _queue_deleted_event(self, event, src_path, dirname): + def _queue_deleted_event(self, event: FileSystemEvent, src_path: bytes | str, dirname: bytes | str) -> None: cls = DirDeletedEvent if event.is_directory else FileDeletedEvent self.queue_event(cls(src_path)) self.queue_event(DirModifiedEvent(dirname)) - def _queue_modified_event(self, event, src_path, dirname): + def _queue_modified_event(self, event: FileSystemEvent, src_path: bytes | str, dirname: bytes | str) -> None: cls = DirModifiedEvent if event.is_directory else FileModifiedEvent self.queue_event(cls(src_path)) - def _queue_renamed_event(self, src_event, src_path, dst_path, src_dirname, dst_dirname): + def _queue_renamed_event( + self, + src_event: FileSystemEvent, + src_path: bytes | str, + dst_path: bytes | str, + src_dirname: bytes | str, + dst_dirname: bytes | str, + ) -> None: cls = DirMovedEvent if src_event.is_directory else FileMovedEvent dst_path = self._encode_path(dst_path) self.queue_event(cls(src_path, dst_path)) self.queue_event(DirModifiedEvent(src_dirname)) self.queue_event(DirModifiedEvent(dst_dirname)) - def _is_historic_created_event(self, event): + def _is_historic_created_event(self, event: _fsevents.NativeEvent) -> bool: # We only queue a created event if the item was created after we # started the FSEventsStream. @@ -154,11 +167,11 @@ def _is_historic_created_event(self, event): return in_history or before_start @staticmethod - def _is_meta_mod(event): + def _is_meta_mod(event: _fsevents.NativeEvent) -> bool: """Returns True if the event indicates a change in metadata.""" return event.is_inode_meta_mod or event.is_xattr_mod or event.is_owner_change - def queue_events(self, timeout, events): + def queue_events(self, timeout: int, events: list[_fsevents.NativeEvent]) -> None: # type: ignore[override] if logger.getEffectiveLevel() <= logging.DEBUG: for event in events: flags = ", ".join(attr for attr in dir(event) if getattr(event, attr) is True) @@ -241,8 +254,8 @@ def queue_events(self, timeout, events): self._queue_renamed_event(event, src_path, dst_path, src_dirname, dst_dirname) self._fs_view.add(event.inode) - for sub_event in generate_sub_moved_events(src_path, dst_path): - self.queue_event(sub_event) + for sub_moved_event in generate_sub_moved_events(src_path, dst_path): + self.queue_event(sub_moved_event) # Process any coalesced flags for the dst_event. @@ -261,8 +274,8 @@ def queue_events(self, timeout, events): self._queue_created_event(event, src_path, src_dirname) self._fs_view.add(event.inode) - for sub_event in generate_sub_created_events(src_path): - self.queue_event(sub_event) + for sub_created_event in generate_sub_created_events(src_path): + self.queue_event(sub_created_event) else: # This is the source event, item was moved out of the watched @@ -287,7 +300,7 @@ def queue_events(self, timeout, events): self._fs_view.clear() - def events_callback(self, paths, inodes, flags, ids): + def events_callback(self, paths: list[bytes], inodes: list[int], flags: list[int], ids: list[int]) -> None: """Callback passed to FSEventStreamCreate(), it will receive all FS events and queue them. """ @@ -302,7 +315,7 @@ def events_callback(self, paths, inodes, flags, ids): except Exception: logger.exception("Unhandled exception in fsevents callback") - def run(self): + def run(self) -> None: self.pathnames = [self.watch.path] self._start_time = time.monotonic() try: @@ -311,21 +324,28 @@ def run(self): except Exception: logger.exception("Unhandled exception in FSEventsEmitter") - def on_thread_start(self): + def on_thread_start(self) -> None: if self.suppress_history: watch_path = os.fsdecode(self.watch.path) if isinstance(self.watch.path, bytes) else self.watch.path self._starting_state = DirectorySnapshot(watch_path) - def _encode_path(self, path): + def _encode_path(self, path: bytes | str) -> bytes | str: """Encode path only if bytes were passed to this emitter.""" return os.fsencode(path) if isinstance(self.watch.path, bytes) else path class FSEventsObserver(BaseObserver): - def __init__(self, timeout=DEFAULT_OBSERVER_TIMEOUT): + def __init__(self, timeout: int = DEFAULT_OBSERVER_TIMEOUT) -> None: super().__init__(FSEventsEmitter, timeout=timeout) - def schedule(self, event_handler, path, *, recursive=False, event_filter=None): + def schedule( + self, + event_handler: FileSystemEventHandler, + path: str, + *, + recursive: bool = False, + event_filter: list[FileSystemEvent] | None = None, + ) -> ObservedWatch: # Fix for issue #26: Trace/BPT error when given a unicode path # string. https://github.com/gorakhargosh/watchdog/issues#issue/26 if isinstance(path, str): diff --git a/src/watchdog/observers/fsevents2.py b/src/watchdog/observers/fsevents2.py index dc35d594..72f986d2 100644 --- a/src/watchdog/observers/fsevents2.py +++ b/src/watchdog/observers/fsevents2.py @@ -25,6 +25,7 @@ import unicodedata import warnings from threading import Thread +from typing import TYPE_CHECKING # pyobjc import AppKit @@ -68,6 +69,11 @@ ) from watchdog.observers.api import DEFAULT_EMITTER_TIMEOUT, DEFAULT_OBSERVER_TIMEOUT, BaseObserver, EventEmitter +if TYPE_CHECKING: + from typing import Callable + + from watchdog.observers.api import EventQueue, ObservedWatch + logger = logging.getLogger(__name__) message = "watchdog.observers.fsevents2 is deprecated and will be removed in a future release." @@ -78,7 +84,7 @@ class FSEventsQueue(Thread): """Low level FSEvents client.""" - def __init__(self, path): + def __init__(self, path: bytes | str) -> None: Thread.__init__(self) self._queue: queue.Queue[list[NativeEvent] | None] = queue.Queue() self._run_loop = None @@ -102,7 +108,7 @@ def __init__(self, path): error = "FSEvents. Could not create stream." raise OSError(error) - def run(self): + def run(self) -> None: pool = AppKit.NSAutoreleasePool.alloc().init() self._run_loop = CFRunLoopGetCurrent() FSEventStreamScheduleWithRunLoop(self._stream_ref, self._run_loop, kCFRunLoopDefaultMode) @@ -120,18 +126,26 @@ def run(self): # Make sure waiting thread is notified self._queue.put(None) - def stop(self): + def stop(self) -> None: if self._run_loop is not None: CFRunLoopStop(self._run_loop) - def _callback(self, stream_ref, client_callback_info, num_events, event_paths, event_flags, event_ids): + def _callback( + self, + stream_ref: int, + client_callback_info: Callable, + num_events: int, + event_paths: list[bytes], + event_flags: list[int], + event_ids: list[int], + ) -> None: events = [NativeEvent(path, flags, _id) for path, flags, _id in zip(event_paths, event_flags, event_ids)] logger.debug("FSEvents callback. Got %d events:", num_events) for e in events: logger.debug(e) self._queue.put(events) - def read_events(self): + def read_events(self) -> list[NativeEvent] | None: """Returns a list or one or more events, or None if there are no more events to be read. """ @@ -139,7 +153,7 @@ def read_events(self): class NativeEvent: - def __init__(self, path, flags, event_id): + def __init__(self, path: bytes, flags: int, event_id: int) -> None: self.path = path self.flags = flags self.event_id = event_id @@ -155,7 +169,7 @@ def __init__(self, path, flags, event_id): self.is_directory = bool(flags & kFSEventStreamEventFlagItemIsDir) @property - def _event_type(self): + def _event_type(self) -> str: if self.is_created: return "Created" if self.is_removed: @@ -170,7 +184,7 @@ def _event_type(self): return "XattrMod" return "Unknown" - def __repr__(self): + def __repr__(self) -> str: return ( f"<{type(self).__name__}: path={self.path!r}, type={self._event_type}," f" is_dir={self.is_directory}, flags={hex(self.flags)}, id={self.event_id}>" @@ -180,15 +194,22 @@ def __repr__(self): class FSEventsEmitter(EventEmitter): """FSEvents based event emitter. Handles conversion of native events.""" - def __init__(self, event_queue, watch, *, timeout=DEFAULT_EMITTER_TIMEOUT, event_filter=None): + def __init__( + self, + event_queue: EventQueue, + watch: ObservedWatch, + *, + timeout: int = DEFAULT_EMITTER_TIMEOUT, + event_filter: list[FileSystemEvent] | None = None, + ): super().__init__(event_queue, watch, timeout=timeout, event_filter=event_filter) self._fsevents = FSEventsQueue(watch.path) self._fsevents.start() - def on_thread_stop(self): + def on_thread_stop(self) -> None: self._fsevents.stop() - def queue_events(self, timeout): + def queue_events(self, timeout: int) -> None: events = self._fsevents.read_events() if events is None: return @@ -240,5 +261,5 @@ def queue_events(self, timeout): class FSEventsObserver2(BaseObserver): - def __init__(self, timeout=DEFAULT_OBSERVER_TIMEOUT): + def __init__(self, timeout: int = DEFAULT_OBSERVER_TIMEOUT) -> None: super().__init__(FSEventsEmitter, timeout=timeout) diff --git a/src/watchdog/observers/inotify.py b/src/watchdog/observers/inotify.py index c6cf338c..b2b908dc 100644 --- a/src/watchdog/observers/inotify.py +++ b/src/watchdog/observers/inotify.py @@ -68,6 +68,7 @@ import logging import os import threading +from typing import TYPE_CHECKING from watchdog.events import ( DirCreatedEvent, @@ -86,9 +87,11 @@ generate_sub_moved_events, ) from watchdog.observers.api import DEFAULT_EMITTER_TIMEOUT, DEFAULT_OBSERVER_TIMEOUT, BaseObserver, EventEmitter +from watchdog.observers.inotify_buffer import InotifyBuffer +from watchdog.observers.inotify_c import InotifyConstants -from .inotify_buffer import InotifyBuffer -from .inotify_c import InotifyConstants +if TYPE_CHECKING: + from watchdog.observers.api import EventQueue, ObservedWatch logger = logging.getLogger(__name__) @@ -112,22 +115,29 @@ class InotifyEmitter(EventEmitter): Iterable[:class:`watchdog.events.FileSystemEvent`] | None """ - def __init__(self, event_queue, watch, *, timeout=DEFAULT_EMITTER_TIMEOUT, event_filter=None): + def __init__( + self, + event_queue: EventQueue, + watch: ObservedWatch, + *, + timeout: int = DEFAULT_EMITTER_TIMEOUT, + event_filter: list[FileSystemEvent] | None = None, + ) -> None: super().__init__(event_queue, watch, timeout=timeout, event_filter=event_filter) self._lock = threading.Lock() - self._inotify = None + self._inotify: InotifyBuffer | None = None - def on_thread_start(self): + def on_thread_start(self) -> None: path = os.fsencode(self.watch.path) event_mask = self.get_event_mask_from_filter() self._inotify = InotifyBuffer(path, recursive=self.watch.is_recursive, event_mask=event_mask) - def on_thread_stop(self): + def on_thread_stop(self) -> None: if self._inotify: self._inotify.close() self._inotify = None - def queue_events(self, timeout, *, full_events=False): + def queue_events(self, timeout: int, *, full_events: bool = False) -> None: # If "full_events" is true, then the method will report unmatched move events as separate events # This behavior is by default only called by a InotifyFullEmitter if self._inotify is None: @@ -151,8 +161,8 @@ def queue_events(self, timeout, *, full_events=False): self.queue_event(DirModifiedEvent(os.path.dirname(src_path))) self.queue_event(DirModifiedEvent(os.path.dirname(dest_path))) if move_from.is_directory and self.watch.is_recursive: - for sub_event in generate_sub_moved_events(src_path, dest_path): - self.queue_event(sub_event) + for sub_moved_event in generate_sub_moved_events(src_path, dest_path): + self.queue_event(sub_moved_event) return src_path = self._decode_path(event.src_path) @@ -165,8 +175,8 @@ def queue_events(self, timeout, *, full_events=False): self.queue_event(cls(src_path)) self.queue_event(DirModifiedEvent(os.path.dirname(src_path))) if event.is_directory and self.watch.is_recursive: - for sub_event in generate_sub_created_events(src_path): - self.queue_event(sub_event) + for sub_created_event in generate_sub_created_events(src_path): + self.queue_event(sub_created_event) elif event.is_attrib or event.is_modify: cls = DirModifiedEvent if event.is_directory else FileModifiedEvent self.queue_event(cls(src_path)) @@ -198,23 +208,24 @@ def queue_events(self, timeout, *, full_events=False): cls = FileClosedNoWriteEvent self.queue_event(cls(src_path)) - def _decode_path(self, path): + def _decode_path(self, path: bytes | str) -> bytes | str: """Decode path only if unicode string was passed to this emitter.""" return path if isinstance(self.watch.path, bytes) else os.fsdecode(path) - def get_event_mask_from_filter(self): - """Optimization: Only include events we are filtering in inotify call""" + def get_event_mask_from_filter(self) -> int | None: + """Optimization: Only include events we are filtering in inotify call.""" if self._event_filter is None: return None - # always listen to delete self + # Always listen to delete self event_mask = InotifyConstants.IN_DELETE_SELF + for cls in self._event_filter: - if cls in (DirMovedEvent, FileMovedEvent): + if cls in {DirMovedEvent, FileMovedEvent}: # type: ignore[comparison-overlap] event_mask |= InotifyConstants.IN_MOVE - elif cls in (DirCreatedEvent, FileCreatedEvent): + elif cls in {DirCreatedEvent, FileCreatedEvent}: # type: ignore[comparison-overlap] event_mask |= InotifyConstants.IN_MOVE | InotifyConstants.IN_CREATE - elif cls is DirModifiedEvent: + elif cls is DirModifiedEvent: # type: ignore[comparison-overlap] event_mask |= ( InotifyConstants.IN_MOVE | InotifyConstants.IN_ATTRIB @@ -222,16 +233,17 @@ def get_event_mask_from_filter(self): | InotifyConstants.IN_CREATE | InotifyConstants.IN_CLOSE_WRITE ) - elif cls is FileModifiedEvent: + elif cls is FileModifiedEvent: # type: ignore[comparison-overlap] event_mask |= InotifyConstants.IN_ATTRIB | InotifyConstants.IN_MODIFY - elif cls in (DirDeletedEvent, FileDeletedEvent): + elif cls in {DirDeletedEvent, FileDeletedEvent}: # type: ignore[comparison-overlap] event_mask |= InotifyConstants.IN_DELETE - elif cls is FileClosedEvent: + elif cls is FileClosedEvent: # type: ignore[comparison-overlap] event_mask |= InotifyConstants.IN_CLOSE_WRITE - elif cls is FileClosedNoWriteEvent: + elif cls is FileClosedNoWriteEvent: # type: ignore[comparison-overlap] event_mask |= InotifyConstants.IN_CLOSE_NOWRITE - elif cls is FileOpenedEvent: + elif cls is FileOpenedEvent: # type: ignore[comparison-overlap] event_mask |= InotifyConstants.IN_OPEN + return event_mask @@ -240,8 +252,8 @@ class InotifyFullEmitter(InotifyEmitter): Such move events will have a ``None`` value for the unmatched part. """ - def queue_events(self, timeout, *, events=True): - InotifyEmitter.queue_events(self, timeout, full_events=events) + def queue_events(self, timeout: int, *, events: bool = True) -> None: # type: ignore[override] + super().queue_events(timeout, full_events=events) class InotifyObserver(BaseObserver): @@ -249,6 +261,6 @@ class InotifyObserver(BaseObserver): calls to event handlers. """ - def __init__(self, *, timeout=DEFAULT_OBSERVER_TIMEOUT, generate_full_events=False): + def __init__(self, *, timeout: int = DEFAULT_OBSERVER_TIMEOUT, generate_full_events: bool = False) -> None: cls = InotifyFullEmitter if generate_full_events else InotifyEmitter super().__init__(cls, timeout=timeout) diff --git a/src/watchdog/observers/inotify_buffer.py b/src/watchdog/observers/inotify_buffer.py index ddfdefc3..07a34613 100644 --- a/src/watchdog/observers/inotify_buffer.py +++ b/src/watchdog/observers/inotify_buffer.py @@ -30,47 +30,48 @@ class InotifyBuffer(BaseThread): delay = 0.5 - def __init__(self, path, *, recursive=False, event_mask=None): + def __init__(self, path: bytes, *, recursive: bool = False, event_mask: int | None = None) -> None: super().__init__() - self._queue = DelayedQueue[InotifyEvent](self.delay) + # XXX: Remove quotes after Python 3.9 drop + self._queue = DelayedQueue["InotifyEvent | tuple[InotifyEvent, InotifyEvent]"](self.delay) self._inotify = Inotify(path, recursive=recursive, event_mask=event_mask) self.start() - def read_event(self): + def read_event(self) -> InotifyEvent | tuple[InotifyEvent, InotifyEvent] | None: """Returns a single event or a tuple of from/to events in case of a paired move event. If this buffer has been closed, immediately return None. """ return self._queue.get() - def on_thread_stop(self): + def on_thread_stop(self) -> None: self._inotify.close() self._queue.close() - def close(self): + def close(self) -> None: self.stop() self.join() - def _group_events(self, event_list): + def _group_events(self, event_list: list[InotifyEvent]) -> list[InotifyEvent | tuple[InotifyEvent, InotifyEvent]]: """Group any matching move events""" grouped: list[InotifyEvent | tuple[InotifyEvent, InotifyEvent]] = [] for inotify_event in event_list: logger.debug("in-event %s", inotify_event) - def matching_from_event(event): + def matching_from_event(event: InotifyEvent | tuple[InotifyEvent, InotifyEvent]) -> bool: return not isinstance(event, tuple) and event.is_moved_from and event.cookie == inotify_event.cookie if inotify_event.is_moved_to: # Check if move_from is already in the buffer for index, event in enumerate(grouped): if matching_from_event(event): - grouped[index] = (event, inotify_event) + grouped[index] = (event, inotify_event) # type: ignore[assignment] break else: # Check if move_from is in delayqueue already from_event = self._queue.remove(matching_from_event) if from_event is not None: - grouped.append((from_event, inotify_event)) + grouped.append((from_event, inotify_event)) # type: ignore[arg-type] else: logger.debug("could not find matching move_from event") grouped.append(inotify_event) @@ -78,7 +79,7 @@ def matching_from_event(event): grouped.append(inotify_event) return grouped - def run(self): + def run(self) -> None: """Read event from `inotify` and add them to `queue`. When reading a IN_MOVE_TO event, remove the previous added matching IN_MOVE_FROM event and add them back to the queue as a tuple. diff --git a/src/watchdog/observers/inotify_c.py b/src/watchdog/observers/inotify_c.py index 6f020a9f..b5cecba5 100644 --- a/src/watchdog/observers/inotify_c.py +++ b/src/watchdog/observers/inotify_c.py @@ -24,9 +24,13 @@ import threading from ctypes import c_char_p, c_int, c_uint32 from functools import reduce +from typing import TYPE_CHECKING from watchdog.utils import UnsupportedLibcError +if TYPE_CHECKING: + from collections.abc import Generator + libc = ctypes.CDLL(None) if not hasattr(libc, "inotify_init") or not hasattr(libc, "inotify_add_watch") or not hasattr(libc, "inotify_rm_watch"): @@ -152,7 +156,7 @@ class Inotify: ``True`` if subdirectories should be monitored; ``False`` otherwise. """ - def __init__(self, path, *, recursive=False, event_mask=None): + def __init__(self, path: bytes, *, recursive: bool = False, event_mask: int | None = None) -> None: # The file descriptor associated with the inotify instance. inotify_fd = inotify_init() if inotify_fd == -1: @@ -161,8 +165,8 @@ def __init__(self, path, *, recursive=False, event_mask=None): self._lock = threading.Lock() # Stores the watch descriptor for a given path. - self._wd_for_path = {} - self._path_for_wd = {} + self._wd_for_path: dict[bytes, int] = {} + self._path_for_wd: dict[int, bytes] = {} self._path = path # Default to all events @@ -171,36 +175,36 @@ def __init__(self, path, *, recursive=False, event_mask=None): self._event_mask = event_mask self._is_recursive = recursive if os.path.isdir(path): - self._add_dir_watch(path, recursive, event_mask) + self._add_dir_watch(path, event_mask, recursive=recursive) else: self._add_watch(path, event_mask) - self._moved_from_events = {} + self._moved_from_events: dict[int, InotifyEvent] = {} @property - def event_mask(self): + def event_mask(self) -> int: """The event mask for this inotify instance.""" return self._event_mask @property - def path(self): + def path(self) -> bytes: """The path associated with the inotify instance.""" return self._path @property - def is_recursive(self): + def is_recursive(self) -> bool: """Whether we are watching directories recursively.""" return self._is_recursive @property - def fd(self): + def fd(self) -> int: """The file descriptor associated with the inotify instance.""" return self._inotify_fd - def clear_move_records(self): + def clear_move_records(self) -> None: """Clear cached records of MOVED_FROM events""" self._moved_from_events = {} - def source_for_move(self, destination_event): + def source_for_move(self, destination_event: InotifyEvent) -> bytes | None: """The source path corresponding to the given MOVED_TO event. If the source path is outside the monitored directories, None @@ -211,13 +215,13 @@ def source_for_move(self, destination_event): return None - def remember_move_from_event(self, event): + def remember_move_from_event(self, event: InotifyEvent) -> None: """Save this event as the source event for future MOVED_TO events to reference. """ self._moved_from_events[event.cookie] = event - def add_watch(self, path): + def add_watch(self, path: bytes) -> None: """Adds a watch for the given path. :param path: @@ -226,7 +230,7 @@ def add_watch(self, path): with self._lock: self._add_watch(path, self._event_mask) - def remove_watch(self, path): + def remove_watch(self, path: bytes) -> None: """Removes a watch for the given path. :param path: @@ -238,7 +242,7 @@ def remove_watch(self, path): if inotify_rm_watch(self._inotify_fd, wd) == -1: Inotify._raise_error() - def close(self): + def close(self) -> None: """Closes the inotify instance and removes all associated watches.""" with self._lock: if self._path in self._wd_for_path: @@ -249,14 +253,14 @@ def close(self): with contextlib.suppress(OSError): os.close(self._inotify_fd) - def read_events(self, event_buffer_size=DEFAULT_EVENT_BUFFER_SIZE): + def read_events(self, *, event_buffer_size: int = DEFAULT_EVENT_BUFFER_SIZE) -> list[InotifyEvent]: """Reads events from inotify and yields them.""" # HACK: We need to traverse the directory path # recursively and simulate events for newly # created subdirectories/files. This will handle # mkdir -p foobar/blah/bar; touch foobar/afile - def _recursive_simulate(src_path): + def _recursive_simulate(src_path: bytes) -> list[InotifyEvent]: events = [] for root, dirnames, filenames in os.walk(src_path): for dirname in dirnames: @@ -352,7 +356,7 @@ def _recursive_simulate(src_path): return event_list # Non-synchronized methods. - def _add_dir_watch(self, path, recursive, mask): + def _add_dir_watch(self, path: bytes, mask: int, *, recursive: bool) -> None: """Adds a watch (optionally recursively) for the given directory path to monitor events specified by the mask. @@ -374,7 +378,7 @@ def _add_dir_watch(self, path, recursive, mask): continue self._add_watch(full_path, mask) - def _add_watch(self, path, mask): + def _add_watch(self, path: bytes, mask: int) -> int: """Adds a watch for the given path to monitor events specified by the mask. @@ -391,7 +395,7 @@ def _add_watch(self, path, mask): return wd @staticmethod - def _raise_error(): + def _raise_error() -> None: """Raises errors for inotify failures.""" err = ctypes.get_errno() @@ -405,7 +409,7 @@ def _raise_error(): raise OSError(err, os.strerror(err)) @staticmethod - def _parse_event_buffer(event_buffer): + def _parse_event_buffer(event_buffer: bytes) -> Generator[tuple[int, int, int, bytes]]: """Parses an event buffer of ``inotify_event`` structs returned by inotify:: @@ -444,7 +448,7 @@ class InotifyEvent: Full event source path. """ - def __init__(self, wd, mask, cookie, name, src_path): + def __init__(self, wd: int, mask: int, cookie: int, name: bytes, src_path: bytes) -> None: self._wd = wd self._mask = mask self._cookie = cookie @@ -452,103 +456,107 @@ def __init__(self, wd, mask, cookie, name, src_path): self._src_path = src_path @property - def src_path(self): + def src_path(self) -> bytes: return self._src_path @property - def wd(self): + def wd(self) -> int: return self._wd @property - def mask(self): + def mask(self) -> int: return self._mask @property - def cookie(self): + def cookie(self) -> int: return self._cookie @property - def name(self): + def name(self) -> bytes: return self._name @property - def is_modify(self): + def is_modify(self) -> bool: return self._mask & InotifyConstants.IN_MODIFY > 0 @property - def is_close_write(self): + def is_close_write(self) -> bool: return self._mask & InotifyConstants.IN_CLOSE_WRITE > 0 @property - def is_close_nowrite(self): + def is_close_nowrite(self) -> bool: return self._mask & InotifyConstants.IN_CLOSE_NOWRITE > 0 @property - def is_open(self): + def is_open(self) -> bool: return self._mask & InotifyConstants.IN_OPEN > 0 @property - def is_access(self): + def is_access(self) -> bool: return self._mask & InotifyConstants.IN_ACCESS > 0 @property - def is_delete(self): + def is_delete(self) -> bool: return self._mask & InotifyConstants.IN_DELETE > 0 @property - def is_delete_self(self): + def is_delete_self(self) -> bool: return self._mask & InotifyConstants.IN_DELETE_SELF > 0 @property - def is_create(self): + def is_create(self) -> bool: return self._mask & InotifyConstants.IN_CREATE > 0 @property - def is_moved_from(self): + def is_moved_from(self) -> bool: return self._mask & InotifyConstants.IN_MOVED_FROM > 0 @property - def is_moved_to(self): + def is_moved_to(self) -> bool: return self._mask & InotifyConstants.IN_MOVED_TO > 0 @property - def is_move(self): + def is_move(self) -> bool: return self._mask & InotifyConstants.IN_MOVE > 0 @property - def is_move_self(self): + def is_move_self(self) -> bool: return self._mask & InotifyConstants.IN_MOVE_SELF > 0 @property - def is_attrib(self): + def is_attrib(self) -> bool: return self._mask & InotifyConstants.IN_ATTRIB > 0 @property - def is_ignored(self): + def is_ignored(self) -> bool: return self._mask & InotifyConstants.IN_IGNORED > 0 @property - def is_directory(self): + def is_directory(self) -> bool: # It looks like the kernel does not provide this information for # IN_DELETE_SELF and IN_MOVE_SELF. In this case, assume it's a dir. # See also: https://github.com/seb-m/pyinotify/blob/2c7e8f8/python2/pyinotify.py#L897 return self.is_delete_self or self.is_move_self or self._mask & InotifyConstants.IN_ISDIR > 0 @property - def key(self): + def key(self) -> tuple[bytes, int, int, int, bytes]: return self._src_path, self._wd, self._mask, self._cookie, self._name - def __eq__(self, inotify_event): + def __eq__(self, inotify_event: object) -> bool: + if not isinstance(inotify_event, InotifyEvent): + return NotImplemented return self.key == inotify_event.key - def __ne__(self, inotify_event): + def __ne__(self, inotify_event: object) -> bool: + if not isinstance(inotify_event, InotifyEvent): + return NotImplemented return self.key != inotify_event.key - def __hash__(self): + def __hash__(self) -> int: return hash(self.key) @staticmethod - def _get_mask_string(mask): + def _get_mask_string(mask: int) -> str: masks = [] for c in dir(InotifyConstants): if c.startswith("IN_") and c not in {"IN_ALL_EVENTS", "IN_MOVE"}: @@ -557,7 +565,7 @@ def _get_mask_string(mask): masks.append(c) return "|".join(masks) - def __repr__(self): + def __repr__(self) -> str: return ( f"<{type(self).__name__}: src_path={self.src_path!r}, wd={self.wd}," f" mask={self._get_mask_string(self.mask)}, cookie={self.cookie}," diff --git a/src/watchdog/observers/kqueue.py b/src/watchdog/observers/kqueue.py index 5596f899..5765f626 100644 --- a/src/watchdog/observers/kqueue.py +++ b/src/watchdog/observers/kqueue.py @@ -68,7 +68,7 @@ # The `select` module varies between platforms. # mypy may complain about missing module attributes depending on which platform it's running on. # The comment below disables mypy's attribute check. -# mypy: disable-error-code=attr-defined +# mypy: disable-error-code="attr-defined, name-defined" from __future__ import annotations @@ -79,6 +79,7 @@ import select import threading from stat import S_ISDIR +from typing import TYPE_CHECKING from watchdog.events import ( EVENT_TYPE_CREATED, @@ -98,6 +99,13 @@ from watchdog.utils import platform from watchdog.utils.dirsnapshot import DirectorySnapshot +if TYPE_CHECKING: + from collections.abc import Generator + from typing import Callable + + from watchdog.events import FileSystemEvent + from watchdog.observers.api import EventQueue, ObservedWatch + # Maximum number of events to process. MAX_EVENTS = 4096 @@ -119,30 +127,30 @@ ) -def absolute_path(path): +def absolute_path(path: bytes | str) -> bytes | str: return os.path.abspath(os.path.normpath(path)) # Flag tests. -def is_deleted(kev): +def is_deleted(kev: select.kevent) -> bool: """Determines whether the given kevent represents deletion.""" return kev.fflags & select.KQ_NOTE_DELETE -def is_modified(kev): +def is_modified(kev: select.kevent) -> bool: """Determines whether the given kevent represents modification.""" fflags = kev.fflags return (fflags & select.KQ_NOTE_EXTEND) or (fflags & select.KQ_NOTE_WRITE) -def is_attrib_modified(kev): +def is_attrib_modified(kev: select.kevent) -> bool: """Determines whether the given kevent represents attribute modification.""" return kev.fflags & select.KQ_NOTE_ATTRIB -def is_renamed(kev): +def is_renamed(kev: select.kevent) -> bool: """Determines whether the given kevent represents movement.""" return kev.fflags & select.KQ_NOTE_RENAME @@ -150,34 +158,26 @@ def is_renamed(kev): class KeventDescriptorSet: """Thread-safe kevent descriptor collection.""" - def __init__(self): - # Set of KeventDescriptor - self._descriptors = set() - - # Descriptor for a given path. - self._descriptor_for_path = {} - - # Descriptor for a given fd. - self._descriptor_for_fd = {} - - # List of kevent objects. - self._kevents = [] - + def __init__(self) -> None: + self._descriptors: set[KeventDescriptor] = set() + self._descriptor_for_path: dict[bytes | str, KeventDescriptor] = {} + self._descriptor_for_fd: dict[int, KeventDescriptor] = {} + self._kevents: list[select.kevent] = [] self._lock = threading.Lock() @property - def kevents(self): + def kevents(self) -> list[select.kevent]: """List of kevents monitored.""" with self._lock: return self._kevents @property - def paths(self): + def paths(self) -> list[bytes | str]: """List of paths for which kevents have been created.""" with self._lock: return list(self._descriptor_for_path.keys()) - def get_for_fd(self, fd): + def get_for_fd(self, fd: int) -> KeventDescriptor: """Given a file descriptor, returns the kevent descriptor object for it. @@ -191,7 +191,7 @@ def get_for_fd(self, fd): with self._lock: return self._descriptor_for_fd[fd] - def get(self, path): + def get(self, path: bytes | str) -> KeventDescriptor: """Obtains a :class:`KeventDescriptor` object for the specified path. :param path: @@ -201,7 +201,7 @@ def get(self, path): path = absolute_path(path) return self._get(path) - def __contains__(self, path): + def __contains__(self, path: bytes | str) -> bool: """Determines whether a :class:`KeventDescriptor has been registered for the specified path. @@ -212,7 +212,7 @@ def __contains__(self, path): path = absolute_path(path) return self._has_path(path) - def add(self, path, is_directory): + def add(self, path: bytes | str, *, is_directory: bool) -> None: """Adds a :class:`KeventDescriptor` to the collection for the given path. @@ -227,9 +227,9 @@ def add(self, path, is_directory): with self._lock: path = absolute_path(path) if not self._has_path(path): - self._add_descriptor(KeventDescriptor(path, is_directory)) + self._add_descriptor(KeventDescriptor(path, is_directory=is_directory)) - def remove(self, path): + def remove(self, path: bytes | str) -> None: """Removes the :class:`KeventDescriptor` object for the given path if it already exists. @@ -242,7 +242,7 @@ def remove(self, path): if self._has_path(path): self._remove_descriptor(self._get(path)) - def clear(self): + def clear(self) -> None: """Clears the collection and closes all open descriptors.""" with self._lock: for descriptor in self._descriptors: @@ -253,17 +253,17 @@ def clear(self): self._kevents = [] # Thread-unsafe methods. Locking is provided at a higher level. - def _get(self, path): + def _get(self, path: bytes | str) -> KeventDescriptor: """Returns a kevent descriptor for a given path.""" return self._descriptor_for_path[path] - def _has_path(self, path): + def _has_path(self, path: bytes | str) -> bool: """Determines whether a :class:`KeventDescriptor` for the specified path exists already in the collection. """ return path in self._descriptor_for_path - def _add_descriptor(self, descriptor): + def _add_descriptor(self, descriptor: KeventDescriptor) -> None: """Adds a descriptor to the collection. :param descriptor: @@ -274,7 +274,7 @@ def _add_descriptor(self, descriptor): self._descriptor_for_path[descriptor.path] = descriptor self._descriptor_for_fd[descriptor.fd] = descriptor - def _remove_descriptor(self, descriptor): + def _remove_descriptor(self, descriptor: KeventDescriptor) -> None: """Removes a descriptor from the collection. :param descriptor: @@ -303,7 +303,7 @@ class KeventDescriptor: ``bool`` """ - def __init__(self, path, is_directory): + def __init__(self, path: bytes | str, *, is_directory: bool) -> None: self._path = absolute_path(path) self._is_directory = is_directory self._fd = os.open(path, WATCHDOG_OS_OPEN_FLAGS) @@ -315,22 +315,22 @@ def __init__(self, path, is_directory): ) @property - def fd(self): + def fd(self) -> int: """OS file descriptor for the kevent descriptor.""" return self._fd @property - def path(self): + def path(self) -> bytes | str: """The path associated with the kevent descriptor.""" return self._path @property - def kevent(self): + def kevent(self) -> select.kevent: """The kevent object associated with the kevent descriptor.""" return self._kev @property - def is_directory(self): + def is_directory(self) -> bool: """Determines whether the kevent descriptor refers to a directory. :returns: @@ -338,25 +338,29 @@ def is_directory(self): """ return self._is_directory - def close(self): + def close(self) -> None: """Closes the file descriptor associated with a kevent descriptor.""" with contextlib.suppress(OSError): os.close(self.fd) @property - def key(self): + def key(self) -> tuple[bytes | str, bool]: return (self.path, self.is_directory) - def __eq__(self, descriptor): + def __eq__(self, descriptor: object) -> bool: + if not isinstance(descriptor, KeventDescriptor): + return NotImplemented return self.key == descriptor.key - def __ne__(self, descriptor): + def __ne__(self, descriptor: object) -> bool: + if not isinstance(descriptor, KeventDescriptor): + return NotImplemented return self.key != descriptor.key - def __hash__(self): + def __hash__(self) -> int: return hash(self.key) - def __repr__(self): + def __repr__(self) -> str: return f"<{type(self).__name__}: path={self.path!r}, is_directory={self.is_directory}>" @@ -408,7 +412,15 @@ class KqueueEmitter(EventEmitter): :param stat: stat function. See ``os.stat`` for details. """ - def __init__(self, event_queue, watch, *, timeout=DEFAULT_EMITTER_TIMEOUT, event_filter=None, stat=os.stat): + def __init__( + self, + event_queue: EventQueue, + watch: ObservedWatch, + *, + timeout: int = DEFAULT_EMITTER_TIMEOUT, + event_filter: list[FileSystemEvent] | None = None, + stat: Callable = os.stat, + ) -> None: super().__init__(event_queue, watch, timeout=timeout, event_filter=event_filter) self._kq = select.kqueue() @@ -417,14 +429,14 @@ def __init__(self, event_queue, watch, *, timeout=DEFAULT_EMITTER_TIMEOUT, event # A collection of KeventDescriptor. self._descriptors = KeventDescriptorSet() - def custom_stat(path, self=self): + def custom_stat(path: str, cls: KqueueEmitter = self) -> os.stat_result: stat_info = stat(path) - self._register_kevent(path, S_ISDIR(stat_info.st_mode)) + cls._register_kevent(path, is_directory=S_ISDIR(stat_info.st_mode)) return stat_info self._snapshot = DirectorySnapshot(watch.path, recursive=watch.is_recursive, stat=custom_stat) - def _register_kevent(self, path, is_directory): + def _register_kevent(self, path: bytes | str, *, is_directory: bool) -> None: """Registers a kevent descriptor for the given path. :param path: @@ -435,7 +447,7 @@ def _register_kevent(self, path, is_directory): ``bool`` """ try: - self._descriptors.add(path, is_directory) + self._descriptors.add(path, is_directory=is_directory) except OSError as e: if e.errno == errno.ENOENT: # Probably dealing with a temporary file that was created @@ -459,7 +471,7 @@ def _register_kevent(self, path, is_directory): # All other errors are propagated. raise - def _unregister_kevent(self, path): + def _unregister_kevent(self, path: bytes | str) -> None: """Convenience function to close the kevent descriptor for a specified kqueue-monitored path. @@ -468,7 +480,7 @@ def _unregister_kevent(self, path): """ self._descriptors.remove(path) - def queue_event(self, event): + def queue_event(self, event: FileSystemEvent) -> None: """Handles queueing a single event object. :param event: @@ -481,14 +493,16 @@ def queue_event(self, event): # for all those events anyway. EventEmitter.queue_event(self, event) if event.event_type == EVENT_TYPE_CREATED: - self._register_kevent(event.src_path, event.is_directory) + self._register_kevent(event.src_path, is_directory=event.is_directory) elif event.event_type == EVENT_TYPE_MOVED: self._unregister_kevent(event.src_path) - self._register_kevent(event.dest_path, event.is_directory) + self._register_kevent(event.dest_path, is_directory=event.is_directory) elif event.event_type == EVENT_TYPE_DELETED: self._unregister_kevent(event.src_path) - def _gen_kqueue_events(self, kev, ref_snapshot, new_snapshot): + def _gen_kqueue_events( + self, kev: select.kevent, ref_snapshot: DirectorySnapshot, new_snapshot: DirectorySnapshot + ) -> Generator[FileSystemEvent]: """Generate events from the kevent list returned from the call to :meth:`select.kqueue.control`. @@ -505,7 +519,12 @@ def _gen_kqueue_events(self, kev, ref_snapshot, new_snapshot): # Kqueue does not specify the destination names for renames # to, so we have to process these using the a snapshot # of the directory. - yield from self._gen_renamed_events(src_path, descriptor.is_directory, ref_snapshot, new_snapshot) + yield from self._gen_renamed_events( + src_path, + ref_snapshot, + new_snapshot, + is_directory=descriptor.is_directory, + ) elif is_attrib_modified(kev): if descriptor.is_directory: yield DirModifiedEvent(src_path) @@ -527,11 +546,18 @@ def _gen_kqueue_events(self, kev, ref_snapshot, new_snapshot): else: yield FileDeletedEvent(src_path) - def _parent_dir_modified(self, src_path): + def _parent_dir_modified(self, src_path: bytes | str) -> DirModifiedEvent: """Helper to generate a DirModifiedEvent on the parent of src_path.""" return DirModifiedEvent(os.path.dirname(src_path)) - def _gen_renamed_events(self, src_path, is_directory, ref_snapshot, new_snapshot): + def _gen_renamed_events( + self, + src_path: bytes | str, + ref_snapshot: DirectorySnapshot, + new_snapshot: DirectorySnapshot, + *, + is_directory: bool, + ) -> Generator[FileSystemEvent]: """Compares information from two directory snapshots (one taken before the rename operation and another taken right after) to determine the destination path of the file system object renamed, and yields @@ -579,7 +605,7 @@ def _gen_renamed_events(self, src_path, is_directory, ref_snapshot, new_snapshot yield FileDeletedEvent(src_path) yield self._parent_dir_modified(src_path) - def _read_events(self, timeout=None): + def _read_events(self, timeout: float | None = None) -> list[select.kevent]: """Reads events from a call to the blocking :meth:`select.kqueue.control()` method. @@ -588,9 +614,9 @@ def _read_events(self, timeout=None): :type timeout: ``float`` (seconds) """ - return self._kq.control(self._descriptors.kevents, MAX_EVENTS, timeout) + return self._kq.control(self._descriptors.kevents, MAX_EVENTS, timeout=timeout) - def queue_events(self, timeout): + def queue_events(self, timeout: int) -> None: """Queues events by reading them from a call to the blocking :meth:`select.kqueue.control()` method. @@ -607,7 +633,7 @@ def queue_events(self, timeout): # Take a fresh snapshot of the directory and update the # saved snapshot. - new_snapshot = DirectorySnapshot(self.watch.path, self.watch.is_recursive) + new_snapshot = DirectorySnapshot(self.watch.path, recursive=self.watch.is_recursive) ref_snapshot = self._snapshot self._snapshot = new_snapshot diff_events = new_snapshot - ref_snapshot @@ -628,7 +654,7 @@ def queue_events(self, timeout): if e.errno != errno.EBADF: raise - def on_thread_stop(self): + def on_thread_stop(self) -> None: # Clean up. with self._lock: self._descriptors.clear() @@ -640,5 +666,5 @@ class KqueueObserver(BaseObserver): calls to event handlers. """ - def __init__(self, timeout=DEFAULT_OBSERVER_TIMEOUT): + def __init__(self, timeout: int = DEFAULT_OBSERVER_TIMEOUT) -> None: super().__init__(KqueueEmitter, timeout=timeout) diff --git a/src/watchdog/observers/polling.py b/src/watchdog/observers/polling.py index 1436d00d..9817783c 100644 --- a/src/watchdog/observers/polling.py +++ b/src/watchdog/observers/polling.py @@ -36,6 +36,7 @@ import os import threading from functools import partial +from typing import TYPE_CHECKING from watchdog.events import ( DirCreatedEvent, @@ -50,6 +51,12 @@ from watchdog.observers.api import DEFAULT_EMITTER_TIMEOUT, DEFAULT_OBSERVER_TIMEOUT, BaseObserver, EventEmitter from watchdog.utils.dirsnapshot import DirectorySnapshot, DirectorySnapshotDiff, EmptyDirectorySnapshot +if TYPE_CHECKING: + from typing import Callable + + from watchdog.events import FileSystemEvent + from watchdog.observers.api import EventQueue, ObservedWatch + class PollingEmitter(EventEmitter): """Platform-independent emitter that polls a directory to detect file @@ -58,28 +65,28 @@ class PollingEmitter(EventEmitter): def __init__( self, - event_queue, - watch, + event_queue: EventQueue, + watch: ObservedWatch, *, - timeout=DEFAULT_EMITTER_TIMEOUT, - event_filter=None, - stat=os.stat, - listdir=os.scandir, - ): + timeout: int = DEFAULT_EMITTER_TIMEOUT, + event_filter: list[FileSystemEvent] | None = None, + stat: Callable = os.stat, + listdir: Callable = os.scandir, + ) -> None: super().__init__(event_queue, watch, timeout=timeout, event_filter=event_filter) self._snapshot: DirectorySnapshot = EmptyDirectorySnapshot() self._lock = threading.Lock() - self._take_snapshot = lambda: DirectorySnapshot( + self._take_snapshot: Callable[[], DirectorySnapshot] = lambda: DirectorySnapshot( self.watch.path, recursive=self.watch.is_recursive, stat=stat, listdir=listdir, ) - def on_thread_start(self): + def on_thread_start(self) -> None: self._snapshot = self._take_snapshot() - def queue_events(self, timeout): + def queue_events(self, timeout: int) -> None: # We don't want to hit the disk continuously. # timeout behaves like an interval for polling emitters. if self.stopped_event.wait(timeout): @@ -127,18 +134,18 @@ class PollingObserver(BaseObserver): system changes. """ - def __init__(self, *, timeout=DEFAULT_OBSERVER_TIMEOUT): + def __init__(self, *, timeout: int = DEFAULT_OBSERVER_TIMEOUT) -> None: super().__init__(PollingEmitter, timeout=timeout) class PollingObserverVFS(BaseObserver): """File system independent observer that polls a directory to detect changes.""" - def __init__(self, stat, listdir, polling_interval=1): + def __init__(self, stat: Callable, listdir: Callable, polling_interval: int = 1) -> None: """:param stat: stat function. See ``os.stat`` for details. :param listdir: listdir function. See ``os.scandir`` for details. :type polling_interval: float :param polling_interval: interval in seconds between polling the file system. """ emitter_cls = partial(PollingEmitter, stat=stat, listdir=listdir) - super().__init__(emitter_cls, timeout=polling_interval) + super().__init__(emitter_cls, timeout=polling_interval) # type: ignore[arg-type] diff --git a/src/watchdog/observers/read_directory_changes.py b/src/watchdog/observers/read_directory_changes.py index 8690169c..3701e15f 100644 --- a/src/watchdog/observers/read_directory_changes.py +++ b/src/watchdog/observers/read_directory_changes.py @@ -19,6 +19,7 @@ import os.path import platform import threading +from typing import TYPE_CHECKING from watchdog.events import ( DirCreatedEvent, @@ -35,37 +36,53 @@ from watchdog.observers.api import DEFAULT_EMITTER_TIMEOUT, DEFAULT_OBSERVER_TIMEOUT, BaseObserver, EventEmitter from watchdog.observers.winapi import close_directory_handle, get_directory_handle, read_events +if TYPE_CHECKING: + from ctypes.wintypes import HANDLE + + from watchdog.events import FileSystemEvent + from watchdog.observers.api import EventQueue, ObservedWatch + from watchdog.observers.winapi import WinAPINativeEvent + class WindowsApiEmitter(EventEmitter): """Windows API-based emitter that uses ReadDirectoryChangesW to detect file system changes for a watch. """ - def __init__(self, event_queue, watch, *, timeout=DEFAULT_EMITTER_TIMEOUT, event_filter=None): + def __init__( + self, + event_queue: EventQueue, + watch: ObservedWatch, + *, + timeout: int = DEFAULT_EMITTER_TIMEOUT, + event_filter: list[FileSystemEvent] | None = None, + ) -> None: super().__init__(event_queue, watch, timeout=timeout, event_filter=event_filter) self._lock = threading.Lock() - self._whandle = None + self._whandle: HANDLE | None = None - def on_thread_start(self): + def on_thread_start(self) -> None: self._whandle = get_directory_handle(self.watch.path) if platform.python_implementation() == "PyPy": - def start(self): + def start(self) -> None: """PyPy needs some time before receiving events, see #792.""" from time import sleep super().start() sleep(0.01) - def on_thread_stop(self): + def on_thread_stop(self) -> None: if self._whandle: close_directory_handle(self._whandle) - def _read_events(self): - return read_events(self._whandle, self.watch.path, self.watch.is_recursive) + def _read_events(self) -> list[WinAPINativeEvent]: + if not self._whandle: + return [] + return read_events(self._whandle, self.watch.path, recursive=self.watch.is_recursive) - def queue_events(self, timeout): + def queue_events(self, timeout: int) -> None: winapi_events = self._read_events() with self._lock: last_renamed_src_path = "" @@ -106,5 +123,5 @@ class WindowsApiObserver(BaseObserver): calls to event handlers. """ - def __init__(self, timeout=DEFAULT_OBSERVER_TIMEOUT): + def __init__(self, *, timeout: int = DEFAULT_OBSERVER_TIMEOUT) -> None: super().__init__(WindowsApiEmitter, timeout=timeout) diff --git a/src/watchdog/observers/winapi.py b/src/watchdog/observers/winapi.py index 001d4649..5a780588 100644 --- a/src/watchdog/observers/winapi.py +++ b/src/watchdog/observers/winapi.py @@ -36,9 +36,14 @@ from __future__ import annotations +import contextlib import ctypes.wintypes from dataclasses import dataclass from functools import reduce +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from typing import Any LPVOID = ctypes.wintypes.LPVOID @@ -102,13 +107,13 @@ class OVERLAPPED(ctypes.Structure): ) -def _errcheck_bool(value, func, args): +def _errcheck_bool(value: Any | None, func: Any, args: Any) -> Any: if not value: raise ctypes.WinError() return args -def _errcheck_handle(value, func, args): +def _errcheck_handle(value: Any | None, func: Any, args: Any) -> Any: if not value: raise ctypes.WinError() if value == INVALID_HANDLE_VALUE: @@ -116,7 +121,7 @@ def _errcheck_handle(value, func, args): return args -def _errcheck_dword(value, func, args): +def _errcheck_dword(value: Any | None, func: Any, args: Any) -> Any: if value == 0xFFFFFFFF: raise ctypes.WinError() return args @@ -281,10 +286,10 @@ class FileNotifyInformation(ctypes.Structure): PATH_BUFFER_SIZE = 2048 -def _parse_event_buffer(read_buffer, n_bytes): +def _parse_event_buffer(read_buffer: bytes, n_bytes: int) -> list[tuple[int, str]]: results = [] while n_bytes > 0: - fni = ctypes.cast(read_buffer, LPFNI)[0] + fni = ctypes.cast(read_buffer, LPFNI)[0] # type: ignore[arg-type] ptr = ctypes.addressof(fni) + FileNotifyInformation.FileName.offset filename = ctypes.string_at(ptr, fni.FileNameLength) results.append((fni.Action, filename.decode("utf-16"))) @@ -292,11 +297,11 @@ def _parse_event_buffer(read_buffer, n_bytes): if num_to_skip <= 0: break read_buffer = read_buffer[num_to_skip:] - n_bytes -= num_to_skip # numToSkip is long. nBytes should be long too. + n_bytes -= num_to_skip # num_to_skip is long. n_bytes should be long too. return results -def _is_observed_path_deleted(handle, path): +def _is_observed_path_deleted(handle: ctypes.wintypes.HANDLE, path: str) -> bool: # Comparison of observed path and actual path, returned by # GetFinalPathNameByHandleW. If directory moved to the trash bin, or # deleted, actual path will not be equal to observed path. @@ -305,17 +310,17 @@ def _is_observed_path_deleted(handle, path): return buff.value != path -def _generate_observed_path_deleted_event(): +def _generate_observed_path_deleted_event() -> tuple[bytes, int]: # Create synthetic event for notify that observed directory is deleted path = ctypes.create_unicode_buffer(".") event = FileNotifyInformation(0, FILE_ACTION_DELETED_SELF, len(path), path.value.encode("utf-8")) event_size = ctypes.sizeof(event) buff = ctypes.create_string_buffer(PATH_BUFFER_SIZE) ctypes.memmove(buff, ctypes.addressof(event), event_size) - return buff, event_size + return buff.raw, event_size -def get_directory_handle(path): +def get_directory_handle(path: str) -> ctypes.wintypes.HANDLE: """Returns a Windows handle to the specified directory path.""" return CreateFileW( path, @@ -328,21 +333,19 @@ def get_directory_handle(path): ) -def close_directory_handle(handle): +def close_directory_handle(handle: ctypes.wintypes.HANDLE) -> None: try: CancelIoEx(handle, None) # force ReadDirectoryChangesW to return - CloseHandle(handle) # close directory handle + CloseHandle(handle) except OSError: - try: - CloseHandle(handle) # close directory handle - except Exception: - return + with contextlib.suppress(Exception): + CloseHandle(handle) -def read_directory_changes(handle, path, recursive): +def read_directory_changes(handle: ctypes.wintypes.HANDLE, path: str, *, recursive: bool) -> tuple[bytes, int]: """Read changes to the directory using the specified directory handle. - http://timgolden.me.uk/pywin32-docs/win32file__ReadDirectoryChangesW_meth.html + https://timgolden.me.uk/pywin32-docs/win32file__ReadDirectoryChangesW_meth.html """ event_buffer = ctypes.create_string_buffer(BUFFER_SIZE) nbytes = ctypes.wintypes.DWORD() @@ -359,7 +362,7 @@ def read_directory_changes(handle, path, recursive): ) except OSError as e: if e.winerror == ERROR_OPERATION_ABORTED: - return [], 0 + return event_buffer.raw, 0 # Handle the case when the root path is deleted if _is_observed_path_deleted(handle, path): @@ -376,31 +379,31 @@ class WinAPINativeEvent: src_path: str @property - def is_added(self): + def is_added(self) -> bool: return self.action == FILE_ACTION_CREATED @property - def is_removed(self): + def is_removed(self) -> bool: return self.action == FILE_ACTION_REMOVED @property - def is_modified(self): + def is_modified(self) -> bool: return self.action == FILE_ACTION_MODIFIED @property - def is_renamed_old(self): + def is_renamed_old(self) -> bool: return self.action == FILE_ACTION_RENAMED_OLD_NAME @property - def is_renamed_new(self): + def is_renamed_new(self) -> bool: return self.action == FILE_ACTION_RENAMED_NEW_NAME @property - def is_removed_self(self): + def is_removed_self(self) -> bool: return self.action == FILE_ACTION_REMOVED_SELF -def read_events(handle, path, recursive): - buf, nbytes = read_directory_changes(handle, path, recursive) +def read_events(handle: ctypes.wintypes.HANDLE, path: str, *, recursive: bool) -> list[WinAPINativeEvent]: + buf, nbytes = read_directory_changes(handle, path, recursive=recursive) events = _parse_event_buffer(buf, nbytes) return [WinAPINativeEvent(action, src_path) for action, src_path in events] diff --git a/src/watchdog/tricks/__init__.py b/src/watchdog/tricks/__init__.py index cd0f2f2e..48ad5a8f 100644 --- a/src/watchdog/tricks/__init__.py +++ b/src/watchdog/tricks/__init__.py @@ -62,7 +62,7 @@ class Trick(PatternMatchingEventHandler): """Your tricks should subclass this class.""" @classmethod - def generate_yaml(cls): + def generate_yaml(cls) -> str: return f"""- {cls.__module__}.{cls.__name__}: args: - argument1 @@ -90,13 +90,13 @@ class ShellCommandTrick(Trick): def __init__( self, - shell_command, + shell_command: str, *, - patterns=None, - ignore_patterns=None, - ignore_directories=False, - wait_for_process=False, - drop_during_process=False, + patterns: list[str] | None = None, + ignore_patterns: list[str] | None = None, + ignore_directories: bool = False, + wait_for_process: bool = False, + drop_during_process: bool = False, ): super().__init__( patterns=patterns, @@ -107,8 +107,8 @@ def __init__( self.wait_for_process = wait_for_process self.drop_during_process = drop_during_process - self.process = None - self._process_watchers = set() + self.process: subprocess.Popen[bytes] | None = None + self._process_watchers: set[ProcessWatcher] = set() def on_any_event(self, event: FileSystemEvent) -> None: if event.event_type in {EVENT_TYPE_OPENED, EVENT_TYPE_CLOSED_NO_WRITE}: @@ -168,15 +168,15 @@ class AutoRestartTrick(Trick): def __init__( self, - command, + command: list[str], *, - patterns=None, - ignore_patterns=None, - ignore_directories=False, - stop_signal=signal.SIGINT, - kill_after=10, - debounce_interval_seconds=0, - restart_on_command_exit=True, + patterns: list[str] | None = None, + ignore_patterns: list[str] | None = None, + ignore_directories: bool = False, + stop_signal: signal.Signals = signal.SIGINT, + kill_after: int = 10, + debounce_interval_seconds: int = 0, + restart_on_command_exit: bool = True, ): if kill_after < 0: error = "kill_after must be non-negative." @@ -197,16 +197,16 @@ def __init__( self.debounce_interval_seconds = debounce_interval_seconds self.restart_on_command_exit = restart_on_command_exit - self.process = None - self.process_watcher = None - self.event_debouncer = None + self.process: subprocess.Popen[bytes] | None = None + self.process_watcher: ProcessWatcher | None = None + self.event_debouncer: EventDebouncer | None = None self.restart_count = 0 self._is_process_stopping = False self._is_trick_stopping = False self._stopping_lock = threading.RLock() - def start(self): + def start(self) -> None: if self.debounce_interval_seconds: self.event_debouncer = EventDebouncer( debounce_interval_seconds=self.debounce_interval_seconds, @@ -215,7 +215,7 @@ def start(self): self.event_debouncer.start() self._start_process() - def stop(self): + def stop(self) -> None: # Ensure the body of the function is only run once. with self._stopping_lock: if self._is_trick_stopping: diff --git a/src/watchdog/utils/__init__.py b/src/watchdog/utils/__init__.py index 65bc18f7..48712d38 100644 --- a/src/watchdog/utils/__init__.py +++ b/src/watchdog/utils/__init__.py @@ -32,6 +32,10 @@ import sys import threading +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from types import ModuleType class UnsupportedLibcError(Exception): @@ -54,21 +58,21 @@ def __init__(self) -> None: self._stopped_event = threading.Event() @property - def stopped_event(self): + def stopped_event(self) -> threading.Event: return self._stopped_event - def should_keep_running(self): + def should_keep_running(self) -> bool: """Determines whether the thread should continue running.""" return not self._stopped_event.is_set() - def on_thread_stop(self): + def on_thread_stop(self) -> None: """Override this method instead of :meth:`stop()`. :meth:`stop()` calls this method. This method is called immediately after the thread is signaled to stop. """ - def stop(self): + def stop(self) -> None: """Signals the thread to stop.""" self._stopped_event.set() self.on_thread_stop() @@ -86,7 +90,7 @@ def start(self) -> None: threading.Thread.start(self) -def load_module(module_name): +def load_module(module_name: str) -> ModuleType: """Imports a module given its name and returns a handle to it.""" try: __import__(module_name) @@ -96,7 +100,7 @@ def load_module(module_name): return sys.modules[module_name] -def load_class(dotted_path): +def load_class(dotted_path: str) -> type: """Loads and returns a class definition provided a dotted path specification the last part of the dotted path is the class name and there is at least one module name preceding the class name. diff --git a/src/watchdog/utils/bricks.py b/src/watchdog/utils/bricks.py index 45a80437..b540948b 100644 --- a/src/watchdog/utils/bricks.py +++ b/src/watchdog/utils/bricks.py @@ -36,6 +36,10 @@ from __future__ import annotations import queue +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from typing import Any class SkipRepeatsQueue(queue.Queue): @@ -80,11 +84,11 @@ def __hash__(self): based on the OrderedSetQueue below """ - def _init(self, maxsize): + def _init(self, maxsize: int) -> None: super()._init(maxsize) self._last_item = None - def _put(self, item): + def _put(self, item: Any) -> None: if self._last_item is None or item != self._last_item: super()._put(item) self._last_item = item @@ -93,7 +97,7 @@ def _put(self, item): # anything into the queue here self.unfinished_tasks -= 1 - def _get(self): + def _get(self) -> Any: item = super()._get() if item is self._last_item: self._last_item = None diff --git a/src/watchdog/utils/delayed_queue.py b/src/watchdog/utils/delayed_queue.py index e65f875a..fc7dca0b 100644 --- a/src/watchdog/utils/delayed_queue.py +++ b/src/watchdog/utils/delayed_queue.py @@ -23,7 +23,7 @@ class DelayedQueue(Generic[T]): - def __init__(self, delay): + def __init__(self, delay: float) -> None: self.delay_sec = delay self._lock = threading.Lock() self._not_empty = threading.Condition(self._lock) @@ -37,7 +37,7 @@ def put(self, element: T, *, delay: bool = False) -> None: self._not_empty.notify() self._lock.release() - def close(self): + def close(self) -> None: """Close queue, indicating no more items will be added.""" self._closed = True # Interrupt the blocking _not_empty.wait() call in get diff --git a/src/watchdog/utils/dirsnapshot.py b/src/watchdog/utils/dirsnapshot.py index b4c57228..cda2f247 100644 --- a/src/watchdog/utils/dirsnapshot.py +++ b/src/watchdog/utils/dirsnapshot.py @@ -89,18 +89,18 @@ def __init__( snapshot: DirectorySnapshot, *, ignore_device: bool = False, - ): + ) -> None: created = snapshot.paths - ref.paths deleted = ref.paths - snapshot.paths if ignore_device: - def get_inode(directory: DirectorySnapshot, full_path: str) -> int | tuple[int, int]: + def get_inode(directory: DirectorySnapshot, full_path: bytes | str) -> int | tuple[int, int]: return directory.inode(full_path)[0] else: - def get_inode(directory: DirectorySnapshot, full_path: str) -> int | tuple[int, int]: + def get_inode(directory: DirectorySnapshot, full_path: bytes | str) -> int | tuple[int, int]: return directory.inode(full_path) # check that all unchanged paths have the same inode @@ -110,7 +110,7 @@ def get_inode(directory: DirectorySnapshot, full_path: str) -> int | tuple[int, deleted.add(path) # find moved paths - moved: set[tuple[str, str]] = set() + moved: set[tuple[bytes | str, bytes | str]] = set() for path in set(deleted): inode = ref.inode(path) new_path = snapshot.path(inode) @@ -128,7 +128,7 @@ def get_inode(directory: DirectorySnapshot, full_path: str) -> int | tuple[int, # find modified paths # first check paths that have not moved - modified: set[str] = set() + modified: set[bytes | str] = set() for path in ref.paths & snapshot.paths: if get_inode(ref, path) == get_inode(snapshot, path) and ( ref.mtime(path) != snapshot.mtime(path) or ref.size(path) != snapshot.size(path) @@ -170,22 +170,22 @@ def __repr__(self) -> str: ) @property - def files_created(self) -> list[str]: + def files_created(self) -> list[bytes | str]: """List of files that were created.""" return self._files_created @property - def files_deleted(self) -> list[str]: + def files_deleted(self) -> list[bytes | str]: """List of files that were deleted.""" return self._files_deleted @property - def files_modified(self) -> list[str]: + def files_modified(self) -> list[bytes | str]: """List of files that were modified.""" return self._files_modified @property - def files_moved(self) -> list[tuple[str, str]]: + def files_moved(self) -> list[tuple[bytes | str, bytes | str]]: """List of files that were moved. Each event is a two-tuple the first item of which is the path @@ -194,12 +194,12 @@ def files_moved(self) -> list[tuple[str, str]]: return self._files_moved @property - def dirs_modified(self) -> list[str]: + def dirs_modified(self) -> list[bytes | str]: """List of directories that were modified.""" return self._dirs_modified @property - def dirs_moved(self) -> list[tuple[str, str]]: + def dirs_moved(self) -> list[tuple[bytes | str, bytes | str]]: """List of directories that were moved. Each event is a two-tuple the first item of which is the path @@ -208,12 +208,12 @@ def dirs_moved(self) -> list[tuple[str, str]]: return self._dirs_moved @property - def dirs_deleted(self) -> list[str]: + def dirs_deleted(self) -> list[bytes | str]: """List of directories that were deleted.""" return self._dirs_deleted @property - def dirs_created(self) -> list[str]: + def dirs_created(self) -> list[bytes | str]: """List of directories that were created.""" return self._dirs_created @@ -258,17 +258,17 @@ def __init__( stat: Callable[[str], os.stat_result] = os.stat, listdir: Callable[[str | None], Iterator[os.DirEntry]] = os.scandir, ignore_device: bool = False, - ): + ) -> None: self.path = path self.recursive = recursive self.stat = stat self.listdir = listdir self.ignore_device = ignore_device - def __enter__(self): + def __enter__(self) -> None: self.pre_snapshot = self.get_snapshot() - def __exit__(self, *args): + def __exit__(self, *args: object) -> None: self.post_snapshot = self.get_snapshot() self.diff = DirectorySnapshotDiff( self.pre_snapshot, @@ -276,7 +276,7 @@ def __exit__(self, *args): ignore_device=self.ignore_device, ) - def get_snapshot(self): + def get_snapshot(self) -> DirectorySnapshot: return DirectorySnapshot( path=self.path, recursive=self.recursive, @@ -314,13 +314,13 @@ def __init__( recursive: bool = True, stat: Callable[[str], os.stat_result] = os.stat, listdir: Callable[[str | None], Iterator[os.DirEntry]] = os.scandir, - ): + ) -> None: self.recursive = recursive self.stat = stat self.listdir = listdir - self._stat_info: dict[str, os.stat_result] = {} - self._inode_to_path: dict[tuple[int, int], str] = {} + self._stat_info: dict[bytes | str, os.stat_result] = {} + self._inode_to_path: dict[tuple[int, int], bytes | str] = {} st = self.stat(path) self._stat_info[path] = st @@ -358,29 +358,29 @@ def walk(self, root: str) -> Iterator[tuple[str, os.stat_result]]: yield from self.walk(path) @property - def paths(self) -> set[str]: + def paths(self) -> set[bytes | str]: """Set of file/directory paths in the snapshot.""" return set(self._stat_info.keys()) - def path(self, uid: tuple[int, int]) -> str | None: + def path(self, uid: tuple[int, int]) -> bytes | str | None: """Returns path for id. None if id is unknown to this snapshot.""" return self._inode_to_path.get(uid) - def inode(self, path: str) -> tuple[int, int]: + def inode(self, path: bytes | str) -> tuple[int, int]: """Returns an id for path.""" st = self._stat_info[path] return (st.st_ino, st.st_dev) - def isdir(self, path: str) -> bool: + def isdir(self, path: bytes | str) -> bool: return S_ISDIR(self._stat_info[path].st_mode) - def mtime(self, path: str) -> float: + def mtime(self, path: bytes | str) -> float: return self._stat_info[path].st_mtime - def size(self, path: str) -> int: + def size(self, path: bytes | str) -> int: return self._stat_info[path].st_size - def stat_info(self, path: str) -> os.stat_result: + def stat_info(self, path: bytes | str) -> os.stat_result: """Returns a stat information object for the specified path from the snapshot. @@ -416,7 +416,7 @@ class EmptyDirectorySnapshot(DirectorySnapshot): in the directory as created. """ - def __init__(self): + def __init__(self) -> None: pass @staticmethod diff --git a/src/watchdog/utils/echo.py b/src/watchdog/utils/echo.py index 73683c00..6d02ee28 100644 --- a/src/watchdog/utils/echo.py +++ b/src/watchdog/utils/echo.py @@ -37,19 +37,24 @@ def my_function(args): import inspect import sys +from typing import TYPE_CHECKING +if TYPE_CHECKING: + from types import MethodType + from typing import Any, Callable -def name(item): + +def name(item: Callable) -> str: """Return an item's name.""" return item.__name__ -def is_classmethod(instancemethod, klass): +def is_classmethod(instancemethod: MethodType, klass: type) -> bool: """Determine if an instancemethod is a classmethod.""" return inspect.ismethod(instancemethod) and instancemethod.__self__ is klass -def is_static_method(method, klass): +def is_static_method(method: MethodType, klass: type) -> bool: """Returns True if method is an instance method of klass.""" return next( (isinstance(c.__dict__[name(method)], staticmethod) for c in klass.mro() if name(method) in c.__dict__), @@ -57,13 +62,13 @@ def is_static_method(method, klass): ) -def is_class_private_name(name): +def is_class_private_name(name: str) -> bool: """Determine if a name is a class private name.""" # Exclude system defined names such as __init__, __add__ etc return name.startswith("__") and not name.endswith("__") -def method_name(method): +def method_name(method: MethodType) -> str: """Return a method's name. This function returns the name the method is accessed by from @@ -75,7 +80,7 @@ def method_name(method): return mname -def format_arg_value(arg_val): +def format_arg_value(arg_val: tuple[str, tuple[Any, ...]]) -> str: """Return a string representing a (name, value) pair. >>> format_arg_value(("x", (1, 2, 3))) @@ -85,7 +90,7 @@ def format_arg_value(arg_val): return f"{arg}={val!r}" -def echo(fn, write=sys.stdout.write): +def echo(fn: Callable, write: Callable = sys.stdout.write) -> Callable: """Echo calls to a function. Returns a decorated version of the input function which "echoes" calls @@ -98,11 +103,11 @@ def echo(fn, write=sys.stdout.write): code = fn.__code__ argcount = code.co_argcount argnames = code.co_varnames[:argcount] - fn_defaults = fn.__defaults__ or [] + fn_defaults: tuple[Any] = fn.__defaults__ or () argdefs = dict(list(zip(argnames[-len(fn_defaults) :], fn_defaults))) @functools.wraps(fn) - def wrapped(*v, **k): + def wrapped(*v: Any, **k: Any) -> Callable: # Collect function arguments by chaining together positional, # defaulted, extra positional and keyword arguments. positional = list(map(format_arg_value, list(zip(argnames, v)))) @@ -116,26 +121,25 @@ def wrapped(*v, **k): return wrapped -def echo_instancemethod(klass, method, write=sys.stdout.write): +def echo_instancemethod(klass: type, method: MethodType, write: Callable = sys.stdout.write) -> None: """Change an instancemethod so that calls to it are echoed. Replacing a classmethod is a little more tricky. See: http://www.python.org/doc/current/ref/types.html """ mname = method_name(method) - never_echo = ( - "__str__", - "__repr__", - ) # Avoid recursion printing method calls - if mname in never_echo: - pass - elif is_classmethod(method, klass): + + # Avoid recursion printing method calls + if mname in {"__str__", "__repr__"}: + return + + if is_classmethod(method, klass): setattr(klass, mname, classmethod(echo(method.__func__, write))) else: setattr(klass, mname, echo(method, write)) -def echo_class(klass, write=sys.stdout.write): +def echo_class(klass: type, write: Callable = sys.stdout.write) -> None: """Echo calls to class methods and static functions""" for _, method in inspect.getmembers(klass, inspect.ismethod): # In python 3 only class methods are returned here @@ -148,7 +152,7 @@ def echo_class(klass, write=sys.stdout.write): echo_instancemethod(klass, fn, write) -def echo_module(mod, write=sys.stdout.write): +def echo_module(mod: MethodType, write: Callable = sys.stdout.write) -> None: """Echo calls to functions and methods in a module.""" for fname, fn in inspect.getmembers(mod, inspect.isfunction): setattr(mod, fname, echo(fn, write)) diff --git a/src/watchdog/utils/event_debouncer.py b/src/watchdog/utils/event_debouncer.py index 8fa317e6..0692a9b9 100644 --- a/src/watchdog/utils/event_debouncer.py +++ b/src/watchdog/utils/event_debouncer.py @@ -2,9 +2,15 @@ import logging import threading +from typing import TYPE_CHECKING from watchdog.utils import BaseThread +if TYPE_CHECKING: + from typing import Callable + + from watchdog.events import FileSystemEvent + logger = logging.getLogger(__name__) @@ -18,25 +24,25 @@ class EventDebouncer(BaseThread): events in the order in which they were received. """ - def __init__(self, debounce_interval_seconds, events_callback): + def __init__(self, debounce_interval_seconds: int, events_callback: Callable) -> None: super().__init__() self.debounce_interval_seconds = debounce_interval_seconds self.events_callback = events_callback - self._events = [] + self._events: list[FileSystemEvent] = [] self._cond = threading.Condition() - def handle_event(self, event): + def handle_event(self, event: FileSystemEvent) -> None: with self._cond: self._events.append(event) self._cond.notify() - def stop(self): + def stop(self) -> None: with self._cond: super().stop() self._cond.notify() - def run(self): + def run(self) -> None: with self._cond: while True: # Wait for first event (or shutdown). diff --git a/src/watchdog/watchmedo.py b/src/watchdog/watchmedo.py index 4a9063dc..e095a88a 100644 --- a/src/watchdog/watchmedo.py +++ b/src/watchdog/watchmedo.py @@ -40,6 +40,9 @@ from argparse import Namespace, _SubParsersAction from typing import Callable + from watchdog.events import FileSystemEventHandler + from watchdog.observers.api import BaseObserver + logging.basicConfig(level=logging.INFO) @@ -57,12 +60,12 @@ class HelpFormatter(RawDescriptionHelpFormatter): Source: https://github.com/httpie/httpie/blob/2423f89/httpie/cli/argparser.py#L31 """ - def __init__(self, *args, max_help_position=6, **kwargs): + def __init__(self, *args: Any, max_help_position: int = 6, **kwargs: Any) -> None: # A smaller indent for args help. kwargs["max_help_position"] = max_help_position super().__init__(*args, **kwargs) - def _split_lines(self, text, width): + def _split_lines(self, text: str, width: int) -> list[str]: text = dedent(text).strip() + "\n\n" return text.splitlines() @@ -119,7 +122,7 @@ def decorator(func: Callable) -> Callable: return decorator -def path_split(pathname_spec, separator=os.pathsep): +def path_split(pathname_spec: str, *, separator: str = os.pathsep) -> list[str]: """Splits a pathname specification separated by an OS-dependent separator. :param pathname_spec: @@ -127,10 +130,10 @@ def path_split(pathname_spec, separator=os.pathsep): :param separator: (OS Dependent) `:` on Unix and `;` on Windows or user-specified. """ - return list(pathname_spec.split(separator)) + return pathname_spec.split(separator) -def add_to_sys_path(pathnames, index=0): +def add_to_sys_path(pathnames: list[str], *, index: int = 0) -> None: """Adds specified paths at specified index into the sys.path list. :param paths: @@ -143,7 +146,7 @@ def add_to_sys_path(pathnames, index=0): sys.path.insert(index, pathname) -def load_config(tricks_file_pathname): +def load_config(tricks_file_pathname: str) -> dict: """Loads the YAML configuration from the specified file. :param tricks_file_path: @@ -157,7 +160,9 @@ def load_config(tricks_file_pathname): return yaml.safe_load(f.read()) -def parse_patterns(patterns_spec, ignore_patterns_spec, separator=";"): +def parse_patterns( + patterns_spec: str, ignore_patterns_spec: str, *, separator: str = ";" +) -> tuple[list[str], list[str]]: """Parses pattern argument specs and returns a two-tuple of (patterns, ignore_patterns). """ @@ -165,10 +170,16 @@ def parse_patterns(patterns_spec, ignore_patterns_spec, separator=";"): ignore_patterns = ignore_patterns_spec.split(separator) if ignore_patterns == [""]: ignore_patterns = [] - return (patterns, ignore_patterns) + return patterns, ignore_patterns -def observe_with(observer, event_handler, pathnames, recursive): +def observe_with( + observer: BaseObserver, + event_handler: FileSystemEventHandler, + pathnames: list[str], + *, + recursive: bool, +) -> None: """Single observer thread with a scheduled path and event handler. :param observer: @@ -181,7 +192,7 @@ def observe_with(observer, event_handler, pathnames, recursive): ``True`` if recursive; ``False`` otherwise. """ for pathname in set(pathnames): - observer.schedule(event_handler, pathname, recursive) + observer.schedule(event_handler, pathname, recursive=recursive) observer.start() try: while True: @@ -191,7 +202,7 @@ def observe_with(observer, event_handler, pathnames, recursive): observer.join() -def schedule_tricks(observer, tricks, pathname, recursive): +def schedule_tricks(observer: BaseObserver, tricks: dict, pathname: str, *, recursive: bool) -> None: """Schedules tricks with the specified observer and for the given watch path. @@ -205,7 +216,7 @@ def schedule_tricks(observer, tricks, pathname, recursive): ``True`` if recursive; ``False`` otherwise. """ for trick in tricks: - for name, value in list(trick.items()): + for name, value in trick.items(): trick_cls = load_class(name) handler = trick_cls(**value) trick_pathname = getattr(handler, "source_directory", None) or pathname @@ -258,27 +269,40 @@ def schedule_tricks(observer, tricks, pathname, recursive): ], cmd_aliases=["tricks"], ) -def tricks_from(args): +def tricks_from(args: Namespace) -> None: """Command to execute tricks from a tricks configuration file.""" + observer_cls: type[BaseObserver] if args.debug_force_polling: - from watchdog.observers.polling import PollingObserver as Observer + from watchdog.observers.polling import PollingObserver + + observer_cls = PollingObserver elif args.debug_force_kqueue: - from watchdog.observers.kqueue import KqueueObserver as Observer + from watchdog.observers.kqueue import KqueueObserver + + observer_cls = KqueueObserver elif (not TYPE_CHECKING and args.debug_force_winapi) or (TYPE_CHECKING and platform.is_windows()): - from watchdog.observers.read_directory_changes import WindowsApiObserver as Observer + from watchdog.observers.read_directory_changes import WindowsApiObserver + + observer_cls = WindowsApiObserver elif args.debug_force_inotify: - from watchdog.observers.inotify import InotifyObserver as Observer + from watchdog.observers.inotify import InotifyObserver + + observer_cls = InotifyObserver elif args.debug_force_fsevents: - from watchdog.observers.fsevents import FSEventsObserver as Observer + from watchdog.observers.fsevents import FSEventsObserver + + observer_cls = FSEventsObserver else: # Automatically picks the most appropriate observer for the platform # on which it is running. from watchdog.observers import Observer + observer_cls = Observer + add_to_sys_path(path_split(args.python_path)) observers = [] for tricks_file in args.files: - observer = Observer(timeout=args.timeout) + observer = observer_cls(timeout=args.timeout) # type: ignore[call-arg] if not os.path.exists(tricks_file): raise OSError(errno.ENOENT, os.strerror(errno.ENOENT), tricks_file) @@ -295,7 +319,7 @@ def tricks_from(args): add_to_sys_path(config[CONFIG_KEY_PYTHON_PATH]) dir_path = os.path.dirname(tricks_file) or os.path.relpath(os.getcwd()) - schedule_tricks(observer, tricks, dir_path, args.recursive) + schedule_tricks(observer, tricks, dir_path, recursive=args.recursive) observer.start() observers.append(observer) @@ -341,7 +365,7 @@ def tricks_from(args): ], cmd_aliases=["generate-tricks-yaml"], ) -def tricks_generate_yaml(args): +def tricks_generate_yaml(args: Namespace) -> None: """Command to generate Yaml configuration for tricks named on the command line.""" import yaml @@ -351,7 +375,7 @@ def tricks_generate_yaml(args): for trick_path in args.trick_paths: trick_cls = load_class(trick_path) - output.write(trick_cls.generate_yaml()) + output.write(trick_cls.generate_yaml()) # type: ignore[attr-defined] content = output.getvalue() output.close() @@ -440,7 +464,7 @@ def tricks_generate_yaml(args): ), ], ) -def log(args): +def log(args: Namespace) -> None: """Command to log file system events to the console.""" from watchdog.tricks import LoggerTrick from watchdog.utils import echo @@ -456,22 +480,36 @@ def log(args): ignore_directories=args.ignore_directories, ) + observer_cls: type[BaseObserver] if args.debug_force_polling: - from watchdog.observers.polling import PollingObserver as Observer + from watchdog.observers.polling import PollingObserver + + observer_cls = PollingObserver elif args.debug_force_kqueue: - from watchdog.observers.kqueue import KqueueObserver as Observer + from watchdog.observers.kqueue import KqueueObserver + + observer_cls = KqueueObserver elif (not TYPE_CHECKING and args.debug_force_winapi) or (TYPE_CHECKING and platform.is_windows()): - from watchdog.observers.read_directory_changes import WindowsApiObserver as Observer + from watchdog.observers.read_directory_changes import WindowsApiObserver + + observer_cls = WindowsApiObserver elif args.debug_force_inotify: - from watchdog.observers.inotify import InotifyObserver as Observer + from watchdog.observers.inotify import InotifyObserver + + observer_cls = InotifyObserver elif args.debug_force_fsevents: - from watchdog.observers.fsevents import FSEventsObserver as Observer + from watchdog.observers.fsevents import FSEventsObserver + + observer_cls = FSEventsObserver else: # Automatically picks the most appropriate observer for the platform # on which it is running. from watchdog.observers import Observer - observer = Observer(timeout=args.timeout) - observe_with(observer, handler, args.directories, args.recursive) + + observer_cls = Observer + + observer = observer_cls(timeout=args.timeout) # type: ignore[call-arg] + observe_with(observer, handler, args.directories, recursive=args.recursive) @command( @@ -559,18 +597,23 @@ def log(args): argument("--debug-force-polling", action="store_true", help="[debug] Forces polling."), ], ) -def shell_command(args): +def shell_command(args: Namespace) -> None: """Command to execute shell commands in response to file system events.""" from watchdog.tricks import ShellCommandTrick if not args.command: args.command = None + observer_cls: type[BaseObserver] if args.debug_force_polling: - from watchdog.observers.polling import PollingObserver as Observer + from watchdog.observers.polling import PollingObserver + + observer_cls = PollingObserver else: from watchdog.observers import Observer + observer_cls = Observer + patterns, ignore_patterns = parse_patterns(args.patterns, args.ignore_patterns) handler = ShellCommandTrick( args.command, @@ -580,8 +623,8 @@ def shell_command(args): wait_for_process=args.wait_for_process, drop_during_process=args.drop_during_process, ) - observer = Observer(timeout=args.timeout) - observe_with(observer, handler, args.directories, args.recursive) + observer = observer_cls(timeout=args.timeout) # type: ignore[call-arg] + observe_with(observer, handler, args.directories, recursive=args.recursive) @command( @@ -676,13 +719,18 @@ def shell_command(args): ), ], ) -def auto_restart(args): +def auto_restart(args: Namespace) -> None: """Command to start a long-running subprocess and restart it on matched events.""" + observer_cls: type[BaseObserver] if args.debug_force_polling: - from watchdog.observers.polling import PollingObserver as Observer + from watchdog.observers.polling import PollingObserver + + observer_cls = PollingObserver else: from watchdog.observers import Observer + observer_cls = Observer + import signal from watchdog.tricks import AutoRestartTrick @@ -700,7 +748,7 @@ def auto_restart(args): if hasattr(signal, "SIGHUP"): termination_signals.add(signal.SIGHUP) - def handler_termination_signal(_signum, _frame): + def handler_termination_signal(_signum: signal._SIGNUM, _frame: object) -> None: # Neuter all signals so that we don't attempt a double shutdown for signum in termination_signals: signal.signal(signum, signal.SIG_IGN) @@ -713,7 +761,7 @@ def handler_termination_signal(_signum, _frame): command = [args.command] command.extend(args.command_args) handler = AutoRestartTrick( - command=command, + command, patterns=patterns, ignore_patterns=ignore_patterns, ignore_directories=args.ignore_directories, @@ -723,9 +771,9 @@ def handler_termination_signal(_signum, _frame): restart_on_command_exit=args.restart_on_command_exit, ) handler.start() - observer = Observer(timeout=args.timeout) + observer = observer_cls(timeout=args.timeout) # type: ignore[call-arg] try: - observe_with(observer, handler, args.directories, args.recursive) + observe_with(observer, handler, args.directories, recursive=args.recursive) except WatchdogShutdownError: pass finally: diff --git a/tests/test_observers_winapi.py b/tests/test_observers_winapi.py index 9c979204..ed677ac5 100644 --- a/tests/test_observers_winapi.py +++ b/tests/test_observers_winapi.py @@ -117,9 +117,9 @@ def test_root_deleted(event_queue, emitter): File "watchdog\observers\read_directory_changes.py", line 76, in queue_events winapi_events = self._read_events() File "watchdog\observers\read_directory_changes.py", line 73, in _read_events - return read_events(self._whandle, self.watch.path, self.watch.is_recursive) + return read_events(self._whandle, self.watch.path, recursive=self.watch.is_recursive) File "watchdog\observers\winapi.py", line 387, in read_events - buf, nbytes = read_directory_changes(handle, path, recursive) + buf, nbytes = read_directory_changes(handle, path, recursive=recursive) File "watchdog\observers\winapi.py", line 340, in read_directory_changes return _generate_observed_path_deleted_event() File "watchdog\observers\winapi.py", line 298, in _generate_observed_path_deleted_event