From ec4dcce3384262d858b6951631da809f15974db7 Mon Sep 17 00:00:00 2001
From: Vincent <97131062+vincbeck@users.noreply.github.com>
Date: Mon, 9 May 2022 11:40:27 -0600
Subject: [PATCH] Update sample dag and doc for Datasync (#23511)

---
 ...mple_datasync_2.py => example_datasync.py} |  70 +++---
 .../aws/example_dags/example_datasync_1.py    |  69 ------
 .../amazon/aws/operators/datasync.py          |   2 +-
 .../operators/datasync.rst                    | 212 +++++-------------
 4 files changed, 82 insertions(+), 271 deletions(-)
 rename airflow/providers/amazon/aws/example_dags/{example_datasync_2.py => example_datasync.py} (54%)
 delete mode 100644 airflow/providers/amazon/aws/example_dags/example_datasync_1.py

diff --git a/airflow/providers/amazon/aws/example_dags/example_datasync_2.py b/airflow/providers/amazon/aws/example_dags/example_datasync.py
similarity index 54%
rename from airflow/providers/amazon/aws/example_dags/example_datasync_2.py
rename to airflow/providers/amazon/aws/example_dags/example_datasync.py
index 12558b8170d0b..09c474079e497 100644
--- a/airflow/providers/amazon/aws/example_dags/example_datasync_2.py
+++ b/airflow/providers/amazon/aws/example_dags/example_datasync.py
@@ -14,24 +14,6 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""
-This is an example dag for using `AWSDataSyncOperator` in a more complex manner.
-
-- Try to get a TaskArn. If one exists, update it.
-- If no tasks exist, try to create a new DataSync Task.
-    - If source and destination locations don't exist for the new task, create them first
-- If many tasks exist, raise an Exception
-- After getting or creating a DataSync Task, run it
-
-This DAG relies on the following environment variables:
-
-* SOURCE_LOCATION_URI - Source location URI, usually on premises SMB or NFS
-* DESTINATION_LOCATION_URI - Destination location URI, usually S3
-* CREATE_TASK_KWARGS - Passed to boto3.create_task(**kwargs)
-* CREATE_SOURCE_LOCATION_KWARGS - Passed to boto3.create_location(**kwargs)
-* CREATE_DESTINATION_LOCATION_KWARGS - Passed to boto3.create_location(**kwargs)
-* UPDATE_TASK_KWARGS - Passed to boto3.update_task(**kwargs)
-"""
 
 import json
 import re
@@ -39,22 +21,14 @@
 from os import getenv
 
 from airflow import models
+from airflow.models.baseoperator import chain
 from airflow.providers.amazon.aws.operators.datasync import DataSyncOperator
 
-# [START howto_operator_datasync_2_args]
+TASK_ARN = getenv("TASK_ARN", "my_aws_datasync_task_arn")
 SOURCE_LOCATION_URI = getenv("SOURCE_LOCATION_URI", "smb://hostname/directory/")
-
 DESTINATION_LOCATION_URI = getenv("DESTINATION_LOCATION_URI", "s3://mybucket/prefix")
-
-default_create_task_kwargs = '{"Name": "Created by Airflow"}'
-CREATE_TASK_KWARGS = json.loads(getenv("CREATE_TASK_KWARGS", default_create_task_kwargs))
-
-default_create_source_location_kwargs = "{}"
-CREATE_SOURCE_LOCATION_KWARGS = json.loads(
-    getenv("CREATE_SOURCE_LOCATION_KWARGS", default_create_source_location_kwargs)
-)
-
-bucket_access_role_arn = "arn:aws:iam::11112223344:role/r-11112223344-my-bucket-access-role"
+CREATE_TASK_KWARGS = json.loads(getenv("CREATE_TASK_KWARGS", '{"Name": "Created by Airflow"}'))
+CREATE_SOURCE_LOCATION_KWARGS = json.loads(getenv("CREATE_SOURCE_LOCATION_KWARGS", '{}'))
 default_destination_location_kwargs = """\
 {"S3BucketArn": "arn:aws:s3:::mybucket",
     "S3Config": {"BucketAccessRoleArn":
@@ -63,23 +37,33 @@
 CREATE_DESTINATION_LOCATION_KWARGS = json.loads(
     getenv("CREATE_DESTINATION_LOCATION_KWARGS", re.sub(r"[\s+]", '', default_destination_location_kwargs))
 )
-
-default_update_task_kwargs = '{"Name": "Updated by Airflow"}'
-UPDATE_TASK_KWARGS = json.loads(getenv("UPDATE_TASK_KWARGS", default_update_task_kwargs))
-
-# [END howto_operator_datasync_2_args]
+UPDATE_TASK_KWARGS = json.loads(getenv("UPDATE_TASK_KWARGS", '{"Name": "Updated by Airflow"}'))
 
 with models.DAG(
-    "example_datasync_2",
+    "example_datasync",
     schedule_interval=None,  # Override to match your needs
     start_date=datetime(2021, 1, 1),
     catchup=False,
     tags=['example'],
 ) as dag:
+    # [START howto_operator_datasync_specific_task]
+    # Execute a specific task
+    datasync_specific_task = DataSyncOperator(task_id="datasync_specific_task", task_arn=TASK_ARN)
+    # [END howto_operator_datasync_specific_task]
+
+    # [START howto_operator_datasync_search_task]
+    # Search and execute a task
+    datasync_search_task = DataSyncOperator(
+        task_id="datasync_search_task",
+        source_location_uri=SOURCE_LOCATION_URI,
+        destination_location_uri=DESTINATION_LOCATION_URI,
+    )
+    # [END howto_operator_datasync_search_task]
 
-    # [START howto_operator_datasync_2]
-    datasync_task = DataSyncOperator(
-        task_id="datasync_task",
+    # [START howto_operator_datasync_create_task]
+    # Create a task (the task does not exist)
+    datasync_create_task = DataSyncOperator(
+        task_id="datasync_create_task",
         source_location_uri=SOURCE_LOCATION_URI,
         destination_location_uri=DESTINATION_LOCATION_URI,
         create_task_kwargs=CREATE_TASK_KWARGS,
@@ -88,4 +72,10 @@
         update_task_kwargs=UPDATE_TASK_KWARGS,
         delete_task_after_execution=True,
     )
-    # [END howto_operator_datasync_2]
+    # [END howto_operator_datasync_create_task]
+
+    chain(
+        datasync_specific_task,
+        datasync_search_task,
+        datasync_create_task,
+    )
diff --git a/airflow/providers/amazon/aws/example_dags/example_datasync_1.py b/airflow/providers/amazon/aws/example_dags/example_datasync_1.py
deleted file mode 100644
index 3c4275249a14e..0000000000000
--- a/airflow/providers/amazon/aws/example_dags/example_datasync_1.py
+++ /dev/null
@@ -1,69 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-"""
-This is an example dag for using `AWSDataSyncOperator` in a straightforward manner.
-
-This DAG gets an AWS TaskArn for a specified source and destination, and then attempts to execute it.
-It assumes there is a single task returned and does not do error checking (eg if multiple tasks were found).
-
-This DAG relies on the following environment variables:
-
-* SOURCE_LOCATION_URI - Source location URI, usually on premises SMB or NFS
-* DESTINATION_LOCATION_URI - Destination location URI, usually S3
-"""
-
-from datetime import datetime
-from os import getenv
-
-from airflow import models
-from airflow.providers.amazon.aws.operators.datasync import DataSyncOperator
-
-# [START howto_operator_datasync_1_args_1]
-TASK_ARN = getenv("TASK_ARN", "my_aws_datasync_task_arn")
-# [END howto_operator_datasync_1_args_1]
-
-# [START howto_operator_datasync_1_args_2]
-SOURCE_LOCATION_URI = getenv("SOURCE_LOCATION_URI", "smb://hostname/directory/")
-
-DESTINATION_LOCATION_URI = getenv("DESTINATION_LOCATION_URI", "s3://mybucket/prefix")
-# [END howto_operator_datasync_1_args_2]
-
-
-with models.DAG(
-    "example_datasync_1_1",
-    schedule_interval=None,  # Override to match your needs
-    start_date=datetime(2021, 1, 1),
-    catchup=False,
-    tags=['example'],
-) as dag:
-
-    # [START howto_operator_datasync_1_1]
-    datasync_task_1 = DataSyncOperator(task_id="datasync_task_1", task_arn=TASK_ARN)
-    # [END howto_operator_datasync_1_1]
-
-with models.DAG(
-    "example_datasync_1_2",
-    start_date=datetime(2021, 1, 1),
-    schedule_interval=None,  # Override to match your needs
-) as dag:
-    # [START howto_operator_datasync_1_2]
-    datasync_task_2 = DataSyncOperator(
-        task_id="datasync_task_2",
-        source_location_uri=SOURCE_LOCATION_URI,
-        destination_location_uri=DESTINATION_LOCATION_URI,
-    )
-    # [END howto_operator_datasync_1_2]
diff --git a/airflow/providers/amazon/aws/operators/datasync.py b/airflow/providers/amazon/aws/operators/datasync.py
index 932f7b31abc25..5a0c86071139f 100644
--- a/airflow/providers/amazon/aws/operators/datasync.py
+++ b/airflow/providers/amazon/aws/operators/datasync.py
@@ -31,7 +31,7 @@
 
 
 class DataSyncOperator(BaseOperator):
-    r"""Find, Create, Update, Execute and Delete AWS DataSync Tasks.
+    """Find, Create, Update, Execute and Delete AWS DataSync Tasks.
 
     If ``do_xcom_push`` is True, then the DataSync TaskArn and TaskExecutionArn
     which were executed will be pushed to an XCom.
diff --git a/docs/apache-airflow-providers-amazon/operators/datasync.rst b/docs/apache-airflow-providers-amazon/operators/datasync.rst
index c16a0777cb11a..20f18fa0556ca 100644
--- a/docs/apache-airflow-providers-amazon/operators/datasync.rst
+++ b/docs/apache-airflow-providers-amazon/operators/datasync.rst
@@ -15,139 +15,25 @@
     specific language governing permissions and limitations
     under the License.
 
-
-.. _howto/operator:DataSyncOperator:
-
 AWS DataSync Operator
 =====================
 
-Overview
---------
-
-Two example_dags are provided which showcase the
-:class:`~airflow.providers.amazon.aws.operators.datasync.DataSyncOperator`
-in action.
-
- - example_datasync_1.py
- - example_datasync_2.py
-
-Both examples use the :class:`~airflow.providers.amazon.aws.hooks.datasync.DataSyncHook`
-to create a boto3 DataSync client. This hook in turn uses the :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
-
-Note this guide differentiates between an *Airflow task* (identified by a task_id on Airflow),
-and an *AWS DataSync Task* (identified by a TaskArn on AWS).
-
-example_datasync_1.py
---------------------------
-
-Purpose
-"""""""
-With this DAG we show approaches catering for two simple use cases.
-
-1.1 Specify a TaskARN to be executed.
-1.2 Find an AWS DataSync TaskArn based on source and destination URIs, and execute it.
-
-Environment variables
-"""""""""""""""""""""
-
-These examples rely on the following variables, which can be passed via OS environment variables.
-
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_datasync_1.py
-    :language: python
-    :start-after: [START howto_operator_datasync_1_args_1]
-    :end-before: [END howto_operator_datasync_1_args_1]
-
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_datasync_1.py
-    :language: python
-    :start-after: [START howto_operator_datasync_1_args_2]
-    :end-before: [END howto_operator_datasync_1_args_2]
-
-Get DataSync Tasks
-""""""""""""""""""
-
-The :class:`~airflow.providers.amazon.aws.operators.datasync.DataSyncOperator` can execute a specific
-TaskArn by specifying the ``task_arn`` parameter. This is useful when you know the TaskArn you want to execute.
-
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_datasync_1.py
-    :language: python
-    :start-after: [START howto_operator_datasync_1_1]
-    :end-before: [END howto_operator_datasync_1_1]
-
-Alternatively, the operator can search in AWS DataSync for a Task based on
-``source_location_uri`` and ``destination_location_uri``. For example, your
-``source_location_uri`` might point to your on-premises SMB / NFS share, and your
-``destination_location_uri`` might be an S3 bucket.
-
-In AWS, DataSync Tasks are linked to source and destination Locations. A location has a LocationURI and
-is referenced by a LocationArn much like other AWS resources.
-The :class:`~airflow.providers.amazon.aws.operators.datasync.DataSyncOperator`
-can iterate all DataSync Tasks for their source and destination LocationArns. Then it checks
-each LocationArn to see if its the URIs match the desired source / destination URI.
-
-To perform a search based on the Location URIs, define the task as follows
-
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_datasync_1.py
-    :language: python
-    :start-after: [START howto_operator_datasync_1_2]
-    :end-before: [END howto_operator_datasync_1_2]
-
-Note: The above configuration assumes there is always exactly one DataSync TaskArn in AWS that matches.
-It will fail if either there were no matching TaskArns or if there were more than one matching TaskArn
-defined already in AWS DataSync. You may want to add additional logic to handle other cases
-- see example_datasync_2 and the `Operator behaviour`_ section.
+`AWS DataSync <https://aws.amazon.com/datasync/>`__ is a data-transfer service that simplifies, automates,
+and accelerates moving and replicating data between on-premises storage systems and AWS storage services over
+the internet or AWS Direct Connect.
 
-example_datasync_2.py
----------------------
-
-Purpose
-"""""""
-
-Show how DataSync Tasks and Locations can be automatically created, deleted and updated using the
-:class:`~airflow.providers.amazon.aws.operators.datasync.DataSyncOperator`.
-
-Find and update a DataSync Task, or create one if it doesn't exist. Update the Task, then execute it.
-Finally, delete it.
-
-Environment variables
-"""""""""""""""""""""
-
-This example relies on the following variables, which can be passed via OS environment variables.
-
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_datasync_2.py
-    :language: python
-    :start-after: [START howto_operator_datasync_2_args]
-    :end-before: [END howto_operator_datasync_2_args]
-
-Get, Create, Update, Run and Delete DataSync Tasks
-""""""""""""""""""""""""""""""""""""""""""""""""""
-
-The :class:`~airflow.providers.amazon.aws.operators.datasync.DataSyncOperator` is used
-as before but with some extra arguments.
-
-Most of the arguments (``CREATE_*_KWARGS``) provide a way for the operator to automatically create a Task
-and/or Locations if no suitable existing Task was found. If these are left to their default value (None)
-then no create will be attempted.
-
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_datasync_2.py
-    :language: python
-    :start-after: [START howto_operator_datasync_2]
-    :end-before: [END howto_operator_datasync_2]
-
-Note also the addition of ``UPDATE_TASK_KWARGS``; if this is not None then it will be used to do an
-update of the Task properties on AWS prior to the Task being executed.
+Prerequisite Tasks
+------------------
 
-Otherwise the behaviour is very similar to the first examples above. We want to identify a suitable TaskArn
-based on some criteria (specified task_arn or source and dest URIs) and execute it. In this example,
-the main differences are that we provide a way to create Tasks/Locations if none are found.
+.. include:: _partials/prerequisite_tasks.rst
 
-Also, because we specified ``delete_task_after_execution=True``, the TaskArn will be deleted
-from AWS DataSync after it completes successfully.
+.. _howto/operator:DataSyncOperator:
 
-Operator behaviour
-------------------
+Interact with AWS DataSync Tasks
+--------------------------------
 
-DataSync Task execution behaviour
-"""""""""""""""""""""""""""""""""
+You can use :class:`~airflow.providers.amazon.aws.operators.datasync.DataSyncOperator` to
+find, create, update, execute and delete AWS DataSync tasks.
 
 Once the :class:`~airflow.providers.amazon.aws.operators.datasync.DataSyncOperator` has identified
 the correct TaskArn to run (either because you specified it, or because it was found), it will then be
@@ -157,51 +43,57 @@ by a TaskExecutionArn.
 The TaskExecutionArn will be monitored until completion (success / failure), and its status will be
 periodically written to the Airflow task log.
 
-After completion, the TaskExecution description is retrieved from AWS and dumped to the Airflow task log
-for inspection.
-
-Finally, both the TaskArn and the TaskExecutionArn are returned from the ``execute()`` method, and pushed to
-an XCom automatically if ``do_xcom_push=True``.
-
 The :class:`~airflow.providers.amazon.aws.operators.datasync.DataSyncOperator` supports
 optional passing of additional kwargs to the underlying ``boto3.start_task_execution()`` API.
 This is done with the ``task_execution_kwargs`` parameter.
-This is useful for example to limit bandwidth or filter included files - refer to the boto3 Datasync
-documentation for more details.
+This is useful for example to limit bandwidth or filter included files, see the `boto3 Datasync
+documentation <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/datasync.html>`__
+for more details.
 
-TaskArn selection behaviour
-"""""""""""""""""""""""""""
+Execute a task
+""""""""""""""
 
-The :class:`~airflow.providers.amazon.aws.operators.datasync.DataSyncOperator`
-may find 0, 1, or many AWS DataSync Tasks with a matching ``source_location_uri`` and
-``destination_location_uri``. The operator must decide what to do in each of these scenarios.
+To execute a specific task, you can pass the ``task_arn`` to the operator.
 
-To override the default behaviour, simply create an operator which inherits
-:class:`~airflow.providers.amazon.aws.operators.datasync.DataSyncOperator`
-and re-implement the ``choose_task`` and ``choose_location`` methods
-to suit your use case.
+.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_datasync.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_datasync_specific_task]
+    :end-before: [END howto_operator_datasync_specific_task]
+
+Search and execute a task
+"""""""""""""""""""""""""
 
-Scenarios and behaviours:
+To search for a task, you can specify the ``source_location_uri`` and ``destination_location_uri`` to the operator.
+If one task is found, this one will be executed.
+If more than one task is found, the operator will raise an Exception. To avoid this, you can set
+``allow_random_task_choice`` to ``True`` to randomly choose from candidate tasks.
 
- - No suitable AWS DataSync Tasks found
+.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_datasync.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_datasync_search_task]
+    :end-before: [END howto_operator_datasync_search_task]
 
-If there were 0 suitable AWS DataSync Tasks found, the operator will try to create one.
-This operator will use existing Locations if any are found which match the source or destination
-location uri that were specified. Or, if either location has no matching LocationArn in AWS then
-the operator will attempt to create new Location/s if suitable kwargs were provided to do so.
+Create and execute a task
+"""""""""""""""""""""""""
 
- - 1 AWS DataSync Task found
+When searching for a task, if no task is found you have the option to create one before executing it.
+In order to do that, you need to provide the extra parameters ``create_task_kwargs``, ``create_source_location_kwargs``
+and ``create_destination_location_kwargs``.
 
-This is the simplest scenario - just use the one DataSync Task that was found :).
+These extra parameters provide a way for the operator to automatically create a Task and/or Locations if no suitable
+existing Task was found. If these are left to their default value (None) then no create will be attempted.
 
- - More than one AWS DataSync Tasks found
+Also, because ``delete_task_after_execution`` is set to ``True``, the task will be deleted
+from AWS DataSync after it completes successfully.
 
-The operator will raise an Exception. To avoid this, you can set ``allow_random_task_choice=True``
-to randomly choose from candidate Tasks. Alternatively you can subclass this operator
-and re-implement the ``choose_task`` method with your own algorithm.
+.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_datasync.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_datasync_create_task]
+    :end-before: [END howto_operator_datasync_create_task]
 
-TaskArn creation behaviour
-"""""""""""""""""""""""""""
 
 When creating a Task, the
 :class:`~airflow.providers.amazon.aws.operators.datasync.DataSyncOperator` will try to find
@@ -209,14 +101,12 @@ and use existing LocationArns rather than creating new ones. If multiple Locatio
 specified URIs then we need to choose one to use. In this scenario, the operator behaves similarly
 to how it chooses a single Task from many Tasks:
 
-The operator will raise an Exception. To avoid this, you can set ``allow_random_location_choice=True``
-to randomly choose from candidate Locations. Alternatively you can subclass this operator
-and re-implement the ``choose_location`` method with your own algorithm.
-
+The operator will raise an Exception. To avoid this, you can set ``allow_random_location_choice`` to ``True``
+to randomly choose from candidate Locations.
 
 Reference
 ---------
 
 For further information, look at:
 
-* `AWS boto3 Library Documentation <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/datasync.html>`__
+* `Boto3 Library Documentation for DataSync <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/datasync.html>`__