cve_dump.py
import json
import yaml
import requests
import pandas as pd
import numpy as np
import bisect
from tqdm import tqdm
from openpyxl import load_workbook
from datetime import datetime, timedelta

with open('config.yaml', 'r', encoding='UTF8') as f:
    config = yaml.safe_load(f)

SEVERITIES = config['severities']
PRODUCTS = config['products']
EXCEL_FILE = config['save_file_name']
START_DATE = config['start_date']
END_DATE = config['end_date']
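# A minimal config.yaml sketch matching the keys read above. The keys come
# straight from this script; the values are illustrative assumptions only:
#
#   severities:
#     - important
#     - critical
#   products:
#     - Red Hat Enterprise Linux 8
#     - Red Hat Enterprise Linux 9
#   save_file_name: cve_report.xlsx
#   start_date: 2024-01-01
#   end_date: 2024-01-31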
class Find_Empty():
    """Sequence adapter over a worksheet so that bisect can binary-search
    for the first row whose second column is empty."""
    def __init__(self, sheet):
        self.sheet = sheet

    def __getitem__(self, i):
        if not i:
            i = 1
        # False while rows still hold data, True once they are empty,
        # so the sequence is sorted and safe to bisect.
        return self.sheet.cell(i, 2).value is None

    def __len__(self):
        return self.sheet.max_row
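# A sketch of the intended use (see save_to_xlsx below): because True
# compares equal to 1, bisect_left over this adapter returns the index of
# the first row whose column-2 cell is empty, without scanning every row:
#     first_empty = bisect.bisect_left(Find_Empty(sheet), 1)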
def get_url_sets():
    """Collect the resource URLs of CVEs matching the configured severities,
    products, and date range from the Red Hat Security Data API."""
    url_sets = set()
    baseurl = 'https://access.redhat.com/hydra/rest/securitydata/cve.json?'
    for date_i in tqdm(range(len(date_list) - 1), desc='date'):
        after = 'after=' + date_list[date_i]
        before = 'before=' + date_list[date_i + 1]
        for product in products_m:
            product = 'product=' + product
            url = baseurl + '&'.join((after, before, product))
            response = requests.get(url)
            if response.status_code != 200:
                print(f"Failed to retrieve the page. Status code: {response.status_code}")
                break
            json_data = json.loads(response.text)
            url_sets.update(a['resource_url'] for a in json_data
                            if a['severity'] in SEVERITIES)
    return url_sets
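# For illustration, a query built above has this form (the dates are
# examples, not values from the script):
#   https://access.redhat.com/hydra/rest/securitydata/cve.json?after=2024-01-01&before=2024-01-02&product=Red+Hat+Enterprise+Linux+8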
def get_df(url_sets):
    """Fetch each CVE document and flatten it into one row per affected
    package, with one column per tracked RHEL release."""
    total_df = None
    for url in tqdm(url_sets, desc='cve'):
        response = requests.get(url)
        if response.status_code != 200:
            print(f"Failed to retrieve the page. Status code: {response.status_code}")
            break
        js = json.loads(response.text)
        js = {'severity': js['threat_severity'], 'package_state': js['package_state'],
              'date': js['public_date'], 'name': js['name']}
        # Keep only the products listed in the config
        js['package_state'] = [package_state for package_state in js['package_state']
                               if package_state['product_name'] in products_set]
        for a in js['package_state']:
            del a['cpe']
        df = pd.DataFrame(js['package_state'])
        # One row per package, one column per product, cells hold the fix state
        df = df.pivot(index='package_name', columns='product_name', values='fix_state')
        df = df.reset_index()
        df['severity'] = js['severity']
        dt = datetime.strptime(js['date'], '%Y-%m-%dT%H:%M:%SZ')
        df['year'] = dt.year
        df['month'] = dt.month
        df['day'] = dt.day
        df['code'] = js['name']
        df['url'] = 'https://access.redhat.com/security/cve/' + js['name']
        # Placeholder columns to be filled in by hand later
        df['NO'], df['state'], df['reboot'], df['etc'] = [' '] * 4
        if total_df is None:
            total_df = df
        else:
            total_df = pd.concat((total_df, df), axis=0, ignore_index=True)
    # Ensure every tracked release column exists even if no CVE touched it
    for col in ['Red Hat Enterprise Linux 7', 'Red Hat Enterprise Linux 8', 'Red Hat Enterprise Linux 9']:
        if col not in total_df:
            total_df[col] = np.nan
    total_df = total_df[['NO', 'year', 'month', 'day', 'code', 'package_name', 'severity',
                         'Red Hat Enterprise Linux 7', 'Red Hat Enterprise Linux 8', 'Red Hat Enterprise Linux 9',
                         'state', 'reboot', 'url', 'etc']]
    total_df = total_df.sort_values(by=['year', 'month', 'day'])
    total_df = total_df.fillna('N/A')
    return total_df
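# For reference, the fields consumed above come from a per-CVE document of
# roughly this shape (keys taken from the code; values are illustrative):
#   {
#     "threat_severity": "important",
#     "public_date": "2024-01-01T00:00:00Z",
#     "name": "CVE-2024-0001",
#     "package_state": [
#       {"product_name": "Red Hat Enterprise Linux 8",
#        "package_name": "kernel", "fix_state": "Affected", "cpe": "..."}
#     ]
#   }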
def save_to_xlsx(df):
    try:
        # Load the existing Excel file
        book = load_workbook(EXCEL_FILE)
        sheet_name = book.sheetnames[0]  # use the first sheet
        sheet = book[sheet_name]
        # Find the first empty row
        if sheet.max_row == 1:
            # The sheet is empty
            startrow = 0
            header = True
        else:
            # Existing data: binary-search for the row after the last filled one
            startrow = bisect.bisect_left(Find_Empty(sheet), 1) - 1
            header = False
        # Append the new rows with ExcelWriter
        with pd.ExcelWriter(EXCEL_FILE, engine='openpyxl', mode='a', if_sheet_exists='overlay') as writer:
            df.to_excel(writer, sheet_name=sheet_name, startrow=startrow, index=False, header=header)
        print(f"New data was appended to sheet '{sheet_name}' of '{EXCEL_FILE}'.")
    except FileNotFoundError:
        # The Excel file does not exist yet; create it
        startrow = 1
        with pd.ExcelWriter(EXCEL_FILE, engine='openpyxl') as writer:
            df.to_excel(writer, index=False)
        print(f"'{EXCEL_FILE}' was created and the data was saved.")
    # Reload the workbook so the rows just written are visible
    book = load_workbook(EXCEL_FILE)
    sheet_name = book.sheetnames[0]  # use the first sheet
    sheet = book[sheet_name]
    # Continue the running CVE number from the last one already recorded
    i = startrow - 1
    if i:
        while sheet.cell(row=i, column=1).value is None:
            i -= 1
        cve_num = sheet.cell(row=i, column=1).value + 1
    else:
        startrow = 2
        cve_num = 1
    endrow = startrow + len(df) + 1
    # Identify the first row of each run of rows sharing a CVE code (column 5)
    prev_v = None
    duple = []
    for row in range(startrow, endrow + 1):
        if prev_v == (v := sheet.cell(row=row, column=5).value):
            continue
        prev_v = v
        duple.append(row)
    # Number each run and merge its CVE-code and URL cells (columns 5 and 13)
    prev = duple[0]
    sheet.cell(row=prev, column=1).value = cve_num
    cve_num += 1
    for cur in duple[1:]:
        sheet.cell(row=cur, column=1).value = cve_num
        cve_num += 1
        sheet.merge_cells(start_row=prev, start_column=5, end_row=cur - 1, end_column=5)
        sheet.merge_cells(start_row=prev, start_column=13, end_row=cur - 1, end_column=13)
        prev = cur
    # The last entry is a sentinel one past the data; clear its number
    sheet.cell(row=duple[-1], column=1).value = None
    book.save(EXCEL_FILE)
    print(f"Duplicate rows in '{EXCEL_FILE}' were merged.")
products_set = set(PRODUCTS)
# URL-encode the spaces in product names for the query string
products_m = [p.replace(' ', '+') for p in PRODUCTS]
SEVERITIES = set(SEVERITIES)

start_date_m = datetime.strptime(START_DATE, '%Y-%m-%d')
end_date_m = datetime.strptime(END_DATE, '%Y-%m-%d')

# Build the list of dates to query, one day at a time from the start
# date through the end date
date_list = []
current_date = start_date_m
while current_date <= end_date_m:
    date_list.append(current_date.strftime('%Y-%m-%d'))
    current_date += timedelta(days=1)

url_sets = get_url_sets()
if url_sets:
    df = get_df(url_sets)
    save_to_xlsx(df)
else:
    print('No CVEs matched the given criteria.')
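# Usage sketch (assuming config.yaml sits next to this script):
#   python cve_dump.py
# The script queries the Red Hat Security Data API for the configured date
# range and appends the matching CVEs to the configured .xlsx file.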