From 068594e7ed83902be5fa6afa563f47b57dd5e196 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Ricks?= Date: Fri, 22 Mar 2024 08:06:18 +0100 Subject: [PATCH] Add: Add system report, trash can and user setting requests Implement system report, trash can and user settings commands and requests. With this change all system commands for the new protocol implementation are implemented. --- gvm/protocols/gmp/_gmp224.py | 100 +++++++++++++++++- gvm/protocols/gmp/requests/__init__.py | 6 ++ gvm/protocols/gmp/requests/_system_reports.py | 66 ++++++++++++ gvm/protocols/gmp/requests/_trashcan.py | 37 +++++++ gvm/protocols/gmp/requests/_user_settings.py | 83 +++++++++++++++ .../gmp/requests/test_system_report.py | 59 +++++++++++ tests/protocols/gmp/requests/test_trashcan.py | 24 +++++ .../gmp/requests/test_user_settings.py | 74 +++++++++++++ 8 files changed, 447 insertions(+), 2 deletions(-) create mode 100644 gvm/protocols/gmp/requests/_system_reports.py create mode 100644 gvm/protocols/gmp/requests/_trashcan.py create mode 100644 gvm/protocols/gmp/requests/_user_settings.py create mode 100644 tests/protocols/gmp/requests/test_system_report.py create mode 100644 tests/protocols/gmp/requests/test_trashcan.py create mode 100644 tests/protocols/gmp/requests/test_user_settings.py diff --git a/gvm/protocols/gmp/_gmp224.py b/gvm/protocols/gmp/_gmp224.py index cd78edb8b..704cfda58 100644 --- a/gvm/protocols/gmp/_gmp224.py +++ b/gvm/protocols/gmp/_gmp224.py @@ -17,6 +17,9 @@ PortList, PortRangeType, SortOrder, + SystemReports, + TrashCan, + UserSettings, Version, ) @@ -280,7 +283,7 @@ def get_feeds(self) -> T: def get_feed(self, feed_type: Union[FeedType, str]) -> T: """Request a single feed - Arguments: + Args: feed_type: Type of single feed to get: NVT, CERT or SCAP """ return self._send_and_transform_command(Feed.get_feed(feed_type)) @@ -293,7 +296,7 @@ def help( ) -> T: """Get the help text - Arguments: + Args: help_format: Format of of the help: "html", "rnc", "text" or "xml brief: If True help is brief @@ -301,3 +304,96 @@ def help( return self._send_and_transform_command( Help.help(help_format=help_format, brief=brief) ) + + def system_reports( + self, + *, + name: Optional[str] = None, + duration: Optional[int] = None, + start_time: Optional[str] = None, + end_time: Optional[str] = None, + brief: Optional[bool] = None, + slave_id: Optional[str] = None, + ) -> T: + """Request a list of system reports + + Args: + name: A string describing the required system report + duration: The number of seconds into the past that the system report + should include + start_time: The start of the time interval the system report should + include in ISO time format + end_time: The end of the time interval the system report should + include in ISO time format + brief: Whether to include the actual system reports + slave_id: UUID of GMP scanner from which to get the system reports + """ + return self._send_and_transform_command( + SystemReports.get_system_reports( + name=name, + duration=duration, + start_time=start_time, + end_time=end_time, + brief=brief, + slave_id=slave_id, + ) + ) + + def empty_trash(self) -> T: + """Empty the trashcan + + Remove all entities from the trashcan. **Attention:** this command can + not be reverted + """ + return self._send_and_transform_command(TrashCan.empty_trashcan()) + + def restore_from_trash(self, entity_id: str) -> T: + """Restore an entity from the trashcan + + Args: + entity_id: ID of the entity to be restored from the trashcan + """ + return self._send_and_transform_command( + TrashCan.restore_from_trashcan(entity_id) + ) + + def get_user_settings(self, *, filter_string: Optional[str] = None) -> T: + """Request a list of user settings + + Args: + filter_string: Filter term to use for the query + """ + return self._send_and_transform_command( + UserSettings.get_user_settings(filter_string=filter_string) + ) + + def get_user_setting(self, setting_id: str) -> T: + """Request a single user setting + + Args: + setting_id: UUID of an existing setting + """ + return self._send_and_transform_command( + UserSettings.get_user_setting(setting_id) + ) + + def modify_user_setting( + self, + *, + setting_id: Optional[str] = None, + name: Optional[str] = None, + value: Optional[str] = None, + ) -> T: + """Modifies an existing user setting. + + Args: + setting_id: UUID of the setting to be changed. + name: The name of the setting. Either setting_id or name must be + passed. + value: The value of the setting. + """ + return self._send_and_transform_command( + UserSettings.modify_user_setting( + setting_id=setting_id, name=name, value=value + ) + ) diff --git a/gvm/protocols/gmp/requests/__init__.py b/gvm/protocols/gmp/requests/__init__.py index aa58407ce..fdadd0a73 100644 --- a/gvm/protocols/gmp/requests/__init__.py +++ b/gvm/protocols/gmp/requests/__init__.py @@ -9,6 +9,9 @@ from ._help import Help, HelpFormat from ._port_list import PortList, PortRangeType from ._resource_names import ResourceNames, ResourceType +from ._system_reports import SystemReports +from ._trashcan import TrashCan +from ._user_settings import UserSettings from ._version import Version __all__ = ( @@ -25,5 +28,8 @@ "ResourceNames", "ResourceType", "SortOrder", + "SystemReports", + "TrashCan", + "UserSettings", "Version", ) diff --git a/gvm/protocols/gmp/requests/_system_reports.py b/gvm/protocols/gmp/requests/_system_reports.py new file mode 100644 index 000000000..ca2cd96ad --- /dev/null +++ b/gvm/protocols/gmp/requests/_system_reports.py @@ -0,0 +1,66 @@ +# SPDX-FileCopyrightText: 2018-2024 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later +# + +from numbers import Integral +from typing import Optional + +from gvm.errors import InvalidArgument +from gvm.protocols.core import Request +from gvm.utils import to_bool +from gvm.xml import XmlCommand + + +class SystemReports: + @classmethod + def get_system_reports( + cls, + *, + name: Optional[str] = None, + duration: Optional[int] = None, + start_time: Optional[str] = None, + end_time: Optional[str] = None, + brief: Optional[bool] = None, + slave_id: Optional[str] = None, + ) -> Request: + """Request a list of system reports + + Arguments: + name: A string describing the required system report + duration: The number of seconds into the past that the system report + should include + start_time: The start of the time interval the system report should + include in ISO time format + end_time: The end of the time interval the system report should + include in ISO time format + brief: Whether to include the actual system reports + slave_id: UUID of GMP scanner from which to get the system reports + """ + cmd = XmlCommand("get_system_reports") + + if name: + cmd.set_attribute("name", name) + + if duration is not None: + if not isinstance(duration, Integral): + raise InvalidArgument( + "duration needs to be an integer number", + function=cls.get_system_reports.__name__, + ) + + cmd.set_attribute("duration", str(duration)) + + if start_time: + cmd.set_attribute("start_time", str(start_time)) + + if end_time: + cmd.set_attribute("end_time", str(end_time)) + + if brief is not None: + cmd.set_attribute("brief", to_bool(brief)) + + if slave_id: + cmd.set_attribute("slave_id", slave_id) + + return cmd diff --git a/gvm/protocols/gmp/requests/_trashcan.py b/gvm/protocols/gmp/requests/_trashcan.py new file mode 100644 index 000000000..b469f99be --- /dev/null +++ b/gvm/protocols/gmp/requests/_trashcan.py @@ -0,0 +1,37 @@ +# SPDX-FileCopyrightText: 2024 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later + +from gvm.errors import RequiredArgument +from gvm.protocols.core import Request +from gvm.xml import XmlCommand + + +class TrashCan: + @staticmethod + def empty_trashcan() -> Request: + """Empty the trashcan + + Remove all entities from the trashcan. **Attention:** this command can + not be reverted + """ + return XmlCommand("empty_trashcan") + + @classmethod + def restore_from_trashcan(cls, entity_id: str) -> Request: + """Restore an entity from the trashcan + + Args: + entity_id: ID of the entity to be restored from the trashcan + """ + + if not entity_id: + raise RequiredArgument( + function=cls.restore_from_trashcan.__name__, + argument="entity_id", + ) + + cmd = XmlCommand("restore") + cmd.set_attribute("id", entity_id) + + return cmd diff --git a/gvm/protocols/gmp/requests/_user_settings.py b/gvm/protocols/gmp/requests/_user_settings.py new file mode 100644 index 000000000..597e7fea5 --- /dev/null +++ b/gvm/protocols/gmp/requests/_user_settings.py @@ -0,0 +1,83 @@ +# SPDX-FileCopyrightText: 2021-2024 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later +# + +from typing import Optional + +from gvm.errors import RequiredArgument +from gvm.protocols.core import Request +from gvm.utils import to_base64 +from gvm.xml import XmlCommand + + +class UserSettings: + @staticmethod + def get_user_settings(*, filter_string: Optional[str] = None) -> Request: + """Request a list of user settings + + Args: + filter_string: Filter term to use for the query + """ + cmd = XmlCommand("get_settings") + + if filter_string: + cmd.set_attribute("filter", filter_string) + + return cmd + + @classmethod + def get_user_setting(cls, setting_id: str) -> Request: + """Request a single user setting + + Args: + setting_id: UUID of an existing setting + """ + cmd = XmlCommand("get_settings") + + if not setting_id: + raise RequiredArgument( + function=cls.get_user_setting.__name__, argument="setting_id" + ) + + cmd.set_attribute("setting_id", setting_id) + return cmd + + @classmethod + def modify_user_setting( + cls, + *, + setting_id: Optional[str] = None, + name: Optional[str] = None, + value: Optional[str] = None, + ) -> Request: + """Modifies an existing user setting. + + Args: + setting_id: UUID of the setting to be changed. + name: The name of the setting. Either setting_id or name must be + passed. + value: The value of the setting. + """ + if not setting_id and not name: + raise RequiredArgument( + function=cls.modify_user_setting.__name__, + argument="setting_id or name argument", + ) + + if value is None: + raise RequiredArgument( + function=cls.modify_user_setting.__name__, + argument="value argument", + ) + + cmd = XmlCommand("modify_setting") + + if setting_id: + cmd.set_attribute("setting_id", setting_id) + else: + cmd.add_element("name", name) + + cmd.add_element("value", to_base64(value)) + + return cmd diff --git a/tests/protocols/gmp/requests/test_system_report.py b/tests/protocols/gmp/requests/test_system_report.py new file mode 100644 index 000000000..d0906b37f --- /dev/null +++ b/tests/protocols/gmp/requests/test_system_report.py @@ -0,0 +1,59 @@ +# SPDX-FileCopyrightText: 2024 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later + +import unittest + +from gvm.errors import InvalidArgument +from gvm.protocols.gmp.requests import SystemReports + + +class SystemReportsTestCase(unittest.TestCase): + def test_get_system_reports(self): + request = SystemReports.get_system_reports() + + self.assertEqual(bytes(request), b"") + + def test_system_reports_with_name(self): + request = SystemReports.get_system_reports(name="foo") + + self.assertEqual(bytes(request), b'') + + def test_system_reports_with_slave_id(self): + request = SystemReports.get_system_reports(slave_id="s1") + + self.assertEqual(bytes(request), b'') + + def test_system_reports_with_brief(self): + request = SystemReports.get_system_reports(brief=True) + + self.assertEqual(bytes(request), b'') + + request = SystemReports.get_system_reports(brief=False) + + self.assertEqual(bytes(request), b'') + + def test_system_reports_with_duration(self): + request = SystemReports.get_system_reports(duration=3600) + + self.assertEqual( + bytes(request), b'' + ) + + def test_system_reports_with_invalid_duration(self): + with self.assertRaises(InvalidArgument): + SystemReports.get_system_reports(duration="") + + def test_system_reports_with_start_time(self): + request = SystemReports.get_system_reports(start_time="01-01-2019") + + self.assertEqual( + bytes(request), b'' + ) + + def test_system_reports_with_end_time(self): + request = SystemReports.get_system_reports(end_time="01-01-2019") + + self.assertEqual( + bytes(request), b'' + ) diff --git a/tests/protocols/gmp/requests/test_trashcan.py b/tests/protocols/gmp/requests/test_trashcan.py new file mode 100644 index 000000000..ac1e3fbdf --- /dev/null +++ b/tests/protocols/gmp/requests/test_trashcan.py @@ -0,0 +1,24 @@ +# SPDX-FileCopyrightText: 2024 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later + +import unittest + +from gvm.errors import RequiredArgument +from gvm.protocols.gmp.requests import TrashCan + + +class TrashCanTestCase(unittest.TestCase): + def test_empty_trashcan(self): + request = TrashCan.empty_trashcan() + + self.assertEqual(bytes(request), b"") + + def test_restore_from_trashcan(self): + request = TrashCan.restore_from_trashcan("1") + + self.assertEqual(bytes(request), b'') + + def test_restore_from_trashcan_with_empty_id(self): + with self.assertRaises(RequiredArgument): + TrashCan.restore_from_trashcan("") diff --git a/tests/protocols/gmp/requests/test_user_settings.py b/tests/protocols/gmp/requests/test_user_settings.py new file mode 100644 index 000000000..849cfb4c9 --- /dev/null +++ b/tests/protocols/gmp/requests/test_user_settings.py @@ -0,0 +1,74 @@ +# SPDX-FileCopyrightText: 2024 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later + +import unittest + +from gvm.errors import RequiredArgument +from gvm.protocols.gmp.requests import UserSettings + + +class UserSettingsTestCase(unittest.TestCase): + def test_get_user_setting(self): + request = UserSettings.get_user_setting("id") + + self.assertEqual(bytes(request), b'') + + def test_get_user_setting_missing_setting_id(self): + with self.assertRaises(RequiredArgument): + UserSettings.get_user_setting(setting_id=None) + + with self.assertRaises(RequiredArgument): + UserSettings.get_user_setting("") + + def test_get_user_settings(self): + request = UserSettings.get_user_settings() + + self.assertEqual(bytes(request), b"") + + def test_get_user_settings_with_filter_string(self): + request = UserSettings.get_user_settings(filter_string="foo=bar") + + self.assertEqual(bytes(request), b'') + + def test_modify_user_setting(self): + request = UserSettings.modify_user_setting( + setting_id="id", value="value" + ) + + self.assertEqual( + bytes(request), + b'dmFsdWU=', + ) + + request = UserSettings.modify_user_setting(name="name", value="value") + + self.assertEqual( + bytes(request), + b"namedmFsdWU=", + ) + + request = UserSettings.modify_user_setting(name="name", value="") + + self.assertEqual( + bytes(request), + b"name", + ) + + def test_modify_user_setting_missing_setting_id(self): + with self.assertRaises(RequiredArgument): + UserSettings.modify_user_setting(setting_id=None, value="value") + + with self.assertRaises(RequiredArgument): + UserSettings.modify_user_setting(setting_id="", value="value") + + def test_modify_setting_missing_name(self): + with self.assertRaises(RequiredArgument): + UserSettings.modify_user_setting(name=None, value="value") + + with self.assertRaises(RequiredArgument): + UserSettings.modify_user_setting(name="", value="value") + + def test_modify_user_setting_missing_value(self): + with self.assertRaises(RequiredArgument): + UserSettings.modify_user_setting(setting_id="id", value=None)