Skip to content

Commit

Permalink
Example for writing queries for Athena (flyteorg#319)
Browse files Browse the repository at this point in the history
- flyte allows writing queries directly that are executed by the
backend. This shows such an example

Signed-off-by: Ketan Umare <[email protected]>
  • Loading branch information
kumare3 authored Jul 20, 2021
1 parent a815aea commit b3bb288
Show file tree
Hide file tree
Showing 7 changed files with 278 additions and 1 deletion.
3 changes: 3 additions & 0 deletions cookbook/integrations/aws/athena/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
PREFIX=athena
include ../../../common/Makefile
include ../../../common/leaf.mk
21 changes: 20 additions & 1 deletion cookbook/integrations/aws/athena/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,25 @@
Athena
######

.. NOTE::
Executing Athena Queries
=======================
Flyte backend can be connected with athena. Once enabled, it allows you to query AWS Athena service (Presto + ANSI SQL Support) and retrieve typed schema (optionally).

This section provides a guide on how to use the AWS Athena Plugin using flytekit python.

Installation
------------

To use the flytekit athena plugin simply run the following:

.. prompt:: bash

pip install flytekitplugins-athena

No Need for a dockerfile
------------------------
This plugin is purely a spec and since SQL is completely portable, it has no need to build a container. Thus this plugin examples do not have any Dockerfiles.

.. TODO: write a subsection for "Configuring the backend to get athena working"
=======
Coming soon 🛠
Empty file.
107 changes: 107 additions & 0 deletions cookbook/integrations/aws/athena/athena.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
"""
Athena Query
############
This example shows how to use a Flyte AthenaTask to execute a query.
"""

from flytekit import kwtypes, task, workflow
from flytekit.types.schema import FlyteSchema
from flytekitplugins.athena import AthenaConfig, AthenaTask

# %%
# This is the world's simplest query. Note that in order for registration to work properly, you'll need to give your
# Athena task a name that's unique across your project/domain for your Flyte installation.
athena_task_no_io = AthenaTask(
name="sql.athena.no_io",
inputs={},
query_template="""
select 1
""",
output_schema_type=None,
task_config=AthenaConfig(database="mnist"),
)


@workflow
def no_io_wf():
return athena_task_no_io()


# %%
# Of course, in real world applications we are usually more interested in using Athena to query a dataset.
# In this case we've populated our vaccinations table with the publicly available dataset
# `here <https://www.kaggle.com/gpreda/covid-world-vaccination-progress>`__.
# For a primer on how upload a dataset, checkout of the official
# `AWS docs <https://docs.aws.amazon.com/quicksight/latest/user/create-a-data-set-athena.html>`__.
# The data is formatted according to this schema:
#
# +----------------------------------------------+
# | country (string) |
# +----------------------------------------------+
# | iso_code (string) |
# +----------------------------------------------+
# | date (string) |
# +----------------------------------------------+
# | total_vaccinations (string) |
# +----------------------------------------------+
# | people_vaccinated (string) |
# +----------------------------------------------+
# | people_fully_vaccinated (string) |
# +----------------------------------------------+
# | daily_vaccinations_raw (string) |
# +----------------------------------------------+
# | daily_vaccinations (string) |
# +----------------------------------------------+
# | total_vaccinations_per_hundred (string) |
# +----------------------------------------------+
# | people_vaccinated_per_hundred (string) |
# +----------------------------------------------+
# | people_fully_vaccinated_per_hundred (string) |
# +----------------------------------------------+
# | daily_vaccinations_per_million (string) |
# +----------------------------------------------+
# | vaccines (string) |
# +----------------------------------------------+
# | source_name (string) |
# +----------------------------------------------+
# | source_website (string) |
# +----------------------------------------------+
#
# Let's look out how we can parameterize our query to filter results for a specific country, provided as a user input
# specifying a country iso code.
# We'll produce a FlyteSchema that we can use in downstream flyte tasks for further analysis or manipulation.
# Note that we cache this output data so we don't have to re-run the query in future workflow iterations
# should we decide to change how we manipulate data downstream

athena_task_templatized_query = AthenaTask(
name="sql.athena.w_io",
# Define inputs as well as their types that can be used to customize the query.
inputs=kwtypes(iso_code=str),
task_config=AthenaConfig(database="vaccinations"),
query_template="""
SELECT * FROM vaccinations where iso_code like {{ .inputs.iso_code }}
""",
# While we define a generic schema as the output here, Flyte supports more strongly typed schemas to provide
# better compile-time checks for task compatibility. Refer to :py:class:`flytekit.FlyteSchema` for more details
output_schema_type=FlyteSchema,
# Cache the output data so we don't have to re-run the query in future workflow iterations
# should we decide to change how we manipulate data downstream.
# For more information about caching, visit :ref:`Task Caching <task_cache>`
cache=True,
cache_version="1.0",
)


# %%
# Now we (trivially) clean up and interact with the data produced from the above Athena query in a separate Flyte task.
@task
def manipulate_athena_schema(s: FlyteSchema) -> FlyteSchema:
df = s.open().all()
return df[df.total_vaccinations.notnull()]


@workflow
def full_athena_wf(country_iso_code: str) -> FlyteSchema:
demo_schema = athena_task_templatized_query(iso_code=country_iso_code)
return manipulate_athena_schema(s=demo_schema)
3 changes: 3 additions & 0 deletions cookbook/integrations/aws/athena/requirements.in
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-r ../../../common/requirements-common.in

flytekitplugins-athena>=0.20.0
142 changes: 142 additions & 0 deletions cookbook/integrations/aws/athena/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
#
# This file is autogenerated by pip-compile with python 3.8
# To update, run:
#
# /Library/Developer/CommandLineTools/usr/bin/make requirements.txt
#
attrs==21.2.0
# via scantree
certifi==2021.5.30
# via requests
chardet==4.0.0
# via requests
click==7.1.2
# via flytekit
croniter==1.0.15
# via flytekit
cycler==0.10.0
# via matplotlib
dataclasses-json==0.5.4
# via flytekit
decorator==5.0.9
# via retry
deprecated==1.2.12
# via flytekit
dirhash==0.2.1
# via flytekit
docker-image-py==0.1.10
# via flytekit
flyteidl==0.19.9
# via flytekit
flytekit==0.20.0
# via
# -r ../../../common/requirements-common.in
# flytekitplugins-athena
flytekitplugins-athena==0.20.0
# via -r requirements.in
grpcio==1.38.1
# via flytekit
idna==2.10
# via requests
importlib-metadata==4.6.0
# via keyring
keyring==23.0.1
# via flytekit
kiwisolver==1.3.1
# via matplotlib
marshmallow==3.12.1
# via
# dataclasses-json
# marshmallow-enum
# marshmallow-jsonschema
marshmallow-enum==1.5.1
# via dataclasses-json
marshmallow-jsonschema==0.12.0
# via flytekit
matplotlib==3.4.2
# via -r ../../../common/requirements-common.in
mypy-extensions==0.4.3
# via typing-inspect
natsort==7.1.1
# via flytekit
numpy==1.21.0
# via
# matplotlib
# pandas
# pyarrow
pandas==1.2.5
# via flytekit
pathspec==0.8.1
# via scantree
pillow==8.3.0
# via matplotlib
protobuf==3.17.3
# via
# flyteidl
# flytekit
py==1.10.0
# via retry
pyarrow==3.0.0
# via flytekit
pyparsing==2.4.7
# via matplotlib
python-dateutil==2.8.1
# via
# croniter
# flytekit
# matplotlib
# pandas
python-json-logger==2.0.1
# via flytekit
pytimeparse==1.1.8
# via flytekit
pytz==2018.4
# via
# flytekit
# pandas
regex==2021.7.1
# via docker-image-py
requests==2.25.1
# via
# flytekit
# responses
responses==0.13.3
# via flytekit
retry==0.9.2
# via flytekit
scantree==0.0.1
# via dirhash
six==1.16.0
# via
# cycler
# flytekit
# grpcio
# protobuf
# python-dateutil
# responses
# scantree
sortedcontainers==2.4.0
# via flytekit
statsd==3.3.0
# via flytekit
stringcase==1.2.0
# via dataclasses-json
typing-extensions==3.10.0.0
# via typing-inspect
typing-inspect==0.7.1
# via dataclasses-json
urllib3==1.25.11
# via
# flytekit
# requests
# responses
wheel==0.36.2
# via
# -r ../../../common/requirements-common.in
# flytekit
wrapt==1.12.1
# via
# deprecated
# flytekit
zipp==3.5.0
# via importlib-metadata
3 changes: 3 additions & 0 deletions cookbook/integrations/aws/athena/sandbox.config
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[sdk]
workflow_packages=athena
python_venv=flytekit_venv

0 comments on commit b3bb288

Please sign in to comment.