chore(components): Bump Starry Net images and enforce that TF Record generation always runs before test set generation to speed up pipeline runs

Signed-off-by: Googler <[email protected]>
PiperOrigin-RevId: 655633942
Googler committed Jul 24, 2024
1 parent 7660e8a commit 58b342a
Showing 5 changed files with 46 additions and 39 deletions.
2 changes: 2 additions & 0 deletions components/google-cloud/RELEASE.md
@@ -1,6 +1,8 @@
 ## Upcoming release
 * Updated the Starry Net pipeline's template gallery description, and added dataprep_nan_threshold and dataprep_zero_threshold args to the Starry Net pipeline.
 * Fix bug in Starry Net's upload decomposition plot step due to protobuf upgrade, by pinning protobuf library to 3.20.*.
+* Bump Starry Net image tags.
+* In the Starry Net pipeline, enforce that TF Record generation always runs before test set generation to speed up pipeline runs.
 * Add support for running tasks on a `PersistentResource` (see [CustomJobSpec](https://cloud.google.com/vertex-ai/docs/reference/rest/v1beta1/CustomJobSpec)) via `persistent_resource_id` parameter on `v1.custom_job.CustomTrainingJobOp` and `v1.custom_job.create_custom_training_job_from_component`.
 * Bump image for Structured Data pipelines.

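A note on how the ordering promised in the release note is achieved: Kubeflow Pipelines schedules a task only after every task whose outputs it consumes has finished, so passing an output of the TF Record step into the test set step is enough to force the order. Below is a minimal sketch of that technique with hypothetical component names; the real pipeline wires DataprepOp tasks together this way, as the last file in this diff shows.

```python
from kfp import dsl


@dsl.component
def make_tf_records() -> str:
  # Hypothetical stand-in for TF Record generation; returns the path of
  # the static covariates vocab it wrote.
  return 'gs://bucket/static_covariates_vocab.json'


@dsl.component
def make_test_set(static_covariates_vocab_path: str) -> str:
  # Consuming the upstream output creates a data dependency, so this task
  # cannot start until make_tf_records has completed.
  return 'gs://bucket/test_set'


@dsl.pipeline(name='ordering-sketch')
def ordering_sketch():
  tf_records_task = make_tf_records()
  make_test_set(
      static_covariates_vocab_path=tf_records_task.output)
```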
@@ -33,6 +33,7 @@ def dataprep(
     ts_identifier_columns: str,
     time_column: str,
     static_covariate_columns: str,
+    static_covariates_vocab_path: str,  # pytype: disable=unused-argument
     target_column: str,
     machine_type: str,
     docker_region: str,
@@ -78,6 +79,8 @@ def dataprep(
       data source.
     time_column: The column with timestamps in the BigQuery source.
     static_covariate_columns: The names of the static covariates.
+    static_covariates_vocab_path: The path to the master static covariates
+      vocab JSON.
     target_column: The target column in the BigQuery data source.
     machine_type: The machine type of the Dataflow workers.
     docker_region: The Docker region, used to determine which image to use.
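The new static_covariates_vocab_path parameter is deliberately unused inside the component body, hence the pytype suppression: in KFP, a component's inputs double as dependency edges, so accepting an upstream output, even one the component never reads, forces the component to run after its producer. A hedged sketch of that pattern, with illustrative names:

```python
from kfp import dsl


@dsl.component
def dataprep_sketch(
    target_column: str,
    static_covariates_vocab_path: str,  # pytype: disable=unused-argument
) -> str:
  # static_covariates_vocab_path is never read here; it exists so callers
  # can wire in an upstream task's output purely for sequencing.
  return f'prepared://{target_column}'
```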
@@ -55,7 +55,7 @@ def get_training_artifacts(
       instance_schema_uri=str,
   )
   return outputs(
-      f'{docker_region}-docker.pkg.dev/vertex-ai/starryn/predictor:20240617_2142_RC00',  # pylint: disable=too-many-function-args
+      f'{docker_region}-docker.pkg.dev/vertex-ai/starryn/predictor:20240723_0542_RC00',  # pylint: disable=too-many-function-args
       private_dir,  # pylint: disable=too-many-function-args
       os.path.join(private_dir, 'predict_schema.yaml'),  # pylint: disable=too-many-function-args
       os.path.join(private_dir, 'instance_schema.yaml'),  # pylint: disable=too-many-function-args
@@ -13,6 +13,6 @@
 # limitations under the License.
 """Version constants for starry net components."""
 
-DATAPREP_VERSION = '20240617_2225_RC00'
-PREDICTOR_VERSION = '20240617_2142_RC00'
-TRAINER_VERSION = '20240617_2142_RC00'
+DATAPREP_VERSION = '20240722_2225_RC00'
+PREDICTOR_VERSION = '20240723_0542_RC00'
+TRAINER_VERSION = '20240723_0542_RC00'
@@ -239,41 +239,6 @@ def starry_net(  # pylint: disable=dangerous-default-value
       model_blocks=trainer_model_blocks,
       static_covariates=dataprep_static_covariate_columns,
   )
-  test_set_task = DataprepOp(
-      backcast_length=dataprep_backcast_length,
-      forecast_length=dataprep_forecast_length,
-      train_end_date=dataprep_train_end_date,
-      n_val_windows=dataprep_n_val_windows,
-      n_test_windows=dataprep_n_test_windows,
-      test_set_stride=dataprep_test_set_stride,
-      model_blocks=create_dataprep_args_task.outputs['model_blocks'],
-      bigquery_source=dataprep_bigquery_data_path,
-      ts_identifier_columns=create_dataprep_args_task.outputs[
-          'ts_identifier_columns'],
-      time_column=dataprep_time_column,
-      static_covariate_columns=create_dataprep_args_task.outputs[
-          'static_covariate_columns'],
-      target_column=dataprep_target_column,
-      machine_type=dataflow_machine_type,
-      docker_region=create_dataprep_args_task.outputs['docker_region'],
-      location=location,
-      project=project,
-      job_id=job_id,
-      job_name_prefix='test-set',
-      num_workers=dataflow_starting_replica_count,
-      max_num_workers=dataflow_max_replica_count,
-      disk_size_gb=dataflow_disk_size_gb,
-      test_set_only=True,
-      bigquery_output=dataprep_test_set_bigquery_dataset,
-      nan_threshold=dataprep_nan_threshold,
-      zero_threshold=dataprep_zero_threshold,
-      gcs_source=dataprep_csv_data_path,
-      gcs_static_covariate_source=dataprep_csv_static_covariates_path,
-      encryption_spec_key_name=encryption_spec_key_name
-  )
-  test_set_task.set_display_name('create-test-set')
-  set_test_set_task = SetTestSetOp(
-      dataprep_dir=test_set_task.outputs['dataprep_dir'])
   with dsl.If(create_dataprep_args_task.outputs['create_tf_records'] == True,  # pylint: disable=singleton-comparison
               'create-tf-records'):
     create_tf_records_task = DataprepOp(
@@ -290,6 +255,7 @@ def starry_net(  # pylint: disable=dangerous-default-value
         time_column=dataprep_time_column,
         static_covariate_columns=create_dataprep_args_task.outputs[
             'static_covariate_columns'],
+        static_covariates_vocab_path='',
         target_column=dataprep_target_column,
         machine_type=dataflow_machine_type,
         docker_region=create_dataprep_args_task.outputs['docker_region'],
@@ -325,6 +291,42 @@ def starry_net(  # pylint: disable=dangerous-default-value
           'static_covariates_vocab_path'],
       set_tfrecord_args_this_run_task.outputs['static_covariates_vocab_path']
   )
+  test_set_task = DataprepOp(
+      backcast_length=dataprep_backcast_length,
+      forecast_length=dataprep_forecast_length,
+      train_end_date=dataprep_train_end_date,
+      n_val_windows=dataprep_n_val_windows,
+      n_test_windows=dataprep_n_test_windows,
+      test_set_stride=dataprep_test_set_stride,
+      model_blocks=create_dataprep_args_task.outputs['model_blocks'],
+      bigquery_source=dataprep_bigquery_data_path,
+      ts_identifier_columns=create_dataprep_args_task.outputs[
+          'ts_identifier_columns'],
+      time_column=dataprep_time_column,
+      static_covariate_columns=create_dataprep_args_task.outputs[
+          'static_covariate_columns'],
+      static_covariates_vocab_path=static_covariates_vocab_path,
+      target_column=dataprep_target_column,
+      machine_type=dataflow_machine_type,
+      docker_region=create_dataprep_args_task.outputs['docker_region'],
+      location=location,
+      project=project,
+      job_id=job_id,
+      job_name_prefix='test-set',
+      num_workers=dataflow_starting_replica_count,
+      max_num_workers=dataflow_max_replica_count,
+      disk_size_gb=dataflow_disk_size_gb,
+      test_set_only=True,
+      bigquery_output=dataprep_test_set_bigquery_dataset,
+      nan_threshold=dataprep_nan_threshold,
+      zero_threshold=dataprep_zero_threshold,
+      gcs_source=dataprep_csv_data_path,
+      gcs_static_covariate_source=dataprep_csv_static_covariates_path,
+      encryption_spec_key_name=encryption_spec_key_name
+  )
+  test_set_task.set_display_name('create-test-set')
+  set_test_set_task = SetTestSetOp(
+      dataprep_dir=test_set_task.outputs['dataprep_dir'])
   train_tf_record_patterns = dsl.OneOf(
       set_tfrecord_args_previous_run_task.outputs['train_tf_record_patterns'],
       set_tfrecord_args_this_run_task.outputs['train_tf_record_patterns']
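The relocated test_set_task now receives static_covariates_vocab_path, which the hunk above builds with dsl.OneOf over the two TF Record branches (reusing a previous run's records, or the create-tf-records branch guarded by dsl.If). dsl.OneOf resolves to the output of whichever mutually exclusive branch actually ran, so consuming it makes the test set task wait for TF Record generation either way. A self-contained sketch of that mechanism under those assumptions, with hypothetical components:

```python
from kfp import dsl


@dsl.component
def set_tf_record_args(source: str) -> str:
  # Stand-in for the set-tfrecord-args steps in the real pipeline.
  return f'{source}/tfrecords/*'


@dsl.component
def make_test_set(static_covariates_vocab_path: str) -> str:
  # Depends on the OneOf output, so it runs after the chosen branch.
  return 'gs://bucket/test_set'


@dsl.pipeline(name='oneof-sketch')
def oneof_sketch(create_tf_records: bool = True):
  with dsl.If(create_tf_records == True):  # pylint: disable=singleton-comparison
    this_run = set_tf_record_args(source='gs://bucket/this-run')
  with dsl.Else():
    previous_run = set_tf_record_args(source='gs://bucket/previous-run')
  # Resolves to whichever branch actually executed.
  vocab_path = dsl.OneOf(this_run.output, previous_run.output)
  make_test_set(static_covariates_vocab_path=vocab_path)
```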
