diff --git a/docs/content/integrations/embedded-elt.mdx b/docs/content/integrations/embedded-elt.mdx index 0186e5afecbc6..7c0bbcf871314 100644 --- a/docs/content/integrations/embedded-elt.mdx +++ b/docs/content/integrations/embedded-elt.mdx @@ -122,9 +122,7 @@ Now you can define a Sling asset using the object. ```python file=/integrations/embedded_elt/sling_dagster_translator.py -from dagster_embedded_elt import sling from dagster_embedded_elt.sling import ( - DagsterSlingTranslator, SlingResource, sling_assets, ) @@ -137,10 +135,7 @@ sling_resource = SlingResource(connections=[...]) # Add connections here @sling_assets(replication_config=replication_config) def my_assets(context, sling: SlingResource): - yield from sling.replicate( - replication_config=replication_config, - dagster_sling_translator=DagsterSlingTranslator(), - ) + yield from sling.replicate(context=context) for row in sling.stream_raw_logs(): context.log.info(row) @@ -167,7 +162,6 @@ This is an example of how to setup a Sling sync between two databases such as Po ```python file=/integrations/embedded_elt/postgres_snowflake.py from dagster_embedded_elt.sling import ( - DagsterSlingTranslator, SlingConnectionResource, SlingResource, sling_assets, @@ -215,10 +209,7 @@ replication_config = { @sling_assets(replication_config=replication_config) def my_assets(context, sling: SlingResource): - yield from sling.replicate( - replication_config=replication_config, - dagster_sling_translator=DagsterSlingTranslator(), - ) + yield from sling.replicate(context=context) ``` ## Example 2: File to Database @@ -251,10 +242,7 @@ replication_config = { @sling_assets(replication_config=replication_config) def my_assets(context, sling: SlingResource): - yield from sling.replicate( - replication_config=replication_config, - dagster_sling_translator=DagsterSlingTranslator(), - ) + yield from sling.replicate(context=context) ``` --- diff --git a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/postgres_snowflake.py b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/postgres_snowflake.py index cca09ed35d820..bf45f8e0204c4 100644 --- a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/postgres_snowflake.py +++ b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/postgres_snowflake.py @@ -2,7 +2,6 @@ # pyright: reportOptionalMemberAccess=none from dagster_embedded_elt.sling import ( - DagsterSlingTranslator, SlingConnectionResource, SlingResource, sling_assets, @@ -50,7 +49,4 @@ @sling_assets(replication_config=replication_config) def my_assets(context, sling: SlingResource): - yield from sling.replicate( - replication_config=replication_config, - dagster_sling_translator=DagsterSlingTranslator(), - ) + yield from sling.replicate(context=context) diff --git a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/s3_snowflake.py b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/s3_snowflake.py index c0f4c42463aae..852b3017825b9 100644 --- a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/s3_snowflake.py +++ b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/s3_snowflake.py @@ -2,7 +2,6 @@ # pyright: reportOptionalMemberAccess=none from dagster_embedded_elt.sling import ( - DagsterSlingTranslator, SlingConnectionResource, SlingResource, sling_assets, @@ -47,10 +46,7 @@ @sling_assets(replication_config=replication_config) def my_assets(context, sling: SlingResource): - yield from sling.replicate( - replication_config=replication_config, - dagster_sling_translator=DagsterSlingTranslator(), - ) + yield from sling.replicate(context=context) # end_storage_config diff --git a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/sling_dagster_translator.py b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/sling_dagster_translator.py index 09226414221d5..752e9422ff7c1 100644 --- a/examples/docs_snippets/docs_snippets/integrations/embedded_elt/sling_dagster_translator.py +++ b/examples/docs_snippets/docs_snippets/integrations/embedded_elt/sling_dagster_translator.py @@ -1,6 +1,4 @@ -from dagster_embedded_elt import sling from dagster_embedded_elt.sling import ( - DagsterSlingTranslator, SlingResource, sling_assets, ) @@ -13,10 +11,7 @@ @sling_assets(replication_config=replication_config) def my_assets(context, sling: SlingResource): - yield from sling.replicate( - replication_config=replication_config, - dagster_sling_translator=DagsterSlingTranslator(), - ) + yield from sling.replicate(context=context) for row in sling.stream_raw_logs(): context.log.info(row) diff --git a/examples/experimental/sling_decorator/sling_decorator/__init__.py b/examples/experimental/sling_decorator/sling_decorator/__init__.py index bc76ff44408ca..3eef6f7f687ef 100644 --- a/examples/experimental/sling_decorator/sling_decorator/__init__.py +++ b/examples/experimental/sling_decorator/sling_decorator/__init__.py @@ -1,6 +1,5 @@ from dagster import Definitions, file_relative_path from dagster_embedded_elt.sling import ( - DagsterSlingTranslator, sling_assets, ) from dagster_embedded_elt.sling.resources import ( @@ -28,10 +27,7 @@ @sling_assets(replication_config=replication_config) def my_assets(context, sling: SlingResource): - yield from sling.replicate( - replication_config=replication_config, - dagster_sling_translator=DagsterSlingTranslator(), - ) + yield from sling.replicate(context=context) for row in sling.stream_raw_logs(): context.log.info(row) diff --git a/js_modules/dagster-ui/packages/ui-core/src/metadata/MetadataEntry.tsx b/js_modules/dagster-ui/packages/ui-core/src/metadata/MetadataEntry.tsx index f6d92a73c777c..0c9dfc40bc0fb 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/metadata/MetadataEntry.tsx +++ b/js_modules/dagster-ui/packages/ui-core/src/metadata/MetadataEntry.tsx @@ -39,6 +39,8 @@ export const HIDDEN_METADATA_ENTRY_LABELS = new Set([ 'dagster-dbt/exclude', 'dagster_dbt/manifest', 'dagster_dbt/dagster_dbt_translator', + 'dagster_embedded_elt/dagster_sling_translator', + 'dagster_embedded_elt/sling_replication_config', ]); export const LogRowStructuredContentTable = ({ diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/asset_decorator.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/asset_decorator.py index 0923671a5b600..41d028d50140d 100644 --- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/asset_decorator.py +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/asset_decorator.py @@ -12,6 +12,9 @@ from dagster_embedded_elt.sling.dagster_sling_translator import DagsterSlingTranslator from dagster_embedded_elt.sling.sling_replication import SlingReplicationParam, validate_replication +METADATA_KEY_TRANSLATOR = "dagster_embedded_elt/dagster_sling_translator" +METADATA_KEY_REPLICATION_CONFIG = "dagster_embedded_elt/sling_replication_config" + def get_streams_from_replication( replication_config: Mapping[str, Any], @@ -71,11 +74,7 @@ def sling_assets( config_path = "/path/to/replication.yaml" @sling_assets(replication_config=config_path) def my_assets(context, sling: SlingResource): - for lines in sling.replicate( - replication_config=config_path, - dagster_sling_translator=DagsterSlingTranslator(), - ): - context.log.info(lines) + yield from sling.replicate(context=context) """ replication_config = validate_replication(replication_config) streams = get_streams_from_replication(replication_config) @@ -88,7 +87,11 @@ def my_assets(context, sling: SlingResource): key=dagster_sling_translator.get_asset_key(stream), deps=dagster_sling_translator.get_deps_asset_key(stream), description=dagster_sling_translator.get_description(stream), - metadata=dagster_sling_translator.get_metadata(stream), + metadata={ # type: ignore + **dagster_sling_translator.get_metadata(stream), + METADATA_KEY_TRANSLATOR: dagster_sling_translator, + METADATA_KEY_REPLICATION_CONFIG: replication_config, + }, group_name=dagster_sling_translator.get_group_name(stream), freshness_policy=dagster_sling_translator.get_freshness_policy(stream), auto_materialize_policy=dagster_sling_translator.get_auto_materialize_policy( diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py index bcc8491b86c75..750631d32104e 100644 --- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py @@ -8,13 +8,16 @@ import uuid from enum import Enum from subprocess import PIPE, STDOUT, Popen -from typing import IO, Any, AnyStr, Dict, Generator, Iterator, List, Optional +from typing import IO, Any, AnyStr, Dict, Generator, Iterator, List, Optional, Union import sling from dagster import ( + AssetExecutionContext, + AssetMaterialization, ConfigurableResource, EnvVar, MaterializeResult, + OpExecutionContext, PermissiveConfig, get_dagster_logger, ) @@ -23,7 +26,11 @@ from dagster._utils.warnings import deprecation_warning from pydantic import Field -from dagster_embedded_elt.sling.asset_decorator import get_streams_from_replication +from dagster_embedded_elt.sling.asset_decorator import ( + METADATA_KEY_REPLICATION_CONFIG, + METADATA_KEY_TRANSLATOR, + get_streams_from_replication, +) from dagster_embedded_elt.sling.dagster_sling_translator import DagsterSlingTranslator from dagster_embedded_elt.sling.sling_replication import SlingReplicationParam, validate_replication @@ -355,24 +362,35 @@ def sync( yield from self._exec_sling_cmd(cmd, encoding=encoding) - @public def replicate( self, *, - replication_config: SlingReplicationParam, - dagster_sling_translator: DagsterSlingTranslator, + context: Union[OpExecutionContext, AssetExecutionContext], + replication_config: Optional[SlingReplicationParam] = None, + dagster_sling_translator: Optional[DagsterSlingTranslator] = None, debug: bool = False, - ) -> Generator[MaterializeResult, None, None]: + ) -> Generator[Union[MaterializeResult, AssetMaterialization], None, None]: """Runs a Sling replication from the given replication config. Args: + context: Asset or Op execution context. replication_config: The Sling replication config to use for the replication. dagster_sling_translator: The translator to use for the replication. debug: Whether to run the replication in debug mode. Returns: - Optional[Generator[MaterializeResult, None, None]]: A generator of MaterializeResult + Generator[Union[MaterializeResult, AssetMaterialization], None, None]: A generator of MaterializeResult or AssetMaterialization """ + # attempt to retrieve params from asset context if not passed as a parameter + if not (replication_config or dagster_sling_translator): + metadata_by_key = context.assets_def.metadata_by_key + first_asset_metadata = next(iter(metadata_by_key.values())) + dagster_sling_translator = first_asset_metadata.get(METADATA_KEY_TRANSLATOR) + replication_config = first_asset_metadata.get(METADATA_KEY_REPLICATION_CONFIG) + + # if translator has not been defined on metadata _or_ through param, then use the default constructor + dagster_sling_translator = dagster_sling_translator or DagsterSlingTranslator() + replication_config = validate_replication(replication_config) stream_definition = get_streams_from_replication(replication_config) @@ -408,11 +426,18 @@ def replicate( end_time = time.time() + has_asset_def: bool = bool(context and context.has_assets_def) + for stream in stream_definition: output_name = dagster_sling_translator.get_asset_key(stream) - yield MaterializeResult( - asset_key=output_name, metadata={"elapsed_time": end_time - start_time} - ) + if has_asset_def: + yield MaterializeResult( + asset_key=output_name, metadata={"elapsed_time": end_time - start_time} + ) + else: + yield AssetMaterialization( + asset_key=output_name, metadata={"elapsed_time": end_time - start_time} + ) def stream_raw_logs(self) -> Generator[str, None, None]: """Returns a generator of raw logs from the Sling CLI.""" diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/sling_replication.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/sling_replication.py index b7ee369687fc2..78850ac40978b 100644 --- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/sling_replication.py +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/sling_replication.py @@ -1,6 +1,6 @@ from functools import lru_cache from pathlib import Path -from typing import Any, Mapping, Union, cast +from typing import Any, Mapping, Optional, Union, cast import dagster._check as check import yaml @@ -18,7 +18,8 @@ def read_replication_path(replication_path: Path) -> Mapping[str, Any]: return cast(Mapping[str, Any], yaml.safe_load(replication_path.read_bytes())) -def validate_replication(replication: SlingReplicationParam) -> Mapping[str, Any]: +def validate_replication(replication: Optional[SlingReplicationParam]) -> Mapping[str, Any]: + replication = replication or {} check.inst_param(replication, "manifest", (Path, str, dict)) if isinstance(replication, str): diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_asset_decorator.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_asset_decorator.py index 6c1e9d81cc7d3..185c8107e17e5 100644 --- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_asset_decorator.py +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_asset_decorator.py @@ -1,8 +1,10 @@ -import logging import sqlite3 +from pathlib import Path import pytest +import yaml from dagster import ( + AssetExecutionContext, AssetKey, FreshnessPolicy, JsonMetadataValue, @@ -59,17 +61,12 @@ def my_sling_assets(): ... def test_runs_base_sling_config( csv_to_sqlite_replication_config: SlingReplicationParam, - path_to_test_csv: str, path_to_temp_sqlite_db: str, sqlite_connection: sqlite3.Connection, ): @sling_assets(replication_config=csv_to_sqlite_replication_config) - def my_sling_assets(sling: SlingResource): - for row in sling.replicate( - replication_config=csv_to_sqlite_replication_config, - dagster_sling_translator=DagsterSlingTranslator(), - ): - logging.info(row) + def my_sling_assets(context: AssetExecutionContext, sling: SlingResource): + yield from sling.replicate(context=context) sling_resource = SlingResource( connections=[ @@ -82,7 +79,10 @@ def my_sling_assets(sling: SlingResource): ] ) res = materialize([my_sling_assets], resources={"sling": sling_resource}) + assert res.success + assert len(res.get_asset_materialization_events()) == 1 + counts = sqlite_connection.execute("SELECT count(1) FROM main.tbl").fetchone()[0] assert counts == 3 @@ -105,11 +105,12 @@ def my_third_sling_assets(): ... def test_base_with_meta_config_translator(): - @sling_assets( - replication_config=file_relative_path( - __file__, "replication_configs/base_with_meta_config/replication.yaml" - ) + replication_config_path = file_relative_path( + __file__, "replication_configs/base_with_meta_config/replication.yaml" ) + replication_config = yaml.safe_load(Path(replication_config_path).read_bytes()) + + @sling_assets(replication_config=replication_config_path) def my_sling_assets(): ... assert my_sling_assets.keys == { @@ -134,7 +135,13 @@ def my_sling_assets(): ... } assert my_sling_assets.metadata_by_key == { - AssetKey(["target", "public", "accounts"]): {"stream_config": JsonMetadataValue(data=None)}, + AssetKey(["target", "public", "accounts"]): { + "stream_config": JsonMetadataValue(data=None), + "dagster_embedded_elt/dagster_sling_translator": DagsterSlingTranslator( + target_prefix="target" + ), + "dagster_embedded_elt/sling_replication_config": replication_config, + }, AssetKey(["target", "departments"]): { "stream_config": JsonMetadataValue( data={ @@ -152,7 +159,11 @@ def my_sling_assets(): ... } }, } - ) + ), + "dagster_embedded_elt/dagster_sling_translator": DagsterSlingTranslator( + target_prefix="target" + ), + "dagster_embedded_elt/sling_replication_config": replication_config, }, AssetKey(["target", "public", "transactions"]): { "stream_config": JsonMetadataValue( @@ -167,7 +178,11 @@ def my_sling_assets(): ... } }, } - ) + ), + "dagster_embedded_elt/dagster_sling_translator": DagsterSlingTranslator( + target_prefix="target" + ), + "dagster_embedded_elt/sling_replication_config": replication_config, }, AssetKey(["target", "public", "all_users"]): { "stream_config": JsonMetadataValue( @@ -175,7 +190,11 @@ def my_sling_assets(): ... "sql": 'select all_user_id, name \nfrom public."all_Users"\n', "object": "public.all_users", } - ) + ), + "dagster_embedded_elt/dagster_sling_translator": DagsterSlingTranslator( + target_prefix="target" + ), + "dagster_embedded_elt/sling_replication_config": replication_config, }, } diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_op.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_op.py new file mode 100644 index 0000000000000..9157dbe6de97a --- /dev/null +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_op.py @@ -0,0 +1,40 @@ +import sqlite3 + +from dagster import OpExecutionContext, job, op +from dagster_embedded_elt.sling import SlingReplicationParam +from dagster_embedded_elt.sling.resources import SlingConnectionResource, SlingResource + + +def test_base_sling_config_op( + csv_to_sqlite_replication_config: SlingReplicationParam, + path_to_temp_sqlite_db: str, + sqlite_connection: sqlite3.Connection, +): + sling_resource = SlingResource( + connections=[ + SlingConnectionResource(type="file", name="SLING_FILE"), + SlingConnectionResource( + type="sqlite", + name="SLING_SQLITE", + connection_string=f"sqlite://{path_to_temp_sqlite_db}", + ), + ] + ) + + @op(out={}) + def my_sling_op_yield_events(context: OpExecutionContext, sling: SlingResource): + yield from sling.replicate( + context=context, replication_config=csv_to_sqlite_replication_config + ) + + @job + def my_sling_op_yield_events_job(): + my_sling_op_yield_events() + + res = my_sling_op_yield_events_job.execute_in_process(resources={"sling": sling_resource}) + assert res.success + + assert len(res.get_asset_materialization_events()) == 1 + + counts = sqlite_connection.execute("SELECT count(1) FROM main.tbl").fetchone()[0] + assert counts == 3