Skip to content

Commit

Permalink
Implement UUID support in qrexec
Browse files Browse the repository at this point in the history
This allows using UUIDs in qrexec policy, using the syntax uuid:VM_UUID.
This works anywhere a VM name is expected.  Since ':' is not allowed in
VM names, there is no ambiguity.
  • Loading branch information
DemiMarie committed Feb 9, 2024
1 parent c918563 commit f82cc26
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 27 deletions.
56 changes: 38 additions & 18 deletions qrexec/policy/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import itertools
import logging
import pathlib
import re
import string

from typing import (
Expand Down Expand Up @@ -235,11 +236,11 @@ def __new__(cls, token: str, *, filepath: Optional[pathlib.Path]=None,
orig_token = token

# first, adjust some aliases
if token == "dom0":
if token in ("dom0", "uuid:00000000-0000-0000-0000-000000000000"):
# TODO: log a warning in Qubes 4.1
token = "@adminvm"

# if user specified just qube name, use it directly
# if user specified just qube name or UUID, use it directly
if not (token.startswith("@") or token == "*"):
return super().__new__(cls, token)

Expand Down Expand Up @@ -300,13 +301,28 @@ def __init__(self, token: str, *, filepath: Optional[pathlib.Path]=None,
# This replaces is_match() and is_match_single().
def match(
self,
other: Optional[str],
other: str,
*,
system_info: FullSystemInfo,
source: Optional["VMToken"]=None
) -> bool:
"""Check if this token matches opposite token"""
# pylint: disable=unused-argument
# pylint: disable=unused-argument,too-many-return-statements
if self == "@adminvm":
return other == "@adminvm"
info = system_info["domains"]
if self.startswith("uuid:"):
if other.startswith("uuid:"):
return self == other
try:
return self[5:] == info[str(other)]["uuid"]
except KeyError:
return False
if other.startswith("uuid:"):
try:
return other[5:] == info[str(self)]["uuid"]
except KeyError:
return False
return self == other

def is_special_value(self) -> bool:
Expand Down Expand Up @@ -339,9 +355,12 @@ def expand(self, *, system_info: FullSystemInfo) -> Iterable[VMToken]:
This is used as part of :py:meth:`Policy.collect_targets_for_ask()`.
"""
if self in system_info["domains"]:
yield IntendedTarget(self)

info = system_info["domains"]
if self in info:
if self.startswith("uuid:"):
yield IntendedTarget(type(self)(info[self]))
else:
yield IntendedTarget(self)

class Target(_BaseTarget):
# pylint: disable=missing-docstring
Expand All @@ -362,10 +381,12 @@ def __new__(
return super().__new__(cls, value, filepath=filepath, lineno=lineno) # type: ignore


_uuid_regex = re.compile(r"\A[0-9a-f]{8}(?:-[0-9a-f]{4}){3}-[0-9a-f]{12}\Z")

# this method (with overloads in subclasses) was verify_target_value
class IntendedTarget(VMToken):
# pylint: disable=missing-docstring
def verify(self, *, system_info: FullSystemInfo) -> VMToken:
def verify(self, *, system_info: FullSystemInfo) -> Optional[VMToken]:
"""Check if given value names valid target
This function check if given value is not only syntactically correct,
Expand Down Expand Up @@ -410,7 +431,7 @@ class WildcardVM(Source, Target):

def match(
self,
other: Optional[str],
other: str,
*,
system_info: FullSystemInfo,
source: Optional[VMToken]=None
Expand Down Expand Up @@ -443,7 +464,7 @@ class AnyVM(Source, Target):

def match(
self,
other: Optional[str],
other: str,
*,
system_info: FullSystemInfo,
source: Optional[VMToken]=None
Expand Down Expand Up @@ -476,7 +497,7 @@ class TypeVM(Source, Target):

def match(
self,
other: Optional[str],
other: str,
*,
system_info: FullSystemInfo,
source: Optional[VMToken]=None
Expand All @@ -499,7 +520,7 @@ class TagVM(Source, Target):

def match(
self,
other: Optional[str],
other: str,
*,
system_info: FullSystemInfo,
source: Optional[VMToken]=None
Expand All @@ -522,7 +543,7 @@ class DispVM(Target, Redirect, IntendedTarget):

def match(
self,
other: Optional[str],
other: str,
*,
system_info: FullSystemInfo,
source: Optional[VMToken]=None
Expand Down Expand Up @@ -556,7 +577,7 @@ class DispVMTemplate(Source, Target, Redirect, IntendedTarget):

def match(
self,
other: Optional[str],
other: str,
*,
system_info: FullSystemInfo,
source: Optional[VMToken]=None
Expand Down Expand Up @@ -590,14 +611,13 @@ class DispVMTag(Source, Target):

def match(
self,
other: Optional[str],
other: str,
*,
system_info: FullSystemInfo,
source: Optional[VMToken]=None
) -> bool:
if isinstance(other, DispVM):
assert source is not None
other = other.get_dispvm_template(source, system_info=system_info)
if isinstance(other, DispVM) and source is not None:
return self == other.get_dispvm_template(source, system_info=system_info)

if not isinstance(other, DispVMTemplate):
# 1) original other may have been neither @dispvm:<name> nor @dispvm
Expand Down
45 changes: 37 additions & 8 deletions qrexec/tests/policy_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,62 +38,71 @@
"default_dispvm": "default-dvm",
"template_for_dispvms": False,
"power_state": "Running",
"uuid": "00000000-0000-0000-0000-000000000000",
},
"test-vm1": {
"tags": ["tag1", "tag2"],
"type": "AppVM",
"default_dispvm": "default-dvm",
"template_for_dispvms": False,
"power_state": "Running",
"uuid": "c9024a97-9b15-46cc-8341-38d75d5d421b",
},
"test-vm2": {
"tags": ["tag2"],
"type": "AppVM",
"default_dispvm": "default-dvm",
"template_for_dispvms": False,
"power_state": "Running",
"uuid": "b3eb69d0-f9d9-4c3c-ad5c-454500303ea4",
},
"test-vm3": {
"tags": ["tag3"],
"type": "AppVM",
"default_dispvm": "default-dvm",
"template_for_dispvms": True,
"power_state": "Halted",
"uuid": "fa6d56e8-a89d-4106-aa62-22e172a43c8b",
},
"default-dvm": {
"tags": [],
"type": "AppVM",
"default_dispvm": "default-dvm",
"template_for_dispvms": True,
"power_state": "Halted",
"uuid": "f3e538bd-4427-4697-bed7-45ef3270df21",
},
"test-invalid-dvm": {
"tags": ["tag1", "tag2"],
"type": "AppVM",
"default_dispvm": "test-vm1",
"template_for_dispvms": False,
"power_state": "Halted",
"uuid": "c4fa3586-a6b6-4dc4-bdda-c9e7375a12b5",
},
"test-no-dvm": {
"tags": ["tag1", "tag2"],
"type": "AppVM",
"default_dispvm": None,
"template_for_dispvms": False,
"power_state": "Halted",
"uuid": "53a450b9-a454-4416-8adb-46812257ad29",
},
"test-template": {
"tags": ["tag1", "tag2"],
"type": "TemplateVM",
"default_dispvm": "default-dvm",
"template_for_dispvms": False,
"power_state": "Halted",
"uuid": "a9fe2b04-9fd5-4e95-be20-162433d64de0",
},
"test-standalone": {
"tags": ["tag1", "tag2"],
"type": "StandaloneVM",
"default_dispvm": "default-dvm",
"template_for_dispvms": False,
"power_state": "Halted",
"uuid": "6d7a02b5-532b-467f-b9fb-6596bae03c33",
},
},
}
Expand All @@ -110,6 +119,11 @@ async def __call__(self, *args, **kwargs):


class TC_00_VMToken(unittest.TestCase):
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
for i, j in SYSTEM_INFO["domains"].items():
j["name"] = i

def test_010_Source(self):
# with self.assertRaises(exc.PolicySyntaxError):
# parser.Source(None)
Expand All @@ -120,6 +134,11 @@ def test_010_Source(self):
parser.Source("*")
with self.assertRaises(exc.PolicySyntaxError):
parser.Source("@default")
parser.Source("uuid:d8a249f1-b02b-4944-a9e5-437def2fbe2c")
with self.assertRaises(exc.PolicySyntaxError):
parser.Source("@uuid:")
with self.assertRaises(exc.PolicySyntaxError):
parser.Source("@uuid:d8a249f1-b02b-4944-a9e5-437def2fbe2c")
parser.Source("@type:AppVM")
parser.Source("@tag:tag1")
with self.assertRaises(exc.PolicySyntaxError):
Expand Down Expand Up @@ -150,6 +169,7 @@ def test_020_Target(self):
parser.Target("@dispvm")
parser.Target("@dispvm:default-dvm")
parser.Target("@dispvm:@tag:tag3")
parser.Target("uuid:d8a249f1-b02b-4944-a9e5-437def2fbe2c")

with self.assertRaises(exc.PolicySyntaxError):
parser.Target("@invalid")
Expand All @@ -163,19 +183,22 @@ def test_020_Target(self):
parser.Target("@type:")

def test_021_Target_expand(self):
self.assertCountEqual(
parser.Target("test-vm1").expand(system_info=SYSTEM_INFO),
self.assertEqual(
list(parser.Target("test-vm1").expand(system_info=SYSTEM_INFO)),
["test-vm1"],
)
self.assertCountEqual(
parser.Target("@adminvm").expand(system_info=SYSTEM_INFO),
self.assertEqual(
list(parser.Target("@adminvm").expand(system_info=SYSTEM_INFO)),
["@adminvm"],
)
self.assertCountEqual(
parser.Target("dom0").expand(system_info=SYSTEM_INFO), ["@adminvm"]
self.assertEqual(
list(parser.Target("dom0").expand(system_info=SYSTEM_INFO)), ["@adminvm"]
)
self.assertCountEqual(
parser.Target("@anyvm").expand(system_info=SYSTEM_INFO),
self.assertEqual(
list(parser.Target("uuid:00000000-0000-0000-0000-000000000000").expand(system_info=SYSTEM_INFO)), ["@adminvm"]
)
self.assertEqual(
list(parser.Target("@anyvm").expand(system_info=SYSTEM_INFO)),
[
"test-vm1",
"test-vm2",
Expand Down Expand Up @@ -286,6 +309,7 @@ def test_030_Redirect(self):
parser.Redirect("test-vm1")
parser.Redirect("@adminvm")
parser.Redirect("dom0")
parser.Redirect("uuid:00000000-0000-0000-0000-000000000000")
with self.assertRaises(exc.PolicySyntaxError):
parser.Redirect("@anyvm")
with self.assertRaises(exc.PolicySyntaxError):
Expand Down Expand Up @@ -313,6 +337,7 @@ def test_030_Redirect(self):
parser.Redirect("@type:")

def test_040_IntendedTarget(self):
parser.IntendedTarget("uuid:00000000-0000-0000-0000-000000000000")
parser.IntendedTarget("test-vm1")
parser.IntendedTarget("@adminvm")
parser.IntendedTarget("dom0")
Expand Down Expand Up @@ -344,6 +369,10 @@ def test_040_IntendedTarget(self):
def test_100_match_single(self):
# pytest: disable=no-self-use
cases = [
("uuid:00000000-0000-0000-0000-000000000000", "@adminvm", True),
("uuid:00000000-0000-0000-0000-000000000000", "dom0", True),
("uuid:00000000-0000-0000-0000-000000000000", "@dispvm:default-dvm", False),
("uuid:00000000-0000-0000-0000-000000000000", "test-vm1", False),
("@anyvm", "test-vm1", True),
("@anyvm", "@default", True),
("@default", "@default", True),
Expand Down
11 changes: 10 additions & 1 deletion qrexec/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ class SystemInfoEntry(TypedDict):
power_state: str
icon: str
guivm: Optional[str]
uuid: Optional[str]

SystemInfo: 'TypeAlias' = Dict[str, SystemInfoEntry]

Expand All @@ -136,7 +137,15 @@ def get_system_info() -> FullSystemInfo:
"""

system_info = qubesd_call("dom0", "internal.GetSystemInfo")
return cast(SystemInfo, json.loads(system_info.decode("utf-8")))
system_info_decoded = cast(FullSystemInfo, json.loads(system_info.decode("utf-8")))
inner = system_info_decoded["domains"]
for i, j in list(inner.items()):
j["name"] = i
try:
inner["uuid:" + j["uuid"]] = j
except KeyError:
pass
return system_info_decoded


def prepare_subprocess_kwds(input: object) -> Dict[str, object]:
Expand Down

0 comments on commit f82cc26

Please sign in to comment.