Refactor transfer_complete_datasets for one transfer job (#340)
* Refactor `transfer_complete_datasets`

This changes it to submit only one Globus task rather than one per dataset

* Add test to cover error catching in net helpers

* Cover more error catching

* Well that didn't work

* Add test for label on download of many datasets at once

* More test coverage

---------

Co-authored-by: Drew Leonard <[email protected]>
Cadair and SolarDrew authored Sep 11, 2024
1 parent 983c1f1 commit 7f186b5
Showing 7 changed files with 185 additions and 127 deletions.
1 change: 1 addition & 0 deletions changelog/340.bugfix.rst
@@ -0,0 +1 @@
Fix a bug with `dkist.net.transfer_complete_datasets` where a length-one ``UnifiedResponse`` would cause an error.
1 change: 1 addition & 0 deletions changelog/340.feature.rst
@@ -0,0 +1 @@
`dkist.net.transfer_complete_datasets` will now only create one Globus task for all datasets it downloads.
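
For orientation, a minimal usage sketch of the new behaviour (the dataset IDs and path layout below are hypothetical, and a Globus login is needed to actually run it):

from dkist.net import transfer_complete_datasets

# Both datasets are fetched with a single Globus transfer task; placeholders such
# as {primary_proposal_id} are filled in from dataset inventory.
paths = transfer_complete_datasets(
    ["AAAAA", "BBBBB"],                    # hypothetical dataset IDs
    path="~/dkist/{primary_proposal_id}",
    wait=False,                            # return once the task is submitted
)
# `paths` is a list with one destination directory per dataset.
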
14 changes: 14 additions & 0 deletions dkist/net/globus/tests/test_transfer.py
@@ -107,6 +107,20 @@ def test_start_transfer_src_base(mocker, transfer_client, mock_endpoints):
assert f"{os.path.sep}b{os.path.sep}" + filepath.name == tfr["destination_path"]


def test_start_transfer_multiple_paths(mocker, transfer_client, mock_endpoints):
submit_mock = mocker.patch("globus_sdk.TransferClient.submit_transfer",
return_value={"task_id": "task_id"})
mocker.patch("globus_sdk.TransferClient.get_submission_id",
return_value={"value": "wibble"})
file_list = list(map(Path, ["/a/name.fits", "/a/name2.fits"]))
dst_list = list(map(Path, ["/aplace/newname.fits", "/anotherplace/newname2.fits"]))
start_transfer_from_file_list("a", "b", dst_list, file_list)
transfer_manifest = submit_mock.call_args_list[0][0][0]["DATA"]

for filepath, tfr in zip(dst_list, transfer_manifest):
assert str(filepath) == tfr["destination_path"]


def test_process_event_list(transfer_client, mock_task_event_list):
(events,
json_events,
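
The new test above exercises passing an explicit list of destination paths; a hedged sketch of that call pattern outside the test suite (endpoint names and paths are invented, and a real submission requires Globus authentication):

from pathlib import Path
from dkist.net.globus.transfer import start_transfer_from_file_list

src_files = [Path("/a/name.fits"), Path("/a/name2.fits")]
dst_files = [Path("/aplace/newname.fits"), Path("/anotherplace/newname2.fits")]

# One destination per source file; each pair becomes one item in the Globus manifest.
task_id = start_transfer_from_file_list("source-endpoint", "destination-endpoint",
                                        dst_files, src_files)
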
55 changes: 32 additions & 23 deletions dkist/net/globus/transfer.py
@@ -1,7 +1,6 @@
"""
Functions and helpers for orchestrating and monitoring transfers using Globus.
"""
import copy
import json
import time
import pathlib
@@ -19,43 +18,50 @@
__all__ = ["watch_transfer_progress", "start_transfer_from_file_list"]


def start_transfer_from_file_list(src_endpoint, dst_endpoint, dst_base_path, file_list,
src_base_path=None, recursive=False, label=None):
def start_transfer_from_file_list(
src_endpoint: str,
dst_endpoint: str,
dst_base_path: PathLike,
file_list: list[PathLike],
src_base_path: PathLike = None,
recursive: bool | list[bool] = False,
label: str = None
) -> str:
"""
Start a new transfer task for a list of files.
Parameters
----------
src_endpoint : `str`
src_endpoint
The endpoint to copy file from. Can be any identifier accepted by
`~dkist.net.globus.get_endpoint_id`.
dst_endpoint : `str`
dst_endpoint
The endpoint to copy file to. Can be any identifier accepted by
`~dkist.net.globus.get_endpoint_id`.
dst_base_path : `~pathlib.Path`
dst_base_path
The destination path; it must be accessible from the endpoint and will be
created if it does not exist.
file_list : `list`
file_list
The list of file paths on the ``src_endpoint`` to transfer to the ``dst_endpoint``.
src_base_path : `~pathlib.Path`, optional
src_base_path
The path prefix on the items in ``file_list`` to be stripped before
copying to ``dst_base_path``, i.e. if the file path in ``file_list`` is
``/spam/eggs/filename.fits`` and ``src_base_path`` is ``/spam`` the
``eggs/`` folder will be copied to ``dst_base_path``. By default only
the filenames are kept, and none of the directories.
recursive : `bool` or `list` of `bool`, optional
recursive
Controls if the path in ``file_list`` is added to the Globus task with
the recursive flag or not.
This should be `True` if the element of ``file_list`` is a directory.
If you need to set this per-item in ``file_list`` it should be a `list`
of `bool` of equal length as ``file_list``.
label : `str`
label
Label for the Globus transfer. If None then a default will be used.
Returns
@@ -87,17 +93,20 @@ def start_transfer_from_file_list(src_endpoint, dst_endpoint, dst_base_path, fil
sync_level="checksum",
verify_checksum=True)

dst_base_path = pathlib.Path(dst_base_path)
src_file_list = copy.copy(file_list)
dst_file_list = []
for src_file in src_file_list:
# If a common prefix is not specified just copy the filename
if not src_base_path:
src_filepath = src_file.name
else:
# Otherwise use the filepath relative to the base path
src_filepath = src_file.relative_to(src_base_path)
dst_file_list.append(dst_base_path / src_filepath)
src_file_list = file_list
if not isinstance(dst_base_path, (list, tuple)):
dst_base_path = pathlib.Path(dst_base_path)
dst_file_list = []
for src_file in src_file_list:
# If a common prefix is not specified just copy the filename or last directory
if not src_base_path:
src_filepath = src_file.name
else:
# Otherwise use the filepath relative to the base path
src_filepath = src_file.relative_to(src_base_path)
dst_file_list.append(dst_base_path / src_filepath)
else:
dst_file_list = dst_base_path

for src_file, dst_file, rec in zip(src_file_list, dst_file_list, recursive):
transfer_manifest.add_item(str(src_file), str(dst_file), recursive=rec)
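
As a quick illustration of the destination-path logic in this hunk (pure pathlib, no Globus calls; the paths are made up):

from pathlib import Path

src_file = Path("/spam/eggs/filename.fits")
dst_base = Path("/data")

# With src_base_path="/spam" the structure below the base path is preserved:
dst_base / src_file.relative_to("/spam")   # Path('/data/eggs/filename.fits')

# Without src_base_path only the filename is kept:
dst_base / src_file.name                   # Path('/data/filename.fits')

# If dst_base_path is already a list it is used verbatim, one destination per source file.
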
@@ -265,12 +274,12 @@ def watch_transfer_progress(task_id, tfr_client, poll_interval=5,

def _orchestrate_transfer_task(file_list: list[PathLike],
recursive: list[bool],
destination_path: PathLike = "/~/",
destination_path: PathLike | list[PathLike] = "/~/",
destination_endpoint: str = None,
*,
progress: bool | Literal["verbose"] = True,
wait: bool = True,
label=None):
label: str = None):
"""
Transfer the files given in file_list to the path on ``destination_endpoint``.
104 changes: 60 additions & 44 deletions dkist/net/helpers.py
@@ -13,6 +13,7 @@
from sunpy.net.base_client import QueryResponseRow
from sunpy.net.fido_factory import UnifiedResponse

from dkist.net import conf
from dkist.net.attrs import Dataset
from dkist.net.client import DKISTClient, DKISTQueryResponseTable
from dkist.net.globus.transfer import _orchestrate_transfer_task
@@ -35,6 +36,25 @@ def _get_dataset_inventory(dataset_id: str | Iterable[str]) -> DKISTQueryRespons
return results


def _get_globus_path_for_dataset(dataset: QueryResponseRow):
"""
Given a single row of dataset inventory, get the directory on the source endpoint.
"""
if not isinstance(dataset, QueryResponseRow):
raise TypeError("Input should be a single row of dataset inventory.")

# At this point we only have one dataset, and it should be a row not a table
dataset_id = dataset["Dataset ID"]
proposal_id = dataset["Primary Proposal ID"]
bucket = dataset["Storage Bucket"]

return Path(conf.dataset_path.format(
datasetId=dataset_id,
primaryProposalId=proposal_id,
bucket=bucket
))
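
# --- Illustrative sketch, not part of this diff ------------------------------
# Assuming conf.dataset_path uses a template such as the one below (check
# dkist.net.conf for the real value), a hypothetical inventory row maps to a
# single source directory on the data-centre endpoint:
template = "/{bucket}/{primaryProposalId}/{datasetId}"   # assumed template
template.format(bucket="data", primaryProposalId="pid_1_23", datasetId="AAAAA")
# -> "/data/pid_1_23/AAAAA"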


def transfer_complete_datasets(datasets: str | Iterable[str] | QueryResponseRow | DKISTQueryResponseTable | UnifiedResponse,
path: PathLike = "/~/",
destination_endpoint: str = None,
@@ -52,14 +72,13 @@ def transfer_complete_datasets(datasets: str | Iterable[str] | QueryResponseRow
``Fido.search``.
path
The path to save the data in, must be accessible by the Globus
endpoint.
The default value is ``/~/``.
It is possible to put placeholder strings in the path with any key
from the dataset inventory dictionary which can be accessed as
``ds.meta['inventory']``. An example of this would be
``path="~/dkist/{datasetId}"`` to save the files in a folder named
with the dataset ID being downloaded.
The path to save the data in, must be accessible by the Globus endpoint.
The default value is ``/~/``. It is possible to put placeholder strings
in the path with any key from inventory which can be shown with
:meth:`dkist.utils.inventory.path_format_keys`. An example of this
would be ``path="~/dkist/{primary_proposal_id}"`` to save the files in a
folder named for the proposal id. **Note** that ``{dataset_id}`` is
always added to the path if it is not already the last element.
destination_endpoint
A unique specifier for a Globus endpoint. If `None` a local
@@ -87,18 +106,22 @@ def transfer_complete_datasets(datasets: str | Iterable[str] | QueryResponseRow
The path to the directories containing the dataset(s) on the destination endpoint.
"""
# Avoid circular import
from dkist.net import conf
path = Path(path)
if path.parts[-1] != "{dataset_id}":
path = path / "{dataset_id}"

if isinstance(datasets, (DKISTQueryResponseTable, QueryResponseRow)):
# These we don't have to pre-process
if isinstance(datasets, DKISTQueryResponseTable):
# This we don't have to pre-process
pass

elif isinstance(datasets, QueryResponseRow):
datasets = DKISTQueryResponseTable(datasets)

elif isinstance(datasets, UnifiedResponse):
# If we have a UnifiedResponse object, it could contain one or more dkist tables.
# Stack them and then treat them like we were passed a single table with many rows.
datasets = datasets["dkist"]
if len(datasets) > 1:
if isinstance(datasets, UnifiedResponse) and len(datasets) > 1:
datasets = table.vstack(datasets, metadata_conflicts="silent")

elif isinstance(datasets, str) or all(isinstance(d, str) for d in datasets):
@@ -109,45 +132,38 @@ def transfer_complete_datasets(datasets: str | Iterable[str] | QueryResponseRow
# Anything else, error
raise TypeError(f"{type(datasets)} is of an unknown type, it should be search results or one or more dataset IDs.")

if not isinstance(datasets, QueryResponseRow) and len(datasets) > 1:
paths = []
for record in datasets:
paths.append(transfer_complete_datasets(record,
path=path,
destination_endpoint=destination_endpoint,
progress=progress,
wait=wait,
label=label))
return paths

# ensure a length one table is a row
if len(datasets) == 1:
datasets = datasets[0]

# At this point we only have one dataset, and it should be a row not a table
dataset = datasets
dataset_id = dataset["Dataset ID"]
proposal_id = dataset["Primary Proposal ID"]
bucket = dataset["Storage Bucket"]
source_paths = []
for record in datasets:
source_paths.append(_get_globus_path_for_dataset(record))

path_inv = path_format_inventory(dict(dataset))
destination_path = Path(path.format(**path_inv))
destination_paths = []
for dataset in datasets:
dataset_id = dataset["Dataset ID"]
proposal_id = dataset["Primary Proposal ID"]
bucket = dataset["Storage Bucket"]

file_list = [Path(conf.dataset_path.format(
datasetId=dataset_id,
primaryProposalId=proposal_id,
bucket=bucket
))]
path_inv = path_format_inventory(dict(dataset))
destination_paths.append(Path(str(path).format(**path_inv)))

if not label:
now = datetime.datetime.now().strftime("%Y-%m-%dT%H-%M")
datasetids = ",".join(datasets["Dataset ID"])
if len(datasetids) > 80:
datasetids = f"{len(datasets['Dataset ID'])} datasets"
label = f"DKIST Python Tools - {now} - {datasetids}"

now = datetime.datetime.now().strftime("%Y-%m-%dT%H-%M")
label = f"DKIST Python Tools - {now} {dataset_id}" if label is None else label
# Globus limits labels to 128 characters, so truncate if needed
# In principle this can't happen because of the truncation above, but just in case
if len(label) > 128:
label = label[:125] + "..." # pragma: no cover

_orchestrate_transfer_task(file_list,
_orchestrate_transfer_task(source_paths,
recursive=True,
destination_path=destination_path,
destination_path=destination_paths,
destination_endpoint=destination_endpoint,
progress=progress,
wait=wait,
label=label)

return destination_path / dataset_id
return destination_paths
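
Finally, a worked example of the auto-generated transfer label in the hunk above (the timestamp and dataset IDs are illustrative):

import datetime

dataset_ids = ["AAAAA", "BBBBB"]   # hypothetical dataset IDs
now = datetime.datetime.now().strftime("%Y-%m-%dT%H-%M")
ids = ",".join(dataset_ids)
if len(ids) > 80:
    # Many datasets: fall back to a count so the label stays well inside Globus's 128-character limit.
    ids = f"{len(dataset_ids)} datasets"
label = f"DKIST Python Tools - {now} - {ids}"
# e.g. "DKIST Python Tools - 2024-09-11T10-30 - AAAAA,BBBBB"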
