diff --git a/bigframes/series.py b/bigframes/series.py
index 74580989f3..4aef959a76 100644
--- a/bigframes/series.py
+++ b/bigframes/series.py
@@ -58,6 +58,12 @@
 LevelsType = typing.Union[LevelType, typing.Sequence[LevelType]]
 
 
+_remote_function_recommendation_message = (
+    "Your function could not be applied directly to the Series."
+    " Try converting it to a remote function."
+)
+
+
 @log_adapter.class_logger
 class Series(bigframes.operations.base.SeriesMethods, vendored_pandas_series.Series):
     def __init__(self, *args, **kwargs):
@@ -1210,12 +1216,43 @@ def _groupby_values(
             dropna=dropna,
         )
 
-    def apply(self, func) -> Series:
+    def apply(
+        self, func, by_row: typing.Union[typing.Literal["compat"], bool] = "compat"
+    ) -> Series:
         # TODO(shobs, b/274645634): Support convert_dtype, args, **kwargs
         # is actually a ternary op
         # Reproject as workaround to applying filter too late. This forces the filter
         # to be applied before passing data to remote function, protecting from bad
         # inputs causing errors.
+
+        if by_row not in ["compat", False]:
+            raise ValueError("Param by_row must be one of 'compat' or False")
+
+        if not callable(func):
+            raise ValueError(
+                "Only a ufunc (a function that applies to the entire Series) or a remote function that only works on single values is supported."
+            )
+
+        if not hasattr(func, "bigframes_remote_function"):
+            # It is not a remote function, so it must be a vectorized function
+            # that applies to the Series as a whole.
+            if by_row:
+                raise ValueError(
+                    "A vectorized non-remote function can be provided only with by_row=False."
+                    " For element-wise operation it must be a remote function."
+                )
+
+            try:
+                return func(self)
+            except Exception as ex:
+                # This could happen if any of the operators in func is not
+                # supported on a Series. Let's guide the customer to use a
+                # remote function instead.
+                if hasattr(ex, "message"):
+                    ex.message += f"\n{_remote_function_recommendation_message}"
+                raise
+
         reprojected_series = Series(self._block._force_reproject())
         return reprojected_series._apply_unary_op(
             ops.RemoteFunctionOp(func=func, apply_on_null=True)
@@ -1325,7 +1362,11 @@ def duplicated(self, keep: str = "first") -> Series:
 
     def mask(self, cond, other=None) -> Series:
         if callable(cond):
-            cond = self.apply(cond)
+            if hasattr(cond, "bigframes_remote_function"):
+                cond = self.apply(cond)
+            else:
+                # For a non-remote function, assume that it is applicable on a Series.
+                cond = self.apply(cond, by_row=False)
         if not isinstance(cond, Series):
             raise TypeError(
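Not part of the patch above: a minimal usage sketch of the dispatch that the new `by_row` parameter of `Series.apply` introduces, assuming a configured BigQuery DataFrames session; the series values are illustrative only.

    import numpy as np
    import bigframes.pandas as bpd

    nums = bpd.Series([1, 2, 3, 4])

    # A plain callable or ufunc is treated as vectorized: it must opt in with
    # by_row=False, in which case it receives the whole Series at once.
    squared = nums.apply(lambda x: x * x, by_row=False)
    logged = nums.apply(np.log, by_row=False)

    # With the default by_row="compat", a non-remote callable is rejected and the
    # error points at by_row=False (or at converting it to a remote function).
    try:
        nums.apply(lambda x: x * x)
    except ValueError as ex:
        print(ex)
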
diff --git a/tests/system/small/test_series.py b/tests/system/small/test_series.py
index f2790d190a..42651ed96f 100644
--- a/tests/system/small/test_series.py
+++ b/tests/system/small/test_series.py
@@ -2560,6 +2560,51 @@ def test_mask_custom_value(scalars_dfs):
     assert_pandas_df_equal(bf_result, pd_result)
 
 
+@pytest.mark.parametrize(
+    ("lambda_",),
+    [
+        pytest.param(lambda x: x > 0),
+        pytest.param(
+            lambda x: True if x > 0 else False,
+            marks=pytest.mark.xfail(
+                raises=ValueError,
+            ),
+        ),
+    ],
+    ids=[
+        "lambda_arithmetic",
+        "lambda_arbitrary",
+    ],
+)
+def test_mask_lambda(scalars_dfs, lambda_):
+    scalars_df, scalars_pandas_df = scalars_dfs
+
+    bf_col = scalars_df["int64_col"]
+    bf_result = bf_col.mask(lambda_).to_pandas()
+
+    pd_col = scalars_pandas_df["int64_col"]
+    pd_result = pd_col.mask(lambda_)
+
+    # ignore dtype check, which are Int64 and object respectively
+    assert_series_equal(bf_result, pd_result, check_dtype=False)
+
+
+def test_mask_simple_udf(scalars_dfs):
+    scalars_df, scalars_pandas_df = scalars_dfs
+
+    def foo(x):
+        return x < 1000000
+
+    bf_col = scalars_df["int64_col"]
+    bf_result = bf_col.mask(foo).to_pandas()
+
+    pd_col = scalars_pandas_df["int64_col"]
+    pd_result = pd_col.mask(foo)
+
+    # ignore dtype check, which are Int64 and object respectively
+    assert_series_equal(bf_result, pd_result, check_dtype=False)
+
+
 @pytest.mark.parametrize(
     ("column", "to_type"),
     [
@@ -3042,3 +3087,109 @@ def test_series_iter(
         scalars_df_index["int64_too"], scalars_pandas_df_index["int64_too"]
     ):
         assert bf_i == pd_i
+
+
+@pytest.mark.parametrize(
+    (
+        "col",
+        "lambda_",
+    ),
+    [
+        pytest.param("int64_col", lambda x: x * x + x + 1),
+        pytest.param("int64_col", lambda x: x % 2 == 1),
+        pytest.param("string_col", lambda x: x + "_suffix"),
+    ],
+    ids=[
+        "lambda_int_int",
+        "lambda_int_bool",
+        "lambda_str_str",
+    ],
+)
+def test_apply_lambda(scalars_dfs, col, lambda_):
+    scalars_df, scalars_pandas_df = scalars_dfs
+
+    bf_col = scalars_df[col]
+
+    # Can't be applied to BigFrames Series without by_row=False
+    with pytest.raises(ValueError, match="by_row=False"):
+        bf_col.apply(lambda_)
+
+    bf_result = bf_col.apply(lambda_, by_row=False).to_pandas()
+
+    pd_col = scalars_pandas_df[col]
+    pd_result = pd_col.apply(lambda_)
+
+    # ignore dtype check, which are Int64 and object respectively
+    assert_series_equal(bf_result, pd_result, check_dtype=False)
+
+
+@pytest.mark.parametrize(
+    ("ufunc",),
+    [
+        pytest.param(numpy.log),
+        pytest.param(numpy.sqrt),
+        pytest.param(numpy.sin),
+    ],
+    ids=[
+        "log",
+        "sqrt",
+        "sin",
+    ],
+)
+def test_apply_numpy_ufunc(scalars_dfs, ufunc):
+    scalars_df, scalars_pandas_df = scalars_dfs
+
+    bf_col = scalars_df["int64_col"]
+
+    # Can't be applied to BigFrames Series without by_row=False
+    with pytest.raises(ValueError, match="by_row=False"):
+        bf_col.apply(ufunc)
+
+    bf_result = bf_col.apply(ufunc, by_row=False).to_pandas()
+
+    pd_col = scalars_pandas_df["int64_col"]
+    pd_result = pd_col.apply(ufunc)
+
+    assert_series_equal(bf_result, pd_result)
+
+
+def test_apply_simple_udf(scalars_dfs):
+    scalars_df, scalars_pandas_df = scalars_dfs
+
+    def foo(x):
+        return x * x + 2 * x + 3
+
+    bf_col = scalars_df["int64_col"]
+
+    # Can't be applied to BigFrames Series without by_row=False
+    with pytest.raises(ValueError, match="by_row=False"):
+        bf_col.apply(foo)
+
+    bf_result = bf_col.apply(foo, by_row=False).to_pandas()
+
+    pd_col = scalars_pandas_df["int64_col"]
+    pd_result = pd_col.apply(foo)
+
+    # ignore dtype check, which are Int64 and object respectively
+    assert_series_equal(bf_result, pd_result, check_dtype=False)
+
+
+@pytest.mark.parametrize(
+    ("col", "lambda_", "exception"),
+    [
+        pytest.param("int64_col", {1: 2, 3: 4}, ValueError),
+        pytest.param("int64_col", numpy.square, TypeError),
+        pytest.param("string_col", lambda x: x.capitalize(), AttributeError),
+    ],
+    ids=[
+        "not_callable",
+        "numpy_ufunc",
+        "custom_lambda",
+    ],
+)
+def test_apply_not_supported(scalars_dfs, col, lambda_, exception):
+    scalars_df, _ = scalars_dfs
+
+    bf_col = scalars_df[col]
+    with pytest.raises(exception):
+        bf_col.apply(lambda_, by_row=False)
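Not part of the patch above: a short sketch of the `Series.mask` behavior these tests exercise, assuming a configured BigQuery DataFrames session. The callables are illustrative; the ValueError matches what the `lambda_arbitrary` xfail case above expects.

    import bigframes.pandas as bpd

    nums = bpd.Series([1, 2, 3, 4])

    # A plain (non-remote) callable is assumed to be vectorized, so mask() routes it
    # through apply(cond, by_row=False); it may only use operators a Series supports.
    masked = nums.mask(lambda x: x % 2 == 1)

    # A per-element construct such as `True if x > 0 else False` needs a scalar truth
    # value, which a Series does not provide, so this path fails; a remote function
    # is the recommended alternative for arbitrary element-wise logic.
    try:
        nums.mask(lambda x: True if x > 0 else False)
    except ValueError as ex:
        print(ex)
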
diff --git a/third_party/bigframes_vendored/pandas/core/series.py b/third_party/bigframes_vendored/pandas/core/series.py
index 4232d3ec2a..b203471606 100644
--- a/third_party/bigframes_vendored/pandas/core/series.py
+++ b/third_party/bigframes_vendored/pandas/core/series.py
@@ -1116,18 +1116,24 @@ def nsmallest(self, n: int = 5, keep: str = "first") -> Series:
     def apply(
         self,
         func,
+        by_row="compat",
     ) -> DataFrame | Series:
         """
         Invoke function on values of a Series.
 
+        Can be a ufunc (a NumPy function that applies to the entire Series) or a
+        Python function that only works on single values. If it is an arbitrary
+        Python function, converting it into a `remote_function` is recommended.
+
         **Examples:**
 
             >>> import bigframes.pandas as bpd
             >>> bpd.options.display.progress_bar = None
 
-        Let's use ``reuse=False`` flag to make sure a new ``remote_function``
+        For applying an arbitrary Python function a `remote_function` is recommended.
+        Let's use the ``reuse=False`` flag to make sure a new `remote_function`
         is created every time we run the following code, but you can skip it
-        to potentially reuse a previously deployed ``remote_function`` from
+        to potentially reuse a previously deployed `remote_function` from
         the same user defined function.
 
             >>> @bpd.remote_function([int], float, reuse=False)
@@ -1152,9 +1158,9 @@ def apply(
             4    2.0
             dtype: Float64
 
-        You could turn a user defined function with external package
-        dependencies into a BigQuery DataFrames remote function. You would
-        provide the names of the packages via ``packages`` param.
+        To turn a user defined function with external package dependencies into
+        a `remote_function`, you would provide the names of the packages via
+        the `packages` param.
 
             >>> @bpd.remote_function(
             ...     [str],
             ...     str,
             ...     reuse=False,
             ...     packages=["cryptography"],
             ... )
             ... def get_hash(input):
             ...     from cryptography.fernet import Fernet
             ...
             ...     # handle missing value
             ...     if input is None:
             ...         input = ""
             ...
             ...     key = Fernet.generate_key()
             ...     f = Fernet(key)
             ...     return f.encrypt(input.encode()).decode()
@@ -1176,11 +1182,48 @@ def apply(
             >>> names = bpd.Series(["Alice", "Bob"])
             >>> hashes = names.apply(get_hash)
 
+        Simple vectorized functions, lambdas, or ufuncs can be applied directly
+        with `by_row=False`.
+
+            >>> nums = bpd.Series([1, 2, 3, 4])
+            >>> nums
+            0    1
+            1    2
+            2    3
+            3    4
+            dtype: Int64
+            >>> nums.apply(lambda x: x*x + 2*x + 1, by_row=False)
+            0     4
+            1     9
+            2    16
+            3    25
+            dtype: Int64
+
+            >>> def is_odd(num):
+            ...     return num % 2 == 1
+            >>> nums.apply(is_odd, by_row=False)
+            0     True
+            1    False
+            2     True
+            3    False
+            dtype: boolean
+
+            >>> nums.apply(np.log, by_row=False)
+            0         0.0
+            1    0.693147
+            2    1.098612
+            3    1.386294
+            dtype: Float64
+
         Args:
             func (function):
                 BigFrames DataFrames ``remote_function`` to apply. The function
                 should take a scalar and return a scalar. It will be applied to
                 every element in the ``Series``.
+            by_row (False or "compat", default "compat"):
+                If `"compat"`, func must be a remote function which will be
+                passed each element of the Series, like `Series.map`. If False,
+                the func will be passed the whole Series at once.
 
         Returns:
             bigframes.series.Series: A new Series with values representing the
@@ -2680,7 +2723,8 @@ def mask(self, cond, other):
             dtype: Int64
 
        You can mask the values in the Series based on a condition. The values
-        matching the condition would be masked.
+        matching the condition would be masked. The condition can be provided
+        in the form of a Series.
 
             >>> s.mask(s % 2 == 0)
             0    <NA>
             1    <NA>
             2    <NA>
             3    <NA>
             4    <NA>
             dtype: Int64
@@ -2736,6 +2780,32 @@ def mask(self, cond, other):
             2    Caroline
             dtype: string
 
+        Simple vectorized lambdas or Python functions (i.e. ones that only
+        perform operations supported on a Series) can be used directly.
+
+            >>> nums = bpd.Series([1, 2, 3, 4], name="nums")
+            >>> nums
+            0    1
+            1    2
+            2    3
+            3    4
+            Name: nums, dtype: Int64
+            >>> nums.mask(lambda x: (x+1) % 2 == 1)
+            0       1
+            1    <NA>
+            2       3
+            3    <NA>
+            Name: nums, dtype: Int64
+
+            >>> def is_odd(num):
+            ...     return num % 2 == 1
+            >>> nums.mask(is_odd)
+            0    <NA>
+            1       2
+            2    <NA>
+            3       4
+            Name: nums, dtype: Int64
+
         Args:
             cond (bool Series/DataFrame, array-like, or callable):
                 Where cond is False, keep the original value. Where True, replace