Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add threading to qcodes Loops #763

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion qcodes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@


from qcodes.station import Station
from qcodes.loops import Loop, active_loop, active_data_set
from qcodes.loops import Loop, active_loop, active_data_set, stop
from qcodes.measure import Measure
from qcodes.actions import Task, Wait, BreakIf
haswebsockets = True
Expand Down
3 changes: 2 additions & 1 deletion qcodes/config/qcodesrc.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
"db_location": "~/experiments.db",
"db_debug": false,
"loglevel": "WARNING",
"file_loglevel": "INFO"
"file_loglevel": "INFO",
"loop_thread": false
},
"logger":{
"console_level": "WARNING",
Expand Down
5 changes: 5 additions & 0 deletions qcodes/config/qcodesrc_schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@
"type": "string",
"description": "location of the database",
"default": "./experiments.db"
},
"loop_thread": {
"type": "boolean",
"description": "Start QCoDeS loops in separate thread",
"default": false
}
},
"required":["db_location"]
Expand Down
64 changes: 61 additions & 3 deletions qcodes/loops.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,15 @@
import logging
import time
import numpy as np
import threading
from functools import partial

from qcodes import config
from qcodes.station import Station
from qcodes.data.data_set import new_data
from qcodes.data.data_array import DataArray
from qcodes.utils.helpers import wait_secs, full_class, tprint
from qcodes.utils.threading import KillableThread
from qcodes.utils.metadata import Metadatable

from .actions import (_actions_snapshot, Task, Wait, _Measure, _Nest,
Expand Down Expand Up @@ -351,6 +355,8 @@ class ActiveLoop(Metadatable):
# Currently active loop, is set when calling loop.run(set_active=True)
# is reset to None when active measurement is finished
active_loop = None
# Flag to stop loop when running
_is_stopped = False

def __init__(self, sweep_values, delay, *actions, then_actions=(),
station=None, progress_interval=None, bg_task=None,
Expand Down Expand Up @@ -606,6 +612,12 @@ def _default_setpoints(self, shape):

return sp

def _raise_if_stopped(self, reset=True):
if self._is_stopped:
if reset:
ActiveLoop._is_stopped = False
raise _QcodesBreak

def set_common_attrs(self, data_set, use_threads):
"""
set a couple of common attributes that the main and nested loops
Expand Down Expand Up @@ -674,12 +686,15 @@ def run_temp(self, **kwargs):
"""
return self.run(quiet=True, location=False, **kwargs)

def run(self, use_threads=False, quiet=False, station=None,
def run(self, thread=None, use_threads=False, quiet=False, station=None,
progress_interval=False, set_active=True, *args, **kwargs):
"""
Execute this loop.

Args:
thread: Start QCoDeS in separate thread. If not specified,
will check qcodes.config.core.loop_thread, which is set to False
by default.
use_threads: (default False): whenever there are multiple `get` calls
back-to-back, execute them in separate threads so they run in
parallel (as long as they don't block each other)
Expand Down Expand Up @@ -716,8 +731,40 @@ def run(self, use_threads=False, quiet=False, station=None,
if progress_interval is not False:
self.progress_interval = progress_interval

if self.data_set is not None:
# Remove name from kwargs since a dataset is already created
kwargs.pop('name', None)
data_set = self.get_data_set(*args, **kwargs)

if thread is None:
thread = config.core.get('loop_thread', False)

if thread:
if any(t.name == 'qcodes_loop' for t in threading.enumerate()):
raise RuntimeError('QCoDeS loop already running. Exiting')

def attach_stop_bg(loop, reset=True):
new_loop = loop.with_bg_task(partial(self._raise_if_stopped,
reset=reset))
for action in loop:
if isinstance(action, ActiveLoop):
attach_stop_bg(action, reset=False)
return new_loop

loop = attach_stop_bg(self)
t = KillableThread(target=loop.run, name='qcodes_loop',
args=args,
kwargs={'thread': False,
'use_threads': use_threads,
'name': None,
'quiet': quiet,
'station': station,
'progress_interval': progress_interval,
'set_active': set_active,
**kwargs})
t.start()
return data_set

self.set_common_attrs(data_set=data_set, use_threads=use_threads)

station = station or self.station or Station.default
Expand All @@ -744,6 +791,7 @@ def run(self, use_threads=False, quiet=False, station=None,
self._run_wrapper()
ds = self.data_set
finally:
ActiveLoop._is_stopped = False
if not quiet:
print(repr(self.data_set))
print(datetime.now().strftime('Finished at %Y-%m-%d %H:%M:%S'))
Expand Down Expand Up @@ -901,6 +949,9 @@ def _run_loop(self, first_delay=0, action_indices=(),
if t - last_task >= self.bg_min_delay:
try:
self.bg_task()
except _QcodesBreak:
log.error('QCodes break raise, stopping')
break
except Exception:
if self.last_task_failed:
self.bg_task = None
Expand All @@ -911,8 +962,11 @@ def _run_loop(self, first_delay=0, action_indices=(),

# run the background task one last time to catch the last setpoint(s)
if self.bg_task is not None:
log.debug('Running the background task one last time.')
self.bg_task()
try:
log.debug('Running the background task one last time.')
self.bg_task()
except _QcodesBreak:
pass

# the loop is finished - run the .then actions
#log.debug('Finishing loop, running the .then actions...')
Expand All @@ -930,3 +984,7 @@ def _wait(self, delay):
finish_clock = time.perf_counter() + delay
t = wait_secs(finish_clock)
time.sleep(t)


def stop():
ActiveLoop._is_stopped = True
92 changes: 92 additions & 0 deletions qcodes/utils/threading.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@
# That way the things we call need not be rewritten explicitly async.

import threading
import ctypes
import time
from collections import Iterable
import logging


logger = logging.getLogger(__name__)


class RespondingThread(threading.Thread):
Expand Down Expand Up @@ -72,3 +79,88 @@ def thread_map(callables, args=None, kwargs=None):
t.start()

return [t.output() for t in threads]


class PeriodicThread(threading.Thread):
"""
Creates a thread that periodically calls functions at specified interval.
The thread can be started, paused, and stopped using its methods.

Args:
callables (list[callable]): list of callable functions.
args/kwargs cannot be passed
interval (float): interval between successive calls (in seconds)
name (str): thread name, used to distinguish it from other threads
max_threads(int): maximum number of threads with same name before
emitting a warning
auto_start (bool): If True, start periodic calling of functions
after waiting for interval.
"""
def __init__(self, callables, interval, name=None, max_threads=None,
auto_start=True):
super().__init__(name=name)
self._is_paused = False

if not isinstance(callables, Iterable):
callables = [callables]
self.callables = callables

self.interval = interval
if max_threads is not None:
active_threads = sum(thread.getName()==name
for thread in threading.enumerate())
if active_threads > max_threads:
logger.warning('Found {} active periodic threads'.format(
active_threads))

if auto_start:
time.sleep(interval)
self.start()

def run(self):
while not self._is_stopped:
if not self._is_paused:
for callable in self.callables:
callable()
time.sleep(self.interval)
else:
logger.warning('Periodic thread stopped')

def pause(self):
self._is_paused = True

def unpause(self):
self._is_paused = False

def halt(self):
self._is_stopped = True


def _async_raise(tid, excobj):
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(excobj))
if res == 0:
raise ValueError("nonexistent thread id")
elif res > 1:
# """if it returns a number greater than one, you're in trouble,
# and you should call it again with exc=NULL to revert the effect"""
ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, 0)
raise SystemError("PyThreadState_SetAsyncExc failed")


class KillableThread(threading.Thread):
"""
A thread that can be forcibly terminated via `KillableThread.terminate()`.
Is potentially unsafe and should only be used as a last resort.
A preferrable stopping method would be by raising a stop flag in the code.
"""
def raise_exc(self, excobj):
assert self.isAlive(), "thread must be started"
for tid, tobj in threading._active.items():
if tobj is self:
_async_raise(tid, excobj)
return

def terminate(self):
# must raise the SystemExit type, instead of a SystemExit() instance
# due to a bug in PyThreadState_SetAsyncExc
self.raise_exc(SystemExit)