diff --git a/cf/data/data.py b/cf/data/data.py index fa5eeb48bc..24105fcfe7 100644 --- a/cf/data/data.py +++ b/cf/data/data.py @@ -213,7 +213,7 @@ def __init__( copy=True, dtype=None, mask=None, - persist=False, + to_memory=False, init_options=None, _use_array=True, ): @@ -324,18 +324,26 @@ def __init__( .. versionadded:: TODODASK - persist: `bool`, optional - If True then persist the underlying array into memory, - equivalent to calling `persist` on the data - immediately after initialisation. + to_memory: `bool`, optional + If True then ensure that the original data are in + memory, rather than on disk. - If the original data are on disk then reading data + If the original data are on disk, then reading data into memory during initialisation will slow down the initialisation process, but can considerably improve downstream performance by avoiding the need for independent reads for every dask chunk, each time the data are computed. + In general, setting *to_memory* to True is not the same + as calling the `persist` of the newly created `Data` + object, which also decompresses data compressed by + convention and computes any data type, mask and + date-time modifications. + + If the input *array* is a `dask.array.Array` object + then *to_memory* is ignored. + .. versionadded:: TODODASK init_options: `dict`, optional @@ -476,11 +484,20 @@ def __init__( "for compressed input arrays" ) + # Bring the compressed data into memory without + # decompressing it + if to_memory: + try: + array = array.to_memory() + except AttributeError: + pass + # Save the input compressed array, as this will contain # extra information, such as a count or index variable. self._set_Array(array) array = compressed_to_dask(array, chunks) + elif not is_dask_collection(array): # Turn the data into a dask array kwargs = init_options.get("from_array", {}) @@ -491,6 +508,13 @@ def __init__( "Use the 'chunks' parameter instead." 
) + # Bring the data into memory + if to_memory: + try: + array = array.to_memory() + except AttributeError: + pass + array = to_dask(array, chunks, **kwargs) elif chunks != _DEFAULT_CHUNKS: @@ -534,10 +558,6 @@ def __init__( if mask is not None: self.where(mask, cf_masked, inplace=True) - # Bring the data into memory - if persist: - self.persist(inplace=True) - @property def dask_compressed_array(self): """TODODASK. @@ -8994,99 +9014,6 @@ def to_dask_array(self): """ return self._custom["dask"] - def to_disk(self): - """Store the data array on disk. - - There is no change to partition's whose sub-arrays are already on - disk. - - :Returns: - - `None` - - **Examples** - - >>> d.to_disk() - - """ - print("TODODASK - ???") - config = self.partition_configuration(readonly=True, to_disk=True) - - for partition in self.partitions.matrix.flat: - if partition.in_memory: - partition.open(config) - partition.array - partition.close() - - def to_memory(self, regardless=False, parallelise=False): - """Store each partition's data in memory in place if the master - array is smaller than the chunk size. - - There is no change to partitions with data that are already in memory. - - :Parameters: - - regardless: `bool`, optional - If True then store all partitions' data in memory - regardless of the size of the master array. By default - only store all partitions' data in memory if the master - array is smaller than the chunk size. - - parallelise: `bool`, optional - If True than only move those partitions to memory that are - flagged for processing on this rank. 
- - :Returns: - - `None` - - **Examples** - - >>> d.to_memory() - >>> d.to_memory(regardless=True) - - """ - print("TODODASK - ???") - config = self.partition_configuration(readonly=True) - fm_threshold = cf_fm_threshold() - - # If parallelise is False then all partitions are flagged for - # processing on this rank, otherwise only a subset are - self._flag_partitions_for_processing(parallelise) - - for partition in self.partitions.matrix.flat: - if partition._process_partition: - # Only move the partition to memory if it is flagged - # for processing - partition.open(config) - if ( - partition.on_disk - and partition.nbytes <= free_memory() - fm_threshold - ): - partition.array - - partition.close() - # --- End: for - - @property - def in_memory(self): - """True if the array is retained in memory. - - :Returns: - - **Examples** - - >>> d.in_memory - - """ - print("TODODASK - ???") - for partition in self.partitions.matrix.flat: - if not partition.in_memory: - return False - # --- End: for - - return True - @daskified(_DASKIFIED_VERBOSE) def datum(self, *index): """Return an element of the data array as a standard Python @@ -10125,17 +10052,6 @@ def swapaxes(self, axis0, axis1, inplace=False, i=False): d._set_dask(dx, reset_mask_hardness=False) return d - def save_to_disk(self, itemsize=None): - """cf.Data.save_to_disk is dead. - - Use not cf.Data.fits_in_memory instead. - - """ - raise NotImplementedError( - "cf.Data.save_to_disk is dead. Use not " - "cf.Data.fits_in_memory instead." - ) - def fits_in_memory(self, itemsize): """Return True if the master array is small enough to be retained in memory. @@ -10919,6 +10835,18 @@ def tolist(self): """ return self.array.tolist() + @daskified(_DASKIFIED_VERBOSE) + def to_memory(self): + """Bring data on disk into memory. + + Not implemented. Consider using `persist` instead. + + """ + raise NotImplementedError( + "'Data.to_memory' is not available. " + "Consider using 'Data.persist' instead." 
+ ) + @daskified(_DASKIFIED_VERBOSE) @_deprecated_kwarg_check("i") @_inplace_enabled(default=False) diff --git a/cf/data/mixin/deprecations.py b/cf/data/mixin/deprecations.py index aa36ec426c..6c73d5aeec 100644 --- a/cf/data/mixin/deprecations.py +++ b/cf/data/mixin/deprecations.py @@ -107,8 +107,23 @@ def dtvarray(self): """Deprecated at version 3.0.0.""" _DEPRECATION_ERROR_ATTRIBUTE(self, "dtvarray") # pragma: no cover + @property + def in_memory(self): + """True if the array is retained in memory. + + Deprecated at version TODODASK. + + """ + _DEPRECATION_ERROR_ATTRIBUTE( + self, + "in_memory", + version="TODODASK", + removed_at="5.0.0", + ) # pragma: no cover + def files(self): - """Deprecated at version 3.4.0, use method `get_` instead.""" + """Deprecated at version 3.4.0, use method `get_filenames` + instead.""" _DEPRECATION_ERROR_METHOD( self, "files", @@ -625,6 +640,38 @@ def partition_boundaries(self): "TODODASK - consider using 'chunks' instead" ) # pragma: no cover + def save_to_disk(self, itemsize=None): + """Deprecated.""" + _DEPRECATION_ERROR_METHOD( + self, + "save_to_disk", + removed_at="4.0.0", + ) # pragma: no cover + + def to_disk(self): + """Store the data array on disk. + + Deprecated at version TODODASK. + + There is no change to partitions whose sub-arrays are already + on disk. 
+
+        :Returns:
+
+            `None`
+
+        **Examples**
+
+        >>> d.to_disk()
+
+        """
+        _DEPRECATION_ERROR_METHOD(
+            self,
+            "to_disk",
+            version="TODODASK",
+            removed_at="5.0.0",
+        )  # pragma: no cover
+
     @staticmethod
     def seterr(all=None, divide=None, over=None, under=None, invalid=None):
         """Set how floating-point errors in the results of arithmetic
diff --git a/cf/functions.py b/cf/functions.py
index a7c085da66..de142c0e40 100644
--- a/cf/functions.py
+++ b/cf/functions.py
@@ -3220,11 +3220,11 @@ def _DEPRECATION_ERROR_ATTRIBUTE(
 ):
     if removed_at:
         removed_at = f" and will be removed at version {removed_at}"
-
-    warnings.warn(
+
+    raise DeprecationError(
         f"{instance.__class__.__name__} attribute {attribute!r} has been "
-        f"deprecated at version {version}{removed_at}. {message}",
-        DeprecationWarning,
+        f"deprecated at version {version}{removed_at}."
+        f" {message}"
     )