From bced04626510bb780f83f9da41af4d7bd1d784d7 Mon Sep 17 00:00:00 2001
From: Shobhit Singh
Date: Fri, 8 Mar 2024 01:41:45 +0000
Subject: [PATCH 1/3] test: do GCF cleanup in both presubmit and e2e tests

---
 tests/system/conftest.py                   |  66 ++++++-
 tests/system/large/test_remote_function.py | 216 ++++++---------------
 tests/system/utils.py                      |  63 ++++++
 3 files changed, 185 insertions(+), 160 deletions(-)

diff --git a/tests/system/conftest.py b/tests/system/conftest.py
index 4b5ebc9d43..70a25a8b4f 100644
--- a/tests/system/conftest.py
+++ b/tests/system/conftest.py
@@ -21,6 +21,7 @@
 import typing
 from typing import Dict, Optional

+from google.api_core.exceptions import NotFound, ResourceExhausted
 import google.cloud.bigquery as bigquery
 import google.cloud.bigquery_connection_v1 as bigquery_connection_v1
 import google.cloud.exceptions
@@ -34,7 +35,20 @@
 import test_utils.prefixer

 import bigframes
-from tests.system.utils import convert_pandas_dtypes
+from tests.system.utils import (
+    convert_pandas_dtypes,
+    delete_cloud_function,
+    get_cloud_functions,
+    get_remote_function_endpoints,
+)
+
+# Use this to control the number of cloud functions being deleted in a single
+# test session. This should help soften the spike of the number of mutations per
+# minute tracked against a quota limit (default 60, increased to 120 for
+# bigframes-dev project) by the Cloud Functions API
+# We are running pytest with "-n 20". Let's say each session lasts about a
+# minute, so we are setting a limit of 120/20 = 6 deletions per session.
+MAX_NUM_FUNCTIONS_TO_DELETE_PER_SESSION = 6

 CURRENT_DIR = pathlib.Path(__file__).parent
 DATA_DIR = CURRENT_DIR.parent / "data"
@@ -1040,3 +1054,53 @@ def floats_bf(session, floats_pd):
 @pytest.fixture()
 def floats_product_bf(session, floats_product_pd):
     return session.read_pandas(floats_product_pd)
+
+
+@pytest.fixture(scope="session", autouse=True)
+def cleanup_cloud_functions(session, cloudfunctions_client, dataset_id_permanent):
+    """Clean up stale cloud functions."""
+    permanent_endpoints = get_remote_function_endpoints(
+        session.bqclient, dataset_id_permanent
+    )
+    delete_count = 0
+    for cloud_function in get_cloud_functions(
+        cloudfunctions_client,
+        session.bqclient.project,
+        session.bqclient.location,
+        name_prefix="bigframes-",
+    ):
+        # Ignore bigframes cloud functions referred by the remote functions in
+        # the permanent dataset
+        if cloud_function.service_config.uri in permanent_endpoints:
+            continue
+
+        # Ignore the functions less than one day old
+        age = datetime.now() - datetime.fromtimestamp(
+            cloud_function.update_time.timestamp()
+        )
+        if age.days <= 0:
+            continue
+
+        # Go ahead and delete
+        try:
+            delete_cloud_function(cloudfunctions_client, cloud_function.name)
+            delete_count += 1
+            if delete_count >= MAX_NUM_FUNCTIONS_TO_DELETE_PER_SESSION:
+                break
+        except NotFound:
+            # This can happen when multiple pytest sessions are running in
+            # parallel. Two or more sessions may discover the same cloud
+            # function, but only one of them would be able to delete it
+            # successfully, while the other instance will run into this
+            # exception. Ignore this exception.
+            pass
+        except ResourceExhausted:
+            # This can happen if we are hitting GCP limits, e.g.
+            # google.api_core.exceptions.ResourceExhausted: 429 Quota exceeded
+            # for quota metric 'Per project mutation requests' and limit
+            # 'Per project mutation requests per minute per region' of service
+            # 'cloudfunctions.googleapis.com' for consumer
+            # 'project_number:1084210331973'.
+            # [reason: "RATE_LIMIT_EXCEEDED" domain: "googleapis.com" ...
+            # Let's stop further clean up and leave it to later.
+            break
diff --git a/tests/system/large/test_remote_function.py b/tests/system/large/test_remote_function.py
index 77aa3c7603..d9141164e4 100644
--- a/tests/system/large/test_remote_function.py
+++ b/tests/system/large/test_remote_function.py
@@ -12,7 +12,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-from datetime import datetime
 import importlib.util
 import inspect
 import math  # must keep this at top level to test udf referring global import
@@ -21,26 +20,19 @@
 import tempfile
 import textwrap

-from google.api_core.exceptions import BadRequest, NotFound, ResourceExhausted
-from google.cloud import bigquery, functions_v2
+from google.api_core.exceptions import BadRequest, NotFound
+from google.cloud import bigquery
 import pandas
 import pytest
 import test_utils.prefixer

 import bigframes
-from bigframes.functions.remote_function import (
-    get_cloud_function_name,
-    get_remote_function_locations,
+from bigframes.functions.remote_function import get_cloud_function_name
+from tests.system.utils import (
+    assert_pandas_df_equal,
+    delete_cloud_function,
+    get_cloud_functions,
 )
-from tests.system.utils import assert_pandas_df_equal
-
-# Use this to control the number of cloud functions being deleted in a single
-# test session. This should help soften the spike of the number of mutations per
-# minute tracked against a quota limit (default 60, increased to 120 for
-# bigframes-dev project) by the Cloud Functions API
-# We are running pytest with "-n 20". Let's say each session lasts about a
-# minute, so we are setting a limit of 120/20 = 6 deletions per session.
-_MAX_NUM_FUNCTIONS_TO_DELETE_PER_SESSION = 6

 # NOTE: Keep this import at the top level to test global var behavior with
 # remote functions
@@ -48,57 +40,8 @@
 _team_euler = "Team Euler"


-def get_remote_function_endpoints(bigquery_client, dataset_id):
-    """Get endpoints used by the remote functions in a datset"""
-    endpoints = set()
-    routines = bigquery_client.list_routines(dataset=dataset_id)
-    for routine in routines:
-        rf_options = routine._properties.get("remoteFunctionOptions")
-        if not rf_options:
-            continue
-        rf_endpoint = rf_options.get("endpoint")
-        if rf_endpoint:
-            endpoints.add(rf_endpoint)
-    return endpoints
-
-
-def get_cloud_functions(
-    functions_client, project, location, name=None, name_prefix=None
-):
-    """Get the cloud functions in the given project and location."""
-
-    assert (
-        not name or not name_prefix
-    ), f"At most one of the {name.__name__} or {name_prefix.__name__} can be passed."
-
-    _, location = get_remote_function_locations(location)
-    parent = f"projects/{project}/locations/{location}"
-    request = functions_v2.ListFunctionsRequest(parent=parent)
-    page_result = functions_client.list_functions(request=request)
-    for response in page_result:
-        # If name is provided and it does not match then skip
-        if bool(name):
-            full_name = parent + f"/functions/{name}"
-            if response.name != full_name:
-                continue
-        # If name prefix is provided and it does not match then skip
-        elif bool(name_prefix):
-            full_name_prefix = parent + f"/functions/{name_prefix}"
-            if not response.name.startswith(full_name_prefix):
-                continue
-
-        yield response
-
-
-def delete_cloud_function(functions_client, full_name):
-    """Delete a cloud function with the given fully qualified name."""
-    request = functions_v2.DeleteFunctionRequest(name=full_name)
-    operation = functions_client.delete_function(request=request)
-    return operation
-
-
 def cleanup_remote_function_assets(
-    bigquery_client, functions_client, remote_udf, ignore_failures=True
+    bigquery_client, cloudfunctions_client, remote_udf, ignore_failures=True
 ):
     """Clean up the GCP assets behind a bigframes remote function."""

@@ -112,7 +55,9 @@ def cleanup_remote_function_assets(

     # Clean up cloud function
     try:
-        delete_cloud_function(functions_client, remote_udf.bigframes_cloud_function)
+        delete_cloud_function(
+            cloudfunctions_client, remote_udf.bigframes_cloud_function
+        )
     except Exception:
         # By default don't raise exception in cleanup
         if not ignore_failures:
@@ -169,62 +114,6 @@ def bq_cf_connection() -> str:
     return "bigframes-rf-conn"


-@pytest.fixture(scope="module")
-def functions_client() -> functions_v2.FunctionServiceClient:
-    """Cloud Functions client"""
-    return functions_v2.FunctionServiceClient()
-
-
-@pytest.fixture(scope="module", autouse=True)
-def cleanup_cloud_functions(session, functions_client, dataset_id_permanent):
-    """Clean up stale cloud functions."""
-    permanent_endpoints = get_remote_function_endpoints(
-        session.bqclient, dataset_id_permanent
-    )
-    delete_count = 0
-    for cloud_function in get_cloud_functions(
-        functions_client,
-        session.bqclient.project,
-        session.bqclient.location,
-        name_prefix="bigframes-",
-    ):
-        # Ignore bigframes cloud functions referred by the remote functions in
-        # the permanent dataset
-        if cloud_function.service_config.uri in permanent_endpoints:
-            continue
-
-        # Ignore the functions less than one day old
-        age = datetime.now() - datetime.fromtimestamp(
-            cloud_function.update_time.timestamp()
-        )
-        if age.days <= 0:
-            continue
-
-        # Go ahead and delete
-        try:
-            delete_cloud_function(functions_client, cloud_function.name)
-            delete_count += 1
-            if delete_count >= _MAX_NUM_FUNCTIONS_TO_DELETE_PER_SESSION:
-                break
-        except NotFound:
-            # This can happen when multiple pytest sessions are running in
-            # parallel. Two or more sessions may discover the same cloud
-            # function, but only one of them would be able to delete it
-            # successfully, while the other instance will run into this
-            # exception. Ignore this exception.
-            pass
-        except ResourceExhausted:
-            # This can happen if we are hitting GCP limits, e.g.
-            # google.api_core.exceptions.ResourceExhausted: 429 Quota exceeded
-            # for quota metric 'Per project mutation requests' and limit
-            # 'Per project mutation requests per minute per region' of service
-            # 'cloudfunctions.googleapis.com' for consumer
-            # 'project_number:1084210331973'.
-            # [reason: "RATE_LIMIT_EXCEEDED" domain: "googleapis.com" ...
-            # Let's stop further clean up and leave it to later.
-            break
-
-
 @pytest.mark.flaky(retries=2, delay=120)
 def test_remote_function_multiply_with_ibis(
     session,
@@ -232,7 +121,6 @@ def test_remote_function_multiply_with_ibis(
     ibis_client,
     dataset_id,
     bq_cf_connection,
-    functions_client,
 ):
     try:
@@ -274,7 +162,9 @@ def multiply(x, y):
         )
     finally:
         # clean up the gcp assets created for the remote function
-        cleanup_remote_function_assets(session.bqclient, functions_client, multiply)
+        cleanup_remote_function_assets(
+            session.bqclient, session.cloudfunctionsclient, multiply
+        )


 @pytest.mark.flaky(retries=2, delay=120)
@@ -284,7 +174,7 @@ def test_remote_function_stringify_with_ibis(
     ibis_client,
     dataset_id,
     bq_cf_connection,
-    functions_client,
+    cloudfunctions_client,
 ):
     try:
@@ -319,12 +209,14 @@ def stringify(x):
         )
     finally:
         # clean up the gcp assets created for the remote function
-        cleanup_remote_function_assets(session.bqclient, functions_client, stringify)
+        cleanup_remote_function_assets(
+            session.bqclient, cloudfunctions_client, stringify
+        )


 @pytest.mark.flaky(retries=2, delay=120)
 def test_remote_function_decorator_with_bigframes_series(
-    session, scalars_dfs, dataset_id, bq_cf_connection, functions_client
+    session, scalars_dfs, dataset_id, bq_cf_connection, cloudfunctions_client
 ):
     try:
@@ -362,12 +254,12 @@ def square(x):
         assert_pandas_df_equal(bf_result, pd_result)
     finally:
         # clean up the gcp assets created for the remote function
-        cleanup_remote_function_assets(session.bqclient, functions_client, square)
+        cleanup_remote_function_assets(session.bqclient, cloudfunctions_client, square)


 @pytest.mark.flaky(retries=2, delay=120)
 def test_remote_function_explicit_with_bigframes_series(
-    session, scalars_dfs, dataset_id, bq_cf_connection, functions_client
+    session, scalars_dfs, dataset_id, bq_cf_connection, cloudfunctions_client
 ):
     try:
@@ -407,13 +299,17 @@ def add_one(x):
     finally:
         # clean up the gcp assets created for the remote function
         cleanup_remote_function_assets(
-            session.bqclient, functions_client, remote_add_one
+            session.bqclient, cloudfunctions_client, remote_add_one
         )


 @pytest.mark.flaky(retries=2, delay=120)
 def test_remote_function_explicit_dataset_not_created(
-    session, scalars_dfs, dataset_id_not_created, bq_cf_connection, functions_client
+    session,
+    scalars_dfs,
+    dataset_id_not_created,
+    bq_cf_connection,
+    cloudfunctions_client,
 ):
     try:
@@ -451,12 +347,12 @@ def square(x):
         assert_pandas_df_equal(bf_result, pd_result)
     finally:
         # clean up the gcp assets created for the remote function
-        cleanup_remote_function_assets(session.bqclient, functions_client, square)
+        cleanup_remote_function_assets(session.bqclient, cloudfunctions_client, square)


 @pytest.mark.flaky(retries=2, delay=120)
 def test_remote_udf_referring_outside_var(
-    session, scalars_dfs, dataset_id, bq_cf_connection, functions_client
+    session, scalars_dfs, dataset_id, bq_cf_connection, cloudfunctions_client
 ):
     try:
         POSITIVE_SIGN = 1
@@ -502,12 +398,14 @@ def sign(num):
         assert_pandas_df_equal(bf_result, pd_result)
     finally:
         # clean up the gcp assets created for the remote function
-        cleanup_remote_function_assets(session.bqclient, functions_client, remote_sign)
+        cleanup_remote_function_assets(
+            session.bqclient, cloudfunctions_client, remote_sign
+        )


 @pytest.mark.flaky(retries=2, delay=120)
 def test_remote_udf_referring_outside_import(
-    session, scalars_dfs, dataset_id, bq_cf_connection, functions_client
+    session, scalars_dfs, dataset_id, bq_cf_connection, cloudfunctions_client
 ):
     try:
         import math as mymath
@@ -548,13 +446,13 @@ def circumference(radius):
     finally:
         # clean up the gcp assets created for the remote function
         cleanup_remote_function_assets(
-            session.bqclient, functions_client, remote_circumference
+            session.bqclient, cloudfunctions_client, remote_circumference
         )


 @pytest.mark.flaky(retries=2, delay=120)
 def test_remote_udf_referring_global_var_and_import(
-    session, scalars_dfs, dataset_id, bq_cf_connection, functions_client
+    session, scalars_dfs, dataset_id, bq_cf_connection, cloudfunctions_client
 ):
     try:
@@ -597,7 +495,7 @@ def find_team(num):
     finally:
         # clean up the gcp assets created for the remote function
         cleanup_remote_function_assets(
-            session.bqclient, functions_client, remote_find_team
+            session.bqclient, cloudfunctions_client, remote_find_team
         )


@@ -607,7 +505,7 @@ def test_remote_function_restore_with_bigframes_series(
     scalars_dfs,
     dataset_id,
     bq_cf_connection,
-    functions_client,
+    cloudfunctions_client,
 ):
     try:
@@ -623,7 +521,7 @@ def add_one(x):
         # There should be no cloud function yet for the unique udf
         cloud_functions = list(
             get_cloud_functions(
-                functions_client,
+                cloudfunctions_client,
                 session.bqclient.project,
                 session.bqclient.location,
                 name=add_one_uniq_cf_name,
@@ -644,7 +542,7 @@ def add_one(x):
         # There should have been excactly one cloud function created at this point
         cloud_functions = list(
             get_cloud_functions(
-                functions_client,
+                cloudfunctions_client,
                 session.bqclient.project,
                 session.bqclient.location,
                 name=add_one_uniq_cf_name,
@@ -684,7 +582,7 @@ def inner_test():

         # Let's delete the cloud function while not touching the bq remote function
         delete_operation = delete_cloud_function(
-            functions_client, cloud_functions[0].name
+            cloudfunctions_client, cloud_functions[0].name
         )
         delete_operation.result()
         assert delete_operation.done()
@@ -692,7 +590,7 @@ def inner_test():
         # There should be no cloud functions at this point for the uniq udf
         cloud_functions = list(
             get_cloud_functions(
-                functions_client,
+                cloudfunctions_client,
                 session.bqclient.project,
                 session.bqclient.location,
                 name=add_one_uniq_cf_name,
@@ -714,7 +612,7 @@ def inner_test():
         # There should be excactly one cloud function again
         cloud_functions = list(
             get_cloud_functions(
-                functions_client,
+                cloudfunctions_client,
                 session.bqclient.project,
                 session.bqclient.location,
                 name=add_one_uniq_cf_name,
@@ -731,13 +629,13 @@ def inner_test():
     finally:
         # clean up the gcp assets created for the remote function
         cleanup_remote_function_assets(
-            session.bqclient, functions_client, remote_add_one
+            session.bqclient, cloudfunctions_client, remote_add_one
         )


 @pytest.mark.flaky(retries=2, delay=120)
 def test_remote_udf_mask_default_value(
-    session, scalars_dfs, dataset_id, bq_cf_connection, functions_client
+    session, scalars_dfs, dataset_id, bq_cf_connection, cloudfunctions_client
 ):
     try:
@@ -771,13 +669,13 @@ def is_odd(num):
     finally:
         # clean up the gcp assets created for the remote function
         cleanup_remote_function_assets(
-            session.bqclient, functions_client, is_odd_remote
+            session.bqclient, cloudfunctions_client, is_odd_remote
         )


 @pytest.mark.flaky(retries=2, delay=120)
 def test_remote_udf_mask_custom_value(
-    session, scalars_dfs, dataset_id, bq_cf_connection, functions_client
+    session, scalars_dfs, dataset_id, bq_cf_connection, cloudfunctions_client
 ):
     try:
@@ -814,13 +712,13 @@ def is_odd(num):
     finally:
         # clean up the gcp assets created for the remote function
         cleanup_remote_function_assets(
-            session.bqclient, functions_client, is_odd_remote
+            session.bqclient, cloudfunctions_client, is_odd_remote
         )


 @pytest.mark.flaky(retries=2, delay=120)
 def test_remote_udf_lambda(
-    session, scalars_dfs, dataset_id, bq_cf_connection, functions_client
+    session, scalars_dfs, dataset_id, bq_cf_connection, cloudfunctions_client
 ):
     try:
         add_one_lambda = lambda x: x + 1  # noqa: E731
@@ -858,13 +756,13 @@ def test_remote_udf_lambda(
     finally:
         # clean up the gcp assets created for the remote function
         cleanup_remote_function_assets(
-            session.bqclient, functions_client, add_one_lambda_remote
+            session.bqclient, cloudfunctions_client, add_one_lambda_remote
         )


 @pytest.mark.flaky(retries=2, delay=120)
 def test_remote_function_with_explicit_name(
-    session, scalars_dfs, dataset_id, bq_cf_connection, functions_client
+    session, scalars_dfs, dataset_id, bq_cf_connection, cloudfunctions_client
 ):
     try:
@@ -915,13 +813,13 @@ def square(x):
     finally:
         # clean up the gcp assets created for the remote function
         cleanup_remote_function_assets(
-            session.bqclient, functions_client, square_remote
+            session.bqclient, cloudfunctions_client, square_remote
         )


 @pytest.mark.flaky(retries=2, delay=120)
 def test_remote_function_with_external_package_dependencies(
-    session, scalars_dfs, dataset_id, bq_cf_connection, functions_client
+    session, scalars_dfs, dataset_id, bq_cf_connection, cloudfunctions_client
 ):
     try:
@@ -960,13 +858,13 @@ def pd_np_foo(x):
     finally:
         # clean up the gcp assets created for the remote function
         cleanup_remote_function_assets(
-            session.bqclient, functions_client, pd_np_foo_remote
+            session.bqclient, cloudfunctions_client, pd_np_foo_remote
         )


 @pytest.mark.flaky(retries=2, delay=120)
 def test_remote_function_with_explicit_name_reuse(
-    session, scalars_dfs, dataset_id, bq_cf_connection, functions_client
+    session, scalars_dfs, dataset_id, bq_cf_connection, cloudfunctions_client
 ):
     try:
@@ -1113,13 +1011,13 @@ def plusone(x):
     finally:
         # clean up the gcp assets created for the remote function
         cleanup_remote_function_assets(
-            session.bqclient, functions_client, square_remote1
+            session.bqclient, cloudfunctions_client, square_remote1
         )
         cleanup_remote_function_assets(
-            session.bqclient, functions_client, square_remote2
+            session.bqclient, cloudfunctions_client, square_remote2
         )
         cleanup_remote_function_assets(
-            session.bqclient, functions_client, plusone_remote
+            session.bqclient, cloudfunctions_client, plusone_remote
         )
         for dir_ in dirs_to_cleanup:
             shutil.rmtree(dir_)
diff --git a/tests/system/utils.py b/tests/system/utils.py
index a4647b4f51..3b66aa9d3b 100644
--- a/tests/system/utils.py
+++ b/tests/system/utils.py
@@ -15,13 +15,19 @@
 import base64
 import decimal
 import functools
+from typing import Iterable, Optional, Set

 import geopandas as gpd  # type: ignore
+import google.api_core.operation
+from google.cloud import bigquery, functions_v2
+from google.cloud.functions_v2.types import functions
 import numpy as np
 import pandas as pd
 import pyarrow as pa  # type: ignore
 import pytest

+from bigframes.functions.remote_function import get_remote_function_locations
+

 def skip_legacy_pandas(test):
@@ -241,3 +247,60 @@ def assert_pandas_df_equal_pca(actual, expected, **kwargs):
     except AssertionError:
         # Allow for sign difference per column
         pd.testing.assert_series_equal(-actual[column], expected[column], **kwargs)
+
+
+def get_remote_function_endpoints(
+    bigquery_client: bigquery.Client, dataset_id: str
+) -> Set[str]:
+    """Get endpoints used by the remote functions in a dataset"""
+    endpoints = set()
+    routines = bigquery_client.list_routines(dataset=dataset_id)
+    for routine in routines:
+        rf_options = routine._properties.get("remoteFunctionOptions")
+        if not rf_options:
+            continue
+        rf_endpoint = rf_options.get("endpoint")
+        if rf_endpoint:
+            endpoints.add(rf_endpoint)
+    return endpoints
+
+
+def get_cloud_functions(
+    functions_client: functions_v2.FunctionServiceClient,
+    project: str,
+    location: str,
+    name: Optional[str] = None,
+    name_prefix: Optional[str] = None,
+) -> Iterable[functions.ListFunctionsResponse]:
+    """Get the cloud functions in the given project and location."""
+
+    assert (
+        not name or not name_prefix
+    ), "Either 'name' or 'name_prefix' can be passed but not both."
+
+    _, location = get_remote_function_locations(location)
+    parent = f"projects/{project}/locations/{location}"
+    request = functions_v2.ListFunctionsRequest(parent=parent)
+    page_result = functions_client.list_functions(request=request)
+    for response in page_result:
+        # If name is provided and it does not match then skip
+        if bool(name):
+            full_name = parent + f"/functions/{name}"
+            if response.name != full_name:
+                continue
+        # If name prefix is provided and it does not match then skip
+        elif bool(name_prefix):
+            full_name_prefix = parent + f"/functions/{name_prefix}"
+            if not response.name.startswith(full_name_prefix):
+                continue
+
+        yield response
+
+
+def delete_cloud_function(
+    functions_client: functions_v2.FunctionServiceClient, full_name: str
+) -> google.api_core.operation.Operation:
+    """Delete a cloud function with the given fully qualified name."""
+    request = functions_v2.DeleteFunctionRequest(name=full_name)
+    operation = functions_client.delete_function(request=request)
+    return operation

From b44fa1d7295cef0b03cd43efcd8253e40f781b54 Mon Sep 17 00:00:00 2001
From: Shobhit Singh
Date: Fri, 8 Mar 2024 03:06:10 +0000
Subject: [PATCH 2/3] use functions client from session

---
 tests/system/large/test_remote_function.py | 73 +++++++++++-----------
 1 file changed, 36 insertions(+), 37 deletions(-)

diff --git a/tests/system/large/test_remote_function.py b/tests/system/large/test_remote_function.py
index d9141164e4..3f4ce041ef 100644
--- a/tests/system/large/test_remote_function.py
+++ b/tests/system/large/test_remote_function.py
@@ -174,7 +174,6 @@ def test_remote_function_stringify_with_ibis(
     ibis_client,
     dataset_id,
     bq_cf_connection,
-    cloudfunctions_client,
 ):
     try:
@@ -210,13 +209,13 @@ def stringify(x):
     finally:
         # clean up the gcp assets created for the remote function
         cleanup_remote_function_assets(
-            session.bqclient, cloudfunctions_client, stringify
+            session.bqclient, session.cloudfunctionsclient, stringify
         )


 @pytest.mark.flaky(retries=2, delay=120)
 def test_remote_function_decorator_with_bigframes_series(
-    session, scalars_dfs, dataset_id, bq_cf_connection, cloudfunctions_client
+    session, scalars_dfs, dataset_id, bq_cf_connection
 ):
     try:
@@ -254,12 +253,14 @@ def square(x):
         assert_pandas_df_equal(bf_result, pd_result)
     finally:
         # clean up the gcp assets created for the remote function
-        cleanup_remote_function_assets(session.bqclient, cloudfunctions_client, square)
+        cleanup_remote_function_assets(
+            session.bqclient, session.cloudfunctionsclient, square
+        )


 @pytest.mark.flaky(retries=2, delay=120)
 def test_remote_function_explicit_with_bigframes_series(
-    session, scalars_dfs, dataset_id, bq_cf_connection, cloudfunctions_client
+    session, scalars_dfs, dataset_id, bq_cf_connection
 ):
     try:
@@ -300,7 +300,7 @@ def add_one(x):
     finally:
         # clean up the gcp assets created for the remote function
         cleanup_remote_function_assets(
-            session.bqclient, cloudfunctions_client, remote_add_one
+            session.bqclient, session.cloudfunctionsclient, remote_add_one
         )


@@ -309,7 +310,6 @@ def test_remote_function_explicit_dataset_not_created(
     scalars_dfs,
     dataset_id_not_created,
     bq_cf_connection,
-    cloudfunctions_client,
 ):
     try:
@@ -347,12 +347,14 @@ def square(x):
         assert_pandas_df_equal(bf_result, pd_result)
     finally:
         # clean up the gcp assets created for the remote function
-        cleanup_remote_function_assets(session.bqclient, cloudfunctions_client, square)
+        cleanup_remote_function_assets(
+            session.bqclient, session.cloudfunctionsclient, square
+        )


 @pytest.mark.flaky(retries=2, delay=120)
 def test_remote_udf_referring_outside_var(
-    session, scalars_dfs, dataset_id, bq_cf_connection, cloudfunctions_client
+    session, scalars_dfs, dataset_id, bq_cf_connection
 ):
     try:
         POSITIVE_SIGN = 1
@@ -399,13 +401,13 @@ def sign(num):
     finally:
         # clean up the gcp assets created for the remote function
         cleanup_remote_function_assets(
-            session.bqclient, cloudfunctions_client, remote_sign
+            session.bqclient, session.cloudfunctionsclient, remote_sign
         )


 @pytest.mark.flaky(retries=2, delay=120)
 def test_remote_udf_referring_outside_import(
-    session, scalars_dfs, dataset_id, bq_cf_connection, cloudfunctions_client
+    session, scalars_dfs, dataset_id, bq_cf_connection
 ):
     try:
         import math as mymath
@@ -446,13 +448,13 @@ def circumference(radius):
     finally:
         # clean up the gcp assets created for the remote function
         cleanup_remote_function_assets(
-            session.bqclient, cloudfunctions_client, remote_circumference
+            session.bqclient, session.cloudfunctionsclient, remote_circumference
         )


 @pytest.mark.flaky(retries=2, delay=120)
 def test_remote_udf_referring_global_var_and_import(
-    session, scalars_dfs, dataset_id, bq_cf_connection, cloudfunctions_client
+    session, scalars_dfs, dataset_id, bq_cf_connection
 ):
     try:
@@ -497,7 +497,7 @@ def find_team(num):
     finally:
         # clean up the gcp assets created for the remote function
         cleanup_remote_function_assets(
-            session.bqclient, cloudfunctions_client, remote_find_team
+            session.bqclient, session.cloudfunctionsclient, remote_find_team
         )


@@ -507,7 +507,6 @@ def test_remote_function_restore_with_bigframes_series(
     scalars_dfs,
     dataset_id,
     bq_cf_connection,
-    cloudfunctions_client,
 ):
     try:
@@ -522,7 +521,7 @@ def add_one(x):
         # There should be no cloud function yet for the unique udf
         cloud_functions = list(
             get_cloud_functions(
-                cloudfunctions_client,
+                session.cloudfunctionsclient,
                 session.bqclient.project,
                 session.bqclient.location,
                 name=add_one_uniq_cf_name,
@@ -543,7 +542,7 @@ def add_one(x):
         # There should have been excactly one cloud function created at this point
         cloud_functions = list(
             get_cloud_functions(
-                cloudfunctions_client,
+                session.cloudfunctionsclient,
                 session.bqclient.project,
                 session.bqclient.location,
                 name=add_one_uniq_cf_name,
@@ -583,7 +582,7 @@ def inner_test():

         # Let's delete the cloud function while not touching the bq remote function
         delete_operation = delete_cloud_function(
-            cloudfunctions_client, cloud_functions[0].name
+            session.cloudfunctionsclient, cloud_functions[0].name
         )
         delete_operation.result()
         assert delete_operation.done()
@@ -591,7 +590,7 @@ def inner_test():
         # There should be no cloud functions at this point for the uniq udf
         cloud_functions = list(
             get_cloud_functions(
-                cloudfunctions_client,
+                session.cloudfunctionsclient,
                 session.bqclient.project,
                 session.bqclient.location,
                 name=add_one_uniq_cf_name,
@@ -613,7 +612,7 @@ def inner_test():
         # There should be excactly one cloud function again
         cloud_functions = list(
             get_cloud_functions(
-                cloudfunctions_client,
+                session.cloudfunctionsclient,
                 session.bqclient.project,
                 session.bqclient.location,
                 name=add_one_uniq_cf_name,
@@ -630,13 +629,13 @@ def inner_test():
     finally:
         # clean up the gcp assets created for the remote function
         cleanup_remote_function_assets(
-            session.bqclient, cloudfunctions_client, remote_add_one
+            session.bqclient, session.cloudfunctionsclient, remote_add_one
         )


 @pytest.mark.flaky(retries=2, delay=120)
 def test_remote_udf_mask_default_value(
-    session, scalars_dfs, dataset_id, bq_cf_connection, cloudfunctions_client
+    session, scalars_dfs, dataset_id, bq_cf_connection
 ):
     try:
@@ -670,13 +669,13 @@ def is_odd(num):
     finally:
         # clean up the gcp assets created for the remote function
         cleanup_remote_function_assets(
-            session.bqclient, cloudfunctions_client, is_odd_remote
+            session.bqclient, session.cloudfunctionsclient, is_odd_remote
         )


 @pytest.mark.flaky(retries=2, delay=120)
 def test_remote_udf_mask_custom_value(
-    session, scalars_dfs, dataset_id, bq_cf_connection, cloudfunctions_client
+    session, scalars_dfs, dataset_id, bq_cf_connection
 ):
     try:
@@ -713,14 +712,12 @@ def is_odd(num):
     finally:
         # clean up the gcp assets created for the remote function
         cleanup_remote_function_assets(
-            session.bqclient, cloudfunctions_client, is_odd_remote
+            session.bqclient, session.cloudfunctionsclient, is_odd_remote
         )


 @pytest.mark.flaky(retries=2, delay=120)
-def test_remote_udf_lambda(
-    session, scalars_dfs, dataset_id, bq_cf_connection, cloudfunctions_client
-):
+def test_remote_udf_lambda(session, scalars_dfs, dataset_id, bq_cf_connection):
     try:
         add_one_lambda = lambda x: x + 1  # noqa: E731
@@ -756,13 +755,13 @@ def test_remote_udf_lambda(
     finally:
         # clean up the gcp assets created for the remote function
         cleanup_remote_function_assets(
-            session.bqclient, cloudfunctions_client, add_one_lambda_remote
+            session.bqclient, session.cloudfunctionsclient, add_one_lambda_remote
         )


 @pytest.mark.flaky(retries=2, delay=120)
 def test_remote_function_with_explicit_name(
-    session, scalars_dfs, dataset_id, bq_cf_connection, cloudfunctions_client
+    session, scalars_dfs, dataset_id, bq_cf_connection
 ):
     try:
@@ -812,13 +811,13 @@ def square(x):
     finally:
         # clean up the gcp assets created for the remote function
         cleanup_remote_function_assets(
-            session.bqclient, cloudfunctions_client, square_remote
+            session.bqclient, session.cloudfunctionsclient, square_remote
         )


 @pytest.mark.flaky(retries=2, delay=120)
 def test_remote_function_with_external_package_dependencies(
-    session, scalars_dfs, dataset_id, bq_cf_connection, cloudfunctions_client
+    session, scalars_dfs, dataset_id, bq_cf_connection
 ):
     try:
@@ -857,13 +856,13 @@ def pd_np_foo(x):
     finally:
         # clean up the gcp assets created for the remote function
         cleanup_remote_function_assets(
-            session.bqclient, cloudfunctions_client, pd_np_foo_remote
+            session.bqclient, session.cloudfunctionsclient, pd_np_foo_remote
         )


 @pytest.mark.flaky(retries=2, delay=120)
 def test_remote_function_with_explicit_name_reuse(
-    session, scalars_dfs, dataset_id, bq_cf_connection, cloudfunctions_client
+    session, scalars_dfs, dataset_id, bq_cf_connection
 ):
     try:
@@ -1010,13 +1009,13 @@ def plusone(x):
     finally:
         # clean up the gcp assets created for the remote function
         cleanup_remote_function_assets(
-            session.bqclient, cloudfunctions_client, square_remote1
+            session.bqclient, session.cloudfunctionsclient, square_remote1
         )
         cleanup_remote_function_assets(
-            session.bqclient, cloudfunctions_client, square_remote2
+            session.bqclient, session.cloudfunctionsclient, square_remote2
         )
         cleanup_remote_function_assets(
-            session.bqclient, cloudfunctions_client, plusone_remote
+            session.bqclient, session.cloudfunctionsclient, plusone_remote
         )
         for dir_ in dirs_to_cleanup:
             shutil.rmtree(dir_)

From 2fe59ebcfe63cf13b338cea8f9095ccad1a42439 Mon Sep 17 00:00:00 2001
From: Shobhit Singh
Date: Mon, 18 Mar 2024 05:41:57 +0000
Subject: [PATCH 3/3] address review comments

---
 tests/system/conftest.py | 25 +++++++++++--------------
 tests/system/utils.py    |  4 ++--
 2 files changed, 13 insertions(+), 16 deletions(-)

diff --git a/tests/system/conftest.py b/tests/system/conftest.py
index 70a25a8b4f..e6b241c9a3 100644
--- a/tests/system/conftest.py
+++ b/tests/system/conftest.py
@@ -21,7 +21,7 @@
 import typing
 from typing import Dict, Optional

-from google.api_core.exceptions import NotFound, ResourceExhausted
+import google.api_core.exceptions
 import google.cloud.bigquery as bigquery
 import google.cloud.bigquery_connection_v1 as bigquery_connection_v1
 import google.cloud.exceptions
@@ -35,12 +35,7 @@
 import test_utils.prefixer

 import bigframes
-from tests.system.utils import (
-    convert_pandas_dtypes,
-    delete_cloud_function,
-    get_cloud_functions,
-    get_remote_function_endpoints,
-)
+import tests.system.utils

 # Use this to control the number of cloud functions being deleted in a single
 # test session. This should help soften the spike of the number of mutations per
@@ -362,7 +357,7 @@ def nested_pandas_df() -> pd.DataFrame:
         DATA_DIR / "nested.jsonl",
         lines=True,
     )
-    convert_pandas_dtypes(df, bytes_col=True)
+    tests.system.utils.convert_pandas_dtypes(df, bytes_col=True)

     df = df.set_index("rowindex")
     return df
@@ -414,7 +409,7 @@ def scalars_pandas_df_default_index() -> pd.DataFrame:
         DATA_DIR / "scalars.jsonl",
         lines=True,
     )
-    convert_pandas_dtypes(df, bytes_col=True)
+    tests.system.utils.convert_pandas_dtypes(df, bytes_col=True)

     df = df.set_index("rowindex", drop=False)
     df.index.name = None
@@ -1059,11 +1054,11 @@ def floats_product_bf(session, floats_product_pd):
 @pytest.fixture(scope="session", autouse=True)
 def cleanup_cloud_functions(session, cloudfunctions_client, dataset_id_permanent):
     """Clean up stale cloud functions."""
-    permanent_endpoints = get_remote_function_endpoints(
+    permanent_endpoints = tests.system.utils.get_remote_function_endpoints(
         session.bqclient, dataset_id_permanent
     )
     delete_count = 0
-    for cloud_function in get_cloud_functions(
+    for cloud_function in tests.system.utils.get_cloud_functions(
         cloudfunctions_client,
         session.bqclient.project,
         session.bqclient.location,
@@ -1083,18 +1078,20 @@ def cleanup_cloud_functions(session, cloudfunctions_client, dataset_id_permanent

         # Go ahead and delete
         try:
-            delete_cloud_function(cloudfunctions_client, cloud_function.name)
+            tests.system.utils.delete_cloud_function(
+                cloudfunctions_client, cloud_function.name
+            )
             delete_count += 1
             if delete_count >= MAX_NUM_FUNCTIONS_TO_DELETE_PER_SESSION:
                 break
-        except NotFound:
+        except google.api_core.exceptions.NotFound:
            # This can happen when multiple pytest sessions are running in
            # parallel. Two or more sessions may discover the same cloud
            # function, but only one of them would be able to delete it
            # successfully, while the other instance will run into this
            # exception. Ignore this exception.
            pass
-        except ResourceExhausted:
+        except google.api_core.exceptions.ResourceExhausted:
             # This can happen if we are hitting GCP limits, e.g.
             # google.api_core.exceptions.ResourceExhausted: 429 Quota exceeded
             # for quota metric 'Per project mutation requests' and limit
diff --git a/tests/system/utils.py b/tests/system/utils.py
index 3b66aa9d3b..8ea49ed7e2 100644
--- a/tests/system/utils.py
+++ b/tests/system/utils.py
@@ -26,7 +26,7 @@
 import pyarrow as pa  # type: ignore
 import pytest

-from bigframes.functions.remote_function import get_remote_function_locations
+from bigframes.functions import remote_function


 def skip_legacy_pandas(test):
@@ -278,7 +278,7 @@ def get_cloud_functions(
         not name or not name_prefix
     ), "Either 'name' or 'name_prefix' can be passed but not both."

-    _, location = get_remote_function_locations(location)
+    _, location = remote_function.get_remote_function_locations(location)
     parent = f"projects/{project}/locations/{location}"
     request = functions_v2.ListFunctionsRequest(parent=parent)
     page_result = functions_client.list_functions(request=request)