-
Notifications
You must be signed in to change notification settings - Fork 0
/
shitty_mitmproxy.py
executable file
·139 lines (122 loc) · 4.49 KB
/
shitty_mitmproxy.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
#!mitmdump -s
import mitmproxy.addonmanager
import mitmproxy.http
import mitmproxy.log
import mitmproxy.tcp
import mitmproxy.websocket
from mitmproxy import ctx
import json
import gzip
from datetime import datetime
class BytesDump(json.JSONEncoder):
    """JSON encoder that renders ``bytes`` values as short text snippets."""

    def default(self, obj):
        """Return a JSON-serialisable replacement for *obj*.

        ``bytes`` values are converted with ``unbyte``.  Any other
        unsupported type is delegated to ``json.JSONEncoder.default`` so the
        caller gets the standard ``TypeError`` rather than the encoder
        re-entering this method with the same object.

        Raises:
            TypeError: if *obj* is not ``bytes`` and not JSON-serialisable.
        """
        if isinstance(obj, bytes):
            text = unbyte(obj)
            if isinstance(text, bytes):
                # unbyte falls back to the raw bytes when it cannot decode
                # them; returning bytes here would send the encoder straight
                # back into default(), so force a lossless text rendering.
                text = text.decode("utf-8", errors="backslashreplace")
            return text
        return super().default(obj)
def keys_string(d):
    """Recursively copy *d*, turning ``bytes`` dictionary keys into ``str``.

    * A tuple, list or set becomes a list whose items are converted
      recursively.
    * A dict becomes a new dict: ``bytes`` keys are decoded as UTF-8
      (undecodable bytes become U+FFFD instead of raising), and each value is
      converted recursively.  A value that is neither a dict nor a sequence
      but exposes ``__dict__`` is replaced by the converted attribute
      dictionary; values without ``__dict__`` are kept unchanged.
    * Anything else passed at the top level is returned unchanged.
    """
    if isinstance(d, (tuple, list, set)):
        return [keys_string(item) for item in d]
    if not isinstance(d, dict):
        return d

    rval = {}
    for k, v in d.items():
        if isinstance(k, bytes):
            # errors="replace": one odd key must not abort the whole dump.
            k = k.decode(errors="replace")
        if isinstance(v, (dict, tuple, list, set)):
            v = keys_string(v)
        else:
            try:
                v = keys_string(v.__dict__)
            except Exception:
                # Best effort: no __dict__ (str, int, ...) or conversion of
                # the attributes failed, so keep the value as it is.  Unlike
                # a bare ``except`` this lets KeyboardInterrupt/SystemExit
                # propagate.
                pass
        rval[k] = v
    return rval
def unbyte(obj, limit=128):
    """Return a short, printable preview of a message body.

    Args:
        obj: Body to preview.  ``bytes`` are decoded as UTF-8; if that fails
            the data is treated as gzip and the decompressed payload is
            decoded instead.  ``None`` yields ``""`` and ``str`` is used
            as is.
        limit: Maximum number of characters (or bytes) to keep.

    Returns:
        The first *limit* characters of the decoded text, or the first
        *limit* raw bytes when the data is neither UTF-8 text nor gzip
        containing UTF-8 text.
    """
    import zlib  # local import: only needed to name zlib.error below

    if obj is None:
        return ""
    if isinstance(obj, str):
        return obj[:limit]
    try:
        string = obj.decode()
    except UnicodeDecodeError:
        try:
            string = gzip.decompress(obj).decode()
        except (OSError, EOFError, zlib.error, UnicodeDecodeError):
            # OSError covers gzip.BadGzipFile (not gzip at all); EOFError and
            # zlib.error cover truncated or corrupt streams; the inner
            # UnicodeDecodeError covers gzip wrapping non-text data.
            string = obj
    return string[:limit]
def dump_json(obj):
    """Serialise *obj* to a JSON string.

    The object is first normalised with ``keys_string`` and then encoded by
    ``json.dumps`` using the ``BytesDump`` encoder class.
    """
    normalised = keys_string(obj)
    return json.dumps(normalised, cls=BytesDump)
def filter_headers(head):
    """Return the header fields of *head* without the noisy standard ones.

    Args:
        head: Either a ``str`` (returned unchanged) or an object with a
            ``fields`` attribute holding ``(name, value)`` pairs.

    Returns:
        A dict mapping each remaining header name to its value, with names
        and values left exactly as they were stored.  Names are matched
        case-insensitively, and ``bytes`` names are decoded for the
        comparison only, so the filter works for both ``str`` and ``bytes``
        header names.  A missing or empty ``fields`` gives ``{}``.
    """
    if isinstance(head, str):
        return head

    hidden = frozenset(
        name.lower()
        for name in (
            'Host',
            'Connection',
            'accept-language',
            'User-Agent',
            'Origin',
            'X-Requested-With',
            'Referer',
            'Accept-Encoding',
        )
    )

    def is_hidden(name):
        # latin-1 maps every byte to a character, so decoding cannot fail.
        if isinstance(name, bytes):
            name = name.decode("latin-1")
        return isinstance(name, str) and name.lower() in hidden

    fields = getattr(head, "fields", None) or ()
    return {
        name: value
        for name, value in dict(fields).items()
        if not is_hidden(name)
    }
class SniffWebSocket:
    """Addon that logs HTTP requests, responses and WebSocket messages.

    Every hook writes to ``ctx.log``; nothing on the flow is modified.
    """

    def __init__(self):
        pass

    def load(self, loader):
        """Set the proxy listen port to 8082 when the addon is loaded."""
        ctx.options.listen_port = 8082

    def request(self, flow):
        """Log the request line, filtered headers and a body preview."""
        ctx.log.info(f"REQUEST: {datetime.now()} {flow.type} {flow.request.method} {flow.request.url}")
        ctx.log.info(f" REQUEST {datetime.now()} HEADERS: {filter_headers(flow.request.headers)}")
        ctx.log.info(f" REQUEST {datetime.now()} CONTENT: {unbyte(flow.request.content)}")

    def response(self, flow):
        """Log the response's filtered headers and a body preview."""
        ctx.log.info(f"RESPONSE: {datetime.now()} {flow.type} {flow.request.method} {flow.request.url}")
        r_data = flow.response.data
        headers = filter_headers(r_data.headers)
        ctx.log.info(f" RESPONSE {datetime.now()} HEADERS: {headers}")
        ctx.log.info(f" RESPONSE {datetime.now()} CONTENT: {unbyte(r_data.content)}")

    @staticmethod
    def _message_text(content):
        """Render a message payload as text without ever raising."""
        if isinstance(content, bytes):
            # Payloads are not guaranteed to be ASCII; backslashreplace keeps
            # binary frames visible instead of raising UnicodeDecodeError.
            return content.decode("utf-8", errors="backslashreplace")
        return str(content)

    def _log_messages(self, flow, label, latest_only=False):
        """Log the WebSocket messages stored on *flow*.

        Args:
            flow: Flow whose ``websocket.messages`` are logged.  Flows with
                no ``websocket`` attribute (or ``None``) are skipped.
            label: Tag placed in each log line, e.g. ``"START MSG"``.
            latest_only: Log only the last stored message instead of all.
        """
        websocket = getattr(flow, "websocket", None)
        if websocket is None:
            return
        messages = websocket.messages[-1:] if latest_only else websocket.messages
        for flow_msg in messages:
            direction = "SENT" if flow_msg.from_client else "RECV"
            text = self._message_text(flow_msg.content)
            msg = f"WEBSOCKET {datetime.now()} {label}: {direction} {flow_msg.type}: {text}"
            if flow_msg.dropped:
                ctx.log.error(f" DROPPED: {msg}")
            else:
                ctx.log.info(f" {msg}")

    def websocket_handshake(self, flow: mitmproxy.http.HTTPFlow):
        """Log the handshake request and any messages already on the flow."""
        ctx.log.info(f"WEBSOCKET {datetime.now()} HANDSHAKE: {flow.type} {flow.request.method} {flow.request.url}")
        self._log_messages(flow, "HANDSHAKE MSG")

    def websocket_start(self, flow):
        """Log the start of a connection and any messages already on the flow."""
        ctx.log.info(f"WEBSOCKET {datetime.now()} START: {flow.type} {flow.request.method} {flow.request.url}")
        self._log_messages(flow, "START MSG")

    def websocket_message(self, flow):
        """Log the most recent WebSocket message on *flow*.

        Only the last stored message is logged.
        NOTE(review): this assumes the hook fires once per message, as the
        mitmproxy event documentation describes; iterating the whole list
        here would repeat every earlier message on each call.
        """
        if getattr(flow, "websocket", None) is None:
            return
        ctx.log.info(f"WEBSOCKET: {datetime.now()} {flow.type} {flow.request.method} {flow.request.url}")
        self._log_messages(flow, "MSG", latest_only=True)

    def websocket_error(self, flow):
        """
        A websocket connection has had an error.
        """

    def websocket_end(self, flow):
        """
        A websocket connection has ended.
        """
# Module-level list holding the single SniffWebSocket addon instance.
addons = [SniffWebSocket()]