Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[embedded-elt][sling] passing translator and replication config from decorator using metadata #20564

Merged
merged 13 commits into from
Mar 22, 2024
Merged
31 changes: 15 additions & 16 deletions docs/content/integrations/embedded-elt.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ Now you can define a Sling asset using the <PyObject module="dagster_embedded_el
Each stream will render two assets, one for the source stream and one for the target destination. You may override how assets are named by passing in a custom <PyObject module="dagster_embedded_elt.sling" object="DagsterSlingTranslator" /> object.

```python file=/integrations/embedded_elt/sling_dagster_translator.py
from dagster_embedded_elt import sling
from dagster_embedded_elt.sling import (
DagsterSlingTranslator,
SlingResource,
Expand All @@ -135,12 +134,12 @@ replication_config = file_relative_path(__file__, "../sling_replication.yaml")
sling_resource = SlingResource(connections=[...]) # Add connections here


@sling_assets(replication_config=replication_config)
@sling_assets(
replication_config=replication_config,
dagster_sling_translator=DagsterSlingTranslator(),
cmpadden marked this conversation as resolved.
Show resolved Hide resolved
)
def my_assets(context, sling: SlingResource):
yield from sling.replicate(
replication_config=replication_config,
dagster_sling_translator=DagsterSlingTranslator(),
)
yield from sling.replicate(context=context)
for row in sling.stream_raw_logs():
context.log.info(row)

Expand Down Expand Up @@ -213,12 +212,12 @@ replication_config = {
}


@sling_assets(replication_config=replication_config)
@sling_assets(
replication_config=replication_config,
dagster_sling_translator=DagsterSlingTranslator(),
)
def my_assets(context, sling: SlingResource):
yield from sling.replicate(
replication_config=replication_config,
dagster_sling_translator=DagsterSlingTranslator(),
)
yield from sling.replicate(context=context)
```

## Example 2: File to Database
Expand Down Expand Up @@ -249,12 +248,12 @@ replication_config = {
}


@sling_assets(replication_config=replication_config)
@sling_assets(
replication_config=replication_config,
dagster_sling_translator=DagsterSlingTranslator(),
cmpadden marked this conversation as resolved.
Show resolved Hide resolved
)
def my_assets(context, sling: SlingResource):
yield from sling.replicate(
replication_config=replication_config,
dagster_sling_translator=DagsterSlingTranslator(),
)
yield from sling.replicate(context=context)
```

---
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@
}


@sling_assets(replication_config=replication_config)
@sling_assets(
replication_config=replication_config,
dagster_sling_translator=DagsterSlingTranslator(),
)
def my_assets(context, sling: SlingResource):
yield from sling.replicate(
replication_config=replication_config,
dagster_sling_translator=DagsterSlingTranslator(),
)
yield from sling.replicate(context=context)
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,12 @@
}


@sling_assets(replication_config=replication_config)
@sling_assets(
replication_config=replication_config,
dagster_sling_translator=DagsterSlingTranslator(),
)
def my_assets(context, sling: SlingResource):
yield from sling.replicate(
replication_config=replication_config,
dagster_sling_translator=DagsterSlingTranslator(),
)
yield from sling.replicate(context=context)


# end_storage_config
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from dagster_embedded_elt import sling
from dagster_embedded_elt.sling import (
DagsterSlingTranslator,
SlingResource,
Expand All @@ -11,12 +10,12 @@
sling_resource = SlingResource(connections=[...]) # Add connections here


@sling_assets(replication_config=replication_config)
@sling_assets(
replication_config=replication_config,
dagster_sling_translator=DagsterSlingTranslator(),
)
def my_assets(context, sling: SlingResource):
yield from sling.replicate(
replication_config=replication_config,
dagster_sling_translator=DagsterSlingTranslator(),
)
yield from sling.replicate(context=context)
for row in sling.stream_raw_logs():
context.log.info(row)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@
)


@sling_assets(replication_config=replication_config)
@sling_assets(
replication_config=replication_config,
dagster_sling_translator=DagsterSlingTranslator(),
)
def my_assets(context, sling: SlingResource):
yield from sling.replicate(
replication_config=replication_config,
dagster_sling_translator=DagsterSlingTranslator(),
)
yield from sling.replicate(context=context)
for row in sling.stream_raw_logs():
context.log.info(row)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
from dagster_embedded_elt.sling.dagster_sling_translator import DagsterSlingTranslator
from dagster_embedded_elt.sling.sling_replication import SlingReplicationParam, validate_replication

METADATA_KEY_TRANSLATOR = "dagster_sling/translator"
METADATA_KEY_REPLICATION_CONFIG = "dagster_sling/replication_config"
cmpadden marked this conversation as resolved.
Show resolved Hide resolved


def get_streams_from_replication(
replication_config: Mapping[str, Any],
Expand Down Expand Up @@ -71,10 +74,7 @@ def sling_assets(
config_path = "/path/to/replication.yaml"
@sling_assets(replication_config=config_path)
def my_assets(context, sling: SlingResource):
for lines in sling.replicate(
replication_config=config_path,
dagster_sling_translator=DagsterSlingTranslator(),
):
for lines in sling.replicate(context=context):
cmpadden marked this conversation as resolved.
Show resolved Hide resolved
context.log.info(lines)
"""
replication_config = validate_replication(replication_config)
Expand All @@ -88,7 +88,11 @@ def my_assets(context, sling: SlingResource):
key=dagster_sling_translator.get_asset_key(stream),
deps=dagster_sling_translator.get_deps_asset_key(stream),
description=dagster_sling_translator.get_description(stream),
metadata=dagster_sling_translator.get_metadata(stream),
metadata={ # type: ignore
rexledesma marked this conversation as resolved.
Show resolved Hide resolved
**dagster_sling_translator.get_metadata(stream),
METADATA_KEY_TRANSLATOR: dagster_sling_translator,
METADATA_KEY_REPLICATION_CONFIG: replication_config,
cmpadden marked this conversation as resolved.
Show resolved Hide resolved
},
group_name=dagster_sling_translator.get_group_name(stream),
freshness_policy=dagster_sling_translator.get_freshness_policy(stream),
auto_materialize_policy=dagster_sling_translator.get_auto_materialize_policy(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,28 @@
import uuid
from enum import Enum
from subprocess import PIPE, STDOUT, Popen
from typing import IO, Any, AnyStr, Dict, Generator, Iterator, List, Optional
from typing import IO, Any, AnyStr, Dict, Generator, Iterator, List, Optional, Union

import sling
from dagster import (
AssetExecutionContext,
ConfigurableResource,
EnvVar,
MaterializeResult,
OpExecutionContext,
PermissiveConfig,
get_dagster_logger,
)
from dagster._annotations import deprecated, experimental, public
from dagster._annotations import deprecated, deprecated_param, experimental, public
from dagster._utils.env import environ
from dagster._utils.warnings import deprecation_warning
from pydantic import Field

from dagster_embedded_elt.sling.asset_decorator import get_streams_from_replication
from dagster_embedded_elt.sling.asset_decorator import (
METADATA_KEY_REPLICATION_CONFIG,
METADATA_KEY_TRANSLATOR,
get_streams_from_replication,
)
from dagster_embedded_elt.sling.dagster_sling_translator import DagsterSlingTranslator
from dagster_embedded_elt.sling.sling_replication import SlingReplicationParam, validate_replication

Expand Down Expand Up @@ -356,23 +362,50 @@ def sync(
yield from self._exec_sling_cmd(cmd, encoding=encoding)

@public
@deprecated_param(
param="dagster_sling_translator",
breaking_version="2.0",
additional_warn_text="Param is only required in `sling_assets` decorator.",
)
@deprecated_param(
param="replication_config",
breaking_version="2.0",
additional_warn_text="Param is only required in `sling_assets` decorator.",
)
cmpadden marked this conversation as resolved.
Show resolved Hide resolved
def replicate(
self,
*,
replication_config: SlingReplicationParam,
dagster_sling_translator: DagsterSlingTranslator,
cmpadden marked this conversation as resolved.
Show resolved Hide resolved
context: Union[OpExecutionContext, AssetExecutionContext],
replication_config: Optional[SlingReplicationParam] = None,
dagster_sling_translator: Optional[DagsterSlingTranslator] = None,
debug: bool = False,
) -> Generator[MaterializeResult, None, None]:
"""Runs a Sling replication from the given replication config.

Args:
replication_config: The Sling replication config to use for the replication.
dagster_sling_translator: The translator to use for the replication.
context: Asset or Op execution context.
debug: Whether to run the replication in debug mode.
cmpadden marked this conversation as resolved.
Show resolved Hide resolved

Returns:
Generator[MaterializeResult, None, None]: A generator of MaterializeResult
"""
# retrieve decorator params from context
metadata_by_key = context.assets_def.metadata_by_key
first_asset_metadata = next(iter(metadata_by_key.values()))

dagster_sling_translator = first_asset_metadata.get(METADATA_KEY_TRANSLATOR)
if dagster_sling_translator is None:
raise Exception(
f"`DagsterSlingTranslator` must be defined on metadata at {METADATA_KEY_TRANSLATOR}"
)

replication_config = first_asset_metadata.get(METADATA_KEY_REPLICATION_CONFIG)
if replication_config is None:
raise Exception(
f"`ReplicationConfig` must be defined on metadata at {METADATA_KEY_REPLICATION_CONFIG}"
cmpadden marked this conversation as resolved.
Show resolved Hide resolved
)

replication_config = validate_replication(replication_config)
stream_definition = get_streams_from_replication(replication_config)

Expand Down
Loading