From bdd5955a5e457688f44b192dcdecfd9d18b9c73e Mon Sep 17 00:00:00 2001 From: toshiya kawasaki Date: Mon, 16 Aug 2021 21:43:29 +0900 Subject: [PATCH] google sdk: Enable to specify the service account of proxy function and cloud scheduler job --- sdk/python/kfp/v2/google/client/client.py | 7 ++++++- sdk/python/kfp/v2/google/client/schedule.py | 18 +++++++++++++++--- 2 files changed, 21 insertions(+), 4 deletions(-) diff --git a/sdk/python/kfp/v2/google/client/client.py b/sdk/python/kfp/v2/google/client/client.py index ca452935293..fd19e8e7d76 100644 --- a/sdk/python/kfp/v2/google/client/client.py +++ b/sdk/python/kfp/v2/google/client/client.py @@ -368,6 +368,7 @@ def create_schedule_from_job_spec( service_account: Optional[str] = None, enable_caching: Optional[bool] = None, app_engine_region: Optional[str] = None, + service_account_for_schedule: Optional[str] = None, ) -> dict: """Creates schedule for compiled pipeline file. @@ -399,6 +400,9 @@ def create_schedule_from_job_spec( If set, the setting applies to all tasks in the pipeline -- overrides the compile time settings. app_engine_region: The region that cloud scheduler job is created in. + service_account_for_schedule: The service account that Cloud Scheduler job and the proxy cloud function use. + this should have permission to call AI Platform API and the proxy function. + If not specified, the functions uses the App Engine default service account. Returns: Created Google Cloud Scheduler Job object dictionary. @@ -417,4 +421,5 @@ def create_schedule_from_job_spec( parameter_values=parameter_values, pipeline_root=pipeline_root, service_account=service_account, - app_engine_region=app_engine_region) + app_engine_region=app_engine_region, + service_account_for_schedule=service_account_for_schedule) diff --git a/sdk/python/kfp/v2/google/client/schedule.py b/sdk/python/kfp/v2/google/client/schedule.py index 84f9ca6ac2a..76faf524f2b 100644 --- a/sdk/python/kfp/v2/google/client/schedule.py +++ b/sdk/python/kfp/v2/google/client/schedule.py @@ -47,6 +47,7 @@ def create_from_pipeline_file( pipeline_root: Optional[str] = None, service_account: Optional[str] = None, app_engine_region: Optional[str] = None, + service_account_for_schedule: Optional[str] = None, ) -> dict: """Creates schedule for compiled pipeline file. @@ -74,6 +75,9 @@ def create_from_pipeline_file( specified during the compile time. service_account: The service account that the pipeline workload runs as. app_engine_region: The region that cloud scheduler job is created in. + service_account_for_schedule: The service account that Cloud Scheduler job and the proxy cloud function use. + this should have permission to call AI Platform API and the proxy function. + If not specified, the functions uses the App Engine default service account. Returns: Created Google Cloud Scheduler Job object dictionary. @@ -90,6 +94,7 @@ def create_from_pipeline_file( pipeline_root=pipeline_root, service_account=service_account, app_engine_region=app_engine_region, + service_account_for_schedule=service_account_for_schedule, ) def _create_from_pipeline_dict( @@ -102,6 +107,7 @@ def _create_from_pipeline_dict( pipeline_root: Optional[str] = None, service_account: Optional[str] = None, app_engine_region: Optional[str] = None, + service_account_for_schedule: Optional[str] = None, ) -> dict: """Creates schedule for compiled pipeline dictionary.""" @@ -113,6 +119,7 @@ def _create_from_pipeline_dict( proxy_function_url = _get_proxy_cloud_function_endpoint( project_id=project_id, region=region, + service_account_for_schedule=service_account_for_schedule, ) if parameter_values or pipeline_root: @@ -154,7 +161,7 @@ def _create_from_pipeline_dict( project_location_path = 'projects/{}/locations/{}'.format(project_id, app_engine_region) scheduled_job_full_name = '{}/jobs/{}'.format(project_location_path, schedule_name) - service_account_email = '{}@appspot.gserviceaccount.com'.format(project_id) + service_account_email = service_account_for_schedule or '{}@appspot.gserviceaccount.com'.format(project_id) scheduled_job = dict( name=scheduled_job_full_name, # Optional. Only used for readable names. @@ -264,6 +271,7 @@ def _create_or_get_cloud_function( project_id: str, region: str, runtime: str = 'python37', + service_account_for_schedule: Optional[str] = None, ): """Creates Google Cloud Function.""" functions_api = _get_cloud_functions_api() @@ -317,7 +325,8 @@ def _create_or_get_cloud_function( 'httpsTrigger': {}, 'runtime': runtime, } - + if service_account_for_schedule is not None: + request_body["serviceAccountEmail"] = service_account_for_schedule try: functions_api.create( location=project_location_path, @@ -368,6 +377,7 @@ def _enable_required_apis(project_id: str,): def _get_proxy_cloud_function_endpoint( project_id: str, region: str = 'us-central1', + service_account_for_schedule: Optional[str] = None, ): """Sets up a proxy Cloud Function.""" function_source_path = ( @@ -384,6 +394,8 @@ def _get_proxy_cloud_function_endpoint( file_data={ 'main.py': function_source, 'requirements.txt': 'google-api-python-client>=1.7.8,<2', - }) + }, + service_account_for_schedule=service_account_for_schedule, + ) endpoint_url = function_dict['httpsTrigger']['url'] return endpoint_url