Skip to content

Commit

Permalink
Add CLI support (#26261)
Browse files Browse the repository at this point in the history
## Summary & Motivation

As the title says, this PR does two things:

1. Creates a global component registry with some hard-coded components pre-loaded.
2. Adds a template for generating instances of a component.

## How I Tested These Changes

```
dg generate deployment my_deployment
cd my_deployment
dg generate code-location my_location
cd my_location
dg generate component sling_replication file_ingest
```

There is also a unit test that does the same.

## Changelog

NOCHANGELOG
  • Loading branch information
OwenKephart authored Dec 4, 2024
1 parent 4b61c33 commit ba57dc5
Show file tree
Hide file tree
Showing 9 changed files with 59 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,12 @@
from dagster_components.core.component_defs_builder import (
build_defs_from_toplevel_components_folder as build_defs_from_toplevel_components_folder,
)
from dagster_components.impls.pipes_subprocess_script_collection import (
PipesSubprocessScriptCollection,
)
from dagster_components.impls.sling_replication import SlingReplicationComponent

# Package-level registry of the hard-coded component types: maps each
# component type's string name to the class that implements it.
__component_registry__ = {
"pipes_subprocess_script_collection": PipesSubprocessScriptCollection,
"sling_replication": SlingReplicationComponent,
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import click

from dagster_components.core.component import ComponentRegistry
from dagster_components import ComponentRegistry, __component_registry__
from dagster_components.core.deployment import (
CodeLocationProjectContext,
DeploymentProjectContext,
Expand Down Expand Up @@ -69,7 +69,9 @@ def generate_component_type_command(name: str) -> None:
)
sys.exit(1)

context = CodeLocationProjectContext.from_path(Path.cwd(), ComponentRegistry.empty())
context = CodeLocationProjectContext.from_path(
Path.cwd(), ComponentRegistry(__component_registry__)
)
if context.has_component_type(name):
click.echo(click.style(f"A component type named `{name}` already exists.", fg="red"))
sys.exit(1)
Expand All @@ -90,7 +92,9 @@ def generate_component_command(component_type: str, name: str) -> None:
)
sys.exit(1)

context = CodeLocationProjectContext.from_path(Path.cwd(), ComponentRegistry.empty())
context = CodeLocationProjectContext.from_path(
Path.cwd(), ComponentRegistry(__component_registry__)
)
if not context.has_component_type(component_type):
click.echo(
click.style(f"No component type `{component_type}` could be resolved.", fg="red")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import copy
from abc import ABC, abstractmethod
from types import ModuleType
from typing import TYPE_CHECKING, ClassVar, Dict, Iterable, Mapping, Optional, Type
Expand Down Expand Up @@ -36,7 +37,7 @@ def from_decl_node(

class ComponentRegistry:
def __init__(self, components: Dict[str, Type[Component]]):
self._components: Dict[str, Type[Component]] = components
self._components: Dict[str, Type[Component]] = copy.copy(components)

@staticmethod
def empty() -> "ComponentRegistry":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ def __init__(self, dirpath: Path, resource: SlingResource):
self.dirpath = dirpath
self.resource = resource

# Returns the string name for this component type. The value matches the
# "sling_replication" key in the package-level `__component_registry__`,
# which maps that name to SlingReplicationComponent.
@classmethod
def registered_name(cls) -> str:
return "sling_replication"

@classmethod
def from_decl_node(cls, context: ComponentLoadContext, decl_node: ComponentDeclNode) -> Self:
assert isinstance(decl_node, YamlComponentDecl)
Expand All @@ -39,6 +43,6 @@ def generate_files(cls) -> None:
replication_path = Path(os.getcwd()) / "replication.yaml"
with open(replication_path, "w") as f:
yaml.dump(
{"source": None, "target": None, "streams": None},
{"source": {}, "target": {}, "streams": {}},
f,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
component_type: {{ component_type }}
Original file line number Diff line number Diff line change
Expand Up @@ -202,3 +202,19 @@ def test_generate_component_already_exists_fails() -> None:
result = runner.invoke(generate_component_command, ["baz", "qux"])
assert result.exit_code != 0
assert "already exists" in result.output


# Checks that `generate_component_command` can create an instance of the
# "sling_replication" component type (a name present in the package-level
# `__component_registry__`) and that the expected files are written.
def test_generate_global_component_instance() -> None:
runner = CliRunner()
with isolated_example_code_location_bar(runner):
# Generate a "sling_replication" component instance named "file_ingest".
result = runner.invoke(generate_component_command, ["sling_replication", "file_ingest"])
# The command must succeed and create the component instance directory.
assert result.exit_code == 0
assert Path("bar/components/file_ingest").exists()

# defs.yml must exist and record the requested component type.
defs_path = Path("bar/components/file_ingest/defs.yml")
assert defs_path.exists()
assert "component_type: sling_replication" in defs_path.read_text()

# replication.yaml must exist and contain the text "source: ".
replication_path = Path("bar/components/file_ingest/replication.yaml")
assert replication_path.exists()
assert "source: " in replication_path.read_text()
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import pytest
import yaml
from dagster import AssetKey
from dagster._utils.env import environ
from dagster_components.core.component_decl_builder import DefsFileModel
from dagster_components.core.component_defs_builder import (
YamlComponentDecl,
Expand Down Expand Up @@ -35,28 +36,29 @@ def sling_path() -> Generator[Path, None, None]:
the proper temp path.
"""
with tempfile.TemporaryDirectory() as temp_dir:
shutil.copytree(STUB_LOCATION_PATH, temp_dir, dirs_exist_ok=True)
with environ({"HOME": temp_dir}):
shutil.copytree(STUB_LOCATION_PATH, temp_dir, dirs_exist_ok=True)

# update the replication yaml to reference a CSV file in the tempdir
replication_path = Path(temp_dir) / COMPONENT_RELPATH / "replication.yaml"
# update the replication yaml to reference a CSV file in the tempdir
replication_path = Path(temp_dir) / COMPONENT_RELPATH / "replication.yaml"

def _update_replication(data: Dict[str, Any]) -> Mapping[str, Any]:
placeholder_data = data["streams"].pop("<PLACEHOLDER>")
data["streams"][f"file://{temp_dir}/input.csv"] = placeholder_data
return data
def _update_replication(data: Dict[str, Any]) -> Mapping[str, Any]:
placeholder_data = data["streams"].pop("<PLACEHOLDER>")
data["streams"][f"file://{temp_dir}/input.csv"] = placeholder_data
return data

_update_yaml(replication_path, _update_replication)
_update_yaml(replication_path, _update_replication)

# update the defs yaml to add a duckdb instance
defs_path = Path(temp_dir) / COMPONENT_RELPATH / "defs.yml"
# update the defs yaml to add a duckdb instance
defs_path = Path(temp_dir) / COMPONENT_RELPATH / "defs.yml"

def _update_defs(data: Dict[str, Any]) -> Mapping[str, Any]:
data["component_params"]["connections"][0]["instance"] = f"{temp_dir}/duckdb"
return data
def _update_defs(data: Dict[str, Any]) -> Mapping[str, Any]:
data["component_params"]["connections"][0]["instance"] = f"{temp_dir}/duckdb"
return data

_update_yaml(defs_path, _update_defs)
_update_yaml(defs_path, _update_defs)

yield Path(temp_dir)
yield Path(temp_dir)


def test_python_params(sling_path: Path) -> None:
Expand Down
1 change: 0 additions & 1 deletion examples/experimental/dagster-components/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,5 @@ def get_version() -> str:
},
extras_require={
"sling": ["dagster-embedded-elt"],
"test": ["dagster-embedded-elt"],
},
)
3 changes: 2 additions & 1 deletion examples/experimental/dagster-components/tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ deps =
-e ../../../python_modules/dagster[test]
-e ../../../python_modules/dagster-test
-e ../../../python_modules/dagster-pipes
-e .[test]
-e ../../../python_modules/libraries/dagster-embedded-elt
-e .
allowlist_externals =
/bin/bash
uv
Expand Down

0 comments on commit ba57dc5

Please sign in to comment.