Skip to content

Commit

Permalink
Add indicator support to merge (dask#8539)
Browse files Browse the repository at this point in the history
  • Loading branch information
phofl authored Feb 28, 2024
1 parent 84e7c4d commit f86a031
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 0 deletions.
3 changes: 3 additions & 0 deletions distributed/shuffle/_merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ def merge_unpack(
suffixes: Suffixes,
left_index: bool,
right_index: bool,
indicator: bool = False,
):
from dask.dataframe.multi import merge_chunk

Expand All @@ -193,6 +194,7 @@ def merge_unpack(
suffixes=suffixes,
left_index=left_index,
right_index=right_index,
indicator=indicator,
)


Expand Down Expand Up @@ -430,5 +432,6 @@ def _construct_graph(self) -> dict[tuple | str, tuple]:
self.suffixes,
self.left_index,
self.right_index,
self.indicator,
)
return dsk
19 changes: 19 additions & 0 deletions distributed/shuffle/tests/test_merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -539,3 +539,22 @@ async def test_merge_does_not_deadlock_if_worker_joins(c, s, a):
result = await result
expected = pd.merge(pdf1, pdf2, left_on="a", right_on="x")
assert_eq(result, expected, check_index=False)


@gen_cluster(client=True)
async def test_merge_indicator(c, s, a, b):
data = {
"id": [1, 2, 3],
"test": [4, 5, 6],
}
pdf = pd.DataFrame(data)
df = dd.from_pandas(pdf, npartitions=2)
result = df.merge(df, on="id", how="outer", indicator=True)
x = c.compute(result)
x = await x
expected = pdf.merge(pdf, on="id", how="outer", indicator=True)

pd.testing.assert_frame_equal(
x.sort_values("id", ignore_index=True),
expected.sort_values("id", ignore_index=True),
)

0 comments on commit f86a031

Please sign in to comment.