From a94aab1268575a807e4a515d03944be9ab3a93a0 Mon Sep 17 00:00:00 2001 From: Colton Padden Date: Fri, 22 Mar 2024 10:30:24 -0400 Subject: [PATCH] assert materialize events --- .../dagster_embedded_elt/sling/asset_decorator.py | 3 +-- .../dagster_embedded_elt/sling/resources.py | 2 -- .../dagster_embedded_elt_tests/test_asset_decorator.py | 2 ++ .../dagster_embedded_elt_tests/test_op.py | 5 +++-- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/asset_decorator.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/asset_decorator.py index 6e26eb8790b51..41d028d50140d 100644 --- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/asset_decorator.py +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/asset_decorator.py @@ -74,8 +74,7 @@ def sling_assets( config_path = "/path/to/replication.yaml" @sling_assets(replication_config=config_path) def my_assets(context, sling: SlingResource): - for lines in sling.replicate(context=context): - context.log.info(lines) + yield from sling.replicate(context=context) """ replication_config = validate_replication(replication_config) streams = get_streams_from_replication(replication_config) diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py index 91060e55db5a5..ba4a5a8396b84 100644 --- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt/sling/resources.py @@ -432,8 +432,6 @@ def replicate( yield MaterializeResult( asset_key=output_name, metadata={"elapsed_time": end_time - start_time} ) - # elif isinstance(context, OpExecutionContext): - # yield Output(value=None, output_name=str(output_name), metadata={"elapsed_time": end_time - start_time}) def stream_raw_logs(self) -> Generator[str, None, None]: """Returns a generator of raw logs from the Sling CLI.""" diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_asset_decorator.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_asset_decorator.py index 9c23da84b16eb..185c8107e17e5 100644 --- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_asset_decorator.py +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_asset_decorator.py @@ -81,6 +81,8 @@ def my_sling_assets(context: AssetExecutionContext, sling: SlingResource): res = materialize([my_sling_assets], resources={"sling": sling_resource}) assert res.success + assert len(res.get_asset_materialization_events()) == 1 + counts = sqlite_connection.execute("SELECT count(1) FROM main.tbl").fetchone()[0] assert counts == 3 diff --git a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_op.py b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_op.py index d418c24ee911d..bcaa0085eed8f 100644 --- a/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_op.py +++ b/python_modules/libraries/dagster-embedded-elt/dagster_embedded_elt_tests/test_op.py @@ -28,5 +28,6 @@ def my_sling_op_yield_events(context: OpExecutionContext, sling: SlingResource): def my_sling_op_yield_events_job(): my_sling_op_yield_events() - result = my_sling_op_yield_events_job.execute_in_process(resources={"sling": sling_resource}) - assert result.success + res = my_sling_op_yield_events_job.execute_in_process(resources={"sling": sling_resource}) + assert res.success + assert len(res.get_job_success_event()) == 1