diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py index ba4a5a8396b84..822f917b37069 100644 --- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py @@ -426,9 +426,11 @@ def replicate( end_time = time.time() + has_asset_def: bool = bool(context and context.has_assets_def) + for stream in stream_definition: output_name = dagster_sling_translator.get_asset_key(stream) - if isinstance(context, AssetExecutionContext): + if has_asset_def: yield MaterializeResult( asset_key=output_name, metadata={"elapsed_time": end_time - start_time} ) diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_op.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_op.py index bcaa0085eed8f..c4db1f9ef54bd 100644 --- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_op.py +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_op.py @@ -1,3 +1,5 @@ +import sqlite3 + from dagster import OpExecutionContext, job, op from dagster_embedded_elt.sling import SlingReplicationParam from dagster_embedded_elt.sling.resources import SlingConnectionResource, SlingResource @@ -6,6 +8,7 @@ def test_base_sling_config_op( csv_to_sqlite_replication_config: SlingReplicationParam, path_to_temp_sqlite_db: str, + sqlite_connection: sqlite3.Connection, ): sling_resource = SlingResource( connections=[ @@ -30,4 +33,6 @@ def my_sling_op_yield_events_job(): res = my_sling_op_yield_events_job.execute_in_process(resources={"sling": sling_resource}) assert res.success - assert len(res.get_job_success_event()) == 1 + + counts = sqlite_connection.execute("SELECT count(1) FROM main.tbl").fetchone()[0] + assert counts == 3