diff --git a/oc_meta/run/merge/duplicated_entities_simultaneously.py b/oc_meta/run/merge/duplicated_entities_simultaneously.py
index 42da24e..d0d4e78 100644
--- a/oc_meta/run/merge/duplicated_entities_simultaneously.py
+++ b/oc_meta/run/merge/duplicated_entities_simultaneously.py
@@ -2,11 +2,13 @@
 import concurrent.futures
 import csv
 import os
-from typing import List, Dict
+import traceback
+from typing import Dict, List, Set
 
 from oc_meta.plugins.editor import MetaEditor
 from oc_ocdm.graph import GraphSet
 from rdflib import URIRef
+from SPARQLWrapper import SPARQLWrapper
 from tqdm import tqdm
 
 
@@ -17,6 +19,7 @@ def __init__(self, meta_config: str, resp_agent: str, entity_types: List[str], s
         self.entity_types = entity_types
         self.stop_file_path = stop_file_path
         self.workers = workers
+        self.batch_size = 10
 
     @staticmethod
     def get_entity_type(entity_url: str) -> str:
@@ -53,8 +56,101 @@ def count_csv_rows(csv_file: str) -> int:
         with open(csv_file, 'r', encoding='utf-8') as f:
             return sum(1 for _ in f) - 1  # Subtract 1 to exclude the header row
 
+    def fetch_related_entities_batch(self, meta_editor: MetaEditor, merged_entities: List[str],
+                                     surviving_entities: List[str], batch_size: int = 10) -> Set[URIRef]:
+        """
+        Fetch all related entities in batches to avoid overwhelming the SPARQL endpoint.
+
+        Args:
+            meta_editor: MetaEditor instance
+            merged_entities: List of entities to be merged
+            surviving_entities: List of surviving entities
+            batch_size: Maximum number of entities to process in a single SPARQL query
+
+        Returns:
+            Set of all related entities
+        """
+        all_related_entities = set()
+
+        # Process merged entities in batches
+        for i in range(0, len(merged_entities), batch_size):
+            batch_merged = merged_entities[i:i + batch_size]
+
+            # Create UNION clauses for current batch
+            merged_clauses = []
+            for entity in batch_merged:
+                merged_clauses.extend([
+                    f"{{?entity ?p <{entity}>}}",  # Get subjects
+                    f"{{<{entity}> ?p ?entity}}"   # Get objects
+                ])
+
+            if not merged_clauses:
+                continue
+
+            query = f"""
+            PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
+            PREFIX datacite: <http://purl.org/spar/datacite/>
+            PREFIX pro: <http://purl.org/spar/pro/>
+            SELECT DISTINCT ?entity WHERE {{
+                {{
+                    {' UNION '.join(merged_clauses)}
+                }}
+                FILTER (?p != rdf:type)
+                FILTER (?p != datacite:usesIdentifierScheme)
+                FILTER (?p != pro:withRole)
+            }}
+            """
+
+            sparql = SPARQLWrapper(meta_editor.endpoint)
+            try:
+                results = meta_editor.make_sparql_query_with_retry(sparql, query)
+                for result in results["results"]["bindings"]:
+                    if result['entity']['type'] == 'uri':
+                        all_related_entities.add(URIRef(result['entity']['value']))
+            except Exception as e:
+                print(f"Error fetching related entities for merged batch {i}-{i+batch_size}: {e}")
+
+        # Process surviving entities in batches
+        for i in range(0, len(surviving_entities), batch_size):
+            batch_surviving = surviving_entities[i:i + batch_size]
+
+            # Create UNION clauses for current batch
+            surviving_clauses = []
+            for entity in batch_surviving:
+                surviving_clauses.append(
+                    f"{{<{entity}> ?p ?entity}}"  # Get only objects for surviving entities
+                )
+
+            if not surviving_clauses:
+                continue
+
+            query = f"""
+            PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
+            PREFIX datacite: <http://purl.org/spar/datacite/>
+            PREFIX pro: <http://purl.org/spar/pro/>
+            SELECT DISTINCT ?entity WHERE {{
+                {{
+                    {' UNION '.join(surviving_clauses)}
+                }}
+                FILTER (?p != rdf:type)
+                FILTER (?p != datacite:usesIdentifierScheme)
+                FILTER (?p != pro:withRole)
+            }}
+            """
+
+            sparql = SPARQLWrapper(meta_editor.endpoint)
+            try:
+                results = meta_editor.make_sparql_query_with_retry(sparql, query)
+                for result in results["results"]["bindings"]:
+                    if result['entity']['type'] == 'uri':
+                        all_related_entities.add(URIRef(result['entity']['value']))
+            except Exception as e:
+                print(f"Error fetching related entities for surviving batch {i}-{i+batch_size}: {e}")
+
+        return all_related_entities
+
     def process_file(self, csv_file: str) -> str:
-        """Process a single CSV file with its own MetaEditor instance"""
+        """Process a single CSV file with cross-row batch processing"""
         data = self.read_csv(csv_file)
         meta_editor = MetaEditor(self.meta_config, self.resp_agent, save_queries=True)
         modified = False
@@ -62,25 +158,80 @@ def process_file(self, csv_file: str) -> str:
         # Create a GraphSet for the current file
         g_set = GraphSet(meta_editor.base_iri, custom_counter_handler=meta_editor.counter_handler)
 
+        # Collect all entities that need processing
+        batch_merged_entities = []
+        batch_surviving_entities = []
+        rows_to_process = []
+
         for row in data:
-            if os.path.exists(self.stop_file_path):
-                break
+            if row.get('Done') == 'True' or os.path.exists(self.stop_file_path):
+                continue
 
             entity_type = self.get_entity_type(row['surviving_entity'])
-            if row.get('Done') != 'True' and entity_type in self.entity_types:
-                surviving_entity = URIRef(row['surviving_entity'])
+            if entity_type in self.entity_types:
+                surviving_entity = row['surviving_entity']
                 merged_entities = row['merged_entities'].split('; ')
+
+                batch_surviving_entities.append(surviving_entity)
+                batch_merged_entities.extend(merged_entities)
+                rows_to_process.append((surviving_entity, merged_entities))
+
+        if not rows_to_process:
+            return csv_file
+
+        # Fetch all related entities in batches
+        all_related_entities = self.fetch_related_entities_batch(
+            meta_editor,
+            batch_merged_entities,
+            batch_surviving_entities,
+            self.batch_size
+        )
 
-                for merged_entity in merged_entities:
-                    merged_entity = merged_entity.strip()
-                    try:
-                        meta_editor.merge(g_set, surviving_entity, URIRef(merged_entity))
-                    except ValueError:
-                        continue
+        # Add primary entities to the import set
+        entities_to_import = all_related_entities.copy()
+        entities_to_import.update(URIRef(e) for e in batch_surviving_entities)
+        entities_to_import.update(URIRef(e) for e in batch_merged_entities)
 
-                row['Done'] = 'True'
+        # Remove already cached entities
+        entities_to_import = {e for e in entities_to_import
+                              if not meta_editor.entity_cache.is_cached(e)}
+
+        # Batch import all non-cached entities
+        if entities_to_import:
+            try:
+                meta_editor.reader.import_entities_from_triplestore(
+                    g_set=g_set,
+                    ts_url=meta_editor.endpoint,
+                    entities=list(entities_to_import),
+                    resp_agent=meta_editor.resp_agent,
+                    enable_validation=False,
+                    batch_size=self.batch_size  # Use the same batch size
+                )
+
+                # Update cache with newly imported entities
+                for entity in entities_to_import:
+                    meta_editor.entity_cache.add(entity)
+
+            except ValueError as e:
+                print(f"Error importing entities: {e}")
                 modified = True
+
+        # Perform all merges now that entities are imported
+        for surviving_entity, merged_entities in rows_to_process:
+            surviving_uri = URIRef(surviving_entity)
+            for merged_entity in merged_entities:
+                try:
+                    meta_editor.merge(g_set, surviving_uri, URIRef(merged_entity))
+                    modified = True
+                except ValueError:
+                    continue
+
+        # Update CSV and save changes if needed
         if modified:
+            for row in data:
+                if row.get('Done') != 'True' and self.get_entity_type(row['surviving_entity']) in self.entity_types:
+                    row['Done'] = 'True'
+
             meta_editor.save(g_set)
             self.write_csv(csv_file, data)
@@ -109,10 +260,19 @@ def process_folder(self, csv_folder: str):
 
             for future in tqdm(concurrent.futures.as_completed(futures),
                               total=len(futures), desc="Overall Progress"):
+                csv_file = futures[future]
                 try:
                     future.result()
                 except Exception as e:
-                    print(f"Error processing file: {e}")
+                    error_trace = traceback.format_exc()
+                    print(f"""
+                    Error processing file {csv_file}:
+                    Type: {type(e).__name__}
+                    Details: {str(e)}
+                    Full Traceback:
+                    {error_trace}
+                    Suggestion: This is an unexpected error. Please check the traceback for more details.
+                    """)
 
 
 def main():
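
Reviewer note on the batching pattern: the core of this change is folding several entity IRIs into a single SELECT with UNION clauses, so that one round-trip to the SPARQL endpoint covers a whole batch instead of one query per entity. The snippet below is a minimal, self-contained sketch of that idea, not code from this PR: the endpoint URL and example IRIs are placeholders, and the helper names build_batch_query and fetch_batch are hypothetical.

from typing import List, Set

from SPARQLWrapper import JSON, SPARQLWrapper

ENDPOINT = "http://localhost:8890/sparql"  # placeholder endpoint, not taken from the PR


def build_batch_query(entities: List[str]) -> str:
    """Fold a batch of entity IRIs into one SELECT with UNION clauses."""
    clauses = []
    for entity in entities:
        clauses.append(f"{{?related ?p <{entity}>}}")  # triples that point at the entity
        clauses.append(f"{{<{entity}> ?p ?related}}")  # triples the entity points to
    return f"""
    PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
    SELECT DISTINCT ?related WHERE {{
        {{ {' UNION '.join(clauses)} }}
        FILTER (?p != rdf:type)
    }}
    """


def fetch_batch(entities: List[str]) -> Set[str]:
    """Execute one batched query and collect the related IRIs."""
    sparql = SPARQLWrapper(ENDPOINT)
    sparql.setQuery(build_batch_query(entities))
    sparql.setReturnFormat(JSON)
    results = sparql.queryAndConvert()
    return {
        binding["related"]["value"]
        for binding in results["results"]["bindings"]
        if binding["related"]["type"] == "uri"
    }


if __name__ == "__main__":
    # Hypothetical example IRIs, chunked the same way the PR chunks its batches
    example = [f"https://w3id.org/oc/meta/br/06{i}" for i in range(1, 25)]
    batch_size = 10
    related = set()
    for i in range(0, len(example), batch_size):
        related |= fetch_batch(example[i:i + batch_size])
    print(len(related), "related entities")

Keeping each batch small (the PR defaults to 10) bounds the size of the UNION block, which is what keeps any single query from overwhelming the endpoint.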