diff --git a/.github/workflows/precommit.yml b/.github/workflows/precommit.yml index e72b0e32244..0af09cbeae7 100644 --- a/.github/workflows/precommit.yml +++ b/.github/workflows/precommit.yml @@ -229,3 +229,18 @@ jobs: token: ${{ secrets.CODECOV_TOKEN }} name: coverage_tensorflow flags: TENSORFLOW + tools: + runs-on: ubuntu-20.04 + steps: + - uses: actions/checkout@a5ac7e51b41094c92402da3b24376905380afc29 # v4.1.6 + with: + lfs: true + - uses: actions/setup-python@0a5c61591373683505ea898e09a3ea4f39ef2b9c # v5.0.0 + with: + python-version: 3.8.18 + cache: pip + - name: Install NNCF and test requirements + run: | + pip install -r tests/tools/requirements.txt + - name: Run tools precommit test scope + run: PYTHONPATH=./ pytest -ra tests/tools diff --git a/tests/post_training/conftest.py b/tests/post_training/conftest.py index 0cc92c29866..04947f79e6c 100644 --- a/tests/post_training/conftest.py +++ b/tests/post_training/conftest.py @@ -29,3 +29,9 @@ def pytest_addoption(parser): action="store_true", help="Add additional columns to reports.csv", ) + parser.addoption( + "--memory-monitor", + action="store_true", + help="Report memory using MemoryMonitor from tools/memory_monitor.py. " + "Warning: currently, reported memory values are not always reproducible.", + ) diff --git a/tests/post_training/pipelines/base.py b/tests/post_training/pipelines/base.py index c74569da3ee..519668a491a 100644 --- a/tests/post_training/pipelines/base.py +++ b/tests/post_training/pipelines/base.py @@ -9,6 +9,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import datetime as dt +import gc import os import re import time @@ -30,6 +31,9 @@ import nncf from nncf import TargetDevice from tests.shared.command import Command +from tools.memory_monitor import MemoryType +from tools.memory_monitor import MemoryUnit +from tools.memory_monitor import memory_monitor_context DEFAULT_VAL_THREADS = 4 @@ -139,6 +143,8 @@ class RunInfo: metric_value: Optional[float] = None metric_diff: Optional[float] = None compression_memory_usage: Optional[int] = None + compression_memory_usage_rss: Optional[int] = None + compression_memory_usage_system: Optional[int] = None status: Optional[str] = None fps: Optional[float] = None time_total: Optional[float] = None @@ -159,7 +165,7 @@ def format_memory_usage(memory): return int(memory) def get_result_dict(self): - return { + result = { "Model": self.model, "Backend": self.backend.value if self.backend else None, "Metric name": self.metric_name, @@ -168,7 +174,6 @@ def get_result_dict(self): "Num FQ": self.num_compress_nodes.num_fq_nodes, "Num int4": self.num_compress_nodes.num_int4, "Num int8": self.num_compress_nodes.num_int8, - "RAM MiB": self.format_memory_usage(self.compression_memory_usage), "Compr. time": self.format_time(self.time_compression), **self.stats_from_output.get_stats(), "Total time": self.format_time(self.time_total), @@ -176,6 +181,15 @@ def get_result_dict(self): "Status": self.status[:LIMIT_LENGTH_OF_STATUS] if self.status is not None else None, } + if self.compression_memory_usage_rss is None and self.compression_memory_usage_system is None: + result["RAM MiB"] = self.format_memory_usage(self.compression_memory_usage) + if self.compression_memory_usage_rss is not None: + result["RAM MiB RSS"] = self.format_memory_usage(self.compression_memory_usage_rss) + if self.compression_memory_usage_system is not None: + result["RAM MiB System"] = self.format_memory_usage(self.compression_memory_usage_system) + + return result + class BaseTestPipeline(ABC): """ @@ -195,6 +209,7 @@ def __init__( run_benchmark_app: bool, params: dict = None, batch_size: int = 1, + memory_monitor: bool = False, ) -> None: self.reported_name = reported_name self.model_id = model_id @@ -205,6 +220,7 @@ def __init__( self.reference_data = reference_data self.params = params or {} self.batch_size = batch_size + self.memory_monitor = memory_monitor self.no_eval = no_eval self.run_benchmark_app = run_benchmark_app self.output_model_dir: Path = self.output_dir / self.reported_name / self.backend.value @@ -352,7 +368,19 @@ def compress(self) -> None: torch.set_num_threads(int(inference_num_threads)) start_time = time.perf_counter() - self.run_info.compression_memory_usage = memory_usage(self._compress, max_usage=True) + if self.memory_monitor: + gc.collect() + with memory_monitor_context( + interval=0.1, + memory_unit=MemoryUnit.MiB, + return_max_value=True, + save_dir=self.output_model_dir / "ptq_memory_logs", + ) as mmc: + self._compress() + self.run_info.compression_memory_usage_rss = mmc.memory_data[MemoryType.RSS] + self.run_info.compression_memory_usage_system = mmc.memory_data[MemoryType.SYSTEM] + else: + self.run_info.compression_memory_usage = memory_usage(self._compress, max_usage=True) self.run_info.time_compression = time.perf_counter() - start_time def save_compressed_model(self) -> None: diff --git a/tests/post_training/pipelines/lm_weight_compression.py b/tests/post_training/pipelines/lm_weight_compression.py index 27479fe6a50..757232f4b63 100644 --- a/tests/post_training/pipelines/lm_weight_compression.py +++ b/tests/post_training/pipelines/lm_weight_compression.py @@ -8,7 +8,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import gc import os import re import shutil @@ -32,6 +32,9 @@ from tests.post_training.pipelines.base import BaseTestPipeline from tests.post_training.pipelines.base import StatsFromOutput from tests.shared.paths import TEST_ROOT +from tools.memory_monitor import MemoryType +from tools.memory_monitor import MemoryUnit +from tools.memory_monitor import memory_monitor_context @dataclass @@ -178,7 +181,19 @@ def compress(self) -> None: print("Weight compression...") start_time = time.perf_counter() - self.run_info.compression_memory_usage = memory_usage(self._compress, max_usage=True) + if self.memory_monitor: + gc.collect() + with memory_monitor_context( + interval=0.1, + memory_unit=MemoryUnit.MiB, + return_max_value=True, + save_dir=self.output_model_dir / "wc_memory_logs", + ) as mmc: + self._compress() + self.run_info.compression_memory_usage_rss = mmc.memory_data[MemoryType.RSS] + self.run_info.compression_memory_usage_system = mmc.memory_data[MemoryType.SYSTEM] + else: + self.run_info.compression_memory_usage = memory_usage(self._compress, max_usage=True) self.run_info.time_compression = time.perf_counter() - start_time def collect_data_from_stdout(self, stdout: str): diff --git a/tests/post_training/test_quantize_conformance.py b/tests/post_training/test_quantize_conformance.py index 368664de3da..de3b0bc96d1 100644 --- a/tests/post_training/test_quantize_conformance.py +++ b/tests/post_training/test_quantize_conformance.py @@ -80,6 +80,11 @@ def fixture_extra_columns(pytestconfig): return pytestconfig.getoption("extra_columns") +@pytest.fixture(scope="session", name="memory_monitor") +def fixture_memory_monitor(pytestconfig): + return pytestconfig.getoption("memory_monitor") + + def _parse_version(s: Path): version_str = re.search(r".*_(\d+\.\d+).(?:yaml|yml)", s.name).group(1) return version.parse(version_str) @@ -242,6 +247,7 @@ def test_ptq_quantization( run_benchmark_app: bool, capsys: pytest.CaptureFixture, extra_columns: bool, + memory_monitor: bool, ): pipeline = None err_msg = None @@ -267,6 +273,7 @@ def test_ptq_quantization( "no_eval": no_eval, "run_benchmark_app": run_benchmark_app, "batch_size": batch_size, + "memory_monitor": memory_monitor, } ) pipeline: BaseTestPipeline = pipeline_cls(**pipeline_kwargs) @@ -311,6 +318,7 @@ def test_weight_compression( run_benchmark_app: bool, capsys: pytest.CaptureFixture, extra_columns: bool, + memory_monitor: bool, ): pipeline = None err_msg = None @@ -330,6 +338,7 @@ def test_weight_compression( "no_eval": no_eval, "run_benchmark_app": run_benchmark_app, "batch_size": batch_size, + "memory_monitor": memory_monitor, } ) pipeline: BaseTestPipeline = pipeline_cls(**pipeline_kwargs) diff --git a/tests/tools/requirements.txt b/tests/tools/requirements.txt new file mode 100644 index 00000000000..9960ef502a6 --- /dev/null +++ b/tests/tools/requirements.txt @@ -0,0 +1,3 @@ +matplotlib +psutil +pytest diff --git a/tests/tools/test_memory_monitor.py b/tests/tools/test_memory_monitor.py new file mode 100644 index 00000000000..c8573e2f71a --- /dev/null +++ b/tests/tools/test_memory_monitor.py @@ -0,0 +1,185 @@ +# Copyright (c) 2024 Intel Corporation +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import gc +import os +import queue +import sys +import time +from pathlib import Path + +import numpy as np +import pytest + +from tests.shared.isolation_runner import ISOLATION_RUN_ENV_VAR +from tests.shared.isolation_runner import run_pytest_case_function_in_separate_process +from tools.memory_monitor import MemoryMonitor +from tools.memory_monitor import MemoryType +from tools.memory_monitor import MemoryUnit +from tools.memory_monitor import memory_monitor_context + +BYTES_TO_ALLOCATE_SMALL = 2**20 # 1 MiB +BYTES_TO_ALLOCATE_LARGE = 100 * 2**20 # 100 MiB +PREALLOCATE_DURATION = 0.5 +ALLOCATE_DURATION = 0.5 +DEALLOCATE_DURATION = 0.5 +BASELINE_MEMORY_VAR = "TEST_BASELINE_MEMORY" +TEMP_DIR_VAR = "TEST_TEMP_DIR" + + +if "win32" in sys.platform: + pytest.skip("Windows is not supported", allow_module_level=True) + + +def allocate(n_bytes, sleep_before_deallocation=False, sleep_after_deallocation=False): + if sleep_before_deallocation: + time.sleep(PREALLOCATE_DURATION) + data = np.ones((n_bytes,), dtype=np.uint8) + time.sleep(ALLOCATE_DURATION) + del data + gc.collect() + if sleep_after_deallocation: + time.sleep(DEALLOCATE_DURATION) + + +def test_memory_monitor_api(tmpdir): + tmpdir = Path(tmpdir) + + memory_monitor = MemoryMonitor().start() + allocate(BYTES_TO_ALLOCATE_SMALL) + memory_monitor.stop() + time_values, memory_values = memory_monitor.get_data() + + filename_suffix1 = "_test1" + memory_monitor.save_memory_logs( + time_values, memory_values, tmpdir, plot_title="Test", filename_suffix=filename_suffix1 + ) + saved_files = tuple(tmpdir.glob("*")) + assert len(saved_files) == 2 + assert any(map(lambda fn: str(fn).endswith(f"{filename_suffix1}.txt"), saved_files)) + assert any(map(lambda fn: str(fn).endswith(f"{filename_suffix1}.png"), saved_files)) + + filename_suffix2 = "_test2" + txt_filepath = next(filter(lambda fn: str(fn).endswith(".txt"), saved_files)) + memory_monitor.save_memory_plot(txt_filepath, plot_title="Test re-plot", filename_suffix=filename_suffix2) + saved_files = list(tmpdir.glob("*")) + assert len(saved_files) == 3 + assert any(map(lambda fn: str(fn).endswith(f"{filename_suffix2}.png"), saved_files)) + + +@pytest.mark.parametrize("memory_type", (MemoryType.RSS, MemoryType.SYSTEM)) +def test_memory_type(memory_type): + memory_monitor = MemoryMonitor(memory_type=memory_type).start() + allocate(BYTES_TO_ALLOCATE_SMALL) + memory_monitor.stop() + memory_monitor.get_data() + + +@pytest.mark.parametrize("memory_unit", MemoryUnit.__members__.values()) +def test_memory_unit(memory_unit): + memory_monitor = MemoryMonitor(memory_unit=memory_unit).start() + allocate(BYTES_TO_ALLOCATE_SMALL) + memory_monitor.stop() + memory_monitor.get_data() + + +@pytest.mark.parametrize("interval", (5e-2, 1e-1)) +def test_interval(interval): + memory_monitor = MemoryMonitor(interval=interval).start() + allocate(BYTES_TO_ALLOCATE_SMALL, sleep_before_deallocation=True, sleep_after_deallocation=True) + memory_monitor.stop() + time_values, memory_values = memory_monitor.get_data() + assert len(time_values) == len(memory_values) + assert len(time_values) == pytest.approx( + (PREALLOCATE_DURATION + ALLOCATE_DURATION + DEALLOCATE_DURATION) / interval, rel=1e-1 + ) + + +@pytest.mark.parametrize("return_max_value", (True, False)) +def test_memory_monitor_context(tmpdir, return_max_value): + tmpdir = Path(tmpdir) + with memory_monitor_context(return_max_value=return_max_value, save_dir=tmpdir) as mmc: + allocate(BYTES_TO_ALLOCATE_SMALL) + memory_data = mmc.memory_data + + assert isinstance(memory_data, dict) + assert MemoryType.RSS in memory_data + assert MemoryType.SYSTEM in memory_data + if return_max_value: + assert all(map(lambda v: isinstance(v, float), memory_data.values())) + else: + assert all(map(lambda v: isinstance(v, tuple) and len(v) == 2 and len(v[0]) == len(v[1]), memory_data.values())) + + saved_files = tuple(tmpdir.glob("*")) + assert len(saved_files) == 8 + assert sum(map(lambda fn: int(str(fn).endswith(".txt")), saved_files)) == 4 + assert sum(map(lambda fn: int(str(fn).endswith(".png")), saved_files)) == 4 + + +def test_empty_logs(tmpdir): + memory_monitor = MemoryMonitor().start() + memory_monitor.stop() + memory_monitor._memory_values_queue = queue.Queue() # make sure no logs are recorded + time_values, memory_values = memory_monitor.get_data() + assert len(time_values) == len(memory_values) == 0 + memory_monitor.save_memory_logs(time_values, memory_values, Path(tmpdir)) + + +@pytest.mark.skipif(ISOLATION_RUN_ENV_VAR not in os.environ, reason="Should be run via isolation proxy") +def test_memory_values_isolated(): + baseline_memory = float(os.environ[BASELINE_MEMORY_VAR]) if BASELINE_MEMORY_VAR in os.environ else None + + memory_monitor = MemoryMonitor(memory_type=MemoryType.RSS, memory_unit=MemoryUnit.B).start() + bytes_to_allocate = 1 if baseline_memory is None else BYTES_TO_ALLOCATE_LARGE + allocate(bytes_to_allocate, sleep_before_deallocation=True, sleep_after_deallocation=True) + memory_monitor.stop() + _, memory_values = memory_monitor.get_data() + + if baseline_memory is None: + print("\nMax memory:", max(memory_values)) + else: + memory_values = list(map(lambda it: it - baseline_memory, memory_values)) + rel = 1e-2 + assert max(memory_values) == pytest.approx(BYTES_TO_ALLOCATE_LARGE, rel=rel) + assert abs(memory_values[0]) < BYTES_TO_ALLOCATE_LARGE * rel + assert abs(memory_values[-1]) < BYTES_TO_ALLOCATE_LARGE * rel + + +def test_memory_values(): + # The first run of the test collects the memory that is allocated by default + _, stdout, _ = run_pytest_case_function_in_separate_process(test_memory_values_isolated) + max_mem_line = next(filter(lambda s: "Max memory:" in s, stdout.split("\n"))) + baseline_memory = max_mem_line.split(" ")[-1] + + # The second run of the test checks that the added amount of memory is correctly represented by the memory monitor + os.environ[BASELINE_MEMORY_VAR] = baseline_memory + run_pytest_case_function_in_separate_process(test_memory_values_isolated) + del os.environ[BASELINE_MEMORY_VAR] + + +@pytest.mark.skipif(ISOLATION_RUN_ENV_VAR not in os.environ, reason="Should be run via isolation proxy") +def test_at_exit_isolated(): + memory_monitor = MemoryMonitor() + at_exit_fn = lambda: memory_monitor.save_memory_logs(*memory_monitor.get_data(), Path(os.environ[TEMP_DIR_VAR])) + memory_monitor.start(at_exit_fn=at_exit_fn) + allocate(BYTES_TO_ALLOCATE_SMALL) + + +def test_at_exit(tmpdir): + os.environ[TEMP_DIR_VAR] = str(tmpdir) + run_pytest_case_function_in_separate_process(test_at_exit_isolated) + del os.environ[TEMP_DIR_VAR] + + tmpdir = Path(tmpdir) + saved_files = tuple(tmpdir.glob("*")) + assert len(saved_files) == 2 + assert any(map(lambda fn: str(fn).endswith(".txt"), saved_files)) + assert any(map(lambda fn: str(fn).endswith(".png"), saved_files)) diff --git a/tools/README.md b/tools/README.md new file mode 100644 index 00000000000..440d6f21ebd --- /dev/null +++ b/tools/README.md @@ -0,0 +1,110 @@ +# NNCF Tools + +## Memory Monitor + +Memory Monitor is a tool that can be used to measure python program RAM footprint in time. It supports multiple memory types: + +- `MemoryType.RSS`: Resident Set Size is the portion of memory occupied by a process that is held in RAM. + +- `MemoryType.SYSTEM`: This metric is defined as the difference between total system virtual memory and system available memory. Be aware, that this way it is affected by other processes that can change RAM availability. It is advised to call `get_data(memory_from_zero=True)` for this type of memory logging, if one is interested in memory footprint for a certain process. This subtracts the starting memory from all values. + +RSS and SYSTEM behave differently when mmap is used, e.g. during OV model loading. RSS will report data which was read with mmap enabled as allocated, however this is not necessarily the case. SYSTEM does not report memory loaded with mmap. So it can be used to analyze "pure" memory usage without contribution of mmap pages which most probably will actually be free, but are reported as allocated by RSS. + +It is advised to use `MemoryType.SYSTEM` when analyzing memory of python scripts involving OpenVINO model reading. Also, memory monitor itself allocates some memory itself, especially during figure saving. It is advised to use it for measuring large memory processes. + +### Example 1. Monitor for an executable + +The tool allows to monitor memory for some executable including other python scripts. For example: + +```shell +python memory_monitor.py --log-dir ./allocation_logs python allocate.py +``` + +```shell +python memory_monitor.py optimum-cli export openvino ... +``` + +### Example 2. As a python Module + +```python +import gc +import time +import numpy as np + +from functools import partial +from pathlib import Path +from tqdm import tqdm + +from memory_monitor import MemoryMonitor, MemoryType + +save_dir = Path("memory_logs") + + +def allocate_memory(): + a = [] + for _ in tqdm(range(10)): + a.append(np.random.random((1 << 25,))) + time.sleep(1) + return a + +# Define a helper logging function +def log(mm, fz): + mm.save_memory_logs( + *mm.get_data(memory_from_zero=fz), + save_dir=save_dir, + filename_suffix="_from-zero" if fz else "" + ) + +# Create three memory monitors with different memory types and logging parameters +memory_monitor_configurations = [ + (MemoryType.RSS, False), + (MemoryType.SYSTEM, False), + (MemoryType.SYSTEM, True) +] +for memory_type, mem_from_zero in memory_monitor_configurations: + memory_monitor = MemoryMonitor(memory_type=memory_type) + # Start logging and register a logging function that will save logs at exit + memory_monitor.start(at_exit_fn=partial(log, memory_monitor, mem_from_zero)) + +# Example logic allocating some memory +a = allocate_memory() +del a +gc.collect() +time.sleep(1) +``` + +### Example 3. Memory Monitor Context + +Alternatively, you may use `memory_monitor_context` that envelops logic for creating MemoryMonitors and saving logs. It can also return only the maximal memory value if needed. Memory data will be available at `context.memory_data`. + +```python +import gc +import time +import numpy as np + +from pathlib import Path +from tqdm import tqdm + +from memory_monitor import MemoryType, memory_monitor_context + +save_dir = Path("memory_logs") + + +def allocate_memory(): + a = [] + for _ in tqdm(range(10)): + a.append(np.random.random((1 << 25,))) + time.sleep(1) + return a + + with memory_monitor_context( + return_max_value=True, + save_dir="memory_logs", + ) as mmc: + a = allocate_memory() + del a + gc.collect() + time.sleep(1) + +max_memory_usage: float = mmc.memory_data[MemoryType.SYSTEM] +``` diff --git a/tools/memory_monitor.py b/tools/memory_monitor.py new file mode 100644 index 00000000000..ed703864cea --- /dev/null +++ b/tools/memory_monitor.py @@ -0,0 +1,361 @@ +# Copyright (c) 2024 Intel Corporation +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import argparse +import atexit +import logging +import queue +import subprocess +import threading +import time +from enum import Enum +from functools import lru_cache +from functools import partial +from pathlib import Path +from typing import Callable, List, Optional, Tuple + +import matplotlib.pyplot as plt +import psutil + +logger = logging.getLogger("memory_monitor") + + +class MemoryType(Enum): + RSS = "rss" + SYSTEM = "system" + + +class MemoryUnit(Enum): + B = "B" # byte + KiB = "KiB" # Kibibyte + MiB = "MiB" # Mibibyte + GiB = "GiB" # Gibibyte + KB = "KB" # Kilobyte + MB = "MB" # Megabyte + GB = "GB" # Gigabyte + + +@lru_cache +def system_memory_warning(): + # Log once + logger.warning( + "Please note that MemoryType.SYSTEM in general is affected by other processes that change RAM availability." + ) + + +class MemoryMonitor: + def __init__( + self, + interval: Optional[float] = 0.1, + memory_type: Optional[MemoryType] = MemoryType.RSS, + memory_unit: Optional[MemoryUnit] = MemoryUnit.MiB, + include_child_processes: Optional[bool] = None, + ): + """ + Memory monitoring utility to measure python process memory footprint. After start() is called, it + creates a thread which runs in parallel and takes memory measurements every *interval* seconds using the + specified *memory_type* approach. When stop() is called, the memory measuring thread is stopped. The results + can be obtained by calling get_data(). Memory logs can be saved by calling save_memory_logs(). There are two + log files: one with data values in a .txt format and another one in a form of a 2D time-memory plot. + + Memory monitor itself allocates some memory itself, especially during figure saving. It is advised to use it + for measuring large memory processes. + + :param interval: How frequently to take memory measurements (in seconds). + :param memory_type: Type of memory to log. Accepts four possible values: + - MemoryType.RSS: Resident Set Size is the portion of memory occupied by a process that is held in RAM. + Values are obtained through psutil library. If some data is read using mmap, RSS will report this data + as allocated, however this is not necessarily the case. + - MemoryType.SYSTEM: This metric is defined as the difference between total system virtual memory + and system available memory. Be aware, that this way it is affected by other processes that can change + RAM availability. It is advised to call get_data(memory_from_zero=True) for this type of memory logging, + if one is interested in memory footprint for a certain process. This subtracts the starting memory from + all values. + + RSS and SYSTEM behave differently when mmap is used, e.g. during OV model loading. RSS will report data + which was read with mmap enabled as allocated, however this is not necessarily the case. SYSTEM does not + report memory loaded with mmap. So it can be used to analyze "pure" memory usage without contribution of + mmap pages which are actually free, but are reported as allocated by RSS. + :param memory_unit: Unit to report memory in. + :param include_child_processes: For MemoryType.RSS only: whether to include memory of child processes. If not + provided, child processes are counted. + """ + self.interval = interval + self.memory_type = memory_type + if memory_type == MemoryType.SYSTEM: + system_memory_warning() + elif memory_type == MemoryType.RSS: + if include_child_processes is None: + include_child_processes = True + else: + raise ValueError("Unknown memory type to log") + self.memory_unit = memory_unit + self.include_child_processes = include_child_processes + + self._monitoring_thread_should_stop = False + self._monitoring_in_progress = False + + self._memory_monitor_thread = None + self._memory_values_queue = None + self._stop_logging_atexit_fn = None + + def start(self, at_exit_fn: Optional[Callable] = None) -> "MemoryMonitor": + """ + Start memory monitoring. + + :param at_exit_fn: A callable to execute at program exit. Useful fot providing logs saving routine, e.g. + ``` + at_exit_fn = lambda: memory_monitor.save_memory_logs(*memory_monitor.get_data(), save_dir) + memory_monitor.start(at_exit_fn=at_exit_fn) + ``` + """ + if self._monitoring_in_progress: + raise Exception("Monitoring already in progress") + + self._memory_values_queue = queue.Queue() + self._monitoring_thread_should_stop = False + + self._memory_monitor_thread = threading.Thread(target=self._monitor_memory) + self._memory_monitor_thread.daemon = True + self._memory_monitor_thread.start() + if at_exit_fn: + self._stop_logging_atexit_fn = at_exit_fn + atexit.register(self._stop_logging_atexit_fn) + + self._monitoring_in_progress = True + + return self + + def stop(self): + """ + Stop memory monitoring. + """ + if not self._monitoring_in_progress: + return + self._monitoring_thread_should_stop = True + self._monitoring_in_progress = False + self._memory_monitor_thread.join() + if self._stop_logging_atexit_fn is not None: + atexit.unregister(self._stop_logging_atexit_fn) + self._stop_logging_atexit_fn = None + + def get_data(self, memory_from_zero: Optional[bool] = False) -> Tuple[List, List]: + """ + :param memory_from_zero: Whether to normalize memory measurements by subtracting the first value. This way + the measurements will start with 0. Hence, is not very reliable and may actually result in negative values. + :returns: A tuple of list where the first element corresponds to measurements timestamps and the second one -- + to memory values. + """ + memory_usage_data = list(self._memory_values_queue.queue) + if len(memory_usage_data) == 0: + return [], [] + time_values, memory_values = tuple(zip(*memory_usage_data)) + time_values = _subtract_first_element(list(time_values)) + if memory_from_zero: + memory_values = _subtract_first_element(list(memory_values)) + + # Convert to target memory unit + memory_values = list(map(partial(_cast_bytes_to, memory_unit=self.memory_unit), memory_values)) + + return time_values, memory_values + + def save_memory_logs( + self, + time_values: List[float], + memory_values: List[float], + save_dir: Path, + plot_title: Optional[str] = "", + filename_suffix: Optional[str] = "", + ): + """ + Save memory logs as a text file and a 2D plot. + + :param time_values: Timestamps of the memory measurements. + :param memory_values: Memory measurements. + :param save_dir: Directory to save logs into. + :param plot_title: A title for a plot. + :param filename_suffix: A string suffix to give to the saved files. + """ + if not save_dir.exists(): + save_dir.mkdir(parents=True) + + filename_label = f"{self.memory_type.value}_memory_usage{filename_suffix}" + # Save measurements to text file + log_filepath = save_dir / f"{filename_label}.txt" + with open(log_filepath, "w") as log_file: + if len(time_values) == 0: + log_file.write("No measurements recorded.\nPlease make sure logging duration or interval were enough.") + return + for timestamp, memory_usage in zip(time_values, memory_values): + log_file.write(f"{timestamp} {memory_usage:.3f}\n") + + log_file.writelines( + [ + f"Total time: {time_values[-1] - time_values[0]}\n", + f"Max memory: {max(memory_values):.3f} ({self.memory_unit.value})", + ] + ) + + # Save measurements plot + self.save_memory_plot(log_filepath, plot_title) + + def save_memory_plot(self, log_filepath: Path, plot_title: Optional[str] = "", filename_suffix: Optional[str] = ""): + """ + Parse pre-saved txt file logs and plot a new figure based on this data. May be useful for re-plotting with + different title. + + :param log_filepath: A path to a .txt log file. + :param plot_title: A title to give to a plot. + :param filename_suffix: A string suffix to give to the saved figure. + """ + with open(log_filepath, "r") as f: + lines = f.readlines() + time_values, memory_values = [], [] + for line in lines[:-2]: + time_value, memory_value = tuple(map(float, line.split(" "))) + time_values.append(time_value) + memory_values.append(memory_value) + + fig = plt.figure(figsize=(10, 6)) + plt.plot(time_values, memory_values) + plt.xlabel("Time (seconds)") + plt.ylabel(f"Memory Usage ({self.memory_type.value}, {self.memory_unit.value})") + plt.title(f"{plot_title} Max_{self.memory_type.value}: {max(memory_values):.2f} {self.memory_unit.value}") + plt.grid(True) + plt.tight_layout() + plt.savefig(str(log_filepath).replace(".txt", f"{filename_suffix}.png")) + plt.close(fig) + + def __enter__(self) -> "MemoryMonitor": + return self.start() + + def __exit__(self, exc_type, exc_val, exc_tb): + self.stop() + + def _monitor_memory(self): + while not self._monitoring_thread_should_stop: + _last_measurement_time = time.perf_counter() + if self.memory_type == MemoryType.RSS: + bytes_used = psutil.Process().memory_info().rss + if self.include_child_processes: + for child_process in psutil.Process().children(recursive=True): + bytes_used += psutil.Process(child_process.pid).memory_info().rss + elif self.memory_type == MemoryType.SYSTEM: + bytes_used = psutil.virtual_memory().total - psutil.virtual_memory().available + else: + raise Exception("Unknown memory type to log") + self._memory_values_queue.put((time.perf_counter(), bytes_used)) + time.sleep(max(0.0, self.interval - (time.perf_counter() - _last_measurement_time))) + + +class memory_monitor_context: + def __init__( + self, + interval: Optional[float] = 0.1, + memory_unit: Optional[MemoryUnit] = MemoryUnit.MiB, + return_max_value: Optional[bool] = True, + save_dir: Optional[Path] = None, + ): + """ + A memory monitor context manager which monitors both RSS and SYSTEM memory types. After, it stores the + result for the maximum memory recorded if `return_max_value=True or the whole time-memory sequences. Works + by subtracting the first memory measurement from all the other ones so that the resulting sequence starts + from 0. Hence, it can actually return negative memory values. + + After exiting, the result is stored at .memory_data field -- a dict with memory types (RSS or SYSTEM) + as keys. The values are either a single float number if return_max_value is provided, or a tuple with time + and memory value lists. + + Additionally, memory logs may be saved by providing save_dir argument. + + :param interval: Interval in seconds to take measurements. + :param memory_unit: Memory unit. + :param return_max_value: Whether to return max value for each memory type or full memory sequences. + :param save_dir: If provided, will save memory logs at this location. + """ + + self.memory_monitors = {} + for memory_type in [MemoryType.RSS, MemoryType.SYSTEM]: + self.memory_monitors[memory_type] = MemoryMonitor( + interval=interval, memory_type=memory_type, memory_unit=memory_unit + ) + self.return_max_value = return_max_value + self.save_dir = save_dir + + self.memory_data = {} + + def __enter__(self): + for mm in self.memory_monitors.values(): + mm.start() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + for mt, mm in self.memory_monitors.items(): + mm.stop() + for fz in [False, True]: + time_values, memory_values = mm.get_data(memory_from_zero=fz) + if fz: + self.memory_data[mt] = max(memory_values) if self.return_max_value else (time_values, memory_values) + + if self.save_dir: + mm.save_memory_logs( + time_values, + memory_values, + save_dir=self.save_dir, + filename_suffix="_from-zero" if fz else "", + ) + + +def _cast_bytes_to(bytes, memory_unit, round_to_int=False): + memory_unit_divisors = { + MemoryUnit.B: 1, + MemoryUnit.KiB: 2**10, + MemoryUnit.MiB: 2**20, + MemoryUnit.GiB: 2**30, + MemoryUnit.KB: 10**3, + MemoryUnit.MB: 10**6, + MemoryUnit.GB: 10**9, + } + result = bytes / memory_unit_divisors[memory_unit] + return int(result) if round_to_int else result + + +def _subtract_first_element(data): + for i in range(1, len(data)): + data[i] = data[i] - data[0] + data[0] = 0 + return data + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Memory Monitor Tool. Monitors memory for an executable and saves logs at specified location.", + epilog="Examples:\n" + " python memory_monitor.py --log-dir ./allocation_logs python allocate.py\n" + " python memory_monitor.py optimum-cli export openvino ...", + formatter_class=argparse.RawTextHelpFormatter, + ) + parser.add_argument( + "--log-dir", type=str, default="memory_logs", help="A directory to save logs at. './memory_logs' by default." + ) + parser.add_argument("executable", type=str, nargs="+", help="Target executable to monitor memory for.") + args = parser.parse_args() + + def log(mm, fz): + mm.save_memory_logs( + *mm.get_data(memory_from_zero=fz), save_dir=Path(args.log_dir), filename_suffix="_from-zero" if fz else "" + ) + + for memory_type, mem_from_zero in [(MemoryType.RSS, False), (MemoryType.SYSTEM, False), (MemoryType.SYSTEM, True)]: + memory_monitor = MemoryMonitor(memory_type=memory_type, include_child_processes=True) + memory_monitor.start(at_exit_fn=partial(log, memory_monitor, mem_from_zero)) + + with subprocess.Popen(" ".join(args.executable), shell=True) as p: + p.wait() diff --git a/tools/requirements.txt b/tools/requirements.txt index 8f57ab34ca9..77d22021464 100644 --- a/tools/requirements.txt +++ b/tools/requirements.txt @@ -1 +1,2 @@ mdutils +matplotlib<3.6