Batch import entities in multiple rows before merge
arcangelo7 committed Nov 23, 2024
1 parent cf5dcaf commit 994cb9e
Showing 1 changed file with 174 additions and 14 deletions.
188 changes: 174 additions & 14 deletions oc_meta/run/merge/duplicated_entities_simultaneously.py
@@ -2,11 +2,13 @@
import concurrent.futures
import csv
import os
from typing import List, Dict
import traceback
from typing import Dict, List, Set

from oc_meta.plugins.editor import MetaEditor
from oc_ocdm.graph import GraphSet
from rdflib import URIRef
from SPARQLWrapper import SPARQLWrapper
from tqdm import tqdm


@@ -17,6 +19,7 @@ def __init__(self, meta_config: str, resp_agent: str, entity_types: List[str], s
self.entity_types = entity_types
self.stop_file_path = stop_file_path
self.workers = workers
self.batch_size = 10

@staticmethod
def get_entity_type(entity_url: str) -> str:
@@ -53,34 +56,182 @@ def count_csv_rows(csv_file: str) -> int:
with open(csv_file, 'r', encoding='utf-8') as f:
return sum(1 for _ in f) - 1 # Subtract 1 to exclude the header row

def fetch_related_entities_batch(self, meta_editor: MetaEditor, merged_entities: List[str],
surviving_entities: List[str], batch_size: int = 10) -> Set[URIRef]:
"""
Fetch all related entities in batches to avoid overwhelming the SPARQL endpoint.
Args:
meta_editor: MetaEditor instance
merged_entities: List of entities to be merged
surviving_entities: List of surviving entities
batch_size: Maximum number of entities to process in a single SPARQL query
Returns:
Set of all related entities
"""
all_related_entities = set()

# Process merged entities in batches
for i in range(0, len(merged_entities), batch_size):
batch_merged = merged_entities[i:i + batch_size]

# Create UNION clauses for current batch
merged_clauses = []
for entity in batch_merged:
merged_clauses.extend([
f"{{?entity ?p <{entity}>}}", # Get subjects
f"{{<{entity}> ?p ?entity}}" # Get objects
])

if not merged_clauses:
continue

query = f"""
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX datacite: <http://purl.org/spar/datacite/>
PREFIX pro: <http://purl.org/spar/pro/>
SELECT DISTINCT ?entity WHERE {{
{{
{' UNION '.join(merged_clauses)}
}}
FILTER (?p != rdf:type)
FILTER (?p != datacite:usesIdentifierScheme)
FILTER (?p != pro:withRole)
}}
"""

sparql = SPARQLWrapper(meta_editor.endpoint)
try:
results = meta_editor.make_sparql_query_with_retry(sparql, query)
for result in results["results"]["bindings"]:
if result['entity']['type'] == 'uri':
all_related_entities.add(URIRef(result['entity']['value']))
except Exception as e:
print(f"Error fetching related entities for merged batch {i}-{i+batch_size}: {e}")

# Process surviving entities in batches
for i in range(0, len(surviving_entities), batch_size):
batch_surviving = surviving_entities[i:i + batch_size]

# Create UNION clauses for current batch
surviving_clauses = []
for entity in batch_surviving:
surviving_clauses.append(
f"{{<{entity}> ?p ?entity}}" # Get only objects for surviving entities
)

if not surviving_clauses:
continue

query = f"""
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX datacite: <http://purl.org/spar/datacite/>
PREFIX pro: <http://purl.org/spar/pro/>
SELECT DISTINCT ?entity WHERE {{
{{
{' UNION '.join(surviving_clauses)}
}}
FILTER (?p != rdf:type)
FILTER (?p != datacite:usesIdentifierScheme)
FILTER (?p != pro:withRole)
}}
"""

sparql = SPARQLWrapper(meta_editor.endpoint)
try:
results = meta_editor.make_sparql_query_with_retry(sparql, query)
for result in results["results"]["bindings"]:
if result['entity']['type'] == 'uri':
all_related_entities.add(URIRef(result['entity']['value']))
except Exception as e:
print(f"Error fetching related entities for surviving batch {i}-{i+batch_size}: {e}")

return all_related_entities
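
For readers who want the query-construction pattern in isolation: the new fetch_related_entities_batch method chunks the entity IRIs, turns each chunk into a set of UNION branches, and issues one SELECT per chunk so no single query grows unbounded. The following is a minimal, self-contained sketch of that idea, outside this commit; the helper name and the example.org IRIs are placeholders.

from typing import Iterable, List

def batched_related_query(entities: List[str], batch_size: int = 10) -> Iterable[str]:
    """Yield one SPARQL SELECT per batch of entity IRIs.

    Each entity contributes two UNION branches, mirroring the merged-entity
    case above: one for triples pointing at the entity (its subjects) and
    one for triples it points to (its objects).
    """
    for i in range(0, len(entities), batch_size):
        batch = entities[i:i + batch_size]
        clauses = []
        for entity in batch:
            clauses.append(f"{{?entity ?p <{entity}>}}")  # incoming links
            clauses.append(f"{{<{entity}> ?p ?entity}}")  # outgoing links
        yield f"""
        PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
        PREFIX datacite: <http://purl.org/spar/datacite/>
        PREFIX pro: <http://purl.org/spar/pro/>
        SELECT DISTINCT ?entity WHERE {{
            {{ {' UNION '.join(clauses)} }}
            FILTER (?p != rdf:type)
            FILTER (?p != datacite:usesIdentifierScheme)
            FILTER (?p != pro:withRole)
        }}
        """

# Hypothetical IRIs, only to show that 25 entities yield 3 queries of at most 10 each.
iris = [f"https://example.org/entity/{n}" for n in range(25)]
print(len(list(batched_related_query(iris))))  # -> 3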

def process_file(self, csv_file: str) -> str:
"""Process a single CSV file with its own MetaEditor instance"""
"""Process a single CSV file with cross-row batch processing"""
data = self.read_csv(csv_file)
meta_editor = MetaEditor(self.meta_config, self.resp_agent, save_queries=True)
modified = False

# Create a GraphSet for the current file
g_set = GraphSet(meta_editor.base_iri, custom_counter_handler=meta_editor.counter_handler)

# Collect all entities that need processing
batch_merged_entities = []
batch_surviving_entities = []
rows_to_process = []

for row in data:
if os.path.exists(self.stop_file_path):
break
if row.get('Done') == 'True' or os.path.exists(self.stop_file_path):
continue

entity_type = self.get_entity_type(row['surviving_entity'])
if row.get('Done') != 'True' and entity_type in self.entity_types:
surviving_entity = URIRef(row['surviving_entity'])
if entity_type in self.entity_types:
surviving_entity = row['surviving_entity']
merged_entities = row['merged_entities'].split('; ')

batch_surviving_entities.append(surviving_entity)
batch_merged_entities.extend(merged_entities)
rows_to_process.append((surviving_entity, merged_entities))

if not rows_to_process:
return csv_file

# Fetch all related entities in batches
all_related_entities = self.fetch_related_entities_batch(
meta_editor,
batch_merged_entities,
batch_surviving_entities,
self.batch_size
)

for merged_entity in merged_entities:
merged_entity = merged_entity.strip()
try:
meta_editor.merge(g_set, surviving_entity, URIRef(merged_entity))
except ValueError:
continue
# Add primary entities to the import set
entities_to_import = all_related_entities.copy()
entities_to_import.update(URIRef(e) for e in batch_surviving_entities)
entities_to_import.update(URIRef(e) for e in batch_merged_entities)

row['Done'] = 'True'
# Remove already cached entities
entities_to_import = {e for e in entities_to_import
if not meta_editor.entity_cache.is_cached(e)}

# Batch import all non-cached entities
if entities_to_import:
try:
meta_editor.reader.import_entities_from_triplestore(
g_set=g_set,
ts_url=meta_editor.endpoint,
entities=list(entities_to_import),
resp_agent=meta_editor.resp_agent,
enable_validation=False,
batch_size=self.batch_size  # use the same batch size as for fetching
)

# Update cache with newly imported entities
for entity in entities_to_import:
meta_editor.entity_cache.add(entity)

except ValueError as e:
print(f"Error importing entities: {e}")
modified = True

# Perform all merges now that entities are imported
for surviving_entity, merged_entities in rows_to_process:
surviving_uri = URIRef(surviving_entity)
for merged_entity in merged_entities:
try:
meta_editor.merge(g_set, surviving_uri, URIRef(merged_entity))
modified = True
except ValueError:
continue

# Update CSV and save changes if needed
if modified:
for row in data:
if row.get('Done') != 'True' and self.get_entity_type(row['surviving_entity']) in self.entity_types:
row['Done'] = 'True'

meta_editor.save(g_set)
self.write_csv(csv_file, data)
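
As context, process_file expects each CSV row to carry a surviving_entity IRI, a merged_entities field with IRIs joined by '; ', and a Done flag that is set to 'True' once the row has been handled. Below is a minimal sketch of producing such a file with the standard library; the file name and IRIs are made up for illustration.

import csv

# Column layout inferred from the reads above (row['surviving_entity'],
# row['merged_entities'].split('; '), row.get('Done')); the IRIs are placeholders.
rows = [
    {
        "surviving_entity": "https://example.org/br/1",
        "merged_entities": "https://example.org/br/2; https://example.org/br/3",
        "Done": "False",
    },
]

with open("sample_merge_input.csv", "w", newline="", encoding="utf-8") as f:
    writer = csv.DictWriter(f, fieldnames=["surviving_entity", "merged_entities", "Done"])
    writer.writeheader()
    writer.writerows(rows)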

@@ -109,10 +260,19 @@ def process_folder(self, csv_folder: str):
for future in tqdm(concurrent.futures.as_completed(futures),
total=len(futures),
desc="Overall Progress"):
csv_file = futures[future]
try:
future.result()
except Exception as e:
print(f"Error processing file: {e}")
error_trace = traceback.format_exc()
print(f"""
Error processing file {csv_file}:
Type: {type(e).__name__}
Details: {str(e)}
Full Traceback:
{error_trace}
Suggestion: This is an unexpected error. Please check the traceback for more details.
""")


def main():
