import json
import logging
import sys
import threading
import time
class SFLog:
    """
    Wrap one Silverfort log record.

    The record is the dictionary obtained by json-decoding a single
    Silverfort log line. Every accessor below returns the raw value of the
    corresponding field and raises KeyError when that field is absent from
    the record, so callers consuming incomplete or foreign events must be
    prepared for that.
    """

    # Class-level placeholder; __init__ always rebinds `log` on the instance.
    log = {}

    def __init__(self, s):
        """Keep the decoded log dictionary *s* as this record's payload."""
        self.log = s

    def _field(self, key):
        """Return the raw value stored under *key* (KeyError when missing)."""
        return self.log[key]

    def getDestinationServiceName(self):
        return self._field("destinationServiceName")

    def getDeviceVendor(self):
        return self._field("deviceVendor")

    def getSilverfortPolicyAction(self):
        return self._field("SilverfortPolicyAction")

    def getSourceUserName(self):
        return self._field("sourceUserName")

    def getDestinationNtDomain(self):
        return self._field("destinationNtDomain")

    def getDeviceProduct(self):
        return self._field("deviceProduct")

    def getDeviceEventClassId(self):
        return self._field("deviceEventClassId")

    def getDeviceReceiptTime(self):
        return self._field("deviceReceiptTime")

    def getSeverityLabel(self):
        return self._field("severity_label")

    def getSilverfortMfaResponse(self):
        return self._field("SilverfortMfaResponse")

    def getSilverfortPolicy(self):
        return self._field("SilverfortPolicy")

    def getApplicationProtocol(self):
        return self._field("applicationProtocol")

    def getLogTimeStamp(self):
        # The ingestion timestamp lives under the "@timestamp" key.
        return self._field("@timestamp")

    def getSilverfortMfaResponseTime(self):
        return self._field("SilverfortMfaResponseTime")

    def getSourceAddress(self):
        return self._field("sourceAddress")

    def getHost(self):
        return self._field("host")

    def getSilverfortReqResult(self):
        return self._field("SilverfortReqResult")

    def getSilverfortReqRisk(self):
        return self._field("SilverfortReqRisk")

    def getSourceNtDomain(self):
        return self._field("sourceNtDomain")

    def getSourceHostName(self):
        return self._field("sourceHostName")

    def getLogType(self):
        # The event type is carried in the record's "name" field.
        return self._field("name")

    def getDestinationHostName(self):
        return self._field("destinationHostName")
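class SFLogger:
    """
    Hold the SFLog records collected so far and expose simple accessors.

    Each instance owns its own list. Previously `sflogs` was a class
    attribute, so every SFLogger shared one list and records appended
    through one instance showed up in all the others.
    """

    def __init__(self):
        # Per-instance store; a class-level `sflogs = []` is shared by all
        # instances and mixes their records together.
        self.sflogs = []

    def appendLog(self, s):
        """Wrap the decoded log dictionary *s* in an SFLog and store it."""
        self.sflogs.append(SFLog(s))

    def getLog(self, index):
        """Return the stored log at *index* (IndexError when out of range)."""
        return self.sflogs[index]

    def getLastLog(self):
        """Return the most recently appended log, or None when none exist."""
        # The previous test was `len > 1`, which wrongly returned None when
        # exactly one log had been collected.
        if not self.sflogs:
            return None
        return self.sflogs[-1]

    def getAllLogs(self):
        """Return the underlying list itself (not a copy)."""
        return self.sflogs

    def getNumberOfLogs(self):
        """Return how many logs have been collected."""
        return len(self.sflogs)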
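def followSFJSONLog(filepath, poll_interval=0.5):
    """
    Tail *filepath* forever, yielding each complete line (newline included).

    The generator never returns on its own: whenever it reaches the current
    end of the file it sleeps *poll_interval* seconds and polls again.
    Only newline-terminated lines are yielded. A writer that has flushed
    half a record leaves a partial line behind; readline() returns that
    fragment immediately, and the previous version yielded it as-is, so the
    consumer's json.loads failed on a truncated document. The fragment is
    now buffered until the rest of the line arrives. The file is opened in
    a `with` block, so it is closed when the generator is closed or
    collected. Log rotation is not handled: the original handle stays open.

    filepath      -- path of the log file to tail
    poll_interval -- seconds to wait at end-of-file before polling again
    """
    pending = ""
    with open(filepath, "r") as f:
        while True:
            chunk = f.readline()
            if not chunk:
                time.sleep(poll_interval)
                continue
            pending += chunk
            if pending.endswith("\n"):
                yield pending
                pending = ""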
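def collectSFLogs(filepath, sflogger):
    """
    Follow *filepath* and append every JSON log line to *sflogger*.

    Runs until the process exits (the follow generator never ends). Blank
    lines are skipped. A line that is not valid JSON, or that decodes to
    something other than a JSON object, is reported via logging and skipped
    instead of propagating: previously json.JSONDecodeError escaped this
    function, which killed the collector thread and silently stopped all
    further collection after a single malformed record.

    filepath -- path of the log file to tail
    sflogger -- object receiving each decoded record via appendLog(dict)
    """
    log = logging.getLogger(__name__)
    for line in followSFJSONLog(filepath):
        if not line.strip():
            continue
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            # Keep the feed alive; report the size, not the content, so
            # unparsed input is not echoed into the application log.
            log.warning("skipping malformed log line (%d bytes)", len(line))
            continue
        if not isinstance(record, dict):
            # SFLog indexes the record by string keys, so anything else
            # would only fail later in an accessor.
            log.warning("skipping non-object JSON record (%s)", type(record).__name__)
            continue
        sflogger.appendLog(record)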
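def startLogCollector(filepath, sflogger):
    """
    Start collectSFLogs in a background thread and return that Thread.

    filepath -- path of the log file to tail
    sflogger -- SFLogger that receives each decoded record

    The thread is a daemon: collectSFLogs loops forever, and a non-daemon
    thread kept the interpreter alive after the main thread finished, so
    the previous version could not exit cleanly. The Thread object is
    returned so callers can join or inspect it; previously nothing was
    returned, which stays compatible with callers that ignore the result.
    """
    worker = threading.Thread(
        target=collectSFLogs,
        args=(filepath, sflogger),
        name="sf-log-collector",
        daemon=True,
    )
    worker.start()
    return worker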