From a0d4244dcecd0cd8ff2a2457b2b7af3125aa4288 Mon Sep 17 00:00:00 2001 From: arcangelo7 Date: Tue, 19 Nov 2024 17:33:31 +0100 Subject: [PATCH] entity_merger test --- .../duplicated_entities_simultaneously.py | 210 ++-- poetry.lock | 8 +- pyproject.toml | 2 +- test/entity_merger_test.py | 1050 +++++++++++++++++ test/merger/input/0.csv | 2 + test/merger/input/1.csv | 2 + test/merger/input/2.csv | 2 + test/merger/input/empty.csv | 1 + test/merger/meta_config.yaml | 72 ++ test/merger/time_agnostic_library_config.json | 1 + 10 files changed, 1261 insertions(+), 89 deletions(-) create mode 100644 test/entity_merger_test.py create mode 100644 test/merger/input/0.csv create mode 100644 test/merger/input/1.csv create mode 100644 test/merger/input/2.csv create mode 100644 test/merger/input/empty.csv create mode 100644 test/merger/meta_config.yaml create mode 100644 test/merger/time_agnostic_library_config.json diff --git a/oc_meta/run/merge/duplicated_entities_simultaneously.py b/oc_meta/run/merge/duplicated_entities_simultaneously.py index ada6815d..42da24eb 100644 --- a/oc_meta/run/merge/duplicated_entities_simultaneously.py +++ b/oc_meta/run/merge/duplicated_entities_simultaneously.py @@ -2,7 +2,7 @@ import concurrent.futures import csv import os -from typing import List +from typing import List, Dict from oc_meta.plugins.editor import MetaEditor from oc_ocdm.graph import GraphSet @@ -10,98 +10,140 @@ from tqdm import tqdm -def get_entity_type(entity_url): - parts = entity_url.split('/') - if 'oc' in parts and 'meta' in parts: - try: - return parts[parts.index('meta') + 1] - except IndexError: - return None - return None - -def read_csv(csv_file) -> List[dict]: - data = [] - with open(csv_file, mode='r', newline='', encoding='utf-8') as file: - csv_reader = csv.DictReader(file) - for row in csv_reader: - if 'Done' not in row: - row['Done'] = 'False' - data.append(row) - return data - -def write_csv(csv_file, data): - fieldnames = data[0].keys() - with open(csv_file, mode='w', newline='', encoding='utf-8') as file: - writer = csv.DictWriter(file, fieldnames=fieldnames) - writer.writeheader() - for row in data: - writer.writerow(row) - -def process_file(csv_file, meta_config, resp_agent, entity_types, stop_file_path): - data = read_csv(csv_file) - meta_editor = MetaEditor(meta_config, resp_agent, save_queries=True) - - # Creiamo un unico GraphSet per tutte le operazioni - g_set = GraphSet(meta_editor.base_iri, custom_counter_handler=meta_editor.counter_handler) - modified = False - - for row in data: - if os.path.exists(stop_file_path): - break +class EntityMerger: + def __init__(self, meta_config: str, resp_agent: str, entity_types: List[str], stop_file_path: str, workers: int): + self.meta_config = meta_config + self.resp_agent = resp_agent + self.entity_types = entity_types + self.stop_file_path = stop_file_path + self.workers = workers - entity_type = get_entity_type(row['surviving_entity']) - if row.get('Done') != 'True' and entity_type in entity_types: - surviving_entity = URIRef(row['surviving_entity']) - merged_entities = row['merged_entities'].split('; ') + @staticmethod + def get_entity_type(entity_url: str) -> str: + parts = entity_url.split('/') + if 'oc' in parts and 'meta' in parts: + try: + return parts[parts.index('meta') + 1] + except IndexError: + return None + return None + + @staticmethod + def read_csv(csv_file: str) -> List[Dict]: + data = [] + with open(csv_file, mode='r', newline='', encoding='utf-8') as file: + csv_reader = csv.DictReader(file) + for row in csv_reader: + if 
'Done' not in row: + row['Done'] = 'False' + data.append(row) + return data + + @staticmethod + def write_csv(csv_file: str, data: List[Dict]): + fieldnames = data[0].keys() + with open(csv_file, mode='w', newline='', encoding='utf-8') as file: + writer = csv.DictWriter(file, fieldnames=fieldnames) + writer.writeheader() + for row in data: + writer.writerow(row) + + @staticmethod + def count_csv_rows(csv_file: str) -> int: + with open(csv_file, 'r', encoding='utf-8') as f: + return sum(1 for _ in f) - 1 # Subtract 1 to exclude the header row + + def process_file(self, csv_file: str) -> str: + """Process a single CSV file with its own MetaEditor instance""" + data = self.read_csv(csv_file) + meta_editor = MetaEditor(self.meta_config, self.resp_agent, save_queries=True) + modified = False + + # Create a GraphSet for the current file + g_set = GraphSet(meta_editor.base_iri, custom_counter_handler=meta_editor.counter_handler) - for merged_entity in merged_entities: - merged_entity = merged_entity.strip() + for row in data: + if os.path.exists(self.stop_file_path): + break + + entity_type = self.get_entity_type(row['surviving_entity']) + if row.get('Done') != 'True' and entity_type in self.entity_types: + surviving_entity = URIRef(row['surviving_entity']) + merged_entities = row['merged_entities'].split('; ') + + for merged_entity in merged_entities: + merged_entity = merged_entity.strip() + try: + meta_editor.merge(g_set, surviving_entity, URIRef(merged_entity)) + except ValueError: + continue + + row['Done'] = 'True' + modified = True + if modified: + meta_editor.save(g_set) + self.write_csv(csv_file, data) + + return csv_file + + def process_folder(self, csv_folder: str): + """Process all CSV files in a folder using parallel processing""" + if os.path.exists(self.stop_file_path): + os.remove(self.stop_file_path) + + csv_files = [os.path.join(csv_folder, file) + for file in os.listdir(csv_folder) + if file.endswith('.csv')] + + # Filter CSV files based on number of rows and workers + if self.workers > 4: + csv_files = [file for file in csv_files + if self.count_csv_rows(file) <= 10000] + + with concurrent.futures.ProcessPoolExecutor(max_workers=self.workers) as executor: + futures = { + executor.submit(self.process_file, csv_file): csv_file + for csv_file in csv_files + } + + for future in tqdm(concurrent.futures.as_completed(futures), + total=len(futures), + desc="Overall Progress"): try: - meta_editor.merge(g_set, surviving_entity, URIRef(merged_entity)) - except ValueError: - continue - - row['Done'] = 'True' - modified = True + future.result() + except Exception as e: + print(f"Error processing file: {e}") - # Salviamo le modifiche una sola volta alla fine - if modified: - meta_editor.save(g_set) - - write_csv(csv_file, data) - return csv_file - -def count_csv_rows(csv_file): - with open(csv_file, 'r', encoding='utf-8') as f: - return sum(1 for _ in f) - 1 # Subtract 1 to exclude the header row def main(): parser = argparse.ArgumentParser(description="Merge entities from CSV files in a folder.") - parser.add_argument('csv_folder', type=str, help="Path to the folder containing CSV files") - parser.add_argument('meta_config', type=str, help="Meta configuration string") - parser.add_argument('resp_agent', type=str, help="Responsible agent string") - parser.add_argument('--entity_types', nargs='+', default=['ra', 'br', 'id'], help="Types of entities to merge (ra, br, id)") - parser.add_argument('--stop_file', type=str, default="stop.out", help="Path to the stop file") - 
parser.add_argument('--workers', type=int, default=4, help="Number of parallel workers") + parser.add_argument('csv_folder', type=str, + help="Path to the folder containing CSV files") + parser.add_argument('meta_config', type=str, + help="Meta configuration string") + parser.add_argument('resp_agent', type=str, + help="Responsible agent string") + parser.add_argument('--entity_types', nargs='+', + default=['ra', 'br', 'id'], + help="Types of entities to merge (ra, br, id)") + parser.add_argument('--stop_file', type=str, + default="stop.out", + help="Path to the stop file") + parser.add_argument('--workers', type=int, + default=4, + help="Number of parallel workers") + args = parser.parse_args() - if os.path.exists(args.stop_file): - os.remove(args.stop_file) - - csv_files = [os.path.join(args.csv_folder, file) for file in os.listdir(args.csv_folder) if file.endswith('.csv')] - - # Filtrare i file CSV in base al numero di righe e al numero di workers - if args.workers > 4: - csv_files = [file for file in csv_files if count_csv_rows(file) <= 10000] - - with concurrent.futures.ProcessPoolExecutor(max_workers=args.workers) as executor: - futures = {executor.submit(process_file, csv_file, args.meta_config, args.resp_agent, args.entity_types, args.stop_file): csv_file for csv_file in csv_files} - - for future in tqdm(concurrent.futures.as_completed(futures), total=len(futures), desc="Overall Progress"): - try: - future.result() - except Exception as e: - print(f"Error processing file: {e}") + merger = EntityMerger( + meta_config=args.meta_config, + resp_agent=args.resp_agent, + entity_types=args.entity_types, + stop_file_path=args.stop_file, + workers=args.workers + ) + + merger.process_folder(args.csv_folder) if __name__ == "__main__": diff --git a/poetry.lock b/poetry.lock index 6ca48c57..99a3904b 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1367,13 +1367,13 @@ zstandard = ">=0.21.0,<0.22.0" [[package]] name = "oc-ocdm" -version = "9.1.2" +version = "9.2.0" description = "Object mapping library for manipulating RDF graphs that are compliant with the OpenCitations datamodel." 
optional = false python-versions = "<3.14,>=3.8" files = [ - {file = "oc_ocdm-9.1.2-py3-none-any.whl", hash = "sha256:2f6c3efca0dddc4eb4b7fd31a7990cc9cdfe4fb9ece4a767e17721d403d772ba"}, - {file = "oc_ocdm-9.1.2.tar.gz", hash = "sha256:df0787c6fcaae5eac17cbd0793761da2c02021b8ba00ddd9ff66513f3d5c6916"}, + {file = "oc_ocdm-9.2.0-py3-none-any.whl", hash = "sha256:bcffb65f613a950dc0649cc6a2cb81e2ac3132432bf7a0eb70c26d6c3c2bf6cf"}, + {file = "oc_ocdm-9.2.0.tar.gz", hash = "sha256:6f99b0601f51da856476f0278957051a22d6b88bcb8e81f98c4eac80c7f9d5a3"}, ] [package.dependencies] @@ -2707,4 +2707,4 @@ cffi = ["cffi (>=1.11)"] [metadata] lock-version = "2.0" python-versions = ">=3.10,<3.14" -content-hash = "312daf6c45aaf115e4ffdf12adf66e6739e3e19164a35485bec2c764d060dccf" +content-hash = "cf3d4482a09535ee0fea7914eef18021ef5a295f278bb66ad5b7f6bcd02b1d18" diff --git a/pyproject.toml b/pyproject.toml index b088a4f1..192df674 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -27,7 +27,7 @@ oc-ds-converter = "^1.0.4" ijson = "^3.2.3" internetarchive = "^3.7.0" zenodopy = "^0.3.0" -oc-ocdm = "^9.1.2" +oc-ocdm = "9.2.0" retrying = "^1.3.4" orjson = "^3.10.7" rdflib-ocdm = "0.3.11" diff --git a/test/entity_merger_test.py b/test/entity_merger_test.py new file mode 100644 index 00000000..952f2a00 --- /dev/null +++ b/test/entity_merger_test.py @@ -0,0 +1,1050 @@ +import csv +import json +import os +import re +import unittest +from shutil import rmtree + +import redis +from oc_meta.run.merge.duplicated_entities_simultaneously import EntityMerger +from oc_ocdm.counter_handler.redis_counter_handler import RedisCounterHandler +from oc_ocdm.graph import GraphSet +from oc_ocdm.prov.prov_set import ProvSet +from oc_ocdm.storer import Storer +from rdflib import XSD, Graph, Literal, URIRef +from SPARQLWrapper import POST, SPARQLWrapper + +BASE = os.path.join('test', 'merger') +OUTPUT = os.path.join(BASE, 'output/') +META_CONFIG = os.path.join('test', 'merger', 'meta_config.yaml') +SERVER = 'http://127.0.0.1:8805/sparql' + +# Redis configuration +REDIS_HOST = 'localhost' +REDIS_PORT = 6379 +REDIS_DB = 5 + +def reset_triplestore(): + """Reset the test triplestore graphs""" + endpoint = SPARQLWrapper(SERVER) + for graph in [ + 'https://w3id.org/oc/meta/br/', + 'https://w3id.org/oc/meta/ra/', + 'https://w3id.org/oc/meta/re/', + 'https://w3id.org/oc/meta/id/', + 'https://w3id.org/oc/meta/ar/' + ]: + endpoint.setQuery(f'CLEAR GRAPH <{graph}>') + endpoint.setMethod(POST) + endpoint.query() + +def reset_redis_counters(): + redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB) + redis_client.flushdb() + +def get_counter_handler(): + return RedisCounterHandler(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB) + +class TestEntityMerger(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.counter_handler = get_counter_handler() + + def setUp(self): + # Reset environment + if os.path.exists(OUTPUT): + rmtree(OUTPUT) + os.makedirs(os.path.join(BASE, 'csv'), exist_ok=True) + reset_triplestore() + reset_redis_counters() + + # Initialize test data + self.setup_test_data() + + # Create merger instance + self.merger = EntityMerger( + meta_config=META_CONFIG, + resp_agent='https://orcid.org/0000-0002-8420-0696', + entity_types=['ra', 'br', 'id'], + stop_file_path='stop.out', + workers=4 + ) + + def tearDown(self): + if os.path.exists(os.path.join(BASE, 'csv')): + rmtree(os.path.join(BASE, 'csv')) + if os.path.exists(OUTPUT): + rmtree(OUTPUT) + if os.path.exists('stop.out'): + os.remove('stop.out') + 
reset_triplestore() + + def setup_test_data(self): + """Create initial test data in triplestore""" + # Create a GraphSet for test data + g_set = GraphSet("https://w3id.org/oc/meta/", supplier_prefix="060", custom_counter_handler=self.counter_handler) + + # Create first author entity with specific ID + author1 = g_set.add_ra(resp_agent="https://orcid.org/0000-0002-8420-0696", + res=URIRef("https://w3id.org/oc/meta/ra/0601")) + author1.has_name("John Smith") + + # Create ORCID identifier for author1 with specific ID + orcid_id = g_set.add_id(resp_agent="https://orcid.org/0000-0002-8420-0696", + res=URIRef("https://w3id.org/oc/meta/id/0601")) + orcid_id.create_orcid("0000-0001-1234-5678") + author1.has_identifier(orcid_id) + + # Create second author entity with specific ID + author2 = g_set.add_ra(resp_agent="https://orcid.org/0000-0002-8420-0696", + res=URIRef("https://w3id.org/oc/meta/ra/0602")) + author2.has_name("J. Smith") + + # Create VIAF identifier for author2 with specific ID + viaf_id = g_set.add_id(resp_agent="https://orcid.org/0000-0002-8420-0696", + res=URIRef("https://w3id.org/oc/meta/id/0602")) + viaf_id.create_viaf("12345678") + author2.has_identifier(viaf_id) + + # Create a publication with specific ID + pub = g_set.add_br(resp_agent="https://orcid.org/0000-0002-8420-0696", + res=URIRef("https://w3id.org/oc/meta/br/0601")) + pub.has_title("Test Publication") + pub.has_pub_date("2024-01-01") + + # Create role for first author with specific ID + role1 = g_set.add_ar(resp_agent="https://orcid.org/0000-0002-8420-0696", + res=URIRef("https://w3id.org/oc/meta/ar/0601")) + role1.create_author() + role1.is_held_by(author1) + pub.has_contributor(role1) + + # Create role for second author with specific ID + role2 = g_set.add_ar(resp_agent="https://orcid.org/0000-0002-8420-0696", + res=URIRef("https://w3id.org/oc/meta/ar/0602")) + role2.create_author() + role2.is_held_by(author2) + pub.has_contributor(role2) + + prov = ProvSet(g_set, "https://w3id.org/oc/meta/", wanted_label=False, custom_counter_handler=self.counter_handler) + prov.generate_provenance() + + rdf_output = os.path.join(OUTPUT, 'rdf') + os.sep + + res_storer = Storer(abstract_set=g_set, dir_split=10000, + n_file_item=1000, output_format='json-ld', + zip_output=False) + prov_storer = Storer(abstract_set=prov, dir_split=10000, + n_file_item=1000, output_format='json-ld', + zip_output=False) + + res_storer.store_all(base_dir=rdf_output, base_iri="https://w3id.org/oc/meta/") + prov_storer.store_all(base_dir=rdf_output, base_iri="https://w3id.org/oc/meta/") + res_storer.upload_all(triplestore_url=SERVER, base_dir=rdf_output, batch_size=10, save_queries=False) + + # Create CSV file for merger + merge_data = [{ + 'surviving_entity': str(author1.res), + 'merged_entities': str(author2.res), + 'Done': 'False' + }] + self.write_csv('merge_test.csv', merge_data) + + def write_csv(self, filename: str, data: list): + filepath = os.path.join(BASE, 'csv', filename) + with open(filepath, 'w', newline='', encoding='utf-8') as f: + writer = csv.DictWriter(f, fieldnames=data[0].keys()) + writer.writeheader() + writer.writerows(data) + + def check_sparql_query_content(self, query: str, expected_triples: dict): + """ + Check if a SPARQL query contains expected triples in DELETE and INSERT sections. 
+ + Args: + query (str): The SPARQL update query string + expected_triples (dict): Dictionary with 'delete' and 'insert' keys containing + lists of triple patterns to check for + """ + # Split query into DELETE and INSERT sections + delete_match = re.search(r'DELETE DATA \{ GRAPH.*?\{(.*?)\}.*?\}', query, re.DOTALL) + insert_match = re.search(r'INSERT DATA \{ GRAPH.*?\{(.*?)\}.*?\}', query, re.DOTALL) + + delete_section = delete_match.group(1) if delete_match else "" + insert_section = insert_match.group(1) if insert_match else "" + # Check DELETE patterns + if 'delete' in expected_triples: + for triple in expected_triples['delete']: + self.assertIn(triple, delete_section.strip(), + f"Expected triple not found in DELETE section: {triple}") + + # Check INSERT patterns + if 'insert' in expected_triples: + for triple in expected_triples['insert']: + self.assertIn(triple, insert_section.strip(), + f"Expected triple not found in INSERT section: {triple}") + + def test_get_entity_type(self): + """Test the static method get_entity_type""" + test_cases = [ + ('https://w3id.org/oc/meta/ra/06107', 'ra'), + ('https://w3id.org/oc/meta/br/06101', 'br'), + ('https://w3id.org/oc/meta/id/06105', 'id'), + ('https://example.com/invalid/url', None), + ('', None) + ] + + for url, expected in test_cases: + with self.subTest(url=url): + self.assertEqual( + EntityMerger.get_entity_type(url), + expected + ) + + def test_read_write_csv(self): + """Test CSV read and write operations""" + test_data = [{ + 'surviving_entity': 'https://w3id.org/oc/meta/ra/06107', + 'merged_entities': 'https://w3id.org/oc/meta/ra/06205', + 'Done': 'False' + }] + + # Write test data + test_file = os.path.join(BASE, 'csv', 'test.csv') + EntityMerger.write_csv(test_file, test_data) + + # Read back and verify + read_data = EntityMerger.read_csv(test_file) + self.assertEqual(test_data, read_data) + + def test_count_csv_rows(self): + """Test CSV row counting""" + # Test with empty file + empty_file = os.path.join(BASE, 'csv', 'empty.csv') + with open(empty_file, 'w', newline='', encoding='utf-8') as f: + writer = csv.DictWriter(f, fieldnames=['col1', 'col2']) + writer.writeheader() + self.assertEqual(EntityMerger.count_csv_rows(empty_file), 0) + + # Test with multiple rows + test_file = os.path.join(BASE, 'input', '0.csv') + row_count = EntityMerger.count_csv_rows(test_file) + self.assertEqual(row_count, 1) + + def test_process_file_with_stop_file(self): + """Test that processing stops when stop file is present""" + # Create stop file + with open(self.merger.stop_file_path, 'w') as f: + f.write('') + + # Process test file + test_file = os.path.join(BASE, 'csv', 'merge_test.csv') + result = self.merger.process_file(test_file) + + # Verify the file wasn't processed (Done should still be False) + data = EntityMerger.read_csv(test_file) + self.assertEqual(data[0]['Done'], 'False') + + def test_process_folder(self): + """Test processing multiple files in a folder""" + csv_folder = os.path.join(BASE, 'csv') + self.merger.process_folder(csv_folder) + + # Verify all files were processed + for filename in ['merge_test.csv']: + with self.subTest(file=filename): + data = EntityMerger.read_csv(os.path.join(csv_folder, filename)) + self.assertEqual(data[0]['Done'], 'True') + + def test_process_folder_with_worker_limit(self): + """Test processing folder with worker count > 4""" + self.merger.workers = 5 + csv_folder = os.path.join(BASE, 'csv') + + # Create a large file + large_data = [{ + 'surviving_entity': f'https://w3id.org/oc/meta/ra/0610{i}', + 
'merged_entities': f'https://w3id.org/oc/meta/ra/0620{i}', + 'Done': 'False' + } for i in range(15000)] # Create more than 10000 rows + self.write_csv('large.csv', large_data) + + self.merger.process_folder(csv_folder) + + # Verify only small files were processed + large_file_data = EntityMerger.read_csv(os.path.join(csv_folder, 'large.csv')) + self.assertEqual(large_file_data[0]['Done'], 'False') # Large file should be skipped + + small_file_data = EntityMerger.read_csv(os.path.join(csv_folder, 'merge_test.csv')) + self.assertEqual(small_file_data[0]['Done'], 'True') # Small file should be processed + + def test_merge_authors_with_real_data(self): + """Test merging two author entities with real data""" + # Process the merge + csv_folder = os.path.join(BASE, 'csv') + self.merger.process_folder(csv_folder) + + # Verify files structure + rdf_path = os.path.join(OUTPUT) + self.assertTrue(os.path.exists(os.path.join(rdf_path, 'rdf', 'ra', '060', '10000', '1000'))) + self.assertTrue(os.path.exists(os.path.join(rdf_path, 'rdf', 'ra', '060', '10000', '1000', 'prov'))) + + # Load and verify data files + ra_file = os.path.join(rdf_path, 'rdf', 'ra', '060', '10000', '1000.json') + with open(ra_file) as f: + data = json.load(f) + for graph in data: + for entity in graph.get('@graph', []): + if entity['@id'] == 'https://w3id.org/oc/meta/ra/0601': + # Check has both identifiers + identifiers = {id_obj['@id'] for id_obj in entity['http://purl.org/spar/datacite/hasIdentifier']} + self.assertEqual(len(identifiers), 2) + self.assertIn('https://w3id.org/oc/meta/id/0601', identifiers) + self.assertIn('https://w3id.org/oc/meta/id/0602', identifiers) + + # Check name + self.assertEqual(entity['http://xmlns.com/foaf/0.1/name'][0]['@value'], 'J. Smith') + + # Check merged entity no longer exists + if entity['@id'] == 'https://w3id.org/oc/meta/ra/0602': + self.fail('Merged entity should not exist') + + # Check role reassignment + ar_file = os.path.join(rdf_path, 'rdf', 'ar', '060', '10000', '1000.json') + with open(ar_file) as f: + data = json.load(f) + for graph in data: + for entity in graph.get('@graph', []): + agent = entity['http://purl.org/spar/pro/isHeldBy'][0]['@id'] + self.assertEqual(agent, 'https://w3id.org/oc/meta/ra/0601', + 'All roles should point to surviving entity') + + # Check provenance + prov_file = os.path.join(rdf_path, 'rdf', 'ra', '060', '10000', '1000', 'prov', 'se.json') + found_merge_prov = False + expected_triples = { + 'delete': [ + ' "John Smith"' + ], + 'insert': [ + ' ', + ' "J. 
Smith"' + ] + } + with open(prov_file) as f: + data = json.load(f) + for graph in data: + for entity in graph.get('@graph', []): + if '/prov/se/' in entity['@id'] and 'merge' in entity.get('http://purl.org/dc/terms/description', [{}])[0].get('@value', '').lower(): + found_merge_prov = True + + # Check provenance fields + self.assertIn('http://www.w3.org/ns/prov#generatedAtTime', entity) + self.assertIn('http://www.w3.org/ns/prov#wasAttributedTo', entity) + self.assertIn('https://w3id.org/oc/ontology/hasUpdateQuery', entity) + + # Get actual query and normalize both expected and actual + actual_query = entity['https://w3id.org/oc/ontology/hasUpdateQuery'][0]['@value'] + self.check_sparql_query_content(actual_query, expected_triples) + + self.assertTrue(found_merge_prov, "No merge provenance found") + + + def test_merge_with_invalid_entity_type(self): + """Test merging with an invalid entity type""" + # Create test data with invalid entity type + invalid_data = [{ + 'surviving_entity': 'https://w3id.org/oc/meta/invalid/0601', + 'merged_entities': 'https://w3id.org/oc/meta/invalid/0602', + 'Done': 'False' + }] + test_file = os.path.join(BASE, 'csv', 'invalid_type.csv') + self.write_csv('invalid_type.csv', invalid_data) + self.merger.process_file(test_file) + data = EntityMerger.read_csv(test_file) + self.assertEqual(data[0]['Done'], 'False') + + def test_merge_with_nonexistent_entities(self): + """Test merging when one or both entities don't exist""" + # Create test data with nonexistent entities + nonexistent_data = [{ + 'surviving_entity': 'https://w3id.org/oc/meta/ra/9999', + 'merged_entities': 'https://w3id.org/oc/meta/ra/9998', + 'Done': 'False' + }] + test_file = os.path.join(BASE, 'csv', 'nonexistent.csv') + self.write_csv('nonexistent.csv', nonexistent_data) + self.merger.process_file(test_file) + data = EntityMerger.read_csv(test_file) + self.assertEqual(data[0]['Done'], 'True') + + def test_merge_multiple_entities(self): + """Test merging multiple entities into one surviving entity""" + # Create additional test entities + g_set = GraphSet("https://w3id.org/oc/meta/", supplier_prefix="060", + custom_counter_handler=self.counter_handler) + + # Create additional authors + author3 = g_set.add_ra(resp_agent="https://orcid.org/0000-0002-8420-0696", + res=URIRef("https://w3id.org/oc/meta/ra/0603")) + author3.has_name("John A. 
Smith") + + author4 = g_set.add_ra(resp_agent="https://orcid.org/0000-0002-8420-0696", + res=URIRef("https://w3id.org/oc/meta/ra/0604")) + author4.has_name("J A Smith") + + # Add identifiers + viaf_id = g_set.add_id(resp_agent="https://orcid.org/0000-0002-8420-0696", + res=URIRef("https://w3id.org/oc/meta/id/0603")) + viaf_id.create_viaf("123456789") + author3.has_identifier(viaf_id) + + researcher_id = g_set.add_id(resp_agent="https://orcid.org/0000-0002-8420-0696", + res=URIRef("https://w3id.org/oc/meta/id/0604")) + researcher_id.create_wikidata("Q12345") + author4.has_identifier(researcher_id) + + # Create publications and roles + pub2 = g_set.add_br(resp_agent="https://orcid.org/0000-0002-8420-0696", + res=URIRef("https://w3id.org/oc/meta/br/0602")) + pub2.has_title("Another Test Publication") + + role3 = g_set.add_ar(resp_agent="https://orcid.org/0000-0002-8420-0696", + res=URIRef("https://w3id.org/oc/meta/ar/0603")) + role3.create_author() + role3.is_held_by(author3) + pub2.has_contributor(role3) + + role4 = g_set.add_ar(resp_agent="https://orcid.org/0000-0002-8420-0696", + res=URIRef("https://w3id.org/oc/meta/ar/0604")) + role4.create_author() + role4.is_held_by(author4) + pub2.has_contributor(role4) + + # Store and upload + prov = ProvSet(g_set, "https://w3id.org/oc/meta/", wanted_label=False, + custom_counter_handler=self.counter_handler) + prov.generate_provenance() + + rdf_output = os.path.join(OUTPUT, 'rdf') + os.sep + + res_storer = Storer(abstract_set=g_set, dir_split=10000, n_file_item=1000, + output_format='json-ld', zip_output=False) + prov_storer = Storer(abstract_set=prov, dir_split=10000, n_file_item=1000, + output_format='json-ld', zip_output=False) + + res_storer.store_all(base_dir=rdf_output, base_iri="https://w3id.org/oc/meta/") + prov_storer.store_all(base_dir=rdf_output, base_iri="https://w3id.org/oc/meta/") + res_storer.upload_all(triplestore_url=SERVER, base_dir=rdf_output, + batch_size=10, save_queries=False) + + # Create merge data + merge_data = [{ + 'surviving_entity': 'https://w3id.org/oc/meta/ra/0601', + 'merged_entities': 'https://w3id.org/oc/meta/ra/0602; https://w3id.org/oc/meta/ra/0603; https://w3id.org/oc/meta/ra/0604', + 'Done': 'False' + }] + test_file = os.path.join(BASE, 'csv', 'multiple_merge.csv') + self.write_csv('multiple_merge.csv', merge_data) + + # Process the merge + csv_folder = os.path.join(BASE, 'csv') + self.merger.process_folder(csv_folder) + + # Verify the results by checking the output files + rdf_path = os.path.join(OUTPUT, 'rdf') + + # 1. 
Check researcher file for surviving entity and merged data + ra_file = os.path.join(rdf_path, 'ra', '060', '10000', '1000.json') + with open(ra_file) as f: + data = json.load(f) + for graph in data: + for entity in graph.get('@graph', []): + if entity['@id'] == 'https://w3id.org/oc/meta/ra/0601': + # Check has all identifiers + identifiers = {id_obj['@id'] for id_obj in entity['http://purl.org/spar/datacite/hasIdentifier']} + self.assertEqual(len(identifiers), 4) + expected_ids = { + 'https://w3id.org/oc/meta/id/0601', + 'https://w3id.org/oc/meta/id/0602', + 'https://w3id.org/oc/meta/id/0603', + 'https://w3id.org/oc/meta/id/0604' + } + self.assertEqual(identifiers, expected_ids) + + # Check name (should take the last merged name) + self.assertEqual(entity['http://xmlns.com/foaf/0.1/name'][0]['@value'], 'J A Smith') + + # Check merged entities no longer exist + self.assertNotIn(entity['@id'], [ + 'https://w3id.org/oc/meta/ra/0602', + 'https://w3id.org/oc/meta/ra/0603', + 'https://w3id.org/oc/meta/ra/0604' + ]) + + # 2. Check role assignments in agent role file + ar_file = os.path.join(rdf_path, 'ar', '060', '10000', '1000.json') + with open(ar_file) as f: + data = json.load(f) + for graph in data: + for entity in graph.get('@graph', []): + if 'http://purl.org/spar/pro/isHeldBy' in entity: + agent = entity['http://purl.org/spar/pro/isHeldBy'][0]['@id'] + self.assertEqual(agent, 'https://w3id.org/oc/meta/ra/0601', + 'All roles should point to surviving entity') + + # 3. Check provenance + prov_file = os.path.join(rdf_path, 'ra', '060', '10000', '1000', 'prov', 'se.json') + with open(prov_file) as f: + data = json.load(f) + + # Get all provenance snapshots for surviving entity + surviving_snapshots = [] + for graph in data: + if graph['@id'] == 'https://w3id.org/oc/meta/ra/0601/prov/': + for entity in graph.get('@graph', []): + # Skip creation snapshot + if 'created' not in entity.get('http://purl.org/dc/terms/description', [{}])[0].get('@value', '').lower(): + surviving_snapshots.append(entity) + + # Should have 2 merge snapshots (one partial, one final) + self.assertEqual(len(surviving_snapshots), 2, + "Should have exactly 2 merge snapshots") + + # Verify partial merge (0601 with 0602) + partial_merge = next(s for s in surviving_snapshots + if '0602' in s['http://purl.org/dc/terms/description'][0]['@value'] + and '0603' not in s['http://purl.org/dc/terms/description'][0]['@value']) + + # Check partial merge metadata + self.assertIn('http://www.w3.org/ns/prov#generatedAtTime', partial_merge) + self.assertIn('http://www.w3.org/ns/prov#wasAttributedTo', partial_merge) + self.assertEqual( + partial_merge['http://www.w3.org/ns/prov#wasAttributedTo'][0]['@id'], + 'https://orcid.org/0000-0002-8420-0696' + ) + + # Check partial merge query content + partial_query = partial_merge['https://w3id.org/oc/ontology/hasUpdateQuery'][0]['@value'] + expected_partial = { + 'delete': [ + ' "John Smith"' + ], + 'insert': [ + ' ', + ' "J. 
Smith"' + ] + } + self.check_sparql_query_content(partial_query, expected_partial) + + # Verify final merge (0601 with 0602, 0603, 0604) + final_merge = next(s for s in surviving_snapshots + if '0602' in s['http://purl.org/dc/terms/description'][0]['@value'] + and '0603' in s['http://purl.org/dc/terms/description'][0]['@value'] + and '0604' in s['http://purl.org/dc/terms/description'][0]['@value']) + + # Check final merge metadata + self.assertIn('http://www.w3.org/ns/prov#generatedAtTime', final_merge) + self.assertIn('http://www.w3.org/ns/prov#wasAttributedTo', final_merge) + self.assertEqual( + final_merge['http://www.w3.org/ns/prov#wasAttributedTo'][0]['@id'], + 'https://orcid.org/0000-0002-8420-0696' + ) + + # Check final merge query content + final_query = final_merge['https://w3id.org/oc/ontology/hasUpdateQuery'][0]['@value'] + expected_final = { + 'delete': [ + ' "John Smith"' + ], + 'insert': [ + ' ', + ' ', + ' "J A Smith"', + ' ' + ] + } + self.check_sparql_query_content(final_query, expected_final) + + # Verify deletion snapshots exist for merged entities + merged_ids = ['0602', '0603', '0604'] + for merged_id in merged_ids: + merged_snapshots = [] + for graph in data: + if graph['@id'] == f'https://w3id.org/oc/meta/ra/{merged_id}/prov/': + for entity in graph.get('@graph', []): + if 'deleted' in entity.get('http://purl.org/dc/terms/description', [{}])[0].get('@value', '').lower(): + merged_snapshots.append(entity) + + self.assertGreater(len(merged_snapshots), 0, + f"No deletion snapshot found for ra/{merged_id}") + + # Verify deletion queries + for snapshot in merged_snapshots: + self.assertIn('https://w3id.org/oc/ontology/hasUpdateQuery', snapshot) + delete_query = snapshot['https://w3id.org/oc/ontology/hasUpdateQuery'][0]['@value'] + self.assertIn(f'', delete_query) + self.assertIn('DELETE DATA', delete_query) + + def test_merge_with_conflicting_data(self): + """Test merging entities with conflicting information""" + # Create test entities with conflicting data + g_set = GraphSet("https://w3id.org/oc/meta/", supplier_prefix="060", + custom_counter_handler=self.counter_handler) + + # Create conflicting authors + author5 = g_set.add_ra(resp_agent="https://orcid.org/0000-0002-8420-0696", + res=URIRef("https://w3id.org/oc/meta/ra/0605")) + author5.has_name("John Smith") + author5.has_given_name("John") + author5.has_family_name("Smith") + + author6 = g_set.add_ra(resp_agent="https://orcid.org/0000-0002-8420-0696", + res=URIRef("https://w3id.org/oc/meta/ra/0606")) + author6.has_name("Johnny Smith") + author6.has_given_name("Johnny") + author6.has_family_name("Smith") + + # Add same identifier to both (which should be impossible in real data) + same_orcid = g_set.add_id(resp_agent="https://orcid.org/0000-0002-8420-0696", + res=URIRef("https://w3id.org/oc/meta/id/0605")) + same_orcid.create_orcid("0000-0001-9999-9999") + author5.has_identifier(same_orcid) + + same_orcid2 = g_set.add_id(resp_agent="https://orcid.org/0000-0002-8420-0696", + res=URIRef("https://w3id.org/oc/meta/id/0606")) + same_orcid2.create_orcid("0000-0001-9999-9999") + author6.has_identifier(same_orcid2) + + # Store and upload + prov = ProvSet(g_set, "https://w3id.org/oc/meta/", wanted_label=False, + custom_counter_handler=self.counter_handler) + prov.generate_provenance() + + rdf_output = os.path.join(OUTPUT, 'rdf') + os.sep + + res_storer = Storer(abstract_set=g_set, dir_split=10000, n_file_item=1000, + output_format='json-ld', zip_output=False) + prov_storer = Storer(abstract_set=prov, dir_split=10000, 
n_file_item=1000, + output_format='json-ld', zip_output=False) + + res_storer.store_all(base_dir=rdf_output, base_iri="https://w3id.org/oc/meta/") + prov_storer.store_all(base_dir=rdf_output, base_iri="https://w3id.org/oc/meta/") + res_storer.upload_all(triplestore_url=SERVER, base_dir=rdf_output, + batch_size=10, save_queries=False) + + # Create merge data + merge_data = [{ + 'surviving_entity': 'https://w3id.org/oc/meta/ra/0605', + 'merged_entities': 'https://w3id.org/oc/meta/ra/0606', + 'Done': 'False' + }] + test_file = os.path.join(BASE, 'csv', 'conflicting_merge.csv') + self.write_csv('conflicting_merge.csv', merge_data) + + # Process the merge + csv_folder = os.path.join(BASE, 'csv') + self.merger.process_folder(csv_folder) + + # Verify the results by checking the output files + rdf_path = os.path.join(OUTPUT, 'rdf') + + # 1. Check researcher file for surviving entity and merged data + ra_file = os.path.join(rdf_path, 'ra', '060', '10000', '1000.json') + with open(ra_file) as f: + data = json.load(f) + for graph in data: + for entity in graph.get('@graph', []): + if entity['@id'] == 'https://w3id.org/oc/meta/ra/0605': + # Check identifiers - should only keep one instance + identifiers = {id_obj['@id'] for id_obj in entity.get('http://purl.org/spar/datacite/hasIdentifier', [])} + self.assertEqual(len(identifiers), 1) + self.assertEqual(identifiers, {'https://w3id.org/oc/meta/id/0605'}) + + # Check name was preserved + self.assertEqual(entity['http://xmlns.com/foaf/0.1/name'][0]['@value'], 'Johnny Smith') + self.assertEqual(entity['http://xmlns.com/foaf/0.1/givenName'][0]['@value'], 'Johnny') + self.assertEqual(entity['http://xmlns.com/foaf/0.1/familyName'][0]['@value'], 'Smith') + + # Check merged entity does not exist in output + self.assertNotEqual(entity['@id'], 'https://w3id.org/oc/meta/ra/0606') + + # 2. 
Check provenance + prov_file = os.path.join(rdf_path, 'ra', '060', '10000', '1000', 'prov', 'se.json') + with open(prov_file) as f: + data = json.load(f) + + # Find merge snapshot + merge_snapshot = None + for graph in data: + if graph['@id'] == 'https://w3id.org/oc/meta/ra/0605/prov/': + for entity in graph.get('@graph', []): + if 'merge' in entity.get('http://purl.org/dc/terms/description', [{}])[0].get('@value', '').lower(): + merge_snapshot = entity + break + + self.assertIsNotNone(merge_snapshot, "No merge snapshot found") + + # Verify merge metadata + self.assertIn('http://www.w3.org/ns/prov#generatedAtTime', merge_snapshot) + self.assertIn('http://www.w3.org/ns/prov#wasAttributedTo', merge_snapshot) + + # Check the merge query - should not duplicate the conflicting ORCID + merge_query = merge_snapshot['https://w3id.org/oc/ontology/hasUpdateQuery'][0]['@value'] + expected_triples = { + 'delete': [ + ' "John Smith"', + ' "John"', + ], + 'insert': [ + ' "Johnny Smith"', + ' "Johnny"', + ] + } + self.check_sparql_query_content(merge_query, expected_triples) + + # Verify deletion snapshot exists for merged entity + delete_snapshot = None + for graph in data: + if graph['@id'] == 'https://w3id.org/oc/meta/ra/0606/prov/': + for entity in graph.get('@graph', []): + if 'deleted' in entity.get('http://purl.org/dc/terms/description', [{}])[0].get('@value', '').lower(): + delete_snapshot = entity + break + + self.assertIsNotNone(delete_snapshot, "No deletion snapshot found for merged entity") + + # Verify deletion query + delete_query = delete_snapshot['https://w3id.org/oc/ontology/hasUpdateQuery'][0]['@value'] + self.assertIn('DELETE DATA', delete_query) + self.assertIn('', delete_query) + + def test_merge_bibliographic_resources(self): + """Test merging two bibliographic resource entities""" + g_set = GraphSet("https://w3id.org/oc/meta/", supplier_prefix="060", + custom_counter_handler=self.counter_handler) + + # Create first publication with some metadata + pub1 = g_set.add_br(resp_agent="https://orcid.org/0000-0002-8420-0696", + res=URIRef("https://w3id.org/oc/meta/br/0603")) + pub1.has_title("Data Integration Methods") + pub1.has_subtitle("A Comprehensive Review") + pub1.has_pub_date("2023") + + # Create issue for pub1 + issue = g_set.add_br(resp_agent="https://orcid.org/0000-0002-8420-0696", + res=URIRef("https://w3id.org/oc/meta/br/0605")) + issue.create_issue() + issue.has_number("4") + pub1.is_part_of(issue) + + # Create resource embodiment for pub1 + re1 = g_set.add_re(resp_agent="https://orcid.org/0000-0002-8420-0696", + res=URIRef("https://w3id.org/oc/meta/re/0603")) + re1.has_starting_page("1") + re1.has_ending_page("20") + pub1.has_format(re1) + + # Add DOI identifier for pub1 + doi_id = g_set.add_id(resp_agent="https://orcid.org/0000-0002-8420-0696", + res=URIRef("https://w3id.org/oc/meta/id/0603")) + doi_id.create_doi("10.1000/example.doi.1") + pub1.has_identifier(doi_id) + + # Create second publication with complementary metadata + pub2 = g_set.add_br(resp_agent="https://orcid.org/0000-0002-8420-0696", + res=URIRef("https://w3id.org/oc/meta/br/0604")) + pub2.has_title("Data Integration Methods") # Same title + pub2.has_pub_date("2023") # Same year + + # Create volume for pub2 + volume = g_set.add_br(resp_agent="https://orcid.org/0000-0002-8420-0696", + res=URIRef("https://w3id.org/oc/meta/br/0606")) + volume.create_volume() + volume.has_number("15") + pub2.is_part_of(volume) + + # Create resource embodiment for pub2 + re2 = 
g_set.add_re(resp_agent="https://orcid.org/0000-0002-8420-0696", + res=URIRef("https://w3id.org/oc/meta/re/0604")) + re2.has_starting_page("100") + re2.has_ending_page("120") + pub2.has_format(re2) + + # Add ISBN identifier for pub2 + isbn_id = g_set.add_id(resp_agent="https://orcid.org/0000-0002-8420-0696", + res=URIRef("https://w3id.org/oc/meta/id/0604")) + isbn_id.create_isbn("978-0-123456-47-2") + pub2.has_identifier(isbn_id) + + # Create authors and roles + author1 = g_set.add_ra(resp_agent="https://orcid.org/0000-0002-8420-0696", + res=URIRef("https://w3id.org/oc/meta/ra/0605")) + author1.has_name("Jane Doe") + + author2 = g_set.add_ra(resp_agent="https://orcid.org/0000-0002-8420-0696", + res=URIRef("https://w3id.org/oc/meta/ra/0606")) + author2.has_name("John Smith") + + # Add roles for pub1 + role1 = g_set.add_ar(resp_agent="https://orcid.org/0000-0002-8420-0696", + res=URIRef("https://w3id.org/oc/meta/ar/0605")) + role1.create_author() + role1.is_held_by(author1) + pub1.has_contributor(role1) + + # Add roles for pub2 + role2 = g_set.add_ar(resp_agent="https://orcid.org/0000-0002-8420-0696", + res=URIRef("https://w3id.org/oc/meta/ar/0606")) + role2.create_author() + role2.is_held_by(author2) + pub2.has_contributor(role2) + + # Store and upload + prov = ProvSet(g_set, "https://w3id.org/oc/meta/", wanted_label=False, + custom_counter_handler=self.counter_handler) + prov.generate_provenance() + + rdf_output = os.path.join(OUTPUT, 'rdf') + os.sep + + res_storer = Storer(abstract_set=g_set, dir_split=10000, n_file_item=1000, + output_format='json-ld', zip_output=False) + prov_storer = Storer(abstract_set=prov, dir_split=10000, n_file_item=1000, + output_format='json-ld', zip_output=False) + + res_storer.store_all(base_dir=rdf_output, base_iri="https://w3id.org/oc/meta/") + prov_storer.store_all(base_dir=rdf_output, base_iri="https://w3id.org/oc/meta/") + res_storer.upload_all(triplestore_url=SERVER, base_dir=rdf_output, + batch_size=10, save_queries=False) + + # Create merge data + merge_data = [{ + 'surviving_entity': 'https://w3id.org/oc/meta/br/0603', + 'merged_entities': 'https://w3id.org/oc/meta/br/0604', + 'Done': 'False' + }] + test_file = os.path.join(BASE, 'csv', 'br_merge.csv') + self.write_csv('br_merge.csv', merge_data) + + # Process the merge + csv_folder = os.path.join(BASE, 'csv') + self.merger.process_folder(csv_folder) + + # Verify the results by checking the output files + rdf_path = os.path.join(OUTPUT, 'rdf') + + # 1. 
Check bibliographic resource file + br_file = os.path.join(rdf_path, 'br', '060', '10000', '1000.json') + with open(br_file) as f: + data = json.load(f) + for graph in data: + for entity in graph.get('@graph', []): + if entity['@id'] == 'https://w3id.org/oc/meta/br/0603': + # Check basic metadata + self.assertEqual( + entity['http://purl.org/dc/terms/title'][0]['@value'], + 'Data Integration Methods' + ) + self.assertEqual( + entity['http://purl.org/spar/fabio/hasSubtitle'][0]['@value'], + 'A Comprehensive Review' + ) + self.assertEqual( + entity['http://prismstandard.org/namespaces/basic/2.0/publicationDate'][0]['@value'], + '2023' + ) + + # Check part relationships + parts = {part['@id'] for part in entity['http://purl.org/vocab/frbr/core#partOf']} + self.assertEqual(len(parts), 1) + self.assertIn('https://w3id.org/oc/meta/br/0606', parts) # Volume + + # Check formats (resource embodiments) + formats = {fmt['@id'] for fmt in entity['http://purl.org/vocab/frbr/core#embodiment']} + self.assertEqual(len(formats), 1) + self.assertIn('https://w3id.org/oc/meta/re/0603', formats) + + # Check identifiers + identifiers = {id_obj['@id'] for id_obj in entity['http://purl.org/spar/datacite/hasIdentifier']} + self.assertEqual(len(identifiers), 2) + self.assertIn('https://w3id.org/oc/meta/id/0603', identifiers) + self.assertIn('https://w3id.org/oc/meta/id/0604', identifiers) + + # Check issue metadata + elif entity['@id'] == 'https://w3id.org/oc/meta/br/0605': + self.assertIn('http://purl.org/spar/fabio/JournalIssue', + entity['@type']) + self.assertEqual( + entity['http://purl.org/spar/fabio/hasSequenceIdentifier'][0]['@value'], + '4' + ) + + # Check volume metadata + elif entity['@id'] == 'https://w3id.org/oc/meta/br/0606': + self.assertIn('http://purl.org/spar/fabio/JournalVolume', + entity['@type']) + self.assertEqual( + entity['http://purl.org/spar/fabio/hasSequenceIdentifier'][0]['@value'], + '15' + ) + + # Check merged entity no longer exists + self.assertNotEqual(entity['@id'], 'https://w3id.org/oc/meta/br/0604') + + # 2. Check resource embodiments + re_file = os.path.join(rdf_path, 're', '060', '10000', '1000.json') + with open(re_file) as f: + data = json.load(f) + res_embodiments = {} + for graph in data: + for entity in graph.get('@graph', []): + if entity['@id'] in ['https://w3id.org/oc/meta/re/0603', 'https://w3id.org/oc/meta/re/0604']: + res_embodiments[entity['@id']] = { + 'start': entity['http://prismstandard.org/namespaces/basic/2.0/startingPage'][0]['@value'], + 'end': entity['http://prismstandard.org/namespaces/basic/2.0/endingPage'][0]['@value'] + } + + self.assertEqual(len(res_embodiments), 2) + self.assertEqual(res_embodiments['https://w3id.org/oc/meta/re/0603']['start'], '1') + self.assertEqual(res_embodiments['https://w3id.org/oc/meta/re/0603']['end'], '20') + self.assertEqual(res_embodiments['https://w3id.org/oc/meta/re/0604']['start'], '100') + self.assertEqual(res_embodiments['https://w3id.org/oc/meta/re/0604']['end'], '120') + + # 3. 
Check role assignments + ar_file = os.path.join(rdf_path, 'ar', '060', '10000', '1000.json') + with open(ar_file) as f: + data = json.load(f) + for graph in data: + for entity in graph.get('@graph', []): + if entity['@id'] == 'https://w3id.org/oc/meta/ar/0605': + self.assertIn('http://purl.org/spar/pro/withRole', entity) + self.assertEqual( + entity['http://purl.org/spar/pro/withRole'][0]['@id'], + 'http://purl.org/spar/pro/author' + ) + holder = entity['http://purl.org/spar/pro/isHeldBy'][0]['@id'] + self.assertEqual(holder, 'https://w3id.org/oc/meta/ra/0605') + + # 4. Check provenance + prov_file = os.path.join(rdf_path, 'br', '060', '10000', '1000', 'prov', 'se.json') + with open(prov_file) as f: + data = json.load(f) + + # Find merge snapshot + merge_snapshot = None + for graph in data: + if graph['@id'] == 'https://w3id.org/oc/meta/br/0603/prov/': + for entity in graph.get('@graph', []): + if 'merge' in entity.get('http://purl.org/dc/terms/description', [{}])[0].get('@value', '').lower(): + merge_snapshot = entity + break + + self.assertIsNotNone(merge_snapshot, "No merge snapshot found") + + # Check merge query content + merge_query = merge_snapshot['https://w3id.org/oc/ontology/hasUpdateQuery'][0]['@value'] + expected_triples = { + 'delete': [ + ' ' + ], + 'insert': [ + ' ', + ' ' + ] + } + self.check_sparql_query_content(merge_query, expected_triples) + + # Verify deletion snapshot exists for merged entity + delete_snapshot = None + for graph in data: + if graph['@id'] == 'https://w3id.org/oc/meta/br/0604/prov/': + for entity in graph.get('@graph', []): + if 'deleted' in entity.get('http://purl.org/dc/terms/description', [{}])[0].get('@value', '').lower(): + delete_snapshot = entity + break + + self.assertIsNotNone(delete_snapshot, "No deletion snapshot found for merged entity") + + # Verify deletion query + delete_query = delete_snapshot['https://w3id.org/oc/ontology/hasUpdateQuery'][0]['@value'] + expected_delete_triples = { + 'delete': [ + ' "Data Integration Methods"', + ' "2023"', + ' ', + ' ', + ' ', + ' ', + ' ' + ] + } + self.check_sparql_query_content(delete_query, expected_delete_triples) + + # Check that all related entities have appropriate provenance + for graph in data: + # Check volume provenance + if graph['@id'] == 'https://w3id.org/oc/meta/br/0606/prov/': + found_volume_creation = False + for entity in graph.get('@graph', []): + if 'created' in entity.get('http://purl.org/dc/terms/description', [{}])[0].get('@value', '').lower(): + found_volume_creation = True + self.assertIn('http://www.w3.org/ns/prov#generatedAtTime', entity) + self.assertIn('http://www.w3.org/ns/prov#wasAttributedTo', entity) + self.assertTrue(found_volume_creation, "No creation provenance found for volume") + + # Check resource embodiment provenance + if graph['@id'] == 'https://w3id.org/oc/meta/re/0604/prov/': + found_re_creation = False + for entity in graph.get('@graph', []): + if 'created' in entity.get('http://purl.org/dc/terms/description', [{}])[0].get('@value', '').lower(): + found_re_creation = True + self.assertIn('http://www.w3.org/ns/prov#generatedAtTime', entity) + self.assertIn('http://www.w3.org/ns/prov#wasAttributedTo', entity) + self.assertTrue(found_re_creation, "No creation provenance found for resource embodiment") + + # Verify all metadata inheritance + # We expect the surviving entity to inherit all identifiers + # while maintaining its original metadata (title, subtitle, resource embodiment, issue, contributors) + + # Check if provenance shows correct sequence of 
operations + merge_timestamps = [] + for graph in data: + if graph['@id'] == 'https://w3id.org/oc/meta/br/0603/prov/': + for entity in graph.get('@graph', []): + if 'merge' in entity.get('http://purl.org/dc/terms/description', [{}])[0].get('@value', '').lower(): + timestamp = entity['http://www.w3.org/ns/prov#generatedAtTime'][0]['@value'] + merge_timestamps.append(timestamp) + + # Check timestamps are in correct order + self.assertEqual(len(merge_timestamps), 1, "Should have exactly one merge operation") + + br_file = os.path.join(rdf_path, 'br', '060', '10000', '1000.json') + with open(br_file) as f: + data = json.load(f) + volume_found = False + for graph in data: + for entity in graph.get('@graph', []): + if entity['@id'] == 'https://w3id.org/oc/meta/br/0606': # Volume + volume_found = True + self.assertIn('http://purl.org/spar/fabio/JournalVolume', entity['@type']) + + self.assertTrue(volume_found, "Volume should still exist after merge") + + re_file = os.path.join(rdf_path, 're', '060', '10000', '1000.json') + with open(re_file) as f: + data = json.load(f) + re_found = False + for graph in data: + for entity in graph.get('@graph', []): + if entity['@id'] == 'https://w3id.org/oc/meta/re/0604': # RE from merged entity + re_found = True + self.assertEqual( + entity['http://prismstandard.org/namespaces/basic/2.0/startingPage'][0]['@value'], + '100' + ) + + self.assertTrue(re_found, "Resource embodiment should still exist after merge") + + +if __name__ == '__main__': + unittest.main() \ No newline at end of file diff --git a/test/merger/input/0.csv b/test/merger/input/0.csv new file mode 100644 index 00000000..d49e643d --- /dev/null +++ b/test/merger/input/0.csv @@ -0,0 +1,2 @@ +"id","title","author","issue","volume","venue","page","pub_date","type","publisher","editor" +"doi:10.1002/(sici)1097-0142(19990501)85:9<2023::aid-cncr21>3.0.co;2-2","A Review Of Hemolytic Uremic Syndrome In Patients Treated With Gemcitabine Therapy","Fung, Man C.; Storniolo, Anna Maria; Nguyen, Binh; Arning, Michael; Brookfield, William; Vigil, James","9","85","Cancer [issn:0008-543X issn:1097-0142 doi:10.1002/(issn)1097-0142]","2023-2032","1999-05-01","journal article","Wiley [crossref:311]","" \ No newline at end of file diff --git a/test/merger/input/1.csv b/test/merger/input/1.csv new file mode 100644 index 00000000..3c443f5b --- /dev/null +++ b/test/merger/input/1.csv @@ -0,0 +1,2 @@ +"id","title","author","issue","volume","venue","page","pub_date","type","publisher","editor" +"","","","","","","","","","","" \ No newline at end of file diff --git a/test/merger/input/2.csv b/test/merger/input/2.csv new file mode 100644 index 00000000..0e0de23c --- /dev/null +++ b/test/merger/input/2.csv @@ -0,0 +1,2 @@ +"id","title","author","issue","volume","venue","page","pub_date","type","publisher","editor" +"doi:10.1162/qss_a_00292","","","","","","","2024-04-14","","","" \ No newline at end of file diff --git a/test/merger/input/empty.csv b/test/merger/input/empty.csv new file mode 100644 index 00000000..417d39cc --- /dev/null +++ b/test/merger/input/empty.csv @@ -0,0 +1 @@ +col1,col2 diff --git a/test/merger/meta_config.yaml b/test/merger/meta_config.yaml new file mode 100644 index 00000000..5963ea6c --- /dev/null +++ b/test/merger/meta_config.yaml @@ -0,0 +1,72 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +# Copyright 2016 Silvio Peroni +# Copyright 2020 Fabio Mariani +# Copyright 2022 Arcangelo Massari +# +# Permission to use, copy, modify, and/or distribute this software for any purpose +# with or without fee is 
hereby granted, provided that the above copyright notice +# and this permission notice appear in all copies. +# +# THE SOFTWARE IS PROVIDED 'AS IS' AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH +# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND +# FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, +# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, +# DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS +# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS +# SOFTWARE. + +# Mandatory settings + +# Endpoint URL to load the output RDF +triplestore_url: http://127.0.0.1:8805/sparql +# A list of triplestore URLs containing provenance metadata +provenance_endpoints: [] +# Directory where raw CSV files are stored +input_csv_dir: test/merger/input +# The path to the base directory to save all output files +base_output_dir: &base_output_dir test/merger/output/ +# A URI string representing the provenance agent which is considered responsible for the RDF graph manipulation +resp_agent: https://w3id.org/oc/meta/prov/pa/1 + +# Optional settings + +# Folder where RDF files are saved. Since these files are the heaviest, it is sometimes convenient to save them on HDD, while the triplestore needs to be on SSD for its efficient operation +output_rdf_dir: *base_output_dir +# The base URI of entities on Meta. This setting can be safely left as is +base_iri: https://w3id.org/oc/meta/ +# URL where the namespaces and prefixes used in the OpenCitations Data Model are defined. This setting can be safely left as is +context_path: https://w3id.org/oc/corpus/context.json +# Number of files per folder. dir_split_number's value must be multiple of items_per_file's value. This parameter is useful only if you choose to return the output in json-ld format +dir_split_number: 10000 +# Number of items per file. This parameter is useful only if you choose to return the output in json-ld format +items_per_file: 1000 +# This value is used as the default prefix if no prefix is specified. It is a deprecated parameter, valid only for backward compatibility and can safely be ignored +default_dir: _ +# A prefix for the sequential number in entities’ URIs. This setting can be safely left as is +supplier_prefix: '060' +# If True, save all the graphset and provset in one file, and save all the graphset on the triplestore. +# If False, the graphs are saved according to the usual OpenCitations strategy (the "complex" hierarchy of folders and subfolders for each type of entity) +rdf_output_in_chunks: False +# If True, the folder specified in output_rdf_dir must contain zipped JSON files, and the output will be zipped +zip_output_rdf: False +# Data source URL. This setting can be safely left as is +source: https://api.crossref.org/ +# If True, use the DOI API service to check if DOIs are valid +use_doi_api_service: False +# Number of cores to devote to the Meta process +workers_number: 2 +# True if Blazegraph was used as a provenance triplestore, and a textual index was built to speed up queries. For more information, see https://github.com/blazegraph/database/wiki/Rebuild_Text_Index_Procedure +blazegraph_full_text_search: False +# True if Fuseki was used as a provenance triplestore, and a textual index was built to speed up queries. 
For more information, see https://jena.apache.org/documentation/query/text-query.html +fuseki_full_text_search: False +# True if Virtuoso was used as a provenance triplestore, and a textual index was built to speed up queries. For more information, see https://docs.openlinksw.com/virtuoso/rdfsparqlrulefulltext/ +virtuoso_full_text_search: True +# The name of the Lucene connector if GraphDB was used as a provenance triplestore and a textual index was built to speed up queries. For more information, see https://graphdb.ontotext.com/documentation/free/general-full-text-search-with-connectors.html +graphdb_connector_name: '' +# Specifies the triplestore URL to use as a cache to make queries on provenance faster +cache_endpoint: '' +# If your cache provenance triplestore uses different endpoints for reading and writing (e.g. GraphDB), specify the endpoint for writing in this parameter +cache_update_endpoint: '' +# Fields in the silencer list are only updated if there is no information on that field in OpenCitations Meta. For example, if 'author' is specified, any new authors are not added to the list if authors are already present. +silencer: ['author', 'editor', 'publisher'] diff --git a/test/merger/time_agnostic_library_config.json b/test/merger/time_agnostic_library_config.json new file mode 100644 index 00000000..dced7ea7 --- /dev/null +++ b/test/merger/time_agnostic_library_config.json @@ -0,0 +1 @@ +{"dataset": {"triplestore_urls": ["http://127.0.0.1:8805/sparql"], "file_paths": []}, "provenance": {"triplestore_urls": [], "file_paths": []}, "blazegraph_full_text_search": false, "fuseki_full_text_search": false, "virtuoso_full_text_search": true, "graphdb_connector_name": "", "cache_triplestore_url": {"endpoint": "", "update_endpoint": ""}} \ No newline at end of file
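
The refactor keeps the original command-line interface of the script, so a run over a folder of merge CSVs might look like the following (folder names and the ORCID are placeholders; the module path matches the file touched by this patch, and the options mirror the argparse definitions above, assuming the oc_meta package is importable):

    python -m oc_meta.run.merge.duplicated_entities_simultaneously \
        merge_csvs/ meta_config.yaml https://orcid.org/0000-0002-8420-0696 \
        --entity_types ra br id --stop_file stop.out --workers 4

With more than 4 workers, process_folder skips any CSV containing more than 10,000 data rows; creating the stop file (stop.out here) makes each worker stop before its next row, leaving unprocessed rows marked Done=False. The accompanying entity_merger_test.py assumes a SPARQL endpoint at http://127.0.0.1:8805/sparql and a Redis instance on localhost:6379 (db 5), consistent with test/merger/meta_config.yaml.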
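
For programmatic use, a minimal sketch under the same assumptions (paths are hypothetical; the CSV layout — surviving_entity, merged_entities joined by "; ", Done — is the one exercised by the tests above):

    import os
    from oc_meta.run.merge.duplicated_entities_simultaneously import EntityMerger

    # Hypothetical working folder holding one merge CSV in the expected layout
    os.makedirs('merge_csvs', exist_ok=True)
    EntityMerger.write_csv(os.path.join('merge_csvs', 'merge_test.csv'), [{
        'surviving_entity': 'https://w3id.org/oc/meta/ra/0601',
        'merged_entities': 'https://w3id.org/oc/meta/ra/0602; https://w3id.org/oc/meta/ra/0603',
        'Done': 'False'
    }])

    merger = EntityMerger(
        meta_config='meta_config.yaml',   # hypothetical path to a config like test/merger/meta_config.yaml
        resp_agent='https://orcid.org/0000-0002-8420-0696',
        entity_types=['ra', 'br', 'id'],
        stop_file_path='stop.out',
        workers=4,
    )
    # Each CSV file gets its own MetaEditor and GraphSet; rows whose entity type
    # is in entity_types are merged and flagged Done='True' when the file is rewritten.
    merger.process_folder('merge_csvs')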