Chunk control (#3361)
* First ideas.

* Working towards new form for chunking policy.

* Handle special cases of chunk+shape.

* Header fixes.

* Pep8 fixes.

* Fix tests.

* Remove redundant import.

* Small improvements.

* Test fix.

* Rename chunking routine and pass dtype.

* Use Python3 style import of 'Iterable', to avoid deprecation.

* Review changes.

* Remove incompatible usages of as_lazy_data 'chunks' arg, in some tests.

* Remove redundant test code.

* License header fix.

* Linter fix.

* Revised optimum chunk size calculation.

* Simplified test.

* Codestyle fix.
pp-mo authored and lbdreyer committed Aug 23, 2019
1 parent 3618edb commit f402a19
Showing 7 changed files with 233 additions and 94 deletions.
155 changes: 123 additions & 32 deletions lib/iris/_lazy_data.py
@@ -1,4 +1,4 @@
# (C) British Crown Copyright 2017 - 2018, Met Office
# (C) British Crown Copyright 2017 - 2019, Met Office
#
# This file is part of Iris.
#
@@ -27,8 +27,9 @@

import dask
import dask.array as da
import dask.context
from dask.local import get_sync as dget_sync
import dask.config
import dask.utils

import numpy as np
import numpy.ma as ma
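
For reference, a minimal sketch (not part of this diff) of what the two new imports are used for; the '128MiB' value is just the usual dask default, not something this commit sets:

import dask.config
import dask.utils

# The configured 'ideal' chunk size is a string such as '128MiB' ...
limit = dask.config.get('array.chunk-size')
# ... which parse_bytes turns into a plain byte count, e.g. 134217728.
limit_in_bytes = dask.utils.parse_bytes(limit)
print(limit, '->', limit_in_bytes)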

@@ -58,26 +59,104 @@ def is_lazy_data(data):
return result


# A magic value, chosen to minimise chunk creation time and chunk processing
# time within dask.
_MAX_CHUNK_SIZE = 8 * 1024 * 1024 * 2


def _limited_shape(shape):
    # Reduce a shape to less than a default overall number-of-points, reducing
    # earlier dimensions preferentially.
    # Note: this is only a heuristic, assuming that earlier dimensions are
    # 'outer' storage dimensions -- not *always* true, even for NetCDF data.
    shape = list(shape)
    i_reduce = 0
    while np.prod(shape) > _MAX_CHUNK_SIZE:
        factor = np.ceil(np.prod(shape) / _MAX_CHUNK_SIZE)
        new_dim = int(shape[i_reduce] / factor)
        if new_dim < 1:
            new_dim = 1
        shape[i_reduce] = new_dim
        i_reduce += 1
    return tuple(shape)


def _optimum_chunksize(chunks, shape,
                       limit=None,
                       dtype=np.dtype('f4')):
    """
    Reduce or increase an initial chunk shape to get close to a chosen ideal
    size, while prioritising the splitting of the earlier (outer) dimensions
    and keeping intact the later (inner) ones.

    Args:

    * chunks (tuple of int, or None):
        Pre-existing chunk shape of the target data : None if unknown.
    * shape (tuple of int):
        The full array shape of the target data.
    * limit (int):
        The 'ideal' target chunk size, in bytes.  Default from dask.config.
    * dtype (np.dtype):
        Numpy dtype of target data.

    Returns:
    * chunk (tuple of int):
        The proposed shape of one full chunk.

    .. note::
        The purpose of this is very similar to
        `dask.array.core.normalize_chunks`, when called as
        `(chunks='auto', shape, dtype=dtype, previous_chunks=chunks, ...)`.
        Except, the operation here is optimised specifically for a 'c-like'
        dimension order, i.e. outer dimensions first, as for netcdf variables.
        So if, in future, this policy can be implemented in dask, then we
        would prefer to replace this function with a call to that one.

        Accordingly, the arguments roughly match 'normalize_chunks', except
        that we don't support the alternative argument forms of that routine.
        The return value, however, is a single 'full chunk', rather than a
        complete chunking scheme : so an equivalent code usage could be
        "chunks = [c[0] for c in normalize_chunks('auto', ...)]".

    """
    # Set the chunksize limit.
    if limit is None:
        # Fetch the default 'optimal' chunksize from the dask config.
        limit = dask.config.get('array.chunk-size')
        # Convert to bytes
        limit = dask.utils.parse_bytes(limit)

    point_size_limit = limit / dtype.itemsize

    # Create result chunks, starting with a copy of the input.
    result = list(chunks)

    if np.prod(result) < point_size_limit:
        # If size is less than maximum, expand the chunks, multiplying later
        # (i.e. inner) dims first.
        i_expand = len(shape) - 1
        while np.prod(result) < point_size_limit and i_expand >= 0:
            factor = np.floor(point_size_limit * 1.0 / np.prod(result))
            new_dim = result[i_expand] * int(factor)
            if new_dim >= shape[i_expand]:
                # Clip to dim size : chunk dims must not exceed the full shape.
                new_dim = shape[i_expand]
            else:
                # 'new_dim' is less than the relevant dim of 'shape' -- but it
                # is also the largest possible multiple of the input-chunks,
                # within the size limit.
                # So : 'i_expand' is the outer (last) dimension over which we
                # will multiply the input chunks, and 'new_dim' is a value that
                # ensures the fewest possible chunks within that dim.

                # Now replace 'new_dim' with the value **closest to equal-size
                # chunks**, for the same (minimum) number of chunks.
                # More-equal chunks are practically better.
                # E.G. : "divide 8 into multiples of 2, with a limit of 7",
                # produces new_dim=6, which would mean chunks of sizes (6, 2).
                # But (4, 4) is clearly better for memory and time cost.

                # Calculate how many (expanded) chunks fit into this dimension.
                dim_chunks = np.ceil(shape[i_expand] * 1. / new_dim)
                # Get "ideal" (equal) size for that many chunks.
                ideal_equal_chunk_size = shape[i_expand] / dim_chunks
                # Use the nearest whole multiple of input chunks >= ideal.
                new_dim = int(result[i_expand] *
                              np.ceil(ideal_equal_chunk_size /
                                      result[i_expand]))

            result[i_expand] = new_dim
            i_expand -= 1
    else:
        # Similarly, reduce if too big, reducing earlier (outer) dims first.
        i_reduce = 0
        while np.prod(result) > point_size_limit:
            factor = np.ceil(np.prod(result) / point_size_limit)
            new_dim = int(result[i_reduce] / factor)
            if new_dim < 1:
                new_dim = 1
            result[i_reduce] = new_dim
            i_reduce += 1

    return tuple(result)
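
As a usage sketch of the new helper (assuming `_optimum_chunksize` is imported from `iris._lazy_data`, as added above), the "divide 8 into multiples of 2, with a limit of 7" case from the comments can be reproduced directly:

import numpy as np

from iris._lazy_data import _optimum_chunksize

# 8 points stored in chunks of 2, with a limit of 7 one-byte elements :
# expanding to 6 would give chunks of sizes (6, 2), but the equal-size rule
# keeps the same chunk count and proposes 4, i.e. chunks of (4, 4).
chunk = _optimum_chunksize(chunks=(2,), shape=(8,),
                           limit=7, dtype=np.dtype('i1'))
print(chunk)  # -> (4,)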


@@ -86,29 +165,41 @@ def as_lazy_data(data, chunks=None, asarray=False):
Args:
* data:
An array. This will be converted to a dask array.
* data (array-like):
An indexable object with 'shape', 'dtype' and 'ndim' properties.
This will be converted to a dask array.
Kwargs:
* chunks:
Describes how the created dask array should be split up. Defaults to a
value first defined in biggus (being `8 * 1024 * 1024 * 2`).
For more information see
http://dask.pydata.org/en/latest/array-creation.html#chunks.
* chunks (list of int):
If present, a source chunk shape, e.g. for a chunked netcdf variable.
* asarray:
* asarray (bool):
If True, then chunks will be converted to instances of `ndarray`.
Set to False (default) to pass chunks through unchanged.
Returns:
The input array converted to a dask array.
.. note::
The result chunk size is a multiple of 'chunks', if given, up to the
dask default chunksize, i.e. `dask.config.get('array.chunk-size')`,
or the full data shape if that is smaller.
If 'chunks' is not given, the result has chunks of the full data shape,
but reduced by a factor if that exceeds the dask default chunksize.
"""
if chunks is None:
# Default to the shape of the wrapped array-like,
# but reduce it if larger than a default maximum size.
chunks = _limited_shape(data.shape)
# No existing chunks : Make a chunk the shape of the entire input array
# (but we will subdivide it if too big).
chunks = list(data.shape)

# Adjust chunk size for better dask performance,
# NOTE: but only if no shape dimension is zero, so that we can handle the
# PPDataProxy of "raw" landsea-masked fields, which have a shape of (0, 0).
if all(elem > 0 for elem in data.shape):
# Expand or reduce the basic chunk shape to an optimum size.
chunks = _optimum_chunksize(chunks, shape=data.shape, dtype=data.dtype)

if isinstance(data, ma.core.MaskedConstant):
data = ma.masked_array(data.data, mask=data.mask)
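To illustrate the behaviour described in the new `as_lazy_data` docstring note, a small usage sketch (illustrative only: the exact chunk shapes depend on the configured dask 'array.chunk-size'):

import numpy as np

from iris._lazy_data import as_lazy_data

# Roughly 200MB of zeros : larger than the 128MiB dask default, so it gets
# subdivided.
data = np.zeros((50, 1000, 1000), dtype=np.float32)

# No 'chunks' given : start from the full shape, then split the outer dims.
print(as_lazy_data(data).chunks)

# 'chunks' given (e.g. from a chunked netcdf variable) : each dask chunk is a
# whole multiple of the given chunk shape, expanded up to the same limit.
print(as_lazy_data(data, chunks=[1, 500, 500]).chunks)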
6 changes: 4 additions & 2 deletions lib/iris/fileformats/netcdf.py
@@ -1,4 +1,4 @@
# (C) British Crown Copyright 2010 - 2018, Met Office
# (C) British Crown Copyright 2010 - 2019, Met Office
#
# This file is part of Iris.
#
@@ -510,8 +510,10 @@ def _get_cf_var_data(cf_var, filename):
netCDF4.default_fillvals[cf_var.dtype.str[1:]])
proxy = NetCDFDataProxy(cf_var.shape, dtype, filename, cf_var.cf_name,
fill_value)
# Get the chunking specified for the variable : this is either a shape, or
# maybe the string "contiguous".
chunks = cf_var.cf_data.chunking()
# Chunks can be an iterable, None, or `'contiguous'`.
# In the "contiguous" case, pass chunks=None to 'as_lazy_data'.
if chunks == 'contiguous':
chunks = None
return as_lazy_data(proxy, chunks=chunks)
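For context, a hedged sketch of the same 'contiguous' check using the netCDF4-python API directly; the filename and variable name are hypothetical, and unlike the Iris code above this wraps the raw variable rather than a NetCDFDataProxy:

import netCDF4

from iris._lazy_data import as_lazy_data

ds = netCDF4.Dataset('example.nc')            # hypothetical file
nc_var = ds.variables['air_temperature']      # hypothetical variable

# Variable.chunking() returns a list of chunk lengths, or the string
# 'contiguous' for contiguously-stored variables.
chunks = nc_var.chunking()
if chunks == 'contiguous':
    # No storage chunking : let as_lazy_data derive chunks from the shape.
    chunks = None
lazy = as_lazy_data(nc_var, chunks=chunks)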
27 changes: 9 additions & 18 deletions lib/iris/tests/unit/analysis/test_RMS.py
@@ -92,25 +92,22 @@ def test_masked_weighted(self):
class Test_lazy_aggregate(tests.IrisTest):
def test_1d(self):
# 1-dimensional input.
data = as_lazy_data(np.array([5, 2, 6, 4], dtype=np.float64),
chunks=-1)
data = as_lazy_data(np.array([5, 2, 6, 4], dtype=np.float64))
rms = RMS.lazy_aggregate(data, 0)
expected_rms = 4.5
self.assertAlmostEqual(rms, expected_rms)

def test_2d(self):
# 2-dimensional input.
data = as_lazy_data(np.array([[5, 2, 6, 4], [12, 4, 10, 8]],
dtype=np.float64),
chunks=-1)
dtype=np.float64))
expected_rms = np.array([4.5, 9.0], dtype=np.float64)
rms = RMS.lazy_aggregate(data, 1)
self.assertArrayAlmostEqual(rms, expected_rms)

def test_1d_weighted(self):
# 1-dimensional input with weights.
data = as_lazy_data(np.array([4, 7, 10, 8], dtype=np.float64),
chunks=-1)
data = as_lazy_data(np.array([4, 7, 10, 8], dtype=np.float64))
weights = np.array([1, 4, 3, 2], dtype=np.float64)
expected_rms = 8.0
# https://github.com/dask/dask/issues/3846.
@@ -120,10 +117,8 @@ def test_1d_weighted(self):

def test_1d_lazy_weighted(self):
# 1-dimensional input with lazy weights.
data = as_lazy_data(np.array([4, 7, 10, 8], dtype=np.float64),
chunks=-1)
weights = as_lazy_data(np.array([1, 4, 3, 2], dtype=np.float64),
chunks=-1)
data = as_lazy_data(np.array([4, 7, 10, 8], dtype=np.float64))
weights = as_lazy_data(np.array([1, 4, 3, 2], dtype=np.float64))
expected_rms = 8.0
# https://github.com/dask/dask/issues/3846.
with self.assertRaisesRegexp(TypeError, 'unexpected keyword argument'):
@@ -133,8 +128,7 @@ def test_1d_lazy_weighted(self):
def test_2d_weighted(self):
# 2-dimensional input with weights.
data = as_lazy_data(np.array([[4, 7, 10, 8], [14, 16, 20, 8]],
dtype=np.float64),
chunks=-1)
dtype=np.float64))
weights = np.array([[1, 4, 3, 2], [2, 1, 1.5, 0.5]], dtype=np.float64)
expected_rms = np.array([8.0, 16.0], dtype=np.float64)
# https://github.com/dask/dask/issues/3846.
@@ -144,8 +138,7 @@ def test_2d_weighted(self):

def test_unit_weighted(self):
# Unit weights should be the same as no weights.
data = as_lazy_data(np.array([5, 2, 6, 4], dtype=np.float64),
chunks=-1)
data = as_lazy_data(np.array([5, 2, 6, 4], dtype=np.float64))
weights = np.ones_like(data)
expected_rms = 4.5
# https://github.com/dask/dask/issues/3846.
Expand All @@ -157,8 +150,7 @@ def test_masked(self):
# Masked entries should be completely ignored.
data = as_lazy_data(ma.array([5, 10, 2, 11, 6, 4],
mask=[False, True, False, True, False, False],
dtype=np.float64),
chunks=-1)
dtype=np.float64))
expected_rms = 4.5
rms = RMS.lazy_aggregate(data, 0)
self.assertAlmostEqual(rms, expected_rms)
@@ -169,8 +161,7 @@ def test_masked_weighted(self):
# For now, masked weights are simply not supported.
data = as_lazy_data(ma.array([4, 7, 18, 10, 11, 8],
mask=[False, False, True, False, True, False],
dtype=np.float64),
chunks=-1)
dtype=np.float64))
weights = np.array([1, 4, 5, 3, 8, 2])
expected_rms = 8.0
with self.assertRaisesRegexp(TypeError, 'unexpected keyword argument'):
6 changes: 2 additions & 4 deletions lib/iris/tests/unit/cube/test_Cube.py
@@ -1,4 +1,4 @@
# (C) British Crown Copyright 2013 - 2018, Met Office
# (C) British Crown Copyright 2013 - 2019, Met Office
#
# This file is part of Iris.
#
@@ -1475,9 +1475,7 @@ def test__masked_scalar_arraymask(self):
self._check_copy(cube, cube.copy())

def test__lazy(self):
# Note: multiple chunks added as a workaround suggested to dask#3751,
# which is fixed in dask#3754.
cube = Cube(as_lazy_data(np.array([1, 0]), chunks=(1, 1)))
cube = Cube(as_lazy_data(np.array([1, 0])))
self._check_copy(cube, cube.copy())


15 changes: 8 additions & 7 deletions lib/iris/tests/unit/fileformats/netcdf/test__get_cf_var_data.py
@@ -1,4 +1,4 @@
# (C) British Crown Copyright 2018, Met Office
# (C) British Crown Copyright 2019, Met Office
#
# This file is part of Iris.
#
@@ -26,7 +26,7 @@
from dask.array import Array as dask_array
import numpy as np

from iris._lazy_data import _limited_shape
from iris._lazy_data import _optimum_chunksize
import iris.fileformats.cf
from iris.fileformats.netcdf import _get_cf_var_data
from iris.tests import mock
@@ -35,8 +35,8 @@
class Test__get_cf_var_data(tests.IrisTest):
def setUp(self):
self.filename = 'DUMMY'
self.shape = (3, 240, 200)
self.expected_chunks = _limited_shape(self.shape)
self.shape = (300000, 240, 200)
self.expected_chunks = _optimum_chunksize(self.shape, self.shape)

def _make(self, chunksizes):
cf_data = mock.Mock(_FillValue=None)
@@ -55,15 +55,16 @@ def test_cf_data_type(self):
self.assertIsInstance(lazy_data, dask_array)

def test_cf_data_chunks(self):
chunks = [1, 12, 100]
chunks = [2500, 240, 200]
cf_var = self._make(chunks)
lazy_data = _get_cf_var_data(cf_var, self.filename)
lazy_data_chunks = [c[0] for c in lazy_data.chunks]
self.assertArrayEqual(chunks, lazy_data_chunks)
expected_chunks = _optimum_chunksize(chunks, self.shape)
self.assertArrayEqual(lazy_data_chunks, expected_chunks)

def test_cf_data_no_chunks(self):
# No chunks means chunks are calculated from the array's shape by
# `iris._lazy_data._limited_shape()`.
# `iris._lazy_data._optimum_chunksize()`.
chunks = None
cf_var = self._make(chunks)
lazy_data = _get_cf_var_data(cf_var, self.filename)
(The diffs for the remaining two changed files are not shown here.)
