Skip to content

Commit

Permalink
refactor #2
Browse files Browse the repository at this point in the history
  • Loading branch information
cmpadden committed Dec 13, 2024
1 parent 8748010 commit 39af283
Show file tree
Hide file tree
Showing 8 changed files with 72 additions and 63 deletions.
Original file line number Diff line number Diff line change
@@ -1,33 +1,9 @@
"""The Bluesky servers impose rate limiting of the following specification."""

import dagster as dg
from dagster_aws.s3 import S3Resource

from project_atproto_dashboard.assets import (
actor_feed_snapshot,
dbt_bluesky,
dbt_resource,
starter_pack_snapshot,
)
from project_atproto_dashboard.resources import ATProtoResource

atproto_resource = ATProtoResource(
login=dg.EnvVar("BSKY_LOGIN"), password=dg.EnvVar("BSKY_APP_PASSWORD")
)

s3_resource = S3Resource(
endpoint_url=dg.EnvVar("AWS_ENDPOINT_URL"),
aws_access_key_id=dg.EnvVar("AWS_ACCESS_KEY_ID"),
aws_secret_access_key=dg.EnvVar("AWS_SECRET_ACCESS_KEY"),
region_name="auto",
)

import project_atproto_dashboard.ingestion.definitions as ingestion_definitions
import project_atproto_dashboard.modeling.definitions as modeling_definitions

defs = dg.Definitions(
assets=[starter_pack_snapshot, actor_feed_snapshot, dbt_bluesky],
resources={
"atproto_resource": atproto_resource,
"s3_resource": s3_resource,
"dbt": dbt_resource,
},
defs = dg.Definitions.merge(
ingestion_definitions.defs,
modeling_definitions.defs,
)
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
import os
from datetime import datetime
from pathlib import Path
from typing import Any, Mapping, Optional

import dagster as dg
from dagster_aws.s3 import S3Resource
from dagster_dbt import DagsterDbtTranslator, DbtCliResource, DbtProject, dbt_assets

from project_atproto_dashboard.resources import ATProtoResource
from project_atproto_dashboard.utils.atproto import get_all_feed_items, get_all_starter_pack_members
from project_atproto_dashboard.ingestion.resources import ATProtoResource
from project_atproto_dashboard.ingestion.utils.atproto import (
get_all_feed_items,
get_all_starter_pack_members,
)

PREFERRED_LANGUAGE = os.environ.get("BSKY_PREFERRED_LANGUAGE", "en")
AWS_BUCKET_NAME = os.environ.get("AWS_BUCKET_NAME", "dagster-demo")


atproto_did_dynamic_partition = dg.DynamicPartitionsDefinition(name="atproto_did_dynamic_partition")


@dg.asset(
partitions_def=dg.StaticPartitionsDefinition(
partition_keys=[
Expand Down Expand Up @@ -58,7 +61,6 @@ def starter_pack_snapshot(

s3_resource.get_client().put_object(Body=_bytes, Bucket=AWS_BUCKET_NAME, Key=object_key)

# TODO - delete dynamic partitions that no longer exist in the list
context.instance.add_dynamic_partitions(
partitions_def_name="atproto_did_dynamic_partition",
partition_keys=[list_item_view.subject.did for list_item_view in list_items],
Expand All @@ -72,9 +74,6 @@ def starter_pack_snapshot(
)


atproto_did_dynamic_partition = dg.DynamicPartitionsDefinition(name="atproto_did_dynamic_partition")


@dg.asset(
partitions_def=atproto_did_dynamic_partition,
deps=[dg.AssetDep(starter_pack_snapshot, partition_mapping=dg.AllPartitionMapping())],
Expand Down Expand Up @@ -119,33 +118,22 @@ def actor_feed_snapshot(
)


dbt_project = DbtProject(
project_dir=Path(__file__).joinpath("..", "..", "dbt_project").resolve(),
target=os.getenv("DBT_TARGET"),
atproto_resource = ATProtoResource(
login=dg.EnvVar("BSKY_LOGIN"), password=dg.EnvVar("BSKY_APP_PASSWORD")
)
dbt_project.prepare_if_dev()
dbt_resource = DbtCliResource(project_dir=dbt_project)


class CustomizedDagsterDbtTranslator(DagsterDbtTranslator):
    """Translator that customizes group names and source asset keys for dbt nodes."""

    def get_group_name(self, dbt_resource_props: Mapping[str, Any]) -> Optional[str]:
        """Return the asset group name for a dbt node.

        The group is built from the node's ``fqn`` with its first and last
        entries dropped (presumably the project name and the node name --
        confirm against the dbt manifest), joined with underscores. When
        nothing remains, the group is ``"default"``.
        """
        inner_segments = dbt_resource_props["fqn"][1:-1]
        return "_".join(inner_segments) if inner_segments else "default"

    def get_asset_key(self, dbt_resource_props):
        """Return the asset key for a dbt node.

        Nodes whose ``resource_type`` is ``"source"`` are keyed by their bare
        ``name``; every other node uses the parent translator's key.
        """
        # Both keys are read up front, so a missing key raises KeyError
        # regardless of which branch is taken.
        node_kind = dbt_resource_props["resource_type"]
        node_name = dbt_resource_props["name"]
        if node_kind != "source":
            return super().get_asset_key(dbt_resource_props)
        return dg.AssetKey(node_name)
s3_resource = S3Resource(
endpoint_url=dg.EnvVar("AWS_ENDPOINT_URL"),
aws_access_key_id=dg.EnvVar("AWS_ACCESS_KEY_ID"),
aws_secret_access_key=dg.EnvVar("AWS_SECRET_ACCESS_KEY"),
region_name="auto",
)


@dbt_assets(
manifest=dbt_project.manifest_path,
dagster_dbt_translator=CustomizedDagsterDbtTranslator(),
defs = dg.Definitions(
assets=[starter_pack_snapshot, actor_feed_snapshot],
resources={
"atproto_resource": atproto_resource,
"s3_resource": s3_resource,
},
)
def dbt_bluesky(context: dg.AssetExecutionContext, dbt: DbtCliResource):
yield from (dbt.cli(["build"], context=context).stream().fetch_row_counts())
Empty file.
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import os
from pathlib import Path
from typing import Any, Mapping, Optional

import dagster as dg
from dagster_dbt import DagsterDbtTranslator, DbtCliResource, DbtProject, dbt_assets

# The dbt project directory is located relative to this file: the first ".."
# cancels the file name itself, and the remaining two climb two directories
# before descending into "dbt_project". The dbt target comes from the
# DBT_TARGET environment variable and is None when the variable is unset.
dbt_project = DbtProject(
    project_dir=Path(__file__, "..", "..", "..", "dbt_project").resolve(),
    target=os.environ.get("DBT_TARGET"),
)
# NOTE(review): runs at import time; see the dagster-dbt documentation for
# what `prepare_if_dev` does to the project in development.
dbt_project.prepare_if_dev()
dbt_resource = DbtCliResource(project_dir=dbt_project)


class CustomizedDagsterDbtTranslator(DagsterDbtTranslator):
    """Translator that customizes group names and source asset keys for dbt nodes."""

    def get_group_name(self, dbt_resource_props: Mapping[str, Any]) -> Optional[str]:
        """Return the asset group name for a dbt node.

        The group is built from the node's ``fqn`` with its first and last
        entries dropped (presumably the project name and the node name --
        confirm against the dbt manifest), joined with underscores. When
        nothing remains, the group is ``"default"``.
        """
        inner_segments = dbt_resource_props["fqn"][1:-1]
        return "_".join(inner_segments) if inner_segments else "default"

    def get_asset_key(self, dbt_resource_props):
        """Return the asset key for a dbt node.

        Nodes whose ``resource_type`` is ``"source"`` are keyed by their bare
        ``name``; every other node uses the parent translator's key.
        """
        # Both keys are read up front, so a missing key raises KeyError
        # regardless of which branch is taken.
        node_kind = dbt_resource_props["resource_type"]
        node_name = dbt_resource_props["name"]
        if node_kind != "source":
            return super().get_asset_key(dbt_resource_props)
        return dg.AssetKey(node_name)


@dbt_assets(
    dagster_dbt_translator=CustomizedDagsterDbtTranslator(),
    manifest=dbt_project.manifest_path,
)
def dbt_bluesky(context: dg.AssetExecutionContext, dbt: DbtCliResource):
    """Run ``dbt build`` for the project and yield the resulting events.

    Args:
        context: Execution context handed to the dbt CLI invocation.
        dbt: dbt CLI resource used to launch the command.

    Yields:
        Whatever the streamed invocation produces, after ``fetch_row_counts()``
        has been applied to the stream.
    """
    build_invocation = dbt.cli(["build"], context=context)
    yield from build_invocation.stream().fetch_row_counts()


# Definitions for the modeling layer: the dbt-backed asset plus the dbt CLI
# resource registered under the "dbt" key, the same name as the `dbt`
# parameter of `dbt_bluesky`.
defs = dg.Definitions(assets=[dbt_bluesky], resources={"dbt": dbt_resource})

0 comments on commit 39af283

Please sign in to comment.