-
Notifications
You must be signed in to change notification settings - Fork 6
/
flow_monitor.py
154 lines (144 loc) · 6.21 KB
/
flow_monitor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# -*- coding: utf-8 -*-
""" 流量监测系统 """
from threading import Thread, Event
from scapy.sendrecv import sniff
from scapy.layers.inet import *
import psutil
class Monitor:
"""
流量监测
"""
# 程序使用的端口
process_ports = []
# 监测系统是否开始
start_flag = Event()
window = None
def __init__(self, window):
self.window = window
self.start_flag.set()
def getProcessList(self): # 获得进程名列表
"""
获取有网络连接的进程列表
:return :返回进程列表
"""
process_list = set()
for process in psutil.process_iter():
connections = process.connections()
if connections:
process_list.add(process.name())
return list(process_list)
def getProcessConnections(self): # 返回进程名列表和UDP,TCP两端连接信息,显示在树形item的列表中【记录了tcp,icmp链接的情况】
"""
获取进程使用的网络连接
:return : 返回进程名字列表和进程对应的连接列表
"""
process_name = set()
process_conn = {}
for process in psutil.process_iter():
connections = process.connections()
if connections:
process_name.add(process.name())
for con in connections:
if con.type == 1: # TCP
protocol = 'TCP'
elif con.type == 2: # UDP
protocol = 'UDP'
# 本地使用的IP及端口
laddr = "%s:%d" % (con.laddr[0], con.laddr[1])
# 远端的IP和端口情况
if con.raddr: # 如果能获得链接的ip和端口就直接获得
raddr = "%s:%d" % (con.raddr[0], con.raddr[1])
elif con.family.value == 2: # 否则判断协议族
# IPv4
raddr = "0.0.0.0:0"
elif con.family.value == 23:
# IPv6
raddr = "[::]:0"
else:
raddr = "*:*"
info = "%s\t%s\nLocal: %s\nRemote: %s\n" % (
protocol, con.status, laddr, raddr)
process_conn.setdefault(process.name(), set()).add(info)
return list(process_name), process_conn
def getPortList(self, process_name): #获得某进程process_name的端口列表
"""
用于刷新某个进程的端口列表的函数
将获得的端口列表设置到self.process_ports
:parma process_name: 输入为程序的名字
"""
ports = set()
while not self.start_flag.is_set():
ports.clear()
for process in psutil.process_iter(): #遍历正在运行的进程,以获取有关特定进程的详细信息
connections = process.connections()
if process.name() == process_name and connections:
for con in connections:
if con.laddr:
ports.add(con.laddr[1])
if con.raddr:
ports.add(con.raddr[1])
if ports:
self.process_ports = list(ports)
else:
# 进程已不存在
self.window.stop()
self.window.refresh_process()
self.window.alert("进程%s已停止运行!" % process_name)
def getConnections(self, pak): # 对每个包的情况进行解析判断,实时图上面的包列表
"""
获取应用的连接信息
:parma pak: 数据包
"""
try:
src = pak.payload.src
dst = pak.payload.dst
length = len(pak)
if src == dst:
# 相同源地址和目的地址,可能为Land攻击
self.window.alert("数据包源地址与目的地址相同, 疑为Land攻击!")
elif len(pak.payload) > 65535:
# IP数据包的最大长度大于64KB(即65535B), 若大于, 则疑为Ping of Death攻击
self.window.alert("收到IP数据包长度大于64KB, 疑为Ping拒绝服务攻击!")
else:
protocol = pak.payload.payload.name
if protocol != 'ICMP':
sport = pak.payload.payload.sport
dport = pak.payload.payload.dport
if sport in self.process_ports and dport in self.process_ports: #属于该进程连接端口的
info = "%-7s%s:%d -> %s:%d%7d" % (protocol, src, sport,
dst, dport, length)
if protocol == 'TCP':
info += '%5s' % str(pak.payload.payload.flags) # 24为PA(PSH[DATA数据传输]、ACK),16为A(ACK)
self.window.conList.addItem(info)
else:
# ICMP报文
self.window.conList.addItem(
"%-7s%s -> %s%7d" % (protocol, src, dst, length))
except:
pass
def capture_packet(self):
"""
设置过滤器, 只接收IP、IPv6、TCP、UDP
"""
sniff(
store=False,
filter="(tcp or udp or icmp) and (ip6 or ip)",
prn=lambda x: self.getConnections(x), #对每个数据包调用connections
stop_filter=lambda x: self.start_flag.is_set())
def start(self, process_name):
"""
开始对某一进程的流量监视
:parma process_name: 进程的名字
"""
# 开启刷新程序端口的线程
self.start_flag.clear()
self.window.conList.clear()
Thread( #获取选定进程的端口列表,赋值process_ports
target=self.getPortList, daemon=True,
args=(process_name, )).start()
Thread(target=self.capture_packet, daemon=True).start() # 将每个嗅探的包放置到window.conList(对关于这个进程的所有连接端口通过的流量进行检测)
def stop(self):
"""
停止监测
"""
self.start_flag.set()