Skip to content

Commit

Permalink
Fix BGP Allow List test case
Browse files Browse the repository at this point in the history
Signed-off-by: Abhishek Dosi <[email protected]>
  • Loading branch information
abdosi committed Feb 3, 2021
1 parent 5d9664c commit 5e82113
Showing 1 changed file with 36 additions and 97 deletions.
133 changes: 36 additions & 97 deletions tests/bgp/test_bgp_allow_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
logger = logging.getLogger(__name__)

CONSTANTS_FILE = '/etc/sonic/constants.yml'

ALLOW_LIST_PREFIX_JSON_FILE = '/tmp/allow_list.json'
EXABGP_BASE_PORT = 5000
EXABGP_BASE_PORT_V6 = 6000

Expand All @@ -38,23 +38,25 @@
}

TEST_COMMUNITY = '1010:1010'
DROP_COMMUNITY = '5060:12345'
DROP_COMMUNITY = ''
DEPLOYMENT_ID = '0'
DEFAULT_ACTION = ''

ALLOW_LIST = {
'BGP_ALLOWED_PREFIXES': {
'DEPLOYMENT_ID|{}|{}'.format(DEPLOYMENT_ID, TEST_COMMUNITY): {
'prefixes_v4': PREFIX_LISTS['ALLOWED_WITH_COMMUNITY'],
'prefixes_v6': PREFIX_LISTS['ALLOWED_WITH_COMMUNITY_V6']
'prefixes_v6': PREFIX_LISTS['ALLOWED_WITH_COMMUNITY_V6'],
'default_action':''
},
'DEPLOYMENT_ID|{}'.format(DEPLOYMENT_ID): {
'prefixes_v4': PREFIX_LISTS['ALLOWED'],
'prefixes_v6': PREFIX_LISTS['ALLOWED_V6']
'prefixes_v6': PREFIX_LISTS['ALLOWED_V6'],
'default_action':''
}
}
}


@pytest.fixture(scope='module')
def setup(tbinfo, nbrhosts, duthosts, rand_one_dut_hostname):
duthost = duthosts[rand_one_dut_hostname]
Expand All @@ -67,11 +69,19 @@ def setup(tbinfo, nbrhosts, duthosts, rand_one_dut_hostname):
pytest.skip('No file {} on DUT, BGP Allow List is not supported')

constants = yaml.safe_load(duthost.shell('cat {}'.format(CONSTANTS_FILE))['stdout'])

global DEFAULT_ACTION
try:
constants['constants']['bgp']['allow_list']['default_action']
DEFAULT_ACTION = constants['constants']['bgp']['allow_list']['default_action']
except KeyError as e:
pytest.skip('No BGP Allow List configuration in {}, BGP Allow List is not supported.'.format(CONSTANTS_FILE))

global DROP_COMMUNITY
try:
DROP_COMMUNITY = constants['constants']['bgp']['allow_list']['drop_community']
except KeyError as e:
pytest.skip('No BGP Allow List Drop Commnity define in {}, BGP Allow List is not supported.'.format(CONSTANTS_FILE))

setup_info = {}

tor_neighbors = natsorted([neighbor for neighbor in nbrhosts.keys() if neighbor.endswith('T0')])
Expand Down Expand Up @@ -110,23 +120,27 @@ def update_routes(action, ptfip, port, route):
assert r.status_code == 200


@pytest.fixture(scope='module', autouse=True)
def prepare_allow_list(duthosts, rand_one_dut_hostname):
duthost=duthosts[rand_one_dut_hostname]
duthost.copy(content=json.dumps(ALLOW_LIST, indent=2), dest='/tmp/allow_list.json')


@pytest.fixture
def load_remove_allow_list(duthosts, rand_one_dut_hostname):
def load_remove_allow_list(duthosts, rand_one_dut_hostname, request):
duthost = duthosts[rand_one_dut_hostname]
duthost.shell('sonic-cfggen -j /tmp/allow_list.json -w')

allowed_list_prefixes = ALLOW_LIST['BGP_ALLOWED_PREFIXES']

for k,v in allowed_list_prefixes.items():
v['default_action'] = request.param

duthost.copy(content=json.dumps(ALLOW_LIST, indent=3), dest=ALLOW_LIST_PREFIX_JSON_FILE)
duthost.shell('sonic-cfggen -j {} -w'.format(ALLOW_LIST_PREFIX_JSON_FILE))
time.sleep(3)

yield
yield request.param

allow_list_keys = duthost.shell('redis-cli --raw -n 4 keys "BGP_ALLOWED_PREFIXES*"')['stdout_lines']
for key in allow_list_keys:
duthost.shell('redis-cli -n 4 del "{}"'.format(key))
duthost.shell('rm -rf {}'.format(ALLOW_LIST_PREFIX_JSON_FILE))


@pytest.fixture(scope='module')
Expand Down Expand Up @@ -186,23 +200,8 @@ def prepare_eos_routes(setup, ptfhost, build_routes, nbrhosts, tbinfo):
nbrhosts[tor1]['host'].eos_config(lines=no_cmds, parents='router bgp {}'.format(tor1_asn))


class BGPAllowListBase(object):

def bgp_started(self, duthost, tbinfo):
try:
bgp_summary = json.loads(duthost.shell('vtysh -c "show bgp summary json"')['stdout'])
peer_num = len(tbinfo['topo']['properties']['configuration'].keys())
if bgp_summary['ipv4Unicast']['failedPeers'] == 0 and \
bgp_summary['ipv6Unicast']['failedPeers'] == 0 and \
bgp_summary['ipv4Unicast']['totalPeers'] == peer_num and \
bgp_summary['ipv6Unicast']['totalPeers'] == peer_num:
return True
else:
return False
except Exception as e:
logger.info('Unable to get bgp status')
return False

class TestBGPAllowListBase(object):

def check_routes_on_tor1(self, setup, nbrhosts):
tor1 = setup['tor1']
for prefixes in PREFIX_LISTS.values():
Expand Down Expand Up @@ -357,77 +356,17 @@ def check_other_neigh(nbrhosts, permit, node=None, results=None):
results = parallel_run(check_other_neigh, (nbrhosts, permit), {}, other_neighbors, timeout=180)
self.check_results(results)


class TestBGPAllowListPermit(BGPAllowListBase):

@pytest.fixture(scope='class', autouse=True)
def default_action_permit(self, duthosts, rand_one_dut_hostname, tbinfo):
duthost = duthosts[rand_one_dut_hostname]
constants = yaml.safe_load(duthost.shell('cat {}'.format(CONSTANTS_FILE))['stdout'])
default_action = constants['constants']['bgp']['allow_list']['default_action']

if default_action != 'permit':
logger.info('Set constants.bgp.allow_list.default_action to permit')
constants['constants']['bgp']['allow_list']['default_action'] = 'permit'
duthost.copy(content=yaml.safe_dump(constants), dest=CONSTANTS_FILE)

logger.info('Restart BGP for default_action permit to take effect')
duthost.shell('systemctl reset-failed bgp') # Workaround for issue 'Start request repeated too quickly'
duthost.shell('systemctl restart bgp')
pytest_assert(wait_until(300, 30, self.bgp_started, duthost, tbinfo), 'Service bgp not started')

return

def test_empty_allow_list_permit(self, duthosts, rand_one_dut_hostname, setup, nbrhosts):
duthost = duthosts[rand_one_dut_hostname]
self.check_routes_on_tor1(setup, nbrhosts)
self.check_routes_on_dut(duthost)
self.check_routes_on_neighbors_empty_allow_list(nbrhosts, setup, permit=True)

def test_allow_list_permit(self, duthosts, rand_one_dut_hostname, setup, nbrhosts, load_remove_allow_list):
duthost = duthosts[rand_one_dut_hostname]
self.check_routes_on_tor1(setup, nbrhosts)
self.check_routes_on_dut(duthost)
self.check_routes_on_neighbors(nbrhosts, setup, permit=True)


class TestBGPAllowListDeny(BGPAllowListBase):

@pytest.fixture(scope='class', autouse=True)
def default_action_deny(self, duthosts, rand_one_dut_hostname, tbinfo):
duthost=duthosts[rand_one_dut_hostname]
constants = yaml.safe_load(duthost.shell('cat {}'.format(CONSTANTS_FILE))['stdout'])
default_action = constants['constants']['bgp']['allow_list']['default_action']

if default_action != 'deny':
logger.info('Set constants.bgp.allow_list.default_action to deny')
constants['constants']['bgp']['allow_list']['default_action'] = 'deny'
duthost.copy(content=yaml.safe_dump(constants), dest=CONSTANTS_FILE)

logger.info('Restart BGP for default_action deny to take effect')
duthost.shell('systemctl reset-failed bgp') # Workaround for issue 'Start request repeated too quickly'
duthost.shell('systemctl restart bgp')
pytest_assert(wait_until(300, 30, self.bgp_started, duthost, tbinfo), 'Service bgp not started')

yield

logger.info('Set constants.bgp.allow_list.default_action to permit')
constants['constants']['bgp']['allow_list']['default_action'] = 'permit'
duthost.copy(content=yaml.safe_dump(constants), dest=CONSTANTS_FILE)

logger.info('Restart BGP for default_action deny to take effect')
duthost.shell('systemctl reset-failed bgp') # Workaround for issue 'Start request repeated too quickly'
duthost.shell('systemctl restart bgp')
wait_until(300, 20, self.bgp_started, duthost, tbinfo)

def test_empty_allow_list_deny(self, duthosts, rand_one_dut_hostname, setup, nbrhosts):
def test_default_allow_list(self, duthosts, rand_one_dut_hostname, setup, nbrhosts):
permit = True if DEFAULT_ACTION == "permit" else False
duthost = duthosts[rand_one_dut_hostname]
self.check_routes_on_tor1(setup, nbrhosts)
self.check_routes_on_dut(duthost)
self.check_routes_on_neighbors_empty_allow_list(nbrhosts, setup, permit=False)
self.check_routes_on_neighbors_empty_allow_list(nbrhosts, setup, permit)

def test_allow_list_deny(self, duthosts, rand_one_dut_hostname, setup, nbrhosts, load_remove_allow_list):
@pytest.mark.parametrize('load_remove_allow_list', ["deny", "permit"], indirect=['load_remove_allow_list'])
def test_allow_list(self, duthosts, rand_one_dut_hostname, setup, nbrhosts, load_remove_allow_list):
permit = True if load_remove_allow_list == "permit" else False
duthost = duthosts[rand_one_dut_hostname]
self.check_routes_on_tor1(setup, nbrhosts)
self.check_routes_on_dut(duthost)
self.check_routes_on_neighbors(nbrhosts, setup, permit=False)
self.check_routes_on_neighbors(nbrhosts, setup, permit)

0 comments on commit 5e82113

Please sign in to comment.