-
Notifications
You must be signed in to change notification settings - Fork 3
/
merge
executable file
·144 lines (134 loc) · 5 KB
/
merge
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
#!/usr/bin/env python3
from ipaddress import ip_network, ip_address
import gzip
import json
import os
import socket
import sys
# Length of a hex-encoded SHA-1 digest; prune_rkn() uses it to size the
# file name it expects to find embedded in the gzip header.
SHA1_HEXDIGEST_LEN = 40
# First three bytes every gzip file is expected to start with; used both to
# tell gzip inputs from plain-text ones and to validate headers.
GZIP_MAGIC = b'\x1f\x8b\x08'
# Suffix that the file name embedded in the gzip header must end with.
JSONZ_SUFFIX = '.kabysdoh.json'
def parse_rkn(rkn):
    """Merge the address lists of several gzip-compressed JSON dumps.

    Every path in *rkn* is opened with ``gzip.open`` as UTF-8 text and parsed
    as one JSON document.  The ``ip``, ``ipSubnet``, ``ipv6`` and
    ``ipv6Subnet`` entries of each document are accumulated into four sets, so
    a value that appears in several files is kept only once.

    Returns the tuple ``(ip4, net4, ip6, net6)`` of sets.  A document that
    lacks one of the four keys raises ``KeyError``.
    """
    merged = {
        'ip': set(),
        'ipSubnet': set(),
        'ipv6': set(),
        'ipv6Subnet': set(),
    }
    for path in rkn:
        with gzip.open(path, 'rt', encoding='utf-8') as stream:
            document = json.load(stream)
        for key, bucket in merged.items():
            bucket.update(document[key])
    # TODO: merge and/or split networks & re-filter ip sets with the merged set of networks
    return merged['ip'], merged['ipSubnet'], merged['ipv6'], merged['ipv6Subnet']
def prune_rkn(rkn):
    """Return *rkn* without files whose gzip header repeats an earlier one.

    Only the leading bytes of each file are read: the fixed 10-byte gzip
    header plus the embedded file name, which is expected to be
    ``SHA1_HEXDIGEST_LEN`` characters followed by ``JSONZ_SUFFIX`` and a NUL
    terminator.  Two files are treated as duplicates when those leading bytes
    are identical; the first occurrence wins and input order is preserved.

    Raises ``RuntimeError`` if a file does not start with ``GZIP_MAGIC`` or
    the bytes read do not end with the expected suffix.
    """
    # a 10-byte header: {\x1f\x8b\x08, 1:flags, 4:timestamp, 1:xflags, \xff} + {fname, \x00}
    # a 8-byte trailer: CRC-32 checksum and the length of the original uncompressed data
    name_tail = JSONZ_SUFFIX.encode('latin-1') + b'\x00'
    prefix_len = 10 + SHA1_HEXDIGEST_LEN + len(name_tail)
    seen = set()
    unique = []
    for path in rkn:
        with open(path, 'rb') as stream:
            prefix = stream.read(prefix_len)
        tail = prefix[-len(name_tail):]
        if not (prefix.startswith(GZIP_MAGIC) and tail == name_tail):
            raise RuntimeError('Unexpected magic', tail, name_tail)
        if prefix in seen:
            continue
        seen.add(prefix)
        unique.append(path)
    return unique
def rkn_or_cdn(files):
    """Split *files* into gzip files and everything else.

    A file counts as gzip when its first three bytes equal ``GZIP_MAGIC``.
    Returns ``(rkn, cdn)``: the gzip paths and the remaining paths, each in
    input order.
    """
    gzipped, plain = [], []
    for path in files:
        with open(path, 'rb') as stream:
            signature = stream.read(3)
        if signature == GZIP_MAGIC:
            gzipped.append(path)
        else:
            plain.append(path)
    return gzipped, plain
def parse_cdn(cdn):
    """Read per-CDN subnet lists and tag every subnet with its CDN name.

    Each path in *cdn* is a text file with one network in CIDR notation per
    line; the base name of the file is used as the CDN name.  Blank lines are
    ignored.

    Returns ``(cdn4, cdn6)``: two lists of ``(network, name)`` tuples, for the
    IPv4 and the IPv6 networks respectively, with the network rendered as a
    string.  Entries keep the order of the files and of the lines within them.

    Raises:
        ValueError: propagated from ``ip_network`` when a line is not a valid
            network.
        AssertionError: when a network overlaps a network read from an
            earlier file.
    """
    cdn4, cdn6 = [], []
    known = set()
    for fname in cdn:
        name = os.path.basename(fname)
        with open(fname) as fd:
            # Skip empty lines (e.g. a trailing blank line) instead of
            # feeding '' to ip_network().
            net = [ip_network(line) for line in map(str.strip, fd) if line]
        # Networks are only checked against those of *previous* files;
        # overlaps inside a single file are accepted.
        for n in net:
            for k in known:
                if k.overlaps(n):
                    # Raised explicitly rather than via `assert` so the check
                    # is not stripped under `python -O`; the exception type
                    # is the one the assert statement used to raise.
                    raise AssertionError(
                        '{} ({}) overlaps {} from another CDN file'.format(n, name, k))
        known.update(net)
        cdn4.extend((str(n), name) for n in net if n.version == 4)
        cdn6.extend((str(n), name) for n in net if n.version == 6)
    return cdn4, cdn6
def is_in_cdn(domain, net, name):
    """Check that *domain* resolves, over IPv4 and IPv6, only into *net*.

    The domain is resolved with ``socket.getaddrinfo`` for port 443 and
    ``SOCK_STREAM``, first for ``AF_INET`` and then for ``AF_INET6``.

    Args:
        domain: host name to resolve.
        net: iterable of ``ipaddress`` network objects belonging to the CDN.
        name: CDN name, used only in diagnostics.

    Returns:
        ``True`` when resolution succeeds for both address families and every
        returned address lies inside at least one network of *net*.  ``False``
        as soon as resolution fails for a family or an address falls outside
        all networks; the reason is printed to stderr.
    """
    for af in (socket.AF_INET, socket.AF_INET6):
        try:
            addrinfo = socket.getaddrinfo(domain, 443, af, socket.SOCK_STREAM)
        except socket.gaierror as err:
            print(
                '{:s} is not dual-stack, AF:{:d} fails with {:s}'.format(domain, af, str(err)),
                file=sys.stderr,
            )
            return False
        for (_, _, _, _, sockaddr) in addrinfo:
            ip = ip_address(sockaddr[0])  # IPv4 is IP:Port, but IPv6 is IP:Port:Something:More
            if not any(ip in n for n in net):
                # Format str(ip), not the address object itself: address
                # objects accept a format spec such as 's' only since
                # Python 3.9 and raise TypeError on older interpreters.
                print(
                    '{:s} ({:s}) is not in expected CDN ({:s})'.format(domain, str(ip), name),
                    file=sys.stderr,
                )
                return False
    return True
def verify_domains(cdn4, cdn6):
    """Filter the built-in per-CDN domain lists down to domains that verify.

    *cdn4* and *cdn6* are lists of ``(network, cdn_name)`` tuples.  For every
    CDN name that occurs in them, each candidate domain of that CDN is kept
    only if ``is_in_cdn`` confirms it against the CDN's networks.  CDNs that
    do not occur in the input keep their candidate list untouched.

    Returns the mapping of CDN name to list of domains.

    Raises ``AssertionError`` if the input names a CDN without a candidate
    list, and ``RuntimeError`` if no candidate of a CDN passes verification.
    """
    # www or not-www? Just take the "default" webpage domain.
    # NB: nike.com is Amazon CloudFront and it redirects to www.nike.com. The latter is Akamai Edge.
    # www.flickr.com is the only dual-stack domain in amazon set.
    domains_by_cdn = {
        'cloudflare': [
            'medium.com.',
            'www.udemy.com.',
            'pixabay.com.',
            'www.pexels.com.',
            'discord.com.',
            'www.worldometers.info.',
            'www.glassdoor.com.',
            'www.patreon.com.',
            'www.sciencedirect.com.',
            'www.webmd.com.',
        ],
        'amazon': [
            'www.tradingview.com.',
            'www.imdb.com.',
            'www.amazon.co.uk.',
            'soundcloud.com.',
            'www.reuters.com.',
            'www.marketwatch.com.',
            'www.flickr.com.',
            'www.merriam-webster.com.',
            'tinder.com.',
            'www.surveymonkey.com.',
        ],
    }
    tagged = cdn4 + cdn6
    cdn_names = {owner for _, owner in tagged}
    assert cdn_names.issubset(domains_by_cdn)
    for cdn_name in cdn_names:
        subnets = [ip_network(prefix) for prefix, owner in tagged if owner == cdn_name]
        confirmed = []
        for domain in domains_by_cdn[cdn_name]:
            if is_in_cdn(domain, subnets, cdn_name):
                confirmed.append(domain)
        domains_by_cdn[cdn_name] = confirmed
        # empty cdnDomains should not be fatal as DNS server may learn new CDN domains in runtime.
        if not confirmed:
            raise RuntimeError('no valid domains for CDN', cdn_name)
    return domains_by_cdn
def main():
    """Merge the input files named on the command line into one JSON document.

    The arguments are split into gzip dumps and plain-text CDN subnet lists.
    Duplicate dumps are pruned, the remaining ones are merged, the CDN lists
    are parsed and the CDN domains verified.  The result is written to stdout
    as compact JSON with sorted keys.

    ``updateTime`` is the largest base name, read as an integer, among the
    dumps that survive pruning.  The address lists are emitted in set
    iteration order.
    """
    rkn_files, cdn_files = rkn_or_cdn(sys.argv[1:])
    rkn_files = prune_rkn(rkn_files)
    ip4, net4, ip6, net6 = parse_rkn(rkn_files)
    update_time = max(map(int, map(os.path.basename, rkn_files)))
    cdn4, cdn6 = parse_cdn(cdn_files)
    cdn_domains = verify_domains(cdn4, cdn6)
    payload = {
        'updateTime': update_time,
        'ip': list(ip4),
        'ipSubnet': list(net4),
        'ipv6': list(ip6),
        'ipv6Subnet': list(net6),
        'cdnSubnet': cdn4,
        'cdnv6Subnet': cdn6,
        # Subset of suitable domains cherry-picked from Alexa top 500.
        'cdnDomains': cdn_domains,
    }
    json.dump(
        payload,
        sys.stdout,
        ensure_ascii=False,
        sort_keys=True,
        separators=(',', ':'),
    )
# Run the merge only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()