Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Temporary decorator to mark & log 'daskified' methods #245

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 62 additions & 1 deletion cf/data/data.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging
import operator
from functools import partial, reduce
from functools import partial, reduce, wraps
from itertools import product
from json import dumps as json_dumps
from json import loads as json_loads
Expand Down Expand Up @@ -123,6 +123,48 @@
logger = logging.getLogger(__name__)


def daskified(apply_temp_log_level=None):
def decorator(method):
"""Temporary decorator to mark and log methods migrated to Dask.

A log level argument will set the log level throughout the call of
the method to that level and then reset it back to the previous
global level. A message will also be emitted to indicate whenever
the method is called, unless no argument is given [daskified()]
in which case the decorator does nothing except mark methods
which are considered to be daskified, a main purpose for this
decorator.

Note: for properties the decorator must be placed underneath the
property decorator so it is called before and not after it.

"""

@wraps(method)
def wrapper(*args, **kwargs):
if apply_temp_log_level is None: # distingush from 0
return method(*args, **kwargs)

original_global_log_level = log_level()
# Switch log level for the duration of the method call, with an
# initial message to indicate a run first guaranteed to show
log_level(apply_temp_log_level)
# Not actually a warning, but setting as warning ensures it shows
# (unless logging is disabled, but ignore that complication for
# this temporary and informal decorator!)
logger.warning(f"%%%%% Running daskified {method.__name__} %%%%%")

out = method(*args, **kwargs)

# ... then return the log level to the global level afterwards
log_level(original_global_log_level)
return out

return wrapper

return decorator


# --------------------------------------------------------------------
# Constants
# --------------------------------------------------------------------
Expand Down Expand Up @@ -573,6 +615,7 @@ def dask_compressed_array(self):

return ca._get_dask().copy()

@daskified(1)
def __contains__(self, value):
"""Membership test operator ``in``

Expand Down Expand Up @@ -864,6 +907,7 @@ def __repr__(self):
"""
return super().__repr__().replace("<", "<CF ", 1)

@daskified(1)
def __getitem__(self, indices):
"""Return a subspace of the data defined by indices.

Expand Down Expand Up @@ -970,6 +1014,7 @@ def __getitem__(self, indices):

return new

@daskified(1)
def __setitem__(self, indices, value):
"""Implement indexed assignment.

Expand Down Expand Up @@ -5458,6 +5503,7 @@ def _hardmask(self):
del self._custom["_hardmask"]

@property
@daskified(1)
def _axes(self):
"""Storage for the axis identifiers.

Expand Down Expand Up @@ -5515,6 +5561,7 @@ def force_compute(self, value):
# Attributes
# ----------------------------------------------------------------
@property
@daskified(1)
def Units(self):
"""The `cf.Units` object containing the units of the data array.

Expand Down Expand Up @@ -5594,6 +5641,7 @@ def data(self):
return self

@property
@daskified(1)
def dtype(self):
"""The `numpy` data-type of the data.

Expand Down Expand Up @@ -5669,6 +5717,7 @@ def fill_value(self):
self.del_fill_value(None)

@property
@daskified(1)
def hardmask(self):
"""Hardness of the mask.

Expand Down Expand Up @@ -5742,6 +5791,7 @@ def ismasked(self):
_DEPRECATION_ERROR_METHOD("TODODASK use is_masked instead")

@property
@daskified(1)
def is_masked(self):
"""True if the data array has any masked values.

Expand Down Expand Up @@ -5819,6 +5869,7 @@ def isscalar(self):
return not self.ndim

@property
@daskified(1)
def nbytes(self):
"""Total number of bytes consumed by the elements of the array.

Expand Down Expand Up @@ -5846,6 +5897,7 @@ def nbytes(self):
# TODODASK - what about nans (e.g. after da.unique)

@property
@daskified(1)
def ndim(self):
"""Number of dimensions in the data array.

Expand Down Expand Up @@ -5876,6 +5928,7 @@ def ndim(self):
return dx.ndim

@property
@daskified(1)
def shape(self):
"""Tuple of the data array's dimension sizes.

Expand Down Expand Up @@ -5905,6 +5958,7 @@ def shape(self):
# TODODASK - what about nans (e.g. after da.unique dx.shape -> (nan,))

@property
@daskified(1)
def size(self):
"""Number of elements in the data array.

Expand Down Expand Up @@ -5937,6 +5991,7 @@ def size(self):
# TODODASK - what about nans (e.g. after da.unique)

@property
@daskified(1)
def array(self):
"""A numpy array copy the data array.

Expand Down Expand Up @@ -5980,6 +6035,7 @@ def array(self):
return a

@property
@daskified(1)
def datetime_array(self):
"""An independent numpy array of date-time objects.

Expand Down Expand Up @@ -6713,6 +6769,7 @@ def arccosh(self, inplace=False):

return d

@daskified()
def add_partitions(self, extra_boundaries, pdim):
"""Add partition boundaries.

Expand Down Expand Up @@ -8938,6 +8995,7 @@ def exp(self, inplace=False, i=False):

return d

@daskified(1)
@_inplace_enabled(default=False)
def insert_dimension(self, position=0, inplace=False):
"""Expand the shape of the data array in place.
Expand Down Expand Up @@ -10584,6 +10642,7 @@ def mid_range(
_preserve_partitions=_preserve_partitions,
)

@daskified(1)
@_deprecated_kwarg_check("i")
@_inplace_enabled(default=False)
def flip(self, axes=None, inplace=False, i=False):
Expand Down Expand Up @@ -11952,6 +12011,7 @@ def log(self, base=None, inplace=False, i=False):

return d

@daskified(1)
@_deprecated_kwarg_check("i")
@_inplace_enabled(default=False)
def squeeze(self, axes=None, inplace=False, i=False):
Expand Down Expand Up @@ -12554,6 +12614,7 @@ def range(
_preserve_partitions=_preserve_partitions,
)

@daskified(1)
@_inplace_enabled(default=False)
@_deprecated_kwarg_check("i")
def roll(self, axis, shift, inplace=False, i=False):
Expand Down