From 7f186b5763f9e1f2c9a32d9c356d1258b8fb5715 Mon Sep 17 00:00:00 2001
From: Stuart Mumford
Date: Wed, 11 Sep 2024 16:02:54 +0100
Subject: [PATCH] Refactor `transfer_complete_datasets` for one transfer job
 (#340)

* Refactor `transfer_complete_datasets`

This changes it to only do one Globus task rather than one per dataset.

* Add test to cover error catching in net helpers

* Cover more error catching

* Well that didn't work

* Add test for label on download of many datasets at once

* More test coverage

---------

Co-authored-by: Drew Leonard
---
 changelog/340.bugfix.rst                |   1 +
 changelog/340.feature.rst               |   1 +
 dkist/net/globus/tests/test_transfer.py |  14 +++
 dkist/net/globus/transfer.py            |  55 ++++++----
 dkist/net/helpers.py                    | 104 ++++++++++--------
 dkist/net/tests/test_helpers.py         | 135 +++++++++++++-----------
 dkist/utils/inventory.py                |   2 +-
 7 files changed, 185 insertions(+), 127 deletions(-)
 create mode 100644 changelog/340.bugfix.rst
 create mode 100644 changelog/340.feature.rst

diff --git a/changelog/340.bugfix.rst b/changelog/340.bugfix.rst
new file mode 100644
index 00000000..8b22d1d8
--- /dev/null
+++ b/changelog/340.bugfix.rst
@@ -0,0 +1 @@
+Fix a bug with `dkist.net.transfer_complete_datasets` where a length one ``UnifiedResponse`` would cause an error.
diff --git a/changelog/340.feature.rst b/changelog/340.feature.rst
new file mode 100644
index 00000000..eb592000
--- /dev/null
+++ b/changelog/340.feature.rst
@@ -0,0 +1 @@
+`dkist.net.transfer_complete_datasets` will now only create one Globus task for all datasets it downloads.
diff --git a/dkist/net/globus/tests/test_transfer.py b/dkist/net/globus/tests/test_transfer.py
index 35f186c8..21491470 100644
--- a/dkist/net/globus/tests/test_transfer.py
+++ b/dkist/net/globus/tests/test_transfer.py
@@ -107,6 +107,20 @@ def test_start_transfer_src_base(mocker, transfer_client, mock_endpoints):
     assert f"{os.path.sep}b{os.path.sep}" + filepath.name == tfr["destination_path"]
 
 
+def test_start_transfer_multiple_paths(mocker, transfer_client, mock_endpoints):
+    submit_mock = mocker.patch("globus_sdk.TransferClient.submit_transfer",
+                               return_value={"task_id": "task_id"})
+    mocker.patch("globus_sdk.TransferClient.get_submission_id",
+                 return_value={"value": "wibble"})
+    file_list = list(map(Path, ["/a/name.fits", "/a/name2.fits"]))
+    dst_list = list(map(Path, ["/aplace/newname.fits", "/anotherplace/newname2.fits"]))
+    start_transfer_from_file_list("a", "b", dst_list, file_list)
+    transfer_manifest = submit_mock.call_args_list[0][0][0]["DATA"]
+
+    for filepath, tfr in zip(dst_list, transfer_manifest):
+        assert str(filepath) == tfr["destination_path"]
+
+
 def test_process_event_list(transfer_client, mock_task_event_list):
     (events,
      json_events,
diff --git a/dkist/net/globus/transfer.py b/dkist/net/globus/transfer.py
index 90e711bb..2669738f 100644
--- a/dkist/net/globus/transfer.py
+++ b/dkist/net/globus/transfer.py
@@ -1,7 +1,6 @@
 """
 Functions and helpers for orchestrating and monitoring transfers using Globus.
""" -import copy import json import time import pathlib @@ -19,43 +18,50 @@ __all__ = ["watch_transfer_progress", "start_transfer_from_file_list"] -def start_transfer_from_file_list(src_endpoint, dst_endpoint, dst_base_path, file_list, - src_base_path=None, recursive=False, label=None): +def start_transfer_from_file_list( + src_endpoint: str, + dst_endpoint: str, + dst_base_path: PathLike, + file_list: list[PathLike], + src_base_path: PathLike = None, + recursive: bool | list[bool] = False, + label: str = None +) -> str: """ Start a new transfer task for a list of files. Parameters ---------- - src_endpoint : `str` + src_endpoint The endpoint to copy file from. Can be any identifier accepted by `~dkist.net.globus.get_endpoint_id`. - dst_endpoint : `str` + dst_endpoint The endpoint to copy file to. Can be any identifier accepted by `~dkist.net.globus.get_endpoint_id`. - dst_base_path : `~pathlib.Path` + dst_base_path The destination path, must be accessible from the endpoint, will be created if it does not exist. - file_list : `list` + file_list The list of file paths on the ``src_endpoint`` to transfer to the ``dst_endpoint``. - src_base_path : `~pathlib.Path`, optional + src_base_path The path prefix on the items in ``file_list`` to be stripped before copying to ``dst_base_path``. i.e. if the file path in ``path_list`` is ``/spam/eggs/filename.fits`` and ``src_base_path`` is ``/spam`` the ``eggs/`` folder will be copied to ``dst_base_path``. By default only the filenames are kept, and none of the directories. - recursive : `bool` or `list` of `bool`, optional + recursive Controls if the path in ``file_list`` is added to the Globus task with the recursive flag or not. This should be `True` if the element of ``file_list`` is a directory. If you need to set this per-item in ``file_list`` it should be a `list` of `bool` of equal length as ``file_list``. - label : `str` + label Label for the Globus transfer. If None then a default will be used. 
    Returns
@@ -87,17 +93,20 @@ def start_transfer_from_file_list(src_endpoint, dst_endpoint, dst_base_path, fil
                                                 sync_level="checksum",
                                                 verify_checksum=True)
 
-    dst_base_path = pathlib.Path(dst_base_path)
-    src_file_list = copy.copy(file_list)
-    dst_file_list = []
-    for src_file in src_file_list:
-        # If a common prefix is not specified just copy the filename
-        if not src_base_path:
-            src_filepath = src_file.name
-        else:
-            # Otherwise use the filepath relative to the base path
-            src_filepath = src_file.relative_to(src_base_path)
-        dst_file_list.append(dst_base_path / src_filepath)
+    src_file_list = file_list
+    if not isinstance(dst_base_path, (list, tuple)):
+        dst_base_path = pathlib.Path(dst_base_path)
+        dst_file_list = []
+        for src_file in src_file_list:
+            # If a common prefix is not specified just copy the filename or last directory
+            if not src_base_path:
+                src_filepath = src_file.name
+            else:
+                # Otherwise use the filepath relative to the base path
+                src_filepath = src_file.relative_to(src_base_path)
+            dst_file_list.append(dst_base_path / src_filepath)
+    else:
+        dst_file_list = dst_base_path
 
     for src_file, dst_file, rec in zip(src_file_list, dst_file_list, recursive):
         transfer_manifest.add_item(str(src_file), str(dst_file), recursive=rec)
@@ -265,12 +274,12 @@ def watch_transfer_progress(task_id, tfr_client, poll_interval=5,
 
 def _orchestrate_transfer_task(file_list: list[PathLike],
                                recursive: list[bool],
-                               destination_path: PathLike = "/~/",
+                               destination_path: PathLike | list[PathLike] = "/~/",
                                destination_endpoint: str = None,
                                *,
                                progress: bool | Literal["verbose"] = True,
                                wait: bool = True,
-                               label=None):
+                               label: str = None):
     """
     Transfer the files given in file_list to the path on ``destination_endpoint``.
 
diff --git a/dkist/net/helpers.py b/dkist/net/helpers.py
index 7627d06a..67314d9d 100644
--- a/dkist/net/helpers.py
+++ b/dkist/net/helpers.py
@@ -13,6 +13,7 @@
 from sunpy.net.base_client import QueryResponseRow
 from sunpy.net.fido_factory import UnifiedResponse
 
+from dkist.net import conf
 from dkist.net.attrs import Dataset
 from dkist.net.client import DKISTClient, DKISTQueryResponseTable
 from dkist.net.globus.transfer import _orchestrate_transfer_task
@@ -35,6 +36,25 @@ def _get_dataset_inventory(dataset_id: str | Iterable[str]) -> DKISTQueryRespons
     return results
 
 
+def _get_globus_path_for_dataset(dataset: QueryResponseRow):
+    """
+    Given a single row of dataset inventory, return its directory on the source endpoint.
+    """
+    if not isinstance(dataset, QueryResponseRow):
+        raise TypeError("Input should be a single row of dataset inventory.")
+
+    # At this point we only have one dataset, and it should be a row not a table
+    dataset_id = dataset["Dataset ID"]
+    proposal_id = dataset["Primary Proposal ID"]
+    bucket = dataset["Storage Bucket"]
+
+    return Path(conf.dataset_path.format(
+        datasetId=dataset_id,
+        primaryProposalId=proposal_id,
+        bucket=bucket
+    ))
+
+
 def transfer_complete_datasets(datasets: str | Iterable[str] | QueryResponseRow | DKISTQueryResponseTable | UnifiedResponse,
                                path: PathLike = "/~/",
                                destination_endpoint: str = None,
@@ -52,14 +72,13 @@ def transfer_complete_datasets(datasets: str | Iterable[str] | QueryResponseRow
         ``Fido.search``.
 
     path
-        The path to save the data in, must be accessible by the Globus
-        endpoint.
-        The default value is ``/~/``.
-        It is possible to put placeholder strings in the path with any key
-        from the dataset inventory dictionary which can be accessed as
-        ``ds.meta['inventory']``. An example of this would be
-        ``path="~/dkist/{datasetId}"`` to save the files in a folder named
-        with the dataset ID being downloaded.
+        The path to save the data in; it must be accessible by the Globus endpoint.
+        The default value is ``/~/``. It is possible to put placeholder strings
+        in the path with any key from inventory, which can be shown with
+        :func:`dkist.utils.inventory.path_format_keys`. An example of this
+        would be ``path="~/dkist/{primary_proposal_id}"`` to save the files in a
+        folder named for the proposal ID. **Note** that ``{dataset_id}`` is
+        always added to the path if it is not already the last element.
 
     destination_endpoint
         A unique specifier for a Globus endpoint. If `None` a local
@@ -87,18 +106,22 @@ def transfer_complete_datasets(datasets: str | Iterable[str] | QueryResponseRow
         The path to the directories containing the dataset(s) on the destination endpoint.
     """
-    # Avoid circular import
-    from dkist.net import conf
+    path = Path(path)
+    if path.parts[-1] != "{dataset_id}":
+        path = path / "{dataset_id}"
 
-    if isinstance(datasets, (DKISTQueryResponseTable, QueryResponseRow)):
-        # These we don't have to pre-process
+    if isinstance(datasets, DKISTQueryResponseTable):
+        # This we don't have to pre-process
         pass
 
+    elif isinstance(datasets, QueryResponseRow):
+        datasets = DKISTQueryResponseTable(datasets)
+
     elif isinstance(datasets, UnifiedResponse):
         # If we have a UnifiedResponse object, it could contain one or more dkist tables.
         # Stack them and then treat them like we were passed a single table with many rows.
         datasets = datasets["dkist"]
-        if len(datasets) > 1:
+        if isinstance(datasets, UnifiedResponse) and len(datasets) > 1:
             datasets = table.vstack(datasets, metadata_conflicts="silent")
 
     elif isinstance(datasets, str) or all(isinstance(d, str) for d in datasets):
@@ -109,45 +132,38 @@ def transfer_complete_datasets(datasets: str | Iterable[str] | QueryResponseRow
         # Anything else, error
         raise TypeError(f"{type(datasets)} is of an unknown type, it should be search results or one or more dataset IDs.")
 
-    if not isinstance(datasets, QueryResponseRow) and len(datasets) > 1:
-        paths = []
-        for record in datasets:
-            paths.append(transfer_complete_datasets(record,
-                                                    path=path,
-                                                    destination_endpoint=destination_endpoint,
-                                                    progress=progress,
-                                                    wait=wait,
-                                                    label=label))
-        return paths
-
-    # ensure a length one table is a row
-    if len(datasets) == 1:
-        datasets = datasets[0]
-
-    # At this point we only have one dataset, and it should be a row not a table
-    dataset = datasets
-    dataset_id = dataset["Dataset ID"]
-    proposal_id = dataset["Primary Proposal ID"]
-    bucket = dataset["Storage Bucket"]
+    source_paths = []
+    for record in datasets:
+        source_paths.append(_get_globus_path_for_dataset(record))
 
-    path_inv = path_format_inventory(dict(dataset))
-    destination_path = Path(path.format(**path_inv))
+    destination_paths = []
+    for dataset in datasets:
+        dataset_id = dataset["Dataset ID"]
+        proposal_id = dataset["Primary Proposal ID"]
+        bucket = dataset["Storage Bucket"]
 
-    file_list = [Path(conf.dataset_path.format(
-        datasetId=dataset_id,
-        primaryProposalId=proposal_id,
-        bucket=bucket
-    ))]
+        path_inv = path_format_inventory(dict(dataset))
+        destination_paths.append(Path(str(path).format(**path_inv)))
+
+    if not label:
+        now = datetime.datetime.now().strftime("%Y-%m-%dT%H-%M")
+        datasetids = ",".join(datasets["Dataset ID"])
+        if len(datasetids) > 80:
+            datasetids = f"{len(datasets['Dataset ID'])} datasets"
+        label = f"DKIST Python Tools - {now} - {datasetids}"
 
-    now = datetime.datetime.now().strftime("%Y-%m-%dT%H-%M")
-    label = f"DKIST Python Tools - {now} {dataset_id}" if label is None else label
+    # Globus limits labels to 128 characters, so truncate if needed
+    # In principle this can't happen because of the shortening above, but just in case
+    if len(label) > 128:
+        label = label[:125] + "..."  # pragma: no cover
 
-    _orchestrate_transfer_task(file_list,
+    _orchestrate_transfer_task(source_paths,
                                recursive=True,
-                               destination_path=destination_path,
+                               destination_path=destination_paths,
                                destination_endpoint=destination_endpoint,
                                progress=progress,
                                wait=wait,
                                label=label)
 
-    return destination_path / dataset_id
+    return destination_paths
diff --git a/dkist/net/tests/test_helpers.py b/dkist/net/tests/test_helpers.py
index 6223d2d7..2d633425 100644
--- a/dkist/net/tests/test_helpers.py
+++ b/dkist/net/tests/test_helpers.py
@@ -39,11 +39,11 @@ def test_download_default_keywords(orchestrate_transfer_mock, keywords):
     )
 
     if keywords["label"] is None:
-        keywords["label"] = f"DKIST Python Tools - {datetime.datetime.now().strftime('%Y-%m-%dT%H-%M')} AAAA"
+        keywords["label"] = f"DKIST Python Tools - {datetime.datetime.now().strftime('%Y-%m-%dT%H-%M')} - AAAA"
 
     orchestrate_transfer_mock.assert_called_once_with(
         [Path("/data/pm_1_10/AAAA")],
         recursive=True,
-        destination_path=Path("/~"),
+        destination_path=[Path("/~/AAAA")],
         **keywords
     )
@@ -79,11 +79,11 @@ def test_transfer_from_dataset_id(mocker, orchestrate_transfer_mock):
     orchestrate_transfer_mock.assert_called_once_with(
         [Path("/data/pm_1_10/AAAA")],
         recursive=True,
-        destination_path=Path("/~"),
+        destination_path=[Path("/~/AAAA")],
         destination_endpoint=None,
         progress=True,
         wait=True,
-        label=f"DKIST Python Tools - {datetime.datetime.now().strftime('%Y-%m-%dT%H-%M')} AAAA"
+        label=f"DKIST Python Tools - {datetime.datetime.now().strftime('%Y-%m-%dT%H-%M')} - AAAA"
     )
 
     get_inv_mock.assert_called_once_with("AAAA")
@@ -113,32 +113,50 @@ def test_transfer_from_multiple_dataset_id(mocker, orchestrate_transfer_mock):
 
     transfer_complete_datasets(["AAAA", "BBBB"])
 
-    orchestrate_transfer_mock.assert_has_calls(
-        [
-            mocker.call(
-                [Path("/data/pm_1_10/AAAA")],
-                recursive=True,
-                destination_path=Path("/~"),
-                destination_endpoint=None,
-                progress=True,
-                wait=True,
-                label=f"DKIST Python Tools - {datetime.datetime.now().strftime('%Y-%m-%dT%H-%M')} AAAA",
-            ),
-            mocker.call(
-                [Path("/data/pm_1_10/BBBB")],
-                recursive=True,
-                destination_path=Path("/~"),
-                destination_endpoint=None,
-                progress=True,
-                wait=True,
-                label=f"DKIST Python Tools - {datetime.datetime.now().strftime('%Y-%m-%dT%H-%M')} BBBB",
-            ),
-        ]
+    orchestrate_transfer_mock.assert_called_once_with(
+        [Path("/data/pm_1_10/AAAA"), Path("/data/pm_1_10/BBBB")],
+        recursive=True,
+        destination_path=[Path("/~/AAAA"), Path("/~/BBBB")],
+        destination_endpoint=None,
+        progress=True,
+        wait=True,
+        label=f"DKIST Python Tools - {datetime.datetime.now().strftime('%Y-%m-%dT%H-%M')} - AAAA,BBBB",
     )
 
     get_inv_mock.assert_called_once_with(["AAAA", "BBBB"])
 
 
+def test_transfer_from_many_dataset_id(mocker, orchestrate_transfer_mock):
+    """Check that the short label is used when downloading many datasets"""
+
+    many_ds = [a*4 for a in "ABCDEFGHIJKLMNOPQ"]
+    get_inv_mock = mocker.patch(
+        "dkist.net.helpers._get_dataset_inventory",
+        autospec=True,
+        return_value=DKISTQueryResponseTable([
+            {
+                "Dataset ID": _id,
+                "Primary Proposal ID": "pm_1_10",
+                "Storage Bucket": "data",
+                "Wavelength Max": 856,
+                "Wavelength Min": 854,
+            } for _id in many_ds
+        ]),
+    )
+
+    transfer_complete_datasets(many_ds)
+
+    orchestrate_transfer_mock.assert_called_once_with(
+        mocker.ANY,
+        recursive=mocker.ANY,
+        destination_path=mocker.ANY,
+        destination_endpoint=mocker.ANY,
+        progress=mocker.ANY,
+        wait=mocker.ANY,
+        label=f"DKIST Python Tools - {datetime.datetime.now().strftime('%Y-%m-%dT%H-%M')} - {len(many_ds)} datasets"
+    )
+
+
 def test_transfer_from_table(orchestrate_transfer_mock, mocker):
     res = DKISTQueryResponseTable(
         {
@@ -153,21 +171,11 @@ def test_transfer_from_table(orchestrate_transfer_mock, mocker):
     transfer_complete_datasets(res, label="fibble")
 
     kwargs = {"progress": True, "wait": True, "destination_endpoint": None, "label": "fibble"}
-    orchestrate_transfer_mock.assert_has_calls(
-        [
-            mocker.call(
-                [Path("/data/pm_1_10/A")],
-                recursive=True,
-                destination_path=Path("/~"),
-                **kwargs
-            ),
-            mocker.call(
-                [Path("/data/pm_2_20/B")],
-                recursive=True,
-                destination_path=Path("/~"),
-                **kwargs
-            ),
-        ]
+    orchestrate_transfer_mock.assert_called_once_with(
+        [Path("/data/pm_1_10/A"), Path("/data/pm_2_20/B")],
+        recursive=True,
+        destination_path=[Path("/~/A"), Path("/~/B")],
+        **kwargs
     )
@@ -190,7 +198,7 @@ def test_transfer_from_length_one_table(orchestrate_transfer_mock, mocker):
             mocker.call(
                 [Path("/data/pm_1_10/A")],
                 recursive=True,
-                destination_path=Path("/~"),
+                destination_path=[Path("/~/A")],
                 **kwargs
             ),
         ]
@@ -216,7 +224,7 @@ def test_transfer_from_row(orchestrate_transfer_mock, mocker):
             mocker.call(
                 [Path("/data/pm_1_10/A")],
                 recursive=True,
-                destination_path=Path("/~"),
+                destination_path=[Path("/~/A")],
                 **kwargs
             ),
         ]
@@ -249,21 +257,11 @@ def test_transfer_from_UnifiedResponse(orchestrate_transfer_mock, mocker):
     transfer_complete_datasets(res, label="fibble")
 
     kwargs = {"progress": True, "wait": True, "destination_endpoint": None, "label": "fibble"}
-    orchestrate_transfer_mock.assert_has_calls(
-        [
-            mocker.call(
-                [Path("/data/pm_1_10/A")],
-                recursive=True,
-                destination_path=Path("/~"),
-                **kwargs
-            ),
-            mocker.call(
-                [Path("/data/pm_2_20/B")],
-                recursive=True,
-                destination_path=Path("/~"),
-                **kwargs
-            ),
-        ]
+    orchestrate_transfer_mock.assert_called_once_with(
+        [Path("/data/pm_1_10/A"), Path("/data/pm_2_20/B")],
+        recursive=True,
+        destination_path=[Path("/~/A"), Path("/~/B")],
+        **kwargs
     )
@@ -288,11 +286,30 @@ def test_transfer_path_interpolation(orchestrate_transfer_mock, mocker):
     orchestrate_transfer_mock.assert_called_once_with(
         [Path("/data/pm_1_10/AAAA")],
         recursive=True,
-        destination_path=Path("HIT/AAAA"),
+        destination_path=[Path("HIT/AAAA")],
         destination_endpoint=None,
         progress=True,
         wait=True,
-        label=f"DKIST Python Tools - {datetime.datetime.now().strftime('%Y-%m-%dT%H-%M')} AAAA"
+        label=f"DKIST Python Tools - {datetime.datetime.now().strftime('%Y-%m-%dT%H-%M')} - AAAA"
     )
 
     get_inv_mock.assert_called_once_with("AAAA")
+
+
+def test_transfer_dataset_wrong_type(mocker, orchestrate_transfer_mock):
+    """
+    Test that transfer fails if `_get_globus_path_for_dataset` is given bad input.
+    In practice this should never happen in the wild because of error catching elsewhere,
+    but worth checking in case that changes.
+    """
+    get_inv_mock = mocker.patch("dkist.net.helpers._get_dataset_inventory",
+                                autospec=True,
+                                return_value="This is not a QueryResponseRow")
+
+    with pytest.raises(TypeError, match="Input should be a single row of dataset inventory."):
+        transfer_complete_datasets("AAAA")
+
+    # Also check that just giving a bad type to transfer_complete_datasets fails
+    # Again, shouldn't happen but we'll check anyway
+    with pytest.raises(TypeError, match="is of an unknown type, it should be search results or one or more dataset IDs."):
+        transfer_complete_datasets([42])
diff --git a/dkist/utils/inventory.py b/dkist/utils/inventory.py
index 9f9f77ca..440ce53e 100644
--- a/dkist/utils/inventory.py
+++ b/dkist/utils/inventory.py
@@ -85,7 +85,7 @@ def _key_clean(key):
     return key.lower()
 
 
-def path_format_keys(keymap):
+def path_format_keys(keymap=INVENTORY_KEY_MAP):
     """
     Return a list of all valid keys for path formatting.
     """
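
The snippet below is a minimal usage sketch of the behaviour this patch introduces: a single call now submits one Globus task covering every selected dataset and returns a list of per-dataset destination directories. The search attributes, time range and local path are illustrative assumptions rather than values taken from the patch, and a running destination endpoint (e.g. Globus Connect Personal) is assumed::

    import dkist.net  # registers the DKIST Fido client and the a.dkist attrs
    from sunpy.net import Fido, attrs as a

    # Search for datasets to download; these attrs are only an example.
    results = Fido.search(a.Instrument("VBI"),
                          a.Time("2022-06-02", "2022-06-03"),
                          a.dkist.Embargoed(False))

    # After this patch a single Globus task is submitted for all matched
    # datasets. "{primary_proposal_id}" is interpolated per dataset and
    # "{dataset_id}" is appended automatically, so each dataset ends up in
    # its own directory. Leaving destination_endpoint unset uses a local endpoint.
    paths = dkist.net.transfer_complete_datasets(
        results,
        path="~/dkist_data/{primary_proposal_id}",
    )

    # The return value is now a list of destination directories, one per dataset.
    for destination in paths:
        print(destination)

At the lower level, ``start_transfer_from_file_list`` now also accepts a list of destination paths (one per entry in ``file_list``), which is how the helper maps each dataset directory onto its interpolated destination within the single task.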