Handle ENOSPC more cleanly #3513

Merged Aug 7, 2023 · 6 commits
Showing changes from 4 commits
38 changes: 28 additions & 10 deletions lib/pbench/server/api/resources/intake_base.py
@@ -356,17 +356,17 @@ def _intake(
                     ofp.write(chunk)
                     hash_md5.update(chunk)
         except OSError as exc:
-            # NOTE: Werkzeug doesn't support status 509, so the abort call
-            # in _dispatch will fail. Rather than figure out how to fix
-            # that, just report as an internal error.
             if exc.errno == errno.ENOSPC:
-                msg = f"Out of space on {tar_full_path.root}"
-            else:
-                msg = f"Unexpected error {exc.errno} encountered during file upload"
-            raise APIInternalError(msg) from exc
+                raise APIAbort(
+                    HTTPStatus.REQUEST_ENTITY_TOO_LARGE,
+                    f"Out of space on {tar_full_path.parent}",
+                )
+            raise APIInternalError(
+                f"Unexpected error encountered during file upload: {str(exc)!r}"
+            ) from exc
         except Exception as e:
             raise APIInternalError(
-                "Unexpected error encountered during file upload"
+                f"Unexpected error encountered during file upload: {str(e)!r}"
             ) from e
 
         if bytes_received != stream.length:
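The deleted NOTE describes a real Werkzeug limitation: abort() can only raise status codes for which it has a registered HTTPException class, and 509 is not one of them. A minimal sketch of that behavior (not part of this PR, assuming a standard Werkzeug installation):

    from werkzeug.exceptions import abort

    try:
        abort(509, "Out of space")
    except LookupError as exc:
        # Werkzeug has no HTTPException mapped to 509, so abort() raises
        # LookupError instead of producing an HTTP error response; this is
        # why the old code fell back to a generic internal error.
        print(exc)  # -> no exception for 509

Mapping ENOSPC to the well-supported 413/REQUEST_ENTITY_TOO_LARGE sidesteps the problem entirely.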
@@ -381,12 +381,21 @@ def _intake(
         )
 
         # From this point attempt to remove the MD5 file on error exit
-        recovery.add(md5_full_path.unlink)
+        recovery.add(lambda: md5_full_path.unlink(missing_ok=True))
         try:
             md5_full_path.write_text(f"{intake.md5} {filename}\n")
+        except OSError as exc:
+            if exc.errno == errno.ENOSPC:
+                raise APIAbort(
+                    HTTPStatus.REQUEST_ENTITY_TOO_LARGE,
+                    f"Out of space on {md5_full_path.parent}",
+                )
+            raise APIInternalError(
+                f"Unexpected error encountered during MD5 creation: {str(exc)!r}"
+            ) from exc
         except Exception as e:
             raise APIInternalError(
-                f"Failed to write .md5 file '{md5_full_path}'",
+                f"Failed to write .md5 file '{md5_full_path}': {str(e)!r}",
             ) from e
 
         # Create a cache manager object
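The change from `recovery.add(md5_full_path.unlink)` to a lambda matters because the recovery action is presumably invoked with no arguments during error cleanup: the bare bound method would run with the default `missing_ok=False` and itself raise FileNotFoundError if the MD5 file was never written. A small illustration of the difference, using a plain list in place of the intake code's recovery object (hypothetical names):

    from pathlib import Path

    missing = Path("/tmp/never-created.md5")
    cleanup_actions = []  # stands in for the intake code's "recovery" object

    cleanup_actions.append(missing.unlink)  # old form: runs unlink(missing_ok=False)
    cleanup_actions.append(lambda: missing.unlink(missing_ok=True))  # new form

    for action in cleanup_actions:
        try:
            action()
        except FileNotFoundError:
            # Only the bare bound method lands here: cleanup of a file that
            # was never created fails, masking the original error.
            print("cleanup itself raised")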
@@ -408,6 +417,15 @@ def _intake(
                 HTTPStatus.BAD_REQUEST,
                 f"Tarball {dataset.name!r} is invalid or missing required metadata.log: {exc}",
             ) from exc
+        except OSError as exc:
+            if exc.errno == errno.ENOSPC:
+                raise APIAbort(
+                    HTTPStatus.REQUEST_ENTITY_TOO_LARGE,
+                    f"Out of space on {tar_full_path.parent}",
+                )
+            raise APIInternalError(
+                f"Unexpected error encountered during archive: {str(exc)!r}"
+            ) from exc
         except Exception as exc:
             raise APIInternalError(
                 f"Unable to create dataset in file system for {tar_full_path}: {exc}"
35 changes: 21 additions & 14 deletions lib/pbench/server/cache_manager.py
@@ -378,35 +378,40 @@ def create(cls, tarball: Path, controller: "Controller") -> "Tarball":
             # standard .tar.xz
             md5_source = tarball.with_suffix(".xz.md5")
 
+            destination = controller.path / tarball.name
+            md5_destination = controller.path / md5_source.name
+
             # If either expected destination file exists, something is wrong
-            if (controller.path / tarball.name).exists():
-                raise DuplicateTarball(name)
-            if (controller.path / md5_source.name).exists():
+            if destination.exists() or md5_destination.exists():
                 raise DuplicateTarball(name)
 
-            # Copy the MD5 file first; only if that succeeds, copy the tarball
-            # itself.
+            # Move the MD5 file first; only if that succeeds, move the tarball
+            # itself. Note that we expect the source to be on the same
+            # filesystem as the ARCHIVE tree, and we want to avoid using double
+            # the space by copying large tarballs if the file can be moved.
             try:
-                md5_destination = Path(shutil.copy2(md5_source, controller.path))
+                shutil.move(md5_source, md5_destination)
             except Exception as e:
+                md5_destination.unlink(missing_ok=True)
                 controller.logger.error(
-                    "ERROR copying dataset {} ({}) MD5: {}", name, tarball, e
+                    "ERROR moving dataset {} ({}) MD5: {}", name, tarball, e
                 )
                 raise
 
             try:
-                destination = Path(shutil.copy2(tarball, controller.path))
+                shutil.move(tarball, destination)
            except Exception as e:
                try:
-                    md5_destination.unlink()
+                    md5_destination.unlink(missing_ok=True)
                except Exception as md5_e:
                    controller.logger.error(
                        "Unable to recover by removing {} MD5 after tarball copy failure: {}",
                        name,
                        md5_e,
                    )
+                destination.unlink(missing_ok=True)
                controller.logger.error(
-                    "ERROR copying dataset {} tarball {}: {}", name, tarball, e
+                    "ERROR moving dataset {} tarball {}: {}", name, tarball, e
                )
                raise
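The copy-to-move switch is about disk space as much as speed: assuming source and destination live on the same filesystem (as the new comment states for the ARCHIVE tree), `shutil.move` degenerates to an `os.rename`, a metadata-only operation that never holds two copies of a large tarball. A rough sketch of that distinction, with illustrative paths that are not from the PR:

    import os
    import shutil

    def same_filesystem(a: str, b: str) -> bool:
        # Two paths share a filesystem when their device IDs match; os.rename
        # (and therefore the fast path of shutil.move) only works within one.
        return os.stat(a).st_dev == os.stat(b).st_dev

    src = "/srv/pbench/staging/big.tar.xz"  # hypothetical staged upload
    dst_dir = "/srv/pbench/archive"         # hypothetical ARCHIVE destination

    if same_filesystem(src, dst_dir):
        os.rename(src, os.path.join(dst_dir, "big.tar.xz"))  # O(1), no extra space
    else:
        shutil.move(src, dst_dir)  # falls back to copy2() + unlink(): 2x space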
@@ -419,12 +424,14 @@ def create(cls, tarball: Path, controller: "Controller") -> "Tarball":
                 # log it but do not abort
                 controller.logger.error("Unable to set SELINUX context for {}: {}", name, e)
 
-            # If we were able to copy both files, remove the originals
+            # If we were able to copy both files, remove the originals. If we moved
+            # the files above, instead of copying them, these will no longer exist
+            # and we'll ignore that condition silently.
             try:
-                tarball.unlink()
-                md5_source.unlink()
+                tarball.unlink(missing_ok=True)
+                md5_source.unlink(missing_ok=True)
             except Exception as e:
-                controller.logger.error("Error removing incoming dataset {}: {}", name, e)
+                controller.logger.error("Error removing staged upload {}: {}", name, e)

             return cls(destination, controller)

75 changes: 71 additions & 4 deletions lib/pbench/test/unit/server/test_cache_manager.py
@@ -1,3 +1,4 @@
+import errno
 import hashlib
 import io
 from logging import Logger
@@ -244,17 +245,83 @@ def test_duplicate(
         monkeypatch.setattr(Tarball, "_get_metadata", fake_get_metadata)
         cm = CacheManager(server_config, make_logger)
 
-        # Create a tarball file in the expected destination directory
+        # Create a tarball file in the expected destination directory: the
+        # subsequent create should report a duplicate.
         controller = cm.archive_root / "ABC"
         controller.mkdir()
         (controller / source_tarball.name).write_text("Send in the clones")
 
-        # Attempting to create a dataset from the md5 file should result in
-        # a duplicate dataset error
         with pytest.raises(DuplicateTarball) as exc:
             cm.create(source_tarball)
         assert exc.value.tarball == Dataset.stem(source_tarball)
Review discussion on this test:

Reviewer (Member): The coverage report indicates that we do not test the case of a pre-existing MD5 file (without a pre-existing tarball).

Author (Member): Not a new case, nor is the condition very realistic. I had kinda hoped to get this done today and I'm trying to finish something else.

Reviewer (Member): All true, but, realistic or not, we have code that checks for it, and, therefore, it arguably should be exercised by the tests. Whether it should be addressed in this PR or not is a totally separate question. (I'm just reporting what I found...and I did approve the PR....)

+    @pytest.mark.parametrize(
+        "allow,errno",
+        (
+            (".md5", errno.ENOSPC),
+            (".md5", errno.EEXIST),
+            ("", errno.ENOSPC),
+            ("", errno.EACCES),
+        ),
+    )
+    def test_move_fails(
+        self,
+        monkeypatch,
+        selinux_disabled,
+        server_config,
+        make_logger,
+        tarball,
+        allow,
+        errno,
+    ):
+        src: list[Path] = []
+        dest: list[Path] = []
+        real_move = shutil.move
+
+        def mymove(source: Path, destination: Path, *args, **kwargs) -> Path:
+            src.append(source)
+            if destination.is_dir():
+                d = destination / source.name
+            else:
+                d = destination
+            dest.append(d)
+            if source.suffix == allow:
+                return real_move(source, destination, *args, **kwargs)
+            if errno:
+                e = OSError(errno, "something went badly")
+            else:
+                e = Exception("I broke")
+            raise e
+
+        ulink: list[Path] = []
+        ok: list[bool] = []
+
+        def unlink(self, missing_ok: bool = False):
+            ulink.append(self)
+            ok.append(missing_ok)
+
+        source_tarball, source_md5, md5 = tarball
+        monkeypatch.setattr(Tarball, "_get_metadata", fake_get_metadata)
+        cm = CacheManager(server_config, make_logger)
+        monkeypatch.setattr("pbench.server.cache_manager.shutil.move", mymove)
+        monkeypatch.setattr(Path, "unlink", unlink)
+        with pytest.raises(Exception) as e:
+            cm.create(source_tarball)
+        if errno:
+            assert isinstance(e.value, OSError), f"Wanted OSError, got {type(e.value)}"
+            assert e.value.errno == errno
+        else:
+            assert isinstance(
+                e.value, Exception
+            ), f"Wanted Exception, got {type(e.value)}"
+            assert str(e.value) == "I broke"
+        assert src == [source_md5] + ([source_tarball] if allow else [])
+        assert len(src) == len(dest) == len(ulink) == len(ok) == (2 if allow else 1)
+        for i, s in enumerate(src):
+            assert dest[i].name == s.name
+            assert ulink[i].name == s.name
+            assert dest[i] == ulink[i]
+        assert all(ok), f"Cleanup unlinks don't ignore missing: {ok}, {ulink}"
     def test_tarball_subprocess_run_with_exception(self, monkeypatch):
         """Test to check the subprocess_run functionality of the Tarball when
         an Exception occurred"""
107 changes: 95 additions & 12 deletions lib/pbench/test/unit/server/test_upload.py
@@ -260,18 +260,32 @@ def test_bad_extension_upload(
         self.verify_logs(caplog)
         assert not self.cachemanager_created
 
-    @pytest.mark.parametrize("error", (errno.ENOSPC, errno.ENFILE, None))
+    @pytest.mark.parametrize(
+        "error,http_status,message",
+        (
+            (errno.ENOSPC, HTTPStatus.REQUEST_ENTITY_TOO_LARGE, "Out of space on "),
+            (
+                errno.ENFILE,
+                HTTPStatus.INTERNAL_SERVER_ERROR,
+                "Internal Pbench Server Error:",
+            ),
+            (None, HTTPStatus.INTERNAL_SERVER_ERROR, "Internal Pbench Server Error:"),
+        ),
+    )
     def test_bad_stream_read(
-        self, client, server_config, pbench_drb_token, monkeypatch, error
+        self,
+        client,
+        server_config,
+        pbench_drb_token,
+        monkeypatch,
+        error,
+        http_status,
+        message,
     ):
         """Test handling of errors from the intake stream read
 
-        The intake code handles errno.ENOSPC specially; however although the
-        code tried to raise an APIAbort with HTTPStatus.INSUFFICIENT_SPACE
-        (509), the werkzeug abort() doesn't support this and ends up with
-        a generic internal server error. Instead, we now have three distinct
-        cases which all result (to the client) in identical internal server
-        errors. Nevertheless, we exercise all three cases here.
+        The intake code reports errno.ENOSPC with 413/REQUEST_ENTITY_TOO_LARGE,
+        but other file create errors are reported as 500/INTERNAL_SERVER_ERROR.
         """
         stream = BytesIO(b"12345")
 
@@ -294,10 +308,71 @@ def read(self):
                 data=data_fp,
                 headers=self.gen_headers(pbench_drb_token, "md5sum"),
             )
-        assert response.status_code == HTTPStatus.INTERNAL_SERVER_ERROR
-        assert response.json.get("message").startswith(
-            "Internal Pbench Server Error: log reference "
-        )
+        assert response.status_code == http_status
+        assert response.json.get("message").startswith(message)
 
+    @pytest.mark.parametrize(
+        "error,http_status,message",
+        (
+            (errno.ENOSPC, HTTPStatus.REQUEST_ENTITY_TOO_LARGE, "Out of space on "),
+            (
+                errno.ENFILE,
+                HTTPStatus.INTERNAL_SERVER_ERROR,
+                "Internal Pbench Server Error:",
+            ),
+            (None, HTTPStatus.INTERNAL_SERVER_ERROR, "Internal Pbench Server Error:"),
+        ),
+    )
+    def test_md5_failure(
+        self,
+        monkeypatch,
+        client,
+        pbench_drb_token,
+        server_config,
+        tarball,
+        error,
+        http_status,
+        message,
+    ):
+        """Test handling of errors from MD5 file creation.
+
+        The intake code reports errno.ENOSPC with 413/REQUEST_ENTITY_TOO_LARGE,
+        but other file create errors are reported as 500/INTERNAL_SERVER_ERROR.
+        """
+        path: Optional[Path] = None
+
+        def nogood_write(
+            self, data: str, encoding: str = None, errors: str = None
+        ) -> int:
+            nonlocal path
+            path = self
+            if error:
+                e = OSError(error, "something went badly")
+            else:
+                e = Exception("Nobody expects the Spanish Exception")
+            raise e
+
+        real_unlink = Path.unlink
+        unlinks = []
+
+        def record_unlink(self, **kwargs):
+            unlinks.append(self.name)
+            real_unlink(self, **kwargs)
+
+        datafile, md5_file, md5 = tarball
+        monkeypatch.setattr(Path, "write_text", nogood_write)
+        monkeypatch.setattr(Path, "unlink", record_unlink)
+        with datafile.open("rb") as data_fp:
+            response = client.put(
+                self.gen_uri(server_config, datafile.name),
+                data=data_fp,
+                headers=self.gen_headers(pbench_drb_token, md5),
+            )
+        assert path.name == md5_file.name
+        assert md5_file.name in unlinks
+        assert datafile.name in unlinks
+        assert response.status_code == http_status
+        assert response.json.get("message").startswith(message)

     def test_invalid_authorization_upload(
         self, client, caplog, server_config, pbench_drb_token_invalid
@@ -391,6 +466,14 @@ def td_exists(self, *args, **kwargs):
         (
             (Exception("Test"), HTTPStatus.INTERNAL_SERVER_ERROR),
             (DuplicateTarball("x"), HTTPStatus.BAD_REQUEST),
+            (
+                OSError(errno.ENOSPC, "The closet is too small!"),
+                HTTPStatus.REQUEST_ENTITY_TOO_LARGE,
+            ),
+            (
+                OSError(errno.EACCES, "Can't get they-ah from he-ah"),
+                HTTPStatus.INTERNAL_SERVER_ERROR,
+            ),
         ),
     )
     def test_upload_cachemanager_error(