Skip to content

Commit

Permalink
Merge pull request #56 from lincc-frameworks/wrap_map_partitions
Browse files Browse the repository at this point in the history
Better Ensure `reduce` returns a NestedFrame
  • Loading branch information
dougbrn authored Sep 30, 2024
2 parents f3b201e + 4594185 commit 7b9c408
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 0 deletions.
9 changes: 9 additions & 0 deletions src/nested_dask/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,15 @@ def reduce(self, func, *args, meta=None, **kwargs) -> NestedFrame:
"""

# Handle meta shorthands to produce nestedframe output
# route standard dict meta to nestedframe
if isinstance(meta, dict):
series_dict = {item[0]: pd.Series(dtype=item[1]) for item in meta.items()}
meta = npd.NestedFrame(series_dict)
# reroute series meta to nestedframe, per consistency with nested-pandas
elif isinstance(meta, tuple) and len(meta) == 2: # len 2 to only try on proper series meta
meta = npd.NestedFrame(pd.Series(name=meta[0], dtype=meta[1]).to_frame())

# apply nested_pandas reduce via map_partitions
# wrap the partition in a npd.NestedFrame call for:
# https://github.com/lincc-frameworks/nested-dask/issues/21
Expand Down
26 changes: 26 additions & 0 deletions tests/nested_dask/test_nestedframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,32 @@ def reflect_inputs(*args):
assert pytest.approx(sum(res2.compute()[0]), 0.1) == 2488.960119


@pytest.mark.parametrize("meta", ["df", "series"])
def test_reduce_output_type(meta):
"""test the meta handling of reduce"""

a = npd.NestedFrame({"a": pd.Series([1, 2, 3], dtype=pd.ArrowDtype(pa.int64()))}, index=[0, 0, 1])
b = npd.NestedFrame({"b": pd.Series([1, 2], dtype=pd.ArrowDtype(pa.int64()))}, index=[0, 1])

ndf = b.add_nested(a, name="test")
nddf = nd.NestedFrame.from_pandas(ndf, npartitions=1)

if meta == "df":

def mean_arr(b, arr): # type: ignore
return {"b": b, "mean": np.mean(arr)} # type: ignore

reduced = nddf.reduce(mean_arr, "b", "test.a", meta={"b": int, "mean": float})
elif meta == "series":

def mean_arr(arr): # type: ignore
return np.mean(arr) # type: ignore

reduced = nddf.reduce(mean_arr, "test.a", meta=(0, "float"))
assert isinstance(reduced, nd.NestedFrame)
assert isinstance(reduced.compute(), npd.NestedFrame)


def test_to_parquet_combined(test_dataset, tmp_path):
"""test to_parquet when saving all layers to a single directory"""

Expand Down

0 comments on commit 7b9c408

Please sign in to comment.