-
Notifications
You must be signed in to change notification settings - Fork 61
/
Copy pathredis.rb
325 lines (267 loc) · 11.1 KB
/
redis.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
# encoding: utf-8
require "logstash/outputs/base"
require "logstash/namespace"
require "stud/buffer"
# This output will send events to a Redis queue using RPUSH.
# The RPUSH command is supported in Redis v0.0.7+. Using
# PUBLISH to a channel requires at least v1.3.8+.
# While you may be able to make these Redis versions work,
# the best performance and stability will be found in more
# recent stable versions. Versions 2.6.0+ are recommended.
#
# For more information, see http://redis.io/[the Redis homepage]
#
class LogStash::Outputs::Redis < LogStash::Outputs::Base
include Stud::Buffer
config_name "redis"
default :codec, "json"
# The hostname(s) of your Redis server(s). Ports may be specified on any
# hostname, which will override the global port config.
# If the hosts list is an array, Logstash will pick one random host to connect to,
# if that host is disconnected it will then pick another.
#
# For example:
# [source,ruby]
# "127.0.0.1"
# ["127.0.0.1", "127.0.0.2"]
# ["127.0.0.1:6380", "127.0.0.1"]
config :host, :validate => :array, :default => ["127.0.0.1"]
# Shuffle the host list during Logstash startup.
config :shuffle_hosts, :validate => :boolean, :default => true
# The default port to connect on. Can be overridden on any hostname.
config :port, :validate => :number, :default => 6379
# SSL
config :ssl_enabled, :validate => :boolean, :default => false
# Validate the certificate chain against these authorities. You can define multiple files.
# All the certificates will be read and added to the trust store.
config :ssl_certificate_authorities, :validate => :path, :list => true
# Options to verify the server's certificate.
# "full": validates that the provided certificate has an issue date that’s within the not_before and not_after dates;
# chains to a trusted Certificate Authority (CA); has a hostname or IP address that matches the names within the certificate.
# "none": performs no certificate validation. Disabling this severely compromises security (https://www.cs.utexas.edu/~shmat/shmat_ccs12.pdf)
config :ssl_verification_mode, :validate => %w[full none], :default => 'full'
# SSL certificate path
config :ssl_certificate, :validate => :path
# SSL key path
config :ssl_key, :validate => :path
# SSL key passphrase
config :ssl_key_passphrase, :validate => :password, :default => nil
# NOTE: the default setting [] uses SSL engine defaults
config :ssl_supported_protocols, :validate => %w[TLSv1.1 TLSv1.2 TLSv1.3], :default => [], :list => true
# The list of ciphers suite to use
config :ssl_cipher_suites, :validate => :string, :list => true
# The Redis database number.
config :db, :validate => :number, :default => 0
# Redis initial connection timeout in seconds.
config :timeout, :validate => :number, :default => 5
# Password to authenticate with. There is no authentication by default.
config :password, :validate => :password
# The name of a Redis list or channel. Dynamic names are
# valid here, for example `logstash-%{type}`.
config :key, :validate => :string, :required => true
# Either list or channel. If `redis_type` is list, then we will set
# RPUSH to key. If `redis_type` is channel, then we will PUBLISH to `key`.
config :data_type, :validate => [ "list", "channel" ], :required => true
# Set to true if you want Redis to batch up values and send 1 RPUSH command
# instead of one command per value to push on the list. Note that this only
# works with `data_type="list"` mode right now.
#
# If true, we send an RPUSH every "batch_events" events or
# "batch_timeout" seconds (whichever comes first).
# Only supported for `data_type` is "list".
config :batch, :validate => :boolean, :default => false
# If batch is set to true, the number of events we queue up for an RPUSH.
config :batch_events, :validate => :number, :default => 50
# If batch is set to true, the maximum amount of time between RPUSH commands
# when there are pending events to flush.
config :batch_timeout, :validate => :number, :default => 5
# Interval for reconnecting to failed Redis connections
config :reconnect_interval, :validate => :number, :default => 1
# In case Redis `data_type` is `list` and has more than `@congestion_threshold` items,
# block until someone consumes them and reduces congestion, otherwise if there are
# no consumers Redis will run out of memory, unless it was configured with OOM protection.
# But even with OOM protection, a single Redis list can block all other users of Redis,
# until Redis CPU consumption reaches the max allowed RAM size.
# A default value of 0 means that this limit is disabled.
# Only supported for `list` Redis `data_type`.
config :congestion_threshold, :validate => :number, :default => 0
# How often to check for congestion. Default is one second.
# Zero means to check on every event.
config :congestion_interval, :validate => :number, :default => 1
def register
require 'redis'
validate_ssl_config!
if @batch
if @data_type != "list"
raise RuntimeError.new(
"batch is not supported with data_type #{@data_type}"
)
end
buffer_initialize(
:max_items => @batch_events,
:max_interval => @batch_timeout,
:logger => @logger
)
end
@redis = nil
if @shuffle_hosts
@host.shuffle!
end
@host_idx = 0
@congestion_check_times = Hash.new { |h,k| h[k] = Time.now.to_i - @congestion_interval }
@codec.on_event(&method(:send_to_redis))
end # def register
def receive(event)
# TODO(sissel): We really should not drop an event, but historically
# we have dropped events that fail to be converted to json.
# TODO(sissel): Find a way to continue passing events through even
# if they fail to convert properly.
begin
@codec.encode(event)
rescue LocalJumpError
# This LocalJumpError rescue clause is required to test for regressions
# for https://github.com/logstash-plugins/logstash-output-redis/issues/26
# see specs. Without it the LocalJumpError is rescued by the StandardError
raise
rescue StandardError => e
@logger.warn("Error encoding event", :exception => e,
:event => event)
end
end # def receive
def congestion_check(key)
return if @congestion_threshold == 0
if (Time.now.to_i - @congestion_check_times[key]) >= @congestion_interval # Check congestion only if enough time has passed since last check.
while @redis.llen(key) > @congestion_threshold # Don't push event to Redis key which has reached @congestion_threshold.
@logger.warn? and @logger.warn("Redis key size has hit a congestion threshold #{@congestion_threshold} suspending output for #{@congestion_interval} seconds")
sleep @congestion_interval
end
@congestion_check_times[key] = Time.now.to_i
end
end
# called from Stud::Buffer#buffer_flush when there are events to flush
def flush(events, key, close=false)
@redis ||= connect
# we should not block due to congestion on close
# to support this Stud::Buffer#buffer_flush should pass here the :final boolean value.
congestion_check(key) unless close
@redis.rpush(key, events)
end
# called from Stud::Buffer#buffer_flush when an error occurs
def on_flush_error(e)
@logger.warn("Failed to send backlog of events to Redis",
:identity => identity,
:exception => e,
:backtrace => e.backtrace
)
@redis = connect
end
def close
if @batch
buffer_flush(:final => true)
end
if @data_type == 'channel' and @redis
@redis.quit
@redis = nil
end
end
private
def connect
@current_host, @current_port = @host[@host_idx].split(':')
@host_idx = @host_idx + 1 >= @host.length ? 0 : @host_idx + 1
if not @current_port
@current_port = @port
end
params = {
:host => @current_host,
:port => @current_port,
:timeout => @timeout,
:db => @db,
:ssl => @ssl_enabled,
}
params[:ssl_params] = setup_ssl_params if @ssl_enabled
@logger.debug("connection params", params)
if @password
params[:password] = @password.value
end
Redis.new(params)
end # def connect
def setup_ssl_params
require "openssl"
params = {}
params[:cert_store] = ssl_certificate_store
if @ssl_verification_mode == 'none'
params[:verify_mode] = OpenSSL::SSL::VERIFY_NONE
else
params[:verify_mode] = OpenSSL::SSL::VERIFY_PEER|OpenSSL::SSL::VERIFY_FAIL_IF_NO_PEER_CERT
end
if @ssl_certificate
params[:cert] = OpenSSL::X509::Certificate.new(File.read(@ssl_certificate))
if @ssl_key
# if we have an encrypted key and a password is not provided (nil) than OpenSSL::PKey::RSA
# prompts the user to enter a password interactively - we do not want to do that,
# for plain-text keys the default '' password argument gets simply ignored
params[:key] = OpenSSL::PKey::RSA.new(File.read(@ssl_key), @ssl_key_passphrase.value || '')
end
end
params[:min_version] = :TLS1_1
if @ssl_supported_protocols.any?
protocols = @ssl_supported_protocols.map { |v| v.delete('v').tr(".", "_").to_sym }.sort
params[:min_version] = protocols.first
params[:max_version] = protocols.last
end
params[:ciphers] = @ssl_cipher_suites if @ssl_cipher_suites&.any?
params
end
def ssl_certificate_store
cert_store = new_ssl_certificate_store
cert_store.set_default_paths
@ssl_certificate_authorities&.each do |cert|
cert_store.add_file(cert)
end
cert_store
end
def new_ssl_certificate_store
OpenSSL::X509::Store.new
end
def validate_ssl_config!
unless @ssl_enabled
ignored_ssl_settings = original_params.select { |k| k != 'ssl_enabled' && k.start_with?('ssl_') }
@logger.warn("Configured SSL settings are not used when `ssl_enabled` is set to `false`: #{ignored_ssl_settings.keys}") if ignored_ssl_settings.any?
return
end
if @ssl_certificate && !@ssl_key
raise LogStash::ConfigurationError, "Using an `ssl_certificate` requires an `ssl_key`"
elsif @ssl_key && !@ssl_certificate
raise LogStash::ConfigurationError, 'An `ssl_certificate` is required when using an `ssl_key`'
end
end
# A string used to identify a Redis instance in log messages
def identity
"redis://#{@password}@#{@current_host}:#{@current_port}/#{@db} #{@data_type}:#{@key}"
end
def send_to_redis(event, payload)
# How can I do this sort of thing with codecs?
key = event.sprintf(@key)
if @batch && @data_type == 'list' # Don't use batched method for pubsub.
# Stud::Buffer
buffer_receive(payload, key)
return
end
begin
@redis ||= connect
if @data_type == 'list'
congestion_check(key)
@redis.rpush(key, payload)
else
@redis.publish(key, payload)
end
rescue => e
@logger.warn("Failed to send event to Redis", :event => event,
:identity => identity, :exception => e,
:backtrace => e.backtrace)
sleep @reconnect_interval
@redis = nil
retry
end
end
end