Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
Signed-off-by: vaibhav-dahiya <[email protected]>
  • Loading branch information
vdahiya12 committed Mar 29, 2021
1 parent 0b3bdba commit 63c6977
Showing 1 changed file with 101 additions and 174 deletions.
275 changes: 101 additions & 174 deletions config/muxcable.py
Original file line number Diff line number Diff line change
Expand Up @@ -632,20 +632,10 @@ def setswitchmode(state, port):
click.echo("ERR: Unable to set switching mode one or more ports to {}".format(state))
sys.exit(CONFIG_FAIL)

@muxcable.group(cls=clicommon.AbbreviationGroup)
def firmware():
"""Configure muxcable firmware command"""
pass

@firmware.command()
@click.argument('fwfile', metavar='<operation_status>', required=True)
@click.argument('port', metavar='<port_name>', required=True, default=None)
def download(fwfile, port):
"""Config muxcable firmware download"""
def get_per_npu_statedb(per_npu_statedb, port_table_keys):

port_table_keys = {}
y_cable_asic_table_keys = {}
per_npu_statedb = {}

# Getting all front asic namespace and correspding config and state DB connector

Expand All @@ -656,46 +646,98 @@ def download(fwfile, port):
per_npu_statedb[asic_id] = SonicV2Connector(use_unix_socket_path=True, namespace=namespace)
per_npu_statedb[asic_id].connect(per_npu_statedb[asic_id].STATE_DB)


port_table_keys[asic_id] = per_npu_statedb[asic_id].keys(
per_npu_statedb[asic_id].STATE_DB, 'MUX_CABLE_TABLE|*')

if port is not None and port != "all":

if platform_sfputil is not None:
physical_port_list = platform_sfputil_helper.logical_port_name_to_physical_port_list(port)
def get_physical_port_list(port, physical_port_list)

asic_index = None
if platform_sfputil is not None:
asic_index = platform_sfputil_helper.get_asic_id_for_logical_port(port)
if platform_sfputil is not None:
physical_port_list = platform_sfputil_helper.logical_port_name_to_physical_port_list(port)

asic_index = None
if platform_sfputil is not None:
asic_index = platform_sfputil_helper.get_asic_id_for_logical_port(port)
if asic_index is None:
# TODO this import is only for unit test purposes, and should be removed once sonic_platform_base
# is fully mocked
import sonic_platform_base.sonic_sfp.sfputilhelper
asic_index = sonic_platform_base.sonic_sfp.sfputilhelper.SfpUtilHelper().get_asic_id_for_logical_port(port)
if asic_index is None:
# TODO this import is only for unit test purposes, and should be removed once sonic_platform_base
# is fully mocked
import sonic_platform_base.sonic_sfp.sfputilhelper
asic_index = sonic_platform_base.sonic_sfp.sfputilhelper.SfpUtilHelper().get_asic_id_for_logical_port(port)
if asic_index is None:
click.echo("Got invalid asic index for port {}, cant retreive mux status".format(port))
click.echo("Got invalid asic index for port {}, cant retreive mux status".format(port))

if not isinstance(physical_port_list, list):
click.echo(("ERR: Unable to locate physical port information for {}".format(port)))
sys.exit(CONFIG_FAIL)
if not isinstance(physical_port_list, list):
click.echo(("ERR: Unable to locate physical port information for {}".format(port)))
sys.exit(CONFIG_FAIL)

if len(physical_port_list) != 1:
click.echo("ERR: Found multiple physical ports ({}) associated with {}".format(
", ".join(physical_port_list), port))
if len(physical_port_list) != 1:
click.echo("ERR: Found multiple physical ports ({}) associated with {}".format(
", ".join(physical_port_list), port))
sys.exit(CONFIG_FAIL)

return asic_index


def perform_download_firmware(physical_port, fwfile):
import sonic_y_cable.y_cable
result = sonic_y_cable.y_cable.download_firmware(physical_port, fwfile)
if result == sonic_y_cable.y_cable.FIRMWARE_DOWNLOAD_SUCCESS:
click.echo("firmware download successful {}".format(port))
return True
else:
click.echo("firmware download failure {}".format(port))
return False


def perform_activate_firmware(physical_port, fwfile):
import sonic_y_cable.y_cable
result = sonic_y_cable.y_cable.activate_firmware(physical_port)
if result == sonic_y_cable.y_cable.FIRMWARE_ACTIVATE_SUCCESS:
click.echo("firmware ACTIVATE successful for {}".format(port))
return True
else:
click.echo("firmware ACTIVATE failure for {}".format(port))
return False


def perform_rollback_firmware(physical_port, fwfile):
import sonic_y_cable.y_cable
result = sonic_y_cable.y_cable.rollback_firmware(physical_port)
if result == sonic_y_cable.y_cable.FIRMWARE_ROLLBACK_SUCCESS:
click.echo("firmware rollback successful {}".format(port))
return True
else:
click.echo("firmware rollback failure {}".format(port))
return False


@muxcable.group(cls=clicommon.AbbreviationGroup)
def firmware():
"""Configure muxcable firmware command"""
pass


@firmware.command()
@click.argument('fwfile', metavar='<operation_status>', required=True)
@click.argument('port', metavar='<port_name>', required=True, default=None)
def download(fwfile, port):
"""Config muxcable firmware download"""

per_npu_statedb = {}
physical_port_list = []
port_table_keys = {}

get_per_npu_statedb(per_npu_statedb, port_table_keys)

if port is not None and port != "all":

get_physical_port_list(port, physical_port_list)
physical_port = physical_port_list[0]
if per_npu_statedb[asic_index] is not None:
y_cable_asic_table_keys = port_table_keys[asic_index]
logical_key = "MUX_CABLE_TABLE|{}".format(port)
if logical_key in y_cable_asic_table_keys:
import sonic_y_cable.y_cable
result = sonic_y_cable.y_cable.download_firmware(physical_port, fwfile)
if result == sonic_y_cable.y_cable.FIRMWARE_DOWNLOAD_SUCCESS:
click.echo("firmware download successful {}".format(port))
else:
click.echo("firmware download failure {}".format(port))
perform_download_firmware(physical_port, fwfile)

else:
click.echo("this is not a valid port present on mux_cable".format(port))
Expand All @@ -711,33 +753,23 @@ def download(fwfile, port):
asic_id = multi_asic.get_asic_index_from_namespace(namespace)
for key in port_table_keys[asic_id]:
port = key.split("|")[1]
if platform_sfputil is not None:
physical_port_list = platform_sfputil_helper.logical_port_name_to_physical_port_list(port)

if not isinstance(physical_port_list, list):
click.echo(("ERR: Unable to locate physical port information for {}".format(port)))
rc = False
continue
physical_port_list = []
get_physical_port_list(port, physical_port_list)

if len(physical_port_list) != 1:
click.echo("ERR: Found multiple physical ports ({}) associated with {}".format(
", ".join(physical_port_list), port))
rc = False
continue
physical_port = physical_port_list[0]
import sonic_y_cable.y_cable
result = sonic_y_cable.y_cable.download_firmware(physical_port, fwfile)
if result == sonic_y_cable.y_cable.FIRMWARE_DOWNLOAD_SUCCESS:
click.echo("firmware download successful for {}".format(port))
else:
click.echo("firmware download failure for {}".format(port))

status = perform_download_firmware(physical_port, fwfile)

if !status:
rc = False

if rc:
sys.exit(CONFIG_SUCCESSFUL)
else:
sys.exit(CONFIG_FAIL)


@firmware.command()
@click.argument('port', metavar='<port_name>', required=True, default=None)
def activate(port):
Expand All @@ -747,55 +779,17 @@ def activate(port):
y_cable_asic_table_keys = {}
per_npu_statedb = {}

# Getting all front asic namespace and correspding config and state DB connector

namespaces = multi_asic.get_front_end_namespaces()
for namespace in namespaces:
asic_id = multi_asic.get_asic_index_from_namespace(namespace)
# replace these with correct macros
per_npu_statedb[asic_id] = SonicV2Connector(use_unix_socket_path=True, namespace=namespace)
per_npu_statedb[asic_id].connect(per_npu_statedb[asic_id].STATE_DB)


port_table_keys[asic_id] = per_npu_statedb[asic_id].keys(
per_npu_statedb[asic_id].STATE_DB, 'MUX_CABLE_TABLE|*')
get_per_npu_statedb(per_npu_statedb, port_table_keys)

if port is not None and port != "all":

if platform_sfputil is not None:
physical_port_list = platform_sfputil_helper.logical_port_name_to_physical_port_list(port)

asic_index = None
if platform_sfputil is not None:
asic_index = platform_sfputil_helper.get_asic_id_for_logical_port(port)
if asic_index is None:
# TODO this import is only for unit test purposes, and should be removed once sonic_platform_base
# is fully mocked
import sonic_platform_base.sonic_sfp.sfputilhelper
asic_index = sonic_platform_base.sonic_sfp.sfputilhelper.SfpUtilHelper().get_asic_id_for_logical_port(port)
if asic_index is None:
click.echo("Got invalid asic index for port {}, cant retreive mux status".format(port))

if not isinstance(physical_port_list, list):
click.echo(("ERR: Unable to locate physical port information for {}".format(port)))
sys.exit(CONFIG_FAIL)

if len(physical_port_list) != 1:
click.echo("ERR: Found multiple physical ports ({}) associated with {}".format(
", ".join(physical_port_list), port))
sys.exit(CONFIG_FAIL)

get_physical_port_list(port, physical_port_list)
physical_port = physical_port_list[0]
if per_npu_statedb[asic_index] is not None:
y_cable_asic_table_keys = port_table_keys[asic_index]
logical_key = "MUX_CABLE_TABLE|{}".format(port)
if logical_key in y_cable_asic_table_keys:
import sonic_y_cable.y_cable
result = sonic_y_cable.y_cable.activate_firmware(physical_port)
if result == sonic_y_cable.y_cable.FIRMWARE_ACTIVATE_SUCCESS:
click.echo("firmware activate successful {}".format(port))
else:
click.echo("firmware activate failure {}".format(port))
perform_activate_firmware(physical_port, fwfile)

else:
click.echo("this is not a valid port present on mux_cable".format(port))
Expand All @@ -811,34 +805,20 @@ def activate(port):
asic_id = multi_asic.get_asic_index_from_namespace(namespace)
for key in port_table_keys[asic_id]:
port = key.split("|")[1]
if platform_sfputil is not None:
physical_port_list = platform_sfputil_helper.logical_port_name_to_physical_port_list(port)

if not isinstance(physical_port_list, list):
click.echo(("ERR: Unable to locate physical port information for {}".format(port)))
rc = False
continue

if len(physical_port_list) != 1:
click.echo("ERR: Found multiple physical ports ({}) associated with {}".format(
", ".join(physical_port_list), port))
rc = False
continue

get_physical_port_list(port, physical_port_list)
physical_port = physical_port_list[0]
import sonic_y_cable.y_cable
result = sonic_y_cable.y_cable.activate_firmware(physical_port)
if result == sonic_y_cable.y_cable.FIRMWARE_ACTIVATE_SUCCESS:
click.echo("firmware ACTIVATE successful for {}".format(port))
else:
click.echo("firmware ACTIVATE failure for {}".format(port))
status = perform_activate_firmware(physical_port, fwfile)

if !status:
rc = False

if rc:
sys.exit(CONFIG_SUCCESSFUL)
else:
sys.exit(CONFIG_FAIL)
i


@firmware.command()
@click.argument('port', metavar='<port_name>', required=True, default=None)
def rollback(port):
Expand All @@ -848,55 +828,17 @@ def rollback(port):
y_cable_asic_table_keys = {}
per_npu_statedb = {}

# Getting all front asic namespace and correspding config and state DB connector

namespaces = multi_asic.get_front_end_namespaces()
for namespace in namespaces:
asic_id = multi_asic.get_asic_index_from_namespace(namespace)
# replace these with correct macros
per_npu_statedb[asic_id] = SonicV2Connector(use_unix_socket_path=True, namespace=namespace)
per_npu_statedb[asic_id].connect(per_npu_statedb[asic_id].STATE_DB)


port_table_keys[asic_id] = per_npu_statedb[asic_id].keys(
per_npu_statedb[asic_id].STATE_DB, 'MUX_CABLE_TABLE|*')
get_per_npu_statedb(per_npu_statedb, port_table_keys)

if port is not None and port != "all":

if platform_sfputil is not None:
physical_port_list = platform_sfputil_helper.logical_port_name_to_physical_port_list(port)

asic_index = None
if platform_sfputil is not None:
asic_index = platform_sfputil_helper.get_asic_id_for_logical_port(port)
if asic_index is None:
# TODO this import is only for unit test purposes, and should be removed once sonic_platform_base
# is fully mocked
import sonic_platform_base.sonic_sfp.sfputilhelper
asic_index = sonic_platform_base.sonic_sfp.sfputilhelper.SfpUtilHelper().get_asic_id_for_logical_port(port)
if asic_index is None:
click.echo("Got invalid asic index for port {}, cant retreive mux status".format(port))

if not isinstance(physical_port_list, list):
click.echo(("ERR: Unable to locate physical port information for {}".format(port)))
sys.exit(CONFIG_FAIL)

if len(physical_port_list) != 1:
click.echo("ERR: Found multiple physical ports ({}) associated with {}".format(
", ".join(physical_port_list), port))
sys.exit(CONFIG_FAIL)

get_physical_port_list(port, physical_port_list)
physical_port = physical_port_list[0]
if per_npu_statedb[asic_index] is not None:
y_cable_asic_table_keys = port_table_keys[asic_index]
logical_key = "MUX_CABLE_TABLE|{}".format(port)
if logical_key in y_cable_asic_table_keys:
import sonic_y_cable.y_cable
result = sonic_y_cable.y_cable.rollback_firmware(physical_port)
if result == sonic_y_cable.y_cable.FIRMWARE_ROLLBACK_SUCCESS:
click.echo("firmware rollback successful {}".format(port))
else:
click.echo("firmware rollback failure {}".format(port))
perform_rollback_firmware(physical_port, fwfile)

else:
click.echo("this is not a valid port present on mux_cable".format(port))
Expand All @@ -912,27 +854,12 @@ def rollback(port):
asic_id = multi_asic.get_asic_index_from_namespace(namespace)
for key in port_table_keys[asic_id]:
port = key.split("|")[1]
if platform_sfputil is not None:
physical_port_list = platform_sfputil_helper.logical_port_name_to_physical_port_list(port)

if not isinstance(physical_port_list, list):
click.echo(("ERR: Unable to locate physical port information for {}".format(port)))
rc = False
continue

if len(physical_port_list) != 1:
click.echo("ERR: Found multiple physical ports ({}) associated with {}".format(
", ".join(physical_port_list), port))
rc = False
continue

get_physical_port_list(port, physical_port_list)
physical_port = physical_port_list[0]
import sonic_y_cable.y_cable
result = sonic_y_cable.y_cable.rollback_firmware(physical_port)
if result == sonic_y_cable.y_cable.FIRMWARE_ROLLBACK_SUCCESS:
click.echo("firmware ROLLBACK successful for {}".format(port))
else:
click.echo("firmware ROLLBACK failure for {}".format(port))
status = perform_rollback_firmware(physical_port, fwfile)

if !status:
rc = False

if rc:
Expand Down

0 comments on commit 63c6977

Please sign in to comment.