-
Notifications
You must be signed in to change notification settings - Fork 11
/
main.py
225 lines (196 loc) · 6.41 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
# -*- coding: utf-8 -*-
# @Time : 2024/7/23 16:33
# @Author : ordar
# @Project : vulnerability-wiki
# @File : main.py
# @Python: 3.7.5
import subprocess
import os
from loguru import logger
from urllib.parse import urlsplit
import sqlite3
import sys
import shutil
# 打开数据库
conn = sqlite3.connect('wiki.db')
cursor = conn.cursor()
logger.info("数据库打开成功")
MONITORLIST = [
{"url": "https://github.com/wy876/POC", "newname": ""},
{"url": "https://github.com/ax1sX/SecurityList", "newname": ""},
]
def create_table(table_name):
"""
创建表
:param table_name:
:return:
"""
sql = f'''CREATE TABLE "main"."{table_name}" (
"dir" text,
"name" text
);'''
try:
cursor.execute(sql)
conn.commit()
logger.info(f"数据表 {table_name} 创建成功")
except Exception as e:
logger.error(e)
def if_exists_table(table_name):
"""
判断某个表是否存在
:param table_name:
:return:
"""
sql = """select * from sqlite_master"""
try:
cursor.execute(sql)
results = cursor.fetchall()
for result in results:
if result[0] == "table" and result[1] == table_name:
return True
except Exception as e:
logger.error(e)
return False
def get_data_by_name(table_name, file_name):
"""
通过文件名找数据
:param table_name:
:param file_name:
:return:
"""
sql = f"""select * from {table_name} where name='{file_name}'; """
try:
cursor.execute(sql)
results = cursor.fetchall()
return results
except Exception as e:
logger.error(e)
return None
def get_all_data(table_name):
"""
获取某个表的所有数据
:param table_name:
:return:
"""
sql = f"""select * from {table_name};"""
try:
cursor.execute(sql)
results = cursor.fetchall()
return results
except Exception as e:
logger.error(e)
return None
def if_exists_data(cursor_result,dir_name,file_name):
"""
判断某台数据是否在数据库
:param cursor_result:
:param dir_name:
:param file_name:
:return:
"""
for i in cursor_result:
if dir_name == i[0] and file_name == i[1]:
return True
return False
def insert_table(table_name, file_dir, file_name):
"""
插入表
:param table_name:
:return:
"""
sql = f'''INSERT INTO "main"."{table_name}"("dir", "name") VALUES ('{file_dir}', '{file_name}')'''
try:
cursor.execute(sql)
conn.commit()
logger.info(f"数据表 {table_name} 插入数据 {file_dir} {file_name} 成功")
except Exception as e:
logger.error(e)
def dingding_sendmsg(file_name, file_path, token=sys.argv[1], secret=sys.argv[2]):
try:
logger.debug(f"钉钉token:{token} secret:{secret}")
import dingtalkchatbot.chatbot as cb
webhook = f"https://oapi.dingtalk.com/robot/send?access_token={token}"
ding = cb.DingtalkChatbot(webhook, secret=secret)
text = "读取文件失败"
with open(file_path,"r",encoding="utf8") as fr:
text = fr.read()
ding.send_markdown(title=f"{file_name}", text=f"{text}", is_at_all=False)
except Exception as e:
logger.error(e)
ding.send_text(msg=f"读取文件 {file_path} 失败")
def exec_command(command):
"""
执行命令
:param command:
:return:
"""
try:
p = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE, close_fds=True)
# command_results = p.stdout.readline().decode("gbk")
stdout, stderr = p.communicate() # 记录错误日志文件
stdout = stdout.decode("utf8")
stderr = stderr.decode("utf8")
returncode = p.returncode
if returncode == 0:
logger.info(f"执行命令成功 {command}")
else:
logger.error(f"执行命令失败 {command}")
logger.error(returncode)
logger.error(stdout)
logger.error(stderr)
return returncode, stdout, stderr
except Exception as e:
logger.error(f"执行命令失败 {command} {str(e)}")
def delete_dir(dir_path):
"""
删除文件夹
:return:
"""
try:
shutil.rmtree(dir_path)
logger.info(f"删除文件成功 {dir_path}")
except Exception as e:
logger.error(f"删除文件失败 {dir_path}")
logger.error(e)
# def thank_author(repo_name, repo_url):
# msg = ""
# with open("README.md", "a", encoding="utf8") as f:
# f.write()
if __name__ == '__main__':
os.chdir("docs") # 切换到docs
for i in MONITORLIST:
logger.debug(i)
repo_name = urlsplit(i['url']).path.split("/")[2]
logger.debug(repo_name)
if repo_name in os.listdir():
delete_dir(repo_name)
exec_command(f"git clone {i['url']}") # 克隆项目
os.chdir(repo_name) # 切换进repo目录,需跟切换..成对出现
delete_dir(".git") # 删除克隆下来的仓库下面的.git文件夹
# 检查并创建表
if not if_exists_table(repo_name):
create_table(repo_name)
# 遍历文件
for root, dirs, files in os.walk("."):
for current_file in files:
# 过滤出md文件
if current_file.endswith(".md"):
file_path = os.path.join(root, current_file) # 文件路径 root是文件夹路径
logger.debug(file_path)
data_list = get_data_by_name(repo_name, current_file)
if if_exists_data(get_data_by_name(repo_name, current_file), root, current_file):
logger.debug(f"{file_path} 已存在数据库 {repo_name} 中")
else:
logger.debug(f"{file_path} 开始插入数据库 {repo_name}")
insert_table(repo_name, root, current_file)
logger.debug(f"开始钉钉通知 {file_path}")
dingding_sendmsg(current_file, file_path)
# 跳出当前仓库
os.chdir(r"..")
# 当前为docs目录下
# 关闭数据库连接
conn.close()
# 生成文件列表,json文件
exec_command("python make_file_list.py")
# 生成readme
exec_command("python make_readme.py")