Skip to content

Commit

Permalink
Merge branch 'dataflow-system-tests' into fix-system-tests
Browse files Browse the repository at this point in the history
  • Loading branch information
mnojek committed Sep 27, 2021
2 parents f40a6fb + a9a3d27 commit e5830ed
Showing 1 changed file with 9 additions and 1 deletion.
10 changes: 9 additions & 1 deletion airflow/providers/google/cloud/example_dags/example_dataflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@
# [START howto_operator_start_python_job_async]
start_python_job_async = BeamRunPythonPipelineOperator(
task_id="start-python-job-async",
runner="DataflowRunner",
py_file=GCS_PYTHON,
py_options=[],
pipeline_options={
Expand All @@ -160,7 +161,11 @@
py_requirements=['apache-beam[gcp]==2.25.0'],
py_interpreter='python3',
py_system_site_packages=False,
dataflow_config={"location": 'europe-west3', "wait_until_finished": False},
dataflow_config={
"job_name": "start-python-job-async",
"location": 'europe-west3',
"wait_until_finished": False,
},
)
# [END howto_operator_start_python_job_async]

Expand Down Expand Up @@ -194,6 +199,7 @@ def callback(metrics: List[Dict]) -> bool:
job_id="{{task_instance.xcom_pull('start-python-job-async')['dataflow_job_id']}}",
location='europe-west3',
callback=check_metric_scalar_gte(metric_name="Service-cpu_num_seconds", value=100),
fail_on_terminal_state=False,
)
# [END howto_sensor_wait_for_job_metric]

Expand All @@ -210,6 +216,7 @@ def check_message(messages: List[dict]) -> bool:
job_id="{{task_instance.xcom_pull('start-python-job-async')['dataflow_job_id']}}",
location='europe-west3',
callback=check_message,
fail_on_terminal_state=False,
)
# [END howto_sensor_wait_for_job_message]

Expand All @@ -226,6 +233,7 @@ def check_autoscaling_event(autoscaling_events: List[dict]) -> bool:
job_id="{{task_instance.xcom_pull('start-python-job-async')['dataflow_job_id']}}",
location='europe-west3',
callback=check_autoscaling_event,
fail_on_terminal_state=False,
)
# [END howto_sensor_wait_for_job_autoscaling_event]

Expand Down

0 comments on commit e5830ed

Please sign in to comment.