forked from aidengilmartin/speedtest-to-influxdb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
executable file
·98 lines (82 loc) · 2.96 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import time
import json
import subprocess
from influxdb import InfluxDBClient
# InfluxDB Settings
DB_ADDRESS = 'db_hostname.network'
DB_PORT = 8086
DB_USER = 'db_username'
DB_PASSWORD = 'db_password'
DB_DATABASE = 'speedtest_db'
# Speedtest Settings
TEST_INTERVAL = 1800 # Time between tests (in seconds).
TEST_FAIL_INTERVAL = 60 # Time before retrying a failed Speedtest (in seconds).
influxdb_client = InfluxDBClient(
DB_ADDRESS, DB_PORT, DB_USER, DB_PASSWORD, None)
def init_db():
databases = influxdb_client.get_list_database()
if len(list(filter(lambda x: x['name'] == DB_DATABASE, databases))) == 0:
influxdb_client.create_database(
DB_DATABASE) # Create if does not exist.
else:
influxdb_client.switch_database(DB_DATABASE) # Switch to if does exist.
def format_for_influx(cliout):
data = json.loads(cliout)
# There is additional data in the speedtest-cli output but it is likely not necessary to store.
influx_data = [
{
'measurement': 'ping',
'time': data['timestamp'],
'fields': {
'jitter': data['ping']['jitter'],
'latency': data['ping']['latency']
}
},
{
'measurement': 'download',
'time': data['timestamp'],
'fields': {
# Byte to Megabit
'bandwidth': data['download']['bandwidth'] / 125000,
'bytes': data['download']['bytes'],
'elapsed': data['download']['elapsed']
}
},
{
'measurement': 'upload',
'time': data['timestamp'],
'fields': {
# Byte to Megabit
'bandwidth': data['upload']['bandwidth'] / 125000,
'bytes': data['upload']['bytes'],
'elapsed': data['upload']['elapsed']
}
},
{
'measurement': 'packetLoss',
'time': data['timestamp'],
'fields': {
'packetLoss': data['packetLoss']
}
}
]
return influx_data
def main():
init_db() # Setup the database if it does not already exist.
while (1): # Run a Speedtest and send the results to influxDB indefinitely.
speedtest = subprocess.run(
["speedtest", "--accept-license", "--accept-gdpr", "-f", "json"], capture_output=True)
if speedtest.returncode == 0: # Speedtest was successful.
data = format_for_influx(speedtest.stdout)
print("Speedtest Successful:")
if influxdb_client.write_points(data) == True:
print("Data written to DB successfully")
time.sleep(TEST_INTERVAL)
else: # Speedtest failed.
print("Speedtest Failed:")
print(speedtest.stderr)
print(speedtest.stdout)
time.sleep(TEST_FAIL_INTERVAL)
if __name__ == '__main__':
print('Speedtest CLI Data Logger to InfluxDB')
main()