From 591bc5b07e36c2b8527f53258eb89052e77a0e56 Mon Sep 17 00:00:00 2001 From: Patrick Hoefler <61934744+phofl@users.noreply.github.com> Date: Wed, 28 Feb 2024 00:36:11 +0100 Subject: [PATCH] Add indicator support to merge --- distributed/shuffle/_merge.py | 3 +++ distributed/shuffle/tests/test_merge.py | 19 +++++++++++++++++++ 2 files changed, 22 insertions(+) diff --git a/distributed/shuffle/_merge.py b/distributed/shuffle/_merge.py index 2a6e801bf2..6796ab338c 100644 --- a/distributed/shuffle/_merge.py +++ b/distributed/shuffle/_merge.py @@ -172,6 +172,7 @@ def merge_unpack( suffixes: Suffixes, left_index: bool, right_index: bool, + indicator: bool = False, ): from dask.dataframe.multi import merge_chunk @@ -193,6 +194,7 @@ def merge_unpack( suffixes=suffixes, left_index=left_index, right_index=right_index, + indicator=indicator, ) @@ -430,5 +432,6 @@ def _construct_graph(self) -> dict[tuple | str, tuple]: self.suffixes, self.left_index, self.right_index, + self.indicator, ) return dsk diff --git a/distributed/shuffle/tests/test_merge.py b/distributed/shuffle/tests/test_merge.py index d135061875..1f6eb2f33e 100644 --- a/distributed/shuffle/tests/test_merge.py +++ b/distributed/shuffle/tests/test_merge.py @@ -539,3 +539,22 @@ async def test_merge_does_not_deadlock_if_worker_joins(c, s, a): result = await result expected = pd.merge(pdf1, pdf2, left_on="a", right_on="x") assert_eq(result, expected, check_index=False) + + +@gen_cluster(client=True) +async def test_merge_indicator(c, s, a, b): + data = { + "id": [1, 2, 3], + "test": [4, 5, 6], + } + pdf = pd.DataFrame(data) + df = dd.from_pandas(pdf, npartitions=2) + result = df.merge(df, on="id", how="outer", indicator=True) + x = c.compute(result) + x = await x + expected = pdf.merge(pdf, on="id", how="outer", indicator=True) + + pd.testing.assert_frame_equal( + x.sort_values("id", ignore_index=True), + expected.sort_values("id", ignore_index=True), + )