Skip to content

Commit

Permalink
use has_asset_def
Browse files Browse the repository at this point in the history
  • Loading branch information
cmpadden committed Mar 22, 2024
1 parent 172e14b commit f7962b5
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -426,9 +426,11 @@ def replicate(

end_time = time.time()

has_asset_def: bool = bool(context and context.has_assets_def)

for stream in stream_definition:
output_name = dagster_sling_translator.get_asset_key(stream)
if isinstance(context, AssetExecutionContext):
if has_asset_def:
yield MaterializeResult(
asset_key=output_name, metadata={"elapsed_time": end_time - start_time}
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import sqlite3

from dagster import OpExecutionContext, job, op
from dagster_embedded_elt.sling import SlingReplicationParam
from dagster_embedded_elt.sling.resources import SlingConnectionResource, SlingResource
Expand All @@ -6,6 +8,7 @@
def test_base_sling_config_op(
csv_to_sqlite_replication_config: SlingReplicationParam,
path_to_temp_sqlite_db: str,
sqlite_connection: sqlite3.Connection,
):
sling_resource = SlingResource(
connections=[
Expand All @@ -30,4 +33,6 @@ def my_sling_op_yield_events_job():

res = my_sling_op_yield_events_job.execute_in_process(resources={"sling": sling_resource})
assert res.success
assert len(res.get_job_success_event()) == 1

counts = sqlite_connection.execute("SELECT count(1) FROM main.tbl").fetchone()[0]
assert counts == 3

0 comments on commit f7962b5

Please sign in to comment.