-
Notifications
You must be signed in to change notification settings - Fork 0
/
taskQueue.py
82 lines (72 loc) · 3.26 KB
/
taskQueue.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import asyncio
from asyncio import Queue, TimeoutError
from dataclasses import dataclass
from logging import Logger
from typing import Any, Callable, Dict, List, Optional
from uuid import uuid4
@dataclass
class Task:
    """A unit of work for ``TaskQueue`` together with its retry bookkeeping.

    ``TaskQueue.add_task`` builds instances positionally as
    ``Task(func, args, kwargs)``; the remaining fields take their defaults
    unless set explicitly by the caller.
    """

    # Callable run by a worker as ``await func(*args, **kwargs)``.
    func: Callable[..., Any]
    # Positional arguments for ``func``. Variable length, hence ``...``
    # (``tuple[Any]`` would describe a tuple of exactly one element).
    args: tuple[Any, ...]
    # Keyword arguments for ``func``.
    kwargs: dict[str, Any]
    # Number of retries already attempted; incremented by the worker on failure.
    retries: int = 0
    # Retry budget: the worker gives up once ``retries`` reaches this value.
    max_retries: int = 3
    # Back-off in seconds; the worker doubles it before each retry sleep.
    delay: float = 1.0
    # Optional follow-up task that the worker enqueues after this one succeeds.
    dependent_task: Optional['Task'] = None
class TaskQueue:
    """Bounded asyncio queue drained by a fixed pool of worker coroutines.

    Tasks are awaited by the workers. A failing task is retried with a
    doubling delay until its ``max_retries`` budget is spent, and a task's
    ``dependent_task`` (if any) is enqueued once the task succeeds.
    """

    def __init__(self, parent_logger: Logger, name: str = "", nworkers: int = 3, qsize: int = 20):
        """Create a stopped queue.

        Args:
            parent_logger: Logger from which a child logger named after this
                queue is derived.
            name: Queue name; when empty, ``TQ-`` plus the first 8 characters
                of a generated UUID is used.
            nworkers: Number of worker tasks created by ``start``.
            qsize: Maximum number of queued tasks (``asyncio.Queue`` maxsize).
        """
        self._nworkers = nworkers
        self._queue: Queue[Any] = Queue(qsize)
        self._workers: List[Any] = []
        self.is_running = False
        self._id = str(uuid4())
        self._name = name if name else f"TQ-{self._id[:8]}"
        self._logger = parent_logger.getChild(self._name)

    @property
    def workers(self) -> List[Any]:
        """Worker tasks created by ``start`` (read-only, kept for compatibility)."""
        return self._workers

    @staticmethod
    def _func_name(func: Callable[..., Any]) -> str:
        """Return a loggable name for *func*, even if it lacks ``__name__``."""
        # functools.partial objects and callable instances have no __name__.
        return getattr(func, "__name__", None) or repr(func)

    async def add_task(self, func: Callable[..., Any], *args: Any, **kwargs: Any):
        """Wrap *func* and its arguments in a ``Task`` and enqueue it.

        Waits while the queue is full.
        """
        task = Task(func, args, kwargs)
        await self._queue.put(task)
        self._logger.info("Added task to queue: %s", self._func_name(func))

    async def run_worker(self, worker_id: int):
        """Consume and execute tasks until ``is_running`` becomes false.

        Args:
            worker_id: Identifier used only in log messages.
        """
        while self.is_running:
            try:
                # Poll with a timeout so the loop re-checks ``is_running``.
                task = await asyncio.wait_for(self._queue.get(), timeout=1.0)
            except TimeoutError:
                continue
            task_name = self._func_name(task.func)
            self._logger.info("Worker %s processing task: %s", worker_id, task_name)
            try:
                # The result is discarded; returning it here would end the
                # worker after its first successful task.
                await task.func(*task.args, **task.kwargs)
                self._logger.info("Worker %s completed task: %s", worker_id, task_name)
                if task.dependent_task:
                    await self._queue.put(task.dependent_task)
                    self._logger.info(
                        "Queued dependent task: %s",
                        self._func_name(task.dependent_task.func),
                    )
            except Exception as e:
                self._logger.error("Error in task %s: %s", task_name, str(e), exc_info=True)
                if task.retries < task.max_retries:
                    task.retries += 1
                    task.delay *= 2
                    self._logger.info(
                        "Retrying task %s in %d seconds (attempt: %d/%d)",
                        task_name, task.delay, task.retries, task.max_retries,
                    )
                    await asyncio.sleep(task.delay)
                    # Re-enqueue before ``task_done`` below so ``join`` keeps
                    # counting the pending retry.
                    await self._queue.put(task)
                else:
                    self._logger.error("Task %s failed after %d retries", task_name, task.max_retries)
            finally:
                # Exactly one ``task_done`` per successful ``get``.
                self._queue.task_done()

    async def start(self):
        """Mark the queue as running and spawn ``nworkers`` worker tasks."""
        self.is_running = True
        self._workers = [asyncio.create_task(self.run_worker(i)) for i in range(self._nworkers)]
        self._logger.info("Started %d workers", self._nworkers)

    async def stop(self):
        """Drain the queue, then shut the workers down.

        The queue is joined while the workers are still running; clearing
        ``is_running`` first would let them exit and leave queued tasks
        unprocessed, so ``join`` would never return.
        """
        if self._workers:
            await self._queue.join()
        self.is_running = False
        for worker in self._workers:
            # Workers are idle in ``queue.get`` at this point; cancel them
            # instead of waiting for the poll timeout.
            worker.cancel()
        await asyncio.gather(*self._workers, return_exceptions=True)
        self._workers = []
        self._logger.info("All workers stopped")

    def get_info(self) -> Dict[str, Any]:
        """Return a snapshot of the queue's identity, sizing and logger."""
        return {
            'name': self._name,
            'id': self._id,
            'nworkers': self._nworkers,
            'max_qsize': self._queue.maxsize,
            'current_qsize': self._queue.qsize(),
            'logger': str(self._logger),
        }