Skip to content

Commit

Permalink
move quickstart_gcp back to mono repo
Browse files Browse the repository at this point in the history
  • Loading branch information
yuhan committed Dec 14, 2022
1 parent b646945 commit b3951c1
Show file tree
Hide file tree
Showing 12 changed files with 492 additions and 0 deletions.
274 changes: 274 additions & 0 deletions examples/quickstart_gcp/README.md

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions examples/quickstart_gcp/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[build-system]
requires = ["setuptools"]
build-backend = "setuptools.build_meta"
1 change: 1 addition & 0 deletions examples/quickstart_gcp/quickstart_gcp/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .repository import quickstart_gcp
Empty file.
87 changes: 87 additions & 0 deletions examples/quickstart_gcp/quickstart_gcp/assets/hackernews.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import base64
from io import BytesIO

import matplotlib.pyplot as plt
import pandas as pd
import requests
from dagster import MetadataValue, OpExecutionContext, asset
from wordcloud import STOPWORDS, WordCloud


@asset(group_name="hackernews", compute_kind="HackerNews API")
def hackernews_topstory_ids() -> pd.DataFrame:
"""
Get up to 500 top stories from the HackerNews topstories endpoint.
API Docs: https://github.com/HackerNews/API#new-top-and-best-stories
"""
newstories_url = "https://hacker-news.firebaseio.com/v0/topstories.json"
top_500_newstories = requests.get(newstories_url).json()
return pd.DataFrame(top_500_newstories, columns=["item_ids"])


@asset(group_name="hackernews", compute_kind="HackerNews API")
def hackernews_topstories(
context: OpExecutionContext, hackernews_topstory_ids: pd.DataFrame
) -> pd.DataFrame:
"""
Get items based on story ids from the HackerNews items endpoint. It may take 1-2 minutes to fetch all 500 items.
API Docs: https://github.com/HackerNews/API#items
"""

results = []
for item_id in hackernews_topstory_ids["item_ids"]:
item = requests.get(f"https://hacker-news.firebaseio.com/v0/item/{item_id}.json").json()
results.append(item)
if len(results) % 20 == 0:
context.log.info(f"Got {len(results)} items so far.")

df = pd.DataFrame(results)

# drop 'kids' column for now as list type doesn't work with bigquery pandas out-of-the-box
df.drop("kids", axis=1, inplace=True)

# Dagster supports attaching arbitrary metadata to asset materializations. This metadata will be
# shown in the run logs and also be displayed on the "Activity" tab of the "Asset Details" page in the UI.
# This metadata would be useful for monitoring and maintaining the asset as you iterate.
# Read more about in asset metadata in https://docs.dagster.io/concepts/assets/software-defined-assets#recording-materialization-metadata
context.add_output_metadata(
{
"num_records": len(df),
"preview": MetadataValue.md(df.head().to_markdown()),
}
)
return df


@asset(group_name="hackernews", compute_kind="Plot")
def hackernews_topstories_word_cloud(
context: OpExecutionContext, hackernews_topstories: pd.DataFrame
) -> None:
"""
Exploratory analysis: Generate a word cloud from the current top 500 HackerNews top stories.
Embed the plot into a Markdown metadata for quick view.
Read more about how to create word clouds in http://amueller.github.io/word_cloud/.
"""
stopwords = set(STOPWORDS)
stopwords.update(["Ask", "Show", "HN"])
titles_text = " ".join([str(item) for item in hackernews_topstories["title"]])
titles_cloud = WordCloud(stopwords=stopwords, background_color="white").generate(titles_text)

# Generate the word cloud image
plt.figure(figsize=(8, 8), facecolor=None)
plt.imshow(titles_cloud, interpolation="bilinear")
plt.axis("off")
plt.tight_layout(pad=0)

# Save the image to a buffer and embed the image into Markdown content for quick view
buffer = BytesIO()
plt.savefig(buffer, format="png")
image_data = base64.b64encode(buffer.getvalue())
md_content = f"![img](data:image/png;base64,{image_data.decode()})"

# Attach the Markdown content and s3 file path as metadata to the asset
# Read about more metadata types in https://docs.dagster.io/_apidocs/ops#metadata-types
context.add_output_metadata({"plot": MetadataValue.md(md_content)})
71 changes: 71 additions & 0 deletions examples/quickstart_gcp/quickstart_gcp/io_managers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import json

import pandas as pd
import pandas_gbq
from dagster import (
Field,
InitResourceContext,
InputContext,
IOManager,
OutputContext,
StringSource,
io_manager,
)
from google.oauth2 import service_account

# Learn more about custom I/O managers in Dagster docs:
# https://docs.dagster.io/concepts/io-management/io-managers#a-custom-io-manager-that-stores-pandas-dataframes-in-tables


class BigQueryDataframeIOManager(IOManager):
def __init__(self, credentials: dict, project_id: str, dataset_id: str) -> None:
# `from_service_account_info` accepts a dictionary corresponding to the JSON file contents
# If you'd like to refer to the JSON file path, change it to `from_service_account_file`
self._credentials = service_account.Credentials.from_service_account_info(credentials)
self._project_id = project_id
self._dataset_id = dataset_id

def handle_output(self, context: OutputContext, obj: pd.DataFrame):
# Skip handling if the output is None
if obj is None:
return

table_name = context.asset_key.to_python_identifier()

pandas_gbq.to_gbq(
obj,
f"{self._dataset_id}.{table_name}",
project_id=self._project_id,
credentials=self._credentials,
if_exists="replace", # always overwrite
)

# Recording metadata from an I/O manager:
# https://docs.dagster.io/concepts/io-management/io-managers#recording-metadata-from-an-io-manager
context.add_output_metadata({"dataset_id": self._dataset_id, "table_name": table_name})

def load_input(self, context: InputContext):
# upstream_output.asset_key is the asset key given to the Out that we're loading for
table_name = context.upstream_output.asset_key.to_python_identifier()

df = pandas_gbq.read_gbq(
f"SELECT * FROM `{self._dataset_id}.{table_name}`", credentials=self._credentials
)
return df


@io_manager(
config_schema={
"credentials": StringSource,
"project_id": StringSource,
"dataset_id": Field(
str, default_value="my_dataset", description="Dataset ID. Defaults to 'my_dataset'"
),
}
)
def bigquery_pandas_io_manager(init_context: InitResourceContext) -> BigQueryDataframeIOManager:
return BigQueryDataframeIOManager(
credentials=json.loads(init_context.resource_config["credentials"]),
project_id=init_context.resource_config["project_id"],
dataset_id=init_context.resource_config["dataset_id"],
)
30 changes: 30 additions & 0 deletions examples/quickstart_gcp/quickstart_gcp/repository.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from dagster import (ScheduleDefinition, define_asset_job,
load_assets_from_package_module, repository,
with_resources)

from . import assets
from .io_managers import bigquery_pandas_io_manager

daily_refresh_schedule = ScheduleDefinition(
job=define_asset_job(name="all_assets_job"), cron_schedule="0 0 * * *"
)


@repository
def quickstart_gcp():
return [
*with_resources(
load_assets_from_package_module(assets),
resource_defs={
# Read about using environment variables and secrets in Dagster:
# https://docs.dagster.io/guides/dagster/using-environment-variables-and-secrets
"io_manager": bigquery_pandas_io_manager.configured(
{
"credentials": {"env": "BIGQUERY_SERVICE_ACCOUNT_CREDENTIALS"},
"project_id": {"env": "BIGQUERY_PROJECT_ID"},
}
),
},
),
daily_refresh_schedule,
]
1 change: 1 addition & 0 deletions examples/quickstart_gcp/quickstart_gcp_tests/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

2 changes: 2 additions & 0 deletions examples/quickstart_gcp/setup.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[metadata]
name = quickstart_gcp
20 changes: 20 additions & 0 deletions examples/quickstart_gcp/setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from setuptools import find_packages, setup

setup(
name="quickstart_gcp",
packages=find_packages(exclude=["quickstart_gcp_tests"]),
install_requires=[
"dagster",
"dagster-gcp",
"dagster-cloud",
"boto3", # used by Dagster Cloud Serverless
"pandas",
"matplotlib",
"textblob",
"tweepy",
"wordcloud",
"pandas_gbq",
"google-auth",
],
extras_require={"dev": ["dagit", "pytest"]},
)
2 changes: 2 additions & 0 deletions examples/quickstart_gcp/workspace.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
load_from:
- python_package: quickstart_gcp

0 comments on commit b3951c1

Please sign in to comment.