From 72ad76d7faa28a97e57e473a91acdbcf1ebc8806 Mon Sep 17 00:00:00 2001 From: Ian Kenney Date: Tue, 27 Feb 2024 10:39:49 -0700 Subject: [PATCH 01/10] Added `create_tasks` method. The previous `create_task` method wraps around this new method and preserves previous functionality. --- alchemiscale/storage/cypher.py | 23 +++ alchemiscale/storage/statestore.py | 194 +++++++++++++----- .../integration/storage/test_statestore.py | 77 +++---- 3 files changed, 208 insertions(+), 86 deletions(-) create mode 100644 alchemiscale/storage/cypher.py diff --git a/alchemiscale/storage/cypher.py b/alchemiscale/storage/cypher.py new file mode 100644 index 00000000..6ae5a005 --- /dev/null +++ b/alchemiscale/storage/cypher.py @@ -0,0 +1,23 @@ +from alchemiscale import ScopedKey +from typing import List, Optional + + +def cypher_list_from_scoped_keys(scoped_keys: List[Optional[ScopedKey]]) -> str: + """Generate a Cypher list structure from a list of ScopedKeys, ignoring NoneType entries. + + Parameters + ---------- + scoped_keys + List of ScopedKeys to generate the Cypher list + + Returns + ------- + str + Cypher list + """ + + data = [] + for scoped_key in scoped_keys: + if scoped_key: + data.append('"' + str(scoped_key) + '"') + return "[" + ", ".join(data) + "]" diff --git a/alchemiscale/storage/statestore.py b/alchemiscale/storage/statestore.py index 25d8d3be..c9ab6374 100644 --- a/alchemiscale/storage/statestore.py +++ b/alchemiscale/storage/statestore.py @@ -14,7 +14,7 @@ import numpy as np import networkx as nx -from gufe import AlchemicalNetwork, Transformation, Settings +from gufe import AlchemicalNetwork, Transformation, NonTransformation, Settings from gufe.tokenization import GufeTokenizable, GufeKey, JSON_HANDLER from neo4j import Transaction, GraphDatabase, Driver @@ -29,6 +29,7 @@ ) from ..strategies import Strategy from ..models import Scope, ScopedKey +from .cypher import cypher_list_from_scoped_keys from ..security.models import CredentialedEntity from ..settings import Neo4jStoreSettings @@ -1531,12 +1532,43 @@ def claim_taskhub_tasks( ## tasks - def create_task( + def validate_extends_tasks(self, task_list): + + if not task_list: + return [] + + q = f""" + UNWIND {cypher_list_from_scoped_keys(task_list)} as task + MATCH (t:Task {{`_scoped_key`: task}}) + return t + """ + + results = self.execute_query(q) + + nodes = {} + + for record in results.records: + node = record_data_to_node(record["t"]) + + status = node.get("status") + + if status in ("invalid", "deleted"): + # py2neo Node doesn't like the neo4j datetime object + # manually cast since we're raising anyways + # and the results are ephemeral + node["datetime_created"] = str(node["datetime_created"]) + raise ValueError(f"Cannot extend a `deleted` or `invalid` Task: {node}") + + nodes[node["_scoped_key"]] = node + + return nodes + + def create_tasks( self, - transformation: ScopedKey, - extends: Optional[ScopedKey] = None, + transformations: List[ScopedKey], + extends: Optional[List[Optional[ScopedKey]]] = None, creator: Optional[str] = None, - ) -> ScopedKey: + ) -> List[ScopedKey]: """Add a compute Task to a Transformation. Note: this creates a compute Task, but does not add it to any TaskHubs. @@ -1553,66 +1585,130 @@ def create_task( `extends` input for the Task's eventual call to `Protocol.create`. 
""" - if transformation.qualname not in ["Transformation", "NonTransformation"]: + + allowed_types = [Transformation.__qualname__, NonTransformation.__qualname__] + + # reshape data to a standard form + if extends is None: + extends = [None] * len(transformations) + elif len(extends) != len(transformations): raise ValueError( - "`transformation` ScopedKey does not correspond to a `Transformation`" + "`extends` must either be `None` or have the same length as `transformations`" ) - if extends is not None and extends.qualname != "Task": - raise ValueError("`extends` ScopedKey does not correspond to a `Task`") - - scope = transformation.scope - transformation_node = self._get_node(transformation) + for i, _extends in enumerate(extends): + if _extends is not None: + if not ( + extended_task_qualname := _extends.__getattribute__("qualname") + ): + raise ValueError( + f"`extends` entry for `Task` {transformations[i]} is not valid" + ) + if extended_task_qualname != "Task": + raise ValueError( + f"`extends` ScopedKey ({_extends}) does not correspond to a `Task`" + ) - # create a new task for the supplied transformation - # use a PERFORMS relationship - task = Task( - creator=creator, extends=str(extends) if extends is not None else None - ) + transformation_map = { + transformation_type: [[], []] for transformation_type in allowed_types + } + for i, transformation in enumerate(transformations): + if transformation.qualname not in allowed_types: + raise ValueError( + f"Got an unsupported `Task` type: {transformation.qualname}" + ) + transformation_map[transformation.qualname][0].append(transformation) + transformation_map[transformation.qualname][1].append(extends[i]) - _, task_node, scoped_key = self._gufe_to_subgraph( - task.to_shallow_dict(), - labels=["GufeTokenizable", task.__class__.__name__], - gufe_key=task.key, - scope=scope, + extends_nodes = self.validate_extends_tasks( + [_extends for _extends in extends if _extends is not None] ) subgraph = Subgraph() - if extends is not None: - previous_task_node = self._get_node(extends) - stat = previous_task_node.get("status") - # do not allow creation of a task that extends an invalid or deleted task. 
- if (stat == "invalid") or (stat == "deleted"): - # py2neo Node doesn't like the neo4j datetime object - # manually cast since we're raising anyways - # and the results are ephemeral - previous_task_node["datetime_created"] = str( - previous_task_node["datetime_created"] + for node_type, ( + transformation_subset, + extends_subset, + ) in transformation_map.items(): + + if not transformation_subset: + continue + + q = f""" + UNWIND {cypher_list_from_scoped_keys(transformation_subset)} as sk + MATCH (n:{node_type} {{`_scoped_key`: sk}}) + RETURN n + """ + + results = self.execute_query(q) + + transformation_nodes = {} + for record in results.records: + node = record_data_to_node(record["n"]) + transformation_nodes[node["_scoped_key"]] = node + + tasks = [ + Task( + creator=creator, + extends=str(_extends) if _extends is not None else None, ) - raise ValueError( - f"Cannot extend a `deleted` or `invalid` Task: {previous_task_node}" + for _extends in extends_subset + ] + + sks = [] + + for _transformation, _extends in zip(transformation_subset, extends_subset): + + scope = transformation.scope + + _task = Task( + creator=creator, + extends=str(_extends) if _extends is not None else None, + ) + _, task_node, scoped_key = self._gufe_to_subgraph( + _task.to_shallow_dict(), + labels=["GufeTokenizable", _task.__class__.__name__], + gufe_key=_task.key, + scope=scope, ) - subgraph = subgraph | Relationship.type("EXTENDS")( - task_node, - previous_task_node, - _org=scope.org, - _campaign=scope.campaign, - _project=scope.project, - ) - subgraph = subgraph | Relationship.type("PERFORMS")( - task_node, - transformation_node, - _org=scope.org, - _campaign=scope.campaign, - _project=scope.project, - ) + sks.append(scoped_key) + + if _extends is not None: + subgraph |= Relationship.type("EXTENDS")( + task_node, + extends_nodes[str(_extends)], + _org=scope.org, + _campaign=scope.campaign, + _project=scope.project, + ) + + subgraph |= Relationship.type("PERFORMS")( + task_node, + transformation_nodes[str(_transformation)], + _org=scope.org, + _campaign=scope.campaign, + _project=scope.project, + ) with self.transaction() as tx: merge_subgraph(tx, subgraph, "GufeTokenizable", "_scoped_key") - return scoped_key + return sks + + def create_task( + self, + transformation: ScopedKey, + extends: Optional[ScopedKey] = None, + creator: Optional[str] = None, + ) -> ScopedKey: + """Create a single `Task` from a `Transformation`. + + This method wrap around the more general `create_tasks` method. 
+ """ + return self.create_tasks( + [transformation], extends=[extends] if extends is not None else [None] + )[0] def query_tasks(self, *, status=None, key=None, scope: Scope = Scope()): """Query for `Task`\s matching given attributes.""" diff --git a/alchemiscale/tests/integration/storage/test_statestore.py b/alchemiscale/tests/integration/storage/test_statestore.py index 23d6aa7c..96c372bf 100644 --- a/alchemiscale/tests/integration/storage/test_statestore.py +++ b/alchemiscale/tests/integration/storage/test_statestore.py @@ -487,7 +487,6 @@ def test_create_task(self, n4js, network_tyk2, scope_test): transformation_sk = n4js.get_scoped_key(transformation, scope_test) task_sk: ScopedKey = n4js.create_task(transformation_sk) - q = f"""match (n:Task {{_gufe_key: '{task_sk.gufe_key}', _org: '{task_sk.org}', _campaign: '{task_sk.campaign}', _project: '{task_sk.project}'}})-[:PERFORMS]->(m:Transformation) @@ -497,6 +496,17 @@ def test_create_task(self, n4js, network_tyk2, scope_test): assert m["_gufe_key"] == transformation.key + N = 100 + task_sks = n4js.create_tasks([transformation_sk] * N) + + assert len(task_sks) == N + + # extend all of these tasks + task_sks = n4js.create_tasks([transformation_sk] * N, task_sks) + + # TODO: expand these tests + assert len(task_sks) == N + def test_create_task_extends_invalid_deleted(self, n4js, network_tyk2, scope_test): # add alchemical network, then try generating task an = network_tyk2 @@ -528,8 +538,7 @@ def test_query_tasks(self, n4js, network_tyk2, scope_test, multiple_scopes): tf_sks = n4js.query_transformations(scope=scope_test) - for tf_sk in tf_sks[:10]: - [n4js.create_task(tf_sk) for i in range(3)] + n4js.create_tasks([tf_sk for tf_sk in tf_sks[:10]] * 3) task_sks = n4js.query_tasks() assert len(task_sks) == 10 * 3 @@ -562,7 +571,7 @@ def test_get_network_tasks(self, n4js, network_tyk2, scope_test): task_sks = [] for tf_sk in tf_sks[:10]: - task_sks.extend([n4js.create_task(tf_sk) for i in range(3)]) + task_sks.extend(n4js.create_tasks([tf_sk] * 3)) task_sks_network = n4js.get_network_tasks(an_sk) assert set(task_sks_network) == set(task_sks) @@ -587,9 +596,7 @@ def test_get_task_networks(self, n4js, network_tyk2, scope_test): an_sk = n4js.query_networks(scope=scope_test)[0] tf_sks = n4js.get_network_transformations(an_sk) - task_sks = [] - for tf_sk in tf_sks[:10]: - task_sks.extend([n4js.create_task(tf_sk) for i in range(3)]) + task_sks = n4js.create_tasks([tf_sk for tf_sk in tf_sks[:10]] * 3) for task_sk in task_sks: an_sks = n4js.get_task_networks(task_sk) @@ -695,7 +702,7 @@ def test_set_task_priority(self, n4js, network_tyk2, scope_test): transformation = list(an.edges)[0] transformation_sk = n4js.get_scoped_key(transformation, scope_test) - task_sks = [n4js.create_task(transformation_sk) for i in range(3)] + task_sks = n4js.create_tasks([transformation_sk] * 3) base_case = n4js.get_task_priority(task_sks) assert [10, 10, 10] == base_case @@ -716,7 +723,7 @@ def test_set_task_priority_returned_keys(self, n4js, network_tyk2, scope_test): transformation = list(an.edges)[0] transformation_sk = n4js.get_scoped_key(transformation, scope_test) - task_sks = [n4js.create_task(transformation_sk) for i in range(3)] + task_sks = n4js.create_tasks([transformation_sk] * 3) updated = n4js.set_task_priority(task_sks, 1) assert updated == task_sks @@ -735,7 +742,7 @@ def test_set_task_priority_missing_task(self, n4js, network_tyk2, scope_test): transformation = list(an.edges)[0] transformation_sk = n4js.get_scoped_key(transformation, scope_test) - 
task_sks = [n4js.create_task(transformation_sk) for i in range(3)] + task_sks = n4js.create_tasks([transformation_sk] * 3) task_sks_with_fake = task_sks + [ ScopedKey.from_str("Task-FAKE-test_org-test_campaign-test_project") ] @@ -751,7 +758,7 @@ def test_set_task_priority_out_of_bounds(self, n4js, network_tyk2, scope_test): transformation = list(an.edges)[0] transformation_sk = n4js.get_scoped_key(transformation, scope_test) - task_sks = [n4js.create_task(transformation_sk) for i in range(3)] + task_sks = n4js.create_tasks([transformation_sk] * 3) msg = "priority must be between" @@ -771,7 +778,7 @@ def test_get_task_priority(self, n4js, network_tyk2, scope_test): transformation = list(an.edges)[0] transformation_sk = n4js.get_scoped_key(transformation, scope_test) - task_sks = [n4js.create_task(transformation_sk) for i in range(3)] + task_sks = n4js.create_tasks([transformation_sk] * 3) result = n4js.get_task_priority(task_sks) assert result == [10, 10, 10] @@ -784,7 +791,7 @@ def test_get_task_priority_missing_task(self, n4js, network_tyk2, scope_test): transformation = list(an.edges)[0] transformation_sk = n4js.get_scoped_key(transformation, scope_test) - task_sks = [n4js.create_task(transformation_sk) for i in range(3)] + task_sks = n4js.create_tasks([transformation_sk] * 3) task_sks_with_fake = task_sks + [ ScopedKey.from_str("Task-FAKE-test_org-test_campaign-test_project") ] @@ -879,7 +886,7 @@ def test_get_taskhub_actioned_tasks( transformation = list(an.edges)[0] transformation_sk = n4js.get_scoped_key(transformation, scope_test) - task_sks = [n4js.create_task(transformation_sk) for i in range(5)] + task_sks = n4js.create_tasks([transformation_sk] * 5) # do not action the tasks yet; should get back nothing actioned_tasks = n4js.get_taskhub_actioned_tasks(taskhub_sk) @@ -960,7 +967,7 @@ def test_action_task(self, n4js: Neo4jStore, network_tyk2, scope_test): transformation_sk = n4js.get_scoped_key(transformation, scope_test) # create 10 tasks - task_sks = [n4js.create_task(transformation_sk) for i in range(10)] + task_sks = n4js.create_tasks([transformation_sk] * 10) # action the tasks n4js.action_tasks(task_sks, taskhub_sk) @@ -994,8 +1001,8 @@ def test_action_task_other_statuses( transformation = list(an.edges)[0] transformation_sk = n4js.get_scoped_key(transformation, scope_test) - # create 10 tasks - task_sks = [n4js.create_task(transformation_sk) for i in range(6)] + # create 6 tasks + task_sks = n4js.create_tasks([transformation_sk] * 6) # set all but first task to running n4js.set_task_running(task_sks[1:]) @@ -1049,7 +1056,7 @@ def test_get_unclaimed_tasks( transformation_sk = n4js.get_scoped_key(transformation, scope_test) # create 10 tasks - task_sks = [n4js.create_task(transformation_sk) for i in range(10)] + task_sks = n4js.create_tasks([transformation_sk] * 10) # action the tasks n4js.action_tasks(task_sks, taskhub_sk) @@ -1075,7 +1082,7 @@ def test_get_set_weights(self, n4js: Neo4jStore, network_tyk2, scope_test): transformation_sk = n4js.get_scoped_key(transformation, scope_test) # create 10 tasks - task_sks = [n4js.create_task(transformation_sk) for i in range(10)] + task_sks = n4js.create_tasks([transformation_sk] * 10) n4js.action_tasks(task_sks, taskhub_sk) # weights should all be the default 0.5 @@ -1096,7 +1103,7 @@ def test_cancel_task(self, n4js, network_tyk2, scope_test): transformation_sk = n4js.get_scoped_key(transformation, scope_test) # create 10 tasks - task_sks = [n4js.create_task(transformation_sk) for i in range(10)] + task_sks = 
n4js.create_tasks([transformation_sk] * 10) # action the tasks actioned = n4js.action_tasks(task_sks, taskhub_sk) @@ -1126,7 +1133,7 @@ def test_get_taskhub_tasks(self, n4js, network_tyk2, scope_test): transformation_sk = n4js.get_scoped_key(transformation, scope_test) # create 10 tasks - task_sks = [n4js.create_task(transformation_sk) for i in range(10)] + task_sks = n4js.create_tasks([transformation_sk] * 10) # action the tasks actioned = n4js.action_tasks(task_sks, taskhub_sk) @@ -1152,7 +1159,7 @@ def test_claim_taskhub_tasks(self, n4js: Neo4jStore, network_tyk2, scope_test): transformation_sk = n4js.get_scoped_key(transformation, scope_test) # create 10 tasks - task_sks = [n4js.create_task(transformation_sk) for i in range(10)] + task_sks = n4js.create_tasks([transformation_sk] * 10) # shuffle the tasks; want to check that order of claiming is unrelated # to order created @@ -1381,7 +1388,7 @@ def test_claim_task_byweight(self, n4js: Neo4jStore, network_tyk2, scope_test): transformation_sk = n4js.get_scoped_key(transformation, scope_test) # create 10 tasks - task_sks = [n4js.create_task(transformation_sk) for i in range(10)] + task_sks = n4js.create_tasks([transformation_sk] * 10) # action the tasks n4js.action_tasks(task_sks, taskhub_sk) @@ -1414,9 +1421,7 @@ def test_get_scope_status(self, n4js: Neo4jStore, network_tyk2, scope_test): tf_sks = n4js.get_network_transformations(an_sk) - task_sks = [] - for tf_sk in tf_sks: - task_sks.append(n4js.create_task(tf_sk)) + task_sks = n4js.create_tasks(tf_sks) # try all scopes first status = n4js.get_scope_status(Scope()) @@ -1447,9 +1452,7 @@ def test_get_network_status(self, n4js: Neo4jStore, network_tyk2, scope_test): tf_sks = n4js.get_network_transformations(an_sk) - task_sks = [] - for tf_sk in tf_sks: - task_sks.append(n4js.create_task(tf_sk)) + task_sks = n4js.create_tasks(tf_sks) status = n4js.get_network_status(an_sk) assert len(status) == 1 @@ -1473,7 +1476,7 @@ def test_get_transformation_status( task_sks = [] for tf_sk in tf_sks: - task_sks.append([n4js.create_task(tf_sk) for i in range(3)]) + task_sks.append(n4js.create_tasks([tf_sk] * 3)) status = n4js.get_transformation_status(tf_sk) assert status == {"waiting": 3} @@ -1937,7 +1940,7 @@ def test_set_task_status_from_waiting( transformation_sk = n4js.get_scoped_key(transformation, scope_test) # create 10 tasks - task_sks = [n4js.create_task(transformation_sk) for i in range(10)] + task_sks = n4js.create_tasks([transformation_sk] * 10) if not allowed: with pytest.raises(ValueError, match="Cannot set task"): @@ -1984,7 +1987,7 @@ def test_set_task_status_from_running( transformation_sk = n4js.get_scoped_key(transformation, scope_test) # create 10 tasks - task_sks = [n4js.create_task(transformation_sk) for i in range(10)] + task_sks = n4js.create_tasks([transformation_sk] * 10) # set all the tasks to running n4js.set_task_running(task_sks) @@ -2035,7 +2038,7 @@ def test_set_task_status_from_complete( transformation_sk = n4js.get_scoped_key(transformation, scope_test) # create 10 tasks - task_sks = [n4js.create_task(transformation_sk) for i in range(10)] + task_sks = n4js.create_tasks([transformation_sk] * 10) # set all the tasks to running n4js.set_task_running(task_sks) @@ -2089,7 +2092,7 @@ def test_set_task_status_from_error( transformation_sk = n4js.get_scoped_key(transformation, scope_test) # create 10 tasks - task_sks = [n4js.create_task(transformation_sk) for i in range(10)] + task_sks = n4js.create_tasks([transformation_sk] * 10) # set all the tasks to running 
n4js.set_task_running(task_sks) @@ -2151,7 +2154,7 @@ def test_set_task_status_from_terminals( transformation_sk = n4js.get_scoped_key(transformation, scope_test) # create 10 tasks - task_sks = [n4js.create_task(transformation_sk) for i in range(10)] + task_sks = n4js.create_tasks([transformation_sk] * 10) # move it to one of the terminal statuses neo4j_terminal_op(task_sks) @@ -2186,7 +2189,7 @@ def test_set_task_status_removes_actions_relationship( transformation_sk = n4js.get_scoped_key(transformation, scope_test) # create 3 tasks - task_sks = [n4js.create_task(transformation_sk) for i in range(3)] + task_sks = n4js.create_tasks([transformation_sk] * 3) n4js.action_tasks(task_sks, taskhub_sk) @@ -2349,7 +2352,7 @@ def test_get_task_status( transformation_sk = n4js.get_scoped_key(transformation, scope_test) # create 6 tasks - task_sks = [n4js.create_task(transformation_sk) for i in range(6)] + task_sks = n4js.create_tasks([transformation_sk] * 6) # task 0 will remain waiting From 0c9b6c69d5cc372db7e283ba1df8d5573e7608f5 Mon Sep 17 00:00:00 2001 From: Ian Kenney Date: Wed, 28 Feb 2024 11:41:27 -0700 Subject: [PATCH 02/10] Fixed ScopedKey list location --- alchemiscale/storage/statestore.py | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/alchemiscale/storage/statestore.py b/alchemiscale/storage/statestore.py index c9ab6374..20b7bc24 100644 --- a/alchemiscale/storage/statestore.py +++ b/alchemiscale/storage/statestore.py @@ -1626,6 +1626,8 @@ def create_tasks( subgraph = Subgraph() + sks = [] + # iterate over all allowed types, unpacking the transformations and extends subsets for node_type, ( transformation_subset, extends_subset, @@ -1647,16 +1649,6 @@ def create_tasks( node = record_data_to_node(record["n"]) transformation_nodes[node["_scoped_key"]] = node - tasks = [ - Task( - creator=creator, - extends=str(_extends) if _extends is not None else None, - ) - for _extends in extends_subset - ] - - sks = [] - for _transformation, _extends in zip(transformation_subset, extends_subset): scope = transformation.scope From 21136b332f24f4c04823a23a313706af30a76a5c Mon Sep 17 00:00:00 2001 From: Ian Kenney Date: Wed, 28 Feb 2024 19:25:18 -0700 Subject: [PATCH 03/10] Fixed test --- alchemiscale/tests/integration/storage/test_statestore.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/alchemiscale/tests/integration/storage/test_statestore.py b/alchemiscale/tests/integration/storage/test_statestore.py index 3ea106b2..0f62d674 100644 --- a/alchemiscale/tests/integration/storage/test_statestore.py +++ b/alchemiscale/tests/integration/storage/test_statestore.py @@ -1159,7 +1159,8 @@ def test_claim_taskhub_tasks(self, n4js: Neo4jStore, network_tyk2, scope_test): transformation_sk = n4js.get_scoped_key(transformation, scope_test) # create 10 tasks - task_sks = n4js.create_tasks([transformation_sk] * 10) + N = 10 + task_sks = n4js.create_tasks([transformation_sk] * N) # shuffle the tasks; want to check that order of claiming is unrelated # to order created From 4d994ebc82959fbc54a7dc2efe8f32318027152c Mon Sep 17 00:00:00 2001 From: Ian Kenney Date: Thu, 29 Feb 2024 10:14:11 -0700 Subject: [PATCH 04/10] Updated docstring --- alchemiscale/storage/statestore.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/alchemiscale/storage/statestore.py b/alchemiscale/storage/statestore.py index 5a130a3d..78133db6 100644 --- a/alchemiscale/storage/statestore.py +++ b/alchemiscale/storage/statestore.py @@ -1581,21 +1581,20 @@ def 
create_tasks( extends: Optional[List[Optional[ScopedKey]]] = None, creator: Optional[str] = None, ) -> List[ScopedKey]: - """Add a compute Task to a Transformation. + """Add compute Tasks to a provided Transformations. - Note: this creates a compute Task, but does not add it to any TaskHubs. + Note: this creates compute Tasks, but does not add them to any TaskHubs. Parameters ---------- - transformation - The Transformation to compute. - scope - The scope the Transformation is in; ignored if `transformation` is a ScopedKey. + transformations + The Transformations to compute. extends - The ScopedKey of the Task to use as a starting point for this Task. + The ScopedKeys of the Tasks to use as a starting point for the Tasks. Will use the `ProtocolDAGResult` from the given Task as the `extends` input for the Task's eventual call to `Protocol.create`. - + creator (optional) + The creator of the Tasks """ allowed_types = [Transformation.__qualname__, NonTransformation.__qualname__] From 0dae50229461023c6429a25b051426c7c0451977 Mon Sep 17 00:00:00 2001 From: Ian Kenney Date: Tue, 5 Mar 2024 10:30:13 -0700 Subject: [PATCH 05/10] Expanded `test_create_task` for extensions --- .../integration/storage/test_statestore.py | 21 ++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/alchemiscale/tests/integration/storage/test_statestore.py b/alchemiscale/tests/integration/storage/test_statestore.py index 0f62d674..4a24ab39 100644 --- a/alchemiscale/tests/integration/storage/test_statestore.py +++ b/alchemiscale/tests/integration/storage/test_statestore.py @@ -10,6 +10,7 @@ from gufe.protocols.protocoldag import execute_DAG, ProtocolDAG, ProtocolDAGResult from alchemiscale.storage.statestore import Neo4jStore +from alchemiscale.storage.cypher import cypher_list_from_scoped_keys from alchemiscale.storage.models import ( Task, TaskHub, @@ -502,10 +503,24 @@ def test_create_task(self, n4js, network_tyk2, scope_test): assert len(task_sks) == N # extend all of these tasks - task_sks = n4js.create_tasks([transformation_sk] * N, task_sks) + child_task_sks = n4js.create_tasks([transformation_sk] * N, task_sks) - # TODO: expand these tests - assert len(task_sks) == N + assert len(child_task_sks) == N + + q = f""" + UNWIND {cypher_list_from_scoped_keys(child_task_sks)} AS task_sk + MATCH (n:Task)<-[:EXTENDS]-(m:Task {{`_scoped_key`: task_sk}}) + RETURN n, m + """ + results = n4js.execute_query(q) + + assert len(results.records) == N + + for record in results.records: + # n is a parent Task, m is a child Task + n, m = record["n"], record["m"] + assert ScopedKey.from_str(n["_scoped_key"]) in task_sks + assert ScopedKey.from_str(m["_scoped_key"]) in child_task_sks def test_create_task_extends_invalid_deleted(self, n4js, network_tyk2, scope_test): # add alchemical network, then try generating task From 155099429f68da5d4bcc8225b34df8a73d8d9559 Mon Sep 17 00:00:00 2001 From: Ian Kenney Date: Wed, 13 Mar 2024 17:55:59 -0700 Subject: [PATCH 06/10] Updated user facing task creation functions - Added `AlchemiscaleClient.create_bulk_tasks` - Added `api.create_bulk_tasks` - Updated `AlchemiscaleClient.create_tasks` - Added tests for `AlchemiscaleClient.create_bulk_tasks` --- alchemiscale/interface/api.py | 33 ++++++-- alchemiscale/interface/client.py | 19 +++++ .../interface/client/test_client.py | 76 +++++++++++++++++++ 3 files changed, 123 insertions(+), 5 deletions(-) diff --git a/alchemiscale/interface/api.py b/alchemiscale/interface/api.py index 704e4d67..be7a0b79 100644 --- 
a/alchemiscale/interface/api.py +++ b/alchemiscale/interface/api.py @@ -353,11 +353,34 @@ def create_tasks( sk = ScopedKey.from_str(transformation_scoped_key) validate_scopes(sk.scope, token) - task_sks = [] - for i in range(count): - task_sks.append( - n4js.create_task(transformation=sk, extends=extends, creator=token.entity) - ) + task_sks = n4js.create_tasks([sk] * count, [extends] * count) + return [str(sk) for sk in task_sks] + + +@router.post("/bulk/transformations/tasks/create") +def create_bulk_tasks( + *, + transformations: List[str] = Body(embed=True), + extends: Optional[List[Optional[str]]] = None, + n4js: Neo4jStore = Depends(get_n4js_depends), + token: TokenData = Depends(get_token_data_depends), +): + transformation_sks = [ + ScopedKey.from_str(transformation_string) + for transformation_string in transformations + ] + + for transformation_sk in transformation_sks: + validate_scopes(transformation_sk.scope, token) + + if extends is not None: + extends = [ + None if not extends_str else ScopedKey.from_str(extends_str) + for extends_str in extends + ] + + # TODO: raise Bad Request for ValueErrors raised + task_sks = n4js.create_tasks(transformation_sks, extends) return [str(sk) for sk in task_sks] diff --git a/alchemiscale/interface/client.py b/alchemiscale/interface/client.py index 685f51b1..c15264bd 100644 --- a/alchemiscale/interface/client.py +++ b/alchemiscale/interface/client.py @@ -469,6 +469,25 @@ def create_tasks( task_sks = self._post_resource(f"/transformations/{transformation}/tasks", data) return [ScopedKey.from_str(i) for i in task_sks] + def create_bulk_tasks( + self, + transformations: List[ScopedKey], + extends: Optional[List[Optional[ScopedKey]]] = None, + ) -> List[ScopedKey]: + + data = dict( + transformations=[str(transformation) for transformation in transformations], + extends=( + None + if not extends + else [ + str(task_sk) if task_sk is not None else None for task_sk in extends + ] + ), + ) + task_sks = self._post_resource("/bulk/transformations/tasks/create", data) + return [ScopedKey.from_str(i) for i in task_sks] + def query_tasks( self, scope: Optional[Scope] = None, diff --git a/alchemiscale/tests/integration/interface/client/test_client.py b/alchemiscale/tests/integration/interface/client/test_client.py index 6dad2c7f..12e5b840 100644 --- a/alchemiscale/tests/integration/interface/client/test_client.py +++ b/alchemiscale/tests/integration/interface/client/test_client.py @@ -10,6 +10,7 @@ from alchemiscale.models import ScopedKey, Scope from alchemiscale.storage.models import TaskStatusEnum +from alchemiscale.storage.cypher import cypher_list_from_scoped_keys from alchemiscale.interface import client from alchemiscale.utils import RegistryBackup from alchemiscale.tests.integration.interface.utils import ( @@ -363,6 +364,81 @@ def test_create_tasks( ) assert set() == set(n4js.get_transformation_tasks(sk, extends=task_sks[1])) + def test_create_bulk_tasks( + self, + scope_test, + n4js_preloaded, + user_client: client.AlchemiscaleClient, + network_tyk2, + ): + n4js = n4js_preloaded + + an = network_tyk2 + transformations = list(an.edges)[:5] + + transformation_sks = [ + user_client.get_scoped_key(transformation, scope_test) + for transformation in transformations + ] + + # create three copies tasks per transformation + task_sks = user_client.create_bulk_tasks(transformation_sks * 3) + + all_tasks = set() + extends_list = [] + for transformation_sk in transformation_sks: + transformation_tasks = n4js.get_transformation_tasks(transformation_sk) + # 
there should be three tasks for each transformation + assert len(transformation_tasks) == 3 + all_tasks |= set(transformation_tasks) + + # we will want to test extensions, hold on to some tasks + extends_list.append(transformation_tasks[0]) + + assert set(task_sks) == all_tasks + + # create a new set of tasks + extends_tasks = user_client.create_bulk_tasks( + transformation_sks, extends=extends_list + ) + + # should still have 5 + assert len(extends_tasks) == 5 + + # get all of the original tasks, given our extension tasks + q = f"""UNWIND {cypher_list_from_scoped_keys(extends_tasks)} AS e_task + MATCH (Task {{`_scoped_key`: e_task}})-[:EXTENDS]->(original_task:Task) + RETURN original_task._scoped_key AS original_task + """ + results = n4js.execute_query(q) + + assert len(results.records) == 5 + + # check that we extended the correct tasks + for record in results.records: + original = ScopedKey.from_str(record["original_task"]) + assert original in extends_list + + # make sure the first transformation_sk isn't extending an already existing task + extends_list[0] = None + + # confirm we still have 5 entries + assert len(extends_list) == 5 + + extends_tasks = user_client.create_bulk_tasks( + transformation_sks, extends=extends_list + ) + + q = f"""UNWIND {cypher_list_from_scoped_keys(extends_tasks)} AS e_task + MATCH (Task {{`_scoped_key`: e_task}})-[:EXTENDS]->(original_task:Task) + RETURN original_task._scoped_key AS original_task + """ + results = n4js.execute_query(q) + + # we should only have 4 original tasks even though we + # created 5 new tasks + assert len(results.records) == 4 and len(extends_tasks) == 5 + def test_query_tasks( self, scope_test, From 08be3336483973f002d1177bf6fa972a77967ce3 Mon Sep 17 00:00:00 2001 From: David Dotson Date: Wed, 13 Mar 2024 22:45:20 -0700 Subject: [PATCH 07/10] Modifications from @dotsdl in review --- alchemiscale/interface/api.py | 8 +++-- alchemiscale/interface/client.py | 2 +- alchemiscale/storage/statestore.py | 32 +++++++++++-------- .../interface/client/test_client.py | 15 +++++---- .../integration/storage/test_statestore.py | 9 ++++-- 5 files changed, 40 insertions(+), 26 deletions(-) diff --git a/alchemiscale/interface/api.py b/alchemiscale/interface/api.py index be7a0b79..65d11fa1 100644 --- a/alchemiscale/interface/api.py +++ b/alchemiscale/interface/api.py @@ -358,7 +358,7 @@ def create_tasks( @router.post("/bulk/transformations/tasks/create") -def create_bulk_tasks( +def create_transformations_tasks( *, transformations: List[str] = Body(embed=True), extends: Optional[List[Optional[str]]] = None, @@ -379,8 +379,10 @@ def create_bulk_tasks( for extends_str in extends ] - # TODO: raise Bad Request for ValueErrors raised - task_sks = n4js.create_tasks(transformation_sks, extends) + try: + task_sks = n4js.create_tasks(transformation_sks, extends) + except ValueError as e: + raise HTTPException(status_code=http_status.HTTP_400_BAD_REQUEST, detail=str(e)) return [str(sk) for sk in task_sks] diff --git a/alchemiscale/interface/client.py b/alchemiscale/interface/client.py index c15264bd..0cd548b1 100644 --- a/alchemiscale/interface/client.py +++ b/alchemiscale/interface/client.py @@ -469,7 +469,7 @@ def create_tasks( task_sks = self._post_resource(f"/transformations/{transformation}/tasks", data) return [ScopedKey.from_str(i) for i in task_sks] - def create_bulk_tasks( + def create_transformations_tasks( self, transformations: List[ScopedKey], extends: Optional[List[Optional[ScopedKey]]] = None, diff --git a/alchemiscale/storage/statestore.py 
b/alchemiscale/storage/statestore.py index 78133db6..40ca20dd 100644 --- a/alchemiscale/storage/statestore.py +++ b/alchemiscale/storage/statestore.py @@ -1581,22 +1581,26 @@ def create_tasks( extends: Optional[List[Optional[ScopedKey]]] = None, creator: Optional[str] = None, ) -> List[ScopedKey]: - """Add compute Tasks to a provided Transformations. + """Create Tasks for the given Transformations. - Note: this creates compute Tasks, but does not add them to any TaskHubs. + Note: this creates Tasks; it does not action them. Parameters ---------- transformations - The Transformations to compute. + The Transformations to create Tasks for. + One Task is created for each Transformation ScopedKey given; to + create multiple Tasks for a given Transformation, provide its + ScopedKey multiple times. extends - The ScopedKeys of the Tasks to use as a starting point for the Tasks. + The ScopedKeys of the Tasks to use as a starting point for the + created Tasks, in the same order as `transformations`. If ``None`` + given for a given Task, it will not extend any other Task. Will use the `ProtocolDAGResult` from the given Task as the `extends` input for the Task's eventual call to `Protocol.create`. creator (optional) - The creator of the Tasks + The creator of the Tasks. """ - allowed_types = [Transformation.__qualname__, NonTransformation.__qualname__] # reshape data to a standard form @@ -1609,9 +1613,7 @@ def create_tasks( for i, _extends in enumerate(extends): if _extends is not None: - if not ( - extended_task_qualname := _extends.__getattribute__("qualname") - ): + if not (extended_task_qualname := getattr(_extends, "qualname", None)): raise ValueError( f"`extends` entry for `Task` {transformations[i]} is not valid" ) @@ -1626,7 +1628,7 @@ def create_tasks( for i, transformation in enumerate(transformations): if transformation.qualname not in allowed_types: raise ValueError( - f"Got an unsupported `Task` type: {transformation.qualname}" + f"Got an unsupported `Transformation` type: {transformation.qualname}" ) transformation_map[transformation.qualname][0].append(transformation) transformation_map[transformation.qualname][1].append(extends[i]) @@ -1705,12 +1707,16 @@ def create_task( extends: Optional[ScopedKey] = None, creator: Optional[str] = None, ) -> ScopedKey: - """Create a single `Task` from a `Transformation`. + """Create a single Task for a Transformation. + + This is a convenience method that wraps around the more general + `create_tasks` method. - This method wrap around the more general `create_tasks` method. 
""" return self.create_tasks( - [transformation], extends=[extends] if extends is not None else [None] + [transformation], + extends=[extends] if extends is not None else [None], + creator=creator, )[0] def query_tasks(self, *, status=None, key=None, scope: Scope = Scope()): diff --git a/alchemiscale/tests/integration/interface/client/test_client.py b/alchemiscale/tests/integration/interface/client/test_client.py index 12e5b840..f235817e 100644 --- a/alchemiscale/tests/integration/interface/client/test_client.py +++ b/alchemiscale/tests/integration/interface/client/test_client.py @@ -364,7 +364,7 @@ def test_create_tasks( ) assert set() == set(n4js.get_transformation_tasks(sk, extends=task_sks[1])) - def test_create_bulk_tasks( + def test_create_transformations_tasks( self, scope_test, n4js_preloaded, @@ -382,7 +382,7 @@ def test_create_bulk_tasks( ] # create three copies tasks per transformation - task_sks = user_client.create_bulk_tasks(transformation_sks * 3) + task_sks = user_client.create_transformations_tasks(transformation_sks * 3) all_tasks = set() extends_list = [] @@ -398,7 +398,7 @@ def test_create_bulk_tasks( assert set(task_sks) == all_tasks # create a new set of tasks - extends_tasks = user_client.create_bulk_tasks( + extends_tasks = user_client.create_transformations_tasks( transformation_sks, extends=extends_list ) @@ -415,9 +415,10 @@ def test_create_bulk_tasks( assert len(results.records) == 5 # check that we extended the correct tasks - for record in results.records: - original = ScopedKey.from_str(record["original_task"]) - assert original in extends_list + originals = [ + ScopedKey.from_str(record["original_task"]) for record in results.records + ] + assert set(originals) == set(extends_list) # make sure the first transformation_sk isn't extending an already existing task extends_list[0] = None @@ -425,7 +426,7 @@ def test_create_bulk_tasks( # confirm we still have 5 entries assert len(extends_list) == 5 - extends_tasks = user_client.create_bulk_tasks( + extends_tasks = user_client.create_transformations_tasks( transformation_sks, extends=extends_list ) diff --git a/alchemiscale/tests/integration/storage/test_statestore.py b/alchemiscale/tests/integration/storage/test_statestore.py index 4a24ab39..cd05bb03 100644 --- a/alchemiscale/tests/integration/storage/test_statestore.py +++ b/alchemiscale/tests/integration/storage/test_statestore.py @@ -519,8 +519,13 @@ def test_create_task(self, n4js, network_tyk2, scope_test): for record in results.records: # n is a parent Task, m is a child Task n, m = record["n"], record["m"] - assert ScopedKey.from_str(n["_scoped_key"]) in task_sks - assert ScopedKey.from_str(m["_scoped_key"]) in child_task_sks + + task_sk = ScopedKey.from_str(n["_scoped_key"]) + assert task_sk in task_sks + + child_task_sk = child_task_sks[task_sks.index(task_sk)] + + assert ScopedKey.from_str(m["_scoped_key"]) == child_task_sk def test_create_task_extends_invalid_deleted(self, n4js, network_tyk2, scope_test): # add alchemical network, then try generating task From f13c1735a2f8808092c63b692c9cce219513922f Mon Sep 17 00:00:00 2001 From: Ian Kenney Date: Thu, 14 Mar 2024 11:22:24 -0700 Subject: [PATCH 08/10] Ensure that Tasks only EXTENDS Tasks that PEFORMS the same Transformation --- alchemiscale/storage/statestore.py | 23 ++++++++++++++----- .../integration/storage/test_statestore.py | 17 ++++++++++++-- 2 files changed, 32 insertions(+), 8 deletions(-) diff --git a/alchemiscale/storage/statestore.py b/alchemiscale/storage/statestore.py index 
40ca20dd..bb08d1d2 100644 --- a/alchemiscale/storage/statestore.py +++ b/alchemiscale/storage/statestore.py @@ -1544,15 +1544,15 @@ def task_count(task_dict: dict): ## tasks - def validate_extends_tasks(self, task_list): + def validate_extends_tasks(self, task_list) -> Dict[str, Tuple[Node, str]]: if not task_list: - return [] + return {} q = f""" UNWIND {cypher_list_from_scoped_keys(task_list)} as task - MATCH (t:Task {{`_scoped_key`: task}}) - return t + MATCH (t:Task {{`_scoped_key`: task}})-[PERFORMS]->(tf:Transformation) + return t, tf._scoped_key as tf_sk """ results = self.execute_query(q) @@ -1561,6 +1561,7 @@ def validate_extends_tasks(self, task_list): for record in results.records: node = record_data_to_node(record["t"]) + transformation_sk = record["tf_sk"] status = node.get("status") @@ -1571,7 +1572,7 @@ def validate_extends_tasks(self, task_list): node["datetime_created"] = str(node["datetime_created"]) raise ValueError(f"Cannot extend a `deleted` or `invalid` Task: {node}") - nodes[node["_scoped_key"]] = node + nodes[node["_scoped_key"]] = (node, transformation_sk) return nodes @@ -1680,9 +1681,19 @@ def create_tasks( sks.append(scoped_key) if _extends is not None: + + extends_task_node, extends_transformation_sk = extends_nodes[ + str(_extends) + ] + + if extends_transformation_sk != str(_transformation): + raise ValueError( + f"{_extends} extends a transformation other than {_transformation}" + ) + subgraph |= Relationship.type("EXTENDS")( task_node, - extends_nodes[str(_extends)], + extends_task_node, _org=scope.org, _campaign=scope.campaign, _project=scope.project, diff --git a/alchemiscale/tests/integration/storage/test_statestore.py b/alchemiscale/tests/integration/storage/test_statestore.py index cd05bb03..416abc45 100644 --- a/alchemiscale/tests/integration/storage/test_statestore.py +++ b/alchemiscale/tests/integration/storage/test_statestore.py @@ -487,9 +487,11 @@ def test_create_task(self, n4js, network_tyk2, scope_test): transformation = list(an.edges)[0] transformation_sk = n4js.get_scoped_key(transformation, scope_test) + # test the `n4js.create_task` method, which calls `n4js.create_tasks` + # for convenience task_sk: ScopedKey = n4js.create_task(transformation_sk) - q = f"""match (n:Task {{_gufe_key: '{task_sk.gufe_key}', - _org: '{task_sk.org}', _campaign: '{task_sk.campaign}', + q = f"""match (n:Task {{_gufe_key: '{task_sk.gufe_key}', + _org: '{task_sk.org}', _campaign: '{task_sk.campaign}', _project: '{task_sk.project}'}})-[:PERFORMS]->(m:Transformation) return m """ @@ -527,6 +529,17 @@ def test_create_task(self, n4js, network_tyk2, scope_test): assert ScopedKey.from_str(m["_scoped_key"]) == child_task_sk + incompatible_transformation_sk = n4js.get_scoped_key( + list(an.edges)[1], scope_test + ) + + with pytest.raises(ValueError): + incompatible_transformations = [transformation_sk] * len(child_task_sks) + incompatible_transformations[0] = incompatible_transformation_sk + # since the child tasks all PERFORM transformation_sk, the addition + # of incompatible_transformation_sk raises a ValueError + n4js.create_tasks(incompatible_transformations, child_task_sks) + def test_create_task_extends_invalid_deleted(self, n4js, network_tyk2, scope_test): # add alchemical network, then try generating task an = network_tyk2 From 68bae7bc242f91c439b3350d8e05f2a9236d27f5 Mon Sep 17 00:00:00 2001 From: Ian Kenney Date: Fri, 15 Mar 2024 11:01:17 -0700 Subject: [PATCH 09/10] Added docstring and simplified error message - Docstring for `create_transformations_tasks` - 
Only report the Task's ScopedKey in extends validation
---
 alchemiscale/interface/client.py   | 40 ++++++++++++++++++++++++++++++
 alchemiscale/storage/statestore.py |  9 +++----
 2 files changed, 44 insertions(+), 5 deletions(-)

diff --git a/alchemiscale/interface/client.py b/alchemiscale/interface/client.py
index 0cd548b1..9b4e7a24 100644
--- a/alchemiscale/interface/client.py
+++ b/alchemiscale/interface/client.py
@@ -474,6 +474,46 @@ def create_transformations_tasks(
         transformations: List[ScopedKey],
         extends: Optional[List[Optional[ScopedKey]]] = None,
     ) -> List[ScopedKey]:
+        """Create Tasks for multiple Transformations.
+
+        Unlike `create_tasks`, this method can create Tasks for many
+        Transformations. This method should be used instead of `create_tasks`
+        whenever creating Tasks for more than one unique Transformation since it
+        minimizes the number of API requests to the alchemiscale server.
+
+
+        Parameters
+        ----------
+        transformations
+            A list of ScopedKeys of Transformations to create Tasks for. The
+            same ScopedKey can be repeated to create multiple Tasks for the
+            same Transformation.
+        extends
+            A list of ScopedKeys for the Tasks to be extended. When not `None`,
+            `extends` must be a list of the same length as `transformations`. If
+            a transformation in `transformations` should not extend a Task, use
+            a `None` as a placeholder in the `extends` list.
+
+        Returns
+        -------
+        List[ScopedKey]
+            A list giving the ScopedKeys of the new Tasks created.
+
+        Examples
+        --------
+
+        Instead of looping over Transformations and calling `create_tasks`, make
+        one call to `create_transformations_tasks`.
+
+        >>> client.create_transformations_tasks([transformation_1_sk, transformation_2_sk])
+
+        The behavior of the `count` keyword argument from `create_tasks` can be
+        recreated by repeating the same transformation in the list while also
+        allowing the addition of other transformations.
+ + >>> client.create_transformations_tasks([transformation_1_sk] * 3 + [transformation_2_sk] * 2) + + """ data = dict( transformations=[str(transformation) for transformation in transformations], diff --git a/alchemiscale/storage/statestore.py b/alchemiscale/storage/statestore.py index bb08d1d2..2b1488dc 100644 --- a/alchemiscale/storage/statestore.py +++ b/alchemiscale/storage/statestore.py @@ -1566,11 +1566,10 @@ def validate_extends_tasks(self, task_list) -> Dict[str, Tuple[Node, str]]: status = node.get("status") if status in ("invalid", "deleted"): - # py2neo Node doesn't like the neo4j datetime object - # manually cast since we're raising anyways - # and the results are ephemeral - node["datetime_created"] = str(node["datetime_created"]) - raise ValueError(f"Cannot extend a `deleted` or `invalid` Task: {node}") + invalid_task_scoped_key = node["_scoped_key"] + raise ValueError( + f"Cannot extend a `deleted` or `invalid` Task: {invalid_task_scoped_key}" + ) nodes[node["_scoped_key"]] = (node, transformation_sk) From 0c2920d303c649528f1d8c8a758dd8e5d4bb9642 Mon Sep 17 00:00:00 2001 From: David Dotson Date: Fri, 15 Mar 2024 15:13:44 -0700 Subject: [PATCH 10/10] Small modifications from @dotsdl review --- alchemiscale/interface/client.py | 1 - alchemiscale/storage/statestore.py | 6 +++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/alchemiscale/interface/client.py b/alchemiscale/interface/client.py index 9b4e7a24..88b3a9c6 100644 --- a/alchemiscale/interface/client.py +++ b/alchemiscale/interface/client.py @@ -481,7 +481,6 @@ def create_transformations_tasks( whenever creating Tasks for more than one unique Transformation since it minimizes the number of API requests to the alchemiscale server. - Parameters ---------- transformations diff --git a/alchemiscale/storage/statestore.py b/alchemiscale/storage/statestore.py index 2b1488dc..c0190f1e 100644 --- a/alchemiscale/storage/statestore.py +++ b/alchemiscale/storage/statestore.py @@ -1544,7 +1544,7 @@ def task_count(task_dict: dict): ## tasks - def validate_extends_tasks(self, task_list) -> Dict[str, Tuple[Node, str]]: + def _validate_extends_tasks(self, task_list) -> Dict[str, Tuple[Node, str]]: if not task_list: return {} @@ -1633,7 +1633,7 @@ def create_tasks( transformation_map[transformation.qualname][0].append(transformation) transformation_map[transformation.qualname][1].append(extends[i]) - extends_nodes = self.validate_extends_tasks( + extends_nodes = self._validate_extends_tasks( [_extends for _extends in extends if _extends is not None] ) @@ -1687,7 +1687,7 @@ def create_tasks( if extends_transformation_sk != str(_transformation): raise ValueError( - f"{_extends} extends a transformation other than {_transformation}" + f"{_extends} extends a Transformation other than {_transformation}" ) subgraph |= Relationship.type("EXTENDS")(