Skip to content

Commit

Permalink
feat(components/google-cloud): Support parametrized input for reserve…
Browse files Browse the repository at this point in the history
…d_ip_range and other Vertex Training parameters in custom job utility.

PiperOrigin-RevId: 445583042
  • Loading branch information
SinaChavoshi authored and Jagadeesh J committed May 11, 2022
1 parent 3923714 commit 34b5d0b
Show file tree
Hide file tree
Showing 6 changed files with 537 additions and 1,061 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ description: |
under the VPC network that can be used for this job.
If set, we will deploy the job within the provided ip ranges. Otherwise,
the job will be deployed to any ip ranges under the provided VPC network.
nfs_mounts (Optional[Sequence[Dict[str, str]]]): A list of NFS mount specs in JSON
dict format. For API spec, see
https://cloud.google.com/vertex-ai/docs/reference/rest/v1/CustomJobSpec#NfsMount
For more details about mounting NFS for CustomJob, see
https://cloud.google.com/vertex-ai/docs/training/train-nfs-share
base_output_directory (Optional[str]): The Cloud Storage location to store
the output of this CustomJob or HyperparameterTuningJob. See below for more details:
https://cloud.google.com/vertex-ai/docs/reference/rest/v1/GcsDestination
Expand All @@ -86,6 +91,7 @@ inputs:
- {name: enable_web_access, type: Boolean, optional: true, default: 'false'}
- {name: network, type: String, optional: true, default: ''}
- {name: reserved_ip_ranges, type: JsonArray, optional: true, default: "[]" }
- {name: nfs_mounts, type: JsonArray, optional: true, default: "{}" }
- {name: base_output_directory, type: String, optional: true, default: ''}
- {name: labels, type: JsonObject, optional: true, default: '{}'}
- {name: encryption_spec_key_name, type: String, optional: true, default: ''}
Expand All @@ -112,6 +118,7 @@ implementation:
', "enable_web_access": "', {inputValue: enable_web_access}, '"',
', "network": "', {inputValue: network}, '"',
', "reserved_ip_ranges": ', {inputValue: reserved_ip_ranges},
', "nfs_mounts": ', {inputValue: nfs_mounts},
', "base_output_directory": {',
'"output_uri_prefix": "', {inputValue: base_output_directory}, '"',
'}',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@
# TODO(chavoshi): switch to using V2 only once it is ready.
import copy
import json
import os
import tempfile
from typing import Callable, Dict, Optional, Sequence

from google_cloud_pipeline_components.aiplatform import utils
from kfp import components
from kfp.components import structures
from kfp.dsl import dsl_utils
from kfp.v2.components.types import type_utils
import yaml

_DEFAULT_CUSTOM_JOB_CONTAINER_IMAGE = utils.DEFAULT_CONTAINER_IMAGE
# Executor replacement is used as executor content needs to be jsonified before
Expand All @@ -43,15 +43,15 @@ def create_custom_training_job_op_from_component(
accelerator_count: Optional[int] = 1,
boot_disk_type: Optional[str] = 'pd-ssd',
boot_disk_size_gb: Optional[int] = 100,
timeout: Optional[str] = '',
timeout: Optional[str] = '604800s',
restart_job_on_worker_restart: Optional[bool] = False,
service_account: Optional[str] = '',
network: Optional[str] = '',
encryption_spec_key_name: Optional[str] = '',
tensorboard: Optional[str] = '',
enable_web_access: Optional[bool] = False,
reserved_ip_ranges: Optional[Sequence[str]] = [],
nfs_mounts: Optional[Sequence[Dict]] = [],
reserved_ip_ranges: Optional[Sequence[str]] = None,
nfs_mounts: Optional[Sequence[Dict[str, str]]] = None,
base_output_directory: Optional[str] = '',
labels: Optional[Dict[str, str]] = None,
) -> Callable: # pylint: disable=g-bare-generic
Expand Down Expand Up @@ -119,15 +119,6 @@ def create_custom_training_job_op_from_component(
number, as in 12345, and {network} is a network name. Private services
access must already be configured for the network. If left unspecified,
the job is not peered with any network.
reserved_ip_ranges (Optional[Sequence[str]]): A list of names for the
reserved ip ranges under the VPC network that can be used for this job. If
set, we will deploy the job within the provided ip ranges. Otherwise, the
job will be deployed to any ip ranges under the provided VPC network.
nfs_mounts (Optional[Sequence[Dict]]): A list of NFS mount specs in JSON
dict format. For API spec, see
https://cloud.google.com/vertex-ai/docs/reference/rest/v1/CustomJobSpec#NfsMount
For more details about mounting NFS for CustomJob, see
https://cloud.google.com/vertex-ai/docs/training/train-nfs-share
encryption_spec_key_name (Optional[str]): Customer-managed encryption key
options for the CustomJob. If this is set, then all resources created by
the CustomJob will be encrypted with the provided encryption key.
Expand All @@ -138,6 +129,15 @@ def create_custom_training_job_op_from_component(
access](https://cloud.google.com/vertex-ai/docs/training/monitor-debug-interactive-shell)
to training containers. If set to `true`, you can access interactive
shells at the URIs given by [CustomJob.web_access_uris][].
reserved_ip_ranges (Optional[Sequence[str]]): A list of names for the
reserved ip ranges under the VPC network that can be used for this job. If
set, we will deploy the job within the provided ip ranges. Otherwise, the
job will be deployed to any ip ranges under the provided VPC network.
nfs_mounts (Optional[Sequence[Dict[str, str]]]): A list of NFS mount specs in
JSON dict format. For API spec, see
https://cloud.google.com/vertex-ai/docs/reference/rest/v1/CustomJobSpec#NfsMount
For more details about mounting NFS for CustomJob, see
https://cloud.google.com/vertex-ai/docs/training/train-nfs-share
base_output_directory (Optional[str]): The Cloud Storage location to store
the output of this CustomJob or
HyperparameterTuningJob. See below for more details:
Expand All @@ -151,7 +151,7 @@ def create_custom_training_job_op_from_component(
operator.
"""
job_spec = {}
worker_pool_specs = {}
input_specs = []
output_specs = []

Expand Down Expand Up @@ -213,109 +213,95 @@ def _is_output_parameter(output_key: str) -> bool:
worker_pool_spec['disk_spec'] = {}
worker_pool_spec['disk_spec']['boot_disk_size_gb'] = boot_disk_size_gb

job_spec['worker_pool_specs'] = [worker_pool_spec]
worker_pool_specs = [worker_pool_spec]
if int(replica_count) > 1:
additional_worker_pool_spec = copy.deepcopy(worker_pool_spec)
additional_worker_pool_spec['replica_count'] = str(replica_count - 1)
job_spec['worker_pool_specs'].append(additional_worker_pool_spec)

# TODO(chavoshi): Use input parameter instead of hard coded string label.
# This requires Dictionary input type to be supported in V2.
if labels is not None:
job_spec['labels'] = labels
worker_pool_specs.append(additional_worker_pool_spec)

if timeout:
if 'scheduling' not in job_spec:
job_spec['scheduling'] = {}
job_spec['scheduling']['timeout'] = timeout
if restart_job_on_worker_restart:
if 'scheduling' not in job_spec:
job_spec['scheduling'] = {}
job_spec['scheduling'][
'restart_job_on_worker_restart'] = restart_job_on_worker_restart
if enable_web_access:
job_spec['enable_web_access'] = enable_web_access
if reserved_ip_ranges:
job_spec['reserved_ip_ranges'] = reserved_ip_ranges
if nfs_mounts:
job_spec['nfs_mounts'] = nfs_mounts
if encryption_spec_key_name:
job_spec['encryption_spec'] = {}
job_spec['encryption_spec'][
'kms_key_name'] = "{{$.inputs.parameters['encryption_spec_key_name']}}"
input_specs.append(
structures.InputSpec(
name='encryption_spec_key_name',
type='String',
optional=True,
default=encryption_spec_key_name),)

# Remove any existing service_account from component input list.
# Remove any Vertex Training duplicate input_spec from component input list.
input_specs[:] = [
input_spec for input_spec in input_specs
if input_spec.name not in ('service_account', 'network', 'tensorboard',
'base_output_directory')
if input_spec.name not in ('project', 'location', 'display_name',
'worker_pool_specs', 'timeout',
'restart_job_on_worker_restart',
'service_account', 'tensorboard', 'network',
'reserved_ip_ranges', 'nfs_mounts',
'base_output_directory', 'labels',
'encryption_spec_key_name')
]
job_spec['service_account'] = "{{$.inputs.parameters['service_account']}}"
job_spec['network'] = "{{$.inputs.parameters['network']}}"

job_spec['tensorboard'] = "{{$.inputs.parameters['tensorboard']}}"
job_spec['base_output_directory'] = {}
job_spec['base_output_directory'][
'output_uri_prefix'] = "{{$.inputs.parameters['base_output_directory']}}"
custom_job_payload = {
'display_name': display_name or component_spec.component_spec.name,
'job_spec': job_spec
}
custom_training_job_json = None
with open(os.path.join(os.path.dirname(__file__), 'component.yaml')) as file:
custom_training_job_json = yaml.load(file, Loader=yaml.FullLoader)

for input_item in custom_training_job_json['inputs']:
if 'display_name' in input_item.values():
input_item[
'default'] = display_name if display_name else component_spec.component_spec.name
input_item['optional'] = True
elif 'worker_pool_specs' in input_item.values():
input_item['default'] = json.dumps(worker_pool_specs)
input_item['optional'] = True
elif 'timeout' in input_item.values():
input_item['default'] = timeout
input_item['optional'] = True
elif 'restart_job_on_worker_restart' in input_item.values():
input_item['default'] = json.dumps(restart_job_on_worker_restart)
input_item['optional'] = True
elif 'service_account' in input_item.values():
input_item['default'] = service_account
input_item['optional'] = True
elif 'tensorboard' in input_item.values():
input_item['default'] = tensorboard
input_item['optional'] = True
elif 'enable_web_access' in input_item.values():
input_item['default'] = json.dumps(enable_web_access)
input_item['optional'] = True
elif 'network' in input_item.values():
input_item['default'] = network
input_item['optional'] = True
elif 'reserved_ip_ranges' in input_item.values():
input_item['default'] = json.dumps(
reserved_ip_ranges) if reserved_ip_ranges else '[]'
input_item['optional'] = True
elif 'nfs_mounts' in input_item.values():
input_item['default'] = json.dumps(nfs_mounts) if nfs_mounts else '{}'
input_item['optional'] = True
elif 'base_output_directory' in input_item.values():
input_item['default'] = base_output_directory
input_item['optional'] = True
elif 'labels' in input_item.values():
input_item['default'] = json.dumps(labels) if labels else '{}'
input_item['optional'] = True
elif 'encryption_spec_key_name' in input_item.values():
input_item['default'] = encryption_spec_key_name
input_item['optional'] = True
else:
# This field does not need to be updated.
continue

custom_job_component_spec = structures.ComponentSpec(
name=component_spec.component_spec.name,
inputs=input_specs + [
structures.InputSpec(
name='base_output_directory',
type='String',
optional=True,
default=base_output_directory),
structures.InputSpec(
name='tensorboard',
type='String',
optional=True,
default=tensorboard),
structures.InputSpec(
name='network', type='String', optional=True, default=network),
structures.InputSpec(
name='service_account',
type='String',
optional=True,
default=service_account),
structures.InputSpec(name='project', type='String'),
structures.InputSpec(name='location', type='String')
],
outputs=output_specs +
[structures.OutputSpec(name='gcp_resources', type='String')],
implementation=structures.ContainerImplementation(
container=structures.ContainerSpec(
image=_DEFAULT_CUSTOM_JOB_CONTAINER_IMAGE,
command=[
'python3', '-u', '-m',
'google_cloud_pipeline_components.container.v1.gcp_launcher.launcher'
],
args=[
'--type',
'CustomJob',
'--payload',
json.dumps(custom_job_payload),
'--project',
structures.InputValuePlaceholder(input_name='project'),
'--location',
structures.InputValuePlaceholder(input_name='location'),
'--gcp_resources',
structures.OutputPathPlaceholder(output_name='gcp_resources'),
],
)))
# Copying over the input and output spec from the given component.
for input_spec in input_specs:
custom_training_job_json['inputs'].append(input_spec.to_dict())

# pytype: enable=attribute-error
for output_spec in output_specs:
custom_training_job_json['outputs'].append(output_spec.to_dict())

# Copy the component name and description
custom_training_job_json['name'] = component_spec.component_spec.name

if component_spec.component_spec.description:
# TODO(chavoshi) Add support for docstring parsing.
component_description = 'A custom job that wraps '
component_description += f'{component_spec.component_spec.name}.\n\nOrigional component'
component_description += f' description:\n{component_spec.component_spec.description}\n\nCustom'
component_description += ' Job wrapper description:\n'
component_description += custom_training_job_json['description']
custom_training_job_json['description'] = component_description

component_path = tempfile.mktemp()
custom_job_component_spec.save(component_path)
with open(component_path, 'w') as out_file:
yaml.dump(custom_training_job_json, out_file)

return components.load_component_from_file(component_path)

This file was deleted.

Loading

0 comments on commit 34b5d0b

Please sign in to comment.