feat: Add Vertex Forecasting E2E test. (#1248)
Adds e2e test for AutoML Forecasting and unit test for `TimeSeriesDataset`. Also adds `create_request_timeout` to `TimeSeriesDataset`, which #1099 seems to have missed.
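For context, `create_request_timeout` bounds how long the underlying create request may run before the client gives up. A minimal usage sketch, where the project, location, and BigQuery table are placeholders rather than values from this commit:

```python
# Hedged sketch of the new parameter; project, location, and the
# BigQuery table below are placeholders, not values from this commit.
from google.cloud import aiplatform

aiplatform.init(project="my-project", location="us-central1")

ds = aiplatform.TimeSeriesDataset.create(
    display_name="sales-history",
    bq_source=["bq://my-project.my_dataset.sales_train"],
    create_request_timeout=180.0,  # fail the request if it runs past 180s
)
```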

---

Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:
- [x] Make sure to open an issue as a [bug/issue](https://github.com/googleapis/python-aiplatform/issues/new/choose) before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea
- [x] Ensure the tests and linter pass
- [x] Code coverage does not decrease (if any source code was changed)
- [x] Appropriate docs were updated (if necessary)

Fixes #<issue_number_goes_here> 🦕
TheMichaelHu authored May 27, 2022
1 parent dc3be45 commit e82c179
Showing 3 changed files with 158 additions and 0 deletions.
4 changes: 4 additions & 0 deletions google/cloud/aiplatform/datasets/time_series_dataset.py
@@ -46,6 +46,7 @@ def create(
labels: Optional[Dict[str, str]] = None,
encryption_spec_key_name: Optional[str] = None,
sync: bool = True,
create_request_timeout: Optional[float] = None,
) -> "TimeSeriesDataset":
"""Creates a new time series dataset.
@@ -102,6 +103,8 @@ def create(
Whether to execute this method synchronously. If False, this method
will be executed in concurrent Future and any downstream object will
be immediately returned and synced when the Future has completed.
create_request_timeout (float):
Optional. The timeout for the create request in seconds.
Returns:
time_series_dataset (TimeSeriesDataset):
@@ -141,6 +144,7 @@ def create(
encryption_spec_key_name=encryption_spec_key_name
),
sync=sync,
create_request_timeout=create_request_timeout,
)

def import_data(self):
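One behavior worth noting, though it comes from gRPC/api-core rather than this diff: when a client-side deadline expires, the call typically surfaces as `google.api_core.exceptions.DeadlineExceeded`. A hedged sketch with an intentionally tight timeout:

```python
# Illustrative only: the 5-second deadline is deliberately unrealistic,
# and all names are placeholders. DeadlineExceeded is the usual api-core
# mapping for an expired client-side deadline.
from google.api_core import exceptions
from google.cloud import aiplatform

try:
    ds = aiplatform.TimeSeriesDataset.create(
        display_name="sales-history",
        bq_source=["bq://my-project.my_dataset.sales_train"],
        create_request_timeout=5.0,
    )
except exceptions.DeadlineExceeded:
    # Surface or retry as appropriate for the caller.
    raise
```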
27 changes: 27 additions & 0 deletions tests/system/aiplatform/test_dataset.py
@@ -51,6 +51,9 @@
)
_TEST_DATASET_DISPLAY_NAME = "permanent_50_flowers_dataset"
_TEST_TABULAR_CLASSIFICATION_GCS_SOURCE = "gs://ucaip-sample-resources/iris_1000.csv"
_TEST_FORECASTING_BQ_SOURCE = (
"bq://ucaip-sample-tests:ucaip_test_us_central1.2020_sales_train"
)
_TEST_TEXT_ENTITY_EXTRACTION_GCS_SOURCE = f"gs://{TEST_BUCKET}/ai-platform-unified/sdk/datasets/text_entity_extraction_dataset.jsonl"
_TEST_IMAGE_OBJECT_DETECTION_GCS_SOURCE = (
"gs://ucaip-test-us-central1/dataset/salads_oid_ml_use_public_unassigned.jsonl"
@@ -306,6 +309,30 @@ def test_create_tabular_dataset_from_dataframe_with_provided_schema(
finally:
tabular_dataset.delete()

    def test_create_time_series_dataset(self):
        """Use the Dataset.create() method to create a new time series dataset.
        Then confirm the dataset was successfully created and references the
        BigQuery source."""

        time_series_dataset = None
        try:
time_series_dataset = aiplatform.TimeSeriesDataset.create(
display_name=self._make_display_name(key="create_time_series_dataset"),
bq_source=[_TEST_FORECASTING_BQ_SOURCE],
create_request_timeout=None,
)

gapic_metadata = time_series_dataset.to_dict()["metadata"]
bq_source_uri = gapic_metadata["inputConfig"]["bigquerySource"]["uri"]

assert _TEST_FORECASTING_BQ_SOURCE == bq_source_uri
assert (
time_series_dataset.metadata_schema_uri
== aiplatform.schema.dataset.metadata.time_series
)

finally:
if time_series_dataset is not None:
time_series_dataset.delete()

def test_export_data(self, storage_client, staging_bucket):
"""Get an existing dataset, export data to a newly created folder in
Google Cloud Storage, then verify data was successfully exported."""
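A note on the metadata assertions in the new test: `to_dict()` serializes the underlying GAPIC resource with the protobuf JSON mapping, so nested field names come back in camelCase ("inputConfig", "bigquerySource") rather than the snake_case of the proto definitions. A minimal sketch of that lookup, assuming `time_series_dataset` was created from a BigQuery source as above:

```python
# Sketch only: `time_series_dataset` is assumed to come from a prior
# TimeSeriesDataset.create(bq_source=[...]) call.
gapic_metadata = time_series_dataset.to_dict()["metadata"]
# The protobuf JSON mapping yields camelCase keys, hence "inputConfig"
# rather than "input_config".
bq_uri = gapic_metadata["inputConfig"]["bigquerySource"]["uri"]
assert bq_uri.startswith("bq://")
```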
127 changes: 127 additions & 0 deletions tests/system/aiplatform/test_e2e_forecasting.py
@@ -0,0 +1,127 @@
# -*- coding: utf-8 -*-

# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from google.cloud import aiplatform
from google.cloud.aiplatform.compat.types import job_state
from google.cloud.aiplatform.compat.types import pipeline_state
import pytest
from tests.system.aiplatform import e2e_base

_TRAINING_DATASET_BQ_PATH = (
"bq://ucaip-sample-tests:ucaip_test_us_central1.2020_sales_train"
)
_PREDICTION_DATASET_BQ_PATH = (
"bq://ucaip-sample-tests:ucaip_test_us_central1.2021_sales_predict"
)


@pytest.mark.usefixtures("prepare_staging_bucket", "delete_staging_bucket")
class TestEndToEndForecasting(e2e_base.TestEndToEnd):
"""End to end system test of the Vertex SDK with forecasting data."""

_temp_prefix = "temp-vertex-sdk-e2e-forecasting"

def test_end_to_end_forecasting(self, shared_state):
"""Builds a dataset, trains models, and gets batch predictions."""
ds = None
automl_job = None
automl_model = None
automl_batch_prediction_job = None

aiplatform.init(
project=e2e_base._PROJECT,
location=e2e_base._LOCATION,
staging_bucket=shared_state["staging_bucket_name"],
)
try:
            # Create and import into a single managed dataset for both
            # training jobs.
ds = aiplatform.TimeSeriesDataset.create(
display_name=self._make_display_name("dataset"),
bq_source=[_TRAINING_DATASET_BQ_PATH],
sync=False,
create_request_timeout=180.0,
)

time_column = "date"
time_series_identifier_column = "store_name"
target_column = "sale_dollars"
column_specs = {
time_column: "timestamp",
target_column: "numeric",
"city": "categorical",
"zip_code": "categorical",
"county": "categorical",
}

# Define both training jobs
# TODO(humichael): Add seq2seq job.
automl_job = aiplatform.AutoMLForecastingTrainingJob(
display_name=self._make_display_name("train-housing-automl"),
optimization_objective="minimize-rmse",
column_specs=column_specs,
)

            # Kick off both training jobs; the AutoML job will take
            # approximately one hour to run.
automl_model = automl_job.run(
dataset=ds,
target_column=target_column,
time_column=time_column,
time_series_identifier_column=time_series_identifier_column,
available_at_forecast_columns=[time_column],
unavailable_at_forecast_columns=[target_column],
time_series_attribute_columns=["city", "zip_code", "county"],
forecast_horizon=30,
context_window=30,
data_granularity_unit="day",
data_granularity_count=1,
budget_milli_node_hours=1000,
model_display_name=self._make_display_name("automl-liquor-model"),
sync=False,
)

automl_batch_prediction_job = automl_model.batch_predict(
job_display_name=self._make_display_name("automl-liquor-model"),
instances_format="bigquery",
machine_type="n1-standard-4",
bigquery_source=_PREDICTION_DATASET_BQ_PATH,
gcs_destination_prefix=(
f'gs://{shared_state["staging_bucket_name"]}/bp_results/'
),
sync=False,
)

automl_batch_prediction_job.wait()

assert (
automl_job.state
== pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED
)
assert (
automl_batch_prediction_job.state
== job_state.JobState.JOB_STATE_SUCCEEDED
)
finally:
if ds is not None:
ds.delete()
if automl_job is not None:
automl_job.delete()
if automl_model is not None:
automl_model.delete()
if automl_batch_prediction_job is not None:
automl_batch_prediction_job.delete()
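
For readers new to the SDK's lazy-sync pattern used throughout this test: with `sync=False`, each call returns its resource wrapper immediately and the work runs in a concurrent Future; handing an unsynced resource to a downstream call makes that step wait for its input, and `wait()` blocks until the resource and everything it depends on are complete. A compressed sketch, where `training_kwargs` and `prediction_kwargs` are hypothetical stand-ins for the full arguments shown above:

```python
# Hedged sketch of the lazy-sync chaining in this test; training_kwargs
# and prediction_kwargs are hypothetical stand-ins for the arguments above.
ds = aiplatform.TimeSeriesDataset.create(
    display_name="sales",
    bq_source=[_TRAINING_DATASET_BQ_PATH],
    sync=False,  # returns immediately; creation continues in the background
)
model = automl_job.run(dataset=ds, sync=False, **training_kwargs)
bp = model.batch_predict(sync=False, **prediction_kwargs)
bp.wait()  # joins the whole chain: dataset -> training -> batch prediction
```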
