Skip to content

Commit

Permalink
Changed API for memory logger. Added tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
nikita-savelyevv committed Jul 16, 2024
1 parent 76af9ff commit 494a589
Show file tree
Hide file tree
Showing 3 changed files with 483 additions and 228 deletions.
169 changes: 169 additions & 0 deletions tests/tools/test_memory_monitor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
# Copyright (c) 2024 Intel Corporation
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import sys
import time
import gc
from pathlib import Path

import numpy as np
import pytest
import os

from tools.memory_monitor import MemoryMonitor, MemoryType, MemoryUnit, monitor_memory_for_callable

from tests.shared.isolation_runner import run_pytest_case_function_in_separate_process
from tests.shared.isolation_runner import ISOLATION_RUN_ENV_VAR

BYTES_TO_ALLOCATE_SMALL = 2 ** 20 # 1 MiB
BYTES_TO_ALLOCATE_LARGE = 100 * 2 ** 20 # 100 MiB
PREALLOCATE_DURATION = 0.5
ALLOCATE_DURATION = 0.5
DEALLOCATE_DURATION = 0.5
BASELINE_MEMORY_VAR = "TEST_BASELINE_MEMORY"
TEMP_DIR_VAR = "TEST_TEMP_DIR"


def is_windows() -> bool:
return "win32" in sys.platform


def allocate(n_bytes, sleep_before_deallocation=False, sleep_after_deallocation=False):
if sleep_before_deallocation:
time.sleep(PREALLOCATE_DURATION)
data = np.ones((n_bytes,), dtype=np.uint8)
time.sleep(ALLOCATE_DURATION)
del data
gc.collect()
if sleep_after_deallocation:
time.sleep(DEALLOCATE_DURATION)


@pytest.mark.skipif(is_windows(), reason="Check on linux only")
def test_memory_monitor_api(tmpdir):
tmpdir = Path(tmpdir)

memory_monitor = MemoryMonitor().start()
allocate(BYTES_TO_ALLOCATE_SMALL)
memory_monitor.stop()
time_values, memory_values = memory_monitor.get_data()

filename_suffix1 = "_test1"
memory_monitor.save_memory_logs(time_values, memory_values, tmpdir, plot_title="Test",
filename_suffix=filename_suffix1)
saved_files = tuple(tmpdir.glob("*"))
assert len(saved_files) == 2
assert any(map(lambda fn: str(fn).endswith(f"{filename_suffix1}.txt"), saved_files))
assert any(map(lambda fn: str(fn).endswith(f"{filename_suffix1}.png"), saved_files))

filename_suffix2 = "_test2"
txt_filepath = next(filter(lambda fn: str(fn).endswith(".txt"), saved_files))
memory_monitor.save_memory_plot(txt_filepath, plot_title="Test re-plot", filename_suffix=filename_suffix2)
saved_files = list(tmpdir.glob("*"))
assert len(saved_files) == 3
assert any(map(lambda fn: str(fn).endswith(f"{filename_suffix2}.png"), saved_files))


@pytest.mark.skipif(is_windows(), reason="Check on linux only")
@pytest.mark.parametrize("memory_type", MemoryType.__members__.values())
def test_memory_type(memory_type):
memory_monitor = MemoryMonitor(memory_type=memory_type).start()
allocate(BYTES_TO_ALLOCATE_SMALL)
memory_monitor.stop()
memory_monitor.get_data()


@pytest.mark.skipif(is_windows(), reason="Check on linux only")
@pytest.mark.parametrize("memory_unit", MemoryUnit.__members__.values())
def test_memory_unit(memory_unit):
memory_monitor = MemoryMonitor(memory_unit=memory_unit).start()
allocate(BYTES_TO_ALLOCATE_SMALL)
memory_monitor.stop()
memory_monitor.get_data()


@pytest.mark.skipif(is_windows(), reason="Check on linux only")
@pytest.mark.parametrize("interval", (5e-2, 1e-1))
def test_interval(interval):
memory_monitor = MemoryMonitor(interval=interval).start()
allocate(BYTES_TO_ALLOCATE_SMALL, sleep_before_deallocation=True, sleep_after_deallocation=True)
memory_monitor.stop()
time_values, memory_values = memory_monitor.get_data()
assert len(time_values) == len(memory_values)
assert len(time_values) == pytest.approx((PREALLOCATE_DURATION + ALLOCATE_DURATION + DEALLOCATE_DURATION) / interval, rel=1e-1)


@pytest.mark.skipif(is_windows(), reason="Check on linux only")
@pytest.mark.parametrize("max_value", (True, False))
def test_monitor_memory_for_callable(max_value):
allocate_fn = lambda: allocate(BYTES_TO_ALLOCATE_SMALL)
fn_res, memory_data = monitor_memory_for_callable(allocate_fn, max_value=max_value)
assert fn_res is None

if max_value:
assert isinstance(memory_data, float)
else:
time_values, memory_values = memory_data
assert len(time_values) == len(memory_values)


@pytest.mark.skipif(ISOLATION_RUN_ENV_VAR not in os.environ, reason="Should be run via isolation proxy")
def test_memory_values_isolated():
baseline_memory = float(os.environ[BASELINE_MEMORY_VAR]) if BASELINE_MEMORY_VAR in os.environ else None

memory_monitor = MemoryMonitor(memory_type=MemoryType.RSS, memory_unit=MemoryUnit.B).start()
bytes_to_allocate = 1 if baseline_memory is None else BYTES_TO_ALLOCATE_LARGE
allocate(bytes_to_allocate, sleep_before_deallocation=True, sleep_after_deallocation=True)
memory_monitor.stop()
time_values, memory_values = memory_monitor.get_data()

if baseline_memory is None:
print("\nMax memory:", max(memory_values))
else:
memory_values = list(map(lambda it: it - baseline_memory, memory_values))
rel = 1e-2
assert max(memory_values) == pytest.approx(BYTES_TO_ALLOCATE_LARGE, rel=rel)
assert abs(memory_values[0]) < BYTES_TO_ALLOCATE_LARGE * rel
assert abs(memory_values[-1]) < BYTES_TO_ALLOCATE_LARGE * rel


@pytest.mark.skipif(is_windows(), reason="Check on linux only")
def test_memory_values():
# The first run of the test collects the memory that is allocated by default
ret_code, stdout, stderr = run_pytest_case_function_in_separate_process(test_memory_values_isolated)
max_mem_line = next(filter(lambda s: "Max memory:" in s, stdout.split('\n')))
baseline_memory = max_mem_line.split(' ')[-1]

# The second run of the test checks that the added amount of memory is correctly represented by the memory monitor
os.environ[BASELINE_MEMORY_VAR] = baseline_memory
run_pytest_case_function_in_separate_process(test_memory_values_isolated)
del os.environ[BASELINE_MEMORY_VAR]


@pytest.mark.skipif(ISOLATION_RUN_ENV_VAR not in os.environ, reason="Should be run via isolation proxy")
def test_at_exit_isolated():
memory_monitor = MemoryMonitor()
at_exit_fn = lambda: memory_monitor.save_memory_logs(*memory_monitor.get_data(), Path(os.environ[TEMP_DIR_VAR]))
memory_monitor.start(at_exit_fn=at_exit_fn)
allocate(BYTES_TO_ALLOCATE_SMALL)


@pytest.mark.skipif(is_windows(), reason="Check on linux only")
def test_at_exit(tmpdir):
os.environ[TEMP_DIR_VAR] = str(tmpdir)
run_pytest_case_function_in_separate_process(test_at_exit_isolated)
del os.environ[TEMP_DIR_VAR]

tmpdir = Path(tmpdir)
saved_files = tuple(tmpdir.glob("*"))
assert len(saved_files) == 2
assert any(map(lambda fn: str(fn).endswith(f".txt"), saved_files))
assert any(map(lambda fn: str(fn).endswith(f".png"), saved_files))
228 changes: 0 additions & 228 deletions tools/memory_logger.py

This file was deleted.

Loading

0 comments on commit 494a589

Please sign in to comment.