diff --git a/tests/bgp/test_bgp_allow_list.py b/tests/bgp/test_bgp_allow_list.py index 1dcb7c7afa..e40addbb8f 100644 --- a/tests/bgp/test_bgp_allow_list.py +++ b/tests/bgp/test_bgp_allow_list.py @@ -24,7 +24,7 @@ logger = logging.getLogger(__name__) CONSTANTS_FILE = '/etc/sonic/constants.yml' - +ALLOW_LIST_PREFIX_JSON_FILE = '/tmp/allow_list.json' EXABGP_BASE_PORT = 5000 EXABGP_BASE_PORT_V6 = 6000 @@ -38,23 +38,25 @@ } TEST_COMMUNITY = '1010:1010' -DROP_COMMUNITY = '5060:12345' +DROP_COMMUNITY = '' DEPLOYMENT_ID = '0' +DEFAULT_ACTION = '' ALLOW_LIST = { 'BGP_ALLOWED_PREFIXES': { 'DEPLOYMENT_ID|{}|{}'.format(DEPLOYMENT_ID, TEST_COMMUNITY): { 'prefixes_v4': PREFIX_LISTS['ALLOWED_WITH_COMMUNITY'], - 'prefixes_v6': PREFIX_LISTS['ALLOWED_WITH_COMMUNITY_V6'] + 'prefixes_v6': PREFIX_LISTS['ALLOWED_WITH_COMMUNITY_V6'], + 'default_action':'' }, 'DEPLOYMENT_ID|{}'.format(DEPLOYMENT_ID): { 'prefixes_v4': PREFIX_LISTS['ALLOWED'], - 'prefixes_v6': PREFIX_LISTS['ALLOWED_V6'] + 'prefixes_v6': PREFIX_LISTS['ALLOWED_V6'], + 'default_action':'' } } } - @pytest.fixture(scope='module') def setup(tbinfo, nbrhosts, duthosts, rand_one_dut_hostname): duthost = duthosts[rand_one_dut_hostname] @@ -67,11 +69,19 @@ def setup(tbinfo, nbrhosts, duthosts, rand_one_dut_hostname): pytest.skip('No file {} on DUT, BGP Allow List is not supported') constants = yaml.safe_load(duthost.shell('cat {}'.format(CONSTANTS_FILE))['stdout']) + + global DEFAULT_ACTION try: - constants['constants']['bgp']['allow_list']['default_action'] + DEFAULT_ACTION = constants['constants']['bgp']['allow_list']['default_action'] except KeyError as e: pytest.skip('No BGP Allow List configuration in {}, BGP Allow List is not supported.'.format(CONSTANTS_FILE)) + global DROP_COMMUNITY + try: + DROP_COMMUNITY = constants['constants']['bgp']['allow_list']['drop_community'] + except KeyError as e: + pytest.skip('No BGP Allow List Drop Commnity define in {}, BGP Allow List is not supported.'.format(CONSTANTS_FILE)) + setup_info = {} tor_neighbors = natsorted([neighbor for neighbor in nbrhosts.keys() if neighbor.endswith('T0')]) @@ -110,23 +120,27 @@ def update_routes(action, ptfip, port, route): assert r.status_code == 200 -@pytest.fixture(scope='module', autouse=True) -def prepare_allow_list(duthosts, rand_one_dut_hostname): - duthost=duthosts[rand_one_dut_hostname] - duthost.copy(content=json.dumps(ALLOW_LIST, indent=2), dest='/tmp/allow_list.json') @pytest.fixture -def load_remove_allow_list(duthosts, rand_one_dut_hostname): +def load_remove_allow_list(duthosts, rand_one_dut_hostname, request): duthost = duthosts[rand_one_dut_hostname] - duthost.shell('sonic-cfggen -j /tmp/allow_list.json -w') + + allowed_list_prefixes = ALLOW_LIST['BGP_ALLOWED_PREFIXES'] + + for k,v in allowed_list_prefixes.items(): + v['default_action'] = request.param + + duthost.copy(content=json.dumps(ALLOW_LIST, indent=3), dest=ALLOW_LIST_PREFIX_JSON_FILE) + duthost.shell('sonic-cfggen -j {} -w'.format(ALLOW_LIST_PREFIX_JSON_FILE)) time.sleep(3) - yield + yield request.param allow_list_keys = duthost.shell('redis-cli --raw -n 4 keys "BGP_ALLOWED_PREFIXES*"')['stdout_lines'] for key in allow_list_keys: duthost.shell('redis-cli -n 4 del "{}"'.format(key)) + duthost.shell('rm -rf {}'.format(ALLOW_LIST_PREFIX_JSON_FILE)) @pytest.fixture(scope='module') @@ -186,23 +200,8 @@ def prepare_eos_routes(setup, ptfhost, build_routes, nbrhosts, tbinfo): nbrhosts[tor1]['host'].eos_config(lines=no_cmds, parents='router bgp {}'.format(tor1_asn)) -class BGPAllowListBase(object): - - def bgp_started(self, duthost, tbinfo): - try: - bgp_summary = json.loads(duthost.shell('vtysh -c "show bgp summary json"')['stdout']) - peer_num = len(tbinfo['topo']['properties']['configuration'].keys()) - if bgp_summary['ipv4Unicast']['failedPeers'] == 0 and \ - bgp_summary['ipv6Unicast']['failedPeers'] == 0 and \ - bgp_summary['ipv4Unicast']['totalPeers'] == peer_num and \ - bgp_summary['ipv6Unicast']['totalPeers'] == peer_num: - return True - else: - return False - except Exception as e: - logger.info('Unable to get bgp status') - return False - +class TestBGPAllowListBase(object): + def check_routes_on_tor1(self, setup, nbrhosts): tor1 = setup['tor1'] for prefixes in PREFIX_LISTS.values(): @@ -357,77 +356,17 @@ def check_other_neigh(nbrhosts, permit, node=None, results=None): results = parallel_run(check_other_neigh, (nbrhosts, permit), {}, other_neighbors, timeout=180) self.check_results(results) - -class TestBGPAllowListPermit(BGPAllowListBase): - - @pytest.fixture(scope='class', autouse=True) - def default_action_permit(self, duthosts, rand_one_dut_hostname, tbinfo): - duthost = duthosts[rand_one_dut_hostname] - constants = yaml.safe_load(duthost.shell('cat {}'.format(CONSTANTS_FILE))['stdout']) - default_action = constants['constants']['bgp']['allow_list']['default_action'] - - if default_action != 'permit': - logger.info('Set constants.bgp.allow_list.default_action to permit') - constants['constants']['bgp']['allow_list']['default_action'] = 'permit' - duthost.copy(content=yaml.safe_dump(constants), dest=CONSTANTS_FILE) - - logger.info('Restart BGP for default_action permit to take effect') - duthost.shell('systemctl reset-failed bgp') # Workaround for issue 'Start request repeated too quickly' - duthost.shell('systemctl restart bgp') - pytest_assert(wait_until(300, 30, self.bgp_started, duthost, tbinfo), 'Service bgp not started') - - return - - def test_empty_allow_list_permit(self, duthosts, rand_one_dut_hostname, setup, nbrhosts): - duthost = duthosts[rand_one_dut_hostname] - self.check_routes_on_tor1(setup, nbrhosts) - self.check_routes_on_dut(duthost) - self.check_routes_on_neighbors_empty_allow_list(nbrhosts, setup, permit=True) - - def test_allow_list_permit(self, duthosts, rand_one_dut_hostname, setup, nbrhosts, load_remove_allow_list): - duthost = duthosts[rand_one_dut_hostname] - self.check_routes_on_tor1(setup, nbrhosts) - self.check_routes_on_dut(duthost) - self.check_routes_on_neighbors(nbrhosts, setup, permit=True) - - -class TestBGPAllowListDeny(BGPAllowListBase): - - @pytest.fixture(scope='class', autouse=True) - def default_action_deny(self, duthosts, rand_one_dut_hostname, tbinfo): - duthost=duthosts[rand_one_dut_hostname] - constants = yaml.safe_load(duthost.shell('cat {}'.format(CONSTANTS_FILE))['stdout']) - default_action = constants['constants']['bgp']['allow_list']['default_action'] - - if default_action != 'deny': - logger.info('Set constants.bgp.allow_list.default_action to deny') - constants['constants']['bgp']['allow_list']['default_action'] = 'deny' - duthost.copy(content=yaml.safe_dump(constants), dest=CONSTANTS_FILE) - - logger.info('Restart BGP for default_action deny to take effect') - duthost.shell('systemctl reset-failed bgp') # Workaround for issue 'Start request repeated too quickly' - duthost.shell('systemctl restart bgp') - pytest_assert(wait_until(300, 30, self.bgp_started, duthost, tbinfo), 'Service bgp not started') - - yield - - logger.info('Set constants.bgp.allow_list.default_action to permit') - constants['constants']['bgp']['allow_list']['default_action'] = 'permit' - duthost.copy(content=yaml.safe_dump(constants), dest=CONSTANTS_FILE) - - logger.info('Restart BGP for default_action deny to take effect') - duthost.shell('systemctl reset-failed bgp') # Workaround for issue 'Start request repeated too quickly' - duthost.shell('systemctl restart bgp') - wait_until(300, 20, self.bgp_started, duthost, tbinfo) - - def test_empty_allow_list_deny(self, duthosts, rand_one_dut_hostname, setup, nbrhosts): + def test_default_allow_list(self, duthosts, rand_one_dut_hostname, setup, nbrhosts): + permit = True if DEFAULT_ACTION == "permit" else False duthost = duthosts[rand_one_dut_hostname] self.check_routes_on_tor1(setup, nbrhosts) self.check_routes_on_dut(duthost) - self.check_routes_on_neighbors_empty_allow_list(nbrhosts, setup, permit=False) + self.check_routes_on_neighbors_empty_allow_list(nbrhosts, setup, permit) - def test_allow_list_deny(self, duthosts, rand_one_dut_hostname, setup, nbrhosts, load_remove_allow_list): + @pytest.mark.parametrize('load_remove_allow_list', ["deny", "permit"], indirect=['load_remove_allow_list']) + def test_allow_list(self, duthosts, rand_one_dut_hostname, setup, nbrhosts, load_remove_allow_list): + permit = True if load_remove_allow_list == "permit" else False duthost = duthosts[rand_one_dut_hostname] self.check_routes_on_tor1(setup, nbrhosts) self.check_routes_on_dut(duthost) - self.check_routes_on_neighbors(nbrhosts, setup, permit=False) + self.check_routes_on_neighbors(nbrhosts, setup, permit)