Skip to content
forked from pydata/xarray

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into init-zarr
Browse files Browse the repository at this point in the history
* upstream/main:
  Faster encoding functions. (pydata#8565)
  ENH: vendor SerializableLock from dask and use as default backend lock, adapt tests (pydata#8571)
  Silence a bunch of CachingFileManager warnings (pydata#8584)
  Bump actions/download-artifact from 3 to 4 (pydata#8556)
  Minimize duplication in `map_blocks` task graph (pydata#8412)
  [pre-commit.ci] pre-commit autoupdate (pydata#8578)
  ignore a `DeprecationWarning` emitted by `seaborn` (pydata#8576)
  Fix mypy type ignore (pydata#8564)
  Support for the new compression arguments. (pydata#7551)
  FIX: reverse index output of bottleneck move_argmax/move_argmin functions (pydata#8552)
  • Loading branch information
dcherian committed Jan 4, 2024
2 parents ce3b17d + 5f1f78f commit 1498c35
Show file tree
Hide file tree
Showing 19 changed files with 419 additions and 152 deletions.
9 changes: 4 additions & 5 deletions .github/workflows/ci-additional.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,10 @@ jobs:
# Raise an error if there are warnings in the doctests, with `-Werror`.
# This is a trial; if it presents an problem, feel free to remove.
# See https://github.com/pydata/xarray/issues/7164 for more info.
# ignores:
# 1. h5py: see https://github.com/pydata/xarray/issues/8537
python -m pytest --doctest-modules xarray --ignore xarray/tests -Werror \
-W "ignore:h5py is running against HDF5 1.14.3:UserWarning"
#
# If dependencies emit warnings we can't do anything about, add ignores to
# `xarray/tests/__init__.py`.
python -m pytest --doctest-modules xarray --ignore xarray/tests -Werror
mypy:
name: Mypy
Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/pypi-release.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ jobs:
name: Install Python
with:
python-version: "3.11"
- uses: actions/download-artifact@v3
- uses: actions/download-artifact@v4
with:
name: releases
path: dist
Expand Down Expand Up @@ -82,7 +82,7 @@ jobs:
id-token: write

steps:
- uses: actions/download-artifact@v3
- uses: actions/download-artifact@v4
with:
name: releases
path: dist
Expand All @@ -106,7 +106,7 @@ jobs:
id-token: write

steps:
- uses: actions/download-artifact@v3
- uses: actions/download-artifact@v4
with:
name: releases
path: dist
Expand Down
10 changes: 5 additions & 5 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,24 @@ repos:
files: ^xarray/
- repo: https://github.com/astral-sh/ruff-pre-commit
# Ruff version.
rev: 'v0.1.6'
rev: 'v0.1.9'
hooks:
- id: ruff
args: ["--fix"]
# https://github.com/python/black#version-control-integration
- repo: https://github.com/psf/black
rev: 23.11.0
- repo: https://github.com/psf/black-pre-commit-mirror
rev: 23.12.1
hooks:
- id: black-jupyter
- repo: https://github.com/keewis/blackdoc
rev: v0.3.9
hooks:
- id: blackdoc
exclude: "generate_aggregations.py"
additional_dependencies: ["black==23.11.0"]
additional_dependencies: ["black==23.12.1"]
- id: blackdoc-autoupdate-black
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.7.1
rev: v1.8.0
hooks:
- id: mypy
# Copied from setup.cfg
Expand Down
14 changes: 13 additions & 1 deletion doc/whats-new.rst
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ New Features

- :py:meth:`xr.cov` and :py:meth:`xr.corr` now support using weights (:issue:`8527`, :pull:`7392`).
By `Llorenç Lledó <https://github.com/lluritu>`_.
- Accept the compression arguments new in netCDF 1.6.0 in the netCDF4 backend.
See `netCDF4 documentation <https://unidata.github.io/netcdf4-python/#efficient-compression-of-netcdf-variables>`_ for details.
By `Markel García-Díez <https://github.com/markelg>`_. (:issue:`6929`, :pull:`7551`) Note that some
new compression filters needs plugins to be installed which may not be available in all netCDF distributions.

Breaking changes
~~~~~~~~~~~~~~~~
Expand All @@ -38,14 +42,22 @@ Deprecations
Bug fixes
~~~~~~~~~

- Reverse index output of bottleneck's rolling move_argmax/move_argmin functions (:issue:`8541`, :pull:`8552`).
By `Kai Mühlbauer <https://github.com/kmuehlbauer>`_.
- Vendor `SerializableLock` from dask and use as default lock for netcdf4 backends (:issue:`8442`, :pull:`8571`).
By `Kai Mühlbauer <https://github.com/kmuehlbauer>`_.


Documentation
~~~~~~~~~~~~~


Internal Changes
~~~~~~~~~~~~~~~~

- The implementation of :py:func:`map_blocks` has changed to minimize graph size and duplication of data.
This should be a strict improvement even though the graphs are not always embarassingly parallel any more.
Please open an issue if you spot a regression. (:pull:`8412`, :issue:`8409`).
By `Deepak Cherian <https://github.com/dcherian>`_.
- Remove null values before plotting. (:pull:`8535`).
By `Jimmy Westling <https://github.com/illviljan>`_.

Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ module = [
"cf_units.*",
"cfgrib.*",
"cftime.*",
"cloudpickle.*",
"cubed.*",
"cupy.*",
"dask.types.*",
Expand Down
84 changes: 76 additions & 8 deletions xarray/backends/locks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,83 @@

import multiprocessing
import threading
import uuid
import weakref
from collections.abc import MutableMapping
from typing import Any

try:
from dask.utils import SerializableLock
except ImportError:
# no need to worry about serializing the lock
SerializableLock = threading.Lock # type: ignore
from collections.abc import Hashable, MutableMapping
from typing import Any, ClassVar
from weakref import WeakValueDictionary


# SerializableLock is adapted from Dask:
# https://github.com/dask/dask/blob/74e898f0ec712e8317ba86cc3b9d18b6b9922be0/dask/utils.py#L1160-L1224
# Used under the terms of Dask's license, see licenses/DASK_LICENSE.
class SerializableLock:
"""A Serializable per-process Lock
This wraps a normal ``threading.Lock`` object and satisfies the same
interface. However, this lock can also be serialized and sent to different
processes. It will not block concurrent operations between processes (for
this you should look at ``dask.multiprocessing.Lock`` or ``locket.lock_file``
but will consistently deserialize into the same lock.
So if we make a lock in one process::
lock = SerializableLock()
And then send it over to another process multiple times::
bytes = pickle.dumps(lock)
a = pickle.loads(bytes)
b = pickle.loads(bytes)
Then the deserialized objects will operate as though they were the same
lock, and collide as appropriate.
This is useful for consistently protecting resources on a per-process
level.
The creation of locks is itself not threadsafe.
"""

_locks: ClassVar[
WeakValueDictionary[Hashable, threading.Lock]
] = WeakValueDictionary()
token: Hashable
lock: threading.Lock

def __init__(self, token: Hashable | None = None):
self.token = token or str(uuid.uuid4())
if self.token in SerializableLock._locks:
self.lock = SerializableLock._locks[self.token]
else:
self.lock = threading.Lock()
SerializableLock._locks[self.token] = self.lock

def acquire(self, *args, **kwargs):
return self.lock.acquire(*args, **kwargs)

def release(self, *args, **kwargs):
return self.lock.release(*args, **kwargs)

def __enter__(self):
self.lock.__enter__()

def __exit__(self, *args):
self.lock.__exit__(*args)

def locked(self):
return self.lock.locked()

def __getstate__(self):
return self.token

def __setstate__(self, token):
self.__init__(token)

def __str__(self):
return f"<{self.__class__.__name__}: {self.token}>"

__repr__ = __str__


# Locks used by multiple backends.
Expand Down
25 changes: 17 additions & 8 deletions xarray/backends/netCDF4_.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,12 @@ def _extract_nc4_variable_encoding(
"_FillValue",
"dtype",
"compression",
"significant_digits",
"quantize_mode",
"blosc_shuffle",
"szip_coding",
"szip_pixels_per_block",
"endian",
}
if lsd_okay:
valid_encodings.add("least_significant_digit")
Expand Down Expand Up @@ -497,20 +503,23 @@ def prepare_variable(
if name in self.ds.variables:
nc4_var = self.ds.variables[name]
else:
nc4_var = self.ds.createVariable(
default_args = dict(
varname=name,
datatype=datatype,
dimensions=variable.dims,
zlib=encoding.get("zlib", False),
complevel=encoding.get("complevel", 4),
shuffle=encoding.get("shuffle", True),
fletcher32=encoding.get("fletcher32", False),
contiguous=encoding.get("contiguous", False),
chunksizes=encoding.get("chunksizes"),
zlib=False,
complevel=4,
shuffle=True,
fletcher32=False,
contiguous=False,
chunksizes=None,
endian="native",
least_significant_digit=encoding.get("least_significant_digit"),
least_significant_digit=None,
fill_value=fill_value,
)
default_args.update(encoding)
default_args.pop("_FillValue", None)
nc4_var = self.ds.createVariable(**default_args)

nc4_var.setncatts(attrs)

Expand Down
20 changes: 12 additions & 8 deletions xarray/coding/strings.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,11 @@ class EncodedStringCoder(VariableCoder):
def __init__(self, allows_unicode=True):
self.allows_unicode = allows_unicode

def encode(self, variable, name=None):
def encode(self, variable: Variable, name=None) -> Variable:
dims, data, attrs, encoding = unpack_for_encoding(variable)

contains_unicode = is_unicode_dtype(data.dtype)
encode_as_char = encoding.get("dtype") == "S1"

if encode_as_char:
del encoding["dtype"] # no longer relevant

Expand All @@ -69,9 +68,12 @@ def encode(self, variable, name=None):
# TODO: figure out how to handle this in a lazy way with dask
data = encode_string_array(data, string_encoding)

return Variable(dims, data, attrs, encoding)
return Variable(dims, data, attrs, encoding)
else:
variable.encoding = encoding
return variable

def decode(self, variable, name=None):
def decode(self, variable: Variable, name=None) -> Variable:
dims, data, attrs, encoding = unpack_for_decoding(variable)

if "_Encoding" in attrs:
Expand All @@ -95,13 +97,15 @@ def encode_string_array(string_array, encoding="utf-8"):
return np.array(encoded, dtype=bytes).reshape(string_array.shape)


def ensure_fixed_length_bytes(var):
def ensure_fixed_length_bytes(var: Variable) -> Variable:
"""Ensure that a variable with vlen bytes is converted to fixed width."""
dims, data, attrs, encoding = unpack_for_encoding(var)
if check_vlen_dtype(data.dtype) == bytes:
if check_vlen_dtype(var.dtype) == bytes:
dims, data, attrs, encoding = unpack_for_encoding(var)
# TODO: figure out how to handle this with dask
data = np.asarray(data, dtype=np.bytes_)
return Variable(dims, data, attrs, encoding)
return Variable(dims, data, attrs, encoding)
else:
return var


class CharacterArrayCoder(VariableCoder):
Expand Down
16 changes: 9 additions & 7 deletions xarray/conventions.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
)
from xarray.core.pycompat import is_duck_dask_array
from xarray.core.utils import emit_user_level_warning
from xarray.core.variable import IndexVariable, Variable
from xarray.core.variable import Variable

CF_RELATED_DATA = (
"bounds",
Expand Down Expand Up @@ -97,10 +97,10 @@ def _infer_dtype(array, name=None):


def ensure_not_multiindex(var: Variable, name: T_Name = None) -> None:
if isinstance(var, IndexVariable) and isinstance(var.to_index(), pd.MultiIndex):
if isinstance(var._data, indexing.PandasMultiIndexingAdapter):
raise NotImplementedError(
f"variable {name!r} is a MultiIndex, which cannot yet be "
"serialized to netCDF files. Instead, either use reset_index() "
"serialized. Instead, either use reset_index() "
"to convert MultiIndex levels into coordinate variables instead "
"or use https://cf-xarray.readthedocs.io/en/latest/coding.html."
)
Expand Down Expand Up @@ -647,7 +647,9 @@ def cf_decoder(
return variables, attributes


def _encode_coordinates(variables, attributes, non_dim_coord_names):
def _encode_coordinates(
variables: T_Variables, attributes: T_Attrs, non_dim_coord_names
):
# calculate global and variable specific coordinates
non_dim_coord_names = set(non_dim_coord_names)

Expand Down Expand Up @@ -675,7 +677,7 @@ def _encode_coordinates(variables, attributes, non_dim_coord_names):
variable_coordinates[k].add(coord_name)

if any(
attr_name in v.encoding and coord_name in v.encoding.get(attr_name)
coord_name in v.encoding.get(attr_name, tuple())
for attr_name in CF_RELATED_DATA
):
not_technically_coordinates.add(coord_name)
Expand Down Expand Up @@ -742,7 +744,7 @@ def _encode_coordinates(variables, attributes, non_dim_coord_names):
return variables, attributes


def encode_dataset_coordinates(dataset):
def encode_dataset_coordinates(dataset: Dataset):
"""Encode coordinates on the given dataset object into variable specific
and global attributes.
Expand All @@ -764,7 +766,7 @@ def encode_dataset_coordinates(dataset):
)


def cf_encoder(variables, attributes):
def cf_encoder(variables: T_Variables, attributes: T_Attrs):
"""
Encode a set of CF encoded variables and attributes.
Takes a dicts of variables and attributes and encodes them
Expand Down
2 changes: 1 addition & 1 deletion xarray/core/dataarray.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@
try:
from dask.delayed import Delayed
except ImportError:
Delayed = None # type: ignore
Delayed = None # type: ignore[misc,assignment]
try:
from iris.cube import Cube as iris_Cube
except ImportError:
Expand Down
2 changes: 1 addition & 1 deletion xarray/core/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@
try:
from dask.delayed import Delayed
except ImportError:
Delayed = None # type: ignore
Delayed = None # type: ignore[misc,assignment]
try:
from dask.dataframe import DataFrame as DaskDataFrame
except ImportError:
Expand Down
Loading

0 comments on commit 1498c35

Please sign in to comment.