Skip to content

Commit

Permalink
feat(utils): add include_shared_workflows parameter (reanahub#216)
Browse files Browse the repository at this point in the history
Adds new `include_shared_workflows` parameter to optionally retrieve
unowned workflows that are shared with the user.

Closes reanahub/reana-client#687
  • Loading branch information
DaanRosendal committed Mar 18, 2024
1 parent cc0c168 commit 7f4fabe
Showing 1 changed file with 99 additions and 39 deletions.
138 changes: 99 additions & 39 deletions reana_db/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,14 @@
from typing import Optional
from uuid import UUID

from sqlalchemy import inspect, func
from sqlalchemy.orm import defer
from reana_commons.utils import get_disk_usage
from reana_commons.errors import REANAMissingWorkspaceError

from reana_commons.utils import get_disk_usage
from reana_db.config import (
PERIODIC_RESOURCE_QUOTA_UPDATE_POLICY,
WORKFLOW_TERMINATION_QUOTA_UPDATE_POLICY,
)
from sqlalchemy import func, inspect
from sqlalchemy.orm import defer


def build_workspace_path(user_id, workflow_id=None, workspace_root_path=None):
Expand Down Expand Up @@ -60,7 +59,9 @@ def split_run_number(run_number):
return int(run_number), 0


def _get_workflow_with_uuid_or_name(uuid_or_name, user_uuid):
def _get_workflow_with_uuid_or_name(
uuid_or_name, user_uuid, include_shared_workflows=False
):
"""Get Workflow from database with uuid or name.
:param uuid_or_name: String representing a valid UUIDv4 or valid
Expand All @@ -78,7 +79,8 @@ def _get_workflow_with_uuid_or_name(uuid_or_name, user_uuid):
:rtype: reana-db.models.Workflow
"""
from reana_db.models import Workflow

from reana_db.models import UserWorkflow, Workflow

# Check existence
if not uuid_or_name:
Expand All @@ -101,7 +103,7 @@ def _get_workflow_with_uuid_or_name(uuid_or_name, user_uuid):
if is_uuid:
# `uuid_or_name` is an UUIDv4.
# Search with it since it is expected to be unique.
return _get_workflow_by_uuid(uuid_or_name, user_uuid)
return _get_workflow_by_uuid(uuid_or_name, user_uuid, include_shared_workflows)

else:
# `uuid_or_name` is not and UUIDv4. Expect it is a name.
Expand All @@ -127,13 +129,17 @@ def _get_workflow_with_uuid_or_name(uuid_or_name, user_uuid):
except ValueError:
# Couldn't split. Probably not a dot-separated string.
# -> Search with `uuid_or_name`
return _get_workflow_by_name(uuid_or_name, user_uuid)
return _get_workflow_by_name(
uuid_or_name, user_uuid, include_shared_workflows
)

# Check if `run_number` was specified
if not run_number:
# No `run_number` specified.
# -> Search by `workflow_name`
return _get_workflow_by_name(workflow_name, user_uuid)
return _get_workflow_by_name(
workflow_name, user_uuid, include_shared_workflows
)

# `run_number` was specified.
try:
Expand All @@ -142,43 +148,85 @@ def _get_workflow_with_uuid_or_name(uuid_or_name, user_uuid):
# The specified `run_number` is not valid.
# Assume that this string is the name of
# the workflow and search with it.
return _get_workflow_by_name(uuid_or_name, user_uuid)
return _get_workflow_by_name(
uuid_or_name, user_uuid, include_shared_workflows
)

# `run_number` is valid.

# Search by `run_number_major` and `run_number_minor`, since it is a primary key.
workflow = Workflow.query.filter(
Workflow.name == workflow_name,
Workflow.run_number_major == run_number_major,
Workflow.run_number_minor == run_number_minor,
Workflow.owner_id == user_uuid,
).one_or_none()
if include_shared_workflows:
workflow = (
Workflow.query.outerjoin(
UserWorkflow, UserWorkflow.workflow_id == Workflow.id_
)
.filter(
(Workflow.name == workflow_name)
& (Workflow.run_number_major == run_number_major)
& (Workflow.run_number_minor == run_number_minor)
& (
(Workflow.owner_id == user_uuid)
| (UserWorkflow.user_id == user_uuid)
)
)
.one_or_none()
)
else:
workflow = Workflow.query.filter(
Workflow.name == workflow_name,
Workflow.run_number_major == run_number_major,
Workflow.run_number_minor == run_number_minor,
Workflow.owner_id == user_uuid,
).one_or_none()

if not workflow:
raise ValueError(
"REANA_WORKON is set to {0}, but "
f"REANA_WORKON is set to {workflow_name}.{str(int(run_number))}, but "
"that workflow does not exist. "
"Please set your REANA_WORKON environment "
"variable appropriately.".format(workflow_name)
"variable appropriately."
)

return workflow


def _get_workflow_by_name(workflow_name, user_uuid):
def _get_workflow_by_name(workflow_name, user_uuid, include_shared_workflows=False):
"""From Workflows named as `workflow_name` the latest run_number.
Only use when you are sure that workflow_name is not UUIDv4.
:rtype: reana-db.models.Workflow
"""
from reana_db.models import Workflow
from reana_db.models import UserWorkflow, Workflow

workflow = (
Workflow.query.filter(
Workflow.name == workflow_name, Workflow.owner_id == user_uuid
if include_shared_workflows:
workflow = (
Workflow.query.outerjoin(
UserWorkflow, Workflow.id_ == UserWorkflow.workflow_id
)
.filter(
(Workflow.name == workflow_name)
& (
(Workflow.owner_id == user_uuid)
| (UserWorkflow.user_id == user_uuid)
)
)
.order_by(
Workflow.run_number_major.desc(), Workflow.run_number_minor.desc()
)
.first()
)
.order_by(Workflow.run_number_major.desc(), Workflow.run_number_minor.desc())
.first()
)
else:
workflow = (
Workflow.query.filter(
Workflow.name == workflow_name, Workflow.owner_id == user_uuid
)
.order_by(
Workflow.run_number_major.desc(), Workflow.run_number_minor.desc()
)
.first()
)

if not workflow:
raise ValueError(
"REANA_WORKON is set to {0}, but "
Expand All @@ -189,7 +237,7 @@ def _get_workflow_by_name(workflow_name, user_uuid):
return workflow


def _get_workflow_by_uuid(workflow_uuid, user_uuid):
def _get_workflow_by_uuid(workflow_uuid, user_uuid, include_shared_workflows=False):
"""Get Workflow with UUIDv4.
:param workflow_uuid: UUIDv4 of a Workflow.
Expand All @@ -198,11 +246,28 @@ def _get_workflow_by_uuid(workflow_uuid, user_uuid):
:rtype: reana-db.models.Workflow
"""
from reana_db.models import Workflow

workflow = Workflow.query.filter(
Workflow.id_ == workflow_uuid, Workflow.owner_id == user_uuid
).first()
from reana_db.models import UserWorkflow, Workflow

if include_shared_workflows:
workflow = (
Workflow.query.outerjoin(
UserWorkflow, Workflow.id_ == UserWorkflow.workflow_id
)
.filter(
(Workflow.id_ == workflow_uuid)
& (
(Workflow.owner_id == user_uuid)
| (UserWorkflow.user_id == user_uuid)
)
)
.first()
)
else:
workflow = Workflow.query.filter(
Workflow.id_ == workflow_uuid, Workflow.owner_id == user_uuid
).first()

if not workflow:
raise ValueError(
"REANA_WORKON is set to {0}, but "
Expand Down Expand Up @@ -322,11 +387,11 @@ def update_users_disk_quota(
"""
from reana_db.database import Session
from reana_db.models import (
Workflow,
WorkflowResource,
ResourceType,
User,
UserResource,
Workflow,
WorkflowResource,
)

if not override_policy_checks and should_skip_quota_update(ResourceType.disk):
Expand Down Expand Up @@ -382,12 +447,7 @@ def update_workflow_cpu_quota(workflow) -> int:
:return: Workflow running time in milliseconds if workflow has terminated, else 0.
"""
from reana_db.database import Session

from reana_db.models import (
ResourceType,
UserResource,
WorkflowResource,
)
from reana_db.models import ResourceType, UserResource, WorkflowResource

if should_skip_quota_update(ResourceType.cpu):
return
Expand Down

0 comments on commit 7f4fabe

Please sign in to comment.