Skip to content

Commit

Permalink
Fix XarrayStream getting passed an open Dataset, add a test
Browse files Browse the repository at this point in the history
  • Loading branch information
kwilcox committed May 29, 2020
1 parent c0a3c4f commit e0c039c
Show file tree
Hide file tree
Showing 5 changed files with 1,125 additions and 1,049 deletions.
192 changes: 99 additions & 93 deletions ioos_qc/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,6 @@ def __init__(self, path_or_ncd, time=None, z=None, lat=None, lon=None, geom=None
self.lon_var = lon or 'lon'

def run(self, config: Config):
# Set all of the class variables and then call `run`, which will use
# the NumpySource runner
if isinstance(self.path_or_ncd, str):
do_close = True
ds = xr.open_dataset(self.path_or_ncd, decode_cf=False)
Expand Down Expand Up @@ -281,96 +279,104 @@ def run(self, config: Config):
# https://stackoverflow.com/a/27809959
results = defaultdict(lambda: defaultdict(odict))

with xr.open_dataset(
self.path_or_ncd,
decode_cf=True,
decode_coords=True,
decode_times=True,
mask_and_scale=True
) as ds:

for context in config.contexts:

for stream_id, stream_config in context.streams.items():

# Find any var specific kwargs to pass onto the run
if stream_id not in ds.variables:
L.warning('{stream_id} not in Dataset, skipping')
continue

# Because the variables could have different dimensions
# we calculate the coordinates and subset for each
subset = {}
subset_kwargs = {}

# Region subset
# TODO: yeah this does nothing right now
# Subset against the passed in lat/lons variable keys
# and build up the subset dict to apply later

# Time subset
if self.time_var in ds[stream_id].coords:
if context.window.starting and context.window.ending:
subset[self.time_var] = slice(context.window.starting, context.window.ending)

# Start with everything as UNKNOWN (2)
result_to_fill = xr.full_like(ds[stream_id], QartodFlags.UNKNOWN)
subset_stream = ds[stream_id][subset]

if self.time_var in subset_stream.coords:
# Already subset with the stream, best case. Good netCDF file.
subset_kwargs['tinp'] = subset_stream.coords[self.time_var].values
elif self.time_var in ds.variables and ds[self.time_var].dims == ds[stream_id].dims:
# Same dimensions as the stream, so use the same subset
subset_kwargs['tinp'] = ds[self.time_var][subset].values
elif self.time_var in ds.variables and ds[self.time_var].size == ds[stream_id].size:
# Not specifically connected, but hey, the user asked for it
subset_kwargs['tinp'] = ds[self.time_var][subset].values

if self.z_var in subset_stream.coords:
# Already subset with the stream, best case. Good netCDF file.
subset_kwargs['zinp'] = subset_stream.coords[self.z_var].values
elif self.z_var in ds.variables and ds[self.z_var].dims == ds[stream_id].dims:
# Same dimensions as the stream, so use the same subset
subset_kwargs['zinp'] = ds[self.z_var][subset].values
elif self.z_var in ds.variables and ds[self.z_var].size == ds[stream_id].size:
# Not specifically connected, but hey, the user asked for it
subset_kwargs['zinp'] = ds[self.z_var][subset].values

if self.lat_var in subset_stream.coords:
# Already subset with the stream, best case. Good netCDF file.
subset_kwargs['lat'] = subset_stream.coords[self.lat_var].values
elif self.lat_var in ds.variables and ds[self.lat_var].dims == ds[stream_id].dims:
# Same dimensions as the stream, so use the same subset
subset_kwargs['lat'] = ds[self.lat_var][subset].values
elif self.lat_var in ds.variables and ds[self.lat_var].size == ds[stream_id].size:
# Not specifically connected, but hey, the user asked for it
subset_kwargs['lat'] = ds[self.lat_var][subset].values

if self.lon_var in subset_stream.coords:
# Already subset with the stream, best case. Good netCDF file.
subset_kwargs['lon'] = subset_stream.coords[self.lon_var].values
elif self.lon_var in ds.variables and ds[self.lon_var].dims == ds[stream_id].dims:
# Same dimensions as the stream, so use the same subset
subset_kwargs['lon'] = ds[self.lon_var][subset].values
elif self.lon_var in ds.variables and ds[self.lon_var].size == ds[stream_id].size:
# Not specifically connected, but hey, the user asked for it
subset_kwargs['lon'] = ds[self.lon_var][subset].values

run_result = stream_config.run(
**subset_kwargs,
**dict(inp=subset_stream.values)
)

for testpackage, test in run_result.items():
for testname, testresults in test.items():
# Build up the results from every context using the subset
# into the final return dict
if 'testname' not in results[stream_id][testpackage]:
results[stream_id][testpackage][testname] = result_to_fill.copy()
results[stream_id][testpackage][testname][subset] = testresults

# Reset the xarray DataArray back to a numpy array
results[stream_id][testpackage][testname] = results[stream_id][testpackage][testname].data
if isinstance(self.path_or_ncd, str):
do_close = True
ds = xr.open_dataset(
self.path_or_ncd,
decode_cf=True,
decode_coords=True,
decode_times=True,
mask_and_scale=True
)
else:
do_close = False
ds = self.path_or_ncd

for context in config.contexts:

for stream_id, stream_config in context.streams.items():

# Find any var specific kwargs to pass onto the run
if stream_id not in ds.variables:
L.warning('{stream_id} not in Dataset, skipping')
continue

# Because the variables could have different dimensions
# we calculate the coordinates and subset for each
subset = {}
subset_kwargs = {}

# Region subset
# TODO: yeah this does nothing right now
# Subset against the passed in lat/lons variable keys
# and build up the subset dict to apply later

# Time subset
if self.time_var in ds[stream_id].coords:
if context.window.starting and context.window.ending:
subset[self.time_var] = slice(context.window.starting, context.window.ending)

# Start with everything as UNKNOWN (2)
result_to_fill = xr.full_like(ds[stream_id], QartodFlags.UNKNOWN)
subset_stream = ds[stream_id][subset]

if self.time_var in subset_stream.coords:
# Already subset with the stream, best case. Good netCDF file.
subset_kwargs['tinp'] = subset_stream.coords[self.time_var].values
elif self.time_var in ds.variables and ds[self.time_var].dims == ds[stream_id].dims:
# Same dimensions as the stream, so use the same subset
subset_kwargs['tinp'] = ds[self.time_var][subset].values
elif self.time_var in ds.variables and ds[self.time_var].size == ds[stream_id].size:
# Not specifically connected, but hey, the user asked for it
subset_kwargs['tinp'] = ds[self.time_var][subset].values

if self.z_var in subset_stream.coords:
# Already subset with the stream, best case. Good netCDF file.
subset_kwargs['zinp'] = subset_stream.coords[self.z_var].values
elif self.z_var in ds.variables and ds[self.z_var].dims == ds[stream_id].dims:
# Same dimensions as the stream, so use the same subset
subset_kwargs['zinp'] = ds[self.z_var][subset].values
elif self.z_var in ds.variables and ds[self.z_var].size == ds[stream_id].size:
# Not specifically connected, but hey, the user asked for it
subset_kwargs['zinp'] = ds[self.z_var][subset].values

if self.lat_var in subset_stream.coords:
# Already subset with the stream, best case. Good netCDF file.
subset_kwargs['lat'] = subset_stream.coords[self.lat_var].values
elif self.lat_var in ds.variables and ds[self.lat_var].dims == ds[stream_id].dims:
# Same dimensions as the stream, so use the same subset
subset_kwargs['lat'] = ds[self.lat_var][subset].values
elif self.lat_var in ds.variables and ds[self.lat_var].size == ds[stream_id].size:
# Not specifically connected, but hey, the user asked for it
subset_kwargs['lat'] = ds[self.lat_var][subset].values

if self.lon_var in subset_stream.coords:
# Already subset with the stream, best case. Good netCDF file.
subset_kwargs['lon'] = subset_stream.coords[self.lon_var].values
elif self.lon_var in ds.variables and ds[self.lon_var].dims == ds[stream_id].dims:
# Same dimensions as the stream, so use the same subset
subset_kwargs['lon'] = ds[self.lon_var][subset].values
elif self.lon_var in ds.variables and ds[self.lon_var].size == ds[stream_id].size:
# Not specifically connected, but hey, the user asked for it
subset_kwargs['lon'] = ds[self.lon_var][subset].values

run_result = stream_config.run(
**subset_kwargs,
**dict(inp=subset_stream.values)
)

for testpackage, test in run_result.items():
for testname, testresults in test.items():
# Build up the results from every context using the subset
# into the final return dict
if 'testname' not in results[stream_id][testpackage]:
results[stream_id][testpackage][testname] = result_to_fill.copy()
results[stream_id][testpackage][testname][subset] = testresults

# Reset the xarray DataArray back to a numpy array
results[stream_id][testpackage][testname] = results[stream_id][testpackage][testname].data

if do_close is True:
ds.close()

return results
188 changes: 0 additions & 188 deletions tests/test_conf.py

This file was deleted.

Loading

0 comments on commit e0c039c

Please sign in to comment.