diff --git a/labscript/labscript.py b/labscript/labscript.py index 1271d8b..6b07706 100644 --- a/labscript/labscript.py +++ b/labscript/labscript.py @@ -658,11 +658,22 @@ def __init__(self, name, parent_device, **kwargs): raise LabscriptError('Error instantiating device %s. The parent (%s) must be an instance of ClockLine.'%(name, parent_device_name)) Device.__init__(self, name, parent_device, 'internal', **kwargs) # This 'internal' should perhaps be more descriptive? + @property + def minimum_clock_high_time(self): + if getattr(self, "clock_limit", None) is None: + return 0 + + # Convert clock limit to minimum pulse period and then divide by 2 to + # get minimum half period. This is the fastest assuming the minimum high + # time corresponds to half the fastest clock pulse supported. + return 1/self.clock_limit/2 + class ClockLine(Device): description = 'Generic ClockLine' allowed_children = [IntermediateDevice] _clock_limit = None + _minimum_clock_high_time = 0 @set_passed_properties(property_names = {}) def __init__(self, name, pseudoclock, connection, ramping_allowed = True, **kwargs): @@ -675,6 +686,10 @@ def add_device(self, device): Device.add_device(self, device) if getattr(device, 'clock_limit', None) is not None and (self._clock_limit is None or device.clock_limit < self.clock_limit): self._clock_limit = device.clock_limit + if getattr(device, 'minimum_clock_high_time', None) is not None: + self._minimum_clock_high_time = max( + device.minimum_clock_high_time, self._minimum_clock_high_time + ) # define a property to make sure no children overwrite this value themselves # The calculation of maximum clock_limit should be done by the add_device method above @@ -689,6 +704,10 @@ def clock_limit(self): return self.parent_device.clock_limit return self._clock_limit + @property + def minimum_clock_high_time(self): + return self._minimum_clock_high_time + class Pseudoclock(Device): """Parent class of all pseudoclocks. @@ -757,113 +776,147 @@ def collect_change_times(self, all_outputs, outputs_by_clockline): change_times[clock_line].extend(output_change_times) all_change_times.extend(output_change_times) ramps_by_clockline[clock_line].extend(output.get_ramp_times()) - - # print 'initial_change_times for %s: %s'%(clock_line.name,change_times[clock_line]) - + # Change to a set and back to get rid of duplicates: if not all_change_times: all_change_times.append(0) all_change_times.append(self.parent_device.stop_time) - # include trigger times in change_times, so that pseudoclocks always have an instruction immediately following a wait: + # include trigger times in change_times, so that pseudoclocks always + # have an instruction immediately following a wait: all_change_times.extend(self.parent_device.trigger_times) - - #################################################################################################### - # Find out whether any other clockline has a change time during a ramp on another clockline. # - # If it does, we need to let the ramping clockline know it needs to break it's loop at that time # - #################################################################################################### + + ######################################################################## + # Find out whether any other clockline has a change time during a ramp # + # on another clockline. If it does, we need to let the ramping # + # clockline know it needs to break it's loop at that time # + ######################################################################## # convert all_change_times to a numpy array all_change_times_numpy = array(all_change_times) - + # quantise the all change times to the pseudoclock clock resolution - # all_change_times_numpy = (all_change_times_numpy/self.clock_resolution).round()*self.clock_resolution - all_change_times_numpy = self.quantise_to_pseudoclock(all_change_times_numpy) - + all_change_times_numpy = self.quantise_to_pseudoclock( + all_change_times_numpy + ) + # Loop through each clockline - # print ramps_by_clockline for clock_line, ramps in ramps_by_clockline.items(): # for each clockline, loop through the ramps on that clockline for ramp_start_time, ramp_end_time in ramps: - # for each ramp, check to see if there is a change time in all_change_times which intersects - # with the ramp. If there is, add a change time into this clockline at that point - indices = np.where((ramp_start_time < all_change_times_numpy) & (all_change_times_numpy < ramp_end_time)) + # for each ramp, check to see if there is a change time in + # all_change_times which intersects with the ramp. If there is, + # add a change time into this clockline at that point + indices = np.where( + (ramp_start_time < all_change_times_numpy) & + (all_change_times_numpy < ramp_end_time) + ) for idx in indices[0]: change_times[clock_line].append(all_change_times_numpy[idx]) - + # Get rid of duplicates: all_change_times = list(set(all_change_times_numpy)) all_change_times.sort() - + # Check that the pseudoclock can handle updates this fast for i, t in enumerate(all_change_times[:-1]): dt = all_change_times[i+1] - t if dt < 1.0/self.clock_limit: - raise LabscriptError('Commands have been issued to devices attached to %s at t= %s s and %s s. '%(self.name, str(t),str(all_change_times[i+1])) + - 'This Pseudoclock cannot support update delays shorter than %s sec.'%(str(1.0/self.clock_limit))) + raise LabscriptError( + "Commands have been issued to devices attached to " + f"{self.name} at t={t} and t={all_change_times[i+1]}. " + "This Pseudoclock cannot support update delays shorter " + f"than {1.0/self.clock_limit} seconds." + ) - #################################################################################################### - # For each clockline, make sure we have a change time for triggers, stop_time, t=0 and # - # check that no change tiems are too close together # - #################################################################################################### + ######################################################################## + # For each clockline, make sure we have a change time for triggers, # + # stop_time, t=0 and check that no change times are too close together # + ######################################################################## for clock_line, change_time_list in change_times.items(): - # include trigger times in change_times, so that pseudoclocks always have an instruction immediately following a wait: + # include trigger times in change_times, so that pseudoclocks always + # have an instruction immediately following a wait: change_time_list.extend(self.parent_device.trigger_times) - + # If the device has no children, we still need it to have a # single instruction. So we'll add 0 as a change time: if not change_time_list: change_time_list.append(0) - + # quantise the all change times to the pseudoclock clock resolution - # change_time_list = (array(change_time_list)/self.clock_resolution).round()*self.clock_resolution change_time_list = self.quantise_to_pseudoclock(change_time_list) - + # Get rid of duplicates if trigger times were already in the list: change_time_list = list(set(change_time_list)) change_time_list.sort() + # Also add the stop time as as change time. First check that it + # isn't too close to the time of the last instruction: + if not self.parent_device.stop_time in change_time_list: + dt = self.parent_device.stop_time - change_time_list[-1] + if abs(dt) < 1.0/clock_line.clock_limit: + raise LabscriptError( + "The stop time of the experiment is " + f"t={self.parent_device.stop_time}, but the last " + f"instruction for a device attached to {self.name} is " + f"at t={change_time_list[-1]}. One or more connected " + "devices cannot support update delays shorter than " + f"{1.0/clock_line.clock_limit} seconds. Please set the " + "stop_time a bit later." + ) + + change_time_list.append(self.parent_device.stop_time) + + # Sort change times so self.stop_time will be in the middle + # somewhere if it is prior to the last actual instruction. + # Whilst this means the user has set stop_time in error, not + # catching the error here allows it to be caught later by the + # specific device that has more instructions after + # self.stop_time. Thus we provide the user with sligtly more + # detailed error info. + change_time_list.sort() + # index to keep track of in all_change_times j = 0 # Check that no two instructions are too close together: for i, t in enumerate(change_time_list[:-1]): dt = change_time_list[i+1] - t if dt < 1.0/clock_line.clock_limit: - raise LabscriptError('Commands have been issued to devices attached to clockline %s at t= %s s and %s s. '%(clock_line.name, str(t),str(change_time_list[i+1])) + - 'One or more connected devices on ClockLine %s cannot support update delays shorter than %s sec.'%(clock_line.name, str(1.0/clock_line.clock_limit))) - + raise LabscriptError( + "Commands have been issued to devices attached to " + f"clockline {clock_line.name} at t={t} and " + f"t={change_time_list[i+1]}. One or more connected " + f"devices on ClockLine {clock_line.name} cannot " + "support update delays shorter than " + f"{1.0/clock_line.clock_limit} seconds" + ) + all_change_times_len = len(all_change_times) # increment j until we reach the current time while all_change_times[j] < t and j < all_change_times_len-1: j += 1 # j should now index all_change_times at "t" - # Check that the next all change_time is not too close (and thus would force this clock tick to be faster than the clock_limit) + # Check that the next all change_time is not too close (and thus + # would force this clock tick to be faster than the minimum + # clock high time) dt = all_change_times[j+1] - t - if dt < 1.0/clock_line.clock_limit: - raise LabscriptError('Commands have been issued to devices attached to %s at t= %s s and %s s. '%(self.name, str(t),str(all_change_times[j+1])) + - 'One or more connected devices on ClockLine %s cannot support update delays shorter than %s sec.'%(clock_line.name, str(1.0/clock_line.clock_limit))) - - # Also add the stop time as as change time. First check that it isn't too close to the time of the last instruction: - if not self.parent_device.stop_time in change_time_list: - dt = self.parent_device.stop_time - change_time_list[-1] - if abs(dt) < 1.0/clock_line.clock_limit: - raise LabscriptError('The stop time of the experiment is t= %s s, but the last instruction for a device attached to %s is at t= %s s. '%( str(self.stop_time), self.name, str(change_time_list[-1])) + - 'One or more connected devices cannot support update delays shorter than %s sec. Please set the stop_time a bit later.'%str(1.0/clock_line.clock_limit)) - - change_time_list.append(self.parent_device.stop_time) - - # Sort change times so self.stop_time will be in the middle - # somewhere if it is prior to the last actual instruction. Whilst - # this means the user has set stop_time in error, not catching - # the error here allows it to be caught later by the specific - # device that has more instructions after self.stop_time. Thus - # we provide the user with sligtly more detailed error info. - change_time_list.sort() - - # because we made the list into a set and back to a list, it is now a different object - # so modifying it won't update the list in the dictionary. - # So store the updated list in the dictionary + if dt < (2 * clock_line.minimum_clock_high_time): + raise LabscriptError( + "Commands have been issued to devices attached to " + f"{self.name} at t={t} and t={all_change_times[j+1]}. " + "One or more connected devices on ClockLine " + f"{clock_line.name} cannot support clock ticks with a " + "digital high time shorter than " + f"{clock_line.minimum_clock_high_time} which is more " + "than half the available time between the event at " + f"t={t} on ClockLine {clock_line.name} and the next " + "event on another ClockLine." + ) + + # because we made the list into a set and back to a list, it is now + # a different object so modifying it won't update the list in the + # dictionary. So store the updated list in the dictionary change_times[clock_line] = change_time_list return all_change_times, change_times - + def expand_change_times(self, all_change_times, change_times, outputs_by_clockline): """For each time interval delimited by change_times, constructs an array of times at which the clock for this device needs to @@ -2412,7 +2465,11 @@ def go_high(self): self.add_instruction(0,1) self._static_value = 1 else: - raise LabscriptError('%s %s has already been set to %s. It cannot also be set to %s.'%(self.description, self.name, self.instruction_to_string[self._static_value], self.instruction_to_string[value])) + raise LabscriptError( + f"{self.description} {self.name} has already been set to " + f"{self.instruction_to_string(self._static_value)}. It cannot " + "also be set to 1." + ) def go_low(self): """Command a static low output. @@ -2424,7 +2481,11 @@ def go_low(self): self.add_instruction(0,0) self._static_value = 0 else: - raise LabscriptError('%s %s has already been set to %s. It cannot also be set to %s.'%(self.description, self.name, self.instruction_to_string[self._static_value], self.instruction_to_string[value])) + raise LabscriptError( + f"{self.description} {self.name} has already been set to " + f"{self.instruction_to_string(self._static_value)}. It cannot " + "also be set to 0." + ) def get_change_times(self): """Enforces no change times.