Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

move quickstart_snowflake back to mono repo #11132

Merged
merged 2 commits into from
Dec 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
270 changes: 270 additions & 0 deletions examples/quickstart_snowflake/README.md

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions examples/quickstart_snowflake/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[build-system]
requires = ["setuptools"]
build-backend = "setuptools.build_meta"
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .repository import quickstart_snowflake
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import base64
from io import BytesIO
from typing import List

import matplotlib.pyplot as plt
import pandas as pd
import requests
from wordcloud import STOPWORDS, WordCloud

from dagster import MetadataValue, OpExecutionContext, asset


@asset(group_name="hackernews", compute_kind="HackerNews API")
def hackernews_topstory_ids() -> pd.DataFrame:
    """
    Get up to 500 top stories from the HackerNews topstories endpoint.

    Returns a single-column DataFrame ("item_ids") of story ids.

    API Docs: https://github.com/HackerNews/API#new-top-and-best-stories
    """
    newstories_url = "https://hacker-news.firebaseio.com/v0/topstories.json"
    # Without an explicit timeout, a stalled API response would hang the
    # Dagster run indefinitely; fail fast instead and let Dagster surface it.
    top_500_newstories = requests.get(newstories_url, timeout=30).json()
    return pd.DataFrame(top_500_newstories, columns=["item_ids"])


@asset(group_name="hackernews", compute_kind="HackerNews API")
def hackernews_topstories(
    context: OpExecutionContext, hackernews_topstory_ids: pd.DataFrame
) -> pd.DataFrame:
    """
    Get items based on story ids from the HackerNews items endpoint. It may take 1-2 minutes to fetch all 500 items.

    API Docs: https://github.com/HackerNews/API#items
    """

    results = []
    for item_id in hackernews_topstory_ids["item_ids"]:
        # Each item is fetched individually; a per-request timeout keeps one
        # stalled response from hanging the whole (up to 500 request) loop.
        item = requests.get(
            f"https://hacker-news.firebaseio.com/v0/item/{item_id}.json", timeout=30
        ).json()
        results.append(item)
        if len(results) % 20 == 0:
            context.log.info(f"Got {len(results)} items so far.")

    df = pd.DataFrame(results)

    # Dagster supports attaching arbitrary metadata to asset materializations. This metadata will be
    # shown in the run logs and also be displayed on the "Activity" tab of the "Asset Details" page in the UI.
    # This metadata would be useful for monitoring and maintaining the asset as you iterate.
    # Read more about asset metadata at https://docs.dagster.io/concepts/assets/software-defined-assets#recording-materialization-metadata
    context.add_output_metadata(
        {
            "num_records": len(df),
            "preview": MetadataValue.md(df.head().to_markdown()),
        }
    )
    return df


@asset(group_name="hackernews", compute_kind="Plot")
def hackernews_topstories_word_cloud(
    context: OpExecutionContext, hackernews_topstories: pd.DataFrame
) -> None:
    """
    Exploratory analysis: Generate a word cloud from the current top 500 HackerNews top stories.
    Embed the plot into a Markdown metadata for quick view.

    Read more about how to create word clouds in http://amueller.github.io/word_cloud/.
    """
    stopwords = set(STOPWORDS)
    stopwords.update(["Ask", "Show", "HN"])
    titles_text = " ".join([str(item) for item in hackernews_topstories["title"]])
    titles_cloud = WordCloud(stopwords=stopwords, background_color="white").generate(titles_text)

    # Generate the word cloud image. Keep a handle on the figure so it can be
    # closed below — pyplot keeps figures alive globally, so leaving it open
    # would leak one figure per materialization in a long-lived daemon process.
    fig = plt.figure(figsize=(8, 8), facecolor=None)
    try:
        plt.imshow(titles_cloud, interpolation="bilinear")
        plt.axis("off")
        plt.tight_layout(pad=0)

        # Save the image to a buffer and embed the image into Markdown content for quick view
        buffer = BytesIO()
        fig.savefig(buffer, format="png")
    finally:
        plt.close(fig)
    image_data = base64.b64encode(buffer.getvalue())
    md_content = f"![img](data:image/png;base64,{image_data.decode()})"

    # Attach the Markdown content as metadata to the asset
    # Read about more metadata types in https://docs.dagster.io/_apidocs/ops#metadata-types
    context.add_output_metadata({"plot": MetadataValue.md(md_content)})
40 changes: 40 additions & 0 deletions examples/quickstart_snowflake/quickstart_snowflake/repository.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from dagster_snowflake import build_snowflake_io_manager
from dagster_snowflake_pandas import SnowflakePandasTypeHandler

from dagster import (
ScheduleDefinition,
define_asset_job,
load_assets_from_package_module,
repository,
with_resources,
)

from . import assets

# Run the "all_assets_job" once a day at midnight (cron "0 0 * * *").
# NOTE(review): `define_asset_job` is called with no selection here, which
# presumably targets every asset in the repository — confirm against the
# Dagster docs for the default asset selection.
daily_refresh_schedule = ScheduleDefinition(
    job=define_asset_job(name="all_assets_job"), cron_schedule="0 0 * * *"
)


@repository
def quickstart_snowflake():
    """Repository containing the HackerNews assets (backed by Snowflake I/O) and a daily schedule."""
    # Every connection setting is resolved from an environment variable at
    # runtime rather than hard-coded here.
    # Read about using environment variables and secrets in Dagster:
    # https://docs.dagster.io/guides/dagster/using-environment-variables-and-secrets
    snowflake_config = {
        "account": {"env": "SNOWFLAKE_ACCOUNT"},
        "user": {"env": "SNOWFLAKE_USER"},
        "password": {"env": "SNOWFLAKE_PASSWORD"},
        "warehouse": {"env": "SNOWFLAKE_WAREHOUSE"},
        "database": {"env": "SNOWFLAKE_DATABASE"},
        "schema": {"env": "SNOWFLAKE_SCHEMA"},
    }
    snowflake_io_manager = build_snowflake_io_manager([SnowflakePandasTypeHandler()])

    # Bind the configured Snowflake IO manager to every asset in the package.
    bound_assets = with_resources(
        load_assets_from_package_module(assets),
        resource_defs={"io_manager": snowflake_io_manager.configured(snowflake_config)},
    )
    return [*bound_assets, daily_refresh_schedule]
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

2 changes: 2 additions & 0 deletions examples/quickstart_snowflake/setup.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[metadata]
name = quickstart_snowflake
21 changes: 21 additions & 0 deletions examples/quickstart_snowflake/setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from setuptools import find_packages, setup

# Runtime dependencies for the quickstart_snowflake example.
# snowflake-connector-python[pandas] is included by dagster-snowflake-pandas but it does
# not get included during pex dependency resolution, so we directly add this dependency
INSTALL_REQUIRES = [
    "dagster",
    "snowflake-connector-python[pandas]",
    "dagster-snowflake-pandas",
    "dagster-cloud",
    "boto3",
    "pandas",
    "matplotlib",
    "textblob",
    "tweepy",
    "wordcloud",
]

setup(
    name="quickstart_snowflake",
    packages=find_packages(exclude=["quickstart_snowflake_tests"]),
    install_requires=INSTALL_REQUIRES,
    extras_require={"dev": ["dagit", "pytest"]},
)
2 changes: 2 additions & 0 deletions examples/quickstart_snowflake/workspace.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
load_from:
- python_package: quickstart_snowflake
1 change: 1 addition & 0 deletions python_modules/dagster/dagster/_generate/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
"quickstart_aws",
"quickstart_etl",
"quickstart_gcp",
"quickstart_snowflake",
"tutorial_dbt_dagster",
"tutorial_notebook_assets",
"deploy_docker",
Expand Down