Skip to content

Commit

Permalink
assert materialize events
Browse files Browse the repository at this point in the history
  • Loading branch information
cmpadden committed Mar 22, 2024
1 parent 93ae244 commit a94aab1
Show file tree
Hide file tree
Showing 4 changed files with 6 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,7 @@ def sling_assets(
config_path = "/path/to/replication.yaml"
@sling_assets(replication_config=config_path)
def my_assets(context, sling: SlingResource):
for lines in sling.replicate(context=context):
context.log.info(lines)
yield from sling.replicate(context=context)
"""
replication_config = validate_replication(replication_config)
streams = get_streams_from_replication(replication_config)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -432,8 +432,6 @@ def replicate(
yield MaterializeResult(
asset_key=output_name, metadata={"elapsed_time": end_time - start_time}
)
# elif isinstance(context, OpExecutionContext):
# yield Output(value=None, output_name=str(output_name), metadata={"elapsed_time": end_time - start_time})

def stream_raw_logs(self) -> Generator[str, None, None]:
"""Returns a generator of raw logs from the Sling CLI."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ def my_sling_assets(context: AssetExecutionContext, sling: SlingResource):
res = materialize([my_sling_assets], resources={"sling": sling_resource})

assert res.success
assert len(res.get_asset_materialization_events()) == 1

counts = sqlite_connection.execute("SELECT count(1) FROM main.tbl").fetchone()[0]
assert counts == 3

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,6 @@ def my_sling_op_yield_events(context: OpExecutionContext, sling: SlingResource):
def my_sling_op_yield_events_job():
my_sling_op_yield_events()

result = my_sling_op_yield_events_job.execute_in_process(resources={"sling": sling_resource})
assert result.success
res = my_sling_op_yield_events_job.execute_in_process(resources={"sling": sling_resource})
assert res.success
assert len(res.get_job_success_event()) == 1

0 comments on commit a94aab1

Please sign in to comment.