gen rdf from export: merge files and invalid resource names
arcangelo7 committed Aug 29, 2024
1 parent 1838116 commit a776fc1
Showing 1 changed file with 57 additions and 8 deletions.
oc_meta/run/gen_rdf_from_export.py (65 changes: 57 additions & 8 deletions)
@@ -20,10 +20,10 @@
 import os
 import re
 import rdflib
+import uuid
 from typing import Match
 from zipfile import ZIP_DEFLATED, ZipFile
 
-from filelock import FileLock
 from rdflib import ConjunctiveGraph, URIRef
 import logging
 from tqdm import tqdm
@@ -96,9 +96,17 @@ def get_prov_subject_count(prov_res: URIRef) -> str:
 def get_resource_number(res: URIRef) -> int:
     string_iri: str = str(res)
     if "/prov/" in string_iri:
-        return int(_get_match_cached(prov_regex, 4, string_iri))
+        match = _get_match_cached(prov_regex, 4, string_iri)
     else:
-        return int(_get_match_cached(entity_regex, 4, string_iri))
+        match = _get_match_cached(entity_regex, 4, string_iri)
+    if not match:
+        logging.warning(f"Could not extract resource number from URI: {string_iri}")
+        return -1 # or some other default value
+    try:
+        return int(match)
+    except ValueError:
+        logging.error(f"Invalid resource number in URI: {string_iri}, extracted: {match}")
+        return -1 # or some other default value
 
 def find_local_line_id(res: URIRef, n_file_item: int = 1) -> int:
     cur_number: int = get_resource_number(res)
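The previous version called int() directly on the regex capture, so an IRI matching neither pattern made int() raise and killed the export run. The sentinel keeps the run alive: a falsy capture (no match) logs a warning, and a capture that is not a plain integer logs an error, with -1 signalling failure in both cases. A minimal self-contained sketch of the same guard pattern (the regex and IRIs below are illustrative stand-ins, not the script's actual entity_regex or prov_regex):

    import logging
    import re

    # Illustrative pattern only: capture a trailing integer from an IRI.
    toy_regex = re.compile(r"/(\d+)$")

    def toy_resource_number(iri: str) -> int:
        match_obj = toy_regex.search(iri)
        match = match_obj.group(1) if match_obj else None
        if not match:
            logging.warning(f"Could not extract resource number from URI: {iri}")
            return -1  # sentinel, as in the patched function
        try:
            return int(match)
        except ValueError:
            logging.error(f"Invalid resource number in URI: {iri}, extracted: {match}")
            return -1

    assert toy_resource_number("https://example.org/br/42") == 42
    assert toy_resource_number("https://example.org/br/abc") == -1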
@@ -125,6 +133,11 @@ def find_paths(res: URIRef, base_dir: str, base_iri: str, default_dir: str, dir_

     cur_number: int = get_resource_number(res)
 
+    if cur_number == -1:
+        # Handle the error case
+        logging.error(f"Could not process URI: {string_iri}")
+        return None, None # or some default paths
+
     # Find the correct file number where to save the resources
     cur_file_split: int = 0
     while True:
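Because find_paths now returns a (None, None) tuple instead of raising, every call site must guard before using the returned path; process_graph in the hunk below skips the offending triple. A sketch of the guard, assuming the call signature used there (the wrapper itself is a hypothetical addition, not part of the script):

    def safe_find_paths(entity_uri, output_root, base_iri, file_limit, item_limit):
        """Hypothetical wrapper: returns the target file path, or None for a bad IRI."""
        _, cur_file_path = find_paths(entity_uri, output_root, base_iri, '_',
                                      file_limit, item_limit, True)
        if cur_file_path is None:
            logging.warning(f"Skipping entity due to invalid URI: {entity_uri}")
        return cur_file_path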
@@ -256,11 +269,18 @@ def load_graph(file_path: str, cur_format: str = 'json-ld'):
 def process_graph(context, graph_identifier, output_root, base_iri, file_limit, item_limit, zip_output):
     modifications_by_file = {}
     triples = 0
+    unique_id = uuid.uuid4()
+
     for triple in context:
         triples += len(triple)
         entity_uri = triple[0]
         _, cur_file_path = find_paths(entity_uri, output_root, base_iri, '_', file_limit, item_limit, True)
+        if cur_file_path is None:
+            logging.warning(f"Skipping triple due to invalid URI: {entity_uri}")
+            continue
         cur_file_path = cur_file_path.replace('.json', '.zip') if zip_output else cur_file_path
+        cur_file_path = cur_file_path.replace('.zip', f'_{unique_id}.zip') if zip_output else cur_file_path.replace('.json', f'_{unique_id}.json')
+
         if cur_file_path not in modifications_by_file:
             modifications_by_file[cur_file_path] = {
                 "graph_identifier": graph_identifier,
@@ -269,13 +289,42 @@
         modifications_by_file[cur_file_path]["triples"].append(triple)
 
     for file_path, data in modifications_by_file.items():
-        lock = FileLock(f'{file_path}.lock')
-        with lock:
-            stored_g = load_graph(file_path) if os.path.exists(file_path) else ConjunctiveGraph()
-            stored_g = store(data["triples"], data["graph_identifier"], stored_g)
-            store_in_file(stored_g, file_path, zip_output)
+        stored_g = load_graph(file_path) if os.path.exists(file_path) else ConjunctiveGraph()
+        stored_g = store(data["triples"], data["graph_identifier"], stored_g)
+        store_in_file(stored_g, file_path, zip_output)
     return triples
+
+def merge_files(output_root, base_file_name, file_extension, zip_output):
+    """Merge the files generated by the different processes"""
+    files_to_merge = [f for f in os.listdir(output_root) if f.startswith(base_file_name) and f.endswith(file_extension)]
+
+    merged_graph = ConjunctiveGraph()
+
+    for file_path in files_to_merge:
+        cur_full_path = os.path.join(output_root, file_path)
+        loaded_graph = load_graph(cur_full_path)
+        merged_graph += loaded_graph
+
+    final_file_path = os.path.join(output_root, base_file_name + file_extension)
+    store_in_file(merged_graph, final_file_path, zip_output)
+
+def merge_all_files(output_root, zip_output):
+    # Group the files to merge by base name (without the UUID)
+    base_file_names = set(f.split('_')[0] for f in os.listdir(output_root) if '_' in f)
+
+    pbar = tqdm(total=len(base_file_names), desc="Merging files")
+
+    tasks = [(output_root, base_file_name, '.zip' if zip_output else '.json', zip_output) for base_file_name in base_file_names]
+
+    with multiprocessing.Pool() as pool:
+        for _ in pool.imap_unordered(lambda args: merge_files(*args), tasks):
+            pbar.update(1)
+
+    pool.close()
+    pool.join()
+
+    pbar.close()

 def process_file_content(args):
     file_path, output_root, base_iri, file_limit, item_limit, zip_output, rdf_format = args
 
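One caveat on the new merge step: multiprocessing pickles the callable it ships to worker processes, and lambdas cannot be pickled, so pool.imap_unordered(lambda args: merge_files(*args), tasks) raises a PicklingError at runtime. A sketch of an equivalent, picklable variant, assuming the script's merge_files is in scope (the underscore-prefixed names are hypothetical additions, and the explicit close()/join() is dropped because the with block already manages the pool):

    import multiprocessing
    import os

    from tqdm import tqdm

    def _merge_files_star(args):
        # Module-level wrapper: picklable, unlike a lambda.
        return merge_files(*args)

    def _merge_all_files(output_root, zip_output):
        # Same grouping as above: strip the per-process UUID suffix to get each base name.
        base_file_names = set(f.split('_')[0] for f in os.listdir(output_root) if '_' in f)
        extension = '.zip' if zip_output else '.json'
        tasks = [(output_root, name, extension, zip_output) for name in base_file_names]
        with multiprocessing.Pool() as pool:
            with tqdm(total=len(tasks), desc="Merging files") as pbar:
                for _ in pool.imap_unordered(_merge_files_star, tasks):
                    pbar.update(1)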
