Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: exceptions raised in apply from a remote_function now surface in the client #387

Merged
merged 4 commits into from
Feb 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 13 additions & 22 deletions bigframes/functions/remote_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import random
import shutil
import string
import subprocess
import sys
import tempfile
import textwrap
Expand Down Expand Up @@ -87,19 +86,6 @@ def _get_hash(def_, package_requirements=None):
return hashlib.md5(def_repr).hexdigest()


def _run_system_command(command):
program = subprocess.Popen(
[command], stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True
)
stdout, stderr = program.communicate()
exit_code = program.wait()
if exit_code:
raise RuntimeError(
f"Command: {command}\nOutput: {stdout.decode()}\nError: {stderr.decode()}"
f"{constants.FEEDBACK_LINK}"
)


def routine_ref_to_string_for_query(routine_ref: bigquery.RoutineReference) -> str:
return f"`{routine_ref.project}.{routine_ref.dataset_id}`.{routine_ref.routine_id}"

Expand Down Expand Up @@ -281,6 +267,8 @@ def generate_cloud_function_main_code(self, def_, dir):
code_template = textwrap.dedent(
"""\
import cloudpickle
import functions_framework
from flask import jsonify
import json

# original udf code is in {udf_code_file}
Expand All @@ -289,14 +277,17 @@ def generate_cloud_function_main_code(self, def_, dir):
udf = cloudpickle.load(f)

def {handler_func_name}(request):
request_json = request.get_json(silent=True)
calls = request_json["calls"]
replies = []
for call in calls:
reply = udf(*call)
replies.append(reply)
return_json = json.dumps({{"replies" : replies}})
return return_json
try:
request_json = request.get_json(silent=True)
calls = request_json["calls"]
replies = []
for call in calls:
reply = udf(*call)
replies.append(reply)
return_json = json.dumps({{"replies" : replies}})
return return_json
except Exception as e:
return jsonify( {{ "errorMessage": str(e) }} ), 400
"""
)

Expand Down
24 changes: 23 additions & 1 deletion tests/system/large/test_remote_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import tempfile
import textwrap

from google.api_core.exceptions import NotFound, ResourceExhausted
from google.api_core.exceptions import BadRequest, NotFound, ResourceExhausted
from google.cloud import bigquery, functions_v2
import pandas
import pytest
Expand Down Expand Up @@ -1214,6 +1214,28 @@ def square(x):
)


@pytest.mark.flaky(retries=2, delay=120)
def test_remote_function_runtime_error(session, scalars_dfs, dataset_id):
try:

@session.remote_function([int], int, dataset=dataset_id)
def square(x):
return x * x

scalars_df, _ = scalars_dfs

with pytest.raises(
BadRequest, match="400.*errorMessage.*unsupported operand type"
):
# int64_col has nulls which should cause error in square
scalars_df["int64_col"].apply(square).to_pandas()
finally:
# clean up the gcp assets created for the remote function
cleanup_remote_function_assets(
session.bqclient, session.cloudfunctionsclient, square
)


@pytest.mark.flaky(retries=2, delay=120)
def test_remote_function_anonymous_dataset(session, scalars_dfs):
try:
Expand Down