forked from rapid7/recog
-
Notifications
You must be signed in to change notification settings - Fork 2
/
update_cpes.py
executable file
·237 lines (191 loc) · 9.44 KB
/
update_cpes.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
#!/usr/bin/env python
import logging
import re
import sys
import yaml
from lxml import etree
def parse_r7_remapping(file):
with open(file) as remap_file:
return yaml.safe_load(remap_file)["mappings"]
def parse_cpe_vp_map(file):
vp_map = {} # cpe_type -> vendor -> products
parser = etree.XMLParser(remove_comments=False)
doc = etree.parse(file, parser)
namespaces = {'ns': 'http://cpe.mitre.org/dictionary/2.0', 'meta': 'http://scap.nist.gov/schema/cpe-dictionary-metadata/0.2'}
for cpe_name in doc.xpath("//ns:cpe-list/ns:cpe-item/@name", namespaces=namespaces):
cpe_match = re.match('^cpe:/([aho]):([^:]+):([^:]+)', cpe_name)
if cpe_match:
cpe_type, vendor, product = cpe_match.group(1, 2, 3)
if cpe_type not in vp_map:
vp_map[cpe_type] = {}
if vendor not in vp_map[cpe_type]:
vp_map[cpe_type][vendor] = set()
product = product.replace('%2f', '/')
vp_map[cpe_type][vendor].add(product)
else:
logging.error("Unexpected CPE %s", cpe_name)
return vp_map
def main():
if len(sys.argv) != 4:
logging.critical("Expecting exactly 3 arguments; recog XML file, CPE 2.3 XML dictionary, JSON remapping, got %s", (len(sys.argv) - 1))
sys.exit(1)
cpe_vp_map = parse_cpe_vp_map(sys.argv[2])
if not cpe_vp_map:
logging.critical("No CPE vendor => product mappings read from CPE 2.3 XML dictionary %s", sys.argv[2])
sys.exit(1)
r7_vp_map = parse_r7_remapping(sys.argv[3])
if not r7_vp_map:
logging.warning("No Rapid7 vendor/product => CPE mapping read from %s", sys.argv[3])
update_cpes(sys.argv[1], cpe_vp_map, r7_vp_map)
def lookup_cpe(vendor, product, cpe_type, cpe_table, remap):
"""Identify the correct vendor and product values for a CPE
This function attempts to determine the correct CPE using vendor and product
values supplied by the caller as well as a remapping dictionary for mapping
these values to more correct values used by NIST.
For example, the remapping might tell us that a value of 'alpine' for the
vendor string should be 'aplinelinux' instead, or for product 'solaris'
should be 'sunos'.
This function should only emit values seen in the official NIST CPE list
which is provided to it in cpe_table.
Lookup priority:
1. Original vendor / product
2. Original vendor / remap product
3. Remap vendor / original product
4. Remap vendor / remap product
Args:
vendor (str): vendor name
product (str): product name
cpe_type (str): CPE type - o, a, h, etc.
cpe_table (dict): dict containing the official NIST CPE data
remap (dict): dict containing the remapping values
Returns:
success, vendor, product
"""
if (
vendor in cpe_table[cpe_type]
and product in cpe_table[cpe_type][vendor]
):
# Hot path, success with original values
return True, vendor, product
# Everything else depends on a remap of some sort.
# get the remappings for this one vendor string.
vendor_remap = remap.get(vendor, None)
if vendor_remap:
# If we have product remappings, work that angle next
possible_product = None
if (
vendor_remap.get('products', None)
and product in vendor_remap['products']
):
possible_product = vendor_remap['products'][product]
if (vendor in cpe_table[cpe_type]
and possible_product
and possible_product in cpe_table[cpe_type][vendor]):
# Found original vendor, remap product
return True, vendor, possible_product
# Start working the process to find a match with a remapped vendor name
if vendor_remap.get('vendor', None):
new_vendor = vendor_remap['vendor']
if new_vendor in cpe_table[cpe_type]:
if product in cpe_table[cpe_type][new_vendor]:
# Found remap vendor, original product
return True, new_vendor, product
if possible_product and possible_product in cpe_table[cpe_type][new_vendor]:
# Found remap vendor, remap product
return True, new_vendor, possible_product
logging.error("Product %s from vendor %s invalid for CPE %s and no mapping",
product, vendor, cpe_type)
return False, None, None
def update_cpes(xml_file, cpe_vp_map, r7_vp_map):
parser = etree.XMLParser(remove_comments=False, remove_blank_text=True)
doc = etree.parse(xml_file, parser)
for fingerprint in doc.xpath('//fingerprint'):
# collect all the params, grouping by os and service params that could be used to compute a CPE
params = {}
for param in fingerprint.xpath('./param'):
name = param.attrib['name']
# remove any existing CPE params
if re.match(r'^.*\.cpe\d{0,2}$', name):
param.getparent().remove(param)
continue
match = re.search(r'^(?P<fp_type>hw|os|service(?:\.component)?)\.', name)
if match:
fp_type = match.group('fp_type')
if not fp_type in params:
params[fp_type] = {}
if name in params[fp_type]:
raise ValueError('Duplicated fingerprint named {} in fingerprint {} in file {}'.format(name, fingerprint.attrib['pattern'], xml_file))
params[fp_type][name] = param
# for each of the applicable os/service param groups, build a CPE
for fp_type in params:
if fp_type == 'os':
cpe_type = 'o'
elif fp_type.startswith('service'):
cpe_type = 'a'
elif fp_type == 'hw':
cpe_type = 'h'
else:
raise ValueError('Unhandled param type {}'.format(fp_type))
# extract the vendor/product/version values from each os/service group,
# using the static value ('Apache', for example) when pos is 0, and
# otherwise use a value that contains interpolation markers such that
# products/projects that use recog content can insert the value
# extracted from the banner/other data via regex capturing groups
fp_data = {
'vendor': None,
'product': None,
'version': '-',
}
for fp_datum in fp_data:
fp_datum_param_name = "{}.{}".format(fp_type, fp_datum)
if fp_datum_param_name in params[fp_type]:
fp_datum_e = params[fp_type][fp_datum_param_name]
if fp_datum_e.attrib['pos'] == '0':
fp_data[fp_datum] = fp_datum_e.attrib['value']
else:
fp_data[fp_datum] = "{{{}}}".format(fp_datum_e.attrib['name'])
vendor = fp_data['vendor']
product = fp_data['product']
version = fp_data['version']
# build a reasonable looking CPE value from the vendor/product/version,
# lowercasing, replacing whitespace with _, and more
if vendor and product:
if not cpe_type in cpe_vp_map:
logging.error("Didn't find CPE type '%s' for '%s' '%s'", cpe_type, vendor, product)
continue
vendor = vendor.lower().replace(' ', '_').replace(',', '')
product = product.lower().replace(' ', '_').replace(',', '').replace('!', '%21')
if 'unknown' in [vendor, product]:
continue
if (vendor.startswith('{') and vendor.endswith('}')) or (product.startswith('{') and product.endswith('}')):
continue
success, vendor, product = lookup_cpe(vendor, product, cpe_type, cpe_vp_map, r7_vp_map)
if not success:
continue
# Sanity check the value to ensure that no invalid values will
# slip in due to logic or mapping bugs.
# If it's not in the official NIST list then log it and kick it out
if product not in cpe_vp_map[cpe_type][vendor]:
logging.error("Invalid CPE type %s created for vendor %s and product %s. This may be due to an invalid mapping.", cpe_type, vendor, product)
continue
# building the CPE string
# Last minute escaping of '/' and `!`
product = product.replace('/', '\/').replace('%21', '\!')
cpe_value = 'cpe:/{}:{}:{}'.format(cpe_type, vendor, product)
if version:
cpe_value += ":{}".format(version)
cpe_param = etree.Element('param')
cpe_param.attrib['pos'] = '0'
cpe_param.attrib['name'] = '{}.cpe23'.format(fp_type)
cpe_param.attrib['value'] = cpe_value
for param_name in params[fp_type]:
param = params[fp_type][param_name]
parent = param.getparent()
index = parent.index(param) + 1
parent.insert(index, cpe_param)
root = doc.getroot()
with open(xml_file, 'wb') as xml_out:
xml_out.write(etree.tostring(root, pretty_print=True, xml_declaration=True, encoding=doc.docinfo.encoding))
if __name__ == '__main__':
try: sys.exit(main())
except KeyboardInterrupt: pass