This repository has been archived by the owner on Feb 8, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 6
/
web_ui.py
162 lines (129 loc) · 4.84 KB
/
web_ui.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
import io
import os
import re
import uvicorn
import scrapers
import traceback
import vaccination
from datetime import datetime, date
from starlette.routing import Route
from contextlib import redirect_stdout
from starlette.applications import Starlette
from starlette.responses import JSONResponse
from starlette.templating import Jinja2Templates
templates = Jinja2Templates(directory='templates')
states_all = scrapers.states_all
async def write_file(file_obj):
contents = await file_obj.read()
save_file_name = datetime.utcnow().isoformat() + '-' + file_obj.filename
save_file_name = os.path.join(scrapers.INPUTS_DIR, save_file_name)
with open(save_file_name, 'wb') as f:
f.write(contents)
f.close()
return save_file_name
async def homepage(request):
available_state_codes = list(states_all.keys())
if request.method == 'POST':
try:
form = await request.form()
url = None
if form.get('file'):
url = await write_file(form['file'])
skip_output = False
output_txt = form.get('output_txt')
if output_txt:
with open(os.path.join(scrapers.OUTPUTS_DIR, 'output.txt'),'w') as f:
f.write(output_txt.strip(' ').strip('\n'))
f.close()
skip_output = True
args = {
'state_code': form.get('state_code'),
'page': form.get('page'),
'skip_output': skip_output,
'type': form.get('type'),
'url': url if form.get('type') in ['pdf','image'] else None
}
f = io.StringIO()
with redirect_stdout(f):
result = scrapers.run(args)
delta = f.getvalue()
output_correction = None
if 'needs_correction' in result and result['needs_correction'] == True:
try:
with open(os.path.join(scrapers.OUTPUTS_DIR, 'output.txt')) as f:
output_correction = f.read()
f.close()
except:
pass
return templates.TemplateResponse('index.html', {
'request': request,
'output': delta,
'available_state_codes': available_state_codes,
'output_correction': output_correction
})
except Exception as e:
output = traceback.format_exc()
return templates.TemplateResponse('index.html', {
'request': request,
'output': output,
'available_state_codes': available_state_codes,
'output_correction': None
})
return templates.TemplateResponse('index.html', {
'request': request,
'output': '',
'available_state_codes': available_state_codes,
'output_correction': None
})
async def vaccination_route(request):
output = ''
f = io.StringIO()
is_mohfw = False
with redirect_stdout(f):
try:
if request.method=='POST':
form = await request.form()
fn_map = {
'cowin_state': vaccination.get_cowin_state,
'cowin_district': vaccination.get_cowin_district,
'mohfw_state': vaccination.get_mohfw_state
}
vacc_src = form.get('source').lower()
is_mohfw = vacc_src=='mohfw_state'
if os.path.exists('_outputs/vaccination_state_level.txt'):
os.remove('_outputs/vaccination_state_level.txt')
if os.path.exists('_outputs/vaccination_district_level.csv'):
os.remove('_outputs/vaccination_district_level.csv')
state_codes = None
if form.get('state_codes'):
state_codes = list(map(lambda sc: sc.lower().strip(), form.get('state_codes').split(',')))
from_date = datetime(*list(map(int, form.get('from_date').split('-')))) if form.get('from_date') else datetime.today()
to_date = datetime(*list(map(int, form.get('to_date').split('-')))) if form.get('to_date') else None
if to_date is None or to_date <= from_date:
to_date = from_date
fn_map[vacc_src](from_date, to_date, state_codes)
if vacc_src=='cowin_state':
with open('_outputs/vaccination_state_level.txt') as ff:
output += ff.read() + '\n'
ff.close()
if vacc_src=='cowin_district':
with open('_outputs/vaccination_district_level.csv') as ff:
output += ff.read() + '\n'
ff.close()
except Exception as e:
output += traceback.format_exc() + '\n'
if is_mohfw:
output = f.getvalue() + '\n' + output
ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
return templates.TemplateResponse('vaccination.html', {
'request': request,
'output': ansi_escape.sub('', output)})
async def state_details(request):
return JSONResponse(states_all.get(request.path_params['state_code'],{}))
if __name__ == '__main__':
app = Starlette(debug=True, routes=[
Route('/', homepage, methods=['GET', 'POST']),
Route('/vaccination', vaccination_route, methods=['GET', 'POST']),
Route('/state_details/{state_code}', state_details)
])
uvicorn.run(app, host='0.0.0.0', port=8005)