Skip to content

Commit

Permalink
Snowflake Agent Doc (#1714)
Browse files Browse the repository at this point in the history
* Snowflake new example

Signed-off-by: Future-Outlier <[email protected]>

* lint

Signed-off-by: Future-Outlier <[email protected]>

* revert

Signed-off-by: Future-Outlier <[email protected]>

* Fix the format

Signed-off-by: Future-Outlier <[email protected]>

* remove To begin, import the required libraries.

Signed-off-by: Future-Outlier <[email protected]>

* Add Pandas

Signed-off-by: Future-Outlier <[email protected]>

* update

Signed-off-by: Future-Outlier <[email protected]>

* lint

Signed-off-by: Future-Outlier <[email protected]>

---------

Signed-off-by: Future-Outlier <[email protected]>
  • Loading branch information
Future-Outlier authored Aug 2, 2024
1 parent 83fc575 commit c8fed8b
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 68 deletions.
7 changes: 4 additions & 3 deletions examples/snowflake_agent/requirements.in
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
flytekitplugins-snowflake==1.7.0
flytekit==1.7.1b1
marshmallow_enum
flytekitplugins-snowflake
flytekit
pandas
pyarrow
Original file line number Diff line number Diff line change
Expand Up @@ -4,87 +4,119 @@
#
# This example shows how to use the `SnowflakeTask` to execute a query in Snowflake.
#
# To begin, import the required libraries.
# %%
from flytekit import kwtypes, workflow
import pandas as pd
from flytekit import ImageSpec, Secret, StructuredDataset, kwtypes, task, workflow
from flytekitplugins.snowflake import SnowflakeConfig, SnowflakeTask

# %% [markdown]
# Instantiate a {py:class}`~flytekitplugins.snowflake.SnowflakeTask` to execute a query.
# Incorporate {py:class}`~flytekitplugins.snowflake.SnowflakeConfig` within the task to define the appropriate configuration.
# %%
snowflake_task_no_io = SnowflakeTask(
name="sql.snowflake.no_io",
inputs={},
query_template="SELECT 1",
output_schema_type=None,
# Container image passed as `container_image` to the @task functions in this file.
# Its package list names the Snowflake plugin together with pandas and pyarrow.
image = ImageSpec(
packages=["flytekitplugins-snowflake", "pandas", "pyarrow"],
registry="ghcr.io/flyteorg",
)

"""
Define a Snowflake task to insert data into the FLYTEAGENT.PUBLIC.TEST table.
The `inputs` parameter specifies the types of the inputs using `kwtypes`.
The `query_template` uses Python string interpolation to insert these inputs into the SQL query.
The placeholders `%(id)s`, `%(name)s`, and `%(age)s` will be replaced by the actual values
provided when the task is executed.
"""

"""
You can get the SnowflakeConfig's metadata from the Snowflake console by executing the following query:
SELECT
CURRENT_USER() AS "User",
CONCAT(CURRENT_ORGANIZATION_NAME(), '-', CURRENT_ACCOUNT_NAME()) AS "Account",
CURRENT_DATABASE() AS "Database",
CURRENT_SCHEMA() AS "Schema",
CURRENT_WAREHOUSE() AS "Warehouse";
"""

snowflake_task_insert_query = SnowflakeTask(
name="insert-query",
inputs=kwtypes(id=int, name=str, age=int),
task_config=SnowflakeConfig(
account="<SNOWFLAKE_ACCOUNT_ID>",
database="SNOWFLAKE_SAMPLE_DATA",
schema="TPCH_SF1000",
user="FLYTE",
account="FLYTE_SNOFLAKE_ACCOUNT",
database="FLYTEAGENT",
schema="PUBLIC",
warehouse="COMPUTE_WH",
),
query_template="""
INSERT INTO FLYTEAGENT.PUBLIC.TEST (ID, NAME, AGE)
VALUES (%(id)s, %(name)s, %(age)s);
""",
)

# %% [markdown]
# :::{note}
# For successful registration, ensure that your Snowflake task is assigned a unique
# name within your project/domain for your Flyte installation.
# :::
#
# In practical applications, our primary focus is often on utilizing Snowflake to query datasets.
# Here, we employ the `SNOWFLAKE_SAMPLE_DATA`, a default dataset in the Snowflake service.
# You can find more details about it [here](https://docs.snowflake.com/en/user-guide/sample-data.html).
# The data adheres to the following schema:
#
# ```{eval-rst}
# +----------------------------------------------+
# | C_CUSTKEY (int) |
# +----------------------------------------------+
# | C_NAME (string) |
# +----------------------------------------------+
# | C_ADDRESS (string) |
# +----------------------------------------------+
# | C_NATIONKEY (int) |
# +----------------------------------------------+
# | C_PHONE (string) |
# +----------------------------------------------+
# | C_ACCTBAL (float) |
# +----------------------------------------------+
# | C_MKTSEGMENT (string) |
# +----------------------------------------------+
# | C_COMMENT (string) |
# +----------------------------------------------+
# ```
#
# Let us explore how we can parameterize our query to filter results for a specific country.
# This country will be provided as user input, using a nation key to specify it.
# %%
snowflake_task_templatized_query = SnowflakeTask(
name="sql.snowflake.w_io",
# Define inputs as well as their types that can be used to customize the query.
inputs=kwtypes(nation_key=int),
name="select-query",
output_schema_type=StructuredDataset,
task_config=SnowflakeConfig(
account="<SNOWFLAKE_ACCOUNT_ID>",
database="SNOWFLAKE_SAMPLE_DATA",
schema="TPCH_SF1000",
user="FLYTE",
account="FLYTE_SNOFLAKE_ACCOUNT",
database="FLYTEAGENT",
schema="PUBLIC",
warehouse="COMPUTE_WH",
),
query_template="SELECT * from CUSTOMER where C_NATIONKEY = {{ .inputs.nation_key }} limit 100",
query_template="SELECT * FROM FLYTEAGENT.PUBLIC.TEST ORDER BY ID DESC LIMIT 3;",
)


@task(
    container_image=image,
    secret_requests=[Secret(group="private-key", key="snowflake")],
)
def print_head(input_sd: StructuredDataset) -> pd.DataFrame:
    """Load the dataset behind ``input_sd`` as a DataFrame, print it, and return it.

    Args:
        input_sd: StructuredDataset to open as a pandas DataFrame.

    Returns:
        The DataFrame read from ``input_sd``.
    """
    # Download the DataFrame from the Snowflake table via StructuredDataset.
    # No uri is supplied here because input_sd already carries it.
    frame = input_sd.open(pd.DataFrame).all()
    print(frame)
    return frame


@task(
    container_image=image,
    secret_requests=[
        Secret(
            group="private-key",
            key="snowflake",
        )
    ],
)
def write_table() -> StructuredDataset:
    """Build a small sample DataFrame and hand it off for upload to Snowflake.

    Returns:
        A ``StructuredDataset`` wrapping the DataFrame, with a ``snowflake://``
        URI that names the destination table.
    """
    df = pd.DataFrame({"ID": [1, 2, 3], "NAME": ["flyte", "is", "amazing"], "AGE": [30, 30, 30]})
    print(df)

    # Upload the DataFrame to the Snowflake table via StructuredDataset.
    # Each component must be a plain string: writing `("FLYTE",)` creates a
    # one-element tuple, and the f-string below would then embed "('FLYTE',)"
    # in the URI instead of the bare value.
    user = "FLYTE"
    account = "FLYTE_SNOFLAKE_ACCOUNT"
    database = "FLYTEAGENT"
    schema = "PUBLIC"
    warehouse = "COMPUTE_WH"
    table = "TEST"
    uri = f"snowflake://{user}:{account}/{warehouse}/{database}/{schema}/{table}"

    return StructuredDataset(dataframe=df, uri=uri)


@workflow
def snowflake_wf(nation_key: int):
return snowflake_task_templatized_query(nation_key=nation_key)
# Workflow: read the TEST table and print it, insert one row, upload a
# DataFrame, then read the table again and return the printed result.
def wf() -> StructuredDataset:
# First read of the table through the select-query task.
sd = snowflake_task_templatized_query()
t1 = print_head(input_sd=sd)
# Insert a single row through the parameterized insert task.
insert_query = snowflake_task_insert_query(id=1, name="Flyte", age=30)
# Second read of the table; ordered after the writes by the chain below.
sd2 = snowflake_task_templatized_query()
wt = write_table()

# Apart from sd -> t1, these nodes pass no data to one another, so `>>` is
# used to declare the run order explicitly, ending with the second read (sd2).
sd >> t1 >> insert_query >> wt >> sd2

return print_head(input_sd=sd2)


# %% [markdown]
# To review the query results, access the Snowflake console at:
# `https://<SNOWFLAKE_ACCOUNT_ID>.snowflakecomputing.com/console#/monitoring/queries/detail`.
#
# You can also execute the task and workflow locally.
# %%
if __name__ == "__main__":
print(snowflake_task_no_io())
print(snowflake_wf(nation_key=10))
wf()

0 comments on commit c8fed8b

Please sign in to comment.