Skip to content

Commit

Permalink
Add mysql support
Browse files Browse the repository at this point in the history
  • Loading branch information
phanikumv committed Mar 9, 2023
1 parent 5c56d20 commit 1bce829
Show file tree
Hide file tree
Showing 28 changed files with 880 additions and 76 deletions.
7 changes: 7 additions & 0 deletions .github/ci-test-connections.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -145,3 +145,10 @@ connections:
aws_access_key_id: "ROOTNAME"
aws_secret_access_key: "CHANGEME123"
endpoint_url: "http://127.0.0.1:9000"
- conn_id: mysql_conn
conn_type: mysql
host: $MYSQL_HOST
login: $MYSQL_LOGIN
password: $MYSQL_PASSWORD
port: 3306
schema: $MYSQL_DB
4 changes: 4 additions & 0 deletions .github/workflows/ci-python-sdk.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ env:
MSSQL_HOST: ${{ secrets.MSSQL_HOST }}
MSSQL_LOGIN: ${{ secrets.MSSQL_LOGIN }}
MSSQL_PASSWORD: ${{ secrets.MSSQL_PASSWORD }}
MYSQL_DB: ${{ secrets.MYSQL_DB }}
MYSQL_HOST: ${{ secrets.MYSQL_HOST }}
MYSQL_LOGIN: ${{ secrets.MYSQL_LOGIN }}
MYSQL_PASSWORD: ${{ secrets.MYSQL_PASSWORD }}

jobs:
Markdown-link-check:
Expand Down
1 change: 1 addition & 0 deletions python-sdk/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
Database.DELTA: "databricks_conn",
Database.MSSQL: "mssql_conn",
Database.DUCKDB: "duckdb_conn",
Database.MYSQL: "mysql_conn",
}


Expand Down
11 changes: 11 additions & 0 deletions python-sdk/docs/astro/sql/operators/load_file.rst
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,17 @@ Loading to Duckdb
:start-after: [START load_file_example_27]
:end-before: [END load_file_example_27]

Loading to MySQL
~~~~~~~~~~~~~~~~~

``load_file`` can load data to a MySQL database hosted on a cloud or on-premises server.

.. literalinclude:: ../../../../example_dags/example_load_file.py
:language: python
:start-after: [START load_file_example_28]
:end-before: [END load_file_example_28]



Patterns in file path
~~~~~~~~~~~~~~~~~~~~~
Expand Down
2 changes: 2 additions & 0 deletions python-sdk/docs/astro/sql/operators/merge.rst
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ The ``merge`` operator runs different SQL queries behind the scenes based on the
- Yes
* - MS SQL
- No
* - MySQL
- Yes

You can create and add constraints on a table by providing them in the ``columns`` parameter of :ref:`load_file`. Refer to the :ref:`custom_schema` section for details.

Expand Down
10 changes: 10 additions & 0 deletions python-sdk/example_dags/example_load_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
MSSQL_CONN_ID = "mssql_conn"
DUCKDB_CONN_ID = "duckdb_conn"
AWS_CONN_ID = "aws_conn"
MYSQL_CONN_ID = "mysql_conn"

CWD = pathlib.Path(__file__).parent
default_args = {
Expand Down Expand Up @@ -365,4 +366,13 @@
)
# [END load_file_example_27]

# [START load_file_example_28]
aql.load_file(
input_file=File("s3://tmp9/homes_main.csv", conn_id=AWS_CONN_ID),
output_table=Table(
conn_id=MYSQL_CONN_ID,
),
)
# [END load_file_example_28]

aql.cleanup()
86 changes: 86 additions & 0 deletions python-sdk/example_dags/example_mysql_transform.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
from datetime import datetime

import pandas as pd
from airflow import DAG

from astro import sql as aql
from astro.files import File
from astro.table import Table

# Fixed start date for the example DAG (catchup=False, so no backfill occurs).
START_DATE = datetime(2000, 1, 1)
# Small in-memory dataframe that the final task unions with the SQL results.
LAST_ONE_DF = pd.DataFrame(data={"title": ["Random movie"], "rating": [121]})


@aql.transform()
def top_five_animations(input_table: Table):  # skipcq: PYL-W0613
    """Select the five Animation-genre rows with the highest rating.

    The returned SQL is rendered by the Astro SDK with ``{{input_table}}``
    substituted for the upstream table.
    """
    query = """
    SELECT *
    FROM {{input_table}}
    WHERE genre1='Animation'
    ORDER BY rating desc
    LIMIT 5
    """
    return query


@aql.transform()
def last_five_animations(input_table: Table):  # skipcq: PYL-W0613
    """Select the five Animation-genre rows with the lowest rating.

    The returned SQL is rendered by the Astro SDK with ``{{input_table}}``
    substituted for the upstream table.
    """
    query = """
    SELECT *
    FROM {{input_table}}
    WHERE genre1='Animation'
    ORDER BY rating asc
    LIMIT 5
    """
    return query


@aql.transform
def union_top_and_last(first_table: Table, second_table: Table):  # skipcq: PYL-W0613
    """Combine `first_table` and `second_table` into one (title, rating) dataset via UNION."""
    sql = """
    SELECT title, rating from {{first_table}}
    UNION
    SELECT title, rating from {{second_table}}
    """
    return sql


@aql.transform
def union_table_and_dataframe(input_table: Table, input_dataframe: pd.DataFrame):  # skipcq: PYL-W0613
    """Combine a SQL table with an in-memory dataframe into one (title, rating) dataset via UNION.

    The Astro SDK materializes ``input_dataframe`` as a temporary table so it
    can participate in the SQL UNION alongside ``input_table``.
    """
    sql = """
    SELECT title, rating from {{input_table}}
    UNION
    SELECT title, rating from {{input_dataframe}}
    """
    return sql


# DAG demonstrating MySQL support in the Astro SDK: load a CSV from S3 into
# MySQL, run two SQL transforms, union their results, then union with an
# in-memory pandas dataframe, and finally clean up temporary tables.
with DAG(
    "example_transform_mysql",
    schedule_interval=None,  # run only when triggered manually
    start_date=START_DATE,
    catchup=False,  # no backfill for past dates
) as dag:
    # Load the IMDB movies CSV from S3 into a MySQL table
    # (no table name given, so the SDK generates a temporary one).
    imdb_movies = aql.load_file(
        input_file=File(path="s3://astro-sdk/imdb_v2.csv"),
        task_id="load_csv",
        output_table=Table(conn_id="mysql_conn"),
    )

    # Highest-rated five Animation titles.
    top_five = top_five_animations(
        input_table=imdb_movies,
        output_table=Table(
            conn_id="mysql_conn",
        ),
    )

    # Lowest-rated five Animation titles.
    last_five = last_five_animations(
        input_table=imdb_movies,
        output_table=Table(
            conn_id="mysql_conn",
        ),
    )

    # UNION of the two five-row result sets.
    union_table = union_top_and_last(top_five, last_five)

    # Union the SQL result with the in-memory dataframe defined above.
    union_table_and_dataframe(union_table, LAST_ONE_DF)

    # Drop the temporary tables created by the tasks above.
    aql.cleanup()
5 changes: 5 additions & 0 deletions python-sdk/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ mssql = [
"apache-airflow-providers-microsoft-mssql>=3.2",
]

mysql = [
"apache-airflow-providers-mysql",
]

duckdb = [
"airflow-provider-duckdb>=0.0.2",
]
Expand Down Expand Up @@ -125,6 +129,7 @@ all = [
"azure-storage-blob",
"apache-airflow-providers-microsoft-mssql>=3.2",
"airflow-provider-duckdb>=0.0.2",
"apache-airflow-providers-mysql"
]
doc = [
"myst-parser>=0.17",
Expand Down
1 change: 1 addition & 0 deletions python-sdk/src/astro/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ class Database(Enum):
REDSHIFT = "redshift"
MSSQL = "mssql"
DUCKDB = "duckdb"
MYSQL = "mysql"
# [END database]

def __str__(self) -> str:
Expand Down
Loading

0 comments on commit 1bce829

Please sign in to comment.