Skip to content

Commit

Permalink
update docs/examples to pass translator in decorator
Browse files Browse the repository at this point in the history
  • Loading branch information
cmpadden committed Mar 21, 2024
1 parent 08f0ddc commit df8a236
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 17 deletions.
20 changes: 13 additions & 7 deletions docs/content/integrations/embedded-elt.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ Now you can define a Sling asset using the <PyObject module="dagster_embedded_el
Each stream will render two assets, one for the source stream and one for the target destination. You may override how assets are named by passing in a custom <PyObject module="dagster_embedded_elt.sling" object="DagsterSlingTranslator" /> object.

```python file=/integrations/embedded_elt/sling_dagster_translator.py
from dagster_embedded_elt import sling
from dagster_embedded_elt.sling import (
DagsterSlingTranslator,
SlingResource,
Expand All @@ -135,11 +134,14 @@ replication_config = file_relative_path(__file__, "../sling_replication.yaml")
sling_resource = SlingResource(connections=[...]) # Add connections here


@sling_assets(replication_config=replication_config)
@sling_assets(
replication_config=replication_config,
dagster_sling_translator=DagsterSlingTranslator(),
)
def my_assets(context, sling: SlingResource):
yield from sling.replicate(
context=context,
replication_config=replication_config,
dagster_sling_translator=DagsterSlingTranslator(),
)
for row in sling.stream_raw_logs():
context.log.info(row)
Expand Down Expand Up @@ -213,11 +215,13 @@ replication_config = {
}


@sling_assets(replication_config=replication_config)
@sling_assets(
replication_config=replication_config,
dagster_sling_translator=DagsterSlingTranslator(),
)
def my_assets(context, sling: SlingResource):
yield from sling.replicate(
replication_config=replication_config,
dagster_sling_translator=DagsterSlingTranslator(),
)
```

Expand Down Expand Up @@ -249,11 +253,13 @@ replication_config = {
}


@sling_assets(replication_config=replication_config)
@sling_assets(
replication_config=replication_config,
dagster_sling_translator=DagsterSlingTranslator(),
)
def my_assets(context, sling: SlingResource):
yield from sling.replicate(
replication_config=replication_config,
dagster_sling_translator=DagsterSlingTranslator(),
)
```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,11 @@
}


@sling_assets(replication_config=replication_config)
@sling_assets(
replication_config=replication_config,
dagster_sling_translator=DagsterSlingTranslator(),
)
def my_assets(context, sling: SlingResource):
yield from sling.replicate(
replication_config=replication_config,
dagster_sling_translator=DagsterSlingTranslator(),
)
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,13 @@
}


@sling_assets(replication_config=replication_config)
@sling_assets(
replication_config=replication_config,
dagster_sling_translator=DagsterSlingTranslator(),
)
def my_assets(context, sling: SlingResource):
yield from sling.replicate(
replication_config=replication_config,
dagster_sling_translator=DagsterSlingTranslator(),
)


Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from dagster_embedded_elt import sling
from dagster_embedded_elt.sling import (
DagsterSlingTranslator,
SlingResource,
Expand All @@ -11,11 +10,14 @@
sling_resource = SlingResource(connections=[...]) # Add connections here


@sling_assets(replication_config=replication_config)
@sling_assets(
replication_config=replication_config,
dagster_sling_translator=DagsterSlingTranslator(),
)
def my_assets(context, sling: SlingResource):
yield from sling.replicate(
context=context,
replication_config=replication_config,
dagster_sling_translator=DagsterSlingTranslator(),
)
for row in sling.stream_raw_logs():
context.log.info(row)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,14 @@
)


@sling_assets(replication_config=replication_config)
@sling_assets(
replication_config=replication_config,
dagster_sling_translator=DagsterSlingTranslator(),
)
def my_assets(context, sling: SlingResource):
yield from sling.replicate(
context=context,
replication_config=replication_config,
dagster_sling_translator=DagsterSlingTranslator(),
)
for row in sling.stream_raw_logs():
context.log.info(row)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
PermissiveConfig,
get_dagster_logger,
)
from dagster._annotations import deprecated, experimental, public
from dagster._annotations import deprecated, deprecated_param, experimental, public
from dagster._utils.env import environ
from dagster._utils.warnings import deprecation_warning
from pydantic import Field
Expand Down Expand Up @@ -361,11 +361,17 @@ def sync(
yield from self._exec_sling_cmd(cmd, encoding=encoding)

@public
@deprecated_param(
param="dagster_sling_translator",
breaking_version="2.0",
additional_warn_text="Param is only required in `sling_assets` decorator.",
)
def replicate(
self,
*,
replication_config: SlingReplicationParam,
context: Union[OpExecutionContext, AssetExecutionContext],
dagster_sling_translator: Optional[DagsterSlingTranslator] = None,
debug: bool = False,
) -> Generator[MaterializeResult, None, None]:
"""Runs a Sling replication from the given replication config.
Expand All @@ -388,6 +394,11 @@ def replicate(
METADATA_KEY_TRANSLATOR, DagsterSlingTranslator()
)

if dagster_sling_translator is None:
raise Exception(
f"`DagsterSlingTranslator` must be defined on metadata at {METADATA_KEY_TRANSLATOR}"
)

with self._setup_config():
uid = uuid.uuid4()
temp_dir = tempfile.gettempdir()
Expand Down

0 comments on commit df8a236

Please sign in to comment.