From 4b063843a76b91625cd7a712812a1a1e3da6eda5 Mon Sep 17 00:00:00 2001
From: Alexey Volkov
Date: Fri, 15 Jul 2022 06:44:11 -0700
Subject: [PATCH] feat: Added the PipelineJob.from_pipeline_func method (#1415)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

The new factory method reduces the pipeline submission boilerplate to an
absolute minimum.

```python
aiplatform.PipelineJob.from_pipeline_func(training_pipeline).submit()
```

What it does:

1. Compiles the pipeline
2. Provides sensible default values for the pipeline display name, job_id, metadata context, etc.
3. Generates a GCS directory for the pipeline output artifacts if needed
4. Creates the GCS bucket for the artifacts if it does not exist (and grants the Pipelines service account the required permissions)

Example usage:

```python
def training_pipeline(number_of_epochs: int = 10):
    train_op(
        number_of_epochs=number_of_epochs,
        learning_rate="0.1",
    )

job = aiplatform.PipelineJob.from_pipeline_func(training_pipeline)
job.submit()
```

---

Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:
- [ ] Make sure to open an issue as a [bug/issue](https://github.com/googleapis/python-aiplatform/issues/new/choose) before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea
- [x] Ensure the tests and linter pass
- [x] Code coverage does not decrease (if any source code was changed)
- [ ] Appropriate docs were updated (if necessary)

Fixes # 🦕
---
 google/cloud/aiplatform/pipeline_jobs.py      | 141 +++++++++++++++++-
 google/cloud/aiplatform/utils/gcs_utils.py    | 104 ++++++++++++-
 .../utils/resource_manager_utils.py           |  27 ++++
 tests/system/aiplatform/test_pipeline_job.py  |  61 ++++++++
 tests/unit/aiplatform/test_pipeline_jobs.py   |  43 ++++++
 5 files changed, 372 insertions(+), 4 deletions(-)
 create mode 100644 tests/system/aiplatform/test_pipeline_job.py

diff --git a/google/cloud/aiplatform/pipeline_jobs.py b/google/cloud/aiplatform/pipeline_jobs.py
index 1aa259ea0a..e4d93d1e92 100644
--- a/google/cloud/aiplatform/pipeline_jobs.py
+++ b/google/cloud/aiplatform/pipeline_jobs.py
@@ -19,7 +19,8 @@
 import logging
 import time
 import re
-from typing import Any, Dict, List, Optional, Union
+import tempfile
+from typing import Any, Callable, Dict, List, Optional, Union
 
 from google.auth import credentials as auth_credentials
 from google.cloud import aiplatform
@@ -33,6 +34,7 @@
 from google.cloud.aiplatform.metadata import constants as metadata_constants
 from google.cloud.aiplatform.metadata import experiment_resources
 from google.cloud.aiplatform.metadata import utils as metadata_utils
+from google.cloud.aiplatform.utils import gcs_utils
 from google.cloud.aiplatform.utils import yaml_utils
 from google.cloud.aiplatform.utils import pipeline_utils
 from google.protobuf import json_format
@@ -131,7 +133,9 @@ def __init__(
                 Optional. The unique ID of the job run.
                 If not specified, pipeline name + timestamp will be used.
             pipeline_root (str):
-                Optional. The root of the pipeline outputs. Default to be staging bucket.
+                Optional. The root of the pipeline outputs. If not set, the staging bucket
+                set in aiplatform.init will be used. If that's not set, a pipeline-specific
+                artifacts bucket will be used.
             parameter_values (Dict[str, Any]):
                 Optional. The mapping from runtime parameter names to its values that
                 control the pipeline run.
@@ -219,6 +223,13 @@ def __init__( or pipeline_job["pipelineSpec"].get("defaultPipelineRoot") or initializer.global_config.staging_bucket ) + pipeline_root = ( + pipeline_root + or gcs_utils.generate_gcs_directory_for_pipeline_artifacts( + project=project, + location=location, + ) + ) builder = pipeline_utils.PipelineRuntimeConfigBuilder.from_job_spec_json( pipeline_job ) @@ -332,6 +343,13 @@ def submit( if network: self._gca_resource.network = network + gcs_utils.create_gcs_bucket_for_pipeline_artifacts_if_it_does_not_exist( + output_artifacts_gcs_dir=self.pipeline_spec.get("gcsOutputDirectory"), + service_account=self._gca_resource.service_account, + project=self.project, + location=self.location, + ) + # Prevents logs from being supressed on TFX pipelines if self._gca_resource.pipeline_spec.get("sdkVersion", "").startswith("tfx"): _LOGGER.setLevel(logging.INFO) @@ -772,6 +790,125 @@ def clone( return cloned + @staticmethod + def from_pipeline_func( + # Parameters for the PipelineJob constructor + pipeline_func: Callable, + parameter_values: Optional[Dict[str, Any]] = None, + output_artifacts_gcs_dir: Optional[str] = None, + enable_caching: Optional[bool] = None, + context_name: Optional[str] = "pipeline", + display_name: Optional[str] = None, + labels: Optional[Dict[str, str]] = None, + job_id: Optional[str] = None, + # Parameters for the Vertex SDK + project: Optional[str] = None, + location: Optional[str] = None, + credentials: Optional[auth_credentials.Credentials] = None, + encryption_spec_key_name: Optional[str] = None, + ) -> "PipelineJob": + """Creates PipelineJob by compiling a pipeline function. + + Args: + pipeline_func (Callable): + Required. A pipeline function to compile. + A pipeline function creates instances of components and connects + component inputs to outputs. + parameter_values (Dict[str, Any]): + Optional. The mapping from runtime parameter names to its values that + control the pipeline run. + output_artifacts_gcs_dir (str): + Optional. The GCS location of the pipeline outputs. + A GCS bucket for artifacts will be created if not specified. + enable_caching (bool): + Optional. Whether to turn on caching for the run. + + If this is not set, defaults to the compile time settings, which + are True for all tasks by default, while users may specify + different caching options for individual tasks. + + If this is set, the setting applies to all tasks in the pipeline. + + Overrides the compile time settings. + context_name (str): + Optional. The name of metadata context. Used for cached execution reuse. + display_name (str): + Optional. The user-defined name of this Pipeline. + labels (Dict[str, str]): + Optional. The user defined metadata to organize PipelineJob. + job_id (str): + Optional. The unique ID of the job run. + If not specified, pipeline name + timestamp will be used. + + project (str): + Optional. The project that you want to run this PipelineJob in. If not set, + the project set in aiplatform.init will be used. + location (str): + Optional. Location to create PipelineJob. If not set, + location set in aiplatform.init will be used. + credentials (auth_credentials.Credentials): + Optional. Custom credentials to use to create this PipelineJob. + Overrides credentials set in aiplatform.init. + encryption_spec_key_name (str): + Optional. The Cloud KMS resource identifier of the customer + managed encryption key used to protect the job. Has the + form: + ``projects/my-project/locations/my-region/keyRings/my-kr/cryptoKeys/my-key``. 
+                The key needs to be in the same region as where the compute
+                resource is created.
+
+                If this is set, then all
+                resources created by the PipelineJob will
+                be encrypted with the provided encryption key.
+
+                Overrides encryption_spec_key_name set in aiplatform.init.
+
+        Returns:
+            A Vertex AI PipelineJob.
+
+        Raises:
+            ValueError: If job_id or labels have incorrect format.
+        """
+
+        # Importing the KFP module here to prevent import errors when the kfp package is not installed.
+        try:
+            from kfp.v2 import compiler as compiler_v2
+        except ImportError as err:
+            raise RuntimeError(
+                "Cannot import the kfp.v2.compiler module. Please install or update the kfp package."
+            ) from err
+
+        automatic_display_name = " ".join(
+            [
+                pipeline_func.__name__.replace("_", " "),
+                datetime.datetime.now().isoformat(sep=" "),
+            ]
+        )
+        display_name = display_name or automatic_display_name
+        job_id = job_id or re.sub(
+            r"[^-a-z0-9]", "-", automatic_display_name.lower()
+        ).strip("-")
+        pipeline_file = tempfile.mktemp(suffix=".json")
+        compiler_v2.Compiler().compile(
+            pipeline_func=pipeline_func,
+            pipeline_name=context_name,
+            package_path=pipeline_file,
+        )
+        pipeline_job = PipelineJob(
+            template_path=pipeline_file,
+            parameter_values=parameter_values,
+            pipeline_root=output_artifacts_gcs_dir,
+            enable_caching=enable_caching,
+            display_name=display_name,
+            job_id=job_id,
+            labels=labels,
+            project=project,
+            location=location,
+            credentials=credentials,
+            encryption_spec_key_name=encryption_spec_key_name,
+        )
+        return pipeline_job
+
     def get_associated_experiment(self) -> Optional["aiplatform.Experiment"]:
         """Gets the aiplatform.Experiment associated with this PipelineJob,
         or None if this PipelineJob is not associated with an experiment.
diff --git a/google/cloud/aiplatform/utils/gcs_utils.py b/google/cloud/aiplatform/utils/gcs_utils.py
index 855b7991f1..6079d7908d 100644
--- a/google/cloud/aiplatform/utils/gcs_utils.py
+++ b/google/cloud/aiplatform/utils/gcs_utils.py
@@ -1,6 +1,6 @@
 # -*- coding: utf-8 -*-
 
-# Copyright 2021 Google LLC
+# Copyright 2022 Google LLC
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -25,7 +25,7 @@
 from google.cloud import storage
 
 from google.cloud.aiplatform import initializer
-
+from google.cloud.aiplatform.utils import resource_manager_utils
 
 _logger = logging.getLogger(__name__)
 
@@ -163,3 +163,103 @@ def stage_local_data_in_gcs(
     )
 
     return staged_data_uri
+
+
+def generate_gcs_directory_for_pipeline_artifacts(
+    project: Optional[str] = None,
+    location: Optional[str] = None,
+):
+    """Generates the GCS directory URI for Vertex Pipelines artifacts.
+
+    Args:
+        project: Optional. Google Cloud project used to build the artifacts
+            directory URI. If not provided, the project set in aiplatform.init
+            will be used.
+        location: Optional. Google Cloud location used to build the artifacts
+            directory URI. If not provided, the location set in aiplatform.init
+            will be used.
+
+    Returns:
+        Google Cloud Storage URI of the directory for pipeline output artifacts.
+    """
+    project = project or initializer.global_config.project
+    location = location or initializer.global_config.location
+
+    pipelines_bucket_name = project + "-vertex-pipelines-" + location
+    output_artifacts_gcs_dir = "gs://" + pipelines_bucket_name + "/output_artifacts/"
+    return output_artifacts_gcs_dir
+
+
+def create_gcs_bucket_for_pipeline_artifacts_if_it_does_not_exist(
+    output_artifacts_gcs_dir: Optional[str] = None,
+    service_account: Optional[str] = None,
+    project: Optional[str] = None,
+    location: Optional[str] = None,
+    credentials: Optional[auth_credentials.Credentials] = None,
+):
+    """Creates the GCS bucket for Vertex Pipelines artifacts if it does not exist.
+
+    Args:
+        output_artifacts_gcs_dir: Optional. The GCS location for the pipeline outputs.
+            It will be generated if not specified.
+        service_account: Optional. Google Cloud service account that will be used
+            to run the pipelines. If this function creates a new bucket it will give
+            permission to the specified service account to access the bucket.
+            If not provided, the Google Cloud Compute Engine service account will be used.
+        project: Optional. Google Cloud project in which to create the pipeline artifacts bucket.
+        location: Optional. Google Cloud location to use for the pipeline artifacts bucket.
+        credentials: Optional. The custom credentials to use when making API calls.
+            If not provided, default credentials will be used.
+
+    Returns:
+        Google Cloud Storage URI of the directory for pipeline output artifacts.
+    """
+    project = project or initializer.global_config.project
+    location = location or initializer.global_config.location
+    credentials = credentials or initializer.global_config.credentials
+
+    output_artifacts_gcs_dir = (
+        output_artifacts_gcs_dir
+        or generate_gcs_directory_for_pipeline_artifacts(
+            project=project,
+            location=location,
+        )
+    )
+
+    # Creating the bucket if needed
+    storage_client = storage.Client(
+        project=project,
+        credentials=credentials,
+    )
+
+    pipelines_bucket = storage.Blob.from_string(
+        uri=output_artifacts_gcs_dir,
+        client=storage_client,
+    ).bucket
+
+    if not pipelines_bucket.exists():
+        _logger.info(
+            f'Creating GCS bucket for Vertex Pipelines: "{pipelines_bucket.name}"'
+        )
+        pipelines_bucket = storage_client.create_bucket(
+            bucket_or_name=pipelines_bucket,
+            project=project,
+            location=location,
+        )
+        # Giving the service account read and write access to the new bucket
+        # Workaround for error: "Failed to create pipeline job. Error: Service account `NNNNNNNN-compute@developer.gserviceaccount.com`
+        # does not have `[storage.objects.get, storage.objects.create]` IAM permission(s) to the bucket `xxxxxxxx-vertex-pipelines-us-central1`.
+        # Please either copy the files to the Google Cloud Storage bucket owned by your project, or grant the required IAM permission(s) to the service account."
+        if not service_account:
+            # Getting the project number to use in service account
+            project_number = resource_manager_utils.get_project_number(project)
+            service_account = f"{project_number}-compute@developer.gserviceaccount.com"
+        bucket_iam_policy = pipelines_bucket.get_iam_policy()
+        bucket_iam_policy.setdefault("roles/storage.objectCreator", set()).add(
+            f"serviceAccount:{service_account}"
+        )
+        bucket_iam_policy.setdefault("roles/storage.objectViewer", set()).add(
+            f"serviceAccount:{service_account}"
+        )
+        pipelines_bucket.set_iam_policy(bucket_iam_policy)
+    return output_artifacts_gcs_dir
diff --git a/google/cloud/aiplatform/utils/resource_manager_utils.py b/google/cloud/aiplatform/utils/resource_manager_utils.py
index f918c766bf..e6cbc0988b 100644
--- a/google/cloud/aiplatform/utils/resource_manager_utils.py
+++ b/google/cloud/aiplatform/utils/resource_manager_utils.py
@@ -48,3 +48,30 @@ def get_project_id(
     project = projects_client.get_project(name=f"projects/{project_number}")
 
     return project.project_id
+
+
+def get_project_number(
+    project_id: str,
+    credentials: Optional[auth_credentials.Credentials] = None,
+) -> str:
+    """Gets the project number given the project ID.
+
+    Args:
+        project_id (str):
+            Required. Google Cloud project unique ID.
+        credentials: Optional. The custom credentials to use when making API calls.
+            If not provided, default credentials will be used.
+
+    Returns:
+        str - The automatically generated unique numerical identifier for your GCP project.
+
+    """
+
+    credentials = credentials or initializer.global_config.credentials
+
+    projects_client = resourcemanager.ProjectsClient(credentials=credentials)
+
+    project = projects_client.get_project(name=f"projects/{project_id}")
+    project_number = project.name.split("/", 1)[1]
+
+    return project_number
diff --git a/tests/system/aiplatform/test_pipeline_job.py b/tests/system/aiplatform/test_pipeline_job.py
new file mode 100644
index 0000000000..004ad768a3
--- /dev/null
+++ b/tests/system/aiplatform/test_pipeline_job.py
@@ -0,0 +1,61 @@
+# -*- coding: utf-8 -*-
+
+# Copyright 2022 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# + +import pytest + +from google.cloud import aiplatform +from tests.system.aiplatform import e2e_base + + +@pytest.mark.usefixtures("tear_down_resources") +class TestPipelineJob(e2e_base.TestEndToEnd): + + _temp_prefix = "tmpvrtxsdk-e2e" + + def test_add_pipeline_job_to_experiment(self, shared_state): + from kfp import components + + # Components: + def train( + number_of_epochs: int, + learning_rate: float, + ): + print(f"number_of_epochs={number_of_epochs}") + print(f"learning_rate={learning_rate}") + + train_op = components.create_component_from_func(train) + + # Pipeline: + def training_pipeline(number_of_epochs: int = 10): + train_op( + number_of_epochs=number_of_epochs, + learning_rate="0.1", + ) + + # Submitting the pipeline: + aiplatform.init( + project=e2e_base._PROJECT, + location=e2e_base._LOCATION, + ) + job = aiplatform.PipelineJob.from_pipeline_func( + pipeline_func=training_pipeline, + ) + job.submit() + + shared_state.setdefault("resources", []).append(job) + + job.wait() diff --git a/tests/unit/aiplatform/test_pipeline_jobs.py b/tests/unit/aiplatform/test_pipeline_jobs.py index eaea8e720e..63c09ac799 100644 --- a/tests/unit/aiplatform/test_pipeline_jobs.py +++ b/tests/unit/aiplatform/test_pipeline_jobs.py @@ -35,6 +35,7 @@ from google.cloud.aiplatform_v1 import MetadataServiceClient from google.cloud.aiplatform import pipeline_jobs from google.cloud.aiplatform.compat.types import pipeline_failure_policy +from google.cloud.aiplatform.utils import gcs_utils from google.cloud import storage from google.protobuf import json_format @@ -224,6 +225,31 @@ def mock_pipeline_service_create(): yield mock_create_pipeline_job +@pytest.fixture +def mock_pipeline_bucket_exists(): + def mock_create_gcs_bucket_for_pipeline_artifacts_if_it_does_not_exist( + output_artifacts_gcs_dir=None, + service_account=None, + project=None, + location=None, + credentials=None, + ): + output_artifacts_gcs_dir = ( + output_artifacts_gcs_dir + or gcs_utils.generate_gcs_directory_for_pipeline_artifacts( + project=project, + location=location, + ) + ) + return output_artifacts_gcs_dir + + with mock.patch( + "google.cloud.aiplatform.utils.gcs_utils.create_gcs_bucket_for_pipeline_artifacts_if_it_does_not_exist", + new=mock_create_gcs_bucket_for_pipeline_artifacts_if_it_does_not_exist, + ) as mock_context: + yield mock_context + + def make_pipeline_job(state): return gca_pipeline_job.PipelineJob( name=_TEST_PIPELINE_JOB_NAME, @@ -426,6 +452,7 @@ def test_run_call_pipeline_service_create( self, mock_pipeline_service_create, mock_pipeline_service_get, + mock_pipeline_bucket_exists, job_spec, mock_load_yaml_and_json, sync, @@ -503,6 +530,7 @@ def test_run_call_pipeline_service_create_artifact_registry( self, mock_pipeline_service_create, mock_pipeline_service_get, + mock_pipeline_bucket_exists, mock_request_urlopen, job_spec, mock_load_yaml_and_json, @@ -586,6 +614,7 @@ def test_run_call_pipeline_service_create_with_timeout( self, mock_pipeline_service_create, mock_pipeline_service_get, + mock_pipeline_bucket_exists, job_spec, mock_load_yaml_and_json, sync, @@ -667,6 +696,7 @@ def test_run_call_pipeline_service_create_with_timeout_not_explicitly_set( self, mock_pipeline_service_create, mock_pipeline_service_get, + mock_pipeline_bucket_exists, job_spec, mock_load_yaml_and_json, sync, @@ -748,6 +778,7 @@ def test_run_call_pipeline_service_create_with_failure_policy( self, mock_pipeline_service_create, mock_pipeline_service_get, + mock_pipeline_bucket_exists, job_spec, mock_load_yaml_and_json, failure_policy, @@ 
-832,6 +863,7 @@ def test_run_call_pipeline_service_create_legacy( self, mock_pipeline_service_create, mock_pipeline_service_get, + mock_pipeline_bucket_exists, job_spec, mock_load_yaml_and_json, sync, @@ -913,6 +945,7 @@ def test_run_call_pipeline_service_create_tfx( self, mock_pipeline_service_create, mock_pipeline_service_get, + mock_pipeline_bucket_exists, job_spec, mock_load_yaml_and_json, sync, @@ -990,6 +1023,7 @@ def test_submit_call_pipeline_service_pipeline_job_create( self, mock_pipeline_service_create, mock_pipeline_service_get, + mock_pipeline_bucket_exists, job_spec, mock_load_yaml_and_json, ): @@ -1065,6 +1099,7 @@ def test_done_method_pipeline_service( self, mock_pipeline_service_create, mock_pipeline_service_get, + mock_pipeline_bucket_exists, job_spec, mock_load_yaml_and_json, ): @@ -1103,6 +1138,7 @@ def test_submit_call_pipeline_service_pipeline_job_create_legacy( self, mock_pipeline_service_create, mock_pipeline_service_get, + mock_pipeline_bucket_exists, job_spec, mock_load_yaml_and_json, ): @@ -1183,6 +1219,7 @@ def test_get_pipeline_job(self, mock_pipeline_service_get): @pytest.mark.usefixtures( "mock_pipeline_service_create", "mock_pipeline_service_get", + "mock_pipeline_bucket_exists", ) @pytest.mark.parametrize( "job_spec", @@ -1220,6 +1257,7 @@ def test_cancel_pipeline_job( @pytest.mark.usefixtures( "mock_pipeline_service_create", "mock_pipeline_service_get", + "mock_pipeline_bucket_exists", ) @pytest.mark.parametrize( "job_spec", @@ -1294,6 +1332,7 @@ def test_cancel_pipeline_job_without_running( @pytest.mark.usefixtures( "mock_pipeline_service_create", "mock_pipeline_service_get_with_fail", + "mock_pipeline_bucket_exists", ) @pytest.mark.parametrize( "job_spec", @@ -1334,6 +1373,7 @@ def test_clone_pipeline_job( self, mock_pipeline_service_create, mock_pipeline_service_get, + mock_pipeline_bucket_exists, job_spec, mock_load_yaml_and_json, ): @@ -1411,6 +1451,7 @@ def test_clone_pipeline_job_with_all_args( self, mock_pipeline_service_create, mock_pipeline_service_get, + mock_pipeline_bucket_exists, job_spec, mock_load_yaml_and_json, ): @@ -1497,6 +1538,7 @@ def test_get_associated_experiment_from_pipeline_returns_none_without_experiment self, mock_pipeline_service_create, mock_pipeline_service_get, + mock_pipeline_bucket_exists, job_spec, mock_load_yaml_and_json, ): @@ -1541,6 +1583,7 @@ def test_get_associated_experiment_from_pipeline_returns_experiment( get_metadata_store_mock, mock_create_pipeline_job_with_experiment, mock_get_pipeline_job_with_experiment, + mock_pipeline_bucket_exists, ): aiplatform.init( project=_TEST_PROJECT,
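For reference, the snippet below stitches the pieces added by this patch into one runnable flow, mirroring the new system test. It is a minimal sketch: the project ID, location, and `train` component are illustrative placeholders, not anything defined by the patch.

```python
# Illustrative sketch only: "example-project" / "us-central1" and the `train`
# component are placeholders; the API calls are the ones added by this patch.
from google.cloud import aiplatform
from kfp import components


def train(number_of_epochs: int, learning_rate: float):
    print(f"epochs={number_of_epochs}, learning_rate={learning_rate}")


train_op = components.create_component_from_func(train)


def training_pipeline(number_of_epochs: int = 10):
    train_op(
        number_of_epochs=number_of_epochs,
        learning_rate="0.1",
    )


aiplatform.init(project="example-project", location="us-central1")

# from_pipeline_func compiles the pipeline and fills in default display name,
# job ID, and pipeline root. With no pipeline_root or staging bucket configured,
# submit() stores artifacts under
# gs://example-project-vertex-pipelines-us-central1/output_artifacts/ and
# creates that bucket first if it does not already exist.
job = aiplatform.PipelineJob.from_pipeline_func(
    pipeline_func=training_pipeline,
    parameter_values={"number_of_epochs": 5},
)
job.submit()
```

Apart from the explicit `parameter_values` override, this is the same behaviour exercised by `tests/system/aiplatform/test_pipeline_job.py` above.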