From 16010a8b361488424ad7eb8d362f213e2938b104 Mon Sep 17 00:00:00 2001 From: jaycee-li Date: Tue, 3 May 2022 13:34:38 -0700 Subject: [PATCH 1/8] Add batch_size kwarg for batch prediction jobs --- google/cloud/aiplatform/jobs.py | 18 +++++++- google/cloud/aiplatform/models.py | 9 ++++ tests/unit/aiplatform/test_jobs.py | 6 +++ tests/unit/aiplatform/test_models.py | 69 +++++++++++++++------------- 4 files changed, 68 insertions(+), 34 deletions(-) diff --git a/google/cloud/aiplatform/jobs.py b/google/cloud/aiplatform/jobs.py index fc4f829882..22106e0ec3 100644 --- a/google/cloud/aiplatform/jobs.py +++ b/google/cloud/aiplatform/jobs.py @@ -40,6 +40,7 @@ job_state as gca_job_state, hyperparameter_tuning_job as gca_hyperparameter_tuning_job_compat, machine_resources as gca_machine_resources_compat, + manual_batch_tuning_parameters as gca_manual_batch_tuning_parameters_compact, study as gca_study_compat, ) from google.cloud.aiplatform.constants import base as constants @@ -364,6 +365,7 @@ def create( accelerator_count: Optional[int] = None, starting_replica_count: Optional[int] = None, max_replica_count: Optional[int] = None, + batch_size: Optional[int] = None, generate_explanation: Optional[bool] = False, explanation_metadata: Optional["aiplatform.explain.ExplanationMetadata"] = None, explanation_parameters: Optional[ @@ -477,6 +479,13 @@ def create( The maximum number of machine replicas the batch operation may be scaled to. Only used if `machine_type` is set. Default is 10. + batch_size (Optional[int]): + The number of the records (e.g. instances) of the operation given in each batch + to a machine replica. Machine type, and size of a single record should be considered + when setting this parameter, higher value speeds up the batch operation's execution, + but too high value will result in a whole batch not fitting in a machine's memory, + and the whole operation will fail. + The default value is 64. generate_explanation (bool): Optional. Generate explanation along with the batch prediction results. This will cause the batch prediction output to include @@ -647,7 +656,14 @@ def create( gapic_batch_prediction_job.dedicated_resources = dedicated_resources - gapic_batch_prediction_job.manual_batch_tuning_parameters = None + manual_batch_tuning_parameters = ( + gca_manual_batch_tuning_parameters_compact.ManualBatchTuningParameters() + ) + manual_batch_tuning_parameters.batch_size = batch_size + + gapic_batch_prediction_job.manual_batch_tuning_parameters = ( + manual_batch_tuning_parameters + ) # User Labels gapic_batch_prediction_job.labels = labels diff --git a/google/cloud/aiplatform/models.py b/google/cloud/aiplatform/models.py index b15ed791bf..e8ac1def1e 100644 --- a/google/cloud/aiplatform/models.py +++ b/google/cloud/aiplatform/models.py @@ -2276,6 +2276,7 @@ def batch_predict( accelerator_count: Optional[int] = None, starting_replica_count: Optional[int] = None, max_replica_count: Optional[int] = None, + batch_size: Optional[int] = None, generate_explanation: Optional[bool] = False, explanation_metadata: Optional[explain.ExplanationMetadata] = None, explanation_parameters: Optional[explain.ExplanationParameters] = None, @@ -2391,6 +2392,13 @@ def batch_predict( The maximum number of machine replicas the batch operation may be scaled to. Only used if `machine_type` is set. Default is 10. + batch_size (Optional[int]): + The number of the records (e.g. instances) of the operation given in each batch + to a machine replica. Machine type, and size of a single record should be considered + when setting this parameter, higher value speeds up the batch operation's execution, + but too high value will result in a whole batch not fitting in a machine's memory, + and the whole operation will fail. + The default value is 64. generate_explanation (bool): Optional. Generate explanation along with the batch prediction results. This will cause the batch prediction output to include @@ -2462,6 +2470,7 @@ def batch_predict( accelerator_count=accelerator_count, starting_replica_count=starting_replica_count, max_replica_count=max_replica_count, + batch_size=batch_size, generate_explanation=generate_explanation, explanation_metadata=explanation_metadata, explanation_parameters=explanation_parameters, diff --git a/tests/unit/aiplatform/test_jobs.py b/tests/unit/aiplatform/test_jobs.py index 6b52fb7902..ec75883624 100644 --- a/tests/unit/aiplatform/test_jobs.py +++ b/tests/unit/aiplatform/test_jobs.py @@ -37,6 +37,7 @@ io as gca_io_compat, job_state as gca_job_state_compat, machine_resources as gca_machine_resources_compat, + manual_batch_tuning_parameters as gca_manual_batch_tuning_parameters_compat, ) from google.cloud.aiplatform_v1.services.job_service import client as job_service_client @@ -132,6 +133,7 @@ _TEST_ACCELERATOR_COUNT = 2 _TEST_STARTING_REPLICA_COUNT = 2 _TEST_MAX_REPLICA_COUNT = 12 +_TEST_BATCH_SIZE = 16 _TEST_LABEL = {"team": "experimentation", "trial_id": "x435"} @@ -674,6 +676,7 @@ def test_batch_predict_with_all_args( accelerator_count=_TEST_ACCELERATOR_COUNT, starting_replica_count=_TEST_STARTING_REPLICA_COUNT, max_replica_count=_TEST_MAX_REPLICA_COUNT, + batch_size=_TEST_BATCH_SIZE, generate_explanation=True, explanation_metadata=_TEST_EXPLANATION_METADATA, explanation_parameters=_TEST_EXPLANATION_PARAMETERS, @@ -712,6 +715,9 @@ def test_batch_predict_with_all_args( starting_replica_count=_TEST_STARTING_REPLICA_COUNT, max_replica_count=_TEST_MAX_REPLICA_COUNT, ), + manual_batch_tuning_parameters=gca_manual_batch_tuning_parameters_compat.ManualBatchTuningParameters( + batch_size=_TEST_BATCH_SIZE + ), generate_explanation=True, explanation_spec=gca_explanation_compat.ExplanationSpec( metadata=_TEST_EXPLANATION_METADATA, diff --git a/tests/unit/aiplatform/test_models.py b/tests/unit/aiplatform/test_models.py index 0e1673b84b..77a55848be 100644 --- a/tests/unit/aiplatform/test_models.py +++ b/tests/unit/aiplatform/test_models.py @@ -49,6 +49,7 @@ env_var as gca_env_var, explanation as gca_explanation, machine_resources as gca_machine_resources, + manual_batch_tuning_parameters as gca_manual_batch_tuning_parameters_compat, model_service as gca_model_service, model_evaluation as gca_model_evaluation, endpoint_service as gca_endpoint_service, @@ -86,6 +87,8 @@ _TEST_STARTING_REPLICA_COUNT = 2 _TEST_MAX_REPLICA_COUNT = 12 +_TEST_BATCH_SIZE = 16 + _TEST_PIPELINE_RESOURCE_NAME = ( "projects/my-project/locations/us-central1/trainingPipeline/12345" ) @@ -1328,6 +1331,7 @@ def test_batch_predict_with_all_args(self, create_batch_prediction_job_mock, syn accelerator_count=_TEST_ACCELERATOR_COUNT, starting_replica_count=_TEST_STARTING_REPLICA_COUNT, max_replica_count=_TEST_MAX_REPLICA_COUNT, + batch_size=_TEST_BATCH_SIZE, generate_explanation=True, explanation_metadata=_TEST_EXPLANATION_METADATA, explanation_parameters=_TEST_EXPLANATION_PARAMETERS, @@ -1342,41 +1346,40 @@ def test_batch_predict_with_all_args(self, create_batch_prediction_job_mock, syn batch_prediction_job.wait() # Construct expected request - expected_gapic_batch_prediction_job = ( - gca_batch_prediction_job.BatchPredictionJob( - display_name=_TEST_BATCH_PREDICTION_DISPLAY_NAME, - model=model_service_client.ModelServiceClient.model_path( - _TEST_PROJECT, _TEST_LOCATION, _TEST_ID - ), - input_config=gca_batch_prediction_job.BatchPredictionJob.InputConfig( - instances_format="jsonl", - gcs_source=gca_io.GcsSource( - uris=[_TEST_BATCH_PREDICTION_GCS_SOURCE] - ), - ), - output_config=gca_batch_prediction_job.BatchPredictionJob.OutputConfig( - gcs_destination=gca_io.GcsDestination( - output_uri_prefix=_TEST_BATCH_PREDICTION_GCS_DEST_PREFIX - ), - predictions_format="csv", - ), - dedicated_resources=gca_machine_resources.BatchDedicatedResources( - machine_spec=gca_machine_resources.MachineSpec( - machine_type=_TEST_MACHINE_TYPE, - accelerator_type=_TEST_ACCELERATOR_TYPE, - accelerator_count=_TEST_ACCELERATOR_COUNT, - ), - starting_replica_count=_TEST_STARTING_REPLICA_COUNT, - max_replica_count=_TEST_MAX_REPLICA_COUNT, + expected_gapic_batch_prediction_job = gca_batch_prediction_job.BatchPredictionJob( + display_name=_TEST_BATCH_PREDICTION_DISPLAY_NAME, + model=model_service_client.ModelServiceClient.model_path( + _TEST_PROJECT, _TEST_LOCATION, _TEST_ID + ), + input_config=gca_batch_prediction_job.BatchPredictionJob.InputConfig( + instances_format="jsonl", + gcs_source=gca_io.GcsSource(uris=[_TEST_BATCH_PREDICTION_GCS_SOURCE]), + ), + output_config=gca_batch_prediction_job.BatchPredictionJob.OutputConfig( + gcs_destination=gca_io.GcsDestination( + output_uri_prefix=_TEST_BATCH_PREDICTION_GCS_DEST_PREFIX ), - generate_explanation=True, - explanation_spec=gca_explanation.ExplanationSpec( - metadata=_TEST_EXPLANATION_METADATA, - parameters=_TEST_EXPLANATION_PARAMETERS, + predictions_format="csv", + ), + dedicated_resources=gca_machine_resources.BatchDedicatedResources( + machine_spec=gca_machine_resources.MachineSpec( + machine_type=_TEST_MACHINE_TYPE, + accelerator_type=_TEST_ACCELERATOR_TYPE, + accelerator_count=_TEST_ACCELERATOR_COUNT, ), - labels=_TEST_LABEL, - encryption_spec=_TEST_ENCRYPTION_SPEC, - ) + starting_replica_count=_TEST_STARTING_REPLICA_COUNT, + max_replica_count=_TEST_MAX_REPLICA_COUNT, + ), + manual_batch_tuning_parameters=gca_manual_batch_tuning_parameters_compat.ManualBatchTuningParameters( + batch_size=_TEST_BATCH_SIZE + ), + generate_explanation=True, + explanation_spec=gca_explanation.ExplanationSpec( + metadata=_TEST_EXPLANATION_METADATA, + parameters=_TEST_EXPLANATION_PARAMETERS, + ), + labels=_TEST_LABEL, + encryption_spec=_TEST_ENCRYPTION_SPEC, ) create_batch_prediction_job_mock.assert_called_once_with( From a7b98b341ca5afda42279e697d21dce96f7b1992 Mon Sep 17 00:00:00 2001 From: jaycee-li Date: Wed, 4 May 2022 15:26:51 -0700 Subject: [PATCH 2/8] Fix errors Update the copyright year. Change the order of the argument. Fix the syntax error. --- google/cloud/aiplatform/jobs.py | 22 +++++++++++----------- google/cloud/aiplatform/models.py | 18 +++++++++--------- tests/unit/aiplatform/test_jobs.py | 2 +- tests/unit/aiplatform/test_models.py | 2 +- 4 files changed, 22 insertions(+), 22 deletions(-) diff --git a/google/cloud/aiplatform/jobs.py b/google/cloud/aiplatform/jobs.py index 22106e0ec3..f714b6fe75 100644 --- a/google/cloud/aiplatform/jobs.py +++ b/google/cloud/aiplatform/jobs.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- -# Copyright 2020 Google LLC +# Copyright 2022 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -40,7 +40,7 @@ job_state as gca_job_state, hyperparameter_tuning_job as gca_hyperparameter_tuning_job_compat, machine_resources as gca_machine_resources_compat, - manual_batch_tuning_parameters as gca_manual_batch_tuning_parameters_compact, + manual_batch_tuning_parameters as gca_manual_batch_tuning_parameters_compat, study as gca_study_compat, ) from google.cloud.aiplatform.constants import base as constants @@ -365,7 +365,6 @@ def create( accelerator_count: Optional[int] = None, starting_replica_count: Optional[int] = None, max_replica_count: Optional[int] = None, - batch_size: Optional[int] = None, generate_explanation: Optional[bool] = False, explanation_metadata: Optional["aiplatform.explain.ExplanationMetadata"] = None, explanation_parameters: Optional[ @@ -378,6 +377,7 @@ def create( encryption_spec_key_name: Optional[str] = None, sync: bool = True, create_request_timeout: Optional[float] = None, + batch_size: Optional[int] = None, ) -> "BatchPredictionJob": """Create a batch prediction job. @@ -479,13 +479,6 @@ def create( The maximum number of machine replicas the batch operation may be scaled to. Only used if `machine_type` is set. Default is 10. - batch_size (Optional[int]): - The number of the records (e.g. instances) of the operation given in each batch - to a machine replica. Machine type, and size of a single record should be considered - when setting this parameter, higher value speeds up the batch operation's execution, - but too high value will result in a whole batch not fitting in a machine's memory, - and the whole operation will fail. - The default value is 64. generate_explanation (bool): Optional. Generate explanation along with the batch prediction results. This will cause the batch prediction output to include @@ -543,6 +536,13 @@ def create( be immediately returned and synced when the Future has completed. create_request_timeout (float): Optional. The timeout for the create request in seconds. + batch_size (Optional[int]): + The number of the records (e.g. instances) of the operation given in each batch + to a machine replica. Machine type, and size of a single record should be considered + when setting this parameter, higher value speeds up the batch operation's execution, + but too high value will result in a whole batch not fitting in a machine's memory, + and the whole operation will fail. + The default value is 64. Returns: (jobs.BatchPredictionJob): Instantiated representation of the created batch prediction job. @@ -657,7 +657,7 @@ def create( gapic_batch_prediction_job.dedicated_resources = dedicated_resources manual_batch_tuning_parameters = ( - gca_manual_batch_tuning_parameters_compact.ManualBatchTuningParameters() + gca_manual_batch_tuning_parameters_compat.ManualBatchTuningParameters() ) manual_batch_tuning_parameters.batch_size = batch_size diff --git a/google/cloud/aiplatform/models.py b/google/cloud/aiplatform/models.py index e8ac1def1e..4619546347 100644 --- a/google/cloud/aiplatform/models.py +++ b/google/cloud/aiplatform/models.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- -# Copyright 2020 Google LLC +# Copyright 2022 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -2276,7 +2276,6 @@ def batch_predict( accelerator_count: Optional[int] = None, starting_replica_count: Optional[int] = None, max_replica_count: Optional[int] = None, - batch_size: Optional[int] = None, generate_explanation: Optional[bool] = False, explanation_metadata: Optional[explain.ExplanationMetadata] = None, explanation_parameters: Optional[explain.ExplanationParameters] = None, @@ -2285,6 +2284,7 @@ def batch_predict( encryption_spec_key_name: Optional[str] = None, sync: bool = True, create_request_timeout: Optional[float] = None, + batch_size: Optional[int] = None, ) -> jobs.BatchPredictionJob: """Creates a batch prediction job using this Model and outputs prediction results to the provided destination prefix in the specified @@ -2392,13 +2392,6 @@ def batch_predict( The maximum number of machine replicas the batch operation may be scaled to. Only used if `machine_type` is set. Default is 10. - batch_size (Optional[int]): - The number of the records (e.g. instances) of the operation given in each batch - to a machine replica. Machine type, and size of a single record should be considered - when setting this parameter, higher value speeds up the batch operation's execution, - but too high value will result in a whole batch not fitting in a machine's memory, - and the whole operation will fail. - The default value is 64. generate_explanation (bool): Optional. Generate explanation along with the batch prediction results. This will cause the batch prediction output to include @@ -2450,6 +2443,13 @@ def batch_predict( Overrides encryption_spec_key_name set in aiplatform.init. create_request_timeout (float): Optional. The timeout for the create request in seconds. + batch_size (Optional[int]): + The number of the records (e.g. instances) of the operation given in each batch + to a machine replica. Machine type, and size of a single record should be considered + when setting this parameter, higher value speeds up the batch operation's execution, + but too high value will result in a whole batch not fitting in a machine's memory, + and the whole operation will fail. + The default value is 64. Returns: (jobs.BatchPredictionJob): Instantiated representation of the created batch prediction job. diff --git a/tests/unit/aiplatform/test_jobs.py b/tests/unit/aiplatform/test_jobs.py index 548ea11561..73a4f8da0c 100644 --- a/tests/unit/aiplatform/test_jobs.py +++ b/tests/unit/aiplatform/test_jobs.py @@ -720,7 +720,6 @@ def test_batch_predict_with_all_args( accelerator_count=_TEST_ACCELERATOR_COUNT, starting_replica_count=_TEST_STARTING_REPLICA_COUNT, max_replica_count=_TEST_MAX_REPLICA_COUNT, - batch_size=_TEST_BATCH_SIZE, generate_explanation=True, explanation_metadata=_TEST_EXPLANATION_METADATA, explanation_parameters=_TEST_EXPLANATION_PARAMETERS, @@ -728,6 +727,7 @@ def test_batch_predict_with_all_args( credentials=creds, sync=sync, create_request_timeout=None, + batch_size=_TEST_BATCH_SIZE, ) batch_prediction_job.wait_for_resource_creation() diff --git a/tests/unit/aiplatform/test_models.py b/tests/unit/aiplatform/test_models.py index c58eeb8f8d..eaf63d9fdd 100644 --- a/tests/unit/aiplatform/test_models.py +++ b/tests/unit/aiplatform/test_models.py @@ -1397,7 +1397,6 @@ def test_batch_predict_with_all_args(self, create_batch_prediction_job_mock, syn accelerator_count=_TEST_ACCELERATOR_COUNT, starting_replica_count=_TEST_STARTING_REPLICA_COUNT, max_replica_count=_TEST_MAX_REPLICA_COUNT, - batch_size=_TEST_BATCH_SIZE, generate_explanation=True, explanation_metadata=_TEST_EXPLANATION_METADATA, explanation_parameters=_TEST_EXPLANATION_PARAMETERS, @@ -1406,6 +1405,7 @@ def test_batch_predict_with_all_args(self, create_batch_prediction_job_mock, syn encryption_spec_key_name=_TEST_ENCRYPTION_KEY_NAME, sync=sync, create_request_timeout=None, + batch_size=_TEST_BATCH_SIZE, ) if not sync: From f721ec57cb9585f20864ad30a76b0492835fb07a Mon Sep 17 00:00:00 2001 From: jaycee-li Date: Tue, 10 May 2022 10:13:42 -0700 Subject: [PATCH 3/8] fix: change description layout --- google/cloud/aiplatform/jobs.py | 4 ++-- google/cloud/aiplatform/models.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/google/cloud/aiplatform/jobs.py b/google/cloud/aiplatform/jobs.py index f714b6fe75..00d6f11780 100644 --- a/google/cloud/aiplatform/jobs.py +++ b/google/cloud/aiplatform/jobs.py @@ -536,8 +536,8 @@ def create( be immediately returned and synced when the Future has completed. create_request_timeout (float): Optional. The timeout for the create request in seconds. - batch_size (Optional[int]): - The number of the records (e.g. instances) of the operation given in each batch + batch_size (int): + Optional. The number of the records (e.g. instances) of the operation given in each batch to a machine replica. Machine type, and size of a single record should be considered when setting this parameter, higher value speeds up the batch operation's execution, but too high value will result in a whole batch not fitting in a machine's memory, diff --git a/google/cloud/aiplatform/models.py b/google/cloud/aiplatform/models.py index 4619546347..95f3044cbe 100644 --- a/google/cloud/aiplatform/models.py +++ b/google/cloud/aiplatform/models.py @@ -2443,8 +2443,8 @@ def batch_predict( Overrides encryption_spec_key_name set in aiplatform.init. create_request_timeout (float): Optional. The timeout for the create request in seconds. - batch_size (Optional[int]): - The number of the records (e.g. instances) of the operation given in each batch + batch_size (int): + Optional. The number of the records (e.g. instances) of the operation given in each batch to a machine replica. Machine type, and size of a single record should be considered when setting this parameter, higher value speeds up the batch operation's execution, but too high value will result in a whole batch not fitting in a machine's memory, From 1a223db1f27de89d9412d7da1d1e1ac777080ef9 Mon Sep 17 00:00:00 2001 From: jaycee-li Date: Tue, 17 May 2022 14:39:49 -0700 Subject: [PATCH 4/8] feat: add clone method to PipelineJob --- google/cloud/aiplatform/pipeline_jobs.py | 140 ++++++++++++++++- tests/unit/aiplatform/test_pipeline_jobs.py | 164 ++++++++++++++++++++ 2 files changed, 303 insertions(+), 1 deletion(-) diff --git a/google/cloud/aiplatform/pipeline_jobs.py b/google/cloud/aiplatform/pipeline_jobs.py index 4c8a3ad806..0ada7d6d6e 100644 --- a/google/cloud/aiplatform/pipeline_jobs.py +++ b/google/cloud/aiplatform/pipeline_jobs.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- -# Copyright 2021 Google LLC +# Copyright 2022 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -473,3 +473,141 @@ def list( def wait_for_resource_creation(self) -> None: """Waits until resource has been created.""" self._wait_for_resource_creation() + + def clone( + self, + display_name: Optional[str] = None, + job_id: Optional[str] = None, + pipeline_root: Optional[str] = None, + parameter_values: Optional[Dict[str, Any]] = None, + enable_caching: Optional[bool] = None, + encryption_spec_key_name: Optional[str] = None, + labels: Optional[Dict[str, str]] = None, + credentials: Optional[auth_credentials.Credentials] = None, + project: Optional[str] = None, + location: Optional[str] = None, + ) -> "PipelineJob": + """Returns a new PipelineJob object with the same settings as the original one. + + Args: + display_name (str): + Optional. The user-defined name of this cloned Pipeline. + If not specified, original pipeline name will be used. + job_id (str): + Optional. The unique ID of the job run. + If not specified, "cloned" + pipeline name + timestamp will be used. + pipeline_root (str): + Optional. The root of the pipeline outputs. Default to be the same + staging bucket as original pipeline. + parameter_values (Dict[str, Any]): + Optional. The mapping from runtime parameter names to its values that + control the pipeline run. Defaults to be the same values as original + PipelineJob. + enable_caching (bool): + Optional. Whether to turn on caching for the run. + If this is not set, defaults to be the same as original pipeline. + If this is set, the setting applies to all tasks in the pipeline. + encryption_spec_key_name (str): + Optional. The Cloud KMS resource identifier of the customer + managed encryption key used to protect the job. Has the + form: + ``projects/my-project/locations/my-region/keyRings/my-kr/cryptoKeys/my-key``. + The key needs to be in the same region as where the compute resource is created. + If this is set, then all + resources created by the PipelineJob will + be encrypted with the provided encryption key. + If not specified, encryption_spec of original PipelineJob will be used. + labels (Dict[str,str]): + Optional. The user defined metadata to organize PipelineJob. + credentials (auth_credentials.Credentials): + Optional. Custom credentials to use to create this PipelineJob. + Overrides credentials set in aiplatform.init. + project (str), + Optional. The project that you want to run this PipelineJob in. + If not set, the project set in original PipelineJob will be used. + location (str), + Optional. Location to create PipelineJob. + If not set, location set in original PipelineJob will be used. + """ + ## Initialize an empty PipelineJob + if not project: + project = self.project + if not location: + location = self.location + if not credentials: + credentials = self.credentials + + cloned = self.__class__._empty_constructor( + project=project, + location=location, + credentials=credentials, + ) + cloned._parent = initializer.global_config.common_location_path( + project=project, location=location + ) + + ## Get gca_resource from original PipelineJob + pipeline_job = json_format.MessageToDict(self._gca_resource._pb) + + ## Set pipeline_spec + pipeline_spec = pipeline_job['pipelineSpec'] + if 'deploymentConfig' in pipeline_spec: + del pipeline_spec['deploymentConfig'] + + ## Set caching + if enable_caching is not None: + _set_enable_caching_value(pipeline_spec, enable_caching) + + ## Set job_id + pipeline_name = pipeline_spec["pipelineInfo"]["name"] + cloned.job_id = job_id or "cloned-{pipeline_name}-{timestamp}".format( + pipeline_name=re.sub("[^-0-9a-z]+", "-", pipeline_name.lower()) + .lstrip("-") + .rstrip("-"), + timestamp=_get_current_time().strftime("%Y%m%d%H%M%S"), + ) + if not _VALID_NAME_PATTERN.match(cloned.job_id): + raise ValueError( + "Generated job ID: {} is illegal as a Vertex pipelines job ID. " + "Expecting an ID following the regex pattern " + '"[a-z][-a-z0-9]{{0,127}}"'.format(cloned.job_id) + ) + + ## Set display_name, labels and encryption_spec + if display_name: + utils.validate_display_name(display_name) + elif not display_name and "displayName" in pipeline_job: + display_name = pipeline_job["displayName"] + + if labels: + utils.validate_labels(labels) + elif not labels and "labels" in pipeline_job: + labels = pipeline_job["labels"] + + if encryption_spec_key_name or "encryptionSpec" not in pipeline_job: + encryption_spec = initializer.global_config.get_encryption_spec( + encryption_spec_key_name=encryption_spec_key_name + ) + else: + encryption_spec = pipeline_job["encryptionSpec"] + + ## Set runtime_config + builder = pipeline_utils.PipelineRuntimeConfigBuilder.from_job_spec_json( + pipeline_job + ) + builder.update_pipeline_root(pipeline_root) + builder.update_runtime_parameters(parameter_values) + runtime_config_dict = builder.build() + runtime_config = gca_pipeline_job_v1.PipelineJob.RuntimeConfig()._pb + json_format.ParseDict(runtime_config_dict, runtime_config) + + ## Create gca_resource for cloned PipelineJob + cloned._gca_resource = gca_pipeline_job_v1.PipelineJob( + display_name=display_name, + pipeline_spec=pipeline_spec, + labels=labels, + runtime_config=runtime_config, + encryption_spec=encryption_spec, + ) + + return cloned diff --git a/tests/unit/aiplatform/test_pipeline_jobs.py b/tests/unit/aiplatform/test_pipeline_jobs.py index df5e294b03..1de5af8b13 100644 --- a/tests/unit/aiplatform/test_pipeline_jobs.py +++ b/tests/unit/aiplatform/test_pipeline_jobs.py @@ -1045,3 +1045,167 @@ def test_pipeline_failure_raises(self, mock_load_yaml_and_json, sync): if not sync: job.wait() + + @pytest.mark.parametrize( + "job_spec", + [_TEST_PIPELINE_SPEC_JSON, _TEST_PIPELINE_SPEC_YAML, _TEST_PIPELINE_JOB], + ) + def test_clone_pipeline_job( + self, + mock_pipeline_service_create, + mock_pipeline_service_get, + job_spec, + mock_load_yaml_and_json, + ): + aiplatform.init( + project=_TEST_PROJECT, + staging_bucket=_TEST_GCS_BUCKET_NAME, + location=_TEST_LOCATION, + credentials=_TEST_CREDENTIALS, + ) + + job = pipeline_jobs.PipelineJob( + display_name=_TEST_PIPELINE_JOB_DISPLAY_NAME, + template_path=_TEST_TEMPLATE_PATH, + job_id=_TEST_PIPELINE_JOB_ID, + parameter_values=_TEST_PIPELINE_PARAMETER_VALUES, + enable_caching=True, + ) + + cloned = job.clone(job_id="cloned-"+_TEST_PIPELINE_JOB_ID) + + cloned.submit( + service_account=_TEST_SERVICE_ACCOUNT, + network=_TEST_NETWORK, + create_request_timeout=None, + ) + + expected_runtime_config_dict = { + "gcsOutputDirectory": _TEST_GCS_BUCKET_NAME, + "parameterValues": _TEST_PIPELINE_PARAMETER_VALUES, + } + runtime_config = gca_pipeline_job_v1.PipelineJob.RuntimeConfig()._pb + json_format.ParseDict(expected_runtime_config_dict, runtime_config) + + job_spec = yaml.safe_load(job_spec) + pipeline_spec = job_spec.get("pipelineSpec") or job_spec + + # Construct expected request + expected_gapic_pipeline_job = gca_pipeline_job_v1.PipelineJob( + display_name=_TEST_PIPELINE_JOB_DISPLAY_NAME, + pipeline_spec={ + "components": {}, + "pipelineInfo": pipeline_spec["pipelineInfo"], + "root": pipeline_spec["root"], + "schemaVersion": "2.1.0", + }, + runtime_config=runtime_config, + service_account=_TEST_SERVICE_ACCOUNT, + network=_TEST_NETWORK, + ) + + mock_pipeline_service_create.assert_called_once_with( + parent=_TEST_PARENT, + pipeline_job=expected_gapic_pipeline_job, + pipeline_job_id="cloned-"+_TEST_PIPELINE_JOB_ID, + timeout=None, + ) + + assert not mock_pipeline_service_get.called + + cloned.wait() + + mock_pipeline_service_get.assert_called_with( + name=_TEST_PIPELINE_JOB_NAME, retry=base._DEFAULT_RETRY + ) + + assert cloned._gca_resource == make_pipeline_job( + gca_pipeline_state_v1.PipelineState.PIPELINE_STATE_SUCCEEDED + ) + + @pytest.mark.parametrize( + "job_spec", + [_TEST_PIPELINE_SPEC_JSON, _TEST_PIPELINE_SPEC_YAML, _TEST_PIPELINE_JOB], + ) + def test_clone_pipeline_job_with_all_args( + self, + mock_pipeline_service_create, + mock_pipeline_service_get, + job_spec, + mock_load_yaml_and_json, + ): + aiplatform.init( + project=_TEST_PROJECT, + staging_bucket=_TEST_GCS_BUCKET_NAME, + location=_TEST_LOCATION, + credentials=_TEST_CREDENTIALS, + ) + + job = pipeline_jobs.PipelineJob( + display_name=_TEST_PIPELINE_JOB_DISPLAY_NAME, + template_path=_TEST_TEMPLATE_PATH, + job_id=_TEST_PIPELINE_JOB_ID, + parameter_values=_TEST_PIPELINE_PARAMETER_VALUES, + enable_caching=True, + ) + + cloned = job.clone( + display_name=f"cloned-{_TEST_PIPELINE_JOB_DISPLAY_NAME}", + job_id=f"cloned-{_TEST_PIPELINE_JOB_ID}", + pipeline_root=f"cloned-{_TEST_GCS_BUCKET_NAME}", + parameter_values=_TEST_PIPELINE_PARAMETER_VALUES, + enable_caching=True, + credentials=_TEST_CREDENTIALS, + project=_TEST_PROJECT, + location=_TEST_LOCATION, + ) + + cloned.submit( + service_account=_TEST_SERVICE_ACCOUNT, + network=_TEST_NETWORK, + create_request_timeout=None, + ) + + expected_runtime_config_dict = { + "gcsOutputDirectory": f"cloned-{_TEST_GCS_BUCKET_NAME}", + "parameterValues": _TEST_PIPELINE_PARAMETER_VALUES, + } + runtime_config = gca_pipeline_job_v1.PipelineJob.RuntimeConfig()._pb + json_format.ParseDict(expected_runtime_config_dict, runtime_config) + + job_spec = yaml.safe_load(job_spec) + pipeline_spec = job_spec.get("pipelineSpec") or job_spec + + # Construct expected request + expected_gapic_pipeline_job = gca_pipeline_job_v1.PipelineJob( + display_name=f"cloned-{_TEST_PIPELINE_JOB_DISPLAY_NAME}", + pipeline_spec={ + "components": {}, + "pipelineInfo": pipeline_spec["pipelineInfo"], + "root": pipeline_spec["root"], + "schemaVersion": "2.1.0", + }, + runtime_config=runtime_config, + service_account=_TEST_SERVICE_ACCOUNT, + network=_TEST_NETWORK, + ) + + mock_pipeline_service_create.assert_called_once_with( + parent=_TEST_PARENT, + pipeline_job=expected_gapic_pipeline_job, + pipeline_job_id=f"cloned-{_TEST_PIPELINE_JOB_ID}", + timeout=None, + ) + + assert not mock_pipeline_service_get.called + + cloned.wait() + + mock_pipeline_service_get.assert_called_with( + name=_TEST_PIPELINE_JOB_NAME, retry=base._DEFAULT_RETRY + ) + + assert cloned._gca_resource == make_pipeline_job( + gca_pipeline_state_v1.PipelineState.PIPELINE_STATE_SUCCEEDED + ) + From 2aec2d85bc0dd51b1f55a3b295f1e416e65bea38 Mon Sep 17 00:00:00 2001 From: jaycee-li Date: Tue, 17 May 2022 15:03:49 -0700 Subject: [PATCH 5/8] fix: blacken and lint --- google/cloud/aiplatform/pipeline_jobs.py | 6 +++--- tests/unit/aiplatform/test_pipeline_jobs.py | 13 ++++++------- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/google/cloud/aiplatform/pipeline_jobs.py b/google/cloud/aiplatform/pipeline_jobs.py index 0ada7d6d6e..e537786eec 100644 --- a/google/cloud/aiplatform/pipeline_jobs.py +++ b/google/cloud/aiplatform/pipeline_jobs.py @@ -550,9 +550,9 @@ def clone( pipeline_job = json_format.MessageToDict(self._gca_resource._pb) ## Set pipeline_spec - pipeline_spec = pipeline_job['pipelineSpec'] - if 'deploymentConfig' in pipeline_spec: - del pipeline_spec['deploymentConfig'] + pipeline_spec = pipeline_job["pipelineSpec"] + if "deploymentConfig" in pipeline_spec: + del pipeline_spec["deploymentConfig"] ## Set caching if enable_caching is not None: diff --git a/tests/unit/aiplatform/test_pipeline_jobs.py b/tests/unit/aiplatform/test_pipeline_jobs.py index 1de5af8b13..7decb97132 100644 --- a/tests/unit/aiplatform/test_pipeline_jobs.py +++ b/tests/unit/aiplatform/test_pipeline_jobs.py @@ -1045,7 +1045,7 @@ def test_pipeline_failure_raises(self, mock_load_yaml_and_json, sync): if not sync: job.wait() - + @pytest.mark.parametrize( "job_spec", [_TEST_PIPELINE_SPEC_JSON, _TEST_PIPELINE_SPEC_YAML, _TEST_PIPELINE_JOB], @@ -1072,7 +1072,7 @@ def test_clone_pipeline_job( enable_caching=True, ) - cloned = job.clone(job_id="cloned-"+_TEST_PIPELINE_JOB_ID) + cloned = job.clone(job_id=f"cloned-{_TEST_PIPELINE_JOB_ID}") cloned.submit( service_account=_TEST_SERVICE_ACCOUNT, @@ -1107,12 +1107,12 @@ def test_clone_pipeline_job( mock_pipeline_service_create.assert_called_once_with( parent=_TEST_PARENT, pipeline_job=expected_gapic_pipeline_job, - pipeline_job_id="cloned-"+_TEST_PIPELINE_JOB_ID, + pipeline_job_id=f"cloned-{_TEST_PIPELINE_JOB_ID}", timeout=None, ) assert not mock_pipeline_service_get.called - + cloned.wait() mock_pipeline_service_get.assert_called_with( @@ -1122,7 +1122,7 @@ def test_clone_pipeline_job( assert cloned._gca_resource == make_pipeline_job( gca_pipeline_state_v1.PipelineState.PIPELINE_STATE_SUCCEEDED ) - + @pytest.mark.parametrize( "job_spec", [_TEST_PIPELINE_SPEC_JSON, _TEST_PIPELINE_SPEC_YAML, _TEST_PIPELINE_JOB], @@ -1198,7 +1198,7 @@ def test_clone_pipeline_job_with_all_args( ) assert not mock_pipeline_service_get.called - + cloned.wait() mock_pipeline_service_get.assert_called_with( @@ -1208,4 +1208,3 @@ def test_clone_pipeline_job_with_all_args( assert cloned._gca_resource == make_pipeline_job( gca_pipeline_state_v1.PipelineState.PIPELINE_STATE_SUCCEEDED ) - From 7c2d84fe674fb707434d0d3be1f6c31dabb743b6 Mon Sep 17 00:00:00 2001 From: jaycee-li Date: Tue, 17 May 2022 15:49:47 -0700 Subject: [PATCH 6/8] Update pipeline_jobs.py --- google/cloud/aiplatform/pipeline_jobs.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/google/cloud/aiplatform/pipeline_jobs.py b/google/cloud/aiplatform/pipeline_jobs.py index e537786eec..aaacff033f 100644 --- a/google/cloud/aiplatform/pipeline_jobs.py +++ b/google/cloud/aiplatform/pipeline_jobs.py @@ -492,7 +492,7 @@ def clone( Args: display_name (str): Optional. The user-defined name of this cloned Pipeline. - If not specified, original pipeline name will be used. + If not specified, original pipeline display name will be used. job_id (str): Optional. The unique ID of the job run. If not specified, "cloned" + pipeline name + timestamp will be used. @@ -528,6 +528,12 @@ def clone( location (str), Optional. Location to create PipelineJob. If not set, location set in original PipelineJob will be used. + + Returns: + A Vertex AI PipelineJob. + + Raises: + ValueError: If job_id or labels have incorrect format. """ ## Initialize an empty PipelineJob if not project: From 429b4e05d1f2559968d6f8c1f847b803e48abcd1 Mon Sep 17 00:00:00 2001 From: jaycee-li Date: Tue, 31 May 2022 14:33:45 -0700 Subject: [PATCH 7/8] fix: update library names --- google/cloud/aiplatform/pipeline_jobs.py | 4 ++-- tests/unit/aiplatform/test_pipeline_jobs.py | 12 ++++++------ 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/google/cloud/aiplatform/pipeline_jobs.py b/google/cloud/aiplatform/pipeline_jobs.py index 8e470c3b7f..36b8a57f25 100644 --- a/google/cloud/aiplatform/pipeline_jobs.py +++ b/google/cloud/aiplatform/pipeline_jobs.py @@ -602,11 +602,11 @@ def clone( builder.update_pipeline_root(pipeline_root) builder.update_runtime_parameters(parameter_values) runtime_config_dict = builder.build() - runtime_config = gca_pipeline_job_v1.PipelineJob.RuntimeConfig()._pb + runtime_config = gca_pipeline_job.PipelineJob.RuntimeConfig()._pb json_format.ParseDict(runtime_config_dict, runtime_config) ## Create gca_resource for cloned PipelineJob - cloned._gca_resource = gca_pipeline_job_v1.PipelineJob( + cloned._gca_resource = gca_pipeline_job.PipelineJob( display_name=display_name, pipeline_spec=pipeline_spec, labels=labels, diff --git a/tests/unit/aiplatform/test_pipeline_jobs.py b/tests/unit/aiplatform/test_pipeline_jobs.py index be0418ffe3..1f6f2bb50c 100644 --- a/tests/unit/aiplatform/test_pipeline_jobs.py +++ b/tests/unit/aiplatform/test_pipeline_jobs.py @@ -1077,14 +1077,14 @@ def test_clone_pipeline_job( "gcsOutputDirectory": _TEST_GCS_BUCKET_NAME, "parameterValues": _TEST_PIPELINE_PARAMETER_VALUES, } - runtime_config = gca_pipeline_job_v1.PipelineJob.RuntimeConfig()._pb + runtime_config = gca_pipeline_job.PipelineJob.RuntimeConfig()._pb json_format.ParseDict(expected_runtime_config_dict, runtime_config) job_spec = yaml.safe_load(job_spec) pipeline_spec = job_spec.get("pipelineSpec") or job_spec # Construct expected request - expected_gapic_pipeline_job = gca_pipeline_job_v1.PipelineJob( + expected_gapic_pipeline_job = gca_pipeline_job.PipelineJob( display_name=_TEST_PIPELINE_JOB_DISPLAY_NAME, pipeline_spec={ "components": {}, @@ -1113,7 +1113,7 @@ def test_clone_pipeline_job( ) assert cloned._gca_resource == make_pipeline_job( - gca_pipeline_state_v1.PipelineState.PIPELINE_STATE_SUCCEEDED + gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED ) @pytest.mark.parametrize( @@ -1163,14 +1163,14 @@ def test_clone_pipeline_job_with_all_args( "gcsOutputDirectory": f"cloned-{_TEST_GCS_BUCKET_NAME}", "parameterValues": _TEST_PIPELINE_PARAMETER_VALUES, } - runtime_config = gca_pipeline_job_v1.PipelineJob.RuntimeConfig()._pb + runtime_config = gca_pipeline_job.PipelineJob.RuntimeConfig()._pb json_format.ParseDict(expected_runtime_config_dict, runtime_config) job_spec = yaml.safe_load(job_spec) pipeline_spec = job_spec.get("pipelineSpec") or job_spec # Construct expected request - expected_gapic_pipeline_job = gca_pipeline_job_v1.PipelineJob( + expected_gapic_pipeline_job = gca_pipeline_job.PipelineJob( display_name=f"cloned-{_TEST_PIPELINE_JOB_DISPLAY_NAME}", pipeline_spec={ "components": {}, @@ -1199,5 +1199,5 @@ def test_clone_pipeline_job_with_all_args( ) assert cloned._gca_resource == make_pipeline_job( - gca_pipeline_state_v1.PipelineState.PIPELINE_STATE_SUCCEEDED + gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED ) From 2666b28dc383f9be30abbbab2814659db45f640b Mon Sep 17 00:00:00 2001 From: jaycee-li Date: Thu, 2 Jun 2022 09:52:46 -0700 Subject: [PATCH 8/8] fix: formatting error --- google/cloud/aiplatform/pipeline_jobs.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/google/cloud/aiplatform/pipeline_jobs.py b/google/cloud/aiplatform/pipeline_jobs.py index 36b8a57f25..bc50a47aa2 100644 --- a/google/cloud/aiplatform/pipeline_jobs.py +++ b/google/cloud/aiplatform/pipeline_jobs.py @@ -144,15 +144,15 @@ def __init__( be encrypted with the provided encryption key. Overrides encryption_spec_key_name set in aiplatform.init. - labels (Dict[str,str]): + labels (Dict[str, str]): Optional. The user defined metadata to organize PipelineJob. credentials (auth_credentials.Credentials): Optional. Custom credentials to use to create this PipelineJob. Overrides credentials set in aiplatform.init. - project (str), + project (str): Optional. The project that you want to run this PipelineJob in. If not set, the project set in aiplatform.init will be used. - location (str), + location (str): Optional. Location to create PipelineJob. If not set, location set in aiplatform.init will be used. @@ -215,9 +215,9 @@ def __init__( ) if not _VALID_NAME_PATTERN.match(self.job_id): raise ValueError( - "Generated job ID: {} is illegal as a Vertex pipelines job ID. " + f"Generated job ID: {self.job_id} is illegal as a Vertex pipelines job ID. " "Expecting an ID following the regex pattern " - '"[a-z][-a-z0-9]{{0,127}}"'.format(job_id) + f'"{_VALID_NAME_PATTERN.pattern[1:-1]}"' ) if enable_caching is not None: @@ -515,15 +515,15 @@ def clone( resources created by the PipelineJob will be encrypted with the provided encryption key. If not specified, encryption_spec of original PipelineJob will be used. - labels (Dict[str,str]): + labels (Dict[str, str]): Optional. The user defined metadata to organize PipelineJob. credentials (auth_credentials.Credentials): Optional. Custom credentials to use to create this PipelineJob. Overrides credentials set in aiplatform.init. - project (str), + project (str): Optional. The project that you want to run this PipelineJob in. If not set, the project set in original PipelineJob will be used. - location (str), + location (str): Optional. Location to create PipelineJob. If not set, location set in original PipelineJob will be used. @@ -572,9 +572,9 @@ def clone( ) if not _VALID_NAME_PATTERN.match(cloned.job_id): raise ValueError( - "Generated job ID: {} is illegal as a Vertex pipelines job ID. " + f"Generated job ID: {cloned.job_id} is illegal as a Vertex pipelines job ID. " "Expecting an ID following the regex pattern " - '"[a-z][-a-z0-9]{{0,127}}"'.format(cloned.job_id) + f'"{_VALID_NAME_PATTERN.pattern[1:-1]}"' ) ## Set display_name, labels and encryption_spec