fix: throw an error message when setting is_row_processor=True to read a multi param function (#1160)

* fix: throw an error message when setting is_row_processor=True to read a multi param function

* add type ignore

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* resolve comments

* resolve the comments

* resolve the comments

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
jialuoo and gcf-owl-bot[bot] authored Dec 12, 2024
1 parent 227d228 commit b2816a5
Showing 3 changed files with 88 additions and 0 deletions.
6 changes: 6 additions & 0 deletions bigframes/functions/remote_function.py
@@ -130,6 +130,12 @@ def read_gbq_function(
    except google.api_core.exceptions.NotFound:
        raise ValueError(f"Unknown function '{routine_ref}'. {constants.FEEDBACK_LINK}")

    if is_row_processor and len(routine.arguments) > 1:
        raise ValueError(
            "A multi-input function cannot be a row processor. A row processor function "
            "takes in a single input representing the row."
        )

    try:
        ibis_signature = ibis_signature_from_routine(routine)
    except ReturnTypeMissingError:
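With this change, passing is_row_processor=True for a routine that takes more than one argument is rejected at read time with a clear message, before any signature translation is attempted. A minimal sketch of what a caller would now see (not part of this diff; it reuses the two-argument public routine bqutil.fn.cw_regexp_instr_2 exercised by the new system test below):

import bigframes.pandas as bpd

# Reading a routine with more than one argument as a row processor now
# raises immediately instead of producing a misleading failure later.
try:
    bpd.read_gbq_function("bqutil.fn.cw_regexp_instr_2", is_row_processor=True)
except ValueError as e:
    print(e)  # "A multi-input function cannot be a row processor. ..."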
52 changes: 52 additions & 0 deletions bigframes/session/__init__.py
@@ -1362,6 +1362,58 @@ def read_gbq_function(
2 TestCad$123456Str
dtype: string
Another use case is to define your own remote function and use it later.
For example, define the remote function:

>>> @bpd.remote_function()
... def tenfold(num: int) -> float:
...     return num * 10

Then, read back the deployed BQ remote function:

>>> tenfold_ref = bpd.read_gbq_function(
...     tenfold.bigframes_remote_function,
... )

>>> df = bpd.DataFrame({'a': [1, 2], 'b': [3, 4], 'c': [5, 6]})
>>> df
   a  b  c
0  1  3  5
1  2  4  6
<BLANKLINE>
[2 rows x 3 columns]

>>> df['a'].apply(tenfold_ref)
0    10.0
1    20.0
Name: a, dtype: Float64

It also supports row processing by using `is_row_processor=True`. Please
note, a row processor implies that the function has only one input
parameter.

>>> @bpd.remote_function()
... def row_sum(s: bpd.Series) -> float:
...     return s['a'] + s['b'] + s['c']

>>> row_sum_ref = bpd.read_gbq_function(
...     row_sum.bigframes_remote_function,
...     is_row_processor=True,
... )

>>> df = bpd.DataFrame({'a': [1, 2], 'b': [3, 4], 'c': [5, 6]})
>>> df
   a  b  c
0  1  3  5
1  2  4  6
<BLANKLINE>
[2 rows x 3 columns]

>>> df.apply(row_sum_ref, axis=1)
0     9.0
1    12.0
dtype: Float64

Args:
    function_name (str):
        The function's name in BigQuery in the format
30 changes: 30 additions & 0 deletions tests/system/small/test_remote_function.py
@@ -839,6 +839,21 @@ def test_read_gbq_function_enforces_explicit_types(
)


@pytest.mark.flaky(retries=2, delay=120)
def test_read_gbq_function_multiple_inputs_not_a_row_processor(session):
    with pytest.raises(ValueError) as context:
        # The remote function has two args, which cannot be row processed. Throw
        # a ValueError for it.
        session.read_gbq_function(
            function_name="bqutil.fn.cw_regexp_instr_2",
            is_row_processor=True,
        )
    assert str(context.value) == (
        "A multi-input function cannot be a row processor. A row processor function "
        "takes in a single input representing the row."
    )


@pytest.mark.flaky(retries=2, delay=120)
def test_df_apply_axis_1(session, scalars_dfs, dataset_id_permanent):
    columns = [
@@ -864,6 +879,8 @@ def add_ints(row):
        dataset_id_permanent,
        name=get_rf_name(add_ints, is_row_processor=True),
    )(add_ints)
    assert add_ints_remote.bigframes_remote_function  # type: ignore
    assert add_ints_remote.bigframes_cloud_function  # type: ignore

    with pytest.warns(
        bigframes.exceptions.PreviewWarning, match="axis=1 scenario is in preview."
@@ -882,6 +899,19 @@ def add_ints(row):
        pd_result, bf_result, check_dtype=False, check_exact=False
    )

    # Read back the deployed BQ remote function using read_gbq_function.
    func_ref = session.read_gbq_function(
        function_name=add_ints_remote.bigframes_remote_function,  # type: ignore
        is_row_processor=True,
    )

    assert func_ref.bigframes_remote_function == add_ints_remote.bigframes_remote_function  # type: ignore

    bf_result_gbq = scalars_df[columns].apply(func_ref, axis=1).to_pandas()
    pd.testing.assert_series_equal(
        pd_result, bf_result_gbq, check_dtype=False, check_exact=False
    )


@pytest.mark.flaky(retries=2, delay=120)
def test_df_apply_axis_1_ordering(session, scalars_dfs, dataset_id_permanent):
