Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

in_elasticsearch: Refer the plugin provided tag #7360

Merged
merged 1 commit into from
May 12, 2023

Conversation

cosmo0920
Copy link
Contributor

@cosmo0920 cosmo0920 commented May 11, 2023

Currently, in_elasticsearch does not refer the Tag property in its configuration.
This PR's motivation is the current implementation always collided for tag routing for multiple in_elasticsearch instantiated fluent-bit.
This should be able to avoided with adding the capability of handling plugin tag property.


Enter [N/A] in the box, if an item is not applicable to your change.

Testing
Before we can approve your change; please submit the following in a comment:

  • Example configuration file for the change
[INPUT]
    name elasticsearch
    listen 0.0.0.0
    port 9880
    buffer_max_size 4M
    tag raw.es

[OUTPUT]
    name stdout
    match *
  • Debug log output from testing the change
Fluent Bit v2.1.3
* Copyright (C) 2015-2022 The Fluent Bit Authors
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io

[2023/05/11 17:54:27] [ info] Configuration:
[2023/05/11 17:54:27] [ info]  flush time     | 1.000000 seconds
[2023/05/11 17:54:27] [ info]  grace          | 5 seconds
[2023/05/11 17:54:27] [ info]  daemon         | 0
[2023/05/11 17:54:27] [ info] ___________
[2023/05/11 17:54:27] [ info]  inputs:
[2023/05/11 17:54:27] [ info]      elasticsearch
[2023/05/11 17:54:27] [ info] ___________
[2023/05/11 17:54:27] [ info]  filters:
[2023/05/11 17:54:27] [ info] ___________
[2023/05/11 17:54:27] [ info]  outputs:
[2023/05/11 17:54:27] [ info]      stdout.0
[2023/05/11 17:54:27] [ info] ___________
[2023/05/11 17:54:27] [ info]  collectors:
[2023/05/11 17:54:27] [ info] [fluent bit] version=2.1.3, commit=a98155a705, pid=1360762
[2023/05/11 17:54:27] [debug] [engine] coroutine stack size: 24576 bytes (24.0K)
[2023/05/11 17:54:27] [ info] [storage] ver=1.2.0, type=memory, sync=normal, checksum=off, max_chunks_up=128
[2023/05/11 17:54:27] [ info] [cmetrics] version=0.6.1
[2023/05/11 17:54:27] [ info] [ctraces ] version=0.3.0
[2023/05/11 17:54:27] [ info] [input:elasticsearch:elasticsearch.0] initializing
[2023/05/11 17:54:27] [ info] [input:elasticsearch:elasticsearch.0] storage_strategy='memory' (memory only)
[2023/05/11 17:54:27] [ info] [output:stdout:stdout.0] worker #0 started
[2023/05/11 17:54:27] [debug] [elasticsearch:elasticsearch.0] created event channels: read=21 write=22
[2023/05/11 17:54:27] [debug] [downstream] listening on 0.0.0.0:9200
[2023/05/11 17:54:27] [debug] [stdout:stdout.0] created event channels: read=24 write=25
[2023/05/11 17:54:27] [ info] [sp] stream processor started
[2023/05/11 17:54:28] [trace] [io] connection OK
[2023/05/11 17:54:28] [trace] [input:elasticsearch:elasticsearch.0 at /media/Data3/Gitrepo/fluent-bit/plugins/in_elasticsearch/in_elasticsearch.c:52] new TCP connection arrived FD=40
[2023/05/11 17:54:28] [trace] [io coro=(nil)] [net_read] try up to 511999 bytes
[2023/05/11 17:54:28] [trace] [io coro=(nil)] [net_read] ret=164
[2023/05/11 17:54:28] [trace] [input:elasticsearch:elasticsearch.0 at /media/Data3/Gitrepo/fluent-bit/plugins/in_elasticsearch/in_elasticsearch_bulk_conn.c:89] read()=164 pre_len=0 now_len=164
[2023/05/11 17:54:29] [trace] [io coro=(nil)] [net_read] try up to 511835 bytes
[2023/05/11 17:54:29] [trace] [io coro=(nil)] [net_read] ret=1281
[2023/05/11 17:54:29] [trace] [input:elasticsearch:elasticsearch.0 at /media/Data3/Gitrepo/fluent-bit/plugins/in_elasticsearch/in_elasticsearch_bulk_conn.c:89] read()=1281 pre_len=164 now_len=1445
[2023/05/11 17:54:29] [debug] [input chunk] update output instances with new chunk size diff=76
[2023/05/11 17:54:29] [debug] [input chunk] update output instances with new chunk size diff=91
[2023/05/11 17:54:29] [debug] [input chunk] update output instances with new chunk size diff=120
[2023/05/11 17:54:29] [debug] [input chunk] update output instances with new chunk size diff=107
[2023/05/11 17:54:29] [debug] [input chunk] update output instances with new chunk size diff=98
[2023/05/11 17:54:29] [debug] [input chunk] update output instances with new chunk size diff=125
[2023/05/11 17:54:29] [debug] [input chunk] update output instances with new chunk size diff=98
[2023/05/11 17:54:29] [debug] [input chunk] update output instances with new chunk size diff=115
[2023/05/11 17:54:29] [trace] [io coro=(nil)] [net_write] trying 761 bytes
[2023/05/11 17:54:29] [trace] [io coro=(nil)] [net_write] ret=761 total=761/761
[2023/05/11 17:54:29] [trace] [io coro=(nil)] [net_read] try up to 511999 bytes
[2023/05/11 17:54:29] [trace] [io coro=(nil)] [net_read] ret=0
[2023/05/11 17:54:29] [trace] [input:elasticsearch:elasticsearch.0 at /media/Data3/Gitrepo/fluent-bit/plugins/in_elasticsearch/in_elasticsearch_bulk_conn.c:84] fd=40 closed connection
[2023/05/11 17:54:29] [trace] [downstream] destroy connection #40 to tcp://127.0.0.1:40000
[2023/05/11 17:54:30] [trace] [task 0x71eff10] created (id=0)
[0] raw.es: [[1683795269.909020712, {}], {"@meta"=>{"index"=>{"_index"=>"test", "_id"=>"1"}}, "field1"=>"value1"}]
[1] raw.es: [[1683795269.909020712, {}], {"@meta"=>{"create"=>{"_index"=>"test", "_id"=>"3"}}, "field1"=>"value3", "field2"=>"value4"}]
[2] raw.es: [[1683795269.909020712, {}], {"@meta"=>{"index"=>{"_index"=>"test", "_id"=>"10"}}, "field1"=>"value1", "a"=>"line", "that"=>"is", "long"=>"line", "contained"=>"request"}]
[3] raw.es: [[1683795269.909020712, {}], {"@meta"=>{"create"=>{"_index"=>"test", "_id"=>"30"}}, "field10"=>"value30", "field20"=>"value40", "message"=>"ok"}]
[4] raw.es: [[1683795269.909020712, {}], {"@meta"=>{"index"=>{"_index"=>"test", "_id"=>"11"}}, "field11"=>"value11", "nested"=>{"message"=>"ok"}}]
[5] raw.es: [[1683795269.909020712, {}], {"@meta"=>{"create"=>{"_index"=>"test", "_id"=>"31"}}, "field11"=>"value31", "field21"=>"value41", "nested"=>{"multiply"=>{"message"=>"ok"}}}]
[6] raw.es: [[1683795269.909020712, {}], {"@meta"=>{"index"=>{"_index"=>"test", "_id"=>"41"}}, "field41"=>"value41", "nested"=>{"message"=>"ok"}}]
[7] raw.es: [[1683795269.909020712, {}], {"@meta"=>{"index"=>{"_index"=>"test", "_id"=>"41"}}, "field51"=>"value51", "nested"=>{"message"=>"the end of the line"}}]
[2023/05/11 17:54:30] [debug] [task] created task=0x71eff10 id=0 OK
[2023/05/11 17:54:30] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[2023/05/11 17:54:30] [debug] [out flush] cb_destroy coro_id=0
[2023/05/11 17:54:30] [trace] [coro] destroy coroutine=0x71f0180 data=0x71f01a0
[2023/05/11 17:54:30] [trace] [engine] [task event] task_id=0 out_id=0 return=ERROR
[2023/05/11 17:54:30] [debug] [task] destroy task=0x71eff10 (task_id=0)
^C[2023/05/11 17:54:30] [engine] caught signal (SIGINT)
[2023/05/11 17:54:30] [trace] [engine] flush enqueued data
[2023/05/11 17:54:30] [ warn] [engine] service will shutdown in max 5 seconds
[2023/05/11 17:54:31] [ info] [engine] service has stopped (0 pending tasks)
[2023/05/11 17:54:31] [ info] [output:stdout:stdout.0] thread worker #0 stopping...
[2023/05/11 17:54:31] [ info] [output:stdout:stdout.0] thread worker #0 stopped
  • Attached Valgrind output that shows no leaks or memory corruption was found
==1360762== 
==1360762== HEAP SUMMARY:
==1360762==     in use at exit: 0 bytes in 0 blocks
==1360762==   total heap usage: 1,671 allocs, 1,671 frees, 9,500,578 bytes allocated
==1360762== 
==1360762== All heap blocks were freed -- no leaks are possible
==1360762== 
==1360762== For lists of detected and suppressed errors, rerun with: -s
==1360762== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)

If this is a change to packaging of containers or native binaries then please confirm it works for all targets.

  • Run local packaging test showing all targets (including any new ones) build.
  • Set ok-package-test label to test for all targets (requires maintainer to do).

Documentation

  • Documentation required for this feature

Backporting

  • Backport to latest stable release.

Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

@cosmo0920 cosmo0920 temporarily deployed to pr May 11, 2023 08:55 — with GitHub Actions Inactive
@cosmo0920 cosmo0920 temporarily deployed to pr May 11, 2023 08:55 — with GitHub Actions Inactive
@cosmo0920 cosmo0920 temporarily deployed to pr May 11, 2023 08:55 — with GitHub Actions Inactive
@cosmo0920 cosmo0920 temporarily deployed to pr May 11, 2023 09:19 — with GitHub Actions Inactive
@cosmo0920 cosmo0920 marked this pull request as ready for review May 11, 2023 09:43
@leonardo-albertovich leonardo-albertovich merged commit 03c879f into master May 12, 2023
@leonardo-albertovich leonardo-albertovich deleted the cosmo0920-in_elasticsearch-refer-tag branch May 12, 2023 06:27
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants