diff --git a/config/muxcable.py b/config/muxcable.py index e7b682f2f7..506e01339e 100644 --- a/config/muxcable.py +++ b/config/muxcable.py @@ -632,3 +632,234 @@ def setswitchmode(state, port): if rc == False: click.echo("ERR: Unable to set switching mode one or more ports to {}".format(state)) sys.exit(CONFIG_FAIL) + + +def get_per_npu_statedb(per_npu_statedb, port_table_keys): + + # Getting all front asic namespace and correspding config and state DB connector + + namespaces = multi_asic.get_front_end_namespaces() + for namespace in namespaces: + asic_id = multi_asic.get_asic_index_from_namespace(namespace) + # replace these with correct macros + per_npu_statedb[asic_id] = SonicV2Connector(use_unix_socket_path=True, namespace=namespace) + per_npu_statedb[asic_id].connect(per_npu_statedb[asic_id].STATE_DB) + + port_table_keys[asic_id] = per_npu_statedb[asic_id].keys( + per_npu_statedb[asic_id].STATE_DB, 'MUX_CABLE_TABLE|*') + + +def get_physical_port_list(port): + + physical_port_list = [] + if platform_sfputil is not None: + physical_port_list = platform_sfputil_helper.logical_port_name_to_physical_port_list(port) + + asic_index = None + if platform_sfputil is not None: + asic_index = platform_sfputil_helper.get_asic_id_for_logical_port(port) + if asic_index is None: + # TODO this import is only for unit test purposes, and should be removed once sonic_platform_base + # is fully mocked + import sonic_platform_base.sonic_sfp.sfputilhelper + asic_index = sonic_platform_base.sonic_sfp.sfputilhelper.SfpUtilHelper().get_asic_id_for_logical_port(port) + if asic_index is None: + click.echo("Got invalid asic index for port {}, cant retreive mux status".format(port)) + + if not isinstance(physical_port_list, list): + click.echo(("ERR: Unable to locate physical port information for {}".format(port))) + sys.exit(CONFIG_FAIL) + + if len(physical_port_list) != 1: + click.echo("ERR: Found multiple physical ports ({}) associated with {}".format( + ", ".join(physical_port_list), port)) + sys.exit(CONFIG_FAIL) + + return (physical_port_list, asic_index) + + +def perform_download_firmware(physical_port, fwfile, port): + import sonic_y_cable.y_cable + result = sonic_y_cable.y_cable.download_firmware(physical_port, fwfile) + if result == sonic_y_cable.y_cable.FIRMWARE_DOWNLOAD_SUCCESS: + click.echo("firmware download successful {}".format(port)) + return True + else: + click.echo("firmware download failure {}".format(port)) + return False + + +def perform_activate_firmware(physical_port, port): + import sonic_y_cable.y_cable + result = sonic_y_cable.y_cable.activate_firmware(physical_port) + if result == sonic_y_cable.y_cable.FIRMWARE_ACTIVATE_SUCCESS: + click.echo("firmware activate successful for {}".format(port)) + return True + else: + click.echo("firmware activate failure for {}".format(port)) + return False + + +def perform_rollback_firmware(physical_port, port): + import sonic_y_cable.y_cable + result = sonic_y_cable.y_cable.rollback_firmware(physical_port) + if result == sonic_y_cable.y_cable.FIRMWARE_ROLLBACK_SUCCESS: + click.echo("firmware rollback successful {}".format(port)) + return True + else: + click.echo("firmware rollback failure {}".format(port)) + return False + + +@muxcable.group(cls=clicommon.AbbreviationGroup) +def firmware(): + """Configure muxcable firmware command""" + pass + + +@firmware.command() +@click.argument('fwfile', metavar='', required=True) +@click.argument('port', metavar='', required=True, default=None) +def download(fwfile, port): + """Config muxcable firmware download""" + + per_npu_statedb = {} + y_cable_asic_table_keys = {} + port_table_keys = {} + + get_per_npu_statedb(per_npu_statedb, port_table_keys) + + if port is not None and port != "all": + + physical_port_list = [] + physical_port_list, asic_index = get_physical_port_list(port) + physical_port = physical_port_list[0] + if per_npu_statedb[asic_index] is not None: + y_cable_asic_table_keys = port_table_keys[asic_index] + logical_key = "MUX_CABLE_TABLE|{}".format(port) + if logical_key in y_cable_asic_table_keys: + perform_download_firmware(physical_port, fwfile, port) + + else: + click.echo("this is not a valid port present on mux_cable".format(port)) + sys.exit(CONFIG_FAIL) + else: + click.echo("there is not a valid asic table for this asic_index".format(asic_index)) + sys.exit(CONFIG_FAIL) + + elif port == "all" and port is not None: + + rc = CONFIG_SUCCESSFUL + for namespace in namespaces: + asic_id = multi_asic.get_asic_index_from_namespace(namespace) + for key in port_table_keys[asic_id]: + port = key.split("|")[1] + + physical_port_list = [] + (physical_port_list, asic_index) = get_physical_port_list(port) + + physical_port = physical_port_list[0] + + status = perform_download_firmware(physical_port, fwfile, port) + + if status is not True: + rc = CONFIG_FAIL + + sys.exit(rc) + + +@firmware.command() +@click.argument('port', metavar='', required=True, default=None) +def activate(port): + """Config muxcable firmware activate""" + + per_npu_statedb = {} + y_cable_asic_table_keys = {} + port_table_keys = {} + + get_per_npu_statedb(per_npu_statedb, port_table_keys) + + if port is not None and port != "all": + + physical_port_list = [] + (physical_port_list, asic_index) = get_physical_port_list(port) + physical_port = physical_port_list[0] + if per_npu_statedb[asic_index] is not None: + y_cable_asic_table_keys = port_table_keys[asic_index] + logical_key = "MUX_CABLE_TABLE|{}".format(port) + if logical_key in y_cable_asic_table_keys: + perform_activate_firmware(physical_port, port) + + else: + click.echo("this is not a valid port present on mux_cable".format(port)) + sys.exit(CONFIG_FAIL) + else: + click.echo("there is not a valid asic table for this asic_index".format(asic_index)) + sys.exit(CONFIG_FAIL) + + elif port == "all" and port is not None: + + rc = CONFIG_SUCCESSFUL + for namespace in namespaces: + asic_id = multi_asic.get_asic_index_from_namespace(namespace) + for key in port_table_keys[asic_id]: + port = key.split("|")[1] + + physical_port_list = [] + + (physical_port_list, asic_index) = get_physical_port_list(port) + physical_port = physical_port_list[0] + status = perform_activate_firmware(physical_port, port) + + if status is not True: + rc = CONFIG_FAIL + + sys.exit(rc) + + +@firmware.command() +@click.argument('port', metavar='', required=True, default=None) +def rollback(port): + """Config muxcable firmware rollback""" + + port_table_keys = {} + y_cable_asic_table_keys = {} + per_npu_statedb = {} + + get_per_npu_statedb(per_npu_statedb, port_table_keys) + + if port is not None and port != "all": + + physical_port_list = [] + (physical_port_list, asic_index) = get_physical_port_list(port) + physical_port = physical_port_list[0] + if per_npu_statedb[asic_index] is not None: + y_cable_asic_table_keys = port_table_keys[asic_index] + logical_key = "MUX_CABLE_TABLE|{}".format(port) + if logical_key in y_cable_asic_table_keys: + perform_rollback_firmware(physical_port, port) + + else: + click.echo("this is not a valid port present on mux_cable".format(port)) + sys.exit(CONFIG_FAIL) + else: + click.echo("there is not a valid asic table for this asic_index".format(asic_index)) + sys.exit(CONFIG_FAIL) + + elif port == "all" and port is not None: + + rc = CONFIG_SUCCESSFUL + for namespace in namespaces: + asic_id = multi_asic.get_asic_index_from_namespace(namespace) + for key in port_table_keys[asic_id]: + port = key.split("|")[1] + + physical_port_list = [] + (physical_port_list, asic_index) = get_physical_port_list(port) + physical_port = physical_port_list[0] + status = perform_rollback_firmware(physical_port, port) + + if status is not True: + rc = CONFIG_FAIL + + sys.exit(rc) diff --git a/show/muxcable.py b/show/muxcable.py index 9b2b60a671..5739183aa2 100644 --- a/show/muxcable.py +++ b/show/muxcable.py @@ -8,7 +8,7 @@ from natsort import natsorted from sonic_py_common import multi_asic from swsscommon import swsscommon -from swsssdk import ConfigDBConnector +from swsscommon.swsscommon import SonicV2Connector, ConfigDBConnector from tabulate import tabulate from utilities_common import platform_sfputil_helper @@ -660,7 +660,7 @@ def switchmode(port): namespaces = multi_asic.get_front_end_namespaces() for namespace in namespaces: asic_id = multi_asic.get_asic_index_from_namespace(namespace) - per_npu_statedb[asic_id] = SonicV2Connector(use_unix_socket_path=False, namespace=namespace) + per_npu_statedb[asic_id] = swsscommon.SonicV2Connector(use_unix_socket_path=False, namespace=namespace) per_npu_statedb[asic_id].connect(per_npu_statedb[asic_id].STATE_DB) if port is not None: @@ -821,3 +821,107 @@ def switchmode(port): click.echo(tabulate(body, headers=headers)) if rc == False: sys.exit(EXIT_FAIL) + + +def get_firmware_dict(physical_port, target, side, mux_info_dict): + + import sonic_y_cable.y_cable + result = sonic_y_cable.y_cable.get_firmware_version(physical_port, target) + + if result is not None and isinstance(result, dict): + mux_info_dict[("version_{}_active".format(side))] = result.get("version_active", None) + mux_info_dict[("version_{}_inactive".format(side))] = result.get("version_inactive", None) + mux_info_dict[("version_{}_next".format(side))] = result.get("version_next", None) + + else: + mux_info_dict[("version_{}_active".format(side))] = "N/A" + mux_info_dict[("version_{}_inactive".format(side))] = "N/A" + mux_info_dict[("version_{}_next".format(side))] = "N/A" + + +@muxcable.group(cls=clicommon.AbbreviationGroup) +def firmware(): + """Show muxcable firmware command""" + pass + + +@firmware.command() +@click.argument('port', metavar='', required=True, default=None) +def version(port): + """Show muxcable firmware version""" + + port_table_keys = {} + y_cable_asic_table_keys = {} + per_npu_statedb = {} + physical_port_list = [] + + # Getting all front asic namespace and correspding config and state DB connector + + namespaces = multi_asic.get_front_end_namespaces() + for namespace in namespaces: + asic_id = multi_asic.get_asic_index_from_namespace(namespace) + # replace these with correct macros + per_npu_statedb[asic_id] = swsscommon.SonicV2Connector(use_unix_socket_path=True, namespace=namespace) + per_npu_statedb[asic_id].connect(per_npu_statedb[asic_id].STATE_DB) + + port_table_keys[asic_id] = per_npu_statedb[asic_id].keys( + per_npu_statedb[asic_id].STATE_DB, 'MUX_CABLE_TABLE|*') + + if port is not None: + + logical_port_list = platform_sfputil_helper.get_logical_list() + + if port not in logical_port_list: + click.echo(("ERR: Not a valid logical port for muxcable firmware {}".format(port))) + sys.exit(CONFIG_FAIL) + + asic_index = None + if platform_sfputil is not None: + asic_index = platform_sfputil_helper.get_asic_id_for_logical_port(port) + if asic_index is None: + # TODO this import is only for unit test purposes, and should be removed once sonic_platform_base + # is fully mocked + import sonic_platform_base.sonic_sfp.sfputilhelper + asic_index = sonic_platform_base.sonic_sfp.sfputilhelper.SfpUtilHelper().get_asic_id_for_logical_port(port) + if asic_index is None: + click.echo("Got invalid asic index for port {}, cant retreive mux status".format(port)) + + if platform_sfputil is not None: + physical_port_list = platform_sfputil_helper.logical_port_name_to_physical_port_list(port) + + if not isinstance(physical_port_list, list): + click.echo(("ERR: Unable to locate physical port information for {}".format(port))) + sys.exit(CONFIG_FAIL) + + if len(physical_port_list) != 1: + click.echo("ERR: Found multiple physical ports ({}) associated with {}".format( + ", ".join(physical_port_list), port)) + sys.exit(CONFIG_FAIL) + + mux_info_dict = {} + physical_port = physical_port_list[0] + if per_npu_statedb[asic_index] is not None: + y_cable_asic_table_keys = port_table_keys[asic_index] + logical_key = "MUX_CABLE_TABLE|{}".format(port) + import sonic_y_cable.y_cable + read_side = sonic_y_cable.y_cable.check_read_side(physical_port) + if logical_key in y_cable_asic_table_keys: + if read_side == 1: + get_firmware_dict(physical_port, 1, "self", mux_info_dict) + get_firmware_dict(physical_port, 2, "peer", mux_info_dict) + get_firmware_dict(physical_port, 0, "nic", mux_info_dict) + click.echo("{}".format(json.dumps(mux_info_dict, indent=4))) + elif read_side == 2: + get_firmware_dict(physical_port, 2, "self", mux_info_dict) + get_firmware_dict(physical_port, 1, "peer", mux_info_dict) + get_firmware_dict(physical_port, 0, "nic", mux_info_dict) + click.echo("{}".format(json.dumps(mux_info_dict, indent=4))) + else: + click.echo("Did not get a valid read_side for muxcable".format(port)) + sys.exit(CONFIG_FAIL) + + else: + click.echo("this is not a valid port present on mux_cable".format(port)) + sys.exit(CONFIG_FAIL) + else: + click.echo("there is not a valid asic table for this asic_index".format(asic_index)) diff --git a/tests/muxcable_test.py b/tests/muxcable_test.py index 9f91a0336b..a28662e21a 100644 --- a/tests/muxcable_test.py +++ b/tests/muxcable_test.py @@ -175,6 +175,20 @@ Ethernet12 standby """ +show_muxcable_firmware_version_expected_output = """\ +{ + "version_self_active": "0.6MS", + "version_self_inactive": "0.6MS", + "version_self_next": "0.6MS", + "version_peer_active": "0.6MS", + "version_peer_inactive": "0.6MS", + "version_peer_next": "0.6MS", + "version_nic_active": "0.6MS", + "version_nic_inactive": "0.6MS", + "version_nic_next": "0.6MS" +} +""" + class TestMuxcable(object): @classmethod @@ -696,6 +710,76 @@ def test_config_muxcable_hwmode_state_standby(self): ["standby", "all"], obj=db) assert result.exit_code == 0 + @mock.patch('utilities_common.platform_sfputil_helper.get_logical_list', mock.MagicMock(return_value=["Ethernet0", "Ethernet12"])) + @mock.patch('utilities_common.platform_sfputil_helper.get_asic_id_for_logical_port', mock.MagicMock(return_value=0)) + @mock.patch('show.muxcable.platform_sfputil', mock.MagicMock(return_value={0: ["Ethernet12", "Ethernet0"]})) + @mock.patch('utilities_common.platform_sfputil_helper.get_physical_to_logical', mock.MagicMock(return_value={0: ["Ethernet12", "Ethernet0"]})) + @mock.patch('utilities_common.platform_sfputil_helper.logical_port_name_to_physical_port_list', mock.MagicMock(return_value=[0])) + @mock.patch('sonic_y_cable.y_cable.check_read_side', mock.MagicMock(return_value=(1))) + @mock.patch('click.confirm', mock.MagicMock(return_value=("y"))) + @mock.patch('sonic_y_cable.y_cable.get_firmware_version', mock.MagicMock(return_value={"version_active": "0.6MS", + "version_inactive": "0.6MS", + "version_next": "0.6MS"})) + def test_show_muxcable_firmware_version(self): + runner = CliRunner() + db = Db() + + result = runner.invoke(show.cli.commands["muxcable"].commands["firmware"].commands["version"], [ + "Ethernet0"], obj=db) + assert result.exit_code == 0 + assert result.output == show_muxcable_firmware_version_expected_output + + @mock.patch('utilities_common.platform_sfputil_helper.get_logical_list', mock.MagicMock(return_value=["Ethernet0", "Ethernet12"])) + @mock.patch('utilities_common.platform_sfputil_helper.get_asic_id_for_logical_port', mock.MagicMock(return_value=0)) + @mock.patch('config.muxcable.platform_sfputil', mock.MagicMock(return_value={0: ["Ethernet12", "Ethernet0"]})) + @mock.patch('utilities_common.platform_sfputil_helper.get_physical_to_logical', mock.MagicMock(return_value={0: ["Ethernet12", "Ethernet0"]})) + @mock.patch('utilities_common.platform_sfputil_helper.logical_port_name_to_physical_port_list', mock.MagicMock(return_value=[0])) + @mock.patch('sonic_y_cable.y_cable.check_read_side', mock.MagicMock(return_value=(1))) + @mock.patch('click.confirm', mock.MagicMock(return_value=("y"))) + @mock.patch('sonic_y_cable.y_cable.download_fimware', mock.MagicMock(return_value=(1))) + @mock.patch('sonic_y_cable.y_cable.FIRMWARE_DOWNLOAD_SUCCESS', mock.MagicMock(return_value=(1))) + def test_config_muxcable_download_firmware(self): + runner = CliRunner() + db = Db() + + result = runner.invoke(config.config.commands["muxcable"].commands["firmware"].commands["download"], [ + "fwfile", "Ethernet0"], obj=db) + assert result.exit_code == 0 + + @mock.patch('utilities_common.platform_sfputil_helper.get_logical_list', mock.MagicMock(return_value=["Ethernet0", "Ethernet12"])) + @mock.patch('utilities_common.platform_sfputil_helper.get_asic_id_for_logical_port', mock.MagicMock(return_value=0)) + @mock.patch('config.muxcable.platform_sfputil', mock.MagicMock(return_value={0: ["Ethernet12", "Ethernet0"]})) + @mock.patch('utilities_common.platform_sfputil_helper.get_physical_to_logical', mock.MagicMock(return_value={0: ["Ethernet12", "Ethernet0"]})) + @mock.patch('utilities_common.platform_sfputil_helper.logical_port_name_to_physical_port_list', mock.MagicMock(return_value=[0])) + @mock.patch('sonic_y_cable.y_cable.check_read_side', mock.MagicMock(return_value=(1))) + @mock.patch('click.confirm', mock.MagicMock(return_value=("y"))) + @mock.patch('sonic_y_cable.y_cable.activate_firmware', mock.MagicMock(return_value=(1))) + @mock.patch('sonic_y_cable.y_cable.FIRMWARE_ACTIVATE_SUCCESS', mock.MagicMock(return_value=(1))) + def test_config_muxcable_activate_firmware(self): + runner = CliRunner() + db = Db() + + result = runner.invoke(config.config.commands["muxcable"].commands["firmware"].commands["activate"], [ + "Ethernet0"], obj=db) + assert result.exit_code == 0 + + @mock.patch('utilities_common.platform_sfputil_helper.get_logical_list', mock.MagicMock(return_value=["Ethernet0", "Ethernet12"])) + @mock.patch('utilities_common.platform_sfputil_helper.get_asic_id_for_logical_port', mock.MagicMock(return_value=0)) + @mock.patch('config.muxcable.platform_sfputil', mock.MagicMock(return_value={0: ["Ethernet12", "Ethernet0"]})) + @mock.patch('utilities_common.platform_sfputil_helper.get_physical_to_logical', mock.MagicMock(return_value={0: ["Ethernet12", "Ethernet0"]})) + @mock.patch('utilities_common.platform_sfputil_helper.logical_port_name_to_physical_port_list', mock.MagicMock(return_value=[0])) + @mock.patch('sonic_y_cable.y_cable.check_read_side', mock.MagicMock(return_value=(1))) + @mock.patch('click.confirm', mock.MagicMock(return_value=("y"))) + @mock.patch('sonic_y_cable.y_cable.rollback_firmware', mock.MagicMock(return_value=(1))) + @mock.patch('sonic_y_cable.y_cable.FIRMWARE_ROLLBACK_SUCCESS', mock.MagicMock(return_value=(1))) + def test_config_muxcable_rollback_firmware(self): + runner = CliRunner() + db = Db() + + result = runner.invoke(config.config.commands["muxcable"].commands["firmware"].commands["rollback"], [ + "Ethernet0"], obj=db) + assert result.exit_code == 0 + @classmethod def teardown_class(cls): os.environ['UTILITIES_UNIT_TESTING'] = "0"