Skip to content

Commit

Permalink
refactor: rename queries and pipeline steps
Browse files Browse the repository at this point in the history
  • Loading branch information
ariadnafer committed May 20, 2023
1 parent c087024 commit f53af72
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 25 deletions.
7 changes: 2 additions & 5 deletions pipelines/src/pipelines/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json

from pathlib import Path
from jinja2 import Template

def generate_query(input_file: Path, **replacements) -> str:
    """Load a Jinja2 query template from disk and render it.

    Args:
        input_file: Path to the template file (e.g. a ``.sql`` template).
        **replacements: Template variables substituted into the template
            during rendering (e.g. dataset/table names).

    Returns:
        The rendered query as a string.

    Raises:
        FileNotFoundError: If ``input_file`` does not exist.
    """
    with open(input_file, "r") as f:
        query_template = f.read()

    # Render and return directly; no need for an intermediate variable.
    return Template(query_template).render(**replacements)
8 changes: 4 additions & 4 deletions pipelines/src/pipelines/tensorflow/prediction/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def tensorflow_pipeline(
# operations
queries_folder = pathlib.Path(__file__).parent / "queries"

ingest_query = generate_query(
preprocessing_query = generate_query(
queries_folder / "preprocessing.sql",
source_dataset=f"{ingestion_project_id}.{ingestion_dataset_id}",
source_table=ingestion_table,
Expand All @@ -101,8 +101,8 @@ def tensorflow_pipeline(
)

# data ingestion and preprocessing operations
ingest = BigqueryQueryJobOp(
project=project_id, location=dataset_location, query=ingest_query
preprocessing = BigqueryQueryJobOp(
project=project_id, location=dataset_location, query=preprocessing_query
).set_display_name("Ingest data")

# lookup champion model
Expand Down Expand Up @@ -143,7 +143,7 @@ def tensorflow_pipeline(
monitoring_skew_config=monitoring_skew_config,
instance_config=instance_config,
)
.after(ingest)
.after(preprocessing)
.set_display_name("Batch prediction job")
)

Expand Down
12 changes: 6 additions & 6 deletions pipelines/src/pipelines/tensorflow/training/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def tensorflow_pipeline(

queries_folder = pathlib.Path(__file__).parent / "queries"

ingest_query = generate_query(
preprocessing_query = generate_query(
queries_folder / "preprocessing.sql",
source_dataset=f"{ingestion_project_id}.{ingestion_dataset_id}",
source_table=ingestion_table,
Expand All @@ -112,8 +112,8 @@ def tensorflow_pipeline(
test_table=test_table,
)

ingest = BigqueryQueryJobOp(
project=project_id, location=dataset_location, query=ingest_query
preprocessing = BigqueryQueryJobOp(
project=project_id, location=dataset_location, query=preprocessing_query
).set_display_name("Ingest data")

# data extraction to gcs
Expand All @@ -126,7 +126,7 @@ def tensorflow_pipeline(
table_name=train_table,
dataset_location=dataset_location,
)
.after(ingest)
.after(preprocessing)
.set_display_name("Extract train")
.set_caching_options(False)
).outputs["dataset"]
Expand All @@ -138,7 +138,7 @@ def tensorflow_pipeline(
table_name=valid_table,
dataset_location=dataset_location,
)
.after(ingest)
.after(preprocessing)
.set_display_name("Extract validation data")
.set_caching_options(False)
).outputs["dataset"]
Expand All @@ -151,7 +151,7 @@ def tensorflow_pipeline(
dataset_location=dataset_location,
destination_gcs_uri=test_dataset_uri,
)
.after(ingest)
.after(preprocessing)
.set_display_name("Extract test data")
.set_caching_options(False)
).outputs["dataset"]
Expand Down
8 changes: 4 additions & 4 deletions pipelines/src/pipelines/xgboost/prediction/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def xgboost_pipeline(
# operations
queries_folder = pathlib.Path(__file__).parent / "queries"

ingest_query = generate_query(
preprocessing_query = generate_query(
queries_folder / "preprocessing.sql",
source_dataset=f"{ingestion_project_id}.{ingestion_dataset_id}",
source_table=ingestion_table,
Expand All @@ -96,8 +96,8 @@ def xgboost_pipeline(
filter_start_value=timestamp,
)

ingest = BigqueryQueryJobOp(
project=project_id, location=dataset_location, query=ingest_query
preprocessing = BigqueryQueryJobOp(
project=project_id, location=dataset_location, query=preprocessing_query
).set_display_name("Ingest data")

# lookup champion model
Expand Down Expand Up @@ -135,7 +135,7 @@ def xgboost_pipeline(
monitoring_alert_email_addresses=monitoring_alert_email_addresses,
monitoring_skew_config=monitoring_skew_config,
)
.after(ingest)
.after(preprocessing)
.set_display_name("Batch prediction job")
)

Expand Down
12 changes: 6 additions & 6 deletions pipelines/src/pipelines/xgboost/training/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def xgboost_pipeline(

queries_folder = pathlib.Path(__file__).parent / "queries"

ingest_query = generate_query(
preprocessing_query = generate_query(
queries_folder / "preprocessing.sql",
source_dataset=f"{ingestion_project_id}.{ingestion_dataset_id}",
source_table=ingestion_table,
Expand All @@ -110,8 +110,8 @@ def xgboost_pipeline(
test_table=test_table,
)

ingest = BigqueryQueryJobOp(
project=project_id, location=dataset_location, query=ingest_query
preprocessing = BigqueryQueryJobOp(
project=project_id, location=dataset_location, query=preprocessing_query
).set_display_name("Ingest data")

# data extraction to gcs
Expand All @@ -124,7 +124,7 @@ def xgboost_pipeline(
table_name=train_table,
dataset_location=dataset_location,
)
.after(ingest)
.after(preprocessing)
.set_display_name("Extract train data")
.set_caching_options(False)
).outputs["dataset"]
Expand All @@ -136,7 +136,7 @@ def xgboost_pipeline(
table_name=valid_table,
dataset_location=dataset_location,
)
.after(ingest)
.after(preprocessing)
.set_display_name("Extract validation data")
.set_caching_options(False)
).outputs["dataset"]
Expand All @@ -149,7 +149,7 @@ def xgboost_pipeline(
dataset_location=dataset_location,
destination_gcs_uri=test_dataset_uri,
)
.after(ingest)
.after(preprocessing)
.set_display_name("Extract test data")
.set_caching_options(False)
).outputs["dataset"]
Expand Down

0 comments on commit f53af72

Please sign in to comment.