Skip to content

Commit

Permalink
Add: Add first draft of a core GMP protocol implementation
Browse files Browse the repository at this point in the history
Implement the GMP as an IO independent software stack. With this core
protocol it will be possible to implement all kind of stacks for GMP
easily.
  • Loading branch information
bjoernricks authored and greenbonebot committed Jun 14, 2024
1 parent 3f3652a commit d9f79bd
Show file tree
Hide file tree
Showing 9 changed files with 730 additions and 0 deletions.
14 changes: 14 additions & 0 deletions gvm/protocols/gmp/core/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# SPDX-FileCopyrightText: 2024 Greenbone AG
#
# SPDX-License-Identifier: GPL-3.0-or-later

from ._connection import Connection
from ._request import Request
from ._response import Response, StatusError

__all__ = (
"Connection",
"Request",
"Response",
"StatusError",
)
170 changes: 170 additions & 0 deletions gvm/protocols/gmp/core/_connection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
# SPDX-FileCopyrightText: 2024 Greenbone AG
#
# SPDX-License-Identifier: GPL-3.0-or-later

from typing import AnyStr, Optional, Protocol

from lxml import etree

from gvm.errors import GvmError

from ._request import Request
from ._response import Response


class XmlReader:
"""
Read a XML command until its closing element
"""

def start_xml(self) -> None:
self._first_element: Optional[etree._Element] = None
# act on start and end element events and
# allow huge text data (for report content)
self._parser = etree.XMLPullParser(
events=("start", "end"), huge_tree=True
)

def is_end_xml(self) -> bool:
for action, obj in self._parser.read_events():
if not self._first_element and action in "start":
self._first_element = obj.tag # type: ignore

if (
self._first_element
and action in "end"
and str(self._first_element) == str(obj.tag) # type: ignore
):
return True
return False

def feed_xml(self, data: AnyStr) -> None:
try:
self._parser.feed(data)
except etree.ParseError as e:
raise GvmError(
f"Cannot parse XML response. Response data read {data!r}",
e,
) from None


class InvalidStateError(GvmError):
def __init__(self, message: str = "Invalid State", *args):
super().__init__(message, *args)


class State(Protocol):
def __set_context__(self, context: "Context") -> None: ...
def send(self, request: Request) -> bytes: ...
def receive_data(self, data: bytes) -> Optional[Response]: ...
def close(self) -> None: ...


class Context(Protocol):
def __set_state__(self, state: State) -> None: ...


class AbstractState:
_context: Context

def __set_context__(self, context: Context) -> None:
self._context = context

def set_next_state(self, next_state: State) -> None:
self._context.__set_state__(next_state)


class InitialState(AbstractState):
def send(self, request: Request) -> bytes:
self.set_next_state(AwaitingResponseState(request))
return bytes(request)

def receive_data(self, data: bytes) -> Optional[Response]:
raise InvalidStateError()

def close(self) -> None:
# nothing to do
return


class AwaitingResponseState(AbstractState):
def __init__(self, request: Request) -> None:
self._request = request

def send(self, request: Request) -> bytes:
raise InvalidStateError()

def close(self) -> None:
self.set_next_state(InitialState())

def receive_data(self, data: bytes) -> Optional[Response]:
next_state = ReceivingDataState(self._request)
self.set_next_state(next_state)
return next_state.receive_data(data)


class ErrorState(AbstractState):
message = (
"The connection is in an error state. Please close the connection."
)

def send(self, request: Request) -> bytes:
raise InvalidStateError(self.message)

def close(self) -> None:
self.set_next_state(InitialState())

def receive_data(self, data: bytes) -> Optional[Response]:
raise InvalidStateError(self.message)


class ReceivingDataState(AbstractState):
def __init__(self, request: Request) -> None:
self._request = request
self._data = bytearray()
self._reader = XmlReader()
self._reader.start_xml()

def send(self, request: Request) -> bytes:
raise InvalidStateError()

def close(self) -> None:
self.set_next_state(InitialState())

def receive_data(self, data: bytes) -> Optional[Response]:
self._data += data
try:
self._reader.feed_xml(data)
except GvmError as e:
self.set_next_state(ErrorState())
raise e

if not self._reader.is_end_xml():
return None

self.set_next_state(InitialState())
return Response(data=bytes(self._data), request=self._request)


class Connection:
"""
This is a [SansIO]() connection and not a socket connection
It is responsible for
"""

def __init__(self) -> None:
self.__set_state__(InitialState())

def send(self, request: Request) -> bytes:
return self._state.send(request)

def receive_data(self, data: bytes) -> Optional[Response]:
return self._state.receive_data(data)

def close(self) -> None:
return self._state.close()

def __set_state__(self, state: State) -> None:
self._state = state
self._state.__set_context__(self)
11 changes: 11 additions & 0 deletions gvm/protocols/gmp/core/_request.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# SPDX-FileCopyrightText: 2024 Greenbone AG
#
# SPDX-License-Identifier: GPL-3.0-or-later

from typing import Protocol, runtime_checkable


@runtime_checkable
class Request(Protocol):
def __bytes__(self) -> bytes: ...
def __str__(self) -> str: ...
70 changes: 70 additions & 0 deletions gvm/protocols/gmp/core/_response.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# SPDX-FileCopyrightText: 2024 Greenbone AG
#
# SPDX-License-Identifier: GPL-3.0-or-later

from functools import cached_property
from typing import Optional

from typing_extensions import Self

from gvm.errors import GvmError
from gvm.xml import Element, parse_xml

from ._request import Request


class StatusError(GvmError):
def __init__(self, message: str | None, *args, response: "Response"):
super().__init__(message, *args)
self.response = response
self.request = response.request


class Response:
def __init__(self, *, request: Request, data: bytes) -> None:
self._request = request
self._data = data
self.__xml: Optional[Element] = None

def __root_element(self) -> Element:
if self.__xml is None:
self.__xml = self.xml()
return self.__xml

def xml(self) -> Element:
return parse_xml(self.data)

@property
def data(self) -> bytes:
return self._data

@property
def request(self) -> Request:
return self._request

@cached_property
def status_code(self) -> Optional[int]:
root = self.__root_element()
try:
status = root.attrib["status"]
return int(status)
except (KeyError, ValueError):
return None

@property
def is_success(self) -> bool:
status = self.status_code
return status is not None and 200 <= status <= 299

def raise_for_status(self) -> Self:
if self.is_success:
return self
raise StatusError(
f"Invalid status code {self.status_code}", response=self
)

def __bytes__(self) -> bytes:
return self._data

def __str__(self) -> str:
return self._data.decode()
17 changes: 17 additions & 0 deletions gvm/protocols/gmp/core/requests/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# SPDX-FileCopyrightText: 2024 Greenbone AG
#
# SPDX-License-Identifier: GPL-3.0-or-later

from ._auth import Authentication
from ._port_list import PortList, PortRangeType
from ._resource_names import ResourceNames, ResourceType
from ._version import Version

__all__ = (
"Authentication",
"PortList",
"PortRangeType",
"Version",
"ResourceNames",
"ResourceType",
)
78 changes: 78 additions & 0 deletions gvm/protocols/gmp/core/requests/_auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
# SPDX-FileCopyrightText: 2024 Greenbone AG
#
# SPDX-License-Identifier: GPL-3.0-or-later

from gvm.errors import RequiredArgument
from gvm.xml import XmlCommand

from .._request import Request


class Authentication:

@classmethod
def authenticate(cls, username: str, password: str) -> Request:
"""Authenticate to gvmd.
The generated authenticate command will be send to server.
Afterwards the response is read, transformed and returned.
Args:
username: Username
password: Password
"""
cmd = XmlCommand("authenticate")

if not username:
raise RequiredArgument(
function=cls.authenticate.__name__, argument="username"
)

if not password:
raise RequiredArgument(
function=cls.authenticate.__name__, argument="password"
)

credentials = cmd.add_element("credentials")
credentials.add_element("username", username)
credentials.add_element("password", password)
return cmd

@staticmethod
def describe_auth() -> Request:
"""Describe authentication methods
Returns a list of all used authentication methods if such a list is
available.
"""
return XmlCommand("describe_auth")

@classmethod
def modify_auth(
cls, group_name: str, auth_conf_settings: dict[str, str]
) -> Request:
"""Modifies an existing authentication.
Arguments:
group_name: Name of the group to be modified.
auth_conf_settings: The new auth config.
"""
if not group_name:
raise RequiredArgument(
function=cls.modify_auth.__name__, argument="group_name"
)
if not auth_conf_settings:
raise RequiredArgument(
function=cls.modify_auth.__name__,
argument="auth_conf_settings",
)

cmd = XmlCommand("modify_auth")
group = cmd.add_element("group", attrs={"name": str(group_name)})

for key, value in auth_conf_settings.items():
auth_conf = group.add_element("auth_conf_setting")
auth_conf.add_element("key", key)
auth_conf.add_element("value", value)

return cmd
Loading

0 comments on commit d9f79bd

Please sign in to comment.