Skip to content

Commit

Permalink
Typing Part 3: events.py (blakeblackshear#3352)
Browse files Browse the repository at this point in the history
* Typing: events.py

* Remove unused variable

* Fix return Any from return statement

Not all elements from the event dict are sure to be something that can be evaluated

See e.g.: python/mypy#5697

* Sort out Event disambiguity

There was a name collision of multiprocessing Event type and frigate events

Co-authored-by: Sebastian Englbrecht <[email protected]>
  • Loading branch information
herostrat and Sebastian Englbrecht committed Nov 24, 2022
1 parent e0eed65 commit 12a1a63
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 24 deletions.
6 changes: 3 additions & 3 deletions frigate/app.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging
import multiprocessing as mp
from multiprocessing.queues import Queue
from multiprocessing.synchronize import Event
from multiprocessing.synchronize import Event as MpEvent
import os
import signal
import sys
Expand Down Expand Up @@ -38,10 +38,10 @@

class FrigateApp:
def __init__(self) -> None:
self.stop_event: Event = mp.Event()
self.stop_event: MpEvent = mp.Event()
self.detection_queue: Queue = mp.Queue()
self.detectors: dict[str, ObjectDetectProcess] = {}
self.detection_out_events: dict[str, Event] = {}
self.detection_out_events: dict[str, MpEvent] = {}
self.detection_shms: list[mp.shared_memory.SharedMemory] = []
self.log_queue: Queue = mp.Queue()
self.plus_api = PlusApi()
Expand Down
47 changes: 30 additions & 17 deletions frigate/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,43 +11,55 @@
from frigate.config import EventsConfig, FrigateConfig, RecordConfig
from frigate.const import CLIPS_DIR
from frigate.models import Event
from frigate.types import CameraMetricsTypes

from multiprocessing.queues import Queue
from multiprocessing.synchronize import Event as MpEvent
from typing import Dict

logger = logging.getLogger(__name__)


def should_insert_db(prev_event, current_event):
def should_insert_db(prev_event: Event, current_event: Event) -> bool:
"""If current event has new clip or snapshot."""
return (not prev_event["has_clip"] and not prev_event["has_snapshot"]) and (
current_event["has_clip"] or current_event["has_snapshot"]
)


def should_update_db(prev_event, current_event):
def should_update_db(prev_event: Event, current_event: Event) -> bool:
"""If current_event has updated fields and (clip or snapshot)."""
return (current_event["has_clip"] or current_event["has_snapshot"]) and (
prev_event["top_score"] != current_event["top_score"]
or prev_event["entered_zones"] != current_event["entered_zones"]
or prev_event["thumbnail"] != current_event["thumbnail"]
or prev_event["has_clip"] != current_event["has_clip"]
or prev_event["has_snapshot"] != current_event["has_snapshot"]
)
if current_event["has_clip"] or current_event["has_snapshot"]:
if (
prev_event["top_score"] != current_event["top_score"]
or prev_event["entered_zones"] != current_event["entered_zones"]
or prev_event["thumbnail"] != current_event["thumbnail"]
or prev_event["has_clip"] != current_event["has_clip"]
or prev_event["has_snapshot"] != current_event["has_snapshot"]
):
return True
return False


class EventProcessor(threading.Thread):
def __init__(
self, config, camera_processes, event_queue, event_processed_queue, stop_event
self,
config: FrigateConfig,
camera_processes: dict[str, CameraMetricsTypes],
event_queue: Queue,
event_processed_queue: Queue,
stop_event: MpEvent,
):
threading.Thread.__init__(self)
self.name = "event_processor"
self.config = config
self.camera_processes = camera_processes
self.cached_clips = {}
self.event_queue = event_queue
self.event_processed_queue = event_processed_queue
self.events_in_process = {}
self.events_in_process: Dict[str, Event] = {}
self.stop_event = stop_event

def run(self):
def run(self) -> None:
# set an end_time on events without an end_time on startup
Event.update(end_time=Event.start_time + 30).where(
Event.end_time == None
Expand Down Expand Up @@ -147,14 +159,15 @@ def run(self):


class EventCleanup(threading.Thread):
def __init__(self, config: FrigateConfig, stop_event):
def __init__(self, config: FrigateConfig, stop_event: MpEvent):
threading.Thread.__init__(self)
self.name = "event_cleanup"
self.config = config
self.stop_event = stop_event
self.camera_keys = list(self.config.cameras.keys())

def expire(self, media_type):
def expire(self, media_type: str) -> None:
# TODO: Refactor media_type to enum
## Expire events from unlisted cameras based on the global config
if media_type == "clips":
retain_config = self.config.record.events.retain
Expand Down Expand Up @@ -253,7 +266,7 @@ def expire(self, media_type):
)
update_query.execute()

def purge_duplicates(self):
def purge_duplicates(self) -> None:
duplicate_query = """with grouped_events as (
select id,
label,
Expand Down Expand Up @@ -287,7 +300,7 @@ def purge_duplicates(self):
.execute()
)

def run(self):
def run(self) -> None:
# only expire events every 5 minutes
while not self.stop_event.wait(300):
self.expire("clips")
Expand Down
3 changes: 3 additions & 0 deletions frigate/mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ disallow_untyped_calls = false
[mypy-frigate.const]
ignore_errors = false

[mypy-frigate.events]
ignore_errors = false

[mypy-frigate.log]
ignore_errors = false

Expand Down
4 changes: 2 additions & 2 deletions frigate/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import requests
from typing import Optional, Any
from paho.mqtt.client import Client
from multiprocessing.synchronize import Event
from multiprocessing.synchronize import Event as MpEvent

from frigate.config import FrigateConfig
from frigate.const import RECORD_DIR, CLIPS_DIR, CACHE_DIR
Expand Down Expand Up @@ -148,7 +148,7 @@ def __init__(
stats_tracking: StatsTrackingTypes,
mqtt_client: Client,
topic_prefix: str,
stop_event: Event,
stop_event: MpEvent,
):
threading.Thread.__init__(self)
self.name = "frigate_stats_emitter"
Expand Down
4 changes: 2 additions & 2 deletions frigate/watchdog.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@

from frigate.object_detection import ObjectDetectProcess
from frigate.util import restart_frigate
from multiprocessing.synchronize import Event
from multiprocessing.synchronize import Event as MpEvent

logger = logging.getLogger(__name__)


class FrigateWatchdog(threading.Thread):
def __init__(self, detectors: dict[str, ObjectDetectProcess], stop_event: Event):
def __init__(self, detectors: dict[str, ObjectDetectProcess], stop_event: MpEvent):
threading.Thread.__init__(self)
self.name = "frigate_watchdog"
self.detectors = detectors
Expand Down

0 comments on commit 12a1a63

Please sign in to comment.