diff --git a/examples/cli/README.md b/examples/cli/README.md index 7883474da..346d905d2 100644 --- a/examples/cli/README.md +++ b/examples/cli/README.md @@ -13,13 +13,28 @@ Test the installation with # Features -Currently 4 commands: - +## Commands - `build`: creates a Hamilton `Driver` from specified modules. It's useful to validate the dataflow definition +- `validate`: calls `Driver.validate_execution()` for a set of `inputs` and `overrides` passed through the `--context` option. - `view`: calls `dr.display_all_functions()` on the built `Driver` - `version`: generates node hashes based on their source code, and a dataflow hash from the collection of node hashes. - `diff`: get a diff of added/deleted/edited nodes between the current version of Python modules and another git reference (`default=HEAD`, i.e., the last committed version). You can get a visualization of the diffs +## Options +- all commands receive `MODULES` which is a list of paths to Python modules to assemble as a single dataflow +- all commands receive `--context` (`-ctx`), which is a file (`.py` or `.json`) that includes top-level headers (see `config.py` and `config.json` in this repo for example): + - `HAMILTON_CONFIG`: `typing.Mapping` passed to `driver.Builder.with_config()` + - `HAMILTON_FINAL_VARS`: `typing.Sequence` passed to `Driver.validate_execution(final_vars=...)` + - `HAMILTON_INPUTS`: `typing.Mapping` passed to `Driver.validate_execution(inputs=...)` + - `HAMILTON_OVERRIDES`: `typing.Mapping` passed to `Driver.validate_execution(overrides=...)` +- Using a `.py` context file provides more flexibility than `.json` to define inputs and overrides objects. +- all commands receive a `--name` (`-n`), which is used to name the output file (when the command produces a file). If `None`, a file name will be derived from the `MODULES` argument. 
+- When using a command that generates a file: + - passing a file path: will output the file with this name at this location + - passing a directory: will output the file with the `--name` value (either explicit or default derived from `MODULES`) at this location + - passing a file path with the name `default`: will output the file with the name replaced by `--name` value at this location. This is useful when you need to specify a type via filename. For example, `hamilton view -o /path/to/default.pdf my_dataflow.py` will create the file `/path/to/my_dataflow.pdf`. (This behavior may change) + + See [DOCS.md](./DOCS.md) for the full references # Usage diff --git a/examples/cli/config.json b/examples/cli/config.json new file mode 100644 index 000000000..7d492b64f --- /dev/null +++ b/examples/cli/config.json @@ -0,0 +1,6 @@ +{ + "HAMILTON_CONFIG": { + "holiday": "halloween" + }, + "HAMILTON_FINAL_VARS": ["customers_df", "customer_summary_table"] +} diff --git a/examples/cli/config.py b/examples/cli/config.py new file mode 100644 index 000000000..80ec6627f --- /dev/null +++ b/examples/cli/config.py @@ -0,0 +1,3 @@ +HAMILTON_CONFIG = dict(config_exists="true") + +HAMILTON_FINAL_VARS = ["config_when", "customer_summary_table"] diff --git a/examples/cli/module_v1.py b/examples/cli/module_v1.py index 5102bcc08..2fc7333fa 100644 --- a/examples/cli/module_v1.py +++ b/examples/cli/module_v1.py @@ -1,9 +1,16 @@ import pandas as pd -from hamilton.function_modifiers import extract_columns +from hamilton.function_modifiers import config, extract_columns -def customers_df(customers_path: str = "customers.csv") -> pd.DataFrame: +@config.when(holiday="halloween") +def customers_df__halloween() -> pd.DataFrame: + """Example of using @config.when function modifier""" + return pd.read_csv("/path/to/halloween/customers.csv") + + +@config.when_not(holiday="halloween") +def customers_df__default(customers_path: str = "customers.csv") -> pd.DataFrame: """Load the customer dataset.""" 
return pd.read_csv(customers_path) diff --git a/hamilton/cli/__main__.py b/hamilton/cli/__main__.py index b5ad2b29c..a7c7acc03 100644 --- a/hamilton/cli/__main__.py +++ b/hamilton/cli/__main__.py @@ -5,7 +5,7 @@ import warnings from pathlib import Path from pprint import pprint -from typing import Any, List, Optional +from typing import Any, Callable, List, Optional if sys.version_info < (3, 9): from typing_extensions import Annotated @@ -36,18 +36,59 @@ class CliState: verbose: Optional[bool] = None json_out: Optional[bool] = None dr: Optional[driver.Driver] = None - dataflow_version: Optional[dict] = None + name: Optional[str] = None cli = typer.Typer(rich_markup_mode="rich") state = CliState() -# TODO add `validate` command to call `Driver.validate()` +MODULES_ANNOTATIONS = Annotated[ + List[Path], + typer.Argument( + help="Paths to Hamilton modules", + exists=True, + dir_okay=False, + readable=True, + resolve_path=True, + ), +] + +NAME_ANNOTATIONS = Annotated[ + Optional[str], + typer.Option("--name", "-n", help="Name of the dataflow. Default: Derived from MODULES."), +] + +CONTEXT_ANNOTATIONS = Annotated[ + Optional[Path], + typer.Option( + "--context", + "-ctx", + help="Path to Driver context file [.json, .py]", + exists=True, + dir_okay=False, + readable=True, + resolve_path=True, + ), +] + +VIZ_OUTPUT_ANNOTATIONS = Annotated[ + Path, + typer.Option( + "--output", + "-o", + help="Output path of visualization. 
If path is a directory, use NAME for file name.", + dir_okay=True, + writable=True, + resolve_path=True, + ), +] + + # TODO add `experiments` for `hamilton.plugins.h_experiments` # TODO add `dataflows` submenu to manage locally installed dataflows # TODO add `init` to load project template -# entrypoint for `hamilton` without command +# callback() creates entrypoint for `hamilton` without command @cli.callback() def main( ctx: typer.Context, @@ -75,60 +116,63 @@ def main( logger.debug(f"json_out set to {json_out}") -@cli.command() -def build( - ctx: typer.Context, - modules: Annotated[ - List[Path], - typer.Argument( - help="Paths to Hamilton modules", - exists=True, - dir_okay=False, - readable=True, - resolve_path=True, - ), - ], -): - """Build a single Driver with MODULES""" +def _try_command(cmd: Callable, **cmd_kwargs) -> Any: + """Try a command and raise errors to Typer and exit CLI""" + cmd_name = cmd.__name__ try: - config = dict() - logger.debug("calling commands.build()") - state.dr = commands.build(modules=modules, config=config) + logger.debug(f"calling commands.{cmd_name}") + result = cmd(**cmd_kwargs) except Exception as e: response = Response( - command="build", success=False, message={"error": str(type(e)), "details": str(e)} + command=cmd_name, success=False, message={"error": str(type(e)), "details": str(e)} ) - logger.error(f"`hamilton build` failed: {dataclasses.asdict(response)}") - print(json.dumps(dataclasses.asdict(response))) + logger.error(dataclasses.asdict(response)) raise typer.Exit(code=1) - response = Response( - command="build", success=True, message={"modules": [p.stem for p in modules]} - ) + return result - logger.debug(f"`hamilton build` succeeded: {dataclasses.asdict(response)}") - if (ctx.info_name == "build") or state.verbose: + +def _response_handler(ctx: typer.Context, response: Response) -> None: + """Handle how to display response""" + if (ctx.info_name == response.command) or state.verbose: if state.json_out is True: 
print(json.dumps(dataclasses.asdict(response))) else: pprint(response.message) -# TODO add option to output diff of nodes and diff of functions -# since the function diff is what's useful for code reviews / debugging @cli.command() -def diff( +def build( ctx: typer.Context, - modules: Annotated[ - List[Path], - typer.Argument( - help="Paths to Hamilton modules", - exists=True, - dir_okay=False, - readable=True, - resolve_path=True, + modules: MODULES_ANNOTATIONS, + name: NAME_ANNOTATIONS = None, + context_path: CONTEXT_ANNOTATIONS = None, +): + """Build a single Driver with MODULES""" + state.dr = _try_command(cmd=commands.build, modules=modules, context_path=context_path) + + if name: + state.name = name + else: + state.name = "_".join([str(Path(m).stem) for m in modules])[:40] + + _response_handler( + ctx=ctx, + response=Response( + command="build", + success=True, + message={"modules": [p.stem for p in modules]}, ), - ], + ) + + +@cli.command() +def diff( + ctx: typer.Context, + modules: MODULES_ANNOTATIONS, + name: NAME_ANNOTATIONS = None, + context_path: CONTEXT_ANNOTATIONS = None, + output_file_path: VIZ_OUTPUT_ANNOTATIONS = Path("./"), git_reference: Annotated[ str, typer.Option( @@ -143,149 +187,101 @@ def diff( help="Generate a dataflow diff visualization", ), ] = False, - output_file_path: Annotated[ - Path, - typer.Option( - "--output", - "-o", - help="File path of visualization", - exists=False, - dir_okay=False, - readable=True, - resolve_path=True, - ), - ] = "diff.png", ): """Diff between the current MODULES and their specified GIT_REFERENCE""" if state.dr is None: - ctx.invoke(version, ctx=ctx, modules=modules) + ctx.invoke(version, ctx=ctx, modules=modules, name=name, context_path=context_path) + + # default value isn't set to None to let Typer properly resolve the path + # then, we change the file name + if output_file_path.is_dir(): + output_file_path.mkdir(parents=True, exist_ok=True) + output_file_path = 
output_file_path.joinpath(f"diff_{state.name}.png") + + diff = _try_command( + cmd=commands.diff, + current_dr=state.dr, + modules=modules, + git_reference=git_reference, + view=view, + output_file_path=output_file_path, + context_path=context_path, + ) + _response_handler( + ctx=ctx, + response=Response( + command="diff", + success=True, + message=diff, + ), + ) - try: - logger.debug("calling commands.diff()") - diff = commands.diff( - current_dr=state.dr, - modules=modules, - git_reference=git_reference, - view=view, - output_file_path=output_file_path, - config=None, - ) - except Exception as e: - response = Response( - command="diff", success=False, message={"error": str(type(e)), "details": str(e)} - ) - logger.error(f"`hamilton diff` failed: {dataclasses.asdict(response)}") - print(json.dumps(dataclasses.asdict(response))) - raise typer.Exit(code=1) - response = Response( - command="diff", - success=True, - message=diff, +@cli.command() +def validate( + ctx: typer.Context, + modules: MODULES_ANNOTATIONS, + context_path: CONTEXT_ANNOTATIONS, + name: NAME_ANNOTATIONS = None, +): + """Validate DATAFLOW execution for the given CONTEXT""" + if state.dr is None: + ctx.invoke(build, ctx=ctx, modules=modules, name=name, context_path=context_path) + + validated_context = _try_command(commands.validate, dr=state.dr, context_path=context_path) + _response_handler( + ctx=ctx, + response=Response( + command="validate", + success=True, + message=validated_context, + ), ) - logger.debug(f"`hamilton diff` succeeded: {dataclasses.asdict(response)}") - if (ctx.info_name == "diff") or state.verbose: - if state.json_out is True: - print(json.dumps(dataclasses.asdict(response))) - else: - pprint(response.message) - @cli.command() def version( ctx: typer.Context, - modules: Annotated[ - List[Path], - typer.Argument( - help="Paths to Hamilton modules", - exists=True, - dir_okay=False, - readable=True, - resolve_path=True, - ), - ], + modules: MODULES_ANNOTATIONS, + name: 
NAME_ANNOTATIONS = None, + context_path: CONTEXT_ANNOTATIONS = None, ): """Version NODES and DATAFLOW from dataflow with MODULES""" if state.dr is None: - ctx.invoke(build, ctx=ctx, modules=modules) - - try: - logger.debug("calling commands.version()") - dataflow_version = commands.version(state.dr) - except Exception as e: - response = Response( - command="version", success=False, message={"error": str(type(e)), "details": str(e)} - ) - logger.error(f"`hamilton version` failed: {dataclasses.asdict(response)}") - print(json.dumps(dataclasses.asdict(response))) - raise typer.Exit(code=1) - - response = Response( - command="version", - success=True, - message=dataflow_version, + ctx.invoke(build, ctx=ctx, modules=modules, name=name, context_path=context_path) + + dataflow_version = _try_command(cmd=commands.version, dr=state.dr) + _response_handler( + ctx=ctx, + response=Response( + command="version", + success=True, + message=dataflow_version, + ), ) - logger.debug(f"`hamilton version` succeeded: {dataclasses.asdict(response)}") - if (ctx.info_name == "version") or state.verbose: - if state.json_out is True: - json_str = json.dumps(dataclasses.asdict(response)) - print(json_str) - else: - pprint(response.message) - @cli.command() def view( ctx: typer.Context, - modules: Annotated[ - List[Path], - typer.Argument( - help="Paths to Hamilton modules", - exists=True, - dir_okay=False, - readable=True, - resolve_path=True, - ), - ], - output_file_path: Annotated[ - Path, - typer.Option( - "--output", - "-o", - help="File path of visualization", - exists=False, - dir_okay=False, - readable=True, - resolve_path=True, - ), - ] = "./dag.png", + modules: MODULES_ANNOTATIONS, + name: NAME_ANNOTATIONS = None, + context_path: CONTEXT_ANNOTATIONS = None, + output_file_path: VIZ_OUTPUT_ANNOTATIONS = Path("./"), ): """Build and visualize dataflow with MODULES""" if state.dr is None: - # TODO add config - ctx.invoke(build, ctx=ctx, modules=modules) + ctx.invoke(build, ctx=ctx, 
modules=modules, name=name, context_path=context_path) - try: - logger.debug("calling commands.view()") - commands.view(dr=state.dr, output_file_path=output_file_path) - except Exception as e: - response = Response( - command="view", success=False, message={"error": str(type(e)), "details": str(e)} - ) - logger.error(f"`hamilton view` failed: {dataclasses.asdict(response)}") - print(json.dumps(dataclasses.asdict(response))) - raise typer.Exit(code=1) + if output_file_path.is_dir(): + output_file_path.mkdir(parents=True, exist_ok=True) + output_file_path = output_file_path.joinpath(f"dag_{state.name}.png") - response = Response(command="view", success=True, message={"path": str(output_file_path)}) - - logger.debug(f"`hamilton view` succeeded: {dataclasses.asdict(response)}") - if (ctx.info_name == "view") or state.verbose: - if state.json_out is True: - print(json.dumps(dataclasses.asdict(response))) - else: - pprint(response.message) + _try_command(cmd=commands.view, dr=state.dr, output_file_path=output_file_path) + _response_handler( + ctx=ctx, + response=Response(command="view", success=True, message={"path": str(output_file_path)}), + ) if __name__ == "__main__": diff --git a/hamilton/cli/commands.py b/hamilton/cli/commands.py index 98fb5a715..9b765b3ca 100644 --- a/hamilton/cli/commands.py +++ b/hamilton/cli/commands.py @@ -5,14 +5,21 @@ from hamilton.cli import logic -def build(modules: List[Path], config: Optional[dict] = None): - config = config if config else {} +def build(modules: List[Path], context_path: Optional[Path] = None): + """Build a Hamilton driver from the passed modules, and + load the Driver config from the context file. + + Dynamic execution is enabled by default to support dataflow + using Parallelizable/Collect. This only matters if we are to + execute code. 
+ """ + context = logic.load_context(context_path) if context_path else {} module_objects = [ad_hoc_utils.module_from_source(p.read_text()) for p in modules] return ( driver.Builder() .enable_dynamic_execution(allow_experimental_mode=True) .with_modules(*module_objects) - .with_config(config) + .with_config(context.get("HAMILTON_CONFIG", {})) .build() ) @@ -23,9 +30,10 @@ def diff( git_reference: Optional[str] = "HEAD", view: bool = False, output_file_path: Path = Path("./diff.png"), - config: Optional[dict] = None, + context_path: Optional[Path] = None, ) -> dict: - config = config if config else {} + """Get the diff of""" + context = logic.load_context(context_path) if context_path else {} current_version = logic.hash_hamilton_nodes(current_dr) current_node_to_func = logic.map_nodes_to_functions(current_dr) @@ -35,7 +43,7 @@ def diff( driver.Builder() .enable_dynamic_execution(allow_experimental_mode=True) .with_modules(*reference_modules) - .with_config(config) + .with_config(context.get("HAMILTON_CONFIG", {})) .build() ) reference_version = logic.hash_hamilton_nodes(reference_dr) @@ -71,7 +79,24 @@ def diff( return full_diff +def validate(dr: driver.Driver, context_path: Path) -> dict: + """Use driver.validate_execution() with values from the context file""" + context = logic.load_context(context_path) + + try: + dr.validate_execution( + final_vars=context["HAMILTON_FINAL_VARS"], + inputs=context["HAMILTON_INPUTS"], + overrides=context["HAMILTON_OVERRIDES"], + ) + except ValueError as e: + raise e + + return context + + def version(dr: driver.Driver) -> dict: + """Get the node and dataflow versions from the instantiated Driver""" nodes_hash = logic.hash_hamilton_nodes(dr) dataflow_hash = logic.hash_dataflow(nodes_hash) return dict( @@ -80,5 +105,6 @@ def version(dr: driver.Driver) -> dict: ) -def view(dr: driver.Driver, output_file_path: str) -> None: +def view(dr: driver.Driver, output_file_path: Path = Path("dag.png")) -> None: + """Display all functions of 
the instantiated Driver""" dr.display_all_functions(output_file_path) diff --git a/hamilton/cli/logic.py b/hamilton/cli/logic.py index 9591fe6a4..3050e4f64 100644 --- a/hamilton/cli/logic.py +++ b/hamilton/cli/logic.py @@ -1,15 +1,19 @@ -import functools -import hashlib -import subprocess from pathlib import Path from types import ModuleType from typing import Dict, List, Union -from hamilton import ad_hoc_utils, driver, graph, graph_types, graph_utils +from hamilton import driver + +CONFIG_HEADER = "HAMILTON_CONFIG" +FINAL_VARS_HEADER = "HAMILTON_FINAL_VARS" +INPUTS_HEADER = "HAMILTON_INPUTS" +OVERRIDES_HEADER = "HAMILTON_OVERRIDES" def get_git_base_directory() -> str: """Get the base path of the current git directory""" + import subprocess + try: result = subprocess.run( ["git", "rev-parse", "--show-toplevel"], @@ -22,12 +26,15 @@ def get_git_base_directory() -> str: return result.stdout.strip() else: print("Error:", result.stderr.strip()) + raise OSError(f"{result.stderr.strip()}") except FileNotFoundError: raise FileNotFoundError("Git command not found. 
Please make sure Git is installed.") def get_git_reference(git_relative_path: Union[str, Path], git_reference: str) -> str: """Get the source code from the specified file and git reference""" + import subprocess + try: result = subprocess.run( ["git", "show", f"{git_reference}:{git_relative_path}"], @@ -50,6 +57,8 @@ def get_git_reference(git_relative_path: Union[str, Path], git_reference: str) - def version_hamilton_functions(module: ModuleType) -> Dict[str, str]: """Hash the source code of Hamilton functions from a module""" + from hamilton import graph_utils + origins_version: Dict[str, str] = dict() for origin_name, _ in graph_utils.find_functions(module): @@ -61,10 +70,16 @@ def version_hamilton_functions(module: ModuleType) -> Dict[str, str]: def hash_hamilton_nodes(dr: driver.Driver) -> Dict[str, str]: """Hash the source code of Hamilton functions from nodes in a Driver""" + from hamilton import graph_types, graph_utils + graph = graph_types.HamiltonGraph.from_graph(dr.graph) nodes_version = dict() for n in graph.nodes: + # is None for config nodes + if n.originating_functions is None: + continue + node_origin = n.originating_functions[0] origin_hash = graph_utils.hash_source_code(node_origin, strip=True) nodes_version[n.name] = origin_hash @@ -74,10 +89,16 @@ def hash_hamilton_nodes(dr: driver.Driver) -> Dict[str, str]: def map_nodes_to_functions(dr: driver.Driver) -> Dict[str, str]: """Get a mapping from node name to Hamilton function name""" + from hamilton import graph_types + graph = graph_types.HamiltonGraph.from_graph(dr.graph) node_to_function = dict() for n in graph.nodes: + # is None for config nodes + if n.originating_functions is None: + continue + node_callable = n.originating_functions[0] node_to_function[n.name] = node_callable.__name__ @@ -86,6 +107,8 @@ def map_nodes_to_functions(dr: driver.Driver) -> Dict[str, str]: def hash_dataflow(nodes_version: Dict[str, str]) -> str: """Create a dataflow hash from the hashes of its nodes""" + import 
hashlib + sorted_nodes = sorted(nodes_version.values()) return hashlib.sha256(str(sorted_nodes).encode()).hexdigest() @@ -94,6 +117,8 @@ def load_modules_from_git( module_paths: List[Path], git_reference: str = "HEAD" ) -> List[ModuleType]: """Dynamically import modules for a git reference""" + from hamilton import ad_hoc_utils + git_base_dir = Path(get_git_base_directory()) modules = [] @@ -215,6 +240,10 @@ def visualize_diff( Uses the union of sets of nodes from driver 1 and driver 2. """ + import functools + + from hamilton import graph + all_nodes = set(reference_dr.graph.get_nodes()).union(set(current_dr.graph.get_nodes())) diff_style = functools.partial( @@ -232,3 +261,74 @@ def visualize_diff( node_modifiers=dict(), strictly_display_only_nodes_passed_in=True, ) + + +# TODO refactor ContextLoader to a class +# TODO support loading from pyproject.toml +def load_context(file_path: Path) -> dict: + if not file_path.exists(): + raise FileNotFoundError(f"`{file_path}` doesn't exist.") + + extension = file_path.suffix + if extension == ".json": + context = _read_json_context(file_path) + elif extension == ".py": + context = _read_py_context(file_path) + else: + raise ValueError(f"Received extension `{extension}` is unsupported.") + + context = _validate_context(context) + return context + + +def _validate_context(context: dict) -> dict: + if context[CONFIG_HEADER] is None: + context[CONFIG_HEADER] = {} + + if context[FINAL_VARS_HEADER] is None: + context[FINAL_VARS_HEADER] = [] + + if context[INPUTS_HEADER] is None: + context[INPUTS_HEADER] = {} + + if context[OVERRIDES_HEADER] is None: + context[OVERRIDES_HEADER] = {} + + return context + + +def _read_json_context(file_path: Path) -> dict: + """""" + import json + + data = json.load(file_path.open()) + + context = {} + for k in [ + CONFIG_HEADER, + FINAL_VARS_HEADER, + INPUTS_HEADER, + OVERRIDES_HEADER, + ]: + context[k] = data.get(k, None) + + return context + + +def _read_py_context(file_path: Path) -> dict: + 
import importlib + + spec = importlib.util.spec_from_file_location("cli_config", file_path) + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + + context = {} + for k in [ + CONFIG_HEADER, + FINAL_VARS_HEADER, + INPUTS_HEADER, + OVERRIDES_HEADER, + ]: + context[k] = getattr(module, k, None) + + return context