Skip to content

Commit

Permalink
MEF: fix deleted agents
Browse files Browse the repository at this point in the history
* Deleted enteties could be accessed by MEF.
* Adds `with_deleted` to search.

Co-Authored-by: Peter Weber <[email protected]>
  • Loading branch information
rerowep committed Jun 14, 2022
1 parent ca6127c commit 4370ebc
Show file tree
Hide file tree
Showing 11 changed files with 92 additions and 71 deletions.
53 changes: 23 additions & 30 deletions rero_mef/agents/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""API for manipulating records."""


import contextlib

import click
from flask import current_app
from invenio_pidstore.models import PersistentIdentifier, PIDStatus
Expand Down Expand Up @@ -161,17 +164,14 @@ def create_or_update_agent_mef_viaf(cls, data, id_=None, delete_pid=True,
from rero_mef.agents.mef.api import AgentMefRecord
from rero_mef.agents.viaf.api import AgentViafRecord

try:
with contextlib.suppress(Exception):
persistent_id = PersistentIdentifier.query.filter_by(
pid_type=cls.provider.pid_type,
pid_value=data.get('pid')
).one()
if persistent_id.status == PIDStatus.DELETED:
return None, Action.ALREADYDELETED, None, Action.DISCARD, \
None, False
except Exception:
pass

record, action = cls.create_or_update(
data=data,
id_=id_,
Expand All @@ -182,36 +182,29 @@ def create_or_update_agent_mef_viaf(cls, data, id_=None, delete_pid=True,
)
if action == Action.ERROR:
return None, action, None, Action.ERROR, None, False
if record.deleted:
mef_record, mef_action = record.delete_from_mef(
dbcommit=dbcommit,
reindex=reindex,
verbose=verbose
)
viaf_record = None
action = Action.DELETE
online = False
elif action == Action.UPTODATE:
mef_record = AgentMefRecord.get_mef_by_entity_pid(
record.pid, record.name)
mef_action = Action.UPTODATE
viaf_record, online = AgentViafRecord.get_viaf_by_agent(
record)
else:
if record.deleted:
mef_record, mef_action = record.delete_from_mef(
mef_record, mef_action, viaf_record, online = \
record.create_or_update_mef_viaf_record(
dbcommit=dbcommit,
reindex=reindex,
verbose=verbose
online=online
)
# record.delete(
# dbcommit=dbcommit,
# delindex=True,
# )
# record = None
action = Action.DELETE
viaf_record = None
online = False
else:
if action == Action.UPTODATE:
mef_record = AgentMefRecord.get_mef_by_entity_pid(
record.pid, record.name)
mef_action = Action.UPTODATE
viaf_record, online = AgentViafRecord.get_viaf_by_agent(
record)
else:
mef_record, mef_action, viaf_record, online = \
record.create_or_update_mef_viaf_record(
dbcommit=dbcommit,
reindex=reindex,
online=online
)
return record, action, mef_record, mef_action, viaf_record, online
return record, action, mef_record, mef_action, viaf_record, online

@classmethod
def get_online_record(cls, id, verbose=False):
Expand Down
6 changes: 1 addition & 5 deletions rero_mef/agents/mef/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,7 @@ def replace_refs(self):
sources = []
for agent in ['rero', 'gnd', 'idref']:
if agent_data := data.get(agent):
if agent_data.get('deleted'):
data.pop(agent)
current_app.logger.info(
f'MEF replace refs {data.get("pid")} {agent} deleted')
elif agent_data.get('status'):
if agent_data.get('status'):
data.pop(agent)
current_app.logger.error(
f'MEF replace refs {data.get("pid")} {agent}'
Expand Down
23 changes: 10 additions & 13 deletions rero_mef/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,7 @@ def create(cls, data, id_=None, delete_pid=False, dbcommit=False,
"""Create a new agent record."""
assert cls.minter
if '$schema' not in data:
type = cls.provider.pid_type
data = add_schema(data, type)
data = add_schema(data, cls.provider.pid_type)
if delete_pid:
data.pop('pid', None)
if not id_:
Expand Down Expand Up @@ -130,7 +129,6 @@ def update_test_md5(self, data, dbcommit=False, reindex=False):
if data.get('md5', 'data') == self.get('md5', 'agent'):
# record has no changes
return self, Action.UPTODATE
data = add_schema(data, self.name)
return_record = self.replace(
data=data, dbcommit=dbcommit, reindex=reindex)
return return_record, Action.UPDATE
Expand All @@ -145,6 +143,7 @@ def create_or_update(cls, data, id_=None, delete_pid=True, dbcommit=False,
pid = data.get('pid')
if agent_record := cls.get_record_by_pid(pid):
# record exist
data = add_schema(data, agent_record.provider.pid_type)
if test_md5:
return_record, agent_action = agent_record.update_test_md5(
data=data,
Expand Down Expand Up @@ -185,9 +184,8 @@ def get_record_by_pid(cls, pid, with_deleted=False):
pid
)
get_record_ok = True
return super().get_record(
persistent_identifier.object_uuid,
with_deleted=with_deleted)
return super().get_record(persistent_identifier.object_uuid,
with_deleted=with_deleted)

except PIDDoesNotExistError:
return None
Expand Down Expand Up @@ -358,8 +356,9 @@ def get_indexer_class(cls):
def reindex(self, forceindex=False):
"""Reindex record."""
indexer = self.get_indexer_class()
return indexer(version_type='external_gte').index(self) \
if forceindex else indexer().index(self)
if forceindex:
return indexer(version_type='external_gte').index(self)
return indexer().index(self)

def delete_from_index(self):
"""Delete record from index."""
Expand Down Expand Up @@ -408,9 +407,9 @@ def _get_indexer_class(self, payload):
"""Get the record class from payload."""
# take the first defined doc type for finding the class
pid_type = payload.get('doc_type', 'rec')
return obj_or_import_string(current_app.config.get(
'RECORDS_REST_ENDPOINTS'
).get(pid_type).get('indexer_class', RecordIndexer))
endpoints = current_app.config.get('RECORDS_REST_ENDPOINTS')
return obj_or_import_string(
endpoints.get(pid_type).get('indexer_class', RecordIndexer))

def process_bulk_queue(self, es_bulk_kwargs=None, stats_only=True):
"""Process bulk indexing queue.
Expand Down Expand Up @@ -495,8 +494,6 @@ def _index_action(self, payload):
f'{get_record_error_count} {payload["id"]}')
current_app.logger.error(msg)
db.session.rollback()
except Exception as err:
raise Exception(err)

index, doc_type = self.record_to_index(record)

Expand Down
4 changes: 3 additions & 1 deletion rero_mef/api_mef.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,13 +161,15 @@ def get_pids_with_multiple_mef(cls, record_types=[], verbose=False):
return pids, multiple_pids, missing_pids

@classmethod
def get_all_missing_pids(cls, record_types=[], verbose=False):
def get_all_missing_pids(cls, record_types=None, verbose=False):
"""Get all missing agents.
:params record_types: Record types (pid_type).
:param verbose: Verbose.
:returns: missing pids, to much pids.
"""
if record_types is None:
record_types = []
missing_pids = {}
to_much_pids = {}
entity_classes = get_entity_classes()
Expand Down
9 changes: 8 additions & 1 deletion rero_mef/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -440,11 +440,15 @@
deleted=dict(
filter=dict(exists=dict(field="deleted"))
),
deleted_entities=dict(
filter=dict(exists=dict(field="*.deleted"))
),
),
filters={
'agent_type': terms_filter('type'),
'agent_sources': terms_filter('sources'),
'deleted': exists_filter('deleted'),
'deleted_entities': exists_filter('*.deleted'),
'rero_double': terms_filter('rero.pid')
}
),
Expand All @@ -459,7 +463,6 @@
rero=dict(
filter=dict(exists=dict(field="rero_pid"))
),

),
filters={
'gnd': exists_filter('gnd_pid'),
Expand Down Expand Up @@ -517,11 +520,15 @@
deleted=dict(
filter=dict(exists=dict(field="deleted"))
),
deleted_entities=dict(
filter=dict(exists=dict(field="*.deleted"))
),
),
filters={
'agent_type': terms_filter('type'),
'agent_sources': terms_filter('sources'),
'deleted': exists_filter('deleted'),
'deleted_entities': exists_filter('*.deleted'),
'rero_double': terms_filter('rero.pid')
}
),
Expand Down
11 changes: 11 additions & 0 deletions rero_mef/filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,14 @@ def exists_filter(field):
def inner(values):
return Q('exists', field=field)
return inner


def not_exists_filter(field):
"""Create a term filter.
:param field: Field name.
:returns: Function that returns the Terms query.
"""
def inner(values):
return Q('bool', must_not=[Q('exists', field=field)])
return inner
9 changes: 6 additions & 3 deletions rero_mef/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,11 @@ def _default_parser(qstr=None):
urlkwargs.add('q', query_string)

# include deleted
deleted = request.args.get('deleted')
if not deleted:
search = search.filter('bool', must_not=[Q('exists', field='deleted')])
with_deleted = request.args.get('with_deleted')
if not with_deleted:
search = search.filter('bool', must_not=[
Q('exists', field='deleted'), # no deleted MEF's
Q('exists', field='*.deleted') # no deleted entities
])

return search, urlkwargs
5 changes: 2 additions & 3 deletions rero_mef/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ def create_or_update(index, record, entity, dbcommit=True, reindex=True,
reindex=reindex,
online=online
)
rec_id = returned_record.get('pid')
id = returned_record.get('pid')
id_type = 'pid :'
if not rec_id:
id_type = 'uuid:'
Expand Down Expand Up @@ -133,8 +133,7 @@ def delete(index, pid, entity, dbcommit=True, delindex=True, verbose=False):
agent_record = agent_class.get_record_by_pid(pid)
action = None
if agent_record:
result, action = agent_record.delete(dbcommit=dbcommit,
delindex=delindex)
_, action = agent_record.delete(dbcommit=dbcommit, delindex=delindex)
if verbose:
click.echo(f'{index:<10} Deleted {entity} {pid:<38} {action}')
else:
Expand Down
16 changes: 6 additions & 10 deletions rero_mef/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,17 +307,15 @@ def oai_process_records_from_dates(name, sickle, oai_item_iterator,
f' | mef: {m_pid} {m_action}'
f' | viaf: {v_pid} online: {v_online}'
)
else:
if verbose:
click.echo(
f'NO TRANSFORMATION: {name} {count} '
f'{records[0]}'
)
elif verbose:
click.echo(
f'NO TRANSFORMATION: {name} {count} '
f'{records[0]}'
)
except Exception as err:
msg = f'Creating {name} {count}: {err}'
if rec:
msg += f'\n{rec}'

current_app.logger.error(msg)
if debug:
traceback.print_exc()
Expand All @@ -329,7 +327,6 @@ def oai_process_records_from_dates(name, sickle, oai_item_iterator,
if debug:
traceback.print_exc()
count = -1

my_from_date = my_from_date + timedelta(days=days_spann + 1)
if verbose:
from_date = my_from_date.strftime("%Y-%m-%d")
Expand Down Expand Up @@ -587,8 +584,7 @@ def resolve_record(path, object_class):
:returns: record for pid or {}
"""
try:
record = object_class.get_record_by_pid(path)
return record
return object_class.get_record_by_pid(path)
except PIDDoesNotExistError:
return {}

Expand Down
5 changes: 3 additions & 2 deletions tests/api/test_view_agents_mef.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ def test_view_agents_mef(client, agent_mef_record):
'doc_count_error_upper_bound': 0,
'sum_other_doc_count': 0
},
'deleted': {'doc_count': 0},
'sources': {
'buckets': [
{'doc_count': 1, 'key': 'gnd'},
Expand All @@ -56,7 +55,9 @@ def test_view_agents_mef(client, agent_mef_record):
],
'doc_count_error_upper_bound': 0,
'sum_other_doc_count': 0
}
},
'deleted': {'doc_count': 0},
'deleted_entities': {'doc_count': 0},
}
url = url_for('api_agents_mef.redirect_item', pid=pid)
res = client.get(url)
Expand Down
22 changes: 19 additions & 3 deletions tests/unit/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,7 @@ def test_reromefrecord_api(app, agent_idref_record):
assert AgentIdrefRecord.get_metadata_identifier_names() == (
'agent_idref_metadata', 'agent_idref_id')

count = 0
for _ in AgentIdrefRecord.get_all_records():
count += 1
count = sum(1 for _ in AgentIdrefRecord.get_all_records())
assert count == 1

_, agent_action = idref.update_test_md5(
Expand All @@ -62,3 +60,21 @@ def test_reromefrecord_api(app, agent_idref_record):
process_bulk_queue(stats_only=True)
# TODO: Find out how to test bulk indexing.
# assert process_bulk_queue(stats_only=True) == (1, 0)


# def test_create_deleted(agent_idref_record):
# """Test redirect IDREF."""
# agent_idref_record['pid'] = '069774331_delete'
# idref = AgentIdrefRecord.create(
# data=agent_idref_record,
# delete_pid=False,
# dbcommit=True,
# reindex=True,
# )
# idref = AgentMefRecord.create_deleted(
# record=idref,
# dbcommit=True,
# reindex=True
# )
# print('--->', idref)
# assert '$schema' in idref

0 comments on commit 4370ebc

Please sign in to comment.