diff --git a/lib/fluent/plugin/out_forward.rb b/lib/fluent/plugin/out_forward.rb index a3eb2d1870..acf49113e0 100644 --- a/lib/fluent/plugin/out_forward.rb +++ b/lib/fluent/plugin/out_forward.rb @@ -87,6 +87,9 @@ def initialize desc 'Enable client-side DNS round robin.' config_param :dns_round_robin, :bool, default: false # heartbeat_type 'udp' is not available for this + desc 'Compress buffered data.' + config_param :compress, :enum, list: [:text, :gzip], default: :text + attr_reader :nodes # backward compatibility @@ -144,6 +147,13 @@ def configure(conf) log.info "adding forwarding server '#{name}'", host: host, port: port, weight: weight, plugin_id: plugin_id } + if @compress == :gzip && @buffer.compress == :text + @buffer.compress = :gzip + elsif @compress == :text && @buffer.compress == :gzip + @compress = :gzip + log.info "buffer is compressed. If you don't want to compress buffer, remove `compress` configuration in " + end + if @nodes.empty? raise ConfigError, "forward output plugin requires at least one is required" end @@ -317,9 +327,9 @@ def send_data(node, tag, chunk) #end # writeRawBody(packed_es) - chunk.write_to(sock) + chunk.write_to(sock, { compress: @compress }) - option = { 'size' => chunk.size_of_events } + option = { 'size' => chunk.size_of_events, 'compress' => @compress } option['chunk'] = Base64.encode64(chunk.unique_id) if @require_ack_response sock.write option.to_msgpack diff --git a/test/plugin/test_out_forward.rb b/test/plugin/test_out_forward.rb index 22edc4b4b2..601a3531b6 100644 --- a/test/plugin/test_out_forward.rb +++ b/test/plugin/test_out_forward.rb @@ -92,6 +92,28 @@ def test_configure_no_server end end + def test_configure_text_compress + d = create_driver + assert_equal :text, d.instance.compress + end + + def test_configure_gzip_compress + d = create_driver(CONFIG + %[compress gzip]) + assert_equal :gzip, d.instance.compress + assert_equal :gzip, d.instance.buffer.compress + end + + def test_configure_gzip_compress_in_buffer + d = create_driver(CONFIG + %[ + + type memory + compress gzip + + ]) + assert_equal :gzip, d.instance.compress + assert_equal :gzip, d.instance.buffer.compress + end + def test_phi_failure_detector d = create_driver(CONFIG + %[phi_failure_detector false \n phi_threshold 0]) node = d.instance.nodes.first