Skip to content

Commit

Permalink
google sdk: Enable to specify the service account of proxy function a…
Browse files Browse the repository at this point in the history
…nd cloud scheduler job
  • Loading branch information
toshitanian committed Aug 16, 2021
1 parent 78200c8 commit bdd5955
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 4 deletions.
7 changes: 6 additions & 1 deletion sdk/python/kfp/v2/google/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,7 @@ def create_schedule_from_job_spec(
service_account: Optional[str] = None,
enable_caching: Optional[bool] = None,
app_engine_region: Optional[str] = None,
service_account_for_schedule: Optional[str] = None,
) -> dict:
"""Creates schedule for compiled pipeline file.
Expand Down Expand Up @@ -399,6 +400,9 @@ def create_schedule_from_job_spec(
If set, the setting applies to all tasks in the pipeline -- overrides
the compile time settings.
app_engine_region: The region that cloud scheduler job is created in.
service_account_for_schedule: The service account that Cloud Scheduler job and the proxy cloud function use.
this should have permission to call AI Platform API and the proxy function.
If not specified, the functions uses the App Engine default service account.
Returns:
Created Google Cloud Scheduler Job object dictionary.
Expand All @@ -417,4 +421,5 @@ def create_schedule_from_job_spec(
parameter_values=parameter_values,
pipeline_root=pipeline_root,
service_account=service_account,
app_engine_region=app_engine_region)
app_engine_region=app_engine_region,
service_account_for_schedule=service_account_for_schedule)
18 changes: 15 additions & 3 deletions sdk/python/kfp/v2/google/client/schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def create_from_pipeline_file(
pipeline_root: Optional[str] = None,
service_account: Optional[str] = None,
app_engine_region: Optional[str] = None,
service_account_for_schedule: Optional[str] = None,
) -> dict:
"""Creates schedule for compiled pipeline file.
Expand Down Expand Up @@ -74,6 +75,9 @@ def create_from_pipeline_file(
specified during the compile time.
service_account: The service account that the pipeline workload runs as.
app_engine_region: The region that cloud scheduler job is created in.
service_account_for_schedule: The service account that Cloud Scheduler job and the proxy cloud function use.
this should have permission to call AI Platform API and the proxy function.
If not specified, the functions uses the App Engine default service account.
Returns:
Created Google Cloud Scheduler Job object dictionary.
Expand All @@ -90,6 +94,7 @@ def create_from_pipeline_file(
pipeline_root=pipeline_root,
service_account=service_account,
app_engine_region=app_engine_region,
service_account_for_schedule=service_account_for_schedule,
)

def _create_from_pipeline_dict(
Expand All @@ -102,6 +107,7 @@ def _create_from_pipeline_dict(
pipeline_root: Optional[str] = None,
service_account: Optional[str] = None,
app_engine_region: Optional[str] = None,
service_account_for_schedule: Optional[str] = None,
) -> dict:
"""Creates schedule for compiled pipeline dictionary."""

Expand All @@ -113,6 +119,7 @@ def _create_from_pipeline_dict(
proxy_function_url = _get_proxy_cloud_function_endpoint(
project_id=project_id,
region=region,
service_account_for_schedule=service_account_for_schedule,
)

if parameter_values or pipeline_root:
Expand Down Expand Up @@ -154,7 +161,7 @@ def _create_from_pipeline_dict(

project_location_path = 'projects/{}/locations/{}'.format(project_id, app_engine_region)
scheduled_job_full_name = '{}/jobs/{}'.format(project_location_path, schedule_name)
service_account_email = '{}@appspot.gserviceaccount.com'.format(project_id)
service_account_email = service_account_for_schedule or '{}@appspot.gserviceaccount.com'.format(project_id)

scheduled_job = dict(
name=scheduled_job_full_name, # Optional. Only used for readable names.
Expand Down Expand Up @@ -264,6 +271,7 @@ def _create_or_get_cloud_function(
project_id: str,
region: str,
runtime: str = 'python37',
service_account_for_schedule: Optional[str] = None,
):
"""Creates Google Cloud Function."""
functions_api = _get_cloud_functions_api()
Expand Down Expand Up @@ -317,7 +325,8 @@ def _create_or_get_cloud_function(
'httpsTrigger': {},
'runtime': runtime,
}

if service_account_for_schedule is not None:
request_body["serviceAccountEmail"] = service_account_for_schedule
try:
functions_api.create(
location=project_location_path,
Expand Down Expand Up @@ -368,6 +377,7 @@ def _enable_required_apis(project_id: str,):
def _get_proxy_cloud_function_endpoint(
project_id: str,
region: str = 'us-central1',
service_account_for_schedule: Optional[str] = None,
):
"""Sets up a proxy Cloud Function."""
function_source_path = (
Expand All @@ -384,6 +394,8 @@ def _get_proxy_cloud_function_endpoint(
file_data={
'main.py': function_source,
'requirements.txt': 'google-api-python-client>=1.7.8,<2',
})
},
service_account_for_schedule=service_account_for_schedule,
)
endpoint_url = function_dict['httpsTrigger']['url']
return endpoint_url

0 comments on commit bdd5955

Please sign in to comment.