Skip to content

Commit

Permalink
When creating subgraphs (CREATE) the commit is now also serialised ac…
Browse files Browse the repository at this point in the history
…ross multiple workers. This is required because in specific instances it could happen that Neo4j dropped some CREATEs (only with relationships). Adds a test to verify. Fixes #20.
  • Loading branch information
jkminder committed Mar 26, 2024
1 parent 4e00f67 commit b128e27
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 6 deletions.
15 changes: 11 additions & 4 deletions rel2graph/core/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,20 @@ def commit_batch(to_create: Subgraph, to_merge: Subgraph) -> None:
nodes_committed = 0
relationships_committed = 0

# Creating does not rely on synchronous executions
if len(to_create.nodes) + len(to_create.relationships) > 0:
# Creating nodes does not rely on serialized executions
# If there are relationships to create, we need to serialize the creation
# TODO: We could split the creation of nodes and relationships into two separate branches, might be more efficient
# but considering that in almost all cases no relationships are created in the first loop, it's not worth it
if len(to_create.relationships) > 0:
with __process_config.graph_lock:
with __process_config.graph_driver.session() as session:
commit_wrap(lambda: create(to_create, session))
relationships_committed += len(to_create.relationships)
elif len(to_create.nodes) > 0:
with __process_config.graph_driver.session() as session:
commit_wrap(lambda: create(to_create, session))
nodes_committed += len(to_create.nodes)
relationships_committed += len(to_create.relationships)


# Merging nodes requires serialization (synchronous executions) between processes
# Using locks to enforce this
if len(to_merge.nodes) + len(to_merge.relationships) > 0:
Expand Down
2 changes: 1 addition & 1 deletion rel2graph/neo4j/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def pull(graph: Subgraph, session: Session):
graph (Subgraph): The graph to create.
session (Session): The `session <https://neo4j.com/docs/api/python-driver/current/api.html#session>`_ to use.
"""
session.execute_write(graph.__db_pull__)
session.execute_read(graph.__db_pull__)


def match_nodes(session: Session, *labels: List[str], **properties: dict):
Expand Down
8 changes: 8 additions & 0 deletions tests/integration/resources/schema_concurrency.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
ENTITY("Entity"):
NODE("Test"):
uid = Entity.uid


ENTITY("Relationship"):
RELATIONSHIP(MATCH("Test", uid=Relationship.to), "FROM", MATCH("Test", uid=Relationship.from)):
MERGE_RELATIONSHIPS(RELATIONSHIP(MATCH("Test", uid=Relationship.from), "TO", MATCH("Test", uid=Relationship.to))):
44 changes: 44 additions & 0 deletions tests/integration/test_concurrency.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
Integration tests for testing the synchronous conversion of relations. Related to issue #20.
authors: Julian Minder
"""

import pytest

import pandas as pd
import logging
import numpy as np

from rel2graph import Converter, IteratorIterator, register_attribute_postprocessor, Attribute, register_subgraph_preprocessor, GlobalSharedState, register_subgraph_postprocessor
from rel2graph.utils import load_file
from rel2graph.relational_modules.pandas import PandasDataFrameIterator
from rel2graph.neo4j import match_relationships, push, pull
from rel2graph.common_modules import MERGE_RELATIONSHIPS

from helpers import *


# As this issue is related to multiple workers, we repeat the test multiple times
@pytest.mark.parametrize('execution_number', range(10))
def test_concurrent_relationships(execution_number, session, uri, auth):
    """Regression test for issue #20: relationship CREATEs must not be dropped
    when multiple workers commit concurrently.

    Converts 40 entity nodes and 120 relationship rows with 12 workers and
    verifies that every created and merged relationship actually ends up in
    the database.

    Args:
        execution_number: Repetition index from the parametrize decorator
            (the race is non-deterministic, so the test is run 10 times).
        session: Neo4j session fixture (from helpers) used to inspect the result.
        uri: Neo4j connection URI fixture.
        auth: Neo4j auth fixture.
    """
    schema = load_file("tests/integration/resources/schema_concurrency.yaml")

    # 40 entity nodes: uids 0-19 act as relationship sources, 20-39 as targets.
    entities = pd.DataFrame({"uid": range(40)})

    # 120 relationship rows: each of the 20 source uids points to 6 distinct
    # target uids. The schema creates one "FROM" relationship per row and
    # merges one "TO" relationship per unique (from, to) pair.
    relations = pd.DataFrame({
        "from": list(range(20)) * 6,
        "to": [i + 20 for i in range(20) for _ in range(6)],
    })
    unique_pairs = len(relations.drop_duplicates())

    iterator = IteratorIterator([
        PandasDataFrameIterator(entities, "Entity"),
        PandasDataFrameIterator(relations, "Relationship"),
    ])

    # Many workers + small batches maximize the chance of hitting the race.
    converter = Converter(schema, iterator, uri, auth, num_workers=12, batch_size=10)
    converter(skip_relationships=False)

    # 120 created "FROM" relationships + one merged "TO" per unique pair.
    assert num_relationships(session) == 120 + unique_pairs
    assert num_nodes(session) == 40
2 changes: 1 addition & 1 deletion tests/integration/test_merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def test_standart_same_resource(config, session, uri, auth):


@pytest.mark.parametrize("config",[(1,True), (1, False) ,(5, False)])
def test_merge_nodesasdf(config, session, uri, auth):
def test_merge_nodes(config, session, uri, auth):
schema = """
ENTITY("Entity"):
NODE("Entity") node:
Expand Down

0 comments on commit b128e27

Please sign in to comment.