Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: enable multiquery #12

Merged
merged 11 commits into from
May 22, 2023
Merged
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
from .bq_query_to_table import bq_query_to_table
from .extract_bq_to_dataset import extract_bq_to_dataset


__version__ = "0.0.1"
__all__ = [
"bq_query_to_table",
"extract_bq_to_dataset",
]

This file was deleted.

40 changes: 20 additions & 20 deletions pipelines/src/pipelines/tensorflow/prediction/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import os
import pathlib

from google_cloud_pipeline_components.v1.bigquery import BigqueryQueryJobOp
from kfp.v2 import compiler, dsl

from pipelines import generate_query
from bigquery_components import bq_query_to_table
from vertex_components import lookup_model, model_batch_predict


Expand All @@ -29,9 +28,10 @@ def tensorflow_pipeline(
project_location: str = os.environ.get("VERTEX_LOCATION"),
ingestion_project_id: str = os.environ.get("VERTEX_PROJECT_ID"),
model_name: str = "simple_tensorflow",
dataset_id: str = "preprocessing",
preprocessing_dataset_id: str = "preprocessing",
dataset_location: str = os.environ.get("VERTEX_LOCATION"),
ingestion_dataset_id: str = "chicago_taxi_trips",
prediction_dataset_id: str = "prediction",
timestamp: str = "2022-12-01 00:00:00",
batch_prediction_machine_type: str = "n1-standard-4",
batch_prediction_min_replicas: int = 3,
Expand All @@ -49,13 +49,14 @@ def tensorflow_pipeline(
Args:
project_id (str): project id of the Google Cloud project
project_location (str): location of the Google Cloud project
pipeline_files_gcs_path (str): GCS path where the pipeline files are located
ingestion_project_id (str): project id containing the source bigquery data
for ingestion. This can be the same as `project_id` if the source data is
in the same project where the ML pipeline is executed.
model_name (str): name of model
model_label (str): label of model
dataset_id (str): id of BQ dataset used to store all staging data & predictions
preprocessing_dataset_id (str): id of BQ dataset used to
store all staging data .
prediction_dataset_id (str): id of BQ dataset used to
store all predictions.
dataset_location (str): location of dataset
ingestion_dataset_id (str): dataset id of ingestion data
timestamp (str): Optional. Empty or a specific timestamp in ISO 8601 format
Expand Down Expand Up @@ -87,24 +88,21 @@ def tensorflow_pipeline(
# operations
queries_folder = pathlib.Path(__file__).parent / "queries"

ingest_query = generate_query(
queries_folder / "ingest.sql",
preprocessing_query = generate_query(
queries_folder / "preprocessing.sql",
source_dataset=f"{ingestion_project_id}.{ingestion_dataset_id}",
source_table=ingestion_table,
prediction_dataset=f"{ingestion_project_id}.{prediction_dataset_id}",
preprocessing_dataset=f"{ingestion_project_id}.{preprocessing_dataset_id}",
ingested_table=ingested_table,
dataset_region=project_location,
filter_column=time_column,
filter_start_value=timestamp,
)

# data ingestion and preprocessing operations
kwargs = dict(
bq_client_project_id=project_id,
destination_project_id=project_id,
dataset_id=dataset_id,
dataset_location=dataset_location,
query_job_config=json.dumps(dict(write_disposition="WRITE_TRUNCATE")),
)
ingest = bq_query_to_table(
query=ingest_query, table_id=ingested_table, **kwargs
preprocessing = BigqueryQueryJobOp(
project=project_id, location=dataset_location, query=preprocessing_query
).set_display_name("Ingest data")

# lookup champion model
Expand All @@ -120,8 +118,10 @@ def tensorflow_pipeline(
)

# batch predict from BigQuery to BigQuery
bigquery_source_input_uri = f"bq://{project_id}.{dataset_id}.{ingested_table}"
bigquery_destination_output_uri = f"bq://{project_id}.{dataset_id}"
bigquery_source_input_uri = (
f"bq://{project_id}.{preprocessing_dataset_id}.{ingested_table}"
)
bigquery_destination_output_uri = f"bq://{project_id}.{prediction_dataset_id}"
instance_config = {"instanceType": "object"}

# predict data
Expand All @@ -143,7 +143,7 @@ def tensorflow_pipeline(
monitoring_skew_config=monitoring_skew_config,
instance_config=instance_config,
)
.after(ingest)
.after(preprocessing)
.set_display_name("Batch prediction job")
)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,34 @@
-- Copyright 2022 Google LLC

-- Licensed under the Apache License, Version 2.0 (the "License");
-- Licensed under the Apache License, Version 2.0 (the 'License');
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at

-- https://www.apache.org/licenses/LICENSE-2.0

-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- distributed under the License is distributed on an 'AS IS' BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.

-- Treat "filter_start_value" as the current time, unless it is empty then use CURRENT_DATETIME() instead
-- Treat 'filter_start_value' as the current time, unless it is empty then use CURRENT_DATETIME() instead
-- This allows us to set the filter_start_value to a specific time for testing or for backfill

-- If prediction dataset don't exist, create it
CREATE SCHEMA IF NOT EXISTS `{{ prediction_dataset }}`
OPTIONS (
description = 'Prediction Dataset',
location = '{{ dataset_region }}');

-- We recreate the ingestion table every time the pipeline run,
-- so we need to drop the generated in the previous run
DROP TABLE IF EXISTS `{{ preprocessing_dataset }}.{{ ingested_table }}`;

CREATE TABLE `{{ preprocessing_dataset }}.{{ ingested_table }}` AS (
with filter_start_values as (
SELECT
IF("{{ filter_start_value }}" = '', CURRENT_DATETIME(), CAST("{{ filter_start_value }}" AS DATETIME)) as filter_start_value
SELECT
IF('{{ filter_start_value }}' = '', CURRENT_DATETIME(), CAST('{{ filter_start_value }}' AS DATETIME)) as filter_start_value
)
-- Ingest data between 2 and 3 months ago
,filtered_data as (
Expand All @@ -43,14 +55,15 @@ SELECT
trip_miles,
CAST( CASE WHEN trip_seconds is NULL then m.avg_trip_seconds
WHEN trip_seconds <= 0 then m.avg_trip_seconds
ELSE trip_seconds
ELSE trip_seconds
END AS FLOAT64) AS trip_seconds,
payment_type,
company,
FROM filtered_data as t, mean_time as m
WHERE
trip_miles > 0 AND fare > 0 AND fare < 1500
{% for field in ["fare", "trip_start_timestamp", "pickup_longitude",
"pickup_latitude", "dropoff_longitude", "dropoff_latitude","payment_type","company"] %}
{% for field in ['fare', 'trip_start_timestamp', 'pickup_longitude',
'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude','payment_type','company'] %}
AND `{{ field }}` IS NOT NULL
{% endfor %}
);
93 changes: 21 additions & 72 deletions pipelines/src/pipelines/tensorflow/training/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import os
import pathlib

from google_cloud_pipeline_components.v1.bigquery import BigqueryQueryJobOp
from kfp.v2 import compiler, dsl
from pipelines import generate_query
from bigquery_components import bq_query_to_table, extract_bq_to_dataset
from bigquery_components import extract_bq_to_dataset
from vertex_components import (
lookup_model,
custom_train_job,
Expand Down Expand Up @@ -97,90 +97,38 @@ def tensorflow_pipeline(

queries_folder = pathlib.Path(__file__).parent / "queries"

ingest_query = generate_query(
queries_folder / "ingest.sql",
preprocessing_query = generate_query(
queries_folder / "preprocessing.sql",
source_dataset=f"{ingestion_project_id}.{ingestion_dataset_id}",
source_table=ingestion_table,
preprocessing_dataset=f"{ingestion_project_id}.{dataset_id}",
ingested_table=ingested_table,
dataset_region=project_location,
filter_column=time_column,
target_column=label_column_name,
filter_start_value=timestamp,
)
split_train_query = generate_query(
queries_folder / "sample.sql",
source_dataset=dataset_id,
source_table=ingested_table,
num_lots=10,
lots=tuple(range(8)),
)
split_valid_query = generate_query(
queries_folder / "sample.sql",
source_dataset=dataset_id,
source_table=ingested_table,
num_lots=10,
lots="(8)",
)
split_test_query = generate_query(
queries_folder / "sample.sql",
source_dataset=dataset_id,
source_table=ingested_table,
num_lots=10,
lots="(9)",
)
data_cleaning_query = generate_query(
queries_folder / "engineer_features.sql",
source_dataset=dataset_id,
source_table=train_table,
train_table=train_table,
validation_table=valid_table,
test_table=test_table,
)

# data ingestion and preprocessing operations

kwargs = dict(
bq_client_project_id=project_id,
destination_project_id=project_id,
dataset_id=dataset_id,
dataset_location=dataset_location,
query_job_config=json.dumps(dict(write_disposition="WRITE_TRUNCATE")),
)
ingest = bq_query_to_table(
query=ingest_query, table_id=ingested_table, **kwargs
preprocessing = BigqueryQueryJobOp(
project=project_id, location=dataset_location, query=preprocessing_query
).set_display_name("Ingest data")
felix-datatonic marked this conversation as resolved.
Show resolved Hide resolved

# exporting data to GCS from BQ
split_train_data = (
bq_query_to_table(query=split_train_query, table_id=train_table, **kwargs)
.after(ingest)
.set_display_name("Split train data")
)
split_valid_data = (
bq_query_to_table(query=split_valid_query, table_id=valid_table, **kwargs)
.after(ingest)
.set_display_name("Split validation data")
)
split_test_data = (
bq_query_to_table(query=split_test_query, table_id=test_table, **kwargs)
.after(ingest)
.set_display_name("Split test data")
)
data_cleaning = (
bq_query_to_table(
query=data_cleaning_query, table_id=preprocessed_table, **kwargs
)
.after(split_train_data)
.set_display_name("Clean data")
)

# data extraction to gcs

train_dataset = (
extract_bq_to_dataset(
bq_client_project_id=project_id,
source_project_id=project_id,
dataset_id=dataset_id,
table_name=preprocessed_table,
table_name=train_table,
dataset_location=dataset_location,
)
.after(data_cleaning)
.set_display_name("Extract train data to storage")
.after(preprocessing)
.set_display_name("Extract train")
.set_caching_options(False)
).outputs["dataset"]
valid_dataset = (
extract_bq_to_dataset(
Expand All @@ -190,8 +138,9 @@ def tensorflow_pipeline(
table_name=valid_table,
dataset_location=dataset_location,
)
.after(split_valid_data)
.set_display_name("Extract validation data to storage")
.after(preprocessing)
.set_display_name("Extract validation data")
.set_caching_options(False)
).outputs["dataset"]
test_dataset = (
extract_bq_to_dataset(
Expand All @@ -202,8 +151,8 @@ def tensorflow_pipeline(
dataset_location=dataset_location,
destination_gcs_uri=test_dataset_uri,
)
.after(split_test_data)
.set_display_name("Extract test data to storage")
.after(preprocessing)
.set_display_name("Extract test data")
.set_caching_options(False)
).outputs["dataset"]

Expand Down
Loading