From 542d9542008a932a6de27babd21c02c9cf7cd48c Mon Sep 17 00:00:00 2001 From: Felix Schaumann Date: Thu, 11 May 2023 08:30:16 +0200 Subject: [PATCH 01/10] Fix missing model monitoring in XGB train script --- .../tensorflow/prediction/pipeline.py | 16 +++++---- .../training/assets/train_tf_model.py | 13 ++++--- .../pipelines/tensorflow/training/pipeline.py | 1 + .../pipelines/xgboost/prediction/pipeline.py | 16 +++++---- .../training/assets/train_xgb_model.py | 35 ++++++++++++++----- .../pipelines/xgboost/training/pipeline.py | 1 + 6 files changed, 57 insertions(+), 25 deletions(-) diff --git a/pipelines/src/pipelines/tensorflow/prediction/pipeline.py b/pipelines/src/pipelines/tensorflow/prediction/pipeline.py index 5121aea3..b011b0c1 100644 --- a/pipelines/src/pipelines/tensorflow/prediction/pipeline.py +++ b/pipelines/src/pipelines/tensorflow/prediction/pipeline.py @@ -108,12 +108,16 @@ def tensorflow_pipeline( ).set_display_name("Ingest data") # lookup champion model - champion_model = lookup_model( - model_name=model_name, - project_location=project_location, - project_id=project_id, - fail_on_model_not_found=True, - ).set_display_name("Look up champion model") + champion_model = ( + lookup_model( + model_name=model_name, + project_location=project_location, + project_id=project_id, + fail_on_model_not_found=True, + ) + .set_display_name("Look up champion model") + .set_caching_options(False) + ) # batch predict from BigQuery to BigQuery bigquery_source_input_uri = f"bq://{project_id}.{dataset_id}.{ingested_table}" diff --git a/pipelines/src/pipelines/tensorflow/training/assets/train_tf_model.py b/pipelines/src/pipelines/tensorflow/training/assets/train_tf_model.py index c89a7880..5d164ae7 100644 --- a/pipelines/src/pipelines/tensorflow/training/assets/train_tf_model.py +++ b/pipelines/src/pipelines/tensorflow/training/assets/train_tf_model.py @@ -219,6 +219,9 @@ def _get_temp_dir(dirpath, task_id): parser.add_argument("--hparams", default={}, type=json.loads) args = parser.parse_args() +if args.model.startswith("gs://"): + args.model = Path("/gcs/" + args.model[5:]) + # merge dictionaries by overwriting default_model_params if provided in model_params hparams = {**DEFAULT_HPARAMS, **args.hparams} logging.info(f"Using model hyper-parameters: {hparams}") @@ -261,9 +264,9 @@ def _get_temp_dir(dirpath, task_id): logging.info("not chief node, exiting now") sys.exit() -os.makedirs(args.model, exist_ok=True) logging.info(f"Save model to: {args.model}") -tf_model.save(args.model, save_format="tf") +args.model.mkdir(parents=True) +tf_model.save(str(args.model), save_format="tf") logging.info(f"Save metrics to: {args.metrics}") eval_metrics = dict(zip(tf_model.metrics_names, tf_model.evaluate(test_ds))) @@ -281,11 +284,13 @@ def _get_temp_dir(dirpath, task_id): json.dump(metrics, fp) # Persist URIs of training file(s) for model monitoring in batch predictions -path = Path(args.model) / TRAINING_DATASET_INFO +# See https://cloud.google.com/python/docs/reference/aiplatform/latest/google.cloud.aiplatform_v1beta1.types.ModelMonitoringObjectiveConfig.TrainingDataset # noqa: E501 +# for the expected schema. 
+path = args.model / TRAINING_DATASET_INFO training_dataset_for_monitoring = { "gcsSource": {"uris": [args.train_data]}, "dataFormat": "csv", - "targetField": hparams["label"], + "targetField": label, } logging.info(f"Save training dataset info for model monitoring: {path}") logging.info(f"Training dataset: {training_dataset_for_monitoring}") diff --git a/pipelines/src/pipelines/tensorflow/training/pipeline.py b/pipelines/src/pipelines/tensorflow/training/pipeline.py index cb99e091..2767d210 100644 --- a/pipelines/src/pipelines/tensorflow/training/pipeline.py +++ b/pipelines/src/pipelines/tensorflow/training/pipeline.py @@ -215,6 +215,7 @@ def tensorflow_pipeline( fail_on_model_not_found=False, ) .set_display_name("Lookup past model") + .set_caching_options(False) .outputs["model_resource_name"] ) diff --git a/pipelines/src/pipelines/xgboost/prediction/pipeline.py b/pipelines/src/pipelines/xgboost/prediction/pipeline.py index 0bd1ec91..ed474eb8 100644 --- a/pipelines/src/pipelines/xgboost/prediction/pipeline.py +++ b/pipelines/src/pipelines/xgboost/prediction/pipeline.py @@ -102,12 +102,16 @@ def xgboost_pipeline( ).set_display_name("Ingest data") # lookup champion model - champion_model = lookup_model( - model_name=model_name, - project_location=project_location, - project_id=project_id, - fail_on_model_not_found=True, - ).set_display_name("Look up champion model") + champion_model = ( + lookup_model( + model_name=model_name, + project_location=project_location, + project_id=project_id, + fail_on_model_not_found=True, + ) + .set_display_name("Look up champion model") + .set_caching_options(False) + ) # batch predict from BigQuery to BigQuery bigquery_source_input_uri = f"bq://{project_id}.{dataset_id}.{ingested_table}" diff --git a/pipelines/src/pipelines/xgboost/training/assets/train_xgb_model.py b/pipelines/src/pipelines/xgboost/training/assets/train_xgb_model.py index 31d95247..71cc65b5 100644 --- a/pipelines/src/pipelines/xgboost/training/assets/train_xgb_model.py +++ b/pipelines/src/pipelines/xgboost/training/assets/train_xgb_model.py @@ -1,4 +1,6 @@ import argparse +from pathlib import Path + import joblib import json import os @@ -14,7 +16,9 @@ logging.basicConfig(level=logging.DEBUG) - +# used for monitoring during prediction time +TRAINING_DATASET_INFO = "training_dataset.json" +# numeric/categorical features in Chicago trips dataset to be preprocessed NUM_COLS = ["dayofweek", "hourofday", "trip_distance", "trip_miles", "trip_seconds"] ORD_COLS = ["company"] OHE_COLS = ["payment_type"] @@ -39,6 +43,9 @@ def indices_in_list(elements: list, base_list: list) -> list: parser.add_argument("--hparams", default={}, type=json.loads) args = parser.parse_args() +if args.model.startswith("gs://"): + args.model = Path("/gcs/" + args.model[5:]) + logging.info("Read csv files into dataframes") df_train = pd.read_csv(args.train_data) df_valid = pd.read_csv(args.valid_data) @@ -111,15 +118,25 @@ def indices_in_list(elements: list, base_list: list) -> list: "rootMeanSquaredLogError": np.sqrt(metrics.mean_squared_log_error(y_test, y_pred)), } -try: - model_path = args.model.replace("gs://", "/gcs/") - logging.info(f"Save model to: {model_path}") - os.makedirs(model_path, exist_ok=True) - joblib.dump(pipeline, model_path + "model.joblib") -except Exception as e: - print(e) - raise e +logging.info(f"Save model to: {args.model}") +args.model.mkdir(parents=True) +joblib.dump(pipeline, str(args.model / "model.joblib")) logging.info(f"Metrics: {metrics}") with open(args.metrics, "w") as fp: 
json.dump(metrics, fp) + +# Persist URIs of training file(s) for model monitoring in batch predictions +# See https://cloud.google.com/python/docs/reference/aiplatform/latest/google.cloud.aiplatform_v1beta1.types.ModelMonitoringObjectiveConfig.TrainingDataset # noqa: E501 +# for the expected schema. +path = args.model / TRAINING_DATASET_INFO +training_dataset_for_monitoring = { + "gcsSource": {"uris": [args.train_data]}, + "dataFormat": "csv", + "targetField": label, +} +logging.info(f"Training dataset info: {training_dataset_for_monitoring}") + +with open(path, "w") as fp: + logging.info(f"Save training dataset info for model monitoring: {path}") + json.dump(training_dataset_for_monitoring, fp) diff --git a/pipelines/src/pipelines/xgboost/training/pipeline.py b/pipelines/src/pipelines/xgboost/training/pipeline.py index 1db2023c..27cb47ed 100644 --- a/pipelines/src/pipelines/xgboost/training/pipeline.py +++ b/pipelines/src/pipelines/xgboost/training/pipeline.py @@ -212,6 +212,7 @@ def xgboost_pipeline( fail_on_model_not_found=False, ) .set_display_name("Lookup past model") + .set_caching_options(False) .outputs["model_resource_name"] ) From 22f6dcd3c863764356e2e801f97c0bc05876deff Mon Sep 17 00:00:00 2001 From: ariadnafer Date: Fri, 12 May 2023 14:52:56 +0200 Subject: [PATCH 02/10] feat: enable multiquery, update queries and pipelines --- .../bigquery_components/bq_query_to_table.py | 20 +-------- .../tensorflow/prediction/pipeline.py | 30 +++++++------ .../tensorflow/prediction/queries/ingest.sql | 13 +++++- .../pipelines/tensorflow/training/pipeline.py | 45 +++++++------------ .../tensorflow/training/queries/ingest.sql | 29 ++++++++---- .../tensorflow/training/queries/sample.sql | 24 +++------- .../pipelines/xgboost/prediction/pipeline.py | 30 ++++++++----- .../xgboost/prediction/queries/ingest.sql | 9 ++++ .../pipelines/xgboost/training/pipeline.py | 44 +++++++----------- .../xgboost/training/queries/ingest.sql | 27 +++++++---- .../xgboost/training/queries/sample.sql | 12 +++-- 11 files changed, 145 insertions(+), 138 deletions(-) diff --git a/components/bigquery-components/src/bigquery_components/bq_query_to_table.py b/components/bigquery-components/src/bigquery_components/bq_query_to_table.py index 9530bd92..4b6afc29 100644 --- a/components/bigquery-components/src/bigquery_components/bq_query_to_table.py +++ b/components/bigquery-components/src/bigquery_components/bq_query_to_table.py @@ -22,25 +22,15 @@ def bq_query_to_table( query: str, bq_client_project_id: str, - destination_project_id: str, - dataset_id: str = None, - table_id: str = None, dataset_location: str = "EU", - query_job_config: dict = None, ) -> None: """ Run query & create a new BigQuery table Args: query (str): SQL query to execute, results are saved in a BigQuery table bq_client_project_id (str): project id that will be used by the bq client - destination_project_id (str): project id where BQ table will be created - dataset_id (str): dataset id where BQ table will be created - table_id (str): table name (without project id and dataset id) dataset_location (str): bq dataset location - query_job_config (dict): dict containing optional parameters required by the bq query operation. 
No need to specify destination param - See available parameters here - https://googleapis.dev/python/bigquery/latest/generated/google.cloud.bigquery.job.QueryJobConfig.html Returns: None """ @@ -50,13 +40,7 @@ def bq_query_to_table( logging.getLogger().setLevel(logging.INFO) - if (dataset_id is not None) and (table_id is not None): - dest_table_ref = f"{destination_project_id}.{dataset_id}.{table_id}" - else: - dest_table_ref = None - if query_job_config is None: - query_job_config = {} - job_config = bigquery.QueryJobConfig(destination=dest_table_ref, **query_job_config) + job_config = bigquery.QueryJobConfig() bq_client = bigquery.client.Client( project=bq_client_project_id, location=dataset_location @@ -65,7 +49,7 @@ def bq_query_to_table( try: result = query_job.result() - logging.info(f"BQ table {dest_table_ref} created") + logging.info(f"BQ Job finished") except GoogleCloudError as e: logging.error(e) logging.error(query_job.error_result) diff --git a/pipelines/src/pipelines/tensorflow/prediction/pipeline.py b/pipelines/src/pipelines/tensorflow/prediction/pipeline.py index b011b0c1..3a9fdb02 100644 --- a/pipelines/src/pipelines/tensorflow/prediction/pipeline.py +++ b/pipelines/src/pipelines/tensorflow/prediction/pipeline.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import json import os import pathlib @@ -29,9 +28,10 @@ def tensorflow_pipeline( project_location: str = os.environ.get("VERTEX_LOCATION"), ingestion_project_id: str = os.environ.get("VERTEX_PROJECT_ID"), model_name: str = "simple_tensorflow", - dataset_id: str = "preprocessing", + preprocessing_dataset_id: str = "preprocessing", dataset_location: str = os.environ.get("VERTEX_LOCATION"), ingestion_dataset_id: str = "chicago_taxi_trips", + prediction_dataset_id: str = "prediction", timestamp: str = "2022-12-01 00:00:00", batch_prediction_machine_type: str = "n1-standard-4", batch_prediction_min_replicas: int = 3, @@ -49,13 +49,14 @@ def tensorflow_pipeline( Args: project_id (str): project id of the Google Cloud project project_location (str): location of the Google Cloud project - pipeline_files_gcs_path (str): GCS path where the pipeline files are located ingestion_project_id (str): project id containing the source bigquery data for ingestion. This can be the same as `project_id` if the source data is in the same project where the ML pipeline is executed. model_name (str): name of model - model_label (str): label of model - dataset_id (str): id of BQ dataset used to store all staging data & predictions + preprocessing_dataset_id (str): id of BQ dataset used to + store all staging data . + prediction_dataset_id (str): id of BQ dataset used to + store all predictions. dataset_location (str): location of dataset ingestion_dataset_id (str): dataset id of ingestion data timestamp (str): Optional. 
Empty or a specific timestamp in ISO 8601 format @@ -91,6 +92,10 @@ def tensorflow_pipeline( queries_folder / "ingest.sql", source_dataset=f"{ingestion_project_id}.{ingestion_dataset_id}", source_table=ingestion_table, + prediction_dataset=f"{ingestion_project_id}.{prediction_dataset_id}", + preprocessing_dataset=f"{ingestion_project_id}.{preprocessing_dataset_id}", + ingested_table=ingested_table, + dataset_region=project_location, filter_column=time_column, filter_start_value=timestamp, ) @@ -98,14 +103,11 @@ def tensorflow_pipeline( # data ingestion and preprocessing operations kwargs = dict( bq_client_project_id=project_id, - destination_project_id=project_id, - dataset_id=dataset_id, dataset_location=dataset_location, - query_job_config=json.dumps(dict(write_disposition="WRITE_TRUNCATE")), ) - ingest = bq_query_to_table( - query=ingest_query, table_id=ingested_table, **kwargs - ).set_display_name("Ingest data") + ingest = bq_query_to_table(query=ingest_query, **kwargs).set_display_name( + "Ingest data" + ) # lookup champion model champion_model = ( @@ -120,8 +122,10 @@ def tensorflow_pipeline( ) # batch predict from BigQuery to BigQuery - bigquery_source_input_uri = f"bq://{project_id}.{dataset_id}.{ingested_table}" - bigquery_destination_output_uri = f"bq://{project_id}.{dataset_id}" + bigquery_source_input_uri = ( + f"bq://{project_id}.{preprocessing_dataset_id}.{ingested_table}" + ) + bigquery_destination_output_uri = f"bq://{project_id}.{prediction_dataset_id}" instance_config = {"instanceType": "object"} # predict data diff --git a/pipelines/src/pipelines/tensorflow/prediction/queries/ingest.sql b/pipelines/src/pipelines/tensorflow/prediction/queries/ingest.sql index e01a7c57..5a44d671 100644 --- a/pipelines/src/pipelines/tensorflow/prediction/queries/ingest.sql +++ b/pipelines/src/pipelines/tensorflow/prediction/queries/ingest.sql @@ -14,8 +14,16 @@ -- Treat "filter_start_value" as the current time, unless it is empty then use CURRENT_DATETIME() instead -- This allows us to set the filter_start_value to a specific time for testing or for backfill +CREATE SCHEMA IF NOT EXISTS `{{ prediction_dataset }}` + OPTIONS ( + description = 'Prediction Dataset', + location = "{{ dataset_region }}"); + +DROP TABLE IF EXISTS `{{ preprocessing_dataset }}.{{ ingested_table }}`; + +CREATE TABLE `{{ preprocessing_dataset }}.{{ ingested_table }}` AS ( with filter_start_values as ( - SELECT + SELECT IF("{{ filter_start_value }}" = '', CURRENT_DATETIME(), CAST("{{ filter_start_value }}" AS DATETIME)) as filter_start_value ) -- Ingest data between 2 and 3 months ago @@ -43,7 +51,7 @@ SELECT trip_miles, CAST( CASE WHEN trip_seconds is NULL then m.avg_trip_seconds WHEN trip_seconds <= 0 then m.avg_trip_seconds - ELSE trip_seconds + ELSE trip_seconds END AS FLOAT64) AS trip_seconds, payment_type, company, @@ -54,3 +62,4 @@ WHERE "pickup_latitude", "dropoff_longitude", "dropoff_latitude","payment_type","company"] %} AND `{{ field }}` IS NOT NULL {% endfor %} + ); diff --git a/pipelines/src/pipelines/tensorflow/training/pipeline.py b/pipelines/src/pipelines/tensorflow/training/pipeline.py index 2767d210..56aea64c 100644 --- a/pipelines/src/pipelines/tensorflow/training/pipeline.py +++ b/pipelines/src/pipelines/tensorflow/training/pipeline.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import json import os import pathlib @@ -101,6 +100,9 @@ def tensorflow_pipeline( queries_folder / "ingest.sql", source_dataset=f"{ingestion_project_id}.{ingestion_dataset_id}", source_table=ingestion_table, + preprocessing_dataset=f"{ingestion_project_id}.{dataset_id}", + ingested_table=ingested_table, + dataset_region=project_location, filter_column=time_column, target_column=label_column_name, filter_start_value=timestamp, @@ -109,6 +111,8 @@ def tensorflow_pipeline( queries_folder / "sample.sql", source_dataset=dataset_id, source_table=ingested_table, + preprocessing_dataset=f"{ingestion_project_id}.{dataset_id}", + target_table=train_table, num_lots=10, lots=tuple(range(8)), ) @@ -116,6 +120,8 @@ def tensorflow_pipeline( queries_folder / "sample.sql", source_dataset=dataset_id, source_table=ingested_table, + preprocessing_dataset=f"{ingestion_project_id}.{dataset_id}", + target_table=valid_table, num_lots=10, lots="(8)", ) @@ -123,51 +129,34 @@ def tensorflow_pipeline( queries_folder / "sample.sql", source_dataset=dataset_id, source_table=ingested_table, + preprocessing_dataset=f"{ingestion_project_id}.{dataset_id}", + target_table=test_table, num_lots=10, lots="(9)", ) - data_cleaning_query = generate_query( - queries_folder / "engineer_features.sql", - source_dataset=dataset_id, - source_table=train_table, - ) # data ingestion and preprocessing operations - kwargs = dict( - bq_client_project_id=project_id, - destination_project_id=project_id, - dataset_id=dataset_id, - dataset_location=dataset_location, - query_job_config=json.dumps(dict(write_disposition="WRITE_TRUNCATE")), + kwargs = dict(bq_client_project_id=project_id, dataset_location=dataset_location) + ingest = bq_query_to_table(query=ingest_query, **kwargs).set_display_name( + "Ingest data" ) - ingest = bq_query_to_table( - query=ingest_query, table_id=ingested_table, **kwargs - ).set_display_name("Ingest data") - # exporting data to GCS from BQ split_train_data = ( - bq_query_to_table(query=split_train_query, table_id=train_table, **kwargs) + bq_query_to_table(query=split_train_query, **kwargs) .after(ingest) .set_display_name("Split train data") ) split_valid_data = ( - bq_query_to_table(query=split_valid_query, table_id=valid_table, **kwargs) + bq_query_to_table(query=split_valid_query, **kwargs) .after(ingest) .set_display_name("Split validation data") ) split_test_data = ( - bq_query_to_table(query=split_test_query, table_id=test_table, **kwargs) + bq_query_to_table(query=split_test_query, **kwargs) .after(ingest) .set_display_name("Split test data") ) - data_cleaning = ( - bq_query_to_table( - query=data_cleaning_query, table_id=preprocessed_table, **kwargs - ) - .after(split_train_data) - .set_display_name("Clean data") - ) # data extraction to gcs @@ -176,10 +165,10 @@ def tensorflow_pipeline( bq_client_project_id=project_id, source_project_id=project_id, dataset_id=dataset_id, - table_name=preprocessed_table, + table_name=train_table, dataset_location=dataset_location, ) - .after(data_cleaning) + .after(split_train_data) .set_display_name("Extract train data to storage") ).outputs["dataset"] valid_dataset = ( diff --git a/pipelines/src/pipelines/tensorflow/training/queries/ingest.sql b/pipelines/src/pipelines/tensorflow/training/queries/ingest.sql index de9459fc..7f9f3b0b 100644 --- a/pipelines/src/pipelines/tensorflow/training/queries/ingest.sql +++ b/pipelines/src/pipelines/tensorflow/training/queries/ingest.sql @@ -14,26 +14,36 @@ -- Treat "filter_start_value" as the current time, unless it is empty then use 
CURRENT_DATETIME() instead -- This allows us to set the filter_start_value to a specific time for testing or for backfill -with filter_start_values as ( - SELECT - IF("{{ filter_start_value }}" = '', CURRENT_DATETIME(), CAST("{{ filter_start_value }}" AS DATETIME)) as filter_start_value + +CREATE SCHEMA IF NOT EXISTS `{{ preprocessing_dataset }}` + OPTIONS ( + description = 'Preprocessing Dataset', + location = "{{ dataset_region }}"); + +DROP TABLE IF EXISTS `{{ preprocessing_dataset }}.{{ ingested_table }}`; + +CREATE TABLE `{{ preprocessing_dataset }}.{{ ingested_table }}` AS ( +WITH filter_start_values AS ( +SELECT + IF("{{ filter_start_value }}" = '', + CURRENT_DATETIME(), + CAST("{{ filter_start_value }}" AS DATETIME)) AS filter_start_value ) -- Ingest data between 2 and 3 months ago -,filtered_data as ( +,filtered_data AS ( SELECT * FROM `{{ source_dataset }}.{{ source_table }}`, filter_start_values WHERE DATE({{ filter_column }}) BETWEEN - DATE_SUB(DATE(CAST(filter_start_values.filter_start_value as DATETIME)), INTERVAL 3 MONTH) AND + DATE_SUB(DATE(CAST(filter_start_values.filter_start_value AS DATETIME)), INTERVAL 3 MONTH) AND DATE_SUB(DATE(filter_start_value), INTERVAL 2 MONTH) ) -- Use the average trip_seconds as a replacement for NULL or 0 values -,mean_time as ( +,mean_time AS ( SELECT CAST(avg(trip_seconds) AS INT64) as avg_trip_seconds FROM filtered_data ) - SELECT CAST(EXTRACT(DAYOFWEEK FROM trip_start_timestamp) AS FLOAT64) AS dayofweek, CAST(EXTRACT(HOUR FROM trip_start_timestamp) AS FLOAT64) AS hourofday, @@ -43,15 +53,16 @@ SELECT trip_miles, CAST( CASE WHEN trip_seconds is NULL then m.avg_trip_seconds WHEN trip_seconds <= 0 then m.avg_trip_seconds - ELSE trip_seconds + ELSE trip_seconds END AS FLOAT64) AS trip_seconds, payment_type, company, (fare + tips + tolls + extras) AS `{{ target_column }}`, -FROM filtered_data as t, mean_time as m +FROM filtered_data AS t, mean_time AS m WHERE trip_miles > 0 AND fare > 0 AND fare < 1500 {% for field in ["fare", "trip_start_timestamp", "pickup_longitude", "pickup_latitude", "dropoff_longitude", "dropoff_latitude","payment_type","company"] %} AND `{{ field }}` IS NOT NULL {% endfor %} +); diff --git a/pipelines/src/pipelines/tensorflow/training/queries/sample.sql b/pipelines/src/pipelines/tensorflow/training/queries/sample.sql index bf47bbf6..be8458be 100644 --- a/pipelines/src/pipelines/tensorflow/training/queries/sample.sql +++ b/pipelines/src/pipelines/tensorflow/training/queries/sample.sql @@ -1,20 +1,10 @@ --- Copyright 2022 Google LLC +DROP TABLE IF EXISTS `{{ preprocessing_dataset }}.{{ target_table }}`; --- Licensed under the Apache License, Version 2.0 (the "License"); --- you may not use this file except in compliance with the License. --- You may obtain a copy of the License at - --- https://www.apache.org/licenses/LICENSE-2.0 - --- Unless required by applicable law or agreed to in writing, software --- distributed under the License is distributed on an "AS IS" BASIS, --- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. --- See the License for the specific language governing permissions and --- limitations under the License. 
- -SELECT * +CREATE TABLE `{{ preprocessing_dataset }}.{{ target_table }}` AS ( +SELECT + * FROM - `{{ source_dataset }}.{{ source_table }}` AS t + `{{ source_dataset }}.{{ source_table }}` AS t WHERE - MOD(ABS(FARM_FINGERPRINT(TO_JSON_STRING(t))), - {{ num_lots }}) IN {{ lots }} + MOD(ABS(FARM_FINGERPRINT(TO_JSON_STRING(t))), + {{ num_lots }}) IN {{ lots }}); diff --git a/pipelines/src/pipelines/xgboost/prediction/pipeline.py b/pipelines/src/pipelines/xgboost/prediction/pipeline.py index ed474eb8..0c4504ec 100644 --- a/pipelines/src/pipelines/xgboost/prediction/pipeline.py +++ b/pipelines/src/pipelines/xgboost/prediction/pipeline.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import json import os import pathlib @@ -29,9 +28,10 @@ def xgboost_pipeline( project_location: str = os.environ.get("VERTEX_LOCATION"), ingestion_project_id: str = os.environ.get("VERTEX_PROJECT_ID"), model_name: str = "simple_xgboost", - dataset_id: str = "preprocessing", + preprocessing_dataset_id: str = "preprocessing", dataset_location: str = os.environ.get("VERTEX_LOCATION"), ingestion_dataset_id: str = "chicago_taxi_trips", + prediction_dataset_id: str = "prediction", timestamp: str = "2022-12-01 00:00:00", batch_prediction_machine_type: str = "n1-standard-4", batch_prediction_min_replicas: int = 3, @@ -50,14 +50,17 @@ def xgboost_pipeline( for ingestion. This can be the same as `project_id` if the source data is in the same project where the ML pipeline is executed. model_name (str): name of model - dataset_id (str): id of BQ dataset used to store all staging data & predictions + preprocessing_dataset_id (str): id of BQ dataset used to + store all staging data . + prediction_dataset_id (str): id of BQ dataset used to + store all predictions. dataset_location (str): location of dataset ingestion_dataset_id (str): dataset id of ingestion data timestamp (str): Optional. Empty or a specific timestamp in ISO 8601 format (YYYY-MM-DDThh:mm:ss.sss±hh:mm or YYYY-MM-DDThh:mm:ss). If any time part is missing, it will be regarded as zero. batch_prediction_machine_type (str): Machine type to be used for Vertex Batch - Prediction. Example machine_types - n1-standard-4, n1-standard-16 etc + Prediction. Example machine_types - n1-standard-4, n1-standard-16 etc. 
batch_prediction_min_replicas (int): Minimum no of machines to distribute the Vertex Batch Prediction job for horizontal scalability batch_prediction_max_replicas (int): Maximum no of machines to distribute the @@ -85,6 +88,10 @@ def xgboost_pipeline( queries_folder / "ingest.sql", source_dataset=f"{ingestion_project_id}.{ingestion_dataset_id}", source_table=ingestion_table, + prediction_dataset=f"{ingestion_project_id}.{prediction_dataset_id}", + preprocessing_dataset=f"{ingestion_project_id}.{preprocessing_dataset_id}", + ingested_table=ingested_table, + dataset_region=project_location, filter_column=time_column, filter_start_value=timestamp, ) @@ -92,14 +99,11 @@ def xgboost_pipeline( # data ingestion and preprocessing operations kwargs = dict( bq_client_project_id=project_id, - destination_project_id=project_id, - dataset_id=dataset_id, dataset_location=dataset_location, - query_job_config=json.dumps(dict(write_disposition="WRITE_TRUNCATE")), ) - ingest = bq_query_to_table( - query=ingest_query, table_id=ingested_table, **kwargs - ).set_display_name("Ingest data") + ingest = bq_query_to_table(query=ingest_query, **kwargs).set_display_name( + "Ingest data" + ) # lookup champion model champion_model = ( @@ -114,8 +118,10 @@ def xgboost_pipeline( ) # batch predict from BigQuery to BigQuery - bigquery_source_input_uri = f"bq://{project_id}.{dataset_id}.{ingested_table}" - bigquery_destination_output_uri = f"bq://{project_id}.{dataset_id}" + bigquery_source_input_uri = ( + f"bq://{project_id}.{preprocessing_dataset_id}.{ingested_table}" + ) + bigquery_destination_output_uri = f"bq://{project_id}.{prediction_dataset_id}" batch_prediction = ( model_batch_predict( diff --git a/pipelines/src/pipelines/xgboost/prediction/queries/ingest.sql b/pipelines/src/pipelines/xgboost/prediction/queries/ingest.sql index e01a7c57..bf8fe57a 100644 --- a/pipelines/src/pipelines/xgboost/prediction/queries/ingest.sql +++ b/pipelines/src/pipelines/xgboost/prediction/queries/ingest.sql @@ -14,6 +14,14 @@ -- Treat "filter_start_value" as the current time, unless it is empty then use CURRENT_DATETIME() instead -- This allows us to set the filter_start_value to a specific time for testing or for backfill +CREATE SCHEMA IF NOT EXISTS `{{ prediction_dataset }}` + OPTIONS ( + description = 'Prediction Dataset', + location = "{{ dataset_region }}"); + +DROP TABLE IF EXISTS `{{ preprocessing_dataset }}.{{ ingested_table }}`; + +CREATE TABLE `{{ preprocessing_dataset }}.{{ ingested_table }}` AS ( with filter_start_values as ( SELECT IF("{{ filter_start_value }}" = '', CURRENT_DATETIME(), CAST("{{ filter_start_value }}" AS DATETIME)) as filter_start_value @@ -54,3 +62,4 @@ WHERE "pickup_latitude", "dropoff_longitude", "dropoff_latitude","payment_type","company"] %} AND `{{ field }}` IS NOT NULL {% endfor %} + ); diff --git a/pipelines/src/pipelines/xgboost/training/pipeline.py b/pipelines/src/pipelines/xgboost/training/pipeline.py index 27cb47ed..6c9ad9d9 100644 --- a/pipelines/src/pipelines/xgboost/training/pipeline.py +++ b/pipelines/src/pipelines/xgboost/training/pipeline.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import json import os import pathlib @@ -99,6 +98,9 @@ def xgboost_pipeline( queries_folder / "ingest.sql", source_dataset=f"{ingestion_project_id}.{ingestion_dataset_id}", source_table=ingestion_table, + preprocessing_dataset=f"{ingestion_project_id}.{dataset_id}", + ingested_table=ingested_table, + dataset_region=project_location, filter_column=time_column, target_column=label_column_name, filter_start_value=timestamp, @@ -107,6 +109,8 @@ def xgboost_pipeline( queries_folder / "sample.sql", source_dataset=dataset_id, source_table=ingested_table, + preprocessing_dataset=f"{ingestion_project_id}.{dataset_id}", + target_table=train_table, num_lots=10, lots=tuple(range(8)), ) @@ -114,57 +118,43 @@ def xgboost_pipeline( queries_folder / "sample.sql", source_dataset=dataset_id, source_table=ingested_table, + preprocessing_dataset=f"{ingestion_project_id}.{dataset_id}", + target_table=valid_table, num_lots=10, lots="(8)", ) - data_cleaning_query = generate_query( - queries_folder / "engineer_features.sql", - source_dataset=dataset_id, - source_table=train_table, - ) split_test_query = generate_query( queries_folder / "sample.sql", source_dataset=dataset_id, source_table=ingested_table, + preprocessing_dataset=f"{ingestion_project_id}.{dataset_id}", + target_table=test_table, num_lots=10, lots="(9)", ) # data ingestion and preprocessing operations - kwargs = dict( - bq_client_project_id=project_id, - destination_project_id=project_id, - dataset_id=dataset_id, - dataset_location=dataset_location, - query_job_config=json.dumps(dict(write_disposition="WRITE_TRUNCATE")), + kwargs = dict(bq_client_project_id=project_id, dataset_location=dataset_location) + ingest = bq_query_to_table(query=ingest_query, **kwargs).set_display_name( + "Ingest data" ) - ingest = bq_query_to_table( - query=ingest_query, table_id=ingested_table, **kwargs - ).set_display_name("Ingest data") split_train_data = ( - bq_query_to_table(query=split_train_query, table_id=train_table, **kwargs) + bq_query_to_table(query=split_train_query, **kwargs) .after(ingest) .set_display_name("Split train data") ) split_valid_data = ( - bq_query_to_table(query=split_valid_query, table_id=valid_table, **kwargs) + bq_query_to_table(query=split_valid_query, **kwargs) .after(ingest) .set_display_name("Split validation data") ) split_test_data = ( - bq_query_to_table(query=split_test_query, table_id=test_table, **kwargs) + bq_query_to_table(query=split_test_query, **kwargs) .after(ingest) .set_display_name("Split test data") ) - data_cleaning = ( - bq_query_to_table( - query=data_cleaning_query, table_id=preprocessed_table, **kwargs - ) - .after(split_train_data) - .set_display_name("Clean data") - ) # data extraction to gcs @@ -173,10 +163,10 @@ def xgboost_pipeline( bq_client_project_id=project_id, source_project_id=project_id, dataset_id=dataset_id, - table_name=preprocessed_table, + table_name=train_table, dataset_location=dataset_location, ) - .after(data_cleaning) + .after(split_train_data) .set_display_name("Extract train data to storage") ).outputs["dataset"] valid_dataset = ( diff --git a/pipelines/src/pipelines/xgboost/training/queries/ingest.sql b/pipelines/src/pipelines/xgboost/training/queries/ingest.sql index de9459fc..fbb6df32 100644 --- a/pipelines/src/pipelines/xgboost/training/queries/ingest.sql +++ b/pipelines/src/pipelines/xgboost/training/queries/ingest.sql @@ -14,26 +14,36 @@ -- Treat "filter_start_value" as the current time, unless it is empty then use CURRENT_DATETIME() instead -- This allows us to set the 
filter_start_value to a specific time for testing or for backfill -with filter_start_values as ( - SELECT - IF("{{ filter_start_value }}" = '', CURRENT_DATETIME(), CAST("{{ filter_start_value }}" AS DATETIME)) as filter_start_value + +CREATE SCHEMA IF NOT EXISTS `{{ preprocessing_dataset }}` + OPTIONS ( + description = 'Preprocessing Dataset', + location = "{{ dataset_region }}"); + +DROP TABLE IF EXISTS `{{ preprocessing_dataset }}.{{ ingested_table }}`; + +CREATE TABLE `{{ preprocessing_dataset }}.{{ ingested_table }}` AS ( +WITH filter_start_values AS ( +SELECT + IF("{{ filter_start_value }}" = '', + CURRENT_DATETIME(), + CAST("{{ filter_start_value }}" AS DATETIME)) AS filter_start_value ) -- Ingest data between 2 and 3 months ago -,filtered_data as ( +,filtered_data AS ( SELECT * FROM `{{ source_dataset }}.{{ source_table }}`, filter_start_values WHERE DATE({{ filter_column }}) BETWEEN - DATE_SUB(DATE(CAST(filter_start_values.filter_start_value as DATETIME)), INTERVAL 3 MONTH) AND + DATE_SUB(DATE(CAST(filter_start_values.filter_start_value AS DATETIME)), INTERVAL 3 MONTH) AND DATE_SUB(DATE(filter_start_value), INTERVAL 2 MONTH) ) -- Use the average trip_seconds as a replacement for NULL or 0 values -,mean_time as ( +,mean_time AS ( SELECT CAST(avg(trip_seconds) AS INT64) as avg_trip_seconds FROM filtered_data ) - SELECT CAST(EXTRACT(DAYOFWEEK FROM trip_start_timestamp) AS FLOAT64) AS dayofweek, CAST(EXTRACT(HOUR FROM trip_start_timestamp) AS FLOAT64) AS hourofday, @@ -48,10 +58,11 @@ SELECT payment_type, company, (fare + tips + tolls + extras) AS `{{ target_column }}`, -FROM filtered_data as t, mean_time as m +FROM filtered_data AS t, mean_time AS m WHERE trip_miles > 0 AND fare > 0 AND fare < 1500 {% for field in ["fare", "trip_start_timestamp", "pickup_longitude", "pickup_latitude", "dropoff_longitude", "dropoff_latitude","payment_type","company"] %} AND `{{ field }}` IS NOT NULL {% endfor %} +); diff --git a/pipelines/src/pipelines/xgboost/training/queries/sample.sql b/pipelines/src/pipelines/xgboost/training/queries/sample.sql index bb2dc9ce..be8458be 100644 --- a/pipelines/src/pipelines/xgboost/training/queries/sample.sql +++ b/pipelines/src/pipelines/xgboost/training/queries/sample.sql @@ -1,6 +1,10 @@ -SELECT * +DROP TABLE IF EXISTS `{{ preprocessing_dataset }}.{{ target_table }}`; + +CREATE TABLE `{{ preprocessing_dataset }}.{{ target_table }}` AS ( +SELECT + * FROM - `{{ source_dataset }}.{{ source_table }}` AS t + `{{ source_dataset }}.{{ source_table }}` AS t WHERE - MOD(ABS(FARM_FINGERPRINT(TO_JSON_STRING(t))), - {{ num_lots }}) IN {{ lots }} + MOD(ABS(FARM_FINGERPRINT(TO_JSON_STRING(t))), + {{ num_lots }}) IN {{ lots }}); From 0b8b77f8758d4d64e4b5e1fb4754acc495598c42 Mon Sep 17 00:00:00 2001 From: ariadnafer Date: Tue, 16 May 2023 13:53:18 +0200 Subject: [PATCH 03/10] refactor: replace custom BQ component with BigqueryQueryJobOp --- .../src/bigquery_components/__init__.py | 2 - .../bigquery_components/bq_query_to_table.py | 57 ---------------- pipelines/src/pipelines/__init__.py | 8 ++- .../tensorflow/prediction/pipeline.py | 12 ++-- .../pipelines/tensorflow/training/pipeline.py | 66 ++++--------------- .../training/queries/engineer_features.sql | 18 ----- .../tensorflow/training/queries/ingest.sql | 50 +++++++++----- .../tensorflow/training/queries/sample.sql | 10 --- .../pipelines/xgboost/prediction/pipeline.py | 13 ++-- .../pipelines/xgboost/training/pipeline.py | 66 ++++--------------- .../training/queries/engineer_features.sql | 4 -- 
.../xgboost/training/queries/ingest.sql | 50 +++++++++----- .../xgboost/training/queries/sample.sql | 10 --- 13 files changed, 107 insertions(+), 259 deletions(-) delete mode 100644 components/bigquery-components/src/bigquery_components/bq_query_to_table.py delete mode 100644 pipelines/src/pipelines/tensorflow/training/queries/engineer_features.sql delete mode 100644 pipelines/src/pipelines/tensorflow/training/queries/sample.sql delete mode 100644 pipelines/src/pipelines/xgboost/training/queries/engineer_features.sql delete mode 100644 pipelines/src/pipelines/xgboost/training/queries/sample.sql diff --git a/components/bigquery-components/src/bigquery_components/__init__.py b/components/bigquery-components/src/bigquery_components/__init__.py index 2e3870ac..e979abab 100644 --- a/components/bigquery-components/src/bigquery_components/__init__.py +++ b/components/bigquery-components/src/bigquery_components/__init__.py @@ -1,9 +1,7 @@ -from .bq_query_to_table import bq_query_to_table from .extract_bq_to_dataset import extract_bq_to_dataset __version__ = "0.0.1" __all__ = [ - "bq_query_to_table", "extract_bq_to_dataset", ] diff --git a/components/bigquery-components/src/bigquery_components/bq_query_to_table.py b/components/bigquery-components/src/bigquery_components/bq_query_to_table.py deleted file mode 100644 index 4b6afc29..00000000 --- a/components/bigquery-components/src/bigquery_components/bq_query_to_table.py +++ /dev/null @@ -1,57 +0,0 @@ -# Copyright 2022 Google LLC - -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at - -# https://www.apache.org/licenses/LICENSE-2.0 - -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from kfp.v2.dsl import component - - -@component( - base_image="python:3.7", - packages_to_install=["google-cloud-bigquery==2.30.0"], -) -def bq_query_to_table( - query: str, - bq_client_project_id: str, - dataset_location: str = "EU", -) -> None: - """ - Run query & create a new BigQuery table - Args: - query (str): SQL query to execute, results are saved in a BigQuery table - bq_client_project_id (str): project id that will be used by the bq client - dataset_location (str): bq dataset location - required by the bq query operation. 
No need to specify destination param - Returns: - None - """ - from google.cloud.exceptions import GoogleCloudError - from google.cloud import bigquery - import logging - - logging.getLogger().setLevel(logging.INFO) - - job_config = bigquery.QueryJobConfig() - - bq_client = bigquery.client.Client( - project=bq_client_project_id, location=dataset_location - ) - query_job = bq_client.query(query, job_config=job_config) - - try: - result = query_job.result() - logging.info(f"BQ Job finished") - except GoogleCloudError as e: - logging.error(e) - logging.error(query_job.error_result) - logging.error(query_job.errors) - raise e diff --git a/pipelines/src/pipelines/__init__.py b/pipelines/src/pipelines/__init__.py index 113da917..badd535e 100644 --- a/pipelines/src/pipelines/__init__.py +++ b/pipelines/src/pipelines/__init__.py @@ -30,4 +30,10 @@ def generate_query(input_file: Path, **replacements) -> str: with open(input_file, "r") as f: query_template = f.read() - return Template(query_template).render(**replacements) + # Render the template with the provided replacements + query = Template(query_template).render(**replacements) + + # Escape double quotes, newline and tab characters + query = query.replace('"', '\\"').replace("\n", "\\n").replace("\t", "\\t") + + return query diff --git a/pipelines/src/pipelines/tensorflow/prediction/pipeline.py b/pipelines/src/pipelines/tensorflow/prediction/pipeline.py index 3a9fdb02..60ad3b47 100644 --- a/pipelines/src/pipelines/tensorflow/prediction/pipeline.py +++ b/pipelines/src/pipelines/tensorflow/prediction/pipeline.py @@ -15,10 +15,10 @@ import os import pathlib +from google_cloud_pipeline_components.v1.bigquery import BigqueryQueryJobOp from kfp.v2 import compiler, dsl from pipelines import generate_query -from bigquery_components import bq_query_to_table from vertex_components import lookup_model, model_batch_predict @@ -101,13 +101,9 @@ def tensorflow_pipeline( ) # data ingestion and preprocessing operations - kwargs = dict( - bq_client_project_id=project_id, - dataset_location=dataset_location, - ) - ingest = bq_query_to_table(query=ingest_query, **kwargs).set_display_name( - "Ingest data" - ) + ingest = BigqueryQueryJobOp( + project=project_id, location=dataset_location, query=ingest_query + ).set_display_name("Ingest data") # lookup champion model champion_model = ( diff --git a/pipelines/src/pipelines/tensorflow/training/pipeline.py b/pipelines/src/pipelines/tensorflow/training/pipeline.py index 56aea64c..ba98bea1 100644 --- a/pipelines/src/pipelines/tensorflow/training/pipeline.py +++ b/pipelines/src/pipelines/tensorflow/training/pipeline.py @@ -15,9 +15,10 @@ import os import pathlib +from google_cloud_pipeline_components.v1.bigquery import BigqueryQueryJobOp from kfp.v2 import compiler, dsl from pipelines import generate_query -from bigquery_components import bq_query_to_table, extract_bq_to_dataset +from bigquery_components import extract_bq_to_dataset from vertex_components import ( lookup_model, custom_train_job, @@ -106,57 +107,14 @@ def tensorflow_pipeline( filter_column=time_column, target_column=label_column_name, filter_start_value=timestamp, + train_table=train_table, + validation_table=valid_table, + test_table=test_table, ) - split_train_query = generate_query( - queries_folder / "sample.sql", - source_dataset=dataset_id, - source_table=ingested_table, - preprocessing_dataset=f"{ingestion_project_id}.{dataset_id}", - target_table=train_table, - num_lots=10, - lots=tuple(range(8)), - ) - split_valid_query = generate_query( - 
queries_folder / "sample.sql", - source_dataset=dataset_id, - source_table=ingested_table, - preprocessing_dataset=f"{ingestion_project_id}.{dataset_id}", - target_table=valid_table, - num_lots=10, - lots="(8)", - ) - split_test_query = generate_query( - queries_folder / "sample.sql", - source_dataset=dataset_id, - source_table=ingested_table, - preprocessing_dataset=f"{ingestion_project_id}.{dataset_id}", - target_table=test_table, - num_lots=10, - lots="(9)", - ) - - # data ingestion and preprocessing operations - kwargs = dict(bq_client_project_id=project_id, dataset_location=dataset_location) - ingest = bq_query_to_table(query=ingest_query, **kwargs).set_display_name( - "Ingest data" - ) - - split_train_data = ( - bq_query_to_table(query=split_train_query, **kwargs) - .after(ingest) - .set_display_name("Split train data") - ) - split_valid_data = ( - bq_query_to_table(query=split_valid_query, **kwargs) - .after(ingest) - .set_display_name("Split validation data") - ) - split_test_data = ( - bq_query_to_table(query=split_test_query, **kwargs) - .after(ingest) - .set_display_name("Split test data") - ) + ingest = BigqueryQueryJobOp( + project=project_id, location=dataset_location, query=ingest_query + ).set_display_name("Ingest data") # data extraction to gcs @@ -168,8 +126,9 @@ def tensorflow_pipeline( table_name=train_table, dataset_location=dataset_location, ) - .after(split_train_data) + .after(ingest) .set_display_name("Extract train data to storage") + .set_caching_options(False) ).outputs["dataset"] valid_dataset = ( extract_bq_to_dataset( @@ -179,8 +138,9 @@ def tensorflow_pipeline( table_name=valid_table, dataset_location=dataset_location, ) - .after(split_valid_data) + .after(ingest) .set_display_name("Extract validation data to storage") + .set_caching_options(False) ).outputs["dataset"] test_dataset = ( extract_bq_to_dataset( @@ -191,7 +151,7 @@ def tensorflow_pipeline( dataset_location=dataset_location, destination_gcs_uri=test_dataset_uri, ) - .after(split_test_data) + .after(ingest) .set_display_name("Extract test data to storage") .set_caching_options(False) ).outputs["dataset"] diff --git a/pipelines/src/pipelines/tensorflow/training/queries/engineer_features.sql b/pipelines/src/pipelines/tensorflow/training/queries/engineer_features.sql deleted file mode 100644 index b8094a35..00000000 --- a/pipelines/src/pipelines/tensorflow/training/queries/engineer_features.sql +++ /dev/null @@ -1,18 +0,0 @@ --- Copyright 2022 Google LLC - --- Licensed under the Apache License, Version 2.0 (the "License"); --- you may not use this file except in compliance with the License. --- You may obtain a copy of the License at - --- https://www.apache.org/licenses/LICENSE-2.0 - --- Unless required by applicable law or agreed to in writing, software --- distributed under the License is distributed on an "AS IS" BASIS, --- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. --- See the License for the specific language governing permissions and --- limitations under the License. 
- -/* The Purpose of this query is to clean the data before the training*/ -SELECT - * -FROM `{{ source_dataset }}.{{ source_table }}` diff --git a/pipelines/src/pipelines/tensorflow/training/queries/ingest.sql b/pipelines/src/pipelines/tensorflow/training/queries/ingest.sql index 7f9f3b0b..b3a31606 100644 --- a/pipelines/src/pipelines/tensorflow/training/queries/ingest.sql +++ b/pipelines/src/pipelines/tensorflow/training/queries/ingest.sql @@ -1,20 +1,3 @@ --- Copyright 2022 Google LLC - --- Licensed under the Apache License, Version 2.0 (the "License"); --- you may not use this file except in compliance with the License. --- You may obtain a copy of the License at - --- https://www.apache.org/licenses/LICENSE-2.0 - --- Unless required by applicable law or agreed to in writing, software --- distributed under the License is distributed on an "AS IS" BASIS, --- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. --- See the License for the specific language governing permissions and --- limitations under the License. - --- Treat "filter_start_value" as the current time, unless it is empty then use CURRENT_DATETIME() instead --- This allows us to set the filter_start_value to a specific time for testing or for backfill - CREATE SCHEMA IF NOT EXISTS `{{ preprocessing_dataset }}` OPTIONS ( description = 'Preprocessing Dataset', @@ -66,3 +49,36 @@ WHERE AND `{{ field }}` IS NOT NULL {% endfor %} ); + +DROP TABLE IF EXISTS `{{ preprocessing_dataset }}.{{ train_table }}`; + +CREATE TABLE `{{ preprocessing_dataset }}.{{ train_table }}` AS ( +SELECT + * +FROM + `{{ preprocessing_dataset }}.{{ ingested_table }}` AS t +WHERE + MOD(ABS(FARM_FINGERPRINT(TO_JSON_STRING(t))), + 10) IN (0, 1, 2, 3, 4, 5, 6, 7)); + +DROP TABLE IF EXISTS `{{ preprocessing_dataset }}.{{ validation_table }}`; + +CREATE TABLE `{{ preprocessing_dataset }}.{{ validation_table }}` AS ( +SELECT + * +FROM + `{{ preprocessing_dataset }}.{{ ingested_table }}` AS t +WHERE + MOD(ABS(FARM_FINGERPRINT(TO_JSON_STRING(t))), + 10) IN (8)); + +DROP TABLE IF EXISTS `{{ preprocessing_dataset }}.{{ test_table }}`; + +CREATE TABLE `{{ preprocessing_dataset }}.{{ test_table }}` AS ( +SELECT + * +FROM + `{{ preprocessing_dataset }}.{{ ingested_table }}` AS t +WHERE + MOD(ABS(FARM_FINGERPRINT(TO_JSON_STRING(t))), + 10) IN (9)); diff --git a/pipelines/src/pipelines/tensorflow/training/queries/sample.sql b/pipelines/src/pipelines/tensorflow/training/queries/sample.sql deleted file mode 100644 index be8458be..00000000 --- a/pipelines/src/pipelines/tensorflow/training/queries/sample.sql +++ /dev/null @@ -1,10 +0,0 @@ -DROP TABLE IF EXISTS `{{ preprocessing_dataset }}.{{ target_table }}`; - -CREATE TABLE `{{ preprocessing_dataset }}.{{ target_table }}` AS ( -SELECT - * -FROM - `{{ source_dataset }}.{{ source_table }}` AS t -WHERE - MOD(ABS(FARM_FINGERPRINT(TO_JSON_STRING(t))), - {{ num_lots }}) IN {{ lots }}); diff --git a/pipelines/src/pipelines/xgboost/prediction/pipeline.py b/pipelines/src/pipelines/xgboost/prediction/pipeline.py index 0c4504ec..7fdea71c 100644 --- a/pipelines/src/pipelines/xgboost/prediction/pipeline.py +++ b/pipelines/src/pipelines/xgboost/prediction/pipeline.py @@ -15,10 +15,10 @@ import os import pathlib +from google_cloud_pipeline_components.v1.bigquery import BigqueryQueryJobOp from kfp.v2 import compiler, dsl from pipelines import generate_query -from bigquery_components import bq_query_to_table from vertex_components import lookup_model, model_batch_predict @@ -96,14 +96,9 @@ def xgboost_pipeline( 
filter_start_value=timestamp, ) - # data ingestion and preprocessing operations - kwargs = dict( - bq_client_project_id=project_id, - dataset_location=dataset_location, - ) - ingest = bq_query_to_table(query=ingest_query, **kwargs).set_display_name( - "Ingest data" - ) + ingest = BigqueryQueryJobOp( + project=project_id, location=dataset_location, query=ingest_query + ).set_display_name("Ingest data") # lookup champion model champion_model = ( diff --git a/pipelines/src/pipelines/xgboost/training/pipeline.py b/pipelines/src/pipelines/xgboost/training/pipeline.py index 6c9ad9d9..385e41ab 100644 --- a/pipelines/src/pipelines/xgboost/training/pipeline.py +++ b/pipelines/src/pipelines/xgboost/training/pipeline.py @@ -15,9 +15,10 @@ import os import pathlib +from google_cloud_pipeline_components.v1.bigquery import BigqueryQueryJobOp from kfp.v2 import compiler, dsl from pipelines import generate_query -from bigquery_components import bq_query_to_table, extract_bq_to_dataset +from bigquery_components import extract_bq_to_dataset from vertex_components import ( lookup_model, custom_train_job, @@ -104,57 +105,14 @@ def xgboost_pipeline( filter_column=time_column, target_column=label_column_name, filter_start_value=timestamp, + train_table=train_table, + validation_table=valid_table, + test_table=test_table, ) - split_train_query = generate_query( - queries_folder / "sample.sql", - source_dataset=dataset_id, - source_table=ingested_table, - preprocessing_dataset=f"{ingestion_project_id}.{dataset_id}", - target_table=train_table, - num_lots=10, - lots=tuple(range(8)), - ) - split_valid_query = generate_query( - queries_folder / "sample.sql", - source_dataset=dataset_id, - source_table=ingested_table, - preprocessing_dataset=f"{ingestion_project_id}.{dataset_id}", - target_table=valid_table, - num_lots=10, - lots="(8)", - ) - split_test_query = generate_query( - queries_folder / "sample.sql", - source_dataset=dataset_id, - source_table=ingested_table, - preprocessing_dataset=f"{ingestion_project_id}.{dataset_id}", - target_table=test_table, - num_lots=10, - lots="(9)", - ) - - # data ingestion and preprocessing operations - kwargs = dict(bq_client_project_id=project_id, dataset_location=dataset_location) - ingest = bq_query_to_table(query=ingest_query, **kwargs).set_display_name( - "Ingest data" - ) - - split_train_data = ( - bq_query_to_table(query=split_train_query, **kwargs) - .after(ingest) - .set_display_name("Split train data") - ) - split_valid_data = ( - bq_query_to_table(query=split_valid_query, **kwargs) - .after(ingest) - .set_display_name("Split validation data") - ) - split_test_data = ( - bq_query_to_table(query=split_test_query, **kwargs) - .after(ingest) - .set_display_name("Split test data") - ) + ingest = BigqueryQueryJobOp( + project=project_id, location=dataset_location, query=ingest_query + ).set_display_name("Ingest data") # data extraction to gcs @@ -166,8 +124,9 @@ def xgboost_pipeline( table_name=train_table, dataset_location=dataset_location, ) - .after(split_train_data) + .after(ingest) .set_display_name("Extract train data to storage") + .set_caching_options(False) ).outputs["dataset"] valid_dataset = ( extract_bq_to_dataset( @@ -177,8 +136,9 @@ def xgboost_pipeline( table_name=valid_table, dataset_location=dataset_location, ) - .after(split_valid_data) + .after(ingest) .set_display_name("Extract validation data to storage") + .set_caching_options(False) ).outputs["dataset"] test_dataset = ( extract_bq_to_dataset( @@ -189,7 +149,7 @@ def xgboost_pipeline( 
dataset_location=dataset_location, destination_gcs_uri=test_dataset_uri, ) - .after(split_test_data) + .after(ingest) .set_display_name("Extract test data to storage") .set_caching_options(False) ).outputs["dataset"] diff --git a/pipelines/src/pipelines/xgboost/training/queries/engineer_features.sql b/pipelines/src/pipelines/xgboost/training/queries/engineer_features.sql deleted file mode 100644 index 148d9ce7..00000000 --- a/pipelines/src/pipelines/xgboost/training/queries/engineer_features.sql +++ /dev/null @@ -1,4 +0,0 @@ -/* The Purpose of this query is to clean the data before the training*/ -SELECT - * -FROM `{{ source_dataset }}.{{ source_table }}` diff --git a/pipelines/src/pipelines/xgboost/training/queries/ingest.sql b/pipelines/src/pipelines/xgboost/training/queries/ingest.sql index fbb6df32..10ecf037 100644 --- a/pipelines/src/pipelines/xgboost/training/queries/ingest.sql +++ b/pipelines/src/pipelines/xgboost/training/queries/ingest.sql @@ -1,20 +1,3 @@ --- Copyright 2022 Google LLC - --- Licensed under the Apache License, Version 2.0 (the "License"); --- you may not use this file except in compliance with the License. --- You may obtain a copy of the License at - --- https://www.apache.org/licenses/LICENSE-2.0 - --- Unless required by applicable law or agreed to in writing, software --- distributed under the License is distributed on an "AS IS" BASIS, --- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. --- See the License for the specific language governing permissions and --- limitations under the License. - --- Treat "filter_start_value" as the current time, unless it is empty then use CURRENT_DATETIME() instead --- This allows us to set the filter_start_value to a specific time for testing or for backfill - CREATE SCHEMA IF NOT EXISTS `{{ preprocessing_dataset }}` OPTIONS ( description = 'Preprocessing Dataset', @@ -66,3 +49,36 @@ WHERE AND `{{ field }}` IS NOT NULL {% endfor %} ); + +DROP TABLE IF EXISTS `{{ preprocessing_dataset }}.{{ train_table }}`; + +CREATE TABLE `{{ preprocessing_dataset }}.{{ train_table }}` AS ( +SELECT + * +FROM + `{{ preprocessing_dataset }}.{{ ingested_table }}` AS t +WHERE + MOD(ABS(FARM_FINGERPRINT(TO_JSON_STRING(t))), + 10) IN (0, 1, 2, 3, 4, 5, 6, 7)); + +DROP TABLE IF EXISTS `{{ preprocessing_dataset }}.{{ validation_table }}`; + +CREATE TABLE `{{ preprocessing_dataset }}.{{ validation_table }}` AS ( +SELECT + * +FROM + `{{ preprocessing_dataset }}.{{ ingested_table }}` AS t +WHERE + MOD(ABS(FARM_FINGERPRINT(TO_JSON_STRING(t))), + 10) IN (8)); + +DROP TABLE IF EXISTS `{{ preprocessing_dataset }}.{{ test_table }}`; + +CREATE TABLE `{{ preprocessing_dataset }}.{{ test_table }}` AS ( +SELECT + * +FROM + `{{ preprocessing_dataset }}.{{ ingested_table }}` AS t +WHERE + MOD(ABS(FARM_FINGERPRINT(TO_JSON_STRING(t))), + 10) IN (9)); diff --git a/pipelines/src/pipelines/xgboost/training/queries/sample.sql b/pipelines/src/pipelines/xgboost/training/queries/sample.sql deleted file mode 100644 index be8458be..00000000 --- a/pipelines/src/pipelines/xgboost/training/queries/sample.sql +++ /dev/null @@ -1,10 +0,0 @@ -DROP TABLE IF EXISTS `{{ preprocessing_dataset }}.{{ target_table }}`; - -CREATE TABLE `{{ preprocessing_dataset }}.{{ target_table }}` AS ( -SELECT - * -FROM - `{{ source_dataset }}.{{ source_table }}` AS t -WHERE - MOD(ABS(FARM_FINGERPRINT(TO_JSON_STRING(t))), - {{ num_lots }}) IN {{ lots }}); From 2d62d9f743216dc967df03df02a704bea8ecf03e Mon Sep 17 00:00:00 2001 From: ariadnafer Date: Fri, 12 May 2023 14:52:56 +0200 
Subject: [PATCH 04/10] feat: enable multiquery, update queries and pipelines --- .../bigquery_components/bq_query_to_table.py | 20 +-------- .../tensorflow/prediction/pipeline.py | 30 +++++++------ .../tensorflow/prediction/queries/ingest.sql | 13 +++++- .../pipelines/tensorflow/training/pipeline.py | 45 +++++++------------ .../tensorflow/training/queries/ingest.sql | 29 ++++++++---- .../tensorflow/training/queries/sample.sql | 24 +++------- .../pipelines/xgboost/prediction/pipeline.py | 30 ++++++++----- .../xgboost/prediction/queries/ingest.sql | 9 ++++ .../pipelines/xgboost/training/pipeline.py | 44 +++++++----------- .../xgboost/training/queries/ingest.sql | 27 +++++++---- .../xgboost/training/queries/sample.sql | 12 +++-- 11 files changed, 145 insertions(+), 138 deletions(-) diff --git a/components/bigquery-components/src/bigquery_components/bq_query_to_table.py b/components/bigquery-components/src/bigquery_components/bq_query_to_table.py index 9530bd92..4b6afc29 100644 --- a/components/bigquery-components/src/bigquery_components/bq_query_to_table.py +++ b/components/bigquery-components/src/bigquery_components/bq_query_to_table.py @@ -22,25 +22,15 @@ def bq_query_to_table( query: str, bq_client_project_id: str, - destination_project_id: str, - dataset_id: str = None, - table_id: str = None, dataset_location: str = "EU", - query_job_config: dict = None, ) -> None: """ Run query & create a new BigQuery table Args: query (str): SQL query to execute, results are saved in a BigQuery table bq_client_project_id (str): project id that will be used by the bq client - destination_project_id (str): project id where BQ table will be created - dataset_id (str): dataset id where BQ table will be created - table_id (str): table name (without project id and dataset id) dataset_location (str): bq dataset location - query_job_config (dict): dict containing optional parameters required by the bq query operation. No need to specify destination param - See available parameters here - https://googleapis.dev/python/bigquery/latest/generated/google.cloud.bigquery.job.QueryJobConfig.html Returns: None """ @@ -50,13 +40,7 @@ def bq_query_to_table( logging.getLogger().setLevel(logging.INFO) - if (dataset_id is not None) and (table_id is not None): - dest_table_ref = f"{destination_project_id}.{dataset_id}.{table_id}" - else: - dest_table_ref = None - if query_job_config is None: - query_job_config = {} - job_config = bigquery.QueryJobConfig(destination=dest_table_ref, **query_job_config) + job_config = bigquery.QueryJobConfig() bq_client = bigquery.client.Client( project=bq_client_project_id, location=dataset_location @@ -65,7 +49,7 @@ def bq_query_to_table( try: result = query_job.result() - logging.info(f"BQ table {dest_table_ref} created") + logging.info(f"BQ Job finished") except GoogleCloudError as e: logging.error(e) logging.error(query_job.error_result) diff --git a/pipelines/src/pipelines/tensorflow/prediction/pipeline.py b/pipelines/src/pipelines/tensorflow/prediction/pipeline.py index b011b0c1..3a9fdb02 100644 --- a/pipelines/src/pipelines/tensorflow/prediction/pipeline.py +++ b/pipelines/src/pipelines/tensorflow/prediction/pipeline.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import json import os import pathlib @@ -29,9 +28,10 @@ def tensorflow_pipeline( project_location: str = os.environ.get("VERTEX_LOCATION"), ingestion_project_id: str = os.environ.get("VERTEX_PROJECT_ID"), model_name: str = "simple_tensorflow", - dataset_id: str = "preprocessing", + preprocessing_dataset_id: str = "preprocessing", dataset_location: str = os.environ.get("VERTEX_LOCATION"), ingestion_dataset_id: str = "chicago_taxi_trips", + prediction_dataset_id: str = "prediction", timestamp: str = "2022-12-01 00:00:00", batch_prediction_machine_type: str = "n1-standard-4", batch_prediction_min_replicas: int = 3, @@ -49,13 +49,14 @@ def tensorflow_pipeline( Args: project_id (str): project id of the Google Cloud project project_location (str): location of the Google Cloud project - pipeline_files_gcs_path (str): GCS path where the pipeline files are located ingestion_project_id (str): project id containing the source bigquery data for ingestion. This can be the same as `project_id` if the source data is in the same project where the ML pipeline is executed. model_name (str): name of model - model_label (str): label of model - dataset_id (str): id of BQ dataset used to store all staging data & predictions + preprocessing_dataset_id (str): id of BQ dataset used to + store all staging data . + prediction_dataset_id (str): id of BQ dataset used to + store all predictions. dataset_location (str): location of dataset ingestion_dataset_id (str): dataset id of ingestion data timestamp (str): Optional. Empty or a specific timestamp in ISO 8601 format @@ -91,6 +92,10 @@ def tensorflow_pipeline( queries_folder / "ingest.sql", source_dataset=f"{ingestion_project_id}.{ingestion_dataset_id}", source_table=ingestion_table, + prediction_dataset=f"{ingestion_project_id}.{prediction_dataset_id}", + preprocessing_dataset=f"{ingestion_project_id}.{preprocessing_dataset_id}", + ingested_table=ingested_table, + dataset_region=project_location, filter_column=time_column, filter_start_value=timestamp, ) @@ -98,14 +103,11 @@ def tensorflow_pipeline( # data ingestion and preprocessing operations kwargs = dict( bq_client_project_id=project_id, - destination_project_id=project_id, - dataset_id=dataset_id, dataset_location=dataset_location, - query_job_config=json.dumps(dict(write_disposition="WRITE_TRUNCATE")), ) - ingest = bq_query_to_table( - query=ingest_query, table_id=ingested_table, **kwargs - ).set_display_name("Ingest data") + ingest = bq_query_to_table(query=ingest_query, **kwargs).set_display_name( + "Ingest data" + ) # lookup champion model champion_model = ( @@ -120,8 +122,10 @@ def tensorflow_pipeline( ) # batch predict from BigQuery to BigQuery - bigquery_source_input_uri = f"bq://{project_id}.{dataset_id}.{ingested_table}" - bigquery_destination_output_uri = f"bq://{project_id}.{dataset_id}" + bigquery_source_input_uri = ( + f"bq://{project_id}.{preprocessing_dataset_id}.{ingested_table}" + ) + bigquery_destination_output_uri = f"bq://{project_id}.{prediction_dataset_id}" instance_config = {"instanceType": "object"} # predict data diff --git a/pipelines/src/pipelines/tensorflow/prediction/queries/ingest.sql b/pipelines/src/pipelines/tensorflow/prediction/queries/ingest.sql index e01a7c57..5a44d671 100644 --- a/pipelines/src/pipelines/tensorflow/prediction/queries/ingest.sql +++ b/pipelines/src/pipelines/tensorflow/prediction/queries/ingest.sql @@ -14,8 +14,16 @@ -- Treat "filter_start_value" as the current time, unless it is empty then use CURRENT_DATETIME() instead -- This allows us to set the 
filter_start_value to a specific time for testing or for backfill +CREATE SCHEMA IF NOT EXISTS `{{ prediction_dataset }}` + OPTIONS ( + description = 'Prediction Dataset', + location = "{{ dataset_region }}"); + +DROP TABLE IF EXISTS `{{ preprocessing_dataset }}.{{ ingested_table }}`; + +CREATE TABLE `{{ preprocessing_dataset }}.{{ ingested_table }}` AS ( with filter_start_values as ( - SELECT + SELECT IF("{{ filter_start_value }}" = '', CURRENT_DATETIME(), CAST("{{ filter_start_value }}" AS DATETIME)) as filter_start_value ) -- Ingest data between 2 and 3 months ago @@ -43,7 +51,7 @@ SELECT trip_miles, CAST( CASE WHEN trip_seconds is NULL then m.avg_trip_seconds WHEN trip_seconds <= 0 then m.avg_trip_seconds - ELSE trip_seconds + ELSE trip_seconds END AS FLOAT64) AS trip_seconds, payment_type, company, @@ -54,3 +62,4 @@ WHERE "pickup_latitude", "dropoff_longitude", "dropoff_latitude","payment_type","company"] %} AND `{{ field }}` IS NOT NULL {% endfor %} + ); diff --git a/pipelines/src/pipelines/tensorflow/training/pipeline.py b/pipelines/src/pipelines/tensorflow/training/pipeline.py index 2767d210..56aea64c 100644 --- a/pipelines/src/pipelines/tensorflow/training/pipeline.py +++ b/pipelines/src/pipelines/tensorflow/training/pipeline.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import json import os import pathlib @@ -101,6 +100,9 @@ def tensorflow_pipeline( queries_folder / "ingest.sql", source_dataset=f"{ingestion_project_id}.{ingestion_dataset_id}", source_table=ingestion_table, + preprocessing_dataset=f"{ingestion_project_id}.{dataset_id}", + ingested_table=ingested_table, + dataset_region=project_location, filter_column=time_column, target_column=label_column_name, filter_start_value=timestamp, @@ -109,6 +111,8 @@ def tensorflow_pipeline( queries_folder / "sample.sql", source_dataset=dataset_id, source_table=ingested_table, + preprocessing_dataset=f"{ingestion_project_id}.{dataset_id}", + target_table=train_table, num_lots=10, lots=tuple(range(8)), ) @@ -116,6 +120,8 @@ def tensorflow_pipeline( queries_folder / "sample.sql", source_dataset=dataset_id, source_table=ingested_table, + preprocessing_dataset=f"{ingestion_project_id}.{dataset_id}", + target_table=valid_table, num_lots=10, lots="(8)", ) @@ -123,51 +129,34 @@ def tensorflow_pipeline( queries_folder / "sample.sql", source_dataset=dataset_id, source_table=ingested_table, + preprocessing_dataset=f"{ingestion_project_id}.{dataset_id}", + target_table=test_table, num_lots=10, lots="(9)", ) - data_cleaning_query = generate_query( - queries_folder / "engineer_features.sql", - source_dataset=dataset_id, - source_table=train_table, - ) # data ingestion and preprocessing operations - kwargs = dict( - bq_client_project_id=project_id, - destination_project_id=project_id, - dataset_id=dataset_id, - dataset_location=dataset_location, - query_job_config=json.dumps(dict(write_disposition="WRITE_TRUNCATE")), + kwargs = dict(bq_client_project_id=project_id, dataset_location=dataset_location) + ingest = bq_query_to_table(query=ingest_query, **kwargs).set_display_name( + "Ingest data" ) - ingest = bq_query_to_table( - query=ingest_query, table_id=ingested_table, **kwargs - ).set_display_name("Ingest data") - # exporting data to GCS from BQ split_train_data = ( - bq_query_to_table(query=split_train_query, table_id=train_table, **kwargs) + bq_query_to_table(query=split_train_query, **kwargs) .after(ingest) .set_display_name("Split train data") ) split_valid_data = ( - 
bq_query_to_table(query=split_valid_query, table_id=valid_table, **kwargs) + bq_query_to_table(query=split_valid_query, **kwargs) .after(ingest) .set_display_name("Split validation data") ) split_test_data = ( - bq_query_to_table(query=split_test_query, table_id=test_table, **kwargs) + bq_query_to_table(query=split_test_query, **kwargs) .after(ingest) .set_display_name("Split test data") ) - data_cleaning = ( - bq_query_to_table( - query=data_cleaning_query, table_id=preprocessed_table, **kwargs - ) - .after(split_train_data) - .set_display_name("Clean data") - ) # data extraction to gcs @@ -176,10 +165,10 @@ def tensorflow_pipeline( bq_client_project_id=project_id, source_project_id=project_id, dataset_id=dataset_id, - table_name=preprocessed_table, + table_name=train_table, dataset_location=dataset_location, ) - .after(data_cleaning) + .after(split_train_data) .set_display_name("Extract train data to storage") ).outputs["dataset"] valid_dataset = ( diff --git a/pipelines/src/pipelines/tensorflow/training/queries/ingest.sql b/pipelines/src/pipelines/tensorflow/training/queries/ingest.sql index de9459fc..7f9f3b0b 100644 --- a/pipelines/src/pipelines/tensorflow/training/queries/ingest.sql +++ b/pipelines/src/pipelines/tensorflow/training/queries/ingest.sql @@ -14,26 +14,36 @@ -- Treat "filter_start_value" as the current time, unless it is empty then use CURRENT_DATETIME() instead -- This allows us to set the filter_start_value to a specific time for testing or for backfill -with filter_start_values as ( - SELECT - IF("{{ filter_start_value }}" = '', CURRENT_DATETIME(), CAST("{{ filter_start_value }}" AS DATETIME)) as filter_start_value + +CREATE SCHEMA IF NOT EXISTS `{{ preprocessing_dataset }}` + OPTIONS ( + description = 'Preprocessing Dataset', + location = "{{ dataset_region }}"); + +DROP TABLE IF EXISTS `{{ preprocessing_dataset }}.{{ ingested_table }}`; + +CREATE TABLE `{{ preprocessing_dataset }}.{{ ingested_table }}` AS ( +WITH filter_start_values AS ( +SELECT + IF("{{ filter_start_value }}" = '', + CURRENT_DATETIME(), + CAST("{{ filter_start_value }}" AS DATETIME)) AS filter_start_value ) -- Ingest data between 2 and 3 months ago -,filtered_data as ( +,filtered_data AS ( SELECT * FROM `{{ source_dataset }}.{{ source_table }}`, filter_start_values WHERE DATE({{ filter_column }}) BETWEEN - DATE_SUB(DATE(CAST(filter_start_values.filter_start_value as DATETIME)), INTERVAL 3 MONTH) AND + DATE_SUB(DATE(CAST(filter_start_values.filter_start_value AS DATETIME)), INTERVAL 3 MONTH) AND DATE_SUB(DATE(filter_start_value), INTERVAL 2 MONTH) ) -- Use the average trip_seconds as a replacement for NULL or 0 values -,mean_time as ( +,mean_time AS ( SELECT CAST(avg(trip_seconds) AS INT64) as avg_trip_seconds FROM filtered_data ) - SELECT CAST(EXTRACT(DAYOFWEEK FROM trip_start_timestamp) AS FLOAT64) AS dayofweek, CAST(EXTRACT(HOUR FROM trip_start_timestamp) AS FLOAT64) AS hourofday, @@ -43,15 +53,16 @@ SELECT trip_miles, CAST( CASE WHEN trip_seconds is NULL then m.avg_trip_seconds WHEN trip_seconds <= 0 then m.avg_trip_seconds - ELSE trip_seconds + ELSE trip_seconds END AS FLOAT64) AS trip_seconds, payment_type, company, (fare + tips + tolls + extras) AS `{{ target_column }}`, -FROM filtered_data as t, mean_time as m +FROM filtered_data AS t, mean_time AS m WHERE trip_miles > 0 AND fare > 0 AND fare < 1500 {% for field in ["fare", "trip_start_timestamp", "pickup_longitude", "pickup_latitude", "dropoff_longitude", "dropoff_latitude","payment_type","company"] %} AND `{{ field }}` IS NOT NULL {% endfor 
%} +); diff --git a/pipelines/src/pipelines/tensorflow/training/queries/sample.sql b/pipelines/src/pipelines/tensorflow/training/queries/sample.sql index bf47bbf6..be8458be 100644 --- a/pipelines/src/pipelines/tensorflow/training/queries/sample.sql +++ b/pipelines/src/pipelines/tensorflow/training/queries/sample.sql @@ -1,20 +1,10 @@ --- Copyright 2022 Google LLC +DROP TABLE IF EXISTS `{{ preprocessing_dataset }}.{{ target_table }}`; --- Licensed under the Apache License, Version 2.0 (the "License"); --- you may not use this file except in compliance with the License. --- You may obtain a copy of the License at - --- https://www.apache.org/licenses/LICENSE-2.0 - --- Unless required by applicable law or agreed to in writing, software --- distributed under the License is distributed on an "AS IS" BASIS, --- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. --- See the License for the specific language governing permissions and --- limitations under the License. - -SELECT * +CREATE TABLE `{{ preprocessing_dataset }}.{{ target_table }}` AS ( +SELECT + * FROM - `{{ source_dataset }}.{{ source_table }}` AS t + `{{ source_dataset }}.{{ source_table }}` AS t WHERE - MOD(ABS(FARM_FINGERPRINT(TO_JSON_STRING(t))), - {{ num_lots }}) IN {{ lots }} + MOD(ABS(FARM_FINGERPRINT(TO_JSON_STRING(t))), + {{ num_lots }}) IN {{ lots }}); diff --git a/pipelines/src/pipelines/xgboost/prediction/pipeline.py b/pipelines/src/pipelines/xgboost/prediction/pipeline.py index ed474eb8..0c4504ec 100644 --- a/pipelines/src/pipelines/xgboost/prediction/pipeline.py +++ b/pipelines/src/pipelines/xgboost/prediction/pipeline.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import json import os import pathlib @@ -29,9 +28,10 @@ def xgboost_pipeline( project_location: str = os.environ.get("VERTEX_LOCATION"), ingestion_project_id: str = os.environ.get("VERTEX_PROJECT_ID"), model_name: str = "simple_xgboost", - dataset_id: str = "preprocessing", + preprocessing_dataset_id: str = "preprocessing", dataset_location: str = os.environ.get("VERTEX_LOCATION"), ingestion_dataset_id: str = "chicago_taxi_trips", + prediction_dataset_id: str = "prediction", timestamp: str = "2022-12-01 00:00:00", batch_prediction_machine_type: str = "n1-standard-4", batch_prediction_min_replicas: int = 3, @@ -50,14 +50,17 @@ def xgboost_pipeline( for ingestion. This can be the same as `project_id` if the source data is in the same project where the ML pipeline is executed. model_name (str): name of model - dataset_id (str): id of BQ dataset used to store all staging data & predictions + preprocessing_dataset_id (str): id of BQ dataset used to + store all staging data . + prediction_dataset_id (str): id of BQ dataset used to + store all predictions. dataset_location (str): location of dataset ingestion_dataset_id (str): dataset id of ingestion data timestamp (str): Optional. Empty or a specific timestamp in ISO 8601 format (YYYY-MM-DDThh:mm:ss.sss±hh:mm or YYYY-MM-DDThh:mm:ss). If any time part is missing, it will be regarded as zero. batch_prediction_machine_type (str): Machine type to be used for Vertex Batch - Prediction. Example machine_types - n1-standard-4, n1-standard-16 etc + Prediction. Example machine_types - n1-standard-4, n1-standard-16 etc. 
batch_prediction_min_replicas (int): Minimum no of machines to distribute the Vertex Batch Prediction job for horizontal scalability batch_prediction_max_replicas (int): Maximum no of machines to distribute the @@ -85,6 +88,10 @@ def xgboost_pipeline( queries_folder / "ingest.sql", source_dataset=f"{ingestion_project_id}.{ingestion_dataset_id}", source_table=ingestion_table, + prediction_dataset=f"{ingestion_project_id}.{prediction_dataset_id}", + preprocessing_dataset=f"{ingestion_project_id}.{preprocessing_dataset_id}", + ingested_table=ingested_table, + dataset_region=project_location, filter_column=time_column, filter_start_value=timestamp, ) @@ -92,14 +99,11 @@ def xgboost_pipeline( # data ingestion and preprocessing operations kwargs = dict( bq_client_project_id=project_id, - destination_project_id=project_id, - dataset_id=dataset_id, dataset_location=dataset_location, - query_job_config=json.dumps(dict(write_disposition="WRITE_TRUNCATE")), ) - ingest = bq_query_to_table( - query=ingest_query, table_id=ingested_table, **kwargs - ).set_display_name("Ingest data") + ingest = bq_query_to_table(query=ingest_query, **kwargs).set_display_name( + "Ingest data" + ) # lookup champion model champion_model = ( @@ -114,8 +118,10 @@ def xgboost_pipeline( ) # batch predict from BigQuery to BigQuery - bigquery_source_input_uri = f"bq://{project_id}.{dataset_id}.{ingested_table}" - bigquery_destination_output_uri = f"bq://{project_id}.{dataset_id}" + bigquery_source_input_uri = ( + f"bq://{project_id}.{preprocessing_dataset_id}.{ingested_table}" + ) + bigquery_destination_output_uri = f"bq://{project_id}.{prediction_dataset_id}" batch_prediction = ( model_batch_predict( diff --git a/pipelines/src/pipelines/xgboost/prediction/queries/ingest.sql b/pipelines/src/pipelines/xgboost/prediction/queries/ingest.sql index e01a7c57..bf8fe57a 100644 --- a/pipelines/src/pipelines/xgboost/prediction/queries/ingest.sql +++ b/pipelines/src/pipelines/xgboost/prediction/queries/ingest.sql @@ -14,6 +14,14 @@ -- Treat "filter_start_value" as the current time, unless it is empty then use CURRENT_DATETIME() instead -- This allows us to set the filter_start_value to a specific time for testing or for backfill +CREATE SCHEMA IF NOT EXISTS `{{ prediction_dataset }}` + OPTIONS ( + description = 'Prediction Dataset', + location = "{{ dataset_region }}"); + +DROP TABLE IF EXISTS `{{ preprocessing_dataset }}.{{ ingested_table }}`; + +CREATE TABLE `{{ preprocessing_dataset }}.{{ ingested_table }}` AS ( with filter_start_values as ( SELECT IF("{{ filter_start_value }}" = '', CURRENT_DATETIME(), CAST("{{ filter_start_value }}" AS DATETIME)) as filter_start_value @@ -54,3 +62,4 @@ WHERE "pickup_latitude", "dropoff_longitude", "dropoff_latitude","payment_type","company"] %} AND `{{ field }}` IS NOT NULL {% endfor %} + ); diff --git a/pipelines/src/pipelines/xgboost/training/pipeline.py b/pipelines/src/pipelines/xgboost/training/pipeline.py index 27cb47ed..6c9ad9d9 100644 --- a/pipelines/src/pipelines/xgboost/training/pipeline.py +++ b/pipelines/src/pipelines/xgboost/training/pipeline.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import json import os import pathlib @@ -99,6 +98,9 @@ def xgboost_pipeline( queries_folder / "ingest.sql", source_dataset=f"{ingestion_project_id}.{ingestion_dataset_id}", source_table=ingestion_table, + preprocessing_dataset=f"{ingestion_project_id}.{dataset_id}", + ingested_table=ingested_table, + dataset_region=project_location, filter_column=time_column, target_column=label_column_name, filter_start_value=timestamp, @@ -107,6 +109,8 @@ def xgboost_pipeline( queries_folder / "sample.sql", source_dataset=dataset_id, source_table=ingested_table, + preprocessing_dataset=f"{ingestion_project_id}.{dataset_id}", + target_table=train_table, num_lots=10, lots=tuple(range(8)), ) @@ -114,57 +118,43 @@ def xgboost_pipeline( queries_folder / "sample.sql", source_dataset=dataset_id, source_table=ingested_table, + preprocessing_dataset=f"{ingestion_project_id}.{dataset_id}", + target_table=valid_table, num_lots=10, lots="(8)", ) - data_cleaning_query = generate_query( - queries_folder / "engineer_features.sql", - source_dataset=dataset_id, - source_table=train_table, - ) split_test_query = generate_query( queries_folder / "sample.sql", source_dataset=dataset_id, source_table=ingested_table, + preprocessing_dataset=f"{ingestion_project_id}.{dataset_id}", + target_table=test_table, num_lots=10, lots="(9)", ) # data ingestion and preprocessing operations - kwargs = dict( - bq_client_project_id=project_id, - destination_project_id=project_id, - dataset_id=dataset_id, - dataset_location=dataset_location, - query_job_config=json.dumps(dict(write_disposition="WRITE_TRUNCATE")), + kwargs = dict(bq_client_project_id=project_id, dataset_location=dataset_location) + ingest = bq_query_to_table(query=ingest_query, **kwargs).set_display_name( + "Ingest data" ) - ingest = bq_query_to_table( - query=ingest_query, table_id=ingested_table, **kwargs - ).set_display_name("Ingest data") split_train_data = ( - bq_query_to_table(query=split_train_query, table_id=train_table, **kwargs) + bq_query_to_table(query=split_train_query, **kwargs) .after(ingest) .set_display_name("Split train data") ) split_valid_data = ( - bq_query_to_table(query=split_valid_query, table_id=valid_table, **kwargs) + bq_query_to_table(query=split_valid_query, **kwargs) .after(ingest) .set_display_name("Split validation data") ) split_test_data = ( - bq_query_to_table(query=split_test_query, table_id=test_table, **kwargs) + bq_query_to_table(query=split_test_query, **kwargs) .after(ingest) .set_display_name("Split test data") ) - data_cleaning = ( - bq_query_to_table( - query=data_cleaning_query, table_id=preprocessed_table, **kwargs - ) - .after(split_train_data) - .set_display_name("Clean data") - ) # data extraction to gcs @@ -173,10 +163,10 @@ def xgboost_pipeline( bq_client_project_id=project_id, source_project_id=project_id, dataset_id=dataset_id, - table_name=preprocessed_table, + table_name=train_table, dataset_location=dataset_location, ) - .after(data_cleaning) + .after(split_train_data) .set_display_name("Extract train data to storage") ).outputs["dataset"] valid_dataset = ( diff --git a/pipelines/src/pipelines/xgboost/training/queries/ingest.sql b/pipelines/src/pipelines/xgboost/training/queries/ingest.sql index de9459fc..fbb6df32 100644 --- a/pipelines/src/pipelines/xgboost/training/queries/ingest.sql +++ b/pipelines/src/pipelines/xgboost/training/queries/ingest.sql @@ -14,26 +14,36 @@ -- Treat "filter_start_value" as the current time, unless it is empty then use CURRENT_DATETIME() instead -- This allows us to set the 
filter_start_value to a specific time for testing or for backfill -with filter_start_values as ( - SELECT - IF("{{ filter_start_value }}" = '', CURRENT_DATETIME(), CAST("{{ filter_start_value }}" AS DATETIME)) as filter_start_value + +CREATE SCHEMA IF NOT EXISTS `{{ preprocessing_dataset }}` + OPTIONS ( + description = 'Preprocessing Dataset', + location = "{{ dataset_region }}"); + +DROP TABLE IF EXISTS `{{ preprocessing_dataset }}.{{ ingested_table }}`; + +CREATE TABLE `{{ preprocessing_dataset }}.{{ ingested_table }}` AS ( +WITH filter_start_values AS ( +SELECT + IF("{{ filter_start_value }}" = '', + CURRENT_DATETIME(), + CAST("{{ filter_start_value }}" AS DATETIME)) AS filter_start_value ) -- Ingest data between 2 and 3 months ago -,filtered_data as ( +,filtered_data AS ( SELECT * FROM `{{ source_dataset }}.{{ source_table }}`, filter_start_values WHERE DATE({{ filter_column }}) BETWEEN - DATE_SUB(DATE(CAST(filter_start_values.filter_start_value as DATETIME)), INTERVAL 3 MONTH) AND + DATE_SUB(DATE(CAST(filter_start_values.filter_start_value AS DATETIME)), INTERVAL 3 MONTH) AND DATE_SUB(DATE(filter_start_value), INTERVAL 2 MONTH) ) -- Use the average trip_seconds as a replacement for NULL or 0 values -,mean_time as ( +,mean_time AS ( SELECT CAST(avg(trip_seconds) AS INT64) as avg_trip_seconds FROM filtered_data ) - SELECT CAST(EXTRACT(DAYOFWEEK FROM trip_start_timestamp) AS FLOAT64) AS dayofweek, CAST(EXTRACT(HOUR FROM trip_start_timestamp) AS FLOAT64) AS hourofday, @@ -48,10 +58,11 @@ SELECT payment_type, company, (fare + tips + tolls + extras) AS `{{ target_column }}`, -FROM filtered_data as t, mean_time as m +FROM filtered_data AS t, mean_time AS m WHERE trip_miles > 0 AND fare > 0 AND fare < 1500 {% for field in ["fare", "trip_start_timestamp", "pickup_longitude", "pickup_latitude", "dropoff_longitude", "dropoff_latitude","payment_type","company"] %} AND `{{ field }}` IS NOT NULL {% endfor %} +); diff --git a/pipelines/src/pipelines/xgboost/training/queries/sample.sql b/pipelines/src/pipelines/xgboost/training/queries/sample.sql index bb2dc9ce..be8458be 100644 --- a/pipelines/src/pipelines/xgboost/training/queries/sample.sql +++ b/pipelines/src/pipelines/xgboost/training/queries/sample.sql @@ -1,6 +1,10 @@ -SELECT * +DROP TABLE IF EXISTS `{{ preprocessing_dataset }}.{{ target_table }}`; + +CREATE TABLE `{{ preprocessing_dataset }}.{{ target_table }}` AS ( +SELECT + * FROM - `{{ source_dataset }}.{{ source_table }}` AS t + `{{ source_dataset }}.{{ source_table }}` AS t WHERE - MOD(ABS(FARM_FINGERPRINT(TO_JSON_STRING(t))), - {{ num_lots }}) IN {{ lots }} + MOD(ABS(FARM_FINGERPRINT(TO_JSON_STRING(t))), + {{ num_lots }}) IN {{ lots }}); From 299f834cedb6e6da3862a29f52d7abd44032c175 Mon Sep 17 00:00:00 2001 From: ariadnafer Date: Tue, 16 May 2023 13:53:18 +0200 Subject: [PATCH 05/10] refactor: replace custom BQ component with BigqueryQueryJobOp --- .../src/bigquery_components/__init__.py | 2 - .../bigquery_components/bq_query_to_table.py | 57 ---------------- pipelines/src/pipelines/__init__.py | 8 ++- .../tensorflow/prediction/pipeline.py | 12 ++-- .../pipelines/tensorflow/training/pipeline.py | 66 ++++--------------- .../training/queries/engineer_features.sql | 18 ----- .../tensorflow/training/queries/ingest.sql | 50 +++++++++----- .../tensorflow/training/queries/sample.sql | 10 --- .../pipelines/xgboost/prediction/pipeline.py | 13 ++-- .../pipelines/xgboost/training/pipeline.py | 66 ++++--------------- .../training/queries/engineer_features.sql | 4 -- 
.../xgboost/training/queries/ingest.sql | 50 +++++++++----- .../xgboost/training/queries/sample.sql | 10 --- 13 files changed, 107 insertions(+), 259 deletions(-) delete mode 100644 components/bigquery-components/src/bigquery_components/bq_query_to_table.py delete mode 100644 pipelines/src/pipelines/tensorflow/training/queries/engineer_features.sql delete mode 100644 pipelines/src/pipelines/tensorflow/training/queries/sample.sql delete mode 100644 pipelines/src/pipelines/xgboost/training/queries/engineer_features.sql delete mode 100644 pipelines/src/pipelines/xgboost/training/queries/sample.sql diff --git a/components/bigquery-components/src/bigquery_components/__init__.py b/components/bigquery-components/src/bigquery_components/__init__.py index 2e3870ac..e979abab 100644 --- a/components/bigquery-components/src/bigquery_components/__init__.py +++ b/components/bigquery-components/src/bigquery_components/__init__.py @@ -1,9 +1,7 @@ -from .bq_query_to_table import bq_query_to_table from .extract_bq_to_dataset import extract_bq_to_dataset __version__ = "0.0.1" __all__ = [ - "bq_query_to_table", "extract_bq_to_dataset", ] diff --git a/components/bigquery-components/src/bigquery_components/bq_query_to_table.py b/components/bigquery-components/src/bigquery_components/bq_query_to_table.py deleted file mode 100644 index 4b6afc29..00000000 --- a/components/bigquery-components/src/bigquery_components/bq_query_to_table.py +++ /dev/null @@ -1,57 +0,0 @@ -# Copyright 2022 Google LLC - -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at - -# https://www.apache.org/licenses/LICENSE-2.0 - -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from kfp.v2.dsl import component - - -@component( - base_image="python:3.7", - packages_to_install=["google-cloud-bigquery==2.30.0"], -) -def bq_query_to_table( - query: str, - bq_client_project_id: str, - dataset_location: str = "EU", -) -> None: - """ - Run query & create a new BigQuery table - Args: - query (str): SQL query to execute, results are saved in a BigQuery table - bq_client_project_id (str): project id that will be used by the bq client - dataset_location (str): bq dataset location - required by the bq query operation. 
No need to specify destination param - Returns: - None - """ - from google.cloud.exceptions import GoogleCloudError - from google.cloud import bigquery - import logging - - logging.getLogger().setLevel(logging.INFO) - - job_config = bigquery.QueryJobConfig() - - bq_client = bigquery.client.Client( - project=bq_client_project_id, location=dataset_location - ) - query_job = bq_client.query(query, job_config=job_config) - - try: - result = query_job.result() - logging.info(f"BQ Job finished") - except GoogleCloudError as e: - logging.error(e) - logging.error(query_job.error_result) - logging.error(query_job.errors) - raise e diff --git a/pipelines/src/pipelines/__init__.py b/pipelines/src/pipelines/__init__.py index 113da917..badd535e 100644 --- a/pipelines/src/pipelines/__init__.py +++ b/pipelines/src/pipelines/__init__.py @@ -30,4 +30,10 @@ def generate_query(input_file: Path, **replacements) -> str: with open(input_file, "r") as f: query_template = f.read() - return Template(query_template).render(**replacements) + # Render the template with the provided replacements + query = Template(query_template).render(**replacements) + + # Escape double quotes, newline and tab characters + query = query.replace('"', '\\"').replace("\n", "\\n").replace("\t", "\\t") + + return query diff --git a/pipelines/src/pipelines/tensorflow/prediction/pipeline.py b/pipelines/src/pipelines/tensorflow/prediction/pipeline.py index 3a9fdb02..60ad3b47 100644 --- a/pipelines/src/pipelines/tensorflow/prediction/pipeline.py +++ b/pipelines/src/pipelines/tensorflow/prediction/pipeline.py @@ -15,10 +15,10 @@ import os import pathlib +from google_cloud_pipeline_components.v1.bigquery import BigqueryQueryJobOp from kfp.v2 import compiler, dsl from pipelines import generate_query -from bigquery_components import bq_query_to_table from vertex_components import lookup_model, model_batch_predict @@ -101,13 +101,9 @@ def tensorflow_pipeline( ) # data ingestion and preprocessing operations - kwargs = dict( - bq_client_project_id=project_id, - dataset_location=dataset_location, - ) - ingest = bq_query_to_table(query=ingest_query, **kwargs).set_display_name( - "Ingest data" - ) + ingest = BigqueryQueryJobOp( + project=project_id, location=dataset_location, query=ingest_query + ).set_display_name("Ingest data") # lookup champion model champion_model = ( diff --git a/pipelines/src/pipelines/tensorflow/training/pipeline.py b/pipelines/src/pipelines/tensorflow/training/pipeline.py index 56aea64c..ba98bea1 100644 --- a/pipelines/src/pipelines/tensorflow/training/pipeline.py +++ b/pipelines/src/pipelines/tensorflow/training/pipeline.py @@ -15,9 +15,10 @@ import os import pathlib +from google_cloud_pipeline_components.v1.bigquery import BigqueryQueryJobOp from kfp.v2 import compiler, dsl from pipelines import generate_query -from bigquery_components import bq_query_to_table, extract_bq_to_dataset +from bigquery_components import extract_bq_to_dataset from vertex_components import ( lookup_model, custom_train_job, @@ -106,57 +107,14 @@ def tensorflow_pipeline( filter_column=time_column, target_column=label_column_name, filter_start_value=timestamp, + train_table=train_table, + validation_table=valid_table, + test_table=test_table, ) - split_train_query = generate_query( - queries_folder / "sample.sql", - source_dataset=dataset_id, - source_table=ingested_table, - preprocessing_dataset=f"{ingestion_project_id}.{dataset_id}", - target_table=train_table, - num_lots=10, - lots=tuple(range(8)), - ) - split_valid_query = generate_query( - 
queries_folder / "sample.sql", - source_dataset=dataset_id, - source_table=ingested_table, - preprocessing_dataset=f"{ingestion_project_id}.{dataset_id}", - target_table=valid_table, - num_lots=10, - lots="(8)", - ) - split_test_query = generate_query( - queries_folder / "sample.sql", - source_dataset=dataset_id, - source_table=ingested_table, - preprocessing_dataset=f"{ingestion_project_id}.{dataset_id}", - target_table=test_table, - num_lots=10, - lots="(9)", - ) - - # data ingestion and preprocessing operations - kwargs = dict(bq_client_project_id=project_id, dataset_location=dataset_location) - ingest = bq_query_to_table(query=ingest_query, **kwargs).set_display_name( - "Ingest data" - ) - - split_train_data = ( - bq_query_to_table(query=split_train_query, **kwargs) - .after(ingest) - .set_display_name("Split train data") - ) - split_valid_data = ( - bq_query_to_table(query=split_valid_query, **kwargs) - .after(ingest) - .set_display_name("Split validation data") - ) - split_test_data = ( - bq_query_to_table(query=split_test_query, **kwargs) - .after(ingest) - .set_display_name("Split test data") - ) + ingest = BigqueryQueryJobOp( + project=project_id, location=dataset_location, query=ingest_query + ).set_display_name("Ingest data") # data extraction to gcs @@ -168,8 +126,9 @@ def tensorflow_pipeline( table_name=train_table, dataset_location=dataset_location, ) - .after(split_train_data) + .after(ingest) .set_display_name("Extract train data to storage") + .set_caching_options(False) ).outputs["dataset"] valid_dataset = ( extract_bq_to_dataset( @@ -179,8 +138,9 @@ def tensorflow_pipeline( table_name=valid_table, dataset_location=dataset_location, ) - .after(split_valid_data) + .after(ingest) .set_display_name("Extract validation data to storage") + .set_caching_options(False) ).outputs["dataset"] test_dataset = ( extract_bq_to_dataset( @@ -191,7 +151,7 @@ def tensorflow_pipeline( dataset_location=dataset_location, destination_gcs_uri=test_dataset_uri, ) - .after(split_test_data) + .after(ingest) .set_display_name("Extract test data to storage") .set_caching_options(False) ).outputs["dataset"] diff --git a/pipelines/src/pipelines/tensorflow/training/queries/engineer_features.sql b/pipelines/src/pipelines/tensorflow/training/queries/engineer_features.sql deleted file mode 100644 index b8094a35..00000000 --- a/pipelines/src/pipelines/tensorflow/training/queries/engineer_features.sql +++ /dev/null @@ -1,18 +0,0 @@ --- Copyright 2022 Google LLC - --- Licensed under the Apache License, Version 2.0 (the "License"); --- you may not use this file except in compliance with the License. --- You may obtain a copy of the License at - --- https://www.apache.org/licenses/LICENSE-2.0 - --- Unless required by applicable law or agreed to in writing, software --- distributed under the License is distributed on an "AS IS" BASIS, --- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. --- See the License for the specific language governing permissions and --- limitations under the License. 
- -/* The Purpose of this query is to clean the data before the training*/ -SELECT - * -FROM `{{ source_dataset }}.{{ source_table }}` diff --git a/pipelines/src/pipelines/tensorflow/training/queries/ingest.sql b/pipelines/src/pipelines/tensorflow/training/queries/ingest.sql index 7f9f3b0b..b3a31606 100644 --- a/pipelines/src/pipelines/tensorflow/training/queries/ingest.sql +++ b/pipelines/src/pipelines/tensorflow/training/queries/ingest.sql @@ -1,20 +1,3 @@ --- Copyright 2022 Google LLC - --- Licensed under the Apache License, Version 2.0 (the "License"); --- you may not use this file except in compliance with the License. --- You may obtain a copy of the License at - --- https://www.apache.org/licenses/LICENSE-2.0 - --- Unless required by applicable law or agreed to in writing, software --- distributed under the License is distributed on an "AS IS" BASIS, --- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. --- See the License for the specific language governing permissions and --- limitations under the License. - --- Treat "filter_start_value" as the current time, unless it is empty then use CURRENT_DATETIME() instead --- This allows us to set the filter_start_value to a specific time for testing or for backfill - CREATE SCHEMA IF NOT EXISTS `{{ preprocessing_dataset }}` OPTIONS ( description = 'Preprocessing Dataset', @@ -66,3 +49,36 @@ WHERE AND `{{ field }}` IS NOT NULL {% endfor %} ); + +DROP TABLE IF EXISTS `{{ preprocessing_dataset }}.{{ train_table }}`; + +CREATE TABLE `{{ preprocessing_dataset }}.{{ train_table }}` AS ( +SELECT + * +FROM + `{{ preprocessing_dataset }}.{{ ingested_table }}` AS t +WHERE + MOD(ABS(FARM_FINGERPRINT(TO_JSON_STRING(t))), + 10) IN (0, 1, 2, 3, 4, 5, 6, 7)); + +DROP TABLE IF EXISTS `{{ preprocessing_dataset }}.{{ validation_table }}`; + +CREATE TABLE `{{ preprocessing_dataset }}.{{ validation_table }}` AS ( +SELECT + * +FROM + `{{ preprocessing_dataset }}.{{ ingested_table }}` AS t +WHERE + MOD(ABS(FARM_FINGERPRINT(TO_JSON_STRING(t))), + 10) IN (8)); + +DROP TABLE IF EXISTS `{{ preprocessing_dataset }}.{{ test_table }}`; + +CREATE TABLE `{{ preprocessing_dataset }}.{{ test_table }}` AS ( +SELECT + * +FROM + `{{ preprocessing_dataset }}.{{ ingested_table }}` AS t +WHERE + MOD(ABS(FARM_FINGERPRINT(TO_JSON_STRING(t))), + 10) IN (9)); diff --git a/pipelines/src/pipelines/tensorflow/training/queries/sample.sql b/pipelines/src/pipelines/tensorflow/training/queries/sample.sql deleted file mode 100644 index be8458be..00000000 --- a/pipelines/src/pipelines/tensorflow/training/queries/sample.sql +++ /dev/null @@ -1,10 +0,0 @@ -DROP TABLE IF EXISTS `{{ preprocessing_dataset }}.{{ target_table }}`; - -CREATE TABLE `{{ preprocessing_dataset }}.{{ target_table }}` AS ( -SELECT - * -FROM - `{{ source_dataset }}.{{ source_table }}` AS t -WHERE - MOD(ABS(FARM_FINGERPRINT(TO_JSON_STRING(t))), - {{ num_lots }}) IN {{ lots }}); diff --git a/pipelines/src/pipelines/xgboost/prediction/pipeline.py b/pipelines/src/pipelines/xgboost/prediction/pipeline.py index 0c4504ec..7fdea71c 100644 --- a/pipelines/src/pipelines/xgboost/prediction/pipeline.py +++ b/pipelines/src/pipelines/xgboost/prediction/pipeline.py @@ -15,10 +15,10 @@ import os import pathlib +from google_cloud_pipeline_components.v1.bigquery import BigqueryQueryJobOp from kfp.v2 import compiler, dsl from pipelines import generate_query -from bigquery_components import bq_query_to_table from vertex_components import lookup_model, model_batch_predict @@ -96,14 +96,9 @@ def xgboost_pipeline( 
filter_start_value=timestamp, ) - # data ingestion and preprocessing operations - kwargs = dict( - bq_client_project_id=project_id, - dataset_location=dataset_location, - ) - ingest = bq_query_to_table(query=ingest_query, **kwargs).set_display_name( - "Ingest data" - ) + ingest = BigqueryQueryJobOp( + project=project_id, location=dataset_location, query=ingest_query + ).set_display_name("Ingest data") # lookup champion model champion_model = ( diff --git a/pipelines/src/pipelines/xgboost/training/pipeline.py b/pipelines/src/pipelines/xgboost/training/pipeline.py index 6c9ad9d9..385e41ab 100644 --- a/pipelines/src/pipelines/xgboost/training/pipeline.py +++ b/pipelines/src/pipelines/xgboost/training/pipeline.py @@ -15,9 +15,10 @@ import os import pathlib +from google_cloud_pipeline_components.v1.bigquery import BigqueryQueryJobOp from kfp.v2 import compiler, dsl from pipelines import generate_query -from bigquery_components import bq_query_to_table, extract_bq_to_dataset +from bigquery_components import extract_bq_to_dataset from vertex_components import ( lookup_model, custom_train_job, @@ -104,57 +105,14 @@ def xgboost_pipeline( filter_column=time_column, target_column=label_column_name, filter_start_value=timestamp, + train_table=train_table, + validation_table=valid_table, + test_table=test_table, ) - split_train_query = generate_query( - queries_folder / "sample.sql", - source_dataset=dataset_id, - source_table=ingested_table, - preprocessing_dataset=f"{ingestion_project_id}.{dataset_id}", - target_table=train_table, - num_lots=10, - lots=tuple(range(8)), - ) - split_valid_query = generate_query( - queries_folder / "sample.sql", - source_dataset=dataset_id, - source_table=ingested_table, - preprocessing_dataset=f"{ingestion_project_id}.{dataset_id}", - target_table=valid_table, - num_lots=10, - lots="(8)", - ) - split_test_query = generate_query( - queries_folder / "sample.sql", - source_dataset=dataset_id, - source_table=ingested_table, - preprocessing_dataset=f"{ingestion_project_id}.{dataset_id}", - target_table=test_table, - num_lots=10, - lots="(9)", - ) - - # data ingestion and preprocessing operations - kwargs = dict(bq_client_project_id=project_id, dataset_location=dataset_location) - ingest = bq_query_to_table(query=ingest_query, **kwargs).set_display_name( - "Ingest data" - ) - - split_train_data = ( - bq_query_to_table(query=split_train_query, **kwargs) - .after(ingest) - .set_display_name("Split train data") - ) - split_valid_data = ( - bq_query_to_table(query=split_valid_query, **kwargs) - .after(ingest) - .set_display_name("Split validation data") - ) - split_test_data = ( - bq_query_to_table(query=split_test_query, **kwargs) - .after(ingest) - .set_display_name("Split test data") - ) + ingest = BigqueryQueryJobOp( + project=project_id, location=dataset_location, query=ingest_query + ).set_display_name("Ingest data") # data extraction to gcs @@ -166,8 +124,9 @@ def xgboost_pipeline( table_name=train_table, dataset_location=dataset_location, ) - .after(split_train_data) + .after(ingest) .set_display_name("Extract train data to storage") + .set_caching_options(False) ).outputs["dataset"] valid_dataset = ( extract_bq_to_dataset( @@ -177,8 +136,9 @@ def xgboost_pipeline( table_name=valid_table, dataset_location=dataset_location, ) - .after(split_valid_data) + .after(ingest) .set_display_name("Extract validation data to storage") + .set_caching_options(False) ).outputs["dataset"] test_dataset = ( extract_bq_to_dataset( @@ -189,7 +149,7 @@ def xgboost_pipeline( 
dataset_location=dataset_location, destination_gcs_uri=test_dataset_uri, ) - .after(split_test_data) + .after(ingest) .set_display_name("Extract test data to storage") .set_caching_options(False) ).outputs["dataset"] diff --git a/pipelines/src/pipelines/xgboost/training/queries/engineer_features.sql b/pipelines/src/pipelines/xgboost/training/queries/engineer_features.sql deleted file mode 100644 index 148d9ce7..00000000 --- a/pipelines/src/pipelines/xgboost/training/queries/engineer_features.sql +++ /dev/null @@ -1,4 +0,0 @@ -/* The Purpose of this query is to clean the data before the training*/ -SELECT - * -FROM `{{ source_dataset }}.{{ source_table }}` diff --git a/pipelines/src/pipelines/xgboost/training/queries/ingest.sql b/pipelines/src/pipelines/xgboost/training/queries/ingest.sql index fbb6df32..10ecf037 100644 --- a/pipelines/src/pipelines/xgboost/training/queries/ingest.sql +++ b/pipelines/src/pipelines/xgboost/training/queries/ingest.sql @@ -1,20 +1,3 @@ --- Copyright 2022 Google LLC - --- Licensed under the Apache License, Version 2.0 (the "License"); --- you may not use this file except in compliance with the License. --- You may obtain a copy of the License at - --- https://www.apache.org/licenses/LICENSE-2.0 - --- Unless required by applicable law or agreed to in writing, software --- distributed under the License is distributed on an "AS IS" BASIS, --- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. --- See the License for the specific language governing permissions and --- limitations under the License. - --- Treat "filter_start_value" as the current time, unless it is empty then use CURRENT_DATETIME() instead --- This allows us to set the filter_start_value to a specific time for testing or for backfill - CREATE SCHEMA IF NOT EXISTS `{{ preprocessing_dataset }}` OPTIONS ( description = 'Preprocessing Dataset', @@ -66,3 +49,36 @@ WHERE AND `{{ field }}` IS NOT NULL {% endfor %} ); + +DROP TABLE IF EXISTS `{{ preprocessing_dataset }}.{{ train_table }}`; + +CREATE TABLE `{{ preprocessing_dataset }}.{{ train_table }}` AS ( +SELECT + * +FROM + `{{ preprocessing_dataset }}.{{ ingested_table }}` AS t +WHERE + MOD(ABS(FARM_FINGERPRINT(TO_JSON_STRING(t))), + 10) IN (0, 1, 2, 3, 4, 5, 6, 7)); + +DROP TABLE IF EXISTS `{{ preprocessing_dataset }}.{{ validation_table }}`; + +CREATE TABLE `{{ preprocessing_dataset }}.{{ validation_table }}` AS ( +SELECT + * +FROM + `{{ preprocessing_dataset }}.{{ ingested_table }}` AS t +WHERE + MOD(ABS(FARM_FINGERPRINT(TO_JSON_STRING(t))), + 10) IN (8)); + +DROP TABLE IF EXISTS `{{ preprocessing_dataset }}.{{ test_table }}`; + +CREATE TABLE `{{ preprocessing_dataset }}.{{ test_table }}` AS ( +SELECT + * +FROM + `{{ preprocessing_dataset }}.{{ ingested_table }}` AS t +WHERE + MOD(ABS(FARM_FINGERPRINT(TO_JSON_STRING(t))), + 10) IN (9)); diff --git a/pipelines/src/pipelines/xgboost/training/queries/sample.sql b/pipelines/src/pipelines/xgboost/training/queries/sample.sql deleted file mode 100644 index be8458be..00000000 --- a/pipelines/src/pipelines/xgboost/training/queries/sample.sql +++ /dev/null @@ -1,10 +0,0 @@ -DROP TABLE IF EXISTS `{{ preprocessing_dataset }}.{{ target_table }}`; - -CREATE TABLE `{{ preprocessing_dataset }}.{{ target_table }}` AS ( -SELECT - * -FROM - `{{ source_dataset }}.{{ source_table }}` AS t -WHERE - MOD(ABS(FARM_FINGERPRINT(TO_JSON_STRING(t))), - {{ num_lots }}) IN {{ lots }}); From 8a7e368e139c625e9456675a551bf1a1c6e1710c Mon Sep 17 00:00:00 2001 From: ariadnafer Date: Thu, 18 May 2023 12:53:00 +0200 
Subject: [PATCH 06/10] docs: rename files and add comments --- .../pipelines/tensorflow/prediction/queries/ingest.sql | 4 ++++ pipelines/src/pipelines/tensorflow/training/pipeline.py | 8 ++++---- .../training/queries/{ingest.sql => preprocessing.sql} | 4 ++++ .../src/pipelines/xgboost/prediction/queries/ingest.sql | 4 ++++ pipelines/src/pipelines/xgboost/training/pipeline.py | 8 ++++---- .../training/queries/{ingest.sql => preprocessing.sql} | 4 ++++ 6 files changed, 24 insertions(+), 8 deletions(-) rename pipelines/src/pipelines/tensorflow/training/queries/{ingest.sql => preprocessing.sql} (92%) rename pipelines/src/pipelines/xgboost/training/queries/{ingest.sql => preprocessing.sql} (92%) diff --git a/pipelines/src/pipelines/tensorflow/prediction/queries/ingest.sql b/pipelines/src/pipelines/tensorflow/prediction/queries/ingest.sql index 5a44d671..010cfaba 100644 --- a/pipelines/src/pipelines/tensorflow/prediction/queries/ingest.sql +++ b/pipelines/src/pipelines/tensorflow/prediction/queries/ingest.sql @@ -14,11 +14,15 @@ -- Treat "filter_start_value" as the current time, unless it is empty then use CURRENT_DATETIME() instead -- This allows us to set the filter_start_value to a specific time for testing or for backfill + +-- If prediction dataset don't exist, create it CREATE SCHEMA IF NOT EXISTS `{{ prediction_dataset }}` OPTIONS ( description = 'Prediction Dataset', location = "{{ dataset_region }}"); +-- We recreate the ingestion table every time the pipeline run, +-- so we need to drop the generated in the previous run DROP TABLE IF EXISTS `{{ preprocessing_dataset }}.{{ ingested_table }}`; CREATE TABLE `{{ preprocessing_dataset }}.{{ ingested_table }}` AS ( diff --git a/pipelines/src/pipelines/tensorflow/training/pipeline.py b/pipelines/src/pipelines/tensorflow/training/pipeline.py index ba98bea1..bfeaef8b 100644 --- a/pipelines/src/pipelines/tensorflow/training/pipeline.py +++ b/pipelines/src/pipelines/tensorflow/training/pipeline.py @@ -98,7 +98,7 @@ def tensorflow_pipeline( queries_folder = pathlib.Path(__file__).parent / "queries" ingest_query = generate_query( - queries_folder / "ingest.sql", + queries_folder / "preprocessing.sql", source_dataset=f"{ingestion_project_id}.{ingestion_dataset_id}", source_table=ingestion_table, preprocessing_dataset=f"{ingestion_project_id}.{dataset_id}", @@ -127,7 +127,7 @@ def tensorflow_pipeline( dataset_location=dataset_location, ) .after(ingest) - .set_display_name("Extract train data to storage") + .set_display_name("Extract train") .set_caching_options(False) ).outputs["dataset"] valid_dataset = ( @@ -139,7 +139,7 @@ def tensorflow_pipeline( dataset_location=dataset_location, ) .after(ingest) - .set_display_name("Extract validation data to storage") + .set_display_name("Extract validation data") .set_caching_options(False) ).outputs["dataset"] test_dataset = ( @@ -152,7 +152,7 @@ def tensorflow_pipeline( destination_gcs_uri=test_dataset_uri, ) .after(ingest) - .set_display_name("Extract test data to storage") + .set_display_name("Extract test data") .set_caching_options(False) ).outputs["dataset"] diff --git a/pipelines/src/pipelines/tensorflow/training/queries/ingest.sql b/pipelines/src/pipelines/tensorflow/training/queries/preprocessing.sql similarity index 92% rename from pipelines/src/pipelines/tensorflow/training/queries/ingest.sql rename to pipelines/src/pipelines/tensorflow/training/queries/preprocessing.sql index b3a31606..d1644958 100644 --- a/pipelines/src/pipelines/tensorflow/training/queries/ingest.sql +++ 
b/pipelines/src/pipelines/tensorflow/training/queries/preprocessing.sql @@ -1,8 +1,11 @@ +-- If preprocessing dataset don't exist, create it CREATE SCHEMA IF NOT EXISTS `{{ preprocessing_dataset }}` OPTIONS ( description = 'Preprocessing Dataset', location = "{{ dataset_region }}"); +-- We recreate the ingestion table every time the pipeline run, +-- so we need to drop the generated in the previous run DROP TABLE IF EXISTS `{{ preprocessing_dataset }}.{{ ingested_table }}`; CREATE TABLE `{{ preprocessing_dataset }}.{{ ingested_table }}` AS ( @@ -50,6 +53,7 @@ WHERE {% endfor %} ); +-- Drop and creation of train, testing and validations tables DROP TABLE IF EXISTS `{{ preprocessing_dataset }}.{{ train_table }}`; CREATE TABLE `{{ preprocessing_dataset }}.{{ train_table }}` AS ( diff --git a/pipelines/src/pipelines/xgboost/prediction/queries/ingest.sql b/pipelines/src/pipelines/xgboost/prediction/queries/ingest.sql index bf8fe57a..d6cf4ada 100644 --- a/pipelines/src/pipelines/xgboost/prediction/queries/ingest.sql +++ b/pipelines/src/pipelines/xgboost/prediction/queries/ingest.sql @@ -14,11 +14,15 @@ -- Treat "filter_start_value" as the current time, unless it is empty then use CURRENT_DATETIME() instead -- This allows us to set the filter_start_value to a specific time for testing or for backfill + +-- If prediction dataset don't exist, create it CREATE SCHEMA IF NOT EXISTS `{{ prediction_dataset }}` OPTIONS ( description = 'Prediction Dataset', location = "{{ dataset_region }}"); +-- We recreate the ingestion table every time the pipeline run, +-- so we need to drop the generated in the previous run DROP TABLE IF EXISTS `{{ preprocessing_dataset }}.{{ ingested_table }}`; CREATE TABLE `{{ preprocessing_dataset }}.{{ ingested_table }}` AS ( diff --git a/pipelines/src/pipelines/xgboost/training/pipeline.py b/pipelines/src/pipelines/xgboost/training/pipeline.py index 385e41ab..d0a303b2 100644 --- a/pipelines/src/pipelines/xgboost/training/pipeline.py +++ b/pipelines/src/pipelines/xgboost/training/pipeline.py @@ -96,7 +96,7 @@ def xgboost_pipeline( queries_folder = pathlib.Path(__file__).parent / "queries" ingest_query = generate_query( - queries_folder / "ingest.sql", + queries_folder / "preprocessing.sql", source_dataset=f"{ingestion_project_id}.{ingestion_dataset_id}", source_table=ingestion_table, preprocessing_dataset=f"{ingestion_project_id}.{dataset_id}", @@ -125,7 +125,7 @@ def xgboost_pipeline( dataset_location=dataset_location, ) .after(ingest) - .set_display_name("Extract train data to storage") + .set_display_name("Extract train data") .set_caching_options(False) ).outputs["dataset"] valid_dataset = ( @@ -137,7 +137,7 @@ def xgboost_pipeline( dataset_location=dataset_location, ) .after(ingest) - .set_display_name("Extract validation data to storage") + .set_display_name("Extract validation data") .set_caching_options(False) ).outputs["dataset"] test_dataset = ( @@ -150,7 +150,7 @@ def xgboost_pipeline( destination_gcs_uri=test_dataset_uri, ) .after(ingest) - .set_display_name("Extract test data to storage") + .set_display_name("Extract test data") .set_caching_options(False) ).outputs["dataset"] diff --git a/pipelines/src/pipelines/xgboost/training/queries/ingest.sql b/pipelines/src/pipelines/xgboost/training/queries/preprocessing.sql similarity index 92% rename from pipelines/src/pipelines/xgboost/training/queries/ingest.sql rename to pipelines/src/pipelines/xgboost/training/queries/preprocessing.sql index 10ecf037..3672262f 100644 --- 
a/pipelines/src/pipelines/xgboost/training/queries/ingest.sql +++ b/pipelines/src/pipelines/xgboost/training/queries/preprocessing.sql @@ -1,8 +1,11 @@ +-- If preprocessing dataset don't exist, create it CREATE SCHEMA IF NOT EXISTS `{{ preprocessing_dataset }}` OPTIONS ( description = 'Preprocessing Dataset', location = "{{ dataset_region }}"); +-- We recreate the ingestion table every time the pipeline run, +-- so we need to drop the generated in the previous run DROP TABLE IF EXISTS `{{ preprocessing_dataset }}.{{ ingested_table }}`; CREATE TABLE `{{ preprocessing_dataset }}.{{ ingested_table }}` AS ( @@ -50,6 +53,7 @@ WHERE {% endfor %} ); +-- Drop and creation of train, testing and validations tables DROP TABLE IF EXISTS `{{ preprocessing_dataset }}.{{ train_table }}`; CREATE TABLE `{{ preprocessing_dataset }}.{{ train_table }}` AS ( From c087024d19dde1ff44b220cbcb03eb5aca27332c Mon Sep 17 00:00:00 2001 From: ariadnafer Date: Fri, 19 May 2023 23:23:45 +0200 Subject: [PATCH 07/10] fix: replace quotes in queries --- pipelines/src/pipelines/__init__.py | 5 +---- .../pipelines/tensorflow/prediction/pipeline.py | 2 +- .../queries/{ingest.sql => preprocessing.sql} | 14 +++++++------- .../tensorflow/training/queries/preprocessing.sql | 10 +++++----- .../src/pipelines/xgboost/prediction/pipeline.py | 2 +- .../queries/{ingest.sql => preprocessing.sql} | 14 +++++++------- .../xgboost/training/queries/preprocessing.sql | 10 +++++----- 7 files changed, 27 insertions(+), 30 deletions(-) rename pipelines/src/pipelines/tensorflow/prediction/queries/{ingest.sql => preprocessing.sql} (81%) rename pipelines/src/pipelines/xgboost/prediction/queries/{ingest.sql => preprocessing.sql} (81%) diff --git a/pipelines/src/pipelines/__init__.py b/pipelines/src/pipelines/__init__.py index badd535e..8eeeff45 100644 --- a/pipelines/src/pipelines/__init__.py +++ b/pipelines/src/pipelines/__init__.py @@ -11,7 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
- +import json from pathlib import Path from jinja2 import Template @@ -33,7 +33,4 @@ def generate_query(input_file: Path, **replacements) -> str: # Render the template with the provided replacements query = Template(query_template).render(**replacements) - # Escape double quotes, newline and tab characters - query = query.replace('"', '\\"').replace("\n", "\\n").replace("\t", "\\t") - return query diff --git a/pipelines/src/pipelines/tensorflow/prediction/pipeline.py b/pipelines/src/pipelines/tensorflow/prediction/pipeline.py index 60ad3b47..8d6344ac 100644 --- a/pipelines/src/pipelines/tensorflow/prediction/pipeline.py +++ b/pipelines/src/pipelines/tensorflow/prediction/pipeline.py @@ -89,7 +89,7 @@ def tensorflow_pipeline( queries_folder = pathlib.Path(__file__).parent / "queries" ingest_query = generate_query( - queries_folder / "ingest.sql", + queries_folder / "preprocessing.sql", source_dataset=f"{ingestion_project_id}.{ingestion_dataset_id}", source_table=ingestion_table, prediction_dataset=f"{ingestion_project_id}.{prediction_dataset_id}", diff --git a/pipelines/src/pipelines/tensorflow/prediction/queries/ingest.sql b/pipelines/src/pipelines/tensorflow/prediction/queries/preprocessing.sql similarity index 81% rename from pipelines/src/pipelines/tensorflow/prediction/queries/ingest.sql rename to pipelines/src/pipelines/tensorflow/prediction/queries/preprocessing.sql index 010cfaba..3e25a603 100644 --- a/pipelines/src/pipelines/tensorflow/prediction/queries/ingest.sql +++ b/pipelines/src/pipelines/tensorflow/prediction/queries/preprocessing.sql @@ -1,25 +1,25 @@ -- Copyright 2022 Google LLC --- Licensed under the Apache License, Version 2.0 (the "License"); +-- Licensed under the Apache License, Version 2.0 (the 'License'); -- you may not use this file except in compliance with the License. -- You may obtain a copy of the License at -- https://www.apache.org/licenses/LICENSE-2.0 -- Unless required by applicable law or agreed to in writing, software --- distributed under the License is distributed on an "AS IS" BASIS, +-- distributed under the License is distributed on an 'AS IS' BASIS, -- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -- See the License for the specific language governing permissions and -- limitations under the License. 
--- Treat "filter_start_value" as the current time, unless it is empty then use CURRENT_DATETIME() instead +-- Treat 'filter_start_value' as the current time, unless it is empty then use CURRENT_DATETIME() instead -- This allows us to set the filter_start_value to a specific time for testing or for backfill -- If prediction dataset don't exist, create it CREATE SCHEMA IF NOT EXISTS `{{ prediction_dataset }}` OPTIONS ( description = 'Prediction Dataset', - location = "{{ dataset_region }}"); + location = '{{ dataset_region }}'); -- We recreate the ingestion table every time the pipeline run, -- so we need to drop the generated in the previous run @@ -28,7 +28,7 @@ DROP TABLE IF EXISTS `{{ preprocessing_dataset }}.{{ ingested_table }}`; CREATE TABLE `{{ preprocessing_dataset }}.{{ ingested_table }}` AS ( with filter_start_values as ( SELECT - IF("{{ filter_start_value }}" = '', CURRENT_DATETIME(), CAST("{{ filter_start_value }}" AS DATETIME)) as filter_start_value + IF('{{ filter_start_value }}' = '', CURRENT_DATETIME(), CAST('{{ filter_start_value }}' AS DATETIME)) as filter_start_value ) -- Ingest data between 2 and 3 months ago ,filtered_data as ( @@ -62,8 +62,8 @@ SELECT FROM filtered_data as t, mean_time as m WHERE trip_miles > 0 AND fare > 0 AND fare < 1500 - {% for field in ["fare", "trip_start_timestamp", "pickup_longitude", - "pickup_latitude", "dropoff_longitude", "dropoff_latitude","payment_type","company"] %} + {% for field in ['fare', 'trip_start_timestamp', 'pickup_longitude', + 'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude','payment_type','company'] %} AND `{{ field }}` IS NOT NULL {% endfor %} ); diff --git a/pipelines/src/pipelines/tensorflow/training/queries/preprocessing.sql b/pipelines/src/pipelines/tensorflow/training/queries/preprocessing.sql index d1644958..0182aac2 100644 --- a/pipelines/src/pipelines/tensorflow/training/queries/preprocessing.sql +++ b/pipelines/src/pipelines/tensorflow/training/queries/preprocessing.sql @@ -2,7 +2,7 @@ CREATE SCHEMA IF NOT EXISTS `{{ preprocessing_dataset }}` OPTIONS ( description = 'Preprocessing Dataset', - location = "{{ dataset_region }}"); + location = '{{ dataset_region }}'); -- We recreate the ingestion table every time the pipeline run, -- so we need to drop the generated in the previous run @@ -11,9 +11,9 @@ DROP TABLE IF EXISTS `{{ preprocessing_dataset }}.{{ ingested_table }}`; CREATE TABLE `{{ preprocessing_dataset }}.{{ ingested_table }}` AS ( WITH filter_start_values AS ( SELECT - IF("{{ filter_start_value }}" = '', + IF('{{ filter_start_value }}' = '', CURRENT_DATETIME(), - CAST("{{ filter_start_value }}" AS DATETIME)) AS filter_start_value + CAST('{{ filter_start_value }}' AS DATETIME)) AS filter_start_value ) -- Ingest data between 2 and 3 months ago ,filtered_data AS ( @@ -47,8 +47,8 @@ SELECT FROM filtered_data AS t, mean_time AS m WHERE trip_miles > 0 AND fare > 0 AND fare < 1500 - {% for field in ["fare", "trip_start_timestamp", "pickup_longitude", - "pickup_latitude", "dropoff_longitude", "dropoff_latitude","payment_type","company"] %} + {% for field in ['fare', 'trip_start_timestamp', 'pickup_longitude', + 'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude','payment_type','company'] %} AND `{{ field }}` IS NOT NULL {% endfor %} ); diff --git a/pipelines/src/pipelines/xgboost/prediction/pipeline.py b/pipelines/src/pipelines/xgboost/prediction/pipeline.py index 7fdea71c..4cd796c1 100644 --- a/pipelines/src/pipelines/xgboost/prediction/pipeline.py +++ 
b/pipelines/src/pipelines/xgboost/prediction/pipeline.py @@ -85,7 +85,7 @@ def xgboost_pipeline( queries_folder = pathlib.Path(__file__).parent / "queries" ingest_query = generate_query( - queries_folder / "ingest.sql", + queries_folder / "preprocessing.sql", source_dataset=f"{ingestion_project_id}.{ingestion_dataset_id}", source_table=ingestion_table, prediction_dataset=f"{ingestion_project_id}.{prediction_dataset_id}", diff --git a/pipelines/src/pipelines/xgboost/prediction/queries/ingest.sql b/pipelines/src/pipelines/xgboost/prediction/queries/preprocessing.sql similarity index 81% rename from pipelines/src/pipelines/xgboost/prediction/queries/ingest.sql rename to pipelines/src/pipelines/xgboost/prediction/queries/preprocessing.sql index d6cf4ada..294223a4 100644 --- a/pipelines/src/pipelines/xgboost/prediction/queries/ingest.sql +++ b/pipelines/src/pipelines/xgboost/prediction/queries/preprocessing.sql @@ -1,25 +1,25 @@ -- Copyright 2022 Google LLC --- Licensed under the Apache License, Version 2.0 (the "License"); +-- Licensed under the Apache License, Version 2.0 (the 'License'); -- you may not use this file except in compliance with the License. -- You may obtain a copy of the License at -- https://www.apache.org/licenses/LICENSE-2.0 -- Unless required by applicable law or agreed to in writing, software --- distributed under the License is distributed on an "AS IS" BASIS, +-- distributed under the License is distributed on an 'AS IS' BASIS, -- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -- See the License for the specific language governing permissions and -- limitations under the License. --- Treat "filter_start_value" as the current time, unless it is empty then use CURRENT_DATETIME() instead +-- Treat 'filter_start_value' as the current time, unless it is empty then use CURRENT_DATETIME() instead -- This allows us to set the filter_start_value to a specific time for testing or for backfill -- If prediction dataset don't exist, create it CREATE SCHEMA IF NOT EXISTS `{{ prediction_dataset }}` OPTIONS ( description = 'Prediction Dataset', - location = "{{ dataset_region }}"); + location = '{{ dataset_region }}'); -- We recreate the ingestion table every time the pipeline run, -- so we need to drop the generated in the previous run @@ -28,7 +28,7 @@ DROP TABLE IF EXISTS `{{ preprocessing_dataset }}.{{ ingested_table }}`; CREATE TABLE `{{ preprocessing_dataset }}.{{ ingested_table }}` AS ( with filter_start_values as ( SELECT - IF("{{ filter_start_value }}" = '', CURRENT_DATETIME(), CAST("{{ filter_start_value }}" AS DATETIME)) as filter_start_value + IF('{{ filter_start_value }}' = '', CURRENT_DATETIME(), CAST('{{ filter_start_value }}' AS DATETIME)) as filter_start_value ) -- Ingest data between 2 and 3 months ago ,filtered_data as ( @@ -62,8 +62,8 @@ SELECT FROM filtered_data as t, mean_time as m WHERE trip_miles > 0 AND fare > 0 AND fare < 1500 - {% for field in ["fare", "trip_start_timestamp", "pickup_longitude", - "pickup_latitude", "dropoff_longitude", "dropoff_latitude","payment_type","company"] %} + {% for field in ['fare', 'trip_start_timestamp', 'pickup_longitude', + 'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude','payment_type','company'] %} AND `{{ field }}` IS NOT NULL {% endfor %} ); diff --git a/pipelines/src/pipelines/xgboost/training/queries/preprocessing.sql b/pipelines/src/pipelines/xgboost/training/queries/preprocessing.sql index 3672262f..a7291038 100644 --- 
a/pipelines/src/pipelines/xgboost/training/queries/preprocessing.sql +++ b/pipelines/src/pipelines/xgboost/training/queries/preprocessing.sql @@ -2,7 +2,7 @@ CREATE SCHEMA IF NOT EXISTS `{{ preprocessing_dataset }}` OPTIONS ( description = 'Preprocessing Dataset', - location = "{{ dataset_region }}"); + location = '{{ dataset_region }}'); -- We recreate the ingestion table every time the pipeline run, -- so we need to drop the generated in the previous run @@ -11,9 +11,9 @@ DROP TABLE IF EXISTS `{{ preprocessing_dataset }}.{{ ingested_table }}`; CREATE TABLE `{{ preprocessing_dataset }}.{{ ingested_table }}` AS ( WITH filter_start_values AS ( SELECT - IF("{{ filter_start_value }}" = '', + IF('{{ filter_start_value }}' = '', CURRENT_DATETIME(), - CAST("{{ filter_start_value }}" AS DATETIME)) AS filter_start_value + CAST('{{ filter_start_value }}' AS DATETIME)) AS filter_start_value ) -- Ingest data between 2 and 3 months ago ,filtered_data AS ( @@ -47,8 +47,8 @@ SELECT FROM filtered_data AS t, mean_time AS m WHERE trip_miles > 0 AND fare > 0 AND fare < 1500 - {% for field in ["fare", "trip_start_timestamp", "pickup_longitude", - "pickup_latitude", "dropoff_longitude", "dropoff_latitude","payment_type","company"] %} + {% for field in ['fare', 'trip_start_timestamp', 'pickup_longitude', + 'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude','payment_type','company'] %} AND `{{ field }}` IS NOT NULL {% endfor %} ); From e8b492f2343db6144d534d92da851d71c8ee150c Mon Sep 17 00:00:00 2001 From: ariadnafer Date: Sat, 20 May 2023 23:05:29 +0200 Subject: [PATCH 08/10] refactor: rename queries and pipeline steps --- pipelines/src/pipelines/__init__.py | 7 ++----- .../src/pipelines/tensorflow/prediction/pipeline.py | 8 ++++---- .../src/pipelines/tensorflow/training/pipeline.py | 12 ++++++------ .../src/pipelines/xgboost/prediction/pipeline.py | 8 ++++---- pipelines/src/pipelines/xgboost/training/pipeline.py | 12 ++++++------ 5 files changed, 22 insertions(+), 25 deletions(-) diff --git a/pipelines/src/pipelines/__init__.py b/pipelines/src/pipelines/__init__.py index 8eeeff45..113da917 100644 --- a/pipelines/src/pipelines/__init__.py +++ b/pipelines/src/pipelines/__init__.py @@ -11,7 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
-import json + from pathlib import Path from jinja2 import Template @@ -30,7 +30,4 @@ def generate_query(input_file: Path, **replacements) -> str: with open(input_file, "r") as f: query_template = f.read() - # Render the template with the provided replacements - query = Template(query_template).render(**replacements) - - return query + return Template(query_template).render(**replacements) diff --git a/pipelines/src/pipelines/tensorflow/prediction/pipeline.py b/pipelines/src/pipelines/tensorflow/prediction/pipeline.py index 8d6344ac..7290a4c9 100644 --- a/pipelines/src/pipelines/tensorflow/prediction/pipeline.py +++ b/pipelines/src/pipelines/tensorflow/prediction/pipeline.py @@ -88,7 +88,7 @@ def tensorflow_pipeline( # operations queries_folder = pathlib.Path(__file__).parent / "queries" - ingest_query = generate_query( + preprocessing_query = generate_query( queries_folder / "preprocessing.sql", source_dataset=f"{ingestion_project_id}.{ingestion_dataset_id}", source_table=ingestion_table, @@ -101,8 +101,8 @@ def tensorflow_pipeline( ) # data ingestion and preprocessing operations - ingest = BigqueryQueryJobOp( - project=project_id, location=dataset_location, query=ingest_query + preprocessing = BigqueryQueryJobOp( + project=project_id, location=dataset_location, query=preprocessing_query ).set_display_name("Ingest data") # lookup champion model @@ -143,7 +143,7 @@ def tensorflow_pipeline( monitoring_skew_config=monitoring_skew_config, instance_config=instance_config, ) - .after(ingest) + .after(preprocessing) .set_display_name("Batch prediction job") ) diff --git a/pipelines/src/pipelines/tensorflow/training/pipeline.py b/pipelines/src/pipelines/tensorflow/training/pipeline.py index bfeaef8b..c59a6bd3 100644 --- a/pipelines/src/pipelines/tensorflow/training/pipeline.py +++ b/pipelines/src/pipelines/tensorflow/training/pipeline.py @@ -97,7 +97,7 @@ def tensorflow_pipeline( queries_folder = pathlib.Path(__file__).parent / "queries" - ingest_query = generate_query( + preprocessing_query = generate_query( queries_folder / "preprocessing.sql", source_dataset=f"{ingestion_project_id}.{ingestion_dataset_id}", source_table=ingestion_table, @@ -112,8 +112,8 @@ def tensorflow_pipeline( test_table=test_table, ) - ingest = BigqueryQueryJobOp( - project=project_id, location=dataset_location, query=ingest_query + preprocessing = BigqueryQueryJobOp( + project=project_id, location=dataset_location, query=preprocessing_query ).set_display_name("Ingest data") # data extraction to gcs @@ -126,7 +126,7 @@ def tensorflow_pipeline( table_name=train_table, dataset_location=dataset_location, ) - .after(ingest) + .after(preprocessing) .set_display_name("Extract train") .set_caching_options(False) ).outputs["dataset"] @@ -138,7 +138,7 @@ def tensorflow_pipeline( table_name=valid_table, dataset_location=dataset_location, ) - .after(ingest) + .after(preprocessing) .set_display_name("Extract validation data") .set_caching_options(False) ).outputs["dataset"] @@ -151,7 +151,7 @@ def tensorflow_pipeline( dataset_location=dataset_location, destination_gcs_uri=test_dataset_uri, ) - .after(ingest) + .after(preprocessing) .set_display_name("Extract test data") .set_caching_options(False) ).outputs["dataset"] diff --git a/pipelines/src/pipelines/xgboost/prediction/pipeline.py b/pipelines/src/pipelines/xgboost/prediction/pipeline.py index 4cd796c1..5592a121 100644 --- a/pipelines/src/pipelines/xgboost/prediction/pipeline.py +++ b/pipelines/src/pipelines/xgboost/prediction/pipeline.py @@ -84,7 +84,7 @@ def xgboost_pipeline( # 
operations queries_folder = pathlib.Path(__file__).parent / "queries" - ingest_query = generate_query( + preprocessing_query = generate_query( queries_folder / "preprocessing.sql", source_dataset=f"{ingestion_project_id}.{ingestion_dataset_id}", source_table=ingestion_table, @@ -96,8 +96,8 @@ def xgboost_pipeline( filter_start_value=timestamp, ) - ingest = BigqueryQueryJobOp( - project=project_id, location=dataset_location, query=ingest_query + preprocessing = BigqueryQueryJobOp( + project=project_id, location=dataset_location, query=preprocessing_query ).set_display_name("Ingest data") # lookup champion model @@ -135,7 +135,7 @@ def xgboost_pipeline( monitoring_alert_email_addresses=monitoring_alert_email_addresses, monitoring_skew_config=monitoring_skew_config, ) - .after(ingest) + .after(preprocessing) .set_display_name("Batch prediction job") ) diff --git a/pipelines/src/pipelines/xgboost/training/pipeline.py b/pipelines/src/pipelines/xgboost/training/pipeline.py index d0a303b2..0befe4db 100644 --- a/pipelines/src/pipelines/xgboost/training/pipeline.py +++ b/pipelines/src/pipelines/xgboost/training/pipeline.py @@ -95,7 +95,7 @@ def xgboost_pipeline( queries_folder = pathlib.Path(__file__).parent / "queries" - ingest_query = generate_query( + preprocessing_query = generate_query( queries_folder / "preprocessing.sql", source_dataset=f"{ingestion_project_id}.{ingestion_dataset_id}", source_table=ingestion_table, @@ -110,8 +110,8 @@ def xgboost_pipeline( test_table=test_table, ) - ingest = BigqueryQueryJobOp( - project=project_id, location=dataset_location, query=ingest_query + preprocessing = BigqueryQueryJobOp( + project=project_id, location=dataset_location, query=preprocessing_query ).set_display_name("Ingest data") # data extraction to gcs @@ -124,7 +124,7 @@ def xgboost_pipeline( table_name=train_table, dataset_location=dataset_location, ) - .after(ingest) + .after(preprocessing) .set_display_name("Extract train data") .set_caching_options(False) ).outputs["dataset"] @@ -136,7 +136,7 @@ def xgboost_pipeline( table_name=valid_table, dataset_location=dataset_location, ) - .after(ingest) + .after(preprocessing) .set_display_name("Extract validation data") .set_caching_options(False) ).outputs["dataset"] @@ -149,7 +149,7 @@ def xgboost_pipeline( dataset_location=dataset_location, destination_gcs_uri=test_dataset_uri, ) - .after(ingest) + .after(preprocessing) .set_display_name("Extract test data") .set_caching_options(False) ).outputs["dataset"] From e0b5351ca2592273ba3847606df4a4462c980ba0 Mon Sep 17 00:00:00 2001 From: Felix Schaumann <89205956+felix-datatonic@users.noreply.github.com> Date: Mon, 22 May 2023 09:08:15 +0200 Subject: [PATCH 09/10] Update pipelines/src/pipelines/tensorflow/training/pipeline.py --- pipelines/src/pipelines/tensorflow/training/pipeline.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/src/pipelines/tensorflow/training/pipeline.py b/pipelines/src/pipelines/tensorflow/training/pipeline.py index c59a6bd3..f6e70525 100644 --- a/pipelines/src/pipelines/tensorflow/training/pipeline.py +++ b/pipelines/src/pipelines/tensorflow/training/pipeline.py @@ -114,7 +114,7 @@ def tensorflow_pipeline( preprocessing = BigqueryQueryJobOp( project=project_id, location=dataset_location, query=preprocessing_query - ).set_display_name("Ingest data") + ).set_display_name("Ingest & preprocess data") # data extraction to gcs From ab771688758a5d3584ab18114a3396396fe8bb4a Mon Sep 17 00:00:00 2001 From: Felix Schaumann 
<89205956+felix-datatonic@users.noreply.github.com> Date: Mon, 22 May 2023 09:08:20 +0200 Subject: [PATCH 10/10] Update pipelines/src/pipelines/xgboost/training/pipeline.py --- pipelines/src/pipelines/xgboost/training/pipeline.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/src/pipelines/xgboost/training/pipeline.py b/pipelines/src/pipelines/xgboost/training/pipeline.py index 0befe4db..6737a627 100644 --- a/pipelines/src/pipelines/xgboost/training/pipeline.py +++ b/pipelines/src/pipelines/xgboost/training/pipeline.py @@ -112,7 +112,7 @@ def xgboost_pipeline( preprocessing = BigqueryQueryJobOp( project=project_id, location=dataset_location, query=preprocessing_query - ).set_display_name("Ingest data") + ).set_display_name("Ingest & preprocess data") # data extraction to gcs
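
Taken together, the changes above leave the query helper and the pipelines in the following shape: generate_query simply renders the Jinja2 template (the extra escaping of quotes, newlines and tabs is gone), and the rendered SQL drives a single `preprocessing` BigQuery step that the downstream extract tasks reference with `.after(preprocessing)`. The sketch below is a minimal, self-contained illustration and is not part of the patch series; the google_cloud_pipeline_components import path and the replacement values are assumptions made for the sketch, while the call signatures and display names are taken from the hunks above.

import pathlib

from jinja2 import Template

# Assumed import path; the pipeline files above only show the call site.
from google_cloud_pipeline_components.v1.bigquery import BigqueryQueryJobOp


def generate_query(input_file: pathlib.Path, **replacements) -> str:
    """Read a templated SQL file and render it with the given replacements.

    The rendered query is returned as-is; no escaping of quotes, newlines
    or tabs is applied any more.
    """
    with open(input_file, "r") as f:
        query_template = f.read()
    return Template(query_template).render(**replacements)


def build_preprocessing_step(project_id: str, dataset_location: str):
    """Illustrative wiring of the renamed 'preprocessing' step.

    In the real pipelines this code sits inside the @pipeline-decorated
    function; the replacement values below are placeholders only.
    """
    queries_folder = pathlib.Path(__file__).parent / "queries"

    preprocessing_query = generate_query(
        queries_folder / "preprocessing.sql",      # renamed from ingest.sql
        source_dataset="my-project.chicago_taxi",  # placeholder
        source_table="taxi_trips",                 # placeholder
        filter_start_value="",                     # empty -> CURRENT_DATETIME()
    )

    preprocessing = BigqueryQueryJobOp(
        project=project_id,
        location=dataset_location,
        query=preprocessing_query,
    ).set_display_name("Ingest & preprocess data")

    # Downstream extraction tasks then order themselves after this single
    # step, e.g. `.after(preprocessing)` instead of `.after(ingest)`.
    return preprocessing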