Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

allow b2access ID as parameter for all ownership CLI commands #1940

Draft
wants to merge 6 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions b2share/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -495,3 +495,5 @@
#: There is no password so don't send password change emails
SECURITY_SEND_PASSWORD_CHANGE_EMAIL=False
SECURITY_SEND_PASSWORD_RESET_NOTICE_EMAIL=False

DEFAULT_OWNERSHIP=['[email protected]']
201 changes: 93 additions & 108 deletions b2share/modules/management/ownership/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from __future__ import absolute_import, print_function
import click
import os
import uuid

from flask.cli import with_appcontext
from flask import app, current_app
Expand All @@ -38,50 +39,45 @@
from invenio_accounts.models import User
from invenio_pidrelations.contrib.versioning import PIDVersioning
from invenio_pidstore.errors import PIDDoesNotExistError
from invenio_oauthclient.utils import oauth_get_user
from invenio_oauthclient.models import UserIdentity

from b2share.modules.deposit.api import Deposit
from b2share.utils import ESSearch, to_tabulate
from b2share.modules.users.cli import get_user

from .errors import UserAlreadyOwner
from .utils import get_record_by_pid, add_ownership_to_record, find_version_master, check_user


def render_error(error, type, pid, action):
raise click.ClickException(click.style(error, fg="red") + "\t\t{}: ".format(type) +
click.style(pid, fg="blue") + "\t\tAction: {}".format(action))


def pid2record(pid):
pid = PersistentIdentifier.get('b2rec', pid)
return B2ShareRecord.get_record(pid.object_uuid)


def replace_ownership(pid, user_id: int, obj_type='record'):
record = pid2record(pid)
record = get_record_by_pid(pid)
record['_deposit']['owners'] = [user_id]
try:
record = record.commit()
db.session.commit()
except Exception as e:
db.session.rollback()
click.secho(
"Error for object {}, skipping".format(obj.id), fg='red')
"Error for object {}, skipping".format(pid), fg='red')
click.secho(e)


def add_ownership(obj, user_id: int, obj_type='record'):
if user_id not in obj['_deposit']['owners']:
obj['_deposit']['owners'].append(user_id)
try:
obj = obj.commit()
db.session.commit()
except Exception as e:
db.session.rollback()
click.secho(
"Error for object {}, skipping".format(obj.id), fg='red')
click.secho(e)

else:
try:
add_ownership_to_record(obj, user_id)
except UserAlreadyOwner as e:
render_error("User is already owner of object", 'object id',
obj['_deposit']['id'], "skipping")
except Exception as e:
click.secho(
"Error for object {}, skipping".format(obj.id), fg='red')
click.secho(e)


def remove_ownership(obj, user_id: int, obj_type='record'):
Expand Down Expand Up @@ -109,17 +105,14 @@ def list_ownership(record_pid):
all_pids = [v.pid_value for v in version_master.children.all()]
click.secho("PID\t\t\t\t\tOwners", fg='green')
for single_pid in all_pids:
record = pid2record(single_pid)
record = get_record_by_pid(single_pid)
owners = record['_deposit']['owners']
click.secho("%s\t%s" % (
single_pid,
" ".join([str(User.query.filter(
User.id.in_([w])).all()[0].email) for w in owners])))


def check_user(user_email):
return User.query.filter(User.email == user_email).one_or_none()

# decorator to patch the current_app.config file temporarely
def patch_current_app_config(vars):
def decorator(function):
Expand All @@ -128,7 +121,8 @@ def inner(*args, **kwargs):
v, None) for v in vars.keys()}
for v, val in vars.items():
if val is None:
raise Exception("Value for var {} is None.\nSet up the variable to run the command.".format(str(v)))
raise Exception(
"Value for var {} is None.\nSet up the variable to run the command.".format(str(v)))
current_app.config[v] = val
out = function(*args, **kwargs)
for v, val in old_vars.items():
Expand All @@ -138,26 +132,46 @@ def inner(*args, **kwargs):
return decorator


@click.group()
def ownership():
"""ownership management commands."""

def _is_valid_uuid(input_string: str) -> bool:
"""
Checks if a string is a valid UUID
"""
try:
_ = uuid.UUID(input_string)
return True
except ValueError:
return False

def find_version_master(pid):
"""Retrieve the PIDVersioning of a record PID.

:params pid: record PID.
def _get_user_by_email_or_id(user_email_or_id: str):
"""
from b2share.modules.deposit.errors import RecordNotFoundVersioningError
from b2share.modules.records.providers import RecordUUIDProvider
try:
child_pid = RecordUUIDProvider.get(pid).pid
if child_pid.status == PIDStatus.DELETED:
raise RecordNotFoundVersioningError()
except PIDDoesNotExistError as e:
raise RecordNotFoundVersioningError() from e
returns a user by email or b2access id
"""
# user that we want to replace
if not _is_valid_uuid(user_email_or_id):
user_email = user_email_or_id
user = User.query.filter(User.email == user_email).one_or_none()
if user is None:
raise click.ClickException(
"User <{}> does not exist. Please check the email and try again".format(user_email))
else:
user_id_external = uuid.UUID(user_email_or_id)
user_identity = UserIdentity.query.filter(UserIdentity.id == str(user_id_external)).one_or_none()
if user_identity is None:
raise click.ClickException(
"User <{}> does not exist. Please check the b2access id and try again".format(user_id_external))
user_id = user_identity.id_user # get the local ID from the identity
user = User.query.filter(User.id == user_id).one_or_none() # find user by local ID
if user is None:
raise click.ClickException(
"User <{}> not found internally, but user_identity <{}> exists, please contact a system administrator"
.format(user_id_external, user_id))
return user

return PIDVersioning(child=child_pid)

@click.group()
def ownership():
"""ownership management commands."""


@ownership.command()
Expand All @@ -174,25 +188,21 @@ def list(record_pid):
@ownership.command()
@with_appcontext
@click.argument('record-pid', required=True, type=str)
@click.argument('user-email', required=True, type=str)
@click.argument('user-email-or-id', required=True, type=str)
@click.option('-q', '--quiet', is_flag=True, default=False)
@click.option('-y', '--yes-i-know', is_flag=True, default=False)
def reset(record_pid, user_email, yes_i_know, quiet):
def reset(record_pid, user_email_or_id, yes_i_know, quiet):
""" Remove the previous ownership and set up the new user as a unique owner for all the version of the record.

:params record-pid: B2rec record PID
user-email: user email
user-email-or-id: user email or b2access ID
"""
if check_user(user_email) is None:
raise click.ClickException(
click.style(
"""User does not exist. Please check the email and try again""", fg="red"))
user = _get_user_by_email_or_id(user_email_or_id)
list_ownership(record_pid)
if yes_i_know or click.confirm(
"Are you sure you want to reset the owership? Previous owners will not be able to access the records anymore.", abort=True):
version_master = find_version_master(record_pid)
all_pids = [v.pid_value for v in version_master.children.all()]
user = User.query.filter(User.email == user_email).one_or_none()
for single_pid in all_pids:
replace_ownership(single_pid, user.id)
if not quiet:
Expand All @@ -203,22 +213,19 @@ def reset(record_pid, user_email, yes_i_know, quiet):
@ownership.command()
@with_appcontext
@click.argument('record-pid', required=True, type=str)
@click.argument('user-email', required=True, type=str)
@click.argument('user-email-or-id', required=True, type=str)
@click.option('-q', '--quiet', is_flag=True, default=False)
def add(record_pid, user_email, quiet):
def add(record_pid, user_email_or_id, quiet):
""" Add user as owner for all the version of the record.

:params record-pid: B2rec record PID
user-email: user email
user-email-or-id: user email or b2access ID
"""
if check_user(user_email) is None:
raise click.ClickException(
"""User does not exist. Please check the email and try again""")
user = _get_user_by_email_or_id(user_email_or_id)
version_master = find_version_master(record_pid)
all_pids = [v.pid_value for v in version_master.children.all()]
user = User.query.filter(User.email == user_email).one_or_none()
for single_pid in all_pids:
record = pid2record(single_pid)
record = get_record_by_pid(single_pid)
add_ownership(record, user.id)
if not quiet:
click.secho("Ownership Updated!", fg='red')
Expand All @@ -228,23 +235,19 @@ def add(record_pid, user_email, quiet):
@ownership.command()
@with_appcontext
@click.argument('record-pid', required=True, type=str)
@click.argument('user-email', required=True, type=str)
@click.argument('user-email-or-id', required=True, type=str)
@click.option('-q', '--quiet', is_flag=True, default=False)
def remove(record_pid, user_email, quiet):
def remove(record_pid, user_email_or_id, quiet):
""" Remove user as an owner of the record.

:params record-pid: B2rec record PID
user-email: user email
user-email-or-id: user email or b2access ID
"""
if check_user(user_email) is None:
raise click.ClickException(
"""User does not exist. Please check the email and try again""")

user = _get_user_by_email_or_id(user_email_or_id)
version_master = find_version_master(record_pid)
all_pids = [v.pid_value for v in version_master.children.all()]
user = User.query.filter(User.email == user_email).one_or_none()
for single_pid in all_pids:
record = pid2record(single_pid)
record = get_record_by_pid(single_pid)
remove_ownership(record, user.id)
if not quiet:
click.secho("Ownership Updated!", fg='red')
Expand All @@ -253,19 +256,15 @@ def remove(record_pid, user_email, quiet):

@ownership.command()
@with_appcontext
@click.argument('user-email', required=True, type=str)
@click.argument('user-email-or-id', required=True, type=str)
@click.option('-t', '--type', type=click.Choice(['deposit', 'record']), required=False)
def find(user_email, type=None):
def find(user_email_or_id, type=None):
""" Find all the records or/and deposits where user is one of the owners.

:params user-email: user email
type: record type (deposit, record)
"""
if check_user(user_email) is None:
raise click.ClickException(
"User <{}> does not exist. Please check the email and try again".format(user_email))
# user that we want to find in the db
user = get_user(user_email=user_email)
user = _get_user_by_email_or_id(user_email_or_id)
if user is not None:
# search using ESSearch class and filtering by type
search = search_es(user, type=type)
Expand All @@ -276,29 +275,21 @@ def find(user_email, type=None):
click.secho(click.style(
"No objs found with owner {}".format(str(user)), fg="red"))

@ownership.command('add-all')
@ownership.command('transfer-add')
@with_appcontext
@click.option('-t', '--type', type=click.Choice(['deposit', 'record']), required=False)
@click.argument('user-email', required=True, type=str)
@click.argument('new-user-email', required=True, type=str)
@click.argument('user-email-or-id', required=True, type=str)
@click.argument('new-user-email-or-id', required=True, type=str)
@patch_current_app_config({'SERVER_NAME': os.environ.get('JSONSCHEMAS_HOST')})
def transfer_add(user_email, new_user_email, type=None):
def transfer_add(user_email_or_id, new_user_email_or_id):
""" Add user to all the records or/and deposits if user is not of the owners already.

:param user-email: user email of the owner of the records/deposits
:param new-user-email: user email of the new owner
:param type: record type (deposit, record)

:params user-email-or-id: user email or b2access id of the old owner
new-user-email-or-id: user email or b2access id of the new owner
"""
if check_user(user_email) is None:
raise click.ClickException(
"User <{}> does not exist. Please check the email and try again".format(user_email))
if check_user(new_user_email) is None:
raise click.ClickException(
"New User <{}> does not exist. Please check the email and try again".format(new_user_email))
# user that we want to replace
user = get_user(user_email=user_email)
# new owner that we want to add
new_user = get_user(user_email=new_user_email)
user = _get_user_by_email_or_id(user_email_or_id)
new_user = _get_user_by_email_or_id(new_user_email_or_id)

changed = False
if user is not None and new_user is not None:
# search using ESSearch class and filtering by type
Expand All @@ -308,7 +299,7 @@ def transfer_add(user_email, new_user_email, type=None):
click.secho(click.style("Initial state:", fg="blue"))
print(to_tabulate(search))

#update values for each record/deposit
# update values for each record/deposit
for id in search.keys():
obj = Deposit.get_record(id)
# try to update
Expand All @@ -330,23 +321,18 @@ def transfer_add(user_email, new_user_email, type=None):
click.secho(click.style(
"It was not possible to update the ownership", fg="red"))


@ownership.command('remove-all')
@with_appcontext
@click.argument('user-email', required=True, type=str)
@click.argument('user-email-or-id', required=True, type=str)
@click.option('-t', '--type', type=click.Choice(['deposit', 'record']), required=False)
@patch_current_app_config({'SERVER_NAME': os.environ.get('JSONSCHEMAS_HOST')})
def transfer_remove(user_email, type=None):
def transfer_remove(user_email_or_id, type=None):
""" remove user to all the records or/and deposits.
:param user-email: user email
:param type: record type (deposit, record)

:params user-email: user email
type: record type (deposit, record)
"""
if check_user(user_email) is None:
raise click.ClickException(
"User <{}> does not exist. Please check the email and try again".format(user_email))
# user that we want to remove
user = get_user(user_email=user_email)
user = _get_user_by_email_or_id(user_email_or_id)
changed = False
if user is not None:
# search using ESSearch class and filtering by type
Expand All @@ -356,7 +342,7 @@ def transfer_remove(user_email, type=None):
click.secho(click.style("Initial state:", fg="blue"))
print(to_tabulate(search))

#update values for each record/deposit
# update values for each record/deposit
for id in search.keys():
obj = Deposit.get_record(id)
# try to delete user
Expand Down Expand Up @@ -385,7 +371,7 @@ def search_es(user, type):
use ESSearch to find obj where user is owner

:params user: User obj
type: filter the query. Possible values (deposit, record or None)
type: filter the query. Possible values (deposit, record or None)
'''

query = 'owners:{} || _deposit.owners:{}'.format(user.id, user.id)
Expand All @@ -402,4 +388,3 @@ def search_es(user, type):
else:
click.secho("The search has returned 0 maches.")
return None

Loading