-
Notifications
You must be signed in to change notification settings - Fork 1
/
anylog.py
115 lines (99 loc) · 3.58 KB
/
anylog.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
import json
import requests
class AnyLogConnection:
    """
    Minimal REST client for an AnyLog node.

    Commands are sent as HTTP GET requests to `http://<conn>` with the command
    text carried in the `command` header. Request/response problems are not
    raised to the caller; they are reported as human-readable strings.

    Missing functions:
        - get query log
        - get rest
        - get rest log
        - query status
        - get streaming
        - get operator
        - get rows count [where dbms group]
    """
    # Template used to render a single event/error log row
    _LOG_ROW_FORMAT = 'ID: %s | Time: %s | Type: %s | Text: %s'
    # Header residue that may prefix a text reply; the longer marker must be
    # checked first because the shorter one is a prefix of it
    _CONTENT_TYPE_MARKERS = ('Content-type: text/json', 'Content-type: text')

    def __init__(self, conn:str, auth:tuple=(), timeout:int=30):
        """
        Connection information for AnyLog
        :args:
            conn:str - REST IP & Port
            auth:tuple - Username/password (optional)
            timeout:int - REST timeout, passed to requests.get
        """
        self.conn = conn
        self.auth = auth
        self.timeout = timeout

    def get_cmd(self, command:str, query:bool=False)->str:
        """
        Execute GET command
        :args:
            command:str - command to execute
            query:bool - whether or not command is of type query (ie remote).
                         Only the literal True adds the `destination: network` header
        :params:
            output - content to return
            headers:dict - REST headers information
        :return:
            - parsed JSON (dict / list / scalar) when the reply body is valid JSON
            - raw reply text when the body is not JSON
            - an error message (str) when the request fails or the status is not 200
            NOTE: despite the `str` annotation (kept for compatibility) the
            result is not always a string
        """
        headers = {
            'command': command,
            'User-Agent': 'AnyLog/1.23'
        }
        if query is True:
            headers['destination'] = 'network'

        try:
            r = requests.get(url='http://%s' % self.conn, headers=headers, auth=self.auth, timeout=self.timeout)
        except Exception as e:
            # best-effort boundary: any failure is reported as text rather than raised
            output = "Failed to execute '%s' against '%s' (Error: %s)" % (headers['command'], self.conn, e)
        else:
            if int(r.status_code) == 200:
                try:
                    output = r.json()
                except ValueError:
                    # body is not JSON - fall back to the raw text
                    try:
                        output = r.text
                    except Exception as e:
                        output = "Failed to extract results for '%s' (Error: %s)" % (headers['command'], e)
            else:
                output = "Failed to execute '%s' against '%s' (Network Error: %s)" % (headers['command'], self.conn,
                                                                                      r.status_code)

        # Only text can carry the header residue; parsed JSON (dict, list,
        # number, None) has no `split` and may not even support `in`
        if isinstance(output, str):
            for marker in self._CONTENT_TYPE_MARKERS:
                if marker in output:
                    output = output.split(marker)[1]
                    break

        return output

    def get_status(self)->str:
        """
        Execute `get status` command
        :return:
            raw results from get_cmd
        """
        return self.get_cmd(command='get status')

    @staticmethod
    def _format_log(results)->str:
        """
        Convert the reply of a `get ... log where format=json` command into printable text
        :args:
            results - value returned by get_cmd: an already-parsed list of rows,
                      a JSON string, or a plain (error) message
        :params:
            rows - decoded log rows
        :return:
            one `ID | Time | Type | Text` line per row, each terminated by '<br/>'.
            A string that is not valid JSON (such as a get_cmd error message) is
            returned unchanged; any other non-list value is returned as str()
            ('' for None)
        """
        if isinstance(results, str):
            try:
                rows = json.loads(results)
            except ValueError:
                # not JSON - most likely an error message produced by get_cmd
                return results
        else:
            # get_cmd already decoded the JSON body
            rows = results

        if not isinstance(rows, list):
            return '' if rows is None else str(rows)

        return ''.join(
            AnyLogConnection._LOG_ROW_FORMAT % (row['ID'], row['Time'], row['Type'], row['Text']) + '<br/>'
            for row in rows
        )

    def get_event_log(self)->str:
        """
        Execute `get event log` command
        :params:
            results - raw content from get_cmd
        :return:
            results to print on screen (see _format_log)
        """
        results = self.get_cmd(command='get event log where format=json')
        return self._format_log(results)

    def get_error_log(self)->str:
        """
        Execute `get error log` command
        :params:
            results - raw content from get_cmd
        :return:
            results to print on screen (see _format_log)
        """
        results = self.get_cmd(command='get error log where format=json')
        return self._format_log(results)