Skip to content

Commit

Permalink
0.2.3
Browse files Browse the repository at this point in the history
  • Loading branch information
loRes228 committed Mar 18, 2024
1 parent 4aed182 commit c7b3339
Show file tree
Hide file tree
Showing 28 changed files with 596 additions and 159 deletions.
2 changes: 1 addition & 1 deletion aiogram_broadcaster/__about__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.2.2"
__version__ = "0.2.3"
229 changes: 151 additions & 78 deletions aiogram_broadcaster/broadcaster.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,47 @@
from datetime import timedelta
from typing import Any, Dict, Iterable, Iterator, List, Optional, Tuple, Union
from typing import (
Any,
Dict,
Iterable,
Iterator,
List,
Literal,
Mapping,
Optional,
Set,
Tuple,
Union,
)
from uuid import uuid4

from aiogram import Bot, Dispatcher
from pydantic_core import PydanticSerializationError

from .contents import BaseContent
from .event import EventRouter
from .event import EventRouter, RootEventRouter
from .l10n import BaseLanguageGetter, L10nContentAdapter
from .logger import logger
from .mailer import Mailer, MailerStatus
from .mailer.chat_manager import ChatManager, ChatState
from .mailer.multiple import MultipleMailers
from .mailer.settings import MailerSettings
from .placeholder import PlaceholderWizard
from .storage.base import BaseBCRStorage, DataComposer
from .utils.id import generate_id


class Broadcaster:
_bots: Dict[int, Bot]
storage: Optional[BaseBCRStorage]
language_getter: Optional[BaseLanguageGetter]
context_key: str
interval: float
dynamic_interval: bool
run_on_startup: bool
disable_events: bool
handle_retry_after: bool
destroy_on_complete: bool
preserve_mailers: bool
kwargs: Dict[str, Any]
placeholder_wizard: PlaceholderWizard
event: EventRouter
_mailers: Dict[int, Mailer]

Expand All @@ -29,36 +50,45 @@ def __init__(
*bots: Bot,
storage: Optional[BaseBCRStorage] = None,
language_getter: Optional[BaseLanguageGetter] = None,
placeholders: Optional[Mapping[str, Any]] = None,
context_key: str = "broadcaster",
interval: float = 1,
dynamic_interval: bool = False,
run_on_startup: bool = False,
delete_on_complete: bool = False,
disable_events: bool = False,
handle_retry_after: bool = False,
destroy_on_complete: bool = False,
preserve_mailers: bool = True,
**kwargs: Any,
) -> None:
self._bots = {bot.id: bot for bot in bots}
self.storage = storage
self.language_getter = language_getter
self.context_key = context_key
self.interval = interval
self.dynamic_interval = dynamic_interval
self.run_on_startup = run_on_startup
self.delete_on_complete = delete_on_complete
self.disable_events = disable_events
self.handle_retry_after = handle_retry_after
self.destroy_on_complete = destroy_on_complete
self.preserve_mailers = preserve_mailers
self.context_key = context_key
self.kwargs = kwargs
self.kwargs["broadcaster"] = self

self.event = EventRouter()
self.placeholder_wizard = PlaceholderWizard(
placeholders={} if placeholders is None else placeholders,
)
self.event = RootEventRouter(name="root")
self._mailers = {}

if self.delete_on_complete:
self._apply_delete_on_complete()

def __repr__(self) -> str:
return f"{type(self).__name__}(total_mailers={len(self)})"
return f"Broadcaster(total_mailers={len(self)})"

def __str__(self) -> str:
return f"{type(self).__name__}[{', '.join(map(repr, self))}]"
return f"Broadcaster[{', '.join(map(repr, self))}]"

def __contains__(self, item: Mailer) -> bool:
return item in self._mailers.values()
def __contains__(self, item: int) -> bool:
return item in self._mailers

def __getitem__(self, item: int) -> Mailer:
if mailer := self._mailers.get(item):
Expand All @@ -76,62 +106,120 @@ def bots(self) -> Tuple[Bot, ...]:
return tuple(self._bots.values())

@property
def mailers(self) -> List[Mailer]:
def mailers(self) -> Dict[int, Mailer]:
return self._mailers

def get_mailers(self) -> List[Mailer]:
return list(self._mailers.values())

def get_mailer(self, mailer_id: int) -> Optional[Mailer]:
return self._mailers.get(mailer_id)

def include_event(self, event: EventRouter) -> None:
self.event.include_event(event=event)
def include_event(self, event: EventRouter) -> EventRouter:
return self.event.include_event(event=event)

def include_events(self, *events: EventRouter) -> None:
if not events:
raise ValueError("At least one event must be provided.")
for event in events:
self.include_event(event=event)

async def delete_mailer(self, mailer_id: int) -> None:
mailer = self[mailer_id]
if mailer.status is MailerStatus.STARTED:
await mailer.stop()
if mailer.settings.preserved and self.storage:
await self.storage.delete(mailer_id=mailer_id)
del self._mailers[mailer_id]
mailer._deleted = True # noqa: SLF001

async def create_mailer(
self.event.include_events(*events)

def as_multiple(self) -> MultipleMailers:
return MultipleMailers(mailers=self._mailers.values())

async def create_mailers(
self,
*bots: Bot,
content: BaseContent,
chats: Iterable[int],
interval: Optional[float] = None,
dynamic_interval: Optional[bool] = None,
run_on_startup: Optional[bool] = None,
disable_events: Optional[bool] = None,
handle_retry_after: Optional[bool] = None,
destroy_on_complete: Optional[bool] = None,
exclude_placeholders: Optional[Union[Literal[True], Set[str]]] = None,
preserve: Optional[bool] = None,
**kwargs: Any,
) -> MultipleMailers:
if not bots:
raise ValueError("At least one bot must be provided.")
mailers = [
await self.create_mailer(
content=content,
chats=chats,
bot=bot,
interval=interval,
dynamic_interval=dynamic_interval,
run_on_startup=run_on_startup,
disable_events=disable_events,
handle_retry_after=handle_retry_after,
destroy_on_complete=destroy_on_complete,
exclude_placeholders=exclude_placeholders,
preserve=preserve,
**kwargs,
)
for bot in bots
]
return MultipleMailers(mailers=mailers)

async def create_mailer( # noqa: C901, C901, PLR0912
self,
content: BaseContent,
chats: Iterable[int],
*,
bot: Optional[Bot] = None,
interval: Union[float, timedelta] = 1,
dynamic_interval: bool = False,
disable_events: bool = False,
preserve: bool = True,
interval: Optional[float] = None,
dynamic_interval: Optional[bool] = None,
run_on_startup: Optional[bool] = None,
disable_events: Optional[bool] = None,
handle_retry_after: Optional[bool] = None,
destroy_on_complete: Optional[bool] = None,
exclude_placeholders: Optional[Union[Literal[True], Set[str]]] = None,
preserve: Optional[bool] = None,
**kwargs: Any,
) -> Mailer:
if not chats:
raise ValueError("At least one chat id must be provided.")

if not bot and not self._bots:
raise ValueError("At least one bot must be provided.")
raise ValueError("At least one bot must be specified.")

if isinstance(content, L10nContentAdapter) and not self.language_getter:
raise RuntimeError("To use 'L10nContentAdapter' language_getter must be specified.")
raise RuntimeError("To use 'L10nContentAdapter' language_getter must be provided.")

if not bot:
bot = self.bots[-1]
else:
self._bots[bot.id] = bot

if isinstance(interval, timedelta):
interval = interval.total_seconds()
if interval is None:
interval = self.interval
if dynamic_interval is None:
dynamic_interval = self.dynamic_interval
if run_on_startup is None:
run_on_startup = self.run_on_startup
if disable_events is None:
disable_events = self.disable_events
if handle_retry_after is None:
handle_retry_after = self.handle_retry_after
if destroy_on_complete is None:
destroy_on_complete = self.destroy_on_complete
if preserve is None:
preserve = self.preserve_mailers

if preserve and self.storage:
try:
content.model_dump_json(exclude_defaults=True)
except PydanticSerializationError as error:
raise ValueError("Content cant be serialized to preserving.") from error

if dynamic_interval:
interval = interval / len(set(chats))

mailer_id = generate_id(container=self._mailers)
mailer_id = hash(uuid4())
settings = MailerSettings(
interval=interval,
run_on_startup=run_on_startup,
disable_events=disable_events,
preserved=preserve,
handle_retry_after=handle_retry_after,
destroy_on_complete=destroy_on_complete,
exclude_placeholders=exclude_placeholders,
)
chat_manager = ChatManager.from_iterable(
iterable=chats,
Expand All @@ -146,10 +234,13 @@ async def create_mailer(
content=content,
event=self.event,
language_getter=self.language_getter,
placeholder_wizard=self.placeholder_wizard,
storage=self.storage if preserve else None,
mailer_container=self._mailers,
bot=bot,
handle_retry_after=self.handle_retry_after,
kwargs={**self.kwargs, **kwargs},
)
logger.info("Mailer id=%d was created.", mailer_id)
if not preserve:
return mailer
self._mailers[mailer_id] = mailer
Expand All @@ -170,57 +261,39 @@ async def restore_mailers(self) -> None:
data = await self.storage.get_data(mailer_id=mailer_id)
if data.bot_id not in self._bots:
logger.error(
"Failed to restore mailer for bot id=%d, mailer id=%d.",
data.bot_id,
"Failed to restore mailer id=%d, bot with id=%d not specified.",
mailer_id,
data.bot_id,
)
continue
unique_mailer_id = await self._ensure_unique_mailer_id(mailer_id=mailer_id)
mailer = Mailer(
id=unique_mailer_id,
id=mailer_id,
settings=data.settings,
chat_manager=data.chat_manager,
content=data.content,
event=self.event,
language_getter=self.language_getter,
placeholder_wizard=self.placeholder_wizard,
storage=self.storage,
mailer_container=self._mailers,
bot=self._bots[data.bot_id],
handle_retry_after=self.handle_retry_after,
kwargs=self.kwargs.copy(),
)
self._mailers[unique_mailer_id] = mailer
self._mailers[mailer_id] = mailer
logger.info("Mailer id=%d restored from storage.", mailer_id)

async def run_mailers(self) -> None:
for mailer in self._mailers.values():
if mailer.status is not MailerStatus.COMPLETED:
mailer.start()
if not mailer.settings.run_on_startup:
continue
if mailer.status is not MailerStatus.STOPPED:
continue
mailer.start()

def setup(self, dispatcher: Dispatcher, *, include_data: bool = True) -> None:
dispatcher[self.context_key] = self
if include_data:
self.kwargs.update(dispatcher.workflow_data)
if self.storage:
dispatcher.startup.register(self.restore_mailers)
if self.run_on_startup:
dispatcher.startup.register(self.run_mailers)

async def _ensure_unique_mailer_id(self, mailer_id: int) -> int:
if mailer_id not in self._mailers:
return mailer_id
new_mailer_id = generate_id(container=self._mailers)
logger.warning(
"Duplicate mailer id=%d detected. Generating a new id=%d",
mailer_id,
new_mailer_id,
)
if self.storage:
await self.storage.migrate_keys(
old_mailer_id=mailer_id,
new_mailer_id=new_mailer_id,
)
return new_mailer_id

def _apply_delete_on_complete(self) -> None:
async def delete_on_complete(mailer: Mailer, broadcaster: "Broadcaster") -> None:
await broadcaster.delete_mailer(mailer_id=mailer.id)

self.event.completed.register(callback=delete_on_complete)
dispatcher.startup.register(self.run_mailers)
2 changes: 2 additions & 0 deletions aiogram_broadcaster/contents/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from .from_chat_forward import FromChatForwardContent
from .game import GameContent
from .location import LocationContent
from .media_group import MediaGroupContent
from .message_copy import MessageCopyContent
from .message_forward import MessageForwardContent
from .message_send import MessageSendContent
Expand All @@ -34,6 +35,7 @@
"FromChatForwardContent",
"GameContent",
"LocationContent",
"MediaGroupContent",
"MessageCopyContent",
"MessageForwardContent",
"MessageSendContent",
Expand Down
Loading

0 comments on commit c7b3339

Please sign in to comment.