Skip to content

Commit

Permalink
hsi get retry
Browse files Browse the repository at this point in the history
  • Loading branch information
forsyth2 committed Oct 12, 2022
1 parent 3046ffd commit 44f4317
Show file tree
Hide file tree
Showing 7 changed files with 137 additions and 26 deletions.
4 changes: 4 additions & 0 deletions docs/source/usage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,10 @@ where
* ``--keep`` to keep a copy of the tar files on the local file system after
they have been extracted from the archive. Normally, they are deleted after
successful transfer.
* ``--retries`` to set the number of times to retry ``hsi get`` if it is unsuccessful.
The default is 1 retry (2 tries total). Note: for a retry to occur automatically because of
an incomplete tar file, then the archive you're extracting from
must have been created using ``zstash >= v1.1.0``.
* ``--tars`` to specify specific tars to extract. See "Check" above for example usage.
* ``-v`` increases output verbosity.
* ``[files]`` is a list of files to be extracted (standard wildcards supported).
Expand Down
2 changes: 2 additions & 0 deletions tests/base.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
"""
Run the test suite with `python -m unittest tests/test_*.py`
To run an individual test, run something like `python -m unittest tests.test_extract.TestExtract.testExtractRetries`
If running on Cori, it is preferable to run from $CSCRATCH rather than
/global/homes. Running from the latter may result in a
'Resource temporarily unavailable' error.
Expand Down
2 changes: 1 addition & 1 deletion tests/test_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ def testCheckKeepTars(self):
)
self.assertEqualOrStop(
output + err,
'INFO: Opening tar archive {}/000000.tar\nINFO: Checking file1.txt\nINFO: Checking file2.txt\nINFO: No failures detected when checking the files. If you have a log file, run "grep -i Exception <log-file>" to double check.\n'.format(
'INFO: zstash/000000.tar exists. Checking expected size matches actual size.\nINFO: Opening tar archive {}/000000.tar\nINFO: Checking file1.txt\nINFO: Checking file2.txt\nINFO: No failures detected when checking the files. If you have a log file, run "grep -i Exception <log-file>" to double check.\n'.format(
self.cache
),
)
Expand Down
58 changes: 51 additions & 7 deletions tests/test_extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@ class TestExtract(TestZstash):

# `zstash extract` is tested in TestExtract and TestExtractParallel.
# x = on, no mark = off, b = both on and off tested
# option | ExtractVerbose | Extract | ExtractCache | ExtractTars | ExtractFile | ExtractParallel | ExtractParallelTars |
# --hpss |x|x|x|x|x|x|x|
# --workers | | | | | |x|x|
# --cache | | |x| | | | |
# --keep | |x| | | | | |
# --tars | | | |x| | |x|
# -v |x| | | | |b| |
# option | ExtractVerbose | ExtractRetries | ExtractKeep | ExtractCache | ExtractTars | ExtractFile | ExtractParallel | ExtractParallelTars |
# --hpss |x|x|x|x|x|x|x|x|
# --workers | | | | | | |x|x|
# --cache | | | |x| | | | |
# --keep | | |x| | | | | |
# --retries | |x| | | | | | |
# --tars | | | | |x| | |x|
# -v |x| | | | | |b| |

def helperExtractVerbose(self, test_name, hpss_path, zstash_path=ZSTASH_PATH):
"""
Expand Down Expand Up @@ -121,6 +122,42 @@ def helperExtractVerbose(self, test_name, hpss_path, zstash_path=ZSTASH_PATH):
expected_absent = ["ERROR", "Not extracting"]
self.check_strings(cmd, output + err, expected_present, expected_absent)

def helperExtractRetries(self, test_name, hpss_path, zstash_path=ZSTASH_PATH):
"""
Test `zstash extract --retries`.
"""
self.hpss_path = hpss_path
use_hpss = self.setupDirs(test_name)
self.create(use_hpss, zstash_path)
self.assertWorkspace()
os.rename(self.test_dir, self.backup_dir)
os.mkdir(self.test_dir)
os.chdir(self.test_dir)
if not use_hpss:
shutil.copytree(
"{}/{}/{}".format(TOP_LEVEL, self.backup_dir, self.cache), self.copy_dir
)
cmd = "{}zstash extract --hpss={} --retries=3".format(
zstash_path, self.hpss_path
)
output, err = run_cmd(cmd)
os.chdir(TOP_LEVEL)
expected_present = [
"Opening tar archive zstash/000000.tar",
"Extracting file0.txt",
"Extracting file0_hard.txt",
"Extracting file0_soft.txt",
"Extracting file0_soft_bad.txt",
"Extracting file_empty.txt",
"Extracting dir/file1.txt",
"Extracting empty_dir",
"No failures detected when extracting the files",
]
if use_hpss:
expected_present.append("Transferring file from HPSS")
expected_absent = ["ERROR", "Not extracting"]
self.check_strings(cmd, output + err, expected_present, expected_absent)

def helperExtractKeep(self, test_name, hpss_path, zstash_path=ZSTASH_PATH):
"""
Test `zstash extract` with `--keep`.
Expand Down Expand Up @@ -251,6 +288,13 @@ def testExtractVerboseHPSS(self):
self.conditional_hpss_skip()
self.helperExtractVerbose("testExtractVerboseHPSS", HPSS_ARCHIVE)

def testExtractRetries(self):
self.helperExtractRetries("testExtractRetries", "none")

def testExtractRetriesHPSS(self):
self.conditional_hpss_skip()
self.helperExtractRetries("testExtractRetriesHPSS", HPSS_ARCHIVE)

def testExtractKeep(self):
self.helperExtractKeep("testExtractKeep", "none")

Expand Down
15 changes: 8 additions & 7 deletions tests/test_extract_parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@ class TestExtractParallel(TestZstash):

# `zstash extract` is tested in TestExtract and TestExtractParallel.
# x = on, no mark = off, b = both on and off tested
# option | ExtractVerbose | Extract | ExtractCache | ExtractTars | ExtractFile | ExtractParallel | ExtractParallelTars |
# --hpss |x|x|x|x|x|x|x|
# --workers | | | | | |x|x|
# --cache | | |x| | | | |
# --keep | |x| | | | | |
# --tars | | | |x| | |x|
# -v |x| | | | |b| |
# option | ExtractVerbose | ExtractRetries | ExtractKeep | ExtractCache | ExtractTars | ExtractFile | ExtractParallel | ExtractParallelTars |
# --hpss |x|x|x|x|x|x|x|x|
# --workers | | | | | | |x|x|
# --cache | | | |x| | | | |
# --keep | | |x| | | | | |
# --retries | |x| | | | | | |
# --tars | | | | |x| | |x|
# -v |x| | | | | |b| |

def helperExtractParallel(self, test_name, hpss_path, zstash_path=ZSTASH_PATH):
"""
Expand Down
80 changes: 70 additions & 10 deletions zstash/extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
get_db_filename,
logger,
)
from .utils import update_config
from .utils import tars_table_exists, update_config


def extract(keep_files: bool = True):
Expand Down Expand Up @@ -94,6 +94,9 @@ def setup_extract() -> Tuple[argparse.Namespace, str]:
type=str,
help='path to the zstash archive on the local file system. The default name is "zstash".',
)
optional.add_argument(
"--retries", type=int, default=1, help="number of times to retry an hsi command"
)
optional.add_argument("--tars", type=str, help="specify which tars to process")
optional.add_argument(
"-v", "--verbose", action="store_true", help="increase output verbosity"
Expand Down Expand Up @@ -270,9 +273,11 @@ def extract_database(
failures: List[FilesRow]
if args.workers > 1:
logger.debug("Running zstash {} with multiprocessing".format(cmd))
failures = multiprocess_extract(args.workers, matches, keep_files, keep, cache)
failures = multiprocess_extract(
args.workers, matches, keep_files, keep, cache, cur, args
)
else:
failures = extractFiles(matches, keep_files, keep, cache)
failures = extractFiles(matches, keep_files, keep, cache, cur, args)

# Close database
logger.debug("Closing index database")
Expand All @@ -287,6 +292,8 @@ def multiprocess_extract(
keep_files: bool,
keep_tars: Optional[bool],
cache: str,
cur: sqlite3.Cursor,
args: argparse.Namespace,
) -> List[FilesRow]:
"""
Extract the files from the matches in parallel.
Expand Down Expand Up @@ -359,7 +366,7 @@ def multiprocess_extract(
)
process: multiprocessing.Process = multiprocessing.Process(
target=extractFiles,
args=(matches, keep_files, keep_tars, cache, worker),
args=(matches, keep_files, keep_tars, cache, cur, args, worker),
daemon=True,
)
process.start()
Expand All @@ -379,12 +386,36 @@ def multiprocess_extract(
return failures


def check_sizes_match(cur, tfname):
match: bool
if cur and tars_table_exists(cur):
logger.info(f"{tfname} exists. Checking expected size matches actual size.")
actual_size = os.path.getsize(tfname)
name_only = os.path.split(tfname)[1]
cur.execute(f"select size from tars where name is '{name_only}';")
expected_size: int = cur.fetchall()[0][0]
if expected_size != actual_size:
logger.info(
f"{name_only}: expected size={expected_size} != {actual_size}=actual_size"
)
match = False
else:
# Sizes match
match = True
else:
# Cannot access size information; assume the sizes match.
match = True
return match


# FIXME: C901 'extractFiles' is too complex (33)
def extractFiles( # noqa: C901
files: List[FilesRow],
keep_files: bool,
keep_tars: Optional[bool],
cache: str,
cur: sqlite3.Cursor,
args: argparse.Namespace,
multiprocess_worker: Optional[parallel.ExtractWorker] = None,
) -> List[FilesRow]:
"""
Expand Down Expand Up @@ -432,13 +463,42 @@ def extractFiles( # noqa: C901
if multiprocess_worker:
multiprocess_worker.set_curr_tar(files_row.tar)

if not os.path.exists(tfname):
# Will need to retrieve from HPSS
if config.hpss is not None:
hpss: str = config.hpss
if config.hpss is not None:
hpss: str = config.hpss
else:
raise TypeError("Invalid config.hpss={}".format(config.hpss))
tries: int = args.retries + 1
# Set to True to test the `--retries` option with a forced failure.
# Then run `python -m unittest tests.test_extract.TestExtract.testExtractRetries`
test_retry: bool = False
while tries > 0:
tries -= 1
do_retrieve: bool

if not os.path.exists(tfname):
do_retrieve = True
else:
raise TypeError("Invalid config.hpss={}".format(config.hpss))
hpss_get(hpss, tfname, cache)
do_retrieve = not check_sizes_match(cur, tfname)

try:
if test_retry:
test_retry = False
raise RuntimeError
if do_retrieve:
hpss_get(hpss, tfname, cache)
if not check_sizes_match(cur, tfname):
raise RuntimeError(
f"{tfname} size does not match expected size."
)
# `hpss_get` successful or not needed: no more tries needed
break
except RuntimeError as e:
if tries > 0:
logger.info(f"Retrying HPSS get: {tries} tries remaining.")
# Run the try-except block again
continue
else:
raise e

logger.info("Opening tar archive %s" % (tfname))
tar: tarfile.TarFile = tarfile.open(tfname, "r")
Expand Down
2 changes: 1 addition & 1 deletion zstash/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def run_command(command: str, error_str: str):
logger.error(error_str)
logger.debug("stdout:\n{!r}".format(stdout))
logger.debug("stderr:\n{!r}".format(stderr))
raise Exception(error_str)
raise RuntimeError(error_str)


def get_files_to_archive(cache: str, exclude: str) -> List[str]:
Expand Down

0 comments on commit 44f4317

Please sign in to comment.