Skip to content

Commit

Permalink
yield AssetMaterialization for ops; update unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
cmpadden committed Mar 22, 2024
1 parent 1cceda3 commit dd973ae
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@
import sling
from dagster import (
AssetExecutionContext,
AssetMaterialization,
ConfigurableResource,
EnvVar,
MaterializeResult,
OpExecutionContext,
Output,
PermissiveConfig,
get_dagster_logger,
)
Expand Down Expand Up @@ -369,7 +369,7 @@ def replicate(
replication_config: Optional[SlingReplicationParam] = None,
dagster_sling_translator: Optional[DagsterSlingTranslator] = None,
debug: bool = False,
) -> Generator[Union[MaterializeResult, Output], None, None]:
) -> Generator[Union[MaterializeResult, AssetMaterialization], None, None]:
"""Runs a Sling replication from the given replication config.
Args:
Expand All @@ -379,7 +379,7 @@ def replicate(
debug: Whether to run the replication in debug mode.
Returns:
Optional[Generator[MaterializeResult, None, None]]: A generator of MaterializeResult
Generator[Union[MaterializeResult, AssetMaterialization], None, None]: A generator of MaterializeResult or AssetMaterialization
"""
# attempt to retrieve params from asset context if not passed as a parameter
if not (replication_config or dagster_sling_translator):
Expand Down Expand Up @@ -434,6 +434,10 @@ def replicate(
yield MaterializeResult(
asset_key=output_name, metadata={"elapsed_time": end_time - start_time}
)
else:
yield AssetMaterialization(
asset_key=output_name, metadata={"elapsed_time": end_time - start_time}
)

def stream_raw_logs(self) -> Generator[str, None, None]:
"""Returns a generator of raw logs from the Sling CLI."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,7 @@ def my_sling_op_yield_events_job():
res = my_sling_op_yield_events_job.execute_in_process(resources={"sling": sling_resource})
assert res.success

assert len(res.get_asset_materialization_events()) == 1

counts = sqlite_connection.execute("SELECT count(1) FROM main.tbl").fetchone()[0]
assert counts == 3

0 comments on commit dd973ae

Please sign in to comment.