Skip to content

Commit

Permalink
[config] fix interface IPv6 address removal. (#1819)
Browse files Browse the repository at this point in the history
Current code takes an input from the user, converts the IPv6 string to lower letters and removes lower case IPv6 string from CONFIG DB. This is a bug since, according to the schema CONFIG DB is case insensitive for IPv6 address.

- What I did
Fixed CLI for removing IPv6 address.
Issue is that below command does not work if IP address is written in upper case in CONFIG DB, like this: FC00::1/64.
'config interface ip remove Ethernet0 FC00::1/64'

- How I did it
Make it case insensitive
Relaxed the validation of IP address, a built-in validation from ipaddress package in python is used.
Refactored interface_ipaddr_dependent_on_interface -> get_interface_ipaddresses
Separated some functions (has_static_routes_attached, flush_ip_neigh_in_kernel, can_remove_router_interface, remove_router_interface_ip_address, remove_router_interface, is_management_interface)

- How to verify it
Run UT. Try to reproduce the scenario described above.

Signed-off-by: Stepan Blyschak <[email protected]>
  • Loading branch information
stepanblyschak authored Nov 1, 2021
1 parent 0665d6f commit ca728b8
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 136 deletions.
232 changes: 109 additions & 123 deletions config/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,18 +343,22 @@ def interface_name_to_alias(config_db, interface_name):

return None

def interface_ipaddr_dependent_on_interface(config_db, interface_name):
"""Get table keys including ipaddress
def get_interface_ipaddresses(config_db, interface_name):
"""Get IP addresses attached to interface
"""
data = []
ipaddresses = set()
table_name = get_interface_table_name(interface_name)
if table_name == "":
return data
if not table_name:
return ipaddresses

keys = config_db.get_keys(table_name)
for key in keys:
if interface_name in key and len(key) == 2:
data.append(key)
return data
if isinstance(key, tuple) and len(key) == 2:
iface, interface_ip = key
if iface == interface_name:
ipaddresses.add(ipaddress.ip_interface(interface_ip))

return ipaddresses

def is_interface_bind_to_vrf(config_db, interface_name):
"""Get interface if bind to vrf or not
Expand Down Expand Up @@ -448,9 +452,9 @@ def del_interface_bind_to_vrf(config_db, vrf_name):
if interface_dict:
for interface_name in interface_dict:
if 'vrf_name' in interface_dict[interface_name] and vrf_name == interface_dict[interface_name]['vrf_name']:
interface_dependent = interface_ipaddr_dependent_on_interface(config_db, interface_name)
for interface_del in interface_dependent:
config_db.set_entry(table_name, interface_del, None)
interface_ipaddresses = get_interface_ipaddresses(config_db, interface_name)
for ipaddress in interface_ipaddresses:
remove_router_interface_ip_address(config_db, interface_name, ipaddress)
config_db.set_entry(table_name, interface_name, None)

def set_interface_naming_mode(mode):
Expand Down Expand Up @@ -854,27 +858,6 @@ def validate_mirror_session_config(config_db, session_name, dst_port, src_port,

return True

def is_valid_ip_interface(ctx, ip_addr):
split_ip_mask = ip_addr.split("/")
if len(split_ip_mask) < 2:
return False

# Check if the IP address is correct or if there are leading zeros.
ip_obj = ipaddress.ip_address(split_ip_mask[0])

if isinstance(ip_obj, ipaddress.IPv4Address):
# Since the IP address is used as a part of a key in Redis DB,
# do not tolerate extra zeros in IPv4.
if str(ip_obj) != split_ip_mask[0]:
return False

# Check if the mask is correct
net = ipaddress.ip_network(ip_addr, strict=False)
if str(net.prefixlen) != split_ip_mask[1] or net.prefixlen == 0:
return False

return True

def cli_sroute_to_config(ctx, command_str, strict_nh = True):
if len(command_str) < 2 or len(command_str) > 9:
ctx.fail("argument is not in pattern prefix [vrf <vrf_name>] <A.B.C.D/M> nexthop <[vrf <vrf_name>] <A.B.C.D>>|<dev <dev_name>>!")
Expand Down Expand Up @@ -985,6 +968,20 @@ def cache_arp_entries():
open(restore_flag_file, 'w').close()
return success

def remove_router_interface_ip_address(config_db, interface_name, ipaddress_to_remove):
table_name = get_interface_table_name(interface_name)
keys = config_db.get_keys(table_name)

for key in keys:
if not isinstance(key, tuple) or len(key) != 2:
continue

iface, ipaddress_string = key
if iface != interface_name:
continue

if ipaddress.ip_interface(ipaddress_string) == ipaddress_to_remove:
config_db.set_entry(table_name, (interface_name, ipaddress_string), None)

def validate_ipv4_address(ctx, param, ip_addr):
"""Helper function to validate ipv4 address
Expand Down Expand Up @@ -3749,50 +3746,44 @@ def add(ctx, interface_name, ip_addr, gw):
return

try:
net = ipaddress.ip_network(ip_addr, strict=False)
if '/' not in ip_addr:
ip_addr += '/' + str(net.prefixlen)

if not is_valid_ip_interface(ctx, ip_addr):
raise ValueError('')

if interface_name == 'eth0':

# Configuring more than 1 IPv4 or more than 1 IPv6 address fails.
# Allow only one IPv4 and only one IPv6 address to be configured for IPv6.
# If a row already exist, overwrite it (by doing delete and add).
mgmtintf_key_list = _get_all_mgmtinterface_keys()

for key in mgmtintf_key_list:
# For loop runs for max 2 rows, once for IPv4 and once for IPv6.
# No need to capture the exception since the ip_addr is already validated earlier
ip_input = ipaddress.ip_interface(ip_addr)
current_ip = ipaddress.ip_interface(key[1])
if (ip_input.version == current_ip.version):
# If user has configured IPv4/v6 address and the already available row is also IPv4/v6, delete it here.
config_db.set_entry("MGMT_INTERFACE", ("eth0", key[1]), None)

# Set the new row with new value
if not gw:
config_db.set_entry("MGMT_INTERFACE", (interface_name, ip_addr), {"NULL": "NULL"})
else:
config_db.set_entry("MGMT_INTERFACE", (interface_name, ip_addr), {"gwaddr": gw})
mgmt_ip_restart_services()
ip_address = ipaddress.ip_interface(ip_addr)
except ValueError as err:
ctx.fail("IP address is not valid: {}".format(err))

if interface_name == 'eth0':

# Configuring more than 1 IPv4 or more than 1 IPv6 address fails.
# Allow only one IPv4 and only one IPv6 address to be configured for IPv6.
# If a row already exist, overwrite it (by doing delete and add).
mgmtintf_key_list = _get_all_mgmtinterface_keys()

for key in mgmtintf_key_list:
# For loop runs for max 2 rows, once for IPv4 and once for IPv6.
# No need to capture the exception since the ip_addr is already validated earlier
current_ip = ipaddress.ip_interface(key[1])
if (ip_address.version == current_ip.version):
# If user has configured IPv4/v6 address and the already available row is also IPv4/v6, delete it here.
config_db.set_entry("MGMT_INTERFACE", ("eth0", key[1]), None)

# Set the new row with new value
if not gw:
config_db.set_entry("MGMT_INTERFACE", (interface_name, str(ip_address)), {"NULL": "NULL"})
else:
config_db.set_entry("MGMT_INTERFACE", (interface_name, str(ip_address)), {"gwaddr": gw})
mgmt_ip_restart_services()

return
return

table_name = get_interface_table_name(interface_name)
if table_name == "":
ctx.fail("'interface_name' is not valid. Valid names [Ethernet/PortChannel/Vlan/Loopback]")
interface_entry = config_db.get_entry(table_name, interface_name)
if len(interface_entry) == 0:
if table_name == "VLAN_SUB_INTERFACE":
config_db.set_entry(table_name, interface_name, {"admin_status": "up"})
else:
config_db.set_entry(table_name, interface_name, {"NULL": "NULL"})
config_db.set_entry(table_name, (interface_name, ip_addr), {"NULL": "NULL"})
except ValueError:
ctx.fail("ip address or mask is not valid.")
table_name = get_interface_table_name(interface_name)
if table_name == "":
ctx.fail("'interface_name' is not valid. Valid names [Ethernet/PortChannel/Vlan/Loopback]")
interface_entry = config_db.get_entry(table_name, interface_name)
if len(interface_entry) == 0:
if table_name == "VLAN_SUB_INTERFACE":
config_db.set_entry(table_name, interface_name, {"admin_status": "up"})
else:
config_db.set_entry(table_name, interface_name, {"NULL": "NULL"})
config_db.set_entry(table_name, (interface_name, str(ip_address)), {"NULL": "NULL"})

#
# 'del' subcommand
Expand All @@ -3813,52 +3804,47 @@ def remove(ctx, interface_name, ip_addr):
ctx.fail("'interface_name' is None!")

try:
net = ipaddress.ip_network(ip_addr, strict=False)
if '/' not in ip_addr:
ip_addr += '/' + str(net.prefixlen)
ip_address = ipaddress.ip_interface(ip_addr)
except ValueError as err:
ctx.fail("IP address is not valid: {}".format(err))

if not is_valid_ip_interface(ctx, ip_addr):
raise ValueError('')
if interface_name == 'eth0':
config_db.set_entry("MGMT_INTERFACE", (interface_name, str(ip_address)), None)
mgmt_ip_restart_services()
return

if interface_name == 'eth0':
config_db.set_entry("MGMT_INTERFACE", (interface_name, ip_addr), None)
mgmt_ip_restart_services()
return
table_name = get_interface_table_name(interface_name)
if table_name == "":
ctx.fail("'interface_name' is not valid. Valid names [Ethernet/PortChannel/Vlan/Loopback]")
interface_addresses = get_interface_ipaddresses(config_db, interface_name)
# If we deleting the last IP entry of the interface, check whether a static route present for the RIF
# before deleting the entry and also the RIF.
if interface_addresses == {ip_address}:
# Check both IPv4 and IPv6 routes.
ip_versions = [ "ip", "ipv6"]
for ip_ver in ip_versions:
# Compete the command and ask Zebra to return the routes.
# Scopes of all VRFs will be checked.
cmd = "show {} route vrf all static".format(ip_ver)
if multi_asic.is_multi_asic():
output = bgp_util.run_bgp_command(cmd, ctx.obj['namespace'])
else:
output = bgp_util.run_bgp_command(cmd)
# If there is output data, check is there a static route,
# bound to the interface.
if output != "":
if any(interface_name in output_line for output_line in output.splitlines()):
ctx.fail("Cannot remove the last IP entry of interface {}. A static {} route is still bound to the RIF.".format(interface_name, ip_ver))
remove_router_interface_ip_address(config_db, interface_name, ip_address)
interface_addresses = get_interface_ipaddresses(config_db, interface_name)
if len(interface_addresses) == 0 and is_interface_bind_to_vrf(config_db, interface_name) is False and get_intf_ipv6_link_local_mode(ctx, interface_name, table_name) != "enable":
config_db.set_entry(table_name, interface_name, None)

table_name = get_interface_table_name(interface_name)
if table_name == "":
ctx.fail("'interface_name' is not valid. Valid names [Ethernet/PortChannel/Vlan/Loopback]")
interface_dependent = interface_ipaddr_dependent_on_interface(config_db, interface_name)
# If we deleting the last IP entry of the interface, check whether a static route present for the RIF
# before deleting the entry and also the RIF.
if len(interface_dependent) == 1 and interface_dependent[0][1] == ip_addr:
# Check both IPv4 and IPv6 routes.
ip_versions = [ "ip", "ipv6"]
for ip_ver in ip_versions:
# Compete the command and ask Zebra to return the routes.
# Scopes of all VRFs will be checked.
cmd = "show {} route vrf all static".format(ip_ver)
if multi_asic.is_multi_asic():
output = bgp_util.run_bgp_command(cmd, ctx.obj['namespace'])
else:
output = bgp_util.run_bgp_command(cmd)
# If there is output data, check is there a static route,
# bound to the interface.
if output != "":
if any(interface_name in output_line for output_line in output.splitlines()):
ctx.fail("Cannot remove the last IP entry of interface {}. A static {} route is still bound to the RIF.".format(interface_name, ip_ver))
config_db.set_entry(table_name, (interface_name, ip_addr), None)
interface_dependent = interface_ipaddr_dependent_on_interface(config_db, interface_name)
if len(interface_dependent) == 0 and is_interface_bind_to_vrf(config_db, interface_name) is False and get_intf_ipv6_link_local_mode(ctx, interface_name, table_name) != "enable":
config_db.set_entry(table_name, interface_name, None)

if multi_asic.is_multi_asic():
command = "sudo ip netns exec {} ip neigh flush dev {} {}".format(ctx.obj['namespace'], interface_name, ip_addr)
else:
command = "ip neigh flush dev {} {}".format(interface_name, ip_addr)
clicommon.run_command(command)
except ValueError:
ctx.fail("ip address or mask is not valid.")
if multi_asic.is_multi_asic():
command = "sudo ip netns exec {} ip neigh flush dev {} {}".format(ctx.obj['namespace'], interface_name, str(ip_address))
else:
command = "ip neigh flush dev {} {}".format(interface_name, str(ip_address))
clicommon.run_command(command)


#
Expand Down Expand Up @@ -4243,9 +4229,9 @@ def bind(ctx, interface_name, vrf_name):
config_db.get_entry(table_name, interface_name).get('vrf_name') == vrf_name:
return
# Clean ip addresses if interface configured
interface_dependent = interface_ipaddr_dependent_on_interface(config_db, interface_name)
for interface_del in interface_dependent:
config_db.set_entry(table_name, interface_del, None)
interface_addresses = get_interface_ipaddresses(config_db, interface_name)
for ipaddress in interface_addresses:
remove_router_interface_ip_address(config_db, interface_name, ipaddress)
config_db.set_entry(table_name, interface_name, None)
# When config_db del entry and then add entry with same key, the DEL will lost.
if ctx.obj['namespace'] is DEFAULT_NAMESPACE:
Expand Down Expand Up @@ -4281,9 +4267,9 @@ def unbind(ctx, interface_name):
ctx.fail("'interface_name' is not valid. Valid names [Ethernet/PortChannel/Vlan/Loopback]")
if is_interface_bind_to_vrf(config_db, interface_name) is False:
return
interface_dependent = interface_ipaddr_dependent_on_interface(config_db, interface_name)
for interface_del in interface_dependent:
config_db.set_entry(table_name, interface_del, None)
interface_ipaddresses = get_interface_ipaddresses(config_db, interface_name)
for ipaddress in interface_ipaddresses:
remove_router_interface_ip_address(config_db, interface_name, ipaddress)
config_db.set_entry(table_name, interface_name, None)


Expand Down
38 changes: 25 additions & 13 deletions tests/ip_config_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@
import show.main as show
from utilities_common.db import Db

ERROR_MSG = '''
Error: ip address or mask is not valid.
'''
ERROR_MSG = "Error: IP address is not valid"

class TestConfigIP(object):
@classmethod
Expand Down Expand Up @@ -65,7 +63,7 @@ def test_add_interface_ipv4_with_leading_zeros(self):
obj = {'config_db':db.cfgdb}

# config int ip add Ethernet68 10.10.10.002/24
result = runner.invoke(config.config.commands["interface"].commands["ip"].commands["add"], ["Ethernet68", "10.10.10.002/24"], obj=obj)
result = runner.invoke(config.config.commands["interface"].commands["ip"].commands["add"], ["Ethernet68", "10.10.10.0002/24"], obj=obj)
print(result.exit_code, result.output)
assert result.exit_code != 0
assert ERROR_MSG in result.output
Expand All @@ -88,7 +86,21 @@ def test_add_del_interface_valid_ipv6(self):
print(result.exit_code, result.output)
assert result.exit_code != 0
assert ('Ethernet72', '2001:1db8:11a3:19d7:1f34:8a2e:17a0:765d/34') not in db.cfgdb.get_table('INTERFACE')


def test_del_interface_case_sensitive_ipv6(self):
db = Db()
runner = CliRunner()
obj = {'config_db':db.cfgdb}

obj['config_db'].set_entry('INTERFACE', ('Ethernet72', 'FC00::1/126'), {})
assert ('Ethernet72', 'FC00::1/126') in db.cfgdb.get_table('INTERFACE')

# config int ip remove Ethernet72 FC00::1/126
result = runner.invoke(config.config.commands["interface"].commands["ip"].commands["remove"], ["Ethernet72", "FC00::1/126"], obj=obj)
print(result.exit_code, result.output)
assert result.exit_code != 0
assert ('Ethernet72', 'FC00::1/126') not in db.cfgdb.get_table('INTERFACE')

def test_add_interface_invalid_ipv6(self):
db = Db()
runner = CliRunner()
Expand Down Expand Up @@ -120,14 +132,14 @@ def test_add_del_interface_ipv6_with_leading_zeros(self):
result = runner.invoke(config.config.commands["interface"].commands["ip"].commands["add"], ["Ethernet68", "2001:0db8:11a3:09d7:1f34:8a2e:07a0:765d/34"], obj=obj)
print(result.exit_code, result.output)
assert result.exit_code == 0
assert ('Ethernet68', '2001:0db8:11a3:09d7:1f34:8a2e:07a0:765d/34') in db.cfgdb.get_table('INTERFACE')
assert ('Ethernet68', '2001:db8:11a3:9d7:1f34:8a2e:7a0:765d/34') in db.cfgdb.get_table('INTERFACE')

# config int ip remove Ethernet68 2001:0db8:11a3:09d7:1f34:8a2e:07a0:765d/34
result = runner.invoke(config.config.commands["interface"].commands["ip"].commands["remove"], ["Ethernet68", "2001:0db8:11a3:09d7:1f34:8a2e:07a0:765d/34"], obj=obj)
print(result.exit_code, result.output)
assert result.exit_code != 0
assert ('Ethernet68', '2001:0db8:11a3:09d7:1f34:8a2e:07a0:765d/34') not in db.cfgdb.get_table('INTERFACE')
assert ('Ethernet68', '2001:db8:11a3:9d7:1f34:8a2e:7a0:765d/34') not in db.cfgdb.get_table('INTERFACE')

def test_add_del_interface_shortened_ipv6_with_leading_zeros(self):
db = Db()
runner = CliRunner()
Expand All @@ -137,14 +149,14 @@ def test_add_del_interface_shortened_ipv6_with_leading_zeros(self):
result = runner.invoke(config.config.commands["interface"].commands["ip"].commands["add"], ["Ethernet68", "3000::001/64"], obj=obj)
print(result.exit_code, result.output)
assert result.exit_code == 0
assert ('Ethernet68', '3000::001/64') in db.cfgdb.get_table('INTERFACE')
assert ('Ethernet68', '3000::1/64') in db.cfgdb.get_table('INTERFACE')

# config int ip remove Ethernet68 3000::001/64
result = runner.invoke(config.config.commands["interface"].commands["ip"].commands["remove"], ["Ethernet68", "3000::001/64"], obj=obj)
print(result.exit_code, result.output)
assert result.exit_code != 0
assert ('Ethernet68', '3000::001/64') not in db.cfgdb.get_table('INTERFACE')
assert ('Ethernet68', '3000::1/64') not in db.cfgdb.get_table('INTERFACE')

@classmethod
def teardown_class(cls):
os.environ['UTILITIES_UNIT_TESTING'] = "0"
Expand Down

0 comments on commit ca728b8

Please sign in to comment.