diff --git a/agents/lakeshore372/LS372_agent.py b/agents/lakeshore372/LS372_agent.py index b92ed4123..399a2db87 100644 --- a/agents/lakeshore372/LS372_agent.py +++ b/agents/lakeshore372/LS372_agent.py @@ -280,122 +280,126 @@ def acq(self, session, params=None): f"currently held by {self._lock.job}.") continue - if self.fake_data: - data = { - 'timestamp': time.time(), - 'block_name': 'fake-data', - 'data': {} - } - for therm in self.thermometers: - reading = np.random.normal(self.res, 20) - data['data'][therm] = reading - time.sleep(.1) - - else: - active_channel = self.module.get_active_channel() - - # The 372 reports the last updated measurement repeatedly - # during the "pause change time", this results in several - # stale datapoints being recorded. To get around this we - # query the pause time and skip data collection during it - # if the channel has changed (as it would if autoscan is - # enabled.) - if previous_channel != active_channel: - if previous_channel is not None: - pause_time = active_channel.get_pause() - self.log.debug("Pause time for {c}: {p}", - c=active_channel.channel_num, - p=pause_time) - - dwell_time = active_channel.get_dwell() - self.log.debug("User set dwell_time_delay: {p}", - p=self.dwell_time_delay) - - # Check user set dwell time isn't too long - if self.dwell_time_delay > dwell_time: - self.log.warn("WARNING: User set dwell_time_delay of " - + "{delay} s is larger than channel " - + "dwell time of {chan_time} s. If " - + "you are autoscanning this will " - + "cause no data to be collected. " - + "Reducing dwell time delay to {s} s.", - delay=self.dwell_time_delay, - chan_time=dwell_time, - s=dwell_time - 1) - total_time = pause_time + dwell_time - 1 - else: - total_time = pause_time + self.dwell_time_delay - - for i in range(total_time): - self.log.debug("Sleeping for {t} more seconds...", - t=total_time - i) - time.sleep(1) - - # Track the last channel we measured - previous_channel = self.module.get_active_channel() - - current_time = time.time() - data = { - 'timestamp': current_time, - 'block_name': active_channel.name, - 'data': {} - } - - # Collect both temperature and resistance values from each Channel - channel_str = active_channel.name.replace(' ', '_') - temp_reading = self.module.get_temp(unit='kelvin', - chan=active_channel.channel_num) - res_reading = self.module.get_temp(unit='ohms', - chan=active_channel.channel_num) - - # For data feed - data['data'][channel_str + '_T'] = temp_reading - data['data'][channel_str + '_R'] = res_reading - session.app.publish_to_feed('temperatures', data) - self.log.debug("{data}", data=session.data) - - # For session.data - field_dict = {channel_str: {"T": temp_reading, - "R": res_reading, - "timestamp": current_time}} - session.data['fields'].update(field_dict) - - # Also queries control channel if enabled - if self.control_chan_enabled: - temp = self.module.get_temp(unit='kelvin', chan=0) - res = self.module.get_temp(unit='ohms', chan=0) - cur_time = time.time() + try: + if self.fake_data: data = { 'timestamp': time.time(), - 'block_name': 'control_chan', - 'data': { - 'control_T': temp, - 'control_R': res - } + 'block_name': 'fake-data', + 'data': {} + } + for therm in self.thermometers: + reading = np.random.normal(self.res, 20) + data['data'][therm] = reading + time.sleep(.1) + + else: + active_channel = self.module.get_active_channel() + + # The 372 reports the last updated measurement repeatedly + # during the "pause change time", this results in several + # stale datapoints being recorded. To get around this we + # query the pause time and skip data collection during it + # if the channel has changed (as it would if autoscan is + # enabled.) + if previous_channel != active_channel: + if previous_channel is not None: + pause_time = active_channel.get_pause() + self.log.debug("Pause time for {c}: {p}", + c=active_channel.channel_num, + p=pause_time) + + dwell_time = active_channel.get_dwell() + self.log.debug("User set dwell_time_delay: {p}", + p=self.dwell_time_delay) + + # Check user set dwell time isn't too long + if self.dwell_time_delay > dwell_time: + self.log.warn("WARNING: User set dwell_time_delay of " + + "{delay} s is larger than channel " + + "dwell time of {chan_time} s. If " + + "you are autoscanning this will " + + "cause no data to be collected. " + + "Reducing dwell time delay to {s} s.", + delay=self.dwell_time_delay, + chan_time=dwell_time, + s=dwell_time - 1) + total_time = pause_time + dwell_time - 1 + else: + total_time = pause_time + self.dwell_time_delay + + for i in range(total_time): + self.log.debug("Sleeping for {t} more seconds...", + t=total_time - i) + time.sleep(1) + + # Track the last channel we measured + previous_channel = self.module.get_active_channel() + + current_time = time.time() + data = { + 'timestamp': current_time, + 'block_name': active_channel.name, + 'data': {} } + + # Collect both temperature and resistance values from each Channel + channel_str = active_channel.name.replace(' ', '_') + temp_reading = self.module.get_temp(unit='kelvin', + chan=active_channel.channel_num) + res_reading = self.module.get_temp(unit='ohms', + chan=active_channel.channel_num) + + # For data feed + data['data'][channel_str + '_T'] = temp_reading + data['data'][channel_str + '_R'] = res_reading session.app.publish_to_feed('temperatures', data) self.log.debug("{data}", data=session.data) - # Updates session data w/ control field - session.data['fields'].update({ - 'control': { - 'T': temp, 'R': res, 'timestamp': cur_time + + # For session.data + field_dict = {channel_str: {"T": temp_reading, + "R": res_reading, + "timestamp": current_time}} + session.data['fields'].update(field_dict) + + # Also queries control channel if enabled + if self.control_chan_enabled: + temp = self.module.get_temp(unit='kelvin', chan=0) + res = self.module.get_temp(unit='ohms', chan=0) + cur_time = time.time() + data = { + 'timestamp': time.time(), + 'block_name': 'control_chan', + 'data': { + 'control_T': temp, + 'control_R': res + } } - }) - - if params.get("sample_heater", False): - # Sample Heater - heater = self.module.sample_heater - hout = heater.get_sample_heater_output() - - current_time = time.time() - htr_data = { - 'timestamp': current_time, - 'block_name': "heaters", - 'data': {} - } - htr_data['data']['sample_heater_output'] = hout + session.app.publish_to_feed('temperatures', data) + self.log.debug("{data}", data=session.data) + # Updates session data w/ control field + session.data['fields'].update({ + 'control': { + 'T': temp, 'R': res, 'timestamp': cur_time + } + }) + + if params.get("sample_heater", False): + # Sample Heater + heater = self.module.sample_heater + hout = heater.get_sample_heater_output() + + current_time = time.time() + htr_data = { + 'timestamp': current_time, + 'block_name': "heaters", + 'data': {} + } + htr_data['data']['sample_heater_output'] = hout - session.app.publish_to_feed('temperatures', htr_data) + session.app.publish_to_feed('temperatures', htr_data) + except ConnectionResetError as e: + print(f"Skipping acq iteration due to: {e}") + continue if params['run_once']: break diff --git a/agents/scpi_psu/scpi_psu_agent.py b/agents/scpi_psu/scpi_psu_agent.py index f7bc74400..07a602658 100644 --- a/agents/scpi_psu/scpi_psu_agent.py +++ b/agents/scpi_psu/scpi_psu_agent.py @@ -79,9 +79,15 @@ def monitor_output(self, session, params=None): 'data': {} } - for chan in channels: - data['data']["Voltage_{}".format(chan)] = self.psu.get_volt(chan) - data['data']["Current_{}".format(chan)] = self.psu.get_curr(chan) + try: + for chan in channels: + data['data']["Voltage_{}".format(chan)] = self.psu.get_volt(chan) + data['data']["Current_{}".format(chan)] = self.psu.get_curr(chan) + except ConnectionResetError as e: + print( + f"Failed to get volt/curr in monitor_output, skipping this iteration: {e}" + ) + continue # self.log.info(str(data)) # print(data) diff --git a/socs/Lakeshore/Lakeshore372.py b/socs/Lakeshore/Lakeshore372.py index 9b353a6c1..c62575203 100644 --- a/socs/Lakeshore/Lakeshore372.py +++ b/socs/Lakeshore/Lakeshore372.py @@ -2,6 +2,8 @@ import sys import socket +import select +import time import numpy as np # Lookup keys for command parameters. @@ -215,6 +217,8 @@ def __init__(self, ip, timeout=10, num_channels=16): self.com = _establish_socket_connection(ip, timeout) self.num_channels = num_channels + self.reset = lambda: self.__init__(ip, timeout, num_channels) + self.id = self.get_id() self.autoscan = self.get_autoscan() # Enable all channels @@ -234,6 +238,55 @@ def __init__(self, ip, timeout=10, num_channels=16): self.sample_heater = Heater(self, 0) self.still_heater = Heater(self, 2) + def connection_check(self, op): + assert op in ['read', 'write'], "'op' must be 'read' or 'write'" + select_lists = ([self.com, ], [], []) if op == 'read' else ([], [self.com, ], []) + try: + ready_to_read, ready_to_write, in_error = \ + select.select(*select_lists, 5) + except select.error as e: + # self.com.shutdown(2) + # self.com.close() + # print("Lakeshore372 connection error") + # self.disconnect_handler() + # self.connection_check(op) # need to test on real hardware + # return + raise Exception("Triggered select.error block unexpectedly") from e + if op == 'read' and not ready_to_read: + self.disconnect_handler("No sockets ready for reading") + elif op == 'write' and not ready_to_write: + self.disconnect_handler("No sockets ready for writing") + + def disconnect_handler(self, reset_reason): + max_attempts = 500 + for i in range(max_attempts): + try: + self.reset() + break + except socket.error as e: + print(f"Reconnect attempt #{i} failed with: {e}") + if i == max_attempts - 1: + assert False, "Could not reconnect" + time.sleep(1) + print(f"Successfully reconnected on attempt #{i}") + raise ConnectionResetError(reset_reason) # should be caught by agent + + def write(self, message): + self.connection_check('write') + msg_str = f'{message}\r\n'.encode() + try: + self.com.send(msg_str) + except socket.error as e: + self.disconnect_handler(f"Socket write failed (disconnect?): {e}") + + def read(self): + self.connection_check('read') + data = self.com.recv(4096) + if not data: + self.disconnect_handler("Received no data from socket recv") + resp = str(data, 'utf-8').strip() + return resp + def msg(self, message): """Send message to the Lakeshore 372 over ethernet. @@ -252,14 +305,12 @@ def msg(self, message): Response string from the Lakeshore, if any. Else, an empty string. """ - msg_str = f'{message}\r\n'.encode() - if '?' in message: - self.com.send(msg_str) + self.write(message) # Try once, if we timeout, try again. Usually gets around single event glitches. for attempt in range(2): try: - resp = str(self.com.recv(4096), 'utf-8').strip() + resp = self.read() break except socket.timeout: print("Warning: Caught timeout waiting for response to '%s', trying again " @@ -267,8 +318,10 @@ def msg(self, message): if attempt == 1: raise RuntimeError('Query response to Lakeshore timed out after two ' 'attempts. Check connection.') + except ConnectionResetError: + raise else: - self.com.send(msg_str) + self.write(message) resp = '' return resp diff --git a/socs/agent/prologix_interface.py b/socs/agent/prologix_interface.py index 68072512c..f9f60d835 100644 --- a/socs/agent/prologix_interface.py +++ b/socs/agent/prologix_interface.py @@ -1,5 +1,6 @@ import time import socket +import select class PrologixInterface: @@ -21,13 +22,55 @@ def configure(self): self.write('++auto 1') self.write('++addr ' + str(self.gpibAddr)) + def connection_check(self, op): + assert op in ['read', 'write'], "'op' must be 'read' or 'write'" + select_lists = ([self.sock, ], [], []) if op == 'read' else ([], [self.sock, ], []) + try: + ready_to_read, ready_to_write, in_error = \ + select.select(*select_lists, 5) + except select.error as e: + # self.sock.shutdown(2) + # self.sock.close() + # print("Lakeshore372 connection error") + # self.disconnect_handler() + # self.connection_check(op) # need to test on real hardware + # return + raise Exception("Triggered select.error block unexpectedly") from e + if op == 'read' and not ready_to_read: + self.disconnect_handler("No sockets ready for reading") + elif op == 'write' and not ready_to_write: + self.disconnect_handler("No sockets ready for writing") + def write(self, msg): + self.connection_check('write') message = msg + '\n' - self.sock.sendall(message.encode()) + try: + self.sock.sendall(message.encode()) + except socket.error as e: + self.disconnect_handler(f"Socket write failed (disconnect?): {e}") time.sleep(0.1) # Don't send messages too quickly def read(self): - return self.sock.recv(128).decode().strip() + self.connection_check('read') + data = self.sock.recv(128) + if not data: + self.disconnect_handler("Received no data from socket (disconnect?)") + return data.decode().strip() + + def disconnect_handler(self, reset_reason): + max_attempts = 500 + for i in range(max_attempts): + try: + self.conn_socket() + self.configure() + break + except socket.error as e: + print(f"Reconnect attempt #{i} failed with: {e}") + if i == max_attempts - 1: + assert False, "Could not reconnect" + time.sleep(1) + print(f"Successfully reconnected on attempt #{i}") + raise ConnectionResetError(reset_reason) # should be caught by agent def version(self): self.write('++ver') diff --git a/socs/testing/device_emulator.py b/socs/testing/device_emulator.py index dbd6be86e..811dea0b9 100644 --- a/socs/testing/device_emulator.py +++ b/socs/testing/device_emulator.py @@ -192,6 +192,13 @@ def _read_serial(self): def __del__(self): self.shutdown() + def disconnect_reconnect(self, timeout, port): + print(f"<<< Disconnecting tcp relay for {timeout} seconds >>>") + self.shutdown() + time.sleep(timeout) + print(f"<<< Re-creating tcp relay on port {port} >>>") + self.create_tcp_relay(port) + def shutdown(self): """Shutdown communication on the configured relay. This will stop any attempt to read communication on the relay, as well as shutdown the relay diff --git a/tests/integration/test_ls372_agent_integration.py b/tests/integration/test_ls372_agent_integration.py index 4996634aa..1c0c95e10 100644 --- a/tests/integration/test_ls372_agent_integration.py +++ b/tests/integration/test_ls372_agent_integration.py @@ -1,5 +1,6 @@ import os import pytest +import time import ocs from ocs.base import OpCode @@ -17,7 +18,8 @@ run_agent = create_agent_runner_fixture( '../agents/lakeshore372/LS372_agent.py', - 'ls372') + 'ls372', + args=["--log-dir", "./logs/"]) client = create_client_fixture('LSASIM') wait_for_crossbar = create_crossbar_fixture() @@ -224,3 +226,21 @@ def test_ls372_get_still_output(wait_for_crossbar, emulator, run_agent, client): resp = client.get_still_output() assert resp.status == ocs.OK assert resp.session['op_code'] == OpCode.SUCCEEDED.value + + +@pytest.mark.integtest +def test_ls372_disconnect_reconnect(wait_for_crossbar, emulator, run_agent, client): + client.init_lakeshore() + resp = client.acq.start(sample_heater=False, run_once=False) + assert resp.status == ocs.OK + assert resp.session['op_code'] == OpCode.STARTING.value + + time.sleep(5) + emulator.disconnect_reconnect(timeout=3, port=7777) + time.sleep(5) + + client.acq.stop() + time.sleep(1) + resp = client.acq.status() + assert resp.status == ocs.OK + assert resp.session['op_code'] == OpCode.SUCCEEDED.value diff --git a/tests/integration/test_scpi_psu_agent_integration.py b/tests/integration/test_scpi_psu_agent_integration.py index 41dd39e88..edb6a8891 100644 --- a/tests/integration/test_scpi_psu_agent_integration.py +++ b/tests/integration/test_scpi_psu_agent_integration.py @@ -1,3 +1,4 @@ +import time import pytest import ocs @@ -71,6 +72,7 @@ def test_scpi_psu_set_voltage(wait_for_crossbar, gpib_emu, run_agent, client): @pytest.mark.integtest def test_scpi_psu_monitor_output(wait_for_crossbar, gpib_emu, run_agent, client): responses = { + "*idn?": "Keithley instruments, 2230G-30-1, 9203269, 1.16-1.04", "MEAS:VOLT? CH1": "3.14", "MEAS:CURR? CH1": "6.28", "MEAS:VOLT? CH2": "2.72", @@ -84,3 +86,29 @@ def test_scpi_psu_monitor_output(wait_for_crossbar, gpib_emu, run_agent, client) resp = client.monitor_output.start(test_mode=True, wait=0) resp = client.monitor_output.wait() check_resp_success(resp) + + +@pytest.mark.integtest +def test_scpi_psu_monitor_output_disconnect( + wait_for_crossbar, gpib_emu, run_agent, client +): + responses = { + "*idn?": "Keithley instruments, 2230G-30-1, 9203269, 1.16-1.04", + "MEAS:VOLT? CH1": "3.14", + "MEAS:CURR? CH1": "6.28", + "MEAS:VOLT? CH2": "2.72", + "MEAS:CURR? CH2": "5.44", + "MEAS:VOLT? CH3": "1.23", + "MEAS:CURR? CH3": "2.46", + } + gpib_emu.define_responses(responses) + + client.init() + resp = client.monitor_output.start(test_mode=False, wait=0) + time.sleep(5) + gpib_emu.disconnect_reconnect(timeout=3, port=1234) + time.sleep(5) + resp = client.monitor_output.stop() + time.sleep(1) + resp = client.monitor_output.status() + check_resp_success(resp)