Skip to content

Commit

Permalink
Merge pull request #1 from DPeterK/cubes_and_cubelists
Browse files Browse the repository at this point in the history
Handle cubes and cubelists
  • Loading branch information
Kevin Donkers authored Nov 20, 2019
2 parents 0af2081 + 3f79869 commit 5513c2f
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 36 deletions.
98 changes: 62 additions & 36 deletions intake_iris/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,62 +16,88 @@ class DataSourceMixin(DataSource):
def _open_dataset(self):
    """Load the data from ``self.urlpath``, dispatching on the requested object kind.

    If ``self._iris_object`` is ``'iris-cube'`` a single cube is loaded via
    ``iris.load_cube``; otherwise a cubelist is loaded via ``iris.load``.
    Sets ``self._ds`` (the loaded iris object) and ``self.IrisObjSource``
    (the adapter that implements schema/partition/dask access for it).
    Warnings raised during loading are filtered per ``self.warnings``.
    """
    with warnings.catch_warnings():
        # self.warnings is a warnings-filter action string, e.g. 'default' or 'ignore'.
        warnings.simplefilter(self.warnings)
        if self._iris_object == 'iris-cube':
            self._ds = iris.load_cube(self.urlpath, **self._kwargs)
            self.IrisObjSource = CubeSource(self._ds)
        else:
            self._ds = iris.load(self.urlpath, **self._kwargs)
            self.IrisObjSource = CubeListSource(self._ds)

def _get_schema(self):
"""Make schema object, which embeds iris object and some details"""
if self._ds is None:
self._open_dataset()

metadata = {}
self._schema = Schema(
datashape=None,
dtype=None,
shape=None,
npartitions=None,
extra_metadata=metadata)
return self._schema
return self.IrisObjSource._get_schema()

def read(self):
    """Load entire dataset into a container and return it.

    Forces realisation of lazy data: for a cubelist all cubes are realised
    at once; for a single cube touching ``.data`` pulls it into memory.
    """
    self._load_metadata()
    if isinstance(self._ds, CubeList):
        # Realise every cube's data in one pass.
        self._ds.realise_data()
    else:
        # Accessing .data forces the cube's lazy array into memory.
        _ = self._ds.data
    return self._ds

def read_chunked(self):
    """Return iterator over container fragments of data source.

    NOTE(review): this delegates to ``read()``, which realises all data —
    so the result is fully loaded rather than lazily chunked; confirm this
    is the intended behaviour.
    """
    self._load_metadata()
    return self.read()

def read_partition(self, i):
    """Return a part of the data corresponding to the i-th partition.

    By default, assumes ``i`` should be an integer between zero and
    npartitions; the adapter may support more complex indexing schemes
    (e.g. tuple indices into a cube's chunks).
    """
    self._load_metadata()
    # Partition semantics differ between a cube and a cubelist, so the
    # adapter chosen in _open_dataset handles the actual indexing.
    return self.IrisObjSource.read_partition(i)

def to_dask(self):
    """Return a dask container for this data source.

    Delegates to the cube/cubelist adapter; for a single cube this is the
    cube's lazy (dask) data, while the cubelist adapter may not support it.
    """
    self._load_metadata()
    return self.IrisObjSource.to_dask()

def close(self):
    """Release the loaded dataset and cached schema from memory.

    Also drops the adapter created by ``_open_dataset``: it keeps its own
    reference to the cube/cubelist, so leaving it in place would prevent
    the data from being garbage-collected.
    """
    self._ds = None
    self._schema = None
    self.IrisObjSource = None


class CubeSource(object):
    """Adapter exposing a single iris cube through the partial intake source API."""

    def __init__(self, cube):
        # The cube being served; stored as a public attribute.
        self.cube = cube

    def _get_schema(self):
        """Make schema object, which embeds iris cube and some details."""
        cube_shape = self.cube.shape
        self._schema = Schema(
            datashape=cube_shape,
            dtype=self.cube.dtype,
            shape=cube_shape,
            # NOTE(review): dask chunks (a tuple of tuples), not a partition
            # count — confirm against the Schema contract.
            npartitions=self.cube.lazy_data().chunks,
            extra_metadata={},
        )
        return self._schema

    def read_partition(self, i):
        """Return the sub-cube obtained by indexing the cube with ``i``."""
        return self.cube[i]

    def to_dask(self):
        """Return the cube's lazy (dask) data array."""
        return self.cube.lazy_data()


class CubeListSource(object):
    """Adapter exposing an iris cubelist through the partial intake source API."""

    def __init__(self, cubelist):
        # The cubelist being served; stored as a public attribute.
        self.cubelist = cubelist

    def _get_schema(self):
        """Make schema object, which embeds iris cubelist and some details."""
        n_cubes = len(self.cubelist)
        self._schema = Schema(
            datashape=None,
            dtype=None,
            # One partition per cube in the list.
            shape=n_cubes,
            npartitions=n_cubes,
            extra_metadata={},
        )
        return self._schema

    def read_partition(self, i):
        """Return the ``i``-th cube of the cubelist."""
        return self.cubelist[i]

    def to_dask(self):
        """There is no single dask representation of a list of cubes."""
        raise NotImplementedError
1 change: 1 addition & 0 deletions intake_iris/grib.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,6 @@ def __init__(self, urlpath, warnings='default', iris_kwargs=None, metadata=None,
self.urlpath = urlpath
self.warnings = warnings
self._kwargs = iris_kwargs or kwargs
self._iris_object = self._kwargs.pop('iris-object', 'iris-cubelist')
self._ds = None
super(GRIBSource, self).__init__(metadata=metadata)
1 change: 1 addition & 0 deletions intake_iris/netcdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,6 @@ def __init__(self, urlpath, warnings='default', iris_kwargs=None, metadata=None,
self.urlpath = urlpath
self.warnings = warnings
self._kwargs = iris_kwargs or kwargs
self._iris_object = self._kwargs.pop('iris-object', 'iris-cubelist')
self._ds = None
super(NetCDFSource, self).__init__(metadata=metadata)

0 comments on commit 5513c2f

Please sign in to comment.