Skip to content

Commit

Permalink
Add test for invalid service name for old protocol version
Browse files Browse the repository at this point in the history
This would have previously been allowed.  The test currently fails.

Also add various tests for the modern protocol version, which all pass,
and a test for QSB-089.
  • Loading branch information
DemiMarie committed Apr 9, 2024
1 parent 2a302d8 commit a8052a3
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 0 deletions.
89 changes: 89 additions & 0 deletions qrexec/tests/socket/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,95 @@ def test_bad_request_id_bad_byte_after_nul_terminator(self):
self.assertEqual(data[:4], b"a\0b\0")
self.assertEqual(data[4:], b"\0" * 28)

@unittest.expectedFailure
def test_bad_old_style_request(self):
agent = self.start_daemon_with_agent()
agent.send_message(qrexec.MSG_HELLO, struct.pack("<L", 2))
message_type, data = agent.recv_message()
self.assertEqual(message_type, qrexec.MSG_HELLO)
(ver,) = struct.unpack("<L", data)
self.assertEqual(ver, 3)

target_domain_name = "target_domain"
ident = b"ab"

self.set_policy_params(1, 0)

for service_name in (b"\0", b"", b"\0a", b"\0+a", b"+", b"+a"):
agent.send_message(
qrexec.MSG_TRIGGER_SERVICE,
struct.pack("<64s32s32s", service_name,
target_domain_name.encode(), ident)
)
message_type, data = agent.recv_message()
self.assertEqual(message_type, qrexec.MSG_SERVICE_REFUSED)
self.assertEqual(len(data), 32)
self.assertEqual(data[:len(ident)], ident)
self.assertEqual(data[len(ident):], b"\0" * (32 - len(ident)))
self.assertFalse(os.path.exists(
os.path.join(self.tempdir, "qrexec-policy-params")
)),

def test_bad_new_style_request(self):
"""
Test that qrexec-daemon rejects various invalid requests.
"""
agent = self.start_daemon_with_agent()
agent.handshake()

target_domain_name = "target_domain"
ident = "ab"

self.set_policy_params(1, 0)

def recv_refused(agent):
message_type, data = agent.recv_message()
self.assertEqual(message_type, qrexec.MSG_SERVICE_REFUSED)
self.assertEqual(len(data), 32)
self.assertEqual(data[:2], b"ab")
self.assertEqual(data[2:], b"\0" * 30)
self.assertFalse(os.path.exists(
os.path.join(self.tempdir, "qrexec-policy-params")
)),

# missing NUL terminator
agent.send_message(
qrexec.MSG_TRIGGER_SERVICE3,
struct.pack("<64s32s", target_domain_name.encode(), ident.encode())
+ b"a",
)
recv_refused(agent)

# leading, interior or trailing NUL before NUL terminator,
# empty service name with argument, or empty service name with no
# argument
for service_name in ("\0", "", "a\0", "\0a", "\0+a", "+", "+a", "a\0a"):
self.send_trigger_service(
agent, target_domain_name, service_name, ident
)
recv_refused(agent)

def test_qsb_089(self):
"""
Test that qrexec-daemon doesn't corrupt memory on empty request
"""
agent = self.start_daemon_with_agent()
agent.handshake()

target_domain_name = "target_domain"
ident = "ab"

self.set_policy_params(1, 0)

agent.send_message(
qrexec.MSG_TRIGGER_SERVICE3,
struct.pack("<64s32s", target_domain_name.encode(), ident.encode()))
# qrexec-daemon will terminate
self.assertEqual(agent.recvall(1), b"")
self.assertFalse(os.path.exists(
os.path.join(self.tempdir, "qrexec-policy-params")
))

def send_trigger_service(
self, agent, target_domain_name: str, service_name: str, ident: str
):
Expand Down
1 change: 1 addition & 0 deletions qrexec/tests/socket/qrexec.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
MSG_JUST_EXEC = 0x201
MSG_SERVICE_CONNECT = 0x202
MSG_SERVICE_REFUSED = 0x203
MSG_TRIGGER_SERVICE = 0x210
MSG_CONNECTION_TERMINATED = 0x211
MSG_TRIGGER_SERVICE3 = 0x212
MSG_HELLO = 0x300
Expand Down

0 comments on commit a8052a3

Please sign in to comment.