#
# Logstash configuration to parse logs from a distributed EIF Receiver deployment in a SCALA environment.
# Typical log source: /opt/scala/LogAnalysis/DataForwarders/EIFReceivers/logs/UnityEifReceiver_eif_inst_1.log
#
# Doug McClure
# v1.0 8/13/14
#
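# Usage sketch (assumes a Logstash 1.4.x-era install, matching the plugins used below):
#   bin/logstash -f scala-unity-eif-receiver-logstash.conf
# Check syntax first with: bin/logstash -f scala-unity-eif-receiver-logstash.conf --configtest
#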
input {
lumberjack {
port => 5043
#local server cert and key - the certificate (but not the private key) must also be copied to each logstash-forwarder endpoint and referenced in its config (see the forwarder sketch after this input block)
ssl_certificate => "/etc/pki/tls/certs/logstash-forwarder.crt"
ssl_key => "/etc/pki/tls/private/logstash-forwarder.key"
type => "scala-eifr-logs"
#Ideally the multiline codec would be used here to reassemble multiline events
#-- bug in multiline codec for logstash-forwarder/lumberjack per Jordan - use the multiline filter below for now
#codec => multiline {
# #stack trace message for connection timeout - rows beginning with tab (\t at)
# #fix the wrapping of message lines for the batch size wrapping to new line
# pattern => "(^\d+)|(^%{NONNEGINT})"
# what => "previous"
# multiline_tag => "fixed-multiline"
#} #end codec
} #end lumberjack
} #end input
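#For reference, a minimal logstash-forwarder config sketch for each shipping endpoint
#(hypothetical hostname - adjust paths to your environment); the forwarder references
#the server certificate via "ssl ca" and needs no copy of the private key:
#
#{
#  "network": {
#    "servers": [ "logstash.example.com:5043" ],
#    "ssl ca": "/etc/pki/tls/certs/logstash-forwarder.crt"
#  },
#  "files": [
#    {
#      "paths": [ "/opt/scala/LogAnalysis/DataForwarders/EIFReceivers/logs/UnityEifReceiver_eif_inst_1.log" ],
#      "fields": { "type": "scala-eifr-logs" }
#    }
#  ]
#}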
filter {
# reassemble multiline events and lines that were wrapped onto new lines:
# - stack trace messages for connection timeouts (continuation rows begin with a tab: "\t at")
# - batch status messages wrapped to a new line by the batch size
# use the multiline filter (not the codec) because it supports 'stream_identity' (host:path:type), so interleaved streams from multiple forwarders are each reassembled correctly
multiline {
pattern => "(^\d{1,20}$)|(^Service.*)|(^updated.*)|(^\t)"
what => "previous"
} #end multiline
#tag batch status events (grep with drop => false only tags matching events, it drops nothing)
if "multiline" not in [tags] {
grep {
drop => false
match => { "message" => "BATCH_STATUS" }
add_tag => "batchstatus"
} #end grep
} #end conditional
if "multiline" or "batchstatus" in [tags] {
grok {
match => [ "message", "(?m)%{DATESTAMP:timestamp} %{TZ} \[%{DATA:Pool-Thread}\] %{DATA:LogLevel} - %{DATA:Function} : %{GREEDYDATA:OrigMsg}" ]
add_tag => "grok-1"
} #end grok
} #end conditional
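#illustrative (hypothetical) line in the shape the grok above expects:
#  08/13/14 10:15:30.123 EDT [pool-1-thread-1] INFO - EventPoster : {"BATCH_STATUS":{...}}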
#whack LF (\n) from the message fields
if "grok-1" in [tags] {
mutate {
#one gsub pass over both fields (a single gsub declaration avoids duplicate-key surprises)
gsub => [ "OrigMsg", "\n", " ", "message", "\n", " " ]
add_tag => "whack LF"
} #end mutate
} #end conditional
#explode JSON batch message
if "whack LF" in [tags] and [OrigMsg] =~ /^\{/
{
json {
source => "OrigMsg"
add_tag => "json exploded"
#add fields to send to scala (one declaration; scalaFields collects the list of field names)
add_field => [ "scalaFields", "EIFR-batchWriteTime",
"scalaFields", "EIFR-indexedSourceVolume",
"scalaFields", "EIFR-indexNumSuccessful",
"scalaFields", "EIFR-indexNumFailures",
"scalaFields", "EIFR-batchSize",
"scalaFields", "EIFR-numSuccessful",
"scalaFields", "EIFR-numFailures" ]
} #end json
} #end conditional
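#assumed (hypothetical) shape of the exploded OrigMsg payload that the mutate below reads:
#  {"BATCH_STATUS": {"writeTime": "...", "indexedSourceVolume": 0, "indexNumSuccessful": 0,
#                    "indexNumFailures": 0, "batchSize": 0, "numSuccessful": 0, "numFailures": 0}}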
if "json exploded" in [tags] {
mutate {
#set fields with values (single replace declaration)
replace => [ "EIFR-batchWriteTime", "%{[BATCH_STATUS][writeTime]}",
"EIFR-indexedSourceVolume", "%{[BATCH_STATUS][indexedSourceVolume]}",
"EIFR-indexNumSuccessful", "%{[BATCH_STATUS][indexNumSuccessful]}",
"EIFR-indexNumFailures", "%{[BATCH_STATUS][indexNumFailures]}",
"EIFR-batchSize", "%{[BATCH_STATUS][batchSize]}",
"EIFR-numSuccessful", "%{[BATCH_STATUS][numSuccessful]}",
"EIFR-numFailures", "%{[BATCH_STATUS][numFailures]}" ]
#convert to integers for es/kibana - still have to set them for DSV
convert => [ "EIFR-indexedSourceVolume", "integer",
"EIFR-indexNumSuccessful", "integer",
"EIFR-indexNumFailures", "integer",
"EIFR-batchSize", "integer",
"EIFR-numSuccessful", "integer",
"EIFR-numFailures", "integer" ]
add_tag => "scala fields done"
} #end mutate
} #end conditional
#grok out metrics - each grok below that does not match simply tags the event _grokparsefailure
if "whack LF" in [tags] and "batchstatus" not in [tags] {
grok {
match => [ "OrigMsg", "PostDataJson of size: %{INT:PostQueuePostDataJsonSize}" ]
} #end grok
grok {
match => [ "OrigMsg", "updated Service Queue -- size:%{INT:ServiceQueueSize}" ]
} #end grok
grok {
match => [ "OrigMsg", "Posting Post Data Json of size: %{INT:PostDataSize}" ]
} #end grok
grok {
match => [ "OrigMsg", "Service EIF remove:%{INT:ServiceEIFRemove}" ]
} #end grok
} #end conditional
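#optional sketch: the four groks above could be folded into one, so a non-matching event
#gets tagged _grokparsefailure at most once (assumes grok's default break_on_match behavior):
#grok {
#  match => { "OrigMsg" => [ "PostDataJson of size: %{INT:PostQueuePostDataJsonSize}",
#                            "updated Service Queue -- size:%{INT:ServiceQueueSize}",
#                            "Posting Post Data Json of size: %{INT:PostDataSize}",
#                            "Service EIF remove:%{INT:ServiceEIFRemove}" ] }
#} #end grok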
} #end filter
output {
#send to elasticsearch so we can visualize in kibana
elasticsearch {
embedded => true
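#embedded => true starts an in-process Elasticsearch - handy for testing; point at a
#standalone cluster (host/cluster options) for anything beyond a demo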
} #end es output
#create a CSV file so we can visualize in a spreadsheet
# csv {
# fields => [ "[BATCH_STATUS][writeTime]","[BATCH_STATUS][indexedSourceVolume]","[BATCH_STATUS][indexNumSuccessful]","[BATCH_STATUS][indexNumFailures]","[BATCH_STATUS][batchSize]","[BATCH_STATUS][numSuccessful]","[BATCH_STATUS][numFailures]" ]
# csv_options => { "force_quotes" => "true" }
# path => "/var/log/EIFR-Perf-csv-output.log"
# } #end CSV
} #end output