LAMA to Dask migration: Data.concatenate #425

Merged
20 commits
c2b5446
Apply basic dask.array.concatenate operation within Data.concatenate
sadielbartholomew Jul 6, 2022
6092a28
Data.concatenate: rename to fix duplication of a variable name
sadielbartholomew Jul 6, 2022
7ac8ea3
Apply conformance of units within Data.concatenate to fix test cases
sadielbartholomew Jul 6, 2022
bd341ac
Fix bad input in test_Data_concatenate for further test case passes
sadielbartholomew Jul 6, 2022
95125db
Convert scalar arrays to 1D in Data.concatenate to fix test cases
sadielbartholomew Jul 6, 2022
f923514
Fix broken NumPy call in test_Data_concatenate for full test pass
sadielbartholomew Jul 6, 2022
708a4bb
Remove test_Data_AUXILIARY_MASK given removal of relevant methods
sadielbartholomew Jul 4, 2022
846e5be
Add new test for Data.concatenate based on old _AUXILIARY_MASK test
sadielbartholomew Jul 4, 2022
adbf979
Remove all redundant LAMA-centered code from Data.concatenate
sadielbartholomew Jul 5, 2022
42e0536
Make new test_Data_concatenate more systematic and thorough
sadielbartholomew Jul 5, 2022
ad36b62
Extend test_Data_concatenate to check post-op. cyclicity of axes
sadielbartholomew Jul 25, 2022
dc9592c
Handle lost cyclicity for join axes in Data.concatenate
sadielbartholomew Jul 26, 2022
6836a71
Mark Data.concatenate as daskified (LAMA->Dask migration complete)
sadielbartholomew Jul 26, 2022
9f8672b
Update cf/data/data.py to copy data[0] array in Data.concatenate
sadielbartholomew Jul 27, 2022
4b34129
Data.concatenate: address feedback by deprecating kwarg
sadielbartholomew Jul 27, 2022
33bcdb7
Data.concatenate: address feedback by removing now-unused variable
sadielbartholomew Jul 27, 2022
051238d
Data.concatenate: address feedback by simplifying data array copying
sadielbartholomew Jul 27, 2022
4f61852
Data.concatenate: address feedback by fixing -/ve cyclic axis case
sadielbartholomew Jul 27, 2022
52f9580
Data.concatenate: address feedback by warning on cyclic axis spec.
sadielbartholomew Jul 27, 2022
aa5e43f
Data.concatenate: address feedback by managing data array copying
sadielbartholomew Jul 28, 2022
231 changes: 26 additions & 205 deletions cf/data/data.py
@@ -3401,6 +3401,7 @@ def _set_subspace(self, *args, **kwargs):
raise NotImplementedError("'cf.Data._set_subspace' is unavailable.")

@classmethod
@daskified(_DASKIFIED_VERBOSE)
def concatenate(cls, data, axis=0, _preserve=True):
"""Join a sequence of data arrays together.

@@ -3469,234 +3470,54 @@ def concatenate(cls, data, axis=0, _preserve=True):
)

data0 = data[0]
data = data[1:]
data_rest = data[1:]

if _preserve:
data0 = data0.copy()
else:
# If data0 appears more than once in the input data arrays
# then we need to copy it
for d in data:
for d in data_rest:
if d is data0:
data0 = data0.copy()
break
# --- End: if

# Turn a scalar array into a 1-d array
ndim = data0._ndim
if not ndim:
data0.insert_dimension(inplace=True)
ndim = 1

# ------------------------------------------------------------
# Check that the axis, shapes and units of all of the input
# data arrays are consistent
# ------------------------------------------------------------
if axis < 0:
axis += ndim
if not 0 <= axis < ndim:
raise ValueError(
"Can't concatenate: Invalid axis specification: Expected "
"-{0}<=axis<{0}, got axis={1}".format(ndim, axis)
)

shape0 = data0._shape
conformed_units_data = []
units0 = data0.Units
axis_p1 = axis + 1
for data1 in data:
shape1 = data1._shape
if (
shape0[axis_p1:] != shape1[axis_p1:]
or shape0[:axis] != shape1[:axis]
):
raise ValueError(
"Can't concatenate: All the input array axes except "
"for the concatenation axis must have the same size"
)
for index, data1 in enumerate(data):
# Turn any scalar array into a 1-d array
if not data1.ndim:
data1.insert_dimension(inplace=True)

# Check and conform, if necessary, the units of all inputs
if not units0.equivalent(data1.Units):
raise ValueError(
"Can't concatenate: All the input arrays must have "
"equivalent units"
)
# --- End: for

for i, data1 in enumerate(data):
if _preserve:
data1 = data1.copy()
elif not units0.equals(data1.Units): # conform for consistency
data1 = data[index]
data1_copy = data1.copy()
data1_copy.Units = units0
# Must add the copy, else the original (units) are processed
conformed_units_data.append(data1_copy)
else:
# If data1 appears more than once in the input data
# arrays then we need to copy it
for d in data[i + 1 :]:
if d is data1:
data1 = data1.copy()
break
# --- End: if
conformed_units_data.append(data1)

# Turn a scalar array into a 1-d array
if not data1._ndim:
data1.insert_dimension(inplace=True)
# Get data as dask arrays and apply concatenation operation
dxs = []
for data1 in conformed_units_data:
dxs.append(data1.to_dask_array())

shape1 = data1._shape

# ------------------------------------------------------------
# 1. Make sure that the internal names of the axes match
# ------------------------------------------------------------
axis_map = {}
if data1._pmsize < data0._pmsize:
for axis1, axis0 in zip(data1._axes, data0._axes):
axis_map[axis1] = axis0

data1._change_axis_names(axis_map)
else:
for axis1, axis0 in zip(data1._axes, data0._axes):
axis_map[axis0] = axis1

data0._change_axis_names(axis_map)
# --- End: if
data0._set_dask(da.concatenate(dxs, axis=axis))

# ------------------------------------------------------------
# Find the internal name of the concatenation axis
# ------------------------------------------------------------
Paxis = data0._axes[axis]

# ------------------------------------------------------------
# 2. Make sure that the aggregating axis is an axis of the
# partition matrix of both arrays and that the partition
# matrix axes are the same in both arrays (although, for
# now, they may have different orders)
#
# Note:
#
# a) This may involve adding new partition matrix axes to
# either or both of data0 and data1.
#
# b) If the aggregating axis needs to be added it is inserted
# as the outer (slowest varying) axis to reduce the
# likelihood of having to (expensively) transpose the
# partition matrix.
# ------------------------------------------------------------
for f, g in zip((data0, data1), (data1, data0)):

g_pmaxes = g.partitions.axes
if Paxis in g_pmaxes:
g_pmaxes = g_pmaxes[:]
g_pmaxes.remove(Paxis)

f_partitions = f.partitions
f_pmaxes = f_partitions.axes
for pmaxis in g_pmaxes[::-1] + [Paxis]:
if pmaxis not in f_pmaxes:
f_partitions.insert_dimension(pmaxis, inplace=True)

# if Paxis not in f_partitions.axes:
# f_partitions.insert_dimension(Paxis, inplace=True)
# --- End: for

# ------------------------------------------------------------
# 3. Make sure that aggregating axis is the outermost (slowest
# varying) axis of the partition matrix of data0
# ------------------------------------------------------------
ipmaxis = data0.partitions.axes.index(Paxis)
if ipmaxis:
data0.partitions.swapaxes(ipmaxis, 0, inplace=True)

# ------------------------------------------------------------
# 4. Make sure that the partition matrix axes of data1 are in
# the same order as those in data0
# ------------------------------------------------------------
pmaxes1 = data1.partitions.axes
ipmaxes = [
pmaxes1.index(pmaxis) for pmaxis in data0.partitions.axes
]
data1.partitions.transpose(ipmaxes, inplace=True)

# --------------------------------------------------------
# 5. Create new partition boundaries in the partition
# matrices of data0 and data1 so that their partition
# arrays may be considered as different slices of a
# common, larger hyperrectangular partition array.
#
# Note:
#
# * There is no need to add any boundaries across the
# concatenation axis.
# --------------------------------------------------------
boundaries0 = data0.partition_boundaries()
boundaries1 = data1.partition_boundaries()

for dim in data0.partitions.axes[1:]:

# Still here? Then see if there are any partition matrix
# boundaries to be created for this partition dimension
bounds0 = boundaries0[dim]
bounds1 = boundaries1[dim]

symmetric_diff = set(bounds0).symmetric_difference(bounds1)
if not symmetric_diff:
# The partition boundaries for this partition
# dimension are already the same in data0 and data1
continue

# Still here? Then there are some partition boundaries to
# be created for this partition dimension in data0 and/or
# data1.
for f, g, bf, bg in (
(data0, data1, bounds0, bounds1),
(data1, data0, bounds1, bounds0),
):
extra_bounds = [i for i in bg if i in symmetric_diff]
f.add_partitions(extra_bounds, dim)
# --- End: for
# --- End: for

# ------------------------------------------------------------
# 6. Concatenate data0 and data1 partition matrices
# ------------------------------------------------------------
# if data0._flip != data1._flip:
if data0._flip() != data1._flip():
data0._move_flip_to_partitions()
data1._move_flip_to_partitions()

matrix0 = data0.partitions.matrix
matrix1 = data1.partitions.matrix

new_pmshape = list(matrix0.shape)
new_pmshape[0] += matrix1.shape[0]

# Initialise an empty partition matrix with the new shape
new_matrix = np.empty(new_pmshape, dtype=object)

# Insert the data0 partition matrix
new_matrix[: matrix0.shape[0]] = matrix0

# Insert the data1 partition matrix
new_matrix[matrix0.shape[0] :] = matrix1

data0.partitions.matrix = new_matrix

# Update the location map of the partition matrix of data0
data0.partitions.set_location_map((Paxis,), (axis,))

# ------------------------------------------------------------
# 7. Update the size, shape and dtype of data0
# ------------------------------------------------------------
# original_shape0 = data0._shape

data0._size += data1._size

shape0 = list(shape0)
shape0[axis] += shape1[axis]
data0._shape = tuple(shape0)

dtype0 = data0.dtype
dtype1 = data1.dtype
if dtype0 != dtype1:
data0.dtype = np.result_type(dtype0, dtype1)
# Manage cyclicity of axes: if join axis was cyclic, it is no longer
cyclic_axes = data0._cyclic
if cyclic_axes:
# Since cyclicity is lost for the join axis
data0.cyclic(axes=axis, iscyclic=False)

# ------------------------------------------------------------
# Done
# ------------------------------------------------------------
return data0

def _move_flip_to_partitions(self):
Collaborator:

We can nuke this LAMA-only method.

Member Author:

Sure. Do you think there are any (or many) more to nuke? If there are, I would prefer to do that in another PR for separation of concerns; otherwise, for just one or two, I can nuke them here as suggested.

Collaborator:

I don't know, but flake8 should alert us to unused methods. I don't mind if you nuke this here, or there!
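
For orientation, here is a short usage example consistent with the units-conforming behaviour in the diff above; the input values and printed output are illustrative, not taken from the PR:

import cf

d = cf.Data([1, 2], "days")
e = cf.Data([48.0, 72.0], "hours")  # equivalent units; conformed to 'days'

f = cf.Data.concatenate([d, e], axis=0)
print(f.array)  # expected: [1. 2. 2. 3.]
print(f.Units)  # expected: days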
