Skip to content

Commit

Permalink
Globus transfer optimization (#214)
Browse files Browse the repository at this point in the history
* Activate Globus endpoints before creating tarballs

* Transfer multiple tar files in one Globus job

* Check if a file exists before downloading it using Globus

* Fix bugs - Globus transfer related
  • Loading branch information
lukaszlacinski authored Nov 8, 2022
1 parent 23b372f commit a2ea88a
Show file tree
Hide file tree
Showing 5 changed files with 172 additions and 58 deletions.
12 changes: 7 additions & 5 deletions zstash/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from six.moves.urllib.parse import urlparse

from .globus import globus_activate, globus_finalize
from .hpss import hpss_put
from .hpss_utils import add_files
from .settings import DEFAULT_CACHE, config, get_db_filename, logger
Expand Down Expand Up @@ -53,7 +54,9 @@ def create():

if hpss != "none":
url = urlparse(hpss)
if url.scheme != "globus":
if url.scheme == "globus":
globus_activate(hpss)
else:
# config.hpss is not "none", so we need to
# create target HPSS directory
logger.debug("Creating target HPSS directory")
Expand Down Expand Up @@ -85,9 +88,9 @@ def create():
failures: List[str] = create_database(cache, args)

# Transfer to HPSS. Always keep a local copy.
hpss_put(
hpss, get_db_filename(cache), cache, keep=True, non_blocking=args.non_blocking
)
hpss_put(hpss, get_db_filename(cache), cache, keep=True)

globus_finalize(non_blocking=args.non_blocking)

if len(failures) > 0:
# List the failures
Expand Down Expand Up @@ -240,7 +243,6 @@ def create_database(cache: str, args: argparse.Namespace) -> List[str]:
cache,
args.keep,
skip_tars_md5=args.no_tars_md5,
non_blocking=args.non_blocking,
)

# Close database
Expand Down
193 changes: 151 additions & 42 deletions zstash/globus.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

from fair_research_login.client import NativeClient
from globus_sdk import TransferAPIError, TransferClient, TransferData
from globus_sdk.services.transfer.response.iterable import IterableTransferResponse
from six.moves.urllib.parse import urlparse

from .settings import logger

Expand All @@ -23,18 +25,31 @@
r"cori.*\.nersc\.gov": "9d6d99eb-6d04-11e5-ba46-22000b92c6ec",
}

remote_endpoint = None
local_endpoint = None
transfer_client: TransferClient = None
transfer_data: TransferData = None
task_id = None
archive_directory_listing: IterableTransferResponse = None

def globus_transfer( # noqa: C901
remote_endpoint, remote_path, name, transfer_type, non_blocking=False
):

def globus_activate(hpss: str):
"""
Read the local globus endpoint UUID from ~/.zstash.ini.
If the ini file does not exist, create an ini file with empty values,
and try to find the local endpoint UUID based on the FQDN
"""
global transfer_client
global local_endpoint
global remote_endpoint

url = urlparse(hpss)
if url.scheme != "globus":
return
remote_endpoint = url.netloc

ini_path = os.path.expanduser("~/.zstash.ini")
ini = configparser.ConfigParser()
local_endpoint = None
if ini.read(ini_path):
if "local" in ini.sections():
local_endpoint = ini["local"].get("globus_endpoint_uuid")
Expand All @@ -59,33 +74,17 @@ def globus_transfer( # noqa: C901
if remote_endpoint.upper() in hpss_endpoint_map.keys():
remote_endpoint = hpss_endpoint_map.get(remote_endpoint.upper())

if transfer_type == "get":
src_ep = remote_endpoint
src_path = os.path.join(remote_path, name)
dst_ep = local_endpoint
dst_path = os.path.join(os.getcwd(), name)
else:
src_ep = local_endpoint
src_path = os.path.join(os.getcwd(), name)
dst_ep = remote_endpoint
dst_path = os.path.join(remote_path, name)

subdir = os.path.basename(os.path.normpath(remote_path))
subdir_label = re.sub("[^A-Za-z0-9_ -]", "", subdir)
filename = name.split(".")[0]
label = subdir_label + " " + filename

native_client = NativeClient(
client_id="6c1629cf-446c-49e7-af95-323c6412397f",
app_name="Zstash",
default_scopes="openid urn:globus:auth:scope:transfer.api.globus.org:all",
)
native_client.login(no_local_server=True, refresh_tokens=True)
transfer_authorizer = native_client.get_authorizers().get("transfer.api.globus.org")
tc = TransferClient(authorizer=transfer_authorizer)
transfer_client = TransferClient(authorizer=transfer_authorizer)

for ep_id in [src_ep, dst_ep]:
r = tc.endpoint_autoactivate(ep_id, if_expires_in=600)
for ep_id in [local_endpoint, remote_endpoint]:
r = transfer_client.endpoint_autoactivate(ep_id, if_expires_in=600)
if r.get("code") == "AutoActivationFailed":
logger.error(
"The {} endpoint is not activated or the current activation expires soon. Please go to https://app.globus.org/file-manager/collections/{} and (re)activate the endpoint.".format(
Expand All @@ -94,19 +93,91 @@ def globus_transfer( # noqa: C901
)
sys.exit(1)

td = TransferData(
tc,
src_ep,
dst_ep,
label=label,
sync_level="checksum",
verify_checksum=True,
preserve_timestamp=True,
fail_on_quota_errors=True,
)
td.add_item(src_path, dst_path)

def file_exists(name: str) -> bool:
    """
    Check whether the cached remote directory listing contains `name`.

    The listing is the module-level `archive_directory_listing`. It starts
    out as None and is filled in by `globus_transfer` the first time a
    "get" transfer is requested.

    :param name: file name to look for; compared against the "name" field
        of every entry in the listing
    :return: True if an entry with that name is present; False if it is
        not, or if no listing has been fetched yet
    """
    # Until the first "get" the listing is None; iterating it would raise
    # TypeError, so treat "no listing yet" as "file not found".
    if archive_directory_listing is None:
        return False
    return any(entry.get("name") == name for entry in archive_directory_listing)


def globus_transfer(
remote_ep: str, remote_path: str, name: str, transfer_type: str
): # noqa: C901
global transfer_client
global local_endpoint
global remote_endpoint
global transfer_data
global task_id
global archive_directory_listing

if not transfer_client:
globus_activate("globus://" + remote_ep)
if not transfer_client:
sys.exit(1)

if transfer_type == "get":
if not archive_directory_listing:
archive_directory_listing = transfer_client.operation_ls(
remote_endpoint, remote_path
)
if not file_exists(name):
logger.error(
"Remote file globus://{}{}/{} does not exist".format(
remote_ep, remote_path, name
)
)
sys.exit(1)

if transfer_type == "get":
src_ep = remote_endpoint
src_path = os.path.join(remote_path, name)
dst_ep = local_endpoint
dst_path = os.path.join(os.getcwd(), name)
else:
src_ep = local_endpoint
src_path = os.path.join(os.getcwd(), name)
dst_ep = remote_endpoint
dst_path = os.path.join(remote_path, name)

subdir = os.path.basename(os.path.normpath(remote_path))
subdir_label = re.sub("[^A-Za-z0-9_ -]", "", subdir)
filename = name.split(".")[0]
label = subdir_label + " " + filename

if not transfer_data:
transfer_data = TransferData(
transfer_client,
src_ep,
dst_ep,
label=label,
verify_checksum=True,
preserve_timestamp=True,
fail_on_quota_errors=True,
)
transfer_data.add_item(src_path, dst_path)
transfer_data["label"] = subdir_label + " " + filename
try:
task = tc.submit_transfer(td)
if task_id:
task = transfer_client.get_task(task_id)
if task["status"] == "ACTIVE":
return
elif task["status"] == "SUCCEEDED":
src_ep = task["source_endpoint_id"]
dst_ep = task["destination_endpoint_id"]
label = task["label"]
logger.info(
"Globus transfer {}, from {} to {}: {} succeeded".format(
task_id, src_ep, dst_ep, label
)
)
else:
logger.error("Transfer FAILED")
task = transfer_client.submit_transfer(transfer_data)
task_id = task.get("task_id")
transfer_data = None
except TransferAPIError as e:
if e.code == "NoCredException":
logger.error(
Expand All @@ -121,29 +192,35 @@ def globus_transfer( # noqa: C901
logger.error("Exception: {}".format(e))
sys.exit(1)

if non_blocking:
return
if transfer_type == "get" and task_id:
globus_wait(task_id)


def globus_wait(task_id: str):
global transfer_client

try:
task_id = task.get("task_id")
"""
A Globus transfer job (task) can be in one of the three states:
ACTIVE, SUCCEEDED, FAILED. The script every 20 seconds polls a
status of the transfer job (task) from the Globus Transfer service,
with 20 second timeout limit. If the task is ACTIVE after time runs
out 'task_wait' returns False, and True otherwise.
"""
while not tc.task_wait(task_id, timeout=20, polling_interval=20):
while not transfer_client.task_wait(task_id, timeout=20, polling_interval=20):
pass
"""
The Globus transfer job (task) has been finished (SUCCEEDED or FAILED).
Check if the transfer SUCCEEDED or FAILED.
"""
task = tc.get_task(task_id)
task = transfer_client.get_task(task_id)
if task["status"] == "SUCCEEDED":
src_ep = task["source_endpoint_id"]
dst_ep = task["destination_endpoint_id"]
label = task["label"]
logger.info(
"Globus transfer {}, from {}{} to {}{} succeeded".format(
task_id, src_ep, src_path, dst_ep, dst_path
"Globus transfer {}, from {} to {}: {} succeeded".format(
task_id, src_ep, dst_ep, label
)
)
else:
Expand All @@ -161,3 +238,35 @@ def globus_transfer( # noqa: C901
except Exception as e:
logger.error("Exception: {}".format(e))
sys.exit(1)


def globus_finalize(non_blocking: bool = False):
    """
    Submit any transfer items that are still queued and, unless told
    otherwise, wait for the outstanding Globus tasks to finish.

    Reads the module-level `transfer_client`, `transfer_data` and
    `task_id`. If `transfer_data` holds queued items, they are submitted
    as one final task. Exits the process with status 1 if that submission
    raises.

    :param non_blocking: if True, return right after the submission
        instead of waiting for the transfers to complete
    """
    global transfer_client
    global transfer_data
    global task_id

    last_task_id = None

    if transfer_data:
        try:
            last_task = transfer_client.submit_transfer(transfer_data)
            last_task_id = last_task.get("task_id")
            # The batch now belongs to Globus. Drop it, as globus_transfer
            # does after its own submit, so that calling this function
            # again cannot submit the same items a second time.
            transfer_data = None
        except TransferAPIError as e:
            if e.code == "NoCredException":
                logger.error(
                    "{}. Please go to https://app.globus.org/endpoints and activate the endpoint.".format(
                        e.message
                    )
                )
            else:
                logger.error(e)
            sys.exit(1)
        except Exception as e:
            logger.error("Exception: {}".format(e))
            sys.exit(1)

    if not non_blocking:
        # Wait for the task submitted earlier by globus_transfer (if any)
        # first, then for the one submitted just above.
        if task_id:
            globus_wait(task_id)
        if last_task_id:
            globus_wait(last_task_id)
13 changes: 4 additions & 9 deletions zstash/hpss.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ def hpss_transfer(
transfer_type: str,
cache: str,
keep: bool = False,
non_blocking: bool = False,
):
if hpss == "none":
logger.info("{}: HPSS is unavailable".format(transfer_type))
Expand Down Expand Up @@ -88,9 +87,7 @@ def hpss_transfer(

if scheme == "globus":
# Transfer file using the Globus Transfer Service
globus_transfer(
endpoint, url_path, name, transfer_type, non_blocking=non_blocking
)
globus_transfer(endpoint, url_path, name, transfer_type)
else:
# Transfer file using `hsi`
command: str = 'hsi -q "cd {}; {} {}"'.format(hpss, transfer_command, name)
Expand All @@ -102,18 +99,16 @@ def hpss_transfer(
os.chdir(cwd)

if transfer_type == "put":
if not keep:
if not keep and scheme != "globus":
# We should not keep the local file, so delete it now that it is on HPSS
os.remove(file_path)


def hpss_put(
hpss: str, file_path: str, cache: str, keep: bool = True, non_blocking: bool = False
):
def hpss_put(hpss: str, file_path: str, cache: str, keep: bool = True):
"""
Put a file to the HPSS archive.
"""
hpss_transfer(hpss, file_path, "put", cache, keep, non_blocking)
hpss_transfer(hpss, file_path, "put", cache, keep)


def hpss_get(hpss: str, file_path: str, cache: str):
Expand Down
3 changes: 1 addition & 2 deletions zstash/hpss_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ def add_files(
cache: str,
keep: bool,
skip_tars_md5: bool = False,
non_blocking: bool = False,
) -> List[str]:

# Now, perform the actual archiving
Expand Down Expand Up @@ -155,7 +154,7 @@ def add_files(
hpss: str = config.hpss
else:
raise TypeError("Invalid config.hpss={}".format(config.hpss))
hpss_put(hpss, os.path.join(cache, tfname), cache, keep, non_blocking)
hpss_put(hpss, os.path.join(cache, tfname), cache, keep)

# Update database with files that have been archived
# Add a row to the "files" table,
Expand Down
9 changes: 9 additions & 0 deletions zstash/update.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from datetime import datetime
from typing import List, Optional, Tuple

from .globus import globus_activate, globus_finalize
from .hpss import hpss_get, hpss_put
from .hpss_utils import add_files
from .settings import (
Expand Down Expand Up @@ -44,6 +45,8 @@ def update():
raise TypeError("Invalid config.hpss={}".format(config.hpss))
hpss_put(hpss, get_db_filename(cache), cache, keep=True)

globus_finalize(non_blocking=args.non_blocking)

# List failures
if len(failures) > 0:
logger.warning("Some files could not be archived")
Expand Down Expand Up @@ -87,6 +90,11 @@ def setup_update() -> Tuple[argparse.Namespace, str]:
type=str,
help='path to the zstash archive on the local file system. The default name is "zstash".',
)
optional.add_argument(
"--non-blocking",
action="store_true",
help="do not wait for each Globus transfer until it completes.",
)
optional.add_argument(
"-v", "--verbose", action="store_true", help="increase output verbosity"
)
Expand Down Expand Up @@ -116,6 +124,7 @@ def update_database(args: argparse.Namespace, cache: str) -> Optional[List[str]]
hpss: str = config.hpss
else:
raise TypeError("Invalid config.hpss={}".format(config.hpss))
globus_activate(hpss)
hpss_get(hpss, get_db_filename(cache), cache)
else:
error_str: str = (
Expand Down

0 comments on commit a2ea88a

Please sign in to comment.