From d92ced2adaa30a0405ace9ca6cd70a8e217f13d0 Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Tue, 5 Mar 2024 20:56:16 +0000 Subject: [PATCH] feat: Support BYOSA in `remote_function` (#407) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly: - [ ] Make sure to open an issue as a [bug/issue](https://togithub.com/googleapis/python-bigquery-dataframes/issues/new/choose) before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea - [ ] Ensure the tests and linter pass - [ ] Code coverage does not decrease (if any source code was changed) - [ ] Appropriate docs were updated (if necessary) Fixes internal issue 328138730 🦕 --- bigframes/functions/remote_function.py | 21 +++++++++-- bigframes/pandas/__init__.py | 2 + bigframes/session/__init__.py | 9 +++++ tests/system/large/test_remote_function.py | 43 ++++++++++++++++++++++ 4 files changed, 71 insertions(+), 4 deletions(-) diff --git a/bigframes/functions/remote_function.py b/bigframes/functions/remote_function.py index c31105a021..5bc8291f59 100644 --- a/bigframes/functions/remote_function.py +++ b/bigframes/functions/remote_function.py @@ -129,6 +129,7 @@ def __init__( bq_connection_client, bq_connection_id, cloud_resource_manager_client, + cloud_function_service_account, ): self._gcp_project_id = gcp_project_id self._cloud_function_region = cloud_function_region @@ -140,6 +141,7 @@ def __init__( self._bq_connection_manager = clients.BqConnectionManager( bq_connection_client, cloud_resource_manager_client ) + self._cloud_function_service_account = cloud_function_service_account def create_bq_remote_function( self, input_args, input_types, output_type, endpoint, bq_function_name @@ -384,6 +386,9 @@ def create_cloud_function(self, def_, cf_name, package_requirements=None): function.service_config = functions_v2.ServiceConfig() function.service_config.available_memory = "1024M" function.service_config.timeout_seconds = 600 + function.service_config.service_account_email = ( + self._cloud_function_service_account + ) create_function_request.function = function # Create the cloud function and wait for it to be ready to use @@ -591,6 +596,7 @@ def remote_function( reuse: bool = True, name: Optional[str] = None, packages: Optional[Sequence[str]] = None, + cloud_function_service_account: Optional[str] = None, ): """Decorator to turn a user defined function into a BigQuery remote function. @@ -646,12 +652,12 @@ def remote_function( Client to use for BigQuery operations. If this param is not provided then bigquery client from the session would be used. bigquery_connection_client (google.cloud.bigquery_connection_v1.ConnectionServiceClient, Optional): - Client to use for cloud functions operations. If this param is not - provided then functions client from the session would be used. - cloud_functions_client (google.cloud.functions_v2.FunctionServiceClient, Optional): Client to use for BigQuery connection operations. If this param is not provided then bigquery connection client from the session would be used. + cloud_functions_client (google.cloud.functions_v2.FunctionServiceClient, Optional): + Client to use for cloud functions operations. If this param is not + provided then the functions client from the session would be used. resource_manager_client (google.cloud.resourcemanager_v3.ProjectsClient, Optional): Client to use for cloud resource management operations, e.g. for getting and setting IAM roles on cloud resources. If this param is @@ -686,7 +692,13 @@ def remote_function( Explicit name of the external package dependencies. Each dependency is added to the `requirements.txt` as is, and can be of the form supported in https://pip.pypa.io/en/stable/reference/requirements-file-format/. - + cloud_function_service_account (str, Optional): + Service account to use for the cloud functions. If not provided then + the default service account would be used. See + https://cloud.google.com/functions/docs/securing/function-identity + for more details. Please make sure the service account has the + necessary IAM permissions configured as described in + https://cloud.google.com/functions/docs/reference/iam/roles#additional-configuration. """ import bigframes.pandas as bpd @@ -787,6 +799,7 @@ def wrapper(f): bigquery_connection_client, bq_connection_id, resource_manager_client, + cloud_function_service_account, ) rf_name, cf_name = remote_function_client.provision_bq_remote_function( diff --git a/bigframes/pandas/__init__.py b/bigframes/pandas/__init__.py index 110978a7f1..3c9bb003cc 100644 --- a/bigframes/pandas/__init__.py +++ b/bigframes/pandas/__init__.py @@ -615,6 +615,7 @@ def remote_function( reuse: bool = True, name: Optional[str] = None, packages: Optional[Sequence[str]] = None, + cloud_function_service_account: Optional[str] = None, ): return global_session.with_default_session( bigframes.session.Session.remote_function, @@ -625,6 +626,7 @@ def remote_function( reuse=reuse, name=name, packages=packages, + cloud_function_service_account=cloud_function_service_account, ) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 4bd205afea..ef4a349244 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -1337,6 +1337,7 @@ def remote_function( reuse: bool = True, name: Optional[str] = None, packages: Optional[Sequence[str]] = None, + cloud_function_service_account: Optional[str] = None, ): """Decorator to turn a user defined function into a BigQuery remote function. Check out the code samples at: https://cloud.google.com/bigquery/docs/remote-functions#bigquery-dataframes. @@ -1410,6 +1411,13 @@ def remote_function( Explicit name of the external package dependencies. Each dependency is added to the `requirements.txt` as is, and can be of the form supported in https://pip.pypa.io/en/stable/reference/requirements-file-format/. + cloud_function_service_account (str, Optional): + Service account to use for the cloud functions. If not provided + then the default service account would be used. See + https://cloud.google.com/functions/docs/securing/function-identity + for more details. Please make sure the service account has the + necessary IAM permissions configured as described in + https://cloud.google.com/functions/docs/reference/iam/roles#additional-configuration. Returns: callable: A remote function object pointing to the cloud assets created in the background to support the remote execution. The cloud assets can be @@ -1428,6 +1436,7 @@ def remote_function( reuse=reuse, name=name, packages=packages, + cloud_function_service_account=cloud_function_service_account, ) def read_gbq_function( diff --git a/tests/system/large/test_remote_function.py b/tests/system/large/test_remote_function.py index b33298ae01..77aa3c7603 100644 --- a/tests/system/large/test_remote_function.py +++ b/tests/system/large/test_remote_function.py @@ -1279,3 +1279,46 @@ def square(x): cleanup_remote_function_assets( session.bqclient, session.cloudfunctionsclient, square ) + + +@pytest.mark.skip("This requires additional project config.") +def test_remote_function_via_session_custom_sa(scalars_dfs): + # Set these values to run the test locally + # TODO(shobs): Automate and enable this test + PROJECT = "" + GCF_SERVICE_ACCOUNT = "" + + rf_session = bigframes.Session(context=bigframes.BigQueryOptions(project=PROJECT)) + + try: + + @rf_session.remote_function( + [int], int, reuse=False, cloud_function_service_account=GCF_SERVICE_ACCOUNT + ) + def square_num(x): + if x is None: + return x + return x * x + + scalars_df, scalars_pandas_df = scalars_dfs + + bf_int64_col = scalars_df["int64_col"] + bf_result_col = bf_int64_col.apply(square_num) + bf_result = bf_int64_col.to_frame().assign(result=bf_result_col).to_pandas() + + pd_int64_col = scalars_pandas_df["int64_col"] + pd_result_col = pd_int64_col.apply(lambda x: x if x is None else x * x) + pd_result = pd_int64_col.to_frame().assign(result=pd_result_col) + + assert_pandas_df_equal(bf_result, pd_result, check_dtype=False) + + # Assert that the GCF is created with the intended SA + gcf = rf_session.cloudfunctionsclient.get_function( + name=square_num.bigframes_cloud_function + ) + assert gcf.service_config.service_account_email == GCF_SERVICE_ACCOUNT + finally: + # clean up the gcp assets created for the remote function + cleanup_remote_function_assets( + rf_session.bqclient, rf_session.cloudfunctionsclient, square_num + )