Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Globus transfer optimization #214

Merged
merged 4 commits into from
Nov 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions zstash/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from six.moves.urllib.parse import urlparse

from .globus import globus_activate, globus_finalize
from .hpss import hpss_put
from .hpss_utils import add_files
from .settings import DEFAULT_CACHE, config, get_db_filename, logger
Expand Down Expand Up @@ -53,7 +54,9 @@ def create():

if hpss != "none":
url = urlparse(hpss)
if url.scheme != "globus":
if url.scheme == "globus":
globus_activate(hpss)
else:
# config.hpss is not "none", so we need to
# create target HPSS directory
logger.debug("Creating target HPSS directory")
Expand Down Expand Up @@ -85,9 +88,9 @@ def create():
failures: List[str] = create_database(cache, args)

# Transfer to HPSS. Always keep a local copy.
hpss_put(
hpss, get_db_filename(cache), cache, keep=True, non_blocking=args.non_blocking
)
hpss_put(hpss, get_db_filename(cache), cache, keep=True)

globus_finalize(non_blocking=args.non_blocking)

if len(failures) > 0:
# List the failures
Expand Down Expand Up @@ -240,7 +243,6 @@ def create_database(cache: str, args: argparse.Namespace) -> List[str]:
cache,
args.keep,
skip_tars_md5=args.no_tars_md5,
non_blocking=args.non_blocking,
)

# Close database
Expand Down
193 changes: 151 additions & 42 deletions zstash/globus.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

from fair_research_login.client import NativeClient
from globus_sdk import TransferAPIError, TransferClient, TransferData
from globus_sdk.services.transfer.response.iterable import IterableTransferResponse
from six.moves.urllib.parse import urlparse

from .settings import logger

Expand All @@ -23,18 +25,31 @@
r"cori.*\.nersc\.gov": "9d6d99eb-6d04-11e5-ba46-22000b92c6ec",
}

# Module-level Globus state shared by globus_activate(), globus_transfer(),
# globus_wait() and globus_finalize(). Every value starts out as None.

# Remote endpoint taken from the netloc of the "globus://" URL in
# globus_activate(); replaced by the hpss_endpoint_map entry when the
# upper-cased name is a key of that map.
remote_endpoint = None
# Local endpoint UUID; globus_activate() reads it from the "local" section
# of ~/.zstash.ini when that section is present.
local_endpoint = None
# Authorized TransferClient created by globus_activate().
transfer_client: TransferClient = None
# Batch of items not yet submitted. globus_transfer() creates it on demand,
# adds one item per call and resets it to None after submitting;
# globus_finalize() submits whatever is still pending.
transfer_data: TransferData = None
# task_id of the most recently submitted transfer (set in globus_transfer()).
task_id = None
# Result of transfer_client.operation_ls() on the remote path, fetched by
# globus_transfer() on a "get" when not already set; read by file_exists().
archive_directory_listing: IterableTransferResponse = None

def globus_transfer( # noqa: C901
remote_endpoint, remote_path, name, transfer_type, non_blocking=False
):

def globus_activate(hpss: str):
"""
Read the local globus endpoint UUID from ~/.zstash.ini.
If the ini file does not exist, create an ini file with empty values,
and try to find the local endpoint UUID based on the FQDN
"""
global transfer_client
global local_endpoint
global remote_endpoint

url = urlparse(hpss)
if url.scheme != "globus":
return
remote_endpoint = url.netloc

ini_path = os.path.expanduser("~/.zstash.ini")
ini = configparser.ConfigParser()
local_endpoint = None
if ini.read(ini_path):
if "local" in ini.sections():
local_endpoint = ini["local"].get("globus_endpoint_uuid")
Expand All @@ -59,33 +74,17 @@ def globus_transfer( # noqa: C901
if remote_endpoint.upper() in hpss_endpoint_map.keys():
remote_endpoint = hpss_endpoint_map.get(remote_endpoint.upper())

if transfer_type == "get":
src_ep = remote_endpoint
src_path = os.path.join(remote_path, name)
dst_ep = local_endpoint
dst_path = os.path.join(os.getcwd(), name)
else:
src_ep = local_endpoint
src_path = os.path.join(os.getcwd(), name)
dst_ep = remote_endpoint
dst_path = os.path.join(remote_path, name)

subdir = os.path.basename(os.path.normpath(remote_path))
subdir_label = re.sub("[^A-Za-z0-9_ -]", "", subdir)
filename = name.split(".")[0]
label = subdir_label + " " + filename

native_client = NativeClient(
client_id="6c1629cf-446c-49e7-af95-323c6412397f",
app_name="Zstash",
default_scopes="openid urn:globus:auth:scope:transfer.api.globus.org:all",
)
native_client.login(no_local_server=True, refresh_tokens=True)
transfer_authorizer = native_client.get_authorizers().get("transfer.api.globus.org")
tc = TransferClient(authorizer=transfer_authorizer)
transfer_client = TransferClient(authorizer=transfer_authorizer)

for ep_id in [src_ep, dst_ep]:
r = tc.endpoint_autoactivate(ep_id, if_expires_in=600)
for ep_id in [local_endpoint, remote_endpoint]:
r = transfer_client.endpoint_autoactivate(ep_id, if_expires_in=600)
if r.get("code") == "AutoActivationFailed":
logger.error(
"The {} endpoint is not activated or the current activation expires soon. Please go to https://app.globus.org/file-manager/collections/{} and (re)activate the endpoint.".format(
Expand All @@ -94,19 +93,91 @@ def globus_transfer( # noqa: C901
)
sys.exit(1)

td = TransferData(
tc,
src_ep,
dst_ep,
label=label,
sync_level="checksum",
verify_checksum=True,
preserve_timestamp=True,
fail_on_quota_errors=True,
)
td.add_item(src_path, dst_path)

def file_exists(name: str) -> bool:
    """
    Check whether ``name`` is present in the cached remote directory listing.

    The listing is the module-level ``archive_directory_listing``. Each entry
    is queried with ``entry.get("name")`` and compared to ``name`` exactly.

    Returns False when no listing has been cached yet (the variable is still
    None) instead of failing with a TypeError while iterating None.
    """
    # The cache is only read here, so no ``global`` declaration is required.
    listing = archive_directory_listing
    if listing is None:
        return False
    return any(entry.get("name") == name for entry in listing)


def globus_transfer(
remote_ep: str, remote_path: str, name: str, transfer_type: str
): # noqa: C901
global transfer_client
global local_endpoint
global remote_endpoint
global transfer_data
global task_id
global archive_directory_listing

if not transfer_client:
globus_activate("globus://" + remote_ep)
if not transfer_client:
sys.exit(1)

if transfer_type == "get":
if not archive_directory_listing:
archive_directory_listing = transfer_client.operation_ls(
remote_endpoint, remote_path
)
if not file_exists(name):
logger.error(
"Remote file globus://{}{}/{} does not exist".format(
remote_ep, remote_path, name
)
)
sys.exit(1)

if transfer_type == "get":
src_ep = remote_endpoint
src_path = os.path.join(remote_path, name)
dst_ep = local_endpoint
dst_path = os.path.join(os.getcwd(), name)
else:
src_ep = local_endpoint
src_path = os.path.join(os.getcwd(), name)
dst_ep = remote_endpoint
dst_path = os.path.join(remote_path, name)

subdir = os.path.basename(os.path.normpath(remote_path))
subdir_label = re.sub("[^A-Za-z0-9_ -]", "", subdir)
filename = name.split(".")[0]
label = subdir_label + " " + filename

if not transfer_data:
transfer_data = TransferData(
transfer_client,
src_ep,
dst_ep,
label=label,
verify_checksum=True,
preserve_timestamp=True,
fail_on_quota_errors=True,
)
transfer_data.add_item(src_path, dst_path)
transfer_data["label"] = subdir_label + " " + filename
try:
task = tc.submit_transfer(td)
if task_id:
task = transfer_client.get_task(task_id)
if task["status"] == "ACTIVE":
return
elif task["status"] == "SUCCEEDED":
src_ep = task["source_endpoint_id"]
dst_ep = task["destination_endpoint_id"]
label = task["label"]
logger.info(
"Globus transfer {}, from {} to {}: {} succeeded".format(
task_id, src_ep, dst_ep, label
)
)
else:
logger.error("Transfer FAILED")
task = transfer_client.submit_transfer(transfer_data)
task_id = task.get("task_id")
transfer_data = None
except TransferAPIError as e:
if e.code == "NoCredException":
logger.error(
Expand All @@ -121,29 +192,35 @@ def globus_transfer( # noqa: C901
logger.error("Exception: {}".format(e))
sys.exit(1)

if non_blocking:
return
if transfer_type == "get" and task_id:
globus_wait(task_id)


def globus_wait(task_id: str):
global transfer_client

try:
task_id = task.get("task_id")
"""
A Globus transfer job (task) can be in one of the three states:
ACTIVE, SUCCEEDED, FAILED. The script every 20 seconds polls a
status of the transfer job (task) from the Globus Transfer service,
with 20 second timeout limit. If the task is ACTIVE after time runs
out 'task_wait' returns False, and True otherwise.
"""
while not tc.task_wait(task_id, timeout=20, polling_interval=20):
while not transfer_client.task_wait(task_id, timeout=20, polling_interval=20):
pass
"""
The Globus transfer job (task) has been finished (SUCCEEDED or FAILED).
Check if the transfer SUCCEEDED or FAILED.
"""
task = tc.get_task(task_id)
task = transfer_client.get_task(task_id)
if task["status"] == "SUCCEEDED":
src_ep = task["source_endpoint_id"]
dst_ep = task["destination_endpoint_id"]
label = task["label"]
logger.info(
"Globus transfer {}, from {}{} to {}{} succeeded".format(
task_id, src_ep, src_path, dst_ep, dst_path
"Globus transfer {}, from {} to {}: {} succeeded".format(
task_id, src_ep, dst_ep, label
)
)
else:
Expand All @@ -161,3 +238,35 @@ def globus_transfer( # noqa: C901
except Exception as e:
logger.error("Exception: {}".format(e))
sys.exit(1)


def globus_finalize(non_blocking: bool = False):
    """
    Submit any transfer items that are still pending and optionally wait.

    If the module-level ``transfer_data`` holds a pending batch, it is
    submitted through ``transfer_client``. After a successful submission the
    batch is cleared, mirroring what globus_transfer() does, so that calling
    this function again does not submit the same batch a second time.

    Parameters:
    non_blocking: when True, return right after the submission without
    waiting; when False (default), wait via globus_wait() first for the
    previously recorded ``task_id`` (if any) and then for the task that was
    submitted here (if any).

    Exits the process with status 1 if the submission raises.
    """
    global transfer_data

    last_task_id = None

    if transfer_data:
        try:
            last_task = transfer_client.submit_transfer(transfer_data)
            last_task_id = last_task.get("task_id")
        except TransferAPIError as e:
            if e.code == "NoCredException":
                logger.error(
                    "{}. Please go to https://app.globus.org/endpoints and activate the endpoint.".format(
                        e.message
                    )
                )
            else:
                logger.error(e)
            sys.exit(1)
        except Exception as e:
            logger.error("Exception: {}".format(e))
            sys.exit(1)
        # The batch now belongs to a submitted task; drop it so it cannot be
        # submitted again by a later call.
        transfer_data = None

    if non_blocking:
        return

    # Wait for the earlier task first, then for the one submitted above.
    for pending_task_id in (task_id, last_task_id):
        if pending_task_id:
            globus_wait(pending_task_id)
13 changes: 4 additions & 9 deletions zstash/hpss.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ def hpss_transfer(
transfer_type: str,
cache: str,
keep: bool = False,
non_blocking: bool = False,
):
if hpss == "none":
logger.info("{}: HPSS is unavailable".format(transfer_type))
Expand Down Expand Up @@ -88,9 +87,7 @@ def hpss_transfer(

if scheme == "globus":
# Transfer file using the Globus Transfer Service
globus_transfer(
endpoint, url_path, name, transfer_type, non_blocking=non_blocking
)
globus_transfer(endpoint, url_path, name, transfer_type)
else:
# Transfer file using `hsi`
command: str = 'hsi -q "cd {}; {} {}"'.format(hpss, transfer_command, name)
Expand All @@ -102,18 +99,16 @@ def hpss_transfer(
os.chdir(cwd)

if transfer_type == "put":
if not keep:
if not keep and scheme != "globus":
# We should not keep the local file, so delete it now that it is on HPSS
os.remove(file_path)


def hpss_put(
hpss: str, file_path: str, cache: str, keep: bool = True, non_blocking: bool = False
):
def hpss_put(hpss: str, file_path: str, cache: str, keep: bool = True):
"""
Put a file to the HPSS archive.
"""
hpss_transfer(hpss, file_path, "put", cache, keep, non_blocking)
hpss_transfer(hpss, file_path, "put", cache, keep)


def hpss_get(hpss: str, file_path: str, cache: str):
Expand Down
3 changes: 1 addition & 2 deletions zstash/hpss_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ def add_files(
cache: str,
keep: bool,
skip_tars_md5: bool = False,
non_blocking: bool = False,
) -> List[str]:

# Now, perform the actual archiving
Expand Down Expand Up @@ -155,7 +154,7 @@ def add_files(
hpss: str = config.hpss
else:
raise TypeError("Invalid config.hpss={}".format(config.hpss))
hpss_put(hpss, os.path.join(cache, tfname), cache, keep, non_blocking)
hpss_put(hpss, os.path.join(cache, tfname), cache, keep)

# Update database with files that have been archived
# Add a row to the "files" table,
Expand Down
9 changes: 9 additions & 0 deletions zstash/update.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from datetime import datetime
from typing import List, Optional, Tuple

from .globus import globus_activate, globus_finalize
from .hpss import hpss_get, hpss_put
from .hpss_utils import add_files
from .settings import (
Expand Down Expand Up @@ -44,6 +45,8 @@ def update():
raise TypeError("Invalid config.hpss={}".format(config.hpss))
hpss_put(hpss, get_db_filename(cache), cache, keep=True)

globus_finalize(non_blocking=args.non_blocking)

# List failures
if len(failures) > 0:
logger.warning("Some files could not be archived")
Expand Down Expand Up @@ -87,6 +90,11 @@ def setup_update() -> Tuple[argparse.Namespace, str]:
type=str,
help='path to the zstash archive on the local file system. The default name is "zstash".',
)
optional.add_argument(
"--non-blocking",
action="store_true",
help="do not wait for each Globus transfer until it completes.",
)
optional.add_argument(
"-v", "--verbose", action="store_true", help="increase output verbosity"
)
Expand Down Expand Up @@ -116,6 +124,7 @@ def update_database(args: argparse.Namespace, cache: str) -> Optional[List[str]]
hpss: str = config.hpss
else:
raise TypeError("Invalid config.hpss={}".format(config.hpss))
globus_activate(hpss)
hpss_get(hpss, get_db_filename(cache), cache)
else:
error_str: str = (
Expand Down