-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathrun_worker.py
122 lines (93 loc) · 3.51 KB
/
run_worker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import json
import logging
import time
from json import JSONDecodeError
import boto3
from bottle import default_app
from app.app import init_app
from app.worker import actions
app = default_app()
def parse_message(raw_message):
receipt_handle = raw_message['ReceiptHandle']
body_json = json.loads(raw_message['Body'])
if body_json.get('Type') == 'Notification':
msg_id = body_json['MessageId']
message = body_json['Message']
else:
msg_id = raw_message['MessageId']
message = body_json
return receipt_handle, msg_id, message
def loop():
client = boto3.client('sqs')
while True:
query_url = app.config['aws.sqs.queue_url']
wait_time_seconds = int(app.config['aws.sqs.wait_time_seconds'])
resp = client.receive_message(
QueueUrl=query_url,
WaitTimeSeconds=wait_time_seconds)
for raw_message in resp.get('Messages', []):
receipt_handle, msg_id, message = parse_message(raw_message)
if isinstance(message, str):
try:
message = json.loads(message)
except JSONDecodeError:
logging.error('invalid json: %s', message)
delete_message(client, query_url, receipt_handle)
continue
message_action = message.get('action', '').lower()
message_payload = message.get('payload', {})
action_funcs = actions.get(message_action, [])
for action_func in action_funcs:
logging.info('aws sqs message receive: %s %s', msg_id, message)
wk = None
start = time.time()
if message_action:
wk = worker_create(msg_id, message_action, message_payload)
try:
action_func(message_payload, msg_id)
logging.info('message process done: %s func: %s',
msg_id, action_func.__name__)
if wk:
worker_done(wk, start)
except: # noqa
logging.exception('message process failed: %s func: %s',
msg_id, action_func.__name__)
if wk:
worker_failed(wk, start)
break
finally:
# if not db.is_closed():
# db.close()
pass
else:
delete_message(client, query_url, receipt_handle)
def worker_create(msg_id, message_action, message_payload):
if not (msg_id and message_action):
return
# WorkerLog.create(message_id=msg_id, action=message_action,
# payload=json.dumps(message_payload))
# wk = (WorkerLog
# .filter(WorkerLog.message_id == msg_id)
# .order_by(WorkerLog.id.desc())
# .first())
# return wk
pass
def worker_failed(wk, start):
# wk.time_spent = diff_end_time(start)
# wk.result = WorkerResult.FAILED.value
# wk.save()
pass
def worker_done(wk, start):
# wk.time_spent = diff_end_time(start)
# wk.result = WorkerResult.DONE.value
# wk.save()
pass
def delete_message(client, query_url, receipt_handle):
delete_resp = client.delete_message(
QueueUrl=query_url,
ReceiptHandle=receipt_handle,
)
logging.info('delete sqs msg %s resp: %s', receipt_handle, delete_resp)
if __name__ == '__main__':
init_app()
loop()