Commit
Merge pull request #1 from flyteorg/airflow-plugin
Airflow Plugin
samhita-alla authored Jun 23, 2022
2 parents b581a66 + 51d5f41 commit 17475f6
Showing 26 changed files with 1,464 additions and 1 deletion.
8 changes: 8 additions & 0 deletions .flake8
@@ -0,0 +1,8 @@
[flake8]
max-line-length = 120
extend-ignore = E203, E266, E501, W503, E741
exclude = .svn,CVS,.bzr,.hg,.git,__pycache__,venv/*,src/*,tests/unit/common/protos/*,build
max-complexity = 32
per-file-ignores =
    *:F821
    */__init__.py: F401
40 changes: 40 additions & 0 deletions .github/workflows/build.yml
@@ -0,0 +1,40 @@
name: Build

on:
  push:
    branches: [ master ]
  pull_request:
    branches: [ master ]

jobs:
  build:
    runs-on: ubuntu-latest

    strategy:
      fail-fast: false
      matrix:
        python-version: ["3.7", "3.8", "3.9", "3.10"]

    steps:
      - uses: actions/checkout@v2
      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v2
        with:
          python-version: ${{ matrix.python-version }}
      - name: Cache pip
        uses: actions/cache@v2
        with:
          # This path is specific to Ubuntu
          path: ~/.cache/pip
          # Look for a cache hit for the corresponding requirements files
          key: ${{ format('{0}-pip-{1}', runner.os, hashFiles('requirements.txt', 'requirements-dev.txt', 'requirements-docs.txt')) }}
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -r requirements.txt
      - name: Lint
        run: |
          pre-commit run --all-files
      - name: Unit tests
        run: |
          python -m unittest
36 changes: 36 additions & 0 deletions .github/workflows/publish.yml
@@ -0,0 +1,36 @@
name: Publish Airflow Flyte Provider

on:
  release:
    types: [published]

jobs:
  deploy:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
        with:
          fetch-depth: "0"
      - name: Set up Python
        uses: actions/setup-python@v1
        with:
          python-version: "3.x"
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install setuptools wheel twine
      - name: Autobump version setup.py
        id: bump-setup-py
        run: |
          # from refs/tags/v1.2.3 get 1.2.3
          VERSION=$(echo $GITHUB_REF | sed 's#.*/v##')
          PLACEHOLDER="__version__\ =\ \"0.0.0+dev0\""
          grep "$PLACEHOLDER" "setup.py"
          sed -i "s#$PLACEHOLDER#__version__ = \"$VERSION\"#g" "setup.py"
      - name: Build and publish
        env:
          TWINE_USERNAME: ${{ secrets.PYPI_USERNAME }}
          TWINE_PASSWORD: ${{ secrets.PYPI_PASSWORD }}
        run: |
          python setup.py sdist bdist_wheel
          twine upload dist/*
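
The `Autobump version setup.py` step above derives the release version from the pushed tag. The tag-to-version extraction can be sanity-checked in isolation in any POSIX shell:

```shell
# Mirror of the workflow's extraction; GITHUB_REF is what GitHub sets on a tag push.
GITHUB_REF="refs/tags/v1.2.3"
VERSION=$(echo "$GITHUB_REF" | sed 's#.*/v##')  # strip everything up to the final "/v"
echo "$VERSION"  # prints 1.2.3
```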
8 changes: 8 additions & 0 deletions .gitignore
@@ -0,0 +1,8 @@
.DS_Store
__pycache__/
.coverage
*.egg-info/
build/
dist/
.python-version
*_astro
24 changes: 24 additions & 0 deletions .pre-commit-config.yaml
@@ -0,0 +1,24 @@
repos:
  - repo: https://github.com/PyCQA/flake8
    rev: 4.0.1
    hooks:
      - id: flake8
  - repo: https://github.com/psf/black
    rev: 22.3.0
    hooks:
      - id: black
  - repo: https://github.com/PyCQA/isort
    rev: 5.10.1
    hooks:
      - id: isort
        args: ["--profile", "black"]
  - repo: https://github.com/pre-commit/pre-commit-hooks
    rev: v4.2.0
    hooks:
      - id: check-yaml
      - id: end-of-file-fixer
      - id: trailing-whitespace
  - repo: https://github.com/shellcheck-py/shellcheck-py
    rev: v0.8.0.4
    hooks:
      - id: shellcheck
84 changes: 83 additions & 1 deletion README.md
@@ -1 +1,83 @@
# airflow-provider-flyte
# Flyte Provider for Apache Airflow

This package provides an operator, a sensor, and a hook that integrate [Flyte](https://flyte.org/) into Apache Airflow.
`FlyteOperator` triggers a task or workflow in Flyte, and `FlyteSensor` monitors a Flyte execution
for completion.

## Installation

Prerequisites: An environment running `apache-airflow`.

```
pip install airflow-provider-flyte
```

## Configuration

In the Airflow UI, configure a _Connection_ for Flyte.

- Host (optional): The FlyteAdmin host. Defaults to localhost.
- Port (optional): The FlyteAdmin port. Defaults to 30081.
- Login (optional): `client_id`
- Password (optional): `client_credentials_secret`
- Extra (optional): Specify the `extra` parameter as a JSON dictionary to provide additional parameters.
- `project`: The default project to connect to.
- `domain`: The default domain to connect to.
  - `insecure`: Whether to connect without SSL.
- `command`: The command to execute to return a token using an external process.
- `scopes`: List of scopes to request.
- `auth_mode`: The OAuth mode to use. Defaults to pkce flow.
  - `env_prefix`: Prefix used to look up injected secrets at runtime.
- `default_dir`: Default directory that will be used to find secrets as individual files.
- `file_prefix`: Prefix for the file in the `default_dir`.
- `statsd_host`: The statsd host.
- `statsd_port`: The statsd port.
- `statsd_disabled`: Whether to send statsd or not.
- `statsd_disabled_tags`: Turn on to reduce cardinality.
- `local_sandbox_path`
- S3 Config:
- `s3_enable_debug`
- `s3_endpoint`
- `s3_retries`
- `s3_backoff`
- `s3_access_key_id`
- `s3_secret_access_key`
- GCS Config:
- `gsutil_parallelism`
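
As an alternative to the UI, a connection with these fields can also be created from the Airflow CLI. The sketch below is a hedged example: the connection id matches the example DAGs, but the `flyte` conn-type name and the extras shown are assumptions based on the list above, not confirmed by this package.

```shell
# Hypothetical sketch: create the Flyte connection from the CLI.
# Conn id "flyte_conn_example" matches the example DAGs in this repository.
airflow connections add flyte_conn_example \
    --conn-type flyte \
    --conn-host localhost \
    --conn-port 30081 \
    --conn-extra '{"project": "flytesnacks", "domain": "development", "insecure": true}'
```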

## Modules

### [Flyte Operator](https://github.com/flyteorg/airflow-provider-flyte/blob/main/flyte_provider/operators/flyte.py)

The `FlyteOperator` requires a `flyte_conn_id` to fetch all the connection-related
parameters needed to instantiate `FlyteRemote`. You must also provide either a
`launchplan_name` to trigger a workflow or a `task_name` to trigger a task; a
handful of other values are optional, such as `project`, `domain`, `max_parallelism`,
`raw_data_prefix`, `kubernetes_service_account`, `labels`, `annotations`,
`secrets`, `notifications`, `disable_notifications`, `oauth2_client`, `version`, and `inputs`.

Import into your DAG via:

```
from flyte_provider.operators.flyte import FlyteOperator
```

### [Flyte Sensor](https://github.com/flyteorg/airflow-provider-flyte/blob/main/flyte_provider/sensors/flyte.py)

If you need to wait for an execution to complete, use `FlyteSensor`.
Monitoring with `FlyteSensor` lets downstream tasks run only after the Flyte execution completes.

Import into your DAG via:

```
from flyte_provider.sensors.flyte import FlyteSensor
```

## Examples

See the [examples](https://github.com/flyteorg/airflow-provider-flyte/tree/main/flyte_provider/example_dags) directory for example DAGs.

## Issues

Please file issues and open pull requests [here](https://github.com/flyteorg/airflow-provider-flyte).
If you hit any roadblock, hit us up on [Slack](https://slack.flyte.org/).
12 changes: 12 additions & 0 deletions flyte_provider/__init__.py
@@ -0,0 +1,12 @@
from typing import Any, Dict


def get_provider_info() -> Dict[str, Any]:
    return {
        "package-name": "airflow-provider-flyte",
        "name": "Flyte Airflow Provider",
        "description": "A Flyte provider for Apache Airflow.",
        "hook-class-names": ["flyte_provider.hooks.flyte.FlyteHook"],
        "extra-links": ["flyte_provider.operators.flyte.RegistryLink"],
        "versions": ["0.0.1"],
    }
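
Airflow discovers this metadata through an `apache_airflow_provider` entry point. The corresponding registration in `setup.py` is not part of this diff; a typical sketch (the exact file layout here is an assumption) would look like:

```python
# Hypothetical excerpt of setup.py (not included in this diff).
# The entry point target must resolve to the get_provider_info callable above.
from setuptools import setup

setup(
    name="airflow-provider-flyte",
    entry_points={
        "apache_airflow_provider": [
            "provider_info=flyte_provider.__init__:get_provider_info"
        ]
    },
)
```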
Empty file.
35 changes: 35 additions & 0 deletions flyte_provider/example_dags/example_flyte.py
@@ -0,0 +1,35 @@
from datetime import datetime, timedelta

from airflow import DAG
from flyte_provider.operators.flyte import FlyteOperator
from flytekit.models.core import execution as _execution_model

_workflow_execution_succeeded = _execution_model.WorkflowExecutionPhase.SUCCEEDED

with DAG(
    dag_id="example_flyte_operator",
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    dagrun_timeout=timedelta(minutes=60),
    tags=["example"],
    catchup=False,
) as dag:
    # do not wait for the execution to complete
    flyte_execution = FlyteOperator(
        task_id="flyte_task",
        flyte_conn_id="flyte_conn_example",
        project="flytesnacks",
        domain="development",
        launchplan_name="core.basic.lp.my_wf",
        kubernetes_service_account="demo",
        version="v1",
        inputs={"val": 19},
        notifications=[
            {
                "phases": [_workflow_execution_succeeded],
                "email": {"recipients_email": ["[email protected]"]},
            }
        ],
        oauth2_client={"client_id": "123", "client_secret": "456"},
        secrets=[{"group": "secrets", "key": "123"}],
    )
52 changes: 52 additions & 0 deletions flyte_provider/example_dags/example_flyte_wait.py
@@ -0,0 +1,52 @@
from datetime import datetime, timedelta

from airflow import DAG
from flyte_provider.operators.flyte import FlyteOperator
from flyte_provider.sensors.flyte import FlyteSensor

with DAG(
    dag_id="example_flyte_wait",
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    dagrun_timeout=timedelta(minutes=60),
    tags=["example"],
    catchup=False,
) as dag:
    # wait for the execution to complete
    flyte_execution_start = FlyteOperator(
        task_id="flyte_task",
        flyte_conn_id="flyte_conn_example",
        project="flytesnacks",
        domain="development",
        launchplan_name="core.basic.lp.my_wf",
        max_parallelism=2,
        raw_data_prefix="s3://flyte-demo/raw_data",
        kubernetes_service_account="demo",
        version="v1",
        inputs={"val": 19},
    )

    flyte_execution_wait = FlyteSensor(
        task_id="flyte_sensor_one",
        execution_name=flyte_execution_start.output,
        project="flytesnacks",
        domain="development",
        flyte_conn_id="flyte_conn_example",
    )  # poke every 60 seconds (default)

    flyte_execution_start >> flyte_execution_wait

    # wait for a long-running execution to complete
    flyte_execution_wait_long = FlyteSensor(
        task_id="flyte_sensor_two",
        execution_name=flyte_execution_start.output,
        project="flytesnacks",
        domain="development",
        flyte_conn_id="flyte_conn_example",
        mode="reschedule",
        poke_interval=5 * 60,  # check every 5 minutes
        timeout=86400,  # wait for a day (seconds)
        soft_fail=True,  # task is skipped if the condition is not met by timeout
    )

    flyte_execution_start >> flyte_execution_wait_long
49 changes: 49 additions & 0 deletions flyte_provider/example_dags/example_flyte_xcom.py
@@ -0,0 +1,49 @@
from datetime import datetime, timedelta

from airflow import DAG
from flyte_provider.operators.flyte import FlyteOperator
from flyte_provider.sensors.flyte import FlyteSensor

with DAG(
    dag_id="example_flyte_xcom",
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    dagrun_timeout=timedelta(minutes=60),
    tags=["example"],
    catchup=False,
) as dag:
    flyte_execution_start = FlyteOperator(
        task_id="flyte_task_one",
        flyte_conn_id="flyte_conn_example",
        project="flytesnacks",
        domain="development",
        launchplan_name="core.basic.lp.my_wf",
        max_parallelism=2,
        raw_data_prefix="s3://flyte-demo/raw_data",
        kubernetes_service_account="demo",
        version="v1",
        inputs={"val": 19},
    )

    # wait for the execution to complete
    flyte_execution_wait = FlyteSensor(
        task_id="flyte_sensor",
        execution_name=flyte_execution_start.output,
        project="flytesnacks",
        domain="development",
        flyte_conn_id="flyte_conn_example",
    )  # poke every 60 seconds (default)

    flyte_execution = FlyteOperator(
        task_id="flyte_task_two",
        flyte_conn_id="flyte_conn_example",
        project="flytesnacks",
        domain="development",
        launchplan_name="core.basic.lp.my_wf",
        kubernetes_service_account="demo",
        version="v1",
        inputs={"val": 19},
    )

    flyte_execution_start >> flyte_execution_wait >> flyte_execution
Empty file.