Skip to content

Commit

Permalink
rm deprecation; add unit test for op usage
Browse files Browse the repository at this point in the history
  • Loading branch information
cmpadden committed Mar 21, 2024
1 parent cb8f22c commit a286b9f
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
PermissiveConfig,
get_dagster_logger,
)
from dagster._annotations import deprecated, deprecated_param, experimental, public
from dagster._annotations import deprecated, experimental, public
from dagster._utils.env import environ
from dagster._utils.warnings import deprecation_warning
from pydantic import Field
Expand Down Expand Up @@ -361,17 +361,6 @@ def sync(

yield from self._exec_sling_cmd(cmd, encoding=encoding)

@public
@deprecated_param(
param="dagster_sling_translator",
breaking_version="2.0",
additional_warn_text="Param is only required in `sling_assets` decorator.",
)
@deprecated_param(
param="replication_config",
breaking_version="2.0",
additional_warn_text="Param is only required in `sling_assets` decorator.",
)
def replicate(
self,
*,
Expand All @@ -390,17 +379,17 @@ def replicate(
Returns:
Optional[Generator[MaterializeResult, None, None]]: A generator of MaterializeResult
"""
# retrieve decorator params from context
metadata_by_key = context.assets_def.metadata_by_key
first_asset_metadata = next(iter(metadata_by_key.values()))
# attempt to retrieve params from asset context if not passed as a parameter
if not (replication_config or dagster_sling_translator):
metadata_by_key = context.assets_def.metadata_by_key
first_asset_metadata = next(iter(metadata_by_key.values()))
dagster_sling_translator = first_asset_metadata.get(METADATA_KEY_TRANSLATOR)
replication_config = first_asset_metadata.get(METADATA_KEY_REPLICATION_CONFIG)

dagster_sling_translator = first_asset_metadata.get(METADATA_KEY_TRANSLATOR)
if dagster_sling_translator is None:
raise Exception(
f"`DagsterSlingTranslator` must be defined on metadata at {METADATA_KEY_TRANSLATOR}"
)
# if translator has not been defined on metadata _or_ through param, then use the default constructor
if not dagster_sling_translator:
dagster_sling_translator = DagsterSlingTranslator()

replication_config = first_asset_metadata.get(METADATA_KEY_REPLICATION_CONFIG)
if replication_config is None:
raise Exception(
f"`ReplicationConfig` must be defined on metadata at {METADATA_KEY_REPLICATION_CONFIG}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ def my_sling_assets(): ...

def test_runs_base_sling_config(
csv_to_sqlite_replication_config: SlingReplicationParam,
path_to_test_csv: str,
path_to_temp_sqlite_db: str,
sqlite_connection: sqlite3.Connection,
):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import logging

from dagster import OpExecutionContext, job, op
from dagster_embedded_elt.sling import SlingReplicationParam
from dagster_embedded_elt.sling.resources import SlingConnectionResource, SlingResource


def test_base_sling_config_op(
csv_to_sqlite_replication_config: SlingReplicationParam,
path_to_temp_sqlite_db: str,
):
sling_resource = SlingResource(
connections=[
SlingConnectionResource(type="file", name="SLING_FILE"),
SlingConnectionResource(
type="sqlite",
name="SLING_SQLITE",
connection_string=f"sqlite://{path_to_temp_sqlite_db}",
),
]
)

@op(out={})
def my_sling_op_yield_events(context: OpExecutionContext, sling: SlingResource):
for row in sling.replicate(
context=context, replication_config=csv_to_sqlite_replication_config
):
logging.info(row)

@job
def my_sling_op_yield_events_job():
my_sling_op_yield_events()

result = my_sling_op_yield_events_job.execute_in_process(resources={"sling": sling_resource})
assert result.success

0 comments on commit a286b9f

Please sign in to comment.