forked from yhy0/github-cve-monitor
-
Notifications
You must be signed in to change notification settings - Fork 0
/
github_cve_monitor.py
778 lines (702 loc) · 36.1 KB
/
github_cve_monitor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
#!/usr/bin/python3
# -*- coding:utf-8 -*-
# @Author : yhy&ddm&w4ter
# 每3分钟检测一次github是否有新的cve漏洞提交记录
# 建议使用screen命令运行在自己的linux vps后台上,就可以愉快的接收各种cve了
# https://my.oschina.net/u/4581868/blog/4380482
# https://github.com/kiang70/Github-Monitor
import requests
import json
import base64
import hmac
import time
import re
import datetime
import hashlib
import yaml
import sqlite3
import dingtalkchatbot.chatbot as cb
from lxml import etree
from collections import OrderedDict
# cve 仓库拥有者计数器(也就是黑名单,不过每天会清空重新计数),每天最多推送一个人名下的三个 cve 仓库,防止某些 sb 玩意
counter = {}
# 读取配置文件
def load_config():
with open('config.yaml', 'r', encoding='utf-8') as f:
config = yaml.load(f, Loader=yaml.FullLoader)
global github_headers, github_proxy, need_translate
github_headers = {
'Authorization': 'token {}'.format(config['all_config']['github_token'])
}
github_proxy = {}
if 'http' in config['all_config']['github_proxy']:
github_proxy = {'http': config['all_config']['github_proxy'], 'https': config['all_config']['github_proxy']}
need_translate = False
if int(config['all_config']['translate'][0]['enable']) == 1:
need_translate = True
if int(config['all_config']['dingding'][0]['enable']) == 1:
global dingding_enable, dingding_webhook, dingding_secretKey
dingding_enable = True
dingding_webhook = config['all_config']['dingding'][1]['webhook']
dingding_secretKey = config['all_config']['dingding'][2]['secretKey']
if int(config['all_config']['feishu'][0]['enable']) == 1:
global feishu_enable, feishu_webhook, feishu_secretKey
feishu_enable = True
feishu_webhook = config['all_config']['feishu'][1]['webhook']
feishu_secretKey = config['all_config']['feishu'][2]['secretKey']
if int(config['all_config']['server'][0]['enable']) == 1:
global server_enable, server_sckey
server_enable = True
server_sckey = config['all_config']['server'][1]['sckey']
if int(config['all_config']['pushplus'][0]['enable']) == 1:
global pushplus_enable, pushplus_token
pushplus_enable = True
pushplus_token = config['all_config']['pushplus'][1]['token']
if int(config['all_config']['tgbot'][0]['enable']) == 1:
global tgbot_enable, tgbot_token, tgbot_group_id
tgbot_enable = True
tgbot_token = config['all_config']['tgbot'][1]['token']
tgbot_group_id = config['all_config']['tgbot'][2]['group_id']
if int(config['all_config']['tgbot'][0]['enable']) == 0 and int(config['all_config']['feishu'][0]['enable']) == 0 and int(config['all_config']['server'][0]['enable']) == 0 and int(config['all_config']['pushplus'][0]['enable']) == 0 and int(
config['all_config']['dingding'][0]['enable']) == 0:
print('[-] 配置文件有误, 五个社交软件的enable不能为0')
# 读取黑名单用户
def load_black_user():
with open('config.yaml', 'r', encoding='utf-8') as f:
config = yaml.load(f, Loader=yaml.FullLoader)
global black_user
black_user = config['all_config']['black_user']
# 初始化创建数据库
def create_database():
conn = sqlite3.connect('data.db')
cur = conn.cursor()
try:
cur.execute('''CREATE TABLE IF NOT EXISTS cve_monitor
(cve_name varchar(255),
pushed_at varchar(255),
cve_url varchar(255));''')
print('成功创建CVE监控表')
cur.execute('''CREATE TABLE IF NOT EXISTS keyword_monitor
(keyword_name varchar(255),
pushed_at varchar(255),
keyword_url varchar(255));''')
print('成功创建关键字监控表')
cur.execute('''CREATE TABLE IF NOT EXISTS redteam_tools_monitor
(tools_name varchar(255),
pushed_at varchar(255),
tag_name varchar(255));''')
print('成功创建红队工具监控表')
cur.execute('''CREATE TABLE IF NOT EXISTS user_monitor
(repo_name varchar(255));''')
print('成功创建大佬仓库监控表')
except Exception as e:
print('创建监控表失败!报错:{}'.format(e))
conn.commit() # 数据库存储在硬盘上需要commit 存储在内存中的数据库不需要
conn.close()
if 'dingding_enable' in globals() and dingding_enable:
dingding('test', '连接成功', dingding_webhook, dingding_secretKey)
if 'feishu_enable' in globals() and feishu_enable:
feishu('test', '连接成功', feishu_webhook, feishu_secretKey)
if 'server_enable' in globals() and server_enable:
server('test', '连接成功', server_sckey)
if 'pushplus_enable' in globals() and pushplus_enable:
pushplus('test', '连接成功', pushplus_token)
if 'tgbot_enable' in globals() and tgbot_enable:
tgbot('test', '连接成功', tgbot_token, tgbot_group_id)
# 根据排序获取本年前20条CVE
def getCVENews():
today_cve_info_tmp = []
try:
# 抓取本年的
year = datetime.datetime.now().year
api = 'https://api.github.com/search/repositories?q=CVE-{}&sort=updated'.format(year)
json_str = requests.get(api, headers=github_headers, proxies=github_proxy, timeout=10).json()
today_date = datetime.date.today()
n = len(json_str['items'])
if n > 30:
n = 30
for i in range(0, n):
cve_url = json_str['items'][i]['html_url']
if cve_url.split('/')[-2] not in black_user:
try:
cve_name_tmp = json_str['items'][i]['name'].upper()
cve_name = re.findall('(CVE\-\d+\-\d+)', cve_name_tmp)[0].upper()
pushed_at_tmp = json_str['items'][i]['created_at']
pushed_at = re.findall('\d{4}-\d{2}-\d{2}', pushed_at_tmp)[0]
if pushed_at == str(today_date):
today_cve_info_tmp.append({'cve_name': cve_name, 'cve_url': cve_url, 'pushed_at': pushed_at})
else:
print('[-] 该 {} 的更新时间为 {}, 不属于今天的CVE'.format(cve_name, pushed_at))
except Exception as e:
pass
else:
pass
today_cve_info = OrderedDict()
for item in today_cve_info_tmp:
user_name = item['cve_url'].split('/')[-2]
if user_name in counter:
if counter[user_name] < 3:
counter[user_name] += 1
today_cve_info.setdefault(item['cve_name'], {**item, })
else:
counter[user_name] = 0
today_cve_info.setdefault(item['cve_name'], {**item, })
today_cve_info = list(today_cve_info.values())
return today_cve_info
except Exception as e:
print(e, 'github链接不通')
return '', '', ''
def getKeywordNews(keyword):
today_keyword_info_tmp = []
try:
# 抓取本年的
api = 'https://api.github.com/search/repositories?q={}&sort=updated'.format(keyword)
json_str = requests.get(api, headers=github_headers, proxies=github_proxy, timeout=10).json()
today_date = datetime.date.today()
n = len(json_str['items'])
if n > 20:
n = 20
for i in range(0, n):
keyword_url = json_str['items'][i]['html_url']
if keyword_url.split('/')[-2] not in black_user:
try:
keyword_name = json_str['items'][i]['name']
pushed_at_tmp = json_str['items'][i]['created_at']
pushed_at = re.findall('\d{4}-\d{2}-\d{2}', pushed_at_tmp)[0]
if pushed_at == str(today_date):
today_keyword_info_tmp.append({'keyword_name': keyword_name, 'keyword_url': keyword_url, 'pushed_at': pushed_at})
print('[+] keyword: {}, {}'.format(keyword, keyword_name))
else:
print('[-] keyword: {}, 该 {} 的更新时间为 {}, 不属于今天'.format(keyword, keyword_name, pushed_at))
except Exception as e:
pass
else:
pass
today_keyword_info = OrderedDict()
for item in today_keyword_info_tmp:
user_name = item['keyword_url'].split('/')[-2]
if user_name in counter:
if counter[user_name] < 3:
counter[user_name] += 1
today_keyword_info.setdefault(item['keyword_name'], {**item, })
else:
counter[user_name] = 0
today_keyword_info.setdefault(item['keyword_name'], {**item, })
today_keyword_info = list(today_keyword_info.values())
return today_keyword_info
except Exception as e:
print(e, 'github链接不通')
return today_keyword_info_tmp
# 获取到的关键字仓库信息插入到数据库
def keyword_insert_into_sqlite3(data):
conn = sqlite3.connect('data.db')
print('keyword_insert_into_sqlite3 函数 打开数据库成功!')
print(data)
cur = conn.cursor()
for i in range(len(data)):
try:
keyword_name = data[i]['keyword_name']
cur.execute('INSERT INTO keyword_monitor (keyword_name,pushed_at,keyword_url) VALUES ("{}", "{}", "{}")'.format(keyword_name, data[i]['pushed_at'], data[i]['keyword_url']))
print('keyword_insert_into_sqlite3 函数: {} 插入数据成功!'.format(keyword_name))
except Exception as e:
print('keyword_insert_into_sqlite3 error {}'.format(e))
conn.commit()
conn.close()
# 查询数据库里是否存在该关键字仓库的方法
def query_keyword_info_database(keyword_name):
conn = sqlite3.connect('data.db')
cur = conn.cursor()
sql_grammar = 'SELECT keyword_name FROM keyword_monitor WHERE keyword_name = "{}";'.format(keyword_name)
cursor = cur.execute(sql_grammar)
return len(list(cursor))
# 获取不存在数据库里的关键字信息
def get_today_keyword_info(today_keyword_info_data):
today_all_keyword_info = []
for i in range(len(today_keyword_info_data)):
try:
today_keyword_name = today_keyword_info_data[i]['keyword_name']
today_cve_name = re.findall('(CVE\-\d+\-\d+)', today_keyword_info_data[i]['keyword_name'].upper())
# 如果仓库名字带有 cve-xxx-xxx, 先查询看看 cve 监控中是否存在, 防止重复推送
if len(today_cve_name) > 0 and query_cve_info_database(today_cve_name.upper()) == 1:
pass
Verify = query_keyword_info_database(today_keyword_name)
if Verify == 0:
print('[+] 数据库里不存在 {}'.format(today_keyword_name))
today_all_keyword_info.append(today_keyword_info_data[i])
else:
print('[-] 数据库里存在 {}'.format(today_keyword_name))
except Exception as e:
pass
return today_all_keyword_info
# 获取到的CVE信息插入到数据库
def cve_insert_into_sqlite3(data):
conn = sqlite3.connect('data.db')
print('cve_insert_into_sqlite3 函数 打开数据库成功!')
cur = conn.cursor()
for i in range(len(data)):
try:
cve_name = re.findall('(CVE\-\d+\-\d+)', data[i]['cve_name'])[0].upper()
cur.execute('INSERT INTO cve_monitor (cve_name,pushed_at,cve_url) VALUES ("{}", "{}", "{}")'.format(cve_name, data[i]['pushed_at'], data[i]['cve_url']))
print('cve_insert_into_sqlite3 函数: {} 插入数据成功!'.format(cve_name))
except Exception as e:
print('cve_insert_into_sqlite3 error {}'.format(e))
conn.commit()
conn.close()
# 查询数据库里是否存在该CVE的方法
def query_cve_info_database(cve_name):
conn = sqlite3.connect('data.db')
cur = conn.cursor()
sql_grammar = 'SELECT cve_name FROM cve_monitor WHERE cve_name = "{}";'.format(cve_name)
cursor = cur.execute(sql_grammar)
return len(list(cursor))
# 查询数据库里是否存在该tools工具名字的方法
def query_tools_info_database(tools_name):
conn = sqlite3.connect('data.db')
cur = conn.cursor()
sql_grammar = 'SELECT tools_name FROM redteam_tools_monitor WHERE tools_name = "{}";'.format(tools_name)
cursor = cur.execute(sql_grammar)
return len(list(cursor))
# 获取不存在数据库里的CVE信息
def get_today_cve_info(today_cve_info_data):
today_all_cve_info = []
for i in range(len(today_cve_info_data)):
try:
today_cve_name = re.findall('(CVE\-\d+\-\d+)', today_cve_info_data[i]['cve_name'])[0].upper()
if exist_cve(today_cve_name) == 1:
Verify = query_cve_info_database(today_cve_name.upper())
if Verify == 0:
print('[+] 数据库里不存在 {}'.format(today_cve_name.upper()))
today_all_cve_info.append(today_cve_info_data[i])
else:
print('[-] 数据库里存在 {}'.format(today_cve_name.upper()))
except Exception as e:
pass
return today_all_cve_info
# 获取红队工具信息插入到数据库
def tools_insert_into_sqlite3(data):
conn = sqlite3.connect('data.db')
print('tools_insert_into_sqlite3 函数 打开数据库成功!')
cur = conn.cursor()
for i in range(len(data)):
Verify = query_tools_info_database(data[i]['tools_name'])
if Verify == 0:
print('[+] 红队工具表数据库里不存在{}'.format(data[i]['tools_name']))
cur.execute('INSERT INTO redteam_tools_monitor (tools_name,pushed_at,tag_name) VALUES ("{}", "{}","{}")'.format(data[i]['tools_name'], data[i]['pushed_at'], data[i]['tag_name']))
print('tools_insert_into_sqlite3 函数: {} 插入数据成功!'.format(format(data[i]['tools_name'])))
else:
print('[-] 红队工具表数据库里存在 {}'.format(data[i]['tools_name']))
conn.commit()
conn.close()
# 读取本地红队工具链接文件转换成list
def load_tools_list():
with open('tools_list.yaml', 'r', encoding='utf-8') as f:
list = yaml.load(f, Loader=yaml.FullLoader)
return list['tools_list'], list['keyword_list'], list['user_list']
# 获取红队工具的名称,更新时间,版本名称信息
def get_pushed_at_time(tools_list):
tools_info_list = []
for url in tools_list:
try:
tools_json = requests.get(url, headers=github_headers, proxies=github_proxy, timeout=10).json()
pushed_at_tmp = tools_json['pushed_at']
pushed_at = re.findall('\d{4}-\d{2}-\d{2}', pushed_at_tmp)[0] # 获取的是API上的时间
tools_name = tools_json['name']
api_url = tools_json['url']
try:
releases_json = requests.get(url + '/releases', headers=github_headers, proxies=github_proxy, timeout=10).json()
tag_name = releases_json[0]['tag_name']
except Exception as e:
tag_name = 'no releases'
tools_info_list.append({'tools_name': tools_name, 'pushed_at': pushed_at, 'api_url': api_url, 'tag_name': tag_name})
except Exception as e:
print('get_pushed_at_time ', e)
return tools_info_list
# 根据红队名名称查询数据库红队工具的更新时间以及版本名称并返回
def tools_query_sqlite3(tools_name):
result_list = []
conn = sqlite3.connect('data.db')
cur = conn.cursor()
sql_grammar = 'SELECT pushed_at,tag_name FROM redteam_tools_monitor WHERE tools_name = "{}";'.format(tools_name)
cursor = cur.execute(sql_grammar)
for result in cursor:
result_list.append({'pushed_at': result[0], 'tag_name': result[1]})
conn.close()
print('[###########] tools_query_sqlite3 函数内 result_list 的值 为 - > {}'.format(result_list))
return result_list
# 获取更新了的红队工具在数据库里面的时间和版本
def get_tools_update_list(data):
tools_update_list = []
for dist in data:
print('dist 变量 ->{}'.format(dist))
query_result = tools_query_sqlite3(dist['tools_name'])
if len(query_result) > 0:
today_tools_pushed_at = query_result[0]['pushed_at']
if dist['pushed_at'] != today_tools_pushed_at:
print('今日获取时间: ', dist['pushed_at'], '获取数据库时间: ', today_tools_pushed_at, dist['tools_name'], 'update!!!!')
tools_update_list.append({'api_url': dist['api_url'], 'pushed_at': today_tools_pushed_at, 'tag_name': query_result[0]['tag_name']})
else:
print('今日获取时间: ', dist['pushed_at'], '获取数据库时间: ', today_tools_pushed_at, dist['tools_name'], ' no update')
return tools_update_list
# 监控用户是否新增仓库,不是 fork 的
def getUserRepos(user):
try:
api = 'https://api.github.com/users/{}/repos'.format(user)
json_str = requests.get(api, headers=github_headers, proxies=github_proxy, timeout=10).json()
today_date = datetime.date.today()
for i in range(0, len(json_str)):
created_at = re.findall('\d{4}-\d{2}-\d{2}', json_str[i]['created_at'])[0]
if json_str[i]['fork'] == False and created_at == str(today_date):
Verify = user_insert_into_sqlite3(json_str[i]['full_name'])
print(json_str[i]['full_name'], Verify)
if Verify == 0:
name = json_str[i]['name']
try:
description = json_str[i]['description']
except Exception as e:
description = '作者未写描述'
download_url = json_str[i]['html_url']
text = r'大佬' + r'** ' + user + r' ** ' + r'又分享了一款工具! '
body = '工具名称: ' + name + ' \r\n' + '工具地址: ' + download_url + ' \r\n' + '工具描述: ' + '' + description
if 'dingding_enable' in globals() and dingding_enable:
dingding(text, body, dingding_webhook, dingding_secretKey)
if 'feishu_enable' in globals() and feishu_enable:
feishu(text, body, feishu_webhook, feishu_secretKey)
if 'server_enable' in globals() and server_enable:
server(text, body, server_sckey)
if 'pushplus_enable' in globals() and pushplus_enable:
pushplus(text, body, pushplus_token)
if 'tgbot_enable' in globals() and tgbot_enable:
tgbot(text, body, tgbot_token, tgbot_group_id)
except Exception as e:
print(e, 'github链接不通')
# 获取用户或者组织信息插入到数据库
def user_insert_into_sqlite3(repo_name):
conn = sqlite3.connect('data.db')
cur = conn.cursor()
sql_grammar = 'SELECT repo_name FROM user_monitor WHERE repo_name = "{}";'.format(repo_name)
Verify = len(list(cur.execute(sql_grammar)))
if Verify == 0:
print('[+] 用户仓库表数据库里不存在 {}'.format(repo_name))
cur.execute('INSERT INTO user_monitor (repo_name) VALUES ("{}")'.format(repo_name))
print('user_insert_into_sqlite3 函数: {}插入数据成功!'.format(repo_name))
else:
print('[-] 用户仓库表数据库里存在 {}'.format(repo_name))
conn.commit()
conn.close()
return Verify
# 获取更新信息并发送到对应社交软件
def send_body(url, query_pushed_at, query_tag_name):
# 考虑到有的工具没有 releases, 则通过 commits 记录获取更新描述
# 判断是否有 releases 记录
json_str = requests.get(url + '/releases', headers=github_headers, proxies=github_proxy, timeout=10).json()
new_pushed_at = re.findall('\d{4}-\d{2}-\d{2}', requests.get(url, headers=github_headers, proxies=github_proxy, timeout=10).json()['pushed_at'])[0]
if len(json_str) != 0:
tag_name = json_str[0]['tag_name']
if query_pushed_at < new_pushed_at:
print('[*] 数据库里的 pushed_at -->', query_pushed_at, ';;;; api 的 pushed_at -->', new_pushed_at)
if tag_name != query_tag_name:
try:
update_log = json_str[0]['body']
except Exception as e:
update_log = '作者未写更新内容'
download_url = json_str[0]['html_url']
tools_name = url.split('/')[-1]
text = r'** ' + tools_name + r' ** 工具,版本更新啦!'
body = '工具名称:' + tools_name + '\r\n' + '工具地址:' + download_url + '\r\n' + '工具更新日志:' + '\r\n' + update_log
if 'dingding_enable' in globals() and dingding_enable:
dingding(text, body, dingding_webhook, dingding_secretKey)
if 'feishu_enable' in globals() and feishu_enable:
feishu(text, body, feishu_webhook, feishu_secretKey)
if 'server_enable' in globals() and server_enable:
server(text, body, server_sckey)
if 'pushplus_enable' in globals() and pushplus_enable:
pushplus(text, body, pushplus_token)
if 'tgbot_enable' in globals() and tgbot_enable:
tgbot(text, body, tgbot_token, tgbot_group_id)
conn = sqlite3.connect('data.db')
cur = conn.cursor()
sql_grammar = 'UPDATE redteam_tools_monitor SET tag_name = "{}" WHERE tools_name="{}"'.format(tag_name, tools_name)
sql_grammar1 = 'UPDATE redteam_tools_monitor SET pushed_at = "{}" WHERE tools_name="{}"'.format(new_pushed_at, tools_name)
cur.execute(sql_grammar)
cur.execute(sql_grammar1)
conn.commit()
conn.close()
print('[+] tools_name -->', tools_name, 'pushed_at 已更新,现在 pushed_at 为 -->', new_pushed_at, 'tag_name 已更新,现在 tag_name为 -->', tag_name)
elif tag_name == query_tag_name:
commits_url = url + '/commits'
commits_url_response_json = requests.get(commits_url).text
commits_json = json.loads(commits_url_response_json)
tools_name = url.split('/')[-1]
download_url = commits_json[0]['html_url']
try:
update_log = commits_json[0]['commit']['message']
except Exception as e:
update_log = '作者未写更新内容,具体点击更新详情地址的URL进行查看'
text = r'** ' + tools_name + r' ** 工具小更新了一波!'
body = '工具名称:' + tools_name + '\r\n' + '更新详情地址:' + download_url + '\r\n' + 'commit更新日志:' + '\r\n' + update_log
if 'dingding_enable' in globals() and dingding_enable:
dingding(text, body, dingding_webhook, dingding_secretKey)
if 'feishu_enable' in globals() and feishu_enable:
feishu(text, body, feishu_webhook, feishu_secretKey)
if 'server_enable' in globals() and server_enable:
server(text, body, server_sckey)
if 'pushplus_enable' in globals() and pushplus_enable:
pushplus(text, body, pushplus_token)
if 'tgbot_enable' in globals() and tgbot_enable:
tgbot(text, body, tgbot_token, tgbot_group_id)
conn = sqlite3.connect('data.db')
cur = conn.cursor()
sql_grammar = 'UPDATE redteam_tools_monitor SET pushed_at = "{}" WHERE tools_name="{}"'.format(new_pushed_at, tools_name)
cur.execute(sql_grammar)
conn.commit()
conn.close()
print('[+] tools_name -->', tools_name, 'pushed_at 已更新,现在 pushed_at 为 -->', new_pushed_at)
else:
if query_pushed_at != new_pushed_at:
print('[*] 数据库里的 pushed_at -->', query_pushed_at, ';;;; api 的 pushed_at -->', new_pushed_at)
json_str = requests.get(url + '/commits', headers=github_headers, proxies=github_proxy, timeout=10).json()
update_log = json_str[0]['commit']['message']
download_url = json_str[0]['html_url']
tools_name = url.split('/')[-1]
text = r'** ' + tools_name + r' ** 工具更新啦!'
body = '工具名称:' + tools_name + '\r\n' + '工具地址:' + download_url + '\r\n' + 'commit更新日志:' + '\r\n' + update_log
if 'dingding_enable' in globals() and dingding_enable:
dingding(text, body, dingding_webhook, dingding_secretKey)
if 'feishu_enable' in globals() and feishu_enable:
feishu(text, body, feishu_webhook, feishu_secretKey)
if 'server_enable' in globals() and server_enable:
server(text, body, server_sckey)
if 'pushplus_enable' in globals() and pushplus_enable:
pushplus(text, body, pushplus_token)
if 'tgbot_enable' in globals() and tgbot_enable:
tgbot(text, body, tgbot_token, tgbot_group_id)
conn = sqlite3.connect('data.db')
cur = conn.cursor()
sql_grammar = 'UPDATE redteam_tools_monitor SET pushed_at = "{}" WHERE tools_name="{}"'.format(new_pushed_at, tools_name)
cur.execute(sql_grammar)
conn.commit()
conn.close()
print('[+] tools_name -->', tools_name, 'pushed_at 已更新,现在 pushed_at 为 -->', new_pushed_at)
# 创建md5对象
def nmd5(str):
m = hashlib.md5()
b = str.encode(encoding='utf-8')
m.update(b)
str_md5 = m.hexdigest()
return str_md5
# 有道翻译
def translate(word):
headerstr = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.114 Safari/537.36'
bv = nmd5(headerstr)
lts = str(round(time.time() * 1000))
salt = lts + '90'
# 如果翻译失败,{'errorCode': 50} 请查看 fanyi.min.js: https://shared.ydstatic.com/fanyi/newweb/v1.1.7/scripts/newweb/fanyi.min.js
# 搜索 fanyideskweb sign: n.md5('fanyideskweb' + e + i + 'Y2FYu%TNSbMCxc3t2u^XT') ,Y2FYu%TNSbMCxc3t2u^XT是否改变,替换即可
strexample = 'fanyideskweb' + word + salt + 'Y2FYu%TNSbMCxc3t2u^XT'
sign = nmd5(strexample)
data = {
'i': word,
'from': 'AUTO',
'to': 'AUTO',
'smartresult': 'dict',
'client': 'fanyideskweb',
'salt': salt,
'sign': sign,
'lts': lts,
'bv': bv,
'doctype': 'json',
'version': '2.1',
'keyfrom': 'fanyi.web',
'action': 'FY_BY_CLICKBUTTION',
}
url = 'http://fanyi.youdao.com/translate_o?smartresult=dict&smartresult=rule'
header = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.114 Safari/537.36',
'Referer': 'http://fanyi.youdao.com/',
'Origin': 'http://fanyi.youdao.com',
'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
'X-Requested-With': 'XMLHttpRequest',
'Accept': 'application/json, text/javascript, */*; q=0.01',
'Accept-Encoding': 'gzip, deflate',
'Accept-Language': 'zh-CN,zh;q=0.9',
'Connection': 'keep-alive',
'Host': 'fanyi.youdao.com',
'cookie': '_ntes_nnid=937f1c788f1e087cf91d616319dc536a,1564395185984; OUTFOX_SEARCH_USER_ID_NCOO=; [email protected]; JSESSIONID=; ___rl__test__cookies=1'
}
res = requests.post(url=url, data=data, headers=header)
result_dict = res.json()
result = ''
for json_str in result_dict['translateResult'][0]:
tgt = json_str['tgt']
result += tgt
return result
# 钉钉
def dingding(text, msg, webhook, secretKey):
ding = cb.DingtalkChatbot(webhook, secret=secretKey)
ding.send_text(msg='{}\r\n{}'.format(text, msg), is_at_all=False)
# 飞书
def feishu(text, msg, webhook, secretKey):
def feishu_get_sign(key):
timestamp = round(time.time())
string_to_sign = '{}\n{}'.format(timestamp, key)
hmac_code = hmac.new(string_to_sign.encode(), digestmod=hashlib.sha256).digest()
sign = base64.b64encode(hmac_code).decode()
return timestamp, sign
def feishu_send_message(content):
url = webhook
key = secretKey
timestamp, sign = feishu_get_sign(key)
content = {
'timestamp': str(timestamp),
'sign': sign,
'msg_type': 'text',
'content': {'text': content}
}
requests.post(url, headers={'content-type': 'application/json'}, data=json.dumps(content))
feishu_send_message('{}\r\n{}'.format(text, msg))
# server酱 http://sc.ftqq.com/?c=code
def server(text, msg, sckey):
try:
uri = 'https://sc.ftqq.com/{}.send?text={}&desp={}'.format(sckey, text, msg)
requests.get(uri, timeout=10)
except Exception as e:
pass
# pushplus https://www.pushplus.plus/push1.html
def pushplus(text, msg, token):
try:
uri = 'https://www.pushplus.plus/send?token={}&title={}&content={}'.format(token, text, msg)
requests.get(uri, timeout=10)
except Exception as e:
pass
def tgbot(text, msg, token, group_id):
import telegram
try:
bot = telegram.Bot(token="{}".format(token)) # Your Telegram Bot Token
bot.send_message(chat_id=group_id, text='{}\r\n{}'.format(text, msg))
except Exception as e:
pass
# 判断是否存在该CVE
def exist_cve(cve):
try:
query_cve_url = 'https://cve.mitre.org/cgi-bin/cvename.cgi?name=' + cve
response = requests.get(query_cve_url, timeout=10)
html = etree.HTML(response.text)
des = html.xpath('//*[@id="GeneratedTable"]/table//tr[4]/td/text()')[0].strip()
return 1
except Exception as e:
return 0
# 根据CVE名字,获取描述,并翻译
def get_cve_des_zh(cve):
try:
query_cve_url = 'https://cve.mitre.org/cgi-bin/cvename.cgi?name=' + cve
response = requests.get(query_cve_url, timeout=10)
html = etree.HTML(response.text)
des = html.xpath('//*[@id="GeneratedTable"]/table//tr[4]/td/text()')[0].strip()
cve_time = html.xpath('//*[@id="GeneratedTable"]/table//tr[11]/td[1]/b/text()')[0].strip()
if need_translate:
return translate(des)
return des, cve_time
except Exception as e:
print('get_cve_des_zh 函数 error {}'.format(e))
# 发送CVE信息到社交工具
def sendCVENews(data):
try:
text = '有新的CVE送达! \r\n** 请自行分辨是否为红队钓鱼!!! **'
# 获取 cve 名字 ,根据cve 名字,获取描述,并翻译
for i in range(len(data)):
try:
cve_name = re.findall('(CVE\-\d+\-\d+)', data[i]['cve_name'])[0].upper()
cve_zh, cve_time = get_cve_des_zh(cve_name)
body = 'CVE编号: ' + cve_name + ' --- ' + cve_time + ' \r\n' + 'Github地址: ' + str(data[i]['cve_url']) + '\r\n' + 'CVE描述: ' + '\r\n' + cve_zh
if 'dingding_enable' in globals() and dingding_enable:
dingding(text, body, dingding_webhook, dingding_secretKey)
print('钉钉 发送 CVE 成功')
if 'feishu_enable' in globals() and feishu_enable:
feishu(text, body, feishu_webhook, feishu_secretKey)
print('飞书 发送 CVE 成功')
if 'server_enable' in globals() and server_enable:
server(text, body, server_sckey)
print('server酱 发送 CVE 成功')
if 'pushplus_enable' in globals() and pushplus_enable:
pushplus(text, body, pushplus_token)
print('pushplus 发送 CVE 成功')
if 'tgbot_enable' in globals() and tgbot_enable:
tgbot(text, body, tgbot_token, tgbot_group_id)
print('tgbot 发送 CVE 成功')
except IndexError:
pass
except Exception as e:
print('sendCVENews 函数 error {}'.format(e))
# 发送Keyword信息到社交工具
def sendKeywordNews(keyword, data):
try:
text = '有新的关键字监控 - {} - 送达! \r\n** 请自行分辨是否为红队钓鱼!!! **'.format(keyword)
# 获取 cve 名字 ,根据 cve 名字,获取描述,并翻译
for i in range(len(data)):
try:
keyword_name = data[i]['keyword_name']
body = '项目名称: ' + keyword_name + '\r\n' + 'Github地址: ' + str(data[i]['keyword_url']) + '\r\n'
if 'dingding_enable' in globals() and dingding_enable:
dingding(text, body, dingding_webhook, dingding_secretKey)
print('钉钉 发送 Keyword 成功')
if 'feishu_enable' in globals() and feishu_enable:
feishu(text, body, feishu_webhook, feishu_secretKey)
print('飞书 发送 Keyword 成功')
if 'server_enable' in globals() and server_enable:
server(text, body, server_sckey)
print('server酱 发送 Keyword 成功')
if 'pushplus_enable' in globals() and pushplus_enable:
pushplus(text, body, pushplus_token)
print('pushplus 发送 Keyword 成功')
if 'tgbot_enable' in globals() and tgbot_enable:
tgbot(text, body, tgbot_token, tgbot_group_id)
print('tgbot 发送 Keyword 成功')
except IndexError:
pass
except Exception as e:
print('sendKeywordNews 函数 error {}'.format(e))
# main函数
if __name__ == '__main__':
print('cve github 工具 和 大佬仓库 监控中 ...')
# 初始化部分
load_config()
load_black_user()
create_database()
tools_list, keyword_list, user_list = load_tools_list()
tools_data = get_pushed_at_time(tools_list)
tools_insert_into_sqlite3(tools_data)
while True:
# 判断是否达到设定时间
now = datetime.datetime.now()
# 到达设定时间,结束内循环
if now.hour == 23 and now.minute > 50:
counter = {} # 每天初始化黑名单
print('\r\n\t\t 用户仓库监控 \t\t\r\n')
for user in user_list:
getUserRepos(user)
print('\r\n\t\t CVE 监控 \t\t\r\n')
cve_data = getCVENews()
if len(cve_data) > 0:
today_cve_data = get_today_cve_info(cve_data)
sendCVENews(today_cve_data)
cve_insert_into_sqlite3(today_cve_data)
print('\r\n\t\t 关键字监控 \t\t\r\n')
# 关键字监控,最好不要太多关键字,防止 github 速率限制 https://docs.github.com/en/rest/overview/resources-in-the-rest-api#secondary-rate-limits=
for keyword in keyword_list:
time.sleep(1)
keyword_data = getKeywordNews(keyword)
if len(keyword_data) > 0:
today_keyword_data = get_today_keyword_info(keyword_data)
if len(today_keyword_data) > 0:
sendKeywordNews(keyword, today_keyword_data)
keyword_insert_into_sqlite3(today_keyword_data)
print('\r\n\t\t 红队工具监控 \t\t\r\n')
tools_list_new, keyword_list, user_list = load_tools_list()
data2 = get_pushed_at_time(tools_list_new) # 再次从文件中获取工具列表,并从 github 获取相关信息,
data3 = get_tools_update_list(data2) # 与 3 分钟前数据进行对比,如果在三分钟内有新增工具清单或者工具有更新则通知一下用户
for i in range(len(data3)):
try:
send_body(data3[i]['api_url'], data3[i]['pushed_at'], data3[i]['tag_name'])
except Exception as e:
print('main 函数 try 循环 error:{}'.format(e))
time.sleep(5 * 60)