MetaEditor: relationship_cache
arcangelo7 committed Nov 28, 2024
1 parent 994cb9e commit f814792
Showing 3 changed files with 189 additions and 46 deletions.
88 changes: 56 additions & 32 deletions oc_meta/plugins/editor.py
@@ -82,6 +82,7 @@ def __init__(self, meta_config: str, resp_agent: str, save_queries: bool = False
self.counter_handler = RedisCounterHandler(host=self.redis_host, port=self.redis_port, db=self.redis_db)

self.entity_cache = EntityCache()
self.relationship_cache = {}

def make_sparql_query_with_retry(self, sparql: SPARQLWrapper, query, max_retries=5, backoff_factor=0.3):
sparql.setQuery(query)
@@ -164,7 +165,7 @@ def delete(self, res: str, property: str = None, object: str = None) -> None:
entity_to_purge = g_set.get_entity(URIRef(res))
entity_to_purge.mark_as_to_be_deleted()
self.save(g_set, supplier_prefix)

def merge(self, g_set: GraphSet, res: URIRef, other: URIRef) -> None:
"""
Merge two entities and their related entities using batch import with caching.
@@ -175,42 +176,65 @@ def merge(self, g_set: GraphSet, res: URIRef, other: URIRef) -> None:
other: The entity to be merged into the main one
"""
# First get all related entities with a single SPARQL query
sparql = SPARQLWrapper(endpoint=self.endpoint)
query = f'''
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX datacite: <http://purl.org/spar/datacite/>
PREFIX pro: <http://purl.org/spar/pro/>
SELECT DISTINCT ?entity WHERE {{
{{?entity ?p <{other}>}} UNION
{{<{res}> ?p ?entity}} UNION
{{<{other}> ?p ?entity}}
FILTER (?p != rdf:type)
FILTER (?p != datacite:usesIdentifierScheme)
FILTER (?p != pro:withRole)
}}'''

data = self.make_sparql_query_with_retry(sparql, query)

# Collect entities that need to be imported (not in cache)
entities_to_import = set()
related_entities = set()

# Check main entities against cache
if not self.entity_cache.is_cached(res):
entities_to_import.add(res)
if not self.entity_cache.is_cached(other):
entities_to_import.add(other)

# Check related entities against cache
for result in data["results"]["bindings"]:
if result['entity']['type'] == 'uri':
related_entity = URIRef(result["entity"]["value"])
if not self.entity_cache.is_cached(related_entity):
entities_to_import.add(related_entity)
if other in self.relationship_cache:
related_entities.update(self.relationship_cache[other])
else:
sparql = SPARQLWrapper(endpoint=self.endpoint)
query = f'''
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX datacite: <http://purl.org/spar/datacite/>
PREFIX pro: <http://purl.org/spar/pro/>
SELECT DISTINCT ?entity WHERE {{
{{?entity ?p <{other}>}} UNION
{{<{other}> ?p ?entity}}
FILTER (?p != rdf:type)
FILTER (?p != datacite:usesIdentifierScheme)
FILTER (?p != pro:withRole)
}}'''

data = self.make_sparql_query_with_retry(sparql, query)
other_related = {URIRef(result["entity"]["value"])
for result in data["results"]["bindings"]
if result['entity']['type'] == 'uri'}

self.relationship_cache[other] = other_related
related_entities.update(other_related)

if res in self.relationship_cache:
related_entities.update(self.relationship_cache[res])
else:
# Query only for objects of the surviving entity if not in cache
sparql = SPARQLWrapper(endpoint=self.endpoint)
query = f'''
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX datacite: <http://purl.org/spar/datacite/>
PREFIX pro: <http://purl.org/spar/pro/>
SELECT DISTINCT ?entity WHERE {{
<{res}> ?p ?entity
FILTER (?p != rdf:type)
FILTER (?p != datacite:usesIdentifierScheme)
FILTER (?p != pro:withRole)
}}'''

data = self.make_sparql_query_with_retry(sparql, query)
res_related = {URIRef(result["entity"]["value"])
for result in data["results"]["bindings"]
if result['entity']['type'] == 'uri'}

self.relationship_cache[res] = res_related
related_entities.update(res_related)

entities_to_import = set([res, other])
entities_to_import.update(related_entities)
entities_to_import = {e for e in entities_to_import
if not self.entity_cache.is_cached(e)}

# Import only non-cached entities if there are any
if entities_to_import:
try:
imported_entities = self.reader.import_entities_from_triplestore(
self.reader.import_entities_from_triplestore(
g_set=g_set,
ts_url=self.endpoint,
entities=list(entities_to_import),
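Taken together, the editor.py changes mean that successive merges touching the same entity no longer re-query the triplestore for its related entities. A minimal caller-side sketch, not part of the commit: the config path and entity URIs are placeholders, the MetaEditor import path follows the test file below, and a reachable SPARQL endpoint configured in that file is assumed.

from rdflib import URIRef
from oc_ocdm.graph import GraphSet
from oc_meta.run.meta_editor import MetaEditor

editor = MetaEditor("meta_config.yaml",  # placeholder config path
                    "https://orcid.org/0000-0002-8420-0696",
                    save_queries=False)
g_set = GraphSet("https://w3id.org/oc/meta/", supplier_prefix="060")

surviving = URIRef("https://w3id.org/oc/meta/ra/06012")    # placeholder URIs
duplicate_a = URIRef("https://w3id.org/oc/meta/ra/06011")
duplicate_b = URIRef("https://w3id.org/oc/meta/ra/06013")

# First merge: one SPARQL query per entity; the related URIs found are stored
# in editor.relationship_cache.
editor.merge(g_set, surviving, duplicate_a)

# Second merge with the same surviving entity: its related entities come from
# editor.relationship_cache, so only duplicate_b still needs to be queried.
editor.merge(g_set, surviving, duplicate_b)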
39 changes: 26 additions & 13 deletions oc_meta/run/merge/duplicated_entities_simultaneously.py
@@ -57,9 +57,9 @@ def count_csv_rows(csv_file: str) -> int:
return sum(1 for _ in f) - 1 # Subtract 1 to exclude the header row

def fetch_related_entities_batch(self, meta_editor: MetaEditor, merged_entities: List[str],
surviving_entities: List[str], batch_size: int = 10) -> Set[URIRef]:
"""
Fetch all related entities in batches to avoid overwhelming the SPARQL endpoint.
Fetch all related entities in batches and populate the relationship cache.
Args:
meta_editor: MetaEditor instance
@@ -75,8 +75,6 @@ def fetch_related_entities_batch(self, meta_editor: MetaEditor, merged_entities:
# Process merged entities in batches
for i in range(0, len(merged_entities), batch_size):
batch_merged = merged_entities[i:i + batch_size]

# Create UNION clauses for current batch
merged_clauses = []
for entity in batch_merged:
merged_clauses.extend([
@@ -106,15 +104,22 @@ def fetch_related_entities_batch(self, meta_editor: MetaEditor, merged_entities:
results = meta_editor.make_sparql_query_with_retry(sparql, query)
for result in results["results"]["bindings"]:
if result['entity']['type'] == 'uri':
all_related_entities.add(URIRef(result['entity']['value']))
related_uri = URIRef(result['entity']['value'])
all_related_entities.add(related_uri)

# Update relationship cache for each merged entity in the batch
for entity in batch_merged:
entity_uri = URIRef(entity)
if entity_uri not in meta_editor.relationship_cache:
meta_editor.relationship_cache[entity_uri] = set()
meta_editor.relationship_cache[entity_uri].add(related_uri)

except Exception as e:
print(f"Error fetching related entities for merged batch {i}-{i+batch_size}: {e}")

# Process surviving entities in batches
for i in range(0, len(surviving_entities), batch_size):
batch_surviving = surviving_entities[i:i + batch_size]

# Create UNION clauses for current batch
surviving_clauses = []
for entity in batch_surviving:
surviving_clauses.append(
@@ -123,7 +128,7 @@ def fetch_related_entities_batch(self, meta_editor: MetaEditor, merged_entities:

if not surviving_clauses:
continue

query = f"""
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX datacite: <http://purl.org/spar/datacite/>
@@ -143,12 +148,20 @@ def fetch_related_entities_batch(self, meta_editor: MetaEditor, merged_entities:
results = meta_editor.make_sparql_query_with_retry(sparql, query)
for result in results["results"]["bindings"]:
if result['entity']['type'] == 'uri':
all_related_entities.add(URIRef(result['entity']['value']))
related_uri = URIRef(result['entity']['value'])
all_related_entities.add(related_uri)

# Update relationship cache for each surviving entity in the batch
for entity in batch_surviving:
entity_uri = URIRef(entity)
if entity_uri not in meta_editor.relationship_cache:
meta_editor.relationship_cache[entity_uri] = set()
meta_editor.relationship_cache[entity_uri].add(related_uri)

except Exception as e:
print(f"Error fetching related entities for surviving batch {i}-{i+batch_size}: {e}")

return all_related_entities

def process_file(self, csv_file: str) -> str:
"""Process a single CSV file with cross-row batch processing"""
data = self.read_csv(csv_file)
@@ -179,7 +192,7 @@ def process_file(self, csv_file: str) -> str:
if not rows_to_process:
return csv_file

# Fetch all related entities in batches
# Fetch all related entities and populate relationship cache
all_related_entities = self.fetch_related_entities_batch(
meta_editor,
batch_merged_entities,
@@ -205,7 +218,7 @@ def process_file(self, csv_file: str) -> str:
entities=list(entities_to_import),
resp_agent=meta_editor.resp_agent,
enable_validation=False,
batch_size=self.batch_size  # use the same batch_size
batch_size=self.batch_size
)

# Update cache with newly imported entities
@@ -216,7 +229,7 @@ def process_file(self, csv_file: str) -> str:
print(f"Error importing entities: {e}")
modified = True

# Perform all merges now that entities are imported
# Perform all merges using cached relationship data
for surviving_entity, merged_entities in rows_to_process:
surviving_uri = URIRef(surviving_entity)
for merged_entity in merged_entities:
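The dictionary that fetch_related_entities_batch fills is the same relationship_cache that MetaEditor.merge consults above, so merges performed right after the batch pre-fetch skip their per-entity SPARQL queries. A self-contained toy sketch of that contract (not project code; record_batch and related_to are invented names for illustration):

from rdflib import URIRef

# entity URI -> set of related entity URIs, as in MetaEditor.relationship_cache
relationship_cache = {}

def record_batch(batch_entities, related_uris):
    # Mirror the bookkeeping above: every entity of the current batch is
    # mapped to the related URIs returned for that batch.
    for entity in batch_entities:
        relationship_cache.setdefault(URIRef(entity), set()).update(related_uris)

def related_to(entity):
    # Cache hit: no query needed; on a miss, merge() would fall back to SPARQL.
    return relationship_cache.get(entity, set())

record_batch(["https://w3id.org/oc/meta/ra/06011"],
             {URIRef("https://w3id.org/oc/meta/id/06011"),
              URIRef("https://w3id.org/oc/meta/ar/06011")})
assert related_to(URIRef("https://w3id.org/oc/meta/ra/06011"))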
108 changes: 107 additions & 1 deletion test/entity_merger_test.py
@@ -7,11 +7,12 @@

import redis
from oc_meta.run.merge.duplicated_entities_simultaneously import EntityMerger
from oc_meta.run.meta_editor import MetaEditor
from oc_ocdm.counter_handler.redis_counter_handler import RedisCounterHandler
from oc_ocdm.graph import GraphSet
from oc_ocdm.prov.prov_set import ProvSet
from oc_ocdm.storer import Storer
from rdflib import XSD, Graph, Literal, URIRef
from rdflib import URIRef
from SPARQLWrapper import POST, SPARQLWrapper

BASE = os.path.join('test', 'merger')
@@ -1045,6 +1046,111 @@ def test_merge_bibliographic_resources(self):

self.assertTrue(re_found, "Resource embodiment should still exist after merge")

def test_fetch_related_entities_batch(self):
"""Test batch fetching of related entities"""
meta_editor = MetaEditor(META_CONFIG,
"https://orcid.org/0000-0002-8420-0696",
save_queries=False)

g_set = GraphSet("https://w3id.org/oc/meta/", supplier_prefix="060",
custom_counter_handler=self.counter_handler)

# Use a smaller set of valid numbers for the test
valid_numbers = [11, 12, 13, 14, 15]
entities = {}

# Create the authors and store them in a dictionary for easy access
for i in valid_numbers:
ra = g_set.add_ra(
resp_agent="https://orcid.org/0000-0002-8420-0696",
res=URIRef(f"https://w3id.org/oc/meta/ra/060{i}")
)
ra.has_name(f"Author {i}")
entities[i] = ra

# Create the related entities for each author
for i in valid_numbers:
# Create the identifier
identifier = g_set.add_id(
resp_agent="https://orcid.org/0000-0002-8420-0696",
res=URIRef(f"https://w3id.org/oc/meta/id/060{i}")
)
identifier.create_orcid(f"0000-0001-{i:04d}-1111")
entities[i].has_identifier(identifier)

# Create the role
role = g_set.add_ar(
resp_agent="https://orcid.org/0000-0002-8420-0696",
res=URIRef(f"https://w3id.org/oc/meta/ar/060{i}")
)
role.create_author()
role.is_held_by(entities[i])

# Create the publication
pub = g_set.add_br(
resp_agent="https://orcid.org/0000-0002-8420-0696",
res=URIRef(f"https://w3id.org/oc/meta/br/060{i}")
)
pub.has_title(f"Publication {i}")
pub.has_contributor(role)

prov = ProvSet(g_set, "https://w3id.org/oc/meta/", wanted_label=False,
custom_counter_handler=self.counter_handler)
prov.generate_provenance()

rdf_output = os.path.join(OUTPUT, 'rdf') + os.sep

res_storer = Storer(abstract_set=g_set, dir_split=10000, n_file_item=1000,
output_format='json-ld', zip_output=False)
prov_storer = Storer(abstract_set=prov, dir_split=10000, n_file_item=1000,
output_format='json-ld', zip_output=False)

res_storer.store_all(base_dir=rdf_output, base_iri="https://w3id.org/oc/meta/")
prov_storer.store_all(base_dir=rdf_output, base_iri="https://w3id.org/oc/meta/")
res_storer.upload_all(triplestore_url=SERVER, base_dir=rdf_output,
batch_size=10, save_queries=False)

batch_sizes = [1, 5, 11, 25]
for batch_size in batch_sizes:
with self.subTest(batch_size=batch_size):
# Test with a single entity
merged_entities = [f"https://w3id.org/oc/meta/ra/060{valid_numbers[0]}"]
surviving_entities = [f"https://w3id.org/oc/meta/ra/060{valid_numbers[1]}"]

related = self.merger.fetch_related_entities_batch(
meta_editor=meta_editor,
merged_entities=merged_entities,
surviving_entities=surviving_entities,
batch_size=batch_size
)

expected_related = {
URIRef(f"https://w3id.org/oc/meta/id/060{valid_numbers[0]}"), # ID della merged
URIRef(f"https://w3id.org/oc/meta/ar/060{valid_numbers[0]}"), # AR della merged
URIRef(f"https://w3id.org/oc/meta/id/060{valid_numbers[1]}") # AR della surviving
}

self.assertEqual(related, expected_related)

# Test with multiple entities
merged_entities = [f"https://w3id.org/oc/meta/ra/060{i}"
for i in valid_numbers[:3]]
surviving_entities = [f"https://w3id.org/oc/meta/ra/060{valid_numbers[3]}"]

related = self.merger.fetch_related_entities_batch(
meta_editor=meta_editor,
merged_entities=merged_entities,
surviving_entities=surviving_entities,
batch_size=batch_size
)

expected_related = set()
for i in valid_numbers[:3]:  # merged entities
expected_related.add(URIRef(f"https://w3id.org/oc/meta/id/060{i}"))
expected_related.add(URIRef(f"https://w3id.org/oc/meta/ar/060{i}"))
expected_related.add(URIRef(f"https://w3id.org/oc/meta/id/060{valid_numbers[3]}"))

self.assertEqual(related, expected_related)

if __name__ == '__main__':
unittest.main()
