From f861bf68730e083cd7f94666c127df374bfe2824 Mon Sep 17 00:00:00 2001 From: Marek Rusinowski Date: Sun, 9 Jul 2023 22:01:54 +0200 Subject: [PATCH] Introduce map_syncer tool This is a daemon that allows to automatically sync live set of maps to a directory. Target usage is on autohosts. --- tools/map_syncer/.envrc | 3 + tools/map_syncer/.gitignore | 4 + tools/map_syncer/README.md | 46 ++ tools/map_syncer/map_syncer.py | 424 ++++++++++++++++++ tools/map_syncer/map_syncer_test.py | 328 ++++++++++++++ tools/map_syncer/pyproject.toml | 37 ++ tools/map_syncer/requirements.txt | 11 + .../stubs/pyfakefs/fake_filesystem.pyi | 35 ++ 8 files changed, 888 insertions(+) create mode 100644 tools/map_syncer/.envrc create mode 100644 tools/map_syncer/.gitignore create mode 100644 tools/map_syncer/README.md create mode 100755 tools/map_syncer/map_syncer.py create mode 100644 tools/map_syncer/map_syncer_test.py create mode 100644 tools/map_syncer/pyproject.toml create mode 100644 tools/map_syncer/requirements.txt create mode 100644 tools/map_syncer/stubs/pyfakefs/fake_filesystem.pyi diff --git a/tools/map_syncer/.envrc b/tools/map_syncer/.envrc new file mode 100644 index 0000000..9306b16 --- /dev/null +++ b/tools/map_syncer/.envrc @@ -0,0 +1,3 @@ +if [ -f .pyenv/bin/activate ]; then + source .pyenv/bin/activate +fi diff --git a/tools/map_syncer/.gitignore b/tools/map_syncer/.gitignore new file mode 100644 index 0000000..d9a1854 --- /dev/null +++ b/tools/map_syncer/.gitignore @@ -0,0 +1,4 @@ +.pyenv/ +__pycache__/ +.mypy_cache/ +.pytest_cache/ diff --git a/tools/map_syncer/README.md b/tools/map_syncer/README.md new file mode 100644 index 0000000..457dbdf --- /dev/null +++ b/tools/map_syncer/README.md @@ -0,0 +1,46 @@ +Map Syncer +========== + +A daemon for keeping a directory with maps in sync with the maps-metadata repo. + +It supports: + +- Delayed deletion of maps that are no longer listed as live +- Periodic time based sync +- Sync on demand triggered by MQTT message +- Monitoring via reporting to https://healthchecks.io/ compatible endpoint + +Production +---------- + +Copy `map_syncer.py` to target and run `./map_syncer.py --help` to see +available options. + +The only runtime dependency on top of Python >= 3.8 is `paho-mqtt`. On Debian +based systems it's `python3-paho-mqtt` package. + +Development +----------- + +### Setup + +```sh +python3 -m venv .pyenv +source .pyenv/bin/activate +pip install -r requirements.txt +``` + +### Lint + +```sh +black . +isort . +ruff . +mypy +``` + +### Test + +```sh +pytest +``` diff --git a/tools/map_syncer/map_syncer.py b/tools/map_syncer/map_syncer.py new file mode 100755 index 0000000..a7a20f4 --- /dev/null +++ b/tools/map_syncer/map_syncer.py @@ -0,0 +1,424 @@ +#!/usr/bin/env python3 +# SPDX-FileCopyrightText: 2023 Marek Rusinowski +# SPDX-License-Identifier: Apache-2.0 OR MIT +# +"""Syncs live maps to a specified directory. + +This script periodically downloads the list of live maps from the given URL +and downloads the maps that are not in the directory. It also optionally +deletes maps that are not seen on the live list for long enough. +""" + +import argparse +import hashlib +import json +import logging +import os +import queue +import shutil +import signal +import sys +import threading +import time +import urllib.request +from contextlib import contextmanager, nullcontext +from dataclasses import dataclass +from enum import Enum +from http.client import HTTPResponse +from pathlib import Path +from types import FrameType +from typing import ( + TYPE_CHECKING, + ContextManager, + Dict, + Iterator, + List, + Optional, + Tuple, + cast, +) + +import paho.mqtt.client as mqtt + +USER_AGENT = "maps-metadata-sync-maps/1.0" +DEFAULT_LIVE_MAPS_URL = ( + "https://maps-metadata.beyondallreason.dev/latest/live_maps.validated.json" +) +DEFAULT_MQTT_TOPIC = "dev.beyondallreason.maps-metadata/live_maps/updated:v1" +DEFAULT_DELETE_AFTER = 4 * 60 * 60 # 4 hours +DEFAULT_POLL_INTERVAL = 10 * 60 # 10 minutes + + +@dataclass +class LiveMapEntry: + spring_name: str + file_name: str + download_url: str + md5: str + + +@dataclass +class MQTTConfig: + host: str + port: int + tls: bool + topic: str + username: Optional[str] + password: Optional[str] + + +def fetch_live_maps(url: str) -> List[LiveMapEntry]: + """Fetches live maps list from given URL and parses it.""" + + req = urllib.request.Request( + url, + headers={ + "User-Agent": USER_AGENT, + "Cache-Control": "no-cache", + }, + ) + res: HTTPResponse + with urllib.request.urlopen(req) as res: + # We assume that read url is well typed according to json schema + data: List[Dict[str, str]] = json.loads(res.read().decode()) + return [ + LiveMapEntry(d["springName"], d["fileName"], d["downloadURL"], d["md5"]) + for d in data + ] + + +def send_healthcheck(url: str, timeout: float = 5000) -> None: + """Sends a healthcheck to the given URL.""" + req = urllib.request.Request(url, headers={"User-Agent": USER_AGENT}) + try: + res: HTTPResponse + with urllib.request.urlopen(req, timeout=timeout) as res: + res.read() + except (urllib.error.URLError, OSError) as e: + logging.warning("Error while sending healthcheck: %s", e) + + +def download_file(url: str, destination: Path, md5: str) -> None: + """Downloads a file from the URL to the destination path and checks the MD5.""" + + tmp_destination = Path(f"{destination}.tmp") + req = urllib.request.Request(url, headers={"User-Agent": USER_AGENT}) + res: HTTPResponse + with urllib.request.urlopen(req) as res, tmp_destination.open("wb") as f: + shutil.copyfileobj(res, f) + f.flush() + os.fsync(f.fileno()) + if not md5_match(tmp_destination, md5): + msg = f"MD5 mismatch when validating {destination}" + raise RuntimeError(msg) + tmp_destination.replace(destination) + + +def md5_match(file_path: Path, expected_md5: str) -> bool: + """Checks the MD5 checksum of a file.""" + + hasher = hashlib.md5() + with file_path.open("rb") as f: + for chunk in iter(lambda: f.read(4096), b""): + hasher.update(chunk) + return hasher.hexdigest() == expected_md5 + + +def sync_files(directory: Path, url: str, delete_after: int) -> None: + live_maps = fetch_live_maps(url) + + # Download the maps that are not in the directory + for map_info in live_maps: + file_path = directory.joinpath(map_info.file_name) + if not file_path.exists(): + logging.info("Downloading %s", map_info.file_name) + download_file(map_info.download_url, file_path, map_info.md5) + + # Skip deletion if it's disabled + if delete_after < 0: + return + + # Load tombstones file if it exists. + tombstones_file = directory.joinpath("tombstones.json") + not_seen_since: Dict[str, int] = {} + if tombstones_file.exists(): + with tombstones_file.open() as f: + not_seen_since = json.load(f) + logging.debug("Loaded tombstones from file") + + live_map_files = {file_info.file_name for file_info in live_maps} + + # Delete files that are not seen for long enough and rebuild the tombstones + new_not_seen_since: Dict[str, int] = {} + for file_path in directory.iterdir(): + if file_path.name in live_map_files or file_path.suffix not in { + ".sd7", + ".sdz", + ".tmp", + }: + continue + + t = not_seen_since.get(file_path.name, int(time.time())) + if time.time() - t > delete_after: + logging.info("Deleting %s", file_path.name) + file_path.unlink() + else: + new_not_seen_since[file_path.name] = t + logging.debug("Tombstone %s", file_path.name) + + # Save tombstones file if it changed + if not_seen_since != new_not_seen_since: + with tombstones_file.open("w") as f: + json.dump(new_not_seen_since, f) + + +class SyncOp(Enum): + SYNC = 1 + STOP = 2 + + +if TYPE_CHECKING: # noqa: SIM108 + SyncQueue = queue.Queue[Tuple[SyncOp, str]] +else: + SyncQueue = queue.Queue + + +@contextmanager +def mqtt_sync_trigger( + mqtt_config: MQTTConfig, sync_trigger: SyncQueue +) -> Iterator[None]: + """Pushes SYNC trigger to the queue when a message is received on the MQTT topic.""" + + def on_mqtt_message( + client: mqtt.Client, userdata: None, msg: mqtt.MQTTMessage + ) -> None: + if msg.topic == mqtt_config.topic: + sync_trigger.put((SyncOp.SYNC, "MQTT")) + + def on_mqtt_connect( + client: mqtt.Client, userdata: None, flags: Dict[str, int], rc: int + ) -> None: + client.subscribe(mqtt_config.topic) + + mqtt_client = mqtt.Client() + mqtt_client.on_message = on_mqtt_message + mqtt_client.on_connect = on_mqtt_connect + if mqtt_config.username is not None: + mqtt_client.username_pw_set(mqtt_config.username, mqtt_config.password) + mqtt_client.enable_logger(logging.getLogger("mqtt")) + if mqtt_config.tls: + mqtt_client.tls_set() + mqtt_client.connect_async(mqtt_config.host, mqtt_config.port) + mqtt_client.loop_start() + + try: + yield + finally: + mqtt_client.disconnect() + mqtt_client.loop_stop() + + +@contextmanager +def timer_sync_trigger(interval: float, sync_trigger: SyncQueue) -> Iterator[None]: + """Pushes SYNC trigger to the queue every interval seconds.""" + + lock = threading.Lock() + cv = threading.Condition(lock) + stop = False + + def thread() -> None: + nonlocal stop + while not stop: + sync_trigger.put((SyncOp.SYNC, "timer")) + start = time.time() + with cv: + while not stop and time.time() - start < interval: + cv.wait(interval - (time.time() - start)) + + t = threading.Thread(target=thread) + t.start() + + try: + yield + finally: + with cv: + stop = True + cv.notify() + t.join() + + +class TerminateException(BaseException): + pass + + +@contextmanager +def signal_sync_trigger(sync_trigger: SyncQueue) -> Iterator[None]: + """Pushes STOP trigger to the queue when SIGINT or SIGTERM is received.""" + first_signal = True + + def signal_handler(sig: int, frame: Optional[FrameType]) -> None: + nonlocal first_signal + if not first_signal: + logging.warning("Got signal again, exiting immediately") + raise TerminateException() + + logging.warning("Got signal, stopping sync loop...") + sync_trigger.put((SyncOp.STOP, "signal")) + first_signal = False + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + try: + yield + finally: + signal.signal(signal.SIGINT, signal.SIG_DFL) + signal.signal(signal.SIGTERM, signal.SIG_DFL) + + +def polling_sync( + directory: Path, + url: str, + delete_after: int, + sync_trigger: SyncQueue, + healthcheck_url: Optional[str] = None, +) -> None: + """Syncs maps in a loop triggered by queue until STOP is received.""" + + while True: + op, msg = sync_trigger.get() + # Drain the queue because it doesn't make sense to sync multiple + # times in a row. + while True: + if op == SyncOp.STOP: + logging.info("Stopped sync (trigger: %s)", msg) + return + try: + op, msg = sync_trigger.get_nowait() + except queue.Empty: + break + logging.info("Syncing maps (%s)", msg) + try: + start = time.time() + sync_files(directory, url, delete_after) + logging.info("Synced maps in %f seconds", time.time() - start) + if healthcheck_url is not None: + send_healthcheck(healthcheck_url) + except Exception: + logging.exception("Error while syncing maps") + + +def main(argv: List[str]) -> None: + parser = argparse.ArgumentParser(description="Sync live maps to directory.") + parser.add_argument("maps_directory", help="Directory where the maps are stored") + parser.add_argument( + "--log-level", + metavar="LEVEL", + default="WARNING", + choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], + help="Set the logging level (default: WARNING)", + ) + parser.add_argument( + "--live-maps-url", + default=DEFAULT_LIVE_MAPS_URL, + metavar="URL", + help=f"URL with the list of live maps. Default: {DEFAULT_LIVE_MAPS_URL}", + ) + parser.add_argument( + "--delete-after", + type=int, + metavar="SECONDS", + default=DEFAULT_DELETE_AFTER, + help=( + "Time to delete a map after it's not seen on the live list " + "(in seconds). Set as negative to disable deletions. " + f"Default: {DEFAULT_DELETE_AFTER}" + ), + ) + parser.add_argument( + "--polling-interval", + type=int, + metavar="SECONDS", + default=DEFAULT_POLL_INTERVAL, + help=f"Map polling interval (in seconds). Default: {DEFAULT_POLL_INTERVAL}", + ) + parser.add_argument( + "--mqtt-host", + type=str, + metavar="HOST", + help="MQTT host, when set, enables MQTT based sync trigger", + ) + parser.add_argument( + "--mqtt-port", + type=int, + metavar="PORT", + default=8883, + help="MQTT port, default: 8883", + ) + parser.add_argument( + "--mqtt-no-tls", + action="store_true", + default=False, + help="Disable TLS for MQTT connection", + ) + parser.add_argument( + "--mqtt-topic", + type=str, + metavar="TOPIC", + default=DEFAULT_MQTT_TOPIC, + help=f"MQTT topic, default: {DEFAULT_MQTT_TOPIC}", + ) + parser.add_argument( + "--mqtt-username", + type=str, + metavar="USERNAME", + default=os.environ.get("MQTT_USERNAME"), + help="MQTT username, default: MQTT_USERNAME environment variable", + ) + parser.add_argument( + "--mqtt-password", + type=str, + metavar="PASSWORD", + default=os.environ.get("MQTT_PASSWORD"), + help="MQTT password, default: MQTT_PASSWORD environment variable", + ) + parser.add_argument( + "--healthcheck-url", + type=str, + metavar="URL", + help=( + "https://healthchecks.io/ compatible URL to ping after each " + "sync. Default: None" + ), + default=None, + ) + args = parser.parse_args(args=argv[1:]) + logging.basicConfig(level=getattr(logging, args.log_level)) # type: ignore + + sync_trigger: SyncQueue = queue.Queue() + mqtt_ctx: ContextManager[None] = nullcontext() + if cast(Optional[str], args.mqtt_host) is not None: + mqtt_config = MQTTConfig( + cast(str, args.mqtt_host), + cast(int, args.mqtt_port), + not cast(bool, args.mqtt_no_tls), + cast(str, args.mqtt_topic), + cast(Optional[str], args.mqtt_username), + cast(Optional[str], args.mqtt_password), + ) + mqtt_ctx = mqtt_sync_trigger(mqtt_config, sync_trigger) + + timer_ctx = timer_sync_trigger(cast(int, args.polling_interval), sync_trigger) + + with signal_sync_trigger(sync_trigger), mqtt_ctx, timer_ctx: + polling_sync( + Path(cast(str, args.maps_directory)), + cast(str, args.live_maps_url), + cast(int, args.delete_after), + sync_trigger, + cast(Optional[str], args.healthcheck_url), + ) + + +if __name__ == "__main__": + main(sys.argv) diff --git a/tools/map_syncer/map_syncer_test.py b/tools/map_syncer/map_syncer_test.py new file mode 100644 index 0000000..4f9e0f9 --- /dev/null +++ b/tools/map_syncer/map_syncer_test.py @@ -0,0 +1,328 @@ +import json +import logging +import os +import pathlib +import queue +import secrets +import threading +import time +from contextlib import nullcontext +from typing import Dict, List, Tuple, cast +from unittest.mock import ANY + +import pytest +from pyfakefs.fake_filesystem import FakeFilesystem +from pytest_httpserver import HTTPServer +from pytest_mock import MockerFixture +from pytest_mqtt.capmqtt import MqttCaptureFixture # type: ignore +from werkzeug.wrappers.request import Request as HTTPRequest +from werkzeug.wrappers.response import Response as HTTPResponse + +import map_syncer + +ANY_SYNC_QUEUE = cast(map_syncer.SyncQueue, ANY) + + +def test_main_default_args(mocker: MockerFixture) -> None: + polling_sync = mocker.patch("map_syncer.polling_sync") + mqtt_trigger = mocker.patch("map_syncer.mqtt_sync_trigger") + mqtt_trigger.return_value = nullcontext() + timer_trigger = mocker.patch("map_syncer.timer_sync_trigger") + timer_trigger.return_value = nullcontext() + timer_trigger = mocker.patch("map_syncer.timer_sync_trigger") + timer_trigger.return_value = nullcontext() + log_basic_config = mocker.patch("logging.basicConfig") + map_syncer.main(["map_syncer.py", "map_dir"]) + polling_sync.assert_called_once_with( + pathlib.Path("map_dir"), + map_syncer.DEFAULT_LIVE_MAPS_URL, + map_syncer.DEFAULT_DELETE_AFTER, + ANY_SYNC_QUEUE, + None, + ) + timer_trigger.assert_called_once_with( + map_syncer.DEFAULT_POLL_INTERVAL, ANY_SYNC_QUEUE + ) + mqtt_trigger.assert_not_called() + log_basic_config.assert_called_once_with(level=logging.WARNING) + + +def test_main_all_args(mocker: MockerFixture) -> None: + polling_sync = mocker.patch("map_syncer.polling_sync") + mqtt_trigger = mocker.patch("map_syncer.mqtt_sync_trigger") + mqtt_trigger.return_value = nullcontext() + timer_trigger = mocker.patch("map_syncer.timer_sync_trigger") + timer_trigger.return_value = nullcontext() + log_basic_config = mocker.patch("logging.basicConfig") + mocker.patch.dict(os.environ, {"MQTT_PASSWORD": "password1"}) + map_syncer.main( + [ + "map_syncer.py", + "map_dir", + "--log-level=DEBUG", + "--live-maps-url=http://example.com/live_maps.json", + "--delete-after=123", + "--polling-interval=456", + "--mqtt-host=mqtt.example.com", + "--mqtt-port=1234", + "--mqtt-no-tls", + "--mqtt-topic=topic", + "--mqtt-username=user", + "--healthcheck-url=http://example.com/health", + ] + ) + polling_sync.assert_called_once_with( + pathlib.Path("map_dir"), + "http://example.com/live_maps.json", + 123, + ANY_SYNC_QUEUE, + "http://example.com/health", + ) + timer_trigger.assert_called_once_with(456, ANY_SYNC_QUEUE) + mqtt_trigger.assert_called_once_with( + map_syncer.MQTTConfig( + "mqtt.example.com", + 1234, + False, + "topic", + "user", + "password1", + ), + ANY_SYNC_QUEUE, + ) + log_basic_config.assert_called_once_with(level=logging.DEBUG) + + +def test_fetch_live_maps_parsing(httpserver: HTTPServer) -> None: + response: List[Dict[str, str]] = [ + { + "springName": "Map 1", + "fileName": "map1.sd7", + "downloadURL": "http://example.com/map1.sd7", + "md5": "1234567890abcdef1234567890abcdef", + }, + { + "springName": "Map 2", + "fileName": "map2.sd7", + "downloadURL": "http://example.com/map2.sd7", + "md5": "1234567890abcdef1234567890abcdef", + }, + ] + httpserver.expect_request( + "/live_maps.json", headers={"Cache-Control": "no-cache"} + ).respond_with_json(response) + live_maps = map_syncer.fetch_live_maps( + cast(str, httpserver.url_for("/live_maps.json")) + ) + excepted_live_maps = [ + map_syncer.LiveMapEntry( + "Map 1", + "map1.sd7", + "http://example.com/map1.sd7", + "1234567890abcdef1234567890abcdef", + ), + map_syncer.LiveMapEntry( + "Map 2", + "map2.sd7", + "http://example.com/map2.sd7", + "1234567890abcdef1234567890abcdef", + ), + ] + assert live_maps == excepted_live_maps + + +def test_download_file(httpserver: HTTPServer, fs: FakeFilesystem) -> None: + httpserver.expect_request("/map1.sd7").respond_with_data(b"map1contents") + map1md5 = "462e462688fddf33e4bf4b756015f9a1" + map_syncer.download_file( + cast(str, httpserver.url_for("/map1.sd7")), + pathlib.Path("map1.sd7"), + map1md5, + ) + assert fs.get_object("map1.sd7").contents == "map1contents" + + +def test_send_healthcheck_basic(httpserver: HTTPServer) -> None: + called = False + + def handler(request: HTTPRequest) -> HTTPResponse: + nonlocal called + called = True + return HTTPResponse("OK") + + httpserver.expect_request("/health").respond_with_handler(handler) + map_syncer.send_healthcheck(cast(str, httpserver.url_for("/health"))) + assert called + + +def test_send_healthcheck_ignores_failures(httpserver: HTTPServer) -> None: + httpserver.expect_request("/health").respond_with_data(b"NOT OK", status=500) + map_syncer.send_healthcheck(cast(str, httpserver.url_for("/health"))) + + +def test_send_healthcheck_timeout(httpserver: HTTPServer) -> None: + def handler(request: HTTPRequest) -> HTTPResponse: + time.sleep(0.5) + return HTTPResponse("OK") + + httpserver.expect_request("/health").respond_with_handler(handler) + start = time.time() + map_syncer.send_healthcheck(cast(str, httpserver.url_for("/health")), timeout=0.1) + assert time.time() - start < 0.2 + + +def test_send_healthcheck_dns_error() -> None: + map_syncer.send_healthcheck( + f"http://doesnotexist.{secrets.token_hex(16)}.dev/health" + ) + + +def test_download_file_md5_mismatch(httpserver: HTTPServer, fs: FakeFilesystem) -> None: + httpserver.expect_request("/map1.sd7").respond_with_data(b"map1contents") + with pytest.raises(RuntimeError) as excinfo: + map_syncer.download_file( + cast(str, httpserver.url_for("/map1.sd7")), + pathlib.Path("map1.sd7"), + "asdasdasd", + ) + assert "MD5 mismatch" in str(excinfo.value) + + +def test_mqtt_triggers_on_message( + mosquitto: Tuple[str, int], capmqtt: MqttCaptureFixture +) -> None: + mqtt_config = map_syncer.MQTTConfig( + mosquitto[0], mosquitto[1], False, "topic", None, None + ) + sync_trigger: map_syncer.SyncQueue = queue.Queue() + with map_syncer.mqtt_sync_trigger(mqtt_config, sync_trigger): + with pytest.raises(queue.Empty): + sync_trigger.get(timeout=0.5) + capmqtt.publish("topic", "random stuff") # type: ignore + assert sync_trigger.get(timeout=0.5) == (map_syncer.SyncOp.SYNC, "MQTT") + + +def test_timer_triggers_on_interval(mocker: MockerFixture) -> None: + sync_trigger: map_syncer.SyncQueue = queue.Queue() + + interval = 0.02 + messages = 6 + min_duration = interval * (messages - 1) - (interval / 2) + max_duration = interval * (messages - 1) + (interval / 2) + + # This is pretty flaky, but it's the best we can do without mocking time + start = time.time() + with map_syncer.timer_sync_trigger(interval, sync_trigger): + for _ in range(messages): + t, msg = sync_trigger.get(timeout=10 * interval) + assert (t, msg) == (map_syncer.SyncOp.SYNC, "timer") + duration = time.time() - start + assert duration > min_duration and duration < max_duration + + +def test_poller_starts_sync_correctly(mocker: MockerFixture) -> None: + sync_files = mocker.patch("map_syncer.sync_files") + send_healthcheck = mocker.patch("map_syncer.send_healthcheck") + sync_trigger: map_syncer.SyncQueue = queue.Queue() + t = threading.Thread( + target=lambda: map_syncer.polling_sync( + pathlib.Path(""), "", 0, sync_trigger, "http://x.com/health" + ) + ) + t.start() + for _ in range(3): + sync_trigger.put((map_syncer.SyncOp.SYNC, "A")) + time.sleep(0.1) + sync_trigger.put((map_syncer.SyncOp.STOP, "stop")) + t.join() + assert sync_files.call_count == 3 + assert send_healthcheck.call_count == 3 + + +def test_poller_catches_exceptions(mocker: MockerFixture) -> None: + sync_files = mocker.patch("map_syncer.sync_files") + exc_logging = mocker.patch("logging.exception") + sync_files.side_effect = RuntimeError("test") + sync_trigger: map_syncer.SyncQueue = queue.Queue() + t = threading.Thread( + target=lambda: map_syncer.polling_sync(pathlib.Path(""), "", 0, sync_trigger) + ) + t.start() + sync_trigger.put((map_syncer.SyncOp.SYNC, "A")) + time.sleep(0.1) + sync_trigger.put((map_syncer.SyncOp.STOP, "stop")) + t.join() + assert sync_files.call_count == 1 + assert exc_logging.call_count == 1 + + +def test_poller_ignores_duplicate_requests(mocker: MockerFixture) -> None: + sync_files = mocker.patch("map_syncer.sync_files") + sync_trigger: map_syncer.SyncQueue = queue.Queue() + t = threading.Thread( + target=lambda: map_syncer.polling_sync(pathlib.Path(""), "", 0, sync_trigger) + ) + t.start() + for _ in range(10): + sync_trigger.put((map_syncer.SyncOp.SYNC, "A")) + sync_trigger.put((map_syncer.SyncOp.STOP, "stop")) + t.join() + assert sync_files.call_count < 2 + + +@pytest.fixture(scope="function") +def live_maps_url(httpserver: HTTPServer) -> str: + maps = [ + ("Map 1", "map1.sd7", b"map1contents", "462e462688fddf33e4bf4b756015f9a1"), + ("Map 2", "map2.sd7", b"map2contents", "a4b06ce39970cb157729504ac1d740a3"), + ("Map 3", "map3.sd7", b"map3contents", "d8720a22142996af63b6b962e4d338c7"), + ] + response: List[Dict[str, str]] = [ + { + "springName": name, + "fileName": file, + "downloadURL": cast(str, httpserver.url_for("/map/" + file)), + "md5": md5, + } + for (name, file, _, md5) in maps + ] + httpserver.expect_request("/live_maps.json").respond_with_json(response) + for _, file, contents, _ in maps: + httpserver.expect_request("/map/" + file).respond_with_data(contents) + return cast(str, httpserver.url_for("/live_maps.json")) + + +def test_sync_files_simple(live_maps_url: str, fs: FakeFilesystem) -> None: + d = pathlib.Path("maps") + fs.create_dir(d) + fs.create_file(d / "map_old.sd7", contents="mapoldcontents") + fs.create_file(d / "file.bla") + map_syncer.sync_files(d, live_maps_url, delete_after=0) + assert fs.get_object(d / "map1.sd7").contents == "map1contents" + assert fs.get_object(d / "map2.sd7").contents == "map2contents" + assert fs.get_object(d / "map3.sd7").contents == "map3contents" + assert not fs.exists(d / "map_old.sd7") + assert fs.exists(d / "file.bla") + + +def test_sync_files_deletes_only_old(live_maps_url: str, fs: FakeFilesystem) -> None: + d = pathlib.Path("maps") + fs.create_dir(d) + fs.create_file(d / "map_old_1.sd7") + fs.create_file(d / "map_old_2.sd7") + initial_tombstones: Dict[str, int] = { + "map_old_1.sd7": int(time.time()) - 100, + "map_old_2.sd7": int(time.time()) - 300, + } + fs.create_file( + d / "tombstones.json", + contents=json.dumps(initial_tombstones), + ) + map_syncer.sync_files(d, live_maps_url, delete_after=200) + assert fs.exists(d / "map_old_1.sd7") + assert not fs.exists(d / "map_old_2.sd7") + assert cast( + Dict[str, int], json.loads(fs.get_object(d / "tombstones.json").contents) + ) == { + "map_old_1.sd7": initial_tombstones["map_old_1.sd7"], + } diff --git a/tools/map_syncer/pyproject.toml b/tools/map_syncer/pyproject.toml new file mode 100644 index 0000000..0b00f75 --- /dev/null +++ b/tools/map_syncer/pyproject.toml @@ -0,0 +1,37 @@ +[project] +name = 'map_syncer' +version = '0.0.1' +requires-python = ">=3.8.0" + +[tool.isort] +profile = "black" +extend_skip = [".pyenv"] + +[tool.mypy] +files = "*.py" +pretty = true +mypy_path = "$MYPY_CONFIG_FILE_DIR/stubs" +show_error_context = true + +# Mypy rules: +warn_unused_configs = true +disallow_any_generics = true +disallow_subclassing_any = true +disallow_untyped_calls = true +disallow_untyped_defs = true +disallow_incomplete_defs = true +check_untyped_defs = true +disallow_untyped_decorators = true +no_implicit_optional = true +warn_redundant_casts = true +warn_unused_ignores = true +warn_return_any = true +no_implicit_reexport = true +strict_equality = true +disallow_any_expr = true + +[tool.ruff] +select = [ + "E", "F", "C90", "N", "UP", "YTT", "B", "A", "C4", "EM", "ISC", + "G", "RUF", "PIE", "SIM", "PTH", "PLC", "PLE", "PLW" +] diff --git a/tools/map_syncer/requirements.txt b/tools/map_syncer/requirements.txt new file mode 100644 index 0000000..28682ee --- /dev/null +++ b/tools/map_syncer/requirements.txt @@ -0,0 +1,11 @@ +black>=23.3.0 +isort>=5.12.0 +mypy>=1.4.1 +paho-mqtt>=1.6.1 +pyfakefs>=5.2.2 +pytest-httpserver>=1.0.8 +pytest-mock>=3.11.1 +pytest-mqtt>=0.2.0 +pytest>=7.4.0 +ruff>=0.0.277 +types-paho-mqtt>=1.6.0.6 diff --git a/tools/map_syncer/stubs/pyfakefs/fake_filesystem.pyi b/tools/map_syncer/stubs/pyfakefs/fake_filesystem.pyi new file mode 100644 index 0000000..8708708 --- /dev/null +++ b/tools/map_syncer/stubs/pyfakefs/fake_filesystem.pyi @@ -0,0 +1,35 @@ +# Partially generated using stubgen and left only pieces we need. +import os +from typing import Callable + +AnyPath = str | bytes | os.PathLike +AnyString = str | bytes + +class FakeDirectory: + pass + +class FakeFile: + # In reality contents can be also None, but only in case when simulating + # very large files, and we don't do that. + contents: str + +class FakeFilesystem: + def exists(self, file_path: AnyPath, check_link: bool = ...) -> bool: ... + def create_dir( + self, directory_path: AnyPath, perm_bits: int = ... + ) -> FakeDirectory: ... + def create_file( + self, + file_path: AnyPath, + st_mode: int = ..., + contents: AnyString = ..., + st_size: int | None = ..., + create_missing_dirs: bool = ..., + apply_umask: bool = ..., + encoding: str | None = ..., + errors: str | None = ..., + side_effect: Callable[[FakeFile], None] | None = ..., + ) -> FakeFile: ... + def get_object( + self, file_path: AnyPath, check_read_perm: bool = ... + ) -> FakeFile: ...