From 470fd3f1d651c046f1ce14c09804e3fcdb710381 Mon Sep 17 00:00:00 2001 From: Nikita Savelyev Date: Tue, 16 Jul 2024 16:03:55 +0200 Subject: [PATCH] Changed API for memory logger. Added tests. --- .github/workflows/precommit.yml | 15 ++ tests/tools/requirements.txt | 2 + tests/tools/test_memory_monitor.py | 174 +++++++++++++++ tools/memory_logger.py | 228 -------------------- tools/memory_monitor.py | 325 +++++++++++++++++++++++++++++ tools/requirements.txt | 1 + 6 files changed, 517 insertions(+), 228 deletions(-) create mode 100644 tests/tools/requirements.txt create mode 100644 tests/tools/test_memory_monitor.py delete mode 100644 tools/memory_logger.py create mode 100644 tools/memory_monitor.py diff --git a/.github/workflows/precommit.yml b/.github/workflows/precommit.yml index e72b0e32244..5cdc8acf906 100644 --- a/.github/workflows/precommit.yml +++ b/.github/workflows/precommit.yml @@ -229,3 +229,18 @@ jobs: token: ${{ secrets.CODECOV_TOKEN }} name: coverage_tensorflow flags: TENSORFLOW + tools: + runs-on: ubuntu-20.04 + steps: + - uses: actions/checkout@a5ac7e51b41094c92402da3b24376905380afc29 # v4.1.6 + with: + lfs: true + - uses: actions/setup-python@0a5c61591373683505ea898e09a3ea4f39ef2b9c # v5.0.0 + with: + python-version: 3.8.18 + cache: pip + - name: Install NNCF and test requirements + run: | + pip install -r tests/tools/requirements.txt + - name: Run tools precommit test scope + run: pytest -ra tests/tools \ No newline at end of file diff --git a/tests/tools/requirements.txt b/tests/tools/requirements.txt new file mode 100644 index 00000000000..035dbe35f30 --- /dev/null +++ b/tests/tools/requirements.txt @@ -0,0 +1,2 @@ +matplotlib +pytest \ No newline at end of file diff --git a/tests/tools/test_memory_monitor.py b/tests/tools/test_memory_monitor.py new file mode 100644 index 00000000000..379627e8c2f --- /dev/null +++ b/tests/tools/test_memory_monitor.py @@ -0,0 +1,174 @@ +# Copyright (c) 2024 Intel Corporation +# Licensed under the Apache License, 
Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import gc +import os +import sys +import time +from pathlib import Path + +import numpy as np +import pytest + +from tests.shared.isolation_runner import ISOLATION_RUN_ENV_VAR +from tests.shared.isolation_runner import run_pytest_case_function_in_separate_process +from tools.memory_monitor import MemoryMonitor +from tools.memory_monitor import MemoryType +from tools.memory_monitor import MemoryUnit +from tools.memory_monitor import monitor_memory_for_callable + +BYTES_TO_ALLOCATE_SMALL = 2**20 # 1 MiB +BYTES_TO_ALLOCATE_LARGE = 100 * 2**20 # 100 MiB +PREALLOCATE_DURATION = 0.5 +ALLOCATE_DURATION = 0.5 +DEALLOCATE_DURATION = 0.5 +BASELINE_MEMORY_VAR = "TEST_BASELINE_MEMORY" +TEMP_DIR_VAR = "TEST_TEMP_DIR" + + +def is_windows() -> bool: + return "win32" in sys.platform + + +def allocate(n_bytes, sleep_before_deallocation=False, sleep_after_deallocation=False): + if sleep_before_deallocation: + time.sleep(PREALLOCATE_DURATION) + data = np.ones((n_bytes,), dtype=np.uint8) + time.sleep(ALLOCATE_DURATION) + del data + gc.collect() + if sleep_after_deallocation: + time.sleep(DEALLOCATE_DURATION) + + +@pytest.mark.skipif(is_windows(), reason="Check on linux only") +def test_memory_monitor_api(tmpdir): + tmpdir = Path(tmpdir) + + memory_monitor = MemoryMonitor().start() + allocate(BYTES_TO_ALLOCATE_SMALL) + memory_monitor.stop() + time_values, memory_values = memory_monitor.get_data() + + filename_suffix1 = "_test1" + memory_monitor.save_memory_logs( + time_values, 
memory_values, tmpdir, plot_title="Test", filename_suffix=filename_suffix1 + ) + saved_files = tuple(tmpdir.glob("*")) + assert len(saved_files) == 2 + assert any(map(lambda fn: str(fn).endswith(f"{filename_suffix1}.txt"), saved_files)) + assert any(map(lambda fn: str(fn).endswith(f"{filename_suffix1}.png"), saved_files)) + + filename_suffix2 = "_test2" + txt_filepath = next(filter(lambda fn: str(fn).endswith(".txt"), saved_files)) + memory_monitor.save_memory_plot(txt_filepath, plot_title="Test re-plot", filename_suffix=filename_suffix2) + saved_files = list(tmpdir.glob("*")) + assert len(saved_files) == 3 + assert any(map(lambda fn: str(fn).endswith(f"{filename_suffix2}.png"), saved_files)) + + +@pytest.mark.skipif(is_windows(), reason="Check on linux only") +@pytest.mark.parametrize("memory_type", MemoryType.__members__.values()) +def test_memory_type(memory_type): + memory_monitor = MemoryMonitor(memory_type=memory_type).start() + allocate(BYTES_TO_ALLOCATE_SMALL) + memory_monitor.stop() + memory_monitor.get_data() + + +@pytest.mark.skipif(is_windows(), reason="Check on linux only") +@pytest.mark.parametrize("memory_unit", MemoryUnit.__members__.values()) +def test_memory_unit(memory_unit): + memory_monitor = MemoryMonitor(memory_unit=memory_unit).start() + allocate(BYTES_TO_ALLOCATE_SMALL) + memory_monitor.stop() + memory_monitor.get_data() + + +@pytest.mark.skipif(is_windows(), reason="Check on linux only") +@pytest.mark.parametrize("interval", (5e-2, 1e-1)) +def test_interval(interval): + memory_monitor = MemoryMonitor(interval=interval).start() + allocate(BYTES_TO_ALLOCATE_SMALL, sleep_before_deallocation=True, sleep_after_deallocation=True) + memory_monitor.stop() + time_values, memory_values = memory_monitor.get_data() + assert len(time_values) == len(memory_values) + assert len(time_values) == pytest.approx( + (PREALLOCATE_DURATION + ALLOCATE_DURATION + DEALLOCATE_DURATION) / interval, rel=1e-1 + ) + + +@pytest.mark.skipif(is_windows(), reason="Check on 
linux only") +@pytest.mark.parametrize("max_value", (True, False)) +def test_monitor_memory_for_callable(max_value): + allocate_fn = lambda: allocate(BYTES_TO_ALLOCATE_SMALL) + fn_res, memory_data = monitor_memory_for_callable(allocate_fn, max_value=max_value) + assert fn_res is None + + if max_value: + assert isinstance(memory_data, float) + else: + time_values, memory_values = memory_data + assert len(time_values) == len(memory_values) + + +@pytest.mark.skipif(ISOLATION_RUN_ENV_VAR not in os.environ, reason="Should be run via isolation proxy") +def test_memory_values_isolated(): + baseline_memory = float(os.environ[BASELINE_MEMORY_VAR]) if BASELINE_MEMORY_VAR in os.environ else None + + memory_monitor = MemoryMonitor(memory_type=MemoryType.RSS, memory_unit=MemoryUnit.B).start() + bytes_to_allocate = 1 if baseline_memory is None else BYTES_TO_ALLOCATE_LARGE + allocate(bytes_to_allocate, sleep_before_deallocation=True, sleep_after_deallocation=True) + memory_monitor.stop() + _, memory_values = memory_monitor.get_data() + + if baseline_memory is None: + print("\nMax memory:", max(memory_values)) + else: + memory_values = list(map(lambda it: it - baseline_memory, memory_values)) + rel = 1e-2 + assert max(memory_values) == pytest.approx(BYTES_TO_ALLOCATE_LARGE, rel=rel) + assert abs(memory_values[0]) < BYTES_TO_ALLOCATE_LARGE * rel + assert abs(memory_values[-1]) < BYTES_TO_ALLOCATE_LARGE * rel + + +@pytest.mark.skipif(is_windows(), reason="Check on linux only") +def test_memory_values(): + # The first run of the test collects the memory that is allocated by default + _, stdout, _ = run_pytest_case_function_in_separate_process(test_memory_values_isolated) + max_mem_line = next(filter(lambda s: "Max memory:" in s, stdout.split("\n"))) + baseline_memory = max_mem_line.split(" ")[-1] + + # The second run of the test checks that the added amount of memory is correctly represented by the memory monitor + os.environ[BASELINE_MEMORY_VAR] = baseline_memory + 
run_pytest_case_function_in_separate_process(test_memory_values_isolated) + del os.environ[BASELINE_MEMORY_VAR] + + +@pytest.mark.skipif(ISOLATION_RUN_ENV_VAR not in os.environ, reason="Should be run via isolation proxy") +def test_at_exit_isolated(): + memory_monitor = MemoryMonitor() + at_exit_fn = lambda: memory_monitor.save_memory_logs(*memory_monitor.get_data(), Path(os.environ[TEMP_DIR_VAR])) + memory_monitor.start(at_exit_fn=at_exit_fn) + allocate(BYTES_TO_ALLOCATE_SMALL) + + +@pytest.mark.skipif(is_windows(), reason="Check on linux only") +def test_at_exit(tmpdir): + os.environ[TEMP_DIR_VAR] = str(tmpdir) + run_pytest_case_function_in_separate_process(test_at_exit_isolated) + del os.environ[TEMP_DIR_VAR] + + tmpdir = Path(tmpdir) + saved_files = tuple(tmpdir.glob("*")) + assert len(saved_files) == 2 + assert any(map(lambda fn: str(fn).endswith(".txt"), saved_files)) + assert any(map(lambda fn: str(fn).endswith(".png"), saved_files)) diff --git a/tools/memory_logger.py b/tools/memory_logger.py deleted file mode 100644 index e29f338a5ef..00000000000 --- a/tools/memory_logger.py +++ /dev/null @@ -1,228 +0,0 @@ -# Copyright (c) 2024 Intel Corporation -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -import atexit -import os -import queue -import subprocess -import threading -import time -from enum import Enum -from functools import partial -from pathlib import Path - -import matplotlib.pyplot as plt -import psutil - - -class MemoryType(Enum): - RSS = "rss" - RSS_TOP = "rss_top" - SYSTEM = "system" - SYSTEM_NORMALIZED = "system-normalized" - - -class MemoryUnit(Enum): - B = "B" # byte - KiB = "KiB" # Kibibyte - MiB = "MiB" # Mibibyte - GiB = "GiB" # Gibibyte - KB = "KB" # Kilobyte - MB = "MB" # Megabyte - GB = "GB" # Gigabyte - - -class MemoryLogger: - def __init__( - self, - log_dir: Path, - plot_title: str = "", - interval: float = 0.1, - memory_type: MemoryType = MemoryType.RSS, - memory_unit: MemoryUnit = MemoryUnit.MiB, - ): - """ - Memory logging utility to measure python process memory footprint. After start_logging() is called, it creates - a thread which runs in parallel and takes memory measurements every *interval* seconds using the specified - *memory_type* approach. When stop_logging() is called, the memory measuring thread is stopped and memory report - is saved into *log_dir* folder. There are two log files: one with data values in a .txt format and another one - in a form of a 2D time-memory plot. Calling stop_logging() is optional, because it will automatically be called - at program exit. The example usage is in the end of this file. - - :param log_dir: Directory where to save logged files. - :param plot_title: Title to put into plot title. - :param interval: How frequently to take memory measurements (seconds). - :param memory_type: Type of memory to log. Accepts four possible values: - - MemoryType.RSS: Resident Set Size is the portion of memory occupied by a process that is held in RAM. - Values are obtained through psutil library. If some data is read using mmap, RSS will report this data - as allocated, however this is not necessarily the case. 
- - MemoryType.RSS_TOP: The same type of memory as above, but the values are parsed from a table output of - the *top* command. Usually, reports the same values as RSS, but with worse resolution. - Note: There can be issues when measuring with RSS_TOP a python script which is run from IDE, in this case - it should be run from terminal. - - MemoryType.SYSTEM_NORMALIZED: This metric is defined as the difference between total system virtual memory - and system available memory. Be aware, that this way it is affected by other processes that can change - RAM availability. For the same reason it has to be normalized to not take into account memory taken by - other processed on th system. This is done by subtracting the starting memory from all values, so the - logging hast to be started int the beginning of the script to not miss any already allocated memory. - - MemoryType.SYSTEM: Same as above, but not normalized, i.e. will log state of the overall system memory. - - RSS and SYSTEM behave differently when mmap is used, e.g. during OV model loading. RSS will report data - which was read with mmap enabled as allocated, however this is not necessarily the case. SYSTEM* does not - report memory loaded with mmap. So it can be used to analyze "pure" memory usage without contribution of - mmap pages which are actually free, but are reported as allocated by RSS. - :param memory_unit: Unit to report memory in. 
- """ - self.log_dir = log_dir - self.plot_title = plot_title - self.interval = interval - self.memory_type = memory_type - self.memory_unit = memory_unit - - self._monitoring_thread_should_stop = False - self._monitoring_in_progress = False - - self._memory_monitor_thread = None - self._memory_data_queue = None - self._stop_logging_atexit_fn = None - - def start_logging(self): - if self._monitoring_in_progress: - raise Exception("Monitoring already in progress") - - self._memory_data_queue = queue.Queue() - self._monitoring_thread_should_stop = False - - self._memory_monitor_thread = threading.Thread(target=self._monitor_memory) - self._memory_monitor_thread.daemon = True - self._memory_monitor_thread.start() - self._stop_logging_atexit_fn = lambda: self.stop_logging() - atexit.register(self._stop_logging_atexit_fn) - - self._monitoring_in_progress = True - - return self - - def stop_logging(self): - self._monitoring_thread_should_stop = True - self._monitoring_in_progress = False - self._memory_monitor_thread.join() - self._log_memory_usage() - atexit.unregister(self._stop_logging_atexit_fn) - - def _log_memory_usage(self, filename_suffix=""): - memory_usage_data = list(self._memory_data_queue.queue) - time_data, memory_data = tuple(zip(*memory_usage_data)) - time_data = _subtract_first_element(list(time_data)) - if self.memory_type == MemoryType.SYSTEM_NORMALIZED: - memory_data = _subtract_first_element(list(memory_data)) - - # Convert to target memory unit - memory_data = list(map(partial(_cast_bytes_to, memory_unit=self.memory_unit), memory_data)) - - if not self.log_dir.exists(): - self.log_dir.mkdir(parents=True) - - filename_label = f"{self.memory_type.value}_memory_usage{filename_suffix}" - # Save measurements to file - with open(self.log_dir / f"{filename_label}.txt", "w") as log_file: - for timestamp, memory_usage in zip(time_data, memory_data): - log_file.write(f"{timestamp} {memory_usage:.3f}\n") - - log_file.writelines( - [ - f"Total time: 
{time_data[-1] - time_data[0]}\n", - f"Max memory: {max(memory_data):.3f} ({self.memory_unit.value})", - ] - ) - - # Save memory plot - fig = plt.figure(figsize=(10, 6)) - plt.plot(time_data, memory_data) - plt.xlabel("Time (seconds)") - plt.ylabel(f"Memory Usage ({self.memory_type.value}, {self.memory_unit.value})") - plt.title(self.plot_title) - plt.grid(True) - plt.tight_layout() - plt.savefig(self.log_dir / f"{filename_label}.png") - plt.close(fig) - - def _monitor_memory(self): - while not self._monitoring_thread_should_stop: - if self.memory_type == MemoryType.RSS: - bytes_used = psutil.Process().memory_info().rss - elif self.memory_type == MemoryType.RSS_TOP: - elem_filter = lambda it: "\x1b" not in it - new_line_delimiter = "\x1b(B\x1b[m\x1b[39;49m" - header_line = -3 - res_column = 5 # Resident Memory Size (KiB): The non-swapped physical memory a task is using. - - res = subprocess.run( - f"top -n 1 -p {os.getpid()}".split(" "), - capture_output=True, - text=True, - ) - stdout, _ = res.stdout, res.stderr - lines = stdout.split(new_line_delimiter) - if len(lines) < abs(header_line): - continue - assert tuple(filter(elem_filter, lines[header_line].split()))[res_column] == "RES" - line_elems = tuple(filter(elem_filter, lines[header_line + 1].split())) - res_data = line_elems[res_column] - if res_data.endswith("m") or res_data.endswith("g"): - float_value = float(res_data[:-1].replace(",", ".")) - bytes_used = float_value * 2 ** (30 if "g" in res_data else 20) - else: - bytes_used = float(res_data) * 2**10 - elif self.memory_type in [MemoryType.SYSTEM, MemoryType.SYSTEM_NORMALIZED]: - bytes_used = psutil.virtual_memory().total - psutil.virtual_memory().available - else: - raise Exception("Unknown memory type to log") - self._memory_data_queue.put((time.perf_counter(), bytes_used)) - time.sleep(self.interval) - - -def _cast_bytes_to(bytes, memory_unit, round_to_int=False): - memory_unit_divisors = { - MemoryUnit.B: 1, - MemoryUnit.KiB: 2**10, - MemoryUnit.MiB: 
2**20, - MemoryUnit.GiB: 2**30, - MemoryUnit.KB: 10**3, - MemoryUnit.MB: 10**6, - MemoryUnit.GB: 10**9, - } - result = bytes / memory_unit_divisors[memory_unit] - return int(result) if round_to_int else result - - -def _subtract_first_element(data): - for i in range(1, len(data)): - data[i] = data[i] - data[0] - data[0] = 0 - return data - - -if __name__ == "__main__": - memory_loggers = [] - for memory_type in [MemoryType.RSS, MemoryType.SYSTEM_NORMALIZED]: - memory_loggers.append(MemoryLogger(Path("./logs"), memory_type=memory_type).start_logging()) - import numpy as np - from tqdm import tqdm - - a = [] - for i in tqdm(range(10)): - a.append(np.random.random((1 << 20,))) - time.sleep(2) - del a - time.sleep(2) - # Optional: - # map(lambda ml: ml.stop_logging(), memory_loggers) diff --git a/tools/memory_monitor.py b/tools/memory_monitor.py new file mode 100644 index 00000000000..c726eba147b --- /dev/null +++ b/tools/memory_monitor.py @@ -0,0 +1,325 @@ +# Copyright (c) 2024 Intel Corporation +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import atexit +import os +import queue +import subprocess +import threading +import time +from enum import Enum +from pathlib import Path +from typing import Callable, List, Optional, Tuple + +import matplotlib.pyplot as plt +import psutil + + +class MemoryType(Enum): + RSS = "rss" + RSS_TOP = "rss_top" + SYSTEM = "system" + + +class MemoryUnit(Enum): + B = "B" # byte + KiB = "KiB" # Kibibyte + MiB = "MiB" # Mebibyte + GiB = "GiB" # Gibibyte + KB = "KB" # Kilobyte + MB = "MB" # Megabyte + GB = "GB" # Gigabyte + + +class MemoryMonitor: + MemoryType = MemoryType + MemoryUnit = MemoryUnit + + def __init__( + self, + interval: float = 0.1, + memory_type: MemoryType = MemoryType.RSS, + memory_unit: MemoryUnit = MemoryUnit.MiB, + ): + """ + Memory monitoring utility to measure python process memory footprint. After start() is called, it + creates a thread which runs in parallel and takes memory measurements every *interval* seconds using the + specified *memory_type* approach. When stop() is called, the memory measuring thread is stopped. The results + can be obtained by calling get_data(). Memory logs can be saved by calling save_memory_logs(). There are two + log files: one with data values in a .txt format and another one in a form of a 2D time-memory plot. + + :param interval: How frequently to take memory measurements (in seconds). + :param memory_type: Type of memory to log. Accepts three possible values: + - MemoryType.RSS: Resident Set Size is the portion of memory occupied by a process that is held in RAM. + Values are obtained through psutil library. If some data is read using mmap, RSS will report this data + as allocated, however this is not necessarily the case. + - MemoryType.RSS_TOP: The same type of memory as above, but the values are parsed from a table output of + the *top* command. Usually, reports the same values as RSS, but with worse resolution.
+ Note: There can be issues when measuring with RSS_TOP a python script which is run from IDE, in this case + it should be run from terminal. + - MemoryType.SYSTEM: This metric is defined as the difference between total system virtual memory + and system available memory. Be aware, that this way it is affected by other processes that can change + RAM availability. It is advised to call get_data(memory_from_zero=True) for this type of memory logging, + if one is interested in memory footprint for a certain process. This subtracts the starting memory from + all values. + + RSS and SYSTEM behave differently when mmap is used, e.g. during OV model loading. RSS will report data + which was read with mmap enabled as allocated, however this is not necessarily the case. SYSTEM does not + report memory loaded with mmap. So it can be used to analyze "pure" memory usage without contribution of + mmap pages which are actually free, but are reported as allocated by RSS. + :param memory_unit: Unit to report memory in. + """ + self.interval = interval + self.memory_type = memory_type + self.memory_unit = memory_unit + + self._monitoring_thread_should_stop = False + self._monitoring_in_progress = False + + self._memory_monitor_thread = None + self._memory_values_queue = None + self._stop_logging_atexit_fn = None + + def start(self, at_exit_fn: Optional[Callable] = None) -> "MemoryMonitor": + """ + Start memory monitoring. + + :param at_exit_fn: A callable to execute at program exit. Useful for providing logs saving routine, e.g.
+ ``` + at_exit_fn = lambda: memory_monitor.save_memory_logs(*memory_monitor.get_data(), save_dir) + memory_monitor.start(at_exit_fn=at_exit_fn) + ``` + """ + if self._monitoring_in_progress: + raise Exception("Monitoring already in progress") + + self._memory_values_queue = queue.Queue() + self._monitoring_thread_should_stop = False + + self._memory_monitor_thread = threading.Thread(target=self._monitor_memory) + self._memory_monitor_thread.daemon = True + self._memory_monitor_thread.start() + if at_exit_fn: + self._stop_logging_atexit_fn = at_exit_fn + atexit.register(self._stop_logging_atexit_fn) + + self._monitoring_in_progress = True + + return self + + def stop(self): + """ + Stop memory monitoring. + """ + if not self._monitoring_in_progress: + return + self._monitoring_thread_should_stop = True + self._monitoring_in_progress = False + self._memory_monitor_thread.join() + if self._stop_logging_atexit_fn is not None: + atexit.unregister(self._stop_logging_atexit_fn) + self._stop_logging_atexit_fn = None + + def get_data(self, memory_from_zero: Optional[bool] = False) -> Tuple[List, List]: + """ + :param memory_from_zero: Whether to normalize memory measurements by subtracting the first value. This way + the measurements will start with 0. Hence, is not very reliable and may actually result in negative values. + :returns: A tuple of lists where the first element corresponds to measurements timestamps and the second one -- + to memory values.
+ """ + memory_usage_data = list(self._memory_values_queue.queue) + time_values, memory_values = tuple(zip(*memory_usage_data)) + time_values = _subtract_first_element(list(time_values)) + if memory_from_zero: + memory_values = _subtract_first_element(list(memory_values)) + + # Convert to target memory unit + memory_values = [_cast_bytes_to(value, memory_unit=self.memory_unit) for value in memory_values] + + return time_values, memory_values + + def save_memory_logs( + self, + time_values: List[float], + memory_values: List[float], + save_dir: Path, + plot_title: Optional[str] = "", + filename_suffix: Optional[str] = "", + ): + """ + Save memory logs as a text file and a 2D plot. + + :param time_values: Timestamps of the memory measurements. + :param memory_values: Memory measurements. + :param save_dir: Directory to save logs into. + :param plot_title: A title for a plot. + :param filename_suffix: A string suffix to give to the saved files. + """ + if not save_dir.exists(): + save_dir.mkdir(parents=True) + + filename_label = f"{self.memory_type.value}_memory_usage{filename_suffix}" + # Save measurements to text file + log_filepath = save_dir / f"{filename_label}.txt" + with open(log_filepath, "w") as log_file: + for timestamp, memory_usage in zip(time_values, memory_values): + log_file.write(f"{timestamp} {memory_usage:.3f}\n") + + log_file.writelines( + [ + f"Total time: {time_values[-1] - time_values[0]}\n", + f"Max memory: {max(memory_values):.3f} ({self.memory_unit.value})", + ] + ) + + # Save measurements plot + self.save_memory_plot(log_filepath, plot_title) + + def save_memory_plot(self, log_filepath: Path, plot_title: Optional[str] = "", filename_suffix: Optional[str] = ""): + """ + Parse pre-saved txt file logs and plot a new figure based on this data. May be useful for re-plotting with + different title. + + :param log_filepath: A path to a .txt log file. + :param plot_title: A title to give to a plot.
+ :param filename_suffix: A string suffix to give to the saved figure. + """ + with open(log_filepath, "r") as f: + lines = f.readlines() + time_values, memory_values = [], [] + for line in lines[:-2]: + time_value, memory_value = tuple(map(float, line.split(" "))) + time_values.append(time_value) + memory_values.append(memory_value) + + fig = plt.figure(figsize=(10, 6)) + plt.plot(time_values, memory_values) + plt.xlabel("Time (seconds)") + plt.ylabel(f"Memory Usage ({self.memory_type.value}, {self.memory_unit.value})") + plt.title(plot_title) + plt.grid(True) + plt.tight_layout() + plt.savefig(str(log_filepath).replace(".txt", f"{filename_suffix}.png")) + plt.close(fig) + + def _monitor_memory(self): + while not self._monitoring_thread_should_stop: + if self.memory_type == MemoryType.RSS: + bytes_used = psutil.Process().memory_info().rss + # print(_cast_bytes_to(bytes_used, MemoryUnit.MiB)) + elif self.memory_type == MemoryType.RSS_TOP: + elem_filter = lambda it: "\x1b" not in it + new_line_delimiter = "\x1b(B\x1b[m\x1b[39;49m" + header_line = -3 + res_column = 5 # Resident Memory Size (KiB): The non-swapped physical memory a task is using. 
+ + res = subprocess.run( + f"top -n 1 -p {os.getpid()}".split(" "), + capture_output=True, + text=True, + ) + stdout, _ = res.stdout, res.stderr + lines = stdout.split(new_line_delimiter) + if len(lines) < abs(header_line): + continue + assert tuple(filter(elem_filter, lines[header_line].split()))[res_column] == "RES" + line_elems = tuple(filter(elem_filter, lines[header_line + 1].split())) + res_data = line_elems[res_column] + if res_data.endswith("m") or res_data.endswith("g"): + float_value = float(res_data[:-1].replace(",", ".")) + bytes_used = float_value * 2 ** (30 if "g" in res_data else 20) + else: + bytes_used = float(res_data) * 2**10 + elif self.memory_type == MemoryType.SYSTEM: + bytes_used = psutil.virtual_memory().total - psutil.virtual_memory().available + else: + raise Exception("Unknown memory type to log") + self._memory_values_queue.put((time.perf_counter(), bytes_used)) + time.sleep(self.interval) + + +def monitor_memory_for_callable( + f: Callable, + interval: Optional[float] = 0.1, + memory_type: Optional[MemoryType] = MemoryType.RSS, + memory_unit: Optional[MemoryUnit] = MemoryUnit.MiB, + max_value: Optional[bool] = False, +): + """ + Monitor memory from the start to the end of execution of some callable function. + Works by subtracting the first memory measurement from all the other ones so that the resulting sequence starts + from 0. Hence, is not very reliable and may actually result in negative values. + + :param f: A callable to monitor. + :param interval: Interval in seconds to take measurements. + :param memory_type: Memory type. + :param memory_unit: Memory unit. + :param max_value: If True, only the maximum value will be reported. Otherwise, a tuple containing two lists for + timestamps and memory values is returned. + :returns: If max_value=False, returns a tuple containing two lists for timestamps and memory values. Otherwise, + returns a single float -- the maximum memory recorded.
+ """ + memory_monitor = MemoryMonitor(interval, memory_type, memory_unit) + memory_monitor.start() + f_result = f() + memory_monitor.stop() + + time_values, memory_values = memory_monitor.get_data(memory_from_zero=True) + if max_value: + return f_result, max(memory_values) + return f_result, (time_values, memory_values) + + +def _cast_bytes_to(bytes, memory_unit, round_to_int=False): + memory_unit_divisors = { + MemoryUnit.B: 1, + MemoryUnit.KiB: 2**10, + MemoryUnit.MiB: 2**20, + MemoryUnit.GiB: 2**30, + MemoryUnit.KB: 10**3, + MemoryUnit.MB: 10**6, + MemoryUnit.GB: 10**9, + } + result = bytes / memory_unit_divisors[memory_unit] + return int(result) if round_to_int else result + + +def _subtract_first_element(data): + for i in range(1, len(data)): + data[i] = data[i] - data[0] + data[0] = 0 + return data + + +if __name__ == "__main__": + # Example usage + + from functools import partial + + import numpy as np + from tqdm import tqdm + + def log(mm, fz): + mm.save_memory_logs( + *mm.get_data(memory_from_zero=fz), save_dir=Path("memory_logs"), filename_suffix="_from-zero" if fz else "" + ) + + memory_monitors = [] + for memory_type, mem_from_zero in [(MemoryType.RSS, False), (MemoryType.SYSTEM, False), (MemoryType.SYSTEM, True)]: + memory_monitor = MemoryMonitor(memory_type=memory_type) + memory_monitor.start(at_exit_fn=partial(log, memory_monitor, mem_from_zero)) + + a = [] + for i in tqdm(range(10)): + a.append(np.random.random((1 << 20,))) + time.sleep(1) + del a + time.sleep(1) diff --git a/tools/requirements.txt b/tools/requirements.txt index 8f57ab34ca9..e31590a00dd 100644 --- a/tools/requirements.txt +++ b/tools/requirements.txt @@ -1 +1,2 @@ mdutils +matplotlib<3.6 \ No newline at end of file