Commit

merge_all_files_parallel
arcangelo7 committed Aug 30, 2024
1 parent cfc98e3 commit dd82058
Showing 1 changed file with 70 additions and 38 deletions.
108 changes: 70 additions & 38 deletions oc_meta/run/gen_rdf_from_export.py
@@ -14,21 +14,22 @@
# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
# SOFTWARE

import argparse
import gzip
import logging
import multiprocessing
import os
import re
import time
import uuid
from functools import lru_cache
from typing import Match
from zipfile import ZIP_DEFLATED, ZipFile

import orjson
import rdflib
from rdflib import ConjunctiveGraph, URIRef
from tqdm import tqdm

# Variable used in several functions
entity_regex: str = r"^(.+)/([a-z][a-z])/(0[1-9]+0)?((?:[1-9][0-9]*)|(?:\d+-\d+))$"
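
For reference, a minimal sketch of what entity_regex captures, using an illustrative (not necessarily existing) OC Meta IRI:

match = re.match(entity_regex, "https://w3id.org/oc/meta/br/0601")
# match.groups() -> ('https://w3id.org/oc/meta', 'br', '060', '1')
# i.e. base IRI, entity type abbreviation, supplier prefix, local counter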
@@ -269,7 +270,7 @@ def load_graph(file_path: str, cur_format: str = 'json-ld'):
def process_graph(context, graph_identifier, output_root, base_iri, file_limit, item_limit, zip_output):
    modifications_by_file = {}
    triples = 0
    unique_id = generate_unique_id()

    for triple in context:
        triples += len(triple)
@@ -308,26 +309,51 @@ def merge_files(output_root, base_file_name, file_extension, zip_output):
    final_file_path = os.path.join(output_root, base_file_name + file_extension)
    store_in_file(merged_graph, final_file_path, zip_output)

def merge_files_in_directory(directory, zip_output):
    """Function to merge files in a specific directory"""
    files = [f for f in os.listdir(directory) if f.endswith('.zip' if zip_output else '.json')]
    base_file_names = set(f.split('_')[0] for f in files)

    for base_file_name in base_file_names:
        files_to_merge = [f for f in files if f.startswith(base_file_name)]

        merged_graph = ConjunctiveGraph()

        for file_path in files_to_merge:
            cur_full_path = os.path.join(directory, file_path)
            loaded_graph = load_graph(cur_full_path)
            merged_graph += loaded_graph

        # Use the base_file_name without adding a new unique identifier
        final_file_path = os.path.join(directory, f"{base_file_name}" + ('.zip' if zip_output else '.json'))
        store_in_file(merged_graph, final_file_path, zip_output)

        # Remove the original files after merging
        for file_path in files_to_merge:
            if file_path != os.path.basename(final_file_path):
                os.remove(os.path.join(directory, file_path))

def generate_unique_id():
    return f"{int(time.time())}-{uuid.uuid4()}"

# Top-level wrapper so the task is picklable by multiprocessing.Pool workers
def merge_files_wrapper(args):
    directory, zip_output = args
    merge_files_in_directory(directory, zip_output)

def merge_all_files_parallel(output_root, zip_output):
    """Function to merge files in parallel"""
    directories_to_process = []
    for root, dirs, files in os.walk(output_root):
        if any(f.endswith('.zip' if zip_output else '.json') for f in files):
            directories_to_process.append(root)

    with multiprocessing.Pool() as pool:
        list(tqdm(pool.imap(merge_files_wrapper,
                            [(dir, zip_output) for dir in directories_to_process]),
                  total=len(directories_to_process),
                  desc="Merging files in directories"))

def process_file_content(file_path, output_root, base_iri, file_limit, item_limit, zip_output, rdf_format):
    with gzip.open(file_path, 'rb') as f:
        data = f.read().decode('utf-8')
    graph = ConjunctiveGraph()
@@ -341,33 +367,39 @@ def process_file_content(args):
        graph_identifier = context.identifier
        process_graph(context, graph_identifier, output_root, base_iri, file_limit, item_limit, zip_output)

def process_file_wrapper(args):
    file_path, output_root, base_iri, file_limit, item_limit, zip_output, rdf_format = args
    process_file_content(file_path, output_root, base_iri, file_limit, item_limit, zip_output, rdf_format)

def process_chunk(chunk, output_root, base_iri, file_limit, item_limit, zip_output, rdf_format):
    with multiprocessing.Pool() as pool:
        list(tqdm(pool.imap(process_file_wrapper,
                            [(file_path, output_root, base_iri, file_limit, item_limit, zip_output, rdf_format)
                             for file_path in chunk]),
                  total=len(chunk), desc="Processing files"))

def main():
    parser = argparse.ArgumentParser(description="Process gzipped input files into OC Meta RDF")
    parser.add_argument('input_folder', type=str, help='Input folder containing gzipped input files')
    parser.add_argument('output_root', type=str, help='Root folder for output OC Meta RDF files')
    parser.add_argument('--base_iri', type=str, default='https://w3id.org/oc/meta/', help='The base URI of entities on Meta')
    parser.add_argument('--file_limit', type=int, default=10000, help='Number of files per folder')
    parser.add_argument('--item_limit', type=int, default=1000, help='Number of items per file')
    parser.add_argument('-v', '--zip_output', default=True, dest='zip_output', action='store_true', help='Zip output json files')
    parser.add_argument('--input_format', type=str, default='jsonld', choices=['jsonld', 'nquads'], help='Format of the input files')
    parser.add_argument('--chunk_size', type=int, default=1000, help='Number of files to process before merging')
    args = parser.parse_args()

    file_extension = '.nq.gz' if args.input_format == 'nquads' else '.jsonld.gz'
    rdf_format = 'nquads' if args.input_format == 'nquads' else 'json-ld'

    files_to_process = [os.path.join(args.input_folder, file) for file in os.listdir(args.input_folder) if file.endswith(file_extension)]
    chunks = [files_to_process[i:i + args.chunk_size] for i in range(0, len(files_to_process), args.chunk_size)]

    for i, chunk in enumerate(tqdm(chunks, desc="Processing chunks")):
        print(f"\nProcessing chunk {i+1}/{len(chunks)}")
        process_chunk(chunk, args.output_root, args.base_iri, args.file_limit, args.item_limit, args.zip_output, rdf_format)
        print(f"Merging files for chunk {i+1}")
        merge_all_files_parallel(args.output_root, args.zip_output)

    print("Processing complete")

if __name__ == "__main__":
    main()
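
As a usage reference, a typical invocation of this script might look like the following (paths are illustrative; it assumes the oc_meta package is importable from the working environment):

python -m oc_meta.run.gen_rdf_from_export /data/export /data/rdf_output --input_format jsonld --chunk_size 500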
