Skip to content

Commit

Permalink
implement pr review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
cmpadden committed Mar 21, 2024
1 parent a286b9f commit 28e303f
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 188 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
from dagster_embedded_elt.sling.dagster_sling_translator import DagsterSlingTranslator
from dagster_embedded_elt.sling.sling_replication import SlingReplicationParam, validate_replication

METADATA_KEY_TRANSLATOR = "dagster_sling/translator"
METADATA_KEY_REPLICATION_CONFIG = "dagster_sling/replication_config"
METADATA_KEY_TRANSLATOR = "dagster_embedded_elt/dagster_sling_translator"
METADATA_KEY_REPLICATION_CONFIG = "dagster_embedded_elt/sling_replication_config"


def get_streams_from_replication(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,15 +387,9 @@ def replicate(
replication_config = first_asset_metadata.get(METADATA_KEY_REPLICATION_CONFIG)

# if translator has not been defined on metadata _or_ through param, then use the default constructor
if not dagster_sling_translator:
dagster_sling_translator = DagsterSlingTranslator()
dagster_sling_translator = dagster_sling_translator or DagsterSlingTranslator()

if replication_config is None:
raise Exception(
f"`ReplicationConfig` must be defined on metadata at {METADATA_KEY_REPLICATION_CONFIG}"
)

replication_config = validate_replication(replication_config)
replication_config = validate_replication(replication_config or {})
stream_definition = get_streams_from_replication(replication_config)

with self._setup_config():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import sqlite3

import pytest
import yaml
from dagster import (
AssetExecutionContext,
AssetKey,
Expand All @@ -16,6 +17,7 @@
)
from dagster_embedded_elt.sling.dagster_sling_translator import DagsterSlingTranslator
from dagster_embedded_elt.sling.resources import SlingConnectionResource, SlingResource
from path import Path


@pytest.mark.parametrize(
Expand Down Expand Up @@ -102,11 +104,12 @@ def my_third_sling_assets(): ...


def test_base_with_meta_config_translator():
replication_config = file_relative_path(
replication_config_path = file_relative_path(
__file__, "replication_configs/base_with_meta_config/replication.yaml"
)
replication_config = yaml.safe_load(Path(replication_config_path).read_bytes())

@sling_assets(replication_config=replication_config)
@sling_assets(replication_config=replication_config_path)
def my_sling_assets(): ...

assert my_sling_assets.keys == {
Expand All @@ -133,50 +136,10 @@ def my_sling_assets(): ...
assert my_sling_assets.metadata_by_key == {
AssetKey(["target", "public", "accounts"]): {
"stream_config": JsonMetadataValue(data=None),
"dagster_sling/translator": DagsterSlingTranslator(target_prefix="target"),
"dagster_sling/replication_config": {
"source": "MY_SOURCE",
"target": "MY_TARGET",
"defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"},
"streams": {
"public.accounts": None,
"public.users": {
"disabled": True,
"meta": {"dagster": {"asset_key": "public.foo_users"}},
},
"public.finance_departments_old": {
"object": "departments",
"source_options": {"empty_as_null": False},
"meta": {
"dagster": {
"deps": ["foo_one", "foo_two"],
"group": "group_2",
"freshness_policy": {
"maximum_lag_minutes": 0,
"cron_schedule": "5 4 * * *",
"cron_schedule_timezone": "UTC",
},
}
},
},
'public."Transactions"': {
"mode": "incremental",
"primary_key": "id",
"update_key": "last_updated_at",
"meta": {
"dagster": {
"description": "Example Description!",
"auto_materialize_policy": True,
}
},
},
"public.all_users": {
"sql": 'select all_user_id, name \nfrom public."all_Users"\n',
"object": "public.all_users",
},
},
"env": {"SLING_LOADED_AT_COLUMN": True, "SLING_STREAM_URL_COLUMN": True},
},
"dagster_embedded_elt/dagster_sling_translator": DagsterSlingTranslator(
target_prefix="target"
),
"dagster_embedded_elt/sling_replication_config": replication_config,
},
AssetKey(["target", "departments"]): {
"stream_config": JsonMetadataValue(
Expand All @@ -196,50 +159,10 @@ def my_sling_assets(): ...
},
}
),
"dagster_sling/translator": DagsterSlingTranslator(target_prefix="target"),
"dagster_sling/replication_config": {
"source": "MY_SOURCE",
"target": "MY_TARGET",
"defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"},
"streams": {
"public.accounts": None,
"public.users": {
"disabled": True,
"meta": {"dagster": {"asset_key": "public.foo_users"}},
},
"public.finance_departments_old": {
"object": "departments",
"source_options": {"empty_as_null": False},
"meta": {
"dagster": {
"deps": ["foo_one", "foo_two"],
"group": "group_2",
"freshness_policy": {
"maximum_lag_minutes": 0,
"cron_schedule": "5 4 * * *",
"cron_schedule_timezone": "UTC",
},
}
},
},
'public."Transactions"': {
"mode": "incremental",
"primary_key": "id",
"update_key": "last_updated_at",
"meta": {
"dagster": {
"description": "Example Description!",
"auto_materialize_policy": True,
}
},
},
"public.all_users": {
"sql": 'select all_user_id, name \nfrom public."all_Users"\n',
"object": "public.all_users",
},
},
"env": {"SLING_LOADED_AT_COLUMN": True, "SLING_STREAM_URL_COLUMN": True},
},
"dagster_embedded_elt/dagster_sling_translator": DagsterSlingTranslator(
target_prefix="target"
),
"dagster_embedded_elt/sling_replication_config": replication_config,
},
AssetKey(["target", "public", "transactions"]): {
"stream_config": JsonMetadataValue(
Expand All @@ -255,50 +178,10 @@ def my_sling_assets(): ...
},
}
),
"dagster_sling/translator": DagsterSlingTranslator(target_prefix="target"),
"dagster_sling/replication_config": {
"source": "MY_SOURCE",
"target": "MY_TARGET",
"defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"},
"streams": {
"public.accounts": None,
"public.users": {
"disabled": True,
"meta": {"dagster": {"asset_key": "public.foo_users"}},
},
"public.finance_departments_old": {
"object": "departments",
"source_options": {"empty_as_null": False},
"meta": {
"dagster": {
"deps": ["foo_one", "foo_two"],
"group": "group_2",
"freshness_policy": {
"maximum_lag_minutes": 0,
"cron_schedule": "5 4 * * *",
"cron_schedule_timezone": "UTC",
},
}
},
},
'public."Transactions"': {
"mode": "incremental",
"primary_key": "id",
"update_key": "last_updated_at",
"meta": {
"dagster": {
"description": "Example Description!",
"auto_materialize_policy": True,
}
},
},
"public.all_users": {
"sql": 'select all_user_id, name \nfrom public."all_Users"\n',
"object": "public.all_users",
},
},
"env": {"SLING_LOADED_AT_COLUMN": True, "SLING_STREAM_URL_COLUMN": True},
},
"dagster_embedded_elt/dagster_sling_translator": DagsterSlingTranslator(
target_prefix="target"
),
"dagster_embedded_elt/sling_replication_config": replication_config,
},
AssetKey(["target", "public", "all_users"]): {
"stream_config": JsonMetadataValue(
Expand All @@ -307,50 +190,10 @@ def my_sling_assets(): ...
"object": "public.all_users",
}
),
"dagster_sling/translator": DagsterSlingTranslator(target_prefix="target"),
"dagster_sling/replication_config": {
"source": "MY_SOURCE",
"target": "MY_TARGET",
"defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"},
"streams": {
"public.accounts": None,
"public.users": {
"disabled": True,
"meta": {"dagster": {"asset_key": "public.foo_users"}},
},
"public.finance_departments_old": {
"object": "departments",
"source_options": {"empty_as_null": False},
"meta": {
"dagster": {
"deps": ["foo_one", "foo_two"],
"group": "group_2",
"freshness_policy": {
"maximum_lag_minutes": 0,
"cron_schedule": "5 4 * * *",
"cron_schedule_timezone": "UTC",
},
}
},
},
'public."Transactions"': {
"mode": "incremental",
"primary_key": "id",
"update_key": "last_updated_at",
"meta": {
"dagster": {
"description": "Example Description!",
"auto_materialize_policy": True,
}
},
},
"public.all_users": {
"sql": 'select all_user_id, name \nfrom public."all_Users"\n',
"object": "public.all_users",
},
},
"env": {"SLING_LOADED_AT_COLUMN": True, "SLING_STREAM_URL_COLUMN": True},
},
"dagster_embedded_elt/dagster_sling_translator": DagsterSlingTranslator(
target_prefix="target"
),
"dagster_embedded_elt/sling_replication_config": replication_config,
},
}

Expand Down

0 comments on commit 28e303f

Please sign in to comment.