Migrate Qubole example DAGs to new design #22460 #24149

Merged
merged 3 commits on Jun 3, 2022
17 changes: 0 additions & 17 deletions airflow/providers/qubole/example_dags/__init__.py

This file was deleted.

2 changes: 1 addition & 1 deletion docs/apache-airflow-providers-qubole/index.rst
@@ -38,7 +38,7 @@ Content
:maxdepth: 1
:caption: Resources

Example DAGs <https://github.com/apache/airflow/tree/main/airflow/providers/qubole/example_dags>
Example DAGs <https://github.com/apache/airflow/tree/main/tests/system/providers/qubole>
PyPI Repository <https://pypi.org/project/apache-airflow-providers-qubole/>
Installing from sources <installing-providers-from-sources>

2 changes: 1 addition & 1 deletion docs/apache-airflow-providers-qubole/operators/index.rst
@@ -29,4 +29,4 @@ Qubole Operators

.. note::
You can learn how to use Qubole integrations by analyzing the
`source code <https://github.com/apache/airflow/tree/main/airflow/providers/qubole/example_dags/>`_ of the particular example DAGs.
`source code <https://github.com/apache/airflow/tree/main/tests/system/providers/qubole/>`_ of the particular example DAGs.
24 changes: 12 additions & 12 deletions docs/apache-airflow-providers-qubole/operators/qubole.rst
@@ -43,15 +43,15 @@ Run Hive command

To run a query that shows all tables, use

.. exampleinclude:: /../../airflow/providers/qubole/example_dags/example_qubole.py
.. exampleinclude:: /../../tests/system/providers/qubole/example_qubole.py
:language: python
:dedent: 4
:start-after: [START howto_operator_qubole_run_hive_query]
:end-before: [END howto_operator_qubole_run_hive_query]
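
The example file itself is not reproduced in this diff; as a minimal sketch (the ``hivecmd`` command type, the query, and the cluster label are assumptions for illustration, not lines from the example), such a task might look like:

.. code-block:: python

    from airflow.providers.qubole.operators.qubole import QuboleOperator

    # Placed inside a ``with DAG(...)`` block; extra kwargs are forwarded to the Qubole SDK.
    hive_show_table = QuboleOperator(
        task_id="hive_show_table",
        command_type="hivecmd",  # assumed Qubole command type for Hive queries
        query="show tables",
        cluster_label="default",  # placeholder cluster label
    )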

You can also run a script located in a bucket by passing the path to the query file

.. exampleinclude:: /../../airflow/providers/qubole/example_dags/example_qubole.py
.. exampleinclude:: /../../tests/system/providers/qubole/example_qubole.py
:language: python
:dedent: 4
:start-after: [START howto_operator_qubole_run_hive_script]
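
A rough sketch of the script variant, assuming ``QuboleOperator`` forwards a ``script_location`` kwarg to the Qubole SDK; the S3 path and the ``notify`` flag are placeholders:

.. code-block:: python

    from airflow.providers.qubole.operators.qubole import QuboleOperator

    # Inside a ``with DAG(...)`` block.
    hive_s3_location = QuboleOperator(
        task_id="hive_s3_location",
        command_type="hivecmd",  # assumed command type
        script_location="s3://my-bucket/scripts/show_table.hql",  # placeholder path
        notify=True,  # assumed flag to notify when the command finishes
    )
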
@@ -62,7 +62,7 @@ Run Hadoop command

To run a jar file in your Hadoop cluster, use

.. exampleinclude:: /../../airflow/providers/qubole/example_dags/example_qubole.py
.. exampleinclude:: /../../tests/system/providers/qubole/example_qubole.py
:language: python
:dedent: 4
:start-after: [START howto_operator_qubole_run_hadoop_jar]
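
A sketch under the assumption that Hadoop jobs use a ``hadoopcmd`` command type with the usual ``hadoop jar`` arguments packed into a ``sub_command`` string; all paths are placeholders:

.. code-block:: python

    from airflow.providers.qubole.operators.qubole import QuboleOperator

    # Inside a ``with DAG(...)`` block.
    hadoop_jar_cmd = QuboleOperator(
        task_id="hadoop_jar_cmd",
        command_type="hadoopcmd",  # assumed command type for Hadoop jobs
        # ``sub_command`` is assumed to take the usual ``hadoop jar`` arguments as one string.
        sub_command="jar s3://my-bucket/jars/wordcount.jar -input s3://my-bucket/in -output s3://my-bucket/out",
    )
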
@@ -73,7 +73,7 @@ Run Pig command

To run a *Pig Latin* script in your Hadoop cluster, use

.. exampleinclude:: /../../airflow/providers/qubole/example_dags/example_qubole.py
.. exampleinclude:: /../../tests/system/providers/qubole/example_qubole.py
:language: python
:dedent: 4
:start-after: [START howto_operator_qubole_run_pig_script]
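
A sketch, assuming a ``pigcmd`` command type and a ``parameters`` string of key/value pairs; the script path is a placeholder:

.. code-block:: python

    from airflow.providers.qubole.operators.qubole import QuboleOperator

    # Inside a ``with DAG(...)`` block.
    pig_cmd = QuboleOperator(
        task_id="pig_cmd",
        command_type="pigcmd",  # assumed command type for Pig Latin scripts
        script_location="s3://my-bucket/scripts/wordcount.pig",  # placeholder path
        parameters="key1=value1 key2=value2",  # assumed space-separated Pig parameters
    )
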
@@ -84,7 +84,7 @@ Run Shell command

To run a shell script, use

.. exampleinclude:: /../../airflow/providers/qubole/example_dags/example_qubole.py
.. exampleinclude:: /../../tests/system/providers/qubole/example_qubole.py
:language: python
:dedent: 4
:start-after: [START howto_operator_qubole_run_shell_script]
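
A sketch, assuming a ``shellcmd`` command type; the script path and its positional ``parameters`` are placeholders:

.. code-block:: python

    from airflow.providers.qubole.operators.qubole import QuboleOperator

    # Inside a ``with DAG(...)`` block.
    shell_cmd = QuboleOperator(
        task_id="shell_cmd",
        command_type="shellcmd",  # assumed command type for shell scripts
        script_location="s3://my-bucket/scripts/job.sh",  # placeholder path
        parameters="param1 param2",  # assumed positional arguments passed to the script
    )
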
@@ -95,7 +95,7 @@ Run Presto command

To run a query using Presto, use

.. exampleinclude:: /../../airflow/providers/qubole/example_dags/example_qubole.py
.. exampleinclude:: /../../tests/system/providers/qubole/example_qubole.py
:language: python
:dedent: 4
:start-after: [START howto_operator_qubole_run_presto_query]
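
A sketch, assuming a ``prestocmd`` command type (an assumption, not taken from the example file):

.. code-block:: python

    from airflow.providers.qubole.operators.qubole import QuboleOperator

    # Inside a ``with DAG(...)`` block.
    presto_cmd = QuboleOperator(
        task_id="presto_cmd",
        command_type="prestocmd",  # assumed command type for Presto queries
        query="show tables",
    )
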
@@ -106,23 +106,23 @@ Run DB commands

To run a query as a `DbTap <https://docs.qubole.com/en/latest/rest-api/dbtap_api/index.html>`_, use

.. exampleinclude:: /../../airflow/providers/qubole/example_dags/example_qubole.py
.. exampleinclude:: /../../tests/system/providers/qubole/example_qubole.py
:language: python
:dedent: 4
:start-after: [START howto_operator_qubole_run_db_tap_query]
:end-before: [END howto_operator_qubole_run_db_tap_query]
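
A sketch, assuming a ``dbtapquerycmd`` command type and a ``db_tap_id`` kwarg; the id value is a placeholder from a hypothetical QDS account:

.. code-block:: python

    from airflow.providers.qubole.operators.qubole import QuboleOperator

    # Inside a ``with DAG(...)`` block.
    db_query = QuboleOperator(
        task_id="db_query",
        command_type="dbtapquerycmd",  # assumed command type for DbTap queries
        query="show tables",
        db_tap_id=2064,  # placeholder DbTap id
    )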

To run a DB export command, use

.. exampleinclude:: /../../airflow/providers/qubole/example_dags/example_qubole.py
.. exampleinclude:: /../../tests/system/providers/qubole/example_qubole.py
:language: python
:dedent: 4
:start-after: [START howto_operator_qubole_run_db_export]
:end-before: [END howto_operator_qubole_run_db_export]
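
A hedged sketch, assuming a ``dbexportcmd`` command type whose ``mode``, table names and ``dbtap_id`` are forwarded to the Qubole SDK; all values are placeholders:

.. code-block:: python

    from airflow.providers.qubole.operators.qubole import QuboleOperator

    # Inside a ``with DAG(...)`` block.
    db_export = QuboleOperator(
        task_id="db_export",
        command_type="dbexportcmd",  # assumed command type for DB export
        mode=1,  # assumed: mode 1 exports a whole Hive table
        hive_table="my_hive_table",  # placeholder source table
        db_table="my_exported_table",  # placeholder target table
        dbtap_id=2064,  # placeholder DbTap id
    )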

To run a DB import command, use

.. exampleinclude:: /../../airflow/providers/qubole/example_dags/example_qubole.py
.. exampleinclude:: /../../tests/system/providers/qubole/example_qubole.py
:language: python
:dedent: 4
:start-after: [START howto_operator_qubole_run_db_import]
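
A hedged sketch, assuming a ``dbimportcmd`` command type; the table names, filter and parallelism below are placeholders:

.. code-block:: python

    from airflow.providers.qubole.operators.qubole import QuboleOperator

    # Inside a ``with DAG(...)`` block.
    db_import = QuboleOperator(
        task_id="db_import",
        command_type="dbimportcmd",  # assumed command type for DB import
        mode=1,  # assumed simple import mode
        hive_table="my_hive_table",  # placeholder target table
        db_table="my_source_table",  # placeholder source table
        where_clause="id < 10",  # assumed optional row filter
        parallelism=2,  # assumed number of parallel import streams
        dbtap_id=2064,  # placeholder DbTap id
    )
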
@@ -133,7 +133,7 @@ Run Spark commands

To run a Scala script as a Spark job, use

.. exampleinclude:: /../../airflow/providers/qubole/example_dags/example_qubole.py
.. exampleinclude:: /../../tests/system/providers/qubole/example_qubole.py
:language: python
:dedent: 4
:start-after: [START howto_operator_qubole_run_spark_scala]
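
A sketch, assuming a ``sparkcmd`` command type that accepts the inline Scala source via a ``program`` kwarg; the program and arguments are placeholders:

.. code-block:: python

    from airflow.providers.qubole.operators.qubole import QuboleOperator

    # Inside a ``with DAG(...)`` block; the Scala program below is a placeholder.
    prog = """
    object HelloWorld {
      def main(args: Array[String]): Unit = {
        println("Hello from a Qubole Spark command")
      }
    }
    """

    spark_cmd = QuboleOperator(
        task_id="spark_cmd",
        command_type="sparkcmd",  # assumed command type for Spark jobs
        program=prog,  # assumed kwarg carrying the inline program source
        language="scala",
        arguments="--class HelloWorld",  # assumed spark-submit style arguments
    )
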
@@ -153,7 +153,7 @@ File or directory existence

To wait for a file or directory to exist in the cluster, use

.. exampleinclude:: /../../airflow/providers/qubole/example_dags/example_qubole.py
.. exampleinclude:: /../../tests/system/providers/qubole/example_qubole_sensors.py
:language: python
:dedent: 4
:start-after: [START howto_sensor_qubole_run_file_sensor]
@@ -173,7 +173,7 @@ Partition existence

To wait for a table partition to exist in the cluster, use

.. exampleinclude:: /../../airflow/providers/qubole/example_dags/example_qubole.py
.. exampleinclude:: /../../tests/system/providers/qubole/example_qubole_sensors.py
:language: python
:dedent: 4
:start-after: [START howto_sensor_qubole_run_partition_sensor]
tests/system/providers/qubole/example_qubole.py
@@ -17,6 +17,7 @@
# under the License.

import filecmp
import os
import random
import textwrap
from datetime import datetime
@@ -28,15 +29,18 @@
from airflow.operators.empty import EmptyOperator
except ModuleNotFoundError:
from airflow.operators.dummy import DummyOperator as EmptyOperator # type: ignore

from airflow.operators.python import BranchPythonOperator
from airflow.providers.qubole.operators.qubole import QuboleOperator
from airflow.providers.qubole.sensors.qubole import QuboleFileSensor, QubolePartitionSensor
from airflow.utils.trigger_rule import TriggerRule

START_DATE = datetime(2021, 1, 1)

ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "example_qubole_operator"

with DAG(
dag_id='example_qubole_operator',
dag_id=DAG_ID,
schedule_interval=None,
start_date=START_DATE,
tags=['example'],
@@ -218,53 +222,14 @@ def main(args: Array[String]) {

branching >> db_import >> spark_cmd >> join

(removed here: the `example_qubole_sensor` DAG with its QuboleFileSensor and QubolePartitionSensor tasks; it moves, with minor renames, into the new file tests/system/providers/qubole/example_qubole_sensors.py shown below)

from tests.system.utils.watcher import watcher

# This test needs watcher in order to properly mark success/failure
# when "tearDown" task with trigger rule is part of the DAG
list(dag.tasks) >> watcher()


from tests.system.utils import get_test_run # noqa: E402

# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
test_run = get_test_run(dag)
93 changes: 93 additions & 0 deletions tests/system/providers/qubole/example_qubole_sensors.py
@@ -0,0 +1,93 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import os
import textwrap
from datetime import datetime

from airflow import DAG
from airflow.providers.qubole.sensors.qubole import QuboleFileSensor, QubolePartitionSensor

START_DATE = datetime(2021, 1, 1)

ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "example_qubole_sensor"


with DAG(
dag_id=DAG_ID,
schedule_interval=None,
start_date=START_DATE,
tags=['example'],
) as dag:
dag.doc_md = textwrap.dedent(
"""
This is only an example DAG to highlight usage of QuboleSensor in various scenarios,
some of these tasks may or may not work based on your QDS account setup.

Run a shell command from Qubole Analyze against your Airflow cluster with following to
trigger it manually `airflow dags trigger example_qubole_sensor`.

*Note: Make sure that connection `qubole_default` is properly set before running
this example.*
"""
)

# [START howto_sensor_qubole_run_file_sensor]
check_s3_file = QuboleFileSensor(
task_id='check_s3_file',
poke_interval=60,
timeout=600,
data={
"files": [
"s3://paid-qubole/HadoopAPIExamples/jars/hadoop-0.20.1-dev-streaming.jar",
"s3://paid-qubole/HadoopAPITests/data/{{ ds.split('-')[2] }}.tsv",
] # will check for availability of all the files in array
},
)
# [END howto_sensor_qubole_run_file_sensor]

# [START howto_sensor_qubole_run_partition_sensor]
check_hive_partition = QubolePartitionSensor(
task_id='check_hive_partition',
poke_interval=10,
timeout=60,
data={
"schema": "default",
"table": "my_partitioned_table",
"columns": [
{"column": "month", "values": ["{{ ds.split('-')[1] }}"]},
{"column": "day", "values": ["{{ ds.split('-')[2] }}", "{{ yesterday_ds.split('-')[2] }}"]},
], # will check for partitions like [month=12/day=12,month=12/day=13]
},
)
# [END howto_sensor_qubole_run_partition_sensor]

check_s3_file >> check_hive_partition

from tests.system.utils.watcher import watcher

# This test needs watcher in order to properly mark success/failure
# when "tearDown" task with trigger rule is part of the DAG
list(dag.tasks) >> watcher()


from tests.system.utils import get_test_run # noqa: E402

# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
test_run = get_test_run(dag)