Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(google client): Enable to specify the service account to proxy function of scheduler in google client sdk #6013

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion sdk/python/kfp/v2/google/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,7 @@ def create_schedule_from_job_spec(
service_account: Optional[str] = None,
enable_caching: Optional[bool] = None,
app_engine_region: Optional[str] = None,
cloud_scheduler_service_account: Optional[str] = None,
) -> dict:
"""Creates schedule for compiled pipeline file.

Expand Down Expand Up @@ -399,6 +400,9 @@ def create_schedule_from_job_spec(
If set, the setting applies to all tasks in the pipeline -- overrides
the compile time settings.
app_engine_region: The region that cloud scheduler job is created in.
cloud_scheduler_service_account: The service account that Cloud Scheduler job and the proxy cloud function use.
this should have permission to call AI Platform API and the proxy function.
If not specified, the functions uses the App Engine default service account.

Returns:
Created Google Cloud Scheduler Job object dictionary.
Expand All @@ -417,4 +421,5 @@ def create_schedule_from_job_spec(
parameter_values=parameter_values,
pipeline_root=pipeline_root,
service_account=service_account,
app_engine_region=app_engine_region)
app_engine_region=app_engine_region,
cloud_scheduler_service_account=cloud_scheduler_service_account)
18 changes: 15 additions & 3 deletions sdk/python/kfp/v2/google/client/schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def create_from_pipeline_file(
pipeline_root: Optional[str] = None,
service_account: Optional[str] = None,
app_engine_region: Optional[str] = None,
cloud_scheduler_service_account: Optional[str] = None,
) -> dict:
"""Creates schedule for compiled pipeline file.

Expand Down Expand Up @@ -74,6 +75,9 @@ def create_from_pipeline_file(
specified during the compile time.
service_account: The service account that the pipeline workload runs as.
app_engine_region: The region that cloud scheduler job is created in.
cloud_scheduler_service_account: The service account that Cloud Scheduler job and the proxy cloud function use.
this should have permission to call AI Platform API and the proxy function.
If not specified, the functions uses the App Engine default service account.

Returns:
Created Google Cloud Scheduler Job object dictionary.
Expand All @@ -90,6 +94,7 @@ def create_from_pipeline_file(
pipeline_root=pipeline_root,
service_account=service_account,
app_engine_region=app_engine_region,
cloud_scheduler_service_account=cloud_scheduler_service_account,
)

def _create_from_pipeline_dict(
Expand All @@ -102,6 +107,7 @@ def _create_from_pipeline_dict(
pipeline_root: Optional[str] = None,
service_account: Optional[str] = None,
app_engine_region: Optional[str] = None,
cloud_scheduler_service_account: Optional[str] = None,
) -> dict:
"""Creates schedule for compiled pipeline dictionary."""

Expand All @@ -113,6 +119,7 @@ def _create_from_pipeline_dict(
proxy_function_url = _get_proxy_cloud_function_endpoint(
project_id=project_id,
region=region,
cloud_scheduler_service_account=cloud_scheduler_service_account,
)

if parameter_values or pipeline_root:
Expand Down Expand Up @@ -154,7 +161,7 @@ def _create_from_pipeline_dict(

project_location_path = 'projects/{}/locations/{}'.format(project_id, app_engine_region)
scheduled_job_full_name = '{}/jobs/{}'.format(project_location_path, schedule_name)
service_account_email = '{}@appspot.gserviceaccount.com'.format(project_id)
service_account_email = cloud_scheduler_service_account or '{}@appspot.gserviceaccount.com'.format(project_id)

scheduled_job = dict(
name=scheduled_job_full_name, # Optional. Only used for readable names.
Expand Down Expand Up @@ -264,6 +271,7 @@ def _create_or_get_cloud_function(
project_id: str,
region: str,
runtime: str = 'python37',
cloud_scheduler_service_account: Optional[str] = None,
):
"""Creates Google Cloud Function."""
functions_api = _get_cloud_functions_api()
Expand Down Expand Up @@ -317,7 +325,8 @@ def _create_or_get_cloud_function(
'httpsTrigger': {},
'runtime': runtime,
}

if cloud_scheduler_service_account is not None:
request_body["serviceAccountEmail"] = cloud_scheduler_service_account
try:
functions_api.create(
location=project_location_path,
Expand Down Expand Up @@ -368,6 +377,7 @@ def _enable_required_apis(project_id: str,):
def _get_proxy_cloud_function_endpoint(
project_id: str,
region: str = 'us-central1',
cloud_scheduler_service_account: Optional[str] = None,
):
"""Sets up a proxy Cloud Function."""
function_source_path = (
Expand All @@ -384,6 +394,8 @@ def _get_proxy_cloud_function_endpoint(
file_data={
'main.py': function_source,
'requirements.txt': 'google-api-python-client>=1.7.8,<2',
})
},
cloud_scheduler_service_account=cloud_scheduler_service_account,
)
endpoint_url = function_dict['httpsTrigger']['url']
return endpoint_url