-
Notifications
You must be signed in to change notification settings - Fork 281
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
有没兴趣开发个async版本的么 #39
Comments
You can do this: import asyncio
from asyncio import AbstractEventLoop
from typing import Optional
from dingtalkchatbot.chatbot import DingtalkChatbot
from crawlers_customsdata.config import settings
class DingBot:
"""Ding bot"""
def __init__(
self,
token: str,
loop: Optional[AbstractEventLoop] = None
):
"""
:param token:
:param loop:
"""
self._loop = loop or asyncio.get_event_loop()
self.bot = DingtalkChatbot(f'https://oapi.dingtalk.com/robot/send?access_token={token}')
@property
def loop(self) -> AbstractEventLoop:
return self._loop
async def send(self, title: str, text: str) -> None:
"""
Send something to bot
:param title: str
:param text: str
:return: None
"""
await self.loop.run_in_executor(None, self.bot.send_markdown, title, text)
async def main() -> None:
"""unit test"""
bot = DingBot(settings.DINGTALK_TOKEN)
await bot.send('Hello world', '### Hello world\n'
'> I sand massage to ding talk. Wonderful!!!'
)
if __name__ == '__main__':
asyncio.run(main()) You can wrap the sync method around |
非常感谢,我去学习学习 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
非常感谢分享
但在使用过程当中发现每次发完消息之后代码都卡住好久了,然后才发现原来是代码没用异步,
因此建议下大佬可以试试async
The text was updated successfully, but these errors were encountered: