Skip to content

Commit

Permalink
mv replication_config to metadata and sling decorator
Browse files Browse the repository at this point in the history
  • Loading branch information
cmpadden committed Mar 21, 2024
1 parent df8a236 commit 26f4e14
Show file tree
Hide file tree
Showing 8 changed files with 201 additions and 39 deletions.
13 changes: 3 additions & 10 deletions docs/content/integrations/embedded-elt.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,7 @@ sling_resource = SlingResource(connections=[...]) # Add connections here
dagster_sling_translator=DagsterSlingTranslator(),
)
def my_assets(context, sling: SlingResource):
yield from sling.replicate(
context=context,
replication_config=replication_config,
)
yield from sling.replicate(context=context)
for row in sling.stream_raw_logs():
context.log.info(row)

Expand Down Expand Up @@ -220,9 +217,7 @@ replication_config = {
dagster_sling_translator=DagsterSlingTranslator(),
)
def my_assets(context, sling: SlingResource):
yield from sling.replicate(
replication_config=replication_config,
)
yield from sling.replicate(context=context)
```

## Example 2: File to Database
Expand Down Expand Up @@ -258,9 +253,7 @@ replication_config = {
dagster_sling_translator=DagsterSlingTranslator(),
)
def my_assets(context, sling: SlingResource):
yield from sling.replicate(
replication_config=replication_config,
)
yield from sling.replicate(context=context)
```

---
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,4 @@
dagster_sling_translator=DagsterSlingTranslator(),
)
def my_assets(context, sling: SlingResource):
yield from sling.replicate(
replication_config=replication_config,
)
yield from sling.replicate(context=context)
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,7 @@
dagster_sling_translator=DagsterSlingTranslator(),
)
def my_assets(context, sling: SlingResource):
yield from sling.replicate(
replication_config=replication_config,
)
yield from sling.replicate(context=context)


# end_storage_config
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,7 @@
dagster_sling_translator=DagsterSlingTranslator(),
)
def my_assets(context, sling: SlingResource):
yield from sling.replicate(
context=context,
replication_config=replication_config,
)
yield from sling.replicate(context=context)
for row in sling.stream_raw_logs():
context.log.info(row)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,7 @@
dagster_sling_translator=DagsterSlingTranslator(),
)
def my_assets(context, sling: SlingResource):
yield from sling.replicate(
context=context,
replication_config=replication_config,
)
yield from sling.replicate(context=context)
for row in sling.stream_raw_logs():
context.log.info(row)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from dagster_embedded_elt.sling.sling_replication import SlingReplicationParam, validate_replication

METADATA_KEY_TRANSLATOR = "dagster_sling/translator"
METADATA_KEY_REPLICATION_CONFIG = "dagster_sling/replication_config"


def get_streams_from_replication(
Expand Down Expand Up @@ -73,10 +74,7 @@ def sling_assets(
config_path = "/path/to/replication.yaml"
@sling_assets(replication_config=config_path)
def my_assets(context, sling: SlingResource):
for lines in sling.replicate(
replication_config=config_path,
context=context,
):
for lines in sling.replicate(context=context):
context.log.info(lines)
"""
replication_config = validate_replication(replication_config)
Expand All @@ -93,6 +91,7 @@ def my_assets(context, sling: SlingResource):
metadata={ # type: ignore
**dagster_sling_translator.get_metadata(stream),
METADATA_KEY_TRANSLATOR: dagster_sling_translator,
METADATA_KEY_REPLICATION_CONFIG: replication_config,
},
group_name=dagster_sling_translator.get_group_name(stream),
freshness_policy=dagster_sling_translator.get_freshness_policy(stream),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from pydantic import Field

from dagster_embedded_elt.sling.asset_decorator import (
METADATA_KEY_REPLICATION_CONFIG,
METADATA_KEY_TRANSLATOR,
get_streams_from_replication,
)
Expand Down Expand Up @@ -366,11 +367,16 @@ def sync(
breaking_version="2.0",
additional_warn_text="Param is only required in `sling_assets` decorator.",
)
@deprecated_param(
param="replication_config",
breaking_version="2.0",
additional_warn_text="Param is only required in `sling_assets` decorator.",
)
def replicate(
self,
*,
replication_config: SlingReplicationParam,
context: Union[OpExecutionContext, AssetExecutionContext],
replication_config: Optional[SlingReplicationParam] = None,
dagster_sling_translator: Optional[DagsterSlingTranslator] = None,
debug: bool = False,
) -> Generator[MaterializeResult, None, None]:
Expand All @@ -384,21 +390,25 @@ def replicate(
Returns:
Optional[Generator[MaterializeResult, None, None]]: A generator of MaterializeResult
"""
replication_config = validate_replication(replication_config)
stream_definition = get_streams_from_replication(replication_config)

# retrieve translator from context
# retrieve decorator params from context
metadata_by_key = context.assets_def.metadata_by_key
first_asset_metadata = next(iter(metadata_by_key.values()))
dagster_sling_translator = first_asset_metadata.get(
METADATA_KEY_TRANSLATOR, DagsterSlingTranslator()
)

dagster_sling_translator = first_asset_metadata.get(METADATA_KEY_TRANSLATOR)
if dagster_sling_translator is None:
raise Exception(
f"`DagsterSlingTranslator` must be defined on metadata at {METADATA_KEY_TRANSLATOR}"
)

replication_config = first_asset_metadata.get(METADATA_KEY_REPLICATION_CONFIG)
if replication_config is None:
raise Exception(
f"`ReplicationConfig` must be defined on metadata at {METADATA_KEY_REPLICATION_CONFIG}"
)

replication_config = validate_replication(replication_config)
stream_definition = get_streams_from_replication(replication_config)

with self._setup_config():
uid = uuid.uuid4()
temp_dir = tempfile.gettempdir()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,7 @@ def test_runs_base_sling_config(
):
@sling_assets(replication_config=csv_to_sqlite_replication_config)
def my_sling_assets(context: AssetExecutionContext, sling: SlingResource):
for row in sling.replicate(
replication_config=csv_to_sqlite_replication_config, context=context
):
for row in sling.replicate(context=context):
logging.info(row)

sling_resource = SlingResource(
Expand Down Expand Up @@ -137,6 +135,49 @@ def my_sling_assets(): ...
AssetKey(["target", "public", "accounts"]): {
"stream_config": JsonMetadataValue(data=None),
"dagster_sling/translator": DagsterSlingTranslator(target_prefix="target"),
"dagster_sling/replication_config": {
"source": "MY_SOURCE",
"target": "MY_TARGET",
"defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"},
"streams": {
"public.accounts": None,
"public.users": {
"disabled": True,
"meta": {"dagster": {"asset_key": "public.foo_users"}},
},
"public.finance_departments_old": {
"object": "departments",
"source_options": {"empty_as_null": False},
"meta": {
"dagster": {
"deps": ["foo_one", "foo_two"],
"group": "group_2",
"freshness_policy": {
"maximum_lag_minutes": 0,
"cron_schedule": "5 4 * * *",
"cron_schedule_timezone": "UTC",
},
}
},
},
'public."Transactions"': {
"mode": "incremental",
"primary_key": "id",
"update_key": "last_updated_at",
"meta": {
"dagster": {
"description": "Example Description!",
"auto_materialize_policy": True,
}
},
},
"public.all_users": {
"sql": 'select all_user_id, name \nfrom public."all_Users"\n',
"object": "public.all_users",
},
},
"env": {"SLING_LOADED_AT_COLUMN": True, "SLING_STREAM_URL_COLUMN": True},
},
},
AssetKey(["target", "departments"]): {
"stream_config": JsonMetadataValue(
Expand All @@ -157,6 +198,49 @@ def my_sling_assets(): ...
}
),
"dagster_sling/translator": DagsterSlingTranslator(target_prefix="target"),
"dagster_sling/replication_config": {
"source": "MY_SOURCE",
"target": "MY_TARGET",
"defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"},
"streams": {
"public.accounts": None,
"public.users": {
"disabled": True,
"meta": {"dagster": {"asset_key": "public.foo_users"}},
},
"public.finance_departments_old": {
"object": "departments",
"source_options": {"empty_as_null": False},
"meta": {
"dagster": {
"deps": ["foo_one", "foo_two"],
"group": "group_2",
"freshness_policy": {
"maximum_lag_minutes": 0,
"cron_schedule": "5 4 * * *",
"cron_schedule_timezone": "UTC",
},
}
},
},
'public."Transactions"': {
"mode": "incremental",
"primary_key": "id",
"update_key": "last_updated_at",
"meta": {
"dagster": {
"description": "Example Description!",
"auto_materialize_policy": True,
}
},
},
"public.all_users": {
"sql": 'select all_user_id, name \nfrom public."all_Users"\n',
"object": "public.all_users",
},
},
"env": {"SLING_LOADED_AT_COLUMN": True, "SLING_STREAM_URL_COLUMN": True},
},
},
AssetKey(["target", "public", "transactions"]): {
"stream_config": JsonMetadataValue(
Expand All @@ -173,6 +257,49 @@ def my_sling_assets(): ...
}
),
"dagster_sling/translator": DagsterSlingTranslator(target_prefix="target"),
"dagster_sling/replication_config": {
"source": "MY_SOURCE",
"target": "MY_TARGET",
"defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"},
"streams": {
"public.accounts": None,
"public.users": {
"disabled": True,
"meta": {"dagster": {"asset_key": "public.foo_users"}},
},
"public.finance_departments_old": {
"object": "departments",
"source_options": {"empty_as_null": False},
"meta": {
"dagster": {
"deps": ["foo_one", "foo_two"],
"group": "group_2",
"freshness_policy": {
"maximum_lag_minutes": 0,
"cron_schedule": "5 4 * * *",
"cron_schedule_timezone": "UTC",
},
}
},
},
'public."Transactions"': {
"mode": "incremental",
"primary_key": "id",
"update_key": "last_updated_at",
"meta": {
"dagster": {
"description": "Example Description!",
"auto_materialize_policy": True,
}
},
},
"public.all_users": {
"sql": 'select all_user_id, name \nfrom public."all_Users"\n',
"object": "public.all_users",
},
},
"env": {"SLING_LOADED_AT_COLUMN": True, "SLING_STREAM_URL_COLUMN": True},
},
},
AssetKey(["target", "public", "all_users"]): {
"stream_config": JsonMetadataValue(
Expand All @@ -182,6 +309,49 @@ def my_sling_assets(): ...
}
),
"dagster_sling/translator": DagsterSlingTranslator(target_prefix="target"),
"dagster_sling/replication_config": {
"source": "MY_SOURCE",
"target": "MY_TARGET",
"defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"},
"streams": {
"public.accounts": None,
"public.users": {
"disabled": True,
"meta": {"dagster": {"asset_key": "public.foo_users"}},
},
"public.finance_departments_old": {
"object": "departments",
"source_options": {"empty_as_null": False},
"meta": {
"dagster": {
"deps": ["foo_one", "foo_two"],
"group": "group_2",
"freshness_policy": {
"maximum_lag_minutes": 0,
"cron_schedule": "5 4 * * *",
"cron_schedule_timezone": "UTC",
},
}
},
},
'public."Transactions"': {
"mode": "incremental",
"primary_key": "id",
"update_key": "last_updated_at",
"meta": {
"dagster": {
"description": "Example Description!",
"auto_materialize_policy": True,
}
},
},
"public.all_users": {
"sql": 'select all_user_id, name \nfrom public."all_Users"\n',
"object": "public.all_users",
},
},
"env": {"SLING_LOADED_AT_COLUMN": True, "SLING_STREAM_URL_COLUMN": True},
},
},
}

Expand Down

0 comments on commit 26f4e14

Please sign in to comment.