Skip to content
This repository has been archived by the owner on Nov 29, 2021. It is now read-only.

Commit

Permalink
Update tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jjnicola committed May 18, 2020
1 parent 33c0a2c commit c58c6bb
Show file tree
Hide file tree
Showing 4 changed files with 193 additions and 194 deletions.
39 changes: 23 additions & 16 deletions tests/command/test_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from ospd.errors import OspdCommandError, OspdError
from ospd.misc import create_process

from ..helper import DummyWrapper, assert_called, FakeStream
from ..helper import DummyWrapper, assert_called, FakeStream, FakeDataManager


class GetPerformanceTestCase(TestCase):
Expand Down Expand Up @@ -97,7 +97,7 @@ def test_scan_with_vts_empty_vt_list(self):
with self.assertRaises(OspdCommandError):
cmd.handle_xml(request)

@patch("ospd.command.command.create_process")
@patch("ospd.ospd.create_process")
def test_scan_with_vts(self, mock_create_process):
daemon = DummyWrapper([])
cmd = StartScan(daemon)
Expand All @@ -121,11 +121,11 @@ def test_scan_with_vts(self, mock_create_process):
response = et.fromstring(cmd.handle_xml(request))
scan_id = response.findtext('id')

self.assertEqual(
daemon.get_scan_vts(scan_id), {'1.2.3.4': {}, 'vt_groups': []}
)
self.assertNotEqual(daemon.get_scan_vts(scan_id), {'1.2.3.6': {}})
vts_collection = daemon.get_scan_vts(scan_id)
self.assertEqual(vts_collection, {'1.2.3.4': {}, 'vt_groups': []})
self.assertNotEqual(vts_collection, {'1.2.3.6': {}})

daemon.check_pending_scans()
assert_called(mock_create_process)

def test_is_new_scan_allowed_false(self):
Expand All @@ -152,7 +152,7 @@ def test_is_new_scan_allowed_true(self):

self.assertTrue(cmd.is_new_scan_allowed())

@patch("ospd.command.command.create_process")
@patch("ospd.ospd.create_process")
def test_scan_without_vts(self, mock_create_process):
daemon = DummyWrapper([])
cmd = StartScan(daemon)
Expand All @@ -172,9 +172,9 @@ def test_scan_without_vts(self, mock_create_process):
response = et.fromstring(cmd.handle_xml(request))

scan_id = response.findtext('id')

self.assertEqual(daemon.get_scan_vts(scan_id), {})

daemon.check_pending_scans()
assert_called(mock_create_process)

def test_scan_with_vts_and_param_missing_vt_param_id(self):
Expand All @@ -200,7 +200,7 @@ def test_scan_with_vts_and_param_missing_vt_param_id(self):
with self.assertRaises(OspdError):
cmd.handle_xml(request)

@patch("ospd.command.command.create_process")
@patch("ospd.ospd.create_process")
def test_scan_with_vts_and_param(self, mock_create_process):
daemon = DummyWrapper([])
cmd = StartScan(daemon)
Expand Down Expand Up @@ -229,7 +229,7 @@ def test_scan_with_vts_and_param(self, mock_create_process):
daemon.get_scan_vts(scan_id),
{'1234': {'ABC': '200'}, 'vt_groups': []},
)

daemon.check_pending_scans()
assert_called(mock_create_process)

def test_scan_with_vts_and_param_missing_vt_group_filter(self):
Expand All @@ -253,7 +253,7 @@ def test_scan_with_vts_and_param_missing_vt_group_filter(self):
with self.assertRaises(OspdError):
cmd.handle_xml(request)

@patch("ospd.command.command.create_process")
@patch("ospd.ospd.create_process")
def test_scan_with_vts_and_param_with_vt_group_filter(
self, mock_create_process
):
Expand All @@ -280,9 +280,10 @@ def test_scan_with_vts_and_param_with_vt_group_filter(

self.assertEqual(daemon.get_scan_vts(scan_id), {'vt_groups': ['a']})

daemon.check_pending_scans()
assert_called(mock_create_process)

@patch("ospd.command.command.create_process")
@patch("ospd.ospd.create_process")
@patch("ospd.command.command.logger")
def test_scan_ignore_multi_target(self, mock_logger, mock_create_process):
daemon = DummyWrapper([])
Expand All @@ -300,16 +301,18 @@ def test_scan_ignore_multi_target(self, mock_logger, mock_create_process):
)

cmd.handle_xml(request)

daemon.check_pending_scans()
assert_called(mock_logger.warning)
assert_called(mock_create_process)

@patch("ospd.command.command.create_process")
@patch("ospd.ospd.create_process")
@patch("ospd.command.command.logger")
def test_scan_use_legacy_target_and_port(
self, mock_logger, mock_create_process
):
daemon = DummyWrapper([])
daemon.scan_collection.datamanager = FakeDataManager()

cmd = StartScan(daemon)
request = et.fromstring(
'<start_scan target="localhost" ports="22">'
Expand All @@ -325,20 +328,22 @@ def test_scan_use_legacy_target_and_port(
self.assertEqual(daemon.get_scan_host(scan_id), 'localhost')
self.assertEqual(daemon.get_scan_ports(scan_id), '22')

daemon.check_pending_scans()

assert_called(mock_logger.warning)
assert_called(mock_create_process)


class StopCommandTestCase(TestCase):
@patch("ospd.ospd.os")
@patch("ospd.command.command.create_process")
@patch("ospd.ospd.create_process")
def test_stop_scan(self, mock_create_process, mock_os):
mock_process = mock_create_process.return_value
mock_process.is_alive.return_value = True
mock_process.pid = "foo"

fs = FakeStream()
daemon = DummyWrapper([])
daemon.scan_collection.datamanager = FakeDataManager()
request = (
'<start_scan>'
'<targets>'
Expand All @@ -353,6 +358,8 @@ def test_stop_scan(self, mock_create_process, mock_os):
daemon.handle_command(request, fs)
response = fs.get_response()

daemon.check_pending_scans()

assert_called(mock_create_process)
assert_called(mock_process.start)

Expand Down
10 changes: 10 additions & 0 deletions tests/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import time
import multiprocessing

from unittest.mock import Mock

Expand Down Expand Up @@ -48,12 +49,21 @@ def get_response(self):
return et.fromstring(self.response)


class FakeDataManager:
def __init__(self):
pass

def dict(self):
return dict()


class DummyWrapper(OSPDaemon):
def __init__(self, results, checkresult=True):
super().__init__()
self.checkresult = checkresult
self.results = results
self.initialized = True
self.scan_collection.data_manager = FakeDataManager()

def check(self):
return self.checkresult
Expand Down
Loading

0 comments on commit c58c6bb

Please sign in to comment.