forked from fecgov/openFEC
-
Notifications
You must be signed in to change notification settings - Fork 0
/
manage.py
executable file
·169 lines (144 loc) · 5.43 KB
/
manage.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
#!/usr/bin/env python
import glob
import logging
import subprocess
import multiprocessing
from flask.ext.script import Server
from flask.ext.script import Manager
from sqlalchemy import text as sqla_text
from webservices.env import env
from webservices.rest import app, db
from webservices.config import SQL_CONFIG
from webservices.common.util import get_full_path
# Flask-Script manager: registers and dispatches the CLI commands defined below.
manager = Manager(app)
# Module-level logger used by the SQL helpers and refresh commands.
logger = logging.getLogger('manager')
logging.basicConfig(level=logging.INFO)
# The Flask app server should only be used for local testing, so we default to
# using debug mode and auto-reload. To disable debug mode locally, pass the
# --no-debug flag to `runserver`.
manager.add_command('runserver', Server(use_debugger=True, use_reloader=True))
def execute_sql_file(path):
    """Execute the SQL script at `path` against the application database.

    Lines starting with `--` (SQL line comments) are stripped; the remaining
    text is executed as one batch, with the values in `SQL_CONFIG` bound as
    named parameters.

    :param path: Filesystem path of the `.sql` script to run.
    """
    # This helper is typically used within a multiprocessing pool; create a new
    # database engine for each job.
    db.engine.dispose()
    # Lazy %-style args: the message is only formatted if INFO is enabled.
    logger.info('Running %s', path)
    with open(path) as fp:
        # Iterate the file object directly instead of materializing readlines();
        # each line keeps its trailing newline, matching the original join output.
        cmd = '\n'.join(
            line for line in fp
            if not line.startswith('--')
        )
        db.engine.execute(sqla_text(cmd), **SQL_CONFIG)
def execute_sql_folder(path, processes):
    """Run every `*.sql` file in the folder at `path`, in sorted filename order.

    :param path: Folder of SQL scripts, resolved via `get_full_path`.
    :param processes: Worker count; values greater than 1 run files in parallel.
    """
    sql_dir = get_full_path(path)
    if not sql_dir.endswith('/'):
        sql_dir += '/'
    # Sort once up front; glob() does not guarantee ordering.
    paths = sorted(glob.glob(sql_dir + '*.sql'))
    if processes > 1:
        pool = multiprocessing.Pool(processes=processes)
        try:
            # `paths` is already sorted; the original re-sorted it redundantly.
            pool.map(execute_sql_file, paths)
        finally:
            # The original never closed the pool, leaking worker processes.
            pool.close()
            pool.join()
    else:
        for sql_path in paths:
            execute_sql_file(sql_path)
@manager.command
def load_pacronyms():
    """Load PAC acronyms from `data/pacronyms.xlsx` into `ofec_pacronyms`."""
    import pandas as pd
    import sqlalchemy as sa
    # Clear existing rows when the table is already present; a missing table
    # simply means this is the first load.
    try:
        pacronym_table = sa.Table('ofec_pacronyms', db.metadata, autoload_with=db.engine)
    except sa.exc.NoSuchTableError:
        pass
    else:
        db.engine.execute(pacronym_table.delete())
    load_table(pd.read_excel('data/pacronyms.xlsx'), 'ofec_pacronyms', indexes=('ID NUMBER', ))
def load_table(frame, tablename, if_exists='replace', indexes=()):
    """Write `frame` to the database as `tablename`, then create one index
    per column name in `indexes`.

    :param frame: pandas DataFrame to persist.
    :param tablename: Destination table name.
    :param if_exists: Passed through to `DataFrame.to_sql`.
    :param indexes: Column names to index after the load.
    """
    import sqlalchemy as sa
    frame.to_sql(tablename, db.engine, if_exists=if_exists)
    reflected = sa.Table(tablename, db.metadata, autoload_with=db.engine)
    for column_name in indexes:
        index_name = '{}_{}_idx'.format(tablename, column_name)
        sa.Index(index_name, reflected.c[column_name]).create(db.engine)
@manager.command
def build_districts():
    """Load FIPS state codes and ZIP-to-district mappings from bundled CSVs."""
    import pandas as pd
    load_table(pd.read_csv('data/fips_states.csv'), 'ofec_fips_states')
    zips_frame = pd.read_csv('data/natl_zccd_delim.csv')
    load_table(zips_frame, 'ofec_zips_districts', indexes=('ZCTA', ))
@manager.command
def load_election_dates():
    """Load `data/election_dates.xlsx` into the `ofec_election_dates` table."""
    import pandas as pd
    frame = pd.read_excel('data/election_dates.xlsx')
    # Lowercase the spreadsheet headers so the index column names below match.
    frame.columns = [name.lower() for name in frame.columns]
    load_table(
        frame,
        'ofec_election_dates',
        indexes=('office', 'state', 'district', 'election_yr', 'senate_class'),
    )
@manager.command
def dump_districts(dest=None):
    """Dump the district tables to a pg_dump custom-format archive.

    :param dest: Output archive path; defaults to `./data/districts.dump`.
    """
    source = db.engine.url
    dest = dest or './data/districts.dump'
    # Pass an argument list (shell=False) instead of interpolating `source` and
    # `dest` into a shell string via format(**locals()) — avoids quoting bugs
    # and shell-injection through unusual paths or connection URLs.
    cmd = [
        'pg_dump', str(source),
        '--format', 'c',
        '--no-acl',
        '--no-owner',
        '-f', dest,
        '-t', 'ofec_fips_states',
        '-t', 'ofec_zips_districts',
    ]
    subprocess.call(cmd)
@manager.command
def load_districts(source=None):
    """Restore the district tables from a pg_dump archive into the database.

    :param source: Archive path; defaults to `./data/districts.dump`.
    """
    source = source or './data/districts.dump'
    dest = db.engine.url
    # Argument list with shell=False instead of a format(**locals()) shell
    # string — see dump_districts; prevents quoting/injection problems.
    cmd = [
        'pg_restore',
        '--dbname', str(dest),
        '--no-acl',
        '--no-owner',
        '--clean',
        source,
    ]
    subprocess.call(cmd)
@manager.command
def build_district_counts(outname='districts.json'):
    """Write district counts to `outname` via `utils.write_district_counts`."""
    from utils import write_district_counts
    write_district_counts(outname)
@manager.command
def update_schemas(processes=1):
    """Refresh derived tables and views from `data/sql_updates/`.

    :param processes: Worker count; CLI options arrive as strings, so coerce.
    """
    logger.info("Starting DB refresh...")
    worker_count = int(processes)
    execute_sql_folder('data/sql_updates/', processes=worker_count)
    execute_sql_file('data/rename_temporary_views.sql')
    logger.info("Finished DB refresh.")
@manager.command
def update_functions(processes=1):
    """Create or replace database functions from `data/functions/`.

    :param processes: Worker count. Coerced to int because Flask-Script passes
        command-line options as strings — matches the handling in
        `update_schemas` and fixes the `processes > 1` comparison inside
        `execute_sql_folder`.
    """
    processes = int(processes)
    execute_sql_folder('data/functions/', processes=processes)
@manager.command
def update_itemized(schedule):
    """Prepare the itemized tables for one schedule (e.g. 'a', 'b', 'e')."""
    logger.info('Updating Schedule {0} tables...'.format(schedule))
    script = 'data/sql_setup/prepare_schedule_{0}.sql'.format(schedule)
    execute_sql_file(script)
    logger.info('Finished Schedule {0} update.'.format(schedule))
@manager.command
def rebuild_aggregates(processes=1):
    """Rebuild incremental aggregates from `data/sql_incremental_aggregates/`.

    :param processes: Worker count. Coerced to int because Flask-Script passes
        command-line options as strings — matches `update_schemas` and keeps
        the `processes > 1` check in `execute_sql_folder` meaningful.
    """
    processes = int(processes)
    logger.info('Rebuilding incremental aggregates...')
    execute_sql_folder('data/sql_incremental_aggregates/', processes=processes)
    logger.info('Finished rebuilding incremental aggregates.')
@manager.command
def update_aggregates():
    """Apply pending incremental aggregate updates.

    Delegates to the `update_aggregates()` SQL function in the database.
    """
    logger.info('Updating incremental aggregates...')
    db.engine.execute('select update_aggregates()')
    logger.info('Finished updating incremental aggregates.')
@manager.command
def update_all(processes=1):
    """Update all derived data. Warning: Extremely slow on production data.
    """
    # NOTE(review): the steps below appear order-dependent — functions and
    # reference data are loaded before the schedule prep, aggregates, and the
    # final schema refresh. Do not reorder without confirming dependencies.
    # CLI options arrive as strings, so coerce before passing downstream.
    processes = int(processes)
    update_functions(processes=processes)
    update_itemized('a')
    update_itemized('b')
    update_itemized('e')
    rebuild_aggregates(processes=processes)
    update_schemas(processes=processes)
@manager.command
def refresh_materialized():
    """Refresh materialized views by running the bundled refresh script."""
    logger.info('Refreshing materialized views...')
    execute_sql_file('data/refresh_materialized_views.sql')
    logger.info('Finished refreshing materialized views.')
@manager.command
def cf_startup():
    """Migrate schemas on `cf push`."""
    # Guard clause: only the instance with index '0' proceeds — presumably so
    # a multi-instance deployment triggers the schema update exactly once.
    if env.index != '0':
        return
    subprocess.Popen(['python', 'manage.py', 'update_schemas'])
# Entry point: dispatch command-line arguments to the Flask-Script manager.
if __name__ == '__main__':
    manager.run()