process_facility.py
"""A script to clean US EPA's Facility data from GHG Emitter Facilities table"""
import csv
import datacommons
import json
import os.path
import pathlib
import sys
from absl import app
from absl import flags
from shapely import geometry
# Allows the following module imports to work when running as a script
_SCRIPT_PATH = os.path.dirname(os.path.abspath(__file__))
sys.path.append(os.path.join(_SCRIPT_PATH, '../..')) # for Crosswalk
from us_epa.util.crosswalk import Crosswalk
from us_epa.util import facilities_helper as fh

FLAGS = flags.FLAGS

flags.DEFINE_string(
    'epa_input_tables_path', 'tmp_data',
    'Path to directory containing crosswalks.csv, V_GHG_EMITTER_FACILITIES.csv, etc.'
)
flags.DEFINE_string('epa_output_path', 'output', 'Output directory')
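
# Example invocation (the flag values shown are the defaults; adjust as needed):
#
#   python3 process_facility.py \
#       --epa_input_tables_path=tmp_data \
#       --epa_output_path=output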

# Input tables we process
# Schema: https://enviro.epa.gov/enviro/ef_metadata_html.ef_metadata_table?p_table_name=<table>
_TABLES = ('V_GHG_EMITTER_FACILITIES', 'V_GHG_SUPPLIER_FACILITIES',
           'V_GHG_INJECTION_FACILITIES')

# Cleaned CSV Columns
# - 'containedInPlace' is a repeated list of refs to County and Census ZCTA
# - eiaPlantCode can also be repeated
_DCID = 'dcid'
_EPA_GHG_ID = 'epaGhgrpFacilityId'
_EPA_FRS_ID = 'epaFrsId'
_EIA_PP_RELATION = 'partOf'
_NAME = 'name'
_ADDRESS = 'address'
_CIP = 'containedInPlace'
_NAICS = 'naics'
_LAT = 'latitude'
_LNG = 'longitude'
_CLEAN_CSV_HDR = (_DCID, _EPA_GHG_ID, _EPA_FRS_ID, _EIA_PP_RELATION, _NAME,
                  _ADDRESS, _CIP, _NAICS, _LAT, _LNG)

_OUT_FILE_PREFIX = 'us_epa_facility'
_CROSSWALK_FILE = 'crosswalks.csv'

# Cache of dcid -> geoJsonCoordinates, to avoid repeated Data Commons lookups.
_GEOJSON_CACHE = {}


def _gen_tmcf():
    """Generates the template MCF mapping cleaned-CSV columns to node properties."""
    lines = [
        'Node: E:FacilityTable->E0',
        'typeOf: dcs:EpaReportingFacility',
    ]
    for p in _CLEAN_CSV_HDR:
        lines.append(f'{p}: C:FacilityTable->{p}')
    return '\n'.join(lines)
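
# For illustration, the generated template begins like this (a sketch; one
# property line follows for every column in _CLEAN_CSV_HDR):
#
#   Node: E:FacilityTable->E0
#   typeOf: dcs:EpaReportingFacility
#   dcid: C:FacilityTable->dcid
#   epaGhgrpFacilityId: C:FacilityTable->epaGhgrpFacilityId
#   ...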


def _v(table, row, col):
    """Returns the raw value of a column, whose key is prefixed by the table name."""
    return row.get(table + '.' + col, '')


def _cv(table, row, col):
    """Returns the cleaned (stripped and title-cased) value of a column."""
    return _v(table, row, col).strip().title()


def _str(v):
    """Wraps a non-empty value in double quotes for the cleaned CSV."""
    if not v:
        return ''
    return '"' + v + '"'


def _get_name(table, row):
    name = _cv(table, row, 'FACILITY_NAME')
    # title() downcases "LLC" to "Llc"; restore the conventional casing.
    return name.replace(' Llc', ' LLC')


def _get_address(table, row):
    parts = []
    for k in ['ADDRESS1', 'ADDRESS2', 'CITY', 'STATE_NAME']:
        p = _cv(table, row, k)
        if p:
            parts.append(p)
    address = ', '.join(parts)
    p = _cv(table, row, 'ZIP')
    if p:
        address += ' - ' + p
    return address
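
# For example (hypothetical values): parts ['100 Main St', 'Springfield',
# 'Illinois'] with ZIP '62701' yield '100 Main St, Springfield, Illinois - 62701'.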


def _get_naics(table, row):
    # The injection facilities table names its NAICS column differently.
    column = ('NAICS_CODE' if table == 'V_GHG_INJECTION_FACILITIES' else
              'PRIMARY_NAICS_CODE')
    naics = _v(table, row, column)
    if not naics:
        return ''
    return 'dcs:NAICS/' + naics
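
# For example, a primary NAICS code of 221112 (an illustrative value) maps to
# 'dcs:NAICS/221112'.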


def _validate_latlng(lat, lng, dcid):
    """Validates that the lat/lng falls within the given entity's geo boundary."""
    if dcid in _GEOJSON_CACHE:
        gj = _GEOJSON_CACHE[dcid]
    else:
        resp = datacommons.get_property_values([dcid], 'geoJsonCoordinates')
        if not resp[dcid]:
            print(f'Did not find GeoJSON for {dcid}')
            return False
        gj = resp[dcid][0]
        _GEOJSON_CACHE[dcid] = gj
    # GeoJSON coordinates are (longitude, latitude) ordered.
    point = geometry.Point(float(lng), float(lat))
    polygon = geometry.shape(json.loads(gj))
    return polygon.contains(point)
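
# A minimal sketch of the containment check above, with a hypothetical
# unit-square polygon:
#
#   square = geometry.shape(json.loads(
#       '{"type": "Polygon", "coordinates": [[[0,0],[0,1],[1,1],[1,0],[0,0]]]}'))
#   square.contains(geometry.Point(0.5, 0.5))  # True
#   square.contains(geometry.Point(2, 2))      # False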


_COUNTERS = {
    'given_county_wrong_latlng': [],
    'given_county_correct_latlng': [],
    'zipbased_county_wrong_latlng': [],
    'zipbased_county_correct_latlng': [],
    'missing_zip_and_county': [],
}


def _resolve_places(facility_id, zip_code, provided_county, lat, lng):
    """Resolves the geo relations for the given facility.

    Returns resolved <zip>, <county>, <lat>, <lng>.
    """
    if zip_code == 'zip/00000':
        _COUNTERS['missing_zip_and_county'].append(facility_id)
        return '', '', '', ''

    county_candidates = fh.get_county_candidates(zip_code)
    if any(provided_county in lst for lst in county_candidates):
        # Provided county is in the candidate list; use it.
        if lat and lng and _validate_latlng(lat, lng, provided_county):
            # Lat/lng is in the chosen county.
            _COUNTERS['given_county_correct_latlng'].append(facility_id)
            return zip_code, provided_county, lat, lng
        _COUNTERS['given_county_wrong_latlng'].append(facility_id)
        return zip_code, provided_county, '', ''

    if lat and lng:
        # Prefer a county with a lat/lng match.
        for candidates in county_candidates:
            for c in candidates:
                if _validate_latlng(lat, lng, c):
                    _COUNTERS['zipbased_county_correct_latlng'].append(
                        facility_id)
                    return zip_code, c, lat, lng

    # Lat or lng is empty or did not match any county. Pick a candidate county,
    # preferring containedInPlace over geoOverlaps.
    for candidates in county_candidates:
        if candidates:
            _COUNTERS['zipbased_county_wrong_latlng'].append(facility_id)
            return zip_code, candidates[0], '', ''

    _COUNTERS['missing_zip_and_county'].append(facility_id)
    return '', '', '', ''
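
# Worked example of the cascade above (hypothetical inputs): a facility whose
# row reports county 'geoId/25017' and zip 'zip/02139', with a lat/lng inside
# that county, resolves to ('zip/02139', 'geoId/25017', lat, lng) and counts as
# 'given_county_correct_latlng'; the same row with a lat/lng outside the county
# resolves to ('zip/02139', 'geoId/25017', '', '') and counts as
# 'given_county_wrong_latlng'.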


def counters_string():
    """Summarizes each counter as: name -> count, plus up to 3 sample facility IDs."""
    result = []
    for k, v in _COUNTERS.items():
        result.append(k + ' -> ' + str(len(v)) + ' - ' + ', '.join(v[:3]))
    return '\n'.join(result)
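
# Example output line (the count and facility IDs are illustrative):
#   given_county_correct_latlng -> 812 - 1000001, 1000002, 1000003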


def process(input_tables_path, output_path):
    crosswalk = Crosswalk(os.path.join(input_tables_path, _CROSSWALK_FILE))
    processed_ids = set()
    with open(os.path.join(output_path, _OUT_FILE_PREFIX + '.csv'), 'w') as wfp:
        # IMPORTANT: Escape double quotes (\") that appear in cell values,
        # rather than using the csv default of doubling them ("").
        cw = csv.DictWriter(wfp,
                            _CLEAN_CSV_HDR,
                            doublequote=False,
                            escapechar='\\')
        cw.writeheader()
        for table in _TABLES:
            table_path = os.path.join(input_tables_path, table + '.csv')
            rows_written = 0
            with open(table_path, 'r') as rfp:
                cr = csv.DictReader(rfp)
                for in_row in cr:
                    ghg_id = _v(table, in_row, 'FACILITY_ID')
                    assert ghg_id
                    # A facility can appear in multiple tables; process it once.
                    if ghg_id in processed_ids:
                        continue
                    processed_ids.add(ghg_id)
                    lat = _v(table, in_row, 'LATITUDE')
                    lng = _v(table, in_row, 'LONGITUDE')
                    # ZIP values carry the +4 extension; keep the 5-digit prefix.
                    zip_code = 'zip/' + _v(table, in_row, 'ZIP')[:5]
                    county = 'geoId/' + _v(table, in_row, 'COUNTY_FIPS')
                    zip_code, county, lat, lng = _resolve_places(
                        ghg_id, zip_code, county, lat, lng)
                    out_row = {
                        _DCID: _str(crosswalk.get_dcid(ghg_id)),
                        _EPA_GHG_ID: _str(ghg_id),
                        _EPA_FRS_ID: _str(crosswalk.get_frs_id(ghg_id)),
                        _EIA_PP_RELATION: ', '.join([
                            'dcid:eia/pp/' + v
                            for v in crosswalk.get_power_plant_ids(ghg_id)
                        ]),
                        _NAME: _str(_get_name(table, in_row)),
                        _ADDRESS: _str(_get_address(table, in_row)),
                        _CIP: ', '.join(fh.get_cip(zip_code, county)),
                        _NAICS: _get_naics(table, in_row),
                        _LAT: _str(lat),
                        _LNG: _str(lng),
                    }
                    rows_written += 1
                    if rows_written % 100 == 99:
                        print('Geo Resolution Stats: \n' + counters_string())
                    cw.writerow(out_row)
            print('Produced ' + str(rows_written) + ' rows from ' + table)
    print('Geo Resolution Stats: \n' + counters_string())
    with open(os.path.join(output_path, _OUT_FILE_PREFIX + '.tmcf'), 'w') as fp:
        fp.write(_gen_tmcf())
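
# The cleaned CSV written above has one row per facility under the header:
#   dcid,epaGhgrpFacilityId,epaFrsId,partOf,name,address,containedInPlace,naics,latitude,longitude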


def main(_):
    # Validate inputs.
    assert FLAGS.epa_output_path
    assert FLAGS.epa_input_tables_path
    assert os.path.exists(
        os.path.join(FLAGS.epa_input_tables_path, _CROSSWALK_FILE))
    for t in _TABLES:
        assert os.path.exists(
            os.path.join(FLAGS.epa_input_tables_path, t + '.csv'))
    pathlib.Path(FLAGS.epa_output_path).mkdir(exist_ok=True)
    process(FLAGS.epa_input_tables_path, FLAGS.epa_output_path)


if __name__ == '__main__':
    app.run(main)