Merge branch 'main' into feat/airflow-2-3-3-upgrade
mikaeld authored Dec 7, 2022
2 parents 0372029 + 88eea5e commit 837464c
Showing 3 changed files with 102 additions and 50 deletions.
72 changes: 72 additions & 0 deletions dags/bqetl_artifact_deployment.py
@@ -0,0 +1,72 @@
"""
Nightly deploy of BigQuery ETL views.
"""

from datetime import datetime, timedelta

from airflow import DAG

from utils.gcp import gke_command
from utils.tags import Tag

default_args = {
    "owner": "[email protected]",
    "email": [
        "[email protected]",
        "[email protected]",
        "[email protected]",
    ],
    "depends_on_past": False,
    "start_date": datetime(2022, 12, 6),
    "email_on_failure": True,
    "email_on_retry": True,
    "retries": 2,
    "retry_delay": timedelta(minutes=30),
}

tags = [Tag.ImpactTier.tier_1]

with DAG(
    "bqetl_artifact_deployment",
    default_args=default_args,
    schedule_interval="@daily",
    doc_md=__doc__,
    tags=tags,
) as dag:
    docker_image = "gcr.io/moz-fx-data-airflow-prod-88e0/bigquery-etl:latest"

    publish_public_udfs = gke_command(
        task_id="publish_public_udfs",
        command=["script/publish_public_udfs"],
        docker_image=docker_image,
    )

    publish_persistent_udfs = gke_command(
        task_id="publish_persistent_udfs",
        cmds=["bash", "-c"],
        command=[
            "script/publish_persistent_udfs --project-id=moz-fx-data-shared-prod && "
            "script/publish_persistent_udfs --project-id=mozdata"
        ],
        docker_image=docker_image,
    )

    publish_new_tables = gke_command(
        task_id="publish_new_tables",
        cmds=["bash", "-c"],
        command=[
            "script/bqetl generate all && "
            "script/bqetl query schema update '*' && "
            "script/bqetl query schema deploy '*' --skip-existing"
        ],
        docker_image=docker_image,
    )

    publish_views = gke_command(
        task_id="publish_views",
        cmds=["bash", "-c"],
        command=[
            "script/bqetl generate all && "
            "script/bqetl view publish --target-project=moz-fx-data-shared-prod && "
            "script/bqetl view publish --target-project=mozdata --user-facing-only && "
            "script/publish_public_data_views --target-project=moz-fx-data-shared-prod && "
            "script/publish_public_data_views --target-project=mozdata"
        ],
        docker_image=docker_image,
    )

    # Views can only be published after the UDFs they reference and the
    # new tables they select from have been deployed.
    publish_views.set_upstream(publish_public_udfs)
    publish_views.set_upstream(publish_persistent_udfs)
    publish_views.set_upstream(publish_new_tables)
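For reference, the same fan-in dependency can be expressed with Airflow's bitshift syntax, which is the more common idiom in Airflow 2. A minimal equivalent sketch (not part of this commit):

    # Equivalent to the three set_upstream() calls above: all three publish
    # tasks must succeed before publish_views starts.
    [publish_public_udfs, publish_persistent_udfs, publish_new_tables] >> publish_views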
45 changes: 30 additions & 15 deletions dags/glam_dev.py
@@ -290,27 +290,42 @@
    dag=dag,
)

# Testing without SubDag because it keeps getting stuck on "running"
# without actually executing anything. SubDags are also known for causing
# deadlocks in Celery (which might be our case here) and are thus discouraged.
clients_histogram_bucket_counts = bigquery_etl_query(
    task_id="clients_histogram_bucket_counts",
    destination_table="clients_histogram_bucket_counts_v1",
    dataset_id=dev_dataset_id,
    project_id=prod_project_id,
    owner="[email protected]",
    parameters=("submission_date:DATE:{{ds}}",),
    arguments=("--replace",),
    dag=dag,
    docker_image="gcr.io/moz-fx-data-airflow-prod-88e0/glam-dev-bigquery-etl:latest",
)

# SubdagOperator uses a SequentialExecutor by default,
# so its tasks will run sequentially.
# Note: in Airflow 2.0, SubDagOperator was changed to use the Airflow
# scheduler instead of backfill to schedule tasks in the subdag, so users
# no longer need to specify the executor in SubDagOperator. (We don't,
# but the assumption that the SequentialExecutor is used is now wrong.)
clients_histogram_bucket_counts = SubDagOperator(
    subdag=repeated_subdag(
        GLAM_DAG,
        "clients_histogram_bucket_counts",
        default_args,
        dag.schedule_interval,
        dev_dataset_id,
        ("submission_date:DATE:{{ds}}",),
        25,
        None,
        docker_image="gcr.io/moz-fx-data-airflow-prod-88e0/glam-dev-bigquery-etl:latest",
    ),
    task_id="clients_histogram_bucket_counts",
    dag=dag,
)
# clients_histogram_bucket_counts = SubDagOperator(
#     subdag=repeated_subdag(
#         GLAM_DAG,
#         "clients_histogram_bucket_counts",
#         default_args,
#         dag.schedule_interval,
#         dev_dataset_id,
#         ("submission_date:DATE:{{ds}}",),
#         25,
#         None,
#         docker_image="gcr.io/moz-fx-data-airflow-prod-88e0/glam-dev-bigquery-etl:latest",
#     ),
#     task_id="clients_histogram_bucket_counts",
#     dag=dag,
# )

clients_histogram_probe_counts = bigquery_etl_query(
    task_id="clients_histogram_probe_counts",
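The comments in this hunk explain why the commit moves away from SubDagOperator: besides the deadlock issues, SubDagOperator is deprecated in Airflow 2, which recommends TaskGroup instead. A minimal, self-contained sketch of the TaskGroup pattern (illustrative only; the DAG, group, and task names here are hypothetical and not part of this commit):

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.empty import EmptyOperator
    from airflow.utils.task_group import TaskGroup

    with DAG(
        "taskgroup_sketch",  # hypothetical DAG, for illustration only
        start_date=datetime(2022, 12, 6),
        schedule_interval="@daily",
    ) as dag:
        start = EmptyOperator(task_id="start")

        # Repeated per-batch tasks that a SubDag used to wrap can live in
        # a TaskGroup; the main scheduler runs them directly, so there is
        # no separate SubDag DagRun to get stuck on "running".
        with TaskGroup(group_id="clients_histogram_bucket_counts") as bucket_counts:
            for batch in range(4):
                EmptyOperator(task_id=f"batch_{batch}")

        done = EmptyOperator(task_id="done")
        start >> bucket_counts >> done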
35 changes: 0 additions & 35 deletions dags/mozfun.py

This file was deleted.
