Skip to content

Commit

Permalink
AIP-47 - Migrate beam DAGs to new design #22439 (#24211)
Browse files Browse the repository at this point in the history
* AIP-47 - Migrate beam DAGs to new design #22439
  • Loading branch information
chethanuk authored Jun 5, 2022
1 parent 95ab664 commit 34e0ab9
Show file tree
Hide file tree
Showing 14 changed files with 748 additions and 449 deletions.
437 changes: 0 additions & 437 deletions airflow/providers/apache/beam/example_dags/example_beam.py

This file was deleted.

2 changes: 1 addition & 1 deletion docs/apache-airflow-providers-apache-beam/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ Content

Python API <_api/airflow/providers/apache/beam/index>
PyPI Repository <https://pypi.org/project/apache-airflow-providers-apache-beam/>
Example DAGs <https://github.com/apache/airflow/tree/main/airflow/providers/apache/beam/example_dags>
Example DAGs <https://github.com/apache/airflow/tree/main/tests/system/providers/apache/beam>

.. toctree::
:maxdepth: 1
Expand Down
20 changes: 10 additions & 10 deletions docs/apache-airflow-providers-apache-beam/operators.rst
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,13 @@ recommend avoiding unless the Dataflow job requires it.
Python Pipelines with DirectRunner
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. exampleinclude:: /../../airflow/providers/apache/beam/example_dags/example_beam.py
.. exampleinclude:: /../../tests/system/providers/apache/beam/example_python.py
:language: python
:dedent: 4
:start-after: [START howto_operator_start_python_direct_runner_pipeline_local_file]
:end-before: [END howto_operator_start_python_direct_runner_pipeline_local_file]

.. exampleinclude:: /../../airflow/providers/apache/beam/example_dags/example_beam.py
.. exampleinclude:: /../../tests/system/providers/apache/beam/example_python.py
:language: python
:dedent: 4
:start-after: [START howto_operator_start_python_direct_runner_pipeline_gcs_file]
Expand All @@ -64,13 +64,13 @@ Python Pipelines with DirectRunner
Python Pipelines with DataflowRunner
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. exampleinclude:: /../../airflow/providers/apache/beam/example_dags/example_beam.py
.. exampleinclude:: /../../tests/system/providers/apache/beam/example_python.py
:language: python
:dedent: 4
:start-after: [START howto_operator_start_python_dataflow_runner_pipeline_gcs_file]
:end-before: [END howto_operator_start_python_dataflow_runner_pipeline_gcs_file]

.. exampleinclude:: /../../airflow/providers/apache/beam/example_dags/example_beam.py
.. exampleinclude:: /../../tests/system/providers/apache/beam/example_python_dataflow.py
:language: python
:dedent: 4
:start-after: [START howto_operator_start_python_dataflow_runner_pipeline_async_gcs_file]
Expand All @@ -92,7 +92,7 @@ has the ability to download or available on the local filesystem (provide the ab
Java Pipelines with DirectRunner
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. exampleinclude:: /../../airflow/providers/apache/beam/example_dags/example_beam.py
.. exampleinclude:: /../../tests/system/providers/apache/beam/example_beam.py
:language: python
:dedent: 4
:start-after: [START howto_operator_start_java_direct_runner_pipeline]
Expand All @@ -101,7 +101,7 @@ Java Pipelines with DirectRunner
Java Pipelines with DataflowRunner
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. exampleinclude:: /../../airflow/providers/apache/beam/example_dags/example_beam.py
.. exampleinclude:: /../../tests/system/providers/apache/beam/example_java_dataflow.py
:language: python
:dedent: 4
:start-after: [START howto_operator_start_java_dataflow_runner_pipeline]
Expand All @@ -125,13 +125,13 @@ init the module and install dependencies with ``go run init example.com/main`` a
Go Pipelines with DirectRunner
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. exampleinclude:: /../../airflow/providers/apache/beam/example_dags/example_beam.py
.. exampleinclude:: /../../tests/system/providers/apache/beam/example_go.py
:language: python
:dedent: 4
:start-after: [START howto_operator_start_go_direct_runner_pipeline_local_file]
:end-before: [END howto_operator_start_go_direct_runner_pipeline_local_file]

.. exampleinclude:: /../../airflow/providers/apache/beam/example_dags/example_beam.py
.. exampleinclude:: /../../tests/system/providers/apache/beam/example_go.py
:language: python
:dedent: 4
:start-after: [START howto_operator_start_go_direct_runner_pipeline_gcs_file]
Expand All @@ -140,13 +140,13 @@ Go Pipelines with DirectRunner
Go Pipelines with DataflowRunner
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. exampleinclude:: /../../airflow/providers/apache/beam/example_dags/example_beam.py
.. exampleinclude:: /../../tests/system/providers/apache/beam/example_go.py
:language: python
:dedent: 4
:start-after: [START howto_operator_start_go_dataflow_runner_pipeline_gcs_file]
:end-before: [END howto_operator_start_go_dataflow_runner_pipeline_gcs_file]

.. exampleinclude:: /../../airflow/providers/apache/beam/example_dags/example_beam.py
.. exampleinclude:: /../../tests/system/providers/apache/beam/example_go_dataflow.py
:language: python
:dedent: 4
:start-after: [START howto_operator_start_go_dataflow_runner_pipeline_async_gcs_file]
Expand Down
2 changes: 1 addition & 1 deletion tests/providers/apache/beam/operators/test_beam_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from tests.test_utils import AIRFLOW_MAIN_FOLDER
from tests.test_utils.system_tests_class import SystemTest

BEAM_DAG_FOLDER = os.path.join(AIRFLOW_MAIN_FOLDER, "airflow", "providers", "apache", "beam", "example_dags")
BEAM_DAG_FOLDER = os.path.join(AIRFLOW_MAIN_FOLDER, "tests", "system", "providers", "apache", "beam")


@pytest.mark.system("apache.beam")
Expand Down
66 changes: 66 additions & 0 deletions tests/system/providers/apache/beam/example_beam.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
Example Airflow DAG for Apache Beam operators
"""

from airflow import models
from airflow.providers.apache.beam.operators.beam import BeamRunJavaPipelineOperator
from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator
from tests.system.providers.apache.beam.utils import (
GCS_INPUT,
GCS_JAR_DIRECT_RUNNER_BUCKET_NAME,
GCS_JAR_DIRECT_RUNNER_OBJECT_NAME,
START_DATE,
)

with models.DAG(
"example_beam_native_java_direct_runner",
schedule_interval=None, # Override to match your needs
start_date=START_DATE,
catchup=False,
tags=['example'],
) as dag:

# [START howto_operator_start_java_direct_runner_pipeline]
jar_to_local_direct_runner = GCSToLocalFilesystemOperator(
task_id="jar_to_local_direct_runner",
bucket=GCS_JAR_DIRECT_RUNNER_BUCKET_NAME,
object_name=GCS_JAR_DIRECT_RUNNER_OBJECT_NAME,
filename="/tmp/beam_wordcount_direct_runner_{{ ds_nodash }}.jar",
)

start_java_pipeline_direct_runner = BeamRunJavaPipelineOperator(
task_id="start_java_pipeline_direct_runner",
jar="/tmp/beam_wordcount_direct_runner_{{ ds_nodash }}.jar",
pipeline_options={
'output': '/tmp/start_java_pipeline_direct_runner',
'inputFile': GCS_INPUT,
},
job_class='org.apache.beam.examples.WordCount',
)

jar_to_local_direct_runner >> start_java_pipeline_direct_runner
# [END howto_operator_start_java_direct_runner_pipeline]


from tests.system.utils import get_test_run

# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
test_run = get_test_run(dag)
65 changes: 65 additions & 0 deletions tests/system/providers/apache/beam/example_beam_java_flink.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
Example Airflow DAG for Apache Beam operators
"""

from airflow import models
from airflow.providers.apache.beam.operators.beam import BeamRunJavaPipelineOperator
from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator
from tests.system.providers.apache.beam.utils import (
GCS_INPUT,
GCS_JAR_FLINK_RUNNER_BUCKET_NAME,
GCS_JAR_FLINK_RUNNER_OBJECT_NAME,
START_DATE,
)

with models.DAG(
"example_beam_native_java_flink_runner",
schedule_interval=None, # Override to match your needs
start_date=START_DATE,
catchup=False,
tags=['example'],
) as dag:

jar_to_local_flink_runner = GCSToLocalFilesystemOperator(
task_id="jar_to_local_flink_runner",
bucket=GCS_JAR_FLINK_RUNNER_BUCKET_NAME,
object_name=GCS_JAR_FLINK_RUNNER_OBJECT_NAME,
filename="/tmp/beam_wordcount_flink_runner_{{ ds_nodash }}.jar",
)

start_java_pipeline_flink_runner = BeamRunJavaPipelineOperator(
task_id="start_java_pipeline_flink_runner",
runner="FlinkRunner",
jar="/tmp/beam_wordcount_flink_runner_{{ ds_nodash }}.jar",
pipeline_options={
'output': '/tmp/start_java_pipeline_flink_runner',
'inputFile': GCS_INPUT,
},
job_class='org.apache.beam.examples.WordCount',
)

jar_to_local_flink_runner >> start_java_pipeline_flink_runner


from tests.system.utils import get_test_run

# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
test_run = get_test_run(dag)
65 changes: 65 additions & 0 deletions tests/system/providers/apache/beam/example_beam_java_spark.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
Example Airflow DAG for Apache Beam operators
"""

from airflow import models
from airflow.providers.apache.beam.operators.beam import BeamRunJavaPipelineOperator
from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator
from tests.system.providers.apache.beam.utils import (
GCS_INPUT,
GCS_JAR_SPARK_RUNNER_BUCKET_NAME,
GCS_JAR_SPARK_RUNNER_OBJECT_NAME,
START_DATE,
)

with models.DAG(
"example_beam_native_java_spark_runner",
schedule_interval=None, # Override to match your needs
start_date=START_DATE,
catchup=False,
tags=['example'],
) as dag:

jar_to_local_spark_runner = GCSToLocalFilesystemOperator(
task_id="jar_to_local_spark_runner",
bucket=GCS_JAR_SPARK_RUNNER_BUCKET_NAME,
object_name=GCS_JAR_SPARK_RUNNER_OBJECT_NAME,
filename="/tmp/beam_wordcount_spark_runner_{{ ds_nodash }}.jar",
)

start_java_pipeline_spark_runner = BeamRunJavaPipelineOperator(
task_id="start_java_pipeline_spark_runner",
runner="SparkRunner",
jar="/tmp/beam_wordcount_spark_runner_{{ ds_nodash }}.jar",
pipeline_options={
'output': '/tmp/start_java_pipeline_spark_runner',
'inputFile': GCS_INPUT,
},
job_class='org.apache.beam.examples.WordCount',
)

jar_to_local_spark_runner >> start_java_pipeline_spark_runner


from tests.system.utils import get_test_run

# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
test_run = get_test_run(dag)
Loading

0 comments on commit 34e0ab9

Please sign in to comment.