-
Notifications
You must be signed in to change notification settings - Fork 9
/
virustotal2.py
484 lines (401 loc) · 18.1 KB
/
virustotal2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
#!/usr/bin/env python
import base64
import threading
from itertools import izip_longest
import os
import urlparse
import re
import json
import time
import hashlib
import requests
class VirusTotal2(object):
_SCAN_ID_RE = re.compile(r"^[a-fA-F0-9]{64}-[0-9]{10}$")
def __init__(self, api_key, limit_per_min=None):
self.api_key = api_key
self._urls_per_retrieve = 4
self._hashes_per_retrieve = 4
self._ips_per_retrieve = 1
self._domains_per_retrieve = 1
self._urls_per_scan = 4
self._hashes_per_scan = 25
self._files_per_scan = 1
self.limits = []
self.limit_lock = threading.Lock()
if limit_per_min:
self.limit_per_min = limit_per_min
else:
self.limit_per_min = 4
#we only scan files and URLs
def scan(self, thing, thing_type=None, raw=False, rescan=False):
"""
Submit a file to or URL to VirusTotal for scanning.
Returns a VirusTotal2Report object
Keyword arguments:
thing - a file name on the local system or a URL or list of URLs
thing_type - Optional, a hint to the function as to what you are sending it
raw - Optional, if True return the raw JSON output from VT
Raises a TypeError if it gets something other than a file or URL/list of URLs
Raises an TypeError if VirusTotal returns something we can't parse.
"""
#identify the thing
thing_id = self._whatisthing(thing)
if thing_type is None:
thing_type = thing_id
data = {"apikey": self.api_key}
#set up and execute the query based on what the thing is
#we can only take URLs and Files for scan()
if thing_type == "url":
endpoint = "https://www.virustotal.com/vtapi/v2/url/scan"
if isinstance(thing, list):
data["url"] = "\n".join(thing)
else:
data["url"] = thing
self._limit_call_handler()
result = requests.post(endpoint, data=data).text
elif thing_type == "file_name" or thing_type == "base64":
with open(thing, 'rb') as f:
if thing_type == "base64":
content = base64.b64decode(f.read())
else:
content = f.read()
if rescan:
endpoint = "https://www.virustotal.com/vtapi/v2/file/rescan"
data["resource"] = hashlib.sha256(content).hexdigest()
self._limit_call_handler()
result = requests.post(endpoint, data=data).text
else:
endpoint = "https://www.virustotal.com/vtapi/v2/file/scan"
self._limit_call_handler()
result = requests.post(endpoint, data=data, files={"file": (os.path.basename(thing), content)}).text
elif thing_type == "hash":
if rescan:
endpoint = "https://www.virustotal.com/vtapi/v2/file/rescan"
if isinstance(thing, list):
data["resource"] = ", ".join(thing)
else:
data["resource"] = thing
self._limit_call_handler()
result = requests.post(endpoint, data=data).text()
else:
raise TypeError("Hahses can only be re-scanned, please set rescan=True")
else:
raise TypeError("Unable to scan type '"+thing_type+".")
#should we just return raw JSON?
if raw:
return result
return self._generate_report(result, thing_id, thing)
def retrieve(self, thing, thing_type=None, raw=False):
"""
Retrieve a report from VirusTotal based on a hash, IP, domain, file or URL. NOTE: URLs must include the scheme
(e.g. http://)
Returns a VirusTotal2Report object
Keyword arguments:
thing - a file name on the local system, a URL or list of URLs,
an IP or list of IPs, a domain or list of domains, a hash or list of hashes
thing_type - Optional, a hint to the function as to what you are sending it
raw - Optional, if True return the raw JSON output from VT
Raises a TypeError if it gets something other than a filename, URL, IP domain or hash
Raises an TypeError if VirusTotal returns something we can't parse.
"""
#trust the user-supplied type over the automatic identification
thing_id = self._whatisthing(thing)
if thing_type is None:
thing_type = thing_id
data = {"apikey": self.api_key}
if thing_type == "url":
endpoint = "http://www.virustotal.com/vtapi/v2/url/report"
if isinstance(thing, list):
#break the list we get into groups of 4, per the VT API limits
list_of_lists = self._grouped(thing, self._urls_per_retrieve)
list_of_results = []
#this is going to get a little messy
#we're gong to re-group the list of URLs we get into groups of 4, send that query to VT
#and tack the results onto list_of_results. The results we get back are actually going to be one
#JSON list per query. We're then going to convert the JSON into lists and dicts, mash it all together
#into a single list of dicts, convert it back to JSON and send it on it's way.
#
#API limits? What API limits?
#
for group in list_of_lists:
data["resource"] = "\n".join([url for url in group if url is not None])
self._limit_call_handler()
try:
ret = json.loads(requests.post(endpoint, data=data).text)
except:
raise TypeError
if not isinstance(ret, list):
#if we get a list of URLs that is N+1 (e.g. 5, 9, 13) the last query will not return a list
ret = [ret]
for item in ret:
list_of_results.append(item)
result = json.dumps(list_of_results)
else:
data["resource"] = thing
self._limit_call_handler()
result = requests.post(endpoint, data=data).text
elif thing_type == "ip":
endpoint = "http://www.virustotal.com/vtapi/v2/ip-address/report"
if not isinstance(thing, list):
thing = [thing]
#
#Much like the URL query above, we turn a list of N IPs into N separate queries and aggregate the results
#but in this case the API only supports one IP at a time, so we're spared the tedious de-aggregating
#and re-aggregating of list objects.
#
list_of_results = []
for ip in thing:
data["ip"] = ip
self._limit_call_handler()
try:
ret = json.loads(requests.get(endpoint, params=data).text)
except:
raise TypeError
list_of_results.append(ret)
#if we're only dealing with a single IP, don't return a list of a single element, just
#return the element. This seems to make more intuitive sense than the alternative...
#although what probably makes the most sense is to match how we're called.
if len(list_of_results) == 1:
list_of_results = list_of_results[0]
result = json.dumps(list_of_results)
elif thing_type == "file_name" or thing_type == "base64":
endpoint = "http://www.virustotal.com/vtapi/v2/file/report"
hashes = []
if not isinstance(thing, list):
thing = [thing]
for f in thing:
fh = open(f, 'rb')
if thing_type == "base64":
content = base64.b64decode(fh.read())
else:
content = fh.read()
hashval = hashlib.sha256(content).hexdigest()
hashes.append(hashval)
data["resource"] = ", ".join(hashes)
self._limit_call_handler()
result = requests.post(endpoint, data=data).text
elif thing_type == 'domain':
endpoint = "http://www.virustotal.com/vtapi/v2/domain/report"
#domains don't support bulk queries
if isinstance(thing, list):
raise TypeError
data["domain"] = thing
self._limit_call_handler()
result = requests.get(endpoint, params=data).text
elif thing_type == 'hash':
endpoint = "http://www.virustotal.com/vtapi/v2/file/report"
if isinstance(thing, list):
data["resource"] = ", ".join(thing)
else:
data["resource"] = thing
self._limit_call_handler()
result = requests.post(endpoint, data=data).text
elif thing_type == "scanid":
#The virustotal API doesn't have a single endpoint for scanIDs. You need to submit URL scanIDs
#to the URL endpoint, file scanIDs to the file endpoint, etc. Therefore, we can do nothing with a
#scanID or array of scanIDs unless you specify the thing_type yourself.
raise TypeError("Can't infer the proper endpoint when given scanIDs without a thing_type that is not scanID")
else:
raise TypeError("Unable to scan type '"+thing_type+".")
#should we just return raw JSON?
if raw:
return result
return self._generate_report(result, thing_id, thing)
def _generate_report(self, result, thing_id, thing):
"""
Generate a VirusTotal2Report object based on the passed JSON
Returns a VirusTotal2Report object
Keyword arguments:
result - a JSON string to parse into a report.
thing - the item we're reporting on
thing_id - what kind of item thing is
Raises an TypeError if report is something we can't parse.
"""
report = []
if isinstance(result, basestring):
try:
obj = json.loads(result)
if isinstance(obj, dict):
#one result
report.append(VirusTotal2Report(obj, self, thing_id, thing))
else:
#obj is a list
for (i, rep) in enumerate(obj):
report.append(VirusTotal2Report(rep, self, thing_id, thing[i]))
except:
raise TypeError("VT String is unparsable: "+str(result))
else:
raise TypeError("VT String (which is not a string?) is unparsable: "+str(result))
return report if len(report) > 1 else report[0]
def _limit_call_handler(self):
"""
Ensure we don't exceed the N requests a minute limit by leveraging a thread lock
Keyword arguments:
None
"""
#acquire a lock on our threading.Lock() object
with self.limit_lock:
#if we have no configured limit, exit. the lock releases based on scope
if self.limit_per_min <= 0:
return
now = time.time()
#self.limits is a list of query times + 60 seconds. In essence it is a list of times
#that queries time out of the 60 second query window.
#this check expires any limits that have passed
self.limits = [l for l in self.limits if l > now]
#and we tack on the current query
self.limits.append(now + 60)
#if we have more than our limit of queries (and remember, we call this before we actually
#execute a query) we sleep until the oldest query on the list (element 0 because we append
#new queries) times out. We don't worry about cleanup because next time this routine runs
#it will clean itself up.
if len(self.limits) >= self.limit_per_min:
time.sleep(self.limits[0] - now)
def _grouped(self, iterable, n):
"""
take a list of items and return a list of groups of size n. Fill any missing values at the end with None
Keyword arguments:
n - the size of the groups to return
"""
return izip_longest(*[iter(iterable)] * n, fillvalue=None)
# noinspection PyTypeChecker
def _whatisthing(self, thing):
"""
Bucket the thing it gets passed into the list of items VT supports
Returns a sting representation of the type of parameter passed in
Keyword arguments:
thing - a parameter to identify
"""
if isinstance(thing, list):
thing = thing[0]
#per the API, bulk requests must be of the same type
#ignore that you can intersperse scan IDs and hashes for now
#...although, does that actually matter given the API semantics?
#we use basestring as the type to maintain unicode compliance in a python 2.x world
#https://stackoverflow.com/questions/1979004/what-is-the-difference-between-isinstanceaaa-basestring-and-isinstanceaaa
if isinstance(thing,basestring) and os.path.isfile(thing):
#thing==filename
# TODO: Add check for
if thing.endswith(".base64"):
return "base64"
else:
return "file_name"
#implied failure case, thing is neither a list or a file, so we assume string
if not isinstance(thing, basestring):
return "unknown"
# Test if thing parameter is a hash (32, 40, or 64 characters long)
if all(i in "1234567890abcdef" for i in str(thing).lower()) and len(thing) in [32, 40, 64]:
return "hash"
# Test if thing parameter is an IP address
elif all(i in "1234567890." for i in thing) and len(thing) <= 15:
return "ip"
# Test if thing parameter is a domain name
# TODO: Make this check stronger (technically "com" is a domain)
elif "." in thing and "/" not in thing:
return "domain"
# Test if thing parameter is a VirusTotal scan id
elif self._SCAN_ID_RE.match(thing):
return "scanid"
# Test if thing parameter is a URL
elif urlparse.urlparse(thing).scheme:
return "url"
# If nothing is identified, return "Unknown"
else:
return "unknown"
class VirusTotal2Report(object):
def __init__(self, obj, parent, thing_id, query):
super(VirusTotal2Report, self).__init__()
self.scan = parent
self._json = obj
self.type = thing_id
self.query = query
#initial API calls return response_code = 1
#we expect -2, as the scan is queued, so we update to get what we think we should have,
#
#TODO: we should only do this if we are scanning an object, not if we are getting a report
self.update()
def __repr__(self):
return "<VirusTotal2 report %s (%s)>" % (
self.id,
self.status,
)
def __iter__(self):
if self.type == "ip":
for resolution in self.resolutions.iteritems():
yield resolution
elif self.type == "domain":
for resolution in self.resolutions.iteritems():
yield resolution
elif self.type == "url":
for scanner, report in self.scans.iteritems():
yield (scanner, report["result"])
else:
for antivirus, report in self.scans.iteritems():
yield (
(antivirus, report["version"], report["update"]),
report["result"],
)
def __getattr__(self, attr):
item = {
"id": "resource",
"status": "verbose_msg",
}.get(attr, attr)
try:
return self._json[item]
except KeyError:
raise AttributeError(attr)
def update(self):
"""
Re-query the Virustotal API for new results on the current object. If the current object is listed as
not in VirusTotal (can be the case with IPs or domains), this function does nothing.
Keyword arguments:
none
Raises:
TypeError if we don't get JSON back from VT
"""
if self.response_code == 0:
#it wasn't there the first time. why try again?
# or we already have complete results. why update without a rescan?
return
if self.type in ("ip", "domain"):
data = self.scan.retrieve(self.query, raw=True)
elif self.type == "file_name" or self.type == "base64":
data = self.scan.retrieve(self.scan_id, thing_type="hash", raw=True)
else:
data = self.scan.retrieve(self.scan_id, thing_type=self.type, raw=True)
try:
self._json = json.loads(data)
except:
raise TypeError
def rescan(self):
#only applies to files and URLs
"""
Requests a rescan of the current file. This API only works for reports that have been generated from files or
hashes.
Keyword arguments:
none
Raises:
TypeError if we don't get JSON back from VT
"""
if self.type in ("file_name", "hash"):
data = self.scan.retrieve(self.scan_id, thing_type="hash", raw=True, rescan=True)
else:
raise TypeError("cannot rescan type "+self.type)
try:
self._json = json.loads(data)
except:
raise TypeError
def wait(self):
"""
Wait until the Virustotal API is done scanning the current object. If the current object is listed as not in
VirusTotal (can be the case with IPs or domains), or we already have results this function returns immediately.
Keyword arguments:
none
Raises:
TypeError if we don't get JSON back from VT (it would pass through from the update() function)
"""
interval = 60
self.update()
while self.response_code not in (1, 0):
time.sleep(interval)
self.update()