feat: Added the PipelineJob.from_pipeline_func method #1415

Merged
141 changes: 139 additions & 2 deletions google/cloud/aiplatform/pipeline_jobs.py
@@ -19,7 +19,8 @@
import logging
import time
import re
from typing import Any, Dict, List, Optional, Union
import tempfile
from typing import Any, Callable, Dict, List, Optional, Union

from google.auth import credentials as auth_credentials
from google.cloud import aiplatform
@@ -33,6 +34,7 @@
from google.cloud.aiplatform.metadata import constants as metadata_constants
from google.cloud.aiplatform.metadata import experiment_resources
from google.cloud.aiplatform.metadata import utils as metadata_utils
from google.cloud.aiplatform.utils import gcs_utils
from google.cloud.aiplatform.utils import yaml_utils
from google.cloud.aiplatform.utils import pipeline_utils
from google.protobuf import json_format
@@ -131,7 +133,9 @@ def __init__(
Optional. The unique ID of the job run.
If not specified, pipeline name + timestamp will be used.
pipeline_root (str):
Optional. The root of the pipeline outputs. Default to be staging bucket.
Optional. The root of the pipeline outputs. If not set, the staging bucket
set in aiplatform.init will be used. If that is not set, a pipeline-specific
artifacts bucket will be used.
parameter_values (Dict[str, Any]):
Optional. The mapping from runtime parameter names to their values that
control the pipeline run.
@@ -219,6 +223,13 @@ def __init__(
or pipeline_job["pipelineSpec"].get("defaultPipelineRoot")
or initializer.global_config.staging_bucket
)
pipeline_root = (
pipeline_root
or gcs_utils.generate_gcs_directory_for_pipeline_artifacts(
project=project,
location=location,
)
)
builder = pipeline_utils.PipelineRuntimeConfigBuilder.from_job_spec_json(
pipeline_job
)
@@ -332,6 +343,13 @@ def submit(
if network:
self._gca_resource.network = network

gcs_utils.create_gcs_bucket_for_pipeline_artifacts_if_it_does_not_exist(
output_artifacts_gcs_dir=self.pipeline_spec.get("gcsOutputDirectory"),
service_account=self._gca_resource.service_account,
project=self.project,
location=self.location,
)

# Prevents logs from being suppressed on TFX pipelines
if self._gca_resource.pipeline_spec.get("sdkVersion", "").startswith("tfx"):
_LOGGER.setLevel(logging.INFO)
@@ -772,6 +790,125 @@ def clone(

return cloned

@staticmethod
def from_pipeline_func(
# Parameters for the PipelineJob constructor
pipeline_func: Callable,
parameter_values: Optional[Dict[str, Any]] = None,
output_artifacts_gcs_dir: Optional[str] = None,
enable_caching: Optional[bool] = None,
context_name: Optional[str] = "pipeline",
display_name: Optional[str] = None,
labels: Optional[Dict[str, str]] = None,
job_id: Optional[str] = None,
# Parameters for the Vertex SDK
project: Optional[str] = None,
location: Optional[str] = None,
credentials: Optional[auth_credentials.Credentials] = None,
encryption_spec_key_name: Optional[str] = None,
) -> "PipelineJob":
"""Creates PipelineJob by compiling a pipeline function.

Args:
pipeline_func (Callable):
Required. A pipeline function to compile.
A pipeline function creates instances of components and connects
component inputs to outputs.
parameter_values (Dict[str, Any]):
Optional. The mapping from runtime parameter names to their values that
control the pipeline run.
output_artifacts_gcs_dir (str):
Optional. The GCS location of the pipeline outputs.
A GCS bucket for artifacts will be created if not specified.
enable_caching (bool):
Optional. Whether to turn on caching for the run.

If this is not set, defaults to the compile time settings, which
are True for all tasks by default, while users may specify
different caching options for individual tasks.

If this is set, the setting applies to all tasks in the pipeline.

Overrides the compile time settings.
context_name (str):
Optional. The name of the metadata context. Used for cached execution reuse.
display_name (str):
Optional. The user-defined name of this Pipeline.
labels (Dict[str, str]):
Optional. The user-defined metadata to organize the PipelineJob.
job_id (str):
Optional. The unique ID of the job run.
If not specified, pipeline name + timestamp will be used.

project (str):
Optional. The project that you want to run this PipelineJob in. If not set,
the project set in aiplatform.init will be used.
location (str):
Optional. Location to create PipelineJob. If not set,
location set in aiplatform.init will be used.
credentials (auth_credentials.Credentials):
Optional. Custom credentials to use to create this PipelineJob.
Overrides credentials set in aiplatform.init.
encryption_spec_key_name (str):
Optional. The Cloud KMS resource identifier of the customer
managed encryption key used to protect the job. Has the
form:
``projects/my-project/locations/my-region/keyRings/my-kr/cryptoKeys/my-key``.
The key needs to be in the same region as where the compute
resource is created.

If this is set, then all
resources created by the PipelineJob will
be encrypted with the provided encryption key.

Overrides encryption_spec_key_name set in aiplatform.init.

Returns:
A Vertex AI PipelineJob.

Raises:
ValueError: If job_id or labels have incorrect format.
"""

# Importing the KFP module here to prevent import errors when the kfp package is not installed.
try:
from kfp.v2 import compiler as compiler_v2
except ImportError as err:
raise RuntimeError(
"Cannot import the kfp.v2.compiler module. Please install or update the kfp package."
) from err

automatic_display_name = " ".join(
[
pipeline_func.__name__.replace("_", " "),
datetime.datetime.now().isoformat(sep=" "),
]
)
display_name = display_name or automatic_display_name
job_id = job_id or re.sub(
r"[^-a-z0-9]", "-", automatic_display_name.lower()
).strip("-")
pipeline_file = tempfile.mktemp(suffix=".json")
compiler_v2.Compiler().compile(
pipeline_func=pipeline_func,
pipeline_name=context_name,
package_path=pipeline_file,
)
pipeline_job = PipelineJob(
template_path=pipeline_file,
parameter_values=parameter_values,
pipeline_root=output_artifacts_gcs_dir,
enable_caching=enable_caching,
display_name=display_name,
job_id=job_id,
labels=labels,
project=project,
location=location,
credentials=credentials,
encryption_spec_key_name=encryption_spec_key_name,
)
return pipeline_job

def get_associated_experiment(self) -> Optional["aiplatform.Experiment"]:
"""Gets the aiplatform.Experiment associated with this PipelineJob,
or None if this PipelineJob is not associated with an experiment.
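For context, a minimal end-to-end sketch of the new method. The project, location, and parameter values below are placeholders, and the sketch assumes the kfp package and the Vertex SDK are installed:

from kfp import components

from google.cloud import aiplatform


# Lightweight component, following the same pattern as the system test below.
def train(number_of_epochs: int, learning_rate: float):
    print(f"number_of_epochs={number_of_epochs}")
    print(f"learning_rate={learning_rate}")


train_op = components.create_component_from_func(train)


# Pipeline function: instantiates components and wires their inputs.
def training_pipeline(number_of_epochs: int = 10):
    train_op(number_of_epochs=number_of_epochs, learning_rate=0.1)


# "my-project" and "us-central1" are placeholders.
aiplatform.init(project="my-project", location="us-central1")

# Compiles training_pipeline to a temporary JSON spec and builds the job;
# display_name and job_id default to the function name plus a timestamp.
job = aiplatform.PipelineJob.from_pipeline_func(
    pipeline_func=training_pipeline,
    parameter_values={"number_of_epochs": 20},
    enable_caching=False,
)
job.submit()  # Creates the pipeline artifacts bucket on first use.

If output_artifacts_gcs_dir is omitted, submit() falls back to the generated "{project}-vertex-pipelines-{location}" bucket via the gcs_utils helpers added below.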
104 changes: 102 additions & 2 deletions google/cloud/aiplatform/utils/gcs_utils.py
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-

# Copyright 2021 Google LLC
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -25,7 +25,7 @@
from google.cloud import storage

from google.cloud.aiplatform import initializer

from google.cloud.aiplatform.utils import resource_manager_utils

_logger = logging.getLogger(__name__)

@@ -163,3 +163,103 @@ def stage_local_data_in_gcs(
)

return staged_data_uri


def generate_gcs_directory_for_pipeline_artifacts(
project: Optional[str] = None,
location: Optional[str] = None,
):
"""Gets or creates the GCS directory for Vertex Pipelines artifacts.

Args:
project: Optional. Google Cloud Project that will contain the pipeline artifacts bucket.
location: Optional. Google Cloud location to use for the pipeline artifacts bucket.

Returns:
Google Cloud Storage URI of the directory for pipeline artifacts.
"""
project = project or initializer.global_config.project
location = location or initializer.global_config.location

pipelines_bucket_name = project + "-vertex-pipelines-" + location
output_artifacts_gcs_dir = "gs://" + pipelines_bucket_name + "/output_artifacts/"
return output_artifacts_gcs_dir


def create_gcs_bucket_for_pipeline_artifacts_if_it_does_not_exist(
output_artifacts_gcs_dir: Optional[str] = None,
service_account: Optional[str] = None,
project: Optional[str] = None,
location: Optional[str] = None,
credentials: Optional[auth_credentials.Credentials] = None,
):
"""Gets or creates the GCS directory for Vertex Pipelines artifacts.

Args:
output_artifacts_gcs_dir: Optional. The GCS location for the pipeline outputs.
It will be generated if not specified.
service_account: Optional. Google Cloud service account that will be used
to run the pipelines. If this function creates a new bucket it will give
permission to the specified service account to access the bucket.
If not provided, the Google Cloud Compute Engine service account will be used.
project: Optional. Google Cloud Project that will contain the pipeline artifacts bucket.
location: Optional. Google Cloud location to use for the pipeline artifacts bucket.
credentials: The custom credentials to use when making API calls.
If not provided, default credentials will be used.

Returns:
Google Cloud Storage URI of the directory for pipeline artifacts.
"""
project = project or initializer.global_config.project
location = location or initializer.global_config.location
credentials = credentials or initializer.global_config.credentials

output_artifacts_gcs_dir = (
output_artifacts_gcs_dir
or generate_gcs_directory_for_pipeline_artifacts(
project=project,
location=location,
)
)

# Creating the bucket if needed
storage_client = storage.Client(
project=project,
credentials=credentials,
)

pipelines_bucket = storage.Blob.from_string(
uri=output_artifacts_gcs_dir,
client=storage_client,
).bucket

if not pipelines_bucket.exists():
_logger.info(
f'Creating GCS bucket for Vertex Pipelines: "{pipelines_bucket.name}"'
)
pipelines_bucket = storage_client.create_bucket(
bucket_or_name=pipelines_bucket,
project=project,
location=location,
)
# Giving the service account read and write access to the new bucket
# Workaround for error: "Failed to create pipeline job. Error: Service account `[email protected]`
# does not have `[storage.objects.get, storage.objects.create]` IAM permission(s) to the bucket `xxxxxxxx-vertex-pipelines-us-central1`.
# Please either copy the files to the Google Cloud Storage bucket owned by your project, or grant the required IAM permission(s) to the service account."
if not service_account:
# Getting the project number to use in service account
project_number = resource_manager_utils.get_project_number(project)
service_account = f"{project_number}[email protected]"
bucket_iam_policy = pipelines_bucket.get_iam_policy()
bucket_iam_policy.setdefault("roles/storage.objectCreator", set()).add(
f"serviceAccount:{service_account}"
)
bucket_iam_policy.setdefault("roles/storage.objectViewer", set()).add(
f"serviceAccount:{service_account}"
)
pipelines_bucket.set_iam_policy(bucket_iam_policy)
return output_artifacts_gcs_dir
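As a rough sketch of how the two new helpers compose (the project and location values are placeholders), the generated URI follows the "{project}-vertex-pipelines-{location}" bucket naming convention introduced above:

from google.cloud.aiplatform.utils import gcs_utils

# For project "my-project" in "us-central1" this returns
# "gs://my-project-vertex-pipelines-us-central1/output_artifacts/".
artifacts_dir = gcs_utils.generate_gcs_directory_for_pipeline_artifacts(
    project="my-project",
    location="us-central1",
)

# If the bucket does not exist yet, creates it and grants the default Compute
# Engine service account (or the one passed in) objectCreator and objectViewer
# on it; returns the directory URI unchanged.
gcs_utils.create_gcs_bucket_for_pipeline_artifacts_if_it_does_not_exist(
    output_artifacts_gcs_dir=artifacts_dir,
    project="my-project",
    location="us-central1",
)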
27 changes: 27 additions & 0 deletions google/cloud/aiplatform/utils/resource_manager_utils.py
@@ -48,3 +48,30 @@ def get_project_id(
project = projects_client.get_project(name=f"projects/{project_number}")

return project.project_id


def get_project_number(
project_id: str,
credentials: Optional[auth_credentials.Credentials] = None,
) -> str:
"""Gets project ID given the project number

Args:
project_id (str):
Required. Google Cloud project unique ID.
credentials: The custom credentials to use when making API calls.
Optional. If not provided, default credentials will be used.

Returns:
str - The automatically generated unique numerical identifier for your GCP project.

"""

credentials = credentials or initializer.global_config.credentials

projects_client = resourcemanager.ProjectsClient(credentials=credentials)

project = projects_client.get_project(name=f"projects/{project_id}")
project_number = project.name.split("/", 1)[1]

return project_number
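A quick illustration of the new helper; the project ID and the derived service account below are hypothetical values:

from google.cloud.aiplatform.utils import resource_manager_utils

# Resolves a project ID such as "my-project" to its numeric identifier
# (for example "123456789012") via the Resource Manager API.
project_number = resource_manager_utils.get_project_number("my-project")

# The number is used above to derive the default Compute Engine service account.
service_account = f"{project_number}[email protected]"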
61 changes: 61 additions & 0 deletions tests/system/aiplatform/test_pipeline_job.py
@@ -0,0 +1,61 @@
# -*- coding: utf-8 -*-

# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import pytest

from google.cloud import aiplatform
from tests.system.aiplatform import e2e_base


@pytest.mark.usefixtures("tear_down_resources")
class TestPipelineJob(e2e_base.TestEndToEnd):

_temp_prefix = "tmpvrtxsdk-e2e"

def test_add_pipeline_job_to_experiment(self, shared_state):
from kfp import components

# Components:
def train(
number_of_epochs: int,
learning_rate: float,
):
print(f"number_of_epochs={number_of_epochs}")
print(f"learning_rate={learning_rate}")

train_op = components.create_component_from_func(train)

# Pipeline:
def training_pipeline(number_of_epochs: int = 10):
train_op(
number_of_epochs=number_of_epochs,
learning_rate="0.1",
)

# Submitting the pipeline:
aiplatform.init(
project=e2e_base._PROJECT,
location=e2e_base._LOCATION,
)
job = aiplatform.PipelineJob.from_pipeline_func(
pipeline_func=training_pipeline,
)
job.submit()

shared_state.setdefault("resources", []).append(job)

job.wait()