Add standard chunking format #914
Changes from all commits: 88b372e, 6596c9a, 8e56525, c814a86, 2845bad
First changed file (EventStream classes):
@@ -21,6 +21,10 @@ class EventStream
     include Enumerable
     include MessagePackFactory::Mixin
 
+    def records
+      raise NotImplementedError, "DO NOT USE THIS CLASS directly."
+    end
+
     def repeatable?
       false
     end
@@ -29,7 +33,8 @@ def each(&block)
       raise NotImplementedError, "DO NOT USE THIS CLASS directly."
     end
 
-    def to_msgpack_stream
+    def to_msgpack_stream(time_int: false)
+      return to_msgpack_stream_forced_integer if time_int

Comment: Direct
Comment: IMO we can remove it right now (because it's a v0.14 new feature), and it's better to be done.

       out = msgpack_packer
       each {|time,record|
         out.write([time,record])
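The new time_int flag lets a caller ask for the compatibility serialization path (to_msgpack_stream_forced_integer) instead of the default one. A minimal usage sketch, not taken from the PR itself; it assumes a Fluentd v0.14 tree where fluent/event and fluent/time are loadable and where the msgpack factory has been set up the way a running fluentd process sets it up:

    require 'fluent/event'
    require 'fluent/time'

    es = Fluent::OneEventStream.new(Fluent::EventTime.now, {"message" => "hello"})

    default_stream = es.to_msgpack_stream                  # default packing of [time, record]
    compat_stream  = es.to_msgpack_stream(time_int: true)  # dispatches to to_msgpack_stream_forced_integer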
@@ -46,7 +51,6 @@ def to_msgpack_stream_forced_integer
     end
   end
 
-
   class OneEventStream < EventStream
     def initialize(time, record)
       @time = time
@@ -57,6 +61,10 @@ def dup
       OneEventStream.new(@time, @record.dup)
     end
 
+    def records
+      1
+    end
+
     def repeatable?
       true
     end
@@ -81,6 +89,10 @@ def dup
       ArrayEventStream.new(entries)
     end
 
+    def records
+      @entries.size
+    end
+
     def repeatable?
       true
     end
@@ -102,7 +114,7 @@ def each(&block)
   #
   # Use this class as below, in loop of data-enumeration:
   #  1. initialize blank stream:
-  #     streams[tag] ||= MultiEventStream
+  #     streams[tag] ||= MultiEventStream.new
   #  2. add events
   #     stream[tag].add(time, record)
   class MultiEventStream < EventStream
 
@@ -119,6 +131,10 @@ def dup
       es
     end
 
+    def records
+      @time_array.size
+    end
+
     def add(time, record)
       @time_array << time
       @record_array << record
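Every concrete EventStream now answers records without serializing or iterating anything. A minimal sketch of what each class returns, assuming only a Fluentd v0.14 tree where fluent/event is loadable (integer timestamps are used so the sketch does not depend on EventTime setup):

    require 'fluent/event'

    now = Time.now.to_i

    one = Fluent::OneEventStream.new(now, {"message" => "hi"})
    one.records    # => 1 (always a single event)

    arr = Fluent::ArrayEventStream.new([[now, {"a" => 1}], [now, {"a" => 2}]])
    arr.records    # => 2 (@entries.size)

    multi = Fluent::MultiEventStream.new
    multi.add(now, {"b" => 3})
    multi.records  # => 1 (@time_array.size)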
@@ -144,16 +160,20 @@
 
   class MessagePackEventStream < EventStream
-    def initialize(data, cached_unpacker = nil)
+    # Keep cached_unpacker argument for existence plugins
+    def initialize(data, records = 0, cached_unpacker = nil)

Comment: Some 3rd party plugins use the cached_unpacker argument.
Comment: Ah, it's my mistake. I'll fix it with keyword argument.

       @data = data
+      @records = records
     end
 
+    def records
+      @records
+    end
+
     def repeatable?
       true
     end
 
     def each(&block)
       # TODO format check
       msgpack_unpacker.feed_each(@data, &block)
       nil
     end
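MessagePackEventStream wraps data that is already msgpack-encoded, so counting its events would require unpacking; the new constructor argument lets the caller pass the count along with the opaque payload. A small sketch, assuming the msgpack gem and a Fluentd v0.14 tree (at this commit records is still the second positional argument; the thread above agrees to turn it into a keyword argument):

    require 'msgpack'
    require 'fluent/event'

    packed = MessagePack.pack([0, {"k" => "v"}]) + MessagePack.pack([1, {"k" => "w"}])

    es = Fluent::MessagePackEventStream.new(packed, 2)
    es.records            # => 2, read from the stored count; the payload stays opaque
    es.to_msgpack_stream  # => the original packed String, returned as-is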
@@ -162,5 +182,21 @@ def to_msgpack_stream
       @data
     end
   end
 
+  module ChunkMessagePackEventStreamer
+    include MessagePackFactory::Mixin
+    # chunk.extend(ChunkEventStreamer)
+    #  => chunk.each{|time, record| ... }
+    def each(&block)

Comment: Original implementation rescues EOFError from MessagePack::Unpacker.

    def msgpack_each(&block)
      open do |io|
        u = msgpack_factory.unpacker(io)
        begin
          u.each(&block)
        rescue EOFError
        end
      end
    end

Comment: MessagePack::Unpacker raises EOFError if data underflow occurs when unpacking data. It's caused by broken/unflushed data on disk. So, this code shouldn't do that.

+      open do |io|
+        msgpack_unpacker(io).each(&block)
+      end
+      nil
+    end
+    alias :msgpack_each :each
+
+    def to_msgpack_stream

Comment: This mixin is used only for MessagePackEventStream, and it's already packed/encoded from ruby object to msgpack.
Comment: Okay.

+      read
+    end
+  end
 end
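The new mixin only relies on the extended object responding to open (yielding an IO over the packed payload) and read (returning it raw), so its contract can be illustrated with a stand-in chunk. In the sketch below, FakeChunk is hypothetical, and the code assumes a Fluentd v0.14 tree, the msgpack gem, and a msgpack factory set up the way a running fluentd process sets it up:

    require 'stringio'
    require 'msgpack'
    require 'fluent/event'

    # Hypothetical stand-in for a buffer chunk: only #open and #read are needed.
    class FakeChunk
      def initialize(payload)
        @payload = payload
      end

      def open
        yield StringIO.new(@payload)
      end

      def read
        @payload
      end
    end

    payload = MessagePack.pack([0, {"a" => 1}]) + MessagePack.pack([1, {"b" => 2}])

    chunk = FakeChunk.new(payload)
    chunk.extend(Fluent::ChunkMessagePackEventStreamer)

    chunk.each { |time, record| p [time, record] }  # yields each packed [time, record] pair
    chunk.msgpack_each { |time, record| }           # alias of #each, kept for existing callers
    chunk.to_msgpack_stream == payload              # => true, it simply returns #read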
Second changed file (buffer emit logic):
@@ -180,7 +180,7 @@ def emit(metadata, data, force: false)
         chunk.synchronize do
           begin
             chunk.append(data)
-            if !size_over?(chunk) || force
+            if !chunk_size_over?(chunk) || force
               chunk.commit
               stored = true
               @stage_size += (chunk.size - original_size)
@@ -198,6 +198,49 @@ def emit(metadata, data, force: false)
         emit_step_by_step(metadata, data)
       end
 
+      def emit_bulk(metadata, bulk, records)
+        return if bulk.nil? || bulk.empty?

Comment: When does emit_bulk receive nil?
Comment: Just to be safe. And this method is not for a busy loop.

+        raise BufferOverflowError unless storable?
+
+        stored = false
+        synchronize do # critical section for buffer (stage/queue)
+          until stored
+            chunk = @stage[metadata]
+            unless chunk
+              chunk = @stage[metadata] = generate_chunk(metadata)
+            end
+
+            chunk.synchronize do # critical section for chunk (chunk append/commit/rollback)
+              begin
+                empty_chunk = chunk.empty?
+                chunk.concat(bulk, records)
+
+                if chunk_size_over?(chunk)
+                  if empty_chunk
+                    log.warn "chunk bytes limit exceeds for a bulk event stream: #{bulk.bytesize}bytes"
+                  else
+                    chunk.rollback
+                    enqueue_chunk(metadata)
+                    next
+                  end
+                end
+
+                chunk.commit
+                stored = true
+                @stage_size += bulk.bytesize
+                if chunk_size_full?(chunk)
+                  enqueue_chunk(metadata)
+                end
+              rescue
+                chunk.rollback
+                raise
+              end
+            end
+          end
+        end
+        nil
+      end
+
+      def queued_records
+        synchronize { @queue.reduce(0){|r, chunk| r + chunk.records } }
+      end
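Unlike emit, which appends decoded entries, emit_bulk takes a payload that is already serialized plus its record count and stores it in a single Chunk#concat call. A hedged sketch of how a caller could produce that pair from an event stream it already holds; the buffer and metadata objects are not constructed here and are assumed to come from a configured buffer plugin, and fluentd's msgpack factory is assumed to be set up as it is inside a running process:

    require 'fluent/event'

    es = Fluent::MultiEventStream.new
    es.add(Time.now.to_i, {"msg" => "a"})
    es.add(Time.now.to_i, {"msg" => "b"})

    bulk    = es.to_msgpack_stream  # one String of packed [time, record] pairs
    records = es.records            # => 2, carried alongside the otherwise opaque blob

    # Inside a configured buffer plugin the pair would then be stored in one pass,
    # without re-parsing the payload:
    #   buffer.emit_bulk(metadata, bulk, records)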
@@ -310,10 +353,14 @@ def clear_queue!
         end
       end
 
-      def size_over?(chunk)
+      def chunk_size_over?(chunk)
         chunk.size > @chunk_bytes_limit || (@chunk_records_limit && chunk.records > @chunk_records_limit)
       end
 
+      def chunk_size_full?(chunk)
+        chunk.size >= @chunk_bytes_limit || (@chunk_records_limit && chunk.records >= @chunk_records_limit)
+      end
+
       def emit_step_by_step(metadata, data)
         attempt_records = data.size / 3
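The two predicates differ only in strict versus non-strict comparison: chunk_size_over? (>) says the last write overshot a limit and has to be rolled back or split, while chunk_size_full? (>=) says the chunk has reached a limit and can be enqueued even though the write itself succeeded. A self-contained illustration in plain Ruby; StubChunk and the limits are made up for the example:

    StubChunk = Struct.new(:size, :records)  # size in bytes, records = number of events

    chunk_bytes_limit   = 8 * 1024 * 1024
    chunk_records_limit = 10_000

    over = ->(c) { c.size >  chunk_bytes_limit || (chunk_records_limit && c.records >  chunk_records_limit) }
    full = ->(c) { c.size >= chunk_bytes_limit || (chunk_records_limit && c.records >= chunk_records_limit) }

    at_limit = StubChunk.new(8 * 1024 * 1024, 1_000)
    over.call(at_limit)    # => false: the append still fits, so it is committed
    full.call(at_limit)    # => true:  the chunk is complete and gets enqueued for flushing

    past_limit = StubChunk.new(8 * 1024 * 1024 + 1, 1_000)
    over.call(past_limit)  # => true: roll back and retry (or warn, for a bulk into an empty chunk)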
@@ -336,7 +383,7 @@ def emit_step_by_step(metadata, data)
         attempt = data.slice(0, attempt_records)
         chunk.append(attempt)
 
-        if size_over?(chunk)
+        if chunk_size_over?(chunk)
          chunk.rollback
 
           if attempt_records <= MINIMUM_APPEND_ATTEMPT_RECORDS
Comment: The name records seems to represent returning the event entries. num_records, or the ruby-ish size, is better for me.

Comment: size looks bad, because it sometimes means the bytes of data, and one type of EventStream (MessagePackEventStream) is actually a String object. How about events?

Comment: events is better for me. @sonots How about this? This method will be used in flowcounter_simple or similar gems.

Comment: It looks like this method returns an integer, not an array. So, both records and events look bad to me. What about num_events, num_records, or just length?

Comment: OK, I'll use length.

Comment: Once I was going to use length, but I re-found that Chunk#records already exists, and I made EventStream#records the same name as it. Chunk also has Chunk#size (bytesize), and #size != #length is extraordinarily confusing in the ruby way. Hmm.

Comment: I think it's best to make Chunk#size return the number of events (with #length as an alias of #size) and to make Chunk#bytesize return the number of bytes of chunk content. But this changes the meaning of chunk.size, and it will break compatibility of plugins.

Comment: I'll create another pull-request to change Chunk#records and EventStream#records to #num_events later. Let me merge this right now.