diff --git a/qcodes/__init__.py b/qcodes/__init__.py index c67ad2ef3cd..6f8e7f097e9 100644 --- a/qcodes/__init__.py +++ b/qcodes/__init__.py @@ -34,7 +34,7 @@ from qcodes.station import Station -from qcodes.loops import Loop, active_loop, active_data_set +from qcodes.loops import Loop, active_loop, active_data_set, stop from qcodes.measure import Measure from qcodes.actions import Task, Wait, BreakIf haswebsockets = True diff --git a/qcodes/config/qcodesrc.json b/qcodes/config/qcodesrc.json index 1777b28386c..5857dc87b91 100644 --- a/qcodes/config/qcodesrc.json +++ b/qcodes/config/qcodesrc.json @@ -5,7 +5,8 @@ "db_location": "~/experiments.db", "db_debug": false, "loglevel": "WARNING", - "file_loglevel": "INFO" + "file_loglevel": "INFO", + "loop_thread": false }, "logger":{ "console_level": "WARNING", diff --git a/qcodes/config/qcodesrc_schema.json b/qcodes/config/qcodesrc_schema.json index 8cd141a0ace..8eab9822909 100644 --- a/qcodes/config/qcodesrc_schema.json +++ b/qcodes/config/qcodesrc_schema.json @@ -53,6 +53,11 @@ "type": "string", "description": "location of the database", "default": "./experiments.db" + }, + "loop_thread": { + "type": "boolean", + "description": "Start QCoDeS loops in separate thread", + "default": false } }, "required":["db_location"] diff --git a/qcodes/loops.py b/qcodes/loops.py index f866329773c..a10b7373a4d 100644 --- a/qcodes/loops.py +++ b/qcodes/loops.py @@ -50,11 +50,15 @@ import logging import time import numpy as np +import threading +from functools import partial +from qcodes import config from qcodes.station import Station from qcodes.data.data_set import new_data from qcodes.data.data_array import DataArray from qcodes.utils.helpers import wait_secs, full_class, tprint +from qcodes.utils.threading import KillableThread from qcodes.utils.metadata import Metadatable from .actions import (_actions_snapshot, Task, Wait, _Measure, _Nest, @@ -351,6 +355,8 @@ class ActiveLoop(Metadatable): # Currently active loop, is set when calling loop.run(set_active=True) # is reset to None when active measurement is finished active_loop = None + # Flag to stop loop when running + _is_stopped = False def __init__(self, sweep_values, delay, *actions, then_actions=(), station=None, progress_interval=None, bg_task=None, @@ -606,6 +612,12 @@ def _default_setpoints(self, shape): return sp + def _raise_if_stopped(self, reset=True): + if self._is_stopped: + if reset: + ActiveLoop._is_stopped = False + raise _QcodesBreak + def set_common_attrs(self, data_set, use_threads): """ set a couple of common attributes that the main and nested loops @@ -674,12 +686,15 @@ def run_temp(self, **kwargs): """ return self.run(quiet=True, location=False, **kwargs) - def run(self, use_threads=False, quiet=False, station=None, + def run(self, thread=None, use_threads=False, quiet=False, station=None, progress_interval=False, set_active=True, *args, **kwargs): """ Execute this loop. Args: + thread: Start QCoDeS in separate thread. If not specified, + will check qcodes.config.core.loop_thread, which is set to False + by default. use_threads: (default False): whenever there are multiple `get` calls back-to-back, execute them in separate threads so they run in parallel (as long as they don't block each other) @@ -716,8 +731,40 @@ def run(self, use_threads=False, quiet=False, station=None, if progress_interval is not False: self.progress_interval = progress_interval + if self.data_set is not None: + # Remove name from kwargs since a dataset is already created + kwargs.pop('name', None) data_set = self.get_data_set(*args, **kwargs) + if thread is None: + thread = config.core.get('loop_thread', False) + + if thread: + if any(t.name == 'qcodes_loop' for t in threading.enumerate()): + raise RuntimeError('QCoDeS loop already running. Exiting') + + def attach_stop_bg(loop, reset=True): + new_loop = loop.with_bg_task(partial(self._raise_if_stopped, + reset=reset)) + for action in loop: + if isinstance(action, ActiveLoop): + attach_stop_bg(action, reset=False) + return new_loop + + loop = attach_stop_bg(self) + t = KillableThread(target=loop.run, name='qcodes_loop', + args=args, + kwargs={'thread': False, + 'use_threads': use_threads, + 'name': None, + 'quiet': quiet, + 'station': station, + 'progress_interval': progress_interval, + 'set_active': set_active, + **kwargs}) + t.start() + return data_set + self.set_common_attrs(data_set=data_set, use_threads=use_threads) station = station or self.station or Station.default @@ -744,6 +791,7 @@ def run(self, use_threads=False, quiet=False, station=None, self._run_wrapper() ds = self.data_set finally: + ActiveLoop._is_stopped = False if not quiet: print(repr(self.data_set)) print(datetime.now().strftime('Finished at %Y-%m-%d %H:%M:%S')) @@ -901,6 +949,9 @@ def _run_loop(self, first_delay=0, action_indices=(), if t - last_task >= self.bg_min_delay: try: self.bg_task() + except _QcodesBreak: + log.error('QCodes break raise, stopping') + break except Exception: if self.last_task_failed: self.bg_task = None @@ -911,8 +962,11 @@ def _run_loop(self, first_delay=0, action_indices=(), # run the background task one last time to catch the last setpoint(s) if self.bg_task is not None: - log.debug('Running the background task one last time.') - self.bg_task() + try: + log.debug('Running the background task one last time.') + self.bg_task() + except _QcodesBreak: + pass # the loop is finished - run the .then actions #log.debug('Finishing loop, running the .then actions...') @@ -930,3 +984,7 @@ def _wait(self, delay): finish_clock = time.perf_counter() + delay t = wait_secs(finish_clock) time.sleep(t) + + +def stop(): + ActiveLoop._is_stopped = True \ No newline at end of file diff --git a/qcodes/utils/threading.py b/qcodes/utils/threading.py index 98a80cb7470..3333800b43b 100644 --- a/qcodes/utils/threading.py +++ b/qcodes/utils/threading.py @@ -4,6 +4,13 @@ # That way the things we call need not be rewritten explicitly async. import threading +import ctypes +import time +from collections import Iterable +import logging + + +logger = logging.getLogger(__name__) class RespondingThread(threading.Thread): @@ -72,3 +79,88 @@ def thread_map(callables, args=None, kwargs=None): t.start() return [t.output() for t in threads] + + +class PeriodicThread(threading.Thread): + """ + Creates a thread that periodically calls functions at specified interval. + The thread can be started, paused, and stopped using its methods. + + Args: + callables (list[callable]): list of callable functions. + args/kwargs cannot be passed + interval (float): interval between successive calls (in seconds) + name (str): thread name, used to distinguish it from other threads + max_threads(int): maximum number of threads with same name before + emitting a warning + auto_start (bool): If True, start periodic calling of functions + after waiting for interval. + """ + def __init__(self, callables, interval, name=None, max_threads=None, + auto_start=True): + super().__init__(name=name) + self._is_paused = False + + if not isinstance(callables, Iterable): + callables = [callables] + self.callables = callables + + self.interval = interval + if max_threads is not None: + active_threads = sum(thread.getName()==name + for thread in threading.enumerate()) + if active_threads > max_threads: + logger.warning('Found {} active periodic threads'.format( + active_threads)) + + if auto_start: + time.sleep(interval) + self.start() + + def run(self): + while not self._is_stopped: + if not self._is_paused: + for callable in self.callables: + callable() + time.sleep(self.interval) + else: + logger.warning('Periodic thread stopped') + + def pause(self): + self._is_paused = True + + def unpause(self): + self._is_paused = False + + def halt(self): + self._is_stopped = True + + +def _async_raise(tid, excobj): + res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(excobj)) + if res == 0: + raise ValueError("nonexistent thread id") + elif res > 1: + # """if it returns a number greater than one, you're in trouble, + # and you should call it again with exc=NULL to revert the effect""" + ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, 0) + raise SystemError("PyThreadState_SetAsyncExc failed") + + +class KillableThread(threading.Thread): + """ + A thread that can be forcibly terminated via `KillableThread.terminate()`. + Is potentially unsafe and should only be used as a last resort. + A preferrable stopping method would be by raising a stop flag in the code. + """ + def raise_exc(self, excobj): + assert self.isAlive(), "thread must be started" + for tid, tobj in threading._active.items(): + if tobj is self: + _async_raise(tid, excobj) + return + + def terminate(self): + # must raise the SystemExit type, instead of a SystemExit() instance + # due to a bug in PyThreadState_SetAsyncExc + self.raise_exc(SystemExit)