Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add continuous collection of profiles in java profiler #730

Open
wants to merge 22 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
36224e6
Add continuous collection of profiles in java profiler
marcin-ol Mar 20, 2023
eca2e20
Fix lint
marcin-ol Mar 20, 2023
af2c036
Check if target process is running before attempting to stop async-pr…
marcin-ol Mar 21, 2023
e7d358c
Test fixes
marcin-ol Mar 21, 2023
d9b7f24
Merge remote-tracking branch 'upstream/master' into java-ap-continuou…
marcin-ol Mar 21, 2023
1b222de
Fix cleanup when jattach socket gets unavailable
marcin-ol Mar 22, 2023
619bb73
Merge remote-tracking branch 'upstream/master' into java-ap-continuou…
marcin-ol Mar 23, 2023
c741557
Make profiling continuous with async-profiler's recycle feature
marcin-ol Mar 23, 2023
9b30743
Merge remote-tracking branch 'upstream/master' into java-ap-continuou…
marcin-ol Mar 23, 2023
4e4b047
Adjust stacks for cumulative outcomes from dump
marcin-ol Mar 23, 2023
cac997d
Merge remote-tracking branch 'upstream/master' into java-ap-continuou…
marcin-ol Mar 23, 2023
ea38081
Adjust output of each profiled java process independently
marcin-ol Mar 23, 2023
20d7840
Merge remote-tracking branch 'upstream/master' into java-ap-continuou…
marcin-ol Jul 10, 2023
2feb8b7
Use updated async-profiler, synced with 2.10
marcin-ol Jul 14, 2023
eb14300
Merge remote-tracking branch 'upstream/master' into java-ap-continuou…
marcin-ol Jul 17, 2023
b7d0f32
Reset traces when calling dump
marcin-ol Jul 19, 2023
6fec95c
Merge remote-tracking branch 'upstream/master' into java-ap-continuou…
marcin-ol Jul 20, 2023
93e47ce
Merge remote-tracking branch 'upstream/master' into java-ap-continuou…
marcin-ol Jul 20, 2023
11e20b2
Make java profiling truly continuous, invoking only dump
marcin-ol Jul 24, 2023
9c27886
Merge remote-tracking branch 'upstream/master' into java-ap-continuou…
marcin-ol Jul 24, 2023
64d3309
Merge remote-tracking branch 'upstream/master' into java-ap-continuou…
marcin-ol Aug 2, 2023
f440822
Merge remote-tracking branch 'upstream/master' into java-ap-continuou…
marcin-ol Sep 1, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 149 additions & 19 deletions gprofiler/profilers/java.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
import re
import secrets
import signal
from enum import Enum
from dataclasses import dataclass
from enum import Enum, auto
from pathlib import Path
from subprocess import CompletedProcess
from threading import Event, Lock
Expand Down Expand Up @@ -444,6 +445,27 @@ def get_ap_version() -> str:
r"KB\n\n"
)

# Error reported by dump action, when profiler has not started; message defined in source:
# https://github.com/async-profiler/async-profiler/blob/v2.10/src/profiler.cpp#L1195
_DUMP_ERROR_PROFILER_NOT_STARTED_RE = re.compile(r"\[ERROR\] Profiler has not started\n")

# Status reported by async profiler when it's started. Defined here:
# https://github.com/async-profiler/async-profiler/blob/e3b7bfca227ae5c916f00abfacf0e61291df3a67/src/profiler.cpp#L1541
_AP_STATUS_RUNNING_RE = re.compile(r"Profiling is running for (?P<uptime>\d+) seconds\n")


class AsyncProfilerStartStatus(Enum):
NOT_RUNNING = auto()
STARTED_BY_US = auto()
ALREADY_RUNNING = auto()


@dataclass(frozen=True)
class AsyncProfilerStatus:
message: Optional[str]
running: bool
uptime: int = 0


class AsyncProfiledProcess:
"""
Expand Down Expand Up @@ -522,6 +544,10 @@ def __init__(
self._collect_meminfo = collect_meminfo
self._include_method_modifiers = ",includemm" if include_method_modifiers else ""
self._include_line_numbers = ",includeln" if java_line_numbers == "line-of-function" else ""
self._started_by_us = False
self._setup_state: Optional[bool] = None
self._jattach_socket_disabled = False
self._start_status = AsyncProfilerStartStatus.NOT_RUNNING

def _find_rw_exec_dir(self) -> str:
"""
Expand Down Expand Up @@ -550,6 +576,11 @@ def _find_rw_exec_dir(self) -> str:
)

def __enter__(self: T) -> T:
self.setup()
return self

def setup(self) -> None:
assert self._setup_state is None, "AsyncProfilerProcess must not be setup twice"
# create the directory structure for executable libap, make sure it's owned by root
# for sanity & simplicity, mkdir_owned_root() does not support creating parent directories, as this allows
# the caller to absentmindedly ignore the check of the parents ownership.
Expand All @@ -569,20 +600,24 @@ def __enter__(self: T) -> T:
self._recreate_log()
# copy libasyncProfiler.so if needed
self._copy_libap()

return self
self._setup_state = True

def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_ctb: Optional[TracebackType],
) -> None:
self.teardown()

def teardown(self) -> None:
assert self._setup_state is True, "AsyncProfilerProcess must not run teardown twice"
# ignore_errors because we are deleting paths via /proc/pid/root - and the pid
# we're using might have gone down already.
# remove them as best effort.
remove_path(self._output_path_host, missing_ok=True)
remove_path(self._log_path_host, missing_ok=True)
self._setup_state = False

def _existing_realpath(self, path: str) -> Optional[str]:
"""
Expand Down Expand Up @@ -664,6 +699,9 @@ def _get_interval_arg(self, interval: int) -> str:
return f",alloc={interval}"
return f",interval={interval}"

def _get_ap_recycle_args(self, ap_timeout: int) -> str:
return f",recycle,timeout={ap_timeout}"

def _get_start_cmd(self, interval: int, ap_timeout: int) -> List[str]:
return self._get_base_cmd() + [
f"start,event={self._mode}"
Expand All @@ -682,6 +720,20 @@ def _get_stop_cmd(self, with_output: bool) -> List[str]:
f"{self._get_extra_ap_args()}"
]

def _get_dump_cmd(self, ap_timeout: int = 0) -> List[str]:
return self._get_base_cmd() + [
f"dump,"
# reset trace after dump to start each profiling cycle with zeroed counters
"resettrace,"
# make profiler check it's active (not stopped due to timeout) when running dump
"dumpactive,"
f"log={self._log_path_process}"
f"{self._get_ap_output_args()}"
f"{self._get_ap_recycle_args(ap_timeout)}"
f"{',lib' if self._profiler_state.insert_dso_name else ''}{',meminfolog' if self._collect_meminfo else ''}"
f"{self._get_extra_ap_args()}"
]

def _read_ap_log(self) -> str:
if not os.path.exists(self._log_path_host):
return "(log file doesn't exist)"
Expand Down Expand Up @@ -719,6 +771,8 @@ def _run_async_profiler(self, cmd: List[str]) -> str:
raise JattachTimeout(*args, timeout=self._jattach_timeout) from None
elif e.stderr == "Could not start attach mechanism: No such file or directory\n":
# this is true for jattach_hotspot
# in this case socket won't be recreated and jattach calls are useless
self._jattach_socket_disabled = True
raise JattachSocketMissingException(*args) from None
else:
raise JattachException(*args) from None
Expand Down Expand Up @@ -750,9 +804,11 @@ def _run_fdtransfer(self) -> None:
timeout=self._FDTRANSFER_TIMEOUT,
)

def start_async_profiler(self, interval: int, second_try: bool = False, ap_timeout: int = 0) -> bool:
def start_async_profiler(
self, interval: int, second_try: bool = False, ap_timeout: int = 0
) -> AsyncProfilerStartStatus:
"""
Returns True if profiling was started; False if it was already started.
Returns STARTED_BY_US if profiling was started; ALREADY_RUNNING if it was already started.
ap_timeout defaults to 0, which means "no timeout" for AP (see call to startTimer() in profiler.cpp)
"""
if self._mode == "cpu" and not second_try:
Expand All @@ -761,19 +817,28 @@ def start_async_profiler(self, interval: int, second_try: bool = False, ap_timeo
start_cmd = self._get_start_cmd(interval, ap_timeout)
try:
self._run_async_profiler(start_cmd)
return True
self._start_status = AsyncProfilerStartStatus.STARTED_BY_US
return self._start_status
except JattachException as e:
if e.is_ap_loaded:
if (
e.returncode == 200 # 200 == AP's COMMAND_ERROR
and e.get_ap_log() == "[ERROR] Profiler already started\n"
):
# profiler was already running
return False
self._start_status = AsyncProfilerStartStatus.ALREADY_RUNNING
return self._start_status
raise

def stop_async_profiler(self, with_output: bool) -> str:
return self._run_async_profiler(self._get_stop_cmd(with_output))
output: str = self._run_async_profiler(self._get_stop_cmd(with_output))
self._start_status = AsyncProfilerStartStatus.NOT_RUNNING
return output

def reset_start_status(self) -> None:
self._start_status = AsyncProfilerStartStatus.NOT_RUNNING

def dump_from_async_profiler(self, ap_timeout: int = 0) -> str:
return self._run_async_profiler(self._get_dump_cmd(ap_timeout))

def read_output(self) -> Optional[str]:
try:
Expand All @@ -784,6 +849,12 @@ def read_output(self) -> Optional[str]:
return None
raise

def is_started_by_us(self) -> bool:
return self._start_status == AsyncProfilerStartStatus.STARTED_BY_US

def is_jattach_socket_disabled(self) -> bool:
return self._jattach_socket_disabled


@register_profiler(
"Java",
Expand Down Expand Up @@ -976,6 +1047,8 @@ def __init__(
self._java_full_hserr = java_full_hserr
self._include_method_modifiers = java_include_method_modifiers
self._java_line_numbers = java_line_numbers
# keep ap_process instances to enable continuous profiling
self._pid_to_ap_proc: Dict[int, Optional[AsyncProfiledProcess]] = {}

def _init_ap_mode(self, profiling_mode: str, ap_mode: str) -> None:
assert profiling_mode in ("cpu", "allocation"), "async-profiler support only cpu/allocation profiling modes"
Expand Down Expand Up @@ -1160,6 +1233,37 @@ def _check_async_profiler_loaded(self, process: Process) -> bool:

return False

def _get_ap_proc(
self,
process: Process,
profiler_state: ProfilerState,
mode: str,
ap_safemode: int,
ap_args: str,
jattach_timeout: int,
mcache: int,
collect_meminfo: bool,
include_method_modifiers: bool = False,
java_line_numbers: str = "none",
) -> AsyncProfiledProcess:
ap_proc: Optional[AsyncProfiledProcess] = self._pid_to_ap_proc.get(process.pid, None)
if ap_proc is None:
ap_proc = AsyncProfiledProcess(
process,
profiler_state,
mode,
ap_safemode,
ap_args,
jattach_timeout,
mcache,
collect_meminfo,
include_method_modifiers,
java_line_numbers,
)
ap_proc.setup()
self._pid_to_ap_proc[process.pid] = ap_proc
return ap_proc

def _profile_process(self, process: Process, duration: int, spawned: bool) -> ProfileData:
comm = process_comm(process)
exe = process_exe(process)
Expand Down Expand Up @@ -1201,7 +1305,7 @@ def _profile_process(self, process: Process, duration: int, spawned: bool) -> Pr
logger.debug("Process paths", pid=process.pid, execfn=execfn, exe=exe)
logger.debug("Process mapped files", pid=process.pid, maps=set(m.path for m in process.memory_maps()))

with AsyncProfiledProcess(
ap_proc = self._get_ap_proc(
process,
self._profiler_state,
self._mode,
Expand All @@ -1212,8 +1316,8 @@ def _profile_process(self, process: Process, duration: int, spawned: bool) -> Pr
self._report_meminfo,
self._include_method_modifiers,
self._java_line_numbers,
) as ap_proc:
stackcollapse = self._profile_ap_process(ap_proc, comm, duration)
)
stackcollapse = self._profile_ap_process(ap_proc, comm, duration)

return ProfileData(stackcollapse, appid, app_metadata, container_name)

Expand All @@ -1234,9 +1338,13 @@ def _log_mem_usage(ap_log: str, pid: int) -> None:
pid=pid,
)

def _profile_ap_process(self, ap_proc: AsyncProfiledProcess, comm: str, duration: int) -> StackToSampleCount:
def _prepare_async_profiler(self, ap_proc: AsyncProfiledProcess) -> AsyncProfilerStartStatus:
# first time we need to start profiler ourselves
# after that we rely on async-prpofiler dump and recycle functions to signal failed dump
if ap_proc.is_started_by_us():
return AsyncProfilerStartStatus.STARTED_BY_US
started = ap_proc.start_async_profiler(self._interval, ap_timeout=self._ap_timeout)
if not started:
if started == AsyncProfilerStartStatus.ALREADY_RUNNING:
logger.info(f"Found async-profiler already started on {ap_proc.process.pid}, trying to stop it...")
# stop, and try to start again. this might happen if AP & gProfiler go out of sync: for example,
# gProfiler being stopped brutally, while AP keeps running. If gProfiler is later started again, it will
Expand All @@ -1245,10 +1353,14 @@ def _profile_ap_process(self, ap_proc: AsyncProfiledProcess, comm: str, duration
# surely does.
ap_proc.stop_async_profiler(with_output=False)
started = ap_proc.start_async_profiler(self._interval, second_try=True, ap_timeout=self._ap_timeout)
if not started:
if started == AsyncProfilerStartStatus.ALREADY_RUNNING:
raise Exception(
f"async-profiler is still running in {ap_proc.process.pid}, even after trying to stop it!"
)
return AsyncProfilerStartStatus.STARTED_BY_US

def _profile_ap_process(self, ap_proc: AsyncProfiledProcess, comm: str, duration: int) -> StackToSampleCount:
self._prepare_async_profiler(ap_proc)

try:
wait_event(
Expand All @@ -1263,18 +1375,27 @@ def _profile_ap_process(self, ap_proc: AsyncProfiledProcess, comm: str, duration
logger.debug(f"Profiled process {ap_proc.process.pid} exited before stopping async-profiler")
# fall-through - try to read the output, since async-profiler writes it upon JVM exit.
finally:
# don't stop profiler now; will do it in stop()
if is_process_running(ap_proc.process):
ap_log = ap_proc.stop_async_profiler(True)
if self._report_meminfo:
self._log_mem_usage(ap_log, ap_proc.process.pid)
try:
ap_log = ap_proc.dump_from_async_profiler(self._ap_timeout)
if self._report_meminfo:
self._log_mem_usage(ap_log, ap_proc.process.pid)
except JattachException as e:
if _DUMP_ERROR_PROFILER_NOT_STARTED_RE.search(e.get_ap_log()) is not None:
logger.warning(f"Profiler for process {ap_proc.process.pid} wasn't ready for collecting stacks")
ap_proc.reset_start_status()
return self._profiling_error_stack("error", "profiler wasn't ready for collecting stacks", comm)
raise

output = ap_proc.read_output()
if output is None:
logger.warning(f"Profiled process {ap_proc.process.pid} exited before reading the output")
return self._profiling_error_stack("error", "process exited before reading the output", comm)
else:
logger.info(f"Finished profiling process {ap_proc.process.pid}")
return parse_one_collapsed(output, comm)
dumped_stacks = parse_one_collapsed(output, comm)
return dumped_stacks

def _check_hotspot_error(self, ap_proc: AsyncProfiledProcess) -> None:
pid = ap_proc.process.pid
Expand Down Expand Up @@ -1334,6 +1455,9 @@ def stop(self) -> None:
if self._enabled_proc_events_java:
proc_events.unregister_exit_callback(self._proc_exit_callback)
self._enabled_proc_events_java = False
pids_to_remove = list(self._pid_to_ap_proc.keys())
for pid in pids_to_remove:
self._stop_profiling_for_pid(pid)
super().stop()

def _proc_exit_callback(self, tid: int, pid: int, exit_code: int) -> None:
Expand Down Expand Up @@ -1402,6 +1526,11 @@ def _handle_new_kernel_messages(self) -> None:
else:
self._handle_kernel_messages(messages)

def _stop_profiling_for_pid(self, pid: int) -> None:
ap_proc = self._pid_to_ap_proc.pop(pid, None)
if ap_proc is not None:
ap_proc.teardown()

def snapshot(self) -> ProcessToProfileData:
try:
return super().snapshot()
Expand All @@ -1411,4 +1540,5 @@ def snapshot(self) -> ProcessToProfileData:
self._want_to_profile_pids -= self._pids_to_remove
for pid in self._pids_to_remove:
self._pid_to_java_version.pop(pid, None)
self._stop_profiling_for_pid(pid)
self._pids_to_remove.clear()
6 changes: 3 additions & 3 deletions scripts/async_profiler_build_shared.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
#
set -euo pipefail

VERSION=v2.10g2
GIT_REV="40b850a4101756bc398051661d1adbbe5d7e2211"
VERSION=timeout-recycle
GIT_REV="0107623aee6b96e081bf9b12bb964d6533fff1db"

git clone --depth 1 -b "$VERSION" https://github.com/Granulate/async-profiler.git && cd async-profiler && git reset --hard "$GIT_REV"
git clone --depth 1 -b "$VERSION" https://github.com/marcin-ol/async-profiler.git && cd async-profiler && git reset --hard "$GIT_REV"
make all

# add a version file to the build directory
Expand Down
Loading
Loading