diff --git a/docs/content/integrations/embedded-elt.mdx b/docs/content/integrations/embedded-elt.mdx index 0186e5afecbc6..85a774ac2465c 100644 --- a/docs/content/integrations/embedded-elt.mdx +++ b/docs/content/integrations/embedded-elt.mdx @@ -122,7 +122,6 @@ Now you can define a Sling asset using the object. ```python file=/integrations/embedded_elt/sling_dagster_translator.py -from dagster_embedded_elt import sling from dagster_embedded_elt.sling import ( DagsterSlingTranslator, SlingResource, @@ -135,11 +134,14 @@ replication_config = file_relative_path(__file__, "../sling_replication.yaml") sling_resource = SlingResource(connections=[...]) # Add connections here -@sling_assets(replication_config=replication_config) +@sling_assets( + replication_config=replication_config, + dagster_sling_translator=DagsterSlingTranslator(), +) def my_assets(context, sling: SlingResource): yield from sling.replicate( + context=context, replication_config=replication_config, - dagster_sling_translator=DagsterSlingTranslator(), ) for row in sling.stream_raw_logs(): context.log.info(row) @@ -213,11 +215,13 @@ replication_config = { } -@sling_assets(replication_config=replication_config) +@sling_assets( + replication_config=replication_config, + dagster_sling_translator=DagsterSlingTranslator(), +) def my_assets(context, sling: SlingResource): yield from sling.replicate( replication_config=replication_config, - dagster_sling_translator=DagsterSlingTranslator(), ) ``` @@ -249,11 +253,13 @@ replication_config = { } -@sling_assets(replication_config=replication_config) +@sling_assets( + replication_config=replication_config, + dagster_sling_translator=DagsterSlingTranslator(), +) def my_assets(context, sling: SlingResource): yield from sling.replicate( replication_config=replication_config, - dagster_sling_translator=DagsterSlingTranslator(), ) ``` diff --git a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/postgres_snowflake.py b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/postgres_snowflake.py index cca09ed35d820..fb02e486d2e4e 100644 --- a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/postgres_snowflake.py +++ b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/postgres_snowflake.py @@ -48,9 +48,11 @@ } -@sling_assets(replication_config=replication_config) +@sling_assets( + replication_config=replication_config, + dagster_sling_translator=DagsterSlingTranslator(), +) def my_assets(context, sling: SlingResource): yield from sling.replicate( replication_config=replication_config, - dagster_sling_translator=DagsterSlingTranslator(), ) diff --git a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/s3_snowflake.py b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/s3_snowflake.py index c0f4c42463aae..1e3ca221fbb7c 100644 --- a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/s3_snowflake.py +++ b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/s3_snowflake.py @@ -45,11 +45,13 @@ } -@sling_assets(replication_config=replication_config) +@sling_assets( + replication_config=replication_config, + dagster_sling_translator=DagsterSlingTranslator(), +) def my_assets(context, sling: SlingResource): yield from sling.replicate( replication_config=replication_config, - dagster_sling_translator=DagsterSlingTranslator(), ) diff --git a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/sling_dagster_translator.py b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/sling_dagster_translator.py index 09226414221d5..246854abf572e 100644 --- a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/sling_dagster_translator.py +++ b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/sling_dagster_translator.py @@ -1,4 +1,3 @@ -from dagster_embedded_elt import sling from dagster_embedded_elt.sling import ( DagsterSlingTranslator, SlingResource, @@ -11,11 +10,14 @@ sling_resource = SlingResource(connections=[...]) # Add connections here -@sling_assets(replication_config=replication_config) +@sling_assets( + replication_config=replication_config, + dagster_sling_translator=DagsterSlingTranslator(), +) def my_assets(context, sling: SlingResource): yield from sling.replicate( + context=context, replication_config=replication_config, - dagster_sling_translator=DagsterSlingTranslator(), ) for row in sling.stream_raw_logs(): context.log.info(row) diff --git a/examples/experimental/sling_decorator/sling_decorator/__init__.py b/examples/experimental/sling_decorator/sling_decorator/__init__.py index bc76ff44408ca..a03df94101702 100644 --- a/examples/experimental/sling_decorator/sling_decorator/__init__.py +++ b/examples/experimental/sling_decorator/sling_decorator/__init__.py @@ -26,11 +26,14 @@ ) -@sling_assets(replication_config=replication_config) +@sling_assets( + replication_config=replication_config, + dagster_sling_translator=DagsterSlingTranslator(), +) def my_assets(context, sling: SlingResource): yield from sling.replicate( + context=context, replication_config=replication_config, - dagster_sling_translator=DagsterSlingTranslator(), ) for row in sling.stream_raw_logs(): context.log.info(row) diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py index 2fffbe71214f9..2878edc05df51 100644 --- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py @@ -20,7 +20,7 @@ PermissiveConfig, get_dagster_logger, ) -from dagster._annotations import deprecated, experimental, public +from dagster._annotations import deprecated, deprecated_param, experimental, public from dagster._utils.env import environ from dagster._utils.warnings import deprecation_warning from pydantic import Field @@ -361,11 +361,17 @@ def sync( yield from self._exec_sling_cmd(cmd, encoding=encoding) @public + @deprecated_param( + param="dagster_sling_translator", + breaking_version="2.0", + additional_warn_text="Param is only required in `sling_assets` decorator.", + ) def replicate( self, *, replication_config: SlingReplicationParam, context: Union[OpExecutionContext, AssetExecutionContext], + dagster_sling_translator: Optional[DagsterSlingTranslator] = None, debug: bool = False, ) -> Generator[MaterializeResult, None, None]: """Runs a Sling replication from the given replication config. @@ -388,6 +394,11 @@ def replicate( METADATA_KEY_TRANSLATOR, DagsterSlingTranslator() ) + if dagster_sling_translator is None: + raise Exception( + f"`DagsterSlingTranslator` must be defined on metadata at {METADATA_KEY_TRANSLATOR}" + ) + with self._setup_config(): uid = uuid.uuid4() temp_dir = tempfile.gettempdir()