Skip to content

Commit

Permalink
feat(google client): Enable to specify the service account to proxy f…
Browse files Browse the repository at this point in the history
…unction of scheduler in google client sdk (#6013)

* google sdk: Enable specifying the service account of the proxy function and the Cloud Scheduler job

* rename service_account_for_schedule to cloud_scheduler_service_account
  • Loading branch information
toshitanian authored Aug 18, 2021
1 parent 72c54c8 commit d40985c
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 4 deletions.
7 changes: 6 additions & 1 deletion sdk/python/kfp/v2/google/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,7 @@ def create_schedule_from_job_spec(
service_account: Optional[str] = None,
enable_caching: Optional[bool] = None,
app_engine_region: Optional[str] = None,
cloud_scheduler_service_account: Optional[str] = None,
) -> dict:
"""Creates schedule for compiled pipeline file.
Expand Down Expand Up @@ -399,6 +400,9 @@ def create_schedule_from_job_spec(
If set, the setting applies to all tasks in the pipeline -- overrides
the compile time settings.
app_engine_region: The region in which the Cloud Scheduler job is created.
cloud_scheduler_service_account: The service account that the Cloud Scheduler job and the proxy cloud function use.
This account should have permission to call the AI Platform API and the proxy function.
If not specified, the App Engine default service account is used.
Returns:
Created Google Cloud Scheduler Job object dictionary.
Expand All @@ -417,4 +421,5 @@ def create_schedule_from_job_spec(
parameter_values=parameter_values,
pipeline_root=pipeline_root,
service_account=service_account,
app_engine_region=app_engine_region)
app_engine_region=app_engine_region,
cloud_scheduler_service_account=cloud_scheduler_service_account)
18 changes: 15 additions & 3 deletions sdk/python/kfp/v2/google/client/schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def create_from_pipeline_file(
pipeline_root: Optional[str] = None,
service_account: Optional[str] = None,
app_engine_region: Optional[str] = None,
cloud_scheduler_service_account: Optional[str] = None,
) -> dict:
"""Creates schedule for compiled pipeline file.
Expand Down Expand Up @@ -74,6 +75,9 @@ def create_from_pipeline_file(
specified during the compile time.
service_account: The service account that the pipeline workload runs as.
app_engine_region: The region in which the Cloud Scheduler job is created.
cloud_scheduler_service_account: The service account that the Cloud Scheduler job and the proxy cloud function use.
This account should have permission to call the AI Platform API and the proxy function.
If not specified, the App Engine default service account is used.
Returns:
Created Google Cloud Scheduler Job object dictionary.
Expand All @@ -90,6 +94,7 @@ def create_from_pipeline_file(
pipeline_root=pipeline_root,
service_account=service_account,
app_engine_region=app_engine_region,
cloud_scheduler_service_account=cloud_scheduler_service_account,
)

def _create_from_pipeline_dict(
Expand All @@ -102,6 +107,7 @@ def _create_from_pipeline_dict(
pipeline_root: Optional[str] = None,
service_account: Optional[str] = None,
app_engine_region: Optional[str] = None,
cloud_scheduler_service_account: Optional[str] = None,
) -> dict:
"""Creates schedule for compiled pipeline dictionary."""

Expand All @@ -113,6 +119,7 @@ def _create_from_pipeline_dict(
proxy_function_url = _get_proxy_cloud_function_endpoint(
project_id=project_id,
region=region,
cloud_scheduler_service_account=cloud_scheduler_service_account,
)

if parameter_values or pipeline_root:
Expand Down Expand Up @@ -154,7 +161,7 @@ def _create_from_pipeline_dict(

project_location_path = 'projects/{}/locations/{}'.format(project_id, app_engine_region)
scheduled_job_full_name = '{}/jobs/{}'.format(project_location_path, schedule_name)
service_account_email = '{}@appspot.gserviceaccount.com'.format(project_id)
service_account_email = cloud_scheduler_service_account or '{}@appspot.gserviceaccount.com'.format(project_id)

scheduled_job = dict(
name=scheduled_job_full_name, # Optional. Only used for readable names.
Expand Down Expand Up @@ -264,6 +271,7 @@ def _create_or_get_cloud_function(
project_id: str,
region: str,
runtime: str = 'python37',
cloud_scheduler_service_account: Optional[str] = None,
):
"""Creates Google Cloud Function."""
functions_api = _get_cloud_functions_api()
Expand Down Expand Up @@ -317,7 +325,8 @@ def _create_or_get_cloud_function(
'httpsTrigger': {},
'runtime': runtime,
}

if cloud_scheduler_service_account is not None:
request_body["serviceAccountEmail"] = cloud_scheduler_service_account
try:
functions_api.create(
location=project_location_path,
Expand Down Expand Up @@ -368,6 +377,7 @@ def _enable_required_apis(project_id: str,):
def _get_proxy_cloud_function_endpoint(
project_id: str,
region: str = 'us-central1',
cloud_scheduler_service_account: Optional[str] = None,
):
"""Sets up a proxy Cloud Function."""
function_source_path = (
Expand All @@ -384,6 +394,8 @@ def _get_proxy_cloud_function_endpoint(
file_data={
'main.py': function_source,
'requirements.txt': 'google-api-python-client>=1.7.8,<2',
})
},
cloud_scheduler_service_account=cloud_scheduler_service_account,
)
endpoint_url = function_dict['httpsTrigger']['url']
return endpoint_url

0 comments on commit d40985c

Please sign in to comment.