Skip to content

Commit

Permalink
Minor fixes in RDS Create-Delete operators and DMS example DAG
Browse files Browse the repository at this point in the history
  • Loading branch information
eskarimov committed Jun 27, 2022
1 parent e68d304 commit 206e884
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 16 deletions.
24 changes: 12 additions & 12 deletions airflow/providers/amazon/aws/example_dags/example_dms.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,16 +319,16 @@ def delete_dms_assets():
)

chain(
create_db_instance
>> create_sample_table()
>> create_dms_assets()
>> create_task
>> start_task
>> describe_tasks
>> await_task_start
>> stop_task
>> await_task_stop
>> delete_task
>> delete_dms_assets()
>> delete_db_instance
create_db_instance,
create_sample_table(),
create_dms_assets(),
create_task,
start_task,
describe_tasks,
await_task_start,
stop_task,
await_task_stop,
delete_task,
delete_dms_assets(),
delete_db_instance,
)
10 changes: 6 additions & 4 deletions airflow/providers/amazon/aws/operators/rds.py
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,8 @@ class RdsCreateDbInstanceOperator(RdsBaseOperator):
:param rds_kwargs: Named arguments to pass to boto3 RDS client function ``create_db_instance``
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/rds.html#RDS.Client.create_db_instance
:param aws_conn_id: The Airflow connection used for AWS credentials.
:param wait_for_completion: Whether or not wait for creation of the DB instance complete. (default: True)
:param wait_for_completion: Whether or not wait for creation of the DB instance to
complete. (default: True)
"""

def __init__(
Expand All @@ -589,7 +590,7 @@ def __init__(
self.wait_for_completion = wait_for_completion

def execute(self, context: 'Context') -> str:
self.log.info(f"Creating new DB instance {self.db_instance_identifier}")
self.log.info("Creating new DB instance %s", self.db_instance_identifier)

create_db_instance = self.hook.conn.create_db_instance(
DBInstanceIdentifier=self.db_instance_identifier,
Expand Down Expand Up @@ -618,7 +619,8 @@ class RdsDeleteDbInstanceOperator(RdsBaseOperator):
:param rds_kwargs: Named arguments to pass to boto3 RDS client function ``delete_db_instance``
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/rds.html#RDS.Client.delete_db_instance
:param aws_conn_id: The Airflow connection used for AWS credentials.
:param wait_for_completion: Whether or not wait for deletion of the DB instance complete. (default: True)
:param wait_for_completion: Whether or not wait for deletion of the DB instance to
complete. (default: True)
"""

def __init__(
Expand All @@ -636,7 +638,7 @@ def __init__(
self.wait_for_completion = wait_for_completion

def execute(self, context: 'Context') -> str:
self.log.info(f"Deleting DB instance {self.db_instance_identifier}")
self.log.info("Deleting DB instance %s", self.db_instance_identifier)

delete_db_instance = self.hook.conn.delete_db_instance(
DBInstanceIdentifier=self.db_instance_identifier,
Expand Down

0 comments on commit 206e884

Please sign in to comment.