Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dask: Data memory/disk #387

Merged
merged 14 commits into from
Apr 28, 2022
156 changes: 42 additions & 114 deletions cf/data/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ def __init__(
copy=True,
dtype=None,
mask=None,
persist=False,
to_memory=False,
init_options=None,
_use_array=True,
):
Expand Down Expand Up @@ -324,18 +324,26 @@ def __init__(

.. versionadded:: TODODASK

persist: `bool`, optional
If True then persist the underlying array into memory,
equivalent to calling `persist` on the data
immediately after initialisation.
to_memory: `bool`, optional
If True then ensure that the original data are in
memory, rather than on disk.

If the original data are on disk then reading data
If the original data are on disk, then reading data
into memory during initialisation will slow down the
initialisation process, but can considerably improve
downstream performance by avoiding the need for
independent reads for every dask chunk, each time the
data are computed.

In general, setting *to_memory* to True is not the same
as calling the `persist` of the newly created `Data`
object, which also decompresses data compressed by
convention and computes any data type, mask and
date-time modifications.

If the input *array* is a `dask.array.Array` object
then *to_memory* is ignored.

.. versionadded:: TODODASK

init_options: `dict`, optional
Expand Down Expand Up @@ -476,11 +484,20 @@ def __init__(
"for compressed input arrays"
)

# Bring the compressed data into memory without
# decompressing it
if to_memory:
try:
array = array.to_memory()
except AttributeError:
pass

# Save the input compressed array, as this will contain
# extra information, such as a count or index variable.
self._set_Array(array)

array = compressed_to_dask(array, chunks)

elif not is_dask_collection(array):
# Turn the data into a dask array
kwargs = init_options.get("from_array", {})
Expand All @@ -491,6 +508,13 @@ def __init__(
"Use the 'chunks' parameter instead."
)

# Bring the data into memory
if to_memory:
try:
array = array.to_memory()
except AttributeError:
pass

array = to_dask(array, chunks, **kwargs)

elif chunks != _DEFAULT_CHUNKS:
Expand Down Expand Up @@ -534,10 +558,6 @@ def __init__(
if mask is not None:
self.where(mask, cf_masked, inplace=True)

# Bring the data into memory
if persist:
self.persist(inplace=True)

@property
def dask_compressed_array(self):
"""TODODASK.
Expand Down Expand Up @@ -8981,99 +9001,6 @@ def to_dask_array(self):
"""
return self._custom["dask"]

def to_disk(self):
"""Store the data array on disk.

There is no change to partition's whose sub-arrays are already on
disk.

:Returns:

`None`

**Examples**

>>> d.to_disk()

"""
print("TODODASK - ???")
config = self.partition_configuration(readonly=True, to_disk=True)

for partition in self.partitions.matrix.flat:
if partition.in_memory:
partition.open(config)
partition.array
partition.close()

def to_memory(self, regardless=False, parallelise=False):
"""Store each partition's data in memory in place if the master
array is smaller than the chunk size.

There is no change to partitions with data that are already in memory.

:Parameters:

regardless: `bool`, optional
If True then store all partitions' data in memory
regardless of the size of the master array. By default
only store all partitions' data in memory if the master
array is smaller than the chunk size.

parallelise: `bool`, optional
If True than only move those partitions to memory that are
flagged for processing on this rank.

:Returns:

`None`

**Examples**

>>> d.to_memory()
>>> d.to_memory(regardless=True)

"""
print("TODODASK - ???")
config = self.partition_configuration(readonly=True)
fm_threshold = cf_fm_threshold()

# If parallelise is False then all partitions are flagged for
# processing on this rank, otherwise only a subset are
self._flag_partitions_for_processing(parallelise)

for partition in self.partitions.matrix.flat:
if partition._process_partition:
# Only move the partition to memory if it is flagged
# for processing
partition.open(config)
if (
partition.on_disk
and partition.nbytes <= free_memory() - fm_threshold
):
partition.array

partition.close()
# --- End: for

@property
def in_memory(self):
"""True if the array is retained in memory.

:Returns:

**Examples**

>>> d.in_memory

"""
print("TODODASK - ???")
for partition in self.partitions.matrix.flat:
if not partition.in_memory:
return False
# --- End: for

return True

@daskified(_DASKIFIED_VERBOSE)
def datum(self, *index):
"""Return an element of the data array as a standard Python
Expand Down Expand Up @@ -10112,17 +10039,6 @@ def swapaxes(self, axis0, axis1, inplace=False, i=False):
d._set_dask(dx, reset_mask_hardness=False)
return d

def save_to_disk(self, itemsize=None):
"""cf.Data.save_to_disk is dead.

Use not cf.Data.fits_in_memory instead.

"""
raise NotImplementedError(
"cf.Data.save_to_disk is dead. Use not "
"cf.Data.fits_in_memory instead."
)

def fits_in_memory(self, itemsize):
"""Return True if the master array is small enough to be
retained in memory.
Expand Down Expand Up @@ -10906,6 +10822,18 @@ def tolist(self):
"""
return self.array.tolist()

@daskified(_DASKIFIED_VERBOSE)
def to_memory(self):
"""Bring data on disk into memory.

Not implemented. Consider using `persist` instead.

"""
raise NotImplementedError(
"'Data.to_memory' is not available. "
"Consider using 'Data.persist' instead."
)

@daskified(_DASKIFIED_VERBOSE)
@_deprecated_kwarg_check("i")
@_inplace_enabled(default=False)
Expand Down
49 changes: 48 additions & 1 deletion cf/data/mixin/deprecations.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,23 @@ def dtvarray(self):
"""Deprecated at version 3.0.0."""
_DEPRECATION_ERROR_ATTRIBUTE(self, "dtvarray") # pragma: no cover

@property
def in_memory(self):
"""True if the array is retained in memory.

Deprecated at version TODODASK.

davidhassell marked this conversation as resolved.
Show resolved Hide resolved
"""
_DEPRECATION_ERROR_ATTRIBUTE(
self,
"in_memory",
version="TODODASK",
removed_at="5.0.0",
) # pragma: no cover

def files(self):
"""Deprecated at version 3.4.0, use method `get_` instead."""
"""Deprecated at version 3.4.0, use method `get_filenames`
instead."""
_DEPRECATION_ERROR_METHOD(
self,
"files",
Expand Down Expand Up @@ -625,6 +640,38 @@ def partition_boundaries(self):
"TODODASK - consider using 'chunks' instead"
) # pragma: no cover

def save_to_disk(self, itemsize=None):
"""Deprecated."""
_DEPRECATION_ERROR_METHOD(
self,
"save_to_disk",
removed_at="4.0.0",
) # pragma: no cover

def to_disk(self):
"""Store the data array on disk.

Deprecated at version TODODASK.

There is no change to partitions whose sub-arrays are already
on disk.

:Returns:

`None`

**Examples**

>>> d.to_disk()

"""
_DEPRECATION_ERROR_METHOD(
self,
"to_disk",
version="TODODASK",
removed_at="5.0.0",
) # pragma: no cover

@staticmethod
def seterr(all=None, divide=None, over=None, under=None, invalid=None):
"""Set how floating-point errors in the results of arithmetic
Expand Down
8 changes: 4 additions & 4 deletions cf/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -3220,11 +3220,11 @@ def _DEPRECATION_ERROR_ATTRIBUTE(
):
if removed_at:
removed_at = f" and will be removed at version {removed_at}"

warnings.warn(
raise DeprecationError(
f"{instance.__class__.__name__} attribute {attribute!r} has been "
f"deprecated at version {version}{removed_at}. {message}",
DeprecationWarning,
f"deprecated at version {version} and will be removed at version "
f"{removed_at}. {message}"
)


Expand Down