Skip to content

Commit

Permalink
WIP: Better parallel implementation of linear regression (#62)
Browse files Browse the repository at this point in the history
* started refactoring the the linregress function

* added nb, whats-new

Co-authored-by: jbusecke <[email protected]>
  • Loading branch information
Julius Busecke and jbusecke authored Apr 12, 2020
1 parent 1943d8e commit e9e4224
Show file tree
Hide file tree
Showing 8 changed files with 520 additions and 85 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ coverage.xml

# Sphinx documentation
docs/_build/
doc/_build/

# PyBuilder
target/
Expand Down
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,7 @@ To install from source use pip:
### Plotting utilities
`xarrayutils.plotting` provides several small utility functions to make common tasks I find in my workflow in matplotlib easier. [Examples](doc/plotting.ipynb)


### Utilities for large scale climate data analysis
`xarrayutils.utils` provides some helpful tools to simplify common tasks in climate data
analysis workflows. [This notebook](doc/utils.ipynb), provides some examples on how to
use linear regression.
25 changes: 25 additions & 0 deletions doc/environment.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
name: xarrayutils-docs
dependencies:
- python
- future
- pytest
- numpydoc
- sphinx
- sphinx_rtd_theme
- ipykernel
# Packages which are needed to execute your examples
# These will vary
- ipython
- matplotlib
- xarray
- netcdf4
- dask
- numpy
- nc-time-axis
- astropy
#
- pip:
- docrep
- nbsphinx
- jupyter_client
- jupyter-sphinx
328 changes: 328 additions & 0 deletions doc/utils.ipynb

Large diffs are not rendered by default.

36 changes: 36 additions & 0 deletions doc/whats-new.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
.. currentmodule:: xarrayutils

What's New
===========

.. _whats-new.0.3.0:

v0.2.0 (unreleased)

Breaking changes
~~~~~~~~~~~~~~~~
- Refactored implementation of :py:meth:`utils.xr_linregress`: Scales better and
does not require rechunking the data anymore. No more `nanmask` option available
(:pull:`62`)
By `Julius Busecke <https://github.com/jbusecke>`_

New Features
~~~~~~~~~~~~

Bug fixes
~~~~~~~~~


Documentation
~~~~~~~~~~~~~
- Added example notebook for `xarrayutils.utils`


Internal Changes
~~~~~~~~~~~~~~~~



v0.1.3
----------------------
Changes not documented for this release and earlier
4 changes: 2 additions & 2 deletions xarrayutils/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
__version__ = "0.1.0"
from . import utils
from . import numpy_utils
from . import xmitgcm_utils
from .utils import *

from ._version import get_versions
__version__ = get_versions()['version']

__version__ = get_versions()["version"]
del get_versions
91 changes: 69 additions & 22 deletions xarrayutils/test/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
extractBox_dict,
linear_trend,
_lin_trend_legacy,
_linregress_ufunc,
xr_linregress,
xr_detrend,
lag_and_combine,
Expand Down Expand Up @@ -147,17 +146,53 @@ def test_linregress_ufunc():
assert np.isnan(_linregress_ufunc(x, y, nanmask=True)).all()


@pytest.mark.parametrize("chunks", [None, {"x": -1, "y": 1}, {"x": 1, "y": 1}])
@pytest.mark.parametrize("variant", range(3))
@pytest.mark.parametrize("dtype", [None, np.float])
@pytest.mark.parametrize("nans", [False, True])
def test_xr_linregress(chunks, variant, dtype, nans):
a = xr.DataArray(np.random.rand(3, 13, 5), dims=["x", "time", "y"])
b = xr.DataArray(np.random.rand(3, 5, 13), dims=["x", "y", "time"])
def _linregress_ufunc(a, b, nanmask=False):
"""ufunc to wrap check output of `xr_linregress` against pure scipy results"""
if nanmask:
idxa = np.isnan(a)
idxb = np.isnan(b)
mask = np.logical_and(~idxa, ~idxb)
if sum(~mask) < len(b): # only applies the mask if not all nan
a = a[mask]
b = b[mask]
slope, intercept, r_value, p_value, std_err = stats.linregress(a, b)
return np.array([slope, intercept, r_value, p_value, std_err])


@pytest.mark.parametrize(
"chunks, dim",
[
(None, "time"),
({"x": -1, "y": 1}, "time"),
({"x": 1, "y": 1}, "time"),
({"x": -1, "y": 1}, "x"),
],
)
# @pytest.mark.parametrize("variant", range(3))
@pytest.mark.parametrize("variant", [0])
# @pytest.mark.parametrize("dtype", [None, np.float])
@pytest.mark.parametrize("dtype", [None])
# @pytest.mark.parametrize("nans", [False, True])
@pytest.mark.parametrize("nans", [True, "all"])
@pytest.mark.parametrize(
"ni, parameter", enumerate(["slope", "intercept", "r_value", "p_value", "std_err"])
)
def test_xr_linregress(chunks, dim, variant, dtype, nans, parameter, ni):
a = xr.DataArray(np.random.rand(6, 8, 5), dims=["x", "time", "y"])
b = xr.DataArray(np.random.rand(6, 5, 8), dims=["x", "y", "time"])
if nans:
# add nans at random positions
a.data[np.unravel_index(np.random.randint(0, 2 * 4 * 12, 10), a.shape)] = np.nan
b.data[np.unravel_index(np.random.randint(0, 2 * 4 * 12, 10), b.shape)] = np.nan
if nans == "all":
a = xr.ones_like(a) * np.nan
b = xr.ones_like(b) * np.nan

else:
# add nans at random positions
a.data[
np.unravel_index(np.random.randint(0, 5 * 7 * 3, 10), a.shape)
] = np.nan
b.data[
np.unravel_index(np.random.randint(0, 5 * 7 * 3, 10), b.shape)
] = np.nan

if chunks is not None:
if variant == 0:
Expand All @@ -168,16 +203,17 @@ def test_xr_linregress(chunks, variant, dtype, nans):
a = a.chunk(chunks)
b = b.chunk(chunks)

reg = xr_linregress(a, b, dtype=dtype)
for xx in range(len(a.x)):
for yy in range(len(a.y)):
pos = dict(x=xx, y=yy)
expected = _linregress_ufunc(a.isel(**pos), b.isel(**pos))
reg = xr_linregress(a, b, dim=dim)

dims = list(set(a.dims) - set([dim]))
for ii in range(len(a[dims[0]])):
for jj in range(len(a[dims[1]])):
pos = dict({dims[0]: ii, dims[1]: jj})

expected = _linregress_ufunc(a.isel(**pos), b.isel(**pos), nanmask=True)
reg_sub = reg.isel(**pos)
for ni, nn in enumerate(
["slope", "intercept", "r_value", "p_value", "std_err"]
):
np.testing.assert_allclose(reg_sub[nn].data, expected[ni])

np.testing.assert_allclose(reg_sub[parameter].data, expected[ni])


def test_linear_trend():
Expand All @@ -199,7 +235,12 @@ def test_linear_trend():
x_fit = t
y_fit = data[:, xi, yi]
fit = np.array(stats.linregress(x_fit, y_fit))
test = fit_da.sel(x=xi, y=yi).data
test = np.array(
[
fit_da.sel(x=xi, y=yi)[param].data
for param in ["slope", "intercept", "r_value", "p_value", "std_err"]
]
)
assert np.allclose(fit, test)

# Test with other timedim (previously was not caught)
Expand All @@ -217,7 +258,13 @@ def test_linear_trend():
x_fit = t
y_fit = data[xi, :, yi]
fit = np.array(stats.linregress(x_fit, y_fit))
assert np.allclose(fit, fit_da.sel(x=xi, y=yi).data)
test = test = np.array(
[
fit_da.sel(x=xi, y=yi)[param].data
for param in ["slope", "intercept", "r_value", "p_value", "std_err"]
]
)
assert np.allclose(fit, test)


# @pytest.mark.parametrize(
Expand Down
115 changes: 55 additions & 60 deletions xarrayutils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,8 @@
from dask.array.core import Array
import warnings
from xarrayutils.utilities import detect_dtype

# from xarrayutils.filtering import filter_1D as filter_1D_refactored
from xarrayutils.filtering import filter_1D as filter_1D_refactored

# from scipy import optimize
# import dask.array as dsa


"""
Collection of several useful routines for xarray
Expand All @@ -36,76 +31,76 @@ def shift_lon(ds, londim, shift=360, crit=0, smaller=True, sort=True):
return ds


def _linregress_ufunc(a, b, nanmask=False):
"""ufunc to wrap scipy.stats.linregress for xr_linregress"""
if nanmask:
idxa = np.isnan(a)
idxb = np.isnan(b)
mask = np.logical_and(~idxa, ~idxb)
if sum(~mask) < len(b): # only applies the mask if not all nan
a = a[mask]
b = b[mask]
slope, intercept, r_value, p_value, std_err = stats.linregress(a, b)
return np.array([slope, intercept, r_value, p_value, std_err])


def xr_linregress(a, b, dim="time", convert_to_dataset=True, dtype=None, nanmask=False):
"""Applies scipy.stats.linregress over two xr.DataArrays or xr.Datasets.
def xr_linregress(x, y, dim="time"):
"""Calculates linear regression along dimension `dim`.
Results are equivalent to `scipy.stats.linregress`.
Parameters
----------
a : {xr.DataArray}
x : {xr.DataArray}
Independent variable for linear regression. E.g. time.
b : {xr.DataArray, xr.Dataset}
y : {xr.DataArray, xr.Dataset}
Dependent variable.
dim : str
Dimension over which to perform linear regression.
Must be present in both `a` and `b` (the default is 'time').
convert_to_dataset : bool
Converts the output parameter to data_variables instead of an
additional dimension (the default is True).
dtype : dtype
Dtype for the output. If None, defaults to dtype of `b`, or `b`s
first data variable(the default is None).
nanmask: bool
optional masking of nans in the data to avoid nan output from
regression (the default is False)
Returns
-------
type(b)
Returns a dataarray containing the parameter values of
scipy.stats.linregress for each data_variable in `b`.
Returns a dataarray containing the parameter values
for each data_variable in `b`. The naming convention
follows `scipy.stats.linregress`
"""
# align the nan Values before...
x = x.where(~np.isnan(y))
y = y.where(~np.isnan(x))
# TODO: think about making this optional? Right now I err on the side of caution

# Inspired by this post https://stackoverflow.com/a/60352716 but adjusted, so that
# results are exactly as with scipy.stats.linregress for 1d vectors.

n = y.notnull().sum(dim)

nanmask = np.isnan(y).all(dim)

if dtype is None:
dtype = detect_dtype(b)

stats = xr.apply_ufunc(
_linregress_ufunc,
a,
b,
nanmask,
input_core_dims=[[dim], [dim], []],
output_core_dims=[["parameter"]],
vectorize=True,
dask="parallelized",
output_dtypes=[dtype],
output_sizes={"parameter": 5},
xmean = x.mean(dim)
ymean = y.mean(dim)
xstd = x.std(dim)
ystd = y.std(dim)

cov = ((x - xmean) * (y - ymean)).sum(dim) / (n)
cor = cov / (xstd * ystd)

slope = cov / (xstd ** 2)
intercept = ymean - xmean * slope

df = n - 2
TINY = 1.0e-20
tstats = cor * np.sqrt(df / ((1.0 - cor + TINY) * (1.0 + cor + TINY)))
stderr = slope / tstats

pval = (
xr.apply_ufunc(
stats.distributions.t.sf,
abs(tstats),
df,
dask="parallelized",
output_dtypes=[y.dtype],
)
* 2
)
stats["parameter"] = xr.DataArray(
["slope", "intercept", "r_value", "p_value", "std_err"], dims=["parameter"]

return xr.Dataset(
{
"slope": slope,
"intercept": intercept,
"r_value": cor.fillna(0).where(~nanmask),
"p_value": pval,
"std_err": stderr,
}
)
if convert_to_dataset:
# chug them all into a dataset
ds_stats = xr.Dataset()
for ff in stats.parameter.data:
ds_stats[ff] = stats.sel(parameter=ff)
out = ds_stats
else:
out = stats
return out


def linear_trend(obj, dim):
Expand All @@ -116,7 +111,7 @@ def linear_trend(obj, dim):
x = xr.DataArray(
np.arange(len(obj[dim])).astype(np.float), dims=dim, coords={dim: obj[dim]}
)
trend = xr_linregress(x, obj, dim=dim, convert_to_dataset=False)
trend = xr_linregress(x, obj, dim=dim)
return trend


Expand Down Expand Up @@ -701,7 +696,7 @@ def xr_detrend(b, dim="time", trend_params=None, convert_datetime=True):

# Create new time dataarray
trend_full = t_data * out.slope + out.intercept
trend_full = trend_full.assign_coords({dim:b[dim].data})
trend_full = trend_full.assign_coords({dim: b[dim].data})
return b - trend_full


Expand Down

0 comments on commit e9e4224

Please sign in to comment.