Skip to content

Commit

Permalink
Flytekit dbt plugin (#1150)
Browse files Browse the repository at this point in the history
* flytekit-dbt plugin

* Supports dbt run and dbt test tasks
* The plugin includes integration test that need local PostgreSQL database

Signed-off-by: ariefrahmansyah <[email protected]>

* Revert README.md about unit tests

Signed-off-by: ariefrahmansyah <[email protected]>

* Merge conflicts

Signed-off-by: Eduardo Apolinario <[email protected]>

* Update requirements

Signed-off-by: Eduardo Apolinario <[email protected]>

* Move to dbt-sqlite

Signed-off-by: Eduardo Apolinario <[email protected]>

* Linting

Signed-off-by: Eduardo Apolinario <[email protected]>

* Regenerate requirements

Signed-off-by: Eduardo Apolinario <[email protected]>

* Delete setup_db.sh

Signed-off-by: Eduardo Apolinario <[email protected]>

* Fix test_task.py tests

Signed-off-by: Eduardo Apolinario <[email protected]>

* Move testdata to a separate directory

Signed-off-by: Eduardo Apolinario <[email protected]>

* Delete unused test

Signed-off-by: Eduardo Apolinario <[email protected]>

* Use flytekit logger

Signed-off-by: Eduardo Apolinario <[email protected]>

* Use my fork in the plugins tests

Signed-off-by: Eduardo Apolinario <[email protected]>

* Get string path for call to touch.touch

Signed-off-by: Eduardo Apolinario <[email protected]>

* Use pathlib.Path.touch

Signed-off-by: Eduardo Apolinario <[email protected]>

* Remove the touch package

Signed-off-by: Eduardo Apolinario <[email protected]>

* Simplify CI

Signed-off-by: Eduardo Apolinario <[email protected]>

* Revert "Simplify CI"

This reverts commit 134fa1c.

Signed-off-by: Eduardo Apolinario <[email protected]>

* Revert "Use my fork in the plugins tests"

This reverts commit 02ef380.

Signed-off-by: Eduardo Apolinario <[email protected]>

* Remove unused file

Signed-off-by: Eduardo Apolinario <[email protected]>

* Add dbt to README.md

Signed-off-by: Eduardo Apolinario <[email protected]>

* Remove "Set up postgres" from CI job

Signed-off-by: Eduardo Apolinario <[email protected]>

* Leave a note in requirements.in

Signed-off-by: Eduardo Apolinario <[email protected]>

* Remove unused jaffle_shop dir

Signed-off-by: Eduardo Apolinario <[email protected]>

* Set upperbound flytekit version to 2.0.0

Signed-off-by: Eduardo Apolinario <[email protected]>

Signed-off-by: ariefrahmansyah <[email protected]>
Signed-off-by: Eduardo Apolinario <[email protected]>
Co-authored-by: ariefrahmansyah <[email protected]>
Co-authored-by: Eduardo Apolinario <[email protected]>
  • Loading branch information
3 people authored Sep 17, 2022
1 parent 21ae290 commit cfcccb8
Show file tree
Hide file tree
Showing 36 changed files with 2,287 additions and 20 deletions.
6 changes: 2 additions & 4 deletions .github/workflows/pythonbuild.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ on:
pull_request:

env:
FLYTE_SDK_LOGGING_LEVEL: 10 # debug
FLYTE_SDK_LOGGING_LEVEL: 10 # debug

jobs:
build:
Expand Down Expand Up @@ -68,6 +68,7 @@ jobs:
- flytekit-aws-sagemaker
- flytekit-bigquery
- flytekit-data-fsspec
- flytekit-dbt
- flytekit-deck-standard
- flytekit-dolt
- flytekit-greatexpectations
Expand Down Expand Up @@ -109,8 +110,6 @@ jobs:
# Issue tracked: https://github.com/whylabs/whylogs/issues/697
- python-version: 3.10
plugin-names: "flytekit-whylogs"


steps:
- uses: actions/checkout@v2
- name: Set up Python ${{ matrix.python-version }}
Expand All @@ -136,7 +135,6 @@ jobs:
run: |
cd plugins/${{ matrix.plugin-names }}
coverage run -m pytest tests
lint:
runs-on: ubuntu-latest
steps:
Expand Down
33 changes: 17 additions & 16 deletions plugins/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,23 @@ All the Flytekit plugins maintained by the core team are added here. It is not n

## Currently Available Plugins 🔌

| Plugin | Installation | Description | Version | Type |
|------------------------------|------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------|---------------|
| AWS Sagemaker Training | ```bash pip install flytekitplugins-awssagemaker ``` | Installs SDK to author Sagemaker built-in and custom training jobs in python | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-awssagemaker/) | Backend |
| Hive Queries | ```bash pip install flytekitplugins-hive ``` | Installs SDK to author Hive Queries that can be executed on a configured hive backend using Flyte backend plugin | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-hive/) | Backend |
| K8s distributed PyTorch Jobs | ```bash pip install flytekitplugins-kfpytorch ``` | Installs SDK to author Distributed pyTorch Jobs in python using Kubeflow PyTorch Operator | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-kfpytorch/) | Backend |
| K8s native tensorflow Jobs | ```bash pip install flytekitplugins-kftensorflow ``` | Installs SDK to author Distributed tensorflow Jobs in python using Kubeflow Tensorflow Operator | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-kftensorflow/) | Backend |
| K8s native MPI Jobs | ```bash pip install flytekitplugins-kfmpi ``` | Installs SDK to author Distributed MPI Jobs in python using Kubeflow MPI Operator | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-kftensorflow/) | Backend |
| Papermill based Tasks | ```bash pip install flytekitplugins-papermill ``` | Execute entire notebooks as Flyte Tasks and pass inputs and outputs between them and python tasks | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-papermill/) | Flytekit-only |
| Pod Tasks | ```bash pip install flytekitplugins-pod ``` | Installs SDK to author Pods in python. These pods can have multiple containers, use volumes and have non exiting side-cars | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-pod/) | Flytekit-only |
| spark | ```bash pip install flytekitplugins-spark ``` | Installs SDK to author Spark jobs that can be executed natively on Kubernetes with a supported backend Flyte plugin | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-spark/) | Backend |
| AWS Athena Queries | ```bash pip install flytekitplugins-athena ``` | Installs SDK to author queries executed on AWS Athena | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-athena/) | Backend |
| DOLT | ```bash pip install flytekitplugins-dolt ``` | Read & write dolt data sets and use dolt tables as native types | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-dolt/) | Flytekit-only |
| Pandera | ```bash pip install flytekitplugins-pandera ``` | Use Pandera schemas as native Flyte types, which enable data quality checks. | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-pandera/) | Flytekit-only |
| SQLAlchemy | ```bash pip install flytekitplugins-sqlalchemy ``` | Write queries for any database that supports SQLAlchemy | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-sqlalchemy/) | Flytekit-only |
| Great Expectations | ```bash pip install flytekitplugins-great-expectations``` | Enforce data quality for various data types within Flyte | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-great-expectations.svg)](https://pypi.python.org/pypi/flytekitplugins-great-expectations/) | Flytekit-only |
| Snowflake | ```bash pip install flytekitplugins-snowflake``` | Use Snowflake as a 'data warehouse-as-a-service' within Flyte | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-great-expectations.svg)](https://pypi.python.org/pypi/flytekitplugins-great-expectations/) | Backend |
| Plugin | Installation | Description | Version | Type |
|------------------------------|-----------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------|
| AWS Sagemaker Training | ```bash pip install flytekitplugins-awssagemaker ``` | Installs SDK to author Sagemaker built-in and custom training jobs in python | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-awssagemaker.svg)](https://pypi.python.org/pypi/flytekitplugins-awssagemaker/) | Backend |
| Hive Queries | ```bash pip install flytekitplugins-hive ``` | Installs SDK to author Hive Queries that can be executed on a configured hive backend using Flyte backend plugin | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-hive.svg)](https://pypi.python.org/pypi/flytekitplugins-hive/) | Backend |
| K8s distributed PyTorch Jobs | ```bash pip install flytekitplugins-kfpytorch ``` | Installs SDK to author Distributed pyTorch Jobs in python using Kubeflow PyTorch Operator | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-kfpytorch.svg)](https://pypi.python.org/pypi/flytekitplugins-kfpytorch/) | Backend |
| K8s native tensorflow Jobs | ```bash pip install flytekitplugins-kftensorflow ``` | Installs SDK to author Distributed tensorflow Jobs in python using Kubeflow Tensorflow Operator | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-kftensorflow.svg)](https://pypi.python.org/pypi/flytekitplugins-kftensorflow/) | Backend |
| K8s native MPI Jobs | ```bash pip install flytekitplugins-kfmpi ``` | Installs SDK to author Distributed MPI Jobs in python using Kubeflow MPI Operator | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-kfmpi.svg)](https://pypi.python.org/pypi/flytekitplugins-kfmpi/) | Backend |
| Papermill based Tasks | ```bash pip install flytekitplugins-papermill ``` | Execute entire notebooks as Flyte Tasks and pass inputs and outputs between them and python tasks | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-papermill.svg)](https://pypi.python.org/pypi/flytekitplugins-papermill/) | Flytekit-only |
| Pod Tasks | ```bash pip install flytekitplugins-pod ``` | Installs SDK to author Pods in python. These pods can have multiple containers, use volumes and have non exiting side-cars | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-pod.svg)](https://pypi.python.org/pypi/flytekitplugins-pod/) | Flytekit-only |
| spark | ```bash pip install flytekitplugins-spark ``` | Installs SDK to author Spark jobs that can be executed natively on Kubernetes with a supported backend Flyte plugin | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-spark/) | Backend |
| AWS Athena Queries | ```bash pip install flytekitplugins-athena ``` | Installs SDK to author queries executed on AWS Athena | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-athena.svg)](https://pypi.python.org/pypi/flytekitplugins-athena/) | Backend |
| DOLT | ```bash pip install flytekitplugins-dolt ``` | Read & write dolt data sets and use dolt tables as native types | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-dolt.svg)](https://pypi.python.org/pypi/flytekitplugins-dolt/) | Flytekit-only |
| Pandera | ```bash pip install flytekitplugins-pandera ``` | Use Pandera schemas as native Flyte types, which enable data quality checks. | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-pandera.svg)](https://pypi.python.org/pypi/flytekitplugins-pandera/) | Flytekit-only |
| SQLAlchemy | ```bash pip install flytekitplugins-sqlalchemy ``` | Write queries for any database that supports SQLAlchemy | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-sqlalchemy.svg)](https://pypi.python.org/pypi/flytekitplugins-sqlalchemy/) | Flytekit-only |
| Great Expectations | ```bash pip install flytekitplugins-great-expectations``` | Enforce data quality for various data types within Flyte | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-great-expectations.svg)](https://pypi.python.org/pypi/flytekitplugins-great-expectations/) | Flytekit-only |
| Snowflake | ```bash pip install flytekitplugins-snowflake``` | Use Snowflake as a 'data warehouse-as-a-service' within Flyte | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-snowflake.svg)](https://pypi.python.org/pypi/flytekitplugins-snowflake/) | Backend |
| dbt | ```bash pip install flytekitplugins-dbt``` | Run dbt within Flyte | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-dbt.svg)](https://pypi.python.org/pypi/flytekitplugins-dbt/) | Flytekit-only |


## Have a Plugin Idea? 💡
Expand Down
15 changes: 15 additions & 0 deletions plugins/flytekit-dbt/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Flytekit dbt plugin

Flytekit plugin for performing DBT tasks. Currently it supports both `dbt run` and `dbt test` tasks.

To install the plugin, run the following command:

```bash
pip install flytekitplugins-dbt
```

_Example coming soon!_

## Contributors

- [Gojek](https://www.gojek.io/)
Empty file.
49 changes: 49 additions & 0 deletions plugins/flytekit-dbt/flytekitplugins/dbt/error.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
from typing import List


class DBTHandledError(Exception):
"""
DBTHandledError wraps error logs and message from command execution that returns ``exit code 1``.
Parameters
----------
message : str
Error message.
logs : list of str
Logs produced by the command execution.
Attributes
----------
message : str
Error message.
logs : list of str
Logs produced by the command execution.
"""

def __init__(self, message: str, logs: List[str]):
self.logs = logs
self.message = message


class DBTUnhandledError(Exception):
"""
DBTUnhandledError wraps error logs and message from command execution that returns ``exit code 2``.
Parameters
----------
message : str
Error message.
logs : list of str
Logs produced by the command execution.
Attributes
----------
message : str
Error message.
logs : list of str
Logs produced by the command execution.
"""

def __init__(self, message: str, logs: List[str]):
self.logs = logs
self.message = message
205 changes: 205 additions & 0 deletions plugins/flytekit-dbt/flytekitplugins/dbt/schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
import json
from dataclasses import dataclass
from typing import List, Optional

from dataclasses_json import dataclass_json


@dataclass_json
@dataclass
class BaseDBTInput:
"""
Base class for DBT Task Input.
Attributes
----------
project_dir : str
Path to directory containing the DBT ``dbt_project.yml``.
profiles_dir : str
Path to directory containing the DBT ``profiles.yml``.
profile : str
Profile name to be used for the DBT task. It will override value in ``dbt_project.yml``.
target : str
Target to load for the given profile (default=None).
output_path : str
Path to directory where compiled files (e.g. models) will be written when running the task (default=target).
ignore_handled_error : bool
Ignore handled error (exit code = 1) returned by DBT, see https://docs.getdbt.com/reference/exit-codes (default=False).
flags : dict
Dictionary containing CLI flags to be added to the ``dbt run`` command (default=False).
"""

project_dir: str
profiles_dir: str
profile: str
target: str = None
output_path: str = "target"
ignore_handled_error: bool = False
flags: dict = None

def to_args(self) -> List[str]:
"""
Convert the instance of BaseDBTInput into list of arguments.
Returns
-------
List[str]
List of arguments.
"""

args = []
args += ["--project-dir", self.project_dir]
args += ["--profiles-dir", self.profiles_dir]
args += ["--profile", self.profile]
if self.target is not None:
args += ["--target", self.target]

if self.flags is not None:
for flag, value in self.flags.items():
if not value:
continue

args.append(f"--{flag}")
if isinstance(value, bool):
continue

if isinstance(value, list):
args += value
continue

if isinstance(value, dict):
args.append(json.dumps(value))
continue

args.append(str(value))

return args


@dataclass_json
@dataclass
class BaseDBTOutput:
"""
Base class for output of DBT task.
Attributes
----------
command : str
Complete CLI command and flags that was executed by DBT Task.
exit_code : int
Exit code returned by DBT CLI.
"""

command: str
exit_code: int


@dataclass_json
@dataclass
class DBTRunInput(BaseDBTInput):
"""
Input to DBT Run task.
Attributes
----------
select : List[str]
List of model to be executed (default=None).
exclude : List[str]
List of model to be excluded (default=None).
"""

select: Optional[List[str]] = None
exclude: Optional[List[str]] = None

def to_args(self) -> List[str]:
"""
Convert the instance of BaseDBTInput into list of arguments.
Returns
-------
List[str]
List of arguments.
"""

args = BaseDBTInput.to_args(self)
if self.select is not None:
args += ["--select"] + self.select

if self.exclude is not None:
args += ["--exclude"] + self.exclude

return args


@dataclass_json
@dataclass
class DBTRunOutput(BaseDBTOutput):
"""
Output of DBT run task.
Attributes
----------
raw_run_result : str
Raw value of DBT's ``run_result.json``.
raw_manifest : str
Raw value of DBT's ``manifest.json``.
"""

raw_run_result: str
raw_manifest: str


@dataclass_json
@dataclass
class DBTTestInput(BaseDBTInput):
"""
Input to DBT Test task.
Attributes
----------
select : List[str]
List of model to be executed (default : None).
exclude : List[str]
List of model to be excluded (default : None).
"""

select: Optional[List[str]] = None
exclude: Optional[List[str]] = None

def to_args(self) -> List[str]:
"""
Convert the instance of DBTTestInput into list of arguments.
Returns
-------
List[str]
List of arguments.
"""

args = BaseDBTInput.to_args(self)

if self.select is not None:
args += ["--select"] + self.select

if self.exclude is not None:
args += ["--exclude"] + self.exclude

return args


@dataclass_json
@dataclass
class DBTTestOutput(BaseDBTOutput):
"""
Output of DBT test task.
Attributes
----------
raw_run_result : str
Raw value of DBT's ``run_result.json``.
raw_manifest : str
Raw value of DBT's ``manifest.json``.
"""

raw_run_result: str
raw_manifest: str
Loading

0 comments on commit cfcccb8

Please sign in to comment.