diff --git a/sonic-xcvrd/tests/test_xcvrd.py b/sonic-xcvrd/tests/test_xcvrd.py index 8d3012f2a..d37b68e35 100644 --- a/sonic-xcvrd/tests/test_xcvrd.py +++ b/sonic-xcvrd/tests/test_xcvrd.py @@ -40,6 +40,95 @@ global_media_settings = media_settings_with_comma_dict['GLOBAL_MEDIA_SETTINGS'].pop('1-32') media_settings_with_comma_dict['GLOBAL_MEDIA_SETTINGS']['1-5,6,7-20,21-32'] = global_media_settings +class TestXcvrdThreadException(object): + + @patch('xcvrd.xcvrd.platform_chassis', MagicMock()) + def test_CmisManagerTask_task_run_with_exception(self): + port_mapping = PortMapping() + stop_event = threading.Event() + cmis_manager = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event) + cmis_manager.wait_for_port_config_done = MagicMock(side_effect = NotImplementedError) + exception_received = None + trace = None + try: + cmis_manager.start() + cmis_manager.join() + except Exception as e1: + exception_received = e1 + trace = traceback.format_exc() + + assert not cmis_manager.is_alive() + assert(type(exception_received) == NotImplementedError) + assert("NotImplementedError" in str(trace) and "effect" in str(trace)) + assert("sonic-xcvrd/xcvrd/xcvrd.py" in str(trace)) + assert("wait_for_port_config_done" in str(trace)) + + @patch('xcvrd.xcvrd_utilities.port_mapping.subscribe_port_config_change', MagicMock(side_effect = NotImplementedError)) + def test_DomInfoUpdateTask_task_run_with_exception(self): + port_mapping = PortMapping() + stop_event = threading.Event() + dom_info_update = DomInfoUpdateTask(DEFAULT_NAMESPACE, port_mapping, stop_event) + exception_received = None + trace = None + try: + dom_info_update.start() + dom_info_update.join() + except Exception as e1: + exception_received = e1 + trace = traceback.format_exc() + + assert not dom_info_update.is_alive() + assert(type(exception_received) == NotImplementedError) + assert("NotImplementedError" in str(trace) and "effect" in str(trace)) + assert("sonic-xcvrd/xcvrd/xcvrd.py" in str(trace)) + assert("subscribe_port_config_change" in str(trace)) + + @patch('xcvrd.xcvrd_utilities.port_mapping.subscribe_port_config_change', MagicMock(side_effect = NotImplementedError)) + def test_SfpStateUpdateTask_task_run_with_exception(self): + port_mapping = PortMapping() + retry_eeprom_set = set() + stop_event = threading.Event() + sfp_error_event = threading.Event() + sfp_state_update = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, retry_eeprom_set, stop_event, sfp_error_event) + exception_received = None + trace = None + try: + sfp_state_update.start() + sfp_state_update.join() + except Exception as e1: + exception_received = e1 + trace = traceback.format_exc() + + assert not sfp_state_update.is_alive() + assert(type(exception_received) == NotImplementedError) + assert("NotImplementedError" in str(trace) and "effect" in str(trace)) + assert("sonic-xcvrd/xcvrd/xcvrd.py" in str(trace)) + assert("subscribe_port_config_change" in str(trace)) + + @patch('xcvrd.xcvrd.SfpStateUpdateTask.is_alive', MagicMock(return_value = False)) + @patch('xcvrd.xcvrd.DomInfoUpdateTask.is_alive', MagicMock(return_value = False)) + @patch('xcvrd.xcvrd.CmisManagerTask.is_alive', MagicMock(return_value = False)) + @patch('xcvrd.xcvrd.CmisManagerTask.join', MagicMock(side_effect = NotImplementedError)) + @patch('xcvrd.xcvrd.CmisManagerTask.start', MagicMock()) + @patch('xcvrd.xcvrd.DomInfoUpdateTask.start', MagicMock()) + @patch('xcvrd.xcvrd.SfpStateUpdateTask.start', MagicMock()) + @patch('xcvrd.xcvrd.DaemonXcvrd.deinit', MagicMock()) + @patch('os.kill') + @patch('xcvrd.xcvrd.DaemonXcvrd.init') + @patch('xcvrd.xcvrd.DomInfoUpdateTask.join') + @patch('xcvrd.xcvrd.SfpStateUpdateTask.join') + def test_DaemonXcvrd_run_with_exception(self, mock_task_join1, mock_task_join2, mock_init, mock_os_kill): + mock_init.return_value = (PortMapping(), set()) + xcvrd = DaemonXcvrd(SYSLOG_IDENTIFIER) + xcvrd.stop_event.wait = MagicMock() + xcvrd.run() + + assert len(xcvrd.threads) == 3 + assert mock_init.call_count == 1 + assert mock_task_join1.call_count == 1 + assert mock_task_join2.call_count == 1 + assert mock_os_kill.call_count == 1 + class TestXcvrdScript(object): @patch('xcvrd.xcvrd._wrapper_get_sfp_type') @@ -482,10 +571,10 @@ def test_DaemonXcvrd_wait_for_port_config_done(self, mock_select, mock_sub_table @patch('xcvrd.xcvrd.DaemonXcvrd.init') @patch('xcvrd.xcvrd.DaemonXcvrd.deinit') - @patch('xcvrd.xcvrd.DomInfoUpdateTask.task_run') - @patch('xcvrd.xcvrd.SfpStateUpdateTask.task_run') - @patch('xcvrd.xcvrd.DomInfoUpdateTask.task_stop') - @patch('xcvrd.xcvrd.SfpStateUpdateTask.task_stop') + @patch('xcvrd.xcvrd.DomInfoUpdateTask.start') + @patch('xcvrd.xcvrd.SfpStateUpdateTask.start') + @patch('xcvrd.xcvrd.DomInfoUpdateTask.join') + @patch('xcvrd.xcvrd.SfpStateUpdateTask.join') def test_DaemonXcvrd_run(self, mock_task_stop1, mock_task_stop2, mock_task_run1, mock_task_run2, mock_deinit, mock_init): mock_init.return_value = (PortMapping(), set()) xcvrd = DaemonXcvrd(SYSLOG_IDENTIFIER) @@ -502,7 +591,8 @@ def test_DaemonXcvrd_run(self, mock_task_stop1, mock_task_stop2, mock_task_run1, @patch('xcvrd.xcvrd._wrapper_get_sfp_type', MagicMock(return_value='QSFP_DD')) def test_CmisManagerTask_handle_port_change_event(self): port_mapping = PortMapping() - task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping) + stop_event = threading.Event() + task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event) assert not task.isPortConfigDone port_change_event = PortChangeEvent('PortConfigDone', -1, 0, PortChangeEvent.PORT_SET) @@ -529,7 +619,8 @@ def test_CmisManagerTask_handle_port_change_event(self): @patch('xcvrd.xcvrd.XcvrTableHelper') def test_CmisManagerTask_get_configured_freq(self, mock_table_helper): port_mapping = PortMapping() - task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping) + stop_event = threading.Event() + task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event) cfg_port_tbl = MagicMock() cfg_port_tbl.get = MagicMock(return_value=(True, (('laser_freq', 193100),))) mock_table_helper.get_cfg_port_tbl = MagicMock(return_value=cfg_port_tbl) @@ -539,7 +630,8 @@ def test_CmisManagerTask_get_configured_freq(self, mock_table_helper): @patch('xcvrd.xcvrd.XcvrTableHelper') def test_CmisManagerTask_get_configured_tx_power_from_db(self, mock_table_helper): port_mapping = PortMapping() - task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping) + stop_event = threading.Event() + task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event) cfg_port_tbl = MagicMock() cfg_port_tbl.get = MagicMock(return_value=(True, (('tx_power', -10),))) mock_table_helper.get_cfg_port_tbl = MagicMock(return_value=cfg_port_tbl) @@ -555,11 +647,12 @@ def test_CmisManagerTask_task_run_stop(self, mock_chassis): mock_chassis.get_all_sfps = MagicMock(return_value=[mock_object, mock_object]) port_mapping = PortMapping() - task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping) - task.wait_for_port_config_done = MagicMock() - task.task_run() - task.task_stop() - assert task.task_process is None + stop_event = threading.Event() + cmis_manager = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event) + cmis_manager.wait_for_port_config_done = MagicMock() + cmis_manager.start() + cmis_manager.join() + assert not cmis_manager.is_alive() @patch('xcvrd.xcvrd.platform_chassis') @patch('xcvrd.xcvrd_utilities.port_mapping.subscribe_port_update_event', MagicMock(return_value=(None, None))) @@ -658,7 +751,8 @@ def test_CmisManagerTask_task_worker(self, mock_chassis): mock_chassis.get_sfp = MagicMock(return_value=mock_sfp) port_mapping = PortMapping() - task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping) + stop_event = threading.Event() + task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event) port_change_event = PortChangeEvent('PortConfigDone', -1, 0, PortChangeEvent.PORT_SET) task.on_port_update_event(port_change_event) @@ -709,7 +803,8 @@ def test_CmisManagerTask_task_worker(self, mock_chassis): @patch('xcvrd.xcvrd.delete_port_from_status_table_hw') def test_DomInfoUpdateTask_handle_port_change_event(self, mock_del_status_tbl_hw): port_mapping = PortMapping() - task = DomInfoUpdateTask(DEFAULT_NAMESPACE, port_mapping) + stop_event = threading.Event() + task = DomInfoUpdateTask(DEFAULT_NAMESPACE, port_mapping, stop_event) task.xcvr_table_helper = XcvrTableHelper(DEFAULT_NAMESPACE) port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_ADD) task.on_port_config_change(port_change_event) @@ -731,10 +826,11 @@ def test_DomInfoUpdateTask_handle_port_change_event(self, mock_del_status_tbl_hw @patch('xcvrd.xcvrd_utilities.port_mapping.handle_port_config_change', MagicMock()) def test_DomInfoUpdateTask_task_run_stop(self): port_mapping = PortMapping() - task = DomInfoUpdateTask(DEFAULT_NAMESPACE, port_mapping) - task.task_run() - task.task_stop() - assert not task.task_thread.is_alive() + stop_event = threading.Event() + task = DomInfoUpdateTask(DEFAULT_NAMESPACE, port_mapping, stop_event) + task.start() + task.join() + assert not task.is_alive() @patch('xcvrd.xcvrd.XcvrTableHelper', MagicMock()) @patch('xcvrd.xcvrd_utilities.sfp_status_helper.detect_port_in_error_status') @@ -755,7 +851,8 @@ def test_DomInfoUpdateTask_task_worker(self, mock_post_pm_info, mock_update_stat mock_sub_table.return_value = mock_selectable port_mapping = PortMapping() - task = DomInfoUpdateTask(DEFAULT_NAMESPACE, port_mapping) + stop_event = threading.Event() + task = DomInfoUpdateTask(DEFAULT_NAMESPACE, port_mapping, stop_event) task.xcvr_table_helper = XcvrTableHelper(DEFAULT_NAMESPACE) task.task_stopping_event.wait = MagicMock(side_effect=[False, True]) mock_detect_error.return_value = True @@ -786,10 +883,11 @@ def test_SfpStateUpdateTask_handle_port_change_event(self, mock_update_status_hw mock_table_helper.get_int_tbl = MagicMock(return_value=mock_table) mock_table_helper.get_dom_tbl = MagicMock(return_value=mock_table) mock_table_helper.get_dom_threshold_tbl = MagicMock(return_value=mock_table) - stopping_event = multiprocessing.Event() + stop_event = threading.Event() + sfp_error_event = threading.Event() port_mapping = PortMapping() retry_eeprom_set = set() - task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, retry_eeprom_set) + task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, retry_eeprom_set, stop_event, sfp_error_event) task.xcvr_table_helper = XcvrTableHelper(DEFAULT_NAMESPACE) task.xcvr_table_helper.get_status_tbl = mock_table_helper.get_status_tbl task.xcvr_table_helper.get_intf_tbl = mock_table_helper.get_intf_tbl @@ -822,15 +920,18 @@ def test_SfpStateUpdateTask_handle_port_change_event(self, mock_update_status_hw assert not task.port_mapping.logical_to_asic assert mock_update_status_hw.call_count == 1 + @patch('xcvrd.xcvrd_utilities.port_mapping.subscribe_port_config_change', MagicMock(return_value=(None, None))) def test_SfpStateUpdateTask_task_run_stop(self): port_mapping = PortMapping() retry_eeprom_set = set() - task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, retry_eeprom_set) - sfp_error_event = multiprocessing.Event() - task.task_run(sfp_error_event) - assert wait_until(5, 1, task.task_process.is_alive) - task.task_stop() - assert wait_until(5, 1, lambda: task.task_process.is_alive() is False) + stop_event = threading.Event() + sfp_error_event = threading.Event() + task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, retry_eeprom_set, stop_event, sfp_error_event) + task.start() + assert wait_until(5, 1, task.is_alive) + task.raise_exception() + task.join() + assert wait_until(5, 1, lambda: task.is_alive() is False) @patch('xcvrd.xcvrd.XcvrTableHelper', MagicMock()) @patch('xcvrd.xcvrd.post_port_sfp_info_to_db') @@ -841,7 +942,9 @@ def test_SfpStateUpdateTask_retry_eeprom_reading(self, mock_update_status_hw, mo port_mapping = PortMapping() retry_eeprom_set = set() - task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, retry_eeprom_set) + stop_event = threading.Event() + sfp_error_event = threading.Event() + task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, retry_eeprom_set, stop_event, sfp_error_event) task.xcvr_table_helper = XcvrTableHelper(DEFAULT_NAMESPACE) task.xcvr_table_helper.get_intf_tbl = MagicMock(return_value=mock_table) task.xcvr_table_helper.get_dom_tbl = MagicMock(return_value=mock_table) @@ -873,7 +976,9 @@ def test_SfpStateUpdateTask_retry_eeprom_reading(self, mock_update_status_hw, mo def test_SfpStateUpdateTask_mapping_event_from_change_event(self): port_mapping = PortMapping() retry_eeprom_set = set() - task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, retry_eeprom_set) + stop_event = threading.Event() + sfp_error_event = threading.Event() + task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, retry_eeprom_set, stop_event, sfp_error_event) port_dict = {} assert task._mapping_event_from_change_event(False, port_dict) == SYSTEM_FAIL assert port_dict[EVENT_ON_ALL_SFP] == SYSTEM_FAIL @@ -910,10 +1015,10 @@ def test_SfpStateUpdateTask_task_worker(self, mock_post_pm_info, mock_del_status mock_del_dom, mock_change_event, mock_mapping_event, mock_os_kill): port_mapping = PortMapping() retry_eeprom_set = set() - task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, retry_eeprom_set) + stop_event = threading.Event() + sfp_error_event = threading.Event() + task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, retry_eeprom_set, stop_event, sfp_error_event) task.xcvr_table_helper = XcvrTableHelper(DEFAULT_NAMESPACE) - stop_event = multiprocessing.Event() - sfp_error_event = multiprocessing.Event() mock_change_event.return_value = (True, {0: 0}, {}) mock_mapping_event.return_value = SYSTEM_NOT_READY @@ -1036,7 +1141,9 @@ class MockTable: port_mapping = PortMapping() retry_eeprom_set = set() - task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, retry_eeprom_set) + stop_event = threading.Event() + sfp_error_event = threading.Event() + task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, retry_eeprom_set, stop_event, sfp_error_event) task.xcvr_table_helper = XcvrTableHelper(DEFAULT_NAMESPACE) task.xcvr_table_helper.get_status_tbl = mock_table_helper.get_status_tbl task.xcvr_table_helper.get_intf_tbl = mock_table_helper.get_intf_tbl diff --git a/sonic-xcvrd/xcvrd/xcvrd.py b/sonic-xcvrd/xcvrd/xcvrd.py index 0ab68d4bc..ed2f42105 100644 --- a/sonic-xcvrd/xcvrd/xcvrd.py +++ b/sonic-xcvrd/xcvrd/xcvrd.py @@ -9,7 +9,6 @@ import ast import copy import json - import multiprocessing import os import signal import sys @@ -19,6 +18,8 @@ import subprocess import argparse import re + import traceback + import ctypes from sonic_py_common import daemon_base, device_info, logger from sonic_py_common import multi_asic @@ -938,7 +939,7 @@ def is_fast_reboot_enabled(): # Thread wrapper class for CMIS transceiver management -class CmisManagerTask: +class CmisManagerTask(threading.Thread): CMIS_MAX_RETRIES = 3 CMIS_DEF_EXPIRED = 60 # seconds, default expiration time @@ -956,9 +957,12 @@ class CmisManagerTask: CMIS_STATE_REMOVED = 'REMOVED' CMIS_STATE_FAILED = 'FAILED' - def __init__(self, namespaces, port_mapping, skip_cmis_mgr=False): - self.task_stopping_event = multiprocessing.Event() - self.task_process = None + def __init__(self, namespaces, port_mapping, main_thread_stop_event, skip_cmis_mgr=False): + threading.Thread.__init__(self) + self.name = "CmisManagerTask" + self.exc = None + self.task_stopping_event = threading.Event() + self.main_thread_stop_event = main_thread_stop_event self.port_dict = {} self.port_mapping = copy.deepcopy(port_mapping) self.xcvr_table_helper = XcvrTableHelper(namespaces) @@ -1605,7 +1609,7 @@ def task_worker(self): self.log_notice("Stopped") - def task_run(self): + def run(self): if platform_chassis is None: self.log_notice("Platform chassis is not available, stopping...") return @@ -1614,23 +1618,35 @@ def task_run(self): self.log_notice("Skipping CMIS Task Manager") return - self.task_process = multiprocessing.Process(target=self.task_worker) - if self.task_process is not None: - self.task_process.start() - - def task_stop(self): + try: + self.task_worker() + except Exception as e: + helper_logger.log_error("Exception occured at {} thread due to {}".format(threading.current_thread().getName(), repr(e))) + exc_type, exc_value, exc_traceback = sys.exc_info() + msg = traceback.format_exception(exc_type, exc_value, exc_traceback) + for tb_line in msg: + for tb_line_split in tb_line.splitlines(): + helper_logger.log_error(tb_line_split) + self.exc = e + self.main_thread_stop_event.set() + + def join(self): self.task_stopping_event.set() - if self.task_process is not None: - self.task_process.join() - self.task_process = None + if not self.skip_cmis_mgr: + threading.Thread.join(self) + if self.exc: + raise self.exc # Thread wrapper class to update dom info periodically -class DomInfoUpdateTask(object): - def __init__(self, namespaces, port_mapping): - self.task_thread = None +class DomInfoUpdateTask(threading.Thread): + def __init__(self, namespaces, port_mapping, main_thread_stop_event): + threading.Thread.__init__(self) + self.name = "DomInfoUpdateTask" + self.exc = None self.task_stopping_event = threading.Event() + self.main_thread_stop_event = main_thread_stop_event self.port_mapping = copy.deepcopy(port_mapping) self.namespaces = namespaces @@ -1673,16 +1689,26 @@ def task_worker(self): helper_logger.log_info("Stop DOM monitoring loop") - def task_run(self): + def run(self): if self.task_stopping_event.is_set(): return - - self.task_thread = threading.Thread(target=self.task_worker) - self.task_thread.start() - - def task_stop(self): + try: + self.task_worker() + except Exception as e: + helper_logger.log_error("Exception occured at {} thread due to {}".format(threading.current_thread().getName(), repr(e))) + exc_type, exc_value, exc_traceback = sys.exc_info() + msg = traceback.format_exception(exc_type, exc_value, exc_traceback) + for tb_line in msg: + for tb_line_split in tb_line.splitlines(): + helper_logger.log_error(tb_line_split) + self.exc = e + self.main_thread_stop_event.set() + + def join(self): self.task_stopping_event.set() - self.task_thread.join() + threading.Thread.join(self) + if self.exc: + raise self.exc def on_port_config_change(self, port_change_event): if port_change_event.event_type == port_mapping.PortChangeEvent.PORT_REMOVE: @@ -1709,14 +1735,18 @@ def on_remove_logical_port(self, port_change_event): self.xcvr_table_helper.get_status_tbl(port_change_event.asic_id)) -# Process wrapper class to update sfp state info periodically +# Thread wrapper class to update sfp state info periodically -class SfpStateUpdateTask(object): +class SfpStateUpdateTask(threading.Thread): RETRY_EEPROM_READING_INTERVAL = 60 - def __init__(self, namespaces, port_mapping, retry_eeprom_set): - self.task_process = None - self.task_stopping_event = multiprocessing.Event() + def __init__(self, namespaces, port_mapping, retry_eeprom_set, main_thread_stop_event, sfp_error_event): + threading.Thread.__init__(self) + self.name = "SfpStateUpdateTask" + self.exc = None + self.task_stopping_event = threading.Event() + self.main_thread_stop_event = main_thread_stop_event + self.sfp_error_event = sfp_error_event self.port_mapping = copy.deepcopy(port_mapping) # A set to hold those logical port name who fail to read EEPROM self.retry_eeprom_set = retry_eeprom_set @@ -2011,18 +2041,37 @@ def task_worker(self, stopping_event, sfp_error_event): helper_logger.log_info("Stop SFP monitoring loop") - def task_run(self, sfp_error_event): + def run(self): + self.thread_id = threading.current_thread().ident if self.task_stopping_event.is_set(): return - - - self.task_process = multiprocessing.Process(target=self.task_worker, args=( - self.task_stopping_event, sfp_error_event)) - self.task_process.start() - - def task_stop(self): + try: + self.task_worker(self.task_stopping_event, self.sfp_error_event) + except Exception as e: + helper_logger.log_error("Exception occured at {} thread due to {}".format(threading.current_thread().getName(), repr(e))) + exc_type, exc_value, exc_traceback = sys.exc_info() + msg = traceback.format_exception(exc_type, exc_value, exc_traceback) + for tb_line in msg: + for tb_line_split in tb_line.splitlines(): + helper_logger.log_error(tb_line_split) + self.exc = e + self.main_thread_stop_event.set() + + # SfpStateUpdateTask thread has a call to an API which could potentially sleep in the order of seconds and hence, + # could block the xcvrd daemon graceful shutdown process for a prolonged time. Raising an exception will allow us to + # interrupt the SfpStateUpdateTask thread while sleeping and will allow graceful shutdown of the thread + def raise_exception(self): + res = ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_ulong(self.thread_id), + ctypes.py_object(SystemExit)) + if res > 1: + ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_ulong(self.thread_id), 0) + helper_logger.log_error('Exception raise failure for SfpStateUpdateTask') + + def join(self): self.task_stopping_event.set() - os.kill(self.task_process.pid, signal.SIGKILL) + threading.Thread.join(self) + if self.exc: + raise self.exc def on_port_config_change(self , port_change_event): if port_change_event.event_type == port_mapping.PortChangeEvent.PORT_REMOVE: @@ -2040,7 +2089,7 @@ def on_remove_logical_port(self, port_change_event): """ # To avoid race condition, remove the entry TRANSCEIVER_DOM_INFO, TRANSCEIVER_STATUS_INFO and TRANSCEIVER_INFO table. # The operation to remove entry from TRANSCEIVER_DOM_INFO is duplicate with DomInfoUpdateTask.on_remove_logical_port, - # but it is necessary because TRANSCEIVER_DOM_INFO is also updated in this sub process when a new SFP is inserted. + # but it is necessary because TRANSCEIVER_DOM_INFO is also updated in this thread when a new SFP is inserted. del_port_sfp_dom_info_from_db(port_change_event.port_name, self.port_mapping, self.xcvr_table_helper.get_intf_tbl(port_change_event.asic_id), @@ -2200,9 +2249,10 @@ class DaemonXcvrd(daemon_base.DaemonBase): def __init__(self, log_identifier, skip_cmis_mgr=False): super(DaemonXcvrd, self).__init__(log_identifier) self.stop_event = threading.Event() - self.sfp_error_event = multiprocessing.Event() + self.sfp_error_event = threading.Event() self.skip_cmis_mgr = skip_cmis_mgr self.namespaces = [''] + self.threads = [] # Signal handler def signal_handler(self, sig, frame): @@ -2352,36 +2402,57 @@ def run(self): port_mapping_data, retry_eeprom_set = self.init() # Start the CMIS manager - cmis_manager = CmisManagerTask(self.namespaces, port_mapping_data, self.skip_cmis_mgr) - cmis_manager.task_run() + cmis_manager = CmisManagerTask(self.namespaces, port_mapping_data, self.stop_event, self.skip_cmis_mgr) + if not self.skip_cmis_mgr: + cmis_manager.start() + self.threads.append(cmis_manager) # Start the dom sensor info update thread - dom_info_update = DomInfoUpdateTask(self.namespaces, port_mapping_data) - dom_info_update.task_run() + dom_info_update = DomInfoUpdateTask(self.namespaces, port_mapping_data, self.stop_event) + dom_info_update.start() + self.threads.append(dom_info_update) - # Start the sfp state info update process - sfp_state_update = SfpStateUpdateTask(self.namespaces, port_mapping_data, retry_eeprom_set) - sfp_state_update.task_run(self.sfp_error_event) - - # Start the Y-cable state info update process if Y cable presence established + # Start the sfp state info update thread + sfp_state_update = SfpStateUpdateTask(self.namespaces, port_mapping_data, retry_eeprom_set, self.stop_event, self.sfp_error_event) + sfp_state_update.start() + self.threads.append(sfp_state_update) # Start main loop - self.log_info("Start daemon main loop") + self.log_notice("Start daemon main loop with thread count {}".format(len(self.threads))) + for thread in self.threads: + self.log_notice("Started thread {}".format(thread.getName())) self.stop_event.wait() self.log_info("Stop daemon main loop") + generate_sigkill = False + # check all threads are alive + for thread in self.threads: + if thread.is_alive() is False: + try: + thread.join() + except Exception as e: + self.log_error("Xcvrd: exception found at child thread {} due to {}".format(thread.getName(), repr(e))) + generate_sigkill = True + + if generate_sigkill is True: + self.log_error("Exiting main loop as child thread raised exception!") + os.kill(os.getpid(), signal.SIGKILL) + # Stop the CMIS manager if cmis_manager is not None: - cmis_manager.task_stop() + if cmis_manager.is_alive(): + cmis_manager.join() # Stop the dom sensor info update thread - dom_info_update.task_stop() - - # Stop the sfp state info update process - sfp_state_update.task_stop() + if dom_info_update.is_alive(): + dom_info_update.join() + # Stop the sfp state info update thread + if sfp_state_update.is_alive(): + sfp_state_update.raise_exception() + sfp_state_update.join() # Start daemon deinitialization sequence self.deinit()