From b128e27e2fac7c030f7eb519dfb8a5f66a7076a7 Mon Sep 17 00:00:00 2001 From: Julian Minder Date: Tue, 26 Mar 2024 16:51:20 +0100 Subject: [PATCH] When creating subgraphs (CREATE) the commit is now also serialised across multiple workers. This is required because in specific instances it could happen that neo4j dropped some CREATES (only with relationships). Adds test to verify. Fixes #20. --- rel2graph/core/converter.py | 15 +++++-- rel2graph/neo4j/__init__.py | 2 +- .../resources/schema_concurrency.yaml | 8 ++++ tests/integration/test_concurrency.py | 44 +++++++++++++++++++ tests/integration/test_merge.py | 2 +- 5 files changed, 65 insertions(+), 6 deletions(-) create mode 100644 tests/integration/resources/schema_concurrency.yaml create mode 100644 tests/integration/test_concurrency.py diff --git a/rel2graph/core/converter.py b/rel2graph/core/converter.py index 518d3b2..3dd9bd3 100644 --- a/rel2graph/core/converter.py +++ b/rel2graph/core/converter.py @@ -103,13 +103,20 @@ def commit_batch(to_create: Subgraph, to_merge: Subgraph) -> None: nodes_committed = 0 relationships_committed = 0 - # Creating does not rely on synchronous executions - if len(to_create.nodes) + len(to_create.relationships) > 0: + # Creating nodes does not rely on serialized executions + # If there are relationships to create, we need to serialize the creation + # TODO: We could split the creation of nodes and relationships into two separate branches, might be more efficient + # but considering that in almost all cases no relationships are created in the first loop, it's not worth it + if len(to_create.relationships) > 0: + with __process_config.graph_lock: + with __process_config.graph_driver.session() as session: + commit_wrap(lambda: create(to_create, session)) + relationships_committed += len(to_create.relationships) + elif len(to_create.nodes) > 0: with __process_config.graph_driver.session() as session: commit_wrap(lambda: create(to_create, session)) nodes_committed += 
len(to_create.nodes) - relationships_committed += len(to_create.relationships) - + # Merging nodes requires serialization (synchronous executions) between processes # Using locks to enforce this if len(to_merge.nodes) + len(to_merge.relationships) > 0: diff --git a/rel2graph/neo4j/__init__.py b/rel2graph/neo4j/__init__.py index 9c485f4..8f703ec 100644 --- a/rel2graph/neo4j/__init__.py +++ b/rel2graph/neo4j/__init__.py @@ -45,7 +45,7 @@ def pull(graph: Subgraph, session: Session): graph (Subgraph): The graph to create. session (Session): The `session `_ to use. """ - session.execute_write(graph.__db_pull__) + session.execute_read(graph.__db_pull__) def match_nodes(session: Session, *labels: List[str], **properties: dict): diff --git a/tests/integration/resources/schema_concurrency.yaml b/tests/integration/resources/schema_concurrency.yaml new file mode 100644 index 0000000..855a03e --- /dev/null +++ b/tests/integration/resources/schema_concurrency.yaml @@ -0,0 +1,8 @@ +ENTITY("Entity"): + NODE("Test"): + + uid = Entity.uid + + +ENTITY("Relationship"): + RELATIONSHIP(MATCH("Test", uid=Relationship.to), "FROM", MATCH("Test", uid=Relationship.from)): + MERGE_RELATIONSHIPS(RELATIONSHIP(MATCH("Test", uid=Relationship.from), "TO", MATCH("Test", uid=Relationship.to))): diff --git a/tests/integration/test_concurrency.py b/tests/integration/test_concurrency.py new file mode 100644 index 0000000..8e8125b --- /dev/null +++ b/tests/integration/test_concurrency.py @@ -0,0 +1,44 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +""" +Integration tests for testing the synchronous conversion of relations. Related to issue #20. 
+ +authors: Julian Minder +""" + +import pytest + +import pandas as pd +import logging +import numpy as np + +from rel2graph import Converter, IteratorIterator, register_attribute_postprocessor, Attribute, register_subgraph_preprocessor, GlobalSharedState, register_subgraph_postprocessor +from rel2graph.utils import load_file +from rel2graph.relational_modules.pandas import PandasDataFrameIterator +from rel2graph.neo4j import match_relationships, push, pull +from rel2graph.common_modules import MERGE_RELATIONSHIPS + +from helpers import * + + +# As this issue is related to multiple workers, we repeat the test multiple times +@pytest.mark.parametrize('execution_number', range(10)) +def test_concurrent_relationships(execution_number, session, uri, auth): + schema = load_file("tests/integration/resources/schema_concurrency.yaml") + + entities = pd.DataFrame({"uid": range(40)}) + + # 120 relations between 20 entities + relations = pd.DataFrame({"from": list(range(20))*6, "to": [i+20 for i in range(20) for _ in range(6)]}) + unique_pairs = len(relations.drop_duplicates()) + print(unique_pairs) + + iterator = IteratorIterator([PandasDataFrameIterator(entities, "Entity"), PandasDataFrameIterator(relations, "Relationship")]) + + converter = Converter(schema, iterator, uri, auth, num_workers=12, batch_size=10) + converter(skip_relationships=False) + + + assert num_relationships(session) == 120+unique_pairs + assert num_nodes(session) == 40 \ No newline at end of file diff --git a/tests/integration/test_merge.py b/tests/integration/test_merge.py index 1ec4f10..056004a 100644 --- a/tests/integration/test_merge.py +++ b/tests/integration/test_merge.py @@ -58,7 +58,7 @@ def test_standart_same_resource(config, session, uri, auth): @pytest.mark.parametrize("config",[(1,True), (1, False) ,(5, False)]) -def test_merge_nodesasdf(config, session, uri, auth): +def test_merge_nodes(config, session, uri, auth): schema = """ ENTITY("Entity"): NODE("Entity") node: