#!/usr/bin/python3
"""
Copyright(C) 2014-2018, Stamus Networks
Written by Eric Leblond <[email protected]>
"""
import argparse
import logging
import time
import json
from pygtail import Pygtail
from subprocess import call
import requests
import os
import sys
import yaml
from elasticsearch import Elasticsearch
import redis
from datetime import datetime, timedelta
import signal
from threading import Thread
from queue import Queue
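# MISP text-export endpoints: each returns a plain-text list of attribute
# values (one per line) for the given IOC type.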
MISP_URLS = { 'hostname': 'attributes/text/download/hostname', 'domain': 'attributes/text/download/domain',
'url': 'attributes/text/download/url' }
q = Queue()
must_exit = False
have_daemon = True
try:
import daemon
except ImportError:
logging.warning("No daemon support available, install python-daemon if this feature is needed")
have_daemon = False
conf_parser = argparse.ArgumentParser(add_help = False)
conf_parser.add_argument("-c", "--conf_file",
metavar="FILE")
args, remaining_argv = conf_parser.parse_known_args()
config = {}
if args.conf_file:
with open(args.conf_file, 'r') as conffile:
config = yaml.safe_load(conffile)
def get_from_conf(config, key, def_value):
if key in config:
return config[key]
else:
return def_value
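# Illustrative configuration file (values are examples only; the keys mirror
# the get_from_conf() lookups below and the optional 'instances' section
# handled in main_task):
#
#   url: https://misp.example.org/
#   apikey: YOUR_MISP_API_KEY
#   basedir: /var/lib/surimisp/
#   interval: 3600
#   files: [/var/log/suricata/eve.json]
#   alerts: /var/log/suricata/ioc.json
#   workers: 2
#   whitelist: [example.com]
#   proxy_params: {https: 'http://proxy.example.org:3128'}
#   instances:
#     probe1:
#       files: [/var/log/suricata/probe1/eve.json]
#       alerts: /var/log/suricata/probe1/ioc.json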
PROXY_PARAMS = get_from_conf(config, 'proxy_params', None)
WHITELIST = get_from_conf(config, 'whitelist', [])
parser = argparse.ArgumentParser(description='Suricata MISP IOC script')
parser.add_argument("-c", "--conf_file", help='Set config file to use')
parser.add_argument('-f', '--files', default=get_from_conf(config, 'files', '/var/log/suricata/eve.json'), help='JSON files to monitor', nargs='+')
parser.add_argument('-a', '--alerts', default=get_from_conf(config, 'alerts', '/var/log/suricata/ioc.json'), help='JSON file to store events to')
parser.add_argument('-v', '--verbose', default=get_from_conf(config, 'verbose', False), action="count", help="Show verbose output, use multiple times to increase verbosity")
parser.add_argument('-l', '--log', default=get_from_conf(config, 'log', None), help='File to log output to (default to stdout)')
parser.add_argument('-b', '--batch', default=get_from_conf(config, 'batch', False), action="store_true", help="Read file and exit at end")
parser.add_argument('-w', '--workers', default=get_from_conf(config, 'workers', 1), type=int, help='Number of alert workers to start')
parser.add_argument('-u', '--url', default=get_from_conf(config, 'url', None), help='Base URL of the MISP server to fetch IOC lists from; if unset, no refresh is done')
parser.add_argument('-e', '--elasticsearch', default=get_from_conf(config, 'elasticsearch', None), help='Elasticsearch server to use as event input; if unset, read from file')
parser.add_argument('-r', '--redis', default=get_from_conf(config, 'redis', None), help='Redis server to use as event input; if unset, read from file')
parser.add_argument('-i', '--interval', default=get_from_conf(config, 'interval', 3600), type=int, help='Interval between IOC list updates, in seconds')
parser.add_argument('-d', '--basedir', default=get_from_conf(config, 'basedir', '/var/lib/surimisp/'), help='Directory where fetched IOC data is stored')
parser.add_argument('-k', '--apikey', default=get_from_conf(config, 'apikey', None), help='API key to use')
parser.add_argument('-S', '--strict', default=get_from_conf(config, 'strict', False), action="store_true", help='Be strict on TLS checks')
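# Static 'alert' sub-objects attached to matching events so that the output
# record mimics a regular Suricata alert.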
ALERT_SUBOBJECT = { "hostname": { "action": "allowed", "category": "Misc Attack", "gid": 1, "rev": 4, "severity": 3, "signature": "IOC alert on HTTP/TLS hostname", "signature_id": 1 },
"domain": { "action": "allowed", "category": "Misc Attack", "gid": 1, "rev": 4, "severity": 3, "signature": "IOC alert on DNS request name", "signature_id": 2 },
"url": { "action": "allowed", "category": "Misc Attack", "gid": 1, "rev": 4, "severity": 3, "signature": "IOC alert on HTTP url", "signature_id": 3 },
"ip": { "action": "allowed", "category": "Misc Attack", "gid": 1, "rev": 4, "severity": 3, "signature": "IOC alert on IP address", "signature_id": 4 } }
if have_daemon:
parser.add_argument('-D', '--daemon', default=False, action="store_true", help="Run as unix daemon")
args = parser.parse_args(remaining_argv)
if args.url and not args.apikey:
print("URL specified and no API key aborting.")
sys.exit(1)
if args.verbose >= 3:
loglevel=logging.DEBUG
elif args.verbose >= 2:
loglevel=logging.INFO
elif args.verbose >= 1:
loglevel=logging.WARNING
else:
loglevel=logging.ERROR
hostname_list = None
domain_list = None
url_list = None
count = { 'hostname': 0, 'url': 0, 'domain': 0 }
def setup_logging(args):
if args.log:
logging.basicConfig(filename=args.log,
format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s',
level=loglevel)
else:
logging.basicConfig(level=loglevel)
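# Download the MISP attribute exports listed in MISP_URLS and write each one
# to a file named after its IOC type under basedir.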
def fetch_data(baseurl, basedir):
session = requests.Session()
session.headers.update(
{'Authorization': args.apikey }
)
for url in MISP_URLS:
resp = session.get(baseurl + MISP_URLS[url], proxies = PROXY_PARAMS, verify = args.strict)
fpath = os.path.join(basedir, url)
fdata = open(fpath, 'w')
fdata.write(resp.content.decode('utf-8'))
fdata.close()
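# Load one IOC file into a set, skipping entries that end with a whitelisted
# domain.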
def load_data(dfile):
iocfile = open(dfile, 'r')
entries = []
for line in iocfile:
entry = line.rstrip('\n')
skip = False
for domain in WHITELIST:
if entry.endswith(domain):
skip = True
break
if not skip:
entries.append(entry)
return set(entries)
def load_all_data(basedir):
global hostname_list
global domain_list
global url_list
hostname_list = load_data(os.path.join(basedir, 'hostname'))
domain_list = load_data(os.path.join(basedir, 'domain'))
url_list = load_data(os.path.join(basedir, 'url'))
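# The check_* functions look up the relevant event fields in the IOC sets;
# on a match the event is tagged with the IOC type, the corresponding alert
# sub-object is attached and the event is pushed onto the queue.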
def check_http(event, queue = None):
global count
if queue is None:
queue = q
try:
if event['http']['hostname'] in hostname_list:
event['ioc'] = 'hostname'
event['alert'] = ALERT_SUBOBJECT['hostname']
queue.put(event)
count['hostname'] = count['hostname'] + 1
if event['http']['url'] in url_list:
event['ioc'] = 'url'
event['alert'] = ALERT_SUBOBJECT['url']
queue.put(event)
count['url'] = count['url'] + 1
except Exception:
# Event lacks the expected HTTP fields; skip it.
pass
def check_dns(event, queue = None):
if queue is None:
queue = q
try:
if event['dns']['rrname'] in domain_list:
event['ioc'] = 'domain'
event['alert'] = ALERT_SUBOBJECT['domain']
queue.put(event)
count['domain'] = count['domain'] + 1
except Exception:
# Event lacks the expected DNS fields; skip it.
pass
def check_tls(event, queue = None):
if queue is None:
queue = q
try:
if event['tls']['sni'] in hostname_list:
event['ioc'] = 'hostname'
event['alert'] = ALERT_SUBOBJECT['hostname']
queue.put(event)
count['hostname'] = count['hostname'] + 1
except Exception:
# Event lacks the expected TLS fields; skip it.
pass
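# Worker thread: pull matched events from the queue, relabel them as alerts
# and append them to the output file.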
def AlertSender(mode = 'file', alerts = None, queue = None):
global must_exit
while True:
event = queue.get()
# Switch event to alert
event['event_type'] = 'alert'
if mode == 'file':
alerts.write(json.dumps(event) + '\n')
alerts.flush()
queue.task_done()
if must_exit:
return
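# Background thread: periodically re-download the IOC lists and reload the
# in-memory sets.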
def FetchData(interval = 3600, url = None, basedir = None):
while 1:
time.sleep(float(interval))
logging.info("Updating IOC lists")
fetch_data(url, basedir)
load_all_data(basedir)
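# Parse EVE JSON lines and dispatch http/dns/tls events to the matching
# check_* function.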
def parse_source_lines(source, queue = None):
for line in source:
try:
event = json.loads(line)
except json.decoder.JSONDecodeError:
continue
if 'event_type' in event:
if event['event_type'] == 'http':
check_http(event, queue = queue)
elif event['event_type'] == 'dns':
check_dns(event, queue = queue)
elif event['event_type'] == 'tls':
check_tls(event, queue = queue)
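# Read Suricata EVE JSON either in batch mode (whole files, then exit) or by
# tailing a single file with Pygtail.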
def TreatJsonFile(args = None, source = None, queue = None):
global must_exit
if args.batch:
for logfile in args.files:
source = open(logfile, 'r')
start_time = time.perf_counter()
parse_source_lines(source, queue = queue)
end_time = time.perf_counter()
logging.info("Matching on '%s' took %fs" % (logfile, end_time - start_time))
logging.info("Count: " + repr(count))
else:
while 1:
logfile = Pygtail(source)
parse_source_lines(logfile, queue = queue)
if must_exit:
return
time.sleep(0.1)
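# Consume events published on the hardcoded logstash-http/logstash-dns Redis
# pub/sub channels.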
def treat_redis_publisher(args):
r = redis.StrictRedis(host=args.redis)
p = r.pubsub()
# FIXME hardcoded ....
p.psubscribe('logstash-http', 'logstash-dns')
while 1:
msg = p.get_message()
if msg:
if msg['type'] == 'pmessage':
event = json.loads(msg['data'])
if 'event_type' in event:
if event['event_type'] == 'http':
check_http(event)
elif event['event_type'] == 'dns':
check_dns(event)
else:
time.sleep(0.1)
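# Consume events by popping them from the hardcoded 'logstash-events' Redis
# list (each message is moved to the 'logstash' list).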
def treat_redis(args):
r = redis.StrictRedis(host=args.redis)
while 1:
# FIXME hardcoded ....
msg = r.rpoplpush('logstash-events', 'logstash')
if msg:
event = json.loads(msg)
if 'event_type' in event:
if event['event_type'] == 'http':
check_http(event)
elif event['event_type'] == 'dns':
check_dns(event)
else:
time.sleep(0.1)
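# Pull http/dns/tls events from Elasticsearch over a fixed one-hour window
# using a scan/scroll query.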
def treat_elasticsearch(args):
es = Elasticsearch([args.elasticsearch])
# FIXME real date
orig_timestamp_str = '2014-03-08T13:43:22.551756'
orig_timestamp = datetime.strptime(orig_timestamp_str,'%Y-%m-%dT%H:%M:%S.%f')
end_timestamp = orig_timestamp + timedelta(hours = 1)
query = '(event_type:http OR event_type:dns OR event_type:tls) AND timestamp:["%s" TO "%s"}' % (orig_timestamp.strftime('%Y-%m-%dT%H:%M:%S.%f'), end_timestamp.strftime('%Y-%m-%dT%H:%M:%S.%f'))
results = es.search(q = query, index='_all', ignore_unavailable = True, scroll = 30, size = 20, search_type = 'scan')
scroll_id = results['_scroll_id']
while 1:
for entry in results['hits']['hits']:
event = entry['_source']
if 'event_type' in event:
if event['event_type'] == 'http':
check_http(event)
elif event['event_type'] == 'dns':
check_dns(event)
print("Scrolling once more")
results = es.scroll(scroll_id = scroll_id, scroll = 10)
print(results['hits']['hits'][0])
def sigterm_handler(_signo, _stack_frame):
global must_exit
logging.info("Exiting program due to signal")
must_exit = True
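# Main processing: optionally fetch and periodically refresh the IOC lists,
# load them, start the alert-writer threads and feed events from the
# configured input (instances from the config file, a file, Redis or
# Elasticsearch).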
def main_task(args):
global must_exit
setup_logging(args)
if args.url:
fetch_data(args.url,args.basedir)
if args.interval:
t = Thread(target=FetchData, kwargs = {'interval': args.interval, 'url': args.url, 'basedir': args.basedir })
t.daemon = True
t.start()
load_all_data(args.basedir)
signal.signal(signal.SIGTERM, sigterm_handler)
signal.signal(signal.SIGINT, sigterm_handler)
if 'instances' in config:
for instance in list(config['instances'].keys()):
inst_config = config['instances'][instance]
alerts = open(inst_config['alerts'], 'a+')
queue = Queue()
for i in range(args.workers):
t = Thread(target=AlertSender, kwargs = {'mode': 'file', 'alerts': alerts, 'queue': queue })
t.daemon = True
t.start()
for logfile in inst_config['files']:
logging.info("Add reader for file '%s' with output '%s'" % ( logfile, inst_config['alerts'] ))
t = Thread(target=TreatJsonFile, kwargs = {'source': logfile, 'args': args, 'queue': queue})
t.daemon = True
t.start()
while 1:
if must_exit:
break
try:
time.sleep(0.1)
except KeyboardInterrupt:
must_exit = True
return
return
alerts = open(args.alerts, 'a+')
for i in range(args.workers):
t = Thread(target=AlertSender, kwargs = {'mode': 'file', 'alerts': alerts, 'queue': q })
t.daemon = True
t.start()
if args.batch:
start_time = time.perf_counter()
if args.batch:
end_time = time.perf_counter()
logging.info("Building sets took %fs" % (end_time - start_time))
if args.redis:
treat_redis(args)
elif args.elasticsearch:
treat_elasticsearch(args)
else:
if args.batch:
TreatJsonFile(args = args)
else:
for logfile in args.files:
t = Thread(target=TreatJsonFile, kwargs = {'source': logfile, 'args': args})
t.daemon = True
t.start()
while 1:
if must_exit:
break
try:
time.sleep(0.1)
except KeyboardInterrupt:
must_exit = True
return
if have_daemon and args.daemon:
with daemon.DaemonContext():
main_task(args)
else:
main_task(args)