diff --git a/cds_migrator_kit/rdm/migration/affiliations/__init__.py b/cds_migrator_kit/rdm/migration/affiliations/__init__.py new file mode 100644 index 0000000..7265909 --- /dev/null +++ b/cds_migrator_kit/rdm/migration/affiliations/__init__.py @@ -0,0 +1,8 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2022 CERN. +# +# CDS-RDM is free software; you can redistribute it and/or modify it under +# the terms of the MIT License; see LICENSE file for more details. + +"""CDS-RDM migration stats module.""" diff --git a/cds_migrator_kit/rdm/migration/affiliations/load.py b/cds_migrator_kit/rdm/migration/affiliations/load.py new file mode 100644 index 0000000..bb2a2f6 --- /dev/null +++ b/cds_migrator_kit/rdm/migration/affiliations/load.py @@ -0,0 +1,72 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2024 CERN. +# +# CDS-RDM is free software; you can redistribute it and/or modify it under +# the terms of the MIT License; see LICENSE file for more details. + +"""CDS-RDM migration load module.""" +import logging +import os +import json + +from invenio_db import db +from invenio_rdm_migrator.load.base import Load + +from cds_rdm.models import CDSMigrationAffiliationMapping + +from .log import AffiliationsLogger + +logger = AffiliationsLogger.get_logger() + + +class CDSAffiliationsLoad(Load): + """CDSAffiliationsLoad.""" + + def __init__( + self, + dry_run=False, + ): + """Constructor.""" + self.dry_run = dry_run + + def _prepare(self, entry): + """Prepare the record.""" + pass + + def _save_affiliation(self, legacy_recid, affiliations): + """.""" + + for affiliation in affiliations: + _affiliation_model = None + _original_input = affiliation.pop("original_input") + if affiliation.get("matched_id"): + _affiliation_model = CDSMigrationAffiliationMapping( + legacy_recid=legacy_recid, + legacy_affiliation_input=_original_input, + ror_exact_match=affiliation, + ) + else: + _affiliation_model = CDSMigrationAffiliationMapping( + legacy_recid=legacy_recid, + legacy_affiliation_input=_original_input, + ror_suggested_match=affiliation.get("ror_suggestions"), + ) + db.session.add(_affiliation_model) + db.session.commit() + + def _load(self, entry): + """Use the services to load the entries.""" + if entry: + legacy_recid = entry["recid"] + creators_affiliations = entry["creators_affiliations"] + contributors_affiliations = entry["contributors_affiliations"] + try: + self._save_affiliation(legacy_recid, creators_affiliations) + self._save_affiliation(legacy_recid, contributors_affiliations) + except Exception as ex: + logger.error(ex) + + def _cleanup(self, *args, **kwargs): + """Cleanup the entries.""" + pass diff --git a/cds_migrator_kit/rdm/migration/affiliations/log.py b/cds_migrator_kit/rdm/migration/affiliations/log.py new file mode 100644 index 0000000..e3452e2 --- /dev/null +++ b/cds_migrator_kit/rdm/migration/affiliations/log.py @@ -0,0 +1,42 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2022 CERN. +# +# CDS-RDM is free software; you can redistribute it and/or modify it under +# the terms of the MIT License; see LICENSE file for more details. + +"""CDS-RDM migration record stats logger module.""" + +import logging + + +class AffiliationsLogger: + """Migrator affiliations logger.""" + + @classmethod + def initialize(cls, log_dir): + """Constructor.""" + formatter = logging.Formatter( + fmt="%(asctime)s %(levelname)-8s %(message)s", datefmt="%Y-%m-%d %H:%M:%S" + ) + logger = logging.getLogger("affiliations-migrator") + fh = logging.FileHandler(log_dir / "matched.log") + logger.setLevel(logging.WARNING) + logger.addHandler(fh) + + # errors to file + fh = logging.FileHandler(log_dir / "unmatched.log") + fh.setLevel(logging.ERROR) + fh.setFormatter(formatter) + logger.addHandler(fh) + + # info to stream/stdout + sh = logging.StreamHandler() + sh.setFormatter(formatter) + sh.setLevel(logging.INFO) + logger.addHandler(sh) + + @classmethod + def get_logger(cls): + """Get migration logger.""" + return logging.getLogger("affiliations-migrator") diff --git a/cds_migrator_kit/rdm/migration/affiliations/runner.py b/cds_migrator_kit/rdm/migration/affiliations/runner.py new file mode 100644 index 0000000..2c8c7a3 --- /dev/null +++ b/cds_migrator_kit/rdm/migration/affiliations/runner.py @@ -0,0 +1,42 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2022 CERN. +# +# Invenio-RDM-Migrator is free software; you can redistribute it and/or modify +# it under the terms of the MIT License; see LICENSE file for more details. + +"""InvenioRDM migration streams runner.""" + +from pathlib import Path + +from invenio_rdm_migrator.streams import Stream + +from cds_migrator_kit.rdm.migration.affiliations.log import AffiliationsLogger + + +class RecordAffiliationsRunner: + """ETL streams runner.""" + + def __init__(self, stream_definition, filepath, log_dir, dry_run): + """Constructor.""" + + self.log_dir = Path(log_dir) + self.log_dir.mkdir(parents=True, exist_ok=True) + + AffiliationsLogger.initialize(self.log_dir) + + self.stream = Stream( + stream_definition.name, + extract=stream_definition.extract_cls(filepath), + transform=stream_definition.transform_cls(), + load=stream_definition.load_cls(dry_run=dry_run), + ) + + def run(self): + """Run Statistics ETL stream.""" + try: + self.stream.run() + except Exception as e: + AffiliationsLogger.get_logger().exception( + f"Stream {self.stream.name} failed.", exc_info=1 + ) diff --git a/cds_migrator_kit/rdm/migration/affiliations/streams.py b/cds_migrator_kit/rdm/migration/affiliations/streams.py new file mode 100644 index 0000000..588c58d --- /dev/null +++ b/cds_migrator_kit/rdm/migration/affiliations/streams.py @@ -0,0 +1,23 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2022 CERN. +# +# CDS-RDM is free software; you can redistribute it and/or modify it under +# the terms of the MIT License; see LICENSE file for more details. + +"""CDS-RDM migration streams module.""" +from invenio_rdm_migrator.streams import StreamDefinition +from invenio_rdm_migrator.transform import IdentityTransform + +from cds_migrator_kit.rdm.migration.extract import LegacyExtract + +from .load import CDSAffiliationsLoad +from .transform import CDSToRDMAffiliationTransform + +AffiliationsStreamDefinition = StreamDefinition( + name="affiliations", + extract_cls=LegacyExtract, + transform_cls=CDSToRDMAffiliationTransform, + load_cls=CDSAffiliationsLoad, +) +"""ETL stream for CDS to RDM records statistics.""" diff --git a/cds_migrator_kit/rdm/migration/affiliations/transform.py b/cds_migrator_kit/rdm/migration/affiliations/transform.py new file mode 100644 index 0000000..ec76e26 --- /dev/null +++ b/cds_migrator_kit/rdm/migration/affiliations/transform.py @@ -0,0 +1,145 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2022 CERN. +# +# CDS-RDM is free software; you can redistribute it and/or modify it under +# the terms of the MIT License; see LICENSE file for more details. + +"""CDS-RDM transform step module.""" +import csv +import datetime +import json +import logging +import os.path +import re +import requests +from collections import OrderedDict +from copy import deepcopy +from pathlib import Path + +import arrow +from invenio_rdm_migrator.streams.records.transform import ( + RDMRecordEntry, + RDMRecordTransform, +) +from opensearchpy import RequestError +from sqlalchemy.exc import NoResultFound + +from cds_migrator_kit.rdm.migration.transform.users import CDSMissingUserLoad +from cds_migrator_kit.rdm.migration.transform.xml_processing.dumper import CDSRecordDump +from cds_migrator_kit.rdm.migration.transform.xml_processing.errors import ( + LossyConversion, + RestrictedFileDetected, + UnexpectedValue, + ManualImportRequired, + CDSMigrationException, + MissingRequiredField, +) +from cds_migrator_kit.records.log import RDMJsonLogger +from invenio_access.permissions import system_identity +from invenio_search.engine import dsl +from invenio_records_resources.proxies import current_service_registry +from invenio_accounts.models import User + +cli_logger = logging.getLogger("migrator") + + +class CDSToRDMAffiliationTransform(RDMRecordTransform): + """CDSToRDMAffiliationTransform.""" + + def __init__( + self, + dry_run=False, + ): + """Constructor.""" + self.dry_run = dry_run + super().__init__() + + def affiliations_search(self, affiliation_name): + + def get_ror_affiliation(affiliation): + url = "https://api.ror.org/organizations" + params = {"affiliation": affiliation} + + try: + response = requests.get(url, params=params) + response.raise_for_status() + items = response.json().get("items") + if items: + for item in items: + if item["chosen"] is True: + return (True, item) + # score = item.get("score") + # if score > 0.9: + # return item + return (False, items) + except requests.exceptions.HTTPError as http_err: + cli_logger.exception(http_err) + except Exception as err: + cli_logger.exception(http_err) + + (chosen, affiliation) = get_ror_affiliation(affiliation_name) + + return (chosen, affiliation) + + def _affiliations(self, json_entry, key): + _creators = deepcopy(json_entry.get(key, [])) + _creators = list(filter(lambda x: x is not None, _creators)) + _affiliations = [] + + for creator in _creators: + affiliations = creator.get("affiliations", []) + + for affiliation_name in affiliations: + (chosen, match_or_suggestions) = self.affiliations_search( + affiliation_name + ) + if chosen: + _affiliations.append( + { + "original_input": affiliation_name, + "matched_name": match_or_suggestions["organization"][ + "name" + ], + "matched_id": match_or_suggestions["organization"]["id"], + } + ) + else: + _affiliations.append( + { + "original_input": affiliation_name, + "ror_suggestions": match_or_suggestions, + } + ) + + return _affiliations + + def _transform(self, entry): + """Transform a single entry.""" + # creates the output structure for load step + # migration_logger = RDMJsonLogger() + record_dump = CDSRecordDump( + entry, + ) + record_dump.prepare_revisions() + + timestamp, json_data = record_dump.latest_revision + try: + return { + "recid": entry["recid"], + "creators_affiliations": self._affiliations(json_data, "creators"), + "contributors_affiliations": self._affiliations( + json_data, "contributors" + ), + } + except Exception as e: + cli_logger.exception(e) + + def _draft(self, entry): + return None + + def _parent(self, entry): + return None + + def _record(self, entry): + return None diff --git a/cds_migrator_kit/rdm/migration/cli.py b/cds_migrator_kit/rdm/migration/cli.py index 36521c3..7ac7884 100644 --- a/cds_migrator_kit/rdm/migration/cli.py +++ b/cds_migrator_kit/rdm/migration/cli.py @@ -13,12 +13,16 @@ from flask import current_app from flask.cli import with_appcontext +from cds_migrator_kit.rdm.migration.affiliations.runner import RecordAffiliationsRunner from cds_migrator_kit.rdm.migration.runner import Runner from cds_migrator_kit.rdm.migration.stats.runner import RecordStatsRunner from cds_migrator_kit.rdm.migration.streams import ( RecordStreamDefinition, UserStreamDefinition, ) +from cds_migrator_kit.rdm.migration.affiliations.streams import ( + AffiliationsStreamDefinition, +) from cds_migrator_kit.rdm.migration.stats.streams import RecordStatsStreamDefinition cli_logger = logging.getLogger("migrator") @@ -80,3 +84,31 @@ def run(filepath, dry_run=False): dry_run=dry_run, ) runner.run() + + +@migration.group() +def affiliations(): + """Migration CLI for affiliations.""" + pass + + +@affiliations.command() +@click.option( + "--dry-run", + is_flag=True, +) +@click.option( + "--filepath", + help="Path to the list of records file that the legacy statistics will be migrated.", +) +@with_appcontext +def run(filepath, dry_run=False): + """Migrate the legacy statistics for the records in `filepath`""" + log_dir = Path(current_app.config["CDS_MIGRATOR_KIT_LOGS_PATH"]) / "affiliations" + runner = RecordAffiliationsRunner( + stream_definition=AffiliationsStreamDefinition, + filepath=filepath, + log_dir=log_dir, + dry_run=dry_run, + ) + runner.run() diff --git a/cds_migrator_kit/rdm/migration/stats/log.py b/cds_migrator_kit/rdm/migration/stats/log.py index da7532b..060ef55 100644 --- a/cds_migrator_kit/rdm/migration/stats/log.py +++ b/cds_migrator_kit/rdm/migration/stats/log.py @@ -19,7 +19,7 @@ def initialize(cls, log_dir): formatter = logging.Formatter( fmt="%(asctime)s %(levelname)-8s %(message)s", datefmt="%Y-%m-%d %H:%M:%S" ) - logger = logging.getLogger("migrator") + logger = logging.getLogger("stats-migrator") fh = logging.FileHandler(log_dir / "success.log") logger.setLevel(logging.WARNING) logger.addHandler(fh) @@ -39,4 +39,4 @@ def initialize(cls, log_dir): @classmethod def get_logger(cls): """Get migration logger.""" - return logging.getLogger("migrator") + return logging.getLogger("stats-migrator") diff --git a/cds_migrator_kit/rdm/migration/streams.yaml b/cds_migrator_kit/rdm/migration/streams.yaml index 44c180e..b9ff35f 100644 --- a/cds_migrator_kit/rdm/migration/streams.yaml +++ b/cds_migrator_kit/rdm/migration/streams.yaml @@ -16,4 +16,4 @@ records: transform: files_dump_dir: cds_migrator_kit/rdm/migration/data/summer_student_reports/files/ missing_users: cds_migrator_kit/rdm/migration/data/users - community_id: 63448ca7-c814-4716-b099-a39766df6dbb + community_id: c2a0fef2-03c2-4dd8-a2a3-e85a169d2efe diff --git a/cds_migrator_kit/rdm/migration/transform/users.py b/cds_migrator_kit/rdm/migration/transform/users.py index f7b7ef0..6b5d51c 100644 --- a/cds_migrator_kit/rdm/migration/transform/users.py +++ b/cds_migrator_kit/rdm/migration/transform/users.py @@ -59,7 +59,7 @@ def transform(self, entry): ) record_dump.prepare_revisions() - timestamp, json_data = record_dump.revisions[-1] + timestamp, json_data = record_dump.latest_revision return json_data diff --git a/cds_migrator_kit/rdm/migration/transform/xml_processing/dumper.py b/cds_migrator_kit/rdm/migration/transform/xml_processing/dumper.py index 7b6a809..b731598 100644 --- a/cds_migrator_kit/rdm/migration/transform/xml_processing/dumper.py +++ b/cds_migrator_kit/rdm/migration/transform/xml_processing/dumper.py @@ -45,7 +45,7 @@ def created(self): def prepare_revisions(self): """Prepare revisions.""" - self.latest_revision = self._prepare_revision(self.data["record"].pop(-1)) + self.latest_revision = self._prepare_revision(self.data["record"][-1]) def prepare_files(self): """Get files from data dump."""