From 8c7f50aee57e7ded644432708b0066fc24f61d60 Mon Sep 17 00:00:00 2001
From: arcangelo7
Date: Sat, 19 Oct 2024 18:24:32 +0200
Subject: [PATCH] check info dir and no timeout in gen_info_dir and check provenance in merged brs

---
 oc_meta/run/check/info_dir.py                 | 71 ++++++++++++++++++
 oc_meta/run/find/duplicated_ids_from_files.py | 51 ++++++-------
 oc_meta/run/gen_info_dir.py                   |  6 +-
 oc_meta/run/merge/check_merged_brs_results.py | 75 ++++++++++++++++++-
 .../duplicated_entities_simultaneously.py     | 15 +++-
 5 files changed, 182 insertions(+), 36 deletions(-)
 create mode 100644 oc_meta/run/check/info_dir.py

diff --git a/oc_meta/run/check/info_dir.py b/oc_meta/run/check/info_dir.py
new file mode 100644
index 00000000..5eb4614c
--- /dev/null
+++ b/oc_meta/run/check/info_dir.py
@@ -0,0 +1,71 @@
+import argparse
+import json
+import os
+import zipfile
+from multiprocessing import Pool, cpu_count
+
+from redis import Redis
+from tqdm import tqdm
+
+from oc_meta.run.gen_info_dir import get_prefix, get_resource_number, get_short_name
+
+def process_zip_file(args):
+    zip_file, redis_host, redis_port, redis_db = args
+    redis_client = Redis(host=redis_host, port=redis_port, db=redis_db)
+    missing_entities = []
+
+    with zipfile.ZipFile(zip_file, 'r') as zip_ref:
+        for file_name in zip_ref.namelist():
+            with zip_ref.open(file_name) as entity_file:
+                json_data = json.load(entity_file)
+                for graph in json_data:
+                    for entity in graph['@graph']:
+                        prov_entity_uri = entity['@id']
+                        entity_uri = prov_entity_uri.split('/prov/se/')[0]
+                        supplier_prefix = get_prefix(entity_uri)
+                        short_name = get_short_name(entity_uri)
+                        resource_number = get_resource_number(entity_uri)
+
+                        expected_key = f"{short_name}:{supplier_prefix}:{resource_number}:se"
+
+                        if not redis_client.exists(expected_key):
+                            print("\nMissing entity found:")
+                            print(f"URI: {entity_uri}")
+                            print(f"Prov URI: {prov_entity_uri}")
+                            print(f"Expected Redis key: {expected_key}")
+                            print("---")
+
+                            missing_entities.append({
+                                "URI": entity_uri,
+                                "Prov URI": prov_entity_uri,
+                                "Expected Redis key": expected_key
+                            })
+
+    return missing_entities
+
+def explore_provenance_files(root_path, redis_host, redis_port, redis_db):
+    prov_zip_files = [os.path.join(dp, f) for dp, dn, filenames in os.walk(root_path)
+                      for f in filenames if f.endswith('.zip') and 'prov' in dp]
+
+    args_list = [(zip_file, redis_host, redis_port, redis_db) for zip_file in prov_zip_files]
+
+    num_processes = cpu_count()  # Use all available cores
+    with Pool(processes=num_processes) as pool:
+        results = list(tqdm(pool.imap(process_zip_file, args_list), total=len(args_list), desc="Processing provenance zip files"))
+
+    all_missing_entities = [item for sublist in results for item in sublist]
+
+    print(f"\nTotal missing entities found: {len(all_missing_entities)}")
+
+def main():
+    parser = argparse.ArgumentParser(description="Check that provenance entities are present in Redis.")
+    parser.add_argument("directory", type=str, help="Path of the directory to explore")
+    parser.add_argument("--redis-host", type=str, default="localhost", help="Redis server host")
+    parser.add_argument("--redis-port", type=int, default=6379, help="Redis server port")
+    parser.add_argument("--redis-db", type=int, default=6, help="Redis database number to use")
+    args = parser.parse_args()
+
+    explore_provenance_files(args.directory, args.redis_host, args.redis_port, args.redis_db)
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file
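A minimal usage sketch for the new checker (the dump path below is hypothetical; the Redis host, port and database match the script's argparse defaults):

    from oc_meta.run.check.info_dir import explore_provenance_files

    # Walk every provenance zip under the dump root and report provenance
    # snapshots whose counter key (short_name:prefix:number:se) is missing
    # from the Redis info dir.
    explore_provenance_files('/srv/oc_meta/rdf', 'localhost', 6379, 6)

The same check can be run from the command line through main(), which exposes the directory as a positional argument and the Redis settings as options.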
diff --git a/oc_meta/run/find/duplicated_ids_from_files.py b/oc_meta/run/find/duplicated_ids_from_files.py
index b3c09ab1..b2d7282c 100644
--- a/oc_meta/run/find/duplicated_ids_from_files.py
+++ b/oc_meta/run/find/duplicated_ids_from_files.py
@@ -1,10 +1,12 @@
 import argparse
+import csv
+import logging
 import os
 import zipfile
-import json
-import csv
+from typing import Dict
+
+from rdflib import ConjunctiveGraph, URIRef
 from tqdm import tqdm
-import logging
 
 logging.basicConfig(filename='error_log_find_duplicated_ids_from_files.txt', level=logging.ERROR,
                     format='%(asctime)s - %(levelname)s - %(message)s')
@@ -24,11 +26,10 @@ def read_and_analyze_zip_files(folder_path, csv_path):
             with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                 for zip_file in zip_ref.namelist():
                     try:
-                        with zip_ref.open(zip_file) as json_file:
-                            data = json.load(json_file)
-                            analyze_json(data, entity_info, zip_path, zip_file)
-                    except json.JSONDecodeError:
-                        logging.error(f"Errore nel parsing JSON del file {zip_file} in {zip_path}")
+                        with zip_ref.open(zip_file) as rdf_file:
+                            g = ConjunctiveGraph()
+                            g.parse(data=rdf_file.read(), format="json-ld")
+                            analyze_graph(g, entity_info)
                     except Exception as e:
                         logging.error(f"Errore nell'elaborazione del file {zip_file} in {zip_path}: {str(e)}")
         except zipfile.BadZipFile:
@@ -46,24 +47,20 @@ def get_zip_files(id_folder_path):
                 zip_files.append(os.path.join(root, file))
     return zip_files
 
-def analyze_json(data, entity_info, zip_path, zip_file):
-    for graph in data:
-        for entity in graph.get("@graph", []):
-            try:
-                entity_id = entity["@id"]
-                identifier_scheme = entity["http://purl.org/spar/datacite/usesIdentifierScheme"][0]["@id"]
-                literal_value = entity["http://www.essepuntato.it/2010/06/literalreification/hasLiteralValue"][0]["@value"]
-
-                key = (identifier_scheme, literal_value)
-                if key not in entity_info:
-                    entity_info[key] = []
-                entity_info[key].append(entity_id)
-            except KeyError as e:
-                logging.error(f"Chiave mancante nell'entità {entity.get('@id', 'ID sconosciuto')} "
-                              f"nel file {zip_file} all'interno di {zip_path}: {str(e)}")
-            except Exception as e:
-                logging.error(f"Errore nell'analisi dell'entità {entity.get('@id', 'ID sconosciuto')} "
-                              f"nel file {zip_file} all'interno di {zip_path}: {str(e)}")
+def analyze_graph(g: ConjunctiveGraph, entity_info: Dict[tuple, list]):
+    datacite_uses_identifier_scheme = URIRef("http://purl.org/spar/datacite/usesIdentifierScheme")
+    literal_reification_has_literal_value = URIRef("http://www.essepuntato.it/2010/06/literalreification/hasLiteralValue")
+
+    for s in set(g.subjects()):
+        entity_id = str(s)
+        identifier_scheme = g.value(s, datacite_uses_identifier_scheme)
+        literal_value = g.value(s, literal_reification_has_literal_value)
+
+        if identifier_scheme and literal_value:
+            key = (str(identifier_scheme), str(literal_value))
+            if key not in entity_info:
+                entity_info[key] = []
+            entity_info[key].append(entity_id)
 
 def save_duplicates_to_csv(entity_info, csv_path):
     try:
@@ -80,7 +77,7 @@ def save_duplicates_to_csv(entity_info, csv_path):
         logging.error(f"Errore nel salvataggio del file CSV {csv_path}: {str(e)}")
 
 def main():
-    parser = argparse.ArgumentParser(description="Legge i file JSON all'interno dei file ZIP in una sottocartella 'id'.")
+    parser = argparse.ArgumentParser(description="Reads the RDF files inside the ZIP archives under an 'id' subfolder.")
     parser.add_argument("folder_path", type=str, help="Percorso della cartella contenente la sottocartella 'id'")
     parser.add_argument("csv_path", type=str, help="Percorso del file CSV per salvare i duplicati")
     args = parser.parse_args()
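A small self-contained sketch of how the new analyze_graph groups identifier entities by (identifier scheme, literal value); the JSON-LD snippet and the id/ URIs are invented for illustration:

    from rdflib import ConjunctiveGraph

    from oc_meta.run.find.duplicated_ids_from_files import analyze_graph

    sample = '''[
      {"@id": "https://w3id.org/oc/meta/id/06101",
       "http://purl.org/spar/datacite/usesIdentifierScheme": [{"@id": "http://purl.org/spar/datacite/doi"}],
       "http://www.essepuntato.it/2010/06/literalreification/hasLiteralValue": [{"@value": "10.1000/demo"}]},
      {"@id": "https://w3id.org/oc/meta/id/06102",
       "http://purl.org/spar/datacite/usesIdentifierScheme": [{"@id": "http://purl.org/spar/datacite/doi"}],
       "http://www.essepuntato.it/2010/06/literalreification/hasLiteralValue": [{"@value": "10.1000/demo"}]}
    ]'''

    g = ConjunctiveGraph()
    g.parse(data=sample, format="json-ld")
    entity_info = {}
    analyze_graph(g, entity_info)
    # Both id/ URIs share the same (scheme, literal) pair, so they end up under
    # one key and would be written out as duplicates by save_duplicates_to_csv.
    duplicates = {key: ids for key, ids in entity_info.items() if len(ids) > 1}
    print(duplicates)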
duplicati") args = parser.parse_args() diff --git a/oc_meta/run/gen_info_dir.py b/oc_meta/run/gen_info_dir.py index bff3210f..d52591b2 100644 --- a/oc_meta/run/gen_info_dir.py +++ b/oc_meta/run/gen_info_dir.py @@ -114,10 +114,8 @@ def explore_directories(root_path, redis_host, redis_port, redis_db): zip_files = [os.path.join(dp, f) for dp, dn, filenames in os.walk(root_path) for f in filenames if f.endswith('.zip') and 'prov' in dp] - timeout = 30 - with ProcessPool() as pool: - future_results = {pool.schedule(process_zip_file, args=[zip_file], timeout=timeout): zip_file + future_results = {pool.schedule(process_zip_file, args=[zip_file]): zip_file for zip_file in zip_files} results = [] @@ -127,8 +125,6 @@ def explore_directories(root_path, redis_host, redis_port, redis_db): try: result = future.result() results.append(result) - except TimeoutError: - print(f"Process exceeded timeout of {timeout} seconds for file: {zip_file}") except Exception as e: print(f"Error processing file {zip_file}: {e}") finally: diff --git a/oc_meta/run/merge/check_merged_brs_results.py b/oc_meta/run/merge/check_merged_brs_results.py index 7a8d853f..95501525 100644 --- a/oc_meta/run/merge/check_merged_brs_results.py +++ b/oc_meta/run/merge/check_merged_brs_results.py @@ -2,6 +2,7 @@ import csv import os import random +import re import time import zipfile from functools import partial @@ -9,12 +10,13 @@ import yaml from oc_meta.plugins.editor import MetaEditor -from rdflib import RDF, ConjunctiveGraph, Literal, URIRef +from rdflib import RDF, ConjunctiveGraph, Literal, Namespace, URIRef from SPARQLWrapper import JSON, SPARQLWrapper from tqdm import tqdm DATACITE = "http://purl.org/spar/datacite/" FABIO = "http://purl.org/spar/fabio/" +PROV = Namespace("http://www.w3.org/ns/prov#") def read_csv(csv_file): with open(csv_file, 'r') as f: @@ -31,7 +33,7 @@ def sparql_query_with_retry(sparql, max_retries=3, initial_delay=1, backoff_fact delay = initial_delay * (backoff_factor ** attempt) time.sleep(delay + random.uniform(0, 1)) -def check_entity_file(file_path, entity_uri, is_surviving): +def check_entity_file(file_path: str, entity_uri, is_surviving): with zipfile.ZipFile(file_path, 'r') as zip_ref: for filename in zip_ref.namelist(): with zip_ref.open(filename) as file: @@ -59,6 +61,75 @@ def check_entity_file(file_path, entity_uri, is_surviving): identifiers = list(g.objects(entity, URIRef(DATACITE + "hasIdentifier"))) if not identifiers: tqdm.write(f"Error in file {file_path}: Entity {entity_uri} has no datacite:hasIdentifier") + + # Check provenance + prov_file_path = file_path.replace('.zip', '') + '/prov/se.zip' + check_provenance(prov_file_path, entity_uri, is_surviving) + +def check_provenance(prov_file_path, entity_uri, is_surviving): + def extract_snapshot_number(snapshot_uri): + match = re.search(r'/prov/se/(\d+)$', str(snapshot_uri)) + if match: + return int(match.group(1)) + return 0 # Return 0 if no match found, this will put invalid URIs at the start + + try: + with zipfile.ZipFile(prov_file_path, 'r') as zip_ref: + g = ConjunctiveGraph() + for filename in zip_ref.namelist(): + with zip_ref.open(filename) as file: + g.parse(file, format='json-ld') + + entity = URIRef(entity_uri) + snapshots = list(g.subjects(PROV.specializationOf, entity)) + + if len(snapshots) <= 1: + tqdm.write(f"Error in provenance file {prov_file_path}: Less than two snapshots found for entity {entity_uri}") + return + + # Sort snapshots by their URI number + snapshots.sort(key=extract_snapshot_number) + + for i, snapshot in 
diff --git a/oc_meta/run/merge/check_merged_brs_results.py b/oc_meta/run/merge/check_merged_brs_results.py
index 7a8d853f..95501525 100644
--- a/oc_meta/run/merge/check_merged_brs_results.py
+++ b/oc_meta/run/merge/check_merged_brs_results.py
@@ -2,6 +2,7 @@
 import csv
 import os
 import random
+import re
 import time
 import zipfile
 from functools import partial
@@ -9,12 +10,13 @@
 import yaml
 from oc_meta.plugins.editor import MetaEditor
-from rdflib import RDF, ConjunctiveGraph, Literal, URIRef
+from rdflib import RDF, ConjunctiveGraph, Literal, Namespace, URIRef
 from SPARQLWrapper import JSON, SPARQLWrapper
 from tqdm import tqdm
 
 DATACITE = "http://purl.org/spar/datacite/"
 FABIO = "http://purl.org/spar/fabio/"
+PROV = Namespace("http://www.w3.org/ns/prov#")
 
 
 def read_csv(csv_file):
     with open(csv_file, 'r') as f:
@@ -31,7 +33,7 @@ def sparql_query_with_retry(sparql, max_retries=3, initial_delay=1, backoff_factor=2):
             delay = initial_delay * (backoff_factor ** attempt)
             time.sleep(delay + random.uniform(0, 1))
 
-def check_entity_file(file_path, entity_uri, is_surviving):
+def check_entity_file(file_path: str, entity_uri, is_surviving):
     with zipfile.ZipFile(file_path, 'r') as zip_ref:
         for filename in zip_ref.namelist():
             with zip_ref.open(filename) as file:
@@ -59,6 +61,75 @@ def check_entity_file(file_path, entity_uri, is_surviving):
                 identifiers = list(g.objects(entity, URIRef(DATACITE + "hasIdentifier")))
                 if not identifiers:
                     tqdm.write(f"Error in file {file_path}: Entity {entity_uri} has no datacite:hasIdentifier")
+
+    # Check provenance
+    prov_file_path = file_path.replace('.zip', '') + '/prov/se.zip'
+    check_provenance(prov_file_path, entity_uri, is_surviving)
+
+def check_provenance(prov_file_path, entity_uri, is_surviving):
+    def extract_snapshot_number(snapshot_uri):
+        match = re.search(r'/prov/se/(\d+)$', str(snapshot_uri))
+        if match:
+            return int(match.group(1))
+        return 0  # Return 0 if no match found, this will put invalid URIs at the start
+
+    try:
+        with zipfile.ZipFile(prov_file_path, 'r') as zip_ref:
+            g = ConjunctiveGraph()
+            for filename in zip_ref.namelist():
+                with zip_ref.open(filename) as file:
+                    g.parse(file, format='json-ld')
+
+        entity = URIRef(entity_uri)
+        snapshots = list(g.subjects(PROV.specializationOf, entity))
+
+        if len(snapshots) <= 1:
+            tqdm.write(f"Error in provenance file {prov_file_path}: Less than two snapshots found for entity {entity_uri}")
+            return
+
+        # Sort snapshots by their URI number
+        snapshots.sort(key=extract_snapshot_number)
+
+        for i, snapshot in enumerate(snapshots):
+            snapshot_number = extract_snapshot_number(snapshot)
+            if snapshot_number != i + 1:
+                tqdm.write(f"Error in provenance file {prov_file_path}: Snapshot {snapshot} has unexpected number {snapshot_number}, expected {i + 1}")
+
+            gen_time = g.value(snapshot, PROV.generatedAtTime)
+            if gen_time is None:
+                tqdm.write(f"Error in provenance file {prov_file_path}: Snapshot {snapshot} has no generation time")
+
+            if i < len(snapshots) - 1 or not is_surviving:
+                invalidation_time = g.value(snapshot, PROV.invalidatedAtTime)
+                if invalidation_time is None:
+                    tqdm.write(f"Error in provenance file {prov_file_path}: Non-last snapshot {snapshot} has no invalidation time")
+            elif is_surviving and g.value(snapshot, PROV.invalidatedAtTime) is not None:
+                tqdm.write(f"Error in provenance file {prov_file_path}: Last snapshot of surviving entity {snapshot} should not have invalidation time")
+
+            # Check prov:wasDerivedFrom
+            derived_from = list(g.objects(snapshot, PROV.wasDerivedFrom))
+            if i == 0:  # First snapshot
+                if derived_from:
+                    tqdm.write(f"Error in provenance file {prov_file_path}: First snapshot {snapshot} should not have prov:wasDerivedFrom relation")
+            elif i == len(snapshots) - 1 and is_surviving:  # Last snapshot of surviving entity (merge snapshot)
+                if len(derived_from) < 2:
+                    tqdm.write(f"Error in provenance file {prov_file_path}: Merge snapshot {snapshot} should be derived from more than one snapshot")
+            else:  # All other snapshots
+                if len(derived_from) != 1:
+                    tqdm.write(f"Error in provenance file {prov_file_path}: Snapshot {snapshot} should have exactly one prov:wasDerivedFrom relation, but has {len(derived_from)}")
+                elif derived_from[0] != snapshots[i-1]:
+                    tqdm.write(f"Error in provenance file {prov_file_path}: Snapshot {snapshot} is not derived from the previous snapshot")
+
+        if not is_surviving:
+            # Check if the last snapshot is invalidated for merged entities
+            last_snapshot = snapshots[-1]
+            if (None, PROV.invalidated, last_snapshot) not in g:
+                tqdm.write(f"Error in provenance file {prov_file_path}: Last snapshot {last_snapshot} of merged entity {entity_uri} is not invalidated")
+
+    except FileNotFoundError:
+        tqdm.write(f"Error: Provenance file not found for entity {entity_uri}")
+    except zipfile.BadZipFile:
+        tqdm.write(f"Error: Invalid zip file for provenance of entity {entity_uri}")
 
 def check_entity_sparql(sparql_endpoint, entity_uri, is_surviving):
     sparql = SPARQLWrapper(sparql_endpoint)
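A small illustration of the snapshot checks introduced in check_provenance, using invented snapshot URIs: snapshots are ordered by the trailing /prov/se/N number, snapshot N+1 is expected to be derived from snapshot N, the first snapshot has no prov:wasDerivedFrom, and the last snapshot of a surviving entity is the merge snapshot, derived from at least two snapshots and never invalidated:

    import re

    def extract_snapshot_number(snapshot_uri):
        match = re.search(r'/prov/se/(\d+)$', str(snapshot_uri))
        return int(match.group(1)) if match else 0

    snapshots = [
        'https://w3id.org/oc/meta/br/0601/prov/se/3',
        'https://w3id.org/oc/meta/br/0601/prov/se/1',
        'https://w3id.org/oc/meta/br/0601/prov/se/2',
    ]
    snapshots.sort(key=extract_snapshot_number)
    # After sorting: se/1, se/2, se/3. The checker then verifies the numbering
    # is gapless, that se/2 was derived from se/1, and that se/3 (the merge
    # snapshot of a surviving entity) was derived from two or more snapshots.
    print(snapshots)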
diff --git a/oc_meta/run/merge/duplicated_entities_simultaneously.py b/oc_meta/run/merge/duplicated_entities_simultaneously.py
index a53e0ede..ada6815d 100644
--- a/oc_meta/run/merge/duplicated_entities_simultaneously.py
+++ b/oc_meta/run/merge/duplicated_entities_simultaneously.py
@@ -43,6 +43,7 @@ def process_file(csv_file, meta_config, resp_agent, entity_types, stop_file_path):
 
     # Creiamo un unico GraphSet per tutte le operazioni
     g_set = GraphSet(meta_editor.base_iri, custom_counter_handler=meta_editor.counter_handler)
+    modified = False
 
     for row in data:
         if os.path.exists(stop_file_path):
@@ -61,13 +62,19 @@ def process_file(csv_file, meta_config, resp_agent, entity_types, stop_file_path):
                 continue
 
         row['Done'] = 'True'
+        modified = True
 
     # Salviamo le modifiche una sola volta alla fine
-    meta_editor.save(g_set)
+    if modified:
+        meta_editor.save(g_set)
 
-    write_csv(csv_file, data)
+        write_csv(csv_file, data)
     return csv_file
 
+def count_csv_rows(csv_file):
+    with open(csv_file, 'r', encoding='utf-8') as f:
+        return sum(1 for _ in f) - 1  # Subtract 1 to exclude the header row
+
 def main():
     parser = argparse.ArgumentParser(description="Merge entities from CSV files in a folder.")
     parser.add_argument('csv_folder', type=str, help="Path to the folder containing CSV files")
@@ -83,6 +90,10 @@ def main():
     csv_files = [os.path.join(args.csv_folder, file) for file in os.listdir(args.csv_folder)
                  if file.endswith('.csv')]
 
+    # Filter the CSV files based on the number of rows and the number of workers
+    if args.workers > 4:
+        csv_files = [file for file in csv_files if count_csv_rows(file) <= 10000]
+
     with concurrent.futures.ProcessPoolExecutor(max_workers=args.workers) as executor:
         futures = {executor.submit(process_file, csv_file, args.meta_config, args.resp_agent, args.entity_types, args.stop_file): csv_file
                    for csv_file in csv_files}