Skip to content

Commit

Permalink
Merge pull request #15 from mantidproject/memory_monitoring
Browse files Browse the repository at this point in the history
Add memory monitoring to livereduce
  • Loading branch information
rosswhitfield authored Nov 26, 2024
2 parents 344666e + 518e7c3 commit e846a70
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 4 deletions.
1 change: 1 addition & 0 deletions environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ dependencies:
- pyinotify
- pre-commit
- python-build
- psutil
35 changes: 31 additions & 4 deletions scripts/livereduce.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,19 @@
import os
import signal
import sys
import threading
import time
from hashlib import md5

import mantid # for clearer error message
import psutil
import pyinotify
from mantid.simpleapi import StartLiveData
from mantid.simpleapi import StartLiveData, mtd
from mantid.utils.logging import log_to_python as mtd_log_to_python
from packaging.version import parse as parse_version

CONVERSION_FACTOR_BYTES_TO_MB = 1.0 / (1024 * 1024)

# ##################
# configure logging
# ##################
Expand Down Expand Up @@ -79,6 +83,13 @@ def stop(cls):
else:
cls.logger.info("mantid not initialized - nothing to cleanup")

def restart_and_clear(self):
self.logger.info("Restarting Live Data and clearing workspaces")
self.stop()
time.sleep(1.0)
mtd.clear()
self.start()


# ##################
# register a signal handler so we can exit gracefully if someone kills us
Expand Down Expand Up @@ -157,6 +168,10 @@ def __init__(self, filename):
self.accumMethod = str(json_doc.get("accum_method", "Add"))
self.periods = json_doc.get("periods", None)
self.spectra = json_doc.get("spectra", None)
self.system_mem_limit_perc = json_doc.get("system_mem_limit_perc", 25) # set to 0 to disable
self.mem_check_interval_sec = json_doc.get("mem_check_interval_sec", 1)
self.mem_limit = psutil.virtual_memory().total * self.system_mem_limit_perc / 100
self.proc_pid = psutil.Process(os.getpid())

# location of the scripts
self.script_dir = json_doc.get("script_dir")
Expand Down Expand Up @@ -321,9 +336,16 @@ def process_default(self, event):
self.scriptfiles[event.pathname] = newmd5
# restart the service
self.logger.info(f'Processing script "{event.pathname}" changed - restarting ' '"StartLiveData"')
self.livemanager.stop()
time.sleep(1.0) # seconds
self.livemanager.start()
self.livemanager.restart_and_clear()


def memory_checker(config, livemanager):
while True:
mem_used = config.proc_pid.memory_info().rss
if mem_used > config.mem_limit:
logger.error(f"Memory usage {mem_used * CONVERSION_FACTOR_BYTES_TO_MB:.2f} MB exceeds limit")
livemanager.restart_and_clear()
time.sleep(config.mem_check_interval_sec)


# determine the configuration file
Expand Down Expand Up @@ -355,6 +377,11 @@ def process_default(self, event):
# start up the live data
liveDataMgr.start()

# start the memory checker
if config.system_mem_limit_perc > 0:
memory_thread = threading.Thread(target=memory_checker, args=(config, liveDataMgr), daemon=True)
memory_thread.start()

# inotify will keep the program running
notifier.loop()

Expand Down
8 changes: 8 additions & 0 deletions test/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,11 @@ Once the first chunk of live data is processed, `ctrl-C` will
interrupt the process and it will close cleanly.

In testing mode, the logging will go to `${PWD}/livereduce.log` and can be watched with `tail -F livereduce.log`


Example using event data, to test memory monitoring
----------------------------------------------------

This test case will continuously accumulate events until it fails.

Start the server using `test/fake_event_server.py` and use the configuration `test/fake_event.conf`.
9 changes: 9 additions & 0 deletions test/fake_event.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"instrument": "ISIS_Event",
"script_dir": "test",
"update_every": 3,
"CONDA_ENV": "livereduce",
"accum_method":"Add",
"system_mem_limit_perc": 25,
"mem_check_interval_sec": 1
}
24 changes: 24 additions & 0 deletions test/fake_event_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from threading import Thread

from mantid import AlgorithmManager, ConfigService
from mantid.simpleapi import FakeISISEventDAE

facility = ConfigService.getFacility().name()
ConfigService.setFacility("TEST_LIVE")


def startServer():
FakeISISEventDAE(NEvents=1000000)


try:
thread = Thread(target=startServer)
thread.start()
thread.join()
except Exception as e: # noqa: BLE001
print(e)
alg = AlgorithmManager.newestInstanceOf("FakeISISEventDAE")
if alg.isRunning():
alg.cancel()
finally:
ConfigService.setFacility(facility)
1 change: 1 addition & 0 deletions test/reduce_ISIS_Event_live_proc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
print("Running proc script")

0 comments on commit e846a70

Please sign in to comment.