Skip to content

Commit

Permalink
tests: use socat instead of nc
Browse files Browse the repository at this point in the history
socat have only one variant, so one command line syntax to handle. It's
also installed by default in Qubes VMs.
  • Loading branch information
marmarek committed Oct 27, 2018
1 parent 08ddeee commit a972c61
Showing 1 changed file with 39 additions and 86 deletions.
125 changes: 39 additions & 86 deletions qubes/tests/integ/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,6 @@
import qubes.vm.qubesvm
import qubes.vm.appvm

class NcVersion:
Trad = 1
Nmap = 2


# noinspection PyAttributeOutsideInit,PyPep8Naming
class VmNetworkingMixin(object):
Expand Down Expand Up @@ -63,18 +59,6 @@ def run_cmd(self, vm, cmd, user="root"):
return e.returncode
return 0

def check_nc_version(self, vm):
'''
:type self: qubes.tests.SystemTestCase | VMNetworkingMixin
:param vm: VM where check ncat version in
'''
if self.run_cmd(vm, 'nc -h >/dev/null 2>&1') != 0:
self.skipTest('nc not installed')
if self.run_cmd(vm, 'nc -h 2>&1|grep -q nmap.org') == 0:
return NcVersion.Nmap
else:
return NcVersion.Trad

def setUp(self):
'''
:type self: qubes.tests.SystemTestCase | VMNetworkingMixin
Expand Down Expand Up @@ -228,19 +212,15 @@ def test_030_firewallvm_firewall(self):
self.testvm1.netvm = self.proxy
self.app.save()

nc_version = self.check_nc_version(self.testnetvm)

# block all for first

self.testvm1.firewall.rules = [qubes.firewall.Rule(action='drop')]
self.testvm1.firewall.save()
self.loop.run_until_complete(self.testvm1.start())
self.assertTrue(self.proxy.is_running())

nc = self.loop.run_until_complete(self.testnetvm.run(
'nc -l --send-only -e /bin/hostname -k 1234'
if nc_version == NcVersion.Nmap
else 'while nc -l -e /bin/hostname -p 1234; do true; done'))
server = self.loop.run_until_complete(self.testnetvm.run(
'socat TCP-LISTEN:1234,fork EXEC:/bin/hostname'))

try:
self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0,
Expand All @@ -250,11 +230,8 @@ def test_030_firewallvm_firewall(self):
self.assertNotEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
"Ping by IP should be blocked")

if nc_version == NcVersion.Nmap:
nc_cmd = "nc -w 1 --recv-only {} 1234".format(self.test_ip)
else:
nc_cmd = "nc -w 1 {} 1234".format(self.test_ip)
self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0,
client_cmd = "socat TCP:{}:1234 -".format(self.test_ip)
self.assertNotEqual(self.run_cmd(self.testvm1, client_cmd), 0,
"TCP connection should be blocked")

# block all except ICMP
Expand Down Expand Up @@ -283,7 +260,7 @@ def test_030_firewallvm_firewall(self):
time.sleep(3)
self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
"Ping by name failed (should be allowed now)")
self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0,
self.assertNotEqual(self.run_cmd(self.testvm1, client_cmd), 0,
"TCP connection should be blocked")

# block all except target
Expand All @@ -297,7 +274,7 @@ def test_030_firewallvm_firewall(self):
# Ugly hack b/c there is no feedback when the rules are actually
# applied
time.sleep(3)
self.assertEqual(self.run_cmd(self.testvm1, nc_cmd), 0,
self.assertEqual(self.run_cmd(self.testvm1, client_cmd), 0,
"TCP connection failed (should be allowed now)")

# allow all except target
Expand All @@ -312,11 +289,11 @@ def test_030_firewallvm_firewall(self):
# Ugly hack b/c there is no feedback when the rules are actually
# applied
time.sleep(3)
self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0,
self.assertNotEqual(self.run_cmd(self.testvm1, client_cmd), 0,
"TCP connection should be blocked")
finally:
nc.terminate()
self.loop.run_until_complete(nc.wait())
server.terminate()
self.loop.run_until_complete(server.wait())


def test_040_inter_vm(self):
Expand Down Expand Up @@ -479,8 +456,6 @@ def test_202_fake_ip_firewall(self):
self.testvm1.netvm = self.proxy
self.app.save()

nc_version = self.check_nc_version(self.testnetvm)

# block all but ICMP and DNS

self.testvm1.firewall.rules = [
Expand All @@ -491,10 +466,8 @@ def test_202_fake_ip_firewall(self):
self.loop.run_until_complete(self.testvm1.start())
self.assertTrue(self.proxy.is_running())

nc = self.loop.run_until_complete(self.testnetvm.run(
'nc -l --send-only -e /bin/hostname -k 1234'
if nc_version == NcVersion.Nmap
else 'while nc -l -e /bin/hostname -p 1234; do true; done'))
server = self.loop.run_until_complete(self.testnetvm.run(
'socat TCP-LISTEN:1234,fork EXEC:/bin/hostname'))

try:
self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0,
Expand All @@ -505,15 +478,12 @@ def test_202_fake_ip_firewall(self):
"Ping by IP should be allowed")
self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
"Ping by name should be allowed")
if nc_version == NcVersion.Nmap:
nc_cmd = "nc -w 1 --recv-only {} 1234".format(self.test_ip)
else:
nc_cmd = "nc -w 1 {} 1234".format(self.test_ip)
self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0,
client_cmd = "socat TCP:{}:1234 -".format(self.test_ip)
self.assertNotEqual(self.run_cmd(self.testvm1, client_cmd), 0,
"TCP connection should be blocked")
finally:
nc.terminate()
self.loop.run_until_complete(nc.wait())
server.terminate()
self.loop.run_until_complete(server.wait())

def test_203_fake_ip_inter_vm_allow(self):
'''Access VM with "fake IP" from other VM (when firewall allows)
Expand Down Expand Up @@ -682,8 +652,6 @@ def test_212_custom_ip_firewall(self):
self.testvm1.netvm = self.proxy
self.app.save()

nc_version = self.check_nc_version(self.testnetvm)

# block all but ICMP and DNS

self.testvm1.firewall.rules = [
Expand All @@ -694,10 +662,8 @@ def test_212_custom_ip_firewall(self):
self.loop.run_until_complete(self.testvm1.start())
self.assertTrue(self.proxy.is_running())

nc = self.loop.run_until_complete(self.testnetvm.run(
'nc -l --send-only -e /bin/hostname -k 1234'
if nc_version == NcVersion.Nmap
else 'while nc -l -e /bin/hostname -p 1234; do true; done'))
server = self.loop.run_until_complete(self.testnetvm.run(
'socat TCP-LISTEN:1234,fork EXEC:/bin/hostname'))

try:
self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0,
Expand All @@ -708,15 +674,12 @@ def test_212_custom_ip_firewall(self):
"Ping by IP should be allowed")
self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
"Ping by name should be allowed")
if nc_version == NcVersion.Nmap:
nc_cmd = "nc -w 1 --recv-only {} 1234".format(self.test_ip)
else:
nc_cmd = "nc -w 1 {} 1234".format(self.test_ip)
self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0,
client_cmd = "socat TCP:{}:1234 -".format(self.test_ip)
self.assertNotEqual(self.run_cmd(self.testvm1, client_cmd), 0,
"TCP connection should be blocked")
finally:
nc.terminate()
self.loop.run_until_complete(nc.wait())
server.terminate()
self.loop.run_until_complete(server.wait())

# noinspection PyAttributeOutsideInit,PyPep8Naming
class VmIPv6NetworkingMixin(VmNetworkingMixin):
Expand Down Expand Up @@ -852,18 +815,15 @@ def test_530_ipv6_firewallvm_firewall(self):
self.testvm1.netvm = self.proxy
self.app.save()

if self.run_cmd(self.testnetvm, 'ncat -h') != 0:
self.skipTest('nmap ncat not installed')

# block all for first

self.testvm1.firewall.rules = [qubes.firewall.Rule(action='drop')]
self.testvm1.firewall.save()
self.loop.run_until_complete(self.testvm1.start())
self.assertTrue(self.proxy.is_running())

nc = self.loop.run_until_complete(self.testnetvm.run(
'ncat -l --send-only -e /bin/hostname -k 1234'))
server = self.loop.run_until_complete(self.testnetvm.run(
'socat TCP6-LISTEN:1234,fork EXEC:/bin/hostname'))

try:
self.assertEqual(self.run_cmd(self.proxy, self.ping6_ip), 0,
Expand All @@ -873,8 +833,9 @@ def test_530_ipv6_firewallvm_firewall(self):
self.assertNotEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0,
"Ping by IP should be blocked")

nc_cmd = "ncat -w 1 --recv-only {} 1234".format(self.test_ip6)
self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0,
client6_cmd = "socat TCP:[{}]:1234 -".format(self.test_ip6)
client4_cmd = "socat TCP:{}:1234 -".format(self.test_ip)
self.assertNotEqual(self.run_cmd(self.testvm1, client6_cmd), 0,
"TCP connection should be blocked")

# block all except ICMP
Expand Down Expand Up @@ -904,7 +865,7 @@ def test_530_ipv6_firewallvm_firewall(self):
time.sleep(3)
self.assertEqual(self.run_cmd(self.testvm1, self.ping6_name), 0,
"Ping by name failed (should be allowed now)")
self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0,
self.assertNotEqual(self.run_cmd(self.testvm1, client6_cmd), 0,
"TCP connection should be blocked")

# block all except target
Expand All @@ -919,7 +880,7 @@ def test_530_ipv6_firewallvm_firewall(self):
# Ugly hack b/c there is no feedback when the rules are actually
# applied
time.sleep(3)
self.assertEqual(self.run_cmd(self.testvm1, nc_cmd), 0,
self.assertEqual(self.run_cmd(self.testvm1, client6_cmd), 0,
"TCP connection failed (should be allowed now)")

# block all except target - by name
Expand All @@ -934,10 +895,9 @@ def test_530_ipv6_firewallvm_firewall(self):
# Ugly hack b/c there is no feedback when the rules are actually
# applied
time.sleep(3)
self.assertEqual(self.run_cmd(self.testvm1, nc_cmd), 0,
self.assertEqual(self.run_cmd(self.testvm1, client6_cmd), 0,
"TCP (IPv6) connection failed (should be allowed now)")
self.assertEqual(self.run_cmd(self.testvm1,
nc_cmd.replace(self.test_ip6, self.test_ip)),
self.assertEqual(self.run_cmd(self.testvm1, client4_cmd),
0,
"TCP (IPv4) connection failed (should be allowed now)")

Expand All @@ -953,11 +913,11 @@ def test_530_ipv6_firewallvm_firewall(self):
# Ugly hack b/c there is no feedback when the rules are actually
# applied
time.sleep(3)
self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0,
self.assertNotEqual(self.run_cmd(self.testvm1, client6_cmd), 0,
"TCP connection should be blocked")
finally:
nc.terminate()
self.loop.run_until_complete(nc.wait())
server.terminate()
self.loop.run_until_complete(server.wait())


def test_540_ipv6_inter_vm(self):
Expand Down Expand Up @@ -1081,8 +1041,6 @@ def test_712_ipv6_custom_ip_firewall(self):
self.testvm1.netvm = self.proxy
self.app.save()

nc_version = self.check_nc_version(self.testnetvm)

# block all but ICMP and DNS

self.testvm1.firewall.rules = [
Expand All @@ -1093,10 +1051,8 @@ def test_712_ipv6_custom_ip_firewall(self):
self.loop.run_until_complete(self.testvm1.start())
self.assertTrue(self.proxy.is_running())

nc = self.loop.run_until_complete(self.testnetvm.run(
'nc -l --send-only -e /bin/hostname -k 1234'
if nc_version == NcVersion.Nmap
else 'while nc -l -e /bin/hostname -p 1234; do true; done'))
server = self.loop.run_until_complete(self.testnetvm.run(
'socat TCP6-LISTEN:1234,fork EXEC:/bin/hostname'))

try:
self.assertEqual(self.run_cmd(self.proxy, self.ping6_ip), 0,
Expand All @@ -1107,15 +1063,12 @@ def test_712_ipv6_custom_ip_firewall(self):
"Ping by IP should be allowed")
self.assertEqual(self.run_cmd(self.testvm1, self.ping6_name), 0,
"Ping by name should be allowed")
if nc_version == NcVersion.Nmap:
nc_cmd = "nc -w 1 --recv-only {} 1234".format(self.test_ip6)
else:
nc_cmd = "nc -w 1 {} 1234".format(self.test_ip6)
self.assertNotEqual(self.run_cmd(self.testvm1, nc_cmd), 0,
client_cmd = "socat TCP:[{}]:1234 -".format(self.test_ip6)
self.assertNotEqual(self.run_cmd(self.testvm1, client_cmd), 0,
"TCP connection should be blocked")
finally:
nc.terminate()
self.loop.run_until_complete(nc.wait())
server.terminate()
self.loop.run_until_complete(server.wait())

# noinspection PyAttributeOutsideInit,PyPep8Naming
class VmUpdatesMixin(object):
Expand Down

0 comments on commit a972c61

Please sign in to comment.