From a2ea88a5ed33e771da7fd809d69a290f198205cc Mon Sep 17 00:00:00 2001 From: Lukasz Lacinski Date: Tue, 8 Nov 2022 17:38:35 -0600 Subject: [PATCH] Globus transfer optimization (#214) * Activate Globus endpoints before creating tarballs * Transfer multiple tar files in one Globus job * Check if a file exists before downloading it using Globus * Fix bugs - Globus transfer related --- zstash/create.py | 12 +-- zstash/globus.py | 193 +++++++++++++++++++++++++++++++++---------- zstash/hpss.py | 13 +-- zstash/hpss_utils.py | 3 +- zstash/update.py | 9 ++ 5 files changed, 172 insertions(+), 58 deletions(-) diff --git a/zstash/create.py b/zstash/create.py index de93fbd4..a80c1c16 100644 --- a/zstash/create.py +++ b/zstash/create.py @@ -10,6 +10,7 @@ from six.moves.urllib.parse import urlparse +from .globus import globus_activate, globus_finalize from .hpss import hpss_put from .hpss_utils import add_files from .settings import DEFAULT_CACHE, config, get_db_filename, logger @@ -53,7 +54,9 @@ def create(): if hpss != "none": url = urlparse(hpss) - if url.scheme != "globus": + if url.scheme == "globus": + globus_activate(hpss) + else: # config.hpss is not "none", so we need to # create target HPSS directory logger.debug("Creating target HPSS directory") @@ -85,9 +88,9 @@ def create(): failures: List[str] = create_database(cache, args) # Transfer to HPSS. Always keep a local copy. - hpss_put( - hpss, get_db_filename(cache), cache, keep=True, non_blocking=args.non_blocking - ) + hpss_put(hpss, get_db_filename(cache), cache, keep=True) + + globus_finalize(non_blocking=args.non_blocking) if len(failures) > 0: # List the failures @@ -240,7 +243,6 @@ def create_database(cache: str, args: argparse.Namespace) -> List[str]: cache, args.keep, skip_tars_md5=args.no_tars_md5, - non_blocking=args.non_blocking, ) # Close database diff --git a/zstash/globus.py b/zstash/globus.py index 07a877f7..c1196081 100644 --- a/zstash/globus.py +++ b/zstash/globus.py @@ -8,6 +8,8 @@ from fair_research_login.client import NativeClient from globus_sdk import TransferAPIError, TransferClient, TransferData +from globus_sdk.services.transfer.response.iterable import IterableTransferResponse +from six.moves.urllib.parse import urlparse from .settings import logger @@ -23,18 +25,31 @@ r"cori.*\.nersc\.gov": "9d6d99eb-6d04-11e5-ba46-22000b92c6ec", } +remote_endpoint = None +local_endpoint = None +transfer_client: TransferClient = None +transfer_data: TransferData = None +task_id = None +archive_directory_listing: IterableTransferResponse = None -def globus_transfer( # noqa: C901 - remote_endpoint, remote_path, name, transfer_type, non_blocking=False -): + +def globus_activate(hpss: str): """ Read the local globus endpoint UUID from ~/.zstash.ini. If the ini file does not exist, create an ini file with empty values, and try to find the local endpoint UUID based on the FQDN """ + global transfer_client + global local_endpoint + global remote_endpoint + + url = urlparse(hpss) + if url.scheme != "globus": + return + remote_endpoint = url.netloc + ini_path = os.path.expanduser("~/.zstash.ini") ini = configparser.ConfigParser() - local_endpoint = None if ini.read(ini_path): if "local" in ini.sections(): local_endpoint = ini["local"].get("globus_endpoint_uuid") @@ -59,22 +74,6 @@ def globus_transfer( # noqa: C901 if remote_endpoint.upper() in hpss_endpoint_map.keys(): remote_endpoint = hpss_endpoint_map.get(remote_endpoint.upper()) - if transfer_type == "get": - src_ep = remote_endpoint - src_path = os.path.join(remote_path, name) - dst_ep = local_endpoint - dst_path = os.path.join(os.getcwd(), name) - else: - src_ep = local_endpoint - src_path = os.path.join(os.getcwd(), name) - dst_ep = remote_endpoint - dst_path = os.path.join(remote_path, name) - - subdir = os.path.basename(os.path.normpath(remote_path)) - subdir_label = re.sub("[^A-Za-z0-9_ -]", "", subdir) - filename = name.split(".")[0] - label = subdir_label + " " + filename - native_client = NativeClient( client_id="6c1629cf-446c-49e7-af95-323c6412397f", app_name="Zstash", @@ -82,10 +81,10 @@ def globus_transfer( # noqa: C901 ) native_client.login(no_local_server=True, refresh_tokens=True) transfer_authorizer = native_client.get_authorizers().get("transfer.api.globus.org") - tc = TransferClient(authorizer=transfer_authorizer) + transfer_client = TransferClient(authorizer=transfer_authorizer) - for ep_id in [src_ep, dst_ep]: - r = tc.endpoint_autoactivate(ep_id, if_expires_in=600) + for ep_id in [local_endpoint, remote_endpoint]: + r = transfer_client.endpoint_autoactivate(ep_id, if_expires_in=600) if r.get("code") == "AutoActivationFailed": logger.error( "The {} endpoint is not activated or the current activation expires soon. Please go to https://app.globus.org/file-manager/collections/{} and (re)activate the endpoint.".format( @@ -94,19 +93,91 @@ def globus_transfer( # noqa: C901 ) sys.exit(1) - td = TransferData( - tc, - src_ep, - dst_ep, - label=label, - sync_level="checksum", - verify_checksum=True, - preserve_timestamp=True, - fail_on_quota_errors=True, - ) - td.add_item(src_path, dst_path) + +def file_exists(name: str) -> bool: + global archive_directory_listing + + for entry in archive_directory_listing: + if entry.get("name") == name: + return True + return False + + +def globus_transfer( + remote_ep: str, remote_path: str, name: str, transfer_type: str +): # noqa: C901 + global transfer_client + global local_endpoint + global remote_endpoint + global transfer_data + global task_id + global archive_directory_listing + + if not transfer_client: + globus_activate("globus://" + remote_ep) + if not transfer_client: + sys.exit(1) + + if transfer_type == "get": + if not archive_directory_listing: + archive_directory_listing = transfer_client.operation_ls( + remote_endpoint, remote_path + ) + if not file_exists(name): + logger.error( + "Remote file globus://{}{}/{} does not exist".format( + remote_ep, remote_path, name + ) + ) + sys.exit(1) + + if transfer_type == "get": + src_ep = remote_endpoint + src_path = os.path.join(remote_path, name) + dst_ep = local_endpoint + dst_path = os.path.join(os.getcwd(), name) + else: + src_ep = local_endpoint + src_path = os.path.join(os.getcwd(), name) + dst_ep = remote_endpoint + dst_path = os.path.join(remote_path, name) + + subdir = os.path.basename(os.path.normpath(remote_path)) + subdir_label = re.sub("[^A-Za-z0-9_ -]", "", subdir) + filename = name.split(".")[0] + label = subdir_label + " " + filename + + if not transfer_data: + transfer_data = TransferData( + transfer_client, + src_ep, + dst_ep, + label=label, + verify_checksum=True, + preserve_timestamp=True, + fail_on_quota_errors=True, + ) + transfer_data.add_item(src_path, dst_path) + transfer_data["label"] = subdir_label + " " + filename try: - task = tc.submit_transfer(td) + if task_id: + task = transfer_client.get_task(task_id) + if task["status"] == "ACTIVE": + return + elif task["status"] == "SUCCEEDED": + src_ep = task["source_endpoint_id"] + dst_ep = task["destination_endpoint_id"] + label = task["label"] + logger.info( + "Globus transfer {}, from {} to {}: {} succeeded".format( + task_id, src_ep, dst_ep, label + ) + ) + else: + logger.error("Transfer FAILED") + task = transfer_client.submit_transfer(transfer_data) + task_id = task.get("task_id") + transfer_data = None except TransferAPIError as e: if e.code == "NoCredException": logger.error( @@ -121,11 +192,14 @@ def globus_transfer( # noqa: C901 logger.error("Exception: {}".format(e)) sys.exit(1) - if non_blocking: - return + if transfer_type == "get" and task_id: + globus_wait(task_id) + + +def globus_wait(task_id: str): + global transfer_client try: - task_id = task.get("task_id") """ A Globus transfer job (task) can be in one of the three states: ACTIVE, SUCCEEDED, FAILED. The script every 20 seconds polls a @@ -133,17 +207,20 @@ def globus_transfer( # noqa: C901 with 20 second timeout limit. If the task is ACTIVE after time runs out 'task_wait' returns False, and True otherwise. """ - while not tc.task_wait(task_id, timeout=20, polling_interval=20): + while not transfer_client.task_wait(task_id, timeout=20, polling_interval=20): pass """ The Globus transfer job (task) has been finished (SUCCEEDED or FAILED). Check if the transfer SUCCEEDED or FAILED. """ - task = tc.get_task(task_id) + task = transfer_client.get_task(task_id) if task["status"] == "SUCCEEDED": + src_ep = task["source_endpoint_id"] + dst_ep = task["destination_endpoint_id"] + label = task["label"] logger.info( - "Globus transfer {}, from {}{} to {}{} succeeded".format( - task_id, src_ep, src_path, dst_ep, dst_path + "Globus transfer {}, from {} to {}: {} succeeded".format( + task_id, src_ep, dst_ep, label ) ) else: @@ -161,3 +238,35 @@ def globus_transfer( # noqa: C901 except Exception as e: logger.error("Exception: {}".format(e)) sys.exit(1) + + +def globus_finalize(non_blocking: bool = False): + global transfer_client + global transfer_data + global task_id + + last_task_id = None + + if transfer_data: + try: + last_task = transfer_client.submit_transfer(transfer_data) + last_task_id = last_task.get("task_id") + except TransferAPIError as e: + if e.code == "NoCredException": + logger.error( + "{}. Please go to https://app.globus.org/endpoints and activate the endpoint.".format( + e.message + ) + ) + else: + logger.error(e) + sys.exit(1) + except Exception as e: + logger.error("Exception: {}".format(e)) + sys.exit(1) + + if not non_blocking: + if task_id: + globus_wait(task_id) + if last_task_id: + globus_wait(last_task_id) diff --git a/zstash/hpss.py b/zstash/hpss.py index ed61fa9e..a055fe99 100644 --- a/zstash/hpss.py +++ b/zstash/hpss.py @@ -17,7 +17,6 @@ def hpss_transfer( transfer_type: str, cache: str, keep: bool = False, - non_blocking: bool = False, ): if hpss == "none": logger.info("{}: HPSS is unavailable".format(transfer_type)) @@ -88,9 +87,7 @@ def hpss_transfer( if scheme == "globus": # Transfer file using the Globus Transfer Service - globus_transfer( - endpoint, url_path, name, transfer_type, non_blocking=non_blocking - ) + globus_transfer(endpoint, url_path, name, transfer_type) else: # Transfer file using `hsi` command: str = 'hsi -q "cd {}; {} {}"'.format(hpss, transfer_command, name) @@ -102,18 +99,16 @@ def hpss_transfer( os.chdir(cwd) if transfer_type == "put": - if not keep: + if not keep and scheme != "globus": # We should not keep the local file, so delete it now that it is on HPSS os.remove(file_path) -def hpss_put( - hpss: str, file_path: str, cache: str, keep: bool = True, non_blocking: bool = False -): +def hpss_put(hpss: str, file_path: str, cache: str, keep: bool = True): """ Put a file to the HPSS archive. """ - hpss_transfer(hpss, file_path, "put", cache, keep, non_blocking) + hpss_transfer(hpss, file_path, "put", cache, keep) def hpss_get(hpss: str, file_path: str, cache: str): diff --git a/zstash/hpss_utils.py b/zstash/hpss_utils.py index 2e1ed419..44ba1008 100644 --- a/zstash/hpss_utils.py +++ b/zstash/hpss_utils.py @@ -61,7 +61,6 @@ def add_files( cache: str, keep: bool, skip_tars_md5: bool = False, - non_blocking: bool = False, ) -> List[str]: # Now, perform the actual archiving @@ -155,7 +154,7 @@ def add_files( hpss: str = config.hpss else: raise TypeError("Invalid config.hpss={}".format(config.hpss)) - hpss_put(hpss, os.path.join(cache, tfname), cache, keep, non_blocking) + hpss_put(hpss, os.path.join(cache, tfname), cache, keep) # Update database with files that have been archived # Add a row to the "files" table, diff --git a/zstash/update.py b/zstash/update.py index 35d2b6d8..a85cdac0 100644 --- a/zstash/update.py +++ b/zstash/update.py @@ -9,6 +9,7 @@ from datetime import datetime from typing import List, Optional, Tuple +from .globus import globus_activate, globus_finalize from .hpss import hpss_get, hpss_put from .hpss_utils import add_files from .settings import ( @@ -44,6 +45,8 @@ def update(): raise TypeError("Invalid config.hpss={}".format(config.hpss)) hpss_put(hpss, get_db_filename(cache), cache, keep=True) + globus_finalize(non_blocking=args.non_blocking) + # List failures if len(failures) > 0: logger.warning("Some files could not be archived") @@ -87,6 +90,11 @@ def setup_update() -> Tuple[argparse.Namespace, str]: type=str, help='path to the zstash archive on the local file system. The default name is "zstash".', ) + optional.add_argument( + "--non-blocking", + action="store_true", + help="do not wait for each Globus transfer until it completes.", + ) optional.add_argument( "-v", "--verbose", action="store_true", help="increase output verbosity" ) @@ -116,6 +124,7 @@ def update_database(args: argparse.Namespace, cache: str) -> Optional[List[str]] hpss: str = config.hpss else: raise TypeError("Invalid config.hpss={}".format(config.hpss)) + globus_activate(hpss) hpss_get(hpss, get_db_filename(cache), cache) else: error_str: str = (