diff --git a/examples/project_atproto_dashboard/project_atproto_dashboard/definitions.py b/examples/project_atproto_dashboard/project_atproto_dashboard/definitions.py index d27594cdebdf1..65ae1c3ffe98a 100644 --- a/examples/project_atproto_dashboard/project_atproto_dashboard/definitions.py +++ b/examples/project_atproto_dashboard/project_atproto_dashboard/definitions.py @@ -1,33 +1,9 @@ -"""The Bluesky servers impose rate limiting of the following specification.""" - import dagster as dg -from dagster_aws.s3 import S3Resource - -from project_atproto_dashboard.assets import ( - actor_feed_snapshot, - dbt_bluesky, - dbt_resource, - starter_pack_snapshot, -) -from project_atproto_dashboard.resources import ATProtoResource - -atproto_resource = ATProtoResource( - login=dg.EnvVar("BSKY_LOGIN"), password=dg.EnvVar("BSKY_APP_PASSWORD") -) - -s3_resource = S3Resource( - endpoint_url=dg.EnvVar("AWS_ENDPOINT_URL"), - aws_access_key_id=dg.EnvVar("AWS_ACCESS_KEY_ID"), - aws_secret_access_key=dg.EnvVar("AWS_SECRET_ACCESS_KEY"), - region_name="auto", -) +import project_atproto_dashboard.ingestion.definitions as ingestion_definitions +import project_atproto_dashboard.modeling.definitions as modeling_definitions -defs = dg.Definitions( - assets=[starter_pack_snapshot, actor_feed_snapshot, dbt_bluesky], - resources={ - "atproto_resource": atproto_resource, - "s3_resource": s3_resource, - "dbt": dbt_resource, - }, +defs = dg.Definitions.merge( + ingestion_definitions.defs, + modeling_definitions.defs, ) diff --git a/examples/project_atproto_dashboard/project_atproto_dashboard/utils/__init__.py b/examples/project_atproto_dashboard/project_atproto_dashboard/ingestion/__init__.py similarity index 100% rename from examples/project_atproto_dashboard/project_atproto_dashboard/utils/__init__.py rename to examples/project_atproto_dashboard/project_atproto_dashboard/ingestion/__init__.py diff --git a/examples/project_atproto_dashboard/project_atproto_dashboard/assets.py b/examples/project_atproto_dashboard/project_atproto_dashboard/ingestion/definitions.py similarity index 71% rename from examples/project_atproto_dashboard/project_atproto_dashboard/assets.py rename to examples/project_atproto_dashboard/project_atproto_dashboard/ingestion/definitions.py index d779ea3eccc01..64929853ef124 100644 --- a/examples/project_atproto_dashboard/project_atproto_dashboard/assets.py +++ b/examples/project_atproto_dashboard/project_atproto_dashboard/ingestion/definitions.py @@ -1,19 +1,22 @@ import os from datetime import datetime -from pathlib import Path -from typing import Any, Mapping, Optional import dagster as dg from dagster_aws.s3 import S3Resource -from dagster_dbt import DagsterDbtTranslator, DbtCliResource, DbtProject, dbt_assets -from project_atproto_dashboard.resources import ATProtoResource -from project_atproto_dashboard.utils.atproto import get_all_feed_items, get_all_starter_pack_members +from project_atproto_dashboard.ingestion.resources import ATProtoResource +from project_atproto_dashboard.ingestion.utils.atproto import ( + get_all_feed_items, + get_all_starter_pack_members, +) PREFERRED_LANGUAGE = os.environ.get("BSKY_PREFERRED_LANGUAGE", "en") AWS_BUCKET_NAME = os.environ.get("AWS_BUCKET_NAME", "dagster-demo") +atproto_did_dynamic_partition = dg.DynamicPartitionsDefinition(name="atproto_did_dynamic_partition") + + @dg.asset( partitions_def=dg.StaticPartitionsDefinition( partition_keys=[ @@ -58,7 +61,6 @@ def starter_pack_snapshot( s3_resource.get_client().put_object(Body=_bytes, Bucket=AWS_BUCKET_NAME, Key=object_key) - # TODO - delete dynamic partitions that no longer exist in the list context.instance.add_dynamic_partitions( partitions_def_name="atproto_did_dynamic_partition", partition_keys=[list_item_view.subject.did for list_item_view in list_items], @@ -72,9 +74,6 @@ def starter_pack_snapshot( ) -atproto_did_dynamic_partition = dg.DynamicPartitionsDefinition(name="atproto_did_dynamic_partition") - - @dg.asset( partitions_def=atproto_did_dynamic_partition, deps=[dg.AssetDep(starter_pack_snapshot, partition_mapping=dg.AllPartitionMapping())], @@ -119,33 +118,22 @@ def actor_feed_snapshot( ) -dbt_project = DbtProject( - project_dir=Path(__file__).joinpath("..", "..", "dbt_project").resolve(), - target=os.getenv("DBT_TARGET"), +atproto_resource = ATProtoResource( + login=dg.EnvVar("BSKY_LOGIN"), password=dg.EnvVar("BSKY_APP_PASSWORD") ) -dbt_project.prepare_if_dev() -dbt_resource = DbtCliResource(project_dir=dbt_project) - -class CustomizedDagsterDbtTranslator(DagsterDbtTranslator): - def get_group_name(self, dbt_resource_props: Mapping[str, Any]) -> Optional[str]: - asset_path = dbt_resource_props["fqn"][1:-1] - if asset_path: - return "_".join(asset_path) - return "default" - - def get_asset_key(self, dbt_resource_props): - resource_type = dbt_resource_props["resource_type"] - name = dbt_resource_props["name"] - if resource_type == "source": - return dg.AssetKey(name) - else: - return super().get_asset_key(dbt_resource_props) +s3_resource = S3Resource( + endpoint_url=dg.EnvVar("AWS_ENDPOINT_URL"), + aws_access_key_id=dg.EnvVar("AWS_ACCESS_KEY_ID"), + aws_secret_access_key=dg.EnvVar("AWS_SECRET_ACCESS_KEY"), + region_name="auto", +) -@dbt_assets( - manifest=dbt_project.manifest_path, - dagster_dbt_translator=CustomizedDagsterDbtTranslator(), +defs = dg.Definitions( + assets=[starter_pack_snapshot, actor_feed_snapshot], + resources={ + "atproto_resource": atproto_resource, + "s3_resource": s3_resource, + }, ) -def dbt_bluesky(context: dg.AssetExecutionContext, dbt: DbtCliResource): - yield from (dbt.cli(["build"], context=context).stream().fetch_row_counts()) diff --git a/examples/project_atproto_dashboard/project_atproto_dashboard/resources.py b/examples/project_atproto_dashboard/project_atproto_dashboard/ingestion/resources.py similarity index 100% rename from examples/project_atproto_dashboard/project_atproto_dashboard/resources.py rename to examples/project_atproto_dashboard/project_atproto_dashboard/ingestion/resources.py diff --git a/examples/project_atproto_dashboard/project_atproto_dashboard/ingestion/utils/__init__.py b/examples/project_atproto_dashboard/project_atproto_dashboard/ingestion/utils/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/examples/project_atproto_dashboard/project_atproto_dashboard/utils/atproto.py b/examples/project_atproto_dashboard/project_atproto_dashboard/ingestion/utils/atproto.py similarity index 100% rename from examples/project_atproto_dashboard/project_atproto_dashboard/utils/atproto.py rename to examples/project_atproto_dashboard/project_atproto_dashboard/ingestion/utils/atproto.py diff --git a/examples/project_atproto_dashboard/project_atproto_dashboard/modeling/__init__.py b/examples/project_atproto_dashboard/project_atproto_dashboard/modeling/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/examples/project_atproto_dashboard/project_atproto_dashboard/modeling/definitions.py b/examples/project_atproto_dashboard/project_atproto_dashboard/modeling/definitions.py new file mode 100644 index 0000000000000..6e7acf2d55f78 --- /dev/null +++ b/examples/project_atproto_dashboard/project_atproto_dashboard/modeling/definitions.py @@ -0,0 +1,45 @@ +import os +from pathlib import Path +from typing import Any, Mapping, Optional + +import dagster as dg +from dagster_dbt import DagsterDbtTranslator, DbtCliResource, DbtProject, dbt_assets + +dbt_project = DbtProject( + project_dir=Path(__file__).joinpath("..", "..", "..", "dbt_project").resolve(), + target=os.getenv("DBT_TARGET"), +) +dbt_project.prepare_if_dev() +dbt_resource = DbtCliResource(project_dir=dbt_project) + + +class CustomizedDagsterDbtTranslator(DagsterDbtTranslator): + def get_group_name(self, dbt_resource_props: Mapping[str, Any]) -> Optional[str]: + asset_path = dbt_resource_props["fqn"][1:-1] + if asset_path: + return "_".join(asset_path) + return "default" + + def get_asset_key(self, dbt_resource_props): + resource_type = dbt_resource_props["resource_type"] + name = dbt_resource_props["name"] + if resource_type == "source": + return dg.AssetKey(name) + else: + return super().get_asset_key(dbt_resource_props) + + +@dbt_assets( + manifest=dbt_project.manifest_path, + dagster_dbt_translator=CustomizedDagsterDbtTranslator(), +) +def dbt_bluesky(context: dg.AssetExecutionContext, dbt: DbtCliResource): + yield from (dbt.cli(["build"], context=context).stream().fetch_row_counts()) + + +defs = dg.Definitions( + assets=[dbt_bluesky], + resources={ + "dbt": dbt_resource, + }, +)