Skip to content

Commit

Permalink
fix(components/google-cloud): Correct custom job default values (#6638)
Browse files Browse the repository at this point in the history
* Correct custom job default values

* update default values

* Add IR test

* revert changes in forecast test file

* remove extra imports

* correct placeholder

* fix default value for base_output_directory

* update encryption key and base path

* change format of encryption key

* change output json

* correct pipeline.json

* print output manually

* change dictSubset to dictEqual in unit tests.

* remove redundant tests
  • Loading branch information
SinaChavoshi authored Sep 30, 2021
1 parent 8b67505 commit ab58885
Show file tree
Hide file tree
Showing 4 changed files with 283 additions and 111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,27 +25,26 @@
from google_cloud_pipeline_components.aiplatform import utils
from kfp.components import structures

_DEFAULT_CUSTOM_JOB_MACHINE_TYPE = 'n1-standard-4'
_DEFAULT_CUSTOM_JOB_CONTAINER_IMAGE = utils.DEFAULT_CONTAINER_IMAGE


def custom_training_job_op(
component_spec: Callable,
display_name: Optional[str] = None,
replica_count: Optional[int] = None,
machine_type: Optional[str] = None,
accelerator_type: Optional[str] = None,
accelerator_count: Optional[int] = None,
boot_disk_type: Optional[str] = None,
boot_disk_size_gb: Optional[int] = None,
timeout: Optional[str] = None,
restart_job_on_worker_restart: Optional[bool] = None,
service_account: Optional[str] = None,
network: Optional[str] = None,
display_name: Optional[str] = "",
replica_count: Optional[int] = 1,
machine_type: Optional[str] = "n1-standard-4",
accelerator_type: Optional[str] = "",
accelerator_count: Optional[int] = 1,
boot_disk_type: Optional[str] = "pd-ssd",
boot_disk_size_gb: Optional[int] = 100,
timeout: Optional[str] = "",
restart_job_on_worker_restart: Optional[bool] = False,
service_account: Optional[str] = "",
network: Optional[str] = "",
worker_pool_specs: Optional[List[Mapping[str, Any]]] = None,
encryption_spec_key_name: Optional[str] = None,
tensorboard: Optional[str] = None,
base_output_directory: Optional[str] = None,
encryption_spec_key_name: Optional[str] = "",
tensorboard: Optional[str] = "",
base_output_directory: Optional[str] = "",
labels: Optional[Dict[str, str]] = None,
) -> Callable:
"""Run a pipeline task using Vertex AI custom training job.
Expand All @@ -64,7 +63,7 @@ def custom_training_job_op(
accelerator_type: Optional. The type of accelerator(s) that may be attached
to the machine as per accelerator_count. Optional.
accelerator_count: Optional. The number of accelerators to attach to the
machine.
machine. Defaults to 1 if accelerator_type is set.
boot_disk_type: Optional. Type of the boot disk (default is "pd-ssd"). Valid
values: "pd-ssd" (Persistent Disk Solid State Drive) or "pd-standard"
(Persistent Disk Hard Disk Drive).
Expand All @@ -88,10 +87,8 @@ def custom_training_job_op(
tensorboard: The name of a Vertex AI Tensorboard resource to which this
CustomJob will upload Tensorboard logs.
base_output_directory: The Cloud Storage location to store the output of
this CustomJob or HyperparameterTuningJob. For HyperparameterTuningJob,
the baseOutputDirectory of each child CustomJob backing a Trial is set
to a subdirectory of name [id][Trial.id] under its parent
HyperparameterTuningJob's baseOutputDirectory.
this CustomJob or HyperparameterTuningJob. see below for more details:
https://cloud.google.com/vertex-ai/docs/reference/rest/v1/GcsDestination
labels: Optional. The labels with user-defined metadata to organize
CustomJobs. See https://goo.gl/xmQnxf for more information.
Returns:
Expand Down Expand Up @@ -141,7 +138,7 @@ def _is_output_parameter(output_key: str) -> bool:

worker_pool_spec = {
'machine_spec': {
'machine_type': machine_type or _DEFAULT_CUSTOM_JOB_MACHINE_TYPE
'machine_type': machine_type
},
'replica_count': 1,
'container_spec': {
Expand All @@ -164,24 +161,22 @@ def _is_output_parameter(output_key: str) -> bool:
dsl_utils.resolve_cmd_lines(container_args_copy,
_is_output_parameter)
worker_pool_spec['container_spec']['args'] = container_args_copy
if accelerator_type is not None:
if accelerator_type:
worker_pool_spec['machine_spec'][
'accelerator_type'] = accelerator_type
if accelerator_count is not None:
worker_pool_spec['machine_spec'][
'accelerator_count'] = accelerator_count
if boot_disk_type is not None:
if boot_disk_type:
if 'disk_spec' not in worker_pool_spec:
worker_pool_spec['disk_spec'] = {}
worker_pool_spec['disk_spec']['boot_disk_type'] = boot_disk_type
if boot_disk_size_gb is not None:
if 'disk_spec' not in worker_pool_spec:
worker_pool_spec['disk_spec'] = {}
worker_pool_spec['disk_spec'][
'boot_disk_size_gb'] = boot_disk_size_gb

job_spec['worker_pool_specs'] = [worker_pool_spec]
if replica_count is not None and replica_count > 1:
if replica_count > 1:
additional_worker_pool_spec = copy.deepcopy(worker_pool_spec)
additional_worker_pool_spec['replica_count'] = str(replica_count -
1)
Expand All @@ -192,30 +187,40 @@ def _is_output_parameter(output_key: str) -> bool:
if labels is not None:
job_spec['labels'] = labels

if timeout is not None:
if timeout:
if 'scheduling' not in job_spec:
job_spec['scheduling'] = {}
job_spec['scheduling']['timeout'] = timeout
if restart_job_on_worker_restart is not None:
if restart_job_on_worker_restart:
if 'scheduling' not in job_spec:
job_spec['scheduling'] = {}
job_spec['scheduling'][
'restart_job_on_worker_restart'] = restart_job_on_worker_restart

if encryption_spec_key_name:
job_spec['encryption_spec'] = {}
job_spec['encryption_spec'][
'kms_key_name'] = "{{$.inputs.parameters['encryption_spec_key_name']}}"
input_specs.append(
structures.InputSpec(
name='encryption_spec_key_name',
type='String',
optional=True,
default=encryption_spec_key_name),)

# Remove any existing service_account from component input list.
input_specs[:] = [
input_spec for input_spec in input_specs
if input_spec.name not in ('service_account', 'network',
'encryption_spec_key_name', 'tensorboard',
if input_spec.name not in ('service_account', 'network', 'tensorboard',
'base_output_directory')
]
job_spec['service_account'] = "{{$.inputs.parameters['service_account}']}}"
job_spec['network'] = "{{$.inputs.parameters['network}']}}"
job_spec[
'encryption_spec_key_name'] = "{{$.inputs.parameters['encryption_spec_key_name}']}}"
job_spec['tensorboard'] = "{{$.inputs.parameters['tensorboard}']}}"
job_spec[
'base_output_directory'] = "{{$.inputs.parameters['base_output_directory}']}}"
job_spec['service_account'] = "{{$.inputs.parameters['service_account']}}"
job_spec['network'] = "{{$.inputs.parameters['network']}}"

job_spec['tensorboard'] = "{{$.inputs.parameters['tensorboard']}}"
job_spec['base_output_directory'] = {}
job_spec['base_output_directory'][
'output_uri_prefix'] = "{{$.inputs.parameters['base_output_directory']}}"
custom_job_payload = {
'display_name': display_name or component_spec.component_spec.name,
'job_spec': job_spec
Expand All @@ -234,11 +239,6 @@ def _is_output_parameter(output_key: str) -> bool:
type='String',
optional=True,
default=tensorboard),
structures.InputSpec(
name='encryption_spec_key_name',
type='String',
optional=True,
default=encryption_spec_key_name),
structures.InputSpec(
name='network', type='String', optional=True, default=network),
structures.InputSpec(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# Copyright 2021 The Kubeflow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test google-cloud-pipeline-Components to ensure the compile without error."""

import json
import os
import unittest
import kfp
from kfp import components
from kfp.v2 import compiler
from google_cloud_pipeline_components.experimental.custom_job import custom_job


class CustomJobCompileTest(unittest.TestCase):
    """Compile test for the custom training job wrapper.

    Wraps a minimal container component with
    ``custom_job.custom_training_job_op``, compiles a one-task pipeline to
    IR JSON, and compares the output against a golden file
    (``testdata/custom_job_pipeline.json``).
    """

    def setUp(self):
        super(CustomJobCompileTest, self).setUp()
        # Fixed inputs passed to the pipeline under test.
        self._project = "test_project"
        self._location = "us-central1"
        self._test_input_string = "test_input_string"
        # Path the compiler writes the package to; removed in tearDown.
        self._package_path = "pipeline.json"
        # Minimal container component: echoes its input into the output
        # parameter file via gsutil.
        # NOTE(review): the YAML literal below looks like it lost nested
        # indentation (e.g. under 'implementation:') in transit — verify
        # against the repository copy before relying on it.
        self._test_component = components.load_component_from_text(
            "name: Producer\n"
            "inputs:\n"
            "- {name: input_text, type: String, description: 'Represents an input parameter.'}\n"
            "outputs:\n"
            "- {name: output_value, type: String, description: 'Represents an output paramter.'}\n"
            "implementation:\n"
            " container:\n"
            " image: google/cloud-sdk:latest\n"
            " command:\n"
            " - sh\n"
            " - -c\n"
            " - |\n"
            " set -e -x\n"
            " echo '$0, this is an output parameter' | gsutil cp - '$1'\n"
            " - {inputValue: input_text}\n"
            " - {outputPath: output_value}\n")

    def tearDown(self):
        # Remove the compiled package if the test got far enough to write it.
        if os.path.exists(self._package_path):
            os.remove(self._package_path)

    def test_custom_job_op_compile(self):
        # Wrap the plain container component so it runs as a custom job.
        custom_job_op = custom_job.custom_training_job_op(self._test_component)

        @kfp.dsl.pipeline(name="training-test")
        def pipeline():
            custom_job_task = custom_job_op(
                self._test_input_string,
                project=self._project,
                location=self._location)

        # Compile to IR JSON on disk, then read it back for comparison.
        compiler.Compiler().compile(
            pipeline_func=pipeline, package_path=self._package_path)

        with open(self._package_path) as f:
            executor_output_json = json.load(f, strict=False)

        with open(
            os.path.join(
                os.path.dirname(__file__),
                '../testdata/custom_job_pipeline.json')) as ef:
            expected_executor_output_json = json.load(ef, strict=False)
        # Ignore the kfp SDK version during comparison; the golden file does
        # not pin one.
        del executor_output_json['pipelineSpec']['sdkVersion']
        self.assertEqual(executor_output_json, expected_executor_output_json)
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
{
"pipelineSpec": {
"components": {
"comp-producer": {
"executorLabel": "exec-producer",
"inputDefinitions": {
"parameters": {
"base_output_directory": {
"type": "STRING"
},
"input_text": {
"type": "STRING"
},
"location": {
"type": "STRING"
},
"network": {
"type": "STRING"
},
"project": {
"type": "STRING"
},
"service_account": {
"type": "STRING"
},
"tensorboard": {
"type": "STRING"
}
}
},
"outputDefinitions": {
"parameters": {
"gcp_resources": {
"type": "STRING"
},
"output_value": {
"type": "STRING"
}
}
}
}
},
"deploymentSpec": {
"executors": {
"exec-producer": {
"container": {
"args": [
"--type",
"CustomJob",
"--payload",
"{\"display_name\": \"Producer\", \"job_spec\": {\"worker_pool_specs\": [{\"machine_spec\": {\"machine_type\": \"n1-standard-4\"}, \"replica_count\": 1, \"container_spec\": {\"image_uri\": \"google/cloud-sdk:latest\", \"command\": [\"sh\", \"-c\", \"set -e -x\\necho '$0, this is an output parameter' | gsutil cp - '$1'\\n\", \"{{$.inputs.parameters['input_text']}}\", \"{{$.outputs.parameters['output_value'].output_file}}\"]}, \"disk_spec\": {\"boot_disk_type\": \"pd-ssd\", \"boot_disk_size_gb\": 100}}], \"service_account\": \"{{$.inputs.parameters['service_account']}}\", \"network\": \"{{$.inputs.parameters['network']}}\", \"tensorboard\": \"{{$.inputs.parameters['tensorboard']}}\", \"base_output_directory\": {\"output_uri_prefix\": \"{{$.inputs.parameters['base_output_directory']}}\"}}}",
"--project",
"{{$.inputs.parameters['project']}}",
"--location",
"{{$.inputs.parameters['location']}}",
"--gcp_resources",
"{{$.outputs.parameters['gcp_resources'].output_file}}"
],
"command": [
"python3",
"-u",
"-m",
"google_cloud_pipeline_components.container.experimental.gcp_launcher.launcher"
],
"image": "gcr.io/ml-pipeline/google-cloud-pipeline-components:latest"
}
}
}
},
"pipelineInfo": {
"name": "training-test"
},
"root": {
"dag": {
"tasks": {
"producer": {
"cachingOptions": {
"enableCache": true
},
"componentRef": {
"name": "comp-producer"
},
"inputs": {
"parameters": {
"base_output_directory": {
"runtimeValue": {
"constantValue": {
"stringValue": ""
}
}
},
"input_text": {
"runtimeValue": {
"constantValue": {
"stringValue": "test_input_string"
}
}
},
"location": {
"runtimeValue": {
"constantValue": {
"stringValue": "us-central1"
}
}
},
"network": {
"runtimeValue": {
"constantValue": {
"stringValue": ""
}
}
},
"project": {
"runtimeValue": {
"constantValue": {
"stringValue": "test_project"
}
}
},
"service_account": {
"runtimeValue": {
"constantValue": {
"stringValue": ""
}
}
},
"tensorboard": {
"runtimeValue": {
"constantValue": {
"stringValue": ""
}
}
}
}
},
"taskInfo": {
"name": "producer"
}
}
}
}
},
"schemaVersion": "2.0.0"
},
"runtimeConfig": {}
}
Loading

0 comments on commit ab58885

Please sign in to comment.