From 875cbc3b75a2e0246284440dad81d4fab44ba94a Mon Sep 17 00:00:00 2001 From: Iain Patterson Date: Fri, 27 Oct 2017 16:56:43 +0100 Subject: [PATCH] Issue 1726: Fix for in_tcp log corruption under load. The TCP input plugin shares a single buffer across all connections. Under load the buffer sometimes gets truncated. The suspicion is that concurrent connections race to modify it. Long message is received. Short message is received while the long message is still being parsed. The short message is parsed so the buffer will be truncated. The buffer, which now contains "longmessage\nshort\n", is truncated by the length of the short message. It is now "longm". Another message arrives. Now the buffer is "longmanother\n", which does not parse. The whole buffer is thrown away and subsequent messages are received and handled as usual. Eventually the pattern repeats. The fix is to use a per-connection buffer. --- lib/fluent/plugin/in_tcp.rb | 9 ++++----- lib/fluent/plugin_helper/server.rb | 3 +++ 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/lib/fluent/plugin/in_tcp.rb b/lib/fluent/plugin/in_tcp.rb index 428eb5258a..71d77b0f4f 100644 --- a/lib/fluent/plugin/in_tcp.rb +++ b/lib/fluent/plugin/in_tcp.rb @@ -55,13 +55,12 @@ def multi_workers_ready? def start super - @buffer = '' server_create(:in_tcp_server, @port, bind: @bind) do |data, conn| - @buffer << data + conn.buffer << data begin pos = 0 - while i = @buffer.index(@delimiter, pos) - msg = @buffer[pos...i] + while i = conn.buffer.index(@delimiter, pos) + msg = conn.buffer[pos...i] pos = i + @delimiter.length @parser.parse(msg) do |time, record| @@ -77,7 +76,7 @@ def start router.emit(tag, time, record) end end - @buffer.slice!(0, pos) if pos > 0 + conn.buffer.slice!(0, pos) if pos > 0 end end end diff --git a/lib/fluent/plugin_helper/server.rb b/lib/fluent/plugin_helper/server.rb index ffdb57dab1..eb57832475 100644 --- a/lib/fluent/plugin_helper/server.rb +++ b/lib/fluent/plugin_helper/server.rb @@ -425,9 +425,12 @@ def on(event, &callback) end class TCPCallbackSocket < CallbackSocket + attr_accessor :buffer + def initialize(sock) super("tcp", sock, [:data, :write_complete, :close]) @peeraddr = @sock.peeraddr + @buffer = '' end def write(data)