diff --git a/airflow/providers/apache/beam/example_dags/example_beam.py b/airflow/providers/apache/beam/example_dags/example_beam.py
deleted file mode 100644
index ea52458303129..0000000000000
--- a/airflow/providers/apache/beam/example_dags/example_beam.py
+++ /dev/null
@@ -1,437 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-"""
-Example Airflow DAG for Apache Beam operators
-"""
-import os
-from datetime import datetime
-from urllib.parse import urlparse
-
-from airflow import models
-from airflow.providers.apache.beam.operators.beam import (
- BeamRunGoPipelineOperator,
- BeamRunJavaPipelineOperator,
- BeamRunPythonPipelineOperator,
-)
-from airflow.providers.google.cloud.hooks.dataflow import DataflowJobStatus
-from airflow.providers.google.cloud.operators.dataflow import DataflowConfiguration
-from airflow.providers.google.cloud.sensors.dataflow import DataflowJobStatusSensor
-from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator
-from airflow.utils.trigger_rule import TriggerRule
-
-GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
-GCS_INPUT = os.environ.get('APACHE_BEAM_PYTHON', 'gs://INVALID BUCKET NAME/shakespeare/kinglear.txt')
-GCS_TMP = os.environ.get('APACHE_BEAM_GCS_TMP', 'gs://INVALID BUCKET NAME/temp/')
-GCS_STAGING = os.environ.get('APACHE_BEAM_GCS_STAGING', 'gs://INVALID BUCKET NAME/staging/')
-GCS_OUTPUT = os.environ.get('APACHE_BEAM_GCS_OUTPUT', 'gs://INVALID BUCKET NAME/output')
-GCS_PYTHON = os.environ.get('APACHE_BEAM_PYTHON', 'gs://INVALID BUCKET NAME/wordcount_debugging.py')
-GCS_PYTHON_DATAFLOW_ASYNC = os.environ.get(
- 'APACHE_BEAM_PYTHON_DATAFLOW_ASYNC', 'gs://INVALID BUCKET NAME/wordcount_debugging.py'
-)
-GCS_GO = os.environ.get('APACHE_BEAM_GO', 'gs://INVALID BUCKET NAME/wordcount_debugging.go')
-GCS_GO_DATAFLOW_ASYNC = os.environ.get(
- 'APACHE_BEAM_GO_DATAFLOW_ASYNC', 'gs://INVALID BUCKET NAME/wordcount_debugging.go'
-)
-GCS_JAR_DIRECT_RUNNER = os.environ.get(
- 'APACHE_BEAM_DIRECT_RUNNER_JAR',
- 'gs://INVALID BUCKET NAME/tests/dataflow-templates-bundled-java=11-beam-v2.25.0-DirectRunner.jar',
-)
-GCS_JAR_DATAFLOW_RUNNER = os.environ.get(
- 'APACHE_BEAM_DATAFLOW_RUNNER_JAR', 'gs://INVALID BUCKET NAME/word-count-beam-bundled-0.1.jar'
-)
-GCS_JAR_SPARK_RUNNER = os.environ.get(
- 'APACHE_BEAM_SPARK_RUNNER_JAR',
- 'gs://INVALID BUCKET NAME/tests/dataflow-templates-bundled-java=11-beam-v2.25.0-SparkRunner.jar',
-)
-GCS_JAR_FLINK_RUNNER = os.environ.get(
- 'APACHE_BEAM_FLINK_RUNNER_JAR',
- 'gs://INVALID BUCKET NAME/tests/dataflow-templates-bundled-java=11-beam-v2.25.0-FlinkRunner.jar',
-)
-
-GCS_JAR_DIRECT_RUNNER_PARTS = urlparse(GCS_JAR_DIRECT_RUNNER)
-GCS_JAR_DIRECT_RUNNER_BUCKET_NAME = GCS_JAR_DIRECT_RUNNER_PARTS.netloc
-GCS_JAR_DIRECT_RUNNER_OBJECT_NAME = GCS_JAR_DIRECT_RUNNER_PARTS.path[1:]
-GCS_JAR_DATAFLOW_RUNNER_PARTS = urlparse(GCS_JAR_DATAFLOW_RUNNER)
-GCS_JAR_DATAFLOW_RUNNER_BUCKET_NAME = GCS_JAR_DATAFLOW_RUNNER_PARTS.netloc
-GCS_JAR_DATAFLOW_RUNNER_OBJECT_NAME = GCS_JAR_DATAFLOW_RUNNER_PARTS.path[1:]
-GCS_JAR_SPARK_RUNNER_PARTS = urlparse(GCS_JAR_SPARK_RUNNER)
-GCS_JAR_SPARK_RUNNER_BUCKET_NAME = GCS_JAR_SPARK_RUNNER_PARTS.netloc
-GCS_JAR_SPARK_RUNNER_OBJECT_NAME = GCS_JAR_SPARK_RUNNER_PARTS.path[1:]
-GCS_JAR_FLINK_RUNNER_PARTS = urlparse(GCS_JAR_FLINK_RUNNER)
-GCS_JAR_FLINK_RUNNER_BUCKET_NAME = GCS_JAR_FLINK_RUNNER_PARTS.netloc
-GCS_JAR_FLINK_RUNNER_OBJECT_NAME = GCS_JAR_FLINK_RUNNER_PARTS.path[1:]
-
-
-DEFAULT_ARGS = {
- 'default_pipeline_options': {'output': '/tmp/example_beam'},
- 'trigger_rule': TriggerRule.ALL_DONE,
-}
-START_DATE = datetime(2021, 1, 1)
-
-
-with models.DAG(
- "example_beam_native_java_direct_runner",
- schedule_interval=None, # Override to match your needs
- start_date=START_DATE,
- catchup=False,
- tags=['example'],
-) as dag_native_java_direct_runner:
-
- # [START howto_operator_start_java_direct_runner_pipeline]
- jar_to_local_direct_runner = GCSToLocalFilesystemOperator(
- task_id="jar_to_local_direct_runner",
- bucket=GCS_JAR_DIRECT_RUNNER_BUCKET_NAME,
- object_name=GCS_JAR_DIRECT_RUNNER_OBJECT_NAME,
- filename="/tmp/beam_wordcount_direct_runner_{{ ds_nodash }}.jar",
- )
-
- start_java_pipeline_direct_runner = BeamRunJavaPipelineOperator(
- task_id="start_java_pipeline_direct_runner",
- jar="/tmp/beam_wordcount_direct_runner_{{ ds_nodash }}.jar",
- pipeline_options={
- 'output': '/tmp/start_java_pipeline_direct_runner',
- 'inputFile': GCS_INPUT,
- },
- job_class='org.apache.beam.examples.WordCount',
- )
-
- jar_to_local_direct_runner >> start_java_pipeline_direct_runner
- # [END howto_operator_start_java_direct_runner_pipeline]
-
-with models.DAG(
- "example_beam_native_java_dataflow_runner",
- schedule_interval=None, # Override to match your needs
- start_date=START_DATE,
- catchup=False,
- tags=['example'],
-) as dag_native_java_dataflow_runner:
- # [START howto_operator_start_java_dataflow_runner_pipeline]
- jar_to_local_dataflow_runner = GCSToLocalFilesystemOperator(
- task_id="jar_to_local_dataflow_runner",
- bucket=GCS_JAR_DATAFLOW_RUNNER_BUCKET_NAME,
- object_name=GCS_JAR_DATAFLOW_RUNNER_OBJECT_NAME,
- filename="/tmp/beam_wordcount_dataflow_runner_{{ ds_nodash }}.jar",
- )
-
- start_java_pipeline_dataflow = BeamRunJavaPipelineOperator(
- task_id="start_java_pipeline_dataflow",
- runner="DataflowRunner",
- jar="/tmp/beam_wordcount_dataflow_runner_{{ ds_nodash }}.jar",
- pipeline_options={
- 'tempLocation': GCS_TMP,
- 'stagingLocation': GCS_STAGING,
- 'output': GCS_OUTPUT,
- },
- job_class='org.apache.beam.examples.WordCount',
- dataflow_config={"job_name": "{{task.task_id}}", "location": "us-central1"},
- )
-
- jar_to_local_dataflow_runner >> start_java_pipeline_dataflow
- # [END howto_operator_start_java_dataflow_runner_pipeline]
-
-with models.DAG(
- "example_beam_native_java_spark_runner",
- schedule_interval=None, # Override to match your needs
- start_date=START_DATE,
- catchup=False,
- tags=['example'],
-) as dag_native_java_spark_runner:
-
- jar_to_local_spark_runner = GCSToLocalFilesystemOperator(
- task_id="jar_to_local_spark_runner",
- bucket=GCS_JAR_SPARK_RUNNER_BUCKET_NAME,
- object_name=GCS_JAR_SPARK_RUNNER_OBJECT_NAME,
- filename="/tmp/beam_wordcount_spark_runner_{{ ds_nodash }}.jar",
- )
-
- start_java_pipeline_spark_runner = BeamRunJavaPipelineOperator(
- task_id="start_java_pipeline_spark_runner",
- runner="SparkRunner",
- jar="/tmp/beam_wordcount_spark_runner_{{ ds_nodash }}.jar",
- pipeline_options={
- 'output': '/tmp/start_java_pipeline_spark_runner',
- 'inputFile': GCS_INPUT,
- },
- job_class='org.apache.beam.examples.WordCount',
- )
-
- jar_to_local_spark_runner >> start_java_pipeline_spark_runner
-
-with models.DAG(
- "example_beam_native_java_flink_runner",
- schedule_interval=None, # Override to match your needs
- start_date=START_DATE,
- catchup=False,
- tags=['example'],
-) as dag_native_java_flink_runner:
-
- jar_to_local_flink_runner = GCSToLocalFilesystemOperator(
- task_id="jar_to_local_flink_runner",
- bucket=GCS_JAR_FLINK_RUNNER_BUCKET_NAME,
- object_name=GCS_JAR_FLINK_RUNNER_OBJECT_NAME,
- filename="/tmp/beam_wordcount_flink_runner_{{ ds_nodash }}.jar",
- )
-
- start_java_pipeline_flink_runner = BeamRunJavaPipelineOperator(
- task_id="start_java_pipeline_flink_runner",
- runner="FlinkRunner",
- jar="/tmp/beam_wordcount_flink_runner_{{ ds_nodash }}.jar",
- pipeline_options={
- 'output': '/tmp/start_java_pipeline_flink_runner',
- 'inputFile': GCS_INPUT,
- },
- job_class='org.apache.beam.examples.WordCount',
- )
-
- jar_to_local_flink_runner >> start_java_pipeline_flink_runner
-
-
-with models.DAG(
- "example_beam_native_python",
- start_date=START_DATE,
- schedule_interval=None, # Override to match your needs
- catchup=False,
- default_args=DEFAULT_ARGS,
- tags=['example'],
-) as dag_native_python:
-
- # [START howto_operator_start_python_direct_runner_pipeline_local_file]
- start_python_pipeline_local_direct_runner = BeamRunPythonPipelineOperator(
- task_id="start_python_pipeline_local_direct_runner",
- py_file='apache_beam.examples.wordcount',
- py_options=['-m'],
- py_requirements=['apache-beam[gcp]==2.26.0'],
- py_interpreter='python3',
- py_system_site_packages=False,
- )
- # [END howto_operator_start_python_direct_runner_pipeline_local_file]
-
- # [START howto_operator_start_python_direct_runner_pipeline_gcs_file]
- start_python_pipeline_direct_runner = BeamRunPythonPipelineOperator(
- task_id="start_python_pipeline_direct_runner",
- py_file=GCS_PYTHON,
- py_options=[],
- pipeline_options={"output": GCS_OUTPUT},
- py_requirements=['apache-beam[gcp]==2.26.0'],
- py_interpreter='python3',
- py_system_site_packages=False,
- )
- # [END howto_operator_start_python_direct_runner_pipeline_gcs_file]
-
- # [START howto_operator_start_python_dataflow_runner_pipeline_gcs_file]
- start_python_pipeline_dataflow_runner = BeamRunPythonPipelineOperator(
- task_id="start_python_pipeline_dataflow_runner",
- runner="DataflowRunner",
- py_file=GCS_PYTHON,
- pipeline_options={
- 'tempLocation': GCS_TMP,
- 'stagingLocation': GCS_STAGING,
- 'output': GCS_OUTPUT,
- },
- py_options=[],
- py_requirements=['apache-beam[gcp]==2.26.0'],
- py_interpreter='python3',
- py_system_site_packages=False,
- dataflow_config=DataflowConfiguration(
- job_name='{{task.task_id}}', project_id=GCP_PROJECT_ID, location="us-central1"
- ),
- )
- # [END howto_operator_start_python_dataflow_runner_pipeline_gcs_file]
-
- start_python_pipeline_local_spark_runner = BeamRunPythonPipelineOperator(
- task_id="start_python_pipeline_local_spark_runner",
- py_file='apache_beam.examples.wordcount',
- runner="SparkRunner",
- py_options=['-m'],
- py_requirements=['apache-beam[gcp]==2.26.0'],
- py_interpreter='python3',
- py_system_site_packages=False,
- )
-
- start_python_pipeline_local_flink_runner = BeamRunPythonPipelineOperator(
- task_id="start_python_pipeline_local_flink_runner",
- py_file='apache_beam.examples.wordcount',
- runner="FlinkRunner",
- py_options=['-m'],
- pipeline_options={
- 'output': '/tmp/start_python_pipeline_local_flink_runner',
- },
- py_requirements=['apache-beam[gcp]==2.26.0'],
- py_interpreter='python3',
- py_system_site_packages=False,
- )
-
- (
- [
- start_python_pipeline_local_direct_runner,
- start_python_pipeline_direct_runner,
- ]
- >> start_python_pipeline_local_flink_runner
- >> start_python_pipeline_local_spark_runner
- )
-
-
-with models.DAG(
- "example_beam_native_python_dataflow_async",
- default_args=DEFAULT_ARGS,
- start_date=START_DATE,
- schedule_interval=None, # Override to match your needs
- catchup=False,
- tags=['example'],
-) as dag_native_python_dataflow_async:
- # [START howto_operator_start_python_dataflow_runner_pipeline_async_gcs_file]
- start_python_job_dataflow_runner_async = BeamRunPythonPipelineOperator(
- task_id="start_python_job_dataflow_runner_async",
- runner="DataflowRunner",
- py_file=GCS_PYTHON_DATAFLOW_ASYNC,
- pipeline_options={
- 'tempLocation': GCS_TMP,
- 'stagingLocation': GCS_STAGING,
- 'output': GCS_OUTPUT,
- },
- py_options=[],
- py_requirements=['apache-beam[gcp]==2.26.0'],
- py_interpreter='python3',
- py_system_site_packages=False,
- dataflow_config=DataflowConfiguration(
- job_name='{{task.task_id}}',
- project_id=GCP_PROJECT_ID,
- location="us-central1",
- wait_until_finished=False,
- ),
- )
-
- wait_for_python_job_dataflow_runner_async_done = DataflowJobStatusSensor(
- task_id="wait-for-python-job-async-done",
- job_id="{{task_instance.xcom_pull('start_python_job_dataflow_runner_async')['dataflow_job_id']}}",
- expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
- project_id=GCP_PROJECT_ID,
- location='us-central1',
- )
-
- start_python_job_dataflow_runner_async >> wait_for_python_job_dataflow_runner_async_done
- # [END howto_operator_start_python_dataflow_runner_pipeline_async_gcs_file]
-
-
-with models.DAG(
- "example_beam_native_go",
- start_date=START_DATE,
- schedule_interval="@once",
- catchup=False,
- default_args=DEFAULT_ARGS,
- tags=['example'],
-) as dag_native_go:
-
- # [START howto_operator_start_go_direct_runner_pipeline_local_file]
- start_go_pipeline_local_direct_runner = BeamRunGoPipelineOperator(
- task_id="start_go_pipeline_local_direct_runner",
- go_file='files/apache_beam/examples/wordcount.go',
- )
- # [END howto_operator_start_go_direct_runner_pipeline_local_file]
-
- # [START howto_operator_start_go_direct_runner_pipeline_gcs_file]
- start_go_pipeline_direct_runner = BeamRunGoPipelineOperator(
- task_id="start_go_pipeline_direct_runner",
- go_file=GCS_GO,
- pipeline_options={"output": GCS_OUTPUT},
- )
- # [END howto_operator_start_go_direct_runner_pipeline_gcs_file]
-
- # [START howto_operator_start_go_dataflow_runner_pipeline_gcs_file]
- start_go_pipeline_dataflow_runner = BeamRunGoPipelineOperator(
- task_id="start_go_pipeline_dataflow_runner",
- runner="DataflowRunner",
- go_file=GCS_GO,
- pipeline_options={
- 'tempLocation': GCS_TMP,
- 'stagingLocation': GCS_STAGING,
- 'output': GCS_OUTPUT,
- 'WorkerHarnessContainerImage': "apache/beam_go_sdk:latest",
- },
- dataflow_config=DataflowConfiguration(
- job_name='{{task.task_id}}', project_id=GCP_PROJECT_ID, location="us-central1"
- ),
- )
- # [END howto_operator_start_go_dataflow_runner_pipeline_gcs_file]
-
- start_go_pipeline_local_spark_runner = BeamRunGoPipelineOperator(
- task_id="start_go_pipeline_local_spark_runner",
- go_file='/files/apache_beam/examples/wordcount.go',
- runner="SparkRunner",
- pipeline_options={
- 'endpoint': '/your/spark/endpoint',
- },
- )
-
- start_go_pipeline_local_flink_runner = BeamRunGoPipelineOperator(
- task_id="start_go_pipeline_local_flink_runner",
- go_file='/files/apache_beam/examples/wordcount.go',
- runner="FlinkRunner",
- pipeline_options={
- 'output': '/tmp/start_go_pipeline_local_flink_runner',
- },
- )
-
- (
- [
- start_go_pipeline_local_direct_runner,
- start_go_pipeline_direct_runner,
- ]
- >> start_go_pipeline_local_flink_runner
- >> start_go_pipeline_local_spark_runner
- )
-
-
-with models.DAG(
- "example_beam_native_go_dataflow_async",
- default_args=DEFAULT_ARGS,
- start_date=START_DATE,
- schedule_interval="@once",
- catchup=False,
- tags=['example'],
-) as dag_native_go_dataflow_async:
- # [START howto_operator_start_go_dataflow_runner_pipeline_async_gcs_file]
- start_go_job_dataflow_runner_async = BeamRunGoPipelineOperator(
- task_id="start_go_job_dataflow_runner_async",
- runner="DataflowRunner",
- go_file=GCS_GO_DATAFLOW_ASYNC,
- pipeline_options={
- 'tempLocation': GCS_TMP,
- 'stagingLocation': GCS_STAGING,
- 'output': GCS_OUTPUT,
- 'WorkerHarnessContainerImage': "apache/beam_go_sdk:latest",
- },
- dataflow_config=DataflowConfiguration(
- job_name='{{task.task_id}}',
- project_id=GCP_PROJECT_ID,
- location="us-central1",
- wait_until_finished=False,
- ),
- )
-
- wait_for_go_job_dataflow_runner_async_done = DataflowJobStatusSensor(
- task_id="wait-for-go-job-async-done",
- job_id="{{task_instance.xcom_pull('start_go_job_dataflow_runner_async')['dataflow_job_id']}}",
- expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
- project_id=GCP_PROJECT_ID,
- location='us-central1',
- )
-
- start_go_job_dataflow_runner_async >> wait_for_go_job_dataflow_runner_async_done
- # [END howto_operator_start_go_dataflow_runner_pipeline_async_gcs_file]
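The dependency expressions in the file above, such as `[task_a, task_b] >> task_c >> task_d`, use Airflow's list fan-in: a list on the left of `>>` creates one edge per element. A minimal, self-contained sketch of that behavior (the DAG id and task ids here are illustrative, not part of the examples):

```python
from datetime import datetime

from airflow import models
from airflow.operators.dummy import DummyOperator

with models.DAG(
    "fan_in_sketch",  # illustrative DAG id only
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as sketch_dag:
    extract_a = DummyOperator(task_id="extract_a")
    extract_b = DummyOperator(task_id="extract_b")
    transform = DummyOperator(task_id="transform")
    load = DummyOperator(task_id="load")

    # transform waits for both extract tasks; load then waits for transform.
    [extract_a, extract_b] >> transform >> load
```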
diff --git a/docs/apache-airflow-providers-apache-beam/index.rst b/docs/apache-airflow-providers-apache-beam/index.rst
index d3573e9309412..740e7c89092a0 100644
--- a/docs/apache-airflow-providers-apache-beam/index.rst
+++ b/docs/apache-airflow-providers-apache-beam/index.rst
@@ -27,7 +27,7 @@ Content
Python API <_api/airflow/providers/apache/beam/index>
PyPI Repository
-    Example DAGs <https://github.com/apache/airflow/tree/main/airflow/providers/apache/beam/example_dags>
+    Example DAGs <https://github.com/apache/airflow/tree/main/tests/system/providers/apache/beam>
.. toctree::
:maxdepth: 1
diff --git a/docs/apache-airflow-providers-apache-beam/operators.rst b/docs/apache-airflow-providers-apache-beam/operators.rst
index bb18191124ad6..2468c917f3511 100644
--- a/docs/apache-airflow-providers-apache-beam/operators.rst
+++ b/docs/apache-airflow-providers-apache-beam/operators.rst
@@ -49,13 +49,13 @@ recommend avoiding unless the Dataflow job requires it.
Python Pipelines with DirectRunner
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-.. exampleinclude:: /../../airflow/providers/apache/beam/example_dags/example_beam.py
+.. exampleinclude:: /../../tests/system/providers/apache/beam/example_python.py
:language: python
:dedent: 4
:start-after: [START howto_operator_start_python_direct_runner_pipeline_local_file]
:end-before: [END howto_operator_start_python_direct_runner_pipeline_local_file]
-.. exampleinclude:: /../../airflow/providers/apache/beam/example_dags/example_beam.py
+.. exampleinclude:: /../../tests/system/providers/apache/beam/example_python.py
:language: python
:dedent: 4
:start-after: [START howto_operator_start_python_direct_runner_pipeline_gcs_file]
@@ -64,13 +64,13 @@ Python Pipelines with DirectRunner
Python Pipelines with DataflowRunner
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-.. exampleinclude:: /../../airflow/providers/apache/beam/example_dags/example_beam.py
+.. exampleinclude:: /../../tests/system/providers/apache/beam/example_python.py
:language: python
:dedent: 4
:start-after: [START howto_operator_start_python_dataflow_runner_pipeline_gcs_file]
:end-before: [END howto_operator_start_python_dataflow_runner_pipeline_gcs_file]
-.. exampleinclude:: /../../airflow/providers/apache/beam/example_dags/example_beam.py
+.. exampleinclude:: /../../tests/system/providers/apache/beam/example_python_dataflow.py
:language: python
:dedent: 4
:start-after: [START howto_operator_start_python_dataflow_runner_pipeline_async_gcs_file]
@@ -92,7 +92,7 @@ has the ability to download or available on the local filesystem (provide the ab
Java Pipelines with DirectRunner
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-.. exampleinclude:: /../../airflow/providers/apache/beam/example_dags/example_beam.py
+.. exampleinclude:: /../../tests/system/providers/apache/beam/example_beam.py
:language: python
:dedent: 4
:start-after: [START howto_operator_start_java_direct_runner_pipeline]
@@ -101,7 +101,7 @@ Java Pipelines with DirectRunner
Java Pipelines with DataflowRunner
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-.. exampleinclude:: /../../airflow/providers/apache/beam/example_dags/example_beam.py
+.. exampleinclude:: /../../tests/system/providers/apache/beam/example_java_dataflow.py
:language: python
:dedent: 4
:start-after: [START howto_operator_start_java_dataflow_runner_pipeline]
@@ -125,13 +125,13 @@ init the module and install dependencies with ``go run init example.com/main`` a
Go Pipelines with DirectRunner
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-.. exampleinclude:: /../../airflow/providers/apache/beam/example_dags/example_beam.py
+.. exampleinclude:: /../../tests/system/providers/apache/beam/example_go.py
:language: python
:dedent: 4
:start-after: [START howto_operator_start_go_direct_runner_pipeline_local_file]
:end-before: [END howto_operator_start_go_direct_runner_pipeline_local_file]
-.. exampleinclude:: /../../airflow/providers/apache/beam/example_dags/example_beam.py
+.. exampleinclude:: /../../tests/system/providers/apache/beam/example_go.py
:language: python
:dedent: 4
:start-after: [START howto_operator_start_go_direct_runner_pipeline_gcs_file]
@@ -140,13 +140,13 @@ Go Pipelines with DirectRunner
Go Pipelines with DataflowRunner
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-.. exampleinclude:: /../../airflow/providers/apache/beam/example_dags/example_beam.py
+.. exampleinclude:: /../../tests/system/providers/apache/beam/example_go.py
:language: python
:dedent: 4
:start-after: [START howto_operator_start_go_dataflow_runner_pipeline_gcs_file]
:end-before: [END howto_operator_start_go_dataflow_runner_pipeline_gcs_file]
-.. exampleinclude:: /../../airflow/providers/apache/beam/example_dags/example_beam.py
+.. exampleinclude:: /../../tests/system/providers/apache/beam/example_go_dataflow.py
:language: python
:dedent: 4
:start-after: [START howto_operator_start_go_dataflow_runner_pipeline_async_gcs_file]
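These `exampleinclude` directives slice the referenced files between the `[START ...]` and `[END ...]` comment markers, which is why the moved example files keep those markers verbatim. A rough sketch of the extraction idea (not Sphinx's actual implementation):

```python
def extract_snippet(source: str, name: str) -> str:
    """Return the lines between [START name] and [END name] markers."""
    start, end = f"[START {name}]", f"[END {name}]"
    collected, inside = [], False
    for line in source.splitlines():
        if start in line:
            inside = True
        elif end in line:
            break
        elif inside:
            collected.append(line)
    return "\n".join(collected)

# e.g. extract_snippet(open("example_python.py").read(),
#     "howto_operator_start_python_direct_runner_pipeline_local_file")
```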
diff --git a/tests/providers/apache/beam/operators/test_beam_system.py b/tests/providers/apache/beam/operators/test_beam_system.py
index 4defe58c2d064..a589169d5daf8 100644
--- a/tests/providers/apache/beam/operators/test_beam_system.py
+++ b/tests/providers/apache/beam/operators/test_beam_system.py
@@ -23,7 +23,7 @@
from tests.test_utils import AIRFLOW_MAIN_FOLDER
from tests.test_utils.system_tests_class import SystemTest
-BEAM_DAG_FOLDER = os.path.join(AIRFLOW_MAIN_FOLDER, "airflow", "providers", "apache", "beam", "example_dags")
+BEAM_DAG_FOLDER = os.path.join(AIRFLOW_MAIN_FOLDER, "tests", "system", "providers", "apache", "beam")
@pytest.mark.system("apache.beam")
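With `BEAM_DAG_FOLDER` repointed at `tests/system/providers/apache/beam`, the existing system tests keep working unchanged. Roughly, they follow the `SystemTest.run_dag` pattern used elsewhere in the test suite (class and method names below are illustrative):

```python
import os

import pytest

from tests.test_utils import AIRFLOW_MAIN_FOLDER
from tests.test_utils.system_tests_class import SystemTest

BEAM_DAG_FOLDER = os.path.join(AIRFLOW_MAIN_FOLDER, "tests", "system", "providers", "apache", "beam")


@pytest.mark.system("apache.beam")
class BeamExampleDagsSystemTest(SystemTest):
    def test_run_example_dag_native_python(self):
        # Executes the whole example DAG end to end against real services.
        self.run_dag("example_beam_native_python", BEAM_DAG_FOLDER)
```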
diff --git a/airflow/providers/apache/beam/example_dags/__init__.py b/tests/system/providers/apache/beam/__init__.py
similarity index 100%
rename from airflow/providers/apache/beam/example_dags/__init__.py
rename to tests/system/providers/apache/beam/__init__.py
diff --git a/tests/system/providers/apache/beam/example_beam.py b/tests/system/providers/apache/beam/example_beam.py
new file mode 100644
index 0000000000000..94d34f1140c14
--- /dev/null
+++ b/tests/system/providers/apache/beam/example_beam.py
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG for Apache Beam operators
+"""
+
+from airflow import models
+from airflow.providers.apache.beam.operators.beam import BeamRunJavaPipelineOperator
+from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator
+from tests.system.providers.apache.beam.utils import (
+ GCS_INPUT,
+ GCS_JAR_DIRECT_RUNNER_BUCKET_NAME,
+ GCS_JAR_DIRECT_RUNNER_OBJECT_NAME,
+ START_DATE,
+)
+
+with models.DAG(
+ "example_beam_native_java_direct_runner",
+ schedule_interval=None, # Override to match your needs
+ start_date=START_DATE,
+ catchup=False,
+ tags=['example'],
+) as dag:
+
+ # [START howto_operator_start_java_direct_runner_pipeline]
+ jar_to_local_direct_runner = GCSToLocalFilesystemOperator(
+ task_id="jar_to_local_direct_runner",
+ bucket=GCS_JAR_DIRECT_RUNNER_BUCKET_NAME,
+ object_name=GCS_JAR_DIRECT_RUNNER_OBJECT_NAME,
+ filename="/tmp/beam_wordcount_direct_runner_{{ ds_nodash }}.jar",
+ )
+
+ start_java_pipeline_direct_runner = BeamRunJavaPipelineOperator(
+ task_id="start_java_pipeline_direct_runner",
+ jar="/tmp/beam_wordcount_direct_runner_{{ ds_nodash }}.jar",
+ pipeline_options={
+ 'output': '/tmp/start_java_pipeline_direct_runner',
+ 'inputFile': GCS_INPUT,
+ },
+ job_class='org.apache.beam.examples.WordCount',
+ )
+
+ jar_to_local_direct_runner >> start_java_pipeline_direct_runner
+ # [END howto_operator_start_java_direct_runner_pipeline]
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
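The `get_test_run(dag)` boilerplate at the bottom of each new file is what lets `pytest tests/system/providers/apache/beam/example_beam.py` collect and execute the DAG. Conceptually the helper does something like the following (a sketch only; the real helper in `tests/system/utils` does more, e.g. watcher tasks and result checks):

```python
def get_test_run_sketch(dag):
    def test_run():
        dag.clear()  # drop any task instances left over from earlier runs
        dag.run()    # execute the whole example DAG, backfill-style
    return test_run
```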
diff --git a/tests/system/providers/apache/beam/example_beam_java_flink.py b/tests/system/providers/apache/beam/example_beam_java_flink.py
new file mode 100644
index 0000000000000..a82331e6b553f
--- /dev/null
+++ b/tests/system/providers/apache/beam/example_beam_java_flink.py
@@ -0,0 +1,65 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG for Apache Beam operators
+"""
+
+from airflow import models
+from airflow.providers.apache.beam.operators.beam import BeamRunJavaPipelineOperator
+from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator
+from tests.system.providers.apache.beam.utils import (
+ GCS_INPUT,
+ GCS_JAR_FLINK_RUNNER_BUCKET_NAME,
+ GCS_JAR_FLINK_RUNNER_OBJECT_NAME,
+ START_DATE,
+)
+
+with models.DAG(
+ "example_beam_native_java_flink_runner",
+ schedule_interval=None, # Override to match your needs
+ start_date=START_DATE,
+ catchup=False,
+ tags=['example'],
+) as dag:
+
+ jar_to_local_flink_runner = GCSToLocalFilesystemOperator(
+ task_id="jar_to_local_flink_runner",
+ bucket=GCS_JAR_FLINK_RUNNER_BUCKET_NAME,
+ object_name=GCS_JAR_FLINK_RUNNER_OBJECT_NAME,
+ filename="/tmp/beam_wordcount_flink_runner_{{ ds_nodash }}.jar",
+ )
+
+ start_java_pipeline_flink_runner = BeamRunJavaPipelineOperator(
+ task_id="start_java_pipeline_flink_runner",
+ runner="FlinkRunner",
+ jar="/tmp/beam_wordcount_flink_runner_{{ ds_nodash }}.jar",
+ pipeline_options={
+ 'output': '/tmp/start_java_pipeline_flink_runner',
+ 'inputFile': GCS_INPUT,
+ },
+ job_class='org.apache.beam.examples.WordCount',
+ )
+
+ jar_to_local_flink_runner >> start_java_pipeline_flink_runner
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
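The `{{ ds_nodash }}` in the jar filename is an Airflow template variable rendered per run, so concurrent runs download to distinct files. Outside Airflow, the rendering amounts to this (the context value is hand-built for illustration; Airflow derives it from the logical date):

```python
from jinja2 import Template

context = {"ds_nodash": "20210101"}
path = Template("/tmp/beam_wordcount_flink_runner_{{ ds_nodash }}.jar").render(context)
print(path)  # /tmp/beam_wordcount_flink_runner_20210101.jar
```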
diff --git a/tests/system/providers/apache/beam/example_beam_java_spark.py b/tests/system/providers/apache/beam/example_beam_java_spark.py
new file mode 100644
index 0000000000000..ca602b30c4bd1
--- /dev/null
+++ b/tests/system/providers/apache/beam/example_beam_java_spark.py
@@ -0,0 +1,65 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG for Apache Beam operators
+"""
+
+from airflow import models
+from airflow.providers.apache.beam.operators.beam import BeamRunJavaPipelineOperator
+from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator
+from tests.system.providers.apache.beam.utils import (
+ GCS_INPUT,
+ GCS_JAR_SPARK_RUNNER_BUCKET_NAME,
+ GCS_JAR_SPARK_RUNNER_OBJECT_NAME,
+ START_DATE,
+)
+
+with models.DAG(
+ "example_beam_native_java_spark_runner",
+ schedule_interval=None, # Override to match your needs
+ start_date=START_DATE,
+ catchup=False,
+ tags=['example'],
+) as dag:
+
+ jar_to_local_spark_runner = GCSToLocalFilesystemOperator(
+ task_id="jar_to_local_spark_runner",
+ bucket=GCS_JAR_SPARK_RUNNER_BUCKET_NAME,
+ object_name=GCS_JAR_SPARK_RUNNER_OBJECT_NAME,
+ filename="/tmp/beam_wordcount_spark_runner_{{ ds_nodash }}.jar",
+ )
+
+ start_java_pipeline_spark_runner = BeamRunJavaPipelineOperator(
+ task_id="start_java_pipeline_spark_runner",
+ runner="SparkRunner",
+ jar="/tmp/beam_wordcount_spark_runner_{{ ds_nodash }}.jar",
+ pipeline_options={
+ 'output': '/tmp/start_java_pipeline_spark_runner',
+ 'inputFile': GCS_INPUT,
+ },
+ job_class='org.apache.beam.examples.WordCount',
+ )
+
+ jar_to_local_spark_runner >> start_java_pipeline_spark_runner
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
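Instead of the raw `"SparkRunner"` string, the provider also ships a constants holder that avoids typos (assuming the installed provider version exposes `BeamRunnerType`):

```python
from airflow.providers.apache.beam.hooks.beam import BeamRunnerType

# Equivalent to runner="SparkRunner" above.
runner = BeamRunnerType.SparkRunner
```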
diff --git a/tests/system/providers/apache/beam/example_go.py b/tests/system/providers/apache/beam/example_go.py
new file mode 100644
index 0000000000000..768136560a5b5
--- /dev/null
+++ b/tests/system/providers/apache/beam/example_go.py
@@ -0,0 +1,108 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG for Apache Beam operators
+"""
+
+from airflow import models
+from airflow.providers.apache.beam.operators.beam import BeamRunGoPipelineOperator
+from airflow.providers.google.cloud.operators.dataflow import DataflowConfiguration
+from tests.system.providers.apache.beam.utils import (
+ DEFAULT_ARGS,
+ GCP_PROJECT_ID,
+ GCS_GO,
+ GCS_OUTPUT,
+ GCS_STAGING,
+ GCS_TMP,
+ START_DATE,
+)
+
+with models.DAG(
+ "example_beam_native_go",
+ start_date=START_DATE,
+ schedule_interval="@once",
+ catchup=False,
+ default_args=DEFAULT_ARGS,
+ tags=['example'],
+) as dag:
+
+ # [START howto_operator_start_go_direct_runner_pipeline_local_file]
+ start_go_pipeline_local_direct_runner = BeamRunGoPipelineOperator(
+ task_id="start_go_pipeline_local_direct_runner",
+ go_file='files/apache_beam/examples/wordcount.go',
+ )
+ # [END howto_operator_start_go_direct_runner_pipeline_local_file]
+
+ # [START howto_operator_start_go_direct_runner_pipeline_gcs_file]
+ start_go_pipeline_direct_runner = BeamRunGoPipelineOperator(
+ task_id="start_go_pipeline_direct_runner",
+ go_file=GCS_GO,
+ pipeline_options={"output": GCS_OUTPUT},
+ )
+ # [END howto_operator_start_go_direct_runner_pipeline_gcs_file]
+
+ # [START howto_operator_start_go_dataflow_runner_pipeline_gcs_file]
+ start_go_pipeline_dataflow_runner = BeamRunGoPipelineOperator(
+ task_id="start_go_pipeline_dataflow_runner",
+ runner="DataflowRunner",
+ go_file=GCS_GO,
+ pipeline_options={
+ 'tempLocation': GCS_TMP,
+ 'stagingLocation': GCS_STAGING,
+ 'output': GCS_OUTPUT,
+ 'WorkerHarnessContainerImage': "apache/beam_go_sdk:latest",
+ },
+ dataflow_config=DataflowConfiguration(
+ job_name='{{task.task_id}}', project_id=GCP_PROJECT_ID, location="us-central1"
+ ),
+ )
+ # [END howto_operator_start_go_dataflow_runner_pipeline_gcs_file]
+
+ start_go_pipeline_local_spark_runner = BeamRunGoPipelineOperator(
+ task_id="start_go_pipeline_local_spark_runner",
+ go_file='/files/apache_beam/examples/wordcount.go',
+ runner="SparkRunner",
+ pipeline_options={
+ 'endpoint': '/your/spark/endpoint',
+ },
+ )
+
+ start_go_pipeline_local_flink_runner = BeamRunGoPipelineOperator(
+ task_id="start_go_pipeline_local_flink_runner",
+ go_file='/files/apache_beam/examples/wordcount.go',
+ runner="FlinkRunner",
+ pipeline_options={
+ 'output': '/tmp/start_go_pipeline_local_flink_runner',
+ },
+ )
+
+ (
+ [
+ start_go_pipeline_local_direct_runner,
+ start_go_pipeline_direct_runner,
+ ]
+ >> start_go_pipeline_local_flink_runner
+ >> start_go_pipeline_local_spark_runner
+ )
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
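In all of these examples, `pipeline_options` keys end up as command-line flags on the Beam invocation. The conversion is roughly this (a sketch of the idea, not the hook's exact code):

```python
def options_to_args(options: dict) -> list:
    """{'output': '/tmp/x', 'verbose': True} -> ['--output=/tmp/x', '--verbose']"""
    args = []
    for key, value in options.items():
        if value is True:
            args.append(f"--{key}")
        else:
            args.append(f"--{key}={value}")
    return args

print(options_to_args({"output": "gs://bucket/output", "tempLocation": "gs://bucket/temp/"}))
```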
diff --git a/tests/system/providers/apache/beam/example_go_dataflow.py b/tests/system/providers/apache/beam/example_go_dataflow.py
new file mode 100644
index 0000000000000..590318f25a41b
--- /dev/null
+++ b/tests/system/providers/apache/beam/example_go_dataflow.py
@@ -0,0 +1,80 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG for Apache Beam operators
+"""
+
+from airflow import models
+from airflow.providers.apache.beam.operators.beam import BeamRunGoPipelineOperator
+from airflow.providers.google.cloud.hooks.dataflow import DataflowJobStatus
+from airflow.providers.google.cloud.operators.dataflow import DataflowConfiguration
+from airflow.providers.google.cloud.sensors.dataflow import DataflowJobStatusSensor
+from tests.system.providers.apache.beam.utils import (
+ DEFAULT_ARGS,
+ GCP_PROJECT_ID,
+ GCS_GO_DATAFLOW_ASYNC,
+ GCS_OUTPUT,
+ GCS_STAGING,
+ GCS_TMP,
+ START_DATE,
+)
+
+with models.DAG(
+ "example_beam_native_go_dataflow_async",
+ default_args=DEFAULT_ARGS,
+ start_date=START_DATE,
+ schedule_interval="@once",
+ catchup=False,
+ tags=['example'],
+) as dag:
+ # [START howto_operator_start_go_dataflow_runner_pipeline_async_gcs_file]
+ start_go_job_dataflow_runner_async = BeamRunGoPipelineOperator(
+ task_id="start_go_job_dataflow_runner_async",
+ runner="DataflowRunner",
+ go_file=GCS_GO_DATAFLOW_ASYNC,
+ pipeline_options={
+ 'tempLocation': GCS_TMP,
+ 'stagingLocation': GCS_STAGING,
+ 'output': GCS_OUTPUT,
+ 'WorkerHarnessContainerImage': "apache/beam_go_sdk:latest",
+ },
+ dataflow_config=DataflowConfiguration(
+ job_name='{{task.task_id}}',
+ project_id=GCP_PROJECT_ID,
+ location="us-central1",
+ wait_until_finished=False,
+ ),
+ )
+
+ wait_for_go_job_dataflow_runner_async_done = DataflowJobStatusSensor(
+ task_id="wait-for-go-job-async-done",
+ job_id="{{task_instance.xcom_pull('start_go_job_dataflow_runner_async')['dataflow_job_id']}}",
+ expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
+ project_id=GCP_PROJECT_ID,
+ location='us-central1',
+ )
+
+ start_go_job_dataflow_runner_async >> wait_for_go_job_dataflow_runner_async_done
+ # [END howto_operator_start_go_dataflow_runner_pipeline_async_gcs_file]
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
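With `wait_until_finished=False` the operator returns as soon as the job is submitted and pushes its result to XCom; the sensor's templated `job_id` then pulls the Dataflow job id back out. Resolved by hand, the template does the equivalent of this (the job id shown is made up):

```python
# What BeamRunGoPipelineOperator leaves in XCom for the downstream sensor
# (illustrative value; the real id comes from the Dataflow API).
xcom_value = {"dataflow_job_id": "2022-01-01_00_00_00-1234567890123456789"}

# The sensor template
#   {{ task_instance.xcom_pull('start_go_job_dataflow_runner_async')['dataflow_job_id'] }}
# reduces to:
job_id = xcom_value["dataflow_job_id"]
```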
diff --git a/tests/system/providers/apache/beam/example_java_dataflow.py b/tests/system/providers/apache/beam/example_java_dataflow.py
new file mode 100644
index 0000000000000..092dff4aafc2a
--- /dev/null
+++ b/tests/system/providers/apache/beam/example_java_dataflow.py
@@ -0,0 +1,70 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG for Apache Beam operators
+"""
+
+from airflow import models
+from airflow.providers.apache.beam.operators.beam import BeamRunJavaPipelineOperator
+from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator
+from tests.system.providers.apache.beam.utils import (
+ GCS_JAR_DATAFLOW_RUNNER_BUCKET_NAME,
+ GCS_JAR_DATAFLOW_RUNNER_OBJECT_NAME,
+ GCS_OUTPUT,
+ GCS_STAGING,
+ GCS_TMP,
+ START_DATE,
+)
+
+with models.DAG(
+ "example_beam_native_java_dataflow_runner",
+ schedule_interval=None, # Override to match your needs
+ start_date=START_DATE,
+ catchup=False,
+ tags=['example'],
+) as dag:
+ # [START howto_operator_start_java_dataflow_runner_pipeline]
+ jar_to_local_dataflow_runner = GCSToLocalFilesystemOperator(
+ task_id="jar_to_local_dataflow_runner",
+ bucket=GCS_JAR_DATAFLOW_RUNNER_BUCKET_NAME,
+ object_name=GCS_JAR_DATAFLOW_RUNNER_OBJECT_NAME,
+ filename="/tmp/beam_wordcount_dataflow_runner_{{ ds_nodash }}.jar",
+ )
+
+ start_java_pipeline_dataflow = BeamRunJavaPipelineOperator(
+ task_id="start_java_pipeline_dataflow",
+ runner="DataflowRunner",
+ jar="/tmp/beam_wordcount_dataflow_runner_{{ ds_nodash }}.jar",
+ pipeline_options={
+ 'tempLocation': GCS_TMP,
+ 'stagingLocation': GCS_STAGING,
+ 'output': GCS_OUTPUT,
+ },
+ job_class='org.apache.beam.examples.WordCount',
+ dataflow_config={"job_name": "{{task.task_id}}", "location": "us-central1"},
+ )
+
+ jar_to_local_dataflow_runner >> start_java_pipeline_dataflow
+ # [END howto_operator_start_java_dataflow_runner_pipeline]
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
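Unlike the Python and Go examples, this Java example passes `dataflow_config` as a plain dict; the operator accepts either form (assumption: it normalizes dicts into `DataflowConfiguration` internally), so the dict above is equivalent to:

```python
from airflow.providers.google.cloud.operators.dataflow import DataflowConfiguration

dataflow_config = DataflowConfiguration(job_name="{{task.task_id}}", location="us-central1")
```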
diff --git a/tests/system/providers/apache/beam/example_python.py b/tests/system/providers/apache/beam/example_python.py
new file mode 100644
index 0000000000000..3349e30565ed6
--- /dev/null
+++ b/tests/system/providers/apache/beam/example_python.py
@@ -0,0 +1,124 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG for Apache Beam operators
+"""
+
+from airflow import models
+from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator
+from airflow.providers.google.cloud.operators.dataflow import DataflowConfiguration
+from tests.system.providers.apache.beam.utils import (
+ DEFAULT_ARGS,
+ GCP_PROJECT_ID,
+ GCS_OUTPUT,
+ GCS_PYTHON,
+ GCS_STAGING,
+ GCS_TMP,
+ START_DATE,
+)
+
+with models.DAG(
+ "example_beam_native_python",
+ start_date=START_DATE,
+ schedule_interval=None, # Override to match your needs
+ catchup=False,
+ default_args=DEFAULT_ARGS,
+ tags=['example'],
+) as dag:
+
+ # [START howto_operator_start_python_direct_runner_pipeline_local_file]
+ start_python_pipeline_local_direct_runner = BeamRunPythonPipelineOperator(
+ task_id="start_python_pipeline_local_direct_runner",
+ py_file='apache_beam.examples.wordcount',
+ py_options=['-m'],
+ py_requirements=['apache-beam[gcp]==2.26.0'],
+ py_interpreter='python3',
+ py_system_site_packages=False,
+ )
+ # [END howto_operator_start_python_direct_runner_pipeline_local_file]
+
+ # [START howto_operator_start_python_direct_runner_pipeline_gcs_file]
+ start_python_pipeline_direct_runner = BeamRunPythonPipelineOperator(
+ task_id="start_python_pipeline_direct_runner",
+ py_file=GCS_PYTHON,
+ py_options=[],
+ pipeline_options={"output": GCS_OUTPUT},
+ py_requirements=['apache-beam[gcp]==2.26.0'],
+ py_interpreter='python3',
+ py_system_site_packages=False,
+ )
+ # [END howto_operator_start_python_direct_runner_pipeline_gcs_file]
+
+ # [START howto_operator_start_python_dataflow_runner_pipeline_gcs_file]
+ start_python_pipeline_dataflow_runner = BeamRunPythonPipelineOperator(
+ task_id="start_python_pipeline_dataflow_runner",
+ runner="DataflowRunner",
+ py_file=GCS_PYTHON,
+ pipeline_options={
+ 'tempLocation': GCS_TMP,
+ 'stagingLocation': GCS_STAGING,
+ 'output': GCS_OUTPUT,
+ },
+ py_options=[],
+ py_requirements=['apache-beam[gcp]==2.26.0'],
+ py_interpreter='python3',
+ py_system_site_packages=False,
+ dataflow_config=DataflowConfiguration(
+ job_name='{{task.task_id}}', project_id=GCP_PROJECT_ID, location="us-central1"
+ ),
+ )
+ # [END howto_operator_start_python_dataflow_runner_pipeline_gcs_file]
+
+ start_python_pipeline_local_spark_runner = BeamRunPythonPipelineOperator(
+ task_id="start_python_pipeline_local_spark_runner",
+ py_file='apache_beam.examples.wordcount',
+ runner="SparkRunner",
+ py_options=['-m'],
+ py_requirements=['apache-beam[gcp]==2.26.0'],
+ py_interpreter='python3',
+ py_system_site_packages=False,
+ )
+
+ start_python_pipeline_local_flink_runner = BeamRunPythonPipelineOperator(
+ task_id="start_python_pipeline_local_flink_runner",
+ py_file='apache_beam.examples.wordcount',
+ runner="FlinkRunner",
+ py_options=['-m'],
+ pipeline_options={
+ 'output': '/tmp/start_python_pipeline_local_flink_runner',
+ },
+ py_requirements=['apache-beam[gcp]==2.26.0'],
+ py_interpreter='python3',
+ py_system_site_packages=False,
+ )
+
+ (
+ [
+ start_python_pipeline_local_direct_runner,
+ start_python_pipeline_direct_runner,
+ ]
+ >> start_python_pipeline_local_flink_runner
+ >> start_python_pipeline_local_spark_runner
+ )
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
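`py_options=['-m']` combined with the dotted `py_file` makes the operator invoke the pipeline as a module. On the command line that is roughly the following (a sketch; the operator also appends the rendered pipeline options):

```python
import shlex

cmd = [
    "python3", "-m", "apache_beam.examples.wordcount",
    "--output=/tmp/example_beam",  # from default_pipeline_options in DEFAULT_ARGS
]
print(shlex.join(cmd))  # python3 -m apache_beam.examples.wordcount --output=/tmp/example_beam
```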
diff --git a/tests/system/providers/apache/beam/example_python_dataflow.py b/tests/system/providers/apache/beam/example_python_dataflow.py
new file mode 100644
index 0000000000000..f119e31a66e40
--- /dev/null
+++ b/tests/system/providers/apache/beam/example_python_dataflow.py
@@ -0,0 +1,83 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG for Apache Beam operators
+"""
+
+from airflow import models
+from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator
+from airflow.providers.google.cloud.hooks.dataflow import DataflowJobStatus
+from airflow.providers.google.cloud.operators.dataflow import DataflowConfiguration
+from airflow.providers.google.cloud.sensors.dataflow import DataflowJobStatusSensor
+from tests.system.providers.apache.beam.utils import (
+ DEFAULT_ARGS,
+ GCP_PROJECT_ID,
+ GCS_OUTPUT,
+ GCS_PYTHON_DATAFLOW_ASYNC,
+ GCS_STAGING,
+ GCS_TMP,
+ START_DATE,
+)
+
+with models.DAG(
+ "example_beam_native_python_dataflow_async",
+ default_args=DEFAULT_ARGS,
+ start_date=START_DATE,
+ schedule_interval=None, # Override to match your needs
+ catchup=False,
+ tags=['example'],
+) as dag:
+ # [START howto_operator_start_python_dataflow_runner_pipeline_async_gcs_file]
+ start_python_job_dataflow_runner_async = BeamRunPythonPipelineOperator(
+ task_id="start_python_job_dataflow_runner_async",
+ runner="DataflowRunner",
+ py_file=GCS_PYTHON_DATAFLOW_ASYNC,
+ pipeline_options={
+ 'tempLocation': GCS_TMP,
+ 'stagingLocation': GCS_STAGING,
+ 'output': GCS_OUTPUT,
+ },
+ py_options=[],
+ py_requirements=['apache-beam[gcp]==2.26.0'],
+ py_interpreter='python3',
+ py_system_site_packages=False,
+ dataflow_config=DataflowConfiguration(
+ job_name='{{task.task_id}}',
+ project_id=GCP_PROJECT_ID,
+ location="us-central1",
+ wait_until_finished=False,
+ ),
+ )
+
+ wait_for_python_job_dataflow_runner_async_done = DataflowJobStatusSensor(
+ task_id="wait-for-python-job-async-done",
+ job_id="{{task_instance.xcom_pull('start_python_job_dataflow_runner_async')['dataflow_job_id']}}",
+ expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
+ project_id=GCP_PROJECT_ID,
+ location='us-central1',
+ )
+
+ start_python_job_dataflow_runner_async >> wait_for_python_job_dataflow_runner_async_done
+ # [END howto_operator_start_python_dataflow_runner_pipeline_async_gcs_file]
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/tests/system/providers/apache/beam/utils.py b/tests/system/providers/apache/beam/utils.py
new file mode 100644
index 0000000000000..462e22169a9e7
--- /dev/null
+++ b/tests/system/providers/apache/beam/utils.py
@@ -0,0 +1,75 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example utils for the Apache Beam operator example DAGs
+"""
+import os
+from datetime import datetime
+from urllib.parse import urlparse
+
+from airflow.utils.trigger_rule import TriggerRule
+
+GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
+GCS_INPUT = os.environ.get('APACHE_BEAM_PYTHON', 'gs://INVALID BUCKET NAME/shakespeare/kinglear.txt')
+GCS_TMP = os.environ.get('APACHE_BEAM_GCS_TMP', 'gs://INVALID BUCKET NAME/temp/')
+GCS_STAGING = os.environ.get('APACHE_BEAM_GCS_STAGING', 'gs://INVALID BUCKET NAME/staging/')
+GCS_OUTPUT = os.environ.get('APACHE_BEAM_GCS_OUTPUT', 'gs://INVALID BUCKET NAME/output')
+GCS_PYTHON = os.environ.get('APACHE_BEAM_PYTHON', 'gs://INVALID BUCKET NAME/wordcount_debugging.py')
+GCS_PYTHON_DATAFLOW_ASYNC = os.environ.get(
+ 'APACHE_BEAM_PYTHON_DATAFLOW_ASYNC', 'gs://INVALID BUCKET NAME/wordcount_debugging.py'
+)
+GCS_GO = os.environ.get('APACHE_BEAM_GO', 'gs://INVALID BUCKET NAME/wordcount_debugging.go')
+GCS_GO_DATAFLOW_ASYNC = os.environ.get(
+ 'APACHE_BEAM_GO_DATAFLOW_ASYNC', 'gs://INVALID BUCKET NAME/wordcount_debugging.go'
+)
+GCS_JAR_DIRECT_RUNNER = os.environ.get(
+ 'APACHE_BEAM_DIRECT_RUNNER_JAR',
+ 'gs://INVALID BUCKET NAME/tests/dataflow-templates-bundled-java=11-beam-v2.25.0-DirectRunner.jar',
+)
+GCS_JAR_DATAFLOW_RUNNER = os.environ.get(
+ 'APACHE_BEAM_DATAFLOW_RUNNER_JAR', 'gs://INVALID BUCKET NAME/word-count-beam-bundled-0.1.jar'
+)
+GCS_JAR_SPARK_RUNNER = os.environ.get(
+ 'APACHE_BEAM_SPARK_RUNNER_JAR',
+ 'gs://INVALID BUCKET NAME/tests/dataflow-templates-bundled-java=11-beam-v2.25.0-SparkRunner.jar',
+)
+GCS_JAR_FLINK_RUNNER = os.environ.get(
+ 'APACHE_BEAM_FLINK_RUNNER_JAR',
+ 'gs://INVALID BUCKET NAME/tests/dataflow-templates-bundled-java=11-beam-v2.25.0-FlinkRunner.jar',
+)
+
+GCS_JAR_DIRECT_RUNNER_PARTS = urlparse(GCS_JAR_DIRECT_RUNNER)
+GCS_JAR_DIRECT_RUNNER_BUCKET_NAME = GCS_JAR_DIRECT_RUNNER_PARTS.netloc
+GCS_JAR_DIRECT_RUNNER_OBJECT_NAME = GCS_JAR_DIRECT_RUNNER_PARTS.path[1:]
+GCS_JAR_DATAFLOW_RUNNER_PARTS = urlparse(GCS_JAR_DATAFLOW_RUNNER)
+GCS_JAR_DATAFLOW_RUNNER_BUCKET_NAME = GCS_JAR_DATAFLOW_RUNNER_PARTS.netloc
+GCS_JAR_DATAFLOW_RUNNER_OBJECT_NAME = GCS_JAR_DATAFLOW_RUNNER_PARTS.path[1:]
+GCS_JAR_SPARK_RUNNER_PARTS = urlparse(GCS_JAR_SPARK_RUNNER)
+GCS_JAR_SPARK_RUNNER_BUCKET_NAME = GCS_JAR_SPARK_RUNNER_PARTS.netloc
+GCS_JAR_SPARK_RUNNER_OBJECT_NAME = GCS_JAR_SPARK_RUNNER_PARTS.path[1:]
+GCS_JAR_FLINK_RUNNER_PARTS = urlparse(GCS_JAR_FLINK_RUNNER)
+GCS_JAR_FLINK_RUNNER_BUCKET_NAME = GCS_JAR_FLINK_RUNNER_PARTS.netloc
+GCS_JAR_FLINK_RUNNER_OBJECT_NAME = GCS_JAR_FLINK_RUNNER_PARTS.path[1:]
+
+
+DEFAULT_ARGS = {
+ 'default_pipeline_options': {'output': '/tmp/example_beam'},
+ 'trigger_rule': TriggerRule.ALL_DONE,
+}
+START_DATE = datetime(2021, 1, 1)
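The `*_BUCKET_NAME`/`*_OBJECT_NAME` pairs above come straight out of `urlparse`, which splits a `gs://` URI the same way it splits an HTTP URL:

```python
from urllib.parse import urlparse

parts = urlparse("gs://example-bucket/tests/word-count-beam-bundled-0.1.jar")
print(parts.netloc)    # 'example-bucket'                        -> bucket name
print(parts.path[1:])  # 'tests/word-count-beam-bundled-0.1.jar' -> object name
```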