diff --git a/frigate/app.py b/frigate/app.py index 16cbf8735d..f720935460 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -1,7 +1,7 @@ import logging import multiprocessing as mp from multiprocessing.queues import Queue -from multiprocessing.synchronize import Event +from multiprocessing.synchronize import Event as MpEvent import os import signal import sys @@ -38,10 +38,10 @@ class FrigateApp: def __init__(self) -> None: - self.stop_event: Event = mp.Event() + self.stop_event: MpEvent = mp.Event() self.detection_queue: Queue = mp.Queue() self.detectors: dict[str, ObjectDetectProcess] = {} - self.detection_out_events: dict[str, Event] = {} + self.detection_out_events: dict[str, MpEvent] = {} self.detection_shms: list[mp.shared_memory.SharedMemory] = [] self.log_queue: Queue = mp.Queue() self.plus_api = PlusApi() diff --git a/frigate/events.py b/frigate/events.py index 2dfe40f3e7..5f30f86338 100644 --- a/frigate/events.py +++ b/frigate/events.py @@ -11,43 +11,55 @@ from frigate.config import EventsConfig, FrigateConfig, RecordConfig from frigate.const import CLIPS_DIR from frigate.models import Event +from frigate.types import CameraMetricsTypes + +from multiprocessing.queues import Queue +from multiprocessing.synchronize import Event as MpEvent +from typing import Dict logger = logging.getLogger(__name__) -def should_insert_db(prev_event, current_event): +def should_insert_db(prev_event: Event, current_event: Event) -> bool: """If current event has new clip or snapshot.""" return (not prev_event["has_clip"] and not prev_event["has_snapshot"]) and ( current_event["has_clip"] or current_event["has_snapshot"] ) -def should_update_db(prev_event, current_event): +def should_update_db(prev_event: Event, current_event: Event) -> bool: """If current_event has updated fields and (clip or snapshot).""" - return (current_event["has_clip"] or current_event["has_snapshot"]) and ( - prev_event["top_score"] != current_event["top_score"] - or prev_event["entered_zones"] != current_event["entered_zones"] - or prev_event["thumbnail"] != current_event["thumbnail"] - or prev_event["has_clip"] != current_event["has_clip"] - or prev_event["has_snapshot"] != current_event["has_snapshot"] - ) + if current_event["has_clip"] or current_event["has_snapshot"]: + if ( + prev_event["top_score"] != current_event["top_score"] + or prev_event["entered_zones"] != current_event["entered_zones"] + or prev_event["thumbnail"] != current_event["thumbnail"] + or prev_event["has_clip"] != current_event["has_clip"] + or prev_event["has_snapshot"] != current_event["has_snapshot"] + ): + return True + return False class EventProcessor(threading.Thread): def __init__( - self, config, camera_processes, event_queue, event_processed_queue, stop_event + self, + config: FrigateConfig, + camera_processes: dict[str, CameraMetricsTypes], + event_queue: Queue, + event_processed_queue: Queue, + stop_event: MpEvent, ): threading.Thread.__init__(self) self.name = "event_processor" self.config = config self.camera_processes = camera_processes - self.cached_clips = {} self.event_queue = event_queue self.event_processed_queue = event_processed_queue - self.events_in_process = {} + self.events_in_process: Dict[str, Event] = {} self.stop_event = stop_event - def run(self): + def run(self) -> None: # set an end_time on events without an end_time on startup Event.update(end_time=Event.start_time + 30).where( Event.end_time == None @@ -147,14 +159,15 @@ def run(self): class EventCleanup(threading.Thread): - def __init__(self, config: FrigateConfig, stop_event): + def __init__(self, config: FrigateConfig, stop_event: MpEvent): threading.Thread.__init__(self) self.name = "event_cleanup" self.config = config self.stop_event = stop_event self.camera_keys = list(self.config.cameras.keys()) - def expire(self, media_type): + def expire(self, media_type: str) -> None: + # TODO: Refactor media_type to enum ## Expire events from unlisted cameras based on the global config if media_type == "clips": retain_config = self.config.record.events.retain @@ -253,7 +266,7 @@ def expire(self, media_type): ) update_query.execute() - def purge_duplicates(self): + def purge_duplicates(self) -> None: duplicate_query = """with grouped_events as ( select id, label, @@ -287,7 +300,7 @@ def purge_duplicates(self): .execute() ) - def run(self): + def run(self) -> None: # only expire events every 5 minutes while not self.stop_event.wait(300): self.expire("clips") diff --git a/frigate/mypy.ini b/frigate/mypy.ini index c789241d06..4769ab96aa 100644 --- a/frigate/mypy.ini +++ b/frigate/mypy.ini @@ -34,6 +34,9 @@ disallow_untyped_calls = false [mypy-frigate.const] ignore_errors = false +[mypy-frigate.events] +ignore_errors = false + [mypy-frigate.log] ignore_errors = false diff --git a/frigate/stats.py b/frigate/stats.py index d4899bb6d9..62c8eeec63 100644 --- a/frigate/stats.py +++ b/frigate/stats.py @@ -8,7 +8,7 @@ import requests from typing import Optional, Any from paho.mqtt.client import Client -from multiprocessing.synchronize import Event +from multiprocessing.synchronize import Event as MpEvent from frigate.config import FrigateConfig from frigate.const import RECORD_DIR, CLIPS_DIR, CACHE_DIR @@ -148,7 +148,7 @@ def __init__( stats_tracking: StatsTrackingTypes, mqtt_client: Client, topic_prefix: str, - stop_event: Event, + stop_event: MpEvent, ): threading.Thread.__init__(self) self.name = "frigate_stats_emitter" diff --git a/frigate/watchdog.py b/frigate/watchdog.py index 4316dc316e..96ba2e371e 100644 --- a/frigate/watchdog.py +++ b/frigate/watchdog.py @@ -7,13 +7,13 @@ from frigate.object_detection import ObjectDetectProcess from frigate.util import restart_frigate -from multiprocessing.synchronize import Event +from multiprocessing.synchronize import Event as MpEvent logger = logging.getLogger(__name__) class FrigateWatchdog(threading.Thread): - def __init__(self, detectors: dict[str, ObjectDetectProcess], stop_event: Event): + def __init__(self, detectors: dict[str, ObjectDetectProcess], stop_event: MpEvent): threading.Thread.__init__(self) self.name = "frigate_watchdog" self.detectors = detectors