diff --git a/environment.yml b/environment.yml index 079cf66..18d78b9 100644 --- a/environment.yml +++ b/environment.yml @@ -9,3 +9,4 @@ dependencies: - pyinotify - pre-commit - python-build + - psutil diff --git a/scripts/livereduce.py b/scripts/livereduce.py index 8eb90f2..64b28fa 100644 --- a/scripts/livereduce.py +++ b/scripts/livereduce.py @@ -3,15 +3,19 @@ import os import signal import sys +import threading import time from hashlib import md5 import mantid # for clearer error message +import psutil import pyinotify -from mantid.simpleapi import StartLiveData +from mantid.simpleapi import StartLiveData, mtd from mantid.utils.logging import log_to_python as mtd_log_to_python from packaging.version import parse as parse_version +CONVERSION_FACTOR_BYTES_TO_MB = 1.0 / (1024 * 1024) + # ################## # configure logging # ################## @@ -79,6 +83,13 @@ def stop(cls): else: cls.logger.info("mantid not initialized - nothing to cleanup") + def restart_and_clear(self): + self.logger.info("Restarting Live Data and clearing workspaces") + self.stop() + time.sleep(1.0) + mtd.clear() + self.start() + # ################## # register a signal handler so we can exit gracefully if someone kills us @@ -157,6 +168,10 @@ def __init__(self, filename): self.accumMethod = str(json_doc.get("accum_method", "Add")) self.periods = json_doc.get("periods", None) self.spectra = json_doc.get("spectra", None) + self.system_mem_limit_perc = json_doc.get("system_mem_limit_perc", 25) # set to 0 to disable + self.mem_check_interval_sec = json_doc.get("mem_check_interval_sec", 1) + self.mem_limit = psutil.virtual_memory().total * self.system_mem_limit_perc / 100 + self.proc_pid = psutil.Process(os.getpid()) # location of the scripts self.script_dir = json_doc.get("script_dir") @@ -321,9 +336,16 @@ def process_default(self, event): self.scriptfiles[event.pathname] = newmd5 # restart the service self.logger.info(f'Processing script "{event.pathname}" changed - restarting ' '"StartLiveData"') - self.livemanager.stop() - time.sleep(1.0) # seconds - self.livemanager.start() + self.livemanager.restart_and_clear() + + +def memory_checker(config, livemanager): + while True: + mem_used = config.proc_pid.memory_info().rss + if mem_used > config.mem_limit: + logger.error(f"Memory usage {mem_used * CONVERSION_FACTOR_BYTES_TO_MB:.2f} MB exceeds limit") + livemanager.restart_and_clear() + time.sleep(config.mem_check_interval_sec) # determine the configuration file @@ -355,6 +377,11 @@ def process_default(self, event): # start up the live data liveDataMgr.start() +# start the memory checker +if config.system_mem_limit_perc > 0: + memory_thread = threading.Thread(target=memory_checker, args=(config, liveDataMgr), daemon=True) + memory_thread.start() + # inotify will keep the program running notifier.loop() diff --git a/test/README.md b/test/README.md index a48ad51..f8a43a3 100644 --- a/test/README.md +++ b/test/README.md @@ -24,3 +24,11 @@ Once the first chunk of live data is processed, `ctrl-C` will interrupt the process and it will close cleanly. In testing mode, the logging will go to `${PWD}/livereduce.log` and can be watched with `tail -F livereduce.log` + + +Example using event data, to test memory monitoring +---------------------------------------------------- + +This test case will continuously accumulate events until it fails. + +Start the server using `test/fake_event_server.py` and use the configuration `test/fake_event.conf`. diff --git a/test/fake_event.conf b/test/fake_event.conf new file mode 100644 index 0000000..aea13b7 --- /dev/null +++ b/test/fake_event.conf @@ -0,0 +1,9 @@ +{ + "instrument": "ISIS_Event", + "script_dir": "test", + "update_every": 3, + "CONDA_ENV": "livereduce", + "accum_method":"Add", + "system_mem_limit_perc": 25, + "mem_check_interval_sec": 1 +} diff --git a/test/fake_event_server.py b/test/fake_event_server.py new file mode 100644 index 0000000..1fe58b7 --- /dev/null +++ b/test/fake_event_server.py @@ -0,0 +1,24 @@ +from threading import Thread + +from mantid import AlgorithmManager, ConfigService +from mantid.simpleapi import FakeISISEventDAE + +facility = ConfigService.getFacility().name() +ConfigService.setFacility("TEST_LIVE") + + +def startServer(): + FakeISISEventDAE(NEvents=1000000) + + +try: + thread = Thread(target=startServer) + thread.start() + thread.join() +except Exception as e: # noqa: BLE001 + print(e) + alg = AlgorithmManager.newestInstanceOf("FakeISISEventDAE") + if alg.isRunning(): + alg.cancel() +finally: + ConfigService.setFacility(facility) diff --git a/test/reduce_ISIS_Event_live_proc.py b/test/reduce_ISIS_Event_live_proc.py new file mode 100644 index 0000000..6ad56a1 --- /dev/null +++ b/test/reduce_ISIS_Event_live_proc.py @@ -0,0 +1 @@ +print("Running proc script")