Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support BYOSA in remote_function #407

Merged
merged 7 commits into from
Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions bigframes/functions/remote_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ def __init__(
bq_connection_client,
bq_connection_id,
cloud_resource_manager_client,
cloud_function_service_account,
):
self._gcp_project_id = gcp_project_id
self._cloud_function_region = cloud_function_region
Expand All @@ -140,6 +141,7 @@ def __init__(
self._bq_connection_manager = clients.BqConnectionManager(
bq_connection_client, cloud_resource_manager_client
)
self._cloud_function_service_account = cloud_function_service_account

def create_bq_remote_function(
self, input_args, input_types, output_type, endpoint, bq_function_name
Expand Down Expand Up @@ -384,6 +386,9 @@ def create_cloud_function(self, def_, cf_name, package_requirements=None):
function.service_config = functions_v2.ServiceConfig()
function.service_config.available_memory = "1024M"
function.service_config.timeout_seconds = 600
function.service_config.service_account_email = (
self._cloud_function_service_account
)
create_function_request.function = function

# Create the cloud function and wait for it to be ready to use
Expand Down Expand Up @@ -591,6 +596,7 @@ def remote_function(
reuse: bool = True,
name: Optional[str] = None,
packages: Optional[Sequence[str]] = None,
cloud_function_service_account: Optional[str] = None,
):
"""Decorator to turn a user defined function into a BigQuery remote function.

Expand Down Expand Up @@ -646,12 +652,12 @@ def remote_function(
Client to use for BigQuery operations. If this param is not provided
then bigquery client from the session would be used.
bigquery_connection_client (google.cloud.bigquery_connection_v1.ConnectionServiceClient, Optional):
Client to use for cloud functions operations. If this param is not
provided then functions client from the session would be used.
cloud_functions_client (google.cloud.functions_v2.FunctionServiceClient, Optional):
Client to use for BigQuery connection operations. If this param is
not provided then bigquery connection client from the session would
be used.
cloud_functions_client (google.cloud.functions_v2.FunctionServiceClient, Optional):
Client to use for cloud functions operations. If this param is not
provided then the functions client from the session would be used.
resource_manager_client (google.cloud.resourcemanager_v3.ProjectsClient, Optional):
Client to use for cloud resource management operations, e.g. for
getting and setting IAM roles on cloud resources. If this param is
Expand Down Expand Up @@ -686,7 +692,13 @@ def remote_function(
Explicit name of the external package dependencies. Each dependency
is added to the `requirements.txt` as is, and can be of the form
supported in https://pip.pypa.io/en/stable/reference/requirements-file-format/.

cloud_function_service_account (str, Optional):
Service account to use for the cloud functions. If not provided then
the default service account would be used. See
https://cloud.google.com/functions/docs/securing/function-identity
for more details. Please make sure the service account has the
necessary IAM permissions configured as described in
https://cloud.google.com/functions/docs/reference/iam/roles#additional-configuration.
"""
import bigframes.pandas as bpd

Expand Down Expand Up @@ -787,6 +799,7 @@ def wrapper(f):
bigquery_connection_client,
bq_connection_id,
resource_manager_client,
cloud_function_service_account,
)

rf_name, cf_name = remote_function_client.provision_bq_remote_function(
Expand Down
2 changes: 2 additions & 0 deletions bigframes/pandas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,7 @@ def remote_function(
reuse: bool = True,
name: Optional[str] = None,
packages: Optional[Sequence[str]] = None,
cloud_function_service_account: Optional[str] = None,
):
return global_session.with_default_session(
bigframes.session.Session.remote_function,
Expand All @@ -625,6 +626,7 @@ def remote_function(
reuse=reuse,
name=name,
packages=packages,
cloud_function_service_account=cloud_function_service_account,
)


Expand Down
9 changes: 9 additions & 0 deletions bigframes/session/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1337,6 +1337,7 @@ def remote_function(
reuse: bool = True,
name: Optional[str] = None,
packages: Optional[Sequence[str]] = None,
cloud_function_service_account: Optional[str] = None,
):
"""Decorator to turn a user defined function into a BigQuery remote function. Check out
the code samples at: https://cloud.google.com/bigquery/docs/remote-functions#bigquery-dataframes.
Expand Down Expand Up @@ -1410,6 +1411,13 @@ def remote_function(
Explicit name of the external package dependencies. Each dependency
is added to the `requirements.txt` as is, and can be of the form
supported in https://pip.pypa.io/en/stable/reference/requirements-file-format/.
cloud_function_service_account (str, Optional):
Service account to use for the cloud functions. If not provided
then the default service account would be used. See
https://cloud.google.com/functions/docs/securing/function-identity
for more details. Please make sure the service account has the
necessary IAM permissions configured as described in
https://cloud.google.com/functions/docs/reference/iam/roles#additional-configuration.
Returns:
callable: A remote function object pointing to the cloud assets created
in the background to support the remote execution. The cloud assets can be
Expand All @@ -1428,6 +1436,7 @@ def remote_function(
reuse=reuse,
name=name,
packages=packages,
cloud_function_service_account=cloud_function_service_account,
)

def read_gbq_function(
Expand Down
43 changes: 43 additions & 0 deletions tests/system/large/test_remote_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -1279,3 +1279,46 @@ def square(x):
cleanup_remote_function_assets(
session.bqclient, session.cloudfunctionsclient, square
)


@pytest.mark.skip("This requires additional project config.")
def test_remote_function_via_session_custom_sa(scalars_dfs):
# Set these values to run the test locally
# TODO(shobs): Automate and enable this test
PROJECT = ""
GCF_SERVICE_ACCOUNT = ""

rf_session = bigframes.Session(context=bigframes.BigQueryOptions(project=PROJECT))

try:

@rf_session.remote_function(
[int], int, reuse=False, cloud_function_service_account=GCF_SERVICE_ACCOUNT
)
def square_num(x):
if x is None:
return x
return x * x

scalars_df, scalars_pandas_df = scalars_dfs

bf_int64_col = scalars_df["int64_col"]
bf_result_col = bf_int64_col.apply(square_num)
bf_result = bf_int64_col.to_frame().assign(result=bf_result_col).to_pandas()

pd_int64_col = scalars_pandas_df["int64_col"]
pd_result_col = pd_int64_col.apply(lambda x: x if x is None else x * x)
pd_result = pd_int64_col.to_frame().assign(result=pd_result_col)

assert_pandas_df_equal(bf_result, pd_result, check_dtype=False)

# Assert that the GCF is created with the intended SA
gcf = rf_session.cloudfunctionsclient.get_function(
name=square_num.bigframes_cloud_function
)
assert gcf.service_config.service_account_email == GCF_SERVICE_ACCOUNT
finally:
# clean up the gcp assets created for the remote function
cleanup_remote_function_assets(
rf_session.bqclient, rf_session.cloudfunctionsclient, square_num
)