Allow deletion of uploaded relay datasets #3512

Merged: 3 commits (Aug 4, 2023)
2 changes: 1 addition & 1 deletion dashboard/src/actions/relayActions.js
@@ -13,7 +13,7 @@ export const uploadFile = () => async (dispatch, getState) => {
   const endpoints = getState().apiEndpoint.endpoints;
   const fileURI = getState().overview.relayInput;
   const uri = uriTemplate(endpoints, "relay", { uri: fileURI });
-  const response = await API.post(uri, null, null);
+  const response = await API.post(uri, null, { params: { delete: "t" } });
   if (response.status >= 200 && response.status < 300) {
     dispatch(showToast(SUCCESS, response.data.message));
     dispatch(setRelayModalState(false));
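For context, the dashboard change simply adds a `delete` query parameter to the relay POST. A minimal sketch of the equivalent request from Python; the server host, token, and `/api/v1/relay/<uri>` path shape are assumptions based on the tests below, not taken from this diff:

```python
import requests

# Hypothetical values: substitute a real server, API token, and relay manifest URI.
SERVER = "https://pbench.example.com"
TOKEN = "hypothetical-bearer-token"
RELAY_URI = "https://relay.example.com/uri1"

# Ask the server to pull the dataset from the relay and, on success,
# delete the relay files ("t" is the value the dashboard sends for true).
response = requests.post(
    f"{SERVER}/api/v1/relay/{RELAY_URI}",
    params={"delete": "t"},
    headers={"authorization": f"bearer {TOKEN}"},
)
print(response.status_code, response.json().get("notes"))
```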
14 changes: 14 additions & 0 deletions lib/pbench/server/api/resources/intake_base.py
@@ -153,6 +153,19 @@ def _stream(self, intake: Intake, request: Request) -> Access:
         """
         raise NotImplementedError()
 
+    def _cleanup(self, args: ApiParams, intake: Intake, notes: list[str]):
+        """Clean up after a completed upload
+
+        The default behavior is to do nothing; each subclass can provide
+        custom cleanup behavior to run after a successful transfer.
+
+        Args:
+            args: API parameters
+            intake: The intake object
+            notes: A list of error strings to report problems.
+        """
+        pass
+
     def _intake(
         self, args: ApiParams, request: Request, context: ApiContext
     ) -> Response:
@@ -477,6 +490,7 @@ def _intake(
         enable_next = [OperationName.INDEX] if should_index else None
         if not should_index:
             notes.append("Indexing is disabled by 'archive only' setting.")
+        self._cleanup(args, intake, notes)
         Sync(current_app.logger, OperationName.UPLOAD).update(
             dataset=dataset, state=OperationState.OK, enabled=enable_next
         )
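The empty `_cleanup` added here is a template-method hook: the shared `_intake` flow invokes it once the transfer has succeeded, and each intake subclass may override it. A minimal standalone sketch of the pattern (toy classes with simplified names and bodies, not the real signatures):

```python
class IntakeBase:
    """Shared intake flow with a no-op cleanup hook."""

    def _cleanup(self, args, intake, notes: list[str]):
        """Default: do nothing; subclasses may override."""
        pass

    def _intake(self, args, intake) -> list[str]:
        notes: list[str] = []
        # ... receive, verify, and catalog the dataset ...
        self._cleanup(args, intake, notes)  # runs only on the success path
        return notes


class Relay(IntakeBase):
    def _cleanup(self, args, intake, notes: list[str]):
        # Relay-specific cleanup, e.g. deleting the pulled relay files.
        notes.append("relay cleanup ran")


print(Relay()._intake(None, None))  # ['relay cleanup ran']
```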
41 changes: 40 additions & 1 deletion lib/pbench/server/api/resources/relay.py
@@ -1,6 +1,6 @@
 from http import HTTPStatus
 
-from flask import Response
+from flask import current_app, Response
 from flask.wrappers import Request
 import requests
 
@@ -33,6 +33,7 @@ def __init__(self, config: PbenchServerConfig):
             uri_schema=Schema(Parameter("filename", ParamType.STRING)),
             query_schema=Schema(
                 Parameter("access", ParamType.ACCESS),
+                Parameter("delete", ParamType.BOOLEAN),
                 Parameter(
                     "metadata",
                     ParamType.LIST,
@@ -107,6 +108,10 @@ def _identify(self, args: ApiParams, request: Request) -> Intake:
                 HTTPStatus.BAD_GATEWAY, f"Relay info missing {str(e)!r}"
             ) from e
 
+        # If the API client specified metadata, add it to the manifest
+        # metadata list. When the common code processes the list into a dict,
+        # any later duplicate keys will override the earlier values.
+        metadata += args.query.get("metadata", [])
         return Intake(name, md5, access, metadata, uri)
 
     def _stream(self, intake: Intake, request: Request) -> Access:
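The override semantics described in the new comment deserve a quick illustration. Here is a toy fold of the metadata list into a dict, showing that a key supplied via the `metadata` query parameter (appended last) wins over the same key from the relay manifest; the real parsing in the common code is richer, and this shows only the last-wins rule:

```python
metadata = ["global.pbench.test:data"]       # from the relay manifest
metadata += ["global.pbench.test:override"]  # from the ?metadata= query

folded = {}
for item in metadata:
    key, _, value = item.partition(":")
    folded[key] = value  # a later duplicate key overwrites the earlier value

print(folded)  # {'global.pbench.test': 'override'}
```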
@@ -149,6 +154,40 @@ def _stream(self, intake: Intake, request: Request) -> Access:
                 HTTPStatus.BAD_REQUEST, f"Unable to retrieve relay tarball: {str(e)!r}"
             ) from e
 
+    def _cleanup(self, args: ApiParams, intake: Intake, notes: list[str]):
+        """Clean up after a completed upload
+
+        When pulling datasets from a relay, the client can ask that the relay
+        files be deleted on successful completion to avoid accumulating storage
+        on the relay server.
+
+        We capture all HTTP errors here, since there's not much we can do to
+        clean up, and the dataset has already been successfully transferred.
+        We just note the problems so they can be investigated.
+
+        Args:
+            args: API parameters
+            intake: The intake object containing the tarball URI
+            notes: A list of error strings to report problems.
+        """
+        errors = False
+        if args.query.get("delete"):
+            for uri in (args.uri["uri"], intake.uri):
+                reason = None
+                try:
+                    response = requests.delete(uri)
+                    if not response.ok:
+                        reason = response.reason
+                except ConnectionError as e:
+                    reason = str(e)
+                if reason:
+                    errors = True
+                    msg = f"Unable to remove relay file {uri}: {reason!r}"
+                    current_app.logger.warning("INTAKE relay {}: {}", intake.name, msg)
+                    notes.append(msg)
+            if not errors:
+                notes.append("Relay files were successfully removed.")
+
     def _post(self, args: ApiParams, request: Request, context: ApiContext) -> Response:
         """Launch the Relay operation from an HTTP POST"""
         return self._intake(args, request, context)
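Taken together, a `?delete=true` relay pull produces four outbound requests and, on success, an extra note in the response. A sketch of the exchange as the unit tests below model it (URIs and note text come from those tests; the POST path shape is an assumption):

```python
# Outbound requests for POST /api/v1/relay/<uri1>?delete=true:
#   GET    https://relay.example.com/uri1   -> JSON manifest (name, md5, uri, ...)
#   GET    https://relay.example.com/uri2   -> tarball byte stream
#   DELETE https://relay.example.com/uri1   -> remove the manifest
#   DELETE https://relay.example.com/uri2   -> remove the tarball

# Success response body (placeholders in angle brackets):
expected = {
    "message": "File successfully uploaded",
    "name": "<dataset name>",
    "resource_id": "<tarball MD5>",
    "notes": [
        "Identified benchmark workload 'unknown'.",
        "Expected expiration date is 2025-06-30.",
        "Relay files were successfully removed.",
    ],
}
```

Note that a delete failure does not fail the upload: the dataset has already been transferred, so `_cleanup` only records the problem in the response notes and the server log.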
177 changes: 164 additions & 13 deletions lib/pbench/test/unit/server/test_relay.py
@@ -1,6 +1,7 @@
 from http import HTTPStatus
 from logging import Logger
 from pathlib import Path
+from typing import Union
 
 from flask import Request
 import pytest
@@ -87,13 +88,16 @@ def test_missing_authorization_header(self, client, server_config):
         assert response.status_code == HTTPStatus.UNAUTHORIZED
         assert not self.cachemanager_created
 
-    @responses.activate
     @pytest.mark.freeze_time("2023-07-01")
-    def test_relay(self, client, server_config, pbench_drb_token, tarball):
+    @pytest.mark.parametrize("delete", ("false", "true", None))
+    @responses.activate
+    def test_relay(self, client, server_config, pbench_drb_token, tarball, delete):
         """Verify the success path
 
         Ensure successful completion when the primary relay URI returns a valid
         relay manifest referencing a secondary relay URI containing a tarball.
+
+        Also check that the DELETE requests happen when `?delete` is specified.
         """
         file, md5file, md5 = tarball
         name = Dataset.stem(file)
@@ -117,22 +121,34 @@ def test_relay(self, client, server_config, pbench_drb_token, tarball):
             headers={"content-length": f"{file.stat().st_size}"},
             content_type="application/octet-stream",
         )
+        if delete == "true":
+            responses.add(
+                responses.DELETE, "https://relay.example.com/uri1", status=HTTPStatus.OK
+            )
+            responses.add(
+                responses.DELETE, "https://relay.example.com/uri2", status=HTTPStatus.OK
+            )
         response = client.post(
             self.gen_uri(server_config, "https://relay.example.com/uri1"),
+            query_string={"delete": delete} if delete else None,
             headers=self.gen_headers(pbench_drb_token),
         )
         assert (
             response.status_code == HTTPStatus.CREATED
         ), f"Unexpected result, {response.text}"
+        expected_notes = [
+            "Identified benchmark workload 'unknown'.",
+            "Expected expiration date is 2025-06-30.",
+        ]
+        if delete == "true":
+            expected_notes.append("Relay files were successfully removed.")
         assert response.json == {
             "message": "File successfully uploaded",
             "name": name,
             "resource_id": md5,
-            "notes": [
-                "Identified benchmark workload 'unknown'.",
-                "Expected expiration date is 2025-06-30.",
-            ],
+            "notes": expected_notes,
         }
+        assert len(responses.calls) == (4 if delete == "true" else 2)
         assert (
             response.headers["location"]
             == f"https://localhost/api/v1/datasets/{md5}/inventory/"
@@ -162,23 +178,21 @@ def test_relay(self, client, server_config, pbench_drb_token, tarball):
         assert audit[1].name == "relay"
         assert audit[1].object_type == AuditType.DATASET
         assert audit[1].object_id == md5
-        assert audit[1].object_name == Dataset.stem(file)
+        assert audit[1].object_name == name
         assert audit[1].user_id == DRB_USER_ID
         assert audit[1].user_name == "drb"
         assert audit[1].reason is None
         assert audit[1].attributes == {
             "access": "private",
             "metadata": {"global.pbench.test": "data"},
-            "notes": [
-                "Identified benchmark workload 'unknown'.",
-                "Expected expiration date is 2025-06-30.",
-            ],
+            "notes": expected_notes,
         }
 
     @responses.activate
     def test_relay_tar_fail(self, client, server_config, pbench_drb_token, tarball):
         """Verify failure when secondary relay URI is not found"""
         file, md5file, md5 = tarball
+        name = Dataset.stem(file)
         responses.add(
             responses.GET,
             "https://relay.example.com/uri1",
@@ -211,7 +225,7 @@ def test_relay_tar_fail(self, client, server_config, pbench_drb_token, tarball):
         assert audit[0].name == "relay"
         assert audit[0].object_type == AuditType.DATASET
         assert audit[0].object_id == md5
-        assert audit[0].object_name == Dataset.stem(file)
+        assert audit[0].object_name == name
         assert audit[0].user_id == DRB_USER_ID
         assert audit[0].user_name == "drb"
         assert audit[0].reason is None
@@ -226,7 +240,7 @@ def test_relay_tar_fail(self, client, server_config, pbench_drb_token, tarball):
         assert audit[1].name == "relay"
         assert audit[1].object_type == AuditType.DATASET
         assert audit[1].object_id == md5
-        assert audit[1].object_name == Dataset.stem(file)
+        assert audit[1].object_name == name
         assert audit[1].user_id == DRB_USER_ID
         assert audit[1].user_name == "drb"
         assert audit[1].reason == AuditReason.CONSISTENCY
@@ -392,3 +406,140 @@ def test_relay_tarball_connection(
         assert (
             response.json["message"] == "Unable to connect to results URI: 'leaky wire'"
         )
+
+    @pytest.mark.freeze_time("2023-07-01")
+    @pytest.mark.parametrize(
+        "status1,status2",
+        (
+            ((HTTPStatus.OK, None), (HTTPStatus.BAD_REQUEST, "Bad Request")),
+            ((HTTPStatus.BAD_REQUEST, "Bad Request"), (HTTPStatus.OK, None)),
+            (
+                (HTTPStatus.BAD_REQUEST, "Bad Request"),
+                (HTTPStatus.BAD_REQUEST, "Bad Request"),
+            ),
+            ((ConnectionError("testing"), "testing"), (HTTPStatus.OK, None)),
+            ((HTTPStatus.OK, None), (ConnectionError("testing"), "testing")),
+            (
+                (ConnectionError("testing1"), "testing1"),
+                (ConnectionError("testing2"), "testing2"),
+            ),
+        ),
+    )
+    @responses.activate
+    def test_delete_failures(
+        self,
+        client,
+        server_config,
+        pbench_drb_token,
+        tarball,
+        status1: tuple[Union[HTTPStatus, Exception], str],
+        status2: tuple[Union[HTTPStatus, Exception], str],
+    ):
+        """Verify reporting of delete failures
+
+        Ensure successful completion with appropriate notes when deletion of
+        the relay files fails.
+        """
+        file, md5file, md5 = tarball
+        name = Dataset.stem(file)
+        responses.add(
+            responses.GET,
+            "https://relay.example.com/uri1",
+            status=HTTPStatus.OK,
+            json={
+                "uri": "https://relay.example.com/uri2",
+                "name": file.name,
+                "md5": md5,
+                "access": "private",
+                "metadata": ["global.pbench.test:data"],
+            },
+        )
+        responses.add(
+            responses.GET,
+            "https://relay.example.com/uri2",
+            status=HTTPStatus.OK,
+            body=file.open("rb"),
+            headers={"content-length": f"{file.stat().st_size}"},
+            content_type="application/octet-stream",
+        )
+        responses.add(
+            responses.DELETE,
+            "https://relay.example.com/uri1",
+            status=status1[0]
+            if isinstance(status1[0], int)
+            else HTTPStatus.ALREADY_REPORTED,
+            body=status1[0] if isinstance(status1[0], Exception) else None,
+        )
+        responses.add(
+            responses.DELETE,
+            "https://relay.example.com/uri2",
+            status=status2[0]
+            if isinstance(status2[0], int)
+            else HTTPStatus.ALREADY_REPORTED,
+            body=status2[0] if isinstance(status2[0], Exception) else None,
+        )
+        response = client.post(
+            self.gen_uri(server_config, "https://relay.example.com/uri1"),
+            query_string={"delete": "true"},
+            headers=self.gen_headers(pbench_drb_token),
+        )
+        assert (
+            response.status_code == HTTPStatus.CREATED
+        ), f"Unexpected result, {response.text}"
+        expected_notes = [
+            "Identified benchmark workload 'unknown'.",
+            "Expected expiration date is 2025-06-30.",
+        ]
+        if status1[0] != HTTPStatus.OK:
+            expected_notes.append(
+                f"Unable to remove relay file https://relay.example.com/uri1: '{status1[1]}'"
+            )
+        if status2[0] != HTTPStatus.OK:
+            expected_notes.append(
+                f"Unable to remove relay file https://relay.example.com/uri2: '{status2[1]}'"
+            )
+        assert response.json == {
+            "message": "File successfully uploaded",
+            "name": name,
+            "resource_id": md5,
+            "notes": expected_notes,
+        }
+        assert len(responses.calls) == 4
+        assert (
+            response.headers["location"]
+            == f"https://localhost/api/v1/datasets/{md5}/inventory/"
+        )
+
+        audit = Audit.query()
+        assert len(audit) == 2
+        assert audit[0].id == 1
+        assert audit[0].root_id is None
+        assert audit[0].operation == OperationCode.CREATE
+        assert audit[0].status == AuditStatus.BEGIN
+        assert audit[0].name == "relay"
+        assert audit[0].object_type == AuditType.DATASET
+        assert audit[0].object_id == md5
+        assert audit[0].object_name == name
+        assert audit[0].user_id == DRB_USER_ID
+        assert audit[0].user_name == "drb"
+        assert audit[0].reason is None
+        assert audit[0].attributes == {
+            "access": "private",
+            "metadata": {"global.pbench.test": "data"},
+        }
+        assert audit[1].id == 2
+        assert audit[1].root_id == 1
+        assert audit[1].operation == OperationCode.CREATE
+        assert audit[1].status == AuditStatus.SUCCESS
+        assert audit[1].name == "relay"
+        assert audit[1].object_type == AuditType.DATASET
+        assert audit[1].object_id == md5
+        assert audit[1].object_name == name
+        assert audit[1].user_id == DRB_USER_ID
+        assert audit[1].user_name == "drb"
+        assert audit[1].reason is None
+        assert audit[1].attributes == {
+            "access": "private",
+            "metadata": {"global.pbench.test": "data"},
+            "notes": expected_notes,
+        }
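A side note on the test technique: the `responses` library raises a registered exception instead of returning a reply when `body` is an `Exception` instance, which is how the `ConnectionError` legs of the parametrization above are simulated. A minimal standalone sketch:

```python
import requests
import responses


@responses.activate
def demo():
    # Registering an Exception instance as the body makes the mocked
    # call raise it rather than return a response.
    responses.add(
        responses.DELETE,
        "https://relay.example.com/uri1",
        body=ConnectionError("testing"),
    )
    try:
        requests.delete("https://relay.example.com/uri1")
    except ConnectionError as e:
        print(f"raised as expected: {e}")


demo()
```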