Use redshift-data operators instead of SQL operators (#36113)
vincbeck authored Dec 8, 2023
1 parent 73abe32 commit 2c251a0
Showing 3 changed files with 41 additions and 19 deletions.
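
The same substitution is applied in all three example DAGs below: tasks that previously sent SQL through an Airflow connection via SQLExecuteQueryOperator now call the Redshift Data API directly via RedshiftDataOperator. A minimal before/after sketch of that pattern follows; the connection id, cluster id, database, user, and SQL statement are placeholders, whereas the real examples use their own constants (DB_NAME, DB_LOGIN, redshift_cluster_identifier, SQL_CREATE_TABLE).

from airflow.providers.amazon.aws.operators.redshift_data import RedshiftDataOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

# Before: the statement is routed through an Airflow connection (conn_id),
# which the example DAGs first have to create and configure at runtime.
create_table_via_sql = SQLExecuteQueryOperator(
    task_id="create_table_via_sql",
    conn_id="redshift_default",  # placeholder connection id
    sql="CREATE TABLE IF NOT EXISTS fruit (fruit_id INT, name VARCHAR);",
)

# After: the Redshift Data API is addressed directly by cluster, database, and
# user, and wait_for_completion=True blocks until the statement has finished.
create_table_via_data_api = RedshiftDataOperator(
    task_id="create_table_via_data_api",
    cluster_identifier="my-redshift-cluster",  # placeholder cluster id
    database="dev",                            # placeholder database name
    db_user="awsuser",                         # placeholder database user
    sql="CREATE TABLE IF NOT EXISTS fruit (fruit_id INT, name VARCHAR);",
    wait_for_completion=True,
)
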
24 changes: 17 additions & 7 deletions tests/system/providers/amazon/aws/example_redshift_s3_transfers.py
@@ -28,6 +28,7 @@
     RedshiftCreateClusterOperator,
     RedshiftDeleteClusterOperator,
 )
+from airflow.providers.amazon.aws.operators.redshift_data import RedshiftDataOperator
 from airflow.providers.amazon.aws.operators.s3 import (
     S3CreateBucketOperator,
     S3CreateObjectOperator,
@@ -37,7 +38,6 @@
 from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
 from airflow.providers.amazon.aws.transfers.redshift_to_s3 import RedshiftToS3Operator
 from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
-from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
 from airflow.utils.trigger_rule import TriggerRule
 from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder

@@ -144,15 +144,22 @@ def create_connection(conn_id_name: str, cluster_id: str):
         replace=True,
     )

-    create_table_redshift_data = SQLExecuteQueryOperator(
+    create_table_redshift_data = RedshiftDataOperator(
         task_id="create_table_redshift_data",
-        conn_id=conn_id_name,
+        cluster_identifier=redshift_cluster_identifier,
+        database=DB_NAME,
+        db_user=DB_LOGIN,
         sql=SQL_CREATE_TABLE,
+        wait_for_completion=True,
     )
-    insert_data = SQLExecuteQueryOperator(
+
+    insert_data = RedshiftDataOperator(
         task_id="insert_data",
-        conn_id=conn_id_name,
+        cluster_identifier=redshift_cluster_identifier,
+        database=DB_NAME,
+        db_user=DB_LOGIN,
         sql=SQL_INSERT_DATA,
+        wait_for_completion=True,
     )

     # [START howto_transfer_redshift_to_s3]
@@ -196,10 +203,13 @@ def create_connection(conn_id_name: str, cluster_id: str):
     )
     # [END howto_transfer_s3_to_redshift_multiple_keys]

-    drop_table = SQLExecuteQueryOperator(
+    drop_table = RedshiftDataOperator(
         task_id="drop_table",
-        conn_id=conn_id_name,
+        cluster_identifier=redshift_cluster_identifier,
+        database=DB_NAME,
+        db_user=DB_LOGIN,
         sql=SQL_DROP_TABLE,
+        wait_for_completion=True,
         trigger_rule=TriggerRule.ALL_DONE,
     )
     delete_cluster = RedshiftDeleteClusterOperator(
20 changes: 13 additions & 7 deletions tests/system/providers/amazon/aws/example_s3_to_sql.py
@@ -28,6 +28,7 @@
     RedshiftCreateClusterOperator,
     RedshiftDeleteClusterOperator,
 )
+from airflow.providers.amazon.aws.operators.redshift_data import RedshiftDataOperator
 from airflow.providers.amazon.aws.operators.s3 import (
     S3CreateBucketOperator,
     S3CreateObjectOperator,
@@ -36,7 +37,7 @@
 )
 from airflow.providers.amazon.aws.sensors.redshift_cluster import RedshiftClusterSensor
 from airflow.providers.amazon.aws.transfers.s3_to_sql import S3ToSqlOperator
-from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator, SQLTableCheckOperator
+from airflow.providers.common.sql.operators.sql import SQLTableCheckOperator
 from airflow.utils.trigger_rule import TriggerRule
 from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder
 from tests.system.utils.watcher import watcher
@@ -132,15 +133,18 @@ def create_connection(conn_id_name: str, cluster_id: str):
         replace=True,
     )

-    create_table = SQLExecuteQueryOperator(
+    create_table = RedshiftDataOperator(
         task_id="create_sample_table",
-        conn_id=conn_id_name,
+        cluster_identifier=redshift_cluster_identifier,
+        database=DB_NAME,
+        db_user=DB_LOGIN,
         sql=f"""
             CREATE TABLE IF NOT EXISTS {SQL_TABLE_NAME} (
             cocktail_id INT NOT NULL,
             cocktail_name VARCHAR NOT NULL,
             base_spirit VARCHAR NOT NULL);
-            """,
+        """,
+        wait_for_completion=True,
     )

     # [START howto_transfer_s3_to_sql]
@@ -199,11 +203,13 @@ def parse_csv_to_generator(filepath):
         },
     )

-    drop_table = SQLExecuteQueryOperator(
-        conn_id=conn_id_name,
-        trigger_rule=TriggerRule.ALL_DONE,
+    drop_table = RedshiftDataOperator(
         task_id="drop_table",
+        cluster_identifier=redshift_cluster_identifier,
+        database=DB_NAME,
+        db_user=DB_LOGIN,
         sql=f"DROP TABLE {SQL_TABLE_NAME}",
+        wait_for_completion=True,
     )

     delete_s3_objects = S3DeleteObjectsOperator(
Expand Down
16 changes: 11 additions & 5 deletions tests/system/providers/amazon/aws/example_sql_to_s3.py
@@ -30,10 +30,10 @@
     RedshiftCreateClusterOperator,
     RedshiftDeleteClusterOperator,
 )
+from airflow.providers.amazon.aws.operators.redshift_data import RedshiftDataOperator
 from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator, S3DeleteBucketOperator
 from airflow.providers.amazon.aws.sensors.redshift_cluster import RedshiftClusterSensor
 from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator
-from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
 from airflow.utils.trigger_rule import TriggerRule
 from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder
@@ -128,16 +128,22 @@ def create_connection(conn_id_name: str, cluster_id: str):

     set_up_connection = create_connection(conn_id_name, cluster_id=redshift_cluster_identifier)

-    create_table_redshift_data = SQLExecuteQueryOperator(
+    create_table_redshift_data = RedshiftDataOperator(
         task_id="create_table_redshift_data",
-        conn_id=conn_id_name,
+        cluster_identifier=redshift_cluster_identifier,
+        database=DB_NAME,
+        db_user=DB_LOGIN,
         sql=SQL_CREATE_TABLE,
+        wait_for_completion=True,
     )

-    insert_data = SQLExecuteQueryOperator(
+    insert_data = RedshiftDataOperator(
         task_id="insert_data",
-        conn_id=conn_id_name,
+        cluster_identifier=redshift_cluster_identifier,
+        database=DB_NAME,
+        db_user=DB_LOGIN,
         sql=SQL_INSERT_DATA,
+        wait_for_completion=True,
     )

     # [START howto_transfer_sql_to_s3]