Skip to content

Commit

Permalink
import_entities_from_triplestore
Browse files Browse the repository at this point in the history
  • Loading branch information
arcangelo7 committed Nov 18, 2024
1 parent 09553ce commit aa327b7
Show file tree
Hide file tree
Showing 5 changed files with 285 additions and 29 deletions.
127 changes: 114 additions & 13 deletions oc_ocdm/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,18 @@
# SOFTWARE.
from __future__ import annotations

import time
import json
import os
import time
from typing import TYPE_CHECKING
from zipfile import ZipFile

from rdflib import RDF, ConjunctiveGraph, Graph, URIRef
from SPARQLWrapper import JSON, SPARQLWrapper

from oc_ocdm.graph.graph_entity import GraphEntity
from oc_ocdm.support.reporter import Reporter
from oc_ocdm.support.support import build_graph_from_results
from rdflib import RDF, ConjunctiveGraph, Graph, URIRef
from SPARQLWrapper import JSON, POST, SPARQLWrapper

if TYPE_CHECKING:
from typing import Any, Dict, List, Optional, Set
from oc_ocdm.graph.graph_set import GraphSet
Expand Down Expand Up @@ -223,7 +223,7 @@ def import_entities_from_graph(g_set: GraphSet, results: List[Dict]|Graph, resp_

# NOTE(review): this span is a GitHub diff fragment — removed (pre-change) and
# added (post-change) lines are interleaved, and the middle of the method
# (SPARQLWrapper construction, while/try header) is hidden behind the
# "Expand All" fold marker below, so the text is not runnable as-is.
@staticmethod
def import_entity_from_triplestore(g_set: GraphSet, ts_url: str, res: URIRef, resp_agent: str,
enable_validation: bool = False) -> GraphEntity:
enable_validation: bool = False) -> GraphEntity:
    # Fetch every triple whose subject is `res`; BIND keeps ?s in the result rows.
    query: str = f"SELECT ?s ?p ?o WHERE {{BIND (<{res}> AS ?s). ?s ?p ?o.}}"
    attempt = 0
    max_attempts = 3
Expand All @@ -237,22 +237,123 @@ def import_entity_from_triplestore(g_set: GraphSet, ts_url: str, res: URIRef, re
    sparql.setReturnFormat(JSON)
    result = sparql.queryAndConvert()['results']['bindings']

    # --- pre-change version of the result handling (removed lines) ---
    if result:
        imported_entities: List[GraphEntity] = Reader.import_entities_from_graph(g_set, result, resp_agent, enable_validation)
        if len(imported_entities) <= 0:
            raise ValueError("The requested entity was not found or was not recognized as a proper OCDM entity.")
        else:
            return imported_entities[0]
    # --- post-change version of the result handling (added lines) ---
    if not result:  # If there are no results, the entity does not exist
        raise ValueError(f"The requested entity {res} was not found in the triplestore.")

    imported_entities: List[GraphEntity] = Reader.import_entities_from_graph(g_set, result, resp_agent, enable_validation)
    if len(imported_entities) <= 0:
        raise ValueError("The requested entity was not recognized as a proper OCDM entity.")
    return imported_entities[0]

except ValueError as ve:  # Do not retry on validation / not-found errors
    raise ve
except Exception as e:
    # Communication failure: retry with exponential back-off.
    attempt += 1
    if attempt < max_attempts:
        print(f"[3] Attempt {attempt} failed. Could not import entity due to communication problems: {e}")
        print(f"Retrying in {wait_time} seconds...")
        time.sleep(wait_time)
        wait_time *= 2 # Double the wait time for the next attempt
        wait_time *= 2
    else:
        print(f"[3] All {max_attempts} attempts failed. Could not import entity due to communication problems: {e}")
        raise

# Defensive fallback: in practice unreachable, because the final failed
# attempt re-raises inside the loop above.
raise Exception("Max attempts reached. Failed to import entity from triplestore.")
raise Exception("Max attempts reached. Failed to import entity from triplestore.")

@staticmethod
def import_entities_from_triplestore(g_set: GraphSet, ts_url: str, entities: List[URIRef], resp_agent: str,
                                     enable_validation: bool = False, batch_size: int = 1000) -> List[GraphEntity]:
    """
    Import multiple entities from a triplestore in batches, one SPARQL query per batch.

    Args:
        g_set: The GraphSet to import entities into.
        ts_url: The triplestore SPARQL endpoint URL.
        entities: List of URIRef entities to import.
        resp_agent: The responsible agent string.
        enable_validation: Whether to validate the imported graphs.
        batch_size: Number of entities requested in each SPARQL query.

    Returns:
        List of imported GraphEntity objects.

    Raises:
        ValueError: If no entities are given, if a batch yields no results,
            or if some requested entities are missing / not proper OCDM
            entities. NOTE: entities imported before the error stays in g_set.
        Exception: The last communication error, after all retries fail.
    """
    if not entities:
        raise ValueError("No entities provided for import")

    imported_entities: List[GraphEntity] = []
    max_attempts = 3

    # Process the requested URIs in fixed-size batches.
    for start in range(0, len(entities), batch_size):
        batch = entities[start:start + batch_size]
        # Track which URIs of this batch have not appeared in the results yet.
        not_found_entities = set(str(entity) for entity in batch)

        # Build the batch query with a UNION of BIND patterns (kept instead of
        # VALUES for Virtuoso compatibility, per the original implementation).
        union_patterns = [f"{{ BIND(<{str(entity)}> AS ?s) ?s ?p ?o }}" for entity in batch]
        query = f"""
        SELECT ?s ?p ?o
        WHERE {{
            {' UNION '.join(union_patterns)}
        }}
        """

        # Retry loop with exponential back-off; the back-off is reset for
        # every batch (fix: previously it kept doubling across batches).
        attempt = 0
        wait_time = 5  # Initial wait time in seconds
        while attempt < max_attempts:
            try:
                sparql: SPARQLWrapper = SPARQLWrapper(ts_url)
                sparql.setQuery(query)
                sparql.setMethod('GET')
                sparql.setReturnFormat(JSON)
                results = sparql.queryAndConvert()['results']['bindings']

                if not results:  # No results at all for this batch
                    entities_str = ', '.join(not_found_entities)
                    raise ValueError(f"The requested entities were not found in the triplestore: {entities_str}")

                # Record which requested subjects actually came back.
                for result in results:
                    if 's' in result and 'value' in result['s']:
                        not_found_entities.discard(result['s']['value'])

                batch_entities = Reader.import_entities_from_graph(
                    g_set=g_set,
                    results=results,
                    resp_agent=resp_agent,
                    enable_validation=enable_validation
                )
                imported_entities.extend(batch_entities)

                # Report any requested URI that produced no triples.
                if not_found_entities:
                    entities_str = ', '.join(not_found_entities)
                    raise ValueError(f"The following entities were not recognized as proper OCDM entities: {entities_str}")

                break  # Batch succeeded; leave the retry loop.
            except ValueError:
                # Validation / not-found errors are permanent: do not retry.
                # (The original had a redundant inner try/except doing the same.)
                raise
            except Exception as e:
                attempt += 1
                if attempt < max_attempts:
                    print(f"[3] Attempt {attempt} failed. Could not import batch due to communication problems: {e}")
                    print(f"Retrying in {wait_time} seconds...")
                    time.sleep(wait_time)
                    wait_time *= 2
                else:
                    print(f"[3] All {max_attempts} attempts failed. Could not import batch due to communication problems: {e}")
                    raise

    if not imported_entities:
        raise ValueError("None of the requested entities were found or recognized as proper OCDM entities.")

    return imported_entities
7 changes: 7 additions & 0 deletions oc_ocdm/test/reader/id.nt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
<https://w3id.org/oc/meta/id/0605> <http://purl.org/spar/datacite/usesIdentifierScheme> <http://purl.org/spar/datacite/doi> .
<https://w3id.org/oc/meta/id/0605> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://purl.org/spar/datacite/Identifier> .
<https://w3id.org/oc/meta/id/0605> <http://purl.org/dc/terms/identifier> "10.1234/cancer.1999.0605" .

<https://w3id.org/oc/meta/id/0636064270> <http://purl.org/spar/datacite/usesIdentifierScheme> <http://purl.org/spar/datacite/doi> .
<https://w3id.org/oc/meta/id/0636064270> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://purl.org/spar/datacite/Identifier> .
<https://w3id.org/oc/meta/id/0636064270> <http://purl.org/dc/terms/identifier> "10.1234/neurology.2012.0636066666" .
15 changes: 15 additions & 0 deletions oc_ocdm/test/reader/ra.nt
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<https://w3id.org/oc/meta/ra/06360300897> <http://xmlns.com/foaf/0.1/familyName> "Smith" .
<https://w3id.org/oc/meta/ra/06360300897> <http://xmlns.com/foaf/0.1/givenName> "John" .
<https://w3id.org/oc/meta/ra/06360300897> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://xmlns.com/foaf/0.1/Agent> .

<https://w3id.org/oc/meta/ra/06360300898> <http://xmlns.com/foaf/0.1/familyName> "Johnson" .
<https://w3id.org/oc/meta/ra/06360300898> <http://xmlns.com/foaf/0.1/givenName> "Mary" .
<https://w3id.org/oc/meta/ra/06360300898> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://xmlns.com/foaf/0.1/Agent> .

<https://w3id.org/oc/meta/ra/06360300899> <http://xmlns.com/foaf/0.1/familyName> "Brown" .
<https://w3id.org/oc/meta/ra/06360300899> <http://xmlns.com/foaf/0.1/givenName> "David" .
<https://w3id.org/oc/meta/ra/06360300899> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://xmlns.com/foaf/0.1/Agent> .

<https://w3id.org/oc/meta/ra/06360300900> <http://xmlns.com/foaf/0.1/familyName> "Wilson" .
<https://w3id.org/oc/meta/ra/06360300900> <http://xmlns.com/foaf/0.1/givenName> "Sarah" .
<https://w3id.org/oc/meta/ra/06360300900> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://xmlns.com/foaf/0.1/Agent> .
163 changes: 148 additions & 15 deletions oc_ocdm/test/reader/test_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import os
import unittest
from unittest.mock import patch

from oc_ocdm.graph import GraphSet
from oc_ocdm.reader import Reader
Expand All @@ -27,23 +28,155 @@ class TestReader(unittest.TestCase):
@classmethod
def setUpClass(cls):
    """Load every N-Triples fixture into the test triplestore, once per class.

    Reconstructed post-change version: the diff view interleaved the old
    single-file (br.nt) loading lines with the new multi-file loop.
    """
    cls.endpoint = 'http://127.0.0.1:8804/sparql'
    cls.resp_agent = 'https://orcid.org/0000-0002-8420-0696'
    BASE = os.path.join('oc_ocdm', 'test', 'reader')

    cls.br_file = os.path.abspath(os.path.join(BASE, 'br.nt'))
    cls.ra_file = os.path.abspath(os.path.join(BASE, 'ra.nt'))
    cls.id_file = os.path.abspath(os.path.join(BASE, 'id.nt'))

    for file_path in [cls.br_file, cls.ra_file, cls.id_file]:
        if not os.path.exists(file_path):
            continue  # Missing fixtures are skipped silently, as before.
        g = Graph()
        g.parse(file_path, format='nt')

        # Build one INSERT DATA statement per fixture file; str.join avoids
        # the quadratic string concatenation of the original += loop while
        # producing an identical query string.
        triples = "".join(f"{s.n3()} {p.n3()} {o.n3()} .\n" for s, p, o in g)
        insert_query = (
            "INSERT DATA { GRAPH <https://w3id.org/oc/meta/> {\n"
            + triples
            + "} }"
        )

        server = SPARQLWrapper(cls.endpoint)
        server.setMethod(POST)
        server.setQuery(insert_query)
        server.query()

def setUp(self):
    """Give each test a fresh Reader and an empty GraphSet."""
    self.g_set = GraphSet('https://w3id.org/oc/meta')
    self.reader = Reader()

def test_import_entity_from_triplestore(self):
    """A single entity fetched from the triplestore ends up in the GraphSet."""
    target = URIRef('https://w3id.org/oc/meta/br/0605')
    self.reader.import_entity_from_triplestore(
        self.g_set,
        self.endpoint,
        target,
        self.resp_agent,
        False
    )
    imported_uris = {str(res) for res in self.g_set.res_to_entity.keys()}
    self.assertEqual(imported_uris, {'https://w3id.org/oc/meta/br/0605'})

def test_import_entities_from_triplestore_batch(self):
    """Multiple entities are imported correctly even with the smallest batch size."""
    targets = [
        URIRef('https://w3id.org/oc/meta/br/0605'),
        URIRef('https://w3id.org/oc/meta/br/0636066666')
    ]

    imported = self.reader.import_entities_from_triplestore(
        self.g_set,
        self.endpoint,
        targets,
        self.resp_agent,
        batch_size=1  # Exercise the batching logic: one query per entity.
    )

    # Every requested URI must now be present in the GraphSet.
    self.assertEqual(
        {str(res) for res in self.g_set.res_to_entity.keys()},
        {str(uri) for uri in targets}
    )

    # One GraphEntity per requested URI.
    self.assertEqual(len(imported), len(targets))

    # Spot-check a property of one imported entity.
    br_0605 = self.g_set.get_entity(URIRef('https://w3id.org/oc/meta/br/0605'))
    self.assertIsNotNone(br_0605)
    title = next(br_0605.g.objects(br_0605.res, URIRef('http://purl.org/dc/terms/title')))
    self.assertEqual(str(title), "A Review Of Hemolytic Uremic Syndrome In Patients Treated With Gemcitabine Therapy")

def test_import_invalid_entity(self):
    """Requesting a URI absent from the triplestore raises ValueError."""
    missing = URIRef('https://w3id.org/oc/meta/br/99999')
    with self.assertRaises(ValueError):
        self.reader.import_entity_from_triplestore(
            self.g_set,
            self.endpoint,
            missing,
            self.resp_agent,
            False
        )

def test_batch_import_with_retry(self):
    """Batch import retries after a connection failure, then reports empty results."""
    targets = [
        URIRef('https://w3id.org/oc/meta/br/0605'),
        URIRef('https://w3id.org/oc/meta/br/0636066666')
    ]

    with patch('SPARQLWrapper.SPARQLWrapper.queryAndConvert') as mock_query:
        # First call raises; second call returns an empty result set.
        mock_query.side_effect = [
            Exception("Connection failed"),
            {'results': {'bindings': []}}
        ]

        # The empty second response must surface as a ValueError.
        with self.assertRaises(ValueError):
            self.reader.import_entities_from_triplestore(
                self.g_set,
                self.endpoint,
                targets,
                self.resp_agent
            )

        # Exactly two calls prove that one retry happened.
        self.assertEqual(mock_query.call_count, 2)

def test_import_mixed_entity_types(self):
    """Entities of several OCDM types can be imported together in one batch."""
    targets = [
        URIRef('https://w3id.org/oc/meta/br/0605'),   # Bibliographic Resource
        URIRef('https://w3id.org/oc/meta/id/0605'),   # Identifier
        URIRef('https://w3id.org/oc/meta/re/0605')    # Resource Embodiment
    ]

    imported = self.reader.import_entities_from_triplestore(
        self.g_set,
        self.endpoint,
        targets,
        self.resp_agent
    )

    # Collect every rdf:type asserted on the imported entities.
    rdf_type = URIRef('http://www.w3.org/1999/02/22-rdf-syntax-ns#type')
    found_types = set()
    for entity in imported:
        for _, _, obj in entity.g.triples((entity.res, rdf_type, None)):
            found_types.add(str(obj))

    # More than one distinct type confirms the mixed batch worked.
    self.assertTrue(len(found_types) >= 2)

def test_empty_batch_import(self):
    """An empty entity list is rejected with ValueError."""
    with self.assertRaises(ValueError):
        self.reader.import_entities_from_triplestore(
            self.g_set,
            self.endpoint,
            [],
            self.resp_agent
        )

# NOTE(review): reconstructed post-change version. The diff view left the old
# setUpClass insert_query residue and the old `server.setQuery(insert_query)`
# line interleaved with the new delete logic; only the CLEAR-graph version
# belongs in tearDownClass.
@classmethod
def tearDownClass(cls):
    """Clean up the triplestore after tests."""
    delete_query = "CLEAR GRAPH <https://w3id.org/oc/meta/>"
    server = SPARQLWrapper(cls.endpoint)
    server.setMethod(POST)
    server.setQuery(delete_query)
    server.query()

# NOTE(review): this appears to be the pre-change version of
# test_import_entity_from_triplestore retained by the diff view; the updated
# version (using the self.reader / self.g_set fixtures) appears earlier.
def test_import_entity_from_triplestore(self):
    """Import one known entity and check it is the only one in the GraphSet."""
    reader = Reader()
    g_set = GraphSet('https://w3id.org/oc/meta')
    reader.import_entity_from_triplestore(g_set, self.endpoint, URIRef('https://w3id.org/oc/meta/br/0605'), 'https://orcid.org/0000-0002-8420-0696', False)
    self.assertEqual(set(str(s) for s in g_set.res_to_entity.keys()), {'https://w3id.org/oc/meta/br/0605'})


if __name__ == '__main__':
unittest.main()
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "oc_ocdm"
version = "9.1.2"
version = "9.2.0"
description = "Object mapping library for manipulating RDF graphs that are compliant with the OpenCitations datamodel."
authors = [
"Silvio Peroni <[email protected]>",
Expand Down

0 comments on commit aa327b7

Please sign in to comment.