From 9ad12eac1b38ae1858a6daa0d03ef6858382e5d2 Mon Sep 17 00:00:00 2001 From: Yaqi Ji Date: Tue, 9 Nov 2021 11:51:09 -0800 Subject: [PATCH 1/7] Delete sdk/python/kfp/v2/google directory --- sdk/python/kfp/v2/google/__init__.py | 13 - sdk/python/kfp/v2/google/client/__init__.py | 15 - .../_cloud_function_templated_http_request.py | 105 -- ...ud_function_templated_http_request_test.py | 54 - sdk/python/kfp/v2/google/client/client.py | 425 ------ .../kfp/v2/google/client/client_test.py | 228 --- .../kfp/v2/google/client/client_utils.py | 70 - .../kfp/v2/google/client/client_utils_test.py | 62 - ...aiplatform_public_google_rest_v1beta1.json | 1235 ----------------- .../google/client/runtime_config_builder.py | 159 --- .../client/runtime_config_builder_test.py | 176 --- sdk/python/kfp/v2/google/client/schedule.py | 412 ------ .../kfp/v2/google/client/schedule_test.py | 141 -- .../v2/google/client/testdata/pipeline1.json | 71 - .../testdata/pipeline1_request_body.json | 73 - .../pipeline1_request_body_final.json | 71 - .../google/client/testdata/pipeline_job.json | 300 ---- .../kfp/v2/google/experimental/__init__.py | 15 - .../kfp/v2/google/experimental/custom_job.py | 168 --- .../v2/google/experimental/custom_job_test.py | 139 -- 20 files changed, 3932 deletions(-) delete mode 100644 sdk/python/kfp/v2/google/__init__.py delete mode 100644 sdk/python/kfp/v2/google/client/__init__.py delete mode 100644 sdk/python/kfp/v2/google/client/_cloud_function_templated_http_request.py delete mode 100644 sdk/python/kfp/v2/google/client/_cloud_function_templated_http_request_test.py delete mode 100644 sdk/python/kfp/v2/google/client/client.py delete mode 100644 sdk/python/kfp/v2/google/client/client_test.py delete mode 100644 sdk/python/kfp/v2/google/client/client_utils.py delete mode 100644 sdk/python/kfp/v2/google/client/client_utils_test.py delete mode 100644 sdk/python/kfp/v2/google/client/discovery/aiplatform_public_google_rest_v1beta1.json delete mode 100644 sdk/python/kfp/v2/google/client/runtime_config_builder.py delete mode 100644 sdk/python/kfp/v2/google/client/runtime_config_builder_test.py delete mode 100644 sdk/python/kfp/v2/google/client/schedule.py delete mode 100644 sdk/python/kfp/v2/google/client/schedule_test.py delete mode 100644 sdk/python/kfp/v2/google/client/testdata/pipeline1.json delete mode 100644 sdk/python/kfp/v2/google/client/testdata/pipeline1_request_body.json delete mode 100644 sdk/python/kfp/v2/google/client/testdata/pipeline1_request_body_final.json delete mode 100644 sdk/python/kfp/v2/google/client/testdata/pipeline_job.json delete mode 100644 sdk/python/kfp/v2/google/experimental/__init__.py delete mode 100644 sdk/python/kfp/v2/google/experimental/custom_job.py delete mode 100644 sdk/python/kfp/v2/google/experimental/custom_job_test.py diff --git a/sdk/python/kfp/v2/google/__init__.py b/sdk/python/kfp/v2/google/__init__.py deleted file mode 100644 index e7878caf33e..00000000000 --- a/sdk/python/kfp/v2/google/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -# Copyright 2021 The Kubeflow Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-# See the License for the specific language governing permissions and -# limitations under the License. \ No newline at end of file diff --git a/sdk/python/kfp/v2/google/client/__init__.py b/sdk/python/kfp/v2/google/client/__init__.py deleted file mode 100644 index bc7b30aecd3..00000000000 --- a/sdk/python/kfp/v2/google/client/__init__.py +++ /dev/null @@ -1,15 +0,0 @@ -# Copyright 2021 The Kubeflow Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from kfp.v2.google.client.client import AIPlatformClient diff --git a/sdk/python/kfp/v2/google/client/_cloud_function_templated_http_request.py b/sdk/python/kfp/v2/google/client/_cloud_function_templated_http_request.py deleted file mode 100644 index faf994b8f57..00000000000 --- a/sdk/python/kfp/v2/google/client/_cloud_function_templated_http_request.py +++ /dev/null @@ -1,105 +0,0 @@ -# Copyright 2021 The Kubeflow Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Cloud function that proxies HTTP request after replacing placeholders.""" - -import datetime -import json -import logging -import re -import traceback -from typing import Optional, Tuple - -from google import auth -from google.auth.transport import urllib3 - - -def _process_request(request): - """Wrapper of _process_request_impl with error handling.""" - try: - return _process_request_impl(request) - except Exception as err: - traceback.print_exc() - raise err - - -def _preprocess_request_body( - request_body: bytes, - time: datetime.datetime, -) -> Tuple[str, str, Optional[bytes]]: - """Augments the request body before sending it to CAIP Pipelines API. - - Replaces placeholders, generates unique name, removes `_discovery_url`. - Args: - request_body: Request body - time: The scheduled invocation time. - - Returns: - Tuple of (url, method, resolved_request_body). - """ - request_str = request_body.decode('utf-8') - - # Replacing placeholders like: {{$.scheduledTime.strftime('%Y-%m-%d')}} - request_str = re.sub(r"{{\$.scheduledTime.strftime\('([^']*)'\)}}", - lambda m: time.strftime(m.group(1)), request_str) - - request_json = json.loads(request_str) - - url = str(request_json['_url']) - del request_json['_url'] - - method = str(request_json['_method']) - del request_json['_method'] - - resolved_request_body = None - if request_json: - resolved_request_str = json.dumps(request_json) - resolved_request_body = resolved_request_str.encode('utf-8') - return (url, method, resolved_request_body) - - -def _process_request_impl(request): - """Processes the incoming HTTP request. 
- - Args: - request (flask.Request): HTTP request object. - - Returns: - The response text or any set of values that can be turned into a Response - object using `make_response - `. - """ - time = datetime.datetime.now() - - logging.debug('request.headers=%s', request.headers) - logging.debug('Original request body: %s', request.data) - (url, method, resolved_request_body) = _preprocess_request_body( - request_body=request.data, - time=time, - ) - logging.debug('url=%s', url) - logging.debug('method=%s', method) - logging.debug('Resolved request body: %s', resolved_request_body) - - credentials, _ = auth.default() - authorized_http = urllib3.AuthorizedHttp(credentials=credentials) - response = authorized_http.request( - url=url, - method=method, - body=resolved_request_body, - ) - data_str = response.data.decode('utf-8') - if response.status != 200: - print(f'response.status={response.status}') - print(f'response.data={data_str}') - return (response.data, response.status) diff --git a/sdk/python/kfp/v2/google/client/_cloud_function_templated_http_request_test.py b/sdk/python/kfp/v2/google/client/_cloud_function_templated_http_request_test.py deleted file mode 100644 index 92ef16c4409..00000000000 --- a/sdk/python/kfp/v2/google/client/_cloud_function_templated_http_request_test.py +++ /dev/null @@ -1,54 +0,0 @@ -# Copyright 2021 The Kubeflow Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Tests for kfp.v2.google.client._cloud_function_create_pipeline_job.""" - -import datetime -import json -import os -import unittest - -from kfp.v2.google.client import _cloud_function_templated_http_request - - -class CloudFunctionCreatePipelineJobTest(unittest.TestCase): - - def test_preprocess(self): - test_data_path = os.path.join( - os.path.dirname(__file__), - 'testdata', - ) - function_request_path = os.path.join( - test_data_path, - 'pipeline1_request_body.json', - ) - expected_pipeline_request_path = os.path.join( - test_data_path, - 'pipeline1_request_body_final.json', - ) - - with open(function_request_path, 'rb') as f: - function_request = f.read() - - with open(expected_pipeline_request_path, 'r') as f: - expected_pipeline_request = json.load(f) - - (_, _, resolved_request_body - ) = _cloud_function_templated_http_request._preprocess_request_body( - function_request, time=datetime.datetime(2020, 8, 1, 12, 34)) - actual_pipeline_request = json.loads(resolved_request_body) - self.assertEqual(actual_pipeline_request, expected_pipeline_request) - - -if __name__ == '__main__': - unittest.main() diff --git a/sdk/python/kfp/v2/google/client/client.py b/sdk/python/kfp/v2/google/client/client.py deleted file mode 100644 index fa96cdcd3c3..00000000000 --- a/sdk/python/kfp/v2/google/client/client.py +++ /dev/null @@ -1,425 +0,0 @@ -# Copyright 2021 The Kubeflow Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""Module for AIPlatformPipelines API client."""
-
-import datetime
-import json
-import pkg_resources
-import re
-import subprocess
-import warnings
-from typing import Any, Dict, List, Mapping, Optional
-
-from absl import logging
-from google import auth
-from google.auth import exceptions
-from google.oauth2 import credentials
-from google.protobuf import json_format
-from googleapiclient import discovery
-
-from kfp.v2.google.client import client_utils
-from kfp.v2.google.client import runtime_config_builder
-from kfp.v2.google.client.schedule import _create_from_pipeline_dict
-
-# AIPlatformPipelines API endpoint.
-DEFAULT_ENDPOINT_FORMAT = '{region}-aiplatform.googleapis.com'
-
-# AIPlatformPipelines API version.
-DEFAULT_API_VERSION = 'v1beta1'
-
-# If the application default credential does not exist, we fall back to
-# whatever was previously provided to `gcloud auth`. One can run
-# `gcloud auth login` to provide identity for this token.
-_AUTH_ARGS = ('gcloud', 'auth', 'print-access-token')
-
-# AIPlatformPipelines service API parent pattern.
-_PARENT_PATTERN = 'projects/{}/locations/{}'
-
-# AIPlatformPipelines service API job name relative name prefix pattern.
-_JOB_NAME_PATTERN = '{parent}/pipelineJobs/{job_id}'
-
-# Pattern for valid names used as a uCAIP resource name.
-_VALID_NAME_PATTERN = re.compile('^[a-z][-a-z0-9]{0,127}$')
-
-# Cloud Console UI link of a pipeline run.
-UI_PIPELINE_JOB_LINK_FORMAT = (
-    'https://console.cloud.google.com/vertex-ai/locations/{region}/pipelines/'
-    'runs/{job_id}?project={project_id}')
-
-# Display UI link HTML snippet.
-UI_LINK_HTML_FORMAT = (
-    '<a href="{}" target="_blank">See the Pipeline job here.</a>')
-
-
-class _AccessTokenCredentials(credentials.Credentials):
-    """Credential class to provide token-based authentication."""
-
-    def __init__(self, token):
-        super().__init__(token=token)
-
-    def refresh(self, request):
-        # Overrides the refresh method in the base class because by default
-        # the token is not refreshable.
-        pass
-
-
-def _get_gcp_credential() -> Optional[credentials.Credentials]:
-    """Returns the GCP OAuth2 credential.
-
-    Returns:
-        The credential. By default the function returns the current
-        Application Default Credentials, which are located at
-        $GOOGLE_APPLICATION_CREDENTIALS. If not set, this function returns the
-        current login credential, whose token is created by running
-        'gcloud auth login'. For more information, see
-        https://cloud.google.com/sdk/gcloud/reference/auth/application-default/print-access-token
-    """
-    result = None
-    try:
-        result, _ = auth.default()
-    except exceptions.DefaultCredentialsError as e:
-        logging.warning(
-            'Failed to get GCP access token for application '
-            'default credential: (%s). Using end user credential.', e)
-        try:
-            token = subprocess.check_output(_AUTH_ARGS).rstrip().decode('utf-8')
-            result = _AccessTokenCredentials(token=token)
-        except subprocess.CalledProcessError as e:
-            # _AccessTokenCredentials won't throw CalledProcessError, so this
-            # exception implies that the subprocess call failed.
-            logging.warning('Failed to get GCP access token: %s', e)
-    return result
-
-
-def _get_current_time() -> datetime.datetime:
-    """Gets the current timestamp."""
-    return datetime.datetime.now()
-
-
-def _is_ipython() -> bool:
-    """Returns whether we are running in a notebook."""
-    try:
-        import IPython  # pylint: disable=g-import-not-at-top
-        ipy = IPython.get_ipython()
-        if ipy is None:
-            return False
-    except ImportError:
-        return False
-
-    return True
-
-
-def _extract_job_id(job_name: str) -> Optional[str]:
-    """Extracts the job id from a job name.
-
-    Args:
-        job_name: The full job name.
-
-    Returns:
-        The job id, or None if no match is found.
-    """
-    p = re.compile(
-        'projects/(?P<project>.*)/locations/(?P<location>.*)/pipelineJobs/(?P<job_id>.*)'
-    )
-    result = p.search(job_name)
-    return result.group('job_id') if result else None
-
-
-def _set_enable_caching_value(pipeline_spec: Dict[str, Any],
-                              enable_caching: bool) -> None:
-    """Sets pipeline tasks' caching options.
-
-    Args:
-        pipeline_spec: The dictionary of the pipeline spec.
-        enable_caching: Whether to enable caching.
-    """
-    for component in [pipeline_spec['root']] + list(
-            pipeline_spec['components'].values()):
-        if 'dag' in component:
-            for task in component['dag']['tasks'].values():
-                task['cachingOptions'] = {'enableCache': enable_caching}
-
-
-class AIPlatformClient(object):
-    """AIPlatformPipelines Unified API Client."""
-
-    def __init__(
-            self,
-            project_id: str,
-            region: str,
-    ):
-        """Constructs an AIPlatformPipelines API client.
-
-        Args:
-            project_id: GCP project ID.
-            region: GCP project region.
-        """
-        warnings.warn(
-            'AIPlatformClient will be deprecated in v2.0.0. Please use PipelineJob'
-            ' https://googleapis.dev/python/aiplatform/latest/_modules/google/cloud/aiplatform/pipeline_jobs.html'
-            ' in Vertex SDK. Install the SDK using "pip install google-cloud-aiplatform"',
-            category=FutureWarning,
-        )
-
-        if not project_id:
-            raise ValueError(
-                'A valid GCP project ID is required to run a pipeline.')
-        if not region:
-            raise ValueError(
-                'A valid GCP region is required to run a pipeline.')
-
-        self._endpoint = DEFAULT_ENDPOINT_FORMAT.format(region=region)
-        self._api_version = DEFAULT_API_VERSION
-
-        self._project_id = project_id
-        self._region = region
-        self._parent = _PARENT_PATTERN.format(project_id, region)
-
-        discovery_doc_path = pkg_resources.resource_filename(
-            'kfp.v2.google.client',
-            'discovery/aiplatform_public_google_rest_v1beta1.json')
-
-        with open(discovery_doc_path) as f:
-            discovery_doc = f.read()
-
-        self._api_client = discovery.build_from_document(
-            service=discovery_doc,
-            http=None,
-            credentials=_get_gcp_credential(),
-        )
-        self._api_client._baseUrl = 'https://{}'.format(self._endpoint)  # pylint:disable=protected-access
-
-    def _display_job_link(self, job_id: str):
-        """Displays a link to the job in the Cloud Console UI."""
-        url = UI_PIPELINE_JOB_LINK_FORMAT.format(
-            region=self._region, job_id=job_id, project_id=self._project_id)
-        if _is_ipython():
-            import IPython  # pylint: disable=g-import-not-at-top
-            html = UI_LINK_HTML_FORMAT.format(url)
-            IPython.display.display(IPython.display.HTML(html))
-        else:
-            print('See the Pipeline job here:', url)
-
-    def _submit_job(
-            self,
-            job_spec: Mapping[str, Any],
-            job_id: Optional[str] = None,
-    ) -> dict:
-        """Submits a pipeline job to run on the AIPlatformPipelines service.
-
-        Args:
-            job_spec: AIPlatformPipelines pipelineJob spec.
-            job_id: Optional user-specified ID of this pipelineJob. If not
-                provided, the pipeline service will automatically generate a
-                random numeric ID.
-
-        Returns:
-            The PipelineJob instance returned by the service.
-
-        Raises:
-            RuntimeError: If the AIPlatformPipelines service returns an
-                unexpected response or an empty job name.
-        """
-        request = self._api_client.projects().locations().pipelineJobs().create(
-            body=job_spec, pipelineJobId=job_id, parent=self._parent)
-        response = request.execute()
-        job_id = _extract_job_id(response['name'])
-        if job_id:
-            self._display_job_link(job_id)
-        return response
-
-    def get_job(self, job_id: str) -> dict:
-        """Gets an existing pipeline job on the AIPlatformPipelines service.
-
-        Args:
-            job_id: The relative ID of this pipelineJob. The fully qualified
-                name will be generated according to the project ID and region
-                specified for the client.
-
-        Returns:
-            The JSON-formatted response from the service representing the
-            pipeline job.
-        """
-        full_name = _JOB_NAME_PATTERN.format(parent=self._parent, job_id=job_id)
-        return self._api_client.projects().locations().pipelineJobs().get(
-            name=full_name).execute()
-
-    def list_jobs(self) -> dict:
-        """Lists existing pipeline jobs on the AIPlatformPipelines service.
-
-        Returns:
-            The JSON-formatted response from the service representing all
-            pipeline jobs.
-        """
-        return self._api_client.projects().locations().pipelineJobs().list(
-            parent=self._parent).execute()
-
-    def create_run_from_job_spec(
-            self,
-            job_spec_path: str,
-            job_id: Optional[str] = None,
-            pipeline_root: Optional[str] = None,
-            parameter_values: Optional[Mapping[str, Any]] = None,
-            enable_caching: Optional[bool] = None,
-            cmek: Optional[str] = None,
-            service_account: Optional[str] = None,
-            network: Optional[str] = None,
-            labels: Optional[Mapping[str, str]] = None) -> dict:
-        """Runs a pre-compiled pipeline job on the AIPlatformPipelines service.
-
-        Args:
-            job_spec_path: The path of the PipelineJob JSON file. It can be a
-                local path or a GS URI.
-            job_id: Optionally, the user can provide the unique ID of the job
-                run. If not specified, pipeline name + timestamp will be used.
-            pipeline_root: Optionally, the user can override the pipeline root
-                specified at compile time.
-            parameter_values: The mapping from runtime parameter names to their
-                values.
-            enable_caching: Whether or not to enable caching for the run. If
-                not set, defaults to the compile-time settings, which are True
-                for all tasks by default, while users may specify different
-                caching options for individual tasks. If set, the setting
-                applies to all tasks in the pipeline and overrides the
-                compile-time settings.
-            cmek: The customer-managed encryption key for a pipelineJob. If
-                set, the pipeline job and all of its sub-resources will be
-                secured by this key.
-            service_account: The service account that the pipeline workload
-                runs as.
-            network: The network configuration applied for pipeline jobs. If
-                left unspecified, the workload is not peered with any network.
-            labels: The user-defined metadata used to organize PipelineJobs.
-
-        Returns:
-            The full AIPlatformPipelines job name.
-
-        Raises:
-            ParseError: On JSON parsing problems.
-            RuntimeError: If the AIPlatformPipelines service returns an
-                unexpected response or an empty job name.
-        """
-        job_spec = client_utils.load_json(job_spec_path)
-        pipeline_name = job_spec['pipelineSpec']['pipelineInfo']['name']
-        job_id = job_id or '{pipeline_name}-{timestamp}'.format(
-            pipeline_name=re.sub('[^-0-9a-z]+', '-',
-                                 pipeline_name.lower()).lstrip('-').rstrip('-'),
-            timestamp=_get_current_time().strftime('%Y%m%d%H%M%S'))
-        if not _VALID_NAME_PATTERN.match(job_id):
-            raise ValueError(
-                'Generated job ID: {} is illegal as a uCAIP pipelines job ID. '
-                'Expecting an ID following the regex pattern '
-                '"[a-z][-a-z0-9]{{0,127}}"'.format(job_id))
-
-        job_name = _JOB_NAME_PATTERN.format(parent=self._parent, job_id=job_id)
-
-        job_spec['name'] = job_name
-        job_spec['displayName'] = job_id
-
-        builder = runtime_config_builder.RuntimeConfigBuilder.from_job_spec_json(
-            job_spec)
-        builder.update_pipeline_root(pipeline_root)
-        builder.update_runtime_parameters(parameter_values)
-
-        runtime_config = builder.build()
-        job_spec['runtimeConfig'] = runtime_config
-
-        if enable_caching is not None:
-            _set_enable_caching_value(job_spec['pipelineSpec'], enable_caching)
-
-        if cmek is not None:
-            job_spec['encryptionSpec'] = {'kmsKeyName': cmek}
-        if service_account is not None:
-            job_spec['serviceAccount'] = service_account
-        if network is not None:
-            job_spec['network'] = network
-
-        if labels:
-            if not isinstance(labels, Mapping):
-                raise ValueError(
-                    'Expect labels to be a mapping of string key value pairs. '
-                    'Got "{}" of type "{}"'.format(labels, type(labels)))
-            for k, v in labels.items():
-                if not isinstance(k, str) or not isinstance(v, str):
-                    raise ValueError(
-                        'Expect labels to be a mapping of string key value pairs. '
-                        'Got "{}".'.format(labels))
-
-            job_spec['labels'] = labels
-
-        return self._submit_job(
-            job_spec=job_spec,
-            job_id=job_id,
-        )
-
-    def create_schedule_from_job_spec(
-            self,
-            job_spec_path: str,
-            schedule: str,
-            time_zone: str = 'US/Pacific',
-            pipeline_root: Optional[str] = None,
-            parameter_values: Optional[Mapping[str, Any]] = None,
-            service_account: Optional[str] = None,
-            enable_caching: Optional[bool] = None,
-            app_engine_region: Optional[str] = None,
-            cloud_scheduler_service_account: Optional[str] = None,
-    ) -> dict:
-        """Creates a schedule for a compiled pipeline file.
-
-        This function creates a scheduled job that runs the provided pipeline
-        on a schedule. This is implemented by creating a Google Cloud Scheduler
-        Job. The job will be visible in
-        https://console.cloud.google.com/cloudscheduler and can be
-        paused/resumed and deleted.
-
-        To make the system work, this function also creates a Google Cloud
-        Function which acts as an intermediary between the Scheduler and
-        Pipelines. A single function is shared among all scheduled jobs.
-        The following APIs will be activated automatically:
-        * cloudfunctions.googleapis.com
-        * cloudscheduler.googleapis.com
-        * appengine.googleapis.com
-
-        Args:
-            job_spec_path: Path of the compiled pipeline file.
-            schedule: Schedule in cron format. Example: "45 * * * *"
-            time_zone: Schedule time zone. Default is 'US/Pacific'.
-            pipeline_root: Optionally, the user can override the pipeline root
-                specified at compile time.
-            parameter_values: Arguments for the pipeline parameters.
-            service_account: The service account that the pipeline workload
-                runs as.
-            enable_caching: Whether or not to enable caching for the run. If
-                not set, defaults to the compile-time settings, which are True
-                for all tasks by default, while users may specify different
-                caching options for individual tasks. If set, the setting
-                applies to all tasks in the pipeline and overrides the
-                compile-time settings.
-            app_engine_region: The region in which the Cloud Scheduler job is
-                created.
-            cloud_scheduler_service_account: The service account that the Cloud
-                Scheduler job and the proxy Cloud Function use. This account
-                should have permission to call the AI Platform API and the
-                proxy function. If not specified, the function uses the App
-                Engine default service account.
-
-        Returns:
-            The created Google Cloud Scheduler Job object dictionary.
-        """
-        job_spec = client_utils.load_json(job_spec_path)
-
-        if enable_caching is not None:
-            _set_enable_caching_value(job_spec['pipelineSpec'], enable_caching)
-
-        return _create_from_pipeline_dict(
-            pipeline_dict=job_spec,
-            schedule=schedule,
-            project_id=self._project_id,
-            region=self._region,
-            time_zone=time_zone,
-            parameter_values=parameter_values,
-            pipeline_root=pipeline_root,
-            service_account=service_account,
-            app_engine_region=app_engine_region,
-            cloud_scheduler_service_account=cloud_scheduler_service_account)
diff --git a/sdk/python/kfp/v2/google/client/client_test.py b/sdk/python/kfp/v2/google/client/client_test.py
deleted file mode 100644
index 70b071036ed..00000000000
--- a/sdk/python/kfp/v2/google/client/client_test.py
+++ /dev/null
@@ -1,228 +0,0 @@
-# Copyright 2021 The Kubeflow Authors
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""Tests for kfp.v2.google.client.client."""
-
-import datetime
-import json
-import os
-import unittest
-from typing import Any, Dict
-from unittest import mock
-
-from googleapiclient import discovery, http
-from kfp.v2.google.client import client, client_utils
-
-# Mock response for get job request.
-_EXPECTED_GET_RESPONSE = 'good job spec'
-_GET_RESPONSES = {
-    'projects/test-project/locations/us-central1/'
-    'pipelineJobs/test-job':
-        _EXPECTED_GET_RESPONSE
-}
-
-
-def _load_test_data(filename: str) -> Dict[Any, Any]:
-    """Test helper function that loads JSON from testdata."""
-    file_path = os.path.join(os.path.dirname(__file__), 'testdata', filename)
-    with open(file_path) as f:
-        return json.load(f)
-
-
-# Mock the Python API client classes for testing purposes.
-class _MockClient(object): - """Mocks Python Google API client.""" - - def projects(self): # pylint: disable=invalid-name - return _MockProjectsResource() - - -class _MockProjectsResource(object): - """Mocks API Resource returned by projects().""" - - def locations(self): # pylint: disable=invalid-name - return _MockLocationResource() - - -class _MockLocationResource(object): - """Mocks API Resource returned by locations().""" - - def pipelineJobs(self): # pylint: disable=invalid-name - return _MockPipelineJobsResource() - - -class _MockPipelineJobsResource(object): - """Mocks API Resource returned by pipelineJobs().""" - - class _MockGetRequest(http.HttpRequest): - - def __init__(self, name: str): - self._name = name - - def execute(self): - return _GET_RESPONSES.get(self._name) - - def get(self, name: str): # pylint: disable=invalid-name - """Mocks get job request.""" - return self._MockGetRequest(name=name) - - -class ClientTest(unittest.TestCase): - - def test_client_init_with_defaults(self): - api_client = client.AIPlatformClient( - project_id='test-project', region='us-central1') - - self.assertEqual(api_client._project_id, 'test-project') - self.assertEqual(api_client._region, 'us-central1') - self.assertEqual(api_client._endpoint, - 'us-central1-aiplatform.googleapis.com') - - @mock.patch.object(client, '_get_current_time', autospec=True) - @mock.patch.object(client_utils, 'load_json', autospec=True) - def test_create_run_from_pipeline_job(self, mock_load_json, - mock_get_current_time): - mock_load_json.return_value = _load_test_data('pipeline_job.json') - mock_get_current_time.return_value = datetime.date(2020, 10, 28) - - api_client = client.AIPlatformClient( - project_id='test-project', region='us-central1') - with mock.patch.object( - api_client, '_submit_job', autospec=True) as mock_submit: - api_client.create_run_from_job_spec( - job_spec_path='path/to/pipeline_job.json') - - golden = _load_test_data('pipeline_job.json') - mock_submit.assert_called_once_with( - job_spec=golden, job_id='sample-test-pipeline-20201028000000') - - @mock.patch.object(client, '_get_current_time', autospec=True) - @mock.patch.object(client_utils, 'load_json', autospec=True) - def test_job_id_parameters_override(self, mock_load_json, - mock_get_current_time): - mock_load_json.return_value = _load_test_data('pipeline_job.json') - mock_get_current_time.return_value = datetime.date(2020, 10, 28) - - api_client = client.AIPlatformClient( - project_id='test-project', region='us-central1') - with mock.patch.object( - api_client, '_submit_job', autospec=True) as mock_submit: - api_client.create_run_from_job_spec( - job_spec_path='path/to/pipeline_job.json', - job_id='my-new-id', - pipeline_root='gs://bucket/new-blob', - parameter_values={ - 'text': 'Hello test!', - 'list': [1, 2, 3], - }) - - golden = _load_test_data('pipeline_job.json') - golden['name'] = ('projects/test-project/locations/us-central1/' - 'pipelineJobs/my-new-id') - golden['displayName'] = 'my-new-id' - golden['runtimeConfig'][ - 'gcsOutputDirectory'] = 'gs://bucket/new-blob' - golden['runtimeConfig']['parameters']['text'] = { - 'stringValue': 'Hello test!' 
- } - golden['runtimeConfig']['parameters']['list'] = { - 'stringValue': '[1, 2, 3]' - } - mock_submit.assert_called_once_with( - job_spec=golden, job_id='my-new-id') - - @mock.patch.object(client, '_get_current_time', autospec=True) - @mock.patch.object(client_utils, 'load_json', autospec=True) - def test_advanced_settings(self, mock_load_json, mock_get_current_time): - mock_load_json.return_value = _load_test_data('pipeline_job.json') - mock_get_current_time.return_value = datetime.date(2020, 10, 28) - - api_client = client.AIPlatformClient( - project_id='test-project', region='us-central1') - with mock.patch.object( - api_client, '_submit_job', autospec=True) as mock_submit: - api_client.create_run_from_job_spec( - job_spec_path='path/to/pipeline_job.json', - cmek='custom-key', - service_account='custom-sa', - network='custom-network') - - golden = _load_test_data('pipeline_job.json') - golden['encryptionSpec'] = {'kmsKeyName': 'custom-key'} - golden['serviceAccount'] = 'custom-sa' - golden['network'] = 'custom-network' - mock_submit.assert_called_once_with( - job_spec=golden, job_id='sample-test-pipeline-20201028000000') - - def test_extract_job_id_success(self): - - job_name = 'projects/123/locations/us-central1/pipelineJobs/0123456789' - self.assertEqual('0123456789', client._extract_job_id(job_name)) - self.assertIsNone(client._extract_job_id('invalid name')) - - @mock.patch.object(client, '_get_current_time', autospec=True) - @mock.patch.object(client, '_get_gcp_credential', autospec=True) - @mock.patch.object(discovery, 'build_from_document', autospec=True) - def test_get_job_success(self, mock_build_client, mock_get_gcp_credential, - mock_get_current_time): - mock_get_current_time.return_value = datetime.date(2020, 10, 28) - mock_build_client.return_value = _MockClient() - api_client = client.AIPlatformClient( - project_id='test-project', region='us-central1') - - self.assertEqual(_EXPECTED_GET_RESPONSE, - api_client.get_job(job_id='test-job')) - mock_get_gcp_credential.assert_called_once() - - @mock.patch.object(client, '_get_current_time', autospec=True) - @mock.patch.object(client_utils, 'load_json', autospec=True) - def test_disable_caching(self, mock_load_json, mock_get_current_time): - mock_load_json.return_value = _load_test_data('pipeline_job.json') - mock_get_current_time.return_value = datetime.date(2020, 10, 28) - - api_client = client.AIPlatformClient( - project_id='test-project', region='us-central1') - with mock.patch.object( - api_client, '_submit_job', autospec=True) as mock_submit: - api_client.create_run_from_job_spec( - job_spec_path='path/to/pipeline_job.json', enable_caching=False) - - golden = _load_test_data('pipeline_job.json') - golden = json.loads( - json.dumps(golden).replace('"enableCache": true', - '"enableCache": false')) - mock_submit.assert_called_once_with( - job_spec=golden, job_id='sample-test-pipeline-20201028000000') - - @mock.patch.object(client, '_get_current_time', autospec=True) - @mock.patch.object(client_utils, 'load_json', autospec=True) - def test_setting_labels(self, mock_load_json, mock_get_current_time): - mock_load_json.return_value = _load_test_data('pipeline_job.json') - mock_get_current_time.return_value = datetime.date(2020, 10, 28) - - user_labels = {'label1': 'value1', 'label2': 'value2'} - api_client = client.AIPlatformClient( - project_id='test-project', region='us-central1') - with mock.patch.object( - api_client, '_submit_job', autospec=True) as mock_submit: - api_client.create_run_from_job_spec( - 
job_spec_path='path/to/pipeline_job.json', labels=user_labels) - - golden = _load_test_data('pipeline_job.json') - golden['labels'] = user_labels - mock_submit.assert_called_once_with( - job_spec=golden, job_id='sample-test-pipeline-20201028000000') - - -if __name__ == '__main__': - unittest.main() diff --git a/sdk/python/kfp/v2/google/client/client_utils.py b/sdk/python/kfp/v2/google/client/client_utils.py deleted file mode 100644 index ae6c7043310..00000000000 --- a/sdk/python/kfp/v2/google/client/client_utils.py +++ /dev/null @@ -1,70 +0,0 @@ -# Copyright 2021 The Kubeflow Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Module for AIPlatformPipelines API client utils.""" - -import json -from typing import Any, Dict - -from google.cloud import storage - - -def load_json(path: str) -> Dict[str, Any]: - """Loads data from a JSON document. - - Args: - path: The path of the JSON document. It can be a local path or a GS URI. - - Returns: - A deserialized Dict object representing the JSON document. - """ - - if path.startswith('gs://'): - return _load_json_from_gs_uri(path) - else: - return _load_json_from_local_file(path) - - -def _load_json_from_gs_uri(uri: str) -> Dict[str, Any]: - """Loads data from a JSON document referenced by a GS URI. - - Args: - uri: The GCS URI of the JSON document. - - Returns: - A deserialized Dict object representing the JSON document. - - Raises: - google.cloud.exceptions.NotFound: If the blob is not found. - json.decoder.JSONDecodeError: On JSON parsing problems. - ValueError: If uri is not a valid gs URI. - """ - storage_client = storage.Client() - blob = storage.Blob.from_string(uri, storage_client) - return json.loads(blob.download_as_bytes()) - - -def _load_json_from_local_file(file_path: str) -> Dict[str, Any]: - """Loads data from a JSON local file. - - Args: - file_path: The local file path of the JSON document. - - Returns: - A deserialized Dict object representing the JSON document. - - Raises: - json.decoder.JSONDecodeError: On JSON parsing problems. - """ - with open(file_path) as f: - return json.load(f) diff --git a/sdk/python/kfp/v2/google/client/client_utils_test.py b/sdk/python/kfp/v2/google/client/client_utils_test.py deleted file mode 100644 index 1a4773d403b..00000000000 --- a/sdk/python/kfp/v2/google/client/client_utils_test.py +++ /dev/null @@ -1,62 +0,0 @@ -# Copyright 2021 The Kubeflow Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-"""Tests for kfp.v2.google.client.client_utils.""" - -import json -import unittest -from unittest import mock - -from google.cloud import storage - -from kfp.v2.google.client import client_utils - - -class ClientUtilsTest(unittest.TestCase): - - @mock.patch.object(storage, 'Client', autospec=True) - @mock.patch.object(storage.Blob, 'download_as_bytes', autospec=True) - def test_load_json_from_gs_uri(self, mock_download_as_bytes, - unused_storage_client): - mock_download_as_bytes.return_value = b'{"key":"value"}' - self.assertEqual({'key': 'value'}, - client_utils.load_json('gs://bucket/path/to/blob')) - - @mock.patch('builtins.open', mock.mock_open(read_data='{"key":"value"}')) - def test_load_json_from_local_file(self): - self.assertEqual({'key': 'value'}, - client_utils.load_json('/path/to/file')) - - @mock.patch.object(storage, 'Client', autospec=True) - def test_load_json_from_gs_uri_with_non_gs_uri_should_fail( - self, unused_storage_client): - with self.assertRaisesRegex(ValueError, 'URI scheme must be gs'): - client_utils._load_json_from_gs_uri( - 'https://storage.google.com/bucket/blob') - - @mock.patch.object(storage, 'Client', autospec=True) - @mock.patch.object(storage.Blob, 'download_as_bytes', autospec=True) - def test_load_json_from_gs_uri_with_invalid_json_should_fail( - self, mock_download_as_bytes, unused_storage_client): - mock_download_as_bytes.return_value = b'invalid-json' - with self.assertRaises(json.decoder.JSONDecodeError): - client_utils._load_json_from_gs_uri('gs://bucket/path/to/blob') - - @mock.patch('builtins.open', mock.mock_open(read_data='invalid-json')) - def test_load_json_from_local_file_with_invalid_json_should_fail(self): - with self.assertRaises(json.decoder.JSONDecodeError): - client_utils._load_json_from_local_file('/path/to/file') - - -if __name__ == '__main__': - unittest.main() diff --git a/sdk/python/kfp/v2/google/client/discovery/aiplatform_public_google_rest_v1beta1.json b/sdk/python/kfp/v2/google/client/discovery/aiplatform_public_google_rest_v1beta1.json deleted file mode 100644 index 417124bfc6d..00000000000 --- a/sdk/python/kfp/v2/google/client/discovery/aiplatform_public_google_rest_v1beta1.json +++ /dev/null @@ -1,1235 +0,0 @@ -{ - "kind": "discovery#restDescription", - "discoveryVersion": "v1", - "id": "aiplatform:v1beta1", - "name": "aiplatform", - "canonicalName": "Aiplatform", - "version": "v1beta1", - "revision": "0", - "title": "Vertex AI API", - "description": "Train high-quality custom machine learning models with minimal machine learning expertise and effort.", - "ownerDomain": "google.com", - "ownerName": "Google", - "icons": { - "x16": "http://www.google.com/images/icons/product/search-16.gif", - "x32": "http://www.google.com/images/icons/product/search-32.gif" - }, - "documentationLink": "https://cloud.google.com/vertex-ai/", - "protocol": "rest", - "rootUrl": "https://aiplatform.googleapis.com/", - "mtlsRootUrl": "https://aiplatform.mtls.googleapis.com/", - "servicePath": "", - "baseUrl": "https://aiplatform.googleapis.com/", - "batchPath": "batch", - "version_module": true, - "fullyEncodeReservedExpansion": true, - "parameters": { - "access_token": { - "type": "string", - "description": "OAuth access token.", - "location": "query" - }, - "alt": { - "type": "string", - "description": "Data format for response.", - "default": "json", - "enum": [ - "json", - "media", - "proto" - ], - "enumDescriptions": [ - "Responses with Content-Type of application/json", - "Media download with context-dependent Content-Type", - "Responses 
with Content-Type of application/x-protobuf" - ], - "location": "query" - }, - "callback": { - "type": "string", - "description": "JSONP", - "location": "query" - }, - "fields": { - "type": "string", - "description": "Selector specifying which fields to include in a partial response.", - "location": "query" - }, - "key": { - "type": "string", - "description": "API key. Your API key identifies your project and provides you with API access, quota, and reports. Required unless you provide an OAuth 2.0 token.", - "location": "query" - }, - "oauth_token": { - "type": "string", - "description": "OAuth 2.0 token for the current user.", - "location": "query" - }, - "prettyPrint": { - "type": "boolean", - "description": "Returns response with indentations and line breaks.", - "default": "true", - "location": "query" - }, - "quotaUser": { - "type": "string", - "description": "Available to use for quota purposes for server-side applications. Can be any arbitrary string assigned to a user, but should not exceed 40 characters.", - "location": "query" - }, - "upload_protocol": { - "type": "string", - "description": "Upload protocol for media (e.g. \"raw\", \"multipart\").", - "location": "query" - }, - "uploadType": { - "type": "string", - "description": "Legacy upload protocol for media (e.g. \"media\", \"multipart\").", - "location": "query" - }, - "$.xgafv": { - "type": "string", - "description": "V1 error format.", - "enum": [ - "1", - "2" - ], - "enumDescriptions": [ - "v1 error format", - "v2 error format" - ], - "location": "query" - } - }, - "auth": { - "oauth2": { - "scopes": { - "https://www.googleapis.com/auth/cloud-platform": { - "description": "See, edit, configure, and delete your Google Cloud Platform data" - } - } - } - }, - "schemas": { - "GoogleCloudLocationListLocationsResponse": { - "id": "GoogleCloudLocationListLocationsResponse", - "description": "The response message for Locations.ListLocations.", - "type": "object", - "properties": { - "locations": { - "description": "A list of locations that matches the specified filter in the request.", - "type": "array", - "items": { - "$ref": "GoogleCloudLocationLocation" - } - }, - "nextPageToken": { - "description": "The standard List next-page token.", - "type": "string" - } - } - }, - "GoogleCloudLocationLocation": { - "id": "GoogleCloudLocationLocation", - "description": "A resource that represents Google Cloud Platform location.", - "type": "object", - "properties": { - "name": { - "description": "Resource name for the location, which may vary between implementations. For example: `\"projects\/example-project\/locations\/us-east1\"`", - "type": "string" - }, - "locationId": { - "description": "The canonical id for this location. For example: `\"us-east1\"`.", - "type": "string" - }, - "displayName": { - "description": "The friendly name for this location, typically a nearby city name. For example, \"Tokyo\".", - "type": "string" - }, - "labels": { - "description": "Cross-service attributes for the location. For example {\"cloud.googleapis.com\/region\": \"us-east1\"}", - "type": "object", - "additionalProperties": { - "type": "string" - } - }, - "metadata": { - "description": "Service-specific metadata. For example the available capacity at the given location.", - "type": "object", - "additionalProperties": { - "type": "any", - "description": "Properties of the object. Contains field @type with type URL." 
- } - } - } - }, - "GoogleLongrunningListOperationsResponse": { - "id": "GoogleLongrunningListOperationsResponse", - "description": "The response message for Operations.ListOperations.", - "type": "object", - "properties": { - "operations": { - "description": "A list of operations that matches the specified filter in the request.", - "type": "array", - "items": { - "$ref": "GoogleLongrunningOperation" - } - }, - "nextPageToken": { - "description": "The standard List next-page token.", - "type": "string" - } - } - }, - "GoogleLongrunningOperation": { - "id": "GoogleLongrunningOperation", - "description": "This resource represents a long-running operation that is the result of a network API call.", - "type": "object", - "properties": { - "name": { - "description": "The server-assigned name, which is only unique within the same service that originally returns it. If you use the default HTTP mapping, the `name` should be a resource name ending with `operations\/{unique_id}`.", - "type": "string" - }, - "metadata": { - "description": "Service-specific metadata associated with the operation. It typically contains progress information and common metadata such as create time. Some services might not provide such metadata. Any method that returns a long-running operation should document the metadata type, if any.", - "type": "object", - "additionalProperties": { - "type": "any", - "description": "Properties of the object. Contains field @type with type URL." - } - }, - "done": { - "description": "If the value is `false`, it means the operation is still in progress. If `true`, the operation is completed, and either `error` or `response` is available.", - "type": "boolean" - }, - "error": { - "description": "The error result of the operation in case of failure or cancellation.", - "$ref": "GoogleRpcStatus" - }, - "response": { - "description": "The normal response of the operation in case of success. If the original method returns no data on success, such as `Delete`, the response is `google.protobuf.Empty`. If the original method is standard `Get`\/`Create`\/`Update`, the response should be the resource. For other methods, the response should have the type `XxxResponse`, where `Xxx` is the original method name. For example, if the original method name is `TakeSnapshot()`, the inferred response type is `TakeSnapshotResponse`.", - "type": "object", - "additionalProperties": { - "type": "any", - "description": "Properties of the object. Contains field @type with type URL." - } - } - } - }, - "GoogleRpcStatus": { - "id": "GoogleRpcStatus", - "description": "The `Status` type defines a logical error model that is suitable for different programming environments, including REST APIs and RPC APIs. It is used by [gRPC](https:\/\/github.com\/grpc). Each `Status` message contains three pieces of data: error code, error message, and error details. You can find out more about this error model and how to work with it in the [API Design Guide](https:\/\/cloud.google.com\/apis\/design\/errors).", - "type": "object", - "properties": { - "code": { - "description": "The status code, which should be an enum value of google.rpc.Code.", - "type": "integer", - "format": "int32" - }, - "message": { - "description": "A developer-facing error message, which should be in English. Any user-facing error message should be localized and sent in the google.rpc.Status.details field, or localized by the client.", - "type": "string" - }, - "details": { - "description": "A list of messages that carry the error details. 
There is a common set of message types for APIs to use.", - "type": "array", - "items": { - "type": "object", - "additionalProperties": { - "type": "any", - "description": "Properties of the object. Contains field @type with type URL." - } - } - } - } - }, - "GoogleProtobufEmpty": { - "id": "GoogleProtobufEmpty", - "description": "A generic empty message that you can re-use to avoid defining duplicated empty messages in your APIs. A typical example is to use it as the request or the response type of an API method. For instance: service Foo { rpc Bar(google.protobuf.Empty) returns (google.protobuf.Empty); } The JSON representation for `Empty` is empty JSON object `{}`.", - "type": "object", - "properties": { - } - }, - "GoogleCloudAiplatformV1beta1EncryptionSpec": { - "id": "GoogleCloudAiplatformV1beta1EncryptionSpec", - "description": "Represents a customer-managed encryption key spec that can be applied to a top-level resource.", - "type": "object", - "properties": { - "kmsKeyName": { - "description": "Required. The Cloud KMS resource identifier of the customer managed encryption key used to protect a resource. Has the form: `projects\/my-project\/locations\/my-region\/keyRings\/my-kr\/cryptoKeys\/my-key`. The key needs to be in the same region as where the compute resource is created.", - "type": "string" - } - } - }, - "GoogleCloudAiplatformV1beta1Artifact": { - "id": "GoogleCloudAiplatformV1beta1Artifact", - "description": "Instance of a general artifact.", - "type": "object", - "properties": { - "name": { - "description": "Output only. The resource name of the Artifact.", - "readOnly": true, - "type": "string" - }, - "displayName": { - "description": "User provided display name of the Artifact. May be up to 128 Unicode characters.", - "type": "string" - }, - "uri": { - "description": "The uniform resource identifier of the artifact file. May be empty if there is no actual artifact file.", - "type": "string" - }, - "etag": { - "description": "An eTag used to perform consistent read-modify-write updates. If not set, a blind \"overwrite\" update happens.", - "type": "string" - }, - "labels": { - "description": "The labels with user-defined metadata to organize your Artifacts. Label keys and values can be no longer than 64 characters (Unicode codepoints), can only contain lowercase letters, numeric characters, underscores and dashes. International characters are allowed. No more than 64 user labels can be associated with one Artifact (System labels are excluded).", - "type": "object", - "additionalProperties": { - "type": "string" - } - }, - "createTime": { - "description": "Output only. Timestamp when this Artifact was created.", - "readOnly": true, - "type": "string", - "format": "google-datetime" - }, - "updateTime": { - "description": "Output only. Timestamp when this Artifact was last updated.", - "readOnly": true, - "type": "string", - "format": "google-datetime" - }, - "state": { - "description": "The state of this Artifact. This is a property of the Artifact, and does not imply or capture any ongoing process. This property is managed by clients (such as Vertex Pipelines), and the system does not prescribe or check the validity of state transitions.", - "type": "string", - "enumDescriptions": [ - "Unspecified state for the Artifact.", - "A state used by systems like Vertex Pipelines to indicate that the underlying data item represented by this Artifact is being created.", - "A state indicating that the Artifact should exist, unless something external to the system deletes it." 
- ], - "enum": [ - "STATE_UNSPECIFIED", - "PENDING", - "LIVE" - ] - }, - "schemaTitle": { - "description": "The title of the schema describing the metadata. Schema title and version is expected to be registered in earlier Create Schema calls. And both are used together as unique identifiers to identify schemas within the local metadata store.", - "type": "string" - }, - "schemaVersion": { - "description": "The version of the schema in schema_name to use. Schema title and version is expected to be registered in earlier Create Schema calls. And both are used together as unique identifiers to identify schemas within the local metadata store.", - "type": "string" - }, - "metadata": { - "description": "Properties of the Artifact.", - "type": "object", - "additionalProperties": { - "type": "any", - "description": "Properties of the object." - } - }, - "description": { - "description": "Description of the Artifact", - "type": "string" - } - } - }, - "GoogleCloudAiplatformV1beta1Context": { - "id": "GoogleCloudAiplatformV1beta1Context", - "description": "Instance of a general context.", - "type": "object", - "properties": { - "name": { - "description": "Output only. The resource name of the Context.", - "readOnly": true, - "type": "string" - }, - "displayName": { - "description": "User provided display name of the Context. May be up to 128 Unicode characters.", - "type": "string" - }, - "etag": { - "description": "An eTag used to perform consistent read-modify-write updates. If not set, a blind \"overwrite\" update happens.", - "type": "string" - }, - "labels": { - "description": "The labels with user-defined metadata to organize your Contexts. Label keys and values can be no longer than 64 characters (Unicode codepoints), can only contain lowercase letters, numeric characters, underscores and dashes. International characters are allowed. No more than 64 user labels can be associated with one Context (System labels are excluded).", - "type": "object", - "additionalProperties": { - "type": "string" - } - }, - "createTime": { - "description": "Output only. Timestamp when this Context was created.", - "readOnly": true, - "type": "string", - "format": "google-datetime" - }, - "updateTime": { - "description": "Output only. Timestamp when this Context was last updated.", - "readOnly": true, - "type": "string", - "format": "google-datetime" - }, - "parentContexts": { - "description": "Output only. A list of resource names of Contexts that are parents of this Context. A Context may have at most 10 parent_contexts.", - "readOnly": true, - "type": "array", - "items": { - "type": "string" - } - }, - "schemaTitle": { - "description": "The title of the schema describing the metadata. Schema title and version is expected to be registered in earlier Create Schema calls. And both are used together as unique identifiers to identify schemas within the local metadata store.", - "type": "string" - }, - "schemaVersion": { - "description": "The version of the schema in schema_name to use. Schema title and version is expected to be registered in earlier Create Schema calls. And both are used together as unique identifiers to identify schemas within the local metadata store.", - "type": "string" - }, - "metadata": { - "description": "Properties of the Context.", - "type": "object", - "additionalProperties": { - "type": "any", - "description": "Properties of the object." 
- } - }, - "description": { - "description": "Description of the Context", - "type": "string" - } - } - }, - "GoogleCloudAiplatformV1beta1Execution": { - "id": "GoogleCloudAiplatformV1beta1Execution", - "description": "Instance of a general execution.", - "type": "object", - "properties": { - "name": { - "description": "Output only. The resource name of the Execution.", - "readOnly": true, - "type": "string" - }, - "displayName": { - "description": "User provided display name of the Execution. May be up to 128 Unicode characters.", - "type": "string" - }, - "state": { - "description": "The state of this Execution. This is a property of the Execution, and does not imply or capture any ongoing process. This property is managed by clients (such as Vertex Pipelines) and the system does not prescribe or check the validity of state transitions.", - "type": "string", - "enumDescriptions": [ - "Unspecified Execution state", - "The Execution is new", - "The Execution is running", - "The Execution has finished running", - "The Execution has failed" - ], - "enum": [ - "STATE_UNSPECIFIED", - "NEW", - "RUNNING", - "COMPLETE", - "FAILED" - ] - }, - "etag": { - "description": "An eTag used to perform consistent read-modify-write updates. If not set, a blind \"overwrite\" update happens.", - "type": "string" - }, - "labels": { - "description": "The labels with user-defined metadata to organize your Executions. Label keys and values can be no longer than 64 characters (Unicode codepoints), can only contain lowercase letters, numeric characters, underscores and dashes. International characters are allowed. No more than 64 user labels can be associated with one Execution (System labels are excluded).", - "type": "object", - "additionalProperties": { - "type": "string" - } - }, - "createTime": { - "description": "Output only. Timestamp when this Execution was created.", - "readOnly": true, - "type": "string", - "format": "google-datetime" - }, - "updateTime": { - "description": "Output only. Timestamp when this Execution was last updated.", - "readOnly": true, - "type": "string", - "format": "google-datetime" - }, - "schemaTitle": { - "description": "The title of the schema describing the metadata. Schema title and version is expected to be registered in earlier Create Schema calls. And both are used together as unique identifiers to identify schemas within the local metadata store.", - "type": "string" - }, - "schemaVersion": { - "description": "The version of the schema in `schema_title` to use. Schema title and version is expected to be registered in earlier Create Schema calls. And both are used together as unique identifiers to identify schemas within the local metadata store.", - "type": "string" - }, - "metadata": { - "description": "Properties of the Execution.", - "type": "object", - "additionalProperties": { - "type": "any", - "description": "Properties of the object." - } - }, - "description": { - "description": "Description of the Execution", - "type": "string" - } - } - }, - "GoogleCloudAiplatformV1beta1PipelineJob": { - "id": "GoogleCloudAiplatformV1beta1PipelineJob", - "description": "An instance of a machine learning PipelineJob.", - "type": "object", - "properties": { - "name": { - "description": "Output only. The resource name of the PipelineJob.", - "readOnly": true, - "type": "string" - }, - "displayName": { - "description": "The display name of the Pipeline. 
The name can be up to 128 characters long and can be consist of any UTF-8 characters.", - "type": "string" - }, - "createTime": { - "description": "Output only. Pipeline creation time.", - "readOnly": true, - "type": "string", - "format": "google-datetime" - }, - "startTime": { - "description": "Output only. Pipeline start time.", - "readOnly": true, - "type": "string", - "format": "google-datetime" - }, - "endTime": { - "description": "Output only. Pipeline end time.", - "readOnly": true, - "type": "string", - "format": "google-datetime" - }, - "updateTime": { - "description": "Output only. Timestamp when this PipelineJob was most recently updated.", - "readOnly": true, - "type": "string", - "format": "google-datetime" - }, - "pipelineSpec": { - "description": "Required. The spec of the pipeline.", - "type": "object", - "additionalProperties": { - "type": "any", - "description": "Properties of the object." - } - }, - "state": { - "description": "Output only. The detailed state of the job.", - "readOnly": true, - "type": "string", - "enumDescriptions": [ - "The pipeline state is unspecified.", - "The pipeline has been created or resumed, and processing has not yet begun.", - "The service is preparing to run the pipeline.", - "The pipeline is in progress.", - "The pipeline completed successfully.", - "The pipeline failed.", - "The pipeline is being cancelled. From this state, the pipeline may only go to either PIPELINE_STATE_SUCCEEDED, PIPELINE_STATE_FAILED or PIPELINE_STATE_CANCELLED.", - "The pipeline has been cancelled.", - "The pipeline has been stopped, and can be resumed." - ], - "enum": [ - "PIPELINE_STATE_UNSPECIFIED", - "PIPELINE_STATE_QUEUED", - "PIPELINE_STATE_PENDING", - "PIPELINE_STATE_RUNNING", - "PIPELINE_STATE_SUCCEEDED", - "PIPELINE_STATE_FAILED", - "PIPELINE_STATE_CANCELLING", - "PIPELINE_STATE_CANCELLED", - "PIPELINE_STATE_PAUSED" - ] - }, - "jobDetail": { - "description": "Output only. The details of pipeline run. Not available in the list view.", - "readOnly": true, - "$ref": "GoogleCloudAiplatformV1beta1PipelineJobDetail" - }, - "error": { - "description": "Output only. The error that occurred during pipeline execution. Only populated when the pipeline's state is FAILED or CANCELLED.", - "readOnly": true, - "$ref": "GoogleRpcStatus" - }, - "labels": { - "description": "The labels with user-defined metadata to organize PipelineJob. Label keys and values can be no longer than 64 characters (Unicode codepoints), can only contain lowercase letters, numeric characters, underscores and dashes. International characters are allowed. See https:\/\/goo.gl\/xmQnxf for more information and examples of labels.", - "type": "object", - "additionalProperties": { - "type": "string" - } - }, - "runtimeConfig": { - "description": "Runtime config of the pipeline.", - "$ref": "GoogleCloudAiplatformV1beta1PipelineJobRuntimeConfig" - }, - "encryptionSpec": { - "description": "Customer-managed encryption key spec for a pipelineJob. If set, this PipelineJob and all of its sub-resources will be secured by this key.", - "$ref": "GoogleCloudAiplatformV1beta1EncryptionSpec" - }, - "serviceAccount": { - "description": "The service account that the pipeline workload runs as. If not specified, the Compute Engine default service account in the project will be used. 
See https:\/\/cloud.google.com\/compute\/docs\/access\/service-accounts#default_service_account Users starting the pipeline must have the `iam.serviceAccounts.actAs` permission on this service account.", - "type": "string" - }, - "network": { - "description": "The full name of the Compute Engine [network](\/compute\/docs\/networks-and-firewalls#networks) to which the Pipeline Job's workload should be peered. For example, `projects\/12345\/global\/networks\/myVPC`. [Format](\/compute\/docs\/reference\/rest\/v1\/networks\/insert) is of the form `projects\/{project}\/global\/networks\/{network}`. Where {project} is a project number, as in `12345`, and {network} is a network name. Private services access must already be configured for the network. Pipeline job will apply the network configuration to the GCP resources being launched, if applied, such as Vertex AI Training or Dataflow job. If left unspecified, the workload is not peered with any network.", - "type": "string" - } - } - }, - "GoogleCloudAiplatformV1beta1PipelineJobDetail": { - "id": "GoogleCloudAiplatformV1beta1PipelineJobDetail", - "description": "The runtime detail of PipelineJob.", - "type": "object", - "properties": { - "pipelineContext": { - "description": "Output only. The context of the pipeline.", - "readOnly": true, - "$ref": "GoogleCloudAiplatformV1beta1Context" - }, - "pipelineRunContext": { - "description": "Output only. The context of the current pipeline run.", - "readOnly": true, - "$ref": "GoogleCloudAiplatformV1beta1Context" - }, - "taskDetails": { - "description": "Output only. The runtime details of the tasks under the pipeline.", - "readOnly": true, - "type": "array", - "items": { - "$ref": "GoogleCloudAiplatformV1beta1PipelineTaskDetail" - } - } - } - }, - "GoogleCloudAiplatformV1beta1PipelineTaskDetail": { - "id": "GoogleCloudAiplatformV1beta1PipelineTaskDetail", - "description": "The runtime detail of a task execution.", - "type": "object", - "properties": { - "taskId": { - "description": "Output only. The system generated ID of the task.", - "readOnly": true, - "type": "string", - "format": "int64" - }, - "parentTaskId": { - "description": "Output only. The id of the parent task if the task is within a component scope. Empty if the task is at the root level.", - "readOnly": true, - "type": "string", - "format": "int64" - }, - "taskName": { - "description": "Output only. The user specified name of the task that is defined in PipelineJob.spec.", - "readOnly": true, - "type": "string" - }, - "createTime": { - "description": "Output only. Task create time.", - "readOnly": true, - "type": "string", - "format": "google-datetime" - }, - "startTime": { - "description": "Output only. Task start time.", - "readOnly": true, - "type": "string", - "format": "google-datetime" - }, - "endTime": { - "description": "Output only. Task end time.", - "readOnly": true, - "type": "string", - "format": "google-datetime" - }, - "executorDetail": { - "description": "Output only. The detailed execution info.", - "readOnly": true, - "$ref": "GoogleCloudAiplatformV1beta1PipelineTaskExecutorDetail" - }, - "state": { - "description": "Output only. 
State of the task.", - "readOnly": true, - "type": "string", - "enumDescriptions": [ - "Unspecified.", - "Specifies pending state for the task.", - "Specifies task is being executed.", - "Specifies task completed successfully.", - "Specifies Task cancel is in pending state.", - "Specifies task is being cancelled.", - "Specifies task was cancelled.", - "Specifies task failed.", - "Specifies task was skipped due to cache hit.", - "Specifies that the task was not triggered because the task's trigger policy is not satisfied. The trigger policy is specified in the `condition` field of PipelineJob.pipeline_spec." - ], - "enum": [ - "STATE_UNSPECIFIED", - "PENDING", - "RUNNING", - "SUCCEEDED", - "CANCEL_PENDING", - "CANCELLING", - "CANCELLED", - "FAILED", - "SKIPPED", - "NOT_TRIGGERED" - ] - }, - "execution": { - "description": "Output only. The execution metadata of the task.", - "readOnly": true, - "$ref": "GoogleCloudAiplatformV1beta1Execution" - }, - "error": { - "description": "Output only. The error that occurred during task execution. Only populated when the task's state is FAILED or CANCELLED.", - "readOnly": true, - "$ref": "GoogleRpcStatus" - }, - "inputs": { - "description": "Output only. The runtime input artifacts of the task.", - "readOnly": true, - "type": "object", - "additionalProperties": { - "$ref": "GoogleCloudAiplatformV1beta1PipelineTaskDetailArtifactList" - } - }, - "outputs": { - "description": "Output only. The runtime output artifacts of the task.", - "readOnly": true, - "type": "object", - "additionalProperties": { - "$ref": "GoogleCloudAiplatformV1beta1PipelineTaskDetailArtifactList" - } - } - } - }, - "GoogleCloudAiplatformV1beta1PipelineTaskExecutorDetail": { - "id": "GoogleCloudAiplatformV1beta1PipelineTaskExecutorDetail", - "description": "The runtime detail of a pipeline executor.", - "type": "object", - "properties": { - "containerDetail": { - "description": "Output only. The detailed info for a container executor.", - "readOnly": true, - "$ref": "GoogleCloudAiplatformV1beta1PipelineTaskExecutorDetailContainerDetail" - }, - "customJobDetail": { - "description": "Output only. The detailed info for a custom job executor.", - "readOnly": true, - "$ref": "GoogleCloudAiplatformV1beta1PipelineTaskExecutorDetailCustomJobDetail" - } - } - }, - "GoogleCloudAiplatformV1beta1PipelineTaskExecutorDetailContainerDetail": { - "id": "GoogleCloudAiplatformV1beta1PipelineTaskExecutorDetailContainerDetail", - "description": "The detail of a container execution. It contains the job names of the lifecycle of a container execution.", - "type": "object", - "properties": { - "mainJob": { - "description": "Output only. The name of the CustomJob for the main container execution.", - "readOnly": true, - "type": "string" - }, - "preCachingCheckJob": { - "description": "Output only. The name of the CustomJob for the pre-caching-check container execution. This job will be available if the PipelineJob.pipeline_spec specifies the `pre_caching_check` hook in the lifecycle events.", - "readOnly": true, - "type": "string" - } - } - }, - "GoogleCloudAiplatformV1beta1PipelineTaskExecutorDetailCustomJobDetail": { - "id": "GoogleCloudAiplatformV1beta1PipelineTaskExecutorDetailCustomJobDetail", - "description": "The detailed info for a custom job executor.", - "type": "object", - "properties": { - "job": { - "description": "Output only. 
The name of the CustomJob.", - "readOnly": true, - "type": "string" - } - } - }, - "GoogleCloudAiplatformV1beta1PipelineTaskDetailArtifactList": { - "id": "GoogleCloudAiplatformV1beta1PipelineTaskDetailArtifactList", - "description": "A list of artifact metadata.", - "type": "object", - "properties": { - "artifacts": { - "description": "Output only. A list of artifact metadata.", - "readOnly": true, - "type": "array", - "items": { - "$ref": "GoogleCloudAiplatformV1beta1Artifact" - } - } - } - }, - "GoogleCloudAiplatformV1beta1PipelineJobRuntimeConfig": { - "id": "GoogleCloudAiplatformV1beta1PipelineJobRuntimeConfig", - "description": "The runtime config of a PipelineJob.", - "type": "object", - "properties": { - "parameters": { - "description": "The runtime parameters of the PipelineJob. The parameters will be passed into PipelineJob.pipeline_spec to replace the placeholders at runtime.", - "type": "object", - "additionalProperties": { - "$ref": "GoogleCloudAiplatformV1beta1Value" - } - }, - "gcsOutputDirectory": { - "description": "Required. A path in a Cloud Storage bucket, which will be treated as the root output directory of the pipeline. It is used by the system to generate the paths of output artifacts. The artifact paths are generated with a sub-path pattern `{job_id}\/{task_id}\/{output_key}` under the specified output directory. The service account specified in this pipeline must have the `storage.objects.get` and `storage.objects.create` permissions for this bucket.", - "type": "string" - } - } - }, - "GoogleCloudAiplatformV1beta1Value": { - "id": "GoogleCloudAiplatformV1beta1Value", - "description": "Value is the value of the field.", - "type": "object", - "properties": { - "intValue": { - "description": "An integer value.", - "type": "string", - "format": "int64" - }, - "doubleValue": { - "description": "A double value.", - "type": "number", - "format": "double" - }, - "stringValue": { - "description": "A string value.", - "type": "string" - } - } - }, - "GoogleCloudAiplatformV1beta1ListPipelineJobsResponse": { - "id": "GoogleCloudAiplatformV1beta1ListPipelineJobsResponse", - "description": "Response message for PipelineService.ListPipelineJobs", - "type": "object", - "properties": { - "pipelineJobs": { - "description": "List of PipelineJobs in the requested page.", - "type": "array", - "items": { - "$ref": "GoogleCloudAiplatformV1beta1PipelineJob" - } - }, - "nextPageToken": { - "description": "A token to retrieve the next page of results. Pass to ListPipelineJobsRequest.page_token to obtain that page.", - "type": "string" - } - } - }, - "GoogleCloudAiplatformV1beta1CancelPipelineJobRequest": { - "id": "GoogleCloudAiplatformV1beta1CancelPipelineJobRequest", - "description": "Request message for PipelineService.CancelPipelineJob.", - "type": "object", - "properties": { - } - } - }, - "resources": { - "projects": { - "resources": { - "locations": { - "methods": { - "list": { - "id": "aiplatform.projects.locations.list", - "path": "v1beta1/{+name}/locations", - "flatPath": "v1beta1/projects/{projectsId}/locations", - "httpMethod": "GET", - "parameters": { - "name": { - "description": "The resource that owns the locations collection, if applicable.", - "pattern": "^projects\/[^\/]+$", - "location": "path", - "required": true, - "type": "string" - }, - "filter": { - "description": "A filter to narrow down results to a preferred subset. 
The filtering language accepts strings like \"displayName=tokyo\", and is documented in more detail in [AIP-160](https:\/\/google.aip.dev\/160).", - "location": "query", - "type": "string" - }, - "pageSize": { - "description": "The maximum number of results to return. If not set, the service selects a default.", - "location": "query", - "type": "integer", - "format": "int32" - }, - "pageToken": { - "description": "A page token received from the `next_page_token` field in the response. Send that page token to receive the subsequent page.", - "location": "query", - "type": "string" - } - }, - "parameterOrder": [ - "name" - ], - "response": { - "$ref": "GoogleCloudLocationListLocationsResponse" - }, - "scopes": [ - "https://www.googleapis.com/auth/cloud-platform" - ], - "description": "Lists information about the supported locations for this service." - }, - "get": { - "id": "aiplatform.projects.locations.get", - "path": "v1beta1/{+name}", - "flatPath": "v1beta1/projects/{projectsId}/locations/{locationsId}", - "httpMethod": "GET", - "parameters": { - "name": { - "description": "Resource name for the location.", - "pattern": "^projects\/[^\/]+\/locations\/[^\/]+$", - "location": "path", - "required": true, - "type": "string" - } - }, - "parameterOrder": [ - "name" - ], - "response": { - "$ref": "GoogleCloudLocationLocation" - }, - "scopes": [ - "https://www.googleapis.com/auth/cloud-platform" - ], - "description": "Gets information about a location." - } - } - , - "resources": { - "pipelineJobs": { - "methods": { - "create": { - "id": "aiplatform.projects.locations.pipelineJobs.create", - "path": "v1beta1/{+parent}/pipelineJobs", - "flatPath": "v1beta1/projects/{projectsId}/locations/{locationsId}/pipelineJobs", - "httpMethod": "POST", - "parameters": { - "parent": { - "description": "Required. The resource name of the Location to create the PipelineJob in. Format: `projects\/{project}\/locations\/{location}`", - "pattern": "^projects\/[^\/]+\/locations\/[^\/]+$", - "location": "path", - "required": true, - "type": "string" - }, - "pipelineJobId": { - "description": "The ID to use for the PipelineJob, which will become the final component of the PipelineJob name. If not provided, an ID will be automatically generated. This value should be less than 128 characters, and valid characters are \/a-z-\/.", - "location": "query", - "type": "string" - } - }, - "parameterOrder": [ - "parent" - ], - "request": { - "$ref": "GoogleCloudAiplatformV1beta1PipelineJob" - }, - "response": { - "$ref": "GoogleCloudAiplatformV1beta1PipelineJob" - }, - "scopes": [ - "https://www.googleapis.com/auth/cloud-platform" - ], - "description": "Creates a PipelineJob. A PipelineJob will run immediately when created." - }, - "get": { - "id": "aiplatform.projects.locations.pipelineJobs.get", - "path": "v1beta1/{+name}", - "flatPath": "v1beta1/projects/{projectsId}/locations/{locationsId}/pipelineJobs/{pipelineJobsId}", - "httpMethod": "GET", - "parameters": { - "name": { - "description": "Required. The name of the PipelineJob resource. Format: `projects\/{project}\/locations\/{location}\/pipelineJobs\/{pipeline_job}`", - "pattern": "^projects\/[^\/]+\/locations\/[^\/]+\/pipelineJobs\/[^\/]+$", - "location": "path", - "required": true, - "type": "string" - } - }, - "parameterOrder": [ - "name" - ], - "response": { - "$ref": "GoogleCloudAiplatformV1beta1PipelineJob" - }, - "scopes": [ - "https://www.googleapis.com/auth/cloud-platform" - ], - "description": "Gets a PipelineJob." 
- }, - "list": { - "id": "aiplatform.projects.locations.pipelineJobs.list", - "path": "v1beta1/{+parent}/pipelineJobs", - "flatPath": "v1beta1/projects/{projectsId}/locations/{locationsId}/pipelineJobs", - "httpMethod": "GET", - "parameters": { - "parent": { - "description": "Required. The resource name of the Location to list the PipelineJobs from. Format: `projects\/{project}\/locations\/{location}`", - "pattern": "^projects\/[^\/]+\/locations\/[^\/]+$", - "location": "path", - "required": true, - "type": "string" - }, - "filter": { - "description": "The standard list filter. Supported fields: * `display_name` supports `=` and `!=`. * `state` supports `=` and `!=`. The following examples demonstrate how to filter the list of PipelineJobs: * `state=\"PIPELINE_STATE_SUCCEEDED\" AND display_name=\"my_pipeline\"` * `state=\"PIPELINE_STATE_RUNNING\" OR display_name=\"my_pipeline\"` * `NOT display_name=\"my_pipeline\"` * `state=\"PIPELINE_STATE_FAILED\"`", - "location": "query", - "type": "string" - }, - "pageSize": { - "description": "The standard list page size.", - "location": "query", - "type": "integer", - "format": "int32" - }, - "pageToken": { - "description": "The standard list page token. Typically obtained via ListPipelineJobsResponse.next_page_token of the previous PipelineService.ListPipelineJobs call.", - "location": "query", - "type": "string" - } - }, - "parameterOrder": [ - "parent" - ], - "response": { - "$ref": "GoogleCloudAiplatformV1beta1ListPipelineJobsResponse" - }, - "scopes": [ - "https://www.googleapis.com/auth/cloud-platform" - ], - "description": "Lists PipelineJobs in a Location." - }, - "delete": { - "id": "aiplatform.projects.locations.pipelineJobs.delete", - "path": "v1beta1/{+name}", - "flatPath": "v1beta1/projects/{projectsId}/locations/{locationsId}/pipelineJobs/{pipelineJobsId}", - "httpMethod": "DELETE", - "parameters": { - "name": { - "description": "Required. The name of the PipelineJob resource to be deleted. Format: `projects\/{project}\/locations\/{location}\/pipelineJobs\/{pipeline_job}`", - "pattern": "^projects\/[^\/]+\/locations\/[^\/]+\/pipelineJobs\/[^\/]+$", - "location": "path", - "required": true, - "type": "string" - } - }, - "parameterOrder": [ - "name" - ], - "response": { - "$ref": "GoogleLongrunningOperation" - }, - "scopes": [ - "https://www.googleapis.com/auth/cloud-platform" - ], - "description": "Deletes a PipelineJob." - }, - "cancel": { - "id": "aiplatform.projects.locations.pipelineJobs.cancel", - "path": "v1beta1/{+name}:cancel", - "flatPath": "v1beta1/projects/{projectsId}/locations/{locationsId}/pipelineJobs/{pipelineJobsId}:cancel", - "httpMethod": "POST", - "parameters": { - "name": { - "description": "Required. The name of the PipelineJob to cancel. Format: `projects\/{project}\/locations\/{location}\/pipelineJobs\/{pipeline_job}`", - "pattern": "^projects\/[^\/]+\/locations\/[^\/]+\/pipelineJobs\/[^\/]+$", - "location": "path", - "required": true, - "type": "string" - } - }, - "parameterOrder": [ - "name" - ], - "request": { - "$ref": "GoogleCloudAiplatformV1beta1CancelPipelineJobRequest" - }, - "response": { - "$ref": "GoogleProtobufEmpty" - }, - "scopes": [ - "https://www.googleapis.com/auth/cloud-platform" - ], - "description": "Cancels a PipelineJob. Starts asynchronous cancellation on the PipelineJob. The server makes a best effort to cancel the pipeline, but success is not guaranteed. 
Clients can use PipelineService.GetPipelineJob or other methods to check whether the cancellation succeeded or whether the pipeline completed despite cancellation. On successful cancellation, the PipelineJob is not deleted; instead it becomes a pipeline with a PipelineJob.error value with a google.rpc.Status.code of 1, corresponding to `Code.CANCELLED`, and PipelineJob.state is set to `CANCELLED`." - } - } - , - "resources": { - "operations": { - "methods": { - "list": { - "id": "aiplatform.projects.locations.pipelineJobs.operations.list", - "path": "v1beta1/{+name}/operations", - "flatPath": "v1beta1/projects/{projectsId}/locations/{locationsId}/pipelineJobs/{pipelineJobsId}/operations", - "httpMethod": "GET", - "parameters": { - "name": { - "description": "The name of the operation's parent resource.", - "pattern": "^projects\/[^\/]+\/locations\/[^\/]+\/pipelineJobs\/[^\/]+$", - "location": "path", - "required": true, - "type": "string" - }, - "filter": { - "description": "The standard list filter.", - "location": "query", - "type": "string" - }, - "pageSize": { - "description": "The standard list page size.", - "location": "query", - "type": "integer", - "format": "int32" - }, - "pageToken": { - "description": "The standard list page token.", - "location": "query", - "type": "string" - } - }, - "parameterOrder": [ - "name" - ], - "response": { - "$ref": "GoogleLongrunningListOperationsResponse" - }, - "scopes": [ - "https://www.googleapis.com/auth/cloud-platform" - ], - "description": "Lists operations that match the specified filter in the request. If the server doesn't support this method, it returns `UNIMPLEMENTED`. NOTE: the `name` binding allows API services to override the binding to use different resource name schemes, such as `users\/*\/operations`. To override the binding, API services can add a binding such as `\"\/v1\/{name=users\/*}\/operations\"` to their service configuration. For backwards compatibility, the default name includes the operations collection id, however overriding users must ensure the name binding is the parent resource, without the operations collection id." - }, - "get": { - "id": "aiplatform.projects.locations.pipelineJobs.operations.get", - "path": "v1beta1/{+name}", - "flatPath": "v1beta1/projects/{projectsId}/locations/{locationsId}/pipelineJobs/{pipelineJobsId}/operations/{operationsId}", - "httpMethod": "GET", - "parameters": { - "name": { - "description": "The name of the operation resource.", - "pattern": "^projects\/[^\/]+\/locations\/[^\/]+\/pipelineJobs\/[^\/]+\/operations\/[^\/]+$", - "location": "path", - "required": true, - "type": "string" - } - }, - "parameterOrder": [ - "name" - ], - "response": { - "$ref": "GoogleLongrunningOperation" - }, - "scopes": [ - "https://www.googleapis.com/auth/cloud-platform" - ], - "description": "Gets the latest state of a long-running operation. Clients can use this method to poll the operation result at intervals as recommended by the API service." 
- }, - "delete": { - "id": "aiplatform.projects.locations.pipelineJobs.operations.delete", - "path": "v1beta1/{+name}", - "flatPath": "v1beta1/projects/{projectsId}/locations/{locationsId}/pipelineJobs/{pipelineJobsId}/operations/{operationsId}", - "httpMethod": "DELETE", - "parameters": { - "name": { - "description": "The name of the operation resource to be deleted.", - "pattern": "^projects\/[^\/]+\/locations\/[^\/]+\/pipelineJobs\/[^\/]+\/operations\/[^\/]+$", - "location": "path", - "required": true, - "type": "string" - } - }, - "parameterOrder": [ - "name" - ], - "response": { - "$ref": "GoogleProtobufEmpty" - }, - "scopes": [ - "https://www.googleapis.com/auth/cloud-platform" - ], - "description": "Deletes a long-running operation. This method indicates that the client is no longer interested in the operation result. It does not cancel the operation. If the server doesn't support this method, it returns `google.rpc.Code.UNIMPLEMENTED`." - }, - "cancel": { - "id": "aiplatform.projects.locations.pipelineJobs.operations.cancel", - "path": "v1beta1/{+name}:cancel", - "flatPath": "v1beta1/projects/{projectsId}/locations/{locationsId}/pipelineJobs/{pipelineJobsId}/operations/{operationsId}:cancel", - "httpMethod": "POST", - "parameters": { - "name": { - "description": "The name of the operation resource to be cancelled.", - "pattern": "^projects\/[^\/]+\/locations\/[^\/]+\/pipelineJobs\/[^\/]+\/operations\/[^\/]+$", - "location": "path", - "required": true, - "type": "string" - } - }, - "parameterOrder": [ - "name" - ], - "response": { - "$ref": "GoogleProtobufEmpty" - }, - "scopes": [ - "https://www.googleapis.com/auth/cloud-platform" - ], - "description": "Starts asynchronous cancellation on a long-running operation. The server makes a best effort to cancel the operation, but success is not guaranteed. If the server doesn't support this method, it returns `google.rpc.Code.UNIMPLEMENTED`. Clients can use Operations.GetOperation or other methods to check whether the cancellation succeeded or whether the operation completed despite cancellation. On successful cancellation, the operation is not deleted; instead, it becomes an operation with an Operation.error value with a google.rpc.Status.code of 1, corresponding to `Code.CANCELLED`." - }, - "wait": { - "id": "aiplatform.projects.locations.pipelineJobs.operations.wait", - "path": "v1beta1/{+name}:wait", - "flatPath": "v1beta1/projects/{projectsId}/locations/{locationsId}/pipelineJobs/{pipelineJobsId}/operations/{operationsId}:wait", - "httpMethod": "POST", - "parameters": { - "name": { - "description": "The name of the operation resource to wait on.", - "pattern": "^projects\/[^\/]+\/locations\/[^\/]+\/pipelineJobs\/[^\/]+\/operations\/[^\/]+$", - "location": "path", - "required": true, - "type": "string" - }, - "timeout": { - "description": "The maximum duration to wait before timing out. If left blank, the wait will be at most the time permitted by the underlying HTTP\/RPC protocol. If RPC context deadline is also specified, the shorter one will be used.", - "location": "query", - "type": "string", - "format": "google-duration" - } - }, - "parameterOrder": [ - "name" - ], - "response": { - "$ref": "GoogleLongrunningOperation" - }, - "scopes": [ - "https://www.googleapis.com/auth/cloud-platform" - ], - "description": "Waits until the specified long-running operation is done or reaches at most a specified timeout, returning the latest state. If the operation is already done, the latest state is immediately returned. 
If the timeout specified is greater than the default HTTP\/RPC timeout, the HTTP\/RPC timeout is used. If the server does not support this method, it returns `google.rpc.Code.UNIMPLEMENTED`. Note that this method is on a best-effort basis. It may return the latest state before the specified timeout (including immediately), meaning even an immediate response is no guarantee that the operation is done." - } - } - } - } - } - } - } - } - } - }, - "basePath": "" -} diff --git a/sdk/python/kfp/v2/google/client/runtime_config_builder.py b/sdk/python/kfp/v2/google/client/runtime_config_builder.py deleted file mode 100644 index 569c168dad1..00000000000 --- a/sdk/python/kfp/v2/google/client/runtime_config_builder.py +++ /dev/null @@ -1,159 +0,0 @@ -# Copyright 2021 The Kubeflow Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Builder for CAIP pipelines Pipeline level proto spec.""" - -import copy -import json -from typing import Any, Dict, Mapping, Optional, Union - - -class RuntimeConfigBuilder(object): - """CAIP pipelines RuntimeConfig builder. - - Constructs a RuntimeConfig spec with pipeline_root and parameter - overrides. - """ - - def __init__( - self, - pipeline_root: str, - parameter_types: Mapping[str, str], - parameter_values: Optional[Dict[str, Any]] = None, - ): - """Creates a RuntimeConfigBuilder object. - - Args: - pipeline_root: The root of the pipeline outputs. - parameter_types: The mapping from pipeline parameter name to its type. - parameter_values: The mapping from runtime parameter name to its value. - """ - self._pipeline_root = pipeline_root - self._parameter_types = parameter_types - self._parameter_values = copy.deepcopy(parameter_values or {}) - - @classmethod - def from_job_spec_json( - cls, job_spec: Mapping[str, Any]) -> 'RuntimeConfigBuilder': - """Creates a RuntimeConfigBuilder object from PipelineJob json spec. - - Args: - job_spec: The PipelineJob spec. - - Returns: - A RuntimeConfigBuilder object. - """ - runtime_config_spec = job_spec['runtimeConfig'] - parameter_types = {} - parameter_input_definitions = job_spec['pipelineSpec']['root'].get( - 'inputDefinitions', {}).get('parameters', {}) - for k, v in parameter_input_definitions.items(): - parameter_types[k] = v['type'] - - pipeline_root = runtime_config_spec.get('gcsOutputDirectory') - parameter_values = _parse_runtime_parameters(runtime_config_spec) - return cls(pipeline_root, parameter_types, parameter_values) - - def update_pipeline_root(self, pipeline_root: Optional[str]) -> None: - """Updates pipeline_root value. - - Args: - pipeline_root: The root of the pipeline outputs. - """ - if pipeline_root: - self._pipeline_root = pipeline_root - - def update_runtime_parameters( - self, parameter_values: Optional[Mapping[str, Any]]) -> None: - """Merges runtime parameter values. - - Args: - parameter_values: The mapping from runtime parameter names to its values. 
- """ - if parameter_values: - for k, v in parameter_values.items(): - if isinstance(v, (dict, list, bool)): - parameter_values[k] = json.dumps(v) - if parameter_values: - self._parameter_values.update(parameter_values) - - def build(self) -> Mapping[str, Any]: - """Build a RuntimeConfig proto.""" - if not self._pipeline_root: - raise ValueError( - 'Pipeline root must be specified, either during compile ' - 'time, or when calling the service.') - return { - 'gcsOutputDirectory': self._pipeline_root, - 'parameters': { - k: self._get_caip_value(k, v) - for k, v in self._parameter_values.items() - if v is not None - } - } - - def _get_caip_value(self, name: str, - value: Union[int, float, str]) -> Mapping[str, Any]: - """Converts primitive values into CAIP pipeline Value proto message. - - Args: - name: The name of the pipeline parameter. - value: The value of the pipeline parameter. - - Returns: - A dictionary represents the CAIP pipeline Value proto message. - - Raises: - AssertionError: if the value is None. - ValueError: if the parameeter name is not found in pipeline root inputs. - TypeError: if the paraemter type is not one of 'INT', 'DOUBLE', 'STRING'. - """ - assert value is not None, 'None values should be filterd out.' - - if name not in self._parameter_types: - raise ValueError( - 'The pipeline parameter {} is not found in the pipeline ' - 'job input definitions.'.format(name)) - - result = {} - if self._parameter_types[name] == 'INT': - result['intValue'] = value - elif self._parameter_types[name] == 'DOUBLE': - result['doubleValue'] = value - elif self._parameter_types[name] == 'STRING': - result['stringValue'] = value - else: - raise TypeError('Got unknown type of value: {}'.format(value)) - - return result - - -def _parse_runtime_parameters( - runtime_config_spec: Mapping[str, Any]) -> Optional[Dict[str, Any]]: - """Extracts runtime parameters from runtime config json spec.""" - runtime_parameters = runtime_config_spec.get('parameters') - if not runtime_parameters: - return None - - result = {} - for name, value in runtime_parameters.items(): - if 'intValue' in value: - result[name] = int(value['intValue']) - elif 'doubleValue' in value: - result[name] = float(value['doubleValue']) - elif 'stringValue' in value: - result[name] = value['stringValue'] - else: - raise TypeError('Got unknown type of value: {}'.format(value)) - - return result diff --git a/sdk/python/kfp/v2/google/client/runtime_config_builder_test.py b/sdk/python/kfp/v2/google/client/runtime_config_builder_test.py deleted file mode 100644 index 9e0b5d917c1..00000000000 --- a/sdk/python/kfp/v2/google/client/runtime_config_builder_test.py +++ /dev/null @@ -1,176 +0,0 @@ -# Copyright 2021 The Kubeflow Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-"""Tests for kfp.v2.google.client.runtime_config_builder.""" - -import unittest - -import frozendict -from kfp.v2.google.client import runtime_config_builder - - -class RuntimeConfigBuilderTest(unittest.TestCase): - - SAMPLE_JOB_SPEC = frozendict.frozendict({ - 'pipelineSpec': { - 'root': { - 'inputDefinitions': { - 'parameters': { - 'string_param': { - 'type': 'STRING' - }, - 'int_param': { - 'type': 'INT' - }, - 'float_param': { - 'type': 'DOUBLE' - }, - 'new_param': { - 'type': 'STRING' - }, - 'bool_param': { - 'type': 'STRING' - }, - 'dict_param': { - 'type': 'STRING' - }, - 'list_param': { - 'type': 'STRING' - }, - } - } - } - }, - 'runtimeConfig': { - 'gcsOutputDirectory': 'path/to/my/root', - 'parameters': { - 'string_param': { - 'stringValue': 'test-string' - }, - 'int_param': { - 'intValue': 42 - }, - 'float_param': { - 'doubleValue': 3.14 - }, - } - } - }) - - def testBuildRuntimeConfigFromIndividualValues(self): - my_builder = runtime_config_builder.RuntimeConfigBuilder( - pipeline_root='path/to/my/root', - parameter_types={ - 'string_param': 'STRING', - 'int_param': 'INT', - 'float_param': 'DOUBLE' - }, - parameter_values={ - 'string_param': 'test-string', - 'int_param': 42, - 'float_param': 3.14 - }) - actual_runtime_config = my_builder.build() - - expected_runtime_config = self.SAMPLE_JOB_SPEC['runtimeConfig'] - self.assertEqual(expected_runtime_config, actual_runtime_config) - - def testBuildRuntimeConfigFromJobSpecJson(self): - my_builder = ( - runtime_config_builder.RuntimeConfigBuilder.from_job_spec_json( - self.SAMPLE_JOB_SPEC)) - actual_runtime_config = my_builder.build() - - expected_runtime_config = self.SAMPLE_JOB_SPEC['runtimeConfig'] - self.assertEqual(expected_runtime_config, actual_runtime_config) - - def testBuildRuntimeConfigWithNoopUpdates(self): - my_builder = ( - runtime_config_builder.RuntimeConfigBuilder.from_job_spec_json( - self.SAMPLE_JOB_SPEC)) - my_builder.update_pipeline_root(None) - my_builder.update_runtime_parameters(None) - actual_runtime_config = my_builder.build() - - expected_runtime_config = self.SAMPLE_JOB_SPEC['runtimeConfig'] - self.assertEqual(expected_runtime_config, actual_runtime_config) - - def testBuildRuntimeConfigWithMergeUpdates(self): - my_builder = ( - runtime_config_builder.RuntimeConfigBuilder.from_job_spec_json( - self.SAMPLE_JOB_SPEC)) - my_builder.update_pipeline_root('path/to/my/new/root') - my_builder.update_runtime_parameters({ - 'int_param': 888, - 'new_param': 'new-string', - 'dict_param': { - 'a': 1 - }, - 'list_param': [1, 2, 3], - 'bool_param': True, - }) - actual_runtime_config = my_builder.build() - - expected_runtime_config = { - 'gcsOutputDirectory': 'path/to/my/new/root', - 'parameters': { - 'string_param': { - 'stringValue': 'test-string' - }, - 'int_param': { - 'intValue': 888 - }, - 'float_param': { - 'doubleValue': 3.14 - }, - 'new_param': { - 'stringValue': 'new-string' - }, - 'dict_param': { - 'stringValue': '{"a": 1}' - }, - 'list_param': { - 'stringValue': '[1, 2, 3]' - }, - 'bool_param': { - 'stringValue': 'true' - }, - } - } - self.assertEqual(expected_runtime_config, actual_runtime_config) - - def testBuildRuntimeConfigParameterNotFound(self): - my_builder = ( - runtime_config_builder.RuntimeConfigBuilder.from_job_spec_json( - self.SAMPLE_JOB_SPEC)) - my_builder.update_pipeline_root('path/to/my/new/root') - my_builder.update_runtime_parameters({'no_such_param': 'new-string'}) - with self.assertRaisesRegex( - ValueError, - 'The pipeline parameter no_such_param is not found'): - my_builder.build() - - 
def testParseRuntimeParameters(self):
-        expected_runtime_parameters = {
-            'string_param': 'test-string',
-            'int_param': 42,
-            'float_param': 3.14,
-        }
-        actual_runtime_parameters = (
-            runtime_config_builder._parse_runtime_parameters(
-                self.SAMPLE_JOB_SPEC['runtimeConfig']))
-        self.assertEqual(expected_runtime_parameters, actual_runtime_parameters)
-
-
-if __name__ == '__main__':
-    unittest.main()
diff --git a/sdk/python/kfp/v2/google/client/schedule.py b/sdk/python/kfp/v2/google/client/schedule.py
deleted file mode 100644
index ee9a05af198..00000000000
--- a/sdk/python/kfp/v2/google/client/schedule.py
+++ /dev/null
@@ -1,412 +0,0 @@
-# Copyright 2021 The Kubeflow Authors
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""Module for scheduling pipeline runs."""
-
-import base64
-import hashlib
-import json
-import logging
-import pathlib
-import re
-import tempfile
-from typing import Any, Mapping, Optional
-import zipfile
-
-import googleapiclient
-from googleapiclient import discovery
-import requests
-
-from kfp.v2.google.client import client_utils
-from kfp.v2.google.client import runtime_config_builder
-
-_PROXY_FUNCTION_NAME = 'templated_http_request-v1'
-_PROXY_FUNCTION_FILENAME = '_cloud_function_templated_http_request.py'
-
-_CAIPP_ENDPOINT_WITHOUT_REGION = 'aiplatform.googleapis.com'
-_CAIPP_API_VERSION = 'v1beta1'
-
-
-def create_from_pipeline_file(
-    pipeline_path: str,
-    schedule: str,
-    project_id: str,
-    region: str = 'us-central1',
-    time_zone: str = 'US/Pacific',
-    parameter_values: Optional[Mapping[str, Any]] = None,
-    pipeline_root: Optional[str] = None,
-    service_account: Optional[str] = None,
-    app_engine_region: Optional[str] = None,
-    cloud_scheduler_service_account: Optional[str] = None,
-) -> dict:
-    """Creates a schedule for a compiled pipeline file.
-
-    This function creates a scheduled job that runs the provided pipeline on
-    schedule. This is implemented by creating a Google Cloud Scheduler Job.
-    The job will be visible in https://console.cloud.google.com/cloudscheduler
-    and can be paused/resumed and deleted.
-
-    To make the system work, this function also creates a Google Cloud Function
-    which acts as an intermediary between the Scheduler and Pipelines. A single
-    function is shared between all scheduled jobs.
-    The following APIs will be activated automatically:
-    * cloudfunctions.googleapis.com
-    * cloudscheduler.googleapis.com
-    * appengine.googleapis.com
-
-    Args:
-      pipeline_path: Path of the compiled pipeline file.
-      schedule: Schedule in cron format. Example: "45 * * * *"
-      project_id: Google Cloud project ID.
-      region: Google Cloud compute region. Default is 'us-central1'.
-      time_zone: Schedule time zone. Default is 'US/Pacific'.
-      parameter_values: Arguments for the pipeline parameters.
-      pipeline_root: Optionally overrides the pipeline root specified at
-        compile time.
-      service_account: The service account that the pipeline workload runs as.
-      app_engine_region: The region that the Cloud Scheduler job is created in.
-      cloud_scheduler_service_account: The service account that the Cloud
-        Scheduler job and the proxy Cloud Function run as. It should have
-        permission to call the AI Platform API and to invoke the proxy
-        function. If not specified, the App Engine default service account is
-        used.
-
-    Returns:
-      The created Google Cloud Scheduler Job object as a dictionary.
-    """
-    pipeline_dict = client_utils.load_json(pipeline_path)
-
-    return _create_from_pipeline_dict(
-        pipeline_dict=pipeline_dict,
-        schedule=schedule,
-        project_id=project_id,
-        region=region,
-        time_zone=time_zone,
-        parameter_values=parameter_values,
-        pipeline_root=pipeline_root,
-        service_account=service_account,
-        app_engine_region=app_engine_region,
-        cloud_scheduler_service_account=cloud_scheduler_service_account,
-    )
-
-
-def _create_from_pipeline_dict(
-    pipeline_dict: dict,
-    schedule: str,
-    project_id: str,
-    region: str = 'us-central1',
-    time_zone: str = 'US/Pacific',
-    parameter_values: Optional[Mapping[str, Any]] = None,
-    pipeline_root: Optional[str] = None,
-    service_account: Optional[str] = None,
-    app_engine_region: Optional[str] = None,
-    cloud_scheduler_service_account: Optional[str] = None,
-) -> dict:
-    """Creates a schedule for a compiled pipeline dictionary."""
-
-    _enable_required_apis(project_id=project_id)
-
-    # If the App Engine region is not provided, use the pipeline region.
-    app_engine_region = app_engine_region or region
-
-    proxy_function_url = _get_proxy_cloud_function_endpoint(
-        project_id=project_id,
-        region=region,
-        cloud_scheduler_service_account=cloud_scheduler_service_account,
-    )
-
-    if parameter_values or pipeline_root:
-        config_builder = runtime_config_builder.RuntimeConfigBuilder.from_job_spec_json(
-            pipeline_dict)
-        config_builder.update_runtime_parameters(
-            parameter_values=parameter_values)
-        config_builder.update_pipeline_root(pipeline_root=pipeline_root)
-        updated_runtime_config = config_builder.build()
-        pipeline_dict['runtimeConfig'] = updated_runtime_config
-
-    # Building the final pipeline job creation request URL.
-    pipeline_jobs_api_url = f'https://{region}-{_CAIPP_ENDPOINT_WITHOUT_REGION}/{_CAIPP_API_VERSION}/projects/{project_id}/locations/{region}/pipelineJobs'
-
-    # Preparing the request body for the Cloud Function to process.
-    pipeline_name = pipeline_dict['pipelineSpec']['pipelineInfo']['name']
-    full_pipeline_name = 'projects/{}/pipelineJobs/{}'.format(
-        project_id, pipeline_name)
-    pipeline_display_name = pipeline_dict.get('displayName')
-    time_format_suffix = "-{{$.scheduledTime.strftime('%Y-%m-%d-%H-%M-%S')}}"
-    if 'name' in pipeline_dict:
-        pipeline_dict['name'] += time_format_suffix
-    if 'displayName' in pipeline_dict:
-        pipeline_dict['displayName'] += time_format_suffix
-
-    pipeline_dict['_url'] = pipeline_jobs_api_url
-    pipeline_dict['_method'] = 'POST'
-
-    if service_account is not None:
-        pipeline_dict['serviceAccount'] = service_account
-
-    pipeline_text = json.dumps(pipeline_dict)
-    pipeline_data = pipeline_text.encode('utf-8')
-
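    # ---- Editor's note: illustrative, not part of the original patch ----
    # The time_format_suffix appended above is deliberately left unresolved
    # here; the proxy Cloud Function substitutes it at each trigger, e.g. for
    # a run scheduled at 2020-08-01 12:34:00:
    #
    #     import datetime
    #     t = datetime.datetime(2020, 8, 1, 12, 34, 0)
    #     'my-pipeline-' + t.strftime('%Y-%m-%d-%H-%M-%S')
    #     # -> 'my-pipeline-2020-08-01-12-34-00'
    #
    # matching displayName in testdata/pipeline1_request_body_final.json below.
    # ----------------------------------------------------------------------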
-    # Generating a human-readable schedule name.
-    schedule_name = _build_schedule_name(
-        job_body_data=pipeline_data,
-        schedule=schedule,
-        pipeline_name=full_pipeline_name,
-        display_name=pipeline_display_name,
-    )
-
-    project_location_path = 'projects/{}/locations/{}'.format(
-        project_id, app_engine_region)
-    scheduled_job_full_name = '{}/jobs/{}'.format(project_location_path,
-                                                  schedule_name)
-    service_account_email = cloud_scheduler_service_account or '{}@appspot.gserviceaccount.com'.format(
-        project_id)
-
-    scheduled_job = dict(
-        name=scheduled_job_full_name,  # Optional. Only used for readable names.
-        schedule=schedule,
-        time_zone=time_zone,
-        http_target=dict(
-            http_method='POST',
-            uri=proxy_function_url,
-            # Warning: when using google.cloud.scheduler_v1, the type of body
-            # is bytes or string. But when using the API through discovery,
-            # the body needs to be base64-encoded.
-            body=base64.b64encode(pipeline_data).decode('utf-8'),
-            oidc_token=dict(service_account_email=service_account_email,),
-        ),
-        # TODO(avolkov): Add labels once Cloud Scheduler supports them
-        # labels={
-        #     'google.cloud.ai-platform.pipelines.scheduling': 'v1alpha1',
-        # },
-    )
-
-    try:
-        response = _create_scheduler_job(
-            project_location_path=project_location_path,
-            job_body=scheduled_job,
-        )
-        return response
-    except googleapiclient.errors.HttpError as err:
-        # Handling the case where the exact schedule already exists.
-        if err.resp.get('status') == '409':
-            raise RuntimeError(
-                'The exact same schedule already exists') from err
-        raise err
-
-
-def _create_scheduler_job(project_location_path: str,
-                          job_body: Mapping[str, Any]) -> dict:
-    """Creates a scheduler job.
-
-    Args:
-      project_location_path: The project location path.
-      job_body: The scheduled job dictionary object.
-
-    Returns:
-      The response from the scheduler service.
-    """
-    # We cannot use google.cloud.scheduler_v1.CloudSchedulerClient since
-    # it's not available internally.
-    scheduler_service = discovery.build(
-        'cloudscheduler', 'v1', cache_discovery=False)
-    scheduler_jobs_api = scheduler_service.projects().locations().jobs()
-    response = scheduler_jobs_api.create(
-        parent=project_location_path,
-        body=job_body,
-    ).execute()
-    return response
-
-
-def _build_schedule_name(
-    job_body_data: bytes,
-    schedule: str,
-    pipeline_name: str,
-    display_name: str,
-) -> str:
-    """Generates the name for the schedule.
-
-    Args:
-      job_body_data: The serialized pipeline job.
-      schedule: Schedule in cron format.
-      pipeline_name: Full resource name of the pipeline, in the
-        `projects/<project-id>/pipelineJobs/<pipeline-name>` format.
-      display_name: Pipeline display name.
-
-    Returns:
-      Suggested schedule resource name.
-    """
-    pipeline_name_part = 'pipeline'
-    if pipeline_name is not None:
-        # pipeline_name format: projects/<project-id>/pipelineJobs/<pipeline-name>
-        pipeline_id = pipeline_name.split('/')[-1]
-        # Limiting the length of the pipeline name part.
- pipeline_name_part = pipeline_id[0:200] - elif display_name is not None: - pipeline_name_part = display_name - pipeline_hash_part = hashlib.sha256(job_body_data).hexdigest()[0:8] - schedule_part = ( - schedule.replace('*/', 'div').replace('*', 'a').replace(' ', '-')) - job_name = '_'.join([ - 'pipeline', - pipeline_name_part, - pipeline_hash_part, - schedule_part, - ]) - job_name = re.sub('[^-_a-z0-9]', '_', job_name) - return job_name - - -# For mocking -def _get_cloud_functions_api(): - functions_service = discovery.build( - 'cloudfunctions', 'v1', cache_discovery=False) - functions_api = functions_service.projects().locations().functions() - return functions_api - - -def _create_or_get_cloud_function( - name: str, - entry_point: str, - file_data: Mapping[str, str], - project_id: str, - region: str, - runtime: str = 'python37', - cloud_scheduler_service_account: Optional[str] = None, -): - """Creates Google Cloud Function.""" - functions_api = _get_cloud_functions_api() - - project_location_path = 'projects/{}/locations/{}'.format( - project_id, region) - function_full_name = project_location_path + '/functions/' + name - - # Returning early if the function already exists. - try: - function_get_response = functions_api.get( - name=function_full_name).execute() - return function_get_response - except googleapiclient.errors.HttpError as err: - raise_error = True - if err.resp['status'] == '404': - # The function does not exist, which is expected. - raise_error = False - if raise_error: - raise err - - get_upload_url_response = functions_api.generateUploadUrl( - parent=project_location_path, - body={}, - ).execute() - upload_url = get_upload_url_response['uploadUrl'] - - # Preparing the payload archive - with tempfile.TemporaryFile() as archive_file: - with zipfile.ZipFile(archive_file, 'w', - zipfile.ZIP_DEFLATED) as archive: - for path, data in file_data.items(): - archive.writestr(path, data) - - archive_file.seek(0) - headers = { - 'content-type': 'application/zip', - 'x-goog-content-length-range': '0,104857600', - } - upload_response = requests.put( - url=upload_url, - headers=headers, - data=archive_file, - ) - upload_response.raise_for_status() - - # Prepare Request Body - # https://cloud.google.com/functions/docs/reference/rest/v1/projects.locations.functions#resource-cloudfunction - - request_body = { - 'name': function_full_name, - 'entryPoint': entry_point, - 'sourceUploadUrl': upload_url, - 'httpsTrigger': {}, - 'runtime': runtime, - } - if cloud_scheduler_service_account is not None: - request_body["serviceAccountEmail"] = cloud_scheduler_service_account - try: - functions_api.create( - location=project_location_path, - body=request_body, - ).execute() - # Response is an operation object dict - # TODO(avolkov): Wait for the operation to succeed - # 'status' can be 'ACTIVE', 'DEPLOY_IN_PROGRESS', etc - except googleapiclient.errors.HttpError as err: - # Handling the case where the function already exists - raise_error = True - if err.resp['status'] == '409': - err_content_dict = json.loads(err.content) - err_error_dict = err_content_dict.get('error') - if err_error_dict and err_error_dict.get( - 'status') == 'ALREADY_EXISTS': - # This should not usually happen - logging.warning( - 'Cloud Function already exists: name=%s', - function_full_name, - ) - raise_error = False - if raise_error: - raise err - - function_get_response = functions_api.get(name=function_full_name).execute() - logging.info('Created Cloud Function: name=%s', function_full_name) - - return 
function_get_response - - -def _enable_required_apis(project_id: str,): - """Enables necessary APIs.""" - serviceusage_service = discovery.build( - 'serviceusage', 'v1', cache_discovery=False) - services_api = serviceusage_service.services() - - required_services = [ - 'cloudfunctions.googleapis.com', - 'cloudscheduler.googleapis.com', - 'appengine.googleapis.com', # Required by the Cloud Scheduler. - ] - project_path = 'projects/' + project_id - for service_name in required_services: - service_path = project_path + '/services/' + service_name - services_api.enable(name=service_path).execute() - - -def _get_proxy_cloud_function_endpoint( - project_id: str, - region: str = 'us-central1', - cloud_scheduler_service_account: Optional[str] = None, -): - """Sets up a proxy Cloud Function.""" - function_source_path = ( - pathlib.Path(__file__).parent / _PROXY_FUNCTION_FILENAME) - function_source = function_source_path.read_text() - function_entry_point = '_process_request' - - function_dict = _create_or_get_cloud_function( - name=_PROXY_FUNCTION_NAME, - entry_point=function_entry_point, - project_id=project_id, - region=region, - runtime='python37', - file_data={ - 'main.py': function_source, - 'requirements.txt': 'google-api-python-client>=1.7.8,<2', - }, - cloud_scheduler_service_account=cloud_scheduler_service_account, - ) - endpoint_url = function_dict['httpsTrigger']['url'] - return endpoint_url diff --git a/sdk/python/kfp/v2/google/client/schedule_test.py b/sdk/python/kfp/v2/google/client/schedule_test.py deleted file mode 100644 index 91cfad37268..00000000000 --- a/sdk/python/kfp/v2/google/client/schedule_test.py +++ /dev/null @@ -1,141 +0,0 @@ -# Copyright 2021 The Kubeflow Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
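
# ---- Editor's note: illustrative sketch, not part of the original patch ----
# _build_schedule_name (deleted above) derives a Cloud Scheduler job id from a
# truncated pipeline id, an 8-hex-char SHA-256 prefix of the request body, and
# the cron string rewritten to characters valid in a job id. A self-contained
# reconstruction; the helper name is illustrative:

import hashlib
import re


def sketch_schedule_name(job_body_data: bytes, schedule: str,
                         pipeline_id: str) -> str:
    """Builds 'pipeline_<id>_<body hash>_<mangled cron>' job names."""
    hash_part = hashlib.sha256(job_body_data).hexdigest()[0:8]
    schedule_part = schedule.replace('*/', 'div').replace('*', 'a').replace(' ', '-')
    job_name = '_'.join(['pipeline', pipeline_id[0:200], hash_part, schedule_part])
    return re.sub('[^-_a-z0-9]', '_', job_name)


# '46 * * * *' mangles to '46-a-a-a-a', so the test below expects a job id of
# the form 'pipeline_my-pipeline_<8 hex chars>_46-a-a-a-a'.
# -----------------------------------------------------------------------------
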
-"""Tests for kfp.v2.google.client.schedule.""" - -import base64 -import hashlib -import json -import os -import unittest -from unittest import mock - -from kfp.v2.google.client import schedule - - -class ScheduleTest(unittest.TestCase): - - def test_create_from_pipeline_file(self): - test_data_path = os.path.join(os.path.dirname(__file__), 'testdata') - pipeline_path = os.path.join(test_data_path, 'pipeline1.json') - pipeline_request_body_path = os.path.join( - test_data_path, 'pipeline1_request_body.json') - - project_id = 'project-id' - location = 'us-central1' - function_url = ('https://{}-{}.cloudfunctions.net/' + - 'templated_http_request-v1').format( - location, project_id) - with mock.patch( - 'kfp.v2.google.client.schedule._enable_required_apis', - return_value=None, - ), mock.patch( - 'kfp.v2.google.client.schedule._get_proxy_cloud_function_endpoint', - return_value=function_url, - ), mock.patch( - 'kfp.v2.google.client.schedule._create_scheduler_job', - spec=True) as create_scheduler_job_mock: - schedule.create_from_pipeline_file( - pipeline_path=pipeline_path, - schedule='46 * * * *', - project_id=project_id, - region=location, - time_zone='America/Los_Angeles', - parameter_values={'name_param': 'World'}, - pipeline_root='gs://my-project/pipeline_root/tmp/', - ) - - with open(pipeline_request_body_path, 'rb') as f: - expected_body_dict = json.load(f) - expected_body_json = json.dumps(expected_body_dict) - expected_body_data = expected_body_json.encode('utf-8') - expected_body_data_hash = hashlib.sha256( - expected_body_data).hexdigest()[0:8] - - create_scheduler_job_mock.assert_called_with( - project_location_path='projects/{}/locations/{}'.format( - project_id, location), - job_body={ - 'name': - 'projects/{}/locations/{}/jobs/pipeline_my-pipeline_{}_46-a-a-a-a' - .format(project_id, location, expected_body_data_hash), - 'schedule': - '46 * * * *', - 'time_zone': - 'America/Los_Angeles', - 'http_target': { - 'http_method': - 'POST', - 'uri': - function_url, - 'body': - base64.b64encode(expected_body_data).decode('utf-8' - ), - 'oidc_token': { - 'service_account_email': - 'project-id@appspot.gserviceaccount.com', - }, - }, - }, - ) - - def test_create_schedule_when_cloud_function_already_exists(self): - test_data_path = os.path.join(os.path.dirname(__file__), 'testdata') - pipeline_path = os.path.join(test_data_path, 'pipeline1.json') - - project_id = 'project-id' - location = 'us-central1' - function_url = ('https://{}-{}.cloudfunctions.net/' + - 'templated_http_request-v1').format( - location, project_id) - - def mock_get_cloud_functions_api(): - functions_api = mock.Mock() - - def function_get(name): - del name - request_mock = mock.Mock() - request_mock.execute.return_value = { - 'httpsTrigger': { - 'url': function_url - } - } - return request_mock - - functions_api.get = function_get - return functions_api - - with mock.patch( - 'kfp.v2.google.client.schedule._enable_required_apis', - return_value=None, - ), mock.patch( - 'kfp.v2.google.client.schedule._get_cloud_functions_api', - new=mock_get_cloud_functions_api, - ), mock.patch( - 'kfp.v2.google.client.schedule._create_scheduler_job', - spec=True) as create_scheduler_job_mock: - schedule.create_from_pipeline_file( - pipeline_path=pipeline_path, - schedule='46 * * * *', - project_id=project_id, - region=location, - time_zone='America/Los_Angeles', - ) - create_scheduler_job_mock.assert_called_once() - actual_job_body = create_scheduler_job_mock.call_args[1]['job_body'] - 
self.assertEqual(actual_job_body['http_target']['uri'], - function_url) - - -if __name__ == '__main__': - unittest.main() diff --git a/sdk/python/kfp/v2/google/client/testdata/pipeline1.json b/sdk/python/kfp/v2/google/client/testdata/pipeline1.json deleted file mode 100644 index 47805fd356d..00000000000 --- a/sdk/python/kfp/v2/google/client/testdata/pipeline1.json +++ /dev/null @@ -1,71 +0,0 @@ -{ - "displayName": "my-pipeline", - "runtimeConfig": { - "gcsOutputDirectory": "gs://some-bucket/tmp/", - "parameters": { - "name_param": { - "stringValue": "world" - } - } - }, - "pipelineSpec": { - "pipelineInfo": { - "name": "my-pipeline" - }, - "root": { - "dag": { - "tasks": { - "task-test": { - "taskInfo": { - "name": "task-test" - }, - "inputs": { - "parameters": { - "name": { - "componentInputParameter": "name_param" - } - } - }, - "componentRef": { - "name": "comp-test" - } - } - } - }, - "inputDefinitions": { - "parameters": { - "name_param": { - "type": "STRING" - } - } - } - }, - "deploymentSpec": { - "executors": { - "exec-test": { - "container": { - "image": "python:3.7-alpine", - "command": [ - "echo", - "Hello {{$.inputs.parameters['name']}} {{$.scheduledTime.strftime('%Y-%m-%d')}}" - ] - } - } - } - }, - "components": { - "comp-test": { - "executorLabel": "exec-test", - "inputDefinitions": { - "parameters": { - "name": { - "type": "STRING" - } - } - } - } - }, - "sdkVersion": "dummy-kfp-version", - "schemaVersion": "2.0.0" - } -} diff --git a/sdk/python/kfp/v2/google/client/testdata/pipeline1_request_body.json b/sdk/python/kfp/v2/google/client/testdata/pipeline1_request_body.json deleted file mode 100644 index 7c8554a8819..00000000000 --- a/sdk/python/kfp/v2/google/client/testdata/pipeline1_request_body.json +++ /dev/null @@ -1,73 +0,0 @@ -{ - "displayName": "my-pipeline-{{$.scheduledTime.strftime('%Y-%m-%d-%H-%M-%S')}}", - "runtimeConfig": { - "gcsOutputDirectory": "gs://my-project/pipeline_root/tmp/", - "parameters": { - "name_param": { - "stringValue": "World" - } - } - }, - "pipelineSpec": { - "pipelineInfo": { - "name": "my-pipeline" - }, - "root": { - "dag": { - "tasks": { - "task-test": { - "taskInfo": { - "name": "task-test" - }, - "inputs": { - "parameters": { - "name": { - "componentInputParameter": "name_param" - } - } - }, - "componentRef": { - "name": "comp-test" - } - } - } - }, - "inputDefinitions": { - "parameters": { - "name_param": { - "type": "STRING" - } - } - } - }, - "deploymentSpec": { - "executors": { - "exec-test": { - "container": { - "image": "python:3.7-alpine", - "command": [ - "echo", - "Hello {{$.inputs.parameters['name']}} {{$.scheduledTime.strftime('%Y-%m-%d')}}" - ] - } - } - } - }, - "components": { - "comp-test": { - "executorLabel": "exec-test", - "inputDefinitions": { - "parameters": { - "name": { - "type": "STRING" - } - } - } - } - }, - "sdkVersion": "dummy-kfp-version", - "schemaVersion": "2.0.0" - }, - "_url": "https://us-central1-aiplatform.googleapis.com/v1beta1/projects/project-id/locations/us-central1/pipelineJobs", - "_method": "POST" -} diff --git a/sdk/python/kfp/v2/google/client/testdata/pipeline1_request_body_final.json b/sdk/python/kfp/v2/google/client/testdata/pipeline1_request_body_final.json deleted file mode 100644 index 9cb28e46177..00000000000 --- a/sdk/python/kfp/v2/google/client/testdata/pipeline1_request_body_final.json +++ /dev/null @@ -1,71 +0,0 @@ -{ - "displayName": "my-pipeline-2020-08-01-12-34-00", - "runtimeConfig": { - "gcsOutputDirectory": "gs://my-project/pipeline_root/tmp/", - "parameters": { - "name_param": { 
- "stringValue": "World" - } - } - }, - "pipelineSpec": { - "pipelineInfo": { - "name": "my-pipeline" - }, - "root": { - "dag": { - "tasks": { - "task-test": { - "taskInfo": { - "name": "task-test" - }, - "inputs": { - "parameters": { - "name": { - "componentInputParameter": "name_param" - } - } - }, - "componentRef": { - "name": "comp-test" - } - } - } - }, - "inputDefinitions": { - "parameters": { - "name_param": { - "type": "STRING" - } - } - } - }, - "deploymentSpec": { - "executors": { - "exec-test": { - "container": { - "image": "python:3.7-alpine", - "command": [ - "echo", - "Hello {{$.inputs.parameters['name']}} 2020-08-01" - ] - } - } - } - }, - "components": { - "comp-test": { - "executorLabel": "exec-test", - "inputDefinitions": { - "parameters": { - "name": { - "type": "STRING" - } - } - } - } - }, - "sdkVersion": "dummy-kfp-version", - "schemaVersion": "2.0.0" - } -} diff --git a/sdk/python/kfp/v2/google/client/testdata/pipeline_job.json b/sdk/python/kfp/v2/google/client/testdata/pipeline_job.json deleted file mode 100644 index 5e3ebe5f123..00000000000 --- a/sdk/python/kfp/v2/google/client/testdata/pipeline_job.json +++ /dev/null @@ -1,300 +0,0 @@ -{ - "name": "projects/test-project/locations/us-central1/pipelineJobs/sample-test-pipeline-20201028000000", - "displayName": "sample-test-pipeline-20201028000000", - "pipelineSpec": { - "components": { - "comp-condition-1": { - "dag": { - "tasks": { - "task-flip-coin-2": { - "cachingOptions": { - "enableCache": true - }, - "componentRef": { - "name": "comp-flip-coin-2" - }, - "taskInfo": { - "name": "task-flip-coin-2" - } - }, - "task-print-msg-2": { - "cachingOptions": { - "enableCache": true - }, - "componentRef": { - "name": "comp-print-msg-2" - }, - "dependentTasks": [ - "task-flip-coin-2" - ], - "inputs": { - "parameters": { - "msg": { - "taskOutputParameter": { - "outputParameterKey": "Output", - "producerTask": "task-flip-coin-2" - } - } - } - }, - "taskInfo": { - "name": "task-print-msg-2" - } - }, - "task-print-msg-3": { - "cachingOptions": { - "enableCache": true - }, - "componentRef": { - "name": "comp-print-msg-3" - }, - "inputs": { - "parameters": { - "msg": { - "componentInputParameter": "pipelineparam--text" - } - } - }, - "taskInfo": { - "name": "task-print-msg-3" - } - } - } - }, - "inputDefinitions": { - "parameters": { - "pipelineparam--flip-coin-Output": { - "type": "STRING" - }, - "pipelineparam--text": { - "type": "STRING" - } - } - } - }, - "comp-flip-coin": { - "executorLabel": "exec-flip-coin", - "outputDefinitions": { - "parameters": { - "Output": { - "type": "STRING" - } - } - } - }, - "comp-flip-coin-2": { - "executorLabel": "exec-flip-coin-2", - "outputDefinitions": { - "parameters": { - "Output": { - "type": "STRING" - } - } - } - }, - "comp-print-msg": { - "executorLabel": "exec-print-msg", - "inputDefinitions": { - "parameters": { - "msg": { - "type": "STRING" - } - } - } - }, - "comp-print-msg-2": { - "executorLabel": "exec-print-msg-2", - "inputDefinitions": { - "parameters": { - "msg": { - "type": "STRING" - } - } - } - }, - "comp-print-msg-3": { - "executorLabel": "exec-print-msg-3", - "inputDefinitions": { - "parameters": { - "msg": { - "type": "STRING" - } - } - } - } - }, - "deploymentSpec": { - "executors": { - "exec-flip-coin": { - "container": { - "args": [ - "----output-paths", - "{{$.outputs.parameters['Output'].output_file}}" - ], - "command": [ - "sh", - "-ec", - "program_path=$(mktemp)\nprintf \"%s\" \"$0\" > \"$program_path\"\npython3 -u \"$program_path\" \"$@\"\n", - "def 
flip_coin():\n \"\"\"Flip a coin and output heads or tails randomly.\"\"\"\n import random\n result = 'heads' if random.randint(0, 1) == 0 else 'tails'\n return result\n\ndef _serialize_str(str_value: str) -> str:\n if not isinstance(str_value, str):\n raise TypeError('Value \"{}\" has type \"{}\" instead of str.'.format(str(str_value), str(type(str_value))))\n return str_value\n\nimport argparse\n_parser = argparse.ArgumentParser(prog='Flip coin', description='Flip a coin and output heads or tails randomly.')\n_parser.add_argument(\"----output-paths\", dest=\"_output_paths\", type=str, nargs=1)\n_parsed_args = vars(_parser.parse_args())\n_output_files = _parsed_args.pop(\"_output_paths\", [])\n\n_outputs = flip_coin(**_parsed_args)\n\n_outputs = [_outputs]\n\n_output_serializers = [\n _serialize_str,\n\n]\n\nimport os\nfor idx, output_file in enumerate(_output_files):\n try:\n os.makedirs(os.path.dirname(output_file))\n except OSError:\n pass\n with open(output_file, 'w') as f:\n f.write(_output_serializers[idx](_outputs[idx]))\n" - ], - "image": "python:3.7" - } - }, - "exec-flip-coin-2": { - "container": { - "args": [ - "----output-paths", - "{{$.outputs.parameters['Output'].output_file}}" - ], - "command": [ - "sh", - "-ec", - "program_path=$(mktemp)\nprintf \"%s\" \"$0\" > \"$program_path\"\npython3 -u \"$program_path\" \"$@\"\n", - "def flip_coin():\n \"\"\"Flip a coin and output heads or tails randomly.\"\"\"\n import random\n result = 'heads' if random.randint(0, 1) == 0 else 'tails'\n return result\n\ndef _serialize_str(str_value: str) -> str:\n if not isinstance(str_value, str):\n raise TypeError('Value \"{}\" has type \"{}\" instead of str.'.format(str(str_value), str(type(str_value))))\n return str_value\n\nimport argparse\n_parser = argparse.ArgumentParser(prog='Flip coin', description='Flip a coin and output heads or tails randomly.')\n_parser.add_argument(\"----output-paths\", dest=\"_output_paths\", type=str, nargs=1)\n_parsed_args = vars(_parser.parse_args())\n_output_files = _parsed_args.pop(\"_output_paths\", [])\n\n_outputs = flip_coin(**_parsed_args)\n\n_outputs = [_outputs]\n\n_output_serializers = [\n _serialize_str,\n\n]\n\nimport os\nfor idx, output_file in enumerate(_output_files):\n try:\n os.makedirs(os.path.dirname(output_file))\n except OSError:\n pass\n with open(output_file, 'w') as f:\n f.write(_output_serializers[idx](_outputs[idx]))\n" - ], - "image": "python:3.7" - } - }, - "exec-print-msg": { - "container": { - "args": [ - "--msg", - "{{$.inputs.parameters['msg']}}" - ], - "command": [ - "sh", - "-ec", - "program_path=$(mktemp)\nprintf \"%s\" \"$0\" > \"$program_path\"\npython3 -u \"$program_path\" \"$@\"\n", - "def print_msg(msg):\n \"\"\"Print a message.\"\"\"\n print(msg)\n\nimport argparse\n_parser = argparse.ArgumentParser(prog='Print msg', description='Print a message.')\n_parser.add_argument(\"--msg\", dest=\"msg\", type=str, required=True, default=argparse.SUPPRESS)\n_parsed_args = vars(_parser.parse_args())\n\n_outputs = print_msg(**_parsed_args)\n" - ], - "image": "python:3.7" - } - }, - "exec-print-msg-2": { - "container": { - "args": [ - "--msg", - "{{$.inputs.parameters['msg']}}" - ], - "command": [ - "sh", - "-ec", - "program_path=$(mktemp)\nprintf \"%s\" \"$0\" > \"$program_path\"\npython3 -u \"$program_path\" \"$@\"\n", - "def print_msg(msg):\n \"\"\"Print a message.\"\"\"\n print(msg)\n\nimport argparse\n_parser = argparse.ArgumentParser(prog='Print msg', description='Print a message.')\n_parser.add_argument(\"--msg\", dest=\"msg\", 
type=str, required=True, default=argparse.SUPPRESS)\n_parsed_args = vars(_parser.parse_args())\n\n_outputs = print_msg(**_parsed_args)\n" - ], - "image": "python:3.7" - } - }, - "exec-print-msg-3": { - "container": { - "args": [ - "--msg", - "{{$.inputs.parameters['msg']}}" - ], - "command": [ - "sh", - "-ec", - "program_path=$(mktemp)\nprintf \"%s\" \"$0\" > \"$program_path\"\npython3 -u \"$program_path\" \"$@\"\n", - "def print_msg(msg):\n \"\"\"Print a message.\"\"\"\n print(msg)\n\nimport argparse\n_parser = argparse.ArgumentParser(prog='Print msg', description='Print a message.')\n_parser.add_argument(\"--msg\", dest=\"msg\", type=str, required=True, default=argparse.SUPPRESS)\n_parsed_args = vars(_parser.parse_args())\n\n_outputs = print_msg(**_parsed_args)\n" - ], - "image": "python:3.7" - } - } - } - }, - "pipelineInfo": { - "name": "sample-test-pipeline" - }, - "root": { - "dag": { - "tasks": { - "task-condition-1": { - "cachingOptions": { - "enableCache": true - }, - "componentRef": { - "name": "comp-condition-1" - }, - "dependentTasks": [ - "task-flip-coin" - ], - "inputs": { - "parameters": { - "pipelineparam--flip-coin-Output": { - "taskOutputParameter": { - "outputParameterKey": "Output", - "producerTask": "task-flip-coin" - } - }, - "pipelineparam--text": { - "componentInputParameter": "text" - } - } - }, - "taskInfo": { - "name": "task-condition-1" - }, - "triggerPolicy": { - "condition": "inputs.parameters['pipelineparam--flip-coin-Output'].string_value == 'heads'" - } - }, - "task-flip-coin": { - "cachingOptions": { - "enableCache": true - }, - "componentRef": { - "name": "comp-flip-coin" - }, - "taskInfo": { - "name": "task-flip-coin" - } - }, - "task-print-msg": { - "cachingOptions": { - "enableCache": true - }, - "componentRef": { - "name": "comp-print-msg" - }, - "dependentTasks": [ - "task-flip-coin" - ], - "inputs": { - "parameters": { - "msg": { - "taskOutputParameter": { - "outputParameterKey": "Output", - "producerTask": "task-flip-coin" - } - } - } - }, - "taskInfo": { - "name": "task-print-msg" - } - } - } - }, - "inputDefinitions": { - "parameters": { - "text": { - "type": "STRING" - }, - "list": { - "type": "STRING" - } - } - } - }, - "schemaVersion": "2.0.0", - "sdkVersion": "kfp-1.5.0" - }, - "runtimeConfig": { - "gcsOutputDirectory": "dummy_root", - "parameters": { - "text": { - "stringValue": "Hello KFP!" - } - } - } -} diff --git a/sdk/python/kfp/v2/google/experimental/__init__.py b/sdk/python/kfp/v2/google/experimental/__init__.py deleted file mode 100644 index 3ca61950c23..00000000000 --- a/sdk/python/kfp/v2/google/experimental/__init__.py +++ /dev/null @@ -1,15 +0,0 @@ -# Copyright 2021 The Kubeflow Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-
-from kfp.v2.google.experimental.custom_job import run_as_aiplatform_custom_job
diff --git a/sdk/python/kfp/v2/google/experimental/custom_job.py b/sdk/python/kfp/v2/google/experimental/custom_job.py
deleted file mode 100644
index 91bb69f6abf..00000000000
--- a/sdk/python/kfp/v2/google/experimental/custom_job.py
+++ /dev/null
@@ -1,168 +0,0 @@
-# Copyright 2021 The Kubeflow Authors
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""Module for supporting Google Cloud AI Platform Custom Job."""
-
-import copy
-import warnings
-from typing import Any, List, Mapping, Optional
-
-from kfp import dsl
-from kfp.dsl import dsl_utils
-
-_DEFAULT_CUSTOM_JOB_MACHINE_TYPE = 'n1-standard-4'
-
-
-def run_as_aiplatform_custom_job(
-    op: dsl.ContainerOp,
-    display_name: Optional[str] = None,
-    replica_count: Optional[int] = None,
-    machine_type: Optional[str] = None,
-    accelerator_type: Optional[str] = None,
-    accelerator_count: Optional[int] = None,
-    boot_disk_type: Optional[str] = None,
-    boot_disk_size_gb: Optional[int] = None,
-    timeout: Optional[str] = None,
-    restart_job_on_worker_restart: Optional[bool] = None,
-    service_account: Optional[str] = None,
-    network: Optional[str] = None,
-    output_uri_prefix: Optional[str] = None,
-    worker_pool_specs: Optional[List[Mapping[str, Any]]] = None,
-) -> None:
-    """Run a pipeline task using an AI Platform (Unified) custom training job.
-
-    For detailed documentation of the service, please refer to
-    https://cloud.google.com/ai-platform-unified/docs/training/create-custom-job
-
-    Args:
-      op: The task (ContainerOp) object to run as an aiplatform custom job.
-      display_name: Optional. The name of the custom job.
-      replica_count: Optional. The number of replicas to be split between the
-        master workerPoolSpec and the worker workerPoolSpec (the master always
-        has 1 replica).
-      machine_type: Optional. The type of the machine to run the custom job.
-        The default value is "n1-standard-4".
-      accelerator_type: Optional. The type of accelerator(s) that may be
-        attached to the machine as per accelerator_count.
-      accelerator_count: Optional. The number of accelerators to attach to the
-        machine.
-      boot_disk_type: Optional. Type of the boot disk (default is "pd-ssd").
-        Valid values: "pd-ssd" (Persistent Disk Solid State Drive) or
-        "pd-standard" (Persistent Disk Hard Disk Drive).
-      boot_disk_size_gb: Optional. Size in GB of the boot disk (default is
-        100GB).
-      timeout: Optional. The maximum job running time; the default is 7 days.
-        A duration in seconds with up to nine fractional digits, terminated by
-        's'. Example: "3.5s"
-      restart_job_on_worker_restart: Optional. Restarts the entire CustomJob
-        if a worker gets restarted. This feature can be used by distributed
-        training jobs that are not resilient to workers leaving and joining a
-        job.
-      service_account: Optional. Specifies the service account for workload
-        run-as account.
-      network: Optional. The full name of the Compute Engine network to which
-        the job should be peered. For example,
-        projects/12345/global/networks/myVPC.
-      output_uri_prefix: Optional. Google Cloud Storage URI to the output
-        directory.
-      worker_pool_specs: Optional. Full workerPoolSpecs for distributed
-        training; when specified, the machine, accelerator, and disk arguments
-        above are ignored. For details, please see:
-        https://cloud.google.com/ai-platform-unified/docs/training/distributed-training
-    """
-    warnings.warn(
-        'This function will be deprecated in v2.0.0',
-        category=FutureWarning,
-    )
-
-    job_spec = {}
-
-    if worker_pool_specs is not None:
-        worker_pool_specs = copy.deepcopy(worker_pool_specs)
-
-        def _is_output_parameter(output_key: str) -> bool:
-            return output_key in (
-                op.component_spec.output_definitions.parameters.keys())
-
-        for worker_pool_spec in worker_pool_specs:
-            if 'containerSpec' in worker_pool_spec:
-                container_spec = worker_pool_spec['containerSpec']
-                if 'command' in container_spec:
-                    dsl_utils.resolve_cmd_lines(container_spec['command'],
-                                                _is_output_parameter)
-                if 'args' in container_spec:
-                    dsl_utils.resolve_cmd_lines(container_spec['args'],
-                                                _is_output_parameter)
-
-            elif 'pythonPackageSpec' in worker_pool_spec:
-                # For custom Python training, resolve placeholders in args only.
-                python_spec = worker_pool_spec['pythonPackageSpec']
-                if 'args' in python_spec:
-                    dsl_utils.resolve_cmd_lines(python_spec['args'],
-                                                _is_output_parameter)
-
-            else:
-                raise ValueError(
-                    'Expect either "containerSpec" or "pythonPackageSpec" in '
-                    'each workerPoolSpec. Got: {}'.format(worker_pool_spec))
-
-        job_spec['workerPoolSpecs'] = worker_pool_specs
-
-    else:
-        worker_pool_spec = {
-            'machineSpec': {
-                'machineType': machine_type or _DEFAULT_CUSTOM_JOB_MACHINE_TYPE
-            },
-            'replicaCount': '1',
-            'containerSpec': {
-                'imageUri': op.container.image,
-            }
-        }
-        if op.container.command:
-            worker_pool_spec['containerSpec']['command'] = op.container.command
-        if op.container.args:
-            worker_pool_spec['containerSpec']['args'] = op.container.args
-        if accelerator_type is not None:
-            worker_pool_spec['machineSpec'][
-                'acceleratorType'] = accelerator_type
-        if accelerator_count is not None:
-            worker_pool_spec['machineSpec'][
-                'acceleratorCount'] = accelerator_count
-        if boot_disk_type is not None:
-            if 'diskSpec' not in worker_pool_spec:
-                worker_pool_spec['diskSpec'] = {}
-            worker_pool_spec['diskSpec']['bootDiskType'] = boot_disk_type
-        if boot_disk_size_gb is not None:
-            if 'diskSpec' not in worker_pool_spec:
-                worker_pool_spec['diskSpec'] = {}
-            worker_pool_spec['diskSpec']['bootDiskSizeGb'] = boot_disk_size_gb
-
-        job_spec['workerPoolSpecs'] = [worker_pool_spec]
-        if replica_count is not None and replica_count > 1:
-            additional_worker_pool_spec = copy.deepcopy(worker_pool_spec)
-            additional_worker_pool_spec['replicaCount'] = str(replica_count - 1)
-            job_spec['workerPoolSpecs'].append(additional_worker_pool_spec)
-
-    if timeout is not None:
-        if 'scheduling' not in job_spec:
-            job_spec['scheduling'] = {}
-        job_spec['scheduling']['timeout'] = timeout
-    if restart_job_on_worker_restart is not None:
-        if 'scheduling' not in job_spec:
-            job_spec['scheduling'] = {}
-        job_spec['scheduling'][
-            'restartJobOnWorkerRestart'] = restart_job_on_worker_restart
-    if service_account is not None:
-        job_spec['serviceAccount'] = service_account
-    if network is not None:
-        job_spec['network'] = network
-    if output_uri_prefix is not None:
-        job_spec['baseOutputDirectory'] = {'outputUriPrefix': output_uri_prefix}
-
-    op.custom_job_spec = {
-        'displayName': display_name or op.name,
-        'jobSpec': job_spec
-    }
diff --git a/sdk/python/kfp/v2/google/experimental/custom_job_test.py
b/sdk/python/kfp/v2/google/experimental/custom_job_test.py deleted file mode 100644 index 4dfd9f26dee..00000000000 --- a/sdk/python/kfp/v2/google/experimental/custom_job_test.py +++ /dev/null @@ -1,139 +0,0 @@ -# Copyright 2021 The Kubeflow Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Tests for kfp.v2.google.experimental.custom_job.""" - -import unittest - -from kfp.dsl import _container_op -from kfp.v2.google.experimental import run_as_aiplatform_custom_job - - -class CustomJobTest(unittest.TestCase): - - def test_run_as_aiplatform_custom_job_simple_mode(self): - task = _container_op.ContainerOp( - name='test-task', - image='python:3.7', - command=['python3', 'main.py'], - arguments=['arg1', 'arg2']) - run_as_aiplatform_custom_job( - task, - display_name='custom-job1', - replica_count=10, - machine_type='n1-standard-8', - accelerator_type='NVIDIA_TESLA_K80', - accelerator_count=2, - boot_disk_type='pd-ssd', - boot_disk_size_gb=200, - timeout='3600s', - restart_job_on_worker_restart=True, - service_account='test-sa', - network='projects/123/global/networks/mypvc', - output_uri_prefix='gs://bucket/') - - expected_custom_job_spec = { - 'displayName': 'custom-job1', - 'jobSpec': { - 'workerPoolSpecs': [{ - 'replicaCount': '1', - 'machineSpec': { - 'machineType': 'n1-standard-8', - 'acceleratorType': 'NVIDIA_TESLA_K80', - 'acceleratorCount': 2 - }, - 'containerSpec': { - 'imageUri': 'python:3.7', - 'command': ['python3', 'main.py'], - 'args': ['arg1', 'arg2'] - }, - 'diskSpec': { - 'bootDiskType': 'pd-ssd', - 'bootDiskSizeGb': 200 - } - }, { - 'replicaCount': '9', - 'machineSpec': { - 'machineType': 'n1-standard-8', - 'acceleratorType': 'NVIDIA_TESLA_K80', - 'acceleratorCount': 2 - }, - 'containerSpec': { - 'imageUri': 'python:3.7', - 'command': ['python3', 'main.py'], - 'args': ['arg1', 'arg2'] - }, - 'diskSpec': { - 'bootDiskType': 'pd-ssd', - 'bootDiskSizeGb': 200 - } - }], - 'scheduling': { - 'timeout': '3600s', - 'restartJobOnWorkerRestart': True - }, - 'serviceAccount': 'test-sa', - 'network': 'projects/123/global/networks/mypvc', - 'baseOutputDirectory': { - 'outputUriPrefix': 'gs://bucket/' - } - } - } - self.maxDiff = None - self.assertDictEqual(task.custom_job_spec, expected_custom_job_spec) - - def test_run_as_aiplatform_custom_job_use_specified_worker_pool_specs(self): - task = _container_op.ContainerOp( - name='test-task', - image='python:3.7', - command=['python3', 'main.py'], - arguments=['arg1', 'arg2']) - run_as_aiplatform_custom_job( - task, - display_name='custom-job1', - worker_pool_specs=[ - { - 'containerSpec': { - 'imageUri': 'alpine', - 'command': ['sh', '-c', 'echo 1'], - }, - 'replicaCount': '1', - 'machineSpec': { - 'machineType': 'n1-standard-8', - }, - }, - ]) - - expected_custom_job_spec = { - 'displayName': 'custom-job1', - 'jobSpec': { - 'workerPoolSpecs': [{ - 'containerSpec': { - 'imageUri': 'alpine', - 'command': ['sh', '-c', 'echo 1'] - }, - 'replicaCount': '1', - 'machineSpec': { - 'machineType': 'n1-standard-8' - } - }] - } - } - - 
print(task.custom_job_spec) - self.maxDiff = None - self.assertDictEqual(task.custom_job_spec, expected_custom_job_spec) - - -if __name__ == '__main__': - unittest.main() From ab82a1e059b8654ddef6d599bc24fe43ed5ddd6a Mon Sep 17 00:00:00 2001 From: Yaqi Ji Date: Tue, 9 Nov 2021 11:57:26 -0800 Subject: [PATCH 2/7] Update RELEASE.md --- sdk/RELEASE.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdk/RELEASE.md b/sdk/RELEASE.md index fbe2dd9970b..c49c525f2c9 100644 --- a/sdk/RELEASE.md +++ b/sdk/RELEASE.md @@ -8,6 +8,8 @@ ## Breaking Changes +* Remove sdk/python/kfp/v2/google directory for v2, including google client and custom job [\#6886](https://github.com/kubeflow/pipelines/pull/6886) + ### For Pipeline Authors ### For Component Authors From 94f610d5c5b61c09fde5db1b3b888cd6dd673bf2 Mon Sep 17 00:00:00 2001 From: Yaqi Ji Date: Tue, 9 Nov 2021 12:00:45 -0800 Subject: [PATCH 3/7] Update MANIFEST.in --- sdk/python/MANIFEST.in | 1 - 1 file changed, 1 deletion(-) diff --git a/sdk/python/MANIFEST.in b/sdk/python/MANIFEST.in index 4e803d42d95..1b2896e9bcb 100644 --- a/sdk/python/MANIFEST.in +++ b/sdk/python/MANIFEST.in @@ -1,2 +1 @@ include kfp/dsl/type_schemas/*.yaml -include kfp/v2/google/client/discovery/aiplatform_public_google_rest_v1beta1.json \ No newline at end of file From 7bb32a81d0c962a8418ef16c0cbe41ebf815fe50 Mon Sep 17 00:00:00 2001 From: Yaqi Ji Date: Tue, 9 Nov 2021 12:03:39 -0800 Subject: [PATCH 4/7] Update requirements-test.txt --- sdk/python/requirements-test.txt | 1 - 1 file changed, 1 deletion(-) diff --git a/sdk/python/requirements-test.txt b/sdk/python/requirements-test.txt index f4f42462218..bc04b4960a7 100644 --- a/sdk/python/requirements-test.txt +++ b/sdk/python/requirements-test.txt @@ -1,2 +1 @@ -r requirements.txt -frozendict==2.0.2 From 08300f3f02a80bcb642b2a7842a4b03838d26fab Mon Sep 17 00:00:00 2001 From: Yaqi Ji Date: Tue, 9 Nov 2021 12:04:11 -0800 Subject: [PATCH 5/7] Update requirements.in --- sdk/python/requirements.in | 1 - 1 file changed, 1 deletion(-) diff --git a/sdk/python/requirements.in b/sdk/python/requirements.in index fc31e2ab85c..c863101e2ab 100644 --- a/sdk/python/requirements.in +++ b/sdk/python/requirements.in @@ -33,7 +33,6 @@ typer>=0.3.2,<1.0 absl-py>=0.9,<=0.11 kfp-pipeline-spec>=0.1.13,<0.2.0 fire>=0.3.1,<1 -google-api-python-client>=1.7.8,<2 pydantic>=1.8.2,<2 dataclasses>=0.8,<1; python_version<"3.7" typing-extensions>=3.7.4,<4; python_version<"3.9" From c41a86ded997555e1d82fb058047c9bec3321122 Mon Sep 17 00:00:00 2001 From: Yaqi Date: Tue, 9 Nov 2021 12:10:00 -0800 Subject: [PATCH 6/7] fix setup --- sdk/python/requirements.txt | 21 ++------------------- sdk/python/setup.py | 10 ---------- 2 files changed, 2 insertions(+), 29 deletions(-) diff --git a/sdk/python/requirements.txt b/sdk/python/requirements.txt index 73ca463603b..65e14b95f62 100644 --- a/sdk/python/requirements.txt +++ b/sdk/python/requirements.txt @@ -31,22 +31,15 @@ fire==0.4.0 # via -r requirements.in google-api-core==1.31.3 # via - # google-api-python-client # google-cloud-core # google-cloud-storage -google-api-python-client==1.12.8 - # via -r requirements.in google-auth==1.35.0 # via # -r requirements.in # google-api-core - # google-api-python-client - # google-auth-httplib2 # google-cloud-core # google-cloud-storage # kubernetes -google-auth-httplib2==0.1.0 - # via google-api-python-client google-cloud-core==2.1.0 # via google-cloud-storage google-cloud-storage==1.42.3 @@ -57,10 +50,6 @@ google-resumable-media==2.0.3 # via google-cloud-storage 
googleapis-common-protos==1.53.0 # via google-api-core -httplib2==0.20.1 - # via - # google-api-python-client - # google-auth-httplib2 idna==3.2 # via requests jsonschema==3.2.0 @@ -90,9 +79,7 @@ pyasn1-modules==0.2.8 pydantic==1.8.2 # via -r requirements.in pyparsing==2.4.7 - # via - # httplib2 - # packaging + # via packaging pyrsistent==0.18.0 # via jsonschema python-dateutil==2.8.2 @@ -123,9 +110,7 @@ six==1.16.0 # absl-py # fire # google-api-core - # google-api-python-client # google-auth - # google-auth-httplib2 # google-cloud-storage # jsonschema # kfp-server-api @@ -143,9 +128,7 @@ typer==0.4.0 typing-extensions==3.10.0.2 # via pydantic uritemplate==3.0.1 - # via - # -r requirements.in - # google-api-python-client + # via -r requirements.in urllib3==1.26.7 # via # kfp-server-api diff --git a/sdk/python/setup.py b/sdk/python/setup.py index 75ba23b331b..e5df29f2071 100644 --- a/sdk/python/setup.py +++ b/sdk/python/setup.py @@ -29,9 +29,6 @@ # https://github.com/googleapis/python-storage/blob/master/CHANGELOG.md#1200 'google-cloud-storage>=1.20.0,<2', 'kubernetes>=8.0.0,<19', - # google-api-python-client v2 doesn't work for private dicovery by default: - # https://github.com/googleapis/google-api-python-client/issues/1225#issuecomment-791058235 - 'google-api-python-client>=1.7.8,<2', 'google-auth>=1.6.1,<2', 'requests-toolbelt>=0.8.0,<1', 'cloudpickle>=2.0.0,<3', @@ -58,10 +55,6 @@ 'typing-extensions>=3.7.4,<4;python_version<"3.9"', ] -TESTS_REQUIRE = [ - 'frozendict', -] - EXTRAS_REQUIRE = { 'all': ['docker'], } @@ -100,7 +93,6 @@ def find_version(*file_path_parts): "https://github.com/kubeflow/pipelines/blob/master/sdk/RELEASE.md", }, install_requires=REQUIRES, - tests_require=TESTS_REQUIRE, extras_require=EXTRAS_REQUIRE, packages=[ 'kfp', @@ -123,8 +115,6 @@ def find_version(*file_path_parts): 'kfp.v2.components.experimental', 'kfp.v2.dsl', 'kfp.v2.dsl.experimental', - 'kfp.v2.google.client', - 'kfp.v2.google.experimental', ], classifiers=[ 'Intended Audience :: Developers', From b10ad7aa5a39fdd418c061a1ef1f1549637b9d8d Mon Sep 17 00:00:00 2001 From: Yaqi Date: Tue, 9 Nov 2021 12:21:41 -0800 Subject: [PATCH 7/7] remove test --- .../compiler_cli_tests/compiler_cli_tests.py | 3 - .../pipeline_with_custom_job_spec.json | 169 ------------------ .../pipeline_with_custom_job_spec.py | 59 ------ 3 files changed, 231 deletions(-) delete mode 100644 sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_custom_job_spec.json delete mode 100644 sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_custom_job_spec.py diff --git a/sdk/python/kfp/v2/compiler_cli_tests/compiler_cli_tests.py b/sdk/python/kfp/v2/compiler_cli_tests/compiler_cli_tests.py index 16849e072bb..3a1837eca1c 100644 --- a/sdk/python/kfp/v2/compiler_cli_tests/compiler_cli_tests.py +++ b/sdk/python/kfp/v2/compiler_cli_tests/compiler_cli_tests.py @@ -186,9 +186,6 @@ def test_lightweight_python_functions_v2_with_outputs_experimental(self): def test_xgboost_sample_pipeline(self): self._test_compile_py_to_json('xgboost_sample_pipeline') - def test_pipeline_with_custom_job_spec(self): - self._test_compile_py_to_json('pipeline_with_custom_job_spec') - def test_pipeline_with_metrics_outputs(self): self._test_compile_py_to_json('pipeline_with_metrics_outputs') diff --git a/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_custom_job_spec.json b/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_custom_job_spec.json deleted file mode 100644 index 38e72ae8754..00000000000 --- 
a/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_custom_job_spec.json +++ /dev/null @@ -1,169 +0,0 @@ -{ - "pipelineSpec": { - "components": { - "comp-training-op": { - "executorLabel": "exec-training-op", - "inputDefinitions": { - "parameters": { - "input1": { - "parameterType": "STRING" - } - } - } - }, - "comp-training-op-2": { - "executorLabel": "exec-training-op-2", - "inputDefinitions": { - "parameters": { - "input1": { - "parameterType": "STRING" - } - } - } - } - }, - "deploymentSpec": { - "executors": { - "exec-training-op": { - "customJob": { - "customJob": { - "displayName": "custom-job-simple", - "jobSpec": { - "workerPoolSpecs": [ - { - "containerSpec": { - "args": [ - "--executor_input", - "{{$}}", - "--function_to_execute", - "training_op" - ], - "command": [ - "sh", - "-c", - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==1.8.6' && \"$0\" \"$@\"\n", - "sh", - "-ec", - "program_path=$(mktemp -d)\nprintf \"%s\" \"$0\" > \"$program_path/ephemeral_component.py\"\npython3 -m kfp.v2.components.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n", - "\nimport kfp\nfrom kfp.v2 import dsl\nfrom kfp.v2.dsl import *\nfrom typing import *\n\ndef training_op(input1: str):\n print('dummy training master: {}'.format(input1))\n\n" - ], - "imageUri": "python:3.7" - }, - "machineSpec": { - "machineType": "n1-standard-4" - }, - "replicaCount": "1" - }, - { - "containerSpec": { - "args": [ - "--executor_input", - "{{$}}", - "--function_to_execute", - "training_op" - ], - "command": [ - "sh", - "-c", - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==1.8.6' && \"$0\" \"$@\"\n", - "sh", - "-ec", - "program_path=$(mktemp -d)\nprintf \"%s\" \"$0\" > \"$program_path/ephemeral_component.py\"\npython3 -m kfp.v2.components.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n", - "\nimport kfp\nfrom kfp.v2 import dsl\nfrom kfp.v2.dsl import *\nfrom typing import *\n\ndef training_op(input1: str):\n print('dummy training master: {}'.format(input1))\n\n" - ], - "imageUri": "python:3.7" - }, - "machineSpec": { - "machineType": "n1-standard-4" - }, - "replicaCount": "9" - } - ] - } - } - } - }, - "exec-training-op-2": { - "customJob": { - "customJob": { - "displayName": "custom-job-advanced", - "jobSpec": { - "workerPoolSpecs": [ - { - "containerSpec": { - "command": [ - "sh", - "-c", - "set -e -x\necho \"worker1:\" \"$0\"\n", - "{{$.inputs.parameters['input1']}}" - ], - "imageUri": "alpine" - }, - "machineSpec": { - "machineType": "n1-standard-4" - }, - "replicaCount": "1" - } - ] - } - } - } - } - } - }, - "pipelineInfo": { - "name": "pipeline-on-custom-job" - }, - "root": { - "dag": { - "tasks": { - "training-op": { - "cachingOptions": { - "enableCache": true - }, - "componentRef": { - "name": "comp-training-op" - }, - "inputs": { - "parameters": { - "input1": { - "runtimeValue": { - "constant": "hello-world" - } - } - } - }, - "taskInfo": { - "name": "training-op" - } - }, - "training-op-2": { - "cachingOptions": { - "enableCache": true - }, - "componentRef": { - "name": "comp-training-op-2" - }, - "inputs": { - "parameters": { - "input1": { - "runtimeValue": { 
- "constant": "advanced setting - raw workerPoolSpec" - } - } - } - }, - "taskInfo": { - "name": "training-op-2" - } - } - } - } - }, - "schemaVersion": "2.1.0", - "sdkVersion": "kfp-1.8.6" - }, - "runtimeConfig": { - "gcsOutputDirectory": "dummy_root" - } -} \ No newline at end of file diff --git a/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_custom_job_spec.py b/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_custom_job_spec.py deleted file mode 100644 index e72aea5447c..00000000000 --- a/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_custom_job_spec.py +++ /dev/null @@ -1,59 +0,0 @@ -# Copyright 2021 The Kubeflow Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from kfp.components._structures import InputValuePlaceholder -from kfp.v2 import dsl -from kfp.v2.dsl import component -from kfp.v2.google import experimental -from kfp.v2 import compiler - - -@component -def training_op(input1: str): - print('dummy training master: {}'.format(input1)) - - -@dsl.pipeline(name='pipeline-on-custom-job', pipeline_root='dummy_root') -def my_pipeline(): - - training_task1 = training_op('hello-world') - experimental.run_as_aiplatform_custom_job( - training_task1, replica_count=10, display_name='custom-job-simple') - - training_task2 = training_op('advanced setting - raw workerPoolSpec') - experimental.run_as_aiplatform_custom_job( - training_task2, - display_name='custom-job-advanced', - worker_pool_specs=[ - { - 'containerSpec': { - 'imageUri': - 'alpine', - 'command': [ - 'sh', '-c', 'set -e -x\necho \"worker1:\" \"$0\"\n', - InputValuePlaceholder('input1') - ] - }, - 'machineSpec': { - 'machineType': 'n1-standard-4' - }, - 'replicaCount': '1', - }, - ]) - - -if __name__ == '__main__': - compiler.Compiler().compile( - pipeline_func=my_pipeline, - package_path=__file__.replace('.py', '.json'))