Skip to content

Commit

Permalink
Merge pull request #1172 from ganmacs/support-data-compression-in-buffer
Browse files Browse the repository at this point in the history
Support data compression in buffer plugins
  • Loading branch information
tagomoris authored Sep 6, 2016
2 parents c100086 + 0ed911a commit 68d27dd
Show file tree
Hide file tree
Showing 19 changed files with 761 additions and 25 deletions.
23 changes: 23 additions & 0 deletions example/in_dummy_with_compression.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<source>
@type dummy
@label @main
tag "test.data"
size 2
rate 10
dummy {"message":"yaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaay"}
auto_increment_key number
</source>

<label @main>
<match test.data>
@type buffered_stdout
<buffer>
@type file
path "#{Dir.pwd}/compressed_buffers"
flush_at_shutdown false
chunk_limit_size 1m
flush_interval 10s
compress gzip
</buffer>
</match>
</label>
7 changes: 7 additions & 0 deletions lib/fluent/event.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
#

require 'fluent/msgpack_factory'
require 'fluent/plugin/compressable'

module Fluent
class EventStream
include Enumerable
include MessagePackFactory::Mixin
include Fluent::Plugin::Compressable

# dup does deep copy for event stream
def dup
Expand Down Expand Up @@ -61,6 +63,11 @@ def to_msgpack_stream(time_int: false)
out.to_s
end

def to_compressed_msgpack_stream(time_int: false)
packed = to_msgpack_stream(time_int: time_int)
compress(packed)
end

def to_msgpack_stream_forced_integer
out = msgpack_packer
each {|time,record|
Expand Down
4 changes: 2 additions & 2 deletions lib/fluent/plugin/buf_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,9 @@ def resume
def generate_chunk(metadata)
# FileChunk generates real path with unique_id
if @file_permission
Fluent::Plugin::Buffer::FileChunk.new(metadata, @path, :create, perm: @file_permission)
Fluent::Plugin::Buffer::FileChunk.new(metadata, @path, :create, perm: @file_permission, compress: @compress)
else
Fluent::Plugin::Buffer::FileChunk.new(metadata, @path, :create)
Fluent::Plugin::Buffer::FileChunk.new(metadata, @path, :create, compress: @compress)
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin/buf_memory.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def resume
end

def generate_chunk(metadata)
Fluent::Plugin::Buffer::MemoryChunk.new(metadata)
Fluent::Plugin::Buffer::MemoryChunk.new(metadata, compress: @compress)
end
end
end
Expand Down
7 changes: 5 additions & 2 deletions lib/fluent/plugin/buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ class BufferChunkOverflowError < BufferError; end # A record size is larger than
# if chunk size (or records) is 95% or more after #write, then that chunk will be enqueued
config_param :chunk_full_threshold, :float, default: DEFAULT_CHUNK_FULL_THRESHOLD

desc 'Compress buffered data.'
config_param :compress, :enum, list: [:text, :gzip], default: :text

Metadata = Struct.new(:timekey, :tag, :variables) do
def empty?
timekey.nil? && tag.nil? && variables.nil?
Expand Down Expand Up @@ -458,7 +461,7 @@ def write_once(metadata, data, format: nil, size: nil, &block)
serialized = format.call(data)
chunk.concat(serialized, size ? size.call : data.size)
else
chunk.append(data)
chunk.append(data, compress: @compress)
end
adding_bytesize = chunk.bytesize - original_bytesize

Expand Down Expand Up @@ -558,7 +561,7 @@ def write_step_by_step(metadata, data, format, splits_count, &block)
if format
chunk.concat(format.call(split), split.size)
else
chunk.append(split)
chunk.append(split, compress: @compress)
end

if chunk_size_over?(chunk) # split size is larger than difference between size_full? and size_over?
Expand Down
72 changes: 67 additions & 5 deletions lib/fluent/plugin/buffer/chunk.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@
#

require 'fluent/plugin/buffer'
require 'fluent/plugin/compressable'
require 'fluent/unique_id'
require 'fluent/event'

require 'monitor'
require 'tempfile'
require 'zlib'

module Fluent
module Plugin
Expand Down Expand Up @@ -46,7 +49,7 @@ class Chunk

# TODO: CompressedPackedMessage of forward protocol?

def initialize(metadata)
def initialize(metadata, compress: :text)
super()
@unique_id = generate_unique_id
@metadata = metadata
Expand All @@ -57,12 +60,15 @@ def initialize(metadata)
@size = 0
@created_at = Time.now
@modified_at = Time.now

extend Decompressable if compress == :gzip
end

attr_reader :unique_id, :metadata, :created_at, :modified_at, :state

# data is array of formatted record string
def append(data)
def append(data, **kwargs)
raise ArgumentError, '`compress: gzip` can be used for Compressable module' if kwargs[:compress] == :gzip
adding = ''.b
data.each do |d|
adding << d.b
Expand Down Expand Up @@ -141,19 +147,75 @@ def purge
self
end

def read
def read(**kwargs)
raise ArgumentError, '`compressed: gzip` can be used for Compressable module' if kwargs[:compressed] == :gzip
raise NotImplementedError, "Implement this method in child class"
end

def open(&block)
def open(**kwargs, &block)
raise ArgumentError, '`compressed: gzip` can be used for Compressable module' if kwargs[:compressed] == :gzip
raise NotImplementedError, "Implement this method in child class"
end

def write_to(io)
def write_to(io, **kwargs)
raise ArgumentError, '`compressed: gzip` can be used for Compressable module' if kwargs[:compressed] == :gzip
open do |i|
IO.copy_stream(i, io)
end
end

module Decompressable
include Fluent::Plugin::Compressable

def append(data, **kwargs)
if kwargs[:compress] == :gzip
io = StringIO.new
Zlib::GzipWriter.wrap(io) do |gz|
data.each do |d|
gz.write d
end
end
concat(io.string, data.size)
else
super
end
end

def open(**kwargs, &block)
if kwargs[:compressed] == :gzip
super
else
super(kwargs) do |chunk_io|
output_io = if chunk_io.is_a?(StringIO)
StringIO.new
else
Tempfile.new('decompressed-data')
end
decompress(input_io: chunk_io, output_io: output_io)
output_io.seek(0, IO::SEEK_SET)
yield output_io
end
end
end

def read(**kwargs)
if kwargs[:compressed] == :gzip
super
else
decompress(super)
end
end

def write_to(io, **kwargs)
open(compressed: :gzip) do |chunk_io|
if kwargs[:compressed] == :gzip
IO.copy_stream(chunk_io, io)
else
decompress(input_io: chunk_io, output_io: io)
end
end
end
end
end
end
end
Expand Down
8 changes: 4 additions & 4 deletions lib/fluent/plugin/buffer/file_chunk.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ class FileChunk < Chunk

attr_reader :path, :permission

def initialize(metadata, path, mode, perm: system_config.file_permission || FILE_PERMISSION)
super(metadata)
def initialize(metadata, path, mode, perm: system_config.file_permission || FILE_PERMISSION, compress: :text)
super(metadata, compress: compress)
@permission = perm
@bytesize = @size = @adding_bytes = @adding_size = 0
@meta = nil
Expand Down Expand Up @@ -133,12 +133,12 @@ def purge
File.unlink(@path, @meta_path)
end

def read
def read(**kwargs)
@chunk.seek(0, IO::SEEK_SET)
@chunk.read
end

def open(&block)
def open(**kwargs, &block)
@chunk.seek(0, IO::SEEK_SET)
val = yield @chunk
@chunk.seek(0, IO::SEEK_END) if self.staged?
Expand Down
8 changes: 4 additions & 4 deletions lib/fluent/plugin/buffer/memory_chunk.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ module Fluent
module Plugin
class Buffer
class MemoryChunk < Chunk
def initialize(metadata)
def initialize(metadata, compress: :text)
super
@chunk = ''.force_encoding(Encoding::ASCII_8BIT)
@chunk_bytes = 0
Expand Down Expand Up @@ -72,15 +72,15 @@ def purge
true
end

def read
def read(**kwargs)
@chunk
end

def open(&block)
def open(**kwargs, &block)
StringIO.open(@chunk, &block)
end

def write_to(io)
def write_to(io, **kwargs)
# re-implementation to optimize not to create StringIO
io.write @chunk
end
Expand Down
91 changes: 91 additions & 0 deletions lib/fluent/plugin/compressable.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'zlib'

module Fluent
module Plugin
module Compressable
def compress(data, **kwargs)
output_io = kwargs[:output_io]
io = output_io || StringIO.new
Zlib::GzipWriter.wrap(io) do |gz|
gz.write data
end

output_io || io.string
end

# compressed_data is String like `compress(data1) + compress(data2) + ... + compress(dataN)`
# https://www.ruby-forum.com/topic/971591#979503
def decompress(compressed_data = nil, output_io: nil, input_io: nil)
case
when input_io && output_io
io_decompress(input_io, output_io)
when input_io
output_io = StringIO.new
io = io_decompress(input_io, output_io)
io.string
when compressed_data.nil? || compressed_data.empty?
# check compressed_data(String) is 0 length
compressed_data
when output_io
# exeucte after checking compressed_data is empty or not
io = StringIO.new(compressed_data)
io_decompress(io, output_io)
else
string_decompress(compressed_data)
end
end

private

def string_decompress(compressed_data)
io = StringIO.new(compressed_data)

out = ''
loop do
gz = Zlib::GzipReader.new(io)
out += gz.read
unused = gz.unused
gz.finish

break if unused.nil?
adjust = unused.length
io.pos -= adjust
end

out
end

def io_decompress(input, output)
loop do
gz = Zlib::GzipReader.new(input)
v = gz.read
output.write(v)
unused = gz.unused
gz.finish

break if unused.nil?
adjust = unused.length
input.pos -= adjust
end

output
end
end
end
end
6 changes: 4 additions & 2 deletions lib/fluent/plugin/out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -440,8 +440,10 @@ def send_data(tag, chunk)

sock.write @sender.forward_header # beginArray(3)
sock.write tag.to_msgpack # 1. writeRaw(tag)
sock.write [0xdb, chunk.size].pack('CN') # 2. beginRaw(size) raw32
chunk.write_to(sock) # writeRawBody(packed_es)
chunk.open(compressed: :text) do |chunk_io|
sock.write [0xdb, chunk_io.size].pack('CN') # 2. beginRaw(size) raw32
IO.copy_stream(chunk_io, sock) # writeRawBody(packed_es)
end
sock.write option.to_msgpack # 3. writeOption(option)

if @sender.require_ack_response
Expand Down
Loading

0 comments on commit 68d27dd

Please sign in to comment.