diff --git a/config/muxcable.py b/config/muxcable.py index 518446435d..548b22f41d 100644 --- a/config/muxcable.py +++ b/config/muxcable.py @@ -631,3 +631,315 @@ def setswitchmode(state, port): if rc == False: click.echo("ERR: Unable to set switching mode one or more ports to {}".format(state)) sys.exit(CONFIG_FAIL) + +@muxcable.group(cls=clicommon.AbbreviationGroup) +def firmware(): + """Configure muxcable firmware command""" + pass + +# 'muxcable' command ("config muxcable mode active|auto") +@firmware.command() +@click.argument('fwfile', metavar='', required=True)) +@click.argument('port', metavar='', required=True, default=None) +def download(fwfile, port): + """Config muxcable firmware download""" + + port_table_keys = {} + y_cable_asic_table_keys = {} + per_npu_statedb = {} + + # Getting all front asic namespace and correspding config and state DB connector + + namespaces = multi_asic.get_front_end_namespaces() + for namespace in namespaces: + asic_id = multi_asic.get_asic_index_from_namespace(namespace) + # replace these with correct macros + per_npu_statedb[asic_id] = SonicV2Connector(use_unix_socket_path=True, namespace=namespace) + per_npu_statedb[asic_id].connect(per_npu_statedb[asic_id].STATE_DB) + + + port_table_keys[asic_id] = per_npu_statedb[asic_id].keys( + per_npu_statedb[asic_id].STATE_DB, 'MUX_CABLE_TABLE|*') + + if port is not None and port != "all": + + if platform_sfputil is not None: + physical_port_list = platform_sfputil_helper.logical_port_name_to_physical_port_list(port) + + asic_index = None + if platform_sfputil is not None: + asic_index = platform_sfputil_helper.get_asic_id_for_logical_port(port) + if asic_index is None: + # TODO this import is only for unit test purposes, and should be removed once sonic_platform_base + # is fully mocked + import sonic_platform_base.sonic_sfp.sfputilhelper + asic_index = sonic_platform_base.sonic_sfp.sfputilhelper.SfpUtilHelper().get_asic_id_for_logical_port(port) + if asic_index is None: + click.echo("Got invalid asic index for port {}, cant retreive mux status".format(port)) + + if not isinstance(physical_port_list, list): + click.echo(("ERR: Unable to locate physical port information for {}".format(port))) + sys.exit(CONFIG_FAIL) + + if len(physical_port_list) != 1: + click.echo("ERR: Found multiple physical ports ({}) associated with {}".format( + ", ".join(physical_port_list), port)) + sys.exit(CONFIG_FAIL) + + physical_port = physical_port_list[0] + if per_npu_statedb[asic_index] is not None: + y_cable_asic_table_keys = port_table_keys[asic_index] + logical_key = "MUX_CABLE_TABLE|{}".format(port) + if logical_key in y_cable_asic_table_keys: + import sonic_y_cable.y_cable + result = sonic_y_cable.y_cable.download_firmware(physical_port, fwfile) + if result == sonic_y_cable.y_cable.FIRMWARE_DOWNLOAD_SUCCESS: + click.echo("firmware download successful {}".format(port)) + else: + click.echo("firmware download failure {}".format(port)) + + else: + click.echo("this is not a valid port present on mux_cable".format(port)) + sys.exit(CONFIG_FAIL) + else: + click.echo("there is not a valid asic table for this asic_index".format(asic_index)) + sys.exit(CONFIG_FAIL) + + elif port == "all" and port is not None: + + port_status_dict = {} + rc = True + for namespace in namespaces: + asic_id = multi_asic.get_asic_index_from_namespace(namespace) + for key in port_table_keys[asic_id]: + port = key.split("|")[1] + if platform_sfputil is not None: + physical_port_list = platform_sfputil_helper.logical_port_name_to_physical_port_list(port) + + if not isinstance(physical_port_list, list): + click.echo(("ERR: Unable to locate physical port information for {}".format(port))) + rc = False + continue + + if len(physical_port_list) != 1: + click.echo("ERR: Found multiple physical ports ({}) associated with {}".format( + ", ".join(physical_port_list), port)) + rc = False + continue + physical_port = physical_port_list[0] + import sonic_y_cable.y_cable + result = sonic_y_cable.y_cable.download_firmware(physical_port, fwfile) + if result == sonic_y_cable.y_cable.FIRMWARE_DOWNLOAD_SUCCESS: + click.echo("firmware download successful for {}".format(port)) + else: + click.echo("firmware download failure for {}".format(port)) + rc = False + + if rc: + sys.exit(CONFIG_SUCCESSFUL) + else: + sys.exit(CONFIG_FAIL) + +@firmware.command() +@click.argument('port', metavar='', required=True, default=None) +def activate(port): + """Config muxcable firmware activate""" + + port_table_keys = {} + y_cable_asic_table_keys = {} + per_npu_statedb = {} + + # Getting all front asic namespace and correspding config and state DB connector + + namespaces = multi_asic.get_front_end_namespaces() + for namespace in namespaces: + asic_id = multi_asic.get_asic_index_from_namespace(namespace) + # replace these with correct macros + per_npu_statedb[asic_id] = SonicV2Connector(use_unix_socket_path=True, namespace=namespace) + per_npu_statedb[asic_id].connect(per_npu_statedb[asic_id].STATE_DB) + + + port_table_keys[asic_id] = per_npu_statedb[asic_id].keys( + per_npu_statedb[asic_id].STATE_DB, 'MUX_CABLE_TABLE|*') + + if port is not None and port != "all": + + if platform_sfputil is not None: + physical_port_list = platform_sfputil_helper.logical_port_name_to_physical_port_list(port) + + asic_index = None + if platform_sfputil is not None: + asic_index = platform_sfputil_helper.get_asic_id_for_logical_port(port) + if asic_index is None: + # TODO this import is only for unit test purposes, and should be removed once sonic_platform_base + # is fully mocked + import sonic_platform_base.sonic_sfp.sfputilhelper + asic_index = sonic_platform_base.sonic_sfp.sfputilhelper.SfpUtilHelper().get_asic_id_for_logical_port(port) + if asic_index is None: + click.echo("Got invalid asic index for port {}, cant retreive mux status".format(port)) + + if not isinstance(physical_port_list, list): + click.echo(("ERR: Unable to locate physical port information for {}".format(port))) + sys.exit(CONFIG_FAIL) + + if len(physical_port_list) != 1: + click.echo("ERR: Found multiple physical ports ({}) associated with {}".format( + ", ".join(physical_port_list), port)) + sys.exit(CONFIG_FAIL) + + physical_port = physical_port_list[0] + if per_npu_statedb[asic_index] is not None: + y_cable_asic_table_keys = port_table_keys[asic_index] + logical_key = "MUX_CABLE_TABLE|{}".format(port) + if logical_key in y_cable_asic_table_keys: + import sonic_y_cable.y_cable + result = sonic_y_cable.y_cable.activate_firmware(physical_port) + if result == sonic_y_cable.y_cable.FIRMWARE_ACTIVATE_SUCCESS: + click.echo("firmware activate successful {}".format(port)) + else: + click.echo("firmware activate failure {}".format(port)) + + else: + click.echo("this is not a valid port present on mux_cable".format(port)) + sys.exit(CONFIG_FAIL) + else: + click.echo("there is not a valid asic table for this asic_index".format(asic_index)) + sys.exit(CONFIG_FAIL) + + elif port == "all" and port is not None: + + port_status_dict = {} + rc = True + for namespace in namespaces: + asic_id = multi_asic.get_asic_index_from_namespace(namespace) + for key in port_table_keys[asic_id]: + port = key.split("|")[1] + if platform_sfputil is not None: + physical_port_list = platform_sfputil_helper.logical_port_name_to_physical_port_list(port) + + if not isinstance(physical_port_list, list): + click.echo(("ERR: Unable to locate physical port information for {}".format(port))) + rc = False + continue + + if len(physical_port_list) != 1: + click.echo("ERR: Found multiple physical ports ({}) associated with {}".format( + ", ".join(physical_port_list), port)) + rc = False + continue + + physical_port = physical_port_list[0] + import sonic_y_cable.y_cable + result = sonic_y_cable.y_cable.activate_firmware(physical_port) + if result == sonic_y_cable.y_cable.FIRMWARE_ACTIVATE_SUCCESS: + click.echo("firmware ACTIVATE successful for {}".format(port)) + else: + click.echo("firmware ACTIVATE failure for {}".format(port)) + rc = False + + if rc: + sys.exit(CONFIG_SUCCESSFUL) + else: + sys.exit(CONFIG_FAIL) + i +@firmware.command() +@click.argument('port', metavar='', required=True, default=None) +def rollback(port): + """Config muxcable firmware rollback""" + + port_table_keys = {} + y_cable_asic_table_keys = {} + per_npu_statedb = {} + + # Getting all front asic namespace and correspding config and state DB connector + + namespaces = multi_asic.get_front_end_namespaces() + for namespace in namespaces: + asic_id = multi_asic.get_asic_index_from_namespace(namespace) + # replace these with correct macros + per_npu_statedb[asic_id] = SonicV2Connector(use_unix_socket_path=True, namespace=namespace) + per_npu_statedb[asic_id].connect(per_npu_statedb[asic_id].STATE_DB) + + + port_table_keys[asic_id] = per_npu_statedb[asic_id].keys( + per_npu_statedb[asic_id].STATE_DB, 'MUX_CABLE_TABLE|*') + + if port is not None and port != "all": + + if platform_sfputil is not None: + physical_port_list = platform_sfputil_helper.logical_port_name_to_physical_port_list(port) + + asic_index = None + if platform_sfputil is not None: + asic_index = platform_sfputil_helper.get_asic_id_for_logical_port(port) + if asic_index is None: + # TODO this import is only for unit test purposes, and should be removed once sonic_platform_base + # is fully mocked + import sonic_platform_base.sonic_sfp.sfputilhelper + asic_index = sonic_platform_base.sonic_sfp.sfputilhelper.SfpUtilHelper().get_asic_id_for_logical_port(port) + if asic_index is None: + click.echo("Got invalid asic index for port {}, cant retreive mux status".format(port)) + + if not isinstance(physical_port_list, list): + click.echo(("ERR: Unable to locate physical port information for {}".format(port))) + sys.exit(CONFIG_FAIL) + + if len(physical_port_list) != 1: + click.echo("ERR: Found multiple physical ports ({}) associated with {}".format( + ", ".join(physical_port_list), port)) + sys.exit(CONFIG_FAIL) + + physical_port = physical_port_list[0] + if per_npu_statedb[asic_index] is not None: + y_cable_asic_table_keys = port_table_keys[asic_index] + logical_key = "MUX_CABLE_TABLE|{}".format(port) + if logical_key in y_cable_asic_table_keys: + import sonic_y_cable.y_cable + result = sonic_y_cable.y_cable.rollback_firmware(physical_port) + if result == sonic_y_cable.y_cable.FIRMWARE_ROLLBACK_SUCCESS: + click.echo("firmware rollback successful {}".format(port)) + else: + click.echo("firmware rollback failure {}".format(port)) + + else: + click.echo("this is not a valid port present on mux_cable".format(port)) + sys.exit(CONFIG_FAIL) + else: + click.echo("there is not a valid asic table for this asic_index".format(asic_index)) + sys.exit(CONFIG_FAIL) + + elif port == "all" and port is not None: + + port_status_dict = {} + rc = True + for namespace in namespaces: + asic_id = multi_asic.get_asic_index_from_namespace(namespace) + for key in port_table_keys[asic_id]: + port = key.split("|")[1] + if platform_sfputil is not None: + physical_port_list = platform_sfputil_helper.logical_port_name_to_physical_port_list(port) + + if not isinstance(physical_port_list, list): + click.echo(("ERR: Unable to locate physical port information for {}".format(port))) + rc = False + continue + + if len(physical_port_list) != 1: + click.echo("ERR: Found multiple physical ports ({}) associated with {}".format( + ", ".join(physical_port_list), port)) + rc = False + continue + + physical_port = physical_port_list[0] + import sonic_y_cable.y_cable + result = sonic_y_cable.y_cable.rollback_firmware(physical_port) + if result == sonic_y_cable.y_cable.FIRMWARE_ROLLBACK_SUCCESS: + click.echo("firmware ROLLBACK successful for {}".format(port)) + else: + click.echo("firmware ROLLBACK failure for {}".format(port)) + rc = False + + if rc: + sys.exit(CONFIG_SUCCESSFUL) + else: + sys.exit(CONFIG_FAIL)