Skip to content

Commit

Permalink
feat: limited support of lambdas in Series.apply (#345)
Browse files Browse the repository at this point in the history
BEGIN_COMMIT_OVERRIDE
feat: limited support of lambdas in `Series.apply` (#345)
END_COMMIT_OVERRIDE

Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:
- [ ] Make sure to open an issue as a [bug/issue](https://togithub.com/googleapis/python-bigquery-dataframes/issues/new/choose) before writing your code!  That way we can discuss the change, evaluate designs, and agree on the general idea
- [ ] Ensure the tests and linter pass
- [ ] Code coverage does not decrease (if any source code was changed)
- [x] Appropriate docs were updated https://screenshot.googleplex.com/6ZEiKXPz8LWMTRf

Partially fixes internal issue 295964341 🦕
  • Loading branch information
shobsi authored Feb 12, 2024
1 parent ffb0d15 commit 208e081
Show file tree
Hide file tree
Showing 3 changed files with 270 additions and 8 deletions.
45 changes: 43 additions & 2 deletions bigframes/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@
LevelsType = typing.Union[LevelType, typing.Sequence[LevelType]]


_remote_function_recommendation_message = (
"Your functions could not be applied directly to the Series."
" Try converting it to a remote function."
)


@log_adapter.class_logger
class Series(bigframes.operations.base.SeriesMethods, vendored_pandas_series.Series):
def __init__(self, *args, **kwargs):
Expand Down Expand Up @@ -1210,12 +1216,43 @@ def _groupby_values(
dropna=dropna,
)

def apply(self, func) -> Series:
def apply(
self, func, by_row: typing.Union[typing.Literal["compat"], bool] = "compat"
) -> Series:
# TODO(shobs, b/274645634): Support convert_dtype, args, **kwargs
# is actually a ternary op
# Reproject as workaround to applying filter too late. This forces the filter
# to be applied before passing data to remote function, protecting from bad
# inputs causing errors.

if by_row not in ["compat", False]:
raise ValueError("Param by_row must be one of 'compat' or False")

if not callable(func):
raise ValueError(
"Only a ufunc (a function that applies to the entire Series) or a remote function that only works on single values are supported."
)

if not hasattr(func, "bigframes_remote_function"):
# It is not a remote function
# Then it must be a vectorized function that applies to the Series
# as a whole
if by_row:
raise ValueError(
"A vectorized non-remote function can be provided only with by_row=False."
" For element-wise operation it must be a remote function."
)

try:
return func(self)
except Exception as ex:
# This could happen if any of the operators in func is not
# supported on a Series. Let's guide the customer to use a
# remote function instead
if hasattr(ex, "message"):
ex.message += f"\n{_remote_function_recommendation_message}"
raise

reprojected_series = Series(self._block._force_reproject())
return reprojected_series._apply_unary_op(
ops.RemoteFunctionOp(func=func, apply_on_null=True)
Expand Down Expand Up @@ -1325,7 +1362,11 @@ def duplicated(self, keep: str = "first") -> Series:

def mask(self, cond, other=None) -> Series:
if callable(cond):
cond = self.apply(cond)
if hasattr(cond, "bigframes_remote_function"):
cond = self.apply(cond)
else:
# For non-remote function assume that it is applicable on Series
cond = self.apply(cond, by_row=False)

if not isinstance(cond, Series):
raise TypeError(
Expand Down
151 changes: 151 additions & 0 deletions tests/system/small/test_series.py
Original file line number Diff line number Diff line change
Expand Up @@ -2560,6 +2560,51 @@ def test_mask_custom_value(scalars_dfs):
assert_pandas_df_equal(bf_result, pd_result)


@pytest.mark.parametrize(
    ("lambda_",),
    [
        pytest.param(lambda x: x > 0),
        pytest.param(
            lambda x: True if x > 0 else False,
            marks=pytest.mark.xfail(
                raises=ValueError,
            ),
        ),
    ],
    ids=[
        # Fixed typo in test id: "arithmatic" -> "arithmetic".
        "lambda_arithmetic",
        "lambda_arbitrary",
    ],
)
def test_mask_lambda(scalars_dfs, lambda_):
    """Series.mask should accept a vectorized lambda condition.

    A lambda built from operators supported on a BigFrames Series works
    directly; an arbitrary python lambda (e.g. using a ternary) is expected
    to fail with ValueError, hence the xfail mark on the second param.
    """
    scalars_df, scalars_pandas_df = scalars_dfs

    bf_col = scalars_df["int64_col"]
    bf_result = bf_col.mask(lambda_).to_pandas()

    pd_col = scalars_pandas_df["int64_col"]
    pd_result = pd_col.mask(lambda_)

    # ignore dtype check, which are Int64 and object respectively
    assert_series_equal(bf_result, pd_result, check_dtype=False)


def test_mask_simple_udf(scalars_dfs):
    """Series.mask should accept a plain python function as the condition."""
    bf_df, pd_df = scalars_dfs

    def foo(x):
        return x < 1000000

    bf_series = bf_df["int64_col"]
    pd_series = pd_df["int64_col"]

    bf_result = bf_series.mask(foo).to_pandas()
    pd_result = pd_series.mask(foo)

    # dtypes differ (Int64 vs object), so skip the dtype comparison.
    assert_series_equal(bf_result, pd_result, check_dtype=False)


@pytest.mark.parametrize(
("column", "to_type"),
[
Expand Down Expand Up @@ -3042,3 +3087,109 @@ def test_series_iter(
scalars_df_index["int64_too"], scalars_pandas_df_index["int64_too"]
):
assert bf_i == pd_i


@pytest.mark.parametrize(
    (
        "col",
        "lambda_",
    ),
    [
        pytest.param("int64_col", lambda x: x * x + x + 1),
        pytest.param("int64_col", lambda x: x % 2 == 1),
        pytest.param("string_col", lambda x: x + "_suffix"),
    ],
    ids=[
        "lambda_int_int",
        "lambda_int_bool",
        "lambda_str_str",
    ],
)
def test_apply_lambda(scalars_dfs, col, lambda_):
    """Vectorized lambdas work with apply(..., by_row=False), matching pandas."""
    bf_df, pd_df = scalars_dfs
    bf_series = bf_df[col]

    # Without by_row=False a plain (non-remote) lambda must be rejected.
    with pytest.raises(ValueError, match="by_row=False"):
        bf_series.apply(lambda_)

    bf_result = bf_series.apply(lambda_, by_row=False).to_pandas()
    pd_result = pd_df[col].apply(lambda_)

    # dtypes differ (Int64 vs object), so compare values only.
    assert_series_equal(bf_result, pd_result, check_dtype=False)


@pytest.mark.parametrize(
    ("ufunc",),
    [
        pytest.param(numpy.log),
        pytest.param(numpy.sqrt),
        pytest.param(numpy.sin),
    ],
    ids=[
        "log",
        "sqrt",
        "sin",
    ],
)
def test_apply_numpy_ufunc(scalars_dfs, ufunc):
    """NumPy ufuncs work with apply(..., by_row=False), matching pandas."""
    bf_df, pd_df = scalars_dfs
    bf_series = bf_df["int64_col"]

    # Without by_row=False a ufunc must be rejected.
    with pytest.raises(ValueError, match="by_row=False"):
        bf_series.apply(ufunc)

    bf_result = bf_series.apply(ufunc, by_row=False).to_pandas()
    pd_result = pd_df["int64_col"].apply(ufunc)

    assert_series_equal(bf_result, pd_result)


def test_apply_simple_udf(scalars_dfs):
    """A plain python function works with apply(..., by_row=False)."""
    bf_df, pd_df = scalars_dfs

    def foo(x):
        return x * x + 2 * x + 3

    bf_series = bf_df["int64_col"]

    # Without by_row=False a plain python function must be rejected.
    with pytest.raises(ValueError, match="by_row=False"):
        bf_series.apply(foo)

    bf_result = bf_series.apply(foo, by_row=False).to_pandas()
    pd_result = pd_df["int64_col"].apply(foo)

    # dtypes differ (Int64 vs object), so compare values only.
    assert_series_equal(bf_result, pd_result, check_dtype=False)


@pytest.mark.parametrize(
    ("col", "lambda_", "exception"),
    [
        pytest.param("int64_col", {1: 2, 3: 4}, ValueError),
        pytest.param("int64_col", numpy.square, TypeError),
        pytest.param("string_col", lambda x: x.capitalize(), AttributeError),
    ],
    ids=[
        "not_callable",
        "numpy_ufunc",
        "custom_lambda",
    ],
)
def test_apply_not_supported(scalars_dfs, col, lambda_, exception):
    """Unsupported func arguments raise the expected exception type."""
    bf_df, _ = scalars_dfs

    series = bf_df[col]
    with pytest.raises(exception):
        series.apply(lambda_, by_row=False)
82 changes: 76 additions & 6 deletions third_party/bigframes_vendored/pandas/core/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -1116,18 +1116,24 @@ def nsmallest(self, n: int = 5, keep: str = "first") -> Series:
def apply(
self,
func,
by_row="compat",
) -> DataFrame | Series:
"""
Invoke function on values of a Series.
Can be ufunc (a NumPy function that applies to the entire Series) or a
Python function that only works on single values. If it is an arbitrary
python function then converting it into a `remote_function` is recommended.
**Examples:**
>>> import bigframes.pandas as bpd
>>> bpd.options.display.progress_bar = None
Let's use ``reuse=False`` flag to make sure a new ``remote_function``
For applying an arbitrary python function a `remote_function` is recommended.
Let's use ``reuse=False`` flag to make sure a new `remote_function`
is created every time we run the following code, but you can skip it
to potentially reuse a previously deployed ``remote_function`` from
to potentially reuse a previously deployed `remote_function` from
the same user defined function.
>>> @bpd.remote_function([int], float, reuse=False)
Expand All @@ -1152,9 +1158,9 @@ def apply(
4 2.0
dtype: Float64
You could turn a user defined function with external package
dependencies into a BigQuery DataFrames remote function. You would
provide the names of the packages via ``packages`` param.
To turn a user defined function with external package dependencies into
a `remote_function`, you would provide the names of the packages via
`packages` param.
>>> @bpd.remote_function(
... [str],
Expand All @@ -1176,11 +1182,48 @@ def apply(
>>> names = bpd.Series(["Alice", "Bob"])
>>> hashes = names.apply(get_hash)
Simple vectorized functions, lambdas or ufuncs can be applied directly
with `by_row=False`.
>>> nums = bpd.Series([1, 2, 3, 4])
>>> nums
0 1
1 2
2 3
3 4
dtype: Int64
>>> nums.apply(lambda x: x*x + 2*x + 1, by_row=False)
0 4
1 9
2 16
3 25
dtype: Int64
>>> def is_odd(num):
... return num % 2 == 1
>>> nums.apply(is_odd, by_row=False)
0 True
1 False
2 True
3 False
dtype: boolean
>>> nums.apply(np.log, by_row=False)
0 0.0
1 0.693147
2 1.098612
3 1.386294
dtype: Float64
Args:
func (function):
BigFrames DataFrames ``remote_function`` to apply. The function
should take a scalar and return a scalar. It will be applied to
every element in the ``Series``.
by_row (False or "compat", default "compat"):
If `"compat"` , func must be a remote function which will be
passed each element of the Series, like `Series.map`. If False,
the func will be passed the whole Series at once.
Returns:
bigframes.series.Series: A new Series with values representing the
Expand Down Expand Up @@ -2680,7 +2723,8 @@ def mask(self, cond, other):
dtype: Int64
You can mask the values in the Series based on a condition. The values
matching the condition would be masked.
matching the condition would be masked. The condition can be provided in
the form of a Series.
>>> s.mask(s % 2 == 0)
0 <NA>
Expand Down Expand Up @@ -2736,6 +2780,32 @@ def mask(self, cond, other):
2 Caroline
dtype: string
Simple vectorized (i.e. they only perform operations supported on a
Series) lambdas or python functions can be used directly.
>>> nums = bpd.Series([1, 2, 3, 4], name="nums")
>>> nums
0 1
1 2
2 3
3 4
Name: nums, dtype: Int64
>>> nums.mask(lambda x: (x+1) % 2 == 1)
0 1
1 <NA>
2 3
3 <NA>
Name: nums, dtype: Int64
>>> def is_odd(num):
... return num % 2 == 1
>>> nums.mask(is_odd)
0 <NA>
1 2
2 <NA>
3 4
Name: nums, dtype: Int64
Args:
cond (bool Series/DataFrame, array-like, or callable):
Where cond is False, keep the original value. Where True, replace
Expand Down

0 comments on commit 208e081

Please sign in to comment.