From 193a00d3275a1ec7eecec86a98fcbee8a6ddb241 Mon Sep 17 00:00:00 2001 From: Lawrence Lee Date: Mon, 14 Feb 2022 22:19:26 +0000 Subject: [PATCH] [test_mux]: Formatting fixes Signed-off-by: Lawrence Lee --- tests/mux_neigh_miss_tests.py | 36 ++++--- tests/test_mux.py | 174 ++++++++++++++++++---------------- 2 files changed, 111 insertions(+), 99 deletions(-) diff --git a/tests/mux_neigh_miss_tests.py b/tests/mux_neigh_miss_tests.py index 0dfa2f3c57..d8c32a29e0 100644 --- a/tests/mux_neigh_miss_tests.py +++ b/tests/mux_neigh_miss_tests.py @@ -7,7 +7,7 @@ The expected result itself is another dictionary, containing the following attributes: - (bool) EXPECT_ROUTE: if we expect a route entry in ASIC_DB - - (bool) EXPECT_NEIGH: if we expect a neighbor entry in ASIC_DB + - (bool) EXPECT_NEIGH: if we expect a neighbor entry in ASIC_DB - (bool) REAL_MAC: If a real MAC address is expected in the APPL_DB neighbor table entry, as opposed to a zero/empty MAC @@ -26,25 +26,31 @@ """ -TEST_ACTION = 'action' -EXPECTED_RESULT = 'result' +__all__ = [ + 'TEST_ACTION', 'EXPECTED_RESULT', 'ACTIVE', 'STANDBY', 'PING_SERV', 'PING_NEIGH', + 'RESOLVE_ENTRY', 'DELETE_ENTRY', 'EXPECT_ROUTE', 'EXPECT_NEIGH', 'REAL_MAC', + 'INTF', 'IP', 'MAC', 'NEIGH_MISS_TESTS' +] + +TEST_ACTION = 'action' +EXPECTED_RESULT = 'result' # Possible test actions -ACTIVE = 'active' # Switch the test interface to active -STANDBY = 'standby' # Switch the test interface to standby -PING_SERV = 'ping_serv' # Ping the server mux cable IP -PING_NEIGH = 'ping_neigh' # Ping the neighbor IP (not configured on a specific mux cable port) -RESOLVE_ENTRY = 'resolve_entry' # Resolve the test IP neighbor entry in the kernel -DELETE_ENTRY = 'delete_entry' # Delete the test IP neighbor entry from the kernel +ACTIVE = 'active' # Switch the test interface to active +STANDBY = 'standby' # Switch the test interface to standby +PING_SERV = 'ping_serv' # Ping the server mux cable IP, used to trigger a netlink fail message +PING_NEIGH = 'ping_neigh' # Ping the neighbor IP (not configured on a specific mux cable port) +RESOLVE_ENTRY = 'resolve_entry' # Resolve the test IP neighbor entry in the kernel +DELETE_ENTRY = 'delete_entry' # Delete the test IP neighbor entry from the kernel # Test expectations -EXPECT_ROUTE = 'expect_route' -EXPECT_NEIGH = 'expect_neigh' -REAL_MAC = 'real_mac' +EXPECT_ROUTE = 'expect_route' +EXPECT_NEIGH = 'expect_neigh' +REAL_MAC = 'real_mac' -INTF = 'intf' -IP = 'ip' -MAC = 'mac' +INTF = 'intf' +IP = 'ip' +MAC = 'mac' # Note: For most test cases below, after the neighbor entry is deleted, we must # still set `REAL_MAC` to `True` in the expected result since a prior step in the diff --git a/tests/test_mux.py b/tests/test_mux.py index 95bd4c075e..13edb14349 100644 --- a/tests/test_mux.py +++ b/tests/test_mux.py @@ -13,7 +13,7 @@ def create_fvs(**kwargs): tunnel_nh_id = 0 -class TestMuxTunnelBase(object): +class TestMuxTunnelBase(): APP_MUX_CABLE = "MUX_CABLE_TABLE" APP_NEIGH_TABLE = "NEIGH_TABLE" APP_TUNNEL_DECAP_TABLE_NAME = "TUNNEL_DECAP_TABLE" @@ -56,8 +56,8 @@ class TestMuxTunnelBase(object): SAI_ROUTER_INTERFACE_ATTR_TYPE = "SAI_ROUTER_INTERFACE_ATTR_TYPE" SAI_ROUTER_INTERFACE_TYPE_VLAN = "SAI_ROUTER_INTERFACE_TYPE_VLAN" - - DEFAULT_TUNNEL_PARAMS = { + + DEFAULT_TUNNEL_PARAMS = { "tunnel_type": "IPINIP", "dst_ip": SELF_IPV4, "dscp_mode": "uniform", @@ -65,10 +65,10 @@ class TestMuxTunnelBase(object): "ttl_mode": "pipe" } - DEFAULT_PEER_SWITCH_PARAMs = { + DEFAULT_PEER_SWITCH_PARAMS = { "address_ipv4": PEER_IPV4 } - + ecn_modes_map = { "standard" : "SAI_TUNNEL_DECAP_ECN_MODE_STANDARD", "copy_from_outer": "SAI_TUNNEL_DECAP_ECN_MODE_COPY_FROM_OUTER" @@ -84,7 +84,6 @@ class TestMuxTunnelBase(object): "uniform" : "SAI_TUNNEL_TTL_MODE_UNIFORM_MODEL" } - def create_vlan_interface(self, dvs): confdb = dvs.get_config_db() @@ -105,19 +104,19 @@ def create_vlan_interface(self, dvs): dvs.runcmd("config interface startup Ethernet4") dvs.runcmd("config interface startup Ethernet8") - def create_mux_cable(self, confdb): - - fvs = { "server_ipv4":self.SERV1_IPV4+self.IPV4_MASK, "server_ipv6":self.SERV1_IPV6+self.IPV6_MASK } + fvs = {"server_ipv4": self.SERV1_IPV4+self.IPV4_MASK, + "server_ipv6": self.SERV1_IPV6+self.IPV6_MASK} confdb.create_entry(self.CONFIG_MUX_CABLE, "Ethernet0", fvs) - fvs = { "server_ipv4":self.SERV2_IPV4+self.IPV4_MASK, "server_ipv6":self.SERV2_IPV6+self.IPV6_MASK } + fvs = {"server_ipv4": self.SERV2_IPV4+self.IPV4_MASK, + "server_ipv6": self.SERV2_IPV6+self.IPV6_MASK} confdb.create_entry(self.CONFIG_MUX_CABLE, "Ethernet4", fvs) - fvs = { "server_ipv4":self.SERV3_IPV4+self.IPV4_MASK, "server_ipv6":self.SERV3_IPV6+self.IPV6_MASK } + fvs = {"server_ipv4": self.SERV3_IPV4+self.IPV4_MASK, + "server_ipv6": self.SERV3_IPV6+self.IPV6_MASK} confdb.create_entry(self.CONFIG_MUX_CABLE, "Ethernet8", fvs) - def set_mux_state(self, appdb, ifname, state_change): ps = swsscommon.ProducerStateTable(appdb, self.APP_MUX_CABLE) @@ -128,13 +127,11 @@ def set_mux_state(self, appdb, ifname, state_change): time.sleep(1) - def get_switch_oid(self, asicdb): # Assumes only one switch is ever present keys = asicdb.wait_for_n_keys(self.ASIC_SWITCH_TABLE, 1) return keys[0] - def get_vlan_rif_oid(self, asicdb): # create_vlan_interface should be called before this method # Assumes only one VLAN RIF is present @@ -149,7 +146,6 @@ def get_vlan_rif_oid(self, asicdb): return vlan_oid - def check_neigh_in_asic_db(self, asicdb, ip, expected=True): rif_oid = self.get_vlan_rif_oid(asicdb) switch_oid = self.get_switch_oid(asicdb) @@ -158,7 +154,7 @@ def check_neigh_in_asic_db(self, asicdb, ip, expected=True): "rif": rif_oid, "switch_id": switch_oid } - expected_key = json.dumps(neigh_key_map, sort_keys=True, separators=(',',':')) + expected_key = json.dumps(neigh_key_map, sort_keys=True, separators=(',', ':')) if expected: nbr_keys = asicdb.wait_for_matching_keys(self.ASIC_NEIGH_TABLE, [expected_key]) @@ -172,12 +168,11 @@ def check_neigh_in_asic_db(self, asicdb, ip, expected=True): return '' - def check_tnl_nexthop_in_asic_db(self, asicdb, expected=1): global tunnel_nh_id - nh = asicdb.wait_for_n_keys(self.ASIC_NEXTHOP_TABLE, expected, polling_config=PollingConfig(strict=False)) + nh = asicdb.wait_for_n_keys(self.ASIC_NEXTHOP_TABLE, expected) for key in nh: fvs = asicdb.get_entry(self.ASIC_NEXTHOP_TABLE, key) @@ -186,7 +181,6 @@ def check_tnl_nexthop_in_asic_db(self, asicdb, expected=1): assert tunnel_nh_id - def check_nexthop_in_asic_db(self, asicdb, key, standby=False): fvs = asicdb.get_entry(self.ASIC_ROUTE_TABLE, key) @@ -199,7 +193,6 @@ def check_nexthop_in_asic_db(self, asicdb, key, standby=False): else: assert (nhid != tunnel_nh_id) - def check_nexthop_group_in_asic_db(self, asicdb, key, num_tnl_nh=0): fvs = asicdb.get_entry("ASIC_STATE:SAI_OBJECT_TYPE_ROUTE_ENTRY", key) @@ -216,14 +209,13 @@ def check_nexthop_group_in_asic_db(self, asicdb, key, num_tnl_nh=0): for k in keys: fvs = asicdb.get_entry("ASIC_STATE:SAI_OBJECT_TYPE_NEXT_HOP_GROUP_MEMBER", k) assert fvs["SAI_NEXT_HOP_GROUP_MEMBER_ATTR_NEXT_HOP_GROUP_ID"] == nhg_id - + # Count the number of Nexthop member pointing to tunnel if fvs["SAI_NEXT_HOP_GROUP_MEMBER_ATTR_NEXT_HOP_ID"] == tunnel_nh_id: - count += 1 + count += 1 assert num_tnl_nh == count - def add_neighbor(self, dvs, ip, mac): if ip_address(ip).version == 6: dvs.runcmd("ip -6 neigh replace " + ip + " lladdr " + mac + " dev Vlan1000") @@ -275,7 +267,9 @@ def create_and_test_neighbor(self, confdb, appdb, asicdb, dvs, dvs_route): # In standby mode, the entry must not be added to Neigh table but Route asicdb.wait_for_matching_keys(self.ASIC_NEIGH_TABLE, existing_keys) - dvs_route.check_asicdb_route_entries([self.SERV2_IPV4+self.IPV4_MASK, self.SERV2_IPV6+self.IPV6_MASK]) + dvs_route.check_asicdb_route_entries( + [self.SERV2_IPV4+self.IPV4_MASK, self.SERV2_IPV6+self.IPV6_MASK] + ) # The first standby route also creates as tunnel Nexthop self.check_tnl_nexthop_in_asic_db(asicdb, 3) @@ -285,16 +279,19 @@ def create_and_test_neighbor(self, confdb, appdb, asicdb, dvs, dvs_route): asicdb.wait_for_deleted_entry(self.ASIC_NEIGH_TABLE, srv1_v4) asicdb.wait_for_deleted_entry(self.ASIC_NEIGH_TABLE, srv1_v6) - dvs_route.check_asicdb_route_entries([self.SERV1_IPV4+self.IPV4_MASK, self.SERV1_IPV6+self.IPV6_MASK]) + dvs_route.check_asicdb_route_entries( + [self.SERV1_IPV4+self.IPV4_MASK, self.SERV1_IPV6+self.IPV6_MASK] + ) # Change state to Active. This will add Neigh and delete Route self.set_mux_state(appdb, "Ethernet4", "active") - dvs_route.check_asicdb_deleted_route_entries([self.SERV2_IPV4+self.IPV4_MASK, self.SERV2_IPV6+self.IPV6_MASK]) + dvs_route.check_asicdb_deleted_route_entries( + [self.SERV2_IPV4+self.IPV4_MASK, self.SERV2_IPV6+self.IPV6_MASK] + ) self.check_neigh_in_asic_db(asicdb, self.SERV2_IPV4) self.check_neigh_in_asic_db(asicdb, self.SERV2_IPV6) - def create_and_test_fdb(self, appdb, asicdb, dvs, dvs_route): self.set_mux_state(appdb, "Ethernet0", "active") @@ -354,7 +351,10 @@ def create_and_test_route(self, appdb, asicdb, dvs, dvs_route): rtprefix = "2.3.4.0/24" - dvs.runcmd("vtysh -c \"configure terminal\" -c \"ip route " + rtprefix + " " + self.SERV1_IPV4 + "\"") + dvs.runcmd( + "vtysh -c \"configure terminal\" -c \"ip route " + rtprefix + + " " + self.SERV1_IPV4 + "\"" + ) pdb = dvs.get_app_db() pdb.wait_for_entry("ROUTE_TABLE", rtprefix) @@ -406,9 +406,14 @@ def create_and_test_route(self, appdb, asicdb, dvs, dvs_route): dvs_route.check_asicdb_deleted_route_entries([rtprefix]) ps = swsscommon.ProducerStateTable(pdb.db_connection, "ROUTE_TABLE") - - fvs = swsscommon.FieldValuePairs([("nexthop", self.SERV1_IPV4 + "," + self.SERV2_IPV4), ("ifname", "Vlan1000,Vlan1000")]) - + + fvs = swsscommon.FieldValuePairs( + [ + ("nexthop", self.SERV1_IPV4 + "," + self.SERV2_IPV4), + ("ifname", "Vlan1000,Vlan1000") + ] + ) + ps.set(rtprefix, fvs) # Check if route was propagated to ASIC DB @@ -445,7 +450,12 @@ def create_and_test_route(self, appdb, asicdb, dvs, dvs_route): ps = swsscommon.ProducerStateTable(pdb.db_connection, "ROUTE_TABLE") - fvs = swsscommon.FieldValuePairs([("nexthop", self.SERV1_IPV6 + "," + self.SERV2_IPV6), ("ifname", "tun0,tun0")]) + fvs = swsscommon.FieldValuePairs( + [ + ("nexthop", self.SERV1_IPV6 + "," + self.SERV2_IPV6), + ("ifname", "tun0,tun0") + ] + ) ps.set(rtprefix, fvs) @@ -473,7 +483,6 @@ def create_and_test_route(self, appdb, asicdb, dvs, dvs_route): ps._del(rtprefix) - def get_expected_sai_qualifiers(self, portlist, dvs_acl): expected_sai_qualifiers = { "SAI_ACL_ENTRY_ATTR_PRIORITY": self.ACL_PRIORITY, @@ -482,8 +491,7 @@ def get_expected_sai_qualifiers(self, portlist, dvs_acl): return expected_sai_qualifiers - - def create_and_test_acl(self, appdb, asicdb, dvs, dvs_acl): + def create_and_test_acl(self, appdb, dvs_acl): # Start with active, verify NO ACL rules exists self.set_mux_state(appdb, "Ethernet0", "active") @@ -499,7 +507,7 @@ def create_and_test_acl(self, appdb, asicdb, dvs, dvs_acl): # Set two mux ports to standby, verify ACL rule with inport bitmap (2 ports) self.set_mux_state(appdb, "Ethernet4", "standby") - sai_qualifier = self.get_expected_sai_qualifiers(["Ethernet0","Ethernet4"], dvs_acl) + sai_qualifier = self.get_expected_sai_qualifiers(["Ethernet0", "Ethernet4"], dvs_acl) dvs_acl.verify_acl_rule(sai_qualifier, action="DROP", priority=self.ACL_PRIORITY) # Set one mux port to active, verify ACL rule with inport bitmap (1 port) @@ -518,7 +526,7 @@ def create_and_test_acl(self, appdb, asicdb, dvs, dvs_acl): # Verify change while setting unknown from active self.set_mux_state(appdb, "Ethernet4", "unknown") - sai_qualifier = self.get_expected_sai_qualifiers(["Ethernet0","Ethernet4"], dvs_acl) + sai_qualifier = self.get_expected_sai_qualifiers(["Ethernet0", "Ethernet4"], dvs_acl) dvs_acl.verify_acl_rule(sai_qualifier, action="DROP", priority=self.ACL_PRIORITY) self.set_mux_state(appdb, "Ethernet0", "active") @@ -526,16 +534,15 @@ def create_and_test_acl(self, appdb, asicdb, dvs, dvs_acl): dvs_acl.verify_acl_rule(sai_qualifier, action="DROP", priority=self.ACL_PRIORITY) self.set_mux_state(appdb, "Ethernet0", "standby") - sai_qualifier = self.get_expected_sai_qualifiers(["Ethernet0","Ethernet4"], dvs_acl) + sai_qualifier = self.get_expected_sai_qualifiers(["Ethernet0", "Ethernet4"], dvs_acl) dvs_acl.verify_acl_rule(sai_qualifier, action="DROP", priority=self.ACL_PRIORITY) # Verify no change while setting unknown from standby self.set_mux_state(appdb, "Ethernet0", "unknown") - sai_qualifier = self.get_expected_sai_qualifiers(["Ethernet0","Ethernet4"], dvs_acl) + sai_qualifier = self.get_expected_sai_qualifiers(["Ethernet0", "Ethernet4"], dvs_acl) dvs_acl.verify_acl_rule(sai_qualifier, action="DROP", priority=self.ACL_PRIORITY) - - def create_and_test_metrics(self, appdb, statedb, dvs): + def create_and_test_metrics(self, appdb, statedb): # Set to active and test attributes for start and end time self.set_mux_state(appdb, "Ethernet0", "active") @@ -549,7 +556,7 @@ def create_and_test_metrics(self, appdb, statedb, dvs): assert fvs != {} start = end = False - for f,v in fvs.items(): + for f, _ in fvs.items(): if f == "orch_switch_active_start": start = True elif f == "orch_switch_active_end": @@ -571,7 +578,7 @@ def create_and_test_metrics(self, appdb, statedb, dvs): assert fvs != {} start = end = False - for f,v in fvs.items(): + for f, v in fvs.items(): if f == "orch_switch_standby_start": start = True elif f == "orch_switch_standby_end": @@ -580,17 +587,14 @@ def create_and_test_metrics(self, appdb, statedb, dvs): assert start assert end - def check_interface_exists_in_asicdb(self, asicdb, sai_oid): asicdb.wait_for_entry(self.ASIC_RIF_TABLE, sai_oid) return True - def check_vr_exists_in_asicdb(self, asicdb, sai_oid): asicdb.wait_for_entry(self.ASIC_VRF_TABLE, sai_oid) return True - def create_and_test_peer(self, asicdb): """ Create PEER entry verify all needed enties in ASIC DB exists """ @@ -634,7 +638,6 @@ def create_and_test_peer(self, asicdb): else: assert False, "Field %s is not tested" % field - def check_tunnel_termination_entry_exists_in_asicdb(self, asicdb, tunnel_sai_oid, dst_ips): tunnel_term_entries = asicdb.wait_for_n_keys(self.ASIC_TUNNEL_TERM_ENTRIES, len(dst_ips)) @@ -657,7 +660,6 @@ def check_tunnel_termination_entry_exists_in_asicdb(self, asicdb, tunnel_sai_oid else: assert False, "Field %s is not tested" % field - def create_and_test_tunnel(self, db, asicdb, tunnel_name, tunnel_params): """ Create tunnel and verify all needed enties in ASIC DB exists """ @@ -696,8 +698,9 @@ def create_and_test_tunnel(self, db, asicdb, tunnel_name, tunnel_params): else: assert False, "Field %s is not tested" % field - self.check_tunnel_termination_entry_exists_in_asicdb(asicdb, tunnel_sai_obj, tunnel_params["dst_ip"].split(",")) - + self.check_tunnel_termination_entry_exists_in_asicdb( + asicdb, tunnel_sai_obj, tunnel_params["dst_ip"].split(",") + ) def remove_and_test_tunnel(self, db, asicdb, tunnel_name): """ Removes tunnel and checks that ASIC db is clear""" @@ -712,7 +715,7 @@ def remove_and_test_tunnel(self, db, asicdb, tunnel_name): status, fvs = tunnel_table.get(tunnel_sai_obj) # get overlay loopback interface oid to check if it is deleted with the tunnel - overlay_infs_id = {f:v for f,v in fvs}["SAI_TUNNEL_ATTR_OVERLAY_INTERFACE"] + overlay_infs_id = {f:v for f, v in fvs}["SAI_TUNNEL_ATTR_OVERLAY_INTERFACE"] ps = swsscommon.ProducerStateTable(db, self.APP_TUNNEL_DECAP_TABLE_NAME) ps.set(tunnel_name, create_fvs(), 'DEL') @@ -725,8 +728,10 @@ def remove_and_test_tunnel(self, db, asicdb, tunnel_name): assert len(tunnel_app_table.getKeys()) == 0 assert not self.check_interface_exists_in_asicdb(asicdb, overlay_infs_id) - - def check_app_db_neigh_table(self, appdb, intf, neigh_ip, mac="00:00:00:00:00:00", expect_entry=True): + def check_app_db_neigh_table( + self, appdb, intf, neigh_ip, + mac="00:00:00:00:00:00", expect_entry=True + ): key = "{}:{}".format(intf, neigh_ip) if isinstance(ip_address(neigh_ip), IPv4Address): family = 'IPv4' @@ -740,7 +745,6 @@ def check_app_db_neigh_table(self, appdb, intf, neigh_ip, mac="00:00:00:00:00:00 else: appdb.wait_for_deleted_keys(self.APP_NEIGH_TABLE, key) - def cleanup_left_over(self, db, asicdb): """ Cleanup APP and ASIC tables """ @@ -756,17 +760,13 @@ def cleanup_left_over(self, db, asicdb): for key in tunnel_app_table.getKeys(): tunnel_table._del(key) - - def ping_ips(self, dvs, ip_list): - for ip in ip_list: - dvs.runcmd(self.PING_CMD.format(ip=ip)) - - def ping_ip(self, dvs, ip): - dvs.runcmd(self.PING_CMD.format(ip=ip)) + dvs.runcmd(self.PING_CMD.format(ip=ip)) - - def check_neighbor_state(self, dvs, dvs_route, neigh_ip, expect_route=True, expect_neigh=False, expected_mac='00:00:00:00:00:00'): + def check_neighbor_state( + self, dvs, dvs_route, neigh_ip, expect_route=True, + expect_neigh=False, expected_mac='00:00:00:00:00:00' + ): """ Checks the status of neighbor entries in APPL and ASIC DB """ @@ -775,7 +775,10 @@ def check_neighbor_state(self, dvs, dvs_route, neigh_ip, expect_route=True, expe app_db = dvs.get_app_db() asic_db = dvs.get_asic_db() prefix = str(ip_network(neigh_ip)) - self.check_app_db_neigh_table(app_db, self.VLAN_1000, neigh_ip, mac=expected_mac, expect_entry=expect_route) + self.check_app_db_neigh_table( + app_db, self.VLAN_1000, neigh_ip, + mac=expected_mac, expect_entry=expect_route + ) if expect_route: self.check_tnl_nexthop_in_asic_db(asic_db) routes = dvs_route.check_asicdb_route_entries([prefix]) @@ -802,7 +805,6 @@ def execute_action(self, action, dvs, test_info): def setup_vlan(self, dvs): self.create_vlan_interface(dvs) - @pytest.fixture(scope='module') def setup_mux_cable(self, dvs): config_db = dvs.get_config_db() @@ -818,7 +820,11 @@ def setup_tunnel(self, dvs): @pytest.fixture def setup_peer_switch(self, dvs): config_db = dvs.get_config_db() - config_db.create_entry(self.CONFIG_PEER_SWITCH, self.PEER_SWITCH_HOST, self.DEFAULT_PEER_SWITCH_PARAMs) + config_db.create_entry( + self.CONFIG_PEER_SWITCH, + self.PEER_SWITCH_HOST, + self.DEFAULT_PEER_SWITCH_PARAMS + ) @pytest.fixture def remove_peer_switch(self, dvs): @@ -868,7 +874,7 @@ def ips_for_test(self, server_test_ips, neigh_test_ips, neigh_miss_test_sequence return server_test_ips if step[TEST_ACTION] == PING_NEIGH: return neigh_test_ips - + # If we got here, the test sequence did not contain a ping command pytest.fail('No ping command found in test sequence {}'.format(neigh_miss_test_sequence)) @@ -884,7 +890,11 @@ def ip_to_intf_map(self, server_test_ips, neigh_test_ips): } return map - @pytest.fixture(params=NEIGH_MISS_TESTS, ids=['->'.join([step[TEST_ACTION] for step in scenario]) for scenario in NEIGH_MISS_TESTS]) + @pytest.fixture( + params=NEIGH_MISS_TESTS, + ids=['->'.join([step[TEST_ACTION] for step in scenario]) + for scenario in NEIGH_MISS_TESTS] + ) def neigh_miss_test_sequence(self, request): return request.param @@ -921,7 +931,6 @@ def test_Tunnel(self, dvs, setup_tunnel, testlog): # create tunnel IPv4 tunnel self.create_and_test_tunnel(db, asicdb, self.MUX_TUNNEL_0, self.DEFAULT_TUNNEL_PARAMS) - def test_Peer(self, dvs, setup_peer_switch, testlog): """ test IPv4 Mux tunnel creation """ @@ -929,56 +938,52 @@ def test_Peer(self, dvs, setup_peer_switch, testlog): self.create_and_test_peer(asicdb) - def test_Neighbor(self, dvs, dvs_route, testlog): """ test Neighbor entries and mux state change """ confdb = dvs.get_config_db() - appdb = swsscommon.DBConnector(swsscommon.APPL_DB, dvs.redis_sock, 0) + appdb = swsscommon.DBConnector(swsscommon.APPL_DB, dvs.redis_sock, 0) asicdb = dvs.get_asic_db() self.create_and_test_neighbor(confdb, appdb, asicdb, dvs, dvs_route) - def test_Fdb(self, dvs, dvs_route, testlog): """ test Fdb entries and mux state change """ - appdb = swsscommon.DBConnector(swsscommon.APPL_DB, dvs.redis_sock, 0) + appdb = swsscommon.DBConnector(swsscommon.APPL_DB, dvs.redis_sock, 0) asicdb = dvs.get_asic_db() self.create_and_test_fdb(appdb, asicdb, dvs, dvs_route) - def test_Route(self, dvs, dvs_route, testlog): """ test Route entries and mux state change """ - appdb = swsscommon.DBConnector(swsscommon.APPL_DB, dvs.redis_sock, 0) + appdb = swsscommon.DBConnector(swsscommon.APPL_DB, dvs.redis_sock, 0) asicdb = dvs.get_asic_db() self.create_and_test_route(appdb, asicdb, dvs, dvs_route) - def test_acl(self, dvs, dvs_acl, testlog): """ test acl and mux state change """ - appdb = swsscommon.DBConnector(swsscommon.APPL_DB, dvs.redis_sock, 0) + appdb = swsscommon.DBConnector(swsscommon.APPL_DB, dvs.redis_sock, 0) asicdb = dvs.get_asic_db() - self.create_and_test_acl(appdb, asicdb, dvs, dvs_acl) + self.create_and_test_acl(appdb, dvs_acl) def test_mux_metrics(self, dvs, testlog): """ test metrics for mux state change """ - appdb = swsscommon.DBConnector(swsscommon.APPL_DB, dvs.redis_sock, 0) + appdb = swsscommon.DBConnector(swsscommon.APPL_DB, dvs.redis_sock, 0) statedb = dvs.get_state_db() - self.create_and_test_metrics(appdb, statedb, dvs) + self.create_and_test_metrics(appdb, statedb) def test_neighbor_miss( self, dvs, dvs_route, ips_for_test, neigh_miss_test_sequence, ip_to_intf_map, intf_fdb_map, neighbor_cleanup, setup_vlan, setup_mux_cable, setup_tunnel, setup_peer_switch, testlog - ): + ): ip = ips_for_test[0] intf = ip_to_intf_map[ip] mac = intf_fdb_map[intf] @@ -1001,14 +1006,15 @@ def test_neighbor_miss( def test_neighbor_miss_no_peer( self, dvs, dvs_route, setup_vlan, setup_mux_cable, setup_tunnel, remove_peer_switch, neighbor_cleanup, testlog - ): + ): """ test neighbor miss with no peer switch configured No new entries are expected in APPL_DB or ASIC_DB """ - test_ips = [self.NEIGH3_IPV4, self.SERV3_IPV4] + test_ips = [self.NEIGH3_IPV4, self.SERV3_IPV4, self.NEIGH1_IPV6, self.SERV1_IPV6] - self.ping_ips(dvs, test_ips) + for ip in test_ips: + self.ping_ip(dvs, ip) for ip in test_ips: self.check_neighbor_state(dvs, dvs_route, ip, expect_route=False)