diff --git a/google/cloud/aiplatform/pipeline_jobs.py b/google/cloud/aiplatform/pipeline_jobs.py
index 1aa259ea0a..e4d93d1e92 100644
--- a/google/cloud/aiplatform/pipeline_jobs.py
+++ b/google/cloud/aiplatform/pipeline_jobs.py
@@ -19,7 +19,8 @@
 import logging
 import time
 import re
-from typing import Any, Dict, List, Optional, Union
+import tempfile
+from typing import Any, Callable, Dict, List, Optional, Union
 
 from google.auth import credentials as auth_credentials
 from google.cloud import aiplatform
@@ -33,6 +34,7 @@
 from google.cloud.aiplatform.metadata import constants as metadata_constants
 from google.cloud.aiplatform.metadata import experiment_resources
 from google.cloud.aiplatform.metadata import utils as metadata_utils
+from google.cloud.aiplatform.utils import gcs_utils
 from google.cloud.aiplatform.utils import yaml_utils
 from google.cloud.aiplatform.utils import pipeline_utils
 from google.protobuf import json_format
@@ -131,7 +133,9 @@ def __init__(
                 Optional. The unique ID of the job run.
                 If not specified, pipeline name + timestamp will be used.
             pipeline_root (str):
-                Optional. The root of the pipeline outputs. Default to be staging bucket.
+                Optional. The root of the pipeline outputs. If not set, the staging bucket
+                set in aiplatform.init will be used. If that is not set, a pipeline-specific
+                artifacts bucket will be used.
             parameter_values (Dict[str, Any]):
                 Optional. The mapping from runtime parameter names to its values that
                 control the pipeline run.
@@ -219,6 +223,13 @@ def __init__(
             or pipeline_job["pipelineSpec"].get("defaultPipelineRoot")
             or initializer.global_config.staging_bucket
         )
+        pipeline_root = (
+            pipeline_root
+            or gcs_utils.generate_gcs_directory_for_pipeline_artifacts(
+                project=project,
+                location=location,
+            )
+        )
         builder = pipeline_utils.PipelineRuntimeConfigBuilder.from_job_spec_json(
             pipeline_job
         )
@@ -332,6 +343,13 @@ def submit(
         if network:
             self._gca_resource.network = network
 
+        gcs_utils.create_gcs_bucket_for_pipeline_artifacts_if_it_does_not_exist(
+            output_artifacts_gcs_dir=self.pipeline_spec.get("gcsOutputDirectory"),
+            service_account=self._gca_resource.service_account,
+            project=self.project,
+            location=self.location,
+        )
+
         # Prevents logs from being supressed on TFX pipelines
         if self._gca_resource.pipeline_spec.get("sdkVersion", "").startswith("tfx"):
             _LOGGER.setLevel(logging.INFO)
@@ -772,6 +790,125 @@ def clone(
 
         return cloned
 
+    @staticmethod
+    def from_pipeline_func(
+        # Parameters for the PipelineJob constructor
+        pipeline_func: Callable,
+        parameter_values: Optional[Dict[str, Any]] = None,
+        output_artifacts_gcs_dir: Optional[str] = None,
+        enable_caching: Optional[bool] = None,
+        context_name: Optional[str] = "pipeline",
+        display_name: Optional[str] = None,
+        labels: Optional[Dict[str, str]] = None,
+        job_id: Optional[str] = None,
+        # Parameters for the Vertex SDK
+        project: Optional[str] = None,
+        location: Optional[str] = None,
+        credentials: Optional[auth_credentials.Credentials] = None,
+        encryption_spec_key_name: Optional[str] = None,
+    ) -> "PipelineJob":
+        """Creates a PipelineJob by compiling a pipeline function.
+
+        Args:
+            pipeline_func (Callable):
+                Required. A pipeline function to compile.
+                A pipeline function creates instances of components and connects
+                component inputs to outputs.
+            parameter_values (Dict[str, Any]):
+                Optional. The mapping from runtime parameter names to its values that
+                control the pipeline run.
+            output_artifacts_gcs_dir (str):
+                Optional. The GCS location of the pipeline outputs.
+                A GCS bucket for artifacts will be created if not specified.
+            enable_caching (bool):
+                Optional. Whether to turn on caching for the run.
+
+                If this is not set, defaults to the compile time settings, which
+                are True for all tasks by default, while users may specify
+                different caching options for individual tasks.
+
+                If this is set, the setting applies to all tasks in the pipeline.
+
+                Overrides the compile time settings.
+            context_name (str):
+                Optional. The name of the metadata context. Used for cached execution reuse.
+            display_name (str):
+                Optional. The user-defined name of this Pipeline.
+            labels (Dict[str, str]):
+                Optional. The user-defined metadata to organize PipelineJob.
+            job_id (str):
+                Optional. The unique ID of the job run.
+                If not specified, pipeline name + timestamp will be used.
+
+            project (str):
+                Optional. The project that you want to run this PipelineJob in. If not set,
+                the project set in aiplatform.init will be used.
+            location (str):
+                Optional. Location to create PipelineJob. If not set,
+                location set in aiplatform.init will be used.
+            credentials (auth_credentials.Credentials):
+                Optional. Custom credentials to use to create this PipelineJob.
+                Overrides credentials set in aiplatform.init.
+            encryption_spec_key_name (str):
+                Optional. The Cloud KMS resource identifier of the customer
+                managed encryption key used to protect the job. Has the
+                form:
+                ``projects/my-project/locations/my-region/keyRings/my-kr/cryptoKeys/my-key``.
+                The key needs to be in the same region as where the compute
+                resource is created.
+
+                If this is set, then all
+                resources created by the PipelineJob will
+                be encrypted with the provided encryption key.
+
+                Overrides encryption_spec_key_name set in aiplatform.init.
+
+        Returns:
+            A Vertex AI PipelineJob.
+
+        Raises:
+            ValueError: If job_id or labels have incorrect format.
+        """
+
+        # Importing the KFP module here to prevent import errors when the kfp package is not installed.
+        try:
+            from kfp.v2 import compiler as compiler_v2
+        except ImportError as err:
+            raise RuntimeError(
+                "Cannot import the kfp.v2.compiler module. Please install or update the kfp package."
+            ) from err
+
+        automatic_display_name = " ".join(
+            [
+                pipeline_func.__name__.replace("_", " "),
+                datetime.datetime.now().isoformat(sep=" "),
+            ]
+        )
+        display_name = display_name or automatic_display_name
+        job_id = job_id or re.sub(
+            r"[^-a-z0-9]", "-", automatic_display_name.lower()
+        ).strip("-")
+        pipeline_file = tempfile.mktemp(suffix=".json")
+        compiler_v2.Compiler().compile(
+            pipeline_func=pipeline_func,
+            pipeline_name=context_name,
+            package_path=pipeline_file,
+        )
+        pipeline_job = PipelineJob(
+            template_path=pipeline_file,
+            parameter_values=parameter_values,
+            pipeline_root=output_artifacts_gcs_dir,
+            enable_caching=enable_caching,
+            display_name=display_name,
+            job_id=job_id,
+            labels=labels,
+            project=project,
+            location=location,
+            credentials=credentials,
+            encryption_spec_key_name=encryption_spec_key_name,
+        )
+        return pipeline_job
+
     def get_associated_experiment(self) -> Optional["aiplatform.Experiment"]:
         """Gets the aiplatform.Experiment associated with this PipelineJob,
         or None if this PipelineJob is not associated with an experiment.
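A minimal usage sketch of the new `PipelineJob.from_pipeline_func` entry point, mirroring the system test added later in this change; the project, location, and component names are placeholders, and the snippet assumes the patch lands as written:

    from google.cloud import aiplatform
    from kfp import components

    # Any lightweight KFP component works here; `train` is only a stand-in.
    def train(number_of_epochs: int):
        print(f"number_of_epochs={number_of_epochs}")

    train_op = components.create_component_from_func(train)

    def training_pipeline(number_of_epochs: int = 10):
        train_op(number_of_epochs=number_of_epochs)

    aiplatform.init(project="my-project", location="us-central1")

    # Compiles the pipeline function to a temporary job spec and builds a PipelineJob;
    # pipeline_root falls back to the auto-provisioned pipeline artifacts bucket.
    job = aiplatform.PipelineJob.from_pipeline_func(pipeline_func=training_pipeline)
    job.submit()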
diff --git a/google/cloud/aiplatform/utils/gcs_utils.py b/google/cloud/aiplatform/utils/gcs_utils.py
index 855b7991f1..6079d7908d 100644
--- a/google/cloud/aiplatform/utils/gcs_utils.py
+++ b/google/cloud/aiplatform/utils/gcs_utils.py
@@ -1,6 +1,6 @@
 # -*- coding: utf-8 -*-
 
-# Copyright 2021 Google LLC
+# Copyright 2022 Google LLC
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -25,7 +25,7 @@
 from google.cloud import storage
 
 from google.cloud.aiplatform import initializer
-
+from google.cloud.aiplatform.utils import resource_manager_utils
 
 _logger = logging.getLogger(__name__)
 
@@ -163,3 +163,99 @@ def stage_local_data_in_gcs(
         )
 
     return staged_data_uri
+
+
+def generate_gcs_directory_for_pipeline_artifacts(
+    project: Optional[str] = None,
+    location: Optional[str] = None,
+):
+    """Generates the default GCS directory for Vertex Pipelines artifacts.
+
+    Args:
+        project: Optional. Google Cloud Project that contains the staging bucket.
+        location: Optional. Google Cloud location to use for the staging bucket.
+
+    Returns:
+        Google Cloud Storage URI of the directory for pipeline artifacts.
+    """
+    project = project or initializer.global_config.project
+    location = location or initializer.global_config.location
+
+    pipelines_bucket_name = project + "-vertex-pipelines-" + location
+    output_artifacts_gcs_dir = "gs://" + pipelines_bucket_name + "/output_artifacts/"
+    return output_artifacts_gcs_dir
+
+
+def create_gcs_bucket_for_pipeline_artifacts_if_it_does_not_exist(
+    output_artifacts_gcs_dir: Optional[str] = None,
+    service_account: Optional[str] = None,
+    project: Optional[str] = None,
+    location: Optional[str] = None,
+    credentials: Optional[auth_credentials.Credentials] = None,
+):
+    """Creates the GCS bucket for Vertex Pipelines artifacts if it does not exist.
+
+    Args:
+        output_artifacts_gcs_dir: Optional. The GCS location for the pipeline outputs.
+            It will be generated if not specified.
+        service_account: Optional. Google Cloud service account that will be used
+            to run the pipelines. If this function creates a new bucket it will give
+            permission to the specified service account to access the bucket.
+            If not provided, the Google Cloud Compute Engine service account will be used.
+        project: Optional. Google Cloud Project that contains the staging bucket.
+        location: Optional. Google Cloud location to use for the staging bucket.
+        credentials: The custom credentials to use when making API calls.
+            If not provided, default credentials will be used.
+
+    Returns:
+        Google Cloud Storage URI of the directory for pipeline artifacts.
+    """
+    project = project or initializer.global_config.project
+    location = location or initializer.global_config.location
+    credentials = credentials or initializer.global_config.credentials
+
+    output_artifacts_gcs_dir = (
+        output_artifacts_gcs_dir
+        or generate_gcs_directory_for_pipeline_artifacts(
+            project=project,
+            location=location,
+        )
+    )
+
+    # Creating the bucket if needed
+    storage_client = storage.Client(
+        project=project,
+        credentials=credentials,
+    )
+
+    pipelines_bucket = storage.Blob.from_string(
+        uri=output_artifacts_gcs_dir,
+        client=storage_client,
+    ).bucket
+
+    if not pipelines_bucket.exists():
+        _logger.info(
+            f'Creating GCS bucket for Vertex Pipelines: "{pipelines_bucket.name}"'
+        )
+        pipelines_bucket = storage_client.create_bucket(
+            bucket_or_name=pipelines_bucket,
+            project=project,
+            location=location,
+        )
+        # Giving the service account read and write access to the new bucket
+        # Workaround for error: "Failed to create pipeline job. Error: Service account `NNNNNNNN-compute@developer.gserviceaccount.com`
+        # does not have `[storage.objects.get, storage.objects.create]` IAM permission(s) to the bucket `xxxxxxxx-vertex-pipelines-us-central1`.
+        # Please either copy the files to the Google Cloud Storage bucket owned by your project, or grant the required IAM permission(s) to the service account."
+        if not service_account:
+            # Getting the project number to use in service account
+            project_number = resource_manager_utils.get_project_number(project)
+            service_account = f"{project_number}-compute@developer.gserviceaccount.com"
+        bucket_iam_policy = pipelines_bucket.get_iam_policy()
+        bucket_iam_policy.setdefault("roles/storage.objectCreator", set()).add(
+            f"serviceAccount:{service_account}"
+        )
+        bucket_iam_policy.setdefault("roles/storage.objectViewer", set()).add(
+            f"serviceAccount:{service_account}"
+        )
+        pipelines_bucket.set_iam_policy(bucket_iam_policy)
+    return output_artifacts_gcs_dir
diff --git a/google/cloud/aiplatform/utils/resource_manager_utils.py b/google/cloud/aiplatform/utils/resource_manager_utils.py
index f918c766bf..e6cbc0988b 100644
--- a/google/cloud/aiplatform/utils/resource_manager_utils.py
+++ b/google/cloud/aiplatform/utils/resource_manager_utils.py
@@ -48,3 +48,30 @@ def get_project_id(
     project = projects_client.get_project(name=f"projects/{project_number}")
 
     return project.project_id
+
+
+def get_project_number(
+    project_id: str,
+    credentials: Optional[auth_credentials.Credentials] = None,
+) -> str:
+    """Gets the project number given the project ID.
+
+    Args:
+        project_id (str):
+            Required. Google Cloud project unique ID.
+        credentials: The custom credentials to use when making API calls.
+            Optional. If not provided, default credentials will be used.
+
+    Returns:
+        str - The automatically generated unique numerical identifier for your GCP project.
+
+    """
+
+    credentials = credentials or initializer.global_config.credentials
+
+    projects_client = resourcemanager.ProjectsClient(credentials=credentials)
+
+    project = projects_client.get_project(name=f"projects/{project_id}")
+    project_number = project.name.split("/", 1)[1]
+
+    return project_number
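Taken together, the helpers above define the naming convention for the auto-provisioned pipeline bucket and the fallback service account. A small sketch of what they resolve to, assuming the patch lands as written; the project ID and location are placeholders, and `get_project_number` makes a live Resource Manager call:

    from google.cloud.aiplatform.utils import gcs_utils, resource_manager_utils

    # Default artifacts directory: gs://<project>-vertex-pipelines-<location>/output_artifacts/
    artifacts_dir = gcs_utils.generate_gcs_directory_for_pipeline_artifacts(
        project="my-project", location="us-central1"
    )
    # -> "gs://my-project-vertex-pipelines-us-central1/output_artifacts/"

    # With no explicit service account, the bucket helper grants access to the default
    # Compute Engine service account, derived from the project number:
    project_number = resource_manager_utils.get_project_number("my-project")
    default_service_account = f"{project_number}-compute@developer.gserviceaccount.com"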
diff --git a/tests/system/aiplatform/test_pipeline_job.py b/tests/system/aiplatform/test_pipeline_job.py
new file mode 100644
index 0000000000..004ad768a3
--- /dev/null
+++ b/tests/system/aiplatform/test_pipeline_job.py
@@ -0,0 +1,61 @@
+# -*- coding: utf-8 -*-
+
+# Copyright 2022 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import pytest
+
+from google.cloud import aiplatform
+from tests.system.aiplatform import e2e_base
+
+
+@pytest.mark.usefixtures("tear_down_resources")
+class TestPipelineJob(e2e_base.TestEndToEnd):
+
+    _temp_prefix = "tmpvrtxsdk-e2e"
+
+    def test_add_pipeline_job_to_experiment(self, shared_state):
+        from kfp import components
+
+        # Components:
+        def train(
+            number_of_epochs: int,
+            learning_rate: float,
+        ):
+            print(f"number_of_epochs={number_of_epochs}")
+            print(f"learning_rate={learning_rate}")
+
+        train_op = components.create_component_from_func(train)
+
+        # Pipeline:
+        def training_pipeline(number_of_epochs: int = 10):
+            train_op(
+                number_of_epochs=number_of_epochs,
+                learning_rate="0.1",
+            )
+
+        # Submitting the pipeline:
+        aiplatform.init(
+            project=e2e_base._PROJECT,
+            location=e2e_base._LOCATION,
+        )
+        job = aiplatform.PipelineJob.from_pipeline_func(
+            pipeline_func=training_pipeline,
+        )
+        job.submit()
+
+        shared_state.setdefault("resources", []).append(job)
+
+        job.wait()
diff --git a/tests/unit/aiplatform/test_pipeline_jobs.py b/tests/unit/aiplatform/test_pipeline_jobs.py
index eaea8e720e..63c09ac799 100644
--- a/tests/unit/aiplatform/test_pipeline_jobs.py
+++ b/tests/unit/aiplatform/test_pipeline_jobs.py
@@ -35,6 +35,7 @@
 from google.cloud.aiplatform_v1 import MetadataServiceClient
 from google.cloud.aiplatform import pipeline_jobs
 from google.cloud.aiplatform.compat.types import pipeline_failure_policy
+from google.cloud.aiplatform.utils import gcs_utils
 from google.cloud import storage
 from google.protobuf import json_format
 
@@ -224,6 +225,31 @@
     yield mock_create_pipeline_job
 
 
+@pytest.fixture
+def mock_pipeline_bucket_exists():
+    def mock_create_gcs_bucket_for_pipeline_artifacts_if_it_does_not_exist(
+        output_artifacts_gcs_dir=None,
+        service_account=None,
+        project=None,
+        location=None,
+        credentials=None,
+    ):
+        output_artifacts_gcs_dir = (
+            output_artifacts_gcs_dir
+            or gcs_utils.generate_gcs_directory_for_pipeline_artifacts(
+                project=project,
+                location=location,
+            )
+        )
+        return output_artifacts_gcs_dir
+
+    with mock.patch(
+        "google.cloud.aiplatform.utils.gcs_utils.create_gcs_bucket_for_pipeline_artifacts_if_it_does_not_exist",
+        new=mock_create_gcs_bucket_for_pipeline_artifacts_if_it_does_not_exist,
+    ) as mock_context:
+        yield mock_context
+
+
 def make_pipeline_job(state):
     return gca_pipeline_job.PipelineJob(
         name=_TEST_PIPELINE_JOB_NAME,
@@ -426,6 +452,7 @@ def test_run_call_pipeline_service_create(
         self,
         mock_pipeline_service_create,
         mock_pipeline_service_get,
+        mock_pipeline_bucket_exists,
         job_spec,
         mock_load_yaml_and_json,
         sync,
@@ -503,6 +530,7 @@ def test_run_call_pipeline_service_create_artifact_registry(
         self,
         mock_pipeline_service_create,
         mock_pipeline_service_get,
+        mock_pipeline_bucket_exists,
         mock_request_urlopen,
         job_spec,
         mock_load_yaml_and_json,
@@ -586,6 +614,7 @@ def test_run_call_pipeline_service_create_with_timeout(
         self,
         mock_pipeline_service_create,
         mock_pipeline_service_get,
+        mock_pipeline_bucket_exists,
         job_spec,
         mock_load_yaml_and_json,
         sync,
@@ -667,6 +696,7 @@ def test_run_call_pipeline_service_create_with_timeout_not_explicitly_set(
         self,
         mock_pipeline_service_create,
         mock_pipeline_service_get,
+        mock_pipeline_bucket_exists,
         job_spec,
         mock_load_yaml_and_json,
         sync,
@@ -748,6 +778,7 @@ def test_run_call_pipeline_service_create_with_failure_policy(
         self,
         mock_pipeline_service_create,
         mock_pipeline_service_get,
+        mock_pipeline_bucket_exists,
         job_spec,
         mock_load_yaml_and_json,
         failure_policy,
@@ -832,6 +863,7 @@ def test_run_call_pipeline_service_create_legacy(
         self,
         mock_pipeline_service_create,
         mock_pipeline_service_get,
+        mock_pipeline_bucket_exists,
         job_spec,
         mock_load_yaml_and_json,
         sync,
@@ -913,6 +945,7 @@ def test_run_call_pipeline_service_create_tfx(
         self,
         mock_pipeline_service_create,
         mock_pipeline_service_get,
+        mock_pipeline_bucket_exists,
         job_spec,
         mock_load_yaml_and_json,
         sync,
@@ -990,6 +1023,7 @@ def test_submit_call_pipeline_service_pipeline_job_create(
         self,
         mock_pipeline_service_create,
         mock_pipeline_service_get,
+        mock_pipeline_bucket_exists,
         job_spec,
         mock_load_yaml_and_json,
     ):
@@ -1065,6 +1099,7 @@ def test_done_method_pipeline_service(
         self,
         mock_pipeline_service_create,
         mock_pipeline_service_get,
+        mock_pipeline_bucket_exists,
         job_spec,
         mock_load_yaml_and_json,
     ):
@@ -1103,6 +1138,7 @@ def test_submit_call_pipeline_service_pipeline_job_create_legacy(
         self,
         mock_pipeline_service_create,
         mock_pipeline_service_get,
+        mock_pipeline_bucket_exists,
         job_spec,
         mock_load_yaml_and_json,
     ):
@@ -1183,6 +1219,7 @@ def test_get_pipeline_job(self, mock_pipeline_service_get):
     @pytest.mark.usefixtures(
         "mock_pipeline_service_create",
         "mock_pipeline_service_get",
+        "mock_pipeline_bucket_exists",
     )
     @pytest.mark.parametrize(
         "job_spec",
@@ -1220,6 +1257,7 @@ def test_cancel_pipeline_job(
     @pytest.mark.usefixtures(
         "mock_pipeline_service_create",
         "mock_pipeline_service_get",
+        "mock_pipeline_bucket_exists",
     )
     @pytest.mark.parametrize(
         "job_spec",
@@ -1294,6 +1332,7 @@ def test_cancel_pipeline_job_without_running(
     @pytest.mark.usefixtures(
         "mock_pipeline_service_create",
         "mock_pipeline_service_get_with_fail",
+        "mock_pipeline_bucket_exists",
     )
     @pytest.mark.parametrize(
         "job_spec",
@@ -1334,6 +1373,7 @@ def test_clone_pipeline_job(
         self,
         mock_pipeline_service_create,
         mock_pipeline_service_get,
+        mock_pipeline_bucket_exists,
         job_spec,
         mock_load_yaml_and_json,
     ):
@@ -1411,6 +1451,7 @@ def test_clone_pipeline_job_with_all_args(
         self,
         mock_pipeline_service_create,
         mock_pipeline_service_get,
+        mock_pipeline_bucket_exists,
         job_spec,
         mock_load_yaml_and_json,
     ):
@@ -1497,6 +1538,7 @@ def test_get_associated_experiment_from_pipeline_returns_none_without_experiment(
         self,
         mock_pipeline_service_create,
         mock_pipeline_service_get,
+        mock_pipeline_bucket_exists,
         job_spec,
         mock_load_yaml_and_json,
     ):
@@ -1541,6 +1583,7 @@ def test_get_associated_experiment_from_pipeline_returns_experiment(
         get_metadata_store_mock,
         mock_create_pipeline_job_with_experiment,
         mock_get_pipeline_job_with_experiment,
+        mock_pipeline_bucket_exists,
     ):
         aiplatform.init(
             project=_TEST_PROJECT,
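The `mock_pipeline_bucket_exists` fixture threaded through the unit tests above substitutes the bucket-provisioning helper with `mock.patch(..., new=...)`, so the tests never call GCS. A condensed sketch of the same pattern, assuming the patched path from this change; the hard-coded URI is illustrative only:

    from unittest import mock

    def fake_create_bucket(output_artifacts_gcs_dir=None, **kwargs):
        # Stand-in that skips bucket creation and IAM changes entirely.
        return output_artifacts_gcs_dir or "gs://test-project-vertex-pipelines-us-central1/output_artifacts/"

    with mock.patch(
        "google.cloud.aiplatform.utils.gcs_utils.create_gcs_bucket_for_pipeline_artifacts_if_it_does_not_exist",
        new=fake_create_bucket,
    ):
        pass  # construct and submit PipelineJob objects here without touching GCS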