Pangeo Climate Anomalies example #223

Closed · TomNicholas opened this issue Jun 22, 2023 · 10 comments
Labels: benchmarks (Example benchmark problem), xarray-integration (Uses or required for cubed-xarray integration)

Comments

@TomNicholas (Member)

I tried to set up the problem of calculating the anomaly with respect to the group mean from pangeo-data/distributed-array-examples#4. I used fake data with the same chunking scheme instead of real data to make it easier to test.

I was not expecting this plan though 😬

[image: visualization of the unexpectedly large Cubed plan]

No idea what's going on here, especially as #145 also has a groupby in it.


I set up the problem like this:

from datetime import datetime, timedelta

import numpy as np
import xarray as xr
import cubed
import cubed.array_api
import cubed.random

# spec is a cubed.Spec controlling working storage and memory; assumed to be
# defined earlier, e.g. spec = cubed.Spec(work_dir="tmp", allowed_mem="2GB")
# (exact values not shown in this snippet)

# hourly timestamps and a 0.25-degree global lat/lon grid
time = np.arange(datetime(1979, 1, 1), datetime(2022, 1, 1), timedelta(hours=1)).astype('datetime64[ns]')
lat = np.linspace(-90.0, 90.0, 721)[::-1].astype(np.float32)
lon = np.linspace(0.0, 359.8, 1440).astype(np.float32)

def create_cubed_data(t_length):
    """Create a fake dataset backed by cubed, chunked (31, -1, -1) along (time, lat, lon)."""
    return xr.DataArray(
        name="asn",
        data=cubed.array_api.astype(
            cubed.random.random((t_length, 721, 1440), chunks=(31, -1, -1), spec=spec),
            np.float32,
        ),
        dims=['time', 'latitude', 'longitude'],
        coords={'time': time[:t_length], 'latitude': lat, 'longitude': lon},
    ).to_dataset()

datasets = {
    '1.5GB': create_cubed_data(372),
    '15GB':  create_cubed_data(3720),
    '150GB': create_cubed_data(37200),
    '1.5TB': create_cubed_data(372000),
}

for scale, ds in datasets.items():
    print(f'{ds.nbytes / 1e9:.2} GB dataset, ')

which prints:

1.5 GB dataset, 
1.5e+01 GB dataset, 
1.5e+02 GB dataset, 
1.5e+03 GB dataset

datasets['1.5GB']['asn'].data.visualize()
@TomNicholas added the xarray-integration (Uses or required for cubed-xarray integration) and benchmarks (Example benchmark problem) labels on Jun 22, 2023
@tomwhite (Member)

I think this is happening because Xarray's default groupby strategy indexes out each group as a new array and then applies the operation (mean in this case) to each array. Having a new array per group is obviously not a scalable approach. (BTW #145 has the same problem.)

To make this work efficiently we'd use Flox's map-reduce strategy, which is basically the same as Cubed's reduction implementation. This is tracked in xarray-contrib/flox#224.
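To make the difference concrete, here is a minimal sketch of the two strategies (assuming flox is installed so xarray's groupby reductions accept the method keyword; ds stands for one of the datasets above):

# xarray's default (non-flox) strategy: each day-of-year group is indexed out
# as its own array and the reduction is applied per group, so the plan grows
# with the number of groups.
mean_default = ds.groupby("time.dayofyear").mean()

# flox's map-reduce strategy: partial per-block group reductions are combined
# in a tree, much like Cubed's own reduction, which keeps the plan small.
mean_mapreduce = ds.groupby("time.dayofyear").mean(method="map-reduce")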

@TomNicholas (Member, Author) commented Jun 25, 2023

Even once I build a plausible groupby graph (see xarray-contrib/flox#224 (comment)), the subtraction of the mean presents a problem.

def climate_anomaly(ds):
    mean = ds.groupby("time.dayofyear").mean(skipna=False, method='map-reduce')  # skipna=False to avoid eager load, see xarray issue #7243
    
    anomaly = ds - mean.sel(dayofyear=ds.time.dt.dayofyear)
    return anomaly
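My reading of where this goes wrong, spelled out against the traceback below (the step names come straight from the Cubed call stack, nothing new):

# inside climate_anomaly, the failing line does two things:

# 1. vectorized selection: the per-day-of-year means are indexed back onto the
#    hourly "time" dimension so they broadcast against ds
expanded_mean = mean.sel(dayofyear=ds.time.dt.dayofyear)

# 2. elemwise subtraction: Cubed routes this through elemwise -> blockwise ->
#    unify_chunks, which rechunks both operands to a common chunking and raises
#    "Array must have regular chunks" when that common chunking is irregular
anomaly = ds - expanded_mean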

For the 1.5GB dataset it raises:

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
Cell In[17], line 1
----> 1 result = climate_anomaly(datasets['1.5GB'])
      2 result

Cell In[14], line 6, in climate_anomaly(ds)
      2 mean = ds.groupby("time.dayofyear").mean(skipna=False, method='map-reduce')  # skipna=False to avoid eager load, see xarray issue #7243
      4 print(mean)
----> 6 anomaly = ds - mean.sel(dayofyear=ds.time.dt.dayofyear)
      7 return anomaly

File ~/Documents/Work/Code/xarray/xarray/core/_typed_ops.py:19, in DatasetOpsMixin.__sub__(self, other)
     18 def __sub__(self, other):
---> 19     return self._binary_op(other, operator.sub)

File ~/Documents/Work/Code/xarray/xarray/core/dataset.py:6693, in Dataset._binary_op(self, other, f, reflexive, join)
   6691     self, other = align(self, other, join=align_type, copy=False)  # type: ignore[assignment]
   6692 g = f if not reflexive else lambda x, y: f(y, x)
-> 6693 ds = self._calculate_binary_op(g, other, join=align_type)
   6694 keep_attrs = _get_keep_attrs(default=False)
   6695 if keep_attrs:

File ~/Documents/Work/Code/xarray/xarray/core/dataset.py:6757, in Dataset._calculate_binary_op(self, f, other, join, inplace)
   6754 ds = self.coords.merge(other_coords)
   6756 if isinstance(other, Dataset):
-> 6757     new_vars = apply_over_both(
   6758         self.data_vars, other.data_vars, self.variables, other.variables
   6759     )
   6760 else:
   6761     other_variable = getattr(other, "variable", other)

File ~/Documents/Work/Code/xarray/xarray/core/dataset.py:6737, in Dataset._calculate_binary_op.<locals>.apply_over_both(lhs_data_vars, rhs_data_vars, lhs_vars, rhs_vars)
   6735 for k in lhs_data_vars:
   6736     if k in rhs_data_vars:
-> 6737         dest_vars[k] = f(lhs_vars[k], rhs_vars[k])
   6738     elif join in ["left", "outer"]:
   6739         dest_vars[k] = f(lhs_vars[k], np.nan)

File ~/Documents/Work/Code/xarray/xarray/core/_typed_ops.py:431, in VariableOpsMixin.__sub__(self, other)
    430 def __sub__(self, other):
--> 431     return self._binary_op(other, operator.sub)

File ~/Documents/Work/Code/xarray/xarray/core/variable.py:2705, in Variable._binary_op(self, other, f, reflexive)
   2702 attrs = self._attrs if keep_attrs else None
   2703 with np.errstate(all="ignore"):
   2704     new_data = (
-> 2705         f(self_data, other_data) if not reflexive else f(other_data, self_data)
   2706     )
   2707 result = Variable(dims, new_data, attrs=attrs)
   2708 return result

File ~/Documents/Work/Code/cubed/cubed/array_api/array_object.py:132, in Array.__sub__(self, other)
    130 if other is NotImplemented:
    131     return other
--> 132 return elemwise(np.subtract, self, other, dtype=result_type(self, other))

File ~/Documents/Work/Code/cubed/cubed/core/ops.py:302, in elemwise(func, dtype, *args)
    300 if dtype is None:
    301     raise ValueError("dtype must be specified for elemwise")
--> 302 return blockwise(
    303     func,
    304     expr_inds,
    305     *concat((a, tuple(range(a.ndim)[::-1])) for a in args),
    306     dtype=dtype,
    307 )

File ~/Documents/Work/Code/cubed/cubed/core/ops.py:206, in blockwise(func, out_ind, dtype, adjust_chunks, new_axes, align_arrays, target_store, *args, **kwargs)
    203     raise ValueError("Unknown dimension", new)
    205 if align_arrays:
--> 206     chunkss, arrays = unify_chunks(*args)
    207 else:
    208     inds = args[1::2]

File ~/Documents/Work/Code/cubed/cubed/core/ops.py:845, in unify_chunks(*args, **kwargs)
    835 chunks = tuple(
    836     chunkss[j]
    837     if a.shape[n] > 1
   (...)
    841     for n, j in enumerate(i)
    842 )
    843 if chunks != a.chunks and all(a.chunks):
    844     # this will raise if chunks are not regular
--> 845     chunksize = to_chunksize(chunks)
    846     arrays.append(rechunk(a, chunksize))
    847 else:

File ~/Documents/Work/Code/cubed/cubed/utils.py:104, in to_chunksize(chunkset)
     91 """Convert a chunkset to a chunk size for Zarr.
     92 
     93 Parameters
   (...)
    100 Tuple of ints
    101 """
    103 if not _check_regular_chunks(chunkset):
--> 104     raise ValueError("Array must have regular chunks")
    106 return tuple(c[0] for c in chunkset)

ValueError: Array must have regular chunks

For the 15GB dataset it doesn't raise (how??) but creates this graph

[image: Cubed plan for the 15GB dataset]

For the 150GB dataset it raises the same error again. We should probably not read too much into this, but I thought it might be interesting to see for now.

@tomwhite (Member)

ValueError: Array must have regular chunks

It would be useful to see what the irregular chunking actually is so we can see how it came about - can you change the exception to print it?
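Something like this in cubed/utils.py's to_chunksize would do it (a sketch of the suggested tweak; it matches the output quoted in the next comment):

# cubed/utils.py, inside to_chunksize()
if not _check_regular_chunks(chunkset):
    raise ValueError(f"Array must have regular chunks, but found chunks={chunkset}")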

@TomNicholas (Member, Author)

It would be useful to see what the irregular chunking actually is so we can see how it came about - can you change the exception to print it?

Array must have regular chunks, but found chunks=((16, 15, 1, 16, 14, 2, 16, 13, 3, 16, 12, 4, 16, 11, 5, 16, 10, 6, 16, 9, 7, 16, 8, 8, 16, 7, 9, 16, 6, 10, 16, 5, 11, 16, 4), (721,), (1440,))

Is this a chunk for every group?

@dcherian commented Jun 26, 2023

Yes, blockwise needs irregular chunks in general. It rechunks so that there is an integer number of groups per block.
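To illustrate why that implies irregular chunks, here is a toy sketch (not the actual flox/Cubed rechunking code; the numbers assume the hourly data above, with 31-step time chunks and 24-hour day-of-year groups):

import numpy as np

def chunks_with_whole_groups(n, chunk, group_size):
    # Hypothetical helper (not a Cubed/flox API): snap each original chunk
    # boundary to the nearest group boundary so every block holds whole groups.
    boundaries = np.arange(chunk, n, chunk)
    snapped = np.unique(np.round(boundaries / group_size).astype(int) * group_size)
    snapped = snapped[(snapped > 0) & (snapped < n)]
    edges = np.concatenate([[0], snapped, [n]])
    return tuple(int(c) for c in np.diff(edges))

# 372 hourly steps (the 1.5GB case), original time chunks of 31, 24-hour groups:
print(chunks_with_whole_groups(372, 31, 24))
# -> (24, 48, 24, 24, 24, 48, 24, 24, 48, 24, 24, 36): irregular, so Zarr can't represent it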

@TomNicholas (Member, Author)

Yes, blockwise needs irregular chunks in general. It rechunks so that there is an integer number of groups per block.

Zarr doesn't support irregular chunks yet, right? That seems to be tracked in ZEP003. So does that mean blockwise is a no-go for groupby in Cubed for now?

@dcherian

does that mean blockwise is a no-go for groupby in cubed

👍 It'll only work for the special case of equal-sized groups (.groupby("time.hour"), for example), when the algorithm decides to put an equal number of groups in each block.

@tomwhite (Member)

Zarr doesn't support irregular chunks yet, right? That seems to be tracked in ZEP003. So does that mean blockwise is a no-go for groupby in Cubed for now?

That's right. Note that if there were an early implementation of irregular chunks we could use it, since the chunking here is an implementation detail of Cubed (it's intermediate data).

See also #73

@TomNicholas (Member, Author)

Do map-reduce and cohorts also have this problem?

@tomwhite (Member) commented Jun 7, 2024

Fixed in #476

@tomwhite closed this as completed on Jun 7, 2024