AIP-47 - Migrate redshift DAGs to new design apache#22438
chethanuk committed Jul 18, 2022
1 parent 8d6f4ed commit 9ed2f30
Showing 10 changed files with 108 additions and 17 deletions.
@@ -40,7 +40,7 @@ Create an Amazon Redshift cluster
To create an Amazon Redshift Cluster with the specified parameters you can use
:class:`~airflow.providers.amazon.aws.operators.redshift_cluster.RedshiftCreateClusterOperator`.

-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_redshift_cluster.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_redshift_cluster.py
:language: python
:dedent: 4
:start-after: [START howto_operator_redshift_cluster]
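
For quick orientation, a minimal sketch of the create step as it might appear inside a DAG context (the identifier, node type, and credentials below are illustrative placeholders, not values from this commit):

.. code-block:: python

    from airflow.providers.amazon.aws.operators.redshift_cluster import (
        RedshiftCreateClusterOperator,
    )

    # Sketch only: provision a small single-node cluster with placeholder credentials.
    create_cluster = RedshiftCreateClusterOperator(
        task_id="create_cluster",
        cluster_identifier="redshift-cluster-1",
        cluster_type="single-node",
        node_type="dc2.large",
        master_username="awsuser",
        master_user_password="MyTestPassword123",
    )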
@@ -54,7 +54,7 @@ Resume an Amazon Redshift cluster
To resume a 'paused' Amazon Redshift cluster you can use
:class:`RedshiftResumeClusterOperator <airflow.providers.amazon.aws.operators.redshift_cluster>`

-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_redshift_cluster.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_redshift_cluster.py
:language: python
:dedent: 4
:start-after: [START howto_operator_redshift_resume_cluster]
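
A minimal sketch of the resume step (the identifier is a placeholder):

.. code-block:: python

    from airflow.providers.amazon.aws.operators.redshift_cluster import (
        RedshiftResumeClusterOperator,
    )

    # Sketch only: resume a paused cluster by its identifier.
    resume_cluster = RedshiftResumeClusterOperator(
        task_id="resume_cluster",
        cluster_identifier="redshift-cluster-1",
    )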
@@ -68,7 +68,7 @@ Pause an Amazon Redshift cluster
To pause an 'available' Amazon Redshift cluster you can use
:class:`RedshiftPauseClusterOperator <airflow.providers.amazon.aws.operators.redshift_cluster>`

-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_redshift_cluster.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_redshift_cluster.py
:language: python
:dedent: 4
:start-after: [START howto_operator_redshift_pause_cluster]
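
Pausing is symmetrical; a minimal sketch (placeholder identifier):

.. code-block:: python

    from airflow.providers.amazon.aws.operators.redshift_cluster import (
        RedshiftPauseClusterOperator,
    )

    # Sketch only: pause an 'available' cluster by its identifier.
    pause_cluster = RedshiftPauseClusterOperator(
        task_id="pause_cluster",
        cluster_identifier="redshift-cluster-1",
    )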
@@ -82,7 +82,7 @@ Delete an Amazon Redshift cluster
To delete an Amazon Redshift cluster you can use
:class:`RedshiftDeleteClusterOperator <airflow.providers.amazon.aws.operators.redshift_cluster>`

-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_redshift_cluster.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_redshift_cluster.py
:language: python
:dedent: 4
:start-after: [START howto_operator_redshift_delete_cluster]
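
A minimal sketch of the delete step; ``skip_final_cluster_snapshot`` is an operator argument, and the value shown (like the identifier) is illustrative:

.. code-block:: python

    from airflow.providers.amazon.aws.operators.redshift_cluster import (
        RedshiftDeleteClusterOperator,
    )

    # Sketch only: tear the cluster down without keeping a final snapshot.
    delete_cluster = RedshiftDeleteClusterOperator(
        task_id="delete_cluster",
        cluster_identifier="redshift-cluster-1",
        skip_final_cluster_snapshot=True,
    )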
@@ -99,7 +99,7 @@ Wait on an Amazon Redshift cluster state
To check the state of an Amazon Redshift Cluster until it reaches the target state or another terminal
state you can use :class:`~airflow.providers.amazon.aws.sensors.redshift_cluster.RedshiftClusterSensor`.

-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_redshift_cluster.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_redshift_cluster.py
:language: python
:dedent: 4
:start-after: [START howto_sensor_redshift_cluster]
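
A minimal sketch of the sensor; the identifier and timing values are placeholders:

.. code-block:: python

    from airflow.providers.amazon.aws.sensors.redshift_cluster import RedshiftClusterSensor

    # Sketch only: poke every 15s until the cluster reports 'available'.
    wait_cluster_available = RedshiftClusterSensor(
        task_id="wait_cluster_available",
        cluster_identifier="redshift-cluster-1",
        target_status="available",
        poke_interval=15,
        timeout=60 * 30,
    )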
@@ -43,7 +43,7 @@ statements against an Amazon Redshift cluster.
This differs from ``RedshiftSQLOperator`` in that it allows users to query and retrieve data via the AWS API and avoid
the necessity of a Postgres connection.

-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_redshift_data_execute_sql.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_redshift_data_execute_sql.py
:language: python
:dedent: 4
:start-after: [START howto_operator_redshift_data]
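
A minimal sketch of a Data API query; the keyword arguments assume the operator's signature at the time of this commit, and all values are placeholders:

.. code-block:: python

    from airflow.providers.amazon.aws.operators.redshift_data import RedshiftDataOperator

    # Sketch only: run SQL via the AWS Data API, no Postgres connection needed.
    task_query = RedshiftDataOperator(
        task_id="redshift_query",
        cluster_identifier="redshift-cluster-1",
        database="dev",
        db_user="awsuser",
        sql="SELECT 1;",
        await_result=True,
    )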
@@ -42,7 +42,7 @@ Execute a SQL query
To execute a SQL query against an Amazon Redshift cluster without using a Postgres connection,
please check ``RedshiftDataOperator``.

-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_redshift_sql.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_redshift_sql.py
:language: python
:dedent: 4
:start-after: [START howto_operator_redshift_sql]
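
A minimal sketch (the SQL and table name are placeholders; the operator runs against the ``redshift_default`` connection unless ``redshift_conn_id`` is overridden):

.. code-block:: python

    from airflow.providers.amazon.aws.operators.redshift_sql import RedshiftSQLOperator

    # Sketch only: execute DDL over a Postgres-style Redshift connection.
    setup_table = RedshiftSQLOperator(
        task_id="setup_table",
        sql="CREATE TABLE IF NOT EXISTS fruit (fruit_id INTEGER, name VARCHAR NOT NULL);",
    )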
@@ -51,7 +51,7 @@ please check ``RedshiftDataOperator``.
``RedshiftSQLOperator`` supports the ``parameters`` attribute which allows us to dynamically pass
parameters into SQL statements.

-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_redshift_sql.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_redshift_sql.py
:language: python
:dedent: 4
:start-after: [START howto_operator_redshift_sql_with_params]
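
A minimal sketch of parameterized SQL in the same spirit as the included example (table, column, and value are placeholders):

.. code-block:: python

    from airflow.providers.amazon.aws.operators.redshift_sql import RedshiftSQLOperator

    # Sketch only: %(color)s is bound from the 'parameters' dict at run time.
    select_filtered = RedshiftSQLOperator(
        task_id="select_filtered_data",
        sql="SELECT * FROM fruit WHERE color = %(color)s;",
        parameters={"color": "Red"},
    )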
@@ -42,7 +42,7 @@ To get more information about this operator visit:

Example usage:

-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_redshift_to_s3.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_redshift_to_s3.py
:language: python
:dedent: 4
:start-after: [START howto_transfer_redshift_to_s3]
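
A minimal sketch of the transfer; the bucket, key, schema, and table names are placeholders:

.. code-block:: python

    from airflow.providers.amazon.aws.transfers.redshift_to_s3 import RedshiftToS3Operator

    # Sketch only: UNLOAD schema.table to s3://bucket/key via the configured connections.
    transfer_redshift_to_s3 = RedshiftToS3Operator(
        task_id="transfer_redshift_to_s3",
        s3_bucket="my-bucket",
        s3_key="my-prefix/",
        schema="PUBLIC",
        table="my_table",
    )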
17 changes: 17 additions & 0 deletions tests/system/providers/amazon/__init__.py
@@ -0,0 +1,17 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
17 changes: 17 additions & 0 deletions tests/system/providers/amazon/aws/__init__.py
@@ -0,0 +1,17 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
@@ -28,11 +28,15 @@
RedshiftResumeClusterOperator,
)
from airflow.providers.amazon.aws.sensors.redshift_cluster import RedshiftClusterSensor
+from airflow.utils.trigger_rule import TriggerRule
+from tests.system.providers.amazon.aws.utils import set_env_id

+ENV_ID = set_env_id()
+DAG_ID = 'example_redshift_cluster'
REDSHIFT_CLUSTER_IDENTIFIER = getenv("REDSHIFT_CLUSTER_IDENTIFIER", "redshift-cluster-1")

with DAG(
dag_id="example_redshift_cluster",
dag_id=DAG_ID,
start_date=datetime(2021, 1, 1),
schedule_interval=None,
catchup=False,
@@ -85,6 +89,7 @@
task_delete_cluster = RedshiftDeleteClusterOperator(
task_id="delete_cluster",
cluster_identifier=REDSHIFT_CLUSTER_IDENTIFIER,
+trigger_rule=TriggerRule.ALL_DONE,
)
# [END howto_operator_redshift_delete_cluster]

@@ -96,3 +101,15 @@
task_resume_cluster,
task_delete_cluster,
)

+from tests.system.utils.watcher import watcher
+
+# This test needs watcher in order to properly mark success/failure
+# when "tearDown" task with trigger rule is part of the DAG
+list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
@@ -16,16 +16,18 @@
# under the License.

from datetime import datetime
-from os import getenv

from airflow import DAG
from airflow.decorators import task
from airflow.providers.amazon.aws.hooks.redshift_data import RedshiftDataHook
from airflow.providers.amazon.aws.operators.redshift_data import RedshiftDataOperator
+from tests.system.providers.amazon.aws.utils import fetch_variable, set_env_id

-REDSHIFT_CLUSTER_IDENTIFIER = getenv("REDSHIFT_CLUSTER_IDENTIFIER", "redshift_cluster_identifier")
-REDSHIFT_DATABASE = getenv("REDSHIFT_DATABASE", "redshift_database")
-REDSHIFT_DATABASE_USER = getenv("REDSHIFT_DATABASE_USER", "awsuser")
+ENV_ID = set_env_id()
+DAG_ID = 'example_redshift_data_execute_sql'
+REDSHIFT_CLUSTER_IDENTIFIER = fetch_variable("REDSHIFT_CLUSTER_IDENTIFIER", "redshift_cluster_identifier")
+REDSHIFT_DATABASE = fetch_variable("REDSHIFT_DATABASE", "redshift_database")
+REDSHIFT_DATABASE_USER = fetch_variable("REDSHIFT_DATABASE_USER", "awsuser")

REDSHIFT_QUERY = """
SELECT table_schema,
@@ -51,7 +53,7 @@ def output_query_results(statement_id):


with DAG(
dag_id="example_redshift_data_execute_sql",
dag_id=DAG_ID,
start_date=datetime(2021, 1, 1),
schedule_interval=None,
catchup=False,
@@ -70,3 +72,14 @@
# [END howto_operator_redshift_data]

task_output = output_query_results(task_query.output)

+from tests.system.utils.watcher import watcher
+
+# This test needs watcher in order to properly mark success/failure
+# when "tearDown" task with trigger rule is part of the DAG
+list(dag.tasks) >> watcher()
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
@@ -21,9 +21,14 @@
from airflow import DAG
from airflow.models.baseoperator import chain
from airflow.providers.amazon.aws.operators.redshift_sql import RedshiftSQLOperator
+from airflow.utils.trigger_rule import TriggerRule
+from tests.system.providers.amazon.aws.utils import set_env_id

+ENV_ID = set_env_id()
+DAG_ID = 'example_redshift_sql'
+
with DAG(
dag_id="example_redshift_sql",
dag_id=DAG_ID,
start_date=datetime(2021, 1, 1),
schedule_interval=None,
catchup=False,
@@ -69,6 +74,7 @@
teardown__task_drop_table = RedshiftSQLOperator(
task_id='teardown__drop_table',
sql='DROP TABLE IF EXISTS fruit',
+trigger_rule=TriggerRule.ALL_DONE,
)

chain(
@@ -77,3 +83,15 @@
[task_select_data, task_select_filtered_data],
teardown__task_drop_table,
)

+from tests.system.utils.watcher import watcher
+
+# This test needs watcher in order to properly mark success/failure
+# when "tearDown" task with trigger rule is part of the DAG
+list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
@@ -20,13 +20,16 @@

from airflow import DAG
from airflow.providers.amazon.aws.transfers.redshift_to_s3 import RedshiftToS3Operator
+from tests.system.providers.amazon.aws.utils import set_env_id

+ENV_ID = set_env_id()
+DAG_ID = 'example_redshift_to_s3'
S3_BUCKET_NAME = getenv("S3_BUCKET_NAME", "s3_bucket_name")
S3_KEY = getenv("S3_KEY", "s3_key")
REDSHIFT_TABLE = getenv("REDSHIFT_TABLE", "redshift_table")

with DAG(
dag_id="example_redshift_to_s3",
dag_id=DAG_ID,
start_date=datetime(2021, 1, 1),
schedule_interval=None,
catchup=False,
@@ -41,3 +44,9 @@
table=REDSHIFT_TABLE,
)
# [END howto_transfer_redshift_to_s3]


+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
