added tests for parallel relations
jkminder committed Jan 14, 2022
1 parent f329324 commit 7f8aa72
Showing 5 changed files with 79 additions and 15 deletions.
4 changes: 0 additions & 4 deletions TODO.md
@@ -1,6 +1,2 @@
# Todos
- clean up documentation
- update documentation for parallel relationships
- add wrapper to readme?
- write tests for parallel relations (probably requires an extension to mockgraph (or we just switch to testing if it's hard (shouldn't be, though)))
- query demos (wait for LB examples)
31 changes: 23 additions & 8 deletions tests/integration/mock_graph.py
@@ -48,39 +48,54 @@ def __init__(self) -> None:
        # Dummy variables to simulate graph
        self.service = object() # Dummy service
        self.name = "MockGraph"
        self._allow_parallel_relations = False

    @property
    def allow_parallel_relations(self):
        return self._allow_parallel_relations

    @allow_parallel_relations.setter
    def allow_parallel_relations(self, value):
        self._allow_parallel_relations = value

    def delete_all(self):
        self.nodes = []
        self.relations = []

-   def add_node(self, node):
+   def create_node(self, node):
        node.identity = MockGraph.current_node_id
        MockGraph.current_node_id += 1
        with self.nodes_lock:
            self.nodes.append(node)

    def create_relation(self, relation):
        relation.graph = self
        with self.relations_lock:
            self.relations.append(relation)

    def _merge_relation(self, relation):
        found = False
        for rel in self.relations:
            if relation.start_node.identity == rel.start_node.identity and \
               relation.end_node.identity == rel.end_node.identity and \
-              relation.type == rel.type:
+              type(relation).__name__ == type(rel).__name__:
                found = True
                # update relation
                for key in relation.keys():
                    rel[key] = relation[key]
        if not found:
-           with self.relations_lock:
-               self.relations.append(relation)
-           relation.graph = self
+           self.create_relation(relation)

    def create(self, subgraph):
        for node in subgraph.nodes:
            if node.identity is None:
-               self.add_node(node)
+               self.create_node(node)
        for relation in subgraph.relationships:
            if relation.graph is None:
-               self._merge_relation(relation)
+               if not self._allow_parallel_relations:
+                   self._merge_relation(relation)
+               else:
+                   self.create_relation(relation)

    def merge(self, subgraph):
        for relation in subgraph.relationships:
@@ -97,7 +112,7 @@ def merge(self, subgraph):
            if old_node is None:
                match = self.matcher.match(node.__primarylabel__, **{node.__primarykey__: node[node.__primarykey__]})
                if len(match.all()) == 0:
-                   self.add_node(node)
+                   self.create_node(node)
                    continue
                elif len(match.all()) > 1:
                    raise ValueError("Multiple nodes found to merge")
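Note: the `allow_parallel_relations` flag above only changes what `create` does with a relation whose start node, end node, and type already exist in the mock graph — merge its properties into the existing relation when the flag is off, or keep it as an additional parallel edge when it is on. A minimal sketch of that branching with simplified stand-in classes (the `Node`/`Relation`/`TinyGraph` names are illustrative assumptions, not the repository's py2neo-backed types):

```python
from dataclasses import dataclass, field


@dataclass
class Node:
    identity: int


@dataclass
class Relation:
    start_node: Node
    end_node: Node
    type: str
    properties: dict = field(default_factory=dict)


class TinyGraph:
    """Stripped-down stand-in for MockGraph's merge-vs-parallel branching."""

    def __init__(self, allow_parallel_relations=False):
        self.allow_parallel_relations = allow_parallel_relations
        self.relations = []

    def create(self, relation):
        if self.allow_parallel_relations:
            # parallel mode: always keep the new relation as an extra edge
            self.relations.append(relation)
            return
        for existing in self.relations:
            # merge mode: same (start, end, type) updates the existing edge
            if (existing.start_node.identity == relation.start_node.identity
                    and existing.end_node.identity == relation.end_node.identity
                    and existing.type == relation.type):
                existing.properties.update(relation.properties)
                return
        self.relations.append(relation)


a, b = Node(0), Node(1)
merged, parallel = TinyGraph(), TinyGraph(allow_parallel_relations=True)
for pk in (1, 2, 3, 4):
    merged.create(Relation(a, b, "likes_parallel", {"pk": pk}))
    parallel.create(Relation(a, b, "likes_parallel", {"pk": pk}))
print(len(merged.relations), len(parallel.relations))  # -> 1 4
```

Locks and the node matcher are left out because the sketch is single-threaded; the real MockGraph guards its lists with `nodes_lock`/`relations_lock` since the converter tests run with several workers.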
16 changes: 16 additions & 0 deletions tests/integration/resources/data_end_to_end.py
@@ -59,6 +59,15 @@
"Renamed": p["LastName"], "Static": "staticstring"}),"likes",
(["Species", "BioEntity", p["FavoriteFlower"]], {"Name": p["FavoriteFlower"]}), {"Since":"4ever", "EntityAttribute": p["ID"]}) for p in no_duplicates[1].iloc]

likes_relations_parallel = [((["Person"], {"ID": "1", "FirstName": "Julian", "Renamed": "Minder", "Static": "staticstring"}),"likes_parallel",
(["Species", "BioEntity", "virginica"], {"Name": "virginica"}), {"pk": i}) for i in [1,2,3,4]]

likes_relations_parallel = [((["Person"], {"ID": "1", "FirstName": "Julian", "Renamed": "Minder", "Static": "staticstring"}),"likes_parallel",
(["Species", "BioEntity", "virginica"], {"Name": "virginica"}), {"pk": i}) for i in [1,2,3,4]]

likes_relations_merged = [((["Person"], {"ID": "1", "FirstName": "Julian", "Renamed": "Minder", "Static": "staticstring"}),"likes_merged",
(["Species", "BioEntity", "virginica"], {"Name": "virginica"}), {"pk": "1"})]

person_only_nodes_only_result = {
"nodes": person_nodes,
"relations": []
@@ -73,4 +82,11 @@
"nodes": person_nodes + species_nodes + flower_nodes,
"relations": is_relations + likes_relations
}

result_parallel = {
"nodes": person_nodes + species_nodes,
"relations": likes_relations_parallel + likes_relations_merged
}


######################
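For reference, the fixtures above encode the expected end state: four distinct `likes_parallel` relations (one per `pk`) but only a single `likes_merged` relation. A quick standalone sanity check of those shapes — assuming only the tuple layout `((labels, properties), type, (labels, properties), relation_properties)` used throughout this file:

```python
# Mirrors the fixture layout above; properties trimmed, purely illustrative.
likes_relations_parallel = [
    ((["Person"], {"ID": "1"}), "likes_parallel", (["Species"], {"Name": "virginica"}), {"pk": i})
    for i in [1, 2, 3, 4]
]
likes_relations_merged = [
    ((["Person"], {"ID": "1"}), "likes_merged", (["Species"], {"Name": "virginica"}), {"pk": "1"})
]
assert len(likes_relations_parallel) == 4
assert len(likes_relations_merged) == 1
assert {rel[3]["pk"] for rel in likes_relations_parallel} == {1, 2, 3, 4}
```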
17 changes: 16 additions & 1 deletion tests/integration/resources/schema_end_to_end.yaml
@@ -36,4 +36,19 @@ ENTITY("PersonStateRecovery"):
        - Static = "staticstring"
    stop_after_10(RELATION(person, "likes", MATCH("Species", Name=PersonStateRecovery.FavoriteFlower))):
        - Since = "4ever"
        - EntityAttribute = PersonStateRecovery.ID

ENTITY("FlowerParallel"):
    NODE("Species", "BioEntity", FlowerParallel.species) species:
        + Name = Flower.species

ENTITY("PersonParallel"):
    NODE("Person") person:
        + ID = PersonParallel.ID
        - FirstName = PersonParallel.FirstName
        - Renamed = PersonParallel.LastName
        - Static = "staticstring"
    RELATION(MATCH("Person", ID = "1"), "likes_parallel", MATCH("Species", Name="virginica")):
        - pk = PersonParallel.ID
    RELATION(MATCH("Person", ID = "1"), "likes_merged", MATCH("Species", Name="virginica")):
        + pk = "1"
26 changes: 24 additions & 2 deletions tests/integration/test_end_to_end.py
@@ -15,7 +15,7 @@
from mock_graph import MockGraph
from helpers import compare, update_matcher, StateRecoveryException
from resources.data_end_to_end import no_duplicates, duplicates, before_update, person_only_nodes_only_result, schema_file_name
-from resources.data_end_to_end import iris, flower_only_result, full_result
+from resources.data_end_to_end import iris, flower_only_result, full_result, result_parallel
from py2neo import Graph
# Turn off reinstantiation warnings
Converter.no_instantiation_warnings = True
@@ -29,6 +29,12 @@
def graph():
    return MockGraph()

@pytest.fixture
def graph_wpr():
    graph = MockGraph()
    graph.allow_parallel_relations = True
    return graph

@pytest.mark.parametrize("workers",[1,5,20])
@pytest.mark.parametrize(
"data,result",
@@ -106,4 +112,20 @@ def test_state_recovery(graph, data_type_1, data_type_2, result, workers):
    except StateRecoveryException:
        pass
    #compare
    compare(graph, result)

@pytest.mark.parametrize("workers",[1,5,20])
@pytest.mark.parametrize(
    "data_type_1,data_type_2,result",
    [(iris, no_duplicates, result_parallel)]
)
def test_parallel_relations(graph_wpr, data_type_1, data_type_2, result, workers):
    iterator = IteratorIterator([
        PandasDataFrameIterator(data_type_1[1], data_type_1[0]+"Parallel"),
        PandasDataFrameIterator(data_type_2[1], data_type_2[0]+"Parallel")
    ])
    converter = Converter(schema_file_name, iterator, graph_wpr, num_workers=workers)
    update_matcher(graph_wpr) #REQUIRED to use mock matcher
    converter()
    #compare
    compare(graph_wpr, result)
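To run just the new test locally, something along these lines should work from the repository root (a hedged example — the path comes from this commit's layout, and `-k` simply selects `test_parallel_relations`):

```python
# Select only the new parallel-relations test; equivalent to running
# `pytest tests/integration/test_end_to_end.py -k parallel_relations` from a shell.
import pytest

pytest.main(["tests/integration/test_end_to_end.py", "-k", "parallel_relations", "-q"])
```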
