Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

hsi get retry #218

Merged
merged 1 commit into from
Oct 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/source/usage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,10 @@ where
* ``--keep`` to keep a copy of the tar files on the local file system after
they have been extracted from the archive. Normally, they are deleted after
successful transfer.
* ``--retries`` to set the number of times to retry ``hsi get`` if it is unsuccessful.
The default is 1 retry (2 tries total). Note: an automatic retry triggered by
an incomplete tar file is only possible if the archive you're extracting from
was created using ``zstash >= v1.1.0``.
* ``--tars`` to specify specific tars to extract. See "Check" above for example usage.
* ``-v`` increases output verbosity.
* ``[files]`` is a list of files to be extracted (standard wildcards supported).
Expand Down
2 changes: 2 additions & 0 deletions tests/base.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
"""
Run the test suite with `python -m unittest tests/test_*.py`

To run an individual test, run something like `python -m unittest tests.test_extract.TestExtract.testExtractRetries`

If running on Cori, it is preferable to run from $CSCRATCH rather than
/global/homes. Running from the latter may result in a
'Resource temporarily unavailable' error.
Expand Down
2 changes: 1 addition & 1 deletion tests/test_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ def testCheckKeepTars(self):
)
self.assertEqualOrStop(
output + err,
'INFO: Opening tar archive {}/000000.tar\nINFO: Checking file1.txt\nINFO: Checking file2.txt\nINFO: No failures detected when checking the files. If you have a log file, run "grep -i Exception <log-file>" to double check.\n'.format(
'INFO: zstash/000000.tar exists. Checking expected size matches actual size.\nINFO: Opening tar archive {}/000000.tar\nINFO: Checking file1.txt\nINFO: Checking file2.txt\nINFO: No failures detected when checking the files. If you have a log file, run "grep -i Exception <log-file>" to double check.\n'.format(
self.cache
),
)
Expand Down
58 changes: 51 additions & 7 deletions tests/test_extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@ class TestExtract(TestZstash):

# `zstash extract` is tested in TestExtract and TestExtractParallel.
# x = on, no mark = off, b = both on and off tested
# option | ExtractVerbose | Extract | ExtractCache | ExtractTars | ExtractFile | ExtractParallel | ExtractParallelTars |
# --hpss |x|x|x|x|x|x|x|
# --workers | | | | | |x|x|
# --cache | | |x| | | | |
# --keep | |x| | | | | |
# --tars | | | |x| | |x|
# -v |x| | | | |b| |
# option | ExtractVerbose | ExtractRetries | ExtractKeep | ExtractCache | ExtractTars | ExtractFile | ExtractParallel | ExtractParallelTars |
# --hpss |x|x|x|x|x|x|x|x|
# --workers | | | | | | |x|x|
# --cache | | | |x| | | | |
# --keep | | |x| | | | | |
# --retries | |x| | | | | | |
# --tars | | | | |x| | |x|
# -v |x| | | | | |b| |

def helperExtractVerbose(self, test_name, hpss_path, zstash_path=ZSTASH_PATH):
"""
Expand Down Expand Up @@ -121,6 +122,42 @@ def helperExtractVerbose(self, test_name, hpss_path, zstash_path=ZSTASH_PATH):
expected_absent = ["ERROR", "Not extracting"]
self.check_strings(cmd, output + err, expected_present, expected_absent)

def helperExtractRetries(self, test_name, hpss_path, zstash_path=ZSTASH_PATH):
    """
    Exercise `zstash extract` with the `--retries` option.

    An archive is created first, the populated test directory is renamed to
    the backup directory, and `zstash extract --retries=3` is then run from
    a freshly created test directory. The combined stdout/stderr of the
    command is checked for the expected (and unexpected) messages.
    """
    self.hpss_path = hpss_path
    use_hpss = self.setupDirs(test_name)
    self.create(use_hpss, zstash_path)
    self.assertWorkspace()

    # Move the populated directory aside and work from a new, empty one.
    os.rename(self.test_dir, self.backup_dir)
    os.mkdir(self.test_dir)
    os.chdir(self.test_dir)
    if not use_hpss:
        # Without HPSS, copy the cache from the backup directory to
        # `self.copy_dir` before extracting.
        backed_up_cache = "{}/{}/{}".format(TOP_LEVEL, self.backup_dir, self.cache)
        shutil.copytree(backed_up_cache, self.copy_dir)

    cmd = "{}zstash extract --hpss={} --retries=3".format(
        zstash_path, self.hpss_path
    )
    output, err = run_cmd(cmd)
    os.chdir(TOP_LEVEL)

    # Every archived entry should be reported as extracted.
    extracted_names = (
        "file0.txt",
        "file0_hard.txt",
        "file0_soft.txt",
        "file0_soft_bad.txt",
        "file_empty.txt",
        "dir/file1.txt",
        "empty_dir",
    )
    expected_present = ["Opening tar archive zstash/000000.tar"]
    expected_present.extend("Extracting " + name for name in extracted_names)
    expected_present.append("No failures detected when extracting the files")
    if use_hpss:
        expected_present.append("Transferring file from HPSS")

    expected_absent = ["ERROR", "Not extracting"]
    self.check_strings(cmd, output + err, expected_present, expected_absent)

def helperExtractKeep(self, test_name, hpss_path, zstash_path=ZSTASH_PATH):
"""
Test `zstash extract` with `--keep`.
Expand Down Expand Up @@ -251,6 +288,13 @@ def testExtractVerboseHPSS(self):
self.conditional_hpss_skip()
self.helperExtractVerbose("testExtractVerboseHPSS", HPSS_ARCHIVE)

def testExtractRetries(self):
    """Run the `--retries` extraction helper with `hpss_path` set to "none"."""
    self.helperExtractRetries(test_name="testExtractRetries", hpss_path="none")

def testExtractRetriesHPSS(self):
    """Run the `--retries` extraction helper against `HPSS_ARCHIVE`."""
    # Give the HPSS skip check a chance to skip this test first.
    self.conditional_hpss_skip()
    self.helperExtractRetries(
        test_name="testExtractRetriesHPSS", hpss_path=HPSS_ARCHIVE
    )

def testExtractKeep(self):
self.helperExtractKeep("testExtractKeep", "none")

Expand Down
15 changes: 8 additions & 7 deletions tests/test_extract_parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@ class TestExtractParallel(TestZstash):

# `zstash extract` is tested in TestExtract and TestExtractParallel.
# x = on, no mark = off, b = both on and off tested
# option | ExtractVerbose | Extract | ExtractCache | ExtractTars | ExtractFile | ExtractParallel | ExtractParallelTars |
# --hpss |x|x|x|x|x|x|x|
# --workers | | | | | |x|x|
# --cache | | |x| | | | |
# --keep | |x| | | | | |
# --tars | | | |x| | |x|
# -v |x| | | | |b| |
# option | ExtractVerbose | ExtractRetries | ExtractKeep | ExtractCache | ExtractTars | ExtractFile | ExtractParallel | ExtractParallelTars |
# --hpss |x|x|x|x|x|x|x|x|
# --workers | | | | | | |x|x|
# --cache | | | |x| | | | |
# --keep | | |x| | | | | |
# --retries | |x| | | | | | |
# --tars | | | | |x| | |x|
# -v |x| | | | | |b| |

def helperExtractParallel(self, test_name, hpss_path, zstash_path=ZSTASH_PATH):
"""
Expand Down
80 changes: 70 additions & 10 deletions zstash/extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
get_db_filename,
logger,
)
from .utils import update_config
from .utils import tars_table_exists, update_config


def extract(keep_files: bool = True):
Expand Down Expand Up @@ -94,6 +94,9 @@ def setup_extract() -> Tuple[argparse.Namespace, str]:
type=str,
help='path to the zstash archive on the local file system. The default name is "zstash".',
)
optional.add_argument(
"--retries", type=int, default=1, help="number of times to retry an hsi command"
)
optional.add_argument("--tars", type=str, help="specify which tars to process")
optional.add_argument(
"-v", "--verbose", action="store_true", help="increase output verbosity"
Expand Down Expand Up @@ -270,9 +273,11 @@ def extract_database(
failures: List[FilesRow]
if args.workers > 1:
logger.debug("Running zstash {} with multiprocessing".format(cmd))
failures = multiprocess_extract(args.workers, matches, keep_files, keep, cache)
failures = multiprocess_extract(
args.workers, matches, keep_files, keep, cache, cur, args
)
else:
failures = extractFiles(matches, keep_files, keep, cache)
failures = extractFiles(matches, keep_files, keep, cache, cur, args)

# Close database
logger.debug("Closing index database")
Expand All @@ -287,6 +292,8 @@ def multiprocess_extract(
keep_files: bool,
keep_tars: Optional[bool],
cache: str,
cur: sqlite3.Cursor,
args: argparse.Namespace,
) -> List[FilesRow]:
"""
Extract the files from the matches in parallel.
Expand Down Expand Up @@ -359,7 +366,7 @@ def multiprocess_extract(
)
process: multiprocessing.Process = multiprocessing.Process(
target=extractFiles,
args=(matches, keep_files, keep_tars, cache, worker),
args=(matches, keep_files, keep_tars, cache, cur, args, worker),
daemon=True,
)
process.start()
Expand All @@ -379,12 +386,36 @@ def multiprocess_extract(
return failures


def check_sizes_match(cur: Optional[sqlite3.Cursor], tfname: str) -> bool:
    """
    Check whether a local tar file has the size recorded in the `tars` table.

    Parameters
    ----------
    cur : cursor on the index database. May be falsy, in which case no size
        check is possible.
    tfname : path of the tar file on the local file system. The file must
        exist whenever a size check is actually performed, because its size
        is read with `os.path.getsize`.

    Returns
    -------
    `False` only when an expected size is recorded for the tar and it differs
    from the size of the file on disk. `True` when the sizes agree, or when
    no size information can be obtained (no cursor, no `tars` table, or no
    row for this tar), in which case the sizes are assumed to match.
    """
    if not cur or not tars_table_exists(cur):
        # Cannot access size information; assume the sizes match.
        return True

    logger.info(f"{tfname} exists. Checking expected size matches actual size.")
    actual_size = os.path.getsize(tfname)
    # The `tars` table is queried by bare file name, not by full path.
    name_only = os.path.basename(tfname)

    # Bind the name as a parameter instead of formatting it into the SQL
    # text, so names containing quotes cannot break or alter the query.
    cur.execute("select size from tars where name = ?;", (name_only,))
    row = cur.fetchone()
    if row is None:
        # The table exists but has no entry for this tar. Treat this like
        # any other case of missing size information rather than failing
        # with an IndexError.
        logger.info(f"{name_only}: no expected size recorded; assuming sizes match.")
        return True

    expected_size: int = row[0]
    if expected_size != actual_size:
        logger.info(
            f"{name_only}: expected size={expected_size} != {actual_size}=actual_size"
        )
        return False

    # Sizes match
    return True


# FIXME: C901 'extractFiles' is too complex (33)
def extractFiles( # noqa: C901
files: List[FilesRow],
keep_files: bool,
keep_tars: Optional[bool],
cache: str,
cur: sqlite3.Cursor,
args: argparse.Namespace,
multiprocess_worker: Optional[parallel.ExtractWorker] = None,
) -> List[FilesRow]:
"""
Expand Down Expand Up @@ -432,13 +463,42 @@ def extractFiles( # noqa: C901
if multiprocess_worker:
multiprocess_worker.set_curr_tar(files_row.tar)

if not os.path.exists(tfname):
# Will need to retrieve from HPSS
if config.hpss is not None:
hpss: str = config.hpss
if config.hpss is not None:
hpss: str = config.hpss
else:
raise TypeError("Invalid config.hpss={}".format(config.hpss))
tries: int = args.retries + 1
# Set to True to test the `--retries` option with a forced failure.
forsyth2 marked this conversation as resolved.
Show resolved Hide resolved
# Then run `python -m unittest tests.test_extract.TestExtract.testExtractRetries`
test_retry: bool = False
while tries > 0:
tries -= 1
do_retrieve: bool

if not os.path.exists(tfname):
do_retrieve = True
else:
raise TypeError("Invalid config.hpss={}".format(config.hpss))
hpss_get(hpss, tfname, cache)
do_retrieve = not check_sizes_match(cur, tfname)

try:
if test_retry:
test_retry = False
raise RuntimeError
if do_retrieve:
hpss_get(hpss, tfname, cache)
if not check_sizes_match(cur, tfname):
raise RuntimeError(
f"{tfname} size does not match expected size."
)
# `hpss_get` successful or not needed: no more tries needed
break
except RuntimeError as e:
if tries > 0:
logger.info(f"Retrying HPSS get: {tries} tries remaining.")
# Run the try-except block again
continue
else:
raise e

logger.info("Opening tar archive %s" % (tfname))
tar: tarfile.TarFile = tarfile.open(tfname, "r")
Expand Down
2 changes: 1 addition & 1 deletion zstash/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def run_command(command: str, error_str: str):
logger.error(error_str)
logger.debug("stdout:\n{!r}".format(stdout))
logger.debug("stderr:\n{!r}".format(stderr))
raise Exception(error_str)
raise RuntimeError(error_str)


def get_files_to_archive(cache: str, exclude: str) -> List[str]:
Expand Down