Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

go: proxy: plugin: Implement multi-threaded Golang input plugin mechanism #5056

Merged
merged 1 commit into from
Aug 4, 2022

Conversation

cosmo0920
Copy link
Contributor

@cosmo0920 cosmo0920 commented Mar 11, 2022

This PR provides Golang input plugin mechanism on Fluent Bit core.
This work is based on multi-threaded input plugin mechanism that is implemented on #4586.

The related issue is None.


Enter [N/A] in the box, if an item is not applicable to your change.

Testing
Before we can approve your change; please submit the following in a comment:

  • Example configuration file for the change

fluent-bit.conf

[SERVICE]
    Flush        5
    Daemon       Off
    Log_Level    trace
    Plugins_File plugins.conf
    HTTP_Server  Off
    HTTP_Listen  0.0.0.0
    HTTP_Port    2020

[INPUT]
    Name gdummy
    Tag  gdummy.local

[OUTPUT]
    Name  stdout
    Match gdummy*

plugins.conf

[PLUGINS]
  Path /path/to/workdir/fluent-bit/build/in_gdummy.so
$  bin/fluent-bit -c fluent-bit.conf
Fluent Bit v1.9.0
* Copyright (C) 2015-2021 The Fluent Bit Authors
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io

[2022/03/14 10:36:55] [ info] Configuration:
[2022/03/14 10:36:55] [ info]  flush time     | 5.000000 seconds
[2022/03/14 10:36:55] [ info]  grace          | 5 seconds
[2022/03/14 10:36:55] [ info]  daemon         | 0
[2022/03/14 10:36:55] [ info] ___________
[2022/03/14 10:36:55] [ info]  inputs:
[2022/03/14 10:36:55] [ info]      gdummy
[2022/03/14 10:36:55] [ info] ___________
[2022/03/14 10:36:55] [ info]  filters:
[2022/03/14 10:36:55] [ info] ___________
[2022/03/14 10:36:55] [ info]  outputs:
[2022/03/14 10:36:55] [ info]      stdout.0
[2022/03/14 10:36:55] [ info] ___________
[2022/03/14 10:36:55] [ info]  collectors:
[2022/03/14 10:36:55] [ info] [engine] started (pid=1062825)
[2022/03/14 10:36:55] [debug] [engine] coroutine stack size: 24576 bytes (24.0K)
[2022/03/14 10:36:55] [debug] [storage] [cio stream] new stream registered: gdummy.0
[2022/03/14 10:36:55] [ info] [storage] version=1.1.6, initializing...
[2022/03/14 10:36:55] [ info] [storage] in-memory
[2022/03/14 10:36:55] [ info] [storage] normal synchronization mode, checksum disabled, max_chunks_up=128
[2022/03/14 10:36:55] [ info] [cmetrics] version=0.3.0
[2022/03/14 10:36:55] [debug] [gdummy:gdummy.0] created event channels: read=21 write=22
[flb-go] plugin parameter = ''
[2022/03/14 10:36:55] [debug] [stdout:stdout.0] created event channels: read=25 write=26
[2022/03/14 10:36:55] [debug] [router] match rule gdummy.0:stdout.0
[2022/03/14 10:36:55] [ info] [sp] stream processor started
[2022/03/14 10:36:55] [ info] [output:stdout:stdout.0] worker #0 started
[2022/03/14 10:36:56] [trace] [input:gdummy:gdummy.0 at build/src/CMakeFiles/fluent-bit-static.dir/compiler_depend.ts:93] input thread read() = 26
[2022/03/14 10:36:56] [debug] [input chunk] update output instances with new chunk size diff=26
[2022/03/14 10:36:57] [trace] [input:gdummy:gdummy.0 at build/src/CMakeFiles/fluent-bit-static.dir/compiler_depend.ts:93] input thread read() = 26
[2022/03/14 10:36:57] [debug] [input chunk] update output instances with new chunk size diff=26
[2022/03/14 10:36:58] [trace] [input:gdummy:gdummy.0 at build/src/CMakeFiles/fluent-bit-static.dir/compiler_depend.ts:93] input thread read() = 26
[2022/03/14 10:36:58] [debug] [input chunk] update output instances with new chunk size diff=26
[2022/03/14 10:36:59] [trace] [input:gdummy:gdummy.0 at build/src/CMakeFiles/fluent-bit-static.dir/compiler_depend.ts:93] input thread read() = 26
[2022/03/14 10:36:59] [debug] [input chunk] update output instances with new chunk size diff=26
[2022/03/14 10:37:00] [debug] [task] created task=0x7f94d401d5d0 id=0 OK
[2022/03/14 10:37:00] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[0] gdummy.local: [1647221815.557805574, {"message"=>"dummy"}]
[1] gdummy.local: [1647221816.558982897, {"message"=>"dummy"}]
[2] gdummy.local: [1647221817.559238836, {"message"=>"dummy"}]
[3] gdummy.local: [1647221818.559523525, {"message"=>"dummy"}]
[2022/03/14 10:37:00] [debug] [out flush] cb_destroy coro_id=0
[2022/03/14 10:37:00] [debug] [task] destroy task=0x7f94d401d5d0 (task_id=0)
[2022/03/14 10:37:00] [trace] [input:gdummy:gdummy.0 at build/src/CMakeFiles/fluent-bit-static.dir/compiler_depend.ts:93] input thread read() = 26
[2022/03/14 10:37:00] [debug] [input chunk] update output instances with new chunk size diff=26
[2022/03/14 10:37:01] [trace] [input:gdummy:gdummy.0 at build/src/CMakeFiles/fluent-bit-static.dir/compiler_depend.ts:93] input thread read() = 26
[2022/03/14 10:37:01] [debug] [input chunk] update output instances with new chunk size diff=26
[2022/03/14 10:37:02] [trace] [input:gdummy:gdummy.0 at build/src/CMakeFiles/fluent-bit-static.dir/compiler_depend.ts:93] input thread read() = 26
[2022/03/14 10:37:02] [debug] [input chunk] update output instances with new chunk size diff=26
[2022/03/14 10:37:03] [trace] [input:gdummy:gdummy.0 at build/src/CMakeFiles/fluent-bit-static.dir/compiler_depend.ts:93] input thread read() = 26
[2022/03/14 10:37:03] [debug] [input chunk] update output instances with new chunk size diff=26
[2022/03/14 10:37:04] [trace] [input:gdummy:gdummy.0 at build/src/CMakeFiles/fluent-bit-static.dir/compiler_depend.ts:93] input thread read() = 26
[2022/03/14 10:37:04] [debug] [input chunk] update output instances with new chunk size diff=26
[2022/03/14 10:37:05] [debug] [task] created task=0x7f94d405d830 id=0 OK
[2022/03/14 10:37:05] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[0] gdummy.local: [1647221819.560334706, {"message"=>"dummy"}]
[1] gdummy.local: [1647221820.560606884, {"message"=>"dummy"}]
[2] gdummy.local: [1647221821.561860932, {"message"=>"dummy"}]
[3] gdummy.local: [1647221822.562044770, {"message"=>"dummy"}]
[4] gdummy.local: [1647221823.562335594, {"message"=>"dummy"}]
[2022/03/14 10:37:05] [debug] [out flush] cb_destroy coro_id=1
[2022/03/14 10:37:05] [debug] [task] destroy task=0x7f94d405d830 (task_id=0)
[2022/03/14 10:37:05] [trace] [input:gdummy:gdummy.0 at build/src/CMakeFiles/fluent-bit-static.dir/compiler_depend.ts:93] input thread read() = 26
[2022/03/14 10:37:05] [debug] [input chunk] update output instances with new chunk size diff=26
^C[2022/03/14 10:37:06] [engine] caught signal (SIGINT)
[2022/03/14 10:37:06] [debug] [task] created task=0x7f94d401d870 id=0 OK
[2022/03/14 10:37:06] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[2022/03/14 10:37:06] [ warn] [engine] service will shutdown in max 5 seconds
[0] gdummy.local: [1647221824.562711972, {"message"=>"dummy"}]
[2022/03/14 10:37:06] [debug] [out flush] cb_destroy coro_id=2
[2022/03/14 10:37:06] [debug] [task] destroy task=0x7f94d401d870 (task_id=0)
[2022/03/14 10:37:06] [ info] [engine] service has stopped (0 pending tasks)
[2022/03/14 10:37:06] [debug] [GO] running exit callback
[2022/03/14 10:37:06] [ info] [output:stdout:stdout.0] thread worker #0 stopping...
[2022/03/14 10:37:06] [ info] [output:stdout:stdout.0] thread worker #0 stopped
  • [N/A] Attached Valgrind output that shows no leaks or memory corruption was found

If this is a change to packaging of containers or native binaries then please confirm it works for all targets.

Documentation

  • Documentation required for this feature

Backporting

  • Backport to latest stable release.

Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

@cosmo0920 cosmo0920 force-pushed the golang-input-plugin branch 2 times, most recently from c1f9371 to 92ec27a Compare March 16, 2022 12:40
@cosmo0920 cosmo0920 force-pushed the golang-input-plugin branch 2 times, most recently from 63e4679 to 3b14f9d Compare April 22, 2022 01:39
@cosmo0920 cosmo0920 force-pushed the golang-input-plugin branch from 3b14f9d to a63b754 Compare May 19, 2022 07:18
@cosmo0920 cosmo0920 force-pushed the golang-input-plugin branch 2 times, most recently from a506554 to 35d0c34 Compare July 4, 2022 09:17
@cosmo0920 cosmo0920 force-pushed the golang-input-plugin branch from 35d0c34 to 5e30211 Compare July 5, 2022 07:37
@cosmo0920 cosmo0920 temporarily deployed to integration July 13, 2022 02:37 Inactive
@cosmo0920 cosmo0920 temporarily deployed to integration July 13, 2022 04:49 Inactive
@cosmo0920 cosmo0920 force-pushed the golang-input-plugin branch from a93593a to 9bb8aaa Compare July 13, 2022 07:13
@cosmo0920 cosmo0920 temporarily deployed to integration July 13, 2022 07:13 Inactive
@edsiper
Copy link
Member

edsiper commented Jul 13, 2022

Recently we merged a new approach to have native threaded input plugins; parts of the design consider:

  • input plugins are not aware of threading. The engine runs them in a separate thread if the feature has been enabled.
  • plugins ingest data as usual with the flb_input_chunk_append_raw() call, but that function detects if the caller comes from a thread or not; if so, it will use a ring buffer mechanism to ingest data.

There are two ways for an input plugin to specify that it needs to be run in a separate thread:

  • configuration: if the input section enabled the flag threaded on, the plugin will run in a separate thread.

  • plugin registration: plugins have a registration structure and a .flags field to hint the engine about its operational requirements. If the plugin desires to always run in a separate thread, the flag field needs to get appended with the value FLB_INPUT_THREADED.

As an example take a look at the new in_event_test plugin that sets the flag .flags FLB_INPUT_THREADED:

https://github.com/fluent/fluent-bit/blob/master/plugins/in_event_test/event_test.c#L404

The logic of this new proxy interface needs to be adapted, actually I think it will be simplified since no thread management will required, so we can make sure that any plugin that gets connected through the proxy interface by default runs in a separate thread.

@cosmo0920 cosmo0920 temporarily deployed to integration July 14, 2022 23:41 Inactive
@cosmo0920 cosmo0920 temporarily deployed to integration July 19, 2022 02:15 Inactive
@cosmo0920 cosmo0920 temporarily deployed to integration July 19, 2022 15:43 Inactive
@cosmo0920 cosmo0920 temporarily deployed to integration July 20, 2022 05:52 Inactive
@cosmo0920 cosmo0920 force-pushed the golang-input-plugin branch from 37bb521 to 79bff47 Compare July 20, 2022 05:59
@cosmo0920 cosmo0920 temporarily deployed to integration July 20, 2022 05:59 Inactive
@cosmo0920 cosmo0920 temporarily deployed to integration July 20, 2022 07:52 Inactive
@cosmo0920 cosmo0920 force-pushed the golang-input-plugin branch from 72d17c7 to ed1e2e4 Compare July 20, 2022 08:50
@cosmo0920 cosmo0920 temporarily deployed to integration July 20, 2022 08:50 Inactive
@cosmo0920 cosmo0920 temporarily deployed to integration July 20, 2022 13:43 Inactive
@cosmo0920 cosmo0920 force-pushed the golang-input-plugin branch from ed1e2e4 to ba689f7 Compare July 21, 2022 00:38
@cosmo0920 cosmo0920 temporarily deployed to integration July 21, 2022 00:39 Inactive
@cosmo0920 cosmo0920 temporarily deployed to integration July 21, 2022 00:45 Inactive
@cosmo0920 cosmo0920 force-pushed the golang-input-plugin branch from ba689f7 to 2e50b5f Compare July 25, 2022 02:41
@cosmo0920 cosmo0920 temporarily deployed to integration July 25, 2022 02:41 Inactive
@cosmo0920 cosmo0920 temporarily deployed to integration July 25, 2022 02:47 Inactive
@cosmo0920 cosmo0920 force-pushed the golang-input-plugin branch from 2e50b5f to a269dd1 Compare July 28, 2022 05:17
@cosmo0920 cosmo0920 temporarily deployed to integration July 28, 2022 05:17 Inactive
@cosmo0920 cosmo0920 temporarily deployed to integration July 28, 2022 05:22 Inactive
@cosmo0920
Copy link
Contributor Author

cosmo0920 commented Aug 4, 2022

As an example take a look at the new in_event_test plugin that sets the flag .flags FLB_INPUT_THREADED:

https://github.com/fluent/fluent-bit/blob/master/plugins/in_event_test/event_test.c#L404

The logic of this new proxy interface needs to be adapted, actually I think it will be simplified since no thread management will required, so we can make sure that any plugin that gets connected through the proxy interface by default runs in a separate thread.

I already implemented this logic for plugin proxy.
@edsiper Any obstacles to start reviewing this PR?

@niedbalski niedbalski self-requested a review August 4, 2022 09:05
@niedbalski niedbalski added the ok-to-performance-test Run PR performance tests label Aug 4, 2022
@niedbalski niedbalski temporarily deployed to integration August 4, 2022 09:05 Inactive
@niedbalski
Copy link
Collaborator

@edsiper this PR needs your approval. I just +1 it.

@niedbalski niedbalski temporarily deployed to integration August 4, 2022 09:10 Inactive
@edsiper edsiper merged commit a74afc9 into fluent:master Aug 4, 2022
@cosmo0920 cosmo0920 deleted the golang-input-plugin branch August 5, 2022 07:07
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants