diff --git a/oc_meta/run/merge/check_merged_brs_results.py b/oc_meta/run/merge/check_merged_brs_results.py
index 9550152..005507d 100644
--- a/oc_meta/run/merge/check_merged_brs_results.py
+++ b/oc_meta/run/merge/check_merged_brs_results.py
@@ -9,14 +9,20 @@ from multiprocessing import Pool, cpu_count
 
 import yaml
-from oc_meta.plugins.editor import MetaEditor
+from filelock import FileLock
 from rdflib import RDF, ConjunctiveGraph, Literal, Namespace, URIRef
 from SPARQLWrapper import JSON, SPARQLWrapper
 from tqdm import tqdm
 
+from oc_meta.plugins.editor import MetaEditor
+
 DATACITE = "http://purl.org/spar/datacite/"
 FABIO = "http://purl.org/spar/fabio/"
 PROV = Namespace("http://www.w3.org/ns/prov#")
+PRO = Namespace("http://purl.org/spar/pro/")
+DCTERMS = Namespace("http://purl.org/dc/terms/")
+FRBR = Namespace("http://purl.org/vocab/frbr/core#")
+PRISM = Namespace("http://prismstandard.org/namespaces/basic/2.1/")
 
 
 def read_csv(csv_file):
     with open(csv_file, 'r') as f:
@@ -33,34 +39,75 @@ def sparql_query_with_retry(sparql, max_retries=3, initial_delay=1, backoff_fact
             delay = initial_delay * (backoff_factor ** attempt)
             time.sleep(delay + random.uniform(0, 1))
 
+def check_br_constraints(g: ConjunctiveGraph, entity):
+    issues = []
+
+    # Check types
+    types = list(g.objects(entity, RDF.type, unique=True))
+    if not types:
+        issues.append(f"Entity {entity} has no type")
+    elif len(types) > 2:
+        issues.append(f"Entity {entity} has more than two types")
+    elif URIRef(FABIO + "Expression") not in types:
+        issues.append(f"Entity {entity} is not a fabio:Expression")
+
+    # Check if entity is a journal issue or volume
+    is_journal_issue = URIRef(FABIO + "JournalIssue") in types
+    is_journal_volume = URIRef(FABIO + "JournalVolume") in types
+
+    # Check identifiers
+    identifiers = list(g.objects(entity, URIRef(DATACITE + "hasIdentifier"), unique=True))
+    if not identifiers:
+        issues.append(f"Entity {entity} has no datacite:hasIdentifier")
+
+    # Check title (zero or one)
+    titles = list(g.objects(entity, DCTERMS.title, unique=True))
+    if len(titles) > 1:
+        issues.append(f"Entity {entity} has multiple titles")
+
+    # Check part of (zero or one)
+    part_of = list(g.objects(entity, FRBR.partOf, unique=True))
+    if len(part_of) > 1:
+        issues.append(f"Entity {entity} has multiple partOf relations")
+
+    # Check publication date (zero or one)
+    pub_dates = list(g.objects(entity, PRISM.hasPublicationDate, unique=True))
+    if len(pub_dates) > 1:
+        issues.append(f"Entity {entity} has multiple publication dates")
+
+    # Check sequence identifier (zero or one)
+    seq_ids = list(g.objects(entity, URIRef(FABIO + "hasSequenceIdentifier"), unique=True))
+    if len(seq_ids) > 1:
+        issues.append(f"Entity {entity} has multiple sequence identifiers")
+    elif seq_ids and not (is_journal_issue or is_journal_volume):
+        issues.append(f"Entity {entity} has sequence identifier but is not a journal issue or volume")
+
+    return issues
+
 def check_entity_file(file_path: str, entity_uri, is_surviving):
-    with zipfile.ZipFile(file_path, 'r') as zip_ref:
-        for filename in zip_ref.namelist():
-            with zip_ref.open(filename) as file:
-                g = ConjunctiveGraph()
-                g.parse(file, format='json-ld')
-                entity = URIRef(entity_uri)
-
-                if (entity, None, None) not in g:
-                    if is_surviving:
-                        tqdm.write(f"Error in file {file_path}: Surviving entity {entity_uri} does not exist")
-                    return
-
-                if not is_surviving:
-                    tqdm.write(f"Error in file {file_path}: Merged entity {entity_uri} still exists")
-                    return
-
-                types = list(g.objects(entity, RDF.type))
-                if not types:
-                    tqdm.write(f"Error in file {file_path}: Entity {entity_uri} has no type")
-                elif len(types) > 2:
-                    tqdm.write(f"Error in file {file_path}: Entity {entity_uri} has more than two types")
-                elif URIRef(FABIO + "Expression") not in types:
-                    tqdm.write(f"Error in file {file_path}: Entity {entity_uri} is not a fabio:Expression")
-
-                identifiers = list(g.objects(entity, URIRef(DATACITE + "hasIdentifier")))
-                if not identifiers:
-                    tqdm.write(f"Error in file {file_path}: Entity {entity_uri} has no datacite:hasIdentifier")
+    lock_path = f"{file_path}.lock"
+    lock = FileLock(lock_path)
+
+    with lock:
+        with zipfile.ZipFile(file_path, 'r') as zip_ref:
+            for filename in zip_ref.namelist():
+                with zip_ref.open(filename) as file:
+                    g = ConjunctiveGraph()
+                    g.parse(file, format='json-ld')
+                    entity = URIRef(entity_uri)
+
+                    if (entity, None, None) not in g:
+                        if is_surviving:
+                            tqdm.write(f"Error in file {file_path}: Surviving entity {entity_uri} does not exist")
+                        return
+
+                    if not is_surviving:
+                        tqdm.write(f"Error in file {file_path}: Merged entity {entity_uri} still exists")
+                        return
+
+                    br_issues = check_br_constraints(g, entity)
+                    for issue in br_issues:
+                        tqdm.write(f"Error in file {file_path}: {issue}")
 
     # Check provenance
     prov_file_path = file_path.replace('.zip', '') + '/prov/se.zip'
diff --git a/oc_meta/run/merge/entities.py b/oc_meta/run/merge/entities.py
index e60a137..a36eaec 100644
--- a/oc_meta/run/merge/entities.py
+++ b/oc_meta/run/merge/entities.py
@@ -261,9 +261,9 @@ def process_folder(self, csv_folder: str):
                      if file.endswith('.csv')]
 
         # Filter CSV files based on number of rows and workers
-        # if self.workers > 4:
-        #     csv_files = [file for file in csv_files
-        #                  if self.count_csv_rows(file) <= 10000]
+        if self.workers > 4:
+            csv_files = [file for file in csv_files
+                         if self.count_csv_rows(file) <= 10000]
 
         with concurrent.futures.ProcessPoolExecutor(max_workers=self.workers) as executor:
             futures = {
diff --git a/poetry.lock b/poetry.lock
index 99a3904..7303ed1 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -1367,13 +1367,13 @@ zstandard = ">=0.21.0,<0.22.0"
 
 [[package]]
 name = "oc-ocdm"
-version = "9.2.0"
+version = "9.2.1"
 description = "Object mapping library for manipulating RDF graphs that are compliant with the OpenCitations datamodel."
 optional = false
 python-versions = "<3.14,>=3.8"
 files = [
-    {file = "oc_ocdm-9.2.0-py3-none-any.whl", hash = "sha256:bcffb65f613a950dc0649cc6a2cb81e2ac3132432bf7a0eb70c26d6c3c2bf6cf"},
-    {file = "oc_ocdm-9.2.0.tar.gz", hash = "sha256:6f99b0601f51da856476f0278957051a22d6b88bcb8e81f98c4eac80c7f9d5a3"},
+    {file = "oc_ocdm-9.2.1-py3-none-any.whl", hash = "sha256:19f2cf6b5f5d0ebfac83daff84cd494fb1289f836a7b6cb1540cc1d10e8970ba"},
+    {file = "oc_ocdm-9.2.1.tar.gz", hash = "sha256:3ccdd171c3d8cfe97f95d64807229388208b47463aed9659d33ba1fbb3755ca5"},
 ]
 
 [package.dependencies]
@@ -2707,4 +2707,4 @@ cffi = ["cffi (>=1.11)"]
 [metadata]
 lock-version = "2.0"
 python-versions = ">=3.10,<3.14"
-content-hash = "cf3d4482a09535ee0fea7914eef18021ef5a295f278bb66ad5b7f6bcd02b1d18"
+content-hash = "ed679e54ec95128c5db0083e95402177c5a0f721cfc9d085e7e17f8d22d31841"
diff --git a/pyproject.toml b/pyproject.toml
index 192df67..79a59a8 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -27,7 +27,7 @@ oc-ds-converter = "^1.0.4"
 ijson = "^3.2.3"
 internetarchive = "^3.7.0"
 zenodopy = "^0.3.0"
-oc-ocdm = "9.2.0"
+oc-ocdm = "9.2.1"
 retrying = "^1.3.4"
 orjson = "^3.10.7"
 rdflib-ocdm = "0.3.11"