Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: irods download refactoring and new generic sodar downloader with preset for dragen data (#226 ) #227

Merged
merged 11 commits into from
Apr 8, 2024
105 changes: 20 additions & 85 deletions cubi_tk/irods/check.py
Original file line number Diff line number Diff line change
@@ -1,68 +1,46 @@
"""``cubi-tk irods check``: Check target iRODS collection (all md5 files? metadata md5 consistent? enough replicas?)."""

import argparse
from contextlib import contextmanager
import json
from multiprocessing.pool import ThreadPool
import os
import re
import typing

from irods.collection import iRODSCollection
from irods.column import Like
from irods.data_object import iRODSDataObject
from irods.models import Collection as CollectionModel
from irods.models import DataObject as DataObjectModel
from irods.session import iRODSSession
from logzero import logger
import tqdm

from ..irods_common import DEFAULT_HASH_SCHEME, HASH_SCHEMES, iRODSRetrieveCollection

MIN_NUM_REPLICAS = 2
NUM_PARALLEL_TESTS = 4
NUM_DISPLAY_FILES = 20
HASH_SCHEMES = {
"MD5": {"regex": re.compile(r"[0-9a-fA-F]{32}")},
"SHA256": {"regex": re.compile(r"[0-9a-fA-F]{64}")},
}
DEFAULT_HASH_SCHEME = "MD5"


class IrodsCheckCommand:
class IrodsCheckCommand(iRODSRetrieveCollection):
"""Implementation of iRDOS check command."""

command_name = "check"

def __init__(self, args):
#: Command line arguments.
self.args = args
def __init__(self, args, hash_scheme=DEFAULT_HASH_SCHEME, ask=False, irods_env_path=None):
"""Constructor.

#: Path to iRODS environment file
self.irods_env_path = os.path.join(
os.path.expanduser("~"), ".irods", "irods_environment.json"
)
:param args: argparse object with command line arguments.
:type args: argparse.Namespace

#: iRODS environment
self.irods_env = None
:param hash_scheme: iRODS hash scheme, default MD5.
:type hash_scheme: str, optional

def _init_irods(self):
"""Connect to iRODS."""
try:
return iRODSSession(irods_env_file=self.irods_env_path)
except Exception as e:
logger.error("iRODS connection failed: %s", self.get_irods_error(e))
logger.error("Are you logged in? try 'iinit'")
raise
:param ask: Confirm with user before certain actions.
:type ask: bool, optional

@contextmanager
def _get_irods_sessions(self, count=NUM_PARALLEL_TESTS):
if count < 1:
count = 1
irods_sessions = [self._init_irods() for _ in range(count)]
try:
yield irods_sessions
finally:
for irods in irods_sessions:
irods.cleanup()
:param irods_env_path: Path to irods_environment.json
:type irods_env_path: pathlib.Path, optional
"""
super.__init__(hash_scheme, ask, irods_env_path)

Check warning on line 41 in cubi_tk/irods/check.py

View check run for this annotation

Codecov / codecov/patch

cubi_tk/irods/check.py#L41

Added line #L41 was not covered by tests
#: Command line arguments.
self.args = args

Check warning on line 43 in cubi_tk/irods/check.py

View check run for this annotation

Codecov / codecov/patch

cubi_tk/irods/check.py#L43

Added line #L43 was not covered by tests

@classmethod
def setup_argparse(cls, parser: argparse.ArgumentParser) -> None:
Expand Down Expand Up @@ -100,40 +78,6 @@
)
parser.add_argument("irods_path", help="Path to an iRODS collection.")

@classmethod
def get_irods_error(cls, e: Exception):
"""Return logger friendly iRODS exception."""
es = str(e)
return es if es != "None" else e.__class__.__name__

def get_data_objs(
self, root_coll: iRODSCollection
) -> typing.Dict[
str, typing.Union[typing.Dict[str, iRODSDataObject], typing.List[iRODSDataObject]]
]:
"""Get data objects recursively under the given iRODS path."""
data_objs = dict(files=[], checksums={})
ignore_schemes = [k.lower() for k in HASH_SCHEMES if k != self.args.hash_scheme.upper()]
irods_sess = root_coll.manager.sess

query = irods_sess.query(DataObjectModel, CollectionModel).filter(
Like(CollectionModel.name, f"{root_coll.path}%")
)

for res in query:
# If the 'res' dict is not split into Colllection&Object the resulting iRODSDataObject is not fully functional, likely because a name/path/... attribute is overwritten somewhere
coll_res = {k: v for k, v in res.items() if k.icat_id >= 500}
obj_res = {k: v for k, v in res.items() if k.icat_id < 500}
coll = iRODSCollection(root_coll.manager, coll_res)
obj = iRODSDataObject(irods_sess.data_objects, parent=coll, results=[obj_res])

if obj.path.endswith("." + self.args.hash_scheme.lower()):
data_objs["checksums"][obj.path] = obj
elif obj.path.split(".")[-1] not in ignore_schemes:
data_objs["files"].append(obj)

return data_objs

def check_args(self, _args):
# Check hash scheme
if _args.hash_scheme.upper() not in HASH_SCHEMES:
Expand Down Expand Up @@ -170,18 +114,9 @@
logger.info("iRODS environment: %s", irods_env)

# Connect to iRODS
with self._get_irods_sessions(self.args.num_parallel_tests) as irods_sessions:
try:
root_coll = irods_sessions[0].collections.get(self.args.irods_path)
logger.info(
"{} iRODS connection{} initialized".format(
len(irods_sessions), "s" if len(irods_sessions) != 1 else ""
)
)
except Exception as e:
logger.error("Failed to retrieve iRODS path: %s", self.get_irods_error(e))
raise

with self.session as irods_session:
root_coll = irods_session.collections.get(self.args.irods_path)

Check warning on line 118 in cubi_tk/irods/check.py

View check run for this annotation

Codecov / codecov/patch

cubi_tk/irods/check.py#L117-L118

Added lines #L117 - L118 were not covered by tests
logger.info("1 iRODS connection initialized")
# Get files and run checks
logger.info("Querying for data objects")
data_objs = self.get_data_objs(root_coll)
Expand Down
127 changes: 124 additions & 3 deletions cubi_tk/irods_common.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,23 @@
from collections import defaultdict
import getpass
import os.path
from pathlib import Path
from typing import Iterable
import re
from typing import Dict, Iterable, List, Union

import attrs
from irods.collection import iRODSCollection
from irods.column import Like
from irods.data_object import iRODSDataObject
from irods.exception import (
CAT_INVALID_AUTHENTICATION,
CAT_INVALID_USER,
CAT_PASSWORD_EXPIRED,
PAM_AUTH_PASSWORD_FAILED,
)
from irods.keywords import FORCE_FLAG_KW
from irods.models import Collection as CollectionModel
from irods.models import DataObject as DataObjectModel
from irods.password_obfuscation import encode
from irods.session import NonAnonymousLoginWithoutPassword, iRODSSession
import logzero
Expand All @@ -20,6 +28,13 @@
formatter = logzero.LogFormatter(fmt="%(message)s")
output_logger = logzero.setup_logger(formatter=formatter)

#: Default hash scheme. Although iRODS provides alternatives, the whole of `snappy` pipeline uses MD5.
HASH_SCHEMES = {
"MD5": {"regex": re.compile(r"[0-9a-fA-F]{32}")},
"SHA256": {"regex": re.compile(r"[0-9a-fA-F]{64}")},
}
DEFAULT_HASH_SCHEME = "MD5"


@attrs.frozen(auto_attribs=True)
class TransferJob:
Expand Down Expand Up @@ -219,14 +234,18 @@
logger.error("Problem during iRODS checksumming.")
logger.error(self.get_irods_error(e))

def get(self):
def get(self, force_overwrite: bool = False):
"""Download files from SODAR."""
with self.session as session:
self.__jobs = [
attrs.evolve(job, bytes=session.data_objects.get(job.path_remote).size)
for job in self.__jobs
]
self.__total_bytes = sum([job.bytes for job in self.__jobs])

kw_options = {}
if force_overwrite:
kw_options = {FORCE_FLAG_KW: None} # Keyword has no value, just needs to be present

Check warning on line 248 in cubi_tk/irods_common.py

View check run for this annotation

Codecov / codecov/patch

cubi_tk/irods_common.py#L248

Added line #L248 was not covered by tests
# Double tqdm for currently transferred file info
with (
tqdm(
Expand All @@ -242,13 +261,115 @@
file_log.set_description_str(
f"File [{n + 1}/{len(self.__jobs)}]: {Path(job.path_local).name}"
)
if os.path.exists(job.path_local) and not force_overwrite: # pragma: no cover
logger.info(
f"{Path(job.path_local).name} already exists. Skipping, use force_overwrite to re-download."
)
continue
try:
Path(job.path_local).parent.mkdir(parents=True, exist_ok=True)
with self.session as session:
session.data_objects.get(job.path_remote, job.path_local)
session.data_objects.get(job.path_remote, job.path_local, **kw_options)
t.update(job.bytes)
except FileNotFoundError: # pragma: no cover
raise
except Exception as e: # pragma: no cover
logger.error(f"Problem during transfer of {job.path_remote}")
logger.error(self.get_irods_error(e))
t.clear()


class iRODSRetrieveCollection(iRODSCommon):
"""Class retrieves iRODS Collection associated with Assay"""

def __init__(
self, hash_scheme: str = DEFAULT_HASH_SCHEME, ask: bool = False, irods_env_path: Path = None
):
"""Constructor.

:param hash_scheme: iRODS hash scheme, default MD5.
:type hash_scheme: str, optional

:param ask: Confirm with user before certain actions.
:type ask: bool, optional

:param irods_env_path: Path to irods_environment.json
:type irods_env_path: pathlib.Path, optional
"""
super().__init__(ask, irods_env_path)
self.hash_scheme = hash_scheme

def retrieve_irods_data_objects(self, irods_path: str) -> Dict[str, List[iRODSDataObject]]:
"""Retrieve data objects from iRODS.

:param irods_path: iRODS path.

:return: Returns dictionary representation of iRODS collection information. Key: File name in iRODS (str);
Value: list of iRODSDataObject (native python-irodsclient object).
"""

# Connect to iRODS
with self.session as session:
try:
root_coll = session.collections.get(irods_path)

# Get files and run checks
logger.info("Querying for data objects")

if root_coll is not None:
irods_data_objs = self._irods_query(session, root_coll)
irods_obj_dict = self.parse_irods_collection(irods_data_objs)
return irods_obj_dict

except Exception as e: # pragma: no cover
logger.error("Failed to retrieve iRODS path: %s", self.get_irods_error(e))
raise

return {}

Check warning on line 328 in cubi_tk/irods_common.py

View check run for this annotation

Codecov / codecov/patch

cubi_tk/irods_common.py#L328

Added line #L328 was not covered by tests

def _irods_query(
self,
session: iRODSSession,
root_coll: iRODSCollection,
) -> Dict[str, Union[Dict[str, iRODSDataObject], List[iRODSDataObject]]]:
"""Get data objects recursively under the given iRODS path."""

ignore_schemes = [k.lower() for k in HASH_SCHEMES if k != self.hash_scheme.upper()]

Check warning on line 337 in cubi_tk/irods_common.py

View check run for this annotation

Codecov / codecov/patch

cubi_tk/irods_common.py#L337

Added line #L337 was not covered by tests

query = session.query(DataObjectModel, CollectionModel).filter(

Check warning on line 339 in cubi_tk/irods_common.py

View check run for this annotation

Codecov / codecov/patch

cubi_tk/irods_common.py#L339

Added line #L339 was not covered by tests
Like(CollectionModel.name, f"{root_coll.path}%")
)

data_objs = dict(files=[], checksums={})
for res in query:

Check warning on line 344 in cubi_tk/irods_common.py

View check run for this annotation

Codecov / codecov/patch

cubi_tk/irods_common.py#L343-L344

Added lines #L343 - L344 were not covered by tests
# If the 'res' dict is not split into Colllection&Object the resulting iRODSDataObject is not fully functional,
# likely because a name/path/... attribute is overwritten somewhere
coll_res = {k: v for k, v in res.items() if k.icat_id >= 500}
obj_res = {k: v for k, v in res.items() if k.icat_id < 500}
Nicolai-vKuegelgen marked this conversation as resolved.
Show resolved Hide resolved
coll = iRODSCollection(root_coll.manager, coll_res)
obj = iRODSDataObject(session.data_objects, parent=coll, results=[obj_res])

Check warning on line 350 in cubi_tk/irods_common.py

View check run for this annotation

Codecov / codecov/patch

cubi_tk/irods_common.py#L347-L350

Added lines #L347 - L350 were not covered by tests

if obj.path.endswith("." + self.hash_scheme.lower()):
data_objs["checksums"][obj.path] = obj
elif obj.path.split(".")[-1] not in ignore_schemes:
Nicolai-vKuegelgen marked this conversation as resolved.
Show resolved Hide resolved
data_objs["files"].append(obj)

Check warning on line 355 in cubi_tk/irods_common.py

View check run for this annotation

Codecov / codecov/patch

cubi_tk/irods_common.py#L352-L355

Added lines #L352 - L355 were not covered by tests

return data_objs

Check warning on line 357 in cubi_tk/irods_common.py

View check run for this annotation

Codecov / codecov/patch

cubi_tk/irods_common.py#L357

Added line #L357 was not covered by tests

@staticmethod
def parse_irods_collection(irods_data_objs) -> Dict[str, List[iRODSDataObject]]:
Nicolai-vKuegelgen marked this conversation as resolved.
Show resolved Hide resolved
"""Parse iRODS collection

:param irods_data_objs: iRODS collection.
:type irods_data_objs: dict

:return: Returns dictionary representation of iRODS collection information. Key: File name in iRODS (str);
Value: list of iRODSDataObject (native python-irodsclient object).
"""
# Initialise variables
output_dict = defaultdict(list)

for obj in irods_data_objs["files"]:
output_dict[obj.name].append(obj)

return output_dict
7 changes: 2 additions & 5 deletions cubi_tk/snappy/check_remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,14 @@
from collections import defaultdict
import os
from pathlib import Path
from types import SimpleNamespace
import typing

from biomedsheets import shortcuts
from logzero import logger

from ..common import load_toml_config
from ..sodar_common import RetrieveSodarCollection
from .common import get_biomedsheet_path, load_sheet_tsv
from .retrieve_irods_collection import DEFAULT_HASH_SCHEME, RetrieveIrodsCollection


class FindFilesCommon:
Expand Down Expand Up @@ -684,9 +683,7 @@
variant_caller_class = VariantCallingChecker

# Find all remote files (iRODS)
pseudo_args = SimpleNamespace(hash_scheme=DEFAULT_HASH_SCHEME)
library_remote_files_dict = RetrieveIrodsCollection(
pseudo_args,
library_remote_files_dict = RetrieveSodarCollection(

Check warning on line 686 in cubi_tk/snappy/check_remote.py

View check run for this annotation

Codecov / codecov/patch

cubi_tk/snappy/check_remote.py#L686

Added line #L686 was not covered by tests
self.args.sodar_url,
self.args.sodar_api_token,
self.args.assay_uuid,
Expand Down
Loading
Loading