Skip to content

Commit

Permalink
Fix combine_first to support tupled names. (#1534)
Browse files Browse the repository at this point in the history
Fix `Series.combine_first` to support tupled names.

```py
>>> kser1 = ks.Series({"falcon": 330.0, "eagle": 160.0})
>>> kser2 = ks.Series({"falcon": 345.0, "eagle": 200.0, "duck": 30.0})
>>> kser1.name = ("X", "A")
>>> kser2.name = ("Y", "B")
>>> kser1.combine_first(kser2)
Traceback (most recent call last):
...
pyspark.sql.utils.AnalysisException: 'Cannot resolve column name "__this_(\'X\', \'A\')" among (__index_level_0__, __this_(X, A), __that_(Y, B), __natural_order__);'
```
  • Loading branch information
ueshin authored May 25, 2020
1 parent 25117cb commit d5546c1
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 13 deletions.
16 changes: 9 additions & 7 deletions databricks/koalas/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -4544,26 +4544,28 @@ def combine_first(self, other):
if not isinstance(other, ks.Series):
raise ValueError("`combine_first` only allows `Series` for parameter `other`")
if same_anchor(self, other):
this = self.name
that = other.name
this = self._internal.spark_column
that = other._internal.spark_column
combined = self._kdf
else:
this = "__this_{}".format(self.name)
that = "__that_{}".format(other.name)
with option_context("compute.ops_on_diff_frames", True):
combined = combine_frames(self.to_frame(), other)
sdf = combined._sdf
this = combined["this"][self.name]._internal.spark_column
that = combined["that"][other.name]._internal.spark_column
# If `self` has missing value, use value of `other`
cond = F.when(sdf[this].isNull(), sdf[that]).otherwise(sdf[this])
cond = F.when(this.isNull(), that).otherwise(this)
# If `self` and `other` come from same frame, the anchor should be kept
if same_anchor(self, other):
return self._with_new_scol(cond)
index_scols = combined._internal.index_spark_columns
sdf = sdf.select(*index_scols, cond.alias(self.name)).distinct()
sdf = combined._internal.spark_frame.select(
*index_scols, cond.alias(self._internal.data_spark_column_names[0])
).distinct()
internal = InternalFrame(
spark_frame=sdf,
index_map=self._internal.index_map,
column_labels=self._internal.column_labels,
data_spark_columns=[scol_for(sdf, self._internal.data_spark_column_names[0])],
column_label_names=self._internal.column_label_names,
)
return first_series(ks.DataFrame(internal))
Expand Down
26 changes: 20 additions & 6 deletions databricks/koalas/tests/test_series.py
Original file line number Diff line number Diff line change
Expand Up @@ -1301,14 +1301,21 @@ def test_combine_first(self):
pser2 = kser2.to_pandas()

self.assert_eq(
repr(kser1.combine_first(kser2).sort_index()),
repr(pser1.combine_first(pser2).sort_index()),
kser1.combine_first(kser2).sort_index(), pser1.combine_first(pser2).sort_index()
)
with self.assertRaisesRegex(
ValueError, "`combine_first` only allows `Series` for parameter `other`"
):
kser1.combine_first(50)

kser1.name = ("X", "A")
kser2.name = ("Y", "B")
pser1.name = ("X", "A")
pser2.name = ("Y", "B")
self.assert_eq(
kser1.combine_first(kser2).sort_index(), pser1.combine_first(pser2).sort_index()
)

# MultiIndex
midx1 = pd.MultiIndex(
[["lama", "cow", "falcon", "koala"], ["speed", "weight", "length", "power"]],
Expand All @@ -1324,8 +1331,7 @@ def test_combine_first(self):
pser2 = kser2.to_pandas()

self.assert_eq(
repr(kser1.combine_first(kser2).sort_index()),
repr(pser1.combine_first(pser2).sort_index()),
kser1.combine_first(kser2).sort_index(), pser1.combine_first(pser2).sort_index()
)

# Series come from same DataFrame
Expand All @@ -1341,8 +1347,16 @@ def test_combine_first(self):
pser2 = kser2.to_pandas()

self.assert_eq(
repr(kser1.combine_first(kser2).sort_index()),
repr(pser1.combine_first(pser2).sort_index()),
kser1.combine_first(kser2).sort_index(), pser1.combine_first(pser2).sort_index()
)

kser1.name = ("X", "A")
kser2.name = ("Y", "B")
pser1.name = ("X", "A")
pser2.name = ("Y", "B")

self.assert_eq(
kser1.combine_first(kser2).sort_index(), pser1.combine_first(pser2).sort_index()
)

def test_udt(self):
Expand Down

0 comments on commit d5546c1

Please sign in to comment.