Skip to content

Commit

Permalink
tests: add integration tests for IPv6
Browse files Browse the repository at this point in the history
Run also all IPv4 tests with IPv6 enabled to check for regressions
(broken IPv4 because of enabled IPv6).

QubesOS/qubes-issues#718
  • Loading branch information
marmarek committed Dec 5, 2017
1 parent d2e01c5 commit 7e143da
Showing 1 changed file with 374 additions and 0 deletions.
374 changes: 374 additions & 0 deletions qubes/tests/integ/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,375 @@ def test_212_custom_ip_firewall(self):
nc.terminate()
self.loop.run_until_complete(nc.wait())

class VmIPv6NetworkingMixin(VmNetworkingMixin):
test_ip6 = '2000:abcd::1'

ping6_cmd = 'ping6 -W 1 -n -c 1 {target}'

def setUp(self):
super(VmIPv6NetworkingMixin, self).setUp()
self.ping6_ip = self.ping6_cmd.format(target=self.test_ip6)
self.ping6_name = self.ping6_cmd.format(target=self.test_name)

def configure_netvm(self):
self.testnetvm.features['ipv6'] = True
super(VmIPv6NetworkingMixin, self).configure_netvm()

def run_netvm_cmd(cmd):
if self.run_cmd(self.testnetvm, cmd) != 0:
self.fail("Command '%s' failed" % cmd)

run_netvm_cmd("ip addr add {}/128 dev test0".format(self.test_ip6))
run_netvm_cmd(
"ip6tables -I INPUT -d {} -j ACCEPT".format(self.test_ip6))
# ignore failure
self.run_cmd(self.testnetvm, "killall --wait dnsmasq")
run_netvm_cmd(
"dnsmasq -a {ip} -A /{name}/{ip} -A /{name}/{ip6} -i test0 -z".
format(ip=self.test_ip, ip6=self.test_ip6, name=self.test_name))

def test_500_ipv6_simple_networking(self):
self.loop.run_until_complete(self.testvm1.start())
self.assertEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0)
self.assertEqual(self.run_cmd(self.testvm1, self.ping6_name), 0)


def test_510_ipv6_simple_proxyvm(self):
self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
name=self.make_vm_name('proxy'),
label='red')
self.proxy.provides_network = True
self.proxy.netvm = self.testnetvm
self.loop.run_until_complete(self.proxy.create_on_disk())
self.testvm1.netvm = self.proxy
self.app.save()

self.loop.run_until_complete(self.testvm1.start())
self.assertTrue(self.proxy.is_running())
self.assertEqual(self.run_cmd(self.proxy, self.ping6_ip), 0,
"Ping by IP from ProxyVM failed")
self.assertEqual(self.run_cmd(self.proxy, self.ping6_name), 0,
"Ping by name from ProxyVM failed")
self.assertEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0,
"Ping by IP from AppVM failed")
self.assertEqual(self.run_cmd(self.testvm1, self.ping6_name), 0,
"Ping by IP from AppVM failed")


@qubes.tests.expectedFailureIfTemplate('debian-7')
@unittest.skipUnless(spawn.find_executable('xdotool'),
"xdotool not installed")
def test_520_ipv6_simple_proxyvm_nm(self):
self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
name=self.make_vm_name('proxy'),
label='red')
self.proxy.provides_network = True
self.loop.run_until_complete(self.proxy.create_on_disk())
self.proxy.netvm = self.testnetvm
self.proxy.features['service.network-manager'] = True
self.testvm1.netvm = self.proxy
self.app.save()

self.loop.run_until_complete(self.testvm1.start())
self.assertTrue(self.proxy.is_running())
self.assertEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0,
"Ping by IP failed")
self.assertEqual(self.run_cmd(self.testvm1, self.ping6_name), 0,
"Ping by name failed")

# reconnect to make sure that device was configured by NM
self.assertEqual(
self.run_cmd(self.proxy, "nmcli device disconnect eth0",
user="user"),
0, "Failed to disconnect eth0 using nmcli")

self.assertNotEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0,
"Network should be disabled, but apparently it isn't")
self.assertEqual(
self.run_cmd(self.proxy,
'nmcli connection up "VM uplink eth0" ifname eth0',
user="user"),
0, "Failed to connect eth0 using nmcli")
self.assertEqual(self.run_cmd(self.proxy, "nm-online",
user="user"), 0,
"Failed to wait for NM connection")

# wait for duplicate-address-detection to complete - by default it has
# 1s timeout
time.sleep(2)

# check for nm-applet presence
self.assertEqual(subprocess.call([
'xdotool', 'search', '--class', '{}:nm-applet'.format(
self.proxy.name)],
stdout=subprocess.DEVNULL), 0, "nm-applet window not found")
self.assertEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0,
"Ping by IP failed (after NM reconnection")
self.assertEqual(self.run_cmd(self.testvm1, self.ping6_name), 0,
"Ping by name failed (after NM reconnection)")


def test_530_ipv6_firewallvm_firewall(self):
self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
name=self.make_vm_name('proxy'),
label='red')
self.proxy.provides_network = True
self.loop.run_until_complete(self.proxy.create_on_disk())
self.proxy.netvm = self.testnetvm
self.testvm1.netvm = self.proxy
self.app.save()

if self.run_cmd(self.testnetvm, 'ncat -h') != 0:
self.skipTest('nmap ncat not installer')

# block all for first

self.testvm1.firewall.rules = [qubes.firewall.Rule(action='drop')]
self.testvm1.firewall.save()
self.loop.run_until_complete(self.testvm1.start())
self.assertTrue(self.proxy.is_running())

nc = self.loop.run_until_complete(self.testnetvm.run(
'ncat -l --send-only -e /bin/hostname -k 1234'))

try:
self.assertEqual(self.run_cmd(self.proxy, self.ping6_ip), 0,
"Ping by IP from ProxyVM failed")
self.assertEqual(self.run_cmd(self.proxy, self.ping6_name), 0,
"Ping by name from ProxyVM failed")
self.assertNotEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0,
"Ping by IP should be blocked")

nc_cmd = "ncat -w 1 --recv-only {} 1234".format(self.test_ip6)
self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0,
"TCP connection should be blocked")

# block all except ICMP

self.testvm1.firewall.rules = [(
qubes.firewall.Rule(None, action='accept', proto='icmp')
)]
self.testvm1.firewall.save()
# Ugly hack b/c there is no feedback when the rules are actually
# applied
time.sleep(3)
self.assertEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0,
"Ping by IP failed (should be allowed now)")
self.assertNotEqual(self.run_cmd(self.testvm1, self.ping6_name), 0,
"Ping by name should be blocked")

# all TCP still blocked

self.testvm1.firewall.rules = [
qubes.firewall.Rule(None, action='accept', proto='icmp'),
qubes.firewall.Rule(None, action='accept', specialtarget='dns'),
]
self.testvm1.firewall.save()

# Ugly hack b/c there is no feedback when the rules are actually
# applied
time.sleep(3)
self.assertEqual(self.run_cmd(self.testvm1, self.ping6_name), 0,
"Ping by name failed (should be allowed now)")
self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0,
"TCP connection should be blocked")

# block all except target

self.testvm1.firewall.rules = [
qubes.firewall.Rule(None, action='accept', dsthost=self.test_ip6,
proto='tcp', dstports=1234),
]
self.testvm1.firewall.save()

# Ugly hack b/c there is no feedback when the rules are actually
# applied
time.sleep(3)
self.assertEqual(self.run_cmd(self.testvm1, nc_cmd), 0,
"TCP connection failed (should be allowed now)")

# block all except target - by name

self.testvm1.firewall.rules = [
qubes.firewall.Rule(None, action='accept',
dsthost=self.test_name,
proto='tcp', dstports=1234),
]
self.testvm1.firewall.save()

# Ugly hack b/c there is no feedback when the rules are actually
# applied
time.sleep(3)
self.assertEqual(self.run_cmd(self.testvm1, nc_cmd), 0,
"TCP (IPv6) connection failed (should be allowed now)")
self.assertEqual(self.run_cmd(self.testvm1,
nc_cmd.replace(self.test_ip6, self.test_ip)),
0,
"TCP (IPv4) connection failed (should be allowed now)")

# allow all except target

self.testvm1.firewall.rules = [
qubes.firewall.Rule(None, action='drop', dsthost=self.test_ip6,
proto='tcp', dstports=1234),
qubes.firewall.Rule(action='accept'),
]
self.testvm1.firewall.save()

# Ugly hack b/c there is no feedback when the rules are actually
# applied
time.sleep(3)
self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0,
"TCP connection should be blocked")
finally:
nc.terminate()
self.loop.run_until_complete(nc.wait())


def test_540_ipv6_inter_vm(self):
self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
name=self.make_vm_name('proxy'),
label='red')
self.loop.run_until_complete(self.proxy.create_on_disk())
self.proxy.provides_network = True
self.proxy.netvm = self.testnetvm
self.testvm1.netvm = self.proxy

self.testvm2 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
name=self.make_vm_name('vm2'),
label='red')
self.loop.run_until_complete(self.testvm2.create_on_disk())
self.testvm2.netvm = self.proxy
self.app.save()

self.loop.run_until_complete(asyncio.wait([
self.testvm1.start(),
self.testvm2.start()]))

self.assertNotEqual(self.run_cmd(self.testvm1,
self.ping_cmd.format(target=self.testvm2.ip6)), 0)

self.testvm2.netvm = self.testnetvm

self.assertNotEqual(self.run_cmd(self.testvm1,
self.ping_cmd.format(target=self.testvm2.ip6)), 0)
self.assertNotEqual(self.run_cmd(self.testvm2,
self.ping_cmd.format(target=self.testvm1.ip6)), 0)

self.testvm1.netvm = self.testnetvm

self.assertNotEqual(self.run_cmd(self.testvm1,
self.ping_cmd.format(target=self.testvm2.ip6)), 0)
self.assertNotEqual(self.run_cmd(self.testvm2,
self.ping_cmd.format(target=self.testvm1.ip6)), 0)



def test_550_ipv6_spoof_ip(self):
"""Test if VM IP spoofing is blocked"""
self.loop.run_until_complete(self.testvm1.start())

self.assertEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0)
# add a simple rule counting packets
self.assertEqual(self.run_cmd(self.testnetvm,
'ip6tables -I INPUT -i vif+ ! -s {} -p icmpv6 -j LOG'.format(
self.testvm1.ip6)), 0)
self.loop.run_until_complete(self.testvm1.run_for_stdio(
'ip -6 addr flush dev eth0 && '
'ip -6 addr add {}/128 dev eth0 && '
'ip -6 route add default via {} dev eth0'.format(
self.testvm1.visible_ip6 + '1',
self.testvm1.visible_gateway6),
user='root'))
self.assertNotEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0,
"Spoofed ping should be blocked")
try:
(output, _) = self.loop.run_until_complete(
self.testnetvm.run_for_stdio('ip6tables -nxvL INPUT',
user='root'))
except subprocess.CalledProcessError:
self.fail('ip6tables -nxvL INPUT failed')

output = output.decode().splitlines()
packets = output[2].lstrip().split()[0]
self.assertEquals(packets, '0', 'Some packet hit the INPUT rule')

def test_710_ipv6_custom_ip_simple(self):
'''Custom AppVM IP'''
self.testvm1.ip6 = '2000:aaaa:bbbb::1'
self.app.save()
self.loop.run_until_complete(self.testvm1.start())
self.assertEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0)
self.assertEqual(self.run_cmd(self.testvm1, self.ping6_name), 0)

def test_711_ipv6_custom_ip_proxy(self):
'''Custom ProxyVM IP'''
self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
name=self.make_vm_name('proxy'),
label='red')
self.loop.run_until_complete(self.proxy.create_on_disk())
self.proxy.provides_network = True
self.proxy.netvm = self.testnetvm
self.testvm1.ip6 = '2000:aaaa:bbbb::1'
self.testvm1.netvm = self.proxy
self.app.save()

self.loop.run_until_complete(self.testvm1.start())

self.assertEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0)
self.assertEqual(self.run_cmd(self.testvm1, self.ping6_name), 0)

def test_712_ipv6_custom_ip_firewall(self):
'''Custom VM IP and firewall'''
self.testvm1.ip6 = '2000:aaaa:bbbb::1'

self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
name=self.make_vm_name('proxy'),
label='red')
self.proxy.provides_network = True
self.loop.run_until_complete(self.proxy.create_on_disk())
self.proxy.netvm = self.testnetvm
self.testvm1.netvm = self.proxy
self.app.save()

if self.run_cmd(self.testnetvm, 'nc -h 2>&1|grep -q nmap.org') == 0:
nc_version = NcVersion.Nmap
else:
nc_version = NcVersion.Trad

# block all but ICMP and DNS

self.testvm1.firewall.rules = [
qubes.firewall.Rule(None, action='accept', proto='icmp'),
qubes.firewall.Rule(None, action='accept', specialtarget='dns'),
]
self.testvm1.firewall.save()
self.loop.run_until_complete(self.testvm1.start())
self.assertTrue(self.proxy.is_running())

nc = self.loop.run_until_complete(self.testnetvm.run(
'nc -l --send-only -e /bin/hostname -k 1234'
if nc_version == NcVersion.Nmap
else 'while nc -l -e /bin/hostname -p 1234; do true; done'))

try:
self.assertEqual(self.run_cmd(self.proxy, self.ping6_ip), 0,
"Ping by IP from ProxyVM failed")
self.assertEqual(self.run_cmd(self.proxy, self.ping6_name), 0,
"Ping by name from ProxyVM failed")
self.assertEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0,
"Ping by IP should be allowed")
self.assertEqual(self.run_cmd(self.testvm1, self.ping6_name), 0,
"Ping by name should be allowed")
if nc_version == NcVersion.Nmap:
nc_cmd = "nc -w 1 --recv-only {} 1234".format(self.test_ip6)
else:
nc_cmd = "nc -w 1 {} 1234".format(self.test_ip6)
self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0,
"TCP connection should be blocked")
finally:
nc.terminate()
self.loop.run_until_complete(nc.wait())

# noinspection PyAttributeOutsideInit
class VmUpdatesMixin(object):
Expand Down Expand Up @@ -960,6 +1329,11 @@ def load_tests(loader, tests, pattern):
'VmNetworking_' + template,
(VmNetworkingMixin, qubes.tests.SystemTestCase),
{'template': template})))
tests.addTests(loader.loadTestsFromTestCase(
type(
'VmIPv6Networking_' + template,
(VmIPv6NetworkingMixin, qubes.tests.SystemTestCase),
{'template': template})))
tests.addTests(loader.loadTestsFromTestCase(
type(
'VmUpdates_' + template,
Expand Down

0 comments on commit 7e143da

Please sign in to comment.