-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathcheck-debuginfod.py
executable file
·99 lines (79 loc) · 3.04 KB
/
check-debuginfod.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
#!/usr/bin/env python3
import argparse
import concurrent.futures
import random
import subprocess
import sys
import time
from pathlib import Path
import requests
THRESHOLD = 90
SIZE_THRESHOLD = 10 * 1024 * 1024
TIME_THRESHOLD = 30
parser = argparse.ArgumentParser(description='Check debuginfod based on system binaries')
parser.add_argument('--verbose', '-v', action='store_true', help='Verbose')
parser.add_argument('-n', type=int, default=100, help='Number of checked packages')
args = parser.parse_args()
def get_buildid(binary):
output = subprocess.check_output(f'file {binary}', encoding='utf8', shell=True).strip()
if 'symlink' in output:
# print('Skipping symlink: {binary}')
return None
needle = 'BuildID[sha1]='
if needle not in output:
# print(f'Missing Build ID: {binary}')
return None
buildid = output[output.find(needle) + len(needle):]
buildid = buildid.split()[0][:-1]
return (binary, buildid)
def get_debuginfo(binary, buildid, verbose):
url = f'https://debuginfod.opensuse.org/buildid/{buildid}/debuginfo'
start = time.monotonic()
response = requests.get(url, stream=True)
if response.status_code != 200:
print(binary, url, response.status_code)
duration = time.monotonic() - start
size = len(response.content)
if duration > TIME_THRESHOLD:
print(f'WARNING: long request for {binary} taken {duration:.1f} s of size {size}B')
return (binary, response, size, duration)
def is_small(item):
return Path(item[0]).stat().st_size <= SIZE_THRESHOLD
print('Analyzing file types')
buildids = {}
files = list(Path('/usr/bin').iterdir()) + list(Path('/usr/lib64').iterdir())
with concurrent.futures.ProcessPoolExecutor() as executor:
futures = []
for file in files:
futures.append(executor.submit(get_buildid, str(file)))
concurrent.futures.wait(futures)
for future in futures:
r = future.result()
if r:
buildids[r[0]] = r[1]
print(f'Out of {len(files)} found {len(buildids)} with a Build ID')
seed = int(time.monotonic())
random.seed(seed)
print(f'Using random seed {seed}')
failures = 0
total_size = 0
with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
futures = []
files = sorted(filter(is_small, buildids.items()))
files = random.sample(files, args.n)
print(f'Checking {len(files)} packages:')
for file, buildid in files:
futures.append(executor.submit(get_debuginfo, file, buildid, args.verbose))
concurrent.futures.wait(futures)
print()
for future in futures:
binary, response, size, duration = future.result()
if response.status_code != 200:
failures += 1
print('WARNING:', binary, buildids[binary], response.status_code)
else:
total_size += size
success_rate = 100.0 * (len(files) - failures) / len(files)
print(f'Transfered {total_size // (1024 ** 2)} MB')
print(f'Success rate: {success_rate:.2f}%, threshold: {THRESHOLD} %')
sys.exit(1 if success_rate < THRESHOLD else 0)