From c039c7a3fde98e7cdef3efcfbeadfa0ced2f1676 Mon Sep 17 00:00:00 2001 From: Sidneys1 Date: Mon, 15 Jul 2024 19:33:30 +0000 Subject: [PATCH 1/2] WIP towards pluggable sources --- pyproject.toml | 8 +- src/memoria/__main__.py | 67 +++++++ src/memoria/downloader.py | 5 +- src/memoria/logic/source.py | 10 ++ src/memoria/model/allowlist.py | 12 +- src/memoria/model/orm/__init__.py | 1 + src/memoria/model/orm/configured_source.py | 18 ++ src/memoria/model/plugins.py | 9 + src/memoria/model/source.py | 52 ++++++ src/memoria/plugins/__init__.py | 24 ++- src/memoria/plugins/_plugin_suite.py | 102 ++++++----- src/memoria/plugins/_processing_manager.py | 20 ++- src/memoria/plugins/_source_manager.py | 99 +++++++++++ src/memoria/plugins/allowlist.py | 5 +- .../builtin/firefox_sync_client_source.py | 165 ++++++++++++++++++ src/memoria/plugins/source.py | 51 ++++++ src/memoria/tasks.py | 26 +++ src/memoria/web/lifecycle.py | 3 +- src/memoria/web/routes/api/__init__.py | 2 + src/memoria/web/routes/api/source.py | 25 +++ src/memoria/web/routes/api/source_ux.py | 124 +++++++++++++ src/memoria/web/routes/settings.py | 5 +- .../web/www/templates/settings.html.j2 | 89 ++++++++-- src/memoria/web/www/templates/settings.scss | 147 ++++++++++++++++ .../web/www/templates/source_item.html.j2 | 7 + .../web/www/templates/source_plugins.html.j2 | 7 + .../web/www/templates/sources_items.html.j2 | 9 + 27 files changed, 1016 insertions(+), 76 deletions(-) create mode 100644 src/memoria/__main__.py create mode 100644 src/memoria/logic/source.py create mode 100644 src/memoria/model/orm/configured_source.py create mode 100644 src/memoria/model/plugins.py create mode 100644 src/memoria/model/source.py create mode 100644 src/memoria/plugins/_source_manager.py create mode 100644 src/memoria/plugins/builtin/firefox_sync_client_source.py create mode 100644 src/memoria/plugins/source.py create mode 100644 src/memoria/tasks.py create mode 100644 src/memoria/web/routes/api/source.py create mode 100644 src/memoria/web/routes/api/source_ux.py create mode 100644 src/memoria/web/www/templates/source_item.html.j2 create mode 100644 src/memoria/web/www/templates/source_plugins.html.j2 create mode 100644 src/memoria/web/www/templates/sources_items.html.j2 diff --git a/pyproject.toml b/pyproject.toml index 1bc05e3..ab98dc6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,9 +24,12 @@ dependencies = [ "fasthx~=0.2403.1", "fastapi~=0.111.0", "humanize~=4.9.0", + "ijson~=3.2.3", "pydantic-settings~=2.2.1", "python-magic~=0.4.27", - "SQLAlchemy[asyncio]~=2.0.30" + "rocketry~=2.5.1", + "SQLAlchemy[asyncio]~=2.0.30", + "websockets~=12.0" ] dynamic = ["version", "description", "authors"] @@ -38,6 +41,9 @@ Repository = "https://github.com/Sidneys1/Memoria" AiohttpDownloader = "memoria.plugins.builtin.aiohttp_downloader:AiohttpDownloader" HtmlContentFinder = "memoria.plugins.builtin.html_content_finder:HtmlContentFinder" HtmlExtractor = "memoria.plugins.builtin.html_extractor:HtmlExtractor" +PrefixAllowlistRule = "memoria.plugins.builtin.prefix_allowlistrule:PrefixAllowlistRule" +RegexAllowlistRule = "memoria.plugins.builtin.regex_allowlistrule:RegexAllowlistRule" +FirefoxSyncClientSource = "memoria.plugins.builtin.firefox_sync_client_source:FirefoxSyncClientSource" [project.optional-dependencies] dev = [ diff --git a/src/memoria/__main__.py b/src/memoria/__main__.py new file mode 100644 index 0000000..8b19957 --- /dev/null +++ b/src/memoria/__main__.py @@ -0,0 +1,67 @@ +import asyncio +from logging import basicConfig, DEBUG, INFO + +class importer: + async def add_many(self, items) -> None: + """""" + async for x in items: + # print("\t", x, sep='') + print('.', end='', flush=True) + print() + + async def add(self, item) -> None: + """""" + + async def flush(self) -> None: + """""" + +async def main() -> None: + from memoria.model.orm.configured_source import ConfiguredSource + from memoria.plugins import PluginSchedule + from memoria.plugins._source_manager import SourcePluginManager + from memoria.db_clients import create_sql_client + + basicConfig(level=INFO) + + async with create_sql_client() as session: + found: ConfiguredSource = await ConfiguredSource.find_one(session, ConfiguredSource.id == 1) # type: ignore + # found.display_name = 'Firefox Sync (sidneys1@live.com)' + # found.schedule = 2 + # found.schedule_value = 60 + # await session.delete(found) + + # new = ConfiguredSource(plugin_id='memoria.plugins.builtin.firefox_sync_client_source:FirefoxSyncClientSource', display_name='Firefox Sync: sidneys1@live.com', config='{}') + # session.add(new) + await session.commit() + + async for x in await ConfiguredSource.find_all(session): + print(x.id, x.plugin_id, x.display_name, x.config, x.schedule, x.schedule_value, x.enabled, sep=', ') + + # async with SourcePluginManager() as manager: + # print(manager._instances) + # print(manager._config) + # print(manager._schedules) + + + # plugin_id: str = None # type: ignore + # plugin_type: type[Source] = None # type: ignore + # for pid, plugin in suite.get_plugins_of_type(Source): + # # print(plugin.__name__, ','.join(x.name for x in plugin.SUPPORTED_SCHEDULES)) + # plugin_type = plugin + # plugin_id = pid + + # config: JsonValue = None + + + + # print("Before 'with'") + # async with plugin_type(config) as plugin: + # return + # i = importer() + # print("Before 'run'") + # await plugin.run(i) + # print("After 'run'") + # print("After 'with'") + +if __name__ == '__main__': + asyncio.run(main()) diff --git a/src/memoria/downloader.py b/src/memoria/downloader.py index ef2a436..7d1c788 100644 --- a/src/memoria/downloader.py +++ b/src/memoria/downloader.py @@ -15,7 +15,8 @@ from queue import Queue from .model.imported_history import ImportedHistory -from .plugins import ProcessingPluginManager, PluginSuite, Result +from .plugins._processing_manager import ProcessingPluginManager +from .plugins.processing import Result from .settings import SETTINGS _LOG = getLogger(__spec__.name) @@ -86,7 +87,7 @@ async def check_exists(result: Result): async def worker(log: Logger, queue: 'Queue[ImportedHistory]', no_more: 'Event', canceled: 'Event') -> None: from .db_clients import create_elasticsearch_client, create_sql_client - processor = PluginSuite().create_processing_manager() + processor = ProcessingPluginManager() es = await create_elasticsearch_client(SETTINGS.elastic_host, basic_auth=(SETTINGS.elastic_user, SETTINGS.elastic_password)) async with create_sql_client() as sql_session, es, processor: diff --git a/src/memoria/logic/source.py b/src/memoria/logic/source.py new file mode 100644 index 0000000..b8abe60 --- /dev/null +++ b/src/memoria/logic/source.py @@ -0,0 +1,10 @@ +from typing import TYPE_CHECKING, Coroutine, AsyncIterable + +from ..model.source import Source +from ..model.orm.configured_source import ConfiguredSource + +if TYPE_CHECKING: + from sqlalchemy.ext.asyncio import AsyncSession + +def get_sources(session: 'AsyncSession', skip: int = 0, limit: int|None = None) -> Coroutine[None, None, AsyncIterable[ConfiguredSource]]: + return ConfiguredSource.find_all(session, skip=skip, limit=limit, order_by=ConfiguredSource.id.desc()) diff --git a/src/memoria/model/allowlist.py b/src/memoria/model/allowlist.py index 42b528d..848d6d8 100644 --- a/src/memoria/model/allowlist.py +++ b/src/memoria/model/allowlist.py @@ -21,16 +21,12 @@ def color(self) -> str|None: @staticmethod @lru_cache - def _get_options(plugin_id: str) -> AllowlistPlugin.DisplayOptions|None: + def _get_options(plugin_id: str) -> AllowlistPlugin.DisplayOptions: from ..plugins._plugin_suite import PluginSuite from ..plugins.allowlist import AllowlistRule - suite = PluginSuite() - for plugin in suite._plugins.values(): - if not issubclass(plugin, AllowlistRule): continue - if plugin.identifier != plugin_id: continue - if (options := plugin.DISPLAY_OPTIONS) is None: - return None - return options + plugin = PluginSuite().get_plugin_by_short_name(plugin_id + "AllowlistRule") + assert plugin is not None and issubclass(plugin.type, AllowlistRule), f"{plugin_id}" + return plugin.type.DISPLAY_OPTIONS class Config: from_attributes = True diff --git a/src/memoria/model/orm/__init__.py b/src/memoria/model/orm/__init__.py index 2b42deb..87d94f6 100644 --- a/src/memoria/model/orm/__init__.py +++ b/src/memoria/model/orm/__init__.py @@ -62,5 +62,6 @@ async def find_all(cls, from .history import * from .page import * from .allowlist import * +from .configured_source import * __all__ = tuple() diff --git a/src/memoria/model/orm/configured_source.py b/src/memoria/model/orm/configured_source.py new file mode 100644 index 0000000..67f6475 --- /dev/null +++ b/src/memoria/model/orm/configured_source.py @@ -0,0 +1,18 @@ +from sqlalchemy import Boolean, Integer, String + +from . import Column, CrudBase + + +class ConfiguredSource(CrudBase): + __tablename__ = 'configured_sources' + + id = Column(Integer, primary_key=True) + plugin_id = Column(String) + display_name = Column(String) + config = Column(String, default="null") + schedule = Column(Integer, default=0) + schedule_value = Column(String, default="") + enabled = Column(Boolean, default=False) + + +__all__ = tuple() diff --git a/src/memoria/model/plugins.py b/src/memoria/model/plugins.py new file mode 100644 index 0000000..d8595c4 --- /dev/null +++ b/src/memoria/model/plugins.py @@ -0,0 +1,9 @@ +from pydantic import BaseModel + +class Plugin(BaseModel): + id: str + display_name: str + description: str|None + +class SourcePlugin(Plugin): + ... diff --git a/src/memoria/model/source.py b/src/memoria/model/source.py new file mode 100644 index 0000000..18ce493 --- /dev/null +++ b/src/memoria/model/source.py @@ -0,0 +1,52 @@ +from pydantic import BaseModel, computed_field + +from ..plugins.source import PluginSchedule + + +class Source(BaseModel): + id: int + plugin_id: str + display_name: str + config: str + schedule: PluginSchedule + schedule_value: str|None + enabled: bool + + @computed_field + def display_schedule(self) -> str: + match self.schedule: + case PluginSchedule.Disabled: + return "disabled" + case PluginSchedule.Continuous: + return "Continuous" + case PluginSchedule.Intermittent: + assert isinstance(self.schedule_value, str) + value = float(self.schedule_value) + if value == 60: + return "Hourly" + elif value == 1440: + return "Daily" + import humanize + return "Every " + humanize.naturaldelta(60.0 * value) + case PluginSchedule.Scheduled: + return "Scheduled" + case PluginSchedule.OnDemand: + return "OnDemand" + + # @computed_field + # def display_name(self) -> str: + # config = self._get_options(self.plugin_id) + # ret = config.display_name or self.plugin_id + # print('!!!!!!! DIsplay name of', self.plugin_id, 'is', ret) + # return ret + + # @staticmethod + # @lru_cache + # def _get_options(plugin_id: str) -> SourcePlugin.UxConfig: + # from ..plugins._plugin_suite import PluginSuite, PluginId + # plugin = PluginSuite().get_plugin_by_id(PluginId(plugin_id)) + # assert plugin is not None and issubclass(plugin.type, SourcePlugin), f"{plugin_id}" + # return plugin.type.UX_CONFIG + + class Config: + from_attributes = True diff --git a/src/memoria/plugins/__init__.py b/src/memoria/plugins/__init__.py index 6d7a497..0557dcc 100644 --- a/src/memoria/plugins/__init__.py +++ b/src/memoria/plugins/__init__.py @@ -1,8 +1,28 @@ from abc import ABC from contextlib import AbstractAsyncContextManager -from typing import TypeAlias +from enum import IntFlag, auto +from typing import NewType + +type Html = str +PluginId = NewType('PluginId', str) + + +class PluginSchedule(IntFlag): + Disabled = 0 + """The plugin will not be run.""" + + Continuous = auto() + """The plugin will manage its own lifecycle (running continuously) through `__aenter__` and `__aexit__`.""" + + Intermittent = auto() + """The plugin can be configured to run at specific intervals.""" + + Scheduled = auto() + """The plugin can be configured to run at specific cron schedules.""" + + OnDemand = auto() + """The plugin can be run on-demand, even if other schedules are selected.""" -Html: TypeAlias = str class Plugin(AbstractAsyncContextManager, ABC): async def __aexit__(self, *_, **__): diff --git a/src/memoria/plugins/_plugin_suite.py b/src/memoria/plugins/_plugin_suite.py index 84c7a13..d7141cb 100644 --- a/src/memoria/plugins/_plugin_suite.py +++ b/src/memoria/plugins/_plugin_suite.py @@ -1,47 +1,55 @@ +from dataclasses import dataclass from importlib.metadata import entry_points from logging import getLogger -from typing import TYPE_CHECKING - -if TYPE_CHECKING: - from ._processing_manager import ProcessingPluginManager +from typing import Generator, TypeVar, cast from ..settings import SETTINGS from ..util import Singleton +from . import PluginId from .processing import Downloader, Extractor, Filter, Plugin _LOG = getLogger(__spec__.name) +TPlugin = TypeVar('TPlugin', bound=Plugin) + +@dataclass(repr=False, eq=False, frozen=True, slots=True) +class PluginDefinition[TPlugin]: + id: PluginId + short_name: str + type: type[TPlugin] class PluginSuite(metaclass=Singleton): - _modules: dict[str, str] - _plugins: dict[str, type[Plugin]] + _plugins: dict[PluginId, PluginDefinition[Plugin]] + _short_names: dict[str, PluginId] def __init__(self) -> None: from .builtin.aiohttp_downloader import AiohttpDownloader + from .builtin.firefox_sync_client_source import FirefoxSyncClientSource from .builtin.html_content_finder import HtmlContentFinder from .builtin.html_extractor import HtmlExtractor from .builtin.prefix_allowlistrule import PrefixAllowlistRule from .builtin.regex_allowlistrule import RegexAllowlistRule - self._modules = { - 'AiohttpDownloader': 'memoria.plugins.builtin.aiohttp_downloader', - 'HtmlContentFinder': 'memoria.plugins.builtin.html_content_finder', - 'HtmlExtractor': 'memoria.plugins.builtin.html_extractor', - 'PrefixAllowlistRule': 'memoria.plugins.buitin.prefix_allowlistrule', - 'RegexAllowlistRule': 'memoria.plugins.buitin.regex_allowlistrule', - } - self._plugins = { - 'AiohttpDownloader': AiohttpDownloader, - 'HtmlContentFinder': HtmlContentFinder, - 'HtmlExtractor': HtmlExtractor, - 'PrefixRule': PrefixAllowlistRule, - 'RegexRule': RegexAllowlistRule, + self._plugins = {} + self._short_names = {} + + builtin_plugins = { + PluginId('memoria.plugins.builtin.aiohttp_downloader:AiohttpDownloader'): AiohttpDownloader, + PluginId('memoria.plugins.builtin.html_content_finder:HtmlContentFinder'): HtmlContentFinder, + PluginId('memoria.plugins.builtin.html_extractor:HtmlExtractor'): HtmlExtractor, + PluginId('memoria.plugins.builtin.prefix_allowlistrule:PrefixAllowlistRule'): PrefixAllowlistRule, + PluginId('memoria.plugins.builtin.regex_allowlistrule:RegexAllowlistRule'): RegexAllowlistRule, + PluginId('memoria.plugins.builtin.firefox_sync_client_source:FirefoxSyncClientSource'): FirefoxSyncClientSource, } + for id, type in builtin_plugins.items(): + module, name = id.rsplit(':', maxsplit=1) + self._add_plugin(id, module, name, type) + for entry_point in entry_points(group='memoria'): if entry_point.module.startswith('memoria.plugins.builtin.'): - _LOG.debug("Skipping import of built-in plugin `%s.%s`", entry_point.module, entry_point.name) continue + _LOG.debug("Importing plugin `%s` in `%s`.", entry_point.name, entry_point.module) try: @@ -49,49 +57,61 @@ def __init__(self) -> None: if not issubclass(plugin, Plugin): _LOG.error("Plugin `%s` does not adhere to API!", entry_point.name) continue - self._add_plugin(entry_point.module, entry_point.name, plugin) + self._add_plugin(PluginId(entry_point.value), entry_point.module, entry_point.name, plugin) except: _LOG.exception("Failed to load `%s`:", entry_point.value) - _LOG.info("Loaded plugins: [`%s`]", '`, `'.join(self._plugins.keys())) + + _LOG.info("Loaded plugins: [`%s`]", '`, `'.join(self._short_names.keys())) self._check() - def _add_plugin(self, module: str, name: str, plugin: type[Plugin]) -> None: - if name in self._plugins: - if (fqdn := f'{module}.{name}') in self._plugins: - raise RuntimeError(f"Same module used twice despite disambiguation: {fqdn}!") + def _add_plugin(self, id: PluginId, module: str, name: str, plugin: type[Plugin]) -> None: + if id in self._plugins: + _LOG.warning("Same plugin (`%s`) imported twice?", id) + return - _LOG.warning("Same plugin name used twice (in `%s` first, and now `%s`). Renaming current plugin to `%s`.", - self._modules[name], module, fqdn) - name = fqdn + if name in self._short_names: + _LOG.warning("Plugin name `%s` reused (first as `%s`, now `%s`).", name, self._short_names[name], id) + name = id + if name in self._short_names: + _LOG.error("Plugin name still collides after disambiguation! Skipping.") + return - self._modules[name] = module - self._plugins[name] = plugin + self._short_names[name] = id + self._plugins[id] = PluginDefinition(id, name, plugin) def _check(self) -> None: errors: list[Exception] = [] - if SETTINGS.downloader not in self._plugins or not issubclass(self._plugins[SETTINGS.downloader], Downloader): + if SETTINGS.downloader not in self._short_names or not issubclass(self._plugins[self._short_names[SETTINGS.downloader]].type, Downloader): errors.append( ValueError(f"No Downloader plugin by name `{SETTINGS.downloader}`. " - f"Available Downloaders: [`{'`, `'.join(self._downloaders)}`]")) + f"Available Downloaders: [`{'`, `'.join(x.short_name for x in self.get_plugins_of_type(Downloader))}`]")) - if SETTINGS.extractor not in self._plugins or not issubclass(self._plugins[SETTINGS.extractor], Extractor): + if SETTINGS.extractor not in self._short_names or not issubclass(self._plugins[self._short_names[SETTINGS.extractor]].type, Extractor): errors.append( ValueError(f"No Extractor plugin by name `{SETTINGS.extractor}`. " - f"Available Extractors: [`{'`, `'.join(self._extractors)}`]")) + f"Available Extractors: [`{'`, `'.join(x.short_name for x in self.get_plugins_of_type(Extractor))}`]")) for filter_ in SETTINGS.filter_stack: - if filter_ not in self._plugins or not issubclass(self._plugins[filter_], Filter): + if filter_ not in self._short_names or not issubclass(self._plugins[self._short_names[filter_]].type, Filter): errors.append( ValueError( f"No Filter plugin by name `{filter_}`. " - f"Available Filters: [`{'`, `'.join(n for n, t in self._plugins.items() if issubclass(t, Filter))}`]" + f"Available Filters: [`{'`, `'.join(x.short_name for x in self.get_plugins_of_type(Filter))}`]" )) if errors: raise ExceptionGroup("Invalid configuration:", errors) - def create_processing_manager(self) -> 'ProcessingPluginManager': - from ._processing_manager import ProcessingPluginManager - return ProcessingPluginManager(self._plugins[SETTINGS.downloader], self._plugins[SETTINGS.extractor], - (self._plugins[name] for name in SETTINGS.filter_stack)) + def get_plugins_of_type(self, t: type[TPlugin]) -> Generator[PluginDefinition[TPlugin], None, None]: + for plugin in self._plugins.values(): + if issubclass(plugin.type, t): + yield cast(PluginDefinition[t], plugin) + + def get_plugin_by_id(self, id: PluginId) -> PluginDefinition[Plugin] | None: + return self._plugins[id] + + def get_plugin_by_short_name(self, short_name: str) -> PluginDefinition[Plugin] | None: + if short_name not in self._short_names: + return None + return self._plugins[self._short_names[short_name]] diff --git a/src/memoria/plugins/_processing_manager.py b/src/memoria/plugins/_processing_manager.py index d3f8f3b..8e4c6b2 100644 --- a/src/memoria/plugins/_processing_manager.py +++ b/src/memoria/plugins/_processing_manager.py @@ -1,11 +1,12 @@ from contextlib import AbstractAsyncContextManager from itertools import chain, pairwise from logging import getLogger -from typing import Callable, Coroutine, Iterable, Self +from typing import Callable, Coroutine, Self from ..model.imported_history import ImportedHistory from ..settings import SETTINGS from .processing import Downloader, Extractor, Filter, Plugin, Result +from ._plugin_suite import PluginSuite def _get_plugin_subclass(clazz: type) -> str: @@ -31,8 +32,17 @@ class ProcessingPluginManager(AbstractAsyncContextManager): filters: list[Filter] _LOG = getLogger(__spec__.name + '.PluginProcessor') - def __init__(self, downloader: type[Downloader], extractor: type[Extractor], - filters: Iterable[type[Filter]]) -> None: + def __init__(self) -> None: + suite = PluginSuite() + downloader = suite.get_plugin_by_short_name(SETTINGS.downloader) + assert downloader is not None and issubclass(downloader.type, Downloader) + downloader = downloader.type + extractor = suite.get_plugin_by_short_name(SETTINGS.extractor) + assert extractor is not None and issubclass(extractor.type, Extractor) + extractor = extractor.type + filters = (x.type for x in + (suite.get_plugin_by_short_name(name) for name in SETTINGS.filter_stack) + if x is not None and issubclass(x.type, Filter)) errors = [] if not issubclass(downloader, Downloader): @@ -60,7 +70,7 @@ def __init__(self, downloader: type[Downloader], extractor: type[Extractor], f"plugin only produces [`{'`, `'.join(c_type)}`].")) continue - _LOG.debug("\tFilter #%d: `%s` [`%s`] -> [`%s`]", i, filter_.__name__, + self._LOG.debug("\tFilter #%d: `%s` [`%s`] -> [`%s`]", i, filter_.__name__, '`, `'.join(c_type.intersection(filter_.accept)), '`, `'.join(filter_.content_types)) c_type = filter_.content_types @@ -80,7 +90,7 @@ def __init__(self, downloader: type[Downloader], extractor: type[Extractor], self.extractor = extractor() self.filters = [f() for f in filters] - if not c_type.intersection(self.extractor.accept): + if c_type is not None and not c_type.intersection(self.extractor.accept): errors.append( ValueError(f"Filter stack produces [`{'`, `'.join(c_type)}`], but selected Extractor " f"`{SETTINGS.extractor}` only accepts [`{'`, `'.join(extractor.accept)}`].")) diff --git a/src/memoria/plugins/_source_manager.py b/src/memoria/plugins/_source_manager.py new file mode 100644 index 0000000..ec7164a --- /dev/null +++ b/src/memoria/plugins/_source_manager.py @@ -0,0 +1,99 @@ +from contextlib import AbstractAsyncContextManager, AsyncExitStack +from typing import TYPE_CHECKING, Self +from logging import getLogger + +from ._plugin_suite import PluginSuite +from .source import Source +from . import PluginSchedule + +if TYPE_CHECKING: + from sqlalchemy.ext.asyncio import AsyncSession + from pydantic import JsonValue + + +class SourcePluginManager(AbstractAsyncContextManager): + _LOG = getLogger(__spec__.name + '.SourcePluginManager') + + _stack: AsyncExitStack|None + _instances: dict[int, Source] + _config: dict[int, 'JsonValue'] + _schedules: dict[int, tuple[PluginSchedule, str]] + + def __init__(self) -> None: + self._stack = None + self._instances = {} + self._config = {} + self._schedules = {} + + def start(self) -> None: + from ..tasks import APP + from rocketry.conds import cron + + if self._stack is None: + raise RuntimeError() + + for id, (schedule, value) in self._schedules.items(): + match schedule: + case PluginSchedule.Intermittent: + APP.task(f'every {value} minutes', func=self._instances[id].run) + case PluginSchedule.Scheduled: + APP.task(cron(value), func=self._instances[id].run) + + APP.run() + + + async def _load(self, session: 'AsyncSession') -> None: + type_map = {p.id: p.type for p in PluginSuite().get_plugins_of_type(Source)} + + from ..model.orm.configured_source import ConfiguredSource + async for x in await ConfiguredSource.find_all(session): + import json + + id: int = x.id # type: ignore + plugin_id: str = x.plugin_id # type: ignore + schedule: PluginSchedule = PluginSchedule(x.schedule) # type: ignore + schedule_value: str = x.schedule_value # type: ignore + enabled: bool = x.enabled # type: ignore + + if not enabled: + self._LOG.info("Skipping disabled Source plugin...") + continue + + if plugin_id not in type_map: + self._LOG.error("Configured Source plugin is of nonexistent type `%s`.", x.plugin_id) + continue + + config: 'JsonValue' = None + if x.config: # type: ignore + try: + config = json.loads(x.config) # type: ignore + except: + pass + + self._config[id] = config + try: + self._instances[id] = type_map[plugin_id][1](config) + except ValueError: + self._LOG.error("Configured Source plugin failed to load config.") + continue + + self._schedules[id] = schedule, schedule_value + + async def __aenter__(self) -> Self: + from ..db_clients import create_sql_client + async with create_sql_client() as session: + await self._load(session) + + self._stack = AsyncExitStack() + await self._stack.__aenter__() + + for x in self._instances.values(): + await self._stack.enter_async_context(x) + + return self + + async def __aexit__(self, *args, **kwargs) -> bool | None: + assert self._stack is not None + await self._stack.__aexit__(*args, **kwargs) + self._stack = None + return False diff --git a/src/memoria/plugins/allowlist.py b/src/memoria/plugins/allowlist.py index 5bd6258..6333a62 100644 --- a/src/memoria/plugins/allowlist.py +++ b/src/memoria/plugins/allowlist.py @@ -7,11 +7,10 @@ if TYPE_CHECKING: from urllib.parse import ParseResult -from . import Plugin, Html +from . import Plugin, Html, PluginId type Url = str Hostname = NewType('Hostname', str) -PluginId = NewType('PluginId', str) type InputItem = tuple[Hostname, Url, 'ParseResult'] @@ -22,7 +21,7 @@ class DisplayOptions: prefix: str|None = None display_name: str|None = None - DISPLAY_OPTIONS: ClassVar[DisplayOptions|None] = None + DISPLAY_OPTIONS: ClassVar[DisplayOptions] = DisplayOptions() """Used for UI/UX display purposes. Can be `None`.""" LONG_DOCUMENTATION: ClassVar[Html|None] = None diff --git a/src/memoria/plugins/builtin/firefox_sync_client_source.py b/src/memoria/plugins/builtin/firefox_sync_client_source.py new file mode 100644 index 0000000..28f93e5 --- /dev/null +++ b/src/memoria/plugins/builtin/firefox_sync_client_source.py @@ -0,0 +1,165 @@ +from logging import getLogger +from typing import TYPE_CHECKING, AsyncIterator + +if TYPE_CHECKING: + from pydantic import JsonValue + +from memoria.plugins import Html +from memoria.plugins.source import HistoryImporter, ImportedHistory, PluginSchedule, Source, UxHost + +_MOZILLA_URL = "https://www.mozilla.org/en-US/firefox/features/sync/" +_GITHUB_URL = "https://github.com/Mikescher/firefox-sync-client" +_DESC: Html = f"""

+ Collects web history + + synced to a Mozilla account + using the open-source + + ffsclient + . +

""" + +_LOGIN_FORM = """
+

Login

+ + + + +
""" + +_TOTP_FORM = """
+

One-Time-Password

+ + +
+""" + +_LOG = getLogger(__spec__.name) + +def _get_ffsclient_path() -> str|None: + from shutil import which + return which('ffsclient') + +async def create_sync_client(host: UxHost) -> tuple[Html, 'JsonValue']: + import asyncio + import json + from tempfile import TemporaryDirectory + from pathlib import Path + from asyncio.subprocess import create_subprocess_exec, DEVNULL, PIPE + + with TemporaryDirectory() as path: + temp_path = Path(path) / 'session.json' + + result = await host.update_dialog(_LOGIN_FORM) + assert isinstance(result, dict) and 'email' in result and isinstance(result['email'], str) and 'password' in result and isinstance(result['password'], str) + + email = result['email'] + + await host.waiting("Logging in...") + + ffsclient_path = _get_ffsclient_path() + assert ffsclient_path is not None + + CMD = ( + ffsclient_path, + 'login', + email, + result['password'], + '--sessionfile', str(temp_path), + ) + + print('Running:', ' '.join(CMD)) + process = await create_subprocess_exec(*CMD, stdin=PIPE, stdout=DEVNULL, stderr=PIPE) + + assert process.stdout is not None and process.stdin is not None + + stderr = b'' + try: + line = await asyncio.wait_for(process.stdout.readline(), 5) + if line == b'Enter your OTP (2-Factor Authentication Code): \n': + result = await host.update_dialog(_TOTP_FORM) + assert isinstance(result, dict) and 'totp' in result and isinstance(result['totp'], str) + await host.waiting() + process.stdin.write(result['totp'].encode()) + + _, stderr = await process.communicate() + if stderr: + _LOG.error("Expected success or TOTP request from subprocess, got %r.", line) + raise ChildProcessError('Failed to log in.') + except asyncio.TimeoutError: + await host.error("Timed out waiting for login.") + raise + except ChildProcessError: + first_line = stderr.find(b'\n') or 0 + last_line = stderr.rfind(b'\n', first_line, -1) or len(stderr) + error = stderr[:first_line or len(stderr)].decode() + import json + details = error + try: + details = json.loads(stderr[last_line+1:-1].decode()).get('message') + except ValueError: + pass + import html + if not details: + await host.error("""Failed to log in:
{}
""".format(html.escape(error))) + else: + await host.error("""Failed to log in: {}.""".format(html.escape(details))) + raise + + + with temp_path.open() as fp: + session = json.load(fp) + + return f"Firefox Sync {email}", {'session': session} + + +class FirefoxSyncClientSource(Source): + UX_CONFIG = Source.UxConfig(display_name="Firefox Sync", description=_DESC, create=create_sync_client) + SUPPORTS_ON_DEMAND = True + SUPPORTED_SCHEDULES = PluginSchedule.Intermittent | PluginSchedule.Scheduled | PluginSchedule.OnDemand + + _CHECK_SESSION_ARGS = ('check-session', ) + _HISTORY_LIST_ARGS = ('history', 'list', '--format', 'json', '--minimized-json', '--ignore-schema-errors') + + _firefox_sync_client_path: str | None + + def __init__(self, config: 'JsonValue') -> None: + self._firefox_sync_client_path = _get_ffsclient_path() + + async def __aenter__(self): + import asyncio + from subprocess import DEVNULL + + if self._firefox_sync_client_path is None: + raise EnvironmentError("`ffsclient` is not on PATH.") + + process = await asyncio.subprocess.create_subprocess_exec(self._firefox_sync_client_path, + *self._CHECK_SESSION_ARGS, + stdin=DEVNULL, + stdout=DEVNULL, + stderr=DEVNULL) + if (await process.wait()) != 0: + raise EnvironmentError("`ffsclient` does not have a valid session.") + + return await super().__aenter__() + + async def _stream_items(self) -> AsyncIterator[ImportedHistory]: + from asyncio.subprocess import DEVNULL, PIPE, create_subprocess_exec + + import ijson + + from memoria.model.imported_history import ImportedMozillaHistory + + assert self._firefox_sync_client_path is not None + + process = await create_subprocess_exec(self._firefox_sync_client_path, + *self._HISTORY_LIST_ARGS, + stdin=DEVNULL, + stdout=PIPE, + stderr=DEVNULL) + async for i in ijson.items(process.stdout, 'item'): + i['last_visit'] = max(x['date_unix'] for x in i.get('visits')) + yield ImportedMozillaHistory.model_validate(i) + + async def run(self, importer: HistoryImporter) -> None: + await importer.add_many(self._stream_items()) diff --git a/src/memoria/plugins/source.py b/src/memoria/plugins/source.py new file mode 100644 index 0000000..f19382a --- /dev/null +++ b/src/memoria/plugins/source.py @@ -0,0 +1,51 @@ +from abc import ABC +from dataclasses import dataclass +from typing import AsyncIterator, ClassVar, Protocol, Callable, Coroutine, AsyncGenerator + +from pydantic import JsonValue + +from ..model.imported_history import ImportedHistory +from . import Plugin, PluginSchedule, Html + + +class HistoryImporter(Protocol): + async def add_many(self, items: AsyncIterator[ImportedHistory]) -> None: + """""" + + async def add(self, item: ImportedHistory) -> None: + """""" + + async def flush(self) -> None: + """""" + +class UxHost(Protocol): + async def update_dialog(self, form: Html) -> 'JsonValue': + ... + + async def done(self) -> None: + ... + + async def waiting(self, text: Html|None = None) -> None: + ... + + async def error(self, message: Html) -> None: + ... + + +class Source(Plugin, ABC): + @dataclass(repr=False, eq=False, match_args=False, slots=True, kw_only=True) + class UxConfig: + display_name: str|None = None + description: str|None = None + create: Callable[['UxHost'], Coroutine[None, None, tuple[Html, 'JsonValue']]]|None = None + + UX_CONFIG: ClassVar[UxConfig] = UxConfig() + SUPPORTED_SCHEDULES: ClassVar[PluginSchedule] + + def __init__(self, config: JsonValue) -> None: + super().__init__() + + async def run(self, importer: 'HistoryImporter') -> None: + ... + + diff --git a/src/memoria/tasks.py b/src/memoria/tasks.py new file mode 100644 index 0000000..385aa4b --- /dev/null +++ b/src/memoria/tasks.py @@ -0,0 +1,26 @@ +import asyncio +from contextlib import asynccontextmanager + +from rocketry import Rocketry +from multiprocessing import Process, Event, Manager + + +APP = Rocketry(execution='async') + + +async def _main() -> None: + from .plugins._source_manager import SourcePluginManager + async with SourcePluginManager() as manager: + manager.start() + + +SUBPROCESS: Process = Process(target=lambda: asyncio.run(_main())) +SUBPROCESS.daemon = True + + +@asynccontextmanager +async def tasks_lifecycle(): + SUBPROCESS.start() + yield + SUBPROCESS.kill() # TODO: .join() + SUBPROCESS = None # type: ignore diff --git a/src/memoria/web/lifecycle.py b/src/memoria/web/lifecycle.py index a6b022a..de8d8c5 100644 --- a/src/memoria/web/lifecycle.py +++ b/src/memoria/web/lifecycle.py @@ -8,5 +8,6 @@ @asynccontextmanager async def lifespan(_: 'FastAPI'): from .db_dependencies import es_lifecycle, sqlalchemy_lifecycle - async with es_lifecycle(), sqlalchemy_lifecycle(): + # from ..tasks import tasks_lifecycle + async with es_lifecycle(), sqlalchemy_lifecycle(): #, tasks_lifecycle(): yield diff --git a/src/memoria/web/routes/api/__init__.py b/src/memoria/web/routes/api/__init__.py index e36d0de..7d11760 100644 --- a/src/memoria/web/routes/api/__init__.py +++ b/src/memoria/web/routes/api/__init__.py @@ -23,6 +23,8 @@ class HtmxHeaderValue(str, Enum): from .allowlist import * from .allowlist_rules import * from .denylist import * +from .source import * +from .source_ux import * # Include router APP.include_router(API) diff --git a/src/memoria/web/routes/api/source.py b/src/memoria/web/routes/api/source.py new file mode 100644 index 0000000..66b46ec --- /dev/null +++ b/src/memoria/web/routes/api/source.py @@ -0,0 +1,25 @@ +from fasthx import JinjaContext + +from ....logic.source import Source, get_sources +from ...db_dependencies import SqlSession +from .. import HX +from . import API + +_RESPONSES = {200: {'content': {'text/html': {}}}} + + +@API.get("/sources", response_model=list[Source], responses=_RESPONSES) +@HX.hx('sources_items.html.j2', make_context=JinjaContext.unpack_result_with_route_context) +async def api_sources(session: SqlSession, skip: int = 0, limit: int | None = None): + return [Source.model_validate(x) async for x in await get_sources(session, skip=skip, limit=limit)] + + +@API.delete("/sources/{id}") +@HX.hx('blank.html.j2') +async def api_delete_source(session: SqlSession, id: int): + from ....model.orm.configured_source import ConfiguredSource + async with session.begin(): + await session.delete(await ConfiguredSource.find_one(session, ConfiguredSource.id == id)) + + +__all__ = tuple() diff --git a/src/memoria/web/routes/api/source_ux.py b/src/memoria/web/routes/api/source_ux.py new file mode 100644 index 0000000..177166d --- /dev/null +++ b/src/memoria/web/routes/api/source_ux.py @@ -0,0 +1,124 @@ +from typing import Annotated, AsyncGenerator, TYPE_CHECKING, cast +from urllib.parse import quote_plus +from logging import getLogger +import json + +from fastapi import Form, Response, WebSocket, HTTPException, WebSocketDisconnect +from fastapi.responses import HTMLResponse + +from ....plugins.source import Source, Html, UxHost +from ....plugins._plugin_suite import PluginSuite +from ....model.plugins import SourcePlugin +from ....model.orm.configured_source import ConfiguredSource +from ...db_dependencies import SqlSession +from ... import APP +from .. import HX +from . import API, HtmxHeader + +if TYPE_CHECKING: + from pydantic import JsonValue + +_LOG = getLogger(__spec__.name) + +@API.get("/plugins/sources") +@HX.hx('source_plugins.html.j2') +async def source_plugins() -> list[SourcePlugin]: + ret: list[SourcePlugin] = [] + for plugin in PluginSuite().get_plugins_of_type(Source): + if (display_name := plugin.type.UX_CONFIG.display_name) is None: + display_name = plugin.short_name + if (description := plugin.type.UX_CONFIG.description) is None: + description = plugin.type.__doc__ + + ret.append(SourcePlugin(id=plugin.id, display_name=display_name, description=description)) + + return ret + +@API.get("/plugins/sources/create") +@HX.hx('blank.html.j2', no_data=True) +async def source_create() -> None: + return None + +class Ux(UxHost): + _is_reset: bool + def __init__(self, ws: WebSocket): + self._is_reset = False + self._ws = ws + + async def error(self, message: Html) -> None: + if self._is_reset: + raise RuntimeError('Websocket has been closed.') + await self._ws.close(code=1011, reason=message) + self._is_reset = True + + async def done(self) -> None: + if self._is_reset: + raise RuntimeError('Websocket has been closed.') + await self._ws.close() + self._is_reset = True + + async def waiting(self, text: Html|None = None) -> None: + if self._is_reset: + raise RuntimeError('Websocket has been closed.') + await self._ws.send_text(f""" +
+
{text}
+ +
+""") + + async def update_dialog(self, form: Html) -> 'JsonValue': + if self._is_reset: + raise RuntimeError('Websocket has been closed.') + await self._ws.send_text(form) + ret = await self._ws.receive_json() + if 'HEADERS' in ret: + del ret['HEADERS'] + return cast('JsonValue', ret) + +@API.websocket("/plugins/sources/create") +async def source_create_websocket(ws: WebSocket, session: SqlSession): + try: + await ws.accept() + msg = await ws.receive_json() + if 'id' not in msg or not isinstance(msg['id'], str): + _LOG.error("id not provided") + await ws.close() + return + + plugin = PluginSuite().get_plugin_by_id(msg['id']) + if plugin is None or not issubclass(plugin.type, Source): + _LOG.error("Requested Source plugin `%s` does not exist.", msg['id']) + await ws.close() + return + + if plugin.type.UX_CONFIG.create is None: + # No workflow. + await ws.send_text(f"""
done
""") + return + + ux = Ux(ws) + + try: + display_name, config = await plugin.type.UX_CONFIG.create(ux) + except Exception as ex: + await ux.error(str(ex)) + return + + if not ux._is_reset: + await ux.done() + + source = ConfiguredSource(plugin_id=msg['id'], display_name=display_name, config=json.dumps(config)) + session.add(source) + except WebSocketDisconnect: + print('client disconnected') + +# @API.get("/plugins/sources") +# async def source_plugins(id: str = Form()) -> Plugin: +# plugin = PluginSuite().get_plugin_by_id(id) +# if not issubclass(plugin, Source): +# raise HTTPException(404) +# plugin.UX_CONFIG. + + +__all__ = tuple() diff --git a/src/memoria/web/routes/settings.py b/src/memoria/web/routes/settings.py index 0da843a..d781f79 100644 --- a/src/memoria/web/routes/settings.py +++ b/src/memoria/web/routes/settings.py @@ -9,8 +9,9 @@ async def settings(): suite = PluginSuite() plugins = {} docs = {} - for plugin in (x for x in suite._plugins.values() if issubclass(x, AllowlistRule)): - ident = plugin.identifier + for plugin in suite.get_plugins_of_type(AllowlistRule): + ident = plugin.short_name + plugin = plugin.type if plugin.DISPLAY_OPTIONS is not None: plugins[ident] = {'display_name': plugin.DISPLAY_OPTIONS.display_name or ident, 'color': plugin.DISPLAY_OPTIONS.color, 'prefix': plugin.DISPLAY_OPTIONS.prefix} else: diff --git a/src/memoria/web/www/templates/settings.html.j2 b/src/memoria/web/www/templates/settings.html.j2 index e979615..591ee73 100644 --- a/src/memoria/web/www/templates/settings.html.j2 +++ b/src/memoria/web/www/templates/settings.html.j2 @@ -49,7 +49,52 @@ newScrapingRuleFilterHostname.value = hostname; newScrapingRuleFilterDialog.showModal(); } + + function closeNewSourceDialog() { + newSourceDialog.close(); + newSourceDialog.innerHTML = ""; + newSourceDialog.removeAttribute("hx-ext"); + newSourceDialog.removeAttribute("ws-connect"); + htmx.process(newSourceDialog); + } + + function onNewSourceClose(event) { + const closeEvent = event.detail.event; + console.debug(closeEvent); + if (closeEvent.code === 1000) { + // Success! + closeNewSourceDialog(); + htmx.trigger('#sources-table-body', 'reload-sources'); + } else if (closeEvent.code === 1005) { + // Closed from the client-side (e.g., clicked cancel) + } else { + document.getElementById('new-source-submit').remove(); + document.getElementById('form-contents').outerHTML = + `
${closeEvent.reason}
`; + } + } + + function onNewSourceError(event) { + console.error(event.detail.error); + } + + function showNewSourceDialog() { + newSourceDialog.innerHTML = ""; + + /** @type {HTMLTemplateElement} */ + const template = document.getElementById('new-source-template'); + newSourceDialog.appendChild(document.adoptNode(template.content.cloneNode(true))); + + newSourceDialog.setAttribute("hx-ext","ws"); + newSourceDialog.setAttribute("ws-connect","{{url_for('source_create')}}"); + {# newSourceDialog.setAttribute("hx-on:htmx:ws-error","onError(event);"); + newSourceDialog.setAttribute("hx-on:htmx:ws-close","onClose(event);"); #} + + htmx.process(newSourceDialog); + newSourceDialog.showModal(); + } + {%- endblock%} {% block content %} @@ -59,11 +104,11 @@

History Sources

- {# -
#} +
- {# #} + {#
    +
#} +
+
Type
Schedule
Enabled
+
+
+

@@ -128,13 +177,28 @@
+ + + + +

New Scraping Rule

-
- - - - + + + + +
@@ -259,9 +323,12 @@
Placeholder...
-