Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Airflow Plugin #1

Merged
merged 6 commits into from
Jun 23, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .flake8
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
[flake8]
max-line-length = 120
extend-ignore = E203, E266, E501, W503, E741
exclude = .svn,CVS,.bzr,.hg,.git,__pycache__,venv/*,src/*,tests/unit/common/protos/*,build
max-complexity=32
per-file-ignores =
*:F821
*/__init__.py: F401
7 changes: 7 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
.DS_Store
__pycache__/
.coverage
*.egg-info/
build/
dist/
.python-version
24 changes: 24 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
repos:
- repo: https://github.com/PyCQA/flake8
rev: 4.0.1
hooks:
- id: flake8
- repo: https://github.com/psf/black
rev: 22.3.0
hooks:
- id: black
- repo: https://github.com/PyCQA/isort
rev: 5.10.1
hooks:
- id: isort
args: ["--profile", "black"]
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.2.0
hooks:
- id: check-yaml
- id: end-of-file-fixer
- id: trailing-whitespace
- repo: https://github.com/shellcheck-py/shellcheck-py
rev: v0.8.0.4
hooks:
- id: shellcheck
72 changes: 71 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1 +1,71 @@
# airflow-provider-flyte
# Flyte Provider for Apache Airflow

This package provides an operator, a sensor, and a hook that integrates [Flyte](flyte.org/) into Apache Airflow.
`FlyteOperator` is helpful to trigger a task/workflow in Flyte and `FlyteSensor` enables monitoring a Flyte execution status
for completion.

## Installation

Prerequisites: An environment running `apache-airflow`.

```
pip install airflow-provider-flyte
```

## Configuration

In the Airflow UI, configure a _Connection_ for Flyte.

- Host(optional): The FlyteAdmin host. Defaults to localhost.
- Port (optional): The FlyteAdmin port. Defaults to 30081.
- Login (optional): `client_id`
- Password (optional): `client_credentials_secret`
- Extra (optional): Specify the `extra` parameter as JSON dictionary to provide additional parameters.
- `project`: The default project to connect to.
- `domain`: The default domain to connect to.
- `insecure`: Whether to use SSL or not.
- `command`: The command to execute to return a token using an external process.
- `scopes`: List of scopes to request.
samhita-alla marked this conversation as resolved.
Show resolved Hide resolved
- `auth_mode`: The OAuth mode to use. Defaults to pkce flow.

## Modules

### [Flyte Operator](https://github.com/flyteorg/airflow-provider-flyte/blob/main/flyte_provider/operators/flyte.py)

The `FlyteOperator` requires a `flyte_conn_id` to fetch all the connection-related
parameters that are useful to instantiate `FlyteRemote`. Also, you must give a
`launchplan_name` to trigger a workflow, or `task_name` to trigger a task; you can give a
handful of other values that are optional, such as `project`, `domain`, `max_parallelism`,
`raw_data_prefix`, `assumable_iam_role`, `kubernetes_service_account`, `labels`, `annotations`,
`secrets`, `notifications`, `disable_notifications`, `oauth2_client`, `version`, and `inputs`.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also now supported interruptible

Copy link
Collaborator Author

@samhita-alla samhita-alla Jun 23, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

interruptibe needs to be given in the task metadata, right? I don't think we can send interruptible when executing the task through FlyteRemote.


Import into your DAG via:

```
from flyte_provider.operators.flyte import FlyteOperator
```

### [Flyte Sensor](https://github.com/flyteorg/airflow-provider-flyte/blob/main/flyte_provider/sensors/flyte.py)

If you need to wait for an execution to complete, use `FlyteSensor`.
Monitoring with `FlyteSensor` allows you to trigger downstream processes only when the Flyte executions are complete.

Import into your DAG via:

```
from flyte_provider.sensors.flyte import FlyteSensor
```

## Examples

See the [examples](https://github.com/flyte/airflow-provider-flyte/tree/main/flyte_provider/example_dags) directory for an example DAG.

## Issues

Please file issues and open pull requests [here](https://github.com/flyteorg/airflow-provider-flyte).
If you hit any roadblock, hit us up on [Slack](https://slack.flyte.org/).

### Pre-commit Hooks

Please use [pre-commit](https://pre-commit.com/) to automate linting and code formatting on every commit.
Run `pre-commit install` after installing running `pip install pre-commit`.
13 changes: 13 additions & 0 deletions flyte_provider/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
__version__ = "0.0.1"

from typing import Any, Dict


def get_provider_info() -> Dict[str, Any]:
return {
"package-name": "airflow-provider-flyte",
"name": "Flyte Airflow Provider",
"description": "A Flyte provider for Apache Airflow.",
"hook-class-names": ["flyte_provider.hooks.flyte.FlyteHook"],
"extra-links": ["flyte_provider.operators.flyte.RegistryLink"],
}
Empty file.
36 changes: 36 additions & 0 deletions flyte_provider/example_dags/example_flyte.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from datetime import datetime, timedelta

from airflow import DAG
from flyte_providers.flyte.operators.flyte import FlyteOperator
from flytekit.models.core import execution as _execution_model

_workflow_execution_succeeded = _execution_model.WorkflowExecutionPhase.SUCCEEDED

with DAG(
dag_id="example_flyte_operator",
schedule_interval=None,
start_date=datetime(2021, 1, 1),
dagrun_timeout=timedelta(minutes=60),
tags=["example"],
catchup=False,
) as dag:
# do not wait for the execution to complete
flyte_execution = FlyteOperator(
task_id="flyte_task",
flyte_conn_id="flyte_conn_example",
project="flytesnacks",
domain="development",
launchplan_name="core.basic.lp.my_wf",
assumable_iam_role="default",
samhita-alla marked this conversation as resolved.
Show resolved Hide resolved
kubernetes_service_account="demo",
version="v1",
inputs={"val": 19},
notifications=[
{
"phases": [_workflow_execution_succeeded],
"email": {"recipients_email": ["[email protected]"]},
}
],
oauth2_client={"client_id": "123", "client_secret": "456"},
secrets=[{"group": "secrets", "key": "123"}],
)
53 changes: 53 additions & 0 deletions flyte_provider/example_dags/example_flyte_wait.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from datetime import datetime, timedelta

from airflow import DAG
from flyte_providers.flyte.operators.flyte import FlyteOperator
from flyte_providers.flyte.sensors.flyte import FlyteSensor

with DAG(
dag_id="example_flyte_operator",
schedule_interval=None,
start_date=datetime(2021, 1, 1),
dagrun_timeout=timedelta(minutes=60),
tags=["example"],
catchup=False,
) as dag:
# wait for the execution to complete
flyte_execution_start = FlyteOperator(
task_id="flyte_task",
flyte_conn_id="flyte_conn_example",
project="flytesnacks",
domain="development",
launchplan_name="core.basic.lp.my_wf",
max_parallelism=2,
raw_data_prefix="s3://flyte-demo/raw_data",
assumable_iam_role="default",
kubernetes_service_account="demo",
version="v1",
inputs={"val": 19},
)

flyte_execution_wait = FlyteSensor(
task_id="flyte_sensor_one",
execution_name=flyte_execution_start.output,
project="flytesnacks",
domain="development",
flyte_conn_id="flyte_conn_example",
) # poke every 60 seconds (default)

flyte_execution_start >> flyte_execution_wait

# wait for a long-running execution to complete
flyte_execution_wait_long = FlyteSensor(
task_id="flyte_sensor_two",
execution_name=flyte_execution_start.output,
project="flytesnacks",
domain="development",
flyte_conn_id="flyte_conn_example",
mode="reschedule",
poke_interval=5 * 60, # check every 5 minutes
timeout="86400", # wait for a day
soft_fail=True, # task is skipped if the condition is not met by timeout
)

flyte_execution_start >> flyte_execution_wait_long
51 changes: 51 additions & 0 deletions flyte_provider/example_dags/example_flyte_xcom.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
from datetime import datetime, timedelta

from airflow import DAG
from flyte_providers.flyte.operators.flyte import FlyteOperator
from flyte_providers.flyte.sensors.flyte import FlyteSensor

with DAG(
dag_id="example_flyte_operator",
schedule_interval=None,
start_date=datetime(2021, 1, 1),
dagrun_timeout=timedelta(minutes=60),
tags=["example"],
catchup=False,
) as dag:

flyte_execution_start = FlyteOperator(
task_id="flyte_task_one",
flyte_conn_id="flyte_conn_example",
project="flytesnacks",
domain="development",
launchplan_name="core.basic.lp.my_wf",
max_parallelism=2,
raw_data_prefix="s3://flyte-demo/raw_data",
assumable_iam_role="default",
kubernetes_service_account="demo",
version="v1",
inputs={"val": 19},
)

# wait for the execution to complete
flyte_execution_wait = FlyteSensor(
task_id="flyte_sensor",
execution_name=flyte_execution_start.output,
project="flytesnacks",
domain="development",
flyte_conn_id="flyte_conn_example",
) # poke every 60 seconds (default)

flyte_execution = FlyteOperator(
task_id="flyte_task_two",
flyte_conn_id="flyte_conn_example",
project="flytesnacks",
domain="development",
launchplan_name="core.basic.lp.my_wf",
assumable_iam_role="default",
kubernetes_service_account="demo",
version="v1",
inputs={"val": 19},
)

flyte_execution_start >> flyte_execution_wait >> flyte_execution
Empty file.
Loading