# 25-paloalto-filter.conf
filter {
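  # Applies only to events tagged "paloalto" by the input stage (e.g. the syslog input);
  # [type] then selects either the URL or the traffic pipeline below.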
  if "paloalto" in [tags] {
    if [type] == "url" {
      grok {
        match => { "message" => "%{MONTH} %{MONTHDAY} %{TIME} %{IPV4:panIP} %{HOSTNAME:firewall}:%{IPV4:sourceIP} %{IPV4:destinationIP} %{NOTSPACE:application} %{NOTSPACE:category} \"%{URIHOST:URIHost}%{URIPATH:URIPath}" }
      }
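      # A hypothetical log line in the shape the pattern above expects (all values invented
      # for illustration); URIHost and URIPath come from the quoted URL field:
      #   Jun 15 10:15:00 10.1.1.1 pa-fw01:192.0.2.10 203.0.113.5 web-browsing search-engines "www.example.com/search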
      date {
        timezone => "Australia/Melbourne"
        match => [ "GenerateTime", "YYYY/MM/dd HH:mm:ss" ]
      }
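      # Note: the grok pattern above does not capture a GenerateTime field, so this date
      # filter only takes effect if GenerateTime is already present on the event.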
      if [destinationIP] and [destinationIP] !~ "(^127\.0\.0\.1)|(^10\.)|(^172\.1[6-9]\.)|(^172\.2[0-9]\.)|(^172\.3[0-1]\.)|(^192\.168\.)|(^169\.254\.)" {
        geoip {
          database => "/etc/logstash/GeoLiteCity.dat"
          source => "destinationIP"
          target => "DestinationGeo"
        }
        # Clear DestinationGeo.location if it is 0,0
        if ([DestinationGeo.location] and [DestinationGeo.location] =~ "0,0") {
          mutate {
            replace => [ "DestinationGeo.location", "" ]
          }
        }
      }
      mutate {
        remove_field => ["message"]
      }
    }
    if [type] == "traffic" {
      grok {
        # Strip the syslog timestamp and host from the front of the message and keep the
        # remainder (the log generated by the firewall) as "raw_message".
        #patterns_dir => "/opt/logstash/patterns"
        match => { "message" => "%{MONTH} %{MONTHDAY} %{TIME} %{HOSTNAME:syslog_host} %{GREEDYDATA:raw_message}" }
      }
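      # A hypothetical input line (values invented): everything after the hostname ends up in
      # raw_message as the comma-separated PAN-OS traffic record parsed by the csv filter below.
      #   Jun 15 10:15:00 pa-fw01 1,2019/06/15 10:14:59,001122334455,TRAFFIC,end,...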
      csv {
        source => "raw_message"
        columns => [ "PaloAltoDomain","ReceiveTime","SerialNum","Type","Threat-ContentType","ConfigVersion","GenerateTime","SourceAddress","DestinationAddress","NATSourceIP","NATDestinationIP","Rule","SourceUser","DestinationUser","Application","VirtualSystem","SourceZone","DestinationZone","InboundInterface","OutboundInterface","LogAction","TimeLogged","SessionID","RepeatCount","SourcePort","DestinationPort","NATSourcePort","NATDestinationPort","Flags","IPProtocol","Action","Bytes","BytesSent","BytesReceived","Packets","StartTime","ElapsedTimeInSec","Category","Padding","seqno","actionflags","SourceCountry","DestinationCountry","cpadding","pkts_sent","pkts_received" ]
      }
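      # The csv filter splits raw_message positionally, so the column list above must match the
      # firewall's traffic log field order; GenerateTime and the address/port fields parsed here
      # feed the date, geoip, and fingerprint filters below.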
      date {
        timezone => "Australia/Melbourne"
        match => [ "GenerateTime", "YYYY/MM/dd HH:mm:ss" ]
      }
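      # Sets @timestamp from the firewall's GenerateTime, interpreted as Australia/Melbourne local time.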
      # Convert numeric fields to their proper types
      mutate {
        convert => [ "Bytes", "integer" ]
        convert => [ "BytesReceived", "integer" ]
        convert => [ "BytesSent", "integer" ]
        convert => [ "ElapsedTimeInSec", "integer" ]
        convert => [ "geoip.area_code", "integer" ]
        convert => [ "geoip.dma_code", "integer" ]
        convert => [ "geoip.latitude", "float" ]
        convert => [ "geoip.longitude", "float" ]
        convert => [ "NATDestinationPort", "integer" ]
        convert => [ "NATSourcePort", "integer" ]
        convert => [ "Packets", "integer" ]
        convert => [ "pkts_received", "integer" ]
        convert => [ "pkts_sent", "integer" ]
        convert => [ "seqno", "integer" ]
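        # Normalise Rule and Application names: spaces (and hyphens in Application) become underscores.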
        gsub => [ "Rule", " ", "_",
                  "Application", "( |-)", "_" ]
        remove_field => [ "message", "raw_message" ]
      }
      # Geolocate SourceAddress when it is present and is not a private (RFC 1918), loopback,
      # or link-local address.
      if [SourceAddress] and [SourceAddress] !~ "(^127\.0\.0\.1)|(^10\.)|(^172\.1[6-9]\.)|(^172\.2[0-9]\.)|(^172\.3[0-1]\.)|(^192\.168\.)|(^169\.254\.)" {
        geoip {
          database => "/etc/logstash/GeoLiteCity.dat"
          source => "SourceAddress"
          target => "SourceGeo"
        }
        # Clear SourceGeo.location if it is 0,0
        if ([SourceGeo.location] and [SourceGeo.location] =~ "0,0") {
          mutate {
            replace => [ "SourceGeo.location", "" ]
          }
        }
      }
      # Geolocate DestinationAddress when it is present and is not a private (RFC 1918),
      # loopback, or link-local address.
      if [DestinationAddress] and [DestinationAddress] !~ "(^127\.0\.0\.1)|(^10\.)|(^172\.1[6-9]\.)|(^172\.2[0-9]\.)|(^172\.3[0-1]\.)|(^192\.168\.)|(^169\.254\.)" {
        geoip {
          database => "/etc/logstash/GeoLiteCity.dat"
          source => "DestinationAddress"
          target => "DestinationGeo"
        }
        # Clear DestinationGeo.location if it is 0,0
        if ([DestinationGeo.location] and [DestinationGeo.location] =~ "0,0") {
          mutate {
            replace => [ "DestinationGeo.location", "" ]
          }
        }
      }
      # Fingerprint the flow by hashing the 5-tuple of source address, source port, destination
      # address, destination port, and IP protocol with SHA1. This makes it possible to run
      # top-N terms queries on whole flows rather than on a single field.
      if [SourceAddress] and [DestinationAddress] {
        fingerprint {
          concatenate_sources => true
          method => "SHA1"
          key => "logstash"
          source => [ "SourceAddress", "SourcePort", "DestinationAddress", "DestinationPort", "IPProtocol" ]
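          # concatenate_sources joins all of the listed source fields into a single input before
          # hashing (instead of fingerprinting each field separately); the resulting hash is
          # written to the filter's default "fingerprint" field.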
        }
      }
    }
  }
}
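
# A minimal sketch for checking that this file parses, assuming an older Logstash (1.x/2.x)
# install under /opt/logstash with configs in /etc/logstash/conf.d (paths and flags are
# assumptions; newer releases use --config.test_and_exit instead of --configtest):
#   /opt/logstash/bin/logstash --configtest -f /etc/logstash/conf.d/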