From d9f79bdc2623939aa8f49f1fd408d4aa6b39cf9b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Ricks?= Date: Wed, 6 Mar 2024 15:51:00 +0100 Subject: [PATCH] Add: Add first draft of a core GMP protocol implementation Implement the GMP as an IO independent software stack. With this core protocol it will be possible to implement all kind of stacks for GMP easily. --- gvm/protocols/gmp/core/__init__.py | 14 + gvm/protocols/gmp/core/_connection.py | 170 ++++++++++++ gvm/protocols/gmp/core/_request.py | 11 + gvm/protocols/gmp/core/_response.py | 70 +++++ gvm/protocols/gmp/core/requests/__init__.py | 17 ++ gvm/protocols/gmp/core/requests/_auth.py | 78 ++++++ gvm/protocols/gmp/core/requests/_port_list.py | 247 ++++++++++++++++++ .../gmp/core/requests/_resource_names.py | 107 ++++++++ gvm/protocols/gmp/core/requests/_version.py | 16 ++ 9 files changed, 730 insertions(+) create mode 100644 gvm/protocols/gmp/core/__init__.py create mode 100644 gvm/protocols/gmp/core/_connection.py create mode 100644 gvm/protocols/gmp/core/_request.py create mode 100644 gvm/protocols/gmp/core/_response.py create mode 100644 gvm/protocols/gmp/core/requests/__init__.py create mode 100644 gvm/protocols/gmp/core/requests/_auth.py create mode 100644 gvm/protocols/gmp/core/requests/_port_list.py create mode 100644 gvm/protocols/gmp/core/requests/_resource_names.py create mode 100644 gvm/protocols/gmp/core/requests/_version.py diff --git a/gvm/protocols/gmp/core/__init__.py b/gvm/protocols/gmp/core/__init__.py new file mode 100644 index 000000000..7dea50c82 --- /dev/null +++ b/gvm/protocols/gmp/core/__init__.py @@ -0,0 +1,14 @@ +# SPDX-FileCopyrightText: 2024 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later + +from ._connection import Connection +from ._request import Request +from ._response import Response, StatusError + +__all__ = ( + "Connection", + "Request", + "Response", + "StatusError", +) diff --git a/gvm/protocols/gmp/core/_connection.py b/gvm/protocols/gmp/core/_connection.py new file mode 100644 index 000000000..8da05b81c --- /dev/null +++ b/gvm/protocols/gmp/core/_connection.py @@ -0,0 +1,170 @@ +# SPDX-FileCopyrightText: 2024 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later + +from typing import AnyStr, Optional, Protocol + +from lxml import etree + +from gvm.errors import GvmError + +from ._request import Request +from ._response import Response + + +class XmlReader: + """ + Read a XML command until its closing element + """ + + def start_xml(self) -> None: + self._first_element: Optional[etree._Element] = None + # act on start and end element events and + # allow huge text data (for report content) + self._parser = etree.XMLPullParser( + events=("start", "end"), huge_tree=True + ) + + def is_end_xml(self) -> bool: + for action, obj in self._parser.read_events(): + if not self._first_element and action in "start": + self._first_element = obj.tag # type: ignore + + if ( + self._first_element + and action in "end" + and str(self._first_element) == str(obj.tag) # type: ignore + ): + return True + return False + + def feed_xml(self, data: AnyStr) -> None: + try: + self._parser.feed(data) + except etree.ParseError as e: + raise GvmError( + f"Cannot parse XML response. Response data read {data!r}", + e, + ) from None + + +class InvalidStateError(GvmError): + def __init__(self, message: str = "Invalid State", *args): + super().__init__(message, *args) + + +class State(Protocol): + def __set_context__(self, context: "Context") -> None: ... + def send(self, request: Request) -> bytes: ... + def receive_data(self, data: bytes) -> Optional[Response]: ... + def close(self) -> None: ... + + +class Context(Protocol): + def __set_state__(self, state: State) -> None: ... + + +class AbstractState: + _context: Context + + def __set_context__(self, context: Context) -> None: + self._context = context + + def set_next_state(self, next_state: State) -> None: + self._context.__set_state__(next_state) + + +class InitialState(AbstractState): + def send(self, request: Request) -> bytes: + self.set_next_state(AwaitingResponseState(request)) + return bytes(request) + + def receive_data(self, data: bytes) -> Optional[Response]: + raise InvalidStateError() + + def close(self) -> None: + # nothing to do + return + + +class AwaitingResponseState(AbstractState): + def __init__(self, request: Request) -> None: + self._request = request + + def send(self, request: Request) -> bytes: + raise InvalidStateError() + + def close(self) -> None: + self.set_next_state(InitialState()) + + def receive_data(self, data: bytes) -> Optional[Response]: + next_state = ReceivingDataState(self._request) + self.set_next_state(next_state) + return next_state.receive_data(data) + + +class ErrorState(AbstractState): + message = ( + "The connection is in an error state. Please close the connection." + ) + + def send(self, request: Request) -> bytes: + raise InvalidStateError(self.message) + + def close(self) -> None: + self.set_next_state(InitialState()) + + def receive_data(self, data: bytes) -> Optional[Response]: + raise InvalidStateError(self.message) + + +class ReceivingDataState(AbstractState): + def __init__(self, request: Request) -> None: + self._request = request + self._data = bytearray() + self._reader = XmlReader() + self._reader.start_xml() + + def send(self, request: Request) -> bytes: + raise InvalidStateError() + + def close(self) -> None: + self.set_next_state(InitialState()) + + def receive_data(self, data: bytes) -> Optional[Response]: + self._data += data + try: + self._reader.feed_xml(data) + except GvmError as e: + self.set_next_state(ErrorState()) + raise e + + if not self._reader.is_end_xml(): + return None + + self.set_next_state(InitialState()) + return Response(data=bytes(self._data), request=self._request) + + +class Connection: + """ + This is a [SansIO]() connection and not a socket connection + + It is responsible for + """ + + def __init__(self) -> None: + self.__set_state__(InitialState()) + + def send(self, request: Request) -> bytes: + return self._state.send(request) + + def receive_data(self, data: bytes) -> Optional[Response]: + return self._state.receive_data(data) + + def close(self) -> None: + return self._state.close() + + def __set_state__(self, state: State) -> None: + self._state = state + self._state.__set_context__(self) diff --git a/gvm/protocols/gmp/core/_request.py b/gvm/protocols/gmp/core/_request.py new file mode 100644 index 000000000..d8acfcab2 --- /dev/null +++ b/gvm/protocols/gmp/core/_request.py @@ -0,0 +1,11 @@ +# SPDX-FileCopyrightText: 2024 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later + +from typing import Protocol, runtime_checkable + + +@runtime_checkable +class Request(Protocol): + def __bytes__(self) -> bytes: ... + def __str__(self) -> str: ... diff --git a/gvm/protocols/gmp/core/_response.py b/gvm/protocols/gmp/core/_response.py new file mode 100644 index 000000000..9d226c38a --- /dev/null +++ b/gvm/protocols/gmp/core/_response.py @@ -0,0 +1,70 @@ +# SPDX-FileCopyrightText: 2024 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later + +from functools import cached_property +from typing import Optional + +from typing_extensions import Self + +from gvm.errors import GvmError +from gvm.xml import Element, parse_xml + +from ._request import Request + + +class StatusError(GvmError): + def __init__(self, message: str | None, *args, response: "Response"): + super().__init__(message, *args) + self.response = response + self.request = response.request + + +class Response: + def __init__(self, *, request: Request, data: bytes) -> None: + self._request = request + self._data = data + self.__xml: Optional[Element] = None + + def __root_element(self) -> Element: + if self.__xml is None: + self.__xml = self.xml() + return self.__xml + + def xml(self) -> Element: + return parse_xml(self.data) + + @property + def data(self) -> bytes: + return self._data + + @property + def request(self) -> Request: + return self._request + + @cached_property + def status_code(self) -> Optional[int]: + root = self.__root_element() + try: + status = root.attrib["status"] + return int(status) + except (KeyError, ValueError): + return None + + @property + def is_success(self) -> bool: + status = self.status_code + return status is not None and 200 <= status <= 299 + + def raise_for_status(self) -> Self: + if self.is_success: + return self + raise StatusError( + f"Invalid status code {self.status_code}", response=self + ) + + def __bytes__(self) -> bytes: + return self._data + + def __str__(self) -> str: + return self._data.decode() diff --git a/gvm/protocols/gmp/core/requests/__init__.py b/gvm/protocols/gmp/core/requests/__init__.py new file mode 100644 index 000000000..31bb21261 --- /dev/null +++ b/gvm/protocols/gmp/core/requests/__init__.py @@ -0,0 +1,17 @@ +# SPDX-FileCopyrightText: 2024 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later + +from ._auth import Authentication +from ._port_list import PortList, PortRangeType +from ._resource_names import ResourceNames, ResourceType +from ._version import Version + +__all__ = ( + "Authentication", + "PortList", + "PortRangeType", + "Version", + "ResourceNames", + "ResourceType", +) diff --git a/gvm/protocols/gmp/core/requests/_auth.py b/gvm/protocols/gmp/core/requests/_auth.py new file mode 100644 index 000000000..d79a4745b --- /dev/null +++ b/gvm/protocols/gmp/core/requests/_auth.py @@ -0,0 +1,78 @@ +# SPDX-FileCopyrightText: 2024 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later + +from gvm.errors import RequiredArgument +from gvm.xml import XmlCommand + +from .._request import Request + + +class Authentication: + + @classmethod + def authenticate(cls, username: str, password: str) -> Request: + """Authenticate to gvmd. + + The generated authenticate command will be send to server. + Afterwards the response is read, transformed and returned. + + Args: + username: Username + password: Password + """ + cmd = XmlCommand("authenticate") + + if not username: + raise RequiredArgument( + function=cls.authenticate.__name__, argument="username" + ) + + if not password: + raise RequiredArgument( + function=cls.authenticate.__name__, argument="password" + ) + + credentials = cmd.add_element("credentials") + credentials.add_element("username", username) + credentials.add_element("password", password) + return cmd + + @staticmethod + def describe_auth() -> Request: + """Describe authentication methods + + Returns a list of all used authentication methods if such a list is + available. + """ + return XmlCommand("describe_auth") + + @classmethod + def modify_auth( + cls, group_name: str, auth_conf_settings: dict[str, str] + ) -> Request: + """Modifies an existing authentication. + + Arguments: + group_name: Name of the group to be modified. + auth_conf_settings: The new auth config. + """ + if not group_name: + raise RequiredArgument( + function=cls.modify_auth.__name__, argument="group_name" + ) + if not auth_conf_settings: + raise RequiredArgument( + function=cls.modify_auth.__name__, + argument="auth_conf_settings", + ) + + cmd = XmlCommand("modify_auth") + group = cmd.add_element("group", attrs={"name": str(group_name)}) + + for key, value in auth_conf_settings.items(): + auth_conf = group.add_element("auth_conf_setting") + auth_conf.add_element("key", key) + auth_conf.add_element("value", value) + + return cmd diff --git a/gvm/protocols/gmp/core/requests/_port_list.py b/gvm/protocols/gmp/core/requests/_port_list.py new file mode 100644 index 000000000..bfb2cb9b4 --- /dev/null +++ b/gvm/protocols/gmp/core/requests/_port_list.py @@ -0,0 +1,247 @@ +# SPDX-FileCopyrightText: 2024 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later + +from typing import Optional, Union + +from gvm._enum import Enum +from gvm.errors import RequiredArgument +from gvm.utils import to_bool +from gvm.xml import XmlCommand + +from .._request import Request + + +class PortRangeType(Enum): + """Enum for port range type""" + + TCP = "TCP" + UDP = "UDP" + + +class PortList: + @classmethod + def clone_port_list(cls, port_list_id: str) -> Request: + """Clone an existing port list + + Arguments: + port_list_id: UUID of an existing port list to clone from + """ + if not port_list_id: + raise RequiredArgument( + function=cls.clone_port_list.__name__, argument="port_list_id" + ) + + cmd = XmlCommand("create_port_list") + cmd.add_element("copy", port_list_id) + return cmd + + @classmethod + def create_port_list( + cls, name: str, port_range: str, *, comment: Optional[str] = None + ) -> Request: + """Create a new port list + + Arguments: + name: Name of the new port list + port_range: Port list ranges e.g. `"T: 1-1234"` for tcp port + 1 - 1234 + comment: Comment for the port list + """ + if not name: + raise RequiredArgument( + function=cls.create_port_list.__name__, argument="name" + ) + + if not port_range: + raise RequiredArgument( + function=cls.create_port_list.__name__, argument="port_range" + ) + + cmd = XmlCommand("create_port_list") + cmd.add_element("name", name) + cmd.add_element("port_range", port_range) + + if comment: + cmd.add_element("comment", comment) + + return cmd + + @classmethod + def create_port_range( + cls, + port_list_id: str, + start: int, + end: int, + port_range_type: Union[str, PortRangeType], + *, + comment: Optional[str] = None, + ) -> Request: + """Create new port range + + Arguments: + port_list_id: UUID of the port list to which to add the range + start: The first port in the range + end: The last port in the range + port_range_type: The type of the ports: TCP, UDP, ... + comment: Comment for the port range + """ + if not port_list_id: + raise RequiredArgument( + function=cls.create_port_range.__name__, + argument="port_list_id", + ) + + if not port_range_type: + raise RequiredArgument( + function=cls.create_port_range.__name__, + argument="port_range_type", + ) + + if not start: + raise RequiredArgument( + function=cls.create_port_range.__name__, argument="start" + ) + + if not end: + raise RequiredArgument( + function=cls.create_port_range.__name__, argument="end" + ) + + if not isinstance(port_range_type, PortRangeType): + port_range_type = PortRangeType(port_range_type) + + cmd = XmlCommand("create_port_range") + cmd.add_element("port_list", attrs={"id": port_list_id}) + cmd.add_element("start", str(start)) + cmd.add_element("end", str(end)) + cmd.add_element("type", port_range_type.value) + + if comment: + cmd.add_element("comment", comment) + + return cmd + + @classmethod + def delete_port_list( + cls, port_list_id: str, *, ultimate: bool = False + ) -> Request: + """Deletes an existing port list + + Arguments: + port_list_id: UUID of the port list to be deleted. + ultimate: Whether to remove entirely, or to the trashcan. + """ + if not port_list_id: + raise RequiredArgument( + function=cls.delete_port_list.__name__, argument="port_list_id" + ) + + cmd = XmlCommand("delete_port_list") + cmd.set_attribute("port_list_id", port_list_id) + cmd.set_attribute("ultimate", to_bool(ultimate)) + + return cmd + + @classmethod + def delete_port_range(cls, port_range_id: str) -> Request: + """Deletes an existing port range + + Arguments: + port_range_id: UUID of the port range to be deleted. + """ + if not port_range_id: + raise RequiredArgument( + function=cls.delete_port_range.__name__, + argument="port_range_id", + ) + + cmd = XmlCommand("delete_port_range") + cmd.set_attribute("port_range_id", port_range_id) + + return cmd + + @classmethod + def get_port_lists( + cls, + *, + filter_string: Optional[str] = None, + filter_id: Optional[str] = None, + details: Optional[bool] = None, + targets: Optional[bool] = None, + trash: Optional[bool] = None, + ) -> Request: + """Request a list of port lists + + Arguments: + filter_string: Filter term to use for the query + filter_id: UUID of an existing filter to use for the query + details: Whether to include full port list details + targets: Whether to include targets using this port list + trash: Whether to get port lists in the trashcan instead + """ + cmd = XmlCommand("get_port_lists") + + cmd.add_filter(filter_string, filter_id) + + if details is not None: + cmd.set_attribute("details", to_bool(details)) + + if targets is not None: + cmd.set_attribute("targets", to_bool(targets)) + + if trash is not None: + cmd.set_attribute("trash", to_bool(trash)) + + return cmd + + @classmethod + def get_port_list(cls, port_list_id: str) -> Request: + """Request a single port list + + Arguments: + port_list_id: UUID of an existing port list + """ + cmd = XmlCommand("get_port_lists") + + if not port_list_id: + raise RequiredArgument( + function=cls.get_port_list.__name__, argument="port_list_id" + ) + + cmd.set_attribute("port_list_id", port_list_id) + + # for single entity always request all details + + cmd.set_attribute("details", "1") + return cmd + + @classmethod + def modify_port_list( + cls, + port_list_id: str, + *, + comment: Optional[str] = None, + name: Optional[str] = None, + ) -> Request: + """Modifies an existing port list. + + Arguments: + port_list_id: UUID of port list to modify. + name: Name of port list. + comment: Comment on port list. + """ + if not port_list_id: + raise RequiredArgument( + function=cls.modify_port_list.__name__, argument="port_list_id" + ) + cmd = XmlCommand("modify_port_list") + cmd.set_attribute("port_list_id", port_list_id) + + if comment: + cmd.add_element("comment", comment) + + if name: + cmd.add_element("name", name) + + return cmd diff --git a/gvm/protocols/gmp/core/requests/_resource_names.py b/gvm/protocols/gmp/core/requests/_resource_names.py new file mode 100644 index 000000000..8c7e9b884 --- /dev/null +++ b/gvm/protocols/gmp/core/requests/_resource_names.py @@ -0,0 +1,107 @@ +# SPDX-FileCopyrightText: 2024 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later + +from typing import Optional, Union + +from gvm._enum import Enum +from gvm.errors import RequiredArgument +from gvm.xml import XmlCommand + +from .._request import Request + + +class ResourceType(Enum): + """Enum for resource types""" + + ALERT = "ALERT" + CERT_BUND_ADV = "CERT_BUND_ADV" + CONFIG = "CONFIG" + CPE = "CPE" + CREDENTIAL = "CREDENTIAL" + CVE = "CVE" + DFN_CERT_ADV = "DFN_CERT_ADV" + FILTER = "FILTER" + GROUP = "GROUP" + HOST = "HOST" + NOTE = "NOTE" + NVT = "NVT" + OS = "OS" + OVERRIDE = "OVERRIDE" + PERMISSION = "PERMISSION" + PORT_LIST = "PORT_LIST" + REPORT_FORMAT = "REPORT_FORMAT" + REPORT = "REPORT" + RESULT = "RESULT" + ROLE = "ROLE" + SCANNER = "SCANNER" + SCHEDULE = "SCHEDULE" + TARGET = "TARGET" + TASK = "TASK" + TLS_CERTIFICATE = "TLS_CERTIFICATE" + USER = "USER" + + +class ResourceNames: + @staticmethod + def get_resource_names( + resource_type: Union[ResourceType, str], + *, + filter_string: Optional[str] = None, + ) -> Request: + """Request a list of resource names and IDs + + Arguments: + resource_type: Type must be either ALERT, CERT_BUND_ADV, + CONFIG, CPE, CREDENTIAL, CVE, DFN_CERT_ADV, FILTER, + GROUP, HOST, NOTE, NVT, OS, OVERRIDE, PERMISSION, + PORT_LIST, REPORT_FORMAT, REPORT, RESULT, ROLE, + SCANNER, SCHEDULE, TARGET, TASK, TLS_CERTIFICATE + or USER + filter_string: Filter term to use for the query + """ + cmd = XmlCommand("get_resource_names") + + if not isinstance(resource_type, ResourceType): + resource_type = ResourceType(resource_type) + + cmd.set_attribute("type", resource_type.value) + cmd.add_filter(filter_string, None) + return cmd + + @classmethod + def get_resource_name( + cls, resource_id: str, resource_type: Union[ResourceType, str] + ) -> Request: + """Request a single resource name + + Arguments: + resource_id: ID of an existing resource + resource_type: Type must be either ALERT, CERT_BUND_ADV, + CONFIG, CPE, CREDENTIAL, CVE, DFN_CERT_ADV, FILTER, + GROUP, HOST, NOTE, NVT, OS, OVERRIDE, PERMISSION, + PORT_LIST, REPORT_FORMAT, REPORT, RESULT, ROLE, + SCANNER, SCHEDULE, TARGET, TASK, TLS_CERTIFICATE + or USER + + Returns: + The response. See :py:meth:`send_command` for details. + """ + if not resource_type: + raise RequiredArgument( + function=cls.get_resource_name.__name__, + argument="resource_type", + ) + + if not isinstance(resource_type, ResourceType): + resource_type = ResourceType(resource_type) + + if not resource_id: + raise RequiredArgument( + function=cls.get_resource_name.__name__, argument="resource_id" + ) + + cmd = XmlCommand("get_resource_names") + cmd.set_attribute("resource_id", resource_id) + cmd.set_attribute("type", resource_type.value) + return cmd diff --git a/gvm/protocols/gmp/core/requests/_version.py b/gvm/protocols/gmp/core/requests/_version.py new file mode 100644 index 000000000..a4462422a --- /dev/null +++ b/gvm/protocols/gmp/core/requests/_version.py @@ -0,0 +1,16 @@ +# SPDX-FileCopyrightText: 2024 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later + +from gvm.xml import XmlCommand + +from .._request import Request + + +class Version: + @staticmethod + def get_version() -> Request: + """Get the Greenbone Vulnerability Manager Protocol version used + by the remote gvmd. + """ + return XmlCommand("get_version")