diff --git a/docs/content/integrations/embedded-elt.mdx b/docs/content/integrations/embedded-elt.mdx index 85a774ac2465c..23aae82ffe03d 100644 --- a/docs/content/integrations/embedded-elt.mdx +++ b/docs/content/integrations/embedded-elt.mdx @@ -139,10 +139,7 @@ sling_resource = SlingResource(connections=[...]) # Add connections here dagster_sling_translator=DagsterSlingTranslator(), ) def my_assets(context, sling: SlingResource): - yield from sling.replicate( - context=context, - replication_config=replication_config, - ) + yield from sling.replicate(context=context) for row in sling.stream_raw_logs(): context.log.info(row) @@ -220,9 +217,7 @@ replication_config = { dagster_sling_translator=DagsterSlingTranslator(), ) def my_assets(context, sling: SlingResource): - yield from sling.replicate( - replication_config=replication_config, - ) + yield from sling.replicate(context=context) ``` ## Example 2: File to Database @@ -258,9 +253,7 @@ replication_config = { dagster_sling_translator=DagsterSlingTranslator(), ) def my_assets(context, sling: SlingResource): - yield from sling.replicate( - replication_config=replication_config, - ) + yield from sling.replicate(context=context) ``` --- diff --git a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/postgres_snowflake.py b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/postgres_snowflake.py index fb02e486d2e4e..c74133c912c15 100644 --- a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/postgres_snowflake.py +++ b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/postgres_snowflake.py @@ -53,6 +53,4 @@ dagster_sling_translator=DagsterSlingTranslator(), ) def my_assets(context, sling: SlingResource): - yield from sling.replicate( - replication_config=replication_config, - ) + yield from sling.replicate(context=context) diff --git a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/s3_snowflake.py b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/s3_snowflake.py index 1e3ca221fbb7c..aefacb73c01ba 100644 --- a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/s3_snowflake.py +++ b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/s3_snowflake.py @@ -50,9 +50,7 @@ dagster_sling_translator=DagsterSlingTranslator(), ) def my_assets(context, sling: SlingResource): - yield from sling.replicate( - replication_config=replication_config, - ) + yield from sling.replicate(context=context) # end_storage_config diff --git a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/sling_dagster_translator.py b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/sling_dagster_translator.py index 246854abf572e..e178c63988173 100644 --- a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/sling_dagster_translator.py +++ b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/sling_dagster_translator.py @@ -15,10 +15,7 @@ dagster_sling_translator=DagsterSlingTranslator(), ) def my_assets(context, sling: SlingResource): - yield from sling.replicate( - context=context, - replication_config=replication_config, - ) + yield from sling.replicate(context=context) for row in sling.stream_raw_logs(): context.log.info(row) diff --git a/examples/experimental/sling_decorator/sling_decorator/__init__.py b/examples/experimental/sling_decorator/sling_decorator/__init__.py index a03df94101702..ec9e1c62b7ddc 100644 --- a/examples/experimental/sling_decorator/sling_decorator/__init__.py +++ b/examples/experimental/sling_decorator/sling_decorator/__init__.py @@ -31,10 +31,7 @@ dagster_sling_translator=DagsterSlingTranslator(), ) def my_assets(context, sling: SlingResource): - yield from sling.replicate( - context=context, - replication_config=replication_config, - ) + yield from sling.replicate(context=context) for row in sling.stream_raw_logs(): context.log.info(row) diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/asset_decorator.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/asset_decorator.py index 6839c0ee247a2..9be6a92ec6878 100644 --- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/asset_decorator.py +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/asset_decorator.py @@ -13,6 +13,7 @@ from dagster_embedded_elt.sling.sling_replication import SlingReplicationParam, validate_replication METADATA_KEY_TRANSLATOR = "dagster_sling/translator" +METADATA_KEY_REPLICATION_CONFIG = "dagster_sling/replication_config" def get_streams_from_replication( @@ -73,10 +74,7 @@ def sling_assets( config_path = "/path/to/replication.yaml" @sling_assets(replication_config=config_path) def my_assets(context, sling: SlingResource): - for lines in sling.replicate( - replication_config=config_path, - context=context, - ): + for lines in sling.replicate(context=context): context.log.info(lines) """ replication_config = validate_replication(replication_config) @@ -93,6 +91,7 @@ def my_assets(context, sling: SlingResource): metadata={ # type: ignore **dagster_sling_translator.get_metadata(stream), METADATA_KEY_TRANSLATOR: dagster_sling_translator, + METADATA_KEY_REPLICATION_CONFIG: replication_config, }, group_name=dagster_sling_translator.get_group_name(stream), freshness_policy=dagster_sling_translator.get_freshness_policy(stream), diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py index 2878edc05df51..88107f4898eda 100644 --- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py @@ -26,6 +26,7 @@ from pydantic import Field from dagster_embedded_elt.sling.asset_decorator import ( + METADATA_KEY_REPLICATION_CONFIG, METADATA_KEY_TRANSLATOR, get_streams_from_replication, ) @@ -366,11 +367,16 @@ def sync( breaking_version="2.0", additional_warn_text="Param is only required in `sling_assets` decorator.", ) + @deprecated_param( + param="replication_config", + breaking_version="2.0", + additional_warn_text="Param is only required in `sling_assets` decorator.", + ) def replicate( self, *, - replication_config: SlingReplicationParam, context: Union[OpExecutionContext, AssetExecutionContext], + replication_config: Optional[SlingReplicationParam] = None, dagster_sling_translator: Optional[DagsterSlingTranslator] = None, debug: bool = False, ) -> Generator[MaterializeResult, None, None]: @@ -384,21 +390,25 @@ def replicate( Returns: Optional[Generator[MaterializeResult, None, None]]: A generator of MaterializeResult """ - replication_config = validate_replication(replication_config) - stream_definition = get_streams_from_replication(replication_config) - - # retrieve translator from context + # retrieve decorator params from context metadata_by_key = context.assets_def.metadata_by_key first_asset_metadata = next(iter(metadata_by_key.values())) - dagster_sling_translator = first_asset_metadata.get( - METADATA_KEY_TRANSLATOR, DagsterSlingTranslator() - ) + dagster_sling_translator = first_asset_metadata.get(METADATA_KEY_TRANSLATOR) if dagster_sling_translator is None: raise Exception( f"`DagsterSlingTranslator` must be defined on metadata at {METADATA_KEY_TRANSLATOR}" ) + replication_config = first_asset_metadata.get(METADATA_KEY_REPLICATION_CONFIG) + if replication_config is None: + raise Exception( + f"`ReplicationConfig` must be defined on metadata at {METADATA_KEY_REPLICATION_CONFIG}" + ) + + replication_config = validate_replication(replication_config) + stream_definition = get_streams_from_replication(replication_config) + with self._setup_config(): uid = uuid.uuid4() temp_dir = tempfile.gettempdir() diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_asset_decorator.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_asset_decorator.py index 4011543b587d2..00054fa04fd85 100644 --- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_asset_decorator.py +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_asset_decorator.py @@ -66,9 +66,7 @@ def test_runs_base_sling_config( ): @sling_assets(replication_config=csv_to_sqlite_replication_config) def my_sling_assets(context: AssetExecutionContext, sling: SlingResource): - for row in sling.replicate( - replication_config=csv_to_sqlite_replication_config, context=context - ): + for row in sling.replicate(context=context): logging.info(row) sling_resource = SlingResource( @@ -137,6 +135,49 @@ def my_sling_assets(): ... AssetKey(["target", "public", "accounts"]): { "stream_config": JsonMetadataValue(data=None), "dagster_sling/translator": DagsterSlingTranslator(target_prefix="target"), + "dagster_sling/replication_config": { + "source": "MY_SOURCE", + "target": "MY_TARGET", + "defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"}, + "streams": { + "public.accounts": None, + "public.users": { + "disabled": True, + "meta": {"dagster": {"asset_key": "public.foo_users"}}, + }, + "public.finance_departments_old": { + "object": "departments", + "source_options": {"empty_as_null": False}, + "meta": { + "dagster": { + "deps": ["foo_one", "foo_two"], + "group": "group_2", + "freshness_policy": { + "maximum_lag_minutes": 0, + "cron_schedule": "5 4 * * *", + "cron_schedule_timezone": "UTC", + }, + } + }, + }, + 'public."Transactions"': { + "mode": "incremental", + "primary_key": "id", + "update_key": "last_updated_at", + "meta": { + "dagster": { + "description": "Example Description!", + "auto_materialize_policy": True, + } + }, + }, + "public.all_users": { + "sql": 'select all_user_id, name \nfrom public."all_Users"\n', + "object": "public.all_users", + }, + }, + "env": {"SLING_LOADED_AT_COLUMN": True, "SLING_STREAM_URL_COLUMN": True}, + }, }, AssetKey(["target", "departments"]): { "stream_config": JsonMetadataValue( @@ -157,6 +198,49 @@ def my_sling_assets(): ... } ), "dagster_sling/translator": DagsterSlingTranslator(target_prefix="target"), + "dagster_sling/replication_config": { + "source": "MY_SOURCE", + "target": "MY_TARGET", + "defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"}, + "streams": { + "public.accounts": None, + "public.users": { + "disabled": True, + "meta": {"dagster": {"asset_key": "public.foo_users"}}, + }, + "public.finance_departments_old": { + "object": "departments", + "source_options": {"empty_as_null": False}, + "meta": { + "dagster": { + "deps": ["foo_one", "foo_two"], + "group": "group_2", + "freshness_policy": { + "maximum_lag_minutes": 0, + "cron_schedule": "5 4 * * *", + "cron_schedule_timezone": "UTC", + }, + } + }, + }, + 'public."Transactions"': { + "mode": "incremental", + "primary_key": "id", + "update_key": "last_updated_at", + "meta": { + "dagster": { + "description": "Example Description!", + "auto_materialize_policy": True, + } + }, + }, + "public.all_users": { + "sql": 'select all_user_id, name \nfrom public."all_Users"\n', + "object": "public.all_users", + }, + }, + "env": {"SLING_LOADED_AT_COLUMN": True, "SLING_STREAM_URL_COLUMN": True}, + }, }, AssetKey(["target", "public", "transactions"]): { "stream_config": JsonMetadataValue( @@ -173,6 +257,49 @@ def my_sling_assets(): ... } ), "dagster_sling/translator": DagsterSlingTranslator(target_prefix="target"), + "dagster_sling/replication_config": { + "source": "MY_SOURCE", + "target": "MY_TARGET", + "defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"}, + "streams": { + "public.accounts": None, + "public.users": { + "disabled": True, + "meta": {"dagster": {"asset_key": "public.foo_users"}}, + }, + "public.finance_departments_old": { + "object": "departments", + "source_options": {"empty_as_null": False}, + "meta": { + "dagster": { + "deps": ["foo_one", "foo_two"], + "group": "group_2", + "freshness_policy": { + "maximum_lag_minutes": 0, + "cron_schedule": "5 4 * * *", + "cron_schedule_timezone": "UTC", + }, + } + }, + }, + 'public."Transactions"': { + "mode": "incremental", + "primary_key": "id", + "update_key": "last_updated_at", + "meta": { + "dagster": { + "description": "Example Description!", + "auto_materialize_policy": True, + } + }, + }, + "public.all_users": { + "sql": 'select all_user_id, name \nfrom public."all_Users"\n', + "object": "public.all_users", + }, + }, + "env": {"SLING_LOADED_AT_COLUMN": True, "SLING_STREAM_URL_COLUMN": True}, + }, }, AssetKey(["target", "public", "all_users"]): { "stream_config": JsonMetadataValue( @@ -182,6 +309,49 @@ def my_sling_assets(): ... } ), "dagster_sling/translator": DagsterSlingTranslator(target_prefix="target"), + "dagster_sling/replication_config": { + "source": "MY_SOURCE", + "target": "MY_TARGET", + "defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"}, + "streams": { + "public.accounts": None, + "public.users": { + "disabled": True, + "meta": {"dagster": {"asset_key": "public.foo_users"}}, + }, + "public.finance_departments_old": { + "object": "departments", + "source_options": {"empty_as_null": False}, + "meta": { + "dagster": { + "deps": ["foo_one", "foo_two"], + "group": "group_2", + "freshness_policy": { + "maximum_lag_minutes": 0, + "cron_schedule": "5 4 * * *", + "cron_schedule_timezone": "UTC", + }, + } + }, + }, + 'public."Transactions"': { + "mode": "incremental", + "primary_key": "id", + "update_key": "last_updated_at", + "meta": { + "dagster": { + "description": "Example Description!", + "auto_materialize_policy": True, + } + }, + }, + "public.all_users": { + "sql": 'select all_user_id, name \nfrom public."all_Users"\n', + "object": "public.all_users", + }, + }, + "env": {"SLING_LOADED_AT_COLUMN": True, "SLING_STREAM_URL_COLUMN": True}, + }, }, }