From 08f0ddc7ce2253a71590e8b31bc24551d53184ae Mon Sep 17 00:00:00 2001 From: Colton Padden Date: Tue, 19 Mar 2024 14:40:57 -0400 Subject: [PATCH 01/13] [embedded-elt][sling] example of passing translator from decorator using metadata --- .../sling/asset_decorator.py | 9 ++++-- .../dagster_embedded_elt/sling/resources.py | 20 ++++++++++--- .../test_asset_decorator.py | 28 +++++++++++-------- 3 files changed, 40 insertions(+), 17 deletions(-) diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/asset_decorator.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/asset_decorator.py index 0923671a5b600..6839c0ee247a2 100644 --- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/asset_decorator.py +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/asset_decorator.py @@ -12,6 +12,8 @@ from dagster_embedded_elt.sling.dagster_sling_translator import DagsterSlingTranslator from dagster_embedded_elt.sling.sling_replication import SlingReplicationParam, validate_replication +METADATA_KEY_TRANSLATOR = "dagster_sling/translator" + def get_streams_from_replication( replication_config: Mapping[str, Any], @@ -73,7 +75,7 @@ def sling_assets( def my_assets(context, sling: SlingResource): for lines in sling.replicate( replication_config=config_path, - dagster_sling_translator=DagsterSlingTranslator(), + context=context, ): context.log.info(lines) """ @@ -88,7 +90,10 @@ def my_assets(context, sling: SlingResource): key=dagster_sling_translator.get_asset_key(stream), deps=dagster_sling_translator.get_deps_asset_key(stream), description=dagster_sling_translator.get_description(stream), - metadata=dagster_sling_translator.get_metadata(stream), + metadata={ # type: ignore + **dagster_sling_translator.get_metadata(stream), + METADATA_KEY_TRANSLATOR: dagster_sling_translator, + }, group_name=dagster_sling_translator.get_group_name(stream), freshness_policy=dagster_sling_translator.get_freshness_policy(stream), auto_materialize_policy=dagster_sling_translator.get_auto_materialize_policy( diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py index bcc8491b86c75..2fffbe71214f9 100644 --- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py @@ -8,13 +8,15 @@ import uuid from enum import Enum from subprocess import PIPE, STDOUT, Popen -from typing import IO, Any, AnyStr, Dict, Generator, Iterator, List, Optional +from typing import IO, Any, AnyStr, Dict, Generator, Iterator, List, Optional, Union import sling from dagster import ( + AssetExecutionContext, ConfigurableResource, EnvVar, MaterializeResult, + OpExecutionContext, PermissiveConfig, get_dagster_logger, ) @@ -23,7 +25,10 @@ from dagster._utils.warnings import deprecation_warning from pydantic import Field -from dagster_embedded_elt.sling.asset_decorator import get_streams_from_replication +from dagster_embedded_elt.sling.asset_decorator import ( + METADATA_KEY_TRANSLATOR, + get_streams_from_replication, +) from dagster_embedded_elt.sling.dagster_sling_translator import DagsterSlingTranslator from dagster_embedded_elt.sling.sling_replication import SlingReplicationParam, validate_replication @@ -360,14 +365,14 @@ def replicate( self, *, replication_config: SlingReplicationParam, - dagster_sling_translator: DagsterSlingTranslator, + context: Union[OpExecutionContext, AssetExecutionContext], debug: bool = False, ) -> Generator[MaterializeResult, None, None]: """Runs a Sling replication from the given replication config. Args: replication_config: The Sling replication config to use for the replication. - dagster_sling_translator: The translator to use for the replication. + context: Asset or Op execution context. debug: Whether to run the replication in debug mode. Returns: @@ -376,6 +381,13 @@ def replicate( replication_config = validate_replication(replication_config) stream_definition = get_streams_from_replication(replication_config) + # retrieve translator from context + metadata_by_key = context.assets_def.metadata_by_key + first_asset_metadata = next(iter(metadata_by_key.values())) + dagster_sling_translator = first_asset_metadata.get( + METADATA_KEY_TRANSLATOR, DagsterSlingTranslator() + ) + with self._setup_config(): uid = uuid.uuid4() temp_dir = tempfile.gettempdir() diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_asset_decorator.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_asset_decorator.py index 6c1e9d81cc7d3..4011543b587d2 100644 --- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_asset_decorator.py +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_asset_decorator.py @@ -3,6 +3,7 @@ import pytest from dagster import ( + AssetExecutionContext, AssetKey, FreshnessPolicy, JsonMetadataValue, @@ -64,10 +65,9 @@ def test_runs_base_sling_config( sqlite_connection: sqlite3.Connection, ): @sling_assets(replication_config=csv_to_sqlite_replication_config) - def my_sling_assets(sling: SlingResource): + def my_sling_assets(context: AssetExecutionContext, sling: SlingResource): for row in sling.replicate( - replication_config=csv_to_sqlite_replication_config, - dagster_sling_translator=DagsterSlingTranslator(), + replication_config=csv_to_sqlite_replication_config, context=context ): logging.info(row) @@ -105,11 +105,11 @@ def my_third_sling_assets(): ... def test_base_with_meta_config_translator(): - @sling_assets( - replication_config=file_relative_path( - __file__, "replication_configs/base_with_meta_config/replication.yaml" - ) + replication_config = file_relative_path( + __file__, "replication_configs/base_with_meta_config/replication.yaml" ) + + @sling_assets(replication_config=replication_config) def my_sling_assets(): ... assert my_sling_assets.keys == { @@ -134,7 +134,10 @@ def my_sling_assets(): ... } assert my_sling_assets.metadata_by_key == { - AssetKey(["target", "public", "accounts"]): {"stream_config": JsonMetadataValue(data=None)}, + AssetKey(["target", "public", "accounts"]): { + "stream_config": JsonMetadataValue(data=None), + "dagster_sling/translator": DagsterSlingTranslator(target_prefix="target"), + }, AssetKey(["target", "departments"]): { "stream_config": JsonMetadataValue( data={ @@ -152,7 +155,8 @@ def my_sling_assets(): ... } }, } - ) + ), + "dagster_sling/translator": DagsterSlingTranslator(target_prefix="target"), }, AssetKey(["target", "public", "transactions"]): { "stream_config": JsonMetadataValue( @@ -167,7 +171,8 @@ def my_sling_assets(): ... } }, } - ) + ), + "dagster_sling/translator": DagsterSlingTranslator(target_prefix="target"), }, AssetKey(["target", "public", "all_users"]): { "stream_config": JsonMetadataValue( @@ -175,7 +180,8 @@ def my_sling_assets(): ... "sql": 'select all_user_id, name \nfrom public."all_Users"\n', "object": "public.all_users", } - ) + ), + "dagster_sling/translator": DagsterSlingTranslator(target_prefix="target"), }, } From df8a236453c8488b80ef7468eda767098c307c5f Mon Sep 17 00:00:00 2001 From: Colton Padden Date: Thu, 21 Mar 2024 10:48:45 -0400 Subject: [PATCH 02/13] update docs/examples to pass translator in decorator --- docs/content/integrations/embedded-elt.mdx | 20 ++++++++++++------- .../embedded_elt/postgres_snowflake.py | 6 ++++-- .../integrations/embedded_elt/s3_snowflake.py | 6 ++++-- .../embedded_elt/sling_dagster_translator.py | 8 +++++--- .../sling_decorator/__init__.py | 7 +++++-- .../dagster_embedded_elt/sling/resources.py | 13 +++++++++++- 6 files changed, 43 insertions(+), 17 deletions(-) diff --git a/docs/content/integrations/embedded-elt.mdx b/docs/content/integrations/embedded-elt.mdx index 0186e5afecbc6..85a774ac2465c 100644 --- a/docs/content/integrations/embedded-elt.mdx +++ b/docs/content/integrations/embedded-elt.mdx @@ -122,7 +122,6 @@ Now you can define a Sling asset using the object. ```python file=/integrations/embedded_elt/sling_dagster_translator.py -from dagster_embedded_elt import sling from dagster_embedded_elt.sling import ( DagsterSlingTranslator, SlingResource, @@ -135,11 +134,14 @@ replication_config = file_relative_path(__file__, "../sling_replication.yaml") sling_resource = SlingResource(connections=[...]) # Add connections here -@sling_assets(replication_config=replication_config) +@sling_assets( + replication_config=replication_config, + dagster_sling_translator=DagsterSlingTranslator(), +) def my_assets(context, sling: SlingResource): yield from sling.replicate( + context=context, replication_config=replication_config, - dagster_sling_translator=DagsterSlingTranslator(), ) for row in sling.stream_raw_logs(): context.log.info(row) @@ -213,11 +215,13 @@ replication_config = { } -@sling_assets(replication_config=replication_config) +@sling_assets( + replication_config=replication_config, + dagster_sling_translator=DagsterSlingTranslator(), +) def my_assets(context, sling: SlingResource): yield from sling.replicate( replication_config=replication_config, - dagster_sling_translator=DagsterSlingTranslator(), ) ``` @@ -249,11 +253,13 @@ replication_config = { } -@sling_assets(replication_config=replication_config) +@sling_assets( + replication_config=replication_config, + dagster_sling_translator=DagsterSlingTranslator(), +) def my_assets(context, sling: SlingResource): yield from sling.replicate( replication_config=replication_config, - dagster_sling_translator=DagsterSlingTranslator(), ) ``` diff --git a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/postgres_snowflake.py b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/postgres_snowflake.py index cca09ed35d820..fb02e486d2e4e 100644 --- a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/postgres_snowflake.py +++ b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/postgres_snowflake.py @@ -48,9 +48,11 @@ } -@sling_assets(replication_config=replication_config) +@sling_assets( + replication_config=replication_config, + dagster_sling_translator=DagsterSlingTranslator(), +) def my_assets(context, sling: SlingResource): yield from sling.replicate( replication_config=replication_config, - dagster_sling_translator=DagsterSlingTranslator(), ) diff --git a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/s3_snowflake.py b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/s3_snowflake.py index c0f4c42463aae..1e3ca221fbb7c 100644 --- a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/s3_snowflake.py +++ b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/s3_snowflake.py @@ -45,11 +45,13 @@ } -@sling_assets(replication_config=replication_config) +@sling_assets( + replication_config=replication_config, + dagster_sling_translator=DagsterSlingTranslator(), +) def my_assets(context, sling: SlingResource): yield from sling.replicate( replication_config=replication_config, - dagster_sling_translator=DagsterSlingTranslator(), ) diff --git a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/sling_dagster_translator.py b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/sling_dagster_translator.py index 09226414221d5..246854abf572e 100644 --- a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/sling_dagster_translator.py +++ b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/sling_dagster_translator.py @@ -1,4 +1,3 @@ -from dagster_embedded_elt import sling from dagster_embedded_elt.sling import ( DagsterSlingTranslator, SlingResource, @@ -11,11 +10,14 @@ sling_resource = SlingResource(connections=[...]) # Add connections here -@sling_assets(replication_config=replication_config) +@sling_assets( + replication_config=replication_config, + dagster_sling_translator=DagsterSlingTranslator(), +) def my_assets(context, sling: SlingResource): yield from sling.replicate( + context=context, replication_config=replication_config, - dagster_sling_translator=DagsterSlingTranslator(), ) for row in sling.stream_raw_logs(): context.log.info(row) diff --git a/examples/experimental/sling_decorator/sling_decorator/__init__.py b/examples/experimental/sling_decorator/sling_decorator/__init__.py index bc76ff44408ca..a03df94101702 100644 --- a/examples/experimental/sling_decorator/sling_decorator/__init__.py +++ b/examples/experimental/sling_decorator/sling_decorator/__init__.py @@ -26,11 +26,14 @@ ) -@sling_assets(replication_config=replication_config) +@sling_assets( + replication_config=replication_config, + dagster_sling_translator=DagsterSlingTranslator(), +) def my_assets(context, sling: SlingResource): yield from sling.replicate( + context=context, replication_config=replication_config, - dagster_sling_translator=DagsterSlingTranslator(), ) for row in sling.stream_raw_logs(): context.log.info(row) diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py index 2fffbe71214f9..2878edc05df51 100644 --- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py @@ -20,7 +20,7 @@ PermissiveConfig, get_dagster_logger, ) -from dagster._annotations import deprecated, experimental, public +from dagster._annotations import deprecated, deprecated_param, experimental, public from dagster._utils.env import environ from dagster._utils.warnings import deprecation_warning from pydantic import Field @@ -361,11 +361,17 @@ def sync( yield from self._exec_sling_cmd(cmd, encoding=encoding) @public + @deprecated_param( + param="dagster_sling_translator", + breaking_version="2.0", + additional_warn_text="Param is only required in `sling_assets` decorator.", + ) def replicate( self, *, replication_config: SlingReplicationParam, context: Union[OpExecutionContext, AssetExecutionContext], + dagster_sling_translator: Optional[DagsterSlingTranslator] = None, debug: bool = False, ) -> Generator[MaterializeResult, None, None]: """Runs a Sling replication from the given replication config. @@ -388,6 +394,11 @@ def replicate( METADATA_KEY_TRANSLATOR, DagsterSlingTranslator() ) + if dagster_sling_translator is None: + raise Exception( + f"`DagsterSlingTranslator` must be defined on metadata at {METADATA_KEY_TRANSLATOR}" + ) + with self._setup_config(): uid = uuid.uuid4() temp_dir = tempfile.gettempdir() From 26f4e147e55a183bd86b0a39b8d7016940c4a6e9 Mon Sep 17 00:00:00 2001 From: Colton Padden Date: Thu, 21 Mar 2024 11:09:38 -0400 Subject: [PATCH 03/13] mv replication_config to metadata and sling decorator --- docs/content/integrations/embedded-elt.mdx | 13 +- .../embedded_elt/postgres_snowflake.py | 4 +- .../integrations/embedded_elt/s3_snowflake.py | 4 +- .../embedded_elt/sling_dagster_translator.py | 5 +- .../sling_decorator/__init__.py | 5 +- .../sling/asset_decorator.py | 7 +- .../dagster_embedded_elt/sling/resources.py | 26 ++- .../test_asset_decorator.py | 176 +++++++++++++++++- 8 files changed, 201 insertions(+), 39 deletions(-) diff --git a/docs/content/integrations/embedded-elt.mdx b/docs/content/integrations/embedded-elt.mdx index 85a774ac2465c..23aae82ffe03d 100644 --- a/docs/content/integrations/embedded-elt.mdx +++ b/docs/content/integrations/embedded-elt.mdx @@ -139,10 +139,7 @@ sling_resource = SlingResource(connections=[...]) # Add connections here dagster_sling_translator=DagsterSlingTranslator(), ) def my_assets(context, sling: SlingResource): - yield from sling.replicate( - context=context, - replication_config=replication_config, - ) + yield from sling.replicate(context=context) for row in sling.stream_raw_logs(): context.log.info(row) @@ -220,9 +217,7 @@ replication_config = { dagster_sling_translator=DagsterSlingTranslator(), ) def my_assets(context, sling: SlingResource): - yield from sling.replicate( - replication_config=replication_config, - ) + yield from sling.replicate(context=context) ``` ## Example 2: File to Database @@ -258,9 +253,7 @@ replication_config = { dagster_sling_translator=DagsterSlingTranslator(), ) def my_assets(context, sling: SlingResource): - yield from sling.replicate( - replication_config=replication_config, - ) + yield from sling.replicate(context=context) ``` --- diff --git a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/postgres_snowflake.py b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/postgres_snowflake.py index fb02e486d2e4e..c74133c912c15 100644 --- a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/postgres_snowflake.py +++ b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/postgres_snowflake.py @@ -53,6 +53,4 @@ dagster_sling_translator=DagsterSlingTranslator(), ) def my_assets(context, sling: SlingResource): - yield from sling.replicate( - replication_config=replication_config, - ) + yield from sling.replicate(context=context) diff --git a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/s3_snowflake.py b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/s3_snowflake.py index 1e3ca221fbb7c..aefacb73c01ba 100644 --- a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/s3_snowflake.py +++ b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/s3_snowflake.py @@ -50,9 +50,7 @@ dagster_sling_translator=DagsterSlingTranslator(), ) def my_assets(context, sling: SlingResource): - yield from sling.replicate( - replication_config=replication_config, - ) + yield from sling.replicate(context=context) # end_storage_config diff --git a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/sling_dagster_translator.py b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/sling_dagster_translator.py index 246854abf572e..e178c63988173 100644 --- a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/sling_dagster_translator.py +++ b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/sling_dagster_translator.py @@ -15,10 +15,7 @@ dagster_sling_translator=DagsterSlingTranslator(), ) def my_assets(context, sling: SlingResource): - yield from sling.replicate( - context=context, - replication_config=replication_config, - ) + yield from sling.replicate(context=context) for row in sling.stream_raw_logs(): context.log.info(row) diff --git a/examples/experimental/sling_decorator/sling_decorator/__init__.py b/examples/experimental/sling_decorator/sling_decorator/__init__.py index a03df94101702..ec9e1c62b7ddc 100644 --- a/examples/experimental/sling_decorator/sling_decorator/__init__.py +++ b/examples/experimental/sling_decorator/sling_decorator/__init__.py @@ -31,10 +31,7 @@ dagster_sling_translator=DagsterSlingTranslator(), ) def my_assets(context, sling: SlingResource): - yield from sling.replicate( - context=context, - replication_config=replication_config, - ) + yield from sling.replicate(context=context) for row in sling.stream_raw_logs(): context.log.info(row) diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/asset_decorator.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/asset_decorator.py index 6839c0ee247a2..9be6a92ec6878 100644 --- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/asset_decorator.py +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/asset_decorator.py @@ -13,6 +13,7 @@ from dagster_embedded_elt.sling.sling_replication import SlingReplicationParam, validate_replication METADATA_KEY_TRANSLATOR = "dagster_sling/translator" +METADATA_KEY_REPLICATION_CONFIG = "dagster_sling/replication_config" def get_streams_from_replication( @@ -73,10 +74,7 @@ def sling_assets( config_path = "/path/to/replication.yaml" @sling_assets(replication_config=config_path) def my_assets(context, sling: SlingResource): - for lines in sling.replicate( - replication_config=config_path, - context=context, - ): + for lines in sling.replicate(context=context): context.log.info(lines) """ replication_config = validate_replication(replication_config) @@ -93,6 +91,7 @@ def my_assets(context, sling: SlingResource): metadata={ # type: ignore **dagster_sling_translator.get_metadata(stream), METADATA_KEY_TRANSLATOR: dagster_sling_translator, + METADATA_KEY_REPLICATION_CONFIG: replication_config, }, group_name=dagster_sling_translator.get_group_name(stream), freshness_policy=dagster_sling_translator.get_freshness_policy(stream), diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py index 2878edc05df51..88107f4898eda 100644 --- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py @@ -26,6 +26,7 @@ from pydantic import Field from dagster_embedded_elt.sling.asset_decorator import ( + METADATA_KEY_REPLICATION_CONFIG, METADATA_KEY_TRANSLATOR, get_streams_from_replication, ) @@ -366,11 +367,16 @@ def sync( breaking_version="2.0", additional_warn_text="Param is only required in `sling_assets` decorator.", ) + @deprecated_param( + param="replication_config", + breaking_version="2.0", + additional_warn_text="Param is only required in `sling_assets` decorator.", + ) def replicate( self, *, - replication_config: SlingReplicationParam, context: Union[OpExecutionContext, AssetExecutionContext], + replication_config: Optional[SlingReplicationParam] = None, dagster_sling_translator: Optional[DagsterSlingTranslator] = None, debug: bool = False, ) -> Generator[MaterializeResult, None, None]: @@ -384,21 +390,25 @@ def replicate( Returns: Optional[Generator[MaterializeResult, None, None]]: A generator of MaterializeResult """ - replication_config = validate_replication(replication_config) - stream_definition = get_streams_from_replication(replication_config) - - # retrieve translator from context + # retrieve decorator params from context metadata_by_key = context.assets_def.metadata_by_key first_asset_metadata = next(iter(metadata_by_key.values())) - dagster_sling_translator = first_asset_metadata.get( - METADATA_KEY_TRANSLATOR, DagsterSlingTranslator() - ) + dagster_sling_translator = first_asset_metadata.get(METADATA_KEY_TRANSLATOR) if dagster_sling_translator is None: raise Exception( f"`DagsterSlingTranslator` must be defined on metadata at {METADATA_KEY_TRANSLATOR}" ) + replication_config = first_asset_metadata.get(METADATA_KEY_REPLICATION_CONFIG) + if replication_config is None: + raise Exception( + f"`ReplicationConfig` must be defined on metadata at {METADATA_KEY_REPLICATION_CONFIG}" + ) + + replication_config = validate_replication(replication_config) + stream_definition = get_streams_from_replication(replication_config) + with self._setup_config(): uid = uuid.uuid4() temp_dir = tempfile.gettempdir() diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_asset_decorator.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_asset_decorator.py index 4011543b587d2..00054fa04fd85 100644 --- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_asset_decorator.py +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_asset_decorator.py @@ -66,9 +66,7 @@ def test_runs_base_sling_config( ): @sling_assets(replication_config=csv_to_sqlite_replication_config) def my_sling_assets(context: AssetExecutionContext, sling: SlingResource): - for row in sling.replicate( - replication_config=csv_to_sqlite_replication_config, context=context - ): + for row in sling.replicate(context=context): logging.info(row) sling_resource = SlingResource( @@ -137,6 +135,49 @@ def my_sling_assets(): ... AssetKey(["target", "public", "accounts"]): { "stream_config": JsonMetadataValue(data=None), "dagster_sling/translator": DagsterSlingTranslator(target_prefix="target"), + "dagster_sling/replication_config": { + "source": "MY_SOURCE", + "target": "MY_TARGET", + "defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"}, + "streams": { + "public.accounts": None, + "public.users": { + "disabled": True, + "meta": {"dagster": {"asset_key": "public.foo_users"}}, + }, + "public.finance_departments_old": { + "object": "departments", + "source_options": {"empty_as_null": False}, + "meta": { + "dagster": { + "deps": ["foo_one", "foo_two"], + "group": "group_2", + "freshness_policy": { + "maximum_lag_minutes": 0, + "cron_schedule": "5 4 * * *", + "cron_schedule_timezone": "UTC", + }, + } + }, + }, + 'public."Transactions"': { + "mode": "incremental", + "primary_key": "id", + "update_key": "last_updated_at", + "meta": { + "dagster": { + "description": "Example Description!", + "auto_materialize_policy": True, + } + }, + }, + "public.all_users": { + "sql": 'select all_user_id, name \nfrom public."all_Users"\n', + "object": "public.all_users", + }, + }, + "env": {"SLING_LOADED_AT_COLUMN": True, "SLING_STREAM_URL_COLUMN": True}, + }, }, AssetKey(["target", "departments"]): { "stream_config": JsonMetadataValue( @@ -157,6 +198,49 @@ def my_sling_assets(): ... } ), "dagster_sling/translator": DagsterSlingTranslator(target_prefix="target"), + "dagster_sling/replication_config": { + "source": "MY_SOURCE", + "target": "MY_TARGET", + "defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"}, + "streams": { + "public.accounts": None, + "public.users": { + "disabled": True, + "meta": {"dagster": {"asset_key": "public.foo_users"}}, + }, + "public.finance_departments_old": { + "object": "departments", + "source_options": {"empty_as_null": False}, + "meta": { + "dagster": { + "deps": ["foo_one", "foo_two"], + "group": "group_2", + "freshness_policy": { + "maximum_lag_minutes": 0, + "cron_schedule": "5 4 * * *", + "cron_schedule_timezone": "UTC", + }, + } + }, + }, + 'public."Transactions"': { + "mode": "incremental", + "primary_key": "id", + "update_key": "last_updated_at", + "meta": { + "dagster": { + "description": "Example Description!", + "auto_materialize_policy": True, + } + }, + }, + "public.all_users": { + "sql": 'select all_user_id, name \nfrom public."all_Users"\n', + "object": "public.all_users", + }, + }, + "env": {"SLING_LOADED_AT_COLUMN": True, "SLING_STREAM_URL_COLUMN": True}, + }, }, AssetKey(["target", "public", "transactions"]): { "stream_config": JsonMetadataValue( @@ -173,6 +257,49 @@ def my_sling_assets(): ... } ), "dagster_sling/translator": DagsterSlingTranslator(target_prefix="target"), + "dagster_sling/replication_config": { + "source": "MY_SOURCE", + "target": "MY_TARGET", + "defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"}, + "streams": { + "public.accounts": None, + "public.users": { + "disabled": True, + "meta": {"dagster": {"asset_key": "public.foo_users"}}, + }, + "public.finance_departments_old": { + "object": "departments", + "source_options": {"empty_as_null": False}, + "meta": { + "dagster": { + "deps": ["foo_one", "foo_two"], + "group": "group_2", + "freshness_policy": { + "maximum_lag_minutes": 0, + "cron_schedule": "5 4 * * *", + "cron_schedule_timezone": "UTC", + }, + } + }, + }, + 'public."Transactions"': { + "mode": "incremental", + "primary_key": "id", + "update_key": "last_updated_at", + "meta": { + "dagster": { + "description": "Example Description!", + "auto_materialize_policy": True, + } + }, + }, + "public.all_users": { + "sql": 'select all_user_id, name \nfrom public."all_Users"\n', + "object": "public.all_users", + }, + }, + "env": {"SLING_LOADED_AT_COLUMN": True, "SLING_STREAM_URL_COLUMN": True}, + }, }, AssetKey(["target", "public", "all_users"]): { "stream_config": JsonMetadataValue( @@ -182,6 +309,49 @@ def my_sling_assets(): ... } ), "dagster_sling/translator": DagsterSlingTranslator(target_prefix="target"), + "dagster_sling/replication_config": { + "source": "MY_SOURCE", + "target": "MY_TARGET", + "defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"}, + "streams": { + "public.accounts": None, + "public.users": { + "disabled": True, + "meta": {"dagster": {"asset_key": "public.foo_users"}}, + }, + "public.finance_departments_old": { + "object": "departments", + "source_options": {"empty_as_null": False}, + "meta": { + "dagster": { + "deps": ["foo_one", "foo_two"], + "group": "group_2", + "freshness_policy": { + "maximum_lag_minutes": 0, + "cron_schedule": "5 4 * * *", + "cron_schedule_timezone": "UTC", + }, + } + }, + }, + 'public."Transactions"': { + "mode": "incremental", + "primary_key": "id", + "update_key": "last_updated_at", + "meta": { + "dagster": { + "description": "Example Description!", + "auto_materialize_policy": True, + } + }, + }, + "public.all_users": { + "sql": 'select all_user_id, name \nfrom public."all_Users"\n', + "object": "public.all_users", + }, + }, + "env": {"SLING_LOADED_AT_COLUMN": True, "SLING_STREAM_URL_COLUMN": True}, + }, }, } From cb8f22cfe6b75b160ecc35353e46b299318f510b Mon Sep 17 00:00:00 2001 From: Colton Padden Date: Thu, 21 Mar 2024 13:00:03 -0400 Subject: [PATCH 04/13] add sling metadata to ui ignore set --- .../dagster-ui/packages/ui-core/src/metadata/MetadataEntry.tsx | 2 ++ 1 file changed, 2 insertions(+) diff --git a/js_modules/dagster-ui/packages/ui-core/src/metadata/MetadataEntry.tsx b/js_modules/dagster-ui/packages/ui-core/src/metadata/MetadataEntry.tsx index c32d54c7875ed..9edf15473a2d1 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/metadata/MetadataEntry.tsx +++ b/js_modules/dagster-ui/packages/ui-core/src/metadata/MetadataEntry.tsx @@ -39,6 +39,8 @@ export const HIDDEN_METADATA_ENTRY_LABELS = new Set([ 'dagster-dbt/exclude', 'dagster_dbt/manifest', 'dagster_dbt/dagster_dbt_translator', + 'dagster_sling/translator', + 'dagster_sling/replication_config', ]); export const LogRowStructuredContentTable = ({ From a286b9f8884910a34f50d0c92ac3ddc2f2dde26c Mon Sep 17 00:00:00 2001 From: Colton Padden Date: Thu, 21 Mar 2024 15:13:39 -0400 Subject: [PATCH 05/13] rm deprecation; add unit test for op usage --- .../dagster_embedded_elt/sling/resources.py | 31 ++++++---------- .../test_asset_decorator.py | 1 - .../dagster_embedded_elt_tests/test_op.py | 35 +++++++++++++++++++ 3 files changed, 45 insertions(+), 22 deletions(-) create mode 100644 python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_op.py diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py index 88107f4898eda..e38762ef8608f 100644 --- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py @@ -20,7 +20,7 @@ PermissiveConfig, get_dagster_logger, ) -from dagster._annotations import deprecated, deprecated_param, experimental, public +from dagster._annotations import deprecated, experimental, public from dagster._utils.env import environ from dagster._utils.warnings import deprecation_warning from pydantic import Field @@ -361,17 +361,6 @@ def sync( yield from self._exec_sling_cmd(cmd, encoding=encoding) - @public - @deprecated_param( - param="dagster_sling_translator", - breaking_version="2.0", - additional_warn_text="Param is only required in `sling_assets` decorator.", - ) - @deprecated_param( - param="replication_config", - breaking_version="2.0", - additional_warn_text="Param is only required in `sling_assets` decorator.", - ) def replicate( self, *, @@ -390,17 +379,17 @@ def replicate( Returns: Optional[Generator[MaterializeResult, None, None]]: A generator of MaterializeResult """ - # retrieve decorator params from context - metadata_by_key = context.assets_def.metadata_by_key - first_asset_metadata = next(iter(metadata_by_key.values())) + # attempt to retrieve params from asset context if not passed as a parameter + if not (replication_config or dagster_sling_translator): + metadata_by_key = context.assets_def.metadata_by_key + first_asset_metadata = next(iter(metadata_by_key.values())) + dagster_sling_translator = first_asset_metadata.get(METADATA_KEY_TRANSLATOR) + replication_config = first_asset_metadata.get(METADATA_KEY_REPLICATION_CONFIG) - dagster_sling_translator = first_asset_metadata.get(METADATA_KEY_TRANSLATOR) - if dagster_sling_translator is None: - raise Exception( - f"`DagsterSlingTranslator` must be defined on metadata at {METADATA_KEY_TRANSLATOR}" - ) + # if translator has not been defined on metadata _or_ through param, then use the default constructor + if not dagster_sling_translator: + dagster_sling_translator = DagsterSlingTranslator() - replication_config = first_asset_metadata.get(METADATA_KEY_REPLICATION_CONFIG) if replication_config is None: raise Exception( f"`ReplicationConfig` must be defined on metadata at {METADATA_KEY_REPLICATION_CONFIG}" diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_asset_decorator.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_asset_decorator.py index 00054fa04fd85..b27845bc8e782 100644 --- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_asset_decorator.py +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_asset_decorator.py @@ -60,7 +60,6 @@ def my_sling_assets(): ... def test_runs_base_sling_config( csv_to_sqlite_replication_config: SlingReplicationParam, - path_to_test_csv: str, path_to_temp_sqlite_db: str, sqlite_connection: sqlite3.Connection, ): diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_op.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_op.py new file mode 100644 index 0000000000000..1c48aaca3872a --- /dev/null +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_op.py @@ -0,0 +1,35 @@ +import logging + +from dagster import OpExecutionContext, job, op +from dagster_embedded_elt.sling import SlingReplicationParam +from dagster_embedded_elt.sling.resources import SlingConnectionResource, SlingResource + + +def test_base_sling_config_op( + csv_to_sqlite_replication_config: SlingReplicationParam, + path_to_temp_sqlite_db: str, +): + sling_resource = SlingResource( + connections=[ + SlingConnectionResource(type="file", name="SLING_FILE"), + SlingConnectionResource( + type="sqlite", + name="SLING_SQLITE", + connection_string=f"sqlite://{path_to_temp_sqlite_db}", + ), + ] + ) + + @op(out={}) + def my_sling_op_yield_events(context: OpExecutionContext, sling: SlingResource): + for row in sling.replicate( + context=context, replication_config=csv_to_sqlite_replication_config + ): + logging.info(row) + + @job + def my_sling_op_yield_events_job(): + my_sling_op_yield_events() + + result = my_sling_op_yield_events_job.execute_in_process(resources={"sling": sling_resource}) + assert result.success From 767459c0bc7be6e08f61fde8c712d5fe48fc2b6b Mon Sep 17 00:00:00 2001 From: Colton Padden Date: Thu, 21 Mar 2024 16:48:56 -0400 Subject: [PATCH 06/13] implement pr review feedback --- .../ui-core/src/metadata/MetadataEntry.tsx | 4 +- .../sling/asset_decorator.py | 4 +- .../dagster_embedded_elt/sling/resources.py | 10 +- .../test_asset_decorator.py | 199 ++---------------- 4 files changed, 27 insertions(+), 190 deletions(-) diff --git a/js_modules/dagster-ui/packages/ui-core/src/metadata/MetadataEntry.tsx b/js_modules/dagster-ui/packages/ui-core/src/metadata/MetadataEntry.tsx index 9edf15473a2d1..609a3305c61d3 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/metadata/MetadataEntry.tsx +++ b/js_modules/dagster-ui/packages/ui-core/src/metadata/MetadataEntry.tsx @@ -39,8 +39,8 @@ export const HIDDEN_METADATA_ENTRY_LABELS = new Set([ 'dagster-dbt/exclude', 'dagster_dbt/manifest', 'dagster_dbt/dagster_dbt_translator', - 'dagster_sling/translator', - 'dagster_sling/replication_config', + 'dagster_embedded_elt/dagster_sling_translator', + 'dagster_embedded_elt/sling_replication_config', ]); export const LogRowStructuredContentTable = ({ diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/asset_decorator.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/asset_decorator.py index 9be6a92ec6878..6e26eb8790b51 100644 --- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/asset_decorator.py +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/asset_decorator.py @@ -12,8 +12,8 @@ from dagster_embedded_elt.sling.dagster_sling_translator import DagsterSlingTranslator from dagster_embedded_elt.sling.sling_replication import SlingReplicationParam, validate_replication -METADATA_KEY_TRANSLATOR = "dagster_sling/translator" -METADATA_KEY_REPLICATION_CONFIG = "dagster_sling/replication_config" +METADATA_KEY_TRANSLATOR = "dagster_embedded_elt/dagster_sling_translator" +METADATA_KEY_REPLICATION_CONFIG = "dagster_embedded_elt/sling_replication_config" def get_streams_from_replication( diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py index e38762ef8608f..5f5c48d630f93 100644 --- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py @@ -387,15 +387,9 @@ def replicate( replication_config = first_asset_metadata.get(METADATA_KEY_REPLICATION_CONFIG) # if translator has not been defined on metadata _or_ through param, then use the default constructor - if not dagster_sling_translator: - dagster_sling_translator = DagsterSlingTranslator() + dagster_sling_translator = dagster_sling_translator or DagsterSlingTranslator() - if replication_config is None: - raise Exception( - f"`ReplicationConfig` must be defined on metadata at {METADATA_KEY_REPLICATION_CONFIG}" - ) - - replication_config = validate_replication(replication_config) + replication_config = validate_replication(replication_config or {}) stream_definition = get_streams_from_replication(replication_config) with self._setup_config(): diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_asset_decorator.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_asset_decorator.py index b27845bc8e782..828cfc8d386f3 100644 --- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_asset_decorator.py +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_asset_decorator.py @@ -2,6 +2,7 @@ import sqlite3 import pytest +import yaml from dagster import ( AssetExecutionContext, AssetKey, @@ -16,6 +17,7 @@ ) from dagster_embedded_elt.sling.dagster_sling_translator import DagsterSlingTranslator from dagster_embedded_elt.sling.resources import SlingConnectionResource, SlingResource +from path import Path @pytest.mark.parametrize( @@ -102,11 +104,12 @@ def my_third_sling_assets(): ... def test_base_with_meta_config_translator(): - replication_config = file_relative_path( + replication_config_path = file_relative_path( __file__, "replication_configs/base_with_meta_config/replication.yaml" ) + replication_config = yaml.safe_load(Path(replication_config_path).read_bytes()) - @sling_assets(replication_config=replication_config) + @sling_assets(replication_config=replication_config_path) def my_sling_assets(): ... assert my_sling_assets.keys == { @@ -133,50 +136,10 @@ def my_sling_assets(): ... assert my_sling_assets.metadata_by_key == { AssetKey(["target", "public", "accounts"]): { "stream_config": JsonMetadataValue(data=None), - "dagster_sling/translator": DagsterSlingTranslator(target_prefix="target"), - "dagster_sling/replication_config": { - "source": "MY_SOURCE", - "target": "MY_TARGET", - "defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"}, - "streams": { - "public.accounts": None, - "public.users": { - "disabled": True, - "meta": {"dagster": {"asset_key": "public.foo_users"}}, - }, - "public.finance_departments_old": { - "object": "departments", - "source_options": {"empty_as_null": False}, - "meta": { - "dagster": { - "deps": ["foo_one", "foo_two"], - "group": "group_2", - "freshness_policy": { - "maximum_lag_minutes": 0, - "cron_schedule": "5 4 * * *", - "cron_schedule_timezone": "UTC", - }, - } - }, - }, - 'public."Transactions"': { - "mode": "incremental", - "primary_key": "id", - "update_key": "last_updated_at", - "meta": { - "dagster": { - "description": "Example Description!", - "auto_materialize_policy": True, - } - }, - }, - "public.all_users": { - "sql": 'select all_user_id, name \nfrom public."all_Users"\n', - "object": "public.all_users", - }, - }, - "env": {"SLING_LOADED_AT_COLUMN": True, "SLING_STREAM_URL_COLUMN": True}, - }, + "dagster_embedded_elt/dagster_sling_translator": DagsterSlingTranslator( + target_prefix="target" + ), + "dagster_embedded_elt/sling_replication_config": replication_config, }, AssetKey(["target", "departments"]): { "stream_config": JsonMetadataValue( @@ -196,50 +159,10 @@ def my_sling_assets(): ... }, } ), - "dagster_sling/translator": DagsterSlingTranslator(target_prefix="target"), - "dagster_sling/replication_config": { - "source": "MY_SOURCE", - "target": "MY_TARGET", - "defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"}, - "streams": { - "public.accounts": None, - "public.users": { - "disabled": True, - "meta": {"dagster": {"asset_key": "public.foo_users"}}, - }, - "public.finance_departments_old": { - "object": "departments", - "source_options": {"empty_as_null": False}, - "meta": { - "dagster": { - "deps": ["foo_one", "foo_two"], - "group": "group_2", - "freshness_policy": { - "maximum_lag_minutes": 0, - "cron_schedule": "5 4 * * *", - "cron_schedule_timezone": "UTC", - }, - } - }, - }, - 'public."Transactions"': { - "mode": "incremental", - "primary_key": "id", - "update_key": "last_updated_at", - "meta": { - "dagster": { - "description": "Example Description!", - "auto_materialize_policy": True, - } - }, - }, - "public.all_users": { - "sql": 'select all_user_id, name \nfrom public."all_Users"\n', - "object": "public.all_users", - }, - }, - "env": {"SLING_LOADED_AT_COLUMN": True, "SLING_STREAM_URL_COLUMN": True}, - }, + "dagster_embedded_elt/dagster_sling_translator": DagsterSlingTranslator( + target_prefix="target" + ), + "dagster_embedded_elt/sling_replication_config": replication_config, }, AssetKey(["target", "public", "transactions"]): { "stream_config": JsonMetadataValue( @@ -255,50 +178,10 @@ def my_sling_assets(): ... }, } ), - "dagster_sling/translator": DagsterSlingTranslator(target_prefix="target"), - "dagster_sling/replication_config": { - "source": "MY_SOURCE", - "target": "MY_TARGET", - "defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"}, - "streams": { - "public.accounts": None, - "public.users": { - "disabled": True, - "meta": {"dagster": {"asset_key": "public.foo_users"}}, - }, - "public.finance_departments_old": { - "object": "departments", - "source_options": {"empty_as_null": False}, - "meta": { - "dagster": { - "deps": ["foo_one", "foo_two"], - "group": "group_2", - "freshness_policy": { - "maximum_lag_minutes": 0, - "cron_schedule": "5 4 * * *", - "cron_schedule_timezone": "UTC", - }, - } - }, - }, - 'public."Transactions"': { - "mode": "incremental", - "primary_key": "id", - "update_key": "last_updated_at", - "meta": { - "dagster": { - "description": "Example Description!", - "auto_materialize_policy": True, - } - }, - }, - "public.all_users": { - "sql": 'select all_user_id, name \nfrom public."all_Users"\n', - "object": "public.all_users", - }, - }, - "env": {"SLING_LOADED_AT_COLUMN": True, "SLING_STREAM_URL_COLUMN": True}, - }, + "dagster_embedded_elt/dagster_sling_translator": DagsterSlingTranslator( + target_prefix="target" + ), + "dagster_embedded_elt/sling_replication_config": replication_config, }, AssetKey(["target", "public", "all_users"]): { "stream_config": JsonMetadataValue( @@ -307,50 +190,10 @@ def my_sling_assets(): ... "object": "public.all_users", } ), - "dagster_sling/translator": DagsterSlingTranslator(target_prefix="target"), - "dagster_sling/replication_config": { - "source": "MY_SOURCE", - "target": "MY_TARGET", - "defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"}, - "streams": { - "public.accounts": None, - "public.users": { - "disabled": True, - "meta": {"dagster": {"asset_key": "public.foo_users"}}, - }, - "public.finance_departments_old": { - "object": "departments", - "source_options": {"empty_as_null": False}, - "meta": { - "dagster": { - "deps": ["foo_one", "foo_two"], - "group": "group_2", - "freshness_policy": { - "maximum_lag_minutes": 0, - "cron_schedule": "5 4 * * *", - "cron_schedule_timezone": "UTC", - }, - } - }, - }, - 'public."Transactions"': { - "mode": "incremental", - "primary_key": "id", - "update_key": "last_updated_at", - "meta": { - "dagster": { - "description": "Example Description!", - "auto_materialize_policy": True, - } - }, - }, - "public.all_users": { - "sql": 'select all_user_id, name \nfrom public."all_Users"\n', - "object": "public.all_users", - }, - }, - "env": {"SLING_LOADED_AT_COLUMN": True, "SLING_STREAM_URL_COLUMN": True}, - }, + "dagster_embedded_elt/dagster_sling_translator": DagsterSlingTranslator( + target_prefix="target" + ), + "dagster_embedded_elt/sling_replication_config": replication_config, }, } From 93ae244a55bc8f8f2990daff63d6813680b7c3f3 Mon Sep 17 00:00:00 2001 From: Colton Padden Date: Fri, 22 Mar 2024 10:16:57 -0400 Subject: [PATCH 07/13] add ability to use sling within an op --- .../dagster_embedded_elt/sling/resources.py | 17 +++++++++++------ .../sling/sling_replication.py | 5 +++-- .../test_asset_decorator.py | 7 +++---- .../dagster_embedded_elt_tests/test_op.py | 7 ++----- 4 files changed, 19 insertions(+), 17 deletions(-) diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py index 5f5c48d630f93..91060e55db5a5 100644 --- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py @@ -17,6 +17,7 @@ EnvVar, MaterializeResult, OpExecutionContext, + Output, PermissiveConfig, get_dagster_logger, ) @@ -368,12 +369,13 @@ def replicate( replication_config: Optional[SlingReplicationParam] = None, dagster_sling_translator: Optional[DagsterSlingTranslator] = None, debug: bool = False, - ) -> Generator[MaterializeResult, None, None]: + ) -> Generator[Union[MaterializeResult, Output], None, None]: """Runs a Sling replication from the given replication config. Args: - replication_config: The Sling replication config to use for the replication. context: Asset or Op execution context. + replication_config: The Sling replication config to use for the replication. + dagster_sling_translator: The translator to use for the replication. debug: Whether to run the replication in debug mode. Returns: @@ -389,7 +391,7 @@ def replicate( # if translator has not been defined on metadata _or_ through param, then use the default constructor dagster_sling_translator = dagster_sling_translator or DagsterSlingTranslator() - replication_config = validate_replication(replication_config or {}) + replication_config = validate_replication(replication_config) stream_definition = get_streams_from_replication(replication_config) with self._setup_config(): @@ -426,9 +428,12 @@ def replicate( for stream in stream_definition: output_name = dagster_sling_translator.get_asset_key(stream) - yield MaterializeResult( - asset_key=output_name, metadata={"elapsed_time": end_time - start_time} - ) + if isinstance(context, AssetExecutionContext): + yield MaterializeResult( + asset_key=output_name, metadata={"elapsed_time": end_time - start_time} + ) + # elif isinstance(context, OpExecutionContext): + # yield Output(value=None, output_name=str(output_name), metadata={"elapsed_time": end_time - start_time}) def stream_raw_logs(self) -> Generator[str, None, None]: """Returns a generator of raw logs from the Sling CLI.""" diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/sling_replication.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/sling_replication.py index b7ee369687fc2..78850ac40978b 100644 --- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/sling_replication.py +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/sling_replication.py @@ -1,6 +1,6 @@ from functools import lru_cache from pathlib import Path -from typing import Any, Mapping, Union, cast +from typing import Any, Mapping, Optional, Union, cast import dagster._check as check import yaml @@ -18,7 +18,8 @@ def read_replication_path(replication_path: Path) -> Mapping[str, Any]: return cast(Mapping[str, Any], yaml.safe_load(replication_path.read_bytes())) -def validate_replication(replication: SlingReplicationParam) -> Mapping[str, Any]: +def validate_replication(replication: Optional[SlingReplicationParam]) -> Mapping[str, Any]: + replication = replication or {} check.inst_param(replication, "manifest", (Path, str, dict)) if isinstance(replication, str): diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_asset_decorator.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_asset_decorator.py index 828cfc8d386f3..9c23da84b16eb 100644 --- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_asset_decorator.py +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_asset_decorator.py @@ -1,5 +1,5 @@ -import logging import sqlite3 +from pathlib import Path import pytest import yaml @@ -17,7 +17,6 @@ ) from dagster_embedded_elt.sling.dagster_sling_translator import DagsterSlingTranslator from dagster_embedded_elt.sling.resources import SlingConnectionResource, SlingResource -from path import Path @pytest.mark.parametrize( @@ -67,8 +66,7 @@ def test_runs_base_sling_config( ): @sling_assets(replication_config=csv_to_sqlite_replication_config) def my_sling_assets(context: AssetExecutionContext, sling: SlingResource): - for row in sling.replicate(context=context): - logging.info(row) + yield from sling.replicate(context=context) sling_resource = SlingResource( connections=[ @@ -81,6 +79,7 @@ def my_sling_assets(context: AssetExecutionContext, sling: SlingResource): ] ) res = materialize([my_sling_assets], resources={"sling": sling_resource}) + assert res.success counts = sqlite_connection.execute("SELECT count(1) FROM main.tbl").fetchone()[0] assert counts == 3 diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_op.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_op.py index 1c48aaca3872a..d418c24ee911d 100644 --- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_op.py +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_op.py @@ -1,5 +1,3 @@ -import logging - from dagster import OpExecutionContext, job, op from dagster_embedded_elt.sling import SlingReplicationParam from dagster_embedded_elt.sling.resources import SlingConnectionResource, SlingResource @@ -22,10 +20,9 @@ def test_base_sling_config_op( @op(out={}) def my_sling_op_yield_events(context: OpExecutionContext, sling: SlingResource): - for row in sling.replicate( + yield from sling.replicate( context=context, replication_config=csv_to_sqlite_replication_config - ): - logging.info(row) + ) @job def my_sling_op_yield_events_job(): From a94aab1268575a807e4a515d03944be9ab3a93a0 Mon Sep 17 00:00:00 2001 From: Colton Padden Date: Fri, 22 Mar 2024 10:30:24 -0400 Subject: [PATCH 08/13] assert materialize events --- .../dagster_embedded_elt/sling/asset_decorator.py | 3 +-- .../dagster_embedded_elt/sling/resources.py | 2 -- .../dagster_embedded_elt_tests/test_asset_decorator.py | 2 ++ .../dagster_embedded_elt_tests/test_op.py | 5 +++-- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/asset_decorator.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/asset_decorator.py index 6e26eb8790b51..41d028d50140d 100644 --- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/asset_decorator.py +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/asset_decorator.py @@ -74,8 +74,7 @@ def sling_assets( config_path = "/path/to/replication.yaml" @sling_assets(replication_config=config_path) def my_assets(context, sling: SlingResource): - for lines in sling.replicate(context=context): - context.log.info(lines) + yield from sling.replicate(context=context) """ replication_config = validate_replication(replication_config) streams = get_streams_from_replication(replication_config) diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py index 91060e55db5a5..ba4a5a8396b84 100644 --- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py @@ -432,8 +432,6 @@ def replicate( yield MaterializeResult( asset_key=output_name, metadata={"elapsed_time": end_time - start_time} ) - # elif isinstance(context, OpExecutionContext): - # yield Output(value=None, output_name=str(output_name), metadata={"elapsed_time": end_time - start_time}) def stream_raw_logs(self) -> Generator[str, None, None]: """Returns a generator of raw logs from the Sling CLI.""" diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_asset_decorator.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_asset_decorator.py index 9c23da84b16eb..185c8107e17e5 100644 --- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_asset_decorator.py +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_asset_decorator.py @@ -81,6 +81,8 @@ def my_sling_assets(context: AssetExecutionContext, sling: SlingResource): res = materialize([my_sling_assets], resources={"sling": sling_resource}) assert res.success + assert len(res.get_asset_materialization_events()) == 1 + counts = sqlite_connection.execute("SELECT count(1) FROM main.tbl").fetchone()[0] assert counts == 3 diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_op.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_op.py index d418c24ee911d..bcaa0085eed8f 100644 --- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_op.py +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_op.py @@ -28,5 +28,6 @@ def my_sling_op_yield_events(context: OpExecutionContext, sling: SlingResource): def my_sling_op_yield_events_job(): my_sling_op_yield_events() - result = my_sling_op_yield_events_job.execute_in_process(resources={"sling": sling_resource}) - assert result.success + res = my_sling_op_yield_events_job.execute_in_process(resources={"sling": sling_resource}) + assert res.success + assert len(res.get_job_success_event()) == 1 From 23c4ebebb0537e46beb4c6bb85fd9878a9ac4b29 Mon Sep 17 00:00:00 2001 From: Colton Padden Date: Fri, 22 Mar 2024 10:33:44 -0400 Subject: [PATCH 09/13] rm default sling translator --- .../integrations/embedded_elt/sling_dagster_translator.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/sling_dagster_translator.py b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/sling_dagster_translator.py index e178c63988173..752e9422ff7c1 100644 --- a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/sling_dagster_translator.py +++ b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/sling_dagster_translator.py @@ -1,5 +1,4 @@ from dagster_embedded_elt.sling import ( - DagsterSlingTranslator, SlingResource, sling_assets, ) @@ -10,10 +9,7 @@ sling_resource = SlingResource(connections=[...]) # Add connections here -@sling_assets( - replication_config=replication_config, - dagster_sling_translator=DagsterSlingTranslator(), -) +@sling_assets(replication_config=replication_config) def my_assets(context, sling: SlingResource): yield from sling.replicate(context=context) for row in sling.stream_raw_logs(): From 172e14b176528a43a023af2533cc46cbbf5f0b2c Mon Sep 17 00:00:00 2001 From: Colton Padden Date: Fri, 22 Mar 2024 10:34:32 -0400 Subject: [PATCH 10/13] rm default sling translator --- docs/content/integrations/embedded-elt.mdx | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/docs/content/integrations/embedded-elt.mdx b/docs/content/integrations/embedded-elt.mdx index 23aae82ffe03d..597bbf7fd4ef0 100644 --- a/docs/content/integrations/embedded-elt.mdx +++ b/docs/content/integrations/embedded-elt.mdx @@ -123,7 +123,6 @@ Each stream will render two assets, one for the source stream and one for the ta ```python file=/integrations/embedded_elt/sling_dagster_translator.py from dagster_embedded_elt.sling import ( - DagsterSlingTranslator, SlingResource, sling_assets, ) @@ -134,10 +133,7 @@ replication_config = file_relative_path(__file__, "../sling_replication.yaml") sling_resource = SlingResource(connections=[...]) # Add connections here -@sling_assets( - replication_config=replication_config, - dagster_sling_translator=DagsterSlingTranslator(), -) +@sling_assets(replication_config=replication_config) def my_assets(context, sling: SlingResource): yield from sling.replicate(context=context) for row in sling.stream_raw_logs(): From f7962b5fb0efa85aa762d6b098009b839e01e0ea Mon Sep 17 00:00:00 2001 From: Colton Padden Date: Fri, 22 Mar 2024 13:52:30 -0400 Subject: [PATCH 11/13] use has_asset_def --- .../dagster_embedded_elt/sling/resources.py | 4 +++- .../dagster_embedded_elt_tests/test_op.py | 7 ++++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py index ba4a5a8396b84..822f917b37069 100644 --- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py @@ -426,9 +426,11 @@ def replicate( end_time = time.time() + has_asset_def: bool = bool(context and context.has_assets_def) + for stream in stream_definition: output_name = dagster_sling_translator.get_asset_key(stream) - if isinstance(context, AssetExecutionContext): + if has_asset_def: yield MaterializeResult( asset_key=output_name, metadata={"elapsed_time": end_time - start_time} ) diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_op.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_op.py index bcaa0085eed8f..c4db1f9ef54bd 100644 --- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_op.py +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_op.py @@ -1,3 +1,5 @@ +import sqlite3 + from dagster import OpExecutionContext, job, op from dagster_embedded_elt.sling import SlingReplicationParam from dagster_embedded_elt.sling.resources import SlingConnectionResource, SlingResource @@ -6,6 +8,7 @@ def test_base_sling_config_op( csv_to_sqlite_replication_config: SlingReplicationParam, path_to_temp_sqlite_db: str, + sqlite_connection: sqlite3.Connection, ): sling_resource = SlingResource( connections=[ @@ -30,4 +33,6 @@ def my_sling_op_yield_events_job(): res = my_sling_op_yield_events_job.execute_in_process(resources={"sling": sling_resource}) assert res.success - assert len(res.get_job_success_event()) == 1 + + counts = sqlite_connection.execute("SELECT count(1) FROM main.tbl").fetchone()[0] + assert counts == 3 From 1cceda31b81bef40277d84f5c953ab96162b6c5a Mon Sep 17 00:00:00 2001 From: Colton Padden Date: Fri, 22 Mar 2024 14:50:44 -0400 Subject: [PATCH 12/13] rm empty translator usage --- docs/content/integrations/embedded-elt.mdx | 11 ++--------- .../integrations/embedded_elt/postgres_snowflake.py | 6 +----- .../integrations/embedded_elt/s3_snowflake.py | 6 +----- .../sling_decorator/sling_decorator/__init__.py | 6 +----- 4 files changed, 5 insertions(+), 24 deletions(-) diff --git a/docs/content/integrations/embedded-elt.mdx b/docs/content/integrations/embedded-elt.mdx index 597bbf7fd4ef0..7c0bbcf871314 100644 --- a/docs/content/integrations/embedded-elt.mdx +++ b/docs/content/integrations/embedded-elt.mdx @@ -162,7 +162,6 @@ This is an example of how to setup a Sling sync between two databases such as Po ```python file=/integrations/embedded_elt/postgres_snowflake.py from dagster_embedded_elt.sling import ( - DagsterSlingTranslator, SlingConnectionResource, SlingResource, sling_assets, @@ -208,10 +207,7 @@ replication_config = { } -@sling_assets( - replication_config=replication_config, - dagster_sling_translator=DagsterSlingTranslator(), -) +@sling_assets(replication_config=replication_config) def my_assets(context, sling: SlingResource): yield from sling.replicate(context=context) ``` @@ -244,10 +240,7 @@ replication_config = { } -@sling_assets( - replication_config=replication_config, - dagster_sling_translator=DagsterSlingTranslator(), -) +@sling_assets(replication_config=replication_config) def my_assets(context, sling: SlingResource): yield from sling.replicate(context=context) ``` diff --git a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/postgres_snowflake.py b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/postgres_snowflake.py index c74133c912c15..bf45f8e0204c4 100644 --- a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/postgres_snowflake.py +++ b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/postgres_snowflake.py @@ -2,7 +2,6 @@ # pyright: reportOptionalMemberAccess=none from dagster_embedded_elt.sling import ( - DagsterSlingTranslator, SlingConnectionResource, SlingResource, sling_assets, @@ -48,9 +47,6 @@ } -@sling_assets( - replication_config=replication_config, - dagster_sling_translator=DagsterSlingTranslator(), -) +@sling_assets(replication_config=replication_config) def my_assets(context, sling: SlingResource): yield from sling.replicate(context=context) diff --git a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/s3_snowflake.py b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/s3_snowflake.py index aefacb73c01ba..852b3017825b9 100644 --- a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/s3_snowflake.py +++ b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/s3_snowflake.py @@ -2,7 +2,6 @@ # pyright: reportOptionalMemberAccess=none from dagster_embedded_elt.sling import ( - DagsterSlingTranslator, SlingConnectionResource, SlingResource, sling_assets, @@ -45,10 +44,7 @@ } -@sling_assets( - replication_config=replication_config, - dagster_sling_translator=DagsterSlingTranslator(), -) +@sling_assets(replication_config=replication_config) def my_assets(context, sling: SlingResource): yield from sling.replicate(context=context) diff --git a/examples/experimental/sling_decorator/sling_decorator/__init__.py b/examples/experimental/sling_decorator/sling_decorator/__init__.py index ec9e1c62b7ddc..3eef6f7f687ef 100644 --- a/examples/experimental/sling_decorator/sling_decorator/__init__.py +++ b/examples/experimental/sling_decorator/sling_decorator/__init__.py @@ -1,6 +1,5 @@ from dagster import Definitions, file_relative_path from dagster_embedded_elt.sling import ( - DagsterSlingTranslator, sling_assets, ) from dagster_embedded_elt.sling.resources import ( @@ -26,10 +25,7 @@ ) -@sling_assets( - replication_config=replication_config, - dagster_sling_translator=DagsterSlingTranslator(), -) +@sling_assets(replication_config=replication_config) def my_assets(context, sling: SlingResource): yield from sling.replicate(context=context) for row in sling.stream_raw_logs(): From dd973aeafb02fabfb0df7cd7635ae6fe265c58d3 Mon Sep 17 00:00:00 2001 From: Colton Padden Date: Fri, 22 Mar 2024 14:54:59 -0400 Subject: [PATCH 13/13] yield AssetMaterialization for ops; update unit test --- .../dagster_embedded_elt/sling/resources.py | 10 +++++++--- .../dagster_embedded_elt_tests/test_op.py | 2 ++ 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py index 822f917b37069..750631d32104e 100644 --- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py @@ -13,11 +13,11 @@ import sling from dagster import ( AssetExecutionContext, + AssetMaterialization, ConfigurableResource, EnvVar, MaterializeResult, OpExecutionContext, - Output, PermissiveConfig, get_dagster_logger, ) @@ -369,7 +369,7 @@ def replicate( replication_config: Optional[SlingReplicationParam] = None, dagster_sling_translator: Optional[DagsterSlingTranslator] = None, debug: bool = False, - ) -> Generator[Union[MaterializeResult, Output], None, None]: + ) -> Generator[Union[MaterializeResult, AssetMaterialization], None, None]: """Runs a Sling replication from the given replication config. Args: @@ -379,7 +379,7 @@ def replicate( debug: Whether to run the replication in debug mode. Returns: - Optional[Generator[MaterializeResult, None, None]]: A generator of MaterializeResult + Generator[Union[MaterializeResult, AssetMaterialization], None, None]: A generator of MaterializeResult or AssetMaterialization """ # attempt to retrieve params from asset context if not passed as a parameter if not (replication_config or dagster_sling_translator): @@ -434,6 +434,10 @@ def replicate( yield MaterializeResult( asset_key=output_name, metadata={"elapsed_time": end_time - start_time} ) + else: + yield AssetMaterialization( + asset_key=output_name, metadata={"elapsed_time": end_time - start_time} + ) def stream_raw_logs(self) -> Generator[str, None, None]: """Returns a generator of raw logs from the Sling CLI.""" diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_op.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_op.py index c4db1f9ef54bd..9157dbe6de97a 100644 --- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_op.py +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_op.py @@ -34,5 +34,7 @@ def my_sling_op_yield_events_job(): res = my_sling_op_yield_events_job.execute_in_process(resources={"sling": sling_resource}) assert res.success + assert len(res.get_asset_materialization_events()) == 1 + counts = sqlite_connection.execute("SELECT count(1) FROM main.tbl").fetchone()[0] assert counts == 3