#!/usr/bin/env python
'''Scrape mobility data from text files converted from Google COVID-19 Community Mobility Report PDFs.'''
# Before running, generate a folder of .txt files with pdftotext
# (a Python alternative is sketched in convert_pdfs() below), e.g.:
# for file in *.pdf; do pdftotext -layout "$file" "$file.txt"; done
import os
import glob
import json
# json_normalize moved to the pandas top level in pandas 1.0;
# fall back to the old location for older versions
try:
    from pandas import json_normalize
except ImportError:
    from pandas.io.json import json_normalize
# where to find files, and where to save them
PDF_TXT_PATH = 'pdf'
DATA_PATH = 'data'
JSON_PATH = os.path.join(DATA_PATH, 'mobility_reports.json')
CSV_PATH = os.path.join(DATA_PATH, 'mobility_reports.csv')
# information to split the pages and lines
PAGE_SPLIT = '\f'
LINE_SPLIT = '\n'
categories = ('Retail & recreation', 'Grocery & pharmacy', 'Parks',
              'Transit stations', 'Workplace', 'Residential')
# to find actual data
split_string = 'compared to baseline'
not_enough_data = 'Not enough data for this date'
ignore_string = 'Currently, there is not enough data'
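
# Optional helper (not part of the original pipeline and not called by main()):
# a minimal sketch of driving the pdftotext conversion from Python instead of
# the shell loop above. It assumes the `pdftotext` binary from poppler-utils is
# on PATH and that the source PDFs live in PDF_TXT_PATH.
def convert_pdfs():
    import subprocess
    for pdf in glob.glob(PDF_TXT_PATH + os.sep + '*.pdf'):
        # -layout preserves the column layout the parser below relies on;
        # pdftotext writes "<name>.pdf.txt" next to each PDF
        subprocess.run(['pdftotext', '-layout', pdf, pdf + '.txt'], check=True)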

def main():
    country_map = load_countries()
    # store the individual files' results in this list
    results = []
    # find all matching files in the PDF_TXT_PATH folder
    for f in glob.glob(PDF_TXT_PATH + os.sep + '*.txt'):
        #for f in ['pdf/2020-03-29_US_Georgia_Mobility_Report_en.pdf.txt']:
        #print(f)
        with open(f, 'r') as fp:
            pages = fp.read()
            # count pages, not characters; pdftotext separates pages with form feeds
            page_count = len(pages.split(PAGE_SPLIT))
            i = 0
            for p in pages.split(PAGE_SPLIT):
                i += 1
                # loop over all valid pages:
                # skip the first two (summary pages) and the last
                if i <= 2 or i >= page_count - 1:
                    # ignore these pages
                    continue
                else:
                    # each pdf page has up to 2 sections;
                    # need to fake the second section because there is no way
                    # to get the region name without it, so far
                    section_top = '\n'.join(p.split('Baseline')[0:5])
                    section_bottom = '\n'.join('\n'.join(p.split('Baseline')[6:11]).split('\n' * 3)[1:])
                    #print(section_bottom)
                    #exit(0)
                    sections = [section_top, section_bottom]
                    #p = p.replace('\n'*5+categories[3],'\n'*4+categories[3])
                    #sections = p.split('\n'*5)
                    for text in sections:
                        if split_string in text and ignore_string not in text:
                            # run the main parsing code, if the expected text is on this page
                            d = parse_section(f, text, country_map)
                            if d is not None:
                                results.append(d)
    # write all output to JSON and CSV files
    write_json(results)
    write_csv(results)

def parse_section(f, text, cm):
    # get some metadata from the file name
    country = f.split(os.sep)[-1].split('_')[1]
    country_name = cm.get(country, 'n/a')
    if country_name == 'n/a':
        print('WARNING: missing country mapping:', country)
    area = ' '.join(f.split(os.sep)[-1].split('_')[2:]).split('Mobility')[0].strip()
    if area == '':
        area = 'n/a'
    date = f.split(os.sep)[-1].split('_')[0]
    # get region: the first non-blank line of the section
    region = text.strip().split('\n')[0]
    #print(region)
    #region = text.split(categories[0])[0].replace(categories[0],'')
    # find values from text: keep only lines that carry a percentage or a
    # "not enough data" marker, and strip the boilerplate wording
    values = [t.replace(split_string, '').replace(not_enough_data, 'n/a')
              for t in text.split('\n') if split_string in t or not_enough_data in t]
    # split each line into tokens
    values = [t.split() for t in values]
    # merge the sublists into one flat list, e.g. [['-40%', '-20%'], ['+5%']] -> ['-40%', '-20%', '+5%']
    values = [i for l in values for i in l]
    # check that one value per category is present
    if len(values) != 6:
        # TODO: check why this still happens
        print('WARNING:', f, region, categories, values)
        print(text)
        #exit(0)  # debugging leftover; skip the section instead of aborting
        return None
    # pack into dictionary
    d = {'date': date,
         'country': country,
         'country_name': country_name,
         'area': area,
         'region': region,
         'values': dict(zip(categories, values))
         }
    return d
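
# For reference, each record returned by parse_section() has this shape
# (values below are illustrative placeholders, not real data):
#
#   {'date': '2020-03-29', 'country': 'US', 'country_name': 'United States',
#    'area': 'Georgia', 'region': 'Fulton County',
#    'values': {'Retail & recreation': '-40%', ..., 'Residential': '+10%'}}
#
# json_normalize() in write_csv() flattens the nested 'values' dict into
# dot-separated columns such as 'values.Retail & recreation'.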

def load_countries():
    with open('country_dict.json', 'r') as cc:
        country_map = json.load(cc)
    return country_map
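
# country_dict.json maps the 2-letter country code taken from each file name to
# a full country name and is expected in the working directory. A minimal
# illustrative example of its format:
#
#   {"US": "United States", "GB": "United Kingdom"}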

def write_json(results):
    with open(JSON_PATH, 'w') as fp:
        json.dump(results, fp, indent=4)


def write_csv(results):
    df = json_normalize(results)
    df.to_csv(CSV_PATH, index=False)


if __name__ == "__main__":
    main()