Complete NumPy universal functions for DataFrames
HyukjinKwon committed Dec 13, 2019
1 parent 0af6c1c commit 742e285
Showing 2 changed files with 123 additions and 29 deletions.
42 changes: 41 additions & 1 deletion databricks/koalas/frame.py
@@ -26,7 +26,7 @@
from functools import partial, reduce
import sys
from itertools import zip_longest
from typing import Any, Optional, List, Tuple, Union, Generic, TypeVar, Iterable, Dict
from typing import Any, Optional, List, Tuple, Union, Generic, TypeVar, Iterable, Dict, Callable

import numpy as np
import pandas as pd
@@ -8396,6 +8396,46 @@ def __dir__(self):
def __iter__(self):
return iter(self.columns)

# NDArray Compat
def __array_ufunc__(self, ufunc: Callable, method: str, *inputs: Any, **kwargs: Any):
# TODO: is it possible to deduplicate it with '_map_series_op'?
if (all(isinstance(inp, DataFrame) for inp in inputs)
and any(inp is not inputs[0] for inp in inputs)):
# binary only
assert len(inputs) == 2
this = inputs[0]
that = inputs[1]
if this._internal.column_index_level != that._internal.column_index_level:
raise ValueError('cannot join with no overlapping index names')

# Different DataFrames
def apply_op(kdf, this_column_index, that_column_index):
for this_idx, that_idx in zip(this_column_index, that_column_index):
yield (ufunc(kdf[this_idx], kdf[that_idx], **kwargs), this_idx)

return align_diff_frames(apply_op, this, that, fillna=True, how="full")
else:
# DataFrame and Series
applied = []
this = inputs[0]
assert all(inp is this for inp in inputs if isinstance(inp, DataFrame))

for idx in this._internal.column_index:
arguments = []
for inp in inputs:
arguments.append(inp[idx] if isinstance(inp, DataFrame) else inp)
# both binary and unary.
applied.append(ufunc(*arguments, **kwargs))

sdf = this._sdf.select(
this._internal.index_scols + [c._scol for c in applied])
internal = this._internal.copy(sdf=sdf,
column_index=[c._internal.column_index[0]
for c in applied],
column_scols=[scol_for(sdf, c._internal.data_columns[0])
for c in applied])
return DataFrame(internal)

if sys.version_info >= (3, 7):
def __class_getitem__(cls, params):
# This is a workaround to support variadic generic in DataFrame in Python 3.7.
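To illustrate what the new __array_ufunc__ hook above enables, here is a minimal sketch (not part of the commit) against the Koalas API; it assumes an active Spark session, and the toy data is illustrative:

import numpy as np
import databricks.koalas as ks

# Hypothetical toy DataFrame; any numeric columns would do.
kdf = ks.DataFrame({'a': [1.0, 2.0, 3.0], 'b': [4.0, 5.0, 6.0]})

# Unary ufunc applied column by column across the whole DataFrame.
print(np.sqrt(kdf).to_pandas())

# Binary ufunc between a DataFrame and a scalar; this takes the
# 'DataFrame and Series' branch of __array_ufunc__ above.
print(np.add(kdf, 1).to_pandas())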
110 changes: 82 additions & 28 deletions databricks/koalas/tests/test_numpy_compat.py
@@ -17,11 +17,35 @@
import pandas as pd

from databricks import koalas as ks
from databricks.koalas import set_option, reset_option
from databricks.koalas.numpy_compat import unary_np_spark_mappings, binary_np_spark_mappings
from databricks.koalas.testing.utils import ReusedSQLTestCase, SQLTestUtils


class NumPyCompatTest(ReusedSQLTestCase, SQLTestUtils):
blacklist = [
# Koalas does not currently support
"conj",
"conjugate",
"isnat",
"matmul",
"frexp",

# Values are close enough but tests failed.
"arccos",
"exp",
"expm1",
"log", # flaky
"log10", # flaky
"log1p", # flaky
"modf",
"floor_divide", # flaky

# Results seem inconsistent across versions of, I (Hyukjin) suspect, PyArrow.
# From PyArrow 0.15, it seems to return the correct results via PySpark. We can
# probably enable it later, once Koalas switches to PyArrow 0.15 completely.
"left_shift",
]

@property
def pdf(self):
@@ -48,44 +72,25 @@ def test_np_add_index(self):
p_index = self.pdf.index
self.assert_eq(np.add(k_index, k_index), np.add(p_index, p_index))

def test_np_unsupported(self):
def test_np_unsupported_series(self):
kdf = self.kdf
with self.assertRaisesRegex(NotImplementedError, "Koalas.*not.*support.*sqrt.*"):
np.sqrt(kdf.a, kdf.b)

def test_np_spark_compat(self):
def test_np_unsupported_frame(self):
kdf = self.kdf
with self.assertRaisesRegex(NotImplementedError, "Koalas.*not.*support.*sqrt.*"):
np.sqrt(kdf, kdf)

def test_np_spark_compat_series(self):
# Use a randomly generated DataFrame
pdf = pd.DataFrame(
np.random.randint(-100, 100, size=(np.random.randint(100), 2)), columns=['a', 'b'])
kdf = ks.from_pandas(pdf)

blacklist = [
# Koalas does not currently support
"conj",
"conjugate",
"isnat",
"matmul",
"frexp",

# Values are close enough but tests failed.
"arccos",
"exp",
"expm1",
"log", # flaky
"log10", # flaky
"log1p", # flaky
"modf",
"floor_divide", # flaky

# Results seem inconsistent across versions of, I (Hyukjin) suspect, PyArrow.
# From PyArrow 0.15, it seems to return the correct results via PySpark. We can
# probably enable it later, once Koalas switches to PyArrow 0.15 completely.
"left_shift",
]

for np_name, spark_func in unary_np_spark_mappings.items():
np_func = getattr(np, np_name)
if np_name not in blacklist:
if np_name not in self.blacklist:
try:
# unary ufunc
self.assert_eq(np_func(pdf.a), np_func(kdf.a), almost=True)
Expand All @@ -94,7 +99,7 @@ def test_np_spark_compat(self):

for np_name, spark_func in binary_np_spark_mappings.items():
np_func = getattr(np, np_name)
if np_name not in blacklist:
if np_name not in self.blacklist:
try:
# binary ufunc
self.assert_eq(
@@ -103,3 +108,52 @@
np_func(pdf.a, 1), np_func(kdf.a, 1), almost=True)
except Exception as e:
raise AssertionError("Test in '%s' function failed." % np_name) from e

def test_np_spark_compat_frame(self):
# Use randomly generated DataFrames
pdf = pd.DataFrame(
np.random.randint(-100, 100, size=(np.random.randint(100), 2)), columns=['a', 'b'])
pdf2 = pd.DataFrame(
np.random.randint(-100, 100, size=(len(pdf), len(pdf.columns))), columns=['a', 'b'])
kdf = ks.from_pandas(pdf)
kdf2 = ks.from_pandas(pdf2)

# Test only the last 10 for now; testing all of them takes too much time.
for np_name, spark_func in list(unary_np_spark_mappings.items())[-10:]:
np_func = getattr(np, np_name)
if np_name not in self.blacklist:
try:
# unary ufunc
self.assert_eq(np_func(pdf), np_func(kdf), almost=True)
except Exception as e:
raise AssertionError("Test in '%s' function failed." % np_name) from e

# Test only the first 10 for now; testing all of them takes too much time.
for np_name, spark_func in list(binary_np_spark_mappings.items())[:10]:
np_func = getattr(np, np_name)
if np_name not in self.blacklist:
try:
# binary ufunc
self.assert_eq(
np_func(pdf, pdf), np_func(kdf, kdf), almost=True)
self.assert_eq(
np_func(pdf, 1), np_func(kdf, 1), almost=True)
except Exception as e:
raise AssertionError("Test in '%s' function failed." % np_name) from e

# Test only the first 5 for now; the 'compute.ops_on_diff_frames' option increases test time too much.
try:
set_option('compute.ops_on_diff_frames', True)
for np_name, spark_func in list(binary_np_spark_mappings.items())[:5]:
np_func = getattr(np, np_name)
if np_name not in self.blacklist:
try:
# binary ufunc
self.assert_eq(
np_func(pdf, pdf2).sort_index(),
np_func(kdf, kdf2).sort_index(), almost=True)

except Exception as e:
raise AssertionError("Test in '%s' function failed." % np_name) from e
finally:
reset_option('compute.ops_on_diff_frames')
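
For reference, a hedged sketch (not part of the commit) of the diff-frames path that the last loop exercises; the data here is illustrative:

import numpy as np
import databricks.koalas as ks
from databricks.koalas import set_option, reset_option

kdf1 = ks.DataFrame({'a': [1, 2, 3], 'b': [4, 5, 6]})
kdf2 = ks.DataFrame({'a': [6, 5, 4], 'b': [3, 2, 1]})

# Binary ufuncs between two different DataFrames require opting in,
# since they are implemented with a join (see align_diff_frames above).
set_option('compute.ops_on_diff_frames', True)
try:
    result = np.add(kdf1, kdf2)
    print(result.sort_index().to_pandas())
finally:
    reset_option('compute.ops_on_diff_frames')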
