diff --git a/sonic-pcied/pytest.ini b/sonic-pcied/pytest.ini new file mode 100644 index 000000000000..d90ee9ed9e12 --- /dev/null +++ b/sonic-pcied/pytest.ini @@ -0,0 +1,2 @@ +[pytest] +addopts = --cov=scripts --cov-report html --cov-report term --cov-report xml --junitxml=test-results.xml -vv diff --git a/sonic-pcied/scripts/pcied b/sonic-pcied/scripts/pcied index 0f2a5208c68b..7265063ff894 100644 --- a/sonic-pcied/scripts/pcied +++ b/sonic-pcied/scripts/pcied @@ -5,30 +5,62 @@ PCIe device monitoring daemon for SONiC """ -try: - import os - import signal - import sys - import threading +import os +import signal +import sys +import threading - from sonic_py_common import daemon_base, device_info - from swsscommon import swsscommon -except ImportError as e: - raise ImportError(str(e) + " - required module not found") +from sonic_py_common import daemon_base, device_info +from swsscommon import swsscommon # # Constants ==================================================================== # + +# TODO: Once we no longer support Python 2, we can eliminate this and get the +# name using the 'name' field (e.g., `signal.SIGINT.name`) starting with Python 3.5 +SIGNALS_TO_NAMES_DICT = dict((getattr(signal, n), n) + for n in dir(signal) if n.startswith('SIG') and '_' not in n) + SYSLOG_IDENTIFIER = "pcied" PCIE_RESULT_REGEX = "PCIe Device Checking All Test" -PCIE_TABLE_NAME = "PCIE_STATUS" PCIE_DEVICE_TABLE_NAME = "PCIE_DEVICE" - -PCIE_CONF_FILE = 'pcie.yaml' +PCIE_STATUS_TABLE_NAME = "PCIE_DEVICES" PCIED_MAIN_THREAD_SLEEP_SECS = 60 -REDIS_HOSTIP = "127.0.0.1" + +PCIEUTIL_CONF_FILE_ERROR = 1 +PCIEUTIL_LOAD_ERROR = 2 + +platform_pcieutil = None + +exit_code = 0 + +# wrapper functions to call the platform api +def load_platform_pcieutil(): + _platform_pcieutil = None + (platform_path, _) = device_info.get_paths_to_platform_and_hwsku_dirs() + try: + from sonic_platform.pcie import Pcie + _platform_pcieutil = Pcie(platform_path) + except ImportError as e: + self.log_error("Failed to load platform Pcie module. Error : {}".format(str(e)), True) + try: + from sonic_platform_base.sonic_pcie.pcie_common import PcieUtil + _platform_pcieutil = PcieUtil(platform_path) + except ImportError as e: + self.log_error("Failed to load default PcieUtil module. Error : {}".format(str(e)), True) + return _platform_pcieutil + +def read_id_file(device_name): + id = None + dev_id_path = '/sys/bus/pci/devices/0000:%s/device' % device_name + + if os.path.exists(dev_id_path): + with open(dev_id_path, 'r') as fd: + id = fd.read().strip() + return id # # Daemon ======================================================================= @@ -39,134 +71,129 @@ class DaemonPcied(daemon_base.DaemonBase): def __init__(self, log_identifier): super(DaemonPcied, self).__init__(log_identifier) - (platform_path, _) = device_info.get_paths_to_platform_and_hwsku_dirs() - pciefilePath = os.path.join(platform_path, PCIE_CONF_FILE) - if not os.path.exists(pciefilePath): - self.log_error("Platform pcie configuration file doesn't exist! Exiting ...") - sys.exit("Platform PCIe Configuration file doesn't exist!") - self.timeout = PCIED_MAIN_THREAD_SLEEP_SECS self.stop_event = threading.Event() - - self.state_db = swsscommon.SonicV2Connector(host=REDIS_HOSTIP) - self.state_db.connect("STATE_DB") - state_db = daemon_base.db_connect("STATE_DB") - self.device_table = swsscommon.Table(state_db, PCIE_DEVICE_TABLE_NAME) - - # Load AER-fields into STATEDB - def update_aer_to_statedb(self, device_name, aer_stats): + self.state_db = None + self.device_table = None + self.table = None + self.resultInfo = [] + self.device_name = None + self.aer_stats = {} + + global platform_pcieutil + + platform_pcieutil = load_platform_pcieutil() + if platform_pcieutil is None: + sys.exit(PCIEUTIL_LOAD_ERROR) + + # Connect to STATE_DB and create pcie device table + self.state_db = daemon_base.db_connect("STATE_DB") + self.device_table = swsscommon.Table(self.state_db, PCIE_DEVICE_TABLE_NAME) + self.status_table = swsscommon.Table(self.state_db, PCIE_STATUS_TABLE_NAME) + + def __del__(self): + if self.device_table: + table_keys = self.device_table.getKeys() + for tk in table_keys: + self.device_table._del(tk) + if self.status_table: + stable_keys = self.status_table.getKeys() + for stk in stable_keys: + self.status_table._del(stk) + + # load aer-fields into statedb + def update_aer_to_statedb(self): + if self.aer_stats is None: + self.log_debug("PCIe device {} has no AER Stats".format(device_name)) + return aer_fields = {} - for field, value in aer_stats['correctable'].items(): - correctable_field = "correctable|" + field - aer_fields[correctable_field] = value - - for field, value in aer_stats['fatal'].items(): - fatal_field = "fatal|" + field - aer_fields[fatal_field] = value - - for field, value in aer_stats['non_fatal'].items(): - non_fatal_field = "non_fatal|" + field - aer_fields[non_fatal_field] = value + for key, fv in self.aer_stats.items(): + for field, value in fv.items(): + key_field = "{}|{}".format(key,field) + aer_fields[key_field] = value if aer_fields: formatted_fields = swsscommon.FieldValuePairs(list(aer_fields.items())) - self.device_table.set(device_name, formatted_fields) + self.device_table.set(self.device_name, formatted_fields) else: - self.log_debug("PCIe device {} has no AER attriutes".format(device_name)) + self.log_debug("PCIe device {} has no AER attriutes".format(self.device_name)) - # Check the PCIe devices - def check_pcie_devices(self): - try: - platform_path, _ = device_info.get_paths_to_platform_and_hwsku_dirs() - from sonic_platform_base.sonic_pcie.pcie_common import PcieUtil - platform_pcieutil = PcieUtil(platform_path) - except ImportError as e: - self.log_error("Failed to load default PcieUtil module. Error : {}".format(str(e)), True) - raise e - resultInfo = platform_pcieutil.get_pcie_check() - err = 0 + # Check the PCIe AER Stats + def check_n_update_pcie_aer_stats(self, Bus, Dev, Fn): + self.device_name = "%02x:%02x.%d" % (Bus, Dev, Fn) - for item in resultInfo: - if item["result"] == "Failed": - self.log_warning("PCIe Device: " + item["name"] + " Not Found") - err += 1 + Id = read_id_file(self.device_name) + self.aer_stats = {} + if Id is not None: + self.device_table.set(self.device_name, [('id', Id)]) + self.aer_stats = platform_pcieutil.get_pcie_aer_stats(bus=Bus, dev=Dev, func=Fn) + self.update_aer_to_statedb() + + + # Update the PCIe devices status to DB + def update_pcie_devices_status_db(self, err): if err: - self.update_state_db("PCIE_DEVICES", "status", "FAILED") - self.log_error("PCIe device status check : FAILED") + pcie_status = "FAILED" + self.log_error("PCIe device status check : {}".format(pcie_status)) else: - self.update_state_db("PCIE_DEVICES", "status", "PASSED") - self.log_info("PCIe device status check : PASSED") + pcie_status = "PASSED" + self.log_info("PCIe device status check : {}".format(pcie_status)) + fvs = swsscommon.FieldValuePairs([ + ('status', pcie_status) + ]) - # update AER-attributes to DB - for item in resultInfo: - if item["result"] == "Failed": - continue + self.status_table.set("status", fvs) - Bus = int(item["bus"], 16) - Dev = int(item["dev"], 16) - Fn = int(item["fn"], 16) + # Check the PCIe devices + def check_pcie_devices(self): + self.resultInfo = platform_pcieutil.get_pcie_check() + err = 0 + if self.resultInfo is None: + return - device_name = "%02x:%02x.%d" % (Bus, Dev, Fn) - dev_id_path = '/sys/bus/pci/devices/0000:%s/device' % device_name - with open(dev_id_path, 'r') as fd: - Id = fd.read().strip() + for result in self.resultInfo: + if result["result"] == "Failed": + self.log_warning("PCIe Device: " + result["name"] + " Not Found") + err += 1 + else: + Bus = int(result["bus"], 16) + Dev = int(result["dev"], 16) + Fn = int(result["fn"], 16) + # update AER-attributes to DB + self.check_n_update_pcie_aer_stats(Bus, Dev, Fn) - self.device_table.set(device_name, [('id', Id)]) - aer_stats = platform_pcieutil.get_pcie_aer_stats(bus=Bus, device=Dev, func=Fn) - self.update_aer_to_statedb(device_name, aer_stats) + # update PCIe Device Status to DB + self.update_pcie_devices_status_db(err) - def read_state_db(self, key1, key2): - return self.state_db.get('STATE_DB', key1, key2) + # Override signal handler from DaemonBase + def signal_handler(self, sig, frame): + FATAL_SIGNALS = [signal.SIGINT, signal.SIGTERM] + NONFATAL_SIGNALS = [signal.SIGHUP] - def update_state_db(self, key1, key2, value): - self.state_db.set('STATE_DB', key1, key2, value) + global exit_code - # Signal handler - def signal_handler(self, sig, frame): - if sig == signal.SIGHUP: - self.log_info("Caught SIGHUP - ignoring...") - elif sig == signal.SIGINT: - self.log_info("Caught SIGINT - exiting...") - self.stop_event.set() - elif sig == signal.SIGTERM: - self.log_info("Caught SIGTERM - exiting...") + if sig in FATAL_SIGNALS: + self.log_info("Caught signal '{}' - exiting...".format(SIGNALS_TO_NAMES_DICT[sig])) + exit_code = 128 + sig # Make sure we exit with a non-zero code so that supervisor will try to restart us self.stop_event.set() + elif sig in NONFATAL_SIGNALS: + self.log_info("Caught signal '{}' - ignoring...".format(SIGNALS_TO_NAMES_DICT[sig])) else: - self.log_warning("Caught unhandled signal '" + sig + "'") + self.log_warning("Caught unhandled signal '{}' - ignoring...".format(SIGNALS_TO_NAMES_DICT[sig])) - # Initialize daemon - def init(self): - self.log_info("Start daemon init...") - - # Deinitialize daemon - def deinit(self): - self.log_info("Start daemon deinit...") - - # Run daemon + # Main daemon logic def run(self): - self.log_info("Starting up...") - - # Start daemon initialization sequence - self.init() - - # Start main loop - self.log_info("Start daemon main loop") - - while not self.stop_event.wait(self.timeout): - # Check the Pcie device status - self.check_pcie_devices() - - self.log_info("Stop daemon main loop") + if self.stop_event.wait(self.timeout): + # We received a fatal signal + return False - # Start daemon deinitialization sequence - self.deinit() - - self.log_info("Shutting down...") + self.check_pcie_devices() + return True # # Main ========================================================================= # @@ -174,7 +201,15 @@ class DaemonPcied(daemon_base.DaemonBase): def main(): pcied = DaemonPcied(SYSLOG_IDENTIFIER) - pcied.run() + + pcied.log_info("Starting up...") + + while pcied.run(): + pass + + pcied.log_info("Shutting down...") + + return exit_code if __name__ == '__main__': - main() + sys.exit(main()) diff --git a/sonic-pcied/setup.cfg b/sonic-pcied/setup.cfg new file mode 100644 index 000000000000..b7e478982ccf --- /dev/null +++ b/sonic-pcied/setup.cfg @@ -0,0 +1,2 @@ +[aliases] +test=pytest diff --git a/sonic-pcied/setup.py b/sonic-pcied/setup.py index 0d7d32118422..7c7cbc1464b1 100644 --- a/sonic-pcied/setup.py +++ b/sonic-pcied/setup.py @@ -14,8 +14,19 @@ 'scripts/pcied', ], setup_requires=[ + 'pytest-runner', 'wheel' ], + install_requires=[ + 'enum34; python_version < "3.4"', + 'sonic-py-common', + ], + tests_requires=[ + 'mock>=2.0.0; python_version < "3.3"', + 'pytest', + 'pytest-cov', + 'sonic-platform-common' + ], classifiers=[ 'Development Status :: 4 - Beta', 'Environment :: No Input/Output (Daemon)', @@ -29,4 +40,5 @@ 'Topic :: System :: Hardware', ], keywords='sonic SONiC PCIe pcie PCIED pcied', + test_suite='setup.get_test_suite' ) diff --git a/sonic-pcied/tests/__init__.py b/sonic-pcied/tests/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/sonic-pcied/tests/mock_platform.py b/sonic-pcied/tests/mock_platform.py new file mode 100644 index 000000000000..7d8a32a90852 --- /dev/null +++ b/sonic-pcied/tests/mock_platform.py @@ -0,0 +1,48 @@ + +""" + Mock implementation of sonic_platform package for unit testing +""" + +# TODO: Clean this up once we no longer need to support Python 2 +import sys +if sys.version_info.major == 3: + from unittest import mock +else: + import mock + + + +pcie_device_list = \ +""" +[{'bus': '00', 'dev': '01', 'fn': '0', 'id': '1f10', 'name': 'PCI A'}] +""" + +pcie_check_result = \ +""" +[{'bus': '00', 'dev': '01', 'fn': '0', 'id': '1f10', 'name': 'PCI A', 'result': 'Passed'}] +""" + +pcie_aer_stats = \ +""" +{'correctable': {}, 'fatal': {}, 'non_fatal': {}} +""" + +#class MockPcieUtil(PcieUtil): +class MockPcieUtil(): + def __init__(self, + pciList=pcie_device_list, + result=pcie_check_result, + aer_stats=pcie_aer_stats): + super(MockPcieUtil, self).__init__() + self._pciList = pciList + self._result = result + self._aer_stats = aer_stats + + def get_pcie_device(self): + return self._pciList + + def get_pcie_check(self): + return self._result + + def get_pcie_aer_stats(self, domain, bus, dev, fn): + return self._aer_stats diff --git a/sonic-pcied/tests/mocked_libs/swsscommon/__init__.py b/sonic-pcied/tests/mocked_libs/swsscommon/__init__.py new file mode 100644 index 000000000000..012af621e5f0 --- /dev/null +++ b/sonic-pcied/tests/mocked_libs/swsscommon/__init__.py @@ -0,0 +1,5 @@ +''' + Mock implementation of swsscommon package for unit testing +''' + +from . import swsscommon diff --git a/sonic-pcied/tests/mocked_libs/swsscommon/swsscommon.py b/sonic-pcied/tests/mocked_libs/swsscommon/swsscommon.py new file mode 100644 index 000000000000..6947a8601819 --- /dev/null +++ b/sonic-pcied/tests/mocked_libs/swsscommon/swsscommon.py @@ -0,0 +1,54 @@ +''' + Mock implementation of swsscommon package for unit testing +''' + +STATE_DB = '' + + +class Table: + def __init__(self, db, table_name): + self.table_name = table_name + self.mock_dict = {} + + def _del(self, key): + del self.mock_dict[key] + pass + + def set(self, key, fvs): + self.mock_dict[key] = fvs.fv_dict + pass + + def get(self, key): + if key in self.mock_dict: + return self.mock_dict[key] + return None + + def get_size(self): + return (len(self.mock_dict)) + + +class FieldValuePairs: + fv_dict = {} + + def __init__(self, tuple_list): + if isinstance(tuple_list, list) and isinstance(tuple_list[0], tuple): + self.fv_dict = dict(tuple_list) + + def __setitem__(self, key, kv_tuple): + self.fv_dict[kv_tuple[0]] = kv_tuple[1] + + def __getitem__(self, key): + return self.fv_dict[key] + + def __eq__(self, other): + if not isinstance(other, FieldValuePairs): + # don't attempt to compare against unrelated types + return NotImplemented + + return self.fv_dict == other.fv_dict + + def __repr__(self): + return repr(self.fv_dict) + + def __str__(self): + return repr(self.fv_dict) diff --git a/sonic-pcied/tests/test_DaemonPcied.py b/sonic-pcied/tests/test_DaemonPcied.py new file mode 100644 index 000000000000..24044767e888 --- /dev/null +++ b/sonic-pcied/tests/test_DaemonPcied.py @@ -0,0 +1,226 @@ +import datetime +import os +import sys +from imp import load_source # Replace with importlib once we no longer need to support Python 2 + +import pytest + +# TODO: Clean this up once we no longer need to support Python 2 +if sys.version_info.major == 3: + from unittest import mock +else: + import mock + +from sonic_py_common import daemon_base + +from .mock_platform import MockPcieUtil + +SYSLOG_IDENTIFIER = 'pcied_test' +NOT_AVAILABLE = 'N/A' + +daemon_base.db_connect = mock.MagicMock() + +tests_path = os.path.dirname(os.path.abspath(__file__)) + +# Add mocked_libs path so that the file under test can load mocked modules from there +mocked_libs_path = os.path.join(tests_path, "mocked_libs") +sys.path.insert(0, mocked_libs_path) + +# Add path to the file under test so that we can load it +modules_path = os.path.dirname(tests_path) +scripts_path = os.path.join(modules_path, "scripts") +sys.path.insert(0, modules_path) +load_source('pcied', os.path.join(scripts_path, 'pcied')) +import pcied + +pcie_no_aer_stats = \ +""" +{'correctable': {}, 'fatal': {}, 'non_fatal': {}} +""" + +pcie_aer_stats_no_err = \ +""" +{'correctable': {'field1': '0', 'field2': '0'}, + 'fatal': {'field3': '0', 'field4': '0'}, + 'non_fatal': {'field5': '0', 'field6': '0'}} +""" + +pcie_aer_stats_err = \ +""" +{'correctable': {'field1': '1', 'field2': '0'}, + 'fatal': {'field3': '0', 'field4': '1'}, + 'non_fatal': {'field5': '0', 'field6': '1'}} +""" + +pcie_device_list = \ +""" +[{'bus': '00', 'dev': '01', 'fn': '0', 'id': '1f10', 'name': 'PCI A'}, + {'bus': '00', 'dev': '02', 'fn': '0', 'id': '1f11', 'name': 'PCI B'}, + {'bus': '00', 'dev': '03', 'fn': '0', 'id': '1f13', 'name': 'PCI C'}] +""" + +pcie_check_result_no = [] + +pcie_check_result_pass = \ +""" +[{'bus': '00', 'dev': '01', 'fn': '0', 'id': '1f10', 'name': 'PCI A', 'result': 'Passed'}, + {'bus': '00', 'dev': '02', 'fn': '0', 'id': '1f11', 'name': 'PCI B', 'result': 'Passed'}, + {'bus': '00', 'dev': '03', 'fn': '0', 'id': '1f12', 'name': 'PCI C', 'result': 'Passed'}] +""" + +pcie_check_result_fail = \ +""" +[{'bus': '00', 'dev': '01', 'fn': '0', 'id': '1f10', 'name': 'PCI A', 'result': 'Passed'}, + {'bus': '00', 'dev': '02', 'fn': '0', 'id': '1f11', 'name': 'PCI B', 'result': 'Passed'}, + {'bus': '00', 'dev': '03', 'fn': '0', 'id': '1f12', 'name': 'PCI C', 'result': 'Failed'}] +""" + +class TestDaemonPcied(object): + """ + Test cases to cover functionality in DaemonPcied class + """ + + @mock.patch('pcied.load_platform_pcieutil', mock.MagicMock()) + def test_signal_handler(self): + daemon_pcied = pcied.DaemonPcied(SYSLOG_IDENTIFIER) + daemon_pcied.stop_event.set = mock.MagicMock() + daemon_pcied.log_info = mock.MagicMock() + daemon_pcied.log_warning = mock.MagicMock() + + # Test SIGHUP + daemon_pcied.signal_handler(pcied.signal.SIGHUP, None) + assert daemon_pcied.log_info.call_count == 1 + daemon_pcied.log_info.assert_called_with("Caught signal 'SIGHUP' - ignoring...") + assert daemon_pcied.log_warning.call_count == 0 + assert daemon_pcied.stop_event.set.call_count == 0 + assert pcied.exit_code == 0 + + # Reset + daemon_pcied.log_info.reset_mock() + daemon_pcied.log_warning.reset_mock() + daemon_pcied.stop_event.set.reset_mock() + + # Test SIGINT + test_signal = pcied.signal.SIGINT + daemon_pcied.signal_handler(test_signal, None) + assert daemon_pcied.log_info.call_count == 1 + daemon_pcied.log_info.assert_called_with("Caught signal 'SIGINT' - exiting...") + assert daemon_pcied.log_warning.call_count == 0 + assert daemon_pcied.stop_event.set.call_count == 1 + assert pcied.exit_code == (128 + test_signal) + + # Reset + daemon_pcied.log_info.reset_mock() + daemon_pcied.log_warning.reset_mock() + daemon_pcied.stop_event.set.reset_mock() + + # Test SIGTERM + test_signal = pcied.signal.SIGTERM + daemon_pcied.signal_handler(test_signal, None) + assert daemon_pcied.log_info.call_count == 1 + daemon_pcied.log_info.assert_called_with("Caught signal 'SIGTERM' - exiting...") + assert daemon_pcied.log_warning.call_count == 0 + assert daemon_pcied.stop_event.set.call_count == 1 + assert pcied.exit_code == (128 + test_signal) + + # Reset + daemon_pcied.log_info.reset_mock() + daemon_pcied.log_warning.reset_mock() + daemon_pcied.stop_event.set.reset_mock() + pcied.exit_code = 0 + + # Test an unhandled signal + daemon_pcied.signal_handler(pcied.signal.SIGUSR1, None) + assert daemon_pcied.log_warning.call_count == 1 + daemon_pcied.log_warning.assert_called_with("Caught unhandled signal 'SIGUSR1' - ignoring...") + assert daemon_pcied.log_info.call_count == 0 + assert daemon_pcied.stop_event.set.call_count == 0 + assert pcied.exit_code == 0 + + @mock.patch('pcied.load_platform_pcieutil', mock.MagicMock()) + def test_run(self): + daemon_pcied = pcied.DaemonPcied(SYSLOG_IDENTIFIER) + daemon_pcied.check_pcie_devices = mock.MagicMock() + + daemon_pcied.run() + assert daemon_pcied.check_pcie_devices.call_count == 1 + + @mock.patch('pcied.load_platform_pcieutil', mock.MagicMock()) + def test_check_pcie_devices(self): + daemon_pcied = pcied.DaemonPcied(SYSLOG_IDENTIFIER) + daemon_pcied.update_pcie_devices_status_db = mock.MagicMock() + daemon_pcied.check_n_update_pcie_aer_stats = mock.MagicMock() + pcied.platform_pcieutil.get_pcie_check = mock.MagicMock() + + daemon_pcied.check_pcie_devices() + assert daemon_pcied.update_pcie_devices_status_db.call_count == 1 + assert daemon_pcied.check_n_update_pcie_aer_stats.call_count == 0 + + + @mock.patch('pcied.load_platform_pcieutil', mock.MagicMock()) + def test_update_pcie_devices_status_db(self): + daemon_pcied = pcied.DaemonPcied(SYSLOG_IDENTIFIER) + daemon_pcied.status_table = mock.MagicMock() + daemon_pcied.log_info = mock.MagicMock() + daemon_pcied.log_error = mock.MagicMock() + + # test for pass resultInfo + daemon_pcied.update_pcie_devices_status_db(0) + assert daemon_pcied.status_table.set.call_count == 1 + assert daemon_pcied.log_info.call_count == 1 + assert daemon_pcied.log_error.call_count == 0 + + daemon_pcied.status_table.set.reset_mock() + daemon_pcied.log_info.reset_mock() + + # test for resultInfo with 1 device failed to detect + daemon_pcied.update_pcie_devices_status_db(1) + assert daemon_pcied.status_table.set.call_count == 1 + assert daemon_pcied.log_info.call_count == 0 + assert daemon_pcied.log_error.call_count == 1 + + + @mock.patch('pcied.load_platform_pcieutil', mock.MagicMock()) + @mock.patch('pcied.read_id_file') + def test_check_n_update_pcie_aer_stats(self, mock_read): + daemon_pcied = pcied.DaemonPcied(SYSLOG_IDENTIFIER) + daemon_pcied.device_table = mock.MagicMock() + daemon_pcied.update_aer_to_statedb = mock.MagicMock() + pcied.platform_pcieutil.get_pcie_aer_stats = mock.MagicMock() + + mock_read.return_value = None + daemon_pcied.check_n_update_pcie_aer_stats(0,1,0) + assert daemon_pcied.update_aer_to_statedb.call_count == 0 + assert daemon_pcied.device_table.set.call_count == 0 + assert pcied.platform_pcieutil.get_pcie_aer_stats.call_count == 0 + + mock_read.return_value = '1714' + daemon_pcied.check_n_update_pcie_aer_stats(0,1,0) + assert daemon_pcied.update_aer_to_statedb.call_count == 1 + assert daemon_pcied.device_table.set.call_count == 1 + assert pcied.platform_pcieutil.get_pcie_aer_stats.call_count == 1 + + + @mock.patch('pcied.load_platform_pcieutil', mock.MagicMock()) + def test_update_aer_to_statedb(self): + daemon_pcied = pcied.DaemonPcied(SYSLOG_IDENTIFIER) + daemon_pcied.log_debug = mock.MagicMock() + daemon_pcied.device_table = mock.MagicMock() + daemon_pcied.device_name = mock.MagicMock() + daemon_pcied.aer_stats = mock.MagicMock() + + + mocked_expected_fvp = pcied.swsscommon.FieldValuePairs( + [("correctable|field1", '0'), + ("correctable|field2", '0'), + ("fatal|field3", '0'), + ("fatal|field4", '0'), + ("non_fatal|field5", '0'), + ("non_fatal|field6", '0'), + ]) + + daemon_pcied.update_aer_to_statedb() + assert daemon_pcied.log_debug.call_count == 1 + assert daemon_pcied.device_table.set.call_count == 0 + + daemon_pcied.device_table.set.reset_mock() diff --git a/sonic-pcied/tests/test_pcied.py b/sonic-pcied/tests/test_pcied.py new file mode 100644 index 000000000000..f3b3e78e9471 --- /dev/null +++ b/sonic-pcied/tests/test_pcied.py @@ -0,0 +1,43 @@ +import os +import sys +from imp import load_source # Replace with importlib once we no longer need to support Python 2 + +import pytest + +# TODO: Clean this up once we no longer need to support Python 2 +if sys.version_info.major == 3: + from unittest import mock +else: + import mock +from sonic_py_common import daemon_base, device_info + +from .mock_platform import MockPcieUtil + +tests_path = os.path.dirname(os.path.abspath(__file__)) + +# Add mocked_libs path so that the file under test can load mocked modules from there +mocked_libs_path = os.path.join(tests_path, "mocked_libs") +sys.path.insert(0, mocked_libs_path) + +# Add path to the file under test so that we can load it +modules_path = os.path.dirname(tests_path) +scripts_path = os.path.join(modules_path, "scripts") +sys.path.insert(0, modules_path) +load_source('pcied', os.path.join(scripts_path, 'pcied')) +import pcied + + +daemon_base.db_connect = mock.MagicMock() + + +SYSLOG_IDENTIFIER = 'pcied_test' +NOT_AVAILABLE = 'N/A' + + +@mock.patch('pcied.load_platform_pcieutil', mock.MagicMock()) +@mock.patch('pcied.DaemonPcied.run') +def test_main(mock_run): + mock_run.return_value = False + + pcied.main() + assert mock_run.call_count == 1