Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Amazon appflow #24057

Merged
merged 31 commits into from
Jun 21, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
7f60a97
Add Amazon AppFlow hook.
igorborgest May 31, 2022
78bf77a
Add Amazon AppFlow operators.
igorborgest May 31, 2022
e83ac35
Add Amazon AppFlow examples.
igorborgest May 31, 2022
ae155fd
Add Amazon Appflow docs.
igorborgest May 31, 2022
f90368f
Merge branch 'main' into amazon-appflow
igorborgest May 31, 2022
c70df4f
Apply comments/docs patterns.
igorborgest May 31, 2022
b83b0e2
Removing the "private" attribute signal and more.
igorborgest May 31, 2022
aaeeaaa
Fix task_ids for example_appflow.
igorborgest Jun 1, 2022
6a2314e
Move datetime_to_epoch() to utils and more.
igorborgest Jun 1, 2022
35efeb9
Fix the AppflowBaseOperator name.
igorborgest Jun 1, 2022
2b54596
Ignore AppflowBaseOperator during structure check.
igorborgest Jun 2, 2022
7cfed9b
test_short_circuit refactor.
igorborgest Jun 2, 2022
77f2a59
Add get_airflow_version.
igorborgest Jun 2, 2022
2a754d1
Update airflow/providers/amazon/aws/hooks/appflow.py
igorborgest Jun 7, 2022
94471ca
Update airflow/providers/amazon/aws/operators/appflow.py
igorborgest Jun 7, 2022
485558b
Update airflow/providers/amazon/aws/operators/appflow.py
igorborgest Jun 7, 2022
4f2123f
Update airflow/providers/amazon/aws/operators/appflow.py
igorborgest Jun 7, 2022
d850ea9
Update airflow/providers/amazon/aws/operators/appflow.py
igorborgest Jun 7, 2022
f7705ed
Update airflow/providers/amazon/aws/operators/appflow.py
igorborgest Jun 7, 2022
8c3a220
Addressing Josh's requests.
igorborgest Jun 7, 2022
57a6f1e
Add cached_property to AppflowHook
igorborgest Jun 10, 2022
d7098ec
Update airflow/providers/amazon/aws/hooks/appflow.py
igorborgest Jun 15, 2022
ccdf4bf
Update airflow/providers/amazon/aws/operators/appflow.py
igorborgest Jun 15, 2022
2e03f6f
Update airflow/providers/amazon/aws/operators/appflow.py
igorborgest Jun 15, 2022
fcbdd63
Update Josh's comment.
igorborgest Jun 16, 2022
c90dd84
Merge branch 'main' into amazon-appflow
igorborgest Jun 16, 2022
263abdd
Update cached_property import.
igorborgest Jun 21, 2022
8085dcb
Merge branch 'main' into amazon-appflow
igorborgest Jun 21, 2022
f096358
Merge branch 'main' into amazon-appflow
igorborgest Jun 21, 2022
9843e6b
Merge branch 'main' into amazon-appflow
igorborgest Jun 21, 2022
28dfacd
Fix mypy.
igorborgest Jun 21, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 101 additions & 0 deletions airflow/providers/amazon/aws/example_dags/example_appflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
# Licensed to the Apache Software Foundation (ASF) under one
josh-fell marked this conversation as resolved.
Show resolved Hide resolved
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.amazon.aws.operators.appflow import (
AppflowRecordsShortCircuit,
AppflowRunAfterOperator,
AppflowRunBeforeOperator,
AppflowRunDailyOperator,
AppflowRunFullOperator,
AppflowRunOperator,
)

SOURCE_NAME = "salesforce"
FLOW_NAME = "salesforce-campaign"

with DAG(
"example_appflow",
schedule_interval=None,
start_date=datetime(2022, 1, 1),
catchup=False,
tags=["example"],
) as dag:

# [START howto_appflow_run]
igorborgest marked this conversation as resolved.
Show resolved Hide resolved
run = AppflowRunOperator(
task_id="campaign-dump",
igorborgest marked this conversation as resolved.
Show resolved Hide resolved
source=SOURCE_NAME,
name=FLOW_NAME,
)
# [END howto_appflow_run]

# [START howto_appflow_run_full]
run_full = AppflowRunFullOperator(
task_id="campaign-dump-full",
source=SOURCE_NAME,
name=FLOW_NAME,
)
# [END howto_appflow_run_full]

# [START howto_appflow_run_daily]
run_daily = AppflowRunDailyOperator(
task_id="campaign-dump-daily",
source=SOURCE_NAME,
name=FLOW_NAME,
source_field="LastModifiedDate",
dt="{{ ds }}",
)
# [END howto_appflow_run_daily]

# [START howto_appflow_run_before]
run_before = AppflowRunBeforeOperator(
task_id="campaign-dump-before",
source=SOURCE_NAME,
name=FLOW_NAME,
source_field="LastModifiedDate",
dt="{{ ds }}",
)
# [END howto_appflow_run_before]

# [START howto_appflow_run_after]
run_after = AppflowRunAfterOperator(
task_id="campaign-dump-after",
source=SOURCE_NAME,
name=FLOW_NAME,
source_field="LastModifiedDate",
dt="3000-01-01", # Future date, so no records to dump
)
# [END howto_appflow_run_after]

# [START howto_appflow_shortcircuit]
has_records = AppflowRecordsShortCircuit(
task_id="campaign-dump-short-ciruit",
flow_name=FLOW_NAME,
appflow_run_task_id="campaign-dump-after", # Should shortcircuit, no records expected
)
# [END howto_appflow_shortcircuit]

skipped = BashOperator(
igorborgest marked this conversation as resolved.
Show resolved Hide resolved
task_id="should_be_skipped",
bash_command="echo 1",
)

run >> run_full >> run_daily >> run_before >> run_after >> has_records >> skipped
igorborgest marked this conversation as resolved.
Show resolved Hide resolved
51 changes: 51 additions & 0 deletions airflow/providers/amazon/aws/hooks/appflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from typing import TYPE_CHECKING

from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook

if TYPE_CHECKING:
from mypy_boto3_appflow.client import AppflowClient


igorborgest marked this conversation as resolved.
Show resolved Hide resolved
class AppflowHook(AwsBaseHook):
"""
Interact with Amazon Appflow, using the boto3 library
Hook attribute `conn` has all methods that listed in documentation
igorborgest marked this conversation as resolved.
Show resolved Hide resolved

.. seealso::
- https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/appflow.html
- https://docs.aws.amazon.com/appflow/1.0/APIReference/Welcome.html

Additional arguments (such as ``aws_conn_id`` or ``region_name``) may be specified and
are passed down to the underlying AwsBaseHook.

.. seealso::
:class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`

:param aws_conn_id: The Airflow connection used for AWS credentials.
igorborgest marked this conversation as resolved.
Show resolved Hide resolved
"""

def __init__(self, *args, **kwargs) -> None:
kwargs["client_type"] = "appflow"
super().__init__(*args, **kwargs)

@property
def conn(self) -> 'AppflowClient':
"""Get the underlying boto3 Appflow client (cached)"""
igorborgest marked this conversation as resolved.
Show resolved Hide resolved
return super().conn
Loading