-
Notifications
You must be signed in to change notification settings - Fork 1
/
COIOverlaps.py
229 lines (174 loc) · 7.09 KB
/
COIOverlaps.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
from collections import deque
import geopandas as gp
import csv
import matplotlib.pyplot as plt
import os
import libpysal as lp
# region Import files
# NOTE(review): every "FILLIN" string below is a placeholder the user must
# replace with a real path before running the script.
# 1. shapefile of the US census blocks of the state
shapefile_of_us_census_blocks = "FILLIN.shp"
blocks_shapefile = gp.read_file(shapefile_of_us_census_blocks)
# 2. shapefile or geojson of the enacted plan (such as Arizona U.S. House districts)
shapefile_of_enacted_plan = "FILLIN.geojson"
arizona_congress_shapefile = gp.read_file(shapefile_of_enacted_plan)
# 3. folder that contains all the COI maps (in CSV form) the user is interested in assessing
folder_of_all_COI_maps = "FILLIN"
all_COI_files = os.listdir(folder_of_all_COI_maps)
# 4, the CSV of the census block assignments of the enacted plan
# NOTE(review): "FILLIIN.csv" looks like a typo for "FILLIN.csv" — confirm.
# NOTE(review): csv.reader(open(...)) leaves both file handles open for the
# whole run; the readers are consumed exactly once further below, so they
# cannot simply be closed here without restructuring the script.
csv_blocks_to_districts = "FILLIIN.csv"
planBlocks = csv.reader(open(csv_blocks_to_districts))
# 5, the CSV of the state's PL file
allBlocks = csv.reader(open("FILLIN.csv"))
# endregion
# region Remove autogenerated file '.DS_Store' from COI list
x = 0
while x < len(all_COI_files):
if all_COI_files[x] == '.DS_Store':
all_COI_files.pop(x)
x += 1
# endregion
# region SUM the # of times a block is part of a coi.
highest_block_usage = 0    # max COI-membership count seen so far
most_used_block = 'test'   # GEOID of the block with that max count
block_num_tally = {}  # key = blockID, value = number of times the block appears in a COI
for coi_filename in all_COI_files:
    # Context manager closes each COI CSV promptly; the original leaked one
    # open file handle per file in the folder.
    with open(os.path.join(folder_of_all_COI_maps, str(coi_filename))) as coi_file:
        for row in csv.reader(coi_file):
            blockID = row[0]
            # CSV exported from DistrictBuilder includes all census blocks, and marks COI
            # blocks with a '1' in column 2, so only look at rows whose column 2 != '0'.
            # len(blockID) > 10 skips rows whose first field is not a full block GEOID
            # (e.g. headers) -- TODO confirm against the export format.
            if row[1] != '0' and len(blockID) > 10:
                block_num_tally[blockID] = block_num_tally.get(blockID, 0) + 1
                # track the single most frequently used block
                if block_num_tally[blockID] > highest_block_usage:
                    highest_block_usage = block_num_tally[blockID]
                    most_used_block = blockID
# endregion
# Create column "block_usage": for each row, map its GEOID20 to the number of COI
# submissions that included that block; blocks in no COI map to NaN, which is
# replaced by 0. fillna is applied to the new column only -- the original called
# fillna(0) on the whole GeoDataFrame, silently zeroing unrelated missing values
# in every other column as well.
blocks_shapefile['block_usage'] = blocks_shapefile['GEOID20'].map(block_num_tally).fillna(0)
# region Effective Splits.
# This process requires an adjacency matrix which is created using libpysal's
# Queen-contiguity weights.
w_queen = lp.weights.Queen.from_dataframe(blocks_shapefile)
# geoid <-> DataFrame-index maps so w_queen's integer keys can be translated to
# census-block GEOIDs and back. Built with zip over the GEOID20 column instead
# of the original per-row iterrows() walk, which is far slower on large frames.
block_to_index = dict(zip(blocks_shapefile['GEOID20'], blocks_shapefile.index))
index_to_block = dict(zip(blocks_shapefile.index, blocks_shapefile['GEOID20']))
# Blocks used by at least HIGH_SCORE_THRESHOLD COI submissions are "hotspot"
# seeds. Stored by DataFrame index so their neighbors can be found in w_queen.
HIGH_SCORE_THRESHOLD = 3
HighScore = [block_to_index[geoID]
             for geoID, count in block_num_tally.items()
             if count >= HIGH_SCORE_THRESHOLD]
# region BFS to create coi_cores out of hotspots
# Breadth-first search over Queen-adjacent high-score blocks: each unvisited
# seed becomes a "parent" whose core is the connected component of high-score
# blocks reachable from it.
coi_cores = {}   # key = parent seed index, value = set of member indices
been_used = {}   # indices already absorbed into some core
n = 0            # diagnostic: total number of neighbor enqueues
high_score_set = set(HighScore)  # O(1) membership; original scanned the list per neighbor
for seed in HighScore:
    if seed in been_used:
        continue
    parent = seed
    coi_cores[parent] = {parent}
    in_use = deque([seed])
    while in_use:
        current = in_use.popleft()
        if current in been_used:
            continue
        coi_cores[parent].add(current)
        been_used[current] = 1
        # enqueue adjacent high-score blocks not yet absorbed into a core
        for neigh in w_queen.neighbors[current]:
            if neigh in high_score_set and neigh not in been_used:
                in_use.append(neigh)
                n += 1
# endregion
print('n = ' + str(n))
# region create blockID --> [district number, population] dictionary for all census
# blocks. Used in Effective Splits calculation.
ID_to_pop_and_dist_number = {}
for row in planBlocks:
    if row[0] == 'GEOID20':  # skip the header row
        continue
    # row[1] is the district assignment; population is filled in from the PL file below.
    ID_to_pop_and_dist_number[row[0]] = [row[1], None]
for row in allBlocks:
    # Column 7 holds a long geo identifier; the first 9 characters are a fixed
    # prefix and the remainder is the block GEOID -- TODO confirm against the
    # PL file layout.
    geoID = str(row[7])[9:]
    # Guard against blocks present in the PL file but absent from the plan CSV;
    # the original raised KeyError here. Mutating the stored list in place
    # replaces the original's redundant read-modify-rewrite.
    entry = ID_to_pop_and_dist_number.get(geoID)
    if entry is not None:
        entry[1] = row[9]  # column 9 = total population of the block
# endregion
# region see how each coi_cores' population is divided amongst districts
final_ES = {}
# key = GEOID of the coi-core parent block, value = dict{district: population} for
# every district containing some of the COI core's population, plus a 'TotalPop'
# entry holding the core's total population.
for parent_index, member_indices in coi_cores.items():
    district_pops = {}
    final_ES[index_to_block[parent_index]] = district_pops
    total_population = 0
    for member_index in member_indices:
        district, pop_value = ID_to_pop_and_dist_number[index_to_block[member_index]]
        population = int(pop_value)
        total_population += population
        district_pops[district] = district_pops.get(district, 0) + population
    district_pops['TotalPop'] = total_population
# endregion
# calculate and print the effective splits scores
# Effective Splits = 1 / sum(share_i^2) - 1, where share_i is the fraction of a
# COI core's population lying in district i. Cores with zero total population
# are skipped and never receive an 'EffectiveSplits' entry.
for parent in final_ES:
    core = final_ES[parent]
    total = core['TotalPop']
    if total != 0:
        shares = [pop / total
                  for district, pop in core.items()
                  if district != 'TotalPop']
        squared_sum = sum(share ** 2 for share in shares)
        score_text = str(round((1 / squared_sum) - 1, 3))
        core['EffectiveSplits'] = score_text[0:5]
# endregion
# region Export to a CSV
# newline='' is required by the csv module when writing, otherwise the output
# contains blank rows on Windows.
with open('coi_cores.csv', 'w', newline='') as csvfile:
    writer = csv.writer(csvfile)
    writer.writerow(['COI Core', 'Effective Split Score', 'Blocks in the COI Core'])
    for core_tally, (parent, core_blocks) in enumerate(coi_cores.items(), start=1):
        parent_geoID = index_to_block[parent]
        # .get(): a core with zero population never received an 'EffectiveSplits'
        # key, so this writes None (empty cell) for such cores.
        eff_split = final_ES[parent_geoID].get('EffectiveSplits')
        writer.writerow([core_tally, eff_split, core_blocks])
# endregion
print(final_ES)
print('############################')
print(coi_cores)
# Plot: district boundaries labelled by NAME, with census blocks shaded by how
# many COI submissions included them (column 'block_usage').
fig = plt.figure(1, figsize=(10, 7))
ax = fig.add_subplot()
arizona_congress_shapefile.apply(
    lambda district: ax.annotate(text=district.NAME,
                                 xy=district.geometry.centroid.coords[0],
                                 ha='right', fontsize=6),
    axis=1)
# Bind the returned Axes to its own name; the original rebound the
# arizona_congress_shapefile GeoDataFrame variable to a matplotlib Axes.
boundary_ax = arizona_congress_shapefile.boundary.plot(ax=ax, color='Black', linewidth=.4)
blocks_shapefile.plot(ax=boundary_ax, column='block_usage', cmap='OrRd')
plt.show()