Non block testing #355
Merged: 13 commits, Jan 8, 2025
2 changes: 1 addition & 1 deletion tests/base.py
@@ -325,7 +325,7 @@ def add_files(self, use_hpss, zstash_path, keep=False, cache=None):
expected_present = ["Transferring file to HPSS"]
else:
expected_present = ["put: HPSS is unavailable"]
expected_present += ["INFO: Creating new tar archive"]
expected_present += ["Creating new tar archive"]
# Make sure none of the old files or directories are moved.
expected_absent = ["ERROR", "file0", "file_empty", "empty_dir"]
self.check_strings(cmd, output + err, expected_present, expected_absent)
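Note on this test change: the tar-archive message is now emitted as f"{ts_utc()}: Creating new tar archive ..." (see zstash/hpss_utils.py below), so a timestamp sits between the "INFO: " prefix and the message text, and the old contiguous substring no longer appears. A quick illustrative check (the timestamp value here is made up):

    # The assertion now matches only the stable part of the log line.
    line = "INFO: 20250108_142530_123456: Creating new tar archive 000000.tar"
    assert "Creating new tar archive" in line
    assert "INFO: Creating new tar archive" not in line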
18 changes: 13 additions & 5 deletions zstash/create.py
@@ -19,6 +19,7 @@
get_files_to_archive,
run_command,
tars_table_exists,
ts_utc,
)


@@ -37,7 +38,7 @@ def create():
raise TypeError("Invalid config.hpss={}".format(config.hpss))

# Start doing actual work
logger.debug("Running zstash create")
logger.debug(f"{ts_utc()}: Running zstash create")
logger.debug("Local path : {}".format(path))
logger.debug("HPSS path : {}".format(hpss))
logger.debug("Max size : {}".format(config.maxsize))
@@ -54,11 +55,13 @@ def create():
if hpss != "none":
url = urlparse(hpss)
if url.scheme == "globus":
# identify globus endpoints
logger.debug(f"{ts_utc()}:Calling globus_activate(hpss)")
globus_activate(hpss)
else:
# config.hpss is not "none", so we need to
# create target HPSS directory
logger.debug("Creating target HPSS directory")
logger.debug(f"{ts_utc()}: Creating target HPSS directory {hpss}")
mkdir_command: str = "hsi -q mkdir -p {}".format(hpss)
mkdir_error_str: str = "Could not create HPSS directory: {}".format(hpss)
run_command(mkdir_command, mkdir_error_str)
@@ -71,7 +74,7 @@ def create():
run_command(ls_command, ls_error_str)

# Create cache directory
logger.debug("Creating local cache directory")
logger.debug(f"{ts_utc()}: Creating local cache directory")
os.chdir(path)
try:
os.makedirs(cache)
@@ -84,11 +87,14 @@ def create():
# TODO: Verify that cache is empty

# Create and set up the database
logger.debug(f"{ts_utc()}: Calling create_database()")
failures: List[str] = create_database(cache, args)

# Transfer to HPSS. Always keep a local copy.
logger.debug(f"{ts_utc()}: calling hpss_put() for {get_db_filename(cache)}")
hpss_put(hpss, get_db_filename(cache), cache, keep=True)

logger.debug(f"{ts_utc()}: calling globus_finalize()")
globus_finalize(non_blocking=args.non_blocking)

if len(failures) > 0:
@@ -145,7 +151,7 @@ def setup_create() -> Tuple[str, argparse.Namespace]:
optional.add_argument(
"--non-blocking",
action="store_true",
help="do not wait for each Globus transfer until it completes.",
help="do not wait for each Globus transfer to complete before creating additional archive files. This option will use more intermediate disk-space, but can increase throughput.",
)
optional.add_argument(
"-v", "--verbose", action="store_true", help="increase output verbosity"
@@ -185,7 +191,7 @@ def setup_create() -> Tuple[str, argparse.Namespace]:

def create_database(cache: str, args: argparse.Namespace) -> List[str]:
# Create new database
logger.debug("Creating index database")
logger.debug(f"{ts_utc()}:Creating index database")
if os.path.exists(get_db_filename(cache)):
# Remove old database
os.remove(get_db_filename(cache))
@@ -254,6 +260,7 @@ def create_database(
args.keep,
args.follow_symlinks,
skip_tars_md5=args.no_tars_md5,
non_blocking=args.non_blocking,
)
except FileNotFoundError:
raise Exception("Archive creation failed due to broken symlink.")
@@ -268,6 +275,7 @@ def create_database(cache: str, args: argparse.Namespace) -> List[str]:
args.keep,
args.follow_symlinks,
skip_tars_md5=args.no_tars_md5,
non_blocking=args.non_blocking,
)

# Close database
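For context, a minimal runnable sketch (not part of the diff) of how the new --non-blocking flag travels from the CLI down to the Globus layer; the stub functions below stand in for the real ones in zstash/hpss.py and zstash/globus.py:

    import argparse

    def globus_transfer_stub(name: str, non_blocking: bool) -> str:
        # Stand-in for zstash.globus.globus_transfer(), which now returns a
        # task-status string ("ACTIVE", "SUCCEEDED", "UNKNOWN", ...).
        return "ACTIVE" if non_blocking else "SUCCEEDED"

    def hpss_put_stub(name: str, non_blocking: bool = False) -> None:
        # Stand-in for hpss_put() -> hpss_transfer() -> globus_transfer().
        print(f"{name}: {globus_transfer_stub(name, non_blocking)}")

    parser = argparse.ArgumentParser()
    parser.add_argument("--non-blocking", action="store_true")
    args = parser.parse_args(["--non-blocking"])  # as in: zstash create --non-blocking ...
    hpss_put_stub("000000.tar", non_blocking=args.non_blocking)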
107 changes: 98 additions & 9 deletions zstash/globus.py
@@ -13,6 +13,7 @@
from six.moves.urllib.parse import urlparse

from .settings import logger
from .utils import ts_utc

hpss_endpoint_map = {
"ALCF": "de463ec4-6d04-11e5-ba46-22000b92c6ec",
@@ -157,16 +158,18 @@ def file_exists(name: str) -> bool:
return False


def globus_transfer(
remote_ep: str, remote_path: str, name: str, transfer_type: str
): # noqa: C901
# C901 'globus_transfer' is too complex (20)
def globus_transfer( # noqa: C901
remote_ep: str, remote_path: str, name: str, transfer_type: str, non_blocking: bool
):
global transfer_client
global local_endpoint
global remote_endpoint
global transfer_data
global task_id
global archive_directory_listing

logger.info(f"{ts_utc()}: Entered globus_transfer() for name = {name}")
if not transfer_client:
globus_activate("globus://" + remote_ep)
if not transfer_client:
Expand Down Expand Up @@ -216,21 +219,50 @@ def globus_transfer(
try:
if task_id:
task = transfer_client.get_task(task_id)
if task["status"] == "ACTIVE":
return
elif task["status"] == "SUCCEEDED":
prev_task_status = task["status"]
# one of {ACTIVE, SUCCEEDED, FAILED, CANCELED, PENDING, INACTIVE}
# NOTE: How we behave here depends upon whether we want to support multiple active transfers.
# Presently, we do not, except inadvertently (if status == PENDING)
if prev_task_status == "ACTIVE":
logger.info(
f"{ts_utc()}: Previous task_id {task_id} Still Active. Returning."
)
return "ACTIVE"
elif prev_task_status == "SUCCEEDED":
logger.info(
f"{ts_utc()}: Previous task_id {task_id} status = SUCCEEDED. Continuing."
)
src_ep = task["source_endpoint_id"]
dst_ep = task["destination_endpoint_id"]
label = task["label"]
ts = ts_utc()
logger.info(
"Globus transfer {}, from {} to {}: {} succeeded".format(
task_id, src_ep, dst_ep, label
"{}:Globus transfer {}, from {} to {}: {} succeeded".format(
ts, task_id, src_ep, dst_ep, label
)
)
else:
logger.error("Transfer FAILED")
logger.error(
f"{ts_utc()}: Previous task_id {task_id} status = {prev_task_status}. Continuing."
)

# DEBUG: review accumulated items in TransferData
logger.info(f"{ts_utc()}: TransferData: accumulated items:")
attribs = transfer_data.__dict__
for item in attribs["data"]["DATA"]:
if item["DATA_TYPE"] == "transfer_item":
print(f" source item: {item['source_path']}")

# SUBMIT new transfer here
logger.info(f"{ts_utc()}: DIVING: Submit Transfer for {transfer_data['label']}")
task = submit_transfer_with_checks(transfer_data)
task_id = task.get("task_id")
# NOTE: This log message is misleading. If we have accumulated multiple tar files for transfer,
# the "lable" given here refers only to the LAST tarfile in the TransferData list.
logger.info(
f"{ts_utc()}: SURFACE Submit Transfer returned new task_id = {task_id} for label {transfer_data['label']}"
)

transfer_data = None
except TransferAPIError as e:
if e.code == "NoCredException":
@@ -246,9 +278,66 @@
logger.error("Exception: {}".format(e))
sys.exit(1)

# Block on the new task_id unless non-blocking was requested
task_status = "UNKNOWN"
if not non_blocking:
task_status = globus_block_wait(
task_id=task_id, wait_timeout=7200, polling_interval=10, max_retries=5
)
else:
logger.info(f"{ts_utc()}: NO BLOCKING (task_wait) for task_id {task_id}")

if transfer_type == "put":
return task_status

if transfer_type == "get" and task_id:
globus_wait(task_id)

return task_status


def globus_block_wait(
task_id: str, wait_timeout: int, polling_interval: int, max_retries: int
):
global transfer_client

# poll every "polling_interval" seconds to speed up small transfers. Report every 2 hours, stop waiting after 5*2 = 10 hours
logger.info(
f"{ts_utc()}: BLOCKING START: invoking task_wait for task_id = {task_id}"
)
task_status = "UNKNOWN"
retry_count = 0
while retry_count < max_retries:
try:
# Wait for the task to complete
transfer_client.task_wait(
task_id, timeout=wait_timeout, polling_interval=polling_interval
)
except Exception as e:
logger.error(f"Unexpected Exception: {e}")
else:
curr_task = transfer_client.get_task(task_id)
task_status = curr_task["status"]
if task_status == "SUCCEEDED":
break
finally:
retry_count += 1
logger.info(
f"{ts_utc()}: BLOCKING retry_count = {retry_count} of {max_retries} of timeout {wait_timeout} seconds"
)

if retry_count == max_retries:
logger.info(
f"{ts_utc()}: BLOCKING EXHAUSTED {max_retries} of timeout {wait_timeout} seconds"
)
task_status = "EXHAUSTED_TIMEOUT_RETRIES"

logger.info(
f"{ts_utc()}: BLOCKING ENDS: task_id {task_id} returned from task_wait with status {task_status}"
)

return task_status


def globus_wait(task_id: str):
global transfer_client
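The blocking path above relies on globus_sdk's TransferClient.task_wait(), which returns once the task reaches a terminal state or the timeout elapses. A condensed sketch of the same retry pattern, assuming `tc` is an authenticated globus_sdk.TransferClient and `task_id` an already-submitted transfer task:

    def block_wait(tc, task_id: str, wait_timeout: int = 7200,
                   polling_interval: int = 10, max_retries: int = 5) -> str:
        status = "UNKNOWN"
        for _ in range(max_retries):
            # task_wait() blocks for at most wait_timeout seconds per call.
            tc.task_wait(task_id, timeout=wait_timeout, polling_interval=polling_interval)
            status = tc.get_task(task_id)["status"]
            if status == "SUCCEEDED":
                break
        else:
            # Mirrors the code above: a task still unfinished (or FAILED) after
            # max_retries waits is reported as exhausted.
            status = "EXHAUSTED_TIMEOUT_RETRIES"
        return status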
30 changes: 23 additions & 7 deletions zstash/hpss.py
@@ -8,7 +8,7 @@

from .globus import globus_transfer
from .settings import get_db_filename, logger
from .utils import run_command
from .utils import run_command, ts_utc


def hpss_transfer(
@@ -17,6 +17,7 @@ def hpss_transfer(
transfer_type: str,
cache: str,
keep: bool = False,
non_blocking: bool = False,
):
if hpss == "none":
logger.info("{}: HPSS is unavailable".format(transfer_type))
@@ -86,8 +87,19 @@ def hpss_transfer(
os.chdir(path)

if scheme == "globus":
globus_status = "UNKNOWN"
# Transfer file using the Globus Transfer Service
globus_transfer(endpoint, url_path, name, transfer_type)
logger.info(f"{ts_utc()}: DIVING: hpss calls globus_transfer(name={name})")
globus_status = globus_transfer(
endpoint, url_path, name, transfer_type, non_blocking
)
logger.info(
f"{ts_utc()}: SURFACE hpss globus_transfer(name={name}) returns {globus_status}"
)
# NOTE: Here, the status could be "EXHAUSTED_TIMEOUT_RETRIES", meaning a very long transfer
# or perhaps a transfer that is hanging. We should decide whether to ignore it, or cancel it, but
# we'd need the task_id to issue a cancellation. Perhaps we should have globus_transfer
# return a tuple (task_id, status).
else:
# Transfer file using `hsi`
command: str = 'hsi -q "cd {}; {} {}"'.format(hpss, transfer_command, name)
@@ -99,16 +111,20 @@
os.chdir(cwd)

if transfer_type == "put":
if not keep and scheme != "globus":
# We should not keep the local file, so delete it now that it is on HPSS
os.remove(file_path)
if not keep:
if (scheme != "globus") or (
globus_status == "SUCCEEDED" and not non_blocking
):
os.remove(file_path)


def hpss_put(hpss: str, file_path: str, cache: str, keep: bool = True):
def hpss_put(
hpss: str, file_path: str, cache: str, keep: bool = True, non_blocking: bool = False
):
"""
Put a file to the HPSS archive.
"""
hpss_transfer(hpss, file_path, "put", cache, keep)
hpss_transfer(hpss, file_path, "put", cache, keep, non_blocking)


def hpss_get(hpss: str, file_path: str, cache: str):
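The keep/delete logic in hpss_transfer() is now conditional on the transfer outcome. A small predicate capturing the rule (a sketch using the diff's names, not code from the PR):

    def can_remove_local(keep: bool, scheme: str, globus_status: str,
                         non_blocking: bool) -> bool:
        # Never delete with --keep; non-Globus puts via hsi are synchronous, so
        # deletion is safe; Globus puts must have completed a blocking transfer
        # with status SUCCEEDED before the local tar is removed.
        if keep:
            return False
        return scheme != "globus" or (globus_status == "SUCCEEDED" and not non_blocking)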
22 changes: 18 additions & 4 deletions zstash/hpss_utils.py
@@ -14,7 +14,7 @@

from .hpss import hpss_put
from .settings import BLOCK_SIZE, TupleFilesRowNoId, TupleTarsRowNoId, config, logger
from .utils import create_tars_table, tars_table_exists
from .utils import create_tars_table, tars_table_exists, ts_utc


# Minimum output file object
@@ -63,6 +63,7 @@ def add_files(
keep: bool,
follow_symlinks: bool,
skip_tars_md5: bool = False,
non_blocking: bool = False,
) -> List[str]:

# Now, perform the actual archiving
@@ -87,7 +88,7 @@
tname = "{0:0{1}x}".format(itar, 6)
# Create the tar file name by adding ".tar"
tfname = "{}.tar".format(tname)
logger.info("Creating new tar archive {}".format(tfname))
logger.info(f"{ts_utc()}: Creating new tar archive {tfname}")
# Open that tar file in the cache
do_hash: bool
if not skip_tars_md5:
@@ -136,12 +137,13 @@
if i == nfiles - 1 or tarsize + next_file_size > maxsize:

# Close current temporary file
logger.debug("Closing tar archive {}".format(tfname))
logger.debug(f"{ts_utc()}: Closing tar archive {tfname}")
tar.close()

tarsize = tarFileObject.tell()
tar_md5: Optional[str] = tarFileObject.md5()
tarFileObject.close()
logger.info(f"{ts_utc()}: (add_files): Completed archive file {tfname}")
if not skip_tars_md5:
tar_tuple: TupleTarsRowNoId = (tfname, tarsize, tar_md5)
logger.info("tar name={}, tar size={}, tar md5={}".format(*tar_tuple))
@@ -156,7 +158,19 @@
hpss: str = config.hpss
else:
raise TypeError("Invalid config.hpss={}".format(config.hpss))
hpss_put(hpss, os.path.join(cache, tfname), cache, keep)

# NOTE: These lines could be added under an "if debug" condition
# logger.info(f"{ts_utc()}: CONTENTS of CACHE upon call to hpss_put:")
# process = subprocess.run(["ls", "-l", "zstash"], capture_output=True, text=True)
# print(process.stdout)

logger.info(
f"{ts_utc()}: DIVING: (add_files): Calling hpss_put to dispatch archive file {tfname}"
)
hpss_put(hpss, os.path.join(cache, tfname), cache, keep, non_blocking)
logger.info(
f"{ts_utc()}: SURFACE (add_files): Called hpss_put to dispatch archive file {tfname}"
)

# Update database with files that have been archived
# Add a row to the "files" table,
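As a side note on the log lines above: tar names come from "{0:0{1}x}".format(itar, 6), a zero-padded six-digit hexadecimal counter. For example:

    for itar in (0, 10, 26, 4095):
        print("{0:0{1}x}.tar".format(itar, 6))  # 000000.tar, 00000a.tar, 00001a.tar, 000fff.tar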
5 changes: 5 additions & 0 deletions zstash/utils.py
@@ -4,12 +4,17 @@
import shlex
import sqlite3
import subprocess
from datetime import datetime, timezone
from fnmatch import fnmatch
from typing import Any, List, Tuple

from .settings import TupleTarsRow, config, logger


def ts_utc():
return datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S_%f")


def filter_files(subset: str, files: List[str], include: bool) -> List[str]:

# Construct list of files to filter, based on
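The new ts_utc() helper yields sortable, microsecond-resolution UTC timestamps for the log messages added throughout this PR. A standalone check (the printed value is illustrative):

    from datetime import datetime, timezone

    def ts_utc():
        return datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S_%f")

    print(ts_utc())  # e.g. 20250108_142530_123456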