diff --git a/CHANGELOG.md b/CHANGELOG.md index d25e479..1e9bdfe 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ -## 0.5.0 +## [UNRELEASED] 0.5.0 ### Features +- Added pipeline running log and execution order ([#82](https://github.com/neptune-ai/kedro-neptune/pull/82)) - Added support for the `NEPTUNE_CUSTOM_RUN_ID` environment variable ([#81](https://github.com/neptune-ai/kedro-neptune/pull/81)) ## 0.4.0 diff --git a/src/kedro_neptune/__init__.py b/src/kedro_neptune/__init__.py index b70b7fd..25912f7 100644 --- a/src/kedro_neptune/__init__.py +++ b/src/kedro_neptune/__init__.py @@ -430,6 +430,8 @@ def log_pipeline_metadata(namespace: Handler, pipeline: Pipeline): ) ) + namespace["execution_order"] = pipeline.describe() + def log_run_params(namespace: Handler, run_params: Dict[str, Any]): namespace["run_params"] = stringify_unsupported(run_params) @@ -490,14 +492,19 @@ def before_node_run(self, node: Node, inputs: Dict[str, Any], catalog: DataCatal return run = catalog.load("neptune_run") + + run["log"].append(f"Running {node.short_name}") + current_namespace = run[f"nodes/{node.short_name}"] + current_namespace["status"] = "running" + if inputs: current_namespace["inputs"] = stringify_unsupported(list(sorted(inputs.keys()))) for input_name, input_value in inputs.items(): if input_name.startswith("params:"): - current_namespace[f'parameters/{input_name[len("params:"):]}'] = input_value + current_namespace[f'parameters/{input_name[len("params:"):]}'] = stringify_unsupported(input_value) self._node_execution_timers[node.short_name] = time.time() @@ -511,8 +518,12 @@ def after_node_run(self, node: Node, catalog: DataCatalog, outputs: Dict[str, An execution_time = float(time.time() - self._node_execution_timers[node.short_name]) run = catalog.load("neptune_run") + + run["log"].append(f"Finished {node.short_name}") + current_namespace = run[f"nodes/{node.short_name}"] current_namespace["execution_time"] = execution_time + current_namespace["status"] = "done" if outputs: current_namespace["outputs"] = stringify_unsupported(list(sorted(outputs.keys()))) @@ -533,6 +544,7 @@ def after_pipeline_run(self, catalog: DataCatalog) -> None: run = catalog.load("neptune_run") log_data_catalog_metadata(namespace=run, catalog=catalog) + run["log"].append("Finished pipeline") run.container.sync() diff --git a/tests/kedro_neptune/utils/run_utils.py b/tests/kedro_neptune/utils/run_utils.py index e5f71d3..706cd64 100644 --- a/tests/kedro_neptune/utils/run_utils.py +++ b/tests/kedro_neptune/utils/run_utils.py @@ -47,6 +47,8 @@ def assert_structure(travel_speed: int = 10000, custom_run_id: Optional[str] = N assert run.exists("kedro/kedro_command") assert run.exists("kedro/run_params") assert run.exists("kedro/structure") + assert run.exists("kedro/execution_order") + assert run.exists("kedro/log") # Data catalog assert run.exists("kedro/catalog/datasets") @@ -158,3 +160,5 @@ def check_node_metadata(run: Run, node_namespace: str, inputs: List, outputs: Op if outputs: assert run.exists(f"{node_namespace}/outputs") assert sorted(literal_eval(run[f"{node_namespace}/outputs"].fetch())) == sorted(outputs) + + assert run[f"{node_namespace}/status"].fetch() == "done"