Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[embedded-elt][sling] passing translator and replication config from decorator using metadata #20564

Merged
merged 13 commits into from
Mar 22, 2024
Merged
31 changes: 15 additions & 16 deletions docs/content/integrations/embedded-elt.mdx
Original file line number Diff line number Diff line change
@@ -122,7 +122,6 @@ Now you can define a Sling asset using the <PyObject module="dagster_embedded_el
Each stream will render two assets, one for the source stream and one for the target destination. You may override how assets are named by passing in a custom <PyObject module="dagster_embedded_elt.sling" object="DagsterSlingTranslator" /> object.

```python file=/integrations/embedded_elt/sling_dagster_translator.py
from dagster_embedded_elt import sling
from dagster_embedded_elt.sling import (
DagsterSlingTranslator,
SlingResource,
@@ -135,12 +134,12 @@ replication_config = file_relative_path(__file__, "../sling_replication.yaml")
sling_resource = SlingResource(connections=[...]) # Add connections here


@sling_assets(replication_config=replication_config)
@sling_assets(
replication_config=replication_config,
dagster_sling_translator=DagsterSlingTranslator(),
cmpadden marked this conversation as resolved.
Show resolved Hide resolved
)
def my_assets(context, sling: SlingResource):
yield from sling.replicate(
replication_config=replication_config,
dagster_sling_translator=DagsterSlingTranslator(),
)
yield from sling.replicate(context=context)
for row in sling.stream_raw_logs():
context.log.info(row)

@@ -213,12 +212,12 @@ replication_config = {
}


@sling_assets(replication_config=replication_config)
@sling_assets(
replication_config=replication_config,
dagster_sling_translator=DagsterSlingTranslator(),
)
def my_assets(context, sling: SlingResource):
yield from sling.replicate(
replication_config=replication_config,
dagster_sling_translator=DagsterSlingTranslator(),
)
yield from sling.replicate(context=context)
```

## Example 2: File to Database
@@ -249,12 +248,12 @@ replication_config = {
}


@sling_assets(replication_config=replication_config)
@sling_assets(
replication_config=replication_config,
dagster_sling_translator=DagsterSlingTranslator(),
cmpadden marked this conversation as resolved.
Show resolved Hide resolved
)
def my_assets(context, sling: SlingResource):
yield from sling.replicate(
replication_config=replication_config,
dagster_sling_translator=DagsterSlingTranslator(),
)
yield from sling.replicate(context=context)
```

---
Original file line number Diff line number Diff line change
@@ -48,9 +48,9 @@
}


@sling_assets(replication_config=replication_config)
@sling_assets(
replication_config=replication_config,
dagster_sling_translator=DagsterSlingTranslator(),
)
def my_assets(context, sling: SlingResource):
yield from sling.replicate(
replication_config=replication_config,
dagster_sling_translator=DagsterSlingTranslator(),
)
yield from sling.replicate(context=context)
Original file line number Diff line number Diff line change
@@ -45,12 +45,12 @@
}


@sling_assets(replication_config=replication_config)
@sling_assets(
replication_config=replication_config,
dagster_sling_translator=DagsterSlingTranslator(),
)
def my_assets(context, sling: SlingResource):
yield from sling.replicate(
replication_config=replication_config,
dagster_sling_translator=DagsterSlingTranslator(),
)
yield from sling.replicate(context=context)


# end_storage_config
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from dagster_embedded_elt import sling
from dagster_embedded_elt.sling import (
DagsterSlingTranslator,
SlingResource,
@@ -11,12 +10,12 @@
sling_resource = SlingResource(connections=[...]) # Add connections here


@sling_assets(replication_config=replication_config)
@sling_assets(
replication_config=replication_config,
dagster_sling_translator=DagsterSlingTranslator(),
)
def my_assets(context, sling: SlingResource):
yield from sling.replicate(
replication_config=replication_config,
dagster_sling_translator=DagsterSlingTranslator(),
)
yield from sling.replicate(context=context)
for row in sling.stream_raw_logs():
context.log.info(row)

Original file line number Diff line number Diff line change
@@ -26,12 +26,12 @@
)


@sling_assets(replication_config=replication_config)
@sling_assets(
replication_config=replication_config,
dagster_sling_translator=DagsterSlingTranslator(),
)
def my_assets(context, sling: SlingResource):
yield from sling.replicate(
replication_config=replication_config,
dagster_sling_translator=DagsterSlingTranslator(),
)
yield from sling.replicate(context=context)
for row in sling.stream_raw_logs():
context.log.info(row)

Original file line number Diff line number Diff line change
@@ -39,6 +39,8 @@ export const HIDDEN_METADATA_ENTRY_LABELS = new Set([
'dagster-dbt/exclude',
'dagster_dbt/manifest',
'dagster_dbt/dagster_dbt_translator',
'dagster_embedded_elt/dagster_sling_translator',
'dagster_embedded_elt/sling_replication_config',
]);

export const LogRowStructuredContentTable = ({
Original file line number Diff line number Diff line change
@@ -12,6 +12,9 @@
from dagster_embedded_elt.sling.dagster_sling_translator import DagsterSlingTranslator
from dagster_embedded_elt.sling.sling_replication import SlingReplicationParam, validate_replication

METADATA_KEY_TRANSLATOR = "dagster_embedded_elt/dagster_sling_translator"
METADATA_KEY_REPLICATION_CONFIG = "dagster_embedded_elt/sling_replication_config"


def get_streams_from_replication(
replication_config: Mapping[str, Any],
@@ -71,10 +74,7 @@ def sling_assets(
config_path = "/path/to/replication.yaml"
@sling_assets(replication_config=config_path)
def my_assets(context, sling: SlingResource):
for lines in sling.replicate(
replication_config=config_path,
dagster_sling_translator=DagsterSlingTranslator(),
):
for lines in sling.replicate(context=context):
cmpadden marked this conversation as resolved.
Show resolved Hide resolved
context.log.info(lines)
"""
replication_config = validate_replication(replication_config)
@@ -88,7 +88,11 @@ def my_assets(context, sling: SlingResource):
key=dagster_sling_translator.get_asset_key(stream),
deps=dagster_sling_translator.get_deps_asset_key(stream),
description=dagster_sling_translator.get_description(stream),
metadata=dagster_sling_translator.get_metadata(stream),
metadata={ # type: ignore
rexledesma marked this conversation as resolved.
Show resolved Hide resolved
**dagster_sling_translator.get_metadata(stream),
METADATA_KEY_TRANSLATOR: dagster_sling_translator,
METADATA_KEY_REPLICATION_CONFIG: replication_config,
cmpadden marked this conversation as resolved.
Show resolved Hide resolved
},
group_name=dagster_sling_translator.get_group_name(stream),
freshness_policy=dagster_sling_translator.get_freshness_policy(stream),
auto_materialize_policy=dagster_sling_translator.get_auto_materialize_policy(
Original file line number Diff line number Diff line change
@@ -8,13 +8,15 @@
import uuid
from enum import Enum
from subprocess import PIPE, STDOUT, Popen
from typing import IO, Any, AnyStr, Dict, Generator, Iterator, List, Optional
from typing import IO, Any, AnyStr, Dict, Generator, Iterator, List, Optional, Union

import sling
from dagster import (
AssetExecutionContext,
ConfigurableResource,
EnvVar,
MaterializeResult,
OpExecutionContext,
PermissiveConfig,
get_dagster_logger,
)
@@ -23,7 +25,11 @@
from dagster._utils.warnings import deprecation_warning
from pydantic import Field

from dagster_embedded_elt.sling.asset_decorator import get_streams_from_replication
from dagster_embedded_elt.sling.asset_decorator import (
METADATA_KEY_REPLICATION_CONFIG,
METADATA_KEY_TRANSLATOR,
get_streams_from_replication,
)
from dagster_embedded_elt.sling.dagster_sling_translator import DagsterSlingTranslator
from dagster_embedded_elt.sling.sling_replication import SlingReplicationParam, validate_replication

@@ -355,25 +361,35 @@ def sync(

yield from self._exec_sling_cmd(cmd, encoding=encoding)

@public
def replicate(
self,
*,
replication_config: SlingReplicationParam,
dagster_sling_translator: DagsterSlingTranslator,
cmpadden marked this conversation as resolved.
Show resolved Hide resolved
context: Union[OpExecutionContext, AssetExecutionContext],
replication_config: Optional[SlingReplicationParam] = None,
dagster_sling_translator: Optional[DagsterSlingTranslator] = None,
debug: bool = False,
) -> Generator[MaterializeResult, None, None]:
"""Runs a Sling replication from the given replication config.

Args:
replication_config: The Sling replication config to use for the replication.
dagster_sling_translator: The translator to use for the replication.
context: Asset or Op execution context.
debug: Whether to run the replication in debug mode.
cmpadden marked this conversation as resolved.
Show resolved Hide resolved

Returns:
Optional[Generator[MaterializeResult, None, None]]: A generator of MaterializeResult
"""
replication_config = validate_replication(replication_config)
# attempt to retrieve params from asset context if not passed as a parameter
if not (replication_config or dagster_sling_translator):
metadata_by_key = context.assets_def.metadata_by_key
first_asset_metadata = next(iter(metadata_by_key.values()))
dagster_sling_translator = first_asset_metadata.get(METADATA_KEY_TRANSLATOR)
replication_config = first_asset_metadata.get(METADATA_KEY_REPLICATION_CONFIG)

# if translator has not been defined on metadata _or_ through param, then use the default constructor
dagster_sling_translator = dagster_sling_translator or DagsterSlingTranslator()

replication_config = validate_replication(replication_config or {})
cmpadden marked this conversation as resolved.
Show resolved Hide resolved
stream_definition = get_streams_from_replication(replication_config)

with self._setup_config():
Original file line number Diff line number Diff line change
@@ -2,7 +2,9 @@
import sqlite3

import pytest
import yaml
from dagster import (
AssetExecutionContext,
AssetKey,
FreshnessPolicy,
JsonMetadataValue,
@@ -15,6 +17,7 @@
)
from dagster_embedded_elt.sling.dagster_sling_translator import DagsterSlingTranslator
from dagster_embedded_elt.sling.resources import SlingConnectionResource, SlingResource
from path import Path


@pytest.mark.parametrize(
@@ -59,16 +62,12 @@ def my_sling_assets(): ...

def test_runs_base_sling_config(
csv_to_sqlite_replication_config: SlingReplicationParam,
path_to_test_csv: str,
path_to_temp_sqlite_db: str,
sqlite_connection: sqlite3.Connection,
):
@sling_assets(replication_config=csv_to_sqlite_replication_config)
def my_sling_assets(sling: SlingResource):
for row in sling.replicate(
replication_config=csv_to_sqlite_replication_config,
dagster_sling_translator=DagsterSlingTranslator(),
):
def my_sling_assets(context: AssetExecutionContext, sling: SlingResource):
for row in sling.replicate(context=context):
logging.info(row)
cmpadden marked this conversation as resolved.
Show resolved Hide resolved

sling_resource = SlingResource(
@@ -105,11 +104,12 @@ def my_third_sling_assets(): ...


def test_base_with_meta_config_translator():
@sling_assets(
replication_config=file_relative_path(
__file__, "replication_configs/base_with_meta_config/replication.yaml"
)
replication_config_path = file_relative_path(
__file__, "replication_configs/base_with_meta_config/replication.yaml"
)
replication_config = yaml.safe_load(Path(replication_config_path).read_bytes())

@sling_assets(replication_config=replication_config_path)
def my_sling_assets(): ...

assert my_sling_assets.keys == {
@@ -134,7 +134,13 @@ def my_sling_assets(): ...
}

assert my_sling_assets.metadata_by_key == {
AssetKey(["target", "public", "accounts"]): {"stream_config": JsonMetadataValue(data=None)},
AssetKey(["target", "public", "accounts"]): {
"stream_config": JsonMetadataValue(data=None),
"dagster_embedded_elt/dagster_sling_translator": DagsterSlingTranslator(
target_prefix="target"
),
"dagster_embedded_elt/sling_replication_config": replication_config,
},
AssetKey(["target", "departments"]): {
"stream_config": JsonMetadataValue(
data={
@@ -152,7 +158,11 @@ def my_sling_assets(): ...
}
},
}
)
),
"dagster_embedded_elt/dagster_sling_translator": DagsterSlingTranslator(
target_prefix="target"
),
"dagster_embedded_elt/sling_replication_config": replication_config,
},
AssetKey(["target", "public", "transactions"]): {
"stream_config": JsonMetadataValue(
@@ -167,15 +177,23 @@ def my_sling_assets(): ...
}
},
}
)
),
"dagster_embedded_elt/dagster_sling_translator": DagsterSlingTranslator(
target_prefix="target"
),
"dagster_embedded_elt/sling_replication_config": replication_config,
},
AssetKey(["target", "public", "all_users"]): {
"stream_config": JsonMetadataValue(
data={
"sql": 'select all_user_id, name \nfrom public."all_Users"\n',
"object": "public.all_users",
}
)
),
"dagster_embedded_elt/dagster_sling_translator": DagsterSlingTranslator(
target_prefix="target"
),
"dagster_embedded_elt/sling_replication_config": replication_config,
},
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import logging

from dagster import OpExecutionContext, job, op
from dagster_embedded_elt.sling import SlingReplicationParam
from dagster_embedded_elt.sling.resources import SlingConnectionResource, SlingResource


def test_base_sling_config_op(
csv_to_sqlite_replication_config: SlingReplicationParam,
path_to_temp_sqlite_db: str,
):
sling_resource = SlingResource(
connections=[
SlingConnectionResource(type="file", name="SLING_FILE"),
SlingConnectionResource(
type="sqlite",
name="SLING_SQLITE",
connection_string=f"sqlite://{path_to_temp_sqlite_db}",
),
]
)

@op(out={})
def my_sling_op_yield_events(context: OpExecutionContext, sling: SlingResource):
for row in sling.replicate(
context=context, replication_config=csv_to_sqlite_replication_config
):
logging.info(row)
cmpadden marked this conversation as resolved.
Show resolved Hide resolved

@job
def my_sling_op_yield_events_job():
my_sling_op_yield_events()

result = my_sling_op_yield_events_job.execute_in_process(resources={"sling": sling_resource})
assert result.success