Skip to content

Commit

Permalink
feat: Added GA support for running Custom and Hp tuning jobs on Persistent Resources
Browse files Browse the repository at this point in the history

PiperOrigin-RevId: 620975901
  • Loading branch information
vertex-sdk-bot authored and copybara-github committed Apr 1, 2024
1 parent 6aaa5d0 commit 35ecbac
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 51 deletions.
12 changes: 11 additions & 1 deletion google/cloud/aiplatform/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -1731,6 +1731,7 @@ def __init__(
labels: Optional[Dict[str, str]] = None,
encryption_spec_key_name: Optional[str] = None,
staging_bucket: Optional[str] = None,
persistent_resource_id: Optional[str] = None,
):
"""Constructs a Custom Job with Worker Pool Specs.
Expand Down Expand Up @@ -1802,6 +1803,13 @@ def __init__(
staging_bucket (str):
Optional. Bucket for produced custom job artifacts. Overrides
staging_bucket set in aiplatform.init.
persistent_resource_id (str):
Optional. The ID of the PersistentResource in the same Project
                and Location. If this is specified, the job will be run on
                existing machines held by the PersistentResource instead of
                on-demand short-lived machines. The network and CMEK configs on
the job should be consistent with those on the PersistentResource,
otherwise, the job will be rejected.
Raises:
RuntimeError: If staging bucket was not set using aiplatform.init
Expand Down Expand Up @@ -1836,6 +1844,7 @@ def __init__(
base_output_directory=gca_io_compat.GcsDestination(
output_uri_prefix=base_output_dir
),
persistent_resource_id=persistent_resource_id,
),
labels=labels,
encryption_spec=initializer.global_config.get_encryption_spec(
Expand Down Expand Up @@ -2669,7 +2678,8 @@ def __init__(
of any UTF-8 characters.
custom_job (aiplatform.CustomJob):
Required. Configured CustomJob. The worker pool spec from this custom job
applies to the CustomJobs created in all the trials.
applies to the CustomJobs created in all the trials. A persistent_resource_id can be
specified on the custom job to be used when running this Hyperparameter Tuning job.
metric_spec: Dict[str, str]
Required. Dictionary representing metrics to optimize. The dictionary key is the metric_id,
which is reported by your training job, and the dictionary value is the
Expand Down
11 changes: 7 additions & 4 deletions google/cloud/aiplatform/preview/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@


class CustomJob(jobs.CustomJob):
"""Vertex AI Custom Job."""
"""Deprecated. Vertex AI Custom Job (preview)."""

def __init__(
self,
Expand All @@ -88,7 +88,9 @@ def __init__(
staging_bucket: Optional[str] = None,
persistent_resource_id: Optional[str] = None,
):
"""Constructs a Custom Job with Worker Pool Specs.
"""Deprecated. Please use the GA (non-preview) version of this class.
Constructs a Custom Job with Worker Pool Specs.
```
Example usage:
Expand Down Expand Up @@ -472,7 +474,7 @@ def submit(


class HyperparameterTuningJob(jobs.HyperparameterTuningJob):
"""Vertex AI Hyperparameter Tuning Job."""
"""Deprecated. Vertex AI Hyperparameter Tuning Job (preview)."""

def __init__(
self,
Expand All @@ -492,7 +494,8 @@ def __init__(
labels: Optional[Dict[str, str]] = None,
encryption_spec_key_name: Optional[str] = None,
):
"""
"""Deprecated. Please use the GA (non-preview) version of this class.
Configures a HyperparameterTuning Job.
Example usage:
Expand Down
46 changes: 21 additions & 25 deletions tests/unit/aiplatform/test_custom_job_persistent_resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,12 @@
from unittest.mock import patch

from google.cloud import aiplatform
from google.cloud.aiplatform.compat.services import (
job_service_client_v1beta1,
)
from google.cloud.aiplatform.compat.types import custom_job_v1beta1
from google.cloud.aiplatform.compat.types import encryption_spec_v1beta1
from google.cloud.aiplatform.compat.types import io_v1beta1
from google.cloud.aiplatform.compat.types import (
job_state_v1beta1 as gca_job_state_compat,
)
from google.cloud.aiplatform.preview import jobs
from google.cloud.aiplatform import jobs
from google.cloud.aiplatform.compat.services import job_service_client_v1
from google.cloud.aiplatform.compat.types import custom_job_v1
from google.cloud.aiplatform.compat.types import encryption_spec_v1
from google.cloud.aiplatform.compat.types import io_v1
from google.cloud.aiplatform.compat.types import job_state_v1 as gca_job_state_compat
import constants as test_constants
import pytest

Expand Down Expand Up @@ -58,7 +54,7 @@

# CMEK encryption
_TEST_DEFAULT_ENCRYPTION_KEY_NAME = "key_1234"
_TEST_DEFAULT_ENCRYPTION_SPEC = encryption_spec_v1beta1.EncryptionSpec(
_TEST_DEFAULT_ENCRYPTION_SPEC = encryption_spec_v1.EncryptionSpec(
kms_key_name=_TEST_DEFAULT_ENCRYPTION_KEY_NAME
)

Expand All @@ -78,14 +74,14 @@

# Persistent Resource
_TEST_PERSISTENT_RESOURCE_ID = "test-persistent-resource-1"
_TEST_CUSTOM_JOB_WITH_PERSISTENT_RESOURCE_PROTO = custom_job_v1beta1.CustomJob(
_TEST_CUSTOM_JOB_WITH_PERSISTENT_RESOURCE_PROTO = custom_job_v1.CustomJob(
display_name=_TEST_DISPLAY_NAME,
job_spec=custom_job_v1beta1.CustomJobSpec(
job_spec=custom_job_v1.CustomJobSpec(
worker_pool_specs=_TEST_WORKER_POOL_SPEC,
base_output_directory=io_v1beta1.GcsDestination(
base_output_directory=io_v1.GcsDestination(
output_uri_prefix=_TEST_BASE_OUTPUT_DIR
),
scheduling=custom_job_v1beta1.Scheduling(
scheduling=custom_job_v1.Scheduling(
timeout=duration_pb2.Duration(seconds=_TEST_TIMEOUT),
restart_job_on_worker_restart=_TEST_RESTART_JOB_ON_WORKER_RESTART,
disable_retries=_TEST_DISABLE_RETRIES,
Expand All @@ -108,21 +104,21 @@ def _get_custom_job_proto(state=None, name=None, error=None):


@pytest.fixture
def create_preview_custom_job_mock():
def create_custom_job_mock():
with mock.patch.object(
job_service_client_v1beta1.JobServiceClient, "create_custom_job"
) as create_preview_custom_job_mock:
create_preview_custom_job_mock.return_value = _get_custom_job_proto(
job_service_client_v1.JobServiceClient, "create_custom_job"
) as create_custom_job_mock:
create_custom_job_mock.return_value = _get_custom_job_proto(
name=_TEST_CUSTOM_JOB_NAME,
state=gca_job_state_compat.JobState.JOB_STATE_PENDING,
)
yield create_preview_custom_job_mock
yield create_custom_job_mock


@pytest.fixture
def get_custom_job_mock():
with patch.object(
job_service_client_v1beta1.JobServiceClient, "get_custom_job"
job_service_client_v1.JobServiceClient, "get_custom_job"
) as get_custom_job_mock:
get_custom_job_mock.side_effect = [
_get_custom_job_proto(
Expand Down Expand Up @@ -152,7 +148,7 @@ def teardown_method(self):

@pytest.mark.parametrize("sync", [True, False])
def test_create_custom_job_with_persistent_resource(
self, create_preview_custom_job_mock, get_custom_job_mock, sync
self, create_custom_job_mock, get_custom_job_mock, sync
):

aiplatform.init(
Expand Down Expand Up @@ -188,7 +184,7 @@ def test_create_custom_job_with_persistent_resource(

expected_custom_job = _get_custom_job_proto()

create_preview_custom_job_mock.assert_called_once_with(
create_custom_job_mock.assert_called_once_with(
parent=_TEST_PARENT,
custom_job=expected_custom_job,
timeout=None,
Expand All @@ -201,7 +197,7 @@ def test_create_custom_job_with_persistent_resource(
assert job.network == _TEST_NETWORK

def test_submit_custom_job_with_persistent_resource(
self, create_preview_custom_job_mock, get_custom_job_mock
self, create_custom_job_mock, get_custom_job_mock
):

aiplatform.init(
Expand Down Expand Up @@ -236,7 +232,7 @@ def test_submit_custom_job_with_persistent_resource(

expected_custom_job = _get_custom_job_proto()

create_preview_custom_job_mock.assert_called_once_with(
create_custom_job_mock.assert_called_once_with(
parent=_TEST_PARENT,
custom_job=expected_custom_job,
timeout=None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,18 @@

from google.cloud import aiplatform
from google.cloud.aiplatform.compat.services import (
job_service_client_v1beta1,
job_service_client_v1,
)
from google.cloud.aiplatform import hyperparameter_tuning as hpt
from google.cloud.aiplatform.compat.types import (
custom_job_v1beta1,
encryption_spec_v1beta1,
hyperparameter_tuning_job_v1beta1,
io_v1beta1,
job_state_v1beta1 as gca_job_state_compat,
study_v1beta1 as gca_study_compat,
custom_job_v1,
encryption_spec_v1,
hyperparameter_tuning_job_v1,
io_v1,
job_state_v1 as gca_job_state_compat,
study_v1 as gca_study_compat,
)
from google.cloud.aiplatform.preview import jobs
from google.cloud.aiplatform import jobs
import constants as test_constants
import pytest

Expand All @@ -59,7 +59,7 @@

# CMEK encryption
_TEST_DEFAULT_ENCRYPTION_KEY_NAME = "key_1234"
_TEST_DEFAULT_ENCRYPTION_SPEC = encryption_spec_v1beta1.EncryptionSpec(
_TEST_DEFAULT_ENCRYPTION_SPEC = encryption_spec_v1.EncryptionSpec(
kms_key_name=_TEST_DEFAULT_ENCRYPTION_KEY_NAME
)

Expand Down Expand Up @@ -95,12 +95,12 @@
# Persistent Resource
_TEST_PERSISTENT_RESOURCE_ID = "test-persistent-resource-1"

_TEST_TRIAL_JOB_SPEC = custom_job_v1beta1.CustomJobSpec(
_TEST_TRIAL_JOB_SPEC = custom_job_v1.CustomJobSpec(
worker_pool_specs=test_constants.TrainingJobConstants._TEST_WORKER_POOL_SPEC,
base_output_directory=io_v1beta1.GcsDestination(
base_output_directory=io_v1.GcsDestination(
output_uri_prefix=test_constants.TrainingJobConstants._TEST_BASE_OUTPUT_DIR
),
scheduling=custom_job_v1beta1.Scheduling(
scheduling=custom_job_v1.Scheduling(
timeout=duration_pb2.Duration(seconds=_TEST_TIMEOUT),
restart_job_on_worker_restart=_TEST_RESTART_JOB_ON_WORKER_RESTART,
disable_retries=_TEST_DISABLE_RETRIES,
Expand All @@ -110,7 +110,7 @@
persistent_resource_id=_TEST_PERSISTENT_RESOURCE_ID,
)

_TEST_BASE_HYPERPARAMETER_TUNING_JOB_WITH_PERSISTENT_RESOURCE_PROTO = hyperparameter_tuning_job_v1beta1.HyperparameterTuningJob(
_TEST_BASE_HYPERPARAMETER_TUNING_JOB_WITH_PERSISTENT_RESOURCE_PROTO = hyperparameter_tuning_job_v1.HyperparameterTuningJob(
display_name=_TEST_DISPLAY_NAME,
study_spec=gca_study_compat.StudySpec(
metrics=[
Expand Down Expand Up @@ -197,23 +197,23 @@ def _get_hyperparameter_tuning_job_proto(state=None, name=None, error=None):


@pytest.fixture
def create_preview_hyperparameter_tuning_job_mock():
def create_hyperparameter_tuning_job_mock():
with mock.patch.object(
job_service_client_v1beta1.JobServiceClient, "create_hyperparameter_tuning_job"
) as create_preview_hyperparameter_tuning_job_mock:
create_preview_hyperparameter_tuning_job_mock.return_value = (
job_service_client_v1.JobServiceClient, "create_hyperparameter_tuning_job"
) as create_hyperparameter_tuning_job_mock:
create_hyperparameter_tuning_job_mock.return_value = (
_get_hyperparameter_tuning_job_proto(
name=_TEST_HYPERPARAMETERTUNING_JOB_NAME,
state=gca_job_state_compat.JobState.JOB_STATE_PENDING,
)
)
yield create_preview_hyperparameter_tuning_job_mock
yield create_hyperparameter_tuning_job_mock


@pytest.fixture
def get_hyperparameter_tuning_job_mock():
with patch.object(
job_service_client_v1beta1.JobServiceClient, "get_hyperparameter_tuning_job"
job_service_client_v1.JobServiceClient, "get_hyperparameter_tuning_job"
) as get_hyperparameter_tuning_job_mock:
get_hyperparameter_tuning_job_mock.side_effect = [
_get_hyperparameter_tuning_job_proto(
Expand Down Expand Up @@ -248,7 +248,7 @@ def teardown_method(self):
@pytest.mark.parametrize("sync", [True, False])
def test_create_hyperparameter_tuning_job_with_persistent_resource(
self,
create_preview_hyperparameter_tuning_job_mock,
create_hyperparameter_tuning_job_mock,
get_hyperparameter_tuning_job_mock,
sync,
):
Expand Down Expand Up @@ -308,7 +308,7 @@ def test_create_hyperparameter_tuning_job_with_persistent_resource(

expected_hyperparameter_tuning_job = _get_hyperparameter_tuning_job_proto()

create_preview_hyperparameter_tuning_job_mock.assert_called_once_with(
create_hyperparameter_tuning_job_mock.assert_called_once_with(
parent=_TEST_PARENT,
hyperparameter_tuning_job=expected_hyperparameter_tuning_job,
timeout=None,
Expand Down

0 comments on commit 35ecbac

Please sign in to comment.