-
Notifications
You must be signed in to change notification settings - Fork 4
/
attach_calcs.py
50 lines (40 loc) · 1.59 KB
/
attach_calcs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# NOTE: `import datetime.datetime` is not importable (datetime is a module,
# not a package); the script needs the `datetime` class for `datetime.now()`.
from datetime import datetime

import dask
import pandas as pd
#import dask.bag as db
from dask import delayed, compute
from dask.distributed import Client

from find_storms import get_file_tree, parse_date_string
from marcus_calcs import attach_marcus_stats, filename_from_dt
def parse_storm_number(file):
    """Return the last four characters of the final path component of *file*.

    The path is split on '/' and only the text after the last separator is
    considered; if that component is shorter than four characters it is
    returned whole.

    NOTE(review): presumably track files are named like ``storm_0042`` with
    no extension, so the result is the storm's number -- confirm against the
    actual file names, since an extension would end up in the result.
    """
    _, _, basename = file.rpartition('/')
    return basename[-4:]
def prep_storm_attach_stats(file, grid_dir):
    """Load one storm track CSV, prepare it, and run ``attach_marcus_stats``.

    Parameters
    ----------
    file : path of the track CSV; it must contain ``scan``, ``uid`` and
        ``time`` columns.
    grid_dir : directory passed to ``filename_from_dt`` to build the value
        of the ``file`` column for every row.

    Returns
    -------
    tuple of ``(storm_number, stats)`` where ``storm_number`` comes from
    ``parse_storm_number(file)`` and ``stats`` is whatever
    ``attach_marcus_stats`` returns for the prepared table (presumably a
    DataFrame -- the caller writes it with ``to_csv``).
    """
    def grid_file_for(timestamp):
        # Map one parsed timestamp to its grid file name under grid_dir.
        return filename_from_dt(timestamp, grid_dir)

    # Index rows by (scan, uid), as the original in-place set_index did.
    tracks = pd.read_csv(file).set_index(['scan', 'uid'])

    # Parse the raw time strings first; the file lookup needs parsed values.
    tracks['time'] = tracks['time'].apply(parse_date_string)
    tracks['file'] = tracks['time'].apply(grid_file_for)

    storm_number = parse_storm_number(file)
    return storm_number, attach_marcus_stats(tracks)
if __name__ == '__main__':
    # Script entry point: compute the per-storm statistics for every track
    # file on a dask.distributed cluster and write one CSV per storm.
    track_dir = ''
    grid_dir = '/lcrc/group/earthscience/radar/houston/data/'
    out_dir = ''
    pattern = "storm_*"

    # sorted() returns a new list. The original iterated over
    # `files.sort()`, which is None (list.sort() sorts in place), so the
    # task list could never be built.
    files = sorted(get_file_tree(track_dir, pattern))

    start = datetime.now()

    # One lazy task per track file; nothing runs until it is submitted.
    track_graph = [delayed(prep_storm_attach_stats)(file, grid_dir)
                   for file in files]

    # Submit through the client explicitly instead of the removed
    # `compute(..., get=client.get)` keyword, and close the client once the
    # results have been gathered into local memory.
    with Client('') as client:
        out_tracks = client.gather(client.compute(track_graph))

    for number, tracks in out_tracks:
        tracks.to_csv(out_dir + 'stormcalcs_' + number + '.csv')
    time_elapsed = datetime.now() - start

    print('nfiles: ', len(files))
    print(time_elapsed)