Merge pull request #18 from teamdatatonic/feat/enable-concurrent-runs
Add RESOURCE_SUFFIX environment variable
Jonny Browning (Datatonic) authored Jun 23, 2023
2 parents 44730dd + d240286 commit 47cf5d6
Showing 8 changed files with 29 additions and 11 deletions.
11 changes: 6 additions & 5 deletions Makefile
@@ -71,12 +71,13 @@ test-all-components-coverage: ## Run tests with coverage
 done
 
 sync-assets: ## Sync assets folder to GCS.
-	@if [ -d "./pipelines/assets/" ] ; then \
-		echo "Syncing assets to GCS" && \
-		gsutil -m rsync -r -d ./pipelines/assets ${PIPELINE_FILES_GCS_PATH}/assets ; \
+	@if [ -d "./pipelines/assets/" ]; then \
+		echo "Syncing assets to GCS"; \
+		PIPELINE_FILES_GCS_PATH=$${PIPELINE_FILES_GCS_PATH}$(if $(strip $(RESOURCE_SUFFIX)),/$(RESOURCE_SUFFIX)); \
+		gsutil -m rsync -r -d ./pipelines/assets "$${PIPELINE_FILES_GCS_PATH}/assets"; \
 	else \
-		echo "No assets folder found" ; \
-	fi ;
+		echo "No assets folder found"; \
+	fi;
 
 run: ## Compile pipeline, copy assets to GCS, and run pipeline in sandbox environment. Must specify pipeline=<training|prediction>. Optionally specify enable_pipeline_caching=<true|false> (defaults to default Vertex caching behaviour)
 	@ $(MAKE) compile-pipeline && \
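The `$(if $(strip $(RESOURCE_SUFFIX)),/$(RESOURCE_SUFFIX))` call above appends `/$(RESOURCE_SUFFIX)` to the GCS destination only when the variable is set to a non-blank value. A minimal Python sketch of the same logic, with hypothetical values (the path and suffix are examples, not repo output):

    import os

    # Mirror of the Makefile conditional: add the suffix as an extra
    # path segment only when RESOURCE_SUFFIX is set and non-blank.
    base_path = os.environ.get("PIPELINE_FILES_GCS_PATH", "gs://my-gcp-project-pl-assets")
    suffix = os.environ.get("RESOURCE_SUFFIX", "").strip()

    destination = f"{base_path}/{suffix}" if suffix else base_path
    print(f"{destination}/assets")  # target of the gsutil rsync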
2 changes: 2 additions & 0 deletions README.md
@@ -178,6 +178,8 @@ This will execute the pipeline using the chosen template on Vertex AI, namely it
 1. Copy the `assets` folders to Cloud Storage
 1. Trigger the pipeline with the help of `pipelines/trigger/main.py`
 
+To avoid resource conflicts when multiple developers run pipelines concurrently in the same Google Cloud project, populate the `RESOURCE_SUFFIX` environment variable in your `env.sh` file. This appends your suffix to Google Cloud resource names, keeping each developer's resources unique and preventing unintentional overwriting.
+
 #### Pipeline input parameters
 
 The ML pipelines have input parameters. As you can see in the pipeline definition files (`pipelines/pipelines/<xgboost|tensorflow>/<training|prediction>/pipeline.py`), they have default values, and some of these default values are derived from environment variables (which in turn are defined in `env.sh`).
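As a concrete illustration of the suffixing described in the README paragraph above (the suffix values are hypothetical, not from the repo): two developers sharing one project simply pick distinct suffixes in their local `env.sh`.

    # Hypothetical suffixes for two developers sharing one Google Cloud project.
    for resource_suffix in ("_alice", "_bob"):
        # GCS assets path, per env.sh.example below:
        print(f"gs://my-gcp-project-pl-assets/{resource_suffix}")
        # BigQuery table derived in the training pipeline:
        print("ingested_data" + "_tf_training" + resource_suffix)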
1 change: 1 addition & 0 deletions cloudbuild/e2e-test.yaml
@@ -45,6 +45,7 @@ steps:
   - PIPELINE_TEMPLATE=${_PIPELINE_TEMPLATE}
   - VERTEX_PIPELINE_ROOT=${_TEST_VERTEX_PIPELINE_ROOT}
   - PIPELINE_FILES_GCS_PATH=${_PIPELINE_PUBLISH_GCS_PATH}/${COMMIT_SHA}
+  - RESOURCE_SUFFIX=_${COMMIT_SHA}
 
 options:
   logging: CLOUD_LOGGING_ONLY
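In CI the suffix is derived from the commit under test, so concurrent end-to-end runs stay isolated from each other. A sketch of the resulting names (the SHA is illustrative; Cloud Build substitutes the real one at runtime):

    # Illustrative commit SHA, not a value computed by this snippet.
    commit_sha = "47cf5d6"
    resource_suffix = f"_{commit_sha}"

    print("ingested_data_xgb_training" + resource_suffix)
    # ingested_data_xgb_training_47cf5d6 -- unique to this commit's test run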
5 changes: 4 additions & 1 deletion env.sh.example
@@ -21,7 +21,10 @@ export VERTEX_LOCATION=europe-west2
 export VERTEX_NETWORK= # optional
 export VERTEX_PROJECT_ID=my-gcp-project
 
+# Suffix (e.g. '_<your name>') to facilitate running concurrent pipelines in the same Google Cloud project. Populate if working in a team to avoid overwriting resources during development
+export RESOURCE_SUFFIX=
+
 # Leave as-is
 export VERTEX_SA_EMAIL=vertex-pipelines@${VERTEX_PROJECT_ID}.iam.gserviceaccount.com
-export PIPELINE_FILES_GCS_PATH=gs://${VERTEX_PROJECT_ID}-pl-assets
+export PIPELINE_FILES_GCS_PATH=gs://${VERTEX_PROJECT_ID}-pl-assets/${RESOURCE_SUFFIX}
 export VERTEX_PIPELINE_ROOT=gs://${VERTEX_PROJECT_ID}-pl-root
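Note one detail of the new `PIPELINE_FILES_GCS_PATH` line: the suffix is interpolated unconditionally here, so leaving `RESOURCE_SUFFIX` empty yields a path with a trailing slash. A quick sketch of the expansion (values hypothetical):

    # Hypothetical expansion of the env.sh line above.
    for resource_suffix in ("_alice", ""):
        print(f"gs://my-gcp-project-pl-assets/{resource_suffix}")
    # gs://my-gcp-project-pl-assets/_alice
    # gs://my-gcp-project-pl-assets/      <- empty suffix leaves a trailing slash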
5 changes: 4 additions & 1 deletion pipelines/src/pipelines/tensorflow/prediction/pipeline.py
@@ -36,6 +36,7 @@ def tensorflow_pipeline(
     batch_prediction_machine_type: str = "n1-standard-4",
     batch_prediction_min_replicas: int = 3,
     batch_prediction_max_replicas: int = 10,
+    resource_suffix: str = os.environ.get("RESOURCE_SUFFIX"),
 ):
"""
Tensorflow prediction pipeline which:
@@ -68,6 +69,8 @@
             Vertex Batch Prediction job for horizontal scalability
         batch_prediction_max_replicas (int): Maximum no of machines to distribute the
             Vertex Batch Prediction job for horizontal scalability.
+        resource_suffix (str): Optional. Suffix appended to resource names to
+            avoid overwriting them when pipelines run concurrently.
     Returns:
         None
@@ -79,7 +82,7 @@
     file_pattern = "" # e.g. "files-*.csv", used as file pattern on storage
     time_column = "trip_start_timestamp"
     ingestion_table = "taxi_trips"
-    table_suffix = "_tf_prediction" # suffix to table names
+    table_suffix = "_tf_prediction" + str(resource_suffix) # suffix to table names
     ingested_table = "ingested_data" + table_suffix
     monitoring_alert_email_addresses = []
     monitoring_skew_config = {"defaultSkewThreshold": {"value": 0.001}}
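One behavioural detail of the new `resource_suffix` default: `os.environ.get("RESOURCE_SUFFIX")` returns `None` when the variable is unset, and `str(None)` is the literal string `"None"`. A minimal sketch of how `table_suffix` then resolves (illustrative, not repo output):

    import os

    resource_suffix = os.environ.get("RESOURCE_SUFFIX")  # None if unset

    table_suffix = "_tf_prediction" + str(resource_suffix)
    # RESOURCE_SUFFIX="_alice" -> "_tf_prediction_alice"
    # RESOURCE_SUFFIX unset    -> "_tf_predictionNone", since str(None) == "None"
    print(table_suffix)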
6 changes: 4 additions & 2 deletions pipelines/src/pipelines/tensorflow/training/pipeline.py
@@ -39,6 +39,7 @@ def tensorflow_pipeline(
     timestamp: str = "2022-12-01 00:00:00",
     staging_bucket: str = os.environ.get("VERTEX_PIPELINE_ROOT"),
     pipeline_files_gcs_path: str = os.environ.get("PIPELINE_FILES_GCS_PATH"),
+    resource_suffix: str = os.environ.get("RESOURCE_SUFFIX"),
     test_dataset_uri: str = "",
 ):
     """
@@ -51,7 +52,6 @@
     Args:
         project_id (str): project id of the Google Cloud project
         project_location (str): location of the Google Cloud project
-        pipeline_files_gcs_path (str): GCS path where the pipeline files are located
         ingestion_project_id (str): project id containing the source bigquery data
             for ingestion. This can be the same as `project_id` if the source data is
             in the same project where the ML pipeline is executed.
@@ -65,6 +65,8 @@
             If any time part is missing, it will be regarded as zero.
         staging_bucket (str): Staging bucket for pipeline artifacts.
         pipeline_files_gcs_path (str): GCS path where the pipeline files are located
+        resource_suffix (str): Optional. Suffix appended to resource names to
+            avoid overwriting them when pipelines run concurrently.
         test_dataset_uri (str): Optional. GCS URI of static held-out test dataset.
     """
 
@@ -73,7 +75,7 @@
     label_column_name = "total_fare"
     time_column = "trip_start_timestamp"
     ingestion_table = "taxi_trips"
-    table_suffix = "_tf_training" # suffix to table names
+    table_suffix = "_tf_training" + str(resource_suffix) # suffix to table names
     ingested_table = "ingested_data" + table_suffix
     preprocessed_table = "preprocessed_data" + table_suffix
     train_table = "train_data" + table_suffix
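To make the fan-out of `table_suffix` concrete, here is how the training pipeline's derived BigQuery table names look with a hypothetical suffix; the xgboost pipelines below follow the identical pattern with `_xgb_training` and `_xgb_prediction`:

    resource_suffix = "_alice"  # hypothetical developer suffix from env.sh

    table_suffix = "_tf_training" + resource_suffix
    print("ingested_data" + table_suffix)      # ingested_data_tf_training_alice
    print("preprocessed_data" + table_suffix)  # preprocessed_data_tf_training_alice
    print("train_data" + table_suffix)         # train_data_tf_training_alice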
5 changes: 4 additions & 1 deletion pipelines/src/pipelines/xgboost/prediction/pipeline.py
@@ -36,6 +36,7 @@ def xgboost_pipeline(
     batch_prediction_machine_type: str = "n1-standard-4",
     batch_prediction_min_replicas: int = 3,
     batch_prediction_max_replicas: int = 10,
+    resource_suffix: str = os.environ.get("RESOURCE_SUFFIX"),
 ):
"""
XGB prediction pipeline which:
@@ -65,6 +66,8 @@
             Vertex Batch Prediction job for horizontal scalability
         batch_prediction_max_replicas (int): Maximum no of machines to distribute the
             Vertex Batch Prediction job for horizontal scalability.
+        resource_suffix (str): Optional. Suffix appended to resource names to
+            avoid overwriting them when pipelines run concurrently.
     Returns:
         None
@@ -75,7 +78,7 @@
     # into different components of the pipeline
     time_column = "trip_start_timestamp"
     ingestion_table = "taxi_trips"
-    table_suffix = "_xgb_prediction" # suffix to table names
+    table_suffix = "_xgb_prediction" + str(resource_suffix) # suffix to table names
     ingested_table = "ingested_data" + table_suffix
     monitoring_alert_email_addresses = []
     monitoring_skew_config = {"defaultSkewThreshold": {"value": 0.001}}
5 changes: 4 additions & 1 deletion pipelines/src/pipelines/xgboost/training/pipeline.py
@@ -39,6 +39,7 @@ def xgboost_pipeline(
     timestamp: str = "2022-12-01 00:00:00",
     staging_bucket: str = os.environ.get("VERTEX_PIPELINE_ROOT"),
     pipeline_files_gcs_path: str = os.environ.get("PIPELINE_FILES_GCS_PATH"),
+    resource_suffix: str = os.environ.get("RESOURCE_SUFFIX"),
     test_dataset_uri: str = "",
 ):
     """
@@ -63,6 +64,8 @@
             If any time part is missing, it will be regarded as zero.
         staging_bucket (str): Staging bucket for pipeline artifacts.
         pipeline_files_gcs_path (str): GCS path where the pipeline files are located.
+        resource_suffix (str): Optional. Suffix appended to resource names to
+            avoid overwriting them when pipelines run concurrently.
         test_dataset_uri (str): Optional. GCS URI of static held-out test dataset.
     """
 
@@ -71,7 +74,7 @@
     label_column_name = "total_fare"
     time_column = "trip_start_timestamp"
     ingestion_table = "taxi_trips"
-    table_suffix = "_xgb_training" # suffix to table names
+    table_suffix = "_xgb_training" + str(resource_suffix) # suffix to table names
     ingested_table = "ingested_data" + table_suffix
     preprocessed_table = "preprocessed_data" + table_suffix
     train_table = "train_data" + table_suffix
