diff --git a/airflow/providers/google/cloud/example_dags/example_dataflow.py b/airflow/providers/google/cloud/example_dags/example_dataflow.py index eceb4f481cbfa..0c9aceb631c2c 100644 --- a/airflow/providers/google/cloud/example_dags/example_dataflow.py +++ b/airflow/providers/google/cloud/example_dags/example_dataflow.py @@ -152,6 +152,7 @@ # [START howto_operator_start_python_job_async] start_python_job_async = BeamRunPythonPipelineOperator( task_id="start-python-job-async", + runner="DataflowRunner", py_file=GCS_PYTHON, py_options=[], pipeline_options={ @@ -160,7 +161,11 @@ py_requirements=['apache-beam[gcp]==2.25.0'], py_interpreter='python3', py_system_site_packages=False, - dataflow_config={"location": 'europe-west3', "wait_until_finished": False}, + dataflow_config={ + "job_name": "start-python-job-async", + "location": 'europe-west3', + "wait_until_finished": False, + }, ) # [END howto_operator_start_python_job_async] @@ -194,6 +199,7 @@ def callback(metrics: List[Dict]) -> bool: job_id="{{task_instance.xcom_pull('start-python-job-async')['dataflow_job_id']}}", location='europe-west3', callback=check_metric_scalar_gte(metric_name="Service-cpu_num_seconds", value=100), + fail_on_terminal_state=False, ) # [END howto_sensor_wait_for_job_metric] @@ -210,6 +216,7 @@ def check_message(messages: List[dict]) -> bool: job_id="{{task_instance.xcom_pull('start-python-job-async')['dataflow_job_id']}}", location='europe-west3', callback=check_message, + fail_on_terminal_state=False, ) # [END howto_sensor_wait_for_job_message] @@ -226,6 +233,7 @@ def check_autoscaling_event(autoscaling_events: List[dict]) -> bool: job_id="{{task_instance.xcom_pull('start-python-job-async')['dataflow_job_id']}}", location='europe-west3', callback=check_autoscaling_event, + fail_on_terminal_state=False, ) # [END howto_sensor_wait_for_job_autoscaling_event]