Mergeback of FEATURE_chunk_control branch #5588

Merged · 8 commits · Nov 23, 2023
Changes from 3 commits
lib/iris/_lazy_data.py · 201 changes: 135 additions & 66 deletions
@@ -62,6 +62,7 @@ def _optimum_chunksize_internals(
shape,
limit=None,
dtype=np.dtype("f4"),
dims_fixed=None,
dask_array_chunksize=dask.config.get("array.chunk-size"),
):
"""
@@ -80,6 +81,11 @@ def _optimum_chunksize_internals(
:mod:`dask.config`.
* dtype (np.dtype):
Numpy dtype of target data.
* dims_fixed (list of bool):
If set, a list of values equal in length to 'chunks' or 'shape'.
'True' values indicate a dimension that cannot be changed, i.e. that
element of the result must equal the corresponding value in 'chunks' or
data.shape.

Returns:
* chunk (tuple of int):
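
To illustrate the 'dims_fixed' contract described above, here is a hypothetical call against the new signature (the argument values are made up for illustration):

import numpy as np

# Keep the first dimension's chunking untouched; optimise the rest.
chunks = _optimum_chunksize_internals(
    chunks=(1, 100, 100),
    shape=(10, 1000, 1000),
    limit=64 * 1024 * 1024,  # target chunk size, in bytes
    dtype=np.dtype("f4"),
    dims_fixed=[True, False, False],
)
# chunks[0] == 1 is guaranteed; with these numbers the free dims can
# expand to the full (1000, 1000) within the 16M-point budget.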
@@ -100,6 +106,9 @@ def _optimum_chunksize_internals(
"chunks = [c[0] for c in normalise_chunks('auto', ...)]".

"""
if chunks is None:
chunks = list(shape)

# Set the chunksize limit.
if limit is None:
# Fetch the default 'optimal' chunksize from the dask config.
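
For context, the default referenced here is a dask config string that gets parsed into a byte count. A minimal standalone sketch of that lookup, independent of this diff:

import dask.config
from dask.utils import parse_bytes

chunk_size = dask.config.get("array.chunk-size")  # e.g. "128MiB"
limit_bytes = parse_bytes(chunk_size)             # e.g. 134217728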
@@ -109,58 +118,90 @@ def _optimum_chunksize_internals(

point_size_limit = limit / dtype.itemsize

# Create result chunks, starting with a copy of the input.
result = list(chunks)

if np.prod(result) < point_size_limit:
# If size is less than maximum, expand the chunks, multiplying later
# (i.e. inner) dims first.
i_expand = len(shape) - 1
while np.prod(result) < point_size_limit and i_expand >= 0:
factor = np.floor(point_size_limit * 1.0 / np.prod(result))
new_dim = result[i_expand] * int(factor)
if new_dim >= shape[i_expand]:
# Clip to dim size : chunk dims must not exceed the full shape.
new_dim = shape[i_expand]
else:
# 'new_dim' is less than the relevant dim of 'shape' -- but it
# is also the largest possible multiple of the input-chunks,
# within the size limit.
# So : 'i_expand' is the outer (last) dimension over which we
# will multiply the input chunks, and 'new_dim' is a value that
# ensures the fewest possible chunks within that dim.

# Now replace 'new_dim' with the value **closest to equal-size
# chunks**, for the same (minimum) number of chunks.
# More-equal chunks are practically better.
# E.G. : "divide 8 into multiples of 2, with a limit of 7",
# produces new_dim=6, which would mean chunks of sizes (6, 2).
# But (4, 4) is clearly better for memory and time cost.

# Calculate how many (expanded) chunks fit into this dimension.
dim_chunks = np.ceil(shape[i_expand] * 1.0 / new_dim)
# Get "ideal" (equal) size for that many chunks.
ideal_equal_chunk_size = shape[i_expand] / dim_chunks
# Use the nearest whole multiple of input chunks >= ideal.
new_dim = int(
result[i_expand]
* np.ceil(ideal_equal_chunk_size / result[i_expand])
)

result[i_expand] = new_dim
i_expand -= 1
if dims_fixed is not None:
if not np.any(dims_fixed):
dims_fixed = None

if dims_fixed is None:
# Get initial result chunks, starting with a copy of the input.
working = list(chunks)
else:
# Adjust the operation to ignore the 'fixed' dims.
# (We reconstruct the original later, before return).
chunks = np.array(chunks)
dims_fixed_arr = np.array(dims_fixed)
# Reduce the target size by the fixed size of all the 'fixed' dims.
point_size_limit = point_size_limit // np.prod(chunks[dims_fixed_arr])
# Work on only the 'free' dims.
original_shape = tuple(shape)
shape = tuple(np.array(shape)[~dims_fixed_arr])
working = list(chunks[~dims_fixed_arr])

if len(working) >= 1:
if np.prod(working) < point_size_limit:
# If size is less than maximum, expand the chunks, multiplying
# later (i.e. inner) dims first.
i_expand = len(shape) - 1
while np.prod(working) < point_size_limit and i_expand >= 0:
factor = np.floor(point_size_limit * 1.0 / np.prod(working))
new_dim = working[i_expand] * int(factor)
if new_dim >= shape[i_expand]:
# Clip to dim size : must not exceed the full shape.
new_dim = shape[i_expand]
else:
# 'new_dim' is less than the relevant dim of 'shape' -- but
# it is also the largest possible multiple of the
# input-chunks, within the size limit.
# So : 'i_expand' is the outer (last) dimension over which
# we will multiply the input chunks, and 'new_dim' is a
# value giving the fewest possible chunks within that dim.

# Now replace 'new_dim' with the value **closest to
# equal-size chunks**, for the same (minimum) number of
# chunks. More-equal chunks are practically better.
# E.G. : "divide 8 into multiples of 2, with a limit of 7",
# produces new_dim=6, meaning chunks of sizes (6, 2).
# But (4, 4) is clearly better for memory and time cost.

# Calculate how many (expanded) chunks fit in this dim.
dim_chunks = np.ceil(shape[i_expand] * 1.0 / new_dim)
# Get "ideal" (equal) size for that many chunks.
ideal_equal_chunk_size = shape[i_expand] / dim_chunks
# Use the nearest whole multiple of input chunks >= ideal.
new_dim = int(
working[i_expand]
* np.ceil(ideal_equal_chunk_size / working[i_expand])
)

working[i_expand] = new_dim
i_expand -= 1
else:
# Similarly, reduce if too big, reducing earlier (outer) dims first.
i_reduce = 0
while np.prod(working) > point_size_limit:
factor = np.ceil(np.prod(working) / point_size_limit)
new_dim = int(working[i_reduce] / factor)
if new_dim < 1:
new_dim = 1
working[i_reduce] = new_dim
i_reduce += 1

working = tuple(working)

if dims_fixed is None:
result = working
else:
# Similarly, reduce if too big, reducing earlier (outer) dims first.
i_reduce = 0
while np.prod(result) > point_size_limit:
factor = np.ceil(np.prod(result) / point_size_limit)
new_dim = int(result[i_reduce] / factor)
if new_dim < 1:
new_dim = 1
result[i_reduce] = new_dim
i_reduce += 1
# Reconstruct the original form
result = []
for i_dim in range(len(original_shape)):
if dims_fixed[i_dim]:
dim = chunks[i_dim]
else:
dim = working[0]
working = working[1:]
result.append(dim)

return tuple(result)
return result
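
The "closest to equal-size chunks" adjustment above can be checked by hand. Here is the docstring's own example ("divide 8 into multiples of 2, with a limit of 7") as a standalone sketch:

import numpy as np

shape_dim = 8  # full extent of the dimension
in_chunk = 2   # existing chunk size along it
new_dim = 6    # largest multiple of in_chunk within the limit of 7

dim_chunks = np.ceil(shape_dim / new_dim)  # 2 chunks needed
ideal = shape_dim / dim_chunks             # ideally 4.0 points each
new_dim = int(in_chunk * np.ceil(ideal / in_chunk))
print(new_dim)  # 4 -> chunk sizes (4, 4) rather than (6, 2)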


@wraps(_optimum_chunksize_internals)
@@ -169,6 +210,7 @@ def _optimum_chunksize(
shape,
limit=None,
dtype=np.dtype("f4"),
dims_fixed=None,
):
# By providing dask_array_chunksize as an argument, we make it so that the
# output of _optimum_chunksize_internals depends only on its arguments (and
@@ -178,11 +220,14 @@ def _optimum_chunksize(
tuple(shape),
limit=limit,
dtype=dtype,
dims_fixed=dims_fixed,
dask_array_chunksize=dask.config.get("array.chunk-size"),
)
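
The pattern in this wrapper, reading the mutable config at call time and passing it down explicitly, keeps the inner function a pure function of its arguments. A generic sketch of the idea; the caching shown is illustrative, not something this diff adds:

import functools
import dask.config

@functools.lru_cache(maxsize=None)
def _internals(chunks, shape, chunksize):
    # Pure: the output depends only on the arguments, never on live config,
    # so memoising it is safe.
    return chunks, shape, chunksize

def public(chunks, shape):
    # Resolve the config here, at call time, and pass it down.
    return _internals(tuple(chunks), tuple(shape),
                      dask.config.get("array.chunk-size"))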


def as_lazy_data(data, chunks=None, asarray=False):
def as_lazy_data(
data, chunks=None, asarray=False, dims_fixed=None, dask_chunking=False
):
"""
Convert the input array `data` to a :class:`dask.array.Array`.

@@ -201,6 +246,16 @@ def as_lazy_data(data, chunks=None, asarray=False):
If True, then chunks will be converted to instances of `ndarray`.
Set to False (default) to pass chunks through unchanged.

* dims_fixed (list of bool):
If set, a list of values equal in length to 'chunks' or data.ndim.
'True' values indicate a dimension that cannot be changed, i.e. the
result for that index must equal the value in 'chunks' or data.shape.

* dask_chunking (bool):
If True, Iris chunking optimisation will be bypassed, and dask's default
chunking will be used instead. Passing a value for chunks while
dask_chunking is set to True raises a ValueError.

Returns:
The input array converted to a :class:`dask.array.Array`.

@@ -212,24 +267,38 @@
but reduced by a factor if that exceeds the dask default chunksize.

"""
if chunks is None:
# No existing chunks : Make a chunk the shape of the entire input array
# (but we will subdivide it if too big).
chunks = list(data.shape)

# Adjust chunk size for better dask performance,
# NOTE: but only if no shape dimension is zero, so that we can handle the
# PPDataProxy of "raw" landsea-masked fields, which have a shape of (0, 0).
if all(elem > 0 for elem in data.shape):
# Expand or reduce the basic chunk shape to an optimum size.
chunks = _optimum_chunksize(chunks, shape=data.shape, dtype=data.dtype)

if dask_chunking:
if chunks is not None:
raise ValueError(
f"Dask chunking chosen, but chunks already assigned value {chunks}"
)
lazy_params = {"asarray": asarray, "meta": np.ndarray}
else:
if chunks is None:
# No existing chunks : Make a chunk the shape of the entire input array
# (but we will subdivide it if too big).
chunks = list(data.shape)

# Adjust chunk size for better dask performance,
# NOTE: but only if no shape dimension is zero, so that we can handle the
# PPDataProxy of "raw" landsea-masked fields, which have a shape of (0, 0).
if all(elem > 0 for elem in data.shape):
# Expand or reduce the basic chunk shape to an optimum size.
chunks = _optimum_chunksize(
chunks,
shape=data.shape,
dtype=data.dtype,
dims_fixed=dims_fixed,
)
lazy_params = {
"chunks": chunks,
"asarray": asarray,
"meta": np.ndarray,
}
if isinstance(data, ma.core.MaskedConstant):
data = ma.masked_array(data.data, mask=data.mask)
if not is_lazy_data(data):
data = da.from_array(
data, chunks=chunks, asarray=asarray, meta=np.ndarray
)
data = da.from_array(data, **lazy_params)
return data
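
And a hypothetical end-to-end call combining the new keywords, with made-up array sizes:

import numpy as np
from iris._lazy_data import as_lazy_data

data = np.zeros((10, 1000, 1000), dtype="f4")
# Pin dim 0 to single-index chunks; dims 1 and 2 are optimised as usual.
lazy = as_lazy_data(
    data,
    chunks=(1, 1000, 1000),
    dims_fixed=(True, False, False),
)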

