Skip to content

Commit

Permalink
Memory Monitor tool (#2801)
Browse files Browse the repository at this point in the history
- Added a tool for memory monitoring. It is helpful for measuring the memory
footprint of Python scripts, for example during NNCF weight
compression. It also provides different memory-measuring mechanisms to
properly track the effect of mmap, e.g. during OV model loading.
- Added tests for the tool.
- Added an option to run the PTQ and WC conformance tests using the
introduced tool.
  • Loading branch information
nikita-savelyevv authored Aug 13, 2024
1 parent d5b9a0b commit 9a0b5d2
Show file tree
Hide file tree
Showing 10 changed files with 738 additions and 5 deletions.
15 changes: 15 additions & 0 deletions .github/workflows/precommit.yml
Original file line number Diff line number Diff line change
Expand Up @@ -229,3 +229,18 @@ jobs:
token: ${{ secrets.CODECOV_TOKEN }}
name: coverage_tensorflow
flags: TENSORFLOW
# CI job that runs the pytest suite located in tests/tools.
tools:
  runs-on: ubuntu-20.04
  steps:
    - uses: actions/checkout@a5ac7e51b41094c92402da3b24376905380afc29 # v4.1.6
      with:
        lfs: true
    - uses: actions/setup-python@0a5c61591373683505ea898e09a3ea4f39ef2b9c # v5.0.0
      with:
        python-version: 3.8.18
        cache: pip
    # NOTE(review): despite the step name, the command below only installs
    # the packages listed in tests/tools/requirements.txt.
    - name: Install NNCF and test requirements
      run: |
        pip install -r tests/tools/requirements.txt
    # PYTHONPATH=./ adds the current directory (presumably the repository
    # root) to the import path for the test run.
    - name: Run tools precommit test scope
      run: PYTHONPATH=./ pytest -ra tests/tools
6 changes: 6 additions & 0 deletions tests/post_training/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,9 @@ def pytest_addoption(parser):
action="store_true",
help="Add additional columns to reports.csv",
)
parser.addoption(
"--memory-monitor",
action="store_true",
help="Report memory using MemoryMonitor from tools/memory_monitor.py. "
"Warning: currently, reported memory values are not always reproducible.",
)
34 changes: 31 additions & 3 deletions tests/post_training/pipelines/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime as dt
import gc
import os
import re
import time
Expand All @@ -30,6 +31,9 @@
import nncf
from nncf import TargetDevice
from tests.shared.command import Command
from tools.memory_monitor import MemoryType
from tools.memory_monitor import MemoryUnit
from tools.memory_monitor import memory_monitor_context

DEFAULT_VAL_THREADS = 4

Expand Down Expand Up @@ -139,6 +143,8 @@ class RunInfo:
metric_value: Optional[float] = None
metric_diff: Optional[float] = None
compression_memory_usage: Optional[int] = None
compression_memory_usage_rss: Optional[int] = None
compression_memory_usage_system: Optional[int] = None
status: Optional[str] = None
fps: Optional[float] = None
time_total: Optional[float] = None
Expand All @@ -159,7 +165,7 @@ def format_memory_usage(memory):
return int(memory)

def get_result_dict(self):
return {
result = {
"Model": self.model,
"Backend": self.backend.value if self.backend else None,
"Metric name": self.metric_name,
Expand All @@ -168,14 +174,22 @@ def get_result_dict(self):
"Num FQ": self.num_compress_nodes.num_fq_nodes,
"Num int4": self.num_compress_nodes.num_int4,
"Num int8": self.num_compress_nodes.num_int8,
"RAM MiB": self.format_memory_usage(self.compression_memory_usage),
"Compr. time": self.format_time(self.time_compression),
**self.stats_from_output.get_stats(),
"Total time": self.format_time(self.time_total),
"FPS": self.fps,
"Status": self.status[:LIMIT_LENGTH_OF_STATUS] if self.status is not None else None,
}

if self.compression_memory_usage_rss is None and self.compression_memory_usage_system is None:
result["RAM MiB"] = self.format_memory_usage(self.compression_memory_usage)
if self.compression_memory_usage_rss is not None:
result["RAM MiB RSS"] = self.format_memory_usage(self.compression_memory_usage_rss)
if self.compression_memory_usage_system is not None:
result["RAM MiB System"] = self.format_memory_usage(self.compression_memory_usage_system)

return result


class BaseTestPipeline(ABC):
"""
Expand All @@ -195,6 +209,7 @@ def __init__(
run_benchmark_app: bool,
params: dict = None,
batch_size: int = 1,
memory_monitor: bool = False,
) -> None:
self.reported_name = reported_name
self.model_id = model_id
Expand All @@ -205,6 +220,7 @@ def __init__(
self.reference_data = reference_data
self.params = params or {}
self.batch_size = batch_size
self.memory_monitor = memory_monitor
self.no_eval = no_eval
self.run_benchmark_app = run_benchmark_app
self.output_model_dir: Path = self.output_dir / self.reported_name / self.backend.value
Expand Down Expand Up @@ -352,7 +368,19 @@ def compress(self) -> None:
torch.set_num_threads(int(inference_num_threads))

start_time = time.perf_counter()
self.run_info.compression_memory_usage = memory_usage(self._compress, max_usage=True)
if self.memory_monitor:
gc.collect()
with memory_monitor_context(
interval=0.1,
memory_unit=MemoryUnit.MiB,
return_max_value=True,
save_dir=self.output_model_dir / "ptq_memory_logs",
) as mmc:
self._compress()
self.run_info.compression_memory_usage_rss = mmc.memory_data[MemoryType.RSS]
self.run_info.compression_memory_usage_system = mmc.memory_data[MemoryType.SYSTEM]
else:
self.run_info.compression_memory_usage = memory_usage(self._compress, max_usage=True)
self.run_info.time_compression = time.perf_counter() - start_time

def save_compressed_model(self) -> None:
Expand Down
19 changes: 17 additions & 2 deletions tests/post_training/pipelines/lm_weight_compression.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import gc
import os
import re
import shutil
Expand All @@ -32,6 +32,9 @@
from tests.post_training.pipelines.base import BaseTestPipeline
from tests.post_training.pipelines.base import StatsFromOutput
from tests.shared.paths import TEST_ROOT
from tools.memory_monitor import MemoryType
from tools.memory_monitor import MemoryUnit
from tools.memory_monitor import memory_monitor_context


@dataclass
Expand Down Expand Up @@ -178,7 +181,19 @@ def compress(self) -> None:

print("Weight compression...")
start_time = time.perf_counter()
self.run_info.compression_memory_usage = memory_usage(self._compress, max_usage=True)
if self.memory_monitor:
gc.collect()
with memory_monitor_context(
interval=0.1,
memory_unit=MemoryUnit.MiB,
return_max_value=True,
save_dir=self.output_model_dir / "wc_memory_logs",
) as mmc:
self._compress()
self.run_info.compression_memory_usage_rss = mmc.memory_data[MemoryType.RSS]
self.run_info.compression_memory_usage_system = mmc.memory_data[MemoryType.SYSTEM]
else:
self.run_info.compression_memory_usage = memory_usage(self._compress, max_usage=True)
self.run_info.time_compression = time.perf_counter() - start_time

def collect_data_from_stdout(self, stdout: str):
Expand Down
9 changes: 9 additions & 0 deletions tests/post_training/test_quantize_conformance.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ def fixture_extra_columns(pytestconfig):
return pytestconfig.getoption("extra_columns")


@pytest.fixture(scope="session", name="memory_monitor")
def fixture_memory_monitor(pytestconfig):
    """
    Session-scoped fixture exposing the value of the ``memory_monitor`` pytest option.

    :param pytestconfig: The built-in pytest configuration fixture.
    :return: Whatever ``pytestconfig.getoption("memory_monitor")`` yields.
    """
    memory_monitor_option = pytestconfig.getoption("memory_monitor")
    return memory_monitor_option


def _parse_version(s: Path):
    """
    Extract the ``<major>.<minor>`` version embedded in a YAML file name.

    The file name is expected to contain ``_<digits>.<digits>.yaml`` (or ``.yml``),
    e.g. ``reference_data_2024.3.yaml`` gives version ``2024.3``.

    :param s: Path to the file; only its ``name`` component is inspected.
    :return: The result of ``version.parse`` applied to the extracted version string.
    :raises AttributeError: If the file name does not match the expected pattern
        (``re.search`` returns ``None`` in that case).
    """
    # The dot in front of the extension is escaped so that it matches only a literal ".".
    # Unescaped, it would accept any character there (e.g. "foo_1.23yaml" parsed as "1.2").
    version_str = re.search(r".*_(\d+\.\d+)\.(?:yaml|yml)", s.name).group(1)
    return version.parse(version_str)
Expand Down Expand Up @@ -242,6 +247,7 @@ def test_ptq_quantization(
run_benchmark_app: bool,
capsys: pytest.CaptureFixture,
extra_columns: bool,
memory_monitor: bool,
):
pipeline = None
err_msg = None
Expand All @@ -267,6 +273,7 @@ def test_ptq_quantization(
"no_eval": no_eval,
"run_benchmark_app": run_benchmark_app,
"batch_size": batch_size,
"memory_monitor": memory_monitor,
}
)
pipeline: BaseTestPipeline = pipeline_cls(**pipeline_kwargs)
Expand Down Expand Up @@ -311,6 +318,7 @@ def test_weight_compression(
run_benchmark_app: bool,
capsys: pytest.CaptureFixture,
extra_columns: bool,
memory_monitor: bool,
):
pipeline = None
err_msg = None
Expand All @@ -330,6 +338,7 @@ def test_weight_compression(
"no_eval": no_eval,
"run_benchmark_app": run_benchmark_app,
"batch_size": batch_size,
"memory_monitor": memory_monitor,
}
)
pipeline: BaseTestPipeline = pipeline_cls(**pipeline_kwargs)
Expand Down
3 changes: 3 additions & 0 deletions tests/tools/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
matplotlib
psutil
pytest
185 changes: 185 additions & 0 deletions tests/tools/test_memory_monitor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
# Copyright (c) 2024 Intel Corporation
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import gc
import os
import queue
import sys
import time
from pathlib import Path

import numpy as np
import pytest

from tests.shared.isolation_runner import ISOLATION_RUN_ENV_VAR
from tests.shared.isolation_runner import run_pytest_case_function_in_separate_process
from tools.memory_monitor import MemoryMonitor
from tools.memory_monitor import MemoryType
from tools.memory_monitor import MemoryUnit
from tools.memory_monitor import memory_monitor_context

# Sizes (in bytes) of the buffers allocated by the tests.
BYTES_TO_ALLOCATE_SMALL = 1 << 20  # 1 MiB
BYTES_TO_ALLOCATE_LARGE = 100 * (1 << 20)  # 100 MiB

# Durations (in seconds, passed to time.sleep) of the phases of allocate().
PREALLOCATE_DURATION = 0.5
ALLOCATE_DURATION = 0.5
DEALLOCATE_DURATION = 0.5

# Names of the environment variables used to pass data to the isolated test runs.
BASELINE_MEMORY_VAR = "TEST_BASELINE_MEMORY"
TEMP_DIR_VAR = "TEST_TEMP_DIR"


# Skip every test in this module at import time when running on Windows.
if "win32" in sys.platform:
    pytest.skip("Windows is not supported", allow_module_level=True)


def allocate(n_bytes, sleep_before_deallocation=False, sleep_after_deallocation=False):
    """
    Allocate a buffer, hold it for ALLOCATE_DURATION seconds, then release it.

    :param n_bytes: Number of uint8 elements (i.e. bytes) in the allocated array.
    :param sleep_before_deallocation: If True, sleep for PREALLOCATE_DURATION seconds
        first. Despite the name, this pause happens before the buffer is allocated.
    :param sleep_after_deallocation: If True, sleep for DEALLOCATE_DURATION seconds
        after the buffer has been released.
    """
    if sleep_before_deallocation:
        time.sleep(PREALLOCATE_DURATION)

    buffer = np.ones(n_bytes, dtype=np.uint8)
    time.sleep(ALLOCATE_DURATION)

    # Drop the only reference and run the garbage collector before the optional final pause
    del buffer
    gc.collect()

    if sleep_after_deallocation:
        time.sleep(DEALLOCATE_DURATION)


def test_memory_monitor_api(tmpdir):
    """
    Exercise the start / stop / get_data / save_memory_logs / save_memory_plot API.

    Checks that saving logs produces one .txt and one .png file, and that re-plotting
    from the saved .txt file adds one more .png file.
    """
    out_dir = Path(tmpdir)

    monitor = MemoryMonitor().start()
    allocate(BYTES_TO_ALLOCATE_SMALL)
    monitor.stop()
    timestamps, readings = monitor.get_data()

    first_suffix = "_test1"
    monitor.save_memory_logs(timestamps, readings, out_dir, plot_title="Test", filename_suffix=first_suffix)
    written = tuple(out_dir.glob("*"))
    assert len(written) == 2
    assert any(str(path).endswith(f"{first_suffix}.txt") for path in written)
    assert any(str(path).endswith(f"{first_suffix}.png") for path in written)

    second_suffix = "_test2"
    # Re-create the plot from the text log saved above
    txt_filepath = next(path for path in written if str(path).endswith(".txt"))
    monitor.save_memory_plot(txt_filepath, plot_title="Test re-plot", filename_suffix=second_suffix)
    written = list(out_dir.glob("*"))
    assert len(written) == 3
    assert any(str(path).endswith(f"{second_suffix}.png") for path in written)


@pytest.mark.parametrize("memory_type", (MemoryType.RSS, MemoryType.SYSTEM))
def test_memory_type(memory_type):
    """Smoke test: a monitoring session runs to completion for each memory type."""
    monitor = MemoryMonitor(memory_type=memory_type)
    monitor = monitor.start()
    allocate(BYTES_TO_ALLOCATE_SMALL)
    monitor.stop()
    # The returned data is not inspected; the call only has to succeed
    monitor.get_data()


@pytest.mark.parametrize("memory_unit", MemoryUnit.__members__.values())
def test_memory_unit(memory_unit):
    """Smoke test: a monitoring session runs to completion for each memory unit."""
    monitor = MemoryMonitor(memory_unit=memory_unit)
    monitor = monitor.start()
    allocate(BYTES_TO_ALLOCATE_SMALL)
    monitor.stop()
    # The returned data is not inspected; the call only has to succeed
    monitor.get_data()


@pytest.mark.parametrize("interval", (5e-2, 1e-1))
def test_interval(interval):
    """
    Check that the number of recorded samples corresponds to the sampling interval.

    The monitored workload lasts for the sum of the three phase durations, so the
    number of samples is expected to be that total divided by ``interval``,
    within a 10% relative tolerance.
    """
    monitor = MemoryMonitor(interval=interval).start()
    allocate(BYTES_TO_ALLOCATE_SMALL, sleep_before_deallocation=True, sleep_after_deallocation=True)
    monitor.stop()
    timestamps, readings = monitor.get_data()

    assert len(timestamps) == len(readings)

    total_duration = PREALLOCATE_DURATION + ALLOCATE_DURATION + DEALLOCATE_DURATION
    expected_n_samples = total_duration / interval
    assert len(timestamps) == pytest.approx(expected_n_samples, rel=1e-1)


@pytest.mark.parametrize("return_max_value", (True, False))
def test_memory_monitor_context(tmpdir, return_max_value):
    """
    Check the data returned and the files saved by ``memory_monitor_context``.

    With ``return_max_value=True`` every reported value must be a float; otherwise
    every value must be a 2-tuple of equally long sequences. In both cases four
    .txt and four .png files are expected in the save directory.
    """
    out_dir = Path(tmpdir)
    with memory_monitor_context(return_max_value=return_max_value, save_dir=out_dir) as mmc:
        allocate(BYTES_TO_ALLOCATE_SMALL)
    memory_data = mmc.memory_data

    assert isinstance(memory_data, dict)
    assert MemoryType.RSS in memory_data
    assert MemoryType.SYSTEM in memory_data

    reported = memory_data.values()
    if return_max_value:
        assert all(isinstance(value, float) for value in reported)
    else:
        assert all(
            isinstance(value, tuple) and len(value) == 2 and len(value[0]) == len(value[1]) for value in reported
        )

    written = tuple(out_dir.glob("*"))
    assert len(written) == 8
    assert sum(1 for path in written if str(path).endswith(".txt")) == 4
    assert sum(1 for path in written if str(path).endswith(".png")) == 4


def test_empty_logs(tmpdir):
    """Check that empty measurement data can be retrieved and saved without errors."""
    monitor = MemoryMonitor().start()
    monitor.stop()
    # Swap in a fresh queue to make sure no logs are recorded
    monitor._memory_values_queue = queue.Queue()

    timestamps, readings = monitor.get_data()
    assert len(timestamps) == 0
    assert len(readings) == 0

    monitor.save_memory_logs(timestamps, readings, Path(tmpdir))


@pytest.mark.skipif(ISOLATION_RUN_ENV_VAR not in os.environ, reason="Should be run via isolation proxy")
def test_memory_values_isolated():
    """
    Measure RSS in bytes while allocating memory; meant to be run in a separate process.

    Works in two modes, selected by the BASELINE_MEMORY_VAR environment variable:
    - not set: allocate a single byte and print the maximum measured memory;
    - set: allocate BYTES_TO_ALLOCATE_LARGE bytes and check that, after subtracting
      the baseline, the peak matches the allocated size and the first and last
      measurements are close to zero (1% tolerance).
    """
    baseline_str = os.environ.get(BASELINE_MEMORY_VAR)
    baseline_memory = None if baseline_str is None else float(baseline_str)

    monitor = MemoryMonitor(memory_type=MemoryType.RSS, memory_unit=MemoryUnit.B).start()
    if baseline_memory is None:
        bytes_to_allocate = 1
    else:
        bytes_to_allocate = BYTES_TO_ALLOCATE_LARGE
    allocate(bytes_to_allocate, sleep_before_deallocation=True, sleep_after_deallocation=True)
    monitor.stop()
    _, readings = monitor.get_data()

    if baseline_memory is None:
        # Baseline mode: only report the peak through stdout
        print("\nMax memory:", max(readings))
        return

    deltas = [reading - baseline_memory for reading in readings]
    rel = 1e-2
    tolerance = BYTES_TO_ALLOCATE_LARGE * rel
    assert max(deltas) == pytest.approx(BYTES_TO_ALLOCATE_LARGE, rel=rel)
    assert abs(deltas[0]) < tolerance
    assert abs(deltas[-1]) < tolerance


def test_memory_values():
    """
    Check that the memory monitor reports the expected amount of allocated memory.

    Runs ``test_memory_values_isolated`` twice in a separate process: first to obtain
    the baseline memory, then, with the baseline passed through the
    BASELINE_MEMORY_VAR environment variable, to verify the measured values.
    """
    # The first run of the test collects the memory that is allocated by default
    _, stdout, _ = run_pytest_case_function_in_separate_process(test_memory_values_isolated)
    max_mem_line = next((line for line in stdout.split("\n") if "Max memory:" in line), None)
    # Fail with a readable message instead of a bare StopIteration
    assert max_mem_line is not None, "The isolated run did not report 'Max memory:' in its stdout"
    baseline_memory = max_mem_line.split(" ")[-1]

    # The second run of the test checks that the added amount of memory is correctly represented by the memory monitor
    os.environ[BASELINE_MEMORY_VAR] = baseline_memory
    try:
        run_pytest_case_function_in_separate_process(test_memory_values_isolated)
    finally:
        # Always remove the variable so that an error above cannot leak it into other tests
        os.environ.pop(BASELINE_MEMORY_VAR, None)


@pytest.mark.skipif(ISOLATION_RUN_ENV_VAR not in os.environ, reason="Should be run via isolation proxy")
def test_at_exit_isolated():
    """
    Start a monitor with an ``at_exit_fn`` callback and never stop it explicitly.

    The callback saves the collected logs into the directory named by the
    TEMP_DIR_VAR environment variable. Meant to be run in a separate process.
    """
    memory_monitor = MemoryMonitor()

    def at_exit_fn():
        # The data and the target directory are looked up when the callback is invoked
        return memory_monitor.save_memory_logs(*memory_monitor.get_data(), Path(os.environ[TEMP_DIR_VAR]))

    memory_monitor.start(at_exit_fn=at_exit_fn)
    allocate(BYTES_TO_ALLOCATE_SMALL)


def test_at_exit(tmpdir):
    """
    Check that logs are saved by the ``at_exit_fn`` callback of a never-stopped monitor.

    Runs ``test_at_exit_isolated`` in a separate process, passing the target directory
    through the TEMP_DIR_VAR environment variable, and expects one .txt and one .png
    file to appear in that directory.
    """
    os.environ[TEMP_DIR_VAR] = str(tmpdir)
    try:
        run_pytest_case_function_in_separate_process(test_at_exit_isolated)
    finally:
        # Always remove the variable so that an error above cannot leak it into other tests
        os.environ.pop(TEMP_DIR_VAR, None)

    saved_files = tuple(Path(tmpdir).glob("*"))
    assert len(saved_files) == 2
    assert any(str(fn).endswith(".txt") for fn in saved_files)
    assert any(str(fn).endswith(".png") for fn in saved_files)
Loading

0 comments on commit 9a0b5d2

Please sign in to comment.