-
Notifications
You must be signed in to change notification settings - Fork 6
/
worker.py
86 lines (74 loc) · 1.83 KB
/
worker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
"""
Thread pool system
"""
from queue import Queue
from threading import Thread
import traceback
class Task(object):
    """
    A queuable function/method call.

    Bundles a callable with the positional and keyword arguments it should
    be invoked with, so that the call can be stored and executed later.

    :param target: the callable to invoke; ``None`` until one is assigned
    :param args: positional arguments passed to ``target``
    :param kwargs: keyword arguments passed to ``target``; when omitted a
        fresh, empty dict is created for this instance
    """

    def __init__(self, target=None, args=(), kwargs=None):
        self.target = target
        self.args = args
        # Give every task its own dict: a class-level ``{}`` would be one
        # object shared (and mutated) by all Task instances.
        self.kwargs = {} if kwargs is None else kwargs
class Pool(object):
    """
    A fixed-size pool of worker threads fed from one shared queue.
    """

    def __init__(self, count, logger):
        """
        Create the shared queue and start ``count`` workers.

        :param count: number of worker threads to start; zero or a negative
            value starts no workers
        :param logger: object stored on the pool as ``self.logger``
        """
        self.queue = Queue()
        self.logger = logger
        self.workers = []
        # Iterate a bounded range instead of looping until the list length
        # equals ``count``: that comparison is never true for a negative
        # count, which would spawn threads forever.
        for _ in range(count):
            worker = Worker()
            self.workers.append(worker)
            worker.setQueue(self.queue)
            worker.setPool(self)
            worker.start()

    def begin(self, target, *args, **kwargs):
        """
        Start an operation on the background threads.

        Wraps the call in a Task and places it on the shared queue.

        :param target: callable to run
        :param args: positional arguments for ``target``
        :param kwargs: keyword arguments for ``target``
        :return: None
        """
        task = Task()
        task.target = target
        task.args = args
        task.kwargs = kwargs
        self.queue.put_nowait(task)
class Worker(Thread):
    """
    A worker thread.

    Repeatedly takes tasks from its queue and calls them. Exceptions raised
    by a task are reported and do not stop the loop.
    """
    # Object whose ``logger.write`` receives tracebacks of failed tasks.
    pool = None
    # When true, the processing loop exits before fetching another task.
    finish = False
    # Queue the worker pulls tasks from.
    queue = None

    def setPool(self, pool):
        """
        Sets the pool for this worker
        :param pool: object exposing ``logger.write``, used to report errors
        :return:
        """
        self.pool = pool

    def setQueue(self, queue):
        """
        Set the queue for this worker
        :param Queue queue: queue that tasks are fetched from
        :return:
        """
        self.queue = queue

    def start(self):
        """
        Mark the thread as a daemon and start it
        :return:
        """
        self.daemon = True
        super(Worker, self).start()

    def run(self):
        """
        Start processing tasks

        Loops until ``finish`` is set. The flag is only examined between
        items, so it takes effect after the current ``get`` returns.
        :return:
        """
        while not self.finish:
            task = self.queue.get()
            try:
                # Falsy items (e.g. None) are consumed without being called.
                if task:
                    task.target(*task.args, **task.kwargs)
            except Exception as err:
                print("error: logging exception " + str(err))
                self.pool.logger.write(traceback.format_exc())
            finally:
                # Acknowledge every dequeued item so that Queue.join() on
                # the shared queue can complete.
                self.queue.task_done()