Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

out_es: out_es: use snprintf wrapper function for bulk header(#4311) #4361

Merged
merged 5 commits into from
Dec 18, 2021

Conversation

nokute78
Copy link
Collaborator

Fixes #4311.

We used fixed char array to construct bulk header.
If long index( the length is greater than 80) , out_es creates broken json since the buffer is small and fixed.
We can reproduce the issue using below command.

fluent-bit -i cpu -o es -p generate_id=on -p index=123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890

This PR is to

  • Add test case for the issue
  • Add new API flb_sds_snprintf_realloc
    • It is similar to snprintf. The difference is that the API increases the buffer of flb_sds_t if there is no room to snprintf.
  • Use flb_sds_t to construct header and use new API to increase the buffer

Enter [N/A] in the box, if an item is not applicable to your change.

Testing
Before we can approve your change; please submit the following in a comment:

  • Example configuration file for the change
  • Debug log output from testing the change
  • Attached Valgrind output that shows no leaks or memory corruption was found

Documentation

  • [N/A] Documentation required for this feature

Example Configuration

  1. nc -l 9200
  2. exec fluent-bit using below command
fluent-bit -i cpu -o es -p generate_id=on -p index=123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890

Debug output

$ nc -l 9200
POST /_bulk HTTP/1.1
Host: 127.0.0.1:9200
Content-Length: 1216
Content-Type: application/x-ndjson
User-Agent: Fluent-Bit

{"create":{"_index":"123456789012345678901234567890123456789012345678901234567890123456789012345678901","_type":"_doc","_id":"b9042b24-930a-1f00-d630-03b0ef375788"}}{"@timestamp":"2021-11-23T12:25:01.173Z","cpu_p":29,"user_p":28,"system_p":1.0,"cpu0.p_cpu":29.0,"cpu0.p_user":28.0,"cpu0.p_system":1.0}
$ bin/fluent-bit -i cpu -o es -p generate_id=on -p index=123456789012345678901234567890123456789012345678901234567890123456789012345678901
Fluent Bit v1.9.0
* Copyright (C) 2019-2021 The Fluent Bit Authors
* Copyright (C) 2015-2018 Treasure Data
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io

[2021/11/23 21:25:00] [ info] [engine] started (pid=115228)
[2021/11/23 21:25:00] [ info] [storage] version=1.1.5, initializing...
[2021/11/23 21:25:00] [ info] [storage] in-memory
[2021/11/23 21:25:00] [ info] [storage] normal synchronization mode, checksum disabled, max_chunks_up=128
[2021/11/23 21:25:00] [ info] [cmetrics] version=0.2.2
[2021/11/23 21:25:00] [ info] [sp] stream processor started

Valgrind output

$ valgrind --leak-check=full bin/fluent-bit -i cpu -o es -p generate_id=on -p index=123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890 -p logstash_format=on
==115578== Memcheck, a memory error detector
==115578== Copyright (C) 2002-2017, and GNU GPL'd, by Julian Seward et al.
==115578== Using Valgrind-3.15.0 and LibVEX; rerun with -h for copyright info
==115578== Command: bin/fluent-bit -i cpu -o es -p generate_id=on -p index=123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890 -p logstash_format=on
==115578== 
Fluent Bit v1.9.0
* Copyright (C) 2019-2021 The Fluent Bit Authors
* Copyright (C) 2015-2018 Treasure Data
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io

[2021/11/23 21:30:39] [ info] [engine] started (pid=115578)
[2021/11/23 21:30:39] [ info] [storage] version=1.1.5, initializing...
[2021/11/23 21:30:39] [ info] [storage] in-memory
[2021/11/23 21:30:39] [ info] [storage] normal synchronization mode, checksum disabled, max_chunks_up=128
[2021/11/23 21:30:39] [ info] [cmetrics] version=0.2.2
[2021/11/23 21:30:39] [ info] [sp] stream processor started
==115578== Warning: client switching stacks?  SP change: 0x57e59f8 --> 0x4c8da80
==115578==          to suppress, use: --max-stackframe=11894648 or greater
==115578== Warning: client switching stacks?  SP change: 0x4c8da28 --> 0x57e59f8
==115578==          to suppress, use: --max-stackframe=11894736 or greater
==115578== Warning: client switching stacks?  SP change: 0x57e59f8 --> 0x4c8da28
==115578==          to suppress, use: --max-stackframe=11894736 or greater
==115578==          further instances of this message will not be shown.
^C[2021/11/23 21:30:48] [engine] caught signal (SIGINT)
[2021/11/23 21:30:48] [ info] [input] pausing cpu.0
[2021/11/23 21:30:48] [ warn] [engine] service will shutdown in max 5 seconds
[2021/11/23 21:30:49] [ info] [engine] service has stopped (0 pending tasks)
==115578== 
==115578== HEAP SUMMARY:
==115578==     in use at exit: 0 bytes in 0 blocks
==115578==   total heap usage: 1,323 allocs, 1,323 frees, 1,476,024 bytes allocated
==115578== 
==115578== All heap blocks were freed -- no leaks are possible
==115578== 
==115578== For lists of detected and suppressed errors, rerun with: -s
==115578== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)

Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

The function is a wrapper of snprintf.
It can increase sds if sds doesn't have enough buffer.

Signed-off-by: Takahiro Yamashita <[email protected]>
We used fixed char array to construct bulk header.
If long index comes, it needs larger buffer.

out_es uses snprintf wrapper function.
If the buffer is small, the api increases the buffer.

Signed-off-by: Takahiro Yamashita <[email protected]>
@@ -407,3 +407,31 @@ void flb_sds_destroy(flb_sds_t s)
head = FLB_SDS_HEADER(s);
flb_free(head);
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this function is not necessary and we can simply rely on flb_sds_printf() functionality.

flb_sds_t by default always reallocate if necessary, and calculating the final length can be done with flb_sds_len()

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Functionality is a little different.

flb_sds_printf appends offset flb_sds_len(s) and it means appends string.
https://github.com/fluent/fluent-bit/blob/v1.8.10/src/flb_sds.c#L367

I want to overwrite flb_sds_t.
So if we use the API to overwrite, we need to call flb_sds_len_set(sds, 0) before the API.

flb_sds_len_set(j_index, 0);
out_buf = flb_sds_printf(&j_index,
                        ES_BULK_INDEX_FMT_ID_WITHOUT_TYPE,
                        es_index,  id_key_str);

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah I understand. thanks for the clarification.

On that case so I would suggest renaming the new function from:

  • flb_sds_snprintf_realloc

to

  • flb_sds_snprintf

basically the "realloc" context is something that works behind the scenes, where the user don't need to be aware of it (similar to other sds...)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for comment.
I renamed.

@nokute78
Copy link
Collaborator Author

Note: flb-rt-out_elasticsearch failed since the test case flb_test_index_type and flb_test_logstash_format don't take care 7f0db9e
It is not related by this PR since current master also failed on that test cases.

7f0db9e is to use create operation.
However these test cases expect index operation.

@nokute78
Copy link
Collaborator Author

Note: I sent a PR #4480 to fix test cases.

@edsiper edsiper merged commit 0fbca8d into fluent:master Dec 18, 2021
@nokute78 nokute78 deleted the fix_4311_es_fixed_bulk_header branch December 18, 2021 23:15
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
waiting-for-user Waiting for more information, tests or requested changes
Projects
None yet
2 participants