diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/asset_decorator.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/asset_decorator.py index 0923671a5b600..6839c0ee247a2 100644 --- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/asset_decorator.py +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/asset_decorator.py @@ -12,6 +12,8 @@ from dagster_embedded_elt.sling.dagster_sling_translator import DagsterSlingTranslator from dagster_embedded_elt.sling.sling_replication import SlingReplicationParam, validate_replication +METADATA_KEY_TRANSLATOR = "dagster_sling/translator" + def get_streams_from_replication( replication_config: Mapping[str, Any], @@ -73,7 +75,7 @@ def sling_assets( def my_assets(context, sling: SlingResource): for lines in sling.replicate( replication_config=config_path, - dagster_sling_translator=DagsterSlingTranslator(), + context=context, ): context.log.info(lines) """ @@ -88,7 +90,10 @@ def my_assets(context, sling: SlingResource): key=dagster_sling_translator.get_asset_key(stream), deps=dagster_sling_translator.get_deps_asset_key(stream), description=dagster_sling_translator.get_description(stream), - metadata=dagster_sling_translator.get_metadata(stream), + metadata={ # type: ignore + **dagster_sling_translator.get_metadata(stream), + METADATA_KEY_TRANSLATOR: dagster_sling_translator, + }, group_name=dagster_sling_translator.get_group_name(stream), freshness_policy=dagster_sling_translator.get_freshness_policy(stream), auto_materialize_policy=dagster_sling_translator.get_auto_materialize_policy( diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py index bcc8491b86c75..2fffbe71214f9 100644 --- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py @@ -8,13 +8,15 @@ import uuid from enum import Enum from subprocess import PIPE, STDOUT, Popen -from typing import IO, Any, AnyStr, Dict, Generator, Iterator, List, Optional +from typing import IO, Any, AnyStr, Dict, Generator, Iterator, List, Optional, Union import sling from dagster import ( + AssetExecutionContext, ConfigurableResource, EnvVar, MaterializeResult, + OpExecutionContext, PermissiveConfig, get_dagster_logger, ) @@ -23,7 +25,10 @@ from dagster._utils.warnings import deprecation_warning from pydantic import Field -from dagster_embedded_elt.sling.asset_decorator import get_streams_from_replication +from dagster_embedded_elt.sling.asset_decorator import ( + METADATA_KEY_TRANSLATOR, + get_streams_from_replication, +) from dagster_embedded_elt.sling.dagster_sling_translator import DagsterSlingTranslator from dagster_embedded_elt.sling.sling_replication import SlingReplicationParam, validate_replication @@ -360,14 +365,14 @@ def replicate( self, *, replication_config: SlingReplicationParam, - dagster_sling_translator: DagsterSlingTranslator, + context: Union[OpExecutionContext, AssetExecutionContext], debug: bool = False, ) -> Generator[MaterializeResult, None, None]: """Runs a Sling replication from the given replication config. Args: replication_config: The Sling replication config to use for the replication. - dagster_sling_translator: The translator to use for the replication. + context: Asset or Op execution context. debug: Whether to run the replication in debug mode. Returns: @@ -376,6 +381,13 @@ def replicate( replication_config = validate_replication(replication_config) stream_definition = get_streams_from_replication(replication_config) + # retrieve translator from context + metadata_by_key = context.assets_def.metadata_by_key + first_asset_metadata = next(iter(metadata_by_key.values())) + dagster_sling_translator = first_asset_metadata.get( + METADATA_KEY_TRANSLATOR, DagsterSlingTranslator() + ) + with self._setup_config(): uid = uuid.uuid4() temp_dir = tempfile.gettempdir() diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_asset_decorator.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_asset_decorator.py index e1716308f45fa..83204c76ea3c8 100644 --- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_asset_decorator.py +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_asset_decorator.py @@ -3,6 +3,7 @@ import pytest from dagster import ( + AssetExecutionContext, AssetKey, FreshnessPolicy, JsonMetadataValue, @@ -64,10 +65,9 @@ def test_runs_base_sling_config( sqlite_connection: sqlite3.Connection, ): @sling_assets(replication_config=csv_to_sqlite_replication_config) - def my_sling_assets(sling: SlingResource): + def my_sling_assets(context: AssetExecutionContext, sling: SlingResource): for row in sling.replicate( - replication_config=csv_to_sqlite_replication_config, - dagster_sling_translator=DagsterSlingTranslator(), + replication_config=csv_to_sqlite_replication_config, context=context ): logging.info(row) @@ -105,11 +105,11 @@ def my_third_sling_assets(): ... def test_base_with_meta_config_translator(): - @sling_assets( - replication_config=file_relative_path( - __file__, "replication_configs/base_with_meta_config/replication.yaml" - ) + replication_config = file_relative_path( + __file__, "replication_configs/base_with_meta_config/replication.yaml" ) + + @sling_assets(replication_config=replication_config) def my_sling_assets(): ... assert my_sling_assets.keys == { @@ -134,7 +134,10 @@ def my_sling_assets(): ... } assert my_sling_assets.metadata_by_key == { - AssetKey(["target", "public", "accounts"]): {"stream_config": JsonMetadataValue(data=None)}, + AssetKey(["target", "public", "accounts"]): { + "stream_config": JsonMetadataValue(data=None), + "dagster_sling/translator": DagsterSlingTranslator(target_prefix="target"), + }, AssetKey(["target", "departments"]): { "stream_config": JsonMetadataValue( data={ @@ -152,7 +155,8 @@ def my_sling_assets(): ... } }, } - ) + ), + "dagster_sling/translator": DagsterSlingTranslator(target_prefix="target"), }, AssetKey(["target", "public", "transactions"]): { "stream_config": JsonMetadataValue( @@ -167,7 +171,8 @@ def my_sling_assets(): ... } }, } - ) + ), + "dagster_sling/translator": DagsterSlingTranslator(target_prefix="target"), }, AssetKey(["target", "public", "all_users"]): { "stream_config": JsonMetadataValue( @@ -175,7 +180,8 @@ def my_sling_assets(): ... "sql": 'select all_user_id, name \nfrom public."all_Users"\n', "object": "public.all_users", } - ) + ), + "dagster_sling/translator": DagsterSlingTranslator(target_prefix="target"), }, }