Skip to content

Commit

Permalink
wip: affiliations stream
Browse files Browse the repository at this point in the history
  • Loading branch information
zzacharo committed Oct 22, 2024
1 parent 5b66cb7 commit 623b700
Show file tree
Hide file tree
Showing 11 changed files with 369 additions and 5 deletions.
8 changes: 8 additions & 0 deletions cds_migrator_kit/rdm/migration/affiliations/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2022 CERN.
#
# CDS-RDM is free software; you can redistribute it and/or modify it under
# the terms of the MIT License; see LICENSE file for more details.

"""CDS-RDM migration stats module."""
72 changes: 72 additions & 0 deletions cds_migrator_kit/rdm/migration/affiliations/load.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2024 CERN.
#
# CDS-RDM is free software; you can redistribute it and/or modify it under
# the terms of the MIT License; see LICENSE file for more details.

"""CDS-RDM migration load module."""
import logging
import os
import json

from invenio_db import db
from invenio_rdm_migrator.load.base import Load

from cds_rdm.models import CDSMigrationAffiliationMapping

from .log import AffiliationsLogger

logger = AffiliationsLogger.get_logger()


class CDSAffiliationsLoad(Load):
"""CDSAffiliationsLoad."""

def __init__(
self,
dry_run=False,
):
"""Constructor."""
self.dry_run = dry_run

def _prepare(self, entry):
"""Prepare the record."""
pass

def _save_affiliation(self, legacy_recid, affiliations):
"""."""

for affiliation in affiliations:
_affiliation_model = None
_original_input = affiliation.pop("original_input")
if affiliation.get("matched_id"):
_affiliation_model = CDSMigrationAffiliationMapping(
legacy_recid=legacy_recid,
legacy_affiliation_input=_original_input,
ror_exact_match=affiliation,
)
else:
_affiliation_model = CDSMigrationAffiliationMapping(
legacy_recid=legacy_recid,
legacy_affiliation_input=_original_input,
ror_suggested_match=affiliation.get("ror_suggestions"),
)
db.session.add(_affiliation_model)
db.session.commit()

def _load(self, entry):
"""Use the services to load the entries."""
if entry:
legacy_recid = entry["recid"]
creators_affiliations = entry["creators_affiliations"]
contributors_affiliations = entry["contributors_affiliations"]
try:
self._save_affiliation(legacy_recid, creators_affiliations)
self._save_affiliation(legacy_recid, contributors_affiliations)
except Exception as ex:
logger.error(ex)

def _cleanup(self, *args, **kwargs):
"""Cleanup the entries."""
pass
42 changes: 42 additions & 0 deletions cds_migrator_kit/rdm/migration/affiliations/log.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2022 CERN.
#
# CDS-RDM is free software; you can redistribute it and/or modify it under
# the terms of the MIT License; see LICENSE file for more details.

"""CDS-RDM migration record stats logger module."""

import logging


class AffiliationsLogger:
"""Migrator affiliations logger."""

@classmethod
def initialize(cls, log_dir):
"""Constructor."""
formatter = logging.Formatter(
fmt="%(asctime)s %(levelname)-8s %(message)s", datefmt="%Y-%m-%d %H:%M:%S"
)
logger = logging.getLogger("affiliations-migrator")
fh = logging.FileHandler(log_dir / "matched.log")
logger.setLevel(logging.WARNING)
logger.addHandler(fh)

# errors to file
fh = logging.FileHandler(log_dir / "unmatched.log")
fh.setLevel(logging.ERROR)
fh.setFormatter(formatter)
logger.addHandler(fh)

# info to stream/stdout
sh = logging.StreamHandler()
sh.setFormatter(formatter)
sh.setLevel(logging.INFO)
logger.addHandler(sh)

@classmethod
def get_logger(cls):
"""Get migration logger."""
return logging.getLogger("affiliations-migrator")
42 changes: 42 additions & 0 deletions cds_migrator_kit/rdm/migration/affiliations/runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2022 CERN.
#
# Invenio-RDM-Migrator is free software; you can redistribute it and/or modify
# it under the terms of the MIT License; see LICENSE file for more details.

"""InvenioRDM migration streams runner."""

from pathlib import Path

from invenio_rdm_migrator.streams import Stream

from cds_migrator_kit.rdm.migration.affiliations.log import AffiliationsLogger


class RecordAffiliationsRunner:
"""ETL streams runner."""

def __init__(self, stream_definition, filepath, log_dir, dry_run):
"""Constructor."""

self.log_dir = Path(log_dir)
self.log_dir.mkdir(parents=True, exist_ok=True)

AffiliationsLogger.initialize(self.log_dir)

self.stream = Stream(
stream_definition.name,
extract=stream_definition.extract_cls(filepath),
transform=stream_definition.transform_cls(),
load=stream_definition.load_cls(dry_run=dry_run),
)

def run(self):
"""Run Statistics ETL stream."""
try:
self.stream.run()
except Exception as e:
AffiliationsLogger.get_logger().exception(
f"Stream {self.stream.name} failed.", exc_info=1
)
23 changes: 23 additions & 0 deletions cds_migrator_kit/rdm/migration/affiliations/streams.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2022 CERN.
#
# CDS-RDM is free software; you can redistribute it and/or modify it under
# the terms of the MIT License; see LICENSE file for more details.

"""CDS-RDM migration streams module."""
from invenio_rdm_migrator.streams import StreamDefinition
from invenio_rdm_migrator.transform import IdentityTransform

from cds_migrator_kit.rdm.migration.extract import LegacyExtract

from .load import CDSAffiliationsLoad
from .transform import CDSToRDMAffiliationTransform

AffiliationsStreamDefinition = StreamDefinition(
name="affiliations",
extract_cls=LegacyExtract,
transform_cls=CDSToRDMAffiliationTransform,
load_cls=CDSAffiliationsLoad,
)
"""ETL stream for CDS to RDM records statistics."""
145 changes: 145 additions & 0 deletions cds_migrator_kit/rdm/migration/affiliations/transform.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2022 CERN.
#
# CDS-RDM is free software; you can redistribute it and/or modify it under
# the terms of the MIT License; see LICENSE file for more details.

"""CDS-RDM transform step module."""
import csv
import datetime
import json
import logging
import os.path
import re
import requests
from collections import OrderedDict
from copy import deepcopy
from pathlib import Path

import arrow
from invenio_rdm_migrator.streams.records.transform import (
RDMRecordEntry,
RDMRecordTransform,
)
from opensearchpy import RequestError
from sqlalchemy.exc import NoResultFound

from cds_migrator_kit.rdm.migration.transform.users import CDSMissingUserLoad
from cds_migrator_kit.rdm.migration.transform.xml_processing.dumper import CDSRecordDump
from cds_migrator_kit.rdm.migration.transform.xml_processing.errors import (
LossyConversion,
RestrictedFileDetected,
UnexpectedValue,
ManualImportRequired,
CDSMigrationException,
MissingRequiredField,
)
from cds_migrator_kit.records.log import RDMJsonLogger
from invenio_access.permissions import system_identity
from invenio_search.engine import dsl
from invenio_records_resources.proxies import current_service_registry
from invenio_accounts.models import User

cli_logger = logging.getLogger("migrator")


class CDSToRDMAffiliationTransform(RDMRecordTransform):
"""CDSToRDMAffiliationTransform."""

def __init__(
self,
dry_run=False,
):
"""Constructor."""
self.dry_run = dry_run
super().__init__()

def affiliations_search(self, affiliation_name):

def get_ror_affiliation(affiliation):
url = "https://api.ror.org/organizations"
params = {"affiliation": affiliation}

try:
response = requests.get(url, params=params)
response.raise_for_status()
items = response.json().get("items")
if items:
for item in items:
if item["chosen"] is True:
return (True, item)
# score = item.get("score")
# if score > 0.9:
# return item
return (False, items)
except requests.exceptions.HTTPError as http_err:
cli_logger.exception(http_err)
except Exception as err:
cli_logger.exception(http_err)

(chosen, affiliation) = get_ror_affiliation(affiliation_name)

return (chosen, affiliation)

def _affiliations(self, json_entry, key):
_creators = deepcopy(json_entry.get(key, []))
_creators = list(filter(lambda x: x is not None, _creators))
_affiliations = []

for creator in _creators:
affiliations = creator.get("affiliations", [])

for affiliation_name in affiliations:
(chosen, match_or_suggestions) = self.affiliations_search(
affiliation_name
)
if chosen:
_affiliations.append(
{
"original_input": affiliation_name,
"matched_name": match_or_suggestions["organization"][
"name"
],
"matched_id": match_or_suggestions["organization"]["id"],
}
)
else:
_affiliations.append(
{
"original_input": affiliation_name,
"ror_suggestions": match_or_suggestions,
}
)

return _affiliations

def _transform(self, entry):
"""Transform a single entry."""
# creates the output structure for load step
# migration_logger = RDMJsonLogger()
record_dump = CDSRecordDump(
entry,
)
record_dump.prepare_revisions()

timestamp, json_data = record_dump.latest_revision
try:
return {
"recid": entry["recid"],
"creators_affiliations": self._affiliations(json_data, "creators"),
"contributors_affiliations": self._affiliations(
json_data, "contributors"
),
}
except Exception as e:
cli_logger.exception(e)

def _draft(self, entry):
return None

def _parent(self, entry):
return None

def _record(self, entry):
return None
32 changes: 32 additions & 0 deletions cds_migrator_kit/rdm/migration/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,16 @@
from flask import current_app
from flask.cli import with_appcontext

from cds_migrator_kit.rdm.migration.affiliations.runner import RecordAffiliationsRunner
from cds_migrator_kit.rdm.migration.runner import Runner
from cds_migrator_kit.rdm.migration.stats.runner import RecordStatsRunner
from cds_migrator_kit.rdm.migration.streams import (
RecordStreamDefinition,
UserStreamDefinition,
)
from cds_migrator_kit.rdm.migration.affiliations.streams import (
AffiliationsStreamDefinition,
)
from cds_migrator_kit.rdm.migration.stats.streams import RecordStatsStreamDefinition

cli_logger = logging.getLogger("migrator")
Expand Down Expand Up @@ -80,3 +84,31 @@ def run(filepath, dry_run=False):
dry_run=dry_run,
)
runner.run()


@migration.group()
def affiliations():
"""Migration CLI for affiliations."""
pass


@affiliations.command()
@click.option(
"--dry-run",
is_flag=True,
)
@click.option(
"--filepath",
help="Path to the list of records file that the legacy statistics will be migrated.",
)
@with_appcontext
def run(filepath, dry_run=False):
"""Migrate the legacy statistics for the records in `filepath`"""
log_dir = Path(current_app.config["CDS_MIGRATOR_KIT_LOGS_PATH"]) / "affiliations"
runner = RecordAffiliationsRunner(
stream_definition=AffiliationsStreamDefinition,
filepath=filepath,
log_dir=log_dir,
dry_run=dry_run,
)
runner.run()
Loading

0 comments on commit 623b700

Please sign in to comment.