Skip to content

Commit

Permalink
Merge pull request #4 from section/messagepack-array-prefix
Browse files Browse the repository at this point in the history
Add messagepack array prefix so that multiple records will be processed.
  • Loading branch information
adikus authored Aug 31, 2022
2 parents 3135ea3 + d49388d commit 778fc64
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 2 deletions.
9 changes: 8 additions & 1 deletion lib/fluent/plugin/out_logtail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,17 @@ def deliver(chunk, attempt)
end

http = build_http_client
body = chunk.read
records=0
chunk.each do
records=records+1
end
body = [0xdd,records].pack("CN")
body << chunk.read

begin
resp = http.start do |conn|
req = build_request(body)
log.debug("sending #{req.body.length} bytes to logtail")
conn.request(req)
end
ensure
Expand All @@ -51,6 +57,7 @@ def deliver(chunk, attempt)

code = resp.code.to_i
if code >= 200 && code <= 299
log.debug "POST request to logtail was responded to with status code #{code}"
true
elsif RETRYABLE_CODES.include?(code)
sleep_time = sleep_for_attempt(attempt)
Expand Down
2 changes: 1 addition & 1 deletion spec/fluent/plugin/out_logtail_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def format(tag, time, record)
it "should send a chunked request to the Logtail API" do
stub = stub_request(:post, "https://in.logtail.com/").
with(
:body => start_with("\x85\xA3age\x1A\xAArequest_id\xA242\xA9parent_id\xA6parent\xAArouting_id\xA7routing\xA2dt\xB4".force_encoding("ASCII-8BIT")),
:body => start_with("\xDD\x00\x00\x00\x01\x85\xA3age\x1A\xAArequest_id\xA242\xA9parent_id\xA6parent\xAArouting_id\xA7routing\xA2dt\xB4".force_encoding("ASCII-8BIT")),
:headers => {'Accept'=>'*/*', 'Accept-Encoding'=>'gzip;q=1.0,deflate;q=0.6,identity;q=0.3', 'Authorization'=>'Bearer abcd1234', 'Content-Type'=>'application/msgpack', 'User-Agent'=>'Logtail Fluentd/0.1.1'}
).
to_return(:status => 202, :body => "", :headers => {})
Expand Down

0 comments on commit 778fc64

Please sign in to comment.