From e82c1792293396045a1032df015a3700fc38609b Mon Sep 17 00:00:00 2001
From: Michael Hu
Date: Fri, 27 May 2022 12:50:15 -0400
Subject: [PATCH] feat: Add Vertex Forecasting E2E test. (#1248)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Adds an e2e test for AutoML Forecasting and a system test for
`TimeSeriesDataset`. Also adds `create_request_timeout` to
`TimeSeriesDataset.create()`, which #1099 seems to have missed.
---
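Reviewer note: a minimal sketch of how the new `create_request_timeout`
parameter is expected to be used, going off the docstring added in this
patch. The project, location, and BigQuery table below are placeholders,
not resources this PR touches:

    from google.cloud import aiplatform

    aiplatform.init(project="my-project", location="us-central1")

    # Per the new docstring, the timeout applies to the create request
    # and is given in seconds.
    ds = aiplatform.TimeSeriesDataset.create(
        display_name="sales-forecasting-dataset",
        bq_source=["bq://my-project.my_dataset.sales_train"],
        create_request_timeout=300.0,
    )
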
 .../datasets/time_series_dataset.py           |   4 +
 tests/system/aiplatform/test_dataset.py       |  28 ++++
 .../system/aiplatform/test_e2e_forecasting.py | 127 ++++++++++++++++++
 3 files changed, 159 insertions(+)
 create mode 100644 tests/system/aiplatform/test_e2e_forecasting.py

diff --git a/google/cloud/aiplatform/datasets/time_series_dataset.py b/google/cloud/aiplatform/datasets/time_series_dataset.py
index ec5546f12a..6bde6be7a5 100644
--- a/google/cloud/aiplatform/datasets/time_series_dataset.py
+++ b/google/cloud/aiplatform/datasets/time_series_dataset.py
@@ -46,6 +46,7 @@ def create(
         labels: Optional[Dict[str, str]] = None,
         encryption_spec_key_name: Optional[str] = None,
         sync: bool = True,
+        create_request_timeout: Optional[float] = None,
     ) -> "TimeSeriesDataset":
         """Creates a new time series dataset.
 
@@ -102,6 +103,8 @@
                 Whether to execute this method synchronously. If False, this method
                 will be executed in concurrent Future and any downstream object will
                 be immediately returned and synced when the Future has completed.
+            create_request_timeout (float):
+                Optional. The timeout for the create request in seconds.
 
         Returns:
             time_series_dataset (TimeSeriesDataset):
@@ -141,6 +144,7 @@
                 encryption_spec_key_name=encryption_spec_key_name
             ),
             sync=sync,
+            create_request_timeout=create_request_timeout,
         )
 
     def import_data(self):
diff --git a/tests/system/aiplatform/test_dataset.py b/tests/system/aiplatform/test_dataset.py
index 54e2528e1f..7cd3c0416c 100644
--- a/tests/system/aiplatform/test_dataset.py
+++ b/tests/system/aiplatform/test_dataset.py
@@ -51,6 +51,9 @@
 )
 _TEST_DATASET_DISPLAY_NAME = "permanent_50_flowers_dataset"
 _TEST_TABULAR_CLASSIFICATION_GCS_SOURCE = "gs://ucaip-sample-resources/iris_1000.csv"
+_TEST_FORECASTING_BQ_SOURCE = (
+    "bq://ucaip-sample-tests:ucaip_test_us_central1.2020_sales_train"
+)
 _TEST_TEXT_ENTITY_EXTRACTION_GCS_SOURCE = f"gs://{TEST_BUCKET}/ai-platform-unified/sdk/datasets/text_entity_extraction_dataset.jsonl"
 _TEST_IMAGE_OBJECT_DETECTION_GCS_SOURCE = (
     "gs://ucaip-test-us-central1/dataset/salads_oid_ml_use_public_unassigned.jsonl"
 )
@@ -306,6 +309,31 @@ def test_create_tabular_dataset_from_dataframe_with_provided_schema(
         finally:
             tabular_dataset.delete()
 
+    def test_create_time_series_dataset(self):
+        """Use the TimeSeriesDataset.create() method to create a new time series dataset.
+        Then confirm the dataset was successfully created and references the BigQuery source."""
+
+        time_series_dataset = None
+        try:
+            time_series_dataset = aiplatform.TimeSeriesDataset.create(
+                display_name=self._make_display_name(key="create_time_series_dataset"),
+                bq_source=[_TEST_FORECASTING_BQ_SOURCE],
+                create_request_timeout=None,
+            )
+
+            gapic_metadata = time_series_dataset.to_dict()["metadata"]
+            bq_source_uri = gapic_metadata["inputConfig"]["bigquerySource"]["uri"]
+
+            assert _TEST_FORECASTING_BQ_SOURCE == bq_source_uri
+            assert (
+                time_series_dataset.metadata_schema_uri
+                == aiplatform.schema.dataset.metadata.time_series
+            )
+
+        finally:
+            if time_series_dataset is not None:
+                time_series_dataset.delete()
+
     def test_export_data(self, storage_client, staging_bucket):
         """Get an existing dataset, export data to a newly created folder in
         Google Cloud Storage, then verify data was successfully exported."""
diff --git a/tests/system/aiplatform/test_e2e_forecasting.py b/tests/system/aiplatform/test_e2e_forecasting.py
new file mode 100644
index 0000000000..b0f3e19711
--- /dev/null
+++ b/tests/system/aiplatform/test_e2e_forecasting.py
@@ -0,0 +1,127 @@
+# -*- coding: utf-8 -*-
+
+# Copyright 2022 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from google.cloud import aiplatform
+from google.cloud.aiplatform.compat.types import job_state
+from google.cloud.aiplatform.compat.types import pipeline_state
+import pytest
+from tests.system.aiplatform import e2e_base
+
+_TRAINING_DATASET_BQ_PATH = (
+    "bq://ucaip-sample-tests:ucaip_test_us_central1.2020_sales_train"
+)
+_PREDICTION_DATASET_BQ_PATH = (
+    "bq://ucaip-sample-tests:ucaip_test_us_central1.2021_sales_predict"
+)
+
+
+@pytest.mark.usefixtures("prepare_staging_bucket", "delete_staging_bucket")
+class TestEndToEndForecasting(e2e_base.TestEndToEnd):
+    """End-to-end system test of the Vertex SDK with forecasting data."""
+
+    _temp_prefix = "temp-vertex-sdk-e2e-forecasting"
+
+    def test_end_to_end_forecasting(self, shared_state):
+        """Builds a dataset, trains models, and gets batch predictions."""
+        ds = None
+        automl_job = None
+        automl_model = None
+        automl_batch_prediction_job = None
+
+        aiplatform.init(
+            project=e2e_base._PROJECT,
+            location=e2e_base._LOCATION,
+            staging_bucket=shared_state["staging_bucket_name"],
+        )
+        try:
+            # Create and import a single managed dataset to be shared by
+            # both training jobs.
+            ds = aiplatform.TimeSeriesDataset.create(
+                display_name=self._make_display_name("dataset"),
+                bq_source=[_TRAINING_DATASET_BQ_PATH],
+                sync=False,
+                create_request_timeout=180.0,
+            )
+
+            time_column = "date"
+            time_series_identifier_column = "store_name"
+            target_column = "sale_dollars"
+            column_specs = {
+                time_column: "timestamp",
+                target_column: "numeric",
+                "city": "categorical",
+                "zip_code": "categorical",
+                "county": "categorical",
+            }
+
+            # Define both training jobs.
+            # TODO(humichael): Add seq2seq job.
+            automl_job = aiplatform.AutoMLForecastingTrainingJob(
+                display_name=self._make_display_name("train-housing-automl"),
+                optimization_objective="minimize-rmse",
+                column_specs=column_specs,
+            )
+
+            # Kick off both training jobs; the AutoML job takes roughly
+            # one hour to run.
+            automl_model = automl_job.run(
+                dataset=ds,
+                target_column=target_column,
+                time_column=time_column,
+                time_series_identifier_column=time_series_identifier_column,
+                available_at_forecast_columns=[time_column],
+                unavailable_at_forecast_columns=[target_column],
+                time_series_attribute_columns=["city", "zip_code", "county"],
+                forecast_horizon=30,
+                context_window=30,
+                data_granularity_unit="day",
+                data_granularity_count=1,
+                budget_milli_node_hours=1000,
+                model_display_name=self._make_display_name("automl-liquor-model"),
+                sync=False,
+            )
+
+            automl_batch_prediction_job = automl_model.batch_predict(
+                job_display_name=self._make_display_name("automl-liquor-model"),
+                instances_format="bigquery",
+                machine_type="n1-standard-4",
+                bigquery_source=_PREDICTION_DATASET_BQ_PATH,
+                gcs_destination_prefix=(
+                    f'gs://{shared_state["staging_bucket_name"]}/bp_results/'
+                ),
+                sync=False,
+            )
+
+            automl_batch_prediction_job.wait()
+
+            assert (
+                automl_job.state
+                == pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED
+            )
+            assert (
+                automl_batch_prediction_job.state
+                == job_state.JobState.JOB_STATE_SUCCEEDED
+            )
+        finally:
+            if ds is not None:
+                ds.delete()
+            if automl_job is not None:
+                automl_job.delete()
+            if automl_model is not None:
+                automl_model.delete()
+            if automl_batch_prediction_job is not None:
+                automl_batch_prediction_job.delete()
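
Reviewer note: the test above leans on the SDK's deferred execution, which
the `sync` docstring in this patch describes: with `sync=False`, `create()`
and `run()` return immediately, and downstream calls block until the
underlying resources are ready. A condensed sketch of the same flow outside
the test harness; the project, table, and display names are placeholders:

    from google.cloud import aiplatform

    aiplatform.init(project="my-project", location="us-central1")

    # Returns immediately; the dataset is created on a background thread.
    ds = aiplatform.TimeSeriesDataset.create(
        display_name="sales-dataset",
        bq_source=["bq://my-project.my_dataset.sales_train"],
        sync=False,
    )

    job = aiplatform.AutoMLForecastingTrainingJob(
        display_name="sales-forecasting-job",
        optimization_objective="minimize-rmse",
    )

    # run() consumes the dataset future, so it waits for the dataset to
    # exist before submitting the training pipeline.
    model = job.run(
        dataset=ds,
        target_column="sale_dollars",
        time_column="date",
        time_series_identifier_column="store_name",
        available_at_forecast_columns=["date"],
        unavailable_at_forecast_columns=["sale_dollars"],
        forecast_horizon=30,
        context_window=30,
        data_granularity_unit="day",
        data_granularity_count=1,
        budget_milli_node_hours=1000,
        sync=False,
    )

    model.wait()  # block until training completes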