From 9e67dc03ded86e6ca5b066d7b858dac65b819925 Mon Sep 17 00:00:00 2001 From: Siddhant Sadangi Date: Wed, 5 Jun 2024 12:30:00 +0200 Subject: [PATCH 1/3] Added node status tracking --- CHANGELOG.md | 5 +++++ src/kedro_neptune/__init__.py | 26 +++++++++++++++++++++--- tests/kedro_neptune/utils/run_utils.py | 28 ++++++++++++++++++++++---- 3 files changed, 52 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e28f911..ca8110c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,8 @@ +## [UNRELEASED] 0.5.0 + +### Features +- Added node status tracking ([#82](https://github.com/neptune-ai/kedro-neptune/pull/82)) + ## 0.4.0 ### Features diff --git a/src/kedro_neptune/__init__.py b/src/kedro_neptune/__init__.py index f6dc709..e5d288a 100644 --- a/src/kedro_neptune/__init__.py +++ b/src/kedro_neptune/__init__.py @@ -14,7 +14,13 @@ # limitations under the License. # -__all__ = ["NeptuneRunDataset", "NeptuneFileDataset", "neptune_hooks", "init", "__version__"] +__all__ = [ + "NeptuneRunDataset", + "NeptuneFileDataset", + "neptune_hooks", + "init", + "__version__", +] import hashlib import json @@ -207,7 +213,10 @@ def init( config_template = yaml.load(INITIAL_NEPTUNE_CONFIG) config_template["neptune"]["project"] = project config_template["neptune"]["base_namespace"] = base_namespace - config_template["neptune"]["upload_source_files"] = ["**/*.py", f"{settings.CONF_SOURCE}/{config}/*.yml"] + config_template["neptune"]["upload_source_files"] = [ + "**/*.py", + f"{settings.CONF_SOURCE}/{config}/*.yml", + ] config_template["neptune"]["dependencies"] = dependencies yaml.dump(config_template, config_file) @@ -414,7 +423,8 @@ def log_data_catalog_metadata(namespace: Handler, catalog: DataCatalog): def log_pipeline_metadata(namespace: Handler, pipeline: Pipeline): namespace["structure"].upload( File.from_content( - content=json.dumps(json.loads(pipeline.to_json()), indent=4, sort_keys=True), extension="json" + content=json.dumps(json.loads(pipeline.to_json()), indent=4, sort_keys=True), + extension="json", ) ) @@ -478,8 +488,13 @@ def before_node_run(self, node: Node, inputs: Dict[str, Any], catalog: DataCatal return run = catalog.load("neptune_run") + + run["status/currently_running_node"] = node.short_name + current_namespace = run[f"nodes/{node.short_name}"] + current_namespace["status"] = "running" + if inputs: current_namespace["inputs"] = stringify_unsupported(list(sorted(inputs.keys()))) @@ -499,8 +514,13 @@ def after_node_run(self, node: Node, catalog: DataCatalog, outputs: Dict[str, An execution_time = float(time.time() - self._node_execution_timers[node.short_name]) run = catalog.load("neptune_run") + + run["status/last_run"] = node.short_name + run["status/currently_running"] = "None" + current_namespace = run[f"nodes/{node.short_name}"] current_namespace["execution_time"] = execution_time + current_namespace["status"] = "done" if outputs: current_namespace["outputs"] = stringify_unsupported(list(sorted(outputs.keys()))) diff --git a/tests/kedro_neptune/utils/run_utils.py b/tests/kedro_neptune/utils/run_utils.py index 65c39e1..1f39e23 100644 --- a/tests/kedro_neptune/utils/run_utils.py +++ b/tests/kedro_neptune/utils/run_utils.py @@ -90,7 +90,10 @@ def assert_structure(travel_speed: int = 10000): # Nodes data check_node_metadata( - run=run, node_namespace="kedro/nodes/distances", inputs=["planets"], outputs=["distances_to_planets"] + run=run, + node_namespace="kedro/nodes/distances", + inputs=["planets"], + outputs=["distances_to_planets"], ) check_node_metadata( run=run, @@ -98,20 +101,35 @@ def assert_structure(travel_speed: int = 10000): inputs=["distances_to_planets"], outputs=["furthest_planet_distance", "furthest_planet_name"], ) - check_node_metadata(run=run, node_namespace="kedro/nodes/judge_model", inputs=["neptune_run", "dataset"]) check_node_metadata( - run=run, node_namespace="kedro/nodes/prepare_dataset", inputs=["planets"], outputs=["dataset"] + run=run, + node_namespace="kedro/nodes/judge_model", + inputs=["neptune_run", "dataset"], + ) + check_node_metadata( + run=run, + node_namespace="kedro/nodes/prepare_dataset", + inputs=["planets"], + outputs=["dataset"], ) check_node_metadata( run=run, node_namespace="kedro/nodes/travel_time", - inputs=["furthest_planet_distance", "furthest_planet_name", "params:travel_speed"], + inputs=[ + "furthest_planet_distance", + "furthest_planet_name", + "params:travel_speed", + ], outputs=["travel_hours"], ) assert run.exists("kedro/nodes/travel_time/parameters") assert run.exists("kedro/nodes/travel_time/parameters/travel_speed") assert run["kedro/nodes/travel_time/parameters/travel_speed"].fetch() == travel_speed + # Status + assert run["kedro/status/currently_running"].fetch() == "None" + assert run["kedro/status/last_run"].fetch() == "travel_time" + # User defined data assert run.exists("furthest_planet") assert run.exists("furthest_planet/name") @@ -141,3 +159,5 @@ def check_node_metadata(run: Run, node_namespace: str, inputs: List, outputs: Op if outputs: assert run.exists(f"{node_namespace}/outputs") assert sorted(literal_eval(run[f"{node_namespace}/outputs"].fetch())) == sorted(outputs) + + assert run[f"{node_namespace}/status"].fetch() == "done" From 098fd6344ea507b632d2233c4bfde733235ceb68 Mon Sep 17 00:00:00 2001 From: Siddhant Sadangi Date: Wed, 5 Jun 2024 12:42:03 +0200 Subject: [PATCH 2/3] Fix namespace --- src/kedro_neptune/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/kedro_neptune/__init__.py b/src/kedro_neptune/__init__.py index e5d288a..4566045 100644 --- a/src/kedro_neptune/__init__.py +++ b/src/kedro_neptune/__init__.py @@ -489,7 +489,7 @@ def before_node_run(self, node: Node, inputs: Dict[str, Any], catalog: DataCatal run = catalog.load("neptune_run") - run["status/currently_running_node"] = node.short_name + run["status/currently_running"] = node.short_name current_namespace = run[f"nodes/{node.short_name}"] From 34c7914753e9e15d325d38c243c5102d5d3a78a5 Mon Sep 17 00:00:00 2001 From: Siddhant Sadangi Date: Thu, 6 Jun 2024 12:30:30 +0200 Subject: [PATCH 3/3] * Removed status and added running log * Added pipeline execution order * Added stringify_unsupported --- CHANGELOG.md | 2 +- src/kedro_neptune/__init__.py | 10 ++++++---- tests/kedro_neptune/utils/run_utils.py | 6 ++---- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ca8110c..012e95c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,7 @@ ## [UNRELEASED] 0.5.0 ### Features -- Added node status tracking ([#82](https://github.com/neptune-ai/kedro-neptune/pull/82)) +- Added pipeline running log and execution order ([#82](https://github.com/neptune-ai/kedro-neptune/pull/82)) ## 0.4.0 diff --git a/src/kedro_neptune/__init__.py b/src/kedro_neptune/__init__.py index 4566045..bea4ad3 100644 --- a/src/kedro_neptune/__init__.py +++ b/src/kedro_neptune/__init__.py @@ -428,6 +428,8 @@ def log_pipeline_metadata(namespace: Handler, pipeline: Pipeline): ) ) + namespace["execution_order"] = pipeline.describe() + def log_run_params(namespace: Handler, run_params: Dict[str, Any]): namespace["run_params"] = stringify_unsupported(run_params) @@ -489,7 +491,7 @@ def before_node_run(self, node: Node, inputs: Dict[str, Any], catalog: DataCatal run = catalog.load("neptune_run") - run["status/currently_running"] = node.short_name + run["log"].append(f"Running {node.short_name}") current_namespace = run[f"nodes/{node.short_name}"] @@ -500,7 +502,7 @@ def before_node_run(self, node: Node, inputs: Dict[str, Any], catalog: DataCatal for input_name, input_value in inputs.items(): if input_name.startswith("params:"): - current_namespace[f'parameters/{input_name[len("params:"):]}'] = input_value + current_namespace[f'parameters/{input_name[len("params:"):]}'] = stringify_unsupported(input_value) self._node_execution_timers[node.short_name] = time.time() @@ -515,8 +517,7 @@ def after_node_run(self, node: Node, catalog: DataCatalog, outputs: Dict[str, An run = catalog.load("neptune_run") - run["status/last_run"] = node.short_name - run["status/currently_running"] = "None" + run["log"].append(f"Finished {node.short_name}") current_namespace = run[f"nodes/{node.short_name}"] current_namespace["execution_time"] = execution_time @@ -541,6 +542,7 @@ def after_pipeline_run(self, catalog: DataCatalog) -> None: run = catalog.load("neptune_run") log_data_catalog_metadata(namespace=run, catalog=catalog) + run["log"].append("Finished pipeline") run.container.sync() diff --git a/tests/kedro_neptune/utils/run_utils.py b/tests/kedro_neptune/utils/run_utils.py index 1f39e23..c04691b 100644 --- a/tests/kedro_neptune/utils/run_utils.py +++ b/tests/kedro_neptune/utils/run_utils.py @@ -47,6 +47,8 @@ def assert_structure(travel_speed: int = 10000): assert run.exists("kedro/kedro_command") assert run.exists("kedro/run_params") assert run.exists("kedro/structure") + assert run.exists("kedro/execution_order") + assert run.exists("kedro/log") # Data catalog assert run.exists("kedro/catalog/datasets") @@ -126,10 +128,6 @@ def assert_structure(travel_speed: int = 10000): assert run.exists("kedro/nodes/travel_time/parameters/travel_speed") assert run["kedro/nodes/travel_time/parameters/travel_speed"].fetch() == travel_speed - # Status - assert run["kedro/status/currently_running"].fetch() == "None" - assert run["kedro/status/last_run"].fetch() == "travel_time" - # User defined data assert run.exists("furthest_planet") assert run.exists("furthest_planet/name")