-
Notifications
You must be signed in to change notification settings - Fork 106
/
__init__.py
executable file
·120 lines (104 loc) · 4.52 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import celery
from celery import signals
from celery.schedules import crontab
from webservices.env import env
from webservices.tasks import utils
import ssl
# Feature and dev share the same RDS box, so only dev should run the
# scheduled refresh work; the "feature" space gets an empty beat schedule.
schedule = {}
if env.app.get("space_name", "unknown-space").lower() != "feature":
    # Task 1: every 5 minutes during 6am-7pm EST (13 hours).
    # 1A refresh_most_recent_aos(conn):
    #   1) Identify the most recently modified AO(s) within 8 hours
    #   2) For each modified AO, find the earliest AO it references
    #   3) Reload all AO(s) from the referenced AO to the latest AO.
    # 1B refresh_most_recent_cases(conn):
    #   For case(s) (MUR/AF/ADR) modified within 8 hours:
    #   published_flg = true  -> reload the case(s) on the elasticsearch service.
    #   published_flg = false -> delete the case(s) on the elasticsearch service.
    schedule["refresh_legal_docs"] = {
        "task": "webservices.tasks.legal_docs.refresh_most_recent_legal_doc",
        "schedule": crontab(minute="*/5", hour="10-23"),
    }
    # Task 2: 9pm EST every day except Sunday.
    #   1) Identify AO(s) modified in the past 24 hours (9pm-9pm EST)
    #   2) For each modified AO, find the earliest AO it references
    #   3) Reload all AO(s) from the referenced AO to the latest AO
    #   4) Send AO detail information to Slack.
    schedule["reload_all_aos_daily_except_sunday"] = {
        "task": "webservices.tasks.legal_docs.daily_reload_all_aos_when_change",
        "schedule": crontab(minute=0, hour=1, day_of_week="mon,tue,wed,thu,fri,sat"),
    }
    # Task 3: 9pm EST weekly, Sunday only. Reload all AOs.
    schedule["reload_all_aos_every_sunday"] = {
        "task": "webservices.tasks.legal_docs.weekly_reload_all_aos",
        "schedule": crontab(minute=0, hour=1, day_of_week="sun"),
    }
    # Task 4: 6pm EST every day. When modified case(s) (MUR/AF/ADR) are found
    # in the past 13 hours (6am-7pm EST), send case details to Slack.
    schedule["send_alert_legal_case"] = {
        "task": "webservices.tasks.legal_docs.send_alert_daily_modified_legal_case",
        "schedule": crontab(minute=0, hour=23),
    }
    # Task 5: 12am EST, Sunday only. Take an elasticsearch 'docs' index snapshot.
    schedule["backup_elasticsearch_every_sunday"] = {
        "task": "webservices.tasks.legal_docs.create_es_backup",
        "schedule": crontab(minute=0, hour=4, day_of_week="sun"),
    }
    # Task 6: 5am EST every day. Refresh public materialized views.
    schedule["refresh_materialized_views"] = {
        "task": "webservices.tasks.refresh_db.refresh_materialized_views",
        "schedule": crontab(minute=0, hour=9),
    }
def redis_url():
    """
    Retrieve the URL needed to connect to a Redis instance, depending on environment.

    When running in a cloud.gov environment (``env.space`` is set), read the
    ``uri`` credential from the bound ``aws-elasticache-redis`` service.
    Otherwise fall back to the ``FEC_REDIS_URL`` credential, defaulting to a
    local Redis instance.
    """
    if env.space is None:
        # Not on cloud.gov: use the configured URL or a local default.
        return env.get_credential("FEC_REDIS_URL", "redis://localhost:6379/0")
    redis_service = env.get_service(label="aws-elasticache-redis")
    return redis_service.credentials.get("uri")
# The Celery application: Redis serves as both the message broker and the
# result backend (see redis_url() above).
app = celery.Celery("openfec")
app.conf.update(
    broker_url=redis_url(),
    # NOTE(review): certificate verification is disabled for both the broker
    # and the Redis result backend — presumably the managed Redis presents a
    # certificate that cannot be verified in this environment; confirm before
    # tightening.
    broker_use_ssl={
        "ssl_cert_reqs": ssl.CERT_NONE,
    },
    redis_backend_use_ssl={
        "ssl_cert_reqs": ssl.CERT_NONE,
    },
    # Task modules Celery imports at startup so their tasks register.
    imports=(
        "webservices.tasks.refresh_db",
        "webservices.tasks.download",
        "webservices.tasks.legal_docs",
    ),
    # Beat schedule built above; empty in the "feature" space.
    beat_schedule=schedule,
    broker_connection_timeout=30,  # in seconds
    broker_connection_max_retries=0,  # for unlimited retries
    # Acknowledge messages before execution (Celery default), so a task that
    # dies mid-run is not redelivered.
    task_acks_late=False
)
# celery_once configuration: distributed locks are kept in Redis so the same
# task is not queued twice concurrently; a stale lock expires after one hour.
app.conf.ONCE = {
    "backend": "celery_once.backends.Redis",
    "settings": {
        # Force TLS on the lock connection via the query string.
        "url": "{}?ssl=true".format(redis_url()),
        "default_timeout": 3600,  # seconds (one hour)
    },
}
# Maps task_id -> the Flask app context pushed for that task, so the postrun
# handler can tear down exactly what the prerun handler set up.
context = {}


@signals.task_prerun.connect
def push_context(task_id, task, *args, **kwargs):
    """Push a Flask application context before each task runs."""
    app_context = utils.get_app().app_context()
    context[task_id] = app_context
    app_context.push()
@signals.task_postrun.connect
def pop_context(task_id, task, *args, **kwargs):
    """Pop the Flask application context pushed for this task, if any.

    Uses a single ``dict.pop`` so that lookup and removal cannot be
    separated: the original membership check plus two lookups left the
    entry behind (a leak in ``context``) whenever the app-context pop
    raised, and performed three dict operations where one suffices.
    """
    app_context = context.pop(task_id, None)
    if app_context is not None:
        app_context.pop()