feat: add additional_experiments flag in the tables and forecasting training jobs #979

Merged 18 commits on Feb 25, 2022
208 changes: 12 additions & 196 deletions google/cloud/aiplatform/training_jobs.py
@@ -3371,6 +3371,7 @@ def run(
export_evaluated_data_items: bool = False,
export_evaluated_data_items_bigquery_destination_uri: Optional[str] = None,
export_evaluated_data_items_override_destination: bool = False,
additional_experiments: Optional[List[str]] = None,
sync: bool = True,
) -> models.Model:
"""Runs the training job and returns a model.
@@ -3497,6 +3498,8 @@ def run(

Applies only if [export_evaluated_data_items] is True and
[export_evaluated_data_items_bigquery_destination_uri] is specified.
additional_experiments (List[str]):
Optional. Additional experiment flags for the AutoML Tables training.
sync (bool):
Whether to execute this method synchronously. If False, this method
will be executed in concurrent Future and any downstream object will
@@ -3519,6 +3522,9 @@ def run(
if self._has_run:
raise RuntimeError("AutoML Tabular Training has already run.")

if additional_experiments:
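# _add_additional_experiments is defined elsewhere in training_jobs.py (not shown
# in this diff); presumably it records the flags so that _run can fold them into
# the pipeline's training_task_inputs.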
self._add_additional_experiments(additional_experiments)

return self._run(
dataset=dataset,
target_column=target_column,
@@ -3961,6 +3967,7 @@ def run(
budget_milli_node_hours: int = 1000,
model_display_name: Optional[str] = None,
model_labels: Optional[Dict[str, str]] = None,
additional_experiments: Optional[List[str]] = None,
sync: bool = True,
) -> models.Model:
"""Runs the training job and returns a model.
@@ -4107,6 +4114,8 @@ def run(
are allowed.
See https://goo.gl/xmQnxf for more information
and examples of labels.
additional_experiments (List[str]):
Optional. Additional experiment flags for the time series forecasting training.
sync (bool):
Whether to execute this method synchronously. If False, this method
will be executed in concurrent Future and any downstream object will
@@ -4132,6 +4141,9 @@ def run(
if self._has_run:
raise RuntimeError("AutoML Forecasting Training has already run.")

if additional_experiments:
self._add_additional_experiments(additional_experiments)

return self._run(
dataset=dataset,
target_column=target_column,
@@ -4160,202 +4172,6 @@ def run(
sync=sync,
)

def _run_with_experiments(
self,
dataset: datasets.TimeSeriesDataset,
target_column: str,
time_column: str,
time_series_identifier_column: str,
unavailable_at_forecast_columns: List[str],
available_at_forecast_columns: List[str],
forecast_horizon: int,
data_granularity_unit: str,
data_granularity_count: int,
predefined_split_column_name: Optional[str] = None,
weight_column: Optional[str] = None,
time_series_attribute_columns: Optional[List[str]] = None,
context_window: Optional[int] = None,
export_evaluated_data_items: bool = False,
export_evaluated_data_items_bigquery_destination_uri: Optional[str] = None,
export_evaluated_data_items_override_destination: bool = False,
quantiles: Optional[List[float]] = None,
validation_options: Optional[str] = None,
budget_milli_node_hours: int = 1000,
model_display_name: Optional[str] = None,
model_labels: Optional[Dict[str, str]] = None,
sync: bool = True,
additional_experiments: Optional[List[str]] = None,
) -> models.Model:
"""Runs the training job with experiment flags and returns a model.

The training data splits are set by default: Roughly 80% will be used for training,
10% for validation, and 10% for test.

Args:
dataset (datasets.TimeSeriesDataset):
Required. The dataset within the same Project from which data will be used to train the Model. The
Dataset must use schema compatible with Model being trained,
and what is compatible should be described in the used
TrainingPipeline's [training_task_definition]
[google.cloud.aiplatform.v1beta1.TrainingPipeline.training_task_definition].
For time series Datasets, all their data is exported to
training, to pick and choose from.
target_column (str):
Required. Name of the column that the Model is to predict values for.
time_column (str):
Required. Name of the column that identifies time order in the time series.
time_series_identifier_column (str):
Required. Name of the column that identifies the time series.
unavailable_at_forecast_columns (List[str]):
Required. Column names of columns that are unavailable at forecast.
Each column contains information for the given entity (identified by the
[time_series_identifier_column]) that is unknown before the forecast
(e.g. population of a city in a given year, or weather on a given day).
available_at_forecast_columns (List[str]):
Required. Column names of columns that are available at forecast.
Each column contains information for the given entity (identified by the
[time_series_identifier_column]) that is known at forecast.
forecast_horizon (int):
Required. The amount of time into the future for which forecasted values for the target are
returned. Expressed in number of units defined by the [data_granularity_unit] and
[data_granularity_count] field. Inclusive.
data_granularity_unit (str):
Required. The data granularity unit. Accepted values are ``minute``,
``hour``, ``day``, ``week``, ``month``, ``year``.
data_granularity_count (int):
Required. The number of data granularity units between data points in the training
data. If [data_granularity_unit] is `minute`, can be 1, 5, 10, 15, or 30. For all other
values of [data_granularity_unit], must be 1.
predefined_split_column_name (str):
Optional. The key is a name of one of the Dataset's data
columns. The value of the key (either the label's value or
value in the column) must be one of {``TRAIN``,
``VALIDATE``, ``TEST``}, and it defines to which set the
given piece of data is assigned. If for a piece of data the
key is not present or has an invalid value, that piece is
ignored by the pipeline.

Supported only for tabular and time series Datasets.
weight_column (str):
Optional. Name of the column that should be used as the weight column.
Higher values in this column give more importance to the row
during Model training. The column must have numeric values between 0 and
10000 inclusively, and 0 value means that the row is ignored.
If the weight column field is not set, then all rows are assumed to have
equal weight of 1.
time_series_attribute_columns (List[str]):
Optional. Column names that should be used as attribute columns.
Each column is constant within a time series.
context_window (int):
Optional. The amount of time into the past training and prediction data is used for
model training and prediction respectively. Expressed in number of units defined by the
[data_granularity_unit] and [data_granularity_count] fields. When not provided uses the
default value of 0 which means the model sets each series context window to be 0 (also
known as "cold start"). Inclusive.
export_evaluated_data_items (bool):
Whether to export the test set predictions to a BigQuery table.
If False, then the export is not performed.
export_evaluated_data_items_bigquery_destination_uri (string):
Optional. URI of desired destination BigQuery table for exported test set predictions.

Expected format:
``bq://<project_id>:<dataset_id>:<table>``

If not specified, then results are exported to the following auto-created BigQuery
table:
``<project_id>:export_evaluated_examples_<model_name>_<yyyy_MM_dd'T'HH_mm_ss_SSS'Z'>.evaluated_examples``

Applies only if [export_evaluated_data_items] is True.
export_evaluated_data_items_override_destination (bool):
Whether to override the contents of [export_evaluated_data_items_bigquery_destination_uri],
if the table exists, for exported test set predictions. If False, and the
table exists, then the training job will fail.

Applies only if [export_evaluated_data_items] is True and
[export_evaluated_data_items_bigquery_destination_uri] is specified.
quantiles (List[float]):
Quantiles to use for the `minimize-quantile-loss`
[AutoMLForecastingTrainingJob.optimization_objective]. This argument is required in
this case.

Accepts up to 5 quantiles in the form of a double from 0 to 1, exclusive.
Each quantile must be unique.
validation_options (str):
Validation options for the data validation component. The available options are:
"fail-pipeline" - (default), will validate against the validation and fail the pipeline
if it fails.
"ignore-validation" - ignore the results of the validation and continue the pipeline
budget_milli_node_hours (int):
Optional. The train budget of creating this Model, expressed in milli node
hours i.e. 1,000 value in this field means 1 node hour.
The training cost of the model will not exceed this budget. The final
cost will be attempted to be close to the budget, though may end up
being (even) noticeably smaller - at the backend's discretion. This
especially may happen when further model training ceases to provide
any improvements.
If the budget is set to a value known to be insufficient to train a
Model for the given training set, the training won't be attempted and
will error.
The minimum value is 1000 and the maximum is 72000.
model_display_name (str):
Optional. If the script produces a managed Vertex AI Model, the display name of
the Model. The name can be up to 128 characters long and can consist
of any UTF-8 characters.

If not provided upon creation, the job's display_name is used.
model_labels (Dict[str, str]):
Optional. The labels with user-defined metadata to
organize your Models.
Label keys and values can be no longer than 64
characters (Unicode codepoints), can only
contain lowercase letters, numeric characters,
underscores and dashes. International characters
are allowed.
See https://goo.gl/xmQnxf for more information
and examples of labels.
sync (bool):
Whether to execute this method synchronously. If False, this method
will be executed in concurrent Future and any downstream object will
be immediately returned and synced when the Future has completed.
additional_experiments (List[str]):
Additional experiment flags for the time series forecasting training.

Returns:
model: The trained Vertex AI Model resource or None if training did not
produce a Vertex AI Model.

Raises:
RuntimeError: If Training job has already been run or is waiting to run.
"""

if additional_experiments:
self._add_additional_experiments(additional_experiments)

return self.run(
dataset=dataset,
target_column=target_column,
time_column=time_column,
time_series_identifier_column=time_series_identifier_column,
unavailable_at_forecast_columns=unavailable_at_forecast_columns,
available_at_forecast_columns=available_at_forecast_columns,
forecast_horizon=forecast_horizon,
data_granularity_unit=data_granularity_unit,
data_granularity_count=data_granularity_count,
predefined_split_column_name=predefined_split_column_name,
weight_column=weight_column,
time_series_attribute_columns=time_series_attribute_columns,
context_window=context_window,
budget_milli_node_hours=budget_milli_node_hours,
export_evaluated_data_items=export_evaluated_data_items,
export_evaluated_data_items_bigquery_destination_uri=export_evaluated_data_items_bigquery_destination_uri,
export_evaluated_data_items_override_destination=export_evaluated_data_items_override_destination,
quantiles=quantiles,
validation_options=validation_options,
model_display_name=model_display_name,
model_labels=model_labels,
sync=sync,
)

@base.optional_sync()
def _run(
self,
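Taken as a whole, the diff above removes the private _run_with_experiments entry point and exposes additional_experiments directly on run(). A minimal usage sketch for the forecasting path; the project, bucket, dataset resource name, column names, and experiment flag below are illustrative placeholders, not values from this PR:

from google.cloud import aiplatform

aiplatform.init(project="my-project", staging_bucket="gs://my-bucket")

# Placeholder resource name for an existing time series dataset.
dataset = aiplatform.TimeSeriesDataset(
    "projects/my-project/locations/us-central1/datasets/1234567890"
)

job = aiplatform.AutoMLForecastingTrainingJob(
    display_name="sales-forecasting",
    optimization_objective="minimize-rmse",
    column_transformations=[
        {"auto": {"column_name": "sales"}},
        {"auto": {"column_name": "date"}},
    ],
)

model = job.run(
    dataset=dataset,
    target_column="sales",
    time_column="date",
    time_series_identifier_column="store_id",
    unavailable_at_forecast_columns=["sales"],
    available_at_forecast_columns=["date"],
    forecast_horizon=30,
    data_granularity_unit="day",
    data_granularity_count=1,
    additional_experiments=["enable_experiment_1"],  # the new flag; value is a placeholder
    sync=True,
)

If the list is empty or None, the job behaves exactly as before, since the helper is only invoked when flags are present.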
74 changes: 7 additions & 67 deletions tests/unit/aiplatform/test_automl_forecasting_training_jobs.py
@@ -91,9 +91,7 @@
"validationOptions": _TEST_TRAINING_VALIDATION_OPTIONS,
"optimizationObjective": _TEST_TRAINING_OPTIMIZATION_OBJECTIVE_NAME,
}
_TEST_TRAINING_TASK_INPUTS = json_format.ParseDict(
_TEST_TRAINING_TASK_INPUTS_DICT, struct_pb2.Value(),
)

_TEST_TRAINING_TASK_INPUTS_WITH_ADDITIONAL_EXPERIMENTS = json_format.ParseDict(
{
**_TEST_TRAINING_TASK_INPUTS_DICT,
@@ -102,6 +100,10 @@
struct_pb2.Value(),
)

_TEST_TRAINING_TASK_INPUTS = json_format.ParseDict(
_TEST_TRAINING_TASK_INPUTS_DICT, struct_pb2.Value(),
)

_TEST_DATASET_NAME = "test-dataset-name"

_TEST_MODEL_DISPLAY_NAME = "model-display-name"
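The viewer collapses the middle of the reordered fixture above. Conceptually, the test builds the "with additional experiments" inputs by merging the base inputs dict with the experiments flag and parsing the result into the protobuf Value that training_task_inputs expects. A sketch under that assumption; the key name and all values here are inferred for illustration, not copied from the hidden lines:

from google.protobuf import json_format, struct_pb2

_TEST_ADDITIONAL_EXPERIMENTS = ["enable_experiment_1"]  # illustrative value
_TEST_TRAINING_TASK_INPUTS_DICT = {
    "optimizationObjective": "minimize-rmse",  # trimmed to one key for brevity
}

# Merge the base dict with the flag, then parse into the proto Value.
_TEST_TRAINING_TASK_INPUTS_WITH_ADDITIONAL_EXPERIMENTS = json_format.ParseDict(
    {
        **_TEST_TRAINING_TASK_INPUTS_DICT,
        "additionalExperiments": _TEST_ADDITIONAL_EXPERIMENTS,  # assumed key name
    },
    struct_pb2.Value(),
)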
@@ -269,6 +271,7 @@ def test_run_call_pipeline_service_create(
export_evaluated_data_items_override_destination=_TEST_TRAINING_EXPORT_EVALUATED_DATA_ITEMS_OVERRIDE_DESTINATION,
quantiles=_TEST_TRAINING_QUANTILES,
validation_options=_TEST_TRAINING_VALIDATION_OPTIONS,
additional_experiments=_TEST_ADDITIONAL_EXPERIMENTS,
sync=sync,
)

Expand All @@ -290,7 +293,7 @@ def test_run_call_pipeline_service_create(
display_name=_TEST_DISPLAY_NAME,
labels=_TEST_LABELS,
training_task_definition=schema.training_job.definition.automl_forecasting,
training_task_inputs=_TEST_TRAINING_TASK_INPUTS,
training_task_inputs=_TEST_TRAINING_TASK_INPUTS_WITH_ADDITIONAL_EXPERIMENTS,
model_to_upload=true_managed_model,
input_data_config=true_input_data_config,
)
@@ -380,69 +383,6 @@ def test_run_call_pipeline_if_no_model_display_name_nor_model_labels(
training_pipeline=true_training_pipeline,
)

@pytest.mark.usefixtures("mock_pipeline_service_get")
@pytest.mark.parametrize("sync", [True, False])
def test_run_with_experiments(
self,
mock_pipeline_service_create,
mock_dataset_time_series,
mock_model_service_get,
sync,
):
aiplatform.init(project=_TEST_PROJECT, staging_bucket=_TEST_BUCKET_NAME)

job = AutoMLForecastingTrainingJob(
display_name=_TEST_DISPLAY_NAME,
optimization_objective=_TEST_TRAINING_OPTIMIZATION_OBJECTIVE_NAME,
column_transformations=_TEST_TRAINING_COLUMN_TRANSFORMATIONS,
)

model_from_job = job._run_with_experiments(
dataset=mock_dataset_time_series,
target_column=_TEST_TRAINING_TARGET_COLUMN,
time_column=_TEST_TRAINING_TIME_COLUMN,
time_series_identifier_column=_TEST_TRAINING_TIME_SERIES_IDENTIFIER_COLUMN,
unavailable_at_forecast_columns=_TEST_TRAINING_UNAVAILABLE_AT_FORECAST_COLUMNS,
available_at_forecast_columns=_TEST_TRAINING_AVAILABLE_AT_FORECAST_COLUMNS,
forecast_horizon=_TEST_TRAINING_FORECAST_HORIZON,
data_granularity_unit=_TEST_TRAINING_DATA_GRANULARITY_UNIT,
data_granularity_count=_TEST_TRAINING_DATA_GRANULARITY_COUNT,
weight_column=_TEST_TRAINING_WEIGHT_COLUMN,
time_series_attribute_columns=_TEST_TRAINING_TIME_SERIES_ATTRIBUTE_COLUMNS,
context_window=_TEST_TRAINING_CONTEXT_WINDOW,
budget_milli_node_hours=_TEST_TRAINING_BUDGET_MILLI_NODE_HOURS,
export_evaluated_data_items=_TEST_TRAINING_EXPORT_EVALUATED_DATA_ITEMS,
export_evaluated_data_items_bigquery_destination_uri=_TEST_TRAINING_EXPORT_EVALUATED_DATA_ITEMS_BIGQUERY_DESTINATION_URI,
export_evaluated_data_items_override_destination=_TEST_TRAINING_EXPORT_EVALUATED_DATA_ITEMS_OVERRIDE_DESTINATION,
quantiles=_TEST_TRAINING_QUANTILES,
validation_options=_TEST_TRAINING_VALIDATION_OPTIONS,
sync=sync,
additional_experiments=_TEST_ADDITIONAL_EXPERIMENTS,
)

if not sync:
model_from_job.wait()

# Test that if defaults to the job display name
true_managed_model = gca_model.Model(display_name=_TEST_DISPLAY_NAME)

true_input_data_config = gca_training_pipeline.InputDataConfig(
dataset_id=mock_dataset_time_series.name,
)

true_training_pipeline = gca_training_pipeline.TrainingPipeline(
display_name=_TEST_DISPLAY_NAME,
training_task_definition=schema.training_job.definition.automl_forecasting,
training_task_inputs=_TEST_TRAINING_TASK_INPUTS_WITH_ADDITIONAL_EXPERIMENTS,
model_to_upload=true_managed_model,
input_data_config=true_input_data_config,
)

mock_pipeline_service_create.assert_called_once_with(
parent=initializer.global_config.common_location_path(),
training_pipeline=true_training_pipeline,
)

@pytest.mark.usefixtures("mock_pipeline_service_get")
@pytest.mark.parametrize("sync", [True, False])
def test_run_call_pipeline_if_set_additional_experiments(
3 changes: 2 additions & 1 deletion tests/unit/aiplatform/test_automl_tabular_training_jobs.py
@@ -330,6 +330,7 @@ def test_run_call_pipeline_service_create(
weight_column=_TEST_TRAINING_WEIGHT_COLUMN,
budget_milli_node_hours=_TEST_TRAINING_BUDGET_MILLI_NODE_HOURS,
disable_early_stopping=_TEST_TRAINING_DISABLE_EARLY_STOPPING,
additional_experiments=_TEST_ADDITIONAL_EXPERIMENTS,
sync=sync,
)

@@ -354,7 +355,7 @@
display_name=_TEST_DISPLAY_NAME,
labels=_TEST_LABELS,
training_task_definition=schema.training_job.definition.automl_tabular,
training_task_inputs=_TEST_TRAINING_TASK_INPUTS,
training_task_inputs=_TEST_TRAINING_TASK_INPUTS_WITH_ADDITIONAL_EXPERIMENTS,
model_to_upload=true_managed_model,
input_data_config=true_input_data_config,
encryption_spec=_TEST_DEFAULT_ENCRYPTION_SPEC,
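The tabular job gains the same parameter. A corresponding sketch, continuing the imports and placeholders from the forecasting example above:

tabular_job = aiplatform.AutoMLTabularTrainingJob(
    display_name="house-prices",
    optimization_prediction_type="regression",
)

tabular_model = tabular_job.run(
    dataset=aiplatform.TabularDataset(
        "projects/my-project/locations/us-central1/datasets/0987654321"
    ),
    target_column="price",
    budget_milli_node_hours=1000,
    additional_experiments=["enable_experiment_1"],  # placeholder flag value
)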