-
Notifications
You must be signed in to change notification settings - Fork 300
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
8528268
commit d77a773
Showing
11 changed files
with
310 additions
and
77 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
from .task import SQLAlchemyConfig, SQLAlchemyTask |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,85 @@ | ||
import typing | ||
from dataclasses import dataclass | ||
|
||
import pandas as pd | ||
from sqlalchemy import create_engine | ||
|
||
from flytekit import current_context, kwtypes | ||
from flytekit.core.base_sql_task import SQLTask | ||
from flytekit.core.python_function_task import PythonInstanceTask | ||
from flytekit.models.security import Secret | ||
from flytekit.types.schema import FlyteSchema | ||
|
||
|
||
@dataclass | ||
class SQLAlchemyConfig(object): | ||
""" | ||
Use this configuration to configure task. String should be standard | ||
sqlalchemy connector format | ||
(https://docs.sqlalchemy.org/en/14/core/engines.html#database-urls). | ||
Database can be found: | ||
- within the container | ||
- or from a publicly accessible source | ||
Args: | ||
uri: default sqlalchemy connector | ||
connect_args: sqlalchemy kwarg overrides -- ex: host | ||
secret_connect_args: flyte secrets loaded into sqlalchemy connect args | ||
-- ex: {"password": {"name": SECRET_NAME, "group": SECRET_GROUP}} | ||
""" | ||
|
||
uri: str | ||
connect_args: typing.Optional[typing.Dict[str, typing.Any]] = None | ||
secret_connect_args: typing.Optional[typing.Dict[str, Secret]] = None | ||
|
||
|
||
class SQLAlchemyTask(PythonInstanceTask[SQLAlchemyConfig], SQLTask[SQLAlchemyConfig]): | ||
""" | ||
Makes it possible to run client side SQLAlchemy queries that optionally return a FlyteSchema object | ||
TODO: How should we use pre-built containers for running portable tasks like this. Should this always be a | ||
referenced task type? | ||
""" | ||
|
||
_SQLALCHEMY_TASK_TYPE = "sqlalchemy" | ||
|
||
def __init__( | ||
self, | ||
name: str, | ||
query_template: str, | ||
task_config: SQLAlchemyConfig, | ||
inputs: typing.Optional[typing.Dict[str, typing.Type]] = None, | ||
output_schema_type: typing.Optional[typing.Type[FlyteSchema]] = None, | ||
**kwargs, | ||
): | ||
output_schema = output_schema_type if output_schema_type else FlyteSchema | ||
outputs = kwtypes(results=output_schema) | ||
self._uri = task_config.uri | ||
self._connect_args = task_config.connect_args or {} | ||
self._secret_connect_args = task_config.secret_connect_args | ||
|
||
super().__init__( | ||
name=name, | ||
task_config=task_config, | ||
task_type=self._SQLALCHEMY_TASK_TYPE, | ||
query_template=query_template, | ||
inputs=inputs, | ||
outputs=outputs, | ||
**kwargs, | ||
) | ||
|
||
@property | ||
def output_columns(self) -> typing.Optional[typing.List[str]]: | ||
c = self.python_interface.outputs["results"].column_names() | ||
return c if c else None | ||
|
||
def execute(self, **kwargs) -> typing.Any: | ||
if self._secret_connect_args is not None: | ||
for key, secret in self._secret_connect_args.items(): | ||
value = current_context().secrets.get(secret.group, secret.key) | ||
self._connect_args[key] = value | ||
engine = create_engine(self._uri, connect_args=self._connect_args, echo=False) | ||
print(f"Connecting to db {self._uri}") | ||
with engine.begin() as connection: | ||
df = pd.read_sql_query(self.get_query(**kwargs), connection) | ||
return df |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
from setuptools import setup | ||
|
||
PLUGIN_NAME = "sqlalchemy" | ||
|
||
microlib_name = f"flytekitplugins-{PLUGIN_NAME}" | ||
|
||
plugin_requires = ["flytekit>=0.17.0,<1.0.0", "sqlalchemy>=1.4.7"] | ||
|
||
__version__ = "0.0.0+develop" | ||
|
||
setup( | ||
name=microlib_name, | ||
version=__version__, | ||
author="dolthub", | ||
author_email="[email protected]", | ||
description="SQLAlchemy plugin for flytekit", | ||
namespace_packages=["flytekitplugins"], | ||
packages=[f"flytekitplugins.{PLUGIN_NAME}"], | ||
install_requires=plugin_requires, | ||
license="apache2", | ||
python_requires=">=3.7", | ||
classifiers=[ | ||
"Intended Audience :: Science/Research", | ||
"Intended Audience :: Developers", | ||
"License :: OSI Approved :: Apache Software License", | ||
"Programming Language :: Python :: 3.7", | ||
"Programming Language :: Python :: 3.8", | ||
"Topic :: Scientific/Engineering", | ||
"Topic :: Scientific/Engineering :: Artificial Intelligence", | ||
"Topic :: Software Development", | ||
"Topic :: Software Development :: Libraries", | ||
"Topic :: Software Development :: Libraries :: Python Modules", | ||
], | ||
) |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
from collections import OrderedDict | ||
|
||
from flytekit.common.translator import get_serializable | ||
from flytekit.core import context_manager | ||
from flytekit.core.context_manager import Image, ImageConfig | ||
from plugins.tests.sqlalchemy.test_task import tk as not_tk | ||
|
||
|
||
def test_sql_lhs(): | ||
assert not_tk.lhs == "tk" | ||
|
||
|
||
def test_sql_command(): | ||
default_img = Image(name="default", fqn="test", tag="tag") | ||
serialization_settings = context_manager.SerializationSettings( | ||
project="project", | ||
domain="domain", | ||
version="version", | ||
env=None, | ||
image_config=ImageConfig(default_image=default_img, images=[default_img]), | ||
) | ||
srz_t = get_serializable(OrderedDict(), serialization_settings, not_tk) | ||
assert srz_t.container.args[-7:] == [ | ||
"--resolver", | ||
"flytekit.core.python_auto_container.default_task_resolver", | ||
"--", | ||
"task-module", | ||
"plugins.tests.sqlalchemy.test_task", | ||
"task-name", | ||
"tk", | ||
] |
Oops, something went wrong.