Skip to content

Commit

Permalink
[embedded-elt][sling] example of passing translator from decorator us…
Browse files Browse the repository at this point in the history
…ing metadata
  • Loading branch information
cmpadden committed Mar 19, 2024
1 parent 84e712f commit 48b4026
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
from dagster_embedded_elt.sling.dagster_sling_translator import DagsterSlingTranslator
from dagster_embedded_elt.sling.sling_replication import SlingReplicationParam, validate_replication

METADATA_KEY_TRANSLATOR = "dagster_sling/translator"


def get_streams_from_replication(
replication_config: Mapping[str, Any],
Expand Down Expand Up @@ -73,7 +75,7 @@ def sling_assets(
def my_assets(context, sling: SlingResource):
for lines in sling.replicate(
replication_config=config_path,
dagster_sling_translator=DagsterSlingTranslator(),
context=context,
):
context.log.info(lines)
"""
Expand All @@ -88,7 +90,10 @@ def my_assets(context, sling: SlingResource):
key=dagster_sling_translator.get_asset_key(stream),
deps=dagster_sling_translator.get_deps_asset_key(stream),
description=dagster_sling_translator.get_description(stream),
metadata=dagster_sling_translator.get_metadata(stream),
metadata={ # type: ignore
**dagster_sling_translator.get_metadata(stream),
METADATA_KEY_TRANSLATOR: dagster_sling_translator,
},
group_name=dagster_sling_translator.get_group_name(stream),
freshness_policy=dagster_sling_translator.get_freshness_policy(stream),
auto_materialize_policy=dagster_sling_translator.get_auto_materialize_policy(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@
import uuid
from enum import Enum
from subprocess import PIPE, STDOUT, Popen
from typing import IO, Any, AnyStr, Dict, Generator, Iterator, List, Optional
from typing import IO, Any, AnyStr, Dict, Generator, Iterator, List, Optional, Union

import sling
from dagster import (
AssetExecutionContext,
ConfigurableResource,
EnvVar,
MaterializeResult,
OpExecutionContext,
PermissiveConfig,
get_dagster_logger,
)
Expand All @@ -23,7 +25,10 @@
from dagster._utils.warnings import deprecation_warning
from pydantic import Field

from dagster_embedded_elt.sling.asset_decorator import get_streams_from_replication
from dagster_embedded_elt.sling.asset_decorator import (
METADATA_KEY_TRANSLATOR,
get_streams_from_replication,
)
from dagster_embedded_elt.sling.dagster_sling_translator import DagsterSlingTranslator
from dagster_embedded_elt.sling.sling_replication import SlingReplicationParam, validate_replication

Expand Down Expand Up @@ -360,14 +365,14 @@ def replicate(
self,
*,
replication_config: SlingReplicationParam,
dagster_sling_translator: DagsterSlingTranslator,
context: Union[OpExecutionContext, AssetExecutionContext],
debug: bool = False,
) -> Generator[MaterializeResult, None, None]:
"""Runs a Sling replication from the given replication config.
Args:
replication_config: The Sling replication config to use for the replication.
dagster_sling_translator: The translator to use for the replication.
context: Asset or Op execution context.
debug: Whether to run the replication in debug mode.
Returns:
Expand All @@ -376,6 +381,13 @@ def replicate(
replication_config = validate_replication(replication_config)
stream_definition = get_streams_from_replication(replication_config)

# retrieve translator from context
metadata_by_key = context.assets_def.metadata_by_key
first_asset_metadata = next(iter(metadata_by_key.values()))
dagster_sling_translator = first_asset_metadata.get(
METADATA_KEY_TRANSLATOR, DagsterSlingTranslator()
)

with self._setup_config():
uid = uuid.uuid4()
temp_dir = tempfile.gettempdir()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import pytest
from dagster import (
AssetExecutionContext,
AssetKey,
FreshnessPolicy,
JsonMetadataValue,
Expand Down Expand Up @@ -64,10 +65,9 @@ def test_runs_base_sling_config(
sqlite_connection: sqlite3.Connection,
):
@sling_assets(replication_config=csv_to_sqlite_replication_config)
def my_sling_assets(sling: SlingResource):
def my_sling_assets(context: AssetExecutionContext, sling: SlingResource):
for row in sling.replicate(
replication_config=csv_to_sqlite_replication_config,
dagster_sling_translator=DagsterSlingTranslator(),
replication_config=csv_to_sqlite_replication_config, context=context
):
logging.info(row)

Expand Down Expand Up @@ -105,11 +105,11 @@ def my_third_sling_assets(): ...


def test_base_with_meta_config_translator():
@sling_assets(
replication_config=file_relative_path(
__file__, "replication_configs/base_with_meta_config/replication.yaml"
)
replication_config = file_relative_path(
__file__, "replication_configs/base_with_meta_config/replication.yaml"
)

@sling_assets(replication_config=replication_config)
def my_sling_assets(): ...

assert my_sling_assets.keys == {
Expand All @@ -134,7 +134,10 @@ def my_sling_assets(): ...
}

assert my_sling_assets.metadata_by_key == {
AssetKey(["target", "public", "accounts"]): {"stream_config": JsonMetadataValue(data=None)},
AssetKey(["target", "public", "accounts"]): {
"stream_config": JsonMetadataValue(data=None),
"dagster_sling/translator": DagsterSlingTranslator(target_prefix="target"),
},
AssetKey(["target", "departments"]): {
"stream_config": JsonMetadataValue(
data={
Expand All @@ -152,7 +155,8 @@ def my_sling_assets(): ...
}
},
}
)
),
"dagster_sling/translator": DagsterSlingTranslator(target_prefix="target"),
},
AssetKey(["target", "public", "transactions"]): {
"stream_config": JsonMetadataValue(
Expand All @@ -167,15 +171,17 @@ def my_sling_assets(): ...
}
},
}
)
),
"dagster_sling/translator": DagsterSlingTranslator(target_prefix="target"),
},
AssetKey(["target", "public", "all_users"]): {
"stream_config": JsonMetadataValue(
data={
"sql": 'select all_user_id, name \nfrom public."all_Users"\n',
"object": "public.all_users",
}
)
),
"dagster_sling/translator": DagsterSlingTranslator(target_prefix="target"),
},
}

Expand Down

0 comments on commit 48b4026

Please sign in to comment.