From dd2978d66c52ba8e80639b6d9a3749d84a8bdf68 Mon Sep 17 00:00:00 2001 From: Russell Anderson <5637107+rpanderson@users.noreply.github.com> Date: Fri, 12 Jun 2020 18:45:31 +1000 Subject: [PATCH] Initialise with AnalogOut and DigitalOuts + add runviewer_parser --- labscript_devices/DummyIntermediateDevice.py | 209 +++++++++++++++++-- 1 file changed, 189 insertions(+), 20 deletions(-) diff --git a/labscript_devices/DummyIntermediateDevice.py b/labscript_devices/DummyIntermediateDevice.py index 768fa015..800d6e6a 100644 --- a/labscript_devices/DummyIntermediateDevice.py +++ b/labscript_devices/DummyIntermediateDevice.py @@ -15,27 +15,67 @@ # This file represents a dummy labscript device for purposes of testing BLACS # and labscript. The device is a Intermediate Device, and can be attached to # a pseudoclock in labscript in order to test the pseudoclock behaviour -# without needing a real Intermediate Device. -# +# without needing a real Intermediate Device. +# # You can attach an arbitrary number of outputs to this device, however we -# currently only support outputs of type AnalogOut and DigitalOut. I would be +# currently only support outputs of type AnalogOut and DigitalOut. It would be # easy to extend this is anyone needed further functionality. -from labscript_devices import labscript_device, BLACS_tab, BLACS_worker -from labscript import IntermediateDevice, DigitalOut, AnalogOut, config +from labscript_devices import ( + BLACS_tab, + runviewer_parser, +) +from labscript import ( + IntermediateDevice, + DigitalOut, + AnalogOut, + config, + set_passed_properties, +) +from labscript_devices.NI_DAQmx.utils import split_conn_AO, split_conn_DO import numpy as np +import labscript_utils.h5_lock # noqa: F401 +import h5py + +from blacs.device_base_class import DeviceTab +from blacs.tab_base_classes import Worker + class DummyIntermediateDevice(IntermediateDevice): description = 'Dummy IntermediateDevice' - clock_limit = 1e6 - # If this is updated, then you need to update generate_code to support whatever types you add + # If this is updated, then you need to update generate_code to support whatever + # types you add allowed_children = [DigitalOut, AnalogOut] - def __init__(self, name, parent_device, BLACS_connection='dummy_connection', **kwargs): - self.BLACS_connection = BLACS_connection + @set_passed_properties( + property_names={ + "connection_table_properties": [ + "AO_range", + "num_AO", + "ports", + "clock_limit", + ], + "device_properties": [], + } + ) + def __init__( + self, + name, + parent_device=None, + AO_range=[-10.0, 10.0], + num_AO=4, + ports={'port0': {'num_lines': 32, 'supports_buffered': True}}, + clock_limit=1e6, + **kwargs, + ): + self.AO_range = AO_range + self.num_AO = num_AO + self.ports = ports if ports is not None else {} + self.clock_limit = clock_limit + self.BLACS_connection = 'dummy_connection' IntermediateDevice.__init__(self, name, parent_device, **kwargs) def generate_code(self, hdf5_file): @@ -54,43 +94,172 @@ def generate_code(self, hdf5_file): device_dtype = np.int8 elif isinstance(device, AnalogOut): device_dtype = np.float64 - dtypes.append((device.name, device_dtype)) + dtypes.append((device.connection, device_dtype)) # create dataset out_table = np.zeros(len(times), dtype=dtypes) for device in self.child_devices: - out_table[device.name][:] = device.raw_output + out_table[device.connection][:] = device.raw_output group.create_dataset('OUTPUTS', compression=config.compression, data=out_table) -from blacs.device_base_class import DeviceTab -from blacs.tab_base_classes import Worker - @BLACS_tab class DummyIntermediateDeviceTab(DeviceTab): def initialise_GUI(self): - self.create_worker("main_worker",DummyIntermediateDeviceWorker,{}) + # Get capabilities from connection table properties: + connection_table = self.settings['connection_table'] + properties = connection_table.find_by_name(self.device_name).properties + + num_AO = properties['num_AO'] + # num_DO = properties['num_DO'] + ports = properties['ports'] + + AO_base_units = 'V' + if num_AO > 0: + AO_base_min, AO_base_max = properties['AO_range'] + else: + AO_base_min, AO_base_max = None, None + AO_base_step = 0.1 + AO_base_decimals = 3 + + # Create output objects: + AO_prop = {} + for i in range(num_AO): + AO_prop['ao%d' % i] = { + 'base_unit': AO_base_units, + 'min': AO_base_min, + 'max': AO_base_max, + 'step': AO_base_step, + 'decimals': AO_base_decimals, + } + + DO_proplist = [] + DO_hardware_names = [] + for port_num in range(len(ports)): + port_str = 'port%d' % port_num + port_props = {} + for line in range(ports[port_str]['num_lines']): + hardware_name = 'port%d/line%d' % (port_num, line) + port_props[hardware_name] = {} + DO_hardware_names.append(hardware_name) + DO_proplist.append((port_str, port_props)) + + # Create the output objects + self.create_analog_outputs(AO_prop) + + # Create widgets for outputs defined so far (i.e. analog outputs only) + _, AO_widgets, _ = self.auto_create_widgets() + + # now create the digital output objects one port at a time + for _, DO_prop in DO_proplist: + self.create_digital_outputs(DO_prop) + + # Manually create the digital output widgets so they are grouped separately + DO_widgets_by_port = {} + for port_str, DO_prop in DO_proplist: + DO_widgets_by_port[port_str] = self.create_digital_widgets(DO_prop) + + # Auto place the widgets in the UI, specifying sort keys for ordering them: + widget_list = [("Analog outputs", AO_widgets, split_conn_AO)] + for port_num in range(len(ports)): + port_str = 'port%d' % port_num + DO_widgets = DO_widgets_by_port[port_str] + name = "Digital outputs: %s" % port_str + if ports[port_str]['supports_buffered']: + name += ' (buffered)' + else: + name += ' (static)' + widget_list.append((name, DO_widgets, split_conn_DO)) + self.auto_place_widgets(*widget_list) + + # Create and set the primary worker + self.create_worker( + "main_worker", + DummyIntermediateDeviceWorker, + { + 'Vmin': AO_base_min, + 'Vmax': AO_base_max, + 'num_AO': num_AO, + 'ports': ports, + 'DO_hardware_names': DO_hardware_names, + }, + ) self.primary_worker = "main_worker" + # Set the capabilities of this device + self.supports_remote_value_check(False) + self.supports_smart_programming(False) + + class DummyIntermediateDeviceWorker(Worker): def init(self): pass + def get_output_table(self, h5file, device_name): + """Return the OUTPUT table from the file, or None if it does not exist.""" + with h5py.File(h5file, 'r') as hdf5_file: + group = hdf5_file['devices'][device_name] + try: + return group['OUTPUTS'][:] + except KeyError: + return None + def program_manual(self, front_panel_values): - return front_panel_values + return front_panel_values def transition_to_buffered(self, device_name, h5file, initial_values, fresh): - return initial_values + # Get the data to be programmed into the output tasks: + outputs = self.get_output_table(h5file, device_name) + + # Collect the final values of the outputs + final_values = dict(zip(outputs.dtype.names, outputs[-1])) - def transition_to_manual(self,abort = False): + return final_values + + def transition_to_manual(self, abort=False): return True def abort_transition_to_buffered(self): return self.transition_to_manual(True) - + def abort_buffered(self): return self.transition_to_manual(True) def shutdown(self): - pass \ No newline at end of file + pass + + +@runviewer_parser +class DummyIntermediateDeviceParser(object): + def __init__(self, path, device): + self.path = path + self.name = device.name + self.device = device + + def get_traces(self, add_trace, clock=None): + times, clock_value = clock[0], clock[1] + + clock_indices = np.where((clock_value[1:] - clock_value[:-1]) == 1)[0] + 1 + # If initial clock value is 1, then this counts as a rising edge (clock should + # be 0 before experiment) but this is not picked up by the above code. So we + # insert it! + if clock_value[0] == 1: + clock_indices = np.insert(clock_indices, 0, 0) + clock_ticks = times[clock_indices] + + # Get the output table from the experiment shot file + with h5py.File(self.path, 'r') as hdf5_file: + outputs = hdf5_file[f"devices/{self.name}/OUTPUTS"][:] + + traces = {} + + for channel in outputs.dtype.names: + traces[channel] = (clock_ticks, outputs[channel]) + + for channel_name, channel in self.device.child_list.items(): + if channel.parent_port in traces: + trace = traces[channel.parent_port] + add_trace(channel_name, trace, self.name, channel.parent_port) + + return {}