AIP-47 - Migrate spark DAGs to new design apache#22439
chethanuk committed Jun 5, 2022
1 parent acf8951 commit 50e1d05
Showing 5 changed files with 19 additions and 23 deletions.
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
@@ -449,6 +449,7 @@ repos:
^airflow/www/static/|
^airflow/providers/|
^tests/providers/apache/cassandra/hooks/test_cassandra.py$|
+^tests/system/providers/apache/spark/example_spark_dag.py$|
^docs/apache-airflow-providers-apache-cassandra/connections/cassandra.rst$|
^docs/apache-airflow-providers-apache-hive/commits.rst$|
^airflow/api_connexion/openapi/v1.yaml$|
17 changes: 0 additions & 17 deletions airflow/providers/apache/spark/example_dags/__init__.py

This file was deleted.

2 changes: 1 addition & 1 deletion docs/apache-airflow-providers-apache-spark/index.rst
@@ -38,7 +38,7 @@ Content
:maxdepth: 1
:caption: Resources

-Example DAGs <https://github.com/apache/airflow/tree/main/airflow/providers/apache/spark/example_dags>
+Example DAGs <https://github.com/apache/airflow/tree/main/tests/system/providers/apache/spark>
PyPI Repository <https://pypi.org/project/apache-airflow-providers-apache-spark/>
Installing from sources <installing-providers-from-sources>

6 changes: 3 additions & 3 deletions docs/apache-airflow-providers-apache-spark/operators.rst
@@ -40,7 +40,7 @@ Using the operator

Using the ``cmd_type`` parameter, it is possible to transfer data from Spark to a database (``spark_to_jdbc``) or from a database to Spark (``jdbc_to_spark``), which writes the table using the Spark command ``saveAsTable``.

-.. exampleinclude:: /../../airflow/providers/apache/spark/example_dags/example_spark_dag.py
+.. exampleinclude:: /../../tests/system/providers/apache/spark/example_spark_dag.py
:language: python
:dedent: 4
:start-after: [START howto_operator_spark_jdbc]
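For readers viewing the diff without the referenced snippet inline, here is a minimal sketch of a ``jdbc_to_spark`` transfer. It is illustrative only: the driver jar path, the ``foo``/``bar`` table names, and the connection defaults are assumptions, not values taken from this commit.

from airflow.providers.apache.spark.operators.spark_jdbc import SparkJDBCOperator

# Hypothetical sketch: read table "foo" over JDBC and write it into the Spark
# metastore table "bar" via saveAsTable. Assumes a configured JDBC connection,
# a PostgreSQL driver jar at the given path, and placement inside a
# `with DAG(...)` block like the snippets in this commit.
jdbc_to_spark_job = SparkJDBCOperator(
    cmd_type="jdbc_to_spark",
    jdbc_table="foo",
    spark_jars="/opt/spark/jars/postgresql-42.2.12.jar",
    jdbc_driver="org.postgresql.Driver",
    metastore_table="bar",
    save_mode="overwrite",
    save_format="JSON",
    task_id="jdbc_to_spark_job",
)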
@@ -65,7 +65,7 @@ For parameter definition take a look at :class:`~airflow.providers.apache.spark.
Using the operator
""""""""""""""""""

-.. exampleinclude:: /../../airflow/providers/apache/spark/example_dags/example_spark_dag.py
+.. exampleinclude:: /../../tests/system/providers/apache/spark/example_spark_dag.py
:language: python
:dedent: 4
:start-after: [START howto_operator_spark_sql]
@@ -88,7 +88,7 @@ For parameter definition take a look at :class:`~airflow.providers.apache.spark.
Using the operator
""""""""""""""""""

-.. exampleinclude:: /../../airflow/providers/apache/spark/example_dags/example_spark_dag.py
+.. exampleinclude:: /../../tests/system/providers/apache/spark/example_spark_dag.py
:language: python
:dedent: 4
:start-after: [START howto_operator_spark_submit]
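Again as a hedged sketch, not taken from this commit: the application path below assumes a standard Spark distribution with the bundled ``pi.py`` example and ``SPARK_HOME`` set on the worker.

from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

# Hypothetical sketch: submit the stock pi.py example application using the
# default Spark connection; belongs inside a `with DAG(...)` block.
submit_job = SparkSubmitOperator(
    application="${SPARK_HOME}/examples/src/main/python/pi.py",
    task_id="submit_job",
)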
airflow/providers/apache/spark/example_dags/example_spark_dag.py → tests/system/providers/apache/spark/example_spark_dag.py
@@ -20,15 +20,20 @@
Example Airflow DAG to submit Apache Spark applications using
`SparkSubmitOperator`, `SparkJDBCOperator` and `SparkSqlOperator`.
"""

+import os
from datetime import datetime

from airflow.models import DAG
from airflow.providers.apache.spark.operators.spark_jdbc import SparkJDBCOperator
from airflow.providers.apache.spark.operators.spark_sql import SparkSqlOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+DAG_ID = "example_spark_operator"

with DAG(
-    dag_id='example_spark_operator',
+    dag_id=DAG_ID,
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    catchup=False,
@@ -64,5 +69,12 @@
    # [END howto_operator_spark_jdbc]

    # [START howto_operator_spark_sql]
-    sql_job = SparkSqlOperator(sql="SELECT * FROM bar", master="local", task_id="sql_job")
+    spark_sql_job = SparkSqlOperator(
+        sql="SELECT COUNT(1) as cnt FROM temp_table", master="local", task_id="spark_sql_job"
+    )
    # [END howto_operator_spark_sql]

+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
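As a rough sketch of how such a module is exercised under the new AIP-47 design (assuming a repository checkout and pytest installed; the invocation below is illustrative, see tests/system/README.md for the supported workflow):

# Hypothetical sketch: collect and run this system-test DAG programmatically;
# equivalent to invoking pytest on the file from the command line.
import pytest

pytest.main(["tests/system/providers/apache/spark/example_spark_dag.py"])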
