Skip to content

Commit

Permalink
Add basic relay support to Pbench Agent
Browse files Browse the repository at this point in the history
PBENCH-1142

This changes the Pbench Agent `results` mechanism to support a new relay mode,
refactoring the `CopyResultTb` class into a `CopyResult` hierarchy with
`CopyResultToServer` and `CopyResultToRelay` subclasses, each overriding `push`
to do the work. The `--token` option is now required only when `--relay` is not
specified.

In addition to `--relay <relay>` to push a manifest and tarball to a Relay, I
added a `--server <server>` option to override the default config file value,
which should allow us to deploy a containerized Pbench Agent without needing to
map in a customized config file just to set the Pbench Server URI.

The agent "man pages" have been updated with the new options, along with some
general cleanup left over from distributed-system-analysis#3442.

_NOTE_: with this change we have a full end-to-end relay mechanism, but it's
simplistic. You need to start a relay server manually from the file relay repo
at `distributed-system-analysis/file-relay`, and supply that URI to the
`pbench-results-move --relay <relay>` command. In the future we'd like to
package the relay and allow management and hosting through Pbench Agent
commands within a standard container.
  • Loading branch information
dbutenhof committed Jun 12, 2023
1 parent d302664 commit 39eb35e
Show file tree
Hide file tree
Showing 8 changed files with 722 additions and 329 deletions.
354 changes: 227 additions & 127 deletions docs/Agent/user-guide/man_page.md

Large diffs are not rendered by default.

221 changes: 138 additions & 83 deletions lib/pbench/agent/results.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
import collections
import datetime
from functools import partial
import hashlib
from io import BytesIO, IOBase
import json
from logging import Logger
import os
from pathlib import Path
import shutil
import subprocess
from typing import List, Optional
import urllib.parse
from typing import Any, List, Optional

import requests

from pbench.agent import PbenchAgentConfig
from pbench.cli import CliContext
from pbench.common import MetadataLog
from pbench.common.exceptions import BadMDLogFormat
from pbench.common.utils import md5sum, validate_hostname
Expand Down Expand Up @@ -278,7 +282,7 @@ def make_result_tb(self, single_threaded: bool = False) -> TarballRecord:
return TarballRecord(name=tarball, length=tar_len, md5=tar_md5)


class CopyResultTb:
class CopyResult:
"""Interfaces for copying result tar balls remotely using the server's HTTP
PUT method for uploads.
"""
Expand All @@ -289,93 +293,144 @@ class FileUploadError(Exception):
pass

def __init__(
self,
tarball: str,
tarball_len: int,
tarball_md5: str,
config: PbenchAgentConfig,
logger: Logger,
self, logger: Logger, access: Optional[str], metadata: Optional[List[str]]
):
"""Constructor for object representing tar ball to be copied remotely.
Raises
FileNotFoundError if the given tar ball does not exist
"""
self.tarball = Path(tarball)
if not self.tarball.exists():
raise FileNotFoundError(f"Tar ball '{self.tarball}' does not exist")
self.tarball_len = tarball_len
self.tarball_md5 = tarball_md5
server_rest_url = config.get("results", "server_rest_url")
tbname = urllib.parse.quote(self.tarball.name)
self.upload_url = f"{server_rest_url}/upload/{tbname}"
"""Constructor for object representing a tar ball destination"""
self.logger = logger
self.uri = None
self.headers = {}
self.params: dict[str, Any] = {"access": access} if access else {}
if metadata:
self.params["metadata"] = metadata

def push(self, tarball: Path, tarball_md5: str) -> requests.Response:
raise NotImplementedError()

def copy_result_tb(
self, token: str, access: Optional[str] = None, metadata: Optional[List] = None
@staticmethod
def create(
config: PbenchAgentConfig,
logger: Logger,
relay: Optional[str],
server: Optional[str],
token: Optional[str],
access: Optional[str],
metadata: Optional[list[str]],
) -> "CopyResult":
if relay:
return CopyResultToRelay(logger, relay, access, metadata)
else:
return CopyResultToServer(config, logger, server, token, access, metadata)

@classmethod
def cli_create(
cls, context: CliContext, config: PbenchAgentConfig, logger: Logger
) -> "CopyResult":
return cls.create(
config,
logger,
context.relay,
context.server,
context.token,
context.access,
context.metadata,
)

def _put(
self,
stream: IOBase,
uri: str,
headers: dict[str, Any] = {},
query: dict[str, Any] = {},
) -> requests.Response:
"""Copies the tar ball from the agent to the configured server using upload
API.
try:
return requests.put(uri, data=stream, headers=headers, params=query)
except requests.exceptions.ConnectionError as exc:
raise RuntimeError(f"Cannot connect to '{uri}': {exc}")
except Exception as exc:
raise self.FileUploadError(
f"Pbench Server file upload to {uri!r} failed: {str(exc)!r}"
)

Args:
token: a token which establishes that the caller is
authorized to make the PUT request on behalf of a
specific user.
access: keyword that identifies whether a dataset needs
to be published public or private. Optional: if omitted
the result will be the server default.
metadata: list of metadata keys to be sent on PUT. (Optional)
Format: key:value
Returns:
response from the PUT request

Raises:
RuntimeError if a connection to the server fails
FileUploadError if the tar ball failed to upload properly
class CopyResultToServer(CopyResult):
def __init__(
self,
config: PbenchAgentConfig,
logger: Logger,
server: Optional[str],
token: Optional[str],
access: Optional[str],
metadata: Optional[list[str]],
):
super().__init__(logger, access, metadata)
if server:
path = config.get("results", "rest_endpoint")
uri = f"{server}/{path}"
else:
uri = config.get("results", "server_rest_url")
self.uri = f"{uri}/upload/{{name}}"
self.headers.update(
{
"content-type": "application/octet-stream",
"Authorization": f"Bearer {token}",
}
)

"""
params = {"access": access} if access else {}
if metadata:
params["metadata"] = metadata
def push(self, tarball: Path, tarball_md5: str) -> requests.Response:
if not tarball.exists():
raise FileNotFoundError(f"Tar ball '{tarball}' does not exist")
self.headers["Content-MD5"] = tarball_md5
tar_uri = self.uri.format(name=tarball.name)
with tarball.open("rb") as f:
r = self._put(f, tar_uri, self.headers, self.params)
return r

headers = {
"Content-MD5": self.tarball_md5,
"Authorization": f"Bearer {token}",
}
with self.tarball.open("rb") as f:
try:
request = requests.Request(
"PUT", self.upload_url, data=f, headers=headers, params=params
).prepare()

# Per RFC 2616, a request must not contain both
# Content-Length and Transfer-Encoding headers; however,
# the server would like to receive the Content-Length
# header, but the requests package may opt to generate
# the Transfer-Encoding header instead...so, check that
# we got what we want before we send the request. Also,
# confirm that the contents of the Content-Length header
# is what we expect.
assert (
"Transfer-Encoding" not in request.headers
), "Upload request unexpectedly contains a `Transfer-Encoding` header"
assert (
"Content-Length" in request.headers
), "Upload request unexpectedly missing a `Content-Length` header"
assert request.headers["Content-Length"] == str(self.tarball_len), (
"Upload request `Content-Length` header contains {} -- "
"expected {}".format(
request.headers["Content-Length"], self.tarball_len
)
)

return requests.Session().send(request)
except requests.exceptions.ConnectionError as exc:
raise RuntimeError(f"Cannot connect to '{self.upload_url}': {exc}")
except Exception as exc:
raise self.FileUploadError(
"There was something wrong with the file upload request: "
f"file: '{self.tarball}', URL: '{self.upload_url}';"
f" error: '{exc}'"
class CopyResultToRelay(CopyResult):
    """Push a result tar ball, and a manifest describing it, to a relay.

    Each object is uploaded with an HTTP PUT to ``<relay>/<sha256>``, where
    ``<sha256>`` is the hex SHA-256 digest of the bytes being uploaded.
    """

    def __init__(
        self,
        logger: Logger,
        relay: Optional[str],
        access: Optional[str],
        metadata: Optional[list[str]],
    ):
        """Record the relay address and the base manifest parameters.

        Args:
            logger: logger used for debug and error reporting
            relay: base URI of the relay
            access: optional access keyword to record in the manifest
            metadata: optional list of metadata strings to record in the
                manifest
        """
        super().__init__(logger, access, metadata)
        # "{sha256}" is left as a placeholder to be filled in by push().
        self.uri = f"{relay}/{{sha256}}"

    def push(self, tarball: Path, tarball_md5: str) -> requests.Response:
        """Upload a tar ball and then its manifest to the relay.

        The manifest is a JSON document holding the base parameters of this
        object plus the tar ball's "name", "uri" and "md5". It is built from
        a per-call copy so that pushing one tar ball leaves no state behind
        on this (reusable) object.

        Args:
            tarball: path of the tar ball to upload
            tarball_md5: MD5 value of the tar ball, recorded in the manifest

        Returns:
            The response to the tar ball PUT if that upload was not
            successful, otherwise the response to the manifest PUT.

        Raises:
            FileNotFoundError if the tar ball does not exist; any exception
            raised while hashing or uploading is logged and re-raised.
        """
        if not tarball.exists():
            raise FileNotFoundError(f"Tar ball '{tarball!s}' does not exist")
        try:
            with tarball.open("rb") as f:
                # Hash in 1 MiB reads so the whole tar ball is never held in
                # memory at once.
                digest = hashlib.sha256(usedforsecurity=False)
                for buf in iter(partial(f.read, 2**20), b""):
                    digest.update(buf)
                tar_uri = self.uri.format(sha256=digest.hexdigest())
                self.logger.debug("Relay tarball %s", tar_uri)

                f.seek(0)  # rewind since re-opening doesn't work
                r = self._put(
                    f, tar_uri, headers={"content-type": "application/octet-stream"}
                )
            if not r.ok:
                # No point describing a tar ball which was not stored.
                return r

            # Build the manifest from a copy: updating self.params in place
            # would leave this tar ball's name/uri/md5 on the instance.
            manifest_fields = {
                **self.params,
                "name": tarball.name,
                "uri": tar_uri,
                "md5": tarball_md5,
            }
            manifest = json.dumps(manifest_fields, sort_keys=True).encode("utf-8")
            manifest_uri = self.uri.format(
                sha256=hashlib.sha256(manifest, usedforsecurity=False).hexdigest()
            )
            self.logger.debug(
                "Uploading manifest %s as %s to relay",
                manifest.decode("utf-8"),
                manifest_uri,
            )
            return self._put(
                BytesIO(manifest),
                manifest_uri,
                headers={"content-type": "application/octet-stream"},
            )
        except Exception as e:
            self.logger.exception("Problem relaying tarball: %s", str(e))
            raise
46 changes: 19 additions & 27 deletions lib/pbench/cli/agent/commands/results/move.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import requests

from pbench.agent.base import BaseCommand
from pbench.agent.results import CopyResultTb, MakeResultTb
from pbench.agent.results import CopyResult, MakeResultTb
from pbench.cli import CliContext, pass_cli_context, sort_click_command_parameters
from pbench.cli.agent.commands.results.results_options import results_common_options
from pbench.cli.agent.options import common_options
Expand Down Expand Up @@ -36,6 +36,7 @@ def execute(self, single_threaded: bool, delete: bool = True) -> int:
runs_copied = 0
failures = 0
no_of_tb = 0
crt = CopyResult.cli_create(self.context, self.config, self.logger)

with tempfile.TemporaryDirectory(
dir=self.config.pbench_tmp, prefix="pbench-results-move."
Expand All @@ -51,7 +52,7 @@ def execute(self, single_threaded: bool, delete: bool = True) -> int:

try:
mrt = MakeResultTb(
result_dir,
str(result_dir),
temp_dir,
self.context.controller,
self.config,
Expand All @@ -73,7 +74,7 @@ def execute(self, single_threaded: bool, delete: bool = True) -> int:
continue

try:
result_tb_name, result_tb_len, result_tb_md5 = mrt.make_result_tb(
result_tb_name, _, result_tb_md5 = mrt.make_result_tb(
single_threaded=single_threaded
)
except BadMDLogFormat as exc:
Expand All @@ -98,24 +99,17 @@ def execute(self, single_threaded: bool, delete: bool = True) -> int:
continue

try:
crt = CopyResultTb(
result_tb_name,
result_tb_len,
result_tb_md5,
self.config,
self.logger,
)
res = crt.copy_result_tb(
self.context.token, self.context.access, self.context.metadata
)
res = crt.push(result_tb_name, result_tb_md5)
if res.ok and self.context.relay:
click.echo(f"RELAY {result_tb_name.name}: {res.url}")
if not res.ok:
try:
msg = res.json()["message"]
except requests.exceptions.JSONDecodeError:
msg = res.text if res.text else res.reason
raise CopyResultTb.FileUploadError(msg)
raise CopyResult.FileUploadError(msg)
except Exception as exc:
if isinstance(exc, (CopyResultTb.FileUploadError, RuntimeError)):
if isinstance(exc, (CopyResult.FileUploadError, RuntimeError)):
msg = "Error uploading file"
else:
msg = "Unexpected error occurred copying tar ball remotely"
Expand Down Expand Up @@ -204,13 +198,6 @@ def execute(self, single_threaded: bool, delete: bool = True) -> int:
is_flag=True,
help="Use single threaded compression with 'xz'",
)
@click.option(
"--show-server",
help=(
"Display information about the pbench server where the result(s) will"
" be moved (Not implemented)"
),
)
@pass_cli_context
def main(
context: CliContext,
Expand All @@ -220,7 +207,8 @@ def main(
delete: bool,
metadata: List,
xz_single_threaded: bool,
show_server: str,
server: str,
relay: str,
):
"""Move result directories to the configured Pbench server."""
clk_ctx = click.get_current_context()
Expand All @@ -237,19 +225,23 @@ def main(
err=True,
)
clk_ctx.exit(1)

if relay and server:
click.echo("Cannot use both relay and Pbench Server destination.", err=True)
clk_ctx.exit(2)

if validate_hostname(controller) != 0:
# We check once to avoid having to deal with a bad controller each
# time we try to copy the results.
click.echo(f"Controller, {controller!r}, is not a valid host name")
clk_ctx.exit(1)
context.controller = controller

context.controller = controller
context.access = access
context.token = token
context.metadata = metadata

if show_server:
click.echo("WARNING -- Option '--show-server' is not implemented", err=True)
context.server = server
context.relay = relay

try:
rv = MoveResults(context).execute(xz_single_threaded, delete=delete)
Expand Down
Loading

0 comments on commit 39eb35e

Please sign in to comment.