from common_methods import *
import requests
import json
import syslog
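def add(indicator, settings, reference_sets):
    """Place one observable in the QRadar reference set chosen for its kind.

    The branch is selected from the keys present in ``indicator``: ``type``
    (``'Address - ipv4-addr'`` or ``'A'``), ``md5``, ``x_mailer`` (a mail
    observable, whose ``from`` address is stored) or the pair
    ``organization_id``/``email_address`` (a targeted user).

    Args:
        indicator: dict describing the observable.
        settings: configuration dict; ``settings[reference_sets]`` maps an
            observable kind (``type`` value, ``'md5'``, ``'email'``,
            ``'userid'``) to the name of the reference set that receives it.
        reference_sets: key into ``settings`` selecting the confidence bucket
            (e.g. ``'high_reference_sets'``).

    Returns:
        True when a branch matched and the write was attempted (the outcome of
        the HTTP call itself is only logged by ``add_to_reference_set``),
        False when the observable kind is unknown.

    Raises:
        KeyError: when the map has no entry for the kind, or the indicator
            lacks the value field the matched branch reads.
    """
    reference_set_map = settings[reference_sets]
    kind = indicator.get('type')
    if kind == 'Address - ipv4-addr':
        # IPv4 address; IPv6 has no mapping and falls through to the log below
        add_to_reference_set(reference_set_map[kind], indicator['ip'], get_sources(indicator), settings)
        return True
    if kind == 'A':
        # domain (A record)
        add_to_reference_set(reference_set_map[kind], indicator['domain'], get_sources(indicator), settings)
        return True
    if 'md5' in indicator:
        # file hash; an empty md5 is skipped but still counts as handled
        if indicator['md5']:
            add_to_reference_set(reference_set_map['md5'], indicator['md5'], get_sources(indicator), settings)
        return True
    if 'x_mailer' in indicator:
        # mail observable: assumed spearphish, so the sender (<<from>>) is the indicator
        add_to_reference_set(reference_set_map['email'], indicator['from'], get_sources(indicator), settings)
        return True
    if 'organization_id' in indicator and 'email_address' in indicator:
        # targeted user: address and user id go to separate sets, each only when non-empty
        if indicator['email_address']:
            add_to_reference_set(reference_set_map['email'], indicator['email_address'], get_sources(indicator), settings)
        if indicator['organization_id']:
            add_to_reference_set(reference_set_map['userid'], indicator['organization_id'], get_sources(indicator), settings)
        return True
    syslog.syslog('nyx->QRadar: I do not know how to handle the following observable: %s' % str(indicator))
    return False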
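def add_to_reference_set(qset, value, source, settings):
    """Add ``value`` to the ``qset`` reference set, sending ``source`` with it.

    Args:
        qset: name of the QRadar reference set (taken from configuration and
            appended verbatim to the URL path).
        value: indicator value to store; sent as the ``value`` query parameter.
        source: provenance text; sent as the ``source`` query parameter.
        settings: configuration dict with ``base_url`` (must end with a slash,
            the API path is concatenated to it) and the ``SEC`` API token.
            Optional ``verify`` (bool or CA-bundle path) and ``timeout``
            (seconds) are passed through to ``requests``.

    Returns:
        True on HTTP 200/201, False otherwise; both outcomes are logged.
    """
    headers = {'Version': '4.0', 'Accept': 'application/json', 'SEC': settings['SEC']}
    parameters = {'value': value, 'source': source}
    # NOTE(security): certificate verification is off unless settings['verify']
    # is True or a CA-bundle path; the False default only preserves the
    # previous behaviour. timeout=None likewise keeps the previous "wait forever".
    resp = requests.post(
        settings['base_url'] + 'reference_data/sets/' + qset,
        headers=headers,
        params=parameters,
        verify=settings.get('verify', False),
        timeout=settings.get('timeout'),
    )
    if resp.status_code in (200, 201):
        syslog.syslog(syslog.LOG_INFO, 'nyx->QRadar: successfully added %s to reference set: %s' % (value, qset))
        return True
    # status code is prefixed without a separator; kept for log-parsing compatibility
    syslog.syslog(syslog.LOG_ERR, str(resp.status_code) + 'nyx->QRadar: Unable to add %s to reference set: %s' % (value, qset))
    return False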
def add_ip(ip, settings, intel_list, tags):
    """Store an IP in the pre-established reference set ``intel_list``.

    ``tags`` is forwarded as the element's source; whether the control keeps
    it is up to QRadar. Returns what ``add_to_reference_set`` returns.
    """
    outcome = add_to_reference_set(intel_list, ip, tags, settings)
    return outcome
def add_domain(domain, settings, intel_list, tags):
    """Store a domain in the pre-established reference set ``intel_list``.

    ``tags`` is forwarded as the element's source; whether the control keeps
    it is up to QRadar. Returns what ``add_to_reference_set`` returns.
    """
    outcome = add_to_reference_set(intel_list, domain, tags, settings)
    return outcome
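def remove_from_reference_set(qset, value, settings):
    """Delete ``value`` from the ``qset`` reference set.

    Args:
        qset: name of the QRadar reference set (from configuration).
        value: the element to delete; it becomes the last URL path segment.
        settings: configuration dict with ``base_url`` (trailing slash
            required) and the ``SEC`` token; optional ``verify`` and
            ``timeout`` are passed through to ``requests``.

    Returns:
        True on HTTP 200/201, False otherwise; both outcomes are logged.
    """
    try:
        from urllib.parse import quote
    except ImportError:  # Python 2
        from urllib import quote
    headers = {'Version': '2.0', 'Accept': 'application/json', 'SEC': settings['SEC']}
    # The value is part of the URL path, so it must be percent-encoded: an
    # indicator containing '/', '?' or '#' (a URL-like domain entry, for
    # instance) would otherwise change which resource the DELETE addresses.
    url = settings['base_url'] + 'reference_data/sets/' + qset + '/' + quote(value, safe='')
    # NOTE(security): verification is off unless settings['verify'] is set;
    # the default only preserves the previous behaviour.
    resp = requests.delete(
        url,
        headers=headers,
        verify=settings.get('verify', False),
        timeout=settings.get('timeout'),
    )
    if resp.status_code in (200, 201):
        syslog.syslog(syslog.LOG_INFO, 'nyx->QRadar: deleted %s from reference set: %s' % (value, qset))
        return True
    syslog.syslog(syslog.LOG_ERR, str(resp.status_code) + 'nyx->QRadar: Unable to delete %s from reference set: %s' % (value, qset))
    return False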
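def list_reference_set(qset, settings):
    """Return the elements of reference set ``qset`` as the API's ``data`` list.

    Two GETs are made: one with ``limit=0`` to read the set's metadata (per
    the original comment, a metadata-only probe), then one with ``limit`` set
    to ``number_of_elements`` to fetch everything.

    Args:
        qset: name of the QRadar reference set.
        settings: configuration dict with ``base_url`` and ``SEC``; optional
            ``verify`` and ``timeout`` are passed through to ``requests``.

    Returns:
        The list of element dicts (each with at least a ``value`` key as used
        by the callers in this file), or ``[]`` when the set is empty or
        either request fails (failures are logged).
    """
    headers = {'Version': '4.0', 'Accept': 'application/json', 'SEC': settings['SEC']}
    url = settings['base_url'] + 'reference_data/sets/' + qset
    verify = settings.get('verify', False)  # NOTE(security): off unless configured
    timeout = settings.get('timeout')
    params = {'limit': 0}
    res = requests.get(url, headers=headers, params=params, verify=verify, timeout=timeout)
    if res.status_code != 200:
        # the original logged resp.status_code here, a name that is unbound in
        # this branch and raised NameError instead of returning []
        syslog.syslog(syslog.LOG_ERR, str(res.status_code) + 'nyx->QRadar: Unable to read reference set: %s' % qset)
        return []
    metadata = json.loads(res.text)
    count = metadata['number_of_elements']
    if count <= 0:
        return []
    params['limit'] = count
    resp = requests.get(url, headers=headers, params=params, verify=verify, timeout=timeout)
    if resp.status_code != 200:
        syslog.syslog(syslog.LOG_ERR, str(resp.status_code) + 'nyx->QRadar: Unable to read reference set: %s' % qset)
        return []
    return json.loads(resp.text)['data']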
def list_ips(settings):
    """Index every IP held in the intel reference sets.

    Iterates the set names in ``settings['map']['ip'].values()`` and maps
    each element's ``value`` to the set it was read from; a value present in
    several sets keeps the last set visited.
    """
    return {
        element['value']: set_name
        for set_name in settings['map']['ip'].values()
        for element in list_reference_set(set_name, settings)
    }
def list_domains(settings):
    """Index every domain held in the intel reference sets.

    Iterates the set names in ``settings['map']['domain'].values()`` and maps
    each element's ``value`` to the set it was read from; a value present in
    several sets keeps the last set visited.
    """
    return {
        element['value']: set_name
        for set_name in settings['map']['domain'].values()
        for element in list_reference_set(set_name, settings)
    }
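def _purge_orphans(qset, kind, confidence, obs_index, settings):
    """Delete from ``qset`` every element whose value is not in ``obs_index[kind][confidence]``."""
    for element in list_reference_set(qset, settings):
        # looked up per element, so an empty set never touches obs_index
        if element['value'] not in obs_index[kind][confidence]:
            # orphan: no observable at this confidence backs it any more
            remove_from_reference_set(qset, element['value'], settings)


def qradar_sets_cleanup(obs_index, settings):
    """Remove outdated indicators from the confidence-bucketed reference sets.

    For each (kind, confidence) pair below, the reference set named by
    ``settings['<confidence>_reference_sets'][kind]`` is read and every
    element absent from ``obs_index[kind][confidence]`` is deleted. Only the
    high-confidence userid set is cleaned; there is no medium userid set.

    Caveat: an empty bucket in ``obs_index`` empties the matching set, so
    callers should not run this after an upstream fetch that produced no
    observables.

    Args:
        obs_index: nested mapping kind -> confidence -> container of values
            that should remain in QRadar.
        settings: configuration dict with ``high_reference_sets`` and
            ``medium_reference_sets`` maps, plus what ``list_reference_set``
            and ``remove_from_reference_set`` need.

    Raises:
        KeyError: when a configured set or an ``obs_index`` bucket is missing.
    """
    # processed in the same order as before: IPs, domains, hashes, emails, userids
    plan = [
        ('Address - ipv4-addr', 'high'), ('Address - ipv4-addr', 'medium'),
        ('A', 'high'), ('A', 'medium'),
        ('md5', 'high'), ('md5', 'medium'),
        ('email', 'high'), ('email', 'medium'),
        ('userid', 'high'),
    ]
    for kind, confidence in plan:
        qset = settings[confidence + '_reference_sets'][kind]
        _purge_orphans(qset, kind, confidence, obs_index, settings)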
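def validate_qradar(settings):
    """Ensure every set in ``settings['sets_to_validate']`` exists in QRadar.

    Adding indicators to QRadar reference sets. Keep in mind the
    categorization matrix, and use the indicators in the appropriate buckets.
    For this example here, we are using the following indicator buckets:
    -> Intel.High.Hashes, Intel.Medium.Hashes for MD5s
    -> Intel.High.IPs, Intel.Medium.IPs for IP addresses (ipv4)
    -> Intel.High.Domains, Intel.Medium.Domains for FQDNs

    ``sets_to_validate`` maps a set name to the ``element_type`` used when the
    set has to be created.

    Returns:
        True when every configured set exists or was created, False when the
        list of existing sets could not be retrieved.

    Raises:
        SystemExit: when a missing set cannot be created (exit status -1, the
            same termination the previous ``exit(-1)`` produced).
    """
    headers = {'Version': '2.0', 'Accept': 'application/json', 'SEC': settings['SEC']}
    url = settings['base_url'] + 'reference_data/sets'
    verify = settings.get('verify', False)  # NOTE(security): off unless configured
    timeout = settings.get('timeout')
    resp = requests.get(url, headers=headers, verify=verify, timeout=timeout)
    if resp.status_code != 200:
        # an error body is not the list of sets the loop below expects; stop
        # here instead of failing on it
        syslog.syslog(syslog.LOG_ERR, str(resp.status_code) + 'nyx->QRadar: Unable to list reference sets')
        return False
    existing = set(qset['name'] for qset in json.loads(resp.text))
    sets_to_validate = settings['sets_to_validate']
    for vset in sets_to_validate:
        if vset in existing:
            continue
        parameters = {'name': vset, 'element_type': sets_to_validate[vset]}
        resp = requests.post(url, headers=headers, params=parameters, verify=verify, timeout=timeout)
        if resp.status_code != 201:
            syslog.syslog(syslog.LOG_ERR, 'nyx->QRadar: Unable to create additional reference set: %s' % vset)
            raise SystemExit(-1)
        # keep going: the previous version returned here, so at most one
        # missing set was created per run
        syslog.syslog(syslog.LOG_INFO, 'nyx->QRadar: Created reference set: %s' % vset)
    return True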