track_storms.py
from tracking.core import cell_tracking as ct
import pandas as pd
import pyart  # needed by Grid_iter below to read gridded radar files
import datetime
from dask import bag as db
# Alternative parallel backends, kept for reference:
# from joblib import Parallel, delayed
# from dask import compute, delayed
# import dask
# import multiprocessing as mp


def parse_date_string(date_string):
    """Parse a 'YYYY-MM-DD HH:MM:SS' timestamp string into a datetime."""
    return datetime.datetime.strptime(date_string, '%Y-%m-%d %H:%M:%S')


def unparse_datetime(dt, base):
    """Build the path of the KHGX grid file for scan time dt under base."""
    date = dt.strftime('%Y%m%d')
    file = '/KHGX_grid_' + dt.strftime('%Y%m%d.%H%M%S')
    ext = '.nc'
    return base + date + file + ext
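# Illustrative example (hypothetical base path):
#   unparse_datetime(datetime.datetime(2015, 7, 1, 0, 5, 30), '/data/')
#   -> '/data/20150701/KHGX_grid_20150701.000530.nc'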


def tracks_from_iter(s_name, s_iter):
    """Run cell tracking over one storm's grid iterator; return (name, tracks)."""
    tobj = ct.Cell_tracks()
    tobj.get_tracks(s_iter)
    return s_name, tobj


class Grid_iter:
    """Lazily read one Py-ART grid file per scan time, in order."""

    def __init__(self, dts, dir_base):
        self.filenames = [unparse_datetime(dt, dir_base) for dt in dts]
        self.i = -1
        self.n = len(self.filenames)

    def __iter__(self):
        return self

    def __next__(self):
        if self.i < self.n - 1:
            self.i += 1
            return pyart.io.read_grid(self.filenames[self.i])
        raise StopIteration
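

# Minimal usage sketch (hypothetical times and path), assuming the grid
# files exist on disk; each item yielded is a pyart.core.Grid:
#
#   dts = [datetime.datetime(2015, 7, 1, 0, 0),
#          datetime.datetime(2015, 7, 1, 0, 5)]
#   for grid in Grid_iter(dts, '/data/'):
#       ...  # process one scan at a time, keeping memory use flat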


if __name__ == '__main__':
    # Input/output locations; adjust for the local environment.
    # file_dir = '/lcrc/group/earthscience/radar/houston/data/'
    # out_dir = '/home/picel/khgx/july2015_kdp/'
    # storms_path = '/home/picel/khgx/july2015_kdp/storms_kdp.csv'
    file_dir = '/home/mhpicel/blues_earthscience/radar/houston/data/'
    out_dir = '/home/mhpicel/NASA/july2015/kdp_dataframes/'
    storms_path = '/home/mhpicel/blues_home/khgx/july2015_kdp/storms_kdp.csv'

    start_time = datetime.datetime.now()

    # Storm catalog with 'storm_id' and 'begin' columns, one row per scan.
    storms = pd.read_csv(storms_path)
    # Tracking parameters for cell_tracking.
    ct.FIELD_THRESH = 32
    ct.MIN_SIZE = 32
    min_storm_length = 5

    # Keep only storms with at least min_storm_length scans.
    long_enough = storms['storm_id'].value_counts() >= min_storm_length
    long_storm_ix = long_enough.index[long_enough]
    long_storms = storms.set_index('storm_id').loc[long_storm_ix.sort_values()]
    long_storm_dts = long_storms['begin'].apply(parse_date_string)

    storm_iters = [
        (storm_id, Grid_iter(long_storm_dts.loc[storm_id], file_dir))
        for storm_id in long_storm_dts.index.unique()
    ]
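    # storm_iters holds one (storm_id, Grid_iter) pair per storm; each
    # iterator lazily yields that storm's grids in scan order.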

    print('tracking', len(storm_iters), 'storms')

    # Earlier parallelization attempts, kept for reference:
    # storm_tracks = Parallel(n_jobs=mp.cpu_count())(
    #     delayed(tracks_from_iter)(name, storm) for name, storm in storm_iters
    # )
    # storm_graph = [delayed(tracks_from_iter)(name, storm)
    #                for name, storm in storm_iters]
    # storm_tracks = compute(*storm_graph, get=dask.multiprocessing.get)

    # Track each storm in parallel with a dask bag.
    storm_bag = db.from_sequence(storm_iters, npartitions=36)
    storm_bag = storm_bag.map(lambda pair: tracks_from_iter(pair[0], pair[1]))
    storm_tracks = storm_bag.compute()
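    # Each result pairs a storm_id with its Cell_tracks object, whose tracks
    # attribute is assumed to be a pandas DataFrame (it is written as CSV below).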

    # Write each storm's track table to its own CSV file.
    for name, tracks in storm_tracks:
        tracks.tracks.to_csv(out_dir + 'storm_' + str(name).zfill(4) + '.csv')

    time_elapsed = datetime.datetime.now() - start_time
    print('time elapsed:', time_elapsed)

    # Record run metadata for the job.
    with open(out_dir + 'tracking_meta.txt', 'w') as meta:
        meta.write('Number of Storms: ' + str(len(storm_tracks)) + '\n')
        meta.write('Time Elapsed: ' + str(time_elapsed) + '\n')