in_emitter: write msgpack buffer directly(#4049) #4128

Merged
merged 3 commits into from
Oct 27, 2021

nokute78
Collaborator

Fixes #4049
Currently, in_emitter has two queues, and the first queue has no memory limit.
This causes #4049.
#4049 (comment)

This patch switches in_emitter to a single queue and writes the data directly to the input chunk, so that 'mem_buf_limit' applies.
The new call path is: rewrite_tag -> in_emitter_add_record -> flb_input_chunk_append_raw.
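
The difference between the two designs can be sketched with a toy model. This is not Fluent Bit code: it is a minimal Python illustration, and the class names (BoundedInput, TwoStageEmitter, DirectEmitter), the byte accounting, and the reject-when-full policy are all assumptions made for the example. The real flb_input_chunk_append_raw and 'mem_buf_limit' handling are more involved.

```python
from collections import deque


class BoundedInput:
    """Hypothetical stand-in for an input whose chunk buffer has a byte limit."""

    def __init__(self, mem_buf_limit: int) -> None:
        self.mem_buf_limit = mem_buf_limit
        self.used = 0

    def append_raw(self, payload: bytes) -> bool:
        # Assumed policy: refuse the write if it would exceed the limit.
        if self.used + len(payload) > self.mem_buf_limit:
            return False
        self.used += len(payload)
        return True


class TwoStageEmitter:
    """Old shape: records are staged in an unbounded buffer, then flushed."""

    def __init__(self, inp: BoundedInput) -> None:
        self.inp = inp
        self.staging = deque()

    def add_record(self, payload: bytes) -> bool:
        self.staging.append(payload)  # no limit check at this stage
        return True

    def staged_bytes(self) -> int:
        return sum(len(p) for p in self.staging)

    def flush(self) -> None:
        # Models the periodic timer: move staged data into the bounded buffer
        # until that buffer refuses more.
        while self.staging and self.inp.append_raw(self.staging[0]):
            self.staging.popleft()


class DirectEmitter:
    """New shape: every record goes straight to the bounded buffer."""

    def __init__(self, inp: BoundedInput) -> None:
        self.inp = inp

    def add_record(self, payload: bytes) -> bool:
        return self.inp.append_raw(payload)


def demo(limit: int = 100, record: bytes = b"x" * 10, count: int = 50):
    """Feed `count` records while nothing drains the bounded buffer."""
    old = TwoStageEmitter(BoundedInput(limit))
    for _ in range(count):
        old.add_record(record)
    old.flush()

    new = DirectEmitter(BoundedInput(limit))
    accepted = sum(new.add_record(record) for _ in range(count))
    return old.staged_bytes(), accepted


if __name__ == "__main__":
    print(demo())
```

With a 100-byte limit and fifty 10-byte records, the two-stage model still holds 400 bytes in its unbounded staging buffer after a flush, while the direct model accepts 10 records and rejects the other 40. The staging buffer is the part that can grow without bound when the downstream side stalls.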


Enter [N/A] in the box, if an item is not applicable to your change.

Testing
Before we can approve your change; please submit the following in a comment:

  • Example configuration file for the change
  • Debug log output from testing the change
  • Attached Valgrind output that shows no leaks or memory corruption was found

Documentation

  • [N/A] Documentation required for this feature

Example Configuration

[INPUT]
    Name dummy
    Tag dummy1
    Dummy {"msg":"aaa"}
    Rate 1000

[INPUT]
    Name dummy
    Tag dummy2
    Dummy {"msg":"bbb"}
    Rate 5000

[INPUT]
    Name dummy
    Tag dummy3
    Dummy {"msg":"ccc"}
    Rate 10000

[INPUT]
    Name kmsg
    Tag kmsg

[FILTER]
    Name rewrite_tag
    Match dummy*
    Emitter_Name dummy_new
    Emitter_mem_buf_limit 1MB
    Rule $msg ^(.*)$ new false

[FILTER]
    Name rewrite_tag
    Match kmsg
    Emitter_Name kmsg_new
    Emitter_mem_buf_limit 1MB
    Rule $msg ^(.*)$ new false

[OUTPUT]
    Name counter
    Match new

#[OUTPUT]
#    Name stdout
#    Match new

Debug output

With this patch, Fluent Bit reports "[2021/09/27 16:41:27] [error] [input:emitter:dummy_new] error registering chunk with tag: new" once the emitter exceeds its buffer limit.
This means mem_buf_limit is being enforced.

$ bin/fluent-bit -c ~/fluentbit-conf/rewrite_pfm.conf 
Fluent Bit v1.9.0
* Copyright (C) 2019-2021 The Fluent Bit Authors
* Copyright (C) 2015-2018 Treasure Data
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io

[2021/09/27 16:41:24] [ info] [engine] started (pid=14155)
[2021/09/27 16:41:24] [ info] [storage] version=1.1.1, initializing...
[2021/09/27 16:41:24] [ info] [storage] in-memory
[2021/09/27 16:41:24] [ info] [storage] normal synchronization mode, checksum disabled, max_chunks_up=128
[2021/09/27 16:41:24] [ info] [cmetrics] version=0.2.1
[2021/09/27 16:41:24] [ info] [sp] stream processor started
[2021/09/27 16:41:27] [ warn] [input] emitter.4 paused (mem buf overlimit)
[2021/09/27 16:41:27] [error] [input:emitter:dummy_new] error registering chunk with tag: new
[2021/09/27 16:41:27] [error] [input:emitter:dummy_new] error registering chunk with tag: new
[2021/09/27 16:41:27] [error] [input:emitter:dummy_new] error registering chunk with tag: new
[2021/09/27 16:41:27] [error] [input:emitter:dummy_new] error registering chunk with tag: new
(snip)
^C[2021/09/27 16:42:10] [engine] caught signal (SIGINT)
1632728530.736583,14483 (total = 115191)
[2021/09/27 16:42:10] [ warn] [engine] service will stop in 5 seconds
[2021/09/27 16:42:15] [ info] [engine] service stopped

Valgrind output

$ valgrind --leak-check=full bin/fluent-bit -c ~/fluentbit-conf/rewrite_pfm.conf 
==14152== Memcheck, a memory error detector
==14152== Copyright (C) 2002-2017, and GNU GPL'd, by Julian Seward et al.
==14152== Using Valgrind-3.15.0 and LibVEX; rerun with -h for copyright info
==14152== Command: bin/fluent-bit -c /home/taka/fluentbit-conf/rewrite_pfm.conf
==14152== 
Fluent Bit v1.9.0
* Copyright (C) 2019-2021 The Fluent Bit Authors
* Copyright (C) 2015-2018 Treasure Data
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io

[2021/09/27 16:40:43] [ info] [engine] started (pid=14152)
[2021/09/27 16:40:43] [ info] [storage] version=1.1.1, initializing...
[2021/09/27 16:40:43] [ info] [storage] in-memory
[2021/09/27 16:40:43] [ info] [storage] normal synchronization mode, checksum disabled, max_chunks_up=128
[2021/09/27 16:40:43] [ info] [cmetrics] version=0.2.1
[2021/09/27 16:40:43] [ info] [sp] stream processor started
==14152== Warning: client switching stacks?  SP change: 0x57e59c8 --> 0x4c89e50
==14152==          to suppress, use: --max-stackframe=11910008 or greater
==14152== Warning: client switching stacks?  SP change: 0x4c89dc8 --> 0x57e59c8
==14152==          to suppress, use: --max-stackframe=11910144 or greater
==14152== Warning: client switching stacks?  SP change: 0x57e59c8 --> 0x4c89dc8
==14152==          to suppress, use: --max-stackframe=11910144 or greater
==14152==          further instances of this message will not be shown.
1632728447.719220,2757 (total = 2757)
1632728447.779638,708 (total = 3465)
^C[2021/09/27 16:40:51] [engine] caught signal (SIGINT)
1632728451.121981,3155 (total = 6620)
[2021/09/27 16:40:51] [ warn] [engine] service will stop in 5 seconds
[2021/09/27 16:40:55] [ info] [engine] service stopped
==14152== 
==14152== HEAP SUMMARY:
==14152==     in use at exit: 0 bytes in 0 blocks
==14152==   total heap usage: 234,121 allocs, 234,121 frees, 2,438,220,259 bytes allocated
==14152== 
==14152== All heap blocks were freed -- no leaks are possible
==14152== 
==14152== For lists of detected and suppressed errors, rerun with: -s
==14152== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)

Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

@nokute78
Collaborator Author

Performance

Version              counts/min
master + this patch  14479
v1.8.7               14962
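
To put the two figures in proportion, the relative difference can be computed directly. This is a small Python sketch; the function name relative_drop_percent is made up for the example, and the inputs are the counts/min values reported in this comment. The logs show a single run per version, so the result is an indication rather than a benchmark.

```python
def relative_drop_percent(baseline: float, candidate: float) -> float:
    """Return how much lower `candidate` is than `baseline`, in percent."""
    return (baseline - candidate) / baseline * 100.0


V1_8_7 = 14962   # counts/min reported for v1.8.7
PATCHED = 14479  # counts/min reported for master + this patch

drop = relative_drop_percent(V1_8_7, PATCHED)
print(f"{drop:.1f}% fewer counts/min than v1.8.7")
```

For these values the difference is about 3.2%.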

Configuration

Comment out Emitter_mem_buf_limit and use the flowcounter output instead of counter.

[INPUT]
    Name dummy
    Tag dummy1
    Dummy {"msg":"aaa"}
    Rate 1000

[INPUT]
    Name dummy
    Tag dummy2
    Dummy {"msg":"bbb"}
    Rate 5000

[INPUT]
    Name dummy
    Tag dummy3
    Dummy {"msg":"ccc"}
    Rate 10000

[INPUT]
    Name kmsg
    Tag kmsg

[FILTER]
    Name rewrite_tag
    Match dummy*
    Emitter_Name dummy_new
#    Emitter_mem_buf_limit 1MB
    Rule $msg ^(.*)$ new false

[FILTER]
    Name rewrite_tag
    Match kmsg
    Emitter_Name kmsg_new
#    Emitter_mem_buf_limit 1MB
    Rule $msg ^(.*)$ new false

[OUTPUT]
    Name flowcounter
    Match new

#[OUTPUT]
#    Name stdout
#    Match new

Log

master + this patch

$ bin/fluent-bit -c ~/fluentbit-conf/rewrite_pfm.conf 
Fluent Bit v1.9.0
* Copyright (C) 2019-2021 The Fluent Bit Authors
* Copyright (C) 2015-2018 Treasure Data
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io

[2021/09/27 16:50:15] [ info] [engine] started (pid=14978)
[2021/09/27 16:50:15] [ info] [storage] version=1.1.1, initializing...
[2021/09/27 16:50:15] [ info] [storage] in-memory
[2021/09/27 16:50:15] [ info] [storage] normal synchronization mode, checksum disabled, max_chunks_up=128
[2021/09/27 16:50:15] [ info] [cmetrics] version=0.2.1
[2021/09/27 16:50:15] [ info] [sp] stream processor started
[out_flowcounter] [1632729015, {"counts":0, "bytes":0, "counts/minute":0, "bytes/minute":0 }]
[out_flowcounter] [1632729075, {"counts":867826, "bytes":17427629, "counts/minute":14463, "bytes/minute":290460 }]
[out_flowcounter] [1632729135, {"counts":868774, "bytes":17375480, "counts/minute":14479, "bytes/minute":289591 }]
^C[2021/09/27 16:52:21] [engine] caught signal (SIGINT)
[2021/09/27 16:52:21] [ warn] [engine] service will stop in 5 seconds
[2021/09/27 16:52:25] [ info] [engine] service stopped

v1.8.7

$ bin/fluent-bit -c ~/fluentbit-conf/rewrite_pfm.conf 
Fluent Bit v1.8.7
* Copyright (C) 2019-2021 The Fluent Bit Authors
* Copyright (C) 2015-2018 Treasure Data
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io

[2021/09/27 16:47:27] [ info] [engine] started (pid=14963)
[2021/09/27 16:47:27] [ info] [storage] version=1.1.1, initializing...
[2021/09/27 16:47:27] [ info] [storage] in-memory
[2021/09/27 16:47:27] [ info] [storage] normal synchronization mode, checksum disabled, max_chunks_up=128
[2021/09/27 16:47:27] [ info] [cmetrics] version=0.2.1
[2021/09/27 16:47:27] [ info] [sp] stream processor started
[out_flowcounter] [1632728847, {"counts":0, "bytes":0, "counts/minute":0, "bytes/minute":0 }]
[out_flowcounter] [1632728907, {"counts":866619, "bytes":17403489, "counts/minute":14443, "bytes/minute":290058 }]
[out_flowcounter] [1632728967, {"counts":897766, "bytes":17955320, "counts/minute":14962, "bytes/minute":299255 }]
^C[2021/09/27 16:49:35] [engine] caught signal (SIGINT)
[2021/09/27 16:49:35] [ info] [input] pausing dummy_new
[2021/09/27 16:49:35] [ info] [input] pausing kmsg_new
[2021/09/27 16:49:35] [ warn] [engine] service will stop in 5 seconds
[2021/09/27 16:49:39] [ info] [engine] service stopped

In the previous implementation, in_emitter had two buffers.
- 1. rewrite_tag -> in_emitter_add_record -> msgpack_sbuffer_write
- 2. (timer thread, every 0.5 sec) cb_queue_chunks -> flb_input_chunk_append_raw

'mem_buf_limit' applies only to the flb_input_chunk API, so path 1 had no limit.

This patch changes the write sequence to:
rewrite_tag -> in_emitter_add_record -> flb_input_chunk_append_raw

Signed-off-by: Takahiro Yamashita <[email protected]>
If in_emitter pauses before the original input plugin pauses,
some records are still emitted even if the keep option is disabled,
because the keep property is only used when emitting succeeds.

Signed-off-by: Takahiro Yamashita <[email protected]>
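
The side effect described in the commit message above can be modelled in a few lines. This is one reading of that message, not the actual filter code: the function rewrite_tag_filter, its emit callback, and the dict-based record are hypothetical names introduced for illustration, and the sketch shows only the problem, not how the commit resolves it.

```python
from typing import Callable, Optional


def rewrite_tag_filter(
    record: dict,
    emit: Callable[[dict], bool],
    keep: bool,
) -> Optional[dict]:
    """Return the record that stays in the original stream, or None if dropped.

    `emit` models handing the record to the emitter under the new tag and
    returns False when the emitter refuses it (for example, while paused).
    """
    emitted = emit(record)
    # Assumption from the commit message: `keep` is consulted only when the
    # emit succeeded. A failed emit leaves the record in the original stream.
    if emitted and not keep:
        return None
    return record


if __name__ == "__main__":
    rec = {"msg": "aaa"}
    print(rewrite_tag_filter(rec, lambda r: True, keep=False))
    print(rewrite_tag_filter(rec, lambda r: False, keep=False))
```

In this model, a successful emit with keep disabled drops the original record, while a failed emit lets it continue under its original tag regardless of keep.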
@nokute78
Collaborator Author

nokute78 commented Oct 4, 2021

A user reported that the original issue was fixed, but that the patch caused a side effect.
#4049 (comment)

I updated this PR to fix the side effect.
e2b566e

@edsiper edsiper merged commit 711da35 into fluent:master Oct 27, 2021
@nokute78 nokute78 deleted the emitter_limit branch October 29, 2021 23:01
Development

Successfully merging this pull request may close these issues.

Rewrite_Tag is causing significant memory spike when upstream connectivity is lost