From 34b5d0bd57b5424132a687ebc649de3c47d5dcaf Mon Sep 17 00:00:00 2001 From: Sina Chavoshi Date: Fri, 29 Apr 2022 22:56:26 -0700 Subject: [PATCH] feat(components/google-cloud): Support parametrized input for reserved_ip_range and other Vertex Training parameters in custom job utility. PiperOrigin-RevId: 445583042 --- .../v1/custom_job/component.yaml | 7 + .../v1/custom_job/utils.py | 202 ++-- ...est_custom_training_job_wrapper_compile.py | 95 -- .../custom_training_job_pipeline.json | 270 +++--- .../custom_training_job_wrapper_pipeline.json | 145 --- .../custom_job/unit/test_custom_job_utils.py | 879 ++++++------------ 6 files changed, 537 insertions(+), 1061 deletions(-) delete mode 100644 components/google-cloud/tests/v1/custom_job/integration/test_custom_training_job_wrapper_compile.py delete mode 100644 components/google-cloud/tests/v1/custom_job/testdata/custom_training_job_wrapper_pipeline.json diff --git a/components/google-cloud/google_cloud_pipeline_components/v1/custom_job/component.yaml b/components/google-cloud/google_cloud_pipeline_components/v1/custom_job/component.yaml index 5af605c5c1aa..34910601fbac 100644 --- a/components/google-cloud/google_cloud_pipeline_components/v1/custom_job/component.yaml +++ b/components/google-cloud/google_cloud_pipeline_components/v1/custom_job/component.yaml @@ -61,6 +61,11 @@ description: | under the VPC network that can be used for this job. If set, we will deploy the job within the provided ip ranges. Otherwise, the job will be deployed to any ip ranges under the provided VPC network. + nfs_mounts (Optional[Sequence[Dict[str, str]]]):A list of NFS mount specs in Json + dict format. For API spec, see + https://cloud.devsite.corp.google.com/vertex-ai/docs/reference/rest/v1/CustomJobSpec#NfsMount + For more details about mounting NFS for CustomJob, see + https://cloud.devsite.corp.google.com/vertex-ai/docs/training/train-nfs-share base_output_directory (Optional[str]): The Cloud Storage location to store the output of this CustomJob or HyperparameterTuningJob. see below for more details: https://cloud.google.com/vertex-ai/docs/reference/rest/v1/GcsDestination @@ -86,6 +91,7 @@ inputs: - {name: enable_web_access, type: Boolean, optional: true, default: 'false'} - {name: network, type: String, optional: true, default: ''} - {name: reserved_ip_ranges, type: JsonArray, optional: true, default: "[]" } +- {name: nfs_mounts, type: JsonArray, optional: true, default: "{}" } - {name: base_output_directory, type: String, optional: true, default: ''} - {name: labels, type: JsonObject, optional: true, default: '{}'} - {name: encryption_spec_key_name, type: String, optional: true, default: ''} @@ -112,6 +118,7 @@ implementation: ', "enable_web_access": "', {inputValue: enable_web_access}, '"', ', "network": "', {inputValue: network}, '"', ', "reserved_ip_ranges": ', {inputValue: reserved_ip_ranges}, + ', "nfs_mounts": ', {inputValue: nfs_mounts}, ', "base_output_directory": {', '"output_uri_prefix": "', {inputValue: base_output_directory}, '"', '}', diff --git a/components/google-cloud/google_cloud_pipeline_components/v1/custom_job/utils.py b/components/google-cloud/google_cloud_pipeline_components/v1/custom_job/utils.py index fc5b30b9522b..9c337d4e2c2f 100644 --- a/components/google-cloud/google_cloud_pipeline_components/v1/custom_job/utils.py +++ b/components/google-cloud/google_cloud_pipeline_components/v1/custom_job/utils.py @@ -17,14 +17,14 @@ # TODO(chavoshi): switch to using V2 only once it is ready. import copy import json +import os import tempfile from typing import Callable, Dict, Optional, Sequence - from google_cloud_pipeline_components.aiplatform import utils from kfp import components -from kfp.components import structures from kfp.dsl import dsl_utils from kfp.v2.components.types import type_utils +import yaml _DEFAULT_CUSTOM_JOB_CONTAINER_IMAGE = utils.DEFAULT_CONTAINER_IMAGE # Executor replacement is used as executor content needs to be jsonified before @@ -43,15 +43,15 @@ def create_custom_training_job_op_from_component( accelerator_count: Optional[int] = 1, boot_disk_type: Optional[str] = 'pd-ssd', boot_disk_size_gb: Optional[int] = 100, - timeout: Optional[str] = '', + timeout: Optional[str] = '604800s', restart_job_on_worker_restart: Optional[bool] = False, service_account: Optional[str] = '', network: Optional[str] = '', encryption_spec_key_name: Optional[str] = '', tensorboard: Optional[str] = '', enable_web_access: Optional[bool] = False, - reserved_ip_ranges: Optional[Sequence[str]] = [], - nfs_mounts: Optional[Sequence[Dict]] = [], + reserved_ip_ranges: Optional[Sequence[str]] = None, + nfs_mounts: Optional[Sequence[Dict[str, str]]] = None, base_output_directory: Optional[str] = '', labels: Optional[Dict[str, str]] = None, ) -> Callable: # pylint: disable=g-bare-generic @@ -119,15 +119,6 @@ def create_custom_training_job_op_from_component( number, as in 12345, and {network} is a network name. Private services access must already be configured for the network. If left unspecified, the job is not peered with any network. - reserved_ip_ranges (Optional[Sequence[str]]): A list of names for the - reserved ip ranges under the VPC network that can be used for this job. If - set, we will deploy the job within the provided ip ranges. Otherwise, the - job will be deployed to any ip ranges under the provided VPC network. - nfs_mounts (Optional[Sequence[Dict]]): A list of NFS mount specs in Json - dict format. For API spec, see - https://cloud.devsite.corp.google.com/vertex-ai/docs/reference/rest/v1/CustomJobSpec#NfsMount - For more details about mounting NFS for CustomJob, see - https://cloud.devsite.corp.google.com/vertex-ai/docs/training/train-nfs-share encryption_spec_key_name (Optional[str]): Customer-managed encryption key options for the CustomJob. If this is set, then all resources created by the CustomJob will be encrypted with the provided encryption key. @@ -138,6 +129,15 @@ def create_custom_training_job_op_from_component( access](https://cloud.google.com/vertex-ai/docs/training/monitor-debug-interactive-shell) to training containers. If set to `true`, you can access interactive shells at the URIs given by [CustomJob.web_access_uris][]. + reserved_ip_ranges (Optional[Sequence[str]]): A list of names for the + reserved ip ranges under the VPC network that can be used for this job. If + set, we will deploy the job within the provided ip ranges. Otherwise, the + job will be deployed to any ip ranges under the provided VPC network. + nfs_mounts (Optional[Sequence[Dict]]): A list of NFS mount specs in Json + dict format. For API spec, see + https://cloud.devsite.corp.google.com/vertex-ai/docs/reference/rest/v1/CustomJobSpec#NfsMount + For more details about mounting NFS for CustomJob, see + https://cloud.devsite.corp.google.com/vertex-ai/docs/training/train-nfs-share base_output_directory (Optional[str]): The Cloud Storage location to store the output of this CustomJob or HyperparameterTuningJob. see below for more details: @@ -151,7 +151,7 @@ def create_custom_training_job_op_from_component( operator. """ - job_spec = {} + worker_pool_specs = {} input_specs = [] output_specs = [] @@ -213,109 +213,95 @@ def _is_output_parameter(output_key: str) -> bool: worker_pool_spec['disk_spec'] = {} worker_pool_spec['disk_spec']['boot_disk_size_gb'] = boot_disk_size_gb - job_spec['worker_pool_specs'] = [worker_pool_spec] + worker_pool_specs = [worker_pool_spec] if int(replica_count) > 1: additional_worker_pool_spec = copy.deepcopy(worker_pool_spec) additional_worker_pool_spec['replica_count'] = str(replica_count - 1) - job_spec['worker_pool_specs'].append(additional_worker_pool_spec) - - # TODO(chavoshi): Use input parameter instead of hard coded string label. - # This requires Dictionary input type to be supported in V2. - if labels is not None: - job_spec['labels'] = labels + worker_pool_specs.append(additional_worker_pool_spec) - if timeout: - if 'scheduling' not in job_spec: - job_spec['scheduling'] = {} - job_spec['scheduling']['timeout'] = timeout - if restart_job_on_worker_restart: - if 'scheduling' not in job_spec: - job_spec['scheduling'] = {} - job_spec['scheduling'][ - 'restart_job_on_worker_restart'] = restart_job_on_worker_restart - if enable_web_access: - job_spec['enable_web_access'] = enable_web_access - if reserved_ip_ranges: - job_spec['reserved_ip_ranges'] = reserved_ip_ranges - if nfs_mounts: - job_spec['nfs_mounts'] = nfs_mounts - if encryption_spec_key_name: - job_spec['encryption_spec'] = {} - job_spec['encryption_spec'][ - 'kms_key_name'] = "{{$.inputs.parameters['encryption_spec_key_name']}}" - input_specs.append( - structures.InputSpec( - name='encryption_spec_key_name', - type='String', - optional=True, - default=encryption_spec_key_name),) - - # Remove any existing service_account from component input list. + # Remove any Vertex Training duplicate input_spec from component input list. input_specs[:] = [ input_spec for input_spec in input_specs - if input_spec.name not in ('service_account', 'network', 'tensorboard', - 'base_output_directory') + if input_spec.name not in ('project', 'location', 'display_name', + 'worker_pool_specs', 'timeout', + 'restart_job_on_worker_restart', + 'service_account', 'tensorboard', 'network', + 'reserved_ip_ranges', 'nfs_mounts', + 'base_output_directory', 'labels', + 'encryption_spec_key_name') ] - job_spec['service_account'] = "{{$.inputs.parameters['service_account']}}" - job_spec['network'] = "{{$.inputs.parameters['network']}}" - job_spec['tensorboard'] = "{{$.inputs.parameters['tensorboard']}}" - job_spec['base_output_directory'] = {} - job_spec['base_output_directory'][ - 'output_uri_prefix'] = "{{$.inputs.parameters['base_output_directory']}}" - custom_job_payload = { - 'display_name': display_name or component_spec.component_spec.name, - 'job_spec': job_spec - } + custom_training_job_json = None + with open(os.path.join(os.path.dirname(__file__), 'component.yaml')) as file: + custom_training_job_json = yaml.load(file, Loader=yaml.FullLoader) + + for input_item in custom_training_job_json['inputs']: + if 'display_name' in input_item.values(): + input_item[ + 'default'] = display_name if display_name else component_spec.component_spec.name + input_item['optional'] = True + elif 'worker_pool_specs' in input_item.values(): + input_item['default'] = json.dumps(worker_pool_specs) + input_item['optional'] = True + elif 'timeout' in input_item.values(): + input_item['default'] = timeout + input_item['optional'] = True + elif 'restart_job_on_worker_restart' in input_item.values(): + input_item['default'] = json.dumps(restart_job_on_worker_restart) + input_item['optional'] = True + elif 'service_account' in input_item.values(): + input_item['default'] = service_account + input_item['optional'] = True + elif 'tensorboard' in input_item.values(): + input_item['default'] = tensorboard + input_item['optional'] = True + elif 'enable_web_access' in input_item.values(): + input_item['default'] = json.dumps(enable_web_access) + input_item['optional'] = True + elif 'network' in input_item.values(): + input_item['default'] = network + input_item['optional'] = True + elif 'reserved_ip_ranges' in input_item.values(): + input_item['default'] = json.dumps( + reserved_ip_ranges) if reserved_ip_ranges else '[]' + input_item['optional'] = True + elif 'nfs_mounts' in input_item.values(): + input_item['default'] = json.dumps(nfs_mounts) if nfs_mounts else '{}' + input_item['optional'] = True + elif 'base_output_directory' in input_item.values(): + input_item['default'] = base_output_directory + input_item['optional'] = True + elif 'labels' in input_item.values(): + input_item['default'] = json.dumps(labels) if labels else '{}' + input_item['optional'] = True + elif 'encryption_spec_key_name' in input_item.values(): + input_item['default'] = encryption_spec_key_name + input_item['optional'] = True + else: + # This field does not need to be updated. + continue - custom_job_component_spec = structures.ComponentSpec( - name=component_spec.component_spec.name, - inputs=input_specs + [ - structures.InputSpec( - name='base_output_directory', - type='String', - optional=True, - default=base_output_directory), - structures.InputSpec( - name='tensorboard', - type='String', - optional=True, - default=tensorboard), - structures.InputSpec( - name='network', type='String', optional=True, default=network), - structures.InputSpec( - name='service_account', - type='String', - optional=True, - default=service_account), - structures.InputSpec(name='project', type='String'), - structures.InputSpec(name='location', type='String') - ], - outputs=output_specs + - [structures.OutputSpec(name='gcp_resources', type='String')], - implementation=structures.ContainerImplementation( - container=structures.ContainerSpec( - image=_DEFAULT_CUSTOM_JOB_CONTAINER_IMAGE, - command=[ - 'python3', '-u', '-m', - 'google_cloud_pipeline_components.container.v1.gcp_launcher.launcher' - ], - args=[ - '--type', - 'CustomJob', - '--payload', - json.dumps(custom_job_payload), - '--project', - structures.InputValuePlaceholder(input_name='project'), - '--location', - structures.InputValuePlaceholder(input_name='location'), - '--gcp_resources', - structures.OutputPathPlaceholder(output_name='gcp_resources'), - ], - ))) + # Copying over the input and output spec from the given component. + for input_spec in input_specs: + custom_training_job_json['inputs'].append(input_spec.to_dict()) - # pytype: enable=attribute-error + for output_spec in output_specs: + custom_training_job_json['outputs'].append(output_spec.to_dict()) + + # Copy the component name and description + custom_training_job_json['name'] = component_spec.component_spec.name + + if component_spec.component_spec.description: + # TODO(chavoshi) Add support for docstring parsing. + component_description = 'A custom job that wraps ' + component_description += f'{component_spec.component_spec.name}.\n\nOrigional component' + component_description += f' description:\n{component_spec.component_spec.description}\n\nCustom' + component_description += ' Job wrapper description:\n' + component_description += custom_training_job_json['description'] + custom_training_job_json['description'] = component_description component_path = tempfile.mktemp() - custom_job_component_spec.save(component_path) + with open(component_path, 'w') as out_file: + yaml.dump(custom_training_job_json, out_file) + return components.load_component_from_file(component_path) diff --git a/components/google-cloud/tests/v1/custom_job/integration/test_custom_training_job_wrapper_compile.py b/components/google-cloud/tests/v1/custom_job/integration/test_custom_training_job_wrapper_compile.py deleted file mode 100644 index ebb37b7d47c7..000000000000 --- a/components/google-cloud/tests/v1/custom_job/integration/test_custom_training_job_wrapper_compile.py +++ /dev/null @@ -1,95 +0,0 @@ -# Copyright 2021 The Kubeflow Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Test google-cloud-pipeline-Components to ensure the compile without error.""" - -import json -import os - -from google_cloud_pipeline_components.v1.custom_job import utils -import kfp -from kfp import components -from kfp.v2 import compiler - -import unittest - - -class CustomTrainingJobWrapperCompileTest(unittest.TestCase): - - def setUp(self): - super(CustomTrainingJobWrapperCompileTest, self).setUp() - self._project = "test_project" - self._location = "us-central1" - self._test_input_string = "test_input_string" - self._package_path = "pipeline.json" - self._container_component = components.load_component_from_text( - "name: Producer\n" - "inputs:\n" - "- {name: input_text, type: String, description: 'Represents an input parameter.'}\n" - "outputs:\n" - "- {name: output_value, type: String, description: 'Represents an output paramter.'}\n" - "implementation:\n" - " container:\n" - " image: google/cloud-sdk:latest\n" - " command:\n" - " - sh\n" - " - -c\n" - " - |\n" - " set -e -x\n" - " echo '$0, this is an output parameter' | gsutil cp - '$1'\n" - " - {inputValue: input_text}\n" - " - {outputPath: output_value}\n") - self._python_componeont = self._create_a_pytnon_based_component() - - def tearDown(self): - super(CustomTrainingJobWrapperCompileTest, self).tearDown() - if os.path.exists(self._package_path): - os.remove(self._package_path) - - def _create_a_pytnon_based_component(self) -> callable: - """Creates a test python based component factory.""" - - @kfp.v2.dsl.component - def sum_numbers(a: int, b: int) -> int: - return a + b - - return sum_numbers - - def test_container_based_custom_job_op_compile(self): - - custom_job_op = utils.create_custom_training_job_op_from_component( - self._container_component) - - @kfp.dsl.pipeline(name="training-test") - def pipeline(): - custom_job_task = custom_job_op( # pylint: disable=unused-variable - self._test_input_string, - project=self._project, - location=self._location) - - compiler.Compiler().compile( - pipeline_func=pipeline, package_path=self._package_path) - - with open(self._package_path) as f: - executor_output_json = json.load(f, strict=False) - - with open( - os.path.join( - os.path.dirname(__file__), - "../testdata/custom_training_job_wrapper_pipeline.json")) as ef: - expected_executor_output_json = json.load(ef, strict=False) - - # Ignore the kfp SDK & schema version during comparision - del executor_output_json["pipelineSpec"]["sdkVersion"] - del executor_output_json["pipelineSpec"]["schemaVersion"] - self.assertEqual(executor_output_json, expected_executor_output_json) diff --git a/components/google-cloud/tests/v1/custom_job/testdata/custom_training_job_pipeline.json b/components/google-cloud/tests/v1/custom_job/testdata/custom_training_job_pipeline.json index ba8e7d6fb35d..22449115133f 100644 --- a/components/google-cloud/tests/v1/custom_job/testdata/custom_training_job_pipeline.json +++ b/components/google-cloud/tests/v1/custom_job/testdata/custom_training_job_pipeline.json @@ -1,141 +1,141 @@ { - "components":{ - "comp-custom-training-job":{ - "executorLabel":"exec-custom-training-job", - "inputDefinitions":{ - "parameters":{ - "display_name":{ - "parameterType":"STRING" - }, - "labels":{ - "parameterType":"STRUCT" - }, - "location":{ - "parameterType":"STRING" - }, - "project":{ - "parameterType":"STRING" - }, - "reserved_ip_ranges":{ - "parameterType":"LIST" - }, - "service_account":{ - "parameterType":"STRING" - }, - "worker_pool_specs":{ - "parameterType":"LIST" - } - } - }, - "outputDefinitions":{ - "parameters":{ - "gcp_resources":{ - "parameterType":"STRING" - } - } - } + "components": { + "comp-custom-training-job": { + "executorLabel": "exec-custom-training-job", + "inputDefinitions": { + "parameters": { + "display_name": { + "parameterType": "STRING" + }, + "labels": { + "parameterType": "STRUCT" + }, + "location": { + "parameterType": "STRING" + }, + "project": { + "parameterType": "STRING" + }, + "reserved_ip_ranges": { + "parameterType": "LIST" + }, + "service_account": { + "parameterType": "STRING" + }, + "worker_pool_specs": { + "parameterType": "LIST" + } + } + }, + "outputDefinitions": { + "parameters": { + "gcp_resources": { + "parameterType": "STRING" + } + } } - }, - "deploymentSpec":{ - "executors":{ - "exec-custom-training-job":{ - "container":{ - "args":[ - "--type", - "CustomJob", - "--payload", - "{\"display_name\": \"{{$.inputs.parameters['display_name']}}\", \"job_spec\": {\"worker_pool_specs\": {{$.inputs.parameters['worker_pool_specs']}}, \"scheduling\": {\"timeout\": \"\", \"restart_job_on_worker_restart\": \"\"}, \"service_account\": \"{{$.inputs.parameters['service_account']}}\", \"tensorboard\": \"\", \"enable_web_access\": \"\", \"network\": \"\", \"reserved_ip_ranges\": {{$.inputs.parameters['reserved_ip_ranges']}}, \"base_output_directory\": {\"output_uri_prefix\": \"\"}}, \"labels\": {{$.inputs.parameters['labels']}}, \"encryption_spec\": {\"kms_key_name\":\"\"}}", - "--project", - "{{$.inputs.parameters['project']}}", - "--location", - "{{$.inputs.parameters['location']}}", - "--gcp_resources", - "{{$.outputs.parameters['gcp_resources'].output_file}}" - ], - "command":[ - "python3", - "-u", - "-m", - "google_cloud_pipeline_components.container.v1.gcp_launcher.launcher" - ], - "image":"gcr.io/ml-pipeline/google-cloud-pipeline-components:latest" - } - } + } + }, + "deploymentSpec": { + "executors": { + "exec-custom-training-job": { + "container": { + "args": [ + "--type", + "CustomJob", + "--payload", + "{\"display_name\": \"{{$.inputs.parameters['display_name']}}\", \"job_spec\": {\"worker_pool_specs\": {{$.inputs.parameters['worker_pool_specs']}}, \"scheduling\": {\"timeout\": \"\", \"restart_job_on_worker_restart\": \"\"}, \"service_account\": \"{{$.inputs.parameters['service_account']}}\", \"tensorboard\": \"\", \"enable_web_access\": \"\", \"network\": \"\", \"reserved_ip_ranges\": {{$.inputs.parameters['reserved_ip_ranges']}}, \"nfs_mounts\": , \"base_output_directory\": {\"output_uri_prefix\": \"\"}}, \"labels\": {{$.inputs.parameters['labels']}}, \"encryption_spec\": {\"kms_key_name\":\"\"}}", + "--project", + "{{$.inputs.parameters['project']}}", + "--location", + "{{$.inputs.parameters['location']}}", + "--gcp_resources", + "{{$.outputs.parameters['gcp_resources'].output_file}}" + ], + "command": [ + "python3", + "-u", + "-m", + "google_cloud_pipeline_components.container.v1.gcp_launcher.launcher" + ], + "image": "gcr.io/ml-pipeline/google-cloud-pipeline-components:latest" + } } - }, - "pipelineInfo":{ - "name":"training-test" - }, - "root":{ - "dag":{ - "tasks":{ - "custom-training-job":{ - "cachingOptions":{ - "enableCache":true - }, - "componentRef":{ - "name":"comp-custom-training-job" - }, - "inputs":{ - "parameters":{ - "display_name":{ - "runtimeValue":{ - "constant":"fake_job" - } - }, - "labels":{ - "runtimeValue":{ - "constant":{ - "key1":"val1" - } - } - }, - "location":{ - "runtimeValue":{ - "constant":"us-central1" - } - }, - "project":{ - "runtimeValue":{ - "constant":"test_project" - } - }, - "reserved_ip_ranges":{ - "runtimeValue":{ - "constant":[ - "1.0.0.0" - ] - } - }, - "service_account":{ - "runtimeValue":{ - "constant":"fake_sa" - } - }, - "worker_pool_specs":{ - "runtimeValue":{ - "constant":[ - { - "container_spec":{ - "image_uri":"gcr.io/project_id/test" - }, - "machine_spec":{ - "accelerator_count":1.0, - "accelerator_type":"NVIDIA_TESLA_T4", - "machine_type":"n1-standard-4" - }, - "replica_count":1.0 - } - ] - } - } + } + }, + "pipelineInfo": { + "name": "training-test" + }, + "root": { + "dag": { + "tasks": { + "custom-training-job": { + "cachingOptions": { + "enableCache": true + }, + "componentRef": { + "name": "comp-custom-training-job" + }, + "inputs": { + "parameters": { + "display_name": { + "runtimeValue": { + "constant": "fake_job" + } + }, + "labels": { + "runtimeValue": { + "constant": { + "key1": "val1" } - }, - "taskInfo":{ - "name":"custom-training-job" - } + } + }, + "location": { + "runtimeValue": { + "constant": "us-central1" + } + }, + "project": { + "runtimeValue": { + "constant": "test_project" + } + }, + "reserved_ip_ranges": { + "runtimeValue": { + "constant": [ + "1.0.0.0" + ] + } + }, + "service_account": { + "runtimeValue": { + "constant": "fake_sa" + } + }, + "worker_pool_specs": { + "runtimeValue": { + "constant": [ + { + "container_spec": { + "image_uri": "gcr.io/project_id/test" + }, + "machine_spec": { + "accelerator_count": 1.0, + "accelerator_type": "NVIDIA_TESLA_T4", + "machine_type": "n1-standard-4" + }, + "replica_count": 1.0 + } + ] + } + } } - } + }, + "taskInfo": { + "name": "custom-training-job" + } + } } - } -} + } + } +} \ No newline at end of file diff --git a/components/google-cloud/tests/v1/custom_job/testdata/custom_training_job_wrapper_pipeline.json b/components/google-cloud/tests/v1/custom_job/testdata/custom_training_job_wrapper_pipeline.json deleted file mode 100644 index cb5416414f81..000000000000 --- a/components/google-cloud/tests/v1/custom_job/testdata/custom_training_job_wrapper_pipeline.json +++ /dev/null @@ -1,145 +0,0 @@ -{ - "pipelineSpec": { - "components": { - "comp-producer": { - "executorLabel": "exec-producer", - "inputDefinitions": { - "parameters": { - "base_output_directory": { - "type": "STRING" - }, - "input_text": { - "type": "STRING" - }, - "location": { - "type": "STRING" - }, - "network": { - "type": "STRING" - }, - "project": { - "type": "STRING" - }, - "service_account": { - "type": "STRING" - }, - "tensorboard": { - "type": "STRING" - } - } - }, - "outputDefinitions": { - "parameters": { - "gcp_resources": { - "type": "STRING" - }, - "output_value": { - "type": "STRING" - } - } - } - } - }, - "deploymentSpec": { - "executors": { - "exec-producer": { - "container": { - "args": [ - "--type", - "CustomJob", - "--payload", - "{\"display_name\": \"Producer\", \"job_spec\": {\"worker_pool_specs\": [{\"machine_spec\": {\"machine_type\": \"n1-standard-4\"}, \"replica_count\": 1, \"container_spec\": {\"image_uri\": \"google/cloud-sdk:latest\", \"command\": [\"sh\", \"-c\", \"set -e -x\\necho '$0, this is an output parameter' | gsutil cp - '$1'\\n\", \"{{$.inputs.parameters['input_text']}}\", \"{{$.outputs.parameters['output_value'].output_file}}\"]}, \"disk_spec\": {\"boot_disk_type\": \"pd-ssd\", \"boot_disk_size_gb\": 100}}], \"service_account\": \"{{$.inputs.parameters['service_account']}}\", \"network\": \"{{$.inputs.parameters['network']}}\", \"tensorboard\": \"{{$.inputs.parameters['tensorboard']}}\", \"base_output_directory\": {\"output_uri_prefix\": \"{{$.inputs.parameters['base_output_directory']}}\"}}}", - "--project", - "{{$.inputs.parameters['project']}}", - "--location", - "{{$.inputs.parameters['location']}}", - "--gcp_resources", - "{{$.outputs.parameters['gcp_resources'].output_file}}" - ], - "command": [ - "python3", - "-u", - "-m", - "google_cloud_pipeline_components.container.v1.gcp_launcher.launcher" - ], - "image": "gcr.io/ml-pipeline/google-cloud-pipeline-components:latest" - } - } - } - }, - "pipelineInfo": { - "name": "training-test" - }, - "root": { - "dag": { - "tasks": { - "producer": { - "cachingOptions": { - "enableCache": true - }, - "componentRef": { - "name": "comp-producer" - }, - "inputs": { - "parameters": { - "base_output_directory": { - "runtimeValue": { - "constantValue": { - "stringValue": "" - } - } - }, - "input_text": { - "runtimeValue": { - "constantValue": { - "stringValue": "test_input_string" - } - } - }, - "location": { - "runtimeValue": { - "constantValue": { - "stringValue": "us-central1" - } - } - }, - "network": { - "runtimeValue": { - "constantValue": { - "stringValue": "" - } - } - }, - "project": { - "runtimeValue": { - "constantValue": { - "stringValue": "test_project" - } - } - }, - "service_account": { - "runtimeValue": { - "constantValue": { - "stringValue": "" - } - } - }, - "tensorboard": { - "runtimeValue": { - "constantValue": { - "stringValue": "" - } - } - } - } - }, - "taskInfo": { - "name": "producer" - } - } - } - } - } - }, - "runtimeConfig": {} -} diff --git a/components/google-cloud/tests/v1/custom_job/unit/test_custom_job_utils.py b/components/google-cloud/tests/v1/custom_job/unit/test_custom_job_utils.py index a14211844bb5..66256886e42e 100644 --- a/components/google-cloud/tests/v1/custom_job/unit/test_custom_job_utils.py +++ b/components/google-cloud/tests/v1/custom_job/unit/test_custom_job_utils.py @@ -50,13 +50,122 @@ def _create_a_container_based_component(self) -> callable: def test_run_as_vertex_ai_custom_job_on_container_spec_with_defualts_values_converts_correctly( self): expected_results = { - 'name': 'ContainerComponent', + 'name': + 'ContainerComponent', + 'description': + 'Launch a Custom training job using Vertex CustomJob API.\n\n ' + 'Args:\n project (str):\n Required. Project to ' + 'create the custom training job in.\n location ' + '(Optional[str]):\n Location for creating the custom ' + 'training job. If not set,\n default to us-central1.\n' + ' display_name (str): The name of the custom training job.\n' + ' worker_pool_specs (Optional[Sequence[str]]): Serialized ' + 'json spec of the worker pools\n including machine type ' + 'and Docker image. All worker pools except the first one are\n' + ' optional and can be skipped by providing an empty ' + 'value.\n\n For more details about the WorkerPoolSpec, ' + 'see\n ' + 'https://cloud.google.com/vertex-ai/docs/reference/rest/v1/CustomJobSpec#WorkerPoolSpec\n' + ' timeout (Optional[str]): The maximum job running time. The ' + 'default is 7\n days. A duration in seconds with up to ' + 'nine fractional digits, terminated\n by \'s\', for ' + 'example: "3.5s".\n restart_job_on_worker_restart ' + '(Optional[bool]): Restarts the entire\n CustomJob if a ' + 'worker gets restarted. This feature can be used by\n ' + 'distributed training jobs that are not resilient to workers ' + 'leaving and\n joining a job.\n service_account ' + '(Optional[str]): Sets the default service account for\n ' + 'workload run-as account. The service account running the ' + 'pipeline\n ' + '(https://cloud.google.com/vertex-ai/docs/pipelines/configure-project#service-account)\n' + ' submitting jobs must have act-as permission on this' + ' run-as account. If\n unspecified, the Vertex AI ' + 'Custom Code Service\n ' + 'Agent(https://cloud.google.com/vertex-ai/docs/general/access-control#service-agents)\n' + ' for the CustomJob\'s project.\n tensorboard ' + '(Optional[str]): The name of a Vertex AI Tensorboard resource ' + 'to\n which this CustomJob will upload Tensorboard ' + 'logs.\n enable_web_access (Optional[bool]): Whether you want' + ' Vertex AI to enable\n [interactive shell ' + 'access](https://cloud.google.com/vertex-ai/docs/training/monitor-debug-interactive-shell)\n' + ' to training containers.\n If set to `true`, ' + 'you can access interactive shells at the URIs given\n by' + ' [CustomJob.web_access_uris][].\n network (Optional[str]): ' + 'The full name of the Compute Engine network to\n which ' + 'the job should be peered. For example,\n ' + 'projects/12345/global/networks/myVPC. Format is of the form\n' + ' projects/{project}/global/networks/{network}. Where ' + '{project} is a project\n number, as in 12345, and ' + '{network} is a network name. Private services\n access ' + 'must already be configured for the network. If left ' + 'unspecified,\n the job is not peered with any network.\n' + ' reserved_ip_ranges (Optional[Sequence[str]]): A list of ' + 'names for the reserved ip ranges\n under the VPC network' + ' that can be used for this job.\n If set, we will deploy' + ' the job within the provided ip ranges. Otherwise,\n the' + ' job will be deployed to any ip ranges under the provided VPC ' + 'network.\n nfs_mounts (Optional[Sequence[Dict[str, str]]]):A' + ' list of NFS mount specs in Json\n dict format. For API ' + 'spec, see\n ' + 'https://cloud.devsite.corp.google.com/vertex-ai/docs/reference/rest/v1/CustomJobSpec#NfsMount\n' + ' For more details about mounting NFS for CustomJob, ' + 'see\n ' + 'https://cloud.devsite.corp.google.com/vertex-ai/docs/training/train-nfs-share\n' + ' base_output_directory (Optional[str]): The Cloud Storage ' + 'location to store\n the output of this CustomJob or ' + 'HyperparameterTuningJob. see below for more details:\n ' + 'https://cloud.google.com/vertex-ai/docs/reference/rest/v1/GcsDestination\n' + ' labels (Optional[Dict[str, str]]): The labels with ' + 'user-defined metadata to organize CustomJobs.\n See ' + 'https://goo.gl/xmQnxf for more information.\n ' + 'encryption_spec_key_name (Optional[str]): Customer-managed ' + 'encryption key\n options for the CustomJob. If this is ' + 'set, then all resources created by\n the CustomJob will ' + 'be encrypted with the provided encryption key.\n\n Returns:\n' + ' gcp_resources (str):\n Serialized gcp_resources ' + 'proto tracking the custom training job.\n For more ' + 'details, see ' + 'https://github.com/kubeflow/pipelines/blob/master/components/google-cloud/google_cloud_pipeline_components/proto/README.md.\n', 'inputs': [{ - 'name': 'input_text', + 'name': 'project', + 'type': 'String' + }, { + 'name': 'location', 'type': 'String', - 'description': 'Represents an input parameter.' + 'default': 'us-central1' }, { - 'name': 'base_output_directory', + 'name': 'display_name', + 'type': 'String', + 'default': 'ContainerComponent', + 'optional': True + }, { + 'name': + 'worker_pool_specs', + 'type': + 'JsonArray', + 'default': + '[{"machine_spec": {"machine_type": "n1-standard-4"}, ' + '"replica_count": 1, "container_spec": {"image_uri": ' + '"google/cloud-sdk:latest", "command": ["sh", "-c", "set -e ' + '-x\\necho \\"$0, this is an output parameter\\"\\n", ' + '"{{$.inputs.parameters[\'input_text\']}}", ' + '"{{$.outputs.parameters[\'output_value\'].output_file}}"]}, ' + '"disk_spec": {"boot_disk_type": "pd-ssd", ' + '"boot_disk_size_gb": 100}}]', + 'optional': + True + }, { + 'name': 'timeout', + 'type': 'String', + 'default': '604800s', + 'optional': True + }, { + 'name': 'restart_job_on_worker_restart', + 'type': 'Boolean', + 'default': 'false', + 'optional': True + }, { + 'name': 'service_account', 'type': 'String', 'default': '', 'optional': True @@ -65,58 +174,96 @@ def test_run_as_vertex_ai_custom_job_on_container_spec_with_defualts_values_conv 'type': 'String', 'default': '', 'optional': True + }, { + 'name': 'enable_web_access', + 'type': 'Boolean', + 'default': 'false', + 'optional': True }, { 'name': 'network', 'type': 'String', 'default': '', 'optional': True }, { - 'name': 'service_account', + 'name': 'reserved_ip_ranges', + 'type': 'JsonArray', + 'default': '[]', + 'optional': True + }, { + 'name': 'nfs_mounts', + 'type': 'JsonArray', + 'default': '{}', + 'optional': True + }, { + 'name': 'base_output_directory', 'type': 'String', 'default': '', 'optional': True }, { - 'name': 'project', - 'type': 'String' + 'name': 'labels', + 'type': 'JsonObject', + 'default': '{}', + 'optional': True }, { - 'name': 'location', - 'type': 'String' + 'name': 'encryption_spec_key_name', + 'type': 'String', + 'default': '', + 'optional': True + }, { + 'name': 'input_text', + 'type': 'String', + 'description': 'Represents an input parameter.' }], 'outputs': [{ + 'name': 'gcp_resources', + 'type': 'String' + }, { 'name': 'output_value', 'type': 'String', 'description': 'Represents an output paramter.' - }, { - 'name': 'gcp_resources', - 'type': 'String' }], 'implementation': { 'container': { 'image': - 'test_launcher_image', + 'gcr.io/ml-pipeline/google-cloud-pipeline-components:latest', 'command': [ 'python3', '-u', '-m', 'google_cloud_pipeline_components.container.v1.gcp_launcher.launcher' ], 'args': [ - '--type', 'CustomJob', '--payload', - '{"display_name": "ContainerComponent", "job_spec": ' - '{"worker_pool_specs": [{"machine_spec": {"machine_type": ' - '"n1-standard-4"}, "replica_count": 1, "container_spec": ' - '{"image_uri": "google/cloud-sdk:latest", "command": ' - '["sh", "-c", "set -e -x\\necho \\"$0, this is an output ' - 'parameter\\"\\n", ' - '"{{$.inputs.parameters[\'input_text\']}}", ' - '"{{$.outputs.parameters[\'output_value\'].output_file}}"]},' - ' "disk_spec": {"boot_disk_type": "pd-ssd", ' - '"boot_disk_size_gb": 100}}], "service_account": ' - '"{{$.inputs.parameters[\'service_account\']}}", ' - '"network": "{{$.inputs.parameters[\'network\']}}", ' - '"tensorboard": ' - '"{{$.inputs.parameters[\'tensorboard\']}}", ' - '"base_output_directory": {"output_uri_prefix": ' - '"{{$.inputs.parameters[\'base_output_directory\']}}"}}}', - '--project', { + '--type', 'CustomJob', '--payload', { + 'concat': [ + '{', '"display_name": "', { + 'inputValue': 'display_name' + }, '"', ', "job_spec": {', '"worker_pool_specs": ', + { + 'inputValue': 'worker_pool_specs' + }, ', "scheduling": {', '"timeout": "', { + 'inputValue': 'timeout' + }, '"', ', "restart_job_on_worker_restart": "', { + 'inputValue': 'restart_job_on_worker_restart' + }, '"', '}', ', "service_account": "', { + 'inputValue': 'service_account' + }, '"', ', "tensorboard": "', { + 'inputValue': 'tensorboard' + }, '"', ', "enable_web_access": "', { + 'inputValue': 'enable_web_access' + }, '"', ', "network": "', { + 'inputValue': 'network' + }, '"', ', "reserved_ip_ranges": ', { + 'inputValue': 'reserved_ip_ranges' + }, ', "nfs_mounts": ', { + 'inputValue': 'nfs_mounts' + }, ', "base_output_directory": {', + '"output_uri_prefix": "', { + 'inputValue': 'base_output_directory' + }, '"', '}', '}', ', "labels": ', { + 'inputValue': 'labels' + }, ', "encryption_spec": {"kms_key_name":"', { + 'inputValue': 'encryption_spec_key_name' + }, '"}', '}' + ] + }, '--project', { 'inputValue': 'project' }, '--location', { 'inputValue': 'location' @@ -136,609 +283,185 @@ def test_run_as_vertex_ai_custom_job_on_container_spec_with_defualts_values_conv def test_run_as_vertex_ai_custom_with_accelerator_type_and_count_converts_correctly( self): component_factory_function = self._create_a_container_based_component() - - expected_sub_results = { - 'implementation': { - 'container': { - 'image': - 'test_launcher_image', - 'command': [ - 'python3', '-u', '-m', - 'google_cloud_pipeline_components.container.v1.gcp_launcher.launcher' - ], - 'args': [ - '--type', 'CustomJob', '--payload', - '{"display_name": "ContainerComponent", "job_spec": ' - '{"worker_pool_specs": [{"machine_spec": {"machine_type": ' - '"n1-standard-4", "accelerator_type": ' - '"test_accelerator_type", "accelerator_count": 2}, ' - '"replica_count": 1, "container_spec": {"image_uri": ' - '"google/cloud-sdk:latest", "command": ["sh", "-c", "set ' - '-e -x\\necho \\"$0, this is an output parameter\\"\\n", ' - '"{{$.inputs.parameters[\'input_text\']}}", ' - '"{{$.outputs.parameters[\'output_value\'].output_file}}"]},' - ' "disk_spec": {"boot_disk_type": "pd-ssd", ' - '"boot_disk_size_gb": 100}}], "service_account": ' - '"{{$.inputs.parameters[\'service_account\']}}", ' - '"network": "{{$.inputs.parameters[\'network\']}}", ' - '"tensorboard": ' - '"{{$.inputs.parameters[\'tensorboard\']}}", ' - '"base_output_directory": {"output_uri_prefix": ' - '"{{$.inputs.parameters[\'base_output_directory\']}}"}}}', - '--project', { - 'inputValue': 'project' - }, '--location', { - 'inputValue': 'location' - }, '--gcp_resources', { - 'outputPath': 'gcp_resources' - } - ] - } - } - } custom_job_spec = utils.create_custom_training_job_op_from_component( component_factory_function, accelerator_type='test_accelerator_type', accelerator_count=2) - - self.assertDictContainsSubset( - subset=expected_sub_results, - dictionary=custom_job_spec.component_spec.to_dict()) + self.assertContainsSubsequence(custom_job_spec.component_spec.inputs, [ + components.structures.InputSpec( + name='worker_pool_specs', + type='JsonArray', + description=None, + default='[{"machine_spec": {"machine_type": "n1-standard-4", "accelerator_type": "test_accelerator_type", "accelerator_count": 2}, "replica_count": 1, "container_spec": {"image_uri": "google/cloud-sdk:latest", "command": ["sh", "-c", "set -e -x\\necho \\"$0, this is an output parameter\\"\\n", "{{$.inputs.parameters[\'input_text\']}}", "{{$.outputs.parameters[\'output_value\'].output_file}}"]}, "disk_spec": {"boot_disk_type": "pd-ssd", "boot_disk_size_gb": 100}}]', + optional=True, + annotations=None) + ]) def test_run_as_vertex_ai_custom_with_boot_disk_type_and_size_converts_correctly( self): component_factory_function = self._create_a_container_based_component() - - expected_sub_results = { - 'implementation': { - 'container': { - 'image': - 'test_launcher_image', - 'command': [ - 'python3', '-u', '-m', - 'google_cloud_pipeline_components.container.v1.gcp_launcher.launcher' - ], - 'args': [ - '--type', 'CustomJob', '--payload', - '{"display_name": "ContainerComponent", "job_spec": ' - '{"worker_pool_specs": [{"machine_spec": {"machine_type": ' - '"n1-standard-4"}, "replica_count": 1, "container_spec": ' - '{"image_uri": "google/cloud-sdk:latest", "command": ' - '["sh", "-c", "set -e -x\\necho \\"$0, this is an output ' - 'parameter\\"\\n", ' - '"{{$.inputs.parameters[\'input_text\']}}", ' - '"{{$.outputs.parameters[\'output_value\'].output_file}}"]},' - ' "disk_spec": {"boot_disk_type": "pd-ssd", ' - '"boot_disk_size_gb": 100}}, {"machine_spec": ' - '{"machine_type": "n1-standard-4"}, "replica_count": "1", ' - '"container_spec": {"image_uri": ' - '"google/cloud-sdk:latest", "command": ["sh", "-c", "set ' - '-e -x\\necho \\"$0, this is an output parameter\\"\\n", ' - '"{{$.inputs.parameters[\'input_text\']}}", ' - '"{{$.outputs.parameters[\'output_value\'].output_file}}"]},' - ' "disk_spec": {"boot_disk_type": "pd-ssd", ' - '"boot_disk_size_gb": 100}}], "service_account": ' - '"{{$.inputs.parameters[\'service_account\']}}", ' - '"network": "{{$.inputs.parameters[\'network\']}}", ' - '"tensorboard": ' - '"{{$.inputs.parameters[\'tensorboard\']}}", ' - '"base_output_directory": {"output_uri_prefix": ' - '"{{$.inputs.parameters[\'base_output_directory\']}}"}}}', - '--project', { - 'inputValue': 'project' - }, '--location', { - 'inputValue': 'location' - }, '--gcp_resources', { - 'outputPath': 'gcp_resources' - } - ] - } - } - } custom_job_spec = utils.create_custom_training_job_op_from_component( - component_factory_function, replica_count=2) - - self.assertDictContainsSubset( - subset=expected_sub_results, - dictionary=custom_job_spec.component_spec.to_dict()) + component_factory_function, + boot_disk_type='test_type', + boot_disk_size_gb=200) + self.assertContainsSubsequence(custom_job_spec.component_spec.inputs, [ + components.structures.InputSpec( + name='worker_pool_specs', + type='JsonArray', + description=None, + default='[{"machine_spec": {"machine_type": "n1-standard-4"}, "replica_count": 1, "container_spec": {"image_uri": "google/cloud-sdk:latest", "command": ["sh", "-c", "set -e -x\\necho \\"$0, this is an output parameter\\"\\n", "{{$.inputs.parameters[\'input_text\']}}", "{{$.outputs.parameters[\'output_value\'].output_file}}"]}, "disk_spec": {"boot_disk_type": "test_type", "boot_disk_size_gb": 200}}]', + optional=True, + annotations=None) + ]) def test_run_as_vertex_ai_custom_with_replica_count_greater_than_1_converts_correctly( self): component_factory_function = self._create_a_container_based_component() - - expected_sub_results = { - 'implementation': { - 'container': { - 'image': - 'test_launcher_image', - 'command': [ - 'python3', '-u', '-m', - 'google_cloud_pipeline_components.container.v1.gcp_launcher.launcher' - ], - 'args': [ - '--type', 'CustomJob', '--payload', - '{"display_name": "ContainerComponent", "job_spec": ' - '{"worker_pool_specs": [{"machine_spec": {"machine_type": ' - '"n1-standard-4"}, "replica_count": 1, "container_spec": ' - '{"image_uri": "google/cloud-sdk:latest", "command": ' - '["sh", "-c", "set -e -x\\necho \\"$0, this is an output ' - 'parameter\\"\\n", ' - '"{{$.inputs.parameters[\'input_text\']}}", ' - '"{{$.outputs.parameters[\'output_value\'].output_file}}"]},' - ' "disk_spec": {"boot_disk_type": "pd-ssd", ' - '"boot_disk_size_gb": 100}}, {"machine_spec": ' - '{"machine_type": "n1-standard-4"}, "replica_count": "1", ' - '"container_spec": {"image_uri": ' - '"google/cloud-sdk:latest", "command": ["sh", "-c", "set ' - '-e -x\\necho \\"$0, this is an output parameter\\"\\n", ' - '"{{$.inputs.parameters[\'input_text\']}}", ' - '"{{$.outputs.parameters[\'output_value\'].output_file}}"]},' - ' "disk_spec": {"boot_disk_type": "pd-ssd", ' - '"boot_disk_size_gb": 100}}], "service_account": ' - '"{{$.inputs.parameters[\'service_account\']}}", ' - '"network": "{{$.inputs.parameters[\'network\']}}", ' - '"tensorboard": ' - '"{{$.inputs.parameters[\'tensorboard\']}}", ' - '"base_output_directory": {"output_uri_prefix": ' - '"{{$.inputs.parameters[\'base_output_directory\']}}"}}}', - '--project', { - 'inputValue': 'project' - }, '--location', { - 'inputValue': 'location' - }, '--gcp_resources', { - 'outputPath': 'gcp_resources' - } - ] - } - } - } custom_job_spec = utils.create_custom_training_job_op_from_component( - component_factory_function, replica_count=2) - - self.assertDictContainsSubset( - subset=expected_sub_results, - dictionary=custom_job_spec.component_spec.to_dict()) + component_factory_function, replica_count=5) + self.assertContainsSubsequence(custom_job_spec.component_spec.inputs, [ + components.structures.InputSpec( + name='worker_pool_specs', + type='JsonArray', + description=None, + default='[{"machine_spec": {"machine_type": "n1-standard-4"}, "replica_count": 1, "container_spec": {"image_uri": "google/cloud-sdk:latest", "command": ["sh", "-c", "set -e -x\\necho \\"$0, this is an output parameter\\"\\n", "{{$.inputs.parameters[\'input_text\']}}", "{{$.outputs.parameters[\'output_value\'].output_file}}"]}, "disk_spec": {"boot_disk_type": "pd-ssd", "boot_disk_size_gb": 100}}, {"machine_spec": {"machine_type": "n1-standard-4"}, "replica_count": "4", "container_spec": {"image_uri": "google/cloud-sdk:latest", "command": ["sh", "-c", "set -e -x\\necho \\"$0, this is an output parameter\\"\\n", "{{$.inputs.parameters[\'input_text\']}}", "{{$.outputs.parameters[\'output_value\'].output_file}}"]}, "disk_spec": {"boot_disk_type": "pd-ssd", "boot_disk_size_gb": 100}}]', + optional=True, + annotations=None) + ]) def test_run_as_vertex_ai_custom_with_time_out_converts_correctly(self): component_factory_function = self._create_a_container_based_component() - expected_sub_results = { - 'implementation': { - 'container': { - 'image': - 'test_launcher_image', - 'command': [ - 'python3', '-u', '-m', - 'google_cloud_pipeline_components.container.v1.gcp_launcher.launcher' - ], - 'args': [ - '--type', 'CustomJob', '--payload', - '{"display_name": "ContainerComponent", "job_spec": ' - '{"worker_pool_specs": [{"machine_spec": {"machine_type": ' - '"n1-standard-4"}, "replica_count": 1, "container_spec": ' - '{"image_uri": "google/cloud-sdk:latest", "command": ' - '["sh", "-c", "set -e -x\\necho \\"$0, this is an output ' - 'parameter\\"\\n", ' - '"{{$.inputs.parameters[\'input_text\']}}", ' - '"{{$.outputs.parameters[\'output_value\'].output_file}}"]},' - ' "disk_spec": {"boot_disk_type": "pd-ssd", ' - '"boot_disk_size_gb": 100}}], "scheduling": {"timeout": ' - '2}, "service_account": ' - '"{{$.inputs.parameters[\'service_account\']}}", ' - '"network": "{{$.inputs.parameters[\'network\']}}", ' - '"tensorboard": ' - '"{{$.inputs.parameters[\'tensorboard\']}}", ' - '"base_output_directory": {"output_uri_prefix": ' - '"{{$.inputs.parameters[\'base_output_directory\']}}"}}}', - '--project', { - 'inputValue': 'project' - }, '--location', { - 'inputValue': 'location' - }, '--gcp_resources', { - 'outputPath': 'gcp_resources' - } - ] - } - } - } custom_job_spec = utils.create_custom_training_job_op_from_component( - component_factory_function, timeout=2) - - self.assertDictContainsSubset( - subset=expected_sub_results, - dictionary=custom_job_spec.component_spec.to_dict()) + component_factory_function, timeout='2s') + self.assertContainsSubsequence(custom_job_spec.component_spec.inputs, [ + components.structures.InputSpec( + name='timeout', + type='String', + description=None, + default='2s', + optional=True, + annotations=None) + ]) def test_run_as_vertex_ai_custom_with_restart_job_on_worker_restart_converts_correctly( self): component_factory_function = self._create_a_container_based_component() - - expected_sub_results = { - 'implementation': { - 'container': { - 'image': - 'test_launcher_image', - 'command': [ - 'python3', '-u', '-m', - 'google_cloud_pipeline_components.container.v1.gcp_launcher.launcher' - ], - 'args': [ - '--type', 'CustomJob', '--payload', - '{"display_name": "ContainerComponent", "job_spec": ' - '{"worker_pool_specs": [{"machine_spec": {"machine_type": ' - '"n1-standard-4"}, "replica_count": 1, "container_spec": ' - '{"image_uri": "google/cloud-sdk:latest", "command": ' - '["sh", "-c", "set -e -x\\necho \\"$0, this is an output ' - 'parameter\\"\\n", ' - '"{{$.inputs.parameters[\'input_text\']}}", ' - '"{{$.outputs.parameters[\'output_value\'].output_file}}"]},' - ' "disk_spec": {"boot_disk_type": "pd-ssd", ' - '"boot_disk_size_gb": 100}}], "scheduling": ' - '{"restart_job_on_worker_restart": true}, ' - '"service_account": ' - '"{{$.inputs.parameters[\'service_account\']}}", ' - '"network": "{{$.inputs.parameters[\'network\']}}", ' - '"tensorboard": ' - '"{{$.inputs.parameters[\'tensorboard\']}}", ' - '"base_output_directory": {"output_uri_prefix": ' - '"{{$.inputs.parameters[\'base_output_directory\']}}"}}}', - '--project', { - 'inputValue': 'project' - }, '--location', { - 'inputValue': 'location' - }, '--gcp_resources', { - 'outputPath': 'gcp_resources' - } - ] - } - } - } custom_job_spec = utils.create_custom_training_job_op_from_component( component_factory_function, restart_job_on_worker_restart=True) - - self.assertDictContainsSubset( - subset=expected_sub_results, - dictionary=custom_job_spec.component_spec.to_dict()) + self.assertContainsSubsequence(custom_job_spec.component_spec.inputs, [ + components.structures.InputSpec( + name='restart_job_on_worker_restart', + type='Boolean', + description=None, + default='true', + optional=True, + annotations=None) + ]) def test_run_as_vertex_ai_custom_with_custom_service_account_converts_correctly( self): component_factory_function = self._create_a_container_based_component() - - expected_sub_results = { - 'implementation': { - 'container': { - 'image': - 'test_launcher_image', - 'command': [ - 'python3', '-u', '-m', - 'google_cloud_pipeline_components.container.v1.gcp_launcher.launcher' - ], - 'args': [ - '--type', 'CustomJob', '--payload', - '{"display_name": "ContainerComponent", "job_spec": ' - '{"worker_pool_specs": [{"machine_spec": {"machine_type": ' - '"n1-standard-4"}, "replica_count": 1, "container_spec": ' - '{"image_uri": "google/cloud-sdk:latest", "command": ' - '["sh", "-c", "set -e -x\\necho \\"$0, this is an output ' - 'parameter\\"\\n", ' - '"{{$.inputs.parameters[\'input_text\']}}", ' - '"{{$.outputs.parameters[\'output_value\'].output_file}}"]},' - ' "disk_spec": {"boot_disk_type": "pd-ssd", ' - '"boot_disk_size_gb": 100}}], "service_account": ' - '"{{$.inputs.parameters[\'service_account\']}}", ' - '"network": "{{$.inputs.parameters[\'network\']}}", ' - '"tensorboard": ' - '"{{$.inputs.parameters[\'tensorboard\']}}", ' - '"base_output_directory": {"output_uri_prefix": ' - '"{{$.inputs.parameters[\'base_output_directory\']}}"}}}', - '--project', { - 'inputValue': 'project' - }, '--location', { - 'inputValue': 'location' - }, '--gcp_resources', { - 'outputPath': 'gcp_resources' - } - ] - } - } - } custom_job_spec = utils.create_custom_training_job_op_from_component( component_factory_function, service_account='test_service_account') - - self.assertDictContainsSubset( - subset=expected_sub_results, - dictionary=custom_job_spec.component_spec.to_dict()) + self.assertContainsSubsequence(custom_job_spec.component_spec.inputs, [ + components.structures.InputSpec( + name='service_account', + type='String', + description=None, + default='test_service_account', + optional=True, + annotations=None) + ]) def test_run_as_vertex_ai_custom_with_display_name_converts_correctly(self): component_factory_function = self._create_a_container_based_component() - - expected_sub_results = { - 'implementation': { - 'container': { - 'image': - 'test_launcher_image', - 'command': [ - 'python3', '-u', '-m', - 'google_cloud_pipeline_components.container.v1.gcp_launcher.launcher' - ], - 'args': [ - '--type', 'CustomJob', '--payload', - '{"display_name": "test_display_name", "job_spec": ' - '{"worker_pool_specs": [{"machine_spec": {"machine_type": ' - '"n1-standard-4"}, "replica_count": 1, "container_spec": ' - '{"image_uri": "google/cloud-sdk:latest", "command": ' - '["sh", "-c", "set -e -x\\necho \\"$0, this is an output ' - 'parameter\\"\\n", ' - '"{{$.inputs.parameters[\'input_text\']}}", ' - '"{{$.outputs.parameters[\'output_value\'].output_file}}"]},' - ' "disk_spec": {"boot_disk_type": "pd-ssd", ' - '"boot_disk_size_gb": 100}}], "service_account": ' - '"{{$.inputs.parameters[\'service_account\']}}", ' - '"network": "{{$.inputs.parameters[\'network\']}}", ' - '"tensorboard": ' - '"{{$.inputs.parameters[\'tensorboard\']}}", ' - '"base_output_directory": {"output_uri_prefix": ' - '"{{$.inputs.parameters[\'base_output_directory\']}}"}}}', - '--project', { - 'inputValue': 'project' - }, '--location', { - 'inputValue': 'location' - }, '--gcp_resources', { - 'outputPath': 'gcp_resources' - } - ] - } - } - } custom_job_spec = utils.create_custom_training_job_op_from_component( component_factory_function, display_name='test_display_name') - - self.assertDictContainsSubset( - subset=expected_sub_results, - dictionary=custom_job_spec.component_spec.to_dict()) + self.assertContainsSubsequence(custom_job_spec.component_spec.inputs, [ + components.structures.InputSpec( + name='display_name', + type='String', + description=None, + default='test_display_name', + optional=True, + annotations=None) + ]) def test_run_as_vertex_ai_custom_with_network_converts_correctly(self): component_factory_function = self._create_a_container_based_component() - - expected_sub_results = { - 'implementation': { - 'container': { - 'image': - 'test_launcher_image', - 'command': [ - 'python3', '-u', '-m', - 'google_cloud_pipeline_components.container.v1.gcp_launcher.launcher' - ], - 'args': [ - '--type', 'CustomJob', '--payload', - '{"display_name": "ContainerComponent", "job_spec": ' - '{"worker_pool_specs": [{"machine_spec": {"machine_type": ' - '"n1-standard-4"}, "replica_count": 1, "container_spec": ' - '{"image_uri": "google/cloud-sdk:latest", "command": ' - '["sh", "-c", "set -e -x\\necho \\"$0, this is an output ' - 'parameter\\"\\n", ' - '"{{$.inputs.parameters[\'input_text\']}}", ' - '"{{$.outputs.parameters[\'output_value\'].output_file}}"]},' - ' "disk_spec": {"boot_disk_type": "pd-ssd", ' - '"boot_disk_size_gb": 100}}], "service_account": ' - '"{{$.inputs.parameters[\'service_account\']}}", ' - '"network": "{{$.inputs.parameters[\'network\']}}", ' - '"tensorboard": ' - '"{{$.inputs.parameters[\'tensorboard\']}}", ' - '"base_output_directory": {"output_uri_prefix": ' - '"{{$.inputs.parameters[\'base_output_directory\']}}"}}}', - '--project', { - 'inputValue': 'project' - }, '--location', { - 'inputValue': 'location' - }, '--gcp_resources', { - 'outputPath': 'gcp_resources' - } - ] - } - } - } custom_job_spec = utils.create_custom_training_job_op_from_component( component_factory_function, network='test_network') - - self.assertDictContainsSubset( - subset=expected_sub_results, - dictionary=custom_job_spec.component_spec.to_dict()) + self.assertContainsSubsequence(custom_job_spec.component_spec.inputs, [ + components.structures.InputSpec( + name='network', + type='String', + description=None, + default='test_network', + optional=True, + annotations=None) + ]) def test_run_as_vertex_ai_custom_with_labels_converts_correctly(self): component_factory_function = self._create_a_container_based_component() - - expected_sub_results = { - 'implementation': { - 'container': { - 'image': - 'test_launcher_image', - 'command': [ - 'python3', '-u', '-m', - 'google_cloud_pipeline_components.container.v1.gcp_launcher.launcher' - ], - 'args': [ - '--type', 'CustomJob', '--payload', - '{"display_name": "ContainerComponent", "job_spec": ' - '{"worker_pool_specs": [{"machine_spec": {"machine_type": ' - '"n1-standard-4"}, "replica_count": 1, "container_spec": ' - '{"image_uri": "google/cloud-sdk:latest", "command": ' - '["sh", "-c", "set -e -x\\necho \\"$0, this is an output ' - 'parameter\\"\\n", ' - '"{{$.inputs.parameters[\'input_text\']}}", ' - '"{{$.outputs.parameters[\'output_value\'].output_file}}"]},' - ' "disk_spec": {"boot_disk_type": "pd-ssd", ' - '"boot_disk_size_gb": 100}}], "labels": {"test_key": ' - '"test_value"}, "service_account": ' - '"{{$.inputs.parameters[\'service_account\']}}", ' - '"network": "{{$.inputs.parameters[\'network\']}}", ' - '"tensorboard": ' - '"{{$.inputs.parameters[\'tensorboard\']}}", ' - '"base_output_directory": {"output_uri_prefix": ' - '"{{$.inputs.parameters[\'base_output_directory\']}}"}}}', - '--project', { - 'inputValue': 'project' - }, '--location', { - 'inputValue': 'location' - }, '--gcp_resources', { - 'outputPath': 'gcp_resources' - } - ] - } - } - } custom_job_spec = utils.create_custom_training_job_op_from_component( component_factory_function, labels={'test_key': 'test_value'}) - - self.assertDictContainsSubset( - subset=expected_sub_results, - dictionary=custom_job_spec.component_spec.to_dict()) + self.assertContainsSubsequence(custom_job_spec.component_spec.inputs, [ + components.structures.InputSpec( + name='labels', + type='JsonObject', + description=None, + default='{"test_key": "test_value"}', + optional=True, + annotations=None) + ]) def test_run_as_vertex_ai_custom_with_reserved_ip_ranges(self): component_factory_function = self._create_a_container_based_component() - - expected_sub_results = { - 'implementation': { - 'container': { - 'image': - 'test_launcher_image', - 'command': [ - 'python3', '-u', '-m', - 'google_cloud_pipeline_components.container.v1.gcp_launcher.launcher' - ], - 'args': [ - '--type', 'CustomJob', '--payload', - '{"display_name": "ContainerComponent", "job_spec": ' - '{"worker_pool_specs": [{"machine_spec": {"machine_type": ' - '"n1-standard-4"}, "replica_count": 1, "container_spec": ' - '{"image_uri": "google/cloud-sdk:latest", "command": ' - '["sh", "-c", "set -e -x\\necho \\"$0, this is an output ' - 'parameter\\"\\n", ' - '"{{$.inputs.parameters[\'input_text\']}}", ' - '"{{$.outputs.parameters[\'output_value\'].output_file}}"]},' - ' "disk_spec": {"boot_disk_type": "pd-ssd", ' - '"boot_disk_size_gb": 100}}], "reserved_ip_ranges": ' - '["1.0.0.0", "2.0.0.0"], "service_account": ' - '"{{$.inputs.parameters[\'service_account\']}}", ' - '"network": "{{$.inputs.parameters[\'network\']}}", ' - '"tensorboard": ' - '"{{$.inputs.parameters[\'tensorboard\']}}", ' - '"base_output_directory": {"output_uri_prefix": ' - '"{{$.inputs.parameters[\'base_output_directory\']}}"}}}', - '--project', { - 'inputValue': 'project' - }, '--location', { - 'inputValue': 'location' - }, '--gcp_resources', { - 'outputPath': 'gcp_resources' - } - ] - } - } - } custom_job_spec = utils.create_custom_training_job_op_from_component( - component_factory_function, reserved_ip_ranges=['1.0.0.0', '2.0.0.0']) - - self.assertDictContainsSubset( - subset=expected_sub_results, - dictionary=custom_job_spec.component_spec.to_dict()) + component_factory_function, + reserved_ip_ranges=['test_ip_range_network']) + self.assertContainsSubsequence(custom_job_spec.component_spec.inputs, [ + components.structures.InputSpec( + name='reserved_ip_ranges', + type='JsonArray', + description=None, + default='["test_ip_range_network"]', + optional=True, + annotations=None) + ]) def test_run_as_vertex_ai_custom_with_nfs_mount(self): component_factory_function = self._create_a_container_based_component() - - expected_sub_results = { - 'implementation': { - 'container': { - 'image': - 'test_launcher_image', - 'command': [ - 'python3', '-u', '-m', - 'google_cloud_pipeline_components.container.v1.gcp_launcher.launcher' - ], - 'args': [ - '--type', 'CustomJob', '--payload', - '{"display_name": "ContainerComponent", "job_spec": ' - '{"worker_pool_specs": [{"machine_spec": {"machine_type": ' - '"n1-standard-4"}, "replica_count": 1, "container_spec": ' - '{"image_uri": "google/cloud-sdk:latest", "command": ' - '["sh", "-c", "set -e -x\\necho \\"$0, this is an output ' - 'parameter\\"\\n", ' - '"{{$.inputs.parameters[\'input_text\']}}", ' - '"{{$.outputs.parameters[\'output_value\'].output_file}}"]},' - ' "disk_spec": {"boot_disk_type": "pd-ssd", ' - '"boot_disk_size_gb": 100}}], "nfs_mounts": ' - '[{"server": "s1", "path": "p1"}], "service_account": ' - '"{{$.inputs.parameters[\'service_account\']}}", ' - '"network": "{{$.inputs.parameters[\'network\']}}", ' - '"tensorboard": ' - '"{{$.inputs.parameters[\'tensorboard\']}}", ' - '"base_output_directory": {"output_uri_prefix": ' - '"{{$.inputs.parameters[\'base_output_directory\']}}"}}}', - '--project', { - 'inputValue': 'project' - }, '--location', { - 'inputValue': 'location' - }, '--gcp_resources', { - 'outputPath': 'gcp_resources' - } - ] - } - } - } custom_job_spec = utils.create_custom_training_job_op_from_component( - component_factory_function, nfs_mounts=[{'server': 's1', 'path': 'p1'}]) - - self.assertDictContainsSubset( - subset=expected_sub_results, - dictionary=custom_job_spec.component_spec.to_dict()) + component_factory_function, nfs_mounts=[{ + 'server': 's1', + 'path': 'p1' + }]) + self.assertContainsSubsequence(custom_job_spec.component_spec.inputs, [ + components.structures.InputSpec( + name='nfs_mounts', + type='JsonArray', + description=None, + default='[{"server": "s1", "path": "p1"}]', + optional=True, + annotations=None) + ]) def test_run_as_vertex_ai_custom_with_environment_variable(self): component_factory_function = self._create_a_container_based_component() - component_factory_function.component_spec.implementation.container.env = [ - 'test_env_variable' - ] - - expected_sub_results = { - 'implementation': { - 'container': { - 'image': - 'test_launcher_image', - 'command': [ - 'python3', '-u', '-m', - 'google_cloud_pipeline_components.container.v1.gcp_launcher.launcher' - ], - 'args': [ - '--type', 'CustomJob', '--payload', - '{"display_name": "ContainerComponent", "job_spec": ' - '{"worker_pool_specs": [{"machine_spec": {"machine_type": ' - '"n1-standard-4"}, "replica_count": 1, "container_spec": ' - '{"image_uri": "google/cloud-sdk:latest", "command": ' - '["sh", "-c", "set -e -x\\necho \\"$0, this is an output ' - 'parameter\\"\\n", ' - '"{{$.inputs.parameters[\'input_text\']}}", ' - '"{{$.outputs.parameters[\'output_value\'].output_file}}"],' - ' "env": ["test_env_variable"]}, "disk_spec": ' - '{"boot_disk_type": "pd-ssd", "boot_disk_size_gb": 100}}],' - ' "service_account": ' - '"{{$.inputs.parameters[\'service_account\']}}", ' - '"network": "{{$.inputs.parameters[\'network\']}}", ' - '"tensorboard": ' - '"{{$.inputs.parameters[\'tensorboard\']}}", ' - '"base_output_directory": {"output_uri_prefix": ' - '"{{$.inputs.parameters[\'base_output_directory\']}}"}}}', - '--project', { - 'inputValue': 'project' - }, '--location', { - 'inputValue': 'location' - }, '--gcp_resources', { - 'outputPath': 'gcp_resources' - } - ] - } - } - } + component_factory_function.component_spec.implementation.container.env = [{ + 'name': 'FOO', + 'value': 'BAR' + }] custom_job_spec = utils.create_custom_training_job_op_from_component( component_factory_function) - - self.assertDictContainsSubset( - subset=expected_sub_results, - dictionary=custom_job_spec.component_spec.to_dict()) + self.assertContainsSubsequence(custom_job_spec.component_spec.inputs, [ + components.structures.InputSpec( + name='worker_pool_specs', + type='JsonArray', + description=None, + default='[{"machine_spec": {"machine_type": "n1-standard-4"}, "replica_count": 1, "container_spec": {"image_uri": "google/cloud-sdk:latest", "command": ["sh", "-c", "set -e -x\\necho \\"$0, this is an output parameter\\"\\n", "{{$.inputs.parameters[\'input_text\']}}", "{{$.outputs.parameters[\'output_value\'].output_file}}"], "env": [{"name": "FOO", "value": "BAR"}]}, "disk_spec": {"boot_disk_type": "pd-ssd", "boot_disk_size_gb": 100}}]', + optional=True, + annotations=None) + ])