-
Notifications
You must be signed in to change notification settings - Fork 0
/
supportcleaner.py
executable file
·401 lines (313 loc) · 13.5 KB
/
supportcleaner.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
#!/usr/bin/env python3
import argparse
import hashlib
import os
import re
import sys
import shutil
import zipfile
from datetime import datetime, timedelta
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import List, Tuple, Any
# All files in the defined directories and all subdirectories will be cleaned.
# Setting this to '.' or '/' will cause the tool to clean all existing files in the zip.
LOGDIRS = [
    '.',
]
# Scratch directory the support zip is extracted into; deleted again by _cleanup().
TMPDIR = TemporaryDirectory()
# Directory this script lives in — used to locate the default filters.txt next to it.
SUPPORT_CLEANER_PATH = Path(__file__).parent.absolute()
# Fail fast on interpreters that are too old for the syntax/stdlib used below.
if sys.version_info < (3, 5):
    raise Exception('Python in version 3.5 or higher is required to run this tool.')
# 1 HELPER FUNCTIONS
def add_unit_prefix(num: float, unit='B') -> str:
    """Format *num* with a binary (1024-based) unit prefix, e.g. 1536 -> '1.5KiB'.

    source: https://stackoverflow.com/questions/1094841/reusable-library-to-get-human-readable-version-of-file-size
    """
    prefixes = ['', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi']
    value = num
    idx = 0
    # divide down until the value fits below 1024 or we run out of prefixes
    while idx < len(prefixes) and abs(value) >= 1024.0:
        value /= 1024.0
        idx += 1
    if idx < len(prefixes):
        return "%3.1f%s%s" % (value, prefixes[idx], unit)
    return "%.1f%s%s" % (value, 'Yi', unit)
def remove_unit_prefix(numstr: str) -> Tuple[float, str]:
    """Parse a size string like '200MiB' into (209715200.0, 'B').

    Returns (number_in_base_units, unit). Raises AttributeError when *numstr*
    does not match the expected pattern — callers (see _extract_zip) rely on
    that exception type, so it is deliberately not caught here.

    Fix vs. original: the old multiply-as-you-scan loop silently fell off the
    end (returning None) if the prefix was not found in its list; computing
    the exponent via .index() makes the scaling explicit and total.
    """
    match = re.match(pattern=r'(\d+\.?\d*)\s?([KMGTPEZY]i)?(.*)', string=numstr)
    # match is None for malformed input -> AttributeError, as before
    num, prefix, unit = match.groups()
    num = float(num)
    if prefix is None:
        return num, unit
    # the regex guarantees prefix is one of these when present
    exponent = ['', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi', 'Yi'].index(prefix)
    return num * 1024 ** exponent, unit
def get_free_disk_space(path: str) -> int:
    """Return the number of free bytes on the filesystem containing *path*."""
    return shutil.disk_usage(path).free
def _list_files_in_dir(path: str, pattern='.*') -> List[str]:
filelist = []
for root, dirs, files in os.walk(path):
for file in files:
if re.match(pattern=pattern, string=file):
filelist.append(os.path.join(root, file))
return filelist
def print_files(files: List[Tuple[str, Any]], intro: str):
    """Print *intro*, then one 'relative/path: value' line per (path, value) entry.

    Paths are shown relative to the temporary working directory.
    """
    print(intro)
    for filepath, value in files:
        rel = Path(filepath).relative_to(TMPDIR.name)
        print('{path}: {value}'.format(path=rel, value=value))
def delete_files(files: List[Tuple[str, Any]], message: str):
    """Ask the user for confirmation and, on 'y', delete every listed file.

    Loops until the user answers 'y' or 'n'; any other input re-prompts.
    """
    while True:
        answer = input('\nDo you want to delete them? (y/n)')
        if answer == 'n':
            break
        if answer == 'y':
            print(message)
            for filepath, _ in files:
                os.remove(filepath)
            break
# 2 PREPARATION & EXTRACTION
def _arguments() -> argparse.Namespace:
    """Define and parse the command line interface of the tool."""
    parser = argparse.ArgumentParser(
        description='CLI tool to clean Atlassian support.zip from various data',
    )
    parser.add_argument('baseurl', help='Base-URL of the corresponding system')
    parser.add_argument(
        '--supportzip',
        help='Path to support zip file to be cleaned, if not set you can clean files without extracting/compressing',
    )
    # default filter definitions live next to this script
    default_filterfile = '{support_cleaner_path}/filters.txt'.format(support_cleaner_path=SUPPORT_CLEANER_PATH)
    parser.add_argument('--filterfile', help='read filters from textfile', default=default_filterfile)
    return parser.parse_args()
def _prepare():
try:
os.remove('cleaned.zip')
print('Removed existing cleaned.zip')
except FileNotFoundError:
pass
def _get_uncompressed_size(zipf: zipfile.ZIP_DEFLATED) -> int:
size = 0
for file in zipf.infolist():
size += file.file_size
return size
def _extract_zip(supportzip: str):
    """Extract *supportzip* into TMPDIR, enforcing the MAX_TMP_DIR_SIZE budget.

    If the decompressed size exceeds the budget, the user may either enter a
    new limit (e.g. "500MiB") or abort with 'a'.
    """
    global MAX_TMP_DIR_SIZE  # rebound below when the user raises the limit
    with zipfile.ZipFile(supportzip, 'r') as zipf:
        uncompressed_size = _get_uncompressed_size(zipf)
        while uncompressed_size > MAX_TMP_DIR_SIZE:
            print('\nWARNING: Decompressed size of {uncomp} exceeds allowed MAX_TMP_DIR_SIZE of {max_size}.'.format(
                uncomp=add_unit_prefix(uncompressed_size),
                max_size=add_unit_prefix(MAX_TMP_DIR_SIZE),
            ))
            answer = input(
                'Free disk space: {free_space}\n\n'
                'Change MAX_TMP_DIR_SIZE to:\n'
                '(Enter value in bytes, prefixes are allowed (KiB, MiB, GiB, ...); a to abort)\n'
                ''.format(free_space=add_unit_prefix(get_free_disk_space(TMPDIR.name)))
            )
            if answer == 'a':
                print('Aborted by user.')
                exit()
            try:
                # remove_unit_prefix raises AttributeError on malformed input;
                # in that case keep the old limit and prompt again.
                MAX_TMP_DIR_SIZE, _ = remove_unit_prefix(answer)
                print('Changed MAX_TMP_DIR_SIZE to {}\n'.format(answer))
            except AttributeError:
                print('Input leads to an error: Please enter something like "30MiB"')
        zipf.extractall(TMPDIR.name)
# 3 DELETE UNWANTED LOGS
# 3.1 OLD FILES
def _set_age_limit() -> int:
if os.getenv('DELETE_AFTER_DAYS'):
return int(os.getenv('DELETE_AFTER_DAYS'))
while True:
limit = input('\nChoose a limit in days to delete old files (leave empty to skip)\n')
if limit:
try:
return int(limit)
except ValueError:
print('Your input needs to be an integer.')
else:
break
def _collect_old_files(supportzip: str, delete_timedelta: timedelta) -> List[Tuple[str, datetime]]:
    """Return (extracted_path, mtime) pairs for zip members older than *delete_timedelta*.

    Paths point into the temporary extraction directory; the result is sorted
    by path.
    """
    cutoff = datetime.now() - delete_timedelta
    old_files = []
    with zipfile.ZipFile(supportzip, 'r') as zipf:
        for info in zipf.infolist():
            # ZipInfo.date_time is a (year, month, day, h, m, s) tuple
            mtime = datetime(*info.date_time)
            if mtime < cutoff:
                extracted = '{tmpdir}/{rel_path}'.format(tmpdir=TMPDIR.name, rel_path=info.filename)
                old_files.append((extracted, mtime))
    old_files.sort(key=lambda entry: entry[0])
    return old_files
def _remove_old_files(supportzip: str):
    """Interactively delete extracted files older than the user-chosen limit.

    Does nothing when the user skips choosing a limit.
    """
    limit = _set_age_limit()
    if not limit:
        return
    delta = timedelta(days=limit)
    candidates = _collect_old_files(supportzip, delta)
    intro = 'The following files are older than {} days:\n'.format(delta.days)
    print_files(files=candidates, intro=intro)
    delete_files(files=candidates, message='Deleting old files\n')
# 3.2 BIG FILES
def _collect_largest_files() -> List[Tuple[str, str]]:
    """Return the largest LARGEST_PERCENT of extracted files.

    Each entry is (path, human_readable_size), ordered smallest-to-largest
    within the selected slice.
    """
    sized = [(filepath, os.stat(filepath).st_size) for filepath in _list_files_in_dir(TMPDIR.name)]
    sized.sort(key=lambda entry: entry[1])
    # everything past this index is in the top LARGEST_PERCENT by size
    cutoff = int(len(sized) * (1 - LARGEST_PERCENT / 100))
    return [(path, add_unit_prefix(size)) for path, size in sized[cutoff:]]
def _remove_large_files():
    """Show the largest extracted files and offer to delete them."""
    candidates = _collect_largest_files()
    print_files(files=candidates, intro='Largest {}%:'.format(LARGEST_PERCENT))
    delete_files(files=candidates, message='Deleting largest files\n')
# 3.3 MAIL LOGS
def _remove_maillogs():
    """Find incoming/outgoing mail log files and offer to delete them."""
    found = _list_files_in_dir(TMPDIR.name, pattern=r'.*(incoming|outgoing)-mail\.log')
    entries = [(path, '') for path in found]
    print_files(files=entries, intro='\nFound following mail log files:')
    delete_files(files=entries, message='Deleting mail log files\n')
# 4 CHECK LOGLEVEL
def _check_loglevel():
    """Warn the user (once) if any logfile contains INFO- or DEBUG-level messages.

    Fixes vs. original:
    - the milliseconds separator was written ``(,|.)`` where the unescaped
      ``.`` matched ANY character; ``[,.]`` matches only ',' or '.'.
    - ``^`` was used with re.search but without re.MULTILINE, so it only
      anchored at the very start of the file content and missed INFO/DEBUG
      lines anywhere after the first line.
    """
    # Matches the beginning of the standard atlassian logging format,
    # e.g. 2020-01-22 09:03:08,633 http-nio-8080-exec-55 INFO
    regex_loglevel = re.compile(
        r'^(\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}[,.]\d{1,3})\s.*?\s(INFO|DEBUG)',
        flags=re.MULTILINE,
    )
    for logdir in LOGDIRS:
        logfiles = _list_files_in_dir('{tmpdir}/{logdir}'.format(tmpdir=TMPDIR.name, logdir=logdir))
        for logfile in logfiles:
            with open(logfile, 'r', encoding='latin-1') as file:
                logcontent = file.read()
            if regex_loglevel.search(logcontent) is not None:
                input(
                    '{boundary}'
                    'Logmessages with level INFO or DEBUG have been detected!\n'
                    'Please consider using a stricter loglevel to avoid exposing too much sensitive information.\n'
                    'If this is not possible, take extra care to remove any sensitive information from the logs.\n'
                    '{boundary}'
                    'Press Enter to proceed.\n'.format(boundary=90 * '#' + '\n')
                )
                # one warning is enough — stop scanning
                return
# 5 CLEAN LOGS
def _get_filters(filterfile: str) -> List[str]:
with open(filterfile) as file:
return [line.strip() for line in file.readlines() if not line.startswith('#')]
def _generate_hash(string: str) -> str:
return 'SHA256:' + hashlib.sha256(bytes(string, encoding='utf-8')).hexdigest()[:10]
def _hash_replacement(match) -> str:
    """Build the replacement text for a regex match.

    Takes a re.Match (or in Python <= 3.6 _sre.SRE_MATCH) and returns a
    category prefix (internal/external e-mail, username, or none) followed by
    a short hash of the matched text and '_CLEANED'.
    """
    template = '{hash}_CLEANED'
    text = match.group(0)
    groups = match.groupdict()
    # first matching named group wins, mirroring the original if/elif chain
    for group_name, prefix in (
        ('internal_mail', 'INTERNAL_EMAIL_'),
        ('external_mail', 'EXTERNAL_EMAIL_'),
        ('user', 'USERNAME_'),
    ):
        if group_name in groups:
            template = prefix + '{hash}_CLEANED'
            text = groups[group_name]
            break
    return template.format(hash=_generate_hash(text))
def _replace_pattern_in_logs(pattern: str, replacement: str, logfiles: List[str]):
    """Apply one pattern/replacement filter to every file in *logfiles*.

    A '{hash}' placeholder in *replacement* switches to hashed substitution
    via _hash_replacement; otherwise the literal replacement string is used.
    Prints a summary line per file that had at least one replacement.
    """
    print('-- pattern: "{}" --'.format(pattern))
    compiled = re.compile(pattern)
    # choose the substitution strategy once, outside the file loop
    repl = _hash_replacement if '{hash}' in replacement else replacement
    for logfile in logfiles:
        with open(logfile, 'r', encoding='latin-1') as file:
            logcontent = file.read()
        logcontent, nr = compiled.subn(repl, logcontent)
        if nr:
            print('{nr} replacements ({replacement}) in {logfile}'.format(
                nr=nr,
                replacement=replacement,
                logfile=Path(logfile).relative_to(TMPDIR.name),
            ))
        with open(logfile, 'w+', encoding='latin-1') as file:
            file.write(logcontent)
def _clean_logs(baseurl: str, filters: List[str]):
    """Run every valid 'pattern||replacement' filter over all configured log dirs.

    Invalid filter strings (no single '||' separator) are reported and
    skipped. '{baseurl}' inside a pattern is substituted with *baseurl*.
    """
    for logdir in LOGDIRS:
        logfiles = _list_files_in_dir('{tmpdir}/{logdir}'.format(tmpdir=TMPDIR.name, logdir=logdir))
        for potential_filter in filters:
            try:
                pattern, replacement = potential_filter.split('||')
            except ValueError:
                print('"{}" is no valid filter string'.format(potential_filter))
                continue
            # str.replace is a no-op when the placeholder is absent
            pattern = pattern.replace('{baseurl}', baseurl)
            _replace_pattern_in_logs(
                pattern=pattern,
                replacement=replacement,
                logfiles=logfiles,
            )
def _clean_manual():
    """Pause so the user can inspect and hand-clean the extracted files."""
    notice = (
        '\nAutomatic cleaning finished. The extracted files are available at {tmpdir}. \n'
        '\n################################################################################\n'
        'These filters won\'t have cleaned everything perfectly from the logs!\n'
        'Especially usernames and names of people or businesses may still be present.\n'
        '##################################################################################\n'
        '\nYou can cleanup additional things manually or check how the files look like.\n'
        'Press Enter to proceed.\n'
        ''
    )
    input(notice.format(tmpdir=TMPDIR.name))
# 6 CREATE CLEANED ZIP AND CLEANUP
def _zip_dir(ziph: zipfile.ZipFile):
    """Add every file under the temp dir to *ziph*, with archive-relative paths."""
    for filepath in _list_files_in_dir(TMPDIR.name):
        arcname = Path(filepath).relative_to(TMPDIR.name)
        ziph.write(filename=filepath, arcname=str(arcname))
def _create_cleaned_zip():
    """Write the cleaned files into ./cleaned.zip (deflate-compressed)."""
    with zipfile.ZipFile('cleaned.zip', 'w', zipfile.ZIP_DEFLATED) as cleaned:
        _zip_dir(cleaned)
def _cleanup():
    """Remove the temporary working directory and everything extracted into it."""
    TMPDIR.cleanup()
# -- MAIN PROGRAM -- #
# Budget (in bytes) for decompressed data written to the temp dir; taken from
# the MAX_TMP_DIR_SIZE environment variable (e.g. "500MiB") and raisable
# interactively in _extract_zip().
MAX_TMP_DIR_SIZE, _ = remove_unit_prefix(os.getenv('MAX_TMP_DIR_SIZE', '200MiB'))
# files that are in the LARGEST_PERCENTage are flagged for automatic deletion
LARGEST_PERCENT = 10
if __name__ == '__main__':
    args = _arguments()
    print('---')
    print('CLI tool to clean Atlassian support.zip from various data')
    print('---')
    try:
        _prepare()
        if args.supportzip:
            print('\nExtract support zip')
            _extract_zip(supportzip=args.supportzip)
        else:
            # no-zip workflow: user drops loose files into the temp dir
            input(
                'Copy the files you want to be cleaned to {tmpdir} '
                'and press ENTER to proceed.'.format(tmpdir=TMPDIR.name)
            )
        print('\nRemove old files')
        # NOTE(review): args.supportzip may be None here; _collect_old_files
        # would then fail opening the zip if the user sets an age limit in the
        # no-zip workflow — confirm intended behaviour.
        _remove_old_files(supportzip=args.supportzip)
        print('\nRemove largest files')
        _remove_large_files()
        print('\nRemove mail logs')
        _remove_maillogs()
        print('\nCheck Loglevel')
        _check_loglevel()
        print('\nClean unwanted information:')
        _clean_logs(baseurl=args.baseurl, filters=_get_filters(args.filterfile))
        if args.supportzip:
            _clean_manual()
            print('\nCreate cleaned.zip')
            _create_cleaned_zip()
        else:
            input(
                '\nCleaning is done. Copy the cleaned files from {tmpdir} '
                'and press ENTER to remove the temporary directory.'.format(tmpdir=TMPDIR.name)
            )
    finally:
        # always remove the temp dir, even on error or Ctrl-C
        _cleanup()