Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[show][config] cli support for firmware upgrade on Y-Cable #1528

Merged
merged 13 commits into from
Apr 5, 2021
239 changes: 239 additions & 0 deletions config/muxcable.py
Original file line number Diff line number Diff line change
Expand Up @@ -631,3 +631,242 @@ def setswitchmode(state, port):
if rc == False:
click.echo("ERR: Unable to set switching mode one or more ports to {}".format(state))
sys.exit(CONFIG_FAIL)


def get_per_npu_statedb(per_npu_statedb, port_table_keys):

# Getting all front asic namespace and correspding config and state DB connector

namespaces = multi_asic.get_front_end_namespaces()
for namespace in namespaces:
asic_id = multi_asic.get_asic_index_from_namespace(namespace)
# replace these with correct macros
per_npu_statedb[asic_id] = SonicV2Connector(use_unix_socket_path=True, namespace=namespace)
per_npu_statedb[asic_id].connect(per_npu_statedb[asic_id].STATE_DB)

port_table_keys[asic_id] = per_npu_statedb[asic_id].keys(
per_npu_statedb[asic_id].STATE_DB, 'MUX_CABLE_TABLE|*')


def get_physical_port_list(port, physical_port_list):

if platform_sfputil is not None:
physical_port_list = platform_sfputil_helper.logical_port_name_to_physical_port_list(port)

asic_index = None
if platform_sfputil is not None:
asic_index = platform_sfputil_helper.get_asic_id_for_logical_port(port)
if asic_index is None:
# TODO this import is only for unit test purposes, and should be removed once sonic_platform_base
# is fully mocked
import sonic_platform_base.sonic_sfp.sfputilhelper
asic_index = sonic_platform_base.sonic_sfp.sfputilhelper.SfpUtilHelper().get_asic_id_for_logical_port(port)
if asic_index is None:
click.echo("Got invalid asic index for port {}, cant retreive mux status".format(port))

if not isinstance(physical_port_list, list):
click.echo(("ERR: Unable to locate physical port information for {}".format(port)))
sys.exit(CONFIG_FAIL)

if len(physical_port_list) != 1:
click.echo("ERR: Found multiple physical ports ({}) associated with {}".format(
", ".join(physical_port_list), port))
sys.exit(CONFIG_FAIL)

return asic_index


def perform_download_firmware(physical_port, fwfile):
import sonic_y_cable.y_cable
result = sonic_y_cable.y_cable.download_firmware(physical_port, fwfile)
if result == sonic_y_cable.y_cable.FIRMWARE_DOWNLOAD_SUCCESS:
click.echo("firmware download successful {}".format(port))
return True
else:
click.echo("firmware download failure {}".format(port))
return False


def perform_activate_firmware(physical_port):
import sonic_y_cable.y_cable
result = sonic_y_cable.y_cable.activate_firmware(physical_port)
if result == sonic_y_cable.y_cable.FIRMWARE_ACTIVATE_SUCCESS:
click.echo("firmware activate successful for {}".format(port))
return True
else:
click.echo("firmware activate failure for {}".format(port))
return False


def perform_rollback_firmware(physical_port):
import sonic_y_cable.y_cable
result = sonic_y_cable.y_cable.rollback_firmware(physical_port)
if result == sonic_y_cable.y_cable.FIRMWARE_ROLLBACK_SUCCESS:
click.echo("firmware rollback successful {}".format(port))
return True
else:
click.echo("firmware rollback failure {}".format(port))
return False


@muxcable.group(cls=clicommon.AbbreviationGroup)
def firmware():
"""Configure muxcable firmware command"""
pass


@firmware.command()
@click.argument('fwfile', metavar='<firmware_file>', required=True)
@click.argument('port', metavar='<port_name>', required=True, default=None)
def download(fwfile, port):
"""Config muxcable firmware download"""

per_npu_statedb = {}
y_cable_asic_table_keys = {}
port_table_keys = {}

get_per_npu_statedb(per_npu_statedb, port_table_keys)

if port is not None and port != "all":

physical_port_list = []
get_physical_port_list(port, physical_port_list)
physical_port = physical_port_list[0]
if per_npu_statedb[asic_index] is not None:
y_cable_asic_table_keys = port_table_keys[asic_index]
logical_key = "MUX_CABLE_TABLE|{}".format(port)
if logical_key in y_cable_asic_table_keys:
perform_download_firmware(physical_port, fwfile)

else:
click.echo("this is not a valid port present on mux_cable".format(port))
sys.exit(CONFIG_FAIL)
else:
click.echo("there is not a valid asic table for this asic_index".format(asic_index))
sys.exit(CONFIG_FAIL)

elif port == "all" and port is not None:

rc = True
for namespace in namespaces:
asic_id = multi_asic.get_asic_index_from_namespace(namespace)
for key in port_table_keys[asic_id]:
port = key.split("|")[1]

physical_port_list = []
get_physical_port_list(port, physical_port_list)

physical_port = physical_port_list[0]

status = perform_download_firmware(physical_port, fwfile)

if status is not True:
rc = False

if rc:
sys.exit(CONFIG_SUCCESSFUL)
else:
sys.exit(CONFIG_FAIL)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rc seems redundant as a bool. Suggest setting rc = CONFIG_SUCCESSFUL on line 750, rc = CONFIG_FAIL on line 764, and here simply call sys.exit(rc).

Suggest making this change throughout the file.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed



@firmware.command()
@click.argument('port', metavar='<port_name>', required=True, default=None)
def activate(port):
"""Config muxcable firmware activate"""

per_npu_statedb = {}
y_cable_asic_table_keys = {}
port_table_keys = {}

get_per_npu_statedb(per_npu_statedb, port_table_keys)

if port is not None and port != "all":

physical_port_list = []
get_physical_port_list(port, physical_port_list)
physical_port = physical_port_list[0]
if per_npu_statedb[asic_index] is not None:
y_cable_asic_table_keys = port_table_keys[asic_index]
logical_key = "MUX_CABLE_TABLE|{}".format(port)
if logical_key in y_cable_asic_table_keys:
perform_activate_firmware(physical_port)

else:
click.echo("this is not a valid port present on mux_cable".format(port))
sys.exit(CONFIG_FAIL)
else:
click.echo("there is not a valid asic table for this asic_index".format(asic_index))
sys.exit(CONFIG_FAIL)

elif port == "all" and port is not None:

rc = True
for namespace in namespaces:
asic_id = multi_asic.get_asic_index_from_namespace(namespace)
for key in port_table_keys[asic_id]:
port = key.split("|")[1]

physical_port_list = []

get_physical_port_list(port, physical_port_list)
physical_port = physical_port_list[0]
status = perform_activate_firmware(physical_port)

if status is not True:
rc = False

if rc:
sys.exit(CONFIG_SUCCESSFUL)
else:
sys.exit(CONFIG_FAIL)


@firmware.command()
@click.argument('port', metavar='<port_name>', required=True, default=None)
def rollback(port):
"""Config muxcable firmware rollback"""

port_table_keys = {}
y_cable_asic_table_keys = {}
per_npu_statedb = {}

get_per_npu_statedb(per_npu_statedb, port_table_keys)

if port is not None and port != "all":

physical_port_list = []
get_physical_port_list(port, physical_port_list)
physical_port = physical_port_list[0]
if per_npu_statedb[asic_index] is not None:
y_cable_asic_table_keys = port_table_keys[asic_index]
logical_key = "MUX_CABLE_TABLE|{}".format(port)
if logical_key in y_cable_asic_table_keys:
perform_rollback_firmware(physical_port)

else:
click.echo("this is not a valid port present on mux_cable".format(port))
sys.exit(CONFIG_FAIL)
else:
click.echo("there is not a valid asic table for this asic_index".format(asic_index))
sys.exit(CONFIG_FAIL)

elif port == "all" and port is not None:

rc = True
for namespace in namespaces:
asic_id = multi_asic.get_asic_index_from_namespace(namespace)
for key in port_table_keys[asic_id]:
port = key.split("|")[1]

physical_port_list = []
get_physical_port_list(port, physical_port_list)
physical_port = physical_port_list[0]
status = perform_rollback_firmware(physical_port)

if status is not True:
rc = False

if rc:
sys.exit(CONFIG_SUCCESSFUL)
else:
sys.exit(CONFIG_FAIL)
99 changes: 99 additions & 0 deletions show/muxcable.py
Original file line number Diff line number Diff line change
Expand Up @@ -820,3 +820,102 @@ def switchmode(port):
click.echo(tabulate(body, headers=headers))
if rc == False:
sys.exit(EXIT_FAIL)


def get_firmware_dict(physical_port, target, side, mux_info_dict):

import sonic_y_cable.y_cable
result = sonic_y_cable.y_cable.get_firmware_version(physical_port, target)

if result is not None and isinstance(result, dict):
mux_info_dict[("build_slot1_{}".format(side))] = result.get("build_slot1", None)
mux_info_dict[("version_slot1_{}".format(side))] = result.get("version_slot1", None)
mux_info_dict[("build_slot2_{}".format(side))] = result.get("build_slot2", None)
mux_info_dict[("version_slot2_{}".format(side))] = result.get("version_slot2", None)
mux_info_dict[("run_slot1_{}".format(side))] = result.get("run_slot1", None)
mux_info_dict[("run_slot2_{}".format(side))] = result.get("run_slot2", None)
mux_info_dict[("commit_slot1_{}".format(side))] = result.get("commit_slot1", None)
mux_info_dict[("commit_slot2_{}".format(side))] = result.get("commit_slot2", None)
mux_info_dict[("empty_slot1_{}".format(side))] = result.get("empty_slot1", None)
mux_info_dict[("empty_slot2_{}".format(side))] = result.get("empty_slot2", None)
else:
mux_info_dict[("build_slot1_{}".format(side))] = "N/A"
mux_info_dict[("version_slot1_{}".format(side))] = "N/A"
mux_info_dict[("build_slot2_{}".format(side))] = "N/A"
mux_info_dict[("version_slot2_{}".format(side))] = "N/A"
mux_info_dict[("run_slot1_{}".format(side))] = "N/A"
mux_info_dict[("run_slot2_{}".format(side))] = "N/A"
mux_info_dict[("commit_slot1_{}".format(side))] = "N/A"
mux_info_dict[("commit_slot2_{}".format(side))] = "N/A"
mux_info_dict[("empty_slot1_{}".format(side))] = "N/A"
mux_info_dict[("empty_slot2_{}".format(side))] = "N/A"


@muxcable.group(cls=clicommon.AbbreviationGroup)
def firmware():
"""Show muxcable firmware command"""
pass


@firmware.command()
@click.argument('port', metavar='<port_name>', required=True, default=None)
def version(port):
"""Show muxcable firmware version"""

port_table_keys = {}
y_cable_asic_table_keys = {}
per_npu_statedb = {}

# Getting all front asic namespace and correspding config and state DB connector

namespaces = multi_asic.get_front_end_namespaces()
for namespace in namespaces:
asic_id = multi_asic.get_asic_index_from_namespace(namespace)
# replace these with correct macros
per_npu_statedb[asic_id] = swsscommon.SonicV2Connector(use_unix_socket_path=True, namespace=namespace)
per_npu_statedb[asic_id].connect(per_npu_statedb[asic_id].STATE_DB)

port_table_keys[asic_id] = per_npu_statedb[asic_id].keys(
per_npu_statedb[asic_id].STATE_DB, 'MUX_CABLE_TABLE|*')

if port is not None and port != "all":

if platform_sfputil is not None:
physical_port_list = platform_sfputil_helper.logical_port_name_to_physical_port_list(port)

asic_index = None
if platform_sfputil is not None:
asic_index = platform_sfputil_helper.get_asic_id_for_logical_port(port)
if asic_index is None:
# TODO this import is only for unit test purposes, and should be removed once sonic_platform_base
# is fully mocked
import sonic_platform_base.sonic_sfp.sfputilhelper
asic_index = sonic_platform_base.sonic_sfp.sfputilhelper.SfpUtilHelper().get_asic_id_for_logical_port(port)
if asic_index is None:
click.echo("Got invalid asic index for port {}, cant retreive mux status".format(port))

if not isinstance(physical_port_list, list):
click.echo(("ERR: Unable to locate physical port information for {}".format(port)))
sys.exit(CONFIG_FAIL)

if len(physical_port_list) != 1:
click.echo("ERR: Found multiple physical ports ({}) associated with {}".format(
", ".join(physical_port_list), port))
sys.exit(CONFIG_FAIL)

mux_info_dict = {}
physical_port = physical_port_list[0]
if per_npu_statedb[asic_index] is not None:
y_cable_asic_table_keys = port_table_keys[asic_index]
logical_key = "MUX_CABLE_TABLE|{}".format(port)
if logical_key in y_cable_asic_table_keys:
get_firmware_dict(physical_port, 0, "nic", mux_info_dict)
get_firmware_dict(physical_port, 1, "tor1", mux_info_dict)
get_firmware_dict(physical_port, 2, "tor2", mux_info_dict)
click.echo("{}".format(json.dumps(mux_info_dict, indent=4)))

else:
click.echo("this is not a valid port present on mux_cable".format(port))
sys.exit(CONFIG_FAIL)
else:
click.echo("there is not a valid asic table for this asic_index".format(asic_index))