forked from numalariamodeling/covid-chicago
-
Notifications
You must be signed in to change notification settings - Fork 0
/
combine_and_trim.py
312 lines (275 loc) · 12.6 KB
/
combine_and_trim.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
"""
Combine, reformat and trim single simulation trajectories.
Output: trajectoriesDat including all outcome channels, and trajectoriesDat_trim including key channels only.
If number of trajectories exceeds a specified limit, multiple trajectories in chunks will be returned.
"""
import argparse
import pandas as pd
import os
import shutil
import sys
sys.path.append('../')
from load_paths import load_box_paths
from processing_helpers import *
def parse_args():
    """Parse the command-line options of the combine-and-trim script.

    Returns:
        argparse.Namespace with the attributes ``exp_name``, ``Location``,
        ``time_start``, ``time_stop``, ``scen_limit``,
        ``additional_sample_param`` and ``delete_trajectories``.

        ``additional_sample_param`` is a list of strings when the option is
        given on the command line and the empty string ``''`` otherwise, so
        callers have to check for a list before concatenating it.
    """
    description = "Simulation run for modeling Covid-19"
    parser = argparse.ArgumentParser(description=description)

    parser.add_argument(
        "-exp",
        "--exp_name",
        type=str,
        # The experiment folder path is built from this name; without it the
        # script would only fail later with a TypeError in os.path.join.
        required=True,
        help="Name of simulation experiment"
    )
    parser.add_argument(
        "-loc",
        "--Location",
        type=str,
        help="Local or NUCLUSTER",
        default="Local"
    )
    parser.add_argument(
        "--time_start",
        type=int,
        help="Lower limit of time steps to keep",
        default=1
    )
    parser.add_argument(
        "--time_stop",
        type=int,
        help="Upper limit of time steps to keep",
        default=1000
    )
    parser.add_argument(
        "-limit",
        "--scen_limit",
        type=int,
        help="Number of simulations to combine",
        default=700
    )
    parser.add_argument(
        "--additional_sample_param",
        type=str,
        nargs='+',
        help="""Name of additional sample parameters to keep, reduced to minimum to reduce file size
        format: --additional_sample_param time_to_infectious time_to_death (no quotes)
        Note: sample parameters can also always be added from the sample_parameters.csv if required in the postprocessing""",
        default=''
    )
    parser.add_argument(
        "--delete_trajectories",
        action='store_true',
        help="If specified, single trajectories will be deleted after postprocessing.",
    )
    return parser.parse_args()
def reprocess(input_fname='trajectories.csv'):
    """Reshape one raw trajectories csv into a long table with one row per run and time.

    The file is read from ``os.path.join(git_dir, input_fname)`` (an absolute
    ``input_fname`` therefore wins over ``git_dir``). The first line of the
    file is skipped. The remaining table must have a ``sampletimes`` column
    whose entries look like ``<channel>{<run>}``; all other columns are
    treated as time points.

    Args:
        input_fname: File name or path of the raw trajectories csv.

    Returns:
        DataFrame with a float ``time`` column, one column per channel (the
        part of the row label before ``{``) and a ``run_num`` column; the
        index is a fresh 0..n-1 range.

    Raises:
        ValueError: If no row label contains ``{0}``, i.e. the number of
            runs cannot be derived.
    """
    fname = os.path.join(git_dir, input_fname)
    row_df = pd.read_csv(fname, skiprows=1)
    df = row_df.set_index('sampletimes').transpose()

    # Number of channels belonging to a single run, counted on run 0.
    run_time = len([x for x in df.columns.values if '{0}' in x])
    if run_time == 0:
        raise ValueError(f"No channels of run 0 ('{{0}}') found in {fname}")
    num_runs = len(row_df) // run_time

    df = df.reset_index(drop=False)
    df = df.rename(columns={'index': 'time'})
    df['time'] = df['time'].astype(float)

    # Collect the per-run frames and concatenate once instead of growing a
    # DataFrame inside the loop.
    run_frames = []
    for run_num in range(num_runs):
        run_tag = '{%d}' % run_num
        channels = [x for x in df.columns.values if run_tag in x]
        sdf = df[['time'] + channels].rename(
            columns={x: x.split('{')[0] for x in channels})
        # assign() returns a new frame, so no value is written onto a slice of df.
        run_frames.append(sdf.assign(run_num=run_num))

    return pd.concat(run_frames, ignore_index=True)
def trim_trajectories(df, fname, sample_param_to_keep, time_start=1, time_stop=1000,
                      time_varying_params=None, grpnames=None):
    """Write a column- and time-trimmed copy of the combined trajectories to csv.

    The result is saved as ``<fname>_trim.csv`` inside the module-level
    ``exp_path``; nothing is returned.

    Args:
        df: Combined trajectories; must contain every selected column.
        fname: Output file name without the ``.csv`` extension.
        sample_param_to_keep: Names of sample-parameter columns to keep.
        time_start: Only rows with ``time`` strictly greater than this are kept.
        time_stop: Only rows with ``time`` strictly smaller than this are kept.
        time_varying_params: Names of time-varying parameter columns to keep,
            defaults to ``['Ki_t']``.
        grpnames: Group names. If given, channel columns are selected as
            ``<channel>_<grp>`` with ``_`` in the group name replaced by ``-``,
            and ``N_All`` is kept as well. If None, the plain channel and
            time-varying parameter names are used.
    """
    channels = ['susceptible', 'infected', 'recovered', 'infected_cumul', 'detected_cumul',
                'asymp_cumul', 'asymp', 'asymp_det_cumul',
                'symp_mild_cumul', 'symp_mild', 'symp_mild_det_cumul',
                'symp_severe_cumul', 'symp_severe', 'symp_severe_det_cumul',
                'hosp_det_cumul', 'hosp_cumul', 'hosp_det', 'hospitalized',
                'crit_cumul', 'crit_det_cumul', 'crit_det', 'critical',
                'deaths_det_cumul', 'deaths']

    if time_varying_params is None:
        time_varying_params = ['Ki_t']

    column_list = ['time', 'run_num'] + list(sample_param_to_keep)
    if grpnames is not None:
        for grp in grpnames:
            grp_ch = str(grp.replace('_', '-'))
            column_list.extend(f'{channel}_{grp_ch}' for channel in channels)
            # Time-varying parameters are not selected for 'All' nor for
            # groups whose name contains 'age'.
            if grp_ch != "All" and 'age' not in grp_ch:
                column_list.extend(f'{time_varying_param}_{grp_ch}'
                                   for time_varying_param in time_varying_params)
        column_list = column_list + ['N_All']
    else:
        column_list = column_list + channels + list(time_varying_params)

    # Trim df and save
    df = df[column_list]
    df = df[df['time'] > time_start]
    df = df[df['time'] < time_stop]
    df.to_csv(os.path.join(exp_path, fname + '_trim.csv'), index=False, date_format='%Y-%m-%d')
def combine_trajectories(sampledf, Nscenarios_start=0, Nscenarios_stop=1000, fname='trajectoriesDat.csv',SAVE=True):
    """Combine the single-scenario trajectory files of a scenario range into one table.

    For every scenario number in ``range(Nscenarios_start, Nscenarios_stop)``
    the file ``trajectories_scen<N>.csv`` in the module-level
    ``trajectories_path`` is reshaped with ``reprocess``, tagged with its
    ``scen_num`` and merged with ``sampledf`` on ``scen_num``. Scenarios that
    fail to load are counted and skipped.

    Args:
        sampledf: Sample parameters, must contain a ``scen_num`` column.
        Nscenarios_start: First scenario number (inclusive).
        Nscenarios_stop: Last scenario number (exclusive).
        fname: Name of the csv written into the module-level ``exp_path``.
        SAVE: If True, the combined table is written to ``fname``.

    Returns:
        The combined DataFrame with rows containing NaN removed, or an empty
        DataFrame if no scenario could be loaded.
    """
    df_list = []
    n_errors = 0
    for scen_i in range(Nscenarios_start, Nscenarios_stop):
        input_name = "trajectories_scen" + str(scen_i) + ".csv"
        try:
            df_i = reprocess(os.path.join(trajectories_path, input_name))
            df_i['scen_num'] = scen_i
            df_i = df_i.merge(sampledf, on=['scen_num'])
        except Exception:
            # Best effort: missing or broken scenario files are skipped.
            # KeyboardInterrupt/SystemExit are deliberately not caught.
            n_errors += 1
            continue
        df_list.append(df_i)

    print("Number of errors:" + str(n_errors))

    if not df_list:
        print('WARNING: No objects to concatenate - either no trajectories or n_scen_limit size is too small')
        return pd.DataFrame()

    dfc = pd.concat(df_list)
    dfc = dfc.dropna()
    if SAVE:
        dfc.to_csv(os.path.join(exp_path, fname), index=False, date_format='%Y-%m-%d')
    return dfc
def combine_trajectories_chunks(grp_list, useTrim=True):
    """Merge the chunked trajectory csv files into one csv per group.

    Works on the csv files in the module-level ``exp_path`` whose name
    contains 'trajectories' but not the group save suffix. Depending on
    ``useTrim`` either the '*trim*' files or the untrimmed files are combined;
    the other set is deleted up front. For every group one file
    ``trajectoriesDat_<suffix>_<i>[_trim].csv`` is written, and the chunk
    files are deleted at the end. A report is written via ``write_report``
    for the first group.

    Args:
        grp_list: Group names; needs at least two entries because the save
            suffix is taken from the second one.
        useTrim: Combine the trimmed chunk files (True) or the untrimmed
            ones (False).
    """
    # Workaround for using EMS vs region in the filename for the spatial
    # model, and to keep the suffix also for 'All'.
    grp_save_suffix = grp_list[1][:3]
    if grp_save_suffix == 'EMS':
        grp_save_suffix = 'region'

    files = [file for file in os.listdir(exp_path)
             if '.csv' in file
             and grp_save_suffix not in file
             and 'trajectories' in file]
    files_not_trim = [file for file in files if 'trim' not in file]
    files_trim = [file for file in files if 'trim' in file]

    if useTrim:
        files, files_to_discard = files_trim, files_not_trim
    else:
        files, files_to_discard = files_not_trim, files_trim
    for file in files_to_discard:
        os.unlink(os.path.join(exp_path, file))

    for i, grp in enumerate(grp_list):
        print(f'Combine trajectories for {grp}')
        # Extract grp suffix, might need to be applicable for age model or any other grp
        grp_suffix = grp[:3]
        grp_dashed = grp.replace(f'{grp_suffix}_', f'{grp_suffix}-')

        chunk_frames = []
        for file in files:
            df_f = pd.read_csv(os.path.join(exp_path, file))
            outcome_cols = [df_col for df_col in df_f.columns
                            if grp_suffix in df_col or 'All' in df_col]
            # Drop group-specific columns that belong to another group,
            # matching either the '_' or the '-' spelling of the group name.
            # NOTE(review): this is a substring match, so e.g. 'EMS-1' also
            # matches 'EMS-10' - confirm this is intended.
            outcomeVars_to_drop = [outcome_col for outcome_col in outcome_cols
                                   if grp not in outcome_col and grp_dashed not in outcome_col]
            chunk_frames.append(df_f.drop(outcomeVars_to_drop, axis=1))

        # DataFrame.append returns a new frame (and no longer exists in
        # pandas >= 2.0), so the chunks are stacked with a single concat.
        df_all = pd.concat(chunk_frames, ignore_index=True) if chunk_frames else pd.DataFrame()

        fname = f'trajectoriesDat_{grp_save_suffix}_{i}'
        if useTrim:
            fname = f'{fname}_trim'
        df_all.to_csv(os.path.join(exp_path, f'{fname}.csv'), index=False, date_format='%Y-%m-%d')

        if i == 0:
            write_report(nscenarios_processed=len(df_all['scen_num'].unique()))

    # Delete the chunk files once all groups are written.
    for file in files:
        os.unlink(os.path.join(exp_path, file))
def write_report(nscenarios_processed):
    """Write a one-line summary of the processed scenarios to Simulation_report.txt.

    The file is (over)written in the module-level ``exp_path``. The total is
    taken from the module-level ``Nscenario``.

    Args:
        nscenarios_processed: Number of scenarios that were processed.
    """
    # Report a real percentage (0-100); 'NA' if the total is zero.
    if Nscenario:
        percent = f'{100 * nscenarios_processed / Nscenario:.1f}'
    else:
        percent = 'NA'
    trackScen = f'Number of scenarios processed n= {str(nscenarios_processed)} out of total ' \
                f'N= {str(Nscenario)} ({percent} %)'
    with open(os.path.join(exp_path, "Simulation_report.txt"), 'w') as report_file:
        report_file.write(trackScen)
if __name__ == '__main__':
    # Script entry point: combine the single-scenario trajectories of one
    # experiment, write the trimmed version(s) and a short report.
    # The names assigned here are module-level globals that the functions
    # above read (exp_path, trajectories_path, git_dir, Nscenario).

    args = parse_args()
    exp_name = args.exp_name
    time_start = args.time_start
    time_stop = args.time_stop
    Location = args.Location
    additional_sample_param = args.additional_sample_param
    Scenario_save_limit = args.scen_limit

    datapath, projectpath, wdir, exe_dir, git_dir = load_box_paths(Location=Location)
    sim_out_dir = os.path.join(wdir, "simulation_output")
    # Fall back to the _temp folder of the git directory if the experiment
    # is not found in the simulation output folder.
    if not os.path.exists(os.path.join(sim_out_dir, exp_name)):
        sim_out_dir = os.path.join(git_dir, "_temp")
    print(f'Processing trajectories from {sim_out_dir}')

    exp_path = os.path.join(sim_out_dir, exp_name)
    trajectories_path = os.path.join(exp_path, 'trajectories')

    # Define model type and grp suffix of parameters and outcome channels
    sampledf = pd.read_csv(os.path.join(exp_path, "sampled_parameters.csv"))
    N_cols = [col for col in sampledf.columns if 'N_' in col]
    if len(N_cols) != 0:
        grp_list = ['All'] + [col.replace('N_', '') for col in N_cols]
        grp_suffix = grp_list[0][:3]
    else:
        grp_list = None
        N_cols = ['speciesS', 'initialAs']

    # Define parameters to keep
    sample_param_to_keep = ['startdate', 'scen_num', 'sample_num'] + N_cols
    # additional_sample_param is '' (not a list) when the option was not given.
    if isinstance(additional_sample_param, list):
        sample_param_to_keep = sample_param_to_keep + additional_sample_param

    try:
        sampledf = pd.read_csv(os.path.join(exp_path, "sampled_parameters.csv"), usecols=sample_param_to_keep)
    except ValueError:
        # read_csv raises ValueError when a usecols entry is not in the file:
        # when running from input csv sample_num might be missing
        sample_param_to_keep = ['startdate', 'scen_num'] + N_cols
        if isinstance(additional_sample_param, list):
            sample_param_to_keep = sample_param_to_keep + additional_sample_param
        sampledf = pd.read_csv(os.path.join(exp_path, "sampled_parameters.csv"), usecols=sample_param_to_keep)
        sample_param_to_keep = sample_param_to_keep + ['sample_num']
        sampledf['sample_num'] = 0

    Nscenario = max(sampledf['scen_num'])

    if Nscenario <= Scenario_save_limit:
        fname = "trajectoriesDat.csv"
        # Reuse an already combined file if one exists.
        if not os.path.exists(os.path.join(exp_path, fname)):
            dfc = combine_trajectories(sampledf=sampledf,
                                       Nscenarios_start=0,
                                       Nscenarios_stop=Nscenario + 1,
                                       fname=fname)
        else:
            dfc = pd.read_csv(os.path.join(exp_path, fname))

        # Update group names
        grp_list, grp_suffix, grp_numbers = get_group_names(exp_path=exp_path)
        trim_trajectories(df=dfc,
                          sample_param_to_keep=sample_param_to_keep,
                          time_start=time_start,
                          time_stop=time_stop,
                          grpnames=grp_list,
                          fname=fname.split(".csv")[0])
        write_report(nscenarios_processed=len(dfc['scen_num'].unique()))

    if Nscenario > Scenario_save_limit:
        n_subsets = int(Nscenario / Scenario_save_limit)

        # Combine trajectories in specified chunks for n subsets
        for i in range(1, n_subsets + 2):
            # Chunk i covers scenario numbers [(i-1)*limit, i*limit).
            Nscenario_stop = i * Scenario_save_limit
            print(Nscenario_stop)
            Nscenarios_start = Nscenario_stop - Scenario_save_limit
            fname = 'trajectoriesDat_' + str(Nscenario_stop) + '.csv'
            if not os.path.exists(os.path.join(exp_path, fname)):
                dfc = combine_trajectories(sampledf=sampledf,
                                           Nscenarios_start=Nscenarios_start,
                                           Nscenarios_stop=Nscenario_stop,
                                           fname=fname)
            else:
                dfc = pd.read_csv(os.path.join(exp_path, fname))

            # Trim trajectories
            if not dfc.empty:
                trim_trajectories(df=dfc,
                                  sample_param_to_keep=sample_param_to_keep,
                                  time_start=time_start,
                                  time_stop=time_stop,
                                  grpnames=grp_list,
                                  fname=fname.split(".csv")[0])
                del dfc
            else:
                print(f'WARNING: No trajectories found for scenarios {Nscenarios_start} to {Nscenario_stop}')
                continue

        # Combine trajectory scenario batches per grp,
        # if grpList not specified default to spatial (EMS) model,
        # deletes the trajectory chunks when done
        # NOTE(review): grp_list is None when sampled_parameters.csv has no
        # 'N_' columns, and combine_trajectories_chunks indexes it - confirm
        # the intended default for that case.
        combine_trajectories_chunks(grp_list=grp_list)

    if args.delete_trajectories:
        # THIS WILL DELETE ALL SINGLE TRAJECTORIES!
        shutil.rmtree(trajectories_path, ignore_errors=True)
        print(f'Single trajectories deleted')