-
Notifications
You must be signed in to change notification settings - Fork 18
/
async.py
56 lines (41 loc) · 1.39 KB
/
async.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
from __future__ import annotations
import asyncio
from typing import Any
from deepl import DeepLCLI
def print_translated_text(future: asyncio.Future[str]) -> None:
"""receive the result of async func and print."""
try:
translated_text = future.result()
print(translated_text)
except asyncio.exceptions.CancelledError:
# catch if task has been canceled via task.cancel()
pass
except Exception as e: # noqa: BLE001
print(f"Failed to translate: {e}")
async def translate(text: str) -> str:
"""translate asynchronously."""
t = DeepLCLI("en", "ja")
return await t.translate_async(text)
async def do_something() -> None:
"""do something asynchronously."""
await asyncio.sleep(10)
print("Another task done!")
if __name__ == "__main__":
# create event loop
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# create task to translate text and set callback
text = "hello"
translation_task = loop.create_task(translate(text))
translation_task.add_done_callback(print_translated_text)
# create another task
another_task = loop.create_task(do_something())
# assign tasks to event loop
tasks: list[asyncio.Task[Any]] = [
translation_task,
another_task,
]
wait_tasks = asyncio.wait(tasks)
# run event loop
loop.run_until_complete(wait_tasks)
loop.close()