Skip to content

Commit

Permalink
[dashboard] Fix log proxy not loading non test/plain files. (ray-proj…
Browse files Browse the repository at this point in the history
…ect#33870)

Dashboard will fail to load the log file if the file doesn't have any prefix (or if the file name mapped to a mime type other than text/plain)

This PR fixes it by setting the charset properly.

Signed-off-by: Jack He <[email protected]>
  • Loading branch information
rickyyx authored and ProjectsByJackHe committed May 4, 2023
1 parent df99aa3 commit e760d68
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 56 deletions.
4 changes: 2 additions & 2 deletions dashboard/modules/log/log_head.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ async def get_log_from_proxy(self, req) -> aiohttp.web.StreamResponse:
)
sr.content_length = r.content_length
sr.content_type = r.content_type
sr.charset = r.charset

if r.charset and not sr.content_type.startswith("application/octet-stream"):
sr.charset = r.charset
writer = await sr.prepare(req)
async for data in r.content.iter_any():
await writer.write(data)
Expand Down
92 changes: 53 additions & 39 deletions dashboard/modules/log/tests/test_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,52 +135,66 @@ def write_log(s):


@pytest.mark.parametrize(
"test_file",
["test.log", "test#1234.log"],
"test_file,content_kind",
[
("test.log", "text/plain"),
("test#1234.log", "text/plain"),
("test_file_no_suffix", "application/octet-stream"),
("test_file_not_register_mimetypes.json", "application/json"),
("test_file_not_register_mimetypes.yaml", "application/octet-stream"),
],
)
def test_log_proxy(ray_start_with_dashboard, test_file):
def test_log_proxy(ray_start_with_dashboard, test_file, content_kind):
assert wait_until_server_available(ray_start_with_dashboard["webui_url"]) is True
webui_url = ray_start_with_dashboard["webui_url"]
webui_url = format_web_url(webui_url)

timeout_seconds = 5
start_time = time.time()
last_ex = None
test_log_text = "test_log_text"
if content_kind == "text/plain":
data = bytearray("test_log_text", encoding="utf-8")
else:
data = bytearray(i for i in range(256))

# Prep the files
with open(
f"{ray._private.worker.global_worker.node.get_logs_dir_path()}/{test_file}", "w"
f"{ray._private.worker.global_worker.node.get_logs_dir_path()}/{test_file}",
"wb",
) as f:
f.write(test_log_text)
while True:
time.sleep(1)
try:
url = urllib.parse.quote(f"{webui_url}/logs/{test_file}")
# Test range request.
response = requests.get(
f"{webui_url}/log_proxy?url={url}",
headers={"Range": "bytes=2-5"},
)
response.raise_for_status()
assert response.text == test_log_text[2:6]
# Test 404.
response = requests.get(
f"{webui_url}/log_proxy?" f"url={webui_url}/logs/not_exist_file.log"
)
assert response.status_code == 404
break
except Exception as ex:
last_ex = ex
finally:
if time.time() > start_time + timeout_seconds:
ex_stack = (
traceback.format_exception(
type(last_ex), last_ex, last_ex.__traceback__
)
if last_ex
else []
)
ex_stack = "".join(ex_stack)
raise Exception(f"Timed out while testing, {ex_stack}")
f.write(data)

# Test basic fetching
def verify():
url = urllib.parse.quote(f"{webui_url}/logs/{test_file}")
response = requests.get(f"{webui_url}/log_proxy?url={url}")
response.raise_for_status()
assert response.content == data
return True

wait_for_condition(verify)

def verify():
url = urllib.parse.quote(f"{webui_url}/logs/{test_file}")
# Test range request.
response = requests.get(
f"{webui_url}/log_proxy?url={url}",
headers={
"Range": "bytes=2-5",
},
)
response.raise_for_status()
assert response.content == data[2:6]
return True

wait_for_condition(verify)

# Test 404.
def verify():
response = requests.get(
f"{webui_url}/log_proxy?" f"url={webui_url}/logs/not_exist_file.log"
)
assert response.status_code == 404
return True

wait_for_condition(verify)


@pytest.mark.parametrize(
Expand Down
13 changes: 10 additions & 3 deletions python/ray/experimental/state/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
)
from ray.experimental.state.common import (
DEFAULT_LIMIT,
DEFAULT_LOG_LIMIT,
DEFAULT_RPC_TIMEOUT,
ActorState,
GetApiOptions,
Expand Down Expand Up @@ -1123,9 +1122,11 @@ def get_log(
task_id: Optional[str] = None,
pid: Optional[int] = None,
follow: bool = False,
tail: int = DEFAULT_LOG_LIMIT,
tail: int = -1,
timeout: int = DEFAULT_RPC_TIMEOUT,
suffix: Optional[str] = None,
encoding: Optional[str] = "utf-8",
errors: Optional[str] = "strict",
_interval: Optional[float] = None,
) -> Generator[str, None, None]:
"""Retrieve log file based on file name or some entities ids (pid, actor id, task id).
Expand Down Expand Up @@ -1157,6 +1158,10 @@ def get_log(
the entire log.
timeout: Max timeout for requests made when getting the logs.
suffix: The suffix of the log file if query by id of tasks/workers/actors.
encoding: The encoding used to decode the content of the log file. Default is
"utf-8". Use None to get binary data directly.
errors: The error handling scheme to use for decoding errors. Default is
"strict". See https://docs.python.org/3/library/codecs.html#error-handlers
_interval: The interval in secs to print new logs when `follow=True`.
Return:
Expand Down Expand Up @@ -1201,7 +1206,9 @@ def get_log(
# First byte 1 means success.
if bytes.startswith(b"1"):
bytes.pop(0)
logs = bytes.decode("utf-8")
logs = bytes
if encoding is not None:
logs = bytes.decode(encoding=encoding, errors=errors)
else:
assert bytes.startswith(b"0")
error_msg = bytes.decode("utf-8")
Expand Down
33 changes: 32 additions & 1 deletion python/ray/experimental/state/state_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,7 @@ def object_summary(ctx, timeout: float, address: str):
required=False,
type=int,
default=DEFAULT_LOG_LIMIT,
help="Number of lines to tail from log. -1 indicates fetching the whole file.",
help="Number of lines to tail from log. Use -1 to fetch the whole file.",
)

log_interval_option = click.option(
Expand Down Expand Up @@ -760,6 +760,27 @@ def object_summary(ctx, timeout: float, address: str):
),
)

log_encoding_option = click.option(
"--encoding",
required=False,
default="utf-8",
help=(
"The encoding use to decode the log file. Accepts any encoding "
"supported by Python's `codecs` module. Defaults to utf-8."
),
)

log_encoding_errors_option = click.option(
"--encoding-errors",
required=False,
default="strict",
help=(
"The error handling scheme to use for decoding errors. "
"Accepts any error handling scheme supported by Python's `codecs`"
"module. Defaults to strict."
),
)


def _get_head_node_ip(address: Optional[str] = None):
"""Get the head node ip from the ray address if possible
Expand Down Expand Up @@ -790,6 +811,8 @@ def _print_log(
timeout: int = DEFAULT_RPC_TIMEOUT,
interval: Optional[float] = None,
suffix: Optional[str] = None,
encoding: str = "utf-8",
encoding_errors: str = "strict",
):
"""Wrapper around `get_log()` that prints the preamble and the log lines"""
if tail > 0:
Expand All @@ -814,6 +837,8 @@ def _print_log(
_interval=interval,
timeout=timeout,
suffix=suffix,
encoding=encoding,
errors=encoding_errors,
):
print(chunk, end="", flush=True)

Expand Down Expand Up @@ -888,6 +913,8 @@ def resolve_command(self, ctx, args):
@log_tail_option
@log_interval_option
@log_timeout_option
@log_encoding_option
@log_encoding_errors_option
@click.pass_context
@PublicAPI(stability="alpha")
def log_cluster(
Expand All @@ -900,6 +927,8 @@ def log_cluster(
tail: int,
interval: float,
timeout: int,
encoding: str,
encoding_errors: str,
):
"""Get/List logs that matches the GLOB_FILTER in the cluster.
By default, it prints a list of log files that match the filter.
Expand Down Expand Up @@ -974,6 +1003,8 @@ def log_cluster(
follow=follow,
interval=interval,
timeout=timeout,
encoding=encoding,
encoding_errors=encoding_errors,
)


Expand Down
78 changes: 67 additions & 11 deletions python/ray/tests/test_state_api_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -788,6 +788,40 @@ def verify():

wait_for_condition(verify)

##############################
# Test binary files and encodings.
##############################
# Write a binary file to ray log directory.
log_dir = ray._private.worker.global_worker.node.get_logs_dir_path()
file = "test.bin"
binary_file = os.path.join(log_dir, file)
with open(binary_file, "wb") as f:
data = bytearray(i for i in range(256))
f.write(data)

# Get the log
def verify():
for read in get_log(node_ip=head_node["node_ip"], filename=file, encoding=None):
assert read == data

# Default utf-8
for read in get_log(
node_ip=head_node["node_ip"], filename=file, errors="replace"
):
assert read == data.decode(encoding="utf-8", errors="replace")

for read in get_log(
node_ip=head_node["node_ip"],
filename=file,
encoding="iso-8859-1",
errors="replace",
):
assert read == data.decode(encoding="iso-8859-1", errors="replace")

return True

wait_for_condition(verify)


def test_log_cli(shutdown_only):
ray.init(num_cpus=1)
Expand All @@ -796,8 +830,7 @@ def test_log_cli(shutdown_only):
# Test the head node is chosen by default.
def verify():
result = runner.invoke(logs_state_cli_group, ["cluster"])
print(result.output)
assert result.exit_code == 0
assert result.exit_code == 0, result.exception
assert "raylet.out" in result.output
assert "raylet.err" in result.output
assert "gcs_server.out" in result.output
Expand All @@ -810,7 +843,6 @@ def verify():
def verify():
result = runner.invoke(logs_state_cli_group, ["cluster", "raylet.out"])
assert result.exit_code == 0
print(result.output)
assert "raylet.out" not in result.output
assert "raylet.err" not in result.output
assert "gcs_server.out" not in result.output
Expand All @@ -824,8 +856,7 @@ def verify():
# Test when there's more than 1 match, it prints a list of logs.
def verify():
result = runner.invoke(logs_state_cli_group, ["cluster", "raylet.*"])
assert result.exit_code == 0
print(result.output)
assert result.exit_code == 0, result.exception
assert "raylet.out" in result.output
assert "raylet.err" in result.output
assert "gcs_server.out" not in result.output
Expand All @@ -847,8 +878,7 @@ def __init__(self):

def verify():
result = runner.invoke(logs_state_cli_group, ["actor", "--id", actor_id])
assert result.exit_code == 0
print(result.output)
assert result.exit_code == 0, result.exception
assert ACTOR_LOG_LINE in result.output
return True

Expand All @@ -868,8 +898,7 @@ def worker_func():

def verify():
result = runner.invoke(logs_state_cli_group, ["worker", "--pid", pid])
assert result.exit_code == 0
print(result.output)
assert result.exit_code == 0, result.exception
assert WORKER_LOG_LINE in result.output
return True

Expand All @@ -878,8 +907,7 @@ def verify():
# Test `ray logs raylet.*` forwarding to `ray logs cluster raylet.*`
def verify():
result = runner.invoke(logs_state_cli_group, ["raylet.*"])
assert result.exit_code == 0
print(result.output)
assert result.exit_code == 0, result.exception
assert "raylet.out" in result.output
assert "raylet.err" in result.output
assert "gcs_server.out" not in result.output
Expand All @@ -888,6 +916,34 @@ def verify():

wait_for_condition(verify)

# Test binary binary files and encodings.
log_dir = ray._private.worker.global_worker.node.get_logs_dir_path()
file = "test.bin"
binary_file = os.path.join(log_dir, file)
with open(binary_file, "wb") as f:
data = bytearray(i for i in range(256))
f.write(data)

def verify():
# Tailing with lines is not supported for binary files, thus the `tail=-1`
result = runner.invoke(
logs_state_cli_group,
[
file,
"--encoding",
"iso-8859-1",
"--encoding-errors",
"replace",
"--tail",
"-1",
],
)
assert result.exit_code == 0, result.exception
assert result.output == data.decode(encoding="iso-8859-1", errors="replace")
return True

wait_for_condition(verify)


if __name__ == "__main__":
import sys
Expand Down

0 comments on commit e760d68

Please sign in to comment.