forked from x4nth055/pythoncode-tutorials
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathprocess_monitor.py
132 lines (122 loc) · 5.39 KB
/
process_monitor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import psutil
from datetime import datetime
import pandas as pd
import time
import os
def get_size(bytes):
    """
    Returns size of bytes in a nice format

    The value is divided by 1024 until its magnitude drops below 1024 and
    the matching prefix letter is appended, e.g. ``1536`` -> ``"1.50KB"``.

    :param bytes: size to format (int or float). The name shadows the
        builtin but is kept so existing keyword callers keep working.
    :return: the scaled value with two decimals followed by the unit.
    """
    size = bytes
    for unit in ['', 'K', 'M', 'G', 'T', 'P', 'E', 'Z']:
        # compare on magnitude so negative values get scaled as well
        if abs(size) < 1024:
            return f"{size:.2f}{unit}B"
        size /= 1024
    # larger than every listed unit: report in the biggest one rather than
    # falling off the loop and implicitly returning None
    return f"{size:.2f}YB"
def get_processes_info():
    """
    Collects a snapshot of every process psutil can iterate over.

    Fields that cannot be read because of missing privileges (or because
    the platform does not provide them) fall back to a neutral value
    (0 or "N/A"). Processes that disappear while being inspected are
    skipped, as is PID 0.

    :return: a list of dictionaries, one per process, with the keys
        pid, name, create_time, cores, cpu_usage, status, nice,
        memory_usage, read_bytes, write_bytes, n_threads and username.
    """
    # the list that contains all process dictionaries
    processes = []
    for process in psutil.process_iter():
        try:
            # get all process info in one shot
            with process.oneshot():
                # get the process id
                pid = process.pid
                if pid == 0:
                    # System Idle Process for Windows NT, useless to see anyways
                    continue
                # get the name of the file executed
                name = process.name()
                # get the time the process was spawned
                try:
                    create_time = datetime.fromtimestamp(process.create_time())
                except OSError:
                    # system processes, using boot time instead
                    create_time = datetime.fromtimestamp(psutil.boot_time())
                try:
                    # get the number of CPU cores that can execute this process
                    cores = len(process.cpu_affinity())
                except (psutil.AccessDenied, AttributeError):
                    # AttributeError: psutil does not expose cpu_affinity
                    # on every platform
                    cores = 0
                # get the CPU usage percentage
                # NOTE(review): per the psutil docs the first non-blocking
                # call for a process returns 0.0 - confirm this is acceptable
                cpu_usage = process.cpu_percent()
                # get the status of the process (running, idle, etc.)
                status = process.status()
                try:
                    # get the process priority (a lower value means a more prioritized process)
                    nice = int(process.nice())
                except psutil.AccessDenied:
                    nice = 0
                try:
                    # get the memory usage in bytes
                    memory_usage = process.memory_full_info().uss
                except psutil.AccessDenied:
                    memory_usage = 0
                # total process read and written bytes; guarded like the
                # other privileged calls so one protected process does not
                # abort the whole scan
                try:
                    io_counters = process.io_counters()
                except (psutil.AccessDenied, AttributeError):
                    # AttributeError: io_counters is not available on
                    # every platform
                    read_bytes = 0
                    write_bytes = 0
                else:
                    read_bytes = io_counters.read_bytes
                    write_bytes = io_counters.write_bytes
                # get the number of total threads spawned by this process
                n_threads = process.num_threads()
                # get the username of user spawned the process
                try:
                    username = process.username()
                except psutil.AccessDenied:
                    username = "N/A"
        except psutil.NoSuchProcess:
            # the process exited between being listed and being queried;
            # there is nothing meaningful left to report for it
            continue
        processes.append({
            'pid': pid, 'name': name, 'create_time': create_time,
            'cores': cores, 'cpu_usage': cpu_usage, 'status': status, 'nice': nice,
            'memory_usage': memory_usage, 'read_bytes': read_bytes, 'write_bytes': write_bytes,
            'n_threads': n_threads, 'username': username,
        })
    return processes
def construct_dataframe(processes, sort_by=None, descending=None, columns=None):
    """
    Builds the printable table from the collected process dictionaries.

    The rows are indexed by pid, sorted, the byte counters and the creation
    time are turned into human-readable strings, and finally only the
    requested columns are kept, in the requested order.

    :param processes: list of per-process dictionaries; each one needs a
        'pid' key plus every column that is sorted on, formatted or shown.
    :param sort_by: column to sort by.
    :param descending: sort in descending order when true.
    :param columns: comma-separated names of the columns to keep.
    :return: a pandas DataFrame indexed by pid (an empty frame carrying the
        requested columns when ``processes`` is empty).

    Any of the three settings left as None is read from the module-level
    variable of the same name (the script entry point sets them), and
    falls back to the command-line defaults when that is not defined.
    This keeps the one-argument call working.
    """
    module_settings = globals()
    if sort_by is None:
        sort_by = module_settings.get("sort_by", "memory_usage")
    if descending is None:
        descending = module_settings.get("descending", False)
    if columns is None:
        columns = module_settings.get(
            "columns",
            "name,cpu_usage,memory_usage,read_bytes,write_bytes,status,create_time,nice,n_threads,cores",
        )
    # tolerate spaces after the commas, e.g. "name, cpu_usage"
    requested = [column.strip() for column in columns.split(",")]

    if not processes:
        # nothing to index or sort; set_index('pid') would raise KeyError
        return pd.DataFrame(columns=requested)

    # convert to pandas dataframe, with the process id as index
    df = pd.DataFrame(processes).set_index('pid')
    # sort before formatting, while the byte counters are still numbers
    df = df.sort_values(sort_by, ascending=not descending)
    # pretty printing bytes
    for byte_column in ('memory_usage', 'write_bytes', 'read_bytes'):
        df[byte_column] = df[byte_column].apply(get_size)
    # convert to proper date format
    df['create_time'] = df['create_time'].apply(datetime.strftime, args=("%Y-%m-%d %H:%M:%S",))
    # reorder and define used columns
    return df[requested]
if __name__ == "__main__":
    # Script entry point: parse the command line, print the process table
    # once and, with --live-update, keep refreshing it until interrupted.
    import argparse

    parser = argparse.ArgumentParser(description="Process Viewer & Monitor")
    parser.add_argument("-c", "--columns", help="""Columns to show,
                        available are name,create_time,cores,cpu_usage,status,nice,memory_usage,read_bytes,write_bytes,n_threads,username.
                        Default is name,cpu_usage,memory_usage,read_bytes,write_bytes,status,create_time,nice,n_threads,cores.""",
                        default="name,cpu_usage,memory_usage,read_bytes,write_bytes,status,create_time,nice,n_threads,cores")
    parser.add_argument("-s", "--sort-by", dest="sort_by", help="Column to sort by, default is memory_usage .", default="memory_usage")
    parser.add_argument("--descending", action="store_true", help="Whether to sort in descending order.")
    # type=int lets argparse reject non-numeric input with a usage message
    # instead of a ValueError traceback
    parser.add_argument("-n", type=int, help="Number of processes to show, will show all if 0 is specified, default is 25 .", default=25)
    parser.add_argument("-u", "--live-update", action="store_true", help="Whether to keep the program on and updating process information each second")
    # parse arguments
    args = parser.parse_args()
    if args.n < 0:
        # a negative count used to print nothing at all without explanation
        parser.error("-n must be 0 (show all) or a positive number")
    # these are deliberately module-level names: construct_dataframe()
    # reads the display settings from module scope
    columns = args.columns
    sort_by = args.sort_by
    descending = args.descending
    n = args.n
    live_update = args.live_update

    first_pass = True
    try:
        while True:
            # get all process info
            processes = get_processes_info()
            df = construct_dataframe(processes)
            if not first_pass:
                # clear the screen depending on your OS
                os.system("cls" if os.name == "nt" else "clear")
            print(df.to_string() if n == 0 else df.head(n).to_string())
            if not live_update:
                break
            first_pass = False
            # pause between refreshes (also before the first refresh)
            time.sleep(0.7)
    except KeyboardInterrupt:
        # Ctrl+C is the normal way to leave live mode; exit without a traceback
        pass