Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support reading/writing module EEPROM data by page and offset #3008

Merged
merged 19 commits into from
Dec 8, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions doc/Command-Reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,9 @@
* [MACsec config command](#macsec-config-command)
* [MACsec show command](#macsec-show-command)
* [MACsec clear command](#macsec-clear-command)
* [SFP Utilities Commands](#sfp-utilities-commands)
* [SFP Utilities read command](#sfp-utilities-read-command)
* [SFP Utilities write command](#sfp-utilities-write-command)
* [Static DNS Commands](#static-dns-commands)
* [Static DNS config command](#static-dns-config-command)
* [Static DNS show command](#static-dns-show-command)
Expand Down Expand Up @@ -12748,6 +12751,61 @@ Clear MACsec counters which is to reset all MACsec counters to ZERO.

Go Back To [Beginning of the document](#) or [Beginning of this section](#macsec-commands)

# SFP Utilities Commands

This sub-section explains the list of commands available for SFP utilities feature.

# SFP Utilities read command

- Read SFP EEPROM data

```
admin@sonic:~$ sfputil read-eeprom --help
Usage: sfputil read-eeprom [OPTIONS] <port_name> <page> <offset> <size>
Read SFP EEPROM data
Options:
--no-format Display non formatted data.
--wire-addr Wire address of sff8472.
--help Show this
```

```
admin@sonic:~$ sfputil read-eeprom Ethernet0 0 100 2
00000064 4a 44 |..|

admin@sonic:~$ sfputil read-eeprom Ethernet0 0 0 32
00000000 11 08 06 00 00 00 00 00 00 00 00 00 00 00 00 00 |................|
00000010 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 |................|

admin@sonic:~$ sfputil read-eeprom Ethernet0 0 100 2 --no-format
4a44
```

# SFP Utilities write command

- Write SFP EEPROM data

```
admin@sonic:~$ sfputil write-eeprom --help
Usage: sfputil write-eeprom [OPTIONS] <port> <page> <offset> <data>

Write SFP EEPROM data

Options:
--wire-addr Wire address of sff8472.
--verify Verify the data by reading back.
--help Show this message and exit.
```

```
admin@sonic:~$ sfputil write-eeprom Ethernet0 0 100 4a44

admin@sonic:~$ sfputil write-eeprom Etherent0 0 100 4a44 --verify
Error: Write data failed! Write: 4a44, read: 0000.
Junchao-Mellanox marked this conversation as resolved.
Show resolved Hide resolved
```

Go Back To [Beginning of the document](#) or [Beginning of this section](#sfp-utilities-commands)

# Static DNS Commands

This sub-section explains the list of the configuration options available for static DNS feature.
Expand Down
129 changes: 109 additions & 20 deletions sfputil/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -799,27 +799,34 @@ def convert_byte_to_valid_ascii_char(byte):
else:
return chr(byte)

def hexdump(indent, data, mem_address):
ascii_string = ''
result = ''
for byte in data:
ascii_string = ascii_string + convert_byte_to_valid_ascii_char(byte)
byte_string = "{:02x}".format(byte)
if mem_address % 16 == 0:
mem_address_string = "{:08x}".format(mem_address)
result += '\n{}{} '.format(indent, mem_address_string)
result += '{} '.format(byte_string)
elif mem_address % 16 == 15:
result += '{} '.format(byte_string)
result += '|{}|'.format(ascii_string)
ascii_string = ""
elif mem_address % 16 == 8:
result += ' {} '.format(byte_string)
def hexdump(indent, data, mem_address, start_newline=True):
size = len(data)
offset = 0
lines = [''] if start_newline else []
while size > 0:
offset_str = "{}{:08x}".format(indent, mem_address)
if size >= 16:
first_half = ' '.join("{:02x}".format(x) for x in data[offset:offset + 8])
second_half = ' '.join("{:02x}".format(x) for x in data[offset + 8:offset + 16])
ascii_str = ''.join(convert_byte_to_valid_ascii_char(x) for x in data[offset:offset + 16])
lines.append(f'{offset_str} {first_half} {second_half} |{ascii_str}|')
elif size > 8:
first_half = ' '.join("{:02x}".format(x) for x in data[offset:offset + 8])
second_half = ' '.join("{:02x}".format(x) for x in data[offset + 8:offset + size])
padding = ' ' * (16 - size)
ascii_str = ''.join(convert_byte_to_valid_ascii_char(x) for x in data[offset:offset + size])
lines.append(f'{offset_str} {first_half} {second_half}{padding} |{ascii_str}|')
break
else:
result += '{} '.format(byte_string)
mem_address += 1

return result
hex_part = ' '.join("{:02x}".format(x) for x in data[offset:offset + size])
padding = ' ' * (16 - size)
ascii_str = ''.join(convert_byte_to_valid_ascii_char(x) for x in data[offset:offset + size])
lines.append(f'{offset_str} {hex_part} {padding} |{ascii_str}|')
break
size -= 16
offset += 16
mem_address += 16
return '\n'.join(lines)

# 'presence' subcommand
@show.command()
Expand Down Expand Up @@ -1514,6 +1521,88 @@ def unlock(port_name, password):
else:
click.echo("CDB: Host password NOT accepted! status = {}".format(status))


# 'read-eeprom' subcommand
@cli.command()
@click.argument('port_name', metavar='<port_name>', required=True)
@click.argument('page', metavar='<page>', type=click.INT, required=True)
@click.argument('offset', metavar='<offset>', type=click.INT, required=True)
@click.argument('size', metavar='<size>', type=click.INT, required=True)
@click.option('--no-format', is_flag=True, help="Display non formatted data")
@click.option('--wire-addr', help="Wire address of sff8472")
def read_eeprom(port_name, page, offset, size, no_format, wire_addr):
"""Read SFP EEPROM data"""
Junchao-Mellanox marked this conversation as resolved.
Show resolved Hide resolved
if is_port_type_rj45(port_name):
click.echo("This functionality is not applicable for RJ45 port {}.".format(port_name))
sys.exit(EXIT_FAIL)

physical_port = logical_port_to_physical_port_index(port_name)
sfp = platform_chassis.get_sfp(physical_port)
if not sfp.get_presence():
click.echo("{}: SFP EEPROM not detected\n".format(port_name))
sys.exit(EXIT_FAIL)

try:
data = sfp.read_eeprom_by_page(page, offset, size, wire_addr)
if data is None:
click.echo("Error: Failed to read EEPROM!")
sys.exit(ERROR_NOT_IMPLEMENTED)
if no_format:
click.echo(''.join('{:02x}'.format(x) for x in data))
else:
click.echo(hexdump('', data, offset))
except NotImplementedError:
click.echo("This functionality is currently not implemented for this platform")
sys.exit(ERROR_NOT_IMPLEMENTED)
except ValueError as e:
click.echo("Error: {}".format(e))
sys.exit(EXIT_FAIL)


# 'write-eeprom' subcommand
@cli.command()
@click.argument('port_name', metavar='<port_name>', required=True)
@click.argument('page', metavar='<page>', type=click.INT, required=True)
@click.argument('offset', metavar='<offset>', type=click.INT, required=True)
@click.argument('data', metavar='<data>', required=True)
@click.option('--wire-addr', help="Wire address of sff8472")
@click.option('--verify', is_flag=True, help="Verify the data by reading back")
Junchao-Mellanox marked this conversation as resolved.
Show resolved Hide resolved
def write_eeprom(port_name, page, offset, data, wire_addr, verify):
"""Write SFP EEPROM data"""
if is_port_type_rj45(port_name):
click.echo("This functionality is not applicable for RJ45 port {}.".format(port_name))
sys.exit(EXIT_FAIL)

physical_port = logical_port_to_physical_port_index(port_name)
sfp = platform_chassis.get_sfp(physical_port)
if not sfp.get_presence():
click.echo("{}: SFP EEPROM not detected\n".format(port_name))
sys.exit(EXIT_FAIL)

try:
bytes = bytearray.fromhex(data)
except ValueError:
click.echo("Error: Data must be a hex string of even length!")
sys.exit(EXIT_FAIL)

try:
success = sfp.write_eeprom_by_page(page, offset, bytes, wire_addr)
if not success:
click.echo("Error: Failed to write EEPROM!")
sys.exit(ERROR_NOT_IMPLEMENTED)
if verify:
read_data = sfp.read_eeprom_by_page(page, offset, len(bytes), wire_addr)
if read_data != bytes:
click.echo(f"Error: Write data failed! Write: {''.join('{:02x}'.format(x) for x in bytes)}, read: {''.join('{:02x}'.format(x) for x in read_data)}")
sys.exit(EXIT_FAIL)
except NotImplementedError:
click.echo("This functionality is currently not implemented for this platform")
sys.exit(ERROR_NOT_IMPLEMENTED)
except ValueError as e:
click.echo("Error: {}".format(e))
sys.exit(EXIT_FAIL)


# 'version' subcommand
@cli.command()
def version():
Expand Down
102 changes: 102 additions & 0 deletions tests/sfputil_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -950,3 +950,105 @@ def test_update_firmware_info_to_state_db(self, mock_chassis):
mock_sfp.get_transceiver_info_firmware_versions.return_value = ['a.b.c', 'd.e.f']

sfputil.update_firmware_info_to_state_db("Ethernet0")

@patch('sfputil.main.is_port_type_rj45', MagicMock(return_value=False))
@patch('sfputil.main.logical_port_to_physical_port_index', MagicMock(return_value=1))
@patch('sfputil.main.platform_chassis')
def test_read_eeprom(self, mock_chassis):
mock_sfp = MagicMock()
mock_chassis.get_sfp = MagicMock(return_value=mock_sfp)

mock_sfp.get_presence = MagicMock(return_value=False)
runner = CliRunner()
result = runner.invoke(sfputil.cli.commands['read-eeprom'], ["Ethernet0", '0', '0', '1'])
assert result.exit_code == EXIT_FAIL

mock_sfp.get_presence.return_value = True
mock_sfp.read_eeprom_by_page = MagicMock(return_value=None)
result = runner.invoke(sfputil.cli.commands['read-eeprom'], ["Ethernet0", '0', '0', '1'])
assert result.exit_code == ERROR_NOT_IMPLEMENTED

mock_sfp.read_eeprom_by_page.return_value = bytearray([0x00, 0x01])
result = runner.invoke(sfputil.cli.commands['read-eeprom'], ["Ethernet0", '0', '0', '2', '--no-format'])
assert result.exit_code == 0
assert result.output == '0001\n'

result = runner.invoke(sfputil.cli.commands['read-eeprom'], ["Ethernet0", '0', '5', '2'])
assert result.exit_code == 0
expected_output = """
00000005 00 01 |..|
"""
print(result.output)
assert result.output == expected_output

mock_sfp.read_eeprom_by_page.side_effect = NotImplementedError
result = runner.invoke(sfputil.cli.commands['read-eeprom'], ["Ethernet0", '0', '5', '2'])
assert result.exit_code == ERROR_NOT_IMPLEMENTED

mock_sfp.read_eeprom_by_page.side_effect = ValueError
result = runner.invoke(sfputil.cli.commands['read-eeprom'], ["Ethernet0", '0', '5', '2'])
assert result.exit_code == EXIT_FAIL

@patch('sfputil.main.is_port_type_rj45', MagicMock(return_value=False))
@patch('sfputil.main.logical_port_to_physical_port_index', MagicMock(return_value=1))
@patch('sfputil.main.platform_chassis')
def test_write_eeprom(self, mock_chassis):
mock_sfp = MagicMock()
mock_chassis.get_sfp = MagicMock(return_value=mock_sfp)

mock_sfp.get_presence = MagicMock(return_value=False)
runner = CliRunner()
result = runner.invoke(sfputil.cli.commands['write-eeprom'], ["Ethernet0", '0', '0', '01'])
assert result.exit_code == EXIT_FAIL

# invalid hex string, hex string must have even length
mock_sfp.get_presence.return_value = True
result = runner.invoke(sfputil.cli.commands['write-eeprom'], ["Ethernet0", '0', '0', '1'])
assert result.exit_code == EXIT_FAIL

# invalid hex string
result = runner.invoke(sfputil.cli.commands['write-eeprom'], ["Ethernet0", '0', '0', '+0'])
assert result.exit_code == EXIT_FAIL

# write failed
mock_sfp.write_eeprom_by_page = MagicMock(return_value=False)
result = runner.invoke(sfputil.cli.commands['write-eeprom'], ["Ethernet0", '0', '0', '10'])
print(result.output)
assert result.exit_code == ERROR_NOT_IMPLEMENTED

# write success
mock_sfp.write_eeprom_by_page.return_value = True
result = runner.invoke(sfputil.cli.commands['write-eeprom'], ["Ethernet0", '0', '0', '10'])
assert result.exit_code == 0

# write verify success
mock_sfp.read_eeprom_by_page = MagicMock(return_value=bytearray([16]))
result = runner.invoke(sfputil.cli.commands['write-eeprom'], ["Ethernet0", '0', '0', '10', '--verify'])
assert result.exit_code == 0

# write verify failed
mock_sfp.read_eeprom_by_page = MagicMock(return_value=bytearray([10]))
result = runner.invoke(sfputil.cli.commands['write-eeprom'], ["Ethernet0", '0', '0', '11', '--verify'])
assert result.exit_code != 0

# Not implemented
mock_sfp.write_eeprom_by_page.side_effect = NotImplementedError
result = runner.invoke(sfputil.cli.commands['write-eeprom'], ["Ethernet0", '0', '0', '10'])
assert result.exit_code == ERROR_NOT_IMPLEMENTED

# Value error
mock_sfp.write_eeprom_by_page.side_effect = ValueError
result = runner.invoke(sfputil.cli.commands['write-eeprom'], ["Ethernet0", '0', '0', '10'])
assert result.exit_code == EXIT_FAIL

@patch('sfputil.main.is_port_type_rj45', MagicMock(return_value=True))
def test_read_eeprom_rj45(self):
runner = CliRunner()
result = runner.invoke(sfputil.cli.commands['read-eeprom'], ["Ethernet0", '0', '0', '1'])
assert result.exit_code == EXIT_FAIL

@patch('sfputil.main.is_port_type_rj45', MagicMock(return_value=True))
def test_write_eeprom_rj45(self):
runner = CliRunner()
result = runner.invoke(sfputil.cli.commands['write-eeprom'], ["Ethernet0", '0', '0', '00'])
assert result.exit_code == EXIT_FAIL
Loading