Skip to content

Commit

Permalink
fix: exceptions raised in apply from a remote_function now surface in the client (#387)
Browse files Browse the repository at this point in the history

Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:
- [ ] Make sure to open an issue as a [bug/issue](https://togithub.com/googleapis/python-bigquery-dataframes/issues/new/choose) before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea.
- [ ] Ensure the tests and linter pass
- [ ] Code coverage does not decrease (if any source code was changed)
- [ ] Appropriate docs were updated (if necessary)

Fixes internal issue 309699263 🦕
  • Loading branch information
shobsi authored Feb 29, 2024
1 parent bfe2b23 commit dd3643d
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 23 deletions.
35 changes: 13 additions & 22 deletions bigframes/functions/remote_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import random
import shutil
import string
import subprocess
import sys
import tempfile
import textwrap
Expand Down Expand Up @@ -87,19 +86,6 @@ def _get_hash(def_, package_requirements=None):
return hashlib.md5(def_repr).hexdigest()


def _run_system_command(command):
program = subprocess.Popen(
[command], stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True
)
stdout, stderr = program.communicate()
exit_code = program.wait()
if exit_code:
raise RuntimeError(
f"Command: {command}\nOutput: {stdout.decode()}\nError: {stderr.decode()}"
f"{constants.FEEDBACK_LINK}"
)


def routine_ref_to_string_for_query(routine_ref: bigquery.RoutineReference) -> str:
    """Format a routine reference for use inside a query string.

    The project and dataset ids are joined with a dot and wrapped together in
    a single pair of backticks; the routine id follows after a dot, unquoted.
    """
    qualified_dataset = f"{routine_ref.project}.{routine_ref.dataset_id}"
    return f"`{qualified_dataset}`.{routine_ref.routine_id}"

Expand Down Expand Up @@ -281,6 +267,8 @@ def generate_cloud_function_main_code(self, def_, dir):
code_template = textwrap.dedent(
"""\
import cloudpickle
import functions_framework
from flask import jsonify
import json
# original udf code is in {udf_code_file}
Expand All @@ -289,14 +277,17 @@ def generate_cloud_function_main_code(self, def_, dir):
udf = cloudpickle.load(f)
def {handler_func_name}(request):
request_json = request.get_json(silent=True)
calls = request_json["calls"]
replies = []
for call in calls:
reply = udf(*call)
replies.append(reply)
return_json = json.dumps({{"replies" : replies}})
return return_json
try:
request_json = request.get_json(silent=True)
calls = request_json["calls"]
replies = []
for call in calls:
reply = udf(*call)
replies.append(reply)
return_json = json.dumps({{"replies" : replies}})
return return_json
except Exception as e:
return jsonify( {{ "errorMessage": str(e) }} ), 400
"""
)

Expand Down
24 changes: 23 additions & 1 deletion tests/system/large/test_remote_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import tempfile
import textwrap

from google.api_core.exceptions import NotFound, ResourceExhausted
from google.api_core.exceptions import BadRequest, NotFound, ResourceExhausted
from google.cloud import bigquery, functions_v2
import pandas
import pytest
Expand Down Expand Up @@ -1214,6 +1214,28 @@ def square(x):
)


@pytest.mark.flaky(retries=2, delay=120)
def test_remote_function_runtime_error(session, scalars_dfs, dataset_id):
    """Check that an error raised inside a remote function reaches the client.

    Applies a remote ``square`` function to ``int64_col`` and expects the
    materialization to fail with ``BadRequest`` whose text matches the
    400 / errorMessage / "unsupported operand type" pattern.
    """
    expected_error = "400.*errorMessage.*unsupported operand type"

    try:

        @session.remote_function([int], int, dataset=dataset_id)
        def square(x):
            return x * x

        bf_df, _unused = scalars_dfs
        column = bf_df["int64_col"]

        # int64_col has nulls which should cause error in square
        with pytest.raises(BadRequest, match=expected_error):
            column.apply(square).to_pandas()
    finally:
        # Remove the GCP assets created for the remote function.
        # NOTE(review): if the remote_function decorator itself raises,
        # `square` is never bound and this call fails with a NameError.
        cleanup_remote_function_assets(
            session.bqclient, session.cloudfunctionsclient, square
        )


@pytest.mark.flaky(retries=2, delay=120)
def test_remote_function_anonymous_dataset(session, scalars_dfs):
try:
Expand Down

0 comments on commit dd3643d

Please sign in to comment.