Skip to content

Commit

Permalink
refactor: replace custom BQ component with BigqueryQueryJobOp
Browse files · Browse the repository at this point in the history
Branch information:
ariadnafer committed May 16, 2023
1 parent 22f6dcd commit 0b8b77f
Show file tree
Hide file tree
Showing 13 changed files with 107 additions and 259 deletions.
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
from .bq_query_to_table import bq_query_to_table
from .extract_bq_to_dataset import extract_bq_to_dataset


__version__ = "0.0.1"
__all__ = [
"bq_query_to_table",
"extract_bq_to_dataset",
]

This file was deleted.

8 changes: 7 additions & 1 deletion pipelines/src/pipelines/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,10 @@ def generate_query(input_file: Path, **replacements) -> str:
with open(input_file, "r") as f:
query_template = f.read()

return Template(query_template).render(**replacements)
# Render the template with the provided replacements
query = Template(query_template).render(**replacements)

# Escape double quotes, newline and tab characters
query = query.replace('"', '\\"').replace("\n", "\\n").replace("\t", "\\t")

return query
12 changes: 4 additions & 8 deletions pipelines/src/pipelines/tensorflow/prediction/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
import os
import pathlib

from google_cloud_pipeline_components.v1.bigquery import BigqueryQueryJobOp
from kfp.v2 import compiler, dsl

from pipelines import generate_query
from bigquery_components import bq_query_to_table
from vertex_components import lookup_model, model_batch_predict


Expand Down Expand Up @@ -101,13 +101,9 @@ def tensorflow_pipeline(
)

# data ingestion and preprocessing operations
kwargs = dict(
bq_client_project_id=project_id,
dataset_location=dataset_location,
)
ingest = bq_query_to_table(query=ingest_query, **kwargs).set_display_name(
"Ingest data"
)
ingest = BigqueryQueryJobOp(
project=project_id, location=dataset_location, query=ingest_query
).set_display_name("Ingest data")

# lookup champion model
champion_model = (
Expand Down
66 changes: 13 additions & 53 deletions pipelines/src/pipelines/tensorflow/training/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
import os
import pathlib

from google_cloud_pipeline_components.v1.bigquery import BigqueryQueryJobOp
from kfp.v2 import compiler, dsl
from pipelines import generate_query
from bigquery_components import bq_query_to_table, extract_bq_to_dataset
from bigquery_components import extract_bq_to_dataset
from vertex_components import (
lookup_model,
custom_train_job,
Expand Down Expand Up @@ -106,57 +107,14 @@ def tensorflow_pipeline(
filter_column=time_column,
target_column=label_column_name,
filter_start_value=timestamp,
train_table=train_table,
validation_table=valid_table,
test_table=test_table,
)
split_train_query = generate_query(
queries_folder / "sample.sql",
source_dataset=dataset_id,
source_table=ingested_table,
preprocessing_dataset=f"{ingestion_project_id}.{dataset_id}",
target_table=train_table,
num_lots=10,
lots=tuple(range(8)),
)
split_valid_query = generate_query(
queries_folder / "sample.sql",
source_dataset=dataset_id,
source_table=ingested_table,
preprocessing_dataset=f"{ingestion_project_id}.{dataset_id}",
target_table=valid_table,
num_lots=10,
lots="(8)",
)
split_test_query = generate_query(
queries_folder / "sample.sql",
source_dataset=dataset_id,
source_table=ingested_table,
preprocessing_dataset=f"{ingestion_project_id}.{dataset_id}",
target_table=test_table,
num_lots=10,
lots="(9)",
)

# data ingestion and preprocessing operations

kwargs = dict(bq_client_project_id=project_id, dataset_location=dataset_location)
ingest = bq_query_to_table(query=ingest_query, **kwargs).set_display_name(
"Ingest data"
)

split_train_data = (
bq_query_to_table(query=split_train_query, **kwargs)
.after(ingest)
.set_display_name("Split train data")
)
split_valid_data = (
bq_query_to_table(query=split_valid_query, **kwargs)
.after(ingest)
.set_display_name("Split validation data")
)
split_test_data = (
bq_query_to_table(query=split_test_query, **kwargs)
.after(ingest)
.set_display_name("Split test data")
)
ingest = BigqueryQueryJobOp(
project=project_id, location=dataset_location, query=ingest_query
).set_display_name("Ingest data")

# data extraction to gcs

Expand All @@ -168,8 +126,9 @@ def tensorflow_pipeline(
table_name=train_table,
dataset_location=dataset_location,
)
.after(split_train_data)
.after(ingest)
.set_display_name("Extract train data to storage")
.set_caching_options(False)
).outputs["dataset"]
valid_dataset = (
extract_bq_to_dataset(
Expand All @@ -179,8 +138,9 @@ def tensorflow_pipeline(
table_name=valid_table,
dataset_location=dataset_location,
)
.after(split_valid_data)
.after(ingest)
.set_display_name("Extract validation data to storage")
.set_caching_options(False)
).outputs["dataset"]
test_dataset = (
extract_bq_to_dataset(
Expand All @@ -191,7 +151,7 @@ def tensorflow_pipeline(
dataset_location=dataset_location,
destination_gcs_uri=test_dataset_uri,
)
.after(split_test_data)
.after(ingest)
.set_display_name("Extract test data to storage")
.set_caching_options(False)
).outputs["dataset"]
Expand Down

This file was deleted.

50 changes: 33 additions & 17 deletions pipelines/src/pipelines/tensorflow/training/queries/ingest.sql
Original file line number Diff line number Diff line change
@@ -1,20 +1,3 @@
-- Copyright 2022 Google LLC

-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at

-- https://www.apache.org/licenses/LICENSE-2.0

-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.

-- Treat "filter_start_value" as the current time, unless it is empty then use CURRENT_DATETIME() instead
-- This allows us to set the filter_start_value to a specific time for testing or for backfill

CREATE SCHEMA IF NOT EXISTS `{{ preprocessing_dataset }}`
OPTIONS (
description = 'Preprocessing Dataset',
Expand Down Expand Up @@ -66,3 +49,36 @@ WHERE
AND `{{ field }}` IS NOT NULL
{% endfor %}
);

DROP TABLE IF EXISTS `{{ preprocessing_dataset }}.{{ train_table }}`;

CREATE TABLE `{{ preprocessing_dataset }}.{{ train_table }}` AS (
SELECT
*
FROM
`{{ preprocessing_dataset }}.{{ ingested_table }}` AS t
WHERE
MOD(ABS(FARM_FINGERPRINT(TO_JSON_STRING(t))),
10) IN (0, 1, 2, 3, 4, 5, 6, 7));

DROP TABLE IF EXISTS `{{ preprocessing_dataset }}.{{ validation_table }}`;

CREATE TABLE `{{ preprocessing_dataset }}.{{ validation_table }}` AS (
SELECT
*
FROM
`{{ preprocessing_dataset }}.{{ ingested_table }}` AS t
WHERE
MOD(ABS(FARM_FINGERPRINT(TO_JSON_STRING(t))),
10) IN (8));

DROP TABLE IF EXISTS `{{ preprocessing_dataset }}.{{ test_table }}`;

CREATE TABLE `{{ preprocessing_dataset }}.{{ test_table }}` AS (
SELECT
*
FROM
`{{ preprocessing_dataset }}.{{ ingested_table }}` AS t
WHERE
MOD(ABS(FARM_FINGERPRINT(TO_JSON_STRING(t))),
10) IN (9));
10 changes: 0 additions & 10 deletions pipelines/src/pipelines/tensorflow/training/queries/sample.sql

This file was deleted.

13 changes: 4 additions & 9 deletions pipelines/src/pipelines/xgboost/prediction/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
import os
import pathlib

from google_cloud_pipeline_components.v1.bigquery import BigqueryQueryJobOp
from kfp.v2 import compiler, dsl

from pipelines import generate_query
from bigquery_components import bq_query_to_table
from vertex_components import lookup_model, model_batch_predict


Expand Down Expand Up @@ -96,14 +96,9 @@ def xgboost_pipeline(
filter_start_value=timestamp,
)

# data ingestion and preprocessing operations
kwargs = dict(
bq_client_project_id=project_id,
dataset_location=dataset_location,
)
ingest = bq_query_to_table(query=ingest_query, **kwargs).set_display_name(
"Ingest data"
)
ingest = BigqueryQueryJobOp(
project=project_id, location=dataset_location, query=ingest_query
).set_display_name("Ingest data")

# lookup champion model
champion_model = (
Expand Down
Loading

0 comments on commit 0b8b77f

Please sign in to comment.