Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement automatic socket reconnection for LS372 and SCPI agents #325

Open
wants to merge 19 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
222 changes: 113 additions & 109 deletions agents/lakeshore372/LS372_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,122 +280,126 @@ def acq(self, session, params=None):
f"currently held by {self._lock.job}.")
continue

if self.fake_data:
data = {
'timestamp': time.time(),
'block_name': 'fake-data',
'data': {}
}
for therm in self.thermometers:
reading = np.random.normal(self.res, 20)
data['data'][therm] = reading
time.sleep(.1)

else:
active_channel = self.module.get_active_channel()

# The 372 reports the last updated measurement repeatedly
# during the "pause change time", this results in several
# stale datapoints being recorded. To get around this we
# query the pause time and skip data collection during it
# if the channel has changed (as it would if autoscan is
# enabled.)
if previous_channel != active_channel:
if previous_channel is not None:
pause_time = active_channel.get_pause()
self.log.debug("Pause time for {c}: {p}",
c=active_channel.channel_num,
p=pause_time)

dwell_time = active_channel.get_dwell()
self.log.debug("User set dwell_time_delay: {p}",
p=self.dwell_time_delay)

# Check user set dwell time isn't too long
if self.dwell_time_delay > dwell_time:
self.log.warn("WARNING: User set dwell_time_delay of "
+ "{delay} s is larger than channel "
+ "dwell time of {chan_time} s. If "
+ "you are autoscanning this will "
+ "cause no data to be collected. "
+ "Reducing dwell time delay to {s} s.",
delay=self.dwell_time_delay,
chan_time=dwell_time,
s=dwell_time - 1)
total_time = pause_time + dwell_time - 1
else:
total_time = pause_time + self.dwell_time_delay

for i in range(total_time):
self.log.debug("Sleeping for {t} more seconds...",
t=total_time - i)
time.sleep(1)

# Track the last channel we measured
previous_channel = self.module.get_active_channel()

current_time = time.time()
data = {
'timestamp': current_time,
'block_name': active_channel.name,
'data': {}
}

# Collect both temperature and resistance values from each Channel
channel_str = active_channel.name.replace(' ', '_')
temp_reading = self.module.get_temp(unit='kelvin',
chan=active_channel.channel_num)
res_reading = self.module.get_temp(unit='ohms',
chan=active_channel.channel_num)

# For data feed
data['data'][channel_str + '_T'] = temp_reading
data['data'][channel_str + '_R'] = res_reading
session.app.publish_to_feed('temperatures', data)
self.log.debug("{data}", data=session.data)

# For session.data
field_dict = {channel_str: {"T": temp_reading,
"R": res_reading,
"timestamp": current_time}}
session.data['fields'].update(field_dict)

# Also queries control channel if enabled
if self.control_chan_enabled:
temp = self.module.get_temp(unit='kelvin', chan=0)
res = self.module.get_temp(unit='ohms', chan=0)
cur_time = time.time()
try:
if self.fake_data:
data = {
'timestamp': time.time(),
'block_name': 'control_chan',
'data': {
'control_T': temp,
'control_R': res
}
'block_name': 'fake-data',
'data': {}
}
for therm in self.thermometers:
reading = np.random.normal(self.res, 20)
data['data'][therm] = reading
time.sleep(.1)

else:
active_channel = self.module.get_active_channel()

# The 372 reports the last updated measurement repeatedly
# during the "pause change time", this results in several
# stale datapoints being recorded. To get around this we
# query the pause time and skip data collection during it
# if the channel has changed (as it would if autoscan is
# enabled.)
if previous_channel != active_channel:
if previous_channel is not None:
pause_time = active_channel.get_pause()
self.log.debug("Pause time for {c}: {p}",
c=active_channel.channel_num,
p=pause_time)

dwell_time = active_channel.get_dwell()
self.log.debug("User set dwell_time_delay: {p}",
p=self.dwell_time_delay)

# Check user set dwell time isn't too long
if self.dwell_time_delay > dwell_time:
self.log.warn("WARNING: User set dwell_time_delay of "
+ "{delay} s is larger than channel "
+ "dwell time of {chan_time} s. If "
+ "you are autoscanning this will "
+ "cause no data to be collected. "
+ "Reducing dwell time delay to {s} s.",
delay=self.dwell_time_delay,
chan_time=dwell_time,
s=dwell_time - 1)
total_time = pause_time + dwell_time - 1
else:
total_time = pause_time + self.dwell_time_delay

for i in range(total_time):
self.log.debug("Sleeping for {t} more seconds...",
t=total_time - i)
time.sleep(1)

# Track the last channel we measured
previous_channel = self.module.get_active_channel()

current_time = time.time()
data = {
'timestamp': current_time,
'block_name': active_channel.name,
'data': {}
}

# Collect both temperature and resistance values from each Channel
channel_str = active_channel.name.replace(' ', '_')
temp_reading = self.module.get_temp(unit='kelvin',
chan=active_channel.channel_num)
res_reading = self.module.get_temp(unit='ohms',
chan=active_channel.channel_num)

# For data feed
data['data'][channel_str + '_T'] = temp_reading
data['data'][channel_str + '_R'] = res_reading
session.app.publish_to_feed('temperatures', data)
self.log.debug("{data}", data=session.data)
# Updates session data w/ control field
session.data['fields'].update({
'control': {
'T': temp, 'R': res, 'timestamp': cur_time

# For session.data
field_dict = {channel_str: {"T": temp_reading,
"R": res_reading,
"timestamp": current_time}}
session.data['fields'].update(field_dict)

# Also queries control channel if enabled
if self.control_chan_enabled:
temp = self.module.get_temp(unit='kelvin', chan=0)
res = self.module.get_temp(unit='ohms', chan=0)
cur_time = time.time()
data = {
'timestamp': time.time(),
'block_name': 'control_chan',
'data': {
'control_T': temp,
'control_R': res
}
}
})

if params.get("sample_heater", False):
# Sample Heater
heater = self.module.sample_heater
hout = heater.get_sample_heater_output()

current_time = time.time()
htr_data = {
'timestamp': current_time,
'block_name': "heaters",
'data': {}
}
htr_data['data']['sample_heater_output'] = hout
session.app.publish_to_feed('temperatures', data)
self.log.debug("{data}", data=session.data)
# Updates session data w/ control field
session.data['fields'].update({
'control': {
'T': temp, 'R': res, 'timestamp': cur_time
}
})

if params.get("sample_heater", False):
# Sample Heater
heater = self.module.sample_heater
hout = heater.get_sample_heater_output()

current_time = time.time()
htr_data = {
'timestamp': current_time,
'block_name': "heaters",
'data': {}
}
htr_data['data']['sample_heater_output'] = hout

session.app.publish_to_feed('temperatures', htr_data)
session.app.publish_to_feed('temperatures', htr_data)
except ConnectionResetError as e:
print(f"Skipping acq iteration due to: {e}")
continue

if params['run_once']:
break
Expand Down
12 changes: 9 additions & 3 deletions agents/scpi_psu/scpi_psu_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,15 @@ def monitor_output(self, session, params=None):
'data': {}
}

for chan in channels:
data['data']["Voltage_{}".format(chan)] = self.psu.get_volt(chan)
data['data']["Current_{}".format(chan)] = self.psu.get_curr(chan)
try:
for chan in channels:
data['data']["Voltage_{}".format(chan)] = self.psu.get_volt(chan)
data['data']["Current_{}".format(chan)] = self.psu.get_curr(chan)
except ConnectionResetError as e:
print(
f"Failed to get volt/curr in monitor_output, skipping this iteration: {e}"
)
continue

# self.log.info(str(data))
# print(data)
Expand Down
63 changes: 58 additions & 5 deletions socs/Lakeshore/Lakeshore372.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import sys
import socket
import select
import time
import numpy as np

# Lookup keys for command parameters.
Expand Down Expand Up @@ -215,6 +217,8 @@ def __init__(self, ip, timeout=10, num_channels=16):
self.com = _establish_socket_connection(ip, timeout)
self.num_channels = num_channels

self.reset = lambda: self.__init__(ip, timeout, num_channels)

self.id = self.get_id()
self.autoscan = self.get_autoscan()
# Enable all channels
Expand All @@ -234,6 +238,55 @@ def __init__(self, ip, timeout=10, num_channels=16):
self.sample_heater = Heater(self, 0)
self.still_heater = Heater(self, 2)

def connection_check(self, op, timeout=5):
    """Check that the socket is ready for ``op`` before it is used.

    Waits in ``select.select`` for up to ``timeout`` seconds for
    ``self.com`` to become readable (``op='read'``) or writable
    (``op='write'``). If the socket is not ready when the wait ends,
    ``self.disconnect_handler`` is called with a reason string.

    Parameters
    ----------
    op : str
        Either 'read' or 'write'.
    timeout : float, optional
        Seconds to wait in ``select.select``. Defaults to 5.

    Raises
    ------
    ValueError
        If ``op`` is neither 'read' nor 'write'.
    Exception
        If ``select.select`` itself fails with an OS-level error.

    """
    if op not in ('read', 'write'):
        # Raised explicitly (not asserted) so the check survives ``python -O``.
        raise ValueError("'op' must be 'read' or 'write'")

    read_list = [self.com] if op == 'read' else []
    write_list = [self.com] if op == 'write' else []
    try:
        ready_to_read, ready_to_write, _ = select.select(
            read_list, write_list, [], timeout)
    except OSError as e:  # select.error is an alias of OSError
        # TODO: reconnecting here instead of raising still needs to be
        # tested on real hardware.
        raise Exception("Triggered select.error block unexpectedly") from e

    if op == 'read' and not ready_to_read:
        self.disconnect_handler("No sockets ready for reading")
    elif op == 'write' and not ready_to_write:
        self.disconnect_handler("No sockets ready for writing")

def disconnect_handler(self, reset_reason, max_attempts=500, retry_delay=1):
    """Try to re-establish the connection, then signal the reset.

    Calls ``self.reset()`` up to ``max_attempts`` times, sleeping
    ``retry_delay`` seconds after each failed attempt. This method never
    returns normally: once a reset succeeds it raises
    ``ConnectionResetError`` so the caller can abandon the interrupted
    operation.

    Parameters
    ----------
    reset_reason : str
        Description of what triggered the reconnect; used as the message
        of the ``ConnectionResetError``.
    max_attempts : int, optional
        Maximum number of reset attempts. Defaults to 500.
    retry_delay : float, optional
        Seconds to sleep between failed attempts. Defaults to 1.

    Raises
    ------
    ConnectionResetError
        After a successful reconnect.
    AssertionError
        If every attempt failed with a socket error.
    ValueError
        If ``max_attempts`` is less than 1.

    """
    if max_attempts < 1:
        raise ValueError("'max_attempts' must be at least 1")

    for i in range(max_attempts):
        # Release the previous socket before reset() rebinds self.com, so
        # repeated attempts do not pile up open sockets.
        stale = getattr(self, 'com', None)
        if stale is not None:
            try:
                stale.close()
            except OSError:
                # Best effort: the socket is being discarded anyway.
                pass

        try:
            self.reset()
            break
        except socket.error as e:
            print(f"Reconnect attempt #{i} failed with: {e}")
            if i == max_attempts - 1:
                # Explicit raise: ``assert False`` is stripped under
                # ``python -O`` and would fall through to the success path.
                raise AssertionError("Could not reconnect") from e
            time.sleep(retry_delay)

    print(f"Successfully reconnected on attempt #{i}")
    raise ConnectionResetError(reset_reason)  # should be caught by agent

def write(self, message):
    """Send ``message`` over the socket, terminated with CRLF.

    The socket is first checked for writability via
    ``self.connection_check('write')``. A socket error during the send is
    handed to ``self.disconnect_handler``.

    Parameters
    ----------
    message : str
        Command string to send; ``'\\r\\n'`` is appended before encoding.

    """
    self.connection_check('write')
    payload = f'{message}\r\n'.encode()
    try:
        # sendall() keeps sending until the whole payload is out; a bare
        # send() may transmit only part of it and just report the count.
        self.com.sendall(payload)
    except socket.error as e:
        self.disconnect_handler(f"Socket write failed (disconnect?): {e}")

def read(self):
    """Read one response from the socket and return it as a string.

    The socket is first checked for readability via
    ``self.connection_check('read')``. An empty ``recv`` result or a
    socket error is handed to ``self.disconnect_handler``.

    Returns
    -------
    str
        Up to 4096 bytes received from the socket, decoded as UTF-8 with
        surrounding whitespace stripped.

    Raises
    ------
    socket.timeout
        Propagated unchanged so the caller can retry the read.

    """
    self.connection_check('read')
    data = b''
    try:
        data = self.com.recv(4096)
    except socket.timeout:
        # Not a disconnect: let the caller's retry logic handle it.
        raise
    except socket.error as e:
        # Mirror write(): any other socket failure triggers a reconnect.
        self.disconnect_handler(f"Socket read failed (disconnect?): {e}")
    else:
        if not data:
            self.disconnect_handler("Received no data from socket recv")
    return str(data, 'utf-8').strip()

def msg(self, message):
"""Send message to the Lakeshore 372 over ethernet.

Expand All @@ -252,23 +305,23 @@ def msg(self, message):
Response string from the Lakeshore, if any. Else, an empty string.

"""
msg_str = f'{message}\r\n'.encode()

if '?' in message:
self.com.send(msg_str)
self.write(message)
# Try once, if we timeout, try again. Usually gets around single event glitches.
for attempt in range(2):
try:
resp = str(self.com.recv(4096), 'utf-8').strip()
resp = self.read()
break
except socket.timeout:
print("Warning: Caught timeout waiting for response to '%s', trying again "
"before giving up" % message)
if attempt == 1:
raise RuntimeError('Query response to Lakeshore timed out after two '
'attempts. Check connection.')
except ConnectionResetError:
raise
else:
self.com.send(msg_str)
self.write(message)
resp = ''

return resp
Expand Down
Loading