diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py index 88107f4898eda..e38762ef8608f 100644 --- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py @@ -20,7 +20,7 @@ PermissiveConfig, get_dagster_logger, ) -from dagster._annotations import deprecated, deprecated_param, experimental, public +from dagster._annotations import deprecated, experimental, public from dagster._utils.env import environ from dagster._utils.warnings import deprecation_warning from pydantic import Field @@ -361,17 +361,6 @@ def sync( yield from self._exec_sling_cmd(cmd, encoding=encoding) - @public - @deprecated_param( - param="dagster_sling_translator", - breaking_version="2.0", - additional_warn_text="Param is only required in `sling_assets` decorator.", - ) - @deprecated_param( - param="replication_config", - breaking_version="2.0", - additional_warn_text="Param is only required in `sling_assets` decorator.", - ) def replicate( self, *, @@ -390,17 +379,17 @@ def replicate( Returns: Optional[Generator[MaterializeResult, None, None]]: A generator of MaterializeResult """ - # retrieve decorator params from context - metadata_by_key = context.assets_def.metadata_by_key - first_asset_metadata = next(iter(metadata_by_key.values())) + # attempt to retrieve params from asset context if not passed as a parameter + if not (replication_config or dagster_sling_translator): + metadata_by_key = context.assets_def.metadata_by_key + first_asset_metadata = next(iter(metadata_by_key.values())) + dagster_sling_translator = first_asset_metadata.get(METADATA_KEY_TRANSLATOR) + replication_config = first_asset_metadata.get(METADATA_KEY_REPLICATION_CONFIG) - dagster_sling_translator = first_asset_metadata.get(METADATA_KEY_TRANSLATOR) - if dagster_sling_translator is None: - raise Exception( - f"`DagsterSlingTranslator` must be defined on metadata at {METADATA_KEY_TRANSLATOR}" - ) + # if translator has not been defined on metadata _or_ through param, then use the default constructor + if not dagster_sling_translator: + dagster_sling_translator = DagsterSlingTranslator() - replication_config = first_asset_metadata.get(METADATA_KEY_REPLICATION_CONFIG) if replication_config is None: raise Exception( f"`ReplicationConfig` must be defined on metadata at {METADATA_KEY_REPLICATION_CONFIG}" diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_asset_decorator.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_asset_decorator.py index 00054fa04fd85..b27845bc8e782 100644 --- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_asset_decorator.py +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_asset_decorator.py @@ -60,7 +60,6 @@ def my_sling_assets(): ... def test_runs_base_sling_config( csv_to_sqlite_replication_config: SlingReplicationParam, - path_to_test_csv: str, path_to_temp_sqlite_db: str, sqlite_connection: sqlite3.Connection, ): diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_op.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_op.py new file mode 100644 index 0000000000000..1c48aaca3872a --- /dev/null +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_op.py @@ -0,0 +1,35 @@ +import logging + +from dagster import OpExecutionContext, job, op +from dagster_embedded_elt.sling import SlingReplicationParam +from dagster_embedded_elt.sling.resources import SlingConnectionResource, SlingResource + + +def test_base_sling_config_op( + csv_to_sqlite_replication_config: SlingReplicationParam, + path_to_temp_sqlite_db: str, +): + sling_resource = SlingResource( + connections=[ + SlingConnectionResource(type="file", name="SLING_FILE"), + SlingConnectionResource( + type="sqlite", + name="SLING_SQLITE", + connection_string=f"sqlite://{path_to_temp_sqlite_db}", + ), + ] + ) + + @op(out={}) + def my_sling_op_yield_events(context: OpExecutionContext, sling: SlingResource): + for row in sling.replicate( + context=context, replication_config=csv_to_sqlite_replication_config + ): + logging.info(row) + + @job + def my_sling_op_yield_events_job(): + my_sling_op_yield_events() + + result = my_sling_op_yield_events_job.execute_in_process(resources={"sling": sling_resource}) + assert result.success