From bd23642d7e98b599804c6549ce1cd4befeec8d29 Mon Sep 17 00:00:00 2001 From: nulinspiratie Date: Tue, 26 Sep 2017 18:09:13 +1000 Subject: [PATCH 1/5] feat: add threading to qcodes Loops --- qcodes/__init__.py | 2 +- qcodes/config/qcodesrc.json | 3 +- qcodes/config/qcodesrc_schema.json | 5 ++ qcodes/loops.py | 62 +++++++++++++++++++++++- qcodes/utils/threading.py | 76 ++++++++++++++++++++++++++++++ 5 files changed, 144 insertions(+), 4 deletions(-) diff --git a/qcodes/__init__.py b/qcodes/__init__.py index 542d2c10359..60ed7525ea5 100644 --- a/qcodes/__init__.py +++ b/qcodes/__init__.py @@ -29,7 +29,7 @@ from qcodes.station import Station -from qcodes.loops import Loop, active_loop, active_data_set +from qcodes.loops import Loop, active_loop, active_data_set, stop from qcodes.measure import Measure from qcodes.actions import Task, Wait, BreakIf diff --git a/qcodes/config/qcodesrc.json b/qcodes/config/qcodesrc.json index f0c6d56624b..8a6891cb822 100644 --- a/qcodes/config/qcodesrc.json +++ b/qcodes/config/qcodesrc.json @@ -1,7 +1,8 @@ { "core":{ "loglevel": "DEBUG", - "default_fmt": "data/{date}/#{counter}_{name}_{time}" + "default_fmt": "data/{date}/#{counter}_{name}_{time}", + "loop_thread": false }, "gui" :{ "notebook": true, diff --git a/qcodes/config/qcodesrc_schema.json b/qcodes/config/qcodesrc_schema.json index 9abce34b8ad..f29b894934a 100644 --- a/qcodes/config/qcodesrc_schema.json +++ b/qcodes/config/qcodesrc_schema.json @@ -23,6 +23,11 @@ "INFO", "DEBUG" ] + }, + "loop_thread" :{ + "type" : "boolean", + "description" : "Start QCoDeS loops in separate thread", + "default" : false } }, "required":["loglevel" ] diff --git a/qcodes/loops.py b/qcodes/loops.py index 5a898e0ceec..6924feec5f5 100644 --- a/qcodes/loops.py +++ b/qcodes/loops.py @@ -50,11 +50,15 @@ import logging import time import numpy as np +import threading +from functools import partial +from qcodes import config from qcodes.station import Station from qcodes.data.data_set import new_data from qcodes.data.data_array import DataArray from qcodes.utils.helpers import wait_secs, full_class, tprint +from qcodes.utils.threading import KillableThread from qcodes.utils.metadata import Metadatable from .actions import (_actions_snapshot, Task, Wait, _Measure, _Nest, @@ -344,6 +348,8 @@ class ActiveLoop(Metadatable): # Currently active loop, is set when calling loop.run(set_active=True) # is reset to None when active measurement is finished active_loop = None + # Flag to stop loop when running + _is_stopped = False def __init__(self, sweep_values, delay, *actions, then_actions=(), station=None, progress_interval=None, bg_task=None, @@ -597,6 +603,12 @@ def _default_setpoints(self, shape): return sp + def _raise_if_stopped(self, reset=True): + if self._is_stopped: + if reset: + ActiveLoop._is_stopped = False + raise _QcodesBreak + def set_common_attrs(self, data_set, use_threads): """ set a couple of common attributes that the main and nested loops @@ -665,12 +677,15 @@ def run_temp(self, **kwargs): """ return self.run(quiet=True, location=False, **kwargs) - def run(self, use_threads=False, quiet=False, station=None, + def run(self, thread=None, use_threads=False, quiet=False, station=None, progress_interval=False, set_active=True, *args, **kwargs): """ Execute this loop. Args: + thread: Start QCoDeS in separate thread. If not specified, + will check qcodes.config.core.loop_thread, which is set to False + by default. use_threads: (default False): whenever there are multiple `get` calls back-to-back, execute them in separate threads so they run in parallel (as long as they don't block each other) @@ -707,8 +722,40 @@ def run(self, use_threads=False, quiet=False, station=None, if progress_interval is not False: self.progress_interval = progress_interval + if self.data_set is not None: + # Remove name from kwargs since a dataset is already created + kwargs.pop('name', None) data_set = self.get_data_set(*args, **kwargs) + if thread is None: + thread = config.core.get('loop_thread', False) + + if thread: + if any(t.name == 'qcodes_loop' for t in threading.enumerate()): + raise RuntimeError('QCoDeS loop already running. Exiting') + + def attach_stop_bg(loop, reset=True): + new_loop = loop.with_bg_task(partial(self._raise_if_stopped, + reset=reset)) + for action in loop: + if isinstance(action, ActiveLoop): + attach_stop_bg(action, reset=False) + return new_loop + + loop = attach_stop_bg(self) + t = KillableThread(target=loop.run, name='qcodes_loop', + args=args, + kwargs={'thread': False, + 'use_threads': use_threads, + 'name': None, + 'quiet': quiet, + 'station': station, + 'progress_interval': progress_interval, + 'set_active': set_active, + **kwargs}) + t.start() + return data_set + self.set_common_attrs(data_set=data_set, use_threads=use_threads) station = station or self.station or Station.default @@ -735,6 +782,7 @@ def run(self, use_threads=False, quiet=False, station=None, self._run_wrapper() ds = self.data_set finally: + ActiveLoop._is_stopped = False if not quiet: print(repr(self.data_set)) print(datetime.now().strftime('Finished at %Y-%m-%d %H:%M:%S')) @@ -879,6 +927,9 @@ def _run_loop(self, first_delay=0, action_indices=(), if t - last_task >= self.bg_min_delay: try: self.bg_task() + except _QcodesBreak: + log.error('QCodes break raise, stopping') + break except Exception: if self.last_task_failed: self.bg_task = None @@ -889,7 +940,10 @@ def _run_loop(self, first_delay=0, action_indices=(), # run the background task one last time to catch the last setpoint(s) if self.bg_task is not None: - self.bg_task() + try: + self.bg_task() + except _QcodesBreak: + pass # the loop is finished - run the .then actions for f in self._compile_actions(self.then_actions, ()): @@ -904,3 +958,7 @@ def _wait(self, delay): finish_clock = time.perf_counter() + delay t = wait_secs(finish_clock) time.sleep(t) + + +def stop(): + ActiveLoop._is_stopped = True \ No newline at end of file diff --git a/qcodes/utils/threading.py b/qcodes/utils/threading.py index 4b181c386ca..198f7ad5dc3 100644 --- a/qcodes/utils/threading.py +++ b/qcodes/utils/threading.py @@ -4,6 +4,13 @@ # That way the things we call need not be rewritten explicitly async. import threading +import ctypes +import time +from collections import Iterable +import logging + + +logger = logging.getLogger(__name__) class RespondingThread(threading.Thread): @@ -72,3 +79,72 @@ def thread_map(callables, args=None, kwargs=None): t.start() return [t.output() for t in threads] + + +class UpdaterThread(threading.Thread): + def __init__(self, callables, interval, name=None, max_threads=None, + auto_start=True): + super().__init__(name=name) + self._is_paused = False + + if not isinstance(callables, Iterable): + callables = [callables] + self.callables = callables + + self.interval = interval + if max_threads is not None: + active_threads = sum(thread.getName()==name + for thread in threading.enumerate()) + if active_threads > max_threads: + logger.warning(f'Found {active_threads} active updater threads') + + if auto_start: + time.sleep(interval) + self.start() + + def run(self): + while not self._is_stopped: + if not self._is_paused: + for callable in self.callables: + callable() + time.sleep(self.interval) + else: + logger.warning('Updater thread stopped') + + def pause(self): + self._is_paused = True + + def unpause(self): + self._is_paused = False + + def halt(self): + self._is_stopped = True + + +def _async_raise(tid, excobj): + res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(excobj)) + if res == 0: + raise ValueError("nonexistent thread id") + elif res > 1: + # """if it returns a number greater than one, you're in trouble, + # and you should call it again with exc=NULL to revert the effect""" + ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, 0) + raise SystemError("PyThreadState_SetAsyncExc failed") + + +class KillableThread(threading.Thread): + def raise_exc(self, excobj): + assert self.isAlive(), "thread must be started" + for tid, tobj in threading._active.items(): + if tobj is self: + _async_raise(tid, excobj) + return + + # the thread was alive when we entered the loop, but was not found + # in the dict, hence it must have been already terminated. should we raise + # an exception here? silently ignore? + + def terminate(self): + # must raise the SystemExit type, instead of a SystemExit() instance + # due to a bug in PyThreadState_SetAsyncExc + self.raise_exc(SystemExit) From 8b9477d6f4c581e671d17f1f4555444575284d9f Mon Sep 17 00:00:00 2001 From: nulinspiratie Date: Tue, 26 Sep 2017 18:17:17 +1000 Subject: [PATCH 2/5] doc: remove unnecessary comment --- qcodes/utils/threading.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/qcodes/utils/threading.py b/qcodes/utils/threading.py index 198f7ad5dc3..8f412375336 100644 --- a/qcodes/utils/threading.py +++ b/qcodes/utils/threading.py @@ -140,10 +140,6 @@ def raise_exc(self, excobj): _async_raise(tid, excobj) return - # the thread was alive when we entered the loop, but was not found - # in the dict, hence it must have been already terminated. should we raise - # an exception here? silently ignore? - def terminate(self): # must raise the SystemExit type, instead of a SystemExit() instance # due to a bug in PyThreadState_SetAsyncExc From 42034875ebdd57abf7b503214eab7c6464abecbc Mon Sep 17 00:00:00 2001 From: nulinspiratie Date: Wed, 25 Oct 2017 20:33:23 +1100 Subject: [PATCH 3/5] doc: add docstring for KilllableThread and for UpdaterThread --- qcodes/utils/threading.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/qcodes/utils/threading.py b/qcodes/utils/threading.py index 8f412375336..e6ff65ea095 100644 --- a/qcodes/utils/threading.py +++ b/qcodes/utils/threading.py @@ -82,6 +82,20 @@ def thread_map(callables, args=None, kwargs=None): class UpdaterThread(threading.Thread): + """ + Creates a thread that periodically calls functions at specified interval. + The thread can be started, paused, and stopped using its methods. + + Args: + callables (list[callable]): list of callable functions. + args/kwargs cannot be passed + interval (float): interval between successive calls (in seconds) + name (str): thread name, used to distinguish it from other threads + max_threads(int): maximum number of threads with same name before + emitting a warning + auto_start (bool): If True, start periodic calling of functions + after waiting for interval. + """ def __init__(self, callables, interval, name=None, max_threads=None, auto_start=True): super().__init__(name=name) @@ -133,6 +147,11 @@ def _async_raise(tid, excobj): class KillableThread(threading.Thread): + """ + A thread that can be forcibly terminated via `KillableThread.terminate()`. + Is potentially unsafe and should only be used as a last resort. + A preferrable stopping method would be by raising a stop flag in the code. + """ def raise_exc(self, excobj): assert self.isAlive(), "thread must be started" for tid, tobj in threading._active.items(): From 13d72b38610e90f0f884eca7033ff503ddb3a90d Mon Sep 17 00:00:00 2001 From: nulinspiratie Date: Wed, 25 Oct 2017 20:35:49 +1100 Subject: [PATCH 4/5] fix: python 3.5 compatible --- qcodes/utils/threading.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/qcodes/utils/threading.py b/qcodes/utils/threading.py index e6ff65ea095..68581798ea0 100644 --- a/qcodes/utils/threading.py +++ b/qcodes/utils/threading.py @@ -110,7 +110,7 @@ def __init__(self, callables, interval, name=None, max_threads=None, active_threads = sum(thread.getName()==name for thread in threading.enumerate()) if active_threads > max_threads: - logger.warning(f'Found {active_threads} active updater threads') + logger.warning('Found {} active updater threads'.format(active_threads)) if auto_start: time.sleep(interval) From 5c1778718751053c821840b061d6911521b6724b Mon Sep 17 00:00:00 2001 From: nulinspiratie Date: Wed, 25 Oct 2017 20:38:44 +1100 Subject: [PATCH 5/5] refactor: change UpdaterThread to PeriodicThread --- qcodes/utils/threading.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/qcodes/utils/threading.py b/qcodes/utils/threading.py index 68581798ea0..019cda48fca 100644 --- a/qcodes/utils/threading.py +++ b/qcodes/utils/threading.py @@ -81,7 +81,7 @@ def thread_map(callables, args=None, kwargs=None): return [t.output() for t in threads] -class UpdaterThread(threading.Thread): +class PeriodicThread(threading.Thread): """ Creates a thread that periodically calls functions at specified interval. The thread can be started, paused, and stopped using its methods. @@ -110,7 +110,8 @@ def __init__(self, callables, interval, name=None, max_threads=None, active_threads = sum(thread.getName()==name for thread in threading.enumerate()) if active_threads > max_threads: - logger.warning('Found {} active updater threads'.format(active_threads)) + logger.warning('Found {} active periodic threads'.format( + active_threads)) if auto_start: time.sleep(interval) @@ -123,7 +124,7 @@ def run(self): callable() time.sleep(self.interval) else: - logger.warning('Updater thread stopped') + logger.warning('Periodic thread stopped') def pause(self): self._is_paused = True