metadata_tables.py
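"""Build csv summary tables (metadata, variable attributes and ballast info) for the glider
datasets served by ERDDAP and push them to the server."""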
from tqdm import tqdm
import pandas as pd
import numpy as np
from pathlib import Path
import ballast_info
import subprocess
import voto_erddap_utils as utils
import logging
import os
cwdir = os.getcwd()
_log = logging.getLogger(__name__)
def write_csv(df, name):
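    """Write df to output/<name>.csv (semicolon separated) and rsync it to the ERDDAP server."""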
if not "datasetID" in list(df):
df["datasetID"] = df.index
df = df.convert_dtypes()
_log.info(f"write {name}.csv")
df.to_csv(f'{cwdir}/output/{name}.csv', sep=';', index=False)
subprocess.check_call(['/usr/bin/rsync', f'{cwdir}/output/{name}.csv', '[email protected]:/data/meta'])
_log.info(f"sent '{cwdir}/output/{name}.csv to erddap")
def meta_proc():
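    """Collect metadata and variable attributes for all nrt glider datasets on ERDDAP and write
    them out as the metadata_table, var_attrs_table, full_meta_attrs_table and users_table csv files."""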
    e = utils.init_erddap()
    # Fetch dataset list
    e.response = "csv"
    e.dataset_id = "allDatasets"
    df_datasets = e.to_pandas(parse_dates=['minTime (UTC)', 'maxTime (UTC)'])
    # drop the allDatasets row and make the datasetID the index for easier reading
    df_datasets.set_index("datasetID", inplace=True)
    df_datasets.drop("allDatasets", inplace=True)
    df_datasets = df_datasets[df_datasets.index.str[:3] == "nrt"]
    df_datasets = df_datasets.drop('nrt_SEA057_M75')
    df_datasets = df_datasets.drop('nrt_SEA070_M29')
    # df_datasets = df_datasets.head(3)
    _log.info(f"found {len(df_datasets)} datasets")
    ds_meta = {}
    for dataset_id in tqdm(df_datasets.index):
        ds_meta[dataset_id] = utils.get_meta(dataset_id)
    # Download data
    ds_nrt = utils.download_glider_dataset(df_datasets.index, nrt_only=True)
    # Merge all available metadata into one big table, one row per dataset
    df_met = []
    _log.info("processing metadata files")
    for dataset_id in df_datasets.index:
        dictrow = {}
        for key, val in ds_meta[dataset_id].items():
            # If the value is a method (like dataset.close) do not include it
            if callable(val):
                continue
            if type(val) is dict:
                if val == {}:
                    continue
                for k, v in val.items():
                    if type(v) is dict:
                        for c, u in v.items():
                            dictrow[f'{k}_{c}'] = u
                    else:
                        dictrow[f'{key}_{k}'] = v
            elif type(val) is str:
                dictrow[key] = val.replace("\n", "")
            elif type(val) is list:
                dictrow[key] = str(val)
            else:
                dictrow[key] = val
        dfrow = pd.DataFrame(dictrow, index=[dataset_id])
        df_met.append(dfrow)
    df_met_all = pd.concat(df_met)
    write_csv(df_met_all, 'metadata_table')
    # Merge all variable attributes into one table, one row per dataset
    dat_var = []
    _log.info("processing attributes files")
    for dataset_id in df_datasets.index:
        d_row = {}
        vars_data = list(ds_nrt[dataset_id].data_vars)
        for i in vars_data:
            att = ds_nrt[dataset_id][i].attrs
            for key, val in att.items():
                # lists and arrays are stored as strings so they fit in a single csv cell
                if isinstance(val, (list, np.ndarray)):
                    d_row[f'{i}_{key}'] = str(val)
                elif "\n" in str(val):
                    d_row[f'{i}_{key}'] = str(val).replace("\n", "")
                else:
                    d_row[f'{i}_{key}'] = val
        df_row = pd.DataFrame(d_row, index=[dataset_id])
        dat_var.append(df_row)
    var_all = pd.concat(dat_var)
    write_csv(var_all, 'var_attrs_table')
    # Merge the metadata table with the attributes table
    full_table = var_all.merge(df_met_all, left_on=var_all.index, right_on=df_met_all.index)
    write_csv(full_table, 'full_meta_attrs_table')
    _log.info("merged metadata and attributes")
    # Create a smaller, more user friendly table
    table = pd.DataFrame(columns=['glider_serial', 'deployment_id', 'basin', 'deployment_start', 'deployment_end',
                                  'available_variables', 'science_variables', 'ctd', 'oxygen', 'optics', 'ad2cp',
                                  'irradiance', 'nitrate', 'datasetID'])
    missions = df_datasets.index
    dic = ds_meta
    table.deployment_id = range(0, len(missions))
    _log.info("Creating users table")
    # Navigation/engineering variables excluded from the science_variables column
    nav_var = {'profile_index', 'rowSize', 'latitude', 'longitude', 'time', 'depth',
               'angular_cmd', 'angular_pos', 'ballast_cmd', 'ballast_pos', 'desired_heading',
               'dive_num', 'heading', 'internal_pressure', 'internal_temperature', 'linear_cmd',
               'linear_pos', 'nav_state', 'pitch', 'profile_direction', 'profile_num',
               'roll', 'security_level', 'vertical_distance_to_seafloor', 'voltage', 'declination'}
    for i in range(len(missions)):
        d = dic[missions[i]]
        table.glider_serial[i] = f'SEA0{d["glider_serial"]}'
        table.deployment_id[i] = d["deployment_id"]
        table.deployment_start[i] = d["deployment_start"][:10]
        table.deployment_end[i] = d["deployment_end"][:10]
        table.basin[i] = d["basin"]
        table.datasetID[i] = d["dataset_id"]
        table.available_variables[i] = d["variables"]
        table.ctd[i] = d['ctd']
        table.oxygen[i] = d['oxygen']
        table.optics[i] = d['optics']
        if 'irradiance' in d:
            table.irradiance[i] = d['irradiance']
        if 'AD2CP' in d:
            table.ad2cp[i] = d['AD2CP']
        if 'nitrate' in d:
            table.nitrate[i] = d['nitrate']
        # Keep only science variables by filtering out the navigation variables
        table.science_variables[i] = [var for var in d["variables"] if var not in nav_var]
    write_csv(table, 'users_table')
def proc_ballast(missions):
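    """Add ballast information for any mission not yet present in output/ballast.csv and update the table."""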
    outfile = Path("output/ballast.csv")
    for ds_id in missions:
        to_download = [ds_id]
        if outfile.exists():
            df = pd.read_csv(outfile, sep=';')
            to_download = set(to_download).difference(df['datasetID'].values)
        else:
            df = pd.DataFrame()
        if len(to_download) == 0:
            _log.debug(f"ballast info for {ds_id} already present")
        else:
            df_add = ballast_info.ballast_info(to_download)
            df = pd.concat((df, df_add))
            df = df.groupby('datasetID').first()
            write_csv(df, 'ballast')
            _log.debug(f"Added ballast info for dataset {ds_id}")
    df = pd.read_csv(outfile, sep=';')
    _log.info(f"ballast data present for {len(df[df.datasetID.str.contains('nrt')])} nrt datasets")
    _log.info(f"ballast data present for {len(df[df.datasetID.str.contains('delayed')])} delayed datasets")
if __name__ == '__main__':
    logf = '/home/pipeline/log/metadata_tables.log'
    logging.basicConfig(filename=logf,
                        filemode='a',
                        format='%(asctime)s %(levelname)-8s %(message)s',
                        level=logging.INFO,
                        datefmt='%Y-%m-%d %H:%M:%S')
    _log.info("Start processing")
    meta_proc()
    all_nrt = ballast_info.select_datasets(mission_num=None, glider_serial=None, data_type='nrt')
    all_delayed = ballast_info.select_datasets(mission_num=None, glider_serial=None, data_type='delayed')
    proc_ballast(all_nrt)
    proc_ballast(all_delayed)
    _log.info("End processing")