From dd820582cf9bc8265cac73bde5bdacc68a5d55ac Mon Sep 17 00:00:00 2001
From: arcangelo7
Date: Fri, 30 Aug 2024 21:15:26 +0200
Subject: [PATCH] merge_all_files_parallel

---
 oc_meta/run/gen_rdf_from_export.py | 108 +++++++++++++++++++----------
 1 file changed, 70 insertions(+), 38 deletions(-)

diff --git a/oc_meta/run/gen_rdf_from_export.py b/oc_meta/run/gen_rdf_from_export.py
index 34807de3..c64cb8fe 100644
--- a/oc_meta/run/gen_rdf_from_export.py
+++ b/oc_meta/run/gen_rdf_from_export.py
@@ -14,21 +14,22 @@
 # ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
 # SOFTWARE
 
-import gzip
 import argparse
+import gzip
+import logging
 import multiprocessing
 import os
 import re
-import rdflib
+import time
 import uuid
+from functools import lru_cache
 from typing import Match
 from zipfile import ZIP_DEFLATED, ZipFile
 
+import orjson
+import rdflib
 from rdflib import ConjunctiveGraph, URIRef
-import logging
 from tqdm import tqdm
-from functools import lru_cache
-import orjson
 
 # Variable used in several functions
 entity_regex: str = r"^(.+)/([a-z][a-z])/(0[1-9]+0)?((?:[1-9][0-9]*)|(?:\d+-\d+))$"
@@ -269,7 +270,7 @@ def load_graph(file_path: str, cur_format: str = 'json-ld'):
 def process_graph(context, graph_identifier, output_root, base_iri, file_limit, item_limit, zip_output):
     modifications_by_file = {}
     triples = 0
-    unique_id = uuid.uuid4()
+    unique_id = generate_unique_id()
 
     for triple in context:
         triples += len(triple)
@@ -308,26 +309,51 @@ def merge_files(output_root, base_file_name, file_extension, zip_output):
     final_file_path = os.path.join(output_root, base_file_name + file_extension)
     store_in_file(merged_graph, final_file_path, zip_output)
 
-def merge_all_files(output_root, zip_output):
-    # Group the files to merge by base name (without the UUID)
-    base_file_names = set(f.split('_')[0] for f in os.listdir(output_root) if '_' in f)
-
-    pbar = tqdm(total=len(base_file_names), desc="Merging files")
-
-    tasks = [(output_root, base_file_name, '.zip' if zip_output else '.json', zip_output) for base_file_name in base_file_names]
+def merge_files_in_directory(directory, zip_output):
+    """Function to merge files in a specific directory"""
+    files = [f for f in os.listdir(directory) if f.endswith('.zip' if zip_output else '.json')]
+    base_file_names = set(f.split('_')[0] for f in files)
+
+    for base_file_name in base_file_names:
+        files_to_merge = [f for f in files if f.startswith(base_file_name)]
+
+        merged_graph = ConjunctiveGraph()
+
+        for file_path in files_to_merge:
+            cur_full_path = os.path.join(directory, file_path)
+            loaded_graph = load_graph(cur_full_path)
+            merged_graph += loaded_graph
+
+        # Use the base_file_name without adding a new unique identifier
+        final_file_path = os.path.join(directory, f"{base_file_name}" + ('.zip' if zip_output else '.json'))
+        store_in_file(merged_graph, final_file_path, zip_output)
+
+        # Remove the original files after merging
+        for file_path in files_to_merge:
+            if file_path != os.path.basename(final_file_path):
+                os.remove(os.path.join(directory, file_path))
+
+def generate_unique_id():
+    return f"{int(time.time())}-{uuid.uuid4()}"
+
+def merge_files_wrapper(args):
+    directory, zip_output = args
+    merge_files_in_directory(directory, zip_output)
+
+def merge_all_files_parallel(output_root, zip_output):
+    """Function to merge files in parallel"""
+    directories_to_process = []
+    for root, dirs, files in os.walk(output_root):
+        if any(f.endswith('.zip' if zip_output else '.json') for f in files):
+            directories_to_process.append(root)
 
     with multiprocessing.Pool() as pool:
-        for _ in pool.imap_unordered(lambda args: merge_files(*args), tasks):
-            pbar.update(1)
-
-        pool.close()
-        pool.join()
-
-    pbar.close()
-
-def process_file_content(args):
-    file_path, output_root, base_iri, file_limit, item_limit, zip_output, rdf_format = args
+        list(tqdm(pool.imap(merge_files_wrapper,
+                            [(dir, zip_output) for dir in directories_to_process]),
+                  total=len(directories_to_process),
+                  desc="Merging files in directories"))
+
+def process_file_content(file_path, output_root, base_iri, file_limit, item_limit, zip_output, rdf_format):
     with gzip.open(file_path, 'rb') as f:
         data = f.read().decode('utf-8')
     graph = ConjunctiveGraph()
@@ -341,33 +367,39 @@ def main():
         graph_identifier = context.identifier
         process_graph(context, graph_identifier, output_root, base_iri, file_limit, item_limit, zip_output)
 
+def process_file_wrapper(args):
+    file_path, output_root, base_iri, file_limit, item_limit, zip_output, rdf_format = args
+    process_file_content(file_path, output_root, base_iri, file_limit, item_limit, zip_output, rdf_format)
+
+def process_chunk(chunk, output_root, base_iri, file_limit, item_limit, zip_output, rdf_format):
+    with multiprocessing.Pool() as pool:
+        list(tqdm(pool.imap(process_file_wrapper, [(file_path, output_root, base_iri, file_limit, item_limit, zip_output, rdf_format) for file_path in chunk]), total=len(chunk), desc="Processing files"))
+
 def main():
     parser = argparse.ArgumentParser(description="Process gzipped input files into OC Meta RDF")
     parser.add_argument('input_folder', type=str, help='Input folder containing gzipped input files')
     parser.add_argument('output_root', type=str, help='Root folder for output OC Meta RDF files')
-    parser.add_argument('--base_iri', type=str, default='https://w3id.org/oc/meta/', help='The base URI of entities on Meta. This setting can be safely left as is')
+    parser.add_argument('--base_iri', type=str, default='https://w3id.org/oc/meta/', help='The base URI of entities on Meta')
     parser.add_argument('--file_limit', type=int, default=10000, help='Number of files per folder')
     parser.add_argument('--item_limit', type=int, default=1000, help='Number of items per file')
-    parser.add_argument('-v', '--zip_output', default=True, dest='zip_output', action='store_true', required=False, help='Zip output json files')
-    parser.add_argument('--input_format', type=str, default='jsonld', choices=['jsonld', 'nquads'], help='Format of the gzipped input files (jsonld or nquads)')
+    parser.add_argument('-v', '--zip_output', default=True, dest='zip_output', action='store_true', help='Zip output json files')
+    parser.add_argument('--input_format', type=str, default='jsonld', choices=['jsonld', 'nquads'], help='Format of the input files')
+    parser.add_argument('--chunk_size', type=int, default=1000, help='Number of files to process before merging')
     args = parser.parse_args()
 
     file_extension = '.nq.gz' if args.input_format == 'nquads' else '.jsonld.gz'
     rdf_format = 'nquads' if args.input_format == 'nquads' else 'json-ld'
 
-    files_to_process = [file for file in os.listdir(args.input_folder) if file.endswith(file_extension)]
-    tasks = [(os.path.join(args.input_folder, file_path), args.output_root, args.base_iri, args.file_limit, args.item_limit, args.zip_output, rdf_format) for file_path in files_to_process]
-
-    pbar = tqdm(total=len(files_to_process), desc="Completing jobs")
-
-    with multiprocessing.Pool() as pool:
-        for _ in pool.imap_unordered(process_file_content, tasks, chunksize=10):
-            pbar.update(1)
-
-        pool.close()
-        pool.join()
+    files_to_process = [os.path.join(args.input_folder, file) for file in os.listdir(args.input_folder) if file.endswith(file_extension)]
+    chunks = [files_to_process[i:i + args.chunk_size] for i in range(0, len(files_to_process), args.chunk_size)]
+
+    for i, chunk in enumerate(tqdm(chunks, desc="Processing chunks")):
+        print(f"\nProcessing chunk {i+1}/{len(chunks)}")
+        process_chunk(chunk, args.output_root, args.base_iri, args.file_limit, args.item_limit, args.zip_output, rdf_format)
+        print(f"Merging files for chunk {i+1}")
+        merge_all_files_parallel(args.output_root, args.zip_output)
 
-    pbar.close()
+    print("Processing complete")
 
 if __name__ == "__main__":
     main()
\ No newline at end of file
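
For context, the control flow this patch introduces is a chunked fan-out with a merge barrier: the input file list is sliced into fixed-size chunks, each chunk is processed by a worker pool, and all intermediate output files are merged before the next chunk starts, which bounds how many intermediate files accumulate on disk. Below is a minimal, self-contained sketch of that pattern; process_one and merge_outputs are illustrative placeholders standing in for the module's process_file_content and merge_all_files_parallel, not the real functions.

import multiprocessing

def process_one(path):
    # Placeholder for per-file work (in the real script: parse one
    # gzipped dump file and write intermediate RDF files).
    return path

def merge_outputs():
    # Placeholder for the merge barrier (in the real script: walk the
    # output tree and collapse intermediate files in each directory).
    pass

def run_in_chunks(files, chunk_size):
    # Same slicing expression the patch adds in main().
    chunks = [files[i:i + chunk_size] for i in range(0, len(files), chunk_size)]
    for chunk in chunks:
        with multiprocessing.Pool() as pool:
            pool.map(process_one, chunk)  # fan out one chunk of files
        merge_outputs()                   # barrier: merge before the next chunk

if __name__ == "__main__":
    run_in_chunks([f"f{n}.jsonld.gz" for n in range(10)], chunk_size=4)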