Merge pull request #12 from teamdatatonic/feature/enable-multiquery
feat: enable multiquery
ariadnafer authored May 22, 2023
2 parents 0a2f77a + ab77168 commit fe3d9d9
Showing 16 changed files with 301 additions and 436 deletions.
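
In short, this PR replaces the chain of separate BigQuery steps (ingest, train/validation/test splits, feature engineering) with a single multi-statement preprocessing.sql script per pipeline, executed by the prebuilt BigqueryQueryJobOp component; the custom bq_query_to_table component is dropped from the bigquery_components package. The sketch below only illustrates the resulting pattern — the project id, dataset names and trimmed template parameters are placeholders, not the exact code in this PR.

# Illustrative sketch of the "multiquery" pattern this PR adopts (placeholder values).
import pathlib

from google_cloud_pipeline_components.v1.bigquery import BigqueryQueryJobOp
from kfp.v2 import dsl

from pipelines import generate_query  # renders a Jinja-templated SQL file into a query string

PROJECT_ID = "my-project"          # placeholder project id
DATASET_LOCATION = "europe-west2"  # placeholder location


@dsl.pipeline(name="multiquery-example")
def example_pipeline():
    queries_folder = pathlib.Path(__file__).parent / "queries"

    # One multi-statement script now performs ingestion and preprocessing in BigQuery.
    preprocessing_query = generate_query(
        queries_folder / "preprocessing.sql",
        preprocessing_dataset=f"{PROJECT_ID}.preprocessing",
        ingested_table="ingested_data",
    )

    # A single BigQuery job runs the whole script, replacing several bq_query_to_table steps.
    preprocessing = BigqueryQueryJobOp(
        project=PROJECT_ID,
        location=DATASET_LOCATION,
        query=preprocessing_query,
    ).set_display_name("Ingest & preprocess data")
    # downstream steps chain with .after(preprocessing)
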
@@ -1,9 +1,7 @@
from .bq_query_to_table import bq_query_to_table
from .extract_bq_to_dataset import extract_bq_to_dataset


__version__ = "0.0.1"
__all__ = [
"bq_query_to_table",
"extract_bq_to_dataset",
]

This file was deleted.

40 changes: 20 additions & 20 deletions pipelines/src/pipelines/tensorflow/prediction/pipeline.py
@@ -12,14 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import os
import pathlib

from google_cloud_pipeline_components.v1.bigquery import BigqueryQueryJobOp
from kfp.v2 import compiler, dsl

from pipelines import generate_query
from bigquery_components import bq_query_to_table
from vertex_components import lookup_model, model_batch_predict


@@ -29,9 +28,10 @@ def tensorflow_pipeline(
project_location: str = os.environ.get("VERTEX_LOCATION"),
ingestion_project_id: str = os.environ.get("VERTEX_PROJECT_ID"),
model_name: str = "simple_tensorflow",
dataset_id: str = "preprocessing",
preprocessing_dataset_id: str = "preprocessing",
dataset_location: str = os.environ.get("VERTEX_LOCATION"),
ingestion_dataset_id: str = "chicago_taxi_trips",
prediction_dataset_id: str = "prediction",
timestamp: str = "2022-12-01 00:00:00",
batch_prediction_machine_type: str = "n1-standard-4",
batch_prediction_min_replicas: int = 3,
@@ -49,13 +49,14 @@ def tensorflow_pipeline(
Args:
project_id (str): project id of the Google Cloud project
project_location (str): location of the Google Cloud project
pipeline_files_gcs_path (str): GCS path where the pipeline files are located
ingestion_project_id (str): project id containing the source bigquery data
for ingestion. This can be the same as `project_id` if the source data is
in the same project where the ML pipeline is executed.
model_name (str): name of model
model_label (str): label of model
dataset_id (str): id of BQ dataset used to store all staging data & predictions
preprocessing_dataset_id (str): id of BQ dataset used to
store all staging data.
prediction_dataset_id (str): id of BQ dataset used to
store all predictions.
dataset_location (str): location of dataset
ingestion_dataset_id (str): dataset id of ingestion data
timestamp (str): Optional. Empty or a specific timestamp in ISO 8601 format
@@ -87,24 +88,21 @@ def tensorflow_pipeline(
# operations
queries_folder = pathlib.Path(__file__).parent / "queries"

ingest_query = generate_query(
queries_folder / "ingest.sql",
preprocessing_query = generate_query(
queries_folder / "preprocessing.sql",
source_dataset=f"{ingestion_project_id}.{ingestion_dataset_id}",
source_table=ingestion_table,
prediction_dataset=f"{ingestion_project_id}.{prediction_dataset_id}",
preprocessing_dataset=f"{ingestion_project_id}.{preprocessing_dataset_id}",
ingested_table=ingested_table,
dataset_region=project_location,
filter_column=time_column,
filter_start_value=timestamp,
)

# data ingestion and preprocessing operations
kwargs = dict(
bq_client_project_id=project_id,
destination_project_id=project_id,
dataset_id=dataset_id,
dataset_location=dataset_location,
query_job_config=json.dumps(dict(write_disposition="WRITE_TRUNCATE")),
)
ingest = bq_query_to_table(
query=ingest_query, table_id=ingested_table, **kwargs
preprocessing = BigqueryQueryJobOp(
project=project_id, location=dataset_location, query=preprocessing_query
).set_display_name("Ingest data")

# lookup champion model
@@ -120,8 +118,10 @@
)

# batch predict from BigQuery to BigQuery
bigquery_source_input_uri = f"bq://{project_id}.{dataset_id}.{ingested_table}"
bigquery_destination_output_uri = f"bq://{project_id}.{dataset_id}"
bigquery_source_input_uri = (
f"bq://{project_id}.{preprocessing_dataset_id}.{ingested_table}"
)
bigquery_destination_output_uri = f"bq://{project_id}.{prediction_dataset_id}"
instance_config = {"instanceType": "object"}

# predict data
@@ -143,7 +143,7 @@
monitoring_skew_config=monitoring_skew_config,
instance_config=instance_config,
)
.after(ingest)
.after(preprocessing)
.set_display_name("Batch prediction job")
)

@@ -1,22 +1,34 @@
-- Copyright 2022 Google LLC

-- Licensed under the Apache License, Version 2.0 (the "License");
-- Licensed under the Apache License, Version 2.0 (the 'License');
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at

-- https://www.apache.org/licenses/LICENSE-2.0

-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- distributed under the License is distributed on an 'AS IS' BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.

-- Treat "filter_start_value" as the current time, unless it is empty then use CURRENT_DATETIME() instead
-- Treat 'filter_start_value' as the current time, unless it is empty then use CURRENT_DATETIME() instead
-- This allows us to set the filter_start_value to a specific time for testing or for backfill

-- If the prediction dataset doesn't exist, create it
CREATE SCHEMA IF NOT EXISTS `{{ prediction_dataset }}`
OPTIONS (
description = 'Prediction Dataset',
location = '{{ dataset_region }}');

-- We recreate the ingestion table every time the pipeline runs,
-- so we need to drop the table generated in the previous run
DROP TABLE IF EXISTS `{{ preprocessing_dataset }}.{{ ingested_table }}`;

CREATE TABLE `{{ preprocessing_dataset }}.{{ ingested_table }}` AS (
with filter_start_values as (
SELECT
IF("{{ filter_start_value }}" = '', CURRENT_DATETIME(), CAST("{{ filter_start_value }}" AS DATETIME)) as filter_start_value
SELECT
IF('{{ filter_start_value }}' = '', CURRENT_DATETIME(), CAST('{{ filter_start_value }}' AS DATETIME)) as filter_start_value
)
-- Ingest data between 2 and 3 months ago
,filtered_data as (
@@ -43,14 +55,15 @@ SELECT
trip_miles,
CAST( CASE WHEN trip_seconds is NULL then m.avg_trip_seconds
WHEN trip_seconds <= 0 then m.avg_trip_seconds
ELSE trip_seconds
ELSE trip_seconds
END AS FLOAT64) AS trip_seconds,
payment_type,
company,
FROM filtered_data as t, mean_time as m
WHERE
trip_miles > 0 AND fare > 0 AND fare < 1500
{% for field in ["fare", "trip_start_timestamp", "pickup_longitude",
"pickup_latitude", "dropoff_longitude", "dropoff_latitude","payment_type","company"] %}
{% for field in ['fare', 'trip_start_timestamp', 'pickup_longitude',
'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude','payment_type','company'] %}
AND `{{ field }}` IS NOT NULL
{% endfor %}
);
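
Because the schema creation, table drop and CREATE TABLE ... AS statements above now live in one file, they execute as a single multi-statement BigQuery job. Outside the pipeline, a rendered copy of the script can be run the same way with the google-cloud-bigquery client; this is only a convenience sketch, not part of the PR, and the file name and project id are placeholders.

# Sketch: run a rendered multi-statement script as one BigQuery job (placeholders throughout).
from google.cloud import bigquery

client = bigquery.Client(project="my-project")  # placeholder project id

with open("preprocessing_rendered.sql") as f:   # assumes the Jinja placeholders are already filled in
    sql = f.read()

job = client.query(sql, location="europe-west2")  # statements run in order within one job
job.result()  # block until the whole script finishes
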
95 changes: 22 additions & 73 deletions pipelines/src/pipelines/tensorflow/training/pipeline.py
@@ -12,13 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import os
import pathlib

from google_cloud_pipeline_components.v1.bigquery import BigqueryQueryJobOp
from kfp.v2 import compiler, dsl
from pipelines import generate_query
from bigquery_components import bq_query_to_table, extract_bq_to_dataset
from bigquery_components import extract_bq_to_dataset
from vertex_components import (
lookup_model,
custom_train_job,
@@ -97,77 +97,24 @@ def tensorflow_pipeline(

queries_folder = pathlib.Path(__file__).parent / "queries"

ingest_query = generate_query(
queries_folder / "ingest.sql",
preprocessing_query = generate_query(
queries_folder / "preprocessing.sql",
source_dataset=f"{ingestion_project_id}.{ingestion_dataset_id}",
source_table=ingestion_table,
preprocessing_dataset=f"{ingestion_project_id}.{dataset_id}",
ingested_table=ingested_table,
dataset_region=project_location,
filter_column=time_column,
target_column=label_column_name,
filter_start_value=timestamp,
train_table=train_table,
validation_table=valid_table,
test_table=test_table,
)
split_train_query = generate_query(
queries_folder / "sample.sql",
source_dataset=dataset_id,
source_table=ingested_table,
num_lots=10,
lots=tuple(range(8)),
)
split_valid_query = generate_query(
queries_folder / "sample.sql",
source_dataset=dataset_id,
source_table=ingested_table,
num_lots=10,
lots="(8)",
)
split_test_query = generate_query(
queries_folder / "sample.sql",
source_dataset=dataset_id,
source_table=ingested_table,
num_lots=10,
lots="(9)",
)
data_cleaning_query = generate_query(
queries_folder / "engineer_features.sql",
source_dataset=dataset_id,
source_table=train_table,
)

# data ingestion and preprocessing operations

kwargs = dict(
bq_client_project_id=project_id,
destination_project_id=project_id,
dataset_id=dataset_id,
dataset_location=dataset_location,
query_job_config=json.dumps(dict(write_disposition="WRITE_TRUNCATE")),
)
ingest = bq_query_to_table(
query=ingest_query, table_id=ingested_table, **kwargs
).set_display_name("Ingest data")

# exporting data to GCS from BQ
split_train_data = (
bq_query_to_table(query=split_train_query, table_id=train_table, **kwargs)
.after(ingest)
.set_display_name("Split train data")
)
split_valid_data = (
bq_query_to_table(query=split_valid_query, table_id=valid_table, **kwargs)
.after(ingest)
.set_display_name("Split validation data")
)
split_test_data = (
bq_query_to_table(query=split_test_query, table_id=test_table, **kwargs)
.after(ingest)
.set_display_name("Split test data")
)
data_cleaning = (
bq_query_to_table(
query=data_cleaning_query, table_id=preprocessed_table, **kwargs
)
.after(split_train_data)
.set_display_name("Clean data")
)
preprocessing = BigqueryQueryJobOp(
project=project_id, location=dataset_location, query=preprocessing_query
).set_display_name("Ingest & preprocess data")

# data extraction to gcs

@@ -176,11 +123,12 @@ def tensorflow_pipeline(
bq_client_project_id=project_id,
source_project_id=project_id,
dataset_id=dataset_id,
table_name=preprocessed_table,
table_name=train_table,
dataset_location=dataset_location,
)
.after(data_cleaning)
.set_display_name("Extract train data to storage")
.after(preprocessing)
.set_display_name("Extract train")
.set_caching_options(False)
).outputs["dataset"]
valid_dataset = (
extract_bq_to_dataset(
@@ -190,8 +138,9 @@ def tensorflow_pipeline(
table_name=valid_table,
dataset_location=dataset_location,
)
.after(split_valid_data)
.set_display_name("Extract validation data to storage")
.after(preprocessing)
.set_display_name("Extract validation data")
.set_caching_options(False)
).outputs["dataset"]
test_dataset = (
extract_bq_to_dataset(
@@ -202,8 +151,8 @@
dataset_location=dataset_location,
destination_gcs_uri=test_dataset_uri,
)
.after(split_test_data)
.set_display_name("Extract test data to storage")
.after(preprocessing)
.set_display_name("Extract test data")
.set_caching_options(False)
).outputs["dataset"]
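
generate_query is imported from pipelines in both pipeline files, but its implementation is not part of this diff. Judging by the {{ placeholder }} and {% for %} syntax in preprocessing.sql, it presumably renders a Jinja template with keyword arguments; the helper below is an assumed reconstruction for context, not the repository's actual code.

# Assumed helper: render a Jinja-templated .sql file with the given substitutions.
from pathlib import Path

from jinja2 import Template


def generate_query(input_file: Path, **replacements) -> str:
    """Read a SQL Jinja template and substitute the placeholder values."""
    with open(input_file, "r") as f:
        query_template = f.read()
    return Template(query_template).render(**replacements)
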
