Implement Series.combine_first #1290

Merged
merged 14 commits on Mar 25, 2020
1 change: 0 additions & 1 deletion databricks/koalas/missing/series.py
@@ -44,7 +44,6 @@ class _MissingPandasLikeSeries(object):
    between_time = unsupported_function("between_time")
    bfill = unsupported_function("bfill")
    combine = unsupported_function("combine")
-   combine_first = unsupported_function("combine_first")
    cov = unsupported_function("cov")
    divmod = unsupported_function("divmod")
    droplevel = unsupported_function("droplevel")
61 changes: 60 additions & 1 deletion databricks/koalas/series.py
@@ -35,7 +35,7 @@
from pyspark.sql.window import Window

from databricks import koalas as ks # For running doctests and reference resolution in PyCharm.
-from databricks.koalas.config import get_option
+from databricks.koalas.config import get_option, option_context
from databricks.koalas.base import IndexOpsMixin
from databricks.koalas.exceptions import SparkPandasIndexingError
from databricks.koalas.frame import DataFrame
@@ -4335,6 +4335,65 @@ def pct_change(self, periods=1):

        return self._with_new_scol((scol - prev_row) / prev_row)

    def combine_first(self, other):
        """
        Combine Series values, choosing the calling Series's values first.

        Parameters
        ----------
        other : Series
            The value(s) to be combined with the `Series`.

        Returns
        -------
        Series
            The result of combining the Series with the other object.

        See Also
        --------
        Series.combine : Perform elementwise operation on two Series
            using a given function.

        Notes
        -----
        Result index will be the union of the two indexes.

        Examples
        --------
        >>> s1 = ks.Series([1, np.nan])
        >>> s2 = ks.Series([3, 4])
        >>> s1.combine_first(s2)
        0    1.0
        1    4.0
        Name: 0, dtype: float64
        """
        if not isinstance(other, ks.Series):
            raise ValueError("`combine_first` only allows `Series` for parameter `other`")
        if self._kdf is other._kdf:
            this = self.name
            that = other.name
            combined = self._kdf
        else:
            this = "__this_{}".format(self.name)
            that = "__that_{}".format(other.name)
            with option_context("compute.ops_on_diff_frames", True):
                combined = combine_frames(self.to_frame(), other)
        sdf = combined._sdf
        # If `self` has a missing value, use the value of `other`
        cond = F.when(sdf[this].isNull(), sdf[that]).otherwise(sdf[this])
        # If `self` and `other` come from the same frame, keep the original anchor
        if self._kdf is other._kdf:
            return self._with_new_scol(cond)
        index_scols = combined._internal.index_spark_columns
        sdf = sdf.select(*index_scols, cond.alias(self.name)).distinct()
        internal = _InternalFrame(
            spark_frame=sdf,
            index_map=self._internal.index_map,
            column_labels=self._internal.column_labels,
            column_label_names=self._internal.column_label_names,
        )
        return _col(ks.DataFrame(internal))

    def dot(self, other):
        """
        Compute the dot product between the Series and the columns of other.
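The core of the implementation is a single null-coalescing Spark column: prefer the calling Series and fall back to `other` only where the calling Series is null. Below is a standalone PySpark sketch of that step, independent of koalas; the column names `this` and `that` and the toy data are illustrative only, not part of the change.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# Two aligned columns standing in for the frames joined by combine_frames.
sdf = spark.createDataFrame([(1.0, 3.0), (None, 4.0)], ["this", "that"])

# Prefer "this"; fall back to "that" only where "this" is null,
# mirroring the `cond` expression in combine_first.
cond = F.when(sdf["this"].isNull(), sdf["that"]).otherwise(sdf["this"])
sdf.select(cond.alias("combined")).show()  # rows: 1.0 and 4.0

When both Series already share the same anchoring DataFrame, the method skips the join entirely and reuses that frame via `_with_new_scol`.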
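At the user level, the docstring's note that the result index is the union of the two indexes is worth seeing in action. A minimal usage sketch, assuming a running Spark session and koalas installed; the labels mirror the first case in the new tests below.

import databricks.koalas as ks

s1 = ks.Series({"falcon": 330.0, "eagle": 160.0})
s2 = ks.Series({"falcon": 345.0, "eagle": 200.0, "duck": 30.0})

# s1's values win wherever s1 has one; "duck" exists only in s2,
# so it is filled from s2 and the result index is the union of both.
result = s1.combine_first(s2).sort_index()
print(result)  # expected values: duck 30.0, eagle 160.0, falcon 330.0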
51 changes: 51 additions & 0 deletions databricks/koalas/tests/test_series.py
@@ -1281,6 +1281,57 @@ def test_axes(self):
        pser = kser.to_pandas()
        self.assert_list_eq(kser.axes, pser.axes)

    def test_combine_first(self):
        kser1 = ks.Series({"falcon": 330.0, "eagle": 160.0})
        kser2 = ks.Series({"falcon": 345.0, "eagle": 200.0, "duck": 30.0})
        pser1 = kser1.to_pandas()
        pser2 = kser2.to_pandas()

        self.assert_eq(
            repr(kser1.combine_first(kser2).sort_index()),
            repr(pser1.combine_first(pser2).sort_index()),
        )
        with self.assertRaisesRegex(
            ValueError, "`combine_first` only allows `Series` for parameter `other`"
        ):
            kser1.combine_first(50)

        # MultiIndex
        midx1 = pd.MultiIndex(
            [["lama", "cow", "falcon", "koala"], ["speed", "weight", "length", "power"]],
            [[0, 3, 1, 1, 1, 2, 2, 2], [0, 2, 0, 3, 2, 0, 1, 3]],
        )
        midx2 = pd.MultiIndex(
            [["lama", "cow", "falcon"], ["speed", "weight", "length"]],
            [[0, 0, 0, 1, 1, 1, 2, 2, 2], [0, 1, 2, 0, 1, 2, 0, 1, 2]],
        )
        kser1 = ks.Series([45, 200, 1.2, 30, 250, 1.5, 320, 1], index=midx1)
        kser2 = ks.Series([-45, 200, -1.2, 30, -250, 1.5, 320, 1, -0.3], index=midx2)
        pser1 = kser1.to_pandas()
        pser2 = kser2.to_pandas()

        self.assert_eq(
            repr(kser1.combine_first(kser2).sort_index()),
            repr(pser1.combine_first(pser2).sort_index()),
        )

        # Series come from same DataFrame
        kdf = ks.DataFrame(
            {
                "A": {"falcon": 330.0, "eagle": 160.0},
                "B": {"falcon": 345.0, "eagle": 200.0, "duck": 30.0},
            }
        )
        kser1 = kdf.A
        kser2 = kdf.B
        pser1 = kser1.to_pandas()
        pser2 = kser2.to_pandas()

        self.assert_eq(
            repr(kser1.combine_first(kser2).sort_index()),
            repr(pser1.combine_first(pser2).sort_index()),
        )

    def test_udt(self):
        sparse_values = {0: 0.1, 1: 1.1}
        sparse_vector = SparseVector(len(sparse_values), sparse_values)
1 change: 1 addition & 0 deletions docs/source/reference/series.rst
@@ -80,6 +80,7 @@ Binary operator functions
Series.rmod
Series.floordiv
Series.rfloordiv
Series.combine_first
Series.lt
Series.gt
Series.le