From 9ed2f300cd3abba6cb6a7ffa6e99e1722863d1eb Mon Sep 17 00:00:00 2001 From: Chethan UK Date: Sun, 5 Jun 2022 23:40:51 +0100 Subject: [PATCH] AIP-47 - Migrate redshift DAGs to new design #22438 --- .../operators/redshift_cluster.rst | 10 ++++---- .../operators/redshift_data.rst | 2 +- .../operators/redshift_sql.rst | 4 ++-- .../operators/transfer/redshift_to_s3.rst | 2 +- tests/system/providers/amazon/__init__.py | 17 ++++++++++++++ tests/system/providers/amazon/aws/__init__.py | 17 ++++++++++++++ .../amazon/aws}/example_redshift_cluster.py | 19 ++++++++++++++- .../aws}/example_redshift_data_execute_sql.py | 23 +++++++++++++++---- .../amazon/aws}/example_redshift_sql.py | 20 +++++++++++++++- .../amazon/aws}/example_redshift_to_s3.py | 11 ++++++++- 10 files changed, 108 insertions(+), 17 deletions(-) create mode 100644 tests/system/providers/amazon/__init__.py create mode 100644 tests/system/providers/amazon/aws/__init__.py rename {airflow/providers/amazon/aws/example_dags => tests/system/providers/amazon/aws}/example_redshift_cluster.py (84%) rename {airflow/providers/amazon/aws/example_dags => tests/system/providers/amazon/aws}/example_redshift_data_execute_sql.py (71%) rename {airflow/providers/amazon/aws/example_dags => tests/system/providers/amazon/aws}/example_redshift_sql.py (81%) rename {airflow/providers/amazon/aws/example_dags => tests/system/providers/amazon/aws}/example_redshift_to_s3.py (82%) diff --git a/docs/apache-airflow-providers-amazon/operators/redshift_cluster.rst b/docs/apache-airflow-providers-amazon/operators/redshift_cluster.rst index a0875c3d771a3..b1c708850eaa0 100644 --- a/docs/apache-airflow-providers-amazon/operators/redshift_cluster.rst +++ b/docs/apache-airflow-providers-amazon/operators/redshift_cluster.rst @@ -40,7 +40,7 @@ Create an Amazon Redshift cluster To create an Amazon Redshift Cluster with the specified parameters you can use :class:`~airflow.providers.amazon.aws.operators.redshift_cluster.RedshiftCreateClusterOperator`. -.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_redshift_cluster.py +.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_redshift_cluster.py :language: python :dedent: 4 :start-after: [START howto_operator_redshift_cluster] @@ -54,7 +54,7 @@ Resume an Amazon Redshift cluster To resume a 'paused' Amazon Redshift cluster you can use :class:`RedshiftResumeClusterOperator ` -.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_redshift_cluster.py +.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_redshift_cluster.py :language: python :dedent: 4 :start-after: [START howto_operator_redshift_resume_cluster] @@ -68,7 +68,7 @@ Pause an Amazon Redshift cluster To pause an 'available' Amazon Redshift cluster you can use :class:`RedshiftPauseClusterOperator ` -.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_redshift_cluster.py +.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_redshift_cluster.py :language: python :dedent: 4 :start-after: [START howto_operator_redshift_pause_cluster] @@ -82,7 +82,7 @@ Delete an Amazon Redshift cluster To delete an Amazon Redshift cluster you can use :class:`RedshiftDeleteClusterOperator ` -.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_redshift_cluster.py +.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_redshift_cluster.py :language: python :dedent: 4 :start-after: [START howto_operator_redshift_delete_cluster] @@ -99,7 +99,7 @@ Wait on an Amazon Redshift cluster state To check the state of an Amazon Redshift Cluster until it reaches the target state or another terminal state you can use :class:`~airflow.providers.amazon.aws.sensors.redshift_cluster.RedshiftClusterSensor`. -.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_redshift_cluster.py +.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_redshift_cluster.py :language: python :dedent: 4 :start-after: [START howto_sensor_redshift_cluster] diff --git a/docs/apache-airflow-providers-amazon/operators/redshift_data.rst b/docs/apache-airflow-providers-amazon/operators/redshift_data.rst index eb9dc5112952c..dea4472f0076d 100644 --- a/docs/apache-airflow-providers-amazon/operators/redshift_data.rst +++ b/docs/apache-airflow-providers-amazon/operators/redshift_data.rst @@ -43,7 +43,7 @@ statements against an Amazon Redshift cluster. This differs from ``RedshiftSQLOperator`` in that it allows users to query and retrieve data via the AWS API and avoid the necessity of a Postgres connection. -.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_redshift_data_execute_sql.py +.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_redshift_data_execute_sql.py :language: python :dedent: 4 :start-after: [START howto_operator_redshift_data] diff --git a/docs/apache-airflow-providers-amazon/operators/redshift_sql.rst b/docs/apache-airflow-providers-amazon/operators/redshift_sql.rst index 92ff0e1c4b4c5..fc9809ac46efe 100644 --- a/docs/apache-airflow-providers-amazon/operators/redshift_sql.rst +++ b/docs/apache-airflow-providers-amazon/operators/redshift_sql.rst @@ -42,7 +42,7 @@ Execute a SQL query To execute a SQL query against an Amazon Redshift cluster without using a Postgres connection, please check ``RedshiftDataOperator``. -.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_redshift_sql.py +.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_redshift_sql.py :language: python :dedent: 4 :start-after: [START howto_operator_redshift_sql] @@ -51,7 +51,7 @@ please check ``RedshiftDataOperator``. ``RedshiftSQLOperator`` supports the ``parameters`` attribute which allows us to dynamically pass parameters into SQL statements. -.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_redshift_sql.py +.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_redshift_sql.py :language: python :dedent: 4 :start-after: [START howto_operator_redshift_sql_with_params] diff --git a/docs/apache-airflow-providers-amazon/operators/transfer/redshift_to_s3.rst b/docs/apache-airflow-providers-amazon/operators/transfer/redshift_to_s3.rst index 2235bf8bb3bb5..3707cf2b4a1cd 100644 --- a/docs/apache-airflow-providers-amazon/operators/transfer/redshift_to_s3.rst +++ b/docs/apache-airflow-providers-amazon/operators/transfer/redshift_to_s3.rst @@ -42,7 +42,7 @@ To get more information about this operator visit: Example usage: -.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_redshift_to_s3.py +.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_redshift_to_s3.py :language: python :dedent: 4 :start-after: [START howto_transfer_redshift_to_s3] diff --git a/tests/system/providers/amazon/__init__.py b/tests/system/providers/amazon/__init__.py new file mode 100644 index 0000000000000..217e5db960782 --- /dev/null +++ b/tests/system/providers/amazon/__init__.py @@ -0,0 +1,17 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/system/providers/amazon/aws/__init__.py b/tests/system/providers/amazon/aws/__init__.py new file mode 100644 index 0000000000000..217e5db960782 --- /dev/null +++ b/tests/system/providers/amazon/aws/__init__.py @@ -0,0 +1,17 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow/providers/amazon/aws/example_dags/example_redshift_cluster.py b/tests/system/providers/amazon/aws/example_redshift_cluster.py similarity index 84% rename from airflow/providers/amazon/aws/example_dags/example_redshift_cluster.py rename to tests/system/providers/amazon/aws/example_redshift_cluster.py index dc6deead3c618..ed0e2219afe26 100644 --- a/airflow/providers/amazon/aws/example_dags/example_redshift_cluster.py +++ b/tests/system/providers/amazon/aws/example_redshift_cluster.py @@ -28,11 +28,15 @@ RedshiftResumeClusterOperator, ) from airflow.providers.amazon.aws.sensors.redshift_cluster import RedshiftClusterSensor +from airflow.utils.trigger_rule import TriggerRule +from tests.system.providers.amazon.aws.utils import set_env_id +ENV_ID = set_env_id() +DAG_ID = 'example_redshift_cluster' REDSHIFT_CLUSTER_IDENTIFIER = getenv("REDSHIFT_CLUSTER_IDENTIFIER", "redshift-cluster-1") with DAG( - dag_id="example_redshift_cluster", + dag_id=DAG_ID, start_date=datetime(2021, 1, 1), schedule_interval=None, catchup=False, @@ -85,6 +89,7 @@ task_delete_cluster = RedshiftDeleteClusterOperator( task_id="delete_cluster", cluster_identifier=REDSHIFT_CLUSTER_IDENTIFIER, + trigger_rule=TriggerRule.ALL_DONE, ) # [END howto_operator_redshift_delete_cluster] @@ -96,3 +101,15 @@ task_resume_cluster, task_delete_cluster, ) + + from tests.system.utils.watcher import watcher + + # This test needs watcher in order to properly mark success/failure + # when "tearDown" task with trigger rule is part of the DAG + list(dag.tasks) >> watcher() + + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag) diff --git a/airflow/providers/amazon/aws/example_dags/example_redshift_data_execute_sql.py b/tests/system/providers/amazon/aws/example_redshift_data_execute_sql.py similarity index 71% rename from airflow/providers/amazon/aws/example_dags/example_redshift_data_execute_sql.py rename to tests/system/providers/amazon/aws/example_redshift_data_execute_sql.py index cfa3e4cefcb2d..d5ddb6e21312f 100644 --- a/airflow/providers/amazon/aws/example_dags/example_redshift_data_execute_sql.py +++ b/tests/system/providers/amazon/aws/example_redshift_data_execute_sql.py @@ -16,16 +16,18 @@ # under the License. from datetime import datetime -from os import getenv from airflow import DAG from airflow.decorators import task from airflow.providers.amazon.aws.hooks.redshift_data import RedshiftDataHook from airflow.providers.amazon.aws.operators.redshift_data import RedshiftDataOperator +from tests.system.providers.amazon.aws.utils import fetch_variable, set_env_id -REDSHIFT_CLUSTER_IDENTIFIER = getenv("REDSHIFT_CLUSTER_IDENTIFIER", "redshift_cluster_identifier") -REDSHIFT_DATABASE = getenv("REDSHIFT_DATABASE", "redshift_database") -REDSHIFT_DATABASE_USER = getenv("REDSHIFT_DATABASE_USER", "awsuser") +ENV_ID = set_env_id() +DAG_ID = 'example_redshift_data_execute_sql' +REDSHIFT_CLUSTER_IDENTIFIER = fetch_variable("REDSHIFT_CLUSTER_IDENTIFIER", "redshift_cluster_identifier") +REDSHIFT_DATABASE = fetch_variable("REDSHIFT_DATABASE", "redshift_database") +REDSHIFT_DATABASE_USER = fetch_variable("REDSHIFT_DATABASE_USER", "awsuser") REDSHIFT_QUERY = """ SELECT table_schema, @@ -51,7 +53,7 @@ def output_query_results(statement_id): with DAG( - dag_id="example_redshift_data_execute_sql", + dag_id=DAG_ID, start_date=datetime(2021, 1, 1), schedule_interval=None, catchup=False, @@ -70,3 +72,14 @@ def output_query_results(statement_id): # [END howto_operator_redshift_data] task_output = output_query_results(task_query.output) + + from tests.system.utils.watcher import watcher + + # This test needs watcher in order to properly mark success/failure + # when "tearDown" task with trigger rule is part of the DAG + list(dag.tasks) >> watcher() + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag) diff --git a/airflow/providers/amazon/aws/example_dags/example_redshift_sql.py b/tests/system/providers/amazon/aws/example_redshift_sql.py similarity index 81% rename from airflow/providers/amazon/aws/example_dags/example_redshift_sql.py rename to tests/system/providers/amazon/aws/example_redshift_sql.py index a71ef71934edc..17e386c7de27c 100644 --- a/airflow/providers/amazon/aws/example_dags/example_redshift_sql.py +++ b/tests/system/providers/amazon/aws/example_redshift_sql.py @@ -21,9 +21,14 @@ from airflow import DAG from airflow.models.baseoperator import chain from airflow.providers.amazon.aws.operators.redshift_sql import RedshiftSQLOperator +from airflow.utils.trigger_rule import TriggerRule +from tests.system.providers.amazon.aws.utils import set_env_id + +ENV_ID = set_env_id() +DAG_ID = 'example_redshift_sql' with DAG( - dag_id="example_redshift_sql", + dag_id=DAG_ID, start_date=datetime(2021, 1, 1), schedule_interval=None, catchup=False, @@ -69,6 +74,7 @@ teardown__task_drop_table = RedshiftSQLOperator( task_id='teardown__drop_table', sql='DROP TABLE IF EXISTS fruit', + trigger_rule=TriggerRule.ALL_DONE, ) chain( @@ -77,3 +83,15 @@ [task_select_data, task_select_filtered_data], teardown__task_drop_table, ) + + from tests.system.utils.watcher import watcher + + # This test needs watcher in order to properly mark success/failure + # when "tearDown" task with trigger rule is part of the DAG + list(dag.tasks) >> watcher() + + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag) diff --git a/airflow/providers/amazon/aws/example_dags/example_redshift_to_s3.py b/tests/system/providers/amazon/aws/example_redshift_to_s3.py similarity index 82% rename from airflow/providers/amazon/aws/example_dags/example_redshift_to_s3.py rename to tests/system/providers/amazon/aws/example_redshift_to_s3.py index 8116e02dc165c..a7f50e57712c6 100644 --- a/airflow/providers/amazon/aws/example_dags/example_redshift_to_s3.py +++ b/tests/system/providers/amazon/aws/example_redshift_to_s3.py @@ -20,13 +20,16 @@ from airflow import DAG from airflow.providers.amazon.aws.transfers.redshift_to_s3 import RedshiftToS3Operator +from tests.system.providers.amazon.aws.utils import set_env_id +ENV_ID = set_env_id() +DAG_ID = 'example_redshift_to_s3' S3_BUCKET_NAME = getenv("S3_BUCKET_NAME", "s3_bucket_name") S3_KEY = getenv("S3_KEY", "s3_key") REDSHIFT_TABLE = getenv("REDSHIFT_TABLE", "redshift_table") with DAG( - dag_id="example_redshift_to_s3", + dag_id=DAG_ID, start_date=datetime(2021, 1, 1), schedule_interval=None, catchup=False, @@ -41,3 +44,9 @@ table=REDSHIFT_TABLE, ) # [END howto_transfer_redshift_to_s3] + + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag)