Skip to content

Commit

Permalink
Merge pull request #333 from sysblok/feat/asyncio-migration-2
Browse files Browse the repository at this point in the history
feat: cheat asyncio migration
  • Loading branch information
alexeyqu authored Oct 20, 2024
2 parents 157c3c7 + c1dfb40 commit 4b18917
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 48 deletions.
4 changes: 4 additions & 0 deletions app.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
from src.utils.log_handler import ErrorBroadcastHandler
from src.utils.uptrace_logger import add_uptrace_logging

import nest_asyncio
nest_asyncio.apply()

locale.setlocale(locale.LC_TIME, "ru_RU.UTF-8")
logging.basicConfig(format=consts.LOG_FORMAT, level=logging.INFO)

Expand Down Expand Up @@ -94,4 +97,5 @@ def report_critical_error(e: BaseException):
try:
get_bot().run()
except BaseException as e:
print(e.with_traceback())
report_critical_error(e)
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ pyparsing==2.4.7
pytest==5.4.1
pytest-asyncio==0.12.0
python-dateutil==2.8.1
python-telegram-bot==12.6.1
python-telegram-bot==21.4.0
pytoml==0.1.21
pytz==2020.1
PyYAML==6.0.1
Expand Down
55 changes: 32 additions & 23 deletions src/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@
from typing import Callable

from telegram.ext import (
ApplicationBuilder,
CallbackQueryHandler,
CommandHandler,
Filters,
filters,
MessageHandler,
PicklePersistence,
Updater,
)
from telegram.ext.dispatcher import run_async

from .app_context import AppContext
from .config_manager import ConfigManager
Expand All @@ -37,6 +37,10 @@ def usage(self, message, *args, **kws):
logger = logging.getLogger(__name__)


async def async_wrap(f: Callable, *args, **kwargs):
return f(*args, **kwargs)


class SysBlokBot:
def __init__(
self,
Expand All @@ -46,15 +50,16 @@ def __init__(
):
self.config_manager = config_manager
tg_config = config_manager.get_telegram_config()
self.updater = Updater(
tg_config["token"],
use_context=True,
user_sig_handler=signal_handler,
persistence=PicklePersistence(filename="persistent_storage.pickle"),
self.application = (
ApplicationBuilder()
.persistence(PicklePersistence(filepath="persistent_storage.pickle"))
.token(tg_config["token"])
# .post_shutdown(signal_handler)
.concurrent_updates(True)
.build()
)
self.dp = self.updater.dispatcher
self.telegram_sender = sender.TelegramSender(
bot=self.dp.bot, tg_config=tg_config
bot=self.application.bot, tg_config=tg_config
)
try:
self.app_context = AppContext(config_manager, skip_db_update)
Expand Down Expand Up @@ -415,42 +420,46 @@ def init_handlers(self):
)

# on non-command user message
self.dp.add_handler(MessageHandler(Filters.text, handlers.handle_user_message))
self.dp.add_handler(CallbackQueryHandler(handlers.handle_callback_query))
self.dp.add_handler(

def asyncify(func):
async def wrapper(*args, **kwargs):
results = func(*args, **kwargs)
return results

return wrapper

self.application.add_handler(MessageHandler(filters.TEXT, asyncify(handlers.handle_user_message)))
self.application.add_handler(CallbackQueryHandler(asyncify(handlers.handle_callback_query)))
self.application.add_handler(
MessageHandler(
Filters.status_update.new_chat_members, handlers.handle_new_members
filters.StatusUpdate.NEW_CHAT_MEMBERS, asyncify(handlers.handle_new_members)
)
)

# log all errors
self.dp.add_error_handler(handlers.error)
self.application.add_error_handler(async_wrap(handlers.error))

def run(self):
# Start the Bot
logger.info("Starting polling...")
self.updater.start_polling()

# Run the bot until you press Ctrl-C or the process receives SIGINT,
# SIGTERM or SIGABRT. start_polling() is non-blocking and will
# stop the bot gracefully.
self.updater.idle()
# TODO add non-blocking runtime (graceful termination for SIGTERM etc)
self.application.run_polling()

# Methods, adding command handlers and setting them to /help cmd for proper audience
def add_handler(self, handler_cmd: str, handler_func: Callable):
"""Adds handler silently. Noone will see it in /help output"""

def add_usage_logging(func):
def wrapper(*args, **kwargs):
async def wrapper(*args, **kwargs):
logger.usage(f"Handler {handler_cmd} was called...")
results = func(*args, **kwargs)
logger.usage(f"Handler {handler_cmd} finished")
return results

return wrapper

self.dp.add_handler(
CommandHandler(handler_cmd, run_async(add_usage_logging(handler_func)))
self.application.add_handler(
CommandHandler(handler_cmd, add_usage_logging(handler_func))
)

def add_admin_handler(
Expand Down
21 changes: 12 additions & 9 deletions src/jobs/tg_analytics_report_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,20 @@ class TgAnalyticsReportJob(BaseJob):
def _execute(
app_context: AppContext, send: Callable[[str], None], called_from_handler=False
):
with app_context.tg_client.api_client:
stats = app_context.tg_client.api_client.loop.run_until_complete(
app_context.tg_client.api_client.get_stats(
app_context.tg_client.channel
)
app_context.tg_client.api_client.loop.run_until_complete(
app_context.tg_client.api_client.connect()
)
stats = app_context.tg_client.api_client.loop.run_until_complete(
app_context.tg_client.api_client.get_stats(
app_context.tg_client.channel
)
entity = app_context.tg_client.api_client.loop.run_until_complete(
app_context.tg_client.api_client.get_entity(
app_context.tg_client.channel
)
)
entity = app_context.tg_client.api_client.loop.run_until_complete(
app_context.tg_client.api_client.get_entity(
app_context.tg_client.channel
)
)
app_context.tg_client.api_client.disconnect()
new_posts_count = len(stats.recent_message_interactions)
followers_stats = TgAnalyticsReportJob._get_followers_stats(stats)
message = load(
Expand Down
2 changes: 1 addition & 1 deletion src/tg/handlers/user_message_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def handle_user_message(
trello_lists = trello_client.get_lists(board_id)
trello_lists = trello_lists[::-1]
except Exception as e:
logger.warning(e)
logger.warning(e, exc_info=e)
reply(
load(
"get_tasks_report_handler__enter_the_number",
Expand Down
33 changes: 21 additions & 12 deletions src/tg/sender.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Sends messages"""

import asyncio
import logging
import re
import time
Expand Down Expand Up @@ -69,17 +70,25 @@ def send_to_chat_id(self, message_text: str, chat_id: int, **kwargs) -> bool:
message_text = re.sub(r"\S*\.png", "", message_text)
if message_text != "":
try:
pretty_send(
[message_text.strip()],
lambda msg: self.bot.send_message(
text=msg,
chat_id=chat_id,
disable_notification=self.is_silent,
disable_web_page_preview=self.disable_web_page_preview,
parse_mode=telegram.ParseMode.HTML,
**kwargs,
),
)
messages = paragraphs_to_messages([message_text.strip()])
for i, message in enumerate(messages):
if i > 0:
time.sleep(MESSAGE_DELAY_SEC)
if message.startswith("<code>") and "</code>" not in message:
message = message + "</code>"
elif message.endswith("</code>") and "<code>" not in message:
message = "<code>" + message
loop = asyncio.get_event_loop()
loop.run_until_complete(
self.bot.send_message(
text=message,
chat_id=chat_id,
disable_notification=self.is_silent,
disable_web_page_preview=self.disable_web_page_preview,
parse_mode=telegram.constants.ParseMode.HTML,
**kwargs,
)
)
return True
except telegram.TelegramError as e:
logger.error(f"Could not send a message to {chat_id}: {e}")
Expand Down Expand Up @@ -178,7 +187,7 @@ def pretty_send(paragraphs: List[str], send: Callable[[str], None]) -> str:

def paragraphs_to_messages(
paragraphs: List[str],
char_limit=telegram.constants.MAX_MESSAGE_LENGTH,
char_limit=telegram.constants.MessageLimit.MAX_TEXT_LENGTH,
delimiter="\n\n",
) -> List[str]:
"""
Expand Down
5 changes: 3 additions & 2 deletions src/tg/tg_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ def _update_from_config(self):
)
# we need this to properly reauth in case the tokens need to be updated
# we need "with" to open and close the event loop
with self.api_client as client:
client(functions.auth.ResetAuthorizationsRequest())
# removed as part of v20.0 migration
# with self.api_client as client:
# client(functions.auth.ResetAuthorizationsRequest())
self.sysblok_chats = self._tg_config["sysblok_chats"]
self.channel = self._tg_config["channel"]

Expand Down

0 comments on commit 4b18917

Please sign in to comment.