Skip to content

Commit

Permalink
Merge pull request #82 from neptune-ai/ss/added_node_status
Browse files Browse the repository at this point in the history
Added running status logs and execution order
  • Loading branch information
AleksanderWWW authored Jun 6, 2024
2 parents 239b7af + b95dc83 commit 9aa82b1
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 2 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
-## 0.5.0
+## [UNRELEASED] 0.5.0

### Features
- Added pipeline running log and execution order ([#82](https://github.com/neptune-ai/kedro-neptune/pull/82))
- Added support for the `NEPTUNE_CUSTOM_RUN_ID` environment variable ([#81](https://github.com/neptune-ai/kedro-neptune/pull/81))

## 0.4.0
Expand Down
14 changes: 13 additions & 1 deletion src/kedro_neptune/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,8 @@ def log_pipeline_metadata(namespace: Handler, pipeline: Pipeline):
)
)

namespace["execution_order"] = pipeline.describe()


def log_run_params(namespace: Handler, run_params: Dict[str, Any]):
namespace["run_params"] = stringify_unsupported(run_params)
Expand Down Expand Up @@ -490,14 +492,19 @@ def before_node_run(self, node: Node, inputs: Dict[str, Any], catalog: DataCatal
return

run = catalog.load("neptune_run")

run["log"].append(f"Running {node.short_name}")

current_namespace = run[f"nodes/{node.short_name}"]

current_namespace["status"] = "running"

if inputs:
current_namespace["inputs"] = stringify_unsupported(list(sorted(inputs.keys())))

for input_name, input_value in inputs.items():
if input_name.startswith("params:"):
-current_namespace[f'parameters/{input_name[len("params:"):]}'] = input_value
+current_namespace[f'parameters/{input_name[len("params:"):]}'] = stringify_unsupported(input_value)

self._node_execution_timers[node.short_name] = time.time()

Expand All @@ -511,8 +518,12 @@ def after_node_run(self, node: Node, catalog: DataCatalog, outputs: Dict[str, An
execution_time = float(time.time() - self._node_execution_timers[node.short_name])

run = catalog.load("neptune_run")

run["log"].append(f"Finished {node.short_name}")

current_namespace = run[f"nodes/{node.short_name}"]
current_namespace["execution_time"] = execution_time
current_namespace["status"] = "done"

if outputs:
current_namespace["outputs"] = stringify_unsupported(list(sorted(outputs.keys())))
Expand All @@ -533,6 +544,7 @@ def after_pipeline_run(self, catalog: DataCatalog) -> None:

run = catalog.load("neptune_run")
log_data_catalog_metadata(namespace=run, catalog=catalog)
run["log"].append("Finished pipeline")
run.container.sync()


Expand Down
4 changes: 4 additions & 0 deletions tests/kedro_neptune/utils/run_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ def assert_structure(travel_speed: int = 10000, custom_run_id: Optional[str] = N
assert run.exists("kedro/kedro_command")
assert run.exists("kedro/run_params")
assert run.exists("kedro/structure")
assert run.exists("kedro/execution_order")
assert run.exists("kedro/log")

# Data catalog
assert run.exists("kedro/catalog/datasets")
Expand Down Expand Up @@ -158,3 +160,5 @@ def check_node_metadata(run: Run, node_namespace: str, inputs: List, outputs: Op
if outputs:
assert run.exists(f"{node_namespace}/outputs")
assert sorted(literal_eval(run[f"{node_namespace}/outputs"].fetch())) == sorted(outputs)

assert run[f"{node_namespace}/status"].fetch() == "done"

0 comments on commit 9aa82b1

Please sign in to comment.