Skip to content

Commit

Permalink
Refactor transfer_complete_datasets
Browse files Browse the repository at this point in the history
This changes it to only do one globus task rather than one per dataset
  • Loading branch information
Cadair committed Mar 6, 2024
1 parent 788bfd9 commit 300fa04
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 127 deletions.
1 change: 1 addition & 0 deletions changelog/340.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a bug with `dkist.net.transfer_complete_datasets` where a length one ``UnifiedResponse`` would cause an error.
1 change: 1 addition & 0 deletions changelog/340.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
`dkist.net.transfer_complete_datasets` will now only create one Globus task for all datasets it downloads.
55 changes: 32 additions & 23 deletions dkist/net/globus/transfer.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
"""
Functions and helpers for orchestrating and monitoring transfers using Globus.
"""
import copy
import json
import time
import pathlib
Expand All @@ -19,43 +18,50 @@
__all__ = ['watch_transfer_progress', 'start_transfer_from_file_list']


def start_transfer_from_file_list(src_endpoint, dst_endpoint, dst_base_path, file_list,
src_base_path=None, recursive=False, label=None):
def start_transfer_from_file_list(
src_endpoint: str,
dst_endpoint: str,
dst_base_path: PathLike,
file_list: List[PathLike],
src_base_path: PathLike = None,
recursive: Union[bool, List[bool]] = False,
label: str = None
) -> str:
"""
Start a new transfer task for a list of files.
Parameters
----------
src_endpoint : `str`
src_endpoint
The endpoint to copy file from. Can be any identifier accepted by
`~dkist.net.globus.get_endpoint_id`.
dst_endpoint : `str`
dst_endpoint
The endpoint to copy file to. Can be any identifier accepted by
`~dkist.net.globus.get_endpoint_id`.
dst_base_path : `~pathlib.Path`
dst_base_path
The destination path, must be accessible from the endpoint, will be
created if it does not exist.
file_list : `list`
file_list
The list of file paths on the ``src_endpoint`` to transfer to the ``dst_endpoint``.
src_base_path : `~pathlib.Path`, optional
src_base_path
The path prefix on the items in ``file_list`` to be stripped before
copying to ``dst_base_path``. i.e. if the file path in ``path_list`` is
``/spam/eggs/filename.fits`` and ``src_base_path`` is ``/spam`` the
``eggs/`` folder will be copied to ``dst_base_path``. By default only
the filenames are kept, and none of the directories.
recursive : `bool` or `list` of `bool`, optional
recursive
Controls if the path in ``file_list`` is added to the Globus task with
the recursive flag or not.
This should be `True` if the element of ``file_list`` is a directory.
If you need to set this per-item in ``file_list`` it should be a `list`
of `bool` of equal length as ``file_list``.
label : `str`
label
Label for the Globus transfer. If None then a default will be used.
Returns
Expand Down Expand Up @@ -87,17 +93,20 @@ def start_transfer_from_file_list(src_endpoint, dst_endpoint, dst_base_path, fil
sync_level="checksum",
verify_checksum=True)

dst_base_path = pathlib.Path(dst_base_path)
src_file_list = copy.copy(file_list)
dst_file_list = []
for src_file in src_file_list:
# If a common prefix is not specified just copy the filename
if not src_base_path:
src_filepath = src_file.name
else:
# Otherwise use the filepath relative to the base path
src_filepath = src_file.relative_to(src_base_path)
dst_file_list.append(dst_base_path / src_filepath)
src_file_list = file_list
if not isinstance(dst_base_path, (list, tuple)):
dst_base_path = pathlib.Path(dst_base_path)
dst_file_list = []
for src_file in src_file_list:
# If a common prefix is not specified just copy the filename or last directory
if not src_base_path:
src_filepath = src_file.name
else:
# Otherwise use the filepath relative to the base path
src_filepath = src_file.relative_to(src_base_path)
dst_file_list.append(dst_base_path / src_filepath)
else:
dst_file_list = dst_base_path

Check warning on line 109 in dkist/net/globus/transfer.py

View check run for this annotation

Codecov / codecov/patch

dkist/net/globus/transfer.py#L109

Added line #L109 was not covered by tests

for src_file, dst_file, rec in zip(src_file_list, dst_file_list, recursive):
transfer_manifest.add_item(str(src_file), str(dst_file), recursive=rec)
Expand Down Expand Up @@ -266,12 +275,12 @@ def watch_transfer_progress(task_id, tfr_client, poll_interval=5,

def _orchestrate_transfer_task(file_list: List[PathLike],
recursive: List[bool],
destination_path: PathLike = "/~/",
destination_path: Union[PathLike, List[PathLike]] = "/~/",
destination_endpoint: str = None,
*,
progress: Union[bool, Literal["verbose"]] = True,
wait: bool = True,
label=None):
label: str = None):
"""
Transfer the files given in file_list to the path on ``destination_endpoint``.
Expand Down
103 changes: 59 additions & 44 deletions dkist/net/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from sunpy.net.base_client import QueryResponseRow
from sunpy.net.fido_factory import UnifiedResponse

from dkist.net import conf
from dkist.net.attrs import Dataset
from dkist.net.client import DKISTClient, DKISTQueryResponseTable
from dkist.net.globus.transfer import _orchestrate_transfer_task
Expand All @@ -33,6 +34,25 @@ def _get_dataset_inventory(dataset_id: Union[str, Iterable[str]]) -> DKISTQueryR
return results


def _get_globus_path_for_dataset(dataset: QueryResponseRow):
"""
Given a dataset ID get the directory on the source endpoint.
"""
if not isinstance(dataset, QueryResponseRow):
raise TypeError("Input should be a single row of dataset inventory.")

Check warning on line 42 in dkist/net/helpers.py

View check run for this annotation

Codecov / codecov/patch

dkist/net/helpers.py#L42

Added line #L42 was not covered by tests

# At this point we only have one dataset, and it should be a row not a table
dataset_id = dataset["Dataset ID"]
proposal_id = dataset["Primary Proposal ID"]
bucket = dataset["Storage Bucket"]

return Path(conf.dataset_path.format(
datasetId=dataset_id,
primaryProposalId=proposal_id,
bucket=bucket
))


def transfer_complete_datasets(datasets: Union[str, Iterable[str], QueryResponseRow, DKISTQueryResponseTable, UnifiedResponse],
path: PathLike = "/~/",
destination_endpoint: str = None,
Expand All @@ -50,14 +70,13 @@ def transfer_complete_datasets(datasets: Union[str, Iterable[str], QueryResponse
``Fido.search``.
path
The path to save the data in, must be accessible by the Globus
endpoint.
The default value is ``/~/``.
It is possible to put placeholder strings in the path with any key
from the dataset inventory dictionary which can be accessed as
``ds.meta['inventory']``. An example of this would be
``path="~/dkist/{datasetId}"`` to save the files in a folder named
with the dataset ID being downloaded.
The path to save the data in, must be accessible by the Globus endpoint.
The default value is ``/~/``. It is possible to put placeholder strings
in the path with any key from inventory which can be shown with
:meth:`dkist.utils.inventory.path_format_keys`. An example of this
would be ``path="~/dkist/{primary_proposal_id}"`` to save the files in a
folder named for the proposal id. **Note** that ``{dataset_id}`` is
always added to the path if it is not already the last element.
destination_endpoint
A unique specifier for a Globus endpoint. If `None` a local
Expand Down Expand Up @@ -85,18 +104,22 @@ def transfer_complete_datasets(datasets: Union[str, Iterable[str], QueryResponse
The path to the directories containing the dataset(s) on the destination endpoint.
"""
# Avoid circular import
from dkist.net import conf
path = Path(path)
if path.parts[-1] != "{dataset_id}":
path = path / "{dataset_id}"

if isinstance(datasets, (DKISTQueryResponseTable, QueryResponseRow)):
# These we don't have to pre-process
if isinstance(datasets, DKISTQueryResponseTable):
# This we don't have to pre-process
pass

elif isinstance(datasets, QueryResponseRow):
datasets = DKISTQueryResponseTable(datasets)

elif isinstance(datasets, UnifiedResponse):
# If we have a UnifiedResponse object, it could contain one or more dkist tables.
# Stack them and then treat them like we were passed a single table with many rows.
datasets = datasets["dkist"]
if len(datasets) > 1:
if isinstance(datasets, UnifiedResponse) and len(datasets) > 1:
datasets = table.vstack(datasets, metadata_conflicts="silent")

elif isinstance(datasets, str) or all((isinstance(d, str) for d in datasets)):
Expand All @@ -107,45 +130,37 @@ def transfer_complete_datasets(datasets: Union[str, Iterable[str], QueryResponse
# Anything else, error
raise TypeError(f"{type(datasets)} is of an unknown type, it should be search results or one or more dataset IDs.")

if not isinstance(datasets, QueryResponseRow) and len(datasets) > 1:
paths = []
for record in datasets:
paths.append(transfer_complete_datasets(record,
path=path,
destination_endpoint=destination_endpoint,
progress=progress,
wait=wait,
label=label))
return paths

# ensure a length one table is a row
if len(datasets) == 1:
datasets = datasets[0]

# At this point we only have one dataset, and it should be a row not a table
dataset = datasets
dataset_id = dataset["Dataset ID"]
proposal_id = dataset["Primary Proposal ID"]
bucket = dataset["Storage Bucket"]
source_paths = []
for record in datasets:
source_paths.append(_get_globus_path_for_dataset(record))

path_inv = path_format_inventory(dict(dataset))
destination_path = Path(path.format(**path_inv))
destination_paths = []
for dataset in datasets:
dataset_id = dataset["Dataset ID"]
proposal_id = dataset["Primary Proposal ID"]
bucket = dataset["Storage Bucket"]

file_list = [Path(conf.dataset_path.format(
datasetId=dataset_id,
primaryProposalId=proposal_id,
bucket=bucket
))]
path_inv = path_format_inventory(dict(dataset))
destination_paths.append(Path(str(path).format(**path_inv)))

if not label:
now = datetime.datetime.now().strftime("%Y-%m-%dT%H-%M")
datasetids = ','.join(datasets["Dataset ID"])
if len(datasetids) > 80:
datasetids = f"{len(datasets['Dataset ID'])} datasets"

Check warning on line 151 in dkist/net/helpers.py

View check run for this annotation

Codecov / codecov/patch

dkist/net/helpers.py#L151

Added line #L151 was not covered by tests
label = f"DKIST Python Tools - {now} - {datasetids}"

now = datetime.datetime.now().strftime("%Y-%m-%dT%H-%M")
label = f"DKIST Python Tools - {now} {dataset_id}" if label is None else label
# Globus limits labels to 128 characters, so truncate if needed
if len(label) > 128:
label = label[:125] + '...'

Check warning on line 156 in dkist/net/helpers.py

View check run for this annotation

Codecov / codecov/patch

dkist/net/helpers.py#L156

Added line #L156 was not covered by tests

_orchestrate_transfer_task(file_list,
_orchestrate_transfer_task(source_paths,
recursive=True,
destination_path=destination_path,
destination_path=destination_paths,
destination_endpoint=destination_endpoint,
progress=progress,
wait=wait,
label=label)

return destination_path / dataset_id
return destination_paths
Loading

0 comments on commit 300fa04

Please sign in to comment.