Commit
check info dir and no timeout in gen_info_dir and check provenance in merged brs
arcangelo7 committed Oct 19, 2024
1 parent 8c21419 commit 8c7f50a
Showing 5 changed files with 182 additions and 36 deletions.
71 changes: 71 additions & 0 deletions oc_meta/run/check/info_dir.py
@@ -0,0 +1,71 @@
import argparse
import json
import os
import zipfile
from multiprocessing import Pool, cpu_count

from redis import Redis
from tqdm import tqdm

from oc_meta.run.gen_info_dir import get_prefix, get_resource_number, get_short_name

def process_zip_file(args):
    zip_file, redis_host, redis_port, redis_db = args
    redis_client = Redis(host=redis_host, port=redis_port, db=redis_db)
    missing_entities = []

    with zipfile.ZipFile(zip_file, 'r') as zip_ref:
        for file_name in zip_ref.namelist():
            with zip_ref.open(file_name) as entity_file:
                json_data = json.load(entity_file)
                for graph in json_data:
                    for entity in graph['@graph']:
                        prov_entity_uri = entity['@id']
                        entity_uri = prov_entity_uri.split('/prov/se/')[0]
                        supplier_prefix = get_prefix(entity_uri)
                        short_name = get_short_name(entity_uri)
                        resource_number = get_resource_number(entity_uri)

                        expected_key = f"{short_name}:{supplier_prefix}:{resource_number}:se"

                        if not redis_client.exists(expected_key):
                            print(f"\nMissing entity found:")
                            print(f"URI: {entity_uri}")
                            print(f"Prov URI: {prov_entity_uri}")
                            print(f"Expected Redis key: {expected_key}")
                            print("---")

                            missing_entities.append({
                                "URI": entity_uri,
                                "Prov URI": prov_entity_uri,
                                "Expected Redis key": expected_key
                            })

    return missing_entities

def explore_provenance_files(root_path, redis_host, redis_port, redis_db):
    prov_zip_files = [os.path.join(dp, f) for dp, dn, filenames in os.walk(root_path)
                      for f in filenames if f.endswith('.zip') and 'prov' in dp]

    args_list = [(zip_file, redis_host, redis_port, redis_db) for zip_file in prov_zip_files]

    num_processes = cpu_count()  # Use all available cores
    with Pool(processes=num_processes) as pool:
        results = list(tqdm(pool.imap(process_zip_file, args_list), total=len(args_list), desc="Processing provenance zip files"))

    all_missing_entities = [item for sublist in results for item in sublist]

    print(f"\nTotal missing entities found: {len(all_missing_entities)}")

def main():
    parser = argparse.ArgumentParser(description="Check that provenance entities are present in Redis.")
    parser.add_argument("directory", type=str, help="The path of the directory to explore")
    parser.add_argument("--redis-host", type=str, default="localhost", help="The Redis server host")
    parser.add_argument("--redis-port", type=int, default=6379, help="The Redis server port")
    parser.add_argument("--redis-db", type=int, default=6, help="The number of the Redis database to use")
    args = parser.parse_args()

    explore_provenance_files(args.directory, args.redis_host, args.redis_port, args.redis_db)

if __name__ == "__main__":
    main()
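
As a usage sketch (the directory and the Redis settings below are placeholders), the new check could be launched as a module, mirroring the file's location under oc_meta/run/check/:

    python -m oc_meta.run.check.info_dir /path/to/rdf --redis-host localhost --redis-port 6379 --redis-db 6

It walks every .zip file under directories whose path contains 'prov', rebuilds for each snapshot the counter key short_name:supplier_prefix:resource_number:se (for instance a hypothetical br:0610:123:se), and reports every entity whose key is missing from Redis.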
51 changes: 24 additions & 27 deletions oc_meta/run/find/duplicated_ids_from_files.py
@@ -1,10 +1,12 @@
import argparse
import csv
import logging
import os
import zipfile
import json
import csv
from typing import Dict

from rdflib import ConjunctiveGraph, URIRef
from tqdm import tqdm
import logging

logging.basicConfig(filename='error_log_find_duplicated_ids_from_files.txt', level=logging.ERROR,
format='%(asctime)s - %(levelname)s - %(message)s')
@@ -24,11 +26,10 @@ def read_and_analyze_zip_files(folder_path, csv_path):
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
for zip_file in zip_ref.namelist():
try:
with zip_ref.open(zip_file) as json_file:
data = json.load(json_file)
analyze_json(data, entity_info, zip_path, zip_file)
except json.JSONDecodeError:
logging.error(f"Errore nel parsing JSON del file {zip_file} in {zip_path}")
with zip_ref.open(zip_file) as rdf_file:
g = ConjunctiveGraph()
g.parse(data=rdf_file.read(), format="json-ld")
analyze_graph(g, entity_info)
except Exception as e:
logging.error(f"Errore nell'elaborazione del file {zip_file} in {zip_path}: {str(e)}")
except zipfile.BadZipFile:
@@ -46,24 +47,20 @@ def get_zip_files(id_folder_path):
zip_files.append(os.path.join(root, file))
return zip_files

def analyze_json(data, entity_info, zip_path, zip_file):
for graph in data:
for entity in graph.get("@graph", []):
try:
entity_id = entity["@id"]
identifier_scheme = entity["http://purl.org/spar/datacite/usesIdentifierScheme"][0]["@id"]
literal_value = entity["http://www.essepuntato.it/2010/06/literalreification/hasLiteralValue"][0]["@value"]

key = (identifier_scheme, literal_value)
if key not in entity_info:
entity_info[key] = []
entity_info[key].append(entity_id)
except KeyError as e:
logging.error(f"Chiave mancante nell'entità {entity.get('@id', 'ID sconosciuto')} "
f"nel file {zip_file} all'interno di {zip_path}: {str(e)}")
except Exception as e:
logging.error(f"Errore nell'analisi dell'entità {entity.get('@id', 'ID sconosciuto')} "
f"nel file {zip_file} all'interno di {zip_path}: {str(e)}")
def analyze_graph(g: ConjunctiveGraph, entity_info: Dict[tuple, list]):
datacite_uses_identifier_scheme = URIRef("http://purl.org/spar/datacite/usesIdentifierScheme")
literal_reification_has_literal_value = URIRef("http://www.essepuntato.it/2010/06/literalreification/hasLiteralValue")

for s, p, o in g:
entity_id = str(s)
identifier_scheme = g.value(s, datacite_uses_identifier_scheme)
literal_value = g.value(s, literal_reification_has_literal_value)

if identifier_scheme and literal_value:
key = (str(identifier_scheme), str(literal_value))
if key not in entity_info:
entity_info[key] = []
entity_info[key].append(entity_id)

def save_duplicates_to_csv(entity_info, csv_path):
try:
@@ -80,7 +77,7 @@ def save_duplicates_to_csv(entity_info, csv_path):
logging.error(f"Errore nel salvataggio del file CSV {csv_path}: {str(e)}")

def main():
parser = argparse.ArgumentParser(description="Reads the JSON files inside the ZIP files in an 'id' subfolder.")
parser = argparse.ArgumentParser(description="Reads the RDF files inside the ZIP files in an 'id' subfolder.")
parser.add_argument("folder_path", type=str, help="Path of the folder containing the 'id' subfolder")
parser.add_argument("csv_path", type=str, help="Path of the CSV file where duplicates will be saved")
args = parser.parse_args()
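For context, a minimal sketch of what the new analyze_graph treats as duplicates; the id URIs and the DOI literal are made up, and analyze_graph is imported from the file above:

    from rdflib import ConjunctiveGraph, Literal, URIRef

    from oc_meta.run.find.duplicated_ids_from_files import analyze_graph

    # Hypothetical data: two id entities sharing the same scheme/value pair.
    scheme = URIRef("http://purl.org/spar/datacite/usesIdentifierScheme")
    value = URIRef("http://www.essepuntato.it/2010/06/literalreification/hasLiteralValue")
    doi = URIRef("http://purl.org/spar/datacite/doi")

    g = ConjunctiveGraph()
    for local_name in ("061", "069"):
        s = URIRef(f"https://w3id.org/oc/meta/id/{local_name}")
        g.add((s, scheme, doi))
        g.add((s, value, Literal("10.1000/xyz")))

    entity_info = {}
    analyze_graph(g, entity_info)
    # entity_info now holds a single key, ('http://purl.org/spar/datacite/doi', '10.1000/xyz'),
    # listing both id URIs (each appears once per triple of its subject, since the
    # function iterates over every triple in the graph).
    print(entity_info)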
6 changes: 1 addition & 5 deletions oc_meta/run/gen_info_dir.py
@@ -114,10 +114,8 @@ def explore_directories(root_path, redis_host, redis_port, redis_db):
zip_files = [os.path.join(dp, f) for dp, dn, filenames in os.walk(root_path)
for f in filenames if f.endswith('.zip') and 'prov' in dp]

timeout = 30

with ProcessPool() as pool:
future_results = {pool.schedule(process_zip_file, args=[zip_file], timeout=timeout): zip_file
future_results = {pool.schedule(process_zip_file, args=[zip_file]): zip_file
for zip_file in zip_files}

results = []
@@ -127,8 +125,6 @@
try:
result = future.result()
results.append(result)
except TimeoutError:
print(f"Process exceeded timeout of {timeout} seconds for file: {zip_file}")
except Exception as e:
print(f"Error processing file {zip_file}: {e}")
finally:
75 changes: 73 additions & 2 deletions oc_meta/run/merge/check_merged_brs_results.py
@@ -2,19 +2,21 @@
import csv
import os
import random
import re
import time
import zipfile
from functools import partial
from multiprocessing import Pool, cpu_count

import yaml
from oc_meta.plugins.editor import MetaEditor
from rdflib import RDF, ConjunctiveGraph, Literal, URIRef
from rdflib import RDF, ConjunctiveGraph, Literal, Namespace, URIRef
from SPARQLWrapper import JSON, SPARQLWrapper
from tqdm import tqdm

DATACITE = "http://purl.org/spar/datacite/"
FABIO = "http://purl.org/spar/fabio/"
PROV = Namespace("http://www.w3.org/ns/prov#")

def read_csv(csv_file):
with open(csv_file, 'r') as f:
@@ -31,7 +33,7 @@ def sparql_query_with_retry(sparql, max_retries=3, initial_delay=1, backoff_fact
delay = initial_delay * (backoff_factor ** attempt)
time.sleep(delay + random.uniform(0, 1))

def check_entity_file(file_path, entity_uri, is_surviving):
def check_entity_file(file_path: str, entity_uri, is_surviving):
with zipfile.ZipFile(file_path, 'r') as zip_ref:
for filename in zip_ref.namelist():
with zip_ref.open(filename) as file:
@@ -59,6 +61,75 @@ def check_entity_file(file_path, entity_uri, is_surviving):
identifiers = list(g.objects(entity, URIRef(DATACITE + "hasIdentifier")))
if not identifiers:
tqdm.write(f"Error in file {file_path}: Entity {entity_uri} has no datacite:hasIdentifier")

# Check provenance
prov_file_path = file_path.replace('.zip', '') + '/prov/se.zip'
check_provenance(prov_file_path, entity_uri, is_surviving)

def check_provenance(prov_file_path, entity_uri, is_surviving):
    def extract_snapshot_number(snapshot_uri):
        match = re.search(r'/prov/se/(\d+)$', str(snapshot_uri))
        if match:
            return int(match.group(1))
        return 0  # Return 0 if no match is found; this puts invalid URIs at the start

    try:
        with zipfile.ZipFile(prov_file_path, 'r') as zip_ref:
            g = ConjunctiveGraph()
            for filename in zip_ref.namelist():
                with zip_ref.open(filename) as file:
                    g.parse(file, format='json-ld')

            entity = URIRef(entity_uri)
            snapshots = list(g.subjects(PROV.specializationOf, entity))

            if len(snapshots) <= 1:
                tqdm.write(f"Error in provenance file {prov_file_path}: Less than two snapshots found for entity {entity_uri}")
                return

            # Sort snapshots by their URI number
            snapshots.sort(key=extract_snapshot_number)

            for i, snapshot in enumerate(snapshots):
                snapshot_number = extract_snapshot_number(snapshot)
                if snapshot_number != i + 1:
                    tqdm.write(f"Error in provenance file {prov_file_path}: Snapshot {snapshot} has unexpected number {snapshot_number}, expected {i + 1}")

                gen_time = g.value(snapshot, PROV.generatedAtTime)
                if gen_time is None:
                    tqdm.write(f"Error in provenance file {prov_file_path}: Snapshot {snapshot} has no generation time")

                if i < len(snapshots) - 1 or not is_surviving:
                    invalidation_time = g.value(snapshot, PROV.invalidatedAtTime)
                    if invalidation_time is None:
                        tqdm.write(f"Error in provenance file {prov_file_path}: Non-last snapshot {snapshot} has no invalidation time")
                elif is_surviving and g.value(snapshot, PROV.invalidatedAtTime) is not None:
                    tqdm.write(f"Error in provenance file {prov_file_path}: Last snapshot of surviving entity {snapshot} should not have invalidation time")

                # Check prov:wasDerivedFrom
                derived_from = list(g.objects(snapshot, PROV.wasDerivedFrom))
                if i == 0:  # First snapshot
                    if derived_from:
                        tqdm.write(f"Error in provenance file {prov_file_path}: First snapshot {snapshot} should not have prov:wasDerivedFrom relation")
                elif i == len(snapshots) - 1 and is_surviving:  # Last snapshot of surviving entity (merge snapshot)
                    if len(derived_from) < 2:
                        tqdm.write(f"Error in provenance file {prov_file_path}: Merge snapshot {snapshot} should be derived from more than one snapshot")
                else:  # All other snapshots
                    if len(derived_from) != 1:
                        tqdm.write(f"Error in provenance file {prov_file_path}: Snapshot {snapshot} should have exactly one prov:wasDerivedFrom relation, but has {len(derived_from)}")
                    elif derived_from[0] != snapshots[i-1]:
                        tqdm.write(f"Error in provenance file {prov_file_path}: Snapshot {snapshot} is not derived from the previous snapshot")

            if not is_surviving:
                # Check if the last snapshot is invalidated for merged entities
                last_snapshot = snapshots[-1]
                if (None, PROV.invalidated, last_snapshot) not in g:
                    tqdm.write(f"Error in provenance file {prov_file_path}: Last snapshot {last_snapshot} of merged entity {entity_uri} is not invalidated")

    except FileNotFoundError:
        tqdm.write(f"Error: Provenance file not found for entity {entity_uri}")
    except zipfile.BadZipFile:
        tqdm.write(f"Error: Invalid zip file for provenance of entity {entity_uri}")

def check_entity_sparql(sparql_endpoint, entity_uri, is_surviving):
sparql = SPARQLWrapper(sparql_endpoint)
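To illustrate what check_provenance verifies (the URI below is hypothetical), a surviving entity that absorbed others should expose a snapshot chain like:

    .../br/0601/prov/se/1   no prov:wasDerivedFrom, has generation and invalidation times
    .../br/0601/prov/se/2   derived from se/1, has generation and invalidation times
    .../br/0601/prov/se/3   merge snapshot: derived from two or more snapshots, generated but not invalidated

whereas for a merged (non-surviving) entity every snapshot, including the last, must carry an invalidation time, and the last snapshot must appear as the object of a prov:invalidated triple.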
15 changes: 13 additions & 2 deletions oc_meta/run/merge/duplicated_entities_simultaneously.py
@@ -43,6 +43,7 @@ def process_file(csv_file, meta_config, resp_agent, entity_types, stop_file_path

# Create a single GraphSet for all the operations
g_set = GraphSet(meta_editor.base_iri, custom_counter_handler=meta_editor.counter_handler)
modified = False

for row in data:
if os.path.exists(stop_file_path):
@@ -61,13 +62,19 @@
continue

row['Done'] = 'True'
modified = True

# Save the changes only once, at the end
meta_editor.save(g_set)
if modified:
meta_editor.save(g_set)

write_csv(csv_file, data)
write_csv(csv_file, data)
return csv_file

def count_csv_rows(csv_file):
with open(csv_file, 'r', encoding='utf-8') as f:
return sum(1 for _ in f) - 1 # Subtract 1 to exclude the header row

def main():
parser = argparse.ArgumentParser(description="Merge entities from CSV files in a folder.")
parser.add_argument('csv_folder', type=str, help="Path to the folder containing CSV files")
@@ -83,6 +90,10 @@

csv_files = [os.path.join(args.csv_folder, file) for file in os.listdir(args.csv_folder) if file.endswith('.csv')]

# Filter the CSV files based on the number of rows and the number of workers
if args.workers > 4:
csv_files = [file for file in csv_files if count_csv_rows(file) <= 10000]

with concurrent.futures.ProcessPoolExecutor(max_workers=args.workers) as executor:
futures = {executor.submit(process_file, csv_file, args.meta_config, args.resp_agent, args.entity_types, args.stop_file): csv_file for csv_file in csv_files}

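A brief note on the two behavioural changes above, as inferred from the diff: saving now happens only when at least one row in the CSV was actually processed, and with more than 4 workers any CSV whose count_csv_rows result exceeds 10,000 is left out of the run. Since count_csv_rows counts physical lines minus the header, a file of 10,001 lines (header included) still qualifies, while CSVs with quoted multi-line fields are counted slightly high and may be excluded earlier than their true row count would suggest.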
