Skip to content

Commit

Permalink
Add: auto connect to mqtt broker as soon as one is available (#727)
Browse files Browse the repository at this point in the history
In case ospd-openvas was started without a broker running, the connection fails and ospd-openvas prints that notus is unavailable. This patch solves this issue by trying to connect to the broker every 10 seconds in case the last try failed.
  • Loading branch information
Kraemii authored Aug 10, 2022
1 parent c10b8c2 commit a6a017b
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 24 deletions.
25 changes: 7 additions & 18 deletions ospd_openvas/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@

from typing import Callable, Optional, Dict, List, Tuple, Iterator, Any
from datetime import datetime
from socket import gaierror

from pathlib import Path
from os import geteuid, environ
Expand Down Expand Up @@ -527,24 +526,14 @@ def init(self, server: BaseServer) -> None:
notus_handler = NotusResultHandler(self.report_results)

if self._mqtt_broker_address:
try:
client = MQTTClient(
self._mqtt_broker_address, self._mqtt_broker_port, "ospd"
)
daemon = MQTTDaemon(client)
subscriber = MQTTSubscriber(client)
client = MQTTClient(
self._mqtt_broker_address, self._mqtt_broker_port, "ospd"
)
daemon = MQTTDaemon(client)
subscriber = MQTTSubscriber(client)

subscriber.subscribe(
ResultMessage, notus_handler.result_handler
)
daemon.run()
except (ConnectionRefusedError, gaierror, ValueError) as e:
logger.error(
"Could not connect to MQTT broker at %s, error was: %s."
" Unable to get results from Notus.",
self._mqtt_broker_address,
e,
)
subscriber.subscribe(ResultMessage, notus_handler.result_handler)
daemon.run()
else:
logger.info(
"MQTT Broker Adress empty. MQTT disabled. Unable to get Notus"
Expand Down
34 changes: 30 additions & 4 deletions ospd_openvas/messaging/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
import logging

from functools import partial
from socket import gaierror, timeout
from threading import Thread
from time import sleep
from typing import Callable, Type

import paho.mqtt.client as mqtt
Expand Down Expand Up @@ -156,9 +159,32 @@ def __init__(
self,
client: MQTTClient,
):
self._client = client

self._client.connect()
self._client: MQTTClient = client

def _try_connect_loop(self):
while True:
try:
self._client.connect()
self._client.loop_start()
logger.info("Successfully connected to MQTT broker")
return
except (gaierror, ValueError) as e:
logger.error(
"Could not connect to MQTT broker, error was: %s."
" Unable to get results from Notus.",
e,
)
return
# ConnectionRefusedError - when mqtt declines connection
# timeout - when address is not reachable
# OSError - in container when address cannot be assigned
except (ConnectionRefusedError, timeout, OSError) as e:
logger.warning(
"Could not connect to MQTT broker, error was: %s."
" Trying again in 10s.",
e,
)
sleep(10)

def run(self):
self._client.loop_start()
Thread(target=self._try_connect_loop, daemon=True).start()
4 changes: 2 additions & 2 deletions tests/messaging/test_mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,13 @@ def test_connect(self):
# pylint: disable=unused-variable
daemon = MQTTDaemon(client)

client.connect.assert_called_with()

def test_run(self):
client = mock.MagicMock()

daemon = MQTTDaemon(client)

daemon.run()

client.connect.assert_called_with()

client.loop_start.assert_called_with()

0 comments on commit a6a017b

Please sign in to comment.