Skip to content
This repository has been archived by the owner on Nov 29, 2021. It is now read-only.

Move <vts> in <start_scan>. #26

Merged
merged 3 commits into from
Jul 3, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions doc/OSP.xml
Original file line number Diff line number Diff line change
Expand Up @@ -647,9 +647,8 @@ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
<summary>Start a new scan with a VT selection</summary>
<request>
<start_scan target='localhost' ports='80, 443'>
<scanner_params>
<vts>1.2.3.4.5, 22.34.61.1, 3.67.4.2</vts>
</scanner_params>
<scanner_params />
<vts>1.2.3.4.5, 22.34.61.1, 3.67.4.2</vts>
</start_scan>
</request>
<response>
Expand Down
2 changes: 1 addition & 1 deletion doc/USAGE-ospd-scanner
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ $ gvm-cli socket --sockpath <prefix>/var/run/ospd-scanner.sock --xml="<start_sca

Start a scan for two vulnerability tests vt_id_1 and vt_id_2 of a ospd-based scanner:

$ gvm-cli socket --sockpath <prefix>/var/run/ospd-scanner.sock --xml="<start_scan target='www.example.com'><scanner_params><vts>vt_id_1, vt_id_2</vts></scanner_params></start_scan>"
$ gvm-cli socket --sockpath <prefix>/var/run/ospd-scanner.sock --xml="<start_scan target='www.example.com'><scanner_params></scanner_params><vts>vt_id_1, vt_id_2</vts></start_scan>"


Show the list of scans with status and results:
Expand Down
8 changes: 7 additions & 1 deletion ospd/misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def ids_iterator(self):

return iter(self.scans_table.keys())

def create_scan(self, scan_id='', target='', ports='', options=dict()):
def create_scan(self, scan_id='', target='', ports='', options=dict(), vts=''):
""" Creates a new scan with provided scan information. """

if self.data_manager is None:
Expand All @@ -126,6 +126,7 @@ def create_scan(self, scan_id='', target='', ports='', options=dict()):
scan_info['progress'] = 0
scan_info['target'] = target
scan_info['ports'] = ports
scan_info['vts'] = vts
scan_info['options'] = options
scan_info['start_time'] = int(time.time())
scan_info['end_time'] = "0"
Expand Down Expand Up @@ -170,6 +171,11 @@ def get_ports(self, scan_id):

return self.scans_table[scan_id]['ports']

def get_vts(self, scan_id):
""" Get a scan's vts list. """

return self.scans_table[scan_id]['vts']

def id_exists(self, scan_id):
""" Check whether a scan exists in the table. """

Expand Down
28 changes: 18 additions & 10 deletions ospd/ospd.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,6 @@
'mandatory': 0,
'description': 'Whether to dry run scan.',
},
'vts': {
'type': 'string',
'name': 'Vulnerability Tests',
'default': '',
'mandatory': 0,
'description': 'Comma-separated list of vulnerability test IDs to be executed.',
},
}

COMMANDS_TABLE = {
Expand Down Expand Up @@ -415,6 +408,15 @@ def handle_start_scan_command(self, scan_et):

params = self._preprocess_scan_params(scanner_params)

# VTS is an optional element. If present should not be empty.
vts_str = ''
scanner_vts = scan_et.find('vts')
if scanner_vts is not None:
if scanner_vts.text is None:
raise OSPDError('VTs list is empty', 'start_scan')
else:
vts_str = scanner_vts.text

# Dry run case.
if 'dry_run' in params and int(params['dry_run']):
scan_func = self.dry_run_scan
Expand All @@ -423,7 +425,8 @@ def handle_start_scan_command(self, scan_et):
scan_func = self.start_scan
scan_params = self.process_scan_params(params)

scan_id = self.create_scan(scan_id, target_str, ports_str, scan_params)
scan_id = self.create_scan(scan_id, target_str,
ports_str, scan_params, vts_str)
scan_process = multiprocessing.Process(target=scan_func,
args=(scan_id, target_str))
self.scan_processes[scan_id] = scan_process
Expand Down Expand Up @@ -996,15 +999,16 @@ def run(self, address, port, unix_path):
sock.shutdown(socket.SHUT_RDWR)
sock.close()

def create_scan(self, scan_id, target, ports, options):
def create_scan(self, scan_id, target, ports, options, vts):
""" Creates a new scan.

@target: Target to scan.
@options: Miscellaneous scan options.

@return: New scan's ID.
"""
return self.scan_collection.create_scan(scan_id, target, ports, options)
return self.scan_collection.create_scan(scan_id, target,
ports, options, vts)

def get_scan_options(self, scan_id):
""" Gives a scan's list of options. """
Expand Down Expand Up @@ -1038,6 +1042,10 @@ def get_scan_ports(self, scan_id):
""" Gives a scan's ports list. """
return self.scan_collection.get_ports(scan_id)

def get_scan_vts(self, scan_id):
""" Gives a scan's vts list. """
return self.scan_collection.get_vts(scan_id)

def get_scan_start_time(self, scan_id):
""" Gives a scan's start time. """
return self.scan_collection.get_start_time(scan_id)
Expand Down
2 changes: 1 addition & 1 deletion tests/testSSHDaemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def testRunCommand(self):
daemon = OSPDaemonSimpleSSH('cert', 'key', 'ca')
scanid = daemon.create_scan(None, 'host.example.com', '80, 443',
dict(port=5, ssh_timeout=15,
username_password='dummy:pw'))
username_password='dummy:pw'), '')
res = daemon.run_command(scanid, 'host.example.com', 'cat /etc/passwd')
self.assertTrue(isinstance(res, list))
self.assertEqual(commands, ['cat /etc/passwd'])
29 changes: 28 additions & 1 deletion tests/testScanAndResult.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import unittest
import xml.etree.ElementTree as ET

from ospd.ospd import OSPDaemon
from ospd.ospd import OSPDaemon, OSPDError

class Result(object):
def __init__(self, type_, **kwargs):
Expand Down Expand Up @@ -147,3 +147,30 @@ def testStopScan(self):
'<stop_scan scan_id="%s" />' % scan_id))
self.assertEqual(response.get('status'), '200')
print(ET.tostring(response))

def testScanWithVTs(self):
daemon = DummyWrapper([])
cmd = ET.fromstring('<start_scan ' +
'target="localhost" ports="80, 443">' +
'<scanner_params /><vts /></start_scan>')
print(ET.tostring(cmd))
self.assertRaises(OSPDError, daemon.handle_start_scan_command, cmd)

response = ET.fromstring(
daemon.handle_command('<start_scan ' +
'target="localhost" ports="80, 443">' +
'<scanner_params /><vts>1.2.3.4</vts></start_scan>'))
print(ET.tostring(response))
scan_id = response.findtext('id')
time.sleep(0.01)
self.assertEqual(daemon.get_scan_vts(scan_id), '1.2.3.4')
self.assertNotEqual(daemon.get_scan_vts(scan_id), '1.2.3.6')

response = ET.fromstring(
daemon.handle_command('<start_scan ' +
'target="localhost" ports="80, 443">' +
'<scanner_params /></start_scan>'))
print(ET.tostring(response))
scan_id = response.findtext('id')
time.sleep(0.01)
self.assertEqual(daemon.get_scan_vts(scan_id), '')